From 5297bf8a9a5e3b6654e35145588920f1d05bb8a3 Mon Sep 17 00:00:00 2001 From: trojanhorse47 Date: Thu, 30 Oct 2025 10:18:34 +0100 Subject: [PATCH 1/8] Update chat interface to fill height and width in Gradio blocks --- src/app/interface/chat.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/app/interface/chat.py b/src/app/interface/chat.py index 1f307e8..ca38ddf 100644 --- a/src/app/interface/chat.py +++ b/src/app/interface/chat.py @@ -72,7 +72,7 @@ class ChatManager: def gradio_build_interface(self) -> gr.Blocks: - with gr.Blocks() as interface: + with gr.Blocks(fill_height=True, fill_width=True) as interface: gr.Markdown("# πŸ€– Agente di Analisi e Consulenza Crypto (Chat)") # --- Prepara le etichette di default per i dropdown -- 2.49.1 From 0799a4ab080c85b741c6c3bd7a2762c40847ec62 Mon Sep 17 00:00:00 2001 From: trojanhorse47 Date: Thu, 30 Oct 2025 14:26:12 +0100 Subject: [PATCH 2/8] Implement asynchronous streaming for Gradio responses and enhance pipeline event handling --- src/app/agents/core.py | 9 ++-- src/app/agents/pipeline.py | 106 +++++++++++++++++++++++++++++++------ src/app/interface/chat.py | 18 +++++-- 3 files changed, 109 insertions(+), 24 deletions(-) diff --git a/src/app/agents/core.py b/src/app/agents/core.py index 147d92c..a855a74 100644 --- a/src/app/agents/core.py +++ b/src/app/agents/core.py @@ -45,28 +45,28 @@ class PipelineInputs: """ Sceglie il modello LLM da usare per l'analizzatore di query. """ - assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list." self.query_analyzer_model = self.configs.models.all_models[index] def choose_team_leader(self, index: int): """ Sceglie il modello LLM da usare per il Team Leader. """ - assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list." self.team_leader_model = self.configs.models.all_models[index] def choose_team(self, index: int): """ Sceglie il modello LLM da usare per il Team. """ - assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list." self.team_model = self.configs.models.all_models[index] def choose_report_generator(self, index: int): """ Sceglie il modello LLM da usare per il generatore di report. """ - assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list." self.report_generation_model = self.configs.models.all_models[index] def choose_strategy(self, index: int): @@ -111,6 +111,7 @@ class PipelineInputs: name="CryptoAnalysisTeam", tools=[ReasoningTools(), PlanMemoryTool(), CryptoSymbolsTools()], members=[market_agent, news_agent, social_agent], + stream_intermediate_steps=True ) def get_agent_query_checker(self) -> Agent: diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index 0498f90..f235ad5 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -84,6 +84,25 @@ class Pipeline: result = await self.run(workflow, query, events=events) return result + async def interact_stream(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []): + """ + Versione asincrona in streaming che ESEGUE (yield) la pipeline, + restituendo gli aggiornamenti di stato e il risultato finale. + """ + run_id = random.randint(1000, 9999) # Per tracciare i log + logging.info(f"[{run_id}] Pipeline query: {self.inputs.user_query}") + + events = [*PipelineEvent.get_log_events(run_id), *listeners] + query = QueryInputs( + user_query=self.inputs.user_query, + strategy=self.inputs.strategy.description + ) + + workflow = self.build_workflow() + + # Delega al classmethod 'run_stream' per lo streaming + async for item in self.run_stream(workflow, query, events=events): + yield item def build_workflow(self) -> Workflow: """ @@ -114,33 +133,88 @@ class Pipeline: ]) @classmethod - async def run(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], None]]]) -> str: + async def run(cls, workflow: Workflow, query: QueryInputs, + events: list[tuple[PipelineEvent, Callable[[Any], None]]]) -> str: """ - Esegue il workflow e gestisce gli eventi tramite le callback fornite. - Args: - workflow: istanza di Workflow da eseguire - query: query dell'utente da passare al workflow - events: dizionario di callback per eventi specifici (opzionale) - Returns: - La risposta generata dal workflow. + Esegue il workflow e gestisce gli eventi, restituendo solo il risultato finale. + Consuma il generatore 'run_stream'. + """ + final_result = "Errore durante l'esecuzione del workflow." + # Consuma il generatore e salva solo l'ultimo item + async for item in cls.run_stream(workflow, query, events): + final_result = item + + return final_result + + @classmethod + async def run_stream(cls, workflow: Workflow, query: QueryInputs, + events: list[tuple[PipelineEvent, Callable[[Any], None]]]): + """ + Esegue il workflow e restituisce gli eventi di stato e il risultato finale. """ iterator = await workflow.arun(query, stream=True, stream_intermediate_steps=True) - content = None + current_active_step = None + async for event in iterator: step_name = getattr(event, 'step_name', '') + + # 1. Chiama i listeners (per i log) for app_event, listener in events: if app_event.check_event(event.event, step_name): listener(event) - if event.event == WorkflowRunEvent.step_completed: + + # 2. Restituisce gli aggiornamenti di stato per Gradio + if event.event == WorkflowRunEvent.step_started.value: + current_active_step = step_name + if step_name == PipelineEvent.QUERY_CHECK.value: + yield "πŸ” Sto controllando la tua richiesta..." + elif step_name == PipelineEvent.INFO_RECOVERY.value: + yield "πŸ“Š Sto recuperando i dati (mercato, news, social)..." + elif step_name == PipelineEvent.REPORT_GENERATION.value: + yield "✍️ Sto scrivendo il report finale..." + + # Gestisce i tool usati da agenti singoli (come Query Check) + elif event.event == WorkflowRunEvent.step_output.value: + agent_event = event.content + if hasattr(agent_event, 'event') and agent_event.event == RunEvent.tool_call_completed.value: + tool_name = getattr(agent_event.tool, 'tool_name', 'uno strumento') + yield f"πŸ› οΈ Sto usando lo strumento: {tool_name}..." + + # Gestisce i tool usati da agenti interni al team (come CustomEvent) + elif event.event == WorkflowRunEvent.custom_event.value: + custom_content = getattr(event, 'content', None) + if custom_content and hasattr(custom_content, 'event'): + agent_event = custom_content + if agent_event.event == RunEvent.tool_call_completed.value: + if step_name == PipelineEvent.INFO_RECOVERY.value: + tool_name = getattr(agent_event.tool, 'tool_name', 'uno strumento') + yield f"πŸ› οΈ (Team) Sto usando lo strumento: {tool_name}..." + + # Gestisce gli eventi di tool promossi dal Team + elif event.event == PipelineEvent.TOOL_USED.value: + # Ci assicuriamo che l'evento provenga dallo step corretto + if current_active_step == PipelineEvent.INFO_RECOVERY.value: + tool_object = getattr(event, 'tool', None) + if tool_object: + tool_name = getattr(tool_object, 'tool_name', 'uno strumento') + yield f"πŸ› οΈ (Team) Sto usando lo strumento: {tool_name}..." + else: + yield f"πŸ› οΈ (Team) Sto usando uno strumento sconosciuto..." + + # 3. Salva il contenuto finale quando uno step Γ¨ completato + elif event.event == WorkflowRunEvent.step_completed.value: + current_active_step = None content = getattr(event, 'content', '') + # 4. Restituisce la risposta finale if content and isinstance(content, str): think_str = "" think = content.rfind(think_str) - return content[(think + len(think_str)):] if think != -1 else content - if content and isinstance(content, QueryOutputs): - return content.response - - logging.error(f"No output from workflow: {content}") - return "No output from workflow, something went wrong." + final_answer = content[(think + len(think_str)):] if think != -1 else content + yield final_answer + elif content and isinstance(content, QueryOutputs): + yield content.response + else: + logging.error(f"No output from workflow: {content}") + yield "Nessun output dal workflow, qualcosa Γ¨ andato storto." diff --git a/src/app/interface/chat.py b/src/app/interface/chat.py index ca38ddf..10d05da 100644 --- a/src/app/interface/chat.py +++ b/src/app/interface/chat.py @@ -49,13 +49,23 @@ class ChatManager: ######################################## # Funzioni Gradio ######################################## - def gradio_respond(self, message: str, history: list[tuple[str, str]]) -> str: + async def gradio_respond(self, message: str, history: list[tuple[str, str]]): + """ + Versione asincrona in streaming. + Produce (yield) aggiornamenti di stato e la risposta finale. + """ self.inputs.user_query = message pipeline = Pipeline(self.inputs) - response = pipeline.interact() - self.history.append((message, response)) - return response + response = None + # Itera sul nuovo generatore asincrono + async for chunk in pipeline.interact_stream(): + response = chunk # Salva l'ultimo chunk (che sarΓ  la risposta finale) + yield response # Restituisce l'aggiornamento (o la risposta finale) a Gradio + + # Dopo che il generatore Γ¨ completo, salva l'ultima risposta nello storico + if response: + self.history.append((message, response)) def gradio_save(self) -> str: self.save_chat("chat.json") -- 2.49.1 From fb74db374e43dab2ac47084e94d10cf7db80bd1b Mon Sep 17 00:00:00 2001 From: trojanhorse47 Date: Thu, 30 Oct 2025 16:34:25 +0100 Subject: [PATCH 3/8] Add missing newlines in PlanMemoryTool and SymbolsTool constructors --- src/app/agents/plan_memory_tool.py | 1 + src/app/api/tools/symbols_tool.py | 1 + 2 files changed, 2 insertions(+) diff --git a/src/app/agents/plan_memory_tool.py b/src/app/agents/plan_memory_tool.py index 93bbda5..7b78981 100644 --- a/src/app/agents/plan_memory_tool.py +++ b/src/app/agents/plan_memory_tool.py @@ -12,6 +12,7 @@ class Task(TypedDict): class PlanMemoryTool(Toolkit): def __init__(self): self.tasks: list[Task] = [] + Toolkit.__init__(self, # type: ignore[call-arg] instructions="Provides stateful, persistent memory for the Team Leader. " \ "This is your primary to-do list and state tracker. " \ diff --git a/src/app/api/tools/symbols_tool.py b/src/app/api/tools/symbols_tool.py index 90cee7e..9f3a97f 100644 --- a/src/app/api/tools/symbols_tool.py +++ b/src/app/api/tools/symbols_tool.py @@ -21,6 +21,7 @@ class CryptoSymbolsTools(Toolkit): def __init__(self, cache_file: str = 'resources/cryptos.csv'): self.cache_file = cache_file self.final_table = pd.read_csv(self.cache_file) if os.path.exists(self.cache_file) else pd.DataFrame() # type: ignore + Toolkit.__init__(self, # type: ignore name="Crypto Symbols Tool", instructions="Tool to get cryptocurrency symbols and search them by name.", -- 2.49.1 From 46fac8bbacadbcba0d3af0e73172d3dbf2f5475e Mon Sep 17 00:00:00 2001 From: trojanhorse47 Date: Thu, 30 Oct 2025 17:19:20 +0100 Subject: [PATCH 4/8] Refactor tool event handling to provide user-friendly messages and add utility function for descriptive tool actions --- src/app/agents/pipeline.py | 53 +++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index f235ad5..2ba03bb 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -174,33 +174,16 @@ class Pipeline: elif step_name == PipelineEvent.REPORT_GENERATION.value: yield "✍️ Sto scrivendo il report finale..." - # Gestisce i tool usati da agenti singoli (come Query Check) - elif event.event == WorkflowRunEvent.step_output.value: - agent_event = event.content - if hasattr(agent_event, 'event') and agent_event.event == RunEvent.tool_call_completed.value: - tool_name = getattr(agent_event.tool, 'tool_name', 'uno strumento') - yield f"πŸ› οΈ Sto usando lo strumento: {tool_name}..." - - # Gestisce i tool usati da agenti interni al team (come CustomEvent) - elif event.event == WorkflowRunEvent.custom_event.value: - custom_content = getattr(event, 'content', None) - if custom_content and hasattr(custom_content, 'event'): - agent_event = custom_content - if agent_event.event == RunEvent.tool_call_completed.value: - if step_name == PipelineEvent.INFO_RECOVERY.value: - tool_name = getattr(agent_event.tool, 'tool_name', 'uno strumento') - yield f"πŸ› οΈ (Team) Sto usando lo strumento: {tool_name}..." - # Gestisce gli eventi di tool promossi dal Team elif event.event == PipelineEvent.TOOL_USED.value: - # Ci assicuriamo che l'evento provenga dallo step corretto if current_active_step == PipelineEvent.INFO_RECOVERY.value: tool_object = getattr(event, 'tool', None) if tool_object: - tool_name = getattr(tool_object, 'tool_name', 'uno strumento') - yield f"πŸ› οΈ (Team) Sto usando lo strumento: {tool_name}..." + tool_name = getattr(tool_object, 'tool_name', 'uno strumento sconosciuto') + user_message = _get_user_friendly_action(tool_name) + yield f"{user_message}" else: - yield f"πŸ› οΈ (Team) Sto usando uno strumento sconosciuto..." + yield f"Sto usando uno strumento sconosciuto..." # 3. Salva il contenuto finale quando uno step Γ¨ completato elif event.event == WorkflowRunEvent.step_completed.value: @@ -218,3 +201,31 @@ class Pipeline: else: logging.error(f"No output from workflow: {content}") yield "Nessun output dal workflow, qualcosa Γ¨ andato storto." + +# Funzione di utilitΓ  per messaggi user-friendly +def _get_user_friendly_action(tool_name: str) -> str: + """ + Restituisce un messaggio leggibile e descrittivo per l'utente + in base al nome dello strumento o funzione invocata. + """ + descriptions = { + # --- MarketAPIsTool --- + "get_product": "πŸ” Recupero le informazioni sul prodotto richiesto...", + "get_products": "πŸ“¦ Recupero i dati su piΓΉ asset...", + "get_historical_prices": "πŸ“Š Recupero i dati storici dei prezzi...", + "get_products_aggregated": "🧩 Aggrego le informazioni da piΓΉ fonti...", + "get_historical_prices_aggregated": "πŸ“ˆ Creo uno storico aggregato dei prezzi...", + + # --- NewsAPIsTool (Aggiunto) --- + "get_top_headlines": "πŸ“° Cerco le notizie principali...", + "get_latest_news": "πŸ”Ž Cerco notizie recenti su un argomento...", + "get_top_headlines_aggregated": "πŸ—žοΈ Raccolgo le notizie principali da tutte le fonti...", + "get_latest_news_aggregated": "πŸ“š Raccolgo notizie specifiche da tutte le fonti...", + + # --- SocialAPIsTool (Aggiunto) --- + "get_top_crypto_posts": "πŸ“± Cerco i post piΓΉ popolari sui social...", + "get_top_crypto_posts_aggregated": "🌐 Raccolgo i post da tutte le piattaforme social...", + } + + # Messaggio di fallback generico + return descriptions.get(tool_name, f"βš™οΈ Eseguo l'operazione: {tool_name}...") \ No newline at end of file -- 2.49.1 From 765fc0ac723325d1928a8acb94fa0f48f6e65fda Mon Sep 17 00:00:00 2001 From: trojanhorse47 Date: Thu, 30 Oct 2025 17:32:52 +0100 Subject: [PATCH 5/8] Decouple friendly tool descriptions via registry --- src/app/agents/action_registry.py | 9 ++++++++ src/app/agents/pipeline.py | 38 ++++++++----------------------- src/app/api/tools/market_tool.py | 10 ++++++++ src/app/api/tools/news_tool.py | 9 ++++++++ src/app/api/tools/social_tool.py | 7 ++++++ 5 files changed, 45 insertions(+), 28 deletions(-) create mode 100644 src/app/agents/action_registry.py diff --git a/src/app/agents/action_registry.py b/src/app/agents/action_registry.py new file mode 100644 index 0000000..f9a8e2b --- /dev/null +++ b/src/app/agents/action_registry.py @@ -0,0 +1,9 @@ +# Registro popolato da tutti i file Toolkit presenti all'avvio. +ACTION_DESCRIPTIONS: dict[str, str] = {} + +def register_friendly_actions(actions: dict[str, str]): + """ + Aggiunge le descrizioni di un Toolkit al registro globale. + """ + global ACTION_DESCRIPTIONS + ACTION_DESCRIPTIONS.update(actions) \ No newline at end of file diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index 2ba03bb..038ac9b 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -8,11 +8,21 @@ from agno.run.workflow import WorkflowRunEvent from agno.workflow.types import StepInput, StepOutput from agno.workflow.step import Step from agno.workflow.workflow import Workflow + +from app.agents.action_registry import ACTION_DESCRIPTIONS from app.agents.core import * logging = logging.getLogger("pipeline") +def _get_user_friendly_action(tool_name: str) -> str: + """ + Restituisce un messaggio leggibile e descrittivo per l'utente + leggendo dal registro globale. + """ + # Usa il dizionario ACTION_DESCRIPTIONS importato + return ACTION_DESCRIPTIONS.get(tool_name, f"βš™οΈ Eseguo l'operazione: {tool_name}...") + class PipelineEvent(str, Enum): QUERY_CHECK = "Query Check" @@ -201,31 +211,3 @@ class Pipeline: else: logging.error(f"No output from workflow: {content}") yield "Nessun output dal workflow, qualcosa Γ¨ andato storto." - -# Funzione di utilitΓ  per messaggi user-friendly -def _get_user_friendly_action(tool_name: str) -> str: - """ - Restituisce un messaggio leggibile e descrittivo per l'utente - in base al nome dello strumento o funzione invocata. - """ - descriptions = { - # --- MarketAPIsTool --- - "get_product": "πŸ” Recupero le informazioni sul prodotto richiesto...", - "get_products": "πŸ“¦ Recupero i dati su piΓΉ asset...", - "get_historical_prices": "πŸ“Š Recupero i dati storici dei prezzi...", - "get_products_aggregated": "🧩 Aggrego le informazioni da piΓΉ fonti...", - "get_historical_prices_aggregated": "πŸ“ˆ Creo uno storico aggregato dei prezzi...", - - # --- NewsAPIsTool (Aggiunto) --- - "get_top_headlines": "πŸ“° Cerco le notizie principali...", - "get_latest_news": "πŸ”Ž Cerco notizie recenti su un argomento...", - "get_top_headlines_aggregated": "πŸ—žοΈ Raccolgo le notizie principali da tutte le fonti...", - "get_latest_news_aggregated": "πŸ“š Raccolgo notizie specifiche da tutte le fonti...", - - # --- SocialAPIsTool (Aggiunto) --- - "get_top_crypto_posts": "πŸ“± Cerco i post piΓΉ popolari sui social...", - "get_top_crypto_posts_aggregated": "🌐 Raccolgo i post da tutte le piattaforme social...", - } - - # Messaggio di fallback generico - return descriptions.get(tool_name, f"βš™οΈ Eseguo l'operazione: {tool_name}...") \ No newline at end of file diff --git a/src/app/api/tools/market_tool.py b/src/app/api/tools/market_tool.py index 649f9d4..5ee7bf7 100644 --- a/src/app/api/tools/market_tool.py +++ b/src/app/api/tools/market_tool.py @@ -1,4 +1,6 @@ from agno.tools import Toolkit + +from app.agents.action_registry import register_friendly_actions from app.api.wrapper_handler import WrapperHandler from app.api.core.markets import MarketWrapper, Price, ProductInfo from app.api.markets import BinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper, YFinanceWrapper @@ -127,3 +129,11 @@ class MarketAPIsTool(MarketWrapper, Toolkit): """ all_prices = self.handler.try_call_all(lambda w: w.get_historical_prices(asset_id, limit)) return Price.aggregate(all_prices) + +register_friendly_actions({ + "get_product": "πŸ” Recupero le informazioni sul prodotto richiesto...", + "get_products": "πŸ“¦ Recupero i dati su piΓΉ asset...", + "get_historical_prices": "πŸ“Š Recupero i dati storici dei prezzi...", + "get_products_aggregated": "🧩 Aggrego le informazioni da piΓΉ fonti...", + "get_historical_prices_aggregated": "πŸ“ˆ Creo uno storico aggregato dei prezzi...", +}) \ No newline at end of file diff --git a/src/app/api/tools/news_tool.py b/src/app/api/tools/news_tool.py index 88b5685..9fd6bed 100644 --- a/src/app/api/tools/news_tool.py +++ b/src/app/api/tools/news_tool.py @@ -1,4 +1,6 @@ from agno.tools import Toolkit + +from app.agents.action_registry import register_friendly_actions from app.api.wrapper_handler import WrapperHandler from app.api.core.news import NewsWrapper, Article from app.api.news import NewsApiWrapper, GoogleNewsWrapper, CryptoPanicWrapper, DuckDuckGoWrapper @@ -111,3 +113,10 @@ class NewsAPIsTool(NewsWrapper, Toolkit): Exception: If all providers fail to return results. """ return self.handler.try_call_all(lambda w: w.get_latest_news(query, limit)) + +register_friendly_actions({ + "get_top_headlines": "πŸ“° Cerco le notizie principali...", + "get_latest_news": "πŸ”Ž Cerco notizie recenti su un argomento...", + "get_top_headlines_aggregated": "πŸ—žοΈ Raccolgo le notizie principali da tutte le fonti...", + "get_latest_news_aggregated": "πŸ“š Raccolgo notizie specifiche da tutte le fonti...", +}) \ No newline at end of file diff --git a/src/app/api/tools/social_tool.py b/src/app/api/tools/social_tool.py index 044c7c3..c6c2b1f 100644 --- a/src/app/api/tools/social_tool.py +++ b/src/app/api/tools/social_tool.py @@ -1,4 +1,6 @@ from agno.tools import Toolkit + +from app.agents.action_registry import register_friendly_actions from app.api.wrapper_handler import WrapperHandler from app.api.core.social import SocialPost, SocialWrapper from app.api.social import * @@ -73,3 +75,8 @@ class SocialAPIsTool(SocialWrapper, Toolkit): Exception: If all providers fail to return results. """ return self.handler.try_call_all(lambda w: w.get_top_crypto_posts(limit_per_wrapper)) + +register_friendly_actions({ + "get_top_crypto_posts": "πŸ“± Cerco i post piΓΉ popolari sui social...", + "get_top_crypto_posts_aggregated": "🌐 Raccolgo i post da tutte le piattaforme social...", +}) \ No newline at end of file -- 2.49.1 From 2803263bed3c91dfa2564601e33c851fe2ec1ff7 Mon Sep 17 00:00:00 2001 From: Berack96 Date: Thu, 30 Oct 2025 22:06:29 +0100 Subject: [PATCH 6/8] Refactor - moved action_registry fnction to action_registry file - removed unnecesasry functions in pupeline and fixed events --- src/app/agents/action_registry.py | 10 ++- src/app/agents/core.py | 27 ++++-- src/app/agents/pipeline.py | 134 +++++++++--------------------- src/app/interface/chat.py | 13 ++- src/app/interface/telegram.py | 12 +-- 5 files changed, 83 insertions(+), 113 deletions(-) diff --git a/src/app/agents/action_registry.py b/src/app/agents/action_registry.py index f9a8e2b..ab13741 100644 --- a/src/app/agents/action_registry.py +++ b/src/app/agents/action_registry.py @@ -6,4 +6,12 @@ def register_friendly_actions(actions: dict[str, str]): Aggiunge le descrizioni di un Toolkit al registro globale. """ global ACTION_DESCRIPTIONS - ACTION_DESCRIPTIONS.update(actions) \ No newline at end of file + ACTION_DESCRIPTIONS.update(actions) + +def get_user_friendly_action(tool_name: str) -> str: + """ + Restituisce un messaggio leggibile e descrittivo per l'utente + leggendo dal registro globale. + """ + # Usa il dizionario ACTION_DESCRIPTIONS importato + return ACTION_DESCRIPTIONS.get(tool_name, f"βš™οΈ Eseguo l'operazione: {tool_name}...") diff --git a/src/app/agents/core.py b/src/app/agents/core.py index a855a74..3779b7c 100644 --- a/src/app/agents/core.py +++ b/src/app/agents/core.py @@ -155,12 +155,20 @@ class RunMessage: self.base_message = f"Running configurations: \n{prefix}{inputs}{suffix}\n\n" self.emojis = ['πŸ”³', '➑️', 'βœ…'] self.placeholder = '<<<>>>' + self.set_steps(["Query Check", "Info Recovery", "Report Generation"]) + + def set_steps(self, steps: list[str]) -> 'RunMessage': + """ + Inizializza gli step di esecuzione con lo stato iniziale. + Args: + steps (list[str]): Lista degli step da includere nel messaggio. + Returns: + RunMessage: L'istanza aggiornata di RunMessage. + """ + self.steps_total = [(f"{self.placeholder} {step}", 0) for step in steps] + self.steps_total[0] = (self.steps_total[0][0], 1) # Primo step in esecuzione self.current = 0 - self.steps_total = [ - (f"{self.placeholder} Query Check", 1), - (f"{self.placeholder} Info Recovery", 0), - (f"{self.placeholder} Report Generation", 0), - ] + return self def update(self) -> 'RunMessage': """ @@ -177,15 +185,15 @@ class RunMessage: self.steps_total[self.current] = (text_curr, state_curr + 1) return self - def update_step(self, text_extra: str = "") -> 'RunMessage': + def update_step_with_tool(self, tool_used: str = "") -> 'RunMessage': """ Aggiorna il messaggio per lo step corrente. Args: - text_extra (str, optional): Testo aggiuntivo da includere nello step. Defaults to "". + tool_used (str, optional): Testo aggiuntivo da includere nello step. Defaults to "". """ text_curr, state_curr = self.steps_total[self.current] - if text_extra: - text_curr = f"{text_curr.replace('β•š', 'β• ')}\nβ•šβ• {text_extra}" + if tool_used: + text_curr = f"{text_curr.replace('β•š', 'β• ')}\nβ•šβ• {tool_used}" self.steps_total[self.current] = (text_curr, state_curr) return self @@ -197,3 +205,4 @@ class RunMessage: """ steps = [msg.replace(self.placeholder, self.emojis[state]) for msg, state in self.steps_total] return self.base_message + "\n".join(steps) + diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index 038ac9b..370d5a6 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -1,50 +1,45 @@ -import asyncio from enum import Enum import logging import random -from typing import Any, Callable +from typing import Any, AsyncGenerator, Callable from agno.agent import RunEvent from agno.run.workflow import WorkflowRunEvent from agno.workflow.types import StepInput, StepOutput from agno.workflow.step import Step from agno.workflow.workflow import Workflow - -from app.agents.action_registry import ACTION_DESCRIPTIONS from app.agents.core import * logging = logging.getLogger("pipeline") -def _get_user_friendly_action(tool_name: str) -> str: - """ - Restituisce un messaggio leggibile e descrittivo per l'utente - leggendo dal registro globale. - """ - # Usa il dizionario ACTION_DESCRIPTIONS importato - return ACTION_DESCRIPTIONS.get(tool_name, f"βš™οΈ Eseguo l'operazione: {tool_name}...") - - class PipelineEvent(str, Enum): QUERY_CHECK = "Query Check" - QUERY_ANALYZER = "Query Analyzer" + QUERY_CHECK_END = "Query Check End" INFO_RECOVERY = "Info Recovery" + INFO_RECOVERY_END = "Info Recovery End" REPORT_GENERATION = "Report Generation" - REPORT_TRANSLATION = "Report Translation" - RUN_FINISHED = WorkflowRunEvent.workflow_completed.value - TOOL_USED = RunEvent.tool_call_completed.value + REPORT_GENERATION_END = "Report Generation End" + TOOL_USED = RunEvent.tool_call_started.value + TOOL_USED_END = RunEvent.tool_call_completed.value + RUN_END = WorkflowRunEvent.workflow_completed.value def check_event(self, event: str, step_name: str) -> bool: - return event == self.value or (WorkflowRunEvent.step_completed == event and step_name == self.value) + if event == self.value: + return True + + index = self.value.rfind(" End") + value = self.value[:index] if index > -1 else self.value + step_state = WorkflowRunEvent.step_completed if index > -1 else WorkflowRunEvent.step_started + return step_name == value and step_state == event @classmethod - def get_log_events(cls, run_id: int) -> list[tuple['PipelineEvent', Callable[[Any], None]]]: + def get_log_events(cls, run_id: int) -> list[tuple['PipelineEvent', Callable[[Any], str | None]]]: return [ - (PipelineEvent.QUERY_CHECK, lambda _: logging.info(f"[{run_id}] Query Check completed.")), - (PipelineEvent.QUERY_ANALYZER, lambda _: logging.info(f"[{run_id}] Query Analyzer completed.")), - (PipelineEvent.INFO_RECOVERY, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")), - (PipelineEvent.REPORT_GENERATION, lambda _: logging.info(f"[{run_id}] Report Generation completed.")), - (PipelineEvent.TOOL_USED, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")), - (PipelineEvent.RUN_FINISHED, lambda _: logging.info(f"[{run_id}] Run completed.")), + (PipelineEvent.QUERY_CHECK_END, lambda _: logging.info(f"[{run_id}] Query Check completed.")), + (PipelineEvent.INFO_RECOVERY_END, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")), + (PipelineEvent.REPORT_GENERATION_END, lambda _: logging.info(f"[{run_id}] Report Generation completed.")), + (PipelineEvent.TOOL_USED_END, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")), + (PipelineEvent.RUN_END, lambda _: logging.info(f"[{run_id}] Run completed.")), ] @@ -63,7 +58,7 @@ class Pipeline: """ self.inputs = inputs - def interact(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str: + async def interact(self, listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = []) -> str: """ Esegue la pipeline di agenti per rispondere alla query dell'utente. Args: @@ -71,9 +66,12 @@ class Pipeline: Returns: La risposta generata dalla pipeline. """ - return asyncio.run(self.interact_async(listeners)) + response = "" + async for chunk in self.interact_stream(listeners): + response = chunk + return response - async def interact_async(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str: + async def interact_stream(self, listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = []) -> AsyncGenerator[str, None]: """ Versione asincrona che esegue la pipeline di agenti per rispondere alla query dell'utente. Args: @@ -91,26 +89,6 @@ class Pipeline: ) workflow = self.build_workflow() - result = await self.run(workflow, query, events=events) - return result - - async def interact_stream(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []): - """ - Versione asincrona in streaming che ESEGUE (yield) la pipeline, - restituendo gli aggiornamenti di stato e il risultato finale. - """ - run_id = random.randint(1000, 9999) # Per tracciare i log - logging.info(f"[{run_id}] Pipeline query: {self.inputs.user_query}") - - events = [*PipelineEvent.get_log_events(run_id), *listeners] - query = QueryInputs( - user_query=self.inputs.user_query, - strategy=self.inputs.strategy.description - ) - - workflow = self.build_workflow() - - # Delega al classmethod 'run_stream' per lo streaming async for item in self.run_stream(workflow, query, events=events): yield item @@ -143,69 +121,37 @@ class Pipeline: ]) @classmethod - async def run(cls, workflow: Workflow, query: QueryInputs, - events: list[tuple[PipelineEvent, Callable[[Any], None]]]) -> str: - """ - Esegue il workflow e gestisce gli eventi, restituendo solo il risultato finale. - Consuma il generatore 'run_stream'. - """ - final_result = "Errore durante l'esecuzione del workflow." - # Consuma il generatore e salva solo l'ultimo item - async for item in cls.run_stream(workflow, query, events): - final_result = item - - return final_result - - @classmethod - async def run_stream(cls, workflow: Workflow, query: QueryInputs, - events: list[tuple[PipelineEvent, Callable[[Any], None]]]): + async def run_stream(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], str | None]]]) -> AsyncGenerator[str, None]: """ Esegue il workflow e restituisce gli eventi di stato e il risultato finale. + Args: + workflow: L'istanza di Workflow da eseguire. + query: Gli input della query. + events: La lista di eventi e callback da gestire durante l'esecuzione. + Yields: + Aggiornamenti di stato e la risposta finale generata dal workflow. """ iterator = await workflow.arun(query, stream=True, stream_intermediate_steps=True) content = None - current_active_step = None async for event in iterator: step_name = getattr(event, 'step_name', '') - # 1. Chiama i listeners (per i log) + # Chiama i listeners (se presenti) per ogni evento for app_event, listener in events: if app_event.check_event(event.event, step_name): - listener(event) + update = listener(event) + if update: yield update - # 2. Restituisce gli aggiornamenti di stato per Gradio - if event.event == WorkflowRunEvent.step_started.value: - current_active_step = step_name - if step_name == PipelineEvent.QUERY_CHECK.value: - yield "πŸ” Sto controllando la tua richiesta..." - elif step_name == PipelineEvent.INFO_RECOVERY.value: - yield "πŸ“Š Sto recuperando i dati (mercato, news, social)..." - elif step_name == PipelineEvent.REPORT_GENERATION.value: - yield "✍️ Sto scrivendo il report finale..." - - # Gestisce gli eventi di tool promossi dal Team - elif event.event == PipelineEvent.TOOL_USED.value: - if current_active_step == PipelineEvent.INFO_RECOVERY.value: - tool_object = getattr(event, 'tool', None) - if tool_object: - tool_name = getattr(tool_object, 'tool_name', 'uno strumento sconosciuto') - user_message = _get_user_friendly_action(tool_name) - yield f"{user_message}" - else: - yield f"Sto usando uno strumento sconosciuto..." - - # 3. Salva il contenuto finale quando uno step Γ¨ completato - elif event.event == WorkflowRunEvent.step_completed.value: - current_active_step = None + # Salva il contenuto finale quando uno step Γ¨ completato + if event.event == WorkflowRunEvent.step_completed.value: content = getattr(event, 'content', '') - # 4. Restituisce la risposta finale + # Restituisce la risposta finale if content and isinstance(content, str): think_str = "" think = content.rfind(think_str) - final_answer = content[(think + len(think_str)):] if think != -1 else content - yield final_answer + yield content[(think + len(think_str)):] if think != -1 else content elif content and isinstance(content, QueryOutputs): yield content.response else: diff --git a/src/app/interface/chat.py b/src/app/interface/chat.py index 10d05da..221b503 100644 --- a/src/app/interface/chat.py +++ b/src/app/interface/chat.py @@ -1,7 +1,9 @@ import os import json +from typing import Any, Callable import gradio as gr -from app.agents.pipeline import Pipeline, PipelineInputs +from app.agents.action_registry import get_user_friendly_action +from app.agents.pipeline import Pipeline, PipelineEvent, PipelineInputs class ChatManager: @@ -56,10 +58,15 @@ class ChatManager: """ self.inputs.user_query = message pipeline = Pipeline(self.inputs) + listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = [ + (PipelineEvent.QUERY_CHECK, lambda _: "πŸ” Sto controllando la tua richiesta..."), + (PipelineEvent.INFO_RECOVERY, lambda _: "πŸ“Š Sto recuperando i dati (mercato, news, social)..."), + (PipelineEvent.REPORT_GENERATION, lambda _: "✍️ Sto scrivendo il report finale..."), + (PipelineEvent.TOOL_USED, lambda e: get_user_friendly_action(e.tool.tool_name)) + ] response = None - # Itera sul nuovo generatore asincrono - async for chunk in pipeline.interact_stream(): + async for chunk in pipeline.interact_stream(listeners=listeners): response = chunk # Salva l'ultimo chunk (che sarΓ  la risposta finale) yield response # Restituisce l'aggiornamento (o la risposta finale) a Gradio diff --git a/src/app/interface/telegram.py b/src/app/interface/telegram.py index b720c43..3edc810 100644 --- a/src/app/interface/telegram.py +++ b/src/app/interface/telegram.py @@ -271,7 +271,7 @@ class TelegramApp: await bot.delete_message(chat_id=chat_id, message_id=update.message.id) def update_user(update_step: str = "") -> None: - if update_step: run_message.update_step(update_step) + if update_step: run_message.update_step_with_tool(update_step) else: run_message.update() message = run_message.get_latest() @@ -280,11 +280,11 @@ class TelegramApp: await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) pipeline = Pipeline(inputs) - report_content = await pipeline.interact_async(listeners=[ - (PipelineEvent.QUERY_CHECK, lambda _: update_user()), - (PipelineEvent.TOOL_USED, lambda e: update_user(e.tool.tool_name.replace('get_', '').replace("_", "\\_"))), - (PipelineEvent.INFO_RECOVERY, lambda _: update_user()), - (PipelineEvent.REPORT_GENERATION, lambda _: update_user()), + report_content = await pipeline.interact(listeners=[ + (PipelineEvent.QUERY_CHECK_END, lambda _: update_user()), + (PipelineEvent.TOOL_USED_END, lambda e: update_user(e.tool.tool_name.replace('get_', '').replace("_", "\\_"))), + (PipelineEvent.INFO_RECOVERY_END, lambda _: update_user()), + (PipelineEvent.REPORT_GENERATION_END, lambda _: update_user()), ]) # attach report file to the message -- 2.49.1 From 0598e587084e40f265192b4850e41eb5bb95d969 Mon Sep 17 00:00:00 2001 From: trojanhorse47 Date: Fri, 31 Oct 2025 13:55:59 +0100 Subject: [PATCH 7/8] Refactor action registration to use a decorator for user-friendly descriptions --- src/app/agents/action_registry.py | 33 +++++++++++++++++++++++-------- src/app/agents/core.py | 10 ++++++---- src/app/agents/pipeline.py | 4 ++-- src/app/api/tools/market_tool.py | 15 ++++++-------- src/app/api/tools/news_tool.py | 13 +++++------- src/app/api/tools/social_tool.py | 9 +++------ src/app/interface/chat.py | 2 +- 7 files changed, 48 insertions(+), 38 deletions(-) diff --git a/src/app/agents/action_registry.py b/src/app/agents/action_registry.py index ab13741..683989e 100644 --- a/src/app/agents/action_registry.py +++ b/src/app/agents/action_registry.py @@ -1,13 +1,6 @@ -# Registro popolato da tutti i file Toolkit presenti all'avvio. +# Registro centrale popolato da tutti i file Toolkit all'avvio. ACTION_DESCRIPTIONS: dict[str, str] = {} -def register_friendly_actions(actions: dict[str, str]): - """ - Aggiunge le descrizioni di un Toolkit al registro globale. - """ - global ACTION_DESCRIPTIONS - ACTION_DESCRIPTIONS.update(actions) - def get_user_friendly_action(tool_name: str) -> str: """ Restituisce un messaggio leggibile e descrittivo per l'utente @@ -15,3 +8,27 @@ def get_user_friendly_action(tool_name: str) -> str: """ # Usa il dizionario ACTION_DESCRIPTIONS importato return ACTION_DESCRIPTIONS.get(tool_name, f"βš™οΈ Eseguo l'operazione: {tool_name}...") + +def friendly_action(description: str): + """ + Decoratore che registra automaticamente la descrizione "user-friendly" + di un metodo nel registro globale. + + Questo decoratore viene eseguito all'avvio dell'app (quando i file + vengono importati) e popola il dizionario ACTION_DESCRIPTIONS. + + Restituisce la funzione originale non modificata. + """ + + def decorator(func): + # Registra l'azione + tool_name = func.__name__ + if tool_name in ACTION_DESCRIPTIONS: + print(f"Attenzione: Azione '{tool_name}' registrata piΓΉ volte.") + + ACTION_DESCRIPTIONS[tool_name] = description + + # Restituisce la funzione originale + return func + + return decorator \ No newline at end of file diff --git a/src/app/agents/core.py b/src/app/agents/core.py index 3779b7c..b801216 100644 --- a/src/app/agents/core.py +++ b/src/app/agents/core.py @@ -146,12 +146,14 @@ class RunMessage: - In esecuzione (➑️) - Completato (βœ…) - Lo stato di esecuzione puΓ² essere assegnato solo ad uno step alla volta. + Lo stato di esecuzione puΓ² essere assegnato solo a uno step alla volta. Args: - inputs (PipelineInputs): Input della pipeline per mostrare la configurazione. - prefix (str, optional): Prefisso del messaggio. Defaults to "". - suffix (str, optional): Suffisso del messaggio. Defaults to "". + inputs (PipelineInputs): Input della pipeline per mostrare la configurazione + prefix (str, optional): Prefisso del messaggio. Defaults to "" + suffix (str, optional): Suffisso del messaggio. Defaults to "" """ + self.current = None + self.steps_total = None self.base_message = f"Running configurations: \n{prefix}{inputs}{suffix}\n\n" self.emojis = ['πŸ”³', '➑️', 'βœ…'] self.placeholder = '<<<>>>' diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index 370d5a6..b684aa6 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -125,8 +125,8 @@ class Pipeline: """ Esegue il workflow e restituisce gli eventi di stato e il risultato finale. Args: - workflow: L'istanza di Workflow da eseguire. - query: Gli input della query. + workflow: L'istanza di Workflow da eseguire + query: Gli input della query events: La lista di eventi e callback da gestire durante l'esecuzione. Yields: Aggiornamenti di stato e la risposta finale generata dal workflow. diff --git a/src/app/api/tools/market_tool.py b/src/app/api/tools/market_tool.py index 5ee7bf7..9531899 100644 --- a/src/app/api/tools/market_tool.py +++ b/src/app/api/tools/market_tool.py @@ -1,6 +1,6 @@ from agno.tools import Toolkit -from app.agents.action_registry import register_friendly_actions +from app.agents.action_registry import friendly_action from app.api.wrapper_handler import WrapperHandler from app.api.core.markets import MarketWrapper, Price, ProductInfo from app.api.markets import BinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper, YFinanceWrapper @@ -40,6 +40,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): ], ) + @friendly_action("πŸ” Recupero le informazioni sul prodotto richiesto...") def get_product(self, asset_id: str) -> ProductInfo: """ Gets product information for a *single* asset from the *first available* provider. @@ -56,6 +57,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): """ return self.handler.try_call(lambda w: w.get_product(asset_id)) + @friendly_action("πŸ“¦ Recupero i dati su piΓΉ asset...") def get_products(self, asset_ids: list[str]) -> list[ProductInfo]: """ Gets product information for a *list* of assets from the *first available* provider. @@ -72,6 +74,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): """ return self.handler.try_call(lambda w: w.get_products(asset_ids)) + @friendly_action("πŸ“Š Recupero i dati storici dei prezzi...") def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]: """ Gets historical price data for a *single* asset from the *first available* provider. @@ -89,6 +92,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): """ return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit)) + @friendly_action("🧩 Aggrego le informazioni da piΓΉ fonti...") def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]: """ Gets product information for multiple assets from *all available providers* and *aggregates* the results. @@ -109,6 +113,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids)) return ProductInfo.aggregate(all_products) + @friendly_action("πŸ“ˆ Creo uno storico aggregato dei prezzi...") def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]: """ Gets historical price data for a single asset from *all available providers* and *aggregates* the results. @@ -129,11 +134,3 @@ class MarketAPIsTool(MarketWrapper, Toolkit): """ all_prices = self.handler.try_call_all(lambda w: w.get_historical_prices(asset_id, limit)) return Price.aggregate(all_prices) - -register_friendly_actions({ - "get_product": "πŸ” Recupero le informazioni sul prodotto richiesto...", - "get_products": "πŸ“¦ Recupero i dati su piΓΉ asset...", - "get_historical_prices": "πŸ“Š Recupero i dati storici dei prezzi...", - "get_products_aggregated": "🧩 Aggrego le informazioni da piΓΉ fonti...", - "get_historical_prices_aggregated": "πŸ“ˆ Creo uno storico aggregato dei prezzi...", -}) \ No newline at end of file diff --git a/src/app/api/tools/news_tool.py b/src/app/api/tools/news_tool.py index 9fd6bed..ec94481 100644 --- a/src/app/api/tools/news_tool.py +++ b/src/app/api/tools/news_tool.py @@ -1,6 +1,6 @@ from agno.tools import Toolkit -from app.agents.action_registry import register_friendly_actions +from app.agents.action_registry import friendly_action from app.api.wrapper_handler import WrapperHandler from app.api.core.news import NewsWrapper, Article from app.api.news import NewsApiWrapper, GoogleNewsWrapper, CryptoPanicWrapper, DuckDuckGoWrapper @@ -42,6 +42,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit): ], ) + @friendly_action("πŸ“° Cerco le notizie principali...") def get_top_headlines(self, limit: int = 100) -> list[Article]: """ Retrieves top headlines from the *first available* news provider. @@ -58,6 +59,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit): """ return self.handler.try_call(lambda w: w.get_top_headlines(limit)) + @friendly_action("πŸ”Ž Cerco notizie recenti sull'argomento...") def get_latest_news(self, query: str, limit: int = 100) -> list[Article]: """ Searches for the latest news on a specific topic from the *first available* provider. @@ -75,6 +77,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit): """ return self.handler.try_call(lambda w: w.get_latest_news(query, limit)) + @friendly_action("πŸ—žοΈ Raccolgo le notizie principali da tutte le fonti...") def get_top_headlines_aggregated(self, limit: int = 100) -> dict[str, list[Article]]: """ Retrieves top headlines from *all available providers* and aggregates the results. @@ -94,6 +97,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit): """ return self.handler.try_call_all(lambda w: w.get_top_headlines(limit)) + @friendly_action("πŸ“š Raccolgo notizie specifiche da tutte le fonti...") def get_latest_news_aggregated(self, query: str, limit: int = 100) -> dict[str, list[Article]]: """ Searches for news on a specific topic from *all available providers* and aggregates the results. @@ -113,10 +117,3 @@ class NewsAPIsTool(NewsWrapper, Toolkit): Exception: If all providers fail to return results. """ return self.handler.try_call_all(lambda w: w.get_latest_news(query, limit)) - -register_friendly_actions({ - "get_top_headlines": "πŸ“° Cerco le notizie principali...", - "get_latest_news": "πŸ”Ž Cerco notizie recenti su un argomento...", - "get_top_headlines_aggregated": "πŸ—žοΈ Raccolgo le notizie principali da tutte le fonti...", - "get_latest_news_aggregated": "πŸ“š Raccolgo notizie specifiche da tutte le fonti...", -}) \ No newline at end of file diff --git a/src/app/api/tools/social_tool.py b/src/app/api/tools/social_tool.py index c6c2b1f..25346c9 100644 --- a/src/app/api/tools/social_tool.py +++ b/src/app/api/tools/social_tool.py @@ -1,6 +1,6 @@ from agno.tools import Toolkit -from app.agents.action_registry import register_friendly_actions +from app.agents.action_registry import friendly_action from app.api.wrapper_handler import WrapperHandler from app.api.core.social import SocialPost, SocialWrapper from app.api.social import * @@ -41,6 +41,7 @@ class SocialAPIsTool(SocialWrapper, Toolkit): ], ) + @friendly_action("πŸ“± Cerco i post piΓΉ popolari sui social...") def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]: """ Retrieves top cryptocurrency-related posts from the *first available* social media provider. @@ -57,6 +58,7 @@ class SocialAPIsTool(SocialWrapper, Toolkit): """ return self.handler.try_call(lambda w: w.get_top_crypto_posts(limit)) + @friendly_action("🌐 Raccolgo i post da tutte le piattaforme social...") def get_top_crypto_posts_aggregated(self, limit_per_wrapper: int = 5) -> dict[str, list[SocialPost]]: """ Retrieves top cryptocurrency-related posts from *all available providers* and aggregates the results. @@ -75,8 +77,3 @@ class SocialAPIsTool(SocialWrapper, Toolkit): Exception: If all providers fail to return results. """ return self.handler.try_call_all(lambda w: w.get_top_crypto_posts(limit_per_wrapper)) - -register_friendly_actions({ - "get_top_crypto_posts": "πŸ“± Cerco i post piΓΉ popolari sui social...", - "get_top_crypto_posts_aggregated": "🌐 Raccolgo i post da tutte le piattaforme social...", -}) \ No newline at end of file diff --git a/src/app/interface/chat.py b/src/app/interface/chat.py index 221b503..e52a203 100644 --- a/src/app/interface/chat.py +++ b/src/app/interface/chat.py @@ -58,7 +58,7 @@ class ChatManager: """ self.inputs.user_query = message pipeline = Pipeline(self.inputs) - listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = [ + listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = [ # type: ignore (PipelineEvent.QUERY_CHECK, lambda _: "πŸ” Sto controllando la tua richiesta..."), (PipelineEvent.INFO_RECOVERY, lambda _: "πŸ“Š Sto recuperando i dati (mercato, news, social)..."), (PipelineEvent.REPORT_GENERATION, lambda _: "✍️ Sto scrivendo il report finale..."), -- 2.49.1 From 0a0033c47b626db4fc47e9bd6e1ed48dd7acffa5 Mon Sep 17 00:00:00 2001 From: Berack96 Date: Fri, 31 Oct 2025 14:23:18 +0100 Subject: [PATCH 8/8] fix typing hint --- src/app/agents/action_registry.py | 9 ++++++--- src/app/agents/core.py | 5 ++--- src/app/agents/pipeline.py | 3 ++- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/app/agents/action_registry.py b/src/app/agents/action_registry.py index 683989e..2d04d09 100644 --- a/src/app/agents/action_registry.py +++ b/src/app/agents/action_registry.py @@ -1,3 +1,6 @@ +from typing import Any, Callable + + # Registro centrale popolato da tutti i file Toolkit all'avvio. ACTION_DESCRIPTIONS: dict[str, str] = {} @@ -9,7 +12,7 @@ def get_user_friendly_action(tool_name: str) -> str: # Usa il dizionario ACTION_DESCRIPTIONS importato return ACTION_DESCRIPTIONS.get(tool_name, f"βš™οΈ Eseguo l'operazione: {tool_name}...") -def friendly_action(description: str): +def friendly_action(description: str) -> Callable[..., Any]: """ Decoratore che registra automaticamente la descrizione "user-friendly" di un metodo nel registro globale. @@ -20,7 +23,7 @@ def friendly_action(description: str): Restituisce la funzione originale non modificata. """ - def decorator(func): + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: # Registra l'azione tool_name = func.__name__ if tool_name in ACTION_DESCRIPTIONS: @@ -31,4 +34,4 @@ def friendly_action(description: str): # Restituisce la funzione originale return func - return decorator \ No newline at end of file + return decorator diff --git a/src/app/agents/core.py b/src/app/agents/core.py index 29c0572..28deb7d 100644 --- a/src/app/agents/core.py +++ b/src/app/agents/core.py @@ -111,7 +111,6 @@ class PipelineInputs: name="CryptoAnalysisTeam", tools=[ReasoningTools(), PlanMemoryTool(), CryptoSymbolsTools()], members=[market_agent, news_agent, social_agent], - stream_intermediate_steps=True ) def get_agent_query_checker(self) -> Agent: @@ -152,11 +151,11 @@ class RunMessage: prefix (str, optional): Prefisso del messaggio. Defaults to "" suffix (str, optional): Suffisso del messaggio. Defaults to "" """ - self.current = None - self.steps_total = None self.base_message = f"Running configurations: \n{prefix}{inputs}{suffix}\n\n" self.emojis = ['πŸ”³', '➑️', 'βœ…'] self.placeholder = '<<<>>>' + self.current = 0 + self.steps_total: list[tuple[str, int]] = [] self.set_steps(["Query Check", "Info Recovery", "Report Generation"]) def set_steps(self, steps: list[str]) -> 'RunMessage': diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index b684aa6..d69d7f1 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -106,7 +106,8 @@ class Pipeline: # Step 2: Crea gli steps def condition_query_ok(step_input: StepInput) -> StepOutput: val = step_input.previous_step_content - return StepOutput(stop=not val.is_crypto) if isinstance(val, QueryOutputs) else StepOutput(stop=True) + stop = (not val.is_crypto) if isinstance(val, QueryOutputs) else True + return StepOutput(stop=stop) query_check = Step(name=PipelineEvent.QUERY_CHECK, agent=query_check) info_recovery = Step(name=PipelineEvent.INFO_RECOVERY, team=team) -- 2.49.1