diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index b27e0dd..31b0e0a 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -1,9 +1,7 @@ import logging from typing import Callable -from agno.run.agent import RunOutputEvent -from agno.run.team import TeamRunOutputEvent from app.agents.prompts import * -from app.agents.team import AppTeam +from app.agents.team import AppTeam, AppEvent, TeamRunEvent, RunEvent from app.configs import AppConfig logging = logging.getLogger("pipeline") @@ -63,7 +61,7 @@ class Pipeline: # ====================== # Core interaction # ====================== - def interact(self, query: str, listeners: dict[str, Callable[[RunOutputEvent | TeamRunOutputEvent], None]] = {}) -> str: + def interact(self, query: str, listeners: dict[RunEvent | TeamRunEvent, Callable[[AppEvent], None]] = {}) -> str: """ Esegue la pipeline di agenti per rispondere alla query dell'utente. 1. Crea il Team di agenti. diff --git a/src/app/agents/team.py b/src/app/agents/team.py index 221828e..9ff506c 100644 --- a/src/app/agents/team.py +++ b/src/app/agents/team.py @@ -1,6 +1,6 @@ import asyncio from typing import Callable -from agno.run.agent import RunOutputEvent +from agno.agent import RunEvent, RunOutputEvent from agno.team import Team, TeamRunEvent, TeamRunOutputEvent from agno.tools.reasoning import ReasoningTools from app.agents.prompts import * @@ -8,15 +8,20 @@ from app.configs import AppConfig, AppModel from app.api.tools import * + +# Define a unified event type for team and agent events +AppEvent = TeamRunOutputEvent | RunOutputEvent + + class AppTeam: def __init__(self, configs: AppConfig, team_models: AppModel, team_leader: AppModel | None = None): self.configs = configs self.team_models = team_models self.team_leader = team_leader or team_models - self.listeners: dict[str, Callable[[RunOutputEvent | TeamRunOutputEvent], None]] = {} + self.listeners: dict[str, Callable[[AppEvent], None]] = {} - def add_listener(self, event: str, listener: Callable[[RunOutputEvent | TeamRunOutputEvent], None]) -> None: - self.listeners[event] = listener + def add_listener(self, event: RunEvent | TeamRunEvent, listener: Callable[[AppEvent], None]) -> None: + self.listeners[event.value] = listener def run_team(self, query: str) -> str: return asyncio.run(self.run_team_async(query))