Merge branch 'main' into 2-news-api
This commit is contained in:
186
src/app/utils/aggregated_models.py
Normal file
186
src/app/utils/aggregated_models.py
Normal file
@@ -0,0 +1,186 @@
|
||||
import statistics
|
||||
from typing import Dict, List, Optional, Set
|
||||
from pydantic import BaseModel, Field, PrivateAttr
|
||||
from app.markets.base import ProductInfo
|
||||
|
||||
class AggregationMetadata(BaseModel):
|
||||
"""Metadati nascosti per debugging e audit trail"""
|
||||
sources_used: Set[str] = Field(default_factory=set, description="Exchange usati nell'aggregazione")
|
||||
sources_ignored: Set[str] = Field(default_factory=set, description="Exchange ignorati (errori)")
|
||||
aggregation_timestamp: str = Field(default="", description="Timestamp dell'aggregazione")
|
||||
confidence_score: float = Field(default=0.0, description="Score 0-1 sulla qualità dei dati")
|
||||
|
||||
class Config:
|
||||
# Nasconde questi campi dalla serializzazione di default
|
||||
extra = "forbid"
|
||||
|
||||
class AggregatedProductInfo(ProductInfo):
|
||||
"""
|
||||
Versione aggregata di ProductInfo che mantiene la trasparenza per l'utente finale
|
||||
mentre fornisce metadati di debugging opzionali.
|
||||
"""
|
||||
|
||||
# Override dei campi con logica di aggregazione
|
||||
id: str = Field(description="ID aggregato basato sul simbolo standardizzato")
|
||||
status: str = Field(description="Status aggregato (majority vote o conservative)")
|
||||
|
||||
# Campi privati per debugging (non visibili di default)
|
||||
_metadata: Optional[AggregationMetadata] = PrivateAttr(default=None)
|
||||
_source_data: Optional[Dict[str, ProductInfo]] = PrivateAttr(default=None)
|
||||
|
||||
@classmethod
|
||||
def from_multiple_sources(cls, products: List[ProductInfo]) -> 'AggregatedProductInfo':
|
||||
"""
|
||||
Crea un AggregatedProductInfo da una lista di ProductInfo.
|
||||
Usa strategie intelligenti per gestire ID e status.
|
||||
"""
|
||||
if not products:
|
||||
raise ValueError("Nessun prodotto da aggregare")
|
||||
|
||||
# Raggruppa per symbol (la chiave vera per l'aggregazione)
|
||||
symbol_groups = {}
|
||||
for product in products:
|
||||
if product.symbol not in symbol_groups:
|
||||
symbol_groups[product.symbol] = []
|
||||
symbol_groups[product.symbol].append(product)
|
||||
|
||||
# Per ora gestiamo un symbol alla volta
|
||||
if len(symbol_groups) > 1:
|
||||
raise ValueError(f"Simboli multipli non supportati: {list(symbol_groups.keys())}")
|
||||
|
||||
symbol_products = list(symbol_groups.values())[0]
|
||||
|
||||
# Estrai tutte le fonti
|
||||
sources = []
|
||||
for product in symbol_products:
|
||||
# Determina la fonte dall'ID o da altri metadati se disponibili
|
||||
source = cls._detect_source(product)
|
||||
sources.append(source)
|
||||
|
||||
# Aggrega i dati
|
||||
aggregated_data = cls._aggregate_products(symbol_products, sources)
|
||||
|
||||
# Crea l'istanza e assegna gli attributi privati
|
||||
instance = cls(**aggregated_data)
|
||||
instance._metadata = aggregated_data.get("_metadata")
|
||||
instance._source_data = aggregated_data.get("_source_data")
|
||||
|
||||
return instance
|
||||
|
||||
@staticmethod
|
||||
def _detect_source(product: ProductInfo) -> str:
|
||||
"""Rileva la fonte da un ProductInfo"""
|
||||
# Strategia semplice: usa pattern negli ID
|
||||
if "coinbase" in product.id.lower() or "cb" in product.id.lower():
|
||||
return "coinbase"
|
||||
elif "binance" in product.id.lower() or "bn" in product.id.lower():
|
||||
return "binance"
|
||||
elif "crypto" in product.id.lower() or "cc" in product.id.lower():
|
||||
return "cryptocompare"
|
||||
elif "yfinance" in product.id.lower() or "yf" in product.id.lower():
|
||||
return "yfinance"
|
||||
else:
|
||||
return "unknown"
|
||||
|
||||
@classmethod
|
||||
def _aggregate_products(cls, products: List[ProductInfo], sources: List[str]) -> dict:
|
||||
"""
|
||||
Logica di aggregazione principale.
|
||||
Gestisce ID, status e altri campi numerici.
|
||||
"""
|
||||
import statistics
|
||||
from datetime import datetime
|
||||
|
||||
# ID: usa il symbol come chiave standardizzata
|
||||
symbol = products[0].symbol
|
||||
aggregated_id = f"{symbol}_AGG"
|
||||
|
||||
# Status: strategia "conservativa" - il più restrittivo vince
|
||||
# Ordine: trading_only < limit_only < auction < maintenance < offline
|
||||
status_priority = {
|
||||
"trading": 1,
|
||||
"limit_only": 2,
|
||||
"auction": 3,
|
||||
"maintenance": 4,
|
||||
"offline": 5,
|
||||
"": 0 # Default se non specificato
|
||||
}
|
||||
|
||||
statuses = [p.status for p in products if p.status]
|
||||
if statuses:
|
||||
# Prendi lo status con priorità più alta (più restrittivo)
|
||||
aggregated_status = max(statuses, key=lambda s: status_priority.get(s, 0))
|
||||
else:
|
||||
aggregated_status = "trading" # Default ottimistico
|
||||
|
||||
# Prezzo: media semplice (uso diretto del campo price come float)
|
||||
prices = [p.price for p in products if p.price > 0]
|
||||
aggregated_price = statistics.mean(prices) if prices else 0.0
|
||||
|
||||
# Volume: somma (assumendo che i volumi siano esclusivi per exchange)
|
||||
volumes = [p.volume_24h for p in products if p.volume_24h > 0]
|
||||
total_volume = sum(volumes)
|
||||
aggregated_volume = sum(price_i * volume_i for price_i, volume_i in zip((p.price for p in products), (volume for volume in volumes))) / total_volume
|
||||
aggregated_volume = round(aggregated_volume, 5)
|
||||
# aggregated_volume = sum(volumes) if volumes else 0.0 # NOTE old implementation
|
||||
|
||||
# Valuta: prendi la prima (dovrebbero essere tutte uguali)
|
||||
quote_currency = next((p.quote_currency for p in products if p.quote_currency), "USD")
|
||||
|
||||
# Calcola confidence score
|
||||
confidence = cls._calculate_confidence(products, sources)
|
||||
|
||||
# Crea metadati per debugging
|
||||
metadata = AggregationMetadata(
|
||||
sources_used=set(sources),
|
||||
aggregation_timestamp=datetime.now().isoformat(),
|
||||
confidence_score=confidence
|
||||
)
|
||||
|
||||
# Salva dati sorgente per debugging
|
||||
source_data = dict(zip(sources, products))
|
||||
|
||||
return {
|
||||
"symbol": symbol,
|
||||
"price": aggregated_price,
|
||||
"volume_24h": aggregated_volume,
|
||||
"quote_currency": quote_currency,
|
||||
"id": aggregated_id,
|
||||
"status": aggregated_status,
|
||||
"_metadata": metadata,
|
||||
"_source_data": source_data
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _calculate_confidence(products: List[ProductInfo], sources: List[str]) -> float:
|
||||
"""Calcola un punteggio di confidenza 0-1"""
|
||||
if not products:
|
||||
return 0.0
|
||||
|
||||
score = 1.0
|
||||
|
||||
# Riduci score se pochi dati
|
||||
if len(products) < 2:
|
||||
score *= 0.7
|
||||
|
||||
# Riduci score se prezzi troppo diversi
|
||||
prices = [p.price for p in products if p.price > 0]
|
||||
if len(prices) > 1:
|
||||
price_std = (max(prices) - min(prices)) / statistics.mean(prices)
|
||||
if price_std > 0.05: # >5% variazione
|
||||
score *= 0.8
|
||||
|
||||
# Riduci score se fonti sconosciute
|
||||
unknown_sources = sum(1 for s in sources if s == "unknown")
|
||||
if unknown_sources > 0:
|
||||
score *= (1 - unknown_sources / len(sources))
|
||||
|
||||
return max(0.0, min(1.0, score))
|
||||
|
||||
def get_debug_info(self) -> dict:
|
||||
"""Metodo opzionale per ottenere informazioni di debug"""
|
||||
return {
|
||||
"aggregated_product": self.dict(),
|
||||
"metadata": self._metadata.dict() if self._metadata else None,
|
||||
"sources": list(self._source_data.keys()) if self._source_data else []
|
||||
}
|
||||
@@ -1,71 +0,0 @@
|
||||
import statistics
|
||||
from typing import Dict, Any
|
||||
|
||||
class MarketAggregator:
|
||||
"""
|
||||
Aggrega dati di mercato da più provider e genera segnali e analisi avanzate.
|
||||
"""
|
||||
@staticmethod
|
||||
def aggregate(symbol: str, sources: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
|
||||
prices = []
|
||||
volumes = []
|
||||
price_map = {}
|
||||
for provider, data in sources.items():
|
||||
price = data.get('price')
|
||||
if price is not None:
|
||||
prices.append(price)
|
||||
price_map[provider] = price
|
||||
volume = data.get('volume')
|
||||
if volume is not None:
|
||||
volumes.append(MarketAggregator._parse_volume(volume))
|
||||
|
||||
# Aggregated price (mean)
|
||||
agg_price = statistics.mean(prices) if prices else None
|
||||
# Spread analysis
|
||||
spread = (max(prices) - min(prices)) / agg_price if prices and agg_price else 0
|
||||
# Confidence
|
||||
stddev = statistics.stdev(prices) if len(prices) > 1 else 0
|
||||
confidence = max(0.5, 1 - (stddev / agg_price)) if agg_price else 0
|
||||
if spread < 0.005:
|
||||
confidence += 0.1
|
||||
if len(prices) >= 3:
|
||||
confidence += 0.05
|
||||
confidence = min(confidence, 1.0)
|
||||
# Volume trend
|
||||
total_volume = sum(volumes) if volumes else None
|
||||
# Price divergence
|
||||
max_deviation = (max(prices) - min(prices)) / agg_price if prices and agg_price else 0
|
||||
# Signals
|
||||
signals = {
|
||||
"spread_analysis": f"Low spread ({spread:.2%}) indicates healthy liquidity" if spread < 0.01 else f"Spread {spread:.2%} - check liquidity",
|
||||
"volume_trend": f"Combined volume: {total_volume:.2f}" if total_volume else "Volume data not available",
|
||||
"price_divergence": f"Max deviation: {max_deviation:.2%} - {'Normal range' if max_deviation < 0.01 else 'High divergence'}"
|
||||
}
|
||||
return {
|
||||
"aggregated_data": {
|
||||
f"{symbol}_USD": {
|
||||
"price": round(agg_price, 2) if agg_price else None,
|
||||
"confidence": round(confidence, 2),
|
||||
"sources_count": len(prices)
|
||||
}
|
||||
},
|
||||
"individual_sources": price_map,
|
||||
"market_signals": signals
|
||||
}
|
||||
@staticmethod
|
||||
def _parse_volume(volume: Any) -> float:
|
||||
# Supporta stringhe tipo "1.2M" o numeri
|
||||
if isinstance(volume, (int, float)):
|
||||
return float(volume)
|
||||
if isinstance(volume, str):
|
||||
v = volume.upper().replace(' ', '')
|
||||
if v.endswith('M'):
|
||||
return float(v[:-1]) * 1_000_000
|
||||
if v.endswith('K'):
|
||||
return float(v[:-1]) * 1_000
|
||||
try:
|
||||
return float(v)
|
||||
except Exception as e:
|
||||
print(f"Errore nel parsing del volume: {e}")
|
||||
return 0.0
|
||||
return 0.0
|
||||
184
src/app/utils/market_data_aggregator.py
Normal file
184
src/app/utils/market_data_aggregator.py
Normal file
@@ -0,0 +1,184 @@
|
||||
from typing import List, Optional, Dict, Any
|
||||
from app.markets.base import ProductInfo, Price
|
||||
from app.utils.aggregated_models import AggregatedProductInfo
|
||||
|
||||
class MarketDataAggregator:
|
||||
"""
|
||||
Aggregatore di dati di mercato che mantiene la trasparenza per l'utente.
|
||||
|
||||
Compone MarketAPIs per fornire gli stessi metodi, ma restituisce dati aggregati
|
||||
da tutte le fonti disponibili. L'utente finale non vede la complessità.
|
||||
"""
|
||||
|
||||
def __init__(self, currency: str = "USD"):
|
||||
# Import lazy per evitare circular import
|
||||
from app.markets import MarketAPIsTool
|
||||
self._market_apis = MarketAPIsTool(currency)
|
||||
self._aggregation_enabled = True
|
||||
|
||||
def get_product(self, asset_id: str) -> ProductInfo:
|
||||
"""
|
||||
Override che aggrega dati da tutte le fonti disponibili.
|
||||
Per l'utente sembra un normale ProductInfo.
|
||||
"""
|
||||
if not self._aggregation_enabled:
|
||||
return self._market_apis.get_product(asset_id)
|
||||
|
||||
# Raccogli dati da tutte le fonti
|
||||
try:
|
||||
raw_results = self.wrappers.try_call_all(
|
||||
lambda wrapper: wrapper.get_product(asset_id)
|
||||
)
|
||||
|
||||
# Converti in ProductInfo se necessario
|
||||
products = []
|
||||
for wrapper_class, result in raw_results.items():
|
||||
if isinstance(result, ProductInfo):
|
||||
products.append(result)
|
||||
elif isinstance(result, dict):
|
||||
# Converti dizionario in ProductInfo
|
||||
products.append(ProductInfo(**result))
|
||||
|
||||
if not products:
|
||||
raise Exception("Nessun dato disponibile")
|
||||
|
||||
# Aggrega i risultati
|
||||
aggregated = AggregatedProductInfo.from_multiple_sources(products)
|
||||
|
||||
# Restituisci come ProductInfo normale (nascondi la complessità)
|
||||
return ProductInfo(**aggregated.dict(exclude={"_metadata", "_source_data"}))
|
||||
|
||||
except Exception as e:
|
||||
# Fallback: usa il comportamento normale se l'aggregazione fallisce
|
||||
return self._market_apis.get_product(asset_id)
|
||||
|
||||
def get_products(self, asset_ids: List[str]) -> List[ProductInfo]:
|
||||
"""
|
||||
Aggrega dati per multiple asset.
|
||||
"""
|
||||
if not self._aggregation_enabled:
|
||||
return self._market_apis.get_products(asset_ids)
|
||||
|
||||
aggregated_products = []
|
||||
|
||||
for asset_id in asset_ids:
|
||||
try:
|
||||
product = self.get_product(asset_id)
|
||||
aggregated_products.append(product)
|
||||
except Exception as e:
|
||||
# Salta asset che non riescono ad aggregare
|
||||
continue
|
||||
|
||||
return aggregated_products
|
||||
|
||||
def get_all_products(self) -> List[ProductInfo]:
|
||||
"""
|
||||
Aggrega tutti i prodotti disponibili.
|
||||
"""
|
||||
if not self._aggregation_enabled:
|
||||
return self._market_apis.get_all_products()
|
||||
|
||||
# Raccogli tutti i prodotti da tutte le fonti
|
||||
try:
|
||||
all_products_by_source = self.wrappers.try_call_all(
|
||||
lambda wrapper: wrapper.get_all_products()
|
||||
)
|
||||
|
||||
# Raggruppa per symbol per aggregare
|
||||
symbol_groups = {}
|
||||
for wrapper_class, products in all_products_by_source.items():
|
||||
if not isinstance(products, list):
|
||||
continue
|
||||
|
||||
for product in products:
|
||||
if isinstance(product, dict):
|
||||
product = ProductInfo(**product)
|
||||
|
||||
if product.symbol not in symbol_groups:
|
||||
symbol_groups[product.symbol] = []
|
||||
symbol_groups[product.symbol].append(product)
|
||||
|
||||
# Aggrega ogni gruppo
|
||||
aggregated_products = []
|
||||
for symbol, products in symbol_groups.items():
|
||||
try:
|
||||
aggregated = AggregatedProductInfo.from_multiple_sources(products)
|
||||
# Restituisci come ProductInfo normale
|
||||
aggregated_products.append(
|
||||
ProductInfo(**aggregated.dict(exclude={"_metadata", "_source_data"}))
|
||||
)
|
||||
except Exception:
|
||||
# Se l'aggregazione fallisce, usa il primo disponibile
|
||||
if products:
|
||||
aggregated_products.append(products[0])
|
||||
|
||||
return aggregated_products
|
||||
|
||||
except Exception as e:
|
||||
# Fallback: usa il comportamento normale
|
||||
return self._market_apis.get_all_products()
|
||||
|
||||
def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> List[Price]:
|
||||
"""
|
||||
Per i dati storici, usa una strategia diversa:
|
||||
prendi i dati dalla fonte più affidabile o aggrega se possibile.
|
||||
"""
|
||||
if not self._aggregation_enabled:
|
||||
return self._market_apis.get_historical_prices(asset_id, limit)
|
||||
|
||||
# Per dati storici, usa il primo wrapper che funziona
|
||||
# (l'aggregazione di dati storici è più complessa)
|
||||
try:
|
||||
return self.wrappers.try_call(
|
||||
lambda wrapper: wrapper.get_historical_prices(asset_id, limit)
|
||||
)
|
||||
except Exception as e:
|
||||
# Fallback: usa il comportamento normale
|
||||
return self._market_apis.get_historical_prices(asset_id, limit)
|
||||
|
||||
def enable_aggregation(self, enabled: bool = True):
|
||||
"""Abilita o disabilita l'aggregazione"""
|
||||
self._aggregation_enabled = enabled
|
||||
|
||||
def is_aggregation_enabled(self) -> bool:
|
||||
"""Controlla se l'aggregazione è abilitata"""
|
||||
return self._aggregation_enabled
|
||||
|
||||
# Metodi proxy per completare l'interfaccia BaseWrapper
|
||||
@property
|
||||
def wrappers(self):
|
||||
"""Accesso al wrapper handler per compatibilità"""
|
||||
return self._market_apis.wrappers
|
||||
|
||||
def get_aggregated_product_with_debug(self, asset_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Metodo speciale per debugging: restituisce dati aggregati con metadati.
|
||||
Usato solo per testing e monitoraggio.
|
||||
"""
|
||||
try:
|
||||
raw_results = self.wrappers.try_call_all(
|
||||
lambda wrapper: wrapper.get_product(asset_id)
|
||||
)
|
||||
|
||||
products = []
|
||||
for wrapper_class, result in raw_results.items():
|
||||
if isinstance(result, ProductInfo):
|
||||
products.append(result)
|
||||
elif isinstance(result, dict):
|
||||
products.append(ProductInfo(**result))
|
||||
|
||||
if not products:
|
||||
raise Exception("Nessun dato disponibile")
|
||||
|
||||
aggregated = AggregatedProductInfo.from_multiple_sources(products)
|
||||
|
||||
return {
|
||||
"product": aggregated.dict(exclude={"_metadata", "_source_data"}),
|
||||
"debug": aggregated.get_debug_info()
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
"error": str(e),
|
||||
"debug": {"error": str(e)}
|
||||
}
|
||||
Reference in New Issue
Block a user