diff --git a/src/app/agents/action_registry.py b/src/app/agents/action_registry.py new file mode 100644 index 0000000..2d04d09 --- /dev/null +++ b/src/app/agents/action_registry.py @@ -0,0 +1,37 @@ +from typing import Any, Callable + + +# Registro centrale popolato da tutti i file Toolkit all'avvio. +ACTION_DESCRIPTIONS: dict[str, str] = {} + +def get_user_friendly_action(tool_name: str) -> str: + """ + Restituisce un messaggio leggibile e descrittivo per l'utente + leggendo dal registro globale. + """ + # Usa il dizionario ACTION_DESCRIPTIONS importato + return ACTION_DESCRIPTIONS.get(tool_name, f"⚙️ Eseguo l'operazione: {tool_name}...") + +def friendly_action(description: str) -> Callable[..., Any]: + """ + Decoratore che registra automaticamente la descrizione "user-friendly" + di un metodo nel registro globale. + + Questo decoratore viene eseguito all'avvio dell'app (quando i file + vengono importati) e popola il dizionario ACTION_DESCRIPTIONS. + + Restituisce la funzione originale non modificata. + """ + + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + # Registra l'azione + tool_name = func.__name__ + if tool_name in ACTION_DESCRIPTIONS: + print(f"Attenzione: Azione '{tool_name}' registrata più volte.") + + ACTION_DESCRIPTIONS[tool_name] = description + + # Restituisce la funzione originale + return func + + return decorator diff --git a/src/app/agents/core.py b/src/app/agents/core.py index a4adb43..28deb7d 100644 --- a/src/app/agents/core.py +++ b/src/app/agents/core.py @@ -45,28 +45,28 @@ class PipelineInputs: """ Sceglie il modello LLM da usare per l'analizzatore di query. """ - assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list." self.query_analyzer_model = self.configs.models.all_models[index] def choose_team_leader(self, index: int): """ Sceglie il modello LLM da usare per il Team Leader. """ - assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list." self.team_leader_model = self.configs.models.all_models[index] def choose_team(self, index: int): """ Sceglie il modello LLM da usare per il Team. """ - assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list." self.team_model = self.configs.models.all_models[index] def choose_report_generator(self, index: int): """ Sceglie il modello LLM da usare per il generatore di report. """ - assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list." self.report_generation_model = self.configs.models.all_models[index] def choose_strategy(self, index: int): @@ -145,21 +145,31 @@ class RunMessage: - In esecuzione (➡️) - Completato (✅) - Lo stato di esecuzione può essere assegnato solo ad uno step alla volta. + Lo stato di esecuzione può essere assegnato solo a uno step alla volta. Args: - inputs (PipelineInputs): Input della pipeline per mostrare la configurazione. - prefix (str, optional): Prefisso del messaggio. Defaults to "". - suffix (str, optional): Suffisso del messaggio. Defaults to "". + inputs (PipelineInputs): Input della pipeline per mostrare la configurazione + prefix (str, optional): Prefisso del messaggio. Defaults to "" + suffix (str, optional): Suffisso del messaggio. Defaults to "" """ self.base_message = f"Running configurations: \n{prefix}{inputs}{suffix}\n\n" self.emojis = ['🔳', '➡️', '✅'] self.placeholder = '<<<>>>' self.current = 0 - self.steps_total = [ - (f"{self.placeholder} Query Check", 1), - (f"{self.placeholder} Info Recovery", 0), - (f"{self.placeholder} Report Generation", 0), - ] + self.steps_total: list[tuple[str, int]] = [] + self.set_steps(["Query Check", "Info Recovery", "Report Generation"]) + + def set_steps(self, steps: list[str]) -> 'RunMessage': + """ + Inizializza gli step di esecuzione con lo stato iniziale. + Args: + steps (list[str]): Lista degli step da includere nel messaggio. + Returns: + RunMessage: L'istanza aggiornata di RunMessage. + """ + self.steps_total = [(f"{self.placeholder} {step}", 0) for step in steps] + self.steps_total[0] = (self.steps_total[0][0], 1) # Primo step in esecuzione + self.current = 0 + return self def update(self) -> 'RunMessage': """ @@ -176,15 +186,15 @@ class RunMessage: self.steps_total[self.current] = (text_curr, state_curr + 1) return self - def update_step(self, text_extra: str = "") -> 'RunMessage': + def update_step_with_tool(self, tool_used: str = "") -> 'RunMessage': """ Aggiorna il messaggio per lo step corrente. Args: - text_extra (str, optional): Testo aggiuntivo da includere nello step. Defaults to "". + tool_used (str, optional): Testo aggiuntivo da includere nello step. Defaults to "". """ text_curr, state_curr = self.steps_total[self.current] - if text_extra: - text_curr = f"{text_curr.replace('╚', '╠')}\n╚═ {text_extra}" + if tool_used: + text_curr = f"{text_curr.replace('╚', '╠')}\n╚═ {tool_used}" self.steps_total[self.current] = (text_curr, state_curr) return self @@ -196,3 +206,4 @@ class RunMessage: """ steps = [msg.replace(self.placeholder, self.emojis[state]) for msg, state in self.steps_total] return self.base_message + "\n".join(steps) + diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index 0498f90..d69d7f1 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -1,8 +1,7 @@ -import asyncio from enum import Enum import logging import random -from typing import Any, Callable +from typing import Any, AsyncGenerator, Callable from agno.agent import RunEvent from agno.run.workflow import WorkflowRunEvent from agno.workflow.types import StepInput, StepOutput @@ -13,28 +12,34 @@ from app.agents.core import * logging = logging.getLogger("pipeline") - class PipelineEvent(str, Enum): QUERY_CHECK = "Query Check" - QUERY_ANALYZER = "Query Analyzer" + QUERY_CHECK_END = "Query Check End" INFO_RECOVERY = "Info Recovery" + INFO_RECOVERY_END = "Info Recovery End" REPORT_GENERATION = "Report Generation" - REPORT_TRANSLATION = "Report Translation" - RUN_FINISHED = WorkflowRunEvent.workflow_completed.value - TOOL_USED = RunEvent.tool_call_completed.value + REPORT_GENERATION_END = "Report Generation End" + TOOL_USED = RunEvent.tool_call_started.value + TOOL_USED_END = RunEvent.tool_call_completed.value + RUN_END = WorkflowRunEvent.workflow_completed.value def check_event(self, event: str, step_name: str) -> bool: - return event == self.value or (WorkflowRunEvent.step_completed == event and step_name == self.value) + if event == self.value: + return True + + index = self.value.rfind(" End") + value = self.value[:index] if index > -1 else self.value + step_state = WorkflowRunEvent.step_completed if index > -1 else WorkflowRunEvent.step_started + return step_name == value and step_state == event @classmethod - def get_log_events(cls, run_id: int) -> list[tuple['PipelineEvent', Callable[[Any], None]]]: + def get_log_events(cls, run_id: int) -> list[tuple['PipelineEvent', Callable[[Any], str | None]]]: return [ - (PipelineEvent.QUERY_CHECK, lambda _: logging.info(f"[{run_id}] Query Check completed.")), - (PipelineEvent.QUERY_ANALYZER, lambda _: logging.info(f"[{run_id}] Query Analyzer completed.")), - (PipelineEvent.INFO_RECOVERY, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")), - (PipelineEvent.REPORT_GENERATION, lambda _: logging.info(f"[{run_id}] Report Generation completed.")), - (PipelineEvent.TOOL_USED, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")), - (PipelineEvent.RUN_FINISHED, lambda _: logging.info(f"[{run_id}] Run completed.")), + (PipelineEvent.QUERY_CHECK_END, lambda _: logging.info(f"[{run_id}] Query Check completed.")), + (PipelineEvent.INFO_RECOVERY_END, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")), + (PipelineEvent.REPORT_GENERATION_END, lambda _: logging.info(f"[{run_id}] Report Generation completed.")), + (PipelineEvent.TOOL_USED_END, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")), + (PipelineEvent.RUN_END, lambda _: logging.info(f"[{run_id}] Run completed.")), ] @@ -53,7 +58,7 @@ class Pipeline: """ self.inputs = inputs - def interact(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str: + async def interact(self, listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = []) -> str: """ Esegue la pipeline di agenti per rispondere alla query dell'utente. Args: @@ -61,9 +66,12 @@ class Pipeline: Returns: La risposta generata dalla pipeline. """ - return asyncio.run(self.interact_async(listeners)) + response = "" + async for chunk in self.interact_stream(listeners): + response = chunk + return response - async def interact_async(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str: + async def interact_stream(self, listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = []) -> AsyncGenerator[str, None]: """ Versione asincrona che esegue la pipeline di agenti per rispondere alla query dell'utente. Args: @@ -81,9 +89,8 @@ class Pipeline: ) workflow = self.build_workflow() - result = await self.run(workflow, query, events=events) - return result - + async for item in self.run_stream(workflow, query, events=events): + yield item def build_workflow(self) -> Workflow: """ @@ -99,7 +106,8 @@ class Pipeline: # Step 2: Crea gli steps def condition_query_ok(step_input: StepInput) -> StepOutput: val = step_input.previous_step_content - return StepOutput(stop=not val.is_crypto) if isinstance(val, QueryOutputs) else StepOutput(stop=True) + stop = (not val.is_crypto) if isinstance(val, QueryOutputs) else True + return StepOutput(stop=stop) query_check = Step(name=PipelineEvent.QUERY_CHECK, agent=query_check) info_recovery = Step(name=PipelineEvent.INFO_RECOVERY, team=team) @@ -114,33 +122,39 @@ class Pipeline: ]) @classmethod - async def run(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], None]]]) -> str: + async def run_stream(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], str | None]]]) -> AsyncGenerator[str, None]: """ - Esegue il workflow e gestisce gli eventi tramite le callback fornite. + Esegue il workflow e restituisce gli eventi di stato e il risultato finale. Args: - workflow: istanza di Workflow da eseguire - query: query dell'utente da passare al workflow - events: dizionario di callback per eventi specifici (opzionale) - Returns: - La risposta generata dal workflow. + workflow: L'istanza di Workflow da eseguire + query: Gli input della query + events: La lista di eventi e callback da gestire durante l'esecuzione. + Yields: + Aggiornamenti di stato e la risposta finale generata dal workflow. """ iterator = await workflow.arun(query, stream=True, stream_intermediate_steps=True) - content = None + async for event in iterator: step_name = getattr(event, 'step_name', '') + + # Chiama i listeners (se presenti) per ogni evento for app_event, listener in events: if app_event.check_event(event.event, step_name): - listener(event) - if event.event == WorkflowRunEvent.step_completed: + update = listener(event) + if update: yield update + + # Salva il contenuto finale quando uno step è completato + if event.event == WorkflowRunEvent.step_completed.value: content = getattr(event, 'content', '') + # Restituisce la risposta finale if content and isinstance(content, str): think_str = "" think = content.rfind(think_str) - return content[(think + len(think_str)):] if think != -1 else content - if content and isinstance(content, QueryOutputs): - return content.response - - logging.error(f"No output from workflow: {content}") - return "No output from workflow, something went wrong." + yield content[(think + len(think_str)):] if think != -1 else content + elif content and isinstance(content, QueryOutputs): + yield content.response + else: + logging.error(f"No output from workflow: {content}") + yield "Nessun output dal workflow, qualcosa è andato storto." diff --git a/src/app/api/tools/market_tool.py b/src/app/api/tools/market_tool.py index 5aa11b1..d96054c 100644 --- a/src/app/api/tools/market_tool.py +++ b/src/app/api/tools/market_tool.py @@ -1,4 +1,6 @@ from agno.tools import Toolkit + +from app.agents.action_registry import friendly_action from app.api.tools.instructions import MARKET_TOOL_INSTRUCTIONS from app.api.wrapper_handler import WrapperHandler from app.api.core.markets import MarketWrapper, Price, ProductInfo @@ -40,6 +42,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): ], ) + @friendly_action("🔍 Recupero le informazioni sul prodotto richiesto...") def get_product(self, asset_id: str) -> ProductInfo: """ Gets product information for a *single* asset from the *first available* provider. @@ -56,6 +59,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): """ return self.handler.try_call(lambda w: w.get_product(asset_id)) + @friendly_action("📦 Recupero i dati su più asset...") def get_products(self, asset_ids: list[str]) -> list[ProductInfo]: """ Gets product information for a *list* of assets from the *first available* provider. @@ -72,6 +76,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): """ return self.handler.try_call(lambda w: w.get_products(asset_ids)) + @friendly_action("📊 Recupero i dati storici dei prezzi...") def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]: """ Gets historical price data for a *single* asset from the *first available* provider. @@ -89,6 +94,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): """ return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit)) + @friendly_action("🧩 Aggrego le informazioni da più fonti...") def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]: """ Gets product information for multiple assets from *all available providers* and *aggregates* the results. @@ -109,6 +115,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids)) return ProductInfo.aggregate(all_products) + @friendly_action("📈 Creo uno storico aggregato dei prezzi...") def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]: """ Gets historical price data for a single asset from *all available providers* and *aggregates* the results. diff --git a/src/app/api/tools/news_tool.py b/src/app/api/tools/news_tool.py index 1f834c5..0fff9c0 100644 --- a/src/app/api/tools/news_tool.py +++ b/src/app/api/tools/news_tool.py @@ -1,4 +1,6 @@ from agno.tools import Toolkit + +from app.agents.action_registry import friendly_action from app.api.tools.instructions import NEWS_TOOL_INSTRUCTIONS from app.api.wrapper_handler import WrapperHandler from app.api.core.news import NewsWrapper, Article @@ -42,6 +44,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit): ], ) + @friendly_action("📰 Cerco le notizie principali...") def get_top_headlines(self, limit: int = 100) -> list[Article]: """ Retrieves top headlines from the *first available* news provider. @@ -58,6 +61,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit): """ return self.handler.try_call(lambda w: w.get_top_headlines(limit)) + @friendly_action("🔎 Cerco notizie recenti sull'argomento...") def get_latest_news(self, query: str, limit: int = 100) -> list[Article]: """ Searches for the latest news on a specific topic from the *first available* provider. @@ -75,6 +79,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit): """ return self.handler.try_call(lambda w: w.get_latest_news(query, limit)) + @friendly_action("🗞️ Raccolgo le notizie principali da tutte le fonti...") def get_top_headlines_aggregated(self, limit: int = 100) -> dict[str, list[Article]]: """ Retrieves top headlines from *all available providers* and aggregates the results. @@ -94,6 +99,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit): """ return self.handler.try_call_all(lambda w: w.get_top_headlines(limit)) + @friendly_action("📚 Raccolgo notizie specifiche da tutte le fonti...") def get_latest_news_aggregated(self, query: str, limit: int = 100) -> dict[str, list[Article]]: """ Searches for news on a specific topic from *all available providers* and aggregates the results. diff --git a/src/app/api/tools/plan_memory_tool.py b/src/app/api/tools/plan_memory_tool.py index 646d118..1155130 100644 --- a/src/app/api/tools/plan_memory_tool.py +++ b/src/app/api/tools/plan_memory_tool.py @@ -14,6 +14,7 @@ class PlanMemoryTool(Toolkit): def __init__(self): self.tasks: list[Task] = [] + Toolkit.__init__(self, # type: ignore[call-arg] name="Plan Memory Toolkit", instructions=PLAN_MEMORY_TOOL_INSTRUCTIONS, diff --git a/src/app/api/tools/social_tool.py b/src/app/api/tools/social_tool.py index 5d78c12..1d0e53e 100644 --- a/src/app/api/tools/social_tool.py +++ b/src/app/api/tools/social_tool.py @@ -1,4 +1,6 @@ from agno.tools import Toolkit + +from app.agents.action_registry import friendly_action from app.api.tools.instructions import SOCIAL_TOOL_INSTRUCTIONS from app.api.wrapper_handler import WrapperHandler from app.api.core.social import SocialPost, SocialWrapper @@ -41,6 +43,7 @@ class SocialAPIsTool(SocialWrapper, Toolkit): ], ) + @friendly_action("📱 Cerco i post più popolari sui social...") def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]: """ Retrieves top cryptocurrency-related posts from the *first available* social media provider. @@ -57,6 +60,7 @@ class SocialAPIsTool(SocialWrapper, Toolkit): """ return self.handler.try_call(lambda w: w.get_top_crypto_posts(limit)) + @friendly_action("🌐 Raccolgo i post da tutte le piattaforme social...") def get_top_crypto_posts_aggregated(self, limit_per_wrapper: int = 5) -> dict[str, list[SocialPost]]: """ Retrieves top cryptocurrency-related posts from *all available providers* and aggregates the results. diff --git a/src/app/api/tools/symbols_tool.py b/src/app/api/tools/symbols_tool.py index b49d879..049175a 100644 --- a/src/app/api/tools/symbols_tool.py +++ b/src/app/api/tools/symbols_tool.py @@ -22,6 +22,7 @@ class CryptoSymbolsTools(Toolkit): def __init__(self, cache_file: str = 'resources/cryptos.csv'): self.cache_file = cache_file self.final_table = pd.read_csv(self.cache_file) if os.path.exists(self.cache_file) else pd.DataFrame() # type: ignore + Toolkit.__init__(self, # type: ignore name="Crypto Symbols Tool", instructions=SYMBOLS_TOOL_INSTRUCTIONS, diff --git a/src/app/interface/chat.py b/src/app/interface/chat.py index 1f307e8..e52a203 100644 --- a/src/app/interface/chat.py +++ b/src/app/interface/chat.py @@ -1,7 +1,9 @@ import os import json +from typing import Any, Callable import gradio as gr -from app.agents.pipeline import Pipeline, PipelineInputs +from app.agents.action_registry import get_user_friendly_action +from app.agents.pipeline import Pipeline, PipelineEvent, PipelineInputs class ChatManager: @@ -49,13 +51,28 @@ class ChatManager: ######################################## # Funzioni Gradio ######################################## - def gradio_respond(self, message: str, history: list[tuple[str, str]]) -> str: + async def gradio_respond(self, message: str, history: list[tuple[str, str]]): + """ + Versione asincrona in streaming. + Produce (yield) aggiornamenti di stato e la risposta finale. + """ self.inputs.user_query = message pipeline = Pipeline(self.inputs) - response = pipeline.interact() + listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = [ # type: ignore + (PipelineEvent.QUERY_CHECK, lambda _: "🔍 Sto controllando la tua richiesta..."), + (PipelineEvent.INFO_RECOVERY, lambda _: "📊 Sto recuperando i dati (mercato, news, social)..."), + (PipelineEvent.REPORT_GENERATION, lambda _: "✍️ Sto scrivendo il report finale..."), + (PipelineEvent.TOOL_USED, lambda e: get_user_friendly_action(e.tool.tool_name)) + ] - self.history.append((message, response)) - return response + response = None + async for chunk in pipeline.interact_stream(listeners=listeners): + response = chunk # Salva l'ultimo chunk (che sarà la risposta finale) + yield response # Restituisce l'aggiornamento (o la risposta finale) a Gradio + + # Dopo che il generatore è completo, salva l'ultima risposta nello storico + if response: + self.history.append((message, response)) def gradio_save(self) -> str: self.save_chat("chat.json") @@ -72,7 +89,7 @@ class ChatManager: def gradio_build_interface(self) -> gr.Blocks: - with gr.Blocks() as interface: + with gr.Blocks(fill_height=True, fill_width=True) as interface: gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)") # --- Prepara le etichette di default per i dropdown diff --git a/src/app/interface/telegram.py b/src/app/interface/telegram.py index b720c43..3edc810 100644 --- a/src/app/interface/telegram.py +++ b/src/app/interface/telegram.py @@ -271,7 +271,7 @@ class TelegramApp: await bot.delete_message(chat_id=chat_id, message_id=update.message.id) def update_user(update_step: str = "") -> None: - if update_step: run_message.update_step(update_step) + if update_step: run_message.update_step_with_tool(update_step) else: run_message.update() message = run_message.get_latest() @@ -280,11 +280,11 @@ class TelegramApp: await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) pipeline = Pipeline(inputs) - report_content = await pipeline.interact_async(listeners=[ - (PipelineEvent.QUERY_CHECK, lambda _: update_user()), - (PipelineEvent.TOOL_USED, lambda e: update_user(e.tool.tool_name.replace('get_', '').replace("_", "\\_"))), - (PipelineEvent.INFO_RECOVERY, lambda _: update_user()), - (PipelineEvent.REPORT_GENERATION, lambda _: update_user()), + report_content = await pipeline.interact(listeners=[ + (PipelineEvent.QUERY_CHECK_END, lambda _: update_user()), + (PipelineEvent.TOOL_USED_END, lambda e: update_user(e.tool.tool_name.replace('get_', '').replace("_", "\\_"))), + (PipelineEvent.INFO_RECOVERY_END, lambda _: update_user()), + (PipelineEvent.REPORT_GENERATION_END, lambda _: update_user()), ]) # attach report file to the message