Configurazioni dell'app (#27)

* Prompt messi in una cartella apposta
* Aggiorna importazioni demo per riflettere la nuova struttura delle cartelle API
* Aggiunto configurazione dell'applicazione
* Spostato ChatManager in app.interface
* Update README.md
* Aggiornato config per app & api
* Rinominato il modulo NewsAPI
* fix main infinite loop
* API base --> core
* pattern singleton per AppConfig.
* Estratto i tools nella loro cartella --> api/tools
* fix main KeyboardInterrupt
* update tests
* Docker & libs
* fix copilot suggestions
This commit was merged in pull request #27.
This commit is contained in:
Giacomo Bertolazzi
2025-10-12 18:05:43 +02:00
committed by GitHub
parent 093a7f5a48
commit 862525cc62
49 changed files with 718 additions and 564 deletions

View File

@@ -17,6 +17,7 @@ COPY pyproject.toml ./
COPY uv.lock ./
COPY LICENSE ./
COPY src/ ./src/
COPY configs.yaml ./
# Creiamo l'ambiente virtuale con tutto già presente
RUN uv sync

View File

@@ -21,7 +21,7 @@ L'obiettivo è quello di creare un sistema di consulenza finanziaria basato su L
L'installazione di questo progetto richiede 3 passaggi totali (+1 se si vuole sviluppare in locale) che devono essere eseguiti in sequenza. Se questi passaggi sono eseguiti correttamente, l'applicazione dovrebbe partire senza problemi. Altrimenti è molto probabile che si verifichino errori di vario tipo (moduli mancanti, chiavi API non trovate, ecc.).
1. Configurare le variabili d'ambiente
1. Configurazioni dell'app e delle variabili d'ambiente
2. Installare Ollama e i modelli locali
3. Far partire il progetto con Docker (consigliato)
4. (Solo per sviluppo locale) Installare uv e creare l'ambiente virtuale
@@ -29,9 +29,12 @@ L'installazione di questo progetto richiede 3 passaggi totali (+1 se si vuole sv
> [!IMPORTANT]\
> Prima di iniziare, assicurarsi di avere clonato il repository e di essere nella cartella principale del progetto.
### **1. Variabili d'Ambiente**
### **1. Configurazioni**
Copia il file `.env.example` in `.env` e successivamente modificalo con le tue API keys:
Ci sono due file di configurazione principali che l'app utilizza: `config.yaml` e `.env`.\
Il primo contiene le configurazioni generali dell'applicazione e può essere modificato a piacimento, mentre il secondo è utilizzato per le variabili d'ambiente.
Per il secondo, bisogna copiare il file `.env.example` in `.env` e successivamente modificalo con le tue API keys:
```sh
cp .env.example .env
nano .env # esempio di modifica del file
@@ -49,11 +52,8 @@ Per l'installazione scaricare Ollama dal loro [sito ufficiale](https://ollama.co
Dopo l'installazione, si possono iniziare a scaricare i modelli desiderati tramite il comando `ollama pull <model>:<tag>`.
I modelli usati dall'applicazione sono visibili in [src/app/models.py](src/app/models.py). Di seguito metto lo stesso una lista di modelli, ma potrebbe non essere aggiornata:
- `gpt-oss:latest`
- `qwen3:latest`
- `qwen3:4b`
- `qwen3:1.7b`
I modelli usati dall'applicazione sono quelli specificati nel file [config.yaml](config.yaml) alla voce `model`. Se in locale si hanno dei modelli diversi, è possibile modificare questa voce per usare quelli disponibili.
I modelli consigliati per questo progetto sono `qwen3:4b` e `qwen3:1.7b`.
### **3. Docker**
Se si vuole solamente avviare il progetto, si consiglia di utilizzare [Docker](https://www.docker.com), dato che sono stati creati i files [Dockerfile](Dockerfile) e [docker-compose.yaml](docker-compose.yaml) per creare il container con tutti i file necessari e già in esecuzione.
@@ -105,13 +105,15 @@ Usando la libreria ``gradio`` è stata creata un'interfaccia web semplice per in
src
└── app
├── __main__.py
├── agents <-- Agenti, modelli, prompts e simili
├── config.py <-- Configurazioni app
├── agents <-- Agenti, Team, prompts e simili
├── api <-- Tutte le API esterne
│ ├── base <-- Classi base per le API
│ ├── core <-- Classi core per le API
│ ├── markets <-- Market data provider (Es. Binance)
│ ├── news <-- News data provider (Es. NewsAPI)
── social <-- Social data provider (Es. Reddit)
└── utils <-- Codice di utilità generale
── social <-- Social data provider (Es. Reddit)
│ └── tools <-- Tools per agenti creati dalle API
└── interface <-- Interfacce utente
```
## Tests

45
configs.yaml Normal file
View File

@@ -0,0 +1,45 @@
port: 8000
gradio_share: false
logging_level: INFO
strategies:
- name: Conservative
label: Conservative
description: Focus on stable and low-risk investments.
- name: Balanced
label: Balanced
description: A mix of growth and stability.
- name: Aggressive
label: Aggressive
description: High-risk, high-reward investments.
models:
gemini:
- name: gemini-2.0-flash
label: Gemini
- name: gemini-2.0-pro
label: Gemini Pro
ollama:
- name: gpt-oss:latest
label: Ollama GPT
- name: qwen3:8b
label: Qwen 3 (8B)
- name: qwen3:4b
label: Qwen 3 (4B)
- name: qwen3:1.7b
label: Qwen 3 (1.7B)
api:
retry_attempts: 3
retry_delay_seconds: 2
currency: EUR
# TODO Magari implementare un sistema per settare i providers
market_providers: [BinanceWrapper, YFinanceWrapper]
news_providers: [GoogleNewsWrapper, DuckDuckGoWrapper]
social_providers: [RedditWrapper]
agents:
strategy: Conservative
team_model: qwen3:1.7b
team_leader_model: qwen3:4b
predictor_model: qwen3:4b

View File

@@ -27,7 +27,7 @@ project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))
from dotenv import load_dotenv
from app.markets import (
from app.api.markets import (
CoinBaseWrapper,
CryptoCompareWrapper,
BinanceWrapper,

View File

@@ -5,7 +5,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../src'
###########################################
from dotenv import load_dotenv
from app.news import NewsApiWrapper
from app.api.news import NewsApiWrapper
def main():
api = NewsApiWrapper()

View File

@@ -13,6 +13,7 @@ dependencies = [
"pytest", # Test
"dotenv", # Gestire variabili d'ambiente (generalmente API keys od opzioni)
"gradio", # UI web semplice con user_input e output
"colorlog", # Log colorati in console
# Per costruire agenti (ovvero modelli che possono fare più cose tramite tool) https://github.com/agno-agi/agno
# altamente consigliata dato che ha anche tools integrati per fare scraping, calcoli e molto altro

View File

@@ -1,14 +1,19 @@
import asyncio
import gradio as gr
from dotenv import load_dotenv
from agno.utils.log import log_info #type: ignore
from app.utils import ChatManager
from app.configs import AppConfig
from app.interface import ChatManager
from app.agents import Pipeline
if __name__ == "__main__":
# Inizializzazioni
load_dotenv()
pipeline = Pipeline()
configs = AppConfig.load()
pipeline = Pipeline(configs)
chat = ChatManager()
########################################
@@ -57,7 +62,7 @@ if __name__ == "__main__":
type="index",
label="Stile di investimento"
)
style.change(fn=pipeline.choose_style, inputs=style, outputs=None)
style.change(fn=pipeline.choose_strategy, inputs=style, outputs=None)
chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages")
msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?")
@@ -73,7 +78,9 @@ if __name__ == "__main__":
save_btn.click(save_current_chat, inputs=None, outputs=None)
load_btn.click(load_previous_chat, inputs=None, outputs=[chatbot, chatbot])
server, port = ("0.0.0.0", 8000) # 0.0.0.0 per accesso esterno (Docker)
server_log = "localhost" if server == "0.0.0.0" else server
log_info(f"Starting UPO AppAI Chat on http://{server_log}:{port}") # noqa
demo.launch(server_name=server, server_port=port, quiet=True)
try:
_app, local, shared = demo.launch(server_name="0.0.0.0", server_port=configs.port, quiet=True, prevent_thread_lock=True, share=configs.gradio_share)
log_info(f"Starting UPO AppAI Chat on {shared or local}")
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
demo.close()

View File

@@ -1,6 +1,5 @@
from app.agents.models import AppModels
from app.agents.predictor import PredictorInput, PredictorOutput, PredictorStyle, PREDICTOR_INSTRUCTIONS
from app.agents.predictor import PredictorInput, PredictorOutput
from app.agents.team import create_team_with
from app.agents.pipeline import Pipeline
__all__ = ["AppModels", "PredictorInput", "PredictorOutput", "PredictorStyle", "PREDICTOR_INSTRUCTIONS", "create_team_with", "Pipeline"]
__all__ = ["PredictorInput", "PredictorOutput", "create_team_with", "Pipeline"]

View File

@@ -1,107 +0,0 @@
import os
import ollama
from enum import Enum
from agno.agent import Agent
from agno.models.base import Model
from agno.models.google import Gemini
from agno.models.ollama import Ollama
from agno.tools import Toolkit
from agno.utils.log import log_warning #type: ignore
from pydantic import BaseModel
class AppModels(Enum):
"""
Enum per i modelli supportati.
Aggiungere nuovi modelli qui se necessario.
Per quanto riguarda Ollama, i modelli dovranno essere scaricati e installati
localmente seguendo le istruzioni di https://ollama.com/docs/guide/install-models
"""
GEMINI = "gemini-2.0-flash" # API online
GEMINI_PRO = "gemini-2.0-pro" # API online, più costoso ma migliore
OLLAMA_GPT = "gpt-oss:latest" # + good - slow (13b)
OLLAMA_QWEN = "qwen3:latest" # + good + fast (8b)
OLLAMA_QWEN_4B = "qwen3:4b" # + fast + decent (4b)
OLLAMA_QWEN_1B = "qwen3:1.7b" # + very fast + decent (1.7b)
@staticmethod
def availables_local() -> list['AppModels']:
"""
Controlla quali provider di modelli LLM locali sono disponibili.
Ritorna una lista di provider disponibili.
"""
try:
models_list = ollama.list()
availables = [model['model'] for model in models_list['models']]
app_models = [model for model in AppModels if model.name.startswith("OLLAMA")]
return [model for model in app_models if model.value in availables]
except Exception as e:
log_warning(f"Ollama is not running or not reachable: {e}")
return []
@staticmethod
def availables_online() -> list['AppModels']:
"""
Controlla quali provider di modelli LLM online hanno le loro API keys disponibili
come variabili d'ambiente e ritorna una lista di provider disponibili.
"""
if not os.getenv("GOOGLE_API_KEY"):
log_warning("No GOOGLE_API_KEY set in environment variables.")
return []
availables = [AppModels.GEMINI, AppModels.GEMINI_PRO]
return availables
@staticmethod
def availables() -> list['AppModels']:
"""
Controlla quali provider di modelli LLM locali sono disponibili e quali
provider di modelli LLM online hanno le loro API keys disponibili come variabili
d'ambiente e ritorna una lista di provider disponibili.
L'ordine di preferenza è:
1. Gemini (Google)
2. Ollama (locale)
"""
availables = [
*AppModels.availables_online(),
*AppModels.availables_local()
]
assert availables, "No valid model API keys set in environment variables."
return availables
def get_model(self, instructions:str) -> Model:
"""
Restituisce un'istanza del modello specificato.
Args:
instructions: istruzioni da passare al modello (system prompt).
Returns:
Un'istanza di BaseModel o una sua sottoclasse.
Raise:
ValueError se il modello non è supportato.
"""
name = self.value
if self in {model for model in AppModels if model.name.startswith("GEMINI")}:
return Gemini(name, instructions=[instructions])
elif self in {model for model in AppModels if model.name.startswith("OLLAMA")}:
return Ollama(name, instructions=[instructions])
raise ValueError(f"Modello non supportato: {self}")
def get_agent(self, instructions: str, name: str = "", output_schema: type[BaseModel] | None = None, tools: list[Toolkit] | None = None) -> Agent:
"""
Costruisce un agente con il modello e le istruzioni specificate.
Args:
instructions: istruzioni da passare al modello (system prompt)
name: nome dell'agente (opzionale)
output: schema di output opzionale (Pydantic BaseModel)
tools: lista opzionale di strumenti (tools) da fornire all'agente
Returns:
Un'istanza di Agent.
"""
return Agent(
model=self.get_model(instructions),
name=name,
retries=2,
tools=tools,
delay_between_retries=5, # seconds
output_schema=output_schema
)

View File

@@ -1,8 +1,9 @@
from agno.run.agent import RunOutput
from app.agents.models import AppModels
from app.agents.team import create_team_with
from app.agents.predictor import PREDICTOR_INSTRUCTIONS, PredictorInput, PredictorOutput, PredictorStyle
from app.api.base.markets import ProductInfo
from app.agents.predictor import PredictorInput, PredictorOutput
from app.agents.prompts import *
from app.api.core.markets import ProductInfo
from app.configs import AppConfig
class Pipeline:
@@ -12,13 +13,12 @@ class Pipeline:
e scelto dall'utente tramite i dropdown dell'interfaccia grafica.
"""
def __init__(self):
self.available_models = AppModels.availables()
self.all_styles = list(PredictorStyle)
def __init__(self, configs: AppConfig):
self.configs = configs
self.style = self.all_styles[0]
self.team = create_team_with(AppModels.OLLAMA_QWEN_1B)
self.choose_predictor(0) # Modello di default
# Stato iniziale
self.choose_strategy(0)
self.choose_predictor(0)
# ======================
# Dropdown handlers
@@ -27,17 +27,17 @@ class Pipeline:
"""
Sceglie il modello LLM da usare per il Predictor.
"""
model = self.available_models[index]
model = self.configs.models.all_models[index]
self.predictor = model.get_agent(
PREDICTOR_INSTRUCTIONS,
output_schema=PredictorOutput,
)
def choose_style(self, index: int):
def choose_strategy(self, index: int):
"""
Sceglie lo stile (conservativo/aggressivo) da usare per il Predictor.
Sceglie la strategia da usare per il Predictor.
"""
self.style = self.all_styles[index]
self.strat = self.configs.strategies[index].description
# ======================
# Helpers
@@ -46,13 +46,13 @@ class Pipeline:
"""
Restituisce la lista dei nomi dei modelli disponibili.
"""
return [model.name for model in self.available_models]
return [model.label for model in self.configs.models.all_models]
def list_styles(self) -> list[str]:
"""
Restituisce la lista degli stili di previsione disponibili.
"""
return [style.value for style in self.all_styles]
return [strat.label for strat in self.configs.strategies]
# ======================
# Core interaction
@@ -65,7 +65,11 @@ class Pipeline:
4. Restituisce la strategia finale
"""
# Step 1: raccolta output dai membri del Team
team_outputs = self.team.run(query) # type: ignore
team_model = self.configs.get_model_by_name(self.configs.agents.team_model)
leader_model = self.configs.get_model_by_name(self.configs.agents.team_leader_model)
team = create_team_with(self.configs, team_model, leader_model)
team_outputs = team.run(query) # type: ignore
# Step 2: aggregazione output strutturati
all_products: list[ProductInfo] = []
@@ -86,7 +90,7 @@ class Pipeline:
# Step 3: invocazione Predictor
predictor_input = PredictorInput(
data=all_products,
style=self.style,
style=self.strat,
sentiment=aggregated_sentiment
)
@@ -100,6 +104,6 @@ class Pipeline:
[f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio]
)
return (
f"📊 Strategia ({self.style.value}): {prediction.strategy}\n\n"
f"📊 Strategia ({self.strat}): {prediction.strategy}\n\n"
f"💼 Portafoglio consigliato:\n{portfolio_lines}"
)

View File

@@ -1,15 +1,9 @@
from enum import Enum
from pydantic import BaseModel, Field
from app.api.base.markets import ProductInfo
class PredictorStyle(Enum):
CONSERVATIVE = "Conservativo"
AGGRESSIVE = "Aggressivo"
from app.api.core.markets import ProductInfo
class PredictorInput(BaseModel):
data: list[ProductInfo] = Field(..., description="Market data as a list of ProductInfo")
style: PredictorStyle = Field(..., description="Prediction style")
style: str = Field(..., description="Prediction style")
sentiment: str = Field(..., description="Aggregated sentiment from news and social analysis")
class ItemPortfolio(BaseModel):
@@ -20,34 +14,3 @@ class ItemPortfolio(BaseModel):
class PredictorOutput(BaseModel):
strategy: str = Field(..., description="Concise operational strategy in Italian")
portfolio: list[ItemPortfolio] = Field(..., description="List of portfolio items with allocations")
PREDICTOR_INSTRUCTIONS = """
You are an **Allocation Algorithm (Crypto-Algo)** specialized in analyzing market data and sentiment to generate an investment strategy and a target portfolio.
Your sole objective is to process the user_input data and generate the strictly structured output as required by the response format. **You MUST NOT provide introductions, preambles, explanations, conclusions, or any additional comments that are not strictly required.**
## Processing Instructions (Absolute Rule)
The allocation strategy must be **derived exclusively from the "Allocation Logic" corresponding to the requested *style*** and the provided market/sentiment data. **DO NOT** use external or historical knowledge.
## Allocation Logic
### "Aggressivo" Style (Aggressive)
* **Priority:** Maximizing return (high volatility accepted).
* **Focus:** Higher allocation to **non-BTC/ETH assets** with high momentum potential (Altcoins, mid/low-cap assets).
* **BTC/ETH:** Must serve as a base (anchor), but their allocation **must not exceed 50%** of the total portfolio.
* **Sentiment:** Use positive sentiment to increase exposure to high-risk assets.
### "Conservativo" Style (Conservative)
* **Priority:** Capital preservation (volatility minimized).
* **Focus:** Major allocation to **BTC and/or ETH (Large-Cap Assets)**.
* **BTC/ETH:** Their allocation **must be at least 70%** of the total portfolio.
* **Altcoins:** Any allocations to non-BTC/ETH assets must be minimal (max 30% combined) and for assets that minimize speculative risk.
* **Sentiment:** Use positive sentiment only as confirmation for exposure, avoiding reactions to excessive "FOMO" signals.
## Output Requirements (Content MUST be in Italian)
1. **Strategy (strategy):** Must be a concise operational description **in Italian ("in Italiano")**, with a maximum of 5 sentences.
2. **Portfolio (portfolio):** The sum of all percentages must be **exactly 100%**. The justification (motivation) for each asset must be a single clear sentence **in Italian ("in Italiano")**.
"""

View File

@@ -0,0 +1,21 @@
from pathlib import Path
__PROMPTS_PATH = Path(__file__).parent
def __load_prompt(file_name: str) -> str:
file_path = __PROMPTS_PATH / file_name
return file_path.read_text(encoding='utf-8').strip()
COORDINATOR_INSTRUCTIONS = __load_prompt("team_leader.txt")
MARKET_INSTRUCTIONS = __load_prompt("team_market.txt")
NEWS_INSTRUCTIONS = __load_prompt("team_news.txt")
SOCIAL_INSTRUCTIONS = __load_prompt("team_social.txt")
PREDICTOR_INSTRUCTIONS = __load_prompt("predictor.txt")
__all__ = [
"COORDINATOR_INSTRUCTIONS",
"MARKET_INSTRUCTIONS",
"NEWS_INSTRUCTIONS",
"SOCIAL_INSTRUCTIONS",
"PREDICTOR_INSTRUCTIONS",
]

View File

@@ -0,0 +1,27 @@
You are an **Allocation Algorithm (Crypto-Algo)** specialized in analyzing market data and sentiment to generate an investment strategy and a target portfolio.
Your sole objective is to process the user_input data and generate the strictly structured output as required by the response format. **You MUST NOT provide introductions, preambles, explanations, conclusions, or any additional comments that are not strictly required.**
## Processing Instructions (Absolute Rule)
The allocation strategy must be **derived exclusively from the "Allocation Logic" corresponding to the requested *style*** and the provided market/sentiment data. **DO NOT** use external or historical knowledge.
## Allocation Logic
### "Aggressivo" Style (Aggressive)
* **Priority:** Maximizing return (high volatility accepted).
* **Focus:** Higher allocation to **non-BTC/ETH assets** with high momentum potential (Altcoins, mid/low-cap assets).
* **BTC/ETH:** Must serve as a base (anchor), but their allocation **must not exceed 50%** of the total portfolio.
* **Sentiment:** Use positive sentiment to increase exposure to high-risk assets.
### "Conservativo" Style (Conservative)
* **Priority:** Capital preservation (volatility minimized).
* **Focus:** Major allocation to **BTC and/or ETH (Large-Cap Assets)**.
* **BTC/ETH:** Their allocation **must be at least 70%** of the total portfolio.
* **Altcoins:** Any allocations to non-BTC/ETH assets must be minimal (max 30% combined) and for assets that minimize speculative risk.
* **Sentiment:** Use positive sentiment only as confirmation for exposure, avoiding reactions to excessive "FOMO" signals.
## Output Requirements (Content MUST be in Italian)
1. **Strategy (strategy):** Must be a concise operational description **in Italian ("in Italiano")**, with a maximum of 5 sentences.
2. **Portfolio (portfolio):** The sum of all percentages must be **exactly 100%**. The justification (motivation) for each asset must be a single clear sentence **in Italian ("in Italiano")**.

View File

@@ -0,0 +1,15 @@
You are the expert coordinator of a financial analysis team specializing in cryptocurrencies.
Your team consists of three agents:
- **MarketAgent**: Provides quantitative market data, price analysis, and technical indicators.
- **NewsAgent**: Scans and analyzes the latest news, articles, and official announcements.
- **SocialAgent**: Gauges public sentiment, trends, and discussions on social media.
Your primary objective is to answer the user's query by orchestrating the work of your team members.
Your workflow is as follows:
1. **Deconstruct the user's query** to identify the required information.
2. **Delegate specific tasks** to the most appropriate agent(s) to gather the necessary data and initial analysis.
3. **Analyze the information** returned by the agents.
4. If the initial data is insufficient or the query is complex, **iteratively re-engage the agents** with follow-up questions to build a comprehensive picture.
5. **Synthesize all the gathered information** into a final, coherent, and complete analysis that fills all the required output fields.

View File

@@ -0,0 +1,19 @@
**TASK:** You are a specialized **Crypto Price Data Retrieval Agent**. Your primary goal is to fetch the most recent and/or historical price data for requested cryptocurrency assets (e.g., 'BTC', 'ETH', 'SOL'). You must provide the data in a clear and structured format.
**AVAILABLE TOOLS:**
1. `get_products(asset_ids: list[str])`: Get **current** product/price info for a list of assets. **(PREFERITA: usa questa per i prezzi live)**
2. `get_historical_prices(asset_id: str, limit: int)`: Get historical price data for one asset. Default limit is 100. **(PREFERITA: usa questa per i dati storici)**
3. `get_products_aggregated(asset_ids: list[str])`: Get **aggregated current** product/price info for a list of assets. **(USA SOLO SE richiesto 'aggregato' o se `get_products` fallisce)**
4. `get_historical_prices_aggregated(asset_id: str, limit: int)`: Get **aggregated historical** price data for one asset. **(USA SOLO SE richiesto 'aggregato' o se `get_historical_prices` fallisce)**
**USAGE GUIDELINE:**
* **Asset ID:** Always convert common names (e.g., 'Bitcoin', 'Ethereum') into their official ticker/ID (e.g., 'BTC', 'ETH').
* **Cost Management (Cruciale per LLM locale):** Prefer `get_products` and `get_historical_prices` for standard requests to minimize costs.
* **Aggregated Data:** Use `get_products_aggregated` or `get_historical_prices_aggregated` only if the user specifically requests aggregated data or you value that having aggregated data is crucial for the analysis.
* **Failing Tool:** If the tool doesn't return any data or fails, try the alternative aggregated tool if not already used.
**REPORTING REQUIREMENT:**
1. **Format:** Output the results in a clear, easy-to-read list or table.
2. **Live Price Request:** If an asset's *current price* is requested, report the **Asset ID**, **Latest Price**, and **Time/Date of the price**.
3. **Historical Price Request:** If *historical data* is requested, report the **Asset ID**, the **Limit** of points returned, and the **First** and **Last** entries from the list of historical prices (Date, Price).
4. **Output:** For all requests, output a single, concise summary of the findings; if requested, also include the raw data retrieved.

View File

@@ -0,0 +1,18 @@
**TASK:** You are a specialized **Crypto News Analyst**. Your goal is to fetch the latest news or top headlines related to cryptocurrencies, and then **analyze the sentiment** of the content to provide a concise report to the team leader. Prioritize 'crypto' or specific cryptocurrency names (e.g., 'Bitcoin', 'Ethereum') in your searches.
**AVAILABLE TOOLS:**
1. `get_latest_news(query: str, limit: int)`: Get the 'limit' most recent news articles for a specific 'query'.
2. `get_top_headlines(limit: int)`: Get the 'limit' top global news headlines.
3. `get_latest_news_aggregated(query: str, limit: int)`: Get aggregated latest news articles for a specific 'query'.
4. `get_top_headlines_aggregated(limit: int)`: Get aggregated top global news headlines.
**USAGE GUIDELINE:**
* Always use `get_latest_news` with a relevant crypto-related query first.
* The default limit for news items should be 5 unless specified otherwise.
* If the tool doesn't return any articles, respond with "No relevant news articles found."
**REPORTING REQUIREMENT:**
1. **Analyze** the tone and key themes of the retrieved articles.
2. **Summarize** the overall **market sentiment** (e.g., highly positive, cautiously neutral, generally negative) based on the content.
3. **Identify** the top 2-3 **main topics** discussed (e.g., new regulation, price surge, institutional adoption).
4. **Output** a single, brief report summarizing these findings. Do not output the raw articles.

View File

@@ -0,0 +1,15 @@
**TASK:** You are a specialized **Social Media Sentiment Analyst**. Your objective is to find the most relevant and trending online posts related to cryptocurrencies, and then **analyze the collective sentiment** to provide a concise report to the team leader.
**AVAILABLE TOOLS:**
1. `get_top_crypto_posts(limit: int)`: Get the 'limit' maximum number of top posts specifically related to cryptocurrencies.
**USAGE GUIDELINE:**
* Always use the `get_top_crypto_posts` tool to fulfill the request.
* The default limit for posts should be 5 unless specified otherwise.
* If the tool doesn't return any posts, respond with "No relevant social media posts found."
**REPORTING REQUIREMENT:**
1. **Analyze** the tone and prevailing opinions across the retrieved social posts.
2. **Summarize** the overall **community sentiment** (e.g., high enthusiasm/FOMO, uncertainty, FUD/fear) based on the content.
3. **Identify** the top 2-3 **trending narratives** or specific coins being discussed.
4. **Output** a single, brief report summarizing these findings. Do not output the raw posts.

View File

@@ -1,109 +1,25 @@
from agno.team import Team
from app.agents import AppModels
from app.api.markets import MarketAPIsTool
from app.api.news import NewsAPIsTool
from app.api.social import SocialAPIsTool
from app.api.tools import *
from app.agents.prompts import *
from app.configs import AppConfig, AppModel
def create_team_with(models: AppModels, coordinator: AppModels | None = None) -> Team:
market_agent = models.get_agent(
instructions=MARKET_INSTRUCTIONS,
name="MarketAgent",
tools=[MarketAPIsTool()]
)
news_agent = models.get_agent(
instructions=NEWS_INSTRUCTIONS,
name="NewsAgent",
tools=[NewsAPIsTool()]
)
social_agent = models.get_agent(
instructions=SOCIAL_INSTRUCTIONS,
name="SocialAgent",
tools=[SocialAPIsTool()]
)
def create_team_with(configs: AppConfig, model: AppModel, coordinator: AppModel | None = None) -> Team:
coordinator = coordinator or models
market_tool = MarketAPIsTool(currency=configs.api.currency)
market_tool.handler.set_retries(configs.api.retry_attempts, configs.api.retry_delay_seconds)
news_tool = NewsAPIsTool()
news_tool.handler.set_retries(configs.api.retry_attempts, configs.api.retry_delay_seconds)
social_tool = SocialAPIsTool()
social_tool.handler.set_retries(configs.api.retry_attempts, configs.api.retry_delay_seconds)
market_agent = model.get_agent(instructions=MARKET_INSTRUCTIONS, name="MarketAgent", tools=[market_tool])
news_agent = model.get_agent(instructions=NEWS_INSTRUCTIONS, name="NewsAgent", tools=[news_tool])
social_agent = model.get_agent(instructions=SOCIAL_INSTRUCTIONS, name="SocialAgent", tools=[social_tool])
coordinator = coordinator or model
return Team(
model=coordinator.get_model(COORDINATOR_INSTRUCTIONS),
name="CryptoAnalysisTeam",
members=[market_agent, news_agent, social_agent],
)
COORDINATOR_INSTRUCTIONS = """
You are the expert coordinator of a financial analysis team specializing in cryptocurrencies.
Your team consists of three agents:
- **MarketAgent**: Provides quantitative market data, price analysis, and technical indicators.
- **NewsAgent**: Scans and analyzes the latest news, articles, and official announcements.
- **SocialAgent**: Gauges public sentiment, trends, and discussions on social media.
Your primary objective is to answer the user's query by orchestrating the work of your team members.
Your workflow is as follows:
1. **Deconstruct the user's query** to identify the required information.
2. **Delegate specific tasks** to the most appropriate agent(s) to gather the necessary data and initial analysis.
3. **Analyze the information** returned by the agents.
4. If the initial data is insufficient or the query is complex, **iteratively re-engage the agents** with follow-up questions to build a comprehensive picture.
5. **Synthesize all the gathered information** into a final, coherent, and complete analysis that fills all the required output fields.
"""
MARKET_INSTRUCTIONS = """
**TASK:** You are a specialized **Crypto Price Data Retrieval Agent**. Your primary goal is to fetch the most recent and/or historical price data for requested cryptocurrency assets (e.g., 'BTC', 'ETH', 'SOL'). You must provide the data in a clear and structured format.
**AVAILABLE TOOLS:**
1. `get_products(asset_ids: list[str])`: Get **current** product/price info for a list of assets. **(PREFERITA: usa questa per i prezzi live)**
2. `get_historical_prices(asset_id: str, limit: int)`: Get historical price data for one asset. Default limit is 100. **(PREFERITA: usa questa per i dati storici)**
3. `get_products_aggregated(asset_ids: list[str])`: Get **aggregated current** product/price info for a list of assets. **(USA SOLO SE richiesto 'aggregato' o se `get_products` fallisce)**
4. `get_historical_prices_aggregated(asset_id: str, limit: int)`: Get **aggregated historical** price data for one asset. **(USA SOLO SE richiesto 'aggregato' o se `get_historical_prices` fallisce)**
**USAGE GUIDELINE:**
* **Asset ID:** Always convert common names (e.g., 'Bitcoin', 'Ethereum') into their official ticker/ID (e.g., 'BTC', 'ETH').
* **Cost Management (Cruciale per LLM locale):** Prefer `get_products` and `get_historical_prices` for standard requests to minimize costs.
* **Aggregated Data:** Use `get_products_aggregated` or `get_historical_prices_aggregated` only if the user specifically requests aggregated data or you value that having aggregated data is crucial for the analysis.
* **Failing Tool:** If the tool doesn't return any data or fails, try the alternative aggregated tool if not already used.
**REPORTING REQUIREMENT:**
1. **Format:** Output the results in a clear, easy-to-read list or table.
2. **Live Price Request:** If an asset's *current price* is requested, report the **Asset ID**, **Latest Price**, and **Time/Date of the price**.
3. **Historical Price Request:** If *historical data* is requested, report the **Asset ID**, the **Limit** of points returned, and the **First** and **Last** entries from the list of historical prices (Date, Price).
4. **Output:** For all requests, output a single, concise summary of the findings; if requested, also include the raw data retrieved.
"""
NEWS_INSTRUCTIONS = """
**TASK:** You are a specialized **Crypto News Analyst**. Your goal is to fetch the latest news or top headlines related to cryptocurrencies, and then **analyze the sentiment** of the content to provide a concise report to the team leader. Prioritize 'crypto' or specific cryptocurrency names (e.g., 'Bitcoin', 'Ethereum') in your searches.
**AVAILABLE TOOLS:**
1. `get_latest_news(query: str, limit: int)`: Get the 'limit' most recent news articles for a specific 'query'.
2. `get_top_headlines(limit: int)`: Get the 'limit' top global news headlines.
3. `get_latest_news_aggregated(query: str, limit: int)`: Get aggregated latest news articles for a specific 'query'.
4. `get_top_headlines_aggregated(limit: int)`: Get aggregated top global news headlines.
**USAGE GUIDELINE:**
* Always use `get_latest_news` with a relevant crypto-related query first.
* The default limit for news items should be 5 unless specified otherwise.
* If the tool doesn't return any articles, respond with "No relevant news articles found."
**REPORTING REQUIREMENT:**
1. **Analyze** the tone and key themes of the retrieved articles.
2. **Summarize** the overall **market sentiment** (e.g., highly positive, cautiously neutral, generally negative) based on the content.
3. **Identify** the top 2-3 **main topics** discussed (e.g., new regulation, price surge, institutional adoption).
4. **Output** a single, brief report summarizing these findings. Do not output the raw articles.
"""
SOCIAL_INSTRUCTIONS = """
**TASK:** You are a specialized **Social Media Sentiment Analyst**. Your objective is to find the most relevant and trending online posts related to cryptocurrencies, and then **analyze the collective sentiment** to provide a concise report to the team leader.
**AVAILABLE TOOLS:**
1. `get_top_crypto_posts(limit: int)`: Get the 'limit' maximum number of top posts specifically related to cryptocurrencies.
**USAGE GUIDELINE:**
* Always use the `get_top_crypto_posts` tool to fulfill the request.
* The default limit for posts should be 5 unless specified otherwise.
* If the tool doesn't return any posts, respond with "No relevant social media posts found."
**REPORTING REQUIREMENT:**
1. **Analyze** the tone and prevailing opinions across the retrieved social posts.
2. **Summarize** the overall **community sentiment** (e.g., high enthusiasm/FOMO, uncertainty, FUD/fear) based on the content.
3. **Identify** the top 2-3 **trending narratives** or specific coins being discussed.
4. **Output** a single, brief report summarizing these findings. Do not output the raw posts.
"""

View File

@@ -1,86 +1,7 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.base.markets import MarketWrapper, Price, ProductInfo
from app.api.markets.binance import BinanceWrapper
from app.api.markets.coinbase import CoinBaseWrapper
from app.api.markets.cryptocompare import CryptoCompareWrapper
from app.api.markets.yfinance import YFinanceWrapper
__all__ = [ "MarketAPIsTool", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper", "ProductInfo", "Price" ]
__all__ = ["BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper"]
class MarketAPIsTool(MarketWrapper, Toolkit):
"""
Class that aggregates multiple market API wrappers and manages them using WrapperHandler.
This class supports retrieving product information and historical prices.
This class can also aggregate data from multiple sources to provide a more comprehensive view of the market.
The following wrappers are included in this order:
- BinanceWrapper
- YFinanceWrapper
- CoinBaseWrapper
- CryptoCompareWrapper
"""
def __init__(self, currency: str = "USD"):
"""
Initialize the MarketAPIsTool with multiple market API wrappers.
The following wrappers are included in this order:
- BinanceWrapper
- YFinanceWrapper
- CoinBaseWrapper
- CryptoCompareWrapper
Args:
currency (str): Valuta in cui restituire i prezzi. Default è "USD".
"""
kwargs = {"currency": currency or "USD"}
wrappers: list[type[MarketWrapper]] = [BinanceWrapper, YFinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers, kwargs=kwargs)
Toolkit.__init__( # type: ignore
self,
name="Market APIs Toolkit",
tools=[
self.get_product,
self.get_products,
self.get_historical_prices,
self.get_products_aggregated,
self.get_historical_prices_aggregated,
],
)
def get_product(self, asset_id: str) -> ProductInfo:
return self.handler.try_call(lambda w: w.get_product(asset_id))
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
return self.handler.try_call(lambda w: w.get_products(asset_ids))
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit))
def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Restituisce i dati aggregati per una lista di asset_id.\n
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
Args:
asset_ids (list[str]): Lista di asset_id da cercare.
Returns:
list[ProductInfo]: Lista di ProductInfo aggregati.
Raises:
Exception: If all wrappers fail to provide results.
"""
all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids))
return ProductInfo.aggregate(all_products)
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
"""
Restituisce i dati storici aggregati per un asset_id. Usa i dati di tutte le fonti disponibili e li aggrega.\n
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
Args:
asset_id (str): Asset ID da cercare.
limit (int): Numero massimo di dati storici da restituire.
Returns:
list[Price]: Lista di Price aggregati.
Raises:
Exception: If all wrappers fail to provide results.
"""
all_prices = self.handler.try_call_all(lambda w: w.get_historical_prices(asset_id, limit))
return Price.aggregate(all_prices)

View File

@@ -1,7 +1,7 @@
import os
from typing import Any
from binance.client import Client # type: ignore
from app.api.base.markets import ProductInfo, MarketWrapper, Price
from app.api.core.markets import ProductInfo, MarketWrapper, Price
def extract_product(currency: str, ticker_data: dict[str, Any]) -> ProductInfo:

View File

@@ -3,7 +3,7 @@ from enum import Enum
from datetime import datetime, timedelta
from coinbase.rest import RESTClient # type: ignore
from coinbase.rest.types.product_types import Candle, GetProductResponse, Product # type: ignore
from app.api.base.markets import ProductInfo, MarketWrapper, Price
from app.api.core.markets import ProductInfo, MarketWrapper, Price
def extract_product(product_data: GetProductResponse | Product) -> ProductInfo:

View File

@@ -1,7 +1,7 @@
import os
from typing import Any
import requests
from app.api.base.markets import ProductInfo, MarketWrapper, Price
from app.api.core.markets import ProductInfo, MarketWrapper, Price
def extract_product(asset_data: dict[str, Any]) -> ProductInfo:

View File

@@ -1,6 +1,6 @@
import json
from agno.tools.yfinance import YFinanceTools
from app.api.base.markets import MarketWrapper, ProductInfo, Price
from app.api.core.markets import MarketWrapper, ProductInfo, Price
def extract_product(stock_data: dict[str, str]) -> ProductInfo:

View File

@@ -1,78 +1,7 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.base.news import NewsWrapper, Article
from app.api.news.news_api import NewsApiWrapper
from app.api.news.newsapi import NewsApiWrapper
from app.api.news.googlenews import GoogleNewsWrapper
from app.api.news.cryptopanic_api import CryptoPanicWrapper
from app.api.news.duckduckgo import DuckDuckGoWrapper
__all__ = ["NewsAPIsTool", "NewsApiWrapper", "GoogleNewsWrapper", "CryptoPanicWrapper", "DuckDuckGoWrapper", "Article"]
__all__ = ["NewsApiWrapper", "GoogleNewsWrapper", "CryptoPanicWrapper", "DuckDuckGoWrapper"]
class NewsAPIsTool(NewsWrapper, Toolkit):
"""
Aggregates multiple news API wrappers and manages them using WrapperHandler.
This class supports retrieving top headlines and latest news articles by querying multiple sources:
- GoogleNewsWrapper
- DuckDuckGoWrapper
- NewsApiWrapper
- CryptoPanicWrapper
By default, it returns results from the first successful wrapper.
Optionally, it can be configured to collect articles from all wrappers.
If no wrapper succeeds, an exception is raised.
"""
def __init__(self):
"""
Initialize the NewsAPIsTool with multiple news API wrappers.
The tool uses WrapperHandler to manage and invoke the different news API wrappers.
The following wrappers are included in this order:
- GoogleNewsWrapper.
- DuckDuckGoWrapper.
- NewsApiWrapper.
- CryptoPanicWrapper.
"""
wrappers: list[type[NewsWrapper]] = [GoogleNewsWrapper, DuckDuckGoWrapper, NewsApiWrapper, CryptoPanicWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore
self,
name="News APIs Toolkit",
tools=[
self.get_top_headlines,
self.get_latest_news,
self.get_top_headlines_aggregated,
self.get_latest_news_aggregated,
],
)
def get_top_headlines(self, limit: int = 100) -> list[Article]:
return self.handler.try_call(lambda w: w.get_top_headlines(limit))
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
return self.handler.try_call(lambda w: w.get_latest_news(query, limit))
def get_top_headlines_aggregated(self, limit: int = 100) -> dict[str, list[Article]]:
"""
Calls get_top_headlines on all wrappers/providers and returns a dictionary mapping their names to their articles.
Args:
limit (int): Maximum number of articles to retrieve from each provider.
Returns:
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_top_headlines(limit))
def get_latest_news_aggregated(self, query: str, limit: int = 100) -> dict[str, list[Article]]:
"""
Calls get_latest_news on all wrappers/providers and returns a dictionary mapping their names to their articles.
Args:
query (str): The search query to find relevant news articles.
limit (int): Maximum number of articles to retrieve from each provider.
Returns:
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_latest_news(query, limit))

View File

@@ -2,7 +2,7 @@ import os
from typing import Any
import requests
from enum import Enum
from app.api.base.news import NewsWrapper, Article
from app.api.core.news import NewsWrapper, Article
class CryptoPanicFilter(Enum):

View File

@@ -1,7 +1,7 @@
import json
from typing import Any
from agno.tools.duckduckgo import DuckDuckGoTools
from app.api.base.news import Article, NewsWrapper
from app.api.core.news import Article, NewsWrapper
def extract_article(result: dict[str, Any]) -> Article:

View File

@@ -1,6 +1,6 @@
from typing import Any
from gnews import GNews # type: ignore
from app.api.base.news import Article, NewsWrapper
from app.api.core.news import Article, NewsWrapper
def extract_article(result: dict[str, Any]) -> Article:

View File

@@ -1,7 +1,7 @@
import os
from typing import Any
import newsapi # type: ignore
from app.api.base.news import Article, NewsWrapper
from app.api.core.news import Article, NewsWrapper
def extract_article(result: dict[str, Any]) -> Article:

View File

@@ -1,53 +1,3 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.base.social import SocialPost, SocialWrapper
from app.api.social.reddit import RedditWrapper
__all__ = ["SocialAPIsTool", "RedditWrapper", "SocialPost"]
class SocialAPIsTool(SocialWrapper, Toolkit):
"""
Aggregates multiple social media API wrappers and manages them using WrapperHandler.
This class supports retrieving top crypto-related posts by querying multiple sources:
- RedditWrapper
By default, it returns results from the first successful wrapper.
Optionally, it can be configured to collect posts from all wrappers.
If no wrapper succeeds, an exception is raised.
"""
def __init__(self):
"""
Initialize the SocialAPIsTool with multiple social media API wrappers.
The tool uses WrapperHandler to manage and invoke the different social media API wrappers.
The following wrappers are included in this order:
- RedditWrapper.
"""
wrappers: list[type[SocialWrapper]] = [RedditWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore
self,
name="Socials Toolkit",
tools=[
self.get_top_crypto_posts,
self.get_top_crypto_posts_aggregated,
],
)
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
return self.handler.try_call(lambda w: w.get_top_crypto_posts(limit))
def get_top_crypto_posts_aggregated(self, limit_per_wrapper: int = 5) -> dict[str, list[SocialPost]]:
"""
Calls get_top_crypto_posts on all wrappers/providers and returns a dictionary mapping their names to their posts.
Args:
limit_per_wrapper (int): Maximum number of posts to retrieve from each provider.
Returns:
dict[str, list[SocialPost]]: A dictionary where keys are wrapper names and values are lists of SocialPost objects.
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_top_crypto_posts(limit_per_wrapper))
__all__ = ["RedditWrapper"]

View File

@@ -1,7 +1,7 @@
import os
from praw import Reddit # type: ignore
from praw.models import Submission # type: ignore
from app.api.base.social import SocialWrapper, SocialPost, SocialComment
from app.api.core.social import SocialWrapper, SocialPost, SocialComment
MAX_COMMENTS = 5

View File

@@ -0,0 +1,5 @@
from app.api.tools.market_tool import MarketAPIsTool
from app.api.tools.social_tool import SocialAPIsTool
from app.api.tools.news_tool import NewsAPIsTool
__all__ = ["MarketAPIsTool", "NewsAPIsTool", "SocialAPIsTool"]

View File

@@ -0,0 +1,80 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.core.markets import MarketWrapper, Price, ProductInfo
from app.api.markets import BinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper, YFinanceWrapper
class MarketAPIsTool(MarketWrapper, Toolkit):
"""
Class that aggregates multiple market API wrappers and manages them using WrapperHandler.
This class supports retrieving product information and historical prices.
This class can also aggregate data from multiple sources to provide a more comprehensive view of the market.
The following wrappers are included in this order:
- BinanceWrapper
- YFinanceWrapper
- CoinBaseWrapper
- CryptoCompareWrapper
"""
def __init__(self, currency: str = "USD"):
"""
Initialize the MarketAPIsTool with multiple market API wrappers.
The following wrappers are included in this order:
- BinanceWrapper
- YFinanceWrapper
- CoinBaseWrapper
- CryptoCompareWrapper
Args:
currency (str): Valuta in cui restituire i prezzi. Default è "USD".
"""
kwargs = {"currency": currency or "USD"}
wrappers: list[type[MarketWrapper]] = [BinanceWrapper, YFinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers, kwargs=kwargs)
Toolkit.__init__( # type: ignore
self,
name="Market APIs Toolkit",
tools=[
self.get_product,
self.get_products,
self.get_historical_prices,
self.get_products_aggregated,
self.get_historical_prices_aggregated,
],
)
def get_product(self, asset_id: str) -> ProductInfo:
return self.handler.try_call(lambda w: w.get_product(asset_id))
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
return self.handler.try_call(lambda w: w.get_products(asset_ids))
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit))
def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]:
"""
Restituisce i dati aggregati per una lista di asset_id.\n
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
Args:
asset_ids (list[str]): Lista di asset_id da cercare.
Returns:
list[ProductInfo]: Lista di ProductInfo aggregati.
Raises:
Exception: If all wrappers fail to provide results.
"""
all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids))
return ProductInfo.aggregate(all_products)
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
"""
Restituisce i dati storici aggregati per un asset_id. Usa i dati di tutte le fonti disponibili e li aggrega.\n
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
Args:
asset_id (str): Asset ID da cercare.
limit (int): Numero massimo di dati storici da restituire.
Returns:
list[Price]: Lista di Price aggregati.
Raises:
Exception: If all wrappers fail to provide results.
"""
all_prices = self.handler.try_call_all(lambda w: w.get_historical_prices(asset_id, limit))
return Price.aggregate(all_prices)

View File

@@ -0,0 +1,72 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.core.news import NewsWrapper, Article
from app.api.news import NewsApiWrapper, GoogleNewsWrapper, CryptoPanicWrapper, DuckDuckGoWrapper
class NewsAPIsTool(NewsWrapper, Toolkit):
"""
Aggregates multiple news API wrappers and manages them using WrapperHandler.
This class supports retrieving top headlines and latest news articles by querying multiple sources:
- GoogleNewsWrapper
- DuckDuckGoWrapper
- NewsApiWrapper
- CryptoPanicWrapper
By default, it returns results from the first successful wrapper.
Optionally, it can be configured to collect articles from all wrappers.
If no wrapper succeeds, an exception is raised.
"""
def __init__(self):
"""
Initialize the NewsAPIsTool with multiple news API wrappers.
The tool uses WrapperHandler to manage and invoke the different news API wrappers.
The following wrappers are included in this order:
- GoogleNewsWrapper.
- DuckDuckGoWrapper.
- NewsApiWrapper.
- CryptoPanicWrapper.
"""
wrappers: list[type[NewsWrapper]] = [GoogleNewsWrapper, DuckDuckGoWrapper, NewsApiWrapper, CryptoPanicWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore
self,
name="News APIs Toolkit",
tools=[
self.get_top_headlines,
self.get_latest_news,
self.get_top_headlines_aggregated,
self.get_latest_news_aggregated,
],
)
def get_top_headlines(self, limit: int = 100) -> list[Article]:
return self.handler.try_call(lambda w: w.get_top_headlines(limit))
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
return self.handler.try_call(lambda w: w.get_latest_news(query, limit))
def get_top_headlines_aggregated(self, limit: int = 100) -> dict[str, list[Article]]:
"""
Calls get_top_headlines on all wrappers/providers and returns a dictionary mapping their names to their articles.
Args:
limit (int): Maximum number of articles to retrieve from each provider.
Returns:
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_top_headlines(limit))
def get_latest_news_aggregated(self, query: str, limit: int = 100) -> dict[str, list[Article]]:
"""
Calls get_latest_news on all wrappers/providers and returns a dictionary mapping their names to their articles.
Args:
query (str): The search query to find relevant news articles.
limit (int): Maximum number of articles to retrieve from each provider.
Returns:
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_latest_news(query, limit))

View File

@@ -0,0 +1,51 @@
from agno.tools import Toolkit
from app.api.wrapper_handler import WrapperHandler
from app.api.core.social import SocialPost, SocialWrapper
from app.api.social import RedditWrapper
class SocialAPIsTool(SocialWrapper, Toolkit):
"""
Aggregates multiple social media API wrappers and manages them using WrapperHandler.
This class supports retrieving top crypto-related posts by querying multiple sources:
- RedditWrapper
By default, it returns results from the first successful wrapper.
Optionally, it can be configured to collect posts from all wrappers.
If no wrapper succeeds, an exception is raised.
"""
def __init__(self):
"""
Initialize the SocialAPIsTool with multiple social media API wrappers.
The tool uses WrapperHandler to manage and invoke the different social media API wrappers.
The following wrappers are included in this order:
- RedditWrapper.
"""
wrappers: list[type[SocialWrapper]] = [RedditWrapper]
self.handler = WrapperHandler.build_wrappers(wrappers)
Toolkit.__init__( # type: ignore
self,
name="Socials Toolkit",
tools=[
self.get_top_crypto_posts,
self.get_top_crypto_posts_aggregated,
],
)
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
return self.handler.try_call(lambda w: w.get_top_crypto_posts(limit))
def get_top_crypto_posts_aggregated(self, limit_per_wrapper: int = 5) -> dict[str, list[SocialPost]]:
"""
Calls get_top_crypto_posts on all wrappers/providers and returns a dictionary mapping their names to their posts.
Args:
limit_per_wrapper (int): Maximum number of posts to retrieve from each provider.
Returns:
dict[str, list[SocialPost]]: A dictionary where keys are wrapper names and values are lists of SocialPost objects.
Raises:
Exception: If all wrappers fail to provide results.
"""
return self.handler.try_call_all(lambda w: w.get_top_crypto_posts(limit_per_wrapper))

View File

@@ -35,6 +35,16 @@ class WrapperHandler(Generic[WrapperType]):
self.retry_delay = retry_delay
self.index = 0
def set_retries(self, try_per_wrapper: int, retry_delay: int) -> None:
"""
Sets the retry parameters for the handler.
Args:
try_per_wrapper (int): Number of retries per wrapper before switching to the next.
retry_delay (int): Delay in seconds between retries.
"""
self.retry_per_wrapper = try_per_wrapper
self.retry_delay = retry_delay
def try_call(self, func: Callable[[WrapperType], OutputType]) -> OutputType:
"""
Attempts to call the provided function on the current wrapper.

232
src/app/configs.py Normal file
View File

@@ -0,0 +1,232 @@
import os
import threading
import ollama
import yaml
import logging.config
import agno.utils.log # type: ignore
from typing import Any
from pydantic import BaseModel
from agno.agent import Agent
from agno.tools import Toolkit
from agno.models.base import Model
from agno.models.google import Gemini
from agno.models.ollama import Ollama
log = logging.getLogger(__name__)
class AppModel(BaseModel):
name: str = "gemini-2.0-flash"
label: str = "Gemini"
model: type[Model] | None = None
def get_model(self, instructions: str) -> Model:
"""
Restituisce un'istanza del modello specificato.
Args:
instructions: istruzioni da passare al modello (system prompt).
Returns:
Un'istanza di BaseModel o una sua sottoclasse.
Raise:
ValueError se il modello non è supportato.
"""
if self.model is None:
raise ValueError(f"Model class for '{self.name}' is not set.")
return self.model(id=self.name, instructions=[instructions])
def get_agent(self, instructions: str, name: str = "", output_schema: type[BaseModel] | None = None, tools: list[Toolkit] | None = None) -> Agent:
"""
Costruisce un agente con il modello e le istruzioni specificate.
Args:
instructions: istruzioni da passare al modello (system prompt)
name: nome dell'agente (opzionale)
output: schema di output opzionale (Pydantic BaseModel)
tools: lista opzionale di strumenti (tools) da fornire all'agente
Returns:
Un'istanza di Agent.
"""
return Agent(
model=self.get_model(instructions),
name=name,
retries=2,
tools=tools,
delay_between_retries=5, # seconds
output_schema=output_schema
)
class APIConfig(BaseModel):
retry_attempts: int = 3
retry_delay_seconds: int = 2
currency: str = "USD"
class Strategy(BaseModel):
name: str = "Conservative"
label: str = "Conservative"
description: str = "Focus on low-risk investments with steady returns."
class ModelsConfig(BaseModel):
gemini: list[AppModel] = [AppModel()]
ollama: list[AppModel] = []
@property
def all_models(self) -> list[AppModel]:
return self.gemini + self.ollama
class AgentsConfigs(BaseModel):
strategy: str = "Conservative"
team_model: str = "gemini-2.0-flash"
team_leader_model: str = "gemini-2.0-flash"
predictor_model: str = "gemini-2.0-flash"
class AppConfig(BaseModel):
port: int = 8000
gradio_share: bool = False
logging_level: str = "INFO"
api: APIConfig = APIConfig()
strategies: list[Strategy] = [Strategy()]
models: ModelsConfig = ModelsConfig()
agents: AgentsConfigs = AgentsConfigs()
__lock = threading.Lock()
@classmethod
def load(cls, file_path: str = "configs.yaml") -> 'AppConfig':
"""
Load the application configuration from a YAML file.
Be sure to call load_dotenv() before if you use environment variables.
Args:
file_path: path to the YAML configuration file.
Returns:
An instance of AppConfig with the loaded settings.
"""
with open(file_path, 'r') as f:
data = yaml.safe_load(f)
configs = cls(**data)
configs.set_logging_level()
configs.validate_models()
log.info(f"Loaded configuration from {file_path}")
return configs
def __new__(cls, *args: Any, **kwargs: Any) -> 'AppConfig':
with cls.__lock:
if not hasattr(cls, 'instance'):
cls.instance = super(AppConfig, cls).__new__(cls)
return cls.instance
def get_model_by_name(self, name: str) -> AppModel:
"""
Retrieve a model configuration by its name.
Args:
name: the name of the model to retrieve.
Returns:
The AppModel instance if found.
Raises:
ValueError if no model with the specified name is found.
"""
for model in self.models.all_models:
if model.name == name:
return model
raise ValueError(f"Model with name '{name}' not found.")
def get_strategy_by_name(self, name: str) -> Strategy:
"""
Retrieve a strategy configuration by its name.
Args:
name: the name of the strategy to retrieve.
Returns:
The Strategy instance if found.
Raises:
ValueError if no strategy with the specified name is found.
"""
for strat in self.strategies:
if strat.name == name:
return strat
raise ValueError(f"Strategy with name '{name}' not found.")
def set_logging_level(self) -> None:
"""
Set the logging level based on the configuration.
"""
logging.config.dictConfig({
'version': 1,
'disable_existing_loggers': False, # Keep existing loggers (e.g. third-party loggers)
'formatters': {
'colored': {
'()': 'colorlog.ColoredFormatter',
'format': '%(log_color)s%(levelname)s%(reset)s [%(asctime)s] (%(name)s) - %(message)s'
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'colored',
'level': self.logging_level,
},
},
'root': { # Configure the root logger
'handlers': ['console'],
'level': self.logging_level,
},
'loggers': {
'httpx': {'level': 'WARNING'}, # Too much spam for INFO
}
})
# Modify the agno loggers
agno_logger_names = ["agno", "agno-team", "agno-workflow"]
for logger_name in agno_logger_names:
logger = logging.getLogger(logger_name)
logger.handlers.clear()
logger.propagate = True
def validate_models(self) -> None:
"""
Validate the configured models for each provider.
"""
self.__validate_online_models("gemini", clazz=Gemini, key="GOOGLE_API_KEY")
self.__validate_ollama_models()
def __validate_online_models(self, provider: str, clazz: type[Model], key: str | None = None) -> None:
"""
Validate models for online providers like Gemini.
Args:
provider: name of the provider (e.g. "gemini")
clazz: class of the model (e.g. Gemini)
key: API key required for the provider (optional)
"""
if getattr(self.models, provider) is None:
log.warning(f"No models configured for provider '{provider}'.")
models: list[AppModel] = getattr(self.models, provider)
if key and os.getenv(key) is None:
log.warning(f"No {key} set in environment variables for {provider}.")
models.clear()
return
for model in models:
model.model = clazz
def __validate_ollama_models(self) -> None:
"""
Validate models for the Ollama provider.
"""
try:
models_list = ollama.list()
availables = {model['model'] for model in models_list['models']}
not_availables: list[str] = []
for model in self.models.ollama:
if model.name in availables:
model.model = Ollama
else:
not_availables.append(model.name)
if not_availables:
log.warning(f"Ollama models not available: {not_availables}")
self.models.ollama = [model for model in self.models.ollama if model.model]
except Exception as e:
log.warning(f"Ollama is not running or not reachable: {e}")

View File

@@ -0,0 +1,3 @@
from app.interface.chat import ChatManager
__all__ = ["ChatManager"]

View File

@@ -1,3 +0,0 @@
from app.utils.chat_manager import ChatManager
__all__ = ["ChatManager"]

View File

@@ -1,56 +0,0 @@
import pytest
from app.agents import AppModels
from app.agents.predictor import PREDICTOR_INSTRUCTIONS, PredictorInput, PredictorOutput, PredictorStyle
from app.api.base.markets import ProductInfo
def unified_checks(model: AppModels, input: PredictorInput) -> None:
llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output_schema=PredictorOutput) # type: ignore[arg-type]
result = llm.run(input) # type: ignore
content = result.content
assert isinstance(content, PredictorOutput)
assert content.strategy not in (None, "", "null")
assert isinstance(content.strategy, str)
assert isinstance(content.portfolio, list)
assert len(content.portfolio) > 0
for item in content.portfolio:
assert item.asset not in (None, "", "null")
assert isinstance(item.asset, str)
assert item.percentage >= 0.0
assert item.percentage <= 100.0
assert isinstance(item.percentage, (int, float))
assert item.motivation not in (None, "", "null")
assert isinstance(item.motivation, str)
# La somma delle percentuali deve essere esattamente 100
total_percentage = sum(item.percentage for item in content.portfolio)
assert abs(total_percentage - 100) < 0.01 # Permette una piccola tolleranza per errori di arrotondamento
class TestPredictor:
def inputs(self) -> PredictorInput:
data: list[ProductInfo] = []
for symbol, price in [("BTC", 60000.00), ("ETH", 3500.00), ("SOL", 150.00)]:
product_info = ProductInfo()
product_info.symbol = symbol
product_info.price = price
data.append(product_info)
return PredictorInput(data=data, style=PredictorStyle.AGGRESSIVE, sentiment="positivo")
def test_gemini_model_output(self):
inputs = self.inputs()
unified_checks(AppModels.GEMINI, inputs)
def test_ollama_qwen_4b_model_output(self):
inputs = self.inputs()
unified_checks(AppModels.OLLAMA_QWEN_4B, inputs)
@pytest.mark.slow
def test_ollama_qwen_latest_model_output(self):
inputs = self.inputs()
unified_checks(AppModels.OLLAMA_QWEN, inputs)
@pytest.mark.slow
def test_ollama_gpt_oss_model_output(self):
inputs = self.inputs()
unified_checks(AppModels.OLLAMA_GPT, inputs)

View File

@@ -1,5 +1,5 @@
import pytest
from app.api.markets import MarketAPIsTool
from app.api.tools import MarketAPIsTool
@pytest.mark.tools

View File

@@ -1,5 +1,5 @@
import pytest
from app.api.news import NewsAPIsTool
from app.api.tools import NewsAPIsTool
@pytest.mark.tools

View File

@@ -1,5 +1,5 @@
import pytest
from app.api.social import SocialAPIsTool
from app.api.tools import SocialAPIsTool
@pytest.mark.tools

View File

@@ -1,6 +1,6 @@
import pytest
from datetime import datetime
from app.api.base.markets import ProductInfo, Price
from app.api.core.markets import ProductInfo, Price
@pytest.mark.aggregator

14
uv.lock generated
View File

@@ -285,6 +285,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "colorlog"
version = "6.9.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d3/7a/359f4d5df2353f26172b3cc39ea32daa39af8de522205f512f458923e677/colorlog-6.9.0.tar.gz", hash = "sha256:bfba54a1b93b94f54e1f4fe48395725a3d92fd2a4af702f6bd70946bdc0c6ac2", size = 16624, upload-time = "2024-10-29T18:34:51.011Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e3/51/9b208e85196941db2f0654ad0357ca6388ab3ed67efdbfc799f35d1f83aa/colorlog-6.9.0-py3-none-any.whl", hash = "sha256:5906e71acd67cb07a71e779c47c4bcb45fb8c2993eebe9e5adcd6a6f1b283eff", size = 11424, upload-time = "2024-10-29T18:34:49.815Z" },
]
[[package]]
name = "cryptography"
version = "46.0.2"
@@ -1604,6 +1616,7 @@ source = { virtual = "." }
dependencies = [
{ name = "agno" },
{ name = "coinbase-advanced-py" },
{ name = "colorlog" },
{ name = "ddgs" },
{ name = "dotenv" },
{ name = "gnews" },
@@ -1621,6 +1634,7 @@ dependencies = [
requires-dist = [
{ name = "agno" },
{ name = "coinbase-advanced-py" },
{ name = "colorlog" },
{ name = "ddgs" },
{ name = "dotenv" },
{ name = "gnews" },