diff --git a/src/app/api/base/markets.py b/src/app/api/base/markets.py index e6a0657..8b6c754 100644 --- a/src/app/api/base/markets.py +++ b/src/app/api/base/markets.py @@ -5,8 +5,8 @@ from pydantic import BaseModel class ProductInfo(BaseModel): """ - Informazioni sul prodotto, come ottenute dalle API di mercato. - Implementa i metodi di conversione dai dati grezzi delle API. + Product information as obtained from market APIs. + Implements conversion methods from raw API data. """ id: str = "" symbol: str = "" @@ -14,10 +14,46 @@ class ProductInfo(BaseModel): volume_24h: float = 0.0 currency: str = "" + @staticmethod + def aggregate(products: dict[str, list['ProductInfo']]) -> list['ProductInfo']: + """ + Aggregates a list of ProductInfo by symbol. + Args: + products (dict[str, list[ProductInfo]]): Map provider -> list of ProductInfo + Returns: + list[ProductInfo]: List of ProductInfo aggregated by symbol + """ + + # Costruzione mappa symbol -> lista di ProductInfo + symbols_infos: dict[str, list[ProductInfo]] = {} + for _, product_list in products.items(): + for product in product_list: + symbols_infos.setdefault(product.symbol, []).append(product) + + # Aggregazione per ogni symbol + aggregated_products: list[ProductInfo] = [] + for symbol, product_list in symbols_infos.items(): + product = ProductInfo() + + product.id = f"{symbol}_AGGREGATED" + product.symbol = symbol + product.currency = next(p.currency for p in product_list if p.currency) + + volume_sum = sum(p.volume_24h for p in product_list) + product.volume_24h = volume_sum / len(product_list) if product_list else 0.0 + + prices = sum(p.price * p.volume_24h for p in product_list) + product.price = (prices / volume_sum) if volume_sum > 0 else 0.0 + + aggregated_products.append(product) + return aggregated_products + + + class Price(BaseModel): """ - Rappresenta i dati di prezzo per un asset, come ottenuti dalle API di mercato. - Implementa i metodi di conversione dai dati grezzi delle API. + Represents price data for an asset as obtained from market APIs. + Implements conversion methods from raw API data. """ high: float = 0.0 low: float = 0.0 @@ -25,16 +61,17 @@ class Price(BaseModel): close: float = 0.0 volume: float = 0.0 timestamp: str = "" - """Timestamp con formato YYYY-MM-DD HH:MM""" + """Timestamp in format YYYY-MM-DD HH:MM""" def set_timestamp(self, timestamp_ms: int | None = None, timestamp_s: int | None = None) -> None: """ - Imposta il timestamp a partire da millisecondi o secondi. - IL timestamp viene salvato come stringa formattata 'YYYY-MM-DD HH:MM'. + Sets the timestamp from milliseconds or seconds. + The timestamp is saved as a formatted string 'YYYY-MM-DD HH:MM'. Args: - timestamp_ms: Timestamp in millisecondi. - timestamp_s: Timestamp in secondi. + timestamp_ms: Timestamp in milliseconds. + timestamp_s: Timestamp in seconds. Raises: + ValueError: If neither timestamp_ms nor timestamp_s is provided. """ if timestamp_ms is not None: timestamp = timestamp_ms // 1000 @@ -46,10 +83,41 @@ class Price(BaseModel): self.timestamp = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M') + @staticmethod + def aggregate(prices: dict[str, list['Price']]) -> list['Price']: + """ + Aggregates historical prices for the same symbol by calculating the mean. + Args: + prices (dict[str, list[Price]]): Map provider -> list of Price. + The map must contain only Price objects for the same symbol. + Returns: + list[Price]: List of Price objects aggregated by timestamp. + """ + + # Costruiamo una mappa timestamp -> lista di Price + timestamped_prices: dict[str, list[Price]] = {} + for _, price_list in prices.items(): + for price in price_list: + timestamped_prices.setdefault(price.timestamp, []).append(price) + + # Ora aggregiamo i prezzi per ogni timestamp + aggregated_prices: list[Price] = [] + for time, price_list in timestamped_prices.items(): + price = Price() + price.timestamp = time + price.high = statistics.mean([p.high for p in price_list]) + price.low = statistics.mean([p.low for p in price_list]) + price.open = statistics.mean([p.open for p in price_list]) + price.close = statistics.mean([p.close for p in price_list]) + price.volume = statistics.mean([p.volume for p in price_list]) + aggregated_prices.append(price) + return aggregated_prices + class MarketWrapper: """ Base class for market API wrappers. All market API wrappers should inherit from this class and implement the methods. + Provides interface for retrieving product and price information from market APIs. """ def get_product(self, asset_id: str) -> ProductInfo: @@ -82,66 +150,3 @@ class MarketWrapper: list[Price]: A list of Price objects. """ raise NotImplementedError("This method should be overridden by subclasses") - - -def aggregate_history_prices(prices: dict[str, list[Price]]) -> list[Price]: - """ - Aggrega i prezzi storici per symbol calcolando la media. - Args: - prices (dict[str, list[Price]]): Mappa provider -> lista di Price - Returns: - list[Price]: Lista di Price aggregati per timestamp - """ - - # Costruiamo una mappa timestamp -> lista di Price - timestamped_prices: dict[str, list[Price]] = {} - for _, price_list in prices.items(): - for price in price_list: - timestamped_prices.setdefault(price.timestamp, []).append(price) - - # Ora aggregiamo i prezzi per ogni timestamp - aggregated_prices: list[Price] = [] - for time, price_list in timestamped_prices.items(): - price = Price() - price.timestamp = time - price.high = statistics.mean([p.high for p in price_list]) - price.low = statistics.mean([p.low for p in price_list]) - price.open = statistics.mean([p.open for p in price_list]) - price.close = statistics.mean([p.close for p in price_list]) - price.volume = statistics.mean([p.volume for p in price_list]) - aggregated_prices.append(price) - return aggregated_prices - -def aggregate_product_info(products: dict[str, list[ProductInfo]]) -> list[ProductInfo]: - """ - Aggrega una lista di ProductInfo per symbol. - Args: - products (dict[str, list[ProductInfo]]): Mappa provider -> lista di ProductInfo - Returns: - list[ProductInfo]: Lista di ProductInfo aggregati per symbol - """ - - # Costruzione mappa symbol -> lista di ProductInfo - symbols_infos: dict[str, list[ProductInfo]] = {} - for _, product_list in products.items(): - for product in product_list: - symbols_infos.setdefault(product.symbol, []).append(product) - - # Aggregazione per ogni symbol - aggregated_products: list[ProductInfo] = [] - for symbol, product_list in symbols_infos.items(): - product = ProductInfo() - - product.id = f"{symbol}_AGGREGATED" - product.symbol = symbol - product.currency = next(p.currency for p in product_list if p.currency) - - volume_sum = sum(p.volume_24h for p in product_list) - product.volume_24h = volume_sum / len(product_list) if product_list else 0.0 - - prices = sum(p.price * p.volume_24h for p in product_list) - product.price = (prices / volume_sum) if volume_sum > 0 else 0.0 - - aggregated_products.append(product) - return aggregated_products - diff --git a/src/app/api/base/news.py b/src/app/api/base/news.py index 8a0d51e..1f67999 100644 --- a/src/app/api/base/news.py +++ b/src/app/api/base/news.py @@ -2,6 +2,9 @@ from pydantic import BaseModel class Article(BaseModel): + """ + Represents a news article with source, time, title, and description. + """ source: str = "" time: str = "" title: str = "" @@ -11,11 +14,12 @@ class NewsWrapper: """ Base class for news API wrappers. All news API wrappers should inherit from this class and implement the methods. + Provides interface for retrieving news articles from news APIs. """ def get_top_headlines(self, limit: int = 100) -> list[Article]: """ - Get top headlines, optionally limited by limit. + Retrieve top headlines, optionally limited by the specified number. Args: limit (int): The maximum number of articles to return. Returns: @@ -25,7 +29,7 @@ class NewsWrapper: def get_latest_news(self, query: str, limit: int = 100) -> list[Article]: """ - Get latest news based on a query. + Retrieve the latest news based on a search query. Args: query (str): The search query. limit (int): The maximum number of articles to return. diff --git a/src/app/api/base/social.py b/src/app/api/base/social.py index dd894f5..721ac0c 100644 --- a/src/app/api/base/social.py +++ b/src/app/api/base/social.py @@ -2,12 +2,18 @@ from pydantic import BaseModel class SocialPost(BaseModel): + """ + Represents a social media post with time, title, description, and comments. + """ time: str = "" title: str = "" description: str = "" comments: list["SocialComment"] = [] class SocialComment(BaseModel): + """ + Represents a comment on a social media post. + """ time: str = "" description: str = "" @@ -16,11 +22,12 @@ class SocialWrapper: """ Base class for social media API wrappers. All social media API wrappers should inherit from this class and implement the methods. + Provides interface for retrieving social media posts and comments from APIs. """ def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]: """ - Get top cryptocurrency-related posts, optionally limited by total. + Retrieve top cryptocurrency-related posts, optionally limited by the specified number. Args: limit (int): The maximum number of posts to return. Returns: diff --git a/src/app/api/markets/__init__.py b/src/app/api/markets/__init__.py index 784913a..9a48853 100644 --- a/src/app/api/markets/__init__.py +++ b/src/app/api/markets/__init__.py @@ -1,6 +1,6 @@ from agno.tools import Toolkit from app.api.wrapper_handler import WrapperHandler -from app.api.base.markets import MarketWrapper, Price, ProductInfo, aggregate_history_prices, aggregate_product_info +from app.api.base.markets import MarketWrapper, Price, ProductInfo from app.api.markets.binance import BinanceWrapper from app.api.markets.coinbase import CoinBaseWrapper from app.api.markets.cryptocompare import CryptoCompareWrapper @@ -68,7 +68,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit): Exception: If all wrappers fail to provide results. """ all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids)) - return aggregate_product_info(all_products) + return ProductInfo.aggregate(all_products) def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]: """ @@ -83,4 +83,4 @@ class MarketAPIsTool(MarketWrapper, Toolkit): Exception: If all wrappers fail to provide results. """ all_prices = self.handler.try_call_all(lambda w: w.get_historical_prices(asset_id, limit)) - return aggregate_history_prices(all_prices) + return Price.aggregate(all_prices) diff --git a/tests/api/test_binance.py b/tests/api/test_binance.py index 46c6c2b..4fee373 100644 --- a/tests/api/test_binance.py +++ b/tests/api/test_binance.py @@ -1,6 +1,19 @@ import pytest +import asyncio from app.api.markets.binance import BinanceWrapper +# fix warning about no event loop +@pytest.fixture(scope="session", autouse=True) +def event_loop(): + """ + Ensure there is an event loop for the duration of the tests. + """ + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + yield loop + loop.close() + + @pytest.mark.market @pytest.mark.api class TestBinance: diff --git a/tests/utils/test_market_aggregator.py b/tests/utils/test_market_aggregator.py index fb789b3..8c6ea18 100644 --- a/tests/utils/test_market_aggregator.py +++ b/tests/utils/test_market_aggregator.py @@ -1,6 +1,6 @@ import pytest from datetime import datetime -from app.api.base.markets import ProductInfo, Price, aggregate_history_prices, aggregate_product_info +from app.api.base.markets import ProductInfo, Price @pytest.mark.aggregator @@ -33,7 +33,7 @@ class TestMarketDataAggregator: "Provider3": [self.__product("BTC", 49900.0, 900.0, "USD")], } - aggregated = aggregate_product_info(products) + aggregated = ProductInfo.aggregate(products) assert len(aggregated) == 1 info = aggregated[0] @@ -57,7 +57,7 @@ class TestMarketDataAggregator: ], } - aggregated = aggregate_product_info(products) + aggregated = ProductInfo.aggregate(products) assert len(aggregated) == 2 btc_info = next((p for p in aggregated if p.symbol == "BTC"), None) @@ -80,7 +80,7 @@ class TestMarketDataAggregator: "Provider1": [], "Provider2": [], } - aggregated = aggregate_product_info(products) + aggregated = ProductInfo.aggregate(products) assert len(aggregated) == 0 def test_aggregate_product_info_with_partial_data(self): @@ -88,7 +88,7 @@ class TestMarketDataAggregator: "Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")], "Provider2": [], } - aggregated = aggregate_product_info(products) + aggregated = ProductInfo.aggregate(products) assert len(aggregated) == 1 info = aggregated[0] assert info.symbol == "BTC" @@ -119,7 +119,7 @@ class TestMarketDataAggregator: price.set_timestamp(timestamp_s=timestamp_2h_ago) timestamp_2h_ago = price.timestamp - aggregated = aggregate_history_prices(prices) + aggregated = Price.aggregate(prices) assert len(aggregated) == 2 assert aggregated[0].timestamp == timestamp_1h_ago assert aggregated[0].high == pytest.approx(50050.0, rel=1e-3) # type: ignore