Rinominato 'quote_currency' in 'currency' e aggiornato il trattamento del timestamp in Price

This commit is contained in:
2025-10-05 19:06:39 +02:00
parent ac356c3753
commit f5816bb74f
12 changed files with 73 additions and 43 deletions

View File

@@ -1,3 +1,4 @@
from datetime import datetime
from pydantic import BaseModel
@@ -10,7 +11,7 @@ class ProductInfo(BaseModel):
symbol: str = ""
price: float = 0.0
volume_24h: float = 0.0
quote_currency: str = ""
currency: str = ""
class Price(BaseModel):
"""
@@ -22,7 +23,24 @@ class Price(BaseModel):
open: float = 0.0
close: float = 0.0
volume: float = 0.0
timestamp_ms: int = 0 # Timestamp in milliseconds
timestamp: str = ""
"""Timestamp con formato YYYY-MM-DD HH:MM"""
def set_timestamp(self, timestamp_ms: int | None = None, timestamp_s: int | None = None) -> None:
"""
Imposta il timestamp in millisecondi.
Args:
timestamp (int | datetime): Il timestamp in millisecondi o come oggetto datetime.
"""
if timestamp_ms is not None:
timestamp = timestamp_ms // 1000
elif timestamp_s is not None:
timestamp = timestamp_s
else:
raise ValueError("Either timestamp_ms or timestamp_s must be provided")
assert timestamp > 0, "Invalid timestamp data received"
self.timestamp = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M')
class MarketWrapper:
"""

View File

@@ -10,17 +10,19 @@ def extract_product(currency: str, ticker_data: dict[str, Any]) -> ProductInfo:
product.symbol = ticker_data.get('symbol', '').replace(currency, '')
product.price = float(ticker_data.get('price', 0))
product.volume_24h = float(ticker_data.get('volume', 0))
product.quote_currency = currency
product.currency = currency
return product
def extract_price(kline_data: list[Any]) -> Price:
timestamp = kline_data[0]
price = Price()
price.open = float(kline_data[1])
price.high = float(kline_data[2])
price.low = float(kline_data[3])
price.close = float(kline_data[4])
price.volume = float(kline_data[5])
price.timestamp_ms = kline_data[0]
price.set_timestamp(timestamp_ms=timestamp)
return price
class BinanceWrapper(MarketWrapper):

View File

@@ -15,13 +15,15 @@ def extract_product(product_data: GetProductResponse | Product) -> ProductInfo:
return product
def extract_price(candle_data: Candle) -> Price:
timestamp = int(candle_data.start) if candle_data.start else 0
price = Price()
price.high = float(candle_data.high) if candle_data.high else 0.0
price.low = float(candle_data.low) if candle_data.low else 0.0
price.open = float(candle_data.open) if candle_data.open else 0.0
price.close = float(candle_data.close) if candle_data.close else 0.0
price.volume = float(candle_data.volume) if candle_data.volume else 0.0
price.timestamp_ms = int(candle_data.start) * 1000 if candle_data.start else 0
price.set_timestamp(timestamp_s=timestamp)
return price

View File

@@ -14,14 +14,15 @@ def extract_product(asset_data: dict[str, Any]) -> ProductInfo:
return product
def extract_price(price_data: dict[str, Any]) -> Price:
timestamp = price_data.get('time', 0)
price = Price()
price.high = float(price_data.get('high', 0))
price.low = float(price_data.get('low', 0))
price.open = float(price_data.get('open', 0))
price.close = float(price_data.get('close', 0))
price.volume = float(price_data.get('volumeto', 0))
price.timestamp_ms = price_data.get('time', 0) * 1000
assert price.timestamp_ms > 0, "Invalid timestamp data received from CryptoCompare"
price.set_timestamp(timestamp_s=timestamp)
return price

View File

@@ -12,20 +12,22 @@ def extract_product(stock_data: dict[str, str]) -> ProductInfo:
product.symbol = product.id.split('-')[0] # Rimuovi il suffisso della valuta per le crypto
product.price = float(stock_data.get('Current Stock Price', f"0.0 USD").split(" ")[0]) # prende solo il numero
product.volume_24h = 0.0 # YFinance non fornisce il volume 24h direttamente
product.quote_currency = product.id.split('-')[1] # La valuta è la parte dopo il '-'
product.currency = product.id.split('-')[1] # La valuta è la parte dopo il '-'
return product
def extract_price(hist_data: dict[str, str]) -> Price:
"""
Converte i dati storici di YFinanceTools in Price.
"""
timestamp = int(hist_data.get('Timestamp', '0'))
price = Price()
price.high = float(hist_data.get('High', 0.0))
price.low = float(hist_data.get('Low', 0.0))
price.open = float(hist_data.get('Open', 0.0))
price.close = float(hist_data.get('Close', 0.0))
price.volume = float(hist_data.get('Volume', 0.0))
price.timestamp_ms = int(hist_data.get('Timestamp', '0'))
price.set_timestamp(timestamp_ms=timestamp)
return price

View File

@@ -4,25 +4,24 @@ from app.base.markets import ProductInfo, Price
def aggregate_history_prices(prices: dict[str, list[Price]]) -> list[Price]:
"""
Aggrega i prezzi storici per symbol calcolando la media oraria.
Aggrega i prezzi storici per symbol calcolando la media.
Args:
prices (dict[str, list[Price]]): Mappa provider -> lista di Price
Returns:
list[Price]: Lista di Price aggregati per ora
list[Price]: Lista di Price aggregati per timestamp
"""
# Costruiamo una mappa timestamp_h -> lista di Price
timestamped_prices: dict[int, list[Price]] = {}
# Costruiamo una mappa timestamp -> lista di Price
timestamped_prices: dict[str, list[Price]] = {}
for _, price_list in prices.items():
for price in price_list:
time = price.timestamp_ms - (price.timestamp_ms % 3600000) # arrotonda all'ora (non dovrebbe essere necessario)
timestamped_prices.setdefault(time, []).append(price)
timestamped_prices.setdefault(price.timestamp, []).append(price)
# Ora aggregiamo i prezzi per ogni ora
# Ora aggregiamo i prezzi per ogni timestamp
aggregated_prices: list[Price] = []
for time, price_list in timestamped_prices.items():
price = Price()
price.timestamp_ms = time
price.timestamp = time
price.high = statistics.mean([p.high for p in price_list])
price.low = statistics.mean([p.low for p in price_list])
price.open = statistics.mean([p.open for p in price_list])
@@ -53,7 +52,7 @@ def aggregate_product_info(products: dict[str, list[ProductInfo]]) -> list[Produ
product.id = f"{symbol}_AGGREGATED"
product.symbol = symbol
product.quote_currency = next(p.quote_currency for p in product_list if p.quote_currency)
product.currency = next(p.currency for p in product_list if p.currency)
volume_sum = sum(p.volume_24h for p in product_list)
product.volume_24h = volume_sum / len(product_list) if product_list else 0.0