Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
7afe161
Implementing delayed message cancellation in pulsar
Denovo1998 Jan 29, 2025
88ffdf9
feat(broker): Add support for canceling delayed messages in DelayedDe…
Denovo1998 Feb 8, 2025
212f174
add cancelDelayedMessage admin api.
Denovo1998 May 31, 2025
8686255
Merge branch 'master' into delay_msg_cancel
Denovo1998 May 31, 2025
6fe2b11
the cancel command does not need to be added to the sharedBucketPrior…
Denovo1998 Jun 1, 2025
dc079a6
fix test.
Denovo1998 Jun 1, 2025
f66f1b5
clean up useless canceledMessages.
Denovo1998 Jun 1, 2025
391a426
Implement the delayed message cancellation function through acknowled…
Denovo1998 Jun 8, 2025
d7f025c
feat(admin): add skipMessages by message IDs and remove cancelDelayed…
Denovo1998 Jul 17, 2025
5f170cc
use skipByMessageIds as the new path
Denovo1998 Jul 31, 2025
bce6ae3
Merge branch 'master' into delay_msg_cancel
Denovo1998 Nov 6, 2025
347e52f
feat: enhance skipMessages functionality to support batch indices
Denovo1998 Nov 9, 2025
a4911e5
feat: implement SkipMessageIdsRequest to support multiple formats for…
Denovo1998 Nov 9, 2025
2e77580
feat: refactor skipMessages to accept List<SkipEntry> and update rela…
Denovo1998 Nov 9, 2025
e7a988a
feat: add SkipEntry model for skipping messages with optional batch i…
Denovo1998 Nov 9, 2025
4613789
fix: update subscription description for clarity and add constructor …
Denovo1998 Nov 9, 2025
275a3cb
fix: remove unnecessary whitespace in SkipEntry.java
Denovo1998 Nov 9, 2025
a88ace7
Merge branch 'master' into delay_msg_cancel
Denovo1998 Feb 19, 2026
b5efe78
feat(broker): Support skipping messages by ID on partitioned topics
Denovo1998 Feb 19, 2026
fd17f4d
refactor(broker): Improve skipMessages API and test coverage
Denovo1998 Feb 24, 2026
c7df916
test(service): Remove unused import from BucketDelayedDeliveryTest
Denovo1998 Feb 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import lombok.Getter;
import org.apache.pulsar.common.api.proto.MessageIdData;

/**
* Server-side request body for skipping messages by message IDs with support for multiple formats.
*/
@Getter
@JsonDeserialize(using = SkipMessageIdsRequest.Deserializer.class)
public class SkipMessageIdsRequest {
private final List<MessageIdItem> items = new ArrayList<>();

public SkipMessageIdsRequest() { }

private void addItem(long ledgerId, long entryId, Integer batchIndex) {
items.add(new MessageIdItem(ledgerId, entryId, batchIndex));
}

public record MessageIdItem(long ledgerId, long entryId, Integer batchIndex) {
public long getLedgerId() {
return ledgerId;
}

public long getEntryId() {
return entryId;
}

public Integer getBatchIndex() {
return batchIndex;
}
Comment on lines +52 to +62

Copilot AI Nov 9, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getter methods (getLedgerId, getEntryId, getBatchIndex) are redundant for a Java record, as records automatically generate these methods. These explicit definitions can be removed.

Suggested change
public long getLedgerId() {
return ledgerId;
}
public long getEntryId() {
return entryId;
}
public Integer getBatchIndex() {
return batchIndex;
}

Copilot uses AI. Check for mistakes.
}

public static class Deserializer extends JsonDeserializer<SkipMessageIdsRequest> {
@Override
public SkipMessageIdsRequest deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
ObjectCodec codec = p.getCodec();
JsonNode node = codec.readTree(p);
SkipMessageIdsRequest r = new SkipMessageIdsRequest();

if (node == null || node.isNull()) {
throw new IOException("Invalid skipByMessageIds payload: empty body");
}

if (node.isArray()) {
// Treat as default byteArray list
ArrayNode arr = (ArrayNode) node;
for (JsonNode idNode : arr) {
if (idNode != null && !idNode.isNull()) {
appendFromBase64(idNode.asText(), r);
}
}
return r;
}

if (node.isObject()) {
ObjectNode obj = (ObjectNode) node;
JsonNode typeNode = obj.get("type");
String type = typeNode != null && !typeNode.isNull() ? typeNode.asText() : null;
JsonNode messageIdsNode = obj.get("messageIds");

if (messageIdsNode != null) {
if (messageIdsNode.isArray()) {
ArrayNode arr = (ArrayNode) messageIdsNode;
if (type == null || type.isEmpty() || "byteArray".equalsIgnoreCase(type)) {
for (JsonNode idNode : arr) {
if (idNode != null && !idNode.isNull()) {
appendFromBase64(idNode.asText(), r);
}
}
} else if ("messageId".equalsIgnoreCase(type)) {
for (JsonNode idObj : arr) {
if (idObj == null || idObj.isNull()) {
continue;
}
long ledgerId = optLong(idObj.get("ledgerId"));
long entryId = optLong(idObj.get("entryId"));
int batchIndex = optInt(idObj.get("batchIndex"), -1);
if (batchIndex >= 0) {
r.addItem(ledgerId, entryId, batchIndex);
} else {
r.addItem(ledgerId, entryId, null);
}
}
} else {
// Unknown type with array payload => reject
throw new IOException("Invalid skipByMessageIds payload: unsupported type for array");
}
return r;
} else if (messageIdsNode.isObject()) {
// legacy map format is no longer supported
throw new IOException("Invalid skipByMessageIds payload: legacy map format is not supported");
} else {
throw new IOException("Invalid skipByMessageIds payload: unsupported messageIds type");
}
}

// No messageIds field => reject legacy map form
throw new IOException("Invalid skipByMessageIds payload: missing messageIds");
}

throw new IOException("Invalid skipByMessageIds payload: unsupported top-level JSON");
}

private static long optLong(JsonNode node) {
if (node == null || node.isNull()) {
return 0L;
}
try {
return node.asLong();
} catch (Exception e) {
return 0L;
}
}

private static int optInt(JsonNode node, int def) {
if (node == null || node.isNull()) {
return def;
}
try {
return node.asInt();
} catch (Exception e) {
return def;
}
}

private static void appendFromBase64(String base64, SkipMessageIdsRequest r)
throws IOException {
if (base64 == null) {
return;
}
byte[] data = Base64.getDecoder().decode(base64);
MessageIdData idData = new MessageIdData();
try {
idData.parseFrom(Unpooled.wrappedBuffer(data, 0, data.length), data.length);
} catch (Exception e) {
throw new IOException(e);
}
long ledgerId = idData.getLedgerId();
long entryId = idData.getEntryId();
int batchIndex = idData.hasBatchIndex() ? idData.getBatchIndex() : -1;
if (batchIndex >= 0) {
r.addItem(ledgerId, entryId, batchIndex);
} else {
r.addItem(ledgerId, entryId, null);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -73,6 +75,7 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.SkipMessageIdsRequest;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
Expand All @@ -81,6 +84,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
import org.apache.pulsar.broker.service.GetStatsOptions;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.broker.service.SkipEntry;

Copilot AI Nov 9, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The imported class SkipEntry does not exist in the codebase and needs to be created. See the comment on PersistentSubscription.java for the required class structure.

Copilot uses AI. Check for mistakes.
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
Expand Down Expand Up @@ -1899,7 +1903,7 @@ protected void internalSkipMessages(AsyncResponse asyncResponse, String subName,
if (partitionMetadata.partitions > 0) {
String msg = "Skip messages on a partitioned topic is not allowed";
log.warn("[{}] {} {} {}", clientAppId(), msg, topicName, subName);
throw new RestException(Status.METHOD_NOT_ALLOWED, msg);
throw new RestException(Status.METHOD_NOT_ALLOWED, msg);
}
return getTopicReferenceAsync(topicName).thenCompose(t -> {
PersistentTopic topic = (PersistentTopic) t;
Expand Down Expand Up @@ -1948,6 +1952,155 @@ protected void internalSkipMessages(AsyncResponse asyncResponse, String subName,
});
}

protected void internalSkipByMessageIds(AsyncResponse asyncResponse, String subName, boolean authoritative,
SkipMessageIdsRequest messageIds) {
CompletableFuture<Void> validationFuture = validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
validationFuture = validationFuture.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
return CompletableFuture.completedFuture(null);
}
});
validationFuture.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
.thenAccept(partitionMetadata -> {
if (topicName.isPartitioned()) {
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
subName, authoritative);
} else {
if (partitionMetadata.partitions > 0) {
internalSkipByMessageIdsForPartitionedTopic(asyncResponse, partitionMetadata,
messageIds, subName);
} else {
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
subName, authoritative);
}
}
}).exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to ack messages on topic {}: {}", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private void internalSkipByMessageIdsForPartitionedTopic(AsyncResponse asyncResponse,
PartitionedTopicMetadata partitionMetadata,
SkipMessageIdsRequest messageIds,
String subName) {
final List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
PulsarAdmin admin;
try {
admin = pulsar().getAdminClient();
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
}
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
// Rebuild an Admin API request using the parsed items to avoid legacy-map format
List<org.apache.pulsar.client.admin.SkipMessageIdsRequest.MessageIdItem> items = new ArrayList<>();
for (SkipMessageIdsRequest.MessageIdItem it : messageIds.getItems()) {
items.add(new org.apache.pulsar.client.admin.SkipMessageIdsRequest.MessageIdItem(
it.getLedgerId(), it.getEntryId(), it.getBatchIndex()));
}
org.apache.pulsar.client.admin.SkipMessageIdsRequest req =
org.apache.pulsar.client.admin.SkipMessageIdsRequest.forMessageIds(items);

futures.add(admin
.topics()
.skipMessagesAsync(topicNamePartition.toString(), subName, req));
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Route message IDs to their owning partitions instead of broadcasting

The partitioned-topic handler sends the entire list of message IDs to every partition (for (int i …) calling skipMessagesAsync on each). When the request contains IDs that belong to only one partition (the normal case), the other partitions try to ack ledger/entry pairs that are not in their backlog and the whole operation fails once any partition returns an error. The admin API therefore cannot successfully skip a message on a multi-partition topic. Group message IDs by partition (e.g. via TopicName partition parsing or managed ledger ownership) and only invoke skipMessagesAsync for the partitions that actually contain each ID.

Useful? React with 👍 / 👎.

Copilot AI Nov 9, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic for handling partitioned topics appears incorrect. The same message IDs are sent to all partitions (loop on lines 2000-2014), but message IDs are partition-specific - a message ID from partition-0 won't exist in partition-1.

This will likely result in errors or no-op behavior on most partitions. Consider either:

  1. Requiring users to specify the partition explicitly for this operation (document that it doesn't work on partitioned topic names)
  2. Implementing logic to extract partition information from the message IDs if available
  3. Changing the API to accept partition-specific message ID mappings

Copilot uses AI. Check for mistakes.
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = FutureUtil.unwrapCompletionException(exception);
log.warn("[{}] Failed to ack messages on some partitions of {}: {}",
clientAppId(), topicName, t.getMessage());
resumeAsyncResponseExceptionally(asyncResponse, t);
} else {
log.info("[{}] Successfully requested cancellation for delayed message on"
+ " all partitions of topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}
return null;
});
}

private void internalSkipByMessageIdsForNonPartitionedTopic(AsyncResponse asyncResponse,
SkipMessageIdsRequest messageIds,
String subName,
boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(optTopic -> {
if (!(optTopic instanceof PersistentTopic persistentTopic)) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Cancel delayed message on a non-persistent"
+ " topic is not allowed");
}
log.info("[{}] Cancelling delayed message for subscription {} on topic {}", clientAppId(),
subName, topicName);
return internalSkipByMessageIdsForSubscriptionAsync(persistentTopic, subName, messageIds);
})
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
Throwable t = FutureUtil.unwrapCompletionException(ex);
if (isNot307And404Exception(t)) {
log.error("[{}] Error in internalSkipByMessageIdsForNonPartitionedTopic for {}: {}",
clientAppId(), topicName, t.getMessage(), t);
}
resumeAsyncResponseExceptionally(asyncResponse, t);
return null;
});
}

private CompletableFuture<Void> internalSkipByMessageIdsForSubscriptionAsync(
PersistentTopic topic, String subName, SkipMessageIdsRequest messageIds) {
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topic.getName(), subName)));
}
// Build List<SkipEntry> from parsed items
Map<String, AggregatedSkip> aggregated = new LinkedHashMap<>();
for (SkipMessageIdsRequest.MessageIdItem it : messageIds.getItems()) {
long ledgerId = it.getLedgerId();
long entryId = it.getEntryId();
Integer batchIndex = it.getBatchIndex();
String key = ledgerId + ":" + entryId;
AggregatedSkip agg = aggregated.computeIfAbsent(key, k -> new AggregatedSkip(ledgerId, entryId));
if (batchIndex == null) {
agg.full = true;
} else {
agg.indexes.add(batchIndex);
}
}
List<SkipEntry> skipEntries = new ArrayList<>(aggregated.size());
for (AggregatedSkip v : aggregated.values()) {
if (v.full) {
skipEntries.add(new SkipEntry(v.ledgerId, v.entryId, null));
} else {
// sort indexes to have deterministic order
List<Integer> idx = new ArrayList<>(v.indexes);
Collections.sort(idx);
skipEntries.add(new SkipEntry(v.ledgerId, v.entryId, idx));
}
}
return sub.skipMessages(skipEntries);
}

private static final class AggregatedSkip {
final long ledgerId;
final long entryId;
boolean full = false;
final LinkedHashSet<Integer> indexes = new LinkedHashSet<>();

AggregatedSkip(long ledgerId, long entryId) {
this.ledgerId = ledgerId;
this.entryId = entryId;
}
}

protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResponse, int expireTimeInSeconds,
boolean authoritative) {
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES);
Expand Down
Loading
Loading