feat(markets): add yfinance integration for stock and crypto data

- Add yfinance wrapper with support for stocks and cryptocurrencies
- Update aggregated models to recognize yfinance products
- Include yfinance in market APIs tool and demo script
- Add comprehensive tests for yfinance functionality
- Update dependencies to include yfinance and required packages
This commit is contained in:
Simone Garau
2025-10-01 15:46:46 +02:00
parent 31057007fb
commit 42690acfbb
11 changed files with 418 additions and 78 deletions

View File

@@ -2,13 +2,14 @@ from .base import BaseWrapper, ProductInfo, Price
from .coinbase import CoinBaseWrapper
from .binance import BinanceWrapper
from .cryptocompare import CryptoCompareWrapper
from .yfinance import YFinanceWrapper
from .binance_public import PublicBinanceAgent
from app.utils.wrapper_handler import WrapperHandler
from typing import List, Optional
from agno.tools import Toolkit
__all__ = [ "MarketAPIs", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "PublicBinanceAgent" ]
__all__ = [ "MarketAPIs", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper", "PublicBinanceAgent" ]
class MarketAPIsTool(BaseWrapper, Toolkit):
@@ -24,7 +25,7 @@ class MarketAPIsTool(BaseWrapper, Toolkit):
def __init__(self, currency: str = "USD", enable_aggregation: bool = False):
self.currency = currency
wrappers = [ BinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper ]
wrappers = [ BinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper, YFinanceWrapper ]
self.wrappers: WrapperHandler[BaseWrapper] = WrapperHandler.build_wrappers(wrappers)
# Inizializza l'aggregatore solo se richiesto (lazy initialization)

View File

@@ -23,10 +23,10 @@ class BinanceWrapper(BaseWrapper):
def __init__(self, currency: str = "USDT"):
api_key = os.getenv("BINANCE_API_KEY")
assert api_key is not None, "API key is required"
assert api_key is None, "API key is required"
api_secret = os.getenv("BINANCE_API_SECRET")
assert api_secret is not None, "API secret is required"
assert api_secret is None, "API secret is required"
self.currency = currency
self.client = Client(api_key=api_key, api_secret=api_secret)

214
src/app/markets/yfinance.py Normal file
View File

@@ -0,0 +1,214 @@
import json
from agno.tools.yfinance import YFinanceTools
from .base import BaseWrapper, ProductInfo, Price
def create_product_info(symbol: str, stock_data: dict) -> ProductInfo:
"""
Converte i dati di YFinanceTools in ProductInfo.
"""
product = ProductInfo()
# ID univoco per yfinance
product.id = f"yfinance_{symbol}"
product.symbol = symbol
# Estrai il prezzo corrente - gestisci diversi formati
if 'currentPrice' in stock_data:
product.price = float(stock_data['currentPrice'])
elif 'regularMarketPrice' in stock_data:
product.price = float(stock_data['regularMarketPrice'])
elif 'Current Stock Price' in stock_data:
# Formato: "254.63 USD" - estrai solo il numero
price_str = stock_data['Current Stock Price'].split()[0]
try:
product.price = float(price_str)
except ValueError:
product.price = 0.0
else:
product.price = 0.0
# Volume 24h
if 'volume' in stock_data:
product.volume_24h = float(stock_data['volume'])
elif 'regularMarketVolume' in stock_data:
product.volume_24h = float(stock_data['regularMarketVolume'])
else:
product.volume_24h = 0.0
# Status basato sulla disponibilità dei dati
product.status = "trading" if product.price > 0 else "offline"
# Valuta (default USD)
product.quote_currency = stock_data.get('currency', 'USD') or 'USD'
return product
def create_price_from_history(hist_data: dict, timestamp: str) -> Price:
"""
Converte i dati storici di YFinanceTools in Price.
"""
price = Price()
if timestamp in hist_data:
day_data = hist_data[timestamp]
price.high = float(day_data.get('High', 0.0))
price.low = float(day_data.get('Low', 0.0))
price.open = float(day_data.get('Open', 0.0))
price.close = float(day_data.get('Close', 0.0))
price.volume = float(day_data.get('Volume', 0.0))
price.time = timestamp
return price
class YFinanceWrapper(BaseWrapper):
"""
Wrapper per YFinanceTools che fornisce dati di mercato per azioni, ETF e criptovalute.
Implementa l'interfaccia BaseWrapper per compatibilità con il sistema esistente.
Usa YFinanceTools dalla libreria agno per coerenza con altri wrapper.
"""
def __init__(self, currency: str = "USD"):
self.currency = currency
# Inizializza YFinanceTools - non richiede parametri specifici
self.tool = YFinanceTools()
def _format_symbol(self, asset_id: str) -> str:
"""
Formatta il simbolo per yfinance.
Per crypto, aggiunge '-USD' se non presente.
"""
asset_id = asset_id.upper()
# Se è già nel formato corretto (es: BTC-USD), usa così
if '-' in asset_id:
return asset_id
# Per crypto singole (BTC, ETH), aggiungi -USD
if asset_id in ['BTC', 'ETH', 'ADA', 'SOL', 'DOT', 'LINK', 'UNI', 'AAVE']:
return f"{asset_id}-USD"
# Per azioni, usa il simbolo così com'è
return asset_id
def get_product(self, asset_id: str) -> ProductInfo:
"""
Recupera le informazioni di un singolo prodotto.
"""
symbol = self._format_symbol(asset_id)
# Usa YFinanceTools per ottenere i dati
try:
# Ottieni le informazioni base dello stock
stock_info = self.tool.get_company_info(symbol)
# Se il risultato è una stringa JSON, parsala
if isinstance(stock_info, str):
try:
stock_data = json.loads(stock_info)
except json.JSONDecodeError:
# Se non è JSON valido, prova a ottenere solo il prezzo
price_data_str = self.tool.get_current_stock_price(symbol)
if price_data_str and price_data_str.replace('.', '').replace('-', '').isdigit():
price = float(price_data_str)
stock_data = {'currentPrice': price, 'currency': 'USD'}
else:
raise Exception("Dati non validi")
else:
stock_data = stock_info
return create_product_info(symbol, stock_data)
except Exception as e:
# Fallback: prova a ottenere solo il prezzo
try:
price_data_str = self.tool.get_current_stock_price(symbol)
if price_data_str and price_data_str.replace('.', '').replace('-', '').isdigit():
price = float(price_data_str)
minimal_data = {
'currentPrice': price,
'currency': 'USD'
}
return create_product_info(symbol, minimal_data)
else:
raise Exception("Prezzo non disponibile")
except Exception:
# Se tutto fallisce, restituisci un prodotto vuoto
product = ProductInfo()
product.symbol = symbol
product.status = "offline"
return product
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Recupera le informazioni di multiple assets.
"""
products = []
for asset_id in asset_ids:
try:
product = self.get_product(asset_id)
products.append(product)
except Exception as e:
# Se un asset non è disponibile, continua con gli altri
continue
return products
def get_all_products(self) -> list[ProductInfo]:
"""
Recupera tutti i prodotti disponibili.
Restituisce una lista predefinita di asset popolari.
"""
# Lista di asset popolari (azioni, ETF, crypto)
popular_assets = [
'BTC', 'ETH', 'ADA', 'SOL', 'DOT',
'AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN',
'SPY', 'QQQ', 'VTI', 'GLD', 'VIX'
]
return self.get_products(popular_assets)
def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
"""
Recupera i dati storici di prezzo per un asset.
"""
symbol = self._format_symbol(asset_id)
try:
# Determina il periodo appropriato in base al limite
if limit <= 7:
period = "1d"
interval = "15m"
elif limit <= 30:
period = "5d"
interval = "1h"
elif limit <= 90:
period = "1mo"
interval = "1d"
else:
period = "3mo"
interval = "1d"
# Ottieni i dati storici
hist_data = self.tool.get_historical_stock_prices(symbol, period=period, interval=interval)
if isinstance(hist_data, str):
hist_data = json.loads(hist_data)
# Il formato dei dati è {timestamp: {Open: x, High: y, Low: z, Close: w, Volume: v}}
prices = []
timestamps = sorted(hist_data.keys())[-limit:] # Prendi gli ultimi 'limit' timestamp
for timestamp in timestamps:
price = create_price_from_history(hist_data, timestamp)
if price.close > 0: # Solo se ci sono dati validi
prices.append(price)
return prices
except Exception as e:
# Se fallisce, restituisci lista vuota
return []

View File

@@ -77,6 +77,8 @@ class AggregatedProductInfo(ProductInfo):
return "binance"
elif "crypto" in product.id.lower() or "cc" in product.id.lower():
return "cryptocompare"
elif "yfinance" in product.id.lower() or "yf" in product.id.lower():
return "yfinance"
else:
return "unknown"

View File

@@ -1,71 +0,0 @@
import statistics
from typing import Dict, Any
class MarketAggregator:
"""
Aggrega dati di mercato da più provider e genera segnali e analisi avanzate.
"""
@staticmethod
def aggregate(symbol: str, sources: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
prices = []
volumes = []
price_map = {}
for provider, data in sources.items():
price = data.get('price')
if price is not None:
prices.append(price)
price_map[provider] = price
volume = data.get('volume')
if volume is not None:
volumes.append(MarketAggregator._parse_volume(volume))
# Aggregated price (mean)
agg_price = statistics.mean(prices) if prices else None
# Spread analysis
spread = (max(prices) - min(prices)) / agg_price if prices and agg_price else 0
# Confidence
stddev = statistics.stdev(prices) if len(prices) > 1 else 0
confidence = max(0.5, 1 - (stddev / agg_price)) if agg_price else 0
if spread < 0.005:
confidence += 0.1
if len(prices) >= 3:
confidence += 0.05
confidence = min(confidence, 1.0)
# Volume trend
total_volume = sum(volumes) if volumes else None
# Price divergence
max_deviation = (max(prices) - min(prices)) / agg_price if prices and agg_price else 0
# Signals
signals = {
"spread_analysis": f"Low spread ({spread:.2%}) indicates healthy liquidity" if spread < 0.01 else f"Spread {spread:.2%} - check liquidity",
"volume_trend": f"Combined volume: {total_volume:.2f}" if total_volume else "Volume data not available",
"price_divergence": f"Max deviation: {max_deviation:.2%} - {'Normal range' if max_deviation < 0.01 else 'High divergence'}"
}
return {
"aggregated_data": {
f"{symbol}_USD": {
"price": round(agg_price, 2) if agg_price else None,
"confidence": round(confidence, 2),
"sources_count": len(prices)
}
},
"individual_sources": price_map,
"market_signals": signals
}
@staticmethod
def _parse_volume(volume: Any) -> float:
# Supporta stringhe tipo "1.2M" o numeri
if isinstance(volume, (int, float)):
return float(volume)
if isinstance(volume, str):
v = volume.upper().replace(' ', '')
if v.endswith('M'):
return float(v[:-1]) * 1_000_000
if v.endswith('K'):
return float(v[:-1]) * 1_000
try:
return float(v)
except Exception as e:
print(f"Errore nel parsing del volume: {e}")
return 0.0
return 0.0

View File

@@ -12,8 +12,8 @@ class MarketDataAggregator:
def __init__(self, currency: str = "USD"):
# Import lazy per evitare circular import
from app.markets import MarketAPIs
self._market_apis = MarketAPIs(currency)
from app.markets import MarketAPIsTool
self._market_apis = MarketAPIsTool(currency)
self._aggregation_enabled = True
def get_product(self, asset_id: str) -> ProductInfo: