feat: add detailed market instructions and improve error handling in price aggregation methods

This commit is contained in:
2025-10-02 00:28:42 +02:00
parent 31f38efdf5
commit 8b81cd5a71
4 changed files with 67 additions and 23 deletions

View File

@@ -4,10 +4,12 @@ from app.markets.base import ProductInfo, Price
def aggregate_history_prices(prices: dict[str, list[Price]]) -> list[Price]:
"""
Aggrega i prezzi storici per symbol calcolando la media
Aggrega i prezzi storici per symbol calcolando la media oraria.
Args:
prices (dict[str, list[Price]]): Mappa provider -> lista di Price
Returns:
list[Price]: Lista di Price aggregati per ora
"""
max_list_length = max(len(p) for p in prices.values())
# Costruiamo una mappa timestamp_h -> lista di Price
timestamped_prices: dict[int, list[Price]] = {}
@@ -27,13 +29,15 @@ def aggregate_history_prices(prices: dict[str, list[Price]]) -> list[Price]:
price.close = statistics.mean([p.close for p in price_list])
price.volume = statistics.mean([p.volume for p in price_list])
aggregated_prices.append(price)
assert(len(aggregated_prices) <= max_list_length)
return aggregated_prices
def aggregate_product_info(products: dict[str, list[ProductInfo]]) -> list[ProductInfo]:
"""
Aggrega una lista di ProductInfo per symbol.
Args:
products (dict[str, list[ProductInfo]]): Mappa provider -> lista di ProductInfo
Returns:
list[ProductInfo]: Lista di ProductInfo aggregati per symbol
"""
# Costruzione mappa symbol -> lista di ProductInfo
@@ -48,15 +52,16 @@ def aggregate_product_info(products: dict[str, list[ProductInfo]]) -> list[Produ
for symbol, product_list in symbols_infos.items():
product = ProductInfo()
product.id = f"{symbol}_AGG"
product.id = f"{symbol}_AGGREGATED"
product.symbol = symbol
product.quote_currency = next(p.quote_currency for p in product_list if p.quote_currency)
prices = [p.price for p in product_list]
product.price = statistics.mean(prices)
volume_sum = sum(p.volume_24h for p in product_list)
product.volume_24h = volume_sum / len(product_list) if product_list else 0.0
prices = sum(p.price * p.volume_24h for p in product_list)
product.price = (prices / volume_sum) if volume_sum > 0 else 0.0
volumes = [p.volume_24h for p in product_list]
product.volume_24h = sum([p * v for p, v in zip(prices, volumes)]) / sum(volumes)
aggregated_products.append(product)
confidence = _calculate_confidence(product_list, sources) # TODO necessary?