Add Telegram bot support #23
@@ -3,24 +3,32 @@ from enum import Enum
|
||||
from typing import Any
|
||||
from agno.utils.log import log_info # type: ignore
|
||||
from telegram import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update, User
|
||||
from telegram.ext import Application, CommandHandler, ContextTypes, ConversationHandler, ExtBot, JobQueue, CallbackQueryHandler
|
||||
from telegram.ext import Application, CommandHandler, ContextTypes, ConversationHandler, ExtBot, JobQueue, CallbackQueryHandler, MessageHandler, filters
|
||||
from app.models import AppModels
|
||||
from app.predictor import PredictorStyle
|
||||
|
||||
|
||||
# conversation states
|
||||
class ConfigStates(Enum):
|
||||
# Lo stato cambia in base al valore di ritorno delle funzioni async
|
||||
# END state è già definito in telegram.ext.ConversationHandler
|
||||
# Un semplice schema delle interazioni:
|
||||
# /start
|
||||
# ║
|
||||
# V
|
||||
# ╔══ CONFIGS <═════╗
|
||||
# ║ ║ ╚══> SELECT_CONFIG
|
||||
# ║ V
|
||||
# ║ start_team (polling for updates)
|
||||
# ║ ║
|
||||
# ║ V
|
||||
# ╚═══> END
|
||||
CONFIGS, SELECT_CONFIG = range(2)
|
||||
|
||||
class ConfigsChat(Enum):
|
||||
MODEL_TEAM = "Team Model"
|
||||
MODEL_OUTPUT = "Output Model"
|
||||
STRATEGY = "Strategy"
|
||||
|
||||
# conversation stages (checkpoints)
|
||||
class Checkpoints(Enum):
|
||||
CONFIGS = 1
|
||||
TEAM_RUNNING = 2
|
||||
END = 3
|
||||
|
||||
class RunConfigs:
|
||||
class ConfigsRun:
|
||||
model_team: AppModels
|
||||
model_output: AppModels
|
||||
strategy: PredictorStyle
|
||||
@@ -30,10 +38,12 @@ class RunConfigs:
|
||||
self.model_output = AppModels.OLLAMA_QWEN_1B
|
||||
self.strategy = PredictorStyle.CONSERVATIVE
|
||||
|
||||
|
||||
|
||||
class BotFunctions:
|
||||
|
||||
# In theory this is already thread-safe if run with CPython
|
||||
users_req: dict[User, RunConfigs] = {}
|
||||
users_req: dict[User, ConfigsRun] = {}
|
||||
app_models: list[AppModels] = AppModels.availables()
|
||||
strategies: list[PredictorStyle] = list(PredictorStyle)
|
||||
|
||||
@@ -55,17 +65,18 @@ class BotFunctions:
|
||||
app = Application.builder().token(token).build()
|
||||
|
||||
conv_handler = ConversationHandler(
|
||||
per_message=False, # capire a cosa serve perchè da un warning quando parte il server
|
||||
entry_points=[CommandHandler('start', BotFunctions.__start)],
|
||||
states={
|
||||
Checkpoints.CONFIGS: [
|
||||
CallbackQueryHandler(BotFunctions.__model_team, pattern=ConfigStates.MODEL_TEAM.name),
|
||||
CallbackQueryHandler(BotFunctions.__model_output, pattern=ConfigStates.MODEL_OUTPUT.name),
|
||||
CallbackQueryHandler(BotFunctions.__strategy, pattern=ConfigStates.STRATEGY.name),
|
||||
CallbackQueryHandler(BotFunctions.__next, pattern='^__next'),
|
||||
CallbackQueryHandler(BotFunctions.__cancel, pattern='^cancel$')
|
||||
CONFIGS: [
|
||||
CallbackQueryHandler(BotFunctions.__model_team, pattern=ConfigsChat.MODEL_TEAM.name),
|
||||
CallbackQueryHandler(BotFunctions.__model_output, pattern=ConfigsChat.MODEL_OUTPUT.name),
|
||||
CallbackQueryHandler(BotFunctions.__strategy, pattern=ConfigsChat.STRATEGY.name),
|
||||
CallbackQueryHandler(BotFunctions.__cancel, pattern='^cancel$'),
|
||||
MessageHandler(filters.TEXT, BotFunctions.__start_team) # Any text message
|
||||
],
|
||||
Checkpoints.TEAM_RUNNING: [],
|
||||
Checkpoints.END: [
|
||||
SELECT_CONFIG: [
|
||||
CallbackQueryHandler(BotFunctions.__select_config, pattern='^__select_config:.*$'),
|
||||
]
|
||||
},
|
||||
fallbacks=[CommandHandler('start', BotFunctions.__start)],
|
||||
@@ -81,18 +92,18 @@ class BotFunctions:
|
||||
########################################
|
||||
@staticmethod
|
||||
async def start_message(user: User, query: CallbackQuery | Message) -> None:
|
||||
confs = BotFunctions.users_req.setdefault(user, RunConfigs())
|
||||
confs = BotFunctions.users_req.setdefault(user, ConfigsRun())
|
||||
|
||||
str_model_team = f"{ConfigStates.MODEL_TEAM.value}:\t\t {confs.model_team.name}"
|
||||
str_model_output = f"{ConfigStates.MODEL_OUTPUT.value}:\t\t {confs.model_output.name}"
|
||||
str_strategy = f"{ConfigStates.STRATEGY.value}:\t\t {confs.strategy.name}"
|
||||
str_model_team = f"{ConfigsChat.MODEL_TEAM.value}: {confs.model_team.name}"
|
||||
str_model_output = f"{ConfigsChat.MODEL_OUTPUT.value}:\t\t {confs.model_output.name}"
|
||||
str_strategy = f"{ConfigsChat.STRATEGY.value}:\t\t {confs.strategy.name}"
|
||||
|
||||
msg, keyboard = (
|
||||
"Please choose an option or write your query",
|
||||
InlineKeyboardMarkup([
|
||||
[InlineKeyboardButton(str_model_team, callback_data=ConfigStates.MODEL_TEAM.name)],
|
||||
[InlineKeyboardButton(str_model_output, callback_data=ConfigStates.MODEL_OUTPUT.name)],
|
||||
[InlineKeyboardButton(str_strategy, callback_data=ConfigStates.STRATEGY.name)],
|
||||
[InlineKeyboardButton(str_model_team, callback_data=ConfigsChat.MODEL_TEAM.name)],
|
||||
[InlineKeyboardButton(str_model_output, callback_data=ConfigsChat.MODEL_OUTPUT.name)],
|
||||
[InlineKeyboardButton(str_strategy, callback_data=ConfigsChat.STRATEGY.name)],
|
||||
[InlineKeyboardButton("Cancel", callback_data='cancel')]
|
||||
])
|
||||
)
|
||||
@@ -103,14 +114,14 @@ class BotFunctions:
|
||||
await query.reply_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2')
|
||||
|
||||
@staticmethod
|
||||
async def handle_configs(update: Update, state: ConfigStates, msg: str | None = None) -> Checkpoints:
|
||||
async def handle_configs(update: Update, state: ConfigsChat, msg: str | None = None) -> int:
|
||||
query, _ = await BotFunctions.handle_callbackquery(update)
|
||||
|
||||
models = [(m.name, f"__next:{state}:{m.name}") for m in BotFunctions.app_models]
|
||||
models = [(m.name, f"__select_config:{state}:{m.name}") for m in BotFunctions.app_models]
|
||||
inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in models]
|
||||
|
||||
await query.edit_message_text(msg or state.value, reply_markup=InlineKeyboardMarkup(inline_btns))
|
||||
return Checkpoints.CONFIGS
|
||||
return SELECT_CONFIG
|
||||
|
||||
@staticmethod
|
||||
async def handle_callbackquery(update: Update) -> tuple[CallbackQuery, User]:
|
||||
@@ -119,62 +130,92 @@ class BotFunctions:
|
||||
await query.answer() # Acknowledge the callback query
|
||||
return query, query.from_user
|
||||
|
||||
@staticmethod
|
||||
async def handle_message(update: Update) -> tuple[Message, User]:
|
||||
assert update.message and update.message.from_user, "Update message or user is None"
|
||||
return update.message, update.message.from_user
|
||||
|
||||
|
||||
#########################################
|
||||
# Funzioni async per i comandi e messaggi
|
||||
#########################################
|
||||
@staticmethod
|
||||
async def __start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Checkpoints:
|
||||
assert update.message and update.message.from_user, "Update message or user is None"
|
||||
user = update.message.from_user
|
||||
async def __start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
message, user = await BotFunctions.handle_message(update)
|
||||
log_info(f"@{user.username} started the conversation.")
|
||||
await BotFunctions.start_message(user, update.message)
|
||||
return Checkpoints.CONFIGS
|
||||
await BotFunctions.start_message(user, message)
|
||||
return CONFIGS
|
||||
|
||||
@staticmethod
|
||||
async def __cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Checkpoints:
|
||||
query, user = await BotFunctions.handle_callbackquery(update)
|
||||
log_info(f"@{user.username} canceled the conversation.")
|
||||
if user in BotFunctions.users_req:
|
||||
del BotFunctions.users_req[user]
|
||||
await query.edit_message_text("Conversation canceled. Use /start to begin again.")
|
||||
return Checkpoints.END
|
||||
async def __model_team(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
return await BotFunctions.handle_configs(update, ConfigsChat.MODEL_TEAM)
|
||||
|
||||
@staticmethod
|
||||
async def __model_team(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Checkpoints:
|
||||
return await BotFunctions.handle_configs(update, ConfigStates.MODEL_TEAM)
|
||||
async def __model_output(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
return await BotFunctions.handle_configs(update, ConfigsChat.MODEL_OUTPUT)
|
||||
|
||||
@staticmethod
|
||||
async def __model_output(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Checkpoints:
|
||||
return await BotFunctions.handle_configs(update, ConfigStates.MODEL_OUTPUT)
|
||||
|
||||
@staticmethod
|
||||
async def __strategy(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Checkpoints:
|
||||
async def __strategy(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
query, _ = await BotFunctions.handle_callbackquery(update)
|
||||
|
||||
strategies = [(s.name, f"__next:{ConfigStates.STRATEGY}:{s.name}") for s in BotFunctions.strategies]
|
||||
strategies = [(s.name, f"__select_config:{ConfigsChat.STRATEGY}:{s.name}") for s in BotFunctions.strategies]
|
||||
inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in strategies]
|
||||
|
||||
await query.edit_message_text("Select a strategy", reply_markup=InlineKeyboardMarkup(inline_btns))
|
||||
return Checkpoints.CONFIGS
|
||||
return SELECT_CONFIG
|
||||
|
||||
@staticmethod
|
||||
async def __next(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Checkpoints:
|
||||
async def __select_config(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
query, user = await BotFunctions.handle_callbackquery(update)
|
||||
log_info(f"@{user.username} --> {query.data}")
|
||||
|
||||
req = BotFunctions.users_req[user]
|
||||
|
||||
_, state, model_name = str(query.data).split(':')
|
||||
if state == str(ConfigStates.MODEL_TEAM):
|
||||
if state == str(ConfigsChat.MODEL_TEAM):
|
||||
req.model_team = AppModels[model_name]
|
||||
if state == str(ConfigStates.MODEL_OUTPUT):
|
||||
if state == str(ConfigsChat.MODEL_OUTPUT):
|
||||
req.model_output = AppModels[model_name]
|
||||
if state == str(ConfigStates.STRATEGY):
|
||||
if state == str(ConfigsChat.STRATEGY):
|
||||
req.strategy = PredictorStyle[model_name]
|
||||
|
||||
await BotFunctions.start_message(user, query)
|
||||
return Checkpoints.CONFIGS
|
||||
return CONFIGS
|
||||
|
||||
@staticmethod
|
||||
async def __start_team(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
message, user = await BotFunctions.handle_message(update)
|
||||
msg2 = await message.reply_text("Elaborating your request...")
|
||||
|
||||
confs = BotFunctions.users_req[user]
|
||||
log_info(f"@{user.username} started the team with [{confs.model_team}, {confs.model_output}, {confs.strategy}]")
|
||||
|
||||
await BotFunctions.__run_team(confs, msg2)
|
||||
return ConversationHandler.END
|
||||
|
||||
@staticmethod
|
||||
async def __cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
query, user = await BotFunctions.handle_callbackquery(update)
|
||||
log_info(f"@{user.username} canceled the conversation.")
|
||||
if user in BotFunctions.users_req:
|
||||
del BotFunctions.users_req[user]
|
||||
await query.edit_message_text("Conversation canceled. Use /start to begin again.")
|
||||
return ConversationHandler.END
|
||||
|
||||
@staticmethod
|
||||
async def __run_team(confs: ConfigsRun, msg: Message) -> None:
|
||||
# TODO fare il run effettivo del team
|
||||
import asyncio
|
||||
|
||||
# Simulate a long-running task
|
||||
n_simulations = 3
|
||||
for i in range(n_simulations):
|
||||
await msg.edit_text(f"Working... {i+1}/{n_simulations}")
|
||||
await asyncio.sleep(2)
|
||||
await msg.edit_text("Team work completed.")
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from dotenv import load_dotenv
|
||||
|
||||
Reference in New Issue
Block a user