159 changes: 159 additions & 0 deletions pip/pip-448.md
# PIP-448: Topic-level Delayed Message Tracker for Memory Optimization

# Background knowledge

In Apache Pulsar, **Delayed Message Delivery** allows producers to specify a delay for a message, ensuring it is not delivered to any consumer until the specified time has passed. This is a useful feature for implementing tasks like scheduled reminders or retry mechanisms with backoff.

Currently, the default mechanism for handling delayed messages is the **In-Memory Delayed Delivery Tracker** (`InMemoryDelayedDeliveryTracker`). This tracker is instantiated on a *per-subscription* basis within the broker. When a topic has multiple subscriptions (e.g., several subscriptions of the Shared type), each subscription gets its own independent `InMemoryDelayedDeliveryTracker` instance.

The consequence of this design is that if a delayed message is published to a topic with 'N' subscriptions, that message's metadata (its position: ledgerId and entryId) is stored 'N' times in the broker's memory, once for each subscription's tracker. This leads to significant memory overhead, especially for topics with a large number of subscriptions, as the memory usage scales linearly with the number of subscriptions.

The **`DelayedDeliveryTrackerFactory`** is responsible for creating these tracker instances whenever a new subscription dispatcher is initialized.

# Motivation

The primary motivation for this proposal is to address the high memory consumption caused by the current per-subscription delayed message tracking mechanism. For topics with hundreds or thousands of subscriptions, the memory footprint for delayed messages becomes prohibitively large. Each delayed message's position is duplicated across every subscription's tracker, leading to a memory usage pattern of `O(num_delayed_messages * num_subscriptions)`.

This excessive memory usage can cause:
* Increased memory pressure on Pulsar brokers.
* More frequent and longer Garbage Collection (GC) pauses, impacting broker performance.
* Potential OutOfMemoryErrors, leading to broker instability.
* Limited scalability for use cases that rely on many subscriptions per topic, such as IoT or large-scale microservices with shared subscriptions.

By optimizing the delayed message tracking to be more memory-efficient, we can enhance broker stability and scalability, allowing Pulsar to better support these critical use cases.

# Goals

## In Scope
* Introduce a topic-level delayed message index that is shared across all subscriptions of a single topic. This will store each delayed message's position only once.
* Significantly reduce the memory footprint for delayed message handling, changing the memory complexity from `O(num_delayed_messages * num_subscriptions)` to `O(num_delayed_messages)`.
* Maintain the existing `DelayedDeliveryTracker` interface to ensure seamless integration with the existing dispatcher logic, requiring no changes to the dispatcher's core message delivery flow.
* Make this new topic-level tracker the default in-memory implementation, replacing the legacy per-subscription tracker.

## Out of Scope
* This proposal does not modify the persistent, bucket-based delayed delivery tracker (`BucketDelayedDeliveryTracker`). The scope is limited to the in-memory implementation.
* No changes will be made to the public-facing client APIs, REST APIs, or the wire protocol for producing or consuming delayed messages. This is a broker-internal optimization.
* This proposal does not modify the semantics of delayed message delivery. The user-facing behavior will remain identical.

# High Level Design

The core idea of this proposal is to shift from a per-subscription delayed message tracker to a shared, topic-level tracker. This will be achieved by introducing two new components: a `TopicDelayedDeliveryTrackerManager` interface, implemented by `InMemoryTopicDelayedDeliveryTrackerManager`, and an `InMemoryTopicDelayedDeliveryTrackerView`.

1. **Shared Topic-Level Manager**: For each topic, a single `InMemoryTopicDelayedDeliveryTrackerManager` instance will be created. This manager will own and maintain a global index of all delayed messages for that topic. The index will store each message's position just once, keyed by its delivery timestamp.

2. **Per-Subscription View**: The dispatcher for each subscription will no longer get a full, independent tracker. Instead, it will receive an `InMemoryTopicDelayedDeliveryTrackerView` object. This view implements the `DelayedDeliveryTracker` interface but acts as a lightweight adapter or proxy to the shared `TopicDelayedDeliveryTrackerManager`. It maintains per-subscription state, such as the `markDeletePosition`, but all core operations (adding messages, retrieving scheduled messages) are delegated to the shared manager.

3. **Factory and Lifecycle Management**: The `InMemoryDelayedDeliveryTrackerFactory` will be updated to manage the lifecycle of these new topic-level managers. It will maintain a cache of managers keyed by topic name. When a tracker is requested for a subscription:
    * If a manager for that topic already exists, it is retrieved from the cache.
    * If not, a new manager is created and stored in the cache.
    * The manager then creates a new `View` object for the requesting subscription.
    * When a subscription is closed, it unregisters itself from the manager. When the last subscription for a topic is closed, the manager cleans up its resources and is removed from the factory's cache.

This architectural change can be described as follows:

* **Before this change:** For a single topic, each subscription (e.g., Sub1, Sub2, Sub3) maintained its own complete `InMemoryDelayedDeliveryTracker` instance. If a delayed message was sent to the topic, its metadata would be stored independently in the tracker for Sub1, the tracker for Sub2, and the tracker for Sub3, causing data duplication.

* **After this change:** For a single topic, there is only one central `InMemoryTopicDelayedDeliveryTrackerManager` instance that holds a shared index of all delayed messages. Each subscription (Sub1, Sub2, Sub3) receives a lightweight `View` object. All these views point to and interact with the single, shared manager, eliminating data duplication.
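The before/after comparison can be made concrete by counting stored positions. The following is a minimal sketch, not code from the proposal: the class name `DelayedIndexFootprint` and both helper methods are hypothetical, and a position is encoded as a plain `[ledgerId, entryId]` list purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: counts how many position entries each layout keeps in memory. */
class DelayedIndexFootprint {

    /** Per-subscription layout: every subscription's tracker keeps its own copy. */
    static long perSubscriptionEntries(List<List<Long>> delayedPositions, int subscriptions) {
        List<Set<List<Long>>> trackers = new ArrayList<>();
        for (int i = 0; i < subscriptions; i++) {
            trackers.add(new HashSet<>(delayedPositions));
        }
        long total = 0;
        for (Set<List<Long>> tracker : trackers) {
            total += tracker.size();
        }
        return total;
    }

    /** Topic-level layout: one shared index, whatever the subscription count. */
    static long sharedEntries(List<List<Long>> delayedPositions, int subscriptions) {
        Set<List<Long>> sharedIndex = new HashSet<>();
        for (int i = 0; i < subscriptions; i++) {
            sharedIndex.addAll(delayedPositions); // re-adding a known position is a no-op
        }
        return sharedIndex.size();
    }

    public static void main(String[] args) {
        // Three delayed messages, each encoded as [ledgerId, entryId] (hypothetical values).
        List<List<Long>> delayed = Arrays.asList(
                Arrays.asList(1L, 10L), Arrays.asList(1L, 11L), Arrays.asList(2L, 0L));
        System.out.println(perSubscriptionEntries(delayed, 4)); // prints 12
        System.out.println(sharedEntries(delayed, 4));          // prints 3
    }
}
```

With 3 delayed messages and 4 subscriptions, the per-subscription layout holds 12 entries while the shared layout holds 3, matching the `O(num_delayed_messages * num_subscriptions)` versus `O(num_delayed_messages)` figures given in the goals.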

The manager will handle pruning of acknowledged messages from the shared index by tracking the `markDeletePosition` of all active subscriptions and only removing messages that have been acknowledged by *all* of them (i.e., cleaning up to the minimum `markDeletePosition`).
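The pruning rule can be sketched as follows. This is an assumption-laden illustration rather than the proposed implementation: `SharedIndexPruning`, `Pos`, `minMarkDelete`, and `prune` are hypothetical names, and a `TreeMap` of `TreeSet`s stands in for whatever index structure the manager actually uses.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

class SharedIndexPruning {

    /** Hypothetical position type, ordered by ledgerId and then entryId. */
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    /** Returns the smallest markDeletePosition, or null when no subscription is registered. */
    static Pos minMarkDelete(Collection<Pos> markDeletePositions) {
        Pos min = null;
        for (Pos p : markDeletePositions) {
            if (min == null || p.compareTo(min) < 0) {
                min = p;
            }
        }
        return min;
    }

    /** Removes every indexed position <= the minimum markDeletePosition; returns the count removed. */
    static int prune(TreeMap<Long, TreeSet<Pos>> index, Collection<Pos> markDeletePositions) {
        Pos min = minMarkDelete(markDeletePositions);
        if (min == null) {
            return 0; // nothing can be proven acknowledged by all subscriptions
        }
        int removed = 0;
        Iterator<Map.Entry<Long, TreeSet<Pos>>> it = index.entrySet().iterator();
        while (it.hasNext()) {
            TreeSet<Pos> bucket = it.next().getValue();
            // headSet(min, true) is a live view of the positions <= min in this bucket.
            NavigableSet<Pos> acknowledgedByAll = bucket.headSet(min, true);
            removed += acknowledgedByAll.size();
            acknowledgedByAll.clear();
            if (bucket.isEmpty()) {
                it.remove(); // drop timestamp buckets that no longer hold positions
            }
        }
        return removed;
    }
}
```

Because the slowest subscription determines the minimum, a single lagging subscription keeps entries in the shared index until it catches up.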

# Detailed Design

## Design & Implementation Details

### `TopicDelayedDeliveryTrackerManager` (New Interface)
This new interface defines the contract for a topic-level delayed delivery manager.
```java
public interface TopicDelayedDeliveryTrackerManager extends AutoCloseable {
    // Creates (or returns the existing) subscription-specific view
    DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher);

    // Unregisters a subscription
    void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher);

    // Updates tick time
    void onTickTimeUpdated(long newTickTimeMillis);

    // Topic-level stats
    long topicBufferMemoryBytes();

    long topicDelayedMessages();
}
```

### `InMemoryTopicDelayedDeliveryTrackerManager` (New Class)
This is the main implementation of the topic-level manager.
* **Data Structure**: The core index is a `ConcurrentSkipListMap<Long, Long2ObjectRBTreeMap<Roaring64Bitmap>>`. This structure maps a delivery timestamp to a map of `ledgerId -> Roaring64Bitmap` of `entryId`s. This is highly efficient for storing and querying message positions.
* **Subscription Context (`SubContext`)**: A nested static class holds per-subscription state, primarily the `dispatcher` reference and the latest `markDeletePosition`. The manager maintains a `ConcurrentHashMap<String, SubContext>` to track all active subscriptions.
* **Message Addition**: When a message is added via a subscription's `View`, it is inserted into the shared index. Duplicates (the same message added by different subscriptions) are inherently handled by the `Roaring64Bitmap`, which acts as a set.
* **Message Retrieval**: When a `View` requests scheduled messages, the manager queries the shared index for messages ready for delivery. It then filters these results against that specific subscription's `markDeletePosition` to ensure it only returns messages that have not yet been acknowledged by that subscription.
* **Pruning**: The manager periodically prunes the index. It calculates the minimum `markDeletePosition` across all registered subscriptions. Any message with a position less than or equal to this minimum position is safe to remove from the shared index.
* **Lifecycle**: The manager is created by the factory for the first subscription on a topic. When the last subscription is closed, the manager releases its resources and invokes the `onEmptyCallback` supplied by the factory, which removes it from the factory's cache.
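The index layout, deduplicating insert, and per-subscription filtering described above can be sketched with JDK collections only. This is an approximation under stated assumptions: `TreeMap` stands in for `Long2ObjectRBTreeMap`, `TreeSet<Long>` stands in for `Roaring64Bitmap`, and the class and method names (`SharedDelayedIndexSketch`, `add`, `scheduledFor`, `size`) are hypothetical. Coarse `synchronized` methods are used for simplicity, and the sketch does not model how delivered but unacknowledged entries are tracked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;

class SharedDelayedIndexSketch {

    // deliverAt timestamp -> ledgerId -> entryIds (stand-ins for the fastutil/Roaring types)
    private final ConcurrentSkipListMap<Long, TreeMap<Long, TreeSet<Long>>> index =
            new ConcurrentSkipListMap<>();

    /** Adds a position; returns false if another subscription already indexed it. */
    synchronized boolean add(long deliverAt, long ledgerId, long entryId) {
        return index.computeIfAbsent(deliverAt, k -> new TreeMap<>())
                .computeIfAbsent(ledgerId, k -> new TreeSet<>())
                .add(entryId);
    }

    /**
     * Returns [ledgerId, entryId] pairs due at or before 'now' that lie strictly
     * after the calling subscription's markDeletePosition.
     */
    synchronized List<long[]> scheduledFor(long now, long markDeleteLedgerId, long markDeleteEntryId) {
        List<long[]> due = new ArrayList<>();
        for (TreeMap<Long, TreeSet<Long>> byLedger : index.headMap(now, true).values()) {
            for (Map.Entry<Long, TreeSet<Long>> e : byLedger.entrySet()) {
                long ledgerId = e.getKey();
                if (ledgerId < markDeleteLedgerId) {
                    continue; // whole ledger already acknowledged by this subscription
                }
                for (long entryId : e.getValue()) {
                    if (ledgerId == markDeleteLedgerId && entryId <= markDeleteEntryId) {
                        continue; // entry at or before the markDeletePosition
                    }
                    due.add(new long[] {ledgerId, entryId});
                }
            }
        }
        return due;
    }

    /** Total number of distinct positions held in the shared index. */
    synchronized long size() {
        long total = 0;
        for (TreeMap<Long, TreeSet<Long>> byLedger : index.values()) {
            for (TreeSet<Long> entries : byLedger.values()) {
                total += entries.size();
            }
        }
        return total;
    }
}
```

Two subscriptions calling `add` with the same timestamp and position leave a single entry in the index, while `scheduledFor` can return different results to each of them depending on their own `markDeletePosition`.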

### `InMemoryTopicDelayedDeliveryTrackerView` (New Class)
This class implements the `DelayedDeliveryTracker` interface, making it compatible with the dispatcher.
* **Role**: It acts as a lightweight proxy, holding a reference to the shared `InMemoryTopicDelayedDeliveryTrackerManager` and its own `SubContext`.
* **Operations**: All `DelayedDeliveryTracker` method calls (e.g., `addMessage`, `getScheduledMessages`) are forwarded to the shared manager, passing along its `SubContext` so the manager can perform operations in the correct per-subscription context.
* **`updateMarkDeletePosition`**: A new method is added to the view, called by the dispatcher when the cursor moves. This provides an efficient, event-driven way to keep the manager's `SubContext` up-to-date with the latest `markDeletePosition`.
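The proxy relationship between a view and the shared manager might look roughly like the sketch below. It is deliberately simplified and all names are assumptions: the real view implements `DelayedDeliveryTracker` and works with dispatcher and position objects, whereas here `Manager`, `View`, and `SubContext` are hypothetical nested classes, subscriptions are identified by name, and a position is reduced to a single `long`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ViewDelegationSketch {

    /** Per-subscription state; the position is simplified to a single long. */
    static final class SubContext {
        final String subscriptionName;
        volatile long markDeletePosition = -1L;

        SubContext(String subscriptionName) {
            this.subscriptionName = subscriptionName;
        }
    }

    /** Hypothetical stand-in for the shared topic-level manager. */
    static final class Manager {
        private final Map<String, View> views = new ConcurrentHashMap<>();
        final AtomicInteger addCalls = new AtomicInteger();

        /** Returns the existing view for a subscription, or creates one on first use. */
        View createOrGetView(String subscriptionName) {
            return views.computeIfAbsent(
                    subscriptionName, name -> new View(this, new SubContext(name)));
        }

        boolean addMessage(SubContext ctx, long position, long deliverAt) {
            addCalls.incrementAndGet(); // the shared index update is omitted in this sketch
            return true;
        }

        void updateMarkDeletePosition(SubContext ctx, long position) {
            ctx.markDeletePosition = position;
        }
    }

    /** Lightweight proxy: holds no index of its own and forwards every call. */
    static final class View {
        private final Manager manager;
        private final SubContext ctx;

        View(Manager manager, SubContext ctx) {
            this.manager = manager;
            this.ctx = ctx;
        }

        boolean addMessage(long position, long deliverAt) {
            return manager.addMessage(ctx, position, deliverAt);
        }

        void updateMarkDeletePosition(long position) {
            manager.updateMarkDeletePosition(ctx, position);
        }

        long markDeletePosition() {
            return ctx.markDeletePosition;
        }
    }
}
```

Each view carries only two references, so its memory cost is independent of the number of delayed messages on the topic.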

### `InMemoryDelayedDeliveryTrackerFactory` (Modified)
The factory's logic is changed to manage the lifecycle of topic-level managers.
* **Cache**: It maintains a `ConcurrentMap<String, TopicDelayedDeliveryTrackerManager> topicManagers`.
* **`newTracker` method**:
    1. It uses `topicManagers.computeIfAbsent()` to atomically get or create an `InMemoryTopicDelayedDeliveryTrackerManager` for the given topic.
    2. The `onEmptyCallback` provided during creation ensures that when the manager becomes empty (last subscription closes), it is removed from the `topicManagers` cache.
    3. It then calls `manager.createOrGetView(dispatcher)` to get a subscription-specific view.
    4. It returns this view to the dispatcher.
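The get-or-create and remove-on-empty flow of the factory can be sketched as below. This is a hedged illustration: `TrackerFactorySketch`, `TopicManager`, `register`, and `cachedManagers` are hypothetical names, subscriptions are plain strings, and the sketch ignores the race in which a new subscription registers while the last one is unregistering, which a real implementation would need to handle.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class TrackerFactorySketch {

    /** Hypothetical stand-in for the topic-level manager. */
    static final class TopicManager {
        private final Set<String> subscriptions = ConcurrentHashMap.newKeySet();
        private final Runnable onEmptyCallback;

        TopicManager(Runnable onEmptyCallback) {
            this.onEmptyCallback = onEmptyCallback;
        }

        void register(String subscription) {
            subscriptions.add(subscription);
        }

        void unregister(String subscription) {
            subscriptions.remove(subscription);
            if (subscriptions.isEmpty()) {
                onEmptyCallback.run(); // last subscription gone: ask the factory to evict us
            }
        }
    }

    private final ConcurrentMap<String, TopicManager> topicManagers = new ConcurrentHashMap<>();

    /** Gets or creates the manager for a topic and registers the subscription with it. */
    TopicManager newTracker(String topic, String subscription) {
        TopicManager manager = topicManagers.computeIfAbsent(
                topic, t -> new TopicManager(() -> topicManagers.remove(t)));
        manager.register(subscription);
        return manager;
    }

    int cachedManagers() {
        return topicManagers.size();
    }
}
```

Passing the eviction logic as a callback keeps the manager unaware of the factory's cache, which is one plausible reason for the `onEmptyCallback` design.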

### `PersistentDispatcherMultipleConsumers` (Modified)
A new method is introduced to enable event-driven updates.
* **`markDeletePositionMoveForward()`**: This new method is called by the cursor when its `markDeletePosition` advances. Inside this method, it checks if the active tracker is an `InMemoryTopicDelayedDeliveryTrackerView` and, if so, calls `view.updateMarkDeletePosition()`. This pushes the latest `markDeletePosition` to the tracker view immediately, allowing for more timely pruning decisions in the shared manager.
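The type check inside the hook can be illustrated with a small sketch. Every type here is a hypothetical stand-in (`Tracker`, `LegacyTracker`, `TopicView`, `Dispatcher`), and the position is again reduced to a `long`; only the method names `markDeletePositionMoveForward` and `updateMarkDeletePosition` come from the proposal.

```java
class DispatcherHookSketch {

    /** Stand-in for the DelayedDeliveryTracker interface. */
    interface Tracker {}

    /** Stand-in for a tracker type that has no mark-delete hook. */
    static final class LegacyTracker implements Tracker {}

    /** Stand-in for the topic-level view, which accepts mark-delete updates. */
    static final class TopicView implements Tracker {
        long lastMarkDelete = -1L;

        void updateMarkDeletePosition(long position) {
            lastMarkDelete = position;
        }
    }

    static final class Dispatcher {
        private final Tracker tracker;

        Dispatcher(Tracker tracker) {
            this.tracker = tracker;
        }

        /** Called when the cursor's markDeletePosition advances. */
        void markDeletePositionMoveForward(long newPosition) {
            // Only the topic-level view needs the update; other trackers are left untouched.
            if (tracker instanceof TopicView) {
                ((TopicView) tracker).updateMarkDeletePosition(newPosition);
            }
        }
    }
}
```

The `instanceof` guard means dispatchers configured with a different tracker implementation, such as the bucket-based one, are unaffected by the new hook.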

## Public-facing Changes

This proposal involves a broker-internal optimization. There are **no changes** to any public-facing components.

### Public API
No changes.

### Binary protocol
No changes.

### Configuration
No changes.

### CLI
No changes.

### Metrics
No changes. Existing metrics related to delayed messages will continue to function, but their values at the subscription level will now be derived from the shared topic-level manager.

# Monitoring

While no new metrics are introduced, the effects of this change can be monitored through existing broker metrics:
* **JVM Heap Memory**: Monitor the broker's JVM heap usage (`jvm_memory_bytes_used{area="heap"}`). For brokers hosting topics with many subscriptions and delayed messages, a significant reduction in heap memory usage and a more stable memory footprint should be observed.
* **Garbage Collection**: Monitor JVM GC metrics (`jvm_gc_collection_seconds_count`, `jvm_gc_collection_seconds_sum`). A reduction in memory pressure should lead to fewer and shorter GC pauses, improving overall broker stability and performance.
* Users can compare memory usage before and after upgrading to a version with this change to quantify the improvement.

# Security Considerations

This proposal refactors an internal component of the broker and does not introduce any new public APIs, endpoints, or protocol commands. The security model remains unchanged.

The design ensures data isolation between subscriptions. When a subscription requests scheduled messages, the `InMemoryTopicDelayedDeliveryTrackerManager` filters the results based on that specific subscription's `markDeletePosition`. This prevents one subscription from accessing or being affected by the acknowledgment state of another. Therefore, multi-tenancy guarantees are preserved, and the change does not introduce any new security risks.

# Backward & Forward Compatibility

## Upgrade
The change is fully backward compatible. Upgrading a broker to a version containing this feature requires no special steps. Upon restart, the `InMemoryDelayedDeliveryTrackerFactory` will begin creating the new topic-level managers instead of the old per-subscription trackers. This is a drop-in replacement.

## Downgrade / Rollback
A downgrade is seamless. The change does not alter any on-disk data formats or persistent metadata. If a broker is rolled back to a previous version, it will simply revert to using the original `InMemoryDelayedDeliveryTracker` on a per-subscription basis.

## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
Delayed message delivery is a broker-local feature that does not directly interact with the geo-replication mechanism. Therefore, this change has no impact on geo-replication, and no special considerations are needed for upgrade or downgrade in a replicated environment.

# Alternatives

# General Notes
This implementation effectively replaces the legacy `InMemoryDelayedDeliveryTracker` as the default in-memory strategy. The factory no longer instantiates the old class. This provides a significant performance and scalability improvement for a common Pulsar use case.

# Links

* Mailing List discussion thread:
* Mailing List voting thread: