refactor: aggregator

- simplified MarketDataAggregator and related models to functions
This commit is contained in:
2025-10-01 18:39:07 +02:00
parent ff1c701536
commit 020b22a756
6 changed files with 207 additions and 505 deletions

View File

@@ -1,36 +1,27 @@
from typing import List, Optional
from agno.tools import Toolkit
from app.utils.wrapper_handler import WrapperHandler
from app.utils.market_aggregation import aggregate_product_info, aggregate_history_prices
from .base import BaseWrapper, ProductInfo, Price
from .coinbase import CoinBaseWrapper
from .binance import BinanceWrapper
from .cryptocompare import CryptoCompareWrapper
from .yfinance import YFinanceWrapper
from .binance_public import PublicBinanceAgent
__all__ = [ "MarketAPIs", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper", "PublicBinanceAgent" ]
__all__ = [ "MarketAPIs", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper" ]
class MarketAPIsTool(BaseWrapper, Toolkit):
"""
Classe per gestire le API di mercato disponibili.
Supporta due modalità:
1. **Modalità standard** (default): usa il primo wrapper disponibile
2. **Modalità aggregazione**: aggrega dati da tutte le fonti disponibili
L'aggregazione può essere abilitata/disabilitata dinamicamente.
Classe per comporre più MarketAPI con gestione degli errori e aggregazione dei dati.
Usa WrapperHandler per gestire più API con logica di retry e failover.
Si può scegliere se aggregare i dati da tutte le fonti o usare una singola fonte tramite delle chiamate apposta.
"""
def __init__(self, currency: str = "USD", enable_aggregation: bool = False):
def __init__(self, currency: str = "USD"):
kwargs = {"currency": currency or "USD"}
wrappers = [ BinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper, YFinanceWrapper ]
self.wrappers: WrapperHandler[BaseWrapper] = WrapperHandler.build_wrappers(wrappers, kwargs=kwargs)
# Inizializza l'aggregatore solo se richiesto (lazy initialization)
self._aggregator = None
self._aggregation_enabled = enable_aggregation
Toolkit.__init__(
self,
name="Market APIs Toolkit",
@@ -39,61 +30,45 @@ class MarketAPIsTool(BaseWrapper, Toolkit):
self.get_products,
self.get_all_products,
self.get_historical_prices,
self.get_products_aggregated,
self.get_historical_prices_aggregated,
],
)
def _get_aggregator(self):
"""Lazy initialization dell'aggregatore"""
if self._aggregator is None:
from app.utils.market_data_aggregator import MarketDataAggregator
self._aggregator = MarketDataAggregator(self.currency)
self._aggregator.enable_aggregation(self._aggregation_enabled)
return self._aggregator
def get_product(self, asset_id: str) -> Optional[ProductInfo]:
"""Ottieni informazioni su un prodotto specifico"""
if self._aggregation_enabled:
return self._get_aggregator().get_product(asset_id)
def get_product(self, asset_id: str) -> ProductInfo:
return self.wrappers.try_call(lambda w: w.get_product(asset_id))
def get_products(self, asset_ids: List[str]) -> List[ProductInfo]:
"""Ottieni informazioni su multiple prodotti"""
if self._aggregation_enabled:
return self._get_aggregator().get_products(asset_ids)
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
return self.wrappers.try_call(lambda w: w.get_products(asset_ids))
def get_all_products(self) -> List[ProductInfo]:
"""Ottieni tutti i prodotti disponibili"""
if self._aggregation_enabled:
return self._get_aggregator().get_all_products()
def get_all_products(self) -> list[ProductInfo]:
return self.wrappers.try_call(lambda w: w.get_all_products())
def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> List[Price]:
"""Ottieni dati storici dei prezzi"""
if self._aggregation_enabled:
return self._get_aggregator().get_historical_prices(asset_id, limit)
def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
return self.wrappers.try_call(lambda w: w.get_historical_prices(asset_id, limit))
# Metodi per controllare l'aggregazione
def enable_aggregation(self, enabled: bool = True):
"""Abilita/disabilita la modalità aggregazione"""
self._aggregation_enabled = enabled
if self._aggregator:
self._aggregator.enable_aggregation(enabled)
def is_aggregation_enabled(self) -> bool:
"""Verifica se l'aggregazione è abilitata"""
return self._aggregation_enabled
def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Restituisce i dati aggregati per una lista di asset_id.\n
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
Args:
asset_ids (list[str]): Lista di asset_id da cercare.
Returns:
list[ProductInfo]: Lista di ProductInfo aggregati.
"""
all_products = self.wrappers.try_call_all(lambda w: w.get_products(asset_ids))
return aggregate_product_info(all_products)
# Metodo speciale per debugging (opzionale)
def get_aggregated_product_with_debug(self, asset_id: str) -> dict:
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
"""
Metodo speciale per ottenere dati aggregati con informazioni di debug.
Disponibile solo quando l'aggregazione è abilitata.
Restituisce i dati storici aggregati per un asset_id. Usa i dati di tutte le fonti disponibili e li aggrega.\n
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
Args:
asset_id (str): Asset ID da cercare.
limit (int): Numero massimo di dati storici da restituire.
Returns:
list[Price]: Lista di Price aggregati.
"""
if not self._aggregation_enabled:
raise RuntimeError("L'aggregazione deve essere abilitata per usare questo metodo")
return self._get_aggregator().get_aggregated_product_with_debug(asset_id)
all_prices = self.wrappers.try_call_all(lambda w: w.get_historical_prices(asset_id, limit))
return aggregate_history_prices(all_prices)
# TODO definire istruzioni per gli agenti di mercato
MARKET_INSTRUCTIONS = """

View File

@@ -1,4 +1,3 @@
from pydantic import BaseModel
class BaseWrapper: