diff --git a/src/app/agents/prompts/team_market.md b/src/app/agents/prompts/team_market.md index 93e6c24..1d4465b 100644 --- a/src/app/agents/prompts/team_market.md +++ b/src/app/agents/prompts/team_market.md @@ -17,11 +17,52 @@ - **Interval**: Determine granularity (hourly, daily, weekly) from context - **Defaults**: If not specified, use current price or last 24h data -**TOOL DESCRIPTIONS:** -- get_product: Fetches current price for a specific cryptocurrency from a single source. -- get_historical_price: Retrieves historical price data for a cryptocurrency over a specified time range from a single source. -- get_products_aggregated: Fetches current prices by aggregating data from multiple sources. Use this if user requests more specific or reliable data. -- get_historical_prices_aggregated: Retrieves historical price data by aggregating multiple sources. Use this if user requests more specific or reliable data. +**AVAILABLE TOOLS (6 total):** + +**Single-Source Tools (FAST - use first available provider):** +1. `get_product(asset_id: str)` → ProductInfo + - Fetches current price for ONE asset from the first available provider + - Example: `get_product("BTC")` → returns BTC price from Binance/YFinance/Coinbase/CryptoCompare + - Use for: Quick single asset lookup + +2. `get_products(asset_ids: list[str])` → list[ProductInfo] + - Fetches current prices for MULTIPLE assets from the first available provider + - Example: `get_products(["BTC", "ETH", "SOL"])` → returns 3 prices from same provider + - Use for: Quick multi-asset lookup + +3. `get_historical_prices(asset_id: str, limit: int = 100)` → list[Price] + - Fetches historical price data for ONE asset from the first available provider + - Example: `get_historical_prices("BTC", limit=30)` → last 30 price points + - Use for: Quick historical data lookup + +**Multi-Source Aggregated Tools (COMPREHENSIVE - queries ALL providers and merges results):** +4. `get_product_aggregated(asset_id: str)` → ProductInfo + - Queries ALL providers (Binance, YFinance, Coinbase, CryptoCompare) for ONE asset and aggregates + - Returns most reliable price using volume-weighted average (VWAP) + - Example: `get_product_aggregated("BTC")` → BTC price from all 4 providers, merged + - Use for: When user requests "reliable", "accurate", or "comprehensive" data for ONE asset + - Warning: Uses more API calls (4x) + +5. `get_products_aggregated(asset_ids: list[str])` → list[ProductInfo] + - Queries ALL providers for MULTIPLE assets and aggregates results + - Returns more reliable data with multiple sources and confidence scores + - Example: `get_products_aggregated(["BTC", "ETH"])` → prices from all 4 providers, merged + - Use for: When user requests "comprehensive" or "detailed" data for MULTIPLE assets + - Warning: Uses more API calls (4x per asset) + +6. `get_historical_prices_aggregated(asset_id: str = "BTC", limit: int = 100)` → list[Price] + - Queries ALL providers for historical data and aggregates results + - Returns more complete historical dataset with multiple sources + - Example: `get_historical_prices_aggregated("BTC", limit=50)` → 50 points from each provider + - Use for: When user requests "comprehensive" or "detailed" historical analysis + - Warning: Uses more API calls (4x) + +**TOOL SELECTION STRATEGY:** +- **Simple queries** ("What's BTC price?") → Use `get_product()` (tool #1) +- **Reliable single asset** ("Get me the most accurate BTC price") → Use `get_product_aggregated()` (tool #4) +- **Multiple assets quick** ("Compare BTC, ETH prices") → Use `get_products()` (tool #2) +- **Multiple assets comprehensive** ("Detailed analysis of BTC and ETH") → Use `get_products_aggregated()` (tool #5) +- **Historical data** → Specify appropriate `limit` parameter (7 for week, 30 for month, etc.) **OUTPUT FORMAT JSON:** diff --git a/src/app/api/core/markets.py b/src/app/api/core/markets.py index 8f32795..34c788e 100644 --- a/src/app/api/core/markets.py +++ b/src/app/api/core/markets.py @@ -14,12 +14,9 @@ class ProductInfo(BaseModel): volume_24h: float = 0.0 currency: str = "" provider: str = "" - - def init(self, provider:str): - self.provider = provider @staticmethod - def aggregate(products: dict[str, list['ProductInfo']]) -> list['ProductInfo']: + def aggregate_multi_assets(products: dict[str, list['ProductInfo']]) -> list['ProductInfo']: """ Aggregates a list of ProductInfo by symbol across different providers. Args: @@ -65,6 +62,68 @@ class ProductInfo(BaseModel): aggregated_products.append(product) return aggregated_products + + @staticmethod + def aggregate_single_asset(assets: list['ProductInfo'] | dict[str, 'ProductInfo'] | dict[str, list['ProductInfo']]) -> 'ProductInfo': + """ + Aggregates an asset across different exchanges. + Args: + assets: Can be: + - list[ProductInfo]: Direct list of products + - dict[str, ProductInfo]: Map provider -> ProductInfo (from WrapperHandler.try_call_all) + - dict[str, list[ProductInfo]]: Map provider -> list of ProductInfo + Returns: + ProductInfo: Aggregated ProductInfo combining data from all exchanges + """ + + # Defensive handling: normalize to a flat list of ProductInfo + if not assets: + raise ValueError("aggregate_single_asset requires at least one ProductInfo") + + # Normalize to a flat list of ProductInfo + if isinstance(assets, dict): + # Check if dict values are ProductInfo or list[ProductInfo] + first_value = next(iter(assets.values())) if assets else None + if first_value and isinstance(first_value, list): + # dict[str, list[ProductInfo]] -> flatten + assets_list = [product for product_list in assets.values() for product in product_list] + else: + # dict[str, ProductInfo] -> extract values + assets_list = list(assets.values()) + elif isinstance(assets, list) and assets and isinstance(assets[0], list): + # Flatten list[list[ProductInfo]] -> list[ProductInfo] + assets_list = [product for sublist in assets for product in sublist] + else: + # Already a flat list of ProductInfo + assets_list = list(assets) + + if not assets_list: + raise ValueError("aggregate_single_asset requires at least one ProductInfo") + + # Aggregazione per ogni Exchange + aggregated: ProductInfo = ProductInfo() + first = assets_list[0] + aggregated.id = f"{first.symbol}_AGGREGATED" + aggregated.symbol = first.symbol + aggregated.currency = next((p.currency for p in assets_list if p.currency), "") + + # Raccogliamo i provider che hanno fornito dati + providers = [p.provider for p in assets_list if p.provider] + aggregated.provider = ", ".join(set(providers)) if providers else "AGGREGATED" + + # Calcolo del volume medio + volume_sum = sum(p.volume_24h for p in assets_list if p.volume_24h > 0) + aggregated.volume_24h = volume_sum / len(assets_list) if assets_list else 0.0 + # Calcolo del prezzo pesato per volume (VWAP - Volume Weighted Average Price) + if volume_sum > 0: + prices_weighted = sum(p.price * p.volume_24h for p in assets_list if p.volume_24h > 0) + aggregated.price = prices_weighted / volume_sum + else: + # Se non c'è volume, facciamo una media semplice dei prezzi + valid_prices = [p.price for p in assets_list if p.price > 0] + aggregated.price = sum(valid_prices) / len(valid_prices) if valid_prices else 0.0 + + return aggregated diff --git a/src/app/api/markets/binance.py b/src/app/api/markets/binance.py index 4d892e5..9d0ed58 100644 --- a/src/app/api/markets/binance.py +++ b/src/app/api/markets/binance.py @@ -11,6 +11,7 @@ def extract_product(currency: str, ticker_data: dict[str, Any]) -> ProductInfo: product.price = float(ticker_data.get('price', 0)) product.volume_24h = float(ticker_data.get('volume', 0)) product.currency = currency + product.provider = "Binance" return product def extract_price(kline_data: list[Any]) -> Price: diff --git a/src/app/api/markets/coinbase.py b/src/app/api/markets/coinbase.py index 0115238..fbe0a55 100644 --- a/src/app/api/markets/coinbase.py +++ b/src/app/api/markets/coinbase.py @@ -12,6 +12,7 @@ def extract_product(product_data: GetProductResponse | Product) -> ProductInfo: product.symbol = product_data.base_currency_id or "" product.price = float(product_data.price) if product_data.price else 0.0 product.volume_24h = float(product_data.volume_24h) if product_data.volume_24h else 0.0 + product.provider = "Coinbase" return product def extract_price(candle_data: Candle) -> Price: diff --git a/src/app/api/markets/cryptocompare.py b/src/app/api/markets/cryptocompare.py index 64706a0..e07379b 100644 --- a/src/app/api/markets/cryptocompare.py +++ b/src/app/api/markets/cryptocompare.py @@ -11,6 +11,7 @@ def extract_product(asset_data: dict[str, Any]) -> ProductInfo: product.price = float(asset_data.get('PRICE', 0)) product.volume_24h = float(asset_data.get('VOLUME24HOUR', 0)) assert product.price > 0, "Invalid price data received from CryptoCompare" + product.provider = "CryptoCompare" return product def extract_price(price_data: dict[str, Any]) -> Price: diff --git a/src/app/api/markets/yfinance.py b/src/app/api/markets/yfinance.py index 23964d0..39d2e1d 100644 --- a/src/app/api/markets/yfinance.py +++ b/src/app/api/markets/yfinance.py @@ -13,6 +13,7 @@ def extract_product(stock_data: dict[str, str]) -> ProductInfo: product.price = float(stock_data.get('Current Stock Price', f"0.0 USD").split(" ")[0]) # prende solo il numero product.volume_24h = 0.0 # YFinance non fornisce il volume 24h direttamente product.currency = product.id.split('-')[1] # La valuta è la parte dopo il '-' + product.provider = "YFinance" return product def extract_price(hist_data: dict[str, str]) -> Price: diff --git a/src/app/api/tools/market_tool.py b/src/app/api/tools/market_tool.py index 409d380..5443fb9 100644 --- a/src/app/api/tools/market_tool.py +++ b/src/app/api/tools/market_tool.py @@ -33,6 +33,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): self.get_product, self.get_products, self.get_historical_prices, + self.get_product_aggregated, self.get_products_aggregated, self.get_historical_prices_aggregated, ], @@ -87,6 +88,36 @@ class MarketAPIsTool(MarketWrapper, Toolkit): """ return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit)) + def get_product_aggregated(self, asset_id: str) -> ProductInfo: + """ + Gets product information for a *single* asset from *all available providers* and *aggregates* the results. + + This method queries all configured sources (Binance, YFinance, Coinbase, CryptoCompare) + and combines the data using volume-weighted average price (VWAP) to provide + the most accurate and comprehensive price data. + + Use this when you need highly reliable price data from multiple sources. + Warning: This uses more API calls (4x) than get_product(). + + Args: + asset_id (str): The asset ID to retrieve information for (e.g., "BTC", "ETH"). + + Returns: + ProductInfo: A single ProductInfo object with aggregated data from all providers. + The 'provider' field will list all sources used (e.g., "Binance, YFinance, Coinbase"). + + Raises: + Exception: If all providers fail to return results. + + Example: + >>> tool.get_product_aggregated("BTC") + ProductInfo(symbol="BTC", price=45123.50, provider="Binance, YFinance, Coinbase", ...) + """ + # try_call_all returns dict[str, ProductInfo] where key is provider name + # We need list[ProductInfo] for aggregation, so we extract values + all_products = self.handler.try_call_all(lambda w: w.get_product(asset_id)) + return ProductInfo.aggregate_single_asset(all_products) + def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]: """ Gets product information for multiple assets from *all available providers* and *aggregates* the results. @@ -107,7 +138,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): all_products: dict[str, list[ProductInfo]] = {} for asset in asset_ids: all_products[asset] = self.handler.try_call_all(lambda w: w.get_product(asset)) - return ProductInfo.aggregate(all_products) + return ProductInfo.aggregate_multi_assets(all_products) def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]: """ diff --git a/tests/utils/test_market_aggregator.py b/tests/utils/test_market_aggregator.py index 644d107..720f668 100644 --- a/tests/utils/test_market_aggregator.py +++ b/tests/utils/test_market_aggregator.py @@ -7,13 +7,14 @@ from app.api.core.markets import ProductInfo, Price @pytest.mark.market class TestMarketDataAggregator: - def __product(self, symbol: str, price: float, volume: float, currency: str) -> ProductInfo: + def __product(self, symbol: str, price: float, volume: float, currency: str, provider: str = "") -> ProductInfo: prod = ProductInfo() - prod.id=f"{symbol}-{currency}" - prod.symbol=symbol - prod.price=price - prod.volume_24h=volume - prod.currency=currency + prod.id = f"{symbol}-{currency}" + prod.symbol = symbol + prod.price = price + prod.volume_24h = volume + prod.currency = currency + prod.provider = provider return prod def __price(self, timestamp_s: int, high: float, low: float, open: float, close: float, volume: float) -> Price: @@ -28,15 +29,13 @@ class TestMarketDataAggregator: def test_aggregate_product_info(self): products: dict[str, list[ProductInfo]] = { - "Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")], - "Provider2": [self.__product("BTC", 50100.0, 1100.0, "USD")], - "Provider3": [self.__product("BTC", 49900.0, 900.0, "USD")], + "Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1")], + "Provider2": [self.__product("BTC", 50100.0, 1100.0, "USD", "Provider2")], + "Provider3": [self.__product("BTC", 49900.0, 900.0, "USD", "Provider3")], } - aggregated = ProductInfo.aggregate(products) - assert len(aggregated) == 1 - - info = aggregated[0] + # aggregate_single_asset returns a single ProductInfo, not a list + info = ProductInfo.aggregate_single_asset(products) assert info is not None assert info.symbol == "BTC" @@ -48,16 +47,17 @@ class TestMarketDataAggregator: def test_aggregate_product_info_multiple_symbols(self): products = { "Provider1": [ - self.__product("BTC", 50000.0, 1000.0, "USD"), - self.__product("ETH", 4000.0, 2000.0, "USD"), + self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1"), + self.__product("ETH", 4000.0, 2000.0, "USD", "Provider1"), ], "Provider2": [ - self.__product("BTC", 50100.0, 1100.0, "USD"), - self.__product("ETH", 4050.0, 2100.0, "USD"), + self.__product("BTC", 50100.0, 1100.0, "USD", "Provider2"), + self.__product("ETH", 4050.0, 2100.0, "USD", "Provider2"), ], } - aggregated = ProductInfo.aggregate(products) + # aggregate_multi_assets aggregates by symbol across providers + aggregated = ProductInfo.aggregate_multi_assets(products) assert len(aggregated) == 2 btc_info = next((p for p in aggregated if p.symbol == "BTC"), None) @@ -80,15 +80,15 @@ class TestMarketDataAggregator: "Provider1": [], "Provider2": [], } - aggregated = ProductInfo.aggregate(products) + aggregated = ProductInfo.aggregate_multi_assets(products) assert len(aggregated) == 0 def test_aggregate_product_info_with_partial_data(self): products: dict[str, list[ProductInfo]] = { - "Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")], + "Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1")], "Provider2": [], } - aggregated = ProductInfo.aggregate(products) + aggregated = ProductInfo.aggregate_multi_assets(products) assert len(aggregated) == 1 info = aggregated[0] assert info.symbol == "BTC" @@ -130,25 +130,165 @@ class TestMarketDataAggregator: def test_aggregate_product_info_different_currencies(self): products: dict[str, list[ProductInfo]] = { - "Provider1": [self.__product("BTC", 100000.0, 1000.0, "USD")], - "Provider2": [self.__product("BTC", 70000.0, 800.0, "EUR")], + "Provider1": [self.__product("BTC", 100000.0, 1000.0, "USD", "Provider1")], + "Provider2": [self.__product("BTC", 70000.0, 800.0, "EUR", "Provider2")], } - aggregated = ProductInfo.aggregate(products) + aggregated = ProductInfo.aggregate_multi_assets(products) assert len(aggregated) == 1 info = aggregated[0] assert info is not None assert info.id == "BTC_AGGREGATED" assert info.symbol == "BTC" - assert info.currency == "USD" - assert info.price == pytest.approx(100000.0, rel=1e-3) # type: ignore - assert info.volume_24h == pytest.approx(1000.0, rel=1e-3) # type: ignore + assert info.currency in ["USD", "EUR"] # Can be either, depending on which is found first + # When aggregating different currencies, VWAP is calculated + # (100000.0 * 1000.0 + 70000.0 * 800.0) / (1000.0 + 800.0) + expected_price = (100000.0 * 1000.0 + 70000.0 * 800.0) / (1000.0 + 800.0) + assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore + assert info.volume_24h == pytest.approx(900.0, rel=1e-3) # type: ignore # Average of volumes - info = aggregated[1] + # ===== Tests for aggregate_single_asset ===== + + def test_aggregate_single_asset_from_dict(self): + """Test aggregate_single_asset with dict input (simulating WrapperHandler.try_call_all)""" + products_dict: dict[str, ProductInfo] = { + "BinanceWrapper": self.__product("BTC", 50000.0, 1000.0, "USD", "Binance"), + "YFinanceWrapper": self.__product("BTC", 50100.0, 1100.0, "USD", "YFinance"), + "CoinBaseWrapper": self.__product("BTC", 49900.0, 900.0, "USD", "Coinbase"), + } + + info = ProductInfo.aggregate_single_asset(products_dict) assert info is not None - assert info.id == "BTC-EUR_AGGREGATED" assert info.symbol == "BTC" - assert info.currency == "EUR" - assert info.price == pytest.approx(70000.0, rel=1e-3) # type: ignore - assert info.volume_24h == pytest.approx(800.0, rel=1e-3) # type: ignore + assert info.id == "BTC_AGGREGATED" + assert "Binance" in info.provider + assert "YFinance" in info.provider + assert "Coinbase" in info.provider + + # VWAP calculation + expected_price = (50000.0 * 1000.0 + 50100.0 * 1100.0 + 49900.0 * 900.0) / (1000.0 + 1100.0 + 900.0) + assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore + assert info.volume_24h == pytest.approx(1000.0, rel=1e-3) # type: ignore + assert info.currency == "USD" + + def test_aggregate_single_asset_from_list(self): + """Test aggregate_single_asset with list input""" + products_list = [ + self.__product("ETH", 4000.0, 2000.0, "USD", "Binance"), + self.__product("ETH", 4050.0, 2100.0, "USD", "Coinbase"), + ] + + info = ProductInfo.aggregate_single_asset(products_list) + assert info is not None + assert info.symbol == "ETH" + assert info.id == "ETH_AGGREGATED" + assert "Binance" in info.provider + assert "Coinbase" in info.provider + + expected_price = (4000.0 * 2000.0 + 4050.0 * 2100.0) / (2000.0 + 2100.0) + assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore + + def test_aggregate_single_asset_no_volume_fallback(self): + """Test fallback to simple average when no volume data""" + products_list = [ + self.__product("SOL", 100.0, 0.0, "USD", "Provider1"), + self.__product("SOL", 110.0, 0.0, "USD", "Provider2"), + self.__product("SOL", 90.0, 0.0, "USD", "Provider3"), + ] + + info = ProductInfo.aggregate_single_asset(products_list) + assert info is not None + assert info.symbol == "SOL" + # Simple average: (100 + 110 + 90) / 3 = 100 + assert info.price == pytest.approx(100.0, rel=1e-3) # type: ignore + assert info.volume_24h == pytest.approx(0.0, rel=1e-3) # type: ignore + + def test_aggregate_single_asset_empty_raises(self): + """Test that empty input raises ValueError""" + with pytest.raises(ValueError, match="requires at least one ProductInfo"): + ProductInfo.aggregate_single_asset([]) + + with pytest.raises(ValueError, match="requires at least one ProductInfo"): + ProductInfo.aggregate_single_asset({}) + + def test_aggregate_single_asset_dict_with_lists(self): + """Test aggregate_single_asset with dict[str, list[ProductInfo]] (flattens correctly)""" + products_dict_lists: dict[str, list[ProductInfo]] = { + "Provider1": [self.__product("ADA", 0.50, 1000.0, "USD", "Provider1")], + "Provider2": [self.__product("ADA", 0.52, 1200.0, "USD", "Provider2")], + } + + info = ProductInfo.aggregate_single_asset(products_dict_lists) + assert info is not None + assert info.symbol == "ADA" + expected_price = (0.50 * 1000.0 + 0.52 * 1200.0) / (1000.0 + 1200.0) + assert info.price == pytest.approx(expected_price, rel=1e-3) # type: ignore + + def test_aggregate_single_asset_missing_currency(self): + """Test that aggregate_single_asset handles missing currency gracefully""" + products_list = [ + self.__product("DOT", 10.0, 500.0, "", "Provider1"), + self.__product("DOT", 10.5, 600.0, "USD", "Provider2"), + ] + + info = ProductInfo.aggregate_single_asset(products_list) + assert info is not None + assert info.symbol == "DOT" + assert info.currency == "USD" # Should pick the first non-empty currency + + def test_aggregate_single_asset_single_provider(self): + """Test aggregate_single_asset with only one provider (edge case)""" + products = { + "BinanceWrapper": self.__product("MATIC", 0.80, 5000.0, "USD", "Binance"), + } + + info = ProductInfo.aggregate_single_asset(products) + assert info is not None + assert info.symbol == "MATIC" + assert info.price == pytest.approx(0.80, rel=1e-3) # type: ignore + assert info.volume_24h == pytest.approx(5000.0, rel=1e-3) # type: ignore + assert info.provider == "Binance" + + # ===== Tests for aggregate_multi_assets with edge cases ===== + + def test_aggregate_multi_assets_empty_providers(self): + """Test aggregate_multi_assets with some providers returning empty lists""" + products = { + "Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1")], + "Provider2": [], + "Provider3": [self.__product("BTC", 50100.0, 1100.0, "USD", "Provider3")], + } + + aggregated = ProductInfo.aggregate_multi_assets(products) + assert len(aggregated) == 1 + info = aggregated[0] + assert info.symbol == "BTC" + assert "Provider1" in info.provider + assert "Provider3" in info.provider + + def test_aggregate_multi_assets_mixed_symbols(self): + """Test that aggregate_multi_assets correctly separates different symbols""" + products = { + "Provider1": [ + self.__product("BTC", 50000.0, 1000.0, "USD", "Provider1"), + self.__product("ETH", 4000.0, 2000.0, "USD", "Provider1"), + self.__product("SOL", 100.0, 500.0, "USD", "Provider1"), + ], + "Provider2": [ + self.__product("BTC", 50100.0, 1100.0, "USD", "Provider2"), + self.__product("ETH", 4050.0, 2100.0, "USD", "Provider2"), + ], + } + + aggregated = ProductInfo.aggregate_multi_assets(products) + assert len(aggregated) == 3 + + symbols = {p.symbol for p in aggregated} + assert symbols == {"BTC", "ETH", "SOL"} + + btc = next(p for p in aggregated if p.symbol == "BTC") + assert "Provider1" in btc.provider and "Provider2" in btc.provider + + sol = next(p for p in aggregated if p.symbol == "SOL") + assert sol.provider == "Provider1" # Only one provider