Merge branch 'main' into 21-team-monitoring

This commit is contained in:
2025-10-12 23:41:19 +02:00
62 changed files with 785 additions and 609 deletions

View File

@@ -17,6 +17,7 @@ COPY pyproject.toml ./
COPY uv.lock ./
COPY LICENSE ./
COPY src/ ./src/
COPY configs.yaml ./
# Creiamo l'ambiente virtuale con tutto già presente
RUN uv sync

View File

@@ -21,7 +21,7 @@ L'obiettivo è quello di creare un sistema di consulenza finanziaria basato su L
L'installazione di questo progetto richiede 3 passaggi totali (+1 se si vuole sviluppare in locale) che devono essere eseguiti in sequenza. Se questi passaggi sono eseguiti correttamente, l'applicazione dovrebbe partire senza problemi. Altrimenti è molto probabile che si verifichino errori di vario tipo (moduli mancanti, chiavi API non trovate, ecc.).
1. Configurare le variabili d'ambiente
1. Configurazioni dell'app e delle variabili d'ambiente
2. Installare Ollama e i modelli locali
3. Far partire il progetto con Docker (consigliato)
4. (Solo per sviluppo locale) Installare uv e creare l'ambiente virtuale
@@ -29,9 +29,12 @@ L'installazione di questo progetto richiede 3 passaggi totali (+1 se si vuole sv
> [!IMPORTANT]\
> Prima di iniziare, assicurarsi di avere clonato il repository e di essere nella cartella principale del progetto.
### **1. Variabili d'Ambiente**
### **1. Configurazioni**
Copia il file `.env.example` in `.env` e successivamente modificalo con le tue API keys:
Ci sono due file di configurazione principali che l'app utilizza: `config.yaml` e `.env`.\
Il primo contiene le configurazioni generali dell'applicazione e può essere modificato a piacimento, mentre il secondo è utilizzato per le variabili d'ambiente.
Per il secondo, bisogna copiare il file `.env.example` in `.env` e successivamente modificalo con le tue API keys:
```sh
cp .env.example .env
nano .env # esempio di modifica del file
@@ -49,11 +52,8 @@ Per l'installazione scaricare Ollama dal loro [sito ufficiale](https://ollama.co
Dopo l'installazione, si possono iniziare a scaricare i modelli desiderati tramite il comando `ollama pull <model>:<tag>`.
I modelli usati dall'applicazione sono visibili in [src/app/models.py](src/app/models.py). Di seguito metto lo stesso una lista di modelli, ma potrebbe non essere aggiornata:
- `gpt-oss:latest`
- `qwen3:latest`
- `qwen3:4b`
- `qwen3:1.7b`
I modelli usati dall'applicazione sono quelli specificati nel file [config.yaml](config.yaml) alla voce `model`. Se in locale si hanno dei modelli diversi, è possibile modificare questa voce per usare quelli disponibili.
I modelli consigliati per questo progetto sono `qwen3:4b` e `qwen3:1.7b`.
### **3. Docker**
Se si vuole solamente avviare il progetto, si consiglia di utilizzare [Docker](https://www.docker.com), dato che sono stati creati i files [Dockerfile](Dockerfile) e [docker-compose.yaml](docker-compose.yaml) per creare il container con tutti i file necessari e già in esecuzione.
@@ -86,7 +86,7 @@ uv pip install -e .
A questo punto si può già modificare il codice e, quando necessario, far partire il progetto tramite il comando:
```sh
uv run python src/app
uv run src/app
```
# **Applicazione**
@@ -105,12 +105,15 @@ Usando la libreria ``gradio`` è stata creata un'interfaccia web semplice per in
src
└── app
├── __main__.py
├── agents <-- Agenti, modelli, prompts e simili
├── base <-- Classi base per le API
├── markets <-- Market data provider (Es. Binance)
├── news <-- News data provider (Es. NewsAPI)
├── social <-- Social data provider (Es. Reddit)
└── utils <-- Codice di utilità generale
├── config.py <-- Configurazioni app
├── agents <-- Agenti, Team, prompts e simili
├── api <-- Tutte le API esterne
│ ├── core <-- Classi core per le API
│ ├── markets <-- Market data provider (Es. Binance)
│ ├── news <-- News data provider (Es. NewsAPI)
│ ├── social <-- Social data provider (Es. Reddit)
│ └── tools <-- Tools per agenti creati dalle API
└── interface <-- Interfacce utente
```
## Tests

45
configs.yaml Normal file
View File

@@ -0,0 +1,45 @@
port: 8000
gradio_share: false
logging_level: INFO
strategies:
- name: Conservative
label: Conservative
description: Focus on stable and low-risk investments.
- name: Balanced
label: Balanced
description: A mix of growth and stability.
- name: Aggressive
label: Aggressive
description: High-risk, high-reward investments.
models:
gemini:
- name: gemini-2.0-flash
label: Gemini
- name: gemini-2.0-pro
label: Gemini Pro
ollama:
- name: gpt-oss:latest
label: Ollama GPT
- name: qwen3:8b
label: Qwen 3 (8B)
- name: qwen3:4b
label: Qwen 3 (4B)
- name: qwen3:1.7b
label: Qwen 3 (1.7B)
api:
retry_attempts: 3
retry_delay_seconds: 2
currency: EUR
# TODO Magari implementare un sistema per settare i providers
market_providers: [BinanceWrapper, YFinanceWrapper]
news_providers: [GoogleNewsWrapper, DuckDuckGoWrapper]
social_providers: [RedditWrapper]
agents:
strategy: Conservative
team_model: qwen3:1.7b
team_leader_model: qwen3:4b
predictor_model: qwen3:4b

View File

@@ -27,7 +27,7 @@ project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))
from dotenv import load_dotenv
from app.markets import (
from app.api.markets import (
CoinBaseWrapper,
CryptoCompareWrapper,
BinanceWrapper,

View File

@@ -5,7 +5,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../src'
###########################################
from dotenv import load_dotenv
from app.news import NewsApiWrapper
from app.api.news import NewsApiWrapper
def main():
api = NewsApiWrapper()

View File

@@ -13,6 +13,7 @@ dependencies = [
"pytest", # Test
"dotenv", # Gestire variabili d'ambiente (generalmente API keys od opzioni)
"gradio", # UI web semplice con user_input e output
"colorlog", # Log colorati in console
# Per costruire agenti (ovvero modelli che possono fare più cose tramite tool) https://github.com/agno-agi/agno
# altamente consigliata dato che ha anche tools integrati per fare scraping, calcoli e molto altro

View File

@@ -1,14 +1,19 @@
import asyncio
import gradio as gr
from dotenv import load_dotenv
from agno.utils.log import log_info #type: ignore
from app.utils import ChatManager
from app.configs import AppConfig
from app.interface import ChatManager
from app.agents import Pipeline
if __name__ == "__main__":
# Inizializzazioni
load_dotenv()
pipeline = Pipeline()
configs = AppConfig.load()
pipeline = Pipeline(configs)
chat = ChatManager()
########################################
@@ -57,7 +62,7 @@ if __name__ == "__main__":
type="index",
label="Stile di investimento"
)
style.change(fn=pipeline.choose_style, inputs=style, outputs=None)
style.change(fn=pipeline.choose_strategy, inputs=style, outputs=None)
chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages")
msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?")
@@ -73,7 +78,9 @@ if __name__ == "__main__":
save_btn.click(save_current_chat, inputs=None, outputs=None)
load_btn.click(load_previous_chat, inputs=None, outputs=[chatbot, chatbot])
server, port = ("0.0.0.0", 8000) # 0.0.0.0 per accesso esterno (Docker)
server_log = "localhost" if server == "0.0.0.0" else server
log_info(f"Starting UPO AppAI Chat on http://{server_log}:{port}") # noqa
demo.launch(server_name=server, server_port=port, quiet=True)
try:
_app, local, shared = demo.launch(server_name="0.0.0.0", server_port=configs.port, quiet=True, prevent_thread_lock=True, share=configs.gradio_share)
log_info(f"Starting UPO AppAI Chat on {shared or local}")
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
demo.close()

View File

@@ -1,6 +1,5 @@
from app.agents.models import AppModels
from app.agents.predictor import PredictorInput, PredictorOutput, PredictorStyle, PREDICTOR_INSTRUCTIONS
from app.agents.predictor import PredictorInput, PredictorOutput
from app.agents.team import create_team_with
from app.agents.pipeline import Pipeline
__all__ = ["AppModels", "PredictorInput", "PredictorOutput", "PredictorStyle", "PREDICTOR_INSTRUCTIONS", "create_team_with", "Pipeline"]
__all__ = ["PredictorInput", "PredictorOutput", "create_team_with", "Pipeline"]

View File

@@ -1,107 +0,0 @@
import os
import ollama
from enum import Enum
from agno.agent import Agent
from agno.models.base import Model
from agno.models.google import Gemini
from agno.models.ollama import Ollama
from agno.tools import Toolkit
from agno.utils.log import log_warning #type: ignore
from pydantic import BaseModel
class AppModels(Enum):
"""
Enum per i modelli supportati.
Aggiungere nuovi modelli qui se necessario.
Per quanto riguarda Ollama, i modelli dovranno essere scaricati e installati
localmente seguendo le istruzioni di https://ollama.com/docs/guide/install-models
"""
GEMINI = "gemini-2.0-flash" # API online
GEMINI_PRO = "gemini-2.0-pro" # API online, più costoso ma migliore
OLLAMA_GPT = "gpt-oss:latest" # + good - slow (13b)
OLLAMA_QWEN = "qwen3:latest" # + good + fast (8b)
OLLAMA_QWEN_4B = "qwen3:4b" # + fast + decent (4b)
OLLAMA_QWEN_1B = "qwen3:1.7b" # + very fast + decent (1.7b)
@staticmethod
def availables_local() -> list['AppModels']:
"""
Controlla quali provider di modelli LLM locali sono disponibili.
Ritorna una lista di provider disponibili.
"""
try:
models_list = ollama.list()
availables = [model['model'] for model in models_list['models']]
app_models = [model for model in AppModels if model.name.startswith("OLLAMA")]
return [model for model in app_models if model.value in availables]
except Exception as e:
log_warning(f"Ollama is not running or not reachable: {e}")
return []
@staticmethod
def availables_online() -> list['AppModels']:
"""
Controlla quali provider di modelli LLM online hanno le loro API keys disponibili
come variabili d'ambiente e ritorna una lista di provider disponibili.
"""
if not os.getenv("GOOGLE_API_KEY"):
log_warning("No GOOGLE_API_KEY set in environment variables.")
return []
availables = [AppModels.GEMINI, AppModels.GEMINI_PRO]
return availables
@staticmethod
def availables() -> list['AppModels']:
"""
Controlla quali provider di modelli LLM locali sono disponibili e quali
provider di modelli LLM online hanno le loro API keys disponibili come variabili
d'ambiente e ritorna una lista di provider disponibili.
L'ordine di preferenza è:
1. Gemini (Google)
2. Ollama (locale)
"""
availables = [
*AppModels.availables_online(),
*AppModels.availables_local()
]
assert availables, "No valid model API keys set in environment variables."
return availables
def get_model(self, instructions:str) -> Model:
"""
Restituisce un'istanza del modello specificato.
Args:
instructions: istruzioni da passare al modello (system prompt).
Returns:
Un'istanza di BaseModel o una sua sottoclasse.
Raise:
ValueError se il modello non è supportato.
"""
name = self.value
if self in {model for model in AppModels if model.name.startswith("GEMINI")}:
return Gemini(name, instructions=[instructions])
elif self in {model for model in AppModels if model.name.startswith("OLLAMA")}:
return Ollama(name, instructions=[instructions])
raise ValueError(f"Modello non supportato: {self}")
def get_agent(self, instructions: str, name: str = "", output_schema: type[BaseModel] | None = None, tools: list[Toolkit] | None = None) -> Agent:
"""
Costruisce un agente con il modello e le istruzioni specificate.
Args:
instructions: istruzioni da passare al modello (system prompt)
name: nome dell'agente (opzionale)
output: schema di output opzionale (Pydantic BaseModel)
tools: lista opzionale di strumenti (tools) da fornire all'agente
Returns:
Un'istanza di Agent.
"""
return Agent(
model=self.get_model(instructions),
name=name,
retries=2,
tools=tools,
delay_between_retries=5, # seconds
output_schema=output_schema
)

View File

@@ -1,5 +1,7 @@
from app.agents.models import AppModels
from app.agents.predictor import PREDICTOR_INSTRUCTIONS, PredictorOutput, PredictorStyle
from agno.run.agent import RunEvent
from app.agents.prompts import *
from app.agents.team import AppTeam
from app.configs import AppConfig
class Pipeline:
@@ -9,12 +11,12 @@ class Pipeline:
e scelto dall'utente tramite i dropdown dell'interfaccia grafica.
"""
def __init__(self):
self.available_models = AppModels.availables()
self.all_styles = list(PredictorStyle)
def __init__(self, configs: AppConfig):
self.configs = configs
self.style = self.all_styles[0]
self.choose_predictor(0) # Modello di default
# Stato iniziale
self.choose_strategy(0)
self.choose_predictor(0)
# ======================
# Dropdown handlers
@@ -23,17 +25,13 @@ class Pipeline:
"""
Sceglie il modello LLM da usare per il Predictor.
"""
model = self.available_models[index]
self.predictor = model.get_agent(
PREDICTOR_INSTRUCTIONS,
output_schema=PredictorOutput,
)
self.predictor = self.configs.models.all_models[index]
def choose_style(self, index: int):
def choose_strategy(self, index: int):
"""
Sceglie lo stile (conservativo/aggressivo) da usare per il Predictor.
Sceglie la strategia da usare per il Predictor.
"""
self.style = self.all_styles[index]
self.strat = self.configs.strategies[index].description
# ======================
# Helpers
@@ -42,13 +40,13 @@ class Pipeline:
"""
Restituisce la lista dei nomi dei modelli disponibili.
"""
return [model.name for model in self.available_models]
return [model.label for model in self.configs.models.all_models]
def list_styles(self) -> list[str]:
"""
Restituisce la lista degli stili di previsione disponibili.
"""
return [style.value for style in self.all_styles]
return [strat.label for strat in self.configs.strategies]
# ======================
# Core interaction
@@ -61,9 +59,7 @@ class Pipeline:
4. Restituisce la strategia finale
"""
# Step 1: raccolta output dai membri del Team
from app.agents import AppTeam
from agno.agent import RunEvent
team = AppTeam(AppModels.OLLAMA_QWEN_1B) # TODO rendere dinamico
team = AppTeam(configs=self.configs, team_models=self.predictor)
team.add_listener(RunEvent.tool_call_started, lambda e: print(f"Team tool call started: {e.agent_name}")) # type: ignore
team.add_listener(RunEvent.tool_call_completed, lambda e: print(f"Team tool call completed: {e.agent_name}")) # type: ignore
result = team.run_team(query)

View File

@@ -1,15 +1,9 @@
from enum import Enum
from pydantic import BaseModel, Field
from app.base.markets import ProductInfo
class PredictorStyle(Enum):
CONSERVATIVE = "Conservativo"
AGGRESSIVE = "Aggressivo"
from app.api.core.markets import ProductInfo
class PredictorInput(BaseModel):
data: list[ProductInfo] = Field(..., description="Market data as a list of ProductInfo")
style: PredictorStyle = Field(..., description="Prediction style")
style: str = Field(..., description="Prediction style")
sentiment: str = Field(..., description="Aggregated sentiment from news and social analysis")
class ItemPortfolio(BaseModel):
@@ -20,34 +14,3 @@ class ItemPortfolio(BaseModel):
class PredictorOutput(BaseModel):
strategy: str = Field(..., description="Concise operational strategy in Italian")
portfolio: list[ItemPortfolio] = Field(..., description="List of portfolio items with allocations")
PREDICTOR_INSTRUCTIONS = """
You are an **Allocation Algorithm (Crypto-Algo)** specialized in analyzing market data and sentiment to generate an investment strategy and a target portfolio.
Your sole objective is to process the user_input data and generate the strictly structured output as required by the response format. **You MUST NOT provide introductions, preambles, explanations, conclusions, or any additional comments that are not strictly required.**
## Processing Instructions (Absolute Rule)
The allocation strategy must be **derived exclusively from the "Allocation Logic" corresponding to the requested *style*** and the provided market/sentiment data. **DO NOT** use external or historical knowledge.
## Allocation Logic
### "Aggressivo" Style (Aggressive)
* **Priority:** Maximizing return (high volatility accepted).
* **Focus:** Higher allocation to **non-BTC/ETH assets** with high momentum potential (Altcoins, mid/low-cap assets).
* **BTC/ETH:** Must serve as a base (anchor), but their allocation **must not exceed 50%** of the total portfolio.
* **Sentiment:** Use positive sentiment to increase exposure to high-risk assets.
### "Conservativo" Style (Conservative)
* **Priority:** Capital preservation (volatility minimized).
* **Focus:** Major allocation to **BTC and/or ETH (Large-Cap Assets)**.
* **BTC/ETH:** Their allocation **must be at least 70%** of the total portfolio.
* **Altcoins:** Any allocations to non-BTC/ETH assets must be minimal (max 30% combined) and for assets that minimize speculative risk.
* **Sentiment:** Use positive sentiment only as confirmation for exposure, avoiding reactions to excessive "FOMO" signals.
## Output Requirements (Content MUST be in Italian)
1. **Strategy (strategy):** Must be a concise operational description **in Italian ("in Italiano")**, with a maximum of 5 sentences.
2. **Portfolio (portfolio):** The sum of all percentages must be **exactly 100%**. The justification (motivation) for each asset must be a single clear sentence **in Italian ("in Italiano")**.
"""

View File

@@ -0,0 +1,21 @@
from pathlib import Path
__PROMPTS_PATH = Path(__file__).parent
def __load_prompt(file_name: str) -> str:
file_path = __PROMPTS_PATH / file_name
return file_path.read_text(encoding='utf-8').strip()
COORDINATOR_INSTRUCTIONS = __load_prompt("team_leader.txt")
MARKET_INSTRUCTIONS = __load_prompt("team_market.txt")
NEWS_INSTRUCTIONS = __load_prompt("team_news.txt")
SOCIAL_INSTRUCTIONS = __load_prompt("team_social.txt")
PREDICTOR_INSTRUCTIONS = __load_prompt("predictor.txt")
__all__ = [
"COORDINATOR_INSTRUCTIONS",
"MARKET_INSTRUCTIONS",
"NEWS_INSTRUCTIONS",
"SOCIAL_INSTRUCTIONS",
"PREDICTOR_INSTRUCTIONS",
]

View File

@@ -0,0 +1,27 @@
You are an **Allocation Algorithm (Crypto-Algo)** specialized in analyzing market data and sentiment to generate an investment strategy and a target portfolio.
Your sole objective is to process the user_input data and generate the strictly structured output as required by the response format. **You MUST NOT provide introductions, preambles, explanations, conclusions, or any additional comments that are not strictly required.**
## Processing Instructions (Absolute Rule)
The allocation strategy must be **derived exclusively from the "Allocation Logic" corresponding to the requested *style*** and the provided market/sentiment data. **DO NOT** use external or historical knowledge.
## Allocation Logic
### "Aggressivo" Style (Aggressive)
* **Priority:** Maximizing return (high volatility accepted).
* **Focus:** Higher allocation to **non-BTC/ETH assets** with high momentum potential (Altcoins, mid/low-cap assets).
* **BTC/ETH:** Must serve as a base (anchor), but their allocation **must not exceed 50%** of the total portfolio.
* **Sentiment:** Use positive sentiment to increase exposure to high-risk assets.
### "Conservativo" Style (Conservative)
* **Priority:** Capital preservation (volatility minimized).
* **Focus:** Major allocation to **BTC and/or ETH (Large-Cap Assets)**.
* **BTC/ETH:** Their allocation **must be at least 70%** of the total portfolio.
* **Altcoins:** Any allocations to non-BTC/ETH assets must be minimal (max 30% combined) and for assets that minimize speculative risk.
* **Sentiment:** Use positive sentiment only as confirmation for exposure, avoiding reactions to excessive "FOMO" signals.
## Output Requirements (Content MUST be in Italian)
1. **Strategy (strategy):** Must be a concise operational description **in Italian ("in Italiano")**, with a maximum of 5 sentences.
2. **Portfolio (portfolio):** The sum of all percentages must be **exactly 100%**. The justification (motivation) for each asset must be a single clear sentence **in Italian ("in Italiano")**.

View File

@@ -0,0 +1,15 @@
You are the expert coordinator of a financial analysis team specializing in cryptocurrencies.
Your team consists of three agents:
- **MarketAgent**: Provides quantitative market data, price analysis, and technical indicators.
- **NewsAgent**: Scans and analyzes the latest news, articles, and official announcements.
- **SocialAgent**: Gauges public sentiment, trends, and discussions on social media.
Your primary objective is to answer the user's query by orchestrating the work of your team members.
Your workflow is as follows:
1. **Deconstruct the user's query** to identify the required information.
2. **Delegate specific tasks** to the most appropriate agent(s) to gather the necessary data and initial analysis.
3. **Analyze the information** returned by the agents.
4. If the initial data is insufficient or the query is complex, **iteratively re-engage the agents** with follow-up questions to build a comprehensive picture.
5. **Synthesize all the gathered information** into a final, coherent, and complete analysis that fills all the required output fields.

View File

@@ -0,0 +1,19 @@
**TASK:** You are a specialized **Crypto Price Data Retrieval Agent**. Your primary goal is to fetch the most recent and/or historical price data for requested cryptocurrency assets (e.g., 'BTC', 'ETH', 'SOL'). You must provide the data in a clear and structured format.
**AVAILABLE TOOLS:**
1. `get_products(asset_ids: list[str])`: Get **current** product/price info for a list of assets. **(PREFERITA: usa questa per i prezzi live)**
2. `get_historical_prices(asset_id: str, limit: int)`: Get historical price data for one asset. Default limit is 100. **(PREFERITA: usa questa per i dati storici)**
3. `get_products_aggregated(asset_ids: list[str])`: Get **aggregated current** product/price info for a list of assets. **(USA SOLO SE richiesto 'aggregato' o se `get_products` fallisce)**
4. `get_historical_prices_aggregated(asset_id: str, limit: int)`: Get **aggregated historical** price data for one asset. **(USA SOLO SE richiesto 'aggregato' o se `get_historical_prices` fallisce)**
**USAGE GUIDELINE:**
* **Asset ID:** Always convert common names (e.g., 'Bitcoin', 'Ethereum') into their official ticker/ID (e.g., 'BTC', 'ETH').
* **Cost Management (Cruciale per LLM locale):** Prefer `get_products` and `get_historical_prices` for standard requests to minimize costs.
* **Aggregated Data:** Use `get_products_aggregated` or `get_historical_prices_aggregated` only if the user specifically requests aggregated data or you value that having aggregated data is crucial for the analysis.
* **Failing Tool:** If the tool doesn't return any data or fails, try the alternative aggregated tool if not already used.
**REPORTING REQUIREMENT:**
1. **Format:** Output the results in a clear, easy-to-read list or table.
2. **Live Price Request:** If an asset's *current price* is requested, report the **Asset ID**, **Latest Price**, and **Time/Date of the price**.
3. **Historical Price Request:** If *historical data* is requested, report the **Asset ID**, the **Limit** of points returned, and the **First** and **Last** entries from the list of historical prices (Date, Price).
4. **Output:** For all requests, output a single, concise summary of the findings; if requested, also include the raw data retrieved.

View File

@@ -0,0 +1,18 @@
**TASK:** You are a specialized **Crypto News Analyst**. Your goal is to fetch the latest news or top headlines related to cryptocurrencies, and then **analyze the sentiment** of the content to provide a concise report to the team leader. Prioritize 'crypto' or specific cryptocurrency names (e.g., 'Bitcoin', 'Ethereum') in your searches.
**AVAILABLE TOOLS:**
1. `get_latest_news(query: str, limit: int)`: Get the 'limit' most recent news articles for a specific 'query'.
2. `get_top_headlines(limit: int)`: Get the 'limit' top global news headlines.
3. `get_latest_news_aggregated(query: str, limit: int)`: Get aggregated latest news articles for a specific 'query'.
4. `get_top_headlines_aggregated(limit: int)`: Get aggregated top global news headlines.
**USAGE GUIDELINE:**
* Always use `get_latest_news` with a relevant crypto-related query first.
* The default limit for news items should be 5 unless specified otherwise.
* If the tool doesn't return any articles, respond with "No relevant news articles found."
**REPORTING REQUIREMENT:**
1. **Analyze** the tone and key themes of the retrieved articles.
2. **Summarize** the overall **market sentiment** (e.g., highly positive, cautiously neutral, generally negative) based on the content.
3. **Identify** the top 2-3 **main topics** discussed (e.g., new regulation, price surge, institutional adoption).
4. **Output** a single, brief report summarizing these findings. Do not output the raw articles.

View File

@@ -0,0 +1,15 @@
**TASK:** You are a specialized **Social Media Sentiment Analyst**. Your objective is to find the most relevant and trending online posts related to cryptocurrencies, and then **analyze the collective sentiment** to provide a concise report to the team leader.
**AVAILABLE TOOLS:**
1. `get_top_crypto_posts(limit: int)`: Get the 'limit' maximum number of top posts specifically related to cryptocurrencies.
**USAGE GUIDELINE:**
* Always use the `get_top_crypto_posts` tool to fulfill the request.
* The default limit for posts should be 5 unless specified otherwise.
* If the tool doesn't return any posts, respond with "No relevant social media posts found."
**REPORTING REQUIREMENT:**
1. **Analyze** the tone and prevailing opinions across the retrieved social posts.
2. **Summarize** the overall **community sentiment** (e.g., high enthusiasm/FOMO, uncertainty, FUD/fear) based on the content.
3. **Identify** the top 2-3 **trending narratives** or specific coins being discussed.
4. **Output** a single, brief report summarizing these findings. Do not output the raw posts.

View File

@@ -1,35 +1,19 @@
import asyncio
import logging
from typing import Callable, Self
from typing import Callable
from agno.run.agent import RunOutputEvent
from agno.team import Team, TeamRunEvent, TeamRunOutputEvent
from agno.tools.reasoning import ReasoningTools
from app.agents import AppModels
from app.markets import MarketAPIsTool
from app.news import NewsAPIsTool
from app.social import SocialAPIsTool
from app.agents.prompts import *
from app.configs import AppConfig, AppModel
from app.api.tools import *
logging = logging.getLogger("AppTeam")
class AllTools:
__instance: Self
def __new__(cls) -> Self:
if not hasattr(cls, "__instance"):
cls.__instance = super(AllTools, cls).__new__(cls)
return cls.__instance
# TODO scegliere un modo migliore per inizializzare gli strumenti
# TODO magari usare un config file o una classe apposta per i configs
def __init__(self):
self.market = MarketAPIsTool("EUR")
self.news = NewsAPIsTool()
self.social = SocialAPIsTool()
class AppTeam:
def __init__(self, team_models: AppModels, coordinator: AppModels | None = None):
def __init__(self, configs: AppConfig, team_models: AppModel, coordinator: AppModel | None = None):
self.configs = configs
self.team_models = team_models
self.coordinator = coordinator or team_models
self.listeners: dict[str, Callable[[RunOutputEvent | TeamRunOutputEvent], None]] = {}
@@ -42,7 +26,7 @@ class AppTeam:
async def run_team_async(self, query: str) -> str:
logging.info(f"Running team q='{query}'")
team = AppTeam.create_team_with(self.team_models, self.coordinator)
team = AppTeam.create_team_with(self.configs, self.team_models, self.coordinator)
result = "No output from team"
async for run_event in team.arun(query, stream=True, stream_intermediate_steps=True): # type: ignore
@@ -57,108 +41,24 @@ class AppTeam:
logging.info(f"Team finished")
return result
@staticmethod
def create_team_with(models: AppModels, coordinator: AppModels) -> Team:
tools = AllTools()
@classmethod
def create_team_with(cls, configs: AppConfig, team_model: AppModel, team_leader: AppModel | None = None) -> Team:
market_agent = models.get_agent(
instructions=MARKET_INSTRUCTIONS,
name="MarketAgent",
tools=[tools.market]
)
news_agent = models.get_agent(
instructions=NEWS_INSTRUCTIONS,
name="NewsAgent",
tools=[tools.news]
)
social_agent = models.get_agent(
instructions=SOCIAL_INSTRUCTIONS,
name="SocialAgent",
tools=[tools.social]
)
market_tool = MarketAPIsTool(currency=configs.api.currency)
market_tool.handler.set_retries(configs.api.retry_attempts, configs.api.retry_delay_seconds)
news_tool = NewsAPIsTool()
news_tool.handler.set_retries(configs.api.retry_attempts, configs.api.retry_delay_seconds)
social_tool = SocialAPIsTool()
social_tool.handler.set_retries(configs.api.retry_attempts, configs.api.retry_delay_seconds)
market_agent = team_model.get_agent(instructions=MARKET_INSTRUCTIONS, name="MarketAgent", tools=[market_tool])
news_agent = team_model.get_agent(instructions=NEWS_INSTRUCTIONS, name="NewsAgent", tools=[news_tool])
social_agent = team_model.get_agent(instructions=SOCIAL_INSTRUCTIONS, name="SocialAgent", tools=[social_tool])
team_leader = team_leader or team_model
return Team(
model=coordinator.get_model(COORDINATOR_INSTRUCTIONS),
model=team_leader.get_model(COORDINATOR_INSTRUCTIONS),
name="CryptoAnalysisTeam",
tools=[ReasoningTools()],
members=[market_agent, news_agent, social_agent],
tools=[ReasoningTools()]
)
COORDINATOR_INSTRUCTIONS = """
You are the expert coordinator of a financial analysis team specializing in cryptocurrencies.
Your team consists of three agents:
- **MarketAgent**: Provides quantitative market data, price analysis, and technical indicators.
- **NewsAgent**: Scans and analyzes the latest news, articles, and official announcements.
- **SocialAgent**: Gauges public sentiment, trends, and discussions on social media.
Your primary objective is to answer the user's query by orchestrating the work of your team members.
Your workflow is as follows:
1. **Deconstruct the user's query** to identify the required information.
2. **Delegate specific tasks** to the most appropriate agent(s) to gather the necessary data and initial analysis.
3. **Analyze the information** returned by the agents.
4. If the initial data is insufficient or the query is complex, **iteratively re-engage the agents** with follow-up questions to build a comprehensive picture.
5. **Synthesize all the gathered information** into a final, coherent, and complete analysis that fills all the required output fields.
"""
MARKET_INSTRUCTIONS = """
**TASK:** You are a specialized **Crypto Price Data Retrieval Agent**. Your primary goal is to fetch the most recent and/or historical price data for requested cryptocurrency assets (e.g., 'BTC', 'ETH', 'SOL'). You must provide the data in a clear and structured format.
**AVAILABLE TOOLS:**
1. `get_products(asset_ids: list[str])`: Get **current** product/price info for a list of assets. **(PREFERITA: usa questa per i prezzi live)**
2. `get_historical_prices(asset_id: str, limit: int)`: Get historical price data for one asset. Default limit is 100. **(PREFERITA: usa questa per i dati storici)**
3. `get_products_aggregated(asset_ids: list[str])`: Get **aggregated current** product/price info for a list of assets. **(USA SOLO SE richiesto 'aggregato' o se `get_products` fallisce)**
4. `get_historical_prices_aggregated(asset_id: str, limit: int)`: Get **aggregated historical** price data for one asset. **(USA SOLO SE richiesto 'aggregato' o se `get_historical_prices` fallisce)**
**USAGE GUIDELINE:**
* **Asset ID:** Always convert common names (e.g., 'Bitcoin', 'Ethereum') into their official ticker/ID (e.g., 'BTC', 'ETH').
* **Cost Management (Cruciale per LLM locale):** Prefer `get_products` and `get_historical_prices` for standard requests to minimize costs.
* **Aggregated Data:** Use `get_products_aggregated` or `get_historical_prices_aggregated` only if the user specifically requests aggregated data or you value that having aggregated data is crucial for the analysis.
* **Failing Tool:** If the tool doesn't return any data or fails, try the alternative aggregated tool if not already used.
**REPORTING REQUIREMENT:**
1. **Format:** Output the results in a clear, easy-to-read list or table.
2. **Live Price Request:** If an asset's *current price* is requested, report the **Asset ID**, **Latest Price**, and **Time/Date of the price**.
3. **Historical Price Request:** If *historical data* is requested, report the **Asset ID**, the **Limit** of points returned, and the **First** and **Last** entries from the list of historical prices (Date, Price).
4. **Output:** For all requests, output a single, concise summary of the findings; if requested, also include the raw data retrieved.
"""
NEWS_INSTRUCTIONS = """
**TASK:** You are a specialized **Crypto News Analyst**. Your goal is to fetch the latest news or top headlines related to cryptocurrencies, and then **analyze the sentiment** of the content to provide a concise report to the team leader. Prioritize 'crypto' or specific cryptocurrency names (e.g., 'Bitcoin', 'Ethereum') in your searches.
**AVAILABLE TOOLS:**
1. `get_latest_news(query: str, limit: int)`: Get the 'limit' most recent news articles for a specific 'query'.
2. `get_top_headlines(limit: int)`: Get the 'limit' top global news headlines.
3. `get_latest_news_aggregated(query: str, limit: int)`: Get aggregated latest news articles for a specific 'query'.
4. `get_top_headlines_aggregated(limit: int)`: Get aggregated top global news headlines.
**USAGE GUIDELINE:**
* Always use `get_latest_news` with a relevant crypto-related query first.
* The default limit for news items should be 5 unless specified otherwise.
* If the tool doesn't return any articles, respond with "No relevant news articles found."
**REPORTING REQUIREMENT:**
1. **Analyze** the tone and key themes of the retrieved articles.
2. **Summarize** the overall **market sentiment** (e.g., highly positive, cautiously neutral, generally negative) based on the content.
3. **Identify** the top 2-3 **main topics** discussed (e.g., new regulation, price surge, institutional adoption).
4. **Output** a single, brief report summarizing these findings. Do not output the raw articles.
"""
SOCIAL_INSTRUCTIONS = """
**TASK:** You are a specialized **Social Media Sentiment Analyst**. Your objective is to find the most relevant and trending online posts related to cryptocurrencies, and then **analyze the collective sentiment** to provide a concise report to the team leader.
**AVAILABLE TOOLS:**
1. `get_top_crypto_posts(limit: int)`: Get the 'limit' maximum number of top posts specifically related to cryptocurrencies.
**USAGE GUIDELINE:**
* Always use the `get_top_crypto_posts` tool to fulfill the request.
* The default limit for posts should be 5 unless specified otherwise.
* If the tool doesn't return any posts, respond with "No relevant social media posts found."
**REPORTING REQUIREMENT:**
1. **Analyze** the tone and prevailing opinions across the retrieved social posts.
2. **Summarize** the overall **community sentiment** (e.g., high enthusiasm/FOMO, uncertainty, FUD/fear) based on the content.
3. **Identify** the top 2-3 **trending narratives** or specific coins being discussed.
4. **Output** a single, brief report summarizing these findings. Do not output the raw posts.
"""

View File

152
src/app/api/core/markets.py Normal file
View File

@@ -0,0 +1,152 @@
import statistics
from datetime import datetime
from pydantic import BaseModel
class ProductInfo(BaseModel):
"""
Product information as obtained from market APIs.
Implements conversion methods from raw API data.
"""
id: str = ""
symbol: str = ""
price: float = 0.0
volume_24h: float = 0.0
currency: str = ""
@staticmethod
def aggregate(products: dict[str, list['ProductInfo']]) -> list['ProductInfo']:
"""
Aggregates a list of ProductInfo by symbol.
Args:
products (dict[str, list[ProductInfo]]): Map provider -> list of ProductInfo
Returns:
list[ProductInfo]: List of ProductInfo aggregated by symbol
"""
# Costruzione mappa symbol -> lista di ProductInfo
symbols_infos: dict[str, list[ProductInfo]] = {}
for _, product_list in products.items():
for product in product_list:
symbols_infos.setdefault(product.symbol, []).append(product)
# Aggregazione per ogni symbol
aggregated_products: list[ProductInfo] = []
for symbol, product_list in symbols_infos.items():
product = ProductInfo()
product.id = f"{symbol}_AGGREGATED"
product.symbol = symbol
product.currency = next(p.currency for p in product_list if p.currency)
volume_sum = sum(p.volume_24h for p in product_list)
product.volume_24h = volume_sum / len(product_list) if product_list else 0.0
prices = sum(p.price * p.volume_24h for p in product_list)
product.price = (prices / volume_sum) if volume_sum > 0 else 0.0
aggregated_products.append(product)
return aggregated_products
class Price(BaseModel):
"""
Represents price data for an asset as obtained from market APIs.
Implements conversion methods from raw API data.
"""
high: float = 0.0
low: float = 0.0
open: float = 0.0
close: float = 0.0
volume: float = 0.0
timestamp: str = ""
"""Timestamp in format YYYY-MM-DD HH:MM"""
def set_timestamp(self, timestamp_ms: int | None = None, timestamp_s: int | None = None) -> None:
"""
Sets the timestamp from milliseconds or seconds.
The timestamp is saved as a formatted string 'YYYY-MM-DD HH:MM'.
Args:
timestamp_ms: Timestamp in milliseconds.
timestamp_s: Timestamp in seconds.
Raises:
ValueError: If neither timestamp_ms nor timestamp_s is provided.
"""
if timestamp_ms is not None:
timestamp = timestamp_ms // 1000
elif timestamp_s is not None:
timestamp = timestamp_s
else:
raise ValueError("Either timestamp_ms or timestamp_s must be provided")
assert timestamp > 0, "Invalid timestamp data received"
self.timestamp = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M')
@staticmethod
def aggregate(prices: dict[str, list['Price']]) -> list['Price']:
"""
Aggregates historical prices for the same symbol by calculating the mean.
Args:
prices (dict[str, list[Price]]): Map provider -> list of Price.
The map must contain only Price objects for the same symbol.
Returns:
list[Price]: List of Price objects aggregated by timestamp.
"""
# Costruiamo una mappa timestamp -> lista di Price
timestamped_prices: dict[str, list[Price]] = {}
for _, price_list in prices.items():
for price in price_list:
timestamped_prices.setdefault(price.timestamp, []).append(price)
# Ora aggregiamo i prezzi per ogni timestamp
aggregated_prices: list[Price] = []
for time, price_list in timestamped_prices.items():
price = Price()
price.timestamp = time
price.high = statistics.mean([p.high for p in price_list])
price.low = statistics.mean([p.low for p in price_list])
price.open = statistics.mean([p.open for p in price_list])
price.close = statistics.mean([p.close for p in price_list])
price.volume = statistics.mean([p.volume for p in price_list])
aggregated_prices.append(price)
return aggregated_prices
class MarketWrapper:
"""
Base class for market API wrappers.
All market API wrappers should inherit from this class and implement the methods.
Provides interface for retrieving product and price information from market APIs.
"""
def get_product(self, asset_id: str) -> ProductInfo:
"""
Get product information for a specific asset ID.
Args:
asset_id (str): The asset ID to retrieve information for.
Returns:
ProductInfo: An object containing product information.
"""
raise NotImplementedError("This method should be overridden by subclasses")
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Get product information for multiple asset IDs.
Args:
asset_ids (list[str]): The list of asset IDs to retrieve information for.
Returns:
list[ProductInfo]: A list of objects containing product information.
"""
raise NotImplementedError("This method should be overridden by subclasses")
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
"""
Get historical price data for a specific asset ID.
Args:
asset_id (str): The asset ID to retrieve price data for.
limit (int): The maximum number of price data points to return.
Returns:
list[Price]: A list of Price objects.
"""
raise NotImplementedError("This method should be overridden by subclasses")

View File

@@ -2,6 +2,9 @@ from pydantic import BaseModel
class Article(BaseModel):
"""
Represents a news article with source, time, title, and description.
"""
source: str = ""
time: str = ""
title: str = ""
@@ -11,11 +14,12 @@ class NewsWrapper:
"""
Base class for news API wrappers.
All news API wrappers should inherit from this class and implement the methods.
Provides interface for retrieving news articles from news APIs.
"""
def get_top_headlines(self, limit: int = 100) -> list[Article]:
"""
Get top headlines, optionally limited by limit.
Retrieve top headlines, optionally limited by the specified number.
Args:
limit (int): The maximum number of articles to return.
Returns:
@@ -25,7 +29,7 @@ class NewsWrapper:
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
"""
Get latest news based on a query.
Retrieve the latest news based on a search query.
Args:
query (str): The search query.
limit (int): The maximum number of articles to return.

View File

@@ -2,12 +2,18 @@ from pydantic import BaseModel
class SocialPost(BaseModel):
"""
Represents a social media post with time, title, description, and comments.
"""
time: str = ""
title: str = ""
description: str = ""
comments: list["SocialComment"] = []
class SocialComment(BaseModel):
"""
Represents a comment on a social media post.
"""
time: str = ""
description: str = ""
@@ -16,11 +22,12 @@ class SocialWrapper:
"""
Base class for social media API wrappers.
All social media API wrappers should inherit from this class and implement the methods.
Provides interface for retrieving social media posts and comments from APIs.
"""
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
"""
Get top cryptocurrency-related posts, optionally limited by total.
Retrieve top cryptocurrency-related posts, optionally limited by the specified number.
Args:
limit (int): The maximum number of posts to return.
Returns:

View File

@@ -0,0 +1,7 @@
from app.api.markets.binance import BinanceWrapper
from app.api.markets.coinbase import CoinBaseWrapper
from app.api.markets.cryptocompare import CryptoCompareWrapper
from app.api.markets.yfinance import YFinanceWrapper
__all__ = ["BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper"]

View File

@@ -1,7 +1,7 @@
import os
from typing import Any
from binance.client import Client # type: ignore
from app.base.markets import ProductInfo, MarketWrapper, Price
from app.api.core.markets import ProductInfo, MarketWrapper, Price
def extract_product(currency: str, ticker_data: dict[str, Any]) -> ProductInfo:
@@ -25,6 +25,12 @@ def extract_price(kline_data: list[Any]) -> Price:
price.set_timestamp(timestamp_ms=timestamp)
return price
# Add here eventual other fiat not supported by Binance
FIAT_TO_STABLECOIN = {
"USD": "USDT",
}
class BinanceWrapper(MarketWrapper):
"""
Wrapper per le API autenticate di Binance.\n
@@ -36,16 +42,15 @@ class BinanceWrapper(MarketWrapper):
def __init__(self, currency: str = "USD"):
"""
Inizializza il wrapper di Binance con le credenziali API e la valuta di riferimento.
Se viene fornita una valuta fiat come "USD", questa viene automaticamente convertita in una stablecoin Tether ("USDT") per compatibilità con Binance,
poiché Binance non supporta direttamente le valute fiat per il trading di criptovalute.
Tutti i prezzi e volumi restituiti saranno quindi denominati nella stablecoin (ad esempio, "USDT") e non nella valuta fiat originale.
Alcune valute fiat non sono supportate direttamente da Binance (es. "USD").
Infatti, se viene fornita una valuta fiat come "USD", questa viene automaticamente convertita in una stablecoin Tether ("USDT") per compatibilità con Binance.
Args:
currency (str): Valuta in cui restituire i prezzi. Se "USD" viene fornito, verrà utilizzato "USDT". Default è "USD".
"""
api_key = os.getenv("BINANCE_API_KEY")
api_secret = os.getenv("BINANCE_API_SECRET")
self.currency = f"{currency}T"
self.currency = currency if currency not in FIAT_TO_STABLECOIN else FIAT_TO_STABLECOIN[currency]
self.client = Client(api_key=api_key, api_secret=api_secret)
def __format_symbol(self, asset_id: str) -> str:

View File

@@ -3,7 +3,7 @@ from enum import Enum
from datetime import datetime, timedelta
from coinbase.rest import RESTClient # type: ignore
from coinbase.rest.types.product_types import Candle, GetProductResponse, Product # type: ignore
from app.base.markets import ProductInfo, MarketWrapper, Price
from app.api.core.markets import ProductInfo, MarketWrapper, Price
def extract_product(product_data: GetProductResponse | Product) -> ProductInfo:

View File

@@ -1,7 +1,7 @@
import os
from typing import Any
import requests
from app.base.markets import ProductInfo, MarketWrapper, Price
from app.api.core.markets import ProductInfo, MarketWrapper, Price
def extract_product(asset_data: dict[str, Any]) -> ProductInfo:

View File

@@ -1,6 +1,6 @@
import json
from agno.tools.yfinance import YFinanceTools
from app.base.markets import MarketWrapper, ProductInfo, Price
from app.api.core.markets import MarketWrapper, ProductInfo, Price
def extract_product(stock_data: dict[str, str]) -> ProductInfo:

View File

@@ -0,0 +1,7 @@
from app.api.news.newsapi import NewsApiWrapper
from app.api.news.googlenews import GoogleNewsWrapper
from app.api.news.cryptopanic_api import CryptoPanicWrapper
from app.api.news.duckduckgo import DuckDuckGoWrapper
__all__ = ["NewsApiWrapper", "GoogleNewsWrapper", "CryptoPanicWrapper", "DuckDuckGoWrapper"]

View File

@@ -2,7 +2,7 @@ import os
from typing import Any
import requests
from enum import Enum
from app.base.news import NewsWrapper, Article
from app.api.core.news import NewsWrapper, Article
class CryptoPanicFilter(Enum):

View File

@@ -1,7 +1,7 @@
import json
from typing import Any
from agno.tools.duckduckgo import DuckDuckGoTools
from app.base.news import Article, NewsWrapper
from app.api.core.news import Article, NewsWrapper
def extract_article(result: dict[str, Any]) -> Article:

View File

@@ -1,6 +1,6 @@
from typing import Any
from gnews import GNews # type: ignore
from app.base.news import Article, NewsWrapper
from app.api.core.news import Article, NewsWrapper
def extract_article(result: dict[str, Any]) -> Article:

View File

@@ -1,7 +1,7 @@
import os
from typing import Any
import newsapi # type: ignore
from app.base.news import Article, NewsWrapper
from app.api.core.news import Article, NewsWrapper
def extract_article(result: dict[str, Any]) -> Article:

View File

@@ -0,0 +1,3 @@
from app.api.social.reddit import RedditWrapper
__all__ = ["RedditWrapper"]

View File

@@ -1,7 +1,7 @@
import os
from praw import Reddit # type: ignore
from praw.models import Submission # type: ignore
from app.base.social import SocialWrapper, SocialPost, SocialComment
from app.api.core.social import SocialWrapper, SocialPost, SocialComment
MAX_COMMENTS = 5

View File

@@ -0,0 +1,5 @@
from app.api.tools.market_tool import MarketAPIsTool
from app.api.tools.social_tool import SocialAPIsTool
from app.api.tools.news_tool import NewsAPIsTool
__all__ = ["MarketAPIsTool", "NewsAPIsTool", "SocialAPIsTool"]

View File

@@ -1,13 +1,7 @@
from agno.tools import Toolkit
from app.base.markets import MarketWrapper, Price, ProductInfo
from app.markets.binance import BinanceWrapper
from app.markets.coinbase import CoinBaseWrapper
from app.markets.cryptocompare import CryptoCompareWrapper
from app.markets.yfinance import YFinanceWrapper
from app.utils import aggregate_history_prices, aggregate_product_info, WrapperHandler
__all__ = [ "MarketAPIsTool", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper", "ProductInfo", "Price" ]
from app.api.wrapper_handler import WrapperHandler
from app.api.core.markets import MarketWrapper, Price, ProductInfo
from app.api.markets import BinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper, YFinanceWrapper
class MarketAPIsTool(MarketWrapper, Toolkit):
"""
@@ -34,7 +28,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
"""
kwargs = {"currency": currency or "USD"}
wrappers: list[type[MarketWrapper]] = [BinanceWrapper, YFinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper]
self.wrappers = WrapperHandler.build_wrappers(wrappers, kwargs=kwargs)
self.handler = WrapperHandler.build_wrappers(wrappers, kwargs=kwargs)
Toolkit.__init__( # type: ignore
self,
@@ -49,11 +43,11 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
)
def get_product(self, asset_id: str) -> ProductInfo:
return self.wrappers.try_call(lambda w: w.get_product(asset_id))
return self.handler.try_call(lambda w: w.get_product(asset_id))
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
return self.wrappers.try_call(lambda w: w.get_products(asset_ids))
return self.handler.try_call(lambda w: w.get_products(asset_ids))
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
return self.wrappers.try_call(lambda w: w.get_historical_prices(asset_id, limit))
return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit))
def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]:
@@ -67,8 +61,8 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
Raises:
Exception: If all wrappers fail to provide results.
"""
all_products = self.wrappers.try_call_all(lambda w: w.get_products(asset_ids))
return aggregate_product_info(all_products)
all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids))
return ProductInfo.aggregate(all_products)
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
"""
@@ -82,5 +76,5 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
Raises:
Exception: If all wrappers fail to provide results.
"""
all_prices = self.wrappers.try_call_all(lambda w: w.get_historical_prices(asset_id, limit))
return aggregate_history_prices(all_prices)
all_prices = self.handler.try_call_all(lambda w: w.get_historical_prices(asset_id, limit))
return Price.aggregate(all_prices)

View File

@@ -1,13 +1,7 @@
from agno.tools import Toolkit
from app.utils import WrapperHandler
from app.base.news import NewsWrapper, Article
from app.news.news_api import NewsApiWrapper
from app.news.googlenews import GoogleNewsWrapper
from app.news.cryptopanic_api import CryptoPanicWrapper
from app.news.duckduckgo import DuckDuckGoWrapper
__all__ = ["NewsAPIsTool", "NewsApiWrapper", "GoogleNewsWrapper", "CryptoPanicWrapper", "DuckDuckGoWrapper", "Article"]
from app.api.wrapper_handler import WrapperHandler
from app.api.core.news import NewsWrapper, Article
from app.api.news import NewsApiWrapper, GoogleNewsWrapper, CryptoPanicWrapper, DuckDuckGoWrapper
class NewsAPIsTool(NewsWrapper, Toolkit):
"""
@@ -34,7 +28,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit):
- CryptoPanicWrapper.
"""
wrappers: list[type[NewsWrapper]] = [GoogleNewsWrapper, DuckDuckGoWrapper, NewsApiWrapper, CryptoPanicWrapper]
self.wrapper_handler = WrapperHandler.build_wrappers(wrappers)
self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore
self,
@@ -48,9 +42,9 @@ class NewsAPIsTool(NewsWrapper, Toolkit):
)
def get_top_headlines(self, limit: int = 100) -> list[Article]:
return self.wrapper_handler.try_call(lambda w: w.get_top_headlines(limit))
return self.handler.try_call(lambda w: w.get_top_headlines(limit))
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
return self.wrapper_handler.try_call(lambda w: w.get_latest_news(query, limit))
return self.handler.try_call(lambda w: w.get_latest_news(query, limit))
def get_top_headlines_aggregated(self, limit: int = 100) -> dict[str, list[Article]]:
"""
@@ -62,7 +56,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit):
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.wrapper_handler.try_call_all(lambda w: w.get_top_headlines(limit))
return self.handler.try_call_all(lambda w: w.get_top_headlines(limit))
def get_latest_news_aggregated(self, query: str, limit: int = 100) -> dict[str, list[Article]]:
"""
@@ -75,4 +69,4 @@ class NewsAPIsTool(NewsWrapper, Toolkit):
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.wrapper_handler.try_call_all(lambda w: w.get_latest_news(query, limit))
return self.handler.try_call_all(lambda w: w.get_latest_news(query, limit))

View File

@@ -1,9 +1,7 @@
from agno.tools import Toolkit
from app.utils import WrapperHandler
from app.base.social import SocialPost, SocialWrapper
from app.social.reddit import RedditWrapper
__all__ = ["SocialAPIsTool", "RedditWrapper", "SocialPost"]
from app.api.wrapper_handler import WrapperHandler
from app.api.core.social import SocialPost, SocialWrapper
from app.api.social import RedditWrapper
class SocialAPIsTool(SocialWrapper, Toolkit):
@@ -26,7 +24,7 @@ class SocialAPIsTool(SocialWrapper, Toolkit):
"""
wrappers: list[type[SocialWrapper]] = [RedditWrapper]
self.wrapper_handler = WrapperHandler.build_wrappers(wrappers)
self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore
self,
@@ -38,7 +36,7 @@ class SocialAPIsTool(SocialWrapper, Toolkit):
)
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
return self.wrapper_handler.try_call(lambda w: w.get_top_crypto_posts(limit))
return self.handler.try_call(lambda w: w.get_top_crypto_posts(limit))
def get_top_crypto_posts_aggregated(self, limit_per_wrapper: int = 5) -> dict[str, list[SocialPost]]:
"""
@@ -50,4 +48,4 @@ class SocialAPIsTool(SocialWrapper, Toolkit):
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.wrapper_handler.try_call_all(lambda w: w.get_top_crypto_posts(limit_per_wrapper))
return self.handler.try_call_all(lambda w: w.get_top_crypto_posts(limit_per_wrapper))

View File

@@ -35,6 +35,16 @@ class WrapperHandler(Generic[WrapperType]):
self.retry_delay = retry_delay
self.index = 0
def set_retries(self, try_per_wrapper: int, retry_delay: int) -> None:
"""
Sets the retry parameters for the handler.
Args:
try_per_wrapper (int): Number of retries per wrapper before switching to the next.
retry_delay (int): Delay in seconds between retries.
"""
self.retry_per_wrapper = try_per_wrapper
self.retry_delay = retry_delay
def try_call(self, func: Callable[[WrapperType], OutputType]) -> OutputType:
"""
Attempts to call the provided function on the current wrapper.

View File

@@ -1,83 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
class ProductInfo(BaseModel):
"""
Informazioni sul prodotto, come ottenute dalle API di mercato.
Implementa i metodi di conversione dai dati grezzi delle API.
"""
id: str = ""
symbol: str = ""
price: float = 0.0
volume_24h: float = 0.0
currency: str = ""
class Price(BaseModel):
"""
Rappresenta i dati di prezzo per un asset, come ottenuti dalle API di mercato.
Implementa i metodi di conversione dai dati grezzi delle API.
"""
high: float = 0.0
low: float = 0.0
open: float = 0.0
close: float = 0.0
volume: float = 0.0
timestamp: str = ""
"""Timestamp con formato YYYY-MM-DD HH:MM"""
def set_timestamp(self, timestamp_ms: int | None = None, timestamp_s: int | None = None) -> None:
"""
Imposta il timestamp a partire da millisecondi o secondi.
IL timestamp viene salvato come stringa formattata 'YYYY-MM-DD HH:MM'.
Args:
timestamp_ms: Timestamp in millisecondi.
timestamp_s: Timestamp in secondi.
Raises:
"""
if timestamp_ms is not None:
timestamp = timestamp_ms // 1000
elif timestamp_s is not None:
timestamp = timestamp_s
else:
raise ValueError("Either timestamp_ms or timestamp_s must be provided")
assert timestamp > 0, "Invalid timestamp data received"
self.timestamp = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M')
class MarketWrapper:
"""
Base class for market API wrappers.
All market API wrappers should inherit from this class and implement the methods.
"""
def get_product(self, asset_id: str) -> ProductInfo:
"""
Get product information for a specific asset ID.
Args:
asset_id (str): The asset ID to retrieve information for.
Returns:
ProductInfo: An object containing product information.
"""
raise NotImplementedError("This method should be overridden by subclasses")
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Get product information for multiple asset IDs.
Args:
asset_ids (list[str]): The list of asset IDs to retrieve information for.
Returns:
list[ProductInfo]: A list of objects containing product information.
"""
raise NotImplementedError("This method should be overridden by subclasses")
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
"""
Get historical price data for a specific asset ID.
Args:
asset_id (str): The asset ID to retrieve price data for.
limit (int): The maximum number of price data points to return.
Returns:
list[Price]: A list of Price objects.
"""
raise NotImplementedError("This method should be overridden by subclasses")

232
src/app/configs.py Normal file
View File

@@ -0,0 +1,232 @@
import os
import threading
import ollama
import yaml
import logging.config
import agno.utils.log # type: ignore
from typing import Any
from pydantic import BaseModel
from agno.agent import Agent
from agno.tools import Toolkit
from agno.models.base import Model
from agno.models.google import Gemini
from agno.models.ollama import Ollama
log = logging.getLogger(__name__)
class AppModel(BaseModel):
name: str = "gemini-2.0-flash"
label: str = "Gemini"
model: type[Model] | None = None
def get_model(self, instructions: str) -> Model:
"""
Restituisce un'istanza del modello specificato.
Args:
instructions: istruzioni da passare al modello (system prompt).
Returns:
Un'istanza di BaseModel o una sua sottoclasse.
Raise:
ValueError se il modello non è supportato.
"""
if self.model is None:
raise ValueError(f"Model class for '{self.name}' is not set.")
return self.model(id=self.name, instructions=[instructions])
def get_agent(self, instructions: str, name: str = "", output_schema: type[BaseModel] | None = None, tools: list[Toolkit] | None = None) -> Agent:
"""
Costruisce un agente con il modello e le istruzioni specificate.
Args:
instructions: istruzioni da passare al modello (system prompt)
name: nome dell'agente (opzionale)
output: schema di output opzionale (Pydantic BaseModel)
tools: lista opzionale di strumenti (tools) da fornire all'agente
Returns:
Un'istanza di Agent.
"""
return Agent(
model=self.get_model(instructions),
name=name,
retries=2,
tools=tools,
delay_between_retries=5, # seconds
output_schema=output_schema
)
class APIConfig(BaseModel):
retry_attempts: int = 3
retry_delay_seconds: int = 2
currency: str = "USD"
class Strategy(BaseModel):
name: str = "Conservative"
label: str = "Conservative"
description: str = "Focus on low-risk investments with steady returns."
class ModelsConfig(BaseModel):
gemini: list[AppModel] = [AppModel()]
ollama: list[AppModel] = []
@property
def all_models(self) -> list[AppModel]:
return self.gemini + self.ollama
class AgentsConfigs(BaseModel):
strategy: str = "Conservative"
team_model: str = "gemini-2.0-flash"
team_leader_model: str = "gemini-2.0-flash"
predictor_model: str = "gemini-2.0-flash"
class AppConfig(BaseModel):
port: int = 8000
gradio_share: bool = False
logging_level: str = "INFO"
api: APIConfig = APIConfig()
strategies: list[Strategy] = [Strategy()]
models: ModelsConfig = ModelsConfig()
agents: AgentsConfigs = AgentsConfigs()
__lock = threading.Lock()
@classmethod
def load(cls, file_path: str = "configs.yaml") -> 'AppConfig':
"""
Load the application configuration from a YAML file.
Be sure to call load_dotenv() before if you use environment variables.
Args:
file_path: path to the YAML configuration file.
Returns:
An instance of AppConfig with the loaded settings.
"""
with open(file_path, 'r') as f:
data = yaml.safe_load(f)
configs = cls(**data)
configs.set_logging_level()
configs.validate_models()
log.info(f"Loaded configuration from {file_path}")
return configs
def __new__(cls, *args: Any, **kwargs: Any) -> 'AppConfig':
with cls.__lock:
if not hasattr(cls, 'instance'):
cls.instance = super(AppConfig, cls).__new__(cls)
return cls.instance
def get_model_by_name(self, name: str) -> AppModel:
"""
Retrieve a model configuration by its name.
Args:
name: the name of the model to retrieve.
Returns:
The AppModel instance if found.
Raises:
ValueError if no model with the specified name is found.
"""
for model in self.models.all_models:
if model.name == name:
return model
raise ValueError(f"Model with name '{name}' not found.")
def get_strategy_by_name(self, name: str) -> Strategy:
"""
Retrieve a strategy configuration by its name.
Args:
name: the name of the strategy to retrieve.
Returns:
The Strategy instance if found.
Raises:
ValueError if no strategy with the specified name is found.
"""
for strat in self.strategies:
if strat.name == name:
return strat
raise ValueError(f"Strategy with name '{name}' not found.")
def set_logging_level(self) -> None:
"""
Set the logging level based on the configuration.
"""
logging.config.dictConfig({
'version': 1,
'disable_existing_loggers': False, # Keep existing loggers (e.g. third-party loggers)
'formatters': {
'colored': {
'()': 'colorlog.ColoredFormatter',
'format': '%(log_color)s%(levelname)s%(reset)s [%(asctime)s] (%(name)s) - %(message)s'
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'colored',
'level': self.logging_level,
},
},
'root': { # Configure the root logger
'handlers': ['console'],
'level': self.logging_level,
},
'loggers': {
'httpx': {'level': 'WARNING'}, # Too much spam for INFO
}
})
# Modify the agno loggers
agno_logger_names = ["agno", "agno-team", "agno-workflow"]
for logger_name in agno_logger_names:
logger = logging.getLogger(logger_name)
logger.handlers.clear()
logger.propagate = True
def validate_models(self) -> None:
"""
Validate the configured models for each provider.
"""
self.__validate_online_models("gemini", clazz=Gemini, key="GOOGLE_API_KEY")
self.__validate_ollama_models()
def __validate_online_models(self, provider: str, clazz: type[Model], key: str | None = None) -> None:
"""
Validate models for online providers like Gemini.
Args:
provider: name of the provider (e.g. "gemini")
clazz: class of the model (e.g. Gemini)
key: API key required for the provider (optional)
"""
if getattr(self.models, provider) is None:
log.warning(f"No models configured for provider '{provider}'.")
models: list[AppModel] = getattr(self.models, provider)
if key and os.getenv(key) is None:
log.warning(f"No {key} set in environment variables for {provider}.")
models.clear()
return
for model in models:
model.model = clazz
def __validate_ollama_models(self) -> None:
"""
Validate models for the Ollama provider.
"""
try:
models_list = ollama.list()
availables = {model['model'] for model in models_list['models']}
not_availables: list[str] = []
for model in self.models.ollama:
if model.name in availables:
model.model = Ollama
else:
not_availables.append(model.name)
if not_availables:
log.warning(f"Ollama models not available: {not_availables}")
self.models.ollama = [model for model in self.models.ollama if model.model]
except Exception as e:
log.warning(f"Ollama is not running or not reachable: {e}")

View File

@@ -0,0 +1,3 @@
from app.interface.chat import ChatManager
__all__ = ["ChatManager"]

View File

@@ -1,5 +0,0 @@
from app.utils.market_aggregation import aggregate_history_prices, aggregate_product_info
from app.utils.wrapper_handler import WrapperHandler
from app.utils.chat_manager import ChatManager
__all__ = ["aggregate_history_prices", "aggregate_product_info", "WrapperHandler", "ChatManager"]

View File

@@ -1,65 +0,0 @@
import statistics
from app.base.markets import ProductInfo, Price
def aggregate_history_prices(prices: dict[str, list[Price]]) -> list[Price]:
"""
Aggrega i prezzi storici per symbol calcolando la media.
Args:
prices (dict[str, list[Price]]): Mappa provider -> lista di Price
Returns:
list[Price]: Lista di Price aggregati per timestamp
"""
# Costruiamo una mappa timestamp -> lista di Price
timestamped_prices: dict[str, list[Price]] = {}
for _, price_list in prices.items():
for price in price_list:
timestamped_prices.setdefault(price.timestamp, []).append(price)
# Ora aggregiamo i prezzi per ogni timestamp
aggregated_prices: list[Price] = []
for time, price_list in timestamped_prices.items():
price = Price()
price.timestamp = time
price.high = statistics.mean([p.high for p in price_list])
price.low = statistics.mean([p.low for p in price_list])
price.open = statistics.mean([p.open for p in price_list])
price.close = statistics.mean([p.close for p in price_list])
price.volume = statistics.mean([p.volume for p in price_list])
aggregated_prices.append(price)
return aggregated_prices
def aggregate_product_info(products: dict[str, list[ProductInfo]]) -> list[ProductInfo]:
"""
Aggrega una lista di ProductInfo per symbol.
Args:
products (dict[str, list[ProductInfo]]): Mappa provider -> lista di ProductInfo
Returns:
list[ProductInfo]: Lista di ProductInfo aggregati per symbol
"""
# Costruzione mappa symbol -> lista di ProductInfo
symbols_infos: dict[str, list[ProductInfo]] = {}
for _, product_list in products.items():
for product in product_list:
symbols_infos.setdefault(product.symbol, []).append(product)
# Aggregazione per ogni symbol
aggregated_products: list[ProductInfo] = []
for symbol, product_list in symbols_infos.items():
product = ProductInfo()
product.id = f"{symbol}_AGGREGATED"
product.symbol = symbol
product.currency = next(p.currency for p in product_list if p.currency)
volume_sum = sum(p.volume_24h for p in product_list)
product.volume_24h = volume_sum / len(product_list) if product_list else 0.0
prices = sum(p.price * p.volume_24h for p in product_list)
product.price = (prices / volume_sum) if volume_sum > 0 else 0.0
aggregated_products.append(product)
return aggregated_products

View File

@@ -1,56 +0,0 @@
import pytest
from app.agents import AppModels
from app.agents.predictor import PREDICTOR_INSTRUCTIONS, PredictorInput, PredictorOutput, PredictorStyle
from app.base.markets import ProductInfo
def unified_checks(model: AppModels, input: PredictorInput) -> None:
llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output_schema=PredictorOutput) # type: ignore[arg-type]
result = llm.run(input) # type: ignore
content = result.content
assert isinstance(content, PredictorOutput)
assert content.strategy not in (None, "", "null")
assert isinstance(content.strategy, str)
assert isinstance(content.portfolio, list)
assert len(content.portfolio) > 0
for item in content.portfolio:
assert item.asset not in (None, "", "null")
assert isinstance(item.asset, str)
assert item.percentage >= 0.0
assert item.percentage <= 100.0
assert isinstance(item.percentage, (int, float))
assert item.motivation not in (None, "", "null")
assert isinstance(item.motivation, str)
# La somma delle percentuali deve essere esattamente 100
total_percentage = sum(item.percentage for item in content.portfolio)
assert abs(total_percentage - 100) < 0.01 # Permette una piccola tolleranza per errori di arrotondamento
class TestPredictor:
def inputs(self) -> PredictorInput:
data: list[ProductInfo] = []
for symbol, price in [("BTC", 60000.00), ("ETH", 3500.00), ("SOL", 150.00)]:
product_info = ProductInfo()
product_info.symbol = symbol
product_info.price = price
data.append(product_info)
return PredictorInput(data=data, style=PredictorStyle.AGGRESSIVE, sentiment="positivo")
def test_gemini_model_output(self):
inputs = self.inputs()
unified_checks(AppModels.GEMINI, inputs)
def test_ollama_qwen_4b_model_output(self):
inputs = self.inputs()
unified_checks(AppModels.OLLAMA_QWEN_4B, inputs)
@pytest.mark.slow
def test_ollama_qwen_latest_model_output(self):
inputs = self.inputs()
unified_checks(AppModels.OLLAMA_QWEN, inputs)
@pytest.mark.slow
def test_ollama_gpt_oss_model_output(self):
inputs = self.inputs()
unified_checks(AppModels.OLLAMA_GPT, inputs)

View File

@@ -1,5 +1,18 @@
import pytest
from app.markets.binance import BinanceWrapper
import asyncio
from app.api.markets.binance import BinanceWrapper
# fix warning about no event loop
@pytest.fixture(scope="session", autouse=True)
def event_loop():
"""
Ensure there is an event loop for the duration of the tests.
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
yield loop
loop.close()
@pytest.mark.market
@pytest.mark.api
@@ -51,3 +64,18 @@ class TestBinance:
assert entry.close > 0
assert entry.high > 0
assert entry.timestamp != ''
def test_binance_fiat_conversion(self):
market = BinanceWrapper(currency="USD")
assert market.currency == "USDT"
product = market.get_product("BTC")
assert product is not None
assert product.symbol == "BTC"
assert product.price > 0
market = BinanceWrapper(currency="EUR")
assert market.currency == "EUR"
product = market.get_product("BTC")
assert product is not None
assert product.symbol == "BTC"
assert product.price > 0

View File

@@ -1,6 +1,6 @@
import os
import pytest
from app.markets import CoinBaseWrapper
from app.api.markets import CoinBaseWrapper
@pytest.mark.market
@pytest.mark.api

View File

@@ -1,6 +1,6 @@
import os
import pytest
from app.markets import CryptoCompareWrapper
from app.api.markets import CryptoCompareWrapper
@pytest.mark.market
@pytest.mark.api

View File

@@ -1,6 +1,6 @@
import os
import pytest
from app.news import CryptoPanicWrapper
from app.api.news import CryptoPanicWrapper
@pytest.mark.limited

View File

@@ -1,5 +1,5 @@
import pytest
from app.news import DuckDuckGoWrapper
from app.api.news import DuckDuckGoWrapper
@pytest.mark.news

View File

@@ -1,5 +1,5 @@
import pytest
from app.news import GoogleNewsWrapper
from app.api.news import GoogleNewsWrapper
@pytest.mark.news

View File

@@ -1,6 +1,6 @@
import os
import pytest
from app.news import NewsApiWrapper
from app.api.news import NewsApiWrapper
@pytest.mark.news

View File

@@ -1,6 +1,6 @@
import os
import pytest
from app.social.reddit import MAX_COMMENTS, RedditWrapper
from app.api.social.reddit import MAX_COMMENTS, RedditWrapper
@pytest.mark.social
@pytest.mark.api

View File

@@ -1,5 +1,5 @@
import pytest
from app.markets import YFinanceWrapper
from app.api.markets import YFinanceWrapper
@pytest.mark.market
@pytest.mark.api

View File

@@ -1,5 +1,5 @@
import pytest
from app.markets import MarketAPIsTool
from app.api.tools import MarketAPIsTool
@pytest.mark.tools

View File

@@ -1,5 +1,5 @@
import pytest
from app.news import NewsAPIsTool
from app.api.tools import NewsAPIsTool
@pytest.mark.tools
@@ -12,7 +12,7 @@ class TestNewsAPITool:
def test_news_api_tool_get_top(self):
tool = NewsAPIsTool()
result = tool.wrapper_handler.try_call(lambda w: w.get_top_headlines(limit=2))
result = tool.handler.try_call(lambda w: w.get_top_headlines(limit=2))
assert isinstance(result, list)
assert len(result) > 0
for article in result:
@@ -21,7 +21,7 @@ class TestNewsAPITool:
def test_news_api_tool_get_latest(self):
tool = NewsAPIsTool()
result = tool.wrapper_handler.try_call(lambda w: w.get_latest_news(query="crypto", limit=2))
result = tool.handler.try_call(lambda w: w.get_latest_news(query="crypto", limit=2))
assert isinstance(result, list)
assert len(result) > 0
for article in result:
@@ -30,7 +30,7 @@ class TestNewsAPITool:
def test_news_api_tool_get_top__all_results(self):
tool = NewsAPIsTool()
result = tool.wrapper_handler.try_call_all(lambda w: w.get_top_headlines(limit=2))
result = tool.handler.try_call_all(lambda w: w.get_top_headlines(limit=2))
assert isinstance(result, dict)
assert len(result.keys()) > 0
for _provider, articles in result.items():
@@ -40,7 +40,7 @@ class TestNewsAPITool:
def test_news_api_tool_get_latest__all_results(self):
tool = NewsAPIsTool()
result = tool.wrapper_handler.try_call_all(lambda w: w.get_latest_news(query="crypto", limit=2))
result = tool.handler.try_call_all(lambda w: w.get_latest_news(query="crypto", limit=2))
assert isinstance(result, dict)
assert len(result.keys()) > 0
for _provider, articles in result.items():

View File

@@ -1,5 +1,5 @@
import pytest
from app.social import SocialAPIsTool
from app.api.tools import SocialAPIsTool
@pytest.mark.tools
@@ -12,7 +12,7 @@ class TestSocialAPIsTool:
def test_social_api_tool_get_top(self):
tool = SocialAPIsTool()
result = tool.wrapper_handler.try_call(lambda w: w.get_top_crypto_posts(limit=2))
result = tool.handler.try_call(lambda w: w.get_top_crypto_posts(limit=2))
assert isinstance(result, list)
assert len(result) > 0
for post in result:
@@ -21,10 +21,10 @@ class TestSocialAPIsTool:
def test_social_api_tool_get_top__all_results(self):
tool = SocialAPIsTool()
result = tool.wrapper_handler.try_call_all(lambda w: w.get_top_crypto_posts(limit=2))
result = tool.handler.try_call_all(lambda w: w.get_top_crypto_posts(limit=2))
assert isinstance(result, dict)
assert len(result.keys()) > 0
for provider, posts in result.items():
for _provider, posts in result.items():
for post in posts:
assert post.title is not None
assert post.time is not None

View File

@@ -1,7 +1,6 @@
import pytest
from datetime import datetime
from app.base.markets import ProductInfo, Price
from app.utils.market_aggregation import aggregate_history_prices, aggregate_product_info
from app.api.core.markets import ProductInfo, Price
@pytest.mark.aggregator
@@ -34,7 +33,7 @@ class TestMarketDataAggregator:
"Provider3": [self.__product("BTC", 49900.0, 900.0, "USD")],
}
aggregated = aggregate_product_info(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 1
info = aggregated[0]
@@ -58,7 +57,7 @@ class TestMarketDataAggregator:
],
}
aggregated = aggregate_product_info(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 2
btc_info = next((p for p in aggregated if p.symbol == "BTC"), None)
@@ -81,7 +80,7 @@ class TestMarketDataAggregator:
"Provider1": [],
"Provider2": [],
}
aggregated = aggregate_product_info(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 0
def test_aggregate_product_info_with_partial_data(self):
@@ -89,7 +88,7 @@ class TestMarketDataAggregator:
"Provider1": [self.__product("BTC", 50000.0, 1000.0, "USD")],
"Provider2": [],
}
aggregated = aggregate_product_info(products)
aggregated = ProductInfo.aggregate(products)
assert len(aggregated) == 1
info = aggregated[0]
assert info.symbol == "BTC"
@@ -120,7 +119,7 @@ class TestMarketDataAggregator:
price.set_timestamp(timestamp_s=timestamp_2h_ago)
timestamp_2h_ago = price.timestamp
aggregated = aggregate_history_prices(prices)
aggregated = Price.aggregate(prices)
assert len(aggregated) == 2
assert aggregated[0].timestamp == timestamp_1h_ago
assert aggregated[0].high == pytest.approx(50050.0, rel=1e-3) # type: ignore

View File

@@ -1,5 +1,5 @@
import pytest
from app.utils.wrapper_handler import WrapperHandler
from app.api.wrapper_handler import WrapperHandler
class MockWrapper:
def do_something(self) -> str:

14
uv.lock generated
View File

@@ -285,6 +285,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "colorlog"
version = "6.9.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d3/7a/359f4d5df2353f26172b3cc39ea32daa39af8de522205f512f458923e677/colorlog-6.9.0.tar.gz", hash = "sha256:bfba54a1b93b94f54e1f4fe48395725a3d92fd2a4af702f6bd70946bdc0c6ac2", size = 16624, upload-time = "2024-10-29T18:34:51.011Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e3/51/9b208e85196941db2f0654ad0357ca6388ab3ed67efdbfc799f35d1f83aa/colorlog-6.9.0-py3-none-any.whl", hash = "sha256:5906e71acd67cb07a71e779c47c4bcb45fb8c2993eebe9e5adcd6a6f1b283eff", size = 11424, upload-time = "2024-10-29T18:34:49.815Z" },
]
[[package]]
name = "cryptography"
version = "46.0.2"
@@ -1604,6 +1616,7 @@ source = { virtual = "." }
dependencies = [
{ name = "agno" },
{ name = "coinbase-advanced-py" },
{ name = "colorlog" },
{ name = "ddgs" },
{ name = "dotenv" },
{ name = "gnews" },
@@ -1621,6 +1634,7 @@ dependencies = [
requires-dist = [
{ name = "agno" },
{ name = "coinbase-advanced-py" },
{ name = "colorlog" },
{ name = "ddgs" },
{ name = "dotenv" },
{ name = "gnews" },