diff --git a/demos/market_providers_api_demo.py b/demos/market_providers_api_demo.py index 39055bb..16817d5 100644 --- a/demos/market_providers_api_demo.py +++ b/demos/market_providers_api_demo.py @@ -26,7 +26,7 @@ project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root / "src")) from dotenv import load_dotenv -from app.markets import ( +from src.app.markets import ( CoinBaseWrapper, CryptoCompareWrapper, BinanceWrapper, diff --git a/src/app/agents/market_agent.py b/src/app/agents/market_agent.py index 94d01d5..b2c4cfd 100644 --- a/src/app/agents/market_agent.py +++ b/src/app/agents/market_agent.py @@ -71,18 +71,16 @@ class MarketAgent(Agent): results = [] products: List[ProductInfo] = [] - for sym in symbols: - try: - product = self.toolkit.get_current_price(sym) # supponiamo ritorni un ProductInfo o simile - if isinstance(product, list): - products.extend(product) - else: - products.append(product) - - results.append(f"{sym}: {product.price if hasattr(product, 'price') else product}") - except Exception as e: - results.append(f"{sym}: errore ({e})") - + try: + products.extend(self.toolkit.get_current_prices(symbols)) # supponiamo ritorni un ProductInfo o simile + # Usa list comprehension per iterare symbols e products insieme + results.extend([ + f"{symbol}: ${product.price:.2f}" if hasattr(product, 'price') and product.price else f"{symbol}: N/A" + for symbol, product in zip(symbols, products) + ]) + except Exception as e: + results.extend(f"Errore: Impossibile recuperare i dati di mercato\n{str(e)}") + # 4. Preparo output leggibile + metadati strutturati output_text = "📊 Dati di mercato:\n" + "\n".join(results) diff --git a/src/app/markets/error_handler.py b/src/app/markets/error_handler.py index 38aa47f..22b6aeb 100644 --- a/src/app/markets/error_handler.py +++ b/src/app/markets/error_handler.py @@ -14,6 +14,7 @@ from functools import wraps from typing import Any, Callable, Optional, Type, Union, List from requests.exceptions import RequestException, Timeout, ConnectionError from binance.exceptions import BinanceAPIException, BinanceRequestException +from base import ProductInfo # Configurazione logging logger = logging.getLogger(__name__) @@ -168,7 +169,7 @@ class ProviderFallback: method_name: str, *args, **kwargs - ) -> Any: + ) -> list[ProductInfo]: """ Esegue un metodo su tutti i provider fino a trovarne uno che funziona. diff --git a/src/app/toolkits/market_toolkit.py b/src/app/toolkits/market_toolkit.py index f0aef93..f366986 100644 --- a/src/app/toolkits/market_toolkit.py +++ b/src/app/toolkits/market_toolkit.py @@ -16,14 +16,14 @@ class MarketToolkit(Toolkit): name="Market Toolkit", tools=[ self.get_historical_data, - self.get_current_price, + self.get_current_prices, ], ) def get_historical_data(self, symbol: str): return self.market_api.get_historical_prices(symbol) - def get_current_price(self, symbol: str): + def get_current_prices(self, symbol: list): return self.market_api.get_products(symbol) def prepare_inputs(): diff --git a/src/app/utils/wrapper_handler.py b/src/app/utils/wrapper_handler.py new file mode 100644 index 0000000..df86c36 --- /dev/null +++ b/src/app/utils/wrapper_handler.py @@ -0,0 +1,110 @@ +import time +from typing import TypeVar, Callable, Generic, Iterable, Type +from agno.utils.log import log_warning + +W = TypeVar("W") +T = TypeVar("T") + +class WrapperHandler(Generic[W]): + """ + A handler for managing multiple wrappers with retry logic. + It attempts to call a function on the current wrapper, and if it fails, + it retries a specified number of times before switching to the next wrapper. + If all wrappers fail, it raises an exception. + + Note: use `build_wrappers` to create an instance of this class for better error handling. + """ + + def __init__(self, wrappers: list[W], try_per_wrapper: int = 3, retry_delay: int = 2): + """ + Initializes the WrapperHandler with a list of wrappers and retry settings.\n + Use `build_wrappers` to create an instance of this class for better error handling. + Args: + wrappers (list[W]): A list of wrapper instances to manage. + try_per_wrapper (int): Number of retries per wrapper before switching to the next. + retry_delay (int): Delay in seconds between retries. + """ + self.wrappers = wrappers + self.retry_per_wrapper = try_per_wrapper + self.retry_delay = retry_delay + self.index = 0 + self.retry_count = 0 + + def try_call(self, func: Callable[[W], T]) -> T: + """ + Attempts to call the provided function on the current wrapper. + If it fails, it retries a specified number of times before switching to the next wrapper. + If all wrappers fail, it raises an exception. + Args: + func (Callable[[W], T]): A function that takes a wrapper and returns a result. + Returns: + T: The result of the function call. + Raises: + Exception: If all wrappers fail after retries. + """ + iterations = 0 + while iterations < len(self.wrappers): + try: + wrapper = self.wrappers[self.index] + result = func(wrapper) + self.retry_count = 0 + return result + except Exception as e: + self.retry_count += 1 + if self.retry_count >= self.retry_per_wrapper: + self.index = (self.index + 1) % len(self.wrappers) + self.retry_count = 0 + iterations += 1 + else: + log_warning(f"{wrapper} failed {self.retry_count}/{self.retry_per_wrapper}: {e}") + time.sleep(self.retry_delay) + + raise Exception(f"All wrappers failed after retries") + + def try_call_all(self, func: Callable[[W], T]) -> dict[str, T]: + """ + Calls the provided function on all wrappers, collecting results. + If a wrapper fails, it logs a warning and continues with the next. + If all wrappers fail, it raises an exception. + Args: + func (Callable[[W], T]): A function that takes a wrapper and returns a result. + Returns: + list[T]: A list of results from the function calls. + Raises: + Exception: If all wrappers fail. + """ + results = {} + for wrapper in self.wrappers: + try: + result = func(wrapper) + results[wrapper.__class__] = result + except Exception as e: + log_warning(f"{wrapper} failed: {e}") + if not results: + raise Exception("All wrappers failed") + return results + + @staticmethod + def build_wrappers(constructors: Iterable[Type[W]], try_per_wrapper: int = 3, retry_delay: int = 2) -> 'WrapperHandler[W]': + """ + Builds a WrapperHandler instance with the given wrapper constructors. + It attempts to initialize each wrapper and logs a warning if any cannot be initialized. + Only successfully initialized wrappers are included in the handler. + Args: + constructors (Iterable[Type[W]]): An iterable of wrapper classes to instantiate. e.g. [WrapperA, WrapperB] + try_per_wrapper (int): Number of retries per wrapper before switching to the next. + retry_delay (int): Delay in seconds between retries. + Returns: + WrapperHandler[W]: An instance of WrapperHandler with the initialized wrappers. + Raises: + Exception: If no wrappers could be initialized. + """ + result = [] + for wrapper_class in constructors: + try: + wrapper = wrapper_class() + result.append(wrapper) + except Exception as e: + log_warning(f"{wrapper_class} cannot be initialized: {e}") + + return WrapperHandler(result, try_per_wrapper, retry_delay) \ No newline at end of file