Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
fc876b7
[feat][broker] Implement topic-level delayed delivery tracking with i…
Denovo1998 Oct 30, 2025
dee4363
feat[broker] Enhance InMemoryTopicDelayedDeliveryTrackerManager with …
Denovo1998 Nov 1, 2025
e455512
[feat][broker] Improve InMemoryTopicDelayedDeliveryTrackerManager wit…
Denovo1998 Nov 1, 2025
60f6bcd
feat[broker] Add pruning mechanism and improve timestamp handling in …
Denovo1998 Nov 1, 2025
2c30c94
[feat][broker] Refactor delayedMessageMap to use Long2ObjectRBTreeMap…
Denovo1998 Nov 1, 2025
6fd6ead
[feat][broker] Refactor mark-delete handling in InMemoryTopicDelayedD…
Denovo1998 Nov 1, 2025
6784c43
feat[broker] Simplify InMemoryTopicDelayedDeliveryTrackerManager by r…
Denovo1998 Nov 1, 2025
701ffbf
feat[broker] Add testing-friendly constructor and accessors to InMemo…
Denovo1998 Nov 1, 2025
c74a879
[feat][broker] Refactor timestamp handling and bucket logic in InMemo…
Denovo1998 Nov 1, 2025
e576c65
[feat][broker] Remove deprecated methods and simplify test annotation…
Denovo1998 Nov 1, 2025
938e4f6
[feat][broker] Replace highestDeliveryTimeTracked with AtomicLong for…
Denovo1998 Nov 1, 2025
e1f2c89
[feat][broker] Introduce InMemoryTopicDelayedDeliveryTrackerFactory f…
Denovo1998 Nov 2, 2025
32f2170
[feat][broker] Add configuration options for in-memory topic-level de…
Denovo1998 Nov 2, 2025
14fc81b
[feat][broker] Replace manual wait loops with Awaitility for pruning …
Denovo1998 Nov 2, 2025
23fbfb6
[feat][broker] Update documentation for InMemoryTopicDelayedDeliveryT…
Denovo1998 Nov 2, 2025
4aada39
feat[broker] Centralize mark-delete propagation to topic-level delaye…
Denovo1998 Nov 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Clock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -40,6 +43,34 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra

private long fixedDelayDetectionLookahead;

// Cache of topic-level managers: topic name -> manager instance
private final ConcurrentMap<String, TopicDelayedDeliveryTrackerManager> topicManagers = new ConcurrentHashMap<>();

public InMemoryDelayedDeliveryTrackerFactory() {

}

// Testing-friendly constructor and accessors
@VisibleForTesting
InMemoryDelayedDeliveryTrackerFactory(Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
this.timer = timer;
this.tickTimeMillis = tickTimeMillis;
this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
}

@VisibleForTesting
int getCachedManagersSize() {
return topicManagers.size();
}

@VisibleForTesting
boolean hasManagerForTopic(String topicName) {
return topicManagers.containsKey(topicName);
}

@Override
public void initialize(PulsarService pulsarService) {
ServiceConfiguration config = pulsarService.getConfig();
Expand All @@ -54,7 +85,7 @@ public void initialize(PulsarService pulsarService) {
public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
String topicName = dispatcher.getTopic().getName();
String subscriptionName = dispatcher.getSubscription().getName();
DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
try {
tracker = newTracker0(dispatcher);
} catch (Exception e) {
Expand All @@ -66,13 +97,35 @@ public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleCon
}

@VisibleForTesting
InMemoryDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
DelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
String topicName = dispatcher.getTopic().getName();

// Get or create topic-level manager for this topic with onEmpty callback to remove from cache
final TopicDelayedDeliveryTrackerManager[] holder = new TopicDelayedDeliveryTrackerManager[1];
TopicDelayedDeliveryTrackerManager manager = topicManagers.computeIfAbsent(topicName, k -> {
InMemoryTopicDelayedDeliveryTrackerManager m = new InMemoryTopicDelayedDeliveryTrackerManager(
timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
fixedDelayDetectionLookahead, () -> topicManagers.remove(topicName, holder[0]));
holder[0] = m;
return m;
});
Comment thread
Denovo1998 marked this conversation as resolved.
Outdated
Comment thread
Denovo1998 marked this conversation as resolved.
Outdated

// Create a per-subscription view from the topic-level manager
return manager.createOrGetView(dispatcher);
}

@Override
public void close() {
// Close all topic-level managers
for (TopicDelayedDeliveryTrackerManager manager : topicManagers.values()) {
try {
manager.close();
} catch (Exception e) {
log.warn("Failed to close topic-level delayed delivery manager", e);
}
}
topicManagers.clear();

if (timer != null) {
timer.stop();
}
Expand Down
Loading
Loading