Files
upo-app-agents/src/app/utils/wrapper_handler.py
Giacomo Bertolazzi d2fbc0ceea 12 fix docs (#13)
* fix dependencies uv.lock

* refactor test markers for clarity

* refactor: clean up imports and remove unused files

* refactor: remove unused agent files and clean up market API instructions

* refactor: enhance wrapper initialization with keyword arguments and clean up tests

* refactor: remove PublicBinanceAgent

* refactor: aggregator
- simplified MarketDataAggregator and related models to functions

* refactor: update README and .env.example to reflect the latest changes to the project

* refactor: simplify product info and price creation in YFinanceWrapper

* refactor: remove get_all_products method from market API wrappers and update documentation

* fix: environment variable assertions

* refactor: remove status attribute from ProductInfo and update related methods to use timestamp_ms

* feat: implement aggregate_history_prices function to calculate hourly price averages

* refactor: update docker-compose and app.py for improved environment variable handling and compatibility

* feat: add detailed market instructions and improve error handling in price aggregation methods

* feat: add aggregated news retrieval methods for top headlines and latest news

* refactor: improve error messages in WrapperHandler for better clarity

* fix: correct quote currency extraction in create_product_info and remove debug prints from tests
2025-10-02 01:40:59 +02:00

140 lines
6.0 KiB
Python

import inspect
import time
import traceback
from typing import TypeVar, Callable, Generic, Iterable, Type
from agno.utils.log import log_warning, log_info
W = TypeVar("W")
T = TypeVar("T")
class WrapperHandler(Generic[W]):
"""
A handler for managing multiple wrappers with retry logic.
It attempts to call a function on the current wrapper, and if it fails,
it retries a specified number of times before switching to the next wrapper.
If all wrappers fail, it raises an exception.
Note: use `build_wrappers` to create an instance of this class for better error handling.
"""
def __init__(self, wrappers: list[W], try_per_wrapper: int = 3, retry_delay: int = 2):
"""
Initializes the WrapperHandler with a list of wrappers and retry settings.\n
Use `build_wrappers` to create an instance of this class for better error handling.
Args:
wrappers (list[W]): A list of wrapper instances to manage.
try_per_wrapper (int): Number of retries per wrapper before switching to the next.
retry_delay (int): Delay in seconds between retries.
"""
assert not WrapperHandler.__check(wrappers), "All wrappers must be instances of their respective classes. Use `build_wrappers` to create the WrapperHandler."
self.wrappers = wrappers
self.retry_per_wrapper = try_per_wrapper
self.retry_delay = retry_delay
self.index = 0
self.retry_count = 0
def try_call(self, func: Callable[[W], T]) -> T:
"""
Attempts to call the provided function on the current wrapper.
If it fails, it retries a specified number of times before switching to the next wrapper.
If all wrappers fail, it raises an exception.
Args:
func (Callable[[W], T]): A function that takes a wrapper and returns a result.
Returns:
T: The result of the function call.
Raises:
Exception: If all wrappers fail after retries.
"""
log_info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
iterations = 0
while iterations < len(self.wrappers):
wrapper = self.wrappers[self.index]
wrapper_name = wrapper.__class__.__name__
try:
log_info(f"try_call {wrapper_name}")
result = func(wrapper)
log_info(f"{wrapper_name} succeeded")
self.retry_count = 0
return result
except Exception as e:
self.retry_count += 1
error = WrapperHandler.__concise_error(e)
log_warning(f"{wrapper_name} failed {self.retry_count}/{self.retry_per_wrapper}: {error}")
if self.retry_count >= self.retry_per_wrapper:
self.index = (self.index + 1) % len(self.wrappers)
self.retry_count = 0
iterations += 1
else:
time.sleep(self.retry_delay)
raise Exception(f"All wrappers failed, latest error: {error}")
def try_call_all(self, func: Callable[[W], T]) -> dict[str, T]:
"""
Calls the provided function on all wrappers, collecting results.
If a wrapper fails, it logs a warning and continues with the next.
If all wrappers fail, it raises an exception.
Args:
func (Callable[[W], T]): A function that takes a wrapper and returns a result.
Returns:
list[T]: A list of results from the function calls.
Raises:
Exception: If all wrappers fail.
"""
log_info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
results = {}
for wrapper in self.wrappers:
wrapper_name = wrapper.__class__.__name__
try:
result = func(wrapper)
log_info(f"{wrapper_name} succeeded")
results[wrapper.__class__] = result
except Exception as e:
error = WrapperHandler.__concise_error(e)
log_warning(f"{wrapper_name} failed: {error}")
if not results:
raise Exception(f"All wrappers failed, latest error: {error}")
return results
@staticmethod
def __check(wrappers: list[W]) -> bool:
return all(w.__class__ is type for w in wrappers)
@staticmethod
def __concise_error(e: Exception) -> str:
last_frame = traceback.extract_tb(e.__traceback__)[-1]
return f"{e} [\"{last_frame.filename}\", line {last_frame.lineno}]"
@staticmethod
def build_wrappers(constructors: Iterable[Type[W]], try_per_wrapper: int = 3, retry_delay: int = 2, kwargs: dict | None = None) -> 'WrapperHandler[W]':
"""
Builds a WrapperHandler instance with the given wrapper constructors.
It attempts to initialize each wrapper and logs a warning if any cannot be initialized.
Only successfully initialized wrappers are included in the handler.
Args:
constructors (Iterable[Type[W]]): An iterable of wrapper classes to instantiate. e.g. [WrapperA, WrapperB]
try_per_wrapper (int): Number of retries per wrapper before switching to the next.
retry_delay (int): Delay in seconds between retries.
kwargs (dict | None): Optional dictionary with keyword arguments common to all wrappers.
Returns:
WrapperHandler[W]: An instance of WrapperHandler with the initialized wrappers.
Raises:
Exception: If no wrappers could be initialized.
"""
assert WrapperHandler.__check(constructors), f"All constructors must be classes. Received: {constructors}"
result = []
for wrapper_class in constructors:
try:
wrapper = wrapper_class(**(kwargs or {}))
result.append(wrapper)
except Exception as e:
log_warning(f"{wrapper_class} cannot be initialized: {e}")
return WrapperHandler(result, try_per_wrapper, retry_delay)