diff --git a/helpers/integration_commands.py b/helpers/integration_commands.py index d78f3ff6ee..24a78e60f5 100644 --- a/helpers/integration_commands.py +++ b/helpers/integration_commands.py @@ -1,6 +1,7 @@ from __future__ import annotations import re +from dataclasses import dataclass from typing import TYPE_CHECKING from helpers import message_queue as mq @@ -14,7 +15,93 @@ _CLEAR_VALUES = {"", "default", "none", "clear", "off"} -_SUPPORTED_COMMANDS = {"/send", "/queue", "/project", "/config", "/preset"} + + +@dataclass(frozen=True) +class IntegrationCommandDef: + name: str + description: str + category: str + aliases: tuple[str, ...] = () + args_hint: str = "" + menu: bool = True + integrations: tuple[str, ...] = () + + +COMMAND_REGISTRY: tuple[IntegrationCommandDef, ...] = ( + IntegrationCommandDef("commands", "Show all integration commands.", "Info", aliases=("help",)), + IntegrationCommandDef( + "status", + "Show this chat's project, model, agent, and queue state.", + "Info", + ), + IntegrationCommandDef("new", "Start a fresh chat context.", "Session"), + IntegrationCommandDef( + "sessions", + "Show or switch recent chat sessions.", + "Session", + aliases=("session",), + integrations=("telegram",), + ), + IntegrationCommandDef("clear", "Reset the current chat context.", "Session", aliases=("reset",)), + IntegrationCommandDef( + "queue", + "Show or manage queued messages.", + "Session", + args_hint="[send|clear]", + ), + IntegrationCommandDef("send", "Send queued messages now.", "Session", aliases=("push",)), + IntegrationCommandDef( + "steer", + "Intervene in the currently running task.", + "Session", + args_hint="", + ), + IntegrationCommandDef("pause", "Pause the active run.", "Session"), + IntegrationCommandDef("resume", "Resume a paused run.", "Session"), + IntegrationCommandDef("nudge", "Nudge the active run.", "Session"), + IntegrationCommandDef( + "stream", + "Enable or disable Telegram response streaming.", + "Configuration", + args_hint="[on|off]", + 
integrations=("telegram",), + ), + IntegrationCommandDef( + "tools", + "Show or hide Telegram tool progress.", + "Configuration", + args_hint="[on|off]", + integrations=("telegram",), + ), + IntegrationCommandDef( + "project", + "Show or switch the active project.", + "Configuration", + args_hint="[name|none]", + ), + IntegrationCommandDef( + "model", + "Show or switch the chat model preset.", + "Configuration", + aliases=("config", "preset"), + args_hint="[preset|default]", + ), + IntegrationCommandDef( + "agent", + "Show or switch the agent profile.", + "Configuration", + aliases=("profile",), + args_hint="[profile]", + ), +) + + +_COMMAND_LOOKUP = { + f"/{name}": command + for command in COMMAND_REGISTRY + for name in (command.name, *command.aliases) +} def extract_command_line(text: str) -> str: @@ -26,36 +113,132 @@ def extract_command_line(text: str) -> str: return "" -def parse_command(text: str) -> tuple[str, str] | None: +def parse_command(text: str, *, integration: str | None = None) -> tuple[str, str] | None: line = extract_command_line(text) if not line.startswith("/"): return None command, _, args = line.partition(" ") - command = command.strip().lower() - if command not in _SUPPORTED_COMMANDS: + command = _normalize_command_token(command) + resolved = resolve_command(command, integration=integration) + if not resolved: return None - return command, args.strip() + return f"/{resolved.name}", args.strip() + + +def resolve_command(command: str, *, integration: str | None = None) -> IntegrationCommandDef | None: + normalized = _normalize_command_token(command) + if not normalized.startswith("/"): + normalized = f"/{normalized}" + command_def = _COMMAND_LOOKUP.get(normalized) + if not command_def or not _is_command_available(command_def, integration): + return None + return command_def + + +def telegram_menu_commands() -> list[tuple[str, str]]: + return [ + (command.name, _telegram_description(command)) + for command in COMMAND_REGISTRY + if command.menu 
and _is_command_available(command, "telegram") + ] -def try_handle_command(context: "AgentContext", text: str) -> str | None: - parsed = parse_command(text) +def command_names(include_aliases: bool = True, *, integration: str | None = None) -> list[str]: + names: list[str] = [] + for command in COMMAND_REGISTRY: + if not _is_command_available(command, integration): + continue + names.append(command.name) + if include_aliases: + names.extend(command.aliases) + return names + + +def help_text(*, full: bool = False, integration: str | None = None) -> str: + commands = tuple( + command + for command in COMMAND_REGISTRY + if _is_command_available(command, integration) and (full or command.menu) + ) + lines = ["Available commands:"] + for command in commands: + args = f" {command.args_hint}" if command.args_hint else "" + alias_text = "" + if command.aliases: + alias_text = f" (alias: {', '.join('/' + alias for alias in command.aliases)})" + lines.append(f"/{command.name}{args} - {command.description}{alias_text}") + return "\n".join(lines) + + +def unknown_command_text(command: str, *, integration: str | None = None) -> str: + token = _normalize_command_token(command).split(" ", 1)[0] + return f"Unknown command: {token}\n\n{help_text(full=True, integration=integration)}" + + +def try_handle_command( + context: "AgentContext", + text: str, + *, + integration: str | None = None, +) -> str | None: + parsed = parse_command(text, integration=integration) if not parsed: return None command, args = parsed + if command == "/commands": + return help_text(full=True, integration=integration) + if command == "/status": + return _handle_status(context) + if command == "/sessions": + return _handle_sessions(context) + if command in {"/new", "/clear"}: + return _handle_clear(context, new_chat=(command == "/new")) if command == "/send": return _handle_queue(context, "send") if command == "/queue": return _handle_queue(context, args) + if command == "/steer": + return 
_handle_steer(context, args) + if command == "/pause": + return _handle_pause(context) + if command == "/resume": + return _handle_resume(context) + if command == "/nudge": + return _handle_nudge(context) + if command == "/stream": + return _handle_toggle(context, args, "telegram_stream_enabled", "Response streaming") + if command == "/tools": + return _handle_toggle(context, args, "telegram_tools_enabled", "Tool progress") if command == "/project": return _handle_project(context, args) - if command in {"/config", "/preset"}: - return _handle_config(context, args) + if command == "/model": + return _handle_model(context, args) + if command == "/agent": + return _handle_agent(context, args) return None +def _normalize_command_token(command: str) -> str: + normalized = command.strip().lower() + if not normalized: + return "" + token, *rest = normalized.split(" ", 1) + if "@" in token: + token = token.split("@", 1)[0] + return f"{token} {rest[0]}".strip() if rest else token + + +def _is_command_available(command: IntegrationCommandDef, integration: str | None) -> bool: + if not command.integrations: + return True + if not integration: + return False + return integration.lower() in command.integrations + + def _handle_queue(context: "AgentContext", args: str) -> str: queue = mq.get_queue(context) count = len(queue) @@ -68,8 +251,13 @@ def _handle_queue(context: "AgentContext", args: str) -> str: "Use /send or /queue send to send everything as one batch." ) + if action in {"clear", "reset"}: + mq.remove(context) + mark_dirty_for_context(context.id, reason="integration_commands.queue_clear") + return "Queue cleared." + if action not in {"send", "all"}: - return "Unknown queue action. Use /queue send to flush the queue." + return "Unknown queue action. Use /queue send to flush or /queue clear to clear." if count == 0: return "Queue is empty." 
@@ -80,6 +268,91 @@ def _handle_queue(context: "AgentContext", args: str) -> str: return f"Sent {sent_count} queued {noun} as one batch." +def _handle_status(context: "AgentContext") -> str: + project_name = context.get_data("project") or "none" + override = context.get_data("chat_model_override") + agent_profile = getattr(context.agent0.config, "profile", "default") + running = "running" if context.is_running() else "idle" + if getattr(context, "paused", False): + running = "paused" + queue_count = len(mq.get_queue(context)) + return ( + f"Status: {running}\n" + f"Project: {project_name}\n" + f"Model: {_describe_override(override)}\n" + f"Agent: {agent_profile}\n" + f"Queued messages: {queue_count}" + ) + + +def _handle_sessions(context: "AgentContext") -> str: + from agent import AgentContext + + contexts = sorted( + AgentContext.all(), + key=lambda item: str(item.output().get("last_message") or ""), + reverse=True, + ) + lines = ["Recent sessions:"] + for item in contexts[:4]: + marker = " (current)" if item.id == context.id else "" + running = " - running" if item.is_running() else "" + lines.append(f"- {item.name or item.id}{marker}{running}") + if len(contexts) > 4: + lines.append(f"And {len(contexts) - 4} more. Use Telegram buttons to page through them.") + return "\n".join(lines) + + +def _handle_clear(context: "AgentContext", *, new_chat: bool) -> str: + context.reset() + mq.remove(context) + save_tmp_chat(context) + reason = "integration_commands.new" if new_chat else "integration_commands.clear" + mark_dirty_for_context(context.id, reason=reason) + return "Started a fresh chat." if new_chat else "Chat cleared." + + +def _handle_steer(context: "AgentContext", args: str) -> str: + message = args.strip() + if not message: + return "Usage: /steer " + from agent import UserMessage + + context.communicate(UserMessage(message=message)) + if context.is_running(): + return "Steering message sent to the active run." + return "Message sent." 
+ + +def _handle_pause(context: "AgentContext") -> str: + if not context.is_running(): + return "No active run is currently running." + context.paused = True + return "Agent paused." + + +def _handle_resume(context: "AgentContext") -> str: + context.paused = False + return "Agent resumed." + + +def _handle_nudge(context: "AgentContext") -> str: + context.nudge() + return "Agent nudged." + + +def _handle_toggle(context: "AgentContext", args: str, key: str, label: str) -> str: + value = _parse_toggle(args) + current = _get_toggle(context, key) + if value is None: + state = "on" if current else "off" + return f"{label}: {state}. Use /{key.split('_')[1]} on or /{key.split('_')[1]} off." + context.set_data(key, value) + save_tmp_chat(context) + mark_dirty_for_context(context.id, reason=f"integration_commands.{key}") + return f"{label} {'enabled' if value else 'disabled'}." + + def _handle_project(context: "AgentContext", args: str) -> str: items = projects.get_active_projects_list() or [] current_name = context.get_data("project") or "" @@ -115,7 +388,7 @@ def _handle_project(context: "AgentContext", args: str) -> str: return f"Switched project to {match.get('title') or match['name']}." -def _handle_config(context: "AgentContext", args: str) -> str: +def _handle_model(context: "AgentContext", args: str) -> str: allowed = model_config.is_chat_override_allowed(context.agent0) presets = [preset for preset in model_config.get_presets() if preset.get("name")] current_override = context.get_data("chat_model_override") @@ -123,12 +396,12 @@ def _handle_config(context: "AgentContext", args: str) -> str: if not args: current_label = _describe_override(current_override) available = ", ".join(preset["name"] for preset in presets) or "none" - suffix = "Use /config to switch, or /config default to clear it." + suffix = "Use /model to switch, or /model default to clear it." if not allowed: suffix = "Per-chat config switching is disabled in Model Configuration." 
return ( - f"Current config: {current_label}\n" - f"Available configs: {available}\n" + f"Current model: {current_label}\n" + f"Available presets: {available}\n" f"{suffix}" ) @@ -159,7 +432,49 @@ def _handle_config(context: "AgentContext", args: str) -> str: context.set_data("chat_model_override", {"preset_name": preset_name}) save_tmp_chat(context) mark_dirty_for_context(context.id, reason="integration_commands.config_set") - return f"Switched config to {preset_name}." + return f"Switched model preset to {preset_name}." + + +def _handle_agent(context: "AgentContext", args: str) -> str: + from agent import Agent + from helpers import subagents + from initialize import initialize_agent + + items = subagents.get_all_agents_list() + current = getattr(context.agent0.config, "profile", "default") + if not args: + available = ", ".join(_format_agent_entry(item) for item in items) or "none" + return ( + f"Current agent: {current}\n" + f"Available agents: {available}\n" + "Use /agent to switch after the current run finishes." + ) + + if context.is_running(): + return "Agent profile can be changed after the current run finishes." + + desired = _strip_quotes(args) + match, ambiguous = _match_named_item(items, desired, keys=("key", "label")) + if ambiguous: + names = ", ".join(_format_agent_entry(item) for item in ambiguous) + return f"Agent profile is ambiguous. Matches: {names}" + if not match: + available = ", ".join(_format_agent_entry(item) for item in items) or "none" + return f"Agent profile '{desired}' was not found. Available agents: {available}" + + profile = str(match["key"]) + if profile == current: + return f"Already using agent {match.get('label') or profile}." 
+ + config = initialize_agent(override_settings={"agent_profile": profile}) + context.config = config + agent = context.agent0 + while agent: + agent.config = config + agent = agent.get_data(Agent.DATA_NAME_SUBORDINATE) + save_tmp_chat(context) + mark_dirty_for_context(context.id, reason="integration_commands.agent_set") + return f"Switched agent to {match.get('label') or profile}." def _format_project_entry(item: dict) -> str: @@ -170,6 +485,19 @@ def _format_project_entry(item: dict) -> str: return name or title +def _format_agent_entry(item: dict) -> str: + key = str(item.get("key", "") or "").strip() + label = str(item.get("label", "") or "").strip() + if label and label.lower() != key.lower(): + return f"{label} ({key})" + return key or label + + +def _telegram_description(command: IntegrationCommandDef) -> str: + description = command.description.strip() + return description[:255] if len(description) > 255 else description + + def _describe_project(items: list[dict], current_name: str) -> str: if not current_name: return "none" @@ -201,6 +529,20 @@ def _normalize_lookup(value: str) -> str: return lowered.strip() +def _get_toggle(context: "AgentContext", key: str) -> bool: + value = context.get_data(key) + return True if value is None else bool(value) + + +def _parse_toggle(args: str) -> bool | None: + value = _normalize_lookup(args) + if value in {"on", "enable", "enabled", "yes", "true", "1"}: + return True + if value in {"off", "disable", "disabled", "no", "false", "0"}: + return False + return None + + def _match_named_item( items: list[dict], desired: str, diff --git a/plugins/_telegram_integration/README.md b/plugins/_telegram_integration/README.md index 8201ab882f..a9793fa352 100644 --- a/plugins/_telegram_integration/README.md +++ b/plugins/_telegram_integration/README.md @@ -16,10 +16,16 @@ This plugin connects one or more Telegram bots to Agent Zero. Each bot runs inde - Supports both long-polling and webhook delivery modes. 
- **Per-user chat sessions** - Each Telegram user gets a dedicated `AgentContext`, persisted across restarts via a JSON state file. - - `/start` creates a context; `/clear` resets it. + - `/start` creates a context; `/new` starts fresh; `/clear` resets the current context. + - `/commands` shows the shared integration command menu (`/help` is an alias). + - `/status` shows project, model, agent profile, and queue state. - `/project ` switches the active project for the current chat. - - `/config ` switches the active model preset for the current chat. - - `/send` or `/queue send` flushes the queued messages for the current chat. + - `/model ` switches the active model preset for the current chat (`/config` remains an alias). + - `/agent ` switches the active agent profile when the current run is idle. + - `/model`, `/project`, and `/agent` show Telegram inline keyboard pickers when used without arguments. + - `/stream` toggles live response streaming; `/tools` toggles tool progress messages. + - `/send` or `/queue send` flushes queued messages for the current chat. + - `/steer ` sends an intervention to the active run. - **Group support** - Three modes: `mention` (respond only when @mentioned or replied to), `all` (respond to every message), `off` (private only). - Optional welcome message for new members. @@ -27,7 +33,9 @@ This plugin connects one or more Telegram bots to Agent Zero. Each bot runs inde - Extracts text, captions, locations, contacts, stickers, and attachment indicators. - Downloads photos, documents, audio, voice, and video into `usr/uploads/` with configurable auto-cleanup. - **Reply delivery** - - `tool_execute_after` intercepts the `response` tool — sends inline progress for `break_loop=false`. + - Streams tool progress and response text through real Telegram messages updated with `editMessageText`. + - Tool progress and the AI response are separate messages; only the AI response replies to the user message. 
+ - `tool_execute_after` intercepts the `response` tool — sends `break_loop=false` updates as separate intermediate Telegram messages. - `process_chain_end` auto-sends the final response, with retry logic on failure. - **Formatting** - Converts Markdown output to Telegram-compatible HTML (bold, italic, strikethrough, code, links, blockquotes, lists). @@ -36,8 +44,11 @@ This plugin connects one or more Telegram bots to Agent Zero. Each bot runs inde - Agent can attach a `keyboard` array to the `response` tool; button presses feed back as user messages. - **Typing indicator** - Persistent "typing…" action while the agent is processing, cancelled on reply. +- **Busy-run queue** + - Normal user messages received while a run is active are queued automatically. + - `/steer ` is still delivered immediately as an intervention. - **Notifications** - - Optional WebUI notifications for incoming Telegram messages and `/clear` events. + - Optional WebUI notifications for incoming Telegram messages. - **Access control** - Per-bot allow-list by Telegram user ID or @username. Empty list = open to all. - **Project binding** @@ -50,8 +61,13 @@ This plugin connects one or more Telegram bots to Agent Zero. Each bot runs inde - `helpers/handler.py` — Central message routing, context lifecycle, user auth, attachment download, reply sending, typing indicator. - `helpers/bot_manager.py` — Bot creation, polling/webhook lifecycle, bot registry. - `helpers/telegram_client.py` — Low-level Telegram API wrapper: send text/file/photo, Markdown→HTML converter, keyboard builder, message splitting. + - `helpers/command_ui.py` — Telegram inline keyboard command pickers and callback handling. + - `helpers/draft_stream.py` — Editable Telegram message streaming for tool progress and response previews. - **Extensions** - `extensions/python/job_loop/_10_telegram_bot.py` — Bot lifecycle manager, starts/stops bots on each tick. 
+ - `extensions/python/message_loop_start/_45_telegram_draft_start.py` — Initializes Telegram response streaming. + - `extensions/python/tool_execute_before/_45_telegram_draft_tool.py` — Streams tool-start progress. + - `extensions/python/response_stream/_45_telegram_draft_response.py` — Streams final response previews. - `extensions/python/system_prompt/_20_telegram_context.py` — Injects Telegram-specific system prompt. - `extensions/python/tool_execute_after/_50_telegram_response.py` — Intercepts `response` tool for inline delivery. - `extensions/python/process_chain_end/_55_telegram_reply.py` — Auto-sends final reply with retry. diff --git a/plugins/_telegram_integration/extensions/python/_functions/agent/Agent/handle_exception/end/_85_telegram_error.py b/plugins/_telegram_integration/extensions/python/_functions/agent/Agent/handle_exception/end/_85_telegram_error.py new file mode 100644 index 0000000000..b711ba3503 --- /dev/null +++ b/plugins/_telegram_integration/extensions/python/_functions/agent/Agent/handle_exception/end/_85_telegram_error.py @@ -0,0 +1,43 @@ +from helpers.errors import HandledException +from helpers.extension import Extension +from helpers.print_style import PrintStyle +from plugins._telegram_integration.helpers.constants import ( + CTX_TG_BOT, + CTX_TG_ERROR_SENT, + CTX_TG_REPLY_TO, + CTX_TG_TYPING_STOP, +) + + +class TelegramFriendlyError(Extension): + async def execute(self, data: dict = {}, **kwargs): + if not self.agent or self.agent.number != 0: + return + + context = self.agent.context + if not context.data.get(CTX_TG_BOT): + return + + exception = data.get("exception") + if not exception or isinstance(exception, HandledException): + return + + if context.data.get(CTX_TG_ERROR_SENT): + return + + context.data[CTX_TG_ERROR_SENT] = True + + from plugins._telegram_integration.helpers import draft_stream, error_ui, heartbeat + from plugins._telegram_integration.helpers.handler import send_telegram_reply + + text = 
error_ui.friendly_error_message(exception) + error = await send_telegram_reply(context, text) + if error: + PrintStyle.debug(f"Telegram error reply failed: {error}") + + typing_stop = context.data.pop(CTX_TG_TYPING_STOP, None) + if typing_stop: + typing_stop.set() + await heartbeat.stop(context) + draft_stream.clear(context) + context.data.pop(CTX_TG_REPLY_TO, None) diff --git a/plugins/_telegram_integration/extensions/python/job_loop/_10_telegram_bot.py b/plugins/_telegram_integration/extensions/python/job_loop/_10_telegram_bot.py index 6355a9a963..dff9222eff 100644 --- a/plugins/_telegram_integration/extensions/python/job_loop/_10_telegram_bot.py +++ b/plugins/_telegram_integration/extensions/python/job_loop/_10_telegram_bot.py @@ -31,13 +31,13 @@ async def execute(self, **kwargs: Any) -> None: get_all_bots, create_bot, cache_bot_info, + register_bot_commands, start_polling, setup_webhook, stop_bot, ) from plugins._telegram_integration.helpers.handler import ( handle_start, - handle_clear, handle_message, handle_callback_query, handle_new_members, @@ -73,7 +73,6 @@ async def execute(self, **kwargs: Any) -> None: try: # Create handler closures that capture bot_name and config _on_start = partial(_make_handler(handle_start), bot_name=name, bot_cfg=bot_cfg) - _on_clear = partial(_make_handler(handle_clear), bot_name=name, bot_cfg=bot_cfg) _on_message = partial(_make_handler(handle_message), bot_name=name, bot_cfg=bot_cfg) _on_callback = partial(_make_handler(handle_callback_query), bot_name=name, bot_cfg=bot_cfg) _on_new_members = partial(_make_handler(handle_new_members), bot_name=name, bot_cfg=bot_cfg) @@ -83,7 +82,6 @@ async def execute(self, **kwargs: Any) -> None: token=bot_cfg["token"], on_message=_on_message, on_command_start=_on_start, - on_command_clear=_on_clear, on_command_control=_on_message, on_callback_query=_on_callback, on_new_members=_on_new_members, @@ -91,6 +89,7 @@ async def execute(self, **kwargs: Any) -> None: ) await cache_bot_info(instance) + 
await register_bot_commands(instance) mode = bot_cfg.get("mode", "polling") if mode == "webhook": diff --git a/plugins/_telegram_integration/extensions/python/message_loop_start/_45_telegram_draft_start.py b/plugins/_telegram_integration/extensions/python/message_loop_start/_45_telegram_draft_start.py new file mode 100644 index 0000000000..6fc4fe89c9 --- /dev/null +++ b/plugins/_telegram_integration/extensions/python/message_loop_start/_45_telegram_draft_start.py @@ -0,0 +1,18 @@ +from agent import LoopData +from helpers.extension import Extension +from plugins._telegram_integration.helpers.constants import CTX_TG_BOT + + +class TelegramDraftStart(Extension): + + async def execute(self, loop_data: LoopData = LoopData(), **kwargs): + if not self.agent or self.agent.number != 0: + return + context = self.agent.context + if not context.data.get(CTX_TG_BOT): + return + + from plugins._telegram_integration.helpers import draft_stream, heartbeat + + await draft_stream.start(context) + await heartbeat.start(context) diff --git a/plugins/_telegram_integration/extensions/python/process_chain_end/_55_telegram_reply.py b/plugins/_telegram_integration/extensions/python/process_chain_end/_55_telegram_reply.py index d0730a6dc3..508c3acf4f 100644 --- a/plugins/_telegram_integration/extensions/python/process_chain_end/_55_telegram_reply.py +++ b/plugins/_telegram_integration/extensions/python/process_chain_end/_55_telegram_reply.py @@ -26,14 +26,13 @@ async def execute(self, loop_data: LoopData = LoopData(), **kwargs): return response_text = _extract_last_response(context) - if not response_text: - return attachments = context.data.pop(CTX_TG_ATTACHMENTS, []) keyboard = context.data.pop(CTX_TG_KEYBOARD, None) try: - await self._send_reply(context, response_text, attachments, keyboard) + if response_text: + await self._send_reply(context, response_text, attachments, keyboard) except Exception as e: PrintStyle.error(f"Telegram auto-reply error: {format_error(e)}") finally: @@ -41,6 
+40,10 @@ async def execute(self, loop_data: LoopData = LoopData(), **kwargs): typing_stop = context.data.pop(CTX_TG_TYPING_STOP, None) if typing_stop: typing_stop.set() + from plugins._telegram_integration.helpers import draft_stream, heartbeat + + await heartbeat.stop(context) + draft_stream.clear(context) context.data.pop(CTX_TG_REPLY_TO, None) async def _send_reply( diff --git a/plugins/_telegram_integration/extensions/python/response_stream/_45_telegram_draft_response.py b/plugins/_telegram_integration/extensions/python/response_stream/_45_telegram_draft_response.py new file mode 100644 index 0000000000..47e647d36a --- /dev/null +++ b/plugins/_telegram_integration/extensions/python/response_stream/_45_telegram_draft_response.py @@ -0,0 +1,30 @@ +from agent import LoopData +from helpers.extension import Extension +from plugins._telegram_integration.helpers.constants import CTX_TG_BOT + + +class TelegramDraftResponse(Extension): + + async def execute( + self, + loop_data: LoopData = LoopData(), + parsed: dict | None = None, + **kwargs, + ): + if not self.agent or self.agent.number != 0: + return + context = self.agent.context + if not context.data.get(CTX_TG_BOT): + return + + parsed = parsed or {} + if parsed.get("tool_name") != "response": + return + tool_args = parsed.get("tool_args") or {} + text = tool_args.get("text") or tool_args.get("message") or "" + if not text: + return + + from plugins._telegram_integration.helpers import draft_stream + + await draft_stream.update_response(context, str(text)) diff --git a/plugins/_telegram_integration/extensions/python/system_prompt/_20_telegram_context.py b/plugins/_telegram_integration/extensions/python/system_prompt/_20_telegram_context.py index bf580a43c1..3d10d594b3 100644 --- a/plugins/_telegram_integration/extensions/python/system_prompt/_20_telegram_context.py +++ b/plugins/_telegram_integration/extensions/python/system_prompt/_20_telegram_context.py @@ -1,5 +1,6 @@ from helpers.extension import Extension 
from agent import LoopData +from helpers import integration_commands from plugins._telegram_integration.helpers.constants import CTX_TG_BOT, CTX_TG_BOT_CFG @@ -18,6 +19,7 @@ async def execute( system_prompt.append( self.agent.read_prompt("fw.telegram.system_context_reply.md") ) + system_prompt.append(_telegram_commands_prompt()) # Inject per-bot agent instructions (once in system prompt) bot_cfg = self.agent.context.data.get(CTX_TG_BOT_CFG, {}) @@ -29,3 +31,18 @@ async def execute( instructions=instructions, ) ) + + +def _telegram_commands_prompt() -> str: + lines = [ + "Telegram slash commands are handled by the integration before you see the message.", + "Do not invent command help, unknown-command replies, or pretend to execute slash commands.", + "If the user asks what commands exist, refer them to /commands.", + "Current integration commands:", + ] + for name in integration_commands.command_names(include_aliases=False, integration="telegram"): + definition = integration_commands.resolve_command(name, integration="telegram") + if definition: + args = f" {definition.args_hint}" if definition.args_hint else "" + lines.append(f"- /{definition.name}{args}: {definition.description}") + return "\n".join(lines) diff --git a/plugins/_telegram_integration/extensions/python/tool_execute_after/_50_telegram_response.py b/plugins/_telegram_integration/extensions/python/tool_execute_after/_50_telegram_response.py index 14968c6232..c959eaf528 100644 --- a/plugins/_telegram_integration/extensions/python/tool_execute_after/_50_telegram_response.py +++ b/plugins/_telegram_integration/extensions/python/tool_execute_after/_50_telegram_response.py @@ -13,14 +13,18 @@ class TelegramResponseIntercept(Extension): async def execute( self, tool_name: str = "", response: Response | None = None, **kwargs, ): - if tool_name != "response": - return if not self.agent: return context = self.agent.context if not context.data.get(CTX_TG_BOT): return + from plugins._telegram_integration.helpers 
import draft_stream + + if tool_name != "response": + await draft_stream.add_tool_done(context, tool_name, ok=response is not None) + return + tool = self.agent.loop_data.current_tool if not tool: return @@ -43,6 +47,7 @@ async def execute( async def _send_inline(self, context, tool, response: Response): ensure_dependencies() from plugins._telegram_integration.helpers.handler import send_telegram_reply + from plugins._telegram_integration.helpers import draft_stream agent = self.agent assert agent is not None @@ -51,7 +56,12 @@ async def _send_inline(self, context, tool, response: Response): attachments = context.data.pop(CTX_TG_ATTACHMENTS, []) keyboard = context.data.pop(CTX_TG_KEYBOARD, None) - error = await send_telegram_reply(context, text, attachments or None, keyboard) + if attachments: + error = await send_telegram_reply(context, text, attachments or None, keyboard) + elif await draft_stream.send_intermediate_response(context, text, keyboard): + error = None + else: + error = "Telegram intermediate update was not sent" if error: result = agent.read_prompt("fw.telegram.update_error.md", error=error) diff --git a/plugins/_telegram_integration/extensions/python/tool_execute_before/_45_telegram_draft_tool.py b/plugins/_telegram_integration/extensions/python/tool_execute_before/_45_telegram_draft_tool.py new file mode 100644 index 0000000000..4496cd4223 --- /dev/null +++ b/plugins/_telegram_integration/extensions/python/tool_execute_before/_45_telegram_draft_tool.py @@ -0,0 +1,18 @@ +from helpers.extension import Extension +from plugins._telegram_integration.helpers.constants import CTX_TG_BOT + + +class TelegramDraftToolStart(Extension): + + async def execute(self, tool_name: str = "", tool_args: dict | None = None, **kwargs): + if not self.agent or self.agent.number != 0: + return + if (tool_name or "").strip().lower() == "response": + return + context = self.agent.context + if not context.data.get(CTX_TG_BOT): + return + + from 
plugins._telegram_integration.helpers import draft_stream + + await draft_stream.add_tool_start(context, tool_name, tool_args or {}) diff --git a/plugins/_telegram_integration/helpers/bot_manager.py b/plugins/_telegram_integration/helpers/bot_manager.py index b59c686646..1eec114407 100644 --- a/plugins/_telegram_integration/helpers/bot_manager.py +++ b/plugins/_telegram_integration/helpers/bot_manager.py @@ -6,7 +6,8 @@ from aiogram.client.default import DefaultBotProperties from aiogram.enums import ParseMode, ChatType, ContentType from aiogram.filters import Command, CommandStart -from aiogram.types import Message +from aiogram.types import BotCommand, Message +from helpers import integration_commands from helpers.errors import format_error from helpers.print_style import PrintStyle @@ -44,7 +45,6 @@ def create_bot( token: str, on_message: Callable[..., Awaitable], on_command_start: Callable[..., Awaitable], - on_command_clear: Callable[..., Awaitable], on_command_control: Callable[..., Awaitable] | None = None, on_callback_query: Callable[..., Awaitable] | None = None, on_new_members: Callable[..., Awaitable] | None = None, @@ -56,11 +56,10 @@ def create_bot( # Register command handlers router.message.register(on_command_start, CommandStart()) - router.message.register(on_command_clear, Command("clear")) if on_command_control: router.message.register( on_command_control, - Command(commands=["project", "config", "preset", "queue", "send"]), + Command(commands=integration_commands.command_names(integration="telegram")), ) if on_callback_query: @@ -93,6 +92,21 @@ def create_bot( return instance +async def register_bot_commands(instance: BotInstance) -> None: + """Register Telegram's native / command menu from the shared integration registry.""" + commands = [ + BotCommand(command=name, description=description) + for name, description in integration_commands.telegram_menu_commands() + ] + if not commands: + return + try: + await 
instance.bot.set_my_commands(commands) + PrintStyle.info(f"Telegram ({instance.name}): registered {len(commands)} bot commands") + except Exception as e: + PrintStyle.error(f"Telegram ({instance.name}): failed to register bot commands: {format_error(e)}") + + async def cache_bot_info(instance: BotInstance): """Fetch and cache bot info. Call after create_bot.""" if not instance.bot_info: diff --git a/plugins/_telegram_integration/helpers/command_ui.py b/plugins/_telegram_integration/helpers/command_ui.py new file mode 100644 index 0000000000..b31dc05010 --- /dev/null +++ b/plugins/_telegram_integration/helpers/command_ui.py @@ -0,0 +1,577 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass + +from agent import AgentContext +from helpers import files, projects, subagents +from helpers import integration_commands +from helpers.persist_chat import save_tmp_chat +from helpers.state_monitor_integration import mark_dirty_for_context +from plugins._model_config.helpers import model_config +from plugins._telegram_integration.helpers import telegram_client as tc +from plugins._telegram_integration.helpers.constants import ( + CTX_TG_STREAM_ENABLED, + CTX_TG_TOOLS_ENABLED, + CTX_TG_BOT, + CTX_TG_CHAT_ID, + CTX_TG_CHAT_TYPE, + CTX_TG_USER_ID, + CTX_TG_USERNAME, + STATE_FILE, +) + +PAGE_SIZE = 8 +SESSION_PAGE_SIZE = 4 +CALLBACK_PREFIX = "tg" + + +@dataclass(frozen=True) +class PickerItem: + key: str + label: str + + +@dataclass(frozen=True) +class SessionItem: + context: AgentContext + label: str + last_message: str + running: bool + + +async def handle_command( + context: AgentContext, + token: str, + chat_id: int, + reply_to_message_id: int | None, + text: str, +) -> bool: + parsed = integration_commands.parse_command(text or "", integration="telegram") + if not parsed: + return False + command, args = parsed + if command in {"/model", "/config", "/preset"} and not args: + await send_model_picker(context, token, chat_id, 
reply_to_message_id, 0) + return True + if command == "/project" and not args: + await send_project_picker(context, token, chat_id, reply_to_message_id, 0) + return True + if command in {"/agent", "/profile"} and not args: + await send_agent_picker(context, token, chat_id, reply_to_message_id, 0) + return True + if command in {"/sessions", "/session"} and not args: + await send_session_picker(context, token, chat_id, reply_to_message_id, 0) + return True + if command == "/stream": + await send_toggle_picker( + context, + token, + chat_id, + reply_to_message_id, + CTX_TG_STREAM_ENABLED, + "Response streaming", + args, + ) + return True + if command == "/tools": + await send_toggle_picker( + context, + token, + chat_id, + reply_to_message_id, + CTX_TG_TOOLS_ENABLED, + "Tool progress", + args, + ) + return True + return False + + +async def handle_callback( + context: AgentContext, + token: str, + chat_id: int, + message_id: int, + data: str, +) -> bool: + parts = (data or "").split(":") + if len(parts) < 3 or parts[0] != CALLBACK_PREFIX: + return False + kind, action = parts[1], parts[2] + value = parts[3] if len(parts) > 3 else "" + if action == "noop": + return True + if action == "page": + page = _safe_int(value) + if kind == "model": + await edit_model_picker(context, token, chat_id, message_id, page) + elif kind == "project": + await edit_project_picker(context, token, chat_id, message_id, page) + elif kind == "agent": + await edit_agent_picker(context, token, chat_id, message_id, page) + elif kind == "session": + await edit_session_picker(context, token, chat_id, message_id, page) + return True + if kind == "model" and action in {"set", "clear"}: + await _select_model(context, _safe_int(value), clear=(action == "clear")) + await edit_model_picker(context, token, chat_id, message_id, 0, selected=True) + return True + if kind == "project" and action in {"set", "clear"}: + await _select_project(context, _safe_int(value), clear=(action == "clear")) + await 
edit_project_picker(context, token, chat_id, message_id, 0, selected=True) + return True + if kind == "agent" and action == "set": + await _select_agent(context, _safe_int(value)) + await edit_agent_picker(context, token, chat_id, message_id, 0, selected=True) + return True + if kind == "session" and action == "set": + selected_context = await _select_session(context, _safe_int(value)) + await edit_session_picker(selected_context or context, token, chat_id, message_id, 0, selected=bool(selected_context)) + return True + if kind in {"stream", "tools"} and action in {"on", "off"}: + key = CTX_TG_STREAM_ENABLED if kind == "stream" else CTX_TG_TOOLS_ENABLED + label = "Response streaming" if kind == "stream" else "Tool progress" + context.set_data(key, action == "on") + save_tmp_chat(context) + mark_dirty_for_context(context.id, reason=f"telegram.{kind}_toggle") + await edit_toggle_picker(context, token, chat_id, message_id, key, label) + return True + return True + + +async def send_model_picker( + context: AgentContext, + token: str, + chat_id: int, + reply_to_message_id: int | None, + page: int, +) -> None: + text, markup = _model_view(context, page) + await tc.raw_send_text(token, chat_id, text, reply_to_message_id, "HTML", markup) + + +async def edit_model_picker( + context: AgentContext, + token: str, + chat_id: int, + message_id: int, + page: int, + *, + selected: bool = False, +) -> None: + text, markup = _model_view(context, page, selected=selected) + await tc.raw_edit_text(token, chat_id, message_id, text, "HTML", markup) + + +async def send_project_picker( + context: AgentContext, + token: str, + chat_id: int, + reply_to_message_id: int | None, + page: int, +) -> None: + text, markup = _project_view(context, page) + await tc.raw_send_text(token, chat_id, text, reply_to_message_id, "HTML", markup) + + +async def edit_project_picker( + context: AgentContext, + token: str, + chat_id: int, + message_id: int, + page: int, + *, + selected: bool = False, +) -> None: 
+ text, markup = _project_view(context, page, selected=selected) + await tc.raw_edit_text(token, chat_id, message_id, text, "HTML", markup) + + +async def send_agent_picker( + context: AgentContext, + token: str, + chat_id: int, + reply_to_message_id: int | None, + page: int, +) -> None: + text, markup = _agent_view(context, page) + await tc.raw_send_text(token, chat_id, text, reply_to_message_id, "HTML", markup) + + +async def edit_agent_picker( + context: AgentContext, + token: str, + chat_id: int, + message_id: int, + page: int, + *, + selected: bool = False, +) -> None: + text, markup = _agent_view(context, page, selected=selected) + await tc.raw_edit_text(token, chat_id, message_id, text, "HTML", markup) + + +async def send_toggle_picker( + context: AgentContext, + token: str, + chat_id: int, + reply_to_message_id: int | None, + key: str, + label: str, + args: str = "", +) -> None: + desired = _parse_toggle(args) + if desired is not None: + context.set_data(key, desired) + save_tmp_chat(context) + mark_dirty_for_context(context.id, reason=f"telegram.{key}") + text, markup = _toggle_view(context, key, label) + await tc.raw_send_text(token, chat_id, text, reply_to_message_id, "HTML", markup) + + +async def edit_toggle_picker( + context: AgentContext, + token: str, + chat_id: int, + message_id: int, + key: str, + label: str, +) -> None: + text, markup = _toggle_view(context, key, label) + await tc.raw_edit_text(token, chat_id, message_id, text, "HTML", markup) + + +async def send_session_picker( + context: AgentContext, + token: str, + chat_id: int, + reply_to_message_id: int | None, + page: int, +) -> None: + text, markup = _session_view(context, page) + await tc.raw_send_text(token, chat_id, text, reply_to_message_id, "HTML", markup) + + +async def edit_session_picker( + context: AgentContext, + token: str, + chat_id: int, + message_id: int, + page: int, + *, + selected: bool = False, +) -> None: + text, markup = _session_view(context, page, selected=selected) 
+ await tc.raw_edit_text(token, chat_id, message_id, text, "HTML", markup) + + +def _model_view( + context: AgentContext, + page: int, + *, + selected: bool = False, +) -> tuple[str, dict | None]: + presets = [ + PickerItem(str(preset.get("name", "")), str(preset.get("name", ""))) + for preset in model_config.get_presets() + if isinstance(preset, dict) and preset.get("name") + ] + current = context.get_data("chat_model_override") + current_name = current.get("preset_name") if isinstance(current, dict) else "" + status = f"Current model: {_html(current_name or 'Default')}" + if not model_config.is_chat_override_allowed(context.agent0): + return status + "\nPer-chat model switching is disabled.", None + if selected: + status = "Model updated.\n" + status + rows = _paged_buttons("model", presets, page, current_name) + rows.append([{"text": "Default", "callback_data": "tg:model:clear"}]) + return status, {"inline_keyboard": rows} + + +def _project_view( + context: AgentContext, + page: int, + *, + selected: bool = False, +) -> tuple[str, dict | None]: + items = [ + PickerItem(str(item.get("name", "")), str(item.get("title") or item.get("name") or "")) + for item in projects.get_active_projects_list() or [] + if item.get("name") + ] + current = context.get_data("project") or "" + status = f"Current project: {_html(_label_for(items, current) or 'none')}" + if selected: + status = "Project updated.\n" + status + rows = _paged_buttons("project", items, page, current) + rows.append([{"text": "No project", "callback_data": "tg:project:clear"}]) + return status, {"inline_keyboard": rows} + + +def _agent_view( + context: AgentContext, + page: int, + *, + selected: bool = False, +) -> tuple[str, dict | None]: + items = [ + PickerItem(str(item.get("key", "")), str(item.get("label") or item.get("key") or "")) + for item in subagents.get_all_agents_list() + if item.get("key") + ] + current = getattr(context.agent0.config, "profile", "") or "agent0" + status = f"Current agent: 
{_html(_label_for(items, current) or current)}" + if context.is_running(): + status += "\nAgent profile can be changed after the current run finishes." + elif selected: + status = "Agent updated.\n" + status + rows = _paged_buttons("agent", items, page, current, disabled=context.is_running()) + if not rows: + return status + "\nNo agent profiles were found.", None + return status, {"inline_keyboard": rows} + + +def _toggle_view(context: AgentContext, key: str, label: str) -> tuple[str, dict]: + enabled = _toggle_enabled(context, key) + kind = "stream" if key == CTX_TG_STREAM_ENABLED else "tools" + state = "enabled" if enabled else "disabled" + text = f"{_html(label)}: {state}" + rows = [[ + {"text": ("On" if enabled else "Turn on"), "callback_data": f"tg:{kind}:on"}, + {"text": ("Off" if not enabled else "Turn off"), "callback_data": f"tg:{kind}:off"}, + ]] + return text, {"inline_keyboard": rows} + + +def _session_view( + context: AgentContext, + page: int, + *, + selected: bool = False, +) -> tuple[str, dict | None]: + items = _session_items() + current_label = _session_label(context) + status = f"Current session: {_html(current_label)}" + if context.is_running(): + status += "\nSession switching is available after the current run finishes." 
+ elif selected: + status = "Session switched.\n" + status + if not items: + return status + "\nNo sessions were found.", None + + total = len(items) + page = _clamp_page(page, total, SESSION_PAGE_SIZE) + start = page * SESSION_PAGE_SIZE + end = min(start + SESSION_PAGE_SIZE, total) + rows: list[list[dict[str, str]]] = [] + for index, item in enumerate(items[start:end], start=start): + marker = "• " if item.context.id == context.id else "" + suffix = " (running)" if item.running else "" + action = "noop" if context.is_running() or item.running else "set" + rows.append([{ + "text": f"{marker}{item.label}{suffix}"[:64], + "callback_data": f"tg:session:{action}:{index}", + }]) + + nav: list[dict[str, str]] = [] + if page > 0: + nav.append({"text": "Prev", "callback_data": f"tg:session:page:{page - 1}"}) + if end < total: + nav.append({"text": "Next", "callback_data": f"tg:session:page:{page + 1}"}) + if nav: + rows.append(nav) + + range_text = f"\nShowing {start + 1}-{end} of {total}." + return status + range_text, {"inline_keyboard": rows} + + +async def _select_model(context: AgentContext, index: int, *, clear: bool = False) -> None: + if not model_config.is_chat_override_allowed(context.agent0): + return + if clear: + context.set_data("chat_model_override", None) + else: + presets = [preset for preset in model_config.get_presets() if preset.get("name")] + if index < 0 or index >= len(presets): + return + context.set_data("chat_model_override", {"preset_name": presets[index]["name"]}) + save_tmp_chat(context) + mark_dirty_for_context(context.id, reason="telegram.model_select") + + +async def _select_project(context: AgentContext, index: int, *, clear: bool = False) -> None: + if clear: + projects.deactivate_project(context.id) + return + items = [item for item in projects.get_active_projects_list() or [] if item.get("name")] + if index < 0 or index >= len(items): + return + projects.activate_project(context.id, str(items[index]["name"])) + + +async def 
_select_agent(context: AgentContext, index: int) -> None: + if context.is_running(): + return + from agent import Agent + from initialize import initialize_agent + + items = [item for item in subagents.get_all_agents_list() if item.get("key")] + if index < 0 or index >= len(items): + return + profile = str(items[index]["key"]) + config = initialize_agent(override_settings={"agent_profile": profile}) + context.config = config + agent = context.agent0 + while agent: + agent.config = config + agent = agent.get_data(Agent.DATA_NAME_SUBORDINATE) + save_tmp_chat(context) + mark_dirty_for_context(context.id, reason="telegram.agent_select") + + +async def _select_session(context: AgentContext, index: int) -> AgentContext | None: + if context.is_running(): + return None + items = _session_items() + if index < 0 or index >= len(items): + return None + target = items[index].context + if target.is_running(): + return None + + _copy_telegram_binding(context, target) + _set_session_mapping(target) + save_tmp_chat(target) + mark_dirty_for_context(context.id, reason="telegram.session_unselect") + mark_dirty_for_context(target.id, reason="telegram.session_select") + return target + + +def _session_items() -> list[SessionItem]: + contexts = sorted( + AgentContext.all(), + key=lambda item: str(item.output().get("last_message") or ""), + reverse=True, + ) + return [ + SessionItem( + context=item, + label=_session_label(item), + last_message=str(item.output().get("last_message") or ""), + running=item.is_running(), + ) + for item in contexts + ] + + +def _session_label(context: AgentContext) -> str: + return str(context.name or context.id or "Session") + + +def _copy_telegram_binding(source: AgentContext, target: AgentContext) -> None: + for key in ( + CTX_TG_BOT, + CTX_TG_CHAT_ID, + CTX_TG_CHAT_TYPE, + CTX_TG_USER_ID, + CTX_TG_USERNAME, + ): + if key in source.data: + target.data[key] = source.data[key] + + +def _set_session_mapping(context: AgentContext) -> None: + bot_name = 
str(context.data.get(CTX_TG_BOT) or "") + user_id = context.data.get(CTX_TG_USER_ID) + chat_id = context.data.get(CTX_TG_CHAT_ID) + if not bot_name or user_id is None or chat_id is None: + return + key = f"{bot_name}:{int(user_id)}:{int(chat_id)}" + state = _load_telegram_state() + chats = state.setdefault("chats", {}) + chats[key] = context.id + _save_telegram_state(state) + + +def _load_telegram_state() -> dict: + path = files.get_abs_path(STATE_FILE) + if not files.exists(path): + return {} + try: + return json.loads(files.read_file(path)) + except Exception: + return {} + + +def _save_telegram_state(state: dict) -> None: + path = files.get_abs_path(STATE_FILE) + files.make_dirs(path) + files.write_file(path, json.dumps(state)) + + +def _paged_buttons( + kind: str, + items: list[PickerItem], + page: int, + current: str, + *, + disabled: bool = False, +) -> list[list[dict[str, str]]]: + page = max(0, page) + total_pages = max(1, (len(items) + PAGE_SIZE - 1) // PAGE_SIZE) + page = min(page, total_pages - 1) + start = page * PAGE_SIZE + rows: list[list[dict[str, str]]] = [] + for offset, item in enumerate(items[start:start + PAGE_SIZE], start=start): + marker = "• " if item.key == current else "" + action = "noop" if disabled else "set" + rows.append([{ + "text": f"{marker}{item.label}"[:64], + "callback_data": f"tg:{kind}:{action}:{offset}", + }]) + nav: list[dict[str, str]] = [] + if page > 0: + nav.append({"text": "Prev", "callback_data": f"tg:{kind}:page:{page - 1}"}) + if page < total_pages - 1: + nav.append({"text": "Next", "callback_data": f"tg:{kind}:page:{page + 1}"}) + if nav: + rows.append(nav) + return rows + + +def _toggle_enabled(context: AgentContext, key: str) -> bool: + value = context.get_data(key) + return True if value is None else bool(value) + + +def _parse_toggle(args: str) -> bool | None: + value = (args or "").strip().lower() + if value in {"on", "enable", "enabled", "yes", "true", "1"}: + return True + if value in {"off", "disable", 
"disabled", "no", "false", "0"}: + return False + return None + + +def _safe_int(value: str) -> int: + try: + return int(value) + except (TypeError, ValueError): + return 0 + + +def _clamp_page(page: int, total_items: int, page_size: int) -> int: + total_pages = max(1, (total_items + page_size - 1) // page_size) + return min(max(0, page), total_pages - 1) + + +def _label_for(items: list[PickerItem], key: str) -> str: + for item in items: + if item.key == key: + return item.label + return key + + +def _html(value: str) -> str: + return str(value).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;") diff --git a/plugins/_telegram_integration/helpers/constants.py b/plugins/_telegram_integration/helpers/constants.py index 830e415472..190ee7fa46 100644 --- a/plugins/_telegram_integration/helpers/constants.py +++ b/plugins/_telegram_integration/helpers/constants.py @@ -6,11 +6,22 @@ CTX_TG_BOT = "telegram_bot" CTX_TG_BOT_CFG = "telegram_bot_cfg" CTX_TG_CHAT_ID = "telegram_chat_id" +CTX_TG_CHAT_TYPE = "telegram_chat_type" CTX_TG_USER_ID = "telegram_user_id" CTX_TG_USERNAME = "telegram_username" CTX_TG_TYPING_STOP = "_telegram_typing_stop" CTX_TG_REPLY_TO = "_telegram_reply_to_message_id" +CTX_TG_STREAM_ENABLED = "telegram_stream_enabled" +CTX_TG_TOOLS_ENABLED = "telegram_tools_enabled" # Transient CTX_TG_ATTACHMENTS = "_telegram_response_attachments" CTX_TG_KEYBOARD = "_telegram_response_keyboard" +CTX_TG_PROGRESS_MESSAGE_ID = "_telegram_progress_message_id" +CTX_TG_PROGRESS_LINES = "_telegram_progress_lines" +CTX_TG_RESPONSE_MESSAGE_ID = "_telegram_response_message_id" +CTX_TG_RESPONSE_TEXT = "_telegram_response_text" +CTX_TG_RESPONSE_LAST_UPDATE = "_telegram_response_last_update" +CTX_TG_ERROR_SENT = "_telegram_error_sent" +CTX_TG_HEARTBEAT_TASK = "_telegram_heartbeat_task" +CTX_TG_HEARTBEAT_STOP = "_telegram_heartbeat_stop" diff --git a/plugins/_telegram_integration/helpers/draft_stream.py b/plugins/_telegram_integration/helpers/draft_stream.py new file mode 100644
index 0000000000..22390fdfff --- /dev/null +++ b/plugins/_telegram_integration/helpers/draft_stream.py @@ -0,0 +1,406 @@ +from __future__ import annotations + +import re +import time + +from agent import AgentContext +from helpers.print_style import PrintStyle +from plugins._telegram_integration.helpers import telegram_client as tc +from plugins._telegram_integration.helpers.bot_manager import get_bot +from plugins._telegram_integration.helpers.constants import ( + CTX_TG_BOT, + CTX_TG_CHAT_ID, + CTX_TG_PROGRESS_LINES, + CTX_TG_PROGRESS_MESSAGE_ID, + CTX_TG_REPLY_TO, + CTX_TG_RESPONSE_LAST_UPDATE, + CTX_TG_RESPONSE_MESSAGE_ID, + CTX_TG_RESPONSE_TEXT, + CTX_TG_STREAM_ENABLED, + CTX_TG_TOOLS_ENABLED, + CTX_TG_ERROR_SENT, +) + +MAX_STREAM_CHARS: int = 3900 +MIN_RESPONSE_UPDATE_SECONDS: float = 1.0 +MAX_PROGRESS_LINES: int = 12 + +TOOL_EMOJIS: dict[str, str] = { + "browser": "🌐", + "code": "⌨️", + "code_execution_tool": "⌨️", + "duckduckgo_search": "🔎", + "read_file": "📖", + "file": "📄", + "knowledge_tool": "📚", + "memory": "🧠", + "search": "🔎", + "search_engine": "🔎", + "search_files": "🔎", + "skill": "📚", + "skill_view": "📚", +} + + +async def start(context: AgentContext) -> None: + context.data.pop(CTX_TG_ERROR_SENT, None) + # Do not pre-create a placeholder Telegram message. Wait until we have + # real assistant text so the stream starts with meaningful content. 
+ if not _stream_enabled(context): + return + + +async def add_tool_start( + context: AgentContext, + tool_name: str, + args: dict | None = None, +) -> None: + if not _tools_enabled(context): + return + label = _tool_progress_label(tool_name, args or {}) + _append_progress_line(context, f"{_tool_emoji(tool_name)} {label}") + await _send_progress(context) + + +async def add_tool_done(context: AgentContext, tool_name: str, ok: bool = True) -> None: + if not _tools_enabled(context): + return + if ok: + return + _mark_tool_failed(context, tool_name) + await _send_progress(context) + + +async def update_response(context: AgentContext, response_text: str) -> None: + if not _stream_enabled(context): + return + cleaned = _visible_response_text(response_text) + if not cleaned: + return + context.data[CTX_TG_RESPONSE_TEXT] = response_text or "" + now = time.time() + last = float(context.data.get(CTX_TG_RESPONSE_LAST_UPDATE) or 0.0) + if now - last < MIN_RESPONSE_UPDATE_SECONDS: + return + await _update_response_message(context, response_text or "") + context.data[CTX_TG_RESPONSE_LAST_UPDATE] = now + + +async def send_intermediate_response( + context: AgentContext, + response_text: str, + keyboard: list[list[dict]] | None = None, +) -> bool: + if context.data.get(CTX_TG_RESPONSE_MESSAGE_ID): + sent = await finalize_response(context, response_text, keyboard) + if sent: + _reset_progress_group(context) + return sent + + html = _format_response(response_text) + if not html: + return False + bot = _bot_instance(context) + chat_id = context.data.get(CTX_TG_CHAT_ID) + if not bot or not chat_id: + return False + try: + sent_id = await tc.raw_send_text( + bot.bot.token, + int(chat_id), + html, + parse_mode="HTML", + reply_markup=_keyboard_markup(keyboard), + ) + sent = bool(sent_id) + if sent: + _reset_progress_group(context) + return sent + except Exception as e: + PrintStyle.debug(f"Telegram intermediate response failed: {e}") + return False + + +async def finalize_response( + 
context: AgentContext, + response_text: str, + keyboard: list[list[dict]] | None = None, +) -> bool: + message_id = context.data.get(CTX_TG_RESPONSE_MESSAGE_ID) + if not message_id: + return False + ok = await _update_response_message( + context, + response_text or context.data.get(CTX_TG_RESPONSE_TEXT) or "", + keyboard=keyboard, + force=True, + ) + if ok: + context.data.pop(CTX_TG_RESPONSE_MESSAGE_ID, None) + context.data.pop(CTX_TG_RESPONSE_TEXT, None) + context.data.pop(CTX_TG_RESPONSE_LAST_UPDATE, None) + return ok + + +def clear(context: AgentContext) -> None: + for key in ( + CTX_TG_PROGRESS_LINES, + CTX_TG_PROGRESS_MESSAGE_ID, + CTX_TG_RESPONSE_MESSAGE_ID, + CTX_TG_RESPONSE_TEXT, + CTX_TG_RESPONSE_LAST_UPDATE, + ): + context.data.pop(key, None) + + +def _reset_progress_group(context: AgentContext) -> None: + context.data.pop(CTX_TG_PROGRESS_LINES, None) + context.data.pop(CTX_TG_PROGRESS_MESSAGE_ID, None) + + +def _stream_enabled(context: AgentContext) -> bool: + value = context.get_data(CTX_TG_STREAM_ENABLED) + return True if value is None else bool(value) + + +def _tools_enabled(context: AgentContext) -> bool: + value = context.get_data(CTX_TG_TOOLS_ENABLED) + return True if value is None else bool(value) + + +async def _send_progress(context: AgentContext) -> None: + bot = _bot_instance(context) + chat_id = context.data.get(CTX_TG_CHAT_ID) + if not bot or not chat_id: + return + text = "\n".join(context.data.get(CTX_TG_PROGRESS_LINES) or []) + if not text: + return + message_id = context.data.get(CTX_TG_PROGRESS_MESSAGE_ID) + try: + if message_id: + await tc.raw_edit_text(bot.bot.token, int(chat_id), int(message_id), text, parse_mode=None) + return + sent_id = await tc.raw_send_text( + bot.bot.token, + int(chat_id), + text, + parse_mode=None, + ) + if sent_id: + context.data[CTX_TG_PROGRESS_MESSAGE_ID] = sent_id + except Exception as e: + PrintStyle.debug(f"Telegram progress update failed: {e}") + + +async def _ensure_response_message(context: 
AgentContext, text: str) -> int | None: + message_id = context.data.get(CTX_TG_RESPONSE_MESSAGE_ID) + if message_id: + return int(message_id) + html = _format_response(text) + if not html: + return None + bot = _bot_instance(context) + chat_id = context.data.get(CTX_TG_CHAT_ID) + if not bot or not chat_id: + return None + sent_id = await tc.raw_send_text( + bot.bot.token, + int(chat_id), + html, + reply_to_message_id=_reply_to(context), + parse_mode="HTML", + ) + if sent_id: + context.data[CTX_TG_RESPONSE_MESSAGE_ID] = sent_id + context.data[CTX_TG_RESPONSE_LAST_UPDATE] = time.time() + return sent_id + + +async def _update_response_message( + context: AgentContext, + text: str, + *, + keyboard: list[list[dict]] | None = None, + force: bool = False, +) -> bool: + had_message = bool(context.data.get(CTX_TG_RESPONSE_MESSAGE_ID)) + message_id = await _ensure_response_message(context, text) + bot = _bot_instance(context) + chat_id = context.data.get(CTX_TG_CHAT_ID) + if not message_id or not bot or not chat_id: + return False + markup = _keyboard_markup(keyboard) + html = _format_response(text) + if not had_message and not markup and not force: + return True + try: + ok = await tc.raw_edit_text( + bot.bot.token, + int(chat_id), + int(message_id), + html, + parse_mode="HTML", + reply_markup=markup, + ) + if ok or force: + return ok + return False + except Exception as e: + PrintStyle.debug(f"Telegram response update failed: {e}") + return False + + +def _append_progress_line(context: AgentContext, line: str) -> None: + lines = list(context.data.get(CTX_TG_PROGRESS_LINES) or []) + if lines and lines[-1] == line: + return + lines.append(line) + context.data[CTX_TG_PROGRESS_LINES] = lines[-MAX_PROGRESS_LINES:] + + +def _mark_tool_failed(context: AgentContext, tool_name: str) -> None: + lines = list(context.data.get(CTX_TG_PROGRESS_LINES) or []) + if not lines: + return + + label = _tool_label(tool_name) + for index in range(len(lines) - 1, -1, -1): + line = lines[index] + 
if _line_matches_tool(line, label): + _, _, suffix = line.partition(" ") + lines[index] = f"❌ {suffix or label}" + context.data[CTX_TG_PROGRESS_LINES] = lines[-MAX_PROGRESS_LINES:] + return + + lines.append(f"❌ {label}") + context.data[CTX_TG_PROGRESS_LINES] = lines[-MAX_PROGRESS_LINES:] + + +def _bot_instance(context: AgentContext): + bot_name = context.data.get(CTX_TG_BOT) + return get_bot(bot_name) if bot_name else None + + +def _reply_to(context: AgentContext) -> int | None: + value = context.data.get(CTX_TG_REPLY_TO) + try: + return int(value) if value else None + except (TypeError, ValueError): + return None + + +def _tool_label(tool_name: str) -> str: + value = (tool_name or "tool").replace("_", " ").replace("-", " ").strip() + return value or "tool" + + +def _line_matches_tool(line: str, label: str) -> bool: + _, _, suffix = line.partition(" ") + normalized_suffix = suffix.strip().lower() + normalized_label = label.strip().lower() + return normalized_suffix == normalized_label or normalized_suffix.startswith(f"{normalized_label}:") + + +def _tool_progress_label(tool_name: str, args: dict) -> str: + label = _tool_label(tool_name) + detail = _tool_detail(tool_name, args) + return f"{label}: {detail}" if detail else label + + +def _tool_detail(tool_name: str, args: dict) -> str: + if not isinstance(args, dict) or not args: + return "" + + normalized = (tool_name or "").strip().lower() + if "search" in normalized: + return _first_arg(args, ("query", "q", "search", "term", "keywords", "pattern")) + if "browser" in normalized: + return _first_arg(args, ("url", "link", "query", "action")) + if "file" in normalized: + return _first_arg(args, ("path", "file", "filename", "query", "pattern")) + if "skill" in normalized: + return _first_arg(args, ("skill", "name", "query", "path")) + + return _first_arg(args, ("action", "method", "operation")) + + +def _first_arg(args: dict, keys: tuple[str, ...]) -> str: + for key in keys: + value = args.get(key) + if value is None: 
+ continue + text = _compact_detail(value) + if text: + return text + return "" + + +def _compact_detail(value: object) -> str: + if isinstance(value, (list, tuple)): + parts = [_compact_detail(item) for item in value[:2]] + text = ", ".join(part for part in parts if part) + if len(value) > 2: + text = f"{text}, ..." + elif isinstance(value, dict): + return "" + else: + text = str(value).strip() + + text = re.sub(r"\s+", " ", text).strip().strip("\"'") + if len(text) > 80: + text = f"{text[:77].rstrip()}..." + return text + + +def _tool_emoji(tool_name: str) -> str: + normalized = (tool_name or "").strip().lower() + for key, emoji in TOOL_EMOJIS.items(): + if key in normalized: + return emoji + return "🛠️" + + +def _format_response(text: str) -> str: + value = _visible_response_text(text) + if not value: + return "" + return tc.md_to_telegram_html(value) + + +def _strip_incomplete_tool_markup(text: str) -> str: + value = text.lstrip() + value = re.sub(r"^<[^>\n]{0,80}$", "", value) + value = re.sub(r"^```(?:json|xml)?\s*$", "", value, flags=re.IGNORECASE) + return value + + +def _visible_response_text(text: str) -> str: + value = _trim(text or "") + return _strip_incomplete_tool_markup(value) + + +def _trim(text: str) -> str: + if len(text) <= MAX_STREAM_CHARS: + return text + return text[-MAX_STREAM_CHARS:] + + +def _keyboard_markup(keyboard: list[list[dict]] | None) -> dict | None: + if not keyboard: + return None + rows: list[list[dict[str, str]]] = [] + for row in keyboard: + out_row: list[dict[str, str]] = [] + for button in row: + text = str(button.get("text") or "")[:64] + if not text: + continue + if button.get("url"): + out_row.append({"text": text, "url": str(button["url"])}) + else: + data = str(button.get("callback_data", text)) + out_row.append({"text": text, "callback_data": data[:64]}) + if out_row: + rows.append(out_row) + return {"inline_keyboard": rows} if rows else None diff --git a/plugins/_telegram_integration/helpers/error_ui.py 
b/plugins/_telegram_integration/helpers/error_ui.py new file mode 100644 index 0000000000..b9db82ebbc --- /dev/null +++ b/plugins/_telegram_integration/helpers/error_ui.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +import re + + +MAX_DETAIL_CHARS = 500 + + +def friendly_error_message(exception: BaseException) -> str: + summary = _error_summary(exception) + category = _classify_error(exception, summary) + + if category == "auth": + return ( + "**Agent Zero hit a provider setup issue.**\n\n" + "The model provider rejected the request because an API key or credential is missing, invalid, or unauthorized.\n\n" + f"Details: `{summary}`\n\n" + "Please check the model/API key settings, then send the message again." + ) + + if category == "rate_limit": + return ( + "**Agent Zero was rate limited by the model provider.**\n\n" + "The provider is asking us to slow down before trying again.\n\n" + f"Details: `{summary}`" + ) + + if category == "timeout": + return ( + "**Agent Zero did not get a response in time.**\n\n" + "The provider or tool call timed out before the agent could finish this request.\n\n" + f"Details: `{summary}`" + ) + + if category == "provider": + return ( + "**Agent Zero could not complete the model request.**\n\n" + "The model provider returned an error before the agent could finish.\n\n" + f"Details: `{summary}`" + ) + + return ( + "**Agent Zero ran into an error while working on this.**\n\n" + f"Details: `{summary}`" + ) + + +def _classify_error(exception: BaseException, summary: str) -> str: + text = f"{type(exception).__name__} {summary}".lower() + + if any( + marker in text + for marker in ( + "api key", + "api_key", + "apikey", + "auth", + "credential", + "unauthorized", + "forbidden", + "invalid key", + "missing key", + "permission denied", + "401", + "403", + ) + ): + return "auth" + + if any(marker in text for marker in ("rate limit", "ratelimit", "too many requests", "429")): + return "rate_limit" + + if any(marker in text for 
marker in ("timeout", "timed out", "deadline", "read timed out")): + return "timeout" + + if any( + marker in text + for marker in ( + "litellm", + "openai", + "anthropic", + "model", + "provider", + "badrequest", + "service unavailable", + "overloaded", + "503", + ) + ): + return "provider" + + return "generic" + + +def _error_summary(exception: BaseException) -> str: + text = str(exception).strip() or type(exception).__name__ + text = re.sub(r"\s+", " ", text) + if len(text) > MAX_DETAIL_CHARS: + text = f"{text[: MAX_DETAIL_CHARS - 3].rstrip()}..." + return text diff --git a/plugins/_telegram_integration/helpers/handler.py b/plugins/_telegram_integration/helpers/handler.py index ea2dbd055d..5adad0e12f 100644 --- a/plugins/_telegram_integration/helpers/handler.py +++ b/plugins/_telegram_integration/helpers/handler.py @@ -21,6 +21,7 @@ from initialize import initialize_agent from plugins._telegram_integration.helpers import telegram_client as tc +from plugins._telegram_integration.helpers import command_ui from plugins._telegram_integration.helpers.bot_manager import get_bot from plugins._telegram_integration.helpers.constants import ( PLUGIN_NAME, @@ -29,6 +30,7 @@ CTX_TG_BOT, CTX_TG_BOT_CFG, CTX_TG_CHAT_ID, + CTX_TG_CHAT_TYPE, CTX_TG_USER_ID, CTX_TG_USERNAME, CTX_TG_TYPING_STOP, @@ -141,55 +143,15 @@ async def handle_start(message: TgMessage, bot_name: str, bot_cfg: dict): f"\U0001f44b Hello {user.first_name}! 
I'm connected to Agent Zero.\n\n" "Send me a message and I'll process it.\n" "Use /clear to reset the conversation.\n" - "Use /project, /config, or /send to control the current chat.", + "Use /project, /model, /agent, or /send to control the current chat.", parse_mode=None, + reply_to_message_id=message.message_id, ) # Ensure a chat context exists await _get_or_create_context(bot_name, bot_cfg, message) -async def handle_clear(message: TgMessage, bot_name: str, bot_cfg: dict): - """Handle /clear command — reset user's chat context.""" - user = message.from_user - if not user: - return - - if not _is_allowed(bot_cfg, user.id, user.username): - return - - key = _map_key(bot_name, user.id, message.chat.id) - - with _chat_map_lock: - state = _load_state() - ctx_id = state.get("chats", {}).get(key) - if ctx_id: - ctx = AgentContext.get(ctx_id) - if ctx: - ctx.reset() - PrintStyle.info(f"Telegram ({bot_name}): cleared chat for user {user.id}") - - instance = get_bot(bot_name) - if instance: - await _send_with_temp_bot( - instance.bot.token, message.chat.id, - "Chat cleared. 
Send a new message to start fresh.", - parse_mode=None, - ) - - # Send notification - if bot_cfg.get("notify_messages", False): - username_str = f"@{user.username}" if user.username else str(user.id) - NotificationManager.send_notification( - type=NotificationType.INFO, - priority=NotificationPriority.NORMAL, - title="Telegram: chat cleared", - message=f"{username_str} cleared their chat via /clear", - display_time=5, - group="telegram", - ) - - async def handle_message(message: TgMessage, bot_name: str, bot_cfg: dict): """Handle incoming user message.""" user = message.from_user @@ -212,26 +174,38 @@ async def handle_message(message: TgMessage, bot_name: str, bot_cfg: dict): parse_mode=None, ) return + context.data[CTX_TG_CHAT_TYPE] = str(message.chat.type or "") + context.data[CTX_TG_REPLY_TO] = message.message_id + + if await command_ui.handle_command( + context, + instance.bot.token, + message.chat.id, + message.message_id, + text, + ): + return - command_reply = integration_commands.try_handle_command(context, text) + command_reply = integration_commands.try_handle_command(context, text, integration="telegram") if command_reply is not None: - await _send_with_temp_bot(instance.bot.token, message.chat.id, command_reply, parse_mode=None) + await _send_with_temp_bot( + instance.bot.token, + message.chat.id, + command_reply, + parse_mode=None, + reply_to_message_id=message.message_id, + ) + return + if integration_commands.extract_command_line(text).startswith("/"): + command = integration_commands.extract_command_line(text).split(" ", 1)[0] + await _send_with_temp_bot( + instance.bot.token, + message.chat.id, + integration_commands.unknown_command_text(command, integration="telegram"), + parse_mode=None, + reply_to_message_id=message.message_id, + ) return - - # Start persistent typing indicator (thread-based, works across event loops) - typing_stop = _start_typing(instance.bot.token, message.chat.id) - - # Store stop event so send_telegram_reply can cancel 
typing - context.data[CTX_TG_TYPING_STOP] = typing_stop - - # In group chats, if user replied to the bot's message, reply to the user's message - reply_to_id = None - if message.chat.type != "private" and instance.bot_info: - if (message.reply_to_message - and message.reply_to_message.from_user - and message.reply_to_message.from_user.id == instance.bot_info.id): - reply_to_id = message.message_id - context.data[CTX_TG_REPLY_TO] = reply_to_id # Use temp bot for downloads (cross-event-loop safe) async with _temp_bot(instance.bot.token) as dl_bot: @@ -245,6 +219,24 @@ async def handle_message(message: TgMessage, bot_name: str, bot_cfg: dict): body=text, ) + if context.is_running(): + item = mq.add(context, user_msg, attachments) + save_tmp_chat(context) + await _send_with_temp_bot( + instance.bot.token, + message.chat.id, + f"Queued message #{item.get('seq', len(mq.get_queue(context)))}. Use /send to flush queued work, or /steer to interrupt the active run.", + parse_mode=None, + reply_to_message_id=message.message_id, + ) + return + + # Start persistent typing indicator (thread-based, works across event loops) + typing_stop = _start_typing(instance.bot.token, message.chat.id) + + # Store stop event so send_telegram_reply can cancel typing + context.data[CTX_TG_TYPING_STOP] = typing_stop + msg_id = str(uuid.uuid4()) mq.log_user_message(context, user_msg, attachments, message_id=msg_id, source=" (telegram)") context.communicate(UserMessage( @@ -287,16 +279,40 @@ async def handle_callback_query(query: CallbackQuery, bot_name: str, bot_cfg: di return context = await _get_or_create_context_from_user( - bot_name, bot_cfg, user.id, user.username, query.message.chat.id, + bot_name, bot_cfg, user.id, user.username, query.message.chat.id, str(query.message.chat.type or ""), ) if not context: return + context.data[CTX_TG_REPLY_TO] = query.message.message_id + + instance = get_bot(bot_name) + if instance: + try: + if await command_ui.handle_callback( + context, + 
instance.bot.token, + query.message.chat.id, + query.message.message_id, + text, + ): + return + except Exception as e: + PrintStyle.error(f"Telegram callback failed: {format_error(e)}") + if text.startswith("tg:"): + return + if text.startswith("tg:"): + return - command_reply = integration_commands.try_handle_command(context, text) + command_reply = integration_commands.try_handle_command(context, text, integration="telegram") if command_reply is not None: - instance = get_bot(bot_name) if instance: - await _send_with_temp_bot(instance.bot.token, query.message.chat.id, command_reply, parse_mode=None) + await _send_with_temp_bot( + instance.bot.token, + query.message.chat.id, + command_reply, + parse_mode=None, + reply_to_message_id=query.message.message_id, + ) return agent = context.agent0 @@ -347,7 +363,7 @@ async def _get_or_create_context( if not user: return None return await _get_or_create_context_from_user( - bot_name, bot_cfg, user.id, user.username, message.chat.id, + bot_name, bot_cfg, user.id, user.username, message.chat.id, str(message.chat.type or ""), ) @@ -357,6 +373,7 @@ async def _get_or_create_context_from_user( user_id: int, username: str | None, chat_id: int, + chat_type: str = "", ) -> AgentContext | None: key = _map_key(bot_name, user_id, chat_id) @@ -369,6 +386,7 @@ async def _get_or_create_context_from_user( if ctx_id: ctx = AgentContext.get(ctx_id) if ctx: + ctx.data[CTX_TG_CHAT_TYPE] = chat_type or ctx.data.get(CTX_TG_CHAT_TYPE, "") return ctx # Context was garbage collected, remove stale mapping chats.pop(key, None) @@ -382,6 +400,7 @@ async def _get_or_create_context_from_user( ctx.data[CTX_TG_BOT] = bot_name ctx.data[CTX_TG_BOT_CFG] = bot_cfg ctx.data[CTX_TG_CHAT_ID] = chat_id + ctx.data[CTX_TG_CHAT_TYPE] = chat_type ctx.data[CTX_TG_USER_ID] = user_id ctx.data[CTX_TG_USERNAME] = username or "" @@ -507,12 +526,22 @@ async def send_telegram_reply( local_path = files.fix_dev_path(path) if tc.is_image_file(local_path): await 
tc.send_photo(reply_bot, chat_id, local_path, reply_to_message_id=reply_to) + elif tc.is_voice_file(local_path): + await tc.send_voice(reply_bot, chat_id, local_path, reply_to_message_id=reply_to) + elif tc.is_audio_file(local_path): + await tc.send_audio(reply_bot, chat_id, local_path, reply_to_message_id=reply_to) + elif tc.is_video_file(local_path): + await tc.send_video(reply_bot, chat_id, local_path, reply_to_message_id=reply_to) else: await tc.send_file(reply_bot, chat_id, local_path, reply_to_message_id=reply_to) if response_text: html_text = tc.md_to_telegram_html(response_text) - if keyboard: + from plugins._telegram_integration.helpers import draft_stream + + if await draft_stream.finalize_response(context, response_text, keyboard): + pass + elif keyboard: await tc.send_text_with_keyboard(reply_bot, chat_id, html_text, keyboard, reply_to_message_id=reply_to) else: await tc.send_text(reply_bot, chat_id, html_text, reply_to_message_id=reply_to) @@ -537,10 +566,22 @@ async def _temp_bot(token: str, **kwargs): await bot.session.close() -async def _send_with_temp_bot(token: str, chat_id: int, text: str, parse_mode: str | None = None): +async def _send_with_temp_bot( + token: str, + chat_id: int, + text: str, + parse_mode: str | None = None, + reply_to_message_id: int | None = None, +): """Send text using a temporary Bot to avoid cross-event-loop session issues.""" async with _temp_bot(token) as bot: - await tc.send_text(bot, chat_id, text, parse_mode=parse_mode) + await tc.send_text( + bot, + chat_id, + text, + reply_to_message_id=reply_to_message_id, + parse_mode=parse_mode, + ) def _start_typing(token: str, chat_id: int) -> threading.Event: diff --git a/plugins/_telegram_integration/helpers/heartbeat.py b/plugins/_telegram_integration/helpers/heartbeat.py new file mode 100644 index 0000000000..188127e415 --- /dev/null +++ b/plugins/_telegram_integration/helpers/heartbeat.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import asyncio +import re 
+import time +from contextlib import suppress + +from agent import AgentContext +from helpers.print_style import PrintStyle +from plugins._telegram_integration.helpers import telegram_client as tc +from plugins._telegram_integration.helpers.bot_manager import get_bot +from plugins._telegram_integration.helpers.constants import ( + CTX_TG_BOT, + CTX_TG_CHAT_ID, + CTX_TG_HEARTBEAT_STOP, + CTX_TG_HEARTBEAT_TASK, +) + + +HEARTBEAT_INTERVAL_SECONDS = 180 + + +async def start(context: AgentContext) -> None: + task = context.data.get(CTX_TG_HEARTBEAT_TASK) + if task and not task.done(): + return + + stop_event = asyncio.Event() + context.data[CTX_TG_HEARTBEAT_STOP] = stop_event + context.data[CTX_TG_HEARTBEAT_TASK] = asyncio.create_task( + _heartbeat_loop(context, stop_event, time.monotonic()) + ) + + +async def stop(context: AgentContext) -> None: + stop_event = context.data.pop(CTX_TG_HEARTBEAT_STOP, None) + if stop_event: + stop_event.set() + + task = context.data.pop(CTX_TG_HEARTBEAT_TASK, None) + if task and not task.done(): + task.cancel() + with suppress(asyncio.CancelledError): + await task + + +async def _heartbeat_loop( + context: AgentContext, + stop_event: asyncio.Event, + started_at: float, +) -> None: + try: + while True: + try: + await asyncio.wait_for(stop_event.wait(), timeout=HEARTBEAT_INTERVAL_SECONDS) + return + except asyncio.TimeoutError: + await _send_heartbeat(context, time.monotonic() - started_at) + except asyncio.CancelledError: + raise + except Exception as exc: + PrintStyle.debug(f"Telegram heartbeat stopped: {exc}") + + +async def _send_heartbeat(context: AgentContext, elapsed_seconds: float) -> bool: + bot_name = context.data.get(CTX_TG_BOT) + chat_id = context.data.get(CTX_TG_CHAT_ID) + bot = get_bot(bot_name) if bot_name else None + if not bot or not chat_id: + return False + + text = heartbeat_text(elapsed_seconds, _current_reason(context)) + sent_id = await tc.raw_send_text( + bot.bot.token, + int(chat_id), + text, + parse_mode=None, + ) 
+ return bool(sent_id) + + +def heartbeat_text(elapsed_seconds: float, reason: str = "") -> str: + minutes = max(1, int(round(elapsed_seconds / 60))) + detail = reason or "working on your request" + return f"Still working... ({minutes} min elapsed - {detail})" + + +def _current_reason(context: AgentContext) -> str: + progress = str(getattr(getattr(context, "log", None), "progress", "") or "") + progress = _clean_progress(progress) + if not progress or progress.lower() == "waiting for input": + return "working on your request" + return progress + + +def _clean_progress(text: str) -> str: + value = re.sub(r"icon://[a-zA-Z0-9_]+(?:\[[^\]]*\])?\s*", "", text) + value = re.sub(r"\s+", " ", value).strip() + if len(value) > 80: + value = f"{value[:77].rstrip()}..." + return value diff --git a/plugins/_telegram_integration/helpers/telegram_client.py b/plugins/_telegram_integration/helpers/telegram_client.py index 4ea1041234..2fc9f69f10 100644 --- a/plugins/_telegram_integration/helpers/telegram_client.py +++ b/plugins/_telegram_integration/helpers/telegram_client.py @@ -1,6 +1,7 @@ import os import re +import aiohttp from aiogram import Bot from aiogram.exceptions import TelegramBadRequest from aiogram.types import ( @@ -17,6 +18,7 @@ # Text messages MAX_MESSAGE_LENGTH: int = 4096 # Telegram message length limit +TELEGRAM_API_BASE: str = "https://api.telegram.org" async def send_text( @@ -113,6 +115,81 @@ async def send_photo( return None +async def send_voice( + bot: Bot, + chat_id: int, + voice_path: str, + caption: str = "", + reply_to_message_id: int | None = None, +) -> int | None: + """Send a voice message from local path. 
Returns message_id or None on error.""" + try: + if not os.path.isfile(voice_path): + PrintStyle.error(f"Telegram: voice file not found: {voice_path}") + return None + input_file = FSInputFile(voice_path) + msg = await bot.send_voice( + chat_id=chat_id, + voice=input_file, + caption=caption[:1024] if caption else None, + reply_to_message_id=reply_to_message_id, + ) + return msg.message_id + except Exception as e: + PrintStyle.error(f"Telegram send_voice failed: {format_error(e)}") + return None + + +async def send_audio( + bot: Bot, + chat_id: int, + audio_path: str, + caption: str = "", + reply_to_message_id: int | None = None, +) -> int | None: + """Send an audio message from local path. Returns message_id or None on error.""" + try: + if not os.path.isfile(audio_path): + PrintStyle.error(f"Telegram: audio file not found: {audio_path}") + return None + input_file = FSInputFile(audio_path) + msg = await bot.send_audio( + chat_id=chat_id, + audio=input_file, + caption=caption[:1024] if caption else None, + reply_to_message_id=reply_to_message_id, + ) + return msg.message_id + except Exception as e: + PrintStyle.error(f"Telegram send_audio failed: {format_error(e)}") + return None + + +async def send_video( + bot: Bot, + chat_id: int, + video_path: str, + caption: str = "", + reply_to_message_id: int | None = None, +) -> int | None: + """Send a video message from local path. 
Returns message_id or None on error.""" + try: + if not os.path.isfile(video_path): + PrintStyle.error(f"Telegram: video file not found: {video_path}") + return None + input_file = FSInputFile(video_path) + msg = await bot.send_video( + chat_id=chat_id, + video=input_file, + caption=caption[:1024] if caption else None, + reply_to_message_id=reply_to_message_id, + ) + return msg.message_id + except Exception as e: + PrintStyle.error(f"Telegram send_video failed: {format_error(e)}") + return None + + # Inline keyboards def build_inline_keyboard( @@ -171,6 +248,88 @@ async def send_typing(bot: Bot, chat_id: int): except Exception: pass + +async def raw_send_text( + token: str, + chat_id: int, + text: str, + reply_to_message_id: int | None = None, + parse_mode: str | None = "HTML", + reply_markup: dict | None = None, +) -> int | None: + payload: dict[str, object] = { + "chat_id": chat_id, + "text": text[:MAX_MESSAGE_LENGTH], + } + if parse_mode: + payload["parse_mode"] = parse_mode + if reply_to_message_id: + payload["reply_parameters"] = {"message_id": int(reply_to_message_id)} + if reply_markup: + payload["reply_markup"] = reply_markup + data = await _raw_post(token, "sendMessage", payload) + result = data.get("result") if isinstance(data, dict) else None + if isinstance(result, dict): + return result.get("message_id") + return None + + +async def raw_edit_text( + token: str, + chat_id: int, + message_id: int, + text: str, + parse_mode: str | None = "HTML", + reply_markup: dict | None = None, +) -> bool: + payload: dict[str, object] = { + "chat_id": chat_id, + "message_id": message_id, + "text": text[:MAX_MESSAGE_LENGTH], + } + if parse_mode: + payload["parse_mode"] = parse_mode + if reply_markup: + payload["reply_markup"] = reply_markup + data = await _raw_post(token, "editMessageText", payload) + if not isinstance(data, dict): + return False + if data.get("ok"): + return True + description = str(data.get("description") or "").lower() + return "message is not 
modified" in description + + +async def raw_edit_reply_markup( + token: str, + chat_id: int, + message_id: int, + reply_markup: dict | None = None, +) -> bool: + payload: dict[str, object] = { + "chat_id": chat_id, + "message_id": message_id, + } + if reply_markup: + payload["reply_markup"] = reply_markup + data = await _raw_post(token, "editMessageReplyMarkup", payload) + return bool(isinstance(data, dict) and data.get("ok")) + + +async def _raw_post(token: str, method: str, payload: dict[str, object]) -> dict: + url = f"{TELEGRAM_API_BASE}/bot{token}/{method}" + try: + timeout = aiohttp.ClientTimeout(total=10) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post(url, json=payload) as response: + data = await response.json(content_type=None) + if response.status != 200 or not data.get("ok"): + PrintStyle.debug(f"Telegram {method} failed: {data}") + return data if isinstance(data, dict) else {} + except Exception as e: + PrintStyle.debug(f"Telegram {method} failed: {format_error(e)}") + return {} + # File download async def download_file( @@ -210,6 +369,9 @@ def _split_text(text: str, max_len: int) -> list[str]: _IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"} +_VOICE_EXTENSIONS = {".ogg", ".oga", ".opus"} +_AUDIO_EXTENSIONS = {".mp3", ".m4a", ".aac", ".wav", ".flac"} +_VIDEO_EXTENSIONS = {".mp4", ".m4v", ".mov", ".webm"} def is_image_file(path: str) -> bool: @@ -217,6 +379,21 @@ def is_image_file(path: str) -> bool: return ext in _IMAGE_EXTENSIONS +def is_voice_file(path: str) -> bool: + _, ext = os.path.splitext(path.lower()) + return ext in _VOICE_EXTENSIONS + + +def is_audio_file(path: str) -> bool: + _, ext = os.path.splitext(path.lower()) + return ext in _AUDIO_EXTENSIONS + + +def is_video_file(path: str) -> bool: + _, ext = os.path.splitext(path.lower()) + return ext in _VIDEO_EXTENSIONS + + def md_to_telegram_html(text: str) -> str: """Convert Markdown to Telegram-compatible HTML.""" stash: 
list[str] = [] diff --git a/plugins/_whatsapp_integration/helpers/handler.py b/plugins/_whatsapp_integration/helpers/handler.py index e1efe415c5..9c9e19eb60 100644 --- a/plugins/_whatsapp_integration/helpers/handler.py +++ b/plugins/_whatsapp_integration/helpers/handler.py @@ -207,6 +207,24 @@ async def _route_to_chat( msg_id = str(uuid.uuid4()) media_urls = msg.get("mediaUrls", []) attachments = await _save_incoming_media(media_urls) if media_urls else [] + + if context.is_running(): + item = mq.add(context, user_msg, attachments) + save_tmp_chat(context) + port = int((plugins.get_plugin_config(PLUGIN_NAME) or {}).get("bridge_port", 3100)) + base_url = bridge_manager.get_bridge_url(port) + chat_id = context.data.get(CTX_WA_CHAT_ID, "") or msg.get("chatId", "") + reply_to = msg.get("messageId", "") if context.data.get(CTX_WA_IS_GROUP) else "" + await wa_client.send_message( + base_url, + chat_id, + f"Queued message #{item.get('seq', len(mq.get_queue(context)))}. Use /send to flush queued work, or /steer to interrupt the active run.", + reply_to=reply_to, + ) + await wa_client.send_typing(base_url, chat_id, paused=True) + context.data[CTX_WA_TYPING_ACTIVE] = False + return + mq.log_user_message( context, user_msg, attachments, message_id=msg_id, source=" (whatsapp)", ) diff --git a/tests/test_telegram_error_ui.py b/tests/test_telegram_error_ui.py new file mode 100644 index 0000000000..3b73fdf5bd --- /dev/null +++ b/tests/test_telegram_error_ui.py @@ -0,0 +1,96 @@ +import asyncio + +from plugins._telegram_integration.extensions.python._functions.agent.Agent.handle_exception.end import ( + _85_telegram_error, +) +from plugins._telegram_integration.helpers import error_ui +from plugins._telegram_integration.helpers.constants import ( + CTX_TG_BOT, + CTX_TG_ERROR_SENT, + CTX_TG_REPLY_TO, + CTX_TG_TYPING_STOP, +) + + +class FakeContext: + def __init__(self): + self.data = { + CTX_TG_BOT: "main", + CTX_TG_REPLY_TO: 456, + } + + +class FakeAgent: + number = 0 + + def 
__init__(self): + self.context = FakeContext() + + +def test_friendly_error_message_for_missing_api_key(): + message = error_ui.friendly_error_message( + RuntimeError("OPENAI_API_KEY is missing or invalid") + ) + + assert "provider setup issue" in message + assert "API key" in message + assert "OPENAI_API_KEY is missing or invalid" in message + + +def test_friendly_error_message_for_rate_limit(): + message = error_ui.friendly_error_message( + RuntimeError("Rate limit exceeded: too many requests") + ) + + assert "rate limited" in message + assert "too many requests" in message + + +def test_telegram_exception_hook_sends_once_and_cleans_stream_state(monkeypatch): + sends = [] + cleared = [] + typing_stopped = [] + + async def fake_send(context, text, attachments=None, keyboard=None): + sends.append( + { + "text": text, + "reply_to": context.data.get(CTX_TG_REPLY_TO), + "attachments": attachments, + "keyboard": keyboard, + } + ) + return None + + def fake_clear(context): + cleared.append(True) + + class FakeStop: + def set(self): + typing_stopped.append(True) + + agent = FakeAgent() + agent.context.data[CTX_TG_TYPING_STOP] = FakeStop() + + monkeypatch.setattr( + "plugins._telegram_integration.helpers.handler.send_telegram_reply", + fake_send, + ) + monkeypatch.setattr( + "plugins._telegram_integration.helpers.draft_stream.clear", + fake_clear, + ) + + extension = _85_telegram_error.TelegramFriendlyError(agent=agent) + data = {"exception": RuntimeError("provider returned 503 service unavailable")} + + asyncio.run(extension.execute(data=data)) + asyncio.run(extension.execute(data=data)) + + assert len(sends) == 1 + assert "could not complete the model request" in sends[0]["text"] + assert sends[0]["reply_to"] == 456 + assert cleared == [True] + assert typing_stopped == [True] + assert agent.context.data[CTX_TG_ERROR_SENT] is True + assert CTX_TG_REPLY_TO not in agent.context.data diff --git a/tests/test_telegram_heartbeat.py b/tests/test_telegram_heartbeat.py new file 
mode 100644 index 0000000000..e9626ed42c --- /dev/null +++ b/tests/test_telegram_heartbeat.py @@ -0,0 +1,71 @@ +import asyncio + +from plugins._telegram_integration.helpers import heartbeat +from plugins._telegram_integration.helpers.constants import ( + CTX_TG_BOT, + CTX_TG_CHAT_ID, + CTX_TG_HEARTBEAT_STOP, + CTX_TG_HEARTBEAT_TASK, +) + + +class FakeBot: + class Bot: + token = "token" + + bot = Bot() + + +class FakeLog: + progress = "icon://search[Search] A0: Searching official docs" + + +class FakeContext: + def __init__(self): + self.data = { + CTX_TG_BOT: "main", + CTX_TG_CHAT_ID: 123, + } + self.log = FakeLog() + + +def test_heartbeat_text_omits_iteration_count(): + text = heartbeat.heartbeat_text(180, "waiting for provider response") + + assert text == "Still working... (3 min elapsed - waiting for provider response)" + assert "iteration" not in text + + +def test_heartbeat_sends_periodic_status_and_stops(monkeypatch): + calls = [] + + async def fake_send(token, chat_id, text, parse_mode=None, **kwargs): + calls.append( + { + "token": token, + "chat_id": chat_id, + "text": text, + "parse_mode": parse_mode, + } + ) + return len(calls) + + context = FakeContext() + monkeypatch.setattr(heartbeat, "HEARTBEAT_INTERVAL_SECONDS", 0.01) + monkeypatch.setattr(heartbeat, "get_bot", lambda name: FakeBot()) + monkeypatch.setattr(heartbeat.tc, "raw_send_text", fake_send) + + async def run(): + await heartbeat.start(context) + await asyncio.sleep(0.025) + await heartbeat.stop(context) + + asyncio.run(run()) + + assert calls + assert calls[0]["chat_id"] == 123 + assert calls[0]["parse_mode"] is None + assert "Still working..." 
in calls[0]["text"] + assert "A0: Searching official docs" in calls[0]["text"] + assert CTX_TG_HEARTBEAT_TASK not in context.data + assert CTX_TG_HEARTBEAT_STOP not in context.data diff --git a/tests/test_telegram_intermediate_response.py b/tests/test_telegram_intermediate_response.py new file mode 100644 index 0000000000..c8c5725576 --- /dev/null +++ b/tests/test_telegram_intermediate_response.py @@ -0,0 +1,152 @@ +import asyncio + +from plugins._telegram_integration.helpers import draft_stream +from plugins._telegram_integration.helpers.constants import ( + CTX_TG_BOT, + CTX_TG_CHAT_ID, + CTX_TG_PROGRESS_LINES, + CTX_TG_PROGRESS_MESSAGE_ID, + CTX_TG_REPLY_TO, + CTX_TG_RESPONSE_MESSAGE_ID, +) + + +class FakeBot: + class Bot: + token = "token" + + bot = Bot() + + +class FakeContext: + def __init__(self): + self.data = { + CTX_TG_BOT: "main", + CTX_TG_CHAT_ID: 123, + CTX_TG_REPLY_TO: 456, + } + + def get_data(self, key): + return self.data.get(key) + + +def test_intermediate_response_sends_separate_non_reply_message(monkeypatch): + calls = [] + + async def fake_send(token, chat_id, text, reply_to_message_id=None, parse_mode="HTML", reply_markup=None): + calls.append( + { + "token": token, + "chat_id": chat_id, + "text": text, + "reply_to_message_id": reply_to_message_id, + "parse_mode": parse_mode, + "reply_markup": reply_markup, + } + ) + return 789 + + context = FakeContext() + monkeypatch.setattr(draft_stream, "_bot_instance", lambda ctx: FakeBot()) + monkeypatch.setattr(draft_stream.tc, "raw_send_text", fake_send) + + sent = asyncio.run( + draft_stream.send_intermediate_response( + context, + "**Working** on the brief.", + keyboard=[[{"text": "Open", "callback_data": "open"}]], + ) + ) + + assert sent is True + assert calls == [ + { + "token": "token", + "chat_id": 123, + "text": "Working on the brief.", + "reply_to_message_id": None, + "parse_mode": "HTML", + "reply_markup": {"inline_keyboard": [[{"text": "Open", "callback_data": "open"}]]}, + } + ] + assert 
CTX_TG_RESPONSE_MESSAGE_ID not in context.data + + +def test_intermediate_response_finalizes_active_stream_and_starts_next_tool_group(monkeypatch): + calls = [] + next_message_id = 100 + + async def fake_send(token, chat_id, text, reply_to_message_id=None, parse_mode="HTML", reply_markup=None): + nonlocal next_message_id + calls.append( + { + "method": "send", + "text": text, + "reply_to_message_id": reply_to_message_id, + "parse_mode": parse_mode, + "reply_markup": reply_markup, + "message_id": next_message_id, + } + ) + next_message_id += 1 + return next_message_id - 1 + + async def fake_edit(token, chat_id, message_id, text, parse_mode="HTML", reply_markup=None): + calls.append( + { + "method": "edit", + "message_id": message_id, + "text": text, + "parse_mode": parse_mode, + "reply_markup": reply_markup, + } + ) + return True + + context = FakeContext() + monkeypatch.setattr(draft_stream, "_bot_instance", lambda ctx: FakeBot()) + monkeypatch.setattr(draft_stream.tc, "raw_send_text", fake_send) + monkeypatch.setattr(draft_stream.tc, "raw_edit_text", fake_edit) + + asyncio.run(draft_stream.add_tool_start(context, "search_engine", {"query": "telegram bot api"})) + asyncio.run(draft_stream.update_response(context, "Found the docs.")) + sent = asyncio.run(draft_stream.send_intermediate_response(context, "Found the docs.")) + asyncio.run(draft_stream.add_tool_start(context, "read_file", {"path": "notes.md"})) + + assert sent is True + assert calls == [ + { + "method": "send", + "text": "🔎 search engine: telegram bot api", + "reply_to_message_id": None, + "parse_mode": None, + "reply_markup": None, + "message_id": 100, + }, + { + "method": "send", + "text": "Found the docs.", + "reply_to_message_id": 456, + "parse_mode": "HTML", + "reply_markup": None, + "message_id": 101, + }, + { + "method": "edit", + "message_id": 101, + "text": "Found the docs.", + "parse_mode": "HTML", + "reply_markup": None, + }, + { + "method": "send", + "text": "📖 read file: notes.md", + 
"reply_to_message_id": None, + "parse_mode": None, + "reply_markup": None, + "message_id": 102, + }, + ] + assert context.data[CTX_TG_PROGRESS_MESSAGE_ID] == 102 + assert context.data[CTX_TG_PROGRESS_LINES] == ["📖 read file: notes.md"] + assert CTX_TG_RESPONSE_MESSAGE_ID not in context.data diff --git a/tests/test_telegram_media_delivery.py b/tests/test_telegram_media_delivery.py new file mode 100644 index 0000000000..bbdb2a2bc3 --- /dev/null +++ b/tests/test_telegram_media_delivery.py @@ -0,0 +1,108 @@ +import asyncio + +from plugins._telegram_integration.helpers import handler +from plugins._telegram_integration.helpers import telegram_client as tc +from plugins._telegram_integration.helpers.constants import ( + CTX_TG_BOT, + CTX_TG_CHAT_ID, + CTX_TG_REPLY_TO, +) + + +class FakeBotInstance: + class Bot: + token = "token" + + bot = Bot() + + +class FakeContext: + def __init__(self): + self.data = { + CTX_TG_BOT: "main", + CTX_TG_CHAT_ID: 123, + CTX_TG_REPLY_TO: 456, + } + + +class FakeTempBot: + async def __aenter__(self): + return object() + + async def __aexit__(self, exc_type, exc, tb): + return False + + +class FakeMediaBot: + def __init__(self): + self.calls = [] + + async def send_voice(self, **kwargs): + self.calls.append(("voice", kwargs)) + return type("Message", (), {"message_id": 11})() + + async def send_audio(self, **kwargs): + self.calls.append(("audio", kwargs)) + return type("Message", (), {"message_id": 12})() + + async def send_video(self, **kwargs): + self.calls.append(("video", kwargs)) + return type("Message", (), {"message_id": 13})() + + +def test_telegram_client_native_media_helpers(monkeypatch): + bot = FakeMediaBot() + monkeypatch.setattr(tc.os.path, "isfile", lambda path: True) + + voice_id = asyncio.run(tc.send_voice(bot, 123, "voice.ogg", reply_to_message_id=456)) + audio_id = asyncio.run(tc.send_audio(bot, 123, "song.mp3", reply_to_message_id=456)) + video_id = asyncio.run(tc.send_video(bot, 123, "clip.mp4", reply_to_message_id=456)) 
+ + assert (voice_id, audio_id, video_id) == (11, 12, 13) + assert [kind for kind, _ in bot.calls] == ["voice", "audio", "video"] + assert bot.calls[0][1]["chat_id"] == 123 + assert bot.calls[0][1]["reply_to_message_id"] == 456 + + +def test_telegram_reply_routes_native_media_attachments(monkeypatch): + calls = [] + + def fake_temp_bot(*args, **kwargs): + return FakeTempBot() + + async def record(kind, bot, chat_id, path, reply_to_message_id=None, **kwargs): + calls.append( + { + "kind": kind, + "chat_id": chat_id, + "path": path, + "reply_to_message_id": reply_to_message_id, + } + ) + return len(calls) + + monkeypatch.setattr(handler, "get_bot", lambda name: FakeBotInstance()) + monkeypatch.setattr(handler, "_temp_bot", fake_temp_bot) + monkeypatch.setattr(handler.files, "fix_dev_path", lambda path: path) + monkeypatch.setattr(handler.tc, "send_photo", lambda *a, **kw: record("photo", *a, **kw)) + monkeypatch.setattr(handler.tc, "send_voice", lambda *a, **kw: record("voice", *a, **kw)) + monkeypatch.setattr(handler.tc, "send_audio", lambda *a, **kw: record("audio", *a, **kw)) + monkeypatch.setattr(handler.tc, "send_video", lambda *a, **kw: record("video", *a, **kw)) + monkeypatch.setattr(handler.tc, "send_file", lambda *a, **kw: record("document", *a, **kw)) + + error = asyncio.run( + handler.send_telegram_reply( + FakeContext(), + "", + ["image.png", "voice.ogg", "song.mp3", "clip.mp4", "archive.zip"], + ) + ) + + assert error is None + assert calls == [ + {"kind": "photo", "chat_id": 123, "path": "image.png", "reply_to_message_id": 456}, + {"kind": "voice", "chat_id": 123, "path": "voice.ogg", "reply_to_message_id": 456}, + {"kind": "audio", "chat_id": 123, "path": "song.mp3", "reply_to_message_id": 456}, + {"kind": "video", "chat_id": 123, "path": "clip.mp4", "reply_to_message_id": 456}, + {"kind": "document", "chat_id": 123, "path": "archive.zip", "reply_to_message_id": 456}, + ] diff --git a/tests/test_telegram_sessions_picker.py 
b/tests/test_telegram_sessions_picker.py new file mode 100644 index 0000000000..588ecbc7ec --- /dev/null +++ b/tests/test_telegram_sessions_picker.py @@ -0,0 +1,78 @@ +import asyncio +from types import SimpleNamespace + +from plugins._telegram_integration.helpers import command_ui +from plugins._telegram_integration.helpers.constants import ( + CTX_TG_BOT, + CTX_TG_CHAT_ID, + CTX_TG_USER_ID, + CTX_TG_USERNAME, +) + + +class FakeContext: + def __init__(self, id, name, last_message, *, running=False): + self.id = id + self.name = name + self.data = {} + self._last_message = last_message + self._running = running + + def output(self): + return {"last_message": self._last_message} + + def is_running(self): + return self._running + + +def test_session_picker_shows_four_recent_sessions(monkeypatch): + contexts = [ + FakeContext("ctx-1", "One", "2026-05-01T10:00:00"), + FakeContext("ctx-2", "Two", "2026-05-02T10:00:00"), + FakeContext("ctx-3", "Three", "2026-05-03T10:00:00"), + FakeContext("ctx-4", "Four", "2026-05-04T10:00:00"), + FakeContext("ctx-5", "Five", "2026-05-05T10:00:00"), + ] + current = contexts[4] + monkeypatch.setattr(command_ui, "AgentContext", SimpleNamespace(all=lambda: contexts)) + + text, markup = command_ui._session_view(current, 0) + + assert "Current session: Five" in text + assert "Showing 1-4 of 5." 
in text + rows = markup["inline_keyboard"] + assert [row[0]["text"] for row in rows[:4]] == ["• Five", "Four", "Three", "Two"] + assert rows[-1] == [{"text": "Next", "callback_data": "tg:session:page:1"}] + + +def test_select_session_remaps_telegram_chat(monkeypatch): + current = FakeContext("ctx-current", "Current", "2026-05-03T10:00:00") + target = FakeContext("ctx-target", "Target", "2026-05-04T10:00:00") + current.data.update( + { + CTX_TG_BOT: "main", + CTX_TG_CHAT_ID: 123, + CTX_TG_USER_ID: 456, + CTX_TG_USERNAME: "anmol", + } + ) + saved = [] + dirty = [] + state_out = {} + + monkeypatch.setattr(command_ui, "AgentContext", SimpleNamespace(all=lambda: [current, target])) + monkeypatch.setattr(command_ui, "save_tmp_chat", lambda context: saved.append(context.id)) + monkeypatch.setattr(command_ui, "mark_dirty_for_context", lambda context_id, *, reason=None: dirty.append((context_id, reason))) + monkeypatch.setattr(command_ui, "_load_telegram_state", lambda: {"chats": {}}) + monkeypatch.setattr(command_ui, "_save_telegram_state", lambda state: state_out.update(state)) + + selected = asyncio.run(command_ui._select_session(current, 0)) + + assert selected is target + assert target.data[CTX_TG_BOT] == "main" + assert target.data[CTX_TG_CHAT_ID] == 123 + assert target.data[CTX_TG_USER_ID] == 456 + assert target.data[CTX_TG_USERNAME] == "anmol" + assert state_out["chats"]["main:456:123"] == "ctx-target" + assert saved == ["ctx-target"] + assert ("ctx-target", "telegram.session_select") in dirty