Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/developer/design/message-flow-and-transformation.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ Isolates ESSlivedata from Kafka topic structure: Kafka uses `(topic, source_name

### Adapter Pattern

`MessageAdapter` protocol: `adapt(message: T) -> U`. Benefits: composable, type-safe, testable, reusable.
`MessageAdapter` protocol: `adapt(message: T) -> Sequence[U]`. Each adapter returns a sequence, enabling 1:N message expansion (e.g., splitting multi-pulse ev44 messages into one message per pulse). Most adapters return a single-element tuple. Benefits: composable, type-safe, testable, reusable.

### Core Adapters

Expand All @@ -127,7 +127,7 @@ Isolates ESSlivedata from Kafka topic structure: Kafka uses `(topic, source_name

### Adapter Composition

**ChainedAdapter**: Chains two adapters sequentially (`second.adapt(first.adapt(message))`).
**ChainedAdapter**: Chains two adapters with flatmap semantics — for each intermediate result from the first adapter, all results from the second adapter are collected into a flat sequence.

**RouteByTopicAdapter**: Routes by Kafka topic to different adapters. Provides `.topics` list for subscription.

Expand Down
31 changes: 26 additions & 5 deletions src/ess/livedata/handlers/to_nxevent_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,33 @@
from ess.livedata.core.handler import Accumulator


def split_ev44_pulses(
    ev44: eventdata_ev44.EventData,
) -> list[eventdata_ev44.EventData]:
    """Split a multi-pulse ev44 message into one EventData per pulse.

    For messages with at most one entry in ``reference_time`` the input is
    returned as-is (no copy). Otherwise one ``EventData`` is built per entry
    of ``reference_time``: pulse ``i`` receives the events in the half-open
    range ``[reference_time_index[i], reference_time_index[i + 1])``, with the
    final range running to the end of ``time_of_flight``. Each result carries
    a one-element ``reference_time`` and a ``reference_time_index`` of ``[0]``.
    ``source_name`` and ``message_id`` are copied unchanged to every result.
    """
    n_pulses = len(ev44.reference_time)
    if n_pulses <= 1:
        return [ev44]
    index = ev44.reference_time_index
    n_events = len(ev44.time_of_flight)
    pulses: list[eventdata_ev44.EventData] = []
    for i in range(n_pulses):
        start = index[i]
        # No following index entry means this pulse extends to the last event.
        end = index[i + 1] if i + 1 < len(index) else n_events
        pulses.append(
            eventdata_ev44.EventData(
                source_name=ev44.source_name,
                message_id=ev44.message_id,
                reference_time=ev44.reference_time[i : i + 1],
                reference_time_index=np.array([0]),
                time_of_flight=ev44.time_of_flight[start:end],
                pixel_id=ev44.pixel_id[start:end] if ev44.pixel_id is not None else [],
            )
        )
    return pulses


@dataclass
Expand All @@ -36,7 +59,6 @@ class MonitorEvents:

@staticmethod
def from_ev44(ev44: eventdata_ev44.EventData) -> MonitorEvents:
    """Create monitor events from an ev44 message.

    Every entry of ``ev44.time_of_flight`` is forwarded as a time of arrival
    in nanoseconds. The pulse structure of the message is not inspected here.
    """
    return MonitorEvents(time_of_arrival=ev44.time_of_flight, unit='ns')


Expand All @@ -63,7 +85,6 @@ def __post_init__(self) -> None:

@staticmethod
def from_ev44(ev44: eventdata_ev44.EventData) -> DetectorEvents:
    """Create detector events from an ev44 message.

    ``ev44.pixel_id`` and ``ev44.time_of_flight`` are forwarded unchanged, the
    latter as time of arrival in nanoseconds. The pulse structure of the
    message is not inspected here.
    """
    return DetectorEvents(
        pixel_id=ev44.pixel_id, time_of_arrival=ev44.time_of_flight, unit='ns'
    )
Expand Down
181 changes: 116 additions & 65 deletions src/ess/livedata/kafka/message_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def __eq__(self, other: object) -> bool:


class MessageAdapter(Protocol, Generic[T, U]):
    """Protocol for objects that convert one input message into a sequence of outputs."""

    def adapt(self, message: T) -> Sequence[U]: ...


class KafkaAdapter(MessageAdapter[KafkaMessage, Message[T]]):
Expand All @@ -114,49 +114,67 @@ def get_stream_id(self, topic: str, source_name: str) -> StreamId:


class KafkaToEv44Adapter(KafkaAdapter[eventdata_ev44.EventData]):
    """Deserialise ev44 Kafka messages into one ``Message`` per pulse."""

    def adapt(
        self, message: KafkaMessage
    ) -> Sequence[Message[eventdata_ev44.EventData]]:
        """Return a tuple with one message per pulse contained in ``message``.

        Each result is stamped with the first ``reference_time`` entry of its
        pulse; if a pulse has no reference time, the Kafka message timestamp
        is used instead. All results share the same stream.
        """
        from ..handlers.to_nxevent_data import split_ev44_pulses

        ev44 = eventdata_ev44.deserialise_ev44(message.value())
        stream = self.get_stream_id(topic=message.topic(), source_name=ev44.source_name)
        # A fallback, useful in particular for testing so serialized data can be reused.
        fallback_ts = message.timestamp()[1]
        pulses = split_ev44_pulses(ev44)
        return tuple(
            Message(
                timestamp=(
                    int(pulse.reference_time[0])
                    if len(pulse.reference_time) > 0
                    else fallback_ts
                ),
                stream=stream,
                value=pulse,
            )
            for pulse in pulses
        )


class KafkaToDa00Adapter(KafkaAdapter[list[dataarray_da00.Variable]]):
    """Deserialise da00 Kafka messages."""

    def adapt(
        self, message: KafkaMessage
    ) -> Sequence[Message[list[dataarray_da00.Variable]]]:
        """Return a one-element tuple holding the da00 payload.

        The result is stamped with ``timestamp_ns`` from the deserialised
        da00 data and carries its ``data`` attribute as value.
        """
        da00: dataarray_da00.da00_DataArray_t
        da00 = dataarray_da00.deserialise_da00(message.value())  # type: ignore[reportAssignmentType]
        key = self.get_stream_id(topic=message.topic(), source_name=da00.source_name)
        timestamp = da00.timestamp_ns
        return (Message(timestamp=timestamp, stream=key, value=da00.data),)


class KafkaToF144Adapter(KafkaAdapter[logdata_f144.ExtractedLogData]):
    """Deserialise f144 log-data Kafka messages (stream kind ``LOG``)."""

    def __init__(self, *, stream_lut: StreamLUT | None = None):
        super().__init__(stream_lut=stream_lut, stream_kind=StreamKind.LOG)

    def adapt(
        self, message: KafkaMessage
    ) -> Sequence[Message[logdata_f144.ExtractedLogData]]:
        """Return a one-element tuple holding the deserialised log data.

        The result is stamped with ``timestamp_unix_ns`` from the log data.
        """
        log_data = logdata_f144.deserialise_f144(message.value())
        key = self.get_stream_id(
            topic=message.topic(), source_name=log_data.source_name
        )
        timestamp = log_data.timestamp_unix_ns
        return (Message(timestamp=timestamp, stream=key, value=log_data),)


class F144ToLogDataAdapter(
    MessageAdapter[Message[logdata_f144.ExtractedLogData], Message[LogData]]
):
    """Convert f144 log-data messages into ``LogData`` messages."""

    def adapt(
        self, message: Message[logdata_f144.ExtractedLogData]
    ) -> Sequence[Message[LogData]]:
        """Return a one-element tuple; timestamp and stream are kept unchanged."""
        return (
            Message(
                timestamp=message.timestamp,
                stream=message.stream,
                value=LogData.from_f144(message.value),
            ),
        )


Expand All @@ -165,11 +183,13 @@ class Ev44ToMonitorEventsAdapter(
):
def adapt(
    self, message: Message[eventdata_ev44.EventData]
) -> Sequence[Message[MonitorEvents]]:
    """Convert an ev44 message into a one-element tuple of monitor events.

    Timestamp and stream of the input message are kept unchanged.
    """
    return (
        Message(
            timestamp=message.timestamp,
            stream=message.stream,
            value=MonitorEvents.from_ev44(message.value),
        ),
    )


Expand All @@ -182,11 +202,15 @@ class X5f2ToStatusAdapter(
Discriminates based on the `message_type` field in the x5f2 status_json.
"""

def adapt(
    self, message: KafkaMessage
) -> Sequence[Message[JobStatus | ServiceStatus]]:
    """Convert an x5f2 Kafka message into a one-element tuple of status messages.

    The result is stamped with the second element of ``message.timestamp()``
    and is always placed on ``STATUS_STREAM_ID``.
    """
    return (
        Message(
            timestamp=message.timestamp()[1],
            stream=STATUS_STREAM_ID,
            value=x5f2_to_status(message.value()),
        ),
    )


Expand All @@ -202,7 +226,7 @@ class KafkaToMonitorEventsAdapter(KafkaAdapter[MonitorEvents]):
def __init__(self, stream_lut: StreamLUT):
super().__init__(stream_lut=stream_lut, stream_kind=StreamKind.MONITOR_EVENTS)

def adapt(self, message: KafkaMessage) -> Message[MonitorEvents]:
def adapt(self, message: KafkaMessage) -> Sequence[Message[MonitorEvents]]:
buffer = message.value()
eventdata_ev44.check_schema_identifier(buffer, eventdata_ev44.FILE_IDENTIFIER)
event = Event44Message.Event44Message.GetRootAs(buffer, 0)
Expand All @@ -212,16 +236,32 @@ def adapt(self, message: KafkaMessage) -> Message[MonitorEvents]:
reference_time = event.ReferenceTimeAsNumpy()
time_of_arrival = event.TimeOfFlightAsNumpy()

# A fallback, useful in particular for testing so serialized data can be reused.
if reference_time.size > 0:
timestamp = reference_time[-1]
else:
timestamp = message.timestamp()[1]
return Message(
timestamp=timestamp,
stream=stream,
value=MonitorEvents(time_of_arrival=time_of_arrival, unit='ns'),
)
n_pulses = reference_time.size
if n_pulses <= 1:
timestamp = reference_time[0] if n_pulses == 1 else message.timestamp()[1]
return (
Message(
timestamp=timestamp,
stream=stream,
value=MonitorEvents(time_of_arrival=time_of_arrival, unit='ns'),
),
)
ref_index = event.ReferenceTimeIndexAsNumpy()
n_events = len(time_of_arrival)
results: list[Message[MonitorEvents]] = []
for i in range(n_pulses):
start = ref_index[i]
end = ref_index[i + 1] if i + 1 < len(ref_index) else n_events
results.append(
Message(
timestamp=int(reference_time[i]),
stream=stream,
value=MonitorEvents(
time_of_arrival=time_of_arrival[start:end], unit='ns'
),
)
)
return results


class Ev44ToDetectorEventsAdapter(
Expand All @@ -241,14 +281,16 @@ def __init__(self, *, merge_detectors: bool = False):

def adapt(
    self, message: Message[eventdata_ev44.EventData]
) -> Sequence[Message[DetectorEvents]]:
    """Convert an ev44 message into a one-element tuple of detector events.

    The timestamp is kept unchanged. When ``merge_detectors`` was enabled,
    the stream name is replaced by ``'unified_detector'``; otherwise the
    input stream is kept.
    """
    stream = message.stream
    if self._merge_detectors:
        stream = replace(stream, name='unified_detector')
    return (
        Message(
            timestamp=message.timestamp,
            stream=stream,
            value=DetectorEvents.from_ev44(message.value),
        ),
    )


Expand All @@ -257,32 +299,38 @@ class Da00ToScippAdapter(
):
def adapt(
    self, message: Message[list[dataarray_da00.Variable]]
) -> Sequence[Message[sc.DataArray]]:
    """Convert da00 variables into a one-element tuple holding a scipp DataArray.

    Timestamp and stream of the input message are kept unchanged.
    """
    return (
        Message(
            timestamp=message.timestamp,
            stream=message.stream,
            value=da00_to_scipp(message.value),
        ),
    )


class KafkaToAd00Adapter(KafkaAdapter[area_detector_ad00.ADArray]):
    """Deserialise ad00 area-detector Kafka messages."""

    def adapt(
        self, message: KafkaMessage
    ) -> Sequence[Message[area_detector_ad00.ADArray]]:
        """Return a one-element tuple holding the deserialised ad00 array.

        The result is stamped with ``timestamp_ns`` from the ad00 data.
        """
        ad00 = area_detector_ad00.deserialise_ad00(message.value())
        key = self.get_stream_id(topic=message.topic(), source_name=ad00.source_name)
        timestamp = ad00.timestamp_ns
        return (Message(timestamp=timestamp, stream=key, value=ad00),)


class Ad00ToScippAdapter(
    MessageAdapter[Message[area_detector_ad00.ADArray], Message[sc.DataArray]]
):
    """Convert ad00 area-detector messages into scipp DataArray messages."""

    def adapt(
        self, message: Message[area_detector_ad00.ADArray]
    ) -> Sequence[Message[sc.DataArray]]:
        """Return a one-element tuple; timestamp and stream are kept unchanged."""
        return (
            Message(
                timestamp=message.timestamp,
                stream=message.stream,
                value=ad00_to_scipp(message.value),
            ),
        )


Expand All @@ -295,21 +343,21 @@ class RawConfigItem:
class CommandsAdapter(MessageAdapter[KafkaMessage, Message[RawConfigItem]]):
    """Adapts Kafka messages from the livedata commands topic."""

    def adapt(self, message: KafkaMessage) -> Sequence[Message[RawConfigItem]]:
        """Wrap the raw key and value of ``message`` in a one-element tuple.

        The result is placed on ``COMMANDS_STREAM_ID`` and stamped with the
        second element of ``message.timestamp()``.
        """
        timestamp = message.timestamp()[1]
        # Livedata configuration uses a compacted Kafka topic. The Kafka message key
        # is the encoded string representation of a :py:class:`ConfigKey` object.
        item = RawConfigItem(key=message.key(), value=message.value())
        return (Message(stream=COMMANDS_STREAM_ID, timestamp=timestamp, value=item),)


class ResponsesAdapter(MessageAdapter[KafkaMessage, Message[CommandAcknowledgement]]):
    """Adapts Kafka messages from the livedata responses topic."""

    def adapt(
        self, message: KafkaMessage
    ) -> Sequence[Message[CommandAcknowledgement]]:
        """Parse the JSON payload into a one-element tuple of acknowledgements.

        The result is placed on ``RESPONSES_STREAM_ID`` and stamped with the
        second element of ``message.timestamp()``.
        """
        timestamp = message.timestamp()[1]
        ack = CommandAcknowledgement.model_validate_json(message.value())
        return (Message(stream=RESPONSES_STREAM_ID, timestamp=timestamp, value=ack),)


class ChainedAdapter(MessageAdapter[T, V]):
Expand All @@ -321,9 +369,12 @@ def __init__(self, first: MessageAdapter[T, U], second: MessageAdapter[U, V]):
self._first = first
self._second = second

def adapt(self, message: T) -> Sequence[V]:
    """Apply both adapters with flatmap semantics.

    Every intermediate result produced by the first adapter is passed to the
    second adapter, and all of the second adapter's results are collected,
    in order, into one flat list. The list is empty when the first adapter
    yields nothing.
    """
    return [
        result
        for intermediate in self._first.adapt(message)
        for result in self._second.adapt(intermediate)
    ]


class RouteBySchemaAdapter(MessageAdapter[KafkaMessage, T]):
Expand All @@ -334,7 +385,7 @@ class RouteBySchemaAdapter(MessageAdapter[KafkaMessage, T]):
def __init__(self, routes: dict[str, MessageAdapter[KafkaMessage, T]]):
self._routes = routes

def adapt(self, message: KafkaMessage) -> T:
def adapt(self, message: KafkaMessage) -> Sequence[T]:
schema = streaming_data_types.utils.get_schema(message.value())
if schema is None:
raise streaming_data_types.exceptions.WrongSchemaException(
Expand Down Expand Up @@ -362,7 +413,7 @@ def topics(self) -> list[str]:
"""Returns the list of topics to subscribe to."""
return list(self._routes.keys())

def adapt(self, message: KafkaMessage) -> T:
def adapt(self, message: KafkaMessage) -> Sequence[T]:
topic = message.topic()
if topic not in self._routes:
raise KeyError(
Expand Down Expand Up @@ -403,7 +454,7 @@ def get_messages(self) -> Sequence[U]:
adapted = []
for msg in raw_messages:
try:
adapted.append(self._adapter.adapt(msg))
adapted.extend(self._adapter.adapt(msg))
except streaming_data_types.exceptions.WrongSchemaException:
logger.warning('Message %s has an unknown schema. Skipping.', msg)
if self._raise_on_error:
Expand Down
Loading
Loading