Update telegram interface #44
@@ -41,6 +41,13 @@ class PipelineInputs:
|
|||||||
# ======================
|
# ======================
|
||||||
# Dropdown handlers
|
# Dropdown handlers
|
||||||
# ======================
|
# ======================
|
||||||
|
def choose_query_checker(self, index: int):
|
||||||
|
"""
|
||||||
|
Sceglie il modello LLM da usare per l'analizzatore di query.
|
||||||
|
"""
|
||||||
|
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||||
|
self.query_analyzer_model = self.configs.models.all_models[index]
|
||||||
|
|
||||||
def choose_team_leader(self, index: int):
|
def choose_team_leader(self, index: int):
|
||||||
"""
|
"""
|
||||||
Sceglie il modello LLM da usare per il Team Leader.
|
Sceglie il modello LLM da usare per il Team Leader.
|
||||||
@@ -55,6 +62,13 @@ class PipelineInputs:
|
|||||||
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
|
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||||
self.team_model = self.configs.models.all_models[index]
|
self.team_model = self.configs.models.all_models[index]
|
||||||
|
|
||||||
|
def choose_report_generator(self, index: int):
|
||||||
|
"""
|
||||||
|
Sceglie il modello LLM da usare per il generatore di report.
|
||||||
|
"""
|
||||||
|
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||||
|
self.report_generation_model = self.configs.models.all_models[index]
|
||||||
|
|
||||||
def choose_strategy(self, index: int):
|
def choose_strategy(self, index: int):
|
||||||
"""
|
"""
|
||||||
Sceglie la strategia da usare per il Team.
|
Sceglie la strategia da usare per il Team.
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import io
|
import io
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
|
from typing import Any
|
||||||
import httpx
|
import httpx
|
||||||
import logging
|
import logging
|
||||||
import warnings
|
import warnings
|
||||||
@@ -21,23 +22,46 @@ logging = logging.getLogger("telegram")
|
|||||||
# Un semplice schema delle interazioni:
|
# Un semplice schema delle interazioni:
|
||||||
# /start
|
# /start
|
||||||
|
|
|||||||
# ║
|
# ║
|
||||||
# V
|
# v
|
||||||
# ╔══ CONFIGS <═════╗
|
# ╔══ CONFIGS <═════╗
|
||||||
# ║ ║ ╚══> SELECT_CONFIG
|
# ║ ║ ╚══> SELECT_CONFIG
|
||||||
# ║ V
|
# ║ v ^
|
||||||
# ║ start_team (polling for updates)
|
# ║ MODELS ══════╝
|
||||||
|
# ║
|
||||||
|
# ╠══> start (polling for updates)
|
||||||
# ║ ║
|
# ║ ║
|
||||||
# ║ V
|
# ║ v
|
||||||
# ╚═══> END
|
# ╚═══> END
|
||||||
CONFIGS, SELECT_CONFIG = range(2)
|
CONFIGS, SELECT_MODEL, SELECT_CONFIG = range(3)
|
||||||
|
|
||||||
# Usato per separare la query arrivata da Telegram
|
# Usato per separare la query arrivata da Telegram
|
||||||
QUERY_SEP = "|==|"
|
QUERY_SEP = "|==|"
|
||||||
|
|
||||||
class ConfigsChat(Enum):
|
class ConfigsChat(Enum):
|
||||||
|
MODEL_CHECK = "Check Model"
|
||||||
|
MODEL_TEAM_LEADER = "Team Leader Model"
|
||||||
MODEL_TEAM = "Team Model"
|
MODEL_TEAM = "Team Model"
|
||||||
MODEL_OUTPUT = "Output Model"
|
MODEL_REPORT = "Report Model"
|
||||||
|
CHANGE_MODELS = "Change Models"
|
||||||
STRATEGY = "Strategy"
|
STRATEGY = "Strategy"
|
||||||
|
CANCEL = "Cancel"
|
||||||
|
|
||||||
|
def get_inline_button(self, value_to_display:str="") -> InlineKeyboardButton:
|
||||||
|
display = self.value if not value_to_display else f"{self.value}: {value_to_display}"
|
||||||
|
return InlineKeyboardButton(display, callback_data=self.name)
|
||||||
|
|
||||||
|
def change_value(self, inputs: PipelineInputs, new_value:int) -> None:
|
||||||
|
if self.name == self.MODEL_CHECK.name:
|
||||||
|
inputs.choose_query_checker(new_value)
|
||||||
|
elif self.name == self.MODEL_TEAM_LEADER.name:
|
||||||
|
inputs.choose_team_leader(new_value)
|
||||||
|
elif self.name == self.MODEL_TEAM.name:
|
||||||
|
inputs.choose_team(new_value)
|
||||||
|
elif self.name == self.MODEL_REPORT.name:
|
||||||
|
inputs.choose_report_generator(new_value)
|
||||||
|
elif self.name == self.STRATEGY.name:
|
||||||
|
inputs.choose_strategy(new_value)
|
||||||
|
|
||||||
|
|
||||||
class TelegramApp:
|
class TelegramApp:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@@ -72,14 +96,21 @@ class TelegramApp:
|
|||||||
entry_points=[CommandHandler('start', self.__start)],
|
entry_points=[CommandHandler('start', self.__start)],
|
||||||
states={
|
states={
|
||||||
CONFIGS: [
|
CONFIGS: [
|
||||||
CallbackQueryHandler(self.__model_team, pattern=ConfigsChat.MODEL_TEAM.name),
|
CallbackQueryHandler(self.__models, pattern=ConfigsChat.CHANGE_MODELS.name),
|
||||||
CallbackQueryHandler(self.__model_output, pattern=ConfigsChat.MODEL_OUTPUT.name),
|
|
||||||
CallbackQueryHandler(self.__strategy, pattern=ConfigsChat.STRATEGY.name),
|
CallbackQueryHandler(self.__strategy, pattern=ConfigsChat.STRATEGY.name),
|
||||||
CallbackQueryHandler(self.__cancel, pattern='^cancel$'),
|
CallbackQueryHandler(self.__cancel, pattern='^CANCEL$'),
|
||||||
MessageHandler(filters.TEXT, self.__start_team) # Any text message
|
MessageHandler(filters.TEXT, self.__start_llms) # Any text message
|
||||||
|
],
|
||||||
|
SELECT_MODEL: [
|
||||||
|
CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_CHECK.name),
|
||||||
|
CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_TEAM_LEADER.name),
|
||||||
|
CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_TEAM.name),
|
||||||
|
CallbackQueryHandler(self.__model_select, pattern=ConfigsChat.MODEL_REPORT.name),
|
||||||
|
CallbackQueryHandler(self.__go_to_start, pattern='^CANCEL$'),
|
||||||
],
|
],
|
||||||
SELECT_CONFIG: [
|
SELECT_CONFIG: [
|
||||||
CallbackQueryHandler(self.__select_config, pattern=f"^__select_config{QUERY_SEP}.*$"),
|
CallbackQueryHandler(self.__select_config, pattern=f"^__select_config{QUERY_SEP}.*$"),
|
||||||
|
CallbackQueryHandler(self.__go_to_start, pattern='^CANCEL$'),
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
fallbacks=[CommandHandler('start', self.__start)],
|
fallbacks=[CommandHandler('start', self.__start)],
|
||||||
@@ -87,45 +118,28 @@ class TelegramApp:
|
|||||||
self.app = app
|
self.app = app
|
||||||
|
|
||||||
def run(self) -> None:
|
def run(self) -> None:
|
||||||
|
"""
|
||||||
|
Start the Telegram bot polling. This will keep the bot running and listening for updates.\n
|
||||||
|
This function blocks until the bot is stopped.
|
||||||
|
"""
|
||||||
self.app.run_polling()
|
self.app.run_polling()
|
||||||
|
|
||||||
########################################
|
########################################
|
||||||
# Funzioni di utilità
|
# Funzioni di utilità
|
||||||
########################################
|
########################################
|
||||||
async def start_message(self, user: User, query: CallbackQuery | Message) -> None:
|
|
||||||
confs = self.user_requests.setdefault(user, PipelineInputs())
|
|
||||||
|
|
||||||
str_model_team = f"{ConfigsChat.MODEL_TEAM.value}: {confs.team_model.label}"
|
|
||||||
str_model_output = f"{ConfigsChat.MODEL_OUTPUT.value}: {confs.team_leader_model.label}"
|
|
||||||
str_strategy = f"{ConfigsChat.STRATEGY.value}: {confs.strategy.label}"
|
|
||||||
|
|
||||||
msg, keyboard = (
|
|
||||||
"Please choose an option or write your query",
|
|
||||||
InlineKeyboardMarkup([
|
|
||||||
[InlineKeyboardButton(str_model_team, callback_data=ConfigsChat.MODEL_TEAM.name)],
|
|
||||||
[InlineKeyboardButton(str_model_output, callback_data=ConfigsChat.MODEL_OUTPUT.name)],
|
|
||||||
[InlineKeyboardButton(str_strategy, callback_data=ConfigsChat.STRATEGY.name)],
|
|
||||||
[InlineKeyboardButton("Cancel", callback_data='cancel')]
|
|
||||||
])
|
|
||||||
)
|
|
||||||
|
|
||||||
if isinstance(query, CallbackQuery):
|
|
||||||
await query.edit_message_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2')
|
|
||||||
else:
|
|
||||||
await query.reply_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2')
|
|
||||||
|
|
||||||
async def handle_callbackquery(self, update: Update) -> tuple[CallbackQuery, User]:
|
async def handle_callbackquery(self, update: Update) -> tuple[CallbackQuery, User]:
|
||||||
assert update.callback_query and update.callback_query.from_user, "Update callback_query or user is None"
|
assert update.callback_query, "Update callback_query is None"
|
||||||
|
assert update.effective_user, "Update effective_user is None"
|
||||||
query = update.callback_query
|
query = update.callback_query
|
||||||
await query.answer() # Acknowledge the callback query
|
await query.answer() # Acknowledge the callback query
|
||||||
return query, query.from_user
|
return query, update.effective_user
|
||||||
|
|
||||||
async def handle_message(self, update: Update) -> tuple[Message, User]:
|
def handle_message(self, update: Update) -> tuple[Message, User]:
|
||||||
assert update.message and update.message.from_user, "Update message or user is None"
|
assert update.message and update.effective_user, "Update message or user is None"
|
||||||
return update.message, update.message.from_user
|
return update.message, update.effective_user
|
||||||
|
|
||||||
def build_callback_data(self, callback: str, config: ConfigsChat, labels: list[str]) -> list[tuple[str, str]]:
|
def build_callback_data(self, callback: str, config: ConfigsChat, labels: list[str]) -> list[tuple[str, str]]:
|
||||||
return [(label, QUERY_SEP.join((callback, config.value, str(i)))) for i, label in enumerate(labels)]
|
return [(label, QUERY_SEP.join((callback, config.name, str(i)))) for i, label in enumerate(labels)]
|
||||||
|
|
||||||
async def __error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
|
async def __error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
try:
|
try:
|
||||||
@@ -142,28 +156,69 @@ class TelegramApp:
|
|||||||
logging.exception("Exception in the error handler")
|
logging.exception("Exception in the error handler")
|
||||||
|
|
||||||
#########################################
|
#########################################
|
||||||
# Funzioni async per i comandi e messaggi
|
# Funzioni base di gestione stati
|
||||||
#########################################
|
#########################################
|
||||||
async def __start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
async def __start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||||
message, user = await self.handle_message(update)
|
user = update.effective_user.username if update.effective_user else "Unknown"
|
||||||
logging.info(f"@{user.username} started the conversation.")
|
logging.info(f"@{user} started the conversation.")
|
||||||
await self.start_message(user, message)
|
return await self.__go_to_start(update, context)
|
||||||
|
|
||||||
|
async def __go_to_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||||
|
user = update.effective_user
|
||||||
|
assert user, "Update effective_user is None"
|
||||||
|
msg = update.callback_query if update.callback_query else update.message
|
||||||
|
assert msg, "Update message and callback_query are both None"
|
||||||
|
|
||||||
|
confs = self.user_requests.setdefault(user, PipelineInputs()) # despite the name, it creates a default only if not present
|
||||||
|
args: dict[str, Any] = {
|
||||||
|
"text": "Please choose an option or write your query",
|
||||||
|
"parse_mode": 'MarkdownV2',
|
||||||
|
Pattern mismatch: The pattern is Pattern mismatch: The pattern is `'^CANCEL$'` but the callback data is set to `self.name` which would be 'CANCEL' without regex anchors. However, `ConfigsChat.CANCEL.name` returns 'CANCEL', so this pattern should match correctly. The inconsistency is that line 110 and 114 use the same pattern but previous line 102 used lowercase 'cancel'. This could cause confusion - consider using consistent casing throughout.
|
|||||||
|
"reply_markup": InlineKeyboardMarkup([
|
||||||
|
[ConfigsChat.CHANGE_MODELS.get_inline_button()],
|
||||||
|
[ConfigsChat.STRATEGY.get_inline_button(confs.strategy.label)],
|
||||||
|
[ConfigsChat.CANCEL.get_inline_button()],
|
||||||
|
])
|
||||||
|
}
|
||||||
|
|
||||||
|
await (msg.edit_message_text(**args) if isinstance(msg, CallbackQuery) else msg.reply_text(**args))
|
||||||
return CONFIGS
|
return CONFIGS
|
||||||
|
|
||||||
async def __model_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||||
return await self._model_select(update, ConfigsChat.MODEL_TEAM)
|
query, user = await self.handle_callbackquery(update)
|
||||||
|
logging.info(f"@{user.username} canceled the conversation.")
|
||||||
|
if user in self.user_requests:
|
||||||
|
del self.user_requests[user]
|
||||||
|
await query.edit_message_text("Conversation canceled. Use /start to begin again.")
|
||||||
|
return ConversationHandler.END
|
||||||
|
|
||||||
async def __model_output(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
##########################################
|
||||||
return await self._model_select(update, ConfigsChat.MODEL_OUTPUT)
|
# Configurazioni
|
||||||
|
##########################################
|
||||||
|
async def __models(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||||
|
query, user = await self.handle_callbackquery(update)
|
||||||
|
req = self.user_requests[user]
|
||||||
|
|
||||||
async def _model_select(self, update: Update, state: ConfigsChat, msg: str | None = None) -> int:
|
await query.edit_message_text("Select a model", reply_markup=InlineKeyboardMarkup([
|
||||||
|
[ConfigsChat.MODEL_CHECK.get_inline_button(req.query_analyzer_model.label)],
|
||||||
|
[ConfigsChat.MODEL_TEAM_LEADER.get_inline_button(req.team_leader_model.label)],
|
||||||
|
[ConfigsChat.MODEL_TEAM.get_inline_button(req.team_model.label)],
|
||||||
|
[ConfigsChat.MODEL_REPORT.get_inline_button(req.report_generation_model.label)],
|
||||||
|
[ConfigsChat.CANCEL.get_inline_button()]
|
||||||
|
]))
|
||||||
|
return SELECT_MODEL
|
||||||
|
|
||||||
|
async def __model_select(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||||
query, user = await self.handle_callbackquery(update)
|
query, user = await self.handle_callbackquery(update)
|
||||||
|
|
||||||
|
if not query.data:
|
||||||
|
logging.error("Callback query data is None")
|
||||||
|
return CONFIGS
|
||||||
|
|
||||||
req = self.user_requests[user]
|
req = self.user_requests[user]
|
||||||
models = self.build_callback_data("__select_config", state, req.list_models_names())
|
models = self.build_callback_data("__select_config", ConfigsChat[query.data], req.list_models_names())
|
||||||
inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in models]
|
inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in models]
|
||||||
|
|
||||||
await query.edit_message_text(msg or state.value, reply_markup=InlineKeyboardMarkup(inline_btns))
|
await query.edit_message_text("Select a model", reply_markup=InlineKeyboardMarkup(inline_btns))
|
||||||
return SELECT_CONFIG
|
return SELECT_CONFIG
|
||||||
|
|
||||||
async def __strategy(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
async def __strategy(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||||
@@ -178,41 +233,31 @@ class TelegramApp:
|
|||||||
|
|
||||||
async def __select_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
async def __select_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||||
query, user = await self.handle_callbackquery(update)
|
query, user = await self.handle_callbackquery(update)
|
||||||
logging.debug(f"@{user.username} --> {query.data}")
|
logging.info(f"@{user.username} --> {query.data}")
|
||||||
|
|
||||||
req = self.user_requests[user]
|
req = self.user_requests[user]
|
||||||
|
Creating async tasks with Creating async tasks with `asyncio.create_task` without awaiting or tracking them can lead to unhandled exceptions and race conditions. The task may fail silently if `msg.edit_text` raises an exception. Consider using `await` directly or implementing proper task tracking and error handling.
The f-string uses backticks for markdown code formatting, but wraps the entire expression. If The f-string uses backticks for markdown code formatting, but wraps the entire expression. If `e.agent_name` or `e.tool.tool_name` contain special MarkdownV2 characters, they should be escaped. Consider adding proper markdown escaping for user-facing content.
|
|||||||
_, state, index = str(query.data).split(QUERY_SEP)
|
_, state, index = str(query.data).split(QUERY_SEP)
|
||||||
if state == str(ConfigsChat.MODEL_TEAM):
|
ConfigsChat[state].change_value(req, int(index))
|
||||||
req.choose_team(int(index))
|
|
||||||
if state == str(ConfigsChat.MODEL_OUTPUT):
|
|
||||||
req.choose_team_leader(int(index))
|
|
||||||
if state == str(ConfigsChat.STRATEGY):
|
|
||||||
req.choose_strategy(int(index))
|
|
||||||
|
|
||||||
await self.start_message(user, query)
|
return await self.__go_to_start(update, context)
|
||||||
return CONFIGS
|
|
||||||
|
|
||||||
async def __start_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
async def __start_llms(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||||
|
Changed from Changed from `config.value` to `config.name`, but line 241 expects the string representation to match `ConfigsChat[state]`. This works correctly with `.name`, but the original code at line 240-247 compared `state == str(ConfigsChat.MODEL_TEAM)` which used string comparison with the enum value. The new implementation using `ConfigsChat[state]` is correct, but ensure this change is intentional and all call sites are updated.
|
|||||||
message, user = await self.handle_message(update)
|
message, user = self.handle_message(update)
|
||||||
|
|
||||||
confs = self.user_requests[user]
|
confs = self.user_requests[user]
|
||||||
confs.user_query = message.text or ""
|
confs.user_query = message.text or ""
|
||||||
|
|
||||||
logging.info(f"@{user.username} started the team with [{confs.team_model.label}, {confs.team_leader_model.label}, {confs.strategy.label}]")
|
logging.info(f"@{user.username} started the team with [{confs.query_analyzer_model.label}, {confs.team_model.label}, {confs.team_leader_model.label}, {confs.report_generation_model.label}, {confs.strategy.label}]")
|
||||||
await self.__run_team(update, confs)
|
await self.__run(update, confs)
|
||||||
|
|
||||||
logging.info(f"@{user.username} team finished.")
|
logging.info(f"@{user.username} team finished.")
|
||||||
return ConversationHandler.END
|
return ConversationHandler.END
|
||||||
|
|
||||||
async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
|
||||||
query, user = await self.handle_callbackquery(update)
|
|
||||||
logging.info(f"@{user.username} canceled the conversation.")
|
|
||||||
if user in self.user_requests:
|
|
||||||
del self.user_requests[user]
|
|
||||||
await query.edit_message_text("Conversation canceled. Use /start to begin again.")
|
|
||||||
return ConversationHandler.END
|
|
||||||
|
|
||||||
async def __run_team(self, update: Update, inputs: PipelineInputs) -> None:
|
##########################################
|
||||||
|
# RUN APP
|
||||||
|
##########################################
|
||||||
|
async def __run(self, update: Update, inputs: PipelineInputs) -> None:
|
||||||
if not update.message: return
|
if not update.message: return
|
||||||
|
|
||||||
bot = update.get_bot()
|
bot = update.get_bot()
|
||||||
@@ -221,8 +266,10 @@ class TelegramApp:
|
|||||||
|
|
||||||
configs_str = [
|
configs_str = [
|
||||||
'Running with configurations: ',
|
'Running with configurations: ',
|
||||||
|
f'Check: {inputs.query_analyzer_model.label}',
|
||||||
|
f'Leader: {inputs.team_leader_model.label}',
|
||||||
f'Team: {inputs.team_model.label}',
|
f'Team: {inputs.team_model.label}',
|
||||||
f'Output: {inputs.team_leader_model.label}',
|
f'Report: {inputs.report_generation_model.label}',
|
||||||
f'Strategy: {inputs.strategy.label}',
|
f'Strategy: {inputs.strategy.label}',
|
||||||
f'Query: "{inputs.user_query}"'
|
f'Query: "{inputs.user_query}"'
|
||||||
]
|
]
|
||||||
|
|||||||
The comparison
self.name == self.MODEL_CHECK.namewill always be true whenselfisMODEL_CHECK. Consider using a dictionary mapping instead of if-elif chains for better maintainability. Example:model_map = {self.MODEL_CHECK.name: inputs.choose_query_checker, ...}thenmodel_map[self.name](new_value)