Tool #15
79
src/app.py
79
src/app.py
@@ -1,48 +1,83 @@
|
|||||||
import gradio as gr
|
import gradio as gr
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
from app.pipeline import Pipeline
|
|
||||||
from agno.utils.log import log_info
|
from agno.utils.log import log_info
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
from app.chat_manager import ChatManager
|
||||||
|
|
||||||
########################################
|
########################################
|
||||||
# MAIN APP & GRADIO INTERFACE
|
# MAIN APP & GRADIO CHAT INTERFACE
|
||||||
########################################
|
########################################
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
######################################
|
# Carica variabili d’ambiente (.env)
|
||||||
# DA FARE PRIMA DI ESEGUIRE L'APP
|
|
||||||
# qui carichiamo le variabili d'ambiente dal file .env
|
|
||||||
# una volta fatto, possiamo usare le API keys senza problemi
|
|
||||||
# quindi non è necessario richiamare load_dotenv() altrove
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
######################################
|
|
||||||
|
|
||||||
pipeline = Pipeline()
|
# Inizializza ChatManager
|
||||||
|
chat = ChatManager()
|
||||||
|
|
||||||
|
########################################
|
||||||
|
# Funzioni Gradio
|
||||||
|
########################################
|
||||||
|
def respond(message, history):
|
||||||
|
response = chat.send_message(message)
|
||||||
|
history.append({"role": "user", "content": message})
|
||||||
|
history.append({"role": "assistant", "content": response})
|
||||||
|
return history, history, ""
|
||||||
|
|
||||||
|
def save_current_chat():
|
||||||
|
chat.save_chat("chat.json")
|
||||||
|
return "💾 Chat salvata in chat.json"
|
||||||
|
|
||||||
|
def load_previous_chat():
|
||||||
|
chat.load_chat("chat.json")
|
||||||
|
history = []
|
||||||
|
for m in chat.get_history():
|
||||||
|
history.append({"role": m["role"], "content": m["content"]})
|
||||||
|
return history, history
|
||||||
|
|
||||||
|
def reset_chat():
|
||||||
|
chat.reset_chat()
|
||||||
|
return [], []
|
||||||
|
|
||||||
|
########################################
|
||||||
|
# Interfaccia Gradio
|
||||||
|
########################################
|
||||||
with gr.Blocks() as demo:
|
with gr.Blocks() as demo:
|
||||||
gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto")
|
gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)")
|
||||||
|
|
||||||
|
# Dropdown provider e stile
|
||||||
with gr.Row():
|
with gr.Row():
|
||||||
provider = gr.Dropdown(
|
provider = gr.Dropdown(
|
||||||
choices=pipeline.list_providers(),
|
choices=chat.list_providers(),
|
||||||
type="index",
|
type="index",
|
||||||
label="Modello da usare"
|
label="Modello da usare"
|
||||||
)
|
)
|
||||||
provider.change(fn=pipeline.choose_provider, inputs=provider, outputs=None)
|
provider.change(fn=chat.choose_provider, inputs=provider, outputs=None)
|
||||||
|
|
||||||
style = gr.Dropdown(
|
style = gr.Dropdown(
|
||||||
choices=pipeline.list_styles(),
|
choices=chat.list_styles(),
|
||||||
type="index",
|
type="index",
|
||||||
label="Stile di investimento"
|
label="Stile di investimento"
|
||||||
)
|
)
|
||||||
style.change(fn=pipeline.choose_style, inputs=style, outputs=None)
|
style.change(fn=chat.choose_style, inputs=style, outputs=None)
|
||||||
|
|
||||||
user_input = gr.Textbox(label="Richiesta utente")
|
chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages")
|
||||||
output = gr.Textbox(label="Risultato analisi", lines=12)
|
msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?")
|
||||||
|
|
||||||
analyze_btn = gr.Button("🔎 Analizza")
|
with gr.Row():
|
||||||
analyze_btn.click(fn=pipeline.interact, inputs=[user_input], outputs=output)
|
clear_btn = gr.Button("🗑️ Reset Chat")
|
||||||
|
save_btn = gr.Button("💾 Salva Chat")
|
||||||
|
load_btn = gr.Button("📂 Carica Chat")
|
||||||
|
|
||||||
server, port = ("0.0.0.0", 8000) # 0.0.0.0 per docker compatibility
|
# Invio messaggio
|
||||||
|
msg.submit(respond, inputs=[msg, chatbot], outputs=[chatbot, chatbot, msg])
|
||||||
|
# Reset
|
||||||
|
clear_btn.click(reset_chat, inputs=None, outputs=[chatbot, chatbot])
|
||||||
|
# Salvataggio
|
||||||
|
save_btn.click(save_current_chat, inputs=None, outputs=None)
|
||||||
|
# Caricamento
|
||||||
|
load_btn.click(load_previous_chat, inputs=None, outputs=[chatbot, chatbot])
|
||||||
|
|
||||||
|
server, port = ("0.0.0.0", 8000)
|
||||||
server_log = "localhost" if server == "0.0.0.0" else server
|
server_log = "localhost" if server == "0.0.0.0" else server
|
||||||
log_info(f"Starting UPO AppAI on http://{server_log}:{port}")
|
log_info(f"Starting UPO AppAI Chat on http://{server_log}:{port}") # noqa
|
||||||
demo.launch(server_name=server, server_port=port, quiet=True)
|
demo.launch(server_name=server, server_port=port, quiet=True)
|
||||||
|
|||||||
78
src/app/chat_manager.py
Normal file
78
src/app/chat_manager.py
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
import os
|
||||||
|
import json
|
||||||
|
from typing import List, Dict
|
||||||
|
from app.pipeline import Pipeline
|
||||||
|
|
||||||
|
SAVE_DIR = os.path.join(os.path.dirname(__file__), "..", "saves")
|
||||||
|
os.makedirs(SAVE_DIR, exist_ok=True)
|
||||||
|
|
||||||
|
class ChatManager:
|
||||||
|
"""
|
||||||
|
Gestisce la conversazione con la Pipeline:
|
||||||
|
- mantiene lo storico dei messaggi
|
||||||
|
- invoca la Pipeline per generare risposte
|
||||||
|
- salva e ricarica le chat
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.pipeline = Pipeline()
|
||||||
|
self.history: List[Dict[str, str]] = [] # [{"role": "user"/"assistant", "content": "..."}]
|
||||||
|
|
||||||
|
def send_message(self, message: str) -> str:
|
||||||
|
"""
|
||||||
|
Aggiunge un messaggio utente, chiama la Pipeline e salva la risposta nello storico.
|
||||||
|
"""
|
||||||
|
# Aggiungi messaggio utente allo storico
|
||||||
|
self.history.append({"role": "user", "content": message})
|
||||||
|
|
||||||
|
# Pipeline elabora la query
|
||||||
|
response = self.pipeline.interact(message)
|
||||||
|
|
||||||
|
# Aggiungi risposta assistente allo storico
|
||||||
|
self.history.append({"role": "assistant", "content": response})
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
def save_chat(self, filename: str = "chat.json") -> None:
|
||||||
|
"""
|
||||||
|
Salva la chat corrente in src/saves/<filename>.
|
||||||
|
"""
|
||||||
|
path = os.path.join(SAVE_DIR, filename)
|
||||||
|
with open(path, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(self.history, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
|
def load_chat(self, filename: str = "chat.json") -> None:
|
||||||
|
"""
|
||||||
|
Carica una chat salvata da src/saves/<filename>.
|
||||||
|
"""
|
||||||
|
path = os.path.join(SAVE_DIR, filename)
|
||||||
|
if not os.path.exists(path):
|
||||||
|
self.history = []
|
||||||
|
return
|
||||||
|
with open(path, "r", encoding="utf-8") as f:
|
||||||
|
self.history = json.load(f)
|
||||||
|
|
||||||
|
def reset_chat(self) -> None:
|
||||||
|
"""
|
||||||
|
Resetta lo storico della chat.
|
||||||
|
"""
|
||||||
|
self.history = []
|
||||||
|
|
||||||
|
def get_history(self) -> List[Dict[str, str]]:
|
||||||
|
"""
|
||||||
|
Restituisce lo storico completo della chat.
|
||||||
|
"""
|
||||||
|
return self.history
|
||||||
|
|
||||||
|
# Facciamo pass-through di provider e style, così Gradio può usarli
|
||||||
|
def choose_provider(self, index: int):
|
||||||
|
self.pipeline.choose_provider(index)
|
||||||
|
|
||||||
|
def choose_style(self, index: int):
|
||||||
|
self.pipeline.choose_style(index)
|
||||||
|
|
||||||
|
def list_providers(self) -> List[str]:
|
||||||
|
return self.pipeline.list_providers()
|
||||||
|
|
||||||
|
def list_styles(self) -> List[str]:
|
||||||
|
return self.pipeline.list_styles()
|
||||||
@@ -7,7 +7,7 @@ from .binance import BinanceWrapper
|
|||||||
from .cryptocompare import CryptoCompareWrapper
|
from .cryptocompare import CryptoCompareWrapper
|
||||||
from .yfinance import YFinanceWrapper
|
from .yfinance import YFinanceWrapper
|
||||||
|
|
||||||
__all__ = [ "MarketAPIs", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper" ]
|
__all__ = [ "MarketAPIsTool", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper", "MARKET_INSTRUCTIONS" ]
|
||||||
|
|
||||||
|
|
||||||
class MarketAPIsTool(BaseWrapper, Toolkit):
|
class MarketAPIsTool(BaseWrapper, Toolkit):
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
|
from agno.run.agent import RunOutput
|
||||||
from agno.team import Team
|
from agno.team import Team
|
||||||
from agno.utils.log import log_info
|
|
||||||
|
|
||||||
from app.news import NewsAPIsTool, NEWS_INSTRUCTIONS
|
from app.news import NewsAPIsTool, NEWS_INSTRUCTIONS
|
||||||
from app.social import SocialAPIsTool, SOCIAL_INSTRUCTIONS
|
from app.social import SocialAPIsTool, SOCIAL_INSTRUCTIONS
|
||||||
@@ -10,98 +10,139 @@ from app.predictor import PredictorStyle, PredictorInput, PredictorOutput, PREDI
|
|||||||
|
|
||||||
class Pipeline:
|
class Pipeline:
|
||||||
"""
|
"""
|
||||||
Pipeline coordinata: esegue tutti gli agenti del Team, aggrega i risultati e invoca il Predictor.
|
Coordina gli agenti di servizio (Market, News, Social) e il Predictor finale.
|
||||||
|
Il Team è orchestrato da qwen3:latest (Ollama), mentre il Predictor è dinamico
|
||||||
|
e scelto dall'utente tramite i dropdown dell'interfaccia grafica.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
# Inizializza gli agenti
|
# Inizializza gli agenti
|
||||||
market_agent = AppModels.OLLAMA_QWEN_1B.get_agent(
|
self.market_agent = AppModels.OLLAMA_QWEN.get_agent(
|
||||||
instructions=MARKET_INSTRUCTIONS,
|
instructions=MARKET_INSTRUCTIONS,
|
||||||
name="MarketAgent",
|
name="MarketAgent",
|
||||||
tools=[MarketAPIsTool()]
|
tools=[MarketAPIsTool()]
|
||||||
)
|
)
|
||||||
news_agent = AppModels.OLLAMA_QWEN_1B.get_agent(
|
self.news_agent = AppModels.OLLAMA_QWEN.get_agent(
|
||||||
instructions=NEWS_INSTRUCTIONS,
|
instructions=NEWS_INSTRUCTIONS,
|
||||||
name="NewsAgent",
|
name="NewsAgent",
|
||||||
tools=[NewsAPIsTool()]
|
tools=[NewsAPIsTool()]
|
||||||
)
|
)
|
||||||
social_agent = AppModels.OLLAMA_QWEN_1B.get_agent(
|
self.social_agent = AppModels.OLLAMA_QWEN.get_agent(
|
||||||
instructions=SOCIAL_INSTRUCTIONS,
|
instructions=SOCIAL_INSTRUCTIONS,
|
||||||
name="SocialAgent",
|
name="SocialAgent",
|
||||||
tools=[SocialAPIsTool()]
|
tools=[SocialAPIsTool()]
|
||||||
)
|
)
|
||||||
|
|
||||||
# Crea il Team
|
# === Modello di orchestrazione del Team ===
|
||||||
prompt = """
|
team_model = AppModels.OLLAMA_QWEN.get_model(
|
||||||
You are the coordinator of a team of analysts specialized in cryptocurrency market analysis.
|
# TODO: migliorare le istruzioni del team
|
||||||
Your role is to gather insights from various sources, including market data, news articles, and social media trends.
|
"Agisci come coordinatore: smista le richieste tra MarketAgent, NewsAgent e SocialAgent."
|
||||||
Based on the information provided by your team members, you will synthesize a comprehensive sentiment analysis for each cryptocurrency discussed.
|
|
||||||
Your analysis should consider the following aspects:
|
|
||||||
1. Market Trends: Evaluate the current market trends and price movements.
|
|
||||||
2. News Impact: Assess the impact of recent news articles on market sentiment.
|
|
||||||
3. Social Media Buzz: Analyze social media discussions and trends related to the cryptocurrencies.
|
|
||||||
Your final output should be a well-rounded sentiment analysis that can guide investment decisions.
|
|
||||||
""" # TODO migliorare il prompt
|
|
||||||
self.team = Team(
|
|
||||||
model = AppModels.OLLAMA_QWEN_1B.get_model(prompt),
|
|
||||||
name="CryptoAnalysisTeam",
|
|
||||||
members=[market_agent, news_agent, social_agent]
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Modelli disponibili e Predictor
|
# === Team ===
|
||||||
|
self.team = Team(
|
||||||
|
name="CryptoAnalysisTeam",
|
||||||
|
members=[self.market_agent, self.news_agent, self.social_agent],
|
||||||
|
model=team_model
|
||||||
|
)
|
||||||
|
|
||||||
|
# === Predictor ===
|
||||||
self.available_models = AppModels.availables()
|
self.available_models = AppModels.availables()
|
||||||
self.predictor_model = self.available_models[0]
|
self.all_styles = list(PredictorStyle)
|
||||||
self.predictor = self.predictor_model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type]
|
|
||||||
|
|
||||||
# Stili
|
# Scelte di default
|
||||||
self.styles = list(PredictorStyle)
|
self.chosen_model = self.available_models[0] if self.available_models else None
|
||||||
self.style = self.styles[0]
|
self.style = self.all_styles[0] if self.all_styles else None
|
||||||
|
|
||||||
|
self._init_predictor() # Inizializza il predictor con il modello di default
|
||||||
|
|
||||||
|
# ======================
|
||||||
|
# Dropdown handlers
|
||||||
|
# ======================
|
||||||
def choose_provider(self, index: int):
|
def choose_provider(self, index: int):
|
||||||
self.predictor_model = self.available_models[index]
|
"""
|
||||||
self.predictor = self.predictor_model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type]
|
Sceglie il modello LLM da usare per il Predictor.
|
||||||
|
"""
|
||||||
|
self.chosen_model = self.available_models[index]
|
||||||
|
self._init_predictor()
|
||||||
|
|
||||||
def choose_style(self, index: int):
|
def choose_style(self, index: int):
|
||||||
self.style = self.styles[index]
|
|
||||||
|
|
||||||
def interact(self, query: str) -> str:
|
|
||||||
"""
|
"""
|
||||||
Esegue il Team (Market + News + Social), aggrega i risultati e invoca il Predictor.
|
Sceglie lo stile (conservativo/aggressivo) da usare per il Predictor.
|
||||||
"""
|
"""
|
||||||
# Step 1: raccogli output del Team
|
self.style = self.all_styles[index]
|
||||||
team_results = self.team.run(query)
|
|
||||||
if isinstance(team_results, dict): # alcuni Team possono restituire dict
|
|
||||||
pieces = [str(v) for v in team_results.values()]
|
|
||||||
elif isinstance(team_results, list):
|
|
||||||
pieces = [str(r) for r in team_results]
|
|
||||||
else:
|
|
||||||
pieces = [str(team_results)]
|
|
||||||
aggregated_text = "\n\n".join(pieces)
|
|
||||||
|
|
||||||
# Step 2: prepara input per Predictor
|
# ======================
|
||||||
predictor_input = PredictorInput(
|
# Helpers
|
||||||
data=[], # TODO: mappare meglio i dati di mercato in ProductInfo
|
# ======================
|
||||||
style=self.style,
|
def _init_predictor(self):
|
||||||
sentiment=aggregated_text
|
"""
|
||||||
|
Inizializza (o reinizializza) il Predictor in base al modello scelto.
|
||||||
|
"""
|
||||||
|
if not self.chosen_model:
|
||||||
|
return
|
||||||
|
self.predictor = self.chosen_model.get_agent(
|
||||||
|
PREDICTOR_INSTRUCTIONS,
|
||||||
|
output=PredictorOutput, # type: ignore
|
||||||
|
)
|
||||||
|
|
||||||
|
def list_providers(self) -> list[str]:
|
||||||
|
"""
|
||||||
|
Restituisce la lista dei nomi dei modelli disponibili.
|
||||||
|
"""
|
||||||
|
return [model.name for model in self.available_models]
|
||||||
|
|
||||||
|
def list_styles(self) -> list[str]:
|
||||||
|
"""
|
||||||
|
Restituisce la lista degli stili di previsione disponibili.
|
||||||
|
"""
|
||||||
|
return [style.value for style in self.all_styles]
|
||||||
|
|
||||||
|
# ======================
|
||||||
|
# Core interaction
|
||||||
|
# ======================
|
||||||
|
def interact(self, query: str) -> str:
|
||||||
|
"""
|
||||||
|
1. Raccoglie output dai membri del Team
|
||||||
|
2. Aggrega output strutturati
|
||||||
|
3. Invoca Predictor
|
||||||
|
4. Restituisce la strategia finale
|
||||||
|
"""
|
||||||
|
if not self.predictor or not self.style:
|
||||||
|
return "⚠️ Devi prima selezionare un modello e una strategia validi dagli appositi menu."
|
||||||
|
|
||||||
|
# Step 1: raccolta output dai membri del Team
|
||||||
|
team_outputs = self.team.run(query)
|
||||||
|
|
||||||
|
# Step 2: aggregazione output strutturati
|
||||||
|
all_products = []
|
||||||
|
sentiments = []
|
||||||
|
|
||||||
|
for agent_output in team_outputs.member_responses:
|
||||||
|
if isinstance(agent_output, RunOutput):
|
||||||
|
if "products" in agent_output.metadata:
|
||||||
|
all_products.extend(agent_output.metadata["products"])
|
||||||
|
if "sentiment_news" in agent_output.metadata:
|
||||||
|
sentiments.append(agent_output.metadata["sentiment_news"])
|
||||||
|
if "sentiment_social" in agent_output.metadata:
|
||||||
|
sentiments.append(agent_output.metadata["sentiment_social"])
|
||||||
|
|
||||||
|
aggregated_sentiment = "\n".join(sentiments)
|
||||||
|
|
||||||
|
# Step 3: invocazione Predictor
|
||||||
|
predictor_input = PredictorInput(
|
||||||
|
data=all_products,
|
||||||
|
style=self.style,
|
||||||
|
sentiment=aggregated_sentiment
|
||||||
)
|
)
|
||||||
|
|
||||||
# Step 3: chiama Predictor
|
|
||||||
result = self.predictor.run(predictor_input)
|
result = self.predictor.run(predictor_input)
|
||||||
prediction: PredictorOutput = result.content
|
prediction: PredictorOutput = result.content
|
||||||
|
|
||||||
# Step 4: formatta output finale
|
# Step 4: restituzione strategia finale
|
||||||
portfolio_lines = "\n".join(
|
portfolio_lines = "\n".join(
|
||||||
[f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio]
|
[f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio]
|
||||||
)
|
)
|
||||||
output = (
|
return (
|
||||||
f"📊 Strategia ({self.style.value}): {prediction.strategy}\n\n"
|
f"📊 Strategia ({self.style.value}): {prediction.strategy}\n\n"
|
||||||
f"💼 Portafoglio consigliato:\n{portfolio_lines}"
|
f"💼 Portafoglio consigliato:\n{portfolio_lines}"
|
||||||
)
|
)
|
||||||
|
|
||||||
return output
|
|
||||||
|
|
||||||
def list_providers(self) -> list[str]:
|
|
||||||
return [m.name for m in self.available_models]
|
|
||||||
|
|
||||||
def list_styles(self) -> list[str]:
|
|
||||||
return [s.value for s in self.styles]
|
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ from app.markets.base import ProductInfo
|
|||||||
from app.models import AppModels
|
from app.models import AppModels
|
||||||
|
|
||||||
def unified_checks(model: AppModels, input):
|
def unified_checks(model: AppModels, input):
|
||||||
llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput)
|
llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type]
|
||||||
result = llm.run(input)
|
result = llm.run(input)
|
||||||
content = result.content
|
content = result.content
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user