Tool #15

Merged
trojanhorse47 merged 7 commits from tool into main 2025-10-03 11:42:11 +02:00
5 changed files with 252 additions and 87 deletions
Showing only changes of commit 6e1c11f6aa - Show all commits

View File

@@ -1,47 +1,82 @@
import gradio as gr import gradio as gr
from dotenv import load_dotenv
from app.pipeline import Pipeline
from agno.utils.log import log_info from agno.utils.log import log_info
from dotenv import load_dotenv
from app.chat_manager import ChatManager
######################################## ########################################
# MAIN APP & GRADIO INTERFACE # MAIN APP & GRADIO CHAT INTERFACE
######################################## ########################################
if __name__ == "__main__": if __name__ == "__main__":
######################################                                          # Carica variabili d'ambiente (.env)
# DA FARE PRIMA DI ESEGUIRE L'APP
# qui carichiamo le variabili d'ambiente dal file .env
# una volta fatto, possiamo usare le API keys senza problemi
# quindi non è necessario richiamare load_dotenv() altrove
load_dotenv() load_dotenv()
######################################
pipeline = Pipeline() # Inizializza ChatManager
chat = ChatManager()
########################################
# Funzioni Gradio
########################################
def respond(message, history):
response = chat.send_message(message)
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": response})
return history, history
def save_current_chat():
chat.save_chat("chat.json")
return "💾 Chat salvata in chat.json"
def load_previous_chat():
chat.load_chat("chat.json")
history = []
for m in chat.get_history():
history.append({"role": m["role"], "content": m["content"]})
return history, history
def reset_chat():
chat.reset_chat()
return [], []
########################################
# Interfaccia Gradio
########################################
with gr.Blocks() as demo: with gr.Blocks() as demo:
gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto") gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)")
# Dropdown provider e stile
with gr.Row(): with gr.Row():
provider = gr.Dropdown( provider = gr.Dropdown(
choices=pipeline.list_providers(), choices=chat.list_providers(),
type="index", type="index",
label="Modello da usare" label="Modello da usare"
) )
provider.change(fn=pipeline.choose_provider, inputs=provider, outputs=None) provider.change(fn=chat.choose_provider, inputs=provider, outputs=None)
style = gr.Dropdown( style = gr.Dropdown(
choices=pipeline.list_styles(), choices=chat.list_styles(),
type="index", type="index",
label="Stile di investimento" label="Stile di investimento"
) )
style.change(fn=pipeline.choose_style, inputs=style, outputs=None) style.change(fn=chat.choose_style, inputs=style, outputs=None)
user_input = gr.Textbox(label="Richiesta utente") chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages")
output = gr.Textbox(label="Risultato analisi", lines=12) msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?")
analyze_btn = gr.Button("🔎 Analizza") with gr.Row():
analyze_btn.click(fn=pipeline.interact, inputs=[user_input], outputs=output) clear_btn = gr.Button("🗑️ Reset Chat")
save_btn = gr.Button("💾 Salva Chat")
load_btn = gr.Button("📂 Carica Chat")
server, port = ("0.0.0.0", 8000) # Invio messaggio
log_info(f"Starting UPO AppAI on http://{server}:{port}") msg.submit(respond, inputs=[msg, chatbot], outputs=[chatbot, chatbot])
# Reset
clear_btn.click(reset_chat, inputs=None, outputs=[chatbot, chatbot])
# Salvataggio
save_btn.click(save_current_chat, inputs=None, outputs=None)
# Caricamento
load_btn.click(load_previous_chat, inputs=None, outputs=[chatbot, chatbot])
server, port = ("127.0.0.1", 8000)
log_info(f"Starting UPO AppAI Chat on http://{server}:{port}")
demo.launch(server_name=server, server_port=port, quiet=True) demo.launch(server_name=server, server_port=port, quiet=True)

78
src/app/chat_manager.py Normal file
View File

@@ -0,0 +1,78 @@
import os
import json
from typing import List, Dict
from src.app.pipeline import Pipeline
# Directory where chat transcripts are persisted, resolved relative to this
# file: <package>/../saves (i.e. src/saves/).
SAVE_DIR = os.path.join(os.path.dirname(__file__), "..", "saves")
os.makedirs(SAVE_DIR, exist_ok=True)  # created at import time so saves never hit a missing directory
class ChatManager:
    """Conversation layer on top of the Pipeline.

    Responsibilities:
    - keep the running message history
    - delegate response generation to the Pipeline
    - persist and restore whole chats as JSON files
    """

    def __init__(self):
        # Underlying analysis pipeline used to produce assistant replies.
        self.pipeline = Pipeline()
        # Transcript entries shaped as {"role": "user"/"assistant", "content": "..."}.
        self.history: List[Dict[str, str]] = []

    def send_message(self, message: str) -> str:
        """Record *message* as a user turn, generate, store and return the reply."""
        # The user turn is appended before the Pipeline runs, mirroring the
        # order in which the conversation actually happened.
        self.history.append({"role": "user", "content": message})
        reply = self.pipeline.interact(message)
        self.history.append({"role": "assistant", "content": reply})
        return reply

    def save_chat(self, filename: str = "chat.json") -> None:
        """Write the current history to src/saves/<filename> as UTF-8 JSON."""
        target = os.path.join(SAVE_DIR, filename)
        with open(target, "w", encoding="utf-8") as fh:
            json.dump(self.history, fh, ensure_ascii=False, indent=2)

    def load_chat(self, filename: str = "chat.json") -> None:
        """Replace the history with the one saved in src/saves/<filename>.

        A missing file silently yields an empty history instead of raising.
        """
        target = os.path.join(SAVE_DIR, filename)
        if os.path.exists(target):
            with open(target, "r", encoding="utf-8") as fh:
                self.history = json.load(fh)
        else:
            self.history = []

    def reset_chat(self) -> None:
        """Drop the whole transcript."""
        self.history = []

    def get_history(self) -> List[Dict[str, str]]:
        """Return the full (live, not copied) message history."""
        return self.history

    # --- Thin pass-throughs so the Gradio UI can drive the Pipeline ---

    def choose_provider(self, index: int):
        self.pipeline.choose_provider(index)

    def choose_style(self, index: int):
        self.pipeline.choose_style(index)

    def list_providers(self) -> List[str]:
        return self.pipeline.list_providers()

    def list_styles(self) -> List[str]:
        return self.pipeline.list_styles()

View File

@@ -1,9 +1,9 @@
from base import BaseWrapper from .base import BaseWrapper
from app.markets.coinbase import CoinBaseWrapper from .coinbase import CoinBaseWrapper
from app.markets.cryptocompare import CryptoCompareWrapper from .cryptocompare import CryptoCompareWrapper
from app.markets.binance import BinanceWrapper from .binance import BinanceWrapper
from app.markets.binance_public import PublicBinanceAgent from .binance_public import PublicBinanceAgent
from app.markets.error_handler import ProviderFallback, MarketAPIError, safe_execute from .error_handler import ProviderFallback, MarketAPIError, safe_execute
from agno.utils.log import log_warning from agno.utils.log import log_warning
import logging import logging

View File

@@ -1,84 +1,136 @@
from typing import List from agno.run.agent import RunOutput
from agno.team import Team from agno.team import Team
from agno.utils.log import log_info
from app.agents.market_agent import MarketAgent from src.app.agents.market_agent import MarketAgent
from src.app.agents.news_agent import NewsAgent from src.app.agents.news_agent import NewsAgent
from src.app.agents.social_agent import SocialAgent from src.app.agents.social_agent import SocialAgent
from src.app.markets import MarketAPIs
from src.app.models import AppModels from src.app.models import AppModels
from src.app.predictor import PredictorStyle, PredictorInput, PredictorOutput, PREDICTOR_INSTRUCTIONS from src.app.predictor import PredictorInput, PredictorOutput, PredictorStyle, PREDICTOR_INSTRUCTIONS
class Pipeline: class Pipeline:
""" """
Pipeline coordinata: esegue tutti gli agenti del Team, aggrega i risultati e invoca il Predictor. Coordina gli agenti di servizio (Market, News, Social) e il Predictor finale.
Il Team è orchestrato da qwen3:latest (Ollama), mentre il Predictor è dinamico
e scelto dall'utente tramite i dropdown dell'interfaccia.
""" """
def __init__(self): def __init__(self):
# Inizializza gli agenti # === Membri del team ===
self.market_agent = MarketAgent() self.market_agent = MarketAgent()
self.news_agent = NewsAgent() self.news_agent = NewsAgent()
self.social_agent = SocialAgent() self.social_agent = SocialAgent()
# Crea il Team # === Modello di orchestrazione del Team ===
self.team = Team(name="CryptoAnalysisTeam", members=[self.market_agent, self.news_agent, self.social_agent]) team_model = AppModels.OLLAMA_QWEN.get_model(
# TODO: migliorare le istruzioni del team
# Modelli disponibili e Predictor "Agisci come coordinatore: smista le richieste tra MarketAgent, NewsAgent e SocialAgent."
self.available_models = AppModels.availables() )
self.predictor_model = self.available_models[0]
self.predictor = self.predictor_model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type] # === Team ===
self.team = Team(
# Stili name="CryptoAnalysisTeam",
self.styles = list(PredictorStyle) members=[self.market_agent, self.news_agent, self.social_agent],
self.style = self.styles[0] model=team_model
)
def choose_provider(self, index: int):
self.predictor_model = self.available_models[index] # === Predictor ===
self.predictor = self.predictor_model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type] self.available_models = AppModels.availables()
self.all_styles = list(PredictorStyle)
def choose_style(self, index: int):
self.style = self.styles[index] # Scelte di default
self.chosen_model = self.available_models[0] if self.available_models else None
def interact(self, query: str) -> str: self.style = self.all_styles[0] if self.all_styles else None
"""
Esegue il Team (Market + News + Social), aggrega i risultati e invoca il Predictor. self._init_predictor() # Inizializza il predictor con il modello di default
"""
# Step 1: raccogli output del Team # ======================
team_results = self.team.run(query) # Dropdown handlers
if isinstance(team_results, dict): # alcuni Team possono restituire dict # ======================
pieces = [str(v) for v in team_results.values()] def choose_provider(self, index: int):
elif isinstance(team_results, list): """
pieces = [str(r) for r in team_results] Sceglie il modello LLM da usare per il Predictor.
else: """
pieces = [str(team_results)] self.chosen_model = self.available_models[index]
aggregated_text = "\n\n".join(pieces) self._init_predictor()
# Step 2: prepara input per Predictor def choose_style(self, index: int):
predictor_input = PredictorInput( """
data=[], # TODO: mappare meglio i dati di mercato in ProductInfo Sceglie lo stile (conservativo/aggressivo) da usare per il Predictor.
style=self.style, """
sentiment=aggregated_text self.style = self.all_styles[index]
# ======================
# Helpers
# ======================
def _init_predictor(self):
"""
Inizializza (o reinizializza) il Predictor in base al modello scelto.
"""
if not self.chosen_model:
return
self.predictor = self.chosen_model.get_agent(
PREDICTOR_INSTRUCTIONS,
output=PredictorOutput, # type: ignore
)
def list_providers(self) -> list[str]:
"""
Restituisce la lista dei nomi dei modelli disponibili.
"""
return [model.name for model in self.available_models]
def list_styles(self) -> list[str]:
"""
Restituisce la lista degli stili di previsione disponibili.
"""
return [style.value for style in self.all_styles]
# ======================
# Core interaction
# ======================
def interact(self, query: str) -> str:
"""
1. Raccoglie output dai membri del Team
2. Aggrega output strutturati
3. Invoca Predictor
4. Restituisce la strategia finale
"""
if not self.predictor or not self.style:
return "⚠️ Devi prima selezionare un modello e una strategia validi dagli appositi menu."
# Step 1: raccolta output dai membri del Team
team_outputs = self.team.run(query)
# Step 2: aggregazione output strutturati
all_products = []
sentiments = []
for agent_output in team_outputs.member_responses:
if isinstance(agent_output, RunOutput):
if "products" in agent_output.metadata:
all_products.extend(agent_output.metadata["products"])
if "sentiment_news" in agent_output.metadata:
sentiments.append(agent_output.metadata["sentiment_news"])
if "sentiment_social" in agent_output.metadata:
sentiments.append(agent_output.metadata["sentiment_social"])
aggregated_sentiment = "\n".join(sentiments)
# Step 3: invocazione Predictor
predictor_input = PredictorInput(
data=all_products,
style=self.style,
sentiment=aggregated_sentiment
) )
# Step 3: chiama Predictor
result = self.predictor.run(predictor_input) result = self.predictor.run(predictor_input)
prediction: PredictorOutput = result.content prediction: PredictorOutput = result.content
# Step 4: formatta output finale # Step 4: restituzione strategia finale
portfolio_lines = "\n".join( portfolio_lines = "\n".join(
[f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio] [f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio]
) )
output = ( return (
f"📊 Strategia ({self.style.value}): {prediction.strategy}\n\n" f"📊 Strategia ({self.style.value}): {prediction.strategy}\n\n"
f"💼 Portafoglio consigliato:\n{portfolio_lines}" f"💼 Portafoglio consigliato:\n{portfolio_lines}"
) )
return output
def list_providers(self) -> List[str]:
return [m.name for m in self.available_models]
def list_styles(self) -> List[str]:
return [s.value for s in self.styles]

View File

@@ -4,7 +4,7 @@ from app.markets.base import ProductInfo
from app.models import AppModels from app.models import AppModels
def unified_checks(model: AppModels, input): def unified_checks(model: AppModels, input):
llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type]
result = llm.run(input) result = llm.run(input)
content = result.content content = result.content