Merge remote-tracking branch 'origin/3-market-api' into tool

# Conflicts:
#	src/app.py
#	src/app/agents/market_agent.py
#	src/app/markets/__init__.py
#	src/app/markets/coinbase.py
#	src/app/markets/cryptocompare.py
#	src/app/pipeline.py
#	tests/agents/test_market.py
#	tests/agents/test_predictor.py
This commit is contained in:
trojanhorse47
2025-09-30 12:37:46 +02:00
14 changed files with 1764 additions and 417 deletions

View File

@@ -6,19 +6,30 @@
# Vedi https://docs.agno.com/examples/models per vedere tutti i modelli supportati
GOOGLE_API_KEY=
# Inserire il percorso di installazione di ollama (es. /usr/share/ollama/.ollama)
# attenzione che fra Linux nativo e WSL il percorso è diverso
OLLAMA_MODELS_PATH=
###############################################################################
# Configurazioni per gli agenti di mercato
###############################################################################
# Coinbase CDP API per Market Agent
# Ottenibili da: https://portal.cdp.coinbase.com/access/api
CDP_API_KEY_NAME=
CDP_API_PRIVATE_KEY=
# IMPORTANTE: Usare le credenziali CDP (NON Exchange legacy)
# - COINBASE_API_KEY: organizations/{org_id}/apiKeys/{key_id}
# - COINBASE_API_SECRET: La private key completa (inclusi BEGIN/END)
# - NON serve COINBASE_PASSPHRASE (solo per Exchange legacy)
COINBASE_API_KEY=
COINBASE_API_SECRET=
# CryptoCompare API per Market Agent (alternativa)
# CryptoCompare API per Market Agent
# Ottenibile da: https://www.cryptocompare.com/cryptopian/api-keys
# NOTA: API legacy, potrebbe essere deprecata in futuro
# Funzionalità limitata: get_all_products() non supportato
CRYPTOCOMPARE_API_KEY=
# Binance API per Market Agent (alternativa)
# Binance API per Market Agent
# Ottenibili da: https://www.binance.com/en/my/settings/api-management
# Supporta sia API autenticate che pubbliche (PublicBinance)
BINANCE_API_KEY=
BINANCE_API_SECRET=

View File

@@ -1,116 +0,0 @@
#!/usr/bin/env python3
"""
Demo script per testare il MarketAgent aggiornato con Coinbase CDP
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from src.app.agents.market_agent import MarketAgent
import logging
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
def main():
print("🚀 Test MarketAgent con Coinbase CDP")
print("=" * 50)
# Inizializza l'agent
agent = MarketAgent()
# Verifica provider disponibili
providers = agent.get_available_providers()
print(f"📡 Provider disponibili: {providers}")
if not providers:
print("⚠️ Nessun provider configurato. Verifica il file .env")
print("\nPer Coinbase CDP, serve:")
print("CDP_API_KEY_NAME=your_key_name")
print("CDP_API_PRIVATE_KEY=your_private_key")
print("\nPer CryptoCompare, serve:")
print("CRYPTOCOMPARE_API_KEY=your_api_key")
return
# Mostra capabilities di ogni provider
for provider in providers:
capabilities = agent.get_provider_capabilities(provider)
print(f"🔧 {provider.upper()}: {capabilities}")
print("\n" + "=" * 50)
# Test ottenimento prezzo singolo
test_symbols = ["BTC", "ETH", "ADA"]
for symbol in test_symbols:
print(f"\n💰 Prezzo {symbol}:")
# Prova ogni provider
for provider in providers:
try:
price = agent.get_asset_price(symbol, provider)
if price:
print(f" {provider}: ${price:,.2f}")
else:
print(f" {provider}: N/A")
except Exception as e:
print(f" {provider}: Errore - {e}")
print("\n" + "=" * 50)
# Test market overview
print("\n📊 Market Overview:")
try:
overview = agent.get_market_overview(["BTC", "ETH", "ADA", "DOT"])
if overview["data"]:
print(f"📡 Fonte: {overview['source']}")
for crypto, prices in overview["data"].items():
if isinstance(prices, dict):
usd_price = prices.get("USD", "N/A")
eur_price = prices.get("EUR", "N/A")
if eur_price != "N/A":
print(f" {crypto}: ${usd_price} (€{eur_price})")
else:
print(f" {crypto}: ${usd_price}")
else:
print("⚠️ Nessun dato disponibile")
except Exception as e:
print(f"❌ Errore nel market overview: {e}")
print("\n" + "=" * 50)
# Test funzione analyze
print("\n🔍 Analisi mercato:")
try:
analysis = agent.analyze("Market overview")
print(analysis)
except Exception as e:
print(f"❌ Errore nell'analisi: {e}")
# Test specifico Coinbase CDP se disponibile
if 'coinbase' in providers:
print("\n" + "=" * 50)
print("\n🏦 Test specifico Coinbase CDP:")
try:
# Test asset singolo
btc_info = agent.get_coinbase_asset_info("BTC")
print(f"📈 BTC Info: {btc_info}")
# Test asset multipli
multi_assets = agent.get_coinbase_multiple_assets(["BTC", "ETH"])
print(f"📊 Multi Assets: {multi_assets}")
except Exception as e:
print(f"❌ Errore nel test Coinbase CDP: {e}")
if __name__ == "__main__":
main()

View File

@@ -1,100 +0,0 @@
#!/usr/bin/env python3
"""
Esempio di utilizzo del MarketAgent unificato.
Questo script mostra come utilizzare il nuovo MarketAgent che supporta
multiple fonti di dati (Coinbase e CryptoCompare).
"""
import sys
from pathlib import Path
# Aggiungi il path src al PYTHONPATH
src_path = Path(__file__).parent / "src"
sys.path.insert(0, str(src_path))
from dotenv import load_dotenv
from app.agents.market_agent import MarketAgent
# Carica variabili d'ambiente
load_dotenv()
def main():
print("🚀 Market Agent Demo\n")
try:
# Inizializza il market agent (auto-configura i provider disponibili)
agent = MarketAgent()
# Mostra provider disponibili
providers = agent.get_available_providers()
print(f"📡 Available providers: {providers}")
if not providers:
print("❌ No providers configured. Please check your .env file.")
print("Required variables:")
print(" For Coinbase: COINBASE_API_KEY, COINBASE_SECRET, COINBASE_PASSPHRASE")
print(" For CryptoCompare: CRYPTOCOMPARE_API_KEY")
return
# Mostra le capacità di ogni provider
print("\n🔧 Provider capabilities:")
for provider in providers:
capabilities = agent.get_provider_capabilities(provider)
print(f" {provider}: {capabilities}")
# Ottieni panoramica del mercato
print("\n📊 Market Overview:")
overview = agent.get_market_overview(["BTC", "ETH", "ADA"])
print(f"Data source: {overview.get('source', 'Unknown')}")
for crypto, prices in overview.get('data', {}).items():
if isinstance(prices, dict):
usd = prices.get('USD', 'N/A')
eur = prices.get('EUR', 'N/A')
if eur != 'N/A':
print(f" {crypto}: ${usd} (€{eur})")
else:
print(f" {crypto}: ${usd}")
# Analisi completa del mercato
print("\n📈 Market Analysis:")
analysis = agent.analyze("comprehensive market analysis")
print(analysis)
# Test specifici per provider (se disponibili)
if 'cryptocompare' in providers:
print("\n🔸 CryptoCompare specific test:")
try:
btc_price = agent.get_single_crypto_price("BTC", "USD")
print(f" BTC price: ${btc_price}")
top_coins = agent.get_top_cryptocurrencies(5)
if top_coins.get('Data'):
print(" Top 5 cryptocurrencies by market cap:")
for coin in top_coins['Data'][:3]: # Show top 3
coin_info = coin.get('CoinInfo', {})
display = coin.get('DISPLAY', {}).get('USD', {})
name = coin_info.get('FullName', 'Unknown')
price = display.get('PRICE', 'N/A')
print(f" {name}: {price}")
except Exception as e:
print(f" CryptoCompare test failed: {e}")
if 'coinbase' in providers:
print("\n🔸 Coinbase specific test:")
try:
ticker = agent.get_coinbase_ticker("BTC-USD")
price = ticker.get('price', 'N/A')
volume = ticker.get('volume_24h', 'N/A')
print(f" BTC-USD: ${price} (24h volume: {volume})")
except Exception as e:
print(f" Coinbase test failed: {e}")
print("\n✅ Demo completed successfully!")
except Exception as e:
print(f"❌ Demo failed: {e}")
print("Make sure you have configured at least one provider in your .env file.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,374 @@
#!/usr/bin/env python3
"""
Demo Completo per Market Data Providers
========================================
Questo script dimostra l'utilizzo di tutti i wrapper che implementano BaseWrapper:
- CoinBaseWrapper (richiede credenziali)
- CryptoCompareWrapper (richiede API key)
- BinanceWrapper (richiede credenziali)
- PublicBinanceAgent (accesso pubblico)
Lo script effettua chiamate GET a diversi provider e visualizza i dati
in modo strutturato con informazioni dettagliate su timestamp, stato
delle richieste e formattazione tabellare.
"""
import sys
import os
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Any
import traceback
# Aggiungi il path src al PYTHONPATH
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))
from dotenv import load_dotenv
from app.markets import (
CoinBaseWrapper,
CryptoCompareWrapper,
BinanceWrapper,
PublicBinanceAgent,
BaseWrapper
)
# Carica variabili d'ambiente
load_dotenv()
class DemoFormatter:
"""Classe per formattare l'output del demo in modo strutturato."""
@staticmethod
def print_header(title: str, char: str = "=", width: int = 80):
"""Stampa un'intestazione formattata."""
print(f"\n{char * width}")
print(f"{title:^{width}}")
print(f"{char * width}")
@staticmethod
def print_subheader(title: str, char: str = "-", width: int = 60):
"""Stampa una sotto-intestazione formattata."""
print(f"\n{char * width}")
print(f" {title}")
print(f"{char * width}")
@staticmethod
def print_request_info(provider_name: str, method: str, timestamp: datetime,
status: str, error: Optional[str] = None):
"""Stampa informazioni sulla richiesta."""
print(f"🕒 Timestamp: {timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"🏷️ Provider: {provider_name}")
print(f"🔧 Method: {method}")
print(f"📊 Status: {status}")
if error:
print(f"❌ Error: {error}")
print()
@staticmethod
def print_product_table(products: List[Any], title: str = "Products"):
"""Stampa una tabella di prodotti."""
if not products:
print(f"📋 {title}: Nessun prodotto trovato")
return
print(f"📋 {title} ({len(products)} items):")
print(f"{'Symbol':<15} {'ID':<20} {'Price':<12} {'Quote':<10} {'Status':<10}")
print("-" * 67)
for product in products[:10]: # Mostra solo i primi 10
symbol = getattr(product, 'symbol', 'N/A')
product_id = getattr(product, 'id', 'N/A')
price = getattr(product, 'price', 0.0)
quote = getattr(product, 'quote_currency', 'N/A')
status = getattr(product, 'status', 'N/A')
# Tronca l'ID se troppo lungo
if len(product_id) > 18:
product_id = product_id[:15] + "..."
price_str = f"${price:.2f}" if price > 0 else "N/A"
print(f"{symbol:<15} {product_id:<20} {price_str:<12} {quote:<10} {status:<10}")
if len(products) > 10:
print(f"... e altri {len(products) - 10} prodotti")
print()
@staticmethod
def print_prices_table(prices: List[Any], title: str = "Historical Prices"):
"""Stampa una tabella di prezzi storici."""
if not prices:
print(f"💰 {title}: Nessun prezzo trovato")
return
print(f"💰 {title} ({len(prices)} entries):")
print(f"{'Time':<12} {'Open':<12} {'High':<12} {'Low':<12} {'Close':<12} {'Volume':<15}")
print("-" * 75)
for price in prices[:5]: # Mostra solo i primi 5
time_str = getattr(price, 'time', 'N/A')
# Il time è già una stringa, non serve strftime
if len(time_str) > 10:
time_str = time_str[:10] # Tronca se troppo lungo
open_price = f"${getattr(price, 'open', 0):.2f}"
high_price = f"${getattr(price, 'high', 0):.2f}"
low_price = f"${getattr(price, 'low', 0):.2f}"
close_price = f"${getattr(price, 'close', 0):.2f}"
volume = f"{getattr(price, 'volume', 0):,.0f}"
print(f"{time_str:<12} {open_price:<12} {high_price:<12} {low_price:<12} {close_price:<12} {volume:<15}")
if len(prices) > 5:
print(f"... e altri {len(prices) - 5} prezzi")
print()
class ProviderTester:
"""Classe per testare i provider di market data."""
def __init__(self):
self.formatter = DemoFormatter()
self.test_symbols = ["BTC", "ETH", "ADA"]
def test_provider(self, wrapper: BaseWrapper, provider_name: str) -> Dict[str, Any]:
"""Testa un provider specifico con tutti i metodi disponibili."""
results = {
"provider_name": provider_name,
"tests": {},
"overall_status": "SUCCESS"
}
self.formatter.print_subheader(f"🔍 Testing {provider_name}")
# Test get_product
for symbol in self.test_symbols:
timestamp = datetime.now()
try:
product = wrapper.get_product(symbol)
self.formatter.print_request_info(
provider_name, f"get_product({symbol})", timestamp, "✅ SUCCESS"
)
if product:
print(f"📦 Product: {product.symbol} (ID: {product.id})")
print(f" Price: ${product.price:.2f}, Quote: {product.quote_currency}")
print(f" Status: {product.status}, Volume 24h: {product.volume_24h:,.2f}")
else:
print(f"📦 Product: Nessun prodotto trovato per {symbol}")
results["tests"][f"get_product_{symbol}"] = "SUCCESS"
except Exception as e:
error_msg = str(e)
self.formatter.print_request_info(
provider_name, f"get_product({symbol})", timestamp, "❌ ERROR", error_msg
)
results["tests"][f"get_product_{symbol}"] = f"ERROR: {error_msg}"
results["overall_status"] = "PARTIAL"
# Test get_products
timestamp = datetime.now()
try:
products = wrapper.get_products(self.test_symbols)
self.formatter.print_request_info(
provider_name, f"get_products({self.test_symbols})", timestamp, "✅ SUCCESS"
)
self.formatter.print_product_table(products, f"{provider_name} Products")
results["tests"]["get_products"] = "SUCCESS"
except Exception as e:
error_msg = str(e)
self.formatter.print_request_info(
provider_name, f"get_products({self.test_symbols})", timestamp, "❌ ERROR", error_msg
)
results["tests"]["get_products"] = f"ERROR: {error_msg}"
results["overall_status"] = "PARTIAL"
# Test get_all_products
timestamp = datetime.now()
try:
all_products = wrapper.get_all_products()
self.formatter.print_request_info(
provider_name, "get_all_products()", timestamp, "✅ SUCCESS"
)
self.formatter.print_product_table(all_products, f"{provider_name} All Products")
results["tests"]["get_all_products"] = "SUCCESS"
except Exception as e:
error_msg = str(e)
self.formatter.print_request_info(
provider_name, "get_all_products()", timestamp, "❌ ERROR", error_msg
)
results["tests"]["get_all_products"] = f"ERROR: {error_msg}"
results["overall_status"] = "PARTIAL"
# Test get_historical_prices
timestamp = datetime.now()
try:
prices = wrapper.get_historical_prices("BTC")
self.formatter.print_request_info(
provider_name, "get_historical_prices(BTC)", timestamp, "✅ SUCCESS"
)
self.formatter.print_prices_table(prices, f"{provider_name} BTC Historical Prices")
results["tests"]["get_historical_prices"] = "SUCCESS"
except Exception as e:
error_msg = str(e)
self.formatter.print_request_info(
provider_name, "get_historical_prices(BTC)", timestamp, "❌ ERROR", error_msg
)
results["tests"]["get_historical_prices"] = f"ERROR: {error_msg}"
results["overall_status"] = "PARTIAL"
return results
def check_environment_variables() -> Dict[str, bool]:
"""Verifica la presenza delle variabili d'ambiente necessarie."""
env_vars = {
"COINBASE_API_KEY": bool(os.getenv("COINBASE_API_KEY")),
"COINBASE_API_SECRET": bool(os.getenv("COINBASE_API_SECRET")),
"CRYPTOCOMPARE_API_KEY": bool(os.getenv("CRYPTOCOMPARE_API_KEY")),
"BINANCE_API_KEY": bool(os.getenv("BINANCE_API_KEY")),
"BINANCE_API_SECRET": bool(os.getenv("BINANCE_API_SECRET")),
}
return env_vars
def initialize_providers() -> Dict[str, BaseWrapper]:
"""Inizializza tutti i provider disponibili."""
providers = {}
env_vars = check_environment_variables()
# PublicBinanceAgent (sempre disponibile)
try:
providers["PublicBinance"] = PublicBinanceAgent()
print("✅ PublicBinanceAgent inizializzato con successo")
except Exception as e:
print(f"❌ Errore nell'inizializzazione di PublicBinanceAgent: {e}")
# CryptoCompareWrapper
if env_vars["CRYPTOCOMPARE_API_KEY"]:
try:
providers["CryptoCompare"] = CryptoCompareWrapper()
print("✅ CryptoCompareWrapper inizializzato con successo")
except Exception as e:
print(f"❌ Errore nell'inizializzazione di CryptoCompareWrapper: {e}")
else:
print("⚠️ CryptoCompareWrapper saltato: CRYPTOCOMPARE_API_KEY non trovata")
# CoinBaseWrapper
if env_vars["COINBASE_API_KEY"] and env_vars["COINBASE_API_SECRET"]:
try:
providers["CoinBase"] = CoinBaseWrapper()
print("✅ CoinBaseWrapper inizializzato con successo")
except Exception as e:
print(f"❌ Errore nell'inizializzazione di CoinBaseWrapper: {e}")
else:
print("⚠️ CoinBaseWrapper saltato: credenziali Coinbase non complete")
# BinanceWrapper
if env_vars["BINANCE_API_KEY"] and env_vars["BINANCE_API_SECRET"]:
try:
providers["Binance"] = BinanceWrapper()
print("✅ BinanceWrapper inizializzato con successo")
except Exception as e:
print(f"❌ Errore nell'inizializzazione di BinanceWrapper: {e}")
else:
print("⚠️ BinanceWrapper saltato: credenziali Binance non complete")
return providers
def print_summary(results: List[Dict[str, Any]]):
"""Stampa un riassunto finale dei risultati."""
formatter = DemoFormatter()
formatter.print_header("📊 RIASSUNTO FINALE", "=", 80)
total_providers = len(results)
successful_providers = sum(1 for r in results if r["overall_status"] == "SUCCESS")
partial_providers = sum(1 for r in results if r["overall_status"] == "PARTIAL")
print(f"🔢 Provider testati: {total_providers}")
print(f"✅ Provider completamente funzionanti: {successful_providers}")
print(f"⚠️ Provider parzialmente funzionanti: {partial_providers}")
print(f"❌ Provider non funzionanti: {total_providers - successful_providers - partial_providers}")
print("\n📋 Dettaglio per provider:")
for result in results:
provider_name = result["provider_name"]
status = result["overall_status"]
status_icon = "" if status == "SUCCESS" else "⚠️" if status == "PARTIAL" else ""
print(f"\n{status_icon} {provider_name}:")
for test_name, test_result in result["tests"].items():
test_icon = "" if test_result == "SUCCESS" else ""
print(f" {test_icon} {test_name}: {test_result}")
def main():
"""Funzione principale del demo."""
formatter = DemoFormatter()
# Intestazione principale
formatter.print_header("🚀 DEMO COMPLETO MARKET DATA PROVIDERS", "=", 80)
print(f"🕒 Avvio demo: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("📝 Questo demo testa tutti i wrapper BaseWrapper disponibili")
print("🔍 Ogni test include timestamp, stato della richiesta e dati formattati")
# Verifica variabili d'ambiente
formatter.print_subheader("🔐 Verifica Configurazione")
env_vars = check_environment_variables()
print("Variabili d'ambiente:")
for var_name, is_present in env_vars.items():
status = "✅ Presente" if is_present else "❌ Mancante"
print(f" {var_name}: {status}")
# Inizializza provider
formatter.print_subheader("🏗️ Inizializzazione Provider")
providers = initialize_providers()
if not providers:
print("❌ Nessun provider disponibile. Verifica la configurazione.")
return
print(f"\n🎯 Provider disponibili per il test: {list(providers.keys())}")
# Testa ogni provider
formatter.print_header("🧪 ESECUZIONE TEST PROVIDER", "=", 80)
tester = ProviderTester()
all_results = []
for provider_name, wrapper in providers.items():
try:
result = tester.test_provider(wrapper, provider_name)
all_results.append(result)
except Exception as e:
print(f"❌ Errore critico nel test di {provider_name}: {e}")
traceback.print_exc()
all_results.append({
"provider_name": provider_name,
"tests": {},
"overall_status": "CRITICAL_ERROR",
"error": str(e)
})
# Stampa riassunto finale
print_summary(all_results)
# Informazioni aggiuntive
formatter.print_header(" INFORMAZIONI AGGIUNTIVE", "=", 80)
print("📚 Documentazione:")
print(" - BaseWrapper: src/app/markets/base.py")
print(" - Test completi: tests/agents/test_market.py")
print(" - Configurazione: .env")
print("\n🔧 Per abilitare tutti i provider:")
print(" 1. Configura le credenziali nel file .env")
print(" 2. Segui la documentazione di ogni provider")
print(" 3. Riavvia il demo")
print(f"\n🏁 Demo completato: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if __name__ == "__main__":
main()

View File

@@ -1,8 +1,8 @@
import gradio as gr
from agno.utils.log import log_info
from dotenv import load_dotenv
from dotenv import load_dotenv
from app.pipeline import Pipeline
from agno.utils.log import log_info
########################################
# MAIN APP & GRADIO INTERFACE

View File

@@ -1,9 +1,14 @@
from base import BaseWrapper
from app.markets.coinbase import CoinBaseWrapper
from app.markets.cryptocompare import CryptoCompareWrapper
from app.markets.binance import BinanceWrapper
from app.markets.binance_public import PublicBinanceAgent
from app.markets.error_handler import ProviderFallback, MarketAPIError, safe_execute
from agno.utils.log import log_warning
import logging
from src.app.markets.base import BaseWrapper
from src.app.markets.coinbase import CoinBaseWrapper
from src.app.markets.cryptocompare import CryptoCompareWrapper
logger = logging.getLogger(__name__)
class MarketAPIs(BaseWrapper):
"""
@@ -46,15 +51,37 @@ class MarketAPIs(BaseWrapper):
"""
self.currency = currency
self.wrappers = MarketAPIs.get_list_available_market_apis(currency=currency)
self.fallback_manager = ProviderFallback(self.wrappers)
# Metodi con fallback robusto tra provider multipli
def get_product(self, asset_id: str):
"""Ottiene informazioni su un prodotto con fallback automatico tra provider."""
try:
return self.fallback_manager.execute_with_fallback("get_product", asset_id)
except MarketAPIError as e:
logger.error(f"Failed to get product {asset_id}: {str(e)}")
raise
# Metodi che semplicemente chiamano il metodo corrispondente del primo wrapper disponibile
# TODO magari fare in modo che se il primo fallisce, prova con il secondo, ecc.
# oppure fare un round-robin tra i vari wrapper oppure usarli tutti e fare una media dei risultati
def get_product(self, asset_id):
return self.wrappers[0].get_product(asset_id)
def get_products(self, asset_ids: list):
return self.wrappers[0].get_products(asset_ids)
"""Ottiene informazioni su più prodotti con fallback automatico tra provider."""
try:
return self.fallback_manager.execute_with_fallback("get_products", asset_ids)
except MarketAPIError as e:
logger.error(f"Failed to get products {asset_ids}: {str(e)}")
raise
def get_all_products(self):
return self.wrappers[0].get_all_products()
def get_historical_prices(self, asset_id = "BTC"):
return self.wrappers[0].get_historical_prices(asset_id)
"""Ottiene tutti i prodotti con fallback automatico tra provider."""
try:
return self.fallback_manager.execute_with_fallback("get_all_products")
except MarketAPIError as e:
logger.error(f"Failed to get all products: {str(e)}")
raise
def get_historical_prices(self, asset_id: str = "BTC"):
"""Ottiene prezzi storici con fallback automatico tra provider."""
try:
return self.fallback_manager.execute_with_fallback("get_historical_prices", asset_id)
except MarketAPIError as e:
logger.error(f"Failed to get historical prices for {asset_id}: {str(e)}")
raise

View File

@@ -1,4 +1,4 @@
from coinbase.rest.types.product_types import Candle, GetProductResponse
from coinbase.rest.types.product_types import Candle, GetProductResponse, Product
from pydantic import BaseModel
class BaseWrapper:
@@ -27,16 +27,28 @@ class ProductInfo(BaseModel):
status: str = ""
quote_currency: str = ""
@staticmethod
def from_coinbase(product_data: GetProductResponse) -> 'ProductInfo':
product = ProductInfo()
product.id = product_data.product_id
product.symbol = product_data.base_currency_id
product.price = float(product_data.price)
product.volume_24h = float(product_data.volume_24h) if product_data.volume_24h else 0
product.id = product_data.product_id or ""
product.symbol = product_data.base_currency_id or ""
product.price = float(product_data.price) if product_data.price else 0.0
product.volume_24h = float(product_data.volume_24h) if product_data.volume_24h else 0.0
# TODO Check what status means in Coinbase
product.status = product_data.status
product.status = product_data.status or ""
return product
@staticmethod
def from_coinbase_product(product_data: Product) -> 'ProductInfo':
product = ProductInfo()
product.id = product_data.product_id or ""
product.symbol = product_data.base_currency_id or ""
product.price = float(product_data.price) if product_data.price else 0.0
product.volume_24h = float(product_data.volume_24h) if product_data.volume_24h else 0.0
product.status = product_data.status or ""
return product
@staticmethod
def from_cryptocompare(asset_data: dict) -> 'ProductInfo':
product = ProductInfo()
product.id = asset_data['FROMSYMBOL'] + '-' + asset_data['TOSYMBOL']
@@ -46,6 +58,27 @@ class ProductInfo(BaseModel):
product.status = "" # Cryptocompare does not provide status
return product
@staticmethod
def from_binance(ticker_data: dict, ticker_24h_data: dict) -> 'ProductInfo':
"""
Crea un oggetto ProductInfo da dati Binance.
Args:
ticker_data: Dati del ticker di prezzo
ticker_24h_data: Dati del ticker 24h
Returns:
Oggetto ProductInfo
"""
product = ProductInfo()
product.id = ticker_data['symbol']
product.symbol = ticker_data['symbol'].replace('USDT', '').replace('BUSD', '')
product.price = float(ticker_data['price'])
product.volume_24h = float(ticker_24h_data['volume'])
product.status = "TRADING" # Binance non fornisce status esplicito
product.quote_currency = "USDT" # Assumiamo USDT come default
return product
class Price(BaseModel):
"""
Rappresenta i dati di prezzo per un asset, come ottenuti dalle API di mercato.
@@ -58,16 +91,18 @@ class Price(BaseModel):
volume: float = 0.0
time: str = ""
@staticmethod
def from_coinbase(candle_data: Candle) -> 'Price':
price = Price()
price.high = float(candle_data.high)
price.low = float(candle_data.low)
price.open = float(candle_data.open)
price.close = float(candle_data.close)
price.volume = float(candle_data.volume)
price.time = str(candle_data.start)
price.high = float(candle_data.high) if candle_data.high else 0.0
price.low = float(candle_data.low) if candle_data.low else 0.0
price.open = float(candle_data.open) if candle_data.open else 0.0
price.close = float(candle_data.close) if candle_data.close else 0.0
price.volume = float(candle_data.volume) if candle_data.volume else 0.0
price.time = str(candle_data.start) if candle_data.start else ""
return price
@staticmethod
def from_cryptocompare(price_data: dict) -> 'Price':
price = Price()
price.high = float(price_data['high'])

View File

@@ -1,30 +1,169 @@
# Versione pubblica senza autenticazione
import os
from typing import Optional
from datetime import datetime, timedelta
from binance.client import Client
from .base import ProductInfo, BaseWrapper, Price
from .error_handler import retry_on_failure, handle_api_errors, MarketAPIError
# TODO fare l'aggancio con API in modo da poterlo usare come wrapper di mercato
# TODO implementare i metodi di BaseWrapper
class PublicBinanceAgent:
def __init__(self):
# Client pubblico (senza credenziali)
self.client = Client()
class BinanceWrapper(BaseWrapper):
"""
Wrapper per le API autenticate di Binance.
Implementa l'interfaccia BaseWrapper per fornire accesso unificato
ai dati di mercato di Binance tramite le API REST con autenticazione.
La documentazione delle API è disponibile qui:
https://binance-docs.github.io/apidocs/spot/en/
"""
def __init__(self, api_key: Optional[str] = None, api_secret: Optional[str] = None, currency: str = "USDT"):
"""
Inizializza il wrapper con le credenziali API.
Args:
api_key: Chiave API di Binance (se None, usa variabile d'ambiente)
api_secret: Secret API di Binance (se None, usa variabile d'ambiente)
currency: Valuta di quotazione di default (default: USDT)
"""
if api_key is None:
api_key = os.getenv("BINANCE_API_KEY")
assert api_key is not None, "API key is required"
def get_public_prices(self):
"""Ottiene prezzi pubblici"""
if api_secret is None:
api_secret = os.getenv("BINANCE_API_SECRET")
assert api_secret is not None, "API secret is required"
self.currency = currency
self.client = Client(api_key=api_key, api_secret=api_secret)
def __format_symbol(self, asset_id: str) -> str:
"""
Formatta l'asset_id nel formato richiesto da Binance.
Args:
asset_id: ID dell'asset (es. "BTC" o "BTC-USDT")
Returns:
Simbolo formattato per Binance (es. "BTCUSDT")
"""
if '-' in asset_id:
# Se già nel formato "BTC-USDT", converte in "BTCUSDT"
return asset_id.replace('-', '')
else:
# Se solo "BTC", aggiunge la valuta di default
return f"{asset_id}{self.currency}"
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_product(self, asset_id: str) -> ProductInfo:
"""
Ottiene informazioni su un singolo prodotto.
Args:
asset_id: ID dell'asset da recuperare
Returns:
Oggetto ProductInfo con le informazioni del prodotto
"""
symbol = self.__format_symbol(asset_id)
ticker = self.client.get_symbol_ticker(symbol=symbol)
ticker_24h = self.client.get_ticker(symbol=symbol)
return ProductInfo.from_binance(ticker, ticker_24h)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Ottiene informazioni su più prodotti.
Args:
asset_ids: Lista di ID degli asset da recuperare
Returns:
Lista di oggetti ProductInfo
"""
products = []
for asset_id in asset_ids:
try:
product = self.get_product(asset_id)
products.append(product)
except Exception as e:
print(f"Errore nel recupero di {asset_id}: {e}")
continue
return products
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_all_products(self) -> list[ProductInfo]:
"""
Ottiene informazioni su tutti i prodotti disponibili.
Returns:
Lista di oggetti ProductInfo per tutti i prodotti
"""
try:
btc_price = self.client.get_symbol_ticker(symbol="BTCUSDT")
eth_price = self.client.get_symbol_ticker(symbol="ETHUSDT")
return {
'BTC_USD': float(btc_price['price']),
'ETH_USD': float(eth_price['price']),
'source': 'binance_public'
}
# Ottiene tutti i ticker 24h che contengono le informazioni necessarie
all_tickers = self.client.get_ticker()
products = []
for ticker in all_tickers:
# Filtra solo i simboli che terminano con la valuta di default
if ticker['symbol'].endswith(self.currency):
try:
# Crea ProductInfo direttamente dal ticker 24h
product = ProductInfo()
product.id = ticker['symbol']
product.symbol = ticker['symbol'].replace(self.currency, '')
product.price = float(ticker['lastPrice'])
product.volume_24h = float(ticker['volume'])
product.status = "TRADING" # Binance non fornisce status esplicito
product.quote_currency = self.currency
products.append(product)
except (ValueError, KeyError) as e:
print(f"Errore nel parsing di {ticker['symbol']}: {e}")
continue
return products
except Exception as e:
print(f"Errore: {e}")
return None
print(f"Errore nel recupero di tutti i prodotti: {e}")
return []
# Uso senza credenziali
public_agent = PublicBinanceAgent()
public_prices = public_agent.get_public_prices()
print(public_prices)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_historical_prices(self, asset_id: str = "BTC") -> list[Price]:
"""
Ottiene i prezzi storici per un asset.
Args:
asset_id: ID dell'asset (default: "BTC")
Returns:
Lista di oggetti Price con i dati storici
"""
symbol = self.__format_symbol(asset_id)
try:
# Ottiene candele orarie degli ultimi 30 giorni
klines = self.client.get_historical_klines(
symbol=symbol,
interval=Client.KLINE_INTERVAL_1HOUR,
start_str="30 days ago UTC"
)
prices = []
for kline in klines:
price = Price()
price.open = float(kline[1])
price.high = float(kline[2])
price.low = float(kline[3])
price.close = float(kline[4])
price.volume = float(kline[5])
price.time = str(datetime.fromtimestamp(kline[0] / 1000))
prices.append(price)
return prices
except Exception as e:
print(f"Errore nel recupero dei prezzi storici per {symbol}: {e}")
return []

View File

@@ -0,0 +1,227 @@
"""
Versione pubblica di Binance per accesso ai dati pubblici senza autenticazione.
Questa implementazione estende BaseWrapper per mantenere coerenza
con l'architettura del modulo markets.
"""
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from binance.client import Client
from .base import BaseWrapper, ProductInfo, Price
from .error_handler import retry_on_failure, handle_api_errors, MarketAPIError
class PublicBinanceAgent(BaseWrapper):
"""
Agent per l'accesso ai dati pubblici di Binance.
Utilizza l'API pubblica di Binance per ottenere informazioni
sui prezzi e sui mercati senza richiedere autenticazione.
"""
def __init__(self):
"""Inizializza il client pubblico senza credenziali."""
self.client = Client()
def __format_symbol(self, asset_id: str) -> str:
"""
Formatta l'asset_id per Binance (es. BTC -> BTCUSDT).
Args:
asset_id: ID dell'asset (es. "BTC", "ETH")
Returns:
Simbolo formattato per Binance
"""
if asset_id.endswith("USDT") or asset_id.endswith("BUSD"):
return asset_id
return f"{asset_id}USDT"
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_product(self, asset_id: str) -> ProductInfo:
"""
Ottiene informazioni su un singolo prodotto.
Args:
asset_id: ID dell'asset (es. "BTC")
Returns:
Oggetto ProductInfo con le informazioni del prodotto
"""
symbol = self.__format_symbol(asset_id)
try:
ticker = self.client.get_symbol_ticker(symbol=symbol)
ticker_24h = self.client.get_ticker(symbol=symbol)
return ProductInfo.from_binance(ticker, ticker_24h)
except Exception as e:
print(f"Errore nel recupero del prodotto {asset_id}: {e}")
return ProductInfo(id=asset_id, symbol=asset_id)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Ottiene informazioni su più prodotti.
Args:
asset_ids: Lista di ID degli asset
Returns:
Lista di oggetti ProductInfo
"""
products = []
for asset_id in asset_ids:
product = self.get_product(asset_id)
products.append(product)
return products
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_all_products(self) -> list[ProductInfo]:
"""
Ottiene informazioni su tutti i prodotti disponibili.
Returns:
Lista di oggetti ProductInfo per i principali asset
"""
# Per la versione pubblica, restituiamo solo i principali asset
major_assets = ["BTC", "ETH", "BNB", "ADA", "DOT", "LINK", "LTC", "XRP"]
return self.get_products(major_assets)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_historical_prices(self, asset_id: str = "BTC") -> list[Price]:
"""
Ottiene i prezzi storici per un asset.
Args:
asset_id: ID dell'asset (default: "BTC")
Returns:
Lista di oggetti Price con i dati storici
"""
symbol = self.__format_symbol(asset_id)
try:
# Ottieni candele degli ultimi 30 giorni
end_time = datetime.now()
start_time = end_time - timedelta(days=30)
klines = self.client.get_historical_klines(
symbol,
Client.KLINE_INTERVAL_1DAY,
start_time.strftime("%d %b %Y %H:%M:%S"),
end_time.strftime("%d %b %Y %H:%M:%S")
)
prices = []
for kline in klines:
price = Price(
open=float(kline[1]),
high=float(kline[2]),
low=float(kline[3]),
close=float(kline[4]),
volume=float(kline[5]),
time=str(datetime.fromtimestamp(kline[0] / 1000))
)
prices.append(price)
return prices
except Exception as e:
print(f"Errore nel recupero dei prezzi storici per {asset_id}: {e}")
return []
def get_public_prices(self, symbols: Optional[list[str]] = None) -> Optional[Dict[str, Any]]:
"""
Ottiene i prezzi pubblici per i simboli specificati.
Args:
symbols: Lista di simboli da recuperare (es. ["BTCUSDT", "ETHUSDT"]).
Se None, recupera BTC e ETH di default.
Returns:
Dizionario con i prezzi e informazioni sulla fonte, o None in caso di errore.
"""
if symbols is None:
symbols = ["BTCUSDT", "ETHUSDT"]
try:
prices = {}
for symbol in symbols:
ticker = self.client.get_symbol_ticker(symbol=symbol)
# Converte BTCUSDT -> BTC_USD per consistenza
clean_symbol = symbol.replace("USDT", "_USD").replace("BUSD", "_USD")
prices[clean_symbol] = float(ticker['price'])
return {
**prices,
'source': 'binance_public',
'timestamp': self.client.get_server_time()['serverTime']
}
except Exception as e:
print(f"Errore nel recupero dei prezzi pubblici: {e}")
return None
def get_24hr_ticker(self, symbol: str) -> Optional[Dict[str, Any]]:
"""
Ottiene le statistiche 24h per un simbolo specifico.
Args:
symbol: Simbolo del trading pair (es. "BTCUSDT")
Returns:
Dizionario con le statistiche 24h o None in caso di errore.
"""
try:
ticker = self.client.get_ticker(symbol=symbol)
return {
'symbol': ticker['symbol'],
'price': float(ticker['lastPrice']),
'price_change': float(ticker['priceChange']),
'price_change_percent': float(ticker['priceChangePercent']),
'high_24h': float(ticker['highPrice']),
'low_24h': float(ticker['lowPrice']),
'volume_24h': float(ticker['volume']),
'source': 'binance_public'
}
except Exception as e:
print(f"Errore nel recupero del ticker 24h per {symbol}: {e}")
return None
def get_exchange_info(self) -> Optional[Dict[str, Any]]:
"""
Ottiene informazioni generali sull'exchange.
Returns:
Dizionario con informazioni sull'exchange o None in caso di errore.
"""
try:
info = self.client.get_exchange_info()
return {
'timezone': info['timezone'],
'server_time': info['serverTime'],
'symbols_count': len(info['symbols']),
'source': 'binance_public'
}
except Exception as e:
print(f"Errore nel recupero delle informazioni exchange: {e}")
return None
# Esempio di utilizzo
if __name__ == "__main__":
# Uso senza credenziali
public_agent = PublicBinanceAgent()
# Ottieni prezzi di default (BTC e ETH)
public_prices = public_agent.get_public_prices()
print("Prezzi pubblici:", public_prices)
# Ottieni statistiche 24h per BTC
btc_stats = public_agent.get_24hr_ticker("BTCUSDT")
print("Statistiche BTC 24h:", btc_stats)
# Ottieni informazioni exchange
exchange_info = public_agent.get_exchange_info()
print("Info exchange:", exchange_info)

View File

@@ -1,16 +1,21 @@
import os
from typing import Optional
from datetime import datetime, timedelta
from coinbase.rest import RESTClient
from src.app.markets.base import ProductInfo, BaseWrapper, Price
from .base import ProductInfo, BaseWrapper, Price
from .error_handler import retry_on_failure, handle_api_errors, MarketAPIError, RateLimitError
class CoinBaseWrapper(BaseWrapper):
"""
Wrapper per le API di Coinbase.
La documentazione delle API è disponibile qui: https://docs.cdp.coinbase.com/api-reference/advanced-trade-api/rest-api/introduction
Wrapper per le API di Coinbase Advanced Trade.
Implementa l'interfaccia BaseWrapper per fornire accesso unificato
ai dati di mercato di Coinbase tramite le API REST.
La documentazione delle API è disponibile qui:
https://docs.cdp.coinbase.com/api-reference/advanced-trade-api/rest-api/introduction
"""
def __init__(self, api_key:str = None, api_private_key:str = None, currency: str = "USD"):
def __init__(self, api_key: Optional[str] = None, api_private_key: Optional[str] = None, currency: str = "USD"):
if api_key is None:
api_key = os.getenv("COINBASE_API_KEY")
assert api_key is not None, "API key is required"
@@ -28,21 +33,49 @@ class CoinBaseWrapper(BaseWrapper):
def __format(self, asset_id: str) -> str:
return asset_id if '-' in asset_id else f"{asset_id}-{self.currency}"
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_product(self, asset_id: str) -> ProductInfo:
asset_id = self.__format(asset_id)
asset = self.client.get_product(asset_id)
return ProductInfo.from_coinbase(asset)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
all_asset_ids = [self.__format(asset_id) for asset_id in asset_ids]
assets = self.client.get_products(all_asset_ids)
return [ProductInfo.from_coinbase(asset) for asset in assets.products]
assets = self.client.get_products(product_ids=all_asset_ids)
if assets.products:
return [ProductInfo.from_coinbase_product(asset) for asset in assets.products]
return []
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_all_products(self) -> list[ProductInfo]:
assets = self.client.get_products()
return [ProductInfo.from_coinbase(asset) for asset in assets.products]
if assets.products:
return [ProductInfo.from_coinbase_product(asset) for asset in assets.products]
return []
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_historical_prices(self, asset_id: str = "BTC") -> list[Price]:
asset_id = self.__format(asset_id)
data = self.client.get_candles(product_id=asset_id)
return [Price.from_coinbase(candle) for candle in data.candles]
# Get last 14 days of hourly data (14*24 = 336 candles, within 350 limit)
end_time = datetime.now()
start_time = end_time - timedelta(days=14)
# Convert to UNIX timestamps as strings (required by Coinbase API)
start_timestamp = str(int(start_time.timestamp()))
end_timestamp = str(int(end_time.timestamp()))
data = self.client.get_candles(
product_id=asset_id,
start=start_timestamp,
end=end_timestamp,
granularity="ONE_HOUR",
limit=350 # Explicitly set the limit
)
if data.candles:
return [Price.from_coinbase(candle) for candle in data.candles]
return []

View File

@@ -1,8 +1,8 @@
import os
import requests
from src.app.markets.base import ProductInfo, BaseWrapper, Price
from typing import Optional, Dict, Any
from .base import ProductInfo, BaseWrapper, Price
from .error_handler import retry_on_failure, handle_api_errors, MarketAPIError
BASE_URL = "https://min-api.cryptocompare.com"
@@ -10,9 +10,9 @@ class CryptoCompareWrapper(BaseWrapper):
"""
Wrapper per le API pubbliche di CryptoCompare.
La documentazione delle API è disponibile qui: https://developers.coindesk.com/documentation/legacy/Price/SingleSymbolPriceEndpoint
!ATTENZIONE! Sembra essere una API legacy e potrebbe essere deprecata in futuro.
!!ATTENZIONE!! sembra essere una API legacy e potrebbe essere deprecata in futuro.
"""
def __init__(self, api_key:str = None, currency:str='USD'):
def __init__(self, api_key: Optional[str] = None, currency: str = 'USD'):
if api_key is None:
api_key = os.getenv("CRYPTOCOMPARE_API_KEY")
assert api_key is not None, "API key is required"
@@ -20,7 +20,7 @@ class CryptoCompareWrapper(BaseWrapper):
self.api_key = api_key
self.currency = currency
def __request(self, endpoint: str, params: dict = None) -> dict:
def __request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
if params is None:
params = {}
params['api_key'] = self.api_key
@@ -28,6 +28,8 @@ class CryptoCompareWrapper(BaseWrapper):
response = requests.get(f"{BASE_URL}{endpoint}", params=params)
return response.json()
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_product(self, asset_id: str) -> ProductInfo:
response = self.__request("/data/pricemultifull", params = {
"fsyms": asset_id,
@@ -36,6 +38,8 @@ class CryptoCompareWrapper(BaseWrapper):
data = response.get('RAW', {}).get(asset_id, {}).get(self.currency, {})
return ProductInfo.from_cryptocompare(data)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
response = self.__request("/data/pricemultifull", params = {
"fsyms": ",".join(asset_ids),
@@ -48,10 +52,37 @@ class CryptoCompareWrapper(BaseWrapper):
assets.append(ProductInfo.from_cryptocompare(asset_data))
return assets
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_all_products(self) -> list[ProductInfo]:
raise NotImplementedError("CryptoCompare does not support fetching all assets")
"""
Workaround per CryptoCompare: utilizza una lista predefinita di asset popolari
poiché l'API non fornisce un endpoint per recuperare tutti i prodotti.
"""
# Lista di asset popolari supportati da CryptoCompare
popular_assets = [
"BTC", "ETH", "ADA", "DOT", "LINK", "LTC", "XRP", "BCH", "BNB", "SOL",
"MATIC", "AVAX", "ATOM", "UNI", "DOGE", "SHIB", "TRX", "ETC", "FIL", "XLM"
]
try:
# Utilizza get_products per recuperare i dati di tutti gli asset popolari
return self.get_products(popular_assets)
except Exception as e:
# Fallback: prova con un set ridotto di asset principali
main_assets = ["BTC", "ETH", "ADA", "DOT", "LINK"]
try:
return self.get_products(main_assets)
except Exception as fallback_error:
# Se anche il fallback fallisce, solleva l'errore originale con informazioni aggiuntive
raise NotImplementedError(
f"CryptoCompare get_all_products() workaround failed. "
f"Original error: {str(e)}, Fallback error: {str(fallback_error)}"
)
def get_historical_prices(self, asset_id: str, day_back: int = 10) -> list[dict]:
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_historical_prices(self, asset_id: str = "BTC", day_back: int = 10) -> list[Price]:
assert day_back <= 30, "day_back should be less than or equal to 30"
response = self.__request("/data/v2/histohour", params = {
"fsym": asset_id,

View File

@@ -0,0 +1,236 @@
"""
Modulo per la gestione robusta degli errori nei market providers.
Fornisce decoratori e utilità per:
- Retry automatico con backoff esponenziale
- Logging standardizzato degli errori
- Gestione di timeout e rate limiting
- Fallback tra provider multipli
"""
import time
import logging
from functools import wraps
from typing import Any, Callable, Optional, Type, Union, List
from requests.exceptions import RequestException, Timeout, ConnectionError
from binance.exceptions import BinanceAPIException, BinanceRequestException
# Configurazione logging
logger = logging.getLogger(__name__)
class MarketAPIError(Exception):
"""Eccezione base per errori delle API di mercato."""
pass
class RateLimitError(MarketAPIError):
"""Eccezione per errori di rate limiting."""
pass
class AuthenticationError(MarketAPIError):
"""Eccezione per errori di autenticazione."""
pass
class DataNotFoundError(MarketAPIError):
"""Eccezione quando i dati richiesti non sono disponibili."""
pass
def retry_on_failure(
max_retries: int = 3,
delay: float = 1.0,
backoff_factor: float = 2.0,
exceptions: tuple = (RequestException, BinanceAPIException, BinanceRequestException)
) -> Callable:
"""
Decoratore per retry automatico con backoff esponenziale.
Args:
max_retries: Numero massimo di tentativi
delay: Delay iniziale in secondi
backoff_factor: Fattore di moltiplicazione per il delay
exceptions: Tuple di eccezioni da catturare per il retry
Returns:
Decoratore per la funzione
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
last_exception = None
current_delay = delay
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt == max_retries:
logger.error(
f"Function {func.__name__} failed after {max_retries + 1} attempts. "
f"Last error: {str(e)}"
)
raise MarketAPIError(f"Max retries exceeded: {str(e)}") from e
logger.warning(
f"Attempt {attempt + 1}/{max_retries + 1} failed for {func.__name__}: {str(e)}. "
f"Retrying in {current_delay:.1f}s..."
)
time.sleep(current_delay)
current_delay *= backoff_factor
except Exception as e:
# Per eccezioni non previste, non fare retry
logger.error(f"Unexpected error in {func.__name__}: {str(e)}")
raise
# Questo non dovrebbe mai essere raggiunto
if last_exception:
raise last_exception
else:
raise MarketAPIError("Unknown error occurred")
return wrapper
return decorator
def handle_api_errors(func: Callable) -> Callable:
"""
Decoratore per gestione standardizzata degli errori API.
Converte errori specifici dei provider in eccezioni standardizzate.
"""
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
try:
return func(*args, **kwargs)
except BinanceAPIException as e:
if e.code == -1021: # Timestamp error
raise MarketAPIError(f"Binance timestamp error: {e.message}")
elif e.code == -1003: # Rate limit
raise RateLimitError(f"Binance rate limit exceeded: {e.message}")
elif e.code in [-2014, -2015]: # API key errors
raise AuthenticationError(f"Binance authentication error: {e.message}")
else:
raise MarketAPIError(f"Binance API error [{e.code}]: {e.message}")
except ConnectionError as e:
raise MarketAPIError(f"Connection error: {str(e)}")
except Timeout as e:
raise MarketAPIError(f"Request timeout: {str(e)}")
except RequestException as e:
raise MarketAPIError(f"Request error: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error in {func.__name__}: {str(e)}")
raise MarketAPIError(f"Unexpected error: {str(e)}") from e
return wrapper
def safe_execute(
func: Callable,
default_value: Any = None,
log_errors: bool = True,
error_message: Optional[str] = None
) -> Any:
"""
Esegue una funzione in modo sicuro, restituendo un valore di default in caso di errore.
Args:
func: Funzione da eseguire
default_value: Valore da restituire in caso di errore
log_errors: Se loggare gli errori
error_message: Messaggio di errore personalizzato
Returns:
Risultato della funzione o valore di default
"""
try:
return func()
except Exception as e:
if log_errors:
message = error_message or f"Error executing {func.__name__}"
logger.warning(f"{message}: {str(e)}")
return default_value
class ProviderFallback:
"""
Classe per gestire il fallback tra provider multipli.
"""
def __init__(self, providers: List[Any]):
"""
Inizializza con una lista di provider ordinati per priorità.
Args:
providers: Lista di provider ordinati per priorità
"""
self.providers = providers
def execute_with_fallback(
self,
method_name: str,
*args,
**kwargs
) -> Any:
"""
Esegue un metodo su tutti i provider fino a trovarne uno che funziona.
Args:
method_name: Nome del metodo da chiamare
*args: Argomenti posizionali
**kwargs: Argomenti nominali
Returns:
Risultato del primo provider che funziona
Raises:
MarketAPIError: Se tutti i provider falliscono
"""
last_error = None
for i, provider in enumerate(self.providers):
try:
if hasattr(provider, method_name):
method = getattr(provider, method_name)
result = method(*args, **kwargs)
if i > 0: # Se non è il primo provider
logger.info(f"Fallback successful: used provider {type(provider).__name__}")
return result
else:
logger.warning(f"Provider {type(provider).__name__} doesn't have method {method_name}")
continue
except Exception as e:
last_error = e
logger.warning(
f"Provider {type(provider).__name__} failed for {method_name}: {str(e)}"
)
continue
# Se arriviamo qui, tutti i provider hanno fallito
raise MarketAPIError(
f"All providers failed for method {method_name}. Last error: {str(last_error)}"
)
def validate_response_data(data: Any, required_fields: Optional[List[str]] = None) -> bool:
"""
Valida che i dati di risposta contengano i campi richiesti.
Args:
data: Dati da validare
required_fields: Lista di campi richiesti
Returns:
True se i dati sono validi, False altrimenti
"""
if data is None:
return False
if required_fields is None:
return True
if isinstance(data, dict):
return all(field in data for field in required_fields)
elif hasattr(data, '__dict__'):
return all(hasattr(data, field) for field in required_fields)
return False

View File

@@ -1,146 +1,596 @@
"""
Test suite completo per il sistema di mercato.
Questo modulo testa approfonditamente tutte le implementazioni di BaseWrapper
e verifica la conformità all'interfaccia definita in base.py.
"""
import os
import pytest
from src.app.agents.market_toolkit import MarketToolkit
from src.app.markets.base import BaseWrapper
from src.app.markets.coinbase import CoinBaseWrapper
from src.app.markets.cryptocompare import CryptoCompareWrapper
from src.app.markets import MarketAPIs
from unittest.mock import Mock, patch, MagicMock
from typing import Type, List
class TestMarketSystem:
"""Test suite per il sistema di mercato (wrappers + toolkit)"""
# Import delle classi da testare
from app.markets.base import BaseWrapper, ProductInfo, Price
from app.markets.coinbase import CoinBaseWrapper
from app.markets.cryptocompare import CryptoCompareWrapper
from app.markets.binance import BinanceWrapper
from app.markets.binance_public import PublicBinanceAgent
from app.markets import MarketAPIs
@pytest.fixture(scope="class")
def market_wrapper(self) -> BaseWrapper:
return MarketAPIs("USD")
def test_wrapper_initialization(self, market_wrapper):
assert market_wrapper is not None
assert hasattr(market_wrapper, 'get_product')
assert hasattr(market_wrapper, 'get_products')
assert hasattr(market_wrapper, 'get_all_products')
assert hasattr(market_wrapper, 'get_historical_prices')
class TestBaseWrapperInterface:
"""Test per verificare che tutte le implementazioni rispettino l'interfaccia BaseWrapper."""
def test_all_wrappers_extend_basewrapper(self):
"""Verifica che tutte le classi wrapper estendano BaseWrapper."""
wrapper_classes = [
CoinBaseWrapper,
CryptoCompareWrapper,
BinanceWrapper,
PublicBinanceAgent,
MarketAPIs
]
for wrapper_class in wrapper_classes:
assert issubclass(wrapper_class, BaseWrapper), f"{wrapper_class.__name__} deve estendere BaseWrapper"
def test_all_wrappers_implement_required_methods(self):
"""Verifica che tutte le classi implementino i metodi richiesti dall'interfaccia."""
wrapper_classes = [
CoinBaseWrapper,
CryptoCompareWrapper,
BinanceWrapper,
PublicBinanceAgent,
MarketAPIs
]
required_methods = ['get_product', 'get_products', 'get_all_products', 'get_historical_prices']
for wrapper_class in wrapper_classes:
for method in required_methods:
assert hasattr(wrapper_class, method), f"{wrapper_class.__name__} deve implementare {method}"
assert callable(getattr(wrapper_class, method)), f"{method} deve essere callable in {wrapper_class.__name__}"
def test_providers_configuration(self):
available_providers = []
if os.getenv('CDP_API_KEY_NAME') and os.getenv('CDP_API_PRIVATE_KEY'):
available_providers.append('coinbase')
if os.getenv('CRYPTOCOMPARE_API_KEY'):
available_providers.append('cryptocompare')
assert len(available_providers) > 0
def test_wrapper_capabilities(self, market_wrapper):
capabilities = []
if hasattr(market_wrapper, 'get_product'):
capabilities.append('single_product')
if hasattr(market_wrapper, 'get_products'):
capabilities.append('multiple_products')
if hasattr(market_wrapper, 'get_historical_prices'):
capabilities.append('historical_data')
assert len(capabilities) > 0
class TestProductInfoModel:
"""Test per la classe ProductInfo e i suoi metodi di conversione."""
def test_productinfo_initialization(self):
"""Test inizializzazione di ProductInfo."""
product = ProductInfo()
assert product.id == ""
assert product.symbol == ""
assert product.price == 0.0
assert product.volume_24h == 0.0
assert product.status == ""
assert product.quote_currency == ""
def test_productinfo_with_data(self):
"""Test ProductInfo con dati specifici."""
product = ProductInfo(
id="BTC-USD",
symbol="BTC",
price=50000.0,
volume_24h=1000000.0,
status="TRADING",
quote_currency="USD"
)
assert product.id == "BTC-USD"
assert product.symbol == "BTC"
assert product.price == 50000.0
assert product.volume_24h == 1000000.0
assert product.status == "TRADING"
assert product.quote_currency == "USD"
def test_productinfo_from_cryptocompare(self):
"""Test conversione da dati CryptoCompare."""
mock_data = {
'FROMSYMBOL': 'BTC',
'TOSYMBOL': 'USD',
'PRICE': 50000.0,
'VOLUME24HOUR': 1000000.0
}
product = ProductInfo.from_cryptocompare(mock_data)
assert product.id == "BTC-USD"
assert product.symbol == "BTC"
assert product.price == 50000.0
assert product.volume_24h == 1000000.0
assert product.status == ""
def test_productinfo_from_binance(self):
"""Test conversione da dati Binance."""
ticker_data = {'symbol': 'BTCUSDT', 'price': '50000.0'}
ticker_24h_data = {'volume': '1000000.0'}
product = ProductInfo.from_binance(ticker_data, ticker_24h_data)
assert product.id == "BTCUSDT"
assert product.symbol == "BTC"
assert product.price == 50000.0
assert product.volume_24h == 1000000.0
assert product.status == "TRADING"
assert product.quote_currency == "USDT"
def test_market_data_retrieval(self, market_wrapper):
btc_product = market_wrapper.get_product("BTC")
assert btc_product is not None
assert hasattr(btc_product, 'symbol')
assert hasattr(btc_product, 'price')
assert btc_product.price > 0
def test_market_toolkit_integration(self, market_wrapper):
try:
toolkit = MarketToolkit()
assert toolkit is not None
assert hasattr(toolkit, 'market_agent')
assert toolkit.market_api is not None
class TestPriceModel:
"""Test per la classe Price e i suoi metodi di conversione."""
def test_price_initialization(self):
"""Test inizializzazione di Price."""
price = Price()
assert price.high == 0.0
assert price.low == 0.0
assert price.open == 0.0
assert price.close == 0.0
assert price.volume == 0.0
assert price.time == ""
def test_price_with_data(self):
"""Test Price con dati specifici."""
price = Price(
high=51000.0,
low=49000.0,
open=50000.0,
close=50500.0,
volume=1000.0,
time="2024-01-01T00:00:00Z"
)
assert price.high == 51000.0
assert price.low == 49000.0
assert price.open == 50000.0
assert price.close == 50500.0
assert price.volume == 1000.0
assert price.time == "2024-01-01T00:00:00Z"
def test_price_from_cryptocompare(self):
"""Test conversione da dati CryptoCompare."""
mock_data = {
'high': 51000.0,
'low': 49000.0,
'open': 50000.0,
'close': 50500.0,
'volumeto': 1000.0,
'time': 1704067200
}
price = Price.from_cryptocompare(mock_data)
assert price.high == 51000.0
assert price.low == 49000.0
assert price.open == 50000.0
assert price.close == 50500.0
assert price.volume == 1000.0
assert price.time == "1704067200"
tools = toolkit.tools
assert len(tools) > 0
except Exception as e:
print(f"MarketToolkit test failed: {e}")
# Non fail completamente - il toolkit potrebbe avere dipendenze specifiche
class TestCoinBaseWrapper:
"""Test specifici per CoinBaseWrapper."""
@pytest.mark.skipif(
not os.getenv('CRYPTOCOMPARE_API_KEY'),
reason="CRYPTOCOMPARE_API_KEY not configured"
not (os.getenv('COINBASE_API_KEY') and os.getenv('COINBASE_API_SECRET')),
reason="Credenziali Coinbase non configurate"
)
def test_cryptocompare_wrapper(self):
try:
api_key = os.getenv('CRYPTOCOMPARE_API_KEY')
wrapper = CryptoCompareWrapper(api_key=api_key, currency="USD")
def test_coinbase_initialization_with_env_vars(self):
"""Test inizializzazione con variabili d'ambiente."""
wrapper = CoinBaseWrapper(currency="USD")
assert wrapper.currency == "USD"
assert wrapper.client is not None
@patch.dict(os.environ, {}, clear=True)
def test_coinbase_initialization_with_params(self):
"""Test inizializzazione con parametri espliciti quando non ci sono variabili d'ambiente."""
with pytest.raises(AssertionError, match="API key is required"):
CoinBaseWrapper(api_key=None, api_private_key=None)
@patch('app.markets.coinbase.RESTClient')
def test_coinbase_asset_formatting_behavior(self, mock_client):
"""Test comportamento di formattazione asset ID attraverso get_product."""
mock_response = Mock()
mock_response.product_id = "BTC-USD"
mock_response.base_currency_id = "BTC"
mock_response.price = "50000.0"
mock_response.volume_24h = "1000000.0"
mock_response.status = "TRADING"
mock_client_instance = Mock()
mock_client_instance.get_product.return_value = mock_response
mock_client.return_value = mock_client_instance
wrapper = CoinBaseWrapper(api_key="test", api_private_key="test")
# Test che entrambi i formati funzionino
wrapper.get_product("BTC")
wrapper.get_product("BTC-USD")
# Verifica che get_product sia stato chiamato con il formato corretto
assert mock_client_instance.get_product.call_count == 2
@patch('app.markets.coinbase.RESTClient')
def test_coinbase_get_product(self, mock_client):
"""Test get_product con mock."""
mock_response = Mock()
mock_response.product_id = "BTC-USD"
mock_response.base_currency_id = "BTC"
mock_response.price = "50000.0"
mock_response.volume_24h = "1000000.0"
mock_response.status = "TRADING"
mock_client_instance = Mock()
mock_client_instance.get_product.return_value = mock_response
mock_client.return_value = mock_client_instance
wrapper = CoinBaseWrapper(api_key="test", api_private_key="test")
product = wrapper.get_product("BTC")
assert isinstance(product, ProductInfo)
assert product.symbol == "BTC"
mock_client_instance.get_product.assert_called_once_with("BTC-USD")
btc_product = wrapper.get_product("BTC")
assert btc_product is not None
assert btc_product.symbol == "BTC"
assert btc_product.price > 0
products = wrapper.get_products(["BTC", "ETH"])
assert isinstance(products, list)
assert len(products) > 0
for product in products:
if product.symbol in ["BTC", "ETH"]:
assert product.price > 0
except Exception as e:
print(f"CryptoCompare test failed: {e}")
# Non fail il test se c'è un errore di rete o API
class TestCryptoCompareWrapper:
"""Test specifici per CryptoCompareWrapper."""
@pytest.mark.skipif(
not (os.getenv('CDP_API_KEY_NAME') and os.getenv('CDP_API_PRIVATE_KEY')),
reason="Coinbase credentials not configured"
not os.getenv('CRYPTOCOMPARE_API_KEY'),
reason="CRYPTOCOMPARE_API_KEY non configurata"
)
def test_coinbase_wrapper(self):
try:
api_key = os.getenv('CDP_API_KEY_NAME')
api_secret = os.getenv('CDP_API_PRIVATE_KEY')
wrapper = CoinBaseWrapper(
api_key=api_key,
api_private_key=api_secret,
currency="USD"
)
def test_cryptocompare_initialization_with_env_var(self):
"""Test inizializzazione con variabile d'ambiente."""
wrapper = CryptoCompareWrapper(currency="USD")
assert wrapper.currency == "USD"
assert wrapper.api_key is not None
def test_cryptocompare_initialization_with_param(self):
"""Test inizializzazione con parametro esplicito."""
wrapper = CryptoCompareWrapper(api_key="test_key", currency="EUR")
assert wrapper.api_key == "test_key"
assert wrapper.currency == "EUR"
@patch('app.markets.cryptocompare.requests.get')
def test_cryptocompare_get_product(self, mock_get):
"""Test get_product con mock."""
mock_response = Mock()
mock_response.json.return_value = {
'RAW': {
'BTC': {
'USD': {
'FROMSYMBOL': 'BTC',
'TOSYMBOL': 'USD',
'PRICE': 50000.0,
'VOLUME24HOUR': 1000000.0
}
}
}
}
mock_response.raise_for_status.return_value = None
mock_get.return_value = mock_response
wrapper = CryptoCompareWrapper(api_key="test_key")
product = wrapper.get_product("BTC")
assert isinstance(product, ProductInfo)
assert product.symbol == "BTC"
assert product.price == 50000.0
def test_cryptocompare_get_all_products_workaround(self):
"""Test che get_all_products funzioni con il workaround implementato."""
wrapper = CryptoCompareWrapper(api_key="test_key")
# Il metodo ora dovrebbe restituire una lista di ProductInfo invece di sollevare NotImplementedError
products = wrapper.get_all_products()
assert isinstance(products, list)
# Verifica che la lista non sia vuota (dovrebbe contenere almeno alcuni asset popolari)
assert len(products) > 0
# Verifica che ogni elemento sia un ProductInfo
for product in products:
assert isinstance(product, ProductInfo)
btc_product = wrapper.get_product("BTC")
assert btc_product is not None
assert btc_product.symbol == "BTC"
assert btc_product.price > 0
products = wrapper.get_products(["BTC", "ETH"])
assert isinstance(products, list)
assert len(products) > 0
class TestBinanceWrapper:
"""Test specifici per BinanceWrapper."""
def test_binance_initialization_without_credentials(self):
"""Test che l'inizializzazione fallisca senza credenziali."""
# Assicuriamoci che le variabili d'ambiente siano vuote per questo test
with patch.dict(os.environ, {}, clear=True):
with pytest.raises(AssertionError, match="API key is required"):
BinanceWrapper(api_key=None, api_secret="test")
with pytest.raises(AssertionError, match="API secret is required"):
BinanceWrapper(api_key="test", api_secret=None)
@patch('app.markets.binance.Client')
def test_binance_symbol_formatting_behavior(self, mock_client):
"""Test comportamento di formattazione simbolo attraverso get_product."""
mock_client_instance = Mock()
mock_client_instance.get_symbol_ticker.return_value = {
'symbol': 'BTCUSDT',
'price': '50000.0'
}
mock_client_instance.get_ticker.return_value = {
'volume': '1000000.0'
}
mock_client.return_value = mock_client_instance
wrapper = BinanceWrapper(api_key="test", api_secret="test")
# Test che entrambi i formati funzionino
wrapper.get_product("BTC")
wrapper.get_product("BTCUSDT")
# Verifica che i metodi siano stati chiamati
assert mock_client_instance.get_symbol_ticker.call_count == 2
@patch('app.markets.binance.Client')
def test_binance_get_product(self, mock_client):
"""Test get_product con mock."""
mock_client_instance = Mock()
mock_client_instance.get_symbol_ticker.return_value = {
'symbol': 'BTCUSDT',
'price': '50000.0'
}
mock_client_instance.get_ticker.return_value = {
'volume': '1000000.0'
}
mock_client.return_value = mock_client_instance
wrapper = BinanceWrapper(api_key="test", api_secret="test")
product = wrapper.get_product("BTC")
assert isinstance(product, ProductInfo)
assert product.symbol == "BTC"
assert product.price == 50000.0
except Exception as e:
print(f"Coinbase test failed: {e}")
# Non fail il test se c'è un errore di credenziali o rete
def test_provider_selection_mechanism(self):
potential_providers = 0
if os.getenv('CDP_API_KEY_NAME') and os.getenv('CDP_API_PRIVATE_KEY'):
potential_providers += 1
if os.getenv('CRYPTOCOMPARE_API_KEY'):
potential_providers += 1
class TestPublicBinanceAgent:
"""Test specifici per PublicBinanceAgent."""
@patch('app.markets.binance_public.Client')
def test_public_binance_initialization(self, mock_client):
"""Test inizializzazione senza credenziali."""
agent = PublicBinanceAgent()
assert agent.client is not None
mock_client.assert_called_once_with()
@patch('app.markets.binance_public.Client')
def test_public_binance_symbol_formatting_behavior(self, mock_client):
"""Test comportamento di formattazione simbolo attraverso get_product."""
mock_client_instance = Mock()
mock_client_instance.get_symbol_ticker.return_value = {
'symbol': 'BTCUSDT',
'price': '50000.0'
}
mock_client_instance.get_ticker.return_value = {
'volume': '1000000.0'
}
mock_client.return_value = mock_client_instance
agent = PublicBinanceAgent()
# Test che entrambi i formati funzionino
agent.get_product("BTC")
agent.get_product("BTCUSDT")
# Verifica che i metodi siano stati chiamati
assert mock_client_instance.get_symbol_ticker.call_count == 2
@patch('app.markets.binance_public.Client')
def test_public_binance_get_product(self, mock_client):
"""Test get_product con mock."""
mock_client_instance = Mock()
mock_client_instance.get_symbol_ticker.return_value = {
'symbol': 'BTCUSDT',
'price': '50000.0'
}
mock_client_instance.get_ticker.return_value = {
'volume': '1000000.0'
}
mock_client.return_value = mock_client_instance
agent = PublicBinanceAgent()
product = agent.get_product("BTC")
assert isinstance(product, ProductInfo)
assert product.symbol == "BTC"
assert product.price == 50000.0
@patch('app.markets.binance_public.Client')
def test_public_binance_get_all_products(self, mock_client):
"""Test get_all_products restituisce asset principali."""
mock_client_instance = Mock()
mock_client_instance.get_symbol_ticker.return_value = {
'symbol': 'BTCUSDT',
'price': '50000.0'
}
mock_client_instance.get_ticker.return_value = {
'volume': '1000000.0'
}
mock_client.return_value = mock_client_instance
agent = PublicBinanceAgent()
products = agent.get_all_products()
assert isinstance(products, list)
assert len(products) == 8 # Numero di asset principali definiti
for product in products:
assert isinstance(product, ProductInfo)
@patch('app.markets.binance_public.Client')
def test_public_binance_get_public_prices(self, mock_client):
"""Test metodo specifico get_public_prices."""
mock_client_instance = Mock()
mock_client_instance.get_symbol_ticker.return_value = {'price': '50000.0'}
mock_client_instance.get_server_time.return_value = {'serverTime': 1704067200000}
mock_client.return_value = mock_client_instance
agent = PublicBinanceAgent()
prices = agent.get_public_prices(["BTCUSDT"])
assert isinstance(prices, dict)
assert 'BTC_USD' in prices
assert prices['BTC_USD'] == 50000.0
assert 'source' in prices
assert prices['source'] == 'binance_public'
if potential_providers == 0:
with pytest.raises(AssertionError, match="No valid API keys"):
MarketAPIs.get_list_available_market_apis()
else:
wrapper = MarketAPIs("USD")
assert wrapper is not None
assert hasattr(wrapper, 'get_product')
def test_error_handling(self, market_wrapper):
try:
fake_product = market_wrapper.get_product("NONEXISTENT_CRYPTO_SYMBOL_12345")
assert fake_product is None or fake_product.price == 0
except Exception as e:
pass
class TestMarketAPIs:
"""Test per la classe MarketAPIs che aggrega i wrapper."""
def test_market_apis_initialization_no_providers(self):
"""Test che l'inizializzazione fallisca senza provider disponibili."""
with patch.dict(os.environ, {}, clear=True):
with pytest.raises(AssertionError, match="No market API keys"):
MarketAPIs("USD")
@patch('app.markets.CoinBaseWrapper')
def test_market_apis_with_coinbase_only(self, mock_coinbase):
"""Test con solo Coinbase disponibile."""
mock_instance = Mock()
mock_coinbase.return_value = mock_instance
with patch('app.markets.CryptoCompareWrapper', side_effect=Exception("No API key")):
apis = MarketAPIs("USD")
assert len(apis.wrappers) == 1
assert apis.wrappers[0] == mock_instance
@patch('app.markets.CoinBaseWrapper')
@patch('app.markets.CryptoCompareWrapper')
def test_market_apis_delegation(self, mock_crypto, mock_coinbase):
"""Test che i metodi vengano delegati al primo wrapper disponibile."""
mock_coinbase_instance = Mock()
mock_crypto_instance = Mock()
mock_coinbase.return_value = mock_coinbase_instance
mock_crypto.return_value = mock_crypto_instance
apis = MarketAPIs("USD")
# Test delegazione get_product
apis.get_product("BTC")
mock_coinbase_instance.get_product.assert_called_once_with("BTC")
# Test delegazione get_products
apis.get_products(["BTC", "ETH"])
mock_coinbase_instance.get_products.assert_called_once_with(["BTC", "ETH"])
# Test delegazione get_all_products
apis.get_all_products()
mock_coinbase_instance.get_all_products.assert_called_once()
# Test delegazione get_historical_prices
apis.get_historical_prices("BTC")
mock_coinbase_instance.get_historical_prices.assert_called_once_with("BTC")
try:
empty_products = market_wrapper.get_products([])
assert isinstance(empty_products, list)
except Exception as e:
pass
def test_wrapper_currency_support(self, market_wrapper):
assert hasattr(market_wrapper, 'currency')
assert isinstance(market_wrapper.currency, str)
assert len(market_wrapper.currency) >= 3 # USD, EUR, etc.
class TestErrorHandling:
"""Test per la gestione degli errori in tutti i wrapper."""
@patch('app.markets.binance_public.Client')
def test_public_binance_error_handling(self, mock_client):
"""Test gestione errori in PublicBinanceAgent."""
mock_client_instance = Mock()
mock_client_instance.get_symbol_ticker.side_effect = Exception("API Error")
mock_client.return_value = mock_client_instance
agent = PublicBinanceAgent()
product = agent.get_product("INVALID")
# Dovrebbe restituire un ProductInfo vuoto invece di sollevare eccezione
assert isinstance(product, ProductInfo)
assert product.id == "INVALID"
assert product.symbol == "INVALID"
@patch('app.markets.cryptocompare.requests.get')
def test_cryptocompare_network_error(self, mock_get):
"""Test gestione errori di rete in CryptoCompareWrapper."""
mock_get.side_effect = Exception("Network Error")
wrapper = CryptoCompareWrapper(api_key="test")
with pytest.raises(Exception):
wrapper.get_product("BTC")
@patch('app.markets.binance.Client')
def test_binance_api_error_in_get_products(self, mock_client):
"""Test gestione errori in BinanceWrapper.get_products."""
mock_client_instance = Mock()
mock_client_instance.get_symbol_ticker.side_effect = Exception("API Error")
mock_client.return_value = mock_client_instance
wrapper = BinanceWrapper(api_key="test", api_secret="test")
products = wrapper.get_products(["BTC", "ETH"])
# Dovrebbe restituire lista vuota invece di sollevare eccezione
assert isinstance(products, list)
assert len(products) == 0
class TestIntegrationScenarios:
"""Test di integrazione per scenari reali."""
def test_wrapper_method_signatures(self):
"""Verifica che tutti i wrapper abbiano le stesse signature dei metodi."""
wrapper_classes = [CoinBaseWrapper, CryptoCompareWrapper, BinanceWrapper, PublicBinanceAgent]
for wrapper_class in wrapper_classes:
# Verifica get_product
assert hasattr(wrapper_class, 'get_product')
# Verifica get_products
assert hasattr(wrapper_class, 'get_products')
# Verifica get_all_products
assert hasattr(wrapper_class, 'get_all_products')
# Verifica get_historical_prices
assert hasattr(wrapper_class, 'get_historical_prices')
def test_productinfo_consistency(self):
"""Test che tutti i metodi from_* di ProductInfo restituiscano oggetti consistenti."""
# Test from_cryptocompare
crypto_data = {
'FROMSYMBOL': 'BTC',
'TOSYMBOL': 'USD',
'PRICE': 50000.0,
'VOLUME24HOUR': 1000000.0
}
crypto_product = ProductInfo.from_cryptocompare(crypto_data)
# Test from_binance
binance_ticker = {'symbol': 'BTCUSDT', 'price': '50000.0'}
binance_24h = {'volume': '1000000.0'}
binance_product = ProductInfo.from_binance(binance_ticker, binance_24h)
# Verifica che entrambi abbiano gli stessi campi
assert hasattr(crypto_product, 'id')
assert hasattr(crypto_product, 'symbol')
assert hasattr(crypto_product, 'price')
assert hasattr(crypto_product, 'volume_24h')
assert hasattr(binance_product, 'id')
assert hasattr(binance_product, 'symbol')
assert hasattr(binance_product, 'price')
assert hasattr(binance_product, 'volume_24h')
def test_price_consistency(self):
"""Test che tutti i metodi from_* di Price restituiscano oggetti consistenti."""
# Test from_cryptocompare
crypto_data = {
'high': 51000.0,
'low': 49000.0,
'open': 50000.0,
'close': 50500.0,
'volumeto': 1000.0,
'time': 1704067200
}
crypto_price = Price.from_cryptocompare(crypto_data)
# Verifica che abbia tutti i campi richiesti
assert hasattr(crypto_price, 'high')
assert hasattr(crypto_price, 'low')
assert hasattr(crypto_price, 'open')
assert hasattr(crypto_price, 'close')
assert hasattr(crypto_price, 'volume')
assert hasattr(crypto_price, 'time')
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,11 +1,11 @@
import pytest
from src.app.agents.predictor import PREDICTOR_INSTRUCTIONS, PredictorInput, PredictorOutput, PredictorStyle
from src.app.markets.base import ProductInfo
from src.app.models import AppModels
from app.predictor import PREDICTOR_INSTRUCTIONS, PredictorInput, PredictorOutput, PredictorStyle
from app.markets.base import ProductInfo
from app.models import AppModels
def unified_checks(model: AppModels, user_input):
llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type]
result = llm.run(user_input)
def unified_checks(model: AppModels, input):
llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput)
result = llm.run(input)
content = result.content
assert isinstance(content, PredictorOutput)