Update chat interface #70
37
src/app/agents/action_registry.py
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
from typing import Any, Callable
|
||||||
|
|
||||||
|
|
||||||
|
# Registro centrale popolato da tutti i file Toolkit all'avvio.
|
||||||
|
ACTION_DESCRIPTIONS: dict[str, str] = {}
|
||||||
|
|
|||||||
|
|
||||||
|
def get_user_friendly_action(tool_name: str) -> str:
|
||||||
|
"""
|
||||||
|
Restituisce un messaggio leggibile e descrittivo per l'utente
|
||||||
|
leggendo dal registro globale.
|
||||||
|
"""
|
||||||
|
# Usa il dizionario ACTION_DESCRIPTIONS importato
|
||||||
|
return ACTION_DESCRIPTIONS.get(tool_name, f"⚙️ Eseguo l'operazione: {tool_name}...")
|
||||||
|
|
||||||
|
def friendly_action(description: str) -> Callable[..., Any]:
|
||||||
|
"""
|
||||||
|
Decoratore che registra automaticamente la descrizione "user-friendly"
|
||||||
|
di un metodo nel registro globale.
|
||||||
|
|
||||||
|
Questo decoratore viene eseguito all'avvio dell'app (quando i file
|
||||||
|
vengono importati) e popola il dizionario ACTION_DESCRIPTIONS.
|
||||||
|
|
||||||
|
Restituisce la funzione originale non modificata.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
|
||||||
|
# Registra l'azione
|
||||||
|
tool_name = func.__name__
|
||||||
|
if tool_name in ACTION_DESCRIPTIONS:
|
||||||
|
print(f"Attenzione: Azione '{tool_name}' registrata più volte.")
|
||||||
|
|
||||||
|
ACTION_DESCRIPTIONS[tool_name] = description
|
||||||
|
|
||||||
|
# Restituisce la funzione originale
|
||||||
|
return func
|
||||||
|
|
||||||
|
return decorator
|
||||||
@@ -45,28 +45,28 @@ class PipelineInputs:
|
|||||||
"""
|
"""
|
||||||
Sceglie il modello LLM da usare per l'analizzatore di query.
|
Sceglie il modello LLM da usare per l'analizzatore di query.
|
||||||
"""
|
"""
|
||||||
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
|
assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||||
self.query_analyzer_model = self.configs.models.all_models[index]
|
self.query_analyzer_model = self.configs.models.all_models[index]
|
||||||
|
|
||||||
def choose_team_leader(self, index: int):
|
def choose_team_leader(self, index: int):
|
||||||
"""
|
"""
|
||||||
Sceglie il modello LLM da usare per il Team Leader.
|
Sceglie il modello LLM da usare per il Team Leader.
|
||||||
"""
|
"""
|
||||||
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
|
assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||||
self.team_leader_model = self.configs.models.all_models[index]
|
self.team_leader_model = self.configs.models.all_models[index]
|
||||||
|
|
||||||
def choose_team(self, index: int):
|
def choose_team(self, index: int):
|
||||||
"""
|
"""
|
||||||
Sceglie il modello LLM da usare per il Team.
|
Sceglie il modello LLM da usare per il Team.
|
||||||
"""
|
"""
|
||||||
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
|
assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||||
self.team_model = self.configs.models.all_models[index]
|
self.team_model = self.configs.models.all_models[index]
|
||||||
|
|
||||||
def choose_report_generator(self, index: int):
|
def choose_report_generator(self, index: int):
|
||||||
"""
|
"""
|
||||||
Sceglie il modello LLM da usare per il generatore di report.
|
Sceglie il modello LLM da usare per il generatore di report.
|
||||||
"""
|
"""
|
||||||
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
|
assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list."
|
||||||
self.report_generation_model = self.configs.models.all_models[index]
|
self.report_generation_model = self.configs.models.all_models[index]
|
||||||
|
|
||||||
def choose_strategy(self, index: int):
|
def choose_strategy(self, index: int):
|
||||||
@@ -145,21 +145,31 @@ class RunMessage:
|
|||||||
- In esecuzione (➡️)
|
- In esecuzione (➡️)
|
||||||
- Completato (✅)
|
- Completato (✅)
|
||||||
|
|
||||||
Lo stato di esecuzione può essere assegnato solo ad uno step alla volta.
|
Lo stato di esecuzione può essere assegnato solo a uno step alla volta.
|
||||||
Args:
|
Args:
|
||||||
inputs (PipelineInputs): Input della pipeline per mostrare la configurazione.
|
inputs (PipelineInputs): Input della pipeline per mostrare la configurazione
|
||||||
prefix (str, optional): Prefisso del messaggio. Defaults to "".
|
prefix (str, optional): Prefisso del messaggio. Defaults to ""
|
||||||
suffix (str, optional): Suffisso del messaggio. Defaults to "".
|
suffix (str, optional): Suffisso del messaggio. Defaults to ""
|
||||||
"""
|
"""
|
||||||
self.base_message = f"Running configurations: \n{prefix}{inputs}{suffix}\n\n"
|
self.base_message = f"Running configurations: \n{prefix}{inputs}{suffix}\n\n"
|
||||||
self.emojis = ['🔳', '➡️', '✅']
|
self.emojis = ['🔳', '➡️', '✅']
|
||||||
self.placeholder = '<<<>>>'
|
self.placeholder = '<<<>>>'
|
||||||
self.current = 0
|
self.current = 0
|
||||||
self.steps_total = [
|
self.steps_total: list[tuple[str, int]] = []
|
||||||
(f"{self.placeholder} Query Check", 1),
|
self.set_steps(["Query Check", "Info Recovery", "Report Generation"])
|
||||||
(f"{self.placeholder} Info Recovery", 0),
|
|
||||||
(f"{self.placeholder} Report Generation", 0),
|
def set_steps(self, steps: list[str]) -> 'RunMessage':
|
||||||
]
|
"""
|
||||||
|
Inizializza gli step di esecuzione con lo stato iniziale.
|
||||||
|
Args:
|
||||||
|
steps (list[str]): Lista degli step da includere nel messaggio.
|
||||||
|
Returns:
|
||||||
|
RunMessage: L'istanza aggiornata di RunMessage.
|
||||||
|
"""
|
||||||
|
self.steps_total = [(f"{self.placeholder} {step}", 0) for step in steps]
|
||||||
|
self.steps_total[0] = (self.steps_total[0][0], 1) # Primo step in esecuzione
|
||||||
|
self.current = 0
|
||||||
|
return self
|
||||||
|
|
||||||
def update(self) -> 'RunMessage':
|
def update(self) -> 'RunMessage':
|
||||||
"""
|
"""
|
||||||
@@ -176,15 +186,15 @@ class RunMessage:
|
|||||||
self.steps_total[self.current] = (text_curr, state_curr + 1)
|
self.steps_total[self.current] = (text_curr, state_curr + 1)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def update_step(self, text_extra: str = "") -> 'RunMessage':
|
def update_step_with_tool(self, tool_used: str = "") -> 'RunMessage':
|
||||||
"""
|
"""
|
||||||
Aggiorna il messaggio per lo step corrente.
|
Aggiorna il messaggio per lo step corrente.
|
||||||
Args:
|
Args:
|
||||||
text_extra (str, optional): Testo aggiuntivo da includere nello step. Defaults to "".
|
tool_used (str, optional): Testo aggiuntivo da includere nello step. Defaults to "".
|
||||||
"""
|
"""
|
||||||
text_curr, state_curr = self.steps_total[self.current]
|
text_curr, state_curr = self.steps_total[self.current]
|
||||||
if text_extra:
|
if tool_used:
|
||||||
text_curr = f"{text_curr.replace('╚', '╠')}\n╚═ {text_extra}"
|
text_curr = f"{text_curr.replace('╚', '╠')}\n╚═ {tool_used}"
|
||||||
self.steps_total[self.current] = (text_curr, state_curr)
|
self.steps_total[self.current] = (text_curr, state_curr)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
@@ -196,3 +206,4 @@ class RunMessage:
|
|||||||
"""
|
"""
|
||||||
steps = [msg.replace(self.placeholder, self.emojis[state]) for msg, state in self.steps_total]
|
steps = [msg.replace(self.placeholder, self.emojis[state]) for msg, state in self.steps_total]
|
||||||
return self.base_message + "\n".join(steps)
|
return self.base_message + "\n".join(steps)
|
||||||
|
|
||||||
|
|||||||
@@ -1,8 +1,7 @@
|
|||||||
import asyncio
|
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
from typing import Any, Callable
|
from typing import Any, AsyncGenerator, Callable
|
||||||
from agno.agent import RunEvent
|
from agno.agent import RunEvent
|
||||||
from agno.run.workflow import WorkflowRunEvent
|
from agno.run.workflow import WorkflowRunEvent
|
||||||
from agno.workflow.types import StepInput, StepOutput
|
from agno.workflow.types import StepInput, StepOutput
|
||||||
@@ -13,28 +12,34 @@ from app.agents.core import *
|
|||||||
logging = logging.getLogger("pipeline")
|
logging = logging.getLogger("pipeline")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class PipelineEvent(str, Enum):
|
class PipelineEvent(str, Enum):
|
||||||
QUERY_CHECK = "Query Check"
|
QUERY_CHECK = "Query Check"
|
||||||
QUERY_ANALYZER = "Query Analyzer"
|
QUERY_CHECK_END = "Query Check End"
|
||||||
INFO_RECOVERY = "Info Recovery"
|
INFO_RECOVERY = "Info Recovery"
|
||||||
|
INFO_RECOVERY_END = "Info Recovery End"
|
||||||
REPORT_GENERATION = "Report Generation"
|
REPORT_GENERATION = "Report Generation"
|
||||||
REPORT_TRANSLATION = "Report Translation"
|
REPORT_GENERATION_END = "Report Generation End"
|
||||||
RUN_FINISHED = WorkflowRunEvent.workflow_completed.value
|
TOOL_USED = RunEvent.tool_call_started.value
|
||||||
TOOL_USED = RunEvent.tool_call_completed.value
|
TOOL_USED_END = RunEvent.tool_call_completed.value
|
||||||
|
RUN_END = WorkflowRunEvent.workflow_completed.value
|
||||||
|
|
||||||
def check_event(self, event: str, step_name: str) -> bool:
|
def check_event(self, event: str, step_name: str) -> bool:
|
||||||
return event == self.value or (WorkflowRunEvent.step_completed == event and step_name == self.value)
|
if event == self.value:
|
||||||
|
return True
|
||||||
|
|
||||||
|
index = self.value.rfind(" End")
|
||||||
|
value = self.value[:index] if index > -1 else self.value
|
||||||
|
step_state = WorkflowRunEvent.step_completed if index > -1 else WorkflowRunEvent.step_started
|
||||||
|
return step_name == value and step_state == event
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_log_events(cls, run_id: int) -> list[tuple['PipelineEvent', Callable[[Any], None]]]:
|
def get_log_events(cls, run_id: int) -> list[tuple['PipelineEvent', Callable[[Any], str | None]]]:
|
||||||
return [
|
return [
|
||||||
(PipelineEvent.QUERY_CHECK, lambda _: logging.info(f"[{run_id}] Query Check completed.")),
|
(PipelineEvent.QUERY_CHECK_END, lambda _: logging.info(f"[{run_id}] Query Check completed.")),
|
||||||
(PipelineEvent.QUERY_ANALYZER, lambda _: logging.info(f"[{run_id}] Query Analyzer completed.")),
|
(PipelineEvent.INFO_RECOVERY_END, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")),
|
||||||
(PipelineEvent.INFO_RECOVERY, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")),
|
(PipelineEvent.REPORT_GENERATION_END, lambda _: logging.info(f"[{run_id}] Report Generation completed.")),
|
||||||
(PipelineEvent.REPORT_GENERATION, lambda _: logging.info(f"[{run_id}] Report Generation completed.")),
|
(PipelineEvent.TOOL_USED_END, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")),
|
||||||
(PipelineEvent.TOOL_USED, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")),
|
(PipelineEvent.RUN_END, lambda _: logging.info(f"[{run_id}] Run completed.")),
|
||||||
(PipelineEvent.RUN_FINISHED, lambda _: logging.info(f"[{run_id}] Run completed.")),
|
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
@@ -53,7 +58,7 @@ class Pipeline:
|
|||||||
"""
|
"""
|
||||||
self.inputs = inputs
|
self.inputs = inputs
|
||||||
|
|
||||||
def interact(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str:
|
async def interact(self, listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = []) -> str:
|
||||||
"""
|
"""
|
||||||
Esegue la pipeline di agenti per rispondere alla query dell'utente.
|
Esegue la pipeline di agenti per rispondere alla query dell'utente.
|
||||||
Args:
|
Args:
|
||||||
@@ -61,9 +66,12 @@ class Pipeline:
|
|||||||
Returns:
|
Returns:
|
||||||
La risposta generata dalla pipeline.
|
La risposta generata dalla pipeline.
|
||||||
"""
|
"""
|
||||||
return asyncio.run(self.interact_async(listeners))
|
response = ""
|
||||||
|
async for chunk in self.interact_stream(listeners):
|
||||||
|
response = chunk
|
||||||
|
return response
|
||||||
|
|
||||||
async def interact_async(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str:
|
async def interact_stream(self, listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = []) -> AsyncGenerator[str, None]:
|
||||||
"""
|
"""
|
||||||
Versione asincrona che esegue la pipeline di agenti per rispondere alla query dell'utente.
|
Versione asincrona che esegue la pipeline di agenti per rispondere alla query dell'utente.
|
||||||
Args:
|
Args:
|
||||||
@@ -81,9 +89,8 @@ class Pipeline:
|
|||||||
)
|
)
|
||||||
|
|
||||||
workflow = self.build_workflow()
|
workflow = self.build_workflow()
|
||||||
result = await self.run(workflow, query, events=events)
|
async for item in self.run_stream(workflow, query, events=events):
|
||||||
return result
|
yield item
|
||||||
|
|
||||||
|
|
||||||
|
[nitpick] The comment describes the function as 'ESEGUE (yield)' which is inconsistent with the style used in similar comments. The word 'ESEGUE' (executes) appears to be emphasized but doesn't align well with the yield concept. Consider clarifying that it 'yields' or 'streams' intermediate results and the final response. [nitpick] The comment describes the function as 'ESEGUE (yield)' which is inconsistent with the style used in similar comments. The word 'ESEGUE' (executes) appears to be emphasized but doesn't align well with the yield concept. Consider clarifying that it 'yields' or 'streams' intermediate results and the final response.
```suggestion
Versione asincrona in streaming che restituisce (yield) gli aggiornamenti di stato intermedi
e il risultato finale della pipeline.
```
|
|||||||
def build_workflow(self) -> Workflow:
|
def build_workflow(self) -> Workflow:
|
||||||
"""
|
"""
|
||||||
@@ -99,7 +106,8 @@ class Pipeline:
|
|||||||
# Step 2: Crea gli steps
|
# Step 2: Crea gli steps
|
||||||
def condition_query_ok(step_input: StepInput) -> StepOutput:
|
def condition_query_ok(step_input: StepInput) -> StepOutput:
|
||||||
val = step_input.previous_step_content
|
val = step_input.previous_step_content
|
||||||
return StepOutput(stop=not val.is_crypto) if isinstance(val, QueryOutputs) else StepOutput(stop=True)
|
stop = (not val.is_crypto) if isinstance(val, QueryOutputs) else True
|
||||||
|
return StepOutput(stop=stop)
|
||||||
|
|
||||||
query_check = Step(name=PipelineEvent.QUERY_CHECK, agent=query_check)
|
query_check = Step(name=PipelineEvent.QUERY_CHECK, agent=query_check)
|
||||||
info_recovery = Step(name=PipelineEvent.INFO_RECOVERY, team=team)
|
info_recovery = Step(name=PipelineEvent.INFO_RECOVERY, team=team)
|
||||||
@@ -114,33 +122,39 @@ class Pipeline:
|
|||||||
])
|
])
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def run(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], None]]]) -> str:
|
async def run_stream(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], str | None]]]) -> AsyncGenerator[str, None]:
|
||||||
"""
|
"""
|
||||||
Esegue il workflow e gestisce gli eventi tramite le callback fornite.
|
Esegue il workflow e restituisce gli eventi di stato e il risultato finale.
|
||||||
Args:
|
Args:
|
||||||
workflow: istanza di Workflow da eseguire
|
workflow: L'istanza di Workflow da eseguire
|
||||||
query: query dell'utente da passare al workflow
|
query: Gli input della query
|
||||||
events: dizionario di callback per eventi specifici (opzionale)
|
events: La lista di eventi e callback da gestire durante l'esecuzione.
|
||||||
Returns:
|
Yields:
|
||||||
La risposta generata dal workflow.
|
Aggiornamenti di stato e la risposta finale generata dal workflow.
|
||||||
"""
|
"""
|
||||||
iterator = await workflow.arun(query, stream=True, stream_intermediate_steps=True)
|
iterator = await workflow.arun(query, stream=True, stream_intermediate_steps=True)
|
||||||
|
|
||||||
content = None
|
content = None
|
||||||
|
|
||||||
async for event in iterator:
|
async for event in iterator:
|
||||||
step_name = getattr(event, 'step_name', '')
|
step_name = getattr(event, 'step_name', '')
|
||||||
|
|
||||||
|
# Chiama i listeners (se presenti) per ogni evento
|
||||||
for app_event, listener in events:
|
for app_event, listener in events:
|
||||||
if app_event.check_event(event.event, step_name):
|
if app_event.check_event(event.event, step_name):
|
||||||
|
The ellipsis '...' at the end of the string is inconsistent with other yield statements. Line 196 has three dots while lines 181, 183, 185, and 194 use '...' (ellipsis character). Consider using the ellipsis character for consistency. The ellipsis '...' at the end of the string is inconsistent with other yield statements. Line 196 has three dots while lines 181, 183, 185, and 194 use '...' (ellipsis character). Consider using the ellipsis character for consistency.
```suggestion
yield f"Sto usando uno strumento sconosciuto…"
```
[nitpick] The default error message 'Errore durante l'esecuzione del workflow.' may not be reached in practice since the stream should always yield at least one item (the error message from line 213). This initialization could be misleading. Consider using a more descriptive default or documenting why this fallback exists. [nitpick] The default error message 'Errore durante l'esecuzione del workflow.' may not be reached in practice since the stream should always yield at least one item (the error message from line 213). This initialization could be misleading. Consider using a more descriptive default or documenting why this fallback exists.
```suggestion
# Fallback: if the workflow yields no results, return a descriptive error.
final_result = "[Pipeline Error] Nessun risultato prodotto dal workflow. (Fallback: run_stream non ha generato output)"
```
|
|||||||
listener(event)
|
update = listener(event)
|
||||||
if event.event == WorkflowRunEvent.step_completed:
|
if update: yield update
|
||||||
|
|
||||||
|
# Salva il contenuto finale quando uno step è completato
|
||||||
|
if event.event == WorkflowRunEvent.step_completed.value:
|
||||||
content = getattr(event, 'content', '')
|
content = getattr(event, 'content', '')
|
||||||
|
|
||||||
|
# Restituisce la risposta finale
|
||||||
if content and isinstance(content, str):
|
if content and isinstance(content, str):
|
||||||
think_str = "</think>"
|
think_str = "</think>"
|
||||||
think = content.rfind(think_str)
|
think = content.rfind(think_str)
|
||||||
return content[(think + len(think_str)):] if think != -1 else content
|
yield content[(think + len(think_str)):] if think != -1 else content
|
||||||
if content and isinstance(content, QueryOutputs):
|
elif content and isinstance(content, QueryOutputs):
|
||||||
return content.response
|
yield content.response
|
||||||
|
else:
|
||||||
logging.error(f"No output from workflow: {content}")
|
logging.error(f"No output from workflow: {content}")
|
||||||
return "No output from workflow, something went wrong."
|
yield "Nessun output dal workflow, qualcosa è andato storto."
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
from agno.tools import Toolkit
|
from agno.tools import Toolkit
|
||||||
|
|
||||||
|
from app.agents.action_registry import friendly_action
|
||||||
from app.api.tools.instructions import MARKET_TOOL_INSTRUCTIONS
|
from app.api.tools.instructions import MARKET_TOOL_INSTRUCTIONS
|
||||||
from app.api.wrapper_handler import WrapperHandler
|
from app.api.wrapper_handler import WrapperHandler
|
||||||
from app.api.core.markets import MarketWrapper, Price, ProductInfo
|
from app.api.core.markets import MarketWrapper, Price, ProductInfo
|
||||||
@@ -40,6 +42,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
|
|||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@friendly_action("🔍 Recupero le informazioni sul prodotto richiesto...")
|
||||||
def get_product(self, asset_id: str) -> ProductInfo:
|
def get_product(self, asset_id: str) -> ProductInfo:
|
||||||
"""
|
"""
|
||||||
Gets product information for a *single* asset from the *first available* provider.
|
Gets product information for a *single* asset from the *first available* provider.
|
||||||
@@ -56,6 +59,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
|
|||||||
"""
|
"""
|
||||||
return self.handler.try_call(lambda w: w.get_product(asset_id))
|
return self.handler.try_call(lambda w: w.get_product(asset_id))
|
||||||
|
|
||||||
|
@friendly_action("📦 Recupero i dati su più asset...")
|
||||||
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
|
def get_products(self, asset_ids: list[str]) -> list[ProductInfo]:
|
||||||
"""
|
"""
|
||||||
Gets product information for a *list* of assets from the *first available* provider.
|
Gets product information for a *list* of assets from the *first available* provider.
|
||||||
@@ -72,6 +76,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
|
|||||||
"""
|
"""
|
||||||
return self.handler.try_call(lambda w: w.get_products(asset_ids))
|
return self.handler.try_call(lambda w: w.get_products(asset_ids))
|
||||||
|
|
||||||
|
@friendly_action("📊 Recupero i dati storici dei prezzi...")
|
||||||
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
|
def get_historical_prices(self, asset_id: str, limit: int = 100) -> list[Price]:
|
||||||
"""
|
"""
|
||||||
Gets historical price data for a *single* asset from the *first available* provider.
|
Gets historical price data for a *single* asset from the *first available* provider.
|
||||||
@@ -89,6 +94,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
|
|||||||
"""
|
"""
|
||||||
return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit))
|
return self.handler.try_call(lambda w: w.get_historical_prices(asset_id, limit))
|
||||||
|
|
||||||
|
@friendly_action("🧩 Aggrego le informazioni da più fonti...")
|
||||||
def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]:
|
def get_products_aggregated(self, asset_ids: list[str]) -> list[ProductInfo]:
|
||||||
"""
|
"""
|
||||||
Gets product information for multiple assets from *all available providers* and *aggregates* the results.
|
Gets product information for multiple assets from *all available providers* and *aggregates* the results.
|
||||||
@@ -109,6 +115,7 @@ class MarketAPIsTool(MarketWrapper, Toolkit):
|
|||||||
all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids))
|
all_products = self.handler.try_call_all(lambda w: w.get_products(asset_ids))
|
||||||
return ProductInfo.aggregate(all_products)
|
return ProductInfo.aggregate(all_products)
|
||||||
|
|
||||||
|
@friendly_action("📈 Creo uno storico aggregato dei prezzi...")
|
||||||
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
|
def get_historical_prices_aggregated(self, asset_id: str = "BTC", limit: int = 100) -> list[Price]:
|
||||||
"""
|
"""
|
||||||
Gets historical price data for a single asset from *all available providers* and *aggregates* the results.
|
Gets historical price data for a single asset from *all available providers* and *aggregates* the results.
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
from agno.tools import Toolkit
|
from agno.tools import Toolkit
|
||||||
|
|
||||||
|
from app.agents.action_registry import friendly_action
|
||||||
from app.api.tools.instructions import NEWS_TOOL_INSTRUCTIONS
|
from app.api.tools.instructions import NEWS_TOOL_INSTRUCTIONS
|
||||||
from app.api.wrapper_handler import WrapperHandler
|
from app.api.wrapper_handler import WrapperHandler
|
||||||
from app.api.core.news import NewsWrapper, Article
|
from app.api.core.news import NewsWrapper, Article
|
||||||
@@ -42,6 +44,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit):
|
|||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@friendly_action("📰 Cerco le notizie principali...")
|
||||||
def get_top_headlines(self, limit: int = 100) -> list[Article]:
|
def get_top_headlines(self, limit: int = 100) -> list[Article]:
|
||||||
"""
|
"""
|
||||||
Retrieves top headlines from the *first available* news provider.
|
Retrieves top headlines from the *first available* news provider.
|
||||||
@@ -58,6 +61,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit):
|
|||||||
"""
|
"""
|
||||||
return self.handler.try_call(lambda w: w.get_top_headlines(limit))
|
return self.handler.try_call(lambda w: w.get_top_headlines(limit))
|
||||||
|
|
||||||
|
@friendly_action("🔎 Cerco notizie recenti sull'argomento...")
|
||||||
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
|
def get_latest_news(self, query: str, limit: int = 100) -> list[Article]:
|
||||||
"""
|
"""
|
||||||
Searches for the latest news on a specific topic from the *first available* provider.
|
Searches for the latest news on a specific topic from the *first available* provider.
|
||||||
@@ -75,6 +79,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit):
|
|||||||
"""
|
"""
|
||||||
return self.handler.try_call(lambda w: w.get_latest_news(query, limit))
|
return self.handler.try_call(lambda w: w.get_latest_news(query, limit))
|
||||||
|
|
||||||
|
@friendly_action("🗞️ Raccolgo le notizie principali da tutte le fonti...")
|
||||||
def get_top_headlines_aggregated(self, limit: int = 100) -> dict[str, list[Article]]:
|
def get_top_headlines_aggregated(self, limit: int = 100) -> dict[str, list[Article]]:
|
||||||
"""
|
"""
|
||||||
Retrieves top headlines from *all available providers* and aggregates the results.
|
Retrieves top headlines from *all available providers* and aggregates the results.
|
||||||
@@ -94,6 +99,7 @@ class NewsAPIsTool(NewsWrapper, Toolkit):
|
|||||||
"""
|
"""
|
||||||
return self.handler.try_call_all(lambda w: w.get_top_headlines(limit))
|
return self.handler.try_call_all(lambda w: w.get_top_headlines(limit))
|
||||||
|
|
||||||
|
@friendly_action("📚 Raccolgo notizie specifiche da tutte le fonti...")
|
||||||
def get_latest_news_aggregated(self, query: str, limit: int = 100) -> dict[str, list[Article]]:
|
def get_latest_news_aggregated(self, query: str, limit: int = 100) -> dict[str, list[Article]]:
|
||||||
"""
|
"""
|
||||||
Searches for news on a specific topic from *all available providers* and aggregates the results.
|
Searches for news on a specific topic from *all available providers* and aggregates the results.
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ class PlanMemoryTool(Toolkit):
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.tasks: list[Task] = []
|
self.tasks: list[Task] = []
|
||||||
|
|
||||||
Toolkit.__init__(self, # type: ignore[call-arg]
|
Toolkit.__init__(self, # type: ignore[call-arg]
|
||||||
name="Plan Memory Toolkit",
|
name="Plan Memory Toolkit",
|
||||||
instructions=PLAN_MEMORY_TOOL_INSTRUCTIONS,
|
instructions=PLAN_MEMORY_TOOL_INSTRUCTIONS,
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
from agno.tools import Toolkit
|
from agno.tools import Toolkit
|
||||||
|
|
||||||
|
from app.agents.action_registry import friendly_action
|
||||||
from app.api.tools.instructions import SOCIAL_TOOL_INSTRUCTIONS
|
from app.api.tools.instructions import SOCIAL_TOOL_INSTRUCTIONS
|
||||||
from app.api.wrapper_handler import WrapperHandler
|
from app.api.wrapper_handler import WrapperHandler
|
||||||
from app.api.core.social import SocialPost, SocialWrapper
|
from app.api.core.social import SocialPost, SocialWrapper
|
||||||
@@ -41,6 +43,7 @@ class SocialAPIsTool(SocialWrapper, Toolkit):
|
|||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@friendly_action("📱 Cerco i post più popolari sui social...")
|
||||||
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
|
def get_top_crypto_posts(self, limit: int = 5) -> list[SocialPost]:
|
||||||
"""
|
"""
|
||||||
Retrieves top cryptocurrency-related posts from the *first available* social media provider.
|
Retrieves top cryptocurrency-related posts from the *first available* social media provider.
|
||||||
@@ -57,6 +60,7 @@ class SocialAPIsTool(SocialWrapper, Toolkit):
|
|||||||
"""
|
"""
|
||||||
return self.handler.try_call(lambda w: w.get_top_crypto_posts(limit))
|
return self.handler.try_call(lambda w: w.get_top_crypto_posts(limit))
|
||||||
|
|
||||||
|
@friendly_action("🌐 Raccolgo i post da tutte le piattaforme social...")
|
||||||
def get_top_crypto_posts_aggregated(self, limit_per_wrapper: int = 5) -> dict[str, list[SocialPost]]:
|
def get_top_crypto_posts_aggregated(self, limit_per_wrapper: int = 5) -> dict[str, list[SocialPost]]:
|
||||||
"""
|
"""
|
||||||
Retrieves top cryptocurrency-related posts from *all available providers* and aggregates the results.
|
Retrieves top cryptocurrency-related posts from *all available providers* and aggregates the results.
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ class CryptoSymbolsTools(Toolkit):
|
|||||||
def __init__(self, cache_file: str = 'resources/cryptos.csv'):
|
def __init__(self, cache_file: str = 'resources/cryptos.csv'):
|
||||||
self.cache_file = cache_file
|
self.cache_file = cache_file
|
||||||
self.final_table = pd.read_csv(self.cache_file) if os.path.exists(self.cache_file) else pd.DataFrame() # type: ignore
|
self.final_table = pd.read_csv(self.cache_file) if os.path.exists(self.cache_file) else pd.DataFrame() # type: ignore
|
||||||
|
|
||||||
Toolkit.__init__(self, # type: ignore
|
Toolkit.__init__(self, # type: ignore
|
||||||
name="Crypto Symbols Tool",
|
name="Crypto Symbols Tool",
|
||||||
instructions=SYMBOLS_TOOL_INSTRUCTIONS,
|
instructions=SYMBOLS_TOOL_INSTRUCTIONS,
|
||||||
|
|||||||
@@ -1,7 +1,9 @@
|
|||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
|
from typing import Any, Callable
|
||||||
import gradio as gr
|
import gradio as gr
|
||||||
from app.agents.pipeline import Pipeline, PipelineInputs
|
from app.agents.action_registry import get_user_friendly_action
|
||||||
|
from app.agents.pipeline import Pipeline, PipelineEvent, PipelineInputs
|
||||||
|
|
||||||
|
|
||||||
class ChatManager:
|
class ChatManager:
|
||||||
@@ -49,13 +51,28 @@ class ChatManager:
|
|||||||
########################################
|
########################################
|
||||||
# Funzioni Gradio
|
# Funzioni Gradio
|
||||||
########################################
|
########################################
|
||||||
def gradio_respond(self, message: str, history: list[tuple[str, str]]) -> str:
|
async def gradio_respond(self, message: str, history: list[tuple[str, str]]):
|
||||||
|
"""
|
||||||
|
Versione asincrona in streaming.
|
||||||
|
Produce (yield) aggiornamenti di stato e la risposta finale.
|
||||||
|
"""
|
||||||
self.inputs.user_query = message
|
self.inputs.user_query = message
|
||||||
pipeline = Pipeline(self.inputs)
|
pipeline = Pipeline(self.inputs)
|
||||||
response = pipeline.interact()
|
listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = [ # type: ignore
|
||||||
|
(PipelineEvent.QUERY_CHECK, lambda _: "🔍 Sto controllando la tua richiesta..."),
|
||||||
|
(PipelineEvent.INFO_RECOVERY, lambda _: "📊 Sto recuperando i dati (mercato, news, social)..."),
|
||||||
|
(PipelineEvent.REPORT_GENERATION, lambda _: "✍️ Sto scrivendo il report finale..."),
|
||||||
|
(PipelineEvent.TOOL_USED, lambda e: get_user_friendly_action(e.tool.tool_name))
|
||||||
|
]
|
||||||
|
|
||||||
self.history.append((message, response))
|
response = None
|
||||||
return response
|
async for chunk in pipeline.interact_stream(listeners=listeners):
|
||||||
|
response = chunk # Salva l'ultimo chunk (che sarà la risposta finale)
|
||||||
|
yield response # Restituisce l'aggiornamento (o la risposta finale) a Gradio
|
||||||
|
|
||||||
|
# Dopo che il generatore è completo, salva l'ultima risposta nello storico
|
||||||
|
if response:
|
||||||
|
self.history.append((message, response))
|
||||||
|
|
||||||
def gradio_save(self) -> str:
|
def gradio_save(self) -> str:
|
||||||
self.save_chat("chat.json")
|
self.save_chat("chat.json")
|
||||||
@@ -72,7 +89,7 @@ class ChatManager:
|
|||||||
|
|
||||||
|
|
||||||
def gradio_build_interface(self) -> gr.Blocks:
|
def gradio_build_interface(self) -> gr.Blocks:
|
||||||
with gr.Blocks() as interface:
|
with gr.Blocks(fill_height=True, fill_width=True) as interface:
|
||||||
gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)")
|
gr.Markdown("# 🤖 Agente di Analisi e Consulenza Crypto (Chat)")
|
||||||
|
|
||||||
# --- Prepara le etichette di default per i dropdown
|
# --- Prepara le etichette di default per i dropdown
|
||||||
|
|||||||
@@ -271,7 +271,7 @@ class TelegramApp:
|
|||||||
await bot.delete_message(chat_id=chat_id, message_id=update.message.id)
|
await bot.delete_message(chat_id=chat_id, message_id=update.message.id)
|
||||||
|
|
||||||
def update_user(update_step: str = "") -> None:
|
def update_user(update_step: str = "") -> None:
|
||||||
if update_step: run_message.update_step(update_step)
|
if update_step: run_message.update_step_with_tool(update_step)
|
||||||
else: run_message.update()
|
else: run_message.update()
|
||||||
|
|
||||||
message = run_message.get_latest()
|
message = run_message.get_latest()
|
||||||
@@ -280,11 +280,11 @@ class TelegramApp:
|
|||||||
|
|
||||||
await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
|
await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
|
||||||
pipeline = Pipeline(inputs)
|
pipeline = Pipeline(inputs)
|
||||||
report_content = await pipeline.interact_async(listeners=[
|
report_content = await pipeline.interact(listeners=[
|
||||||
(PipelineEvent.QUERY_CHECK, lambda _: update_user()),
|
(PipelineEvent.QUERY_CHECK_END, lambda _: update_user()),
|
||||||
(PipelineEvent.TOOL_USED, lambda e: update_user(e.tool.tool_name.replace('get_', '').replace("_", "\\_"))),
|
(PipelineEvent.TOOL_USED_END, lambda e: update_user(e.tool.tool_name.replace('get_', '').replace("_", "\\_"))),
|
||||||
(PipelineEvent.INFO_RECOVERY, lambda _: update_user()),
|
(PipelineEvent.INFO_RECOVERY_END, lambda _: update_user()),
|
||||||
(PipelineEvent.REPORT_GENERATION, lambda _: update_user()),
|
(PipelineEvent.REPORT_GENERATION_END, lambda _: update_user()),
|
||||||
])
|
])
|
||||||
|
|
||||||
# attach report file to the message
|
# attach report file to the message
|
||||||
|
|||||||
Sarebbe più corretto metterlo dentro il file core.py dato che ci sono tutte le interazioni core con la pipeline.
O ancor meglio che sia un decorator da mettere su ogni funzione di cui si vuole avere una descrizione e che aggiorna il registro, che sia dentro la classe RunMessage o che sia libero
Stavo tentando di farne un decorator, ma sfruttando il decorator @tool di agno che però sminchiava il Toolkit. Alla fine ho fatto questo accrocchio che non mi soddisfa in pieno, ma non avevo più tempo e funziona.
Non chiudere ancora la pull request che domani provo a fare un decoratore custom così evitiamo quel registro che ho improvvisato