From 3bc2034a1a251a29188d30fe7df28e848142e7b6 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Wed, 25 Feb 2026 10:23:51 +0000 Subject: [PATCH] Support multi-pulse ev44 messages (#708) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ev44 FlatBuffers schema supports packing events from multiple neutron pulses into a single message via reference_time (array of pulse timestamps) and reference_time_index (array of starting indices). Previously, _require_single_pulse() raised NotImplementedError for multi-pulse messages. Implement 1:N message expansion at the adapter layer: a multi-pulse ev44 message is split into N single-pulse Message objects, each with its per-pulse reference_time[i] as timestamp and sliced event arrays. Downstream code (accumulator, workflows) is unchanged. Key property: zero overhead for single-pulse messages — the loop iterates once, slicing [0:] which is a numpy view (no copy). Changes: - MonitorEvents.from_ev44() and DetectorEvents.from_ev44() now return list[tuple[int | None, Events]] with per-pulse slicing - Ev44ToMonitorEventsAdapter, Ev44ToDetectorEventsAdapter, and KafkaToMonitorEventsAdapter return list[Message[...]] - AdaptingMessageSource.get_messages() flattens list results - Remove _require_single_pulse() function Prompt: Implement the following plan: Support multi-pulse ev44 messages (#708) --- src/ess/livedata/handlers/to_nxevent_data.py | 80 +++++++++++++++---- src/ess/livedata/kafka/message_adapter.py | 82 +++++++++++++------- tests/handlers/to_nxevent_data_test.py | 66 +++++++++++++--- tests/kafka/message_adapter_test.py | 79 +++++++++++++++++-- 4 files changed, 251 insertions(+), 56 deletions(-) diff --git a/src/ess/livedata/handlers/to_nxevent_data.py b/src/ess/livedata/handlers/to_nxevent_data.py index 7d9c4110f..4eb521a3d 100644 --- a/src/ess/livedata/handlers/to_nxevent_data.py +++ b/src/ess/livedata/handlers/to_nxevent_data.py @@ -13,12 +13,6 @@ from ess.livedata.core.handler import Accumulator -def _require_single_pulse(ev44: eventdata_ev44.EventData) -> None: - index = ev44.reference_time_index - if len(index) > 1 or index[0] != 0 or len(ev44.reference_time) > 1: - raise NotImplementedError("Processing multi-pulse messages is not supported.") - - @dataclass class MonitorEvents: """ @@ -35,9 +29,34 @@ class MonitorEvents: unit: str @staticmethod - def from_ev44(ev44: eventdata_ev44.EventData) -> MonitorEvents: - _require_single_pulse(ev44) - return MonitorEvents(time_of_arrival=ev44.time_of_flight, unit='ns') + def from_ev44( + ev44: eventdata_ev44.EventData, + ) -> list[tuple[int | None, MonitorEvents]]: + """Split an ev44 message into per-pulse MonitorEvents. + + Returns a list of (timestamp, MonitorEvents) tuples, one per pulse. + If ``reference_time`` is empty, returns a single element with + ``None`` as timestamp so the caller can fall back to the message + timestamp. + """ + ref_time = ev44.reference_time + ref_index = ev44.reference_time_index + tof = ev44.time_of_flight + + if len(ref_time) == 0: + return [(None, MonitorEvents(time_of_arrival=tof, unit='ns'))] + + results: list[tuple[int | None, MonitorEvents]] = [] + for i in range(len(ref_time)): + start = ref_index[i] + end = ref_index[i + 1] if i + 1 < len(ref_index) else len(tof) + results.append( + ( + int(ref_time[i]), + MonitorEvents(time_of_arrival=tof[start:end], unit='ns'), + ) + ) + return results @dataclass @@ -62,11 +81,44 @@ def __post_init__(self) -> None: ) @staticmethod - def from_ev44(ev44: eventdata_ev44.EventData) -> DetectorEvents: - _require_single_pulse(ev44) - return DetectorEvents( - pixel_id=ev44.pixel_id, time_of_arrival=ev44.time_of_flight, unit='ns' - ) + def from_ev44( + ev44: eventdata_ev44.EventData, + ) -> list[tuple[int | None, DetectorEvents]]: + """Split an ev44 message into per-pulse DetectorEvents. + + Returns a list of (timestamp, DetectorEvents) tuples, one per pulse. + If ``reference_time`` is empty, returns a single element with + ``None`` as timestamp so the caller can fall back to the message + timestamp. + """ + ref_time = ev44.reference_time + ref_index = ev44.reference_time_index + tof = ev44.time_of_flight + pixel_id = ev44.pixel_id + + if len(ref_time) == 0: + return [ + ( + None, + DetectorEvents(pixel_id=pixel_id, time_of_arrival=tof, unit='ns'), + ) + ] + + results: list[tuple[int | None, DetectorEvents]] = [] + for i in range(len(ref_time)): + start = ref_index[i] + end = ref_index[i + 1] if i + 1 < len(ref_index) else len(tof) + results.append( + ( + int(ref_time[i]), + DetectorEvents( + pixel_id=pixel_id[start:end], + time_of_arrival=tof[start:end], + unit='ns', + ), + ) + ) + return results Events = TypeVar('Events', DetectorEvents, MonitorEvents) diff --git a/src/ess/livedata/kafka/message_adapter.py b/src/ess/livedata/kafka/message_adapter.py index bb5aed03e..2be256e99 100644 --- a/src/ess/livedata/kafka/message_adapter.py +++ b/src/ess/livedata/kafka/message_adapter.py @@ -161,16 +161,19 @@ def adapt( class Ev44ToMonitorEventsAdapter( - MessageAdapter[Message[eventdata_ev44.EventData], Message[MonitorEvents]] + MessageAdapter[Message[eventdata_ev44.EventData], list[Message[MonitorEvents]]] ): def adapt( self, message: Message[eventdata_ev44.EventData] - ) -> Message[MonitorEvents]: - return Message( - timestamp=message.timestamp, - stream=message.stream, - value=MonitorEvents.from_ev44(message.value), - ) + ) -> list[Message[MonitorEvents]]: + return [ + Message( + timestamp=ts if ts is not None else message.timestamp, + stream=message.stream, + value=events, + ) + for ts, events in MonitorEvents.from_ev44(message.value) + ] class X5f2ToStatusAdapter( @@ -202,7 +205,7 @@ class KafkaToMonitorEventsAdapter(KafkaAdapter[MonitorEvents]): def __init__(self, stream_lut: StreamLUT): super().__init__(stream_lut=stream_lut, stream_kind=StreamKind.MONITOR_EVENTS) - def adapt(self, message: KafkaMessage) -> Message[MonitorEvents]: + def adapt(self, message: KafkaMessage) -> list[Message[MonitorEvents]]: buffer = message.value() eventdata_ev44.check_schema_identifier(buffer, eventdata_ev44.FILE_IDENTIFIER) event = Event44Message.Event44Message.GetRootAs(buffer, 0) @@ -210,22 +213,41 @@ def adapt(self, message: KafkaMessage) -> Message[MonitorEvents]: topic=message.topic(), source_name=event.SourceName().decode("utf-8") ) reference_time = event.ReferenceTimeAsNumpy() + reference_time_index = event.ReferenceTimeIndexAsNumpy() time_of_arrival = event.TimeOfFlightAsNumpy() - # A fallback, useful in particular for testing so serialized data can be reused. - if reference_time.size > 0: - timestamp = reference_time[-1] - else: - timestamp = message.timestamp()[1] - return Message( - timestamp=timestamp, - stream=stream, - value=MonitorEvents(time_of_arrival=time_of_arrival, unit='ns'), - ) + if reference_time.size == 0: + # A fallback, useful for testing so serialized data can be reused. + return [ + Message( + timestamp=message.timestamp()[1], + stream=stream, + value=MonitorEvents(time_of_arrival=time_of_arrival, unit='ns'), + ) + ] + + results: list[Message[MonitorEvents]] = [] + for i in range(len(reference_time)): + start = reference_time_index[i] + end = ( + reference_time_index[i + 1] + if i + 1 < len(reference_time_index) + else len(time_of_arrival) + ) + results.append( + Message( + timestamp=int(reference_time[i]), + stream=stream, + value=MonitorEvents( + time_of_arrival=time_of_arrival[start:end], unit='ns' + ), + ) + ) + return results class Ev44ToDetectorEventsAdapter( - MessageAdapter[Message[eventdata_ev44.EventData], Message[DetectorEvents]] + MessageAdapter[Message[eventdata_ev44.EventData], list[Message[DetectorEvents]]] ): def __init__(self, *, merge_detectors: bool = False): """ @@ -241,15 +263,18 @@ def __init__(self, *, merge_detectors: bool = False): def adapt( self, message: Message[eventdata_ev44.EventData] - ) -> Message[DetectorEvents]: + ) -> list[Message[DetectorEvents]]: stream = message.stream if self._merge_detectors: stream = replace(stream, name='unified_detector') - return Message( - timestamp=message.timestamp, - stream=stream, - value=DetectorEvents.from_ev44(message.value), - ) + return [ + Message( + timestamp=ts if ts is not None else message.timestamp, + stream=stream, + value=events, + ) + for ts, events in DetectorEvents.from_ev44(message.value) + ] class Da00ToScippAdapter( @@ -403,7 +428,7 @@ def get_messages(self) -> Sequence[U]: adapted = [] for msg in raw_messages: try: - adapted.append(self._adapter.adapt(msg)) + result = self._adapter.adapt(msg) except streaming_data_types.exceptions.WrongSchemaException: logger.warning('Message %s has an unknown schema. Skipping.', msg) if self._raise_on_error: @@ -412,4 +437,9 @@ def get_messages(self) -> Sequence[U]: logger.exception('Error adapting message %s: %s', msg, e) if self._raise_on_error: raise + else: + if isinstance(result, list): + adapted.extend(result) + else: + adapted.append(result) return adapted diff --git a/tests/handlers/to_nxevent_data_test.py b/tests/handlers/to_nxevent_data_test.py index 0739ca2bb..63896ab20 100644 --- a/tests/handlers/to_nxevent_data_test.py +++ b/tests/handlers/to_nxevent_data_test.py @@ -12,7 +12,7 @@ ) -def test_MonitorEvents_from_ev44() -> None: +def test_MonitorEvents_from_ev44_no_reference_time() -> None: ev44 = eventdata_ev44.EventData( source_name='ignored', message_id=0, @@ -21,25 +21,71 @@ def test_MonitorEvents_from_ev44() -> None: pixel_id=[1, 1, 1], time_of_flight=[1, 2, 3], ) - monitor_events = MonitorEvents.from_ev44(ev44) + result = MonitorEvents.from_ev44(ev44) + assert len(result) == 1 + ts, monitor_events = result[0] + assert ts is None assert monitor_events.time_of_arrival == [1, 2, 3] assert monitor_events.unit == 'ns' -@pytest.mark.parametrize('events_cls', [MonitorEvents, DetectorEvents]) -def test_MonitorEvents_from_ev44_raises_with_multi_pulse_message( - events_cls: MonitorEvents, -) -> None: +def test_MonitorEvents_from_ev44_single_pulse_with_reference_time() -> None: ev44 = eventdata_ev44.EventData( source_name='ignored', message_id=0, - reference_time=[1, 2], - reference_time_index=[0, 1], + reference_time=[1000], + reference_time_index=[0], pixel_id=[1, 1, 1], time_of_flight=[1, 2, 3], ) - with pytest.raises(NotImplementedError): - events_cls.from_ev44(ev44) + result = MonitorEvents.from_ev44(ev44) + assert len(result) == 1 + ts, monitor_events = result[0] + assert ts == 1000 + assert list(monitor_events.time_of_arrival) == [1, 2, 3] + + +def test_MonitorEvents_from_ev44_multi_pulse() -> None: + ev44 = eventdata_ev44.EventData( + source_name='ignored', + message_id=0, + reference_time=[100, 200, 300], + reference_time_index=[0, 2, 5], + pixel_id=[1, 1, 1, 1, 1, 1, 1], + time_of_flight=[10, 20, 30, 40, 50, 60, 70], + ) + result = MonitorEvents.from_ev44(ev44) + assert len(result) == 3 + + assert result[0][0] == 100 + assert list(result[0][1].time_of_arrival) == [10, 20] + + assert result[1][0] == 200 + assert list(result[1][1].time_of_arrival) == [30, 40, 50] + + assert result[2][0] == 300 + assert list(result[2][1].time_of_arrival) == [60, 70] + + +def test_DetectorEvents_from_ev44_multi_pulse() -> None: + ev44 = eventdata_ev44.EventData( + source_name='ignored', + message_id=0, + reference_time=[100, 200], + reference_time_index=[0, 3], + pixel_id=[1, 2, 3, 4, 5], + time_of_flight=[10, 20, 30, 40, 50], + ) + result = DetectorEvents.from_ev44(ev44) + assert len(result) == 2 + + assert result[0][0] == 100 + assert list(result[0][1].time_of_arrival) == [10, 20, 30] + assert list(result[0][1].pixel_id) == [1, 2, 3] + + assert result[1][0] == 200 + assert list(result[1][1].time_of_arrival) == [40, 50] + assert list(result[1][1].pixel_id) == [4, 5] def test_MonitorEvents_ToNXevent_data() -> None: diff --git a/tests/kafka/message_adapter_test.py b/tests/kafka/message_adapter_test.py index 59da99081..7e466fe98 100644 --- a/tests/kafka/message_adapter_test.py +++ b/tests/kafka/message_adapter_test.py @@ -122,7 +122,7 @@ def test_adapter(self) -> None: InputStreamKey(topic="monitors", source_name="monitor1"): "monitor_0" } ) - result = adapter.adapt(message) + [result] = adapter.adapt(message) assert result.stream.kind == StreamKind.MONITOR_EVENTS assert result.stream.name == "monitor_0" @@ -149,10 +149,34 @@ def test_no_reference_time_uses_message_timestamp(self) -> None: InputStreamKey(topic="monitors", source_name="monitor1"): "monitor_0" } ) - result = adapter.adapt(message) + [result] = adapter.adapt(message) assert result.timestamp == 9999 + def test_multi_pulse(self) -> None: + multi_pulse_ev44 = eventdata_ev44.serialise_ev44( + source_name="monitor1", + message_id=0, + reference_time=np.array([100, 200]), + reference_time_index=np.array([0, 2]), + time_of_flight=np.array([10, 20, 30]), + pixel_id=np.array([1, 1, 1]), + ) + message = FakeKafkaMessage(value=multi_pulse_ev44, topic="monitors") + adapter = KafkaToMonitorEventsAdapter( + stream_lut={ + InputStreamKey(topic="monitors", source_name="monitor1"): "monitor_0" + } + ) + results = adapter.adapt(message) + assert len(results) == 2 + + assert results[0].timestamp == 100 + assert list(results[0].value.time_of_arrival) == [10, 20] + + assert results[1].timestamp == 200 + assert list(results[1].value.time_of_arrival) == [30] + def test_wrong_schema_raises_exception(self, monkeypatch) -> None: """Test that providing wrong schema raises exception.""" @@ -322,14 +346,14 @@ def test_adapter(self) -> None: ), ) adapter = Ev44ToDetectorEventsAdapter() - result = adapter.adapt(ev44_message) + [result] = adapter.adapt(ev44_message) assert result.timestamp == 1234 assert result.stream.kind == StreamKind.DETECTOR_EVENTS assert result.stream.name == "detector1" assert isinstance(result.value, DetectorEvents) - assert result.value.time_of_arrival == [123456] - assert result.value.pixel_id == [1] + assert list(result.value.time_of_arrival) == [123456] + assert list(result.value.pixel_id) == [1] def test_adapter_merge_detectors(self) -> None: ev44_message = Message( @@ -345,11 +369,54 @@ def test_adapter_merge_detectors(self) -> None: ), ) adapter = Ev44ToDetectorEventsAdapter(merge_detectors=True) - result = adapter.adapt(ev44_message) + [result] = adapter.adapt(ev44_message) assert result.stream.name == "unified_detector" assert isinstance(result.value, DetectorEvents) + def test_multi_pulse(self) -> None: + ev44_message = Message( + timestamp=1234, + stream=StreamId(kind=StreamKind.DETECTOR_EVENTS, name="detector1"), + value=eventdata_ev44.EventData( + source_name="detector1", + message_id=0, + reference_time=np.array([100, 200]), + reference_time_index=[0, 2], + time_of_flight=np.array([10, 20, 30, 40]), + pixel_id=np.array([1, 2, 3, 4]), + ), + ) + adapter = Ev44ToDetectorEventsAdapter() + results = adapter.adapt(ev44_message) + assert len(results) == 2 + + assert results[0].timestamp == 100 + assert list(results[0].value.time_of_arrival) == [10, 20] + assert list(results[0].value.pixel_id) == [1, 2] + + assert results[1].timestamp == 200 + assert list(results[1].value.time_of_arrival) == [30, 40] + assert list(results[1].value.pixel_id) == [3, 4] + + def test_multi_pulse_merge_detectors(self) -> None: + ev44_message = Message( + timestamp=1234, + stream=StreamId(kind=StreamKind.DETECTOR_EVENTS, name="detector2"), + value=eventdata_ev44.EventData( + source_name="detector2", + message_id=0, + reference_time=np.array([100, 200]), + reference_time_index=[0, 1], + time_of_flight=np.array([10, 20]), + pixel_id=np.array([1, 2]), + ), + ) + adapter = Ev44ToDetectorEventsAdapter(merge_detectors=True) + results = adapter.adapt(ev44_message) + assert len(results) == 2 + assert all(r.stream.name == "unified_detector" for r in results) + def message_with_schema(schema: str) -> KafkaMessage: """