- Refactor struttura progetto: divisione tra agent e toolkit

This commit is contained in:
trojanhorse47
2025-09-30 12:28:44 +02:00
parent c82f10b32c
commit fcbb312d08
15 changed files with 266 additions and 114 deletions

0
src/__init__.py Normal file
View File

View File

@@ -1,8 +1,8 @@
import gradio as gr
from dotenv import load_dotenv
from app.tool import ToolAgent
from agno.utils.log import log_info
from dotenv import load_dotenv
from app.pipeline import Pipeline
########################################
# MAIN APP & GRADIO INTERFACE
@@ -16,31 +16,31 @@ if __name__ == "__main__":
load_dotenv()
######################################
tool_agent = ToolAgent()
pipeline = Pipeline()
with gr.Blocks() as demo:
gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto")
with gr.Row():
provider = gr.Dropdown(
choices=tool_agent.list_providers(),
choices=pipeline.list_providers(),
type="index",
label="Modello da usare"
)
provider.change(fn=tool_agent.choose_provider, inputs=provider, outputs=None)
provider.change(fn=pipeline.choose_provider, inputs=provider, outputs=None)
style = gr.Dropdown(
choices=tool_agent.list_styles(),
choices=pipeline.list_styles(),
type="index",
label="Stile di investimento"
)
style.change(fn=tool_agent.choose_style, inputs=style, outputs=None)
style.change(fn=pipeline.choose_style, inputs=style, outputs=None)
user_input = gr.Textbox(label="Richiesta utente")
output = gr.Textbox(label="Risultato analisi", lines=12)
analyze_btn = gr.Button("🔎 Analizza")
analyze_btn.click(fn=tool_agent.interact, inputs=[user_input], outputs=output)
analyze_btn.click(fn=pipeline.interact, inputs=[user_input], outputs=output)
server, port = ("0.0.0.0", 8000)
log_info(f"Starting UPO AppAI on http://{server}:{port}")

View File

@@ -0,0 +1,92 @@
from typing import Union, List, Dict, Optional, Any, Iterator, Sequence
from agno.agent import Agent
from agno.models.message import Message
from agno.run.agent import RunOutput, RunOutputEvent
from pydantic import BaseModel
from src.app.toolkits.market_toolkit import MarketToolkit
from src.app.markets.base import ProductInfo # modello dati già definito nel tuo progetto
class MarketAgent(Agent):
"""
Wrapper che trasforma MarketToolkit in un Agent compatibile con Team.
Produce sia output leggibile (content) che dati strutturati (metadata).
"""
def __init__(self, currency: str = "USD"):
super().__init__()
self.toolkit = MarketToolkit()
self.currency = currency
self.name = "MarketAgent"
def run(
self,
input: Union[str, List, Dict, Message, BaseModel, List[Message]],
*,
stream: Optional[bool] = None,
stream_intermediate_steps: Optional[bool] = None,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
session_state: Optional[Dict[str, Any]] = None,
audio: Optional[Sequence[Any]] = None,
images: Optional[Sequence[Any]] = None,
videos: Optional[Sequence[Any]] = None,
files: Optional[Sequence[Any]] = None,
retries: Optional[int] = None,
knowledge_filters: Optional[Dict[str, Any]] = None,
add_history_to_context: Optional[bool] = None,
add_dependencies_to_context: Optional[bool] = None,
add_session_state_to_context: Optional[bool] = None,
dependencies: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
yield_run_response: bool = False,
debug_mode: Optional[bool] = None,
**kwargs: Any,
) -> Union[RunOutput, Iterator[Union[RunOutputEvent, RunOutput]]]:
# 1. Estraggo la query dal parametro "input"
if isinstance(input, str):
query = input
elif isinstance(input, dict) and "query" in input:
query = input["query"]
elif isinstance(input, Message):
query = input.content
elif isinstance(input, BaseModel):
query = str(input)
elif isinstance(input, list) and input and isinstance(input[0], Message):
query = input[0].content
else:
query = str(input)
# 2. Individuo i simboli da analizzare
symbols = []
for token in query.upper().split():
if token in ("BTC", "ETH", "XRP", "LTC", "BCH"): # TODO: estendere dinamicamente
symbols.append(token)
if not symbols:
symbols = ["BTC", "ETH"] # default
# 3. Recupero i dati dal toolkit
results = []
products: List[ProductInfo] = []
for sym in symbols:
try:
product = self.toolkit.get_current_price(sym) # supponiamo ritorni un ProductInfo o simile
if isinstance(product, list):
products.extend(product)
else:
products.append(product)
results.append(f"{sym}: {product.price if hasattr(product, 'price') else product}")
except Exception as e:
results.append(f"{sym}: errore ({e})")
# 4. Preparo output leggibile + metadati strutturati
output_text = "📊 Dati di mercato:\n" + "\n".join(results)
return RunOutput(
content=output_text,
metadata={"products": products}
)

View File

@@ -1,4 +1,34 @@
class NewsAgent:
from agno.agent import Agent
class NewsAgent(Agent):
"""
Gli agenti devono esporre un metodo run con questa firma.
def run(
self,
input: Union[str, List, Dict, Message, BaseModel, List[Message]],
*,
stream: Optional[bool] = None,
stream_intermediate_steps: Optional[bool] = None,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
session_state: Optional[Dict[str, Any]] = None,
audio: Optional[Sequence[Any]] = None,
images: Optional[Sequence[Any]] = None,
videos: Optional[Sequence[Any]] = None,
files: Optional[Sequence[Any]] = None,
retries: Optional[int] = None,
knowledge_filters: Optional[Dict[str, Any]] = None,
add_history_to_context: Optional[bool] = None,
add_dependencies_to_context: Optional[bool] = None,
add_session_state_to_context: Optional[bool] = None,
dependencies: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
yield_run_response: bool = False,
debug_mode: Optional[bool] = None,
**kwargs: Any,
) -> Union[RunOutput, Iterator[Union[RunOutputEvent, RunOutput]]]:
"""
@staticmethod
def analyze(query: str) -> str:
# Mock analisi news

View File

@@ -1,4 +1,35 @@
class SocialAgent:
from agno.agent import Agent
class SocialAgent(Agent):
"""
Gli agenti devono esporre un metodo run con questa firma.
def run(
self,
input: Union[str, List, Dict, Message, BaseModel, List[Message]],
*,
stream: Optional[bool] = None,
stream_intermediate_steps: Optional[bool] = None,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
session_state: Optional[Dict[str, Any]] = None,
audio: Optional[Sequence[Any]] = None,
images: Optional[Sequence[Any]] = None,
videos: Optional[Sequence[Any]] = None,
files: Optional[Sequence[Any]] = None,
retries: Optional[int] = None,
knowledge_filters: Optional[Dict[str, Any]] = None,
add_history_to_context: Optional[bool] = None,
add_dependencies_to_context: Optional[bool] = None,
add_session_state_to_context: Optional[bool] = None,
dependencies: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
yield_run_response: bool = False,
debug_mode: Optional[bool] = None,
**kwargs: Any,
) -> Union[RunOutput, Iterator[Union[RunOutputEvent, RunOutput]]]:
"""
@staticmethod
def analyze(query: str) -> str:
# Mock analisi social

View File

@@ -1,8 +1,9 @@
from agno.utils.log import log_warning
from src.app.markets.base import BaseWrapper
from src.app.markets.coinbase import CoinBaseWrapper
from src.app.markets.cryptocompare import CryptoCompareWrapper
from agno.utils.log import log_warning
class MarketAPIs(BaseWrapper):
"""

View File

@@ -1,7 +1,10 @@
import os
from coinbase.rest import RESTClient
from src.app.markets.base import ProductInfo, BaseWrapper, Price
class CoinBaseWrapper(BaseWrapper):
"""
Wrapper per le API di Coinbase.

View File

@@ -1,5 +1,7 @@
import os
import requests
from src.app.markets.base import ProductInfo, BaseWrapper, Price
BASE_URL = "https://min-api.cryptocompare.com"

View File

@@ -1,13 +1,14 @@
import os
import requests
from enum import Enum
from pydantic import BaseModel
import requests
from agno.agent import Agent
from agno.models.base import Model
from agno.models.google import Gemini
from agno.models.ollama import Ollama
from agno.utils.log import log_warning
from pydantic import BaseModel
class AppModels(Enum):
"""

84
src/app/pipeline.py Normal file
View File

@@ -0,0 +1,84 @@
from typing import List
from agno.team import Team
from agno.utils.log import log_info
from app.agents.market_agent import MarketAgent
from src.app.agents.news_agent import NewsAgent
from src.app.agents.social_agent import SocialAgent
from src.app.markets import MarketAPIs
from src.app.models import AppModels
from src.app.predictor import PredictorStyle, PredictorInput, PredictorOutput, PREDICTOR_INSTRUCTIONS
class Pipeline:
"""
Pipeline coordinata: esegue tutti gli agenti del Team, aggrega i risultati e invoca il Predictor.
"""
def __init__(self):
# Inizializza gli agenti
self.market_agent = MarketAgent()
self.news_agent = NewsAgent()
self.social_agent = SocialAgent()
# Crea il Team
self.team = Team(name="CryptoAnalysisTeam", members=[self.market_agent, self.news_agent, self.social_agent])
# Modelli disponibili e Predictor
self.available_models = AppModels.availables()
self.predictor_model = self.available_models[0]
self.predictor = self.predictor_model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type]
# Stili
self.styles = list(PredictorStyle)
self.style = self.styles[0]
def choose_provider(self, index: int):
self.predictor_model = self.available_models[index]
self.predictor = self.predictor_model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type]
def choose_style(self, index: int):
self.style = self.styles[index]
def interact(self, query: str) -> str:
"""
Esegue il Team (Market + News + Social), aggrega i risultati e invoca il Predictor.
"""
# Step 1: raccogli output del Team
team_results = self.team.run(query)
if isinstance(team_results, dict): # alcuni Team possono restituire dict
pieces = [str(v) for v in team_results.values()]
elif isinstance(team_results, list):
pieces = [str(r) for r in team_results]
else:
pieces = [str(team_results)]
aggregated_text = "\n\n".join(pieces)
# Step 2: prepara input per Predictor
predictor_input = PredictorInput(
data=[], # TODO: mappare meglio i dati di mercato in ProductInfo
style=self.style,
sentiment=aggregated_text
)
# Step 3: chiama Predictor
result = self.predictor.run(predictor_input)
prediction: PredictorOutput = result.content
# Step 4: formatta output finale
portfolio_lines = "\n".join(
[f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio]
)
output = (
f"📊 Strategia ({self.style.value}): {prediction.strategy}\n\n"
f"💼 Portafoglio consigliato:\n{portfolio_lines}"
)
return output
def list_providers(self) -> List[str]:
return [m.name for m in self.available_models]
def list_styles(self) -> List[str]:
return [s.value for s in self.styles]

View File

@@ -1,7 +1,10 @@
from enum import Enum
from src.app.markets.base import ProductInfo
from pydantic import BaseModel, Field
from src.app.markets.base import ProductInfo
class PredictorStyle(Enum):
CONSERVATIVE = "Conservativo"
AGGRESSIVE = "Aggressivo"

View File

@@ -1,97 +0,0 @@
from src.app.agents.news_agent import NewsAgent
from src.app.agents.social_agent import SocialAgent
from src.app.agents.predictor import PredictorStyle, PredictorInput, PredictorOutput, PREDICTOR_INSTRUCTIONS
from src.app.markets import MarketAPIs
from src.app.models import AppModels
from agno.utils.log import log_info
class ToolAgent:
"""
Classe principale che coordina gli agenti per rispondere alle richieste dell'utente.
"""
def __init__(self):
"""
Inizializza l'agente con i modelli disponibili, gli stili e l'API di mercato.
"""
self.social_agent = None
self.news_agent = None
self.predictor = None
self.chosen_model = None
self.available_models = AppModels.availables()
self.all_styles = list(PredictorStyle)
self.style = self.all_styles[0] # Default to the first style
self.market = MarketAPIs(currency="USD")
self.choose_provider(0) # Default to the first model
def choose_provider(self, index: int):
"""
Sceglie il modello LLM da utilizzare in base all'indice fornito.
Args:
index: indice del modello nella lista available_models.
"""
# TODO Utilizzare AGNO per gestire i modelli... è molto più semplice e permette di cambiare modello facilmente
# TODO https://docs.agno.com/introduction
# Inoltre permette di creare dei team e workflow di agenti più facilmente
self.chosen_model = self.available_models[index]
self.predictor = self.chosen_model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput)
self.news_agent = NewsAgent()
self.social_agent = SocialAgent()
def choose_style(self, index: int):
"""
Sceglie lo stile di previsione da utilizzare in base all'indice fornito.
Args:
index: indice dello stile nella lista all_styles.
"""
self.style = self.all_styles[index]
def interact(self, query: str) -> str:
"""
Funzione principale che coordina gli agenti per rispondere alla richiesta dell'utente.
Args:
query: richiesta dell'utente (es. "Qual è la previsione per Bitcoin?")
"""
log_info(f"[model={self.chosen_model.name}] [style={self.style.name}] [query=\"{query.replace('"', "'")}\"]")
# TODO Step 0: ricerca e analisi della richiesta (es. estrazione di criptovalute specifiche)
# Prendere la query dell'utente e fare un'analisi preliminare con una agente o con un team di agenti (social e news)
# Step 1: raccolta analisi
cryptos = ["BTC", "ETH", "XRP", "LTC", "BCH"] # TODO rendere dinamico in futuro
market_data = self.market.get_products(cryptos)
news_sentiment = self.news_agent.analyze(query)
social_sentiment = self.social_agent.analyze(query)
log_info(f"End of data collection")
# Step 2: aggrega sentiment
sentiment = f"{news_sentiment}\n{social_sentiment}"
# Step 3: previsione
inputs = PredictorInput(data=market_data, style=self.style, sentiment=sentiment)
result = self.predictor.run(inputs)
prediction: PredictorOutput = result.content
log_info(f"End of prediction")
market_data = "\n".join([f"{product.symbol}: {product.price}" for product in market_data])
output = f"[{prediction.strategy}]\nPortafoglio:\n" + "\n".join(
[f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio]
)
return f"INPUT:\n{market_data}\n{sentiment}\n\n\nOUTPUT:\n{output}"
def list_providers(self) -> list[str]:
"""
Restituisce la lista dei nomi dei modelli disponibili.
"""
return [model.name for model in self.available_models]
def list_styles(self) -> list[str]:
"""
Restituisce la lista degli stili di previsione disponibili.
"""
return [style.value for style in self.all_styles]

View File

View File

@@ -1,6 +1,8 @@
from agno.tools import Toolkit
from src.app.markets import MarketAPIs
# TODO (?) in futuro fare in modo che la LLM faccia da sé per il mercato
# Non so se può essere utile, per ora lo lascio qui
# per ora mettiamo tutto statico e poi, se abbiamo API-Key senza limiti

View File

@@ -1,6 +1,6 @@
import os
import pytest
from src.app.agents.market import MarketToolkit
from src.app.agents.market_toolkit import MarketToolkit
from src.app.markets.base import BaseWrapper
from src.app.markets.coinbase import CoinBaseWrapper
from src.app.markets.cryptocompare import CryptoCompareWrapper