refactor: remove unused agent files and clean up market API instructions

This commit is contained in:
2025-10-01 17:22:45 +02:00
parent daedf6cbba
commit ca67eca4c4
6 changed files with 51 additions and 183 deletions

View File

@@ -1,90 +0,0 @@
from typing import Union, List, Dict, Optional, Any, Iterator, Sequence
from agno.agent import Agent
from agno.models.message import Message
from agno.run.agent import RunOutput, RunOutputEvent
from pydantic import BaseModel
from app.toolkits.market_toolkit import MarketToolkit
from app.markets.base import ProductInfo # modello dati già definito nel tuo progetto
class MarketAgent(Agent):
"""
Wrapper che trasforma MarketToolkit in un Agent compatibile con Team.
Produce sia output leggibile (content) che dati strutturati (metadata).
"""
def __init__(self, currency: str = "USD"):
super().__init__()
self.toolkit = MarketToolkit()
self.currency = currency
self.name = "MarketAgent"
def run(
self,
input: Union[str, List, Dict, Message, BaseModel, List[Message]],
*,
stream: Optional[bool] = None,
stream_intermediate_steps: Optional[bool] = None,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
session_state: Optional[Dict[str, Any]] = None,
audio: Optional[Sequence[Any]] = None,
images: Optional[Sequence[Any]] = None,
videos: Optional[Sequence[Any]] = None,
files: Optional[Sequence[Any]] = None,
retries: Optional[int] = None,
knowledge_filters: Optional[Dict[str, Any]] = None,
add_history_to_context: Optional[bool] = None,
add_dependencies_to_context: Optional[bool] = None,
add_session_state_to_context: Optional[bool] = None,
dependencies: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
yield_run_response: bool = False,
debug_mode: Optional[bool] = None,
**kwargs: Any,
) -> Union[RunOutput, Iterator[Union[RunOutputEvent, RunOutput]]]:
# 1. Estraggo la query dal parametro "input"
if isinstance(input, str):
query = input
elif isinstance(input, dict) and "query" in input:
query = input["query"]
elif isinstance(input, Message):
query = input.content
elif isinstance(input, BaseModel):
query = str(input)
elif isinstance(input, list) and input and isinstance(input[0], Message):
query = input[0].content
else:
query = str(input)
# 2. Individuo i simboli da analizzare
symbols = []
for token in query.upper().split():
if token in ("BTC", "ETH", "XRP", "LTC", "BCH"): # TODO: estendere dinamicamente
symbols.append(token)
if not symbols:
symbols = ["BTC", "ETH"] # default
# 3. Recupero i dati dal toolkit
results = []
products: List[ProductInfo] = []
try:
products.extend(self.toolkit.get_current_prices(symbols)) # supponiamo ritorni un ProductInfo o simile
# Usa list comprehension per iterare symbols e products insieme
results.extend([
f"{symbol}: ${product.price:.2f}" if hasattr(product, 'price') and product.price else f"{symbol}: N/A"
for symbol, product in zip(symbols, products)
])
except Exception as e:
results.extend(f"Errore: Impossibile recuperare i dati di mercato\n{str(e)}")
# 4. Preparo output leggibile + metadati strutturati
output_text = "📊 Dati di mercato:\n" + "\n".join(results)
return RunOutput(
content=output_text,
metadata={"products": products}
)

View File

@@ -1,35 +0,0 @@
from agno.agent import Agent
class NewsAgent(Agent):
"""
Gli agenti devono esporre un metodo run con questa firma.
def run(
self,
input: Union[str, List, Dict, Message, BaseModel, List[Message]],
*,
stream: Optional[bool] = None,
stream_intermediate_steps: Optional[bool] = None,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
session_state: Optional[Dict[str, Any]] = None,
audio: Optional[Sequence[Any]] = None,
images: Optional[Sequence[Any]] = None,
videos: Optional[Sequence[Any]] = None,
files: Optional[Sequence[Any]] = None,
retries: Optional[int] = None,
knowledge_filters: Optional[Dict[str, Any]] = None,
add_history_to_context: Optional[bool] = None,
add_dependencies_to_context: Optional[bool] = None,
add_session_state_to_context: Optional[bool] = None,
dependencies: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
yield_run_response: bool = False,
debug_mode: Optional[bool] = None,
**kwargs: Any,
) -> Union[RunOutput, Iterator[Union[RunOutputEvent, RunOutput]]]:
"""
@staticmethod
def analyze(query: str) -> str:
# Mock analisi news
return "📰 Sentiment news: ottimismo sul mercato crypto grazie all'adozione istituzionale."

View File

@@ -1,36 +0,0 @@
from agno.agent import Agent
class SocialAgent(Agent):
"""
Gli agenti devono esporre un metodo run con questa firma.
def run(
self,
input: Union[str, List, Dict, Message, BaseModel, List[Message]],
*,
stream: Optional[bool] = None,
stream_intermediate_steps: Optional[bool] = None,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
session_state: Optional[Dict[str, Any]] = None,
audio: Optional[Sequence[Any]] = None,
images: Optional[Sequence[Any]] = None,
videos: Optional[Sequence[Any]] = None,
files: Optional[Sequence[Any]] = None,
retries: Optional[int] = None,
knowledge_filters: Optional[Dict[str, Any]] = None,
add_history_to_context: Optional[bool] = None,
add_dependencies_to_context: Optional[bool] = None,
add_session_state_to_context: Optional[bool] = None,
dependencies: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
yield_run_response: bool = False,
debug_mode: Optional[bool] = None,
**kwargs: Any,
) -> Union[RunOutput, Iterator[Union[RunOutputEvent, RunOutput]]]:
"""
@staticmethod
def analyze(query: str) -> str:
# Mock analisi social
return "💬 Sentiment social: forte interesse retail su nuove altcoin emergenti."

View File

@@ -94,3 +94,8 @@ class MarketAPIsTool(BaseWrapper, Toolkit):
if not self._aggregation_enabled:
raise RuntimeError("L'aggregazione deve essere abilitata per usare questo metodo")
return self._get_aggregator().get_aggregated_product_with_debug(asset_id)
# TODO definire istruzioni per gli agenti di mercato
MARKET_INSTRUCTIONS = """
"""

View File

@@ -1,11 +1,9 @@
from typing import List
from agno.team import Team
from agno.utils.log import log_info
from app.agents.market_agent import MarketAgent
from app.agents.news_agent import NewsAgent
from app.agents.social_agent import SocialAgent
from app.news import NewsAPIsTool, NEWS_INSTRUCTIONS
from app.social import SocialAPIsTool, SOCIAL_INSTRUCTIONS
from app.markets import MarketAPIsTool, MARKET_INSTRUCTIONS
from app.models import AppModels
from app.predictor import PredictorStyle, PredictorInput, PredictorOutput, PREDICTOR_INSTRUCTIONS
@@ -17,12 +15,38 @@ class Pipeline:
def __init__(self):
# Inizializza gli agenti
self.market_agent = MarketAgent()
self.news_agent = NewsAgent()
self.social_agent = SocialAgent()
market_agent = AppModels.OLLAMA_QWEN_1B.get_agent(
instructions=MARKET_INSTRUCTIONS,
name="MarketAgent",
tools=[MarketAPIsTool()]
)
news_agent = AppModels.OLLAMA_QWEN_1B.get_agent(
instructions=NEWS_INSTRUCTIONS,
name="NewsAgent",
tools=[NewsAPIsTool()]
)
social_agent = AppModels.OLLAMA_QWEN_1B.get_agent(
instructions=SOCIAL_INSTRUCTIONS,
name="SocialAgent",
tools=[SocialAPIsTool()]
)
# Crea il Team
self.team = Team(name="CryptoAnalysisTeam", members=[self.market_agent, self.news_agent, self.social_agent])
prompt = """
You are the coordinator of a team of analysts specialized in cryptocurrency market analysis.
Your role is to gather insights from various sources, including market data, news articles, and social media trends.
Based on the information provided by your team members, you will synthesize a comprehensive sentiment analysis for each cryptocurrency discussed.
Your analysis should consider the following aspects:
1. Market Trends: Evaluate the current market trends and price movements.
2. News Impact: Assess the impact of recent news articles on market sentiment.
3. Social Media Buzz: Analyze social media discussions and trends related to the cryptocurrencies.
Your final output should be a well-rounded sentiment analysis that can guide investment decisions.
""" # TODO migliorare il prompt
self.team = Team(
model = AppModels.OLLAMA_QWEN_1B.get_model(prompt),
name="CryptoAnalysisTeam",
members=[market_agent, news_agent, social_agent]
)
# Modelli disponibili e Predictor
self.available_models = AppModels.availables()
@@ -76,8 +100,8 @@ class Pipeline:
return output
def list_providers(self) -> List[str]:
def list_providers(self) -> list[str]:
return [m.name for m in self.available_models]
def list_styles(self) -> List[str]:
def list_styles(self) -> list[str]:
return [s.value for s in self.styles]