diff --git a/src/app/agents/action_registry.py b/src/app/agents/action_registry.py index f9a8e2b..ab13741 100644 --- a/src/app/agents/action_registry.py +++ b/src/app/agents/action_registry.py @@ -6,4 +6,12 @@ def register_friendly_actions(actions: dict[str, str]): Aggiunge le descrizioni di un Toolkit al registro globale. """ global ACTION_DESCRIPTIONS - ACTION_DESCRIPTIONS.update(actions) \ No newline at end of file + ACTION_DESCRIPTIONS.update(actions) + +def get_user_friendly_action(tool_name: str) -> str: + """ + Restituisce un messaggio leggibile e descrittivo per l'utente + leggendo dal registro globale. + """ + # Usa il dizionario ACTION_DESCRIPTIONS importato + return ACTION_DESCRIPTIONS.get(tool_name, f"βš™οΈ Eseguo l'operazione: {tool_name}...") diff --git a/src/app/agents/core.py b/src/app/agents/core.py index a855a74..3779b7c 100644 --- a/src/app/agents/core.py +++ b/src/app/agents/core.py @@ -155,12 +155,20 @@ class RunMessage: self.base_message = f"Running configurations: \n{prefix}{inputs}{suffix}\n\n" self.emojis = ['πŸ”³', '➑️', 'βœ…'] self.placeholder = '<<<>>>' + self.set_steps(["Query Check", "Info Recovery", "Report Generation"]) + + def set_steps(self, steps: list[str]) -> 'RunMessage': + """ + Inizializza gli step di esecuzione con lo stato iniziale. + Args: + steps (list[str]): Lista degli step da includere nel messaggio. + Returns: + RunMessage: L'istanza aggiornata di RunMessage. + """ + self.steps_total = [(f"{self.placeholder} {step}", 0) for step in steps] + self.steps_total[0] = (self.steps_total[0][0], 1) # Primo step in esecuzione self.current = 0 - self.steps_total = [ - (f"{self.placeholder} Query Check", 1), - (f"{self.placeholder} Info Recovery", 0), - (f"{self.placeholder} Report Generation", 0), - ] + return self def update(self) -> 'RunMessage': """ @@ -177,15 +185,15 @@ class RunMessage: self.steps_total[self.current] = (text_curr, state_curr + 1) return self - def update_step(self, text_extra: str = "") -> 'RunMessage': + def update_step_with_tool(self, tool_used: str = "") -> 'RunMessage': """ Aggiorna il messaggio per lo step corrente. Args: - text_extra (str, optional): Testo aggiuntivo da includere nello step. Defaults to "". + tool_used (str, optional): Testo aggiuntivo da includere nello step. Defaults to "". """ text_curr, state_curr = self.steps_total[self.current] - if text_extra: - text_curr = f"{text_curr.replace('β•š', 'β• ')}\nβ•šβ• {text_extra}" + if tool_used: + text_curr = f"{text_curr.replace('β•š', 'β• ')}\nβ•šβ• {tool_used}" self.steps_total[self.current] = (text_curr, state_curr) return self @@ -197,3 +205,4 @@ class RunMessage: """ steps = [msg.replace(self.placeholder, self.emojis[state]) for msg, state in self.steps_total] return self.base_message + "\n".join(steps) + diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index 038ac9b..370d5a6 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -1,50 +1,45 @@ -import asyncio from enum import Enum import logging import random -from typing import Any, Callable +from typing import Any, AsyncGenerator, Callable from agno.agent import RunEvent from agno.run.workflow import WorkflowRunEvent from agno.workflow.types import StepInput, StepOutput from agno.workflow.step import Step from agno.workflow.workflow import Workflow - -from app.agents.action_registry import ACTION_DESCRIPTIONS from app.agents.core import * logging = logging.getLogger("pipeline") -def _get_user_friendly_action(tool_name: str) -> str: - """ - Restituisce un messaggio leggibile e descrittivo per l'utente - leggendo dal registro globale. - """ - # Usa il dizionario ACTION_DESCRIPTIONS importato - return ACTION_DESCRIPTIONS.get(tool_name, f"βš™οΈ Eseguo l'operazione: {tool_name}...") - - class PipelineEvent(str, Enum): QUERY_CHECK = "Query Check" - QUERY_ANALYZER = "Query Analyzer" + QUERY_CHECK_END = "Query Check End" INFO_RECOVERY = "Info Recovery" + INFO_RECOVERY_END = "Info Recovery End" REPORT_GENERATION = "Report Generation" - REPORT_TRANSLATION = "Report Translation" - RUN_FINISHED = WorkflowRunEvent.workflow_completed.value - TOOL_USED = RunEvent.tool_call_completed.value + REPORT_GENERATION_END = "Report Generation End" + TOOL_USED = RunEvent.tool_call_started.value + TOOL_USED_END = RunEvent.tool_call_completed.value + RUN_END = WorkflowRunEvent.workflow_completed.value def check_event(self, event: str, step_name: str) -> bool: - return event == self.value or (WorkflowRunEvent.step_completed == event and step_name == self.value) + if event == self.value: + return True + + index = self.value.rfind(" End") + value = self.value[:index] if index > -1 else self.value + step_state = WorkflowRunEvent.step_completed if index > -1 else WorkflowRunEvent.step_started + return step_name == value and step_state == event @classmethod - def get_log_events(cls, run_id: int) -> list[tuple['PipelineEvent', Callable[[Any], None]]]: + def get_log_events(cls, run_id: int) -> list[tuple['PipelineEvent', Callable[[Any], str | None]]]: return [ - (PipelineEvent.QUERY_CHECK, lambda _: logging.info(f"[{run_id}] Query Check completed.")), - (PipelineEvent.QUERY_ANALYZER, lambda _: logging.info(f"[{run_id}] Query Analyzer completed.")), - (PipelineEvent.INFO_RECOVERY, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")), - (PipelineEvent.REPORT_GENERATION, lambda _: logging.info(f"[{run_id}] Report Generation completed.")), - (PipelineEvent.TOOL_USED, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")), - (PipelineEvent.RUN_FINISHED, lambda _: logging.info(f"[{run_id}] Run completed.")), + (PipelineEvent.QUERY_CHECK_END, lambda _: logging.info(f"[{run_id}] Query Check completed.")), + (PipelineEvent.INFO_RECOVERY_END, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")), + (PipelineEvent.REPORT_GENERATION_END, lambda _: logging.info(f"[{run_id}] Report Generation completed.")), + (PipelineEvent.TOOL_USED_END, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")), + (PipelineEvent.RUN_END, lambda _: logging.info(f"[{run_id}] Run completed.")), ] @@ -63,7 +58,7 @@ class Pipeline: """ self.inputs = inputs - def interact(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str: + async def interact(self, listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = []) -> str: """ Esegue la pipeline di agenti per rispondere alla query dell'utente. Args: @@ -71,9 +66,12 @@ class Pipeline: Returns: La risposta generata dalla pipeline. """ - return asyncio.run(self.interact_async(listeners)) + response = "" + async for chunk in self.interact_stream(listeners): + response = chunk + return response - async def interact_async(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str: + async def interact_stream(self, listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = []) -> AsyncGenerator[str, None]: """ Versione asincrona che esegue la pipeline di agenti per rispondere alla query dell'utente. Args: @@ -91,26 +89,6 @@ class Pipeline: ) workflow = self.build_workflow() - result = await self.run(workflow, query, events=events) - return result - - async def interact_stream(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []): - """ - Versione asincrona in streaming che ESEGUE (yield) la pipeline, - restituendo gli aggiornamenti di stato e il risultato finale. - """ - run_id = random.randint(1000, 9999) # Per tracciare i log - logging.info(f"[{run_id}] Pipeline query: {self.inputs.user_query}") - - events = [*PipelineEvent.get_log_events(run_id), *listeners] - query = QueryInputs( - user_query=self.inputs.user_query, - strategy=self.inputs.strategy.description - ) - - workflow = self.build_workflow() - - # Delega al classmethod 'run_stream' per lo streaming async for item in self.run_stream(workflow, query, events=events): yield item @@ -143,69 +121,37 @@ class Pipeline: ]) @classmethod - async def run(cls, workflow: Workflow, query: QueryInputs, - events: list[tuple[PipelineEvent, Callable[[Any], None]]]) -> str: - """ - Esegue il workflow e gestisce gli eventi, restituendo solo il risultato finale. - Consuma il generatore 'run_stream'. - """ - final_result = "Errore durante l'esecuzione del workflow." - # Consuma il generatore e salva solo l'ultimo item - async for item in cls.run_stream(workflow, query, events): - final_result = item - - return final_result - - @classmethod - async def run_stream(cls, workflow: Workflow, query: QueryInputs, - events: list[tuple[PipelineEvent, Callable[[Any], None]]]): + async def run_stream(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], str | None]]]) -> AsyncGenerator[str, None]: """ Esegue il workflow e restituisce gli eventi di stato e il risultato finale. + Args: + workflow: L'istanza di Workflow da eseguire. + query: Gli input della query. + events: La lista di eventi e callback da gestire durante l'esecuzione. + Yields: + Aggiornamenti di stato e la risposta finale generata dal workflow. """ iterator = await workflow.arun(query, stream=True, stream_intermediate_steps=True) content = None - current_active_step = None async for event in iterator: step_name = getattr(event, 'step_name', '') - # 1. Chiama i listeners (per i log) + # Chiama i listeners (se presenti) per ogni evento for app_event, listener in events: if app_event.check_event(event.event, step_name): - listener(event) + update = listener(event) + if update: yield update - # 2. Restituisce gli aggiornamenti di stato per Gradio - if event.event == WorkflowRunEvent.step_started.value: - current_active_step = step_name - if step_name == PipelineEvent.QUERY_CHECK.value: - yield "πŸ” Sto controllando la tua richiesta..." - elif step_name == PipelineEvent.INFO_RECOVERY.value: - yield "πŸ“Š Sto recuperando i dati (mercato, news, social)..." - elif step_name == PipelineEvent.REPORT_GENERATION.value: - yield "✍️ Sto scrivendo il report finale..." - - # Gestisce gli eventi di tool promossi dal Team - elif event.event == PipelineEvent.TOOL_USED.value: - if current_active_step == PipelineEvent.INFO_RECOVERY.value: - tool_object = getattr(event, 'tool', None) - if tool_object: - tool_name = getattr(tool_object, 'tool_name', 'uno strumento sconosciuto') - user_message = _get_user_friendly_action(tool_name) - yield f"{user_message}" - else: - yield f"Sto usando uno strumento sconosciuto..." - - # 3. Salva il contenuto finale quando uno step Γ¨ completato - elif event.event == WorkflowRunEvent.step_completed.value: - current_active_step = None + # Salva il contenuto finale quando uno step Γ¨ completato + if event.event == WorkflowRunEvent.step_completed.value: content = getattr(event, 'content', '') - # 4. Restituisce la risposta finale + # Restituisce la risposta finale if content and isinstance(content, str): think_str = "" think = content.rfind(think_str) - final_answer = content[(think + len(think_str)):] if think != -1 else content - yield final_answer + yield content[(think + len(think_str)):] if think != -1 else content elif content and isinstance(content, QueryOutputs): yield content.response else: diff --git a/src/app/interface/chat.py b/src/app/interface/chat.py index 10d05da..221b503 100644 --- a/src/app/interface/chat.py +++ b/src/app/interface/chat.py @@ -1,7 +1,9 @@ import os import json +from typing import Any, Callable import gradio as gr -from app.agents.pipeline import Pipeline, PipelineInputs +from app.agents.action_registry import get_user_friendly_action +from app.agents.pipeline import Pipeline, PipelineEvent, PipelineInputs class ChatManager: @@ -56,10 +58,15 @@ class ChatManager: """ self.inputs.user_query = message pipeline = Pipeline(self.inputs) + listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = [ + (PipelineEvent.QUERY_CHECK, lambda _: "πŸ” Sto controllando la tua richiesta..."), + (PipelineEvent.INFO_RECOVERY, lambda _: "πŸ“Š Sto recuperando i dati (mercato, news, social)..."), + (PipelineEvent.REPORT_GENERATION, lambda _: "✍️ Sto scrivendo il report finale..."), + (PipelineEvent.TOOL_USED, lambda e: get_user_friendly_action(e.tool.tool_name)) + ] response = None - # Itera sul nuovo generatore asincrono - async for chunk in pipeline.interact_stream(): + async for chunk in pipeline.interact_stream(listeners=listeners): response = chunk # Salva l'ultimo chunk (che sarΓ  la risposta finale) yield response # Restituisce l'aggiornamento (o la risposta finale) a Gradio diff --git a/src/app/interface/telegram.py b/src/app/interface/telegram.py index b720c43..3edc810 100644 --- a/src/app/interface/telegram.py +++ b/src/app/interface/telegram.py @@ -271,7 +271,7 @@ class TelegramApp: await bot.delete_message(chat_id=chat_id, message_id=update.message.id) def update_user(update_step: str = "") -> None: - if update_step: run_message.update_step(update_step) + if update_step: run_message.update_step_with_tool(update_step) else: run_message.update() message = run_message.get_latest() @@ -280,11 +280,11 @@ class TelegramApp: await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) pipeline = Pipeline(inputs) - report_content = await pipeline.interact_async(listeners=[ - (PipelineEvent.QUERY_CHECK, lambda _: update_user()), - (PipelineEvent.TOOL_USED, lambda e: update_user(e.tool.tool_name.replace('get_', '').replace("_", "\\_"))), - (PipelineEvent.INFO_RECOVERY, lambda _: update_user()), - (PipelineEvent.REPORT_GENERATION, lambda _: update_user()), + report_content = await pipeline.interact(listeners=[ + (PipelineEvent.QUERY_CHECK_END, lambda _: update_user()), + (PipelineEvent.TOOL_USED_END, lambda e: update_user(e.tool.tool_name.replace('get_', '').replace("_", "\\_"))), + (PipelineEvent.INFO_RECOVERY_END, lambda _: update_user()), + (PipelineEvent.REPORT_GENERATION_END, lambda _: update_user()), ]) # attach report file to the message