diff --git a/src/app/agents/__init__.py b/src/app/agents/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/app/agents/market_agent.py b/src/app/agents/market_agent.py deleted file mode 100644 index 12f9eab..0000000 --- a/src/app/agents/market_agent.py +++ /dev/null @@ -1,90 +0,0 @@ -from typing import Union, List, Dict, Optional, Any, Iterator, Sequence -from agno.agent import Agent -from agno.models.message import Message -from agno.run.agent import RunOutput, RunOutputEvent -from pydantic import BaseModel - -from app.toolkits.market_toolkit import MarketToolkit -from app.markets.base import ProductInfo # modello dati già definito nel tuo progetto - - -class MarketAgent(Agent): - """ - Wrapper che trasforma MarketToolkit in un Agent compatibile con Team. - Produce sia output leggibile (content) che dati strutturati (metadata). - """ - - def __init__(self, currency: str = "USD"): - super().__init__() - self.toolkit = MarketToolkit() - self.currency = currency - self.name = "MarketAgent" - - def run( - self, - input: Union[str, List, Dict, Message, BaseModel, List[Message]], - *, - stream: Optional[bool] = None, - stream_intermediate_steps: Optional[bool] = None, - user_id: Optional[str] = None, - session_id: Optional[str] = None, - session_state: Optional[Dict[str, Any]] = None, - audio: Optional[Sequence[Any]] = None, - images: Optional[Sequence[Any]] = None, - videos: Optional[Sequence[Any]] = None, - files: Optional[Sequence[Any]] = None, - retries: Optional[int] = None, - knowledge_filters: Optional[Dict[str, Any]] = None, - add_history_to_context: Optional[bool] = None, - add_dependencies_to_context: Optional[bool] = None, - add_session_state_to_context: Optional[bool] = None, - dependencies: Optional[Dict[str, Any]] = None, - metadata: Optional[Dict[str, Any]] = None, - yield_run_response: bool = False, - debug_mode: Optional[bool] = None, - **kwargs: Any, - ) -> Union[RunOutput, Iterator[Union[RunOutputEvent, RunOutput]]]: - # 1. Estraggo la query dal parametro "input" - if isinstance(input, str): - query = input - elif isinstance(input, dict) and "query" in input: - query = input["query"] - elif isinstance(input, Message): - query = input.content - elif isinstance(input, BaseModel): - query = str(input) - elif isinstance(input, list) and input and isinstance(input[0], Message): - query = input[0].content - else: - query = str(input) - - # 2. Individuo i simboli da analizzare - symbols = [] - for token in query.upper().split(): - if token in ("BTC", "ETH", "XRP", "LTC", "BCH"): # TODO: estendere dinamicamente - symbols.append(token) - - if not symbols: - symbols = ["BTC", "ETH"] # default - - # 3. Recupero i dati dal toolkit - results = [] - products: List[ProductInfo] = [] - - try: - products.extend(self.toolkit.get_current_prices(symbols)) # supponiamo ritorni un ProductInfo o simile - # Usa list comprehension per iterare symbols e products insieme - results.extend([ - f"{symbol}: ${product.price:.2f}" if hasattr(product, 'price') and product.price else f"{symbol}: N/A" - for symbol, product in zip(symbols, products) - ]) - except Exception as e: - results.extend(f"Errore: Impossibile recuperare i dati di mercato\n{str(e)}") - - # 4. Preparo output leggibile + metadati strutturati - output_text = "📊 Dati di mercato:\n" + "\n".join(results) - - return RunOutput( - content=output_text, - metadata={"products": products} - ) diff --git a/src/app/agents/news_agent.py b/src/app/agents/news_agent.py deleted file mode 100644 index d6de5e5..0000000 --- a/src/app/agents/news_agent.py +++ /dev/null @@ -1,35 +0,0 @@ -from agno.agent import Agent - -class NewsAgent(Agent): - """ - Gli agenti devono esporre un metodo run con questa firma. - - def run( - self, - input: Union[str, List, Dict, Message, BaseModel, List[Message]], - *, - stream: Optional[bool] = None, - stream_intermediate_steps: Optional[bool] = None, - user_id: Optional[str] = None, - session_id: Optional[str] = None, - session_state: Optional[Dict[str, Any]] = None, - audio: Optional[Sequence[Any]] = None, - images: Optional[Sequence[Any]] = None, - videos: Optional[Sequence[Any]] = None, - files: Optional[Sequence[Any]] = None, - retries: Optional[int] = None, - knowledge_filters: Optional[Dict[str, Any]] = None, - add_history_to_context: Optional[bool] = None, - add_dependencies_to_context: Optional[bool] = None, - add_session_state_to_context: Optional[bool] = None, - dependencies: Optional[Dict[str, Any]] = None, - metadata: Optional[Dict[str, Any]] = None, - yield_run_response: bool = False, - debug_mode: Optional[bool] = None, - **kwargs: Any, - ) -> Union[RunOutput, Iterator[Union[RunOutputEvent, RunOutput]]]: - """ - @staticmethod - def analyze(query: str) -> str: - # Mock analisi news - return "📰 Sentiment news: ottimismo sul mercato crypto grazie all'adozione istituzionale." diff --git a/src/app/agents/social_agent.py b/src/app/agents/social_agent.py deleted file mode 100644 index cefa7ef..0000000 --- a/src/app/agents/social_agent.py +++ /dev/null @@ -1,36 +0,0 @@ -from agno.agent import Agent - - -class SocialAgent(Agent): - """ - Gli agenti devono esporre un metodo run con questa firma. - - def run( - self, - input: Union[str, List, Dict, Message, BaseModel, List[Message]], - *, - stream: Optional[bool] = None, - stream_intermediate_steps: Optional[bool] = None, - user_id: Optional[str] = None, - session_id: Optional[str] = None, - session_state: Optional[Dict[str, Any]] = None, - audio: Optional[Sequence[Any]] = None, - images: Optional[Sequence[Any]] = None, - videos: Optional[Sequence[Any]] = None, - files: Optional[Sequence[Any]] = None, - retries: Optional[int] = None, - knowledge_filters: Optional[Dict[str, Any]] = None, - add_history_to_context: Optional[bool] = None, - add_dependencies_to_context: Optional[bool] = None, - add_session_state_to_context: Optional[bool] = None, - dependencies: Optional[Dict[str, Any]] = None, - metadata: Optional[Dict[str, Any]] = None, - yield_run_response: bool = False, - debug_mode: Optional[bool] = None, - **kwargs: Any, - ) -> Union[RunOutput, Iterator[Union[RunOutputEvent, RunOutput]]]: - """ - @staticmethod - def analyze(query: str) -> str: - # Mock analisi social - return "💬 Sentiment social: forte interesse retail su nuove altcoin emergenti." diff --git a/src/app/markets/__init__.py b/src/app/markets/__init__.py index aea3504..30a24fb 100644 --- a/src/app/markets/__init__.py +++ b/src/app/markets/__init__.py @@ -14,11 +14,11 @@ __all__ = [ "MarketAPIs", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWra class MarketAPIsTool(BaseWrapper, Toolkit): """ Classe per gestire le API di mercato disponibili. - + Supporta due modalità: 1. **Modalità standard** (default): usa il primo wrapper disponibile 2. **Modalità aggregazione**: aggrega dati da tutte le fonti disponibili - + L'aggregazione può essere abilitata/disabilitata dinamicamente. """ @@ -26,11 +26,11 @@ class MarketAPIsTool(BaseWrapper, Toolkit): self.currency = currency wrappers = [ BinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper, YFinanceWrapper ] self.wrappers: WrapperHandler[BaseWrapper] = WrapperHandler.build_wrappers(wrappers) - + # Inizializza l'aggregatore solo se richiesto (lazy initialization) self._aggregator = None self._aggregation_enabled = enable_aggregation - + Toolkit.__init__( self, name="Market APIs Toolkit", @@ -41,7 +41,7 @@ class MarketAPIsTool(BaseWrapper, Toolkit): self.get_historical_prices, ], ) - + def _get_aggregator(self): """Lazy initialization dell'aggregatore""" if self._aggregator is None: @@ -55,36 +55,36 @@ class MarketAPIsTool(BaseWrapper, Toolkit): if self._aggregation_enabled: return self._get_aggregator().get_product(asset_id) return self.wrappers.try_call(lambda w: w.get_product(asset_id)) - + def get_products(self, asset_ids: List[str]) -> List[ProductInfo]: """Ottieni informazioni su multiple prodotti""" if self._aggregation_enabled: return self._get_aggregator().get_products(asset_ids) return self.wrappers.try_call(lambda w: w.get_products(asset_ids)) - + def get_all_products(self) -> List[ProductInfo]: """Ottieni tutti i prodotti disponibili""" if self._aggregation_enabled: return self._get_aggregator().get_all_products() return self.wrappers.try_call(lambda w: w.get_all_products()) - + def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> List[Price]: """Ottieni dati storici dei prezzi""" if self._aggregation_enabled: return self._get_aggregator().get_historical_prices(asset_id, limit) return self.wrappers.try_call(lambda w: w.get_historical_prices(asset_id, limit)) - + # Metodi per controllare l'aggregazione def enable_aggregation(self, enabled: bool = True): """Abilita/disabilita la modalità aggregazione""" self._aggregation_enabled = enabled if self._aggregator: self._aggregator.enable_aggregation(enabled) - + def is_aggregation_enabled(self) -> bool: """Verifica se l'aggregazione è abilitata""" return self._aggregation_enabled - + # Metodo speciale per debugging (opzionale) def get_aggregated_product_with_debug(self, asset_id: str) -> dict: """ @@ -94,3 +94,8 @@ class MarketAPIsTool(BaseWrapper, Toolkit): if not self._aggregation_enabled: raise RuntimeError("L'aggregazione deve essere abilitata per usare questo metodo") return self._get_aggregator().get_aggregated_product_with_debug(asset_id) + +# TODO definire istruzioni per gli agenti di mercato +MARKET_INSTRUCTIONS = """ + +""" \ No newline at end of file diff --git a/src/app/pipeline.py b/src/app/pipeline.py index 7a440de..10dddab 100644 --- a/src/app/pipeline.py +++ b/src/app/pipeline.py @@ -1,11 +1,9 @@ -from typing import List - from agno.team import Team from agno.utils.log import log_info -from app.agents.market_agent import MarketAgent -from app.agents.news_agent import NewsAgent -from app.agents.social_agent import SocialAgent +from app.news import NewsAPIsTool, NEWS_INSTRUCTIONS +from app.social import SocialAPIsTool, SOCIAL_INSTRUCTIONS +from app.markets import MarketAPIsTool, MARKET_INSTRUCTIONS from app.models import AppModels from app.predictor import PredictorStyle, PredictorInput, PredictorOutput, PREDICTOR_INSTRUCTIONS @@ -17,12 +15,38 @@ class Pipeline: def __init__(self): # Inizializza gli agenti - self.market_agent = MarketAgent() - self.news_agent = NewsAgent() - self.social_agent = SocialAgent() + market_agent = AppModels.OLLAMA_QWEN_1B.get_agent( + instructions=MARKET_INSTRUCTIONS, + name="MarketAgent", + tools=[MarketAPIsTool()] + ) + news_agent = AppModels.OLLAMA_QWEN_1B.get_agent( + instructions=NEWS_INSTRUCTIONS, + name="NewsAgent", + tools=[NewsAPIsTool()] + ) + social_agent = AppModels.OLLAMA_QWEN_1B.get_agent( + instructions=SOCIAL_INSTRUCTIONS, + name="SocialAgent", + tools=[SocialAPIsTool()] + ) # Crea il Team - self.team = Team(name="CryptoAnalysisTeam", members=[self.market_agent, self.news_agent, self.social_agent]) + prompt = """ + You are the coordinator of a team of analysts specialized in cryptocurrency market analysis. + Your role is to gather insights from various sources, including market data, news articles, and social media trends. + Based on the information provided by your team members, you will synthesize a comprehensive sentiment analysis for each cryptocurrency discussed. + Your analysis should consider the following aspects: + 1. Market Trends: Evaluate the current market trends and price movements. + 2. News Impact: Assess the impact of recent news articles on market sentiment. + 3. Social Media Buzz: Analyze social media discussions and trends related to the cryptocurrencies. + Your final output should be a well-rounded sentiment analysis that can guide investment decisions. + """ # TODO migliorare il prompt + self.team = Team( + model = AppModels.OLLAMA_QWEN_1B.get_model(prompt), + name="CryptoAnalysisTeam", + members=[market_agent, news_agent, social_agent] + ) # Modelli disponibili e Predictor self.available_models = AppModels.availables() @@ -76,8 +100,8 @@ class Pipeline: return output - def list_providers(self) -> List[str]: + def list_providers(self) -> list[str]: return [m.name for m in self.available_models] - def list_styles(self) -> List[str]: + def list_styles(self) -> list[str]: return [s.value for s in self.styles]