diff --git a/src/app.py b/src/app.py index cf09fd5..65c22cc 100644 --- a/src/app.py +++ b/src/app.py @@ -1,48 +1,83 @@ import gradio as gr - -from dotenv import load_dotenv -from app.pipeline import Pipeline from agno.utils.log import log_info +from dotenv import load_dotenv + +from app.chat_manager import ChatManager ######################################## -# MAIN APP & GRADIO INTERFACE +# MAIN APP & GRADIO CHAT INTERFACE ######################################## if __name__ == "__main__": - ###################################### - # DA FARE PRIMA DI ESEGUIRE L'APP - # qui carichiamo le variabili d'ambiente dal file .env - # una volta fatto, possiamo usare le API keys senza problemi - # quindi non è necessario richiamare load_dotenv() altrove + # Carica variabili d’ambiente (.env) load_dotenv() - ###################################### - pipeline = Pipeline() + # Inizializza ChatManager + chat = ChatManager() + ######################################## + # Funzioni Gradio + ######################################## + def respond(message, history): + response = chat.send_message(message) + history.append({"role": "user", "content": message}) + history.append({"role": "assistant", "content": response}) + return history, history, "" + + def save_current_chat(): + chat.save_chat("chat.json") + return "💾 Chat salvata in chat.json" + + def load_previous_chat(): + chat.load_chat("chat.json") + history = [] + for m in chat.get_history(): + history.append({"role": m["role"], "content": m["content"]}) + return history, history + + def reset_chat(): + chat.reset_chat() + return [], [] + + ######################################## + # Interfaccia Gradio + ######################################## with gr.Blocks() as demo: - gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto") + gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)") + # Dropdown provider e stile with gr.Row(): provider = gr.Dropdown( - choices=pipeline.list_providers(), + choices=chat.list_providers(), type="index", label="Modello da usare" ) - provider.change(fn=pipeline.choose_provider, inputs=provider, outputs=None) + provider.change(fn=chat.choose_provider, inputs=provider, outputs=None) style = gr.Dropdown( - choices=pipeline.list_styles(), + choices=chat.list_styles(), type="index", label="Stile di investimento" ) - style.change(fn=pipeline.choose_style, inputs=style, outputs=None) + style.change(fn=chat.choose_style, inputs=style, outputs=None) - user_input = gr.Textbox(label="Richiesta utente") - output = gr.Textbox(label="Risultato analisi", lines=12) + chatbot = gr.Chatbot(label="Conversazione", height=500, type="messages") + msg = gr.Textbox(label="Scrivi la tua richiesta", placeholder="Es: Quali sono le crypto interessanti oggi?") - analyze_btn = gr.Button("🔎 Analizza") - analyze_btn.click(fn=pipeline.interact, inputs=[user_input], outputs=output) + with gr.Row(): + clear_btn = gr.Button("🗑️ Reset Chat") + save_btn = gr.Button("💾 Salva Chat") + load_btn = gr.Button("📂 Carica Chat") - server, port = ("0.0.0.0", 8000) # 0.0.0.0 per docker compatibility + # Invio messaggio + msg.submit(respond, inputs=[msg, chatbot], outputs=[chatbot, chatbot, msg]) + # Reset + clear_btn.click(reset_chat, inputs=None, outputs=[chatbot, chatbot]) + # Salvataggio + save_btn.click(save_current_chat, inputs=None, outputs=None) + # Caricamento + load_btn.click(load_previous_chat, inputs=None, outputs=[chatbot, chatbot]) + + server, port = ("0.0.0.0", 8000) server_log = "localhost" if server == "0.0.0.0" else server - log_info(f"Starting UPO AppAI on http://{server_log}:{port}") + log_info(f"Starting UPO AppAI Chat on http://{server_log}:{port}") # noqa demo.launch(server_name=server, server_port=port, quiet=True) diff --git a/src/app/chat_manager.py b/src/app/chat_manager.py new file mode 100644 index 0000000..7928c95 --- /dev/null +++ b/src/app/chat_manager.py @@ -0,0 +1,78 @@ +import os +import json +from typing import List, Dict +from app.pipeline import Pipeline + +SAVE_DIR = os.path.join(os.path.dirname(__file__), "..", "saves") +os.makedirs(SAVE_DIR, exist_ok=True) + +class ChatManager: + """ + Gestisce la conversazione con la Pipeline: + - mantiene lo storico dei messaggi + - invoca la Pipeline per generare risposte + - salva e ricarica le chat + """ + + def __init__(self): + self.pipeline = Pipeline() + self.history: List[Dict[str, str]] = [] # [{"role": "user"/"assistant", "content": "..."}] + + def send_message(self, message: str) -> str: + """ + Aggiunge un messaggio utente, chiama la Pipeline e salva la risposta nello storico. + """ + # Aggiungi messaggio utente allo storico + self.history.append({"role": "user", "content": message}) + + # Pipeline elabora la query + response = self.pipeline.interact(message) + + # Aggiungi risposta assistente allo storico + self.history.append({"role": "assistant", "content": response}) + + return response + + def save_chat(self, filename: str = "chat.json") -> None: + """ + Salva la chat corrente in src/saves/. + """ + path = os.path.join(SAVE_DIR, filename) + with open(path, "w", encoding="utf-8") as f: + json.dump(self.history, f, ensure_ascii=False, indent=2) + + def load_chat(self, filename: str = "chat.json") -> None: + """ + Carica una chat salvata da src/saves/. + """ + path = os.path.join(SAVE_DIR, filename) + if not os.path.exists(path): + self.history = [] + return + with open(path, "r", encoding="utf-8") as f: + self.history = json.load(f) + + def reset_chat(self) -> None: + """ + Resetta lo storico della chat. + """ + self.history = [] + + def get_history(self) -> List[Dict[str, str]]: + """ + Restituisce lo storico completo della chat. + """ + return self.history + + # Facciamo pass-through di provider e style, così Gradio può usarli + def choose_provider(self, index: int): + self.pipeline.choose_provider(index) + + def choose_style(self, index: int): + self.pipeline.choose_style(index) + + def list_providers(self) -> List[str]: + return self.pipeline.list_providers() + + def list_styles(self) -> List[str]: + return self.pipeline.list_styles() diff --git a/src/app/markets/__init__.py b/src/app/markets/__init__.py index b782b8f..ef73f68 100644 --- a/src/app/markets/__init__.py +++ b/src/app/markets/__init__.py @@ -7,7 +7,7 @@ from .binance import BinanceWrapper from .cryptocompare import CryptoCompareWrapper from .yfinance import YFinanceWrapper -__all__ = [ "MarketAPIs", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper" ] +__all__ = [ "MarketAPIsTool", "BinanceWrapper", "CoinBaseWrapper", "CryptoCompareWrapper", "YFinanceWrapper", "MARKET_INSTRUCTIONS" ] class MarketAPIsTool(BaseWrapper, Toolkit): diff --git a/src/app/pipeline.py b/src/app/pipeline.py index 10dddab..a7ae9d4 100644 --- a/src/app/pipeline.py +++ b/src/app/pipeline.py @@ -1,5 +1,5 @@ +from agno.run.agent import RunOutput from agno.team import Team -from agno.utils.log import log_info from app.news import NewsAPIsTool, NEWS_INSTRUCTIONS from app.social import SocialAPIsTool, SOCIAL_INSTRUCTIONS @@ -10,98 +10,139 @@ from app.predictor import PredictorStyle, PredictorInput, PredictorOutput, PREDI class Pipeline: """ - Pipeline coordinata: esegue tutti gli agenti del Team, aggrega i risultati e invoca il Predictor. + Coordina gli agenti di servizio (Market, News, Social) e il Predictor finale. + Il Team è orchestrato da qwen3:latest (Ollama), mentre il Predictor è dinamico + e scelto dall'utente tramite i dropdown dell'interfaccia grafica. """ - def __init__(self): # Inizializza gli agenti - market_agent = AppModels.OLLAMA_QWEN_1B.get_agent( + self.market_agent = AppModels.OLLAMA_QWEN.get_agent( instructions=MARKET_INSTRUCTIONS, name="MarketAgent", tools=[MarketAPIsTool()] ) - news_agent = AppModels.OLLAMA_QWEN_1B.get_agent( + self.news_agent = AppModels.OLLAMA_QWEN.get_agent( instructions=NEWS_INSTRUCTIONS, name="NewsAgent", tools=[NewsAPIsTool()] ) - social_agent = AppModels.OLLAMA_QWEN_1B.get_agent( + self.social_agent = AppModels.OLLAMA_QWEN.get_agent( instructions=SOCIAL_INSTRUCTIONS, name="SocialAgent", tools=[SocialAPIsTool()] ) - # Crea il Team - prompt = """ - You are the coordinator of a team of analysts specialized in cryptocurrency market analysis. - Your role is to gather insights from various sources, including market data, news articles, and social media trends. - Based on the information provided by your team members, you will synthesize a comprehensive sentiment analysis for each cryptocurrency discussed. - Your analysis should consider the following aspects: - 1. Market Trends: Evaluate the current market trends and price movements. - 2. News Impact: Assess the impact of recent news articles on market sentiment. - 3. Social Media Buzz: Analyze social media discussions and trends related to the cryptocurrencies. - Your final output should be a well-rounded sentiment analysis that can guide investment decisions. - """ # TODO migliorare il prompt - self.team = Team( - model = AppModels.OLLAMA_QWEN_1B.get_model(prompt), - name="CryptoAnalysisTeam", - members=[market_agent, news_agent, social_agent] + # === Modello di orchestrazione del Team === + team_model = AppModels.OLLAMA_QWEN.get_model( + # TODO: migliorare le istruzioni del team + "Agisci come coordinatore: smista le richieste tra MarketAgent, NewsAgent e SocialAgent." ) - # Modelli disponibili e Predictor + # === Team === + self.team = Team( + name="CryptoAnalysisTeam", + members=[self.market_agent, self.news_agent, self.social_agent], + model=team_model + ) + + # === Predictor === self.available_models = AppModels.availables() - self.predictor_model = self.available_models[0] - self.predictor = self.predictor_model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type] + self.all_styles = list(PredictorStyle) - # Stili - self.styles = list(PredictorStyle) - self.style = self.styles[0] + # Scelte di default + self.chosen_model = self.available_models[0] if self.available_models else None + self.style = self.all_styles[0] if self.all_styles else None + self._init_predictor() # Inizializza il predictor con il modello di default + + # ====================== + # Dropdown handlers + # ====================== def choose_provider(self, index: int): - self.predictor_model = self.available_models[index] - self.predictor = self.predictor_model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type] + """ + Sceglie il modello LLM da usare per il Predictor. + """ + self.chosen_model = self.available_models[index] + self._init_predictor() def choose_style(self, index: int): - self.style = self.styles[index] - - def interact(self, query: str) -> str: """ - Esegue il Team (Market + News + Social), aggrega i risultati e invoca il Predictor. + Sceglie lo stile (conservativo/aggressivo) da usare per il Predictor. """ - # Step 1: raccogli output del Team - team_results = self.team.run(query) - if isinstance(team_results, dict): # alcuni Team possono restituire dict - pieces = [str(v) for v in team_results.values()] - elif isinstance(team_results, list): - pieces = [str(r) for r in team_results] - else: - pieces = [str(team_results)] - aggregated_text = "\n\n".join(pieces) + self.style = self.all_styles[index] - # Step 2: prepara input per Predictor - predictor_input = PredictorInput( - data=[], # TODO: mappare meglio i dati di mercato in ProductInfo - style=self.style, - sentiment=aggregated_text + # ====================== + # Helpers + # ====================== + def _init_predictor(self): + """ + Inizializza (o reinizializza) il Predictor in base al modello scelto. + """ + if not self.chosen_model: + return + self.predictor = self.chosen_model.get_agent( + PREDICTOR_INSTRUCTIONS, + output=PredictorOutput, # type: ignore + ) + + def list_providers(self) -> list[str]: + """ + Restituisce la lista dei nomi dei modelli disponibili. + """ + return [model.name for model in self.available_models] + + def list_styles(self) -> list[str]: + """ + Restituisce la lista degli stili di previsione disponibili. + """ + return [style.value for style in self.all_styles] + + # ====================== + # Core interaction + # ====================== + def interact(self, query: str) -> str: + """ + 1. Raccoglie output dai membri del Team + 2. Aggrega output strutturati + 3. Invoca Predictor + 4. Restituisce la strategia finale + """ + if not self.predictor or not self.style: + return "⚠️ Devi prima selezionare un modello e una strategia validi dagli appositi menu." + + # Step 1: raccolta output dai membri del Team + team_outputs = self.team.run(query) + + # Step 2: aggregazione output strutturati + all_products = [] + sentiments = [] + + for agent_output in team_outputs.member_responses: + if isinstance(agent_output, RunOutput): + if "products" in agent_output.metadata: + all_products.extend(agent_output.metadata["products"]) + if "sentiment_news" in agent_output.metadata: + sentiments.append(agent_output.metadata["sentiment_news"]) + if "sentiment_social" in agent_output.metadata: + sentiments.append(agent_output.metadata["sentiment_social"]) + + aggregated_sentiment = "\n".join(sentiments) + + # Step 3: invocazione Predictor + predictor_input = PredictorInput( + data=all_products, + style=self.style, + sentiment=aggregated_sentiment ) - # Step 3: chiama Predictor result = self.predictor.run(predictor_input) prediction: PredictorOutput = result.content - # Step 4: formatta output finale + # Step 4: restituzione strategia finale portfolio_lines = "\n".join( [f"{item.asset} ({item.percentage}%): {item.motivation}" for item in prediction.portfolio] ) - output = ( + return ( f"📊 Strategia ({self.style.value}): {prediction.strategy}\n\n" f"💼 Portafoglio consigliato:\n{portfolio_lines}" ) - - return output - - def list_providers(self) -> list[str]: - return [m.name for m in self.available_models] - - def list_styles(self) -> list[str]: - return [s.value for s in self.styles] diff --git a/tests/agents/test_predictor.py b/tests/agents/test_predictor.py index 9f28717..5867938 100644 --- a/tests/agents/test_predictor.py +++ b/tests/agents/test_predictor.py @@ -4,7 +4,7 @@ from app.markets.base import ProductInfo from app.models import AppModels def unified_checks(model: AppModels, input): - llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) + llm = model.get_agent(PREDICTOR_INSTRUCTIONS, output=PredictorOutput) # type: ignore[arg-type] result = llm.run(input) content = result.content