Improved telegram user waiting
This commit is contained in:
@@ -133,3 +133,41 @@ class PipelineInputs:
|
|||||||
social_tool = SocialAPIsTool()
|
social_tool = SocialAPIsTool()
|
||||||
social_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds)
|
social_tool.handler.set_retries(api.retry_attempts, api.retry_delay_seconds)
|
||||||
return market_tool, news_tool, social_tool
|
return market_tool, news_tool, social_tool
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return "\n".join([
|
||||||
|
f"Query Check: {self.query_analyzer_model.label}",
|
||||||
|
f"Team Leader: {self.team_leader_model.label}",
|
||||||
|
f"Team: {self.team_model.label}",
|
||||||
|
f"Report: {self.report_generation_model.label}",
|
||||||
|
f"Strategy: {self.strategy.label}",
|
||||||
|
f"User Query: \"{self.user_query}\"",
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
|
class RunMessage:
|
||||||
|
def __init__(self, inputs: PipelineInputs, prefix: str = "", suffix: str = ""):
|
||||||
|
self.base_message = f"Running configurations: \n{prefix}{inputs}{suffix}\n\n"
|
||||||
|
self.emojis = ['🔳', '➡️', '✅']
|
||||||
|
self.placeholder = '<<<>>>'
|
||||||
|
self.current = 0
|
||||||
|
self.steps_total = [
|
||||||
|
(f"{self.placeholder} Query Check", 1),
|
||||||
|
(f"{self.placeholder} Info Recovery", 0),
|
||||||
|
(f"{self.placeholder} Report Generation", 0),
|
||||||
|
]
|
||||||
|
|
||||||
|
def update(self) -> 'RunMessage':
|
||||||
|
text_curr, state_curr = self.steps_total[self.current]
|
||||||
|
self.steps_total[self.current] = (text_curr, state_curr + 1)
|
||||||
|
self.current += 1
|
||||||
|
if self.current < len(self.steps_total):
|
||||||
|
text_curr, state_curr = self.steps_total[self.current]
|
||||||
|
self.steps_total[self.current] = (text_curr, state_curr + 1)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def get_latest(self, extra: str = "") -> str:
|
||||||
|
steps = [msg.replace(self.placeholder, self.emojis[state]) for msg, state in self.steps_total]
|
||||||
|
if extra:
|
||||||
|
steps[self.current] = f"{steps[self.current]}: {extra}"
|
||||||
|
return self.base_message + "\n".join(steps)
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import asyncio
|
||||||
import io
|
import io
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
@@ -10,7 +11,7 @@ from markdown_pdf import MarkdownPdf, Section
|
|||||||
from telegram import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update, User
|
from telegram import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, Update, User
|
||||||
from telegram.constants import ChatAction
|
from telegram.constants import ChatAction
|
||||||
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, ConversationHandler, MessageHandler, filters
|
from telegram.ext import Application, CallbackQueryHandler, CommandHandler, ContextTypes, ConversationHandler, MessageHandler, filters
|
||||||
from app.agents.pipeline import Pipeline, PipelineInputs
|
from app.agents.pipeline import Pipeline, PipelineEvent, PipelineInputs, RunMessage
|
||||||
|
|
||||||
# per per_message di ConversationHandler che rompe sempre qualunque input tu metta
|
# per per_message di ConversationHandler che rompe sempre qualunque input tu metta
|
||||||
warnings.filterwarnings("ignore")
|
warnings.filterwarnings("ignore")
|
||||||
@@ -233,7 +234,7 @@ class TelegramApp:
|
|||||||
|
|
||||||
async def __select_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
async def __select_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||||||
query, user = await self.handle_callbackquery(update)
|
query, user = await self.handle_callbackquery(update)
|
||||||
logging.info(f"@{user.username} --> {query.data}")
|
logging.debug(f"@{user.username} --> {query.data}")
|
||||||
|
|
||||||
req = self.user_requests[user]
|
req = self.user_requests[user]
|
||||||
_, state, index = str(query.data).split(QUERY_SEP)
|
_, state, index = str(query.data).split(QUERY_SEP)
|
||||||
@@ -264,36 +265,33 @@ class TelegramApp:
|
|||||||
msg_id = update.message.message_id - 1
|
msg_id = update.message.message_id - 1
|
||||||
chat_id = update.message.chat_id
|
chat_id = update.message.chat_id
|
||||||
|
|
||||||
configs_str = [
|
run_message = RunMessage(inputs, prefix="```\n", suffix="\n```")
|
||||||
'Running with configurations: ',
|
msg = await bot.edit_message_text(chat_id=chat_id, message_id=msg_id, text=run_message.get_latest(), parse_mode='MarkdownV2')
|
||||||
f'Check: {inputs.query_analyzer_model.label}',
|
|
||||||
f'Leader: {inputs.team_leader_model.label}',
|
|
||||||
f'Team: {inputs.team_model.label}',
|
|
||||||
f'Report: {inputs.report_generation_model.label}',
|
|
||||||
f'Strategy: {inputs.strategy.label}',
|
|
||||||
f'Query: "{inputs.user_query}"'
|
|
||||||
]
|
|
||||||
full_message = f"""```\n{'\n'.join(configs_str)}\n```\n\n"""
|
|
||||||
first_message = full_message + "Generating report, please wait"
|
|
||||||
msg = await bot.edit_message_text(chat_id=chat_id, message_id=msg_id, text=first_message, parse_mode='MarkdownV2')
|
|
||||||
if isinstance(msg, bool): return
|
if isinstance(msg, bool): return
|
||||||
|
|
||||||
# Remove user query and bot message
|
# Remove user query and bot message
|
||||||
await bot.delete_message(chat_id=chat_id, message_id=update.message.id)
|
await bot.delete_message(chat_id=chat_id, message_id=update.message.id)
|
||||||
|
|
||||||
# TODO migliorare messaggi di attesa
|
def update_user(update: bool = True, extra: str = "") -> None:
|
||||||
|
if update: run_message.update()
|
||||||
|
message = run_message.get_latest(extra)
|
||||||
|
if msg.text != message:
|
||||||
|
asyncio.create_task(msg.edit_text(message, parse_mode='MarkdownV2'))
|
||||||
|
|
||||||
await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
|
await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING)
|
||||||
pipeline = Pipeline(inputs)
|
pipeline = Pipeline(inputs)
|
||||||
report_content = await pipeline.interact_async()
|
report_content = await pipeline.interact_async(listeners=[
|
||||||
await msg.delete()
|
(PipelineEvent.QUERY_CHECK, lambda _: update_user()),
|
||||||
|
(PipelineEvent.TOOL_USED, lambda e: update_user(False, f"`{e.agent_name} {e.tool.tool_name}`")),
|
||||||
|
(PipelineEvent.INFO_RECOVERY, lambda _: update_user()),
|
||||||
|
(PipelineEvent.REPORT_GENERATION, lambda _: update_user()),
|
||||||
|
])
|
||||||
|
|
||||||
# attach report file to the message
|
# attach report file to the message
|
||||||
pdf = MarkdownPdf(toc_level=2, optimize=True)
|
pdf = MarkdownPdf(toc_level=2, optimize=True)
|
||||||
pdf.add_section(Section(report_content, toc=False))
|
pdf.add_section(Section(report_content, toc=False))
|
||||||
|
|
||||||
# TODO vedere se ha senso dare il pdf o solo il messaggio
|
|
||||||
document = io.BytesIO()
|
document = io.BytesIO()
|
||||||
pdf.save_bytes(document)
|
pdf.save_bytes(document)
|
||||||
document.seek(0)
|
document.seek(0)
|
||||||
await bot.send_document(chat_id=chat_id, document=document, filename="report.pdf", parse_mode='MarkdownV2', caption=full_message)
|
await msg.reply_document(document=document, filename="report.pdf")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user