diff --git a/src/app/agents/core.py b/src/app/agents/core.py index 147d92c..a855a74 100644 --- a/src/app/agents/core.py +++ b/src/app/agents/core.py @@ -45,28 +45,28 @@ class PipelineInputs: """ Sceglie il modello LLM da usare per l'analizzatore di query. """ - assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list." self.query_analyzer_model = self.configs.models.all_models[index] def choose_team_leader(self, index: int): """ Sceglie il modello LLM da usare per il Team Leader. """ - assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list." self.team_leader_model = self.configs.models.all_models[index] def choose_team(self, index: int): """ Sceglie il modello LLM da usare per il Team. """ - assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list." self.team_model = self.configs.models.all_models[index] def choose_report_generator(self, index: int): """ Sceglie il modello LLM da usare per il generatore di report. """ - assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list." + assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list." self.report_generation_model = self.configs.models.all_models[index] def choose_strategy(self, index: int): @@ -111,6 +111,7 @@ class PipelineInputs: name="CryptoAnalysisTeam", tools=[ReasoningTools(), PlanMemoryTool(), CryptoSymbolsTools()], members=[market_agent, news_agent, social_agent], + stream_intermediate_steps=True ) def get_agent_query_checker(self) -> Agent: diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index 0498f90..f235ad5 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -84,6 +84,25 @@ class Pipeline: result = await self.run(workflow, query, events=events) return result + async def interact_stream(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []): + """ + Versione asincrona in streaming che ESEGUE (yield) la pipeline, + restituendo gli aggiornamenti di stato e il risultato finale. + """ + run_id = random.randint(1000, 9999) # Per tracciare i log + logging.info(f"[{run_id}] Pipeline query: {self.inputs.user_query}") + + events = [*PipelineEvent.get_log_events(run_id), *listeners] + query = QueryInputs( + user_query=self.inputs.user_query, + strategy=self.inputs.strategy.description + ) + + workflow = self.build_workflow() + + # Delega al classmethod 'run_stream' per lo streaming + async for item in self.run_stream(workflow, query, events=events): + yield item def build_workflow(self) -> Workflow: """ @@ -114,33 +133,88 @@ class Pipeline: ]) @classmethod - async def run(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], None]]]) -> str: + async def run(cls, workflow: Workflow, query: QueryInputs, + events: list[tuple[PipelineEvent, Callable[[Any], None]]]) -> str: """ - Esegue il workflow e gestisce gli eventi tramite le callback fornite. - Args: - workflow: istanza di Workflow da eseguire - query: query dell'utente da passare al workflow - events: dizionario di callback per eventi specifici (opzionale) - Returns: - La risposta generata dal workflow. + Esegue il workflow e gestisce gli eventi, restituendo solo il risultato finale. + Consuma il generatore 'run_stream'. + """ + final_result = "Errore durante l'esecuzione del workflow." + # Consuma il generatore e salva solo l'ultimo item + async for item in cls.run_stream(workflow, query, events): + final_result = item + + return final_result + + @classmethod + async def run_stream(cls, workflow: Workflow, query: QueryInputs, + events: list[tuple[PipelineEvent, Callable[[Any], None]]]): + """ + Esegue il workflow e restituisce gli eventi di stato e il risultato finale. """ iterator = await workflow.arun(query, stream=True, stream_intermediate_steps=True) - content = None + current_active_step = None + async for event in iterator: step_name = getattr(event, 'step_name', '') + + # 1. Chiama i listeners (per i log) for app_event, listener in events: if app_event.check_event(event.event, step_name): listener(event) - if event.event == WorkflowRunEvent.step_completed: + + # 2. Restituisce gli aggiornamenti di stato per Gradio + if event.event == WorkflowRunEvent.step_started.value: + current_active_step = step_name + if step_name == PipelineEvent.QUERY_CHECK.value: + yield "🔍 Sto controllando la tua richiesta..." + elif step_name == PipelineEvent.INFO_RECOVERY.value: + yield "📊 Sto recuperando i dati (mercato, news, social)..." + elif step_name == PipelineEvent.REPORT_GENERATION.value: + yield "✍️ Sto scrivendo il report finale..." + + # Gestisce i tool usati da agenti singoli (come Query Check) + elif event.event == WorkflowRunEvent.step_output.value: + agent_event = event.content + if hasattr(agent_event, 'event') and agent_event.event == RunEvent.tool_call_completed.value: + tool_name = getattr(agent_event.tool, 'tool_name', 'uno strumento') + yield f"🛠️ Sto usando lo strumento: {tool_name}..." + + # Gestisce i tool usati da agenti interni al team (come CustomEvent) + elif event.event == WorkflowRunEvent.custom_event.value: + custom_content = getattr(event, 'content', None) + if custom_content and hasattr(custom_content, 'event'): + agent_event = custom_content + if agent_event.event == RunEvent.tool_call_completed.value: + if step_name == PipelineEvent.INFO_RECOVERY.value: + tool_name = getattr(agent_event.tool, 'tool_name', 'uno strumento') + yield f"🛠️ (Team) Sto usando lo strumento: {tool_name}..." + + # Gestisce gli eventi di tool promossi dal Team + elif event.event == PipelineEvent.TOOL_USED.value: + # Ci assicuriamo che l'evento provenga dallo step corretto + if current_active_step == PipelineEvent.INFO_RECOVERY.value: + tool_object = getattr(event, 'tool', None) + if tool_object: + tool_name = getattr(tool_object, 'tool_name', 'uno strumento') + yield f"🛠️ (Team) Sto usando lo strumento: {tool_name}..." + else: + yield f"🛠️ (Team) Sto usando uno strumento sconosciuto..." + + # 3. Salva il contenuto finale quando uno step è completato + elif event.event == WorkflowRunEvent.step_completed.value: + current_active_step = None content = getattr(event, 'content', '') + # 4. Restituisce la risposta finale if content and isinstance(content, str): think_str = "" think = content.rfind(think_str) - return content[(think + len(think_str)):] if think != -1 else content - if content and isinstance(content, QueryOutputs): - return content.response - - logging.error(f"No output from workflow: {content}") - return "No output from workflow, something went wrong." + final_answer = content[(think + len(think_str)):] if think != -1 else content + yield final_answer + elif content and isinstance(content, QueryOutputs): + yield content.response + else: + logging.error(f"No output from workflow: {content}") + yield "Nessun output dal workflow, qualcosa è andato storto." diff --git a/src/app/interface/chat.py b/src/app/interface/chat.py index ca38ddf..10d05da 100644 --- a/src/app/interface/chat.py +++ b/src/app/interface/chat.py @@ -49,13 +49,23 @@ class ChatManager: ######################################## # Funzioni Gradio ######################################## - def gradio_respond(self, message: str, history: list[tuple[str, str]]) -> str: + async def gradio_respond(self, message: str, history: list[tuple[str, str]]): + """ + Versione asincrona in streaming. + Produce (yield) aggiornamenti di stato e la risposta finale. + """ self.inputs.user_query = message pipeline = Pipeline(self.inputs) - response = pipeline.interact() - self.history.append((message, response)) - return response + response = None + # Itera sul nuovo generatore asincrono + async for chunk in pipeline.interact_stream(): + response = chunk # Salva l'ultimo chunk (che sarà la risposta finale) + yield response # Restituisce l'aggiornamento (o la risposta finale) a Gradio + + # Dopo che il generatore è completo, salva l'ultima risposta nello storico + if response: + self.history.append((message, response)) def gradio_save(self) -> str: self.save_chat("chat.json")