Merge branch 'main' into 6-telegram-interface

This commit is contained in:
2025-10-12 18:38:07 +02:00
48 changed files with 703 additions and 614 deletions

View File

@@ -1,86 +1,7 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.base.markets import MarketWrapper, Price, ProductInfo
from app.api.markets.binance import BinanceWrapper
from app.api.markets.coinbase import CoinBaseWrapper
from app.api.markets.cryptocompare import CryptoCompareWrapper
from app.api.markets.yfinance import YFinanceWrapper
__all__ = [ "MarketAPIsTool", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper", "ProductInfo", "Price" ]
__all__ = ["BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper"]
class MarketAPIsTool(MarketWrapper, Toolkit):
"""
Class that aggregates multiple market API wrappers and manages them using WrapperHandler.
This class supports retrieving product information and historical prices.
This class can also aggregate data from multiple sources to provide a more comprehensive view of the market.
The following wrappers are included in this order:
- BinanceWrapper
- YFinanceWrapper
- CoinBaseWrapper
- CryptoCompareWrapper
"""
def __init__(self, currency: str = "USD"):
"""
Initialize the MarketAPIsTool with multiple market API wrappers.
The following wrappers are included in this order:
- BinanceWrapper
- YFinanceWrapper
- CoinBaseWrapper
- CryptoCompareWrapper
Args:
currency (str): Valuta in cui restituire i prezzi. Default è "USD".
"""
kwargs = {"currency": currency or "USD"}
wrappers: list[type[MarketWrapper]] = [BinanceWrapper, YFinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers, kwargs=kwargs)
Toolkit.__init__( # type: ignore
self,
name="Market APIs Toolkit",
tools=[
self.get_product,
self.get_products,
self.get_historical_prices,
self.get_products_aggregated,
self.get_historical_prices_aggregated,
],
)
def get_product(self, asset_id: str) -> ProductInfo:
return self.handler.try_call(lambda w: w.get_product(asset_id))
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
return self.handler.try_call(lambda w: w.get_products(asset_ids))
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit))
def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Restituisce i dati aggregati per una lista di asset_id.\n
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
Args:
asset_ids (list[str]): Lista di asset_id da cercare.
Returns:
list[ProductInfo]: Lista di ProductInfo aggregati.
Raises:
Exception: If all wrappers fail to provide results.
"""
all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids))
return ProductInfo.aggregate(all_products)
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
"""
Restituisce i dati storici aggregati per un asset_id. Usa i dati di tutte le fonti disponibili e li aggrega.\n
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
Args:
asset_id (str): Asset ID da cercare.
limit (int): Numero massimo di dati storici da restituire.
Returns:
list[Price]: Lista di Price aggregati.
Raises:
Exception: If all wrappers fail to provide results.
"""
all_prices = self.handler.try_call_all(lambda w: w.get_historical_prices(asset_id, limit))
return Price.aggregate(all_prices)

View File

@@ -1,7 +1,7 @@
import os
from typing import Any
from binance.client import Client # type: ignore
from app.api.base.markets import ProductInfo, MarketWrapper, Price
from app.api.core.markets import ProductInfo, MarketWrapper, Price
def extract_product(currency: str, ticker_data: dict[str, Any]) -> ProductInfo:

View File

@@ -3,7 +3,7 @@ from enum import Enum
from datetime import datetime, timedelta
from coinbase.rest import RESTClient # type: ignore
from coinbase.rest.types.product_types import Candle, GetProductResponse, Product # type: ignore
from app.api.base.markets import ProductInfo, MarketWrapper, Price
from app.api.core.markets import ProductInfo, MarketWrapper, Price
def extract_product(product_data: GetProductResponse | Product) -> ProductInfo:

View File

@@ -1,7 +1,7 @@
import os
from typing import Any
import requests
from app.api.base.markets import ProductInfo, MarketWrapper, Price
from app.api.core.markets import ProductInfo, MarketWrapper, Price
def extract_product(asset_data: dict[str, Any]) -> ProductInfo:

View File

@@ -1,6 +1,6 @@
import json
from agno.tools.yfinance import YFinanceTools
from app.api.base.markets import MarketWrapper, ProductInfo, Price
from app.api.core.markets import MarketWrapper, ProductInfo, Price
def extract_product(stock_data: dict[str, str]) -> ProductInfo:

View File

@@ -1,78 +1,7 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.base.news import NewsWrapper, Article
from app.api.news.news_api import NewsApiWrapper
from app.api.news.newsapi import NewsApiWrapper
from app.api.news.googlenews import GoogleNewsWrapper
from app.api.news.cryptopanic_api import CryptoPanicWrapper
from app.api.news.duckduckgo import DuckDuckGoWrapper
__all__ = ["NewsAPIsTool", "NewsApiWrapper", "GoogleNewsWrapper", "CryptoPanicWrapper", "DuckDuckGoWrapper", "Article"]
__all__ = ["NewsApiWrapper", "GoogleNewsWrapper", "CryptoPanicWrapper", "DuckDuckGoWrapper"]
class NewsAPIsTool(NewsWrapper, Toolkit):
"""
Aggregates multiple news API wrappers and manages them using WrapperHandler.
This class supports retrieving top headlines and latest news articles by querying multiple sources:
- GoogleNewsWrapper
- DuckDuckGoWrapper
- NewsApiWrapper
- CryptoPanicWrapper
By default, it returns results from the first successful wrapper.
Optionally, it can be configured to collect articles from all wrappers.
If no wrapper succeeds, an exception is raised.
"""
def __init__(self):
"""
Initialize the NewsAPIsTool with multiple news API wrappers.
The tool uses WrapperHandler to manage and invoke the different news API wrappers.
The following wrappers are included in this order:
- GoogleNewsWrapper.
- DuckDuckGoWrapper.
- NewsApiWrapper.
- CryptoPanicWrapper.
"""
wrappers: list[type[NewsWrapper]] = [GoogleNewsWrapper, DuckDuckGoWrapper, NewsApiWrapper, CryptoPanicWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore
self,
name="News APIs Toolkit",
tools=[
self.get_top_headlines,
self.get_latest_news,
self.get_top_headlines_aggregated,
self.get_latest_news_aggregated,
],
)
def get_top_headlines(self, limit: int = 100) -> list[Article]:
return self.handler.try_call(lambda w: w.get_top_headlines(limit))
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
return self.handler.try_call(lambda w: w.get_latest_news(query, limit))
def get_top_headlines_aggregated(self, limit: int = 100) -> dict[str, list[Article]]:
"""
Calls get_top_headlines on all wrappers/providers and returns a dictionary mapping their names to their articles.
Args:
limit (int): Maximum number of articles to retrieve from each provider.
Returns:
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_top_headlines(limit))
def get_latest_news_aggregated(self, query: str, limit: int = 100) -> dict[str, list[Article]]:
"""
Calls get_latest_news on all wrappers/providers and returns a dictionary mapping their names to their articles.
Args:
query (str): The search query to find relevant news articles.
limit (int): Maximum number of articles to retrieve from each provider.
Returns:
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_latest_news(query, limit))

View File

@@ -2,7 +2,7 @@ import os
from typing import Any
import requests
from enum import Enum
from app.api.base.news import NewsWrapper, Article
from app.api.core.news import NewsWrapper, Article
class CryptoPanicFilter(Enum):

View File

@@ -1,7 +1,7 @@
import json
from typing import Any
from agno.tools.duckduckgo import DuckDuckGoTools
from app.api.base.news import Article, NewsWrapper
from app.api.core.news import Article, NewsWrapper
def extract_article(result: dict[str, Any]) -> Article:

View File

@@ -1,6 +1,6 @@
from typing import Any
from gnews import GNews # type: ignore
from app.api.base.news import Article, NewsWrapper
from app.api.core.news import Article, NewsWrapper
def extract_article(result: dict[str, Any]) -> Article:

View File

@@ -1,7 +1,7 @@
import os
from typing import Any
import newsapi # type: ignore
from app.api.base.news import Article, NewsWrapper
from app.api.core.news import Article, NewsWrapper
def extract_article(result: dict[str, Any]) -> Article:

View File

@@ -1,53 +1,3 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.base.social import SocialPost, SocialWrapper
from app.api.social.reddit import RedditWrapper
__all__ = ["SocialAPIsTool", "RedditWrapper", "SocialPost"]
class SocialAPIsTool(SocialWrapper, Toolkit):
"""
Aggregates multiple social media API wrappers and manages them using WrapperHandler.
This class supports retrieving top crypto-related posts by querying multiple sources:
- RedditWrapper
By default, it returns results from the first successful wrapper.
Optionally, it can be configured to collect posts from all wrappers.
If no wrapper succeeds, an exception is raised.
"""
def __init__(self):
"""
Initialize the SocialAPIsTool with multiple social media API wrappers.
The tool uses WrapperHandler to manage and invoke the different social media API wrappers.
The following wrappers are included in this order:
- RedditWrapper.
"""
wrappers: list[type[SocialWrapper]] = [RedditWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore
self,
name="Socials Toolkit",
tools=[
self.get_top_crypto_posts,
self.get_top_crypto_posts_aggregated,
],
)
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
return self.handler.try_call(lambda w: w.get_top_crypto_posts(limit))
def get_top_crypto_posts_aggregated(self, limit_per_wrapper: int = 5) -> dict[str, list[SocialPost]]:
"""
Calls get_top_crypto_posts on all wrappers/providers and returns a dictionary mapping their names to their posts.
Args:
limit_per_wrapper (int): Maximum number of posts to retrieve from each provider.
Returns:
dict[str, list[SocialPost]]: A dictionary where keys are wrapper names and values are lists of SocialPost objects.
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_top_crypto_posts(limit_per_wrapper))
__all__ = ["RedditWrapper"]

View File

@@ -1,7 +1,7 @@
import os
from praw import Reddit # type: ignore
from praw.models import Submission # type: ignore
from app.api.base.social import SocialWrapper, SocialPost, SocialComment
from app.api.core.social import SocialWrapper, SocialPost, SocialComment
MAX_COMMENTS = 5

View File

@@ -0,0 +1,5 @@
from app.api.tools.market_tool import MarketAPIsTool
from app.api.tools.social_tool import SocialAPIsTool
from app.api.tools.news_tool import NewsAPIsTool
__all__ = ["MarketAPIsTool", "NewsAPIsTool", "SocialAPIsTool"]

View File

@@ -0,0 +1,80 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.core.markets import MarketWrapper, Price, ProductInfo
from app.api.markets import BinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper, YFinanceWrapper
class MarketAPIsTool(MarketWrapper, Toolkit):
"""
Class that aggregates multiple market API wrappers and manages them using WrapperHandler.
This class supports retrieving product information and historical prices.
This class can also aggregate data from multiple sources to provide a more comprehensive view of the market.
The following wrappers are included in this order:
- BinanceWrapper
- YFinanceWrapper
- CoinBaseWrapper
- CryptoCompareWrapper
"""
def __init__(self, currency: str = "USD"):
"""
Initialize the MarketAPIsTool with multiple market API wrappers.
The following wrappers are included in this order:
- BinanceWrapper
- YFinanceWrapper
- CoinBaseWrapper
- CryptoCompareWrapper
Args:
currency (str): Valuta in cui restituire i prezzi. Default è "USD".
"""
kwargs = {"currency": currency or "USD"}
wrappers: list[type[MarketWrapper]] = [BinanceWrapper, YFinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers, kwargs=kwargs)
Toolkit.__init__( # type: ignore
self,
name="Market APIs Toolkit",
tools=[
self.get_product,
self.get_products,
self.get_historical_prices,
self.get_products_aggregated,
self.get_historical_prices_aggregated,
],
)
def get_product(self, asset_id: str) -> ProductInfo:
return self.handler.try_call(lambda w: w.get_product(asset_id))
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
return self.handler.try_call(lambda w: w.get_products(asset_ids))
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit))
def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Restituisce i dati aggregati per una lista di asset_id.\n
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
Args:
asset_ids (list[str]): Lista di asset_id da cercare.
Returns:
list[ProductInfo]: Lista di ProductInfo aggregati.
Raises:
Exception: If all wrappers fail to provide results.
"""
all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids))
return ProductInfo.aggregate(all_products)
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
"""
Restituisce i dati storici aggregati per un asset_id. Usa i dati di tutte le fonti disponibili e li aggrega.\n
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
Args:
asset_id (str): Asset ID da cercare.
limit (int): Numero massimo di dati storici da restituire.
Returns:
list[Price]: Lista di Price aggregati.
Raises:
Exception: If all wrappers fail to provide results.
"""
all_prices = self.handler.try_call_all(lambda w: w.get_historical_prices(asset_id, limit))
return Price.aggregate(all_prices)

View File

@@ -0,0 +1,72 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.core.news import NewsWrapper, Article
from app.api.news import NewsApiWrapper, GoogleNewsWrapper, CryptoPanicWrapper, DuckDuckGoWrapper
class NewsAPIsTool(NewsWrapper, Toolkit):
"""
Aggregates multiple news API wrappers and manages them using WrapperHandler.
This class supports retrieving top headlines and latest news articles by querying multiple sources:
- GoogleNewsWrapper
- DuckDuckGoWrapper
- NewsApiWrapper
- CryptoPanicWrapper
By default, it returns results from the first successful wrapper.
Optionally, it can be configured to collect articles from all wrappers.
If no wrapper succeeds, an exception is raised.
"""
def __init__(self):
"""
Initialize the NewsAPIsTool with multiple news API wrappers.
The tool uses WrapperHandler to manage and invoke the different news API wrappers.
The following wrappers are included in this order:
- GoogleNewsWrapper.
- DuckDuckGoWrapper.
- NewsApiWrapper.
- CryptoPanicWrapper.
"""
wrappers: list[type[NewsWrapper]] = [GoogleNewsWrapper, DuckDuckGoWrapper, NewsApiWrapper, CryptoPanicWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore
self,
name="News APIs Toolkit",
tools=[
self.get_top_headlines,
self.get_latest_news,
self.get_top_headlines_aggregated,
self.get_latest_news_aggregated,
],
)
def get_top_headlines(self, limit: int = 100) -> list[Article]:
return self.handler.try_call(lambda w: w.get_top_headlines(limit))
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
return self.handler.try_call(lambda w: w.get_latest_news(query, limit))
def get_top_headlines_aggregated(self, limit: int = 100) -> dict[str, list[Article]]:
"""
Calls get_top_headlines on all wrappers/providers and returns a dictionary mapping their names to their articles.
Args:
limit (int): Maximum number of articles to retrieve from each provider.
Returns:
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_top_headlines(limit))
def get_latest_news_aggregated(self, query: str, limit: int = 100) -> dict[str, list[Article]]:
"""
Calls get_latest_news on all wrappers/providers and returns a dictionary mapping their names to their articles.
Args:
query (str): The search query to find relevant news articles.
limit (int): Maximum number of articles to retrieve from each provider.
Returns:
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_latest_news(query, limit))

View File

@@ -0,0 +1,51 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.core.social import SocialPost, SocialWrapper
from app.api.social import RedditWrapper
class SocialAPIsTool(SocialWrapper, Toolkit):
"""
Aggregates multiple social media API wrappers and manages them using WrapperHandler.
This class supports retrieving top crypto-related posts by querying multiple sources:
- RedditWrapper
By default, it returns results from the first successful wrapper.
Optionally, it can be configured to collect posts from all wrappers.
If no wrapper succeeds, an exception is raised.
"""
def __init__(self):
"""
Initialize the SocialAPIsTool with multiple social media API wrappers.
The tool uses WrapperHandler to manage and invoke the different social media API wrappers.
The following wrappers are included in this order:
- RedditWrapper.
"""
wrappers: list[type[SocialWrapper]] = [RedditWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore
self,
name="Socials Toolkit",
tools=[
self.get_top_crypto_posts,
self.get_top_crypto_posts_aggregated,
],
)
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
return self.handler.try_call(lambda w: w.get_top_crypto_posts(limit))
def get_top_crypto_posts_aggregated(self, limit_per_wrapper: int = 5) -> dict[str, list[SocialPost]]:
"""
Calls get_top_crypto_posts on all wrappers/providers and returns a dictionary mapping their names to their posts.
Args:
limit_per_wrapper (int): Maximum number of posts to retrieve from each provider.
Returns:
dict[str, list[SocialPost]]: A dictionary where keys are wrapper names and values are lists of SocialPost objects.
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_top_crypto_posts(limit_per_wrapper))

View File

@@ -36,6 +36,16 @@ class WrapperHandler(Generic[WrapperType]):
self.retry_delay = retry_delay
self.index = 0
def set_retries(self, try_per_wrapper: int, retry_delay: int) -> None:
"""
Sets the retry parameters for the handler.
Args:
try_per_wrapper (int): Number of retries per wrapper before switching to the next.
retry_delay (int): Delay in seconds between retries.
"""
self.retry_per_wrapper = try_per_wrapper
self.retry_delay = retry_delay
def try_call(self, func: Callable[[WrapperType], OutputType]) -> OutputType:
"""
Attempts to call the provided function on the current wrapper.