Refactor and update structure (#20)
* Aggiorna gli agenti e il modello del team per utilizzare OLLAMA_QWEN_1B * Riorganizza e rinomina funzioni di estrazione in moduli di mercato e notizie; migliora la gestione delle importazioni * Spostato main nel corretto file __main__ e aggiornato il README.md * Aggiunta cartella per i modelli, agenti e team * Aggiornata la posizione delle istruzioni * Rimossi TODO e Aggiunto documentazione per metodi aggregated * Aggiornate le istruzioni del coordinatore del team * utils type checks * Rinominato BaseWrapper in MarketWrapper e fix type check markets * fix type checks di notizie e social. * Aggiunti type hints finali * Riorganizzati gli import * Refactoring architetturale e spostamento classi base - Eliminazione del file __init__.py obsoleto che importava ChatManager e Pipeline - Spostamento della classe Pipeline in agents/pipeline.py - Spostamento della classe ChatManager in utils/chat_manager.py - Aggiornamento di __main__.py per importare da app.utils e app.agents, e modifica della logica per utilizzare Pipeline invece di chat per la selezione di provider e stile - Creazione della cartella base con classi base comuni: markets.py (ProductInfo, Price, MarketWrapper), news.py (Article, NewsWrapper), social.py (SocialPost, SocialComment, SocialWrapper) - Aggiornamento di tutti gli import nel progetto (markets/, news/, social/, utils/, tests/) per utilizzare la nuova struttura base/ * Aggiornato Readme * Corretto il valore predefinito della valuta in BinanceWrapper da "USDT" a "USD" * fix type in tests * fix type per models * Rinominato 'quote_currency' in 'currency' e aggiornato il trattamento del timestamp in Price * fix errors found by Copilot * WrapperHandler: semplificata la logica di chiamata delle funzioni sui wrapper * fix docs * fix demos, semplificata logica lista ollama
This commit was merged in pull request #20.
This commit is contained in:
committed by
GitHub
parent
85153c405b
commit
517842c834
@@ -1,11 +1,11 @@
|
||||
import pytest
|
||||
from app.predictor import PREDICTOR_INSTRUCTIONS, PredictorInput, PredictorOutput, PredictorStyle
|
||||
from app.markets.base import ProductInfo
|
||||
from app.models import AppModels
|
||||
from app.agents import AppModels
|
||||
from app.agents.predictor import PREDICTOR_INSTRUCTIONS, PredictorInput, PredictorOutput, PredictorStyle
|
||||
from app.base.markets import ProductInfo
|
||||
|
||||
def unified_checks(model: AppModels, input):
|
||||
llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type]
|
||||
result = llm.run(input)
|
||||
def unified_checks(model: AppModels, input: PredictorInput) -> None:
|
||||
llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output_schema=PredictorOutput) # type: ignore[arg-type]
|
||||
result = llm.run(input) # type: ignore
|
||||
content = result.content
|
||||
|
||||
assert isinstance(content, PredictorOutput)
|
||||
@@ -27,9 +27,8 @@ def unified_checks(model: AppModels, input):
|
||||
|
||||
class TestPredictor:
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def inputs(self):
|
||||
data = []
|
||||
def inputs(self) -> PredictorInput:
|
||||
data: list[ProductInfo] = []
|
||||
for symbol, price in [("BTC", 60000.00), ("ETH", 3500.00), ("SOL", 150.00)]:
|
||||
product_info = ProductInfo()
|
||||
product_info.symbol = symbol
|
||||
@@ -38,13 +37,20 @@ class TestPredictor:
|
||||
|
||||
return PredictorInput(data=data, style=PredictorStyle.AGGRESSIVE, sentiment="positivo")
|
||||
|
||||
def test_gemini_model_output(self, inputs):
|
||||
def test_gemini_model_output(self):
|
||||
inputs = self.inputs()
|
||||
unified_checks(AppModels.GEMINI, inputs)
|
||||
|
||||
def test_ollama_qwen_4b_model_output(self):
|
||||
inputs = self.inputs()
|
||||
unified_checks(AppModels.OLLAMA_QWEN_4B, inputs)
|
||||
|
||||
@pytest.mark.slow
|
||||
def test_ollama_qwen_model_output(self, inputs):
|
||||
def test_ollama_qwen_latest_model_output(self):
|
||||
inputs = self.inputs()
|
||||
unified_checks(AppModels.OLLAMA_QWEN, inputs)
|
||||
|
||||
@pytest.mark.slow
|
||||
def test_ollama_gpt_oss_model_output(self, inputs):
|
||||
def test_ollama_gpt_oss_model_output(self):
|
||||
inputs = self.inputs()
|
||||
unified_checks(AppModels.OLLAMA_GPT, inputs)
|
||||
|
||||
@@ -45,9 +45,9 @@ class TestBinance:
|
||||
assert isinstance(history, list)
|
||||
assert len(history) == 5
|
||||
for entry in history:
|
||||
assert hasattr(entry, 'timestamp_ms')
|
||||
assert hasattr(entry, 'timestamp')
|
||||
assert hasattr(entry, 'close')
|
||||
assert hasattr(entry, 'high')
|
||||
assert entry.close > 0
|
||||
assert entry.high > 0
|
||||
assert entry.timestamp_ms > 0
|
||||
assert entry.timestamp != ''
|
||||
|
||||
@@ -47,9 +47,9 @@ class TestCoinBase:
|
||||
assert isinstance(history, list)
|
||||
assert len(history) == 5
|
||||
for entry in history:
|
||||
assert hasattr(entry, 'timestamp_ms')
|
||||
assert hasattr(entry, 'timestamp')
|
||||
assert hasattr(entry, 'close')
|
||||
assert hasattr(entry, 'high')
|
||||
assert entry.close > 0
|
||||
assert entry.high > 0
|
||||
assert entry.timestamp_ms > 0
|
||||
assert entry.timestamp != ''
|
||||
|
||||
@@ -49,9 +49,9 @@ class TestCryptoCompare:
|
||||
assert isinstance(history, list)
|
||||
assert len(history) == 5
|
||||
for entry in history:
|
||||
assert hasattr(entry, 'timestamp_ms')
|
||||
assert hasattr(entry, 'timestamp')
|
||||
assert hasattr(entry, 'close')
|
||||
assert hasattr(entry, 'high')
|
||||
assert entry.close > 0
|
||||
assert entry.high > 0
|
||||
assert entry.timestamp_ms > 0
|
||||
assert entry.timestamp != ''
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import os
|
||||
import pytest
|
||||
from praw import Reddit
|
||||
from app.social.reddit import MAX_COMMENTS, RedditWrapper
|
||||
|
||||
@pytest.mark.social
|
||||
@@ -10,7 +9,7 @@ class TestRedditWrapper:
|
||||
def test_initialization(self):
|
||||
wrapper = RedditWrapper()
|
||||
assert wrapper is not None
|
||||
assert isinstance(wrapper.tool, Reddit)
|
||||
assert wrapper.tool is not None
|
||||
|
||||
def test_get_top_crypto_posts(self):
|
||||
wrapper = RedditWrapper()
|
||||
|
||||
@@ -48,9 +48,9 @@ class TestYFinance:
|
||||
assert isinstance(history, list)
|
||||
assert len(history) == 5
|
||||
for entry in history:
|
||||
assert hasattr(entry, 'timestamp_ms')
|
||||
assert hasattr(entry, 'timestamp')
|
||||
assert hasattr(entry, 'close')
|
||||
assert hasattr(entry, 'high')
|
||||
assert entry.close > 0
|
||||
assert entry.high > 0
|
||||
assert entry.timestamp_ms > 0
|
||||
assert entry.timestamp != ''
|
||||
|
||||
@@ -33,7 +33,7 @@ def pytest_configure(config:pytest.Config):
|
||||
line = f"{marker[0]}: {marker[1]}"
|
||||
config.addinivalue_line("markers", line)
|
||||
|
||||
def pytest_collection_modifyitems(config, items):
|
||||
def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None:
|
||||
"""Modifica automaticamente degli item di test rimovendoli"""
|
||||
# Rimuovo i test "limited" e "slow" se non richiesti esplicitamente
|
||||
mark_to_remove = ['limited', 'slow']
|
||||
|
||||
@@ -7,15 +7,15 @@ from app.markets import MarketAPIsTool
|
||||
@pytest.mark.api
|
||||
class TestMarketAPIsTool:
|
||||
def test_wrapper_initialization(self):
|
||||
market_wrapper = MarketAPIsTool("USD")
|
||||
market_wrapper = MarketAPIsTool("EUR")
|
||||
assert market_wrapper is not None
|
||||
assert hasattr(market_wrapper, 'get_product')
|
||||
assert hasattr(market_wrapper, 'get_products')
|
||||
assert hasattr(market_wrapper, 'get_historical_prices')
|
||||
|
||||
def test_wrapper_capabilities(self):
|
||||
market_wrapper = MarketAPIsTool("USD")
|
||||
capabilities = []
|
||||
market_wrapper = MarketAPIsTool("EUR")
|
||||
capabilities: list[str] = []
|
||||
if hasattr(market_wrapper, 'get_product'):
|
||||
capabilities.append('single_product')
|
||||
if hasattr(market_wrapper, 'get_products'):
|
||||
@@ -25,7 +25,7 @@ class TestMarketAPIsTool:
|
||||
assert len(capabilities) > 0
|
||||
|
||||
def test_market_data_retrieval(self):
|
||||
market_wrapper = MarketAPIsTool("USD")
|
||||
market_wrapper = MarketAPIsTool("EUR")
|
||||
btc_product = market_wrapper.get_product("BTC")
|
||||
assert btc_product is not None
|
||||
assert hasattr(btc_product, 'symbol')
|
||||
@@ -34,8 +34,8 @@ class TestMarketAPIsTool:
|
||||
|
||||
def test_error_handling(self):
|
||||
try:
|
||||
market_wrapper = MarketAPIsTool("USD")
|
||||
market_wrapper = MarketAPIsTool("EUR")
|
||||
fake_product = market_wrapper.get_product("NONEXISTENT_CRYPTO_SYMBOL_12345")
|
||||
assert fake_product is None or fake_product.price == 0
|
||||
except Exception as e:
|
||||
except Exception as _:
|
||||
pass
|
||||
|
||||
@@ -33,7 +33,7 @@ class TestNewsAPITool:
|
||||
result = tool.wrapper_handler.try_call_all(lambda w: w.get_top_headlines(limit=2))
|
||||
assert isinstance(result, dict)
|
||||
assert len(result.keys()) > 0
|
||||
for provider, articles in result.items():
|
||||
for _provider, articles in result.items():
|
||||
for article in articles:
|
||||
assert article.title is not None
|
||||
assert article.source is not None
|
||||
@@ -43,7 +43,7 @@ class TestNewsAPITool:
|
||||
result = tool.wrapper_handler.try_call_all(lambda w: w.get_latest_news(query="crypto", limit=2))
|
||||
assert isinstance(result, dict)
|
||||
assert len(result.keys()) > 0
|
||||
for provider, articles in result.items():
|
||||
for _provider, articles in result.items():
|
||||
for article in articles:
|
||||
assert article.title is not None
|
||||
assert article.source is not None
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import pytest
|
||||
from app.markets.base import ProductInfo, Price
|
||||
from datetime import datetime
|
||||
from app.base.markets import ProductInfo, Price
|
||||
from app.utils.market_aggregation import aggregate_history_prices, aggregate_product_info
|
||||
|
||||
|
||||
@@ -13,12 +14,12 @@ class TestMarketDataAggregator:
|
||||
prod.symbol=symbol
|
||||
prod.price=price
|
||||
prod.volume_24h=volume
|
||||
prod.quote_currency=currency
|
||||
prod.currency=currency
|
||||
return prod
|
||||
|
||||
def __price(self, timestamp_ms: int, high: float, low: float, open: float, close: float, volume: float) -> Price:
|
||||
def __price(self, timestamp_s: int, high: float, low: float, open: float, close: float, volume: float) -> Price:
|
||||
price = Price()
|
||||
price.timestamp_ms = timestamp_ms
|
||||
price.set_timestamp(timestamp_s=timestamp_s)
|
||||
price.high = high
|
||||
price.low = low
|
||||
price.open = open
|
||||
@@ -41,9 +42,9 @@ class TestMarketDataAggregator:
|
||||
assert info.symbol == "BTC"
|
||||
|
||||
avg_weighted_price = (50000.0 * 1000.0 + 50100.0 * 1100.0 + 49900.0 * 900.0) / (1000.0 + 1100.0 + 900.0)
|
||||
assert info.price == pytest.approx(avg_weighted_price, rel=1e-3)
|
||||
assert info.volume_24h == pytest.approx(1000.0, rel=1e-3)
|
||||
assert info.quote_currency == "USD"
|
||||
assert info.price == pytest.approx(avg_weighted_price, rel=1e-3) # type: ignore
|
||||
assert info.volume_24h == pytest.approx(1000.0, rel=1e-3) # type: ignore
|
||||
assert info.currency == "USD"
|
||||
|
||||
def test_aggregate_product_info_multiple_symbols(self):
|
||||
products = {
|
||||
@@ -65,18 +66,18 @@ class TestMarketDataAggregator:
|
||||
|
||||
assert btc_info is not None
|
||||
avg_weighted_price_btc = (50000.0 * 1000.0 + 50100.0 * 1100.0) / (1000.0 + 1100.0)
|
||||
assert btc_info.price == pytest.approx(avg_weighted_price_btc, rel=1e-3)
|
||||
assert btc_info.volume_24h == pytest.approx(1050.0, rel=1e-3)
|
||||
assert btc_info.quote_currency == "USD"
|
||||
assert btc_info.price == pytest.approx(avg_weighted_price_btc, rel=1e-3) # type: ignore
|
||||
assert btc_info.volume_24h == pytest.approx(1050.0, rel=1e-3) # type: ignore
|
||||
assert btc_info.currency == "USD"
|
||||
|
||||
assert eth_info is not None
|
||||
avg_weighted_price_eth = (4000.0 * 2000.0 + 4050.0 * 2100.0) / (2000.0 + 2100.0)
|
||||
assert eth_info.price == pytest.approx(avg_weighted_price_eth, rel=1e-3)
|
||||
assert eth_info.volume_24h == pytest.approx(2050.0, rel=1e-3)
|
||||
assert eth_info.quote_currency == "USD"
|
||||
assert eth_info.price == pytest.approx(avg_weighted_price_eth, rel=1e-3) # type: ignore
|
||||
assert eth_info.volume_24h == pytest.approx(2050.0, rel=1e-3) # type: ignore
|
||||
assert eth_info.currency == "USD"
|
||||
|
||||
def test_aggregate_product_info_with_no_data(self):
|
||||
products = {
|
||||
products: dict[str, list[ProductInfo]] = {
|
||||
"Provider1": [],
|
||||
"Provider2": [],
|
||||
}
|
||||
@@ -84,7 +85,7 @@ class TestMarketDataAggregator:
|
||||
assert len(aggregated) == 0
|
||||
|
||||
def test_aggregate_product_info_with_partial_data(self):
|
||||
products = {
|
||||
products: dict[str, list[ProductInfo]] = {
|
||||
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")],
|
||||
"Provider2": [],
|
||||
}
|
||||
@@ -92,29 +93,38 @@ class TestMarketDataAggregator:
|
||||
assert len(aggregated) == 1
|
||||
info = aggregated[0]
|
||||
assert info.symbol == "BTC"
|
||||
assert info.price == pytest.approx(50000.0, rel=1e-3)
|
||||
assert info.volume_24h == pytest.approx(1000.0, rel=1e-3)
|
||||
assert info.quote_currency == "USD"
|
||||
assert info.price == pytest.approx(50000.0, rel=1e-3) # type: ignore
|
||||
assert info.volume_24h == pytest.approx(1000.0, rel=1e-3) # type: ignore
|
||||
assert info.currency == "USD"
|
||||
|
||||
def test_aggregate_history_prices(self):
|
||||
"""Test aggregazione di prezzi storici usando aggregate_history_prices"""
|
||||
timestamp_now = datetime.now()
|
||||
timestamp_1h_ago = int(timestamp_now.replace(hour=timestamp_now.hour - 1).timestamp())
|
||||
timestamp_2h_ago = int(timestamp_now.replace(hour=timestamp_now.hour - 2).timestamp())
|
||||
|
||||
prices = {
|
||||
"Provider1": [
|
||||
self.__price(1685577600000, 50000.0, 49500.0, 49600.0, 49900.0, 150.0),
|
||||
self.__price(1685581200000, 50200.0, 49800.0, 50000.0, 50100.0, 200.0),
|
||||
self.__price(timestamp_1h_ago, 50000.0, 49500.0, 49600.0, 49900.0, 150.0),
|
||||
self.__price(timestamp_2h_ago, 50200.0, 49800.0, 50000.0, 50100.0, 200.0),
|
||||
],
|
||||
"Provider2": [
|
||||
self.__price(1685577600000, 50100.0, 49600.0, 49700.0, 50000.0, 180.0),
|
||||
self.__price(1685581200000, 50300.0, 49900.0, 50100.0, 50200.0, 220.0),
|
||||
self.__price(timestamp_1h_ago, 50100.0, 49600.0, 49700.0, 50000.0, 180.0),
|
||||
self.__price(timestamp_2h_ago, 50300.0, 49900.0, 50100.0, 50200.0, 220.0),
|
||||
],
|
||||
}
|
||||
|
||||
price = Price()
|
||||
price.set_timestamp(timestamp_s=timestamp_1h_ago)
|
||||
timestamp_1h_ago = price.timestamp
|
||||
price.set_timestamp(timestamp_s=timestamp_2h_ago)
|
||||
timestamp_2h_ago = price.timestamp
|
||||
|
||||
aggregated = aggregate_history_prices(prices)
|
||||
assert len(aggregated) == 2
|
||||
assert aggregated[0].timestamp_ms == 1685577600000
|
||||
assert aggregated[0].high == pytest.approx(50050.0, rel=1e-3)
|
||||
assert aggregated[0].low == pytest.approx(49550.0, rel=1e-3)
|
||||
assert aggregated[1].timestamp_ms == 1685581200000
|
||||
assert aggregated[1].high == pytest.approx(50250.0, rel=1e-3)
|
||||
assert aggregated[1].low == pytest.approx(49850.0, rel=1e-3)
|
||||
assert aggregated[0].timestamp == timestamp_1h_ago
|
||||
assert aggregated[0].high == pytest.approx(50050.0, rel=1e-3) # type: ignore
|
||||
assert aggregated[0].low == pytest.approx(49550.0, rel=1e-3) # type: ignore
|
||||
assert aggregated[1].timestamp == timestamp_2h_ago
|
||||
assert aggregated[1].high == pytest.approx(50250.0, rel=1e-3) # type: ignore
|
||||
assert aggregated[1].low == pytest.approx(49850.0, rel=1e-3) # type: ignore
|
||||
|
||||
@@ -37,7 +37,7 @@ class TestWrapperHandler:
|
||||
|
||||
def test_init_failing_with_instances(self):
|
||||
with pytest.raises(AssertionError) as exc_info:
|
||||
WrapperHandler.build_wrappers([MockWrapper(), MockWrapper2()])
|
||||
WrapperHandler.build_wrappers([MockWrapper(), MockWrapper2()]) # type: ignore
|
||||
assert exc_info.type == AssertionError
|
||||
|
||||
def test_init_not_failing(self):
|
||||
@@ -49,104 +49,98 @@ class TestWrapperHandler:
|
||||
assert len(handler.wrappers) == 2
|
||||
|
||||
def test_all_wrappers_fail(self):
|
||||
wrappers = [FailingWrapper, FailingWrapper]
|
||||
handler: WrapperHandler[MockWrapper] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=2, retry_delay=0)
|
||||
wrappers: list[type[MockWrapper]] = [FailingWrapper, FailingWrapper]
|
||||
handler = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=2, retry_delay=0)
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
handler.try_call(lambda w: w.do_something())
|
||||
assert "All wrappers failed" in str(exc_info.value)
|
||||
|
||||
def test_success_on_first_try(self):
|
||||
wrappers = [MockWrapper, FailingWrapper]
|
||||
handler: WrapperHandler[MockWrapper] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=2, retry_delay=0)
|
||||
wrappers: list[type[MockWrapper]] = [MockWrapper, FailingWrapper]
|
||||
handler = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=2, retry_delay=0)
|
||||
|
||||
result = handler.try_call(lambda w: w.do_something())
|
||||
assert result == "Success"
|
||||
assert handler.index == 0 # Should still be on the first wrapper
|
||||
assert handler.retry_count == 0
|
||||
|
||||
def test_eventual_success(self):
|
||||
wrappers = [FailingWrapper, MockWrapper]
|
||||
handler: WrapperHandler[MockWrapper] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=2, retry_delay=0)
|
||||
wrappers: list[type[MockWrapper]] = [FailingWrapper, MockWrapper]
|
||||
handler = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=2, retry_delay=0)
|
||||
|
||||
result = handler.try_call(lambda w: w.do_something())
|
||||
assert result == "Success"
|
||||
assert handler.index == 1 # Should have switched to the second wrapper
|
||||
assert handler.retry_count == 0
|
||||
|
||||
def test_partial_failures(self):
|
||||
wrappers = [FailingWrapper, MockWrapper, FailingWrapper]
|
||||
handler: WrapperHandler[MockWrapper] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
wrappers: list[type[MockWrapper]] = [FailingWrapper, MockWrapper, FailingWrapper]
|
||||
handler = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
|
||||
result = handler.try_call(lambda w: w.do_something())
|
||||
assert result == "Success"
|
||||
assert handler.index == 1 # Should have switched to the second wrapper
|
||||
assert handler.retry_count == 0
|
||||
|
||||
# Next call should still succeed on the second wrapper
|
||||
result = handler.try_call(lambda w: w.do_something())
|
||||
assert result == "Success"
|
||||
assert handler.index == 1 # Should still be on the second wrapper
|
||||
assert handler.retry_count == 0
|
||||
|
||||
handler.index = 2 # Manually switch to the third wrapper
|
||||
result = handler.try_call(lambda w: w.do_something())
|
||||
assert result == "Success"
|
||||
assert handler.index == 1 # Should return to the second wrapper after failure
|
||||
assert handler.retry_count == 0
|
||||
|
||||
def test_try_call_all_success(self):
|
||||
wrappers = [MockWrapper, MockWrapper2]
|
||||
handler: WrapperHandler[MockWrapper] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
wrappers: list[type[MockWrapper]] = [MockWrapper, MockWrapper2]
|
||||
handler = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
results = handler.try_call_all(lambda w: w.do_something())
|
||||
assert results == {MockWrapper: "Success", MockWrapper2: "Success 2"}
|
||||
assert results == {MockWrapper.__name__: "Success", MockWrapper2.__name__: "Success 2"}
|
||||
|
||||
def test_try_call_all_partial_failures(self):
|
||||
# Only the second wrapper should succeed
|
||||
wrappers = [FailingWrapper, MockWrapper, FailingWrapper]
|
||||
handler: WrapperHandler[MockWrapper] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
wrappers: list[type[MockWrapper]] = [FailingWrapper, MockWrapper, FailingWrapper]
|
||||
handler = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
results = handler.try_call_all(lambda w: w.do_something())
|
||||
assert results == {MockWrapper: "Success"}
|
||||
assert results == {MockWrapper.__name__: "Success"}
|
||||
|
||||
# Only the second and fourth wrappers should succeed
|
||||
wrappers = [FailingWrapper, MockWrapper, FailingWrapper, MockWrapper2]
|
||||
handler: WrapperHandler[MockWrapper] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
wrappers: list[type[MockWrapper]] = [FailingWrapper, MockWrapper, FailingWrapper, MockWrapper2]
|
||||
handler = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
results = handler.try_call_all(lambda w: w.do_something())
|
||||
assert results == {MockWrapper: "Success", MockWrapper2: "Success 2"}
|
||||
assert results == {MockWrapper.__name__: "Success", MockWrapper2.__name__: "Success 2"}
|
||||
|
||||
def test_try_call_all_all_fail(self):
|
||||
# Test when all wrappers fail
|
||||
handler_all_fail: WrapperHandler[MockWrapper] = WrapperHandler.build_wrappers([FailingWrapper, FailingWrapper], try_per_wrapper=1, retry_delay=0)
|
||||
handler_all_fail = WrapperHandler.build_wrappers([FailingWrapper, FailingWrapper], try_per_wrapper=1, retry_delay=0)
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
handler_all_fail.try_call_all(lambda w: w.do_something())
|
||||
assert "All wrappers failed" in str(exc_info.value)
|
||||
|
||||
def test_wrappers_with_parameters(self):
|
||||
wrappers = [FailingWrapperWithParameters, MockWrapperWithParameters]
|
||||
handler: WrapperHandler[MockWrapperWithParameters] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=2, retry_delay=0)
|
||||
wrappers: list[type[MockWrapperWithParameters]] = [FailingWrapperWithParameters, MockWrapperWithParameters]
|
||||
handler = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=2, retry_delay=0)
|
||||
|
||||
result = handler.try_call(lambda w: w.do_something("test", 42))
|
||||
assert result == "Success test and 42"
|
||||
assert handler.index == 1 # Should have switched to the second wrapper
|
||||
assert handler.retry_count == 0
|
||||
|
||||
def test_wrappers_with_parameters_all_fail(self):
|
||||
wrappers = [FailingWrapperWithParameters, FailingWrapperWithParameters]
|
||||
handler: WrapperHandler[MockWrapperWithParameters] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
wrappers: list[type[MockWrapperWithParameters]] = [FailingWrapperWithParameters, FailingWrapperWithParameters]
|
||||
handler = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
handler.try_call(lambda w: w.do_something("test", 42))
|
||||
assert "All wrappers failed" in str(exc_info.value)
|
||||
|
||||
def test_try_call_all_with_parameters(self):
|
||||
wrappers = [FailingWrapperWithParameters, MockWrapperWithParameters]
|
||||
handler: WrapperHandler[MockWrapperWithParameters] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
wrappers: list[type[MockWrapperWithParameters]] = [FailingWrapperWithParameters, MockWrapperWithParameters]
|
||||
handler = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
results = handler.try_call_all(lambda w: w.do_something("param", 99))
|
||||
assert results == {MockWrapperWithParameters: "Success param and 99"}
|
||||
assert results == {MockWrapperWithParameters.__name__: "Success param and 99"}
|
||||
|
||||
def test_try_call_all_with_parameters_all_fail(self):
|
||||
wrappers = [FailingWrapperWithParameters, FailingWrapperWithParameters]
|
||||
handler: WrapperHandler[MockWrapperWithParameters] = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
wrappers: list[type[MockWrapperWithParameters]] = [FailingWrapperWithParameters, FailingWrapperWithParameters]
|
||||
handler = WrapperHandler.build_wrappers(wrappers, try_per_wrapper=1, retry_delay=0)
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
handler.try_call_all(lambda w: w.do_something("param", 99))
|
||||
assert "All wrappers failed" in str(exc_info.value)
|
||||
|
||||
Reference in New Issue
Block a user