3 market api #8

Merged
Simo93-rgb merged 25 commits from 3-market-api into main 2025-10-01 15:51:25 +02:00
5 changed files with 157 additions and 222 deletions
Showing only changes of commit f7dec6fdb6 - Show all commits

View File

@@ -1,4 +1,4 @@
from coinbase.rest.types.product_types import Candle, GetProductResponse, Product
from pydantic import BaseModel from pydantic import BaseModel
class BaseWrapper: class BaseWrapper:
@@ -27,58 +27,6 @@ class ProductInfo(BaseModel):
status: str = "" status: str = ""
quote_currency: str = "" quote_currency: str = ""
@staticmethod
def from_coinbase(product_data: GetProductResponse) -> 'ProductInfo':
product = ProductInfo()
product.id = product_data.product_id or ""
product.symbol = product_data.base_currency_id or ""
product.price = float(product_data.price) if product_data.price else 0.0
product.volume_24h = float(product_data.volume_24h) if product_data.volume_24h else 0.0
# TODO Check what status means in Coinbase
product.status = product_data.status or ""
return product
@staticmethod
def from_coinbase_product(product_data: Product) -> 'ProductInfo':
product = ProductInfo()
product.id = product_data.product_id or ""
product.symbol = product_data.base_currency_id or ""
product.price = float(product_data.price) if product_data.price else 0.0
product.volume_24h = float(product_data.volume_24h) if product_data.volume_24h else 0.0
product.status = product_data.status or ""
return product
@staticmethod
def from_cryptocompare(asset_data: dict) -> 'ProductInfo':
product = ProductInfo()
product.id = asset_data['FROMSYMBOL'] + '-' + asset_data['TOSYMBOL']
product.symbol = asset_data['FROMSYMBOL']
product.price = float(asset_data['PRICE'])
product.volume_24h = float(asset_data['VOLUME24HOUR'])
product.status = "" # Cryptocompare does not provide status
return product
@staticmethod
def from_binance(ticker_data: dict, ticker_24h_data: dict) -> 'ProductInfo':
"""
Crea un oggetto ProductInfo da dati Binance.
Args:
ticker_data: Dati del ticker di prezzo
ticker_24h_data: Dati del ticker 24h
Returns:
Oggetto ProductInfo
"""
product = ProductInfo()
product.id = ticker_data['symbol']
product.symbol = ticker_data['symbol'].replace('USDT', '').replace('BUSD', '')
product.price = float(ticker_data['price'])
product.volume_24h = float(ticker_24h_data['volume'])
product.status = "TRADING" # Binance non fornisce status esplicito
product.quote_currency = "USDT" # Assumiamo USDT come default
return product
class Price(BaseModel): class Price(BaseModel):
""" """
Rappresenta i dati di prezzo per un asset, come ottenuti dalle API di mercato. Rappresenta i dati di prezzo per un asset, come ottenuti dalle API di mercato.
@@ -90,25 +38,3 @@ class Price(BaseModel):
close: float = 0.0 close: float = 0.0
volume: float = 0.0 volume: float = 0.0
time: str = "" time: str = ""
@staticmethod
def from_coinbase(candle_data: Candle) -> 'Price':
price = Price()
price.high = float(candle_data.high) if candle_data.high else 0.0
price.low = float(candle_data.low) if candle_data.low else 0.0
price.open = float(candle_data.open) if candle_data.open else 0.0
price.close = float(candle_data.close) if candle_data.close else 0.0
price.volume = float(candle_data.volume) if candle_data.volume else 0.0
price.time = str(candle_data.start) if candle_data.start else ""
return price
@staticmethod
def from_cryptocompare(price_data: dict) -> 'Price':
price = Price()
price.high = float(price_data['high'])
price.low = float(price_data['low'])
price.open = float(price_data['open'])
price.close = float(price_data['close'])
price.volume = float(price_data['volumeto'])
price.time = str(price_data['time'])
return price

View File

@@ -1,37 +1,31 @@
import os import os
from typing import Optional from datetime import datetime
from datetime import datetime, timedelta
from binance.client import Client from binance.client import Client
from .base import ProductInfo, BaseWrapper, Price from .base import ProductInfo, BaseWrapper, Price
from .error_handler import retry_on_failure, handle_api_errors, MarketAPIError
def get_product(currency: str, ticker_data: dict[str, str]) -> 'ProductInfo':
product = ProductInfo()
product.id = ticker_data.get('symbol')
product.symbol = ticker_data.get('symbol', '').replace(currency, '')
product.price = float(ticker_data.get('price', 0))
product.volume_24h = float(ticker_data.get('volume', 0))
product.status = "TRADING" # Binance non fornisce status esplicito
product.quote_currency = currency
return product
class BinanceWrapper(BaseWrapper): class BinanceWrapper(BaseWrapper):
""" """
Wrapper per le API autenticate di Binance. Wrapper per le API autenticate di Binance.\n
Implementa l'interfaccia BaseWrapper per fornire accesso unificato Implementa l'interfaccia BaseWrapper per fornire accesso unificato
ai dati di mercato di Binance tramite le API REST con autenticazione. ai dati di mercato di Binance tramite le API REST con autenticazione.\n
La documentazione delle API è disponibile qui:
https://binance-docs.github.io/apidocs/spot/en/ https://binance-docs.github.io/apidocs/spot/en/
""" """
def __init__(self, api_key: Optional[str] = None, api_secret: Optional[str] = None, currency: str = "USDT"): def __init__(self, currency: str = "USDT"):
""" api_key = os.getenv("BINANCE_API_KEY")
Inizializza il wrapper con le credenziali API.
Args:
api_key: Chiave API di Binance (se None, usa variabile d'ambiente)
api_secret: Secret API di Binance (se None, usa variabile d'ambiente)
currency: Valuta di quotazione di default (default: USDT)
"""
if api_key is None:
api_key = os.getenv("BINANCE_API_KEY")
assert api_key is not None, "API key is required" assert api_key is not None, "API key is required"
if api_secret is None: api_secret = os.getenv("BINANCE_API_SECRET")
api_secret = os.getenv("BINANCE_API_SECRET")
assert api_secret is not None, "API secret is required" assert api_secret is not None, "API secret is required"
self.currency = currency self.currency = currency
@@ -40,130 +34,58 @@ class BinanceWrapper(BaseWrapper):
def __format_symbol(self, asset_id: str) -> str: def __format_symbol(self, asset_id: str) -> str:
""" """
Formatta l'asset_id nel formato richiesto da Binance. Formatta l'asset_id nel formato richiesto da Binance.
Args:
asset_id: ID dell'asset (es. "BTC" o "BTC-USDT")
Returns:
Simbolo formattato per Binance (es. "BTCUSDT")
""" """
if '-' in asset_id: return asset_id.replace('-', '') if '-' in asset_id else f"{asset_id}{self.currency}"
# Se già nel formato "BTC-USDT", converte in "BTCUSDT"
return asset_id.replace('-', '')
else:
# Se solo "BTC", aggiunge la valuta di default
return f"{asset_id}{self.currency}"
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_product(self, asset_id: str) -> ProductInfo: def get_product(self, asset_id: str) -> ProductInfo:
"""
Ottiene informazioni su un singolo prodotto.
Args:
asset_id: ID dell'asset da recuperare
Returns:
Oggetto ProductInfo con le informazioni del prodotto
"""
symbol = self.__format_symbol(asset_id) symbol = self.__format_symbol(asset_id)
ticker = self.client.get_symbol_ticker(symbol=symbol) ticker = self.client.get_symbol_ticker(symbol=symbol)
ticker_24h = self.client.get_ticker(symbol=symbol) ticker_24h = self.client.get_ticker(symbol=symbol)
ticker['volume'] = ticker_24h.get('volume', 0) # Aggiunge volume 24h ai dati del ticker
return ProductInfo.from_binance(ticker, ticker_24h)
return get_product(self.currency, ticker)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]: def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
""" symbols = [self.__format_symbol(asset_id) for asset_id in asset_ids]
Ottiene informazioni su più prodotti. symbols_str = f"[\"{'","'.join(symbols)}\"]"
Args: tickers = self.client.get_symbol_ticker(symbols=symbols_str)
asset_ids: Lista di ID degli asset da recuperare tickers_24h = self.client.get_ticker(symbols=symbols_str) # un po brutale, ma va bene così
for t, t24 in zip(tickers, tickers_24h):
Returns: t['volume'] = t24.get('volume', 0)
Lista di oggetti ProductInfo
""" return [get_product(self.currency, ticker) for ticker in tickers]
def get_all_products(self) -> list[ProductInfo]:
all_tickers = self.client.get_ticker()
products = [] products = []
for asset_id in asset_ids:
try: for ticker in all_tickers:
product = self.get_product(asset_id) # Filtra solo i simboli che terminano con la valuta di default
if ticker['symbol'].endswith(self.currency):
product = get_product(self.currency, ticker)
products.append(product) products.append(product)
except Exception as e:
print(f"Errore nel recupero di {asset_id}: {e}")
continue
return products return products
@retry_on_failure(max_retries=3, delay=1.0) def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
@handle_api_errors
def get_all_products(self) -> list[ProductInfo]:
"""
Ottiene informazioni su tutti i prodotti disponibili.
Returns:
Lista di oggetti ProductInfo per tutti i prodotti
"""
try:
# Ottiene tutti i ticker 24h che contengono le informazioni necessarie
all_tickers = self.client.get_ticker()
products = []
for ticker in all_tickers:
# Filtra solo i simboli che terminano con la valuta di default
if ticker['symbol'].endswith(self.currency):
try:
# Crea ProductInfo direttamente dal ticker 24h
product = ProductInfo()
product.id = ticker['symbol']
product.symbol = ticker['symbol'].replace(self.currency, '')
product.price = float(ticker['lastPrice'])
product.volume_24h = float(ticker['volume'])
product.status = "TRADING" # Binance non fornisce status esplicito
product.quote_currency = self.currency
products.append(product)
except (ValueError, KeyError) as e:
print(f"Errore nel parsing di {ticker['symbol']}: {e}")
continue
return products
except Exception as e:
print(f"Errore nel recupero di tutti i prodotti: {e}")
return []
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_historical_prices(self, asset_id: str = "BTC") -> list[Price]:
"""
Ottiene i prezzi storici per un asset.
Args:
asset_id: ID dell'asset (default: "BTC")
Returns:
Lista di oggetti Price con i dati storici
"""
symbol = self.__format_symbol(asset_id) symbol = self.__format_symbol(asset_id)
try: # Ottiene candele orarie degli ultimi 30 giorni
# Ottiene candele orarie degli ultimi 30 giorni klines = self.client.get_historical_klines(
klines = self.client.get_historical_klines( symbol=symbol,
symbol=symbol, interval=Client.KLINE_INTERVAL_1HOUR,
interval=Client.KLINE_INTERVAL_1HOUR, limit=limit,
start_str="30 days ago UTC" )
)
prices = []
prices = [] for kline in klines:
for kline in klines: price = Price()
price = Price() price.open = float(kline[1])
price.open = float(kline[1]) price.high = float(kline[2])
price.high = float(kline[2]) price.low = float(kline[3])
price.low = float(kline[3]) price.close = float(kline[4])
price.close = float(kline[4]) price.volume = float(kline[5])
price.volume = float(kline[5]) price.time = str(datetime.fromtimestamp(kline[0] / 1000))
price.time = str(datetime.fromtimestamp(kline[0] / 1000)) prices.append(price)
prices.append(price) return prices
return prices
except Exception as e:
print(f"Errore nel recupero dei prezzi storici per {symbol}: {e}")
return []

View File

@@ -2,8 +2,29 @@ import os
from enum import Enum from enum import Enum
from datetime import datetime, timedelta from datetime import datetime, timedelta
from coinbase.rest import RESTClient from coinbase.rest import RESTClient
from coinbase.rest.types.product_types import Candle, GetProductResponse, Product
from .base import ProductInfo, BaseWrapper, Price from .base import ProductInfo, BaseWrapper, Price
from .error_handler import retry_on_failure, handle_api_errors, MarketAPIError, RateLimitError
def get_product(product_data: GetProductResponse | Product) -> 'ProductInfo':
product = ProductInfo()
product.id = product_data.product_id or ""
product.symbol = product_data.base_currency_id or ""
product.price = float(product_data.price) if product_data.price else 0.0
product.volume_24h = float(product_data.volume_24h) if product_data.volume_24h else 0.0
# TODO Check what status means in Coinbase
product.status = product_data.status or ""
return product
def get_price(candle_data: Candle) -> 'Price':
price = Price()
price.high = float(candle_data.high) if candle_data.high else 0.0
price.low = float(candle_data.low) if candle_data.low else 0.0
price.open = float(candle_data.open) if candle_data.open else 0.0
price.close = float(candle_data.close) if candle_data.close else 0.0
price.volume = float(candle_data.volume) if candle_data.volume else 0.0
price.time = str(candle_data.start) if candle_data.start else ""
return price
class Granularity(Enum): class Granularity(Enum):
@@ -45,16 +66,16 @@ class CoinBaseWrapper(BaseWrapper):
def get_product(self, asset_id: str) -> ProductInfo: def get_product(self, asset_id: str) -> ProductInfo:
asset_id = self.__format(asset_id) asset_id = self.__format(asset_id)
asset = self.client.get_product(asset_id) asset = self.client.get_product(asset_id)
return ProductInfo.from_coinbase(asset) return get_product(asset)
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]: def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
all_asset_ids = [self.__format(asset_id) for asset_id in asset_ids] all_asset_ids = [self.__format(asset_id) for asset_id in asset_ids]
assets = self.client.get_products(product_ids=all_asset_ids) assets = self.client.get_products(product_ids=all_asset_ids)
return [ProductInfo.from_coinbase(asset) for asset in assets.products] return [get_product(asset) for asset in assets.products]
def get_all_products(self) -> list[ProductInfo]: def get_all_products(self) -> list[ProductInfo]:
assets = self.client.get_products() assets = self.client.get_products()
return [ProductInfo.from_coinbase_product(asset) for asset in assets.products] return [get_product(asset) for asset in assets.products]
def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]: def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
asset_id = self.__format(asset_id) asset_id = self.__format(asset_id)
@@ -68,4 +89,4 @@ class CoinBaseWrapper(BaseWrapper):
end=str(int(end_time.timestamp())), end=str(int(end_time.timestamp())),
limit=limit limit=limit
) )
return [Price.from_coinbase(candle) for candle in data.candles] return [get_price(candle) for candle in data.candles]

View File

@@ -2,7 +2,27 @@ import os
import requests import requests
from typing import Optional, Dict, Any from typing import Optional, Dict, Any
from .base import ProductInfo, BaseWrapper, Price from .base import ProductInfo, BaseWrapper, Price
from .error_handler import retry_on_failure, handle_api_errors, MarketAPIError
def get_product(asset_data: dict) -> 'ProductInfo':
product = ProductInfo()
product.id = asset_data['FROMSYMBOL'] + '-' + asset_data['TOSYMBOL']
product.symbol = asset_data['FROMSYMBOL']
product.price = float(asset_data['PRICE'])
product.volume_24h = float(asset_data['VOLUME24HOUR'])
product.status = "" # Cryptocompare does not provide status
return product
def get_price(price_data: dict) -> 'Price':
price = Price()
price.high = float(price_data['high'])
price.low = float(price_data['low'])
price.open = float(price_data['open'])
price.close = float(price_data['close'])
price.volume = float(price_data['volumeto'])
price.time = str(price_data['time'])
return price
BASE_URL = "https://min-api.cryptocompare.com" BASE_URL = "https://min-api.cryptocompare.com"
@@ -33,7 +53,7 @@ class CryptoCompareWrapper(BaseWrapper):
"tsyms": self.currency "tsyms": self.currency
}) })
data = response.get('RAW', {}).get(asset_id, {}).get(self.currency, {}) data = response.get('RAW', {}).get(asset_id, {}).get(self.currency, {})
return ProductInfo.from_cryptocompare(data) return get_product(data)
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]: def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
response = self.__request("/data/pricemultifull", params = { response = self.__request("/data/pricemultifull", params = {
@@ -44,10 +64,11 @@ class CryptoCompareWrapper(BaseWrapper):
data = response.get('RAW', {}) data = response.get('RAW', {})
for asset_id in asset_ids: for asset_id in asset_ids:
asset_data = data.get(asset_id, {}).get(self.currency, {}) asset_data = data.get(asset_id, {}).get(self.currency, {})
assets.append(ProductInfo.from_cryptocompare(asset_data)) assets.append(get_product(asset_data))
return assets return assets
def get_all_products(self) -> list[ProductInfo]: def get_all_products(self) -> list[ProductInfo]:
# TODO serve davvero il workaroud qui? Possiamo prendere i dati da un altro endpoint intanto
raise NotImplementedError("get_all_products is not supported by CryptoCompare API") raise NotImplementedError("get_all_products is not supported by CryptoCompare API")
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[dict]: def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[dict]:
@@ -58,5 +79,5 @@ class CryptoCompareWrapper(BaseWrapper):
}) })
data = response.get('Data', {}).get('Data', []) data = response.get('Data', {}).get('Data', [])
prices = [Price.from_cryptocompare(price_data) for price_data in data] prices = [get_price(price_data) for price_data in data]
return prices return prices

View File

@@ -1,7 +1,52 @@
import pytest import pytest
from app.markets.binance import BinanceWrapper
@pytest.mark.market @pytest.mark.market
@pytest.mark.api @pytest.mark.api
class TestBinance: class TestBinance:
# TODO fare dei test veri e propri
pass def test_binance_init(self):
market = BinanceWrapper()
assert market is not None
assert hasattr(market, 'currency')
assert market.currency == "USDT"
def test_binance_get_product(self):
market = BinanceWrapper()
product = market.get_product("BTC")
assert product is not None
assert hasattr(product, 'symbol')
assert product.symbol == "BTC"
assert hasattr(product, 'price')
assert product.price > 0
def test_binance_get_products(self):
market = BinanceWrapper()
products = market.get_products(["BTC", "ETH"])
assert products is not None
assert isinstance(products, list)
assert len(products) == 2
symbols = [p.symbol for p in products]
assert "BTC" in symbols
assert "ETH" in symbols
for product in products:
assert hasattr(product, 'price')
assert product.price > 0
def test_binance_invalid_product(self):
market = BinanceWrapper()
with pytest.raises(Exception):
_ = market.get_product("INVALID")
def test_binance_history(self):
market = BinanceWrapper()
history = market.get_historical_prices("BTC", limit=5)
assert history is not None
assert isinstance(history, list)
assert len(history) == 5
for entry in history:
assert hasattr(entry, 'time')
assert hasattr(entry, 'close')
assert hasattr(entry, 'high')
assert entry.close > 0
assert entry.high > 0