Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1686,7 +1686,7 @@ public void operationFailed(ManagedLedgerException exception) {

persistentMarkDeletePosition = null;
inProgressMarkDeletePersistPosition = null;
internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? null : Collections.emptyMap(),
new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
Expand Down Expand Up @@ -6176,6 +6177,89 @@ public void testPersistBatchDeletedIndexesTruncatedCounter(int maxBatchIndexes,
}
}

@Test
@SuppressWarnings("unchecked")
public void testCompactionCursorResetNeverLoseMarkDeleteProperties() throws Exception {
@Cleanup
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(
"testCompactionCursorResetNeverLoseMarkDeleteProperties",
new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("__compaction");
ManagedCursorImpl spyCursor = spy(cursor);
ledger.getCursors().removeCursor(cursor.getName());
ledger.getCursors().add(spyCursor, null);

ledger.addEntry("entry-1".getBytes(Encoding));
Position markDeletePosition = ledger.addEntry("entry-2".getBytes(Encoding));

String compactedLedgerProperty = "CompactedTopicLedger";
Map<String, Long> properties = Map.of(compactedLedgerProperty, 123456L);

CountDownLatch markDeleteEntered = new CountDownLatch(1);
CountDownLatch resetEntered = new CountDownLatch(1);
CountDownLatch markDeleteReturned = new CountDownLatch(1);
CountDownLatch markDeleteCompleted = new CountDownLatch(1);
CountDownLatch resetCompleted = new CountDownLatch(1);

doAnswer(invocation -> {
Map<String, Long> invocationProperties = invocation.getArgument(1);
if (invocationProperties != null && invocationProperties.containsKey(compactedLedgerProperty)) {
// Hold the compaction mark-delete after it enters internalAsyncMarkDelete, but before its
// properties can update lastMarkDeleteEntry.
markDeleteEntered.countDown();
assertTrue(resetEntered.await(5, TimeUnit.SECONDS));
try {
return invocation.callRealMethod();
} finally {
markDeleteReturned.countDown();
}
}

if (invocationProperties == null || invocationProperties.isEmpty()) {
// Let reset capture its properties argument first, then persist it only after the compaction
// mark-delete has completed the real internalAsyncMarkDelete call.
resetEntered.countDown();
assertTrue(markDeleteReturned.await(5, TimeUnit.SECONDS));
return invocation.callRealMethod();
}

return invocation.callRealMethod();
}).when(spyCursor).internalAsyncMarkDelete(any(Position.class), nullable(Map.class),
any(MarkDeleteCallback.class), nullable(Object.class), nullable(Runnable.class));

// Start compaction mark-delete from another thread because the spy intentionally blocks it.
CompletableFuture.runAsync(() -> spyCursor.asyncMarkDelete(
markDeletePosition, properties, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
markDeleteCompleted.countDown();
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}
}, null));

assertTrue(markDeleteEntered.await(5, TimeUnit.SECONDS));
// Reset the compaction cursor while the previous mark-delete with properties is still in progress.
spyCursor.asyncResetCursor(markDeletePosition, false, new AsyncCallbacks.ResetCursorCallback() {
@Override
public void resetComplete(Object ctx) {
resetCompleted.countDown();
}

@Override
public void resetFailed(ManagedLedgerException exception, Object ctx) {
}
});

assertTrue(markDeleteCompleted.await(5, TimeUnit.SECONDS));
assertTrue(resetCompleted.await(5, TimeUnit.SECONDS));

assertEquals(spyCursor.getMarkDeletedPosition(), markDeletePosition);
assertEquals(spyCursor.getProperties(), properties);
}

@SuppressWarnings("try")
class TestPulsarMockBookKeeper extends PulsarMockBookKeeper {
Map<Long, Integer> ledgerErrors = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,7 @@ public void findEntryComplete(Position position, Object ctx) {
.attr("position", position)
.log("Expiring all messages until position");
Position prevMarkDeletePos = cursor.getMarkDeletedPosition();
cursor.asyncMarkDelete(position, cursor.getProperties(), markDeleteCallback,
cursor.getNumberOfEntriesInBacklog(false));
cursor.asyncMarkDelete(position, null, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false));
if (!Objects.equals(cursor.getMarkDeletedPosition(), prevMarkDeletePos) && subscription != null) {
subscription.updateLastMarkDeleteAdvancedTimestamp();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand All @@ -34,11 +35,14 @@
import io.netty.buffer.UnpooledByteBufAllocator;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -1098,4 +1102,58 @@ public void testGetFindPositionRange_SingleClosedLedger() {
assertNull(range.getRight());
assertEquals(range.getLeft(), PositionFactory.create(1, 9));
}

@Test
@SuppressWarnings("unchecked")
void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception {
final String ledgerAndCursorName = "testExpireMessagesNeverLoseMarkDeleteProperties";

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(10);
config.setRetentionTime(1, TimeUnit.HOURS);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
ManagedCursorImpl spyCursor = spy(cursor);

Position pos1 = ledger.addEntry(createMessageWrittenToLedger("msg-1"));
Position pos2 = ledger.addEntry(createMessageWrittenToLedger("msg-2"));

CountDownLatch expiryMarkDeleteEnteredLatch = new CountDownLatch(1);
CountDownLatch cursorMarkDeleteCompletedLatch = new CountDownLatch(1);
CountDownLatch expiryMarkDeleteCompletedLatch = new CountDownLatch(1);

doAnswer(invocation -> {
Map<String, Long> invocationProperties = invocation.getArgument(1);
// Pause the expiry-triggered mark-delete so the user markDelete() can complete first.
if (invocationProperties == null || invocationProperties.isEmpty()) {
expiryMarkDeleteEnteredLatch.countDown();
assertTrue(cursorMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS));
try {
return invocation.callRealMethod();
} finally {
expiryMarkDeleteCompletedLatch.countDown();
}
}

return invocation.callRealMethod();
}).when(spyCursor)
.asyncMarkDelete(any(Position.class), nullable(Map.class), any(AsyncCallbacks.MarkDeleteCallback.class),
nullable(Object.class));

PersistentTopic topic = mockPersistentTopic("topicname");
PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(topic,
spyCursor.getName(), spyCursor, null);

CompletableFuture.runAsync(() -> monitor.findEntryComplete(pos2, null));
assertTrue(expiryMarkDeleteEnteredLatch.await(5, TimeUnit.SECONDS));

Map<String, Long> properties = new HashMap<>();
properties.put("test-property", 1L);
spyCursor.markDelete(pos1, properties);
cursorMarkDeleteCompletedLatch.countDown();

assertTrue(expiryMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS));
assertEquals(spyCursor.getMarkDeletedPosition(), pos2);
assertEquals(spyCursor.getProperties(), properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ public void testTlsAuthDisallowInsecure() throws Exception {

@Test
public void testRateLimiting() throws Exception {
setupEnv(false, false, false, false, 10.0, false);
double rateLimit = 10.0;
setupEnv(false, false, false, false, rateLimit, false);

// setupEnv makes HTTP calls to create the cluster, tenant, and namespace.
var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
Expand All @@ -282,7 +283,7 @@ public void testRateLimiting() throws Exception {
// Make requests without exceeding the max rate
for (int i = 0; i < 5; i++) {
makeHttpRequest(false, false);
Thread.sleep(200);
Thread.sleep(rateLimitPauseMillis(rateLimit));
}

metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
Expand Down Expand Up @@ -572,13 +573,32 @@ private void setupEnv(boolean enableFilter, boolean enableTls, boolean enableAut
} catch (ConflictException ce) {
// This is OK.
}
sleepForRateLimiter(rateLimit);

try {
pulsarAdmin.tenants().createTenant("my-property",
TenantInfo.builder().allowedClusters(Sets.newHashSet(config.getClusterName())).build());
} catch (Exception e) {
// This is OK.
}
sleepForRateLimiter(rateLimit);

try {
pulsarAdmin.namespaces().createNamespace("my-property/my-namespace");
} catch (Exception e) {
// This is OK.
}
sleepForRateLimiter(rateLimit);
}

private static void sleepForRateLimiter(double rateLimit) throws InterruptedException {
if (rateLimit > 0) {
Thread.sleep(rateLimitPauseMillis(rateLimit));
}
}

private static long rateLimitPauseMillis(double rateLimit) {
return (long) Math.ceil((1000.0 / rateLimit) * 2);
}

@AfterMethod(alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,28 @@

import io.netty.util.HashedWheelTimer;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class ProducerCleanupTest extends SharedPulsarBaseTest {

@Test
public void testAllTimerTaskShouldCanceledAfterProducerClosed() throws PulsarClientException, InterruptedException {
Producer<byte[]> producer = pulsarClient.newProducer()
public void testAllTimerTaskShouldCanceledAfterProducerClosed() throws PulsarClientException {
@Cleanup
PulsarClient client = newPulsarClient();
Producer<byte[]> producer = client.newProducer()
.topic(newTopicName())
.sendTimeout(1, TimeUnit.SECONDS)
.sendTimeout(15, TimeUnit.SECONDS)
.create();
producer.close();
Thread.sleep(2000);
HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
Assert.assertEquals(timer.pendingTimeouts(), 0);
HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) client).timer();
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -991,13 +991,17 @@ protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
// synchronize redeliverUnacknowledgedMessages().
incomingQueueLock.lock();
try {
if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
// After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
// instance anymore, since for pooled messages, this instance was possibly already been released
// and recycled.
if (canEnqueueMessage(message)) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
updateAutoScaleReceiverQueueHint();
if (incomingMessages.offer(message)) {
// After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
// instance anymore, since for pooled messages, this instance was possibly already been released
// and recycled.
getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
updateAutoScaleReceiverQueueHint();
} else {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -messageSize);
}
}
} finally {
incomingQueueLock.unlock();
Expand Down
41 changes: 28 additions & 13 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ CLOSE:
case cm := <-channel:
msgInput := cm.Message
atMostOnce := gi.context.instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATMOST_ONCE
atLeastOnce := gi.context.instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATLEAST_ONCE
autoAck := gi.context.instanceConf.funcDetails.AutoAck //nolint:staticcheck
if autoAck && atMostOnce {
gi.ackInputMessage(msgInput)
Expand All @@ -177,12 +176,8 @@ CLOSE:

output, err := gi.handlerMsg(msgInput)
if err != nil {
log.Errorf("handler message error:%v", err)
if autoAck && atLeastOnce {
gi.nackInputMessage(msgInput)
}
gi.stats.incrTotalUserExceptions(err)
return err
gi.handleUserError(msgInput, err)
continue
}

gi.stats.processTimeEnd()
Expand Down Expand Up @@ -391,6 +386,29 @@ func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) {
return channel, nil
}

func (gi *goInstance) shouldNackInputOnFailure() bool {
guarantee := gi.context.instanceConf.funcDetails.ProcessingGuarantees
return guarantee == pb.ProcessingGuarantees_ATLEAST_ONCE ||
guarantee == pb.ProcessingGuarantees_MANUAL
}

func (gi *goInstance) handleUserError(msgInput pulsar.Message, err error) {
log.Errorf("handler message error:%v", err)
if gi.shouldNackInputOnFailure() {
gi.nackInputMessage(msgInput)
}
gi.stats.incrTotalUserExceptions(err)
gi.stats.processTimeEnd()
}

func (gi *goInstance) handlePublishError(msgInput pulsar.Message, err error) {
if gi.context.instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATLEAST_ONCE {
gi.nackInputMessage(msgInput)
}
gi.stats.incrTotalSysExceptions(err)
log.Errorf("failed to publish output message: %v", err)
}

func (gi *goInstance) handlerMsg(input pulsar.Message) (output []byte, err error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -420,11 +438,8 @@ func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) {
// semantics, ensure we nack so someone else can get it, in case we are the only handler. Then mark
// exception and fail out.
if err != nil {
if autoAck && atLeastOnce {
gi.nackInputMessage(msgInput)
}
gi.stats.incrTotalSysExceptions(err)
log.Fatal(err)
gi.handlePublishError(msgInput, err)
return
}
// Otherwise the message succeeded. If the SDK is entrusted with responding and we are using
// atLeastOnce delivery semantics, ack the message.
Expand All @@ -437,7 +452,7 @@ func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) {
return
}

// No output from the function or no output topic. Ack if we need to and mark the success before rturning.
// No output from the function or no output topic. Ack if we need to and mark the success before returning.
if autoAck && atLeastOnce {
gi.ackInputMessage(msgInput)
}
Expand Down
Loading
Loading