diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index b637aec71792..6cb70f88df4b 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -74,6 +74,7 @@ Guidelines for modifications: * Gary Lvov * Giulio Romualdi * Grzegorz Malczyk +* Haixuan Xavier Tao * Haoran Zhou * Harsh Patel * HoJin Jeon diff --git a/source/isaaclab/config/extension.toml b/source/isaaclab/config/extension.toml index 48c9f4d59d44..cc88c6861801 100644 --- a/source/isaaclab/config/extension.toml +++ b/source/isaaclab/config/extension.toml @@ -1,7 +1,7 @@ [package] # Note: Semantic Versioning is used: https://semver.org/ -version = "0.54.3" +version = "0.54.4" # Description title = "Isaac Lab framework for Robot Learning" diff --git a/source/isaaclab/docs/CHANGELOG.rst b/source/isaaclab/docs/CHANGELOG.rst index 4b20c566edec..32661c8f67b3 100644 --- a/source/isaaclab/docs/CHANGELOG.rst +++ b/source/isaaclab/docs/CHANGELOG.rst @@ -1,6 +1,26 @@ Changelog --------- +0.54.4 (2026-04-24) +~~~~~~~~~~~~~~~~~~~ + +Changed +^^^^^^^ + +* Removed per-call GPU→CPU synchronizations from + :class:`~isaaclab.utils.buffers.CircularBuffer` by replacing the + ``torch.any(...)`` probes in ``append`` and ``__getitem__`` with a + CPU-side flag maintained by :meth:`reset`. Also removed a redundant + ``.clone()`` from :meth:`~isaaclab.utils.buffers.DelayBuffer.compute` + (the underlying advanced-indexing gather already allocates fresh + storage). Public API and first-push replication semantics are + unchanged; on CUDA at large ``num_envs`` this yields a meaningful + speedup for consumers that call the delay buffer every physics step + (e.g. :class:`~isaaclab.actuators.DelayedPDActuator`, + :class:`~isaaclab.actuators.RemotizedPDActuator`, and observation + history buffers). + + 0.54.3 (2026-02-04) ~~~~~~~~~~~~~~~~~~~ diff --git a/source/isaaclab/isaaclab/utils/buffers/circular_buffer.py b/source/isaaclab/isaaclab/utils/buffers/circular_buffer.py index c5c9fe9ff6ad..8dbec5700964 100644 --- a/source/isaaclab/isaaclab/utils/buffers/circular_buffer.py +++ b/source/isaaclab/isaaclab/utils/buffers/circular_buffer.py @@ -47,6 +47,13 @@ def __init__(self, max_len: int, batch_size: int, device: str): # the actual buffer for data storage # note: this is initialized on the first call to :meth:`append` self._buffer: torch.Tensor = None # type: ignore + # CPU-side flag that mirrors ``any(self._num_pushes == 0)`` without + # requiring a GPU→CPU synchronization in the hot path. It flips True + # whenever :meth:`reset` marks any batch index for first-push + # replication, and clears when the next :meth:`append` performs that + # replication. Reads in :meth:`__getitem__` use this to raise the same + # "buffer empty" error as before but without a per-call sync. + self._any_first_push_pending: bool = False """ Properties. @@ -99,6 +106,12 @@ def reset(self, batch_ids: Sequence[int] | None = None): Args: batch_ids: Elements to reset in the batch dimension. Default is None, which resets all the batch indices. """ + # A zero-length ``batch_ids`` is a no-op: nothing to zero, and raising + # ``_any_first_push_pending`` would spuriously block the next read + # (the previous ``torch.any(num_pushes == 0)`` guard was immune to + # this because it probed the actual tensor state). + if batch_ids is not None and len(batch_ids) == 0: + return # resolve all indices if batch_ids is None: batch_ids = slice(None) @@ -108,6 +121,10 @@ def reset(self, batch_ids: Sequence[int] | None = None): # set buffer at batch_id reset indices to 0.0 so that the buffer() # getter returns the cleared circular buffer after reset. self._buffer[:, batch_ids, :] = 0.0 + # mark that at least one batch index now has ``num_pushes == 0`` so + # the next :meth:`append` performs the first-push history replication + # and :meth:`__getitem__` rejects reads until that append happens. + self._any_first_push_pending = True def append(self, data: torch.Tensor): """Append the data to the circular buffer. @@ -129,14 +146,28 @@ def append(self, data: torch.Tensor): if self._buffer is None: self._pointer = -1 self._buffer = torch.empty((self.max_length, *data.shape), dtype=data.dtype, device=self._device) + # the buffer was just created, so every batch index starts with + # ``num_pushes == 0`` and must be replicated on this first append + self._any_first_push_pending = True # move the head to the next slot self._pointer = (self._pointer + 1) % self.max_length # add the new data to the last layer self._buffer[self._pointer] = data - # Check for batches with zero pushes and initialize all values in batch to first append - is_first_push = self._num_pushes == 0 - if torch.any(is_first_push): - self._buffer[:, is_first_push] = data[is_first_push] + # Check for batches with zero pushes and initialize all values in + # batch to first append. The CPU flag ``_any_first_push_pending`` + # mirrors ``torch.any(num_pushes == 0)`` but is maintained by + # :meth:`reset` and cleared here, so we avoid a GPU→CPU sync every + # append in the common case where no batch just reset. + if self._any_first_push_pending: + is_first_push = self._num_pushes == 0 + # Broadcast-safe write that works for arbitrary trailing data + # shape. Equivalent to ``self._buffer[:, is_first_push] = + # data[is_first_push]`` but without materializing the dynamic + # boolean index (which would reintroduce a sync on some torch + # versions via shape inference). + mask = is_first_push.view(1, -1, *([1] * (data.ndim - 1))) + self._buffer = torch.where(mask, data.unsqueeze(0), self._buffer) + self._any_first_push_pending = False # increment number of number of pushes for all batches self._num_pushes += 1 @@ -160,8 +191,11 @@ def __getitem__(self, key: torch.Tensor) -> torch.Tensor: # check the batch size if len(key) != self.batch_size: raise ValueError(f"The argument 'key' has length {key.shape[0]}, while expecting {self.batch_size}") - # check if the buffer is empty - if torch.any(self._num_pushes == 0) or self._buffer is None: + # check if the buffer is empty — equivalent to + # ``torch.any(self._num_pushes == 0)`` but sync-free: the CPU flag + # flips True in :meth:`reset` (or on buffer construction) and back to + # False when :meth:`append` has filled every reset index's history. + if self._any_first_push_pending or self._buffer is None: raise RuntimeError("Attempting to retrieve data on an empty circular buffer. Please append data first.") # admissible lag diff --git a/source/isaaclab/isaaclab/utils/buffers/delay_buffer.py b/source/isaaclab/isaaclab/utils/buffers/delay_buffer.py index dd1a1ef72684..66fe38bd6e10 100644 --- a/source/isaaclab/isaaclab/utils/buffers/delay_buffer.py +++ b/source/isaaclab/isaaclab/utils/buffers/delay_buffer.py @@ -173,6 +173,9 @@ def compute(self, data: torch.Tensor) -> torch.Tensor: """ # add the new data to the last layer self._circular_buffer.append(data) - # return output - delayed_data = self._circular_buffer[self._time_lags] - return delayed_data.clone() + # ``CircularBuffer.__getitem__`` uses advanced indexing + # (``self._buffer[index_in_buffer, self._ALL_INDICES]``), which + # already allocates a fresh storage. Returning the gather result + # directly is safe — consumers that mutate it in place won't touch + # the internal buffer — and skips one ``(batch, *feat)`` copy per call. + return self._circular_buffer[self._time_lags] diff --git a/source/isaaclab/test/utils/test_circular_buffer.py b/source/isaaclab/test/utils/test_circular_buffer.py index 52a2c16829d8..286aca97e3d1 100644 --- a/source/isaaclab/test/utils/test_circular_buffer.py +++ b/source/isaaclab/test/utils/test_circular_buffer.py @@ -154,6 +154,76 @@ def test_key_greater_than_pushes(circular_buffer): assert torch.equal(retrieved_data, data1) +def test_empty_batch_ids_reset_is_noop(circular_buffer): + """``reset(batch_ids=[])`` must leave the buffer readable. + + Regression test for the CPU-flag path: a zero-length ``batch_ids`` must + not flip ``_any_first_push_pending`` (nothing was actually zeroed). The + previous ``torch.any(num_pushes == 0)`` probe was immune because it read + the real tensor state; the flag-based guard must match that behavior to + avoid spuriously raising on callers that pass empty done-env lists. + """ + data = torch.ones((circular_buffer.batch_size, 2), device=circular_buffer.device) + circular_buffer.append(data) + circular_buffer.append(data) + circular_buffer.reset(batch_ids=[]) + # Must not raise — all batch indices still have num_pushes > 0. + retrieved = circular_buffer[torch.tensor([0, 0, 0], device=circular_buffer.device)] + torch.testing.assert_close(retrieved, data) + # Also accept a zero-length tensor (common when built from a termination mask). + circular_buffer.reset(batch_ids=torch.tensor([], dtype=torch.long, device=circular_buffer.device)) + retrieved = circular_buffer[torch.tensor([0, 0, 0], device=circular_buffer.device)] + torch.testing.assert_close(retrieved, data) + + +def test_partial_reset_then_read_raises(circular_buffer): + """``__getitem__`` must still raise after a partial reset without a follow-up append. + + Guards the CPU-side ``_any_first_push_pending`` flag that replaces the + previous per-call ``torch.any(num_pushes == 0)`` probe: the flag must be + set by :meth:`reset` and only cleared once :meth:`append` replicates the + first push, otherwise a reader could silently observe uninitialised slots. + """ + data = torch.ones((circular_buffer.batch_size, 2), device=circular_buffer.device) + circular_buffer.append(data) + circular_buffer.append(data) + circular_buffer.reset(batch_ids=[0]) + with pytest.raises(RuntimeError): + circular_buffer[torch.tensor([0, 0, 0], device=circular_buffer.device)] + + +def test_interleaved_partial_reset_and_append(circular_buffer): + """Cycle several partial resets + appends and verify each reset env's + history is fully replicated with its first post-reset sample (first-push + invariant) while untouched envs keep their real history.""" + d1 = torch.tensor([[1, 1], [2, 2], [3, 3]], device=circular_buffer.device) + d2 = 10 * d1 + d3 = 100 * d1 + + circular_buffer.append(d1) + circular_buffer.append(d2) + + # Partial reset env 0 then append d3. Env 0's entire history should now + # be d3[0]; envs 1, 2 should have history including d1, d2, d3. + circular_buffer.reset(batch_ids=[0]) + circular_buffer.append(d3) + for i in range(circular_buffer.max_length): + torch.testing.assert_close(circular_buffer.buffer[0, 0], circular_buffer.buffer[0, i]) + torch.testing.assert_close(circular_buffer.buffer[1, -1], d3[1]) + torch.testing.assert_close(circular_buffer.buffer[1, -2], d2[1]) + + # Now partial reset env 1 and append d1 again. Env 1's history should + # now be d1[1] everywhere; env 0 still keeps its d3 history (plus the new + # d1 append at the head), env 2 shifts normally. + circular_buffer.reset(batch_ids=[1]) + circular_buffer.append(d1) + for i in range(circular_buffer.max_length): + torch.testing.assert_close(circular_buffer.buffer[1, 0], circular_buffer.buffer[1, i]) + torch.testing.assert_close(circular_buffer.buffer[1, -1], d1[1]) + torch.testing.assert_close(circular_buffer.buffer[0, -1], d1[0]) + torch.testing.assert_close(circular_buffer.buffer[2, -1], d1[2]) + + def test_return_buffer_prop(circular_buffer): """Test retrieving the whole buffer for correct size and contents. Returning the whole buffer should have the shape [batch_size,max_len,data.shape[1:]]