Refactor team management (#26)

* Refactor pipeline integration
* remove direct pipeline dependency from ChatManager and TelegramApp
* introduce PipelineInputs for better configuration management
* listener personalizzati per eventi nella funzione di interazione della pipeline
* added demos for agno
* USD in configs
* Dockerfile better cache
This commit was merged in pull request #26.
This commit is contained in:
Giacomo Bertolazzi
2025-10-15 14:00:39 +02:00
committed by GitHub
parent d85d6ed1eb
commit 38daafce9a
11 changed files with 281 additions and 133 deletions

View File

@@ -1,5 +1,4 @@
from app.agents.predictor import PredictorInput, PredictorOutput
from app.agents.team import create_team_with
from app.agents.pipeline import Pipeline
from app.agents.pipeline import Pipeline, PipelineInputs, PipelineEvent
__all__ = ["PredictorInput", "PredictorOutput", "create_team_with", "Pipeline"]
__all__ = ["PredictorInput", "PredictorOutput", "Pipeline", "PipelineInputs", "PipelineEvent"]

View File

@@ -1,32 +1,59 @@
import asyncio
from enum import Enum
import logging
from app.agents.team import create_team_with
import random
from typing import Any, Callable
from agno.agent import RunEvent
from agno.team import Team, TeamRunEvent
from agno.tools.reasoning import ReasoningTools
from agno.run.workflow import WorkflowRunEvent
from agno.workflow.step import Step
from agno.workflow.workflow import Workflow
from app.api.tools import *
from app.agents.prompts import *
from app.configs import AppConfig
logging = logging.getLogger("pipeline")
class Pipeline:
class PipelineEvent(str, Enum):
PLANNER = "Planner"
INFO_RECOVERY = "Info Recovery"
REPORT_GENERATION = "Report Generation"
REPORT_TRANSLATION = "Report Translation"
TOOL_USED = RunEvent.tool_call_completed
def check_event(self, event: str, step_name: str) -> bool:
return event == self.value or (WorkflowRunEvent.step_completed and step_name == self.value)
class PipelineInputs:
"""
Coordina gli agenti di servizio (Market, News, Social) e il Predictor finale.
Il Team è orchestrato da qwen3:latest (Ollama), mentre il Predictor è dinamico
e scelto dall'utente tramite i dropdown dell'interfaccia grafica.
Classe necessaria per passare gli input alla Pipeline.
Serve per raggruppare i parametri e semplificare l'inizializzazione.
"""
def __init__(self, configs: AppConfig):
self.configs = configs
def __init__(self, configs: AppConfig | None = None) -> None:
"""
Inputs per la Pipeline di agenti.
Setta i valori di default se non specificati.
"""
self.configs = configs if configs else AppConfig()
# Stato iniziale
self.leader_model = self.configs.get_model_by_name(self.configs.agents.team_leader_model)
self.team_model = self.configs.get_model_by_name(self.configs.agents.team_model)
self.strategy = self.configs.get_strategy_by_name(self.configs.agents.strategy)
agents = self.configs.agents
self.team_model = self.configs.get_model_by_name(agents.team_model)
self.team_leader_model = self.configs.get_model_by_name(agents.team_leader_model)
self.predictor_model = self.configs.get_model_by_name(agents.predictor_model)
self.strategy = self.configs.get_strategy_by_name(agents.strategy)
self.user_query = ""
# ======================
# Dropdown handlers
# ======================
def choose_leader(self, index: int):
def choose_team_leader(self, index: int):
"""
Sceglie il modello LLM da usare per il Team.
Sceglie il modello LLM da usare per il Team Leader.
"""
self.leader_model = self.configs.models.all_models[index]
@@ -38,47 +65,139 @@ class Pipeline:
def choose_strategy(self, index: int):
"""
Sceglie la strategia da usare per il Predictor.
Sceglie la strategia da usare per il Team.
"""
self.strategy = self.configs.strategies[index]
# ======================
# Helpers
# ======================
def list_providers(self) -> list[str]:
def list_models_names(self) -> list[str]:
"""
Restituisce la lista dei nomi dei modelli disponibili.
"""
return [model.label for model in self.configs.models.all_models]
def list_styles(self) -> list[str]:
def list_strategies_names(self) -> list[str]:
"""
Restituisce la lista degli stili di previsione disponibili.
Restituisce la lista delle strategie disponibili.
"""
return [strat.label for strat in self.configs.strategies]
class Pipeline:
"""
Coordina gli agenti di servizio (Market, News, Social) e il Predictor finale.
Il Team è orchestrato da qwen3:latest (Ollama), mentre il Predictor è dinamico
e scelto dall'utente tramite i dropdown dell'interfaccia grafica.
"""
def __init__(self, inputs: PipelineInputs):
self.inputs = inputs
# ======================
# Core interaction
# ======================
def interact(self, query: str) -> str:
def interact(self, listeners: dict[RunEvent | TeamRunEvent, Callable[[PipelineEvent], None]] = {}) -> str:
"""
1. Raccoglie output dai membri del Team
2. Aggrega output strutturati
3. Invoca Predictor
4. Restituisce la strategia finale
Esegue la pipeline di agenti per rispondere alla query dell'utente.
Args:
listeners: dizionario di callback per eventi specifici (opzionale)
Returns:
La risposta generata dalla pipeline.
"""
# Step 1: Creazione Team
team = create_team_with(self.configs, self.team_model, self.leader_model)
return asyncio.run(self.interact_async(listeners))
# Step 2: raccolta output dai membri del Team
logging.info(f"Pipeline received query: {query}")
# TODO migliorare prompt (?)
query = f"The user query is: {query}\n\n They requested a {self.strategy.label} investment strategy."
team_outputs = team.run(query) # type: ignore
async def interact_async(self, listeners: dict[RunEvent | TeamRunEvent, Callable[[PipelineEvent], None]] = {}) -> str:
"""
Versione asincrona che esegue la pipeline di agenti per rispondere alla query dell'utente.
Args:
listeners: dizionario di callback per eventi specifici (opzionale)
Returns:
La risposta generata dalla pipeline.
"""
run_id = random.randint(1000, 9999) # Per tracciare i log
logging.info(f"[{run_id}] Pipeline query: {self.inputs.user_query}")
# Step 3: recupero ouput
if not isinstance(team_outputs.content, str):
logging.error(f"Team output is not a string: {team_outputs.content}")
raise ValueError("Team output is not a string")
logging.info(f"Team finished")
return team_outputs.content
# Step 1: Crea gli agenti e il team
market_tool, news_tool, social_tool = self.get_tools()
market_agent = self.inputs.team_model.get_agent(instructions=MARKET_INSTRUCTIONS, name="MarketAgent", tools=[market_tool])
news_agent = self.inputs.team_model.get_agent(instructions=NEWS_INSTRUCTIONS, name="NewsAgent", tools=[news_tool])
social_agent = self.inputs.team_model.get_agent(instructions=SOCIAL_INSTRUCTIONS, name="SocialAgent", tools=[social_tool])
team = Team(
model=self.inputs.team_leader_model.get_model(COORDINATOR_INSTRUCTIONS),
name="CryptoAnalysisTeam",
tools=[ReasoningTools()],
members=[market_agent, news_agent, social_agent],
)
# Step 3: Crea il workflow
#query_planner = Step(name=PipelineEvent.PLANNER, agent=Agent())
info_recovery = Step(name=PipelineEvent.INFO_RECOVERY, team=team)
#report_generation = Step(name=PipelineEvent.REPORT_GENERATION, agent=Agent())
#report_translate = Step(name=AppEvent.REPORT_TRANSLATION, agent=Agent())
workflow = Workflow(
name="App Workflow",
steps=[
#query_planner,
info_recovery,
#report_generation,
#report_translate
]
)
# Step 4: Fai partire il workflow e prendi l'output
query = f"The user query is: {self.inputs.user_query}\n\n They requested a {self.inputs.strategy.label} investment strategy."
result = await self.run(workflow, query, events={})
logging.info(f"[{run_id}] Run finished")
return result
# ======================
# Helpers
# =====================
def get_tools(self) -> tuple[MarketAPIsTool, NewsAPIsTool, SocialAPIsTool]:
"""
Restituisce la lista di tools disponibili per gli agenti.
"""
api = self.inputs.configs.api
market_tool = MarketAPIsTool(currency=api.currency)
market_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds)
news_tool = NewsAPIsTool()
news_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds)
social_tool = SocialAPIsTool()
social_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds)
return (market_tool, news_tool, social_tool)
@classmethod
async def run(cls, workflow: Workflow, query: str, events: dict[PipelineEvent, Callable[[Any], None]]) -> str:
"""
Esegue il workflow e gestisce gli eventi tramite le callback fornite.
Args:
workflow: istanza di Workflow da eseguire
query: query dell'utente da passare al workflow
events: dizionario di callback per eventi specifici (opzionale)
Returns:
La risposta generata dal workflow.
"""
iterator = await workflow.arun(query, stream=True, stream_intermediate_steps=True)
content = None
async for event in iterator:
step_name = getattr(event, 'step_name', '')
for app_event, listener in events.items():
if app_event.check_event(event.event, step_name):
listener(event)
if event.event == WorkflowRunEvent.workflow_completed:
content = getattr(event, 'content', '')
if isinstance(content, str):
think_str = "</think>"
think = content.rfind(think_str)
content = content[(think + len(think_str)):] if think != -1 else content
return content if content else "No output from workflow, something went wrong."

View File

@@ -1,25 +0,0 @@
from agno.team import Team
from app.api.tools import *
from app.agents.prompts import *
from app.configs import AppConfig, AppModel
def create_team_with(configs: AppConfig, model: AppModel, coordinator: AppModel | None = None) -> Team:
market_tool = MarketAPIsTool(currency=configs.api.currency)
market_tool.handler.set_retries(configs.api.retry_attempts, configs.api.retry_delay_seconds)
news_tool = NewsAPIsTool()
news_tool.handler.set_retries(configs.api.retry_attempts, configs.api.retry_delay_seconds)
social_tool = SocialAPIsTool()
social_tool.handler.set_retries(configs.api.retry_attempts, configs.api.retry_delay_seconds)
market_agent = model.get_agent(instructions=MARKET_INSTRUCTIONS, name="MarketAgent", tools=[market_tool])
news_agent = model.get_agent(instructions=NEWS_INSTRUCTIONS, name="NewsAgent", tools=[news_tool])
social_agent = model.get_agent(instructions=SOCIAL_INSTRUCTIONS, name="SocialAgent", tools=[social_tool])
coordinator = coordinator or model
return Team(
model=coordinator.get_model(COORDINATOR_INSTRUCTIONS),
name="CryptoAnalysisTeam",
members=[market_agent, news_agent, social_agent],
)