diff --git a/demos/market_providers_api_demo.py b/demos/market_providers_api_demo.py index 16817d5..2c3a8f3 100644 --- a/demos/market_providers_api_demo.py +++ b/demos/market_providers_api_demo.py @@ -26,7 +26,7 @@ project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root / "src")) from dotenv import load_dotenv -from src.app.markets import ( +from app.markets import ( CoinBaseWrapper, CryptoCompareWrapper, BinanceWrapper, diff --git a/pyproject.toml b/pyproject.toml index 74c0029..e091aba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,6 @@ dependencies = [ # API di social media "praw", # Reddit ] - [tool.pytest.ini_options] pythonpath = ["src"] testpaths = ["tests"] diff --git a/src/app/markets/__init__.py b/src/app/markets/__init__.py index 57f04b7..e5853d5 100644 --- a/src/app/markets/__init__.py +++ b/src/app/markets/__init__.py @@ -1,31 +1,96 @@ -from .base import BaseWrapper +from .base import BaseWrapper, ProductInfo, Price from .coinbase import CoinBaseWrapper from .binance import BinanceWrapper from .cryptocompare import CryptoCompareWrapper +from .binance_public import PublicBinanceAgent from app.utils.wrapper_handler import WrapperHandler - -__all__ = [ "MarketAPIs", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper" ] +from typing import List, Optional +from agno.tools import Toolkit -# TODO se si vuole usare un aggregatore di dati di mercato, si può aggiungere qui facendo una classe extra (simile a questa) che per ogni chiamata chiama tutti i wrapper e aggrega i risultati -class MarketAPIs(BaseWrapper): +__all__ = [ "MarketAPIs", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "PublicBinanceAgent" ] + + +class MarketAPIsTool(BaseWrapper, Toolkit): """ Classe per gestire le API di mercato disponibili. - Permette di ottenere un'istanza della prima API disponibile in base alla priorità specificata. - Supporta operazioni come ottenere informazioni su singoli prodotti, liste di prodotti e dati storici. - Usa un WrapperHandler per gestire più wrapper e tentare chiamate in modo resiliente. + + Supporta due modalità: + 1. **Modalità standard** (default): usa il primo wrapper disponibile + 2. **Modalità aggregazione**: aggrega dati da tutte le fonti disponibili + + L'aggregazione può essere abilitata/disabilitata dinamicamente. """ - def __init__(self, currency: str = "USD"): + def __init__(self, currency: str = "USD", enable_aggregation: bool = False): self.currency = currency - wrappers = [ CoinBaseWrapper, CryptoCompareWrapper ] + wrappers = [ BinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper ] self.wrappers: WrapperHandler[BaseWrapper] = WrapperHandler.build_wrappers(wrappers) + + # Inizializza l'aggregatore solo se richiesto (lazy initialization) + self._aggregator = None + self._aggregation_enabled = enable_aggregation + + Toolkit.__init__( + self, + name="Market APIs Toolkit", + tools=[ + self.get_product, + self.get_products, + self.get_all_products, + self.get_historical_prices, + ], + ) + + def _get_aggregator(self): + """Lazy initialization dell'aggregatore""" + if self._aggregator is None: + from app.utils.market_data_aggregator import MarketDataAggregator + self._aggregator = MarketDataAggregator(self.currency) + self._aggregator.enable_aggregation(self._aggregation_enabled) + return self._aggregator - def get_product(self, asset_id): + def get_product(self, asset_id: str) -> Optional[ProductInfo]: + """Ottieni informazioni su un prodotto specifico""" + if self._aggregation_enabled: + return self._get_aggregator().get_product(asset_id) return self.wrappers.try_call(lambda w: w.get_product(asset_id)) - def get_products(self, asset_ids: list): + + def get_products(self, asset_ids: List[str]) -> List[ProductInfo]: + """Ottieni informazioni su multiple prodotti""" + if self._aggregation_enabled: + return self._get_aggregator().get_products(asset_ids) return self.wrappers.try_call(lambda w: w.get_products(asset_ids)) - def get_all_products(self): + + def get_all_products(self) -> List[ProductInfo]: + """Ottieni tutti i prodotti disponibili""" + if self._aggregation_enabled: + return self._get_aggregator().get_all_products() return self.wrappers.try_call(lambda w: w.get_all_products()) - def get_historical_prices(self, asset_id = "BTC", limit: int = 100): + + def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> List[Price]: + """Ottieni dati storici dei prezzi""" + if self._aggregation_enabled: + return self._get_aggregator().get_historical_prices(asset_id, limit) return self.wrappers.try_call(lambda w: w.get_historical_prices(asset_id, limit)) + + # Metodi per controllare l'aggregazione + def enable_aggregation(self, enabled: bool = True): + """Abilita/disabilita la modalità aggregazione""" + self._aggregation_enabled = enabled + if self._aggregator: + self._aggregator.enable_aggregation(enabled) + + def is_aggregation_enabled(self) -> bool: + """Verifica se l'aggregazione è abilitata""" + return self._aggregation_enabled + + # Metodo speciale per debugging (opzionale) + def get_aggregated_product_with_debug(self, asset_id: str) -> dict: + """ + Metodo speciale per ottenere dati aggregati con informazioni di debug. + Disponibile solo quando l'aggregazione è abilitata. + """ + if not self._aggregation_enabled: + raise RuntimeError("L'aggregazione deve essere abilitata per usare questo metodo") + return self._get_aggregator().get_aggregated_product_with_debug(asset_id) diff --git a/src/app/markets/binance_public.py b/src/app/markets/binance_public.py index 598840b..c1d9896 100644 --- a/src/app/markets/binance_public.py +++ b/src/app/markets/binance_public.py @@ -9,7 +9,6 @@ from typing import Optional, Dict, Any from datetime import datetime, timedelta from binance.client import Client from .base import BaseWrapper, ProductInfo, Price -from .error_handler import retry_on_failure, handle_api_errors, MarketAPIError class PublicBinanceAgent(BaseWrapper): @@ -38,8 +37,6 @@ class PublicBinanceAgent(BaseWrapper): return asset_id return f"{asset_id}USDT" - @retry_on_failure(max_retries=3, delay=1.0) - @handle_api_errors def get_product(self, asset_id: str) -> ProductInfo: """ Ottiene informazioni su un singolo prodotto. @@ -59,8 +56,6 @@ class PublicBinanceAgent(BaseWrapper): print(f"Errore nel recupero del prodotto {asset_id}: {e}") return ProductInfo(id=asset_id, symbol=asset_id) - @retry_on_failure(max_retries=3, delay=1.0) - @handle_api_errors def get_products(self, asset_ids: list[str]) -> list[ProductInfo]: """ Ottiene informazioni su più prodotti. @@ -77,8 +72,6 @@ class PublicBinanceAgent(BaseWrapper): products.append(product) return products - @retry_on_failure(max_retries=3, delay=1.0) - @handle_api_errors def get_all_products(self) -> list[ProductInfo]: """ Ottiene informazioni su tutti i prodotti disponibili. @@ -90,8 +83,6 @@ class PublicBinanceAgent(BaseWrapper): major_assets = ["BTC", "ETH", "BNB", "ADA", "DOT", "LINK", "LTC", "XRP"] return self.get_products(major_assets) - @retry_on_failure(max_retries=3, delay=1.0) - @handle_api_errors def get_historical_prices(self, asset_id: str = "BTC") -> list[Price]: """ Ottiene i prezzi storici per un asset. diff --git a/src/app/markets/error_handler.py b/src/app/markets/error_handler.py deleted file mode 100644 index c98301a..0000000 --- a/src/app/markets/error_handler.py +++ /dev/null @@ -1,237 +0,0 @@ -""" -Modulo per la gestione robusta degli errori nei market providers. - -Fornisce decoratori e utilità per: -- Retry automatico con backoff esponenziale -- Logging standardizzato degli errori -- Gestione di timeout e rate limiting -- Fallback tra provider multipli -""" - -import time -import logging -from functools import wraps -from typing import Any, Callable, Optional, Type, Union, List -from requests.exceptions import RequestException, Timeout, ConnectionError -from binance.exceptions import BinanceAPIException, BinanceRequestException -from .base import ProductInfo - -# Configurazione logging -logger = logging.getLogger(__name__) - -class MarketAPIError(Exception): - """Eccezione base per errori delle API di mercato.""" - pass - -class RateLimitError(MarketAPIError): - """Eccezione per errori di rate limiting.""" - pass - -class AuthenticationError(MarketAPIError): - """Eccezione per errori di autenticazione.""" - pass - -class DataNotFoundError(MarketAPIError): - """Eccezione quando i dati richiesti non sono disponibili.""" - pass - -def retry_on_failure( - max_retries: int = 3, - delay: float = 1.0, - backoff_factor: float = 2.0, - exceptions: tuple = (RequestException, BinanceAPIException, BinanceRequestException) -) -> Callable: - """ - Decoratore per retry automatico con backoff esponenziale. - - Args: - max_retries: Numero massimo di tentativi - delay: Delay iniziale in secondi - backoff_factor: Fattore di moltiplicazione per il delay - exceptions: Tuple di eccezioni da catturare per il retry - - Returns: - Decoratore per la funzione - """ - def decorator(func: Callable) -> Callable: - @wraps(func) - def wrapper(*args, **kwargs) -> Any: - last_exception = None - current_delay = delay - - for attempt in range(max_retries + 1): - try: - return func(*args, **kwargs) - except exceptions as e: - last_exception = e - - if attempt == max_retries: - logger.error( - f"Function {func.__name__} failed after {max_retries + 1} attempts. " - f"Last error: {str(e)}" - ) - raise MarketAPIError(f"Max retries exceeded: {str(e)}") from e - - logger.warning( - f"Attempt {attempt + 1}/{max_retries + 1} failed for {func.__name__}: {str(e)}. " - f"Retrying in {current_delay:.1f}s..." - ) - - time.sleep(current_delay) - current_delay *= backoff_factor - except Exception as e: - # Per eccezioni non previste, non fare retry - logger.error(f"Unexpected error in {func.__name__}: {str(e)}") - raise - - # Questo non dovrebbe mai essere raggiunto - if last_exception: - raise last_exception - else: - raise MarketAPIError("Unknown error occurred") - - return wrapper - return decorator - -def handle_api_errors(func: Callable) -> Callable: - """ - Decoratore per gestione standardizzata degli errori API. - - Converte errori specifici dei provider in eccezioni standardizzate. - """ - @wraps(func) - def wrapper(*args, **kwargs) -> Any: - try: - return func(*args, **kwargs) - except BinanceAPIException as e: - if e.code == -1021: # Timestamp error - raise MarketAPIError(f"Binance timestamp error: {e.message}") - elif e.code == -1003: # Rate limit - raise RateLimitError(f"Binance rate limit exceeded: {e.message}") - elif e.code in [-2014, -2015]: # API key errors - raise AuthenticationError(f"Binance authentication error: {e.message}") - else: - raise MarketAPIError(f"Binance API error [{e.code}]: {e.message}") - except ConnectionError as e: - raise MarketAPIError(f"Connection error: {str(e)}") - except Timeout as e: - raise MarketAPIError(f"Request timeout: {str(e)}") - except RequestException as e: - raise MarketAPIError(f"Request error: {str(e)}") - except Exception as e: - logger.error(f"Unexpected error in {func.__name__}: {str(e)}") - raise MarketAPIError(f"Unexpected error: {str(e)}") from e - - return wrapper - -def safe_execute( - func: Callable, - default_value: Any = None, - log_errors: bool = True, - error_message: Optional[str] = None -) -> Any: - """ - Esegue una funzione in modo sicuro, restituendo un valore di default in caso di errore. - - Args: - func: Funzione da eseguire - default_value: Valore da restituire in caso di errore - log_errors: Se loggare gli errori - error_message: Messaggio di errore personalizzato - - Returns: - Risultato della funzione o valore di default - """ - try: - return func() - except Exception as e: - if log_errors: - message = error_message or f"Error executing {func.__name__}" - logger.warning(f"{message}: {str(e)}") - return default_value - -class ProviderFallback: - """ - Classe per gestire il fallback tra provider multipli. - """ - - def __init__(self, providers: List[Any]): - """ - Inizializza con una lista di provider ordinati per priorità. - - Args: - providers: Lista di provider ordinati per priorità - """ - self.providers = providers - - def execute_with_fallback( - self, - method_name: str, - *args, - **kwargs - ) -> list[ProductInfo]: - """ - Esegue un metodo su tutti i provider fino a trovarne uno che funziona. - - Args: - method_name: Nome del metodo da chiamare - *args: Argomenti posizionali - **kwargs: Argomenti nominali - - Returns: - Risultato del primo provider che funziona - - Raises: - MarketAPIError: Se tutti i provider falliscono - """ - last_error = None - - for i, provider in enumerate(self.providers): - try: - if hasattr(provider, method_name): - method = getattr(provider, method_name) - result = method(*args, **kwargs) - - if i > 0: # Se non è il primo provider - logger.info(f"Fallback successful: used provider {type(provider).__name__}") - - return result - else: - logger.warning(f"Provider {type(provider).__name__} doesn't have method {method_name}") - continue - - except Exception as e: - last_error = e - logger.warning( - f"Provider {type(provider).__name__} failed for {method_name}: {str(e)}" - ) - continue - - # Se arriviamo qui, tutti i provider hanno fallito - raise MarketAPIError( - f"All providers failed for method {method_name}. Last error: {str(last_error)}" - ) - -def validate_response_data(data: Any, required_fields: Optional[List[str]] = None) -> bool: - """ - Valida che i dati di risposta contengano i campi richiesti. - - Args: - data: Dati da validare - required_fields: Lista di campi richiesti - - Returns: - True se i dati sono validi, False altrimenti - """ - if data is None: - return False - - if required_fields is None: - return True - - if isinstance(data, dict): - return all(field in data for field in required_fields) - elif hasattr(data, '__dict__'): - return all(hasattr(data, field) for field in required_fields) - - return False \ No newline at end of file diff --git a/src/app/pipeline.py b/src/app/pipeline.py index f515053..7a440de 100644 --- a/src/app/pipeline.py +++ b/src/app/pipeline.py @@ -6,7 +6,6 @@ from agno.utils.log import log_info from app.agents.market_agent import MarketAgent from app.agents.news_agent import NewsAgent from app.agents.social_agent import SocialAgent -from app.markets import MarketAPIs from app.models import AppModels from app.predictor import PredictorStyle, PredictorInput, PredictorOutput, PREDICTOR_INSTRUCTIONS diff --git a/src/app/toolkits/market_toolkit.py b/src/app/toolkits/market_toolkit.py index ff6e48d..61a4d9f 100644 --- a/src/app/toolkits/market_toolkit.py +++ b/src/app/toolkits/market_toolkit.py @@ -1,5 +1,5 @@ from agno.tools import Toolkit -from app.markets import MarketAPIs +from app.markets import MarketAPIsTool # TODO (?) in futuro fare in modo che la LLM faccia da sé per il mercato @@ -9,7 +9,7 @@ from app.markets import MarketAPIs # in base alle sue proprie chiamate API class MarketToolkit(Toolkit): def __init__(self): - self.market_api = MarketAPIs("USD") # change currency if needed + self.market_api = MarketAPIsTool("USD") # change currency if needed super().__init__( name="Market Toolkit", diff --git a/src/app/utils/aggregated_models.py b/src/app/utils/aggregated_models.py new file mode 100644 index 0000000..8eba8a5 --- /dev/null +++ b/src/app/utils/aggregated_models.py @@ -0,0 +1,184 @@ +import statistics +from typing import Dict, List, Optional, Set +from pydantic import BaseModel, Field, PrivateAttr +from app.markets.base import ProductInfo + +class AggregationMetadata(BaseModel): + """Metadati nascosti per debugging e audit trail""" + sources_used: Set[str] = Field(default_factory=set, description="Exchange usati nell'aggregazione") + sources_ignored: Set[str] = Field(default_factory=set, description="Exchange ignorati (errori)") + aggregation_timestamp: str = Field(default="", description="Timestamp dell'aggregazione") + confidence_score: float = Field(default=0.0, description="Score 0-1 sulla qualità dei dati") + + class Config: + # Nasconde questi campi dalla serializzazione di default + extra = "forbid" + +class AggregatedProductInfo(ProductInfo): + """ + Versione aggregata di ProductInfo che mantiene la trasparenza per l'utente finale + mentre fornisce metadati di debugging opzionali. + """ + + # Override dei campi con logica di aggregazione + id: str = Field(description="ID aggregato basato sul simbolo standardizzato") + status: str = Field(description="Status aggregato (majority vote o conservative)") + + # Campi privati per debugging (non visibili di default) + _metadata: Optional[AggregationMetadata] = PrivateAttr(default=None) + _source_data: Optional[Dict[str, ProductInfo]] = PrivateAttr(default=None) + + @classmethod + def from_multiple_sources(cls, products: List[ProductInfo]) -> 'AggregatedProductInfo': + """ + Crea un AggregatedProductInfo da una lista di ProductInfo. + Usa strategie intelligenti per gestire ID e status. + """ + if not products: + raise ValueError("Nessun prodotto da aggregare") + + # Raggruppa per symbol (la chiave vera per l'aggregazione) + symbol_groups = {} + for product in products: + if product.symbol not in symbol_groups: + symbol_groups[product.symbol] = [] + symbol_groups[product.symbol].append(product) + + # Per ora gestiamo un symbol alla volta + if len(symbol_groups) > 1: + raise ValueError(f"Simboli multipli non supportati: {list(symbol_groups.keys())}") + + symbol_products = list(symbol_groups.values())[0] + + # Estrai tutte le fonti + sources = [] + for product in symbol_products: + # Determina la fonte dall'ID o da altri metadati se disponibili + source = cls._detect_source(product) + sources.append(source) + + # Aggrega i dati + aggregated_data = cls._aggregate_products(symbol_products, sources) + + # Crea l'istanza e assegna gli attributi privati + instance = cls(**aggregated_data) + instance._metadata = aggregated_data.get("_metadata") + instance._source_data = aggregated_data.get("_source_data") + + return instance + + @staticmethod + def _detect_source(product: ProductInfo) -> str: + """Rileva la fonte da un ProductInfo""" + # Strategia semplice: usa pattern negli ID + if "coinbase" in product.id.lower() or "cb" in product.id.lower(): + return "coinbase" + elif "binance" in product.id.lower() or "bn" in product.id.lower(): + return "binance" + elif "crypto" in product.id.lower() or "cc" in product.id.lower(): + return "cryptocompare" + else: + return "unknown" + + @classmethod + def _aggregate_products(cls, products: List[ProductInfo], sources: List[str]) -> dict: + """ + Logica di aggregazione principale. + Gestisce ID, status e altri campi numerici. + """ + import statistics + from datetime import datetime + + # ID: usa il symbol come chiave standardizzata + symbol = products[0].symbol + aggregated_id = f"{symbol}_AGG" + + # Status: strategia "conservativa" - il più restrittivo vince + # Ordine: trading_only < limit_only < auction < maintenance < offline + status_priority = { + "trading": 1, + "limit_only": 2, + "auction": 3, + "maintenance": 4, + "offline": 5, + "": 0 # Default se non specificato + } + + statuses = [p.status for p in products if p.status] + if statuses: + # Prendi lo status con priorità più alta (più restrittivo) + aggregated_status = max(statuses, key=lambda s: status_priority.get(s, 0)) + else: + aggregated_status = "trading" # Default ottimistico + + # Prezzo: media semplice (uso diretto del campo price come float) + prices = [p.price for p in products if p.price > 0] + aggregated_price = statistics.mean(prices) if prices else 0.0 + + # Volume: somma (assumendo che i volumi siano esclusivi per exchange) + volumes = [p.volume_24h for p in products if p.volume_24h > 0] + total_volume = sum(volumes) + aggregated_volume = sum(price_i * volume_i for price_i, volume_i in zip((p.price for p in products), (volume for volume in volumes))) / total_volume + aggregated_volume = round(aggregated_volume, 5) + # aggregated_volume = sum(volumes) if volumes else 0.0 # NOTE old implementation + + # Valuta: prendi la prima (dovrebbero essere tutte uguali) + quote_currency = next((p.quote_currency for p in products if p.quote_currency), "USD") + + # Calcola confidence score + confidence = cls._calculate_confidence(products, sources) + + # Crea metadati per debugging + metadata = AggregationMetadata( + sources_used=set(sources), + aggregation_timestamp=datetime.now().isoformat(), + confidence_score=confidence + ) + + # Salva dati sorgente per debugging + source_data = dict(zip(sources, products)) + + return { + "symbol": symbol, + "price": aggregated_price, + "volume_24h": aggregated_volume, + "quote_currency": quote_currency, + "id": aggregated_id, + "status": aggregated_status, + "_metadata": metadata, + "_source_data": source_data + } + + @staticmethod + def _calculate_confidence(products: List[ProductInfo], sources: List[str]) -> float: + """Calcola un punteggio di confidenza 0-1""" + if not products: + return 0.0 + + score = 1.0 + + # Riduci score se pochi dati + if len(products) < 2: + score *= 0.7 + + # Riduci score se prezzi troppo diversi + prices = [p.price for p in products if p.price > 0] + if len(prices) > 1: + price_std = (max(prices) - min(prices)) / statistics.mean(prices) + if price_std > 0.05: # >5% variazione + score *= 0.8 + + # Riduci score se fonti sconosciute + unknown_sources = sum(1 for s in sources if s == "unknown") + if unknown_sources > 0: + score *= (1 - unknown_sources / len(sources)) + + return max(0.0, min(1.0, score)) + + def get_debug_info(self) -> dict: + """Metodo opzionale per ottenere informazioni di debug""" + return { + "aggregated_product": self.dict(), + "metadata": self._metadata.dict() if self._metadata else None, + "sources": list(self._source_data.keys()) if self._source_data else [] + } \ No newline at end of file diff --git a/src/app/utils/market_data_aggregator.py b/src/app/utils/market_data_aggregator.py new file mode 100644 index 0000000..f72e91c --- /dev/null +++ b/src/app/utils/market_data_aggregator.py @@ -0,0 +1,184 @@ +from typing import List, Optional, Dict, Any +from app.markets.base import ProductInfo, Price +from app.utils.aggregated_models import AggregatedProductInfo + +class MarketDataAggregator: + """ + Aggregatore di dati di mercato che mantiene la trasparenza per l'utente. + + Compone MarketAPIs per fornire gli stessi metodi, ma restituisce dati aggregati + da tutte le fonti disponibili. L'utente finale non vede la complessità. + """ + + def __init__(self, currency: str = "USD"): + # Import lazy per evitare circular import + from app.markets import MarketAPIs + self._market_apis = MarketAPIs(currency) + self._aggregation_enabled = True + + def get_product(self, asset_id: str) -> ProductInfo: + """ + Override che aggrega dati da tutte le fonti disponibili. + Per l'utente sembra un normale ProductInfo. + """ + if not self._aggregation_enabled: + return self._market_apis.get_product(asset_id) + + # Raccogli dati da tutte le fonti + try: + raw_results = self.wrappers.try_call_all( + lambda wrapper: wrapper.get_product(asset_id) + ) + + # Converti in ProductInfo se necessario + products = [] + for wrapper_class, result in raw_results.items(): + if isinstance(result, ProductInfo): + products.append(result) + elif isinstance(result, dict): + # Converti dizionario in ProductInfo + products.append(ProductInfo(**result)) + + if not products: + raise Exception("Nessun dato disponibile") + + # Aggrega i risultati + aggregated = AggregatedProductInfo.from_multiple_sources(products) + + # Restituisci come ProductInfo normale (nascondi la complessità) + return ProductInfo(**aggregated.dict(exclude={"_metadata", "_source_data"})) + + except Exception as e: + # Fallback: usa il comportamento normale se l'aggregazione fallisce + return self._market_apis.get_product(asset_id) + + def get_products(self, asset_ids: List[str]) -> List[ProductInfo]: + """ + Aggrega dati per multiple asset. + """ + if not self._aggregation_enabled: + return self._market_apis.get_products(asset_ids) + + aggregated_products = [] + + for asset_id in asset_ids: + try: + product = self.get_product(asset_id) + aggregated_products.append(product) + except Exception as e: + # Salta asset che non riescono ad aggregare + continue + + return aggregated_products + + def get_all_products(self) -> List[ProductInfo]: + """ + Aggrega tutti i prodotti disponibili. + """ + if not self._aggregation_enabled: + return self._market_apis.get_all_products() + + # Raccogli tutti i prodotti da tutte le fonti + try: + all_products_by_source = self.wrappers.try_call_all( + lambda wrapper: wrapper.get_all_products() + ) + + # Raggruppa per symbol per aggregare + symbol_groups = {} + for wrapper_class, products in all_products_by_source.items(): + if not isinstance(products, list): + continue + + for product in products: + if isinstance(product, dict): + product = ProductInfo(**product) + + if product.symbol not in symbol_groups: + symbol_groups[product.symbol] = [] + symbol_groups[product.symbol].append(product) + + # Aggrega ogni gruppo + aggregated_products = [] + for symbol, products in symbol_groups.items(): + try: + aggregated = AggregatedProductInfo.from_multiple_sources(products) + # Restituisci come ProductInfo normale + aggregated_products.append( + ProductInfo(**aggregated.dict(exclude={"_metadata", "_source_data"})) + ) + except Exception: + # Se l'aggregazione fallisce, usa il primo disponibile + if products: + aggregated_products.append(products[0]) + + return aggregated_products + + except Exception as e: + # Fallback: usa il comportamento normale + return self._market_apis.get_all_products() + + def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> List[Price]: + """ + Per i dati storici, usa una strategia diversa: + prendi i dati dalla fonte più affidabile o aggrega se possibile. + """ + if not self._aggregation_enabled: + return self._market_apis.get_historical_prices(asset_id, limit) + + # Per dati storici, usa il primo wrapper che funziona + # (l'aggregazione di dati storici è più complessa) + try: + return self.wrappers.try_call( + lambda wrapper: wrapper.get_historical_prices(asset_id, limit) + ) + except Exception as e: + # Fallback: usa il comportamento normale + return self._market_apis.get_historical_prices(asset_id, limit) + + def enable_aggregation(self, enabled: bool = True): + """Abilita o disabilita l'aggregazione""" + self._aggregation_enabled = enabled + + def is_aggregation_enabled(self) -> bool: + """Controlla se l'aggregazione è abilitata""" + return self._aggregation_enabled + + # Metodi proxy per completare l'interfaccia BaseWrapper + @property + def wrappers(self): + """Accesso al wrapper handler per compatibilità""" + return self._market_apis.wrappers + + def get_aggregated_product_with_debug(self, asset_id: str) -> Dict[str, Any]: + """ + Metodo speciale per debugging: restituisce dati aggregati con metadati. + Usato solo per testing e monitoraggio. + """ + try: + raw_results = self.wrappers.try_call_all( + lambda wrapper: wrapper.get_product(asset_id) + ) + + products = [] + for wrapper_class, result in raw_results.items(): + if isinstance(result, ProductInfo): + products.append(result) + elif isinstance(result, dict): + products.append(ProductInfo(**result)) + + if not products: + raise Exception("Nessun dato disponibile") + + aggregated = AggregatedProductInfo.from_multiple_sources(products) + + return { + "product": aggregated.dict(exclude={"_metadata", "_source_data"}), + "debug": aggregated.get_debug_info() + } + + except Exception as e: + return { + "error": str(e), + "debug": {"error": str(e)} + } \ No newline at end of file diff --git a/tests/agents/test_market.py b/tests/agents/test_market.py index 5fef76e..10e5cb4 100644 --- a/tests/agents/test_market.py +++ b/tests/agents/test_market.py @@ -1,12 +1,12 @@ import os import pytest from app.agents.market_agent import MarketToolkit -from app.markets import MarketAPIs +from app.markets import MarketAPIsTool @pytest.mark.limited # usa molte api calls e non voglio esaurire le chiavi api -class TestMarketAPIs: +class TestMarketAPIsTool: def test_wrapper_initialization(self): - market_wrapper = MarketAPIs("USD") + market_wrapper = MarketAPIsTool("USD") assert market_wrapper is not None assert hasattr(market_wrapper, 'get_product') assert hasattr(market_wrapper, 'get_products') @@ -14,7 +14,7 @@ class TestMarketAPIs: assert hasattr(market_wrapper, 'get_historical_prices') def test_wrapper_capabilities(self): - market_wrapper = MarketAPIs("USD") + market_wrapper = MarketAPIsTool("USD") capabilities = [] if hasattr(market_wrapper, 'get_product'): capabilities.append('single_product') @@ -25,7 +25,7 @@ class TestMarketAPIs: assert len(capabilities) > 0 def test_market_data_retrieval(self): - market_wrapper = MarketAPIs("USD") + market_wrapper = MarketAPIsTool("USD") btc_product = market_wrapper.get_product("BTC") assert btc_product is not None assert hasattr(btc_product, 'symbol') @@ -55,14 +55,14 @@ class TestMarketAPIs: def test_error_handling(self): try: - market_wrapper = MarketAPIs("USD") + market_wrapper = MarketAPIsTool("USD") fake_product = market_wrapper.get_product("NONEXISTENT_CRYPTO_SYMBOL_12345") assert fake_product is None or fake_product.price == 0 except Exception as e: pass def test_wrapper_currency_support(self): - market_wrapper = MarketAPIs("USD") + market_wrapper = MarketAPIsTool("USD") assert hasattr(market_wrapper, 'currency') assert isinstance(market_wrapper.currency, str) assert len(market_wrapper.currency) >= 3 # USD, EUR, etc. diff --git a/tests/test_market_data_aggregator.py b/tests/test_market_data_aggregator.py new file mode 100644 index 0000000..604362c --- /dev/null +++ b/tests/test_market_data_aggregator.py @@ -0,0 +1,90 @@ +import pytest + +from app.utils.market_data_aggregator import MarketDataAggregator +from app.utils.aggregated_models import AggregatedProductInfo +from app.markets.base import ProductInfo, Price + + +class TestMarketDataAggregator: + + def test_initialization(self): + """Test che il MarketDataAggregator si inizializzi correttamente""" + aggregator = MarketDataAggregator() + assert aggregator is not None + assert aggregator.is_aggregation_enabled() == True + + def test_aggregation_toggle(self): + """Test del toggle dell'aggregazione""" + aggregator = MarketDataAggregator() + + # Disabilita aggregazione + aggregator.enable_aggregation(False) + assert aggregator.is_aggregation_enabled() == False + + # Riabilita aggregazione + aggregator.enable_aggregation(True) + assert aggregator.is_aggregation_enabled() == True + + def test_aggregated_product_info_creation(self): + """Test creazione AggregatedProductInfo da fonti multiple""" + + # Crea dati di esempio + product1 = ProductInfo( + id="BTC-USD", + symbol="BTC-USD", + price=50000.0, + volume_24h=1000.0, + status="active", + quote_currency="USD" + ) + + product2 = ProductInfo( + id="BTC-USD", + symbol="BTC-USD", + price=50100.0, + volume_24h=1100.0, + status="active", + quote_currency="USD" + ) + + # Aggrega i prodotti + aggregated = AggregatedProductInfo.from_multiple_sources([product1, product2]) + + assert aggregated.symbol == "BTC-USD" + assert aggregated.price == pytest.approx(50050.0, rel=1e-3) # media tra 50000 e 50100 + assert aggregated.volume_24h == 50052.38095 # somma dei volumi + assert aggregated.status == "active" # majority vote + assert aggregated.id == "BTC-USD_AGG" # mapping_id con suffisso aggregazione + + def test_confidence_calculation(self): + """Test del calcolo della confidence""" + + product1 = ProductInfo( + id="BTC-USD", + symbol="BTC-USD", + price=50000.0, + volume_24h=1000.0, + status="active", + quote_currency="USD" + ) + + product2 = ProductInfo( + id="BTC-USD", + symbol="BTC-USD", + price=50100.0, + volume_24h=1100.0, + status="active", + quote_currency="USD" + ) + + aggregated = AggregatedProductInfo.from_multiple_sources([product1, product2]) + + # Verifica che ci siano metadati + assert aggregated._metadata is not None + assert len(aggregated._metadata.sources_used) > 0 + assert aggregated._metadata.aggregation_timestamp != "" + # La confidence può essere 0.0 se ci sono fonti "unknown" + + +if __name__ == "__main__": + pytest.main([__file__]) \ No newline at end of file