1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -246,6 +246,7 @@ physicsnemo/models/vfgn/ @mnabian

# Experimental deliberately has no codeowner
physicsnemo/experimental/
physicsnemo/experimental/datapipes/healda/ @pzharrington

# ==============================================================================
# EXAMPLES - Active Learning
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -31,7 +31,7 @@ repos:
language: python
types: [python]
additional_dependencies: ['interrogate==1.7.0']
exclude: ^docs/|^physicsnemo/experimental/|^test/
exclude: ^docs/|^physicsnemo/experimental/|^test/|^examples/.*/test/

- repo: https://github.com/igorshubovych/markdownlint-cli
rev: v0.35.0
202 changes: 202 additions & 0 deletions examples/weather/healda/README.md
@@ -0,0 +1,202 @@
# HealDA — AI-based Data Assimilation on the HEALPix Grid

> **🏗️ This recipe is under active construction. 🏗️**
> Structure and functionality are subject to change.

HealDA is a stateless assimilation model that produces a single
global weather analysis from conventional and satellite
observations. It operates on a HEALPix level-6 padded XY grid
and outputs ERA5-compatible atmospheric variables.

## Setup

Start by installing PhysicsNeMo (if not already installed) with
the `healda` optional dependency group, along with the packages
in `requirements.txt`. Then, copy this folder
(`examples/weather/healda`) to a system with a GPU available.
Also, prepare a dataset that can provide training data
according to the protocols outlined in the
[Generalized Data Loading](#generalized-data-loading) section
below.

## Generalized Data Loading

The `physicsnemo.experimental.datapipes.healda` package provides
a composable data loading pipeline with clear extension points.
The architecture separates components into loaders, transforms,
datasets, and sampling infrastructure.

### Architecture

```text
ObsERA5Dataset(era5_data, obs_loader, transform)
| Temporal windowing via FrameIndexGenerator
| __getitems__ -> get() per index -> transform.transform()
v
RestartableDistributedSampler (stateful distributed sampling with checkpointing)
|
DataLoader (pin_memory, persistent_workers)
|
prefetch_map(loader, transform.device_transform)
|
Training loop (GPU-ready batch)
```
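The temporal windowing step at the top of this diagram can be sketched as a sliding window over frame indices. The snippet below is a hypothetical stand-in for `FrameIndexGenerator` (whose real signature may differ), shown only to illustrate how windows of frames map to timestamps:

```python
from datetime import datetime, timedelta


def frame_index_windows(num_times: int, window: int, stride: int = 1):
    """Sliding windows of frame indices over a time axis."""
    return [
        tuple(range(i, i + window))
        for i in range(0, num_times - window + 1, stride)
    ]


def indices_to_times(start: datetime, step_hours: int, indices):
    """Map frame indices to absolute timestamps."""
    return [start + timedelta(hours=step_hours * i) for i in indices]


windows = frame_index_windows(num_times=5, window=3)
# → [(0, 1, 2), (1, 2, 3), (2, 3, 4)]
times = indices_to_times(datetime(2020, 1, 1), 6, windows[0])
```

Each dataset `__getitem__` call then loads the frames for one such window.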

### Key Protocols

Custom data sources and transforms plug in via these protocols
(see `physicsnemo.experimental.datapipes.healda.protocols`):

**`ObsLoader`** — the observation loading interface:

```python
class MyObsLoader:
async def sel_time(self, times):
"""Return {"obs": [pa.Table, ...]}"""
...
```

**`Transform`** / **`DeviceTransform`** — two-stage batch
processing:

```python
class MyTransform:
def transform(self, times, frames):
"""CPU-side: normalize, encode obs, time features."""
...

def device_transform(self, batch, device):
"""GPU-side: move to device, compute obs features."""
...
```
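To make the two-stage split concrete, here is a toy transform with made-up statistics and plain Python lists in place of tensors. The real `device_transform` moves tensors to the GPU and computes observation features; this sketch only tags the batch with the target device:

```python
class ToyTransform:
    """Illustrative two-stage transform (schema and statistics are made up)."""

    def __init__(self, mean: float, std: float):
        self.mean = mean
        self.std = std

    def transform(self, times, frames):
        # CPU side: normalize raw values and keep the timestamps.
        normed = [[(v - self.mean) / self.std for v in frame] for frame in frames]
        return {"times": times, "state": normed}

    def device_transform(self, batch, device):
        # GPU side in the real pipeline; here we just record the device.
        batch["device"] = device
        return batch


t = ToyTransform(mean=280.0, std=10.0)
batch = t.transform(["2020-01-01T00"], [[280.0, 290.0]])
batch = t.device_transform(batch, "cuda:0")
```

The split matters because `transform` runs in DataLoader worker processes while `device_transform` runs on the main process after the batch is handed to `prefetch_map`.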

### Provided Implementations

| Component | Module | Description |
|---|---|---|
| `ObsERA5Dataset` | `dataset` | ERA5 state + observations |
| `UFSUnifiedLoader` | `loaders.ufs_obs` | Parquet obs loader |
| `ERA5Loader` | `loaders.era5` | Async ERA5 zarr loader |
| `ERA5ObsTransform` | `transforms.era5_obs` | Two-stage transform |
| `RestartableDistributedSampler` | `samplers` | Stateful distributed sampler |
| `prefetch_map` | `prefetch` | CUDA stream prefetching |

All modules above are under
`physicsnemo.experimental.datapipes.healda`.
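The idea behind the stateful sampler can be sketched generically: a deterministic per-epoch shuffle plus a cursor that survives checkpointing, so training resumes mid-epoch without repeating samples. This is a hypothetical sketch, not the actual `RestartableDistributedSampler` API:

```python
import random


class ToyRestartableSampler:
    """Deterministic per-epoch shuffle + a resumable cursor per rank."""

    def __init__(self, num_samples, rank=0, num_replicas=1, seed=0):
        self.num_samples = num_samples
        self.rank = rank
        self.num_replicas = num_replicas
        self.seed = seed
        self.epoch = 0
        self.cursor = 0  # position within this rank's shard

    def set_epoch(self, epoch):
        self.epoch = epoch
        self.cursor = 0

    def _shard(self):
        order = list(range(self.num_samples))
        random.Random(self.seed + self.epoch).shuffle(order)
        return order[self.rank::self.num_replicas]

    def __iter__(self):
        shard = self._shard()
        while self.cursor < len(shard):
            idx = shard[self.cursor]
            self.cursor += 1
            yield idx

    def state_dict(self):
        return {"epoch": self.epoch, "cursor": self.cursor}

    def load_state_dict(self, state):
        self.epoch = state["epoch"]
        self.cursor = state["cursor"]


s = ToyRestartableSampler(num_samples=8, rank=0, num_replicas=2, seed=42)
s.set_epoch(0)
# Draw two samples, then "crash" and checkpoint the sampler state.
first_two = [next(iter(s)) for _ in range(2)]
ckpt = s.state_dict()

# A fresh sampler restored from the checkpoint resumes where we left off.
s2 = ToyRestartableSampler(num_samples=8, rank=0, num_replicas=2, seed=42)
s2.load_state_dict(ckpt)
rest = list(s2)
```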

### Writing a Custom Observation Loader

Implement `async def sel_time(times)` returning a dict with
observation data per timestamp:

```python
class GOESRadianceLoader:
def __init__(self, data_path, channels):
self.data_path = data_path
self.channels = channels

async def sel_time(self, times):
tables = []
for t in times:
table = self._load_goes_radiances(t)
tables.append(table)
return {"obs": tables}
```

Then pass it to the dataset:

```python
from physicsnemo.experimental.datapipes.healda import (
ObsERA5Dataset,
)
from physicsnemo.experimental.datapipes.healda.transforms.era5_obs import (
ERA5ObsTransform,
)
from physicsnemo.experimental.datapipes.healda.configs.variable_configs import (
VARIABLE_CONFIGS,
)

dataset = ObsERA5Dataset(
era5_data=era5_xr["data"],
obs_loader=GOESRadianceLoader(...),
transform=ERA5ObsTransform(sensors=["goes"], ...),
variable_config=VARIABLE_CONFIGS["era5"],
)
```

### Putting It Together

A complete training pipeline wires together all the
components — dataset, sampler, DataLoader, and GPU prefetch:

```python
import torch
from torch.utils.data import DataLoader

from physicsnemo.experimental.datapipes.healda import (
ObsERA5Dataset,
RestartableDistributedSampler,
identity_collate,
prefetch_map,
)
from physicsnemo.experimental.datapipes.healda.loaders.ufs_obs import (
UFSUnifiedLoader,
)
from physicsnemo.experimental.datapipes.healda.transforms.era5_obs import (
ERA5ObsTransform,
)
from physicsnemo.experimental.datapipes.healda.configs.variable_configs import (
VARIABLE_CONFIGS,
)

sensors = ["atms", "mhs", "conv"]

# 1. Build loaders
obs_loader = UFSUnifiedLoader(
data_path="/path/to/processed_obs",
sensors=sensors,
normalization="zscore",
obs_context_hours=(-21, 3),
)
transform = ERA5ObsTransform(
variable_config=VARIABLE_CONFIGS["era5"],
sensors=sensors,
)

# 2. Build dataset
dataset = ObsERA5Dataset(
era5_data=era5_xr["data"],
obs_loader=obs_loader,
transform=transform,
variable_config=VARIABLE_CONFIGS["era5"],
split="train",
)

# 3. Sampler + DataLoader
sampler = RestartableDistributedSampler(
dataset, rank=rank, num_replicas=world_size,
)
sampler.set_epoch(0)
dataloader = DataLoader(
dataset,
sampler=sampler,
batch_size=2,
num_workers=8,
collate_fn=identity_collate,
pin_memory=True,
persistent_workers=True,
)

# 4. GPU prefetch (hides CPU→GPU transfer behind training)
device = torch.device("cuda")
loader = prefetch_map(
dataloader,
lambda batch: transform.device_transform(batch, device),
queue_size=1,
)

# 5. Training loop — batches arrive GPU-ready
for batch in loader:
loss = model(batch)
...
```
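The overlap that `prefetch_map` provides can be illustrated with a plain background thread: items are transformed ahead of the consumer and buffered in a small queue. This is a generic sketch of the pattern only; the real `prefetch_map` additionally stages host-to-device copies on a side CUDA stream:

```python
import queue
import threading


def toy_prefetch_map(iterable, fn, queue_size=1):
    """Apply `fn` to each item on a background thread, buffering results
    so the consumer overlaps with preprocessing of the next item."""
    q = queue.Queue(maxsize=queue_size)
    _DONE = object()  # sentinel marking exhaustion of the source

    def worker():
        for item in iterable:
            q.put(fn(item))
        q.put(_DONE)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = q.get()
        if item is _DONE:
            return
        yield item


batches = [[1, 2], [3, 4]]
out = list(toy_prefetch_map(iter(batches), lambda b: [x * 2 for x in b]))
# → [[2, 4], [6, 8]]
```

A small `queue_size` (the README uses 1) bounds how far the producer runs ahead, capping the extra memory held in flight.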
9 changes: 9 additions & 0 deletions examples/weather/healda/requirements.txt
@@ -0,0 +1,9 @@
# nvidia-physicsnemo[datapipes-extras]
cftime
pyarrow
python-dotenv
earth2grid @ git+https://github.com/NVlabs/earth2grid.git@main
healpy
matplotlib
joblib
icechunk
15 changes: 15 additions & 0 deletions physicsnemo/experimental/datapipes/__init__.py
@@ -0,0 +1,15 @@
# SPDX-FileCopyrightText: Copyright (c) 2023 - 2026 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
79 changes: 79 additions & 0 deletions physicsnemo/experimental/datapipes/healda/__init__.py
@@ -0,0 +1,79 @@
# SPDX-FileCopyrightText: Copyright (c) 2023 - 2026 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""HealDA data loading pipeline.

Provides the complete data pipeline for HealDA training: observation loading,
ERA5 state loading, two-stage transforms (CPU + GPU), distributed sampling,
and background CUDA prefetching.

Key entry points:

- :class:`ObsERA5Dataset` — map-style dataset combining ERA5 state + observations
- :class:`UFSUnifiedLoader` — parquet-based observation loader
- :class:`ERA5ObsTransform` — two-stage transform with Triton feature kernels
- :func:`prefetch_map` — background CUDA stream prefetching
- :class:`RestartableDistributedSampler` — stateful distributed sampler with checkpoint support

Protocols for custom loaders/transforms:

- :class:`ObsLoader` — async observation loading interface
- :class:`Transform` — CPU-side batch transform
- :class:`DeviceTransform` — GPU-side batch transform
"""

from physicsnemo.experimental.datapipes.healda.dataset import (
ObsERA5Dataset,
identity_collate,
)
from physicsnemo.experimental.datapipes.healda.prefetch import prefetch_map
from physicsnemo.experimental.datapipes.healda.protocols import (
DeviceTransform,
ObsLoader,
Transform,
)
from physicsnemo.experimental.datapipes.healda.samplers import (
RestartableDistributedSampler,
)
from physicsnemo.experimental.datapipes.healda.types import (
Batch,
BatchInfo,
TimeUnit,
UnifiedObservation,
VariableConfig,
empty_batch,
split_by_sensor,
)

__all__ = [
# Dataset
"ObsERA5Dataset",
# Protocols
"ObsLoader",
"Transform",
"DeviceTransform",
# Types
"UnifiedObservation",
"Batch",
"BatchInfo",
"VariableConfig",
"TimeUnit",
"empty_batch",
"split_by_sensor",
# Infrastructure
"prefetch_map",
"RestartableDistributedSampler",
"identity_collate",
]
15 changes: 15 additions & 0 deletions physicsnemo/experimental/datapipes/healda/configs/__init__.py
@@ -0,0 +1,15 @@
# SPDX-FileCopyrightText: Copyright (c) 2023 - 2026 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.