Telegram bot support (#23)
* Aggiungi supporto per il bot Telegram: aggiorna .env.example, pyproject.toml e uv.lock * demo per bot Telegram con gestione comandi e inline keyboard * Implementazione del bot Telegram con gestione dei comandi e stati di conversazione iniziali * Aggiorna la gestione delle configurazioni nel bot Telegram: modifica gli stati della conversazione e aggiungi il supporto per la gestione dei messaggi. * fix static models & readme * aggiunto il supporto per la query dell'utente e modificata la visualizzazione dei messaggi di stato. * Aggiunto il supporto per la gestione del bot Telegram e aggiornata la configurazione del pipeline * Aggiornato .gitignore per includere la cartella .gradio e rimosso chroma_db. Aggiunto il supporto per la generazione di report in PDF utilizzando markdown-pdf nel bot Telegram. * Refactor pipeline and chat manager for improved structure and functionality * Better logging * Aggiornato il comportamento del logging per i logger di agno. Aggiunto il supporto per l'opzione check_for_async nella configurazione di RedditWrapper. * Rimosso codice commentato e import non utilizzati nella classe Pipeline per semplificare la struttura * Aggiornata la sezione "Applicazione" nel README & fix main * Telegram instance instead of static * Fix logging to use labels for team model, leader model, and strategy * Rinomina il lock da _lock a __lock per garantire l'incapsulamento nella classe AppConfig * Rinomina i logger per una migliore identificazione e gestisce le eccezioni nel bot di Telegram * Aggiorna i messaggi di errore nel gestore Telegram per una migliore chiarezza e modifica il commento nel file di configurazione per riflettere lo stato del modello. * Aggiungi un messaggio di attesa durante la generazione del report nel bot di Telegram
This commit was merged in pull request #23.
This commit is contained in:
committed by
GitHub
parent
45a17d4570
commit
c96617a039
@@ -1,86 +1,31 @@
|
||||
import asyncio
|
||||
import gradio as gr
|
||||
import logging
|
||||
from dotenv import load_dotenv
|
||||
from agno.utils.log import log_info #type: ignore
|
||||
from app.configs import AppConfig
|
||||
from app.interface import ChatManager
|
||||
from app.interface import *
|
||||
from app.agents import Pipeline
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Inizializzazioni
|
||||
load_dotenv()
|
||||
|
||||
configs = AppConfig.load()
|
||||
pipeline = Pipeline(configs)
|
||||
|
||||
chat = ChatManager()
|
||||
|
||||
########################################
|
||||
# Funzioni Gradio
|
||||
########################################
|
||||
def respond(message: str, history: list[dict[str, str]]) -> tuple[list[dict[str, str]], list[dict[str, str]], str]:
|
||||
chat.send_message(message)
|
||||
response = pipeline.interact(message)
|
||||
chat.receive_message(response)
|
||||
history.append({"role": "user", "content": message})
|
||||
history.append({"role": "assistant", "content": response})
|
||||
return history, history, ""
|
||||
|
||||
def save_current_chat() -> str:
|
||||
chat.save_chat("chat.json")
|
||||
return "💾 Chat salvata in chat.json"
|
||||
|
||||
def load_previous_chat() -> tuple[list[dict[str, str]], list[dict[str, str]]]:
|
||||
chat.load_chat("chat.json")
|
||||
history: list[dict[str, str]] = []
|
||||
for m in chat.get_history():
|
||||
history.append({"role": m["role"], "content": m["content"]})
|
||||
return history, history
|
||||
|
||||
def reset_chat() -> tuple[list[dict[str, str]], list[dict[str, str]]]:
|
||||
chat.reset_chat()
|
||||
return [], []
|
||||
|
||||
########################################
|
||||
# Interfaccia Gradio
|
||||
########################################
|
||||
with gr.Blocks() as demo:
|
||||
gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)")
|
||||
|
||||
# Dropdown provider e stile
|
||||
with gr.Row():
|
||||
provider = gr.Dropdown(
|
||||
choices=pipeline.list_providers(),
|
||||
type="index",
|
||||
label="Modello da usare"
|
||||
)
|
||||
provider.change(fn=pipeline.choose_predictor, inputs=provider, outputs=None)
|
||||
|
||||
style = gr.Dropdown(
|
||||
choices=pipeline.list_styles(),
|
||||
type="index",
|
||||
label="Stile di investimento"
|
||||
)
|
||||
style.change(fn=pipeline.choose_strategy, inputs=style, outputs=None)
|
||||
|
||||
chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages")
|
||||
msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?")
|
||||
|
||||
with gr.Row():
|
||||
clear_btn = gr.Button("🗑️ Reset Chat")
|
||||
save_btn = gr.Button("💾 Salva Chat")
|
||||
load_btn = gr.Button("📂 Carica Chat")
|
||||
|
||||
# Eventi e interazioni
|
||||
msg.submit(respond, inputs=[msg, chatbot], outputs=[chatbot, chatbot, msg])
|
||||
clear_btn.click(reset_chat, inputs=None, outputs=[chatbot, chatbot])
|
||||
save_btn.click(save_current_chat, inputs=None, outputs=None)
|
||||
load_btn.click(load_previous_chat, inputs=None, outputs=[chatbot, chatbot])
|
||||
chat = ChatManager(pipeline)
|
||||
gradio = chat.gradio_build_interface()
|
||||
_app, local_url, share_url = gradio.launch(server_name="0.0.0.0", server_port=configs.port, quiet=True, prevent_thread_lock=True, share=configs.gradio_share)
|
||||
logging.info(f"UPO AppAI Chat is running on {share_url or local_url}")
|
||||
|
||||
try:
|
||||
_app, local, shared = demo.launch(server_name="0.0.0.0", server_port=configs.port, quiet=True, prevent_thread_lock=True, share=configs.gradio_share)
|
||||
log_info(f"Starting UPO AppAI Chat on {shared or local}")
|
||||
asyncio.get_event_loop().run_forever()
|
||||
except KeyboardInterrupt:
|
||||
demo.close()
|
||||
telegram = TelegramApp(pipeline)
|
||||
telegram.add_miniapp_url(share_url)
|
||||
telegram.run()
|
||||
except AssertionError as e:
|
||||
try:
|
||||
logging.warning(f"Telegram bot could not be started: {e}")
|
||||
asyncio.get_event_loop().run_forever()
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Shutting down due to KeyboardInterrupt")
|
||||
finally:
|
||||
gradio.close()
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
from agno.run.agent import RunOutput
|
||||
import logging
|
||||
from app.agents.team import create_team_with
|
||||
from app.agents.predictor import PredictorInput, PredictorOutput
|
||||
from app.agents.prompts import *
|
||||
from app.api.core.markets import ProductInfo
|
||||
from app.configs import AppConfig
|
||||
|
||||
logging = logging.getLogger("pipeline")
|
||||
|
||||
|
||||
class Pipeline:
|
||||
"""
|
||||
@@ -17,27 +17,30 @@ class Pipeline:
|
||||
self.configs = configs
|
||||
|
||||
# Stato iniziale
|
||||
self.choose_strategy(0)
|
||||
self.choose_predictor(0)
|
||||
self.leader_model = self.configs.get_model_by_name(self.configs.agents.team_leader_model)
|
||||
self.team_model = self.configs.get_model_by_name(self.configs.agents.team_model)
|
||||
self.strategy = self.configs.get_strategy_by_name(self.configs.agents.strategy)
|
||||
|
||||
# ======================
|
||||
# Dropdown handlers
|
||||
# ======================
|
||||
def choose_predictor(self, index: int):
|
||||
def choose_leader(self, index: int):
|
||||
"""
|
||||
Sceglie il modello LLM da usare per il Predictor.
|
||||
Sceglie il modello LLM da usare per il Team.
|
||||
"""
|
||||
model = self.configs.models.all_models[index]
|
||||
self.predictor = model.get_agent(
|
||||
PREDICTOR_INSTRUCTIONS,
|
||||
output_schema=PredictorOutput,
|
||||
)
|
||||
self.leader_model = self.configs.models.all_models[index]
|
||||
|
||||
def choose_team(self, index: int):
|
||||
"""
|
||||
Sceglie il modello LLM da usare per il Team.
|
||||
"""
|
||||
self.team_model = self.configs.models.all_models[index]
|
||||
|
||||
def choose_strategy(self, index: int):
|
||||
"""
|
||||
Sceglie la strategia da usare per il Predictor.
|
||||
"""
|
||||
self.strat = self.configs.strategies[index].description
|
||||
self.strategy = self.configs.strategies[index]
|
||||
|
||||
# ======================
|
||||
# Helpers
|
||||
@@ -64,46 +67,18 @@ class Pipeline:
|
||||
3. Invoca Predictor
|
||||
4. Restituisce la strategia finale
|
||||
"""
|
||||
# Step 1: raccolta output dai membri del Team
|
||||
team_model = self.configs.get_model_by_name(self.configs.agents.team_model)
|
||||
leader_model = self.configs.get_model_by_name(self.configs.agents.team_leader_model)
|
||||
# Step 1: Creazione Team
|
||||
team = create_team_with(self.configs, self.team_model, self.leader_model)
|
||||
|
||||
team = create_team_with(self.configs, team_model, leader_model)
|
||||
# Step 2: raccolta output dai membri del Team
|
||||
logging.info(f"Pipeline received query: {query}")
|
||||
# TODO migliorare prompt (?)
|
||||
query = f"The user query is: {query}\n\n They requested a {self.strategy.label} investment strategy."
|
||||
team_outputs = team.run(query) # type: ignore
|
||||
|
||||
# Step 2: aggregazione output strutturati
|
||||
all_products: list[ProductInfo] = []
|
||||
sentiments: list[str] = []
|
||||
|
||||
for agent_output in team_outputs.member_responses:
|
||||
if isinstance(agent_output, RunOutput) and agent_output.metadata is not None:
|
||||
keys = agent_output.metadata.keys()
|
||||
if "products" in keys:
|
||||
all_products.extend(agent_output.metadata["products"])
|
||||
if "sentiment_news" in keys:
|
||||
sentiments.append(agent_output.metadata["sentiment_news"])
|
||||
if "sentiment_social" in keys:
|
||||
sentiments.append(agent_output.metadata["sentiment_social"])
|
||||
|
||||
aggregated_sentiment = "\n".join(sentiments)
|
||||
|
||||
# Step 3: invocazione Predictor
|
||||
predictor_input = PredictorInput(
|
||||
data=all_products,
|
||||
style=self.strat,
|
||||
sentiment=aggregated_sentiment
|
||||
)
|
||||
|
||||
result = self.predictor.run(predictor_input) # type: ignore
|
||||
if not isinstance(result.content, PredictorOutput):
|
||||
return "❌ Errore: il modello non ha restituito un output valido."
|
||||
prediction: PredictorOutput = result.content
|
||||
|
||||
# Step 4: restituzione strategia finale
|
||||
portfolio_lines = "\n".join(
|
||||
[f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio]
|
||||
)
|
||||
return (
|
||||
f"📊 Strategia ({self.strat}): {prediction.strategy}\n\n"
|
||||
f"💼 Portafoglio consigliato:\n{portfolio_lines}"
|
||||
)
|
||||
# Step 3: recupero ouput
|
||||
if not isinstance(team_outputs.content, str):
|
||||
logging.error(f"Team output is not a string: {team_outputs.content}")
|
||||
raise ValueError("Team output is not a string")
|
||||
logging.info(f"Team finished")
|
||||
return team_outputs.content
|
||||
|
||||
@@ -59,6 +59,7 @@ class RedditWrapper(SocialWrapper):
|
||||
client_id=client_id,
|
||||
client_secret=client_secret,
|
||||
user_agent="upo-appAI",
|
||||
check_for_async=False,
|
||||
)
|
||||
self.subreddits = self.tool.subreddit("+".join(SUBREDDITS))
|
||||
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import inspect
|
||||
import logging
|
||||
import time
|
||||
import traceback
|
||||
from typing import Any, Callable, Generic, TypeVar
|
||||
from agno.utils.log import log_info, log_warning #type: ignore
|
||||
|
||||
logging = logging.getLogger("wrapper_handler")
|
||||
WrapperType = TypeVar("WrapperType")
|
||||
WrapperClassType = TypeVar("WrapperClassType")
|
||||
OutputType = TypeVar("OutputType")
|
||||
@@ -86,7 +87,7 @@ class WrapperHandler(Generic[WrapperType]):
|
||||
Exception: If all wrappers fail after retries.
|
||||
"""
|
||||
|
||||
log_info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
|
||||
logging.info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
|
||||
results: dict[str, OutputType] = {}
|
||||
starting_index = self.index
|
||||
|
||||
@@ -96,18 +97,18 @@ class WrapperHandler(Generic[WrapperType]):
|
||||
wrapper_name = wrapper.__class__.__name__
|
||||
|
||||
if not try_all:
|
||||
log_info(f"try_call {wrapper_name}")
|
||||
logging.info(f"try_call {wrapper_name}")
|
||||
|
||||
for try_count in range(1, self.retry_per_wrapper + 1):
|
||||
try:
|
||||
result = func(wrapper)
|
||||
log_info(f"{wrapper_name} succeeded")
|
||||
logging.info(f"{wrapper_name} succeeded")
|
||||
results[wrapper_name] = result
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
error = WrapperHandler.__concise_error(e)
|
||||
log_warning(f"{wrapper_name} failed {try_count}/{self.retry_per_wrapper}: {error}")
|
||||
logging.warning(f"{wrapper_name} failed {try_count}/{self.retry_per_wrapper}: {error}")
|
||||
time.sleep(self.retry_delay)
|
||||
|
||||
if not try_all and results:
|
||||
@@ -153,6 +154,6 @@ class WrapperHandler(Generic[WrapperType]):
|
||||
wrapper = wrapper_class(**(kwargs or {}))
|
||||
result.append(wrapper)
|
||||
except Exception as e:
|
||||
log_warning(f"{wrapper_class} cannot be initialized: {e}")
|
||||
logging.warning(f"'{wrapper_class.__name__}' cannot be initialized: {e}")
|
||||
|
||||
return WrapperHandler(result, try_per_wrapper, retry_delay)
|
||||
@@ -145,6 +145,17 @@ class AppConfig(BaseModel):
|
||||
return strat
|
||||
raise ValueError(f"Strategy with name '{name}' not found.")
|
||||
|
||||
def get_defaults(self) -> tuple[AppModel, AppModel, Strategy]:
|
||||
"""
|
||||
Retrieve the default team model, leader model, and strategy.
|
||||
Returns:
|
||||
A tuple containing the default team model (AppModel), leader model (AppModel), and strategy (Strategy).
|
||||
"""
|
||||
team_model = self.get_model_by_name(self.agents.team_model)
|
||||
leader_model = self.get_model_by_name(self.agents.team_leader_model)
|
||||
strategy = self.get_strategy_by_name(self.agents.strategy)
|
||||
return team_model, leader_model, strategy
|
||||
|
||||
def set_logging_level(self) -> None:
|
||||
"""
|
||||
Set the logging level based on the configuration.
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from app.interface.chat import ChatManager
|
||||
from app.interface.telegram_app import TelegramApp
|
||||
|
||||
__all__ = ["ChatManager"]
|
||||
__all__ = ["ChatManager", "TelegramApp"]
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import json
|
||||
import os
|
||||
import json
|
||||
import gradio as gr
|
||||
from app.agents.pipeline import Pipeline
|
||||
|
||||
|
||||
class ChatManager:
|
||||
"""
|
||||
@@ -9,8 +12,9 @@ class ChatManager:
|
||||
- salva e ricarica le chat
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, pipeline: Pipeline):
|
||||
self.history: list[dict[str, str]] = [] # [{"role": "user"/"assistant", "content": "..."}]
|
||||
self.pipeline = pipeline
|
||||
|
||||
def send_message(self, message: str) -> None:
|
||||
"""
|
||||
@@ -56,3 +60,66 @@ class ChatManager:
|
||||
Restituisce lo storico completo della chat.
|
||||
"""
|
||||
return self.history
|
||||
|
||||
|
||||
########################################
|
||||
# Funzioni Gradio
|
||||
########################################
|
||||
def gradio_respond(self, message: str, history: list[dict[str, str]]) -> tuple[list[dict[str, str]], list[dict[str, str]], str]:
|
||||
self.send_message(message)
|
||||
response = self.pipeline.interact(message)
|
||||
self.receive_message(response)
|
||||
history.append({"role": "user", "content": message})
|
||||
history.append({"role": "assistant", "content": response})
|
||||
return history, history, ""
|
||||
|
||||
def gradio_save(self) -> str:
|
||||
self.save_chat("chat.json")
|
||||
return "💾 Chat salvata in chat.json"
|
||||
|
||||
def gradio_load(self) -> tuple[list[dict[str, str]], list[dict[str, str]]]:
|
||||
self.load_chat("chat.json")
|
||||
history: list[dict[str, str]] = []
|
||||
for m in self.get_history():
|
||||
history.append({"role": m["role"], "content": m["content"]})
|
||||
return history, history
|
||||
|
||||
def gradio_clear(self) -> tuple[list[dict[str, str]], list[dict[str, str]]]:
|
||||
self.reset_chat()
|
||||
return [], []
|
||||
|
||||
def gradio_build_interface(self) -> gr.Blocks:
|
||||
with gr.Blocks() as interface:
|
||||
gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)")
|
||||
|
||||
# Dropdown provider e stile
|
||||
with gr.Row():
|
||||
provider = gr.Dropdown(
|
||||
choices=self.pipeline.list_providers(),
|
||||
type="index",
|
||||
label="Modello da usare"
|
||||
)
|
||||
provider.change(fn=self.pipeline.choose_leader, inputs=provider, outputs=None)
|
||||
|
||||
style = gr.Dropdown(
|
||||
choices=self.pipeline.list_styles(),
|
||||
type="index",
|
||||
label="Stile di investimento"
|
||||
)
|
||||
style.change(fn=self.pipeline.choose_strategy, inputs=style, outputs=None)
|
||||
|
||||
chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages")
|
||||
msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?")
|
||||
|
||||
with gr.Row():
|
||||
clear_btn = gr.Button("🗑️ Reset Chat")
|
||||
save_btn = gr.Button("💾 Salva Chat")
|
||||
load_btn = gr.Button("📂 Carica Chat")
|
||||
|
||||
# Eventi e interazioni
|
||||
msg.submit(self.gradio_respond, inputs=[msg, chatbot], outputs=[chatbot, chatbot, msg])
|
||||
clear_btn.click(self.gradio_clear, inputs=None, outputs=[chatbot, chatbot])
|
||||
save_btn.click(self.gradio_save, inputs=None, outputs=None)
|
||||
load_btn.click(self.gradio_load, inputs=None, outputs=[chatbot, chatbot])
|
||||
|
||||
return interface
|
||||
264
src/app/interface/telegram_app.py
Normal file
264
src/app/interface/telegram_app.py
Normal file
@@ -0,0 +1,264 @@
|
||||
import io
|
||||
import os
|
||||
import json
|
||||
import httpx
|
||||
import logging
|
||||
import warnings
|
||||
from enum import Enum
|
||||
from markdown_pdf import MarkdownPdf, Section
|
||||
from telegram import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update, User
|
||||
from telegram.constants import ChatAction
|
||||
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, ConversationHandler, MessageHandler, filters
|
||||
from app.agents.pipeline import Pipeline
|
||||
from app.configs import AppConfig
|
||||
|
||||
# per per_message di ConversationHandler che rompe sempre qualunque input tu metta
|
||||
warnings.filterwarnings("ignore")
|
||||
logging = logging.getLogger("telegram")
|
||||
|
||||
|
||||
# Lo stato cambia in base al valore di ritorno delle funzioni async
|
||||
# END state è già definito in telegram.ext.ConversationHandler
|
||||
# Un semplice schema delle interazioni:
|
||||
# /start
|
||||
# ║
|
||||
# V
|
||||
# ╔══ CONFIGS <═════╗
|
||||
# ║ ║ ╚══> SELECT_CONFIG
|
||||
# ║ V
|
||||
# ║ start_team (polling for updates)
|
||||
# ║ ║
|
||||
# ║ V
|
||||
# ╚═══> END
|
||||
CONFIGS, SELECT_CONFIG = range(2)
|
||||
|
||||
# Usato per separare la query arrivata da Telegram
|
||||
QUERY_SEP = "|==|"
|
||||
|
||||
class ConfigsChat(Enum):
|
||||
MODEL_TEAM = "Team Model"
|
||||
MODEL_OUTPUT = "Output Model"
|
||||
STRATEGY = "Strategy"
|
||||
|
||||
class ConfigsRun:
|
||||
def __init__(self, configs: AppConfig):
|
||||
team, leader, strategy = configs.get_defaults()
|
||||
self.team_model = team
|
||||
self.leader_model = leader
|
||||
self.strategy = strategy
|
||||
self.user_query = ""
|
||||
|
||||
|
||||
class TelegramApp:
|
||||
def __init__(self, pipeline: Pipeline):
|
||||
token = os.getenv("TELEGRAM_BOT_TOKEN")
|
||||
assert token, "TELEGRAM_BOT_TOKEN environment variable not set"
|
||||
|
||||
self.user_requests: dict[User, ConfigsRun] = {}
|
||||
self.pipeline = pipeline
|
||||
self.token = token
|
||||
self.create_bot()
|
||||
|
||||
def add_miniapp_url(self, url: str) -> None:
|
||||
try:
|
||||
endpoint = f"https://api.telegram.org/bot{self.token}/setChatMenuButton"
|
||||
payload = {"menu_button": json.dumps({
|
||||
"type": "web_app",
|
||||
"text": "MiniApp",
|
||||
"web_app": { "url": url }
|
||||
})}
|
||||
httpx.post(endpoint, data=payload)
|
||||
except httpx.HTTPError as e:
|
||||
logging.warning(f"Failed to update mini app URL: {e}")
|
||||
|
||||
def create_bot(self) -> None:
|
||||
"""
|
||||
Initialize the Telegram bot and set up the conversation handler.
|
||||
"""
|
||||
app = Application.builder().token(self.token).build()
|
||||
|
||||
app.add_error_handler(self.__error_handler)
|
||||
app.add_handler(ConversationHandler(
|
||||
per_message=False, # capire a cosa serve perchè da un warning quando parte il server
|
||||
entry_points=[CommandHandler('start', self.__start)],
|
||||
states={
|
||||
CONFIGS: [
|
||||
CallbackQueryHandler(self.__model_team, pattern=ConfigsChat.MODEL_TEAM.name),
|
||||
CallbackQueryHandler(self.__model_output, pattern=ConfigsChat.MODEL_OUTPUT.name),
|
||||
CallbackQueryHandler(self.__strategy, pattern=ConfigsChat.STRATEGY.name),
|
||||
CallbackQueryHandler(self.__cancel, pattern='^cancel$'),
|
||||
MessageHandler(filters.TEXT, self.__start_team) # Any text message
|
||||
],
|
||||
SELECT_CONFIG: [
|
||||
CallbackQueryHandler(self.__select_config, pattern=f"^__select_config{QUERY_SEP}.*$"),
|
||||
]
|
||||
},
|
||||
fallbacks=[CommandHandler('start', self.__start)],
|
||||
))
|
||||
self.app = app
|
||||
|
||||
def run(self) -> None:
|
||||
self.app.run_polling()
|
||||
|
||||
########################################
|
||||
# Funzioni di utilità
|
||||
########################################
|
||||
async def start_message(self, user: User, query: CallbackQuery | Message) -> None:
|
||||
confs = self.user_requests.setdefault(user, ConfigsRun(self.pipeline.configs))
|
||||
|
||||
str_model_team = f"{ConfigsChat.MODEL_TEAM.value}: {confs.team_model.label}"
|
||||
str_model_output = f"{ConfigsChat.MODEL_OUTPUT.value}: {confs.leader_model.label}"
|
||||
str_strategy = f"{ConfigsChat.STRATEGY.value}: {confs.strategy.label}"
|
||||
|
||||
msg, keyboard = (
|
||||
"Please choose an option or write your query",
|
||||
InlineKeyboardMarkup([
|
||||
[InlineKeyboardButton(str_model_team, callback_data=ConfigsChat.MODEL_TEAM.name)],
|
||||
[InlineKeyboardButton(str_model_output, callback_data=ConfigsChat.MODEL_OUTPUT.name)],
|
||||
[InlineKeyboardButton(str_strategy, callback_data=ConfigsChat.STRATEGY.name)],
|
||||
[InlineKeyboardButton("Cancel", callback_data='cancel')]
|
||||
])
|
||||
)
|
||||
|
||||
if isinstance(query, CallbackQuery):
|
||||
await query.edit_message_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2')
|
||||
else:
|
||||
await query.reply_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2')
|
||||
|
||||
async def handle_callbackquery(self, update: Update) -> tuple[CallbackQuery, User]:
|
||||
assert update.callback_query and update.callback_query.from_user, "Update callback_query or user is None"
|
||||
query = update.callback_query
|
||||
await query.answer() # Acknowledge the callback query
|
||||
return query, query.from_user
|
||||
|
||||
async def handle_message(self, update: Update) -> tuple[Message, User]:
|
||||
assert update.message and update.message.from_user, "Update message or user is None"
|
||||
return update.message, update.message.from_user
|
||||
|
||||
def callback_data(self, strings: list[str]) -> str:
|
||||
return QUERY_SEP.join(strings)
|
||||
|
||||
async def __error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
try:
|
||||
logging.error(f"Unhandled exception in Telegram handler: {context.error}")
|
||||
|
||||
# Try to notify the user in chat if possible
|
||||
if isinstance(update, Update) and update.effective_chat:
|
||||
chat_id = update.effective_chat.id
|
||||
msg = "An error occurred while processing your request."
|
||||
await context.bot.send_message(chat_id=chat_id, text=msg)
|
||||
|
||||
except Exception:
|
||||
# Ensure we never raise from the error handler itself
|
||||
logging.exception("Exception in the error handler")
|
||||
|
||||
#########################################
|
||||
# Funzioni async per i comandi e messaggi
|
||||
#########################################
|
||||
async def __start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
message, user = await self.handle_message(update)
|
||||
logging.info(f"@{user.username} started the conversation.")
|
||||
await self.start_message(user, message)
|
||||
return CONFIGS
|
||||
|
||||
async def __model_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
return await self._model_select(update, ConfigsChat.MODEL_TEAM)
|
||||
|
||||
async def __model_output(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
return await self._model_select(update, ConfigsChat.MODEL_OUTPUT)
|
||||
|
||||
async def _model_select(self, update: Update, state: ConfigsChat, msg: str | None = None) -> int:
|
||||
query, _ = await self.handle_callbackquery(update)
|
||||
|
||||
models = [(m.label, self.callback_data([f"__select_config", str(state), m.name])) for m in self.pipeline.configs.models.all_models]
|
||||
inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in models]
|
||||
|
||||
await query.edit_message_text(msg or state.value, reply_markup=InlineKeyboardMarkup(inline_btns))
|
||||
return SELECT_CONFIG
|
||||
|
||||
async def __strategy(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
query, _ = await self.handle_callbackquery(update)
|
||||
|
||||
strategies = [(s.label, self.callback_data([f"__select_config", str(ConfigsChat.STRATEGY), s.name])) for s in self.pipeline.configs.strategies]
|
||||
inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in strategies]
|
||||
|
||||
await query.edit_message_text("Select a strategy", reply_markup=InlineKeyboardMarkup(inline_btns))
|
||||
return SELECT_CONFIG
|
||||
|
||||
async def __select_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
query, user = await self.handle_callbackquery(update)
|
||||
logging.debug(f"@{user.username} --> {query.data}")
|
||||
|
||||
req = self.user_requests[user]
|
||||
_, state, model_name = str(query.data).split(QUERY_SEP)
|
||||
if state == str(ConfigsChat.MODEL_TEAM):
|
||||
req.team_model = self.pipeline.configs.get_model_by_name(model_name)
|
||||
if state == str(ConfigsChat.MODEL_OUTPUT):
|
||||
req.leader_model = self.pipeline.configs.get_model_by_name(model_name)
|
||||
if state == str(ConfigsChat.STRATEGY):
|
||||
req.strategy = self.pipeline.configs.get_strategy_by_name(model_name)
|
||||
|
||||
await self.start_message(user, query)
|
||||
return CONFIGS
|
||||
|
||||
async def __start_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
message, user = await self.handle_message(update)
|
||||
|
||||
confs = self.user_requests[user]
|
||||
confs.user_query = message.text or ""
|
||||
|
||||
logging.info(f"@{user.username} started the team with [{confs.team_model.label}, {confs.leader_model.label}, {confs.strategy.label}]")
|
||||
await self.__run_team(update, confs)
|
||||
|
||||
logging.info(f"@{user.username} team finished.")
|
||||
return ConversationHandler.END
|
||||
|
||||
async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||
query, user = await self.handle_callbackquery(update)
|
||||
logging.info(f"@{user.username} canceled the conversation.")
|
||||
if user in self.user_requests:
|
||||
del self.user_requests[user]
|
||||
await query.edit_message_text("Conversation canceled. Use /start to begin again.")
|
||||
return ConversationHandler.END
|
||||
|
||||
async def __run_team(self, update: Update, confs: ConfigsRun) -> None:
|
||||
if not update.message: return
|
||||
|
||||
bot = update.get_bot()
|
||||
msg_id = update.message.message_id - 1
|
||||
chat_id = update.message.chat_id
|
||||
|
||||
configs_str = [
|
||||
'Running with configurations: ',
|
||||
f'Team: {confs.team_model.label}',
|
||||
f'Output: {confs.leader_model.label}',
|
||||
f'Strategy: {confs.strategy.label}',
|
||||
f'Query: "{confs.user_query}"'
|
||||
]
|
||||
full_message = f"""```\n{'\n'.join(configs_str)}\n```\n\n"""
|
||||
first_message = full_message + "Generating report, please wait"
|
||||
msg = await bot.edit_message_text(chat_id=chat_id, message_id=msg_id, text=first_message, parse_mode='MarkdownV2')
|
||||
if isinstance(msg, bool): return
|
||||
|
||||
# Remove user query and bot message
|
||||
await bot.delete_message(chat_id=chat_id, message_id=update.message.id)
|
||||
|
||||
self.pipeline.leader_model = confs.leader_model
|
||||
self.pipeline.team_model = confs.team_model
|
||||
self.pipeline.strategy = confs.strategy
|
||||
|
||||
# TODO migliorare messaggi di attesa
|
||||
await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
|
||||
report_content = self.pipeline.interact(confs.user_query)
|
||||
await msg.delete()
|
||||
|
||||
# attach report file to the message
|
||||
pdf = MarkdownPdf(toc_level=2, optimize=True)
|
||||
pdf.add_section(Section(report_content, toc=False))
|
||||
|
||||
# TODO vedere se ha senso dare il pdf o solo il messaggio
|
||||
document = io.BytesIO()
|
||||
pdf.save_bytes(document)
|
||||
document.seek(0)
|
||||
await bot.send_document(chat_id=chat_id, document=document, filename="report.pdf", parse_mode='MarkdownV2', caption=full_message)
|
||||
|
||||
Reference in New Issue
Block a user