simplified aggregation logic

This commit is contained in:
2025-11-02 00:01:48 +01:00
parent 3327bf8127
commit 30ddb76df7
3 changed files with 93 additions and 241 deletions

View File

@@ -16,118 +16,51 @@ class ProductInfo(BaseModel):
provider: str = ""
@staticmethod
def aggregate_multi_assets(products: dict[str, list['ProductInfo']]) -> list['ProductInfo']:
def aggregate(products: dict[str, list['ProductInfo']], filter_currency: str="USD") -> list['ProductInfo']:
"""
Aggregates a list of ProductInfo by symbol across different providers.
Aggregates a list of ProductInfo by symbol.
Args:
products (dict[str, list[ProductInfo]]): Map provider -> list of ProductInfo
filter_currency (str): If set, only products with this currency are considered. Defaults to "USD".
Returns:
list[ProductInfo]: List of ProductInfo aggregated by symbol, combining data from all providers
dict[ProductInfo, str]: Map of aggregated ProductInfo by symbol
"""
# Costruzione mappa symbol -> lista di ProductInfo (da tutti i provider)
symbols_infos: dict[str, list[ProductInfo]] = {}
for provider_name, product_list in products.items():
# Costruzione mappa id -> lista di ProductInfo + lista di provider
id_infos: dict[str, tuple[list[ProductInfo], list[str]]] = {}
for provider, product_list in products.items():
for product in product_list:
# Assicuriamo che il provider sia impostato
if not product.provider:
product.provider = provider_name
symbols_infos.setdefault(product.symbol, []).append(product)
if filter_currency and product.currency != filter_currency:
continue
id_value = product.id.upper().replace("-", "") # Normalizzazione id per compatibilità (es. BTC-USD -> btcusd)
product_list, provider_list = id_infos.setdefault(id_value, ([], []) )
product_list.append(product)
provider_list.append(provider)
# Aggregazione per ogni symbol usando aggregate_single_asset
# Aggregazione per ogni id
aggregated_products: list[ProductInfo] = []
for symbol, product_list in symbols_infos.items():
try:
# Usa aggregate_single_asset per aggregare ogni simbolo
aggregated = ProductInfo.aggregate_single_asset(product_list)
# aggregate_single_asset calcola il volume medio, ma per multi_assets
# vogliamo il volume totale. Ricalcoliamo il volume come somma dopo il filtro USD
# Dobbiamo rifare il filtro USD per contare correttamente
currencies = set(p.currency for p in product_list if p.currency)
if len(currencies) > 1:
product_list = [p for p in product_list if p.currency.upper() == "USD"]
# Volume totale
aggregated.volume_24h = sum(p.volume_24h for p in product_list if p.volume_24h > 0)
aggregated_products.append(aggregated)
except ValueError:
# Se aggregate_single_asset fallisce (es. no USD when currencies differ), salta
continue
return aggregated_products
@staticmethod
def aggregate_single_asset(assets: list['ProductInfo'] | dict[str, 'ProductInfo'] | dict[str, list['ProductInfo']]) -> 'ProductInfo':
"""
Aggregates an asset across different exchanges.
Args:
assets: Can be:
- list[ProductInfo]: Direct list of products
- dict[str, ProductInfo]: Map provider -> ProductInfo (from WrapperHandler.try_call_all)
- dict[str, list[ProductInfo]]: Map provider -> list of ProductInfo
Returns:
ProductInfo: Aggregated ProductInfo combining data from all exchanges
"""
for id_value, (product_list, provider_list) in id_infos.items():
product = ProductInfo()
# Defensive handling: normalize to a flat list of ProductInfo
if not assets:
raise ValueError("aggregate_single_asset requires at least one ProductInfo")
product.id = f"{id_value}_AGGREGATED"
product.symbol = next(p.symbol for p in product_list if p.symbol)
product.currency = next(p.currency for p in product_list if p.currency)
# Normalize to a flat list of ProductInfo
if isinstance(assets, dict):
# Check if dict values are ProductInfo or list[ProductInfo]
first_value = next(iter(assets.values())) if assets else None
if first_value and isinstance(first_value, list):
# dict[str, list[ProductInfo]] -> flatten
assets_list = [product for product_list in assets.values() for product in product_list]
volume_sum = sum(p.volume_24h for p in product_list)
product.volume_24h = volume_sum / len(product_list) if product_list else 0.0
if volume_sum > 0:
# Calcolo del prezzo pesato per volume (VWAP - Volume Weighted Average Price)
prices_weighted = sum(p.price * p.volume_24h for p in product_list if p.volume_24h > 0)
product.price = prices_weighted / volume_sum
else:
# dict[str, ProductInfo] -> extract values
assets_list = list(assets.values())
elif isinstance(assets, list) and assets and isinstance(assets[0], list):
# Flatten list[list[ProductInfo]] -> list[ProductInfo]
assets_list = [product for sublist in assets for product in sublist]
else:
# Already a flat list of ProductInfo
assets_list = list(assets)
if not assets_list:
raise ValueError("aggregate_single_asset requires at least one ProductInfo")
# Controllo valuta: se non sono tutte uguali, filtra solo USD
currencies = set(p.currency for p in assets_list if p.currency)
if len(currencies) > 1:
# Valute diverse: filtra solo USD
assets_list = [p for p in assets_list if p.currency.upper() == "USD"]
if not assets_list:
raise ValueError("aggregate_single_asset: no USD products available when currencies differ")
# Aggregazione per ogni Exchange
aggregated: ProductInfo = ProductInfo()
first = assets_list[0]
aggregated.id = f"{first.symbol}_AGGREGATED"
aggregated.symbol = first.symbol
aggregated.currency = next((p.currency for p in assets_list if p.currency), "")
# Raccogliamo i provider che hanno fornito dati
providers = [p.provider for p in assets_list if p.provider]
aggregated.provider = ", ".join(set(providers)) if providers else "AGGREGATED"
# Calcolo del volume medio
volume_sum = sum(p.volume_24h for p in assets_list if p.volume_24h > 0)
aggregated.volume_24h = volume_sum / len(assets_list) if assets_list else 0.0
# Calcolo del prezzo pesato per volume (VWAP - Volume Weighted Average Price)
if volume_sum > 0:
prices_weighted = sum(p.price * p.volume_24h for p in assets_list if p.volume_24h > 0)
aggregated.price = prices_weighted / volume_sum
else:
# Se non c'è volume, facciamo una media semplice dei prezzi
valid_prices = [p.price for p in assets_list if p.price > 0]
aggregated.price = sum(valid_prices) / len(valid_prices) if valid_prices else 0.0
return aggregated
# Se non c'è volume, facciamo una media semplice dei prezzi
valid_prices = [p.price for p in product_list if p.price > 0]
product.price = sum(valid_prices) / len(valid_prices) if valid_prices else 0.0
product.provider = ",".join(provider_list)
aggregated_products.append(product)
return aggregated_products
class Price(BaseModel):

View File

@@ -126,7 +126,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
Exception: If all providers fail to return results.
"""
all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids))
return ProductInfo.aggregate_multi_assets(all_products)
return ProductInfo.aggregate(all_products)
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
"""

View File

@@ -7,14 +7,13 @@ from app.api.core.markets import ProductInfo, Price
@pytest.mark.market
class TestMarketDataAggregator:
def __product(self, symbol: str, price: float, volume: float, currency: str, provider: str = "") -> ProductInfo:
def __product(self, symbol: str, price: float, volume: float, currency: str) -> ProductInfo:
prod = ProductInfo()
prod.id = f"{symbol}-{currency}"
prod.symbol = symbol
prod.price = price
prod.volume_24h = volume
prod.currency = currency
prod.provider = provider
return prod
def __price(self, timestamp_s: int, high: float, low: float, open: float, close: float, volume: float) -> Price:
@@ -29,35 +28,41 @@ class TestMarketDataAggregator:
def test_aggregate_product_info(self):
products: dict[str, list[ProductInfo]] = {
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1")],
"Provider2": [self.__product("BTC", 50100.0, 1100.0, "USD", "Provider2")],
"Provider3": [self.__product("BTC", 49900.0, 900.0, "USD", "Provider3")],
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")],
"Provider2": [self.__product("BTC", 50100.0, 1100.0, "USD")],
"Provider3": [self.__product("BTC", 49900.0, 900.0, "USD")],
}
# aggregate_single_asset returns a single ProductInfo, not a list
info = ProductInfo.aggregate_single_asset(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 1
info = aggregated[0]
assert info is not None
assert info.id == "BTCUSD_AGGREGATED"
assert info.symbol == "BTC"
assert info.currency == "USD"
assert "Provider1" in info.provider
assert "Provider2" in info.provider
assert "Provider3" in info.provider
avg_weighted_price = (50000.0 * 1000.0 + 50100.0 * 1100.0 + 49900.0 * 900.0) / (1000.0 + 1100.0 + 900.0)
assert info.price == pytest.approx(avg_weighted_price, rel=1e-3) # type: ignore
assert info.volume_24h == pytest.approx(1000.0, rel=1e-3) # type: ignore
assert info.currency == "USD"
def test_aggregate_product_info_multiple_symbols(self):
products = {
"Provider1": [
self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1"),
self.__product("ETH", 4000.0, 2000.0, "USD", "Provider1"),
self.__product("BTC", 50000.0, 1000.0, "USD"),
self.__product("ETH", 4000.0, 2000.0, "USD"),
],
"Provider2": [
self.__product("BTC", 50100.0, 1100.0, "USD", "Provider2"),
self.__product("ETH", 4050.0, 2100.0, "USD", "Provider2"),
self.__product("BTC", 50100.0, 1100.0, "USD"),
self.__product("ETH", 4050.0, 2100.0, "USD"),
],
}
# aggregate_multi_assets aggregates by symbol across providers
aggregated = ProductInfo.aggregate_multi_assets(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 2
btc_info = next((p for p in aggregated if p.symbol == "BTC"), None)
@@ -66,13 +71,13 @@ class TestMarketDataAggregator:
assert btc_info is not None
avg_weighted_price_btc = (50000.0 * 1000.0 + 50100.0 * 1100.0) / (1000.0 + 1100.0)
assert btc_info.price == pytest.approx(avg_weighted_price_btc, rel=1e-3) # type: ignore
assert btc_info.volume_24h == pytest.approx(2100.0, rel=1e-3) # type: ignore # Total volume (1000 + 1100)
assert btc_info.volume_24h == pytest.approx(1050.0, rel=1e-3) # type: ignore
assert btc_info.currency == "USD"
assert eth_info is not None
avg_weighted_price_eth = (4000.0 * 2000.0 + 4050.0 * 2100.0) / (2000.0 + 2100.0)
assert eth_info.price == pytest.approx(avg_weighted_price_eth, rel=1e-3) # type: ignore
assert eth_info.volume_24h == pytest.approx(4100.0, rel=1e-3) # type: ignore # Total volume (2000 + 2100)
assert eth_info.volume_24h == pytest.approx(2050.0, rel=1e-3) # type: ignore
assert eth_info.currency == "USD"
def test_aggregate_product_info_with_no_data(self):
@@ -80,15 +85,15 @@ class TestMarketDataAggregator:
"Provider1": [],
"Provider2": [],
}
aggregated = ProductInfo.aggregate_multi_assets(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 0
def test_aggregate_product_info_with_partial_data(self):
products: dict[str, list[ProductInfo]] = {
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1")],
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")],
"Provider2": [],
}
aggregated = ProductInfo.aggregate_multi_assets(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 1
info = aggregated[0]
assert info.symbol == "BTC"
@@ -129,157 +134,54 @@ class TestMarketDataAggregator:
assert aggregated[1].low == pytest.approx(49850.0, rel=1e-3) # type: ignore
def test_aggregate_product_info_different_currencies(self):
products: dict[str, list[ProductInfo]] = {
"Provider1": [self.__product("BTC", 100000.0, 1000.0, "USD", "Provider1")],
"Provider2": [self.__product("BTC", 70000.0, 800.0, "EUR", "Provider2")],
products = {
"Provider1": [self.__product("BTC", 100000.0, 1000.0, "USD")],
"Provider2": [self.__product("BTC", 70000.0, 800.0, "EUR")],
}
aggregated = ProductInfo.aggregate_multi_assets(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 1
info = aggregated[0]
assert info is not None
assert info.id == "BTC_AGGREGATED"
assert info.id == "BTCUSD_AGGREGATED"
assert info.symbol == "BTC"
assert info.currency == "USD" # Only USD products are kept
# When currencies differ, only USD is aggregated (only Provider1 in this case)
assert info.price == pytest.approx(100000.0, rel=1e-3) # type: ignore
assert info.volume_24h == pytest.approx(1000.0, rel=1e-3) # type: ignore # Only USD volume
# ===== Tests for aggregate_single_asset =====
def test_aggregate_single_asset_from_dict(self):
"""Test aggregate_single_asset with dict input (simulating WrapperHandler.try_call_all)"""
products_dict: dict[str, ProductInfo] = {
"BinanceWrapper": self.__product("BTC", 50000.0, 1000.0, "USD", "Binance"),
"YFinanceWrapper": self.__product("BTC", 50100.0, 1100.0, "USD", "YFinance"),
"CoinBaseWrapper": self.__product("BTC", 49900.0, 900.0, "USD", "Coinbase"),
}
info = ProductInfo.aggregate_single_asset(products_dict)
assert info is not None
assert info.symbol == "BTC"
assert info.id == "BTC_AGGREGATED"
assert "Binance" in info.provider
assert "YFinance" in info.provider
assert "Coinbase" in info.provider
# VWAP calculation
expected_price = (50000.0 * 1000.0 + 50100.0 * 1100.0 + 49900.0 * 900.0) / (1000.0 + 1100.0 + 900.0)
assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore
assert info.volume_24h == pytest.approx(1000.0, rel=1e-3) # type: ignore
assert info.currency == "USD"
def test_aggregate_single_asset_from_list(self):
"""Test aggregate_single_asset with list input"""
products_list = [
self.__product("ETH", 4000.0, 2000.0, "USD", "Binance"),
self.__product("ETH", 4050.0, 2100.0, "USD", "Coinbase"),
]
info = ProductInfo.aggregate_single_asset(products_list)
assert info is not None
assert info.symbol == "ETH"
assert info.id == "ETH_AGGREGATED"
assert "Binance" in info.provider
assert "Coinbase" in info.provider
expected_price = (4000.0 * 2000.0 + 4050.0 * 2100.0) / (2000.0 + 2100.0)
assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore
def test_aggregate_single_asset_no_volume_fallback(self):
"""Test fallback to simple average when no volume data"""
products_list = [
self.__product("SOL", 100.0, 0.0, "USD", "Provider1"),
self.__product("SOL", 110.0, 0.0, "USD", "Provider2"),
self.__product("SOL", 90.0, 0.0, "USD", "Provider3"),
]
info = ProductInfo.aggregate_single_asset(products_list)
assert info is not None
assert info.symbol == "SOL"
# Simple average: (100 + 110 + 90) / 3 = 100
assert info.price == pytest.approx(100.0, rel=1e-3) # type: ignore
assert info.volume_24h == pytest.approx(0.0, rel=1e-3) # type: ignore
def test_aggregate_single_asset_empty_raises(self):
"""Test that empty input raises ValueError"""
with pytest.raises(ValueError, match="requires at least one ProductInfo"):
ProductInfo.aggregate_single_asset([])
with pytest.raises(ValueError, match="requires at least one ProductInfo"):
ProductInfo.aggregate_single_asset({})
def test_aggregate_single_asset_dict_with_lists(self):
"""Test aggregate_single_asset with dict[str, list[ProductInfo]] (flattens correctly)"""
products_dict_lists: dict[str, list[ProductInfo]] = {
"Provider1": [self.__product("ADA", 0.50, 1000.0, "USD", "Provider1")],
"Provider2": [self.__product("ADA", 0.52, 1200.0, "USD", "Provider2")],
}
info = ProductInfo.aggregate_single_asset(products_dict_lists)
assert info is not None
assert info.symbol == "ADA"
expected_price = (0.50 * 1000.0 + 0.52 * 1200.0) / (1000.0 + 1200.0)
assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore
def test_aggregate_single_asset_missing_currency(self):
"""Test that aggregate_single_asset handles missing currency gracefully"""
products_list = [
self.__product("DOT", 10.0, 500.0, "", "Provider1"),
self.__product("DOT", 10.5, 600.0, "USD", "Provider2"),
]
info = ProductInfo.aggregate_single_asset(products_list)
assert info is not None
assert info.symbol == "DOT"
assert info.currency == "USD" # Should pick the first non-empty currency
def test_aggregate_single_asset_single_provider(self):
"""Test aggregate_single_asset with only one provider (edge case)"""
products = {
"BinanceWrapper": self.__product("MATIC", 0.80, 5000.0, "USD", "Binance"),
}
info = ProductInfo.aggregate_single_asset(products)
assert info is not None
assert info.symbol == "MATIC"
assert info.price == pytest.approx(0.80, rel=1e-3) # type: ignore
assert info.volume_24h == pytest.approx(5000.0, rel=1e-3) # type: ignore
assert info.provider == "Binance"
# ===== Tests for aggregate_multi_assets with edge cases =====
def test_aggregate_multi_assets_empty_providers(self):
"""Test aggregate_multi_assets with some providers returning empty lists"""
products = {
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1")],
def test_aggregate_product_info_empty_providers(self):
"""Test aggregate_product_info with some providers returning empty lists"""
products: dict[str, list[ProductInfo]] = {
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")],
"Provider2": [],
"Provider3": [self.__product("BTC", 50100.0, 1100.0, "USD", "Provider3")],
"Provider3": [self.__product("BTC", 50100.0, 1100.0, "USD")],
}
aggregated = ProductInfo.aggregate_multi_assets(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 1
info = aggregated[0]
assert info.symbol == "BTC"
assert "Provider1" in info.provider
assert "Provider2" not in info.provider
assert "Provider3" in info.provider
def test_aggregate_multi_assets_mixed_symbols(self):
"""Test that aggregate_multi_assets correctly separates different symbols"""
def test_aggregate_product_info_mixed_symbols(self):
"""Test that aggregate_product_info correctly separates different symbols"""
products = {
"Provider1": [
self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1"),
self.__product("ETH", 4000.0, 2000.0, "USD", "Provider1"),
self.__product("SOL", 100.0, 500.0, "USD", "Provider1"),
self.__product("BTC", 50000.0, 1000.0, "USD"),
self.__product("ETH", 4000.0, 2000.0, "USD"),
self.__product("SOL", 100.0, 500.0, "USD"),
],
"Provider2": [
self.__product("BTC", 50100.0, 1100.0, "USD", "Provider2"),
self.__product("ETH", 4050.0, 2100.0, "USD", "Provider2"),
self.__product("BTC", 50100.0, 1100.0, "USD"),
self.__product("ETH", 4050.0, 2100.0, "USD"),
],
}
aggregated = ProductInfo.aggregate_multi_assets(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 3
symbols = {p.symbol for p in aggregated}
@@ -290,3 +192,20 @@ class TestMarketDataAggregator:
sol = next(p for p in aggregated if p.symbol == "SOL")
assert sol.provider == "Provider1" # Only one provider
def test_aggregate_product_info_zero_volume(self):
"""Test aggregazione quando tutti i prodotti hanno volume zero"""
products = {
"Provider1": [self.__product("BTC", 50000.0, 0.0, "USD")],
"Provider2": [self.__product("BTC", 50100.0, 0.0, "USD")],
"Provider3": [self.__product("BTC", 49900.0, 0.0, "USD")],
}
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 1
info = aggregated[0]
# Con volume zero, dovrebbe usare la media semplice dei prezzi
expected_price = (50000.0 + 50100.0 + 49900.0) / 3
assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore
assert info.volume_24h == 0.0