diff --git a/src/app/agents/core.py b/src/app/agents/core.py index 4a685cb..1258f54 100644 --- a/src/app/agents/core.py +++ b/src/app/agents/core.py @@ -41,6 +41,13 @@ class PipelineInputs: # ====================== # Dropdown handlers # ====================== + def choose_query_checker(self, index: int): + """ + Sceglie il modello LLM da usare per l'analizzatore di query. + """ + assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + self.query_analyzer_model = self.configs.models.all_models[index] + def choose_team_leader(self, index: int): """ Sceglie il modello LLM da usare per il Team Leader. @@ -55,6 +62,13 @@ class PipelineInputs: assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." self.team_model = self.configs.models.all_models[index] + def choose_report_generator(self, index: int): + """ + Sceglie il modello LLM da usare per il generatore di report. + """ + assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + self.report_generation_model = self.configs.models.all_models[index] + def choose_strategy(self, index: int): """ Sceglie la strategia da usare per il Team. @@ -119,3 +133,81 @@ class PipelineInputs: social_tool = SocialAPIsTool() social_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds) return market_tool, news_tool, social_tool + + def __str__(self) -> str: + return "\n".join([ + f"Query Check: {self.query_analyzer_model.label}", + f"Team Leader: {self.team_leader_model.label}", + f"Team: {self.team_model.label}", + f"Report: {self.report_generation_model.label}", + f"Strategy: {self.strategy.label}", + f"User Query: \"{self.user_query}\"", + ]) + + +class RunMessage: + """ + Classe per gestire i messaggi di stato durante l'esecuzione della pipeline. + Inizializza il messaggio con gli step e aggiorna lo stato, permettendo di ottenere + il messaggio piΓΉ recente da inviare all'utente. + """ + + def __init__(self, inputs: PipelineInputs, prefix: str = "", suffix: str = ""): + """ + Inizializza il messaggio di esecuzione con gli step iniziali. + Tre stati possibili per ogni step: + - In corso (πŸ”³) + - In esecuzione (➑️) + - Completato (βœ…) + + Lo stato di esecuzione puΓ² essere assegnato solo ad uno step alla volta. + Args: + inputs (PipelineInputs): Input della pipeline per mostrare la configurazione. + prefix (str, optional): Prefisso del messaggio. Defaults to "". + suffix (str, optional): Suffisso del messaggio. Defaults to "". + """ + self.base_message = f"Running configurations: \n{prefix}{inputs}{suffix}\n\n" + self.emojis = ['πŸ”³', '➑️', 'βœ…'] + self.placeholder = '<<<>>>' + self.current = 0 + self.steps_total = [ + (f"{self.placeholder} Query Check", 1), + (f"{self.placeholder} Info Recovery", 0), + (f"{self.placeholder} Report Generation", 0), + ] + + def update(self) -> 'RunMessage': + """ + Sposta lo stato di esecuzione al passo successivo. + Lo step precedente completato viene marcato come completato. + Returns: + RunMessage: L'istanza aggiornata di RunMessage. + """ + text_curr, state_curr = self.steps_total[self.current] + self.steps_total[self.current] = (text_curr, state_curr + 1) + self.current = min(self.current + 1, len(self.steps_total)) + if self.current < len(self.steps_total): + text_curr, state_curr = self.steps_total[self.current] + self.steps_total[self.current] = (text_curr, state_curr + 1) + return self + + def update_step(self, text_extra: str = "") -> 'RunMessage': + """ + Aggiorna il messaggio per lo step corrente. + Args: + text_extra (str, optional): Testo aggiuntivo da includere nello step. Defaults to "". + """ + text_curr, state_curr = self.steps_total[self.current] + if text_extra: + text_curr = f"{text_curr.replace('β•š', 'β• ')}\nβ•šβ• {text_extra}" + self.steps_total[self.current] = (text_curr, state_curr) + return self + + def get_latest(self) -> str: + """ + Restituisce il messaggio di esecuzione piΓΉ recente. + Returns: + str: Messaggio di esecuzione aggiornato. + """ + steps = [msg.replace(self.placeholder, self.emojis[state]) for msg, state in self.steps_total] + return self.base_message + "\n".join(steps) diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index bcec72d..0498f90 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -33,7 +33,7 @@ class PipelineEvent(str, Enum): (PipelineEvent.QUERY_ANALYZER, lambda _: logging.info(f"[{run_id}] Query Analyzer completed.")), (PipelineEvent.INFO_RECOVERY, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")), (PipelineEvent.REPORT_GENERATION, lambda _: logging.info(f"[{run_id}] Report Generation completed.")), - (PipelineEvent.TOOL_USED, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name}] by {e.agent_name}.")), + (PipelineEvent.TOOL_USED, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")), (PipelineEvent.RUN_FINISHED, lambda _: logging.info(f"[{run_id}] Run completed.")), ] diff --git a/src/app/api/wrapper_handler.py b/src/app/api/wrapper_handler.py index 30b3887..9c40567 100644 --- a/src/app/api/wrapper_handler.py +++ b/src/app/api/wrapper_handler.py @@ -87,7 +87,7 @@ class WrapperHandler(Generic[WrapperType]): Exception: If all wrappers fail after retries. """ - logging.info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}") + logging.debug(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}") results: dict[str, OutputType] = {} starting_index = self.index diff --git a/src/app/interface/__init__.py b/src/app/interface/__init__.py index 186558a..1788339 100644 --- a/src/app/interface/__init__.py +++ b/src/app/interface/__init__.py @@ -1,4 +1,4 @@ from app.interface.chat import ChatManager -from app.interface.telegram_app import TelegramApp +from app.interface.telegram import TelegramApp __all__ = ["ChatManager", "TelegramApp"] diff --git a/src/app/interface/telegram_app.py b/src/app/interface/telegram.py similarity index 51% rename from src/app/interface/telegram_app.py rename to src/app/interface/telegram.py index 71ff4c8..b720c43 100644 --- a/src/app/interface/telegram_app.py +++ b/src/app/interface/telegram.py @@ -1,6 +1,8 @@ +import asyncio import io import os import json +from typing import Any import httpx import logging import warnings @@ -9,7 +11,7 @@ from markdown_pdf import MarkdownPdf, Section from telegram import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update, User from telegram.constants import ChatAction from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, ConversationHandler, MessageHandler, filters -from app.agents.pipeline import Pipeline, PipelineInputs +from app.agents.pipeline import Pipeline, PipelineEvent, PipelineInputs, RunMessage # per per_message di ConversationHandler che rompe sempre qualunque input tu metta warnings.filterwarnings("ignore") @@ -21,23 +23,44 @@ logging = logging.getLogger("telegram") # Un semplice schema delle interazioni: # /start # β•‘ -# V +# v # ╔══ CONFIGS <═════╗ # β•‘ β•‘ β•šβ•β•> SELECT_CONFIG -# β•‘ V -# β•‘ start_team (polling for updates) +# β•‘ v ^ +# β•‘ MODELS ══════╝ +# β•‘ +# ╠══> start (polling for updates) # β•‘ β•‘ -# β•‘ V +# β•‘ v # β•šβ•β•β•> END -CONFIGS, SELECT_CONFIG = range(2) +CONFIGS, SELECT_MODEL, SELECT_CONFIG = range(3) # Usato per separare la query arrivata da Telegram QUERY_SEP = "|==|" class ConfigsChat(Enum): + MODEL_CHECK = "Check Model" + MODEL_TEAM_LEADER = "Team Leader Model" MODEL_TEAM = "Team Model" - MODEL_OUTPUT = "Output Model" + MODEL_REPORT = "Report Model" + CHANGE_MODELS = "Change Models" STRATEGY = "Strategy" + CANCEL = "Cancel" + + def get_inline_button(self, value_to_display:str="") -> InlineKeyboardButton: + display = self.value if not value_to_display else f"{self.value}: {value_to_display}" + return InlineKeyboardButton(display, callback_data=self.name) + + def change_value(self, inputs: PipelineInputs, new_value:int) -> None: + functions_map = { + self.MODEL_CHECK.name: inputs.choose_query_checker, + self.MODEL_TEAM_LEADER.name: inputs.choose_team_leader, + self.MODEL_TEAM.name: inputs.choose_team, + self.MODEL_REPORT.name: inputs.choose_report_generator, + self.STRATEGY.name: inputs.choose_strategy, + } + functions_map[self.name](new_value) + class TelegramApp: def __init__(self): @@ -72,14 +95,21 @@ class TelegramApp: entry_points=[CommandHandler('start', self.__start)], states={ CONFIGS: [ - CallbackQueryHandler(self.__model_team, pattern=ConfigsChat.MODEL_TEAM.name), - CallbackQueryHandler(self.__model_output, pattern=ConfigsChat.MODEL_OUTPUT.name), + CallbackQueryHandler(self.__models, pattern=ConfigsChat.CHANGE_MODELS.name), CallbackQueryHandler(self.__strategy, pattern=ConfigsChat.STRATEGY.name), - CallbackQueryHandler(self.__cancel, pattern='^cancel$'), - MessageHandler(filters.TEXT, self.__start_team) # Any text message + CallbackQueryHandler(self.__cancel, pattern='^CANCEL$'), + MessageHandler(filters.TEXT, self.__start_llms) # Any text message + ], + SELECT_MODEL: [ + CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_CHECK.name), + CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_TEAM_LEADER.name), + CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_TEAM.name), + CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_REPORT.name), + CallbackQueryHandler(self.__go_to_start, pattern='^CANCEL$'), ], SELECT_CONFIG: [ CallbackQueryHandler(self.__select_config, pattern=f"^__select_config{QUERY_SEP}.*$"), + CallbackQueryHandler(self.__go_to_start, pattern='^CANCEL$'), ] }, fallbacks=[CommandHandler('start', self.__start)], @@ -87,45 +117,28 @@ class TelegramApp: self.app = app def run(self) -> None: + """ + Start the Telegram bot polling. This will keep the bot running and listening for updates.\n + This function blocks until the bot is stopped. + """ self.app.run_polling() ######################################## # Funzioni di utilitΓ  ######################################## - async def start_message(self, user: User, query: CallbackQuery | Message) -> None: - confs = self.user_requests.setdefault(user, PipelineInputs()) - - str_model_team = f"{ConfigsChat.MODEL_TEAM.value}: {confs.team_model.label}" - str_model_output = f"{ConfigsChat.MODEL_OUTPUT.value}: {confs.team_leader_model.label}" - str_strategy = f"{ConfigsChat.STRATEGY.value}: {confs.strategy.label}" - - msg, keyboard = ( - "Please choose an option or write your query", - InlineKeyboardMarkup([ - [InlineKeyboardButton(str_model_team, callback_data=ConfigsChat.MODEL_TEAM.name)], - [InlineKeyboardButton(str_model_output, callback_data=ConfigsChat.MODEL_OUTPUT.name)], - [InlineKeyboardButton(str_strategy, callback_data=ConfigsChat.STRATEGY.name)], - [InlineKeyboardButton("Cancel", callback_data='cancel')] - ]) - ) - - if isinstance(query, CallbackQuery): - await query.edit_message_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2') - else: - await query.reply_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2') - async def handle_callbackquery(self, update: Update) -> tuple[CallbackQuery, User]: - assert update.callback_query and update.callback_query.from_user, "Update callback_query or user is None" + assert update.callback_query, "Update callback_query is None" + assert update.effective_user, "Update effective_user is None" query = update.callback_query await query.answer() # Acknowledge the callback query - return query, query.from_user + return query, update.effective_user - async def handle_message(self, update: Update) -> tuple[Message, User]: - assert update.message and update.message.from_user, "Update message or user is None" - return update.message, update.message.from_user + def handle_message(self, update: Update) -> tuple[Message, User]: + assert update.message and update.effective_user, "Update message or user is None" + return update.message, update.effective_user def build_callback_data(self, callback: str, config: ConfigsChat, labels: list[str]) -> list[tuple[str, str]]: - return [(label, QUERY_SEP.join((callback, config.value, str(i)))) for i, label in enumerate(labels)] + return [(label, QUERY_SEP.join((callback, config.name, str(i)))) for i, label in enumerate(labels)] async def __error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None: try: @@ -142,28 +155,69 @@ class TelegramApp: logging.exception("Exception in the error handler") ######################################### - # Funzioni async per i comandi e messaggi + # Funzioni base di gestione stati ######################################### async def __start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - message, user = await self.handle_message(update) - logging.info(f"@{user.username} started the conversation.") - await self.start_message(user, message) + user = update.effective_user.username if update.effective_user else "Unknown" + logging.info(f"@{user} started the conversation.") + return await self.__go_to_start(update, context) + + async def __go_to_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + user = update.effective_user + assert user, "Update effective_user is None" + msg = update.callback_query if update.callback_query else update.message + assert msg, "Update message and callback_query are both None" + + confs = self.user_requests.setdefault(user, PipelineInputs()) # despite the name, it creates a default only if not present + args: dict[str, Any] = { + "text": "Please choose an option or write your query", + "parse_mode": 'MarkdownV2', + "reply_markup": InlineKeyboardMarkup([ + [ConfigsChat.CHANGE_MODELS.get_inline_button()], + [ConfigsChat.STRATEGY.get_inline_button(confs.strategy.label)], + [ConfigsChat.CANCEL.get_inline_button()], + ]) + } + + await (msg.edit_message_text(**args) if isinstance(msg, CallbackQuery) else msg.reply_text(**args)) return CONFIGS - async def __model_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - return await self._model_select(update, ConfigsChat.MODEL_TEAM) + async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query, user = await self.handle_callbackquery(update) + logging.info(f"@{user.username} canceled the conversation.") + if user in self.user_requests: + del self.user_requests[user] + await query.edit_message_text("Conversation canceled. Use /start to begin again.") + return ConversationHandler.END - async def __model_output(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - return await self._model_select(update, ConfigsChat.MODEL_OUTPUT) + ########################################## + # Configurazioni + ########################################## + async def __models(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query, user = await self.handle_callbackquery(update) + req = self.user_requests[user] - async def _model_select(self, update: Update, state: ConfigsChat, msg: str | None = None) -> int: + await query.edit_message_text("Select a model", reply_markup=InlineKeyboardMarkup([ + [ConfigsChat.MODEL_CHECK.get_inline_button(req.query_analyzer_model.label)], + [ConfigsChat.MODEL_TEAM_LEADER.get_inline_button(req.team_leader_model.label)], + [ConfigsChat.MODEL_TEAM.get_inline_button(req.team_model.label)], + [ConfigsChat.MODEL_REPORT.get_inline_button(req.report_generation_model.label)], + [ConfigsChat.CANCEL.get_inline_button()] + ])) + return SELECT_MODEL + + async def __model_select(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: query, user = await self.handle_callbackquery(update) + if not query.data: + logging.error("Callback query data is None") + return CONFIGS + req = self.user_requests[user] - models = self.build_callback_data("__select_config", state, req.list_models_names()) + models = self.build_callback_data("__select_config", ConfigsChat[query.data], req.list_models_names()) inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in models] - await query.edit_message_text(msg or state.value, reply_markup=InlineKeyboardMarkup(inline_btns)) + await query.edit_message_text("Select a model", reply_markup=InlineKeyboardMarkup(inline_btns)) return SELECT_CONFIG async def __strategy(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: @@ -182,71 +236,62 @@ class TelegramApp: req = self.user_requests[user] _, state, index = str(query.data).split(QUERY_SEP) - if state == str(ConfigsChat.MODEL_TEAM): - req.choose_team(int(index)) - if state == str(ConfigsChat.MODEL_OUTPUT): - req.choose_team_leader(int(index)) - if state == str(ConfigsChat.STRATEGY): - req.choose_strategy(int(index)) + ConfigsChat[state].change_value(req, int(index)) - await self.start_message(user, query) - return CONFIGS + return await self.__go_to_start(update, context) - async def __start_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - message, user = await self.handle_message(update) + async def __start_llms(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + message, user = self.handle_message(update) confs = self.user_requests[user] confs.user_query = message.text or "" - logging.info(f"@{user.username} started the team with [{confs.team_model.label}, {confs.team_leader_model.label}, {confs.strategy.label}]") - await self.__run_team(update, confs) + logging.info(f"@{user.username} started the team with [{confs.query_analyzer_model.label}, {confs.team_model.label}, {confs.team_leader_model.label}, {confs.report_generation_model.label}, {confs.strategy.label}]") + await self.__run(update, confs) logging.info(f"@{user.username} team finished.") return ConversationHandler.END - async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - query, user = await self.handle_callbackquery(update) - logging.info(f"@{user.username} canceled the conversation.") - if user in self.user_requests: - del self.user_requests[user] - await query.edit_message_text("Conversation canceled. Use /start to begin again.") - return ConversationHandler.END - async def __run_team(self, update: Update, inputs: PipelineInputs) -> None: + ########################################## + # RUN APP + ########################################## + async def __run(self, update: Update, inputs: PipelineInputs) -> None: if not update.message: return bot = update.get_bot() msg_id = update.message.message_id - 1 chat_id = update.message.chat_id - configs_str = [ - 'Running with configurations: ', - f'Team: {inputs.team_model.label}', - f'Output: {inputs.team_leader_model.label}', - f'Strategy: {inputs.strategy.label}', - f'Query: "{inputs.user_query}"' - ] - full_message = f"""```\n{'\n'.join(configs_str)}\n```\n\n""" - first_message = full_message + "Generating report, please wait" - msg = await bot.edit_message_text(chat_id=chat_id, message_id=msg_id, text=first_message, parse_mode='MarkdownV2') + run_message = RunMessage(inputs, prefix="```\n", suffix="\n```") + msg = await bot.edit_message_text(chat_id=chat_id, message_id=msg_id, text=run_message.get_latest(), parse_mode='MarkdownV2') if isinstance(msg, bool): return # Remove user query and bot message await bot.delete_message(chat_id=chat_id, message_id=update.message.id) - # TODO migliorare messaggi di attesa + def update_user(update_step: str = "") -> None: + if update_step: run_message.update_step(update_step) + else: run_message.update() + + message = run_message.get_latest() + if msg.text != message: + asyncio.create_task(msg.edit_text(message, parse_mode='MarkdownV2')) + await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) pipeline = Pipeline(inputs) - report_content = await pipeline.interact_async() - await msg.delete() + report_content = await pipeline.interact_async(listeners=[ + (PipelineEvent.QUERY_CHECK, lambda _: update_user()), + (PipelineEvent.TOOL_USED, lambda e: update_user(e.tool.tool_name.replace('get_', '').replace("_", "\\_"))), + (PipelineEvent.INFO_RECOVERY, lambda _: update_user()), + (PipelineEvent.REPORT_GENERATION, lambda _: update_user()), + ]) # attach report file to the message pdf = MarkdownPdf(toc_level=2, optimize=True) pdf.add_section(Section(report_content, toc=False)) - # TODO vedere se ha senso dare il pdf o solo il messaggio document = io.BytesIO() pdf.save_bytes(document) document.seek(0) - await bot.send_document(chat_id=chat_id, document=document, filename="report.pdf", parse_mode='MarkdownV2', caption=full_message) - + await msg.reply_document(document=document, filename="report.pdf")