Refactor Predictor and market data handling

- Added Predictor class with input preparation and instructions for financial strategy generation.
- Removed PredictorAgent class and integrated its functionality into the new Predictor module.
- Created a base market API wrapper and specific implementations for Coinbase and CryptoCompare.
- Introduced PublicBinanceAgent for fetching public prices from Binance.
- Refactored ToolAgent to utilize the new Predictor and market API wrappers for improved data handling and predictions.
- Updated models to streamline the selection of available LLM providers.
- Removed deprecated signer classes for Coinbase and CryptoCompare.
This commit is contained in:
2025-09-26 03:43:31 +02:00
parent 48502fc6c7
commit 148bff7cfd
18 changed files with 362 additions and 2324 deletions

39
src/app/agents/market.py Normal file
View File

@@ -0,0 +1,39 @@
from agno.tools import Toolkit
from app.markets import get_first_available_market_api
# TODO (?) in futuro fare in modo che la LLM faccia da sé per il mercato
# Non so se può essere utile, per ora lo lascio qui
# per ora mettiamo tutto statico e poi, se abbiamo API-Key senza limiti
# possiamo fare in modo di far scegliere alla LLM quale crypto proporre
# in base alle sue proprie chiamate API
class MarketToolkit(Toolkit):
def __init__(self):
self.market_agent = get_first_available_market_api("USD") # change currency if needed
super().__init__(
name="Market Toolkit",
tools=[
self.get_historical_data,
self.get_current_price,
],
)
def get_historical_data(self, symbol: str):
return self.market_agent.get_historical_prices(symbol)
def get_current_price(self, symbol: str):
return self.market_agent.get_products(symbol)
def prepare_inputs():
pass
def instructions():
return """
Utilizza questo strumento per ottenere dati di mercato storici e attuali per criptovalute specifiche.
Puoi richiedere i prezzi storici o il prezzo attuale di una criptovaluta specifica.
Esempio di utilizzo:
- get_historical_data("BTC")
- get_current_price("ETH")
"""

View File

@@ -1,266 +0,0 @@
from typing import Dict, List, Optional, Any
import requests
import logging
import os
from app.signers.market_signers.coinbase_signer import CoinbaseCDPSigner
from app.signers.market_signers.cryptocompare_signer import CryptoCompareSigner
logger = logging.getLogger(__name__)
class MarketAgent:
"""
Market Agent unificato che supporta multiple fonti di dati:
- Coinbase Advanced Trade API (dati di mercato reali)
- CryptoCompare (market data)
Auto-configura i provider basandosi sulle variabili d'ambiente disponibili.
"""
def __init__(self):
self.providers = {}
self._setup_providers()
if not self.providers:
logger.warning("No market data providers configured. Check your .env file.")
def _setup_providers(self):
"""Configura automaticamente i provider disponibili"""
# Setup Coinbase Advanced Trade API (nuovo formato)
cdp_api_key_name = os.getenv('CDP_API_KEY_NAME')
cdp_api_private_key = os.getenv('CDP_API_PRIVATE_KEY')
if cdp_api_key_name and cdp_api_private_key:
try:
signer = CoinbaseCDPSigner(cdp_api_key_name, cdp_api_private_key)
self.providers['coinbase'] = {
'type': 'coinbase_advanced_trade',
'signer': signer,
'capabilities': ['assets', 'market_data', 'trading', 'real_time_prices']
}
logger.info("✅ Coinbase Advanced Trade API provider configured")
except Exception as e:
logger.error(f"Failed to setup Coinbase Advanced Trade API provider: {e}")
# Setup CryptoCompare se la API key è disponibile
cryptocompare_key = os.getenv('CRYPTOCOMPARE_API_KEY')
if cryptocompare_key:
try:
auth_method = os.getenv('CRYPTOCOMPARE_AUTH_METHOD', 'query')
signer = CryptoCompareSigner(cryptocompare_key, auth_method)
self.providers['cryptocompare'] = {
'type': 'cryptocompare',
'signer': signer,
'base_url': 'https://min-api.cryptocompare.com',
'capabilities': ['prices', 'historical', 'top_coins']
}
logger.info("✅ CryptoCompare provider configured")
except Exception as e:
logger.error(f"Failed to setup CryptoCompare provider: {e}")
def get_available_providers(self) -> List[str]:
"""Restituisce la lista dei provider configurati"""
return list(self.providers.keys())
def get_provider_capabilities(self, provider: str) -> List[str]:
"""Restituisce le capacità di un provider specifico"""
if provider in self.providers:
return self.providers[provider]['capabilities']
return []
# === COINBASE CDP METHODS ===
def get_coinbase_asset_info(self, symbol: str) -> Dict:
"""Ottiene informazioni su un asset da Coinbase CDP"""
if 'coinbase' not in self.providers:
raise ValueError("Coinbase provider not configured")
signer = self.providers['coinbase']['signer']
return signer.get_asset_info(symbol)
def get_coinbase_multiple_assets(self, symbols: List[str]) -> Dict:
"""Ottiene informazioni su multipli asset da Coinbase CDP"""
if 'coinbase' not in self.providers:
raise ValueError("Coinbase provider not configured")
signer = self.providers['coinbase']['signer']
return signer.get_multiple_assets(symbols)
def get_asset_price(self, symbol: str, provider: str = None) -> Optional[float]:
"""
Ottiene il prezzo di un asset usando il provider specificato o il primo disponibile
"""
if provider == 'coinbase' and 'coinbase' in self.providers:
try:
asset_info = self.get_coinbase_asset_info(symbol)
return float(asset_info.get('price', 0))
except Exception as e:
logger.error(f"Error getting {symbol} price from Coinbase: {e}")
return None
elif provider == 'cryptocompare' and 'cryptocompare' in self.providers:
try:
return self.get_single_crypto_price(symbol)
except Exception as e:
logger.error(f"Error getting {symbol} price from CryptoCompare: {e}")
return None
# Auto-select provider
if 'cryptocompare' in self.providers:
try:
return self.get_single_crypto_price(symbol)
except Exception:
pass
if 'coinbase' in self.providers:
try:
asset_info = self.get_coinbase_asset_info(symbol)
return float(asset_info.get('price', 0))
except Exception:
pass
return None
# === CRYPTOCOMPARE METHODS ===
def _cryptocompare_request(self, endpoint: str, params: Dict = None) -> Dict:
"""Esegue una richiesta CryptoCompare autenticata"""
if 'cryptocompare' not in self.providers:
raise ValueError("CryptoCompare provider not configured")
provider = self.providers['cryptocompare']
request_data = provider['signer'].prepare_request(
provider['base_url'], endpoint, params
)
response = requests.get(
request_data['url'],
headers=request_data['headers'],
timeout=10
)
response.raise_for_status()
return response.json()
def get_crypto_prices(self, from_symbols: List[str], to_symbols: List[str] = None) -> Dict:
"""Ottiene prezzi da CryptoCompare"""
if to_symbols is None:
to_symbols = ["USD", "EUR"]
params = {
"fsyms": ",".join(from_symbols),
"tsyms": ",".join(to_symbols)
}
return self._cryptocompare_request("/data/pricemulti", params)
def get_single_crypto_price(self, from_symbol: str, to_symbol: str = "USD") -> float:
"""Ottiene il prezzo di una singola crypto da CryptoCompare"""
params = {
"fsym": from_symbol,
"tsyms": to_symbol
}
data = self._cryptocompare_request("/data/price", params)
return data.get(to_symbol, 0.0)
def get_top_cryptocurrencies(self, limit: int = 10, to_symbol: str = "USD") -> Dict:
"""Ottiene le top crypto per market cap da CryptoCompare"""
params = {
"limit": str(limit),
"tsym": to_symbol
}
return self._cryptocompare_request("/data/top/mktcapfull", params)
# === UNIFIED INTERFACE ===
def get_market_overview(self, symbols: List[str] = None) -> Dict:
"""
Ottiene una panoramica del mercato usando il miglior provider disponibile
"""
if symbols is None:
symbols = ["BTC", "ETH", "ADA"]
result = {
"timestamp": None,
"data": {},
"source": None,
"providers_used": []
}
# Prova CryptoCompare per prezzi multipli (più completo)
if 'cryptocompare' in self.providers:
try:
prices = self.get_crypto_prices(symbols, ["USD", "EUR"])
result["data"] = prices
result["source"] = "cryptocompare"
result["providers_used"].append("cryptocompare")
logger.info("Market overview retrieved from CryptoCompare")
except Exception as e:
logger.warning(f"CryptoCompare failed, trying fallback: {e}")
# Fallback a Coinbase Advanced Trade se CryptoCompare fallisce
if not result["data"] and 'coinbase' in self.providers:
try:
# Usa il nuovo metodo Advanced Trade per ottenere multipli asset
coinbase_data = self.get_coinbase_multiple_assets(symbols)
if coinbase_data:
# Trasforma i dati Advanced Trade nel formato standard
formatted_data = {}
for symbol in symbols:
if symbol in coinbase_data:
formatted_data[symbol] = {
"USD": coinbase_data[symbol].get("price", 0)
}
result["data"] = formatted_data
result["source"] = "coinbase_advanced_trade"
result["providers_used"].append("coinbase")
logger.info("Market overview retrieved from Coinbase Advanced Trade API")
except Exception as e:
logger.error(f"Coinbase Advanced Trade fallback failed: {e}")
return result
def analyze(self, query: str) -> str:
"""
Analizza il mercato usando tutti i provider disponibili
"""
if not self.providers:
return "⚠️ Nessun provider di dati di mercato configurato. Controlla il file .env."
try:
# Ottieni panoramica del mercato
overview = self.get_market_overview(["BTC", "ETH", "ADA", "DOT"])
if not overview["data"]:
return "⚠️ Impossibile recuperare dati di mercato da nessun provider."
# Formatta i risultati
result_lines = [
f"📊 **Market Analysis** (via {overview['source'].upper()})\n"
]
for crypto, prices in overview["data"].items():
if isinstance(prices, dict):
usd_price = prices.get("USD", "N/A")
eur_price = prices.get("EUR", "N/A")
if eur_price != "N/A":
result_lines.append(f"**{crypto}**: ${usd_price} (€{eur_price})")
else:
result_lines.append(f"**{crypto}**: ${usd_price}")
# Aggiungi info sui provider
providers_info = f"\n🔧 **Available providers**: {', '.join(self.get_available_providers())}"
result_lines.append(providers_info)
return "\n".join(result_lines)
except Exception as e:
logger.error(f"Market analysis failed: {e}")
return f"⚠️ Errore nell'analisi del mercato: {e}"

View File

@@ -0,0 +1,67 @@
import json
from enum import Enum
from app.markets.base import ProductInfo
class PredictorStyle(Enum):
CONSERVATIVE = "Conservativo"
AGGRESSIVE = "Aggressivo"
# TODO (?) Change sentiment to a more structured format or merge it with data analysis (change then also the prompt)
def prepare_inputs(data: list[ProductInfo], style: PredictorStyle, sentiment: str) -> str:
return json.dumps({
"data": [(product.symbol, f"{product.price:.2f}") for product in data],
"style": style.value,
"sentiment": sentiment
})
def instructions() -> str:
return """
Sei un **Consulente Finanziario Algoritmico (CFA) Specializzato in Criptovalute**. Il tuo compito è agire come un sistema esperto di gestione del rischio e allocazione di portafoglio.
**Istruzione Principale:** Analizza l'Input fornito in formato JSON. La tua strategia deve essere **logica, misurabile e basata esclusivamente sui dati e sullo stile di rischio/rendimento richiesto**.
## Input Dati (Formato JSON)
Ti sarà passato un singolo blocco JSON contenente i seguenti campi obbligatori:
1. **"data":** *Array di tuple (stringa)*. Rappresenta i dati di mercato in tempo reale o recenti. Ogni tupla è `[Nome_Asset, Prezzo_Corrente]`. **Esempio:** `[["BTC", "60000.00"], ["ETH", "3500.00"]]`.
2. **"style":** *Stringa ENUM (solo "conservativo" o "aggressivo")*. Definisce l'approccio alla volatilità e all'allocazione.
3. **"sentiment":** *Stringa descrittiva*. Riassume il sentiment di mercato estratto da fonti sociali e notizie. **Esempio:** `"Sentiment estremamente positivo, alta FOMO per le altcoin."`.
## Regole di Logica dello Stile di Investimento
- **Stile "Aggressivo":**
- **Obiettivo:** Massimizzazione del rendimento, accettando Volatilità Massima.
- **Allocazione:** Maggiore focus su **Asset a Media/Bassa Capitalizzazione (Altcoin)** o su criptovalute che mostrano un'elevata Momentum di crescita, anche se il rischio di ribasso è superiore. L'allocazione su BTC/ETH deve rimanere una base (ancoraggio) ma non dominare il portafoglio.
- **Correlazione Sentiment:** Sfrutta il sentiment positivo per allocazioni ad alto beta (più reattive ai cambiamenti di mercato).
- **Stile "Conservativo":**
- **Obiettivo:** Preservazione del capitale, minimizzazione della Volatilità.
- **Allocazione:** Maggioranza del capitale allocata in **Asset a Larga Capitalizzazione (Blue Chip: BTC e/o ETH)**. Eventuali allocazioni su Altcoin devono essere minime e su progetti con utilità comprovata e rischio regolatorio basso.
- **Correlazione Sentiment:** Utilizza il sentiment positivo come conferma per un'esposizione maggiore, ma ignora segnali eccessivi di "FOMO" (Fear Of Missing Out) per evitare asset speculativi.
## Requisiti di Formato dell'Output
L'output deve essere formattato in modo rigoroso, composto da **due sezioni distinte**: la Strategia e il Dettaglio del Portafoglio.
### 1. Strategia Sintetica
Fornisci una **descrizione operativa** della strategia. Deve essere:
- Estremamente concisa.
- Contenuta in un **massimo di 5 frasi totali**.
### 2. Dettaglio del Portafoglio
Presenta l'allocazione del portafoglio come una **lista puntata**.
- La somma delle percentuali deve essere **esattamente 100%**.
- Per **ogni Asset allocato**, devi fornire:
- **Nome dell'Asset** (es. BTC, ETH, SOL, ecc.)
- **Percentuale di Allocazione** (X%)
- **Motivazione Tecnica/Sintetica** (Massimo 1 frase chiara) che giustifichi l'allocazione in base ai *Dati di Mercato*, allo *Style* e al *Sentiment*.
**Formato Esempio di Output (da seguire fedelmente):**
[Strategia sintetico-operativa in massimo 5 frasi...]
- **Asset_A:** X% (Motivazione: [Massimo una frase che collega dati, stile e allocazione])
- **Asset_B:** Y% (Motivazione: [Massimo una frase che collega dati, stile e allocazione])
- **Asset_C:** Z% (Motivazione: [Massimo una frase che collega dati, stile e allocazione])
...
"""

View File

@@ -1,169 +0,0 @@
import json
import os
from typing import Any
import anthropic
import requests
from google import genai
from openai import OpenAI
class PredictorAgent:
def __init__(self):
# Ollama via HTTP locale
self.providers = {
"ollama": {"type": "ollama", "host": os.getenv("OLLAMA_HOST", "http://localhost:11434"), "model": "gpt-oss:latest"}
}
# OpenAI
openai_key = os.getenv("OPENAI_API_KEY")
if openai_key:
client = OpenAI(api_key=openai_key)
self.providers["openai"] = {
"type": "openai",
"client": client,
"model": "gpt-4o-mini"
}
# Anthropic
anthropic_key = os.getenv("ANTHROPIC_API_KEY")
if anthropic_key:
client = anthropic.Anthropic(api_key=anthropic_key)
self.providers["anthropic"] = {
"type": "anthropic",
"client": client,
"model": "claude-3-haiku-20240307"
}
# Google Gemini
google_key = os.getenv("GOOGLE_API_KEY") or os.getenv("GEMINI_API_KEY")
if google_key:
client = genai.Client(api_key=google_key)
self.providers["google"] = {"type": "google", "client": client, "model": "gemini-1.5-flash"}
# DeepSeek
deepseek_key = os.getenv("DEEPSEEK_API_KEY")
if deepseek_key:
self.providers["deepseek"] = {"type": "deepseek", "api_key": deepseek_key, "model": "deepseek-chat"}
print("✅ Providers attivi:", list(self.providers.keys()))
def predict(self, data, sentiment, style="conservative", provider="mock"):
provider = provider.lower()
if provider == "mock" or provider not in self.providers:
return self._predict_mock(style)
prompt = f"""
Sei un consulente finanziario crypto.
Dati di mercato: {data}
Sentiment estratto: {sentiment}
Stile richiesto: {style}
Fornisci una strategia di investimento chiara e breve (max 5 frasi),
con percentuali di portafoglio e motivazioni sintetiche."""
cfg: Any = self.providers[provider]
try:
if cfg["type"] == "ollama":
return self._predict_ollama_http(prompt, cfg["host"], cfg["model"])
elif cfg["type"] == "openai":
return self._predict_openai(prompt, cfg["client"], cfg["model"])
elif cfg["type"] == "anthropic":
return self._predict_anthropic(prompt, cfg["client"], cfg["model"])
elif cfg["type"] == "google":
return self._predict_google(prompt, cfg["client"], cfg["model"])
elif cfg["type"] == "deepseek":
return self._predict_deepseek(prompt, cfg["api_key"], cfg["model"])
return None
except Exception as e:
return f"⚠️ Provider {provider} non riconosciuto: {e}"
@staticmethod
def _predict_ollama_http(prompt, host, model):
url = host.rstrip("/") + "/api/generate"
payload = {"model": model, "prompt": prompt, "max_tokens": 300}
r = requests.post(url, json=payload, timeout=60)
r.raise_for_status()
try:
data = r.json()
if isinstance(data, dict):
for key in ("text", "generated", "content"):
if key in data:
return str(data[key])
if "choices" in data and isinstance(data["choices"], list) and data["choices"]:
c = data["choices"][0]
if "text" in c:
return c["text"]
return json.dumps(data)
except ValueError:
return r.text
@staticmethod
def _predict_openai(prompt, client, model):
resp = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "Sei un consulente finanziario crypto."},
{"role": "user", "content": prompt}
],
max_tokens=300,
temperature=0.7
)
return resp.choices[0].message.content.strip()
@staticmethod
def _predict_anthropic(prompt, client, model):
response = client.completions.create(
model=model,
prompt=prompt,
max_tokens=300,
temperature=0.7
)
return response.completion.strip()
@staticmethod
def _predict_google(prompt, client, model):
response = client.models.generate_content(
model=model,
contents=prompt,
config={
"temperature": 0.7, # Controlla la creatività (0.0-1.0)
"max_output_tokens": 300, # Numero massimo di token nella risposta
"top_p": 0.9, # Nucleus sampling (opzionale)
"top_k": 40, # Top-k sampling (opzionale)
"candidate_count": 1, # Numero di risposte candidate (di solito 1)
"stop_sequences": [] # Sequenze che fermano la generazione (opzionale)
}
)
# Gestisce il caso in cui response.text sia None
result = getattr(response, 'text', None)
if result is None:
return "⚠️ Google API ha restituito una risposta vuota"
return result.strip()
@staticmethod
def _predict_deepseek(prompt, api_key, model):
url = "https://api.deepseek.ai/v1/chat/completions"
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
payload = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 300,
"temperature": 0.7
}
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
return data["choices"][0]["message"]["content"].strip()
@staticmethod
def _predict_mock(style):
if style.lower() in ("aggressive", "aggr"):
return (
"🚀 Strategia aggressiva (mock): "
"30% BTC, 20% ETH, 50% altcoins emergenti. "
"Motivo: alta volatilità + potenziale upside."
)
return (
"🛡️ Strategia conservativa (mock): "
"60% BTC, 30% ETH, 10% stablecoins. "
"Motivo: protezione da volatilità + focus su asset solidi."
)