Update chat interface #70
37
src/app/agents/action_registry.py
Normal file
@@ -0,0 +1,37 @@
|
||||
from typing import Any, Callable
|
||||
|
||||
|
||||
# Registro centrale popolato da tutti i file Toolkit all'avvio.
|
||||
ACTION_DESCRIPTIONS: dict[str, str] = {}
|
||||
|
|
||||
|
||||
def get_user_friendly_action(tool_name: str) -> str:
|
||||
"""
|
||||
Restituisce un messaggio leggibile e descrittivo per l'utente
|
||||
leggendo dal registro globale.
|
||||
"""
|
||||
# Usa il dizionario ACTION_DESCRIPTIONS importato
|
||||
return ACTION_DESCRIPTIONS.get(tool_name, f"⚙️ Eseguo l'operazione: {tool_name}...")
|
||||
|
||||
def friendly_action(description: str) -> Callable[..., Any]:
|
||||
"""
|
||||
Decoratore che registra automaticamente la descrizione "user-friendly"
|
||||
di un metodo nel registro globale.
|
||||
|
||||
Questo decoratore viene eseguito all'avvio dell'app (quando i file
|
||||
vengono importati) e popola il dizionario ACTION_DESCRIPTIONS.
|
||||
|
||||
Restituisce la funzione originale non modificata.
|
||||
"""
|
||||
|
||||
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
|
||||
# Registra l'azione
|
||||
tool_name = func.__name__
|
||||
if tool_name in ACTION_DESCRIPTIONS:
|
||||
print(f"Attenzione: Azione '{tool_name}' registrata più volte.")
|
||||
|
||||
ACTION_DESCRIPTIONS[tool_name] = description
|
||||
|
||||
# Restituisce la funzione originale
|
||||
return func
|
||||
|
||||
return decorator
|
||||
@@ -45,28 +45,28 @@ class PipelineInputs:
|
||||
"""
|
||||
Sceglie il modello LLM da usare per l'analizzatore di query.
|
||||
"""
|
||||
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||
assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||
self.query_analyzer_model = self.configs.models.all_models[index]
|
||||
|
||||
def choose_team_leader(self, index: int):
|
||||
"""
|
||||
Sceglie il modello LLM da usare per il Team Leader.
|
||||
"""
|
||||
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||
assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||
self.team_leader_model = self.configs.models.all_models[index]
|
||||
|
||||
def choose_team(self, index: int):
|
||||
"""
|
||||
Sceglie il modello LLM da usare per il Team.
|
||||
"""
|
||||
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||
assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||
self.team_model = self.configs.models.all_models[index]
|
||||
|
||||
def choose_report_generator(self, index: int):
|
||||
"""
|
||||
Sceglie il modello LLM da usare per il generatore di report.
|
||||
"""
|
||||
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||
assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||
self.report_generation_model = self.configs.models.all_models[index]
|
||||
|
||||
def choose_strategy(self, index: int):
|
||||
@@ -145,21 +145,31 @@ class RunMessage:
|
||||
- In esecuzione (➡️)
|
||||
- Completato (✅)
|
||||
|
||||
Lo stato di esecuzione può essere assegnato solo ad uno step alla volta.
|
||||
Lo stato di esecuzione può essere assegnato solo a uno step alla volta.
|
||||
Args:
|
||||
inputs (PipelineInputs): Input della pipeline per mostrare la configurazione.
|
||||
prefix (str, optional): Prefisso del messaggio. Defaults to "".
|
||||
suffix (str, optional): Suffisso del messaggio. Defaults to "".
|
||||
inputs (PipelineInputs): Input della pipeline per mostrare la configurazione
|
||||
prefix (str, optional): Prefisso del messaggio. Defaults to ""
|
||||
suffix (str, optional): Suffisso del messaggio. Defaults to ""
|
||||
"""
|
||||
self.base_message = f"Running configurations: \n{prefix}{inputs}{suffix}\n\n"
|
||||
self.emojis = ['🔳', '➡️', '✅']
|
||||
self.placeholder = '<<<>>>'
|
||||
self.current = 0
|
||||
self.steps_total = [
|
||||
(f"{self.placeholder} Query Check", 1),
|
||||
(f"{self.placeholder} Info Recovery", 0),
|
||||
(f"{self.placeholder} Report Generation", 0),
|
||||
]
|
||||
self.steps_total: list[tuple[str, int]] = []
|
||||
self.set_steps(["Query Check", "Info Recovery", "Report Generation"])
|
||||
|
||||
def set_steps(self, steps: list[str]) -> 'RunMessage':
|
||||
"""
|
||||
Inizializza gli step di esecuzione con lo stato iniziale.
|
||||
Args:
|
||||
steps (list[str]): Lista degli step da includere nel messaggio.
|
||||
Returns:
|
||||
RunMessage: L'istanza aggiornata di RunMessage.
|
||||
"""
|
||||
self.steps_total = [(f"{self.placeholder} {step}", 0) for step in steps]
|
||||
self.steps_total[0] = (self.steps_total[0][0], 1) # Primo step in esecuzione
|
||||
self.current = 0
|
||||
return self
|
||||
|
||||
def update(self) -> 'RunMessage':
|
||||
"""
|
||||
@@ -176,15 +186,15 @@ class RunMessage:
|
||||
self.steps_total[self.current] = (text_curr, state_curr + 1)
|
||||
return self
|
||||
|
||||
def update_step(self, text_extra: str = "") -> 'RunMessage':
|
||||
def update_step_with_tool(self, tool_used: str = "") -> 'RunMessage':
|
||||
"""
|
||||
Aggiorna il messaggio per lo step corrente.
|
||||
Args:
|
||||
text_extra (str, optional): Testo aggiuntivo da includere nello step. Defaults to "".
|
||||
tool_used (str, optional): Testo aggiuntivo da includere nello step. Defaults to "".
|
||||
"""
|
||||
text_curr, state_curr = self.steps_total[self.current]
|
||||
if text_extra:
|
||||
text_curr = f"{text_curr.replace('╚', '╠')}\n╚═ {text_extra}"
|
||||
if tool_used:
|
||||
text_curr = f"{text_curr.replace('╚', '╠')}\n╚═ {tool_used}"
|
||||
self.steps_total[self.current] = (text_curr, state_curr)
|
||||
return self
|
||||
|
||||
@@ -196,3 +206,4 @@ class RunMessage:
|
||||
"""
|
||||
steps = [msg.replace(self.placeholder, self.emojis[state]) for msg, state in self.steps_total]
|
||||
return self.base_message + "\n".join(steps)
|
||||
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import asyncio
|
||||
from enum import Enum
|
||||
import logging
|
||||
import random
|
||||
from typing import Any, Callable
|
||||
from typing import Any, AsyncGenerator, Callable
|
||||
from agno.agent import RunEvent
|
||||
from agno.run.workflow import WorkflowRunEvent
|
||||
from agno.workflow.types import StepInput, StepOutput
|
||||
@@ -13,28 +12,34 @@ from app.agents.core import *
|
||||
logging = logging.getLogger("pipeline")
|
||||
|
||||
|
||||
|
||||
class PipelineEvent(str, Enum):
|
||||
QUERY_CHECK = "Query Check"
|
||||
QUERY_ANALYZER = "Query Analyzer"
|
||||
QUERY_CHECK_END = "Query Check End"
|
||||
INFO_RECOVERY = "Info Recovery"
|
||||
INFO_RECOVERY_END = "Info Recovery End"
|
||||
REPORT_GENERATION = "Report Generation"
|
||||
REPORT_TRANSLATION = "Report Translation"
|
||||
RUN_FINISHED = WorkflowRunEvent.workflow_completed.value
|
||||
TOOL_USED = RunEvent.tool_call_completed.value
|
||||
REPORT_GENERATION_END = "Report Generation End"
|
||||
TOOL_USED = RunEvent.tool_call_started.value
|
||||
TOOL_USED_END = RunEvent.tool_call_completed.value
|
||||
RUN_END = WorkflowRunEvent.workflow_completed.value
|
||||
|
||||
def check_event(self, event: str, step_name: str) -> bool:
|
||||
return event == self.value or (WorkflowRunEvent.step_completed == event and step_name == self.value)
|
||||
if event == self.value:
|
||||
return True
|
||||
|
||||
index = self.value.rfind(" End")
|
||||
value = self.value[:index] if index > -1 else self.value
|
||||
step_state = WorkflowRunEvent.step_completed if index > -1 else WorkflowRunEvent.step_started
|
||||
return step_name == value and step_state == event
|
||||
|
||||
@classmethod
|
||||
def get_log_events(cls, run_id: int) -> list[tuple['PipelineEvent', Callable[[Any], None]]]:
|
||||
def get_log_events(cls, run_id: int) -> list[tuple['PipelineEvent', Callable[[Any], str | None]]]:
|
||||
return [
|
||||
(PipelineEvent.QUERY_CHECK, lambda _: logging.info(f"[{run_id}] Query Check completed.")),
|
||||
(PipelineEvent.QUERY_ANALYZER, lambda _: logging.info(f"[{run_id}] Query Analyzer completed.")),
|
||||
(PipelineEvent.INFO_RECOVERY, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")),
|
||||
(PipelineEvent.REPORT_GENERATION, lambda _: logging.info(f"[{run_id}] Report Generation completed.")),
|
||||
(PipelineEvent.TOOL_USED, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")),
|
||||
(PipelineEvent.RUN_FINISHED, lambda _: logging.info(f"[{run_id}] Run completed.")),
|
||||
(PipelineEvent.QUERY_CHECK_END, lambda _: logging.info(f"[{run_id}] Query Check completed.")),
|
||||
(PipelineEvent.INFO_RECOVERY_END, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")),
|
||||
(PipelineEvent.REPORT_GENERATION_END, lambda _: logging.info(f"[{run_id}] Report Generation completed.")),
|
||||
(PipelineEvent.TOOL_USED_END, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")),
|
||||
(PipelineEvent.RUN_END, lambda _: logging.info(f"[{run_id}] Run completed.")),
|
||||
]
|
||||
|
||||
|
||||
@@ -53,7 +58,7 @@ class Pipeline:
|
||||
"""
|
||||
self.inputs = inputs
|
||||
|
||||
def interact(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str:
|
||||
async def interact(self, listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = []) -> str:
|
||||
"""
|
||||
Esegue la pipeline di agenti per rispondere alla query dell'utente.
|
||||
Args:
|
||||
@@ -61,9 +66,12 @@ class Pipeline:
|
||||
Returns:
|
||||
La risposta generata dalla pipeline.
|
||||
"""
|
||||
return asyncio.run(self.interact_async(listeners))
|
||||
response = ""
|
||||
async for chunk in self.interact_stream(listeners):
|
||||
response = chunk
|
||||
return response
|
||||
|
||||
async def interact_async(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str:
|
||||
async def interact_stream(self, listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = []) -> AsyncGenerator[str, None]:
|
||||
"""
|
||||
Versione asincrona che esegue la pipeline di agenti per rispondere alla query dell'utente.
|
||||
Args:
|
||||
@@ -81,9 +89,8 @@ class Pipeline:
|
||||
)
|
||||
|
||||
workflow = self.build_workflow()
|
||||
result = await self.run(workflow, query, events=events)
|
||||
return result
|
||||
|
||||
async for item in self.run_stream(workflow, query, events=events):
|
||||
yield item
|
||||
|
||||
|
[nitpick] The comment describes the function as 'ESEGUE (yield)' which is inconsistent with the style used in similar comments. The word 'ESEGUE' (executes) appears to be emphasized but doesn't align well with the yield concept. Consider clarifying that it 'yields' or 'streams' intermediate results and the final response. [nitpick] The comment describes the function as 'ESEGUE (yield)' which is inconsistent with the style used in similar comments. The word 'ESEGUE' (executes) appears to be emphasized but doesn't align well with the yield concept. Consider clarifying that it 'yields' or 'streams' intermediate results and the final response.
```suggestion
Versione asincrona in streaming che restituisce (yield) gli aggiornamenti di stato intermedi
e il risultato finale della pipeline.
```
|
||||
def build_workflow(self) -> Workflow:
|
||||
"""
|
||||
@@ -99,7 +106,8 @@ class Pipeline:
|
||||
# Step 2: Crea gli steps
|
||||
def condition_query_ok(step_input: StepInput) -> StepOutput:
|
||||
val = step_input.previous_step_content
|
||||
return StepOutput(stop=not val.is_crypto) if isinstance(val, QueryOutputs) else StepOutput(stop=True)
|
||||
stop = (not val.is_crypto) if isinstance(val, QueryOutputs) else True
|
||||
return StepOutput(stop=stop)
|
||||
|
||||
query_check = Step(name=PipelineEvent.QUERY_CHECK, agent=query_check)
|
||||
info_recovery = Step(name=PipelineEvent.INFO_RECOVERY, team=team)
|
||||
@@ -114,33 +122,39 @@ class Pipeline:
|
||||
])
|
||||
|
||||
@classmethod
|
||||
async def run(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], None]]]) -> str:
|
||||
async def run_stream(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], str | None]]]) -> AsyncGenerator[str, None]:
|
||||
"""
|
||||
Esegue il workflow e gestisce gli eventi tramite le callback fornite.
|
||||
Esegue il workflow e restituisce gli eventi di stato e il risultato finale.
|
||||
Args:
|
||||
workflow: istanza di Workflow da eseguire
|
||||
query: query dell'utente da passare al workflow
|
||||
events: dizionario di callback per eventi specifici (opzionale)
|
||||
Returns:
|
||||
La risposta generata dal workflow.
|
||||
workflow: L'istanza di Workflow da eseguire
|
||||
query: Gli input della query
|
||||
events: La lista di eventi e callback da gestire durante l'esecuzione.
|
||||
Yields:
|
||||
Aggiornamenti di stato e la risposta finale generata dal workflow.
|
||||
"""
|
||||
iterator = await workflow.arun(query, stream=True, stream_intermediate_steps=True)
|
||||
|
||||
content = None
|
||||
|
||||
async for event in iterator:
|
||||
step_name = getattr(event, 'step_name', '')
|
||||
|
||||
# Chiama i listeners (se presenti) per ogni evento
|
||||
for app_event, listener in events:
|
||||
if app_event.check_event(event.event, step_name):
|
||||
|
The ellipsis '...' at the end of the string is inconsistent with other yield statements. Line 196 has three dots while lines 181, 183, 185, and 194 use '...' (ellipsis character). Consider using the ellipsis character for consistency. The ellipsis '...' at the end of the string is inconsistent with other yield statements. Line 196 has three dots while lines 181, 183, 185, and 194 use '...' (ellipsis character). Consider using the ellipsis character for consistency.
```suggestion
yield f"Sto usando uno strumento sconosciuto…"
```
[nitpick] The default error message 'Errore durante l'esecuzione del workflow.' may not be reached in practice since the stream should always yield at least one item (the error message from line 213). This initialization could be misleading. Consider using a more descriptive default or documenting why this fallback exists. [nitpick] The default error message 'Errore durante l'esecuzione del workflow.' may not be reached in practice since the stream should always yield at least one item (the error message from line 213). This initialization could be misleading. Consider using a more descriptive default or documenting why this fallback exists.
```suggestion
# Fallback: if the workflow yields no results, return a descriptive error.
final_result = "[Pipeline Error] Nessun risultato prodotto dal workflow. (Fallback: run_stream non ha generato output)"
```
|
||||
listener(event)
|
||||
if event.event == WorkflowRunEvent.step_completed:
|
||||
update = listener(event)
|
||||
if update: yield update
|
||||
|
||||
# Salva il contenuto finale quando uno step è completato
|
||||
if event.event == WorkflowRunEvent.step_completed.value:
|
||||
content = getattr(event, 'content', '')
|
||||
|
||||
# Restituisce la risposta finale
|
||||
if content and isinstance(content, str):
|
||||
think_str = "</think>"
|
||||
think = content.rfind(think_str)
|
||||
return content[(think + len(think_str)):] if think != -1 else content
|
||||
if content and isinstance(content, QueryOutputs):
|
||||
return content.response
|
||||
|
||||
logging.error(f"No output from workflow: {content}")
|
||||
return "No output from workflow, something went wrong."
|
||||
yield content[(think + len(think_str)):] if think != -1 else content
|
||||
elif content and isinstance(content, QueryOutputs):
|
||||
yield content.response
|
||||
else:
|
||||
logging.error(f"No output from workflow: {content}")
|
||||
yield "Nessun output dal workflow, qualcosa è andato storto."
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
from agno.tools import Toolkit
|
||||
|
||||
from app.agents.action_registry import friendly_action
|
||||
from app.api.tools.instructions import MARKET_TOOL_INSTRUCTIONS
|
||||
from app.api.wrapper_handler import WrapperHandler
|
||||
from app.api.core.markets import MarketWrapper, Price, ProductInfo
|
||||
@@ -40,6 +42,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
|
||||
],
|
||||
)
|
||||
|
||||
@friendly_action("🔍 Recupero le informazioni sul prodotto richiesto...")
|
||||
def get_product(self, asset_id: str) -> ProductInfo:
|
||||
"""
|
||||
Gets product information for a *single* asset from the *first available* provider.
|
||||
@@ -56,6 +59,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
|
||||
"""
|
||||
return self.handler.try_call(lambda w: w.get_product(asset_id))
|
||||
|
||||
@friendly_action("📦 Recupero i dati su più asset...")
|
||||
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
|
||||
"""
|
||||
Gets product information for a *list* of assets from the *first available* provider.
|
||||
@@ -72,6 +76,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
|
||||
"""
|
||||
return self.handler.try_call(lambda w: w.get_products(asset_ids))
|
||||
|
||||
@friendly_action("📊 Recupero i dati storici dei prezzi...")
|
||||
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
|
||||
"""
|
||||
Gets historical price data for a *single* asset from the *first available* provider.
|
||||
@@ -89,6 +94,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
|
||||
"""
|
||||
return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit))
|
||||
|
||||
@friendly_action("🧩 Aggrego le informazioni da più fonti...")
|
||||
def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]:
|
||||
"""
|
||||
Gets product information for multiple assets from *all available providers* and *aggregates* the results.
|
||||
@@ -109,6 +115,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
|
||||
all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids))
|
||||
return ProductInfo.aggregate(all_products)
|
||||
|
||||
@friendly_action("📈 Creo uno storico aggregato dei prezzi...")
|
||||
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
|
||||
"""
|
||||
Gets historical price data for a single asset from *all available providers* and *aggregates* the results.
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
from agno.tools import Toolkit
|
||||
|
||||
from app.agents.action_registry import friendly_action
|
||||
from app.api.tools.instructions import NEWS_TOOL_INSTRUCTIONS
|
||||
from app.api.wrapper_handler import WrapperHandler
|
||||
from app.api.core.news import NewsWrapper, Article
|
||||
@@ -42,6 +44,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit):
|
||||
],
|
||||
)
|
||||
|
||||
@friendly_action("📰 Cerco le notizie principali...")
|
||||
def get_top_headlines(self, limit: int = 100) -> list[Article]:
|
||||
"""
|
||||
Retrieves top headlines from the *first available* news provider.
|
||||
@@ -58,6 +61,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit):
|
||||
"""
|
||||
return self.handler.try_call(lambda w: w.get_top_headlines(limit))
|
||||
|
||||
@friendly_action("🔎 Cerco notizie recenti sull'argomento...")
|
||||
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
|
||||
"""
|
||||
Searches for the latest news on a specific topic from the *first available* provider.
|
||||
@@ -75,6 +79,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit):
|
||||
"""
|
||||
return self.handler.try_call(lambda w: w.get_latest_news(query, limit))
|
||||
|
||||
@friendly_action("🗞️ Raccolgo le notizie principali da tutte le fonti...")
|
||||
def get_top_headlines_aggregated(self, limit: int = 100) -> dict[str, list[Article]]:
|
||||
"""
|
||||
Retrieves top headlines from *all available providers* and aggregates the results.
|
||||
@@ -94,6 +99,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit):
|
||||
"""
|
||||
return self.handler.try_call_all(lambda w: w.get_top_headlines(limit))
|
||||
|
||||
@friendly_action("📚 Raccolgo notizie specifiche da tutte le fonti...")
|
||||
def get_latest_news_aggregated(self, query: str, limit: int = 100) -> dict[str, list[Article]]:
|
||||
"""
|
||||
Searches for news on a specific topic from *all available providers* and aggregates the results.
|
||||
|
||||
@@ -14,6 +14,7 @@ class PlanMemoryTool(Toolkit):
|
||||
|
||||
def __init__(self):
|
||||
self.tasks: list[Task] = []
|
||||
|
||||
Toolkit.__init__(self, # type: ignore[call-arg]
|
||||
name="Plan Memory Toolkit",
|
||||
instructions=PLAN_MEMORY_TOOL_INSTRUCTIONS,
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
from agno.tools import Toolkit
|
||||
|
||||
from app.agents.action_registry import friendly_action
|
||||
from app.api.tools.instructions import SOCIAL_TOOL_INSTRUCTIONS
|
||||
from app.api.wrapper_handler import WrapperHandler
|
||||
from app.api.core.social import SocialPost, SocialWrapper
|
||||
@@ -41,6 +43,7 @@ class SocialAPIsTool(SocialWrapper, Toolkit):
|
||||
],
|
||||
)
|
||||
|
||||
@friendly_action("📱 Cerco i post più popolari sui social...")
|
||||
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
|
||||
"""
|
||||
Retrieves top cryptocurrency-related posts from the *first available* social media provider.
|
||||
@@ -57,6 +60,7 @@ class SocialAPIsTool(SocialWrapper, Toolkit):
|
||||
"""
|
||||
return self.handler.try_call(lambda w: w.get_top_crypto_posts(limit))
|
||||
|
||||
@friendly_action("🌐 Raccolgo i post da tutte le piattaforme social...")
|
||||
def get_top_crypto_posts_aggregated(self, limit_per_wrapper: int = 5) -> dict[str, list[SocialPost]]:
|
||||
"""
|
||||
Retrieves top cryptocurrency-related posts from *all available providers* and aggregates the results.
|
||||
|
||||
@@ -22,6 +22,7 @@ class CryptoSymbolsTools(Toolkit):
|
||||
def __init__(self, cache_file: str = 'resources/cryptos.csv'):
|
||||
self.cache_file = cache_file
|
||||
self.final_table = pd.read_csv(self.cache_file) if os.path.exists(self.cache_file) else pd.DataFrame() # type: ignore
|
||||
|
||||
Toolkit.__init__(self, # type: ignore
|
||||
name="Crypto Symbols Tool",
|
||||
instructions=SYMBOLS_TOOL_INSTRUCTIONS,
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import os
|
||||
import json
|
||||
from typing import Any, Callable
|
||||
import gradio as gr
|
||||
from app.agents.pipeline import Pipeline, PipelineInputs
|
||||
from app.agents.action_registry import get_user_friendly_action
|
||||
from app.agents.pipeline import Pipeline, PipelineEvent, PipelineInputs
|
||||
|
||||
|
||||
class ChatManager:
|
||||
@@ -49,13 +51,28 @@ class ChatManager:
|
||||
########################################
|
||||
# Funzioni Gradio
|
||||
########################################
|
||||
def gradio_respond(self, message: str, history: list[tuple[str, str]]) -> str:
|
||||
async def gradio_respond(self, message: str, history: list[tuple[str, str]]):
|
||||
"""
|
||||
Versione asincrona in streaming.
|
||||
Produce (yield) aggiornamenti di stato e la risposta finale.
|
||||
"""
|
||||
self.inputs.user_query = message
|
||||
pipeline = Pipeline(self.inputs)
|
||||
response = pipeline.interact()
|
||||
listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = [ # type: ignore
|
||||
(PipelineEvent.QUERY_CHECK, lambda _: "🔍 Sto controllando la tua richiesta..."),
|
||||
(PipelineEvent.INFO_RECOVERY, lambda _: "📊 Sto recuperando i dati (mercato, news, social)..."),
|
||||
(PipelineEvent.REPORT_GENERATION, lambda _: "✍️ Sto scrivendo il report finale..."),
|
||||
(PipelineEvent.TOOL_USED, lambda e: get_user_friendly_action(e.tool.tool_name))
|
||||
]
|
||||
|
||||
self.history.append((message, response))
|
||||
return response
|
||||
response = None
|
||||
async for chunk in pipeline.interact_stream(listeners=listeners):
|
||||
response = chunk # Salva l'ultimo chunk (che sarà la risposta finale)
|
||||
yield response # Restituisce l'aggiornamento (o la risposta finale) a Gradio
|
||||
|
||||
# Dopo che il generatore è completo, salva l'ultima risposta nello storico
|
||||
if response:
|
||||
self.history.append((message, response))
|
||||
|
||||
def gradio_save(self) -> str:
|
||||
self.save_chat("chat.json")
|
||||
@@ -72,7 +89,7 @@ class ChatManager:
|
||||
|
||||
|
||||
def gradio_build_interface(self) -> gr.Blocks:
|
||||
with gr.Blocks() as interface:
|
||||
with gr.Blocks(fill_height=True, fill_width=True) as interface:
|
||||
gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)")
|
||||
|
||||
# --- Prepara le etichette di default per i dropdown
|
||||
|
||||
@@ -271,7 +271,7 @@ class TelegramApp:
|
||||
await bot.delete_message(chat_id=chat_id, message_id=update.message.id)
|
||||
|
||||
def update_user(update_step: str = "") -> None:
|
||||
if update_step: run_message.update_step(update_step)
|
||||
if update_step: run_message.update_step_with_tool(update_step)
|
||||
else: run_message.update()
|
||||
|
||||
message = run_message.get_latest()
|
||||
@@ -280,11 +280,11 @@ class TelegramApp:
|
||||
|
||||
await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
|
||||
pipeline = Pipeline(inputs)
|
||||
report_content = await pipeline.interact_async(listeners=[
|
||||
(PipelineEvent.QUERY_CHECK, lambda _: update_user()),
|
||||
(PipelineEvent.TOOL_USED, lambda e: update_user(e.tool.tool_name.replace('get_', '').replace("_", "\\_"))),
|
||||
(PipelineEvent.INFO_RECOVERY, lambda _: update_user()),
|
||||
(PipelineEvent.REPORT_GENERATION, lambda _: update_user()),
|
||||
report_content = await pipeline.interact(listeners=[
|
||||
(PipelineEvent.QUERY_CHECK_END, lambda _: update_user()),
|
||||
(PipelineEvent.TOOL_USED_END, lambda e: update_user(e.tool.tool_name.replace('get_', '').replace("_", "\\_"))),
|
||||
(PipelineEvent.INFO_RECOVERY_END, lambda _: update_user()),
|
||||
(PipelineEvent.REPORT_GENERATION_END, lambda _: update_user()),
|
||||
])
|
||||
|
||||
# attach report file to the message
|
||||
|
||||
Sarebbe più corretto metterlo dentro il file core.py dato che ci sono tutte le interazioni core con la pipeline.
O ancor meglio che sia un decorator da mettere su ogni funzione di cui si vuole avere una descrizione e che aggiorna il registro, che sia dentro la classe RunMessage o che sia libero
Stavo tentando di farne un decorator, ma sfruttando il decorator @tool di agno che però sminchiava il Toolkit. Alla fine ho fatto questo accrocchio che non mi soddisfa in pieno, ma non avevo più tempo e funziona.
Non chiudere ancora la pull request che domani provo a fare un decoratore custom così evitiamo quel registro che ho improvvisato