diff --git a/src/app/markets/__init__.py b/src/app/markets/__init__.py index a2efa16..d0eb6ad 100644 --- a/src/app/markets/__init__.py +++ b/src/app/markets/__init__.py @@ -1,36 +1,27 @@ -from typing import List, Optional from agno.tools import Toolkit from app.utils.wrapper_handler import WrapperHandler +from app.utils.market_aggregation import aggregate_product_info, aggregate_history_prices from .base import BaseWrapper, ProductInfo, Price from .coinbase import CoinBaseWrapper from .binance import BinanceWrapper from .cryptocompare import CryptoCompareWrapper from .yfinance import YFinanceWrapper -from .binance_public import PublicBinanceAgent -__all__ = [ "MarketAPIs", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper", "PublicBinanceAgent" ] +__all__ = [ "MarketAPIs", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper" ] class MarketAPIsTool(BaseWrapper, Toolkit): """ - Classe per gestire le API di mercato disponibili. - - Supporta due modalità: - 1. **Modalità standard** (default): usa il primo wrapper disponibile - 2. **Modalità aggregazione**: aggrega dati da tutte le fonti disponibili - - L'aggregazione può essere abilitata/disabilitata dinamicamente. + Classe per comporre più MarketAPI con gestione degli errori e aggregazione dei dati. + Usa WrapperHandler per gestire più API con logica di retry e failover. + Si può scegliere se aggregare i dati da tutte le fonti o usare una singola fonte tramite delle chiamate apposta. """ - def __init__(self, currency: str = "USD", enable_aggregation: bool = False): + def __init__(self, currency: str = "USD"): kwargs = {"currency": currency or "USD"} wrappers = [ BinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper, YFinanceWrapper ] self.wrappers: WrapperHandler[BaseWrapper] = WrapperHandler.build_wrappers(wrappers, kwargs=kwargs) - # Inizializza l'aggregatore solo se richiesto (lazy initialization) - self._aggregator = None - self._aggregation_enabled = enable_aggregation - Toolkit.__init__( self, name="Market APIs Toolkit", @@ -39,61 +30,45 @@ class MarketAPIsTool(BaseWrapper, Toolkit): self.get_products, self.get_all_products, self.get_historical_prices, + self.get_products_aggregated, + self.get_historical_prices_aggregated, ], ) - def _get_aggregator(self): - """Lazy initialization dell'aggregatore""" - if self._aggregator is None: - from app.utils.market_data_aggregator import MarketDataAggregator - self._aggregator = MarketDataAggregator(self.currency) - self._aggregator.enable_aggregation(self._aggregation_enabled) - return self._aggregator - - def get_product(self, asset_id: str) -> Optional[ProductInfo]: - """Ottieni informazioni su un prodotto specifico""" - if self._aggregation_enabled: - return self._get_aggregator().get_product(asset_id) + def get_product(self, asset_id: str) -> ProductInfo: return self.wrappers.try_call(lambda w: w.get_product(asset_id)) - - def get_products(self, asset_ids: List[str]) -> List[ProductInfo]: - """Ottieni informazioni su multiple prodotti""" - if self._aggregation_enabled: - return self._get_aggregator().get_products(asset_ids) + def get_products(self, asset_ids: list[str]) -> list[ProductInfo]: return self.wrappers.try_call(lambda w: w.get_products(asset_ids)) - - def get_all_products(self) -> List[ProductInfo]: - """Ottieni tutti i prodotti disponibili""" - if self._aggregation_enabled: - return self._get_aggregator().get_all_products() + def get_all_products(self) -> list[ProductInfo]: return self.wrappers.try_call(lambda w: w.get_all_products()) - - def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> List[Price]: - """Ottieni dati storici dei prezzi""" - if self._aggregation_enabled: - return self._get_aggregator().get_historical_prices(asset_id, limit) + def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]: return self.wrappers.try_call(lambda w: w.get_historical_prices(asset_id, limit)) - # Metodi per controllare l'aggregazione - def enable_aggregation(self, enabled: bool = True): - """Abilita/disabilita la modalità aggregazione""" - self._aggregation_enabled = enabled - if self._aggregator: - self._aggregator.enable_aggregation(enabled) - def is_aggregation_enabled(self) -> bool: - """Verifica se l'aggregazione è abilitata""" - return self._aggregation_enabled + def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]: + """ + Restituisce i dati aggregati per una lista di asset_id.\n + Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento). + Args: + asset_ids (list[str]): Lista di asset_id da cercare. + Returns: + list[ProductInfo]: Lista di ProductInfo aggregati. + """ + all_products = self.wrappers.try_call_all(lambda w: w.get_products(asset_ids)) + return aggregate_product_info(all_products) - # Metodo speciale per debugging (opzionale) - def get_aggregated_product_with_debug(self, asset_id: str) -> dict: + def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]: """ - Metodo speciale per ottenere dati aggregati con informazioni di debug. - Disponibile solo quando l'aggregazione è abilitata. + Restituisce i dati storici aggregati per un asset_id. Usa i dati di tutte le fonti disponibili e li aggrega.\n + Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento). + Args: + asset_id (str): Asset ID da cercare. + limit (int): Numero massimo di dati storici da restituire. + Returns: + list[Price]: Lista di Price aggregati. """ - if not self._aggregation_enabled: - raise RuntimeError("L'aggregazione deve essere abilitata per usare questo metodo") - return self._get_aggregator().get_aggregated_product_with_debug(asset_id) + all_prices = self.wrappers.try_call_all(lambda w: w.get_historical_prices(asset_id, limit)) + return aggregate_history_prices(all_prices) # TODO definire istruzioni per gli agenti di mercato MARKET_INSTRUCTIONS = """ diff --git a/src/app/markets/base.py b/src/app/markets/base.py index 117c174..5761675 100644 --- a/src/app/markets/base.py +++ b/src/app/markets/base.py @@ -1,4 +1,3 @@ - from pydantic import BaseModel class BaseWrapper: diff --git a/src/app/utils/aggregated_models.py b/src/app/utils/aggregated_models.py deleted file mode 100644 index e4a5ff4..0000000 --- a/src/app/utils/aggregated_models.py +++ /dev/null @@ -1,186 +0,0 @@ -import statistics -from typing import Dict, List, Optional, Set -from pydantic import BaseModel, Field, PrivateAttr -from app.markets.base import ProductInfo - -class AggregationMetadata(BaseModel): - """Metadati nascosti per debugging e audit trail""" - sources_used: Set[str] = Field(default_factory=set, description="Exchange usati nell'aggregazione") - sources_ignored: Set[str] = Field(default_factory=set, description="Exchange ignorati (errori)") - aggregation_timestamp: str = Field(default="", description="Timestamp dell'aggregazione") - confidence_score: float = Field(default=0.0, description="Score 0-1 sulla qualità dei dati") - - class Config: - # Nasconde questi campi dalla serializzazione di default - extra = "forbid" - -class AggregatedProductInfo(ProductInfo): - """ - Versione aggregata di ProductInfo che mantiene la trasparenza per l'utente finale - mentre fornisce metadati di debugging opzionali. - """ - - # Override dei campi con logica di aggregazione - id: str = Field(description="ID aggregato basato sul simbolo standardizzato") - status: str = Field(description="Status aggregato (majority vote o conservative)") - - # Campi privati per debugging (non visibili di default) - _metadata: Optional[AggregationMetadata] = PrivateAttr(default=None) - _source_data: Optional[Dict[str, ProductInfo]] = PrivateAttr(default=None) - - @classmethod - def from_multiple_sources(cls, products: List[ProductInfo]) -> 'AggregatedProductInfo': - """ - Crea un AggregatedProductInfo da una lista di ProductInfo. - Usa strategie intelligenti per gestire ID e status. - """ - if not products: - raise ValueError("Nessun prodotto da aggregare") - - # Raggruppa per symbol (la chiave vera per l'aggregazione) - symbol_groups = {} - for product in products: - if product.symbol not in symbol_groups: - symbol_groups[product.symbol] = [] - symbol_groups[product.symbol].append(product) - - # Per ora gestiamo un symbol alla volta - if len(symbol_groups) > 1: - raise ValueError(f"Simboli multipli non supportati: {list(symbol_groups.keys())}") - - symbol_products = list(symbol_groups.values())[0] - - # Estrai tutte le fonti - sources = [] - for product in symbol_products: - # Determina la fonte dall'ID o da altri metadati se disponibili - source = cls._detect_source(product) - sources.append(source) - - # Aggrega i dati - aggregated_data = cls._aggregate_products(symbol_products, sources) - - # Crea l'istanza e assegna gli attributi privati - instance = cls(**aggregated_data) - instance._metadata = aggregated_data.get("_metadata") - instance._source_data = aggregated_data.get("_source_data") - - return instance - - @staticmethod - def _detect_source(product: ProductInfo) -> str: - """Rileva la fonte da un ProductInfo""" - # Strategia semplice: usa pattern negli ID - if "coinbase" in product.id.lower() or "cb" in product.id.lower(): - return "coinbase" - elif "binance" in product.id.lower() or "bn" in product.id.lower(): - return "binance" - elif "crypto" in product.id.lower() or "cc" in product.id.lower(): - return "cryptocompare" - elif "yfinance" in product.id.lower() or "yf" in product.id.lower(): - return "yfinance" - else: - return "unknown" - - @classmethod - def _aggregate_products(cls, products: List[ProductInfo], sources: List[str]) -> dict: - """ - Logica di aggregazione principale. - Gestisce ID, status e altri campi numerici. - """ - import statistics - from datetime import datetime - - # ID: usa il symbol come chiave standardizzata - symbol = products[0].symbol - aggregated_id = f"{symbol}_AGG" - - # Status: strategia "conservativa" - il più restrittivo vince - # Ordine: trading_only < limit_only < auction < maintenance < offline - status_priority = { - "trading": 1, - "limit_only": 2, - "auction": 3, - "maintenance": 4, - "offline": 5, - "": 0 # Default se non specificato - } - - statuses = [p.status for p in products if p.status] - if statuses: - # Prendi lo status con priorità più alta (più restrittivo) - aggregated_status = max(statuses, key=lambda s: status_priority.get(s, 0)) - else: - aggregated_status = "trading" # Default ottimistico - - # Prezzo: media semplice (uso diretto del campo price come float) - prices = [p.price for p in products if p.price > 0] - aggregated_price = statistics.mean(prices) if prices else 0.0 - - # Volume: somma (assumendo che i volumi siano esclusivi per exchange) - volumes = [p.volume_24h for p in products if p.volume_24h > 0] - total_volume = sum(volumes) - aggregated_volume = sum(price_i * volume_i for price_i, volume_i in zip((p.price for p in products), (volume for volume in volumes))) / total_volume - aggregated_volume = round(aggregated_volume, 5) - # aggregated_volume = sum(volumes) if volumes else 0.0 # NOTE old implementation - - # Valuta: prendi la prima (dovrebbero essere tutte uguali) - quote_currency = next((p.quote_currency for p in products if p.quote_currency), "USD") - - # Calcola confidence score - confidence = cls._calculate_confidence(products, sources) - - # Crea metadati per debugging - metadata = AggregationMetadata( - sources_used=set(sources), - aggregation_timestamp=datetime.now().isoformat(), - confidence_score=confidence - ) - - # Salva dati sorgente per debugging - source_data = dict(zip(sources, products)) - - return { - "symbol": symbol, - "price": aggregated_price, - "volume_24h": aggregated_volume, - "quote_currency": quote_currency, - "id": aggregated_id, - "status": aggregated_status, - "_metadata": metadata, - "_source_data": source_data - } - - @staticmethod - def _calculate_confidence(products: List[ProductInfo], sources: List[str]) -> float: - """Calcola un punteggio di confidenza 0-1""" - if not products: - return 0.0 - - score = 1.0 - - # Riduci score se pochi dati - if len(products) < 2: - score *= 0.7 - - # Riduci score se prezzi troppo diversi - prices = [p.price for p in products if p.price > 0] - if len(prices) > 1: - price_std = (max(prices) - min(prices)) / statistics.mean(prices) - if price_std > 0.05: # >5% variazione - score *= 0.8 - - # Riduci score se fonti sconosciute - unknown_sources = sum(1 for s in sources if s == "unknown") - if unknown_sources > 0: - score *= (1 - unknown_sources / len(sources)) - - return max(0.0, min(1.0, score)) - - def get_debug_info(self) -> dict: - """Metodo opzionale per ottenere informazioni di debug""" - return { - "aggregated_product": self.dict(), - "metadata": self._metadata.dict() if self._metadata else None, - "sources": list(self._source_data.keys()) if self._source_data else [] - } \ No newline at end of file diff --git a/src/app/utils/market_aggregation.py b/src/app/utils/market_aggregation.py new file mode 100644 index 0000000..3d7b6b8 --- /dev/null +++ b/src/app/utils/market_aggregation.py @@ -0,0 +1,82 @@ +import statistics +from app.markets.base import ProductInfo, Price + + +def aggregate_history_prices(prices: dict[str, list[Price]]) -> list[float]: + """Aggrega i prezzi storici per symbol calcolando la media""" + raise NotImplementedError("Funzione non ancora implementata per problemi di timestamp he deve essere uniformato prima di usare questa funzione.") + # TODO implementare l'aggregazione dopo aver modificato la classe Price in modo che abbia un timestamp integer + # aggregated_prices = [] + # for timestamp in range(len(next(iter(prices.values())))): + # timestamp_prices = [ + # price_list[timestamp].price + # for price_list in prices.values() + # if len(price_list) > timestamp and price_list[timestamp].price is not None + # ] + # if timestamp_prices: + # aggregated_prices.append(statistics.mean(timestamp_prices)) + # else: + # aggregated_prices.append(None) + # return aggregated_prices + +def aggregate_product_info(products: dict[str, list[ProductInfo]]) -> list[ProductInfo]: + """ + Aggrega una lista di ProductInfo per symbol. + """ + + # Costruzione mappa symbol -> lista di ProductInfo + symbols_infos: dict[str, list[ProductInfo]] = {} + for _, product_list in products.items(): + for product in product_list: + symbols_infos.setdefault(product.symbol, []).append(product) + + # Aggregazione per ogni symbol + sources = list(products.keys()) + aggregated_products = [] + for symbol, product_list in symbols_infos.items(): + product = ProductInfo() + + product.id = f"{symbol}_AGG" + product.symbol = symbol + product.quote_currency = next(p.quote_currency for p in product_list if p.quote_currency) + + statuses = {} + for p in product_list: + statuses[p.status] = statuses.get(p.status, 0) + 1 + product.status = max(statuses, key=statuses.get) if statuses else "" + + prices = [p.price for p in product_list] + product.price = statistics.mean(prices) + + volumes = [p.volume_24h for p in product_list] + product.volume_24h = sum([p * v for p, v in zip(prices, volumes)]) / sum(volumes) + aggregated_products.append(product) + + confidence = _calculate_confidence(product_list, sources) # TODO necessary? + + return aggregated_products + +def _calculate_confidence(products: list[ProductInfo], sources: list[str]) -> float: + """Calcola un punteggio di confidenza 0-1""" + if not products: + return 0.0 + + score = 1.0 + + # Riduci score se pochi dati + if len(products) < 2: + score *= 0.7 + + # Riduci score se prezzi troppo diversi + prices = [p.price for p in products if p.price > 0] + if len(prices) > 1: + price_std = (max(prices) - min(prices)) / statistics.mean(prices) + if price_std > 0.05: # >5% variazione + score *= 0.8 + + # Riduci score se fonti sconosciute + unknown_sources = sum(1 for s in sources if s == "unknown") + if unknown_sources > 0: + score *= (1 - unknown_sources / len(sources)) + + return max(0.0, min(1.0, score)) diff --git a/src/app/utils/market_data_aggregator.py b/src/app/utils/market_data_aggregator.py deleted file mode 100644 index 9c6206d..0000000 --- a/src/app/utils/market_data_aggregator.py +++ /dev/null @@ -1,184 +0,0 @@ -from typing import List, Optional, Dict, Any -from app.markets.base import ProductInfo, Price -from app.utils.aggregated_models import AggregatedProductInfo - -class MarketDataAggregator: - """ - Aggregatore di dati di mercato che mantiene la trasparenza per l'utente. - - Compone MarketAPIs per fornire gli stessi metodi, ma restituisce dati aggregati - da tutte le fonti disponibili. L'utente finale non vede la complessità. - """ - - def __init__(self, currency: str = "USD"): - # Import lazy per evitare circular import - from app.markets import MarketAPIsTool - self._market_apis = MarketAPIsTool(currency) - self._aggregation_enabled = True - - def get_product(self, asset_id: str) -> ProductInfo: - """ - Override che aggrega dati da tutte le fonti disponibili. - Per l'utente sembra un normale ProductInfo. - """ - if not self._aggregation_enabled: - return self._market_apis.get_product(asset_id) - - # Raccogli dati da tutte le fonti - try: - raw_results = self.wrappers.try_call_all( - lambda wrapper: wrapper.get_product(asset_id) - ) - - # Converti in ProductInfo se necessario - products = [] - for wrapper_class, result in raw_results.items(): - if isinstance(result, ProductInfo): - products.append(result) - elif isinstance(result, dict): - # Converti dizionario in ProductInfo - products.append(ProductInfo(**result)) - - if not products: - raise Exception("Nessun dato disponibile") - - # Aggrega i risultati - aggregated = AggregatedProductInfo.from_multiple_sources(products) - - # Restituisci come ProductInfo normale (nascondi la complessità) - return ProductInfo(**aggregated.dict(exclude={"_metadata", "_source_data"})) - - except Exception as e: - # Fallback: usa il comportamento normale se l'aggregazione fallisce - return self._market_apis.get_product(asset_id) - - def get_products(self, asset_ids: List[str]) -> List[ProductInfo]: - """ - Aggrega dati per multiple asset. - """ - if not self._aggregation_enabled: - return self._market_apis.get_products(asset_ids) - - aggregated_products = [] - - for asset_id in asset_ids: - try: - product = self.get_product(asset_id) - aggregated_products.append(product) - except Exception as e: - # Salta asset che non riescono ad aggregare - continue - - return aggregated_products - - def get_all_products(self) -> List[ProductInfo]: - """ - Aggrega tutti i prodotti disponibili. - """ - if not self._aggregation_enabled: - return self._market_apis.get_all_products() - - # Raccogli tutti i prodotti da tutte le fonti - try: - all_products_by_source = self.wrappers.try_call_all( - lambda wrapper: wrapper.get_all_products() - ) - - # Raggruppa per symbol per aggregare - symbol_groups = {} - for wrapper_class, products in all_products_by_source.items(): - if not isinstance(products, list): - continue - - for product in products: - if isinstance(product, dict): - product = ProductInfo(**product) - - if product.symbol not in symbol_groups: - symbol_groups[product.symbol] = [] - symbol_groups[product.symbol].append(product) - - # Aggrega ogni gruppo - aggregated_products = [] - for symbol, products in symbol_groups.items(): - try: - aggregated = AggregatedProductInfo.from_multiple_sources(products) - # Restituisci come ProductInfo normale - aggregated_products.append( - ProductInfo(**aggregated.dict(exclude={"_metadata", "_source_data"})) - ) - except Exception: - # Se l'aggregazione fallisce, usa il primo disponibile - if products: - aggregated_products.append(products[0]) - - return aggregated_products - - except Exception as e: - # Fallback: usa il comportamento normale - return self._market_apis.get_all_products() - - def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> List[Price]: - """ - Per i dati storici, usa una strategia diversa: - prendi i dati dalla fonte più affidabile o aggrega se possibile. - """ - if not self._aggregation_enabled: - return self._market_apis.get_historical_prices(asset_id, limit) - - # Per dati storici, usa il primo wrapper che funziona - # (l'aggregazione di dati storici è più complessa) - try: - return self.wrappers.try_call( - lambda wrapper: wrapper.get_historical_prices(asset_id, limit) - ) - except Exception as e: - # Fallback: usa il comportamento normale - return self._market_apis.get_historical_prices(asset_id, limit) - - def enable_aggregation(self, enabled: bool = True): - """Abilita o disabilita l'aggregazione""" - self._aggregation_enabled = enabled - - def is_aggregation_enabled(self) -> bool: - """Controlla se l'aggregazione è abilitata""" - return self._aggregation_enabled - - # Metodi proxy per completare l'interfaccia BaseWrapper - @property - def wrappers(self): - """Accesso al wrapper handler per compatibilità""" - return self._market_apis.wrappers - - def get_aggregated_product_with_debug(self, asset_id: str) -> Dict[str, Any]: - """ - Metodo speciale per debugging: restituisce dati aggregati con metadati. - Usato solo per testing e monitoraggio. - """ - try: - raw_results = self.wrappers.try_call_all( - lambda wrapper: wrapper.get_product(asset_id) - ) - - products = [] - for wrapper_class, result in raw_results.items(): - if isinstance(result, ProductInfo): - products.append(result) - elif isinstance(result, dict): - products.append(ProductInfo(**result)) - - if not products: - raise Exception("Nessun dato disponibile") - - aggregated = AggregatedProductInfo.from_multiple_sources(products) - - return { - "product": aggregated.dict(exclude={"_metadata", "_source_data"}), - "debug": aggregated.get_debug_info() - } - - except Exception as e: - return { - "error": str(e), - "debug": {"error": str(e)} - } \ No newline at end of file diff --git a/tests/utils/test_market_data_aggregator.py b/tests/utils/test_market_data_aggregator.py index e8d1a6f..57ef4a1 100644 --- a/tests/utils/test_market_data_aggregator.py +++ b/tests/utils/test_market_data_aggregator.py @@ -1,88 +1,104 @@ import pytest -from app.utils.market_data_aggregator import MarketDataAggregator -from app.utils.aggregated_models import AggregatedProductInfo from app.markets.base import ProductInfo, Price +from app.utils.market_aggregation import aggregate_history_prices, aggregate_product_info + @pytest.mark.aggregator -@pytest.mark.limited @pytest.mark.market -@pytest.mark.api class TestMarketDataAggregator: - - def test_initialization(self): - """Test che il MarketDataAggregator si inizializzi correttamente""" - aggregator = MarketDataAggregator() - assert aggregator is not None - assert aggregator.is_aggregation_enabled() == True - - def test_aggregation_toggle(self): - """Test del toggle dell'aggregazione""" - aggregator = MarketDataAggregator() - - # Disabilita aggregazione - aggregator.enable_aggregation(False) - assert aggregator.is_aggregation_enabled() == False - - # Riabilita aggregazione - aggregator.enable_aggregation(True) - assert aggregator.is_aggregation_enabled() == True - - def test_aggregated_product_info_creation(self): - """Test creazione AggregatedProductInfo da fonti multiple""" - - # Crea dati di esempio - product1 = ProductInfo( - id="BTC-USD", - symbol="BTC-USD", + + def __product(self, symbol: str, price: float, volume: float, status: str, currency: str) -> ProductInfo: + prod = ProductInfo() + prod.id=f"{symbol}-{currency}" + prod.symbol=symbol + prod.price=price + prod.volume_24h=volume + prod.status=status + prod.quote_currency=currency + return prod + + def test_aggregate_product_info(self): + products: dict[str, list[ProductInfo]] = { + "Provider1": [self.__product("BTC", 50000.0, 1000.0, "active", "USD")], + "Provider2": [self.__product("BTC", 50100.0, 1100.0, "active", "USD")], + "Provider3": [self.__product("BTC", 49900.0, 900.0, "inactive", "USD")], + } + + aggregated = aggregate_product_info(products) + print(aggregated) + assert len(aggregated) == 1 + + info = aggregated[0] + assert info is not None + assert info.symbol == "BTC" + assert info.price == pytest.approx(50000.0, rel=1e-3) + + avg_weighted_volume = (50000.0 * 1000.0 + 50100.0 * 1100.0 + 49900.0 * 900.0) / (1000.0 + 1100.0 + 900.0) + assert info.volume_24h == pytest.approx(avg_weighted_volume, rel=1e-3) + assert info.status == "active" + assert info.quote_currency == "USD" + + def test_aggregate_product_info_multiple_symbols(self): + products = { + "Provider1": [ + self.__product("BTC", 50000.0, 1000.0, "active", "USD"), + self.__product("ETH", 4000.0, 2000.0, "active", "USD"), + ], + "Provider2": [ + self.__product("BTC", 50100.0, 1100.0, "active", "USD"), + self.__product("ETH", 4050.0, 2100.0, "active", "USD"), + ], + } + + aggregated = aggregate_product_info(products) + assert len(aggregated) == 2 + + btc_info = next((p for p in aggregated if p.symbol == "BTC"), None) + eth_info = next((p for p in aggregated if p.symbol == "ETH"), None) + + assert btc_info is not None + assert btc_info.price == pytest.approx(50050.0, rel=1e-3) + avg_weighted_volume_btc = (50000.0 * 1000.0 + 50100.0 * 1100.0) / (1000.0 + 1100.0) + assert btc_info.volume_24h == pytest.approx(avg_weighted_volume_btc, rel=1e-3) + assert btc_info.status == "active" + assert btc_info.quote_currency == "USD" + + assert eth_info is not None + assert eth_info.price == pytest.approx(4025.0, rel=1e-3) + avg_weighted_volume_eth = (4000.0 * 2000.0 + 4050.0 * 2100.0) / (2000.0 + 2100.0) + assert eth_info.volume_24h == pytest.approx(avg_weighted_volume_eth, rel=1e-3) + assert eth_info.status == "active" + assert eth_info.quote_currency == "USD" + + def test_aggregate_history_prices(self): + """Test aggregazione di prezzi storici usando aggregate_history_prices""" + + price1 = Price( + timestamp="2024-06-01T00:00:00Z", price=50000.0, - volume_24h=1000.0, - status="active", - quote_currency="USD" + source="exchange1" ) - - product2 = ProductInfo( - id="BTC-USD", - symbol="BTC-USD", + price2 = Price( + timestamp="2024-06-01T00:00:00Z", price=50100.0, - volume_24h=1100.0, - status="active", - quote_currency="USD" + source="exchange2" ) - - # Aggrega i prodotti - aggregated = AggregatedProductInfo.from_multiple_sources([product1, product2]) - - assert aggregated.symbol == "BTC-USD" - assert aggregated.price == pytest.approx(50050.0, rel=1e-3) # media tra 50000 e 50100 - assert aggregated.volume_24h == 50052.38095 # somma dei volumi - assert aggregated.status == "active" # majority vote - assert aggregated.id == "BTC-USD_AGG" # mapping_id con suffisso aggregazione - - def test_confidence_calculation(self): - """Test del calcolo della confidence""" - - product1 = ProductInfo( - id="BTC-USD", - symbol="BTC-USD", - price=50000.0, - volume_24h=1000.0, - status="active", - quote_currency="USD" + price3 = Price( + timestamp="2024-06-01T01:00:00Z", + price=50200.0, + source="exchange1" ) - - product2 = ProductInfo( - id="BTC-USD", - symbol="BTC-USD", - price=50100.0, - volume_24h=1100.0, - status="active", - quote_currency="USD" + price4 = Price( + timestamp="2024-06-01T01:00:00Z", + price=50300.0, + source="exchange2" ) - - aggregated = AggregatedProductInfo.from_multiple_sources([product1, product2]) - - # Verifica che ci siano metadati - assert aggregated._metadata is not None - assert len(aggregated._metadata.sources_used) > 0 - assert aggregated._metadata.aggregation_timestamp != "" - # La confidence può essere 0.0 se ci sono fonti "unknown" + + prices = [price1, price2, price3, price4] + aggregated_prices = aggregate_history_prices(prices) + + assert len(aggregated_prices) == 2 + assert aggregated_prices[0].timestamp == "2024-06-01T00:00:00Z" + assert aggregated_prices[0].price == pytest.approx(50050.0, rel=1e-3) + assert aggregated_prices[1].timestamp == "2024-06-01T01:00:00Z" + assert aggregated_prices[1].price == pytest.approx(50250.0, rel=1e-3)