-
Notifications
You must be signed in to change notification settings - Fork 0
[improve][broker] PIP-423: Add a new admin API to acknowledge a single message #17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
7afe161
88ffdf9
212f174
8686255
6fe2b11
dc079a6
f66f1b5
391a426
d7f025c
5f170cc
bce6ae3
347e52f
a4911e5
2e77580
e7a988a
4613789
275a3cb
a88ace7
b5efe78
fd17f4d
c7df916
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,201 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pulsar.broker.admin; | ||
|
|
||
| import com.fasterxml.jackson.core.JsonParser; | ||
| import com.fasterxml.jackson.core.ObjectCodec; | ||
| import com.fasterxml.jackson.databind.DeserializationContext; | ||
| import com.fasterxml.jackson.databind.JsonDeserializer; | ||
| import com.fasterxml.jackson.databind.JsonNode; | ||
| import com.fasterxml.jackson.databind.annotation.JsonDeserialize; | ||
| import com.fasterxml.jackson.databind.node.ArrayNode; | ||
| import com.fasterxml.jackson.databind.node.ObjectNode; | ||
| import io.netty.buffer.Unpooled; | ||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Base64; | ||
| import java.util.List; | ||
| import lombok.Getter; | ||
| import org.apache.pulsar.common.api.proto.MessageIdData; | ||
|
|
||
| /** | ||
| * Server-side request body for skipping messages by message IDs with support for multiple formats. | ||
| */ | ||
| @Getter | ||
| @JsonDeserialize(using = SkipMessageIdsRequest.Deserializer.class) | ||
| public class SkipMessageIdsRequest { | ||
| private final List<MessageIdItem> items = new ArrayList<>(); | ||
|
|
||
| public SkipMessageIdsRequest() { } | ||
|
|
||
| private void addItem(long ledgerId, long entryId, Integer batchIndex) { | ||
| items.add(new MessageIdItem(ledgerId, entryId, batchIndex)); | ||
| } | ||
|
|
||
| public record MessageIdItem(long ledgerId, long entryId, Integer batchIndex) { | ||
| public long getLedgerId() { | ||
| return ledgerId; | ||
| } | ||
|
|
||
| public long getEntryId() { | ||
| return entryId; | ||
| } | ||
|
|
||
| public Integer getBatchIndex() { | ||
| return batchIndex; | ||
| } | ||
| } | ||
|
|
||
| public static class Deserializer extends JsonDeserializer<SkipMessageIdsRequest> { | ||
| @Override | ||
| public SkipMessageIdsRequest deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { | ||
| ObjectCodec codec = p.getCodec(); | ||
| JsonNode node = codec.readTree(p); | ||
| SkipMessageIdsRequest r = new SkipMessageIdsRequest(); | ||
|
|
||
| if (node == null || node.isNull()) { | ||
| throw new IOException("Invalid skipByMessageIds payload: empty body"); | ||
| } | ||
|
|
||
| if (node.isArray()) { | ||
| // Treat as default byteArray list | ||
| ArrayNode arr = (ArrayNode) node; | ||
| for (JsonNode idNode : arr) { | ||
| if (idNode != null && !idNode.isNull()) { | ||
| appendFromBase64(idNode.asText(), r); | ||
| } | ||
| } | ||
| return r; | ||
| } | ||
|
|
||
| if (node.isObject()) { | ||
| ObjectNode obj = (ObjectNode) node; | ||
| JsonNode typeNode = obj.get("type"); | ||
| String type = typeNode != null && !typeNode.isNull() ? typeNode.asText() : null; | ||
| JsonNode messageIdsNode = obj.get("messageIds"); | ||
|
|
||
| if (messageIdsNode != null) { | ||
| if (messageIdsNode.isArray()) { | ||
| ArrayNode arr = (ArrayNode) messageIdsNode; | ||
| if (type == null || type.isEmpty() || "byteArray".equalsIgnoreCase(type)) { | ||
| for (JsonNode idNode : arr) { | ||
| if (idNode != null && !idNode.isNull()) { | ||
| appendFromBase64(idNode.asText(), r); | ||
| } | ||
| } | ||
| } else if ("messageId".equalsIgnoreCase(type)) { | ||
| for (JsonNode idObj : arr) { | ||
| if (idObj == null || idObj.isNull()) { | ||
| continue; | ||
| } | ||
| if (!idObj.isObject()) { | ||
| throw new IOException("Invalid skipByMessageIds payload:" | ||
| + " messageIds elements must be objects"); | ||
| } | ||
| long ledgerId = requiredLong(idObj.get("ledgerId"), "ledgerId"); | ||
| long entryId = requiredLong(idObj.get("entryId"), "entryId"); | ||
| Integer batchIndex = optionalNonNegativeInt(idObj.get("batchIndex"), "batchIndex"); | ||
| r.addItem(ledgerId, entryId, batchIndex); | ||
| } | ||
| } else { | ||
| // Unknown type with array payload => reject | ||
| throw new IOException("Invalid skipByMessageIds payload: unsupported type for array"); | ||
| } | ||
| return r; | ||
| } else if (messageIdsNode.isObject()) { | ||
| throw new IOException("Invalid skipByMessageIds payload: messageIds must be an array"); | ||
| } else { | ||
| throw new IOException("Invalid skipByMessageIds payload: unsupported messageIds type"); | ||
| } | ||
| } | ||
|
|
||
| // No messageIds field => reject | ||
| throw new IOException("Invalid skipByMessageIds payload: missing messageIds"); | ||
| } | ||
|
|
||
| throw new IOException("Invalid skipByMessageIds payload: unsupported top-level JSON"); | ||
| } | ||
|
|
||
| private static long requiredLong(JsonNode node, String fieldName) throws IOException { | ||
| if (node == null || node.isNull()) { | ||
| throw new IOException("Invalid skipByMessageIds payload: missing " + fieldName); | ||
| } | ||
| try { | ||
| if (node.isNumber()) { | ||
| return node.longValue(); | ||
| } | ||
| if (node.isTextual()) { | ||
| return Long.parseLong(node.asText()); | ||
| } | ||
| } catch (Exception e) { | ||
| throw new IOException("Invalid skipByMessageIds payload: invalid " + fieldName, e); | ||
| } | ||
| throw new IOException("Invalid skipByMessageIds payload: invalid " + fieldName); | ||
| } | ||
|
|
||
| private static Integer optionalNonNegativeInt(JsonNode node, String fieldName) throws IOException { | ||
| if (node == null || node.isNull()) { | ||
| return null; | ||
| } | ||
| try { | ||
| int v; | ||
| if (node.isNumber()) { | ||
| v = node.intValue(); | ||
| } else if (node.isTextual()) { | ||
| v = Integer.parseInt(node.asText()); | ||
| } else { | ||
| throw new IOException("Invalid skipByMessageIds payload: invalid " + fieldName); | ||
| } | ||
| return v >= 0 ? v : null; | ||
| } catch (NumberFormatException e) { | ||
| throw new IOException("Invalid skipByMessageIds payload: invalid " + fieldName, e); | ||
| } | ||
| } | ||
|
|
||
| private static void appendFromBase64(String base64, SkipMessageIdsRequest r) | ||
| throws IOException { | ||
| if (base64 == null) { | ||
| return; | ||
| } | ||
| byte[] data; | ||
| try { | ||
| data = Base64.getDecoder().decode(base64); | ||
| } catch (IllegalArgumentException e) { | ||
| throw new IOException("Invalid skipByMessageIds payload: invalid base64 messageId", e); | ||
| } | ||
| if (data.length == 0) { | ||
| throw new IOException("Invalid skipByMessageIds payload: invalid base64 messageId (empty)"); | ||
| } | ||
| MessageIdData idData = new MessageIdData(); | ||
| try { | ||
| idData.parseFrom(Unpooled.wrappedBuffer(data, 0, data.length), data.length); | ||
| } catch (Exception e) { | ||
| throw new IOException(e); | ||
| } | ||
| long ledgerId = idData.getLedgerId(); | ||
| long entryId = idData.getEntryId(); | ||
| int batchIndex = idData.hasBatchIndex() ? idData.getBatchIndex() : -1; | ||
| if (batchIndex >= 0) { | ||
| r.addItem(ledgerId, entryId, batchIndex); | ||
| } else { | ||
| r.addItem(ledgerId, entryId, null); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,8 @@ | |
| import java.util.Base64; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.LinkedHashMap; | ||
| import java.util.LinkedHashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
|
|
@@ -73,6 +75,7 @@ | |
| import org.apache.pulsar.broker.PulsarServerException; | ||
| import org.apache.pulsar.broker.PulsarService; | ||
| import org.apache.pulsar.broker.admin.AdminResource; | ||
| import org.apache.pulsar.broker.admin.SkipMessageIdsRequest; | ||
| import org.apache.pulsar.broker.authentication.AuthenticationDataSource; | ||
| import org.apache.pulsar.broker.authorization.AuthorizationService; | ||
| import org.apache.pulsar.broker.service.AnalyzeBacklogResult; | ||
|
|
@@ -81,6 +84,7 @@ | |
| import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition; | ||
| import org.apache.pulsar.broker.service.GetStatsOptions; | ||
| import org.apache.pulsar.broker.service.MessageExpirer; | ||
| import org.apache.pulsar.broker.service.SkipEntry; | ||
|
||
| import org.apache.pulsar.broker.service.Subscription; | ||
| import org.apache.pulsar.broker.service.Topic; | ||
| import org.apache.pulsar.broker.service.TopicPoliciesService; | ||
|
|
@@ -1915,7 +1919,7 @@ protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, | |
| if (partitionMetadata.partitions > 0) { | ||
| String msg = "Skip messages on a partitioned topic is not allowed"; | ||
| log.warn("[{}] {} {} {}", clientAppId(), msg, topicName, subName); | ||
| throw new RestException(Status.METHOD_NOT_ALLOWED, msg); | ||
| throw new RestException(Status.METHOD_NOT_ALLOWED, msg); | ||
| } | ||
| return getTopicReferenceAsync(topicName).thenCompose(t -> { | ||
| PersistentTopic topic = (PersistentTopic) t; | ||
|
|
@@ -1964,6 +1968,111 @@ protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, | |
| }); | ||
| } | ||
|
|
||
| protected void internalSkipByMessageIds(AsyncResponse asyncResponse, String subName, boolean authoritative, | ||
| SkipMessageIdsRequest messageIds) { | ||
| CompletableFuture<Void> validationFuture = validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName); | ||
| validationFuture = validationFuture.thenCompose(__ -> { | ||
| if (topicName.isGlobal()) { | ||
| return validateGlobalNamespaceOwnershipAsync(namespaceName); | ||
| } else { | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
| }); | ||
| validationFuture.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)) | ||
| .thenAccept(partitionMetadata -> { | ||
| if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) { | ||
| String msg = "Skip messages on a partitioned topic is not allowed"; | ||
| log.warn("[{}] {} {} {}", clientAppId(), msg, topicName, subName); | ||
| throw new RestException(Status.METHOD_NOT_ALLOWED, msg); | ||
| } | ||
| internalSkipByMessageIdsForNonPartitionedTopic(asyncResponse, messageIds, subName, authoritative); | ||
| }).exceptionally(ex -> { | ||
| if (isNot307And404Exception(ex)) { | ||
| log.error("[{}] Failed to ack messages on topic {}: {}", clientAppId(), topicName, ex); | ||
| } | ||
| resumeAsyncResponseExceptionally(asyncResponse, ex); | ||
| return null; | ||
| }); | ||
| } | ||
|
|
||
| private void internalSkipByMessageIdsForNonPartitionedTopic(AsyncResponse asyncResponse, | ||
| SkipMessageIdsRequest messageIds, | ||
| String subName, | ||
| boolean authoritative) { | ||
| validateTopicOwnershipAsync(topicName, authoritative) | ||
| .thenCompose(__ -> getTopicReferenceAsync(topicName)) | ||
| .thenCompose(optTopic -> { | ||
| if (optTopic == null) { | ||
| throw new RestException(Status.NOT_FOUND, | ||
| getTopicNotFoundErrorMessage(topicName.toString())); | ||
| } | ||
| if (!(optTopic instanceof PersistentTopic persistentTopic)) { | ||
| throw new RestException(Status.METHOD_NOT_ALLOWED, | ||
| "Skip messages on a non-persistent topic is not allowed"); | ||
| } | ||
| log.info("[{}] Skipping messages by messageIds for subscription {} on topic {}", clientAppId(), | ||
| subName, topicName); | ||
| return internalSkipByMessageIdsForSubscriptionAsync(persistentTopic, subName, messageIds); | ||
| }) | ||
| .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) | ||
| .exceptionally(ex -> { | ||
| Throwable t = FutureUtil.unwrapCompletionException(ex); | ||
| if (isNot307And404Exception(t)) { | ||
| log.error("[{}] Error in internalSkipByMessageIdsForNonPartitionedTopic for {}: {}", | ||
| clientAppId(), topicName, t.getMessage(), t); | ||
| } | ||
| resumeAsyncResponseExceptionally(asyncResponse, t); | ||
| return null; | ||
| }); | ||
| } | ||
|
|
||
| private CompletableFuture<Void> internalSkipByMessageIdsForSubscriptionAsync( | ||
| PersistentTopic topic, String subName, SkipMessageIdsRequest messageIds) { | ||
| Subscription sub = topic.getSubscription(subName); | ||
| if (sub == null) { | ||
| return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, | ||
| getSubNotFoundErrorMessage(topic.getName(), subName))); | ||
| } | ||
| // Build List<SkipEntry> from parsed items | ||
| Map<String, AggregatedSkip> aggregated = new LinkedHashMap<>(); | ||
| for (SkipMessageIdsRequest.MessageIdItem it : messageIds.getItems()) { | ||
| long ledgerId = it.getLedgerId(); | ||
| long entryId = it.getEntryId(); | ||
| Integer batchIndex = it.getBatchIndex(); | ||
| String key = ledgerId + ":" + entryId; | ||
| AggregatedSkip agg = aggregated.computeIfAbsent(key, k -> new AggregatedSkip(ledgerId, entryId)); | ||
| if (batchIndex == null) { | ||
| agg.full = true; | ||
| } else { | ||
| agg.indexes.add(batchIndex); | ||
| } | ||
| } | ||
| List<SkipEntry> skipEntries = new ArrayList<>(aggregated.size()); | ||
| for (AggregatedSkip v : aggregated.values()) { | ||
| if (v.full) { | ||
| skipEntries.add(new SkipEntry(v.ledgerId, v.entryId, null)); | ||
| } else { | ||
| // sort indexes to have deterministic order | ||
| List<Integer> idx = new ArrayList<>(v.indexes); | ||
| Collections.sort(idx); | ||
| skipEntries.add(new SkipEntry(v.ledgerId, v.entryId, idx)); | ||
| } | ||
| } | ||
| return sub.skipMessages(skipEntries); | ||
| } | ||
|
|
||
| private static final class AggregatedSkip { | ||
| final long ledgerId; | ||
| final long entryId; | ||
| boolean full = false; | ||
| final LinkedHashSet<Integer> indexes = new LinkedHashSet<>(); | ||
|
|
||
| AggregatedSkip(long ledgerId, long entryId) { | ||
| this.ledgerId = ledgerId; | ||
| this.entryId = entryId; | ||
| } | ||
| } | ||
|
|
||
| protected void internalExpireMessagesForAllSubscriptions(AsyncResponse asyncResponse, int expireTimeInSeconds, | ||
| boolean authoritative) { | ||
| CompletableFuture<Void> future = validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,7 @@ | |
| import javax.ws.rs.core.MediaType; | ||
| import javax.ws.rs.core.Response; | ||
| import org.apache.pulsar.broker.admin.AdminResource; | ||
| import org.apache.pulsar.broker.admin.SkipMessageIdsRequest; | ||
| import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; | ||
| import org.apache.pulsar.broker.service.BrokerServiceException; | ||
| import org.apache.pulsar.broker.service.GetStatsOptions; | ||
|
|
@@ -615,6 +616,7 @@ public void skipAllMessages(@Suspended final AsyncResponse asyncResponse, @PathP | |
| @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}") | ||
| @ApiOperation(hidden = true, value = "Skip messages on a topic subscription.") | ||
| @ApiResponses(value = { | ||
| @ApiResponse(code = 400, message = "Invalid request"), | ||
| @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), | ||
| @ApiResponse(code = 403, message = "Don't have admin permission"), | ||
| @ApiResponse(code = 404, message = "Namesapce or topic or subscription does not exist") }) | ||
|
|
@@ -633,6 +635,30 @@ public void skipMessages(@Suspended final AsyncResponse asyncResponse, @PathPara | |
| } | ||
| } | ||
|
|
||
| @POST | ||
| @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds") | ||
| @ApiOperation(hidden = true, value = "Skip messages on a topic subscription.") | ||
| @ApiResponses(value = { | ||
| @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), | ||
| @ApiResponse(code = 400, message = "Bad Request: invalid messageIds format"), | ||
| @ApiResponse(code = 403, message = "Don't have admin permission"), | ||
| @ApiResponse(code = 404, message = "Namesapce or topic or subscription does not exist") }) | ||
|
||
| public void skipByMessageIds(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, | ||
| @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, | ||
| @PathParam("topic") @Encoded String encodedTopic, | ||
| @PathParam("subName") String encodedSubName, | ||
| @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, | ||
| @ApiParam(value = "The message ID to skip") SkipMessageIdsRequest messageIds) { | ||
| try { | ||
| validateTopicName(property, cluster, namespace, encodedTopic); | ||
| internalSkipByMessageIds(asyncResponse, decode(encodedSubName), authoritative, messageIds); | ||
| } catch (WebApplicationException wae) { | ||
| asyncResponse.resume(wae); | ||
| } catch (Exception e) { | ||
| asyncResponse.resume(new RestException(e)); | ||
| } | ||
| } | ||
|
|
||
| @POST | ||
| @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/expireMessages/{expireTimeInSeconds}") | ||
| @ApiOperation(hidden = true, value = "Expire messages on a topic subscription.") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The getter methods (getLedgerId, getEntryId, getBatchIndex) are redundant for a Java record, as records automatically generate these methods. These explicit definitions can be removed.