diff --git a/plugins/_chat_compaction/extensions/webui/chat-input-bottom-actions-start/compact-button.html b/plugins/_chat_compaction/extensions/webui/chat-input-bottom-actions-start/compact-button.html
index 51d8ea9232..c0818fcc8f 100644
--- a/plugins/_chat_compaction/extensions/webui/chat-input-bottom-actions-start/compact-button.html
+++ b/plugins/_chat_compaction/extensions/webui/chat-input-bottom-actions-start/compact-button.html
@@ -131,11 +131,14 @@
Compact Chat History
}
.cmpct-dialog {
- background: var(--color-panel, #1a1a1a);
+ background: var(--color-panel, var(--color-background));
border-radius: 12px;
box-shadow: 0 4px 23px rgba(0, 0, 0, 0.3);
width: 90%;
max-width: 560px;
+ max-height: 85dvh;
+ display: flex;
+ flex-direction: column;
overflow: hidden;
}
@@ -144,14 +147,15 @@ Compact Chat History
align-items: center;
justify-content: space-between;
padding: 16px 20px;
- border-bottom: 1px solid var(--color-border, #333);
+ border-bottom: 1px solid var(--color-border);
+ flex-shrink: 0;
}
.cmpct-header h3 {
margin: 0;
font-size: 1.1rem;
font-weight: 600;
- color: var(--color-text, #e5e5e5);
+ color: var(--color-text);
}
.cmpct-close {
@@ -160,21 +164,24 @@ Compact Chat History
cursor: pointer;
padding: 4px;
border-radius: 4px;
- color: var(--color-text-secondary, #999);
+ color: var(--color-text-secondary);
transition: color 0.2s;
}
.cmpct-close:hover {
- color: var(--color-text, #e5e5e5);
+ color: var(--color-text);
}
.cmpct-body {
padding: 20px;
+ overflow-y: auto;
+ flex: 1;
+ min-height: 0;
}
.cmpct-desc {
margin: 0 0 16px 0;
- color: var(--color-text-secondary, #999);
+ color: var(--color-text-secondary);
font-size: 0.9rem;
}
@@ -190,13 +197,13 @@ Compact Chat History
flex-direction: column;
align-items: center;
padding: 12px 8px;
- background: var(--color-background-muted, #252525);
+ background: var(--color-background-muted);
border-radius: 8px;
}
.cmpct-stat-label {
font-size: 0.7rem;
- color: var(--color-text-secondary, #999);
+ color: var(--color-text-secondary);
text-transform: uppercase;
letter-spacing: 0.05em;
margin-bottom: 4px;
@@ -205,7 +212,7 @@ Compact Chat History
.cmpct-stat-value {
font-size: 1.1rem;
font-weight: 600;
- color: var(--color-text, #e5e5e5);
+ color: var(--color-text);
}
/* Model selection section */
@@ -215,7 +222,7 @@ Compact Chat History
.cmpct-model-label {
font-size: 0.7rem;
- color: var(--color-text-secondary, #999);
+ color: var(--color-text-secondary);
text-transform: uppercase;
letter-spacing: 0.05em;
margin-bottom: 8px;
@@ -232,22 +239,22 @@ Compact Chat History
min-width: 0;
padding: 6px 10px;
border-radius: 6px;
- border: 1px solid var(--color-border, #333);
- background: var(--color-background-muted, #252525);
- color: var(--color-text, #e5e5e5);
+ border: 1px solid var(--color-border);
+ background: var(--color-background-muted);
+ color: var(--color-text);
font-size: 0.85rem;
cursor: pointer;
}
.cmpct-select:focus {
outline: none;
- border-color: var(--color-primary, #3b82f6);
+ border-color: var(--color-primary);
}
.cmpct-toggle {
display: flex;
border-radius: 6px;
- border: 1px solid var(--color-border, #333);
+ border: 1px solid var(--color-border);
overflow: hidden;
flex-shrink: 0;
}
@@ -255,8 +262,8 @@ Compact Chat History
.cmpct-toggle button {
padding: 6px 14px;
border: none;
- background: var(--color-background-muted, #252525);
- color: var(--color-text-secondary, #999);
+ background: var(--color-background-muted);
+ color: var(--color-text-secondary);
font-size: 0.8rem;
font-weight: 500;
cursor: pointer;
@@ -264,21 +271,21 @@ Compact Chat History
}
.cmpct-toggle button:first-child {
- border-right: 1px solid var(--color-border, #333);
+ border-right: 1px solid var(--color-border);
}
.cmpct-toggle button.active {
- background: var(--color-primary, #3b82f6);
- color: white;
+ background: color-mix(in srgb, var(--color-primary) 18%, var(--color-background-muted));
+ color: var(--color-primary);
}
.cmpct-toggle button:hover:not(.active) {
- background: var(--color-background-hover, #444);
+ background: var(--color-background-hover);
}
.cmpct-model-name {
font-size: 0.85rem;
- color: var(--color-text, #e5e5e5);
+ color: var(--color-text);
opacity: 0.7;
white-space: nowrap;
overflow: hidden;
@@ -290,10 +297,10 @@ Compact Chat History
align-items: flex-start;
gap: 8px;
padding: 10px 12px;
- background: rgba(34, 197, 94, 0.1);
- border: 1px solid rgba(34, 197, 94, 0.25);
+ background: color-mix(in srgb, var(--color-primary) 10%, transparent);
+ border: 1px solid color-mix(in srgb, var(--color-primary) 26%, var(--color-border));
border-radius: 8px;
- color: #4ade80;
+ color: color-mix(in srgb, var(--color-primary) 85%, var(--color-text));
font-size: 0.8rem;
}
@@ -308,7 +315,7 @@ Compact Chat History
justify-content: center;
gap: 8px;
padding: 30px;
- color: var(--color-text-secondary, #999);
+ color: var(--color-text-secondary);
}
.cmpct-footer {
@@ -316,7 +323,8 @@ Compact Chat History
justify-content: flex-end;
gap: 8px;
padding: 12px 20px;
- border-top: 1px solid var(--color-border, #333);
+ border-top: 1px solid var(--color-border);
+ flex-shrink: 0;
}
.cmpct-btn {
@@ -338,21 +346,22 @@ Compact Chat History
}
.cmpct-btn-cancel {
- background: var(--color-background-muted, #333);
- color: var(--color-text, #e5e5e5);
+ background: var(--color-background-muted);
+ color: var(--color-text);
}
.cmpct-btn-cancel:hover:not(:disabled) {
- background: var(--color-background-hover, #444);
+ background: var(--color-background-hover);
}
.cmpct-btn-danger {
- background: #dc2626;
- color: white;
+ background: color-mix(in srgb, var(--color-error, #dc2626) 16%, var(--color-background-muted));
+ border: 1px solid color-mix(in srgb, var(--color-error, #dc2626) 36%, var(--color-border));
+ color: var(--color-error, #dc2626);
}
.cmpct-btn-danger:hover:not(:disabled) {
- background: #b91c1c;
+ background: color-mix(in srgb, var(--color-error, #dc2626) 24%, var(--color-background-hover));
}
.cmpct-spinner {
diff --git a/plugins/_chat_compaction/webui/compact-modal.html b/plugins/_chat_compaction/webui/compact-modal.html
index 665da605bc..610d41303b 100644
--- a/plugins/_chat_compaction/webui/compact-modal.html
+++ b/plugins/_chat_compaction/webui/compact-modal.html
@@ -122,8 +122,9 @@ Compact Chat History
box-shadow: var(--shadow-lg);
width: 90%;
max-width: 640px;
- max-height: 90vh;
- overflow-y: auto;
+ max-height: 85dvh;
+ display: flex;
+ flex-direction: column;
}
.modal-header {
@@ -132,6 +133,7 @@ Compact Chat History
justify-content: space-between;
padding: var(--spacing-md) var(--spacing-lg);
border-bottom: 1px solid var(--color-border);
+ flex-shrink: 0;
}
.modal-header h3 {
@@ -156,6 +158,9 @@ Compact Chat History
.modal-content {
padding: var(--spacing-lg);
+ overflow-y: auto;
+ flex: 1;
+ min-height: 0;
}
.stats-description {
@@ -255,8 +260,8 @@ Compact Chat History
}
.model-type-toggle button.active {
- background: var(--color-primary, #3b82f6);
- color: white;
+ background: color-mix(in srgb, var(--color-primary) 18%, var(--color-background-muted));
+ color: var(--color-primary);
}
.model-type-toggle button:hover:not(.active) {
@@ -322,6 +327,7 @@ Compact Chat History
gap: var(--spacing-sm);
padding: var(--spacing-md) var(--spacing-lg);
border-top: 1px solid var(--color-border);
+ flex-shrink: 0;
}
.button {
@@ -354,7 +360,7 @@ Compact Chat History
.button.primary {
background: var(--color-primary);
border: none;
- color: white;
+ color: var(--color-background);
}
.button.primary:hover:not(:disabled) {
@@ -362,11 +368,13 @@ Compact Chat History
}
.button.danger {
- background: var(--color-danger, #dc2626);
+ background: color-mix(in srgb, var(--color-error, #dc2626) 16%, var(--color-background-muted));
+ border: 1px solid color-mix(in srgb, var(--color-error, #dc2626) 36%, var(--color-border));
+ color: var(--color-error, #dc2626);
}
.button.danger:hover:not(:disabled) {
- background: var(--color-danger-hover, #b91c1c);
+ background: color-mix(in srgb, var(--color-error, #dc2626) 24%, var(--color-background-hover));
}
.loading-spinner {
diff --git a/plugins/_telegram_integration/api/webhook.py b/plugins/_telegram_integration/api/webhook.py
index 049be486ba..322e9102f8 100644
--- a/plugins/_telegram_integration/api/webhook.py
+++ b/plugins/_telegram_integration/api/webhook.py
@@ -1,11 +1,138 @@
+import asyncio
+import os
+import time
+import traceback
+
from helpers.api import ApiHandler, Request, Response
from helpers.print_style import PrintStyle
from plugins._telegram_integration.helpers.dependencies import ensure_dependencies
+_SEEN_UPDATES: dict[str, float] = {}
+_SEEN_TTL_SECONDS = 600
+_BACKGROUND_TASKS: set[asyncio.Task] = set()
+_BOOTSTRAP_LOCKS: dict[str, asyncio.Lock] = {}
+_TRACE_FILE = "/a0/logs/telegram_webhook_trace.log"
-class TelegramWebhook(ApiHandler):
- """Receives Telegram webhook updates. No auth/CSRF — Telegram cannot send session cookies."""
+def _trace(message: str) -> None:
+ try:
+ os.makedirs(os.path.dirname(_TRACE_FILE), exist_ok=True)
+ with open(_TRACE_FILE, "a", encoding="utf-8") as f:
+ f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} {message}\n")
+ except Exception:
+ pass
+
+
+def _track_background_task(task: asyncio.Task) -> None:
+ _BACKGROUND_TASKS.add(task)
+ task.add_done_callback(_BACKGROUND_TASKS.discard)
+
+
+async def _ensure_instance_locked(bot_name: str):
+ from plugins._telegram_integration.helpers.bot_manager import ensure_bot_running_from_config
+
+ lock = _BOOTSTRAP_LOCKS.setdefault(bot_name, asyncio.Lock())
+ async with lock:
+ return await ensure_bot_running_from_config(bot_name)
+
+
+def _cleanup_seen_updates() -> None:
+ now = time.time()
+ for key, ts in list(_SEEN_UPDATES.items()):
+ if now - ts > _SEEN_TTL_SECONDS:
+ _SEEN_UPDATES.pop(key, None)
+
+
+def _is_duplicate(bot_name: str, update_id: int | None) -> bool:
+ if update_id is None:
+ return False
+ _cleanup_seen_updates()
+ return f"{bot_name}:{update_id}" in _SEEN_UPDATES
+
+
+def _mark_processed(bot_name: str, update_id: int | None) -> None:
+ if update_id is None:
+ return
+ _cleanup_seen_updates()
+ _SEEN_UPDATES[f"{bot_name}:{update_id}"] = time.time()
+
+
+def get_bot_instance(bot_name: str):
+ from plugins._telegram_integration.helpers.bot_manager import get_bot
+ return get_bot(bot_name)
+
+
+async def _direct_process_update(bot_name: str, bot_cfg: dict, update_data: dict) -> bool:
+ """Hermes-style direct processing path.
+
+ Aiogram dispatcher is still available as fallback, but direct routing avoids losing
+ updates if router/filter state is broken after hot restarts.
+ """
+ from aiogram.types import Update
+ from plugins._telegram_integration.helpers.handler import (
+ handle_start,
+ handle_clear,
+ handle_new_chat,
+ handle_message,
+ handle_nudge,
+ handle_pause,
+ handle_restart,
+ handle_topicname,
+ handle_affect_project,
+ handle_callback_query,
+ handle_new_members,
+ handle_forum_topic_closed,
+ )
+
+ update = Update.model_validate(update_data, context={"bot": get_bot_instance(bot_name).bot if get_bot_instance(bot_name) else None})
+ msg = update.message or update.edited_message
+ if update.callback_query:
+ _trace(f"direct callback bot={bot_name} user={getattr(update.callback_query.from_user, 'id', None)}")
+ await handle_callback_query(update.callback_query, bot_name, bot_cfg)
+ return True
+
+ if not msg:
+ return False
+
+ user_id = getattr(getattr(msg, "from_user", None), "id", None)
+ chat_id = getattr(getattr(msg, "chat", None), "id", None)
+ text = msg.text or msg.caption or ""
+ content_type = getattr(msg, "content_type", "")
+ _trace(f"direct message bot={bot_name} user={user_id} chat={chat_id} type={content_type} text={text[:120]!r}")
+
+ if getattr(msg, "new_chat_members", None):
+ await handle_new_members(msg, bot_name, bot_cfg)
+ return True
+ if getattr(msg, "forum_topic_closed", None):
+ await handle_forum_topic_closed(msg, bot_name, bot_cfg)
+ return True
+
+ command = ""
+ if text.startswith("/"):
+ command = text.split(maxsplit=1)[0].split("@", 1)[0].lower()
+
+ if command == "/start":
+ await handle_start(msg, bot_name, bot_cfg)
+ elif command == "/clear":
+ await handle_clear(msg, bot_name, bot_cfg)
+ elif command in ("/new", "/branch"):
+ await handle_new_chat(msg, bot_name, bot_cfg)
+ elif command in ("/project", "/projets"):
+ await handle_affect_project(msg, bot_name, bot_cfg)
+ elif command == "/nudge":
+ await handle_nudge(msg, bot_name, bot_cfg)
+ elif command == "/pause":
+ await handle_pause(msg, bot_name, bot_cfg)
+ elif command == "/restart":
+ await handle_restart(msg, bot_name, bot_cfg)
+ elif command == "/topicname":
+ await handle_topicname(msg, bot_name, bot_cfg)
+ else:
+ await handle_message(msg, bot_name, bot_cfg)
+ return True
+
+
+class TelegramWebhook(ApiHandler):
@classmethod
def requires_auth(cls) -> bool:
return False
@@ -20,30 +147,49 @@ def get_methods(cls) -> list[str]:
async def process(self, input: dict, request: Request) -> dict | Response:
ensure_dependencies()
- from aiogram.types import Update
-
- from plugins._telegram_integration.helpers.bot_manager import get_bot
-
- # Identify which bot this update is for
bot_name = request.args.get("bot", "")
+ _trace(f"process entry bot={bot_name!r} update={input.get('update_id')} keys={list(input.keys())}")
if not bot_name:
+ _trace("reject missing bot")
return Response("Missing ?bot= parameter", 400)
- instance = get_bot(bot_name)
+ instance = await _ensure_instance_locked(bot_name)
if not instance:
- return Response(f"Bot not found: {bot_name}", 404)
+ _trace(f"reject bot not found {bot_name}")
+ return Response(f"Bot not found or disabled: {bot_name}", 404)
- # Verify webhook secret if configured
secret_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token", "")
if instance.webhook_secret and secret_header != instance.webhook_secret:
+ _trace(f"reject invalid secret bot={bot_name}")
return Response("Invalid secret token", 403)
- # Parse and feed the update to aiogram
+ update_id = input.get("update_id")
+ if _is_duplicate(bot_name, update_id):
+ _trace(f"duplicate bot={bot_name} update={update_id}")
+ return {"ok": True, "duplicate": True}
+
+ async def _handle() -> None:
+ from plugins._telegram_integration.helpers.bot_manager import _get_current_bot_cfg
+ bot_cfg = _get_current_bot_cfg(bot_name)
+ handled = await _direct_process_update(bot_name, bot_cfg, input)
+ if not handled:
+ from aiogram.types import Update
+ update = Update.model_validate(input, context={"bot": instance.bot})
+ await instance.dispatcher.feed_update(instance.bot, update)
+ _trace(f"dispatcher fallback ok bot={bot_name} update={update_id}")
+ else:
+ _trace(f"direct ok bot={bot_name} update={update_id}")
+
+ # Process inline and ACK Telegram only after successful handling.
+ # If handling fails during/around a restart, return 500 so Telegram retries
+ # the same update instead of losing it after a premature 200 ACK.
try:
- update = Update.model_validate(input, context={"bot": instance.bot})
- await instance.dispatcher.feed_update(instance.bot, update)
+ await _handle()
except Exception as e:
- PrintStyle.error(f"Telegram webhook ({bot_name}): {e}")
- return Response("Internal error", 500)
+ err = traceback.format_exc()
+ _trace(f"error bot={bot_name} update={update_id}: {e}\n{err}")
+ PrintStyle.error(f"Telegram webhook ({bot_name}): {e}\n{err}")
+ return Response("Telegram update processing failed; retry requested", 500)
+ _mark_processed(bot_name, update_id)
return {"ok": True}
diff --git a/plugins/_telegram_integration/extensions/python/job_loop/_10_telegram_bot.py b/plugins/_telegram_integration/extensions/python/job_loop/_10_telegram_bot.py
index 6355a9a963..663fde7fdc 100644
--- a/plugins/_telegram_integration/extensions/python/job_loop/_10_telegram_bot.py
+++ b/plugins/_telegram_integration/extensions/python/job_loop/_10_telegram_bot.py
@@ -31,16 +31,20 @@ async def execute(self, **kwargs: Any) -> None:
get_all_bots,
create_bot,
cache_bot_info,
+ set_bot_commands,
start_polling,
setup_webhook,
stop_bot,
+ _make_handler,
)
from plugins._telegram_integration.helpers.handler import (
handle_start,
handle_clear,
+ handle_new_chat,
handle_message,
handle_callback_query,
handle_new_members,
+ handle_forum_topic_closed,
cleanup_old_attachments,
)
@@ -74,9 +78,11 @@ async def execute(self, **kwargs: Any) -> None:
# Create handler closures that capture bot_name and config
_on_start = partial(_make_handler(handle_start), bot_name=name, bot_cfg=bot_cfg)
_on_clear = partial(_make_handler(handle_clear), bot_name=name, bot_cfg=bot_cfg)
+ _on_new_chat = partial(_make_handler(handle_new_chat), bot_name=name, bot_cfg=bot_cfg)
_on_message = partial(_make_handler(handle_message), bot_name=name, bot_cfg=bot_cfg)
_on_callback = partial(_make_handler(handle_callback_query), bot_name=name, bot_cfg=bot_cfg)
_on_new_members = partial(_make_handler(handle_new_members), bot_name=name, bot_cfg=bot_cfg)
+ _on_forum_topic_closed = partial(_make_handler(handle_forum_topic_closed), bot_name=name, bot_cfg=bot_cfg)
instance = create_bot(
name=name,
@@ -84,13 +90,16 @@ async def execute(self, **kwargs: Any) -> None:
on_message=_on_message,
on_command_start=_on_start,
on_command_clear=_on_clear,
+ on_command_new=_on_new_chat,
on_command_control=_on_message,
on_callback_query=_on_callback,
on_new_members=_on_new_members,
+ on_forum_topic_closed=_on_forum_topic_closed,
group_mode=bot_cfg.get("group_mode", "mention"),
)
await cache_bot_info(instance)
+ await set_bot_commands(instance)
mode = bot_cfg.get("mode", "polling")
if mode == "webhook":
@@ -112,20 +121,3 @@ async def execute(self, **kwargs: Any) -> None:
PrintStyle.error(
f"Telegram ({name}): failed to start: {format_error(e)}"
)
-
-# Wrapper functions for aiogram handlers
-
-def _get_current_bot_cfg(bot_name: str) -> dict:
- """Fetch the latest bot config by name, so handlers always use fresh settings."""
- config = plugins.get_plugin_config(PLUGIN_NAME) or {}
- for b in config.get("bots", []):
- if b.get("name") == bot_name:
- return b
- return {}
-
-
-def _make_handler(handler_fn):
- """Create a wrapper that resolves fresh bot config on every call."""
- async def _wrapped(event, bot_name: str, bot_cfg: dict):
- await handler_fn(event, bot_name, _get_current_bot_cfg(bot_name) or bot_cfg)
- return _wrapped
diff --git a/plugins/_telegram_integration/helpers/bot_manager.py b/plugins/_telegram_integration/helpers/bot_manager.py
index b59c686646..39bbbbb9b5 100644
--- a/plugins/_telegram_integration/helpers/bot_manager.py
+++ b/plugins/_telegram_integration/helpers/bot_manager.py
@@ -6,7 +6,7 @@
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode, ChatType, ContentType
from aiogram.filters import Command, CommandStart
-from aiogram.types import Message
+from aiogram.types import BotCommand, Message
from helpers.errors import format_error
from helpers.print_style import PrintStyle
@@ -25,6 +25,8 @@ class BotInstance:
group_mode: str = "mention" # current group_mode setting
bot_info: object | None = None # cached result of bot.get_me()
+PLUGIN_NAME = "_telegram_integration"
+
# Bot registry (singleton, persists across module reloads)
_bots: dict[str, BotInstance] = {}
@@ -45,9 +47,12 @@ def create_bot(
on_message: Callable[..., Awaitable],
on_command_start: Callable[..., Awaitable],
on_command_clear: Callable[..., Awaitable],
+ on_command_new: Callable[..., Awaitable] | None = None,
on_command_control: Callable[..., Awaitable] | None = None,
+ on_command_affect_project: Callable[..., Awaitable] | None = None,
on_callback_query: Callable[..., Awaitable] | None = None,
on_new_members: Callable[..., Awaitable] | None = None,
+ on_forum_topic_closed: Callable[..., Awaitable] | None = None,
group_mode: str = "mention",
) -> BotInstance:
bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
@@ -57,10 +62,17 @@ def create_bot(
# Register command handlers
router.message.register(on_command_start, CommandStart())
router.message.register(on_command_clear, Command("clear"))
+ if on_command_new:
+ router.message.register(on_command_new, Command(commands=["new", "branch"]))
if on_command_control:
router.message.register(
on_command_control,
- Command(commands=["project", "config", "preset", "queue", "send"]),
+ Command(commands=["config", "preset", "queue", "send", "now", "later", "nudge", "pause", "restart", "topicname"]),
+ )
+ if on_command_affect_project:
+ router.message.register(
+ on_command_affect_project,
+ Command(commands=["project", "projets"]),
)
if on_callback_query:
@@ -69,6 +81,9 @@ def create_bot(
if on_new_members:
router.message.register(on_new_members, F.content_type == ContentType.NEW_CHAT_MEMBERS)
+ if on_forum_topic_closed:
+ router.message.register(on_forum_topic_closed, F.content_type == ContentType.FORUM_TOPIC_CLOSED)
+
# Register message handler with group filtering
if group_mode == "off":
# Private chats only
@@ -100,6 +115,28 @@ async def cache_bot_info(instance: BotInstance):
return instance.bot_info
+async def set_bot_commands(instance: BotInstance):
+ """Publish visible slash commands in Telegram's command menu."""
+ commands = [
+ BotCommand(command="start", description="Start the Agent Zero bot"),
+ BotCommand(command="clear", description="Reset the current conversation"),
+ BotCommand(command="new", description="Start a new Agent Zero chat here"),
+ BotCommand(command="branch", description="Branch into a new Agent Zero chat here"),
+ BotCommand(command="project", description="Show or change the active project"),
+ BotCommand(command="config", description="Show or change chat configuration"),
+ BotCommand(command="preset", description="Apply a model/config preset"),
+ BotCommand(command="queue", description="Voir/envoyer la queue"),
+ BotCommand(command="later", description="Ajouter un message à la queue"),
+ BotCommand(command="now", description="Interrompre et envoyer maintenant"),
+ BotCommand(command="nudge", description="Relancer l’agent actif"),
+ BotCommand(command="pause", description="Mettre l’agent actif en pause"),
+ BotCommand(command="restart", description="Redémarrer Agent Zero"),
+ BotCommand(command="topicname", description="Tester le renommage du sujet"),
+ BotCommand(command="send", description="Envoyer la queue"),
+ ]
+ await instance.bot.set_my_commands(commands)
+
+
def _make_group_mention_filter(handler: Callable, bot: Bot):
"""Create a group message handler that only responds to mentions and replies."""
async def _group_handler(message: Message):
@@ -185,6 +222,15 @@ async def setup_webhook(instance: BotInstance, webhook_url: str, secret: str = "
await instance.bot.set_webhook(
url=full_url,
secret_token=secret or None,
+ drop_pending_updates=False,
+ max_connections=40,
+ allowed_updates=[
+ "message",
+ "edited_message",
+ "callback_query",
+ "my_chat_member",
+ "chat_member",
+ ],
)
instance.webhook_active = True
@@ -227,3 +273,95 @@ async def test_token(token: str) -> tuple[bool, str]:
return True, f"Connected as @{info.username} ({info.first_name})"
except Exception as e:
return False, format_error(e)
+
+
+# Webhook-safe bootstrap helpers
+
+def _get_current_bot_cfg(bot_name: str) -> dict:
+ """Fetch the latest bot config by name, so handlers always use fresh settings."""
+ from helpers import plugins
+
+ config = plugins.get_plugin_config(PLUGIN_NAME) or {}
+ for b in config.get("bots", []):
+ if b.get("name") == bot_name:
+ return b
+ return {}
+
+
+def _make_handler(handler_fn):
+ """Create a wrapper that resolves fresh bot config on every call."""
+ async def _wrapped(event, bot_name: str, bot_cfg: dict):
+ await handler_fn(event, bot_name, _get_current_bot_cfg(bot_name) or bot_cfg)
+ return _wrapped
+
+
+async def ensure_bot_running_from_config(name: str) -> BotInstance | None:
+ """Recreate a Telegram bot instance from plugin config when registry is empty.
+
+ This makes webhook delivery independent from the periodic job loop: after a process
+ restart, the first webhook request can bootstrap the bot/dispatcher/handlers itself.
+ """
+ inst = get_bot(name)
+ if inst:
+ return inst
+
+ from functools import partial
+ from helpers import plugins
+ from plugins._telegram_integration.helpers.handler import (
+ handle_start,
+ handle_clear,
+ handle_new_chat,
+ handle_message,
+ handle_callback_query,
+ handle_affect_project,
+ handle_new_members,
+ handle_forum_topic_closed,
+ )
+
+ config = plugins.get_plugin_config(PLUGIN_NAME) or {}
+ bot_cfg = next(
+ (
+ b for b in config.get("bots", [])
+ if b.get("name") == name and b.get("enabled") and b.get("token")
+ ),
+ None,
+ )
+ if not bot_cfg:
+ return None
+
+ _on_start = partial(_make_handler(handle_start), bot_name=name, bot_cfg=bot_cfg)
+ _on_clear = partial(_make_handler(handle_clear), bot_name=name, bot_cfg=bot_cfg)
+ _on_new_chat = partial(_make_handler(handle_new_chat), bot_name=name, bot_cfg=bot_cfg)
+ _on_message = partial(_make_handler(handle_message), bot_name=name, bot_cfg=bot_cfg)
+ _on_callback = partial(_make_handler(handle_callback_query), bot_name=name, bot_cfg=bot_cfg)
+ _on_affect_project = partial(_make_handler(handle_affect_project), bot_name=name, bot_cfg=bot_cfg)
+ _on_new_members = partial(_make_handler(handle_new_members), bot_name=name, bot_cfg=bot_cfg)
+ _on_forum_topic_closed = partial(_make_handler(handle_forum_topic_closed), bot_name=name, bot_cfg=bot_cfg)
+
+ inst = create_bot(
+ name=name,
+ token=bot_cfg["token"],
+ on_message=_on_message,
+ on_command_start=_on_start,
+ on_command_clear=_on_clear,
+ on_command_new=_on_new_chat,
+ on_command_control=_on_message,
+ on_command_affect_project=_on_affect_project,
+ on_callback_query=_on_callback,
+ on_new_members=_on_new_members,
+ on_forum_topic_closed=_on_forum_topic_closed,
+ group_mode=bot_cfg.get("group_mode", "mention"),
+ )
+
+ await cache_bot_info(inst)
+ await set_bot_commands(inst)
+
+ mode = bot_cfg.get("mode", "polling")
+ if mode == "webhook":
+ webhook_url = bot_cfg.get("webhook_url", "")
+ if webhook_url:
+ await setup_webhook(inst, webhook_url, bot_cfg.get("webhook_secret", ""))
+ else:
+ await start_polling(inst)
+
+ return inst
diff --git a/plugins/_telegram_integration/helpers/handler.py b/plugins/_telegram_integration/helpers/handler.py
index ea2dbd055d..38b6ff4996 100644
--- a/plugins/_telegram_integration/helpers/handler.py
+++ b/plugins/_telegram_integration/helpers/handler.py
@@ -1,8 +1,13 @@
+import base64
+import subprocess
+import tempfile
import json
import os
+import re
import threading
import time
import uuid
+from datetime import datetime
from contextlib import asynccontextmanager, suppress
from aiogram import Bot
@@ -12,10 +17,12 @@
from agent import AgentContext, UserMessage
from helpers import plugins, files, projects
+from helpers import process as a0_process
from helpers import message_queue as mq
from helpers import integration_commands
from helpers.notification import NotificationManager, NotificationType, NotificationPriority
-from helpers.persist_chat import save_tmp_chat
+from helpers import persist_chat
+from helpers.persist_chat import save_tmp_chat, _serialize_context, _deserialize_context
from helpers.print_style import PrintStyle
from helpers.errors import format_error
from initialize import initialize_agent
@@ -33,8 +40,10 @@
CTX_TG_USERNAME,
CTX_TG_TYPING_STOP,
CTX_TG_REPLY_TO,
+ CTX_TG_MESSAGE_THREAD_ID,
CTX_TG_ATTACHMENTS,
CTX_TG_KEYBOARD,
+ CTX_TG_BRANCH_SEED,
)
# Chat mapping: (bot_name, tg_user_id) → AgentContext ID
@@ -58,8 +67,14 @@ def _save_state(state: dict):
files.write_file(path, json.dumps(state))
-def _map_key(bot_name: str, user_id: int, chat_id: int) -> str:
- return f"{bot_name}:{user_id}:{chat_id}"
+def _map_key(bot_name: str, user_id: int, chat_id: int, message_thread_id: int | None = None) -> str:
+ """Map a Telegram user/chat/topic to one AgentContext.
+
+ Forum topics in Telegram groups share the same chat_id, so include
+ message_thread_id to avoid cross-topic replies and context leakage.
+ """
+ thread_part = message_thread_id if message_thread_id is not None else "main"
+ return f"{bot_name}:{user_id}:{chat_id}:{thread_part}"
def cleanup_old_attachments():
@@ -125,6 +140,7 @@ def _get_project(bot_cfg: dict, user_id: int) -> str:
async def handle_start(message: TgMessage, bot_name: str, bot_cfg: dict):
"""Handle /start command."""
user = message.from_user
+ PrintStyle.info(f"Telegram handle_start bot={bot_name} chat={message.chat.id} user={getattr(user, 'id', None)} text={getattr(message, 'text', '')!r}")
if not user:
return
@@ -158,7 +174,8 @@ async def handle_clear(message: TgMessage, bot_name: str, bot_cfg: dict):
if not _is_allowed(bot_cfg, user.id, user.username):
return
- key = _map_key(bot_name, user.id, message.chat.id)
+ message_thread_id = getattr(message, "message_thread_id", None)
+ key = _map_key(bot_name, user.id, message.chat.id, message_thread_id)
with _chat_map_lock:
state = _load_state()
@@ -175,6 +192,7 @@ async def handle_clear(message: TgMessage, bot_name: str, bot_cfg: dict):
instance.bot.token, message.chat.id,
"Chat cleared. Send a new message to start fresh.",
parse_mode=None,
+ message_thread_id=message_thread_id,
)
# Send notification
@@ -190,9 +208,514 @@ async def handle_clear(message: TgMessage, bot_name: str, bot_cfg: dict):
)
+def _extract_branch_seed_from_reply(message: TgMessage) -> str:
+ replied = getattr(message, "reply_to_message", None)
+ if not replied:
+ return ""
+ text = (getattr(replied, "text", None) or getattr(replied, "caption", None) or "").strip()
+ if not text:
+ return "[Message Telegram cité sans texte exploitable]"
+ return text[:4000]
+
+
+def _clone_context_for_telegram_branch(
+ parent_context: AgentContext,
+ bot_name: str,
+ bot_cfg: dict,
+ chat_id: int,
+ message_thread_id: int | None,
+ user_id: int,
+ username: str | None,
+ context_name: str | None = None,
+) -> AgentContext:
+ """Clone a parent Agent Zero context for Telegram /branch like the UI branch action.
+
+ Unlike /new, /branch must preserve the full parent chat log and agent history.
+ """
+ data = _serialize_context(parent_context)
+ data.pop("id", None)
+
+ src_name = data.get("name") or "Chat"
+ data["name"] = context_name or f"{src_name} (branch)"
+ data["created_at"] = datetime.now().isoformat()
+
+ ctx_data = data.setdefault("data", {})
+ ctx_data[CTX_TG_BOT] = bot_name
+ ctx_data[CTX_TG_BOT_CFG] = bot_cfg
+ ctx_data[CTX_TG_CHAT_ID] = chat_id
+ ctx_data[CTX_TG_MESSAGE_THREAD_ID] = message_thread_id
+ ctx_data[CTX_TG_USER_ID] = user_id
+ ctx_data[CTX_TG_USERNAME] = username or ""
+ if context_name:
+ ctx_data["tg_topic_base_name"] = context_name[:128]
+
+ context = _deserialize_context(data)
+ save_tmp_chat(context)
+ return context
+
+
+async def handle_new_chat(message: TgMessage, bot_name: str, bot_cfg: dict):
+ """Handle /new and /branch — start a fresh Agent Zero context.
+
+ `/branch ` creates a Telegram forum topic when possible and maps the
+ new Agent Zero chat to that topic. The topic name is also used as the
+ Agent Zero UI chat name.
+ """
+ user = message.from_user
+ if not user:
+ return
+
+ if not _is_allowed(bot_cfg, user.id, user.username):
+ return
+
+ instance = get_bot(bot_name)
+ if not instance:
+ return
+
+ text = (message.text or message.caption or "").strip()
+ command_part, _, arg_part = text.partition(" ")
+ command = command_part.split("@", 1)[0].lower()
+ requested_name = arg_part.strip()
+
+ message_thread_id = getattr(message, "message_thread_id", None)
+ context_name = requested_name or None
+ branch_seed = _extract_branch_seed_from_reply(message) if command == "/branch" else ""
+
+ parent_context = None
+ parent_thread_id = message_thread_id
+ if command == "/branch":
+ parent_context = await _get_or_create_context(
+ bot_name,
+ bot_cfg,
+ message,
+ force_new=False,
+ message_thread_id=parent_thread_id,
+ )
+
+ if command == "/branch" and requested_name:
+ chat_type = getattr(message.chat, "type", None)
+ chat_is_forum = getattr(message.chat, "is_forum", False)
+
+ # Aiogram message.chat may be partial and omit is_forum even when the
+ # Telegram API getChat endpoint reports the supergroup as a forum.
+ if chat_type != "supergroup" or not chat_is_forum:
+ with suppress(Exception):
+ async with _temp_bot(instance.bot.token) as topic_bot:
+ fresh_chat = await topic_bot.get_chat(message.chat.id)
+ chat_type = getattr(fresh_chat, "type", chat_type)
+ chat_is_forum = getattr(fresh_chat, "is_forum", chat_is_forum)
+
+ if chat_type != "supergroup" or not chat_is_forum:
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ "La création de sujet Telegram nécessite un supergroupe avec les sujets/forum activés.",
+ parse_mode=None,
+ message_thread_id=message_thread_id,
+ )
+ return
+ try:
+ async with _temp_bot(instance.bot.token) as topic_bot:
+ topic = await topic_bot.create_forum_topic(
+ chat_id=message.chat.id,
+ name=requested_name[:128],
+ )
+ message_thread_id = topic.message_thread_id
+ except Exception as e:
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ f"Impossible de créer le sujet Telegram: {format_error(e)}. Vérifie que le bot est admin et a le droit de gérer les sujets.",
+ parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
+ )
+ return
+
+ if command == "/branch" and parent_context:
+ try:
+ context = _clone_context_for_telegram_branch(
+ parent_context,
+ bot_name,
+ bot_cfg,
+ message.chat.id,
+ message_thread_id,
+ user.id,
+ user.username,
+ context_name=context_name,
+ )
+ with _chat_map_lock:
+ state = _load_state()
+ chats = state.setdefault("chats", {})
+ key = _map_key(bot_name, user.id, message.chat.id, message_thread_id)
+ chats[key] = context.id
+ _save_state(state)
+ PrintStyle.success(
+ f"Telegram ({bot_name}): branched chat {parent_context.id} -> {context.id} thread={message_thread_id or 'main'}"
+ )
+ with suppress(Exception):
+ from helpers.state_monitor_integration import mark_dirty_all
+ mark_dirty_all(reason="telegram.branch_chat")
+ except Exception as e:
+ PrintStyle.error(f"Telegram: failed to branch context: {format_error(e)}")
+ context = None
+ else:
+ context = await _get_or_create_context(
+ bot_name,
+ bot_cfg,
+ message,
+ force_new=True,
+ message_thread_id=message_thread_id,
+ context_name=context_name,
+ )
+
+ if context:
+ if requested_name and command == "/branch":
+ context.data["tg_topic_base_name"] = requested_name[:128]
+ save_tmp_chat(context)
+ if branch_seed:
+ context.data[CTX_TG_BRANCH_SEED] = branch_seed
+ seed_msg = (
+ "Contexte de branche créé depuis ce message Telegram cité :\n\n"
+ f"> {branch_seed}"
+ )
+ msg_id = str(uuid.uuid4())
+ mq.log_user_message(context, seed_msg, [], message_id=msg_id, source=" (telegram branch seed)")
+ context.communicate(UserMessage(message=seed_msg, id=msg_id))
+ save_tmp_chat(context)
+ target = f" dans le sujet « {requested_name} »" if requested_name and command == "/branch" else ""
+ suffix = " avec le message cité comme point de départ" if branch_seed else ""
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ f"Nouveau chat Agent Zero créé{target}{suffix}. ID: {context.id}",
+ parse_mode=None,
+ message_thread_id=message_thread_id,
+ )
+ else:
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ "Impossible de créer un nouveau chat Agent Zero.",
+ parse_mode=None,
+ message_thread_id=message_thread_id,
+ )
+
+
+def _pop_topic_contexts(bot_name: str, chat_id: int, message_thread_id: int) -> list[tuple[str, str]]:
+ """Remove Telegram topic mappings from state and return mapped Agent Zero context ids."""
+ removed: list[tuple[str, str]] = []
+ with _chat_map_lock:
+ state = _load_state()
+ chats = state.setdefault("chats", {})
+ prefix = f"{bot_name}:"
+ suffix = f":{chat_id}:{message_thread_id}"
+ for key, ctx_id in list(chats.items()):
+ if key.startswith(prefix) and key.endswith(suffix):
+ chats.pop(key, None)
+ removed.append((key, ctx_id))
+ if removed:
+ _save_state(state)
+ return removed
+
+
+def _get_context_project_name(context: AgentContext | None, ctx_id: str) -> str | None:
+ """Return the Agent Zero project assigned to a chat context.
+
+ Telegram topic archival must use the same project key as chat_project_filter.
+ Agent Zero stores project assignment in context.data["project"] via
+ helpers.projects.activate_project(...), not in /projects//chats/.
+ """
+ if context:
+ with suppress(Exception):
+ project_name = context.get_data("project")
+ if project_name:
+ return str(project_name)
+ if hasattr(context, "project") and context.project:
+ return context.project.name
+
+ projects_base = "/a0/usr/projects"
+ if os.path.exists(projects_base):
+ for proj in os.listdir(projects_base):
+ if proj.startswith("_"):
+ continue
+ if os.path.exists(os.path.join(projects_base, proj, "chats", ctx_id)):
+ return proj
+ return None
+
+
+def _archive_chat_like_chat_project_filter(ctx_id: str) -> str:
+ """Archive then remove an Agent Zero chat using chat_project_filter's archive format."""
+ import shutil
+ from datetime import datetime
+
+ context = AgentContext.use(ctx_id)
+ if not context:
+ raise RuntimeError(f"Context {ctx_id} not found")
+
+ project_name = _get_context_project_name(context, ctx_id)
+ if project_name:
+ archive_dir = os.path.join("/a0/usr/projects", project_name, "chatProject")
+ else:
+ archive_dir = "/a0/usr/projects/_unassigned/chatProject"
+ os.makedirs(archive_dir, exist_ok=True)
+
+ chat_json = persist_chat.export_json_chat(context)
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ chat_name = getattr(context, "name", None) or f"Chat#{getattr(context, 'no', 'Unknown')}"
+ safe_name = "".join(c for c in chat_name if c.isalnum() or c in (" ", "-", "_")).strip()
+ safe_name = safe_name.replace(" ", "_")[:50]
+ filename = f"{timestamp}_{safe_name}_{ctx_id[:8]}.json"
+ file_path = os.path.join(archive_dir, filename)
+
+ archive_data = {
+ "original_ctxid": ctx_id,
+ "original_name": chat_name,
+ "project_name": project_name,
+ "archived_at": datetime.now().isoformat(),
+ "context_data": json.loads(chat_json) if isinstance(chat_json, str) else chat_json,
+ }
+ with open(file_path, "w", encoding="utf-8") as f:
+ json.dump(archive_data, f, indent=2, ensure_ascii=False)
+
+ if context:
+ context.reset()
+ AgentContext.remove(ctx_id)
+ chat_dir = f"/a0/usr/chats/{ctx_id}"
+ if os.path.exists(chat_dir):
+ shutil.rmtree(chat_dir)
+ return file_path
+
+
+def _close_chat(ctx_id: str):
+ context = AgentContext.get(ctx_id)
+ if context:
+ context.reset()
+ AgentContext.remove(ctx_id)
+ persist_chat.remove_chat(ctx_id)
+
+
+async def handle_forum_topic_closed(message: TgMessage, bot_name: str, bot_cfg: dict):
+ """Archive the Agent Zero chat mapped to a closed Telegram forum topic."""
+ message_thread_id = getattr(message, "message_thread_id", None)
+ if message_thread_id is None:
+ return
+
+ removed = _pop_topic_contexts(bot_name, message.chat.id, message_thread_id)
+
+ archived_paths: list[str] = []
+ for _, ctx_id in removed:
+ try:
+ archived_paths.append(_archive_chat_like_chat_project_filter(ctx_id))
+ except Exception as e:
+ PrintStyle.error(f"Telegram ({bot_name}): failed to archive closed topic context {ctx_id}: {format_error(e)}")
+
+ if removed:
+ PrintStyle.info(
+ f"Telegram ({bot_name}): archived {len(archived_paths)}/{len(removed)} Agent Zero chat(s) for closed topic thread={message_thread_id}"
+ )
+ with suppress(Exception):
+ from helpers.state_monitor_integration import mark_dirty_all
+ mark_dirty_all(reason="telegram.forum_topic_closed")
+
+
+async def handle_forum_topic_deleted(message: TgMessage, bot_name: str, bot_cfg: dict):
+ """Close the Agent Zero chat mapped to a deleted Telegram forum topic, if Telegram exposes this event."""
+ message_thread_id = getattr(message, "message_thread_id", None)
+ if message_thread_id is None:
+ return
+
+ removed = _pop_topic_contexts(bot_name, message.chat.id, message_thread_id)
+ for _, ctx_id in removed:
+ try:
+ _close_chat(ctx_id)
+ except Exception as e:
+ PrintStyle.error(f"Telegram ({bot_name}): failed to close deleted topic context {ctx_id}: {format_error(e)}")
+
+ if removed:
+ PrintStyle.info(
+ f"Telegram ({bot_name}): closed {len(removed)} Agent Zero chat(s) for deleted topic thread={message_thread_id}"
+ )
+ with suppress(Exception):
+ from helpers.state_monitor_integration import mark_dirty_all
+ mark_dirty_all(reason="telegram.forum_topic_deleted")
+
+
+async def handle_restart(message: TgMessage, bot_name: str, bot_cfg: dict):
+ """Restart Agent Zero using the same reload mechanism as the UI Restart button."""
+ user = message.from_user
+ PrintStyle.info(f"Telegram handle_restart bot={bot_name} chat={message.chat.id} user={getattr(user, 'id', None)}")
+ if not user:
+ return
+ if not _is_allowed(bot_cfg, user.id, user.username):
+ return
+
+ instance = get_bot(bot_name)
+ if not instance:
+ return
+
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ "Redémarrage d’Agent Zero demandé. Je reviens dans quelques secondes.",
+ parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
+ )
+
+ def _reload_later():
+ with suppress(Exception):
+ PrintStyle.info(f"Telegram ({bot_name}): restarting Agent Zero via /restart")
+ a0_process.reload()
+
+ threading.Timer(1.0, _reload_later).start()
+
+
+async def handle_topicname(message: TgMessage, bot_name: str, bot_cfg: dict):
+ """Diagnostic command: rename the current Telegram forum topic directly."""
+ user = message.from_user
+ PrintStyle.info(f"Telegram handle_topicname bot={bot_name} chat={message.chat.id} user={getattr(user, 'id', None)}")
+ if not user:
+ return
+ if not _is_allowed(bot_cfg, user.id, user.username):
+ return
+
+ instance = get_bot(bot_name)
+ if not instance:
+ return
+
+ text = message.text or message.caption or ""
+ _, requested_name = _parse_slash_command(text)
+ requested_name = (requested_name or "").strip()
+ message_thread_id = getattr(message, "message_thread_id", None)
+
+ if not requested_name:
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ "Usage : /topicname ",
+ parse_mode=None,
+ message_thread_id=message_thread_id,
+ )
+ return
+ if message_thread_id is None:
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ "Impossible : Telegram ne fournit pas l’ID du sujet courant pour cette commande.",
+ parse_mode=None,
+ message_thread_id=message_thread_id,
+ )
+ return
+
+ name = requested_name[:128].strip()
+ try:
+ async with _temp_bot(instance.bot.token) as topic_bot:
+ await topic_bot.edit_forum_topic(
+ chat_id=message.chat.id,
+ message_thread_id=message_thread_id,
+ name=name,
+ )
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ f"Sujet renommé : {name}",
+ parse_mode=None,
+ message_thread_id=message_thread_id,
+ )
+ PrintStyle.info(f"Telegram ({bot_name}): /topicname renamed thread={message_thread_id} to {name!r}")
+ except Exception as e:
+ error = format_error(e)
+ PrintStyle.error(f"Telegram ({bot_name}): /topicname failed thread={message_thread_id} name={name!r}: {error}")
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ f"Erreur renommage sujet : {error}",
+ parse_mode=None,
+ message_thread_id=message_thread_id,
+ )
+
+
+async def handle_pause(message: TgMessage, bot_name: str, bot_cfg: dict):
+ """Pause the active Agent Zero run for this Telegram chat/topic."""
+ user = message.from_user
+ PrintStyle.info(f"Telegram handle_pause bot={bot_name} chat={message.chat.id} user={getattr(user, 'id', None)}")
+ if not user:
+ return
+ if not _is_allowed(bot_cfg, user.id, user.username):
+ return
+
+ instance = get_bot(bot_name)
+ if not instance:
+ return
+
+ context = await _get_or_create_context(bot_name, bot_cfg, message)
+ if not context:
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ "Impossible de trouver/créer la session à mettre en pause.",
+ parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
+ )
+ return
+
+ try:
+ context.paused = True
+ save_tmp_chat(context)
+ with suppress(Exception):
+ from helpers.state_monitor_integration import mark_dirty_for_context
+ mark_dirty_for_context(context.id, reason="telegram.pause")
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ "Agent mis en pause.",
+ parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
+ )
+ except Exception as e:
+ PrintStyle.error(f"Telegram pause failed: {e}")
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ f"Erreur pause : {format_error(e)}",
+ parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
+ )
+
+
+async def handle_nudge(message: TgMessage, bot_name: str, bot_cfg: dict):
+ """Nudge the active Agent Zero run for this Telegram chat/topic."""
+ user = message.from_user
+ PrintStyle.info(f"Telegram handle_nudge bot={bot_name} chat={message.chat.id} user={getattr(user, 'id', None)}")
+ if not user:
+ return
+ if not _is_allowed(bot_cfg, user.id, user.username):
+ return
+
+ instance = get_bot(bot_name)
+ if not instance:
+ return
+
+ context = await _get_or_create_context(bot_name, bot_cfg, message)
+ if not context:
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ "Impossible de trouver/créer la session à relancer.",
+ parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
+ )
+ return
+
+ try:
+ context.nudge()
+ save_tmp_chat(context)
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ "✅ Nudge envoyé à l’agent.",
+ parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
+ )
+ except Exception as e:
+ PrintStyle.error(f"Telegram nudge failed: {e}")
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ f"Erreur nudge : {format_error(e)}",
+ parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
+ )
+
+
async def handle_message(message: TgMessage, bot_name: str, bot_cfg: dict):
"""Handle incoming user message."""
user = message.from_user
+ PrintStyle.info(f"Telegram handle_message bot={bot_name} chat={message.chat.id} user={getattr(user, 'id', None)} text={(getattr(message, 'text', None) or getattr(message, 'caption', None) or '')[:80]!r}")
if not user:
return
@@ -210,20 +733,76 @@ async def handle_message(message: TgMessage, bot_name: str, bot_cfg: dict):
instance.bot.token, message.chat.id,
"Failed to create chat session.",
parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
+ )
+ return
+
+ # Telegram-specific /now and /later handling.
+ # /now forces an immediate intervention (native Agent Zero behavior),
+ # /later stores work without interrupting the current run.
+ command, command_args = _parse_slash_command(text)
+ if command == "/restart":
+ await handle_restart(message, bot_name, bot_cfg)
+ return
+ if command == "/topicname":
+ await handle_topicname(message, bot_name, bot_cfg)
+ return
+ if command == "/pause":
+ await handle_pause(message, bot_name, bot_cfg)
+ return
+ if command == "/now":
+ if not command_args:
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ "Usage : /now — interrompt le raisonnement courant et envoie immédiatement.",
+ parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
+ )
+ return
+ text = command_args
+ elif command == "/later" and command_args:
+ async with _temp_bot(instance.bot.token) as dl_bot:
+ attachments = await _download_attachments(dl_bot, message, bot_name=bot_name)
+ attachments += await _download_quoted_attachments(dl_bot, message, bot_name=bot_name)
+ agent = context.agent0
+ queued_body = _with_quoted_message_context(message, command_args)
+ queued_msg = agent.read_prompt(
+ "fw.telegram.user_message.md",
+ sender=_format_user(user),
+ body=queued_body,
+ )
+ item = mq.add(context, queued_msg, attachments)
+ save_tmp_chat(context)
+ with suppress(Exception):
+ from helpers.state_monitor_integration import mark_dirty_for_context
+ mark_dirty_for_context(context.id, reason="telegram.queue_add")
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ f"Ajouté à la queue #{item.get('seq')}. Utilise /send ou /queue send pour lancer.",
+ parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
)
return
command_reply = integration_commands.try_handle_command(context, text)
if command_reply is not None:
- await _send_with_temp_bot(instance.bot.token, message.chat.id, command_reply, parse_mode=None)
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id, command_reply,
+ parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
+ )
return
# Start persistent typing indicator (thread-based, works across event loops)
- typing_stop = _start_typing(instance.bot.token, message.chat.id)
+ typing_stop = _start_typing(instance.bot.token, message.chat.id, getattr(message, "message_thread_id", None))
# Store stop event so send_telegram_reply can cancel typing
context.data[CTX_TG_TYPING_STOP] = typing_stop
+ # Preserve Telegram forum topic/thread so replies stay in the originating subject.
+ thread_id = getattr(message, "message_thread_id", None)
+ context.data[CTX_TG_MESSAGE_THREAD_ID] = thread_id
+
# In group chats, if user replied to the bot's message, reply to the user's message
reply_to_id = None
if message.chat.type != "private" and instance.bot_info:
@@ -236,13 +815,15 @@ async def handle_message(message: TgMessage, bot_name: str, bot_cfg: dict):
# Use temp bot for downloads (cross-event-loop safe)
async with _temp_bot(instance.bot.token) as dl_bot:
attachments = await _download_attachments(dl_bot, message, bot_name=bot_name)
+ attachments += await _download_quoted_attachments(dl_bot, message, bot_name=bot_name)
- # Build user message with prompt
+ # Build user message with prompt, including Telegram quoted/replied message context.
agent = context.agent0
+ body_text = _with_quoted_message_context(message, text)
user_msg = agent.read_prompt(
"fw.telegram.user_message.md",
sender=_format_user(user),
- body=text,
+ body=body_text,
)
msg_id = str(uuid.uuid4())
@@ -269,8 +850,328 @@ async def handle_message(message: TgMessage, bot_name: str, bot_cfg: dict):
)
+def _parse_slash_command(text: str) -> tuple[str, str]:
+ line = ""
+ for candidate in (text or "").splitlines():
+ candidate = candidate.strip()
+ if candidate:
+ line = candidate
+ break
+ if not line.startswith("/"):
+ return "", ""
+ command, _, args = line.partition(" ")
+ command = command.split("@", 1)[0].strip().lower()
+ return command, args.strip()
+
+
+def _chunk_rows(items: list[dict], per_row: int = 1) -> list[list[dict]]:
+ rows: list[list[dict]] = []
+ for i in range(0, len(items), per_row):
+ rows.append(items[i:i + per_row])
+ return rows
+
+
+def _active_project_items() -> list[dict]:
+ try:
+ return projects.get_active_projects_list() or []
+ except Exception:
+ items: list[dict] = []
+ base = "/a0/usr/projects"
+ if not os.path.isdir(base):
+ return items
+ for name in sorted(os.listdir(base)):
+ if name.startswith("_"):
+ continue
+ header = os.path.join(base, name, ".a0proj", "project.json")
+ if not os.path.isfile(header):
+ continue
+ title = name
+ with suppress(Exception):
+ data = json.loads(files.read_file(header))
+ title = data.get("title") or name
+ items.append({"name": name, "title": title})
+ return items
+
+
+def _project_label(item: dict) -> str:
+ title = str(item.get("title") or "").strip()
+ name = str(item.get("name") or "").strip()
+ if title and title.lower() != name.lower():
+ return f"{title} ({name})"
+ return title or name
+
+
+def _project_picker_keyboard(items: list[dict], current: str | None, message_thread_id: int | None, context_id: str | None = None) -> list[list[dict]]:
+ """Build the project picker.
+
+ Prefer encoding the AgentContext id in callback data. Telegram sometimes
+ omits message_thread_id on inline callbacks, while the context always stores
+ the originating topic id. The legacy thread-based callback remains supported
+ by _handle_project_callback for already displayed keyboards.
+ """
+ current = str(current or "").strip()
+ if context_id:
+ callback_prefix = f"tg_project_ctx:{context_id}"
+ else:
+ thread_part = str(message_thread_id) if message_thread_id is not None else "main"
+ callback_prefix = f"tg_project:{thread_part}"
+ buttons = []
+ for index, item in enumerate(items[:48]):
+ name = item.get("name") or ""
+ label = _project_label(item)
+ prefix = "✅ " if name == current else ""
+ if context_id:
+ callback_data = f"{callback_prefix}:i:{index}"
+ else:
+ callback_data = f"{callback_prefix}:{name}"
+ buttons.append({"text": (prefix + label)[:64], "callback_data": callback_data[:64]})
+ buttons.append({"text": "❌ Aucun projet", "callback_data": f"{callback_prefix}:none"[:64]})
+ return _chunk_rows(buttons, 1)
+
+
+async def handle_affect_project(message: TgMessage, bot_name: str, bot_cfg: dict):
+ """Show an inline project picker to assign the current Telegram chat/topic."""
+ user = message.from_user
+ if not user:
+ return
+ if not _is_allowed(bot_cfg, user.id, user.username):
+ return
+
+ instance = get_bot(bot_name)
+ if not instance:
+ return
+
+ context = await _get_or_create_context(bot_name, bot_cfg, message)
+ current = context.get_data("project") if context else ""
+ message_thread_id = getattr(message, "message_thread_id", None)
+ if context and message_thread_id is not None:
+ context.data[CTX_TG_MESSAGE_THREAD_ID] = message_thread_id
+ save_tmp_chat(context)
+ items = _active_project_items()
+ if not items:
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ "Aucun projet actif disponible.",
+ parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
+ )
+ return
+
+ keyboard = _project_picker_keyboard(items, current, message_thread_id, context.id if context else None)
+
+ current_label = current or "aucun"
+ await _send_with_temp_bot(
+ instance.bot.token, message.chat.id,
+ f"Projet actuel : {current_label}\nChoisis le projet à associer à ce chat :",
+ parse_mode=None,
+ message_thread_id=getattr(message, "message_thread_id", None),
+ keyboard=keyboard,
+ )
+
+
+def _strip_project_prefix_from_topic(name: str) -> str:
+ name = str(name or "").strip()
+ # Remove prefixes previously generated by this plugin to avoid stacking.
+ return re.sub(r"^.+?\s+[—-]\s+", "", name, count=1).strip() or name
+
+
+def _telegram_topic_project_name(label: str | None, base_name: str | None = None) -> str:
+ label = str(label or "").strip()
+ base_name = _strip_project_prefix_from_topic(base_name or "")
+ if not label:
+ name = base_name or "Agent Zero"
+ elif base_name and base_name.lower() != label.lower():
+ name = f"{label} — {base_name}"
+ else:
+ name = label or base_name or "Agent Zero"
+ # Telegram forum topic names are limited to 128 chars; keep the project visible first.
+ return name[:128].strip()
+
+
+def _remember_topic_base_name(context: AgentContext | None, base_name: str | None) -> None:
+ if not context:
+ return
+ base_name = _strip_project_prefix_from_topic(base_name or "")
+ if base_name:
+ context.data["tg_topic_base_name"] = base_name[:128]
+
+
+async def _rename_forum_topic_for_project(instance, chat_id: int, message_thread_id: int | None, label: str | None, base_name: str | None = None) -> str | None:
+ if not instance:
+ PrintStyle.error("Telegram: skipped forum topic rename because bot instance is unavailable")
+ return None
+ if message_thread_id is None:
+ PrintStyle.error("Telegram: skipped forum topic rename because message_thread_id is missing")
+ return None
+ name = _telegram_topic_project_name(label, base_name)
+ token = getattr(getattr(instance, "bot", None), "token", None)
+ if not token:
+ PrintStyle.error("Telegram: skipped forum topic rename because bot token is unavailable")
+ return None
+ try:
+ async with _temp_bot(token) as topic_bot:
+ await topic_bot.edit_forum_topic(
+ chat_id=chat_id,
+ message_thread_id=message_thread_id,
+ name=name,
+ )
+ PrintStyle.info(f"Telegram: renamed forum topic thread={message_thread_id} to {name!r}")
+ return name
+ except Exception as e:
+ PrintStyle.error(f"Telegram: failed to rename forum topic thread={message_thread_id} to {name!r}: {format_error(e)}")
+ return None
+
+
+async def _refresh_project_picker_message(query: CallbackQuery, instance, items: list[dict], current: str | None, message_thread_id: int | None, context_id: str | None = None) -> None:
+ """Edit the inline project picker so the green tick moves immediately."""
+ if not instance or not query.message:
+ return
+ token = getattr(getattr(instance, "bot", None), "token", None)
+ if not token:
+ return
+ try:
+ keyboard = tc.build_inline_keyboard(_project_picker_keyboard(items, current, message_thread_id, context_id))
+ current_label = current or "aucun"
+ async with _temp_bot(token) as picker_bot:
+ await picker_bot.edit_message_text(
+ chat_id=query.message.chat.id,
+ message_id=query.message.message_id,
+ text=f"Projet actuel : {current_label}\nChoisis le projet à associer à ce chat :",
+ reply_markup=keyboard,
+ parse_mode=None,
+ )
+ except Exception as e:
+ PrintStyle.error(f"Telegram: failed to refresh project picker: {format_error(e)}")
+
+
+async def _handle_project_callback(query: CallbackQuery, bot_name: str, bot_cfg: dict) -> bool:
+ data = query.data or ""
+ if not (data.startswith("tg_project:") or data.startswith("tg_project_ctx:")):
+ return False
+
+ user = query.from_user
+ if not user or not query.message:
+ return True
+ if not _is_allowed(bot_cfg, user.id, user.username):
+ await query.answer("Non autorisé.")
+ return True
+
+ context = None
+ message_thread_id = getattr(query.message, "message_thread_id", None)
+ context_id = None
+
+ if data.startswith("tg_project_ctx:"):
+ raw_payload = data.split(":", 1)[1]
+ payload_parts = raw_payload.split(":", 1)
+ if len(payload_parts) != 2:
+ await query.answer("Callback projet invalide.")
+ return True
+ context_id, selected = payload_parts
+ context = AgentContext.get(context_id)
+ if context:
+ stored_thread_id = context.data.get(CTX_TG_MESSAGE_THREAD_ID)
+ if stored_thread_id is not None:
+ with suppress(Exception):
+ message_thread_id = int(stored_thread_id)
+ if selected.startswith("i:"):
+ with suppress(Exception):
+ project_index = int(selected.split(":", 1)[1])
+ indexed_items = _active_project_items()
+ if 0 <= project_index < len(indexed_items[:48]):
+ selected = indexed_items[project_index].get("name") or ""
+ else:
+ raw_payload = data.split(":", 1)[1]
+ payload_parts = raw_payload.split(":", 1)
+ if len(payload_parts) == 2 and (payload_parts[0] == "main" or payload_parts[0].isdigit()):
+ thread_part, selected = payload_parts
+ callback_thread_id = getattr(query.message, "message_thread_id", None)
+ # Older project pickers used "main" when Telegram did not expose the
+ # thread at send time. On callback, Telegram can still include the real
+ # message_thread_id; prefer it so old buttons can rename the topic too.
+ if thread_part == "main":
+ message_thread_id = callback_thread_id
+ else:
+ message_thread_id = int(thread_part)
+ else:
+ selected = raw_payload
+ message_thread_id = getattr(query.message, "message_thread_id", None)
+
+ if context is None:
+ context = await _get_or_create_context_from_user(
+ bot_name, bot_cfg, user.id, user.username, query.message.chat.id,
+ message_thread_id,
+ )
+ if not context:
+ await query.answer("Contexte introuvable.")
+ return True
+
+ # Telegram callback messages can omit message_thread_id. Fall back to the
+ # context value so topic renaming is not skipped after selecting a project.
+ if message_thread_id is None:
+ stored_thread_id = context.data.get(CTX_TG_MESSAGE_THREAD_ID)
+ if stored_thread_id is not None:
+ with suppress(Exception):
+ message_thread_id = int(stored_thread_id)
+
+ instance = get_bot(bot_name)
+ PrintStyle.info(
+ f"Telegram ({bot_name}): project callback selected={selected!r} context={getattr(context, 'id', None)} "
+ f"thread={message_thread_id!r} chat={getattr(query.message.chat, 'id', None)} instance={bool(instance)}"
+ )
+ if selected == "none":
+ projects.deactivate_project(context.id)
+ save_tmp_chat(context)
+ with suppress(Exception):
+ from helpers.state_monitor_integration import mark_dirty_for_context
+ mark_dirty_for_context(context.id, reason="telegram.project_assign_clear")
+ with suppress(Exception):
+ await query.answer("Projet retiré.")
+ if instance:
+ items = _active_project_items()
+ await _refresh_project_picker_message(query, instance, items, "", message_thread_id, context.id)
+ base_name = context.data.get("tg_topic_base_name") or getattr(context, "name", "")
+ _remember_topic_base_name(context, base_name)
+ renamed_to = await _rename_forum_topic_for_project(instance, query.message.chat.id, message_thread_id, None, base_name)
+ if renamed_to:
+ context.data["tg_topic_name"] = renamed_to
+ save_tmp_chat(context)
+ await _send_with_temp_bot(instance.bot.token, query.message.chat.id, "Projet retiré de ce chat. Le préfixe du sujet a été retiré.", parse_mode=None, message_thread_id=message_thread_id)
+ return True
+
+ items = _active_project_items()
+ match = next((item for item in items if item.get("name") == selected), None)
+ if not match:
+ await query.answer("Projet introuvable.")
+ return True
+
+ projects.activate_project(context.id, selected)
+ save_tmp_chat(context)
+ with suppress(Exception):
+ from helpers.state_monitor_integration import mark_dirty_for_context
+ mark_dirty_for_context(context.id, reason="telegram.project_assign_set")
+ label = _project_label(match)
+ with suppress(Exception):
+ await query.answer(f"Associé à {label}")
+ if not instance:
+ PrintStyle.error(f"Telegram ({bot_name}): project callback cannot rename topic because bot instance is unavailable")
+ return True
+ await _refresh_project_picker_message(query, instance, items, selected, message_thread_id, context.id)
+ base_name = context.data.get("tg_topic_base_name") or getattr(context, "name", "")
+ _remember_topic_base_name(context, base_name)
+ renamed_to = await _rename_forum_topic_for_project(instance, query.message.chat.id, message_thread_id, label, base_name)
+ if renamed_to:
+ context.data["tg_topic_name"] = renamed_to
+ save_tmp_chat(context)
+ suffix = f"\nSujet renommé : {renamed_to}" if renamed_to else ""
+ await _send_with_temp_bot(instance.bot.token, query.message.chat.id, f"Chat associé au projet : {label}{suffix}", parse_mode=None, message_thread_id=message_thread_id)
+ return True
+
+
async def handle_callback_query(query: CallbackQuery, bot_name: str, bot_cfg: dict):
"""Handle inline keyboard button press."""
+ if await _handle_project_callback(query, bot_name, bot_cfg):
+ return
+
user = query.from_user
if not user or not query.message:
return
@@ -286,8 +1187,10 @@ async def handle_callback_query(query: CallbackQuery, bot_name: str, bot_cfg: di
if not text:
return
+ message_thread_id = getattr(query.message, "message_thread_id", None)
context = await _get_or_create_context_from_user(
bot_name, bot_cfg, user.id, user.username, query.message.chat.id,
+ message_thread_id,
)
if not context:
return
@@ -296,7 +1199,11 @@ async def handle_callback_query(query: CallbackQuery, bot_name: str, bot_cfg: di
if command_reply is not None:
instance = get_bot(bot_name)
if instance:
- await _send_with_temp_bot(instance.bot.token, query.message.chat.id, command_reply, parse_mode=None)
+ await _send_with_temp_bot(
+ instance.bot.token, query.message.chat.id, command_reply,
+ parse_mode=None,
+ message_thread_id=message_thread_id,
+ )
return
agent = context.agent0
@@ -342,12 +1249,20 @@ async def _get_or_create_context(
bot_name: str,
bot_cfg: dict,
message: TgMessage,
+ force_new: bool = False,
+ message_thread_id: int | None = None,
+ context_name: str | None = None,
) -> AgentContext | None:
user = message.from_user
if not user:
return None
+ if message_thread_id is None:
+ message_thread_id = getattr(message, "message_thread_id", None)
return await _get_or_create_context_from_user(
bot_name, bot_cfg, user.id, user.username, message.chat.id,
+ message_thread_id,
+ force_new=force_new,
+ context_name=context_name,
)
@@ -357,18 +1272,24 @@ async def _get_or_create_context_from_user(
user_id: int,
username: str | None,
chat_id: int,
+ message_thread_id: int | None = None,
+ force_new: bool = False,
+ context_name: str | None = None,
) -> AgentContext | None:
- key = _map_key(bot_name, user_id, chat_id)
+ key = _map_key(bot_name, user_id, chat_id, message_thread_id)
with _chat_map_lock:
state = _load_state()
chats = state.setdefault("chats", {})
- ctx_id = chats.get(key)
+ ctx_id = None if force_new else chats.get(key)
# Check if existing context is still alive
if ctx_id:
ctx = AgentContext.get(ctx_id)
if ctx:
+ if message_thread_id is not None:
+ ctx.data[CTX_TG_MESSAGE_THREAD_ID] = message_thread_id
+ save_tmp_chat(ctx)
return ctx
# Context was garbage collected, remove stale mapping
chats.pop(key, None)
@@ -377,11 +1298,13 @@ async def _get_or_create_context_from_user(
try:
config = initialize_agent()
display_name = f"@{username}" if username else str(user_id)
- ctx = AgentContext(config, name=f"Telegram: {display_name}")
+ chat_name = context_name or f"Telegram: {display_name}"
+ ctx = AgentContext(config, name=chat_name)
ctx.data[CTX_TG_BOT] = bot_name
ctx.data[CTX_TG_BOT_CFG] = bot_cfg
ctx.data[CTX_TG_CHAT_ID] = chat_id
+ ctx.data[CTX_TG_MESSAGE_THREAD_ID] = message_thread_id
ctx.data[CTX_TG_USER_ID] = user_id
ctx.data[CTX_TG_USERNAME] = username or ""
@@ -396,7 +1319,7 @@ async def _get_or_create_context_from_user(
_save_state(state)
PrintStyle.success(
- f"Telegram ({bot_name}): new chat {ctx.id} for user {display_name}"
+ f"Telegram ({bot_name}): new chat {ctx.id} for user {display_name} thread={message_thread_id or 'main'}"
)
return ctx
@@ -406,6 +1329,56 @@ async def _get_or_create_context_from_user(
# Message content extraction
+def _format_telegram_sender_from_message(message: TgMessage) -> str:
+ user = getattr(message, "from_user", None)
+ if not user:
+ return "expéditeur inconnu"
+ return _format_user(user)
+
+
+def _extract_quoted_message_context(message: TgMessage) -> str:
+ """Return readable context for the Telegram message quoted/replied to, if any."""
+ replied = getattr(message, "reply_to_message", None)
+ if not replied:
+ return ""
+
+ parts: list[str] = []
+ sender = _format_telegram_sender_from_message(replied)
+ quoted_text = (getattr(replied, "text", None) or getattr(replied, "caption", None) or "").strip()
+ if quoted_text:
+ parts.append(quoted_text[:4000])
+
+ indicators: list[str] = []
+ checks = [
+ ("photo", "photo"),
+ ("document", "document"),
+ ("audio", "audio"),
+ ("voice", "message vocal"),
+ ("video", "vidéo"),
+ ("video_note", "note vidéo"),
+ ("animation", "animation"),
+ ("sticker", "sticker"),
+ ("location", "localisation"),
+ ("contact", "contact"),
+ ]
+ for attr, label in checks:
+ if getattr(replied, attr, None):
+ indicators.append(label)
+ if indicators:
+ parts.append("[Pièce(s)/contenu cité(s) : " + ", ".join(indicators) + "]")
+
+ body = "\n".join(parts).strip() or "[Message Telegram cité sans texte exploitable]"
+ return f"Message Telegram cité par l’utilisateur (contexte pour comprendre 'ceci') — {sender}:\n{body}"
+
+
+def _with_quoted_message_context(message: TgMessage, body: str) -> str:
+ quoted = _extract_quoted_message_context(message)
+ body = (body or "").strip()
+ if not quoted:
+ return body
+ return f"{quoted}\n\nMessage actuel de l’utilisateur:\n{body}"
+
+
def _extract_message_content(message: TgMessage) -> str:
parts = []
@@ -433,6 +1406,18 @@ def _extract_message_content(message: TgMessage) -> str:
return "\n".join(parts) if parts else "[No text content]"
+async def _download_quoted_attachments(bot, message: TgMessage, bot_name: str = "") -> list[str]:
+ """Download attachments from the Telegram message quoted/replied to, if any."""
+ replied = getattr(message, "reply_to_message", None)
+ if not replied:
+ return []
+ try:
+ return await _download_attachments(bot, replied, bot_name=bot_name)
+ except Exception as e:
+ PrintStyle.error(f"Telegram: failed to download quoted attachments: {format_error(e)}")
+ return []
+
+
async def _download_attachments(bot, message: TgMessage, bot_name: str = "") -> list[str]:
"""Download photos, documents, audio, voice, video from message."""
paths: list[str] = []
@@ -477,6 +1462,25 @@ async def _dl(file_id: str, filename: str) -> str | None:
return paths
+
+def _sanitize_telegram_outbound_text(text: str) -> str:
+ """Remove unwanted Agent Zero mobile/status prefixes before Telegram delivery."""
+ if not text:
+ return text
+
+ cleaned = text.replace("\ufeff", "")
+ # Strip leading whitespace and repeated status prefixes such as:
+ # "GEN", "GEN 🔵", "🔵", "🟦", bullets/dots/check/status emojis.
+ prefix_re = re.compile(
+ r"^(?:\s|(?:GEN\b[\s::\-–—]*)|[🔵🟦🔷🔹🔘●•◦○✅☑️✔️🟢🟡🟠🔴⚪⚫]\s*)+",
+ re.IGNORECASE,
+ )
+ previous = None
+ while previous != cleaned:
+ previous = cleaned
+ cleaned = prefix_re.sub("", cleaned)
+ return cleaned.lstrip()
+
# Reply sending (called from process_chain_end extension)
async def send_telegram_reply(
@@ -484,6 +1488,7 @@ async def send_telegram_reply(
response_text: str,
attachments: list[str] | None = None,
keyboard: list[list[dict]] | None = None,
+ voice: bool = False,
) -> str | None:
"""Send reply to Telegram user. Returns error string or None on success."""
bot_name = context.data.get(CTX_TG_BOT)
@@ -499,6 +1504,7 @@ async def send_telegram_reply(
return "No chat_id on context"
reply_to = context.data.get(CTX_TG_REPLY_TO)
+ message_thread_id = context.data.get(CTX_TG_MESSAGE_THREAD_ID)
try:
async with _temp_bot(instance.bot.token, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) as reply_bot:
@@ -506,16 +1512,23 @@ async def send_telegram_reply(
for path in attachments:
local_path = files.fix_dev_path(path)
if tc.is_image_file(local_path):
- await tc.send_photo(reply_bot, chat_id, local_path, reply_to_message_id=reply_to)
+ await tc.send_photo(reply_bot, chat_id, local_path, reply_to_message_id=reply_to, message_thread_id=message_thread_id)
else:
- await tc.send_file(reply_bot, chat_id, local_path, reply_to_message_id=reply_to)
+ await tc.send_file(reply_bot, chat_id, local_path, reply_to_message_id=reply_to, message_thread_id=message_thread_id)
if response_text:
+ response_text = _sanitize_telegram_outbound_text(response_text)
+ if voice:
+ voice_path = await _generate_telegram_voice(response_text)
+ if voice_path:
+ await tc.send_voice(reply_bot, chat_id, voice_path, reply_to_message_id=reply_to, message_thread_id=message_thread_id)
+ else:
+ PrintStyle.error("Telegram voice requested but TTS generation returned no file; falling back to text")
html_text = tc.md_to_telegram_html(response_text)
if keyboard:
- await tc.send_text_with_keyboard(reply_bot, chat_id, html_text, keyboard, reply_to_message_id=reply_to)
+ await tc.send_text_with_keyboard(reply_bot, chat_id, html_text, keyboard, reply_to_message_id=reply_to, message_thread_id=message_thread_id)
else:
- await tc.send_text(reply_bot, chat_id, html_text, reply_to_message_id=reply_to)
+ await tc.send_text(reply_bot, chat_id, html_text, reply_to_message_id=reply_to, message_thread_id=message_thread_id)
return None
@@ -524,6 +1537,37 @@ async def send_telegram_reply(
PrintStyle.error(f"Telegram reply failed: {error}")
return error
+
+async def _generate_telegram_voice(text: str) -> str | None:
+ """Generate an OGG/Opus voice note for Telegram using Agent Zero Kokoro TTS."""
+ clean = _sanitize_telegram_outbound_text(text or "").strip()
+ if not clean:
+ return None
+ # Keep voice notes concise and avoid speaking huge technical outputs.
+ clean = re.sub(r"```.*?```", "", clean, flags=re.S).strip()
+ clean = re.sub(r"[`*_~#]", "", clean)
+ clean = clean[:1800]
+ try:
+ from helpers.kokoro_tts import synthesize_sentences
+ sentences = [part.strip() for part in re.split(r"(?<=[.!?])\s+|\n+", clean) if part.strip()] or [clean]
+ audio_b64 = await synthesize_sentences(sentences[:12])
+ if not audio_b64:
+ return None
+ with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as wav_file:
+ wav_file.write(base64.b64decode(audio_b64))
+ wav_path = wav_file.name
+ ogg_path = wav_path.rsplit(".", 1)[0] + ".ogg"
+ subprocess.run(
+ ["ffmpeg", "-y", "-loglevel", "error", "-i", wav_path, "-c:a", "libopus", "-b:a", "32k", "-application", "voip", ogg_path],
+ check=True,
+ )
+ with suppress(Exception):
+ os.unlink(wav_path)
+ return ogg_path
+ except Exception as e:
+ PrintStyle.error(f"Telegram TTS generation failed: {format_error(e)}")
+ return None
+
# Helpers
@asynccontextmanager
@@ -537,13 +1581,31 @@ async def _temp_bot(token: str, **kwargs):
await bot.session.close()
-async def _send_with_temp_bot(token: str, chat_id: int, text: str, parse_mode: str | None = None):
+async def _send_with_temp_bot(
+ token: str,
+ chat_id: int,
+ text: str,
+ parse_mode: str | None = None,
+ message_thread_id: int | None = None,
+ keyboard: list[list[dict]] | None = None,
+):
"""Send text using a temporary Bot to avoid cross-event-loop session issues."""
+ text = _sanitize_telegram_outbound_text(text)
async with _temp_bot(token) as bot:
- await tc.send_text(bot, chat_id, text, parse_mode=parse_mode)
+ if keyboard:
+ await tc.send_text_with_keyboard(
+ bot,
+ chat_id,
+ text,
+ keyboard,
+ parse_mode=parse_mode,
+ message_thread_id=message_thread_id,
+ )
+ else:
+ await tc.send_text(bot, chat_id, text, parse_mode=parse_mode, message_thread_id=message_thread_id)
-def _start_typing(token: str, chat_id: int) -> threading.Event:
+def _start_typing(token: str, chat_id: int, message_thread_id: int | None = None) -> threading.Event:
"""Spawn a daemon thread that sends typing every 4s. Returns a stop Event."""
stop = threading.Event()
@@ -553,7 +1615,7 @@ def _run():
async def _loop():
async with _temp_bot(token) as bot:
while not stop.is_set():
- await tc.send_typing(bot, chat_id)
+ await tc.send_typing(bot, chat_id, message_thread_id=message_thread_id)
for _ in range(8):
if stop.is_set():
return