Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion livekit-agents/livekit/agents/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class _ToggleMode(Exception):
pass


class _ExitCli(Exception):
class _ExitCli(BaseException):
pass


Expand Down
54 changes: 54 additions & 0 deletions tests/test_drain_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
from __future__ import annotations

import asyncio
import multiprocessing as mp
import socket
from unittest.mock import AsyncMock, patch

import pytest

from livekit.agents.cli.cli import _ExitCli, _run_worker
from livekit.agents.cli.proto import CliArgs
from livekit.agents.ipc.supervised_proc import SupervisedProc
from livekit.agents.utils import aio
from livekit.agents.worker import AgentServer

_CLI_ARGS = CliArgs(log_level="ERROR", url=None, api_key=None, api_secret=None)
Expand All @@ -31,6 +35,29 @@ def _make_server(drain_timeout: int = 1) -> AgentServer:
return server


class _DummySupervisedProc(SupervisedProc):
def _create_process(self, cch: socket.socket, log_cch: socket.socket) -> mp.Process:
raise NotImplementedError

async def _main_task(self, ipc_ch: aio.ChanReceiver[object]) -> None:
raise NotImplementedError


def _make_supervised_proc() -> _DummySupervisedProc:
return _DummySupervisedProc(
initialize_timeout=1.0,
close_timeout=1.0,
memory_warn_mb=0.0,
memory_limit_mb=0.0,
ping_interval=1.0,
ping_timeout=1.0,
high_ping_threshold=1.0,
http_proxy=None,
mp_ctx=mp.get_context("spawn"),
loop=asyncio.get_event_loop(),
)


class TestDrainTimeout:
"""Verify that aclose() is called regardless of drain() outcome."""

Expand Down Expand Up @@ -146,3 +173,30 @@ def test_exitcli_during_drain_forces_exit(self) -> None:

mock_aclose.assert_not_awaited()
mock_exit.assert_called_once_with(1)

def test_memory_monitor_does_not_swallow_exitcli(self) -> None:
"""SIGTERM/SIGINT should not be eaten by broad Exception handlers.

Issue #4664 showed _ExitCli being raised from a signal handler while
_memory_monitor_task() was inside psutil. Because _ExitCli inherited
from Exception, the blanket ``except Exception`` here swallowed the
shutdown signal and left the worker running instead of draining.
"""
proc = _make_supervised_proc()
proc._pid = 123

async def _fake_sleep(_: float) -> None:
proc._closing = True

with (
patch(
"livekit.agents.ipc.supervised_proc.psutil.Process",
side_effect=_ExitCli(),
),
patch("livekit.agents.ipc.supervised_proc.asyncio.sleep", side_effect=_fake_sleep),
patch("livekit.agents.ipc.supervised_proc.logger.exception") as mock_exception,
):
with pytest.raises(_ExitCli):
asyncio.get_event_loop().run_until_complete(proc._memory_monitor_task())

mock_exception.assert_not_called()
Loading