* Refactor market agent and toolkit to support batch price retrieval

* 1. Correzione del modello base del Team: inizializzato con qwen3:latest
2. Modifica dell'interfaccia e inserimento di un ChatManager per gestire interazione, salvataggio e caricamento della chat.

* Fix degli import
+ Aggiunta cancellazione casella di input all'invio della richiesta dell'utente

* Riorganizzazione degli import per utilizzare il percorso corretto in tutti i moduli

* Remove unused imports from __init__.py

* Update __all__ in __init__.py to include MARKET_INSTRUCTIONS

---------

Co-authored-by: Berack96 <giacomobertolazzi7@gmail.com>
This commit was merged in pull request #15.
This commit is contained in:
trojanhorse47
2025-10-03 11:42:11 +02:00
committed by GitHub
parent d2fbc0ceea
commit 85153c405b
5 changed files with 236 additions and 82 deletions

View File

@@ -1,48 +1,83 @@
import gradio as gr
from dotenv import load_dotenv
from app.pipeline import Pipeline
from agno.utils.log import log_info
from dotenv import load_dotenv
from app.chat_manager import ChatManager
########################################
# MAIN APP & GRADIO CHAT INTERFACE
########################################
if __name__ == "__main__":
    # Load environment variables from .env once, so API keys are available
    # everywhere without calling load_dotenv() again in other modules.
    load_dotenv()

    # ChatManager wraps the Pipeline and keeps the conversation history.
    chat = ChatManager()

    ########################################
    # Gradio callbacks
    ########################################
    def respond(message, history):
        """Send the user message to the ChatManager and append both turns to the UI history.

        Returns (history, history, "") to update the chatbot and clear the input box.
        """
        response = chat.send_message(message)
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": response})
        return history, history, ""

    def save_current_chat():
        """Persist the current chat to chat.json."""
        chat.save_chat("chat.json")
        return "💾 Chat salvata in chat.json"

    def load_previous_chat():
        """Reload chat.json and rebuild the UI history from it."""
        chat.load_chat("chat.json")
        history = [{"role": m["role"], "content": m["content"]} for m in chat.get_history()]
        return history, history

    def reset_chat():
        """Clear both the ChatManager history and the UI history."""
        chat.reset_chat()
        return [], []

    ########################################
    # Gradio interface
    ########################################
    with gr.Blocks() as demo:
        gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)")

        # Provider and style dropdowns.
        with gr.Row():
            provider = gr.Dropdown(
                choices=chat.list_providers(),
                type="index",
                label="Modello da usare"
            )
            provider.change(fn=chat.choose_provider, inputs=provider, outputs=None)
            style = gr.Dropdown(
                choices=chat.list_styles(),
                type="index",
                label="Stile di investimento"
            )
            style.change(fn=chat.choose_style, inputs=style, outputs=None)

        chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages")
        msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?")

        with gr.Row():
            clear_btn = gr.Button("🗑️ Reset Chat")
            save_btn = gr.Button("💾 Salva Chat")
            load_btn = gr.Button("📂 Carica Chat")

        # Message submit (also clears the input box via the third output).
        msg.submit(respond, inputs=[msg, chatbot], outputs=[chatbot, chatbot, msg])
        # Reset
        clear_btn.click(reset_chat, inputs=None, outputs=[chatbot, chatbot])
        # Save
        save_btn.click(save_current_chat, inputs=None, outputs=None)
        # Load
        load_btn.click(load_previous_chat, inputs=None, outputs=[chatbot, chatbot])

    server, port = ("0.0.0.0", 8000)  # 0.0.0.0 for docker compatibility
    server_log = "localhost" if server == "0.0.0.0" else server
    log_info(f"Starting UPO AppAI Chat on http://{server_log}:{port}")  # noqa
    demo.launch(server_name=server, server_port=port, quiet=True)

78
src/app/chat_manager.py Normal file
View File

@@ -0,0 +1,78 @@
import os
import json
from typing import List, Dict
from app.pipeline import Pipeline
# Directory where chat sessions are persisted: src/saves (one level above this module).
SAVE_DIR = os.path.join(os.path.dirname(__file__), "..", "saves")
# Created eagerly at import time so save_chat() can write without existence checks.
os.makedirs(SAVE_DIR, exist_ok=True)
class ChatManager:
    """
    Manages the conversation with the Pipeline:
    - keeps the message history
    - invokes the Pipeline to generate replies
    - saves and reloads chats
    """

    def __init__(self):
        self.pipeline = Pipeline()
        # Each entry is {"role": "user" | "assistant", "content": "..."}
        self.history: List[Dict[str, str]] = []

    def send_message(self, message: str) -> str:
        """
        Append a user message, call the Pipeline and store the reply in the history.
        """
        # Record the user turn first so history stays ordered even if interact raises.
        self.history.append({"role": "user", "content": message})
        # Let the Pipeline process the query.
        response = self.pipeline.interact(message)
        # Record the assistant turn.
        self.history.append({"role": "assistant", "content": response})
        return response

    def save_chat(self, filename: str = "chat.json") -> None:
        """
        Save the current chat to src/saves/<filename>.
        """
        path = os.path.join(SAVE_DIR, filename)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(self.history, f, ensure_ascii=False, indent=2)

    def load_chat(self, filename: str = "chat.json") -> None:
        """
        Load a saved chat from src/saves/<filename>.

        A missing file resets the history to empty instead of raising.
        """
        path = os.path.join(SAVE_DIR, filename)
        if not os.path.exists(path):
            self.history = []
            return
        with open(path, "r", encoding="utf-8") as f:
            self.history = json.load(f)

    def reset_chat(self) -> None:
        """Clear the chat history."""
        self.history = []

    def get_history(self) -> List[Dict[str, str]]:
        """Return the full chat history."""
        return self.history

    # Pass-through of provider and style so Gradio can use them directly.
    def choose_provider(self, index: int) -> None:
        self.pipeline.choose_provider(index)

    def choose_style(self, index: int) -> None:
        self.pipeline.choose_style(index)

    def list_providers(self) -> List[str]:
        return self.pipeline.list_providers()

    def list_styles(self) -> List[str]:
        return self.pipeline.list_styles()

View File

@@ -7,7 +7,7 @@ from .binance import BinanceWrapper
from .cryptocompare import CryptoCompareWrapper
from .yfinance import YFinanceWrapper
# Public API of the markets package; MARKET_INSTRUCTIONS is exported for the agent setup.
__all__ = [ "MarketAPIsTool", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper", "MARKET_INSTRUCTIONS" ]
class MarketAPIsTool(BaseWrapper, Toolkit):

View File

@@ -1,5 +1,5 @@
from agno.run.agent import RunOutput
from agno.team import Team
from agno.utils.log import log_info
from app.news import NewsAPIsTool, NEWS_INSTRUCTIONS
from app.social import SocialAPIsTool, SOCIAL_INSTRUCTIONS
@@ -10,98 +10,139 @@ from app.predictor import PredictorStyle, PredictorInput, PredictorOutput, PREDI
class Pipeline:
    """
    Coordinates the service agents (Market, News, Social) and the final Predictor.

    The Team is orchestrated by qwen3:latest (Ollama), while the Predictor is
    dynamic and chosen by the user through the UI dropdowns.
    """

    def __init__(self):
        # One service agent per data source.
        self.market_agent = AppModels.OLLAMA_QWEN.get_agent(
            instructions=MARKET_INSTRUCTIONS,
            name="MarketAgent",
            tools=[MarketAPIsTool()]
        )
        self.news_agent = AppModels.OLLAMA_QWEN.get_agent(
            instructions=NEWS_INSTRUCTIONS,
            name="NewsAgent",
            tools=[NewsAPIsTool()]
        )
        self.social_agent = AppModels.OLLAMA_QWEN.get_agent(
            instructions=SOCIAL_INSTRUCTIONS,
            name="SocialAgent",
            tools=[SocialAPIsTool()]
        )

        # === Team orchestration model ===
        team_model = AppModels.OLLAMA_QWEN.get_model(
            # TODO: improve the team instructions
            "Agisci come coordinatore: smista le richieste tra MarketAgent, NewsAgent e SocialAgent."
        )

        # === Team ===
        self.team = Team(
            name="CryptoAnalysisTeam",
            members=[self.market_agent, self.news_agent, self.social_agent],
            model=team_model
        )

        # === Predictor ===
        self.available_models = AppModels.availables()
        self.all_styles = list(PredictorStyle)

        # Default choices; None when nothing is available.
        self.chosen_model = self.available_models[0] if self.available_models else None
        self.style = self.all_styles[0] if self.all_styles else None
        # Always define the attribute so interact() can test it safely even when
        # _init_predictor() returns early (no model chosen).
        self.predictor = None
        self._init_predictor()  # Initialize the predictor with the default model

    # ======================
    # Dropdown handlers
    # ======================
    def choose_provider(self, index: int):
        """
        Choose the LLM model used by the Predictor.
        """
        self.chosen_model = self.available_models[index]
        self._init_predictor()

    def choose_style(self, index: int):
        """
        Choose the style (conservative/aggressive) used by the Predictor.
        """
        self.style = self.all_styles[index]

    # ======================
    # Helpers
    # ======================
    def _init_predictor(self):
        """
        Initialize (or reinitialize) the Predictor from the chosen model.
        """
        if not self.chosen_model:
            return
        self.predictor = self.chosen_model.get_agent(
            PREDICTOR_INSTRUCTIONS,
            output=PredictorOutput,  # type: ignore
        )

    def list_providers(self) -> list[str]:
        """
        Return the names of the available models.
        """
        return [model.name for model in self.available_models]

    def list_styles(self) -> list[str]:
        """
        Return the available prediction styles.
        """
        return [style.value for style in self.all_styles]

    # ======================
    # Core interaction
    # ======================
    def interact(self, query: str) -> str:
        """
        1. Collect output from the Team members
        2. Aggregate structured output
        3. Invoke the Predictor
        4. Return the final strategy
        """
        if not self.predictor or not self.style:
            return "⚠️ Devi prima selezionare un modello e una strategia validi dagli appositi menu."

        # Step 1: collect output from the Team members
        team_outputs = self.team.run(query)

        # Step 2: aggregate structured output
        all_products = []
        sentiments = []
        for agent_output in team_outputs.member_responses:
            if isinstance(agent_output, RunOutput):
                # metadata may be absent; guard against None before membership tests.
                metadata = agent_output.metadata or {}
                if "products" in metadata:
                    all_products.extend(metadata["products"])
                if "sentiment_news" in metadata:
                    sentiments.append(metadata["sentiment_news"])
                if "sentiment_social" in metadata:
                    sentiments.append(metadata["sentiment_social"])
        aggregated_sentiment = "\n".join(sentiments)

        # Step 3: invoke the Predictor
        predictor_input = PredictorInput(
            data=all_products,
            style=self.style,
            sentiment=aggregated_sentiment
        )
        result = self.predictor.run(predictor_input)
        prediction: PredictorOutput = result.content

        # Step 4: return the final strategy
        portfolio_lines = "\n".join(
            [f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio]
        )
        return (
            f"📊 Strategia ({self.style.value}): {prediction.strategy}\n\n"
            f"💼 Portafoglio consigliato:\n{portfolio_lines}"
        )

View File

@@ -4,7 +4,7 @@ from app.markets.base import ProductInfo
from app.models import AppModels
def unified_checks(model: AppModels, input):
llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput)
llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type]
result = llm.run(input)
content = result.content