Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,15 @@ public class PerformanceConsumer extends PerformanceTopicListArguments{
@Option(names = { "--histogram-file" }, description = "HdrHistogram output file")
public String histogramFile = null;

@Option(names = {"--ack-mode"}, description = "Acknowledgement mode, valid options are: [ack, none]")
public AckMode ackMode = AckMode.ack;

@Option(names = {"--processing-delay-millis"}, description = "Delay before acknowledging each message")
public long processingDelayMillis = 0;

@Option(names = {"--ack-timeout-millis"}, description = "Ack timeout in millis. 0 disables ack timeout")
public long ackTimeoutMillis = 0;

public PerformanceConsumer() {
super("consume");
}
Expand Down Expand Up @@ -207,6 +216,15 @@ public void validate() throws Exception {
throw new Exception("The size of subscriptions list should be equal to --num-subscriptions");
}
}
if (this.ackMode == AckMode.none && this.isEnableTransaction) {
throw new Exception("--ack-mode none cannot be used with --txn-enable");
}
if (this.processingDelayMillis < 0) {
throw new Exception("--processing-delay-millis must be >= 0");
}
if (this.ackTimeoutMillis > 0 && this.ackTimeoutMillis <= 1000) {
throw new Exception("--ack-timeout-millis must be 0 or greater than 1000");
}
}
@Override
public void run() throws Exception {
Expand Down Expand Up @@ -296,7 +314,14 @@ public void run() throws Exception {
recorder.recordValue(latencyMillis);
cumulativeRecorder.recordValue(latencyMillis);
}
if (this.isEnableTransaction) {
if (this.processingDelayMillis > 0) {
try {
Thread.sleep(this.processingDelayMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (this.ackMode == AckMode.ack && this.isEnableTransaction) {
try {
messageReceiveLimiter.acquire();
} catch (InterruptedException e){
Expand All @@ -314,7 +339,7 @@ public void run() throws Exception {
}
return null;
});
} else {
} else if (this.ackMode == AckMode.ack) {
consumer.acknowledgeAsync(msg).thenRun(()->{
totalMessageAck.increment();
messageAck.increment();
Expand Down Expand Up @@ -397,29 +422,7 @@ public void run() throws Exception {
};

List<Future<Consumer<ByteBuffer>>> futures = new ArrayList<>();
ConsumerBuilder<ByteBuffer> consumerBuilder = pulsarClient.newConsumer(Schema.BYTEBUFFER) //
.messageListener(listener) //
.receiverQueueSize(this.receiverQueueSize) //
.maxTotalReceiverQueueSizeAcrossPartitions(this.maxTotalReceiverQueueSizeAcrossPartitions)
.acknowledgmentGroupTime(this.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) //
.subscriptionType(this.subscriptionType)
.subscriptionInitialPosition(this.subscriptionInitialPosition)
.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull)
.enableBatchIndexAcknowledgment(this.batchIndexAck)
.poolMessages(this.poolMessages)
.replicateSubscriptionState(this.replicatedSubscription)
.autoScaledReceiverQueueSizeEnabled(this.autoScaledReceiverQueueSize);
if (this.maxPendingChunkedMessage > 0) {
consumerBuilder.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
}
if (this.expireTimeOfIncompleteChunkedMessageMs > 0) {
consumerBuilder.expireTimeOfIncompleteChunkedMessage(this.expireTimeOfIncompleteChunkedMessageMs,
TimeUnit.MILLISECONDS);
}

if (isNotBlank(this.encKeyFile)) {
consumerBuilder.defaultCryptoKeyReader(this.encKeyFile);
}
ConsumerBuilder<ByteBuffer> consumerBuilder = createConsumerBuilder(pulsarClient, listener);

for (int i = 0; i < this.numTopics; i++) {
final TopicName topicName = TopicName.get(this.topics.get(i));
Expand Down Expand Up @@ -589,6 +592,37 @@ private void printAggregatedThroughput(long start) {
totalMessagesReceived.sum(), rate, throughput, rateAck, totalnumMessageAckFailed);
}

ConsumerBuilder<ByteBuffer> createConsumerBuilder(PulsarClient pulsarClient,
MessageListener<ByteBuffer> listener) {
ConsumerBuilder<ByteBuffer> consumerBuilder = pulsarClient.newConsumer(Schema.BYTEBUFFER) //
.messageListener(listener) //
.receiverQueueSize(this.receiverQueueSize) //
.maxTotalReceiverQueueSizeAcrossPartitions(this.maxTotalReceiverQueueSizeAcrossPartitions)
.acknowledgmentGroupTime(this.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) //
.subscriptionType(this.subscriptionType)
.subscriptionInitialPosition(this.subscriptionInitialPosition)
.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull)
.enableBatchIndexAcknowledgment(this.batchIndexAck)
.poolMessages(this.poolMessages)
.replicateSubscriptionState(this.replicatedSubscription)
.autoScaledReceiverQueueSizeEnabled(this.autoScaledReceiverQueueSize);
if (this.maxPendingChunkedMessage > 0) {
consumerBuilder.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
}
if (this.expireTimeOfIncompleteChunkedMessageMs > 0) {
consumerBuilder.expireTimeOfIncompleteChunkedMessage(this.expireTimeOfIncompleteChunkedMessageMs,
TimeUnit.MILLISECONDS);
}
if (this.ackTimeoutMillis > 0) {
consumerBuilder.ackTimeout(this.ackTimeoutMillis, TimeUnit.MILLISECONDS);
}

if (isNotBlank(this.encKeyFile)) {
consumerBuilder.defaultCryptoKeyReader(this.encKeyFile);
}
return consumerBuilder;
}

private static void printAggregatedStats() {
Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram();

Expand All @@ -605,4 +639,8 @@ private static void printAggregatedStats() {
reportHistogram.getValueAtPercentile(99.999),
reportHistogram.getMaxValue());
}

public enum AckMode {
ack, none
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,15 @@ public class PerformanceProducer extends PerformanceTopicListArguments{
+ ", valid options are: [autoIncrement, random]", descriptionKey = "messageKeyGenerationMode")
public String messageKeyGenerationMode = null;

@Option(names = {"--message-key"}, description = "Use a fixed key for all messages")
public String messageKey = null;

@Option(names = {"--message-routing-mode"}, description = "Message routing mode for partitioned topics")
public MessageRoutingMode messageRoutingMode = MessageRoutingMode.RoundRobinPartition;

@Option(names = {"--block-if-queue-full"}, arity = "1", description = "Block sends when the pending queue is full")
public boolean blockIfQueueFull = true;

@Option(names = { "-am", "--access-mode" }, description = "Producer access mode")
public ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared;

Expand Down Expand Up @@ -436,6 +445,17 @@ public PerformanceProducer() {
super("produce");
}

@Override
public void validate() throws Exception {
super.validate();
if (isNotBlank(this.messageKey) && isNotBlank(this.messageKeyGenerationMode)) {
throw new Exception("--message-key cannot be used with --message-key-generation-mode");
}
if (this.messageRoutingMode == MessageRoutingMode.CustomPartition) {
throw new Exception("--message-routing-mode CustomPartition is not supported by pulsar-perf");
}
}

private static void executorShutdownNow(ExecutorService executor) {
executor.shutdownNow();
try {
Expand Down Expand Up @@ -470,8 +490,7 @@ ProducerBuilder<byte[]> createProducerBuilder(PulsarClient client, int producerI
.compressionType(this.compression) //
.maxPendingMessages(this.maxOutstanding) //
.accessMode(this.producerAccessMode)
// enable round robin message routing if it is a partitioned topic
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
.messageRoutingMode(this.messageRoutingMode);
if (this.maxPendingMessagesAcrossPartitions > 0) {
producerBuilder.maxPendingMessagesAcrossPartitions(this.maxPendingMessagesAcrossPartitions);
}
Expand All @@ -494,8 +513,7 @@ ProducerBuilder<byte[]> createProducerBuilder(PulsarClient client, int producerI
producerBuilder.batchingMaxBytes(this.batchMaxBytes);
}

// Block if queue is full else we will start seeing errors in sendAsync
producerBuilder.blockIfQueueFull(true);
producerBuilder.blockIfQueueFull(this.blockIfQueueFull);

if (isNotBlank(this.encKeyName) && isNotBlank(this.encKeyFile)) {
producerBuilder.addEncryptionKey(this.encKeyName);
Expand Down Expand Up @@ -648,8 +666,10 @@ private void runProducer(int producerId,
if (this.setEventTime) {
messageBuilder.eventTime(System.currentTimeMillis());
}
//generate msg key
if (msgKeyMode == MessageKeyGenerationMode.random) {
// Generate message key.
if (isNotBlank(this.messageKey)) {
messageBuilder.key(this.messageKey);
} else if (msgKeyMode == MessageKeyGenerationMode.random) {
messageBuilder.key(String.valueOf(ThreadLocalRandom.current().nextInt()));
} else if (msgKeyMode == MessageKeyGenerationMode.autoIncrement) {
messageBuilder.key(String.valueOf(totalSent.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
Expand Down Expand Up @@ -181,6 +182,77 @@ public void testBatchingDisabled() throws Exception {
Assert.assertFalse(builder.getConf().isBatchingEnabled());
}

@Test(timeOut = 20000)
public void testFixedMessageKey() throws Exception {
String argString = "%s -r 10 -u %s -m 5 --message-key hot-key";
String topic = testTopic + UUID.randomUUID();
String args = String.format(argString, topic, pulsar.getBrokerServiceUrl());
Thread thread = new Thread(() -> {
try {
PerformanceProducer producer = new PerformanceProducer();
producer.run(args.split(" "));
} catch (Exception e) {
e.printStackTrace();
}
});
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub")
.subscriptionType(SubscriptionType.Shared).subscribe();

thread.start();

Message<byte[]> message = consumer.receive(15, TimeUnit.SECONDS);
assertNotNull(message);
Assert.assertEquals(message.getKey(), "hot-key");
consumer.acknowledge(message);

thread.interrupt();
thread.join();
consumer.close();
}

@Test
public void testMessageKeyCannotBeUsedWithMessageKeyGenerationMode() {
PerformanceProducer producer = new PerformanceProducer();
producer.topics = List.of(testTopic + UUID.randomUUID());
producer.messageKey = "hot-key";
producer.messageKeyGenerationMode = "random";

Assert.expectThrows(Exception.class, producer::validate);
}

@Test
public void testCustomMessageRoutingModeIsRejected() {
PerformanceProducer producer = new PerformanceProducer();
producer.topics = List.of(testTopic + UUID.randomUUID());
producer.messageRoutingMode = MessageRoutingMode.CustomPartition;

Assert.expectThrows(Exception.class, producer::validate);
}

@Test(timeOut = 20000)
public void testProducerFaultScenarioBuilderOptions() throws Exception {
String topic = testTopic + UUID.randomUUID();
PerformanceProducer producer = new PerformanceProducer();
producer.topics = List.of(topic);
producer.serviceURL = pulsar.getBrokerServiceUrl();

ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(producer)
.enableTransaction(producer.isEnableTransaction);
@Cleanup
PulsarClient client = clientBuilder.build();
ProducerBuilderImpl<byte[]> defaultBuilder = (ProducerBuilderImpl<byte[]>) producer.createProducerBuilder(
client, 0);
Assert.assertEquals(defaultBuilder.getConf().getMessageRoutingMode(), MessageRoutingMode.RoundRobinPartition);
Assert.assertTrue(defaultBuilder.getConf().isBlockIfQueueFull());

producer.messageRoutingMode = MessageRoutingMode.SinglePartition;
producer.blockIfQueueFull = false;
ProducerBuilderImpl<byte[]> faultBuilder = (ProducerBuilderImpl<byte[]>) producer.createProducerBuilder(
client, 0);
Assert.assertEquals(faultBuilder.getConf().getMessageRoutingMode(), MessageRoutingMode.SinglePartition);
Assert.assertFalse(faultBuilder.getConf().isBlockIfQueueFull());
}

@Test(timeOut = 20000)
public void testCreatePartitions() throws Exception {
String argString = "%s -r 10 -u %s -au %s -m 5 -np 10";
Expand Down
Loading