listener personalizzati per eventi nella funzione di interazione della pipeline

This commit is contained in:
2025-10-13 11:11:02 +02:00
parent f9ac821e11
commit 0162071ea9

View File

@@ -1,5 +1,7 @@
import logging
from agno.run.agent import RunEvent
from typing import Callable
from agno.run.agent import RunOutputEvent
from agno.run.team import TeamRunOutputEvent
from app.agents.prompts import *
from app.agents.team import AppTeam
from app.configs import AppConfig
@@ -61,7 +63,7 @@ class Pipeline:
# ======================
# Core interaction
# ======================
def interact(self, query: str) -> str:
def interact(self, query: str, listeners: dict[str, Callable[[RunOutputEvent | TeamRunOutputEvent], None]] = {}) -> str:
"""
Esegue la pipeline di agenti per rispondere alla query dell'utente.
1. Crea il Team di agenti.
@@ -73,16 +75,16 @@ class Pipeline:
Returns:
str: La risposta generata dagli agenti.
"""
logging.info(f"Pipeline received query: {query}")
# Step 1: Creazione Team
team = AppTeam(self.configs, self.team_model, self.leader_model)
# Step 2: Aggiunti listener per eventi
team.add_listener(RunEvent.tool_call_started, lambda e: print(f"Team tool call started: {e.agent_name}")) # type: ignore
team.add_listener(RunEvent.tool_call_completed, lambda e: print(f"Team tool call completed: {e.agent_name}")) # type: ignore
for event_name, listener in listeners.items():
team.add_listener(event_name, listener)
# Step 3: Esecuzione Team
logging.info(f"Pipeline received query: {query}")
# TODO migliorare prompt (?)
query = f"The user query is: {query}\n\n They requested a {self.strategy.label} investment strategy."
result = team.run_team(query)