From c96617a03911a115939ffd328b04b86f92a764d8 Mon Sep 17 00:00:00 2001 From: Giacomo Bertolazzi <31776951+Berack96@users.noreply.github.com> Date: Mon, 13 Oct 2025 10:49:46 +0200 Subject: [PATCH] Telegram bot support (#23) * Aggiungi supporto per il bot Telegram: aggiorna .env.example, pyproject.toml e uv.lock * demo per bot Telegram con gestione comandi e inline keyboard * Implementazione del bot Telegram con gestione dei comandi e stati di conversazione iniziali * Aggiorna la gestione delle configurazioni nel bot Telegram: modifica gli stati della conversazione e aggiungi il supporto per la gestione dei messaggi. * fix static models & readme * aggiunto il supporto per la query dell'utente e modificata la visualizzazione dei messaggi di stato. * Aggiunto il supporto per la gestione del bot Telegram e aggiornata la configurazione del pipeline * Aggiornato .gitignore per includere la cartella .gradio e rimosso chroma_db. Aggiunto il supporto per la generazione di report in PDF utilizzando markdown-pdf nel bot Telegram. * Refactor pipeline and chat manager for improved structure and functionality * Better logging * Aggiornato il comportamento del logging per i logger di agno. Aggiunto il supporto per l'opzione check_for_async nella configurazione di RedditWrapper. * Rimosso codice commentato e import non utilizzati nella classe Pipeline per semplificare la struttura * Aggiornata la sezione "Applicazione" nel README & fix main * Telegram instance instead of static * Fix logging to use labels for team model, leader model, and strategy * Rinomina il lock da _lock a __lock per garantire l'incapsulamento nella classe AppConfig * Rinomina i logger per una migliore identificazione e gestisce le eccezioni nel bot di Telegram * Aggiorna i messaggi di errore nel gestore Telegram per una migliore chiarezza e modifica il commento nel file di configurazione per riflettere lo stato del modello. * Aggiungi un messaggio di attesa durante la generazione del report nel bot di Telegram --- .env.example | 11 ++ .gitignore | 8 +- README.md | 21 ++- configs.yaml | 4 +- demos/telegram_bot_demo.py | 59 +++++++ pyproject.toml | 4 + src/app/__main__.py | 89 ++-------- src/app/agents/pipeline.py | 81 ++++----- src/app/api/social/reddit.py | 1 + src/app/api/wrapper_handler.py | 13 +- src/app/configs.py | 11 ++ src/app/interface/__init__.py | 3 +- src/app/interface/chat.py | 71 +++++++- src/app/interface/telegram_app.py | 264 ++++++++++++++++++++++++++++++ uv.lock | 50 +++++- 15 files changed, 541 insertions(+), 149 deletions(-) create mode 100644 demos/telegram_bot_demo.py create mode 100644 src/app/interface/telegram_app.py diff --git a/.env.example b/.env.example index fd9a427..ce6f756 100644 --- a/.env.example +++ b/.env.example @@ -5,6 +5,7 @@ # https://makersuite.google.com/app/apikey GOOGLE_API_KEY= + ############################################################################### # Configurazioni per gli agenti di mercato ############################################################################### @@ -21,6 +22,7 @@ CRYPTOCOMPARE_API_KEY= BINANCE_API_KEY= BINANCE_API_SECRET= + ############################################################################### # Configurazioni per gli agenti di notizie ############################################################################### @@ -31,6 +33,7 @@ NEWS_API_KEY= # https://cryptopanic.com/developers/api/ CRYPTOPANIC_API_KEY= + ############################################################################### # Configurazioni per API di social media ############################################################################### @@ -38,3 +41,11 @@ CRYPTOPANIC_API_KEY= # https://www.reddit.com/prefs/apps REDDIT_API_CLIENT_ID= REDDIT_API_CLIENT_SECRET= + + +############################################################################### +# Configurazioni per API di messaggistica +############################################################################### + +# https://core.telegram.org/bots/features#creating-a-new-bot +TELEGRAM_BOT_TOKEN= diff --git a/.gitignore b/.gitignore index b532676..609ad99 100644 --- a/.gitignore +++ b/.gitignore @@ -173,8 +173,8 @@ cython_debug/ # PyPI configuration file .pypirc -# chroma db -./chroma_db/ - # VS Code -.vscode/ \ No newline at end of file +.vscode/ + +# Gradio +.gradio/ diff --git a/README.md b/README.md index 662a9b3..1c5f023 100644 --- a/README.md +++ b/README.md @@ -91,13 +91,22 @@ uv run src/app # **Applicazione** -***L'applicazione è attualmente in fase di sviluppo.*** +> [!CAUTION]\ +> ***L'applicazione è attualmente in fase di sviluppo.*** -Usando la libreria ``gradio`` è stata creata un'interfaccia web semplice per interagire con l'agente principale. Gli agenti secondari si trovano nella cartella `src/app/agents` e sono: -- **Market Agent**: Agente unificato che supporta multiple fonti di dati con auto-retry e gestione degli errori. -- **News Agent**: Recupera le notizie finanziarie più recenti sul mercato delle criptovalute. -- **Social Agent**: Analizza i sentimenti sui social media riguardo alle criptovalute. -- **Predictor Agent**: Utilizza i dati raccolti dagli altri agenti per fare previsioni. +L'applicazione viene fatta partire tramite il file [src/app/\_\_main\_\_.py](src/app/__main__.py) che inizializza l'agente principale e gli agenti secondari. + +In esso viene creato il server `gradio` per l'interfaccia web e viene anche inizializzato il bot di Telegram (se è stata inserita la chiave nel file `.env` ottenuta da [BotFather](https://core.telegram.org/bots/features#creating-a-new-bot)). + +L'interazione è guidata, sia tramite l'interfaccia web che tramite il bot di Telegram; l'utente può scegliere prima di tutto delle opzioni generali (come il modello e la strategia di investimento), dopodiché può inviare un messaggio di testo libero per chiedere consigli o informazioni specifiche. Per esempio: "Qual è l'andamento attuale di Bitcoin?" o "Consigliami quali sono le migliori criptovalute in cui investire questo mese". + +L'applicazione, una volta ricevuta la richiesta, la passa al [Team](src/app/agents/team.py) di agenti che si occupano di raccogliere i dati necessari per rispondere in modo completo e ragionato. + +Gli agenti coinvolti nel Team sono: +- **Leader**: Coordina gli altri agenti e fornisce la risposta finale all'utente. +- **Market Agent**: Recupera i dati di mercato attuali delle criptovalute da Binance e Yahoo Finance. +- **News Agent**: Recupera le ultime notizie sul mercato delle criptovalute da NewsAPI e GNews. +- **Social Agent**: Recupera i dati dai social media (Reddit) per analizzare il sentiment del mercato. ## Struttura del codice del Progetto diff --git a/configs.yaml b/configs.yaml index e2f444d..5d70b13 100644 --- a/configs.yaml +++ b/configs.yaml @@ -17,8 +17,8 @@ models: gemini: - name: gemini-2.0-flash label: Gemini - - name: gemini-2.0-pro - label: Gemini Pro + # - name: gemini-2.0-pro # TODO Non funziona, ha un nome diverso + # label: Gemini Pro ollama: - name: gpt-oss:latest label: Ollama GPT diff --git a/demos/telegram_bot_demo.py b/demos/telegram_bot_demo.py new file mode 100644 index 0000000..2a2b7d9 --- /dev/null +++ b/demos/telegram_bot_demo.py @@ -0,0 +1,59 @@ +import os +from dotenv import load_dotenv +from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update +from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters, ContextTypes + +# Esempio di funzione per gestire il comando /start +async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if not update.message: return + await update.message.reply_text('Ciao! Inviami un messaggio e ti risponderò!') + + +# Esempio di funzione per fare echo del messaggio ricevuto +async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE): + message = update.message + if not message: return + + print(f"Ricevuto messaggio: {message.text} da chat id: {message.chat.id}") + await message.reply_text(text=f"Hai detto: {message.text}") + + +# Esempio di funzione per far partire una inline keyboard (comando /keyboard) +async def inline_keyboard(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if not update.message: return + keyboard = [ + [ + InlineKeyboardButton("Option 1", callback_data='1'), + InlineKeyboardButton("Option 2", callback_data='2'), + ] + ] + reply_markup = InlineKeyboardMarkup(keyboard) + await update.message.reply_text('Please choose:', reply_markup=reply_markup) + + +async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + query = update.callback_query + if not query: return + await query.answer() + await query.edit_message_text(text=f"Selected option: {query.data}") + + + + + +def main(): + print("Bot in ascolto...") + + load_dotenv() + token = os.getenv("TELEGRAM_BOT_TOKEN", '') + app = Application.builder().token(token).build() + + app.add_handler(CommandHandler("start", start)) + app.add_handler(CommandHandler("keyboard", inline_keyboard)) + app.add_handler(MessageHandler(filters=filters.TEXT, callback=echo)) + app.add_handler(CallbackQueryHandler(button_handler)) + + app.run_polling(allowed_updates=Update.ALL_TYPES) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 97eb413..127d77a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,10 @@ dependencies = [ # API di social media "praw", # Reddit + + # Per telegram bot + "python-telegram-bot", # Interfaccia Telegram Bot + "markdown-pdf", # Per convertire markdown in pdf ] [tool.pytest.ini_options] diff --git a/src/app/__main__.py b/src/app/__main__.py index 4347ecf..dca46fb 100644 --- a/src/app/__main__.py +++ b/src/app/__main__.py @@ -1,86 +1,31 @@ import asyncio -import gradio as gr +import logging from dotenv import load_dotenv -from agno.utils.log import log_info #type: ignore from app.configs import AppConfig -from app.interface import ChatManager +from app.interface import * from app.agents import Pipeline if __name__ == "__main__": - # Inizializzazioni load_dotenv() configs = AppConfig.load() pipeline = Pipeline(configs) - chat = ChatManager() - - ######################################## - # Funzioni Gradio - ######################################## - def respond(message: str, history: list[dict[str, str]]) -> tuple[list[dict[str, str]], list[dict[str, str]], str]: - chat.send_message(message) - response = pipeline.interact(message) - chat.receive_message(response) - history.append({"role": "user", "content": message}) - history.append({"role": "assistant", "content": response}) - return history, history, "" - - def save_current_chat() -> str: - chat.save_chat("chat.json") - return "💾 Chat salvata in chat.json" - - def load_previous_chat() -> tuple[list[dict[str, str]], list[dict[str, str]]]: - chat.load_chat("chat.json") - history: list[dict[str, str]] = [] - for m in chat.get_history(): - history.append({"role": m["role"], "content": m["content"]}) - return history, history - - def reset_chat() -> tuple[list[dict[str, str]], list[dict[str, str]]]: - chat.reset_chat() - return [], [] - - ######################################## - # Interfaccia Gradio - ######################################## - with gr.Blocks() as demo: - gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)") - - # Dropdown provider e stile - with gr.Row(): - provider = gr.Dropdown( - choices=pipeline.list_providers(), - type="index", - label="Modello da usare" - ) - provider.change(fn=pipeline.choose_predictor, inputs=provider, outputs=None) - - style = gr.Dropdown( - choices=pipeline.list_styles(), - type="index", - label="Stile di investimento" - ) - style.change(fn=pipeline.choose_strategy, inputs=style, outputs=None) - - chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages") - msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?") - - with gr.Row(): - clear_btn = gr.Button("🗑️ Reset Chat") - save_btn = gr.Button("💾 Salva Chat") - load_btn = gr.Button("📂 Carica Chat") - - # Eventi e interazioni - msg.submit(respond, inputs=[msg, chatbot], outputs=[chatbot, chatbot, msg]) - clear_btn.click(reset_chat, inputs=None, outputs=[chatbot, chatbot]) - save_btn.click(save_current_chat, inputs=None, outputs=None) - load_btn.click(load_previous_chat, inputs=None, outputs=[chatbot, chatbot]) + chat = ChatManager(pipeline) + gradio = chat.gradio_build_interface() + _app, local_url, share_url = gradio.launch(server_name="0.0.0.0", server_port=configs.port, quiet=True, prevent_thread_lock=True, share=configs.gradio_share) + logging.info(f"UPO AppAI Chat is running on {share_url or local_url}") try: - _app, local, shared = demo.launch(server_name="0.0.0.0", server_port=configs.port, quiet=True, prevent_thread_lock=True, share=configs.gradio_share) - log_info(f"Starting UPO AppAI Chat on {shared or local}") - asyncio.get_event_loop().run_forever() - except KeyboardInterrupt: - demo.close() + telegram = TelegramApp(pipeline) + telegram.add_miniapp_url(share_url) + telegram.run() + except AssertionError as e: + try: + logging.warning(f"Telegram bot could not be started: {e}") + asyncio.get_event_loop().run_forever() + except KeyboardInterrupt: + logging.info("Shutting down due to KeyboardInterrupt") + finally: + gradio.close() diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index 3522432..3338cb8 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -1,10 +1,10 @@ -from agno.run.agent import RunOutput +import logging from app.agents.team import create_team_with -from app.agents.predictor import PredictorInput, PredictorOutput from app.agents.prompts import * -from app.api.core.markets import ProductInfo from app.configs import AppConfig +logging = logging.getLogger("pipeline") + class Pipeline: """ @@ -17,27 +17,30 @@ class Pipeline: self.configs = configs # Stato iniziale - self.choose_strategy(0) - self.choose_predictor(0) + self.leader_model = self.configs.get_model_by_name(self.configs.agents.team_leader_model) + self.team_model = self.configs.get_model_by_name(self.configs.agents.team_model) + self.strategy = self.configs.get_strategy_by_name(self.configs.agents.strategy) # ====================== # Dropdown handlers # ====================== - def choose_predictor(self, index: int): + def choose_leader(self, index: int): """ - Sceglie il modello LLM da usare per il Predictor. + Sceglie il modello LLM da usare per il Team. """ - model = self.configs.models.all_models[index] - self.predictor = model.get_agent( - PREDICTOR_INSTRUCTIONS, - output_schema=PredictorOutput, - ) + self.leader_model = self.configs.models.all_models[index] + + def choose_team(self, index: int): + """ + Sceglie il modello LLM da usare per il Team. + """ + self.team_model = self.configs.models.all_models[index] def choose_strategy(self, index: int): """ Sceglie la strategia da usare per il Predictor. """ - self.strat = self.configs.strategies[index].description + self.strategy = self.configs.strategies[index] # ====================== # Helpers @@ -64,46 +67,18 @@ class Pipeline: 3. Invoca Predictor 4. Restituisce la strategia finale """ - # Step 1: raccolta output dai membri del Team - team_model = self.configs.get_model_by_name(self.configs.agents.team_model) - leader_model = self.configs.get_model_by_name(self.configs.agents.team_leader_model) + # Step 1: Creazione Team + team = create_team_with(self.configs, self.team_model, self.leader_model) - team = create_team_with(self.configs, team_model, leader_model) + # Step 2: raccolta output dai membri del Team + logging.info(f"Pipeline received query: {query}") + # TODO migliorare prompt (?) + query = f"The user query is: {query}\n\n They requested a {self.strategy.label} investment strategy." team_outputs = team.run(query) # type: ignore - # Step 2: aggregazione output strutturati - all_products: list[ProductInfo] = [] - sentiments: list[str] = [] - - for agent_output in team_outputs.member_responses: - if isinstance(agent_output, RunOutput) and agent_output.metadata is not None: - keys = agent_output.metadata.keys() - if "products" in keys: - all_products.extend(agent_output.metadata["products"]) - if "sentiment_news" in keys: - sentiments.append(agent_output.metadata["sentiment_news"]) - if "sentiment_social" in keys: - sentiments.append(agent_output.metadata["sentiment_social"]) - - aggregated_sentiment = "\n".join(sentiments) - - # Step 3: invocazione Predictor - predictor_input = PredictorInput( - data=all_products, - style=self.strat, - sentiment=aggregated_sentiment - ) - - result = self.predictor.run(predictor_input) # type: ignore - if not isinstance(result.content, PredictorOutput): - return "❌ Errore: il modello non ha restituito un output valido." - prediction: PredictorOutput = result.content - - # Step 4: restituzione strategia finale - portfolio_lines = "\n".join( - [f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio] - ) - return ( - f"📊 Strategia ({self.strat}): {prediction.strategy}\n\n" - f"💼 Portafoglio consigliato:\n{portfolio_lines}" - ) + # Step 3: recupero ouput + if not isinstance(team_outputs.content, str): + logging.error(f"Team output is not a string: {team_outputs.content}") + raise ValueError("Team output is not a string") + logging.info(f"Team finished") + return team_outputs.content diff --git a/src/app/api/social/reddit.py b/src/app/api/social/reddit.py index ca06211..bda7687 100644 --- a/src/app/api/social/reddit.py +++ b/src/app/api/social/reddit.py @@ -59,6 +59,7 @@ class RedditWrapper(SocialWrapper): client_id=client_id, client_secret=client_secret, user_agent="upo-appAI", + check_for_async=False, ) self.subreddits = self.tool.subreddit("+".join(SUBREDDITS)) diff --git a/src/app/api/wrapper_handler.py b/src/app/api/wrapper_handler.py index d28bd62..cf6ce74 100644 --- a/src/app/api/wrapper_handler.py +++ b/src/app/api/wrapper_handler.py @@ -1,9 +1,10 @@ import inspect +import logging import time import traceback from typing import Any, Callable, Generic, TypeVar -from agno.utils.log import log_info, log_warning #type: ignore +logging = logging.getLogger("wrapper_handler") WrapperType = TypeVar("WrapperType") WrapperClassType = TypeVar("WrapperClassType") OutputType = TypeVar("OutputType") @@ -86,7 +87,7 @@ class WrapperHandler(Generic[WrapperType]): Exception: If all wrappers fail after retries. """ - log_info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}") + logging.info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}") results: dict[str, OutputType] = {} starting_index = self.index @@ -96,18 +97,18 @@ class WrapperHandler(Generic[WrapperType]): wrapper_name = wrapper.__class__.__name__ if not try_all: - log_info(f"try_call {wrapper_name}") + logging.info(f"try_call {wrapper_name}") for try_count in range(1, self.retry_per_wrapper + 1): try: result = func(wrapper) - log_info(f"{wrapper_name} succeeded") + logging.info(f"{wrapper_name} succeeded") results[wrapper_name] = result break except Exception as e: error = WrapperHandler.__concise_error(e) - log_warning(f"{wrapper_name} failed {try_count}/{self.retry_per_wrapper}: {error}") + logging.warning(f"{wrapper_name} failed {try_count}/{self.retry_per_wrapper}: {error}") time.sleep(self.retry_delay) if not try_all and results: @@ -153,6 +154,6 @@ class WrapperHandler(Generic[WrapperType]): wrapper = wrapper_class(**(kwargs or {})) result.append(wrapper) except Exception as e: - log_warning(f"{wrapper_class} cannot be initialized: {e}") + logging.warning(f"'{wrapper_class.__name__}' cannot be initialized: {e}") return WrapperHandler(result, try_per_wrapper, retry_delay) \ No newline at end of file diff --git a/src/app/configs.py b/src/app/configs.py index 59dad48..29c2178 100644 --- a/src/app/configs.py +++ b/src/app/configs.py @@ -145,6 +145,17 @@ class AppConfig(BaseModel): return strat raise ValueError(f"Strategy with name '{name}' not found.") + def get_defaults(self) -> tuple[AppModel, AppModel, Strategy]: + """ + Retrieve the default team model, leader model, and strategy. + Returns: + A tuple containing the default team model (AppModel), leader model (AppModel), and strategy (Strategy). + """ + team_model = self.get_model_by_name(self.agents.team_model) + leader_model = self.get_model_by_name(self.agents.team_leader_model) + strategy = self.get_strategy_by_name(self.agents.strategy) + return team_model, leader_model, strategy + def set_logging_level(self) -> None: """ Set the logging level based on the configuration. diff --git a/src/app/interface/__init__.py b/src/app/interface/__init__.py index dc925f8..186558a 100644 --- a/src/app/interface/__init__.py +++ b/src/app/interface/__init__.py @@ -1,3 +1,4 @@ from app.interface.chat import ChatManager +from app.interface.telegram_app import TelegramApp -__all__ = ["ChatManager"] +__all__ = ["ChatManager", "TelegramApp"] diff --git a/src/app/interface/chat.py b/src/app/interface/chat.py index d51819d..aaba2af 100644 --- a/src/app/interface/chat.py +++ b/src/app/interface/chat.py @@ -1,5 +1,8 @@ -import json import os +import json +import gradio as gr +from app.agents.pipeline import Pipeline + class ChatManager: """ @@ -9,8 +12,9 @@ class ChatManager: - salva e ricarica le chat """ - def __init__(self): + def __init__(self, pipeline: Pipeline): self.history: list[dict[str, str]] = [] # [{"role": "user"/"assistant", "content": "..."}] + self.pipeline = pipeline def send_message(self, message: str) -> None: """ @@ -56,3 +60,66 @@ class ChatManager: Restituisce lo storico completo della chat. """ return self.history + + + ######################################## + # Funzioni Gradio + ######################################## + def gradio_respond(self, message: str, history: list[dict[str, str]]) -> tuple[list[dict[str, str]], list[dict[str, str]], str]: + self.send_message(message) + response = self.pipeline.interact(message) + self.receive_message(response) + history.append({"role": "user", "content": message}) + history.append({"role": "assistant", "content": response}) + return history, history, "" + + def gradio_save(self) -> str: + self.save_chat("chat.json") + return "💾 Chat salvata in chat.json" + + def gradio_load(self) -> tuple[list[dict[str, str]], list[dict[str, str]]]: + self.load_chat("chat.json") + history: list[dict[str, str]] = [] + for m in self.get_history(): + history.append({"role": m["role"], "content": m["content"]}) + return history, history + + def gradio_clear(self) -> tuple[list[dict[str, str]], list[dict[str, str]]]: + self.reset_chat() + return [], [] + + def gradio_build_interface(self) -> gr.Blocks: + with gr.Blocks() as interface: + gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)") + + # Dropdown provider e stile + with gr.Row(): + provider = gr.Dropdown( + choices=self.pipeline.list_providers(), + type="index", + label="Modello da usare" + ) + provider.change(fn=self.pipeline.choose_leader, inputs=provider, outputs=None) + + style = gr.Dropdown( + choices=self.pipeline.list_styles(), + type="index", + label="Stile di investimento" + ) + style.change(fn=self.pipeline.choose_strategy, inputs=style, outputs=None) + + chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages") + msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?") + + with gr.Row(): + clear_btn = gr.Button("🗑️ Reset Chat") + save_btn = gr.Button("💾 Salva Chat") + load_btn = gr.Button("📂 Carica Chat") + + # Eventi e interazioni + msg.submit(self.gradio_respond, inputs=[msg, chatbot], outputs=[chatbot, chatbot, msg]) + clear_btn.click(self.gradio_clear, inputs=None, outputs=[chatbot, chatbot]) + save_btn.click(self.gradio_save, inputs=None, outputs=None) + load_btn.click(self.gradio_load, inputs=None, outputs=[chatbot, chatbot]) + + return interface \ No newline at end of file diff --git a/src/app/interface/telegram_app.py b/src/app/interface/telegram_app.py new file mode 100644 index 0000000..3bef9d9 --- /dev/null +++ b/src/app/interface/telegram_app.py @@ -0,0 +1,264 @@ +import io +import os +import json +import httpx +import logging +import warnings +from enum import Enum +from markdown_pdf import MarkdownPdf, Section +from telegram import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update, User +from telegram.constants import ChatAction +from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, ConversationHandler, MessageHandler, filters +from app.agents.pipeline import Pipeline +from app.configs import AppConfig + +# per per_message di ConversationHandler che rompe sempre qualunque input tu metta +warnings.filterwarnings("ignore") +logging = logging.getLogger("telegram") + + +# Lo stato cambia in base al valore di ritorno delle funzioni async +# END state è già definito in telegram.ext.ConversationHandler +# Un semplice schema delle interazioni: +# /start +# ║ +# V +# ╔══ CONFIGS <═════╗ +# ║ ║ ╚══> SELECT_CONFIG +# ║ V +# ║ start_team (polling for updates) +# ║ ║ +# ║ V +# ╚═══> END +CONFIGS, SELECT_CONFIG = range(2) + +# Usato per separare la query arrivata da Telegram +QUERY_SEP = "|==|" + +class ConfigsChat(Enum): + MODEL_TEAM = "Team Model" + MODEL_OUTPUT = "Output Model" + STRATEGY = "Strategy" + +class ConfigsRun: + def __init__(self, configs: AppConfig): + team, leader, strategy = configs.get_defaults() + self.team_model = team + self.leader_model = leader + self.strategy = strategy + self.user_query = "" + + +class TelegramApp: + def __init__(self, pipeline: Pipeline): + token = os.getenv("TELEGRAM_BOT_TOKEN") + assert token, "TELEGRAM_BOT_TOKEN environment variable not set" + + self.user_requests: dict[User, ConfigsRun] = {} + self.pipeline = pipeline + self.token = token + self.create_bot() + + def add_miniapp_url(self, url: str) -> None: + try: + endpoint = f"https://api.telegram.org/bot{self.token}/setChatMenuButton" + payload = {"menu_button": json.dumps({ + "type": "web_app", + "text": "MiniApp", + "web_app": { "url": url } + })} + httpx.post(endpoint, data=payload) + except httpx.HTTPError as e: + logging.warning(f"Failed to update mini app URL: {e}") + + def create_bot(self) -> None: + """ + Initialize the Telegram bot and set up the conversation handler. + """ + app = Application.builder().token(self.token).build() + + app.add_error_handler(self.__error_handler) + app.add_handler(ConversationHandler( + per_message=False, # capire a cosa serve perchè da un warning quando parte il server + entry_points=[CommandHandler('start', self.__start)], + states={ + CONFIGS: [ + CallbackQueryHandler(self.__model_team, pattern=ConfigsChat.MODEL_TEAM.name), + CallbackQueryHandler(self.__model_output, pattern=ConfigsChat.MODEL_OUTPUT.name), + CallbackQueryHandler(self.__strategy, pattern=ConfigsChat.STRATEGY.name), + CallbackQueryHandler(self.__cancel, pattern='^cancel$'), + MessageHandler(filters.TEXT, self.__start_team) # Any text message + ], + SELECT_CONFIG: [ + CallbackQueryHandler(self.__select_config, pattern=f"^__select_config{QUERY_SEP}.*$"), + ] + }, + fallbacks=[CommandHandler('start', self.__start)], + )) + self.app = app + + def run(self) -> None: + self.app.run_polling() + + ######################################## + # Funzioni di utilità + ######################################## + async def start_message(self, user: User, query: CallbackQuery | Message) -> None: + confs = self.user_requests.setdefault(user, ConfigsRun(self.pipeline.configs)) + + str_model_team = f"{ConfigsChat.MODEL_TEAM.value}: {confs.team_model.label}" + str_model_output = f"{ConfigsChat.MODEL_OUTPUT.value}: {confs.leader_model.label}" + str_strategy = f"{ConfigsChat.STRATEGY.value}: {confs.strategy.label}" + + msg, keyboard = ( + "Please choose an option or write your query", + InlineKeyboardMarkup([ + [InlineKeyboardButton(str_model_team, callback_data=ConfigsChat.MODEL_TEAM.name)], + [InlineKeyboardButton(str_model_output, callback_data=ConfigsChat.MODEL_OUTPUT.name)], + [InlineKeyboardButton(str_strategy, callback_data=ConfigsChat.STRATEGY.name)], + [InlineKeyboardButton("Cancel", callback_data='cancel')] + ]) + ) + + if isinstance(query, CallbackQuery): + await query.edit_message_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2') + else: + await query.reply_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2') + + async def handle_callbackquery(self, update: Update) -> tuple[CallbackQuery, User]: + assert update.callback_query and update.callback_query.from_user, "Update callback_query or user is None" + query = update.callback_query + await query.answer() # Acknowledge the callback query + return query, query.from_user + + async def handle_message(self, update: Update) -> tuple[Message, User]: + assert update.message and update.message.from_user, "Update message or user is None" + return update.message, update.message.from_user + + def callback_data(self, strings: list[str]) -> str: + return QUERY_SEP.join(strings) + + async def __error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None: + try: + logging.error(f"Unhandled exception in Telegram handler: {context.error}") + + # Try to notify the user in chat if possible + if isinstance(update, Update) and update.effective_chat: + chat_id = update.effective_chat.id + msg = "An error occurred while processing your request." + await context.bot.send_message(chat_id=chat_id, text=msg) + + except Exception: + # Ensure we never raise from the error handler itself + logging.exception("Exception in the error handler") + + ######################################### + # Funzioni async per i comandi e messaggi + ######################################### + async def __start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + message, user = await self.handle_message(update) + logging.info(f"@{user.username} started the conversation.") + await self.start_message(user, message) + return CONFIGS + + async def __model_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + return await self._model_select(update, ConfigsChat.MODEL_TEAM) + + async def __model_output(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + return await self._model_select(update, ConfigsChat.MODEL_OUTPUT) + + async def _model_select(self, update: Update, state: ConfigsChat, msg: str | None = None) -> int: + query, _ = await self.handle_callbackquery(update) + + models = [(m.label, self.callback_data([f"__select_config", str(state), m.name])) for m in self.pipeline.configs.models.all_models] + inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in models] + + await query.edit_message_text(msg or state.value, reply_markup=InlineKeyboardMarkup(inline_btns)) + return SELECT_CONFIG + + async def __strategy(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query, _ = await self.handle_callbackquery(update) + + strategies = [(s.label, self.callback_data([f"__select_config", str(ConfigsChat.STRATEGY), s.name])) for s in self.pipeline.configs.strategies] + inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in strategies] + + await query.edit_message_text("Select a strategy", reply_markup=InlineKeyboardMarkup(inline_btns)) + return SELECT_CONFIG + + async def __select_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query, user = await self.handle_callbackquery(update) + logging.debug(f"@{user.username} --> {query.data}") + + req = self.user_requests[user] + _, state, model_name = str(query.data).split(QUERY_SEP) + if state == str(ConfigsChat.MODEL_TEAM): + req.team_model = self.pipeline.configs.get_model_by_name(model_name) + if state == str(ConfigsChat.MODEL_OUTPUT): + req.leader_model = self.pipeline.configs.get_model_by_name(model_name) + if state == str(ConfigsChat.STRATEGY): + req.strategy = self.pipeline.configs.get_strategy_by_name(model_name) + + await self.start_message(user, query) + return CONFIGS + + async def __start_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + message, user = await self.handle_message(update) + + confs = self.user_requests[user] + confs.user_query = message.text or "" + + logging.info(f"@{user.username} started the team with [{confs.team_model.label}, {confs.leader_model.label}, {confs.strategy.label}]") + await self.__run_team(update, confs) + + logging.info(f"@{user.username} team finished.") + return ConversationHandler.END + + async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query, user = await self.handle_callbackquery(update) + logging.info(f"@{user.username} canceled the conversation.") + if user in self.user_requests: + del self.user_requests[user] + await query.edit_message_text("Conversation canceled. Use /start to begin again.") + return ConversationHandler.END + + async def __run_team(self, update: Update, confs: ConfigsRun) -> None: + if not update.message: return + + bot = update.get_bot() + msg_id = update.message.message_id - 1 + chat_id = update.message.chat_id + + configs_str = [ + 'Running with configurations: ', + f'Team: {confs.team_model.label}', + f'Output: {confs.leader_model.label}', + f'Strategy: {confs.strategy.label}', + f'Query: "{confs.user_query}"' + ] + full_message = f"""```\n{'\n'.join(configs_str)}\n```\n\n""" + first_message = full_message + "Generating report, please wait" + msg = await bot.edit_message_text(chat_id=chat_id, message_id=msg_id, text=first_message, parse_mode='MarkdownV2') + if isinstance(msg, bool): return + + # Remove user query and bot message + await bot.delete_message(chat_id=chat_id, message_id=update.message.id) + + self.pipeline.leader_model = confs.leader_model + self.pipeline.team_model = confs.team_model + self.pipeline.strategy = confs.strategy + + # TODO migliorare messaggi di attesa + await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) + report_content = self.pipeline.interact(confs.user_query) + await msg.delete() + + # attach report file to the message + pdf = MarkdownPdf(toc_level=2, optimize=True) + pdf.add_section(Section(report_content, toc=False)) + + # TODO vedere se ha senso dare il pdf o solo il messaggio + document = io.BytesIO() + pdf.save_bytes(document) + document.seek(0) + await bot.send_document(chat_id=chat_id, document=document, filename="report.pdf", parse_mode='MarkdownV2', caption=full_message) + diff --git a/uv.lock b/uv.lock index bb0b6b5..000517c 100644 --- a/uv.lock +++ b/uv.lock @@ -816,14 +816,27 @@ wheels = [ [[package]] name = "markdown-it-py" -version = "4.0.0" +version = "3.0.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "mdurl" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" } +sdist = { url = "https://files.pythonhosted.org/packages/38/71/3b932df36c1a044d397a1f92d1cf91ee0a503d91e470cbd670aa66b07ed0/markdown-it-py-3.0.0.tar.gz", hash = "sha256:e3f60a94fa066dc52ec76661e37c851cb232d92f9886b15cb560aaada2df8feb", size = 74596, upload-time = "2023-06-03T06:41:14.443Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" }, + { url = "https://files.pythonhosted.org/packages/42/d7/1ec15b46af6af88f19b8e5ffea08fa375d433c998b8a7639e76935c14f1f/markdown_it_py-3.0.0-py3-none-any.whl", hash = "sha256:355216845c60bd96232cd8d8c40e8f9765cc86f46880e43a8fd22dc1a1a8cab1", size = 87528, upload-time = "2023-06-03T06:41:11.019Z" }, +] + +[[package]] +name = "markdown-pdf" +version = "1.10" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py" }, + { name = "pymupdf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5e/e6/969311a194074afa9672324244adbf64a7e8663f2ba0003395b7140f5c4a/markdown_pdf-1.10.tar.gz", hash = "sha256:bcf23d816baa56aec3a60f940681652c4e46ee048c6335835cddf86d1ff20a8e", size = 17783, upload-time = "2025-09-24T19:01:38.758Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1f/78/c593979cf1525be786d63b285a7a67afae397fc132382158432490ebd1ed/markdown_pdf-1.10-py3-none-any.whl", hash = "sha256:1863e78454e5aa9bcb34c125f385d4ff045c727660c5172877e82e69d06fae6d", size = 17994, upload-time = "2025-09-24T19:01:37.155Z" }, ] [[package]] @@ -1238,6 +1251,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/61/ad/689f02752eeec26aed679477e80e632ef1b682313be70793d798c1d5fc8f/PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb", size = 22997, upload-time = "2024-11-28T03:43:27.893Z" }, ] +[[package]] +name = "pymupdf" +version = "1.26.4" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/90/35/031556dfc0d332d8e9ed9b61ca105138606d3f8971b9eb02e20118629334/pymupdf-1.26.4.tar.gz", hash = "sha256:be13a066d42bfaed343a488168656637c4d9843ddc63b768dc827c9dfc6b9989", size = 83077563, upload-time = "2025-08-25T14:20:29.499Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/27/ae/3be722886cc7be2093585cd94f466db1199133ab005645a7a567b249560f/pymupdf-1.26.4-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:cb95562a0a63ce906fd788bdad5239063b63068cf4a991684f43acb09052cb99", size = 23061974, upload-time = "2025-08-25T14:16:58.811Z" }, + { url = "https://files.pythonhosted.org/packages/fc/b0/9a451d837e1fe18ecdbfbc34a6499f153c8a008763229cc634725383a93f/pymupdf-1.26.4-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:67e9e6b45832c33726651c2a031e9a20108fd9e759140b9e843f934de813a7ff", size = 22410112, upload-time = "2025-08-25T14:17:24.511Z" }, + { url = "https://files.pythonhosted.org/packages/d8/13/0916e8e02cb5453161fb9d9167c747d0a20d58633e30728645374153f815/pymupdf-1.26.4-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:2604f687dd02b6a1b98c81bd8becfc0024899a2d2085adfe3f9e91607721fd22", size = 23454948, upload-time = "2025-08-25T21:20:07.71Z" }, + { url = "https://files.pythonhosted.org/packages/4e/c6/d3cfafc75d383603884edeabe4821a549345df954a88d79e6764e2c87601/pymupdf-1.26.4-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:973a6dda61ebd34040e4df3753bf004b669017663fbbfdaa294d44eceba98de0", size = 24060686, upload-time = "2025-08-25T14:17:56.536Z" }, + { url = "https://files.pythonhosted.org/packages/72/08/035e9d22c801e801bba50c6745bc90ba8696a042fe2c68793e28bf0c3b07/pymupdf-1.26.4-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:299a49797df5b558e695647fa791329ba3911cbbb31ed65f24a6266c118ef1a7", size = 24265046, upload-time = "2025-08-25T14:18:21.238Z" }, + { url = "https://files.pythonhosted.org/packages/28/8c/c201e4846ec0fb6ae5d52aa3a5d66f9355f0c69fb94230265714df0de65e/pymupdf-1.26.4-cp39-abi3-win32.whl", hash = "sha256:51b38379aad8c71bd7a8dd24d93fbe7580c2a5d9d7e1f9cd29ebbba315aa1bd1", size = 17127332, upload-time = "2025-08-25T14:18:39.132Z" }, + { url = "https://files.pythonhosted.org/packages/d1/c4/87d27b108c2f6d773aa5183c5ae367b2a99296ea4bc16eb79f453c679e30/pymupdf-1.26.4-cp39-abi3-win_amd64.whl", hash = "sha256:0b6345a93a9afd28de2567e433055e873205c52e6b920b129ca50e836a3aeec6", size = 18743491, upload-time = "2025-08-25T14:19:01.104Z" }, +] + [[package]] name = "pytest" version = "8.4.2" @@ -1301,6 +1329,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/45/58/38b5afbc1a800eeea951b9285d3912613f2603bdf897a4ab0f4bd7f405fc/python_multipart-0.0.20-py3-none-any.whl", hash = "sha256:8a62d3a8335e06589fe01f2a3e178cdcc632f3fbe0d492ad9ee0ec35aab1f104", size = 24546, upload-time = "2024-12-16T19:45:44.423Z" }, ] +[[package]] +name = "python-telegram-bot" +version = "22.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "httpx" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/0b/6b/400f88e5c29a270c1c519a3ca8ad0babc650ec63dbfbd1b73babf625ed54/python_telegram_bot-22.5.tar.gz", hash = "sha256:82d4efd891d04132f308f0369f5b5929e0b96957901f58bcef43911c5f6f92f8", size = 1488269, upload-time = "2025-09-27T13:50:27.879Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bc/c3/340c7520095a8c79455fcf699cbb207225e5b36490d2b9ee557c16a7b21b/python_telegram_bot-22.5-py3-none-any.whl", hash = "sha256:4b7cd365344a7dce54312cc4520d7fa898b44d1a0e5f8c74b5bd9b540d035d16", size = 730976, upload-time = "2025-09-27T13:50:25.93Z" }, +] + [[package]] name = "pytz" version = "2025.2" @@ -1622,11 +1662,13 @@ dependencies = [ { name = "gnews" }, { name = "google-genai" }, { name = "gradio" }, + { name = "markdown-pdf" }, { name = "newsapi-python" }, { name = "ollama" }, { name = "praw" }, { name = "pytest" }, { name = "python-binance" }, + { name = "python-telegram-bot" }, { name = "yfinance" }, ] @@ -1640,11 +1682,13 @@ requires-dist = [ { name = "gnews" }, { name = "google-genai" }, { name = "gradio" }, + { name = "markdown-pdf" }, { name = "newsapi-python" }, { name = "ollama" }, { name = "praw" }, { name = "pytest" }, { name = "python-binance" }, + { name = "python-telegram-bot" }, { name = "yfinance" }, ]