Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions physicsnemo/datapipes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
NormalizeVectors,
Purge,
Rename,
Resize,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reshape missing from top-level exports

Reshape is exported from physicsnemo.datapipes.transforms (added to both the import and __all__ in transforms/__init__.py), but it is not imported or listed in this top-level datapipes/__init__.py. This means users cannot do from physicsnemo.datapipes import Reshape or dp.Reshape(...), unlike Resize which was properly added. This appears to be an oversight.

from physicsnemo.datapipes.transforms import (
    ...
    Rename,
    Resize,
    Reshape,   # <-- add this
    Scale,
    ...
)

__all__ = [
    ...
    "ConstantField",
    "Reshape",   # <-- add this
    ...
]

Scale,
SubsamplePoints,
Transform,
Expand Down Expand Up @@ -109,6 +110,7 @@
"CreateGrid",
"KNearestNeighbors",
"CenterOfMass",
"Resize",
# Transforms - Utility
"Rename",
"Purge",
Expand Down
113 changes: 101 additions & 12 deletions physicsnemo/datapipes/readers/numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
NumpyReader - Read data from NumPy .npz files.

Supports reading from single .npz files or directories of .npz files.
In single-file mode, optional ``preload_to_cpu=True`` loads the entire
dataset into RAM at init for faster iteration with no per-sample I/O.
"""

from __future__ import annotations
Expand All @@ -38,8 +40,12 @@ class NumpyReader(Reader):
Read samples from NumPy .npz files.

Supports two modes:

1. **Single .npz file**: Samples are indexed along the first dimension
   of each array. Optionally, ``preload_to_cpu=True`` loads all arrays
   into RAM at init so iteration does no disk I/O.
2. **Directory of .npz files**: One sample per file; each file is opened
   on demand.

Example (single .npz):
>>> # data.npz with arrays "positions" (N, 100, 3), "features" (N, 100)
Expand All @@ -52,6 +58,10 @@ class NumpyReader(Reader):
>>> # Directory with sample_0.npz, sample_1.npz, ...
>>> reader = NumpyReader("data_dir/", file_pattern="sample_*.npz") # doctest: +SKIP
>>> data, metadata = reader[0] # Returns (TensorDict, dict) tuple # doctest: +SKIP

Example (single .npz with preload):
>>> reader = NumpyReader("data.npz", preload_to_cpu=True) # doctest: +SKIP
>>> # All arrays loaded into RAM at init; no disk I/O during iteration
"""

def __init__(
Expand All @@ -65,6 +75,7 @@ def __init__(
pin_memory: bool = False,
include_index_in_metadata: bool = True,
coordinated_subsampling: Optional[dict[str, Any]] = None,
preload_to_cpu: bool = False,
) -> None:
"""
Initialize the NumPy reader.
Expand Down Expand Up @@ -93,13 +104,21 @@ def __init__(
Optional dict to configure coordinated subsampling (directory mode
only). If provided, must contain ``n_points`` (int) and
``target_keys`` (list of str).
preload_to_cpu : bool, default=False
If True, in single-file mode the reader loads all requested
arrays into RAM at init, closes the file, and serves samples
from memory. Use when the dataset fits in RAM and you want
to avoid disk I/O during training. Ignored in directory mode.

Raises
------
FileNotFoundError
If path doesn't exist.
ValueError
If no files found in directory or unsupported file type.
KeyError
If preload_to_cpu is True and a required field is missing
from the file (and not in default_values).
"""
super().__init__(
pin_memory=pin_memory,
Expand All @@ -112,14 +131,17 @@ def __init__(
self.default_values = default_values or {}
self.file_pattern = file_pattern
self.index_key = index_key
self.preload_to_cpu = preload_to_cpu

if not self.path.exists():
raise FileNotFoundError(f"Path not found: {self.path}")

# Determine mode based on path
self._mode: str # "single" or "directory"
# Mode: "single" (one .npz, samples along first dim) or "directory"
self._mode: str
self._files: Optional[list[Path]] = None
self._data: Optional[np.lib.npyio.NpzFile] = None
# When preload_to_cpu: in-memory arrays keyed by field name (single-file only)
self._preloaded: Optional[dict[str, np.ndarray]] = None
self._available_fields: list[str] = []

if self.path.is_dir():
Expand Down Expand Up @@ -147,19 +169,37 @@ def _setup_directory_mode(self) -> None:
self._available_fields = list(npz.files)

def _setup_single_file_mode(self) -> None:
"""Set up reader for single .npz file."""
"""Set up reader for a single .npz file; optionally preload all arrays to RAM."""
self._mode = "single"
self._data = np.load(self.path)
self._available_fields = list(self._data.files)

# Determine length from index_key or first field
# Sample count is the first dimension of index_key or of the first array
if self.index_key is not None:
self._length = self._data[self.index_key].shape[0]
elif self._available_fields:
self._length = self._data[self._available_fields[0]].shape[0]
else:
self._length = 0

# Optional: load entire dataset into RAM and close the file
if self.preload_to_cpu:
required = set(self.fields) - set(self.default_values.keys())
missing = required - set(self._data.files)
if missing:
raise KeyError(
f"Required fields {missing} not found in {self.path}. "
f"Available: {list(self._data.files)}"
)
self._preloaded = {}
for field in self.fields:
if field in self._data.files:
# .copy() forces a real array; np.array() ensures contiguous
self._preloaded[field] = np.array(self._data[field].copy())
if hasattr(self._data, "close"):
self._data.close()
self._data = None

@property
def fields(self) -> list[str]:
"""Fields that will be loaded (user-specified or all available)."""
Expand Down Expand Up @@ -275,20 +315,66 @@ def _load_from_npz(
# Directory mode: load full array
arr = arr[:]

data[field] = torch.from_numpy(np.array(arr))
data[field] = torch.from_numpy(np.asarray(arr, dtype=np.float32))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forced float32 cast breaks integer fields

The change from torch.from_numpy(np.array(arr)) to torch.from_numpy(np.asarray(arr, dtype=np.float32)) unconditionally casts every loaded field to float32. This silently corrupts integer fields (e.g., labels, masks, indices stored as int64/int32). The conftest fixture numpy_data_dir even creates label=np.array([i], dtype=np.int64).

Consider only casting floating-point arrays to float32 and leaving integer arrays untouched:

Suggested change
data[field] = torch.from_numpy(np.asarray(arr, dtype=np.float32))
data[field] = torch.from_numpy(np.asarray(arr, dtype=np.float32 if np.issubdtype(arr.dtype, np.floating) else arr.dtype))


elif field in self.default_values:
data[field] = self.default_values[field].clone()
data[field] = self.default_values[field].clone().float()

return data

def _load_sample_from_preloaded(self, index: int) -> dict[str, torch.Tensor]:
"""
Load a single sample by indexing into preloaded in-memory arrays.

Used only when ``preload_to_cpu=True`` in single-file mode. Applies
coordinated subsampling (random contiguous slice) when configured.

Parameters
----------
index : int
Sample index along the first dimension of each preloaded array.

Returns
-------
dict[str, torch.Tensor]
Dictionary mapping field names to CPU tensors for this sample.
"""
data = {}
fields_to_load = self.fields
target_keys_set = set()
subsample_slice = None

# If subsampling is enabled, pick one random contiguous slice for this sample
if self._coordinated_subsampling_config is not None:
n_points = self._coordinated_subsampling_config["n_points"]
target_keys_set = set(self._coordinated_subsampling_config["target_keys"])
for field in target_keys_set:
if field in self._preloaded:
arr = self._preloaded[field][index]
subsample_slice = self._select_random_sections_from_slice(
0, arr.shape[0], n_points
)
break

for field in fields_to_load:
if field in self._preloaded:
arr = np.array(self._preloaded[field][index], copy=False)
if subsample_slice is not None and field in target_keys_set:
arr = arr[subsample_slice]
data[field] = torch.from_numpy(np.asarray(arr, dtype=np.float32))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same float32 cast issue in preloaded path

Same issue as in _load_from_npz: unconditional dtype=np.float32 cast will corrupt integer fields loaded from preloaded arrays. The fix should mirror whatever approach is chosen for _load_from_npz (line 318).

elif field in self.default_values:
data[field] = self.default_values[field].clone().float()
return data

def _load_sample(self, index: int) -> dict[str, torch.Tensor]:
"""Load a single sample."""
"""Load a single sample from disk or from preloaded RAM."""
if self._mode == "directory":
file_path = self._files[index]
with np.load(file_path) as npz:
return self._load_from_npz(npz, index=None, file_path=file_path)
else: # single
elif self._preloaded is not None:
return self._load_sample_from_preloaded(index)
else:
return self._load_from_npz(self._data, index=index)

def __len__(self) -> int:
Expand Down Expand Up @@ -318,24 +404,27 @@ def _supports_coordinated_subsampling(self) -> bool:
return self._mode == "directory"

def close(self) -> None:
    """
    Close file handles and release preloaded in-memory arrays (if any).

    Safe to call multiple times: the npz handle is cleared after closing
    and the preloaded dict is dropped so its memory can be reclaimed.
    """
    super().close()
    if self._data is not None:
        # Directory-mode probes may leave objects without close(); guard it.
        if hasattr(self._data, "close"):
            self._data.close()
        self._data = None
    # Drop preloaded arrays so RAM is released promptly.
    self._preloaded = None

def __repr__(self) -> str:
subsample_info = ""
if self._coordinated_subsampling_config is not None:
cfg = self._coordinated_subsampling_config
subsample_info = f", subsampling={cfg['n_points']} points"

preload_info = ", preload_to_cpu=True" if self._preloaded is not None else ""
return (
f"NumpyReader("
f"path={self.path}, "
f"mode={self._mode}, "
f"len={len(self)}, "
f"fields={self.fields}"
f"{subsample_info})"
f"{subsample_info}"
f"{preload_info})"
)
4 changes: 4 additions & 0 deletions physicsnemo/datapipes/transforms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
CenterOfMass,
CreateGrid,
KNearestNeighbors,
Resize,
)
from physicsnemo.datapipes.transforms.subsample import (
SubsamplePoints,
Expand All @@ -55,6 +56,7 @@
ConstantField,
Purge,
Rename,
Reshape,
)

__all__ = [
Expand Down Expand Up @@ -83,8 +85,10 @@
"CreateGrid",
"KNearestNeighbors",
"CenterOfMass",
"Resize",
# Utility
"Rename",
"Purge",
"ConstantField",
"Reshape",
]
3 changes: 2 additions & 1 deletion physicsnemo/datapipes/transforms/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from __future__ import annotations

import collections.abc
import warnings
from pathlib import Path
from typing import Any, Literal, Optional
Expand Down Expand Up @@ -196,7 +197,7 @@ def _process_stats_dict(
"""Process statistics into dict of tensors for each field."""
result: dict[str, torch.Tensor] = {}

if isinstance(stats, dict):
if isinstance(stats, collections.abc.Mapping):
for key in self.input_keys:
if key not in stats:
raise ValueError(
Expand Down
Loading
Loading