Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr&
case Closing:
case Closed:
case Producer_Fenced:
case Terminated:
case Failed:
LOG_DEBUG(getName() << "Ignoring connection closed event since the handler is not used anymore");
break;
Expand Down
2 changes: 2 additions & 0 deletions lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
Failed, // Handler is failed, in Java client: HandlerState.State.Failed
Producer_Fenced, // The producer has been fenced by the broker
// in Java client: HandlerState.State.ProducerFenced
Terminated, // The topic has been terminatedproducer has been fenced by the broker
// in Java client: HandlerState.State.Terminated
};

std::atomic<State> state_;
Expand Down
7 changes: 5 additions & 2 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result
}
}

if (result == ResultProducerFenced) {
state_ = Producer_Fenced;
if (result == ResultProducerFenced || result == ResultTopicTerminated) {
state_ = result == ResultProducerFenced ? Producer_Fenced : Terminated;
failPendingMessages(result, false);
auto client = client_.lock();
if (client) {
Expand Down Expand Up @@ -450,6 +450,9 @@ bool ProducerImpl::isValidProducerState(const SendCallback& callback) const {
case HandlerBase::Producer_Fenced:
callback(ResultProducerFenced, {});
return false;
case HandlerBase::Terminated:
callback(ResultTopicTerminated, {});
return false;
case HandlerBase::NotStarted:
case HandlerBase::Failed:
default:
Expand Down
1 change: 1 addition & 0 deletions lib/ResultUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ inline bool isResultRetryable(Result result) {
ResultInvalidConfiguration,
ResultIncompatibleSchema,
ResultTopicNotFound,
ResultTopicTerminated,
ResultOperationNotSupported,
ResultNotAllowedError,
ResultChecksumError,
Expand Down
47 changes: 47 additions & 0 deletions tests/ProducerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,53 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
client.close();
}

TEST(ProducerTest, testCreateProducerAfterTopicTermination) {
const auto topicName =
"testCreateProducerAfterTopicTermination-" + std::to_string(time(nullptr));
const auto topic = "persistent://public/default/" + topicName;

Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("content").build()));
ASSERT_EQ(ResultOk, producer.close());

const auto httpCode =
makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/terminate", "");
ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode;

Producer terminatedProducer;
ASSERT_EQ(ResultTopicTerminated, client.createProducer(topic, terminatedProducer));

client.close();
}

TEST(ProducerTest, testSendAfterTopicTerminationReconnect) {
const auto topicName =
"testSendAfterTopicTerminationReconnect-" + std::to_string(time(nullptr));
const auto topic = "persistent://public/default/" + topicName;

Client client(serviceUrl, ClientConfiguration().setOperationTimeoutSeconds(1));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("before-terminate").build()));

const auto httpCode =
makePostRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/terminate", "");
ASSERT_EQ(200, httpCode) << "httpCode: " << httpCode;

PulsarFriend::getProducerImpl(producer).disconnectProducer();
ASSERT_TRUE(waitUntil(std::chrono::seconds(3),
[&producer] { return PulsarFriend::isTerminated(producer); }));

ASSERT_EQ(ResultTopicTerminated,
producer.send(MessageBuilder().setContent("after-terminate").build()));

client.close();
}

class ProducerTest : public ::testing::TestWithParam<bool> {};

TEST_P(ProducerTest, testMaxMessageSize) {
Expand Down
5 changes: 5 additions & 0 deletions tests/PulsarFriend.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ class PulsarFriend {
return waitUntil(std::chrono::seconds(3),
[producerImpl] { return !producerImpl->getCnx().expired(); });
}

static bool isTerminated(Producer producer) {
auto producerImpl = std::dynamic_pointer_cast<ProducerImpl>(producer.impl_);
return producerImpl && producerImpl->state_ == HandlerBase::Terminated;
}
};
} // namespace pulsar

Expand Down
Loading