Refactor market system and improve app configuration

- Refactor market system from singleton function to class-based architecture with MarketAPIs
- Add automatic API key detection for Coinbase and CryptoCompare wrappers
- Improve error handling and logging with agno.utils.log throughout the application
- Split Models class into separate methods for online and local model availability
- Add proper JSON response extraction with thinking pattern support
- Enhance ToolAgent with better state management for provider and style selection
- Update Gradio app with proper server configuration and logging
This commit is contained in:
2025-09-27 17:49:32 +02:00
parent a51ec67ac1
commit 03d8523a5a
8 changed files with 143 additions and 73 deletions

View File

@@ -2,6 +2,7 @@ import gradio as gr
from dotenv import load_dotenv
from app.tool import ToolAgent
from agno.utils.log import log_info
########################################
# MAIN APP & GRADIO INTERFACE
@@ -33,10 +34,14 @@ if __name__ == "__main__":
type="index",
label="Stile di investimento"
)
style.change(fn=tool_agent.choose_style, inputs=style, outputs=None)
user_input = gr.Textbox(label="Richiesta utente")
output = gr.Textbox(label="Risultato analisi", lines=12)
analyze_btn = gr.Button("🔎 Analizza")
analyze_btn.click(fn=tool_agent.interact, inputs=[user_input, style], outputs=output)
demo.launch(server_name="0.0.0.0", server_port=8000)
analyze_btn.click(fn=tool_agent.interact, inputs=[user_input], outputs=output)
server, port = ("0.0.0.0", 8000)
log_info(f"Starting UPO AppAI on http://{server}:{port}")
demo.launch(server_name=server, server_port=port, quiet=True)

View File

@@ -1,5 +1,5 @@
from agno.tools import Toolkit
from app.markets import get_first_available_market_api
from app.markets import MarketAPIs
# TODO (?) in futuro fare in modo che la LLM faccia da sé per il mercato
# Non so se può essere utile, per ora lo lascio qui
@@ -8,7 +8,7 @@ from app.markets import get_first_available_market_api
# in base alle sue proprie chiamate API
class MarketToolkit(Toolkit):
def __init__(self):
self.market_agent = get_first_available_market_api("USD") # change currency if needed
self.market_api = MarketAPIs("USD") # change currency if needed
super().__init__(
name="Market Toolkit",
@@ -19,10 +19,10 @@ class MarketToolkit(Toolkit):
)
def get_historical_data(self, symbol: str):
return self.market_agent.get_historical_prices(symbol)
return self.market_api.get_historical_prices(symbol)
def get_current_price(self, symbol: str):
return self.market_agent.get_products(symbol)
return self.market_api.get_products(symbol)
def prepare_inputs():
pass

View File

@@ -1,38 +1,57 @@
import os
from app.markets.base import BaseWrapper
from app.markets.coinbase import CoinBaseWrapper
from app.markets.cryptocompare import CryptoCompareWrapper
# TODO Dare la priorità in base alla qualità del servizio
# TODO Aggiungere altri wrapper se necessario
def get_first_available_market_api(currency:str = "USD") -> BaseWrapper:
"""
Restituisce il primo wrapper disponibile in base alle configurazioni del file .env e alle chiavi API presenti.
La priorità è data a Coinbase, poi a CryptoCompare.
Se non sono presenti chiavi API, restituisce una eccezione.
:param currency: Valuta di riferimento (default "USD")
:return: Lista di istanze di wrapper
"""
return get_list_available_market_apis(currency=currency)[0]
from agno.utils.log import log_warning
class MarketAPIs(BaseWrapper):
"""
Classe per gestire le API di mercato disponibili.
Permette di ottenere un'istanza della prima API disponibile in base alla priorità specificata.
"""
@staticmethod
def get_list_available_market_apis(currency: str = "USD") -> list[BaseWrapper]:
"""
Restituisce la lista di wrapper disponibili in base alle configurazioni del file .env e alle chiavi API presenti.
La priorità è data a Coinbase, poi a CryptoCompare.
Se non sono presenti chiavi API, restituisce una eccezione.
Restituisce una lista di istanze delle API di mercato disponibili.
La priorità è data dall'ordine delle API nella lista wrappers.
1. CoinBase
2. CryptoCompare
:param currency: Valuta di riferimento (default "USD")
:return: Lista di istanze di wrapper
:return: Lista di istanze delle API di mercato disponibili
"""
wrappers = []
wrapper_builders = [
CoinBaseWrapper,
CryptoCompareWrapper,
]
api_key = os.getenv("COINBASE_API_KEY")
api_secret = os.getenv("COINBASE_API_SECRET")
if api_key and api_secret:
wrappers.append(CoinBaseWrapper(api_key=api_key, api_private_key=api_secret, currency=currency))
result = []
for wrapper in wrapper_builders:
try:
result.append(wrapper(currency=currency))
except Exception as _:
log_warning(f"{wrapper} cannot be initialized, maybe missing API key?")
api_key = os.getenv("CRYPTOCOMPARE_API_KEY")
if api_key:
wrappers.append(CryptoCompareWrapper(api_key=api_key, currency=currency))
assert result, "No market API keys set in environment variables."
return result
assert wrappers, "No valid API keys set in environment variables."
return wrappers
def __init__(self, currency: str = "USD"):
"""
Inizializza la classe con la valuta di riferimento e la priorità dei provider.
:param currency: Valuta di riferimento (default "USD")
"""
self.currency = currency
self.wrappers = MarketAPIs.get_list_available_market_apis(currency=currency)
# Metodi che semplicemente chiamano il metodo corrispondente del primo wrapper disponibile
# TODO magari fare in modo che se il primo fallisce, prova con il secondo, ecc.
# oppure fare un round-robin tra i vari wrapper oppure usarli tutti e fare una media dei risultati
def get_product(self, asset_id):
return self.wrappers[0].get_product(asset_id)
def get_products(self, asset_ids: list):
return self.wrappers[0].get_products(asset_ids)
def get_all_products(self):
return self.wrappers[0].get_all_products()
def get_historical_prices(self, asset_id = "BTC"):
return self.wrappers[0].get_historical_prices(asset_id)

View File

@@ -1,3 +1,4 @@
import os
from coinbase.rest import RESTClient
from app.markets.base import ProductInfo, BaseWrapper, Price
@@ -6,8 +7,13 @@ class CoinBaseWrapper(BaseWrapper):
Wrapper per le API di Coinbase.
La documentazione delle API è disponibile qui: https://docs.cdp.coinbase.com/api-reference/advanced-trade-api/rest-api/introduction
"""
def __init__(self, api_key:str, api_private_key:str, currency: str = "USD"):
def __init__(self, api_key:str = None, api_private_key:str = None, currency: str = "USD"):
if api_key is None:
api_key = os.getenv("COINBASE_API_KEY")
assert api_key is not None, "API key is required"
if api_private_key is None:
api_private_key = os.getenv("COINBASE_API_SECRET")
assert api_private_key is not None, "API private key is required"
self.currency = currency

View File

@@ -1,3 +1,4 @@
import os
import requests
from app.markets.base import ProductInfo, BaseWrapper, Price
@@ -9,7 +10,9 @@ class CryptoCompareWrapper(BaseWrapper):
La documentazione delle API è disponibile qui: https://developers.coindesk.com/documentation/legacy/Price/SingleSymbolPriceEndpoint
!!ATTENZIONE!! sembra essere una API legacy e potrebbe essere deprecata in futuro.
"""
def __init__(self, api_key:str, currency:str='USD'):
def __init__(self, api_key:str = None, currency:str='USD'):
if api_key is None:
api_key = os.getenv("CRYPTOCOMPARE_API_KEY")
assert api_key is not None, "API key is required"
self.api_key = api_key

View File

@@ -6,6 +6,8 @@ from agno.models.base import BaseModel
from agno.models.google import Gemini
from agno.models.ollama import Ollama
from agno.utils.log import log_warning
class Models(Enum):
"""
Enum per i modelli supportati.
@@ -19,29 +21,52 @@ class Models(Enum):
OLLAMA_QWEN = "qwen3:latest" # + good + fast (8b)
@staticmethod
def availables() -> list['Models']:
def availables_local() -> list['Models']:
"""
Controlla quali provider di modelli LLM hanno le loro API keys disponibili
come variabili d'ambiente e ritorna una lista di provider disponibili.
L'ordine di preferenza è:
1. Gemini (Google)
2. Ollama (locale)
Controlla quali provider di modelli LLM locali sono disponibili.
Ritorna una lista di provider disponibili.
"""
availables = []
if os.getenv("GOOGLE_API_KEY"):
availables.append(Models.GEMINI)
availables.append(Models.GEMINI_PRO)
ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434")
result = requests.get(f"{ollama_host}/api/tags")
print(result)
if result.status_code == 200:
if result.status_code != 200:
log_warning(f"Ollama is not running or not reachable {result}")
return []
availables = []
result = result.text
if Models.OLLAMA_GPT.value in result:
availables.append(Models.OLLAMA_GPT)
if Models.OLLAMA_QWEN.value in result:
availables.append(Models.OLLAMA_QWEN)
return availables
def availables_online() -> list['Models']:
"""
Controlla quali provider di modelli LLM online hanno le loro API keys disponibili
come variabili d'ambiente e ritorna una lista di provider disponibili.
"""
if not os.getenv("GOOGLE_API_KEY"):
log_warning("No GOOGLE_API_KEY set in environment variables.")
return []
availables = []
availables.append(Models.GEMINI)
availables.append(Models.GEMINI_PRO)
return availables
@staticmethod
def availables() -> list['Models']:
"""
Controlla quali provider di modelli LLM locali sono disponibili e quali
provider di modelli LLM online hanno le loro API keys disponibili come variabili
d'ambiente e ritorna una lista di provider disponibili.
L'ordine di preferenza è:
1. Gemini (Google)
2. Ollama (locale)
"""
availables = [
*Models.availables_online(),
*Models.availables_local()
]
assert availables, "No valid model API keys set in environment variables."
return availables
@@ -56,6 +81,10 @@ class Models(Enum):
in tutti i casi. Si assume che il JSON sia ben formato e che inizi con
'{' e finisca con '}'. Quindi anche solo un json array farà fallire questa funzione.
"""
think = response.rfind("</think>")
if think != -1:
response = response[think:]
start = response.find("{")
assert start != -1, "No JSON found in the response."
@@ -89,10 +118,8 @@ class Models(Enum):
return Agent(
model=self.get_model(instructions),
name=name,
use_json_mode=True,
# TODO Eventuali altri parametri da mettere all'agente
# anche se si possono comunque assegnare dopo la creazione
# Esempio:
# retries=2,
# retry_delay=1,
retries=2,
delay_between_retries=5, # seconds
use_json_mode=True, # utile per fare in modo che l'agente risponda in JSON (anche se sembra essere solo placebo)
# TODO Eventuali altri parametri da mettere all'agente anche se si possono comunque assegnare dopo la creazione
)

View File

@@ -1,10 +1,10 @@
from app.agents.news_agent import NewsAgent
from app.agents.social_agent import SocialAgent
from app.agents import predictor
from app.agents.predictor import PredictorStyle
from app.markets import get_first_available_market_api
from app.agents import predictor
from app.markets import MarketAPIs
from app.models import Models
from agno.utils.log import log_info
class ToolAgent:
"""
@@ -17,8 +17,9 @@ class ToolAgent:
"""
self.available_models = Models.availables()
self.all_styles = list(PredictorStyle)
self.style = self.all_styles[0] # Default to the first style
self.market = get_first_available_market_api(currency="USD")
self.market = MarketAPIs(currency="USD")
self.choose_provider(0) # Default to the first model
def choose_provider(self, index: int):
@@ -29,18 +30,26 @@ class ToolAgent:
# TODO Utilizzare AGNO per gestire i modelli... è molto più semplice e permette di cambiare modello facilmente
# TODO https://docs.agno.com/introduction
# Inoltre permette di creare dei team e workflow di agenti più facilmente
chosen_model = self.available_models[index]
self.predictor = chosen_model.get_agent(predictor.instructions())
self.chosen_model = self.available_models[index]
self.predictor = self.chosen_model.get_agent(predictor.instructions())
self.news_agent = NewsAgent()
self.social_agent = SocialAgent()
def interact(self, query: str, style_index: int):
def choose_style(self, index: int):
"""
Sceglie lo stile di previsione da utilizzare in base all'indice fornito.
index: indice dello stile nella lista all_styles.
"""
self.style = self.all_styles[index]
def interact(self, query: str) -> str:
"""
Funzione principale che coordina gli agenti per rispondere alla richiesta dell'utente.
query: richiesta dell'utente (es. "Qual è la previsione per Bitcoin?")
style_index: indice dello stile di previsione nella lista all_styles.
"""
log_info(f"[model={self.chosen_model.name}] [style={self.style.name}] [query=\"{query.replace('"', "'")}\"]")
# TODO Step 0: ricerca e analisi della richiesta (es. estrazione di criptovalute specifiche)
# Prendere la query dell'utente e fare un'analisi preliminare con una agente o con un team di agenti (social e news)
@@ -49,6 +58,7 @@ class ToolAgent:
market_data = self.market.get_products(cryptos)
news_sentiment = self.news_agent.analyze(query)
social_sentiment = self.social_agent.analyze(query)
log_info(f"End of data collection")
# Step 2: aggrega sentiment
sentiment = f"{news_sentiment}\n{social_sentiment}"
@@ -56,12 +66,13 @@ class ToolAgent:
# Step 3: previsione
inputs = predictor.prepare_inputs(
data=market_data,
style=self.all_styles[style_index],
style=self.style,
sentiment=sentiment
)
prediction = self.predictor.run(inputs)
output = Models.extract_json_str_from_response(prediction.content)
log_info(f"End of prediction")
market_data = "\n".join([f"{product.symbol}: {product.price}" for product in market_data])
return f"{market_data}\n{sentiment}\n\n📈 Consiglio finale:\n{output}"

View File

@@ -4,15 +4,14 @@ from app.agents.market import MarketToolkit
from app.markets.base import BaseWrapper
from app.markets.coinbase import CoinBaseWrapper
from app.markets.cryptocompare import CryptoCompareWrapper
from app.markets import get_first_available_market_api
from app.markets import MarketAPIs
class TestMarketSystem:
"""Test suite per il sistema di mercato (wrappers + toolkit)"""
@pytest.fixture(scope="class")
def market_wrapper(self) -> BaseWrapper:
first = get_first_available_market_api("USD")
return first
return MarketAPIs("USD")
def test_wrapper_initialization(self, market_wrapper):
assert market_wrapper is not None
@@ -51,7 +50,7 @@ class TestMarketSystem:
toolkit = MarketToolkit()
assert toolkit is not None
assert hasattr(toolkit, 'market_agent')
assert toolkit.market_agent is not None
assert toolkit.market_api is not None
tools = toolkit.tools
assert len(tools) > 0
@@ -122,9 +121,9 @@ class TestMarketSystem:
if potential_providers == 0:
with pytest.raises(AssertionError, match="No valid API keys"):
get_first_available_market_api()
MarketAPIs.get_list_available_market_apis()
else:
wrapper = get_first_available_market_api("USD")
wrapper = MarketAPIs("USD")
assert wrapper is not None
assert hasattr(wrapper, 'get_product')