diff --git a/src/app/api/core/markets.py b/src/app/api/core/markets.py index e5fe647..a1b1b79 100644 --- a/src/app/api/core/markets.py +++ b/src/app/api/core/markets.py @@ -16,118 +16,51 @@ class ProductInfo(BaseModel): provider: str = "" @staticmethod - def aggregate_multi_assets(products: dict[str, list['ProductInfo']]) -> list['ProductInfo']: + def aggregate(products: dict[str, list['ProductInfo']], filter_currency: str="USD") -> list['ProductInfo']: """ - Aggregates a list of ProductInfo by symbol across different providers. + Aggregates a list of ProductInfo by symbol. Args: products (dict[str, list[ProductInfo]]): Map provider -> list of ProductInfo + filter_currency (str): If set, only products with this currency are considered. Defaults to "USD". Returns: - list[ProductInfo]: List of ProductInfo aggregated by symbol, combining data from all providers + dict[ProductInfo, str]: Map of aggregated ProductInfo by symbol """ - # Costruzione mappa symbol -> lista di ProductInfo (da tutti i provider) - symbols_infos: dict[str, list[ProductInfo]] = {} - for provider_name, product_list in products.items(): + # Costruzione mappa id -> lista di ProductInfo + lista di provider + id_infos: dict[str, tuple[list[ProductInfo], list[str]]] = {} + for provider, product_list in products.items(): for product in product_list: - # Assicuriamo che il provider sia impostato - if not product.provider: - product.provider = provider_name - symbols_infos.setdefault(product.symbol, []).append(product) + if filter_currency and product.currency != filter_currency: + continue + id_value = product.id.upper().replace("-", "") # Normalizzazione id per compatibilità (es. BTC-USD -> btcusd) + product_list, provider_list = id_infos.setdefault(id_value, ([], []) ) + product_list.append(product) + provider_list.append(provider) - # Aggregazione per ogni symbol usando aggregate_single_asset + # Aggregazione per ogni id aggregated_products: list[ProductInfo] = [] - for symbol, product_list in symbols_infos.items(): - try: - # Usa aggregate_single_asset per aggregare ogni simbolo - aggregated = ProductInfo.aggregate_single_asset(product_list) - - # aggregate_single_asset calcola il volume medio, ma per multi_assets - # vogliamo il volume totale. Ricalcoliamo il volume come somma dopo il filtro USD - # Dobbiamo rifare il filtro USD per contare correttamente - currencies = set(p.currency for p in product_list if p.currency) - if len(currencies) > 1: - product_list = [p for p in product_list if p.currency.upper() == "USD"] - - # Volume totale - aggregated.volume_24h = sum(p.volume_24h for p in product_list if p.volume_24h > 0) - - aggregated_products.append(aggregated) - except ValueError: - # Se aggregate_single_asset fallisce (es. no USD when currencies differ), salta - continue - - return aggregated_products - - @staticmethod - def aggregate_single_asset(assets: list['ProductInfo'] | dict[str, 'ProductInfo'] | dict[str, list['ProductInfo']]) -> 'ProductInfo': - """ - Aggregates an asset across different exchanges. - Args: - assets: Can be: - - list[ProductInfo]: Direct list of products - - dict[str, ProductInfo]: Map provider -> ProductInfo (from WrapperHandler.try_call_all) - - dict[str, list[ProductInfo]]: Map provider -> list of ProductInfo - Returns: - ProductInfo: Aggregated ProductInfo combining data from all exchanges - """ + for id_value, (product_list, provider_list) in id_infos.items(): + product = ProductInfo() - # Defensive handling: normalize to a flat list of ProductInfo - if not assets: - raise ValueError("aggregate_single_asset requires at least one ProductInfo") + product.id = f"{id_value}_AGGREGATED" + product.symbol = next(p.symbol for p in product_list if p.symbol) + product.currency = next(p.currency for p in product_list if p.currency) - # Normalize to a flat list of ProductInfo - if isinstance(assets, dict): - # Check if dict values are ProductInfo or list[ProductInfo] - first_value = next(iter(assets.values())) if assets else None - if first_value and isinstance(first_value, list): - # dict[str, list[ProductInfo]] -> flatten - assets_list = [product for product_list in assets.values() for product in product_list] + volume_sum = sum(p.volume_24h for p in product_list) + product.volume_24h = volume_sum / len(product_list) if product_list else 0.0 + + if volume_sum > 0: + # Calcolo del prezzo pesato per volume (VWAP - Volume Weighted Average Price) + prices_weighted = sum(p.price * p.volume_24h for p in product_list if p.volume_24h > 0) + product.price = prices_weighted / volume_sum else: - # dict[str, ProductInfo] -> extract values - assets_list = list(assets.values()) - elif isinstance(assets, list) and assets and isinstance(assets[0], list): - # Flatten list[list[ProductInfo]] -> list[ProductInfo] - assets_list = [product for sublist in assets for product in sublist] - else: - # Already a flat list of ProductInfo - assets_list = list(assets) - - if not assets_list: - raise ValueError("aggregate_single_asset requires at least one ProductInfo") - - # Controllo valuta: se non sono tutte uguali, filtra solo USD - currencies = set(p.currency for p in assets_list if p.currency) - if len(currencies) > 1: - # Valute diverse: filtra solo USD - assets_list = [p for p in assets_list if p.currency.upper() == "USD"] - if not assets_list: - raise ValueError("aggregate_single_asset: no USD products available when currencies differ") - - # Aggregazione per ogni Exchange - aggregated: ProductInfo = ProductInfo() - first = assets_list[0] - aggregated.id = f"{first.symbol}_AGGREGATED" - aggregated.symbol = first.symbol - aggregated.currency = next((p.currency for p in assets_list if p.currency), "") - - # Raccogliamo i provider che hanno fornito dati - providers = [p.provider for p in assets_list if p.provider] - aggregated.provider = ", ".join(set(providers)) if providers else "AGGREGATED" - - # Calcolo del volume medio - volume_sum = sum(p.volume_24h for p in assets_list if p.volume_24h > 0) - aggregated.volume_24h = volume_sum / len(assets_list) if assets_list else 0.0 - # Calcolo del prezzo pesato per volume (VWAP - Volume Weighted Average Price) - if volume_sum > 0: - prices_weighted = sum(p.price * p.volume_24h for p in assets_list if p.volume_24h > 0) - aggregated.price = prices_weighted / volume_sum - else: - # Se non c'è volume, facciamo una media semplice dei prezzi - valid_prices = [p.price for p in assets_list if p.price > 0] - aggregated.price = sum(valid_prices) / len(valid_prices) if valid_prices else 0.0 - - return aggregated + # Se non c'è volume, facciamo una media semplice dei prezzi + valid_prices = [p.price for p in product_list if p.price > 0] + product.price = sum(valid_prices) / len(valid_prices) if valid_prices else 0.0 + product.provider = ",".join(provider_list) + aggregated_products.append(product) + return aggregated_products class Price(BaseModel): diff --git a/src/app/api/tools/market_tool.py b/src/app/api/tools/market_tool.py index f8dbb68..05bec46 100644 --- a/src/app/api/tools/market_tool.py +++ b/src/app/api/tools/market_tool.py @@ -126,7 +126,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): Exception: If all providers fail to return results. """ all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids)) - return ProductInfo.aggregate_multi_assets(all_products) + return ProductInfo.aggregate(all_products) def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]: """ diff --git a/tests/utils/test_market_aggregator.py b/tests/utils/test_market_aggregator.py index 93f96fa..087711a 100644 --- a/tests/utils/test_market_aggregator.py +++ b/tests/utils/test_market_aggregator.py @@ -7,14 +7,13 @@ from app.api.core.markets import ProductInfo, Price @pytest.mark.market class TestMarketDataAggregator: - def __product(self, symbol: str, price: float, volume: float, currency: str, provider: str = "") -> ProductInfo: + def __product(self, symbol: str, price: float, volume: float, currency: str) -> ProductInfo: prod = ProductInfo() prod.id = f"{symbol}-{currency}" prod.symbol = symbol prod.price = price prod.volume_24h = volume prod.currency = currency - prod.provider = provider return prod def __price(self, timestamp_s: int, high: float, low: float, open: float, close: float, volume: float) -> Price: @@ -29,35 +28,41 @@ class TestMarketDataAggregator: def test_aggregate_product_info(self): products: dict[str, list[ProductInfo]] = { - "Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1")], - "Provider2": [self.__product("BTC", 50100.0, 1100.0, "USD", "Provider2")], - "Provider3": [self.__product("BTC", 49900.0, 900.0, "USD", "Provider3")], + "Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")], + "Provider2": [self.__product("BTC", 50100.0, 1100.0, "USD")], + "Provider3": [self.__product("BTC", 49900.0, 900.0, "USD")], } # aggregate_single_asset returns a single ProductInfo, not a list - info = ProductInfo.aggregate_single_asset(products) + aggregated = ProductInfo.aggregate(products) + assert len(aggregated) == 1 + + info = aggregated[0] assert info is not None + assert info.id == "BTCUSD_AGGREGATED" assert info.symbol == "BTC" + assert info.currency == "USD" + assert "Provider1" in info.provider + assert "Provider2" in info.provider + assert "Provider3" in info.provider avg_weighted_price = (50000.0 * 1000.0 + 50100.0 * 1100.0 + 49900.0 * 900.0) / (1000.0 + 1100.0 + 900.0) assert info.price == pytest.approx(avg_weighted_price, rel=1e-3) # type: ignore assert info.volume_24h == pytest.approx(1000.0, rel=1e-3) # type: ignore - assert info.currency == "USD" def test_aggregate_product_info_multiple_symbols(self): products = { "Provider1": [ - self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1"), - self.__product("ETH", 4000.0, 2000.0, "USD", "Provider1"), + self.__product("BTC", 50000.0, 1000.0, "USD"), + self.__product("ETH", 4000.0, 2000.0, "USD"), ], "Provider2": [ - self.__product("BTC", 50100.0, 1100.0, "USD", "Provider2"), - self.__product("ETH", 4050.0, 2100.0, "USD", "Provider2"), + self.__product("BTC", 50100.0, 1100.0, "USD"), + self.__product("ETH", 4050.0, 2100.0, "USD"), ], } - # aggregate_multi_assets aggregates by symbol across providers - aggregated = ProductInfo.aggregate_multi_assets(products) + aggregated = ProductInfo.aggregate(products) assert len(aggregated) == 2 btc_info = next((p for p in aggregated if p.symbol == "BTC"), None) @@ -66,13 +71,13 @@ class TestMarketDataAggregator: assert btc_info is not None avg_weighted_price_btc = (50000.0 * 1000.0 + 50100.0 * 1100.0) / (1000.0 + 1100.0) assert btc_info.price == pytest.approx(avg_weighted_price_btc, rel=1e-3) # type: ignore - assert btc_info.volume_24h == pytest.approx(2100.0, rel=1e-3) # type: ignore # Total volume (1000 + 1100) + assert btc_info.volume_24h == pytest.approx(1050.0, rel=1e-3) # type: ignore assert btc_info.currency == "USD" assert eth_info is not None avg_weighted_price_eth = (4000.0 * 2000.0 + 4050.0 * 2100.0) / (2000.0 + 2100.0) assert eth_info.price == pytest.approx(avg_weighted_price_eth, rel=1e-3) # type: ignore - assert eth_info.volume_24h == pytest.approx(4100.0, rel=1e-3) # type: ignore # Total volume (2000 + 2100) + assert eth_info.volume_24h == pytest.approx(2050.0, rel=1e-3) # type: ignore assert eth_info.currency == "USD" def test_aggregate_product_info_with_no_data(self): @@ -80,15 +85,15 @@ class TestMarketDataAggregator: "Provider1": [], "Provider2": [], } - aggregated = ProductInfo.aggregate_multi_assets(products) + aggregated = ProductInfo.aggregate(products) assert len(aggregated) == 0 def test_aggregate_product_info_with_partial_data(self): products: dict[str, list[ProductInfo]] = { - "Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1")], + "Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")], "Provider2": [], } - aggregated = ProductInfo.aggregate_multi_assets(products) + aggregated = ProductInfo.aggregate(products) assert len(aggregated) == 1 info = aggregated[0] assert info.symbol == "BTC" @@ -129,157 +134,54 @@ class TestMarketDataAggregator: assert aggregated[1].low == pytest.approx(49850.0, rel=1e-3) # type: ignore def test_aggregate_product_info_different_currencies(self): - products: dict[str, list[ProductInfo]] = { - "Provider1": [self.__product("BTC", 100000.0, 1000.0, "USD", "Provider1")], - "Provider2": [self.__product("BTC", 70000.0, 800.0, "EUR", "Provider2")], + products = { + "Provider1": [self.__product("BTC", 100000.0, 1000.0, "USD")], + "Provider2": [self.__product("BTC", 70000.0, 800.0, "EUR")], } - aggregated = ProductInfo.aggregate_multi_assets(products) + aggregated = ProductInfo.aggregate(products) assert len(aggregated) == 1 info = aggregated[0] assert info is not None - assert info.id == "BTC_AGGREGATED" + assert info.id == "BTCUSD_AGGREGATED" assert info.symbol == "BTC" assert info.currency == "USD" # Only USD products are kept # When currencies differ, only USD is aggregated (only Provider1 in this case) assert info.price == pytest.approx(100000.0, rel=1e-3) # type: ignore assert info.volume_24h == pytest.approx(1000.0, rel=1e-3) # type: ignore # Only USD volume - # ===== Tests for aggregate_single_asset ===== - - def test_aggregate_single_asset_from_dict(self): - """Test aggregate_single_asset with dict input (simulating WrapperHandler.try_call_all)""" - products_dict: dict[str, ProductInfo] = { - "BinanceWrapper": self.__product("BTC", 50000.0, 1000.0, "USD", "Binance"), - "YFinanceWrapper": self.__product("BTC", 50100.0, 1100.0, "USD", "YFinance"), - "CoinBaseWrapper": self.__product("BTC", 49900.0, 900.0, "USD", "Coinbase"), - } - - info = ProductInfo.aggregate_single_asset(products_dict) - assert info is not None - assert info.symbol == "BTC" - assert info.id == "BTC_AGGREGATED" - assert "Binance" in info.provider - assert "YFinance" in info.provider - assert "Coinbase" in info.provider - - # VWAP calculation - expected_price = (50000.0 * 1000.0 + 50100.0 * 1100.0 + 49900.0 * 900.0) / (1000.0 + 1100.0 + 900.0) - assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore - assert info.volume_24h == pytest.approx(1000.0, rel=1e-3) # type: ignore - assert info.currency == "USD" - - def test_aggregate_single_asset_from_list(self): - """Test aggregate_single_asset with list input""" - products_list = [ - self.__product("ETH", 4000.0, 2000.0, "USD", "Binance"), - self.__product("ETH", 4050.0, 2100.0, "USD", "Coinbase"), - ] - - info = ProductInfo.aggregate_single_asset(products_list) - assert info is not None - assert info.symbol == "ETH" - assert info.id == "ETH_AGGREGATED" - assert "Binance" in info.provider - assert "Coinbase" in info.provider - - expected_price = (4000.0 * 2000.0 + 4050.0 * 2100.0) / (2000.0 + 2100.0) - assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore - - def test_aggregate_single_asset_no_volume_fallback(self): - """Test fallback to simple average when no volume data""" - products_list = [ - self.__product("SOL", 100.0, 0.0, "USD", "Provider1"), - self.__product("SOL", 110.0, 0.0, "USD", "Provider2"), - self.__product("SOL", 90.0, 0.0, "USD", "Provider3"), - ] - - info = ProductInfo.aggregate_single_asset(products_list) - assert info is not None - assert info.symbol == "SOL" - # Simple average: (100 + 110 + 90) / 3 = 100 - assert info.price == pytest.approx(100.0, rel=1e-3) # type: ignore - assert info.volume_24h == pytest.approx(0.0, rel=1e-3) # type: ignore - - def test_aggregate_single_asset_empty_raises(self): - """Test that empty input raises ValueError""" - with pytest.raises(ValueError, match="requires at least one ProductInfo"): - ProductInfo.aggregate_single_asset([]) - - with pytest.raises(ValueError, match="requires at least one ProductInfo"): - ProductInfo.aggregate_single_asset({}) - - def test_aggregate_single_asset_dict_with_lists(self): - """Test aggregate_single_asset with dict[str, list[ProductInfo]] (flattens correctly)""" - products_dict_lists: dict[str, list[ProductInfo]] = { - "Provider1": [self.__product("ADA", 0.50, 1000.0, "USD", "Provider1")], - "Provider2": [self.__product("ADA", 0.52, 1200.0, "USD", "Provider2")], - } - - info = ProductInfo.aggregate_single_asset(products_dict_lists) - assert info is not None - assert info.symbol == "ADA" - expected_price = (0.50 * 1000.0 + 0.52 * 1200.0) / (1000.0 + 1200.0) - assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore - - def test_aggregate_single_asset_missing_currency(self): - """Test that aggregate_single_asset handles missing currency gracefully""" - products_list = [ - self.__product("DOT", 10.0, 500.0, "", "Provider1"), - self.__product("DOT", 10.5, 600.0, "USD", "Provider2"), - ] - - info = ProductInfo.aggregate_single_asset(products_list) - assert info is not None - assert info.symbol == "DOT" - assert info.currency == "USD" # Should pick the first non-empty currency - - def test_aggregate_single_asset_single_provider(self): - """Test aggregate_single_asset with only one provider (edge case)""" - products = { - "BinanceWrapper": self.__product("MATIC", 0.80, 5000.0, "USD", "Binance"), - } - - info = ProductInfo.aggregate_single_asset(products) - assert info is not None - assert info.symbol == "MATIC" - assert info.price == pytest.approx(0.80, rel=1e-3) # type: ignore - assert info.volume_24h == pytest.approx(5000.0, rel=1e-3) # type: ignore - assert info.provider == "Binance" - - # ===== Tests for aggregate_multi_assets with edge cases ===== - - def test_aggregate_multi_assets_empty_providers(self): - """Test aggregate_multi_assets with some providers returning empty lists""" - products = { - "Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1")], + def test_aggregate_product_info_empty_providers(self): + """Test aggregate_product_info with some providers returning empty lists""" + products: dict[str, list[ProductInfo]] = { + "Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")], "Provider2": [], - "Provider3": [self.__product("BTC", 50100.0, 1100.0, "USD", "Provider3")], + "Provider3": [self.__product("BTC", 50100.0, 1100.0, "USD")], } - aggregated = ProductInfo.aggregate_multi_assets(products) + aggregated = ProductInfo.aggregate(products) assert len(aggregated) == 1 info = aggregated[0] assert info.symbol == "BTC" assert "Provider1" in info.provider + assert "Provider2" not in info.provider assert "Provider3" in info.provider - def test_aggregate_multi_assets_mixed_symbols(self): - """Test that aggregate_multi_assets correctly separates different symbols""" + def test_aggregate_product_info_mixed_symbols(self): + """Test that aggregate_product_info correctly separates different symbols""" products = { "Provider1": [ - self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1"), - self.__product("ETH", 4000.0, 2000.0, "USD", "Provider1"), - self.__product("SOL", 100.0, 500.0, "USD", "Provider1"), + self.__product("BTC", 50000.0, 1000.0, "USD"), + self.__product("ETH", 4000.0, 2000.0, "USD"), + self.__product("SOL", 100.0, 500.0, "USD"), ], "Provider2": [ - self.__product("BTC", 50100.0, 1100.0, "USD", "Provider2"), - self.__product("ETH", 4050.0, 2100.0, "USD", "Provider2"), + self.__product("BTC", 50100.0, 1100.0, "USD"), + self.__product("ETH", 4050.0, 2100.0, "USD"), ], } - aggregated = ProductInfo.aggregate_multi_assets(products) + aggregated = ProductInfo.aggregate(products) assert len(aggregated) == 3 symbols = {p.symbol for p in aggregated} @@ -290,3 +192,20 @@ class TestMarketDataAggregator: sol = next(p for p in aggregated if p.symbol == "SOL") assert sol.provider == "Provider1" # Only one provider + + def test_aggregate_product_info_zero_volume(self): + """Test aggregazione quando tutti i prodotti hanno volume zero""" + products = { + "Provider1": [self.__product("BTC", 50000.0, 0.0, "USD")], + "Provider2": [self.__product("BTC", 50100.0, 0.0, "USD")], + "Provider3": [self.__product("BTC", 49900.0, 0.0, "USD")], + } + + aggregated = ProductInfo.aggregate(products) + assert len(aggregated) == 1 + + info = aggregated[0] + # Con volume zero, dovrebbe usare la media semplice dei prezzi + expected_price = (50000.0 + 50100.0 + 49900.0) / 3 + assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore + assert info.volume_24h == 0.0