feat: implement aggregate_history_prices function to calculate hourly price averages
This commit is contained in:
@@ -3,21 +3,33 @@ from app.markets.base import ProductInfo, Price
|
||||
|
||||
|
||||
def aggregate_history_prices(prices: dict[str, list[Price]]) -> list[Price]:
|
||||
"""Aggrega i prezzi storici per symbol calcolando la media"""
|
||||
raise NotImplementedError("Funzione non ancora implementata per problemi di timestamp he deve essere uniformato prima di usare questa funzione.")
|
||||
# TODO implementare l'aggregazione dopo aver modificato la classe Price in modo che abbia un timestamp integer
|
||||
# aggregated_prices = []
|
||||
# for timestamp in range(len(next(iter(prices.values())))):
|
||||
# timestamp_prices = [
|
||||
# price_list[timestamp].price
|
||||
# for price_list in prices.values()
|
||||
# if len(price_list) > timestamp and price_list[timestamp].price is not None
|
||||
# ]
|
||||
# if timestamp_prices:
|
||||
# aggregated_prices.append(statistics.mean(timestamp_prices))
|
||||
# else:
|
||||
# aggregated_prices.append(None)
|
||||
# return aggregated_prices
|
||||
"""
|
||||
Aggrega i prezzi storici per symbol calcolando la media
|
||||
|
||||
"""
|
||||
max_list_length = max(len(p) for p in prices.values())
|
||||
|
||||
# Costruiamo una mappa timestamp_h -> lista di Price
|
||||
timestamped_prices: dict[int, list[Price]] = {}
|
||||
for _, price_list in prices.items():
|
||||
for price in price_list:
|
||||
time = price.timestamp_ms - (price.timestamp_ms % 3600000) # arrotonda all'ora (non dovrebbe essere necessario)
|
||||
timestamped_prices.setdefault(time, []).append(price)
|
||||
|
||||
# Ora aggregiamo i prezzi per ogni ora
|
||||
aggregated_prices = []
|
||||
for time, price_list in timestamped_prices.items():
|
||||
price = Price()
|
||||
price.timestamp_ms = time
|
||||
price.high = statistics.mean([p.high for p in price_list])
|
||||
price.low = statistics.mean([p.low for p in price_list])
|
||||
price.open = statistics.mean([p.open for p in price_list])
|
||||
price.close = statistics.mean([p.close for p in price_list])
|
||||
price.volume = statistics.mean([p.volume for p in price_list])
|
||||
aggregated_prices.append(price)
|
||||
|
||||
assert(len(aggregated_prices) <= max_list_length)
|
||||
return aggregated_prices
|
||||
|
||||
def aggregate_product_info(products: dict[str, list[ProductInfo]]) -> list[ProductInfo]:
|
||||
"""
|
||||
|
||||
@@ -94,7 +94,7 @@ class TestMarketDataAggregator:
|
||||
assert len(aggregated) == 2
|
||||
assert aggregated[0].timestamp_ms == 1685577600000
|
||||
assert aggregated[0].high == pytest.approx(50050.0, rel=1e-3)
|
||||
assert aggregated[0].low == pytest.approx(49500.0, rel=1e-3)
|
||||
assert aggregated[0].low == pytest.approx(49550.0, rel=1e-3)
|
||||
assert aggregated[1].timestamp_ms == 1685581200000
|
||||
assert aggregated[1].high == pytest.approx(50250.0, rel=1e-3)
|
||||
assert aggregated[1].low == pytest.approx(49800.0, rel=1e-3)
|
||||
assert aggregated[1].low == pytest.approx(49850.0, rel=1e-3)
|
||||
|
||||
Reference in New Issue
Block a user