diff --git a/src/app/__main__.py b/src/app/__main__.py index a1dd453..7c6557a 100644 --- a/src/app/__main__.py +++ b/src/app/__main__.py @@ -3,7 +3,7 @@ import asyncio import logging from dotenv import load_dotenv from app.configs import AppConfig -from app.interface import ChatManager, BotFunctions +from app.interface import * from app.agents import Pipeline @@ -14,14 +14,15 @@ if __name__ == "__main__": configs = AppConfig.load() pipeline = Pipeline(configs) - chat = ChatManager() + chat = ChatManager(pipeline) gradio = chat.gradio_build_interface() _app, local_url, share_url = gradio.launch(server_name="0.0.0.0", server_port=configs.port, quiet=True, prevent_thread_lock=True, share=configs.gradio_share) logging.info(f"UPO AppAI Chat is running on {share_url or local_url}") try: - telegram = BotFunctions.create_bot(share_url) - telegram.run_polling() + telegram = TelegramApp(pipeline) + telegram.add_miniapp_url(share_url) + telegram.run() except Exception as _: logging.warning("Telegram bot could not be started. Continuing without it.") asyncio.get_event_loop().run_forever() diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index 82a80da..f52cf28 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -18,7 +18,8 @@ class Pipeline: # Stato iniziale self.leader_model = self.configs.get_model_by_name(self.configs.agents.team_leader_model) - self.choose_strategy(0) + self.team_model = self.configs.get_model_by_name(self.configs.agents.team_model) + self.strategy = self.configs.get_strategy_by_name(self.configs.agents.strategy) # ====================== # Dropdown handlers @@ -33,7 +34,7 @@ class Pipeline: """ Sceglie la strategia da usare per il Predictor. """ - self.strat = self.configs.strategies[index].description + self.strategy = self.configs.strategies[index] # ====================== # Helpers @@ -61,14 +62,15 @@ class Pipeline: 4. Restituisce la strategia finale """ # Step 1: Creazione Team - team_model = self.configs.get_model_by_name(self.configs.agents.team_model) - team = create_team_with(self.configs, team_model, self.leader_model) + team = create_team_with(self.configs, self.team_model, self.leader_model) - # Step 1: raccolta output dai membri del Team + # Step 2: raccolta output dai membri del Team logging.info(f"Pipeline received query: {query}") + # TODO migliorare prompt (?) + query = f"The user query is: {query}\n\n They requested a {self.strategy.label} investment strategy." team_outputs = team.run(query) # type: ignore - # Step 2: recupero ouput + # Step 3: recupero ouput if not isinstance(team_outputs.content, str): logging.error(f"Team output is not a string: {team_outputs.content}") raise ValueError("Team output is not a string") diff --git a/src/app/configs.py b/src/app/configs.py index 6da942f..f0cd797 100644 --- a/src/app/configs.py +++ b/src/app/configs.py @@ -4,7 +4,7 @@ import ollama import yaml import logging.config import agno.utils.log # type: ignore -from typing import Any +from typing import Any, ClassVar from pydantic import BaseModel from agno.agent import Agent from agno.tools import Toolkit @@ -88,7 +88,7 @@ class AppConfig(BaseModel): models: ModelsConfig = ModelsConfig() agents: AgentsConfigs = AgentsConfigs() - __lock = threading.Lock() + _lock: ClassVar[threading.Lock] = threading.Lock() @classmethod def load(cls, file_path: str = "configs.yaml") -> 'AppConfig': @@ -110,7 +110,7 @@ class AppConfig(BaseModel): return configs def __new__(cls, *args: Any, **kwargs: Any) -> 'AppConfig': - with cls.__lock: + with cls._lock: if not hasattr(cls, 'instance'): cls.instance = super(AppConfig, cls).__new__(cls) return cls.instance @@ -145,6 +145,17 @@ class AppConfig(BaseModel): return strat raise ValueError(f"Strategy with name '{name}' not found.") + def get_defaults(self) -> tuple[AppModel, AppModel, Strategy]: + """ + Retrieve the default team model, leader model, and strategy. + Returns: + A tuple containing the default team model (AppModel), leader model (AppModel), and strategy (Strategy). + """ + team_model = self.get_model_by_name(self.agents.team_model) + leader_model = self.get_model_by_name(self.agents.team_leader_model) + strategy = self.get_strategy_by_name(self.agents.strategy) + return team_model, leader_model, strategy + def set_logging_level(self) -> None: """ Set the logging level based on the configuration. diff --git a/src/app/interface/__init__.py b/src/app/interface/__init__.py index a58cd50..186558a 100644 --- a/src/app/interface/__init__.py +++ b/src/app/interface/__init__.py @@ -1,4 +1,4 @@ from app.interface.chat import ChatManager -from app.interface.telegram_app import BotFunctions +from app.interface.telegram_app import TelegramApp -__all__ = ["ChatManager", "BotFunctions"] +__all__ = ["ChatManager", "TelegramApp"] diff --git a/src/app/interface/chat.py b/src/app/interface/chat.py index 6ce4807..aaba2af 100644 --- a/src/app/interface/chat.py +++ b/src/app/interface/chat.py @@ -12,9 +12,9 @@ class ChatManager: - salva e ricarica le chat """ - def __init__(self): + def __init__(self, pipeline: Pipeline): self.history: list[dict[str, str]] = [] # [{"role": "user"/"assistant", "content": "..."}] - self.pipeline = Pipeline() + self.pipeline = pipeline def send_message(self, message: str) -> None: """ @@ -106,7 +106,7 @@ class ChatManager: type="index", label="Stile di investimento" ) - style.change(fn=self.pipeline.choose_style, inputs=style, outputs=None) + style.change(fn=self.pipeline.choose_strategy, inputs=style, outputs=None) chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages") msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?") diff --git a/src/app/interface/telegram_app.py b/src/app/interface/telegram_app.py index e9559ee..a091797 100644 --- a/src/app/interface/telegram_app.py +++ b/src/app/interface/telegram_app.py @@ -5,12 +5,12 @@ import httpx import logging import warnings from enum import Enum -from typing import Any from markdown_pdf import MarkdownPdf, Section from telegram import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update, User from telegram.constants import ChatAction -from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, ConversationHandler, ExtBot, JobQueue, MessageHandler, filters +from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, ConversationHandler, MessageHandler, filters from app.agents.pipeline import Pipeline +from app.configs import AppConfig # per per_message di ConversationHandler che rompe sempre qualunque input tu metta warnings.filterwarnings("ignore") @@ -32,73 +32,83 @@ logging = logging.getLogger(__name__) # ╚═══> END CONFIGS, SELECT_CONFIG = range(2) +# Usato per separare la query arrivata da Telegram +QUERY_SEP = "|==|" + class ConfigsChat(Enum): MODEL_TEAM = "Team Model" MODEL_OUTPUT = "Output Model" STRATEGY = "Strategy" class ConfigsRun: - def __init__(self): - self.model_team = Pipeline.available_models[0] - self.model_output = Pipeline.available_models[0] - self.strategy = Pipeline.all_styles[0] + def __init__(self, configs: AppConfig): + team, leader, strategy = configs.get_defaults() + self.team_model = team + self.leader_model = leader + self.strategy = strategy self.user_query = "" - -class BotFunctions: - - # In theory this is already thread-safe if run with CPython - users_req: dict[User, ConfigsRun] - - # che incubo di typing - @staticmethod - def create_bot(miniapp_url: str | None = None) -> Application[ExtBot[None], ContextTypes.DEFAULT_TYPE, dict[str, Any], dict[str, Any], dict[str, Any], JobQueue[ContextTypes.DEFAULT_TYPE]]: - """ - Create a Telegram bot application instance. - Assumes the TELEGRAM_BOT_TOKEN environment variable is set. - Returns: - Application: The Telegram bot application instance. - Raises: - AssertionError: If the TELEGRAM_BOT_TOKEN environment variable is not set. - """ - BotFunctions.users_req = {} - - token = os.getenv("TELEGRAM_BOT_TOKEN", '') +class TelegramApp: + def __init__(self, pipeline: Pipeline): + token = os.getenv("TELEGRAM_BOT_TOKEN") assert token, "TELEGRAM_BOT_TOKEN environment variable not set" - if miniapp_url: BotFunctions.update_miniapp_url(miniapp_url, token) - app = Application.builder().token(token).build() + self.user_requests: dict[User, ConfigsRun] = {} + self.pipeline = pipeline + self.token = token + self.create_bot() + def add_miniapp_url(self, url: str) -> None: + try: + endpoint = f"https://api.telegram.org/bot{self.token}/setChatMenuButton" + payload = {"menu_button": json.dumps({ + "type": "web_app", + "text": "MiniApp", + "web_app": { "url": url } + })} + httpx.post(endpoint, data=payload) + except httpx.HTTPError as e: + logging.info(f"Failed to update mini app URL: {e}") + + def create_bot(self) -> None: + """ + Initialize the Telegram bot and set up the conversation handler. + """ + app = Application.builder().token(self.token).build() + + app.add_error_handler(self.__error_handler) app.add_handler(ConversationHandler( per_message=False, # capire a cosa serve perchè da un warning quando parte il server - entry_points=[CommandHandler('start', BotFunctions.__start)], + entry_points=[CommandHandler('start', self.__start)], states={ CONFIGS: [ - CallbackQueryHandler(BotFunctions.__model_team, pattern=ConfigsChat.MODEL_TEAM.name), - CallbackQueryHandler(BotFunctions.__model_output, pattern=ConfigsChat.MODEL_OUTPUT.name), - CallbackQueryHandler(BotFunctions.__strategy, pattern=ConfigsChat.STRATEGY.name), - CallbackQueryHandler(BotFunctions.__cancel, pattern='^cancel$'), - MessageHandler(filters.TEXT, BotFunctions.__start_team) # Any text message + CallbackQueryHandler(self.__model_team, pattern=ConfigsChat.MODEL_TEAM.name), + CallbackQueryHandler(self.__model_output, pattern=ConfigsChat.MODEL_OUTPUT.name), + CallbackQueryHandler(self.__strategy, pattern=ConfigsChat.STRATEGY.name), + CallbackQueryHandler(self.__cancel, pattern='^cancel$'), + MessageHandler(filters.TEXT, self.__start_team) # Any text message ], SELECT_CONFIG: [ - CallbackQueryHandler(BotFunctions.__select_config, pattern='^__select_config:.*$'), + CallbackQueryHandler(self.__select_config, pattern=f"^__select_config{QUERY_SEP}.*$"), ] }, - fallbacks=[CommandHandler('start', BotFunctions.__start)], + fallbacks=[CommandHandler('start', self.__start)], )) - return app + self.app = app + + def run(self) -> None: + self.app.run_polling() ######################################## # Funzioni di utilità ######################################## - @staticmethod - async def start_message(user: User, query: CallbackQuery | Message) -> None: - confs = BotFunctions.users_req.setdefault(user, ConfigsRun()) + async def start_message(self, user: User, query: CallbackQuery | Message) -> None: + confs = self.user_requests.setdefault(user, ConfigsRun(self.pipeline.configs)) - str_model_team = f"{ConfigsChat.MODEL_TEAM.value}: {confs.model_team.name}" - str_model_output = f"{ConfigsChat.MODEL_OUTPUT.value}: {confs.model_output.name}" - str_strategy = f"{ConfigsChat.STRATEGY.value}: {confs.strategy.name}" + str_model_team = f"{ConfigsChat.MODEL_TEAM.value}: {confs.team_model.label}" + str_model_output = f"{ConfigsChat.MODEL_OUTPUT.value}: {confs.leader_model.label}" + str_strategy = f"{ConfigsChat.STRATEGY.value}: {confs.strategy.label}" msg, keyboard = ( "Please choose an option or write your query", @@ -115,113 +125,103 @@ class BotFunctions: else: await query.reply_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2') - @staticmethod - async def handle_configs(update: Update, state: ConfigsChat, msg: str | None = None) -> int: - query, _ = await BotFunctions.handle_callbackquery(update) - - models = [(m.name, f"__select_config:{state}:{m.name}") for m in Pipeline.available_models] - inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in models] - - await query.edit_message_text(msg or state.value, reply_markup=InlineKeyboardMarkup(inline_btns)) - return SELECT_CONFIG - - @staticmethod - async def handle_callbackquery(update: Update) -> tuple[CallbackQuery, User]: + async def handle_callbackquery(self, update: Update) -> tuple[CallbackQuery, User]: assert update.callback_query and update.callback_query.from_user, "Update callback_query or user is None" query = update.callback_query await query.answer() # Acknowledge the callback query return query, query.from_user - @staticmethod - async def handle_message(update: Update) -> tuple[Message, User]: + async def handle_message(self, update: Update) -> tuple[Message, User]: assert update.message and update.message.from_user, "Update message or user is None" return update.message, update.message.from_user - @staticmethod - def update_miniapp_url(url: str, token: str) -> None: + def callback_data(self, strings: list[str]) -> str: + return QUERY_SEP.join(strings) + + async def __error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None: try: - endpoint = f"https://api.telegram.org/bot{token}/setChatMenuButton" - payload = {"menu_button": json.dumps({ - "type": "web_app", - "text": "MiniApp", - "web_app": { - "url": url - } - })} - httpx.post(endpoint, data=payload) - except httpx.HTTPError as e: - logging.info(f"Failed to update mini app URL: {e}") + logging.exception(f"Unhandled exception in Telegram handler {context.error}") + + # Try to notify the user in chat if possible + if isinstance(update, Update) and update.effective_chat: + chat_id = update.effective_chat.id + msg = "Si è verificato un errore inatteso. Gli sviluppatori sono stati avvisati." + await context.bot.send_message(chat_id=chat_id, text=msg) + + except Exception: + # Ensure we never raise from the error handler itself + logging.exception("Exception in the error handler") ######################################### # Funzioni async per i comandi e messaggi ######################################### - @staticmethod - async def __start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - message, user = await BotFunctions.handle_message(update) + async def __start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + message, user = await self.handle_message(update) logging.info(f"@{user.username} started the conversation.") - await BotFunctions.start_message(user, message) + await self.start_message(user, message) return CONFIGS - @staticmethod - async def __model_team(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - return await BotFunctions.handle_configs(update, ConfigsChat.MODEL_TEAM) + async def __model_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + return await self._model_select(update, ConfigsChat.MODEL_TEAM) - @staticmethod - async def __model_output(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - return await BotFunctions.handle_configs(update, ConfigsChat.MODEL_OUTPUT) + async def __model_output(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + return await self._model_select(update, ConfigsChat.MODEL_OUTPUT) - @staticmethod - async def __strategy(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - query, _ = await BotFunctions.handle_callbackquery(update) + async def _model_select(self, update: Update, state: ConfigsChat, msg: str | None = None) -> int: + query, _ = await self.handle_callbackquery(update) - strategies = [(s.name, f"__select_config:{ConfigsChat.STRATEGY}:{s.name}") for s in Pipeline.all_styles] + models = [(m.label, self.callback_data([f"__select_config", str(state), m.name])) for m in self.pipeline.configs.models.all_models] + inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in models] + + await query.edit_message_text(msg or state.value, reply_markup=InlineKeyboardMarkup(inline_btns)) + return SELECT_CONFIG + + async def __strategy(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query, _ = await self.handle_callbackquery(update) + + strategies = [(s.label, self.callback_data([f"__select_config", str(ConfigsChat.STRATEGY), s.name])) for s in self.pipeline.configs.strategies] inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in strategies] await query.edit_message_text("Select a strategy", reply_markup=InlineKeyboardMarkup(inline_btns)) return SELECT_CONFIG - @staticmethod - async def __select_config(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - query, user = await BotFunctions.handle_callbackquery(update) - logging.info(f"@{user.username} --> {query.data}") + async def __select_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query, user = await self.handle_callbackquery(update) + logging.debug(f"@{user.username} --> {query.data}") - req = BotFunctions.users_req[user] - - _, state, model_name = str(query.data).split(':') + req = self.user_requests[user] + _, state, model_name = str(query.data).split(QUERY_SEP) if state == str(ConfigsChat.MODEL_TEAM): - req.model_team = AppModels[model_name] + req.team_model = self.pipeline.configs.get_model_by_name(model_name) if state == str(ConfigsChat.MODEL_OUTPUT): - req.model_output = AppModels[model_name] + req.leader_model = self.pipeline.configs.get_model_by_name(model_name) if state == str(ConfigsChat.STRATEGY): - req.strategy = PredictorStyle[model_name] + req.strategy = self.pipeline.configs.get_strategy_by_name(model_name) - await BotFunctions.start_message(user, query) + await self.start_message(user, query) return CONFIGS - @staticmethod - async def __start_team(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - message, user = await BotFunctions.handle_message(update) + async def __start_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + message, user = await self.handle_message(update) - confs = BotFunctions.users_req[user] + confs = self.user_requests[user] confs.user_query = message.text or "" - logging.info(f"@{user.username} started the team with [{confs.model_team}, {confs.model_output}, {confs.strategy}]") - await BotFunctions.__run_team(update, confs) + logging.info(f"@{user.username} started the team with [{confs.team_model}, {confs.leader_model}, {confs.strategy}]") + await self.__run_team(update, confs) logging.info(f"@{user.username} team finished.") return ConversationHandler.END - @staticmethod - async def __cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: - query, user = await BotFunctions.handle_callbackquery(update) + async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: + query, user = await self.handle_callbackquery(update) logging.info(f"@{user.username} canceled the conversation.") - if user in BotFunctions.users_req: - del BotFunctions.users_req[user] + if user in self.user_requests: + del self.user_requests[user] await query.edit_message_text("Conversation canceled. Use /start to begin again.") return ConversationHandler.END - @staticmethod - async def __run_team(update: Update, confs: ConfigsRun) -> None: + async def __run_team(self, update: Update, confs: ConfigsRun) -> None: if not update.message: return bot = update.get_bot() @@ -230,9 +230,9 @@ class BotFunctions: configs_str = [ 'Running with configurations: ', - f'Team: {confs.model_team.name}', - f'Output: {confs.model_output.name}', - f'Strategy: {confs.strategy.name}', + f'Team: {confs.team_model.label}', + f'Output: {confs.leader_model.label}', + f'Strategy: {confs.strategy.label}', f'Query: "{confs.user_query}"' ] full_message = f"""```\n{'\n'.join(configs_str)}\n```\n\n""" @@ -242,14 +242,13 @@ class BotFunctions: # Remove user query and bot message await bot.delete_message(chat_id=chat_id, message_id=update.message.id) - # TODO settare correttamente i modelli - pipeline = Pipeline() - #pipeline.choose_predictor(Pipeline.available_models.index(confs.model_team)) - pipeline.choose_style(Pipeline.all_styles.index(confs.strategy)) + self.pipeline.leader_model = confs.leader_model + self.pipeline.team_model = confs.team_model + self.pipeline.strategy = confs.strategy # TODO migliorare messaggi di attesa await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) - report_content = pipeline.interact(confs.user_query) + report_content = self.pipeline.interact(confs.user_query) await msg.delete() # attach report file to the message