import statistics from app.markets.base import ProductInfo, Price def aggregate_history_prices(prices: dict[str, list[Price]]) -> list[Price]: """ Aggrega i prezzi storici per symbol calcolando la media oraria. Args: prices (dict[str, list[Price]]): Mappa provider -> lista di Price Returns: list[Price]: Lista di Price aggregati per ora """ # Costruiamo una mappa timestamp_h -> lista di Price timestamped_prices: dict[int, list[Price]] = {} for _, price_list in prices.items(): for price in price_list: time = price.timestamp_ms - (price.timestamp_ms % 3600000) # arrotonda all'ora (non dovrebbe essere necessario) timestamped_prices.setdefault(time, []).append(price) # Ora aggregiamo i prezzi per ogni ora aggregated_prices = [] for time, price_list in timestamped_prices.items(): price = Price() price.timestamp_ms = time price.high = statistics.mean([p.high for p in price_list]) price.low = statistics.mean([p.low for p in price_list]) price.open = statistics.mean([p.open for p in price_list]) price.close = statistics.mean([p.close for p in price_list]) price.volume = statistics.mean([p.volume for p in price_list]) aggregated_prices.append(price) return aggregated_prices def aggregate_product_info(products: dict[str, list[ProductInfo]]) -> list[ProductInfo]: """ Aggrega una lista di ProductInfo per symbol. Args: products (dict[str, list[ProductInfo]]): Mappa provider -> lista di ProductInfo Returns: list[ProductInfo]: Lista di ProductInfo aggregati per symbol """ # Costruzione mappa symbol -> lista di ProductInfo symbols_infos: dict[str, list[ProductInfo]] = {} for _, product_list in products.items(): for product in product_list: symbols_infos.setdefault(product.symbol, []).append(product) # Aggregazione per ogni symbol sources = list(products.keys()) aggregated_products = [] for symbol, product_list in symbols_infos.items(): product = ProductInfo() product.id = f"{symbol}_AGGREGATED" product.symbol = symbol product.quote_currency = next(p.quote_currency for p in product_list if p.quote_currency) volume_sum = sum(p.volume_24h for p in product_list) product.volume_24h = volume_sum / len(product_list) if product_list else 0.0 prices = sum(p.price * p.volume_24h for p in product_list) product.price = (prices / volume_sum) if volume_sum > 0 else 0.0 aggregated_products.append(product) return aggregated_products def _calculate_confidence(products: list[ProductInfo], sources: list[str]) -> float: """Calcola un punteggio di confidenza 0-1""" if not products: return 0.0 score = 1.0 # Riduci score se pochi dati if len(products) < 2: score *= 0.7 # Riduci score se prezzi troppo diversi prices = [p.price for p in products if p.price > 0] if len(prices) > 1: price_std = (max(prices) - min(prices)) / statistics.mean(prices) if price_std > 0.05: # >5% variazione score *= 0.8 # Riduci score se fonti sconosciute unknown_sources = sum(1 for s in sources if s == "unknown") if unknown_sources > 0: score *= (1 - unknown_sources / len(sources)) return max(0.0, min(1.0, score))