Refactor team management; asynchronous execution

This commit is contained in:
2025-10-10 10:46:08 +02:00
parent 517842c834
commit c7569df06e
2 changed files with 86 additions and 66 deletions

View File

@@ -1,8 +1,5 @@
from agno.run.agent import RunOutput
from app.agents.models import AppModels
from app.agents.team import create_team_with
from app.agents.predictor import PREDICTOR_INSTRUCTIONS, PredictorInput, PredictorOutput, PredictorStyle
from app.base.markets import ProductInfo
from app.agents.predictor import PREDICTOR_INSTRUCTIONS, PredictorOutput, PredictorStyle
class Pipeline:
@@ -17,7 +14,6 @@ class Pipeline:
self.all_styles = list(PredictorStyle)
self.style = self.all_styles[0]
self.team = create_team_with(AppModels.OLLAMA_QWEN_1B)
self.choose_predictor(0) # Modello di default
# ======================
@@ -65,41 +61,10 @@ class Pipeline:
4. Restituisce la strategia finale
"""
# Step 1: raccolta output dai membri del Team
team_outputs = self.team.run(query) # type: ignore
# Step 2: aggregazione output strutturati
all_products: list[ProductInfo] = []
sentiments: list[str] = []
for agent_output in team_outputs.member_responses:
if isinstance(agent_output, RunOutput) and agent_output.metadata is not None:
keys = agent_output.metadata.keys()
if "products" in keys:
all_products.extend(agent_output.metadata["products"])
if "sentiment_news" in keys:
sentiments.append(agent_output.metadata["sentiment_news"])
if "sentiment_social" in keys:
sentiments.append(agent_output.metadata["sentiment_social"])
aggregated_sentiment = "\n".join(sentiments)
# Step 3: invocazione Predictor
predictor_input = PredictorInput(
data=all_products,
style=self.style,
sentiment=aggregated_sentiment
)
result = self.predictor.run(predictor_input) # type: ignore
if not isinstance(result.content, PredictorOutput):
return "❌ Errore: il modello non ha restituito un output valido."
prediction: PredictorOutput = result.content
# Step 4: restituzione strategia finale
portfolio_lines = "\n".join(
[f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio]
)
return (
f"📊 Strategia ({self.style.value}): {prediction.strategy}\n\n"
f"💼 Portafoglio consigliato:\n{portfolio_lines}"
)
from app.agents import AppTeam
from agno.agent import RunEvent
team = AppTeam(AppModels.OLLAMA_QWEN_1B) # TODO rendere dinamico
team.add_listener(RunEvent.tool_call_started, lambda e: print(f"Team tool call started: {e.agent_name}")) # type: ignore
team.add_listener(RunEvent.tool_call_completed, lambda e: print(f"Team tool call completed: {e.agent_name}")) # type: ignore
result = team.run_team(query)
return result

View File

@@ -1,33 +1,88 @@
from agno.team import Team
import asyncio
import logging
from typing import Callable, Self
from agno.run.agent import RunOutputEvent
from agno.team import Team, TeamRunEvent, TeamRunOutputEvent
from agno.tools.reasoning import ReasoningTools
from app.agents import AppModels
from app.markets import MarketAPIsTool
from app.news import NewsAPIsTool
from app.social import SocialAPIsTool
logging = logging.getLogger("AppTeam")
def create_team_with(models: AppModels, coordinator: AppModels | None = None) -> Team:
market_agent = models.get_agent(
instructions=MARKET_INSTRUCTIONS,
name="MarketAgent",
tools=[MarketAPIsTool()]
)
news_agent = models.get_agent(
instructions=NEWS_INSTRUCTIONS,
name="NewsAgent",
tools=[NewsAPIsTool()]
)
social_agent = models.get_agent(
instructions=SOCIAL_INSTRUCTIONS,
name="SocialAgent",
tools=[SocialAPIsTool()]
)
coordinator = coordinator or models
return Team(
model=coordinator.get_model(COORDINATOR_INSTRUCTIONS),
name="CryptoAnalysisTeam",
members=[market_agent, news_agent, social_agent],
)
class AllTools:
__instance: Self
def __new__(cls) -> Self:
if not hasattr(cls, "__instance"):
cls.__instance = super(AllTools, cls).__new__(cls)
return cls.__instance
# TODO scegliere un modo migliore per inizializzare gli strumenti
# TODO magari usare un config file o una classe apposta per i configs
def __init__(self):
self.market = MarketAPIsTool("EUR")
self.news = NewsAPIsTool()
self.social = SocialAPIsTool()
class AppTeam:
def __init__(self, team_models: AppModels, coordinator: AppModels | None = None):
self.team_models = team_models
self.coordinator = coordinator or team_models
self.listeners: dict[str, Callable[[RunOutputEvent | TeamRunOutputEvent], None]] = {}
def add_listener(self, event: str, listener: Callable[[RunOutputEvent | TeamRunOutputEvent], None]) -> None:
self.listeners[event] = listener
def run_team(self, query: str) -> str:
return asyncio.run(self.run_team_async(query))
async def run_team_async(self, query: str) -> str:
logging.info(f"Running team q='{query}'")
team = AppTeam.create_team_with(self.team_models, self.coordinator)
result = "No output from team"
async for run_event in team.arun(query, stream=True, stream_intermediate_steps=True): # type: ignore
if run_event.event in self.listeners:
self.listeners[run_event.event](run_event)
if run_event.event in [TeamRunEvent.run_completed]:
if isinstance(run_event.content, str):
result = run_event.content
thinking = result.rfind("</think>")
if thinking != -1: result = result[thinking:]
logging.info(f"Team finished")
return result
@staticmethod
def create_team_with(models: AppModels, coordinator: AppModels) -> Team:
tools = AllTools()
market_agent = models.get_agent(
instructions=MARKET_INSTRUCTIONS,
name="MarketAgent",
tools=[tools.market]
)
news_agent = models.get_agent(
instructions=NEWS_INSTRUCTIONS,
name="NewsAgent",
tools=[tools.news]
)
social_agent = models.get_agent(
instructions=SOCIAL_INSTRUCTIONS,
name="SocialAgent",
tools=[tools.social]
)
return Team(
model=coordinator.get_model(COORDINATOR_INSTRUCTIONS),
name="CryptoAnalysisTeam",
members=[market_agent, news_agent, social_agent],
tools=[ReasoningTools()]
)
COORDINATOR_INSTRUCTIONS = """
You are the expert coordinator of a financial analysis team specializing in cryptocurrencies.