diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index c42ccf69f8610..67d392f78f57f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1686,7 +1686,7 @@ public void operationFailed(ManagedLedgerException exception) { persistentMarkDeletePosition = null; inProgressMarkDeletePersistPosition = null; - internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(), + internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? null : Collections.emptyMap(), new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index a3eb475633583..d972b63a2949f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.eq; @@ -6176,6 +6177,89 @@ public void testPersistBatchDeletedIndexesTruncatedCounter(int maxBatchIndexes, } } + @Test + @SuppressWarnings("unchecked") + public void testCompactionCursorResetNeverLoseMarkDeleteProperties() throws Exception { + @Cleanup + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open( + "testCompactionCursorResetNeverLoseMarkDeleteProperties", + new ManagedLedgerConfig().setMaxEntriesPerLedger(10)); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("__compaction"); + ManagedCursorImpl spyCursor = spy(cursor); + ledger.getCursors().removeCursor(cursor.getName()); + ledger.getCursors().add(spyCursor, null); + + ledger.addEntry("entry-1".getBytes(Encoding)); + Position markDeletePosition = ledger.addEntry("entry-2".getBytes(Encoding)); + + String compactedLedgerProperty = "CompactedTopicLedger"; + Map properties = Map.of(compactedLedgerProperty, 123456L); + + CountDownLatch markDeleteEntered = new CountDownLatch(1); + CountDownLatch resetEntered = new CountDownLatch(1); + CountDownLatch markDeleteReturned = new CountDownLatch(1); + CountDownLatch markDeleteCompleted = new CountDownLatch(1); + CountDownLatch resetCompleted = new CountDownLatch(1); + + doAnswer(invocation -> { + Map invocationProperties = invocation.getArgument(1); + if (invocationProperties != null && invocationProperties.containsKey(compactedLedgerProperty)) { + // Hold the compaction mark-delete after it enters internalAsyncMarkDelete, but before its + // properties can update lastMarkDeleteEntry. + markDeleteEntered.countDown(); + assertTrue(resetEntered.await(5, TimeUnit.SECONDS)); + try { + return invocation.callRealMethod(); + } finally { + markDeleteReturned.countDown(); + } + } + + if (invocationProperties == null || invocationProperties.isEmpty()) { + // Let reset capture its properties argument first, then persist it only after the compaction + // mark-delete has completed the real internalAsyncMarkDelete call. + resetEntered.countDown(); + assertTrue(markDeleteReturned.await(5, TimeUnit.SECONDS)); + return invocation.callRealMethod(); + } + + return invocation.callRealMethod(); + }).when(spyCursor).internalAsyncMarkDelete(any(Position.class), nullable(Map.class), + any(MarkDeleteCallback.class), nullable(Object.class), nullable(Runnable.class)); + + // Start compaction mark-delete from another thread because the spy intentionally blocks it. + CompletableFuture.runAsync(() -> spyCursor.asyncMarkDelete( + markDeletePosition, properties, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + markDeleteCompleted.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + } + }, null)); + + assertTrue(markDeleteEntered.await(5, TimeUnit.SECONDS)); + // Reset the compaction cursor while the previous mark-delete with properties is still in progress. + spyCursor.asyncResetCursor(markDeletePosition, false, new AsyncCallbacks.ResetCursorCallback() { + @Override + public void resetComplete(Object ctx) { + resetCompleted.countDown(); + } + + @Override + public void resetFailed(ManagedLedgerException exception, Object ctx) { + } + }); + + assertTrue(markDeleteCompleted.await(5, TimeUnit.SECONDS)); + assertTrue(resetCompleted.await(5, TimeUnit.SECONDS)); + + assertEquals(spyCursor.getMarkDeletedPosition(), markDeletePosition); + assertEquals(spyCursor.getProperties(), properties); + } + @SuppressWarnings("try") class TestPulsarMockBookKeeper extends PulsarMockBookKeeper { Map ledgerErrors = new HashMap<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index a9f7e30510413..2848ba2a0d24d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -245,8 +245,7 @@ public void findEntryComplete(Position position, Object ctx) { .attr("position", position) .log("Expiring all messages until position"); Position prevMarkDeletePos = cursor.getMarkDeletedPosition(); - cursor.asyncMarkDelete(position, cursor.getProperties(), markDeleteCallback, - cursor.getNumberOfEntriesInBacklog(false)); + cursor.asyncMarkDelete(position, null, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false)); if (!Objects.equals(cursor.getMarkDeletedPosition(), prevMarkDeletePos) && subscription != null) { subscription.updateLastMarkDeleteAdvancedTimestamp(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 8d9a2e4578012..5bf4e1f97d462 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -34,11 +35,14 @@ import io.netty.buffer.UnpooledByteBufAllocator; import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -1098,4 +1102,58 @@ public void testGetFindPositionRange_SingleClosedLedger() { assertNull(range.getRight()); assertEquals(range.getLeft(), PositionFactory.create(1, 9)); } + + @Test + @SuppressWarnings("unchecked") + void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception { + final String ledgerAndCursorName = "testExpireMessagesNeverLoseMarkDeleteProperties"; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionSizeInMB(10); + config.setRetentionTime(1, TimeUnit.HOURS); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + ManagedCursorImpl spyCursor = spy(cursor); + + Position pos1 = ledger.addEntry(createMessageWrittenToLedger("msg-1")); + Position pos2 = ledger.addEntry(createMessageWrittenToLedger("msg-2")); + + CountDownLatch expiryMarkDeleteEnteredLatch = new CountDownLatch(1); + CountDownLatch cursorMarkDeleteCompletedLatch = new CountDownLatch(1); + CountDownLatch expiryMarkDeleteCompletedLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + Map invocationProperties = invocation.getArgument(1); + // Pause the expiry-triggered mark-delete so the user markDelete() can complete first. + if (invocationProperties == null || invocationProperties.isEmpty()) { + expiryMarkDeleteEnteredLatch.countDown(); + assertTrue(cursorMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS)); + try { + return invocation.callRealMethod(); + } finally { + expiryMarkDeleteCompletedLatch.countDown(); + } + } + + return invocation.callRealMethod(); + }).when(spyCursor) + .asyncMarkDelete(any(Position.class), nullable(Map.class), any(AsyncCallbacks.MarkDeleteCallback.class), + nullable(Object.class)); + + PersistentTopic topic = mockPersistentTopic("topicname"); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(topic, + spyCursor.getName(), spyCursor, null); + + CompletableFuture.runAsync(() -> monitor.findEntryComplete(pos2, null)); + assertTrue(expiryMarkDeleteEnteredLatch.await(5, TimeUnit.SECONDS)); + + Map properties = new HashMap<>(); + properties.put("test-property", 1L); + spyCursor.markDelete(pos1, properties); + cursorMarkDeleteCompletedLatch.countDown(); + + assertTrue(expiryMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS)); + assertEquals(spyCursor.getMarkDeletedPosition(), pos2); + assertEquals(spyCursor.getProperties(), properties); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java index bc4260ec771af..adf9132f9a94a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java @@ -268,7 +268,8 @@ public void testTlsAuthDisallowInsecure() throws Exception { @Test public void testRateLimiting() throws Exception { - setupEnv(false, false, false, false, 10.0, false); + double rateLimit = 10.0; + setupEnv(false, false, false, false, rateLimit, false); // setupEnv makes HTTP calls to create the cluster, tenant, and namespace. var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); @@ -282,7 +283,7 @@ public void testRateLimiting() throws Exception { // Make requests without exceeding the max rate for (int i = 0; i < 5; i++) { makeHttpRequest(false, false); - Thread.sleep(200); + Thread.sleep(rateLimitPauseMillis(rateLimit)); } metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); @@ -572,13 +573,32 @@ private void setupEnv(boolean enableFilter, boolean enableTls, boolean enableAut } catch (ConflictException ce) { // This is OK. } + sleepForRateLimiter(rateLimit); + try { pulsarAdmin.tenants().createTenant("my-property", TenantInfo.builder().allowedClusters(Sets.newHashSet(config.getClusterName())).build()); + } catch (Exception e) { + // This is OK. + } + sleepForRateLimiter(rateLimit); + + try { pulsarAdmin.namespaces().createNamespace("my-property/my-namespace"); } catch (Exception e) { // This is OK. } + sleepForRateLimiter(rateLimit); + } + + private static void sleepForRateLimiter(double rateLimit) throws InterruptedException { + if (rateLimit > 0) { + Thread.sleep(rateLimitPauseMillis(rateLimit)); + } + } + + private static long rateLimitPauseMillis(double rateLimit) { + return (long) Math.ceil((1000.0 / rateLimit) * 2); } @AfterMethod(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java index 2a7fcb3eb17af..09c0113c0a163 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java @@ -20,8 +20,10 @@ import io.netty.util.HashedWheelTimer; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import org.apache.pulsar.broker.service.SharedPulsarBaseTest; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.Test; @@ -29,14 +31,17 @@ public class ProducerCleanupTest extends SharedPulsarBaseTest { @Test - public void testAllTimerTaskShouldCanceledAfterProducerClosed() throws PulsarClientException, InterruptedException { - Producer producer = pulsarClient.newProducer() + public void testAllTimerTaskShouldCanceledAfterProducerClosed() throws PulsarClientException { + @Cleanup + PulsarClient client = newPulsarClient(); + Producer producer = client.newProducer() .topic(newTopicName()) - .sendTimeout(1, TimeUnit.SECONDS) + .sendTimeout(15, TimeUnit.SECONDS) .create(); producer.close(); - Thread.sleep(2000); - HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer(); - Assert.assertEquals(timer.pendingTimeouts(), 0); + HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) client).timer(); + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0)); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index fe9ec2d59c477..ddf0d1a219da0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -991,13 +991,17 @@ protected boolean enqueueMessageAndCheckBatchReceive(Message message) { // synchronize redeliverUnacknowledgedMessages(). incomingQueueLock.lock(); try { - if (canEnqueueMessage(message) && incomingMessages.offer(message)) { - // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message - // instance anymore, since for pooled messages, this instance was possibly already been released - // and recycled. + if (canEnqueueMessage(message)) { INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize); - getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize)); - updateAutoScaleReceiverQueueHint(); + if (incomingMessages.offer(message)) { + // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message + // instance anymore, since for pooled messages, this instance was possibly already been released + // and recycled. + getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize)); + updateAutoScaleReceiverQueueHint(); + } else { + INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -messageSize); + } } } finally { incomingQueueLock.unlock(); diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go index af8a4e0157b76..2cdfc8a6e9497 100644 --- a/pulsar-function-go/pf/instance.go +++ b/pulsar-function-go/pf/instance.go @@ -164,7 +164,6 @@ CLOSE: case cm := <-channel: msgInput := cm.Message atMostOnce := gi.context.instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATMOST_ONCE - atLeastOnce := gi.context.instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATLEAST_ONCE autoAck := gi.context.instanceConf.funcDetails.AutoAck //nolint:staticcheck if autoAck && atMostOnce { gi.ackInputMessage(msgInput) @@ -177,12 +176,8 @@ CLOSE: output, err := gi.handlerMsg(msgInput) if err != nil { - log.Errorf("handler message error:%v", err) - if autoAck && atLeastOnce { - gi.nackInputMessage(msgInput) - } - gi.stats.incrTotalUserExceptions(err) - return err + gi.handleUserError(msgInput, err) + continue } gi.stats.processTimeEnd() @@ -391,6 +386,29 @@ func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) { return channel, nil } +func (gi *goInstance) shouldNackInputOnFailure() bool { + guarantee := gi.context.instanceConf.funcDetails.ProcessingGuarantees + return guarantee == pb.ProcessingGuarantees_ATLEAST_ONCE || + guarantee == pb.ProcessingGuarantees_MANUAL +} + +func (gi *goInstance) handleUserError(msgInput pulsar.Message, err error) { + log.Errorf("handler message error:%v", err) + if gi.shouldNackInputOnFailure() { + gi.nackInputMessage(msgInput) + } + gi.stats.incrTotalUserExceptions(err) + gi.stats.processTimeEnd() +} + +func (gi *goInstance) handlePublishError(msgInput pulsar.Message, err error) { + if gi.context.instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATLEAST_ONCE { + gi.nackInputMessage(msgInput) + } + gi.stats.incrTotalSysExceptions(err) + log.Errorf("failed to publish output message: %v", err) +} + func (gi *goInstance) handlerMsg(input pulsar.Message) (output []byte, err error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -420,11 +438,8 @@ func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) { // semantics, ensure we nack so someone else can get it, in case we are the only handler. Then mark // exception and fail out. if err != nil { - if autoAck && atLeastOnce { - gi.nackInputMessage(msgInput) - } - gi.stats.incrTotalSysExceptions(err) - log.Fatal(err) + gi.handlePublishError(msgInput, err) + return } // Otherwise the message succeeded. If the SDK is entrusted with responding and we are using // atLeastOnce delivery semantics, ack the message. @@ -437,7 +452,7 @@ func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) { return } - // No output from the function or no output topic. Ack if we need to and mark the success before rturning. + // No output from the function or no output topic. Ack if we need to and mark the success before returning. if autoAck && atLeastOnce { gi.ackInputMessage(msgInput) } diff --git a/pulsar-function-go/pf/instance_test.go b/pulsar-function-go/pf/instance_test.go index bf45ae3a8917e..447d3a22f8d88 100644 --- a/pulsar-function-go/pf/instance_test.go +++ b/pulsar-function-go/pf/instance_test.go @@ -27,6 +27,8 @@ import ( "time" "github.com/stretchr/testify/assert" + + pb "github.com/apache/pulsar/pulsar-function-go/pb" ) func testProcessSpawnerHealthCheckTimer( @@ -115,3 +117,33 @@ func Test_goInstance_handlerMsg(t *testing.T) { assert.Equal(t, "output", string(output)) assert.Equal(t, message, fc.record) } + +func newTestGoInstance(guarantee pb.ProcessingGuarantees) *goInstance { + return &goInstance{ + context: &FunctionContext{ + instanceConf: &instanceConf{ + funcDetails: pb.FunctionDetails{ + ProcessingGuarantees: guarantee, + }, + }, + }, + } +} + +func TestShouldNackInputOnFailure(t *testing.T) { + tests := []struct { + name string + guarantee pb.ProcessingGuarantees + want bool + }{ + {"atLeastOnce", pb.ProcessingGuarantees_ATLEAST_ONCE, true}, + {"manual", pb.ProcessingGuarantees_MANUAL, true}, + {"atMostOnce", pb.ProcessingGuarantees_ATMOST_ONCE, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + instance := newTestGoInstance(tt.guarantee) + assert.Equal(t, tt.want, instance.shouldNackInputOnFailure()) + }) + } +} diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile index 19f5cd5e28d17..4867bfc21c4fd 100644 --- a/tests/docker-images/latest-version-image/Dockerfile +++ b/tests/docker-images/latest-version-image/Dockerfile @@ -24,6 +24,7 @@ ARG GOLANG_IMAGE FROM $GOLANG_IMAGE as pulsar-function-go COPY target/pulsar-function-go/ /go/src/github.com/apache/pulsar/pulsar-function-go +COPY go-examples/exceptionFunc/ /go/src/github.com/apache/pulsar/pulsar-function-go/examples/exceptionFunc/ RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go && go install ./... RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go/pf && go install RUN cd /go/src/github.com/apache/pulsar/pulsar-function-go/examples && go install ./... diff --git a/tests/docker-images/latest-version-image/go-examples/exceptionFunc/exceptionFunc.go b/tests/docker-images/latest-version-image/go-examples/exceptionFunc/exceptionFunc.go new file mode 100644 index 0000000000000..80ace6e21b9b4 --- /dev/null +++ b/tests/docker-images/latest-version-image/go-examples/exceptionFunc/exceptionFunc.go @@ -0,0 +1,41 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package main + +import ( + "context" + "errors" + + "github.com/apache/pulsar/pulsar-function-go/pf" +) + +var i int + +func HandleException(ctx context.Context, in []byte) ([]byte, error) { + i++ + if i%10 == 0 { + return nil, errors.New("test") + } + return []byte(string(in) + "!"), nil +} + +func main() { + pf.Start(HandleException) +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 86dc71f16fb6e..419482cea97c9 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -410,6 +410,10 @@ protected void testFunctionNegAck(Runtime runtime) throws Exception { submitFunction( runtime, inputTopicName, outputTopicName, functionName, EXCEPTION_FUNCTION_PYTHON_FILE, EXCEPTION_PYTHON_CLASS, schema, null); + } else if (runtime == Runtime.GO) { + submitFunction( + runtime, inputTopicName, outputTopicName, functionName, EXCEPTION_GO_FILE, + null, schema, null); } else { submitFunction( runtime, inputTopicName, outputTopicName, functionName, null, EXCEPTION_JAVA_CLASS, schema, null); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java index 890b0eab0d8ec..c041f1bcf7375 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java @@ -78,6 +78,7 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite { public static final String EXCLAMATION_GO_FILE = "exclamationFunc"; public static final String PUBLISH_FUNCTION_GO_FILE = "exclamationFunc"; + public static final String EXCEPTION_GO_FILE = "exceptionFunc"; public static final String LOGGING_JAVA_CLASS = "org.apache.pulsar.functions.api.examples.LoggingFunction"; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java index 0550fd94ebee2..6e631adafbf7f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/go/PulsarFunctionsGoTest.java @@ -39,4 +39,9 @@ public void testGoExclamationMultiInputsFunction() throws Exception { testExclamationFunction(Runtime.GO, false, false, true, false); } + @Test(groups = {"go_function", "function"}) + public void testGoFunctionNegAck() throws Exception { + testFunctionNegAck(Runtime.GO); + } + }