1. Aggiungere un aggregator per i dati recuperati dai provider.
2. Lavorare effettivamente all'issue

Done:
1. creati test per i provider
2. creato market_providers_api_demo.py per mostrare i dati recuperati dalle api dei providers
3. aggiornato i provider
4. creato il provider binance sia pubblico che con chiave
5. creato error_handler.py per gestire decoratori e utilità: retry automatico, gestione timeout...
This commit is contained in:
Simone Garau
2025-09-29 21:28:41 +02:00
parent 4615ebe63e
commit dfca44c9d5
12 changed files with 1753 additions and 400 deletions

View File

@@ -1,8 +1,14 @@
from app.markets.base import BaseWrapper
from app.markets.coinbase import CoinBaseWrapper
from app.markets.cryptocompare import CryptoCompareWrapper
from .base import BaseWrapper
from .coinbase import CoinBaseWrapper
from .cryptocompare import CryptoCompareWrapper
from .binance import BinanceWrapper
from .binance_public import PublicBinanceAgent
from .error_handler import ProviderFallback, MarketAPIError, safe_execute
from agno.utils.log import log_warning
import logging
logger = logging.getLogger(__name__)
class MarketAPIs(BaseWrapper):
"""
@@ -43,15 +49,37 @@ class MarketAPIs(BaseWrapper):
"""
self.currency = currency
self.wrappers = MarketAPIs.get_list_available_market_apis(currency=currency)
self.fallback_manager = ProviderFallback(self.wrappers)
# Metodi che semplicemente chiamano il metodo corrispondente del primo wrapper disponibile
# TODO magari fare in modo che se il primo fallisce, prova con il secondo, ecc.
# oppure fare un round-robin tra i vari wrapper oppure usarli tutti e fare una media dei risultati
def get_product(self, asset_id):
return self.wrappers[0].get_product(asset_id)
# Metodi con fallback robusto tra provider multipli
def get_product(self, asset_id: str):
"""Ottiene informazioni su un prodotto con fallback automatico tra provider."""
try:
return self.fallback_manager.execute_with_fallback("get_product", asset_id)
except MarketAPIError as e:
logger.error(f"Failed to get product {asset_id}: {str(e)}")
raise
def get_products(self, asset_ids: list):
return self.wrappers[0].get_products(asset_ids)
"""Ottiene informazioni su più prodotti con fallback automatico tra provider."""
try:
return self.fallback_manager.execute_with_fallback("get_products", asset_ids)
except MarketAPIError as e:
logger.error(f"Failed to get products {asset_ids}: {str(e)}")
raise
def get_all_products(self):
return self.wrappers[0].get_all_products()
def get_historical_prices(self, asset_id = "BTC"):
return self.wrappers[0].get_historical_prices(asset_id)
"""Ottiene tutti i prodotti con fallback automatico tra provider."""
try:
return self.fallback_manager.execute_with_fallback("get_all_products")
except MarketAPIError as e:
logger.error(f"Failed to get all products: {str(e)}")
raise
def get_historical_prices(self, asset_id: str = "BTC"):
"""Ottiene prezzi storici con fallback automatico tra provider."""
try:
return self.fallback_manager.execute_with_fallback("get_historical_prices", asset_id)
except MarketAPIError as e:
logger.error(f"Failed to get historical prices for {asset_id}: {str(e)}")
raise

View File

@@ -1,4 +1,4 @@
from coinbase.rest.types.product_types import Candle, GetProductResponse
from coinbase.rest.types.product_types import Candle, GetProductResponse, Product
from pydantic import BaseModel
class BaseWrapper:
@@ -27,16 +27,28 @@ class ProductInfo(BaseModel):
status: str = ""
quote_currency: str = ""
@staticmethod
def from_coinbase(product_data: GetProductResponse) -> 'ProductInfo':
product = ProductInfo()
product.id = product_data.product_id
product.symbol = product_data.base_currency_id
product.price = float(product_data.price)
product.volume_24h = float(product_data.volume_24h) if product_data.volume_24h else 0
product.id = product_data.product_id or ""
product.symbol = product_data.base_currency_id or ""
product.price = float(product_data.price) if product_data.price else 0.0
product.volume_24h = float(product_data.volume_24h) if product_data.volume_24h else 0.0
# TODO Check what status means in Coinbase
product.status = product_data.status
product.status = product_data.status or ""
return product
@staticmethod
def from_coinbase_product(product_data: Product) -> 'ProductInfo':
product = ProductInfo()
product.id = product_data.product_id or ""
product.symbol = product_data.base_currency_id or ""
product.price = float(product_data.price) if product_data.price else 0.0
product.volume_24h = float(product_data.volume_24h) if product_data.volume_24h else 0.0
product.status = product_data.status or ""
return product
@staticmethod
def from_cryptocompare(asset_data: dict) -> 'ProductInfo':
product = ProductInfo()
product.id = asset_data['FROMSYMBOL'] + '-' + asset_data['TOSYMBOL']
@@ -46,6 +58,27 @@ class ProductInfo(BaseModel):
product.status = "" # Cryptocompare does not provide status
return product
@staticmethod
def from_binance(ticker_data: dict, ticker_24h_data: dict) -> 'ProductInfo':
"""
Crea un oggetto ProductInfo da dati Binance.
Args:
ticker_data: Dati del ticker di prezzo
ticker_24h_data: Dati del ticker 24h
Returns:
Oggetto ProductInfo
"""
product = ProductInfo()
product.id = ticker_data['symbol']
product.symbol = ticker_data['symbol'].replace('USDT', '').replace('BUSD', '')
product.price = float(ticker_data['price'])
product.volume_24h = float(ticker_24h_data['volume'])
product.status = "TRADING" # Binance non fornisce status esplicito
product.quote_currency = "USDT" # Assumiamo USDT come default
return product
class Price(BaseModel):
"""
Rappresenta i dati di prezzo per un asset, come ottenuti dalle API di mercato.
@@ -58,16 +91,18 @@ class Price(BaseModel):
volume: float = 0.0
time: str = ""
@staticmethod
def from_coinbase(candle_data: Candle) -> 'Price':
price = Price()
price.high = float(candle_data.high)
price.low = float(candle_data.low)
price.open = float(candle_data.open)
price.close = float(candle_data.close)
price.volume = float(candle_data.volume)
price.time = str(candle_data.start)
price.high = float(candle_data.high) if candle_data.high else 0.0
price.low = float(candle_data.low) if candle_data.low else 0.0
price.open = float(candle_data.open) if candle_data.open else 0.0
price.close = float(candle_data.close) if candle_data.close else 0.0
price.volume = float(candle_data.volume) if candle_data.volume else 0.0
price.time = str(candle_data.start) if candle_data.start else ""
return price
@staticmethod
def from_cryptocompare(price_data: dict) -> 'Price':
price = Price()
price.high = float(price_data['high'])

View File

@@ -1,30 +1,169 @@
# Versione pubblica senza autenticazione
import os
from typing import Optional
from datetime import datetime, timedelta
from binance.client import Client
from .base import ProductInfo, BaseWrapper, Price
from .error_handler import retry_on_failure, handle_api_errors, MarketAPIError
# TODO fare l'aggancio con API in modo da poterlo usare come wrapper di mercato
# TODO implementare i metodi di BaseWrapper
class PublicBinanceAgent:
def __init__(self):
# Client pubblico (senza credenziali)
self.client = Client()
class BinanceWrapper(BaseWrapper):
"""
Wrapper per le API autenticate di Binance.
Implementa l'interfaccia BaseWrapper per fornire accesso unificato
ai dati di mercato di Binance tramite le API REST con autenticazione.
La documentazione delle API è disponibile qui:
https://binance-docs.github.io/apidocs/spot/en/
"""
def __init__(self, api_key: Optional[str] = None, api_secret: Optional[str] = None, currency: str = "USDT"):
"""
Inizializza il wrapper con le credenziali API.
Args:
api_key: Chiave API di Binance (se None, usa variabile d'ambiente)
api_secret: Secret API di Binance (se None, usa variabile d'ambiente)
currency: Valuta di quotazione di default (default: USDT)
"""
if api_key is None:
api_key = os.getenv("BINANCE_API_KEY")
assert api_key is not None, "API key is required"
def get_public_prices(self):
"""Ottiene prezzi pubblici"""
if api_secret is None:
api_secret = os.getenv("BINANCE_API_SECRET")
assert api_secret is not None, "API secret is required"
self.currency = currency
self.client = Client(api_key=api_key, api_secret=api_secret)
def __format_symbol(self, asset_id: str) -> str:
"""
Formatta l'asset_id nel formato richiesto da Binance.
Args:
asset_id: ID dell'asset (es. "BTC" o "BTC-USDT")
Returns:
Simbolo formattato per Binance (es. "BTCUSDT")
"""
if '-' in asset_id:
# Se già nel formato "BTC-USDT", converte in "BTCUSDT"
return asset_id.replace('-', '')
else:
# Se solo "BTC", aggiunge la valuta di default
return f"{asset_id}{self.currency}"
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_product(self, asset_id: str) -> ProductInfo:
"""
Ottiene informazioni su un singolo prodotto.
Args:
asset_id: ID dell'asset da recuperare
Returns:
Oggetto ProductInfo con le informazioni del prodotto
"""
symbol = self.__format_symbol(asset_id)
ticker = self.client.get_symbol_ticker(symbol=symbol)
ticker_24h = self.client.get_ticker(symbol=symbol)
return ProductInfo.from_binance(ticker, ticker_24h)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Ottiene informazioni su più prodotti.
Args:
asset_ids: Lista di ID degli asset da recuperare
Returns:
Lista di oggetti ProductInfo
"""
products = []
for asset_id in asset_ids:
try:
product = self.get_product(asset_id)
products.append(product)
except Exception as e:
print(f"Errore nel recupero di {asset_id}: {e}")
continue
return products
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_all_products(self) -> list[ProductInfo]:
"""
Ottiene informazioni su tutti i prodotti disponibili.
Returns:
Lista di oggetti ProductInfo per tutti i prodotti
"""
try:
btc_price = self.client.get_symbol_ticker(symbol="BTCUSDT")
eth_price = self.client.get_symbol_ticker(symbol="ETHUSDT")
return {
'BTC_USD': float(btc_price['price']),
'ETH_USD': float(eth_price['price']),
'source': 'binance_public'
}
# Ottiene tutti i ticker 24h che contengono le informazioni necessarie
all_tickers = self.client.get_ticker()
products = []
for ticker in all_tickers:
# Filtra solo i simboli che terminano con la valuta di default
if ticker['symbol'].endswith(self.currency):
try:
# Crea ProductInfo direttamente dal ticker 24h
product = ProductInfo()
product.id = ticker['symbol']
product.symbol = ticker['symbol'].replace(self.currency, '')
product.price = float(ticker['lastPrice'])
product.volume_24h = float(ticker['volume'])
product.status = "TRADING" # Binance non fornisce status esplicito
product.quote_currency = self.currency
products.append(product)
except (ValueError, KeyError) as e:
print(f"Errore nel parsing di {ticker['symbol']}: {e}")
continue
return products
except Exception as e:
print(f"Errore: {e}")
return None
print(f"Errore nel recupero di tutti i prodotti: {e}")
return []
# Uso senza credenziali
public_agent = PublicBinanceAgent()
public_prices = public_agent.get_public_prices()
print(public_prices)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_historical_prices(self, asset_id: str = "BTC") -> list[Price]:
"""
Ottiene i prezzi storici per un asset.
Args:
asset_id: ID dell'asset (default: "BTC")
Returns:
Lista di oggetti Price con i dati storici
"""
symbol = self.__format_symbol(asset_id)
try:
# Ottiene candele orarie degli ultimi 30 giorni
klines = self.client.get_historical_klines(
symbol=symbol,
interval=Client.KLINE_INTERVAL_1HOUR,
start_str="30 days ago UTC"
)
prices = []
for kline in klines:
price = Price()
price.open = float(kline[1])
price.high = float(kline[2])
price.low = float(kline[3])
price.close = float(kline[4])
price.volume = float(kline[5])
price.time = str(datetime.fromtimestamp(kline[0] / 1000))
prices.append(price)
return prices
except Exception as e:
print(f"Errore nel recupero dei prezzi storici per {symbol}: {e}")
return []

View File

@@ -0,0 +1,227 @@
"""
Versione pubblica di Binance per accesso ai dati pubblici senza autenticazione.
Questa implementazione estende BaseWrapper per mantenere coerenza
con l'architettura del modulo markets.
"""
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from binance.client import Client
from .base import BaseWrapper, ProductInfo, Price
from .error_handler import retry_on_failure, handle_api_errors, MarketAPIError
class PublicBinanceAgent(BaseWrapper):
"""
Agent per l'accesso ai dati pubblici di Binance.
Utilizza l'API pubblica di Binance per ottenere informazioni
sui prezzi e sui mercati senza richiedere autenticazione.
"""
def __init__(self):
"""Inizializza il client pubblico senza credenziali."""
self.client = Client()
def __format_symbol(self, asset_id: str) -> str:
"""
Formatta l'asset_id per Binance (es. BTC -> BTCUSDT).
Args:
asset_id: ID dell'asset (es. "BTC", "ETH")
Returns:
Simbolo formattato per Binance
"""
if asset_id.endswith("USDT") or asset_id.endswith("BUSD"):
return asset_id
return f"{asset_id}USDT"
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_product(self, asset_id: str) -> ProductInfo:
"""
Ottiene informazioni su un singolo prodotto.
Args:
asset_id: ID dell'asset (es. "BTC")
Returns:
Oggetto ProductInfo con le informazioni del prodotto
"""
symbol = self.__format_symbol(asset_id)
try:
ticker = self.client.get_symbol_ticker(symbol=symbol)
ticker_24h = self.client.get_ticker(symbol=symbol)
return ProductInfo.from_binance(ticker, ticker_24h)
except Exception as e:
print(f"Errore nel recupero del prodotto {asset_id}: {e}")
return ProductInfo(id=asset_id, symbol=asset_id)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Ottiene informazioni su più prodotti.
Args:
asset_ids: Lista di ID degli asset
Returns:
Lista di oggetti ProductInfo
"""
products = []
for asset_id in asset_ids:
product = self.get_product(asset_id)
products.append(product)
return products
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_all_products(self) -> list[ProductInfo]:
"""
Ottiene informazioni su tutti i prodotti disponibili.
Returns:
Lista di oggetti ProductInfo per i principali asset
"""
# Per la versione pubblica, restituiamo solo i principali asset
major_assets = ["BTC", "ETH", "BNB", "ADA", "DOT", "LINK", "LTC", "XRP"]
return self.get_products(major_assets)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_historical_prices(self, asset_id: str = "BTC") -> list[Price]:
"""
Ottiene i prezzi storici per un asset.
Args:
asset_id: ID dell'asset (default: "BTC")
Returns:
Lista di oggetti Price con i dati storici
"""
symbol = self.__format_symbol(asset_id)
try:
# Ottieni candele degli ultimi 30 giorni
end_time = datetime.now()
start_time = end_time - timedelta(days=30)
klines = self.client.get_historical_klines(
symbol,
Client.KLINE_INTERVAL_1DAY,
start_time.strftime("%d %b %Y %H:%M:%S"),
end_time.strftime("%d %b %Y %H:%M:%S")
)
prices = []
for kline in klines:
price = Price(
open=float(kline[1]),
high=float(kline[2]),
low=float(kline[3]),
close=float(kline[4]),
volume=float(kline[5]),
time=str(datetime.fromtimestamp(kline[0] / 1000))
)
prices.append(price)
return prices
except Exception as e:
print(f"Errore nel recupero dei prezzi storici per {asset_id}: {e}")
return []
def get_public_prices(self, symbols: Optional[list[str]] = None) -> Optional[Dict[str, Any]]:
"""
Ottiene i prezzi pubblici per i simboli specificati.
Args:
symbols: Lista di simboli da recuperare (es. ["BTCUSDT", "ETHUSDT"]).
Se None, recupera BTC e ETH di default.
Returns:
Dizionario con i prezzi e informazioni sulla fonte, o None in caso di errore.
"""
if symbols is None:
symbols = ["BTCUSDT", "ETHUSDT"]
try:
prices = {}
for symbol in symbols:
ticker = self.client.get_symbol_ticker(symbol=symbol)
# Converte BTCUSDT -> BTC_USD per consistenza
clean_symbol = symbol.replace("USDT", "_USD").replace("BUSD", "_USD")
prices[clean_symbol] = float(ticker['price'])
return {
**prices,
'source': 'binance_public',
'timestamp': self.client.get_server_time()['serverTime']
}
except Exception as e:
print(f"Errore nel recupero dei prezzi pubblici: {e}")
return None
def get_24hr_ticker(self, symbol: str) -> Optional[Dict[str, Any]]:
"""
Ottiene le statistiche 24h per un simbolo specifico.
Args:
symbol: Simbolo del trading pair (es. "BTCUSDT")
Returns:
Dizionario con le statistiche 24h o None in caso di errore.
"""
try:
ticker = self.client.get_ticker(symbol=symbol)
return {
'symbol': ticker['symbol'],
'price': float(ticker['lastPrice']),
'price_change': float(ticker['priceChange']),
'price_change_percent': float(ticker['priceChangePercent']),
'high_24h': float(ticker['highPrice']),
'low_24h': float(ticker['lowPrice']),
'volume_24h': float(ticker['volume']),
'source': 'binance_public'
}
except Exception as e:
print(f"Errore nel recupero del ticker 24h per {symbol}: {e}")
return None
def get_exchange_info(self) -> Optional[Dict[str, Any]]:
"""
Ottiene informazioni generali sull'exchange.
Returns:
Dizionario con informazioni sull'exchange o None in caso di errore.
"""
try:
info = self.client.get_exchange_info()
return {
'timezone': info['timezone'],
'server_time': info['serverTime'],
'symbols_count': len(info['symbols']),
'source': 'binance_public'
}
except Exception as e:
print(f"Errore nel recupero delle informazioni exchange: {e}")
return None
# Esempio di utilizzo
if __name__ == "__main__":
# Uso senza credenziali
public_agent = PublicBinanceAgent()
# Ottieni prezzi di default (BTC e ETH)
public_prices = public_agent.get_public_prices()
print("Prezzi pubblici:", public_prices)
# Ottieni statistiche 24h per BTC
btc_stats = public_agent.get_24hr_ticker("BTCUSDT")
print("Statistiche BTC 24h:", btc_stats)
# Ottieni informazioni exchange
exchange_info = public_agent.get_exchange_info()
print("Info exchange:", exchange_info)

View File

@@ -1,13 +1,21 @@
import os
from typing import Optional
from datetime import datetime, timedelta
from coinbase.rest import RESTClient
from app.markets.base import ProductInfo, BaseWrapper, Price
from .base import ProductInfo, BaseWrapper, Price
from .error_handler import retry_on_failure, handle_api_errors, MarketAPIError, RateLimitError
class CoinBaseWrapper(BaseWrapper):
"""
Wrapper per le API di Coinbase.
La documentazione delle API è disponibile qui: https://docs.cdp.coinbase.com/api-reference/advanced-trade-api/rest-api/introduction
Wrapper per le API di Coinbase Advanced Trade.
Implementa l'interfaccia BaseWrapper per fornire accesso unificato
ai dati di mercato di Coinbase tramite le API REST.
La documentazione delle API è disponibile qui:
https://docs.cdp.coinbase.com/api-reference/advanced-trade-api/rest-api/introduction
"""
def __init__(self, api_key:str = None, api_private_key:str = None, currency: str = "USD"):
def __init__(self, api_key: Optional[str] = None, api_private_key: Optional[str] = None, currency: str = "USD"):
if api_key is None:
api_key = os.getenv("COINBASE_API_KEY")
assert api_key is not None, "API key is required"
@@ -25,21 +33,49 @@ class CoinBaseWrapper(BaseWrapper):
def __format(self, asset_id: str) -> str:
return asset_id if '-' in asset_id else f"{asset_id}-{self.currency}"
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_product(self, asset_id: str) -> ProductInfo:
asset_id = self.__format(asset_id)
asset = self.client.get_product(asset_id)
return ProductInfo.from_coinbase(asset)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
all_asset_ids = [self.__format(asset_id) for asset_id in asset_ids]
assets = self.client.get_products(all_asset_ids)
return [ProductInfo.from_coinbase(asset) for asset in assets.products]
assets = self.client.get_products(product_ids=all_asset_ids)
if assets.products:
return [ProductInfo.from_coinbase_product(asset) for asset in assets.products]
return []
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_all_products(self) -> list[ProductInfo]:
assets = self.client.get_products()
return [ProductInfo.from_coinbase(asset) for asset in assets.products]
if assets.products:
return [ProductInfo.from_coinbase_product(asset) for asset in assets.products]
return []
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_historical_prices(self, asset_id: str = "BTC") -> list[Price]:
asset_id = self.__format(asset_id)
data = self.client.get_candles(product_id=asset_id)
return [Price.from_coinbase(candle) for candle in data.candles]
# Get last 14 days of hourly data (14*24 = 336 candles, within 350 limit)
end_time = datetime.now()
start_time = end_time - timedelta(days=14)
# Convert to UNIX timestamps as strings (required by Coinbase API)
start_timestamp = str(int(start_time.timestamp()))
end_timestamp = str(int(end_time.timestamp()))
data = self.client.get_candles(
product_id=asset_id,
start=start_timestamp,
end=end_timestamp,
granularity="ONE_HOUR",
limit=350 # Explicitly set the limit
)
if data.candles:
return [Price.from_coinbase(candle) for candle in data.candles]
return []

View File

@@ -1,6 +1,8 @@
import os
import requests
from app.markets.base import ProductInfo, BaseWrapper, Price
from typing import Optional, Dict, Any
from .base import ProductInfo, BaseWrapper, Price
from .error_handler import retry_on_failure, handle_api_errors, MarketAPIError
BASE_URL = "https://min-api.cryptocompare.com"
@@ -10,7 +12,7 @@ class CryptoCompareWrapper(BaseWrapper):
La documentazione delle API è disponibile qui: https://developers.coindesk.com/documentation/legacy/Price/SingleSymbolPriceEndpoint
!!ATTENZIONE!! sembra essere una API legacy e potrebbe essere deprecata in futuro.
"""
def __init__(self, api_key:str = None, currency:str='USD'):
def __init__(self, api_key: Optional[str] = None, currency: str = 'USD'):
if api_key is None:
api_key = os.getenv("CRYPTOCOMPARE_API_KEY")
assert api_key is not None, "API key is required"
@@ -18,7 +20,7 @@ class CryptoCompareWrapper(BaseWrapper):
self.api_key = api_key
self.currency = currency
def __request(self, endpoint: str, params: dict = None) -> dict:
def __request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
if params is None:
params = {}
params['api_key'] = self.api_key
@@ -26,6 +28,8 @@ class CryptoCompareWrapper(BaseWrapper):
response = requests.get(f"{BASE_URL}{endpoint}", params=params)
return response.json()
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_product(self, asset_id: str) -> ProductInfo:
response = self.__request("/data/pricemultifull", params = {
"fsyms": asset_id,
@@ -34,6 +38,8 @@ class CryptoCompareWrapper(BaseWrapper):
data = response.get('RAW', {}).get(asset_id, {}).get(self.currency, {})
return ProductInfo.from_cryptocompare(data)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
response = self.__request("/data/pricemultifull", params = {
"fsyms": ",".join(asset_ids),
@@ -46,10 +52,37 @@ class CryptoCompareWrapper(BaseWrapper):
assets.append(ProductInfo.from_cryptocompare(asset_data))
return assets
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_all_products(self) -> list[ProductInfo]:
raise NotImplementedError("CryptoCompare does not support fetching all assets")
"""
Workaround per CryptoCompare: utilizza una lista predefinita di asset popolari
poiché l'API non fornisce un endpoint per recuperare tutti i prodotti.
"""
# Lista di asset popolari supportati da CryptoCompare
popular_assets = [
"BTC", "ETH", "ADA", "DOT", "LINK", "LTC", "XRP", "BCH", "BNB", "SOL",
"MATIC", "AVAX", "ATOM", "UNI", "DOGE", "SHIB", "TRX", "ETC", "FIL", "XLM"
]
try:
# Utilizza get_products per recuperare i dati di tutti gli asset popolari
return self.get_products(popular_assets)
except Exception as e:
# Fallback: prova con un set ridotto di asset principali
main_assets = ["BTC", "ETH", "ADA", "DOT", "LINK"]
try:
return self.get_products(main_assets)
except Exception as fallback_error:
# Se anche il fallback fallisce, solleva l'errore originale con informazioni aggiuntive
raise NotImplementedError(
f"CryptoCompare get_all_products() workaround failed. "
f"Original error: {str(e)}, Fallback error: {str(fallback_error)}"
)
def get_historical_prices(self, asset_id: str, day_back: int = 10) -> list[dict]:
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_historical_prices(self, asset_id: str = "BTC", day_back: int = 10) -> list[Price]:
assert day_back <= 30, "day_back should be less than or equal to 30"
response = self.__request("/data/v2/histohour", params = {
"fsym": asset_id,

View File

@@ -0,0 +1,236 @@
"""
Modulo per la gestione robusta degli errori nei market providers.
Fornisce decoratori e utilità per:
- Retry automatico con backoff esponenziale
- Logging standardizzato degli errori
- Gestione di timeout e rate limiting
- Fallback tra provider multipli
"""
import time
import logging
from functools import wraps
from typing import Any, Callable, Optional, Type, Union, List
from requests.exceptions import RequestException, Timeout, ConnectionError
from binance.exceptions import BinanceAPIException, BinanceRequestException
# Configurazione logging
logger = logging.getLogger(__name__)
class MarketAPIError(Exception):
"""Eccezione base per errori delle API di mercato."""
pass
class RateLimitError(MarketAPIError):
"""Eccezione per errori di rate limiting."""
pass
class AuthenticationError(MarketAPIError):
"""Eccezione per errori di autenticazione."""
pass
class DataNotFoundError(MarketAPIError):
"""Eccezione quando i dati richiesti non sono disponibili."""
pass
def retry_on_failure(
max_retries: int = 3,
delay: float = 1.0,
backoff_factor: float = 2.0,
exceptions: tuple = (RequestException, BinanceAPIException, BinanceRequestException)
) -> Callable:
"""
Decoratore per retry automatico con backoff esponenziale.
Args:
max_retries: Numero massimo di tentativi
delay: Delay iniziale in secondi
backoff_factor: Fattore di moltiplicazione per il delay
exceptions: Tuple di eccezioni da catturare per il retry
Returns:
Decoratore per la funzione
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
last_exception = None
current_delay = delay
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt == max_retries:
logger.error(
f"Function {func.__name__} failed after {max_retries + 1} attempts. "
f"Last error: {str(e)}"
)
raise MarketAPIError(f"Max retries exceeded: {str(e)}") from e
logger.warning(
f"Attempt {attempt + 1}/{max_retries + 1} failed for {func.__name__}: {str(e)}. "
f"Retrying in {current_delay:.1f}s..."
)
time.sleep(current_delay)
current_delay *= backoff_factor
except Exception as e:
# Per eccezioni non previste, non fare retry
logger.error(f"Unexpected error in {func.__name__}: {str(e)}")
raise
# Questo non dovrebbe mai essere raggiunto
if last_exception:
raise last_exception
else:
raise MarketAPIError("Unknown error occurred")
return wrapper
return decorator
def handle_api_errors(func: Callable) -> Callable:
"""
Decoratore per gestione standardizzata degli errori API.
Converte errori specifici dei provider in eccezioni standardizzate.
"""
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
try:
return func(*args, **kwargs)
except BinanceAPIException as e:
if e.code == -1021: # Timestamp error
raise MarketAPIError(f"Binance timestamp error: {e.message}")
elif e.code == -1003: # Rate limit
raise RateLimitError(f"Binance rate limit exceeded: {e.message}")
elif e.code in [-2014, -2015]: # API key errors
raise AuthenticationError(f"Binance authentication error: {e.message}")
else:
raise MarketAPIError(f"Binance API error [{e.code}]: {e.message}")
except ConnectionError as e:
raise MarketAPIError(f"Connection error: {str(e)}")
except Timeout as e:
raise MarketAPIError(f"Request timeout: {str(e)}")
except RequestException as e:
raise MarketAPIError(f"Request error: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error in {func.__name__}: {str(e)}")
raise MarketAPIError(f"Unexpected error: {str(e)}") from e
return wrapper
def safe_execute(
func: Callable,
default_value: Any = None,
log_errors: bool = True,
error_message: Optional[str] = None
) -> Any:
"""
Esegue una funzione in modo sicuro, restituendo un valore di default in caso di errore.
Args:
func: Funzione da eseguire
default_value: Valore da restituire in caso di errore
log_errors: Se loggare gli errori
error_message: Messaggio di errore personalizzato
Returns:
Risultato della funzione o valore di default
"""
try:
return func()
except Exception as e:
if log_errors:
message = error_message or f"Error executing {func.__name__}"
logger.warning(f"{message}: {str(e)}")
return default_value
class ProviderFallback:
"""
Classe per gestire il fallback tra provider multipli.
"""
def __init__(self, providers: List[Any]):
"""
Inizializza con una lista di provider ordinati per priorità.
Args:
providers: Lista di provider ordinati per priorità
"""
self.providers = providers
def execute_with_fallback(
self,
method_name: str,
*args,
**kwargs
) -> Any:
"""
Esegue un metodo su tutti i provider fino a trovarne uno che funziona.
Args:
method_name: Nome del metodo da chiamare
*args: Argomenti posizionali
**kwargs: Argomenti nominali
Returns:
Risultato del primo provider che funziona
Raises:
MarketAPIError: Se tutti i provider falliscono
"""
last_error = None
for i, provider in enumerate(self.providers):
try:
if hasattr(provider, method_name):
method = getattr(provider, method_name)
result = method(*args, **kwargs)
if i > 0: # Se non è il primo provider
logger.info(f"Fallback successful: used provider {type(provider).__name__}")
return result
else:
logger.warning(f"Provider {type(provider).__name__} doesn't have method {method_name}")
continue
except Exception as e:
last_error = e
logger.warning(
f"Provider {type(provider).__name__} failed for {method_name}: {str(e)}"
)
continue
# Se arriviamo qui, tutti i provider hanno fallito
raise MarketAPIError(
f"All providers failed for method {method_name}. Last error: {str(last_error)}"
)
def validate_response_data(data: Any, required_fields: Optional[List[str]] = None) -> bool:
"""
Valida che i dati di risposta contengano i campi richiesti.
Args:
data: Dati da validare
required_fields: Lista di campi richiesti
Returns:
True se i dati sono validi, False altrimenti
"""
if data is None:
return False
if required_fields is None:
return True
if isinstance(data, dict):
return all(field in data for field in required_fields)
elif hasattr(data, '__dict__'):
return all(hasattr(data, field) for field in required_fields)
return False