Refactor project structure to organize APIs (#24)

* Refactor project structure "api"
* fix bug conversione delle valute fiat in stablecoin in BinanceWrapper
* Refactor: WrapperHandler for managing API wrappers with retry logic; update related modules and tests
* Refactor: Update ProductInfo and Price classes to include aggregation methods; remove standalone aggregation functions
* fix docs
This commit was merged in pull request #24.
This commit is contained in:
Giacomo Bertolazzi
2025-10-11 21:36:13 +02:00
committed by GitHub
parent 517842c834
commit 093a7f5a48
40 changed files with 284 additions and 238 deletions

0
src/app/api/__init__.py Normal file
View File

View File

152
src/app/api/base/markets.py Normal file
View File

@@ -0,0 +1,152 @@
import statistics
from datetime import datetime
from pydantic import BaseModel
class ProductInfo(BaseModel):
"""
Product information as obtained from market APIs.
Implements conversion methods from raw API data.
"""
id: str = ""
symbol: str = ""
price: float = 0.0
volume_24h: float = 0.0
currency: str = ""
@staticmethod
def aggregate(products: dict[str, list['ProductInfo']]) -> list['ProductInfo']:
"""
Aggregates a list of ProductInfo by symbol.
Args:
products (dict[str, list[ProductInfo]]): Map provider -> list of ProductInfo
Returns:
list[ProductInfo]: List of ProductInfo aggregated by symbol
"""
# Costruzione mappa symbol -> lista di ProductInfo
symbols_infos: dict[str, list[ProductInfo]] = {}
for _, product_list in products.items():
for product in product_list:
symbols_infos.setdefault(product.symbol, []).append(product)
# Aggregazione per ogni symbol
aggregated_products: list[ProductInfo] = []
for symbol, product_list in symbols_infos.items():
product = ProductInfo()
product.id = f"{symbol}_AGGREGATED"
product.symbol = symbol
product.currency = next(p.currency for p in product_list if p.currency)
volume_sum = sum(p.volume_24h for p in product_list)
product.volume_24h = volume_sum / len(product_list) if product_list else 0.0
prices = sum(p.price * p.volume_24h for p in product_list)
product.price = (prices / volume_sum) if volume_sum > 0 else 0.0
aggregated_products.append(product)
return aggregated_products
class Price(BaseModel):
"""
Represents price data for an asset as obtained from market APIs.
Implements conversion methods from raw API data.
"""
high: float = 0.0
low: float = 0.0
open: float = 0.0
close: float = 0.0
volume: float = 0.0
timestamp: str = ""
"""Timestamp in format YYYY-MM-DD HH:MM"""
def set_timestamp(self, timestamp_ms: int | None = None, timestamp_s: int | None = None) -> None:
"""
Sets the timestamp from milliseconds or seconds.
The timestamp is saved as a formatted string 'YYYY-MM-DD HH:MM'.
Args:
timestamp_ms: Timestamp in milliseconds.
timestamp_s: Timestamp in seconds.
Raises:
ValueError: If neither timestamp_ms nor timestamp_s is provided.
"""
if timestamp_ms is not None:
timestamp = timestamp_ms // 1000
elif timestamp_s is not None:
timestamp = timestamp_s
else:
raise ValueError("Either timestamp_ms or timestamp_s must be provided")
assert timestamp > 0, "Invalid timestamp data received"
self.timestamp = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M')
@staticmethod
def aggregate(prices: dict[str, list['Price']]) -> list['Price']:
"""
Aggregates historical prices for the same symbol by calculating the mean.
Args:
prices (dict[str, list[Price]]): Map provider -> list of Price.
The map must contain only Price objects for the same symbol.
Returns:
list[Price]: List of Price objects aggregated by timestamp.
"""
# Costruiamo una mappa timestamp -> lista di Price
timestamped_prices: dict[str, list[Price]] = {}
for _, price_list in prices.items():
for price in price_list:
timestamped_prices.setdefault(price.timestamp, []).append(price)
# Ora aggregiamo i prezzi per ogni timestamp
aggregated_prices: list[Price] = []
for time, price_list in timestamped_prices.items():
price = Price()
price.timestamp = time
price.high = statistics.mean([p.high for p in price_list])
price.low = statistics.mean([p.low for p in price_list])
price.open = statistics.mean([p.open for p in price_list])
price.close = statistics.mean([p.close for p in price_list])
price.volume = statistics.mean([p.volume for p in price_list])
aggregated_prices.append(price)
return aggregated_prices
class MarketWrapper:
"""
Base class for market API wrappers.
All market API wrappers should inherit from this class and implement the methods.
Provides interface for retrieving product and price information from market APIs.
"""
def get_product(self, asset_id: str) -> ProductInfo:
"""
Get product information for a specific asset ID.
Args:
asset_id (str): The asset ID to retrieve information for.
Returns:
ProductInfo: An object containing product information.
"""
raise NotImplementedError("This method should be overridden by subclasses")
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Get product information for multiple asset IDs.
Args:
asset_ids (list[str]): The list of asset IDs to retrieve information for.
Returns:
list[ProductInfo]: A list of objects containing product information.
"""
raise NotImplementedError("This method should be overridden by subclasses")
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
"""
Get historical price data for a specific asset ID.
Args:
asset_id (str): The asset ID to retrieve price data for.
limit (int): The maximum number of price data points to return.
Returns:
list[Price]: A list of Price objects.
"""
raise NotImplementedError("This method should be overridden by subclasses")

40
src/app/api/base/news.py Normal file
View File

@@ -0,0 +1,40 @@
from pydantic import BaseModel
class Article(BaseModel):
"""
Represents a news article with source, time, title, and description.
"""
source: str = ""
time: str = ""
title: str = ""
description: str = ""
class NewsWrapper:
"""
Base class for news API wrappers.
All news API wrappers should inherit from this class and implement the methods.
Provides interface for retrieving news articles from news APIs.
"""
def get_top_headlines(self, limit: int = 100) -> list[Article]:
"""
Retrieve top headlines, optionally limited by the specified number.
Args:
limit (int): The maximum number of articles to return.
Returns:
list[Article]: A list of Article objects.
"""
raise NotImplementedError("This method should be overridden by subclasses")
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
"""
Retrieve the latest news based on a search query.
Args:
query (str): The search query.
limit (int): The maximum number of articles to return.
Returns:
list[Article]: A list of Article objects.
"""
raise NotImplementedError("This method should be overridden by subclasses")

View File

@@ -0,0 +1,37 @@
from pydantic import BaseModel
class SocialPost(BaseModel):
"""
Represents a social media post with time, title, description, and comments.
"""
time: str = ""
title: str = ""
description: str = ""
comments: list["SocialComment"] = []
class SocialComment(BaseModel):
"""
Represents a comment on a social media post.
"""
time: str = ""
description: str = ""
class SocialWrapper:
"""
Base class for social media API wrappers.
All social media API wrappers should inherit from this class and implement the methods.
Provides interface for retrieving social media posts and comments from APIs.
"""
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
"""
Retrieve top cryptocurrency-related posts, optionally limited by the specified number.
Args:
limit (int): The maximum number of posts to return.
Returns:
list[SocialPost]: A list of SocialPost objects.
"""
raise NotImplementedError("This method should be overridden by subclasses")

View File

@@ -0,0 +1,86 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.base.markets import MarketWrapper, Price, ProductInfo
from app.api.markets.binance import BinanceWrapper
from app.api.markets.coinbase import CoinBaseWrapper
from app.api.markets.cryptocompare import CryptoCompareWrapper
from app.api.markets.yfinance import YFinanceWrapper
__all__ = [ "MarketAPIsTool", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper", "ProductInfo", "Price" ]
class MarketAPIsTool(MarketWrapper, Toolkit):
"""
Class that aggregates multiple market API wrappers and manages them using WrapperHandler.
This class supports retrieving product information and historical prices.
This class can also aggregate data from multiple sources to provide a more comprehensive view of the market.
The following wrappers are included in this order:
- BinanceWrapper
- YFinanceWrapper
- CoinBaseWrapper
- CryptoCompareWrapper
"""
def __init__(self, currency: str = "USD"):
"""
Initialize the MarketAPIsTool with multiple market API wrappers.
The following wrappers are included in this order:
- BinanceWrapper
- YFinanceWrapper
- CoinBaseWrapper
- CryptoCompareWrapper
Args:
currency (str): Valuta in cui restituire i prezzi. Default è "USD".
"""
kwargs = {"currency": currency or "USD"}
wrappers: list[type[MarketWrapper]] = [BinanceWrapper, YFinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers, kwargs=kwargs)
Toolkit.__init__( # type: ignore
self,
name="Market APIs Toolkit",
tools=[
self.get_product,
self.get_products,
self.get_historical_prices,
self.get_products_aggregated,
self.get_historical_prices_aggregated,
],
)
def get_product(self, asset_id: str) -> ProductInfo:
return self.handler.try_call(lambda w: w.get_product(asset_id))
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
return self.handler.try_call(lambda w: w.get_products(asset_ids))
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit))
def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Restituisce i dati aggregati per una lista di asset_id.\n
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
Args:
asset_ids (list[str]): Lista di asset_id da cercare.
Returns:
list[ProductInfo]: Lista di ProductInfo aggregati.
Raises:
Exception: If all wrappers fail to provide results.
"""
all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids))
return ProductInfo.aggregate(all_products)
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
"""
Restituisce i dati storici aggregati per un asset_id. Usa i dati di tutte le fonti disponibili e li aggrega.\n
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
Args:
asset_id (str): Asset ID da cercare.
limit (int): Numero massimo di dati storici da restituire.
Returns:
list[Price]: Lista di Price aggregati.
Raises:
Exception: If all wrappers fail to provide results.
"""
all_prices = self.handler.try_call_all(lambda w: w.get_historical_prices(asset_id, limit))
return Price.aggregate(all_prices)

View File

@@ -0,0 +1,83 @@
import os
from typing import Any
from binance.client import Client # type: ignore
from app.api.base.markets import ProductInfo, MarketWrapper, Price
def extract_product(currency: str, ticker_data: dict[str, Any]) -> ProductInfo:
product = ProductInfo()
product.id = ticker_data.get('symbol', '')
product.symbol = ticker_data.get('symbol', '').replace(currency, '')
product.price = float(ticker_data.get('price', 0))
product.volume_24h = float(ticker_data.get('volume', 0))
product.currency = currency
return product
def extract_price(kline_data: list[Any]) -> Price:
timestamp = kline_data[0]
price = Price()
price.open = float(kline_data[1])
price.high = float(kline_data[2])
price.low = float(kline_data[3])
price.close = float(kline_data[4])
price.volume = float(kline_data[5])
price.set_timestamp(timestamp_ms=timestamp)
return price
# Add here eventual other fiat not supported by Binance
FIAT_TO_STABLECOIN = {
"USD": "USDT",
}
class BinanceWrapper(MarketWrapper):
"""
Wrapper per le API autenticate di Binance.\n
Implementa l'interfaccia BaseWrapper per fornire accesso unificato
ai dati di mercato di Binance tramite le API REST con autenticazione.\n
https://binance-docs.github.io/apidocs/spot/en/
"""
def __init__(self, currency: str = "USD"):
"""
Inizializza il wrapper di Binance con le credenziali API e la valuta di riferimento.
Alcune valute fiat non sono supportate direttamente da Binance (es. "USD").
Infatti, se viene fornita una valuta fiat come "USD", questa viene automaticamente convertita in una stablecoin Tether ("USDT") per compatibilità con Binance.
Args:
currency (str): Valuta in cui restituire i prezzi. Se "USD" viene fornito, verrà utilizzato "USDT". Default è "USD".
"""
api_key = os.getenv("BINANCE_API_KEY")
api_secret = os.getenv("BINANCE_API_SECRET")
self.currency = currency if currency not in FIAT_TO_STABLECOIN else FIAT_TO_STABLECOIN[currency]
self.client = Client(api_key=api_key, api_secret=api_secret)
def __format_symbol(self, asset_id: str) -> str:
"""
Formatta l'asset_id nel formato richiesto da Binance.
"""
return asset_id.replace('-', '') if '-' in asset_id else f"{asset_id}{self.currency}"
def get_product(self, asset_id: str) -> ProductInfo:
symbol = self.__format_symbol(asset_id)
ticker: dict[str, Any] = self.client.get_symbol_ticker(symbol=symbol) # type: ignore
ticker_24h: dict[str, Any] = self.client.get_ticker(symbol=symbol) # type: ignore
ticker['volume'] = ticker_24h.get('volume', 0)
return extract_product(self.currency, ticker)
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
return [ self.get_product(asset_id) for asset_id in asset_ids ]
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
symbol = self.__format_symbol(asset_id)
# Ottiene candele orarie degli ultimi 30 giorni
klines: list[list[Any]] = self.client.get_historical_klines( # type: ignore
symbol=symbol,
interval=Client.KLINE_INTERVAL_1HOUR,
limit=limit,
)
return [extract_price(kline) for kline in klines]

View File

@@ -0,0 +1,90 @@
import os
from enum import Enum
from datetime import datetime, timedelta
from coinbase.rest import RESTClient # type: ignore
from coinbase.rest.types.product_types import Candle, GetProductResponse, Product # type: ignore
from app.api.base.markets import ProductInfo, MarketWrapper, Price
def extract_product(product_data: GetProductResponse | Product) -> ProductInfo:
product = ProductInfo()
product.id = product_data.product_id or ""
product.symbol = product_data.base_currency_id or ""
product.price = float(product_data.price) if product_data.price else 0.0
product.volume_24h = float(product_data.volume_24h) if product_data.volume_24h else 0.0
return product
def extract_price(candle_data: Candle) -> Price:
timestamp = int(candle_data.start) if candle_data.start else 0
price = Price()
price.high = float(candle_data.high) if candle_data.high else 0.0
price.low = float(candle_data.low) if candle_data.low else 0.0
price.open = float(candle_data.open) if candle_data.open else 0.0
price.close = float(candle_data.close) if candle_data.close else 0.0
price.volume = float(candle_data.volume) if candle_data.volume else 0.0
price.set_timestamp(timestamp_s=timestamp)
return price
class Granularity(Enum):
UNKNOWN_GRANULARITY = 0
ONE_MINUTE = 60
FIVE_MINUTE = 300
FIFTEEN_MINUTE = 900
THIRTY_MINUTE = 1800
ONE_HOUR = 3600
TWO_HOUR = 7200
FOUR_HOUR = 14400
SIX_HOUR = 21600
ONE_DAY = 86400
class CoinBaseWrapper(MarketWrapper):
"""
Wrapper per le API di Coinbase Advanced Trade.\n
Implementa l'interfaccia BaseWrapper per fornire accesso unificato
ai dati di mercato di Coinbase tramite le API REST.\n
https://docs.cdp.coinbase.com/api-reference/advanced-trade-api/rest-api/introduction
"""
def __init__(self, currency: str = "USD"):
api_key = os.getenv("COINBASE_API_KEY")
assert api_key, "COINBASE_API_KEY environment variable not set"
api_private_key = os.getenv("COINBASE_API_SECRET")
assert api_private_key, "COINBASE_API_SECRET environment variable not set"
self.currency = currency
self.client: RESTClient = RESTClient(
api_key=api_key,
api_secret=api_private_key
)
def __format(self, asset_id: str) -> str:
return asset_id if '-' in asset_id else f"{asset_id}-{self.currency}"
def get_product(self, asset_id: str) -> ProductInfo:
asset_id = self.__format(asset_id)
asset = self.client.get_product(asset_id) # type: ignore
return extract_product(asset)
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
all_asset_ids = [self.__format(asset_id) for asset_id in asset_ids]
assets = self.client.get_products(product_ids=all_asset_ids) # type: ignore
assert assets.products is not None, "No products data received from Coinbase"
return [extract_product(asset) for asset in assets.products]
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
asset_id = self.__format(asset_id)
end_time = datetime.now()
start_time = end_time - timedelta(days=14)
data = self.client.get_candles( # type: ignore
product_id=asset_id,
granularity=Granularity.ONE_HOUR.name,
start=str(int(start_time.timestamp())),
end=str(int(end_time.timestamp())),
limit=limit
)
assert data.candles is not None, "No candles data received from Coinbase"
return [extract_price(candle) for candle in data.candles]

View File

@@ -0,0 +1,81 @@
import os
from typing import Any
import requests
from app.api.base.markets import ProductInfo, MarketWrapper, Price
def extract_product(asset_data: dict[str, Any]) -> ProductInfo:
product = ProductInfo()
product.id = asset_data.get('FROMSYMBOL', '') + '-' + asset_data.get('TOSYMBOL', '')
product.symbol = asset_data.get('FROMSYMBOL', '')
product.price = float(asset_data.get('PRICE', 0))
product.volume_24h = float(asset_data.get('VOLUME24HOUR', 0))
assert product.price > 0, "Invalid price data received from CryptoCompare"
return product
def extract_price(price_data: dict[str, Any]) -> Price:
timestamp = price_data.get('time', 0)
price = Price()
price.high = float(price_data.get('high', 0))
price.low = float(price_data.get('low', 0))
price.open = float(price_data.get('open', 0))
price.close = float(price_data.get('close', 0))
price.volume = float(price_data.get('volumeto', 0))
price.set_timestamp(timestamp_s=timestamp)
return price
BASE_URL = "https://min-api.cryptocompare.com"
class CryptoCompareWrapper(MarketWrapper):
"""
Wrapper per le API pubbliche di CryptoCompare.
La documentazione delle API è disponibile qui: https://developers.coindesk.com/documentation/legacy/Price/SingleSymbolPriceEndpoint
!!ATTENZIONE!! sembra essere una API legacy e potrebbe essere deprecata in futuro.
"""
def __init__(self, currency:str='USD'):
api_key = os.getenv("CRYPTOCOMPARE_API_KEY")
assert api_key, "CRYPTOCOMPARE_API_KEY environment variable not set"
self.api_key = api_key
self.currency = currency
def __request(self, endpoint: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
if params is None:
params = {}
params['api_key'] = self.api_key
response = requests.get(f"{BASE_URL}{endpoint}", params=params)
return response.json()
def get_product(self, asset_id: str) -> ProductInfo:
response = self.__request("/data/pricemultifull", params = {
"fsyms": asset_id,
"tsyms": self.currency
})
data = response.get('RAW', {}).get(asset_id, {}).get(self.currency, {})
return extract_product(data)
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
response = self.__request("/data/pricemultifull", params = {
"fsyms": ",".join(asset_ids),
"tsyms": self.currency
})
assets: list[ProductInfo] = []
data = response.get('RAW', {})
for asset_id in asset_ids:
asset_data = data.get(asset_id, {}).get(self.currency, {})
assets.append(extract_product(asset_data))
return assets
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
response = self.__request("/data/v2/histohour", params = {
"fsym": asset_id,
"tsym": self.currency,
"limit": limit-1 # because the API returns limit+1 items (limit + current)
})
data = response.get('Data', {}).get('Data', [])
prices = [extract_price(price_data) for price_data in data]
return prices

View File

@@ -0,0 +1,82 @@
import json
from agno.tools.yfinance import YFinanceTools
from app.api.base.markets import MarketWrapper, ProductInfo, Price
def extract_product(stock_data: dict[str, str]) -> ProductInfo:
"""
Converte i dati di YFinanceTools in ProductInfo.
"""
product = ProductInfo()
product.id = stock_data.get('Symbol', '')
product.symbol = product.id.split('-')[0] # Rimuovi il suffisso della valuta per le crypto
product.price = float(stock_data.get('Current Stock Price', f"0.0 USD").split(" ")[0]) # prende solo il numero
product.volume_24h = 0.0 # YFinance non fornisce il volume 24h direttamente
product.currency = product.id.split('-')[1] # La valuta è la parte dopo il '-'
return product
def extract_price(hist_data: dict[str, str]) -> Price:
"""
Converte i dati storici di YFinanceTools in Price.
"""
timestamp = int(hist_data.get('Timestamp', '0'))
price = Price()
price.high = float(hist_data.get('High', 0.0))
price.low = float(hist_data.get('Low', 0.0))
price.open = float(hist_data.get('Open', 0.0))
price.close = float(hist_data.get('Close', 0.0))
price.volume = float(hist_data.get('Volume', 0.0))
price.set_timestamp(timestamp_ms=timestamp)
return price
class YFinanceWrapper(MarketWrapper):
"""
Wrapper per YFinanceTools che fornisce dati di mercato per azioni, ETF e criptovalute.
Implementa l'interfaccia BaseWrapper per compatibilità con il sistema esistente.
Usa YFinanceTools dalla libreria agno per coerenza con altri wrapper.
"""
def __init__(self, currency: str = "USD"):
self.currency = currency
self.tool = YFinanceTools()
def _format_symbol(self, asset_id: str) -> str:
"""
Formatta il simbolo per yfinance.
Per crypto, aggiunge '-' e la valuta (es. BTC -> BTC-USD).
"""
asset_id = asset_id.upper()
return f"{asset_id}-{self.currency}" if '-' not in asset_id else asset_id
def get_product(self, asset_id: str) -> ProductInfo:
symbol = self._format_symbol(asset_id)
stock_info = self.tool.get_company_info(symbol)
stock_info = json.loads(stock_info)
return extract_product(stock_info)
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
products: list[ProductInfo] = []
for asset_id in asset_ids:
product = self.get_product(asset_id)
products.append(product)
return products
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
symbol = self._format_symbol(asset_id)
days = limit // 24 + 1 # Arrotonda per eccesso
hist_data = self.tool.get_historical_stock_prices(symbol, period=f"{days}d", interval="1h")
hist_data = json.loads(hist_data)
# Il formato dei dati è {timestamp: {Open: x, High: y, Low: z, Close: w, Volume: v}}
timestamps = sorted(hist_data.keys())[-limit:]
prices: list[Price] = []
for timestamp in timestamps:
temp = hist_data[timestamp]
temp['Timestamp'] = timestamp
price = extract_price(temp)
prices.append(price)
return prices

View File

@@ -0,0 +1,78 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.base.news import NewsWrapper, Article
from app.api.news.news_api import NewsApiWrapper
from app.api.news.googlenews import GoogleNewsWrapper
from app.api.news.cryptopanic_api import CryptoPanicWrapper
from app.api.news.duckduckgo import DuckDuckGoWrapper
__all__ = ["NewsAPIsTool", "NewsApiWrapper", "GoogleNewsWrapper", "CryptoPanicWrapper", "DuckDuckGoWrapper", "Article"]
class NewsAPIsTool(NewsWrapper, Toolkit):
"""
Aggregates multiple news API wrappers and manages them using WrapperHandler.
This class supports retrieving top headlines and latest news articles by querying multiple sources:
- GoogleNewsWrapper
- DuckDuckGoWrapper
- NewsApiWrapper
- CryptoPanicWrapper
By default, it returns results from the first successful wrapper.
Optionally, it can be configured to collect articles from all wrappers.
If no wrapper succeeds, an exception is raised.
"""
def __init__(self):
"""
Initialize the NewsAPIsTool with multiple news API wrappers.
The tool uses WrapperHandler to manage and invoke the different news API wrappers.
The following wrappers are included in this order:
- GoogleNewsWrapper.
- DuckDuckGoWrapper.
- NewsApiWrapper.
- CryptoPanicWrapper.
"""
wrappers: list[type[NewsWrapper]] = [GoogleNewsWrapper, DuckDuckGoWrapper, NewsApiWrapper, CryptoPanicWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore
self,
name="News APIs Toolkit",
tools=[
self.get_top_headlines,
self.get_latest_news,
self.get_top_headlines_aggregated,
self.get_latest_news_aggregated,
],
)
def get_top_headlines(self, limit: int = 100) -> list[Article]:
return self.handler.try_call(lambda w: w.get_top_headlines(limit))
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
return self.handler.try_call(lambda w: w.get_latest_news(query, limit))
def get_top_headlines_aggregated(self, limit: int = 100) -> dict[str, list[Article]]:
"""
Calls get_top_headlines on all wrappers/providers and returns a dictionary mapping their names to their articles.
Args:
limit (int): Maximum number of articles to retrieve from each provider.
Returns:
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_top_headlines(limit))
def get_latest_news_aggregated(self, query: str, limit: int = 100) -> dict[str, list[Article]]:
"""
Calls get_latest_news on all wrappers/providers and returns a dictionary mapping their names to their articles.
Args:
query (str): The search query to find relevant news articles.
limit (int): Maximum number of articles to retrieve from each provider.
Returns:
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_latest_news(query, limit))

View File

@@ -0,0 +1,79 @@
import os
from typing import Any
import requests
from enum import Enum
from app.api.base.news import NewsWrapper, Article
class CryptoPanicFilter(Enum):
RISING = "rising"
HOT = "hot"
BULLISH = "bullish"
BEARISH = "bearish"
IMPORTANT = "important"
SAVED = "saved"
LOL = "lol"
ANY = ""
class CryptoPanicKind(Enum):
NEWS = "news"
MEDIA = "media"
ALL = "all"
def extract_articles(response: dict[str, Any]) -> list[Article]:
articles: list[Article] = []
if 'results' in response:
for item in response['results']:
article = Article()
article.source = item.get('source', {}).get('title', '')
article.time = item.get('published_at', '')
article.title = item.get('title', '')
article.description = item.get('description', '')
articles.append(article)
return articles
class CryptoPanicWrapper(NewsWrapper):
"""
A wrapper for the CryptoPanic API (Documentation: https://cryptopanic.com/developers/api/)
Requires an API key set in the environment variable CRYPTOPANIC_API_KEY.
It is free to use, but has rate limits and restrictions based on the plan type (the free plan is 'developer' with 100 req/month).
Supports different plan types via the CRYPTOPANIC_API_PLAN environment variable (developer, growth, enterprise).
"""
def __init__(self):
self.api_key = os.getenv("CRYPTOPANIC_API_KEY", "")
assert self.api_key, "CRYPTOPANIC_API_KEY environment variable not set"
# Set here for the future, but currently not needed
plan_type = os.getenv("CRYPTOPANIC_API_PLAN", "developer").lower()
assert plan_type in ["developer", "growth", "enterprise"], "Invalid CRYPTOPANIC_API_PLAN value"
self.base_url = f"https://cryptopanic.com/api/{plan_type}/v2"
self.filter = CryptoPanicFilter.ANY
self.kind = CryptoPanicKind.NEWS
def get_base_params(self) -> dict[str, str]:
params: dict[str, str] = {}
params['public'] = 'true' # recommended for app and bots
params['auth_token'] = self.api_key
params['kind'] = self.kind.value
if self.filter != CryptoPanicFilter.ANY:
params['filter'] = self.filter.value
return params
def set_filter(self, filter: CryptoPanicFilter):
self.filter = filter
def get_top_headlines(self, limit: int = 100) -> list[Article]:
return self.get_latest_news("", limit) # same endpoint so just call the other method
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
params = self.get_base_params()
params['currencies'] = query
response = requests.get(f"{self.base_url}/posts/", params=params)
assert response.status_code == 200, f"Error fetching data: {response}"
json_response = response.json()
articles = extract_articles(json_response)
return articles[:limit]

View File

@@ -0,0 +1,34 @@
import json
from typing import Any
from agno.tools.duckduckgo import DuckDuckGoTools
from app.api.base.news import Article, NewsWrapper
def extract_article(result: dict[str, Any]) -> Article:
article = Article()
article.source = result.get("source", "")
article.time = result.get("date", "")
article.title = result.get("title", "")
article.description = result.get("body", "")
return article
class DuckDuckGoWrapper(NewsWrapper):
"""
A wrapper for DuckDuckGo News search using the Tool from agno.tools.duckduckgo.
It can be rewritten to use direct API calls if needed in the future, but currently is easy to write and use.
"""
def __init__(self):
self.tool = DuckDuckGoTools()
self.query = "crypto"
def get_top_headlines(self, limit: int = 100) -> list[Article]:
results = self.tool.duckduckgo_news(self.query, max_results=limit)
json_results = json.loads(results)
return [extract_article(result) for result in json_results]
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
results = self.tool.duckduckgo_news(query or self.query, max_results=limit)
json_results = json.loads(results)
return [extract_article(result) for result in json_results]

View File

@@ -0,0 +1,38 @@
from typing import Any
from gnews import GNews # type: ignore
from app.api.base.news import Article, NewsWrapper
def extract_article(result: dict[str, Any]) -> Article:
article = Article()
article.source = result.get("source", "")
article.time = result.get("publishedAt", "")
article.title = result.get("title", "")
article.description = result.get("description", "")
return article
class GoogleNewsWrapper(NewsWrapper):
"""
A wrapper for the Google News RSS Feed (Documentation: https://github.com/ranahaani/GNews/?tab=readme-ov-file#about-gnews)
It does not require an API key and is free to use.
"""
def get_top_headlines(self, limit: int = 100) -> list[Article]:
gnews = GNews(language='en', max_results=limit, period='7d')
results: list[dict[str, Any]] = gnews.get_top_news() # type: ignore
articles: list[Article] = []
for result in results:
article = extract_article(result)
articles.append(article)
return articles
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
gnews = GNews(language='en', max_results=limit, period='7d')
results: list[dict[str, Any]] = gnews.get_news(query) # type: ignore
articles: list[Article] = []
for result in results:
article = extract_article(result)
articles.append(article)
return articles

View File

@@ -0,0 +1,54 @@
import os
from typing import Any
import newsapi # type: ignore
from app.api.base.news import Article, NewsWrapper
def extract_article(result: dict[str, Any]) -> Article:
article = Article()
article.source = result.get("source", {}).get("name", "")
article.time = result.get("publishedAt", "")
article.title = result.get("title", "")
article.description = result.get("description", "")
return article
class NewsApiWrapper(NewsWrapper):
"""
A wrapper for the NewsAPI (Documentation: https://newsapi.org/docs/get-started)
Requires an API key set in the environment variable NEWS_API_KEY.
It is free to use, but has rate limits and restrictions based on the plan type (the free plan is 'developer' with 100 req/day).
"""
def __init__(self):
api_key = os.getenv("NEWS_API_KEY")
assert api_key, "NEWS_API_KEY environment variable not set"
self.client = newsapi.NewsApiClient(api_key=api_key)
self.category = "business" # Cryptocurrency is under business
self.language = "en"
self.max_page_size = 100
def __calc_pages(self, limit: int, page_size: int) -> tuple[int, int]:
page_size = min(self.max_page_size, limit)
pages = (limit // page_size) + (1 if limit % page_size > 0 else 0)
return pages, page_size
def get_top_headlines(self, limit: int = 100) -> list[Article]:
pages, page_size = self.__calc_pages(limit, self.max_page_size)
articles: list[Article] = []
for page in range(1, pages + 1):
headlines: dict[str, Any] = self.client.get_top_headlines(q="", category=self.category, language=self.language, page_size=page_size, page=page) # type: ignore
results = [extract_article(article) for article in headlines.get("articles", [])] # type: ignore
articles.extend(results)
return articles
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
pages, page_size = self.__calc_pages(limit, self.max_page_size)
articles: list[Article] = []
for page in range(1, pages + 1):
everything: dict[str, Any] = self.client.get_everything(q=query, language=self.language, sort_by="publishedAt", page_size=page_size, page=page) # type: ignore
results = [extract_article(article) for article in everything.get("articles", [])] # type: ignore
articles.extend(results)
return articles

View File

@@ -0,0 +1,53 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.base.social import SocialPost, SocialWrapper
from app.api.social.reddit import RedditWrapper
__all__ = ["SocialAPIsTool", "RedditWrapper", "SocialPost"]
class SocialAPIsTool(SocialWrapper, Toolkit):
"""
Aggregates multiple social media API wrappers and manages them using WrapperHandler.
This class supports retrieving top crypto-related posts by querying multiple sources:
- RedditWrapper
By default, it returns results from the first successful wrapper.
Optionally, it can be configured to collect posts from all wrappers.
If no wrapper succeeds, an exception is raised.
"""
def __init__(self):
"""
Initialize the SocialAPIsTool with multiple social media API wrappers.
The tool uses WrapperHandler to manage and invoke the different social media API wrappers.
The following wrappers are included in this order:
- RedditWrapper.
"""
wrappers: list[type[SocialWrapper]] = [RedditWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore
self,
name="Socials Toolkit",
tools=[
self.get_top_crypto_posts,
self.get_top_crypto_posts_aggregated,
],
)
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
return self.handler.try_call(lambda w: w.get_top_crypto_posts(limit))
def get_top_crypto_posts_aggregated(self, limit_per_wrapper: int = 5) -> dict[str, list[SocialPost]]:
"""
Calls get_top_crypto_posts on all wrappers/providers and returns a dictionary mapping their names to their posts.
Args:
limit_per_wrapper (int): Maximum number of posts to retrieve from each provider.
Returns:
dict[str, list[SocialPost]]: A dictionary where keys are wrapper names and values are lists of SocialPost objects.
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_top_crypto_posts(limit_per_wrapper))

View File

@@ -0,0 +1,67 @@
import os
from praw import Reddit # type: ignore
from praw.models import Submission # type: ignore
from app.api.base.social import SocialWrapper, SocialPost, SocialComment
MAX_COMMENTS = 5
# metterne altri se necessario.
# fonti: https://lkiconsulting.io/marketing/best-crypto-subreddits/
SUBREDDITS = [
"CryptoCurrency",
"Bitcoin",
"Ethereum",
"CryptoMarkets",
"Dogecoin",
"Altcoin",
"DeFi",
"NFT",
"BitcoinBeginners",
"CryptoTechnology",
"btc" # alt subs of Bitcoin
]
def extract_post(post: Submission) -> SocialPost:
social = SocialPost()
social.time = str(post.created)
social.title = post.title
social.description = post.selftext
for top_comment in post.comments:
comment = SocialComment()
comment.time = str(top_comment.created)
comment.description = top_comment.body
social.comments.append(comment)
if len(social.comments) >= MAX_COMMENTS:
break
return social
class RedditWrapper(SocialWrapper):
"""
A wrapper for the Reddit API using PRAW (Python Reddit API Wrapper).
Requires the following environment variables to be set:
- REDDIT_API_CLIENT_ID
- REDDIT_API_CLIENT_SECRET
You can get them by creating an app at https://www.reddit.com/prefs/apps
"""
def __init__(self):
client_id = os.getenv("REDDIT_API_CLIENT_ID")
assert client_id, "REDDIT_API_CLIENT_ID environment variable is not set"
client_secret = os.getenv("REDDIT_API_CLIENT_SECRET")
assert client_secret, "REDDIT_API_CLIENT_SECRET environment variable is not set"
self.tool = Reddit(
client_id=client_id,
client_secret=client_secret,
user_agent="upo-appAI",
)
self.subreddits = self.tool.subreddit("+".join(SUBREDDITS))
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
top_posts = self.subreddits.top(limit=limit, time_filter="week")
return [extract_post(post) for post in top_posts]

View File

@@ -0,0 +1,148 @@
import inspect
import time
import traceback
from typing import Any, Callable, Generic, TypeVar
from agno.utils.log import log_info, log_warning #type: ignore
WrapperType = TypeVar("WrapperType")
WrapperClassType = TypeVar("WrapperClassType")
OutputType = TypeVar("OutputType")
class WrapperHandler(Generic[WrapperType]):
"""
A handler for managing multiple wrappers with retry logic.
It attempts to call a function on the current wrapper, and if it fails,
it retries a specified number of times before switching to the next wrapper.
If all wrappers fail, it raises an exception.
Note: use `build_wrappers` to create an instance of this class for better error handling.
"""
def __init__(self, wrappers: list[WrapperType], try_per_wrapper: int = 3, retry_delay: int = 2):
"""
Initializes the WrapperHandler with a list of wrappers and retry settings.\n
Use `build_wrappers` to create an instance of this class for better error handling.
Args:
wrappers (list[W]): A list of wrapper instances to manage.
try_per_wrapper (int): Number of retries per wrapper before switching to the next.
retry_delay (int): Delay in seconds between retries.
"""
assert not WrapperHandler.__check(wrappers), "All wrappers must be instances of their respective classes. Use `build_wrappers` to create the WrapperHandler."
self.wrappers = wrappers
self.retry_per_wrapper = try_per_wrapper
self.retry_delay = retry_delay
self.index = 0
def try_call(self, func: Callable[[WrapperType], OutputType]) -> OutputType:
"""
Attempts to call the provided function on the current wrapper.
If it fails, it retries a specified number of times before switching to the next wrapper.
If all wrappers fail, it raises an exception.
Args:
func (Callable[[W], T]): A function that takes a wrapper and returns a result.
Returns:
T: The result of the function call.
Raises:
Exception: If all wrappers fail after retries.
"""
return self.__try_call(func, try_all=False).popitem()[1]
def try_call_all(self, func: Callable[[WrapperType], OutputType]) -> dict[str, OutputType]:
"""
Calls the provided function on all wrappers, collecting results.
If a wrapper fails, it logs a warning and continues with the next.
If all wrappers fail, it raises an exception.
Args:
func (Callable[[W], T]): A function that takes a wrapper and returns a result.
Returns:
list[T]: A list of results from the function calls.
Raises:
Exception: If all wrappers fail.
"""
return self.__try_call(func, try_all=True)
def __try_call(self, func: Callable[[WrapperType], OutputType], try_all: bool) -> dict[str, OutputType]:
"""
Internal method to handle the logic of trying to call a function on wrappers.
It can either stop at the first success or try all wrappers.
Args:
func (Callable[[W], T]): A function that takes a wrapper and returns a result.
try_all (bool): If True, tries all wrappers and collects results; if False, stops at the first success.
Returns:
dict[str, T]: A dictionary mapping wrapper class names to results.
Raises:
Exception: If all wrappers fail after retries.
"""
log_info(f"{inspect.getsource(func).strip()} {inspect.getclosurevars(func).nonlocals}")
results: dict[str, OutputType] = {}
starting_index = self.index
for i in range(starting_index, len(self.wrappers) + starting_index):
self.index = i % len(self.wrappers)
wrapper = self.wrappers[self.index]
wrapper_name = wrapper.__class__.__name__
if not try_all:
log_info(f"try_call {wrapper_name}")
for try_count in range(1, self.retry_per_wrapper + 1):
try:
result = func(wrapper)
log_info(f"{wrapper_name} succeeded")
results[wrapper_name] = result
break
except Exception as e:
error = WrapperHandler.__concise_error(e)
log_warning(f"{wrapper_name} failed {try_count}/{self.retry_per_wrapper}: {error}")
time.sleep(self.retry_delay)
if not try_all and results:
return results
if not results:
error = locals().get("error", "Unknown error")
raise Exception(f"All wrappers failed, latest error: {error}")
self.index = starting_index
return results
@staticmethod
def __check(wrappers: list[Any]) -> bool:
return all(w.__class__ is type for w in wrappers)
@staticmethod
def __concise_error(e: Exception) -> str:
last_frame = traceback.extract_tb(e.__traceback__)[-1]
return f"{e} [\"{last_frame.filename}\", line {last_frame.lineno}]"
@staticmethod
def build_wrappers(constructors: list[type[WrapperClassType]], try_per_wrapper: int = 3, retry_delay: int = 2, kwargs: dict[str, Any] | None = None) -> 'WrapperHandler[WrapperClassType]':
"""
Builds a WrapperHandler instance with the given wrapper constructors.
It attempts to initialize each wrapper and logs a warning if any cannot be initialized.
Only successfully initialized wrappers are included in the handler.
Args:
constructors (list[type[W]]): An iterable of wrapper classes to instantiate. e.g. [WrapperA, WrapperB]
try_per_wrapper (int): Number of retries per wrapper before switching to the next.
retry_delay (int): Delay in seconds between retries.
kwargs (dict | None): Optional dictionary with keyword arguments common to all wrappers.
Returns:
WrapperHandler[W]: An instance of WrapperHandler with the initialized wrappers.
Raises:
Exception: If no wrappers could be initialized.
"""
assert WrapperHandler.__check(constructors), f"All constructors must be classes. Received: {constructors}"
result: list[WrapperClassType] = []
for wrapper_class in constructors:
try:
wrapper = wrapper_class(**(kwargs or {}))
result.append(wrapper)
except Exception as e:
log_warning(f"{wrapper_class} cannot be initialized: {e}")
return WrapperHandler(result, try_per_wrapper, retry_delay)