Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 66 additions & 14 deletions src/ess/livedata/handlers/to_nxevent_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@
from ess.livedata.core.handler import Accumulator


def _require_single_pulse(ev44: eventdata_ev44.EventData) -> None:
index = ev44.reference_time_index
if len(index) > 1 or index[0] != 0 or len(ev44.reference_time) > 1:
raise NotImplementedError("Processing multi-pulse messages is not supported.")


@dataclass
class MonitorEvents:
"""
Expand All @@ -35,9 +29,34 @@ class MonitorEvents:
unit: str

@staticmethod
def from_ev44(ev44: eventdata_ev44.EventData) -> MonitorEvents:
_require_single_pulse(ev44)
return MonitorEvents(time_of_arrival=ev44.time_of_flight, unit='ns')
def from_ev44(
ev44: eventdata_ev44.EventData,
) -> list[tuple[int | None, MonitorEvents]]:
"""Split an ev44 message into per-pulse MonitorEvents.

Returns a list of (timestamp, MonitorEvents) tuples, one per pulse.
If ``reference_time`` is empty, returns a single element with
``None`` as timestamp so the caller can fall back to the message
timestamp.
"""
ref_time = ev44.reference_time
ref_index = ev44.reference_time_index
tof = ev44.time_of_flight

if len(ref_time) == 0:
return [(None, MonitorEvents(time_of_arrival=tof, unit='ns'))]

results: list[tuple[int | None, MonitorEvents]] = []
for i in range(len(ref_time)):
start = ref_index[i]
end = ref_index[i + 1] if i + 1 < len(ref_index) else len(tof)
results.append(
(
int(ref_time[i]),
MonitorEvents(time_of_arrival=tof[start:end], unit='ns'),
)
)
return results


@dataclass
Expand All @@ -62,11 +81,44 @@ def __post_init__(self) -> None:
)

@staticmethod
def from_ev44(ev44: eventdata_ev44.EventData) -> DetectorEvents:
_require_single_pulse(ev44)
return DetectorEvents(
pixel_id=ev44.pixel_id, time_of_arrival=ev44.time_of_flight, unit='ns'
)
def from_ev44(
ev44: eventdata_ev44.EventData,
) -> list[tuple[int | None, DetectorEvents]]:
"""Split an ev44 message into per-pulse DetectorEvents.

Returns a list of (timestamp, DetectorEvents) tuples, one per pulse.
If ``reference_time`` is empty, returns a single element with
``None`` as timestamp so the caller can fall back to the message
timestamp.
"""
ref_time = ev44.reference_time
ref_index = ev44.reference_time_index
tof = ev44.time_of_flight
pixel_id = ev44.pixel_id

if len(ref_time) == 0:
return [
(
None,
DetectorEvents(pixel_id=pixel_id, time_of_arrival=tof, unit='ns'),
)
]

results: list[tuple[int | None, DetectorEvents]] = []
for i in range(len(ref_time)):
start = ref_index[i]
end = ref_index[i + 1] if i + 1 < len(ref_index) else len(tof)
results.append(
(
int(ref_time[i]),
DetectorEvents(
pixel_id=pixel_id[start:end],
time_of_arrival=tof[start:end],
unit='ns',
),
)
)
return results


Events = TypeVar('Events', DetectorEvents, MonitorEvents)
Expand Down
82 changes: 56 additions & 26 deletions src/ess/livedata/kafka/message_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,19 @@ def adapt(


class Ev44ToMonitorEventsAdapter(
MessageAdapter[Message[eventdata_ev44.EventData], Message[MonitorEvents]]
MessageAdapter[Message[eventdata_ev44.EventData], list[Message[MonitorEvents]]]
):
def adapt(
self, message: Message[eventdata_ev44.EventData]
) -> Message[MonitorEvents]:
return Message(
timestamp=message.timestamp,
stream=message.stream,
value=MonitorEvents.from_ev44(message.value),
)
) -> list[Message[MonitorEvents]]:
return [
Message(
timestamp=ts if ts is not None else message.timestamp,
stream=message.stream,
value=events,
)
for ts, events in MonitorEvents.from_ev44(message.value)
]


class X5f2ToStatusAdapter(
Expand Down Expand Up @@ -202,30 +205,49 @@ class KafkaToMonitorEventsAdapter(KafkaAdapter[MonitorEvents]):
def __init__(self, stream_lut: StreamLUT):
super().__init__(stream_lut=stream_lut, stream_kind=StreamKind.MONITOR_EVENTS)

def adapt(self, message: KafkaMessage) -> Message[MonitorEvents]:
def adapt(self, message: KafkaMessage) -> list[Message[MonitorEvents]]:
buffer = message.value()
eventdata_ev44.check_schema_identifier(buffer, eventdata_ev44.FILE_IDENTIFIER)
event = Event44Message.Event44Message.GetRootAs(buffer, 0)
stream = self.get_stream_id(
topic=message.topic(), source_name=event.SourceName().decode("utf-8")
)
reference_time = event.ReferenceTimeAsNumpy()
reference_time_index = event.ReferenceTimeIndexAsNumpy()
time_of_arrival = event.TimeOfFlightAsNumpy()

# A fallback, useful in particular for testing so serialized data can be reused.
if reference_time.size > 0:
timestamp = reference_time[-1]
else:
timestamp = message.timestamp()[1]
return Message(
timestamp=timestamp,
stream=stream,
value=MonitorEvents(time_of_arrival=time_of_arrival, unit='ns'),
)
if reference_time.size == 0:
# A fallback, useful for testing so serialized data can be reused.
return [
Message(
timestamp=message.timestamp()[1],
stream=stream,
value=MonitorEvents(time_of_arrival=time_of_arrival, unit='ns'),
)
]

results: list[Message[MonitorEvents]] = []
for i in range(len(reference_time)):
start = reference_time_index[i]
end = (
reference_time_index[i + 1]
if i + 1 < len(reference_time_index)
else len(time_of_arrival)
)
results.append(
Message(
timestamp=int(reference_time[i]),
stream=stream,
value=MonitorEvents(
time_of_arrival=time_of_arrival[start:end], unit='ns'
),
)
)
return results


class Ev44ToDetectorEventsAdapter(
MessageAdapter[Message[eventdata_ev44.EventData], Message[DetectorEvents]]
MessageAdapter[Message[eventdata_ev44.EventData], list[Message[DetectorEvents]]]
):
def __init__(self, *, merge_detectors: bool = False):
"""
Expand All @@ -241,15 +263,18 @@ def __init__(self, *, merge_detectors: bool = False):

def adapt(
self, message: Message[eventdata_ev44.EventData]
) -> Message[DetectorEvents]:
) -> list[Message[DetectorEvents]]:
stream = message.stream
if self._merge_detectors:
stream = replace(stream, name='unified_detector')
return Message(
timestamp=message.timestamp,
stream=stream,
value=DetectorEvents.from_ev44(message.value),
)
return [
Message(
timestamp=ts if ts is not None else message.timestamp,
stream=stream,
value=events,
)
for ts, events in DetectorEvents.from_ev44(message.value)
]


class Da00ToScippAdapter(
Expand Down Expand Up @@ -403,7 +428,7 @@ def get_messages(self) -> Sequence[U]:
adapted = []
for msg in raw_messages:
try:
adapted.append(self._adapter.adapt(msg))
result = self._adapter.adapt(msg)
except streaming_data_types.exceptions.WrongSchemaException:
logger.warning('Message %s has an unknown schema. Skipping.', msg)
if self._raise_on_error:
Expand All @@ -412,4 +437,9 @@ def get_messages(self) -> Sequence[U]:
logger.exception('Error adapting message %s: %s', msg, e)
if self._raise_on_error:
raise
else:
if isinstance(result, list):
adapted.extend(result)
else:
adapted.append(result)
return adapted
66 changes: 56 additions & 10 deletions tests/handlers/to_nxevent_data_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
)


def test_MonitorEvents_from_ev44() -> None:
def test_MonitorEvents_from_ev44_no_reference_time() -> None:
ev44 = eventdata_ev44.EventData(
source_name='ignored',
message_id=0,
Expand All @@ -21,25 +21,71 @@ def test_MonitorEvents_from_ev44() -> None:
pixel_id=[1, 1, 1],
time_of_flight=[1, 2, 3],
)
monitor_events = MonitorEvents.from_ev44(ev44)
result = MonitorEvents.from_ev44(ev44)
assert len(result) == 1
ts, monitor_events = result[0]
assert ts is None
assert monitor_events.time_of_arrival == [1, 2, 3]
assert monitor_events.unit == 'ns'


@pytest.mark.parametrize('events_cls', [MonitorEvents, DetectorEvents])
def test_MonitorEvents_from_ev44_raises_with_multi_pulse_message(
events_cls: MonitorEvents,
) -> None:
def test_MonitorEvents_from_ev44_single_pulse_with_reference_time() -> None:
ev44 = eventdata_ev44.EventData(
source_name='ignored',
message_id=0,
reference_time=[1, 2],
reference_time_index=[0, 1],
reference_time=[1000],
reference_time_index=[0],
pixel_id=[1, 1, 1],
time_of_flight=[1, 2, 3],
)
with pytest.raises(NotImplementedError):
events_cls.from_ev44(ev44)
result = MonitorEvents.from_ev44(ev44)
assert len(result) == 1
ts, monitor_events = result[0]
assert ts == 1000
assert list(monitor_events.time_of_arrival) == [1, 2, 3]


def test_MonitorEvents_from_ev44_multi_pulse() -> None:
ev44 = eventdata_ev44.EventData(
source_name='ignored',
message_id=0,
reference_time=[100, 200, 300],
reference_time_index=[0, 2, 5],
pixel_id=[1, 1, 1, 1, 1, 1, 1],
time_of_flight=[10, 20, 30, 40, 50, 60, 70],
)
result = MonitorEvents.from_ev44(ev44)
assert len(result) == 3

assert result[0][0] == 100
assert list(result[0][1].time_of_arrival) == [10, 20]

assert result[1][0] == 200
assert list(result[1][1].time_of_arrival) == [30, 40, 50]

assert result[2][0] == 300
assert list(result[2][1].time_of_arrival) == [60, 70]


def test_DetectorEvents_from_ev44_multi_pulse() -> None:
ev44 = eventdata_ev44.EventData(
source_name='ignored',
message_id=0,
reference_time=[100, 200],
reference_time_index=[0, 3],
pixel_id=[1, 2, 3, 4, 5],
time_of_flight=[10, 20, 30, 40, 50],
)
result = DetectorEvents.from_ev44(ev44)
assert len(result) == 2

assert result[0][0] == 100
assert list(result[0][1].time_of_arrival) == [10, 20, 30]
assert list(result[0][1].pixel_id) == [1, 2, 3]

assert result[1][0] == 200
assert list(result[1][1].time_of_arrival) == [40, 50]
assert list(result[1][1].pixel_id) == [4, 5]


def test_MonitorEvents_ToNXevent_data() -> None:
Expand Down
Loading
Loading