WIP: Fix Aggregazione market Product #67

Draft
Simo93-rgb wants to merge 13 commits from 62-aggregazione-market-product-non-corretta into main
8 changed files with 317 additions and 42 deletions
Showing only changes of commit c07938618a - Show all commits

View File

@@ -17,11 +17,52 @@
- **Interval**: Determine granularity (hourly, daily, weekly) from context
Simo93-rgb commented 2025-10-30 15:41:21 +01:00 (Migrated from github.com)
Review

Questo sarebbe dovuto andare nell'issue opportuno ma può anche stare qui. Verrà perfezionato nel branch 64

Questo sarebbe dovuto andare nell'issue opportuno ma può anche stare qui. Verrà perfezionato nel branch 64
- **Defaults**: If not specified, use current price or last 24h data
**TOOL DESCRIPTIONS:**
- get_product: Fetches current price for a specific cryptocurrency from a single source.
- get_historical_price: Retrieves historical price data for a cryptocurrency over a specified time range from a single source.
- get_products_aggregated: Fetches current prices by aggregating data from multiple sources. Use this if user requests more specific or reliable data.
- get_historical_prices_aggregated: Retrieves historical price data by aggregating multiple sources. Use this if user requests more specific or reliable data.
**AVAILABLE TOOLS (6 total):**
**Single-Source Tools (FAST - use first available provider):**
1. `get_product(asset_id: str)` → ProductInfo
- Fetches current price for ONE asset from the first available provider
- Example: `get_product("BTC")` → returns BTC price from Binance/YFinance/Coinbase/CryptoCompare
- Use for: Quick single asset lookup
2. `get_products(asset_ids: list[str])` → list[ProductInfo]
- Fetches current prices for MULTIPLE assets from the first available provider
- Example: `get_products(["BTC", "ETH", "SOL"])` → returns 3 prices from same provider
- Use for: Quick multi-asset lookup
3. `get_historical_prices(asset_id: str, limit: int = 100)` → list[Price]
- Fetches historical price data for ONE asset from the first available provider
- Example: `get_historical_prices("BTC", limit=30)` → last 30 price points
- Use for: Quick historical data lookup
**Multi-Source Aggregated Tools (COMPREHENSIVE - queries ALL providers and merges results):**
4. `get_product_aggregated(asset_id: str)` → ProductInfo
- Queries ALL providers (Binance, YFinance, Coinbase, CryptoCompare) for ONE asset and aggregates
- Returns most reliable price using volume-weighted average (VWAP)
- Example: `get_product_aggregated("BTC")` → BTC price from all 4 providers, merged
- Use for: When user requests "reliable", "accurate", or "comprehensive" data for ONE asset
- Warning: Uses more API calls (4x)
5. `get_products_aggregated(asset_ids: list[str])` → list[ProductInfo]
- Queries ALL providers for MULTIPLE assets and aggregates results
- Returns more reliable data with multiple sources and confidence scores
- Example: `get_products_aggregated(["BTC", "ETH"])` → prices from all 4 providers, merged
- Use for: When user requests "comprehensive" or "detailed" data for MULTIPLE assets
- Warning: Uses more API calls (4x per asset)
6. `get_historical_prices_aggregated(asset_id: str = "BTC", limit: int = 100)` → list[Price]
- Queries ALL providers for historical data and aggregates results
- Returns more complete historical dataset with multiple sources
- Example: `get_historical_prices_aggregated("BTC", limit=50)` → 50 points from each provider
- Use for: When user requests "comprehensive" or "detailed" historical analysis
- Warning: Uses more API calls (4x)
**TOOL SELECTION STRATEGY:**
- **Simple queries** ("What's BTC price?") → Use `get_product()` (tool #1)
- **Reliable single asset** ("Get me the most accurate BTC price") → Use `get_product_aggregated()` (tool #4)
- **Multiple assets quick** ("Compare BTC, ETH prices") → Use `get_products()` (tool #2)
- **Multiple assets comprehensive** ("Detailed analysis of BTC and ETH") → Use `get_products_aggregated()` (tool #5)
- **Historical data** → Specify appropriate `limit` parameter (7 for week, 30 for month, etc.)
**OUTPUT FORMAT JSON:**

View File

@@ -14,12 +14,9 @@ class ProductInfo(BaseModel):
volume_24h: float = 0.0
currency: str = ""
provider: str = ""
Simo93-rgb commented 2025-10-30 20:34:42 +01:00 (Migrated from github.com)
Review

Mi serviva nell'aggregate. Ma a pensarci ora quel dato era come chiave del dizionario. Vorrei giustificartelo ma adesso mi sento perso. Sono stanco e non vorrei dirti altre stronzate. Scusa, mi odierai...

Mi serviva nell'aggregate. Ma a pensarci ora quel dato era come chiave del dizionario. Vorrei giustificartelo ma adesso mi sento perso. Sono stanco e non vorrei dirti altre stronzate. Scusa, mi odierai...
Berack96 commented 2025-10-30 23:24:14 +01:00 (Migrated from github.com)
Review

Guarda che non odio nessuno, sono solo più pistino di Copilot.
Io chiedo solo le modifiche che sono state effettuate solo perchè le avrei fatte in un altro modo o non mi aspetto qualche cosa che è stata fatta.

Guarda che non odio nessuno, sono solo più pistino di Copilot. Io chiedo solo le modifiche che sono state effettuate solo perchè le avrei fatte in un altro modo o non mi aspetto qualche cosa che è stata fatta.
def init(self, provider:str):
self.provider = provider
@staticmethod
def aggregate(products: dict[str, list['ProductInfo']]) -> list['ProductInfo']:
def aggregate_multi_assets(products: dict[str, list['ProductInfo']]) -> list['ProductInfo']:
"""
Aggregates a list of ProductInfo by symbol across different providers.
Args:
@@ -65,6 +62,68 @@ class ProductInfo(BaseModel):
aggregated_products.append(product)
return aggregated_products
@staticmethod
def aggregate_single_asset(assets: list['ProductInfo'] | dict[str, 'ProductInfo'] | dict[str, list['ProductInfo']]) -> 'ProductInfo':
"""
Aggregates an asset across different exchanges.
Args:
assets: Can be:
- list[ProductInfo]: Direct list of products
- dict[str, ProductInfo]: Map provider -> ProductInfo (from WrapperHandler.try_call_all)
- dict[str, list[ProductInfo]]: Map provider -> list of ProductInfo
Returns:
ProductInfo: Aggregated ProductInfo combining data from all exchanges
"""
# Defensive handling: normalize to a flat list of ProductInfo
if not assets:
raise ValueError("aggregate_single_asset requires at least one ProductInfo")
# Normalize to a flat list of ProductInfo
if isinstance(assets, dict):
# Check if dict values are ProductInfo or list[ProductInfo]
first_value = next(iter(assets.values())) if assets else None
if first_value and isinstance(first_value, list):
# dict[str, list[ProductInfo]] -> flatten
assets_list = [product for product_list in assets.values() for product in product_list]
else:
# dict[str, ProductInfo] -> extract values
assets_list = list(assets.values())
elif isinstance(assets, list) and assets and isinstance(assets[0], list):
# Flatten list[list[ProductInfo]] -> list[ProductInfo]
assets_list = [product for sublist in assets for product in sublist]
else:
# Already a flat list of ProductInfo
assets_list = list(assets)
if not assets_list:
raise ValueError("aggregate_single_asset requires at least one ProductInfo")
# Aggregazione per ogni Exchange
aggregated: ProductInfo = ProductInfo()
first = assets_list[0]
aggregated.id = f"{first.symbol}_AGGREGATED"
aggregated.symbol = first.symbol
aggregated.currency = next((p.currency for p in assets_list if p.currency), "")
# Raccogliamo i provider che hanno fornito dati
providers = [p.provider for p in assets_list if p.provider]
aggregated.provider = ", ".join(set(providers)) if providers else "AGGREGATED"
# Calcolo del volume medio
volume_sum = sum(p.volume_24h for p in assets_list if p.volume_24h > 0)
aggregated.volume_24h = volume_sum / len(assets_list) if assets_list else 0.0
# Calcolo del prezzo pesato per volume (VWAP - Volume Weighted Average Price)
if volume_sum > 0:
prices_weighted = sum(p.price * p.volume_24h for p in assets_list if p.volume_24h > 0)
aggregated.price = prices_weighted / volume_sum
else:
# Se non c'è volume, facciamo una media semplice dei prezzi
valid_prices = [p.price for p in assets_list if p.price > 0]
aggregated.price = sum(valid_prices) / len(valid_prices) if valid_prices else 0.0
return aggregated

View File

@@ -11,6 +11,7 @@ def extract_product(currency: str, ticker_data: dict[str, Any]) -> ProductInfo:
product.price = float(ticker_data.get('price', 0))
product.volume_24h = float(ticker_data.get('volume', 0))
product.currency = currency
product.provider = "Binance"
return product
def extract_price(kline_data: list[Any]) -> Price:

View File

@@ -12,6 +12,7 @@ def extract_product(product_data: GetProductResponse | Product) -> ProductInfo:
product.symbol = product_data.base_currency_id or ""
product.price = float(product_data.price) if product_data.price else 0.0
product.volume_24h = float(product_data.volume_24h) if product_data.volume_24h else 0.0
product.provider = "Coinbase"
return product
def extract_price(candle_data: Candle) -> Price:

View File

@@ -11,6 +11,7 @@ def extract_product(asset_data: dict[str, Any]) -> ProductInfo:
product.price = float(asset_data.get('PRICE', 0))
product.volume_24h = float(asset_data.get('VOLUME24HOUR', 0))
assert product.price > 0, "Invalid price data received from CryptoCompare"
product.provider = "CryptoCompare"
return product
def extract_price(price_data: dict[str, Any]) -> Price:

View File

@@ -13,6 +13,7 @@ def extract_product(stock_data: dict[str, str]) -> ProductInfo:
product.price = float(stock_data.get('Current Stock Price', f"0.0 USD").split(" ")[0]) # prende solo il numero
product.volume_24h = 0.0 # YFinance non fornisce il volume 24h direttamente
product.currency = product.id.split('-')[1] # La valuta è la parte dopo il '-'
product.provider = "YFinance"
return product
def extract_price(hist_data: dict[str, str]) -> Price:

View File

@@ -33,6 +33,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
self.get_product,
self.get_products,
self.get_historical_prices,
self.get_product_aggregated,
self.get_products_aggregated,
self.get_historical_prices_aggregated,
],
@@ -87,6 +88,36 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
"""
return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit))
def get_product_aggregated(self, asset_id: str) -> ProductInfo:
"""
Gets product information for a *single* asset from *all available providers* and *aggregates* the results.
This method queries all configured sources (Binance, YFinance, Coinbase, CryptoCompare)
and combines the data using volume-weighted average price (VWAP) to provide
the most accurate and comprehensive price data.
Use this when you need highly reliable price data from multiple sources.
Warning: This uses more API calls (4x) than get_product().
Args:
asset_id (str): The asset ID to retrieve information for (e.g., "BTC", "ETH").
Returns:
ProductInfo: A single ProductInfo object with aggregated data from all providers.
The 'provider' field will list all sources used (e.g., "Binance, YFinance, Coinbase").
Raises:
Exception: If all providers fail to return results.
Example:
>>> tool.get_product_aggregated("BTC")
ProductInfo(symbol="BTC", price=45123.50, provider="Binance, YFinance, Coinbase", ...)
"""
# try_call_all returns dict[str, ProductInfo] where key is provider name
# We need list[ProductInfo] for aggregation, so we extract values
all_products = self.handler.try_call_all(lambda w: w.get_product(asset_id))
return ProductInfo.aggregate_single_asset(all_products)
def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Gets product information for multiple assets from *all available providers* and *aggregates* the results.
@@ -107,7 +138,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
all_products: dict[str, list[ProductInfo]] = {}
for asset in asset_ids:
all_products[asset] = self.handler.try_call_all(lambda w: w.get_product(asset))
return ProductInfo.aggregate(all_products)
return ProductInfo.aggregate_multi_assets(all_products)
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
"""

View File

@@ -7,13 +7,14 @@ from app.api.core.markets import ProductInfo, Price
@pytest.mark.market
class TestMarketDataAggregator:
def __product(self, symbol: str, price: float, volume: float, currency: str) -> ProductInfo:
def __product(self, symbol: str, price: float, volume: float, currency: str, provider: str = "") -> ProductInfo:
prod = ProductInfo()
prod.id=f"{symbol}-{currency}"
prod.symbol=symbol
prod.price=price
prod.volume_24h=volume
prod.currency=currency
prod.id = f"{symbol}-{currency}"
prod.symbol = symbol
prod.price = price
prod.volume_24h = volume
prod.currency = currency
prod.provider = provider
return prod
def __price(self, timestamp_s: int, high: float, low: float, open: float, close: float, volume: float) -> Price:
@@ -28,15 +29,13 @@ class TestMarketDataAggregator:
def test_aggregate_product_info(self):
products: dict[str, list[ProductInfo]] = {
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")],
"Provider2": [self.__product("BTC", 50100.0, 1100.0, "USD")],
"Provider3": [self.__product("BTC", 49900.0, 900.0, "USD")],
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1")],
"Provider2": [self.__product("BTC", 50100.0, 1100.0, "USD", "Provider2")],
"Provider3": [self.__product("BTC", 49900.0, 900.0, "USD", "Provider3")],
}
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 1
info = aggregated[0]
# aggregate_single_asset returns a single ProductInfo, not a list
info = ProductInfo.aggregate_single_asset(products)
assert info is not None
assert info.symbol == "BTC"
@@ -48,16 +47,17 @@ class TestMarketDataAggregator:
def test_aggregate_product_info_multiple_symbols(self):
products = {
"Provider1": [
self.__product("BTC", 50000.0, 1000.0, "USD"),
self.__product("ETH", 4000.0, 2000.0, "USD"),
self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1"),
self.__product("ETH", 4000.0, 2000.0, "USD", "Provider1"),
],
"Provider2": [
self.__product("BTC", 50100.0, 1100.0, "USD"),
self.__product("ETH", 4050.0, 2100.0, "USD"),
self.__product("BTC", 50100.0, 1100.0, "USD", "Provider2"),
self.__product("ETH", 4050.0, 2100.0, "USD", "Provider2"),
],
}
aggregated = ProductInfo.aggregate(products)
# aggregate_multi_assets aggregates by symbol across providers
aggregated = ProductInfo.aggregate_multi_assets(products)
assert len(aggregated) == 2
btc_info = next((p for p in aggregated if p.symbol == "BTC"), None)
@@ -80,15 +80,15 @@ class TestMarketDataAggregator:
"Provider1": [],
"Provider2": [],
}
aggregated = ProductInfo.aggregate(products)
aggregated = ProductInfo.aggregate_multi_assets(products)
assert len(aggregated) == 0
def test_aggregate_product_info_with_partial_data(self):
products: dict[str, list[ProductInfo]] = {
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")],
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1")],
"Provider2": [],
}
aggregated = ProductInfo.aggregate(products)
aggregated = ProductInfo.aggregate_multi_assets(products)
assert len(aggregated) == 1
info = aggregated[0]
assert info.symbol == "BTC"
@@ -130,25 +130,165 @@ class TestMarketDataAggregator:
def test_aggregate_product_info_different_currencies(self):
products: dict[str, list[ProductInfo]] = {
"Provider1": [self.__product("BTC", 100000.0, 1000.0, "USD")],
"Provider2": [self.__product("BTC", 70000.0, 800.0, "EUR")],
"Provider1": [self.__product("BTC", 100000.0, 1000.0, "USD", "Provider1")],
"Provider2": [self.__product("BTC", 70000.0, 800.0, "EUR", "Provider2")],
}
aggregated = ProductInfo.aggregate(products)
aggregated = ProductInfo.aggregate_multi_assets(products)
assert len(aggregated) == 1
info = aggregated[0]
assert info is not None
assert info.id == "BTC_AGGREGATED"
assert info.symbol == "BTC"
assert info.currency == "USD"
assert info.price == pytest.approx(100000.0, rel=1e-3) # type: ignore
assert info.volume_24h == pytest.approx(1000.0, rel=1e-3) # type: ignore
assert info.currency in ["USD", "EUR"] # Can be either, depending on which is found first
# When aggregating different currencies, VWAP is calculated
# (100000.0 * 1000.0 + 70000.0 * 800.0) / (1000.0 + 800.0)
expected_price = (100000.0 * 1000.0 + 70000.0 * 800.0) / (1000.0 + 800.0)
assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore
assert info.volume_24h == pytest.approx(900.0, rel=1e-3) # type: ignore # Average of volumes
info = aggregated[1]
# ===== Tests for aggregate_single_asset =====
def test_aggregate_single_asset_from_dict(self):
"""Test aggregate_single_asset with dict input (simulating WrapperHandler.try_call_all)"""
products_dict: dict[str, ProductInfo] = {
"BinanceWrapper": self.__product("BTC", 50000.0, 1000.0, "USD", "Binance"),
"YFinanceWrapper": self.__product("BTC", 50100.0, 1100.0, "USD", "YFinance"),
"CoinBaseWrapper": self.__product("BTC", 49900.0, 900.0, "USD", "Coinbase"),
}
info = ProductInfo.aggregate_single_asset(products_dict)
assert info is not None
assert info.id == "BTC-EUR_AGGREGATED"
assert info.symbol == "BTC"
assert info.currency == "EUR"
assert info.price == pytest.approx(70000.0, rel=1e-3) # type: ignore
assert info.volume_24h == pytest.approx(800.0, rel=1e-3) # type: ignore
assert info.id == "BTC_AGGREGATED"
assert "Binance" in info.provider
assert "YFinance" in info.provider
assert "Coinbase" in info.provider
# VWAP calculation
expected_price = (50000.0 * 1000.0 + 50100.0 * 1100.0 + 49900.0 * 900.0) / (1000.0 + 1100.0 + 900.0)
assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore
assert info.volume_24h == pytest.approx(1000.0, rel=1e-3) # type: ignore
assert info.currency == "USD"
def test_aggregate_single_asset_from_list(self):
"""Test aggregate_single_asset with list input"""
products_list = [
self.__product("ETH", 4000.0, 2000.0, "USD", "Binance"),
self.__product("ETH", 4050.0, 2100.0, "USD", "Coinbase"),
]
info = ProductInfo.aggregate_single_asset(products_list)
assert info is not None
assert info.symbol == "ETH"
assert info.id == "ETH_AGGREGATED"
assert "Binance" in info.provider
assert "Coinbase" in info.provider
expected_price = (4000.0 * 2000.0 + 4050.0 * 2100.0) / (2000.0 + 2100.0)
assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore
def test_aggregate_single_asset_no_volume_fallback(self):
"""Test fallback to simple average when no volume data"""
products_list = [
self.__product("SOL", 100.0, 0.0, "USD", "Provider1"),
self.__product("SOL", 110.0, 0.0, "USD", "Provider2"),
self.__product("SOL", 90.0, 0.0, "USD", "Provider3"),
]
info = ProductInfo.aggregate_single_asset(products_list)
assert info is not None
assert info.symbol == "SOL"
# Simple average: (100 + 110 + 90) / 3 = 100
assert info.price == pytest.approx(100.0, rel=1e-3) # type: ignore
assert info.volume_24h == pytest.approx(0.0, rel=1e-3) # type: ignore
def test_aggregate_single_asset_empty_raises(self):
"""Test that empty input raises ValueError"""
with pytest.raises(ValueError, match="requires at least one ProductInfo"):
ProductInfo.aggregate_single_asset([])
with pytest.raises(ValueError, match="requires at least one ProductInfo"):
ProductInfo.aggregate_single_asset({})
def test_aggregate_single_asset_dict_with_lists(self):
"""Test aggregate_single_asset with dict[str, list[ProductInfo]] (flattens correctly)"""
products_dict_lists: dict[str, list[ProductInfo]] = {
"Provider1": [self.__product("ADA", 0.50, 1000.0, "USD", "Provider1")],
"Provider2": [self.__product("ADA", 0.52, 1200.0, "USD", "Provider2")],
}
info = ProductInfo.aggregate_single_asset(products_dict_lists)
assert info is not None
assert info.symbol == "ADA"
expected_price = (0.50 * 1000.0 + 0.52 * 1200.0) / (1000.0 + 1200.0)
assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore
def test_aggregate_single_asset_missing_currency(self):
"""Test that aggregate_single_asset handles missing currency gracefully"""
products_list = [
self.__product("DOT", 10.0, 500.0, "", "Provider1"),
self.__product("DOT", 10.5, 600.0, "USD", "Provider2"),
]
info = ProductInfo.aggregate_single_asset(products_list)
assert info is not None
assert info.symbol == "DOT"
assert info.currency == "USD" # Should pick the first non-empty currency
def test_aggregate_single_asset_single_provider(self):
"""Test aggregate_single_asset with only one provider (edge case)"""
products = {
"BinanceWrapper": self.__product("MATIC", 0.80, 5000.0, "USD", "Binance"),
}
info = ProductInfo.aggregate_single_asset(products)
assert info is not None
assert info.symbol == "MATIC"
assert info.price == pytest.approx(0.80, rel=1e-3) # type: ignore
assert info.volume_24h == pytest.approx(5000.0, rel=1e-3) # type: ignore
assert info.provider == "Binance"
# ===== Tests for aggregate_multi_assets with edge cases =====
def test_aggregate_multi_assets_empty_providers(self):
"""Test aggregate_multi_assets with some providers returning empty lists"""
products = {
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1")],
"Provider2": [],
"Provider3": [self.__product("BTC", 50100.0, 1100.0, "USD", "Provider3")],
}
aggregated = ProductInfo.aggregate_multi_assets(products)
assert len(aggregated) == 1
info = aggregated[0]
assert info.symbol == "BTC"
assert "Provider1" in info.provider
assert "Provider3" in info.provider
def test_aggregate_multi_assets_mixed_symbols(self):
"""Test that aggregate_multi_assets correctly separates different symbols"""
products = {
"Provider1": [
self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1"),
self.__product("ETH", 4000.0, 2000.0, "USD", "Provider1"),
self.__product("SOL", 100.0, 500.0, "USD", "Provider1"),
],
"Provider2": [
self.__product("BTC", 50100.0, 1100.0, "USD", "Provider2"),
self.__product("ETH", 4050.0, 2100.0, "USD", "Provider2"),
],
}
aggregated = ProductInfo.aggregate_multi_assets(products)
assert len(aggregated) == 3
symbols = {p.symbol for p in aggregated}
assert symbols == {"BTC", "ETH", "SOL"}
btc = next(p for p in aggregated if p.symbol == "BTC")
assert "Provider1" in btc.provider and "Provider2" in btc.provider
sol = next(p for p in aggregated if p.symbol == "SOL")
assert sol.provider == "Provider1" # Only one provider