Implement asynchronous streaming for Gradio responses and enhance pipeline event handling

This commit is contained in:
trojanhorse47
2025-10-30 14:26:12 +01:00
parent 5297bf8a9a
commit 0799a4ab08
3 changed files with 109 additions and 24 deletions

View File

@@ -45,28 +45,28 @@ class PipelineInputs:
"""
Sceglie il modello LLM da usare per l'analizzatore di query.
"""
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list."
self.query_analyzer_model = self.configs.models.all_models[index]
def choose_team_leader(self, index: int):
"""
Sceglie il modello LLM da usare per il Team Leader.
"""
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list."
self.team_leader_model = self.configs.models.all_models[index]
def choose_team(self, index: int):
"""
Sceglie il modello LLM da usare per il Team.
"""
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list."
self.team_model = self.configs.models.all_models[index]
def choose_report_generator(self, index: int):
"""
Sceglie il modello LLM da usare per il generatore di report.
"""
assert index >= 0 and index < len(self.configs.models.all_models), "Index out of range for models list."
assert 0 <= index < len(self.configs.models.all_models), "Index out of range for models list."
self.report_generation_model = self.configs.models.all_models[index]
def choose_strategy(self, index: int):
@@ -111,6 +111,7 @@ class PipelineInputs:
name="CryptoAnalysisTeam",
tools=[ReasoningTools(), PlanMemoryTool(), CryptoSymbolsTools()],
members=[market_agent, news_agent, social_agent],
stream_intermediate_steps=True
)
def get_agent_query_checker(self) -> Agent:

View File

@@ -84,6 +84,25 @@ class Pipeline:
result = await self.run(workflow, query, events=events)
return result
async def interact_stream(self, listeners: list[tuple[PipelineEvent, Callable[[Any], None]]] = []):
"""
Versione asincrona in streaming che ESEGUE (yield) la pipeline,
restituendo gli aggiornamenti di stato e il risultato finale.
"""
run_id = random.randint(1000, 9999) # Per tracciare i log
logging.info(f"[{run_id}] Pipeline query: {self.inputs.user_query}")
events = [*PipelineEvent.get_log_events(run_id), *listeners]
query = QueryInputs(
user_query=self.inputs.user_query,
strategy=self.inputs.strategy.description
)
workflow = self.build_workflow()
# Delega al classmethod 'run_stream' per lo streaming
async for item in self.run_stream(workflow, query, events=events):
yield item
def build_workflow(self) -> Workflow:
"""
@@ -114,33 +133,88 @@ class Pipeline:
])
@classmethod
async def run(cls, workflow: Workflow, query: QueryInputs, events: list[tuple[PipelineEvent, Callable[[Any], None]]]) -> str:
async def run(cls, workflow: Workflow, query: QueryInputs,
events: list[tuple[PipelineEvent, Callable[[Any], None]]]) -> str:
"""
Esegue il workflow e gestisce gli eventi tramite le callback fornite.
Args:
workflow: istanza di Workflow da eseguire
query: query dell'utente da passare al workflow
events: dizionario di callback per eventi specifici (opzionale)
Returns:
La risposta generata dal workflow.
Esegue il workflow e gestisce gli eventi, restituendo solo il risultato finale.
Consuma il generatore 'run_stream'.
"""
final_result = "Errore durante l'esecuzione del workflow."
# Consuma il generatore e salva solo l'ultimo item
async for item in cls.run_stream(workflow, query, events):
final_result = item
return final_result
@classmethod
async def run_stream(cls, workflow: Workflow, query: QueryInputs,
events: list[tuple[PipelineEvent, Callable[[Any], None]]]):
"""
Esegue il workflow e restituisce gli eventi di stato e il risultato finale.
"""
iterator = await workflow.arun(query, stream=True, stream_intermediate_steps=True)
content = None
current_active_step = None
async for event in iterator:
step_name = getattr(event, 'step_name', '')
# 1. Chiama i listeners (per i log)
for app_event, listener in events:
if app_event.check_event(event.event, step_name):
listener(event)
if event.event == WorkflowRunEvent.step_completed:
# 2. Restituisce gli aggiornamenti di stato per Gradio
if event.event == WorkflowRunEvent.step_started.value:
current_active_step = step_name
if step_name == PipelineEvent.QUERY_CHECK.value:
yield "🔍 Sto controllando la tua richiesta..."
elif step_name == PipelineEvent.INFO_RECOVERY.value:
yield "📊 Sto recuperando i dati (mercato, news, social)..."
elif step_name == PipelineEvent.REPORT_GENERATION.value:
yield "✍️ Sto scrivendo il report finale..."
# Gestisce i tool usati da agenti singoli (come Query Check)
elif event.event == WorkflowRunEvent.step_output.value:
agent_event = event.content
if hasattr(agent_event, 'event') and agent_event.event == RunEvent.tool_call_completed.value:
tool_name = getattr(agent_event.tool, 'tool_name', 'uno strumento')
yield f"🛠️ Sto usando lo strumento: {tool_name}..."
# Gestisce i tool usati da agenti interni al team (come CustomEvent)
elif event.event == WorkflowRunEvent.custom_event.value:
custom_content = getattr(event, 'content', None)
if custom_content and hasattr(custom_content, 'event'):
agent_event = custom_content
if agent_event.event == RunEvent.tool_call_completed.value:
if step_name == PipelineEvent.INFO_RECOVERY.value:
tool_name = getattr(agent_event.tool, 'tool_name', 'uno strumento')
yield f"🛠️ (Team) Sto usando lo strumento: {tool_name}..."
# Gestisce gli eventi di tool promossi dal Team
elif event.event == PipelineEvent.TOOL_USED.value:
# Ci assicuriamo che l'evento provenga dallo step corretto
if current_active_step == PipelineEvent.INFO_RECOVERY.value:
tool_object = getattr(event, 'tool', None)
if tool_object:
tool_name = getattr(tool_object, 'tool_name', 'uno strumento')
yield f"🛠️ (Team) Sto usando lo strumento: {tool_name}..."
else:
yield f"🛠️ (Team) Sto usando uno strumento sconosciuto..."
# 3. Salva il contenuto finale quando uno step è completato
elif event.event == WorkflowRunEvent.step_completed.value:
current_active_step = None
content = getattr(event, 'content', '')
# 4. Restituisce la risposta finale
if content and isinstance(content, str):
think_str = "</think>"
think = content.rfind(think_str)
return content[(think + len(think_str)):] if think != -1 else content
if content and isinstance(content, QueryOutputs):
return content.response
logging.error(f"No output from workflow: {content}")
return "No output from workflow, something went wrong."
final_answer = content[(think + len(think_str)):] if think != -1 else content
yield final_answer
elif content and isinstance(content, QueryOutputs):
yield content.response
else:
logging.error(f"No output from workflow: {content}")
yield "Nessun output dal workflow, qualcosa è andato storto."

View File

@@ -49,13 +49,23 @@ class ChatManager:
########################################
# Funzioni Gradio
########################################
def gradio_respond(self, message: str, history: list[tuple[str, str]]) -> str:
async def gradio_respond(self, message: str, history: list[tuple[str, str]]):
"""
Versione asincrona in streaming.
Produce (yield) aggiornamenti di stato e la risposta finale.
"""
self.inputs.user_query = message
pipeline = Pipeline(self.inputs)
response = pipeline.interact()
self.history.append((message, response))
return response
response = None
# Itera sul nuovo generatore asincrono
async for chunk in pipeline.interact_stream():
response = chunk # Salva l'ultimo chunk (che sarà la risposta finale)
yield response # Restituisce l'aggiornamento (o la risposta finale) a Gradio
# Dopo che il generatore è completo, salva l'ultima risposta nello storico
if response:
self.history.append((message, response))
def gradio_save(self) -> str:
self.save_chat("chat.json")