3 market api (#8)

* Creazione branch tool, refactor degli import e soppressione dei warning

* Update pytest configuration and dependencies in pyproject.toml

* Add news API integration and related configurations

- Update .env.example to include NEWS_API_KEY configuration
- Add newsapi-python dependency in pyproject.toml
- Implement NewsAPI class for fetching news articles
- Create Article model for structured news data
- Add tests for NewsAPI functionality in test_news_api.py
- Update pytest configuration to include news marker

* Add news API functionality and update tests for article retrieval

* ToDo:
1. Aggiungere un aggregator per i dati recuperati dai provider.
2. Lavorare effettivamente all'issue

Done:
1. creati test per i provider
2. creato market_providers_api_demo.py per mostrare i dati recuperati dalle api dei providers
3. aggiornato i provider
4. creato il provider binance sia pubblico che con chiave
5. creato error_handler.py per gestire decoratori e utilità: retry automatico, gestione timeout...

* Refactor news API integration to use NewsApiWrapper and GnewsWrapper; add tests for Gnews API functionality

* Add CryptoPanic API integration and related tests; update .env.example and test configurations

* Implement WrapperHandler for managing multiple news API wrappers; add tests for wrapper functionality

* Enhance WrapperHandler
- docstrings
- add try_call_all method
- update tests

* pre merge con phil

* Add DuckDuckGo and Google News wrappers; refactor CryptoPanic and NewsAPI

- Implemented DuckDuckGoWrapper for news retrieval using DuckDuckGo tools.
- Added GoogleNewsWrapper for accessing Google News RSS feed.
- Refactored CryptoPanicWrapper to unify get_top_headlines and get_latest_news methods.
- Updated NewsApiWrapper to simplify top headlines retrieval.
- Added tests for DuckDuckGo and Google News wrappers.
- Enhanced documentation for CryptoPanicWrapper and NewsApiWrapper.
- Created base module for social media integrations.

* - Refactor struttura progetto: divisione tra agent e toolkit

* Refactor try_call_all method to return a dictionary of results; update tests for success and partial failures

* Fix class and test method names for DuckDuckGoWrapper

* Add Reddit API wrapper and related tests; update environment configuration

* pre merge con giacomo

* Fix import statements

* Fixes
- separated tests
- fix tests
- fix bugs reintroduced my previous merge

* Refactor market API wrappers to streamline product and price retrieval methods

* Add BinanceWrapper to market API exports

* Finito ISSUE 3

* Final review
- rm PublicBinanceAgent & updated demo
- moved in the correct folder some tests
- fix binance bug

---------

Co-authored-by: trojanhorse47 <cosmomemory@hotmail.it>
Co-authored-by: Berack96 <giacomobertolazzi7@gmail.com>
Co-authored-by: Giacomo Bertolazzi <31776951+Berack96@users.noreply.github.com>
This commit was merged in pull request #8.
This commit is contained in:
Simo
2025-10-01 15:51:25 +02:00
committed by GitHub
parent 4615ebe63e
commit dc9dc98298
50 changed files with 2673 additions and 671 deletions

View File

@@ -1,39 +0,0 @@
from agno.tools import Toolkit
from app.markets import MarketAPIs
# TODO (?) in futuro fare in modo che la LLM faccia da sé per il mercato
# Non so se può essere utile, per ora lo lascio qui
# per ora mettiamo tutto statico e poi, se abbiamo API-Key senza limiti
# possiamo fare in modo di far scegliere alla LLM quale crypto proporre
# in base alle sue proprie chiamate API
class MarketToolkit(Toolkit):
def __init__(self):
self.market_api = MarketAPIs("USD") # change currency if needed
super().__init__(
name="Market Toolkit",
tools=[
self.get_historical_data,
self.get_current_price,
],
)
def get_historical_data(self, symbol: str):
return self.market_api.get_historical_prices(symbol)
def get_current_price(self, symbol: str):
return self.market_api.get_products(symbol)
def prepare_inputs():
pass
def instructions():
return """
Utilizza questo strumento per ottenere dati di mercato storici e attuali per criptovalute specifiche.
Puoi richiedere i prezzi storici o il prezzo attuale di una criptovaluta specifica.
Esempio di utilizzo:
- get_historical_data("BTC")
- get_current_price("ETH")
"""

View File

@@ -0,0 +1,90 @@
from typing import Union, List, Dict, Optional, Any, Iterator, Sequence
from agno.agent import Agent
from agno.models.message import Message
from agno.run.agent import RunOutput, RunOutputEvent
from pydantic import BaseModel
from app.toolkits.market_toolkit import MarketToolkit
from app.markets.base import ProductInfo # modello dati già definito nel tuo progetto
class MarketAgent(Agent):
"""
Wrapper che trasforma MarketToolkit in un Agent compatibile con Team.
Produce sia output leggibile (content) che dati strutturati (metadata).
"""
def __init__(self, currency: str = "USD"):
super().__init__()
self.toolkit = MarketToolkit()
self.currency = currency
self.name = "MarketAgent"
def run(
self,
input: Union[str, List, Dict, Message, BaseModel, List[Message]],
*,
stream: Optional[bool] = None,
stream_intermediate_steps: Optional[bool] = None,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
session_state: Optional[Dict[str, Any]] = None,
audio: Optional[Sequence[Any]] = None,
images: Optional[Sequence[Any]] = None,
videos: Optional[Sequence[Any]] = None,
files: Optional[Sequence[Any]] = None,
retries: Optional[int] = None,
knowledge_filters: Optional[Dict[str, Any]] = None,
add_history_to_context: Optional[bool] = None,
add_dependencies_to_context: Optional[bool] = None,
add_session_state_to_context: Optional[bool] = None,
dependencies: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
yield_run_response: bool = False,
debug_mode: Optional[bool] = None,
**kwargs: Any,
) -> Union[RunOutput, Iterator[Union[RunOutputEvent, RunOutput]]]:
# 1. Estraggo la query dal parametro "input"
if isinstance(input, str):
query = input
elif isinstance(input, dict) and "query" in input:
query = input["query"]
elif isinstance(input, Message):
query = input.content
elif isinstance(input, BaseModel):
query = str(input)
elif isinstance(input, list) and input and isinstance(input[0], Message):
query = input[0].content
else:
query = str(input)
# 2. Individuo i simboli da analizzare
symbols = []
for token in query.upper().split():
if token in ("BTC", "ETH", "XRP", "LTC", "BCH"): # TODO: estendere dinamicamente
symbols.append(token)
if not symbols:
symbols = ["BTC", "ETH"] # default
# 3. Recupero i dati dal toolkit
results = []
products: List[ProductInfo] = []
try:
products.extend(self.toolkit.get_current_prices(symbols)) # supponiamo ritorni un ProductInfo o simile
# Usa list comprehension per iterare symbols e products insieme
results.extend([
f"{symbol}: ${product.price:.2f}" if hasattr(product, 'price') and product.price else f"{symbol}: N/A"
for symbol, product in zip(symbols, products)
])
except Exception as e:
results.extend(f"Errore: Impossibile recuperare i dati di mercato\n{str(e)}")
# 4. Preparo output leggibile + metadati strutturati
output_text = "📊 Dati di mercato:\n" + "\n".join(results)
return RunOutput(
content=output_text,
metadata={"products": products}
)

View File

@@ -1,4 +1,34 @@
class NewsAgent:
from agno.agent import Agent
class NewsAgent(Agent):
"""
Gli agenti devono esporre un metodo run con questa firma.
def run(
self,
input: Union[str, List, Dict, Message, BaseModel, List[Message]],
*,
stream: Optional[bool] = None,
stream_intermediate_steps: Optional[bool] = None,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
session_state: Optional[Dict[str, Any]] = None,
audio: Optional[Sequence[Any]] = None,
images: Optional[Sequence[Any]] = None,
videos: Optional[Sequence[Any]] = None,
files: Optional[Sequence[Any]] = None,
retries: Optional[int] = None,
knowledge_filters: Optional[Dict[str, Any]] = None,
add_history_to_context: Optional[bool] = None,
add_dependencies_to_context: Optional[bool] = None,
add_session_state_to_context: Optional[bool] = None,
dependencies: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
yield_run_response: bool = False,
debug_mode: Optional[bool] = None,
**kwargs: Any,
) -> Union[RunOutput, Iterator[Union[RunOutputEvent, RunOutput]]]:
"""
@staticmethod
def analyze(query: str) -> str:
# Mock analisi news

View File

@@ -1,51 +0,0 @@
from enum import Enum
from app.markets.base import ProductInfo
from pydantic import BaseModel, Field
class PredictorStyle(Enum):
CONSERVATIVE = "Conservativo"
AGGRESSIVE = "Aggressivo"
class PredictorInput(BaseModel):
data: list[ProductInfo] = Field(..., description="Market data as a list of ProductInfo")
style: PredictorStyle = Field(..., description="Prediction style")
sentiment: str = Field(..., description="Aggregated sentiment from news and social analysis")
class ItemPortfolio(BaseModel):
asset: str = Field(..., description="Name of the asset")
percentage: float = Field(..., description="Percentage allocation to the asset")
motivation: str = Field(..., description="Motivation for the allocation")
class PredictorOutput(BaseModel):
strategy: str = Field(..., description="Concise operational strategy in Italian")
portfolio: list[ItemPortfolio] = Field(..., description="List of portfolio items with allocations")
PREDICTOR_INSTRUCTIONS = """
You are an **Allocation Algorithm (Crypto-Algo)** specialized in analyzing market data and sentiment to generate an investment strategy and a target portfolio.
Your sole objective is to process the input data and generate the strictly structured output as required by the response format. **You MUST NOT provide introductions, preambles, explanations, conclusions, or any additional comments that are not strictly required.**
## Processing Instructions (Absolute Rule)
The allocation strategy must be **derived exclusively from the "Allocation Logic" corresponding to the requested *style*** and the provided market/sentiment data. **DO NOT** use external or historical knowledge.
## Allocation Logic
### "Aggressivo" Style (Aggressive)
* **Priority:** Maximizing return (high volatility accepted).
* **Focus:** Higher allocation to **non-BTC/ETH assets** with high momentum potential (Altcoins, mid/low-cap assets).
* **BTC/ETH:** Must serve as a base (anchor), but their allocation **must not exceed 50%** of the total portfolio.
* **Sentiment:** Use positive sentiment to increase exposure to high-risk assets.
### "Conservativo" Style (Conservative)
* **Priority:** Capital preservation (volatility minimized).
* **Focus:** Major allocation to **BTC and/or ETH (Large-Cap Assets)**.
* **BTC/ETH:** Their allocation **must be at least 70%** of the total portfolio.
* **Altcoins:** Any allocations to non-BTC/ETH assets must be minimal (max 30% combined) and for assets that minimize speculative risk.
* **Sentiment:** Use positive sentiment only as confirmation for exposure, avoiding reactions to excessive "FOMO" signals.
## Output Requirements (Content MUST be in Italian)
1. **Strategy (strategy):** Must be a concise operational description **in Italian ("in Italiano")**, with a maximum of 5 sentences.
2. **Portfolio (portfolio):** The sum of all percentages must be **exactly 100%**. The justification (motivation) for each asset must be a single clear sentence **in Italian ("in Italiano")**.
"""

View File

@@ -1,4 +1,35 @@
class SocialAgent:
from agno.agent import Agent
class SocialAgent(Agent):
"""
Gli agenti devono esporre un metodo run con questa firma.
def run(
self,
input: Union[str, List, Dict, Message, BaseModel, List[Message]],
*,
stream: Optional[bool] = None,
stream_intermediate_steps: Optional[bool] = None,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
session_state: Optional[Dict[str, Any]] = None,
audio: Optional[Sequence[Any]] = None,
images: Optional[Sequence[Any]] = None,
videos: Optional[Sequence[Any]] = None,
files: Optional[Sequence[Any]] = None,
retries: Optional[int] = None,
knowledge_filters: Optional[Dict[str, Any]] = None,
add_history_to_context: Optional[bool] = None,
add_dependencies_to_context: Optional[bool] = None,
add_session_state_to_context: Optional[bool] = None,
dependencies: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
yield_run_response: bool = False,
debug_mode: Optional[bool] = None,
**kwargs: Any,
) -> Union[RunOutput, Iterator[Union[RunOutputEvent, RunOutput]]]:
"""
@staticmethod
def analyze(query: str) -> str:
# Mock analisi social