diff --git a/.env.example b/.env.example index 06a53cb..4cfc34a 100644 --- a/.env.example +++ b/.env.example @@ -6,9 +6,6 @@ # Vedi https://docs.agno.com/examples/models per vedere tutti i modelli supportati GOOGLE_API_KEY= -# Inserire il percorso di installazione di ollama (es. /usr/share/ollama/.ollama) -# attenzione che fra Linux nativo e WSL il percorso è diverso -OLLAMA_MODELS_PATH= ############################################################################### # Configurazioni per gli agenti di mercato ############################################################################### diff --git a/src/app/markets/__init__.py b/src/app/markets/__init__.py index eefc442..aea3504 100644 --- a/src/app/markets/__init__.py +++ b/src/app/markets/__init__.py @@ -1,13 +1,12 @@ +from typing import List, Optional +from agno.tools import Toolkit +from app.utils.wrapper_handler import WrapperHandler from .base import BaseWrapper, ProductInfo, Price from .coinbase import CoinBaseWrapper from .binance import BinanceWrapper from .cryptocompare import CryptoCompareWrapper from .yfinance import YFinanceWrapper from .binance_public import PublicBinanceAgent -from app.utils.wrapper_handler import WrapperHandler -from typing import List, Optional -from agno.tools import Toolkit - __all__ = [ "MarketAPIs", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper", "PublicBinanceAgent" ] diff --git a/src/app/markets/yfinance.py b/src/app/markets/yfinance.py index f0e5d6d..82d88fe 100644 --- a/src/app/markets/yfinance.py +++ b/src/app/markets/yfinance.py @@ -8,11 +8,11 @@ def create_product_info(symbol: str, stock_data: dict) -> ProductInfo: Converte i dati di YFinanceTools in ProductInfo. """ product = ProductInfo() - + # ID univoco per yfinance product.id = f"yfinance_{symbol}" product.symbol = symbol - + # Estrai il prezzo corrente - gestisci diversi formati if 'currentPrice' in stock_data: product.price = float(stock_data['currentPrice']) @@ -27,7 +27,7 @@ def create_product_info(symbol: str, stock_data: dict) -> ProductInfo: product.price = 0.0 else: product.price = 0.0 - + # Volume 24h if 'volume' in stock_data: product.volume_24h = float(stock_data['volume']) @@ -35,13 +35,13 @@ def create_product_info(symbol: str, stock_data: dict) -> ProductInfo: product.volume_24h = float(stock_data['regularMarketVolume']) else: product.volume_24h = 0.0 - + # Status basato sulla disponibilità dei dati product.status = "trading" if product.price > 0 else "offline" - + # Valuta (default USD) product.quote_currency = stock_data.get('currency', 'USD') or 'USD' - + return product @@ -50,7 +50,7 @@ def create_price_from_history(hist_data: dict, timestamp: str) -> Price: Converte i dati storici di YFinanceTools in Price. """ price = Price() - + if timestamp in hist_data: day_data = hist_data[timestamp] price.high = float(day_data.get('High', 0.0)) @@ -59,7 +59,7 @@ def create_price_from_history(hist_data: dict, timestamp: str) -> Price: price.close = float(day_data.get('Close', 0.0)) price.volume = float(day_data.get('Volume', 0.0)) price.time = timestamp - + return price @@ -69,41 +69,41 @@ class YFinanceWrapper(BaseWrapper): Implementa l'interfaccia BaseWrapper per compatibilità con il sistema esistente. Usa YFinanceTools dalla libreria agno per coerenza con altri wrapper. """ - + def __init__(self, currency: str = "USD"): self.currency = currency # Inizializza YFinanceTools - non richiede parametri specifici self.tool = YFinanceTools() - + def _format_symbol(self, asset_id: str) -> str: """ Formatta il simbolo per yfinance. Per crypto, aggiunge '-USD' se non presente. """ asset_id = asset_id.upper() - + # Se è già nel formato corretto (es: BTC-USD), usa così if '-' in asset_id: return asset_id - + # Per crypto singole (BTC, ETH), aggiungi -USD if asset_id in ['BTC', 'ETH', 'ADA', 'SOL', 'DOT', 'LINK', 'UNI', 'AAVE']: return f"{asset_id}-USD" - + # Per azioni, usa il simbolo così com'è return asset_id - + def get_product(self, asset_id: str) -> ProductInfo: """ Recupera le informazioni di un singolo prodotto. """ symbol = self._format_symbol(asset_id) - + # Usa YFinanceTools per ottenere i dati try: # Ottieni le informazioni base dello stock stock_info = self.tool.get_company_info(symbol) - + # Se il risultato è una stringa JSON, parsala if isinstance(stock_info, str): try: @@ -118,9 +118,9 @@ class YFinanceWrapper(BaseWrapper): raise Exception("Dati non validi") else: stock_data = stock_info - + return create_product_info(symbol, stock_data) - + except Exception as e: # Fallback: prova a ottenere solo il prezzo try: @@ -140,13 +140,13 @@ class YFinanceWrapper(BaseWrapper): product.symbol = symbol product.status = "offline" return product - + def get_products(self, asset_ids: list[str]) -> list[ProductInfo]: """ Recupera le informazioni di multiple assets. """ products = [] - + for asset_id in asset_ids: try: product = self.get_product(asset_id) @@ -154,9 +154,9 @@ class YFinanceWrapper(BaseWrapper): except Exception as e: # Se un asset non è disponibile, continua con gli altri continue - + return products - + def get_all_products(self) -> list[ProductInfo]: """ Recupera tutti i prodotti disponibili. @@ -168,15 +168,15 @@ class YFinanceWrapper(BaseWrapper): 'AAPL', 'GOOGL', 'MSFT', 'TSLA', 'AMZN', 'SPY', 'QQQ', 'VTI', 'GLD', 'VIX' ] - + return self.get_products(popular_assets) - + def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]: """ Recupera i dati storici di prezzo per un asset. """ symbol = self._format_symbol(asset_id) - + try: # Determina il periodo appropriato in base al limite if limit <= 7: @@ -191,24 +191,24 @@ class YFinanceWrapper(BaseWrapper): else: period = "3mo" interval = "1d" - + # Ottieni i dati storici hist_data = self.tool.get_historical_stock_prices(symbol, period=period, interval=interval) - + if isinstance(hist_data, str): hist_data = json.loads(hist_data) - + # Il formato dei dati è {timestamp: {Open: x, High: y, Low: z, Close: w, Volume: v}} prices = [] timestamps = sorted(hist_data.keys())[-limit:] # Prendi gli ultimi 'limit' timestamp - + for timestamp in timestamps: price = create_price_from_history(hist_data, timestamp) if price.close > 0: # Solo se ci sono dati validi prices.append(price) - + return prices - + except Exception as e: # Se fallisce, restituisci lista vuota return [] \ No newline at end of file diff --git a/src/app/toolkits/__init__.py b/src/app/toolkits/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/app/toolkits/market_toolkit.py b/src/app/toolkits/market_toolkit.py deleted file mode 100644 index 7267b96..0000000 --- a/src/app/toolkits/market_toolkit.py +++ /dev/null @@ -1,29 +0,0 @@ -from agno.tools import Toolkit -from app.markets import MarketAPIsTool - - -# TODO (?) in futuro fare in modo che la LLM faccia da sé per il mercato -# Non so se può essere utile, per ora lo lascio qui -# per ora mettiamo tutto statico e poi, se abbiamo API-Key senza limiti -# possiamo fare in modo di far scegliere alla LLM quale crypto proporre -# in base alle sue proprie chiamate API -class MarketToolkit(Toolkit): - def __init__(self): - self.market_api = MarketAPIs() - - super().__init__( - name="Market Toolkit", - tools=[ - self.market_api.get_historical_prices, - self.market_api.get_product, - ], - ) - -def instructions(): - return """ - Utilizza questo strumento per ottenere dati di mercato storici e attuali per criptovalute specifiche. - Puoi richiedere i prezzi storici o il prezzo attuale di una criptovaluta specifica. - Esempio di utilizzo: - - get_historical_prices("BTC", limit=10) # ottieni gli ultimi 10 prezzi storici di Bitcoin - - get_product("ETH") - """ diff --git a/src/app/utils/aggregated_models.py b/src/app/utils/aggregated_models.py index ee9f3ef..e4a5ff4 100644 --- a/src/app/utils/aggregated_models.py +++ b/src/app/utils/aggregated_models.py @@ -9,7 +9,7 @@ class AggregationMetadata(BaseModel): sources_ignored: Set[str] = Field(default_factory=set, description="Exchange ignorati (errori)") aggregation_timestamp: str = Field(default="", description="Timestamp dell'aggregazione") confidence_score: float = Field(default=0.0, description="Score 0-1 sulla qualità dei dati") - + class Config: # Nasconde questi campi dalla serializzazione di default extra = "forbid" @@ -19,15 +19,15 @@ class AggregatedProductInfo(ProductInfo): Versione aggregata di ProductInfo che mantiene la trasparenza per l'utente finale mentre fornisce metadati di debugging opzionali. """ - + # Override dei campi con logica di aggregazione id: str = Field(description="ID aggregato basato sul simbolo standardizzato") status: str = Field(description="Status aggregato (majority vote o conservative)") - + # Campi privati per debugging (non visibili di default) _metadata: Optional[AggregationMetadata] = PrivateAttr(default=None) _source_data: Optional[Dict[str, ProductInfo]] = PrivateAttr(default=None) - + @classmethod def from_multiple_sources(cls, products: List[ProductInfo]) -> 'AggregatedProductInfo': """ @@ -36,37 +36,37 @@ class AggregatedProductInfo(ProductInfo): """ if not products: raise ValueError("Nessun prodotto da aggregare") - + # Raggruppa per symbol (la chiave vera per l'aggregazione) symbol_groups = {} for product in products: if product.symbol not in symbol_groups: symbol_groups[product.symbol] = [] symbol_groups[product.symbol].append(product) - + # Per ora gestiamo un symbol alla volta if len(symbol_groups) > 1: raise ValueError(f"Simboli multipli non supportati: {list(symbol_groups.keys())}") - + symbol_products = list(symbol_groups.values())[0] - + # Estrai tutte le fonti sources = [] for product in symbol_products: # Determina la fonte dall'ID o da altri metadati se disponibili source = cls._detect_source(product) sources.append(source) - + # Aggrega i dati aggregated_data = cls._aggregate_products(symbol_products, sources) - + # Crea l'istanza e assegna gli attributi privati instance = cls(**aggregated_data) instance._metadata = aggregated_data.get("_metadata") instance._source_data = aggregated_data.get("_source_data") - + return instance - + @staticmethod def _detect_source(product: ProductInfo) -> str: """Rileva la fonte da un ProductInfo""" @@ -81,7 +81,7 @@ class AggregatedProductInfo(ProductInfo): return "yfinance" else: return "unknown" - + @classmethod def _aggregate_products(cls, products: List[ProductInfo], sources: List[str]) -> dict: """ @@ -90,11 +90,11 @@ class AggregatedProductInfo(ProductInfo): """ import statistics from datetime import datetime - + # ID: usa il symbol come chiave standardizzata symbol = products[0].symbol aggregated_id = f"{symbol}_AGG" - + # Status: strategia "conservativa" - il più restrittivo vince # Ordine: trading_only < limit_only < auction < maintenance < offline status_priority = { @@ -105,41 +105,41 @@ class AggregatedProductInfo(ProductInfo): "offline": 5, "": 0 # Default se non specificato } - + statuses = [p.status for p in products if p.status] if statuses: # Prendi lo status con priorità più alta (più restrittivo) aggregated_status = max(statuses, key=lambda s: status_priority.get(s, 0)) else: aggregated_status = "trading" # Default ottimistico - + # Prezzo: media semplice (uso diretto del campo price come float) prices = [p.price for p in products if p.price > 0] aggregated_price = statistics.mean(prices) if prices else 0.0 - + # Volume: somma (assumendo che i volumi siano esclusivi per exchange) volumes = [p.volume_24h for p in products if p.volume_24h > 0] total_volume = sum(volumes) aggregated_volume = sum(price_i * volume_i for price_i, volume_i in zip((p.price for p in products), (volume for volume in volumes))) / total_volume aggregated_volume = round(aggregated_volume, 5) # aggregated_volume = sum(volumes) if volumes else 0.0 # NOTE old implementation - + # Valuta: prendi la prima (dovrebbero essere tutte uguali) quote_currency = next((p.quote_currency for p in products if p.quote_currency), "USD") - + # Calcola confidence score confidence = cls._calculate_confidence(products, sources) - + # Crea metadati per debugging metadata = AggregationMetadata( sources_used=set(sources), aggregation_timestamp=datetime.now().isoformat(), confidence_score=confidence ) - + # Salva dati sorgente per debugging source_data = dict(zip(sources, products)) - + return { "symbol": symbol, "price": aggregated_price, @@ -150,33 +150,33 @@ class AggregatedProductInfo(ProductInfo): "_metadata": metadata, "_source_data": source_data } - + @staticmethod def _calculate_confidence(products: List[ProductInfo], sources: List[str]) -> float: """Calcola un punteggio di confidenza 0-1""" if not products: return 0.0 - + score = 1.0 - + # Riduci score se pochi dati if len(products) < 2: score *= 0.7 - + # Riduci score se prezzi troppo diversi prices = [p.price for p in products if p.price > 0] if len(prices) > 1: price_std = (max(prices) - min(prices)) / statistics.mean(prices) if price_std > 0.05: # >5% variazione score *= 0.8 - + # Riduci score se fonti sconosciute unknown_sources = sum(1 for s in sources if s == "unknown") if unknown_sources > 0: score *= (1 - unknown_sources / len(sources)) - + return max(0.0, min(1.0, score)) - + def get_debug_info(self) -> dict: """Metodo opzionale per ottenere informazioni di debug""" return { diff --git a/src/app/utils/market_data_aggregator.py b/src/app/utils/market_data_aggregator.py index ea2d7c0..9c6206d 100644 --- a/src/app/utils/market_data_aggregator.py +++ b/src/app/utils/market_data_aggregator.py @@ -5,17 +5,17 @@ from app.utils.aggregated_models import AggregatedProductInfo class MarketDataAggregator: """ Aggregatore di dati di mercato che mantiene la trasparenza per l'utente. - + Compone MarketAPIs per fornire gli stessi metodi, ma restituisce dati aggregati da tutte le fonti disponibili. L'utente finale non vede la complessità. """ - + def __init__(self, currency: str = "USD"): # Import lazy per evitare circular import from app.markets import MarketAPIsTool self._market_apis = MarketAPIsTool(currency) self._aggregation_enabled = True - + def get_product(self, asset_id: str) -> ProductInfo: """ Override che aggrega dati da tutte le fonti disponibili. @@ -23,13 +23,13 @@ class MarketDataAggregator: """ if not self._aggregation_enabled: return self._market_apis.get_product(asset_id) - + # Raccogli dati da tutte le fonti try: raw_results = self.wrappers.try_call_all( lambda wrapper: wrapper.get_product(asset_id) ) - + # Converti in ProductInfo se necessario products = [] for wrapper_class, result in raw_results.items(): @@ -38,29 +38,29 @@ class MarketDataAggregator: elif isinstance(result, dict): # Converti dizionario in ProductInfo products.append(ProductInfo(**result)) - + if not products: raise Exception("Nessun dato disponibile") - + # Aggrega i risultati aggregated = AggregatedProductInfo.from_multiple_sources(products) - + # Restituisci come ProductInfo normale (nascondi la complessità) return ProductInfo(**aggregated.dict(exclude={"_metadata", "_source_data"})) - + except Exception as e: # Fallback: usa il comportamento normale se l'aggregazione fallisce return self._market_apis.get_product(asset_id) - + def get_products(self, asset_ids: List[str]) -> List[ProductInfo]: """ Aggrega dati per multiple asset. """ if not self._aggregation_enabled: return self._market_apis.get_products(asset_ids) - + aggregated_products = [] - + for asset_id in asset_ids: try: product = self.get_product(asset_id) @@ -68,36 +68,36 @@ class MarketDataAggregator: except Exception as e: # Salta asset che non riescono ad aggregare continue - + return aggregated_products - + def get_all_products(self) -> List[ProductInfo]: """ Aggrega tutti i prodotti disponibili. """ if not self._aggregation_enabled: return self._market_apis.get_all_products() - + # Raccogli tutti i prodotti da tutte le fonti try: all_products_by_source = self.wrappers.try_call_all( lambda wrapper: wrapper.get_all_products() ) - + # Raggruppa per symbol per aggregare symbol_groups = {} for wrapper_class, products in all_products_by_source.items(): if not isinstance(products, list): continue - + for product in products: if isinstance(product, dict): product = ProductInfo(**product) - + if product.symbol not in symbol_groups: symbol_groups[product.symbol] = [] symbol_groups[product.symbol].append(product) - + # Aggrega ogni gruppo aggregated_products = [] for symbol, products in symbol_groups.items(): @@ -111,13 +111,13 @@ class MarketDataAggregator: # Se l'aggregazione fallisce, usa il primo disponibile if products: aggregated_products.append(products[0]) - + return aggregated_products - + except Exception as e: # Fallback: usa il comportamento normale return self._market_apis.get_all_products() - + def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> List[Price]: """ Per i dati storici, usa una strategia diversa: @@ -125,7 +125,7 @@ class MarketDataAggregator: """ if not self._aggregation_enabled: return self._market_apis.get_historical_prices(asset_id, limit) - + # Per dati storici, usa il primo wrapper che funziona # (l'aggregazione di dati storici è più complessa) try: @@ -135,21 +135,21 @@ class MarketDataAggregator: except Exception as e: # Fallback: usa il comportamento normale return self._market_apis.get_historical_prices(asset_id, limit) - + def enable_aggregation(self, enabled: bool = True): """Abilita o disabilita l'aggregazione""" self._aggregation_enabled = enabled - + def is_aggregation_enabled(self) -> bool: """Controlla se l'aggregazione è abilitata""" return self._aggregation_enabled - + # Metodi proxy per completare l'interfaccia BaseWrapper @property def wrappers(self): """Accesso al wrapper handler per compatibilità""" return self._market_apis.wrappers - + def get_aggregated_product_with_debug(self, asset_id: str) -> Dict[str, Any]: """ Metodo speciale per debugging: restituisce dati aggregati con metadati. @@ -159,24 +159,24 @@ class MarketDataAggregator: raw_results = self.wrappers.try_call_all( lambda wrapper: wrapper.get_product(asset_id) ) - + products = [] for wrapper_class, result in raw_results.items(): if isinstance(result, ProductInfo): products.append(result) elif isinstance(result, dict): products.append(ProductInfo(**result)) - + if not products: raise Exception("Nessun dato disponibile") - + aggregated = AggregatedProductInfo.from_multiple_sources(products) - + return { "product": aggregated.dict(exclude={"_metadata", "_source_data"}), "debug": aggregated.get_debug_info() } - + except Exception as e: return { "error": str(e), diff --git a/src/app/utils/wrapper_handler.py b/src/app/utils/wrapper_handler.py index 4f22a8e..19181ea 100644 --- a/src/app/utils/wrapper_handler.py +++ b/src/app/utils/wrapper_handler.py @@ -94,6 +94,7 @@ class WrapperHandler(Generic[W]): def __check(wrappers: list[W]) -> bool: return all(w.__class__ is type for w in wrappers) + @staticmethod def __concise_error(e: Exception) -> str: last_frame = traceback.extract_tb(e.__traceback__)[-1] return f"{e} [\"{last_frame.filename}\", line {last_frame.lineno}]"