Configurazioni dell'app #27
@@ -17,6 +17,7 @@ COPY pyproject.toml ./
|
||||
COPY uv.lock ./
|
||||
COPY LICENSE ./
|
||||
COPY src/ ./src/
|
||||
COPY configs.yaml ./
|
||||
|
||||
# Creiamo l'ambiente virtuale con tutto già presente
|
||||
RUN uv sync
|
||||
|
||||
26
README.md
26
README.md
@@ -21,7 +21,7 @@ L'obiettivo è quello di creare un sistema di consulenza finanziaria basato su L
|
||||
|
||||
L'installazione di questo progetto richiede 3 passaggi totali (+1 se si vuole sviluppare in locale) che devono essere eseguiti in sequenza. Se questi passaggi sono eseguiti correttamente, l'applicazione dovrebbe partire senza problemi. Altrimenti è molto probabile che si verifichino errori di vario tipo (moduli mancanti, chiavi API non trovate, ecc.).
|
||||
|
||||
1. Configurare le variabili d'ambiente
|
||||
1. Configurazioni dell'app e delle variabili d'ambiente
|
||||
2. Installare Ollama e i modelli locali
|
||||
3. Far partire il progetto con Docker (consigliato)
|
||||
4. (Solo per sviluppo locale) Installare uv e creare l'ambiente virtuale
|
||||
@@ -29,9 +29,12 @@ L'installazione di questo progetto richiede 3 passaggi totali (+1 se si vuole sv
|
||||
> [!IMPORTANT]\
|
||||
> Prima di iniziare, assicurarsi di avere clonato il repository e di essere nella cartella principale del progetto.
|
||||
|
||||
### **1. Variabili d'Ambiente**
|
||||
### **1. Configurazioni**
|
||||
|
||||
Copia il file `.env.example` in `.env` e successivamente modificalo con le tue API keys:
|
||||
Ci sono due file di configurazione principali che l'app utilizza: `config.yaml` e `.env`.\
|
||||
Il primo contiene le configurazioni generali dell'applicazione e può essere modificato a piacimento, mentre il secondo è utilizzato per le variabili d'ambiente.
|
||||
|
||||
Per il secondo, bisogna copiare il file `.env.example` in `.env` e successivamente modificalo con le tue API keys:
|
||||
```sh
|
||||
cp .env.example .env
|
||||
nano .env # esempio di modifica del file
|
||||
@@ -49,11 +52,8 @@ Per l'installazione scaricare Ollama dal loro [sito ufficiale](https://ollama.co
|
||||
|
||||
Dopo l'installazione, si possono iniziare a scaricare i modelli desiderati tramite il comando `ollama pull <model>:<tag>`.
|
||||
|
||||
I modelli usati dall'applicazione sono visibili in [src/app/models.py](src/app/models.py). Di seguito metto lo stesso una lista di modelli, ma potrebbe non essere aggiornata:
|
||||
- `gpt-oss:latest`
|
||||
- `qwen3:latest`
|
||||
- `qwen3:4b`
|
||||
- `qwen3:1.7b`
|
||||
I modelli usati dall'applicazione sono quelli specificati nel file [config.yaml](config.yaml) alla voce `model`. Se in locale si hanno dei modelli diversi, è possibile modificare questa voce per usare quelli disponibili.
|
||||
I modelli consigliati per questo progetto sono `qwen3:4b` e `qwen3:1.7b`.
|
||||
|
||||
### **3. Docker**
|
||||
Se si vuole solamente avviare il progetto, si consiglia di utilizzare [Docker](https://www.docker.com), dato che sono stati creati i files [Dockerfile](Dockerfile) e [docker-compose.yaml](docker-compose.yaml) per creare il container con tutti i file necessari e già in esecuzione.
|
||||
@@ -105,13 +105,15 @@ Usando la libreria ``gradio`` è stata creata un'interfaccia web semplice per in
|
||||
src
|
||||
└── app
|
||||
├── __main__.py
|
||||
├── agents <-- Agenti, modelli, prompts e simili
|
||||
├── config.py <-- Configurazioni app
|
||||
├── agents <-- Agenti, Team, prompts e simili
|
||||
├── api <-- Tutte le API esterne
|
||||
│ ├── base <-- Classi base per le API
|
||||
│ ├── core <-- Classi core per le API
|
||||
│ ├── markets <-- Market data provider (Es. Binance)
|
||||
│ ├── news <-- News data provider (Es. NewsAPI)
|
||||
│ └── social <-- Social data provider (Es. Reddit)
|
||||
└── utils <-- Codice di utilità generale
|
||||
│ ├── social <-- Social data provider (Es. Reddit)
|
||||
│ └── tools <-- Tools per agenti creati dalle API
|
||||
└── interface <-- Interfacce utente
|
||||
```
|
||||
|
||||
## Tests
|
||||
|
||||
45
configs.yaml
Normal file
45
configs.yaml
Normal file
@@ -0,0 +1,45 @@
|
||||
port: 8000
|
||||
gradio_share: false
|
||||
logging_level: INFO
|
||||
|
||||
strategies:
|
||||
- name: Conservative
|
||||
label: Conservative
|
||||
description: Focus on stable and low-risk investments.
|
||||
- name: Balanced
|
||||
label: Balanced
|
||||
description: A mix of growth and stability.
|
||||
- name: Aggressive
|
||||
label: Aggressive
|
||||
description: High-risk, high-reward investments.
|
||||
|
||||
models:
|
||||
gemini:
|
||||
- name: gemini-2.0-flash
|
||||
label: Gemini
|
||||
- name: gemini-2.0-pro
|
||||
label: Gemini Pro
|
||||
ollama:
|
||||
- name: gpt-oss:latest
|
||||
label: Ollama GPT
|
||||
- name: qwen3:8b
|
||||
label: Qwen 3 (8B)
|
||||
- name: qwen3:4b
|
||||
label: Qwen 3 (4B)
|
||||
- name: qwen3:1.7b
|
||||
label: Qwen 3 (1.7B)
|
||||
|
||||
api:
|
||||
retry_attempts: 3
|
||||
retry_delay_seconds: 2
|
||||
currency: EUR
|
||||
# TODO Magari implementare un sistema per settare i providers
|
||||
market_providers: [BinanceWrapper, YFinanceWrapper]
|
||||
news_providers: [GoogleNewsWrapper, DuckDuckGoWrapper]
|
||||
social_providers: [RedditWrapper]
|
||||
|
||||
agents:
|
||||
strategy: Conservative
|
||||
team_model: qwen3:1.7b
|
||||
team_leader_model: qwen3:4b
|
||||
predictor_model: qwen3:4b
|
||||
@@ -27,7 +27,7 @@ project_root = Path(__file__).parent.parent
|
||||
sys.path.insert(0, str(project_root / "src"))
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from app.markets import (
|
||||
from app.api.markets import (
|
||||
CoinBaseWrapper,
|
||||
CryptoCompareWrapper,
|
||||
BinanceWrapper,
|
||||
|
||||
@@ -5,7 +5,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../src'
|
||||
###########################################
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from app.news import NewsApiWrapper
|
||||
from app.api.news import NewsApiWrapper
|
||||
|
||||
def main():
|
||||
api = NewsApiWrapper()
|
||||
|
||||
@@ -13,6 +13,7 @@ dependencies = [
|
||||
"pytest", # Test
|
||||
"dotenv", # Gestire variabili d'ambiente (generalmente API keys od opzioni)
|
||||
"gradio", # UI web semplice con user_input e output
|
||||
"colorlog", # Log colorati in console
|
||||
|
||||
# Per costruire agenti (ovvero modelli che possono fare più cose tramite tool) https://github.com/agno-agi/agno
|
||||
# altamente consigliata dato che ha anche tools integrati per fare scraping, calcoli e molto altro
|
||||
|
||||
@@ -1,14 +1,19 @@
|
||||
import asyncio
|
||||
import gradio as gr
|
||||
from dotenv import load_dotenv
|
||||
from agno.utils.log import log_info #type: ignore
|
||||
from app.utils import ChatManager
|
||||
from app.configs import AppConfig
|
||||
from app.interface import ChatManager
|
||||
from app.agents import Pipeline
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Inizializzazioni
|
||||
load_dotenv()
|
||||
pipeline = Pipeline()
|
||||
|
||||
configs = AppConfig.load()
|
||||
pipeline = Pipeline(configs)
|
||||
|
||||
chat = ChatManager()
|
||||
|
||||
########################################
|
||||
@@ -57,7 +62,7 @@ if __name__ == "__main__":
|
||||
type="index",
|
||||
label="Stile di investimento"
|
||||
)
|
||||
style.change(fn=pipeline.choose_style, inputs=style, outputs=None)
|
||||
style.change(fn=pipeline.choose_strategy, inputs=style, outputs=None)
|
||||
|
||||
chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages")
|
||||
msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?")
|
||||
@@ -73,7 +78,9 @@ if __name__ == "__main__":
|
||||
save_btn.click(save_current_chat, inputs=None, outputs=None)
|
||||
load_btn.click(load_previous_chat, inputs=None, outputs=[chatbot, chatbot])
|
||||
|
||||
server, port = ("0.0.0.0", 8000) # 0.0.0.0 per accesso esterno (Docker)
|
||||
server_log = "localhost" if server == "0.0.0.0" else server
|
||||
log_info(f"Starting UPO AppAI Chat on http://{server_log}:{port}") # noqa
|
||||
demo.launch(server_name=server, server_port=port, quiet=True)
|
||||
try:
|
||||
_app, local, shared = demo.launch(server_name="0.0.0.0", server_port=configs.port, quiet=True, prevent_thread_lock=True, share=configs.gradio_share)
|
||||
log_info(f"Starting UPO AppAI Chat on {shared or local}")
|
||||
asyncio.get_event_loop().run_forever()
|
||||
except KeyboardInterrupt:
|
||||
demo.close()
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from app.agents.models import AppModels
|
||||
from app.agents.predictor import PredictorInput, PredictorOutput, PredictorStyle, PREDICTOR_INSTRUCTIONS
|
||||
from app.agents.predictor import PredictorInput, PredictorOutput
|
||||
from app.agents.team import create_team_with
|
||||
from app.agents.pipeline import Pipeline
|
||||
|
||||
__all__ = ["AppModels", "PredictorInput", "PredictorOutput", "PredictorStyle", "PREDICTOR_INSTRUCTIONS", "create_team_with", "Pipeline"]
|
||||
__all__ = ["PredictorInput", "PredictorOutput", "create_team_with", "Pipeline"]
|
||||
|
||||
@@ -1,107 +0,0 @@
|
||||
import os
|
||||
import ollama
|
||||
from enum import Enum
|
||||
from agno.agent import Agent
|
||||
from agno.models.base import Model
|
||||
from agno.models.google import Gemini
|
||||
from agno.models.ollama import Ollama
|
||||
from agno.tools import Toolkit
|
||||
from agno.utils.log import log_warning #type: ignore
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class AppModels(Enum):
|
||||
"""
|
||||
Enum per i modelli supportati.
|
||||
Aggiungere nuovi modelli qui se necessario.
|
||||
Per quanto riguarda Ollama, i modelli dovranno essere scaricati e installati
|
||||
localmente seguendo le istruzioni di https://ollama.com/docs/guide/install-models
|
||||
"""
|
||||
GEMINI = "gemini-2.0-flash" # API online
|
||||
GEMINI_PRO = "gemini-2.0-pro" # API online, più costoso ma migliore
|
||||
OLLAMA_GPT = "gpt-oss:latest" # + good - slow (13b)
|
||||
OLLAMA_QWEN = "qwen3:latest" # + good + fast (8b)
|
||||
OLLAMA_QWEN_4B = "qwen3:4b" # + fast + decent (4b)
|
||||
OLLAMA_QWEN_1B = "qwen3:1.7b" # + very fast + decent (1.7b)
|
||||
|
||||
@staticmethod
|
||||
def availables_local() -> list['AppModels']:
|
||||
"""
|
||||
Controlla quali provider di modelli LLM locali sono disponibili.
|
||||
Ritorna una lista di provider disponibili.
|
||||
"""
|
||||
try:
|
||||
models_list = ollama.list()
|
||||
availables = [model['model'] for model in models_list['models']]
|
||||
app_models = [model for model in AppModels if model.name.startswith("OLLAMA")]
|
||||
return [model for model in app_models if model.value in availables]
|
||||
except Exception as e:
|
||||
log_warning(f"Ollama is not running or not reachable: {e}")
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def availables_online() -> list['AppModels']:
|
||||
"""
|
||||
Controlla quali provider di modelli LLM online hanno le loro API keys disponibili
|
||||
come variabili d'ambiente e ritorna una lista di provider disponibili.
|
||||
"""
|
||||
if not os.getenv("GOOGLE_API_KEY"):
|
||||
log_warning("No GOOGLE_API_KEY set in environment variables.")
|
||||
return []
|
||||
availables = [AppModels.GEMINI, AppModels.GEMINI_PRO]
|
||||
return availables
|
||||
|
||||
@staticmethod
|
||||
def availables() -> list['AppModels']:
|
||||
"""
|
||||
Controlla quali provider di modelli LLM locali sono disponibili e quali
|
||||
provider di modelli LLM online hanno le loro API keys disponibili come variabili
|
||||
d'ambiente e ritorna una lista di provider disponibili.
|
||||
L'ordine di preferenza è:
|
||||
1. Gemini (Google)
|
||||
2. Ollama (locale)
|
||||
"""
|
||||
availables = [
|
||||
*AppModels.availables_online(),
|
||||
*AppModels.availables_local()
|
||||
]
|
||||
assert availables, "No valid model API keys set in environment variables."
|
||||
return availables
|
||||
|
||||
def get_model(self, instructions:str) -> Model:
|
||||
"""
|
||||
Restituisce un'istanza del modello specificato.
|
||||
Args:
|
||||
instructions: istruzioni da passare al modello (system prompt).
|
||||
Returns:
|
||||
Un'istanza di BaseModel o una sua sottoclasse.
|
||||
Raise:
|
||||
ValueError se il modello non è supportato.
|
||||
"""
|
||||
name = self.value
|
||||
if self in {model for model in AppModels if model.name.startswith("GEMINI")}:
|
||||
return Gemini(name, instructions=[instructions])
|
||||
elif self in {model for model in AppModels if model.name.startswith("OLLAMA")}:
|
||||
return Ollama(name, instructions=[instructions])
|
||||
|
||||
raise ValueError(f"Modello non supportato: {self}")
|
||||
|
||||
def get_agent(self, instructions: str, name: str = "", output_schema: type[BaseModel] | None = None, tools: list[Toolkit] | None = None) -> Agent:
|
||||
"""
|
||||
Costruisce un agente con il modello e le istruzioni specificate.
|
||||
Args:
|
||||
instructions: istruzioni da passare al modello (system prompt)
|
||||
name: nome dell'agente (opzionale)
|
||||
output: schema di output opzionale (Pydantic BaseModel)
|
||||
tools: lista opzionale di strumenti (tools) da fornire all'agente
|
||||
Returns:
|
||||
Un'istanza di Agent.
|
||||
"""
|
||||
return Agent(
|
||||
model=self.get_model(instructions),
|
||||
name=name,
|
||||
retries=2,
|
||||
tools=tools,
|
||||
delay_between_retries=5, # seconds
|
||||
output_schema=output_schema
|
||||
)
|
||||
@@ -1,8 +1,9 @@
|
||||
from agno.run.agent import RunOutput
|
||||
from app.agents.models import AppModels
|
||||
from app.agents.team import create_team_with
|
||||
from app.agents.predictor import PREDICTOR_INSTRUCTIONS, PredictorInput, PredictorOutput, PredictorStyle
|
||||
from app.api.base.markets import ProductInfo
|
||||
from app.agents.predictor import PredictorInput, PredictorOutput
|
||||
from app.agents.prompts import *
|
||||
from app.api.core.markets import ProductInfo
|
||||
from app.configs import AppConfig
|
||||
|
||||
|
||||
class Pipeline:
|
||||
@@ -12,13 +13,12 @@ class Pipeline:
|
||||
e scelto dall'utente tramite i dropdown dell'interfaccia grafica.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.available_models = AppModels.availables()
|
||||
self.all_styles = list(PredictorStyle)
|
||||
def __init__(self, configs: AppConfig):
|
||||
self.configs = configs
|
||||
|
||||
self.style = self.all_styles[0]
|
||||
self.team = create_team_with(AppModels.OLLAMA_QWEN_1B)
|
||||
self.choose_predictor(0) # Modello di default
|
||||
# Stato iniziale
|
||||
self.choose_strategy(0)
|
||||
self.choose_predictor(0)
|
||||
|
||||
# ======================
|
||||
# Dropdown handlers
|
||||
@@ -27,17 +27,17 @@ class Pipeline:
|
||||
"""
|
||||
Sceglie il modello LLM da usare per il Predictor.
|
||||
"""
|
||||
model = self.available_models[index]
|
||||
model = self.configs.models.all_models[index]
|
||||
self.predictor = model.get_agent(
|
||||
PREDICTOR_INSTRUCTIONS,
|
||||
output_schema=PredictorOutput,
|
||||
)
|
||||
|
||||
def choose_style(self, index: int):
|
||||
def choose_strategy(self, index: int):
|
||||
"""
|
||||
Sceglie lo stile (conservativo/aggressivo) da usare per il Predictor.
|
||||
Sceglie la strategia da usare per il Predictor.
|
||||
"""
|
||||
self.style = self.all_styles[index]
|
||||
self.strat = self.configs.strategies[index].description
|
||||
|
||||
# ======================
|
||||
# Helpers
|
||||
@@ -46,13 +46,13 @@ class Pipeline:
|
||||
"""
|
||||
Restituisce la lista dei nomi dei modelli disponibili.
|
||||
"""
|
||||
return [model.name for model in self.available_models]
|
||||
return [model.label for model in self.configs.models.all_models]
|
||||
|
||||
def list_styles(self) -> list[str]:
|
||||
"""
|
||||
Restituisce la lista degli stili di previsione disponibili.
|
||||
"""
|
||||
return [style.value for style in self.all_styles]
|
||||
return [strat.label for strat in self.configs.strategies]
|
||||
|
||||
# ======================
|
||||
# Core interaction
|
||||
@@ -65,7 +65,11 @@ class Pipeline:
|
||||
4. Restituisce la strategia finale
|
||||
"""
|
||||
# Step 1: raccolta output dai membri del Team
|
||||
team_outputs = self.team.run(query) # type: ignore
|
||||
team_model = self.configs.get_model_by_name(self.configs.agents.team_model)
|
||||
leader_model = self.configs.get_model_by_name(self.configs.agents.team_leader_model)
|
||||
|
||||
team = create_team_with(self.configs, team_model, leader_model)
|
||||
team_outputs = team.run(query) # type: ignore
|
||||
|
||||
# Step 2: aggregazione output strutturati
|
||||
all_products: list[ProductInfo] = []
|
||||
@@ -86,7 +90,7 @@ class Pipeline:
|
||||
# Step 3: invocazione Predictor
|
||||
predictor_input = PredictorInput(
|
||||
data=all_products,
|
||||
style=self.style,
|
||||
style=self.strat,
|
||||
sentiment=aggregated_sentiment
|
||||
)
|
||||
|
||||
@@ -100,6 +104,6 @@ class Pipeline:
|
||||
[f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio]
|
||||
)
|
||||
return (
|
||||
f"📊 Strategia ({self.style.value}): {prediction.strategy}\n\n"
|
||||
f"📊 Strategia ({self.strat}): {prediction.strategy}\n\n"
|
||||
f"💼 Portafoglio consigliato:\n{portfolio_lines}"
|
||||
)
|
||||
|
||||
@@ -1,15 +1,9 @@
|
||||
from enum import Enum
|
||||
from pydantic import BaseModel, Field
|
||||
from app.api.base.markets import ProductInfo
|
||||
|
||||
|
||||
class PredictorStyle(Enum):
|
||||
CONSERVATIVE = "Conservativo"
|
||||
AGGRESSIVE = "Aggressivo"
|
||||
from app.api.core.markets import ProductInfo
|
||||
|
||||
class PredictorInput(BaseModel):
|
||||
data: list[ProductInfo] = Field(..., description="Market data as a list of ProductInfo")
|
||||
style: PredictorStyle = Field(..., description="Prediction style")
|
||||
style: str = Field(..., description="Prediction style")
|
||||
sentiment: str = Field(..., description="Aggregated sentiment from news and social analysis")
|
||||
|
||||
class ItemPortfolio(BaseModel):
|
||||
@@ -20,34 +14,3 @@ class ItemPortfolio(BaseModel):
|
||||
class PredictorOutput(BaseModel):
|
||||
strategy: str = Field(..., description="Concise operational strategy in Italian")
|
||||
portfolio: list[ItemPortfolio] = Field(..., description="List of portfolio items with allocations")
|
||||
|
||||
|
||||
PREDICTOR_INSTRUCTIONS = """
|
||||
You are an **Allocation Algorithm (Crypto-Algo)** specialized in analyzing market data and sentiment to generate an investment strategy and a target portfolio.
|
||||
|
||||
Your sole objective is to process the user_input data and generate the strictly structured output as required by the response format. **You MUST NOT provide introductions, preambles, explanations, conclusions, or any additional comments that are not strictly required.**
|
||||
|
||||
## Processing Instructions (Absolute Rule)
|
||||
|
||||
The allocation strategy must be **derived exclusively from the "Allocation Logic" corresponding to the requested *style*** and the provided market/sentiment data. **DO NOT** use external or historical knowledge.
|
||||
|
||||
## Allocation Logic
|
||||
|
||||
### "Aggressivo" Style (Aggressive)
|
||||
* **Priority:** Maximizing return (high volatility accepted).
|
||||
* **Focus:** Higher allocation to **non-BTC/ETH assets** with high momentum potential (Altcoins, mid/low-cap assets).
|
||||
* **BTC/ETH:** Must serve as a base (anchor), but their allocation **must not exceed 50%** of the total portfolio.
|
||||
* **Sentiment:** Use positive sentiment to increase exposure to high-risk assets.
|
||||
|
||||
### "Conservativo" Style (Conservative)
|
||||
* **Priority:** Capital preservation (volatility minimized).
|
||||
* **Focus:** Major allocation to **BTC and/or ETH (Large-Cap Assets)**.
|
||||
* **BTC/ETH:** Their allocation **must be at least 70%** of the total portfolio.
|
||||
* **Altcoins:** Any allocations to non-BTC/ETH assets must be minimal (max 30% combined) and for assets that minimize speculative risk.
|
||||
* **Sentiment:** Use positive sentiment only as confirmation for exposure, avoiding reactions to excessive "FOMO" signals.
|
||||
|
||||
## Output Requirements (Content MUST be in Italian)
|
||||
|
||||
1. **Strategy (strategy):** Must be a concise operational description **in Italian ("in Italiano")**, with a maximum of 5 sentences.
|
||||
2. **Portfolio (portfolio):** The sum of all percentages must be **exactly 100%**. The justification (motivation) for each asset must be a single clear sentence **in Italian ("in Italiano")**.
|
||||
"""
|
||||
21
src/app/agents/prompts/__init__.py
Normal file
21
src/app/agents/prompts/__init__.py
Normal file
@@ -0,0 +1,21 @@
|
||||
from pathlib import Path
|
||||
|
||||
__PROMPTS_PATH = Path(__file__).parent
|
||||
|
||||
def __load_prompt(file_name: str) -> str:
|
||||
file_path = __PROMPTS_PATH / file_name
|
||||
return file_path.read_text(encoding='utf-8').strip()
|
||||
|
||||
COORDINATOR_INSTRUCTIONS = __load_prompt("team_leader.txt")
|
||||
MARKET_INSTRUCTIONS = __load_prompt("team_market.txt")
|
||||
NEWS_INSTRUCTIONS = __load_prompt("team_news.txt")
|
||||
SOCIAL_INSTRUCTIONS = __load_prompt("team_social.txt")
|
||||
PREDICTOR_INSTRUCTIONS = __load_prompt("predictor.txt")
|
||||
|
||||
__all__ = [
|
||||
"COORDINATOR_INSTRUCTIONS",
|
||||
"MARKET_INSTRUCTIONS",
|
||||
"NEWS_INSTRUCTIONS",
|
||||
"SOCIAL_INSTRUCTIONS",
|
||||
"PREDICTOR_INSTRUCTIONS",
|
||||
]
|
||||
27
src/app/agents/prompts/predictor.txt
Normal file
27
src/app/agents/prompts/predictor.txt
Normal file
@@ -0,0 +1,27 @@
|
||||
You are an **Allocation Algorithm (Crypto-Algo)** specialized in analyzing market data and sentiment to generate an investment strategy and a target portfolio.
|
||||
|
||||
Your sole objective is to process the user_input data and generate the strictly structured output as required by the response format. **You MUST NOT provide introductions, preambles, explanations, conclusions, or any additional comments that are not strictly required.**
|
||||
|
||||
## Processing Instructions (Absolute Rule)
|
||||
|
||||
The allocation strategy must be **derived exclusively from the "Allocation Logic" corresponding to the requested *style*** and the provided market/sentiment data. **DO NOT** use external or historical knowledge.
|
||||
|
||||
## Allocation Logic
|
||||
|
||||
### "Aggressivo" Style (Aggressive)
|
||||
* **Priority:** Maximizing return (high volatility accepted).
|
||||
* **Focus:** Higher allocation to **non-BTC/ETH assets** with high momentum potential (Altcoins, mid/low-cap assets).
|
||||
* **BTC/ETH:** Must serve as a base (anchor), but their allocation **must not exceed 50%** of the total portfolio.
|
||||
* **Sentiment:** Use positive sentiment to increase exposure to high-risk assets.
|
||||
|
||||
### "Conservativo" Style (Conservative)
|
||||
* **Priority:** Capital preservation (volatility minimized).
|
||||
* **Focus:** Major allocation to **BTC and/or ETH (Large-Cap Assets)**.
|
||||
* **BTC/ETH:** Their allocation **must be at least 70%** of the total portfolio.
|
||||
* **Altcoins:** Any allocations to non-BTC/ETH assets must be minimal (max 30% combined) and for assets that minimize speculative risk.
|
||||
* **Sentiment:** Use positive sentiment only as confirmation for exposure, avoiding reactions to excessive "FOMO" signals.
|
||||
|
||||
## Output Requirements (Content MUST be in Italian)
|
||||
|
||||
1. **Strategy (strategy):** Must be a concise operational description **in Italian ("in Italiano")**, with a maximum of 5 sentences.
|
||||
2. **Portfolio (portfolio):** The sum of all percentages must be **exactly 100%**. The justification (motivation) for each asset must be a single clear sentence **in Italian ("in Italiano")**.
|
||||
15
src/app/agents/prompts/team_leader.txt
Normal file
15
src/app/agents/prompts/team_leader.txt
Normal file
@@ -0,0 +1,15 @@
|
||||
You are the expert coordinator of a financial analysis team specializing in cryptocurrencies.
|
||||
|
||||
Your team consists of three agents:
|
||||
- **MarketAgent**: Provides quantitative market data, price analysis, and technical indicators.
|
||||
- **NewsAgent**: Scans and analyzes the latest news, articles, and official announcements.
|
||||
- **SocialAgent**: Gauges public sentiment, trends, and discussions on social media.
|
||||
|
||||
Your primary objective is to answer the user's query by orchestrating the work of your team members.
|
||||
|
||||
Your workflow is as follows:
|
||||
1. **Deconstruct the user's query** to identify the required information.
|
||||
2. **Delegate specific tasks** to the most appropriate agent(s) to gather the necessary data and initial analysis.
|
||||
3. **Analyze the information** returned by the agents.
|
||||
4. If the initial data is insufficient or the query is complex, **iteratively re-engage the agents** with follow-up questions to build a comprehensive picture.
|
||||
5. **Synthesize all the gathered information** into a final, coherent, and complete analysis that fills all the required output fields.
|
||||
19
src/app/agents/prompts/team_market.txt
Normal file
19
src/app/agents/prompts/team_market.txt
Normal file
@@ -0,0 +1,19 @@
|
||||
**TASK:** You are a specialized **Crypto Price Data Retrieval Agent**. Your primary goal is to fetch the most recent and/or historical price data for requested cryptocurrency assets (e.g., 'BTC', 'ETH', 'SOL'). You must provide the data in a clear and structured format.
|
||||
|
||||
**AVAILABLE TOOLS:**
|
||||
1. `get_products(asset_ids: list[str])`: Get **current** product/price info for a list of assets. **(PREFERITA: usa questa per i prezzi live)**
|
||||
2. `get_historical_prices(asset_id: str, limit: int)`: Get historical price data for one asset. Default limit is 100. **(PREFERITA: usa questa per i dati storici)**
|
||||
3. `get_products_aggregated(asset_ids: list[str])`: Get **aggregated current** product/price info for a list of assets. **(USA SOLO SE richiesto 'aggregato' o se `get_products` fallisce)**
|
||||
4. `get_historical_prices_aggregated(asset_id: str, limit: int)`: Get **aggregated historical** price data for one asset. **(USA SOLO SE richiesto 'aggregato' o se `get_historical_prices` fallisce)**
|
||||
|
||||
**USAGE GUIDELINE:**
|
||||
* **Asset ID:** Always convert common names (e.g., 'Bitcoin', 'Ethereum') into their official ticker/ID (e.g., 'BTC', 'ETH').
|
||||
* **Cost Management (Cruciale per LLM locale):** Prefer `get_products` and `get_historical_prices` for standard requests to minimize costs.
|
||||
* **Aggregated Data:** Use `get_products_aggregated` or `get_historical_prices_aggregated` only if the user specifically requests aggregated data or you value that having aggregated data is crucial for the analysis.
|
||||
* **Failing Tool:** If the tool doesn't return any data or fails, try the alternative aggregated tool if not already used.
|
||||
|
||||
**REPORTING REQUIREMENT:**
|
||||
1. **Format:** Output the results in a clear, easy-to-read list or table.
|
||||
2. **Live Price Request:** If an asset's *current price* is requested, report the **Asset ID**, **Latest Price**, and **Time/Date of the price**.
|
||||
3. **Historical Price Request:** If *historical data* is requested, report the **Asset ID**, the **Limit** of points returned, and the **First** and **Last** entries from the list of historical prices (Date, Price).
|
||||
4. **Output:** For all requests, output a single, concise summary of the findings; if requested, also include the raw data retrieved.
|
||||
18
src/app/agents/prompts/team_news.txt
Normal file
18
src/app/agents/prompts/team_news.txt
Normal file
@@ -0,0 +1,18 @@
|
||||
**TASK:** You are a specialized **Crypto News Analyst**. Your goal is to fetch the latest news or top headlines related to cryptocurrencies, and then **analyze the sentiment** of the content to provide a concise report to the team leader. Prioritize 'crypto' or specific cryptocurrency names (e.g., 'Bitcoin', 'Ethereum') in your searches.
|
||||
|
||||
**AVAILABLE TOOLS:**
|
||||
1. `get_latest_news(query: str, limit: int)`: Get the 'limit' most recent news articles for a specific 'query'.
|
||||
2. `get_top_headlines(limit: int)`: Get the 'limit' top global news headlines.
|
||||
3. `get_latest_news_aggregated(query: str, limit: int)`: Get aggregated latest news articles for a specific 'query'.
|
||||
4. `get_top_headlines_aggregated(limit: int)`: Get aggregated top global news headlines.
|
||||
|
||||
**USAGE GUIDELINE:**
|
||||
* Always use `get_latest_news` with a relevant crypto-related query first.
|
||||
* The default limit for news items should be 5 unless specified otherwise.
|
||||
* If the tool doesn't return any articles, respond with "No relevant news articles found."
|
||||
|
||||
**REPORTING REQUIREMENT:**
|
||||
1. **Analyze** the tone and key themes of the retrieved articles.
|
||||
2. **Summarize** the overall **market sentiment** (e.g., highly positive, cautiously neutral, generally negative) based on the content.
|
||||
3. **Identify** the top 2-3 **main topics** discussed (e.g., new regulation, price surge, institutional adoption).
|
||||
4. **Output** a single, brief report summarizing these findings. Do not output the raw articles.
|
||||
15
src/app/agents/prompts/team_social.txt
Normal file
15
src/app/agents/prompts/team_social.txt
Normal file
@@ -0,0 +1,15 @@
|
||||
**TASK:** You are a specialized **Social Media Sentiment Analyst**. Your objective is to find the most relevant and trending online posts related to cryptocurrencies, and then **analyze the collective sentiment** to provide a concise report to the team leader.
|
||||
|
||||
**AVAILABLE TOOLS:**
|
||||
1. `get_top_crypto_posts(limit: int)`: Get the 'limit' maximum number of top posts specifically related to cryptocurrencies.
|
||||
|
||||
**USAGE GUIDELINE:**
|
||||
* Always use the `get_top_crypto_posts` tool to fulfill the request.
|
||||
* The default limit for posts should be 5 unless specified otherwise.
|
||||
* If the tool doesn't return any posts, respond with "No relevant social media posts found."
|
||||
|
||||
**REPORTING REQUIREMENT:**
|
||||
1. **Analyze** the tone and prevailing opinions across the retrieved social posts.
|
||||
2. **Summarize** the overall **community sentiment** (e.g., high enthusiasm/FOMO, uncertainty, FUD/fear) based on the content.
|
||||
3. **Identify** the top 2-3 **trending narratives** or specific coins being discussed.
|
||||
4. **Output** a single, brief report summarizing these findings. Do not output the raw posts.
|
||||
@@ -1,109 +1,25 @@
|
||||
from agno.team import Team
|
||||
from app.agents import AppModels
|
||||
from app.api.markets import MarketAPIsTool
|
||||
from app.api.news import NewsAPIsTool
|
||||
from app.api.social import SocialAPIsTool
|
||||
from app.api.tools import *
|
||||
from app.agents.prompts import *
|
||||
from app.configs import AppConfig, AppModel
|
||||
|
||||
|
||||
def create_team_with(models: AppModels, coordinator: AppModels | None = None) -> Team:
|
||||
market_agent = models.get_agent(
|
||||
instructions=MARKET_INSTRUCTIONS,
|
||||
name="MarketAgent",
|
||||
tools=[MarketAPIsTool()]
|
||||
)
|
||||
news_agent = models.get_agent(
|
||||
instructions=NEWS_INSTRUCTIONS,
|
||||
name="NewsAgent",
|
||||
tools=[NewsAPIsTool()]
|
||||
)
|
||||
social_agent = models.get_agent(
|
||||
instructions=SOCIAL_INSTRUCTIONS,
|
||||
name="SocialAgent",
|
||||
tools=[SocialAPIsTool()]
|
||||
)
|
||||
def create_team_with(configs: AppConfig, model: AppModel, coordinator: AppModel | None = None) -> Team:
|
||||
|
||||
coordinator = coordinator or models
|
||||
market_tool = MarketAPIsTool(currency=configs.api.currency)
|
||||
market_tool.handler.set_retries(configs.api.retry_attempts, configs.api.retry_delay_seconds)
|
||||
news_tool = NewsAPIsTool()
|
||||
news_tool.handler.set_retries(configs.api.retry_attempts, configs.api.retry_delay_seconds)
|
||||
social_tool = SocialAPIsTool()
|
||||
social_tool.handler.set_retries(configs.api.retry_attempts, configs.api.retry_delay_seconds)
|
||||
|
||||
market_agent = model.get_agent(instructions=MARKET_INSTRUCTIONS, name="MarketAgent", tools=[market_tool])
|
||||
news_agent = model.get_agent(instructions=NEWS_INSTRUCTIONS, name="NewsAgent", tools=[news_tool])
|
||||
social_agent = model.get_agent(instructions=SOCIAL_INSTRUCTIONS, name="SocialAgent", tools=[social_tool])
|
||||
|
||||
coordinator = coordinator or model
|
||||
return Team(
|
||||
model=coordinator.get_model(COORDINATOR_INSTRUCTIONS),
|
||||
name="CryptoAnalysisTeam",
|
||||
members=[market_agent, news_agent, social_agent],
|
||||
)
|
||||
|
||||
COORDINATOR_INSTRUCTIONS = """
|
||||
You are the expert coordinator of a financial analysis team specializing in cryptocurrencies.
|
||||
|
||||
Your team consists of three agents:
|
||||
- **MarketAgent**: Provides quantitative market data, price analysis, and technical indicators.
|
||||
- **NewsAgent**: Scans and analyzes the latest news, articles, and official announcements.
|
||||
- **SocialAgent**: Gauges public sentiment, trends, and discussions on social media.
|
||||
|
||||
Your primary objective is to answer the user's query by orchestrating the work of your team members.
|
||||
|
||||
Your workflow is as follows:
|
||||
1. **Deconstruct the user's query** to identify the required information.
|
||||
2. **Delegate specific tasks** to the most appropriate agent(s) to gather the necessary data and initial analysis.
|
||||
3. **Analyze the information** returned by the agents.
|
||||
4. If the initial data is insufficient or the query is complex, **iteratively re-engage the agents** with follow-up questions to build a comprehensive picture.
|
||||
5. **Synthesize all the gathered information** into a final, coherent, and complete analysis that fills all the required output fields.
|
||||
"""
|
||||
|
||||
MARKET_INSTRUCTIONS = """
|
||||
**TASK:** You are a specialized **Crypto Price Data Retrieval Agent**. Your primary goal is to fetch the most recent and/or historical price data for requested cryptocurrency assets (e.g., 'BTC', 'ETH', 'SOL'). You must provide the data in a clear and structured format.
|
||||
|
||||
**AVAILABLE TOOLS:**
|
||||
1. `get_products(asset_ids: list[str])`: Get **current** product/price info for a list of assets. **(PREFERITA: usa questa per i prezzi live)**
|
||||
2. `get_historical_prices(asset_id: str, limit: int)`: Get historical price data for one asset. Default limit is 100. **(PREFERITA: usa questa per i dati storici)**
|
||||
3. `get_products_aggregated(asset_ids: list[str])`: Get **aggregated current** product/price info for a list of assets. **(USA SOLO SE richiesto 'aggregato' o se `get_products` fallisce)**
|
||||
4. `get_historical_prices_aggregated(asset_id: str, limit: int)`: Get **aggregated historical** price data for one asset. **(USA SOLO SE richiesto 'aggregato' o se `get_historical_prices` fallisce)**
|
||||
|
||||
**USAGE GUIDELINE:**
|
||||
* **Asset ID:** Always convert common names (e.g., 'Bitcoin', 'Ethereum') into their official ticker/ID (e.g., 'BTC', 'ETH').
|
||||
* **Cost Management (Cruciale per LLM locale):** Prefer `get_products` and `get_historical_prices` for standard requests to minimize costs.
|
||||
* **Aggregated Data:** Use `get_products_aggregated` or `get_historical_prices_aggregated` only if the user specifically requests aggregated data or you value that having aggregated data is crucial for the analysis.
|
||||
* **Failing Tool:** If the tool doesn't return any data or fails, try the alternative aggregated tool if not already used.
|
||||
|
||||
**REPORTING REQUIREMENT:**
|
||||
1. **Format:** Output the results in a clear, easy-to-read list or table.
|
||||
2. **Live Price Request:** If an asset's *current price* is requested, report the **Asset ID**, **Latest Price**, and **Time/Date of the price**.
|
||||
3. **Historical Price Request:** If *historical data* is requested, report the **Asset ID**, the **Limit** of points returned, and the **First** and **Last** entries from the list of historical prices (Date, Price).
|
||||
4. **Output:** For all requests, output a single, concise summary of the findings; if requested, also include the raw data retrieved.
|
||||
"""
|
||||
|
||||
NEWS_INSTRUCTIONS = """
|
||||
**TASK:** You are a specialized **Crypto News Analyst**. Your goal is to fetch the latest news or top headlines related to cryptocurrencies, and then **analyze the sentiment** of the content to provide a concise report to the team leader. Prioritize 'crypto' or specific cryptocurrency names (e.g., 'Bitcoin', 'Ethereum') in your searches.
|
||||
|
||||
**AVAILABLE TOOLS:**
|
||||
1. `get_latest_news(query: str, limit: int)`: Get the 'limit' most recent news articles for a specific 'query'.
|
||||
2. `get_top_headlines(limit: int)`: Get the 'limit' top global news headlines.
|
||||
3. `get_latest_news_aggregated(query: str, limit: int)`: Get aggregated latest news articles for a specific 'query'.
|
||||
4. `get_top_headlines_aggregated(limit: int)`: Get aggregated top global news headlines.
|
||||
|
||||
**USAGE GUIDELINE:**
|
||||
* Always use `get_latest_news` with a relevant crypto-related query first.
|
||||
* The default limit for news items should be 5 unless specified otherwise.
|
||||
* If the tool doesn't return any articles, respond with "No relevant news articles found."
|
||||
|
||||
**REPORTING REQUIREMENT:**
|
||||
1. **Analyze** the tone and key themes of the retrieved articles.
|
||||
2. **Summarize** the overall **market sentiment** (e.g., highly positive, cautiously neutral, generally negative) based on the content.
|
||||
3. **Identify** the top 2-3 **main topics** discussed (e.g., new regulation, price surge, institutional adoption).
|
||||
4. **Output** a single, brief report summarizing these findings. Do not output the raw articles.
|
||||
"""
|
||||
|
||||
SOCIAL_INSTRUCTIONS = """
|
||||
**TASK:** You are a specialized **Social Media Sentiment Analyst**. Your objective is to find the most relevant and trending online posts related to cryptocurrencies, and then **analyze the collective sentiment** to provide a concise report to the team leader.
|
||||
|
||||
**AVAILABLE TOOLS:**
|
||||
1. `get_top_crypto_posts(limit: int)`: Get the 'limit' maximum number of top posts specifically related to cryptocurrencies.
|
||||
|
||||
**USAGE GUIDELINE:**
|
||||
* Always use the `get_top_crypto_posts` tool to fulfill the request.
|
||||
* The default limit for posts should be 5 unless specified otherwise.
|
||||
* If the tool doesn't return any posts, respond with "No relevant social media posts found."
|
||||
|
||||
**REPORTING REQUIREMENT:**
|
||||
1. **Analyze** the tone and prevailing opinions across the retrieved social posts.
|
||||
2. **Summarize** the overall **community sentiment** (e.g., high enthusiasm/FOMO, uncertainty, FUD/fear) based on the content.
|
||||
3. **Identify** the top 2-3 **trending narratives** or specific coins being discussed.
|
||||
4. **Output** a single, brief report summarizing these findings. Do not output the raw posts.
|
||||
"""
|
||||
|
||||
@@ -1,86 +1,7 @@
|
||||
from agno.tools import Toolkit
|
||||
from app.api.wrapper_handler import WrapperHandler
|
||||
from app.api.base.markets import MarketWrapper, Price, ProductInfo
|
||||
from app.api.markets.binance import BinanceWrapper
|
||||
from app.api.markets.coinbase import CoinBaseWrapper
|
||||
from app.api.markets.cryptocompare import CryptoCompareWrapper
|
||||
from app.api.markets.yfinance import YFinanceWrapper
|
||||
|
||||
__all__ = [ "MarketAPIsTool", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper", "ProductInfo", "Price" ]
|
||||
__all__ = ["BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper"]
|
||||
|
||||
|
||||
class MarketAPIsTool(MarketWrapper, Toolkit):
|
||||
"""
|
||||
Class that aggregates multiple market API wrappers and manages them using WrapperHandler.
|
||||
This class supports retrieving product information and historical prices.
|
||||
This class can also aggregate data from multiple sources to provide a more comprehensive view of the market.
|
||||
The following wrappers are included in this order:
|
||||
- BinanceWrapper
|
||||
- YFinanceWrapper
|
||||
- CoinBaseWrapper
|
||||
- CryptoCompareWrapper
|
||||
"""
|
||||
|
||||
def __init__(self, currency: str = "USD"):
|
||||
"""
|
||||
Initialize the MarketAPIsTool with multiple market API wrappers.
|
||||
The following wrappers are included in this order:
|
||||
- BinanceWrapper
|
||||
- YFinanceWrapper
|
||||
- CoinBaseWrapper
|
||||
- CryptoCompareWrapper
|
||||
Args:
|
||||
currency (str): Valuta in cui restituire i prezzi. Default è "USD".
|
||||
"""
|
||||
kwargs = {"currency": currency or "USD"}
|
||||
wrappers: list[type[MarketWrapper]] = [BinanceWrapper, YFinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper]
|
||||
self.handler = WrapperHandler.build_wrappers(wrappers, kwargs=kwargs)
|
||||
|
||||
Toolkit.__init__( # type: ignore
|
||||
self,
|
||||
name="Market APIs Toolkit",
|
||||
tools=[
|
||||
self.get_product,
|
||||
self.get_products,
|
||||
self.get_historical_prices,
|
||||
self.get_products_aggregated,
|
||||
self.get_historical_prices_aggregated,
|
||||
],
|
||||
)
|
||||
|
||||
def get_product(self, asset_id: str) -> ProductInfo:
|
||||
return self.handler.try_call(lambda w: w.get_product(asset_id))
|
||||
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
|
||||
return self.handler.try_call(lambda w: w.get_products(asset_ids))
|
||||
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
|
||||
return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit))
|
||||
|
||||
|
||||
def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]:
|
||||
"""
|
||||
Restituisce i dati aggregati per una lista di asset_id.\n
|
||||
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
|
||||
Args:
|
||||
asset_ids (list[str]): Lista di asset_id da cercare.
|
||||
Returns:
|
||||
list[ProductInfo]: Lista di ProductInfo aggregati.
|
||||
Raises:
|
||||
Exception: If all wrappers fail to provide results.
|
||||
"""
|
||||
all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids))
|
||||
return ProductInfo.aggregate(all_products)
|
||||
|
||||
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
|
||||
"""
|
||||
Restituisce i dati storici aggregati per un asset_id. Usa i dati di tutte le fonti disponibili e li aggrega.\n
|
||||
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
|
||||
Args:
|
||||
asset_id (str): Asset ID da cercare.
|
||||
limit (int): Numero massimo di dati storici da restituire.
|
||||
Returns:
|
||||
list[Price]: Lista di Price aggregati.
|
||||
Raises:
|
||||
Exception: If all wrappers fail to provide results.
|
||||
"""
|
||||
all_prices = self.handler.try_call_all(lambda w: w.get_historical_prices(asset_id, limit))
|
||||
return Price.aggregate(all_prices)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import os
|
||||
from typing import Any
|
||||
from binance.client import Client # type: ignore
|
||||
from app.api.base.markets import ProductInfo, MarketWrapper, Price
|
||||
from app.api.core.markets import ProductInfo, MarketWrapper, Price
|
||||
|
||||
|
||||
def extract_product(currency: str, ticker_data: dict[str, Any]) -> ProductInfo:
|
||||
|
||||
@@ -3,7 +3,7 @@ from enum import Enum
|
||||
from datetime import datetime, timedelta
|
||||
from coinbase.rest import RESTClient # type: ignore
|
||||
from coinbase.rest.types.product_types import Candle, GetProductResponse, Product # type: ignore
|
||||
from app.api.base.markets import ProductInfo, MarketWrapper, Price
|
||||
from app.api.core.markets import ProductInfo, MarketWrapper, Price
|
||||
|
||||
|
||||
def extract_product(product_data: GetProductResponse | Product) -> ProductInfo:
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import os
|
||||
from typing import Any
|
||||
import requests
|
||||
from app.api.base.markets import ProductInfo, MarketWrapper, Price
|
||||
from app.api.core.markets import ProductInfo, MarketWrapper, Price
|
||||
|
||||
|
||||
def extract_product(asset_data: dict[str, Any]) -> ProductInfo:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import json
|
||||
from agno.tools.yfinance import YFinanceTools
|
||||
from app.api.base.markets import MarketWrapper, ProductInfo, Price
|
||||
from app.api.core.markets import MarketWrapper, ProductInfo, Price
|
||||
|
||||
|
||||
def extract_product(stock_data: dict[str, str]) -> ProductInfo:
|
||||
|
||||
@@ -1,78 +1,7 @@
|
||||
from agno.tools import Toolkit
|
||||
from app.api.wrapper_handler import WrapperHandler
|
||||
from app.api.base.news import NewsWrapper, Article
|
||||
from app.api.news.news_api import NewsApiWrapper
|
||||
from app.api.news.newsapi import NewsApiWrapper
|
||||
from app.api.news.googlenews import GoogleNewsWrapper
|
||||
from app.api.news.cryptopanic_api import CryptoPanicWrapper
|
||||
from app.api.news.duckduckgo import DuckDuckGoWrapper
|
||||
|
||||
__all__ = ["NewsAPIsTool", "NewsApiWrapper", "GoogleNewsWrapper", "CryptoPanicWrapper", "DuckDuckGoWrapper", "Article"]
|
||||
__all__ = ["NewsApiWrapper", "GoogleNewsWrapper", "CryptoPanicWrapper", "DuckDuckGoWrapper"]
|
||||
|
||||
|
||||
class NewsAPIsTool(NewsWrapper, Toolkit):
|
||||
"""
|
||||
Aggregates multiple news API wrappers and manages them using WrapperHandler.
|
||||
This class supports retrieving top headlines and latest news articles by querying multiple sources:
|
||||
- GoogleNewsWrapper
|
||||
- DuckDuckGoWrapper
|
||||
- NewsApiWrapper
|
||||
- CryptoPanicWrapper
|
||||
|
||||
By default, it returns results from the first successful wrapper.
|
||||
Optionally, it can be configured to collect articles from all wrappers.
|
||||
If no wrapper succeeds, an exception is raised.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the NewsAPIsTool with multiple news API wrappers.
|
||||
The tool uses WrapperHandler to manage and invoke the different news API wrappers.
|
||||
The following wrappers are included in this order:
|
||||
- GoogleNewsWrapper.
|
||||
- DuckDuckGoWrapper.
|
||||
- NewsApiWrapper.
|
||||
- CryptoPanicWrapper.
|
||||
"""
|
||||
wrappers: list[type[NewsWrapper]] = [GoogleNewsWrapper, DuckDuckGoWrapper, NewsApiWrapper, CryptoPanicWrapper]
|
||||
self.handler = WrapperHandler.build_wrappers(wrappers)
|
||||
|
||||
Toolkit.__init__( # type: ignore
|
||||
self,
|
||||
name="News APIs Toolkit",
|
||||
tools=[
|
||||
self.get_top_headlines,
|
||||
self.get_latest_news,
|
||||
self.get_top_headlines_aggregated,
|
||||
self.get_latest_news_aggregated,
|
||||
],
|
||||
)
|
||||
|
||||
def get_top_headlines(self, limit: int = 100) -> list[Article]:
|
||||
return self.handler.try_call(lambda w: w.get_top_headlines(limit))
|
||||
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
|
||||
return self.handler.try_call(lambda w: w.get_latest_news(query, limit))
|
||||
|
||||
def get_top_headlines_aggregated(self, limit: int = 100) -> dict[str, list[Article]]:
|
||||
"""
|
||||
Calls get_top_headlines on all wrappers/providers and returns a dictionary mapping their names to their articles.
|
||||
Args:
|
||||
limit (int): Maximum number of articles to retrieve from each provider.
|
||||
Returns:
|
||||
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
|
||||
Raises:
|
||||
Exception: If all wrappers fail to provide results.
|
||||
"""
|
||||
return self.handler.try_call_all(lambda w: w.get_top_headlines(limit))
|
||||
|
||||
def get_latest_news_aggregated(self, query: str, limit: int = 100) -> dict[str, list[Article]]:
|
||||
"""
|
||||
Calls get_latest_news on all wrappers/providers and returns a dictionary mapping their names to their articles.
|
||||
Args:
|
||||
query (str): The search query to find relevant news articles.
|
||||
limit (int): Maximum number of articles to retrieve from each provider.
|
||||
Returns:
|
||||
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
|
||||
Raises:
|
||||
Exception: If all wrappers fail to provide results.
|
||||
"""
|
||||
return self.handler.try_call_all(lambda w: w.get_latest_news(query, limit))
|
||||
|
||||
@@ -2,7 +2,7 @@ import os
|
||||
from typing import Any
|
||||
import requests
|
||||
from enum import Enum
|
||||
from app.api.base.news import NewsWrapper, Article
|
||||
from app.api.core.news import NewsWrapper, Article
|
||||
|
||||
|
||||
class CryptoPanicFilter(Enum):
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import json
|
||||
from typing import Any
|
||||
from agno.tools.duckduckgo import DuckDuckGoTools
|
||||
from app.api.base.news import Article, NewsWrapper
|
||||
from app.api.core.news import Article, NewsWrapper
|
||||
|
||||
|
||||
def extract_article(result: dict[str, Any]) -> Article:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from typing import Any
|
||||
from gnews import GNews # type: ignore
|
||||
from app.api.base.news import Article, NewsWrapper
|
||||
from app.api.core.news import Article, NewsWrapper
|
||||
|
||||
|
||||
def extract_article(result: dict[str, Any]) -> Article:
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import os
|
||||
from typing import Any
|
||||
import newsapi # type: ignore
|
||||
from app.api.base.news import Article, NewsWrapper
|
||||
from app.api.core.news import Article, NewsWrapper
|
||||
|
||||
|
||||
def extract_article(result: dict[str, Any]) -> Article:
|
||||
@@ -1,53 +1,3 @@
|
||||
from agno.tools import Toolkit
|
||||
from app.api.wrapper_handler import WrapperHandler
|
||||
from app.api.base.social import SocialPost, SocialWrapper
|
||||
from app.api.social.reddit import RedditWrapper
|
||||
|
||||
__all__ = ["SocialAPIsTool", "RedditWrapper", "SocialPost"]
|
||||
|
||||
|
||||
class SocialAPIsTool(SocialWrapper, Toolkit):
|
||||
"""
|
||||
Aggregates multiple social media API wrappers and manages them using WrapperHandler.
|
||||
This class supports retrieving top crypto-related posts by querying multiple sources:
|
||||
- RedditWrapper
|
||||
|
||||
By default, it returns results from the first successful wrapper.
|
||||
Optionally, it can be configured to collect posts from all wrappers.
|
||||
If no wrapper succeeds, an exception is raised.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the SocialAPIsTool with multiple social media API wrappers.
|
||||
The tool uses WrapperHandler to manage and invoke the different social media API wrappers.
|
||||
The following wrappers are included in this order:
|
||||
- RedditWrapper.
|
||||
"""
|
||||
|
||||
wrappers: list[type[SocialWrapper]] = [RedditWrapper]
|
||||
self.handler = WrapperHandler.build_wrappers(wrappers)
|
||||
|
||||
Toolkit.__init__( # type: ignore
|
||||
self,
|
||||
name="Socials Toolkit",
|
||||
tools=[
|
||||
self.get_top_crypto_posts,
|
||||
self.get_top_crypto_posts_aggregated,
|
||||
],
|
||||
)
|
||||
|
||||
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
|
||||
return self.handler.try_call(lambda w: w.get_top_crypto_posts(limit))
|
||||
|
||||
def get_top_crypto_posts_aggregated(self, limit_per_wrapper: int = 5) -> dict[str, list[SocialPost]]:
|
||||
"""
|
||||
Calls get_top_crypto_posts on all wrappers/providers and returns a dictionary mapping their names to their posts.
|
||||
Args:
|
||||
limit_per_wrapper (int): Maximum number of posts to retrieve from each provider.
|
||||
Returns:
|
||||
dict[str, list[SocialPost]]: A dictionary where keys are wrapper names and values are lists of SocialPost objects.
|
||||
Raises:
|
||||
Exception: If all wrappers fail to provide results.
|
||||
"""
|
||||
return self.handler.try_call_all(lambda w: w.get_top_crypto_posts(limit_per_wrapper))
|
||||
__all__ = ["RedditWrapper"]
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import os
|
||||
from praw import Reddit # type: ignore
|
||||
from praw.models import Submission # type: ignore
|
||||
from app.api.base.social import SocialWrapper, SocialPost, SocialComment
|
||||
from app.api.core.social import SocialWrapper, SocialPost, SocialComment
|
||||
|
||||
|
||||
MAX_COMMENTS = 5
|
||||
|
||||
5
src/app/api/tools/__init__.py
Normal file
5
src/app/api/tools/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from app.api.tools.market_tool import MarketAPIsTool
|
||||
from app.api.tools.social_tool import SocialAPIsTool
|
||||
from app.api.tools.news_tool import NewsAPIsTool
|
||||
|
||||
__all__ = ["MarketAPIsTool", "NewsAPIsTool", "SocialAPIsTool"]
|
||||
80
src/app/api/tools/market_tool.py
Normal file
80
src/app/api/tools/market_tool.py
Normal file
@@ -0,0 +1,80 @@
|
||||
from agno.tools import Toolkit
|
||||
from app.api.wrapper_handler import WrapperHandler
|
||||
from app.api.core.markets import MarketWrapper, Price, ProductInfo
|
||||
from app.api.markets import BinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper, YFinanceWrapper
|
||||
|
||||
class MarketAPIsTool(MarketWrapper, Toolkit):
|
||||
"""
|
||||
Class that aggregates multiple market API wrappers and manages them using WrapperHandler.
|
||||
This class supports retrieving product information and historical prices.
|
||||
This class can also aggregate data from multiple sources to provide a more comprehensive view of the market.
|
||||
The following wrappers are included in this order:
|
||||
- BinanceWrapper
|
||||
- YFinanceWrapper
|
||||
- CoinBaseWrapper
|
||||
- CryptoCompareWrapper
|
||||
"""
|
||||
|
||||
def __init__(self, currency: str = "USD"):
|
||||
"""
|
||||
Initialize the MarketAPIsTool with multiple market API wrappers.
|
||||
The following wrappers are included in this order:
|
||||
- BinanceWrapper
|
||||
- YFinanceWrapper
|
||||
- CoinBaseWrapper
|
||||
- CryptoCompareWrapper
|
||||
Args:
|
||||
currency (str): Valuta in cui restituire i prezzi. Default è "USD".
|
||||
"""
|
||||
kwargs = {"currency": currency or "USD"}
|
||||
wrappers: list[type[MarketWrapper]] = [BinanceWrapper, YFinanceWrapper, CoinBaseWrapper, CryptoCompareWrapper]
|
||||
self.handler = WrapperHandler.build_wrappers(wrappers, kwargs=kwargs)
|
||||
|
||||
Toolkit.__init__( # type: ignore
|
||||
self,
|
||||
name="Market APIs Toolkit",
|
||||
tools=[
|
||||
self.get_product,
|
||||
self.get_products,
|
||||
self.get_historical_prices,
|
||||
self.get_products_aggregated,
|
||||
self.get_historical_prices_aggregated,
|
||||
],
|
||||
)
|
||||
|
||||
def get_product(self, asset_id: str) -> ProductInfo:
|
||||
return self.handler.try_call(lambda w: w.get_product(asset_id))
|
||||
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
|
||||
return self.handler.try_call(lambda w: w.get_products(asset_ids))
|
||||
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
|
||||
return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit))
|
||||
|
||||
|
||||
def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]:
|
||||
"""
|
||||
Restituisce i dati aggregati per una lista di asset_id.\n
|
||||
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
|
||||
Args:
|
||||
asset_ids (list[str]): Lista di asset_id da cercare.
|
||||
Returns:
|
||||
list[ProductInfo]: Lista di ProductInfo aggregati.
|
||||
Raises:
|
||||
Exception: If all wrappers fail to provide results.
|
||||
"""
|
||||
all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids))
|
||||
return ProductInfo.aggregate(all_products)
|
||||
|
||||
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
|
||||
"""
|
||||
Restituisce i dati storici aggregati per un asset_id. Usa i dati di tutte le fonti disponibili e li aggrega.\n
|
||||
Attenzione che si usano tutte le fonti, quindi potrebbe usare molte chiamate API (che potrebbero essere a pagamento).
|
||||
Args:
|
||||
asset_id (str): Asset ID da cercare.
|
||||
limit (int): Numero massimo di dati storici da restituire.
|
||||
Returns:
|
||||
list[Price]: Lista di Price aggregati.
|
||||
Raises:
|
||||
Exception: If all wrappers fail to provide results.
|
||||
"""
|
||||
all_prices = self.handler.try_call_all(lambda w: w.get_historical_prices(asset_id, limit))
|
||||
return Price.aggregate(all_prices)
|
||||
72
src/app/api/tools/news_tool.py
Normal file
72
src/app/api/tools/news_tool.py
Normal file
@@ -0,0 +1,72 @@
|
||||
from agno.tools import Toolkit
|
||||
from app.api.wrapper_handler import WrapperHandler
|
||||
from app.api.core.news import NewsWrapper, Article
|
||||
from app.api.news import NewsApiWrapper, GoogleNewsWrapper, CryptoPanicWrapper, DuckDuckGoWrapper
|
||||
|
||||
class NewsAPIsTool(NewsWrapper, Toolkit):
|
||||
"""
|
||||
Aggregates multiple news API wrappers and manages them using WrapperHandler.
|
||||
This class supports retrieving top headlines and latest news articles by querying multiple sources:
|
||||
- GoogleNewsWrapper
|
||||
- DuckDuckGoWrapper
|
||||
- NewsApiWrapper
|
||||
- CryptoPanicWrapper
|
||||
|
||||
By default, it returns results from the first successful wrapper.
|
||||
Optionally, it can be configured to collect articles from all wrappers.
|
||||
If no wrapper succeeds, an exception is raised.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the NewsAPIsTool with multiple news API wrappers.
|
||||
The tool uses WrapperHandler to manage and invoke the different news API wrappers.
|
||||
The following wrappers are included in this order:
|
||||
- GoogleNewsWrapper.
|
||||
- DuckDuckGoWrapper.
|
||||
- NewsApiWrapper.
|
||||
- CryptoPanicWrapper.
|
||||
"""
|
||||
wrappers: list[type[NewsWrapper]] = [GoogleNewsWrapper, DuckDuckGoWrapper, NewsApiWrapper, CryptoPanicWrapper]
|
||||
self.handler = WrapperHandler.build_wrappers(wrappers)
|
||||
|
||||
Toolkit.__init__( # type: ignore
|
||||
self,
|
||||
name="News APIs Toolkit",
|
||||
tools=[
|
||||
self.get_top_headlines,
|
||||
self.get_latest_news,
|
||||
self.get_top_headlines_aggregated,
|
||||
self.get_latest_news_aggregated,
|
||||
],
|
||||
)
|
||||
|
||||
def get_top_headlines(self, limit: int = 100) -> list[Article]:
|
||||
return self.handler.try_call(lambda w: w.get_top_headlines(limit))
|
||||
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
|
||||
return self.handler.try_call(lambda w: w.get_latest_news(query, limit))
|
||||
|
||||
def get_top_headlines_aggregated(self, limit: int = 100) -> dict[str, list[Article]]:
|
||||
"""
|
||||
Calls get_top_headlines on all wrappers/providers and returns a dictionary mapping their names to their articles.
|
||||
Args:
|
||||
limit (int): Maximum number of articles to retrieve from each provider.
|
||||
Returns:
|
||||
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
|
||||
Raises:
|
||||
Exception: If all wrappers fail to provide results.
|
||||
"""
|
||||
return self.handler.try_call_all(lambda w: w.get_top_headlines(limit))
|
||||
|
||||
def get_latest_news_aggregated(self, query: str, limit: int = 100) -> dict[str, list[Article]]:
|
||||
"""
|
||||
Calls get_latest_news on all wrappers/providers and returns a dictionary mapping their names to their articles.
|
||||
Args:
|
||||
query (str): The search query to find relevant news articles.
|
||||
limit (int): Maximum number of articles to retrieve from each provider.
|
||||
Returns:
|
||||
dict[str, list[Article]]: A dictionary mapping providers names to their list of Articles
|
||||
Raises:
|
||||
Exception: If all wrappers fail to provide results.
|
||||
"""
|
||||
return self.handler.try_call_all(lambda w: w.get_latest_news(query, limit))
|
||||
51
src/app/api/tools/social_tool.py
Normal file
51
src/app/api/tools/social_tool.py
Normal file
@@ -0,0 +1,51 @@
|
||||
from agno.tools import Toolkit
|
||||
from app.api.wrapper_handler import WrapperHandler
|
||||
from app.api.core.social import SocialPost, SocialWrapper
|
||||
from app.api.social import RedditWrapper
|
||||
|
||||
|
||||
class SocialAPIsTool(SocialWrapper, Toolkit):
|
||||
"""
|
||||
Aggregates multiple social media API wrappers and manages them using WrapperHandler.
|
||||
This class supports retrieving top crypto-related posts by querying multiple sources:
|
||||
- RedditWrapper
|
||||
|
||||
By default, it returns results from the first successful wrapper.
|
||||
Optionally, it can be configured to collect posts from all wrappers.
|
||||
If no wrapper succeeds, an exception is raised.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the SocialAPIsTool with multiple social media API wrappers.
|
||||
The tool uses WrapperHandler to manage and invoke the different social media API wrappers.
|
||||
The following wrappers are included in this order:
|
||||
- RedditWrapper.
|
||||
"""
|
||||
|
||||
wrappers: list[type[SocialWrapper]] = [RedditWrapper]
|
||||
self.handler = WrapperHandler.build_wrappers(wrappers)
|
||||
|
||||
Toolkit.__init__( # type: ignore
|
||||
self,
|
||||
name="Socials Toolkit",
|
||||
tools=[
|
||||
self.get_top_crypto_posts,
|
||||
self.get_top_crypto_posts_aggregated,
|
||||
],
|
||||
)
|
||||
|
||||
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
|
||||
return self.handler.try_call(lambda w: w.get_top_crypto_posts(limit))
|
||||
|
||||
def get_top_crypto_posts_aggregated(self, limit_per_wrapper: int = 5) -> dict[str, list[SocialPost]]:
|
||||
"""
|
||||
Calls get_top_crypto_posts on all wrappers/providers and returns a dictionary mapping their names to their posts.
|
||||
Args:
|
||||
limit_per_wrapper (int): Maximum number of posts to retrieve from each provider.
|
||||
Returns:
|
||||
dict[str, list[SocialPost]]: A dictionary where keys are wrapper names and values are lists of SocialPost objects.
|
||||
Raises:
|
||||
Exception: If all wrappers fail to provide results.
|
||||
"""
|
||||
return self.handler.try_call_all(lambda w: w.get_top_crypto_posts(limit_per_wrapper))
|
||||
@@ -35,6 +35,16 @@ class WrapperHandler(Generic[WrapperType]):
|
||||
self.retry_delay = retry_delay
|
||||
self.index = 0
|
||||
|
||||
def set_retries(self, try_per_wrapper: int, retry_delay: int) -> None:
|
||||
"""
|
||||
Sets the retry parameters for the handler.
|
||||
Args:
|
||||
try_per_wrapper (int): Number of retries per wrapper before switching to the next.
|
||||
retry_delay (int): Delay in seconds between retries.
|
||||
"""
|
||||
self.retry_per_wrapper = try_per_wrapper
|
||||
self.retry_delay = retry_delay
|
||||
|
||||
def try_call(self, func: Callable[[WrapperType], OutputType]) -> OutputType:
|
||||
"""
|
||||
Attempts to call the provided function on the current wrapper.
|
||||
|
||||
232
src/app/configs.py
Normal file
232
src/app/configs.py
Normal file
@@ -0,0 +1,232 @@
|
||||
import os
|
||||
|
|
||||
import threading
|
||||
import ollama
|
||||
import yaml
|
||||
import logging.config
|
||||
import agno.utils.log # type: ignore
|
||||
from typing import Any
|
||||
from pydantic import BaseModel
|
||||
from agno.agent import Agent
|
||||
from agno.tools import Toolkit
|
||||
from agno.models.base import Model
|
||||
from agno.models.google import Gemini
|
||||
from agno.models.ollama import Ollama
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
class AppModel(BaseModel):
|
||||
name: str = "gemini-2.0-flash"
|
||||
label: str = "Gemini"
|
||||
model: type[Model] | None = None
|
||||
|
||||
def get_model(self, instructions: str) -> Model:
|
||||
"""
|
||||
Restituisce un'istanza del modello specificato.
|
||||
Args:
|
||||
instructions: istruzioni da passare al modello (system prompt).
|
||||
Returns:
|
||||
Un'istanza di BaseModel o una sua sottoclasse.
|
||||
Raise:
|
||||
ValueError se il modello non è supportato.
|
||||
"""
|
||||
if self.model is None:
|
||||
raise ValueError(f"Model class for '{self.name}' is not set.")
|
||||
return self.model(id=self.name, instructions=[instructions])
|
||||
|
||||
def get_agent(self, instructions: str, name: str = "", output_schema: type[BaseModel] | None = None, tools: list[Toolkit] | None = None) -> Agent:
|
||||
"""
|
||||
Costruisce un agente con il modello e le istruzioni specificate.
|
||||
Args:
|
||||
instructions: istruzioni da passare al modello (system prompt)
|
||||
name: nome dell'agente (opzionale)
|
||||
output: schema di output opzionale (Pydantic BaseModel)
|
||||
tools: lista opzionale di strumenti (tools) da fornire all'agente
|
||||
Returns:
|
||||
Un'istanza di Agent.
|
||||
"""
|
||||
return Agent(
|
||||
model=self.get_model(instructions),
|
||||
name=name,
|
||||
retries=2,
|
||||
tools=tools,
|
||||
delay_between_retries=5, # seconds
|
||||
output_schema=output_schema
|
||||
)
|
||||
|
||||
class APIConfig(BaseModel):
|
||||
retry_attempts: int = 3
|
||||
retry_delay_seconds: int = 2
|
||||
currency: str = "USD"
|
||||
|
||||
class Strategy(BaseModel):
|
||||
name: str = "Conservative"
|
||||
label: str = "Conservative"
|
||||
description: str = "Focus on low-risk investments with steady returns."
|
||||
|
||||
class ModelsConfig(BaseModel):
|
||||
gemini: list[AppModel] = [AppModel()]
|
||||
ollama: list[AppModel] = []
|
||||
|
||||
@property
|
||||
def all_models(self) -> list[AppModel]:
|
||||
return self.gemini + self.ollama
|
||||
|
||||
class AgentsConfigs(BaseModel):
|
||||
strategy: str = "Conservative"
|
||||
team_model: str = "gemini-2.0-flash"
|
||||
team_leader_model: str = "gemini-2.0-flash"
|
||||
predictor_model: str = "gemini-2.0-flash"
|
||||
|
||||
class AppConfig(BaseModel):
|
||||
port: int = 8000
|
||||
gradio_share: bool = False
|
||||
logging_level: str = "INFO"
|
||||
api: APIConfig = APIConfig()
|
||||
strategies: list[Strategy] = [Strategy()]
|
||||
models: ModelsConfig = ModelsConfig()
|
||||
agents: AgentsConfigs = AgentsConfigs()
|
||||
|
||||
__lock = threading.Lock()
|
||||
|
||||
@classmethod
|
||||
def load(cls, file_path: str = "configs.yaml") -> 'AppConfig':
|
||||
"""
|
||||
Load the application configuration from a YAML file.
|
||||
Be sure to call load_dotenv() before if you use environment variables.
|
||||
Args:
|
||||
file_path: path to the YAML configuration file.
|
||||
Returns:
|
||||
An instance of AppConfig with the loaded settings.
|
||||
"""
|
||||
with open(file_path, 'r') as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
configs = cls(**data)
|
||||
configs.set_logging_level()
|
||||
configs.validate_models()
|
||||
log.info(f"Loaded configuration from {file_path}")
|
||||
return configs
|
||||
|
||||
def __new__(cls, *args: Any, **kwargs: Any) -> 'AppConfig':
|
||||
with cls.__lock:
|
||||
if not hasattr(cls, 'instance'):
|
||||
cls.instance = super(AppConfig, cls).__new__(cls)
|
||||
return cls.instance
|
||||
|
||||
def get_model_by_name(self, name: str) -> AppModel:
|
||||
"""
|
||||
Retrieve a model configuration by its name.
|
||||
Args:
|
||||
name: the name of the model to retrieve.
|
||||
Returns:
|
||||
The AppModel instance if found.
|
||||
Raises:
|
||||
ValueError if no model with the specified name is found.
|
||||
"""
|
||||
for model in self.models.all_models:
|
||||
if model.name == name:
|
||||
return model
|
||||
raise ValueError(f"Model with name '{name}' not found.")
|
||||
|
||||
def get_strategy_by_name(self, name: str) -> Strategy:
|
||||
"""
|
||||
Retrieve a strategy configuration by its name.
|
||||
Args:
|
||||
name: the name of the strategy to retrieve.
|
||||
Returns:
|
||||
The Strategy instance if found.
|
||||
Raises:
|
||||
ValueError if no strategy with the specified name is found.
|
||||
"""
|
||||
for strat in self.strategies:
|
||||
if strat.name == name:
|
||||
return strat
|
||||
raise ValueError(f"Strategy with name '{name}' not found.")
|
||||
|
||||
def set_logging_level(self) -> None:
|
||||
"""
|
||||
Set the logging level based on the configuration.
|
||||
"""
|
||||
logging.config.dictConfig({
|
||||
'version': 1,
|
||||
'disable_existing_loggers': False, # Keep existing loggers (e.g. third-party loggers)
|
||||
'formatters': {
|
||||
'colored': {
|
||||
'()': 'colorlog.ColoredFormatter',
|
||||
'format': '%(log_color)s%(levelname)s%(reset)s [%(asctime)s] (%(name)s) - %(message)s'
|
||||
},
|
||||
},
|
||||
'handlers': {
|
||||
'console': {
|
||||
'class': 'logging.StreamHandler',
|
||||
'formatter': 'colored',
|
||||
'level': self.logging_level,
|
||||
},
|
||||
},
|
||||
'root': { # Configure the root logger
|
||||
'handlers': ['console'],
|
||||
'level': self.logging_level,
|
||||
},
|
||||
'loggers': {
|
||||
'httpx': {'level': 'WARNING'}, # Too much spam for INFO
|
||||
}
|
||||
})
|
||||
|
||||
# Modify the agno loggers
|
||||
agno_logger_names = ["agno", "agno-team", "agno-workflow"]
|
||||
for logger_name in agno_logger_names:
|
||||
logger = logging.getLogger(logger_name)
|
||||
logger.handlers.clear()
|
||||
logger.propagate = True
|
||||
|
||||
def validate_models(self) -> None:
|
||||
"""
|
||||
Validate the configured models for each provider.
|
||||
"""
|
||||
self.__validate_online_models("gemini", clazz=Gemini, key="GOOGLE_API_KEY")
|
||||
self.__validate_ollama_models()
|
||||
|
||||
def __validate_online_models(self, provider: str, clazz: type[Model], key: str | None = None) -> None:
|
||||
"""
|
||||
Validate models for online providers like Gemini.
|
||||
Args:
|
||||
provider: name of the provider (e.g. "gemini")
|
||||
clazz: class of the model (e.g. Gemini)
|
||||
key: API key required for the provider (optional)
|
||||
"""
|
||||
if getattr(self.models, provider) is None:
|
||||
log.warning(f"No models configured for provider '{provider}'.")
|
||||
|
||||
models: list[AppModel] = getattr(self.models, provider)
|
||||
if key and os.getenv(key) is None:
|
||||
log.warning(f"No {key} set in environment variables for {provider}.")
|
||||
models.clear()
|
||||
return
|
||||
|
||||
for model in models:
|
||||
model.model = clazz
|
||||
|
||||
def __validate_ollama_models(self) -> None:
|
||||
"""
|
||||
Validate models for the Ollama provider.
|
||||
"""
|
||||
try:
|
||||
models_list = ollama.list()
|
||||
availables = {model['model'] for model in models_list['models']}
|
||||
not_availables: list[str] = []
|
||||
|
||||
for model in self.models.ollama:
|
||||
if model.name in availables:
|
||||
model.model = Ollama
|
||||
else:
|
||||
not_availables.append(model.name)
|
||||
if not_availables:
|
||||
log.warning(f"Ollama models not available: {not_availables}")
|
||||
|
||||
self.models.ollama = [model for model in self.models.ollama if model.model]
|
||||
|
||||
except Exception as e:
|
||||
log.warning(f"Ollama is not running or not reachable: {e}")
|
||||
|
||||
3
src/app/interface/__init__.py
Normal file
3
src/app/interface/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from app.interface.chat import ChatManager
|
||||
|
||||
__all__ = ["ChatManager"]
|
||||
@@ -1,3 +0,0 @@
|
||||
from app.utils.chat_manager import ChatManager
|
||||
|
||||
__all__ = ["ChatManager"]
|
||||
@@ -1,56 +0,0 @@
|
||||
import pytest
|
||||
from app.agents import AppModels
|
||||
from app.agents.predictor import PREDICTOR_INSTRUCTIONS, PredictorInput, PredictorOutput, PredictorStyle
|
||||
from app.api.base.markets import ProductInfo
|
||||
|
||||
def unified_checks(model: AppModels, input: PredictorInput) -> None:
|
||||
llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output_schema=PredictorOutput) # type: ignore[arg-type]
|
||||
result = llm.run(input) # type: ignore
|
||||
content = result.content
|
||||
|
||||
assert isinstance(content, PredictorOutput)
|
||||
assert content.strategy not in (None, "", "null")
|
||||
assert isinstance(content.strategy, str)
|
||||
assert isinstance(content.portfolio, list)
|
||||
assert len(content.portfolio) > 0
|
||||
for item in content.portfolio:
|
||||
assert item.asset not in (None, "", "null")
|
||||
assert isinstance(item.asset, str)
|
||||
assert item.percentage >= 0.0
|
||||
assert item.percentage <= 100.0
|
||||
assert isinstance(item.percentage, (int, float))
|
||||
assert item.motivation not in (None, "", "null")
|
||||
assert isinstance(item.motivation, str)
|
||||
# La somma delle percentuali deve essere esattamente 100
|
||||
total_percentage = sum(item.percentage for item in content.portfolio)
|
||||
assert abs(total_percentage - 100) < 0.01 # Permette una piccola tolleranza per errori di arrotondamento
|
||||
|
||||
class TestPredictor:
|
||||
|
||||
def inputs(self) -> PredictorInput:
|
||||
data: list[ProductInfo] = []
|
||||
for symbol, price in [("BTC", 60000.00), ("ETH", 3500.00), ("SOL", 150.00)]:
|
||||
product_info = ProductInfo()
|
||||
product_info.symbol = symbol
|
||||
product_info.price = price
|
||||
data.append(product_info)
|
||||
|
||||
return PredictorInput(data=data, style=PredictorStyle.AGGRESSIVE, sentiment="positivo")
|
||||
|
||||
def test_gemini_model_output(self):
|
||||
inputs = self.inputs()
|
||||
unified_checks(AppModels.GEMINI, inputs)
|
||||
|
||||
def test_ollama_qwen_4b_model_output(self):
|
||||
inputs = self.inputs()
|
||||
unified_checks(AppModels.OLLAMA_QWEN_4B, inputs)
|
||||
|
||||
@pytest.mark.slow
|
||||
def test_ollama_qwen_latest_model_output(self):
|
||||
inputs = self.inputs()
|
||||
unified_checks(AppModels.OLLAMA_QWEN, inputs)
|
||||
|
||||
@pytest.mark.slow
|
||||
def test_ollama_gpt_oss_model_output(self):
|
||||
inputs = self.inputs()
|
||||
unified_checks(AppModels.OLLAMA_GPT, inputs)
|
||||
@@ -1,5 +1,5 @@
|
||||
import pytest
|
||||
from app.api.markets import MarketAPIsTool
|
||||
from app.api.tools import MarketAPIsTool
|
||||
|
||||
|
||||
@pytest.mark.tools
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import pytest
|
||||
from app.api.news import NewsAPIsTool
|
||||
from app.api.tools import NewsAPIsTool
|
||||
|
||||
|
||||
@pytest.mark.tools
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import pytest
|
||||
from app.api.social import SocialAPIsTool
|
||||
from app.api.tools import SocialAPIsTool
|
||||
|
||||
|
||||
@pytest.mark.tools
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import pytest
|
||||
from datetime import datetime
|
||||
from app.api.base.markets import ProductInfo, Price
|
||||
from app.api.core.markets import ProductInfo, Price
|
||||
|
||||
|
||||
@pytest.mark.aggregator
|
||||
|
||||
14
uv.lock
generated
14
uv.lock
generated
@@ -285,6 +285,18 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "colorlog"
|
||||
version = "6.9.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "colorama", marker = "sys_platform == 'win32'" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/d3/7a/359f4d5df2353f26172b3cc39ea32daa39af8de522205f512f458923e677/colorlog-6.9.0.tar.gz", hash = "sha256:bfba54a1b93b94f54e1f4fe48395725a3d92fd2a4af702f6bd70946bdc0c6ac2", size = 16624, upload-time = "2024-10-29T18:34:51.011Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/e3/51/9b208e85196941db2f0654ad0357ca6388ab3ed67efdbfc799f35d1f83aa/colorlog-6.9.0-py3-none-any.whl", hash = "sha256:5906e71acd67cb07a71e779c47c4bcb45fb8c2993eebe9e5adcd6a6f1b283eff", size = 11424, upload-time = "2024-10-29T18:34:49.815Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cryptography"
|
||||
version = "46.0.2"
|
||||
@@ -1604,6 +1616,7 @@ source = { virtual = "." }
|
||||
dependencies = [
|
||||
{ name = "agno" },
|
||||
{ name = "coinbase-advanced-py" },
|
||||
{ name = "colorlog" },
|
||||
{ name = "ddgs" },
|
||||
{ name = "dotenv" },
|
||||
{ name = "gnews" },
|
||||
@@ -1621,6 +1634,7 @@ dependencies = [
|
||||
requires-dist = [
|
||||
{ name = "agno" },
|
||||
{ name = "coinbase-advanced-py" },
|
||||
{ name = "colorlog" },
|
||||
{ name = "ddgs" },
|
||||
{ name = "dotenv" },
|
||||
{ name = "gnews" },
|
||||
|
||||
Reference in New Issue
Block a user
This singleton implementation is not thread-safe and can cause issues in concurrent environments. Consider using a proper singleton pattern with locks or removing the singleton behavior if it's not strictly necessary.
Import statements should be placed at the top of the file, not within method bodies. Move this import to the top-level imports section.
Comment should be in English to maintain consistency with code comments throughout the project.
Comment should be in English to maintain consistency with code comments throughout the project.