3 market api #8

Merged
Simo93-rgb merged 25 commits from 3-market-api into main 2025-10-01 15:51:25 +02:00
5 changed files with 125 additions and 16 deletions
Showing only changes of commit 8304cf9ea8 - Show all commits

View File

@@ -26,7 +26,7 @@ project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src")) sys.path.insert(0, str(project_root / "src"))
from dotenv import load_dotenv from dotenv import load_dotenv
from app.markets import ( from src.app.markets import (
CoinBaseWrapper, CoinBaseWrapper,
CryptoCompareWrapper, CryptoCompareWrapper,
BinanceWrapper, BinanceWrapper,

View File

@@ -71,17 +71,15 @@ class MarketAgent(Agent):
results = [] results = []
products: List[ProductInfo] = [] products: List[ProductInfo] = []
for sym in symbols: try:
try: products.extend(self.toolkit.get_current_prices(symbols)) # supponiamo ritorni un ProductInfo o simile
product = self.toolkit.get_current_price(sym) # supponiamo ritorni un ProductInfo o simile # Usa list comprehension per iterare symbols e products insieme
if isinstance(product, list): results.extend([
products.extend(product) f"{symbol}: ${product.price:.2f}" if hasattr(product, 'price') and product.price else f"{symbol}: N/A"
else: for symbol, product in zip(symbols, products)
products.append(product) ])
except Exception as e:
results.append(f"{sym}: {product.price if hasattr(product, 'price') else product}") results.extend(f"Errore: Impossibile recuperare i dati di mercato\n{str(e)}")
except Exception as e:
results.append(f"{sym}: errore ({e})")
# 4. Preparo output leggibile + metadati strutturati # 4. Preparo output leggibile + metadati strutturati
output_text = "📊 Dati di mercato:\n" + "\n".join(results) output_text = "📊 Dati di mercato:\n" + "\n".join(results)

View File

@@ -14,6 +14,7 @@ from functools import wraps
from typing import Any, Callable, Optional, Type, Union, List from typing import Any, Callable, Optional, Type, Union, List
from requests.exceptions import RequestException, Timeout, ConnectionError from requests.exceptions import RequestException, Timeout, ConnectionError
from binance.exceptions import BinanceAPIException, BinanceRequestException from binance.exceptions import BinanceAPIException, BinanceRequestException
from base import ProductInfo
# Configurazione logging # Configurazione logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -168,7 +169,7 @@ class ProviderFallback:
method_name: str, method_name: str,
*args, *args,
**kwargs **kwargs
) -> Any: ) -> list[ProductInfo]:
""" """
Esegue un metodo su tutti i provider fino a trovarne uno che funziona. Esegue un metodo su tutti i provider fino a trovarne uno che funziona.

View File

@@ -16,14 +16,14 @@ class MarketToolkit(Toolkit):
name="Market Toolkit", name="Market Toolkit",
tools=[ tools=[
self.get_historical_data, self.get_historical_data,
self.get_current_price, self.get_current_prices,
], ],
) )
def get_historical_data(self, symbol: str): def get_historical_data(self, symbol: str):
return self.market_api.get_historical_prices(symbol) return self.market_api.get_historical_prices(symbol)
def get_current_price(self, symbol: str): def get_current_prices(self, symbol: list):
return self.market_api.get_products(symbol) return self.market_api.get_products(symbol)
def prepare_inputs(): def prepare_inputs():

View File

@@ -0,0 +1,110 @@
import time
from typing import TypeVar, Callable, Generic, Iterable, Type
from agno.utils.log import log_warning
W = TypeVar("W")
T = TypeVar("T")
class WrapperHandler(Generic[W]):
"""
A handler for managing multiple wrappers with retry logic.
It attempts to call a function on the current wrapper, and if it fails,
it retries a specified number of times before switching to the next wrapper.
If all wrappers fail, it raises an exception.
Note: use `build_wrappers` to create an instance of this class for better error handling.
"""
def __init__(self, wrappers: list[W], try_per_wrapper: int = 3, retry_delay: int = 2):
"""
Initializes the WrapperHandler with a list of wrappers and retry settings.\n
Use `build_wrappers` to create an instance of this class for better error handling.
Args:
wrappers (list[W]): A list of wrapper instances to manage.
try_per_wrapper (int): Number of retries per wrapper before switching to the next.
retry_delay (int): Delay in seconds between retries.
"""
self.wrappers = wrappers
self.retry_per_wrapper = try_per_wrapper
self.retry_delay = retry_delay
self.index = 0
self.retry_count = 0
def try_call(self, func: Callable[[W], T]) -> T:
"""
Attempts to call the provided function on the current wrapper.
If it fails, it retries a specified number of times before switching to the next wrapper.
If all wrappers fail, it raises an exception.
Args:
func (Callable[[W], T]): A function that takes a wrapper and returns a result.
Returns:
T: The result of the function call.
Raises:
Exception: If all wrappers fail after retries.
"""
iterations = 0
while iterations < len(self.wrappers):
try:
wrapper = self.wrappers[self.index]
result = func(wrapper)
self.retry_count = 0
return result
except Exception as e:
self.retry_count += 1
if self.retry_count >= self.retry_per_wrapper:
self.index = (self.index + 1) % len(self.wrappers)
self.retry_count = 0
iterations += 1
else:
log_warning(f"{wrapper} failed {self.retry_count}/{self.retry_per_wrapper}: {e}")
time.sleep(self.retry_delay)
raise Exception(f"All wrappers failed after retries")
def try_call_all(self, func: Callable[[W], T]) -> dict[str, T]:
"""
Calls the provided function on all wrappers, collecting results.
If a wrapper fails, it logs a warning and continues with the next.
If all wrappers fail, it raises an exception.
Args:
func (Callable[[W], T]): A function that takes a wrapper and returns a result.
Returns:
list[T]: A list of results from the function calls.
Raises:
Exception: If all wrappers fail.
"""
results = {}
for wrapper in self.wrappers:
try:
result = func(wrapper)
results[wrapper.__class__] = result
except Exception as e:
log_warning(f"{wrapper} failed: {e}")
if not results:
raise Exception("All wrappers failed")
return results
@staticmethod
def build_wrappers(constructors: Iterable[Type[W]], try_per_wrapper: int = 3, retry_delay: int = 2) -> 'WrapperHandler[W]':
"""
Builds a WrapperHandler instance with the given wrapper constructors.
It attempts to initialize each wrapper and logs a warning if any cannot be initialized.
Only successfully initialized wrappers are included in the handler.
Args:
constructors (Iterable[Type[W]]): An iterable of wrapper classes to instantiate. e.g. [WrapperA, WrapperB]
try_per_wrapper (int): Number of retries per wrapper before switching to the next.
retry_delay (int): Delay in seconds between retries.
Returns:
WrapperHandler[W]: An instance of WrapperHandler with the initialized wrappers.
Raises:
Exception: If no wrappers could be initialized.
"""
result = []
for wrapper_class in constructors:
try:
wrapper = wrapper_class()
result.append(wrapper)
except Exception as e:
log_warning(f"{wrapper_class} cannot be initialized: {e}")
return WrapperHandler(result, try_per_wrapper, retry_delay)