From f0e0b66deb4ac727e5a013732fe880cc3ac5efb8 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Tue, 12 May 2026 19:57:36 +0800 Subject: [PATCH] [feat][cli] Add fault scenario controls to pulsar-perf --- .../testclient/PerformanceConsumer.java | 88 +++++++++++++------ .../testclient/PerformanceProducer.java | 32 +++++-- .../testclient/PerformanceProducerTest.java | 72 +++++++++++++++ 3 files changed, 161 insertions(+), 31 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index d699da466af14..dbd1d3368d216 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -180,6 +180,15 @@ public class PerformanceConsumer extends PerformanceTopicListArguments{ @Option(names = { "--histogram-file" }, description = "HdrHistogram output file") public String histogramFile = null; + @Option(names = {"--ack-mode"}, description = "Acknowledgement mode, valid options are: [ack, none]") + public AckMode ackMode = AckMode.ack; + + @Option(names = {"--processing-delay-millis"}, description = "Delay before acknowledging each message") + public long processingDelayMillis = 0; + + @Option(names = {"--ack-timeout-millis"}, description = "Ack timeout in millis. 0 disables ack timeout") + public long ackTimeoutMillis = 0; + public PerformanceConsumer() { super("consume"); } @@ -207,6 +216,15 @@ public void validate() throws Exception { throw new Exception("The size of subscriptions list should be equal to --num-subscriptions"); } } + if (this.ackMode == AckMode.none && this.isEnableTransaction) { + throw new Exception("--ack-mode none cannot be used with --txn-enable"); + } + if (this.processingDelayMillis < 0) { + throw new Exception("--processing-delay-millis must be >= 0"); + } + if (this.ackTimeoutMillis > 0 && this.ackTimeoutMillis <= 1000) { + throw new Exception("--ack-timeout-millis must be 0 or greater than 1000"); + } } @Override public void run() throws Exception { @@ -296,7 +314,14 @@ public void run() throws Exception { recorder.recordValue(latencyMillis); cumulativeRecorder.recordValue(latencyMillis); } - if (this.isEnableTransaction) { + if (this.processingDelayMillis > 0) { + try { + Thread.sleep(this.processingDelayMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + if (this.ackMode == AckMode.ack && this.isEnableTransaction) { try { messageReceiveLimiter.acquire(); } catch (InterruptedException e){ @@ -314,7 +339,7 @@ public void run() throws Exception { } return null; }); - } else { + } else if (this.ackMode == AckMode.ack) { consumer.acknowledgeAsync(msg).thenRun(()->{ totalMessageAck.increment(); messageAck.increment(); @@ -397,29 +422,7 @@ public void run() throws Exception { }; List>> futures = new ArrayList<>(); - ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(Schema.BYTEBUFFER) // - .messageListener(listener) // - .receiverQueueSize(this.receiverQueueSize) // - .maxTotalReceiverQueueSizeAcrossPartitions(this.maxTotalReceiverQueueSizeAcrossPartitions) - .acknowledgmentGroupTime(this.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) // - .subscriptionType(this.subscriptionType) - .subscriptionInitialPosition(this.subscriptionInitialPosition) - .autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull) - .enableBatchIndexAcknowledgment(this.batchIndexAck) - .poolMessages(this.poolMessages) - .replicateSubscriptionState(this.replicatedSubscription) - .autoScaledReceiverQueueSizeEnabled(this.autoScaledReceiverQueueSize); - if (this.maxPendingChunkedMessage > 0) { - consumerBuilder.maxPendingChunkedMessage(this.maxPendingChunkedMessage); - } - if (this.expireTimeOfIncompleteChunkedMessageMs > 0) { - consumerBuilder.expireTimeOfIncompleteChunkedMessage(this.expireTimeOfIncompleteChunkedMessageMs, - TimeUnit.MILLISECONDS); - } - - if (isNotBlank(this.encKeyFile)) { - consumerBuilder.defaultCryptoKeyReader(this.encKeyFile); - } + ConsumerBuilder consumerBuilder = createConsumerBuilder(pulsarClient, listener); for (int i = 0; i < this.numTopics; i++) { final TopicName topicName = TopicName.get(this.topics.get(i)); @@ -589,6 +592,37 @@ private void printAggregatedThroughput(long start) { totalMessagesReceived.sum(), rate, throughput, rateAck, totalnumMessageAckFailed); } + ConsumerBuilder createConsumerBuilder(PulsarClient pulsarClient, + MessageListener listener) { + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(Schema.BYTEBUFFER) // + .messageListener(listener) // + .receiverQueueSize(this.receiverQueueSize) // + .maxTotalReceiverQueueSizeAcrossPartitions(this.maxTotalReceiverQueueSizeAcrossPartitions) + .acknowledgmentGroupTime(this.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) // + .subscriptionType(this.subscriptionType) + .subscriptionInitialPosition(this.subscriptionInitialPosition) + .autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull) + .enableBatchIndexAcknowledgment(this.batchIndexAck) + .poolMessages(this.poolMessages) + .replicateSubscriptionState(this.replicatedSubscription) + .autoScaledReceiverQueueSizeEnabled(this.autoScaledReceiverQueueSize); + if (this.maxPendingChunkedMessage > 0) { + consumerBuilder.maxPendingChunkedMessage(this.maxPendingChunkedMessage); + } + if (this.expireTimeOfIncompleteChunkedMessageMs > 0) { + consumerBuilder.expireTimeOfIncompleteChunkedMessage(this.expireTimeOfIncompleteChunkedMessageMs, + TimeUnit.MILLISECONDS); + } + if (this.ackTimeoutMillis > 0) { + consumerBuilder.ackTimeout(this.ackTimeoutMillis, TimeUnit.MILLISECONDS); + } + + if (isNotBlank(this.encKeyFile)) { + consumerBuilder.defaultCryptoKeyReader(this.encKeyFile); + } + return consumerBuilder; + } + private static void printAggregatedStats() { Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram(); @@ -605,4 +639,8 @@ private static void printAggregatedStats() { reportHistogram.getValueAtPercentile(99.999), reportHistogram.getMaxValue()); } + + public enum AckMode { + ack, none + } } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index 05ad9f6254dd1..653ed186e61d4 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -214,6 +214,15 @@ public class PerformanceProducer extends PerformanceTopicListArguments{ + ", valid options are: [autoIncrement, random]", descriptionKey = "messageKeyGenerationMode") public String messageKeyGenerationMode = null; + @Option(names = {"--message-key"}, description = "Use a fixed key for all messages") + public String messageKey = null; + + @Option(names = {"--message-routing-mode"}, description = "Message routing mode for partitioned topics") + public MessageRoutingMode messageRoutingMode = MessageRoutingMode.RoundRobinPartition; + + @Option(names = {"--block-if-queue-full"}, arity = "1", description = "Block sends when the pending queue is full") + public boolean blockIfQueueFull = true; + @Option(names = { "-am", "--access-mode" }, description = "Producer access mode") public ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared; @@ -436,6 +445,17 @@ public PerformanceProducer() { super("produce"); } + @Override + public void validate() throws Exception { + super.validate(); + if (isNotBlank(this.messageKey) && isNotBlank(this.messageKeyGenerationMode)) { + throw new Exception("--message-key cannot be used with --message-key-generation-mode"); + } + if (this.messageRoutingMode == MessageRoutingMode.CustomPartition) { + throw new Exception("--message-routing-mode CustomPartition is not supported by pulsar-perf"); + } + } + private static void executorShutdownNow(ExecutorService executor) { executor.shutdownNow(); try { @@ -470,8 +490,7 @@ ProducerBuilder createProducerBuilder(PulsarClient client, int producerI .compressionType(this.compression) // .maxPendingMessages(this.maxOutstanding) // .accessMode(this.producerAccessMode) - // enable round robin message routing if it is a partitioned topic - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition); + .messageRoutingMode(this.messageRoutingMode); if (this.maxPendingMessagesAcrossPartitions > 0) { producerBuilder.maxPendingMessagesAcrossPartitions(this.maxPendingMessagesAcrossPartitions); } @@ -494,8 +513,7 @@ ProducerBuilder createProducerBuilder(PulsarClient client, int producerI producerBuilder.batchingMaxBytes(this.batchMaxBytes); } - // Block if queue is full else we will start seeing errors in sendAsync - producerBuilder.blockIfQueueFull(true); + producerBuilder.blockIfQueueFull(this.blockIfQueueFull); if (isNotBlank(this.encKeyName) && isNotBlank(this.encKeyFile)) { producerBuilder.addEncryptionKey(this.encKeyName); @@ -648,8 +666,10 @@ private void runProducer(int producerId, if (this.setEventTime) { messageBuilder.eventTime(System.currentTimeMillis()); } - //generate msg key - if (msgKeyMode == MessageKeyGenerationMode.random) { + // Generate message key. + if (isNotBlank(this.messageKey)) { + messageBuilder.key(this.messageKey); + } else if (msgKeyMode == MessageKeyGenerationMode.random) { messageBuilder.key(String.valueOf(ThreadLocalRandom.current().nextInt())); } else if (msgKeyMode == MessageKeyGenerationMode.autoIncrement) { messageBuilder.key(String.valueOf(totalSent.get())); diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java index 654b73b70ae45..21c225c2e832b 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ProducerBuilderImpl; @@ -181,6 +182,77 @@ public void testBatchingDisabled() throws Exception { Assert.assertFalse(builder.getConf().isBatchingEnabled()); } + @Test(timeOut = 20000) + public void testFixedMessageKey() throws Exception { + String argString = "%s -r 10 -u %s -m 5 --message-key hot-key"; + String topic = testTopic + UUID.randomUUID(); + String args = String.format(argString, topic, pulsar.getBrokerServiceUrl()); + Thread thread = new Thread(() -> { + try { + PerformanceProducer producer = new PerformanceProducer(); + producer.run(args.split(" ")); + } catch (Exception e) { + e.printStackTrace(); + } + }); + Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared).subscribe(); + + thread.start(); + + Message message = consumer.receive(15, TimeUnit.SECONDS); + assertNotNull(message); + Assert.assertEquals(message.getKey(), "hot-key"); + consumer.acknowledge(message); + + thread.interrupt(); + thread.join(); + consumer.close(); + } + + @Test + public void testMessageKeyCannotBeUsedWithMessageKeyGenerationMode() { + PerformanceProducer producer = new PerformanceProducer(); + producer.topics = List.of(testTopic + UUID.randomUUID()); + producer.messageKey = "hot-key"; + producer.messageKeyGenerationMode = "random"; + + Assert.expectThrows(Exception.class, producer::validate); + } + + @Test + public void testCustomMessageRoutingModeIsRejected() { + PerformanceProducer producer = new PerformanceProducer(); + producer.topics = List.of(testTopic + UUID.randomUUID()); + producer.messageRoutingMode = MessageRoutingMode.CustomPartition; + + Assert.expectThrows(Exception.class, producer::validate); + } + + @Test(timeOut = 20000) + public void testProducerFaultScenarioBuilderOptions() throws Exception { + String topic = testTopic + UUID.randomUUID(); + PerformanceProducer producer = new PerformanceProducer(); + producer.topics = List.of(topic); + producer.serviceURL = pulsar.getBrokerServiceUrl(); + + ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(producer) + .enableTransaction(producer.isEnableTransaction); + @Cleanup + PulsarClient client = clientBuilder.build(); + ProducerBuilderImpl defaultBuilder = (ProducerBuilderImpl) producer.createProducerBuilder( + client, 0); + Assert.assertEquals(defaultBuilder.getConf().getMessageRoutingMode(), MessageRoutingMode.RoundRobinPartition); + Assert.assertTrue(defaultBuilder.getConf().isBlockIfQueueFull()); + + producer.messageRoutingMode = MessageRoutingMode.SinglePartition; + producer.blockIfQueueFull = false; + ProducerBuilderImpl faultBuilder = (ProducerBuilderImpl) producer.createProducerBuilder( + client, 0); + Assert.assertEquals(faultBuilder.getConf().getMessageRoutingMode(), MessageRoutingMode.SinglePartition); + Assert.assertFalse(faultBuilder.getConf().isBlockIfQueueFull()); + } + @Test(timeOut = 20000) public void testCreatePartitions() throws Exception { String argString = "%s -r 10 -u %s -au %s -m 5 -np 10";