diff --git a/src/ess/livedata/core/job.py b/src/ess/livedata/core/job.py index 1d0a6e161..f113a347a 100644 --- a/src/ess/livedata/core/job.py +++ b/src/ess/livedata/core/job.py @@ -122,6 +122,10 @@ class ServiceStatus: active_job_count: int messages_processed: int error: str | None = None + is_shedding: bool = False + shedding_level: int = 0 + messages_dropped: int = 0 + messages_eligible: int = 0 def _add_time_coords( diff --git a/src/ess/livedata/core/load_shedder.py b/src/ess/livedata/core/load_shedder.py new file mode 100644 index 000000000..77828fe74 --- /dev/null +++ b/src/ess/livedata/core/load_shedder.py @@ -0,0 +1,237 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +"""Load shedding for backend message processing. + +When the backend can't keep up with the Kafka message stream, the LoadShedder +selectively drops bulk event data while preserving control messages and f144 logs. + +Overload detection relies on the ``SimpleMessageBatcher`` producing consecutive +non-empty batches. The batcher uses 1-second time windows aligned to message +timestamps: ``batch()`` returns None while all incoming messages fall within the +current window, and returns a non-None batch only when a message crosses the window +boundary. Under normal load, the total processing cycle (fetch → preprocess → +workflow → publish) completes well within one batch window, so the messages fetched +in the next cycle still fall within the same window — ``batch()`` returns None. +Consecutive non-None results mean the processing cycle consistently takes longer +than the batch window, so messages accumulate past the next boundary on every call. + +The effective capacity threshold is slightly below 100% because of the 100 ms idle +sleep in ``OrchestratingProcessor.process()``. When the batcher returns None (no +boundary crossed yet), the processor sleeps 100 ms before the next poll. 
class _RollingCounter:
    """Pair of (dropped, eligible) counters over a fixed rolling time window.

    The window is split into ``n_buckets`` buckets of ``bucket_duration_s``
    seconds each, stored in two parallel lists that are indexed modulo
    ``n_buckets``. Buckets that have fallen out of the window are zeroed
    lazily, whenever :meth:`record` or :meth:`totals` advances the counter to
    the current clock reading.

    Parameters
    ----------
    n_buckets:
        Number of buckets the window is divided into.
    bucket_duration_s:
        Time span covered by a single bucket, in seconds.
    clock:
        Callable returning the current time in seconds. Defaults to
        ``time.monotonic``; a deterministic callable can be injected instead.
    """

    def __init__(
        self,
        n_buckets: int = _N_BUCKETS,
        bucket_duration_s: float = _BUCKET_DURATION_S,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._n_buckets = n_buckets
        self._bucket_duration_s = bucket_duration_s
        self._clock = clock
        self._dropped = [0] * n_buckets
        self._eligible = [0] * n_buckets
        self._current_bucket: int = 0
        # Start time of the current bucket; moved forward in whole-bucket steps.
        self._last_time: float = clock()

    def _advance(self) -> None:
        """Advance to the current time, zeroing any expired buckets.

        The number of whole buckets elapsed since ``_last_time`` determines
        how far the current-bucket index and ``_last_time`` move. Only the
        *clearing loop* is capped at ``n_buckets``: once a full window has
        elapsed every bucket is stale, so zeroing each one once is enough.

        The index and timestamp must move by the *uncapped* step count.
        Advancing ``_last_time`` by the capped count would leave it behind the
        clock after a long idle period, so the very next call would see a full
        window of "elapsed" time again and wipe counts that were just recorded.
        """
        now = self._clock()
        elapsed = now - self._last_time
        steps = int(elapsed / self._bucket_duration_s)
        if steps <= 0:
            return
        # Zero each expired bucket exactly once, even after a very long gap.
        for offset in range(1, min(steps, self._n_buckets) + 1):
            bucket = (self._current_bucket + offset) % self._n_buckets
            self._dropped[bucket] = 0
            self._eligible[bucket] = 0
        # Uncapped: keeps the bucket index and timestamp aligned with the clock.
        self._current_bucket = (self._current_bucket + steps) % self._n_buckets
        self._last_time += steps * self._bucket_duration_s

    def record(self, *, dropped: int, eligible: int) -> None:
        """Add ``dropped`` and ``eligible`` counts to the current bucket."""
        self._advance()
        self._dropped[self._current_bucket] += dropped
        self._eligible[self._current_bucket] += eligible

    def totals(self) -> tuple[int, int]:
        """Return ``(dropped, eligible)`` summed over the rolling window."""
        self._advance()
        return sum(self._dropped), sum(self._eligible)
class LoadShedder:
    """Selectively drops bulk event data when the backend falls behind.

    Overload detection counts consecutive non-empty batches produced by the
    message batcher. A non-empty batch means messages crossed a time-window
    boundary, which happens approximately once per batch window under normal
    load. Consecutive non-empty batches mean the processor is not keeping up:
    by the time one batch is processed, enough new messages have arrived to
    immediately complete the next window.

    Empty batches (non-None result with zero messages) are explicitly excluded.
    The ``SimpleMessageBatcher`` emits these to step through timestamp gaps
    (e.g., after a pause between measurements) and they do not indicate load.

    Shedding uses exponential levels: level N keeps every ``2**N``-th droppable
    message *of each stream* (level 1 = 50% drop, level 2 = 75%,
    level 3 = 87.5%). The stride is tracked per ``(stream.kind, stream.name)``
    rather than globally: a single global stride can alias with the order in
    which sources are interleaved (e.g. A, B, A, B, ... at level 1) and drop
    one source completely while passing the other untouched.

    The level escalates by 1 after ``_ACTIVATION_THRESHOLD`` consecutive
    non-empty batches, up to ``_MAX_LEVEL``, and de-escalates by 1 after
    ``_DEACTIVATION_THRESHOLD`` consecutive idle cycles.

    Drop statistics are tracked over a rolling window (see ``_RollingCounter``).

    Parameters
    ----------
    clock:
        Callable returning the current time in seconds, forwarded to the
        rolling counter. Defaults to ``time.monotonic``.
    """

    def __init__(self, *, clock: Callable[[], float] = time.monotonic) -> None:
        self._consecutive_batches: int = 0
        self._consecutive_idle: int = 0
        self._level: int = 0
        # Stride position per droppable stream, keyed by (kind, name).
        self._subsample_counters: dict[tuple[StreamKind, str], int] = {}
        self._rolling = _RollingCounter(clock=clock)

    @property
    def state(self) -> LoadShedderState:
        """Return a snapshot of the shedding level and rolling drop counts."""
        dropped, eligible = self._rolling.totals()
        return LoadShedderState(
            is_shedding=self._level > 0,
            shedding_level=self._level,
            messages_dropped=dropped,
            messages_eligible=eligible,
        )

    def report_batch_result(self, batch_message_count: int) -> None:
        """Update overload detection counters after a batcher cycle.

        Only batches with at least one message count toward the activation
        threshold. Empty batches (zero messages) are treated as idle because
        they arise from the batcher stepping through timestamp gaps, not from
        genuine overload.

        Parameters
        ----------
        batch_message_count:
            Number of messages in the batch returned by the batcher, or 0 if
            the batcher returned None (no batch) or an empty batch.
        """
        if batch_message_count > 0:
            self._consecutive_batches += 1
            self._consecutive_idle = 0
            if (
                self._consecutive_batches >= _ACTIVATION_THRESHOLD
                and self._level < _MAX_LEVEL
            ):
                self._level += 1
                # Each further level needs its own full streak of busy cycles.
                self._consecutive_batches = 0
                logger.warning(
                    'shedding_escalated',
                    level=self._level,
                    keeping=f"1/{2**self._level}",
                )
            return

        self._consecutive_idle += 1
        self._consecutive_batches = 0
        if self._level > 0 and self._consecutive_idle >= _DEACTIVATION_THRESHOLD:
            self._level -= 1
            self._consecutive_idle = 0
            if self._level == 0:
                # Start from a clean stride the next time shedding activates.
                self._subsample_counters.clear()
                logger.warning('shedding_stopped')
            else:
                logger.warning(
                    'shedding_deescalated',
                    level=self._level,
                    keeping=f"1/{2**self._level}",
                )

    def shed(self, messages: list[Message]) -> list[Message]:
        """Filter messages when shedding is active.

        When inactive (level 0), the input list is returned unchanged.
        When active, every ``2**level``-th droppable message of each stream is
        kept and the rest are dropped; relative message order is preserved.
        Messages whose stream kind is not in ``DROPPABLE_KINDS`` are always
        kept.

        Both active and inactive calls record eligible message counts into the
        rolling window so the drop rate reflects what fraction is being shed.

        Parameters
        ----------
        messages:
            Messages fetched in the current processing cycle.

        Returns
        -------
        :
            The messages to pass on to the batcher.
        """
        eligible = sum(1 for m in messages if m.stream.kind in DROPPABLE_KINDS)
        if self._level == 0:
            self._rolling.record(dropped=0, eligible=eligible)
            return messages

        keep_every = 2**self._level
        counters = self._subsample_counters
        kept: list[Message] = []
        for msg in messages:
            stream = msg.stream
            if stream.kind in DROPPABLE_KINDS:
                key = (stream.kind, stream.name)
                # Position within this stream's stride; 0 marks the kept slot.
                position = (counters.get(key, 0) + 1) % keep_every
                counters[key] = position
                if position != 0:
                    continue
            kept.append(msg)
        # Only droppable messages are ever skipped, so the difference is the
        # number of dropped messages.
        self._rolling.record(dropped=len(messages) - len(kept), eligible=eligible)
        return kept
batches that carry data. + count = len(message_batch.messages) if message_batch is not None else 0 + self._load_shedder.report_batch_result(count) if message_batch is None: self._empty_batches += 1 self._maybe_log_metrics() @@ -222,6 +232,7 @@ def _report_status(self) -> None: def _get_service_status(self, job_statuses: list[JobStatus]) -> ServiceStatus: """Get the current service status for heartbeat publishing.""" + shedder_state = self._load_shedder.state if self._load_shedder else None return ServiceStatus( instrument=self._instrument, namespace=self._namespace, @@ -231,6 +242,10 @@ def _get_service_status(self, job_statuses: list[JobStatus]) -> ServiceStatus: active_job_count=len(job_statuses), messages_processed=self._messages_processed, error=self._service_error, + is_shedding=shedder_state.is_shedding if shedder_state else False, + shedding_level=shedder_state.shedding_level if shedder_state else 0, + messages_dropped=shedder_state.messages_dropped if shedder_state else 0, + messages_eligible=shedder_state.messages_eligible if shedder_state else 0, ) def _maybe_log_metrics(self) -> None: @@ -242,6 +257,7 @@ def _maybe_log_metrics(self) -> None: if timestamp - self._last_metrics_time >= self._metrics_interval: active_jobs = len(self._job_manager.active_jobs) + shedder_state = self._load_shedder.state if self._load_shedder else None logger.info( 'processor_metrics', messages=self._messages_processed, @@ -249,6 +265,14 @@ def _maybe_log_metrics(self) -> None: empty_batches=self._empty_batches, active_jobs=active_jobs, errors=self._errors_since_last_metrics, + shedding=shedder_state.is_shedding if shedder_state else False, + shedding_level=(shedder_state.shedding_level if shedder_state else 0), + messages_dropped=( + shedder_state.messages_dropped if shedder_state else 0 + ), + messages_eligible=( + shedder_state.messages_eligible if shedder_state else 0 + ), interval_seconds=(timestamp - self._last_metrics_time) / 1e9, ) # Reset counters (except 
messages_processed which is cumulative for service) diff --git a/src/ess/livedata/dashboard/widgets/backend_status_widget.py b/src/ess/livedata/dashboard/widgets/backend_status_widget.py index bf92eb24f..f099ade77 100644 --- a/src/ess/livedata/dashboard/widgets/backend_status_widget.py +++ b/src/ess/livedata/dashboard/widgets/backend_status_widget.py @@ -26,6 +26,7 @@ class WorkerUIConstants: } DEFAULT_COLOR = "#6c757d" STALE_COLOR = "#dc3545" # Red for unexpectedly disappeared workers + SHEDDING_COLOR = "#ff8c00" # Amber for load shedding # Sizes NAMESPACE_WIDTH = 200 @@ -73,6 +74,14 @@ def _format_messages(count: int) -> str: return f"{count / 1_000_000:.1f}M" +def _format_drop_rate(dropped: int, eligible: int) -> str: + """Format drop rate as percentage over the rolling window.""" + if eligible == 0: + return "Dropping: <1%" + pct = 100 * dropped / eligible + return f"Dropping: {pct:.0f}%" + + class WorkerStatusRow: """Widget to display the status of a single backend worker. @@ -126,12 +135,14 @@ def __init__( self.update(status, is_stale, last_seen_seconds_ago) def _get_status_color(self, status: ServiceStatus, is_stale: bool) -> str: - """Get color for worker state, considering staleness.""" + """Get color for worker state, considering staleness and load shedding.""" if is_stale: # Graceful shutdown (inferred from timed-out stopping): show gray if status.state == ServiceState.stopping: return WorkerUIConstants.COLORS[ServiceState.stopped] return WorkerUIConstants.STALE_COLOR + if status.is_shedding: + return WorkerUIConstants.SHEDDING_COLOR return WorkerUIConstants.COLORS.get( status.state, WorkerUIConstants.DEFAULT_COLOR ) @@ -165,6 +176,8 @@ def update( # Distinguish graceful shutdown from unexpected disappearance is_graceful = status.state == ServiceState.stopping status_text = "STOPPED" if is_graceful else "STALE" + elif status.is_shedding: + status_text = "SHEDDING" else: status_text = status.state.value.upper() status_style = 
self._create_status_style(status_color) @@ -187,7 +200,16 @@ def update( # Stats jobs_text = f"Jobs: {status.active_job_count}" msgs_text = f"Msgs: {_format_messages(status.messages_processed)}" - self._stats_pane.object = f"{jobs_text} | {msgs_text}" + stats_parts = [jobs_text, msgs_text] + if status.messages_dropped > 0: + drop_text = _format_drop_rate( + status.messages_dropped, status.messages_eligible + ) + stats_parts.append( + f'' + f"{drop_text}" + ) + self._stats_pane.object = f"{' | '.join(stats_parts)}" def _calculate_uptime(self, started_at_ns: int) -> float: """Calculate uptime in seconds from started_at timestamp.""" @@ -278,6 +300,7 @@ def _format_summary(self) -> str: stopped_count = 0 stale_count = 0 error_count = 0 + shedding_count = 0 for worker_key, status in self._service_registry.worker_statuses.items(): is_stale = self._service_registry.is_status_stale(worker_key) @@ -288,6 +311,8 @@ def _format_summary(self) -> str: stopped_count += 1 else: stale_count += 1 + elif status.is_shedding: + shedding_count += 1 elif status.state == ServiceState.starting: starting_count += 1 elif status.state == ServiceState.running: @@ -310,6 +335,10 @@ def _span(color: str, count: int, label: str) -> str: ) if running_count: parts.append(_span(colors[ServiceState.running], running_count, "running")) + if shedding_count: + parts.append( + _span(WorkerUIConstants.SHEDDING_COLOR, shedding_count, "shedding") + ) if stopping_count: parts.append( _span(colors[ServiceState.stopping], stopping_count, "stopping") diff --git a/src/ess/livedata/kafka/x5f2_compat.py b/src/ess/livedata/kafka/x5f2_compat.py index fc2bcd36d..fa459e082 100644 --- a/src/ess/livedata/kafka/x5f2_compat.py +++ b/src/ess/livedata/kafka/x5f2_compat.py @@ -197,6 +197,18 @@ class ServiceStatusPayload(pydantic.BaseModel): description="Total messages processed since startup" ) error: str | None = pydantic.Field(default=None, description="Error message if any") + is_shedding: bool = pydantic.Field( + 
default=False, description="Whether load shedding is active" + ) + shedding_level: int = pydantic.Field( + default=0, description="Current shedding level (0=off, N=keep 1/2^N)" + ) + messages_dropped: int = pydantic.Field( + default=0, description="Messages dropped in the rolling window" + ) + messages_eligible: int = pydantic.Field( + default=0, description="Droppable messages seen in the rolling window" + ) class ServiceStatusJSON(pydantic.BaseModel): @@ -282,6 +294,10 @@ def from_service_status( active_job_count=status.active_job_count, messages_processed=status.messages_processed, error=status.error, + is_shedding=status.is_shedding, + shedding_level=status.shedding_level, + messages_dropped=status.messages_dropped, + messages_eligible=status.messages_eligible, ), ), ) @@ -298,6 +314,10 @@ def to_service_status(self) -> ServiceStatus: active_job_count=message.active_job_count, messages_processed=message.messages_processed, error=message.error, + is_shedding=message.is_shedding, + shedding_level=message.shedding_level, + messages_dropped=message.messages_dropped, + messages_eligible=message.messages_eligible, ) diff --git a/tests/core/load_shedder_test.py b/tests/core/load_shedder_test.py new file mode 100644 index 000000000..118fb518c --- /dev/null +++ b/tests/core/load_shedder_test.py @@ -0,0 +1,394 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) + +import pytest + +from ess.livedata.core.load_shedder import ( + _ACTIVATION_THRESHOLD, + _BUCKET_DURATION_S, + _DEACTIVATION_THRESHOLD, + _MAX_LEVEL, + _N_BUCKETS, + DROPPABLE_KINDS, + LoadShedder, +) +from ess.livedata.core.message import Message, StreamId, StreamKind + + +def _make_message(kind: StreamKind, name: str = "src") -> Message: + return Message(timestamp=0, stream=StreamId(kind=kind, name=name), value=b"") + + +class FakeClock: + """Deterministic clock for testing the rolling window.""" + + def __init__(self, start: float = 0.0) -> None: + 
self._time = start + + def __call__(self) -> float: + return self._time + + def advance(self, seconds: float) -> None: + self._time += seconds + + +def _make_shedder(clock: FakeClock | None = None) -> LoadShedder: + if clock is None: + clock = FakeClock() + return LoadShedder(clock=clock) + + +def _activate(shedder: LoadShedder) -> None: + for _ in range(_ACTIVATION_THRESHOLD): + shedder.report_batch_result(batch_message_count=10) + assert shedder.state.is_shedding is True + + +class TestLoadShedderInitialState: + def test_not_shedding_initially(self): + shedder = _make_shedder() + assert shedder.state.is_shedding is False + + def test_zero_dropped_initially(self): + shedder = _make_shedder() + assert shedder.state.messages_dropped == 0 + + def test_zero_eligible_initially(self): + shedder = _make_shedder() + assert shedder.state.messages_eligible == 0 + + +class TestLoadShedderActivation: + def test_activates_after_consecutive_batches(self): + shedder = _make_shedder() + _activate(shedder) + + def test_does_not_activate_below_threshold(self): + shedder = _make_shedder() + for _ in range(_ACTIVATION_THRESHOLD - 1): + shedder.report_batch_result(batch_message_count=10) + assert shedder.state.is_shedding is False + + def test_idle_cycle_resets_consecutive_count(self): + shedder = _make_shedder() + for _ in range(_ACTIVATION_THRESHOLD - 1): + shedder.report_batch_result(batch_message_count=10) + shedder.report_batch_result(batch_message_count=0) + # Restart counting — should not activate after fewer than threshold + for _ in range(_ACTIVATION_THRESHOLD - 1): + shedder.report_batch_result(batch_message_count=10) + assert shedder.state.is_shedding is False + + +class TestLoadShedderDeactivation: + @pytest.fixture + def active_shedder(self): + shedder = _make_shedder() + _activate(shedder) + return shedder + + def test_deactivates_after_consecutive_idle(self, active_shedder): + for _ in range(_DEACTIVATION_THRESHOLD): + 
active_shedder.report_batch_result(batch_message_count=0) + assert active_shedder.state.is_shedding is False + + def test_does_not_deactivate_below_threshold(self, active_shedder): + for _ in range(_DEACTIVATION_THRESHOLD - 1): + active_shedder.report_batch_result(batch_message_count=0) + assert active_shedder.state.is_shedding is True + + def test_batch_resets_idle_count(self, active_shedder): + for _ in range(_DEACTIVATION_THRESHOLD - 1): + active_shedder.report_batch_result(batch_message_count=0) + active_shedder.report_batch_result(batch_message_count=10) + # Restart idle counting + for _ in range(_DEACTIVATION_THRESHOLD - 1): + active_shedder.report_batch_result(batch_message_count=0) + assert active_shedder.state.is_shedding is True + + +class TestLoadShedderShed: + def test_passes_everything_when_inactive(self): + shedder = _make_shedder() + messages = [ + _make_message(StreamKind.DETECTOR_EVENTS), + _make_message(StreamKind.LOG), + _make_message(StreamKind.MONITOR_EVENTS), + ] + result = shedder.shed(messages) + assert result == messages + + def test_preserves_non_droppable_when_active(self): + shedder = _make_shedder() + _activate(shedder) + + non_droppable_kinds = [ + StreamKind.LOG, + StreamKind.LIVEDATA_COMMANDS, + StreamKind.LIVEDATA_RESPONSES, + StreamKind.LIVEDATA_DATA, + StreamKind.LIVEDATA_ROI, + StreamKind.LIVEDATA_STATUS, + StreamKind.UNKNOWN, + ] + messages = [_make_message(kind) for kind in non_droppable_kinds] + result = shedder.shed(messages) + assert result == messages + + def test_drops_roughly_half_of_droppable_when_active(self): + shedder = _make_shedder() + _activate(shedder) + + messages = [_make_message(StreamKind.DETECTOR_EVENTS) for _ in range(100)] + result = shedder.shed(messages) + assert len(result) == 50 + + def test_all_droppable_kinds_are_shed(self): + shedder = _make_shedder() + _activate(shedder) + + for kind in DROPPABLE_KINDS: + messages = [_make_message(kind) for _ in range(10)] + before = shedder.state.messages_dropped + 
result = shedder.shed(messages) + assert len(result) < len(messages), f"{kind} was not shed" + assert shedder.state.messages_dropped > before + + def test_mixed_messages_preserves_non_droppable(self): + shedder = _make_shedder() + _activate(shedder) + + log_msg = _make_message(StreamKind.LOG) + cmd_msg = _make_message(StreamKind.LIVEDATA_COMMANDS) + det_msgs = [_make_message(StreamKind.DETECTOR_EVENTS) for _ in range(10)] + messages = [log_msg, *det_msgs, cmd_msg] + + result = shedder.shed(messages) + assert log_msg in result + assert cmd_msg in result + + +class TestRollingWindow: + def test_dropped_count_within_window(self): + clock = FakeClock() + shedder = _make_shedder(clock) + _activate(shedder) + + messages = [_make_message(StreamKind.DETECTOR_EVENTS) for _ in range(100)] + shedder.shed(messages) + assert shedder.state.messages_dropped == 50 + assert shedder.state.messages_eligible == 100 + + def test_counts_accumulate_across_calls_in_same_bucket(self): + clock = FakeClock() + shedder = _make_shedder(clock) + _activate(shedder) + + batch = [_make_message(StreamKind.DETECTOR_EVENTS) for _ in range(10)] + shedder.shed(batch) + shedder.shed(batch) + assert shedder.state.messages_dropped == 10 + assert shedder.state.messages_eligible == 20 + + def test_counts_decay_after_window_expires(self): + clock = FakeClock() + shedder = _make_shedder(clock) + _activate(shedder) + + messages = [_make_message(StreamKind.DETECTOR_EVENTS) for _ in range(100)] + shedder.shed(messages) + assert shedder.state.messages_dropped == 50 + + # Advance past the full window + clock.advance(_N_BUCKETS * _BUCKET_DURATION_S + 1) + assert shedder.state.messages_dropped == 0 + assert shedder.state.messages_eligible == 0 + + def test_partial_window_decay(self): + clock = FakeClock() + shedder = _make_shedder(clock) + _activate(shedder) + + # Record in bucket 0 + batch = [_make_message(StreamKind.DETECTOR_EVENTS) for _ in range(10)] + shedder.shed(batch) + dropped_first = 
shedder.state.messages_dropped + + # Advance to a new bucket and record more + clock.advance(_BUCKET_DURATION_S) + shedder.shed(batch) + assert shedder.state.messages_dropped > dropped_first + + # Advance so the first bucket expires but not the second + clock.advance((_N_BUCKETS - 1) * _BUCKET_DURATION_S) + state = shedder.state + # Only the second bucket's data should remain + assert state.messages_dropped == 5 + assert state.messages_eligible == 10 + + def test_eligible_tracked_when_not_shedding(self): + """Even when not shedding, eligible messages are counted.""" + clock = FakeClock() + shedder = _make_shedder(clock) + # Not activated — no shedding + messages = [_make_message(StreamKind.DETECTOR_EVENTS) for _ in range(20)] + shedder.shed(messages) + state = shedder.state + assert state.messages_dropped == 0 + assert state.messages_eligible == 20 + + +class TestLoadShedderState: + def test_state_reflects_shedding(self): + shedder = _make_shedder() + assert shedder.state.is_shedding is False + _activate(shedder) + assert shedder.state.is_shedding is True + + def test_state_reports_shedding_level(self): + shedder = _make_shedder() + assert shedder.state.shedding_level == 0 + _activate(shedder) + assert shedder.state.shedding_level == 1 + + def test_state_is_snapshot(self): + shedder = _make_shedder() + state = shedder.state + _activate(shedder) + # Original snapshot unchanged (frozen dataclass) + assert state.is_shedding is False + assert shedder.state.is_shedding is True + + +def _escalate_to(shedder: LoadShedder, level: int) -> None: + """Escalate the shedder to the given level.""" + for _ in range(level): + for _ in range(_ACTIVATION_THRESHOLD): + shedder.report_batch_result(batch_message_count=10) + assert shedder.state.shedding_level == level + + +def _deescalate_by(shedder: LoadShedder, steps: int) -> None: + """De-escalate the shedder by the given number of steps.""" + for _ in range(steps): + for _ in range(_DEACTIVATION_THRESHOLD): + 
shedder.report_batch_result(batch_message_count=0) + + +class TestMultiLevelEscalation: + def test_escalates_to_level_2(self): + shedder = _make_shedder() + _escalate_to(shedder, 2) + assert shedder.state.shedding_level == 2 + + def test_escalates_to_max_level(self): + shedder = _make_shedder() + _escalate_to(shedder, _MAX_LEVEL) + assert shedder.state.shedding_level == _MAX_LEVEL + + def test_does_not_escalate_beyond_max_level(self): + shedder = _make_shedder() + _escalate_to(shedder, _MAX_LEVEL) + for _ in range(_ACTIVATION_THRESHOLD): + shedder.report_batch_result(batch_message_count=10) + assert shedder.state.shedding_level == _MAX_LEVEL + + def test_escalation_requires_threshold_per_level(self): + shedder = _make_shedder() + _escalate_to(shedder, 1) + # Not enough batches for next level + for _ in range(_ACTIVATION_THRESHOLD - 1): + shedder.report_batch_result(batch_message_count=10) + assert shedder.state.shedding_level == 1 + + +class TestMultiLevelDeescalation: + def test_deescalates_one_level_at_a_time(self): + shedder = _make_shedder() + _escalate_to(shedder, 3) + _deescalate_by(shedder, 1) + assert shedder.state.shedding_level == 2 + + def test_deescalates_to_zero(self): + shedder = _make_shedder() + _escalate_to(shedder, 2) + _deescalate_by(shedder, 2) + assert shedder.state.shedding_level == 0 + assert shedder.state.is_shedding is False + + def test_deescalation_requires_threshold_per_level(self): + shedder = _make_shedder() + _escalate_to(shedder, 2) + for _ in range(_DEACTIVATION_THRESHOLD - 1): + shedder.report_batch_result(batch_message_count=0) + assert shedder.state.shedding_level == 2 + + +class TestMultiLevelDropRates: + @pytest.mark.parametrize( + ("level", "expected_kept"), + [ + (1, 128), # keep 1/2 of 256 + (2, 64), # keep 1/4 of 256 + (3, 32), # keep 1/8 of 256 + ], + ) + def test_drop_rate_at_level(self, level, expected_kept): + shedder = _make_shedder() + _escalate_to(shedder, level) + messages = 
[_make_message(StreamKind.DETECTOR_EVENTS) for _ in range(256)] + result = shedder.shed(messages) + assert len(result) == expected_kept + + +class TestEmptyBatchesIgnored: + """Empty batches from batcher timestamp catch-up must not trigger shedding. + + The SimpleMessageBatcher emits empty batches (non-None with 0 messages) when + message timestamps jump forward (e.g., after a pause between measurement runs). + These are reported as batch_message_count=0 and must be treated as idle cycles. + """ + + def test_consecutive_empty_batches_do_not_activate(self): + shedder = _make_shedder() + for _ in range(_ACTIVATION_THRESHOLD + 5): + shedder.report_batch_result(batch_message_count=0) + assert shedder.state.is_shedding is False + + def test_empty_batches_reset_consecutive_count(self): + """An empty batch between data batches resets the overload counter.""" + shedder = _make_shedder() + for _ in range(_ACTIVATION_THRESHOLD - 1): + shedder.report_batch_result(batch_message_count=10) + # Empty batch interrupts the streak + shedder.report_batch_result(batch_message_count=0) + for _ in range(_ACTIVATION_THRESHOLD - 1): + shedder.report_batch_result(batch_message_count=10) + assert shedder.state.is_shedding is False + + def test_empty_batches_count_toward_deactivation(self): + """Empty batches count as idle and contribute to de-escalation.""" + shedder = _make_shedder() + _activate(shedder) + for _ in range(_DEACTIVATION_THRESHOLD): + shedder.report_batch_result(batch_message_count=0) + assert shedder.state.is_shedding is False + + def test_timestamp_gap_scenario(self): + """Simulate the batcher catch-up after a 5+ second timestamp gap. + + The batcher emits one empty batch per window to step through the gap. + None of these should trigger shedding. 
+ """ + shedder = _make_shedder() + # Normal operation: occasional data batches with idle cycles between + shedder.report_batch_result(batch_message_count=50) + for _ in range(5): + shedder.report_batch_result(batch_message_count=0) + # Gap: 7 consecutive empty batches (batcher catching up through gap) + for _ in range(7): + shedder.report_batch_result(batch_message_count=0) + assert shedder.state.is_shedding is False + # Normal operation resumes with one data batch + shedder.report_batch_result(batch_message_count=50) + assert shedder.state.is_shedding is False diff --git a/tests/helpers/livedata_app.py b/tests/helpers/livedata_app.py index 291f06c9a..d1ad9e396 100644 --- a/tests/helpers/livedata_app.py +++ b/tests/helpers/livedata_app.py @@ -82,7 +82,9 @@ def from_service_builder( consumer = FakeConsumer() if use_naive_message_batcher: builder._processor_cls = partial( - OrchestratingProcessor, message_batcher=NaiveMessageBatcher() + OrchestratingProcessor, + message_batcher=NaiveMessageBatcher(), + enable_load_shedding=False, ) service = builder.from_consumer( consumer=consumer, diff --git a/tests/kafka/status_message_test.py b/tests/kafka/status_message_test.py index 10e88dc2f..f202277ba 100644 --- a/tests/kafka/status_message_test.py +++ b/tests/kafka/status_message_test.py @@ -781,6 +781,33 @@ def test_to_service_status(self): assert converted.active_job_count == original.active_job_count assert converted.messages_processed == original.messages_processed + def test_round_trip_with_shedding_fields(self): + """Test that shedding fields survive model round-trip.""" + original = make_service_status( + is_shedding=True, + shedding_level=2, + messages_dropped=42, + messages_eligible=100, + ) + msg = ServiceStatusMessage.from_service_status(original) + converted = msg.to_service_status() + + assert converted.is_shedding is True + assert converted.shedding_level == 2 + assert converted.messages_dropped == 42 + assert converted.messages_eligible == 100 + + def 
test_round_trip_defaults_shedding_fields(self): + """Test that shedding fields default gracefully.""" + original = make_service_status() + msg = ServiceStatusMessage.from_service_status(original) + converted = msg.to_service_status() + + assert converted.is_shedding is False + assert converted.shedding_level == 0 + assert converted.messages_dropped == 0 + assert converted.messages_eligible == 0 + class TestServiceStatusX5F2Integration: """Test service status x5f2 serialization/deserialization.""" @@ -808,6 +835,22 @@ def test_service_status_x5f2_round_trip(self): assert converted.messages_processed == original.messages_processed assert converted.error == original.error + def test_service_status_x5f2_round_trip_with_shedding(self): + """Test x5f2 round-trip includes load shedding fields.""" + original = make_service_status( + is_shedding=True, + shedding_level=3, + messages_dropped=1234, + messages_eligible=3000, + ) + x5f2_data = service_status_to_x5f2(original) + converted = x5f2_to_service_status(x5f2_data) + + assert converted.is_shedding is True + assert converted.shedding_level == 3 + assert converted.messages_dropped == 1234 + assert converted.messages_eligible == 3000 + def test_service_status_x5f2_with_error(self): """Test x5f2 round-trip with error message.""" original = make_service_status(