Implement WrapperHandler for managing multiple news API wrappers; add tests for wrapper functionality
This commit is contained in:
@@ -1,5 +1,18 @@
|
||||
from app.utils.wrapper_handler import WrapperHandler
|
||||
from .base import NewsWrapper, Article
|
||||
from .news_api import NewsApiWrapper
|
||||
from .gnews_api import GnewsWrapper
|
||||
from .cryptopanic_api import CryptoPanicWrapper
|
||||
|
||||
__all__ = ["NewsApiWrapper", "GnewsWrapper", "CryptoPanicWrapper"]
|
||||
__all__ = ["NewsApiWrapper", "GnewsWrapper", "CryptoPanicWrapper"]
|
||||
|
||||
|
||||
class NewsAPIs(NewsWrapper):
|
||||
def __init__(self):
|
||||
wrappers = [GnewsWrapper, NewsApiWrapper, CryptoPanicWrapper]
|
||||
self.wrapper_handler: WrapperHandler[NewsWrapper] = WrapperHandler.build_wrappers(wrappers)
|
||||
|
||||
def get_top_headlines(self, query: str, total: int = 100) -> list[Article]:
|
||||
return self.wrapper_handler.try_call(lambda w: w.get_top_headlines(query, total))
|
||||
def get_latest_news(self, query: str, total: int = 100) -> list[Article]:
|
||||
return self.wrapper_handler.try_call(lambda w: w.get_latest_news(query, total))
|
||||
|
||||
48
src/app/utils/wrapper_handler.py
Normal file
48
src/app/utils/wrapper_handler.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import time
|
||||
from typing import TypeVar, Callable, Generic, Iterable, Type
|
||||
from agno.utils.log import log_warning
|
||||
|
||||
W = TypeVar("W")
|
||||
T = TypeVar("T")
|
||||
|
||||
class WrapperHandler(Generic[W]):
|
||||
def __init__(self, wrappers: list[W], try_per_wrapper: int = 3, retry_delay: int = 2):
|
||||
self.wrappers = wrappers
|
||||
self.retry_per_wrapper = try_per_wrapper
|
||||
self.retry_delay = retry_delay
|
||||
self.index = 0
|
||||
self.retry_count = 0
|
||||
|
||||
def try_call(self, func: Callable[[W], T]) -> T:
|
||||
iterations = 0
|
||||
while iterations < len(self.wrappers):
|
||||
print(f"Trying wrapper {self.index}")
|
||||
try:
|
||||
wrapper = self.wrappers[self.index]
|
||||
result = func(wrapper)
|
||||
self.retry_count = 0
|
||||
return result
|
||||
except Exception as e:
|
||||
self.retry_count += 1
|
||||
print(f"Error occurred {self.retry_count}/{self.retry_per_wrapper}: {e}")
|
||||
if self.retry_count >= self.retry_per_wrapper:
|
||||
self.index = (self.index + 1) % len(self.wrappers)
|
||||
self.retry_count = 0
|
||||
iterations += 1
|
||||
else:
|
||||
log_warning(f"{wrapper} failed {self.retry_count}/{self.retry_per_wrapper}: {e}")
|
||||
time.sleep(self.retry_delay)
|
||||
|
||||
raise Exception(f"All wrappers failed after retries")
|
||||
|
||||
@staticmethod
|
||||
def build_wrappers(constructors: Iterable[Type[W]], try_per_wrapper: int = 3, retry_delay: int = 2) -> 'WrapperHandler[W]':
|
||||
result = []
|
||||
for wrapper_class in constructors:
|
||||
try:
|
||||
wrapper = wrapper_class()
|
||||
result.append(wrapper)
|
||||
except Exception as e:
|
||||
log_warning(f"{wrapper_class} cannot be initialized: {e}")
|
||||
|
||||
return WrapperHandler(result, try_per_wrapper, retry_delay)
|
||||
@@ -16,8 +16,8 @@ def unified_checks(model: AppModels, input):
|
||||
for item in content.portfolio:
|
||||
assert item.asset not in (None, "", "null")
|
||||
assert isinstance(item.asset, str)
|
||||
assert item.percentage > 0
|
||||
assert item.percentage <= 100
|
||||
assert item.percentage >= 0.0
|
||||
assert item.percentage <= 100.0
|
||||
assert isinstance(item.percentage, (int, float))
|
||||
assert item.motivation not in (None, "", "null")
|
||||
assert isinstance(item.motivation, str)
|
||||
@@ -41,6 +41,7 @@ class TestPredictor:
|
||||
def test_gemini_model_output(self, inputs):
|
||||
unified_checks(AppModels.GEMINI, inputs)
|
||||
|
||||
@pytest.mark.slow
|
||||
def test_ollama_qwen_model_output(self, inputs):
|
||||
unified_checks(AppModels.OLLAMA_QWEN, inputs)
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ def pytest_configure(config:pytest.Config):
|
||||
("ollama_qwen", "marks tests that use Ollama Qwen model"),
|
||||
("news", "marks tests that use news"),
|
||||
("limited", "marks tests that have limited execution due to API constraints"),
|
||||
("wrapper", "marks tests for wrapper handler"),
|
||||
]
|
||||
for marker in markers:
|
||||
line = f"{marker[0]}: {marker[1]}"
|
||||
|
||||
60
tests/utils/test_wrapper_handler.py
Normal file
60
tests/utils/test_wrapper_handler.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import pytest
|
||||
from app.utils.wrapper_handler import WrapperHandler
|
||||
|
||||
class MockWrapper:
|
||||
def do_something(self) -> str:
|
||||
return "Success"
|
||||
|
||||
class FailingWrapper(MockWrapper):
|
||||
def do_something(self):
|
||||
raise Exception("Intentional Failure")
|
||||
|
||||
|
||||
@pytest.mark.wrapper
|
||||
class TestWrapperHandler:
|
||||
def test_all_wrappers_fail(self):
|
||||
wrappers = [FailingWrapper, FailingWrapper]
|
||||
handler: WrapperHandler[MockWrapper] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=2, retry_delay=0)
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
handler.try_call(lambda w: w.do_something())
|
||||
assert "All wrappers failed after retries" in str(exc_info.value)
|
||||
|
||||
def test_success_on_first_try(self):
|
||||
wrappers = [MockWrapper, FailingWrapper]
|
||||
handler: WrapperHandler[MockWrapper] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=2, retry_delay=0)
|
||||
|
||||
result = handler.try_call(lambda w: w.do_something())
|
||||
assert result == "Success"
|
||||
assert handler.index == 0 # Should still be on the first wrapper
|
||||
assert handler.retry_count == 0
|
||||
|
||||
def test_eventual_success(self):
|
||||
wrappers = [FailingWrapper, MockWrapper]
|
||||
handler: WrapperHandler[MockWrapper] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=2, retry_delay=0)
|
||||
|
||||
result = handler.try_call(lambda w: w.do_something())
|
||||
assert result == "Success"
|
||||
assert handler.index == 1 # Should have switched to the second wrapper
|
||||
assert handler.retry_count == 0
|
||||
|
||||
def test_partial_failures(self):
|
||||
wrappers = [FailingWrapper, MockWrapper, FailingWrapper]
|
||||
handler: WrapperHandler[MockWrapper] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
|
||||
result = handler.try_call(lambda w: w.do_something())
|
||||
assert result == "Success"
|
||||
assert handler.index == 1 # Should have switched to the second wrapper
|
||||
assert handler.retry_count == 0
|
||||
|
||||
# Next call should still succeed on the second wrapper
|
||||
result = handler.try_call(lambda w: w.do_something())
|
||||
assert result == "Success"
|
||||
assert handler.index == 1 # Should still be on the second wrapper
|
||||
assert handler.retry_count == 0
|
||||
|
||||
handler.index = 2 # Manually switch to the third wrapper
|
||||
result = handler.try_call(lambda w: w.do_something())
|
||||
assert result == "Success"
|
||||
assert handler.index == 1 # Should return to the second wrapper after failure
|
||||
assert handler.retry_count == 0
|
||||
Reference in New Issue
Block a user