Refactor project structure to organize APIs #24

Merged
Berack96 merged 4 commits from api-modules into main 2025-10-11 21:36:13 +02:00
6 changed files with 113 additions and 84 deletions
Showing only changes of commit 757da56a21 - Show all commits

View File

@@ -5,8 +5,8 @@ from pydantic import BaseModel
class ProductInfo(BaseModel):
"""
Informazioni sul prodotto, come ottenute dalle API di mercato.
Implementa i metodi di conversione dai dati grezzi delle API.
Product information as obtained from market APIs.
Implements conversion methods from raw API data.
"""
id: str = ""
symbol: str = ""
@@ -14,10 +14,46 @@ class ProductInfo(BaseModel):
volume_24h: float = 0.0
currency: str = ""
@staticmethod
def aggregate(products: dict[str, list['ProductInfo']]) -> list['ProductInfo']:
"""
Aggregates a list of ProductInfo by symbol.
Args:
products (dict[str, list[ProductInfo]]): Map provider -> list of ProductInfo
Returns:
list[ProductInfo]: List of ProductInfo aggregated by symbol
"""
# Costruzione mappa symbol -> lista di ProductInfo
symbols_infos: dict[str, list[ProductInfo]] = {}
for _, product_list in products.items():
for product in product_list:
symbols_infos.setdefault(product.symbol, []).append(product)
# Aggregazione per ogni symbol
aggregated_products: list[ProductInfo] = []
for symbol, product_list in symbols_infos.items():
product = ProductInfo()
product.id = f"{symbol}_AGGREGATED"
product.symbol = symbol
product.currency = next(p.currency for p in product_list if p.currency)
volume_sum = sum(p.volume_24h for p in product_list)
product.volume_24h = volume_sum / len(product_list) if product_list else 0.0
prices = sum(p.price * p.volume_24h for p in product_list)
product.price = (prices / volume_sum) if volume_sum > 0 else 0.0
aggregated_products.append(product)
return aggregated_products
class Price(BaseModel):
"""
Rappresenta i dati di prezzo per un asset, come ottenuti dalle API di mercato.
Implementa i metodi di conversione dai dati grezzi delle API.
Represents price data for an asset as obtained from market APIs.
Implements conversion methods from raw API data.
"""
high: float = 0.0
low: float = 0.0
@@ -25,16 +61,17 @@ class Price(BaseModel):
close: float = 0.0
volume: float = 0.0
timestamp: str = ""
"""Timestamp con formato YYYY-MM-DD HH:MM"""
"""Timestamp in format YYYY-MM-DD HH:MM"""
def set_timestamp(self, timestamp_ms: int | None = None, timestamp_s: int | None = None) -> None:
"""
Imposta il timestamp a partire da millisecondi o secondi.
IL timestamp viene salvato come stringa formattata 'YYYY-MM-DD HH:MM'.
Sets the timestamp from milliseconds or seconds.
The timestamp is saved as a formatted string 'YYYY-MM-DD HH:MM'.
Args:
timestamp_ms: Timestamp in millisecondi.
timestamp_s: Timestamp in secondi.
timestamp_ms: Timestamp in milliseconds.
timestamp_s: Timestamp in seconds.
Raises:
ValueError: If neither timestamp_ms nor timestamp_s is provided.
"""
if timestamp_ms is not None:
timestamp = timestamp_ms // 1000
@@ -46,10 +83,41 @@ class Price(BaseModel):
self.timestamp = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M')
@staticmethod
def aggregate(prices: dict[str, list['Price']]) -> list['Price']:
"""
Aggregates historical prices for the same symbol by calculating the mean.
Args:
prices (dict[str, list[Price]]): Map provider -> list of Price.
The map must contain only Price objects for the same symbol.
Returns:
list[Price]: List of Price objects aggregated by timestamp.
"""
# Costruiamo una mappa timestamp -> lista di Price
timestamped_prices: dict[str, list[Price]] = {}
for _, price_list in prices.items():
for price in price_list:
timestamped_prices.setdefault(price.timestamp, []).append(price)
# Ora aggregiamo i prezzi per ogni timestamp
aggregated_prices: list[Price] = []
for time, price_list in timestamped_prices.items():
price = Price()
price.timestamp = time
price.high = statistics.mean([p.high for p in price_list])
price.low = statistics.mean([p.low for p in price_list])
price.open = statistics.mean([p.open for p in price_list])
price.close = statistics.mean([p.close for p in price_list])
price.volume = statistics.mean([p.volume for p in price_list])
aggregated_prices.append(price)
return aggregated_prices
class MarketWrapper:
"""
Base class for market API wrappers.
All market API wrappers should inherit from this class and implement the methods.
Provides interface for retrieving product and price information from market APIs.
"""
def get_product(self, asset_id: str) -> ProductInfo:
@@ -82,66 +150,3 @@ class MarketWrapper:
list[Price]: A list of Price objects.
"""
raise NotImplementedError("This method should be overridden by subclasses")
def aggregate_history_prices(prices: dict[str, list[Price]]) -> list[Price]:
"""
Aggrega i prezzi storici per symbol calcolando la media.
Args:
prices (dict[str, list[Price]]): Mappa provider -> lista di Price
Returns:
list[Price]: Lista di Price aggregati per timestamp
"""
# Costruiamo una mappa timestamp -> lista di Price
timestamped_prices: dict[str, list[Price]] = {}
for _, price_list in prices.items():
for price in price_list:
timestamped_prices.setdefault(price.timestamp, []).append(price)
# Ora aggregiamo i prezzi per ogni timestamp
aggregated_prices: list[Price] = []
for time, price_list in timestamped_prices.items():
price = Price()
price.timestamp = time
price.high = statistics.mean([p.high for p in price_list])
price.low = statistics.mean([p.low for p in price_list])
price.open = statistics.mean([p.open for p in price_list])
price.close = statistics.mean([p.close for p in price_list])
price.volume = statistics.mean([p.volume for p in price_list])
aggregated_prices.append(price)
return aggregated_prices
def aggregate_product_info(products: dict[str, list[ProductInfo]]) -> list[ProductInfo]:
"""
Aggrega una lista di ProductInfo per symbol.
Args:
products (dict[str, list[ProductInfo]]): Mappa provider -> lista di ProductInfo
Returns:
list[ProductInfo]: Lista di ProductInfo aggregati per symbol
"""
# Costruzione mappa symbol -> lista di ProductInfo
symbols_infos: dict[str, list[ProductInfo]] = {}
for _, product_list in products.items():
for product in product_list:
symbols_infos.setdefault(product.symbol, []).append(product)
# Aggregazione per ogni symbol
aggregated_products: list[ProductInfo] = []
for symbol, product_list in symbols_infos.items():
product = ProductInfo()
product.id = f"{symbol}_AGGREGATED"
product.symbol = symbol
product.currency = next(p.currency for p in product_list if p.currency)
volume_sum = sum(p.volume_24h for p in product_list)
product.volume_24h = volume_sum / len(product_list) if product_list else 0.0
prices = sum(p.price * p.volume_24h for p in product_list)
product.price = (prices / volume_sum) if volume_sum > 0 else 0.0
aggregated_products.append(product)
return aggregated_products

View File

@@ -2,6 +2,9 @@ from pydantic import BaseModel
class Article(BaseModel):
"""
Represents a news article with source, time, title, and description.
"""
source: str = ""
time: str = ""
title: str = ""
@@ -11,11 +14,12 @@ class NewsWrapper:
"""
Base class for news API wrappers.
All news API wrappers should inherit from this class and implement the methods.
Provides interface for retrieving news articles from news APIs.
"""
def get_top_headlines(self, limit: int = 100) -> list[Article]:
"""
Get top headlines, optionally limited by limit.
Retrieve top headlines, optionally limited by the specified number.
Args:
limit (int): The maximum number of articles to return.
Returns:
@@ -25,7 +29,7 @@ class NewsWrapper:
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
"""
Get latest news based on a query.
Retrieve the latest news based on a search query.
Args:
query (str): The search query.
limit (int): The maximum number of articles to return.

View File

@@ -2,12 +2,18 @@ from pydantic import BaseModel
class SocialPost(BaseModel):
"""
Represents a social media post with time, title, description, and comments.
"""
time: str = ""
title: str = ""
description: str = ""
comments: list["SocialComment"] = []
class SocialComment(BaseModel):
"""
Represents a comment on a social media post.
"""
time: str = ""
description: str = ""
@@ -16,11 +22,12 @@ class SocialWrapper:
"""
Base class for social media API wrappers.
All social media API wrappers should inherit from this class and implement the methods.
Provides interface for retrieving social media posts and comments from APIs.
"""
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
"""
Get top cryptocurrency-related posts, optionally limited by total.
Retrieve top cryptocurrency-related posts, optionally limited by the specified number.
Args:
limit (int): The maximum number of posts to return.
Returns:

View File

@@ -1,6 +1,6 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.base.markets import MarketWrapper, Price, ProductInfo, aggregate_history_prices, aggregate_product_info
from app.api.base.markets import MarketWrapper, Price, ProductInfo
from app.api.markets.binance import BinanceWrapper
from app.api.markets.coinbase import CoinBaseWrapper
from app.api.markets.cryptocompare import CryptoCompareWrapper
@@ -68,7 +68,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
Exception: If all wrappers fail to provide results.
"""
all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids))
return aggregate_product_info(all_products)
return ProductInfo.aggregate(all_products)
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
"""
@@ -83,4 +83,4 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
Exception: If all wrappers fail to provide results.
"""
all_prices = self.handler.try_call_all(lambda w: w.get_historical_prices(asset_id, limit))
return aggregate_history_prices(all_prices)
return Price.aggregate(all_prices)

View File

@@ -1,6 +1,19 @@
import pytest
import asyncio
from app.api.markets.binance import BinanceWrapper
# fix warning about no event loop
@pytest.fixture(scope="session", autouse=True)
def event_loop():
"""
Ensure there is an event loop for the duration of the tests.
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
yield loop
loop.close()
@pytest.mark.market
@pytest.mark.api
class TestBinance:

View File

@@ -1,6 +1,6 @@
import pytest
from datetime import datetime
from app.api.base.markets import ProductInfo, Price, aggregate_history_prices, aggregate_product_info
from app.api.base.markets import ProductInfo, Price
@pytest.mark.aggregator
@@ -33,7 +33,7 @@ class TestMarketDataAggregator:
"Provider3": [self.__product("BTC", 49900.0, 900.0, "USD")],
}
aggregated = aggregate_product_info(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 1
info = aggregated[0]
@@ -57,7 +57,7 @@ class TestMarketDataAggregator:
],
}
aggregated = aggregate_product_info(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 2
btc_info = next((p for p in aggregated if p.symbol == "BTC"), None)
@@ -80,7 +80,7 @@ class TestMarketDataAggregator:
"Provider1": [],
"Provider2": [],
}
aggregated = aggregate_product_info(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 0
def test_aggregate_product_info_with_partial_data(self):
@@ -88,7 +88,7 @@ class TestMarketDataAggregator:
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")],
"Provider2": [],
}
aggregated = aggregate_product_info(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 1
info = aggregated[0]
assert info.symbol == "BTC"
@@ -119,7 +119,7 @@ class TestMarketDataAggregator:
price.set_timestamp(timestamp_s=timestamp_2h_ago)
timestamp_2h_ago = price.timestamp
aggregated = aggregate_history_prices(prices)
aggregated = Price.aggregate(prices)
assert len(aggregated) == 2
assert aggregated[0].timestamp == timestamp_1h_ago
assert aggregated[0].high == pytest.approx(50050.0, rel=1e-3) # type: ignore