Merge branch '2-news-api' into 3-market-api

This commit is contained in:
2025-09-30 17:30:01 +02:00
9 changed files with 226 additions and 755 deletions

View File

@@ -1,27 +1,36 @@
import os
from typing import Optional
from enum import Enum
from datetime import datetime, timedelta
from coinbase.rest import RESTClient
from .base import ProductInfo, BaseWrapper, Price
from .error_handler import retry_on_failure, handle_api_errors, MarketAPIError, RateLimitError
class Granularity(Enum):
UNKNOWN_GRANULARITY = 0
ONE_MINUTE = 60
FIVE_MINUTE = 300
FIFTEEN_MINUTE = 900
THIRTY_MINUTE = 1800
ONE_HOUR = 3600
TWO_HOUR = 7200
FOUR_HOUR = 14400
SIX_HOUR = 21600
ONE_DAY = 86400
class CoinBaseWrapper(BaseWrapper):
"""
Wrapper per le API di Coinbase Advanced Trade.
Wrapper per le API di Coinbase Advanced Trade.\n
Implementa l'interfaccia BaseWrapper per fornire accesso unificato
ai dati di mercato di Coinbase tramite le API REST.
La documentazione delle API è disponibile qui:
ai dati di mercato di Coinbase tramite le API REST.\n
https://docs.cdp.coinbase.com/api-reference/advanced-trade-api/rest-api/introduction
"""
def __init__(self, api_key: Optional[str] = None, api_private_key: Optional[str] = None, currency: str = "USD"):
if api_key is None:
api_key = os.getenv("COINBASE_API_KEY")
def __init__(self, currency: str = "USD"):
api_key = os.getenv("COINBASE_API_KEY")
assert api_key is not None, "API key is required"
if api_private_key is None:
api_private_key = os.getenv("COINBASE_API_SECRET")
api_private_key = os.getenv("COINBASE_API_SECRET")
assert api_private_key is not None, "API private key is required"
self.currency = currency
@@ -33,49 +42,30 @@ class CoinBaseWrapper(BaseWrapper):
def __format(self, asset_id: str) -> str:
return asset_id if '-' in asset_id else f"{asset_id}-{self.currency}"
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_product(self, asset_id: str) -> ProductInfo:
asset_id = self.__format(asset_id)
asset = self.client.get_product(asset_id)
return ProductInfo.from_coinbase(asset)
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
all_asset_ids = [self.__format(asset_id) for asset_id in asset_ids]
assets = self.client.get_products(product_ids=all_asset_ids)
if assets.products:
return [ProductInfo.from_coinbase_product(asset) for asset in assets.products]
return []
return [ProductInfo.from_coinbase(asset) for asset in assets.products]
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_all_products(self) -> list[ProductInfo]:
assets = self.client.get_products()
if assets.products:
return [ProductInfo.from_coinbase_product(asset) for asset in assets.products]
return []
return [ProductInfo.from_coinbase_product(asset) for asset in assets.products]
@retry_on_failure(max_retries=3, delay=1.0)
@handle_api_errors
def get_historical_prices(self, asset_id: str = "BTC") -> list[Price]:
def get_historical_prices(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
asset_id = self.__format(asset_id)
# Get last 14 days of hourly data (14*24 = 336 candles, within 350 limit)
end_time = datetime.now()
start_time = end_time - timedelta(days=14)
# Convert to UNIX timestamps as strings (required by Coinbase API)
start_timestamp = str(int(start_time.timestamp()))
end_timestamp = str(int(end_time.timestamp()))
data = self.client.get_candles(
product_id=asset_id,
start=start_timestamp,
end=end_timestamp,
granularity="ONE_HOUR",
limit=350 # Explicitly set the limit
granularity=Granularity.ONE_HOUR.name,
start=str(int(start_time.timestamp())),
end=str(int(end_time.timestamp())),
limit=limit
)
if data.candles:
return [Price.from_coinbase(candle) for candle in data.candles]
return []
return [Price.from_coinbase(candle) for candle in data.candles]