Skip to content
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -1738,10 +1737,6 @@ public void topics() throws Exception {
verify(mockTopics).createSubscription("persistent://myprop/ns1/ds1", "sub1",
MessageId.earliest, false, null);

cmdTopics.run(split("analyze-backlog persistent://myprop/ns1/ds1 -s sub1"));
verify(mockTopics).analyzeSubscriptionBacklog("persistent://myprop/ns1/ds1", "sub1",
Optional.empty());

Comment on lines -1741 to -1744
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it could be useful to keep a unit test for parameter parsing since this would catch issues before the integration test is run

cmdTopics.run(split("trim-topic persistent://myprop/ns1/ds1"));
verify(mockTopics).trimTopic("persistent://myprop/ns1/ds1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,17 @@ <K, V> void print(Map<K, V> items) {
}

<T> void print(T item) {
print(item, true);
}

<T> void print(T item, boolean prettyPrint) {
try {
if (item instanceof String) {
commandSpec.commandLine().getOut().println(item);
} else {
} else if (prettyPrint) {
prettyPrint(item);
} else {
plainPrint(item);
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -129,6 +135,14 @@ <T> void prettyPrint(T item) {
}
}

<T> void plainPrint(T item) {
try {
commandSpec.commandLine().getOut().println(MAPPER.writeValueAsString(item));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static final ObjectMapper MAPPER = ObjectMapperFactory.create();
private static final ObjectWriter WRITER = MAPPER.writerWithDefaultPrettyPrinter();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import picocli.CommandLine.Command;
Expand Down Expand Up @@ -3002,24 +3003,44 @@ private class AnalyzeBacklog extends CliCommand {
@Parameters(description = "persistent://tenant/namespace/topic", arity = "1")
private String topicName;

@Option(names = { "-s", "--subscription" }, description = "Subscription to be analyzed", required = true)
@Option(names = {"-s", "--subscription"}, description = "Subscription to be analyzed", required = true)
private String subName;

@Option(names = { "--position",
"-p" }, description = "message position to start the scan from (ledgerId:entryId)", required = false)
@Option(names = {"--position",
"-p"}, description = "Message position to start the scan from (ledgerId:entryId)", required = false)
private String messagePosition;

@Option(names = {"--backlog-scan-max-entries",
"-b"}, description = "The maximum number of backlog entries the client will scan before terminating "
+ "its loop", required = false)
private long backlogScanMaxEntries = -1;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--backlog-scan-max-entries default of -1 relies on a subtle side-effect for backward compat — CmdTopics.java:3019 With -1, the predicate result.getEntries() >= -1 is always true, so the loop completes on the first iteration (matching old single-call behavior). That works, but it conflates "unset" with "terminate immediately." Preferred: treat unset as "no cap" (use the no-predicate overload analyzeSubscriptionBacklogAsync(topic, sub, pos)) and only take the looping path when -b is supplied.
That makes intent explicit and avoids relying on entries >= -1 semantics. Also add a validation error for -b 0 or negative user input, since those are meaningless.


@Option(names = {"--quiet", "-q"}, description = "Disable analyze-backlog progress reporting", required = false)
private boolean quiet = false;

@Option(names = {"--plain-print",
"-pp"}, description = "Plain(Non-pretty) print the final result output as NDJSON", required = false)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since there's already -p, it's not recommended to add -pp. Instead, it's better to shorten --plain-print to --plain and just not add a short option name at all.

private boolean plainPrint = false;

@Override
void run() throws PulsarAdminException {
void run() throws Exception {
String persistentTopic = validatePersistentTopic(topicName);
Optional<MessageId> startPosition = Optional.empty();
int partitionIndex = TopicName.get(persistentTopic).getPartitionIndex();
if (isNotBlank(messagePosition)) {
int partitionIndex = TopicName.get(persistentTopic).getPartitionIndex();
MessageId messageId = validateMessageIdString(messagePosition, partitionIndex);
startPosition = Optional.of(messageId);
}
print(getTopics().analyzeSubscriptionBacklog(persistentTopic, subName, startPosition));

AnalyzeSubscriptionBacklogResult backlogResult =
getTopics().analyzeSubscriptionBacklogAsync(persistentTopic, subName, startPosition, result -> {
boolean terminate = result.getEntries() >= backlogScanMaxEntries;
if (!quiet && !terminate) {
print(result, false);
}
return terminate;
}).get();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using the async method isn't useful since .get() is called. There would be a need to change to throws Exception when using analyzeSubscriptionBacklog method.

print(backlogResult, !plainPrint);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.cli.topic;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.testng.annotations.Test;

public class AnalyzeBacklogTest extends PulsarTestSuite {

private static final String PREFIX = "PULSAR_PREFIX_";
private static final String ANALYZE_BACKLOG_TOPIC_NAME = "public/default/analyze-backlog-topic";
private static final String ANALYZE_BACKLOG_SUBSCRIPTION_NAME = "sub1";
private static final int SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES = 10;
private static final String LINE_SEPARATOR_REGEX = "\\r?\\n";
private static final String TOPICS_CMD = "topics";

@Override
public void setupCluster() throws Exception {
brokerEnvs.put(PREFIX + "subscriptionBacklogScanMaxEntries",
String.valueOf(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES));
super.setupCluster();
}

@Test
public void testAnalyzeBacklogUsingDefaultConfig() throws Exception {
prepareSubscriptionBacklog(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES + 1);

ContainerExecResult result =
pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD, "analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME,
"-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME);

String stdout = result.getStdout();
AnalyzeSubscriptionBacklogResult backlogResult =
jsonMapper().readValue(stdout, AnalyzeSubscriptionBacklogResult.class);
assertEquals(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES, backlogResult.getEntries());

String[] lines = stdout.split(LINE_SEPARATOR_REGEX);
assertTrue(lines.length > 1);
}

@Test
public void testAnalyzeBacklogUsingPlainPrint() throws Exception {
prepareSubscriptionBacklog(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES + 1);

ContainerExecResult result =
pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD, "analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME,
"-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME, "-pp");

String stdout = result.getStdout();
AnalyzeSubscriptionBacklogResult backlogResult =
jsonMapper().readValue(stdout, AnalyzeSubscriptionBacklogResult.class);
assertEquals(SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES, backlogResult.getEntries());

String[] lines = stdout.split(LINE_SEPARATOR_REGEX);
assertEquals(1, lines.length);
}

@Test
public void testAnalyzeBacklogClientSideLoopUsingPlainPrint() throws Exception {
int backlogNum = 50;
prepareSubscriptionBacklog(backlogNum);

int backlogScanMaxEntries = 40;
ContainerExecResult result =
pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD, "analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME,
"-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME, "-b", String.valueOf(backlogScanMaxEntries), "-pp");

int expectedResultLines = 4;
String stdout = result.getStdout();
String[] lines = stdout.split(LINE_SEPARATOR_REGEX);
assertEquals(expectedResultLines, lines.length);

for (int i = 0; i < expectedResultLines; i++) {
AnalyzeSubscriptionBacklogResult backlogResult =
jsonMapper().readValue(lines[i], AnalyzeSubscriptionBacklogResult.class);
assertEquals((long) SUBSCRIPTION_BACKLOG_SCAN_MAX_ENTRIES * (i + 1), backlogResult.getEntries());
}
}

@Test
public void testAnalyzeBacklogClientSideLoopUsingQuietPlainPrint() throws Exception {
int backlogNum = 50;
prepareSubscriptionBacklog(backlogNum);

int backlogScanMaxEntries = 35;
ContainerExecResult result =
pulsarCluster.runAdminCommandOnAnyBroker(TOPICS_CMD, "analyze-backlog", ANALYZE_BACKLOG_TOPIC_NAME,
"-s", ANALYZE_BACKLOG_SUBSCRIPTION_NAME, "-b", String.valueOf(backlogScanMaxEntries), "-q",
"-pp");

String stdout = result.getStdout();
String[] lines = stdout.split(LINE_SEPARATOR_REGEX);
assertEquals(1, lines.length);

int expectedEntries = 40;
AnalyzeSubscriptionBacklogResult backlogResult =
jsonMapper().readValue(stdout, AnalyzeSubscriptionBacklogResult.class);
assertEquals(expectedEntries, backlogResult.getEntries());
}

private void prepareSubscriptionBacklog(int backlogNum) throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
@Cleanup
Producer<byte[]> producer =
client.newProducer().topic(ANALYZE_BACKLOG_TOPIC_NAME).enableBatching(false).create();
@Cleanup
Consumer<byte[]> consumer = client.newConsumer().topic(ANALYZE_BACKLOG_TOPIC_NAME)
.subscriptionName(ANALYZE_BACKLOG_SUBSCRIPTION_NAME).subscribe();

List<CompletableFuture<MessageId>> futures = new ArrayList<>(backlogNum);
for (int i = 0; i < backlogNum; i++) {
byte[] msgBytes = ("test" + i).getBytes(StandardCharsets.UTF_8);
CompletableFuture<MessageId> future = producer.sendAsync(msgBytes);
futures.add(future);
}
FutureUtil.waitForAll(futures).get();
}

}
1 change: 1 addition & 0 deletions tests/integration/src/test/resources/pulsar-cli.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<class name="org.apache.pulsar.tests.integration.cli.tenant.TenantTest"/>
<class name="org.apache.pulsar.tests.integration.cli.PerfToolTest"/>
<class name="org.apache.pulsar.tests.integration.cli.topicpolicies.SchemaCompatibilityStrategyTest"/>
<class name="org.apache.pulsar.tests.integration.cli.topic.AnalyzeBacklogTest"/>
</classes>
</test>
</suite>
Loading