Update telegram interface (#44)

* Rename telegram file
* Added LLM providers selection
* Updated callback handlers
* Improved telegram user waiting message
This commit was merged in pull request #44.
This commit is contained in:
Giacomo Bertolazzi
2025-10-27 12:42:13 +01:00
committed by GitHub
parent 93174afc81
commit 551b6a049f
5 changed files with 226 additions and 89 deletions

View File

@@ -41,6 +41,13 @@ class PipelineInputs:
# ======================
# Dropdown handlers
# ======================
def choose_query_checker(self, index: int):
"""
Sceglie il modello LLM da usare per l'analizzatore di query.
"""
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
self.query_analyzer_model = self.configs.models.all_models[index]
def choose_team_leader(self, index: int):
"""
Sceglie il modello LLM da usare per il Team Leader.
@@ -55,6 +62,13 @@ class PipelineInputs:
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
self.team_model = self.configs.models.all_models[index]
def choose_report_generator(self, index: int):
"""
Sceglie il modello LLM da usare per il generatore di report.
"""
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
self.report_generation_model = self.configs.models.all_models[index]
def choose_strategy(self, index: int):
"""
Sceglie la strategia da usare per il Team.
@@ -119,3 +133,81 @@ class PipelineInputs:
social_tool = SocialAPIsTool()
social_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds)
return market_tool, news_tool, social_tool
def __str__(self) -> str:
return "\n".join([
f"Query Check: {self.query_analyzer_model.label}",
f"Team Leader: {self.team_leader_model.label}",
f"Team: {self.team_model.label}",
f"Report: {self.report_generation_model.label}",
f"Strategy: {self.strategy.label}",
f"User Query: \"{self.user_query}\"",
])
class RunMessage:
"""
Classe per gestire i messaggi di stato durante l'esecuzione della pipeline.
Inizializza il messaggio con gli step e aggiorna lo stato, permettendo di ottenere
il messaggio più recente da inviare all'utente.
"""
def __init__(self, inputs: PipelineInputs, prefix: str = "", suffix: str = ""):
"""
Inizializza il messaggio di esecuzione con gli step iniziali.
Tre stati possibili per ogni step:
- In corso (🔳)
- In esecuzione (➡️)
- Completato (✅)
Lo stato di esecuzione può essere assegnato solo ad uno step alla volta.
Args:
inputs (PipelineInputs): Input della pipeline per mostrare la configurazione.
prefix (str, optional): Prefisso del messaggio. Defaults to "".
suffix (str, optional): Suffisso del messaggio. Defaults to "".
"""
self.base_message = f"Running configurations: \n{prefix}{inputs}{suffix}\n\n"
self.emojis = ['🔳', '➡️', '']
self.placeholder = '<<<>>>'
self.current = 0
self.steps_total = [
(f"{self.placeholder} Query Check", 1),
(f"{self.placeholder} Info Recovery", 0),
(f"{self.placeholder} Report Generation", 0),
]
def update(self) -> 'RunMessage':
"""
Sposta lo stato di esecuzione al passo successivo.
Lo step precedente completato viene marcato come completato.
Returns:
RunMessage: L'istanza aggiornata di RunMessage.
"""
text_curr, state_curr = self.steps_total[self.current]
self.steps_total[self.current] = (text_curr, state_curr + 1)
self.current = min(self.current + 1, len(self.steps_total))
if self.current < len(self.steps_total):
text_curr, state_curr = self.steps_total[self.current]
self.steps_total[self.current] = (text_curr, state_curr + 1)
return self
def update_step(self, text_extra: str = "") -> 'RunMessage':
"""
Aggiorna il messaggio per lo step corrente.
Args:
text_extra (str, optional): Testo aggiuntivo da includere nello step. Defaults to "".
"""
text_curr, state_curr = self.steps_total[self.current]
if text_extra:
text_curr = f"{text_curr.replace('', '')}\n╚═ {text_extra}"
self.steps_total[self.current] = (text_curr, state_curr)
return self
def get_latest(self) -> str:
"""
Restituisce il messaggio di esecuzione più recente.
Returns:
str: Messaggio di esecuzione aggiornato.
"""
steps = [msg.replace(self.placeholder, self.emojis[state]) for msg, state in self.steps_total]
return self.base_message + "\n".join(steps)

View File

@@ -33,7 +33,7 @@ class PipelineEvent(str, Enum):
(PipelineEvent.QUERY_ANALYZER, lambda _: logging.info(f"[{run_id}] Query Analyzer completed.")),
(PipelineEvent.INFO_RECOVERY, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")),
(PipelineEvent.REPORT_GENERATION, lambda _: logging.info(f"[{run_id}] Report Generation completed.")),
(PipelineEvent.TOOL_USED, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name}] by {e.agent_name}.")),
(PipelineEvent.TOOL_USED, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")),
(PipelineEvent.RUN_FINISHED, lambda _: logging.info(f"[{run_id}] Run completed.")),
]

View File

@@ -87,7 +87,7 @@ class WrapperHandler(Generic[WrapperType]):
Exception: If all wrappers fail after retries.
"""
logging.info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
logging.debug(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
results: dict[str, OutputType] = {}
starting_index = self.index

View File

@@ -1,4 +1,4 @@
from app.interface.chat import ChatManager
from app.interface.telegram_app import TelegramApp
from app.interface.telegram import TelegramApp
__all__ = ["ChatManager", "TelegramApp"]

View File

@@ -1,6 +1,8 @@
import asyncio
import io
import os
import json
from typing import Any
import httpx
import logging
import warnings
@@ -9,7 +11,7 @@ from markdown_pdf import MarkdownPdf, Section
from telegram import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update, User
from telegram.constants import ChatAction
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, ConversationHandler, MessageHandler, filters
from app.agents.pipeline import Pipeline, PipelineInputs
from app.agents.pipeline import Pipeline, PipelineEvent, PipelineInputs, RunMessage
# per per_message di ConversationHandler che rompe sempre qualunque input tu metta
warnings.filterwarnings("ignore")
@@ -21,23 +23,44 @@ logging = logging.getLogger("telegram")
# Un semplice schema delle interazioni:
# /start
# ║
# V
# v
# ╔══ CONFIGS <═════╗
# ║ ║ ╚══> SELECT_CONFIG
# ║ V
# ║ start_team (polling for updates)
# ║ v ^
# ║ MODELS ══════╝
# ║
# ╠══> start (polling for updates)
# ║ ║
# ║ V
# ║ v
# ╚═══> END
CONFIGS, SELECT_CONFIG = range(2)
CONFIGS, SELECT_MODEL, SELECT_CONFIG = range(3)
# Usato per separare la query arrivata da Telegram
QUERY_SEP = "|==|"
class ConfigsChat(Enum):
MODEL_CHECK = "Check Model"
MODEL_TEAM_LEADER = "Team Leader Model"
MODEL_TEAM = "Team Model"
MODEL_OUTPUT = "Output Model"
MODEL_REPORT = "Report Model"
CHANGE_MODELS = "Change Models"
STRATEGY = "Strategy"
CANCEL = "Cancel"
def get_inline_button(self, value_to_display:str="") -> InlineKeyboardButton:
display = self.value if not value_to_display else f"{self.value}: {value_to_display}"
return InlineKeyboardButton(display, callback_data=self.name)
def change_value(self, inputs: PipelineInputs, new_value:int) -> None:
functions_map = {
self.MODEL_CHECK.name: inputs.choose_query_checker,
self.MODEL_TEAM_LEADER.name: inputs.choose_team_leader,
self.MODEL_TEAM.name: inputs.choose_team,
self.MODEL_REPORT.name: inputs.choose_report_generator,
self.STRATEGY.name: inputs.choose_strategy,
}
functions_map[self.name](new_value)
class TelegramApp:
def __init__(self):
@@ -72,14 +95,21 @@ class TelegramApp:
entry_points=[CommandHandler('start', self.__start)],
states={
CONFIGS: [
CallbackQueryHandler(self.__model_team, pattern=ConfigsChat.MODEL_TEAM.name),
CallbackQueryHandler(self.__model_output, pattern=ConfigsChat.MODEL_OUTPUT.name),
CallbackQueryHandler(self.__models, pattern=ConfigsChat.CHANGE_MODELS.name),
CallbackQueryHandler(self.__strategy, pattern=ConfigsChat.STRATEGY.name),
CallbackQueryHandler(self.__cancel, pattern='^cancel$'),
MessageHandler(filters.TEXT, self.__start_team) # Any text message
CallbackQueryHandler(self.__cancel, pattern='^CANCEL$'),
MessageHandler(filters.TEXT, self.__start_llms) # Any text message
],
SELECT_MODEL: [
CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_CHECK.name),
CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_TEAM_LEADER.name),
CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_TEAM.name),
CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_REPORT.name),
CallbackQueryHandler(self.__go_to_start, pattern='^CANCEL$'),
],
SELECT_CONFIG: [
CallbackQueryHandler(self.__select_config, pattern=f"^__select_config{QUERY_SEP}.*$"),
CallbackQueryHandler(self.__go_to_start, pattern='^CANCEL$'),
]
},
fallbacks=[CommandHandler('start', self.__start)],
@@ -87,45 +117,28 @@ class TelegramApp:
self.app = app
def run(self) -> None:
"""
Start the Telegram bot polling. This will keep the bot running and listening for updates.\n
This function blocks until the bot is stopped.
"""
self.app.run_polling()
########################################
# Funzioni di utilità
########################################
async def start_message(self, user: User, query: CallbackQuery | Message) -> None:
confs = self.user_requests.setdefault(user, PipelineInputs())
str_model_team = f"{ConfigsChat.MODEL_TEAM.value}: {confs.team_model.label}"
str_model_output = f"{ConfigsChat.MODEL_OUTPUT.value}: {confs.team_leader_model.label}"
str_strategy = f"{ConfigsChat.STRATEGY.value}: {confs.strategy.label}"
msg, keyboard = (
"Please choose an option or write your query",
InlineKeyboardMarkup([
[InlineKeyboardButton(str_model_team, callback_data=ConfigsChat.MODEL_TEAM.name)],
[InlineKeyboardButton(str_model_output, callback_data=ConfigsChat.MODEL_OUTPUT.name)],
[InlineKeyboardButton(str_strategy, callback_data=ConfigsChat.STRATEGY.name)],
[InlineKeyboardButton("Cancel", callback_data='cancel')]
])
)
if isinstance(query, CallbackQuery):
await query.edit_message_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2')
else:
await query.reply_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2')
async def handle_callbackquery(self, update: Update) -> tuple[CallbackQuery, User]:
assert update.callback_query and update.callback_query.from_user, "Update callback_query or user is None"
assert update.callback_query, "Update callback_query is None"
assert update.effective_user, "Update effective_user is None"
query = update.callback_query
await query.answer() # Acknowledge the callback query
return query, query.from_user
return query, update.effective_user
async def handle_message(self, update: Update) -> tuple[Message, User]:
assert update.message and update.message.from_user, "Update message or user is None"
return update.message, update.message.from_user
def handle_message(self, update: Update) -> tuple[Message, User]:
assert update.message and update.effective_user, "Update message or user is None"
return update.message, update.effective_user
def build_callback_data(self, callback: str, config: ConfigsChat, labels: list[str]) -> list[tuple[str, str]]:
return [(label, QUERY_SEP.join((callback, config.value, str(i)))) for i, label in enumerate(labels)]
return [(label, QUERY_SEP.join((callback, config.name, str(i)))) for i, label in enumerate(labels)]
async def __error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
try:
@@ -142,28 +155,69 @@ class TelegramApp:
logging.exception("Exception in the error handler")
#########################################
# Funzioni async per i comandi e messaggi
# Funzioni base di gestione stati
#########################################
async def __start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
message, user = await self.handle_message(update)
logging.info(f"@{user.username} started the conversation.")
await self.start_message(user, message)
user = update.effective_user.username if update.effective_user else "Unknown"
logging.info(f"@{user} started the conversation.")
return await self.__go_to_start(update, context)
async def __go_to_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
user = update.effective_user
assert user, "Update effective_user is None"
msg = update.callback_query if update.callback_query else update.message
assert msg, "Update message and callback_query are both None"
confs = self.user_requests.setdefault(user, PipelineInputs()) # despite the name, it creates a default only if not present
args: dict[str, Any] = {
"text": "Please choose an option or write your query",
"parse_mode": 'MarkdownV2',
"reply_markup": InlineKeyboardMarkup([
[ConfigsChat.CHANGE_MODELS.get_inline_button()],
[ConfigsChat.STRATEGY.get_inline_button(confs.strategy.label)],
[ConfigsChat.CANCEL.get_inline_button()],
])
}
await (msg.edit_message_text(**args) if isinstance(msg, CallbackQuery) else msg.reply_text(**args))
return CONFIGS
async def __model_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
return await self._model_select(update, ConfigsChat.MODEL_TEAM)
async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query, user = await self.handle_callbackquery(update)
logging.info(f"@{user.username} canceled the conversation.")
if user in self.user_requests:
del self.user_requests[user]
await query.edit_message_text("Conversation canceled. Use /start to begin again.")
return ConversationHandler.END
async def __model_output(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
return await self._model_select(update, ConfigsChat.MODEL_OUTPUT)
##########################################
# Configurazioni
##########################################
async def __models(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query, user = await self.handle_callbackquery(update)
req = self.user_requests[user]
async def _model_select(self, update: Update, state: ConfigsChat, msg: str | None = None) -> int:
await query.edit_message_text("Select a model", reply_markup=InlineKeyboardMarkup([
[ConfigsChat.MODEL_CHECK.get_inline_button(req.query_analyzer_model.label)],
[ConfigsChat.MODEL_TEAM_LEADER.get_inline_button(req.team_leader_model.label)],
[ConfigsChat.MODEL_TEAM.get_inline_button(req.team_model.label)],
[ConfigsChat.MODEL_REPORT.get_inline_button(req.report_generation_model.label)],
[ConfigsChat.CANCEL.get_inline_button()]
]))
return SELECT_MODEL
async def __model_select(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query, user = await self.handle_callbackquery(update)
if not query.data:
logging.error("Callback query data is None")
return CONFIGS
req = self.user_requests[user]
models = self.build_callback_data("__select_config", state, req.list_models_names())
models = self.build_callback_data("__select_config", ConfigsChat[query.data], req.list_models_names())
inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in models]
await query.edit_message_text(msg or state.value, reply_markup=InlineKeyboardMarkup(inline_btns))
await query.edit_message_text("Select a model", reply_markup=InlineKeyboardMarkup(inline_btns))
return SELECT_CONFIG
async def __strategy(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
@@ -182,71 +236,62 @@ class TelegramApp:
req = self.user_requests[user]
_, state, index = str(query.data).split(QUERY_SEP)
if state == str(ConfigsChat.MODEL_TEAM):
req.choose_team(int(index))
if state == str(ConfigsChat.MODEL_OUTPUT):
req.choose_team_leader(int(index))
if state == str(ConfigsChat.STRATEGY):
req.choose_strategy(int(index))
ConfigsChat[state].change_value(req, int(index))
await self.start_message(user, query)
return CONFIGS
return await self.__go_to_start(update, context)
async def __start_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
message, user = await self.handle_message(update)
async def __start_llms(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
message, user = self.handle_message(update)
confs = self.user_requests[user]
confs.user_query = message.text or ""
logging.info(f"@{user.username} started the team with [{confs.team_model.label}, {confs.team_leader_model.label}, {confs.strategy.label}]")
await self.__run_team(update, confs)
logging.info(f"@{user.username} started the team with [{confs.query_analyzer_model.label}, {confs.team_model.label}, {confs.team_leader_model.label}, {confs.report_generation_model.label}, {confs.strategy.label}]")
await self.__run(update, confs)
logging.info(f"@{user.username} team finished.")
return ConversationHandler.END
async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query, user = await self.handle_callbackquery(update)
logging.info(f"@{user.username} canceled the conversation.")
if user in self.user_requests:
del self.user_requests[user]
await query.edit_message_text("Conversation canceled. Use /start to begin again.")
return ConversationHandler.END
async def __run_team(self, update: Update, inputs: PipelineInputs) -> None:
##########################################
# RUN APP
##########################################
async def __run(self, update: Update, inputs: PipelineInputs) -> None:
if not update.message: return
bot = update.get_bot()
msg_id = update.message.message_id - 1
chat_id = update.message.chat_id
configs_str = [
'Running with configurations: ',
f'Team: {inputs.team_model.label}',
f'Output: {inputs.team_leader_model.label}',
f'Strategy: {inputs.strategy.label}',
f'Query: "{inputs.user_query}"'
]
full_message = f"""```\n{'\n'.join(configs_str)}\n```\n\n"""
first_message = full_message + "Generating report, please wait"
msg = await bot.edit_message_text(chat_id=chat_id, message_id=msg_id, text=first_message, parse_mode='MarkdownV2')
run_message = RunMessage(inputs, prefix="```\n", suffix="\n```")
msg = await bot.edit_message_text(chat_id=chat_id, message_id=msg_id, text=run_message.get_latest(), parse_mode='MarkdownV2')
if isinstance(msg, bool): return
# Remove user query and bot message
await bot.delete_message(chat_id=chat_id, message_id=update.message.id)
# TODO migliorare messaggi di attesa
def update_user(update_step: str = "") -> None:
if update_step: run_message.update_step(update_step)
else: run_message.update()
message = run_message.get_latest()
if msg.text != message:
asyncio.create_task(msg.edit_text(message, parse_mode='MarkdownV2'))
await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
pipeline = Pipeline(inputs)
report_content = await pipeline.interact_async()
await msg.delete()
report_content = await pipeline.interact_async(listeners=[
(PipelineEvent.QUERY_CHECK, lambda _: update_user()),
(PipelineEvent.TOOL_USED, lambda e: update_user(e.tool.tool_name.replace('get_', '').replace("_", "\\_"))),
(PipelineEvent.INFO_RECOVERY, lambda _: update_user()),
(PipelineEvent.REPORT_GENERATION, lambda _: update_user()),
])
# attach report file to the message
pdf = MarkdownPdf(toc_level=2, optimize=True)
pdf.add_section(Section(report_content, toc=False))
# TODO vedere se ha senso dare il pdf o solo il messaggio
document = io.BytesIO()
pdf.save_bytes(document)
document.seek(0)
await bot.send_document(chat_id=chat_id, document=document, filename="report.pdf", parse_mode='MarkdownV2', caption=full_message)
await msg.reply_document(document=document, filename="report.pdf")