refactor: clean up imports and remove unused files
This commit is contained in:
@@ -6,9 +6,6 @@
|
||||
# Vedi https://docs.agno.com/examples/models per vedere tutti i modelli supportati
|
||||
GOOGLE_API_KEY=
|
||||
|
||||
# Inserire il percorso di installazione di ollama (es. /usr/share/ollama/.ollama)
|
||||
# attenzione che fra Linux nativo e WSL il percorso è diverso
|
||||
OLLAMA_MODELS_PATH=
|
||||
###############################################################################
|
||||
# Configurazioni per gli agenti di mercato
|
||||
###############################################################################
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
from typing import List, Optional
|
||||
from agno.tools import Toolkit
|
||||
from app.utils.wrapper_handler import WrapperHandler
|
||||
from .base import BaseWrapper, ProductInfo, Price
|
||||
from .coinbase import CoinBaseWrapper
|
||||
from .binance import BinanceWrapper
|
||||
from .cryptocompare import CryptoCompareWrapper
|
||||
from .yfinance import YFinanceWrapper
|
||||
from .binance_public import PublicBinanceAgent
|
||||
from app.utils.wrapper_handler import WrapperHandler
|
||||
from typing import List, Optional
|
||||
from agno.tools import Toolkit
|
||||
|
||||
|
||||
__all__ = [ "MarketAPIs", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper", "PublicBinanceAgent" ]
|
||||
|
||||
|
||||
@@ -8,11 +8,11 @@ def create_product_info(symbol: str, stock_data: dict) -> ProductInfo:
|
||||
Converte i dati di YFinanceTools in ProductInfo.
|
||||
"""
|
||||
product = ProductInfo()
|
||||
|
||||
|
||||
# ID univoco per yfinance
|
||||
product.id = f"yfinance_{symbol}"
|
||||
product.symbol = symbol
|
||||
|
||||
|
||||
# Estrai il prezzo corrente - gestisci diversi formati
|
||||
if 'currentPrice' in stock_data:
|
||||
product.price = float(stock_data['currentPrice'])
|
||||
@@ -27,7 +27,7 @@ def create_product_info(symbol: str, stock_data: dict) -> ProductInfo:
|
||||
product.price = 0.0
|
||||
else:
|
||||
product.price = 0.0
|
||||
|
||||
|
||||
# Volume 24h
|
||||
if 'volume' in stock_data:
|
||||
product.volume_24h = float(stock_data['volume'])
|
||||
@@ -35,13 +35,13 @@ def create_product_info(symbol: str, stock_data: dict) -> ProductInfo:
|
||||
product.volume_24h = float(stock_data['regularMarketVolume'])
|
||||
else:
|
||||
product.volume_24h = 0.0
|
||||
|
||||
|
||||
# Status basato sulla disponibilità dei dati
|
||||
product.status = "trading" if product.price > 0 else "offline"
|
||||
|
||||
|
||||
# Valuta (default USD)
|
||||
product.quote_currency = stock_data.get('currency', 'USD') or 'USD'
|
||||
|
||||
|
||||
return product
|
||||
|
||||
|
||||
@@ -50,7 +50,7 @@ def create_price_from_history(hist_data: dict, timestamp: str) -> Price:
|
||||
Converte i dati storici di YFinanceTools in Price.
|
||||
"""
|
||||
price = Price()
|
||||
|
||||
|
||||
if timestamp in hist_data:
|
||||
day_data = hist_data[timestamp]
|
||||
price.high = float(day_data.get('High', 0.0))
|
||||
@@ -59,7 +59,7 @@ def create_price_from_history(hist_data: dict, timestamp: str) -> Price:
|
||||
price.close = float(day_data.get('Close', 0.0))
|
||||
price.volume = float(day_data.get('Volume', 0.0))
|
||||
price.time = timestamp
|
||||
|
||||
|
||||
return price
|
||||
|
||||
|
||||
@@ -69,41 +69,41 @@ class YFinanceWrapper(BaseWrapper):
|
||||
Implementa l'interfaccia BaseWrapper per compatibilità con il sistema esistente.
|
||||
Usa YFinanceTools dalla libreria agno per coerenza con altri wrapper.
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self, currency: str = "USD"):
|
||||
self.currency = currency
|
||||
# Inizializza YFinanceTools - non richiede parametri specifici
|
||||
self.tool = YFinanceTools()
|
||||
|
||||
|
||||
def _format_symbol(self, asset_id: str) -> str:
|
||||
"""
|
||||
Formatta il simbolo per yfinance.
|
||||
Per crypto, aggiunge '-USD' se non presente.
|
||||
"""
|
||||
asset_id = asset_id.upper()
|
||||
|
||||
|
||||
# Se è già nel formato corretto (es: BTC-USD), usa così
|
||||
if '-' in asset_id:
|
||||
return asset_id
|
||||
|
||||
|
||||
# Per crypto singole (BTC, ETH), aggiungi -USD
|
||||
if asset_id in ['BTC', 'ETH', 'ADA', 'SOL', 'DOT', 'LINK', 'UNI', 'AAVE']:
|
||||
return f"{asset_id}-USD"
|
||||
|
||||
|
||||
# Per azioni, usa il simbolo così com'è
|
||||
return asset_id
|
||||
|
||||
|
||||
def get_product(self, asset_id: str) -> ProductInfo:
|
||||
"""
|
||||
Recupera le informazioni di un singolo prodotto.
|
||||
"""
|
||||
symbol = self._format_symbol(asset_id)
|
||||
|
||||
|
||||
# Usa YFinanceTools per ottenere i dati
|
||||
try:
|
||||
# Ottieni le informazioni base dello stock
|
||||
stock_info = self.tool.get_company_info(symbol)
|
||||
|
||||
|
||||
# Se il risultato è una stringa JSON, parsala
|
||||
if isinstance(stock_info, str):
|
||||
try:
|
||||
@@ -118,9 +118,9 @@ class YFinanceWrapper(BaseWrapper):
|
||||
raise Exception("Dati non validi")
|
||||
else:
|
||||
stock_data = stock_info
|
||||
|
||||
|
||||
return create_product_info(symbol, stock_data)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
# Fallback: prova a ottenere solo il prezzo
|
||||
try:
|
||||
@@ -140,13 +140,13 @@ class YFinanceWrapper(BaseWrapper):
|
||||
product.symbol = symbol
|
||||
product.status = "offline"
|
||||
return product
|
||||
|
||||
|
||||
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
|
||||
"""
|
||||
Recupera le informazioni di multiple assets.
|
||||
"""
|
||||
products = []
|
||||
|
||||
|
||||
for asset_id in asset_ids:
|
||||
try:
|
||||
product = self.get_product(asset_id)
|
||||
@@ -154,9 +154,9 @@ class YFinanceWrapper(BaseWrapper):
|
||||
except Exception as e:
|
||||
# Se un asset non è disponibile, continua con gli altri
|
||||
continue
|
||||
|
||||
|
||||
return products
|
||||
|
||||
|
||||
def get_all_products(self) -> list[ProductInfo]:
|
||||
"""
|
||||
Recupera tutti i prodotti disponibili.
|
||||
@@ -168,15 +168,15 @@ class YFinanceWrapper(BaseWrapper):
|
||||
'AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN',
|
||||
'SPY', 'QQQ', 'VTI', 'GLD', 'VIX'
|
||||
]
|
||||
|
||||
|
||||
return self.get_products(popular_assets)
|
||||
|
||||
|
||||
def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
|
||||
"""
|
||||
Recupera i dati storici di prezzo per un asset.
|
||||
"""
|
||||
symbol = self._format_symbol(asset_id)
|
||||
|
||||
|
||||
try:
|
||||
# Determina il periodo appropriato in base al limite
|
||||
if limit <= 7:
|
||||
@@ -191,24 +191,24 @@ class YFinanceWrapper(BaseWrapper):
|
||||
else:
|
||||
period = "3mo"
|
||||
interval = "1d"
|
||||
|
||||
|
||||
# Ottieni i dati storici
|
||||
hist_data = self.tool.get_historical_stock_prices(symbol, period=period, interval=interval)
|
||||
|
||||
|
||||
if isinstance(hist_data, str):
|
||||
hist_data = json.loads(hist_data)
|
||||
|
||||
|
||||
# Il formato dei dati è {timestamp: {Open: x, High: y, Low: z, Close: w, Volume: v}}
|
||||
prices = []
|
||||
timestamps = sorted(hist_data.keys())[-limit:] # Prendi gli ultimi 'limit' timestamp
|
||||
|
||||
|
||||
for timestamp in timestamps:
|
||||
price = create_price_from_history(hist_data, timestamp)
|
||||
if price.close > 0: # Solo se ci sono dati validi
|
||||
prices.append(price)
|
||||
|
||||
|
||||
return prices
|
||||
|
||||
|
||||
except Exception as e:
|
||||
# Se fallisce, restituisci lista vuota
|
||||
return []
|
||||
@@ -1,29 +0,0 @@
|
||||
from agno.tools import Toolkit
|
||||
from app.markets import MarketAPIsTool
|
||||
|
||||
|
||||
# TODO (?) in futuro fare in modo che la LLM faccia da sé per il mercato
|
||||
# Non so se può essere utile, per ora lo lascio qui
|
||||
# per ora mettiamo tutto statico e poi, se abbiamo API-Key senza limiti
|
||||
# possiamo fare in modo di far scegliere alla LLM quale crypto proporre
|
||||
# in base alle sue proprie chiamate API
|
||||
class MarketToolkit(Toolkit):
|
||||
def __init__(self):
|
||||
self.market_api = MarketAPIs()
|
||||
|
||||
super().__init__(
|
||||
name="Market Toolkit",
|
||||
tools=[
|
||||
self.market_api.get_historical_prices,
|
||||
self.market_api.get_product,
|
||||
],
|
||||
)
|
||||
|
||||
def instructions():
|
||||
return """
|
||||
Utilizza questo strumento per ottenere dati di mercato storici e attuali per criptovalute specifiche.
|
||||
Puoi richiedere i prezzi storici o il prezzo attuale di una criptovaluta specifica.
|
||||
Esempio di utilizzo:
|
||||
- get_historical_prices("BTC", limit=10) # ottieni gli ultimi 10 prezzi storici di Bitcoin
|
||||
- get_product("ETH")
|
||||
"""
|
||||
@@ -9,7 +9,7 @@ class AggregationMetadata(BaseModel):
|
||||
sources_ignored: Set[str] = Field(default_factory=set, description="Exchange ignorati (errori)")
|
||||
aggregation_timestamp: str = Field(default="", description="Timestamp dell'aggregazione")
|
||||
confidence_score: float = Field(default=0.0, description="Score 0-1 sulla qualità dei dati")
|
||||
|
||||
|
||||
class Config:
|
||||
# Nasconde questi campi dalla serializzazione di default
|
||||
extra = "forbid"
|
||||
@@ -19,15 +19,15 @@ class AggregatedProductInfo(ProductInfo):
|
||||
Versione aggregata di ProductInfo che mantiene la trasparenza per l'utente finale
|
||||
mentre fornisce metadati di debugging opzionali.
|
||||
"""
|
||||
|
||||
|
||||
# Override dei campi con logica di aggregazione
|
||||
id: str = Field(description="ID aggregato basato sul simbolo standardizzato")
|
||||
status: str = Field(description="Status aggregato (majority vote o conservative)")
|
||||
|
||||
|
||||
# Campi privati per debugging (non visibili di default)
|
||||
_metadata: Optional[AggregationMetadata] = PrivateAttr(default=None)
|
||||
_source_data: Optional[Dict[str, ProductInfo]] = PrivateAttr(default=None)
|
||||
|
||||
|
||||
@classmethod
|
||||
def from_multiple_sources(cls, products: List[ProductInfo]) -> 'AggregatedProductInfo':
|
||||
"""
|
||||
@@ -36,37 +36,37 @@ class AggregatedProductInfo(ProductInfo):
|
||||
"""
|
||||
if not products:
|
||||
raise ValueError("Nessun prodotto da aggregare")
|
||||
|
||||
|
||||
# Raggruppa per symbol (la chiave vera per l'aggregazione)
|
||||
symbol_groups = {}
|
||||
for product in products:
|
||||
if product.symbol not in symbol_groups:
|
||||
symbol_groups[product.symbol] = []
|
||||
symbol_groups[product.symbol].append(product)
|
||||
|
||||
|
||||
# Per ora gestiamo un symbol alla volta
|
||||
if len(symbol_groups) > 1:
|
||||
raise ValueError(f"Simboli multipli non supportati: {list(symbol_groups.keys())}")
|
||||
|
||||
|
||||
symbol_products = list(symbol_groups.values())[0]
|
||||
|
||||
|
||||
# Estrai tutte le fonti
|
||||
sources = []
|
||||
for product in symbol_products:
|
||||
# Determina la fonte dall'ID o da altri metadati se disponibili
|
||||
source = cls._detect_source(product)
|
||||
sources.append(source)
|
||||
|
||||
|
||||
# Aggrega i dati
|
||||
aggregated_data = cls._aggregate_products(symbol_products, sources)
|
||||
|
||||
|
||||
# Crea l'istanza e assegna gli attributi privati
|
||||
instance = cls(**aggregated_data)
|
||||
instance._metadata = aggregated_data.get("_metadata")
|
||||
instance._source_data = aggregated_data.get("_source_data")
|
||||
|
||||
|
||||
return instance
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _detect_source(product: ProductInfo) -> str:
|
||||
"""Rileva la fonte da un ProductInfo"""
|
||||
@@ -81,7 +81,7 @@ class AggregatedProductInfo(ProductInfo):
|
||||
return "yfinance"
|
||||
else:
|
||||
return "unknown"
|
||||
|
||||
|
||||
@classmethod
|
||||
def _aggregate_products(cls, products: List[ProductInfo], sources: List[str]) -> dict:
|
||||
"""
|
||||
@@ -90,11 +90,11 @@ class AggregatedProductInfo(ProductInfo):
|
||||
"""
|
||||
import statistics
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
# ID: usa il symbol come chiave standardizzata
|
||||
symbol = products[0].symbol
|
||||
aggregated_id = f"{symbol}_AGG"
|
||||
|
||||
|
||||
# Status: strategia "conservativa" - il più restrittivo vince
|
||||
# Ordine: trading_only < limit_only < auction < maintenance < offline
|
||||
status_priority = {
|
||||
@@ -105,41 +105,41 @@ class AggregatedProductInfo(ProductInfo):
|
||||
"offline": 5,
|
||||
"": 0 # Default se non specificato
|
||||
}
|
||||
|
||||
|
||||
statuses = [p.status for p in products if p.status]
|
||||
if statuses:
|
||||
# Prendi lo status con priorità più alta (più restrittivo)
|
||||
aggregated_status = max(statuses, key=lambda s: status_priority.get(s, 0))
|
||||
else:
|
||||
aggregated_status = "trading" # Default ottimistico
|
||||
|
||||
|
||||
# Prezzo: media semplice (uso diretto del campo price come float)
|
||||
prices = [p.price for p in products if p.price > 0]
|
||||
aggregated_price = statistics.mean(prices) if prices else 0.0
|
||||
|
||||
|
||||
# Volume: somma (assumendo che i volumi siano esclusivi per exchange)
|
||||
volumes = [p.volume_24h for p in products if p.volume_24h > 0]
|
||||
total_volume = sum(volumes)
|
||||
aggregated_volume = sum(price_i * volume_i for price_i, volume_i in zip((p.price for p in products), (volume for volume in volumes))) / total_volume
|
||||
aggregated_volume = round(aggregated_volume, 5)
|
||||
# aggregated_volume = sum(volumes) if volumes else 0.0 # NOTE old implementation
|
||||
|
||||
|
||||
# Valuta: prendi la prima (dovrebbero essere tutte uguali)
|
||||
quote_currency = next((p.quote_currency for p in products if p.quote_currency), "USD")
|
||||
|
||||
|
||||
# Calcola confidence score
|
||||
confidence = cls._calculate_confidence(products, sources)
|
||||
|
||||
|
||||
# Crea metadati per debugging
|
||||
metadata = AggregationMetadata(
|
||||
sources_used=set(sources),
|
||||
aggregation_timestamp=datetime.now().isoformat(),
|
||||
confidence_score=confidence
|
||||
)
|
||||
|
||||
|
||||
# Salva dati sorgente per debugging
|
||||
source_data = dict(zip(sources, products))
|
||||
|
||||
|
||||
return {
|
||||
"symbol": symbol,
|
||||
"price": aggregated_price,
|
||||
@@ -150,33 +150,33 @@ class AggregatedProductInfo(ProductInfo):
|
||||
"_metadata": metadata,
|
||||
"_source_data": source_data
|
||||
}
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _calculate_confidence(products: List[ProductInfo], sources: List[str]) -> float:
|
||||
"""Calcola un punteggio di confidenza 0-1"""
|
||||
if not products:
|
||||
return 0.0
|
||||
|
||||
|
||||
score = 1.0
|
||||
|
||||
|
||||
# Riduci score se pochi dati
|
||||
if len(products) < 2:
|
||||
score *= 0.7
|
||||
|
||||
|
||||
# Riduci score se prezzi troppo diversi
|
||||
prices = [p.price for p in products if p.price > 0]
|
||||
if len(prices) > 1:
|
||||
price_std = (max(prices) - min(prices)) / statistics.mean(prices)
|
||||
if price_std > 0.05: # >5% variazione
|
||||
score *= 0.8
|
||||
|
||||
|
||||
# Riduci score se fonti sconosciute
|
||||
unknown_sources = sum(1 for s in sources if s == "unknown")
|
||||
if unknown_sources > 0:
|
||||
score *= (1 - unknown_sources / len(sources))
|
||||
|
||||
|
||||
return max(0.0, min(1.0, score))
|
||||
|
||||
|
||||
def get_debug_info(self) -> dict:
|
||||
"""Metodo opzionale per ottenere informazioni di debug"""
|
||||
return {
|
||||
|
||||
@@ -5,17 +5,17 @@ from app.utils.aggregated_models import AggregatedProductInfo
|
||||
class MarketDataAggregator:
|
||||
"""
|
||||
Aggregatore di dati di mercato che mantiene la trasparenza per l'utente.
|
||||
|
||||
|
||||
Compone MarketAPIs per fornire gli stessi metodi, ma restituisce dati aggregati
|
||||
da tutte le fonti disponibili. L'utente finale non vede la complessità.
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self, currency: str = "USD"):
|
||||
# Import lazy per evitare circular import
|
||||
from app.markets import MarketAPIsTool
|
||||
self._market_apis = MarketAPIsTool(currency)
|
||||
self._aggregation_enabled = True
|
||||
|
||||
|
||||
def get_product(self, asset_id: str) -> ProductInfo:
|
||||
"""
|
||||
Override che aggrega dati da tutte le fonti disponibili.
|
||||
@@ -23,13 +23,13 @@ class MarketDataAggregator:
|
||||
"""
|
||||
if not self._aggregation_enabled:
|
||||
return self._market_apis.get_product(asset_id)
|
||||
|
||||
|
||||
# Raccogli dati da tutte le fonti
|
||||
try:
|
||||
raw_results = self.wrappers.try_call_all(
|
||||
lambda wrapper: wrapper.get_product(asset_id)
|
||||
)
|
||||
|
||||
|
||||
# Converti in ProductInfo se necessario
|
||||
products = []
|
||||
for wrapper_class, result in raw_results.items():
|
||||
@@ -38,29 +38,29 @@ class MarketDataAggregator:
|
||||
elif isinstance(result, dict):
|
||||
# Converti dizionario in ProductInfo
|
||||
products.append(ProductInfo(**result))
|
||||
|
||||
|
||||
if not products:
|
||||
raise Exception("Nessun dato disponibile")
|
||||
|
||||
|
||||
# Aggrega i risultati
|
||||
aggregated = AggregatedProductInfo.from_multiple_sources(products)
|
||||
|
||||
|
||||
# Restituisci come ProductInfo normale (nascondi la complessità)
|
||||
return ProductInfo(**aggregated.dict(exclude={"_metadata", "_source_data"}))
|
||||
|
||||
|
||||
except Exception as e:
|
||||
# Fallback: usa il comportamento normale se l'aggregazione fallisce
|
||||
return self._market_apis.get_product(asset_id)
|
||||
|
||||
|
||||
def get_products(self, asset_ids: List[str]) -> List[ProductInfo]:
|
||||
"""
|
||||
Aggrega dati per multiple asset.
|
||||
"""
|
||||
if not self._aggregation_enabled:
|
||||
return self._market_apis.get_products(asset_ids)
|
||||
|
||||
|
||||
aggregated_products = []
|
||||
|
||||
|
||||
for asset_id in asset_ids:
|
||||
try:
|
||||
product = self.get_product(asset_id)
|
||||
@@ -68,36 +68,36 @@ class MarketDataAggregator:
|
||||
except Exception as e:
|
||||
# Salta asset che non riescono ad aggregare
|
||||
continue
|
||||
|
||||
|
||||
return aggregated_products
|
||||
|
||||
|
||||
def get_all_products(self) -> List[ProductInfo]:
|
||||
"""
|
||||
Aggrega tutti i prodotti disponibili.
|
||||
"""
|
||||
if not self._aggregation_enabled:
|
||||
return self._market_apis.get_all_products()
|
||||
|
||||
|
||||
# Raccogli tutti i prodotti da tutte le fonti
|
||||
try:
|
||||
all_products_by_source = self.wrappers.try_call_all(
|
||||
lambda wrapper: wrapper.get_all_products()
|
||||
)
|
||||
|
||||
|
||||
# Raggruppa per symbol per aggregare
|
||||
symbol_groups = {}
|
||||
for wrapper_class, products in all_products_by_source.items():
|
||||
if not isinstance(products, list):
|
||||
continue
|
||||
|
||||
|
||||
for product in products:
|
||||
if isinstance(product, dict):
|
||||
product = ProductInfo(**product)
|
||||
|
||||
|
||||
if product.symbol not in symbol_groups:
|
||||
symbol_groups[product.symbol] = []
|
||||
symbol_groups[product.symbol].append(product)
|
||||
|
||||
|
||||
# Aggrega ogni gruppo
|
||||
aggregated_products = []
|
||||
for symbol, products in symbol_groups.items():
|
||||
@@ -111,13 +111,13 @@ class MarketDataAggregator:
|
||||
# Se l'aggregazione fallisce, usa il primo disponibile
|
||||
if products:
|
||||
aggregated_products.append(products[0])
|
||||
|
||||
|
||||
return aggregated_products
|
||||
|
||||
|
||||
except Exception as e:
|
||||
# Fallback: usa il comportamento normale
|
||||
return self._market_apis.get_all_products()
|
||||
|
||||
|
||||
def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> List[Price]:
|
||||
"""
|
||||
Per i dati storici, usa una strategia diversa:
|
||||
@@ -125,7 +125,7 @@ class MarketDataAggregator:
|
||||
"""
|
||||
if not self._aggregation_enabled:
|
||||
return self._market_apis.get_historical_prices(asset_id, limit)
|
||||
|
||||
|
||||
# Per dati storici, usa il primo wrapper che funziona
|
||||
# (l'aggregazione di dati storici è più complessa)
|
||||
try:
|
||||
@@ -135,21 +135,21 @@ class MarketDataAggregator:
|
||||
except Exception as e:
|
||||
# Fallback: usa il comportamento normale
|
||||
return self._market_apis.get_historical_prices(asset_id, limit)
|
||||
|
||||
|
||||
def enable_aggregation(self, enabled: bool = True):
|
||||
"""Abilita o disabilita l'aggregazione"""
|
||||
self._aggregation_enabled = enabled
|
||||
|
||||
|
||||
def is_aggregation_enabled(self) -> bool:
|
||||
"""Controlla se l'aggregazione è abilitata"""
|
||||
return self._aggregation_enabled
|
||||
|
||||
|
||||
# Metodi proxy per completare l'interfaccia BaseWrapper
|
||||
@property
|
||||
def wrappers(self):
|
||||
"""Accesso al wrapper handler per compatibilità"""
|
||||
return self._market_apis.wrappers
|
||||
|
||||
|
||||
def get_aggregated_product_with_debug(self, asset_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Metodo speciale per debugging: restituisce dati aggregati con metadati.
|
||||
@@ -159,24 +159,24 @@ class MarketDataAggregator:
|
||||
raw_results = self.wrappers.try_call_all(
|
||||
lambda wrapper: wrapper.get_product(asset_id)
|
||||
)
|
||||
|
||||
|
||||
products = []
|
||||
for wrapper_class, result in raw_results.items():
|
||||
if isinstance(result, ProductInfo):
|
||||
products.append(result)
|
||||
elif isinstance(result, dict):
|
||||
products.append(ProductInfo(**result))
|
||||
|
||||
|
||||
if not products:
|
||||
raise Exception("Nessun dato disponibile")
|
||||
|
||||
|
||||
aggregated = AggregatedProductInfo.from_multiple_sources(products)
|
||||
|
||||
|
||||
return {
|
||||
"product": aggregated.dict(exclude={"_metadata", "_source_data"}),
|
||||
"debug": aggregated.get_debug_info()
|
||||
}
|
||||
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
"error": str(e),
|
||||
|
||||
@@ -94,6 +94,7 @@ class WrapperHandler(Generic[W]):
|
||||
def __check(wrappers: list[W]) -> bool:
|
||||
return all(w.__class__ is type for w in wrappers)
|
||||
|
||||
@staticmethod
|
||||
def __concise_error(e: Exception) -> str:
|
||||
last_frame = traceback.extract_tb(e.__traceback__)[-1]
|
||||
return f"{e} [\"{last_frame.filename}\", line {last_frame.lineno}]"
|
||||
|
||||
Reference in New Issue
Block a user