Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ pydocstyle.convention = "numpy"
# those files have an increased risk of relying on import order
"tests/*" = [
"S101", # asserts are fine in tests
"S311", # deterministic seeded RNG is fine for test simulations
"B018", # 'useless expressions' are ok because some tests just check for exceptions
"TID251", # tests can use stdlib logging
]
Expand Down
1 change: 1 addition & 0 deletions src/ess/livedata/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class ServiceStatus:
active_job_count: int
messages_processed: int
error: str | None = None
batch_interval_s: float = 1.0


def _add_time_coords(
Expand Down
180 changes: 180 additions & 0 deletions src/ess/livedata/core/message_batcher.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
import time
from abc import ABC, abstractmethod
from collections.abc import Callable
from dataclasses import dataclass
from numbers import Number
from typing import Any

import structlog

from ess.livedata.core.message import Message

logger = structlog.get_logger(__name__)


@dataclass(slots=True, kw_only=True)
class MessageBatch:
Expand All @@ -23,6 +29,34 @@ def batch(self, messages: list[Message[Any]]) -> MessageBatch | None:
If no batch can be created (batch incomplete), return None.
"""

def report_batch( # noqa: B027
self,
message_count: int | None,
processing_time_s: float = 0.0,
) -> None:
"""Report the outcome of the last processing cycle.

Called by the processor after each cycle. Batchers that support adaptive
behavior override this to adjust their batch length. The default is a
no-op.

Parameters
----------
message_count:
Number of messages in the processed batch. ``None`` if the batcher
returned ``None`` (idle cycle). 0 indicates an empty batch from a
time gap.
processing_time_s:
Wall-clock time spent processing the batch (preprocessing, workflow
execution, serialization). Used by adaptive batchers to detect
overload. Ignored for idle cycles.
"""

@property
def batch_length_s(self) -> float:
    """Effective batch length in seconds.

    The base implementation reports a constant 1.0; batchers with a
    configurable or adaptive length override this property.
    """
    return 1.0


class NaiveMessageBatcher(MessageBatcher):
"""
Expand All @@ -35,9 +69,14 @@ def __init__(
self, batch_length_s: float = 1.0, pulse_length_s: float = 1.0 / 14
) -> None:
# Batch length is currently ignored.
self._batch_length_s = batch_length_s
self._batch_length_ns = int(batch_length_s * 1_000_000_000)
self._pulse_length_ns = int(pulse_length_s * 1_000_000_000)

@property
def batch_length_s(self) -> float:
    """Batch length in seconds as stored by the constructor."""
    return self._batch_length_s

def batch(self, messages: list[Message[Any]]) -> MessageBatch | None:
# Filter messages with incompatible (broken) timestamps to avoid issues below.
messages = [msg for msg in messages if isinstance(msg.timestamp, Number)]
Expand Down Expand Up @@ -81,10 +120,24 @@ class SimpleMessageBatcher(MessageBatcher):
"""

def __init__(self, batch_length_s: float = 1.0) -> None:
    """Create a batcher with no active batch and no deferred messages.

    Parameters
    ----------
    batch_length_s:
        Length of a batch in seconds. It is kept both as given and as a
        whole number of nanoseconds (truncated by ``int``).
    """
    length_ns = int(batch_length_s * 1_000_000_000)
    # Batching state: nothing is in flight until the first call to batch().
    self._future_messages: list[Message[Any]] = []
    self._active_batch: MessageBatch | None = None
    # Configured length, in both units.
    self._batch_length_ns = length_ns
    self._batch_length_s_value = batch_length_s

@property
def batch_length_s(self) -> float:
    """Most recently configured batch length in seconds."""
    return self._batch_length_s_value

def set_batch_length(self, batch_length_s: float) -> None:
    """Change the batch length used for batches that have not started yet.

    An already active batch keeps its boundaries and completes normally; the
    new length only takes effect from the next batch boundary onwards.

    Parameters
    ----------
    batch_length_s:
        New batch length in seconds. It is also stored as a whole number of
        nanoseconds (truncated by ``int``).
    """
    length_ns = int(batch_length_s * 1_000_000_000)
    self._batch_length_s_value, self._batch_length_ns = batch_length_s, length_ns

def batch(self, messages: list[Message[Any]]) -> MessageBatch | None:
# Filter messages with incompatible (broken) timestamps to avoid issues below.
messages = [msg for msg in messages if isinstance(msg.timestamp, Number)]
Expand Down Expand Up @@ -143,3 +196,130 @@ def _split_messages(
before = [msg for msg in messages if msg.timestamp < timestamp]
after = [msg for msg in messages if msg.timestamp >= timestamp]
return before, after


# Tuning constants for AdaptiveMessageBatcher.
#
# Number of consecutive overloaded (non-empty) batches required before the
# batcher escalates.
ESCALATION_OVERLOAD_THRESHOLD = 2
# Half-steps added per escalation; two half-steps double the batch length.
ESCALATION_HALF_STEPS = 2
# A batch counts as underloaded when its processing time is below this
# fraction of the current batch length.
DEESCALATION_HEADROOM_RATIO = 0.75
# Number of consecutive underloaded batches required before de-escalating by
# one half-step.
DEESCALATION_UNDERLOAD_THRESHOLD = 3
# Idle time since the last non-empty batch, measured in current batch lengths,
# after which an idle cycle de-escalates by one half-step.
DEESCALATION_IDLE_WINDOWS = 3

# Ratio between the batch lengths of adjacent half-steps.
_SQRT2 = 2**0.5


@dataclass(frozen=True)
class AdaptiveBatcherState:
    """Immutable snapshot of an AdaptiveMessageBatcher for status reporting.

    Attributes
    ----------
    level:
        Escalation level at snapshot time. ``AdaptiveMessageBatcher.state``
        fills this with its current half-step index.
    batch_length_s:
        Batch length in seconds at snapshot time.
    """

    level: int
    batch_length_s: float


class AdaptiveMessageBatcher(MessageBatcher):
"""A message batcher that dynamically adjusts its batch length based on load.

Wraps a ``SimpleMessageBatcher`` and uses processing-time feedback to detect
overload. When processing consistently exceeds the batch window, the batcher
escalates by doubling the window (+2 half-steps). When processing completes
with headroom, it de-escalates by a factor of 1/sqrt(2) (-1 half-step).

The asymmetric step sizes mean two de-escalation steps undo one escalation,
providing natural damping. The batch window is always on the grid
``base * sqrt(2)^n``, avoiding floating-point drift.

Idle periods also trigger de-escalation via a wall-clock fallback.
"""

def __init__(
    self,
    base_batch_length_s: float = 1.0,
    max_level: int = 3,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Start at the base batch length with all load counters cleared.

    Parameters
    ----------
    base_batch_length_s:
        Batch length in seconds at half-step 0; also the initial length of
        the wrapped ``SimpleMessageBatcher``.
    max_level:
        Upper bound on escalation, counted in doublings. Internally this is
        stored as ``2 * max_level`` half-steps.
    clock:
        Callable returning the current time in seconds, used to measure idle
        periods. Injectable so the time source can be replaced.
    """
    # Collaborators.
    self._clock = clock
    self._inner = SimpleMessageBatcher(batch_length_s=base_batch_length_s)
    # Escalation grid: length = base * sqrt(2) ** half_step.
    self._base_batch_length_s = base_batch_length_s
    self._half_step = 0
    self._max_half_steps = 2 * max_level
    # Load-tracking state fed by report_batch().
    self._consecutive_overloaded = 0
    self._consecutive_underloaded = 0
    self._last_nonempty_batch_time: float | None = None

def batch(self, messages: list[Message[Any]]) -> MessageBatch | None:
    """Delegate batching to the wrapped ``SimpleMessageBatcher``.

    Returns whatever the inner batcher returns for ``messages``.
    """
    inner_result = self._inner.batch(messages)
    return inner_result

def report_batch(
    self,
    message_count: int | None,
    processing_time_s: float = 0.0,
) -> None:
    """Adjust the batch length from the outcome of the last cycle.

    Parameters
    ----------
    message_count:
        ``None`` for an idle cycle, 0 for an empty batch caused by a time
        gap, otherwise the number of messages in the processed batch.
    processing_time_s:
        Wall-clock seconds spent processing the batch. Only evaluated for
        non-empty batches.
    """
    if message_count is None:
        # Idle cycle: no load signal, so the consecutive counters are left
        # untouched. Resetting them here would prevent de-escalation under
        # continuous light load, where idle polls between batches outnumber
        # real reports. Genuine idleness is handled by the wall-clock
        # fallback below.
        last_seen = self._last_nonempty_batch_time
        if self._half_step <= 0 or last_seen is None:
            return
        idle_windows = (self._clock() - last_seen) / self.batch_length_s
        if idle_windows >= DEESCALATION_IDLE_WINDOWS:
            self._set_half_step(self._half_step - 1)
            # Restart the idle timer so the next step down needs a fresh
            # run of idle windows.
            self._last_nonempty_batch_time = self._clock()
        return

    if message_count == 0:
        # Empty batch from a time gap: not a load signal.
        return

    # Non-empty batch: classify it by processing time relative to the window.
    self._last_nonempty_batch_time = self._clock()
    window_s = self.batch_length_s

    if processing_time_s > window_s:
        # Overloaded: processing took longer than the batch window.
        self._consecutive_underloaded = 0
        self._consecutive_overloaded += 1
        at_ceiling = self._half_step >= self._max_half_steps
        if at_ceiling or self._consecutive_overloaded < ESCALATION_OVERLOAD_THRESHOLD:
            return
        self._set_half_step(
            min(self._max_half_steps, self._half_step + ESCALATION_HALF_STEPS)
        )
        self._consecutive_overloaded = 0
        return

    if processing_time_s < window_s * DEESCALATION_HEADROOM_RATIO:
        # Underloaded: processing finished with headroom to spare.
        self._consecutive_overloaded = 0
        self._consecutive_underloaded += 1
        at_floor = self._half_step <= 0
        if at_floor or self._consecutive_underloaded < DEESCALATION_UNDERLOAD_THRESHOLD:
            return
        self._set_half_step(self._half_step - 1)
        self._consecutive_underloaded = 0
        return

    # In between: processing fits the window but without much headroom.
    self._consecutive_overloaded = 0
    self._consecutive_underloaded = 0

def _set_half_step(self, new_half_step: int) -> None:
    """Move to ``new_half_step``, log the change and update the inner batcher.

    Parameters
    ----------
    new_half_step:
        Half-step index to switch to. Callers are responsible for keeping it
        within ``0 .. max_half_steps``; no clamping happens here.
    """
    previous_length_s = self.batch_length_s
    self._half_step = new_half_step
    current_length_s = self.batch_length_s
    # Logged at warning level on purpose. Review raised the question whether
    # level changes are part of normal operation; the intent is that the
    # batcher stays at the minimum batch length almost always, and that
    # increases only bridge backlog spikes (e.g. a GC pause) or keep the
    # service running without dropping data until the system has been scaled
    # (e.g. by running more backend workers).
    logger.warning(
        'adaptive_batch_level_change',
        old_batch_length_s=previous_length_s,
        new_batch_length_s=current_length_s,
        level=new_half_step,
    )
    self._inner.set_batch_length(current_length_s)

@property
def batch_length_s(self) -> float:
    """Current batch length in seconds: ``base * sqrt(2) ** half_step``.

    The factor is evaluated as ``2 ** (half_step / 2)`` instead of raising a
    rounded ``sqrt(2)`` to an integer power. That way even half-steps (whole
    doublings) give exact multiples of the base length, e.g. exactly
    ``2 * base`` rather than ``2.0000000000000004 * base``, so reported and
    compared batch lengths stay on clean values.
    """
    return self._base_batch_length_s * 2.0 ** (self._half_step / 2)

@property
def state(self) -> AdaptiveBatcherState:
    """Snapshot of the current half-step index and batch length."""
    snapshot = AdaptiveBatcherState(
        level=self._half_step,
        batch_length_s=self.batch_length_s,
    )
    return snapshot
16 changes: 14 additions & 2 deletions src/ess/livedata/core/orchestrating_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
Tin,
Tout,
)
from .message_batcher import MessageBatch, MessageBatcher, SimpleMessageBatcher
from .message_batcher import (
AdaptiveMessageBatcher,
MessageBatch,
MessageBatcher,
)

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -112,7 +116,7 @@ def __init__(
job_factory=JobFactory(instrument=instrument), job_threads=job_threads
)
self._job_manager_adapter = JobManagerAdapter(job_manager=self._job_manager)
self._message_batcher = message_batcher or SimpleMessageBatcher()
self._message_batcher = message_batcher or AdaptiveMessageBatcher()
self._config_processor = ConfigProcessor(
job_manager_adapter=self._job_manager_adapter
)
Expand Down Expand Up @@ -172,6 +176,7 @@ def process(self) -> None:

message_batch = self._message_batcher.batch(data_messages)
if message_batch is None:
self._message_batcher.report_batch(None, processing_time_s=0.0)
self._empty_batches += 1
self._maybe_log_metrics()
self._sink.publish_messages(result_messages)
Expand All @@ -182,6 +187,8 @@ def process(self) -> None:
time.sleep(0.1)
return

batch_start = time.monotonic()

# Pre-process message batch
workflow_data = self._message_preprocessor.preprocess_messages(message_batch)

Expand Down Expand Up @@ -220,6 +227,10 @@ def process(self) -> None:
else:
valid_results.append(result)

processing_time_s = time.monotonic() - batch_start
self._message_batcher.report_batch(
len(message_batch.messages), processing_time_s=processing_time_s
)
self._batches_processed += 1
self._maybe_log_metrics()

Expand Down Expand Up @@ -259,6 +270,7 @@ def _get_service_status(self, job_statuses: list[JobStatus]) -> ServiceStatus:
active_job_count=len(job_statuses),
messages_processed=self._messages_processed,
error=self._service_error,
batch_interval_s=self._message_batcher.batch_length_s,
)

def _maybe_log_metrics(self) -> None:
Expand Down
5 changes: 4 additions & 1 deletion src/ess/livedata/dashboard/widgets/backend_status_widget.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,10 @@ def update(
# Stats
jobs_text = f"Jobs: {status.active_job_count}"
msgs_text = f"Msgs: {_format_messages(status.messages_processed)}"
self._stats_pane.object = f"<span>{jobs_text} | {msgs_text}</span>"
batch_text = f"Batch: {status.batch_interval_s:.0f}s"
self._stats_pane.object = (
f"<span>{jobs_text} | {msgs_text} | {batch_text}</span>"
)

def _calculate_uptime(self, started_at_ns: int) -> float:
"""Calculate uptime in seconds from started_at timestamp."""
Expand Down
5 changes: 5 additions & 0 deletions src/ess/livedata/kafka/x5f2_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ class ServiceStatusPayload(pydantic.BaseModel):
description="Total messages processed since startup"
)
error: str | None = pydantic.Field(default=None, description="Error message if any")
batch_interval_s: float = pydantic.Field(
default=1.0, description="Current batch interval in seconds"
)


class ServiceStatusJSON(pydantic.BaseModel):
Expand Down Expand Up @@ -280,6 +283,7 @@ def from_service_status(
active_job_count=status.active_job_count,
messages_processed=status.messages_processed,
error=status.error,
batch_interval_s=status.batch_interval_s,
),
),
)
Expand All @@ -296,6 +300,7 @@ def to_service_status(self) -> ServiceStatus:
active_job_count=message.active_job_count,
messages_processed=message.messages_processed,
error=message.error,
batch_interval_s=message.batch_interval_s,
)


Expand Down
Loading
Loading