diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 82132078cd783..b968ca7f6093e 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -120,6 +120,7 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.protocol.schema.PostSchemaPayload; +import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; @@ -2321,6 +2322,40 @@ public void nonPersistentTopics() throws Exception { verify(mockNonPersistentTopics).getListInBundle("myprop/ns1", "0x23d70a30_0x26666658"); } + @Test + public void topicsAnalyzeBacklogParameterParsing() throws Exception { + PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); + Topics mockTopics = mock(Topics.class); + when(admin.topics()).thenReturn(mockTopics); + + AnalyzeSubscriptionBacklogResult backlogResult = new AnalyzeSubscriptionBacklogResult(); + doReturn(backlogResult).when(mockTopics) + .analyzeSubscriptionBacklog(eq("persistent://myprop/ns1/ds1"), eq("sub1"), Mockito.any()); + doReturn(backlogResult).when(mockTopics) + .analyzeSubscriptionBacklog(eq("persistent://myprop/ns1/ds1"), eq("sub1"), Mockito.any(), + Mockito.any()); + + CmdTopics cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("analyze-backlog persistent://myprop/ns1/ds1 -s sub1 --position 1:1")); + verify(mockTopics).analyzeSubscriptionBacklog(eq("persistent://myprop/ns1/ds1"), eq("sub1"), + eq(Optional.of(new MessageIdImpl(1, 1, -1)))); + + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("analyze-backlog persistent://myprop/ns1/ds1 -s sub1 -b 100 --plain --quiet")); + verify(mockTopics).analyzeSubscriptionBacklog(eq("persistent://myprop/ns1/ds1"), eq("sub1"), + eq(Optional.empty()), Mockito.any()); + } + + @Test + public void topicsAnalyzeBacklogRejectsNonPositiveBacklogScanMaxEntries() { + PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); + Topics mockTopics = mock(Topics.class); + when(admin.topics()).thenReturn(mockTopics); + + CmdTopics cmdTopics = new CmdTopics(() -> admin); + assertFalse(cmdTopics.run(split("analyze-backlog persistent://myprop/ns1/ds1 -s sub1 -b 0"))); + } + @Test public void bookies() throws Exception { PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java index f6701dd993bc9..0535796b6a7d2 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java @@ -110,11 +110,17 @@ void print(Map items) { } void print(T item) { + print(item, true); + } + + void print(T item, boolean prettyPrint) { try { if (item instanceof String) { commandSpec.commandLine().getOut().println(item); - } else { + } else if (prettyPrint) { prettyPrint(item); + } else { + plainPrint(item); } } catch (Exception e) { throw new RuntimeException(e); @@ -129,6 +135,14 @@ void prettyPrint(T item) { } } + void plainPrint(T item) { + try { + commandSpec.commandLine().getOut().println(MAPPER.writeValueAsString(item)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + private static final ObjectMapper MAPPER = ObjectMapperFactory.create(); private static final ObjectWriter WRITER = MAPPER.writerWithDefaultPrettyPrinter(); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 0181d2b745520..9fc31eb95dacd 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -83,6 +83,7 @@ import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscribeRate; +import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.ObjectMapperFactory; import picocli.CommandLine.Command; @@ -3002,24 +3003,52 @@ private class AnalyzeBacklog extends CliCommand { @Parameters(description = "persistent://tenant/namespace/topic", arity = "1") private String topicName; - @Option(names = { "-s", "--subscription" }, description = "Subscription to be analyzed", required = true) + @Option(names = {"-s", "--subscription"}, description = "Subscription to be analyzed", required = true) private String subName; - @Option(names = { "--position", - "-p" }, description = "message position to start the scan from (ledgerId:entryId)", required = false) + @Option(names = {"--position", + "-p"}, description = "Message position to start the scan from (ledgerId:entryId)", required = false) private String messagePosition; + @Option(names = {"--backlog-scan-max-entries", + "-b"}, description = "The maximum number of backlog entries the client will scan before terminating " + + "its loop", required = false) + private Long backlogScanMaxEntries; + + @Option(names = {"--quiet", "-q"}, description = "Disable analyze-backlog progress reporting", required = false) + private boolean quiet = false; + + @Option(names = {"--plain"}, description = "Plain(Non-pretty) print backlog results as NDJSON", + required = false) + private boolean plainPrint = false; + @Override - void run() throws PulsarAdminException { + void run() throws Exception { String persistentTopic = validatePersistentTopic(topicName); Optional startPosition = Optional.empty(); + int partitionIndex = TopicName.get(persistentTopic).getPartitionIndex(); if (isNotBlank(messagePosition)) { - int partitionIndex = TopicName.get(persistentTopic).getPartitionIndex(); MessageId messageId = validateMessageIdString(messagePosition, partitionIndex); startPosition = Optional.of(messageId); } - print(getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition)); + AnalyzeSubscriptionBacklogResult backlogResult; + if (backlogScanMaxEntries == null) { + backlogResult = getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition); + } else { + if (backlogScanMaxEntries <= 0) { + throw new ParameterException("--backlog-scan-max-entries must be greater than 0"); + } + backlogResult = getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition, + result -> { + boolean terminate = result.getEntries() >= backlogScanMaxEntries; + if (!quiet && !terminate) { + print(result, !plainPrint); + } + return terminate; + }); + } + print(backlogResult, !plainPrint); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/topic/AnalyzeBacklogTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/topic/AnalyzeBacklogTest.java new file mode 100644 index 0000000000000..55220a6355e66 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/topic/AnalyzeBacklogTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.cli.topic; + +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; +import org.testng.annotations.Test; + +public class AnalyzeBacklogTest extends PulsarTestSuite { + + private static final String PREFIX = "PULSAR_PREFIX_"; + private static final String ANALYZE_BACKLOG_TOPIC_NAME = "public/default/analyze-backlog-topic"; + private static final String ANALYZE_BACKLOG_SUBSCRIPTION_NAME = "sub1"; + private static final int SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES = 10; + private static final String LINE_SEPARATOR_REGEX = "\\r?\\n"; + private static final String TOPICS_CMD = "topics"; + + @Override + public void setupCluster() throws Exception { + brokerEnvs.put(PREFIX + "subscriptionBacklogScanMaxEntries", + String.valueOf(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES)); + super.setupCluster(); + } + + @Test + public void testAnalyzeBacklogUsingDefaultConfig() throws Exception { + prepareSubscriptionBacklog(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES + 1); + + ContainerExecResult result = + pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD, "analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME, + "-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME); + + String stdout = result.getStdout(); + AnalyzeSubscriptionBacklogResult backlogResult = + jsonMapper().readValue(stdout, AnalyzeSubscriptionBacklogResult.class); + assertEquals(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES, backlogResult.getEntries()); + + String[] lines = stdout.split(LINE_SEPARATOR_REGEX); + assertTrue(lines.length > 1); + } + + @Test + public void testAnalyzeBacklogUsingPlainPrint() throws Exception { + prepareSubscriptionBacklog(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES + 1); + + ContainerExecResult result = + pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD, "analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME, + "-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME, "--plain"); + + String stdout = result.getStdout(); + AnalyzeSubscriptionBacklogResult backlogResult = + jsonMapper().readValue(stdout, AnalyzeSubscriptionBacklogResult.class); + assertEquals(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES, backlogResult.getEntries()); + + String[] lines = stdout.split(LINE_SEPARATOR_REGEX); + assertEquals(1, lines.length); + } + + @Test + public void testAnalyzeBacklogClientSideLoopUsingPlainPrint() throws Exception { + int backlogNum = 50; + prepareSubscriptionBacklog(backlogNum); + + int backlogScanMaxEntries = 40; + ContainerExecResult result = + pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD, "analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME, + "-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME, "-b", String.valueOf(backlogScanMaxEntries), + "--plain"); + + int expectedResultLines = 4; + String stdout = result.getStdout(); + String[] lines = stdout.split(LINE_SEPARATOR_REGEX); + assertEquals(expectedResultLines, lines.length); + + for (int i = 0; i < expectedResultLines; i++) { + AnalyzeSubscriptionBacklogResult backlogResult = + jsonMapper().readValue(lines[i], AnalyzeSubscriptionBacklogResult.class); + assertEquals((long) SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES * (i + 1), backlogResult.getEntries()); + } + } + + @Test + public void testAnalyzeBacklogClientSideLoopUsingQuietPlainPrint() throws Exception { + int backlogNum = 50; + prepareSubscriptionBacklog(backlogNum); + + int backlogScanMaxEntries = 35; + ContainerExecResult result = + pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD, "analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME, + "-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME, "-b", String.valueOf(backlogScanMaxEntries), "-q", + "--plain"); + + String stdout = result.getStdout(); + String[] lines = stdout.split(LINE_SEPARATOR_REGEX); + assertEquals(1, lines.length); + + int expectedEntries = 40; + AnalyzeSubscriptionBacklogResult backlogResult = + jsonMapper().readValue(stdout, AnalyzeSubscriptionBacklogResult.class); + assertEquals(expectedEntries, backlogResult.getEntries()); + } + + private void prepareSubscriptionBacklog(int backlogNum) throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build(); + @Cleanup + Producer producer = + client.newProducer().topic(ANALYZE_BACKLOG_TOPIC_NAME).enableBatching(false).create(); + @Cleanup + Consumer consumer = client.newConsumer().topic(ANALYZE_BACKLOG_TOPIC_NAME) + .subscriptionName(ANALYZE_BACKLOG_SUBSCRIPTION_NAME).subscribe(); + + List> futures = new ArrayList<>(backlogNum); + for (int i = 0; i < backlogNum; i++) { + byte[] msgBytes = ("test" + i).getBytes(StandardCharsets.UTF_8); + CompletableFuture future = producer.sendAsync(msgBytes); + futures.add(future); + } + FutureUtil.waitForAll(futures).get(); + } + +} diff --git a/tests/integration/src/test/resources/pulsar-cli.xml b/tests/integration/src/test/resources/pulsar-cli.xml index af55aca8a0098..e32c4649d248f 100644 --- a/tests/integration/src/test/resources/pulsar-cli.xml +++ b/tests/integration/src/test/resources/pulsar-cli.xml @@ -34,6 +34,7 @@ +