Add Telegram bot support #23

Merged
Berack96 merged 23 commits from 6-telegram-interface into main 2025-10-13 10:49:46 +02:00
15 changed files with 541 additions and 149 deletions

View File

@@ -5,6 +5,7 @@
# https://makersuite.google.com/app/apikey
GOOGLE_API_KEY=
###############################################################################
# Configurazioni per gli agenti di mercato
###############################################################################
@@ -21,6 +22,7 @@ CRYPTOCOMPARE_API_KEY=
BINANCE_API_KEY=
BINANCE_API_SECRET=
###############################################################################
# Configurazioni per gli agenti di notizie
###############################################################################
@@ -31,6 +33,7 @@ NEWS_API_KEY=
# https://cryptopanic.com/developers/api/
CRYPTOPANIC_API_KEY=
###############################################################################
# Configurazioni per API di social media
###############################################################################
@@ -38,3 +41,11 @@ CRYPTOPANIC_API_KEY=
# https://www.reddit.com/prefs/apps
REDDIT_API_CLIENT_ID=
REDDIT_API_CLIENT_SECRET=
###############################################################################
# Configurazioni per API di messaggistica
###############################################################################
# https://core.telegram.org/bots/features#creating-a-new-bot
TELEGRAM_BOT_TOKEN=

8
.gitignore vendored
View File

@@ -173,8 +173,8 @@ cython_debug/
# PyPI configuration file
.pypirc
# chroma db
./chroma_db/
# VS Code
.vscode/
.vscode/
# Gradio
.gradio/

View File

@@ -91,13 +91,22 @@ uv run src/app
# **Applicazione**
***L'applicazione è attualmente in fase di sviluppo.***
> [!CAUTION]\
> ***L'applicazione è attualmente in fase di sviluppo.***
Usando la libreria ``gradio`` è stata creata un'interfaccia web semplice per interagire con l'agente principale. Gli agenti secondari si trovano nella cartella `src/app/agents` e sono:
- **Market Agent**: Agente unificato che supporta multiple fonti di dati con auto-retry e gestione degli errori.
- **News Agent**: Recupera le notizie finanziarie più recenti sul mercato delle criptovalute.
- **Social Agent**: Analizza i sentimenti sui social media riguardo alle criptovalute.
- **Predictor Agent**: Utilizza i dati raccolti dagli altri agenti per fare previsioni.
L'applicazione viene fatta partire tramite il file [src/app/\_\_main\_\_.py](src/app/__main__.py) che inizializza l'agente principale e gli agenti secondari.
In esso viene creato il server `gradio` per l'interfaccia web e viene anche inizializzato il bot di Telegram (se è stata inserita la chiave nel file `.env` ottenuta da [BotFather](https://core.telegram.org/bots/features#creating-a-new-bot)).
L'interazione è guidata, sia tramite l'interfaccia web che tramite il bot di Telegram; l'utente può scegliere prima di tutto delle opzioni generali (come il modello e la strategia di investimento), dopodiché può inviare un messaggio di testo libero per chiedere consigli o informazioni specifiche. Per esempio: "Qual è l'andamento attuale di Bitcoin?" o "Consigliami quali sono le migliori criptovalute in cui investire questo mese".
L'applicazione, una volta ricevuta la richiesta, la passa al [Team](src/app/agents/team.py) di agenti che si occupano di raccogliere i dati necessari per rispondere in modo completo e ragionato.
Gli agenti coinvolti nel Team sono:
- **Leader**: Coordina gli altri agenti e fornisce la risposta finale all'utente.
- **Market Agent**: Recupera i dati di mercato attuali delle criptovalute da Binance e Yahoo Finance.
- **News Agent**: Recupera le ultime notizie sul mercato delle criptovalute da NewsAPI e GNews.
- **Social Agent**: Recupera i dati dai social media (Reddit) per analizzare il sentiment del mercato.
## Struttura del codice del Progetto

View File

@@ -17,8 +17,8 @@ models:
gemini:
- name: gemini-2.0-flash
label: Gemini
- name: gemini-2.0-pro
label: Gemini Pro
# - name: gemini-2.0-pro # TODO Non funziona, ha un nome diverso
# label: Gemini Pro
ollama:
copilot-pull-request-reviewer[bot] commented 2025-10-13 10:47:12 +02:00 (Migrated from github.com)
Review

The Italian comment contains a spelling error: 'Non funziona' should be 'Non funziona' (already correct), but 'ha' should be 'ha' (already correct). However, there's inconsistent capitalization - 'Non' should be lowercase 'non' in Italian.

    # - name: gemini-2.0-pro # TODO non funziona, ha un nome diverso
The Italian comment contains a spelling error: 'Non funziona' should be 'Non funziona' (already correct), but 'ha' should be 'ha' (already correct). However, there's inconsistent capitalization - 'Non' should be lowercase 'non' in Italian. ```suggestion # - name: gemini-2.0-pro # TODO non funziona, ha un nome diverso ```
- name: gpt-oss:latest
label: Ollama GPT

View File

@@ -0,0 +1,59 @@
import os
from dotenv import load_dotenv
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import Application, CommandHandler, CallbackQueryHandler, MessageHandler, filters, ContextTypes
# Esempio di funzione per gestire il comando /start
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not update.message: return
copilot-pull-request-reviewer[bot] commented 2025-10-12 20:27:14 +02:00 (Migrated from github.com)
Review

[nitpick] Consider using explicit None return for better code clarity: 'return None' instead of bare 'return'.

    if not update.message: return None
[nitpick] Consider using explicit None return for better code clarity: 'return None' instead of bare 'return'. ```suggestion if not update.message: return None ```
await update.message.reply_text('Ciao! Inviami un messaggio e ti risponderò!')
# Esempio di funzione per fare echo del messaggio ricevuto
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE):
message = update.message
if not message: return
copilot-pull-request-reviewer[bot] commented 2025-10-12 20:27:14 +02:00 (Migrated from github.com)
Review

[nitpick] Consider using explicit None return for better code clarity: 'return None' instead of bare 'return'.

    if not message: return None
[nitpick] Consider using explicit None return for better code clarity: 'return None' instead of bare 'return'. ```suggestion if not message: return None ```
print(f"Ricevuto messaggio: {message.text} da chat id: {message.chat.id}")
await message.reply_text(text=f"Hai detto: {message.text}")
# Esempio di funzione per far partire una inline keyboard (comando /keyboard)
async def inline_keyboard(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not update.message: return
copilot-pull-request-reviewer[bot] commented 2025-10-12 20:27:14 +02:00 (Migrated from github.com)
Review

[nitpick] Consider using explicit None return for better code clarity: 'return None' instead of bare 'return'.

    if not update.message: return None
[nitpick] Consider using explicit None return for better code clarity: 'return None' instead of bare 'return'. ```suggestion if not update.message: return None ```
keyboard = [
[
InlineKeyboardButton("Option 1", callback_data='1'),
InlineKeyboardButton("Option 2", callback_data='2'),
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text('Please choose:', reply_markup=reply_markup)
async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
query = update.callback_query
if not query: return
copilot-pull-request-reviewer[bot] commented 2025-10-12 20:27:14 +02:00 (Migrated from github.com)
Review

[nitpick] Consider using explicit None return for better code clarity: 'return None' instead of bare 'return'.

    if not query: return None
[nitpick] Consider using explicit None return for better code clarity: 'return None' instead of bare 'return'. ```suggestion if not query: return None ```
await query.answer()
await query.edit_message_text(text=f"Selected option: {query.data}")
def main():
print("Bot in ascolto...")
load_dotenv()
token = os.getenv("TELEGRAM_BOT_TOKEN", '')
app = Application.builder().token(token).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("keyboard", inline_keyboard))
app.add_handler(MessageHandler(filters=filters.TEXT, callback=echo))
app.add_handler(CallbackQueryHandler(button_handler))
app.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()

View File

@@ -36,6 +36,10 @@ dependencies = [
# API di social media
"praw", # Reddit
# Per telegram bot
"python-telegram-bot", # Interfaccia Telegram Bot
"markdown-pdf", # Per convertire markdown in pdf
]
[tool.pytest.ini_options]

View File

@@ -1,86 +1,31 @@
import asyncio
import gradio as gr
import logging
from dotenv import load_dotenv
from agno.utils.log import log_info #type: ignore
from app.configs import AppConfig
from app.interface import ChatManager
from app.interface import *
from app.agents import Pipeline
if __name__ == "__main__":
# Inizializzazioni
load_dotenv()
configs = AppConfig.load()
pipeline = Pipeline(configs)
chat = ChatManager()
########################################
# Funzioni Gradio
########################################
def respond(message: str, history: list[dict[str, str]]) -> tuple[list[dict[str, str]], list[dict[str, str]], str]:
chat.send_message(message)
response = pipeline.interact(message)
chat.receive_message(response)
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": response})
return history, history, ""
def save_current_chat() -> str:
chat.save_chat("chat.json")
return "💾 Chat salvata in chat.json"
def load_previous_chat() -> tuple[list[dict[str, str]], list[dict[str, str]]]:
chat.load_chat("chat.json")
history: list[dict[str, str]] = []
for m in chat.get_history():
history.append({"role": m["role"], "content": m["content"]})
return history, history
def reset_chat() -> tuple[list[dict[str, str]], list[dict[str, str]]]:
chat.reset_chat()
return [], []
########################################
# Interfaccia Gradio
########################################
with gr.Blocks() as demo:
gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)")
# Dropdown provider e stile
with gr.Row():
provider = gr.Dropdown(
choices=pipeline.list_providers(),
type="index",
label="Modello da usare"
)
provider.change(fn=pipeline.choose_predictor, inputs=provider, outputs=None)
style = gr.Dropdown(
choices=pipeline.list_styles(),
type="index",
label="Stile di investimento"
)
style.change(fn=pipeline.choose_strategy, inputs=style, outputs=None)
chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages")
msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?")
with gr.Row():
clear_btn = gr.Button("🗑️ Reset Chat")
save_btn = gr.Button("💾 Salva Chat")
load_btn = gr.Button("📂 Carica Chat")
# Eventi e interazioni
msg.submit(respond, inputs=[msg, chatbot], outputs=[chatbot, chatbot, msg])
clear_btn.click(reset_chat, inputs=None, outputs=[chatbot, chatbot])
save_btn.click(save_current_chat, inputs=None, outputs=None)
load_btn.click(load_previous_chat, inputs=None, outputs=[chatbot, chatbot])
chat = ChatManager(pipeline)
gradio = chat.gradio_build_interface()
_app, local_url, share_url = gradio.launch(server_name="0.0.0.0", server_port=configs.port, quiet=True, prevent_thread_lock=True, share=configs.gradio_share)
logging.info(f"UPO AppAI Chat is running on {share_url or local_url}")
try:
_app, local, shared = demo.launch(server_name="0.0.0.0", server_port=configs.port, quiet=True, prevent_thread_lock=True, share=configs.gradio_share)
log_info(f"Starting UPO AppAI Chat on {shared or local}")
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
demo.close()
telegram = TelegramApp(pipeline)
telegram.add_miniapp_url(share_url)
telegram.run()
except AssertionError as e:
try:
logging.warning(f"Telegram bot could not be started: {e}")
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
logging.info("Shutting down due to KeyboardInterrupt")
finally:
gradio.close()

View File

@@ -1,10 +1,10 @@
from agno.run.agent import RunOutput
import logging
from app.agents.team import create_team_with
from app.agents.predictor import PredictorInput, PredictorOutput
from app.agents.prompts import *
from app.api.core.markets import ProductInfo
from app.configs import AppConfig
logging = logging.getLogger("pipeline")
class Pipeline:
"""
@@ -17,27 +17,30 @@ class Pipeline:
self.configs = configs
# Stato iniziale
self.choose_strategy(0)
self.choose_predictor(0)
self.leader_model = self.configs.get_model_by_name(self.configs.agents.team_leader_model)
self.team_model = self.configs.get_model_by_name(self.configs.agents.team_model)
self.strategy = self.configs.get_strategy_by_name(self.configs.agents.strategy)
# ======================
# Dropdown handlers
# ======================
def choose_predictor(self, index: int):
def choose_leader(self, index: int):
"""
Sceglie il modello LLM da usare per il Predictor.
Sceglie il modello LLM da usare per il Team.
"""
model = self.configs.models.all_models[index]
self.predictor = model.get_agent(
PREDICTOR_INSTRUCTIONS,
output_schema=PredictorOutput,
)
self.leader_model = self.configs.models.all_models[index]
def choose_team(self, index: int):
"""
Sceglie il modello LLM da usare per il Team.
"""
self.team_model = self.configs.models.all_models[index]
def choose_strategy(self, index: int):
"""
Sceglie la strategia da usare per il Predictor.
"""
self.strat = self.configs.strategies[index].description
self.strategy = self.configs.strategies[index]
# ======================
# Helpers
@@ -64,46 +67,18 @@ class Pipeline:
3. Invoca Predictor
4. Restituisce la strategia finale
"""
# Step 1: raccolta output dai membri del Team
team_model = self.configs.get_model_by_name(self.configs.agents.team_model)
leader_model = self.configs.get_model_by_name(self.configs.agents.team_leader_model)
# Step 1: Creazione Team
team = create_team_with(self.configs, self.team_model, self.leader_model)
team = create_team_with(self.configs, team_model, leader_model)
# Step 2: raccolta output dai membri del Team
logging.info(f"Pipeline received query: {query}")
# TODO migliorare prompt (?)
query = f"The user query is: {query}\n\n They requested a {self.strategy.label} investment strategy."
team_outputs = team.run(query) # type: ignore
# Step 2: aggregazione output strutturati
all_products: list[ProductInfo] = []
sentiments: list[str] = []
for agent_output in team_outputs.member_responses:
if isinstance(agent_output, RunOutput) and agent_output.metadata is not None:
keys = agent_output.metadata.keys()
if "products" in keys:
all_products.extend(agent_output.metadata["products"])
if "sentiment_news" in keys:
sentiments.append(agent_output.metadata["sentiment_news"])
if "sentiment_social" in keys:
sentiments.append(agent_output.metadata["sentiment_social"])
aggregated_sentiment = "\n".join(sentiments)
# Step 3: invocazione Predictor
predictor_input = PredictorInput(
data=all_products,
style=self.strat,
sentiment=aggregated_sentiment
)
result = self.predictor.run(predictor_input) # type: ignore
if not isinstance(result.content, PredictorOutput):
return "❌ Errore: il modello non ha restituito un output valido."
prediction: PredictorOutput = result.content
# Step 4: restituzione strategia finale
portfolio_lines = "\n".join(
[f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio]
)
return (
f"📊 Strategia ({self.strat}): {prediction.strategy}\n\n"
f"💼 Portafoglio consigliato:\n{portfolio_lines}"
)
# Step 3: recupero ouput
if not isinstance(team_outputs.content, str):
logging.error(f"Team output is not a string: {team_outputs.content}")
raise ValueError("Team output is not a string")
logging.info(f"Team finished")
return team_outputs.content

View File

@@ -59,6 +59,7 @@ class RedditWrapper(SocialWrapper):
client_id=client_id,
client_secret=client_secret,
user_agent="upo-appAI",
check_for_async=False,
)
self.subreddits = self.tool.subreddit("+".join(SUBREDDITS))

View File

@@ -1,9 +1,10 @@
import inspect
import logging
import time
import traceback
from typing import Any, Callable, Generic, TypeVar
from agno.utils.log import log_info, log_warning #type: ignore
logging = logging.getLogger("wrapper_handler")
WrapperType = TypeVar("WrapperType")
WrapperClassType = TypeVar("WrapperClassType")
OutputType = TypeVar("OutputType")
@@ -86,7 +87,7 @@ class WrapperHandler(Generic[WrapperType]):
Exception: If all wrappers fail after retries.
"""
log_info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
logging.info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
results: dict[str, OutputType] = {}
starting_index = self.index
@@ -96,18 +97,18 @@ class WrapperHandler(Generic[WrapperType]):
wrapper_name = wrapper.__class__.__name__
if not try_all:
log_info(f"try_call {wrapper_name}")
logging.info(f"try_call {wrapper_name}")
for try_count in range(1, self.retry_per_wrapper + 1):
try:
result = func(wrapper)
log_info(f"{wrapper_name} succeeded")
logging.info(f"{wrapper_name} succeeded")
results[wrapper_name] = result
break
except Exception as e:
error = WrapperHandler.__concise_error(e)
log_warning(f"{wrapper_name} failed {try_count}/{self.retry_per_wrapper}: {error}")
logging.warning(f"{wrapper_name} failed {try_count}/{self.retry_per_wrapper}: {error}")
time.sleep(self.retry_delay)
if not try_all and results:
@@ -153,6 +154,6 @@ class WrapperHandler(Generic[WrapperType]):
wrapper = wrapper_class(**(kwargs or {}))
result.append(wrapper)
except Exception as e:
log_warning(f"{wrapper_class} cannot be initialized: {e}")
logging.warning(f"'{wrapper_class.__name__}' cannot be initialized: {e}")
return WrapperHandler(result, try_per_wrapper, retry_delay)

View File

@@ -145,6 +145,17 @@ class AppConfig(BaseModel):
return strat
raise ValueError(f"Strategy with name '{name}' not found.")
def get_defaults(self) -> tuple[AppModel, AppModel, Strategy]:
"""
Retrieve the default team model, leader model, and strategy.
Returns:
A tuple containing the default team model (AppModel), leader model (AppModel), and strategy (Strategy).
"""
team_model = self.get_model_by_name(self.agents.team_model)
leader_model = self.get_model_by_name(self.agents.team_leader_model)
strategy = self.get_strategy_by_name(self.agents.strategy)
return team_model, leader_model, strategy
def set_logging_level(self) -> None:
"""
Set the logging level based on the configuration.

View File

@@ -1,3 +1,4 @@
from app.interface.chat import ChatManager
from app.interface.telegram_app import TelegramApp
__all__ = ["ChatManager"]
__all__ = ["ChatManager", "TelegramApp"]

View File

@@ -1,5 +1,8 @@
import json
import os
import json
import gradio as gr
from app.agents.pipeline import Pipeline
class ChatManager:
"""
@@ -9,8 +12,9 @@ class ChatManager:
- salva e ricarica le chat
"""
def __init__(self):
def __init__(self, pipeline: Pipeline):
self.history: list[dict[str, str]] = [] # [{"role": "user"/"assistant", "content": "..."}]
self.pipeline = pipeline
def send_message(self, message: str) -> None:
"""
@@ -56,3 +60,66 @@ class ChatManager:
Restituisce lo storico completo della chat.
"""
return self.history
########################################
# Funzioni Gradio
########################################
def gradio_respond(self, message: str, history: list[dict[str, str]]) -> tuple[list[dict[str, str]], list[dict[str, str]], str]:
self.send_message(message)
response = self.pipeline.interact(message)
self.receive_message(response)
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": response})
return history, history, ""
def gradio_save(self) -> str:
self.save_chat("chat.json")
return "💾 Chat salvata in chat.json"
def gradio_load(self) -> tuple[list[dict[str, str]], list[dict[str, str]]]:
self.load_chat("chat.json")
history: list[dict[str, str]] = []
for m in self.get_history():
history.append({"role": m["role"], "content": m["content"]})
return history, history
def gradio_clear(self) -> tuple[list[dict[str, str]], list[dict[str, str]]]:
self.reset_chat()
return [], []
def gradio_build_interface(self) -> gr.Blocks:
with gr.Blocks() as interface:
gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)")
# Dropdown provider e stile
with gr.Row():
provider = gr.Dropdown(
choices=self.pipeline.list_providers(),
type="index",
label="Modello da usare"
)
provider.change(fn=self.pipeline.choose_leader, inputs=provider, outputs=None)
style = gr.Dropdown(
choices=self.pipeline.list_styles(),
type="index",
label="Stile di investimento"
)
style.change(fn=self.pipeline.choose_strategy, inputs=style, outputs=None)
chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages")
msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?")
with gr.Row():
clear_btn = gr.Button("🗑️ Reset Chat")
save_btn = gr.Button("💾 Salva Chat")
load_btn = gr.Button("📂 Carica Chat")
# Eventi e interazioni
msg.submit(self.gradio_respond, inputs=[msg, chatbot], outputs=[chatbot, chatbot, msg])
clear_btn.click(self.gradio_clear, inputs=None, outputs=[chatbot, chatbot])
save_btn.click(self.gradio_save, inputs=None, outputs=None)
load_btn.click(self.gradio_load, inputs=None, outputs=[chatbot, chatbot])
return interface

View File

@@ -0,0 +1,264 @@
import io
copilot-pull-request-reviewer[bot] commented 2025-10-09 13:55:41 +02:00 (Migrated from github.com)
Review

Comment is in Italian and not descriptive. Consider using English and explaining why the typing is complex.

    # Typing for users_req is complex because it maps User objects to ConfigsRun instances.
Comment is in Italian and not descriptive. Consider using English and explaining why the typing is complex. ```suggestion # Typing for users_req is complex because it maps User objects to ConfigsRun instances. ```
import os
import json
import httpx
import logging
import warnings
from enum import Enum
from markdown_pdf import MarkdownPdf, Section
from telegram import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update, User
from telegram.constants import ChatAction
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, ConversationHandler, MessageHandler, filters
from app.agents.pipeline import Pipeline
from app.configs import AppConfig
# per per_message di ConversationHandler che rompe sempre qualunque input tu metta
warnings.filterwarnings("ignore")
logging = logging.getLogger("telegram")
# Lo stato cambia in base al valore di ritorno delle funzioni async
# END state è già definito in telegram.ext.ConversationHandler
# Un semplice schema delle interazioni:
# /start
# ║
# V
# ╔══ CONFIGS <═════╗
# ║ ║ ╚══> SELECT_CONFIG
# ║ V
# ║ start_team (polling for updates)
# ║ ║
# ║ V
# ╚═══> END
CONFIGS, SELECT_CONFIG = range(2)
# Usato per separare la query arrivata da Telegram
QUERY_SEP = "|==|"
class ConfigsChat(Enum):
MODEL_TEAM = "Team Model"
MODEL_OUTPUT = "Output Model"
STRATEGY = "Strategy"
class ConfigsRun:
def __init__(self, configs: AppConfig):
team, leader, strategy = configs.get_defaults()
self.team_model = team
self.leader_model = leader
self.strategy = strategy
self.user_query = ""
class TelegramApp:
def __init__(self, pipeline: Pipeline):
token = os.getenv("TELEGRAM_BOT_TOKEN")
assert token, "TELEGRAM_BOT_TOKEN environment variable not set"
self.user_requests: dict[User, ConfigsRun] = {}
self.pipeline = pipeline
self.token = token
self.create_bot()
def add_miniapp_url(self, url: str) -> None:
try:
endpoint = f"https://api.telegram.org/bot{self.token}/setChatMenuButton"
payload = {"menu_button": json.dumps({
"type": "web_app",
"text": "MiniApp",
"web_app": { "url": url }
})}
httpx.post(endpoint, data=payload)
except httpx.HTTPError as e:
logging.warning(f"Failed to update mini app URL: {e}")
def create_bot(self) -> None:
"""
Initialize the Telegram bot and set up the conversation handler.
"""
app = Application.builder().token(self.token).build()
app.add_error_handler(self.__error_handler)
app.add_handler(ConversationHandler(
per_message=False, # capire a cosa serve perchè da un warning quando parte il server
copilot-pull-request-reviewer[bot] commented 2025-10-09 13:55:41 +02:00 (Migrated from github.com)
Review

Comment is in Italian. Consider translating to English: '# understand what this does because it gives a warning when the server starts'

            per_message=False, # understand what this does because it gives a warning when the server starts
Comment is in Italian. Consider translating to English: '# understand what this does because it gives a warning when the server starts' ```suggestion per_message=False, # understand what this does because it gives a warning when the server starts ```
copilot-pull-request-reviewer[bot] commented 2025-10-12 20:27:13 +02:00 (Migrated from github.com)
Review

Corrected spelling of 'perchè' to 'perché' in Italian comment.

            per_message=False, # capire a cosa serve perché da un warning quando parte il server
Corrected spelling of 'perchè' to 'perché' in Italian comment. ```suggestion per_message=False, # capire a cosa serve perché da un warning quando parte il server ```
entry_points=[CommandHandler('start', self.__start)],
states={
CONFIGS: [
CallbackQueryHandler(self.__model_team, pattern=ConfigsChat.MODEL_TEAM.name),
CallbackQueryHandler(self.__model_output, pattern=ConfigsChat.MODEL_OUTPUT.name),
CallbackQueryHandler(self.__strategy, pattern=ConfigsChat.STRATEGY.name),
CallbackQueryHandler(self.__cancel, pattern='^cancel$'),
MessageHandler(filters.TEXT, self.__start_team) # Any text message
],
SELECT_CONFIG: [
CallbackQueryHandler(self.__select_config, pattern=f"^__select_config{QUERY_SEP}.*$"),
]
},
fallbacks=[CommandHandler('start', self.__start)],
))
self.app = app
def run(self) -> None:
self.app.run_polling()
########################################
# Funzioni di utilità
########################################
async def start_message(self, user: User, query: CallbackQuery | Message) -> None:
confs = self.user_requests.setdefault(user, ConfigsRun(self.pipeline.configs))
str_model_team = f"{ConfigsChat.MODEL_TEAM.value}: {confs.team_model.label}"
str_model_output = f"{ConfigsChat.MODEL_OUTPUT.value}: {confs.leader_model.label}"
str_strategy = f"{ConfigsChat.STRATEGY.value}: {confs.strategy.label}"
msg, keyboard = (
"Please choose an option or write your query",
InlineKeyboardMarkup([
[InlineKeyboardButton(str_model_team, callback_data=ConfigsChat.MODEL_TEAM.name)],
[InlineKeyboardButton(str_model_output, callback_data=ConfigsChat.MODEL_OUTPUT.name)],
[InlineKeyboardButton(str_strategy, callback_data=ConfigsChat.STRATEGY.name)],
[InlineKeyboardButton("Cancel", callback_data='cancel')]
])
)
if isinstance(query, CallbackQuery):
await query.edit_message_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2')
else:
await query.reply_text(msg, reply_markup=keyboard, parse_mode='MarkdownV2')
async def handle_callbackquery(self, update: Update) -> tuple[CallbackQuery, User]:
assert update.callback_query and update.callback_query.from_user, "Update callback_query or user is None"
query = update.callback_query
await query.answer() # Acknowledge the callback query
return query, query.from_user
async def handle_message(self, update: Update) -> tuple[Message, User]:
assert update.message and update.message.from_user, "Update message or user is None"
return update.message, update.message.from_user
def callback_data(self, strings: list[str]) -> str:
return QUERY_SEP.join(strings)
async def __error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
try:
logging.error(f"Unhandled exception in Telegram handler: {context.error}")
# Try to notify the user in chat if possible
if isinstance(update, Update) and update.effective_chat:
chat_id = update.effective_chat.id
msg = "An error occurred while processing your request."
await context.bot.send_message(chat_id=chat_id, text=msg)
except Exception:
# Ensure we never raise from the error handler itself
logging.exception("Exception in the error handler")
#########################################
# Funzioni async per i comandi e messaggi
#########################################
async def __start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
message, user = await self.handle_message(update)
logging.info(f"@{user.username} started the conversation.")
await self.start_message(user, message)
return CONFIGS
async def __model_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
return await self._model_select(update, ConfigsChat.MODEL_TEAM)
async def __model_output(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
return await self._model_select(update, ConfigsChat.MODEL_OUTPUT)
async def _model_select(self, update: Update, state: ConfigsChat, msg: str | None = None) -> int:
query, _ = await self.handle_callbackquery(update)
models = [(m.label, self.callback_data([f"__select_config", str(state), m.name])) for m in self.pipeline.configs.models.all_models]
inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in models]
await query.edit_message_text(msg or state.value, reply_markup=InlineKeyboardMarkup(inline_btns))
return SELECT_CONFIG
async def __strategy(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query, _ = await self.handle_callbackquery(update)
strategies = [(s.label, self.callback_data([f"__select_config", str(ConfigsChat.STRATEGY), s.name])) for s in self.pipeline.configs.strategies]
inline_btns = [[InlineKeyboardButton(name, callback_data=callback_data)] for name, callback_data in strategies]
await query.edit_message_text("Select a strategy", reply_markup=InlineKeyboardMarkup(inline_btns))
return SELECT_CONFIG
async def __select_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query, user = await self.handle_callbackquery(update)
logging.debug(f"@{user.username} --> {query.data}")
req = self.user_requests[user]
_, state, model_name = str(query.data).split(QUERY_SEP)
if state == str(ConfigsChat.MODEL_TEAM):
req.team_model = self.pipeline.configs.get_model_by_name(model_name)
if state == str(ConfigsChat.MODEL_OUTPUT):
req.leader_model = self.pipeline.configs.get_model_by_name(model_name)
if state == str(ConfigsChat.STRATEGY):
req.strategy = self.pipeline.configs.get_strategy_by_name(model_name)
await self.start_message(user, query)
return CONFIGS
async def __start_team(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
message, user = await self.handle_message(update)
confs = self.user_requests[user]
confs.user_query = message.text or ""
logging.info(f"@{user.username} started the team with [{confs.team_model.label}, {confs.leader_model.label}, {confs.strategy.label}]")
await self.__run_team(update, confs)
logging.info(f"@{user.username} team finished.")
return ConversationHandler.END
async def __cancel(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query, user = await self.handle_callbackquery(update)
logging.info(f"@{user.username} canceled the conversation.")
if user in self.user_requests:
del self.user_requests[user]
await query.edit_message_text("Conversation canceled. Use /start to begin again.")
return ConversationHandler.END
async def __run_team(self, update: Update, confs: ConfigsRun) -> None:
if not update.message: return
copilot-pull-request-reviewer[bot] commented 2025-10-12 20:27:13 +02:00 (Migrated from github.com)
Review

[nitpick] Consider using explicit None return for better code clarity: 'return None' instead of bare 'return'.

        if not update.message: return None
[nitpick] Consider using explicit None return for better code clarity: 'return None' instead of bare 'return'. ```suggestion if not update.message: return None ```
bot = update.get_bot()
msg_id = update.message.message_id - 1
chat_id = update.message.chat_id
configs_str = [
'Running with configurations: ',
f'Team: {confs.team_model.label}',
f'Output: {confs.leader_model.label}',
f'Strategy: {confs.strategy.label}',
f'Query: "{confs.user_query}"'
]
full_message = f"""```\n{'\n'.join(configs_str)}\n```\n\n"""
first_message = full_message + "Generating report, please wait"
msg = await bot.edit_message_text(chat_id=chat_id, message_id=msg_id, text=first_message, parse_mode='MarkdownV2')
if isinstance(msg, bool): return
# Remove user query and bot message
await bot.delete_message(chat_id=chat_id, message_id=update.message.id)
self.pipeline.leader_model = confs.leader_model
self.pipeline.team_model = confs.team_model
self.pipeline.strategy = confs.strategy
# TODO migliorare messaggi di attesa
await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
report_content = self.pipeline.interact(confs.user_query)
await msg.delete()
# attach report file to the message
pdf = MarkdownPdf(toc_level=2, optimize=True)
pdf.add_section(Section(report_content, toc=False))
# TODO vedere se ha senso dare il pdf o solo il messaggio
document = io.BytesIO()
pdf.save_bytes(document)
document.seek(0)
await bot.send_document(chat_id=chat_id, document=document, filename="report.pdf", parse_mode='MarkdownV2', caption=full_message)

50
uv.lock generated
View File

@@ -816,14 +816,27 @@ wheels = [
[[package]]
name = "markdown-it-py"
version = "4.0.0"
version = "3.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "mdurl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" }
sdist = { url = "https://files.pythonhosted.org/packages/38/71/3b932df36c1a044d397a1f92d1cf91ee0a503d91e470cbd670aa66b07ed0/markdown-it-py-3.0.0.tar.gz", hash = "sha256:e3f60a94fa066dc52ec76661e37c851cb232d92f9886b15cb560aaada2df8feb", size = 74596, upload-time = "2023-06-03T06:41:14.443Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" },
{ url = "https://files.pythonhosted.org/packages/42/d7/1ec15b46af6af88f19b8e5ffea08fa375d433c998b8a7639e76935c14f1f/markdown_it_py-3.0.0-py3-none-any.whl", hash = "sha256:355216845c60bd96232cd8d8c40e8f9765cc86f46880e43a8fd22dc1a1a8cab1", size = 87528, upload-time = "2023-06-03T06:41:11.019Z" },
]
[[package]]
name = "markdown-pdf"
version = "1.10"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
{ name = "pymupdf" },
]
sdist = { url = "https://files.pythonhosted.org/packages/5e/e6/969311a194074afa9672324244adbf64a7e8663f2ba0003395b7140f5c4a/markdown_pdf-1.10.tar.gz", hash = "sha256:bcf23d816baa56aec3a60f940681652c4e46ee048c6335835cddf86d1ff20a8e", size = 17783, upload-time = "2025-09-24T19:01:38.758Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/1f/78/c593979cf1525be786d63b285a7a67afae397fc132382158432490ebd1ed/markdown_pdf-1.10-py3-none-any.whl", hash = "sha256:1863e78454e5aa9bcb34c125f385d4ff045c727660c5172877e82e69d06fae6d", size = 17994, upload-time = "2025-09-24T19:01:37.155Z" },
]
[[package]]
@@ -1238,6 +1251,21 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/61/ad/689f02752eeec26aed679477e80e632ef1b682313be70793d798c1d5fc8f/PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb", size = 22997, upload-time = "2024-11-28T03:43:27.893Z" },
]
[[package]]
name = "pymupdf"
version = "1.26.4"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/90/35/031556dfc0d332d8e9ed9b61ca105138606d3f8971b9eb02e20118629334/pymupdf-1.26.4.tar.gz", hash = "sha256:be13a066d42bfaed343a488168656637c4d9843ddc63b768dc827c9dfc6b9989", size = 83077563, upload-time = "2025-08-25T14:20:29.499Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/27/ae/3be722886cc7be2093585cd94f466db1199133ab005645a7a567b249560f/pymupdf-1.26.4-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:cb95562a0a63ce906fd788bdad5239063b63068cf4a991684f43acb09052cb99", size = 23061974, upload-time = "2025-08-25T14:16:58.811Z" },
{ url = "https://files.pythonhosted.org/packages/fc/b0/9a451d837e1fe18ecdbfbc34a6499f153c8a008763229cc634725383a93f/pymupdf-1.26.4-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:67e9e6b45832c33726651c2a031e9a20108fd9e759140b9e843f934de813a7ff", size = 22410112, upload-time = "2025-08-25T14:17:24.511Z" },
{ url = "https://files.pythonhosted.org/packages/d8/13/0916e8e02cb5453161fb9d9167c747d0a20d58633e30728645374153f815/pymupdf-1.26.4-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:2604f687dd02b6a1b98c81bd8becfc0024899a2d2085adfe3f9e91607721fd22", size = 23454948, upload-time = "2025-08-25T21:20:07.71Z" },
{ url = "https://files.pythonhosted.org/packages/4e/c6/d3cfafc75d383603884edeabe4821a549345df954a88d79e6764e2c87601/pymupdf-1.26.4-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:973a6dda61ebd34040e4df3753bf004b669017663fbbfdaa294d44eceba98de0", size = 24060686, upload-time = "2025-08-25T14:17:56.536Z" },
{ url = "https://files.pythonhosted.org/packages/72/08/035e9d22c801e801bba50c6745bc90ba8696a042fe2c68793e28bf0c3b07/pymupdf-1.26.4-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:299a49797df5b558e695647fa791329ba3911cbbb31ed65f24a6266c118ef1a7", size = 24265046, upload-time = "2025-08-25T14:18:21.238Z" },
{ url = "https://files.pythonhosted.org/packages/28/8c/c201e4846ec0fb6ae5d52aa3a5d66f9355f0c69fb94230265714df0de65e/pymupdf-1.26.4-cp39-abi3-win32.whl", hash = "sha256:51b38379aad8c71bd7a8dd24d93fbe7580c2a5d9d7e1f9cd29ebbba315aa1bd1", size = 17127332, upload-time = "2025-08-25T14:18:39.132Z" },
{ url = "https://files.pythonhosted.org/packages/d1/c4/87d27b108c2f6d773aa5183c5ae367b2a99296ea4bc16eb79f453c679e30/pymupdf-1.26.4-cp39-abi3-win_amd64.whl", hash = "sha256:0b6345a93a9afd28de2567e433055e873205c52e6b920b129ca50e836a3aeec6", size = 18743491, upload-time = "2025-08-25T14:19:01.104Z" },
]
[[package]]
name = "pytest"
version = "8.4.2"
@@ -1301,6 +1329,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/45/58/38b5afbc1a800eeea951b9285d3912613f2603bdf897a4ab0f4bd7f405fc/python_multipart-0.0.20-py3-none-any.whl", hash = "sha256:8a62d3a8335e06589fe01f2a3e178cdcc632f3fbe0d492ad9ee0ec35aab1f104", size = 24546, upload-time = "2024-12-16T19:45:44.423Z" },
]
[[package]]
name = "python-telegram-bot"
version = "22.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "httpx" },
]
sdist = { url = "https://files.pythonhosted.org/packages/0b/6b/400f88e5c29a270c1c519a3ca8ad0babc650ec63dbfbd1b73babf625ed54/python_telegram_bot-22.5.tar.gz", hash = "sha256:82d4efd891d04132f308f0369f5b5929e0b96957901f58bcef43911c5f6f92f8", size = 1488269, upload-time = "2025-09-27T13:50:27.879Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/bc/c3/340c7520095a8c79455fcf699cbb207225e5b36490d2b9ee557c16a7b21b/python_telegram_bot-22.5-py3-none-any.whl", hash = "sha256:4b7cd365344a7dce54312cc4520d7fa898b44d1a0e5f8c74b5bd9b540d035d16", size = 730976, upload-time = "2025-09-27T13:50:25.93Z" },
]
[[package]]
name = "pytz"
version = "2025.2"
@@ -1622,11 +1662,13 @@ dependencies = [
{ name = "gnews" },
{ name = "google-genai" },
{ name = "gradio" },
{ name = "markdown-pdf" },
{ name = "newsapi-python" },
{ name = "ollama" },
{ name = "praw" },
{ name = "pytest" },
{ name = "python-binance" },
{ name = "python-telegram-bot" },
{ name = "yfinance" },
]
@@ -1640,11 +1682,13 @@ requires-dist = [
{ name = "gnews" },
{ name = "google-genai" },
{ name = "gradio" },
{ name = "markdown-pdf" },
{ name = "newsapi-python" },
{ name = "ollama" },
{ name = "praw" },
{ name = "pytest" },
{ name = "python-binance" },
{ name = "python-telegram-bot" },
{ name = "yfinance" },
]