Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,29 +69,23 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
try {
Position lastPositionForBatch = entries.get(entries.size() - 1).getPosition();
lastSeenPosition = lastPositionForBatch;
// filter out the entry if it has been already deleted
// filterReadEntries will call entry.release if the entry is filtered out
List<Entry> entriesFiltered = this.cursor.filterReadEntries(entries);
int skippedEntries = entries.size() - entriesFiltered.size();
remainingEntries.addAndGet(-skippedEntries);
if (!entriesFiltered.isEmpty()) {
for (Entry entry : entriesFiltered) {
if (remainingEntries.decrementAndGet() <= 0) {
log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
return;
}
if (!condition.test(entry)) {
log.warn("[{}] Scan abort due to user code", OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
return;
}
for (Entry entry : entries) {
if (remainingEntries.getAndDecrement() <= 0) {
log.info("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
return;
}
if (!condition.test(entry)) {
log.info("[{}] Scan abort due to user code", OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
return;
}
}
Comment thread
lhotari marked this conversation as resolved.
searchPosition = ledger.getPositionAfterN(lastPositionForBatch, 1,
PositionBound.startExcluded);
if (log.isDebugEnabled()) {
log.debug("readEntryComplete {} at {} next is {}", lastPositionForBatch, searchPosition);
log.debug("[{}] readEntryComplete at {} next is {}", OpScan.this.cursor, lastPositionForBatch,
searchPosition);
}

if (searchPosition.compareTo(lastPositionForBatch) == 0) {
Expand All @@ -117,12 +111,12 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

public void find() {
if (remainingEntries.get() <= 0) {
log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
log.info("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
return;
}
if (System.currentTimeMillis() - startTime > timeOutMs) {
log.warn("[{}] Scan abort after hitting the deadline", OpScan.this.cursor);
log.info("[{}] Scan abort after hitting the deadline", OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
Expand All @@ -50,6 +53,12 @@ public void setup() throws Exception {
producerBaseSetup();
}

@Override
// Broker config hook: runs before the broker starts.
protected void doInitConf() throws Exception {
// Apply the base test configuration first, then shrink the dispatcher read batch
// so a backlog scan needs several reads per round (exercises the scan loop).
super.doInitConf();
conf.setDispatcherMaxReadBatchSize(10);
}

@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
Expand Down Expand Up @@ -189,4 +198,190 @@ public void partitionedTopicNotAllowed() throws Exception {
assertEquals(0, analyzeSubscriptionBacklogResult.getEntries());
}

@Test
public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws Exception {
    // Broker-side cap on entries per scan round.
    int serverScanLimit = 20;
    conf.setSubscriptionBacklogScanMaxEntries(serverScanLimit);

    String topicName = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-without-loop";
    String subscription = "sub-1";
    int messageCount = 10;

    // Backlog fits in a single server round, so the server reports aborted=false
    // and the client never has to loop.
    List<MessageId> publishedIds = clientSideLoopAnalyzeBacklogSetup(topicName, subscription, messageCount);

    verifyClientSideLoopBacklog(topicName, subscription, messageCount - 1, messageCount,
            publishedIds.get(0), publishedIds.get(messageCount - 1));
}

@Test
public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception {
    // Broker-side cap on entries per scan round.
    int serverScanLimit = 20;
    conf.setSubscriptionBacklogScanMaxEntries(serverScanLimit);

    String topicName = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-without-loop";
    String subscription = "sub-1";
    int messageCount = 25;

    // Client-side backlogScanMaxEntries <= server-side subscriptionBacklogScanMaxEntries,
    // but the server reports aborted=true (dispatcherMaxReadBatchSize is 10 in doInitConf).
    List<MessageId> publishedIds = clientSideLoopAnalyzeBacklogSetup(topicName, subscription, messageCount);

    verifyClientSideLoopBacklog(topicName, subscription, serverScanLimit - 1, serverScanLimit,
            publishedIds.get(0), publishedIds.get(serverScanLimit - 1));

}

@Test
public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws Exception {
    // Broker-side cap on entries per scan round.
    int serverScanLimit = 20;
    conf.setSubscriptionBacklogScanMaxEntries(serverScanLimit);

    String topicName = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-with-loop";
    String subscription = "sub-1";
    int messageCount = 45;

    // backlogScanMaxEntries (client) > subscriptionBacklogScanMaxEntries (server): the client
    // loops over several server rounds and stops once the server reports aborted=false.
    List<MessageId> publishedIds = clientSideLoopAnalyzeBacklogSetup(topicName, subscription, messageCount);

    verifyClientSideLoopBacklog(topicName, subscription, messageCount, messageCount,
            publishedIds.get(0), publishedIds.get(messageCount - 1));
}

@Test
public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception {
    // Broker-side cap on entries per scan round.
    int serverScanLimit = 15;
    conf.setSubscriptionBacklogScanMaxEntries(serverScanLimit);

    String topicName = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-with-loop";
    String subscription = "sub-1";
    int messageCount = 55;
    int clientScanLimit = 40;

    // backlogScanMaxEntries (client) > subscriptionBacklogScanMaxEntries (server): the client
    // loops until the running total exceeds its own limit. dispatcherMaxReadBatchSize is 10.
    List<MessageId> publishedIds = clientSideLoopAnalyzeBacklogSetup(topicName, subscription, messageCount);

    // The broker serves full rounds of 15, so the client sees 15 + 15 + 15 = 45 entries
    // before noticing it has passed its 40-entry limit.
    int rounds = clientScanLimit / serverScanLimit + 1;
    int expectedEntries = rounds * serverScanLimit;
    verifyClientSideLoopBacklog(topicName, subscription, clientScanLimit, expectedEntries,
            publishedIds.get(0), publishedIds.get(expectedEntries - 1));
}

@Test
public void analyzeBacklogWithTopicUnload() throws Exception {
    // Broker-side cap on entries per scan round.
    int serverSubscriptionBacklogScanMaxEntries = 10;
    conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);

    String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-unload";
    String subName = "sub-1";
    int numMessages = 35;

    admin.topics().createSubscription(topic, subName, MessageId.latest);

    // Fix: assert against the subName variable rather than a hard-coded "sub-1" literal,
    // so the assertion stays consistent if the subscription name above is ever changed.
    assertEquals(admin.topics().getSubscriptions(topic), List.of(subName));
    verifyBacklog(topic, subName, 0, 0);

    // Test client side loop with topic unload. Use sync send method here to avoid potential message duplication.
    @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
    List<MessageId> messageIds = new ArrayList<>();
    for (int i = 0; i < numMessages; i++) {
        MessageId messageId = producer.send(("test-" + i).getBytes());
        messageIds.add(messageId);
        // Randomly unload the topic mid-publish so the scan has to cope with ledger changes.
        if (RandomUtils.secure().randomBoolean()) {
            admin.topics().unload(topic);
        }
    }

    verifyClientSideLoopBacklog(topic, subName, numMessages, numMessages, messageIds.get(0),
            messageIds.get(numMessages - 1));
}

@Test
public void analyzeBacklogWithIndividualAck() throws Exception {
    // Broker-side cap on entries per scan round.
    int serverScanLimit = 20;
    conf.setSubscriptionBacklogScanMaxEntries(serverScanLimit);

    String topicName = "persistent://my-property/my-ns/analyze-backlog-with-individual-ack";
    String subscription = "sub-1";
    int totalMessages = 55;

    // Exercise the client-side loop while individual acks punch holes in the backlog.
    List<MessageId> publishedIds = clientSideLoopAnalyzeBacklogSetup(topicName, subscription, totalMessages);

    // Ack receipts are enabled so the broker has processed each ack before we analyze,
    // keeping the test deterministic.
    @Cleanup Consumer<byte[]> consumer =
            pulsarClient.newConsumer().topic(topicName).isAckReceiptEnabled(true).subscriptionName(subscription)
                    .subscriptionType(SubscriptionType.Shared).subscribe();

    // Individually ack only the second message.
    Message<byte[]> firstReceived = consumer.receive();
    Message<byte[]> secondReceived = consumer.receive();
    consumer.acknowledge(secondReceived);

    int backlogScanMaxEntries = 20;
    verifyClientSideLoopBacklog(topicName, subscription, backlogScanMaxEntries, backlogScanMaxEntries,
            publishedIds.get(0), publishedIds.get(backlogScanMaxEntries));

    // Ack the first message too; the scan now starts at the third published message.
    consumer.acknowledge(firstReceived);
    verifyClientSideLoopBacklog(topicName, subscription, backlogScanMaxEntries, backlogScanMaxEntries,
            publishedIds.get(2), publishedIds.get(backlogScanMaxEntries + 1));

    // Drain and ack everything that is left; the backlog should then be empty.
    for (int acked = 2; acked < totalMessages; acked++) {
        consumer.acknowledge(consumer.receive());
    }

    verifyClientSideLoopBacklog(topicName, subscription, backlogScanMaxEntries, 0, null, null);
}

/**
 * Creates {@code subName} on {@code topic}, verifies the backlog starts empty, then publishes
 * {@code numMessages} non-batched messages asynchronously.
 *
 * @return the published message ids in publish order
 * @throws Exception if subscription creation, verification, or publishing fails
 */
private List<MessageId> clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages)
        throws Exception {
    admin.topics().createSubscription(topic, subName, MessageId.latest);

    // Fix: assert against the subName parameter rather than a hard-coded "sub-1" literal,
    // so this helper works for any subscription name a caller passes in.
    assertEquals(admin.topics().getSubscriptions(topic), List.of(subName));
    verifyClientSideLoopBacklog(topic, subName, -1, 0, null, null);

    @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
    List<CompletableFuture<MessageId>> futures = new ArrayList<>();
    for (int i = 0; i < numMessages; i++) {
        futures.add(producer.sendAsync(("test-" + i).getBytes()));
    }
    FutureUtil.waitForAll(futures).get();
    // All sends have completed above, so join() cannot block here.
    return futures.stream().map(CompletableFuture::join).toList();
}

/**
 * Runs the client-side (looping) backlog analysis and checks entry/message counts and the
 * reported first/last message ids. A {@code null} expected id means the field must be absent.
 */
private void verifyClientSideLoopBacklog(String topic, String subName, int backlogMaxScanEntries,
        int expectedEntries, MessageId firstMessageId, MessageId lastMessageId)
        throws Exception {
    AnalyzeSubscriptionBacklogResult result =
            admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogMaxScanEntries);

    assertEquals(result.getEntries(), expectedEntries);
    assertEquals(result.getMessages(), expectedEntries);

    assertReportedMessageId(result.getFirstMessageId(), firstMessageId);
    assertReportedMessageId(result.getLastMessageId(), lastMessageId);
}

/** Asserts the reported "ledgerId:entryId" value matches the expected id, or is null when expected is null. */
private static void assertReportedMessageId(Object reported, MessageId expected) {
    if (expected == null) {
        assertNull(reported);
    } else {
        MessageIdAdv adv = (MessageIdAdv) expected;
        assertEquals(reported, adv.getLedgerId() + ":" + adv.getEntryId());
    }
}

}
Loading
Loading