diff --git a/src/app/agents/pipeline.py b/src/app/agents/pipeline.py index a7d1001..261c3d4 100644 --- a/src/app/agents/pipeline.py +++ b/src/app/agents/pipeline.py @@ -1,8 +1,5 @@ -from agno.run.agent import RunOutput from app.agents.models import AppModels -from app.agents.team import create_team_with -from app.agents.predictor import PREDICTOR_INSTRUCTIONS, PredictorInput, PredictorOutput, PredictorStyle -from app.base.markets import ProductInfo +from app.agents.predictor import PREDICTOR_INSTRUCTIONS, PredictorOutput, PredictorStyle class Pipeline: @@ -17,7 +14,6 @@ class Pipeline: self.all_styles = list(PredictorStyle) self.style = self.all_styles[0] - self.team = create_team_with(AppModels.OLLAMA_QWEN_1B) self.choose_predictor(0) # Modello di default # ====================== @@ -65,41 +61,10 @@ class Pipeline: 4. Restituisce la strategia finale """ # Step 1: raccolta output dai membri del Team - team_outputs = self.team.run(query) # type: ignore - - # Step 2: aggregazione output strutturati - all_products: list[ProductInfo] = [] - sentiments: list[str] = [] - - for agent_output in team_outputs.member_responses: - if isinstance(agent_output, RunOutput) and agent_output.metadata is not None: - keys = agent_output.metadata.keys() - if "products" in keys: - all_products.extend(agent_output.metadata["products"]) - if "sentiment_news" in keys: - sentiments.append(agent_output.metadata["sentiment_news"]) - if "sentiment_social" in keys: - sentiments.append(agent_output.metadata["sentiment_social"]) - - aggregated_sentiment = "\n".join(sentiments) - - # Step 3: invocazione Predictor - predictor_input = PredictorInput( - data=all_products, - style=self.style, - sentiment=aggregated_sentiment - ) - - result = self.predictor.run(predictor_input) # type: ignore - if not isinstance(result.content, PredictorOutput): - return "❌ Errore: il modello non ha restituito un output valido." - prediction: PredictorOutput = result.content - - # Step 4: restituzione strategia finale - portfolio_lines = "\n".join( - [f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio] - ) - return ( - f"📊 Strategia ({self.style.value}): {prediction.strategy}\n\n" - f"💼 Portafoglio consigliato:\n{portfolio_lines}" - ) + from app.agents import AppTeam + from agno.agent import RunEvent + team = AppTeam(AppModels.OLLAMA_QWEN_1B) # TODO rendere dinamico + team.add_listener(RunEvent.tool_call_started, lambda e: print(f"Team tool call started: {e.agent_name}")) # type: ignore + team.add_listener(RunEvent.tool_call_completed, lambda e: print(f"Team tool call completed: {e.agent_name}")) # type: ignore + result = team.run_team(query) + return result diff --git a/src/app/agents/team.py b/src/app/agents/team.py index 27b9cae..a9c5721 100644 --- a/src/app/agents/team.py +++ b/src/app/agents/team.py @@ -1,33 +1,88 @@ -from agno.team import Team +import asyncio +import logging +from typing import Callable, Self +from agno.run.agent import RunOutputEvent +from agno.team import Team, TeamRunEvent, TeamRunOutputEvent +from agno.tools.reasoning import ReasoningTools from app.agents import AppModels from app.markets import MarketAPIsTool from app.news import NewsAPIsTool from app.social import SocialAPIsTool +logging = logging.getLogger("AppTeam") -def create_team_with(models: AppModels, coordinator: AppModels | None = None) -> Team: - market_agent = models.get_agent( - instructions=MARKET_INSTRUCTIONS, - name="MarketAgent", - tools=[MarketAPIsTool()] - ) - news_agent = models.get_agent( - instructions=NEWS_INSTRUCTIONS, - name="NewsAgent", - tools=[NewsAPIsTool()] - ) - social_agent = models.get_agent( - instructions=SOCIAL_INSTRUCTIONS, - name="SocialAgent", - tools=[SocialAPIsTool()] - ) - coordinator = coordinator or models - return Team( - model=coordinator.get_model(COORDINATOR_INSTRUCTIONS), - name="CryptoAnalysisTeam", - members=[market_agent, news_agent, social_agent], - ) +class AllTools: + __instance: Self + + def __new__(cls) -> Self: + if not hasattr(cls, "__instance"): + cls.__instance = super(AllTools, cls).__new__(cls) + return cls.__instance + + # TODO scegliere un modo migliore per inizializzare gli strumenti + # TODO magari usare un config file o una classe apposta per i configs + def __init__(self): + self.market = MarketAPIsTool("EUR") + self.news = NewsAPIsTool() + self.social = SocialAPIsTool() + + +class AppTeam: + def __init__(self, team_models: AppModels, coordinator: AppModels | None = None): + self.team_models = team_models + self.coordinator = coordinator or team_models + self.listeners: dict[str, Callable[[RunOutputEvent | TeamRunOutputEvent], None]] = {} + + def add_listener(self, event: str, listener: Callable[[RunOutputEvent | TeamRunOutputEvent], None]) -> None: + self.listeners[event] = listener + + def run_team(self, query: str) -> str: + return asyncio.run(self.run_team_async(query)) + + async def run_team_async(self, query: str) -> str: + logging.info(f"Running team q='{query}'") + team = AppTeam.create_team_with(self.team_models, self.coordinator) + result = "No output from team" + + async for run_event in team.arun(query, stream=True, stream_intermediate_steps=True): # type: ignore + if run_event.event in self.listeners: + self.listeners[run_event.event](run_event) + if run_event.event in [TeamRunEvent.run_completed]: + if isinstance(run_event.content, str): + result = run_event.content + thinking = result.rfind("") + if thinking != -1: result = result[thinking:] + + logging.info(f"Team finished") + return result + + @staticmethod + def create_team_with(models: AppModels, coordinator: AppModels) -> Team: + tools = AllTools() + + market_agent = models.get_agent( + instructions=MARKET_INSTRUCTIONS, + name="MarketAgent", + tools=[tools.market] + ) + news_agent = models.get_agent( + instructions=NEWS_INSTRUCTIONS, + name="NewsAgent", + tools=[tools.news] + ) + social_agent = models.get_agent( + instructions=SOCIAL_INSTRUCTIONS, + name="SocialAgent", + tools=[tools.social] + ) + + return Team( + model=coordinator.get_model(COORDINATOR_INSTRUCTIONS), + name="CryptoAnalysisTeam", + members=[market_agent, news_agent, social_agent], + tools=[ReasoningTools()] + ) COORDINATOR_INSTRUCTIONS = """ You are the expert coordinator of a financial analysis team specializing in cryptocurrencies.