Integrato 3 fonti ma da finire binance. Fatto una struttura con i signers per i servizi.

This commit is contained in:
Simone Garau
2025-09-23 18:04:52 +02:00
parent bc6548c6fb
commit 995048831c
25 changed files with 2577 additions and 33 deletions

View File

@@ -14,6 +14,22 @@ ANTHROPIC_API_KEY=
DEEPSEEK_API_KEY= DEEPSEEK_API_KEY=
OPENAI_API_KEY= OPENAI_API_KEY=
# Coinbase CDP API per Market Agent
# Ottenibili da: https://portal.cdp.coinbase.com/access/api
CDP_API_KEY_NAME=organizations/your-org-id/apiKeys/your-key-id
CDP_API_PRIVATE_KEY=-----BEGIN EC PRIVATE KEY-----
YOUR_ACTUAL_PRIVATE_KEY_HERE
-----END EC PRIVATE KEY-----
# CryptoCompare API per Market Agent (alternativa)
# Ottenibile da: https://www.cryptocompare.com/cryptopian/api-keys
CRYPTOCOMPARE_API_KEY=
CRYPTOCOMPARE_AUTH_METHOD=query
# Binance API per Market Agent (alternativa)
BINANCE_API_KEY=
BINANCE_API_SECRET=
# Dipende dal sistema operativo # Dipende dal sistema operativo
# windows: C:\Users\<user>\.ollama # windows: C:\Users\<user>\.ollama
# mac: /Users/<user>/.ollama # mac: /Users/<user>/.ollama

8
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/

View File

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="uv (upo-appAI)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="uv (upo-appAI)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/upo-appAI.iml" filepath="$PROJECT_DIR$/.idea/upo-appAI.iml" />
</modules>
</component>
</project>

18
.idea/upo-appAI.iml generated Normal file
View File

@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="uv (upo-appAI)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="py.test" />
</component>
</module>

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

7
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}

View File

@@ -95,11 +95,18 @@ Il file `.env` verrà automaticamente caricato nel container grazie alla configu
## Aggiornamento del 19 Giugno 2024 ## Aggiornamento del 19 Giugno 2024
Usando la libreria ``gradio`` è stata creata un'interfaccia web semplice per interagire con gli agenti. Gli agenti si trovano Usando la libreria ``gradio`` è stata creata un'interfaccia web semplice per interagire con gli agenti. Gli agenti si trovano
nella cartella `src/app/agents` e sono: nella cartella `src/app/agents` e sono:
- **Market Agent**: Recupera i dati di mercato (prezzi, volumi, ecc.). ***MOCK*** - **Market Agent**: Agente unificato che supporta multiple fonti di dati (Coinbase + CryptoCompare) con auto-configurazione
- **News Agent**: Recupera le notizie finanziarie più recenti utilizzando. ***MOCK*** - **News Agent**: Recupera le notizie finanziarie più recenti utilizzando. ***MOCK***
- **Social Agent**: Analizza i sentimenti sui social media utilizzando. ***MOCK*** - **Social Agent**: Analizza i sentimenti sui social media utilizzando. ***MOCK***
- **Predictor Agent**: Utilizza i dati raccolti dagli altri agenti per fare previsioni. - **Predictor Agent**: Utilizza i dati raccolti dagli altri agenti per fare previsioni.
### Market Agent Features:
- **Auto-configurazione**: Configura automaticamente i provider disponibili basandosi sulle env vars
- **Multiple provider**: Supporta sia Coinbase (trading) che CryptoCompare (market data)
- **Fallback intelligente**: Se un provider fallisce, prova automaticamente altri
- **Interfaccia unificata**: Un'unica API per accedere a tutti i provider
- **Provider-specific methods**: Accesso diretto alle funzionalità specifiche di ogni provider
L'applicazione principale si trova in `src/app.py` e può essere eseguita con il comando: L'applicazione principale si trova in `src/app.py` e può essere eseguita con il comando:
```sh ```sh
uv run python src/app.py uv run python src/app.py
@@ -112,6 +119,7 @@ upo-appAI/
├── LICENSE ├── LICENSE
├── README.md ├── README.md
├── docker-compose.yaml ├── docker-compose.yaml
├── pytest.ini # Configurazione pytest
├── docs/ ├── docs/
├── pyproject.toml ├── pyproject.toml
├── requirements.txt ├── requirements.txt
@@ -123,14 +131,23 @@ upo-appAI/
│ │ ├── agents/ │ │ ├── agents/
│ │ │ ├── __init__.py │ │ │ ├── __init__.py
│ │ │ ├── __pycache__/ │ │ │ ├── __pycache__/
│ │ │ ├── market_agent.py │ │ │ ├── market_agent.py # Unified market agent (Coinbase + CryptoCompare)
│ │ │ ├── news_agent.py │ │ │ ├── news_agent.py
│ │ │ ├── predictor_agent.py │ │ │ ├── predictor_agent.py
│ │ │ └── social_agent.py │ │ │ └── social_agent.py
│ │ ├── signers/
│ │ │ ├── __init__.py
│ │ │ ├── coinbase_signer.py # Coinbase authentication
│ │ │ └── cryptocompare_signer.py # CryptoCompare authentication
│ │ └── tool.py │ │ └── tool.py
│ ├── app.py │ ├── app.py
│ ├── example.py │ ├── example.py
│ └── ollama_demo.py │ └── ollama_demo.py
├── tests/
│ ├── conftest.py # Configurazione pytest globale
│ └── agents/
│ └── test_market_agents.py # Test suite pytest per market agent
├── market_agent_demo.py # Demo script
└── uv.lock └── uv.lock
``` ```
@@ -141,8 +158,44 @@ upo-appAI/
2. Ollama viene correttamente triggerato dalla selezione da interfaccia web ma la generazione della risposta non viene parsificata correttamente. 2. Ollama viene correttamente triggerato dalla selezione da interfaccia web ma la generazione della risposta non viene parsificata correttamente.
### ToDo ### ToDo
1. Per lo scraping online bisogna iscriversi e recuperare le chiavi API 1. ~~Per lo scraping online bisogna iscriversi e recuperare le chiavi API~~
2. **Market Agent**: [CoinGecko](https://www.coingecko.com/it) 2. **Market Agent**: ✅ [CryptoCompare](https://www.cryptocompare.com/cryptopian/api-keys) (implementato)
3. **News Agent**: [CryptoPanic](https://cryptopanic.com/) 3. **Market Agent**: [Coinbase](https://www.coinbase.com/cloud/discover/api-keys) (implementato)
4. **Social Agent**: [post più hot da r/CryptoCurrency (Reddit)](https://www.reddit.com/) 4. **News Agent**: [CryptoPanic](https://cryptopanic.com/)
5. Capire come `gpt-oss` parsifica la risposta e per questioni "estetiche" si può pensare di visualizzare lo stream dei token. Vedere il sorgente `src/ollama_demo.py` per risolvere il problema. 5. **Social Agent**: [post più hot da r/CryptoCurrency (Reddit)](https://www.reddit.com/)
6. Capire come `gpt-oss` parsifica la risposta e per questioni "estetiche" si può pensare di visualizzare lo stream dei token. Vedere il sorgente `src/ollama_demo.py` per risolvere il problema.
### Test Market Agent
Per testare il market agent implementato, puoi usare diversi metodi:
**Test con pytest** (raccomandato):
```sh
# Esegui tutti i test
uv run pytest tests/agents/test_market_agents.py -v
# Esegui solo test veloci (esclude test API lenti)
uv run pytest tests/agents/test_market_agents.py -v -m "not slow"
# Esegui solo test che richiedono API
uv run pytest tests/agents/test_market_agents.py -v -m "api"
# Esegui test con output dettagliato
uv run pytest tests/agents/test_market_agents.py -v -s
```
**Test standalone** (compatibilità):
```sh
uv run python tests/agents/test_market_agents.py
```
**Demo interattivo**:
```sh
uv run python market_agent_demo.py
```
**Test rapido**:
```sh
uv run python -c "from src.app.agents.market_agent import MarketAgent; agent = MarketAgent(); print('Providers:', agent.get_available_providers()); print(agent.analyze('test'))"
```
Il MarketAgent si auto-configura basandosi sulle variabili disponibili nel tuo .env.

116
demos/cdp_market_demo.py Normal file
View File

@@ -0,0 +1,116 @@
#!/usr/bin/env python3
"""
Demo script per testare il MarketAgent aggiornato con Coinbase CDP
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from src.app.agents.market_agent import MarketAgent
import logging
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
def main():
print("🚀 Test MarketAgent con Coinbase CDP")
print("=" * 50)
# Inizializza l'agent
agent = MarketAgent()
# Verifica provider disponibili
providers = agent.get_available_providers()
print(f"📡 Provider disponibili: {providers}")
if not providers:
print("⚠️ Nessun provider configurato. Verifica il file .env")
print("\nPer Coinbase CDP, serve:")
print("CDP_API_KEY_NAME=your_key_name")
print("CDP_API_PRIVATE_KEY=your_private_key")
print("\nPer CryptoCompare, serve:")
print("CRYPTOCOMPARE_API_KEY=your_api_key")
return
# Mostra capabilities di ogni provider
for provider in providers:
capabilities = agent.get_provider_capabilities(provider)
print(f"🔧 {provider.upper()}: {capabilities}")
print("\n" + "=" * 50)
# Test ottenimento prezzo singolo
test_symbols = ["BTC", "ETH", "ADA"]
for symbol in test_symbols:
print(f"\n💰 Prezzo {symbol}:")
# Prova ogni provider
for provider in providers:
try:
price = agent.get_asset_price(symbol, provider)
if price:
print(f" {provider}: ${price:,.2f}")
else:
print(f" {provider}: N/A")
except Exception as e:
print(f" {provider}: Errore - {e}")
print("\n" + "=" * 50)
# Test market overview
print("\n📊 Market Overview:")
try:
overview = agent.get_market_overview(["BTC", "ETH", "ADA", "DOT"])
if overview["data"]:
print(f"📡 Fonte: {overview['source']}")
for crypto, prices in overview["data"].items():
if isinstance(prices, dict):
usd_price = prices.get("USD", "N/A")
eur_price = prices.get("EUR", "N/A")
if eur_price != "N/A":
print(f" {crypto}: ${usd_price} (€{eur_price})")
else:
print(f" {crypto}: ${usd_price}")
else:
print("⚠️ Nessun dato disponibile")
except Exception as e:
print(f"❌ Errore nel market overview: {e}")
print("\n" + "=" * 50)
# Test funzione analyze
print("\n🔍 Analisi mercato:")
try:
analysis = agent.analyze("Market overview")
print(analysis)
except Exception as e:
print(f"❌ Errore nell'analisi: {e}")
# Test specifico Coinbase CDP se disponibile
if 'coinbase' in providers:
print("\n" + "=" * 50)
print("\n🏦 Test specifico Coinbase CDP:")
try:
# Test asset singolo
btc_info = agent.get_coinbase_asset_info("BTC")
print(f"📈 BTC Info: {btc_info}")
# Test asset multipli
multi_assets = agent.get_coinbase_multiple_assets(["BTC", "ETH"])
print(f"📊 Multi Assets: {multi_assets}")
except Exception as e:
print(f"❌ Errore nel test Coinbase CDP: {e}")
if __name__ == "__main__":
main()

100
demos/market_agent_demo.py Normal file
View File

@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""
Esempio di utilizzo del MarketAgent unificato.
Questo script mostra come utilizzare il nuovo MarketAgent che supporta
multiple fonti di dati (Coinbase e CryptoCompare).
"""
import sys
from pathlib import Path
# Aggiungi il path src al PYTHONPATH
src_path = Path(__file__).parent / "src"
sys.path.insert(0, str(src_path))
from dotenv import load_dotenv
from app.agents.market_agent import MarketAgent
# Carica variabili d'ambiente
load_dotenv()
def main():
print("🚀 Market Agent Demo\n")
try:
# Inizializza il market agent (auto-configura i provider disponibili)
agent = MarketAgent()
# Mostra provider disponibili
providers = agent.get_available_providers()
print(f"📡 Available providers: {providers}")
if not providers:
print("❌ No providers configured. Please check your .env file.")
print("Required variables:")
print(" For Coinbase: COINBASE_API_KEY, COINBASE_SECRET, COINBASE_PASSPHRASE")
print(" For CryptoCompare: CRYPTOCOMPARE_API_KEY")
return
# Mostra le capacità di ogni provider
print("\n🔧 Provider capabilities:")
for provider in providers:
capabilities = agent.get_provider_capabilities(provider)
print(f" {provider}: {capabilities}")
# Ottieni panoramica del mercato
print("\n📊 Market Overview:")
overview = agent.get_market_overview(["BTC", "ETH", "ADA"])
print(f"Data source: {overview.get('source', 'Unknown')}")
for crypto, prices in overview.get('data', {}).items():
if isinstance(prices, dict):
usd = prices.get('USD', 'N/A')
eur = prices.get('EUR', 'N/A')
if eur != 'N/A':
print(f" {crypto}: ${usd} (€{eur})")
else:
print(f" {crypto}: ${usd}")
# Analisi completa del mercato
print("\n📈 Market Analysis:")
analysis = agent.analyze("comprehensive market analysis")
print(analysis)
# Test specifici per provider (se disponibili)
if 'cryptocompare' in providers:
print("\n🔸 CryptoCompare specific test:")
try:
btc_price = agent.get_single_crypto_price("BTC", "USD")
print(f" BTC price: ${btc_price}")
top_coins = agent.get_top_cryptocurrencies(5)
if top_coins.get('Data'):
print(" Top 5 cryptocurrencies by market cap:")
for coin in top_coins['Data'][:3]: # Show top 3
coin_info = coin.get('CoinInfo', {})
display = coin.get('DISPLAY', {}).get('USD', {})
name = coin_info.get('FullName', 'Unknown')
price = display.get('PRICE', 'N/A')
print(f" {name}: {price}")
except Exception as e:
print(f" CryptoCompare test failed: {e}")
if 'coinbase' in providers:
print("\n🔸 Coinbase specific test:")
try:
ticker = agent.get_coinbase_ticker("BTC-USD")
price = ticker.get('price', 'N/A')
volume = ticker.get('volume_24h', 'N/A')
print(f" BTC-USD: ${price} (24h volume: {volume})")
except Exception as e:
print(f" Coinbase test failed: {e}")
print("\n✅ Demo completed successfully!")
except Exception as e:
print(f"❌ Demo failed: {e}")
print("Make sure you have configured at least one provider in your .env file.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,96 @@
# 🚀 Piano di Implementazione - Market Data Enhancement
## 📋 Roadmap Implementazioni
### **Fase 1: Binance Mock Provider**
**Obiettivo**: Aggiungere terzo provider per test aggregazione
- ✅ Creare `binance_signer.py` con mock data
- ✅ Integrare nel MarketAgent
- ✅ Testare detection automatica provider
- **Deliverable**: 3 provider funzionanti (Coinbase, CryptoCompare, Binance)
### **Fase 2: Interrogazione Condizionale**
**Obiettivo**: Auto-detection credenziali e interrogazione intelligente
- ✅ Migliorare detection chiavi API nel MarketAgent
- ✅ Skip provider se credenziali mancanti (no errori)
- ✅ Logging informativo per provider disponibili/non disponibili
- ✅ Gestione graceful degradation
- **Deliverable**: Sistema resiliente che funziona con qualsiasi combinazione di provider
### **Fase 3: Interrogazione Asincrona + Aggregazione JSON**
**Obiettivo**: Performance boost e formato dati professionale
#### **3A. Implementazione Asincrona**
- ✅ Refactor MarketAgent per supporto `async/await`
- ✅ Chiamate parallele a tutti i provider disponibili
- ✅ Timeout management per provider lenti
- ✅ Error handling per provider che falliscono
#### **3B. Aggregazione Dati Intelligente**
- ✅ Calcolo `confidence` basato su concordanza prezzi
- ✅ Analisi `spread` tra provider
- ✅ Detection `price_divergence` per anomalie
- ✅ Volume trend analysis
- ✅ Formato JSON strutturato:
```json
{
"aggregated_data": {
"BTC_USD": {
"price": 43250.12,
"confidence": 0.95,
"sources_count": 4
}
},
"individual_sources": {
"coinbase": {"price": 43245.67, "volume": "1.2M"},
"binance": {"price": 43255.89, "volume": "2.1M"},
"cryptocompare": {"price": 43248.34, "volume": "0.8M"}
},
"market_signals": {
"spread_analysis": "Low spread (0.02%) indicates healthy liquidity",
"volume_trend": "Volume up 15% from 24h average",
"price_divergence": "Max deviation: 0.05% - Normal range"
}
}
```
**Deliverable**: Sistema asincrono con analisi avanzata dei dati di mercato
## 🎯 Benefici Attesi
### **Performance**
- ⚡ Tempo risposta: da ~4s sequenziali a ~1s paralleli
- 🔄 Resilienza: sistema funziona anche se 1-2 provider falliscono
- 📊 Qualità dati: validazione incrociata tra provider
### **Professionalità**
- 📈 Confidence scoring per decisioni informate
- 🔍 Market signals per trading insights
- 📋 Formato standardizzato per integrazioni future
### **Scalabilità**
- Facile aggiunta nuovi provider
- 🔧 Configurazione flessibile via environment
- 📝 Logging completo per debugging
## 🧪 Test Strategy
1. **Unit Tests**: Ogni provider singolarmente
2. **Integration Tests**: Aggregazione multi-provider
3. **Performance Tests**: Confronto sync vs async
4. **Resilience Tests**: Fallimento provider singoli
5. **E2E Tests**: Full workflow con UI Gradio
## 📅 Timeline Stimata
- **Fase 1**: ~1h (setup Binance mock)
- **Fase 2**: ~1h (detection condizionale)
- **Fase 3**: ~2-3h (async + aggregazione)
- **Testing**: ~1h (validation completa)
**Total**: ~5-6h di lavoro strutturato
---
*Documento creato: 2025-09-23*
*Versione: 1.0*

View File

@@ -10,15 +10,16 @@ requires-python = "==3.12.*"
# Per ogni roba ho fatto un commento per evitare di dimenticarmi cosa fa chi. # Per ogni roba ho fatto un commento per evitare di dimenticarmi cosa fa chi.
# Inoltre ho messo una emoji per indicare se è raccomandato o meno. # Inoltre ho messo una emoji per indicare se è raccomandato o meno.
dependencies = [ dependencies = [
# ✅ per i test
"pytest",
# ✅ per gestire variabili d'ambiente (generalmente API keys od opzioni) # ✅ per gestire variabili d'ambiente (generalmente API keys od opzioni)
"dotenv", "dotenv",
# 🟡 per fare scraping di pagine web # 🟡 per fare scraping di pagine web
#"bs4", #"bs4",
# ✅ per fare una UI web semplice con input e output # ✅ per fare una UI web semplice con input e output
"gradio", "gradio",
# ✅ per la crittografia (necessaria per autenticazione Coinbase)
"cryptography",
# ❌ per l'elaborazione del linguaggio naturale in locale (https://huggingface.co/learn/llm-course/chapter1/3?fw=pt) # ❌ per l'elaborazione del linguaggio naturale in locale (https://huggingface.co/learn/llm-course/chapter1/3?fw=pt)
#"transformers", #"transformers",
# ❌ per fare chiamate a modelli indipendentemente dal modello specifico (astrae meglio rispetto a openai) # ❌ per fare chiamate a modelli indipendentemente dal modello specifico (astrae meglio rispetto a openai)
@@ -33,4 +34,8 @@ dependencies = [
"openai", "openai",
"anthropic", "anthropic",
"google", "google",
"coinbase-advanced-py",
"cryptocompare",
"cdp-sdk",
"python-binance"
] ]

34
pytest.ini Normal file
View File

@@ -0,0 +1,34 @@
[tool:pytest]
# Configurazione pytest per upo-appAI
# Directory dei test
testpaths = tests
# Pattern per trovare i file di test
python_files = test_*.py *_test.py
# Pattern per trovare le classi di test
python_classes = Test*
# Pattern per trovare le funzioni di test
python_functions = test_*
# Opzioni di default
addopts =
-v
--tb=short
--strict-markers
--disable-warnings
# Marker personalizzati
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
api: marks tests that require API access
coinbase: marks tests that require Coinbase credentials
cryptocompare: marks tests that require CryptoCompare credentials
integration: marks tests as integration tests
# Filtri per warnings
filterwarnings =
ignore::DeprecationWarning
ignore::PendingDeprecationWarning

View File

@@ -1,4 +1,10 @@
import gradio as gr import gradio as gr
import sys
import os
# Aggiungi src al Python path per gli import
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.tool import ToolAgent from app.tool import ToolAgent
tool_agent = ToolAgent() tool_agent = ToolAgent()

View File

@@ -1,5 +1,268 @@
from typing import Dict, List, Optional, Any
import requests
import logging
import os
from dotenv import load_dotenv
from app.signers.market_signers.coinbase_rest_signer import CoinbaseCDPSigner
from app.signers.market_signers.cryptocompare_signer import CryptoCompareSigner
load_dotenv()
logger = logging.getLogger(__name__)
class MarketAgent: class MarketAgent:
@staticmethod """
def analyze(query: str) -> str: Market Agent unificato che supporta multiple fonti di dati:
# Mock analisi mercato - Coinbase Advanced Trade API (dati di mercato reali)
return "📊 Analisi di mercato: BTC stabile, ETH in leggera crescita, altcoin volatili." - CryptoCompare (market data)
Auto-configura i provider basandosi sulle variabili d'ambiente disponibili.
"""
def __init__(self):
self.providers = {}
self._setup_providers()
if not self.providers:
logger.warning("No market data providers configured. Check your .env file.")
def _setup_providers(self):
"""Configura automaticamente i provider disponibili"""
# Setup Coinbase Advanced Trade API (nuovo formato)
cdp_api_key_name = os.getenv('CDP_API_KEY_NAME')
cdp_api_private_key = os.getenv('CDP_API_PRIVATE_KEY')
if cdp_api_key_name and cdp_api_private_key:
try:
signer = CoinbaseCDPSigner(cdp_api_key_name, cdp_api_private_key)
self.providers['coinbase'] = {
'type': 'coinbase_advanced_trade',
'signer': signer,
'capabilities': ['assets', 'market_data', 'trading', 'real_time_prices']
}
logger.info("✅ Coinbase Advanced Trade API provider configured")
except Exception as e:
logger.error(f"Failed to setup Coinbase Advanced Trade API provider: {e}")
# Setup CryptoCompare se la API key è disponibile
cryptocompare_key = os.getenv('CRYPTOCOMPARE_API_KEY')
if cryptocompare_key:
try:
auth_method = os.getenv('CRYPTOCOMPARE_AUTH_METHOD', 'query')
signer = CryptoCompareSigner(cryptocompare_key, auth_method)
self.providers['cryptocompare'] = {
'type': 'cryptocompare',
'signer': signer,
'base_url': 'https://min-api.cryptocompare.com',
'capabilities': ['prices', 'historical', 'top_coins']
}
logger.info("✅ CryptoCompare provider configured")
except Exception as e:
logger.error(f"Failed to setup CryptoCompare provider: {e}")
def get_available_providers(self) -> List[str]:
"""Restituisce la lista dei provider configurati"""
return list(self.providers.keys())
def get_provider_capabilities(self, provider: str) -> List[str]:
"""Restituisce le capacità di un provider specifico"""
if provider in self.providers:
return self.providers[provider]['capabilities']
return []
# === COINBASE CDP METHODS ===
def get_coinbase_asset_info(self, symbol: str) -> Dict:
"""Ottiene informazioni su un asset da Coinbase CDP"""
if 'coinbase' not in self.providers:
raise ValueError("Coinbase provider not configured")
signer = self.providers['coinbase']['signer']
return signer.get_asset_info(symbol)
def get_coinbase_multiple_assets(self, symbols: List[str]) -> Dict:
"""Ottiene informazioni su multipli asset da Coinbase CDP"""
if 'coinbase' not in self.providers:
raise ValueError("Coinbase provider not configured")
signer = self.providers['coinbase']['signer']
return signer.get_multiple_assets(symbols)
def get_asset_price(self, symbol: str, provider: str = None) -> Optional[float]:
"""
Ottiene il prezzo di un asset usando il provider specificato o il primo disponibile
"""
if provider == 'coinbase' and 'coinbase' in self.providers:
try:
asset_info = self.get_coinbase_asset_info(symbol)
return float(asset_info.get('price', 0))
except Exception as e:
logger.error(f"Error getting {symbol} price from Coinbase: {e}")
return None
elif provider == 'cryptocompare' and 'cryptocompare' in self.providers:
try:
return self.get_single_crypto_price(symbol)
except Exception as e:
logger.error(f"Error getting {symbol} price from CryptoCompare: {e}")
return None
# Auto-select provider
if 'cryptocompare' in self.providers:
try:
return self.get_single_crypto_price(symbol)
except Exception:
pass
if 'coinbase' in self.providers:
try:
asset_info = self.get_coinbase_asset_info(symbol)
return float(asset_info.get('price', 0))
except Exception:
pass
return None
# === CRYPTOCOMPARE METHODS ===
def _cryptocompare_request(self, endpoint: str, params: Dict = None) -> Dict:
"""Esegue una richiesta CryptoCompare autenticata"""
if 'cryptocompare' not in self.providers:
raise ValueError("CryptoCompare provider not configured")
provider = self.providers['cryptocompare']
request_data = provider['signer'].prepare_request(
provider['base_url'], endpoint, params
)
response = requests.get(
request_data['url'],
headers=request_data['headers'],
timeout=10
)
response.raise_for_status()
return response.json()
def get_crypto_prices(self, from_symbols: List[str], to_symbols: List[str] = None) -> Dict:
"""Ottiene prezzi da CryptoCompare"""
if to_symbols is None:
to_symbols = ["USD", "EUR"]
params = {
"fsyms": ",".join(from_symbols),
"tsyms": ",".join(to_symbols)
}
return self._cryptocompare_request("/data/pricemulti", params)
def get_single_crypto_price(self, from_symbol: str, to_symbol: str = "USD") -> float:
"""Ottiene il prezzo di una singola crypto da CryptoCompare"""
params = {
"fsym": from_symbol,
"tsyms": to_symbol
}
data = self._cryptocompare_request("/data/price", params)
return data.get(to_symbol, 0.0)
def get_top_cryptocurrencies(self, limit: int = 10, to_symbol: str = "USD") -> Dict:
"""Ottiene le top crypto per market cap da CryptoCompare"""
params = {
"limit": str(limit),
"tsym": to_symbol
}
return self._cryptocompare_request("/data/top/mktcapfull", params)
# === UNIFIED INTERFACE ===
def get_market_overview(self, symbols: List[str] = None) -> Dict:
"""
Ottiene una panoramica del mercato usando il miglior provider disponibile
"""
if symbols is None:
symbols = ["BTC", "ETH", "ADA"]
result = {
"timestamp": None,
"data": {},
"source": None,
"providers_used": []
}
# Prova CryptoCompare per prezzi multipli (più completo)
if 'cryptocompare' in self.providers:
try:
prices = self.get_crypto_prices(symbols, ["USD", "EUR"])
result["data"] = prices
result["source"] = "cryptocompare"
result["providers_used"].append("cryptocompare")
logger.info("Market overview retrieved from CryptoCompare")
except Exception as e:
logger.warning(f"CryptoCompare failed, trying fallback: {e}")
# Fallback a Coinbase Advanced Trade se CryptoCompare fallisce
if not result["data"] and 'coinbase' in self.providers:
try:
# Usa il nuovo metodo Advanced Trade per ottenere multipli asset
coinbase_data = self.get_coinbase_multiple_assets(symbols)
if coinbase_data:
# Trasforma i dati Advanced Trade nel formato standard
formatted_data = {}
for symbol in symbols:
if symbol in coinbase_data:
formatted_data[symbol] = {
"USD": coinbase_data[symbol].get("price", 0)
}
result["data"] = formatted_data
result["source"] = "coinbase_advanced_trade"
result["providers_used"].append("coinbase")
logger.info("Market overview retrieved from Coinbase Advanced Trade API")
except Exception as e:
logger.error(f"Coinbase Advanced Trade fallback failed: {e}")
return result
def analyze(self, query: str) -> str:
"""
Analizza il mercato usando tutti i provider disponibili
"""
if not self.providers:
return "⚠️ Nessun provider di dati di mercato configurato. Controlla il file .env."
try:
# Ottieni panoramica del mercato
overview = self.get_market_overview(["BTC", "ETH", "ADA", "DOT"])
if not overview["data"]:
return "⚠️ Impossibile recuperare dati di mercato da nessun provider."
# Formatta i risultati
result_lines = [
f"📊 **Market Analysis** (via {overview['source'].upper()})\n"
]
for crypto, prices in overview["data"].items():
if isinstance(prices, dict):
usd_price = prices.get("USD", "N/A")
eur_price = prices.get("EUR", "N/A")
if eur_price != "N/A":
result_lines.append(f"**{crypto}**: ${usd_price} (€{eur_price})")
else:
result_lines.append(f"**{crypto}**: ${usd_price}")
# Aggiungi info sui provider
providers_info = f"\n🔧 **Available providers**: {', '.join(self.get_available_providers())}"
result_lines.append(providers_info)
return "\n".join(result_lines)
except Exception as e:
logger.error(f"Market analysis failed: {e}")
return f"⚠️ Errore nell'analisi del mercato: {e}"

View File

View File

@@ -0,0 +1,27 @@
# Versione pubblica senza autenticazione
from binance.client import Client
class PublicBinanceAgent:
def __init__(self):
# Client pubblico (senza credenziali)
self.client = Client()
def get_public_prices(self):
"""Ottiene prezzi pubblici"""
try:
btc_price = self.client.get_symbol_ticker(symbol="BTCUSDT")
eth_price = self.client.get_symbol_ticker(symbol="ETHUSDT")
return {
'BTC_USD': float(btc_price['price']),
'ETH_USD': float(eth_price['price']),
'source': 'binance_public'
}
except Exception as e:
print(f"Errore: {e}")
return None
# Uso senza credenziali
public_agent = PublicBinanceAgent()
public_prices = public_agent.get_public_prices()
print(public_prices)

View File

@@ -0,0 +1,186 @@
import os
import logging
from cdp import *
from typing import Dict, List, Any, Optional
logger = logging.getLogger(__name__)
class CoinbaseCDPSigner:
"""
Signer per Coinbase Developer Platform (CDP) SDK.
Utilizza il nuovo sistema di autenticazione di Coinbase basato su CDP.
"""
def __init__(self, api_key_name: str = None, api_private_key: str = None):
"""
Inizializza il CDP signer.
Args:
api_key_name: Nome della API key (formato: organizations/org-id/apiKeys/key-id)
api_private_key: Private key in formato PEM
"""
self.api_key_name = api_key_name or os.getenv('CDP_API_KEY_NAME')
self.api_private_key = api_private_key or os.getenv('CDP_API_PRIVATE_KEY')
if not self.api_key_name or not self.api_private_key:
raise ValueError("CDP_API_KEY_NAME and CDP_API_PRIVATE_KEY are required")
# Configura CDP client
try:
self.client = CdpClient(
api_key_id=self.api_key_name,
api_key_secret=self.api_private_key,
debugging=False
)
self._configured = True
logger.info(f"✅ CDP Client configured with key: {self.api_key_name[:50]}...")
except Exception as e:
self._configured = False
logger.error(f"Failed to configure CDP Client: {e}")
raise ValueError(f"Failed to configure CDP SDK: {e}")
def is_configured(self) -> bool:
"""Verifica se CDP è configurato correttamente"""
return getattr(self, '_configured', False)
def get_asset_info(self, asset_id: str) -> Dict[str, Any]:
"""
Ottiene informazioni su un asset specifico.
Args:
asset_id: ID dell'asset (es. "BTC", "ETH")
Returns:
Dict con informazioni sull'asset
"""
if not self.is_configured():
return {
'asset_id': asset_id,
'error': 'CDP Client not configured',
'success': False
}
try:
# Per ora, restituiamo un mock data structure
# In futuro, quando CDP avrà metodi per asset info, useremo quelli
return {
'asset_id': asset_id,
'price': self._get_mock_price(asset_id),
'symbol': asset_id,
'name': self._get_asset_name(asset_id),
'success': True,
'source': 'cdp_mock'
}
except Exception as e:
logger.error(f"Error getting asset info for {asset_id}: {e}")
return {
'asset_id': asset_id,
'error': str(e),
'success': False
}
def get_multiple_assets(self, asset_ids: List[str]) -> Dict[str, Any]:
"""
Ottiene informazioni su multipli asset.
Args:
asset_ids: Lista di ID degli asset
Returns:
Dict con informazioni sugli asset
"""
if not self.is_configured():
return {
'error': 'CDP Client not configured',
'success': False
}
results = {}
for asset_id in asset_ids:
asset_info = self.get_asset_info(asset_id)
if asset_info.get('success'):
results[asset_id] = asset_info
return results
def _get_mock_price(self, asset_id: str) -> float:
"""
Mock prices per i test - da sostituire con vere API CDP quando disponibili
"""
mock_prices = {
'BTC': 63500.0,
'ETH': 2650.0,
'ADA': 0.45,
'DOT': 5.2,
'SOL': 145.0,
'MATIC': 0.85,
'LINK': 11.2,
'UNI': 7.8
}
return mock_prices.get(asset_id.upper(), 100.0)
def _get_asset_name(self, asset_id: str) -> str:
"""
Mock asset names
"""
names = {
'BTC': 'Bitcoin',
'ETH': 'Ethereum',
'ADA': 'Cardano',
'DOT': 'Polkadot',
'SOL': 'Solana',
'MATIC': 'Polygon',
'LINK': 'Chainlink',
'UNI': 'Uniswap'
}
return names.get(asset_id.upper(), f"{asset_id} Token")
# Metodi di compatibilità con l'interfaccia precedente
def build_headers(self, method: str, request_path: str, body: Optional[Dict] = None) -> Dict[str, str]:
"""
Metodo di compatibilità - CDP SDK gestisce internamente l'autenticazione.
Restituisce headers basic.
"""
return {
'Content-Type': 'application/json',
'User-Agent': 'upo-appAI/1.0-cdp'
}
def sign_request(self, method: str, request_path: str, body: Optional[Dict] = None) -> Dict[str, Any]:
"""
Metodo di compatibilità - CDP SDK gestisce internamente l'autenticazione.
"""
return {
'method': method,
'path': request_path,
'body': body or {},
'headers': self.build_headers(method, request_path, body),
'cdp_configured': self.is_configured()
}
def test_connection(self) -> Dict[str, Any]:
"""
Testa la connessione CDP
"""
try:
if not self.is_configured():
return {
'success': False,
'error': 'CDP Client not configured'
}
# Test basic con mock data
test_asset = self.get_asset_info('BTC')
return {
'success': test_asset.get('success', False),
'client_configured': True,
'test_asset': test_asset.get('asset_id'),
'message': 'CDP Client is working with mock data'
}
except Exception as e:
return {
'success': False,
'error': str(e),
'client_configured': False
}

View File

@@ -0,0 +1,243 @@
import os
import logging
from coinbase.rest import RESTClient
from typing import Dict, List, Any, Optional
logger = logging.getLogger(__name__)
class CoinbaseCDPSigner:
"""
Signer per Coinbase Advanced Trade API.
Utilizza le credenziali CDP per accedere alle vere API di market data di Coinbase.
"""
def __init__(self, api_key_name: str = None, api_private_key: str = None):
"""
Inizializza il Coinbase REST client.
Args:
api_key_name: Nome della API key (da CDP_API_KEY_NAME)
api_private_key: Private key (da CDP_API_PRIVATE_KEY)
"""
self.api_key_name = api_key_name or os.getenv('CDP_API_KEY_NAME')
self.api_private_key = api_private_key or os.getenv('CDP_API_PRIVATE_KEY')
if not self.api_key_name or not self.api_private_key:
raise ValueError("CDP_API_KEY_NAME and CDP_API_PRIVATE_KEY are required")
# Configura Coinbase REST client
try:
self.client = RESTClient(
api_key=self.api_key_name,
api_secret=self.api_private_key
)
self._configured = True
logger.info(f"✅ Coinbase REST Client configured with key: {self.api_key_name[:50]}...")
except Exception as e:
self._configured = False
logger.error(f"Failed to configure Coinbase REST Client: {e}")
raise ValueError(f"Failed to configure Coinbase REST Client: {e}")
def is_configured(self) -> bool:
"""Verifica se Coinbase REST client è configurato correttamente"""
return getattr(self, '_configured', False)
def get_asset_info(self, asset_id: str) -> Dict[str, Any]:
"""
Ottiene informazioni su un asset specifico usando Coinbase Advanced Trade API.
Args:
asset_id: ID dell'asset (es. "BTC", "ETH")
Returns:
Dict con informazioni sull'asset
"""
if not self.is_configured():
return {
'asset_id': asset_id,
'error': 'Coinbase REST Client not configured',
'success': False
}
try:
# Prova con USD prima, poi EUR se non funziona
product_id = f"{asset_id.upper()}-USD"
product_data = self.client.get_product(product_id)
return {
'asset_id': asset_id,
'symbol': product_data.product_id,
'price': float(product_data.price),
'volume_24h': float(product_data.volume_24h) if product_data.volume_24h else 0,
'status': product_data.status,
'base_currency': product_data.base_currency_id,
'quote_currency': product_data.quote_currency_id,
'success': True,
'source': 'coinbase_advanced_trade'
}
except Exception as e:
logger.error(f"Error getting asset info for {asset_id}: {e}")
return {
'asset_id': asset_id,
'error': str(e),
'success': False
}
def get_multiple_assets(self, asset_ids: List[str]) -> Dict[str, Any]:
"""
Ottiene informazioni su multipli asset.
Args:
asset_ids: Lista di ID degli asset
Returns:
Dict con informazioni sugli asset
"""
if not self.is_configured():
return {
'error': 'Coinbase REST Client not configured',
'success': False
}
results = {}
for asset_id in asset_ids:
asset_info = self.get_asset_info(asset_id)
if asset_info.get('success'):
results[asset_id] = asset_info
return results
def get_all_products(self) -> Dict[str, Any]:
"""
Ottiene lista di tutti i prodotti disponibili su Coinbase.
"""
if not self.is_configured():
return {
'error': 'Coinbase REST Client not configured',
'success': False
}
try:
products = self.client.get_products()
products_data = []
for product in products.products:
if product.status == "online": # Solo prodotti attivi
products_data.append({
'product_id': product.product_id,
'price': float(product.price) if product.price else 0,
'volume_24h': float(product.volume_24h) if product.volume_24h else 0,
'status': product.status,
'base_currency': product.base_currency_id,
'quote_currency': product.quote_currency_id
})
return {
'products': products_data,
'total': len(products_data),
'success': True
}
except Exception as e:
logger.error(f"Error getting products: {e}")
return {
'error': str(e),
'success': False
}
def get_market_trades(self, symbol: str = "BTC-USD", limit: int = 10) -> Dict[str, Any]:
"""
Ottiene gli ultimi trade di mercato per un prodotto.
Args:
symbol: Simbolo del prodotto (es. "BTC-USD")
limit: Numero massimo di trade da restituire
Returns:
Dict con i trade
"""
if not self.is_configured():
return {
'error': 'Coinbase REST Client not configured',
'success': False
}
try:
trades = self.client.get_market_trades(product_id=symbol, limit=limit)
trades_data = []
for trade in trades.trades:
trades_data.append({
'trade_id': trade.trade_id,
'price': float(trade.price),
'size': float(trade.size),
'time': trade.time,
'side': trade.side
})
return {
'symbol': symbol,
'trades': trades_data,
'count': len(trades_data),
'success': True
}
except Exception as e:
logger.error(f"Error getting market trades for {symbol}: {e}")
return {
'symbol': symbol,
'error': str(e),
'success': False
}
# Metodi di compatibilità con l'interfaccia precedente
def build_headers(self, method: str, request_path: str, body: Optional[Dict] = None) -> Dict[str, str]:
"""
Metodo di compatibilità - Coinbase REST client gestisce internamente l'autenticazione.
"""
return {
'Content-Type': 'application/json',
'User-Agent': 'upo-appAI/1.0-coinbase-advanced'
}
def sign_request(self, method: str, request_path: str, body: Optional[Dict] = None) -> Dict[str, Any]:
"""
Metodo di compatibilità - Coinbase REST client gestisce internamente l'autenticazione.
"""
return {
'method': method,
'path': request_path,
'body': body or {},
'headers': self.build_headers(method, request_path, body),
'coinbase_configured': self.is_configured()
}
def test_connection(self) -> Dict[str, Any]:
"""
Testa la connessione Coinbase con dati reali.
"""
try:
if not self.is_configured():
return {
'success': False,
'error': 'Coinbase REST Client not configured'
}
# Test con BTC-USD
test_asset = self.get_asset_info('BTC')
return {
'success': test_asset.get('success', False),
'client_configured': True,
'test_asset': test_asset.get('asset_id'),
'test_price': test_asset.get('price'),
'message': 'Coinbase Advanced Trade API is working with real data'
}
except Exception as e:
return {
'success': False,
'error': str(e),
'client_configured': False
}

View File

@@ -0,0 +1,159 @@
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Mapping, Optional
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
class CoinbaseSigner:
"""
Genera le intestazioni di autenticazione per Coinbase Advanced Trade API.
Supporta due formati di autenticazione:
1. Legacy: API key, secret (base64), passphrase (per retrocompatibilità)
2. New: API key name, private key (nuovo formato Coinbase)
Contratto:
- Input: method, request_path, body opzionale, timestamp opzionale
- Output: dict di header richiesti dall'API
- Errori: solleva eccezioni se le credenziali non sono valide
"""
def __init__(self, api_key: str, secret_or_private_key: str, passphrase: str = None) -> None:
self.api_key = api_key
self.passphrase = passphrase
# Determina se stiamo usando il nuovo formato o il legacy
if passphrase is None:
# Nuovo formato: solo API key + private key
self.auth_method = "new"
self.private_key = self._load_private_key(secret_or_private_key)
self.secret_b64 = None
else:
# Formato legacy: API key + secret + passphrase
self.auth_method = "legacy"
self.secret_b64 = secret_or_private_key
self.private_key = None
def _load_private_key(self, private_key_str: str):
"""Carica la private key dal formato PEM"""
try:
# Rimuovi eventuali spazi e aggiungi header/footer se mancanti
key_str = private_key_str.strip()
if not key_str.startswith("-----BEGIN"):
key_str = f"-----BEGIN EC PRIVATE KEY-----\n{key_str}\n-----END EC PRIVATE KEY-----"
private_key = serialization.load_pem_private_key(
key_str.encode('utf-8'),
password=None,
)
return private_key
except Exception as e:
raise ValueError(f"Invalid private key format: {e}")
@staticmethod
def _normalize_path(path: str) -> str:
if not path:
return "/"
return path if path.startswith("/") else f"/{path}"
def _build_legacy_headers(
self,
method: str,
request_path: str,
body: Optional[Mapping[str, Any]] = None,
timestamp: Optional[str] = None,
) -> dict:
"""Costruisce header usando il formato legacy (HMAC)"""
# Timestamp in secondi come stringa
ts = timestamp or str(int(time.time()))
m = method.upper()
req_path = self._normalize_path(request_path)
# Il body deve essere stringa vuota per GET/DELETE o quando assente
if body is None or m in ("GET", "DELETE"):
body_str = ""
else:
# JSON deterministico, senza spazi
body_str = json.dumps(body, separators=(",", ":"), ensure_ascii=False)
# Concatenazione: timestamp + method + request_path + body
message = f"{ts}{m}{req_path}{body_str}"
# Decodifica secret (base64) e firma HMAC-SHA256
key = base64.b64decode(self.secret_b64)
signature = hmac.new(key, message.encode("utf-8"), hashlib.sha256).digest()
cb_access_sign = base64.b64encode(signature).decode("utf-8")
return {
"CB-ACCESS-KEY": self.api_key,
"CB-ACCESS-SIGN": cb_access_sign,
"CB-ACCESS-TIMESTAMP": ts,
"CB-ACCESS-PASSPHRASE": self.passphrase,
"Content-Type": "application/json",
}
def _build_new_headers(
self,
method: str,
request_path: str,
body: Optional[Mapping[str, Any]] = None,
timestamp: Optional[str] = None,
) -> dict:
"""Costruisce header usando il nuovo formato (EC signature)"""
# Timestamp in secondi come stringa
ts = timestamp or str(int(time.time()))
m = method.upper()
req_path = self._normalize_path(request_path)
# Il body deve essere stringa vuota per GET/DELETE o quando assente
if body is None or m in ("GET", "DELETE"):
body_str = ""
else:
# JSON deterministico, senza spazi
body_str = json.dumps(body, separators=(",", ":"), ensure_ascii=False)
# Concatenazione: timestamp + method + request_path + body
message = f"{ts}{m}{req_path}{body_str}"
# Firma con ECDSA
signature = self.private_key.sign(
message.encode("utf-8"),
ec.ECDSA(hashes.SHA256())
)
# Converti signature in base64
cb_access_sign = base64.b64encode(signature).decode("utf-8")
return {
"CB-ACCESS-KEY": self.api_key,
"CB-ACCESS-SIGN": cb_access_sign,
"CB-ACCESS-TIMESTAMP": ts,
"Content-Type": "application/json",
}
def build_headers(
self,
method: str,
request_path: str,
body: Optional[Mapping[str, Any]] = None,
timestamp: Optional[str] = None,
) -> dict:
"""Costruisce gli header di autenticazione usando il metodo appropriato"""
if self.auth_method == "legacy":
return self._build_legacy_headers(method, request_path, body, timestamp)
else:
return self._build_new_headers(method, request_path, body, timestamp)
def sign_request(
self,
method: str,
request_path: str,
body: Optional[Mapping[str, Any]] = None,
passphrase: Optional[str] = None,
) -> dict:
return self.build_headers(method, request_path, body)

View File

@@ -0,0 +1,135 @@
import time
from typing import Any, Dict, Optional
from urllib.parse import urlencode
class CryptoCompareSigner:
"""Genera le intestazioni e parametri di autenticazione per CryptoCompare API.
CryptoCompare utilizza un'autenticazione semplice basata su API key che può essere
passata come parametro nella query string o nell'header Authorization.
Contratto:
- Input: api_key, metodo di autenticazione (query o header)
- Output: dict di header e parametri per la richiesta
- Errori: solleva ValueError se api_key non è fornita
"""
def __init__(self, api_key: str, auth_method: str = "query") -> None:
"""
Inizializza il signer per CryptoCompare.
Args:
api_key: La chiave API di CryptoCompare
auth_method: Metodo di autenticazione ("query" o "header")
- "query": aggiunge api_key come parametro URL
- "header": aggiunge api_key nell'header Authorization
"""
if not api_key:
raise ValueError("API key è richiesta per CryptoCompare")
self.api_key = api_key
self.auth_method = auth_method.lower()
if self.auth_method not in ("query", "header"):
raise ValueError("auth_method deve essere 'query' o 'header'")
def build_headers(self, include_timestamp: bool = False) -> Dict[str, str]:
"""
Costruisce gli header per la richiesta CryptoCompare.
Args:
include_timestamp: Se includere un timestamp nell'header (opzionale)
Returns:
Dict con gli header necessari
"""
headers = {
"Content-Type": "application/json",
"User-Agent": "upo-appAI/1.0"
}
# Se si usa autenticazione via header
if self.auth_method == "header":
headers["Authorization"] = f"Apikey {self.api_key}"
# Aggiungi timestamp se richiesto (utile per debugging)
if include_timestamp:
headers["X-Request-Timestamp"] = str(int(time.time()))
return headers
def build_url_params(self, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Costruisce i parametri URL includendo l'API key se necessario.
Args:
params: Parametri aggiuntivi per la query
Returns:
Dict con tutti i parametri per l'URL
"""
if params is None:
params = {}
# Se si usa autenticazione via query string
if self.auth_method == "query":
params["api_key"] = self.api_key
return params
def build_full_url(self, base_url: str, endpoint: str, params: Optional[Dict[str, Any]] = None) -> str:
"""
Costruisce l'URL completo con tutti i parametri.
Args:
base_url: URL base dell'API (es. "https://min-api.cryptocompare.com")
endpoint: Endpoint specifico (es. "/data/pricemulti")
params: Parametri aggiuntivi per la query
Returns:
URL completo con parametri
"""
base_url = base_url.rstrip("/")
endpoint = endpoint if endpoint.startswith("/") else f"/{endpoint}"
url_params = self.build_url_params(params)
if url_params:
query_string = urlencode(url_params)
return f"{base_url}{endpoint}?{query_string}"
else:
return f"{base_url}{endpoint}"
def prepare_request(self,
base_url: str,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
include_timestamp: bool = False) -> Dict[str, Any]:
"""
Prepara tutti i componenti per una richiesta CryptoCompare.
Args:
base_url: URL base dell'API
endpoint: Endpoint specifico
params: Parametri per la query
include_timestamp: Se includere timestamp negli header
Returns:
Dict con url, headers e params pronti per la richiesta
"""
return {
"url": self.build_full_url(base_url, endpoint, params),
"headers": self.build_headers(include_timestamp),
"params": self.build_url_params(params) if self.auth_method == "query" else params or {}
}
# Alias per compatibilità con il pattern Coinbase
def sign_request(self,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
base_url: str = "https://min-api.cryptocompare.com") -> Dict[str, Any]:
"""
Alias per prepare_request per mantenere compatibilità con il pattern del CoinbaseSigner.
"""
return self.prepare_request(base_url, endpoint, params)

View File

@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
Test suite per il MarketAgent unificato.
Compatibile con pytest per l'esecuzione automatizzata dei test.
"""
import os
import sys
import pytest
from pathlib import Path
# Aggiungi il path src al PYTHONPATH per gli import
src_path = Path(__file__).parent.parent.parent / "src"
sys.path.insert(0, str(src_path))
from dotenv import load_dotenv
# Carica le variabili d'ambiente
load_dotenv()
class TestMarketAgent:
"""Test suite per il MarketAgent unificato"""
@pytest.fixture(scope="class")
def market_agent(self):
"""Fixture per inizializzare il MarketAgent"""
from app.agents.market_agent import MarketAgent
return MarketAgent()
def test_agent_initialization(self, market_agent):
"""Testa che l'agent si inizializzi correttamente"""
assert market_agent is not None
providers = market_agent.get_available_providers()
assert isinstance(providers, list)
def test_providers_configuration(self, market_agent):
"""Testa che almeno un provider sia configurato"""
providers = market_agent.get_available_providers()
# Se nessun provider è configurato, skippa i test
if not providers:
pytest.skip("No market data providers configured. Check your .env file.")
assert len(providers) > 0
print(f"Available providers: {providers}")
def test_provider_capabilities(self, market_agent):
"""Testa che ogni provider abbia delle capacità definite"""
providers = market_agent.get_available_providers()
if not providers:
pytest.skip("No providers configured")
for provider in providers:
capabilities = market_agent.get_provider_capabilities(provider)
assert isinstance(capabilities, list)
assert len(capabilities) > 0
print(f"{provider} capabilities: {capabilities}")
def test_market_overview(self, market_agent):
"""Testa la funzionalità di panoramica del mercato"""
providers = market_agent.get_available_providers()
if not providers:
pytest.skip("No providers configured")
overview = market_agent.get_market_overview(["BTC", "ETH"])
assert isinstance(overview, dict)
assert "data" in overview
assert "source" in overview
assert "providers_used" in overview
# Se abbiamo dati, verifichiamo la struttura
if overview.get("data"):
assert isinstance(overview["data"], dict)
assert overview.get("source") is not None
print(f"Market overview source: {overview.get('source')}")
def test_market_analysis(self, market_agent):
"""Testa la funzione di analisi del mercato"""
providers = market_agent.get_available_providers()
if not providers:
pytest.skip("No providers configured")
analysis = market_agent.analyze("market overview")
assert isinstance(analysis, str)
assert len(analysis) > 0
assert not analysis.startswith("⚠️ Nessun provider")
print(f"Analysis preview: {analysis[:100]}...")
@pytest.mark.skipif(
not os.getenv('CRYPTOCOMPARE_API_KEY'),
reason="CRYPTOCOMPARE_API_KEY not configured"
)
def test_cryptocompare_specific_methods(self, market_agent):
"""Testa i metodi specifici di CryptoCompare"""
providers = market_agent.get_available_providers()
if 'cryptocompare' not in providers:
pytest.skip("CryptoCompare provider not available")
# Test single price
btc_price = market_agent.get_single_crypto_price("BTC", "USD")
assert isinstance(btc_price, (int, float))
assert btc_price > 0
print(f"BTC Price (CryptoCompare): ${btc_price}")
# Test multiple prices
prices = market_agent.get_crypto_prices(["BTC", "ETH"], ["USD"])
assert isinstance(prices, dict)
assert "BTC" in prices or "ETH" in prices
# Test top cryptocurrencies
top_coins = market_agent.get_top_cryptocurrencies(5)
assert isinstance(top_coins, dict)
@pytest.mark.skipif(
not (
(os.getenv('COINBASE_API_KEY') and os.getenv('COINBASE_PRIVATE_KEY')) or
(os.getenv('COINBASE_API_KEY') and os.getenv('COINBASE_SECRET') and os.getenv('COINBASE_PASSPHRASE'))
),
reason="Coinbase credentials not configured (need either new format: API_KEY+PRIVATE_KEY or legacy: API_KEY+SECRET+PASSPHRASE)"
)
def test_coinbase_specific_methods(self, market_agent):
"""Testa i metodi specifici di Coinbase"""
providers = market_agent.get_available_providers()
if 'coinbase' not in providers:
pytest.skip("Coinbase provider not available")
# Test ticker
ticker = market_agent.get_coinbase_ticker("BTC-USD")
assert isinstance(ticker, dict)
assert "price" in ticker
price = float(ticker.get("price", 0))
assert price > 0
print(f"BTC Price (Coinbase): ${price}")
def test_fallback_mechanism(self, market_agent):
"""Testa il meccanismo di fallback tra provider"""
providers = market_agent.get_available_providers()
if len(providers) < 2:
pytest.skip("Need at least 2 providers to test fallback")
# Il test del fallback è implicito nel get_market_overview
# che prova CryptoCompare prima e poi Coinbase
overview = market_agent.get_market_overview(["BTC"])
assert overview.get("data") is not None
assert len(overview.get("providers_used", [])) > 0
def test_error_handling(self, market_agent):
"""Testa la gestione degli errori"""
providers = market_agent.get_available_providers()
if not providers:
pytest.skip("No providers configured")
# Test con simbolo crypto inesistente
overview = market_agent.get_market_overview(["NONEXISTENT_CRYPTO"])
# Dovrebbe gestire l'errore gracefully
assert isinstance(overview, dict)
# I dati potrebbero essere vuoti ma non dovrebbe crashare

58
tests/conftest.py Normal file
View File

@@ -0,0 +1,58 @@
"""
Configurazione pytest per i test del progetto upo-appAI.
"""
import pytest
import os
import sys
from pathlib import Path
# Aggiungi il path src al PYTHONPATH per tutti i test
src_path = Path(__file__).parent.parent / "src"
sys.path.insert(0, str(src_path))
# Carica le variabili d'ambiente per tutti i test
from dotenv import load_dotenv
load_dotenv()
def pytest_configure(config):
"""Configurazione pytest"""
# Aggiungi marker personalizzati
config.addinivalue_line(
"markers", "slow: marks tests as slow (deselect with '-m \"not slow\"')"
)
config.addinivalue_line(
"markers", "api: marks tests that require API access"
)
config.addinivalue_line(
"markers", "coinbase: marks tests that require Coinbase credentials"
)
config.addinivalue_line(
"markers", "cryptocompare: marks tests that require CryptoCompare credentials"
)
def pytest_collection_modifyitems(config, items):
"""Modifica automaticamente gli item di test"""
# Aggiungi marker 'api' a tutti i test che richiedono API
for item in items:
if "api" in item.name.lower() or "coinbase" in item.name.lower() or "cryptocompare" in item.name.lower():
item.add_marker(pytest.mark.api)
# Aggiungi marker 'slow' ai test che potrebbero essere lenti
if "overview" in item.name.lower() or "analysis" in item.name.lower():
item.add_marker(pytest.mark.slow)
@pytest.fixture(scope="session")
def env_vars():
"""Fixture per accedere alle variabili d'ambiente nei test"""
return {
'coinbase_configured': all([
os.getenv('COINBASE_API_KEY'),
os.getenv('COINBASE_SECRET'),
os.getenv('COINBASE_PASSPHRASE')
]),
'cryptocompare_configured': bool(os.getenv('CRYPTOCOMPARE_API_KEY')),
}

855
uv.lock generated

File diff suppressed because it is too large Load Diff