- use Pydantic for input & output for models
- update ToolAgent to utilize new model definitions
- improve test cases for consistency
This commit is contained in:
2025-09-27 18:51:20 +02:00
parent 03d8523a5a
commit 4615ebe63e
5 changed files with 101 additions and 126 deletions

View File

@@ -1,81 +1,51 @@
import json
from enum import Enum from enum import Enum
from app.markets.base import ProductInfo from app.markets.base import ProductInfo
from pydantic import BaseModel, Field
class PredictorStyle(Enum): class PredictorStyle(Enum):
CONSERVATIVE = "Conservativo" CONSERVATIVE = "Conservativo"
AGGRESSIVE = "Aggressivo" AGGRESSIVE = "Aggressivo"
# TODO (?) Change sentiment to a more structured format or merge it with data analysis (change then also the prompt) class PredictorInput(BaseModel):
def prepare_inputs(data: list[ProductInfo], style: PredictorStyle, sentiment: str) -> str: data: list[ProductInfo] = Field(..., description="Market data as a list of ProductInfo")
return json.dumps({ style: PredictorStyle = Field(..., description="Prediction style")
"data": [(product.symbol, f"{product.price:.2f}") for product in data], sentiment: str = Field(..., description="Aggregated sentiment from news and social analysis")
"style": style.value,
"sentiment": sentiment
})
def instructions() -> str: class ItemPortfolio(BaseModel):
return """ asset: str = Field(..., description="Name of the asset")
You are an **Allocation Algorithm (Crypto-Algo)**. Your sole objective is to process the input data and generate a strictly structured output, as specified. **You must not provide any explanations, conclusions, introductions, preambles, or comments that are not strictly required by the final format.** percentage: float = Field(..., description="Percentage allocation to the asset")
motivation: str = Field(..., description="Motivation for the allocation")
**CRITICAL INSTRUCTION: The final output MUST be a valid JSON object written entirely in Italian, following the structure below.** class PredictorOutput(BaseModel):
strategy: str = Field(..., description="Concise operational strategy in Italian")
portfolio: list[ItemPortfolio] = Field(..., description="List of portfolio items with allocations")
PREDICTOR_INSTRUCTIONS = """
You are an **Allocation Algorithm (Crypto-Algo)** specialized in analyzing market data and sentiment to generate an investment strategy and a target portfolio.
Your sole objective is to process the input data and generate the strictly structured output as required by the response format. **You MUST NOT provide introductions, preambles, explanations, conclusions, or any additional comments that are not strictly required.**
## Processing Instructions (Absolute Rule) ## Processing Instructions (Absolute Rule)
Analyze the Input provided in JSON format and generate the Output in two distinct sections. Your allocation strategy must be **derived exclusively from the "Logic Rule" corresponding to the requested *style*** and the *data* provided. **DO NOT** use external knowledge. The allocation strategy must be **derived exclusively from the "Allocation Logic" corresponding to the requested *style*** and the provided market/sentiment data. **DO NOT** use external or historical knowledge.
## Data Input (JSON Format) ## Allocation Logic
The input will be a single JSON block containing the following mandatory fields:
1. **"data":** *Array of Arrays*. Market data. Format: `[[Asset_Name: String, Current_Price: String], ...]`
* *Example:* `[["BTC", "60000.00"], ["ETH", "3500.00"], ["SOL", "150.00"]]`
2. **"style":** *ENUM String (only "conservativo" or "aggressivo")*. Defines the risk approach.
3. **"sentiment":** *Descriptive String*. Summarizes market sentiment.
## Allocation Logic Rules
### "Aggressivo" Style (Aggressive) ### "Aggressivo" Style (Aggressive)
* **Priority:** Maximum return (High Volatility accepted). * **Priority:** Maximizing return (high volatility accepted).
* **Focus:** Higher allocation to **non-BTC/ETH assets** with high momentum potential (Altcoins, mid/low-cap assets). * **Focus:** Higher allocation to **non-BTC/ETH assets** with high momentum potential (Altcoins, mid/low-cap assets).
* **BTC/ETH:** Must form a base (anchor), but their allocation **must not exceed 50%** of the total portfolio. * **BTC/ETH:** Must serve as a base (anchor), but their allocation **must not exceed 50%** of the total portfolio.
* **Sentiment:** Use positive sentiment to increase allocation to high-risk assets. * **Sentiment:** Use positive sentiment to increase exposure to high-risk assets.
### "Conservativo" Style (Conservative) ### "Conservativo" Style (Conservative)
* **Priority:** Capital preservation (Volatility minimized). * **Priority:** Capital preservation (volatility minimized).
* **Focus:** Major allocation to **BTC and/or ETH (Large-Cap Assets)**. * **Focus:** Major allocation to **BTC and/or ETH (Large-Cap Assets)**.
* **BTC/ETH:** Their allocation **must be at least 70%** of the total portfolio. * **BTC/ETH:** Their allocation **must be at least 70%** of the total portfolio.
* **Altcoins:** Any allocations to non-BTC/ETH assets must be minimal (max 30% combined) and for assets that minimize speculative risk. * **Altcoins:** Any allocations to non-BTC/ETH assets must be minimal (max 30% combined) and for assets that minimize speculative risk.
* **Sentiment:** Use positive sentiment only as confirmation for exposure, avoiding reactions to excessive "FOMO" signals. * **Sentiment:** Use positive sentiment only as confirmation for exposure, avoiding reactions to excessive "FOMO" signals.
## Output Format Requirements (Strict JSON) ## Output Requirements (Content MUST be in Italian)
The Output **must be a single JSON object** with two keys: `"strategia"` and `"portafoglio"`. 1. **Strategy (strategy):** Must be a concise operational description **in Italian ("in Italiano")**, with a maximum of 5 sentences.
2. **Portfolio (portfolio):** The sum of all percentages must be **exactly 100%**. The justification (motivation) for each asset must be a single clear sentence **in Italian ("in Italiano")**.
1. **"strategia":** *Stringa (massimo 5 frasi in Italiano)*. Una descrizione operativa concisa.
2. **"portafoglio":** *Array di Oggetti JSON*. La somma delle percentuali deve essere **esattamente 100%**. Ogni oggetto nell'array deve avere i seguenti campi (valori in Italiano):
* `"asset"`: Nome dell'Asset (es. "BTC").
* `"percentuale"`: Percentuale di Allocazione (come numero intero o decimale, es. 45.0).
* `"motivazione"`: Stringa (massimo una frase) che giustifica l'allocazione.
**THE OUTPUT MUST BE GENERATED BY FAITHFULLY COPYING THE FOLLOWING STRUCTURAL TEMPLATE (IN ITALIAN CONTENT, JSON FORMAT):**
{
"strategia": "[Strategia sintetico-operativa in massimo 5 frasi...]",
"portafoglio": [
{
"asset": "Asset_1",
"percentuale": X,
"motivazione": "[Massimo una frase chiara in Italiano]"
},
{
"asset": "Asset_2",
"percentuale": Y,
"motivazione": "[Massimo una frase chiara in Italiano]"
},
{
"asset": "Asset_3",
"percentuale": Z,
"motivazione": "[Massimo una frase chiara in Italiano]"
}
]
}
""" """

View File

@@ -1,5 +1,5 @@
from coinbase.rest.types.product_types import Candle, GetProductResponse from coinbase.rest.types.product_types import Candle, GetProductResponse
from pydantic import BaseModel
class BaseWrapper: class BaseWrapper:
""" """
@@ -15,17 +15,17 @@ class BaseWrapper:
def get_historical_prices(self, asset_id: str = "BTC") -> list['Price']: def get_historical_prices(self, asset_id: str = "BTC") -> list['Price']:
raise NotImplementedError raise NotImplementedError
class ProductInfo: class ProductInfo(BaseModel):
""" """
Informazioni sul prodotto, come ottenute dalle API di mercato. Informazioni sul prodotto, come ottenute dalle API di mercato.
Implementa i metodi di conversione dai dati grezzi delle API. Implementa i metodi di conversione dai dati grezzi delle API.
""" """
id: str id: str = ""
symbol: str symbol: str = ""
price: float price: float = 0.0
volume_24h: float volume_24h: float = 0.0
status: str status: str = ""
quote_currency: str quote_currency: str = ""
def from_coinbase(product_data: GetProductResponse) -> 'ProductInfo': def from_coinbase(product_data: GetProductResponse) -> 'ProductInfo':
product = ProductInfo() product = ProductInfo()
@@ -46,17 +46,17 @@ class ProductInfo:
product.status = "" # Cryptocompare does not provide status product.status = "" # Cryptocompare does not provide status
return product return product
class Price: class Price(BaseModel):
""" """
Rappresenta i dati di prezzo per un asset, come ottenuti dalle API di mercato. Rappresenta i dati di prezzo per un asset, come ottenuti dalle API di mercato.
Implementa i metodi di conversione dai dati grezzi delle API. Implementa i metodi di conversione dai dati grezzi delle API.
""" """
high: float high: float = 0.0
low: float low: float = 0.0
open: float open: float = 0.0
close: float close: float = 0.0
volume: float volume: float = 0.0
time: str time: str = ""
def from_coinbase(candle_data: Candle) -> 'Price': def from_coinbase(candle_data: Candle) -> 'Price':
price = Price() price = Price()

View File

@@ -1,14 +1,15 @@
import os import os
import requests import requests
from enum import Enum from enum import Enum
from pydantic import BaseModel
from agno.agent import Agent from agno.agent import Agent
from agno.models.base import BaseModel from agno.models.base import Model
from agno.models.google import Gemini from agno.models.google import Gemini
from agno.models.ollama import Ollama from agno.models.ollama import Ollama
from agno.utils.log import log_warning from agno.utils.log import log_warning
class Models(Enum): class AppModels(Enum):
""" """
Enum per i modelli supportati. Enum per i modelli supportati.
Aggiungere nuovi modelli qui se necessario. Aggiungere nuovi modelli qui se necessario.
@@ -21,7 +22,7 @@ class Models(Enum):
OLLAMA_QWEN = "qwen3:latest" # + good + fast (8b) OLLAMA_QWEN = "qwen3:latest" # + good + fast (8b)
@staticmethod @staticmethod
def availables_local() -> list['Models']: def availables_local() -> list['AppModels']:
""" """
Controlla quali provider di modelli LLM locali sono disponibili. Controlla quali provider di modelli LLM locali sono disponibili.
Ritorna una lista di provider disponibili. Ritorna una lista di provider disponibili.
@@ -34,13 +35,13 @@ class Models(Enum):
availables = [] availables = []
result = result.text result = result.text
if Models.OLLAMA_GPT.value in result: if AppModels.OLLAMA_GPT.value in result:
availables.append(Models.OLLAMA_GPT) availables.append(AppModels.OLLAMA_GPT)
if Models.OLLAMA_QWEN.value in result: if AppModels.OLLAMA_QWEN.value in result:
availables.append(Models.OLLAMA_QWEN) availables.append(AppModels.OLLAMA_QWEN)
return availables return availables
def availables_online() -> list['Models']: def availables_online() -> list['AppModels']:
""" """
Controlla quali provider di modelli LLM online hanno le loro API keys disponibili Controlla quali provider di modelli LLM online hanno le loro API keys disponibili
come variabili d'ambiente e ritorna una lista di provider disponibili. come variabili d'ambiente e ritorna una lista di provider disponibili.
@@ -49,12 +50,12 @@ class Models(Enum):
log_warning("No GOOGLE_API_KEY set in environment variables.") log_warning("No GOOGLE_API_KEY set in environment variables.")
return [] return []
availables = [] availables = []
availables.append(Models.GEMINI) availables.append(AppModels.GEMINI)
availables.append(Models.GEMINI_PRO) availables.append(AppModels.GEMINI_PRO)
return availables return availables
@staticmethod @staticmethod
def availables() -> list['Models']: def availables() -> list['AppModels']:
""" """
Controlla quali provider di modelli LLM locali sono disponibili e quali Controlla quali provider di modelli LLM locali sono disponibili e quali
provider di modelli LLM online hanno le loro API keys disponibili come variabili provider di modelli LLM online hanno le loro API keys disponibili come variabili
@@ -64,8 +65,8 @@ class Models(Enum):
2. Ollama (locale) 2. Ollama (locale)
""" """
availables = [ availables = [
*Models.availables_online(), *AppModels.availables_online(),
*Models.availables_local() *AppModels.availables_local()
] ]
assert availables, "No valid model API keys set in environment variables." assert availables, "No valid model API keys set in environment variables."
return availables return availables
@@ -94,7 +95,7 @@ class Models(Enum):
return response[start:end + 1].strip() return response[start:end + 1].strip()
def get_model(self, instructions:str) -> BaseModel: def get_model(self, instructions:str) -> Model:
""" """
Restituisce un'istanza del modello specificato. Restituisce un'istanza del modello specificato.
instructions: istruzioni da passare al modello (system prompt). instructions: istruzioni da passare al modello (system prompt).
@@ -102,14 +103,14 @@ class Models(Enum):
Raise ValueError se il modello non è supportato. Raise ValueError se il modello non è supportato.
""" """
name = self.value name = self.value
if self in {Models.GEMINI, Models.GEMINI_PRO}: if self in {AppModels.GEMINI, AppModels.GEMINI_PRO}:
return Gemini(name, instructions=[instructions]) return Gemini(name, instructions=[instructions])
elif self in {Models.OLLAMA_GPT, Models.OLLAMA_QWEN}: elif self in {AppModels.OLLAMA_GPT, AppModels.OLLAMA_QWEN}:
return Ollama(name, instructions=[instructions]) return Ollama(name, instructions=[instructions])
raise ValueError(f"Modello non supportato: {self}") raise ValueError(f"Modello non supportato: {self}")
def get_agent(self, instructions: str, name: str = "") -> Agent: def get_agent(self, instructions: str, name: str = "", output: BaseModel | None = None) -> Agent:
""" """
Costruisce un agente con il modello e le istruzioni specificate. Costruisce un agente con il modello e le istruzioni specificate.
instructions: istruzioni da passare al modello (system prompt). instructions: istruzioni da passare al modello (system prompt).
@@ -120,6 +121,6 @@ class Models(Enum):
name=name, name=name,
retries=2, retries=2,
delay_between_retries=5, # seconds delay_between_retries=5, # seconds
use_json_mode=True, # utile per fare in modo che l'agente risponda in JSON (anche se sembra essere solo placebo) output_schema=output # se si usa uno schema di output, lo si passa qui
# TODO Eventuali altri parametri da mettere all'agente anche se si possono comunque assegnare dopo la creazione # TODO Eventuali altri parametri da mettere all'agente anche se si possono comunque assegnare dopo la creazione
) )

View File

@@ -1,9 +1,8 @@
from app.agents.news_agent import NewsAgent from app.agents.news_agent import NewsAgent
from app.agents.social_agent import SocialAgent from app.agents.social_agent import SocialAgent
from app.agents.predictor import PredictorStyle from app.agents.predictor import PredictorStyle, PredictorInput, PredictorOutput, PREDICTOR_INSTRUCTIONS
from app.agents import predictor
from app.markets import MarketAPIs from app.markets import MarketAPIs
from app.models import Models from app.models import AppModels
from agno.utils.log import log_info from agno.utils.log import log_info
class ToolAgent: class ToolAgent:
@@ -15,7 +14,7 @@ class ToolAgent:
""" """
Inizializza l'agente con i modelli disponibili, gli stili e l'API di mercato. Inizializza l'agente con i modelli disponibili, gli stili e l'API di mercato.
""" """
self.available_models = Models.availables() self.available_models = AppModels.availables()
self.all_styles = list(PredictorStyle) self.all_styles = list(PredictorStyle)
self.style = self.all_styles[0] # Default to the first style self.style = self.all_styles[0] # Default to the first style
@@ -31,7 +30,7 @@ class ToolAgent:
# TODO https://docs.agno.com/introduction # TODO https://docs.agno.com/introduction
# Inoltre permette di creare dei team e workflow di agenti più facilmente # Inoltre permette di creare dei team e workflow di agenti più facilmente
self.chosen_model = self.available_models[index] self.chosen_model = self.available_models[index]
self.predictor = self.chosen_model.get_agent(predictor.instructions()) self.predictor = self.chosen_model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput)
self.news_agent = NewsAgent() self.news_agent = NewsAgent()
self.social_agent = SocialAgent() self.social_agent = SocialAgent()
@@ -64,18 +63,17 @@ class ToolAgent:
sentiment = f"{news_sentiment}\n{social_sentiment}" sentiment = f"{news_sentiment}\n{social_sentiment}"
# Step 3: previsione # Step 3: previsione
inputs = predictor.prepare_inputs( inputs = PredictorInput(data=market_data, style=self.style, sentiment=sentiment)
data=market_data, result = self.predictor.run(inputs)
style=self.style, prediction: PredictorOutput = result.content
sentiment=sentiment
)
prediction = self.predictor.run(inputs)
output = Models.extract_json_str_from_response(prediction.content)
log_info(f"End of prediction") log_info(f"End of prediction")
market_data = "\n".join([f"{product.symbol}: {product.price}" for product in market_data]) market_data = "\n".join([f"{product.symbol}: {product.price}" for product in market_data])
return f"{market_data}\n{sentiment}\n\n📈 Consiglio finale:\n{output}" output = f"[{prediction.strategy}]\nPortafoglio:\n" + "\n".join(
[f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio]
)
return f"INPUT:\n{market_data}\n{sentiment}\n\n\nOUTPUT:\n{output}"
def list_providers(self) -> list[str]: def list_providers(self) -> list[str]:
""" """

View File

@@ -1,19 +1,29 @@
import json
import pytest import pytest
from app.agents import predictor from app.agents.predictor import PREDICTOR_INSTRUCTIONS, PredictorInput, PredictorOutput, PredictorStyle
from app.models import Models from app.markets.base import ProductInfo
from app.models import AppModels
def unified_checks(model: Models, input): def unified_checks(model: AppModels, input):
llm = model.get_agent(predictor.instructions()) llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput)
result = llm.run(input) result = llm.run(input)
content = result.content
print(result.content) assert isinstance(content, PredictorOutput)
potential_json = Models.extract_json_str_from_response(result.content) assert content.strategy not in (None, "", "null")
content = json.loads(potential_json) # Verifica che l'output sia un JSON valido assert isinstance(content.strategy, str)
assert isinstance(content.portfolio, list)
assert content['strategia'] is not None assert len(content.portfolio) > 0
assert isinstance(content['portafoglio'], list) for item in content.portfolio:
assert abs(sum(item['percentuale'] for item in content['portafoglio']) - 100) < 0.01 # La somma deve essere esattamente 100 assert item.asset not in (None, "", "null")
assert isinstance(item.asset, str)
assert item.percentage > 0
assert item.percentage <= 100
assert isinstance(item.percentage, (int, float))
assert item.motivation not in (None, "", "null")
assert isinstance(item.motivation, str)
# La somma delle percentuali deve essere esattamente 100
total_percentage = sum(item.percentage for item in content.portfolio)
assert abs(total_percentage - 100) < 0.01 # Permette una piccola tolleranza per errori di arrotondamento
class TestPredictor: class TestPredictor:
@@ -21,23 +31,19 @@ class TestPredictor:
def inputs(self): def inputs(self):
data = [] data = []
for symbol, price in [("BTC", 60000.00), ("ETH", 3500.00), ("SOL", 150.00)]: for symbol, price in [("BTC", 60000.00), ("ETH", 3500.00), ("SOL", 150.00)]:
product_info = predictor.ProductInfo() product_info = ProductInfo()
product_info.symbol = symbol product_info.symbol = symbol
product_info.price = price product_info.price = price
data.append(product_info) data.append(product_info)
return predictor.prepare_inputs( return PredictorInput(data=data, style=PredictorStyle.AGGRESSIVE, sentiment="positivo")
data=data,
style=predictor.PredictorStyle.AGGRESSIVE,
sentiment="positivo"
)
def test_gemini_model_output(self, inputs): def test_gemini_model_output(self, inputs):
unified_checks(Models.GEMINI, inputs) unified_checks(AppModels.GEMINI, inputs)
def test_ollama_qwen_model_output(self, inputs):
unified_checks(AppModels.OLLAMA_QWEN, inputs)
@pytest.mark.slow @pytest.mark.slow
def test_ollama_gpt_oss_model_output(self, inputs): def test_ollama_gpt_oss_model_output(self, inputs):
unified_checks(Models.OLLAMA_GPT, inputs) unified_checks(AppModels.OLLAMA_GPT, inputs)
def test_ollama_qwen_model_output(self, inputs):
unified_checks(Models.OLLAMA_QWEN, inputs)