diff --git a/src/ess/livedata/config/instrument.py b/src/ess/livedata/config/instrument.py index 7b6163bb8..65dfbec65 100644 --- a/src/ess/livedata/config/instrument.py +++ b/src/ess/livedata/config/instrument.py @@ -3,12 +3,16 @@ from __future__ import annotations from collections import UserDict -from collections.abc import Callable, Sequence +from collections.abc import Callable, Mapping, Sequence from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any if TYPE_CHECKING: + import sciline + from ess.livedata.handlers.detector_view_specs import SpectrumViewSpec + from ess.livedata.handlers.log_context import LogContextBinding + from ess.livedata.handlers.stream_processor_workflow import ValueLog import pydantic import scipp as sc @@ -90,6 +94,7 @@ class Instrument: monitors: list[str] = field(default_factory=list) workflow_factory: WorkflowFactory = field(default_factory=WorkflowFactory) f144_attribute_registry: dict[str, dict[str, Any]] = field(default_factory=dict) + log_context_bindings: list[LogContextBinding] = field(default_factory=list) source_metadata: dict[str, SourceMetadata] = field(default_factory=dict) _detector_numbers: dict[str, sc.Variable] = field(default_factory=dict) _nexus_file: str | None = None @@ -107,6 +112,15 @@ def __post_init__(self) -> None: register_timeseries_workflow_specs, ) + # Auto-derive f144_attribute_registry entries from bindings that + # declare units. Explicit registry entries win on conflict. + for binding in self.log_context_bindings: + if binding.units is None: + continue + self.f144_attribute_registry.setdefault( + binding.stream_name, {'units': binding.units} + ) + timeseries_names = list(self.f144_attribute_registry.keys()) self._timeseries_workflow_handle = register_timeseries_workflow_specs( instrument=self, source_names=timeseries_names @@ -350,6 +364,64 @@ def add_logical_view( ) return handle + def apply_dynamic_transforms( + self, + workflow: sciline.Pipeline, + components: Mapping[str, type], + ) -> dict[str, type[ValueLog]]: + """Patch ``workflow`` to drive matching NXlog placeholders from f144 streams. + + For each ``(source_name, component_type)`` entry, selects every + :class:`DynamicTransformBinding` in :attr:`log_context_bindings` + whose ``dependent_sources`` set includes that source name. For each + component type with at least one match, replaces the + ``NeXusTransformationChain[T, SampleRun]`` provider with a closure + that consumes the matched bindings' ``log_key`` parameters and + writes the latest sample into the chain. Non-transform bindings + on the instrument are ignored. + + Parameters + ---------- + workflow: + The Sciline pipeline to patch in place. + components: + Mapping ``source_name -> component_type`` for the + essreduce-loaded NeXus components whose ``depends_on`` chain + might need patching (e.g. ``{'loki_detector_0': NXdetector, + aux_source_names['incident_monitor']: Incident, ...}``). + Callers own the alias resolution: source names are the + actual on-disk names, not aliases. + + Returns + ------- + : + Mapping ``stream_name -> log_key`` for every binding that + actually matched; pass to :class:`StreamProcessorWorkflow` + as ``context_keys`` so SPW's wrapping rule delivers each + f144 NXlog to the right Sciline parameter. + """ + from ess.livedata.handlers.dynamic_transforms import ( + DynamicTransformBinding, + build_patched_chain_provider, + ) + + transform_bindings = [ + b + for b in self.log_context_bindings + if isinstance(b, DynamicTransformBinding) + ] + context_keys: dict[str, type[ValueLog]] = {} + for source_name, component_type in components.items(): + matched = [ + b for b in transform_bindings if source_name in b.dependent_sources + ] + if not matched: + continue + workflow.insert(build_patched_chain_provider(component_type, matched)) + for binding in matched: + context_keys[binding.stream_name] = binding.log_key + return context_keys + def register_spec( self, *, @@ -410,6 +482,14 @@ def register_spec( ------- Handle for attaching factory later. """ + # Merge log-context aux sources scoped per spec source list, so + # factories whose bindings cover any of the spec's sources + # automatically have the routing layer deliver matching f144 streams. + if self.log_context_bindings and source_names: + from ess.livedata.handlers.log_context import compose_aux_sources + + aux_sources = compose_aux_sources(self, list(source_names), aux_sources) + spec = WorkflowSpec( instrument=self.name, group=group, @@ -450,10 +530,7 @@ def load_factories(self) -> None: ) if self._logical_views: - from ess.livedata.handlers.detector_view import ( - DetectorViewFactory, - InstrumentDetectorSource, - ) + from ess.livedata.handlers.detector_view import DetectorViewFactory from ess.livedata.handlers.detector_view import ( LogicalViewConfig as ScilineLogicalViewConfig, ) @@ -467,8 +544,8 @@ def load_factories(self) -> None: spectrum_view=config.spectrum_view, ) factory = DetectorViewFactory( - data_source=InstrumentDetectorSource(self), view_config=view_config, + instrument=self, ) handle.attach_factory()(factory.make_workflow) diff --git a/src/ess/livedata/config/instruments/bifrost/factories.py b/src/ess/livedata/config/instruments/bifrost/factories.py index ebbd8757c..3f69e7199 100644 --- a/src/ess/livedata/config/instruments/bifrost/factories.py +++ b/src/ess/livedata/config/instruments/bifrost/factories.py @@ -80,6 +80,7 @@ def _monitor_workflow_factory(source_name: str, params: TOAOnlyMonitorDataParams edges=params.get_active_edges(), range_filter=params.get_active_range(), coordinate_mode='toa', + instrument=instrument, ) # Create base reduction workflow diff --git a/src/ess/livedata/config/instruments/dream/factories.py b/src/ess/livedata/config/instruments/dream/factories.py index 889fbdc21..055d3604f 100644 --- a/src/ess/livedata/config/instruments/dream/factories.py +++ b/src/ess/livedata/config/instruments/dream/factories.py @@ -92,6 +92,7 @@ def _resolve_lookup_table_filename(instrument_configuration): flip_x=True, ), }, + instrument=instrument, ) @specs.projection_handle.attach_factory() @@ -133,6 +134,7 @@ def _monitor_workflow_factory(source_name: str, params: DreamMonitorDataParams): coordinate_mode=mode, lookup_table_filename=lookup_table_filename, geometry_filename=geometry_filename, + instrument=instrument, ) # Powder reduction workflow setup diff --git a/src/ess/livedata/config/instruments/dummy/factories.py b/src/ess/livedata/config/instruments/dummy/factories.py index e8eb8e853..f1d0315cb 100644 --- a/src/ess/livedata/config/instruments/dummy/factories.py +++ b/src/ess/livedata/config/instruments/dummy/factories.py @@ -29,7 +29,6 @@ def setup_factories(instrument: Instrument) -> None: from ess.livedata.handlers.area_detector_view import AreaDetectorView from ess.livedata.handlers.detector_view import ( DetectorViewFactory, - InstrumentDetectorSource, LogicalViewConfig, ) from ess.livedata.handlers.stream_processor_workflow import StreamProcessorWorkflow @@ -44,8 +43,8 @@ def setup_factories(instrument: Instrument) -> None: # Create detector view for event-mode panel_0 using Sciline-based factory _panel_0_view = DetectorViewFactory( - data_source=InstrumentDetectorSource(instrument), view_config=LogicalViewConfig(), # Identity transform + instrument=instrument, ) specs.panel_0_view_handle.attach_factory()(_panel_0_view.make_workflow) @@ -78,6 +77,7 @@ def _monitor_workflow_factory(source_name: str, params: TOAOnlyMonitorDataParams edges=params.get_active_edges(), range_filter=params.get_active_range(), coordinate_mode='toa', + instrument=instrument, ) # Total counts workflow diff --git a/src/ess/livedata/config/instruments/estia/factories.py b/src/ess/livedata/config/instruments/estia/factories.py index 1a92e2ddd..ebeab4d5b 100644 --- a/src/ess/livedata/config/instruments/estia/factories.py +++ b/src/ess/livedata/config/instruments/estia/factories.py @@ -54,6 +54,7 @@ def _monitor_workflow_factory(source_name: str, params: TOAOnlyMonitorDataParams edges=params.get_active_edges(), range_filter=params.get_active_range(), coordinate_mode='toa', + instrument=instrument, ) @specs.reflectometry_reduction_handle.attach_factory() diff --git a/src/ess/livedata/config/instruments/loki/factories.py b/src/ess/livedata/config/instruments/loki/factories.py index 89a9b56d0..e4d38c666 100644 --- a/src/ess/livedata/config/instruments/loki/factories.py +++ b/src/ess/livedata/config/instruments/loki/factories.py @@ -51,8 +51,6 @@ def setup_factories(instrument: Instrument) -> None: StreamProcessorWorkflow, ) - from .specs import LOKI_DYNAMIC_TRANSFORMS - _nexus_geometry_filename = get_nexus_geometry_filename('loki') def _resolve_lookup_table_filename() -> str: @@ -102,11 +100,11 @@ def _make_base_workflow() -> LokiWorkflow: ) for name, res in _bank_resolutions.items() }, - # Drive the rear bank's NeXus 'detector_carriage' transformation - # from the live f144 carriage readback. The mapping is shared with - # loki/specs.py so the spec routes the stream only to the consuming - # source. - dynamic_transforms=LOKI_DYNAMIC_TRANSFORMS, + # Apply the instrument's dynamic-transform registry so the rear + # bank's carriage NXlog placeholder is driven by the live f144 + # readback. Other banks have no matching binding, so the helper is + # a no-op for them. + instrument=instrument, ) from ess.livedata.handlers.detector_view_specs import DetectorViewParams @@ -157,6 +155,7 @@ def _monitor_workflow_factory(source_name: str, params: MonitorDataParams): coordinate_mode=mode, lookup_table_filename=lookup_table_filename, geometry_filename=geometry_filename, + instrument=instrument, ) # --- Providers for current_run transmission mode --- @@ -209,6 +208,18 @@ def _i_of_q_factory( wf[sans_types.WavelengthBins] = params.wavelength_edges.get_edges() wf[BeamCenter] = params.beam_center.get_vector() + # Patch the workflow to drive any matching NXlog placeholder along + # the loaded components' depends_on chains from f144 streams. + # For LOKI today this covers the rear-bank carriage (issue #922). + context_keys = instrument.apply_dynamic_transforms( + wf, + { + source_name: NXdetector, + aux_source_names['incident_monitor']: Incident, + aux_source_names['transmission_monitor']: Transmission, + }, + ) + target_keys: dict[str, sciline.typing.Key] = { 'i_of_q': IntensityQ[SampleRun], } @@ -235,6 +246,7 @@ def _i_of_q_factory( return StreamProcessorWorkflow( wf, dynamic_keys=_dynamic_keys(source_name), + context_keys=context_keys, target_keys=target_keys, accumulators=_accumulators, ) diff --git a/src/ess/livedata/config/instruments/loki/specs.py b/src/ess/livedata/config/instruments/loki/specs.py index 5e5d41e6e..50b46918a 100644 --- a/src/ess/livedata/config/instruments/loki/specs.py +++ b/src/ess/livedata/config/instruments/loki/specs.py @@ -17,32 +17,40 @@ AuxSources, WorkflowOutputsBase, ) -from ess.livedata.handlers.detector_view.types import TransformValueStream -from ess.livedata.handlers.detector_view_specs import ( - DetectorROIAuxSources, - register_detector_view_spec, -) +from ess.livedata.handlers.detector_view_specs import register_detector_view_spec +from ess.livedata.handlers.dynamic_transforms import DynamicTransformBinding from ess.livedata.handlers.monitor_workflow_specs import ( MonitorDataParams, register_monitor_workflow_specs, ) +from ess.livedata.handlers.stream_processor_workflow import ValueLog from ess.livedata.handlers.wavelength_lut_workflow_specs import ( register_wavelength_lut_workflow_spec, ) from .views import get_tube_view -#: Per-source bindings of NeXus transformation entries to live f144 streams. -#: Single source of truth shared between the spec (for routing via -#: ``DetectorROIAuxSources``) and the factory (for graph wiring via -#: ``DetectorViewFactory(dynamic_transforms=...)``). Only the rear bank has -#: a live carriage readback; other banks have no dynamic geometry. -LOKI_DYNAMIC_TRANSFORMS: dict[str, TransformValueStream] = { - 'loki_detector_0': TransformValueStream( - transform_name='/entry/instrument/detector_carriage/value', - aux_stream='detector_carriage', + +#: Sciline key for the rear-detector carriage f144 NXlog. Subclassing +#: ValueLog gives the binding its own grep-able Sciline node, distinct +#: from any future bindings on the same component type. +class DetectorCarriageLog(ValueLog): + """Carriage f144 NXlog (drives ``loki_detector_0`` chain).""" + + +#: Dynamic-transform bindings for LOKI. Only the rear bank's carriage entry +#: is dynamic; other banks have no live position readback. ``beam_monitor_m4`` +#: is also movable (on the carriage, probably), but we currently have no file +#: with a correct depends_on chain. +LOKI_DYNAMIC_TRANSFORMS = [ + DynamicTransformBinding( + nxlog_path='/entry/instrument/detector_carriage/value', + stream_name='detector_carriage', + log_key=DetectorCarriageLog, + dependent_sources=frozenset({'loki_detector_0'}), + units='mm', ), -} +] class TransmissionMode(StrEnum): @@ -209,6 +217,7 @@ class SansWorkflowParams(pydantic.BaseModel): f144_attribute_registry={ name: {'units': info['units']} for name, info in f144_log_streams.items() }, + log_context_bindings=LOKI_DYNAMIC_TRANSFORMS, source_metadata={ 'loki_detector_0': SourceMetadata(title='Rear'), 'loki_detector_1': SourceMetadata(title='Mid Top'), @@ -275,7 +284,6 @@ class SansWorkflowParams(pydantic.BaseModel): instrument=instrument, projection='xy_plane', source_names=detector_names, - aux_sources=DetectorROIAuxSources(dynamic_transforms=LOKI_DYNAMIC_TRANSFORMS), ) # Register tube view for all detector banks diff --git a/src/ess/livedata/config/instruments/nmx/factories.py b/src/ess/livedata/config/instruments/nmx/factories.py index 0e0eeecc0..a9f5a9f71 100644 --- a/src/ess/livedata/config/instruments/nmx/factories.py +++ b/src/ess/livedata/config/instruments/nmx/factories.py @@ -16,7 +16,6 @@ def setup_factories(instrument: Instrument) -> None: # Lazy imports from ess.livedata.handlers.detector_view import ( DetectorViewFactory, - InstrumentDetectorSource, LogicalViewConfig, ) @@ -37,8 +36,8 @@ def setup_factories(instrument: Instrument) -> None: # Create detector view using Sciline-based factory (identity transform) _nmx_panels_view = DetectorViewFactory( - data_source=InstrumentDetectorSource(instrument), view_config=LogicalViewConfig(), # Identity transform + instrument=instrument, ) specs.panel_xy_view_handle.attach_factory()(_nmx_panels_view.make_workflow) @@ -55,4 +54,5 @@ def _monitor_workflow_factory(source_name: str, params: TOAOnlyMonitorDataParams edges=params.get_active_edges(), range_filter=params.get_active_range(), coordinate_mode='toa', + instrument=instrument, ) diff --git a/src/ess/livedata/config/instruments/odin/factories.py b/src/ess/livedata/config/instruments/odin/factories.py index bbacbb5b1..7d2b657c0 100644 --- a/src/ess/livedata/config/instruments/odin/factories.py +++ b/src/ess/livedata/config/instruments/odin/factories.py @@ -28,4 +28,5 @@ def _monitor_workflow_factory(source_name: str, params: TOAOnlyMonitorDataParams edges=params.get_active_edges(), range_filter=params.get_active_range(), coordinate_mode='toa', + instrument=instrument, ) diff --git a/src/ess/livedata/config/instruments/tbl/factories.py b/src/ess/livedata/config/instruments/tbl/factories.py index b94dae682..2ca4fe3e0 100644 --- a/src/ess/livedata/config/instruments/tbl/factories.py +++ b/src/ess/livedata/config/instruments/tbl/factories.py @@ -29,4 +29,5 @@ def _monitor_workflow_factory(source_name: str, params: TOAOnlyMonitorDataParams edges=params.get_active_edges(), range_filter=params.get_active_range(), coordinate_mode='toa', + instrument=instrument, ) diff --git a/src/ess/livedata/config/workflow_spec.py b/src/ess/livedata/config/workflow_spec.py index 731efa7c8..c448ac0b9 100644 --- a/src/ess/livedata/config/workflow_spec.py +++ b/src/ess/livedata/config/workflow_spec.py @@ -194,6 +194,31 @@ def render( return result +class CombinedAuxSources(AuxSources): + """Composes multiple :class:`AuxSources` into one. + + Inputs are merged (later components override earlier on key collisions); + ``render`` dispatches to each component and merges the results. + """ + + def __init__(self, components: list[AuxSources]) -> None: + self._components = components + merged: dict[str, str | AuxInput] = {} + for comp in components: + merged.update(comp.inputs) + super().__init__(merged) + + def render( + self, + job_id: JobId, + selections: dict[str, str] | None = None, + ) -> dict[str, str]: + result: dict[str, str] = {} + for comp in self._components: + result.update(comp.render(job_id, selections)) + return result + + class ResultKey(BaseModel, frozen=True): # Workflows produce one or more named outputs. Each output is serialized as a # separate da00 message. The output_name identifies which output this key refers to. diff --git a/src/ess/livedata/handlers/detector_view/__init__.py b/src/ess/livedata/handlers/detector_view/__init__.py index 0eec9ba56..6e48a1be5 100644 --- a/src/ess/livedata/handlers/detector_view/__init__.py +++ b/src/ess/livedata/handlers/detector_view/__init__.py @@ -15,23 +15,20 @@ 2. Logical views (fold/slice transforms with optional reduction) """ -from .data_source import InstrumentDetectorSource, NeXusDetectorSource +from .data_source import NeXusDetectorSource from .factory import DetectorViewFactory from .types import ( CoordinateMode, GeometricViewConfig, LogicalViewConfig, SpectrumView, - TransformValueStream, ) __all__ = [ 'CoordinateMode', 'DetectorViewFactory', 'GeometricViewConfig', - 'InstrumentDetectorSource', 'LogicalViewConfig', 'NeXusDetectorSource', 'SpectrumView', - 'TransformValueStream', ] diff --git a/src/ess/livedata/handlers/detector_view/data_source.py b/src/ess/livedata/handlers/detector_view/data_source.py index 533eea5e8..31c64de54 100644 --- a/src/ess/livedata/handlers/detector_view/data_source.py +++ b/src/ess/livedata/handlers/detector_view/data_source.py @@ -11,16 +11,13 @@ from __future__ import annotations import pathlib -from typing import TYPE_CHECKING, Protocol +from typing import Protocol import sciline import scipp as sc from ess.reduce.nexus.types import EmptyDetector, Filename, NeXusName, SampleRun from scippnexus import NXdetector -if TYPE_CHECKING: - from ess.livedata.config.instrument import Instrument - def create_empty_detector(detector_number: sc.Variable) -> sc.DataArray: """ @@ -116,25 +113,3 @@ def configure_workflow(self, workflow: sciline.Pipeline, source_name: str) -> No workflow[EmptyDetector[SampleRun]] = create_empty_detector( self._detector_number ) - - -class InstrumentDetectorSource: - """ - Create EmptyDetector from an Instrument's configured detector_number. - - Use this for logical views where the detector_number is configured in the - Instrument and may differ for each source_name. This enables fast startup - without file I/O while supporting multiple detector sources. - - Parameters - ---------- - instrument: - The instrument configuration containing detector_number arrays. - """ - - def __init__(self, instrument: Instrument) -> None: - self._instrument = instrument - - def configure_workflow(self, workflow: sciline.Pipeline, source_name: str) -> None: - detector_number = self._instrument.get_detector_number(source_name) - workflow[EmptyDetector[SampleRun]] = create_empty_detector(detector_number) diff --git a/src/ess/livedata/handlers/detector_view/factory.py b/src/ess/livedata/handlers/detector_view/factory.py index 370630fc2..694a8ea76 100644 --- a/src/ess/livedata/handlers/detector_view/factory.py +++ b/src/ess/livedata/handlers/detector_view/factory.py @@ -9,19 +9,24 @@ from __future__ import annotations +from typing import TYPE_CHECKING + import scipp as sc -from ess.reduce.nexus.types import NeXusData, SampleRun +from ess.reduce.nexus.types import EmptyDetector, NeXusData, SampleRun from ess.reduce.unwrap import LookupTableFilename from ess.reduce.unwrap.types import LookupTableRelativeErrorThreshold from scippnexus import NXdetector +if TYPE_CHECKING: + from ess.livedata.config.instrument import Instrument as Instrument + from ..accumulators import make_no_copy_accumulator_pair # Import types unconditionally for runtime type hint resolution # (used by workflow_factory.attach_factory to inspect parameter types) from ..detector_view_specs import DetectorViewParams from ..stream_processor_workflow import StreamProcessorWorkflow -from .data_source import DetectorDataSource, DetectorNumberSource +from .data_source import DetectorDataSource, NeXusDetectorSource, create_empty_detector from .providers import spectrum_view from .types import ( AccumulatedHistogram, @@ -39,13 +44,10 @@ ROISpectra, SpectrumView, SpectrumViewTransform, - TransformValueLog, - TransformValueStream, UsePixelWeighting, ViewConfig, ) from .workflow import ( - add_dynamic_transform, add_geometric_projection, add_logical_projection, create_base_workflow, @@ -66,33 +68,35 @@ class DetectorViewFactory: Parameters ---------- - data_source: - Detector data source configuration. Use NeXusDetectorSource for - loading geometry from a file, or DetectorNumberSource for fast - file-less startup with logical views. view_config: View configuration. Can be a single config (applied to all sources) or a dict mapping source names to configs (for per-detector settings). - dynamic_transforms: - Optional mapping ``source_name -> TransformValueStream`` binding a - NeXus transformation entry of the detector's chain to the f144 - stream that supplies its live values. ``aux_stream`` is the - logical name of the auxiliary input delivering the NXlog - DataArray (must match the corresponding ``AuxSources`` entry). - ``transform_name`` is the entry of ``chain.transformations`` - whose ``.value`` will be replaced with the latest sample. + instrument: + Instrument whose ``dynamic_transforms`` registry is consulted when + constructing the workflow. When a binding's ``dependent_sources`` + includes the workflow's ``source_name``, + ``apply_dynamic_transforms`` patches the workflow at + :meth:`make_workflow` time. With no matching binding this is a + no-op. Also supplies the per-source ``detector_number`` when + ``data_source`` is not given. + data_source: + Optional detector data source. Use ``NeXusDetectorSource`` to load + full geometry from a file (required for wavelength mode). When + omitted, an EmptyDetector is built from + ``instrument.get_detector_number(source_name)``; useful for + logical views that only need the pixel structure. """ def __init__( self, *, - data_source: DetectorDataSource, view_config: ViewConfig | dict[str, ViewConfig], - dynamic_transforms: dict[str, TransformValueStream] | None = None, + instrument: Instrument, + data_source: DetectorDataSource | None = None, ) -> None: self._data_source = data_source self._view_config = view_config - self._dynamic_transforms = dynamic_transforms or {} + self._instrument = instrument def _get_config(self, source_name: str) -> ViewConfig: """Get the view config for a given source.""" @@ -131,10 +135,10 @@ def make_workflow( if mode == 'wavelength': if lookup_table_filename is None: raise ValueError(f"{mode} mode requires lookup_table_filename") - if isinstance(self._data_source, DetectorNumberSource): + if not isinstance(self._data_source, NeXusDetectorSource): raise ValueError( f"{mode} mode requires geometry for Ltotal computation; " - "use NeXusDetectorSource instead of DetectorNumberSource" + "pass data_source=NeXusDetectorSource(...)" ) # Get mode-specific event coordinate @@ -163,8 +167,15 @@ def make_workflow( workflow[LookupTableFilename] = lookup_table_filename workflow[LookupTableRelativeErrorThreshold] = {source_name: float('inf')} - # Configure detector data source (EmptyDetector) - self._data_source.configure_workflow(workflow, source_name) + # Configure detector data source (EmptyDetector). Without an explicit + # data_source, build EmptyDetector from the instrument's per-source + # detector_number — file-less, sufficient for logical views. + if self._data_source is None: + workflow[EmptyDetector[SampleRun]] = create_empty_detector( + self._instrument.get_detector_number(source_name) + ) + else: + self._data_source.configure_workflow(workflow, source_name) # Set pixel weighting configuration workflow[UsePixelWeighting] = use_pixel_weighting @@ -257,12 +268,15 @@ def bound_spectrum_transform( 'roi_spectra_current', ) - # Wire dynamic detector geometry (f144 NXlog stream) if configured for - # this source. - value_stream = self._dynamic_transforms.get(source_name) - if value_stream is not None: - add_dynamic_transform(workflow, transform_name=value_stream.transform_name) - context_keys[value_stream.aux_stream] = TransformValueLog + # Wire dynamic NeXus transforms (f144 NXlog streams) for any binding + # whose ``dependent_sources`` include this source. With no matching + # binding this is a no-op; with one, the patched chain provider + # raises later if the chain reaches an unpatched empty NXlog. + context_keys.update( + self._instrument.apply_dynamic_transforms( + workflow, {source_name: NXdetector} + ) + ) cumulative, window = make_no_copy_accumulator_pair() return StreamProcessorWorkflow( diff --git a/src/ess/livedata/handlers/detector_view/types.py b/src/ess/livedata/handlers/detector_view/types.py index ed7f32aa2..887f91d4b 100644 --- a/src/ess/livedata/handlers/detector_view/types.py +++ b/src/ess/livedata/handlers/detector_view/types.py @@ -299,53 +299,6 @@ class ROISpectra( """ -TransformName = NewType('TransformName', str) -"""Name of the NeXus transformation entry driven by a live value stream.""" - - -@dataclass(frozen=True, slots=True) -class TransformValue: - """Current scalar value of a named entry in a NeXus transformation chain. - - Carries the latest value (typically from an f144 position stream) for a - single named transformation in a detector's ``depends_on`` chain. The - units must match those baked into the NeXus file. - """ - - name: TransformName - value: sc.Variable - - -@dataclass(frozen=True, slots=True) -class TransformValueStream: - """ - Binds a NeXus transformation entry to the f144 stream that supplies its - live values at runtime. - - Parameters - ---------- - transform_name: - NeXus path of the transformation entry whose value is driven by the - live stream. - aux_stream: - Name of the auxiliary (f144) stream supplying the live values. - """ - - transform_name: str - aux_stream: str - - -TransformValueLog = NewType('TransformValueLog', sc.DataArray | None) -"""NXlog-shaped DataArray (timeseries) of f144 values for a transform. - -Set via ``set_context`` from the ToNXlog accumulator output. Carries a -``time`` coord; the provider extracts the latest sample. The alias -includes ``None`` because sciline initialises context-keyed parameters -to ``None`` before the first ``set_context`` call; consumers must -handle that. -""" - - ROIPolygonMasks = NewType('ROIPolygonMasks', dict) """Precomputed boolean masks for polygon ROIs. diff --git a/src/ess/livedata/handlers/detector_view/workflow.py b/src/ess/livedata/handlers/detector_view/workflow.py index be25e9418..e40341004 100644 --- a/src/ess/livedata/handlers/detector_view/workflow.py +++ b/src/ess/livedata/handlers/detector_view/workflow.py @@ -10,12 +10,10 @@ from __future__ import annotations from collections.abc import Callable -from copy import deepcopy from typing import Literal import sciline import scipp as sc -import scippnexus as snx from ess.reduce.live.raw import ( DetectorViewResolution, PositionNoiseReplicaCount, @@ -27,8 +25,7 @@ position_noise_for_cylindrical_pixel, position_with_noisy_replicas, ) -from ess.reduce.nexus.types import NeXusComponent, NeXusTransformationChain, SampleRun -from ess.reduce.nexus.workflow import get_transformation_chain +from ess.reduce.nexus.types import SampleRun from ess.reduce.unwrap import GenericUnwrapWorkflow from .projectors import make_geometric_projector, make_logical_projector @@ -59,66 +56,10 @@ LogicalTransform, ProjectionType, ReductionDim, - TransformName, - TransformValue, - TransformValueLog, UsePixelWeighting, ) -def get_transformation_chain_with_value( - detector: NeXusComponent[snx.NXdetector, SampleRun], - transform_value: TransformValue, -) -> NeXusTransformationChain[snx.NXdetector, SampleRun]: - """Inject a live value into one entry of the detector transformation chain. - - Replaces essreduce's ``get_transformation_chain`` so that a runtime - f144 stream value drives the detector position. The baked-in value - from the reference geometry file is intentionally never used: it may - be stale or invalid, and a wrong result is worse than no result. - """ - chain = get_transformation_chain(detector) - if transform_value.name not in chain.transformations: - raise KeyError( - f"Transformation entry {transform_value.name!r} not found in chain. " - f"Available entries: {sorted(chain.transformations.keys())}" - ) - # Copy so we don't leak changes back into the cached NeXusComponent. - chain = deepcopy(chain) - chain.transformations[transform_value.name].value = transform_value.value - return chain - - -def transform_value_from_log( - log: TransformValueLog, - name: TransformName, -) -> TransformValue: - """Build a TransformValue from the latest sample of an NXlog DataArray. - - The ``log`` arrives via ``set_context`` from the ``ToNXlog`` - accumulator. We extract the most recent value as a scalar - ``sc.Variable`` so the downstream ``to_transformation`` time-filter - branch is bypassed (see ``ess.reduce.nexus.workflow.to_transformation``). - - Before the first ``set_context`` call the parameter is ``None``; - after it, it is an NXlog that may still be empty if no f144 message - has arrived yet. Both cases raise, so the workflow reports "no value - yet" rather than silently falling back to the reference file's - baked-in value (which may be stale or invalid). - - Raises - ------ - ValueError - If the log is ``None`` or has not yet received any samples. - """ - if log is None or log.sizes.get('time', 0) == 0: - raise ValueError( - f"No samples yet for transformation {name!r}: f144 stream has not " - "produced a value." - ) - return TransformValue(name=name, value=log['time', -1].data) - - def create_base_workflow( *, bins: sc.Variable, @@ -193,33 +134,6 @@ def create_base_workflow( return workflow -def add_dynamic_transform( - workflow: sciline.Pipeline, - *, - transform_name: str, -) -> None: - """ - Patch the workflow to drive a NeXus transformation from a live f144 stream. - - Replaces essreduce's ``get_transformation_chain`` provider so that the - detector's transformation chain picks up the latest value from an - ``NXlog`` context stream. Only call this for sources that have a - dynamic geometry configured; otherwise the workflow uses the file's - baked-in transformation unchanged. - - Parameters - ---------- - workflow: - Sciline pipeline to configure. - transform_name: - Name of the entry inside the detector's ``depends_on`` chain whose - value is driven by the f144 stream. - """ - workflow.insert(get_transformation_chain_with_value) - workflow.insert(transform_value_from_log) - workflow[TransformName] = TransformName(transform_name) - - def add_geometric_projection( workflow: sciline.Pipeline, *, diff --git a/src/ess/livedata/handlers/detector_view_specs.py b/src/ess/livedata/handlers/detector_view_specs.py index 1ef3b7d90..fb49ee52a 100644 --- a/src/ess/livedata/handlers/detector_view_specs.py +++ b/src/ess/livedata/handlers/detector_view_specs.py @@ -15,10 +15,7 @@ from collections.abc import Callable from dataclasses import dataclass -from typing import TYPE_CHECKING, Literal - -if TYPE_CHECKING: - from .detector_view.types import TransformValueStream +from typing import Literal import pydantic import scipp as sc @@ -28,7 +25,6 @@ from ..config.instrument import Instrument from ..config.workflow_spec import ( DETECTORS, - AuxInput, AuxSources, JobId, WorkflowOutputsBase, @@ -417,37 +413,22 @@ class DetectorROIAuxSources(AuxSources): The render() method prefixes ROI stream names with the job_id to create job-specific ROI configuration streams, since each job instance needs its own ROIs. - - Optionally also advertises one or more global f144 streams that drive - runtime-dynamic NeXus transformation values for specific source_names. - These streams are physical properties of the instrument (not job- - specific), so they are rendered un-prefixed and only routed to the jobs - whose source_name actually consumes them. """ - def __init__( - self, - dynamic_transforms: dict[str, TransformValueStream] | None = None, - ) -> None: - self._dynamic_transforms = dynamic_transforms or {} - inputs: dict[str, str | AuxInput] = { - 'roi_rectangle': 'roi_rectangle', - 'roi_polygon': 'roi_polygon', - } - # Advertise each unique global aux stream so the dashboard schema - # and spec validation know it exists. Routing is per-source via - # render(). - for binding in self._dynamic_transforms.values(): - inputs.setdefault(binding.aux_stream, binding.aux_stream) - super().__init__(inputs) + def __init__(self) -> None: + super().__init__( + { + 'roi_rectangle': 'roi_rectangle', + 'roi_polygon': 'roi_polygon', + } + ) def render( self, job_id: JobId, selections: dict[str, str] | None = None, ) -> dict[str, str]: - """Render ROI stream names with job-specific prefix, plus any - source-specific global aux streams. + """Render ROI stream names with job-specific prefix. Parameters ---------- @@ -460,18 +441,12 @@ def render( ------- : Mapping from ROI geometry keys to job-specific stream names - (e.g., ``'{source_name}/{job_number}/roi_rectangle'``), plus - any global aux streams bound to this source's NeXus transforms, - rendered un-prefixed. + (e.g., ``'{source_name}/{job_number}/roi_rectangle'``). """ - rendered: dict[str, str] = { + return { 'roi_rectangle': f"{job_id}/roi_rectangle", 'roi_polygon': f"{job_id}/roi_polygon", } - binding = self._dynamic_transforms.get(job_id.source_name) - if binding is not None: - rendered[binding.aux_stream] = binding.aux_stream - return rendered ProjectionType = Literal["xy_plane", "cylinder_mantle_z"] @@ -579,6 +554,9 @@ def register_detector_view_spec( title=title, description=description, source_names=source_names, + # Dynamic-transform aux sources are merged centrally by + # Instrument.register_spec; callers pass only their spec-specific + # auxes here. aux_sources=aux_sources if aux_sources is not None else DetectorROIAuxSources(), params=make_detector_view_params(spectrum_view=spectrum_view), outputs=make_detector_view_outputs( diff --git a/src/ess/livedata/handlers/dynamic_transforms.py b/src/ess/livedata/handlers/dynamic_transforms.py new file mode 100644 index 000000000..2cdfcf6eb --- /dev/null +++ b/src/ess/livedata/handlers/dynamic_transforms.py @@ -0,0 +1,97 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +"""Drive NeXus ``depends_on`` chains from live f144 streams. + +The geometry artifact represents dynamic geometry as length-0 NXlog +placeholders along ``depends_on`` chains. Workflows that load components +walking through such a placeholder must replace its (empty) value with +the latest sample from a live f144 stream — otherwise essreduce's +``reject_time_dependent_transform`` filter raises at workflow +construction time. + +Per-instrument bindings are declared as :class:`DynamicTransformBinding` +on :class:`Instrument`. The method :meth:`Instrument.apply_dynamic_transforms` +replaces essreduce's ``NeXusTransformationChain[T, SampleRun]`` provider +with a synthesised one that consumes the matching :class:`ValueLog` +subclasses and patches the chain in place. + +The generic f144-NXlog → Sciline parameter machinery lives in +:mod:`.log_context` and :mod:`.stream_processor_workflow`; this module +is the transform-specific consumer of those primitives. +""" + +from __future__ import annotations + +from copy import deepcopy +from dataclasses import dataclass +from typing import Any + +from ess.reduce.nexus.types import ( + NeXusComponent, + NeXusTransformationChain, + SampleRun, +) +from ess.reduce.nexus.workflow import get_transformation_chain + +from .log_context import LogContextBinding +from .stream_processor_workflow import ValueLog, synthesise_provider + + +@dataclass(frozen=True, slots=True, kw_only=True) +class DynamicTransformBinding(LogContextBinding): + """Binds an NXlog placeholder in a ``depends_on`` chain to an f144 stream. + + Parameters + ---------- + nxlog_path: + Absolute NeXus path of the placeholder NXlog node along a + ``depends_on`` chain (e.g. ``/entry/instrument/detector_carriage/value``). + """ + + nxlog_path: str + + +def build_patched_chain_provider( + component_type: type, matched: list[DynamicTransformBinding] +) -> Any: + """Build a per-component-type provider that patches NXlog placeholders. + + The returned provider replaces essreduce's ``get_transformation_chain`` + specialised to ``component_type``: it consumes + ``NeXusComponent[component_type, SampleRun]`` plus one parameter per + matched binding (annotated with that binding's ``log_key``), and produces + ``NeXusTransformationChain[component_type, SampleRun]``. It cannot + instead consume the chain as input — that would self-cycle on its own + return type. + """ + bindings_local = list(matched) + + def _impl(component: Any, *containers: ValueLog | None) -> Any: + chain = get_transformation_chain(component) + patched = deepcopy(chain) + for binding, container in zip(bindings_local, containers, strict=True): + if binding.nxlog_path not in patched.transformations: + continue + if ( + container is None + or container.values is None + or container.values.sizes.get('time', 0) == 0 + ): + raise ValueError( + f"No samples yet for {binding.stream_name!r} " + f"(transform {binding.nxlog_path!r})" + ) + log = container.values + patched.transformations[binding.nxlog_path].value = log['time', -1].data + return patched + + annotations: dict[str, Any] = { + 'component': NeXusComponent[component_type, SampleRun], + **{f'log_{i}': b.log_key for i, b in enumerate(matched)}, + 'return': NeXusTransformationChain[component_type, SampleRun], + } + return synthesise_provider( + name=f'_patched_chain__{component_type.__name__}', + impl=_impl, + annotations=annotations, + ) diff --git a/src/ess/livedata/handlers/log_context.py b/src/ess/livedata/handlers/log_context.py new file mode 100644 index 000000000..7d6073e16 --- /dev/null +++ b/src/ess/livedata/handlers/log_context.py @@ -0,0 +1,124 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +"""Generic plumbing for f144 NXlog → Sciline parameter delivery. + +The flow is: + +- A live f144 stream is consumed off Kafka, adapted to a :class:`LogData` + message, accumulated by ``ToNXlog`` into a cumulative ``DataArray`` with + a ``time`` coord. +- :class:`~ess.livedata.handlers.stream_processor_workflow.StreamProcessorWorkflow` + receives that cumulative payload as a ``context_keys`` entry and (if the + Sciline key is a :class:`ValueLog` subclass) wraps it via ``key(values=raw)`` + before delegating to ``set_context``. +- Per-binding :class:`ValueLog` subclasses give the cumulative log a + distinct Sciline node identity, so multiple bindings can coexist without + colliding on a shared parameter key. + +:class:`LogContextBinding` declares the binding at instrument level, and +:func:`compose_aux_sources` derives the spec's ``AuxSources`` from the +relevant bindings — scoped per-job by ``source_name``. + +Specific binding *kinds* (e.g. :class:`DynamicTransformBinding`) live in +their own modules and patch their own providers; the routing layer here +is type-agnostic. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from ess.livedata.config.workflow_spec import ( + AuxSources, + CombinedAuxSources, + JobId, +) + +from .stream_processor_workflow import ValueLog + +if TYPE_CHECKING: + from ess.livedata.config.instrument import Instrument + + +@dataclass(frozen=True, slots=True, kw_only=True) +class LogContextBinding: + """Declares "f144 stream X feeds Sciline parameter K, scoped to sources S". + + Parameters + ---------- + stream_name: + Name of the f144 aux stream that supplies live samples. Must + appear in :attr:`Instrument.f144_attribute_registry` (auto-derived + from the binding when ``units`` is set). + log_key: + :class:`ValueLog` subclass used as the Sciline key for this + binding. Each binding declares its own subclass so the per-binding + log appears as a distinct, grep-able Sciline node. + dependent_sources: + Source names whose specs should subscribe to this stream. A spec + whose ``source_names`` intersects this set picks up the binding's + stream as an aux input; at render time, only jobs whose + ``source_name`` is in this set receive it. + units: + Optional unit string. When provided, :class:`Instrument` auto-adds + ``{stream_name: {'units': units}}`` to its f144 attribute registry, + so per-instrument adoption need not splice the registry by hand. + """ + + stream_name: str + log_key: type[ValueLog] + dependent_sources: frozenset[str] + units: str | None = None + + +class LogContextAuxSources(AuxSources): + """Aux sources covering an instrument's log-context bindings. + + Inputs include every binding whose ``dependent_sources`` set intersects + the spec's ``source_names``. ``render`` returns only the streams whose + binding includes ``job_id.source_name`` in its ``dependent_sources``, + rendered un-prefixed (these are global f144 streams shared across jobs). + """ + + def __init__(self, instrument: Instrument, source_names: list[str]) -> None: + self._instrument = instrument + selected = set(source_names) + inputs: dict[str, str] = { + b.stream_name: b.stream_name + for b in instrument.log_context_bindings + if b.dependent_sources & selected + } + super().__init__(dict(inputs)) + + def render( + self, + job_id: JobId, + selections: dict[str, str] | None = None, + ) -> dict[str, str]: + return { + b.stream_name: b.stream_name + for b in self._instrument.log_context_bindings + if job_id.source_name in b.dependent_sources + } + + +def compose_aux_sources( + instrument: Instrument, + source_names: list[str], + caller_aux: AuxSources | None, +) -> AuxSources | None: + """Merge caller-supplied aux sources with auto-derived log-context aux + sources for the given source set.""" + components: list[AuxSources] = [] + if caller_aux is not None: + components.append(caller_aux) + if instrument.log_context_bindings: + dyn = LogContextAuxSources(instrument, source_names) + if dyn.inputs: + components.append(dyn) + if not components: + return None + if len(components) == 1: + return components[0] + return CombinedAuxSources(components) diff --git a/src/ess/livedata/handlers/monitor_workflow.py b/src/ess/livedata/handlers/monitor_workflow.py index adb8d6201..6ef39a6c4 100644 --- a/src/ess/livedata/handlers/monitor_workflow.py +++ b/src/ess/livedata/handlers/monitor_workflow.py @@ -4,7 +4,7 @@ from __future__ import annotations -from typing import Literal +from typing import TYPE_CHECKING, Literal import sciline import scipp as sc @@ -31,6 +31,9 @@ WindowMonitorHistogram, ) +if TYPE_CHECKING: + from ess.livedata.config.instrument import Instrument + def _histogram_monitor( data: sc.DataArray, edges: sc.Variable, event_coord: str @@ -169,6 +172,7 @@ def create_monitor_workflow( source_name: str, edges: sc.Variable, *, + instrument: Instrument, range_filter: tuple[sc.Variable, sc.Variable] | None = None, coordinate_mode: Literal['toa', 'wavelength'] = 'toa', geometry_filename: str | None = None, @@ -185,6 +189,14 @@ def create_monitor_workflow( and as the NeXus component name when loading geometry from file. edges: Bin edges for histogramming (TOA or wavelength edges depending on mode). + instrument: + Instrument whose ``dynamic_transforms`` registry is consulted. When a + binding's ``dependent_sources`` includes ``source_name``, + ``apply_dynamic_transforms`` patches the workflow's + ``NeXusTransformationChain[NXmonitor, SampleRun]`` provider so the + monitor's ``depends_on`` chain is driven by live f144 streams; the + resulting Sciline context keys are merged into ``context_keys``. With + no matching binding the call is a no-op. range_filter: Optional (low, high) range for ratemeter counts. coordinate_mode: @@ -236,6 +248,11 @@ def create_monitor_workflow( workflow[LookupTableFilename] = lookup_table_filename workflow[LookupTableRelativeErrorThreshold] = {source_name: float('inf')} + merged_context_keys: dict[str, type] = dict(context_keys) if context_keys else {} + merged_context_keys.update( + instrument.apply_dynamic_transforms(workflow, {source_name: NXmonitor}) + ) + # Only accumulate CumulativeMonitorHistogram and WindowMonitorHistogram. # MonitorCountsTotal and MonitorCountsInRange are computed from # WindowMonitorHistogram during finalize, not accumulated separately. @@ -247,7 +264,7 @@ def create_monitor_workflow( # For wavelength mode, GenericUnwrapWorkflow providers convert RawMonitor to # WavelengthMonitor. dynamic_keys={source_name: NeXusData[NXmonitor, SampleRun]}, - context_keys=context_keys, + context_keys=merged_context_keys, target_keys={ 'cumulative': CumulativeMonitorHistogram, 'current': WindowMonitorHistogram, diff --git a/src/ess/livedata/handlers/monitor_workflow_specs.py b/src/ess/livedata/handlers/monitor_workflow_specs.py index 33482b548..8581afd68 100644 --- a/src/ess/livedata/handlers/monitor_workflow_specs.py +++ b/src/ess/livedata/handlers/monitor_workflow_specs.py @@ -241,13 +241,20 @@ def register_monitor_workflow_specs( title="Beam monitor", description=description, source_names=source_names, + # Dynamic-transform aux sources are merged centrally by + # Instrument.register_spec; callers pass only their spec-specific + # auxes here. aux_sources=aux_sources, params=params, outputs=MonitorHistogramOutputs, ) -def create_monitor_workflow_factory(source_name: str, params: MonitorDataParams): +def create_monitor_workflow_factory( + source_name: str, + params: MonitorDataParams, + instrument: Instrument, +): """ Factory function for monitor workflow from MonitorDataParams. @@ -264,4 +271,5 @@ def create_monitor_workflow_factory(source_name: str, params: MonitorDataParams) edges=params.get_active_edges(), range_filter=params.get_active_range(), coordinate_mode=mode, + instrument=instrument, ) diff --git a/src/ess/livedata/handlers/stream_processor_workflow.py b/src/ess/livedata/handlers/stream_processor_workflow.py index 8b6f35dae..c0ca2a812 100644 --- a/src/ess/livedata/handlers/stream_processor_workflow.py +++ b/src/ess/livedata/handlers/stream_processor_workflow.py @@ -4,7 +4,8 @@ from __future__ import annotations -from collections.abc import Iterable +from collections.abc import Callable, Iterable +from dataclasses import dataclass from typing import TYPE_CHECKING, Any if TYPE_CHECKING: @@ -12,6 +13,7 @@ import sciline import sciline.typing +import scipp as sc from ess.reduce import streaming from ess.livedata.core.timestamp import Timestamp @@ -19,6 +21,60 @@ from .workflow_factory import Workflow +@dataclass(frozen=True, slots=True) +class ValueLog: + """Typed Sciline-key wrapper around a cumulative ``ToNXlog`` payload. + + Subclass to create a distinct Sciline parameter per stream. The class + is the typed wrapper for an NXlog's ``value``-over-``time`` payload: + ``values`` carries the cumulative timeseries (a ``DataArray`` with a + ``time`` coord). + + ``values`` is ``None`` before the first ``set_context`` call — + ``ess.reduce.streaming.StreamProcessor`` pre-sets every context key to + ``None`` — otherwise it is the NXlog produced by ``ToNXlog``, possibly + still empty if no f144 message has arrived yet. + + :class:`StreamProcessorWorkflow` detects subclasses of this type among + its ``context_keys`` values and wraps the raw NXlog as + ``key(values=raw)`` before delegating to ``set_context``. + """ + + values: sc.DataArray | None = None + + +def synthesise_provider( + name: str, + impl: Callable[..., Any], + annotations: dict[str, Any], +) -> Any: + """Synthesise a Sciline provider with explicit named positional parameters. + + Returns a function ``name(p1, p2, ...)`` whose ``__annotations__`` come + from ``annotations`` (with ``'return'`` consumed as the return type) and + whose body delegates to ``impl(p1, p2, ...)``. + + Why code synthesis is unavoidable: Sciline introspects providers via + ``inspect.getfullargspec()``, which reads the underlying ``__code__`` + object and ignores ``__signature__``. ``__annotations__`` only attaches + types to parameters that already exist in the code object — it cannot + invent them. Producing N named typed positional parameters therefore + requires building a real function via ``exec``/``compile``. This is the + same technique ``dataclasses``, ``attrs``, ``pydantic``, and + ``collections.namedtuple`` use to generate ``__init__`` / + ``__new__``. Callers are responsible for constructing safe parameter + names — no external string should reach the template. + """ + params = [n for n in annotations if n != 'return'] + arg_list = ', '.join(params) + src = f"def {name}({arg_list}):\n return _impl({arg_list})\n" + ns: dict[str, Any] = {'_impl': impl} + exec(src, ns) # noqa: S102 + fn = ns[name] + fn.__annotations__ = dict(annotations) + return fn + + class StreamProcessorWorkflow(Workflow): """ Wrapper around ess.reduce.streaming.StreamProcessor to match the Workflow protocol. @@ -95,11 +151,22 @@ def accumulate( # will fail. See aux_sources / render() in workflow_spec.py for how # the routing layer ensures only jobs that subscribed to a stream # receive its data. - context = { - sciline_key: data[key] - for key, sciline_key in self._context_keys.items() - if key in data - } + # + # Wrapping rule: if a Sciline key is a ValueLog subclass, the raw + # NXlog payload is wrapped via ``key(values=raw)`` before set_context, + # so consumers (e.g. patched chain providers) see a typed value. The + # ``isinstance(sciline_key, type)`` guard is non-negotiable: NewType + # instances and parameterised generics are not classes and issubclass + # would raise TypeError on them. + context = {} + for key, sciline_key in self._context_keys.items(): + if key not in data: + continue + raw = data[key] + if isinstance(sciline_key, type) and issubclass(sciline_key, ValueLog): + context[sciline_key] = sciline_key(values=raw) + else: + context[sciline_key] = raw dynamic = { sciline_key: data[key] for key, sciline_key in self._dynamic_keys.items() diff --git a/tests/config/dynamic_transforms_registry_test.py b/tests/config/dynamic_transforms_registry_test.py new file mode 100644 index 000000000..81984e255 --- /dev/null +++ b/tests/config/dynamic_transforms_registry_test.py @@ -0,0 +1,167 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +"""Validate each instrument's dynamic-transform bindings against the +currently-registered geometry artifact. + +For every :class:`DynamicTransformBinding` declared on an instrument, +walks the depends_on chain of every declared consumer in the artifact and +confirms the binding's ``nxlog_path`` appears on every chain. Catches +typos and orphaned bindings before runtime. + +Also rejects duplicate ``log_key``s — Sciline collapses two parameters +of the same key, silently merging two bindings. +""" + +from __future__ import annotations + +import pytest +import scippnexus as snx +from scippnexus.field import DependsOn +from scippnexus.nxtransformations import TransformationChain, parse_depends_on_chain + +from ess.livedata.config.instrument import instrument_registry +from ess.livedata.config.instruments import available_instruments, get_config +from ess.livedata.handlers.detector_data_handler import get_nexus_geometry_filename +from ess.livedata.handlers.dynamic_transforms import DynamicTransformBinding + + +def _transform_bindings(instrument) -> list[DynamicTransformBinding]: + return [ + b + for b in instrument.log_context_bindings + if isinstance(b, DynamicTransformBinding) + ] + + +def _load_chain(artifact: str, source_name: str) -> TransformationChain | None: + """Walk a source's depends_on chain via scippnexus, without loading the + full component group. Returns ``None`` for static components with no + ``depends_on`` field.""" + parent_path = f'/entry/instrument/{source_name}' + with snx.File(artifact, 'r') as f: + comp = f[parent_path] + try: + depends_on = comp['depends_on'][()] + except KeyError: + return None + if not isinstance(depends_on, DependsOn): + depends_on = DependsOn(parent=parent_path, value=depends_on) + return parse_depends_on_chain(comp, depends_on) + + +def _chain_paths(artifact: str, source_name: str) -> list[str]: + chain = _load_chain(artifact, source_name) + return list(chain.transformations) if chain is not None else [] + + +def _empty_nxlog(artifact: str, source_name: str) -> str | None: + """First path along the chain whose value is a length-0 NXlog, or None.""" + chain = _load_chain(artifact, source_name) + if chain is None: + return None + for path, t in chain.transformations.items(): + sizes = getattr(t.value, 'sizes', None) + if sizes is not None and sizes.get('time', None) == 0: + return path + return None + + +def _instruments_with_dynamic_transforms() -> list[str]: + cases = [] + for name in available_instruments(): + get_config(name) + inst = instrument_registry[name] + if _transform_bindings(inst): + cases.append(name) + return cases + + +@pytest.fixture(scope='module', params=_instruments_with_dynamic_transforms()) +def instrument(request): + name = request.param + get_config(name) + return instrument_registry[name] + + +def test_registry_log_keys_are_unique(instrument) -> None: + keys = [b.log_key for b in _transform_bindings(instrument)] + assert len(keys) == len(set(keys)), ( + f"Duplicate log_key in {instrument.name}.log_context_bindings: {keys}" + ) + + +def test_registry_paths_match_artifact(instrument) -> None: + artifact = str(get_nexus_geometry_filename(instrument.name)) + for binding in _transform_bindings(instrument): + for source_name in binding.dependent_sources: + chain = _chain_paths(artifact, source_name) + assert binding.nxlog_path in chain, ( + f"Binding {binding.stream_name!r} declares nxlog_path " + f"{binding.nxlog_path!r} in consumers of {source_name!r}, " + f"but it does not appear on the depends_on chain " + f"resolved from the artifact ({artifact}). " + f"Walked: {chain}" + ) + + +# Known orphan placeholders that have not yet been bound. Each entry is +# (instrument_name, source_name) -> nxlog_path. This is a deliberate +# ledger: keep the test strict (no orphans) but record the ones we are +# consciously leaving for a follow-up. Removing an entry is a checklist +# item for the follow-up PR. +_KNOWN_ORPHAN_NXLOGS: dict[tuple[str, str], str] = { + # See loki/specs.py: m4 trans_20 needs either a make_geometry_nexus.py + # change to share the carriage NXlog, or a separate f144 stream + # registration. Tracked as follow-up to issue #922. + ( + 'loki', + 'beam_monitor_m4', + ): '/entry/instrument/beam_monitor_m4/transformations/trans_20', +} + + +def test_no_orphan_empty_nxlogs(instrument) -> None: + """Every empty NXlog reachable from any source on a registered spec + must be covered by a binding (or in the known-orphan ledger). + Otherwise, workflows loading that source will trip essreduce's + ``reject_time_dependent_transform`` at compute time.""" + artifact = str(get_nexus_geometry_filename(instrument.name)) + covered = {b.nxlog_path for b in _transform_bindings(instrument)} + sources = list(instrument.detector_names) + list(instrument.monitors) + for source_name in sources: + empty = _empty_nxlog(artifact, source_name) + if empty is None or empty in covered: + continue + known = _KNOWN_ORPHAN_NXLOGS.get((instrument.name, source_name)) + if known == empty: + continue + pytest.fail( + f"Source {source_name!r} has an empty NXlog placeholder at " + f"{empty!r} not covered by any binding. Add a " + f"DynamicTransformBinding to " + f"{instrument.name}.log_context_bindings or fix the geometry " + f"artifact (or list it in _KNOWN_ORPHAN_NXLOGS with a follow-up " + f"reference)." + ) + + +def test_consumers_subset_of_registered_sources(instrument) -> None: + """Each consumer must be a registered source on the instrument.""" + valid = set(instrument.detector_names) | set(instrument.monitors) + for binding in _transform_bindings(instrument): + unknown = binding.dependent_sources - valid + assert not unknown, ( + f"Binding {binding.stream_name!r} declares unknown consumers " + f"{unknown}; valid sources: {sorted(valid)}" + ) + + +def test_stream_in_f144_attribute_registry(instrument) -> None: + """The binding's ``stream_name`` must be a registered f144 stream so + the routing layer subscribes to it.""" + for binding in _transform_bindings(instrument): + assert binding.stream_name in instrument.f144_attribute_registry, ( + f"Binding {binding.stream_name!r} is not declared in " + f"{instrument.name}.f144_attribute_registry; the routing layer " + f"will not deliver f144 messages to the workflow." + ) diff --git a/tests/handlers/detector_view/factory_test.py b/tests/handlers/detector_view/factory_test.py index 644671811..b2b6547be 100644 --- a/tests/handlers/detector_view/factory_test.py +++ b/tests/handlers/detector_view/factory_test.py @@ -2,8 +2,10 @@ # Copyright (c) 2025 Scipp contributors (https://github.com/scipp) """Tests for DetectorViewScilineFactory.""" +import pytest import scipp as sc +from ess.livedata.config.instrument import Instrument from ess.livedata.handlers.detector_view.data_source import DetectorNumberSource from ess.livedata.handlers.detector_view.factory import DetectorViewFactory from ess.livedata.handlers.detector_view.types import ( @@ -14,10 +16,17 @@ from .utils import make_fake_detector_number +@pytest.fixture +def instrument(): + return Instrument(name='_test') + + class TestDetectorViewScilineFactory: """Tests for DetectorViewScilineFactory.""" - def test_factory_initialization_with_logical_and_geometric_configs(self): + def test_factory_initialization_with_logical_and_geometric_configs( + self, instrument + ): """Test factory initialization with logical and geometric view configs.""" detector_number = make_fake_detector_number(4, 4) @@ -28,6 +37,7 @@ def transform(da: sc.DataArray, source_name: str) -> sc.DataArray: logical_factory = DetectorViewFactory( data_source=DetectorNumberSource(detector_number), view_config=LogicalViewConfig(transform=transform), + instrument=instrument, ) assert logical_factory is not None assert isinstance(logical_factory._view_config, LogicalViewConfig) @@ -40,12 +50,13 @@ def transform(da: sc.DataArray, source_name: str) -> sc.DataArray: projection_type='xy_plane', resolution={'x': 100, 'y': 100}, ), + instrument=instrument, ) assert geometric_factory is not None assert isinstance(geometric_factory._view_config, GeometricViewConfig) assert geometric_factory._view_config.projection_type == 'xy_plane' - def test_factory_initialization_with_per_source_configs(self): + def test_factory_initialization_with_per_source_configs(self, instrument): """Test that factory can be initialized with per-source configs.""" detector_number = make_fake_detector_number(4, 4) @@ -61,6 +72,7 @@ def transform(da: sc.DataArray, source_name: str) -> sc.DataArray: resolution={'x': 100, 'y': 100}, ), }, + instrument=instrument, ) assert factory is not None diff --git a/tests/handlers/detector_view/spectrum_view_test.py b/tests/handlers/detector_view/spectrum_view_test.py index 61099d08d..3adf4da54 100644 --- a/tests/handlers/detector_view/spectrum_view_test.py +++ b/tests/handlers/detector_view/spectrum_view_test.py @@ -37,6 +37,8 @@ def _rebin_x_transform(histogram: sc.DataArray, params: _RebinParams) -> sc.Data def _make_factory_with_spectrum( spec: SpectrumViewSpec, *, y_size: int = 4, x_size: int = 4 ) -> DetectorViewFactory: + from ess.livedata.config.instrument import Instrument + detector_number = make_fake_detector_number(y_size, x_size) def logical_transform(da: sc.DataArray, source_name: str) -> sc.DataArray: @@ -45,6 +47,7 @@ def logical_transform(da: sc.DataArray, source_name: str) -> sc.DataArray: return DetectorViewFactory( data_source=DetectorNumberSource(detector_number), view_config=LogicalViewConfig(transform=logical_transform, spectrum_view=spec), + instrument=Instrument(name='_test'), ) @@ -100,6 +103,8 @@ def test_spectrum_view_rebin_factor_applied(self): def test_no_spectrum_view_when_spec_absent(self): """Without a SpectrumViewSpec, spectrum_view is not in the outputs.""" + from ess.livedata.config.instrument import Instrument + detector_number = make_fake_detector_number(4, 4) def logical_transform(da: sc.DataArray, source_name: str) -> sc.DataArray: @@ -108,6 +113,7 @@ def logical_transform(da: sc.DataArray, source_name: str) -> sc.DataArray: factory = DetectorViewFactory( data_source=DetectorNumberSource(detector_number), view_config=LogicalViewConfig(transform=logical_transform), + instrument=Instrument(name='_test'), ) params = make_detector_view_params()() workflow = factory.make_workflow('detector', params=params) diff --git a/tests/handlers/detector_view/transform_value_test.py b/tests/handlers/detector_view/transform_value_test.py deleted file mode 100644 index f26388f9e..000000000 --- a/tests/handlers/detector_view/transform_value_test.py +++ /dev/null @@ -1,111 +0,0 @@ -# SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) -"""Unit tests for the dynamic detector geometry providers.""" - -from __future__ import annotations - -import pytest -import scipp as sc -import scippnexus as snx -from ess.reduce.nexus.types import NeXusComponent, SampleRun - -from ess.livedata.handlers.detector_view.types import ( - TransformName, - TransformValue, - TransformValueLog, -) -from ess.livedata.handlers.detector_view.workflow import ( - get_transformation_chain_with_value, - transform_value_from_log, -) - - -def _make_chain() -> snx.TransformationChain: - """Build a minimal TransformationChain with two named transformation entries.""" - chain = snx.TransformationChain(parent='/det', value='transformations/carriage') - chain.transformations = sc.DataGroup() - - # Each entry is a snx.Transformation with a mutable .value attribute. - # We bypass full construction and use a tiny stub that mimics the - # interface relied upon by get_transformation_chain_with_value. - class _Transform: - def __init__(self, v: sc.Variable) -> None: - self.value = v - - chain.transformations['carriage'] = _Transform(sc.scalar(1.0, unit='mm')) - chain.transformations['other'] = _Transform(sc.scalar(7.0, unit='mm')) - return chain - - -def _make_log(values: list[float], unit: str = 'mm') -> sc.DataArray: - times = sc.array( - dims=['time'], values=list(range(len(values))), unit='ns', dtype='int64' - ) - return sc.DataArray( - sc.array(dims=['time'], values=values, unit=unit), - coords={'time': sc.epoch(unit='ns') + times}, - ) - - -class TestTransformValueFromLog: - def test_raises_when_log_is_none(self): - with pytest.raises(ValueError, match='No samples yet'): - transform_value_from_log(None, TransformName('detector_carriage')) - - def test_raises_when_log_has_no_samples(self): - empty = sc.DataArray( - sc.array(dims=['time'], values=[], unit='mm'), - coords={ - 'time': sc.epoch(unit='ns') - + sc.array(dims=['time'], values=[], unit='ns', dtype='int64') - }, - ) - log = TransformValueLog(empty) - with pytest.raises(ValueError, match='No samples yet'): - transform_value_from_log(log, TransformName('detector_carriage')) - - def test_picks_latest_value(self): - log = TransformValueLog(_make_log([1.0, 2.0, 7.5])) - tv = transform_value_from_log(log, TransformName('detector_carriage')) - assert tv.name == 'detector_carriage' - assert tv.value.value == 7.5 - assert tv.value.unit == sc.Unit('mm') - # Scalar (no time dim) so to_transformation's filter branch is bypassed. - assert tv.value.sizes == {} - - def test_units_propagate(self): - log = TransformValueLog(_make_log([4.2], unit='m')) - tv = transform_value_from_log(log, TransformName('detector_carriage')) - assert tv.value.unit == sc.Unit('m') - - -class TestChainInjection: - def _component(self, chain: snx.TransformationChain) -> NeXusComponent: - # NeXusComponent is a sciline.Scope wrapper around sc.DataGroup; for - # the purpose of get_transformation_chain_with_value we only need - # something that supports __getitem__('depends_on'). - return NeXusComponent[snx.NXdetector, SampleRun]( - sc.DataGroup({'depends_on': chain}) - ) - - def test_raises_when_name_not_in_chain(self): - chain = _make_chain() - comp = self._component(chain) - with pytest.raises(KeyError, match='missing'): - get_transformation_chain_with_value( - comp, - TransformValue(name='missing', value=sc.scalar(1.0, unit='mm')), - ) - - def test_replaces_only_named_entry(self): - chain = _make_chain() - comp = self._component(chain) - new_value = sc.scalar(42.0, unit='mm') - out = get_transformation_chain_with_value( - comp, - TransformValue(name='carriage', value=new_value), - ) - assert out.transformations['carriage'].value.value == 42.0 - assert out.transformations['other'].value.value == 7.0 - # Original chain is untouched (deepcopy in provider). - assert chain.transformations['carriage'].value.value == 1.0 diff --git a/tests/handlers/detector_view/utils.py b/tests/handlers/detector_view/utils.py index 630e3a28c..c8472c485 100644 --- a/tests/handlers/detector_view/utils.py +++ b/tests/handlers/detector_view/utils.py @@ -116,6 +116,8 @@ def make_test_factory(y_size: int = 4, x_size: int = 4) -> DetectorViewFactory: Uses DetectorNumberSource for fast, file-less workflow creation. """ + from ess.livedata.config.instrument import Instrument + detector_number = make_fake_detector_number(y_size, x_size) def logical_transform(da: sc.DataArray, source_name: str) -> sc.DataArray: @@ -124,6 +126,7 @@ def logical_transform(da: sc.DataArray, source_name: str) -> sc.DataArray: return DetectorViewFactory( data_source=DetectorNumberSource(detector_number), view_config=LogicalViewConfig(transform=logical_transform), + instrument=Instrument(name='_test'), ) diff --git a/tests/handlers/dynamic_transforms_test.py b/tests/handlers/dynamic_transforms_test.py new file mode 100644 index 000000000..cfdda6d3c --- /dev/null +++ b/tests/handlers/dynamic_transforms_test.py @@ -0,0 +1,310 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) +"""Tests for the centralised dynamic-transform machinery.""" + +from __future__ import annotations + +import h5py +import numpy as np +import pytest +import sciline +import scipp as sc +from ess.reduce.nexus.types import ( + Filename, + NeXusName, + NeXusTransformationChain, + SampleRun, +) +from scippnexus import NXdetector + +from ess.livedata.config import Instrument +from ess.livedata.config.workflow_spec import JobId +from ess.livedata.handlers.dynamic_transforms import DynamicTransformBinding +from ess.livedata.handlers.log_context import compose_aux_sources +from ess.livedata.handlers.stream_processor_workflow import ValueLog + +# --- Test fixtures: minimal artifact builders --- + + +def _write_chain( + f: h5py.File, parent: str, name: str, depends_on: str, value: float +) -> None: + """Write a transformation Dataset with a depends_on attribute.""" + ds = f[parent].create_dataset(name, data=value) + ds.attrs['depends_on'] = depends_on + ds.attrs['transformation_type'] = 'translation' + ds.attrs['vector'] = [0.0, 0.0, 1.0] + ds.attrs['units'] = 'm' + + +def _write_nxlog( + f: h5py.File, + parent: str, + name: str, + *, + depends_on: str, + samples: int = 0, + units: str = 'mm', +) -> None: + """Write an NXlog group placeholder mirroring the production schema. + + Includes the ``average_value`` / ``minimum_value`` / ``maximum_value`` + scalar children so scippnexus loads the group as a DataArray (with + those scalar coords) rather than falling back to a bare Variable — + matches the shape produced by ``make_geometry_nexus.py``. + """ + g = f[parent].create_group(name) + g.attrs['NX_class'] = 'NXlog' + g.attrs['depends_on'] = depends_on + g.attrs['transformation_type'] = 'translation' + g.attrs['vector'] = [1.0, 0.0, 0.0] + val = g.create_dataset( + 'value', shape=(samples,), maxshape=(None,), dtype=np.float64 + ) + val.attrs['units'] = units + t = g.create_dataset('time', shape=(samples,), maxshape=(None,), dtype=np.int64) + t.attrs['units'] = 'ns' + t.attrs['start'] = '1970-01-01T00:00:00' + for stat in ('average_value', 'minimum_value', 'maximum_value'): + ds = g.create_dataset(stat, data=0.0) + ds.attrs['units'] = units + + +def _make_artifact( + path, *, m4_via_carriage: bool = False, m4_dynamic_only: bool = False +) -> str: + """Build a minimal LOKI-shaped artifact for tests.""" + fn = str(path / 'geom.nxs') + with h5py.File(fn, 'w') as f: + entry = f.create_group('entry') + entry.attrs['NX_class'] = 'NXentry' + inst = entry.create_group('instrument') + inst.attrs['NX_class'] = 'NXinstrument' + + # Carriage NXlog (the dynamic placeholder). + carriage = inst.create_group('detector_carriage') + carriage.attrs['NX_class'] = 'NXpositioner' + carriage.create_group('transformations') + _write_chain( + f, + 'entry/instrument/detector_carriage/transformations', + 'detector_carriage_zero', + depends_on='.', + value=5.098, + ) + _write_nxlog( + f, + 'entry/instrument/detector_carriage', + 'value', + depends_on=( + '/entry/instrument/detector_carriage/transformations/' + 'detector_carriage_zero' + ), + samples=0, + ) + + # loki_detector_0 depends on carriage NXlog. + det = inst.create_group('loki_detector_0') + det.attrs['NX_class'] = 'NXdetector' + det.create_dataset( + 'depends_on', data='/entry/instrument/detector_carriage/value' + ) + # detector_number sentinel so essreduce loaders don't choke. + det.create_dataset('detector_number', data=np.array([1, 2, 3])) + + # Static detector for the no-dynamic case. + static_det = inst.create_group('loki_detector_static') + static_det.attrs['NX_class'] = 'NXdetector' + static_det.create_group('transformations') + _write_chain( + f, + 'entry/instrument/loki_detector_static/transformations', + 'fixed', + depends_on='.', + value=1.0, + ) + static_det.create_dataset( + 'depends_on', + data='/entry/instrument/loki_detector_static/transformations/fixed', + ) + static_det.create_dataset('detector_number', data=np.array([10, 11])) + return fn + + +class _CarriageLog(ValueLog): + pass + + +class _OtherLog(ValueLog): + pass + + +# --- Instrument.apply_dynamic_transforms --- + + +def _make_workflow_loading(fn: str, source_name: str) -> sciline.Pipeline: + """A minimal Sciline pipeline mimicking essreduce's NeXus loading shape.""" + from ess.reduce.nexus.workflow import GenericNeXusWorkflow + + wf = GenericNeXusWorkflow(run_types=[SampleRun], monitor_types=[]) + wf[Filename[SampleRun]] = fn + wf[NeXusName[NXdetector]] = source_name + return wf + + +def _make_instrument( + bindings: list[DynamicTransformBinding], +) -> Instrument: + inst = Instrument(name='_test', log_context_bindings=bindings) + return inst + + +def test_apply_no_op_when_chain_has_no_dynamic_nxlog(tmp_path) -> None: + fn = _make_artifact(tmp_path) + inst = _make_instrument( + [ + DynamicTransformBinding( + nxlog_path='/entry/instrument/detector_carriage/value', + stream_name='detector_carriage', + log_key=_CarriageLog, + dependent_sources=frozenset({'loki_detector_0'}), + ), + ] + ) + source_name = 'loki_detector_static' + wf = _make_workflow_loading(fn, source_name) + context_keys = inst.apply_dynamic_transforms(wf, {source_name: NXdetector}) + assert context_keys == {} + + +def test_apply_patches_chain_for_matching_component(tmp_path) -> None: + fn = _make_artifact(tmp_path) + inst = _make_instrument( + [ + DynamicTransformBinding( + nxlog_path='/entry/instrument/detector_carriage/value', + stream_name='detector_carriage', + log_key=_CarriageLog, + dependent_sources=frozenset({'loki_detector_0'}), + ), + ] + ) + source_name = 'loki_detector_0' + wf = _make_workflow_loading(fn, source_name) + context_keys = inst.apply_dynamic_transforms(wf, {source_name: NXdetector}) + assert context_keys == {'detector_carriage': _CarriageLog} + + +def test_apply_no_samples_yet_raises_at_compute(tmp_path) -> None: + """End-to-end: pipeline configured, but log container is None at compute + time -> patched provider raises with a registry-aware message.""" + fn = _make_artifact(tmp_path) + inst = _make_instrument( + [ + DynamicTransformBinding( + nxlog_path='/entry/instrument/detector_carriage/value', + stream_name='detector_carriage', + log_key=_CarriageLog, + dependent_sources=frozenset({'loki_detector_0'}), + ), + ] + ) + source_name = 'loki_detector_0' + wf = _make_workflow_loading(fn, source_name) + inst.apply_dynamic_transforms(wf, {source_name: NXdetector}) + wf[_CarriageLog] = _CarriageLog(values=None) + with pytest.raises(ValueError, match='No samples yet'): + wf.compute(NeXusTransformationChain[NXdetector, SampleRun]) + + +def test_apply_uses_latest_sample(tmp_path) -> None: + fn = _make_artifact(tmp_path) + inst = _make_instrument( + [ + DynamicTransformBinding( + nxlog_path='/entry/instrument/detector_carriage/value', + stream_name='detector_carriage', + log_key=_CarriageLog, + dependent_sources=frozenset({'loki_detector_0'}), + ), + ] + ) + source_name = 'loki_detector_0' + wf = _make_workflow_loading(fn, source_name) + inst.apply_dynamic_transforms(wf, {source_name: NXdetector}) + log = sc.DataArray( + sc.array(dims=['time'], values=[1.0, 2.0, 7.5], unit='mm'), + coords={ + 'time': sc.array( + dims=['time'], values=[0, 1, 2], unit='ns', dtype='datetime64' + ) + }, + ) + wf[_CarriageLog] = _CarriageLog(values=log) + chain = wf.compute(NeXusTransformationChain[NXdetector, SampleRun]) + patched_value = chain.transformations[ + '/entry/instrument/detector_carriage/value' + ].value + assert sc.identical(patched_value, sc.scalar(7.5, unit='mm')) + + +# --- aux source composition --- + + +def _job_id(source_name: str) -> JobId: + import uuid + + return JobId(source_name=source_name, job_number=uuid.uuid4()) + + +def test_compose_aux_inputs_filtered_by_consumers() -> None: + inst = _make_instrument( + [ + DynamicTransformBinding( + nxlog_path='/a', + stream_name='stream_a', + log_key=_CarriageLog, + dependent_sources=frozenset({'src_a'}), + ), + DynamicTransformBinding( + nxlog_path='/b', + stream_name='stream_b', + log_key=_OtherLog, + dependent_sources=frozenset({'src_b'}), + ), + ] + ) + aux_a = compose_aux_sources(inst, ['src_a'], None) + assert aux_a is not None + assert set(aux_a.inputs) == {'stream_a'} + + aux_ab = compose_aux_sources(inst, ['src_a', 'src_b'], None) + assert aux_ab is not None + assert set(aux_ab.inputs) == {'stream_a', 'stream_b'} + + assert compose_aux_sources(inst, ['src_unknown'], None) is None + + +def test_compose_render_filtered_by_source_name() -> None: + inst = _make_instrument( + [ + DynamicTransformBinding( + nxlog_path='/a', + stream_name='stream_a', + log_key=_CarriageLog, + dependent_sources=frozenset({'src_a', 'src_shared'}), + ), + DynamicTransformBinding( + nxlog_path='/b', + stream_name='stream_b', + log_key=_OtherLog, + dependent_sources=frozenset({'src_b'}), + ), + ] + ) + aux = compose_aux_sources(inst, ['src_a', 'src_shared', 'src_b'], None) + assert aux is not None + assert aux.render(_job_id('src_a')) == {'stream_a': 'stream_a'} + assert aux.render(_job_id('src_shared')) == {'stream_a': 'stream_a'} + assert aux.render(_job_id('src_b')) == {'stream_b': 'stream_b'} + assert aux.render(_job_id('src_other')) == {} diff --git a/tests/handlers/monitor_workflow_test.py b/tests/handlers/monitor_workflow_test.py index a6f041209..5ff2f3fcc 100644 --- a/tests/handlers/monitor_workflow_test.py +++ b/tests/handlers/monitor_workflow_test.py @@ -40,6 +40,14 @@ ) +@pytest.fixture +def instrument(): + """Minimal instrument used by create_monitor_workflow tests.""" + from ess.livedata.config.instrument import Instrument + + return Instrument(name='test_inst', monitors=['monitor_1', 'monitor_2']) + + class TestMonitorDataParams: """Tests for MonitorDataParams Pydantic model.""" @@ -277,38 +285,49 @@ class TestCreateMonitorWorkflow: def toa_edges(self): return sc.linspace('time_of_arrival', 0, 71_000_000, num=101, unit='ns') - def test_creates_stream_processor_workflow(self, toa_edges): + def test_creates_stream_processor_workflow(self, instrument, toa_edges): from ess.livedata.handlers.stream_processor_workflow import ( StreamProcessorWorkflow, ) - workflow = create_monitor_workflow('monitor_1', toa_edges) + workflow = create_monitor_workflow( + 'monitor_1', toa_edges, instrument=instrument + ) assert isinstance(workflow, StreamProcessorWorkflow) - def test_workflow_has_required_methods(self, toa_edges): - workflow = create_monitor_workflow('monitor_1', toa_edges) + def test_workflow_has_required_methods(self, instrument, toa_edges): + workflow = create_monitor_workflow( + 'monitor_1', toa_edges, instrument=instrument + ) assert hasattr(workflow, 'accumulate') assert hasattr(workflow, 'finalize') assert hasattr(workflow, 'clear') - def test_workflow_with_range_filter(self, toa_edges): + def test_workflow_with_range_filter(self, instrument, toa_edges): range_filter = ( sc.scalar(10_000_000, unit='ns'), sc.scalar(60_000_000, unit='ns'), ) workflow = create_monitor_workflow( - 'monitor_1', toa_edges, range_filter=range_filter + 'monitor_1', toa_edges, range_filter=range_filter, instrument=instrument ) assert workflow is not None - def test_workflow_with_wavelength_mode_requires_lookup_table(self, toa_edges): + def test_workflow_with_wavelength_mode_requires_lookup_table( + self, instrument, toa_edges + ): """Test that wavelength mode requires lookup_table_filename.""" with pytest.raises(ValueError, match="lookup_table_filename is required"): create_monitor_workflow( - 'monitor_1', toa_edges, coordinate_mode='wavelength' + 'monitor_1', + toa_edges, + coordinate_mode='wavelength', + instrument=instrument, ) - def test_workflow_with_wavelength_mode_requires_geometry_file(self, toa_edges): + def test_workflow_with_wavelength_mode_requires_geometry_file( + self, instrument, toa_edges + ): """Test that wavelength mode requires geometry_filename.""" with pytest.raises(ValueError, match="geometry_filename is required"): create_monitor_workflow( @@ -316,9 +335,10 @@ def test_workflow_with_wavelength_mode_requires_geometry_file(self, toa_edges): toa_edges, coordinate_mode='wavelength', lookup_table_filename='/path/to/lookup.h5', + instrument=instrument, ) - def test_context_keys_forwarded_to_stream_processor(self, toa_edges): + def test_context_keys_forwarded_to_stream_processor(self, instrument, toa_edges): """Test that context_keys are passed through to StreamProcessorWorkflow.""" from ess.livedata.handlers.stream_processor_workflow import ( StreamProcessorWorkflow, @@ -326,23 +346,115 @@ def test_context_keys_forwarded_to_stream_processor(self, toa_edges): context_keys = {'position': sc.Variable} workflow = create_monitor_workflow( - 'monitor_1', toa_edges, context_keys=context_keys + 'monitor_1', toa_edges, context_keys=context_keys, instrument=instrument ) assert isinstance(workflow, StreamProcessorWorkflow) # Verify context_keys are stored on the workflow assert 'position' in workflow._context_keys assert workflow._context_keys['position'] is sc.Variable - def test_without_context_keys_still_works(self, toa_edges): + def test_without_context_keys_still_works(self, instrument, toa_edges): """Test that omitting context_keys preserves existing behavior.""" from ess.livedata.handlers.stream_processor_workflow import ( StreamProcessorWorkflow, ) - workflow = create_monitor_workflow('monitor_1', toa_edges) + workflow = create_monitor_workflow( + 'monitor_1', toa_edges, instrument=instrument + ) assert isinstance(workflow, StreamProcessorWorkflow) assert workflow._context_keys == {} + def test_instrument_without_matching_binding_adds_no_context_keys(self, toa_edges): + from ess.livedata.config import Instrument + from ess.livedata.handlers.dynamic_transforms import ( + DynamicTransformBinding, + ) + from ess.livedata.handlers.stream_processor_workflow import ValueLog + + class _SomeLog(ValueLog): + pass + + instrument = Instrument( + name='_test', + monitors=['monitor_1', 'monitor_2'], + log_context_bindings=[ + DynamicTransformBinding( + nxlog_path='/entry/instrument/some/value', + stream_name='some_stream', + log_key=_SomeLog, + dependent_sources=frozenset({'monitor_2'}), + ), + ], + ) + workflow = create_monitor_workflow( + 'monitor_1', toa_edges, instrument=instrument + ) + assert workflow._context_keys == {} + + def test_instrument_with_matching_binding_merges_context_keys(self, toa_edges): + from ess.livedata.config import Instrument + from ess.livedata.handlers.dynamic_transforms import ( + DynamicTransformBinding, + ) + from ess.livedata.handlers.stream_processor_workflow import ValueLog + + class _CarriageLog(ValueLog): + pass + + instrument = Instrument( + name='_test', + monitors=['monitor_1'], + log_context_bindings=[ + DynamicTransformBinding( + nxlog_path='/entry/instrument/carriage/value', + stream_name='carriage', + log_key=_CarriageLog, + dependent_sources=frozenset({'monitor_1'}), + ), + ], + ) + workflow = create_monitor_workflow( + 'monitor_1', toa_edges, instrument=instrument + ) + assert workflow._context_keys == {'carriage': _CarriageLog} + + def test_instrument_keys_merge_with_caller_supplied_context_keys(self, toa_edges): + from ess.livedata.config import Instrument + from ess.livedata.handlers.dynamic_transforms import ( + DynamicTransformBinding, + ) + from ess.livedata.handlers.stream_processor_workflow import ValueLog + + class _CarriageLog(ValueLog): + pass + + instrument = Instrument( + name='_test', + monitors=['monitor_1'], + log_context_bindings=[ + DynamicTransformBinding( + nxlog_path='/entry/instrument/carriage/value', + stream_name='carriage', + log_key=_CarriageLog, + dependent_sources=frozenset({'monitor_1'}), + ), + ], + ) + caller_keys: dict[str, type] = {'extra': sc.Variable} + workflow = create_monitor_workflow( + 'monitor_1', + toa_edges, + context_keys=caller_keys, + instrument=instrument, + ) + assert workflow._context_keys == { + 'extra': sc.Variable, + 'carriage': _CarriageLog, + } + # The caller's dict must not have been mutated. + assert caller_keys == {'extra': sc.Variable} + class TestMonitorWorkflowIntegration: """Integration tests for the V2 monitor workflow.""" @@ -362,8 +474,10 @@ def sample_binned_events(self): binned = sc.DataArray(sc.bins(begin=begin, dim='event', data=events)) return binned - def test_full_workflow_cycle(self, toa_edges, sample_binned_events): - workflow = create_monitor_workflow('monitor_1', toa_edges) + def test_full_workflow_cycle(self, instrument, toa_edges, sample_binned_events): + workflow = create_monitor_workflow( + 'monitor_1', toa_edges, instrument=instrument + ) # Accumulate data workflow.accumulate( @@ -386,9 +500,13 @@ def test_full_workflow_cycle(self, toa_edges, sample_binned_events): assert results['counts_total'].value == 5.0 assert results['counts_in_toa_range'].value == 5.0 - def test_time_coords_on_delta_outputs(self, toa_edges, sample_binned_events): + def test_time_coords_on_delta_outputs( + self, instrument, toa_edges, sample_binned_events + ): """Delta outputs get time, start_time, end_time coords.""" - workflow = create_monitor_workflow('monitor_1', toa_edges) + workflow = create_monitor_workflow( + 'monitor_1', toa_edges, instrument=instrument + ) workflow.accumulate( {'monitor_1': sample_binned_events}, start_time=Timestamp.from_ns(1000), @@ -421,10 +539,12 @@ def test_time_coords_on_delta_outputs(self, toa_edges, sample_binned_events): assert results['counts_in_toa_range'].coords['end_time'].value == 2000 def test_cumulative_output_has_no_time_coords( - self, toa_edges, sample_binned_events + self, instrument, toa_edges, sample_binned_events ): """Cumulative output should not have time coords (spans all time).""" - workflow = create_monitor_workflow('monitor_1', toa_edges) + workflow = create_monitor_workflow( + 'monitor_1', toa_edges, instrument=instrument + ) workflow.accumulate( {'monitor_1': sample_binned_events}, start_time=Timestamp.from_ns(1000), @@ -437,10 +557,12 @@ def test_cumulative_output_has_no_time_coords( assert 'end_time' not in results['cumulative'].coords def test_time_coords_track_first_start_last_end( - self, toa_edges, sample_binned_events + self, instrument, toa_edges, sample_binned_events ): """Time coords should track first start_time and last end_time.""" - workflow = create_monitor_workflow('monitor_1', toa_edges) + workflow = create_monitor_workflow( + 'monitor_1', toa_edges, instrument=instrument + ) # Multiple accumulate calls before finalize workflow.accumulate( {'monitor_1': sample_binned_events}, @@ -464,9 +586,13 @@ def test_time_coords_track_first_start_last_end( assert results['current'].coords['start_time'].value == 1000 assert results['current'].coords['end_time'].value == 4000 - def test_time_coords_reset_after_finalize(self, toa_edges, sample_binned_events): + def test_time_coords_reset_after_finalize( + self, instrument, toa_edges, sample_binned_events + ): """Time coords should reset between finalize cycles.""" - workflow = create_monitor_workflow('monitor_1', toa_edges) + workflow = create_monitor_workflow( + 'monitor_1', toa_edges, instrument=instrument + ) # First cycle workflow.accumulate( @@ -489,10 +615,12 @@ def test_time_coords_reset_after_finalize(self, toa_edges, sample_binned_events) assert results2['current'].coords['end_time'].value == 6000 def test_cumulative_accumulates_window_clears( - self, toa_edges, sample_binned_events + self, instrument, toa_edges, sample_binned_events ): """Verify cumulative accumulates while window clears each cycle.""" - workflow = create_monitor_workflow('monitor_1', toa_edges) + workflow = create_monitor_workflow( + 'monitor_1', toa_edges, instrument=instrument + ) # First cycle workflow.accumulate( @@ -517,9 +645,11 @@ def test_cumulative_accumulates_window_clears( # Current only has the latest cycle assert results2['current'].sum().value == 5.0 - def test_full_workflow_cycle_histogram_mode(self, toa_edges): + def test_full_workflow_cycle_histogram_mode(self, instrument, toa_edges): """Test full workflow cycle with histogram-mode monitor data.""" - workflow = create_monitor_workflow('monitor_1', toa_edges) + workflow = create_monitor_workflow( + 'monitor_1', toa_edges, instrument=instrument + ) # Create histogram data like Cumulative preprocessor produces input_edges = sc.linspace('tof', 0, 10, num=11, unit='ns') @@ -614,6 +744,7 @@ def _factory(source_name: str, params: MonitorDataParams): edges=params.get_active_edges(), range_filter=params.get_active_range(), coordinate_mode=mode, + instrument=test_instrument, ) # Verify factory works by creating a workflow @@ -629,7 +760,7 @@ def _factory(source_name: str, params: MonitorDataParams): class TestMonitorWorkflowFactoryCoordinateMode: """Tests for coordinate mode in monitor workflow factory.""" - def test_wavelength_mode_requires_geometry_and_lookup_table(self): + def test_wavelength_mode_requires_geometry_and_lookup_table(self, instrument): """Test that wavelength mode requires geometry and lookup table files. The create_monitor_workflow_factory doesn't provide these parameters, @@ -649,7 +780,7 @@ def test_wavelength_mode_requires_geometry_and_lookup_table(self): ) with pytest.raises(ValueError, match="lookup_table_filename is required"): - create_monitor_workflow_factory('monitor_1', params) + create_monitor_workflow_factory('monitor_1', params, instrument) @pytest.mark.slow @@ -723,7 +854,7 @@ def lookup_table_filename(self): ) def test_wavelength_mode_with_histogram_input( - self, wavelength_edges, geometry_filename, lookup_table_filename + self, instrument, wavelength_edges, geometry_filename, lookup_table_filename ): """Test wavelength mode workflow with histogram input data. @@ -742,6 +873,7 @@ def test_wavelength_mode_with_histogram_input( coordinate_mode='wavelength', geometry_filename=str(geometry_filename), lookup_table_filename=str(lookup_table_filename), + instrument=instrument, ) # Create histogram data like fake_monitors da00 mode produces diff --git a/tests/handlers/stream_processor_workflow_test.py b/tests/handlers/stream_processor_workflow_test.py index da2e9f07a..e9ecda451 100644 --- a/tests/handlers/stream_processor_workflow_test.py +++ b/tests/handlers/stream_processor_workflow_test.py @@ -8,7 +8,10 @@ import scipp as sc from ess.livedata.core.timestamp import Timestamp -from ess.livedata.handlers.stream_processor_workflow import StreamProcessorWorkflow +from ess.livedata.handlers.stream_processor_workflow import ( + StreamProcessorWorkflow, + ValueLog, +) Streamed = NewType('Streamed', int) Context = NewType('Context', int) @@ -356,3 +359,57 @@ def test_time_tracking_resets_after_finalize_and_clear(self, dataarray_workflow) ) result3 = workflow.finalize() assert result3['current'].coords['start_time'].value == 9000 + + +class _CarriageContainer(ValueLog): + pass + + +class TestValueLogWrappingRule: + """SPW wraps raw NXlog payloads in a ValueLog subclass before + set_context iff the Sciline key is a ValueLog subclass.""" + + @pytest.fixture + def workflow_using_log(self) -> sciline.Pipeline: + def consume_log(container: _CarriageContainer) -> Output: + assert isinstance(container, _CarriageContainer) + assert isinstance(container.values, sc.DataArray) + return Output(int(container.values.values[0])) + + return sciline.Pipeline((consume_log,)) + + def test_wraps_valuelog_subclass(self, workflow_using_log) -> None: + wf = StreamProcessorWorkflow( + workflow_using_log, + dynamic_keys={}, + context_keys={'carriage': _CarriageContainer}, + target_keys={'output': Output}, + accumulators=(), + ) + raw = sc.DataArray(sc.array(dims=['time'], values=[42.0], unit='mm')) + wf.accumulate( + {'carriage': raw}, + start_time=Timestamp.from_ns(0), + end_time=Timestamp.from_ns(1), + ) + result = wf.finalize() + assert result['output'] == 42 + + def test_passes_through_non_valuelog_keys(self) -> None: + # Plain integer context — must not be wrapped. + def consume(value: Context) -> Output: + return Output(value * 2) + + wf = StreamProcessorWorkflow( + sciline.Pipeline((consume,)), + dynamic_keys={}, + context_keys={'ctx': Context}, + target_keys={'output': Output}, + accumulators=(), + ) + wf.accumulate( + {'ctx': Context(5)}, + start_time=Timestamp.from_ns(0), + end_time=Timestamp.from_ns(1), + ) + assert wf.finalize()['output'] == 10 diff --git a/tests/services/data_reduction_test.py b/tests/services/data_reduction_test.py index 7aadcd0ad..310ff4372 100644 --- a/tests/services/data_reduction_test.py +++ b/tests/services/data_reduction_test.py @@ -180,6 +180,15 @@ def test_can_configure_and_stop_workflow_with_detector_and_monitors( service.step() sink.messages.clear() # Clear the workflow status message(s), one per source name. + # LOKI's i_of_q workflow walks the rear-bank carriage chain, which has a + # dynamic NXlog placeholder driven by the detector_carriage f144 stream. + # Publish a sample so the patched chain provider has a value to inject; + # without it, the workflow correctly raises "no samples yet" — see + # tests/handlers/dynamic_transforms_test.py. + if instrument == 'loki': + app.publish_log_message(source_name='detector_carriage', time=1, value=0.0) + service.step() + app.publish_events(size=2000, time=2) service.step() # No monitor data yet, so the workflow was not able to produce a result yet