diff --git a/src/app/agents/core.py b/src/app/agents/core.py index 4a685cb..dbf2707 100644 --- a/src/app/agents/core.py +++ b/src/app/agents/core.py @@ -41,6 +41,13 @@ class PipelineInputs: # ====================== # Dropdown handlers # ====================== + def choose_query_checker(self, index: int): + """ + Sceglie il modello LLM da usare per l'analizzatore di query. + """ + assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + self.query_analyzer_model = self.configs.models.all_models[index] + def choose_team_leader(self, index: int): """ Sceglie il modello LLM da usare per il Team Leader. @@ -55,6 +62,13 @@ class PipelineInputs: assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." self.team_model = self.configs.models.all_models[index] + def choose_report_generator(self, index: int): + """ + Sceglie il modello LLM da usare per il generatore di report. + """ + assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + self.report_generation_model = self.configs.models.all_models[index] + def choose_strategy(self, index: int): """ Sceglie la strategia da usare per il Team. diff --git a/src/app/interface/telegram.py b/src/app/interface/telegram.py index 71ff4c8..7773e66 100644 --- a/src/app/interface/telegram.py +++ b/src/app/interface/telegram.py @@ -1,6 +1,7 @@ import io import os import json +from typing import Any import httpx import logging import warnings @@ -21,23 +22,46 @@ logging = logging.getLogger("telegram") # Un semplice schema delle interazioni: # /start # ║ -# V +# v # ╔══ CONFIGS <═════╗ # ║ ║ ╚══> SELECT_CONFIG -# ║ V -# ║ start_team (polling for updates) +# ║ v ^ +# ║ MODELS ══════╝ +# ║ +# ╠══> start (polling for updates) # ║ ║ -# ║ V +# ║ v # ╚═══> END -CONFIGS, SELECT_CONFIG = range(2) +CONFIGS, SELECT_MODEL, SELECT_CONFIG = range(3) # Usato per separare la query arrivata da Telegram QUERY_SEP = "|==|" class ConfigsChat(Enum): + MODEL_CHECK = "Check Model" + MODEL_TEAM_LEADER = "Team Leader Model" MODEL_TEAM = "Team Model" - MODEL_OUTPUT = "Output Model" + MODEL_REPORT = "Report Model" + CHANGE_MODELS = "Change Models" STRATEGY = "Strategy" + CANCEL = "Cancel" + + def get_inline_button(self, value_to_display:str="") -> InlineKeyboardButton: + display = self.value if not value_to_display else f"{self.value}: {value_to_display}" + return InlineKeyboardButton(display, callback_data=self.name) + + def change_value(self, inputs: PipelineInputs, new_value:int) -> None: + if self.name == self.MODEL_CHECK.name: + inputs.choose_query_checker(new_value) + elif self.name == self.MODEL_TEAM_LEADER.name: + inputs.choose_team_leader(new_value) + elif self.name == self.MODEL_TEAM.name: + inputs.choose_team(new_value) + elif self.name == self.MODEL_REPORT.name: + inputs.choose_report_generator(new_value) + elif self.name == self.STRATEGY.name: + inputs.choose_strategy(new_value) + class TelegramApp: def __init__(self): @@ -72,14 +96,21 @@ class TelegramApp: entry_points=[CommandHandler('start', self.__start)], states={ CONFIGS: [ - CallbackQueryHandler(self.__model_team, pattern=ConfigsChat.MODEL_TEAM.name), - CallbackQueryHandler(self.__model_output, pattern=ConfigsChat.MODEL_OUTPUT.name), + CallbackQueryHandler(self.__models, pattern=ConfigsChat.CHANGE_MODELS.name), CallbackQueryHandler(self.__strategy, pattern=ConfigsChat.STRATEGY.name), - CallbackQueryHandler(self.__cancel, pattern='^cancel$'), - MessageHandler(filters.TEXT, self.__start_team) # Any text message + CallbackQueryHandler(self.__cancel, pattern='^CANCEL$'), + MessageHandler(filters.TEXT, self.__start_llms) # Any text message + ], + SELECT_MODEL: [ + CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_CHECK.name), + CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_TEAM_LEADER.name), + CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_TEAM.name), + CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_REPORT.name), + CallbackQueryHandler(self.__go_to_start, pattern='^CANCEL$'), ], SELECT_CONFIG: [ CallbackQueryHandler(self.__select_config, pattern=f"^__select_config{QUERY_SEP}.*$"), + CallbackQueryHandler(self.__go_to_start, pattern='^CANCEL$'), ] }, fallbacks=[CommandHandler('start', self.__start)], @@ -87,45 +118,28 @@ class TelegramApp: self.app = app def run(self) -> None: + """ + Start the Telegram bot polling. This will keep the bot running and listening for updates.\n + This function blocks until the bot is stopped. + """ self.app.run_polling() ######################################## # Funzioni di utilità ######################################## - async def start_message(self, user: User, query: CallbackQuery | Message) -> None: - confs = self.user_requests.setdefault(user, PipelineInputs()) - - str_model_team = f"{ConfigsChat.MODEL_TEAM.value}: {confs.team_model.label}" - str_model_output = f"{ConfigsChat.MODEL_OUTPUT.value}: {confs.team_leader_model.label}" - str_strategy = f"{ConfigsChat.STRATEGY.value}: {confs.strategy.label}" - - msg, keyboard = ( - "Please choose an option or write your query", - InlineKeyboardMarkup([ - [InlineKeyboardButton(str_model_team, callback_data=ConfigsChat.MODEL_TEAM.name)], - [InlineKeyboardButton(str_model_output, callback_data=ConfigsChat.MODEL_OUTPUT.name)], - [InlineKeyboardButton(str_strategy, callback_data=ConfigsChat.STRATEGY.name)], - [InlineKeyboardButton("Cancel", callback_data='cancel')] - ]) - ) - - if isinstance(query, CallbackQuery): - await query.edit_message_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2') - else: - await query.reply_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2') - async def handle_callbackquery(self, update: Update) -> tuple[CallbackQuery, User]: - assert update.callback_query and update.callback_query.from_user, "Update callback_query or user is None" + assert update.callback_query, "Update callback_query is None" + assert update.effective_user, "Update effective_user is None" query = update.callback_query await query.answer() # Acknowledge the callback query - return query, query.from_user + return query, update.effective_user - async def handle_message(self, update: Update) -> tuple[Message, User]: - assert update.message and update.message.from_user, "Update message or user is None" - return update.message, update.message.from_user + def handle_message(self, update: Update) -> tuple[Message, User]: + assert update.message and update.effective_user, "Update message or user is None" + return update.message, update.effective_user def build_callback_data(self, callback: str, config: ConfigsChat, labels: list[str]) -> list[tuple[str, str]]: - return [(label, QUERY_SEP.join((callback, config.value, str(i)))) for i, label in enumerate(labels)] + return [(label, QUERY_SEP.join((callback, config.name, str(i)))) for i, label in enumerate(labels)] async def __error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None: try: @@ -142,28 +156,69 @@ class TelegramApp: logging.exception("Exception in the error handler") ######################################### - # Funzioni async per i comandi e messaggi + # Funzioni base di gestione stati ######################################### async def __start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - message, user = await self.handle_message(update) - logging.info(f"@{user.username} started the conversation.") - await self.start_message(user, message) + user = update.effective_user.username if update.effective_user else "Unknown" + logging.info(f"@{user} started the conversation.") + return await self.__go_to_start(update, context) + + async def __go_to_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + user = update.effective_user + assert user, "Update effective_user is None" + msg = update.callback_query if update.callback_query else update.message + assert msg, "Update message and callback_query are both None" + + confs = self.user_requests.setdefault(user, PipelineInputs()) # despite the name, it creates a default only if not present + args: dict[str, Any] = { + "text": "Please choose an option or write your query", + "parse_mode": 'MarkdownV2', + "reply_markup": InlineKeyboardMarkup([ + [ConfigsChat.CHANGE_MODELS.get_inline_button()], + [ConfigsChat.STRATEGY.get_inline_button(confs.strategy.label)], + [ConfigsChat.CANCEL.get_inline_button()], + ]) + } + + await (msg.edit_message_text(**args) if isinstance(msg, CallbackQuery) else msg.reply_text(**args)) return CONFIGS - async def __model_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - return await self._model_select(update, ConfigsChat.MODEL_TEAM) + async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query, user = await self.handle_callbackquery(update) + logging.info(f"@{user.username} canceled the conversation.") + if user in self.user_requests: + del self.user_requests[user] + await query.edit_message_text("Conversation canceled. Use /start to begin again.") + return ConversationHandler.END - async def __model_output(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - return await self._model_select(update, ConfigsChat.MODEL_OUTPUT) + ########################################## + # Configurazioni + ########################################## + async def __models(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query, user = await self.handle_callbackquery(update) + req = self.user_requests[user] - async def _model_select(self, update: Update, state: ConfigsChat, msg: str | None = None) -> int: + await query.edit_message_text("Select a model", reply_markup=InlineKeyboardMarkup([ + [ConfigsChat.MODEL_CHECK.get_inline_button(req.query_analyzer_model.label)], + [ConfigsChat.MODEL_TEAM_LEADER.get_inline_button(req.team_leader_model.label)], + [ConfigsChat.MODEL_TEAM.get_inline_button(req.team_model.label)], + [ConfigsChat.MODEL_REPORT.get_inline_button(req.report_generation_model.label)], + [ConfigsChat.CANCEL.get_inline_button()] + ])) + return SELECT_MODEL + + async def __model_select(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: query, user = await self.handle_callbackquery(update) + if not query.data: + logging.error("Callback query data is None") + return CONFIGS + req = self.user_requests[user] - models = self.build_callback_data("__select_config", state, req.list_models_names()) + models = self.build_callback_data("__select_config", ConfigsChat[query.data], req.list_models_names()) inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in models] - await query.edit_message_text(msg or state.value, reply_markup=InlineKeyboardMarkup(inline_btns)) + await query.edit_message_text("Select a model", reply_markup=InlineKeyboardMarkup(inline_btns)) return SELECT_CONFIG async def __strategy(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: @@ -178,41 +233,31 @@ class TelegramApp: async def __select_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: query, user = await self.handle_callbackquery(update) - logging.debug(f"@{user.username} --> {query.data}") + logging.info(f"@{user.username} --> {query.data}") req = self.user_requests[user] _, state, index = str(query.data).split(QUERY_SEP) - if state == str(ConfigsChat.MODEL_TEAM): - req.choose_team(int(index)) - if state == str(ConfigsChat.MODEL_OUTPUT): - req.choose_team_leader(int(index)) - if state == str(ConfigsChat.STRATEGY): - req.choose_strategy(int(index)) + ConfigsChat[state].change_value(req, int(index)) - await self.start_message(user, query) - return CONFIGS + return await self.__go_to_start(update, context) - async def __start_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - message, user = await self.handle_message(update) + async def __start_llms(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + message, user = self.handle_message(update) confs = self.user_requests[user] confs.user_query = message.text or "" - logging.info(f"@{user.username} started the team with [{confs.team_model.label}, {confs.team_leader_model.label}, {confs.strategy.label}]") - await self.__run_team(update, confs) + logging.info(f"@{user.username} started the team with [{confs.query_analyzer_model.label}, {confs.team_model.label}, {confs.team_leader_model.label}, {confs.report_generation_model.label}, {confs.strategy.label}]") + await self.__run(update, confs) logging.info(f"@{user.username} team finished.") return ConversationHandler.END - async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - query, user = await self.handle_callbackquery(update) - logging.info(f"@{user.username} canceled the conversation.") - if user in self.user_requests: - del self.user_requests[user] - await query.edit_message_text("Conversation canceled. Use /start to begin again.") - return ConversationHandler.END - async def __run_team(self, update: Update, inputs: PipelineInputs) -> None: + ########################################## + # RUN APP + ########################################## + async def __run(self, update: Update, inputs: PipelineInputs) -> None: if not update.message: return bot = update.get_bot() @@ -221,8 +266,10 @@ class TelegramApp: configs_str = [ 'Running with configurations: ', + f'Check: {inputs.query_analyzer_model.label}', + f'Leader: {inputs.team_leader_model.label}', f'Team: {inputs.team_model.label}', - f'Output: {inputs.team_leader_model.label}', + f'Report: {inputs.report_generation_model.label}', f'Strategy: {inputs.strategy.label}', f'Query: "{inputs.user_query}"' ]