Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
7afe161
Implementing delayed message cancellation in pulsar
Denovo1998 Jan 29, 2025
88ffdf9
feat(broker): Add support for canceling delayed messages in DelayedDe…
Denovo1998 Feb 8, 2025
212f174
add cancelDelayedMessage admin api.
Denovo1998 May 31, 2025
8686255
Merge branch 'master' into delay_msg_cancel
Denovo1998 May 31, 2025
6fe2b11
the cancel command does not need to be added to the sharedBucketPrior…
Denovo1998 Jun 1, 2025
dc079a6
fix test.
Denovo1998 Jun 1, 2025
f66f1b5
clean up useless canceledMessages.
Denovo1998 Jun 1, 2025
391a426
Implement the delayed message cancellation function through acknowled…
Denovo1998 Jun 8, 2025
d7f025c
feat(admin): add skipMessages by message IDs and remove cancelDelayed…
Denovo1998 Jul 17, 2025
5f170cc
use skipByMessageIds as the new path
Denovo1998 Jul 31, 2025
bce6ae3
Merge branch 'master' into delay_msg_cancel
Denovo1998 Nov 6, 2025
347e52f
feat: enhance skipMessages functionality to support batch indices
Denovo1998 Nov 9, 2025
a4911e5
feat: implement SkipMessageIdsRequest to support multiple formats for…
Denovo1998 Nov 9, 2025
2e77580
feat: refactor skipMessages to accept List<SkipEntry> and update rela…
Denovo1998 Nov 9, 2025
e7a988a
feat: add SkipEntry model for skipping messages with optional batch i…
Denovo1998 Nov 9, 2025
4613789
fix: update subscription description for clarity and add constructor …
Denovo1998 Nov 9, 2025
275a3cb
fix: remove unnecessary whitespace in SkipEntry.java
Denovo1998 Nov 9, 2025
a88ace7
Merge branch 'master' into delay_msg_cancel
Denovo1998 Feb 19, 2026
b5efe78
feat(broker): Support skipping messages by ID on partitioned topics
Denovo1998 Feb 19, 2026
fd17f4d
refactor(broker): Improve skipMessages API and test coverage
Denovo1998 Feb 24, 2026
c7df916
test(service): Remove unused import from BucketDelayedDeliveryTest
Denovo1998 Feb 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1261,8 +1261,7 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscr
if (ex != null) {
log.warn("[{}] Failed to get list of subscriptions for {}: {}", clientAppId(),
topicName, ex.getMessage());
if (ex instanceof PulsarAdminException) {
PulsarAdminException pae = (PulsarAdminException) ex;
if (ex instanceof PulsarAdminException pae) {
if (pae.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Internal topics have not been generated yet"));
Expand Down Expand Up @@ -1899,7 +1898,7 @@ protected void internalSkipMessages(AsyncResponse asyncResponse, String subName,
if (partitionMetadata.partitions > 0) {
String msg = "Skip messages on a partitioned topic is not allowed";
log.warn("[{}] {} {} {}", clientAppId(), msg, topicName, subName);
throw new RestException(Status.METHOD_NOT_ALLOWED, msg);
throw new RestException(Status.METHOD_NOT_ALLOWED, msg);
}
return getTopicReferenceAsync(topicName).thenCompose(t -> {
PersistentTopic topic = (PersistentTopic) t;
Expand Down Expand Up @@ -1948,6 +1947,109 @@ protected void internalSkipMessages(AsyncResponse asyncResponse, String subName,
});
}

protected void internalSkipByMessageIds(AsyncResponse asyncResponse, String subName, boolean authoritative,
Map<String, String> messageIds) {
CompletableFuture<Void> validationFuture = validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName);
validationFuture = validationFuture.thenCompose(__ -> {
if (topicName.isGlobal()) {
return validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
return CompletableFuture.completedFuture(null);
}
});
validationFuture.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
.thenAccept(partitionMetadata -> {
if (topicName.isPartitioned()) {
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
subName, authoritative);
} else {
if (partitionMetadata.partitions > 0) {
internalSkipByMessageIdsForPartitionedTopic(asyncResponse, partitionMetadata,
messageIds, subName);
} else {
internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds,
subName, authoritative);
}
}
}).exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
log.error("[{}] Failed to ack messages on topic {}: {}", clientAppId(), topicName, ex);
Comment thread
Denovo1998 marked this conversation as resolved.
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private void internalSkipByMessageIdsForPartitionedTopic(AsyncResponse asyncResponse,
PartitionedTopicMetadata partitionMetadata,
Map<String, String> messageIds,
String subName) {
final List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
PulsarAdmin admin;
try {
admin = pulsar().getAdminClient();
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
}
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
futures.add(admin
.topics()
.skipMessagesAsync(topicNamePartition.toString(), subName, messageIds));
}
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = FutureUtil.unwrapCompletionException(exception);
log.warn("[{}] Failed to ack messages on some partitions of {}: {}",
Comment thread
Denovo1998 marked this conversation as resolved.
Outdated
clientAppId(), topicName, t.getMessage());
resumeAsyncResponseExceptionally(asyncResponse, t);
} else {
log.info("[{}] Successfully requested cancellation for delayed message on"
+ " all partitions of topic {}", clientAppId(), topicName);
asyncResponse.resume(Response.noContent().build());
}
return null;
});
}

private void internalSkipByMessageIdsForNonPartitionedTopic(AsyncResponse asyncResponse,
Map<String, String> messageIds,
String subName,
boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(optTopic -> {
if (!(optTopic instanceof PersistentTopic persistentTopic)) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Cancel delayed message on a non-persistent"
+ " topic is not allowed");
}
log.info("[{}] Cancelling delayed message for subscription {} on topic {}", clientAppId(),
subName, topicName);
return internalSkipByMessageIdsForSubscriptionAsync(persistentTopic, subName, messageIds);
})
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
Throwable t = FutureUtil.unwrapCompletionException(ex);
if (isNot307And404Exception(t)) {
log.error("[{}] Error in internalSkipByMessageIdsForNonPartitionedTopic for {}: {}",
clientAppId(), topicName, t.getMessage(), t);
}
resumeAsyncResponseExceptionally(asyncResponse, t);
return null;
});
}

private CompletableFuture<Void> internalSkipByMessageIdsForSubscriptionAsync(
PersistentTopic topic, String subName, Map<String, String> messageIds) {
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topic.getName(), subName)));
}
return sub.skipMessages(messageIds);
}

protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResponse, int expireTimeInSeconds,
boolean authoritative) {
CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES);
Expand Down Expand Up @@ -2031,12 +2133,11 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
getTopicNotFoundErrorMessage(topicName.toString())));
return;
}
if (!(t instanceof PersistentTopic)) {
if (!(t instanceof PersistentTopic topic)) {
resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.METHOD_NOT_ALLOWED,
"Expire messages for all subscriptions on a non-persistent topic is not allowed"));
return;
}
PersistentTopic topic = (PersistentTopic) t;
final List<CompletableFuture<Void>> futures =
new ArrayList<>((int) topic.getReplicators().size());
List<String> subNames =
Expand Down Expand Up @@ -2815,7 +2916,7 @@ protected CompletableFuture<MessageId> internalGetMessageIdByTimestampAsync(long
}).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
if (!(topic instanceof PersistentTopic)) {
if (!(topic instanceof PersistentTopic persistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Get message ID by timestamp on a non-persistent topic is not allowed");
Expand Down Expand Up @@ -2946,13 +3047,12 @@ protected CompletableFuture<Response> internalExamineMessageAsync(String initial
return CompletableFuture.completedFuture(null);
}).thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> {
if (!(topic instanceof PersistentTopic)) {
if (!(topic instanceof PersistentTopic persistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Examine messages on a non-persistent topic is not allowed");
}
try {
PersistentTopic persistentTopic = (PersistentTopic) topic;
long totalMessage = persistentTopic.getNumberOfEntries();
if (totalMessage <= 0) {
throw new RestException(Status.PRECONDITION_FAILED,
Expand Down Expand Up @@ -3957,12 +4057,11 @@ private CompletableFuture<Void> internalExpireMessagesByTimestampForSinglePartit
getTopicNotFoundErrorMessage(topicName.toString())));
return;
}
if (!(t instanceof PersistentTopic)) {
if (!(t instanceof PersistentTopic topic)) {
resultFuture.completeExceptionally(new RestException(Status.METHOD_NOT_ALLOWED,
"Expire messages on a non-persistent topic is not allowed"));
return;
}
PersistentTopic topic = (PersistentTopic) t;

final MessageExpirer messageExpirer;
if (subName.startsWith(topic.getReplicatorPrefix())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.ws.rs.DELETE;
Expand Down Expand Up @@ -633,6 +634,29 @@ public void skipMessages(@Suspended final AsyncResponse asyncResponse, @PathPara
}
}

@POST
@Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds")
@ApiOperation(hidden = true, value = "Skip messages on a topic subscription.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namesapce or topic or subscription does not exist") })
public void skipByMessageIds(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("subName") String encodedSubName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "The message ID to skip") Map<String, String> messageIds) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalSkipByMessageIds(asyncResponse, decode(encodedSubName), authoritative, messageIds);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/expireMessages/{expireTimeInSeconds}")
@ApiOperation(hidden = true, value = "Expire messages on a topic subscription.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1595,6 +1595,42 @@ public void skipMessages(
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds")
@ApiOperation(value = "Skipping messages on a topic subscription.")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Operation successful"),
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace or topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Skipping messages on a partitioned topic is not allowed"),
Comment thread
Denovo1998 marked this conversation as resolved.
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void skipByMessageIds(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Name of subscription")
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "The message ID to skip") Map<String, String> messageIds) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalSkipByMessageIds(asyncResponse, decode(encodedSubName), authoritative, messageIds);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/expireMessages/{expireTimeInSeconds}")
@ApiOperation(value = "Expiry messages on a topic subscription.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ default long getNumberOfEntriesDelayed() {

CompletableFuture<Void> skipMessages(int numMessagesToSkip);

CompletableFuture<Void> skipMessages(Map<String, String> messageIds);

CompletableFuture<Void> resetCursor(long timestamp);

CompletableFuture<Void> resetCursor(Position position);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,12 @@ public CompletableFuture<Void> skipMessages(int numMessagesToSkip) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> skipMessages(Map<String, String> messageIds) {
// No-op
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> resetCursor(long timestamp) {
// No-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
Comment thread
Denovo1998 marked this conversation as resolved.
Outdated
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -50,6 +52,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.MutablePair;
Expand Down Expand Up @@ -809,6 +812,35 @@ public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
return future;
}

@Override
public CompletableFuture<Void> skipMessages(Map<String, String> messageIds) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Skipping messages by messageIds, current backlog {}", topicName, subName,
cursor.getNumberOfEntriesInBacklog(false));
}

if (Subscription.isCumulativeAckMode(getType())) {
return CompletableFuture.failedFuture(new NotAllowedException("Unsupported subscription type."));
}

List<Position> positions = new ArrayList<>();
for (Map.Entry<String, String> entry : messageIds.entrySet()) {
try {
long ledgerId = Long.parseLong(entry.getKey());
long entryId = Long.parseLong(entry.getValue());
Position position = PositionFactory.create(ledgerId, entryId);
positions.add(position);
} catch (Exception e) {
return CompletableFuture.failedFuture(new NotAllowedException("Invalid message ID."));
}
}

Map<String, Long> properties = Collections.emptyMap();
acknowledgeMessage(positions, AckType.Individual, properties);

return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> resetCursor(long timestamp) {
CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down
Loading
Loading