Update chat interface (#70)
* Update chat interface to fill height and width in Gradio blocks * Implement asynchronous streaming for Gradio responses and enhance pipeline event handling * Refactor tool event handling to provide user-friendly messages and add utility function for descriptive tool actions
This commit was merged in pull request #70.
This commit is contained in:
@@ -1,8 +1,7 @@
|
||||
import asyncio
|
||||
from enum import Enum
|
||||
import logging
|
||||
import random
|
||||
from typing import Any, Callable
|
||||
from typing import Any, AsyncGenerator, Callable
|
||||
from agno.agent import RunEvent
|
||||
from agno.run.workflow import WorkflowRunEvent
|
||||
from agno.workflow.types import StepInput, StepOutput
|
||||
@@ -13,28 +12,34 @@ from app.agents.core import *
|
||||
logging = logging.getLogger("pipeline")
|
||||
|
||||
|
||||
|
||||
class PipelineEvent(str, Enum):
|
||||
QUERY_CHECK = "Query Check"
|
||||
QUERY_ANALYZER = "Query Analyzer"
|
||||
QUERY_CHECK_END = "Query Check End"
|
||||
INFO_RECOVERY = "Info Recovery"
|
||||
INFO_RECOVERY_END = "Info Recovery End"
|
||||
REPORT_GENERATION = "Report Generation"
|
||||
REPORT_TRANSLATION = "Report Translation"
|
||||
RUN_FINISHED = WorkflowRunEvent.workflow_completed.value
|
||||
TOOL_USED = RunEvent.tool_call_completed.value
|
||||
REPORT_GENERATION_END = "Report Generation End"
|
||||
TOOL_USED = RunEvent.tool_call_started.value
|
||||
TOOL_USED_END = RunEvent.tool_call_completed.value
|
||||
RUN_END = WorkflowRunEvent.workflow_completed.value
|
||||
|
||||
def check_event(self, event: str, step_name: str) -> bool:
|
||||
return event == self.value or (WorkflowRunEvent.step_completed == event and step_name == self.value)
|
||||
if event == self.value:
|
||||
return True
|
||||
|
||||
index = self.value.rfind(" End")
|
||||
value = self.value[:index] if index > -1 else self.value
|
||||
step_state = WorkflowRunEvent.step_completed if index > -1 else WorkflowRunEvent.step_started
|
||||
return step_name == value and step_state == event
|
||||
|
||||
@classmethod
|
||||
def get_log_events(cls, run_id: int) -> list[tuple['PipelineEvent', Callable[[Any], None]]]:
|
||||
def get_log_events(cls, run_id: int) -> list[tuple['PipelineEvent', Callable[[Any], str | None]]]:
|
||||
return [
|
||||
(PipelineEvent.QUERY_CHECK, lambda _: logging.info(f"[{run_id}] Query Check completed.")),
|
||||
(PipelineEvent.QUERY_ANALYZER, lambda _: logging.info(f"[{run_id}] Query Analyzer completed.")),
|
||||
(PipelineEvent.INFO_RECOVERY, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")),
|
||||
(PipelineEvent.REPORT_GENERATION, lambda _: logging.info(f"[{run_id}] Report Generation completed.")),
|
||||
(PipelineEvent.TOOL_USED, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")),
|
||||
(PipelineEvent.RUN_FINISHED, lambda _: logging.info(f"[{run_id}] Run completed.")),
|
||||
(PipelineEvent.QUERY_CHECK_END, lambda _: logging.info(f"[{run_id}] Query Check completed.")),
|
||||
(PipelineEvent.INFO_RECOVERY_END, lambda _: logging.info(f"[{run_id}] Info Recovery completed.")),
|
||||
(PipelineEvent.REPORT_GENERATION_END, lambda _: logging.info(f"[{run_id}] Report Generation completed.")),
|
||||
(PipelineEvent.TOOL_USED_END, lambda e: logging.info(f"[{run_id}] Tool used [{e.tool.tool_name} {e.tool.tool_args}] by {e.agent_name}.")),
|
||||
(PipelineEvent.RUN_END, lambda _: logging.info(f"[{run_id}] Run completed.")),
|
||||
]
|
||||
|
||||
|
||||
@@ -53,7 +58,7 @@ class Pipeline:
|
||||
"""
|
||||
self.inputs = inputs
|
||||
|
||||
def interact(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str:
|
||||
async def interact(self, listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = []) -> str:
|
||||
"""
|
||||
Esegue la pipeline di agenti per rispondere alla query dell'utente.
|
||||
Args:
|
||||
@@ -61,9 +66,12 @@ class Pipeline:
|
||||
Returns:
|
||||
La risposta generata dalla pipeline.
|
||||
"""
|
||||
return asyncio.run(self.interact_async(listeners))
|
||||
response = ""
|
||||
async for chunk in self.interact_stream(listeners):
|
||||
response = chunk
|
||||
return response
|
||||
|
||||
async def interact_async(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []) -> str:
|
||||
async def interact_stream(self, listeners: list[tuple[PipelineEvent, Callable[[Any], str | None]]] = []) -> AsyncGenerator[str, None]:
|
||||
"""
|
||||
Versione asincrona che esegue la pipeline di agenti per rispondere alla query dell'utente.
|
||||
Args:
|
||||
@@ -81,9 +89,8 @@ class Pipeline:
|
||||
)
|
||||
|
||||
workflow = self.build_workflow()
|
||||
result = await self.run(workflow, query, events=events)
|
||||
return result
|
||||
|
||||
async for item in self.run_stream(workflow, query, events=events):
|
||||
yield item
|
||||
|
||||
def build_workflow(self) -> Workflow:
|
||||
"""
|
||||
@@ -99,7 +106,8 @@ class Pipeline:
|
||||
# Step 2: Crea gli steps
|
||||
def condition_query_ok(step_input: StepInput) -> StepOutput:
|
||||
val = step_input.previous_step_content
|
||||
return StepOutput(stop=not val.is_crypto) if isinstance(val, QueryOutputs) else StepOutput(stop=True)
|
||||
stop = (not val.is_crypto) if isinstance(val, QueryOutputs) else True
|
||||
return StepOutput(stop=stop)
|
||||
|
||||
query_check = Step(name=PipelineEvent.QUERY_CHECK, agent=query_check)
|
||||
info_recovery = Step(name=PipelineEvent.INFO_RECOVERY, team=team)
|
||||
@@ -114,33 +122,39 @@ class Pipeline:
|
||||
])
|
||||
|
||||
@classmethod
|
||||
async def run(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], None]]]) -> str:
|
||||
async def run_stream(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], str | None]]]) -> AsyncGenerator[str, None]:
|
||||
"""
|
||||
Esegue il workflow e gestisce gli eventi tramite le callback fornite.
|
||||
Esegue il workflow e restituisce gli eventi di stato e il risultato finale.
|
||||
Args:
|
||||
workflow: istanza di Workflow da eseguire
|
||||
query: query dell'utente da passare al workflow
|
||||
events: dizionario di callback per eventi specifici (opzionale)
|
||||
Returns:
|
||||
La risposta generata dal workflow.
|
||||
workflow: L'istanza di Workflow da eseguire
|
||||
query: Gli input della query
|
||||
events: La lista di eventi e callback da gestire durante l'esecuzione.
|
||||
Yields:
|
||||
Aggiornamenti di stato e la risposta finale generata dal workflow.
|
||||
"""
|
||||
iterator = await workflow.arun(query, stream=True, stream_intermediate_steps=True)
|
||||
|
||||
content = None
|
||||
|
||||
async for event in iterator:
|
||||
step_name = getattr(event, 'step_name', '')
|
||||
|
||||
# Chiama i listeners (se presenti) per ogni evento
|
||||
for app_event, listener in events:
|
||||
if app_event.check_event(event.event, step_name):
|
||||
listener(event)
|
||||
if event.event == WorkflowRunEvent.step_completed:
|
||||
update = listener(event)
|
||||
if update: yield update
|
||||
|
||||
# Salva il contenuto finale quando uno step è completato
|
||||
if event.event == WorkflowRunEvent.step_completed.value:
|
||||
content = getattr(event, 'content', '')
|
||||
|
||||
# Restituisce la risposta finale
|
||||
if content and isinstance(content, str):
|
||||
think_str = "</think>"
|
||||
think = content.rfind(think_str)
|
||||
return content[(think + len(think_str)):] if think != -1 else content
|
||||
if content and isinstance(content, QueryOutputs):
|
||||
return content.response
|
||||
|
||||
logging.error(f"No output from workflow: {content}")
|
||||
return "No output from workflow, something went wrong."
|
||||
yield content[(think + len(think_str)):] if think != -1 else content
|
||||
elif content and isinstance(content, QueryOutputs):
|
||||
yield content.response
|
||||
else:
|
||||
logging.error(f"No output from workflow: {content}")
|
||||
yield "Nessun output dal workflow, qualcosa è andato storto."
|
||||
|
||||
Reference in New Issue
Block a user