diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java index 732071ee01a6a..413bb5c018e84 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java @@ -69,29 +69,23 @@ public void readEntriesComplete(List entries, Object ctx) { try { Position lastPositionForBatch = entries.get(entries.size() - 1).getPosition(); lastSeenPosition = lastPositionForBatch; - // filter out the entry if it has been already deleted - // filterReadEntries will call entry.release if the entry is filtered out - List entriesFiltered = this.cursor.filterReadEntries(entries); - int skippedEntries = entries.size() - entriesFiltered.size(); - remainingEntries.addAndGet(-skippedEntries); - if (!entriesFiltered.isEmpty()) { - for (Entry entry : entriesFiltered) { - if (remainingEntries.decrementAndGet() <= 0) { - log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor); - callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); - return; - } - if (!condition.test(entry)) { - log.warn("[{}] Scan abort due to user code", OpScan.this.cursor); - callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx); - return; - } + for (Entry entry : entries) { + if (remainingEntries.getAndDecrement() <= 0) { + log.info("[{}] Scan abort after reading too many entries", OpScan.this.cursor); + callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); + return; + } + if (!condition.test(entry)) { + log.info("[{}] Scan abort due to user code", OpScan.this.cursor); + callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx); + return; } } searchPosition = ledger.getPositionAfterN(lastPositionForBatch, 1, PositionBound.startExcluded); if (log.isDebugEnabled()) { - 
log.debug("readEntryComplete {} at {} next is {}", lastPositionForBatch, searchPosition); + log.debug("[{}] readEntryComplete at {} next is {}", OpScan.this.cursor, lastPositionForBatch, + searchPosition); } if (searchPosition.compareTo(lastPositionForBatch) == 0) { @@ -117,12 +111,12 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { public void find() { if (remainingEntries.get() <= 0) { - log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor); + log.info("[{}] Scan abort after reading too many entries", OpScan.this.cursor); callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); return; } if (System.currentTimeMillis() - startTime > timeOutMs) { - log.warn("[{}] Scan abort after hitting the deadline", OpScan.this.cursor); + log.info("[{}] Scan abort after hitting the deadline", OpScan.this.cursor); callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java index acea913204999..4425436954aa6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java @@ -20,6 +20,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; import java.util.ArrayList; import java.util.List; @@ -27,10 +28,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import 
org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.SubscriptionType; @@ -50,6 +53,12 @@ public void setup() throws Exception { producerBaseSetup(); } + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setDispatcherMaxReadBatchSize(10); + } + @AfterMethod(alwaysRun = true) @Override public void cleanup() throws Exception { @@ -189,4 +198,190 @@ public void partitionedTopicNotAllowed() throws Exception { assertEquals(0, analyzeSubscriptionBacklogResult.getEntries()); } + @Test + public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws Exception { + int serverSubscriptionBacklogScanMaxEntries = 20; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-without-loop"; + String subName = "sub-1"; + int numMessages = 10; + + // Test server returns false aborted flag. + List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); + + verifyClientSideLoopBacklog(topic, subName, numMessages - 1, numMessages, messageIds.get(0), + messageIds.get(numMessages - 1)); + } + + @Test + public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception { + int serverSubscriptionBacklogScanMaxEntries = 20; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-without-loop"; + String subName = "sub-1"; + int numMessages = 25; + + // Test backlogScanMaxEntries(client side) <= subscriptionBacklogScanMaxEntries(server side), but server + // returns true aborted flag. Server dispatcherMaxReadBatchSize is set to 10. 
+ List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); + + verifyClientSideLoopBacklog(topic, subName, serverSubscriptionBacklogScanMaxEntries - 1, + serverSubscriptionBacklogScanMaxEntries, messageIds.get(0), + messageIds.get(serverSubscriptionBacklogScanMaxEntries - 1)); + + } + + @Test + public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws Exception { + int serverSubscriptionBacklogScanMaxEntries = 20; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-with-loop"; + String subName = "sub-1"; + int numMessages = 45; + + // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination + // condition is that server returns false aborted flag. + List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); + + verifyClientSideLoopBacklog(topic, subName, numMessages, numMessages, messageIds.get(0), + messageIds.get(numMessages - 1)); + } + + @Test + public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception { + int serverSubscriptionBacklogScanMaxEntries = 15; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-with-loop"; + String subName = "sub-1"; + int numMessages = 55; + int backlogScanMaxEntries = 40; + + // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination + // condition is that total entries exceeds backlogScanMaxEntries. + // Server dispatcherMaxReadBatchSize is set to 10. + List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); + + // Broker returns 15 + 15 + 15 = 45 entries. 
+ int expectedEntries = (backlogScanMaxEntries / serverSubscriptionBacklogScanMaxEntries + 1) + * serverSubscriptionBacklogScanMaxEntries; + verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, expectedEntries, messageIds.get(0), + messageIds.get(expectedEntries - 1)); + } + + @Test + public void analyzeBacklogWithTopicUnload() throws Exception { + int serverSubscriptionBacklogScanMaxEntries = 10; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-unload"; + String subName = "sub-1"; + int numMessages = 35; + + admin.topics().createSubscription(topic, subName, MessageId.latest); + + assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1")); + verifyBacklog(topic, subName, 0, 0); + + // Test client side loop with topic unload. Use sync send method here to avoid potential message duplication. + @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + List messageIds = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + MessageId messageId = producer.send(("test-" + i).getBytes()); + messageIds.add(messageId); + if (RandomUtils.secure().randomBoolean()) { + admin.topics().unload(topic); + } + } + + verifyClientSideLoopBacklog(topic, subName, numMessages, numMessages, messageIds.get(0), + messageIds.get(numMessages - 1)); + } + + @Test + public void analyzeBacklogWithIndividualAck() throws Exception { + int serverSubscriptionBacklogScanMaxEntries = 20; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-with-individual-ack"; + String subName = "sub-1"; + int messages = 55; + + // Test client side loop with individual ack. 
+ List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, messages); + + // We want to wait for the server to process acks, in order to not have a flaky test. + @Cleanup Consumer consumer = + pulsarClient.newConsumer().topic(topic).isAckReceiptEnabled(true).subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared).subscribe(); + + // Individual ack message2. + Message message1 = consumer.receive(); + Message message2 = consumer.receive(); + consumer.acknowledge(message2); + + int backlogScanMaxEntries = 20; + verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, backlogScanMaxEntries, messageIds.get(0), + messageIds.get(backlogScanMaxEntries)); + + // Ack message1. + consumer.acknowledge(message1); + verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, backlogScanMaxEntries, messageIds.get(2), + messageIds.get(backlogScanMaxEntries + 1)); + + // Ack all messages. + for (int i = 2; i < messages; i++) { + Message message = consumer.receive(); + consumer.acknowledge(message); + } + + verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, 0, null, null); + } + + private List clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages) + throws Exception { + admin.topics().createSubscription(topic, subName, MessageId.latest); + + assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1")); + verifyClientSideLoopBacklog(topic, subName, -1, 0, null, null); + + @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + List> futures = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + CompletableFuture future = producer.sendAsync(("test-" + i).getBytes()); + futures.add(future); + } + FutureUtil.waitForAll(futures).get(); + return futures.stream().map(CompletableFuture::join).toList(); + } + + private void verifyClientSideLoopBacklog(String topic, String subName, int backlogMaxScanEntries, + int expectedEntries, MessageId 
firstMessageId, MessageId lastMessageId) + throws Exception { + AnalyzeSubscriptionBacklogResult backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogMaxScanEntries); + + assertEquals(backlogResult.getEntries(), expectedEntries); + assertEquals(backlogResult.getMessages(), expectedEntries); + + if (firstMessageId == null) { + assertNull(backlogResult.getFirstMessageId()); + } else { + MessageIdAdv firstMessageIdAdv = (MessageIdAdv) firstMessageId; + assertEquals(backlogResult.getFirstMessageId(), + firstMessageIdAdv.getLedgerId() + ":" + firstMessageIdAdv.getEntryId()); + } + + if (lastMessageId == null) { + assertNull(backlogResult.getLastMessageId()); + } else { + MessageIdAdv lastMessageIdAdv = (MessageIdAdv) lastMessageId; + assertEquals(backlogResult.getLastMessageId(), + lastMessageIdAdv.getLedgerId() + ":" + lastMessageIdAdv.getEntryId()); + } + } + } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index fbcf0b4a07b1f..e68be8fd2e805 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -23,6 +23,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; @@ -2220,21 +2221,134 @@ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String * This is a potentially expensive operation, as it requires * to read the messages from storage. * This function takes into consideration batch messages - * and also Subscription filters. 
+ * and also Subscription filters.
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)} * @param topic * Topic name * @param subscriptionName * the subscription * @param startPosition * the position to start the scan from (empty means the last processed message) + * @param backlogScanMaxEntries + * the maximum number of backlog entries the client will scan before terminating its loop * @return an accurate analysis of the backlog * @throws PulsarAdminException * Unexpected error */ + AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, + Optional startPosition, + long backlogScanMaxEntries) throws PulsarAdminException; + + /** + * Analyze subscription backlog. + * This is a potentially expensive operation, as it requires + * to read the messages from storage. + * This function takes into consideration batch messages + * and also Subscription filters.
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, Predicate)}
+ * + * @param topic + * Topic name + * @param subscriptionName + * the subscription + * @param startPosition + * the position to start the scan from (empty means the last processed message) + * @param terminatePredicate + * the predicate to determine whether to terminate the loop + * @return an accurate analysis of the backlog + */ + AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, + Optional startPosition, + Predicate terminatePredicate) + throws PulsarAdminException; + + /** + * Analyze subscription backlog. + * This is a potentially expensive operation, as it requires + * to read the messages from storage. + * This function takes into consideration batch messages + * and also Subscription filters. + * @param topic + * Topic name + * @param subscriptionName + * the subscription + * @param startPosition + * the position to start the scan from (empty means the last processed message) + * @return an accurate analysis of the backlog + */ CompletableFuture analyzeSubscriptionBacklogAsync(String topic, String subscriptionName, Optional startPosition); + /** + * Analyze subscription backlog. + * This is a potentially expensive operation, as it requires + * to read the messages from storage. + * This function takes into consideration batch messages + * and also Subscription filters. + * + *

+ * What's the purpose of this overloaded method?
+ * There are broker-side configurable maximum limits on how many entries will be read and how long the scanning can + * take. The subscriptionBacklogScanMaxTimeMs (default 2 minutes) and subscriptionBacklogScanMaxEntries + * (default 10000) settings control this behavior.
+ * Increasing these settings is possible. However, it's possible that the HTTP request times out (also idle timeout + * in NAT/firewall etc.) before the command completes, so increasing the limits might not be useful beyond a few + * minutes. + *

+ * + *

+ * How does this method work?
+ * 1. Add a new parameter backlogScanMaxEntries to the client-side method to control the client-side loop termination + * condition.
+ * 2. If subscriptionBacklogScanMaxEntries (server side) >= backlogScanMaxEntries (client side), then the + * backlogScanMaxEntries parameter will have no effect.
+ * 3. If subscriptionBacklogScanMaxEntries < backlogScanMaxEntries, the client will call the analyze-backlog method in + * a loop until the server returns ScanOutcome.COMPLETED or the total entries exceed backlogScanMaxEntries.
+ * 4. This means that backlogScanMaxEntries cannot be used to precisely control the number of entries scanned by + * the server; it only serves to determine when the loop should terminate.
+ * 5. With this method, the server can reduce the values of the two parameters subscriptionBacklogScanMaxTimeMs and + * subscriptionBacklogScanMaxEntries, so the user can retrieve the desired number of backlog entries through + * client-side looping. + *

+ * @param topic + * Topic name + * @param subscriptionName + * the subscription + * @param startPosition + * the position to start the scan from (empty means the last processed message) + * @param backlogScanMaxEntries + * the maximum number of backlog entries the client will scan before terminating its loop + * @return an accurate analysis of the backlog + */ + CompletableFuture analyzeSubscriptionBacklogAsync(String topic, + String subscriptionName, + Optional startPosition, + long backlogScanMaxEntries); + + /** + * Analyze subscription backlog. + * This is a potentially expensive operation, as it requires + * to read the messages from storage. + * This function takes into consideration batch messages + * and also Subscription filters.
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)}
+ * User can control the loop termination condition by terminatePredicate. + * + * @param topic + * Topic name + * @param subscriptionName + * the subscription + * @param startPosition + * the position to start the scan from (empty means the last processed message) + * @param terminatePredicate + * the predicate to determine whether to terminate the loop + * @return an accurate analysis of the backlog + */ + CompletableFuture analyzeSubscriptionBacklogAsync(String topic, + String subscriptionName, Optional startPosition, + Predicate terminatePredicate); + /** * Get backlog size by a message ID. * @param topic diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 11dd69a23ce58..78b7329159c1a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -34,6 +34,10 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.ws.rs.client.Entity; @@ -44,6 +48,7 @@ import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.admin.LongRunningProcessStatus; @@ -1560,6 +1565,23 @@ public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition)); } + @Override + 
public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, + Optional startPosition, + long backlogScanMaxEntries) + throws PulsarAdminException { + return sync( + () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, backlogScanMaxEntries)); + } + + @Override + public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, + Optional startPosition, + Predicate terminatePredicate) + throws PulsarAdminException { + return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, terminatePredicate)); + } + @Override public CompletableFuture analyzeSubscriptionBacklogAsync(String topic, String subscriptionName, @@ -1591,6 +1613,96 @@ public void failed(Throwable throwable) { return future; } + @Override + public CompletableFuture analyzeSubscriptionBacklogAsync(String topic, + String subscriptionName, + Optional startPosition, + long backlogScanMaxEntries) { + return analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, + (backlogResult) -> backlogResult.getEntries() >= backlogScanMaxEntries); + } + + @Override + public CompletableFuture analyzeSubscriptionBacklogAsync(String topic, + String subscriptionName, Optional startPosition, + Predicate terminatePredicate) { + final CompletableFuture future = new CompletableFuture<>(); + AtomicReference resultRef = new AtomicReference<>(); + int partitionIndex = TopicName.get(topic).getPartitionIndex(); + AtomicReference> startPositionRef = new AtomicReference<>(startPosition); + + Supplier> resultSupplier = + () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPositionRef.get()); + BiConsumer completeAction = new BiConsumer<>() { + @Override + public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable throwable) { + if (throwable != null) { + future.completeExceptionally(throwable); + return; + } + + AnalyzeSubscriptionBacklogResult 
mergedResult = mergeBacklogResults(currentResult, resultRef.get()); + resultRef.set(mergedResult); + if (!mergedResult.isAborted() || terminatePredicate.test(mergedResult)) { + future.complete(mergedResult); + return; + } + + // In analyze-backlog, we treat 0 entries or null lastMessageId as scan completed for mere safety. + // 0 entries or a null lastMessageId indicates no entries were scanned. + if (currentResult.getEntries() <= 0 || StringUtils.isBlank(currentResult.getLastMessageId())) { + log.info("[{}][{}] complete scan due total entry <= 0 or last message id is blank, " + + "start position is: {}, current result: {}", topic, subscriptionName, + startPositionRef.get(), currentResult); + future.complete(mergedResult); + return; + } + + String[] messageIdSplits = mergedResult.getLastMessageId().split(":"); + MessageIdImpl nextScanMessageId = + new MessageIdImpl(Long.parseLong(messageIdSplits[0]), Long.parseLong(messageIdSplits[1]) + 1, + partitionIndex); + startPositionRef.set(Optional.of(nextScanMessageId)); + + resultSupplier.get().whenComplete(this); + } + }; + + resultSupplier.get().whenComplete(completeAction); + return future; + } + + private AnalyzeSubscriptionBacklogResult mergeBacklogResults(AnalyzeSubscriptionBacklogResult current, + AnalyzeSubscriptionBacklogResult previous) { + if (previous == null) { + return current; + } + + AnalyzeSubscriptionBacklogResult mergedRes = new AnalyzeSubscriptionBacklogResult(); + mergedRes.setEntries(current.getEntries() + previous.getEntries()); + mergedRes.setMessages(current.getMessages() + previous.getMessages()); + mergedRes.setMarkerMessages(current.getMarkerMessages() + previous.getMarkerMessages()); + + mergedRes.setFilterAcceptedEntries(current.getFilterAcceptedEntries() + previous.getFilterAcceptedEntries()); + mergedRes.setFilterRejectedEntries(current.getFilterRejectedEntries() + previous.getFilterRejectedEntries()); + mergedRes.setFilterRescheduledEntries( + current.getFilterRescheduledEntries() + 
previous.getFilterRescheduledEntries()); + + mergedRes.setFilterAcceptedMessages(current.getFilterAcceptedMessages() + previous.getFilterAcceptedMessages()); + mergedRes.setFilterRejectedMessages(current.getFilterRejectedMessages() + previous.getFilterRejectedMessages()); + mergedRes.setFilterRescheduledMessages( + current.getFilterRescheduledMessages() + previous.getFilterRescheduledMessages()); + + mergedRes.setAborted(current.isAborted()); + mergedRes.setFirstMessageId(previous.getFirstMessageId()); + String lastMessageId = current.getLastMessageId(); + if (StringUtils.isNotBlank(lastMessageId)) { + mergedRes.setLastMessageId(lastMessageId); + } + + return mergedRes; + } + @Override public Long getBacklogSizeByMessageId(String topic, MessageId messageId) throws PulsarAdminException {