Add temporal rolling window support #5393

Open
esnvidia wants to merge 15 commits into rapidsai:main from esnvidia:optimization-full-B-C-D-hash

Conversation

@esnvidia esnvidia commented Jan 13, 2026

Pull Request: Windowed temporal neighbor sampling (Python) — B+C+D integration

Summary

This PR exposes windowed temporal neighbor sampling from cuGraph C++ through the C API → Cython → pylibcugraph Python API. It ensures correctness for rolling windows and provides reproducible benchmark/profiling hooks to A/B compare baseline vs optimized behavior.

Primary goals:

  • Correctness: window filtering is applied end-to-end (Python → C++) and robust for rolling windows (forward/backward/shrinking).
  • Performance: avoid the pathological O(E) per-iteration edge-mask scan in rolling-window temporal sampling.
  • User control: expensive validation is gated behind do_expensive_check and does not impact default performance.

What are B, C, and D? (and where they live)

B — Window bounds via binary search (O(log E))

  • What: When edge times are temporally sorted, find the indices for [window_start, window_end) using binary search rather than scanning all edges.
  • Where:
    • Core windowed logic: cugraph/cpp/src/sampling/windowed_temporal_sampling_impl.hpp
      • set_window_mask() (binary search bounds + set from sorted range): L102-L127
      • compute_window_bounds_binary_search() callsite: L112-L118
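The idea behind B can be sketched in a few lines. This is an illustrative NumPy version, not the C++ implementation; the function name `window_bounds` is hypothetical:

```python
import numpy as np

def window_bounds(sorted_edge_times: np.ndarray, window_start: int, window_end: int):
    """Find the [lo, hi) index range of edges with time in [window_start, window_end).

    Assumes sorted_edge_times is ascending; two binary searches, O(log E) total.
    """
    lo = int(np.searchsorted(sorted_edge_times, window_start, side="left"))
    hi = int(np.searchsorted(sorted_edge_times, window_end, side="left"))
    return lo, hi

times = np.array([10, 20, 30, 40, 50, 60])
print(window_bounds(times, 20, 50))  # edges with time in [20, 50) -> indices (1, 4)
```

Because the edge times are sorted once up front, the mask for the window can then be set from the contiguous `[lo, hi)` range instead of comparing every edge time.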

C — Incremental sliding-window updates (O(ΔE))

  • What: Persist state across calls and update the packed window mask incrementally when the window slides forward (update only the delta edges entering/leaving the window).
  • Where:
    • Persistent state struct: cugraph/cpp/src/sampling/window_state_fwd.hpp
      • window_state_t (+ persisted packed mask): L25-L54
      • ensure_edge_mask_size(): L46-L53
    • Incremental update: cugraph/cpp/src/sampling/windowed_temporal_sampling_impl.hpp
      • update_window_mask_incremental(): L141-L185
      • Robust fallback when the window shrinks/moves backward: L158-L171
    • Window-state initialization: cugraph/cpp/src/sampling/windowed_temporal_sampling_impl.hpp
      • initialize_window_state() (one-time sort-by-time): L56-L88
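The incremental update in C can be illustrated with a boolean mask over time-sorted edges; the real `update_window_mask_incremental()` operates on a packed device-side mask, but the delta logic is the same. This sketch assumes the window only slides forward (the implementation falls back to a rebuild otherwise):

```python
import numpy as np

def update_mask_incremental(mask, sorted_times, old_lo, old_hi, new_start, new_end):
    """Slide the window forward, touching only delta edges (O(delta E)), not all E.

    mask[i] is True iff time-sorted edge i is inside the current window.
    Returns the new [lo, hi) bounds.
    """
    new_lo = int(np.searchsorted(sorted_times, new_start, side="left"))
    new_hi = int(np.searchsorted(sorted_times, new_end, side="left"))
    mask[old_lo:new_lo] = False  # edges that left the window
    mask[old_hi:new_hi] = True   # edges that entered the window
    return new_lo, new_hi

times = np.arange(0, 100, 10)            # edge times 0, 10, ..., 90 (sorted)
mask = np.zeros(len(times), dtype=bool)
lo, hi = 0, 0
lo, hi = update_mask_incremental(mask, times, lo, hi, 0, 40)   # initial [0, 40)
lo, hi = update_mask_incremental(mask, times, lo, hi, 10, 50)  # slide one step
print(lo, hi)  # window now covers times 10..40
```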

D — Inline temporal + window filtering in the sampling kernel (O(frontier_edges))

  • What: Apply temporal constraints (and optional window bounds) inline while traversing only frontier edges, rather than precomputing a graph-wide mask.
  • Where:
    • Predicate used during sampling: cugraph/cpp/src/sampling/detail/sample_edges.cuh (temporal_sample_edge_biases_op_t, includes optional window_start/window_end)
    • Sampling path that uses it: cugraph/cpp/src/sampling/detail/sampling_utils.hpp (temporal_sample_edges(...))
    • Callsite for the main temporal sampling pipeline (fanout > 0): cugraph/cpp/src/sampling/temporal_sampling_impl.hpp

Key line references (D):

  • cugraph/cpp/src/sampling/detail/sample_edges.cuh
    • temporal_sample_edge_biases_op_t definition: L86-L214
    • within_window() helper: L93-L96
    • Window predicate applied: valid = valid && within_window(edge_time); at L129-L130, L155-L156, L183-L184, L211-L212
  • cugraph/cpp/src/sampling/detail/sampling_utils.hpp
    • temporal_sample_edges(...) signature includes optional window args: L205-L219
  • cugraph/cpp/src/sampling/temporal_sampling_impl.hpp
    • Sampling path uses temporal_sample_edges(...) and threads through window_start/window_end: L316-L386 (see window_start/window_end passed at L359-L361)
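Conceptually, D replaces a graph-wide mask with a per-candidate predicate evaluated only on the frontier's edges. The following NumPy sketch mirrors the intent of `within_window()` inside `temporal_sample_edge_biases_op_t`; it is an illustration, not the device code, and `sample_neighbors_inline` is a hypothetical name:

```python
import numpy as np

rng = np.random.default_rng(0)

def sample_neighbors_inline(neighbors, edge_times, seed_time, fanout,
                            window_start=None, window_end=None):
    """Uniformly sample up to `fanout` neighbors, filtering inline.

    Only the frontier's candidate edges are examined (O(frontier_edges));
    no graph-wide mask is built. Keep an edge iff edge_time <= seed_time and,
    when a window is given, window_start <= edge_time < window_end.
    """
    valid = edge_times <= seed_time  # temporal constraint (monotonically decreasing)
    if window_start is not None and window_end is not None:
        valid &= (edge_times >= window_start) & (edge_times < window_end)
    candidates = neighbors[valid]
    if len(candidates) == 0:
        return candidates
    return rng.choice(candidates, size=min(fanout, len(candidates)), replace=False)

nbrs = np.array([1, 2, 3, 4, 5])
times = np.array([5, 15, 25, 35, 45])
picked = sample_neighbors_inline(nbrs, times, seed_time=40, fanout=2,
                                 window_start=10, window_end=40)
# picked is a 2-element subset of {2, 3, 4} (the edges with times 15, 25, 35)
```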

Important behavioral note: “standard rolling” already benefits from D (inline temporal filtering via starting_vertex_times + temporal_sampling_comparison). Windowed rolling adds the lower bound (window_start) and can avoid O(E) mask scans by not attaching a global edge mask by default.

B/C are implemented and available; default windowed sampling uses D (the inline window predicate) to avoid the masked-sampling pipeline for fanout > 0. The baseline O(E) path can be forced via environment variables for A/B comparison.

What changed (high level)

Python API (pylibcugraph)

  • Adds optional window parameters to homogeneous_uniform_temporal_neighbor_sample():
    • window_start, window_end (+ window_time_unit)
  • If both are provided, dispatches to the windowed C API entrypoint; if only one is provided, raises ValueError.

Files:

  • cugraph/python/pylibcugraph/pylibcugraph/homogeneous_uniform_temporal_neighbor_sample.pyx

C API

  • Adds windowed C API entrypoint:
    • cugraph_homogeneous_uniform_temporal_neighbor_sample_windowed(...)

Files:

  • cugraph/cpp/include/cugraph_c/sampling_algorithms.h
  • cugraph/cpp/src/c_api/temporal_neighbor_sampling.cu
  • cugraph/python/pylibcugraph/pylibcugraph/_cugraph_c/sampling_algorithms.pxd

C++ implementation

  • Adds/uses a persistent window_state_t attached to the graph (SG) to support incremental window updates.
  • Robustness fixes for rolling windows:
    • correct initialization on first use of state
    • safe fallback to rebuild when window moves backward or shrinks
  • Expensive validation (e.g., sortedness checks) is gated behind do_expensive_check.

Files:

  • cugraph/cpp/src/sampling/windowed_temporal_sampling_impl.hpp
  • cugraph/cpp/src/sampling/window_state_fwd.hpp
  • cugraph/cpp/src/sampling/temporal_sampling_impl.hpp

Debug / A/B knobs (useful for profiling)

  • CUGRAPH_WINDOWED_TEMPORAL_FORCE_OE=1
    • Forces the windowed path to build the window mask via the O(E) scan path.
    • Intended for apples-to-apples baseline comparisons.
  • CUGRAPH_WINDOWED_TEMPORAL_USE_EDGE_MASK=1
    • Forces using a global edge mask for windowing (legacy behavior; useful for A/B and gather-style paths).
  • CUGRAPH_MASKED_SAMPLING_AVOID_PARTITION=1
    • Debug knob to skip degree bucketing (partition) inside masked sampling.
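The knobs are read as environment variables, so they must be set before the sampling call reaches them. A minimal sketch (the env-var names are from this PR; the helper `use_oe_baseline` is hypothetical, shown only to illustrate how such a knob is typically interpreted):

```python
import os

# Force the O(E) baseline for an apples-to-apples A/B run. "1" enables a knob;
# leaving it unset keeps the optimized default path.
os.environ["CUGRAPH_WINDOWED_TEMPORAL_FORCE_OE"] = "1"       # force O(E) scan
os.environ["CUGRAPH_WINDOWED_TEMPORAL_USE_EDGE_MASK"] = "1"  # force global edge mask

def use_oe_baseline() -> bool:
    """Hypothetical sketch of how a library layer checks such a knob."""
    return os.environ.get("CUGRAPH_WINDOWED_TEMPORAL_FORCE_OE", "0") == "1"

print(use_oe_baseline())  # True
```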

Results (no nsys, after rebase on origin/main)

Python C++-equivalent benchmark (10M vertices / 300M edges; 30 iters; 10k seeds; fanout [10,10])

  • standard: 24.58 ms/iter, throughput 4.05 M edges/s
  • windowed_bcd (inline window default): 23.83 ms/iter, throughput 4.16 M edges/s
  • windowed_baseline_oe (forced edge mask + forced O(E) scan): 45.95 ms/iter, throughput 2.16 M edges/s
  • Speedup (windowed_bcd vs windowed_baseline_oe): ~1.93×

Rolling benchmark (10M/300M; 30 iters; 10k seeds; 365d window; step=1d)

  • standard rolling: 21.27 ms
  • windowed rolling: 20.10 ms
  • Speedup: 1.06× (~5.5%)

Large-scale correctness

  • Full-range window exact match ✅
  • Narrow window invariant ([start,end)) ✅
  • Edge times sorted ✅

Why rolling-window speedup vs “standard rolling” is modest (~5–7%)

The rolling benchmark’s standard mode already applies the “≤ window_end” constraint via:

  • starting_vertex_times = window_end per seed, and
  • temporal_sampling_comparison="monotonically_decreasing"

So the windowed mode mainly adds the lower bound (window_start). Without time-sorted adjacency per vertex (Phase-2), this is typically just an extra predicate per candidate edge, not a large reduction in traversal.

The large speedup is vs the windowed baseline that forces global O(E) mask building, which this PR avoids by default.

How to run everything

All commands below assume you’re using the existing container: cugraph-nsys.

1) Pre-commit (RAPIDS)

cd /home/coder/cugraph
pre-commit install
pre-commit run --all-files

2) Unit tests (pylibcugraph)

source /opt/conda/etc/profile.d/conda.sh
conda activate rapids

pytest -q /home/coder/cugraph/python/pylibcugraph/pylibcugraph/tests/test_windowed_temporal_sampling.py
pytest -q /home/coder/cugraph/python/pylibcugraph/pylibcugraph/tests/test_temporal_neighbor_sample.py

3) Benchmarks (no nsys)

(A) Python C++-equivalent benchmark

source /opt/conda/etc/profile.d/conda.sh
conda activate rapids

python /home/coder/retail-gnn-blueprint/benchmarks/python_temporal_sampling_cpp_benchmark.py \
  --mode standard \
  --customers 9300000 --articles 700000 --edges 300000000 \
  --seeds 10000 --iterations 30 --warmup 2 --fanout 10,10 \
  --rmm-initial-gb 16 --rmm-max-gb 44

python /home/coder/retail-gnn-blueprint/benchmarks/python_temporal_sampling_cpp_benchmark.py \
  --mode windowed_bcd \
  --customers 9300000 --articles 700000 --edges 300000000 \
  --seeds 10000 --iterations 30 --warmup 2 --fanout 10,10 \
  --rmm-initial-gb 16 --rmm-max-gb 44

python /home/coder/retail-gnn-blueprint/benchmarks/python_temporal_sampling_cpp_benchmark.py \
  --mode windowed_baseline_oe \
  --customers 9300000 --articles 700000 --edges 300000000 \
  --seeds 10000 --iterations 30 --warmup 2 --fanout 10,10 \
  --rmm-initial-gb 16 --rmm-max-gb 44

(B) Rolling windows: standard vs windowed

source /opt/conda/etc/profile.d/conda.sh
conda activate rapids

python /home/coder/retail-gnn-blueprint/benchmarks/rolling_standard_vs_windowed_10M_300M.py

(C) Large correctness A/B

source /opt/conda/etc/profile.d/conda.sh
conda activate rapids

python /home/coder/retail-gnn-blueprint/benchmarks/ab_windowed_temporal_correctness_10M_300M.py

4) Profiling (nsys)

(A) Profile the Python C++-equivalent benchmark

source /opt/conda/etc/profile.d/conda.sh
conda activate rapids

nsys profile \
  --gpu-metrics-devices=0 \
  --trace=cuda,nvtx \
  --cuda-memory-usage=true \
  --capture-range=cudaProfilerApi --capture-range-end=stop \
  --force-overwrite=true \
  -o /home/coder/profiles/python_cpp_equiv_windowed_bcd \
  python /home/coder/retail-gnn-blueprint/benchmarks/python_temporal_sampling_cpp_benchmark.py \
    --mode windowed_bcd \
    --customers 9300000 --articles 700000 --edges 300000000 \
    --seeds 10000 --iterations 30 --warmup 2 --fanout 10,10 \
    --rmm-initial-gb 16 --rmm-max-gb 44

Repeat with --mode windowed_baseline_oe for baseline.

(B) Profile rolling windows

source /opt/conda/etc/profile.d/conda.sh
conda activate rapids

nsys profile \
  --gpu-metrics-devices=0 \
  --trace=cuda,nvtx \
  --cuda-memory-usage=true \
  --force-overwrite=true \
  -o /home/coder/profiles/rolling_standard_vs_windowed \
  python /home/coder/retail-gnn-blueprint/benchmarks/rolling_standard_vs_windowed_10M_300M.py

How to call the Python API for rolling windows

import cupy as cp
import numpy as np
from pylibcugraph import homogeneous_uniform_temporal_neighbor_sample

SECONDS_PER_DAY = 86400

fanout = np.array([10, 10], dtype=np.int32)
temporal_comparison = "monotonically_decreasing"

# resource_handle, graph, num_iters, base_end, step_days, window_days,
# num_customers, and n_seeds are assumed to be defined elsewhere.
for i in range(num_iters):
    window_end = base_end + i * step_days * SECONDS_PER_DAY
    window_start = window_end - window_days * SECONDS_PER_DAY

    start_vertices = cp.random.randint(0, num_customers, size=n_seeds, dtype=cp.int64)
    starting_vertex_times = cp.full(n_seeds, window_end, dtype=cp.int64)

    result = homogeneous_uniform_temporal_neighbor_sample(
        resource_handle,
        graph,
        None,  # temporal_property_name
        start_vertices,
        starting_vertex_times,
        None,  # starting_vertex_label_offsets
        fanout,
        with_replacement=True,
        do_expensive_check=False,
        temporal_sampling_comparison=temporal_comparison,
        window_start=window_start,
        window_end=window_end,
        window_time_unit="s",
    )

Phase-2 (future work)

To get larger speedups vs standard rolling, we need time-bounded neighbor iteration (e.g., time-sorted adjacency per vertex/edge-type and per-vertex binary-search bounds). This is a multi-week change because it affects graph representation/indexing and must support hetero + MG.
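The Phase-2 idea can be sketched as a CSR adjacency whose per-vertex slices are sorted by time, so a window lookup per frontier vertex is two binary searches over that vertex's slice instead of a predicate on every candidate edge. This is an illustrative sketch under that assumption, not the proposed cuGraph representation:

```python
import numpy as np

offsets = np.array([0, 3, 5])          # CSR offsets for 2 vertices
nbr = np.array([7, 8, 9, 3, 4])        # neighbors
etime = np.array([10, 20, 30, 5, 50])  # edge times, sorted within each vertex slice

def window_neighbors(v, window_start, window_end):
    """Return v's neighbors with edge time in [window_start, window_end)."""
    beg, end = offsets[v], offsets[v + 1]
    lo = beg + np.searchsorted(etime[beg:end], window_start, side="left")
    hi = beg + np.searchsorted(etime[beg:end], window_end, side="left")
    return nbr[lo:hi]

print(window_neighbors(0, 15, 35))  # -> [8 9]
```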

PR_WINDOWED_TEMPORAL_SAMPLING_BCD.md

Commit messages

When vertex_labels is std::nullopt, the else branch incorrectly
attempted to access vertex_labels_p1->begin() and vertex_labels_p2->begin(),
causing a cudaErrorIllegalAddress crash.

This fix removes the label iterators from thrust::make_zip_iterator
calls in the else branch, since labels are not available when
vertex_labels is nullopt.

Fixes temporal neighbor sampling when called without vertex labels,
e.g., homogeneous_uniform_temporal_neighbor_sample with
temporal_property_name=None.

All 210+ temporal sampling tests pass with this fix.

Replace O(E) edge mask computation with O(frontier_edges) inline filtering
for the sampling path in temporal neighbor sampling.

Key changes:
- Only call update_temporal_edge_mask() for gather path (fan_out < 0)
- Use temporal_sample_edges() instead of sample_edges() for sampling path
- temporal_sample_edges performs inline temporal filtering during sampling

Performance improvement on 300M edge graph:
- Baseline A: 42.67ms/iteration
- Optimization D: 26.18ms/iteration (1.63x speedup)
- Eliminated transform_e_packed_bool kernel (was 62.6% of GPU time)

The optimization leverages the existing temporal_sample_edges function which
uses per_v_random_select_transform_outgoing_e with temporal_sample_edge_biases_op_t
to filter edges inline during the sampling primitive, avoiding the need to
pre-compute a global edge mask.

Add parallel-first CUDA primitives for window-based temporal filtering:

1. set_window_edge_mask(): O(E) parallel comparison for time window
2. compute_window_bounds_binary_search(): O(log E) binary search for sorted edges
3. set_mask_from_sorted_range(): O(E_window) mask from sorted indices
4. update_mask_incremental(): O(ΔE) incremental mask update for sliding windows

Performance on 1M edges:
- Binary search: 0.09ms
- Set mask from range: 0.03ms
- Incremental update: 0.025ms

These primitives enable efficient window-based temporal sampling
without graph reconstruction overhead. For 300M edges:
- Expected binary search: ~0.1ms
- Expected incremental update (1-day step): ~10ms vs 300+ms Python rebuild

Unit tests: 6/6 passed

References: CUDA Programming Guide sections on parallel primitives,
cooperative groups, and thrust algorithms.

Changes:
1. Remove edge mask restriction in temporal_sampling_impl.hpp
   - Was: CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), ...)
   - Now: Supports graphs with pre-attached edge masks (for window filtering)

2. Add windowed_temporal_sampling_impl.hpp
   - New wrapper combining B/C window filtering with D inline temporal filter
   - window_state_t: State for incremental window updates
   - initialize_window_state(): One-time O(E log E) sort for efficient updates
   - set_window_mask(): O(log E) binary search + O(E_window) mask set
   - update_window_mask_incremental(): O(ΔE) incremental update
   - windowed_temporal_neighbor_sample_impl(): Main entry point

3. Add OPTIMIZATION_PROPOSAL_B_C_D_HASH.md
   - Documents approach for B+C+D integration
   - Documents hash table bottleneck and attempted fixes

Performance expectations (from C++ unit tests on 1M edges):
- Binary search: 0.09ms
- Set mask from range: 0.03ms
- Incremental update: 0.025ms

References: CUDA Programming Guide - Cooperative Groups, Thrust algorithms

1. key_store_cg.cuh: New CG-compatible key store with CG size = 4
   - Enables parallel probing for hash table operations
   - Includes alternative deduplicate_sort_unique() for smaller frontiers

2. OPTIMIZATION_PROPOSAL_B_C_D_HASH.md: Updated with results
   - Documents 2.65x speedup achieved (42.67ms → 16.09ms)
   - Analyzes remaining hash table bottleneck (19.2% of GPU time)
   - Notes CG size increase requires invasive changes to 15+ files
   - Proposes alternatives: binary search mode, hybrid approach

Note: Hash table optimization still valuable for trillion-edge graphs
where current 19.2% relative time could become absolute bottleneck.

New files:

1. prims/key_store_cg.cuh:
   - key_store_cg_t: CG size = 4 for parallel probing
   - deduplicate_hybrid(): Chooses algorithm based on size
   - deduplicate_sort_unique(): O(n log n) alternative

2. sampling/detail/renumber_cg.cuh:
   - renumber_cg_store_t: CG-optimized hash table for renumbering
   - renumber_sort_based_t: Sort + binary search alternative
   - RenumberStrategy enum for algorithm selection

For trillion-edge graphs:
- CG size 4 enables 2-4x faster hash probing
- Sort-based approach for memory-constrained scenarios
- Auto-selection based on dataset size

Note: Full integration requires modifying sampling_post_processing_impl.cuh
to use renumber_cg_store_t instead of kv_store_t<..., false>.

References: CUDA Programming Guide - Cooperative Groups

Changes:
1. sampling_post_processing_impl.cuh:
   - Added include for detail/renumber_cg.cuh
   - Replaced kv_store_t<vertex_t, vertex_t, false> with
     detail::renumber_cg_store_t for renumbering lookups
   - CG size = 4 for parallel probing

2. detail/renumber_cg.cuh:
   - Added missing #include <thrust/distance.h>

Build: SUCCESS - all sampling tests pass
Note: The cuco::insert_if_n in nsys profile still shows CG size = 1,
indicating other hash table usages exist in the sampling flow that
need to be migrated to CG-compatible versions.

Next steps:
- Identify all hash table usages in temporal sampling flow
- Create CG-compatible versions for each
- Profile to confirm CG size = 4 is being used

Analysis from nsys profile of optimization_full_BCD_profile.nsys-rep:

Hash table kernel details:
- insert_if_n<(int)1, (int)128>: CG=1, block=128, grid=78125
- Total keys per call: ~10M
- Avg execution time: 8.4ms per call

Load factor analysis:
- cuGraph uses 0.7 (70%) load factor
- At 70% load, avg probe distance = 1/(1-0.7) ≈ 3.3 slots

CG size trade-offs:
| CG | Probes/iter | Avg iters | Warp groups |
|----|-------------|-----------|-------------|
| 1  | 1           | 4         | 32          |
| 4  | 4           | 1         | 8           |
| 8  | 8           | 1         | 4           |
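The table follows from the load-factor estimate above. A back-of-envelope check (illustrative only, using the 1/(1-a) average probe-distance approximation stated above):

```python
import math

def expected_iterations(load_factor: float, cg_size: int) -> int:
    """Probe iterations to find a key: ~1/(1-a) slots probed on average,
    with a cooperative group covering cg_size slots per iteration."""
    probes = 1.0 / (1.0 - load_factor)  # ~3.3 slots at 70% load
    return math.ceil(probes / cg_size)

print(expected_iterations(0.7, 1))  # 4 iterations, matching the CG=1 row
print(expected_iterations(0.7, 4))  # 1 iteration, matching the CG=4 row
```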

Why CG=4 is optimal:
1. Matches cuco default (static_map/static_set)
2. 4 parallel probes find keys in 1 iteration at 70% load
3. 8 groups per warp = good SM occupancy
4. Memory coalescing: 4 consecutive slots probed together
5. cuco docs: "significant boost at moderate-to-high load factors"

Expected speedup: 2-4x on hash table kernel (8.4ms → 2-4ms)

Use compute_number_of_edges(handle) instead of number_of_edges() which
is not available for single-GPU graph_view_t. This fixes the windowed
temporal benchmark compilation.

Benchmark results confirmed:
- C++ B+C+D: 11.52ms
- Python B+C+D: 16.09ms
- Python overhead: 4.57ms (39.7%)

Add new C API function cugraph_homogeneous_uniform_temporal_neighbor_sample_windowed
with window_start and window_end parameters that enable B+C+D optimizations:
  - B: O(log E) binary search for window bounds
  - C: O(ΔE) incremental window updates
  - D: Inline temporal filtering during sampling

Python API changes:
  - Add window_start and window_end optional parameters to
    homogeneous_uniform_temporal_neighbor_sample()
  - When both provided, calls the optimized windowed C API
  - Backward compatible: existing code works unchanged

Benchmark results:
  - C++ B+C+D: 11.52ms
  - Python D only: 16.09ms
  - Expected improvement: ~29% faster with windowed API

- Add _convert_timestamp_to_int() helper that handles:
  - int/np.integer: passed through unchanged
  - str: parsed via pd.Timestamp (e.g., "2024-01-15")
  - datetime: Python datetime objects
  - pd.Timestamp: pandas Timestamp objects
  - np.datetime64: numpy datetime64

- Add window_time_unit parameter (default 's'):
  - 'ns': nanoseconds
  - 'us': microseconds
  - 'ms': milliseconds
  - 's': seconds

- Add validation: window_end must be > window_start
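A hedged sketch of what a `_convert_timestamp_to_int()`-style helper can look like: normalize mixed timestamp inputs to an integer epoch value in the requested unit. This uses pandas and may differ in detail from the helper in this PR:

```python
import numpy as np
import pandas as pd

# Epoch nanoseconds per output unit.
_UNIT_DIVISOR = {"ns": 1, "us": 1_000, "ms": 1_000_000, "s": 1_000_000_000}

def convert_timestamp_to_int(value, unit="s"):
    """Convert int/str/datetime/pd.Timestamp/np.datetime64 to int epoch `unit`s."""
    if isinstance(value, (int, np.integer)):
        return int(value)                   # already in the target unit
    ts = pd.Timestamp(value)                # parses str, datetime, np.datetime64
    return ts.value // _UNIT_DIVISOR[unit]  # Timestamp.value is epoch nanoseconds

print(convert_timestamp_to_int("1970-01-02"))  # 86400 seconds
```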

Example usage:
  result = homogeneous_uniform_temporal_neighbor_sample(
      ...
      window_start="2024-01-01",
      window_end="2024-01-02",
      window_time_unit='s',
  )

The file includes windowed_temporal_sampling_impl.hpp which uses
thrust device operations. These require NVCC compilation, not g++.

Renaming to .cu ensures proper CUDA backend dispatch for thrust.

Benchmark results after fix:
- Medium (1M vertices): 5.7ms
- Large (8M vertices): 12.9ms

Tests cover:
- Window parameters filter edges correctly
- Narrow windows return fewer edges
- Backward compatibility (no window = standard path)
- Timestamp formats: int, numpy, string, datetime, pd.Timestamp, np.datetime64
- Different time units (ns, us, ms, s)
- Validation errors

Also includes profiling script for nsys analysis.

All 13 tests passing.
@esnvidia esnvidia requested review from a team as code owners January 13, 2026 05:34


Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant