Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ class PULSAR_PUBLIC Message {
*/
bool hasOrderingKey() const;

/**
* Check if the message has a null value.
*
* Messages with null values are used as tombstones on compacted topics
* to delete the message for a specific key.
*
* @return true if the message has a null value (tombstone)
* false if the message has actual payload data
*/
bool hasNullValue() const;

/**
* Get the UTC based timestamp in milliseconds referring to when the message was published by the client
* producer
Expand Down
11 changes: 11 additions & 0 deletions include/pulsar/MessageBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ class PULSAR_PUBLIC MessageBuilder {
*/
MessageBuilder& disableReplication(bool flag);

/**
* Mark the message as having a null value.
*
* This is used for messages on compacted topics where a null value
* acts as a tombstone for a specific key, removing the message from
* the compacted view.
*
* @return the message builder instance
*/
MessageBuilder& setNullValue();

/**
* create a empty message, with no properties or data
*
Expand Down
19 changes: 19 additions & 0 deletions include/pulsar/c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ PULSAR_PUBLIC void pulsar_message_set_replication_clusters(pulsar_message_t *mes
*/
PULSAR_PUBLIC void pulsar_message_disable_replication(pulsar_message_t *message, int flag);

/**
* Mark the message as having a null value.
*
* This is used for messages on compacted topics where a null value
* acts as a tombstone for a specific key, removing the message from
* the compacted view.
*/
PULSAR_PUBLIC void pulsar_message_set_null_value(pulsar_message_t *message);

/// Accessor for built messages

/**
Expand Down Expand Up @@ -221,6 +230,16 @@ PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message,
*/
PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);

/**
* Check if the message has a null value.
*
* Messages with null values are used as tombstones on compacted topics
* to delete the message for a specific key.
*
* @return 1 if the message has a null value, 0 otherwise
*/
PULSAR_PUBLIC int pulsar_message_has_null_value(pulsar_message_t *message);

#ifdef __cplusplus
}
#endif
4 changes: 4 additions & 0 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,10 @@ static std::pair<std::unique_ptr<char[]>, size_t> serializeSingleMessageMetadata
metadata.set_sequence_id(msgMetadata.sequence_id());
}

if (msgMetadata.null_value()) {
metadata.set_null_value(true);
}

size_t size = metadata.ByteSizeLong();
std::unique_ptr<char[]> data{new char[size]};
metadata.SerializeToArray(data.get(), size);
Expand Down
13 changes: 13 additions & 0 deletions lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ Message::Message(const MessageId& messageID, proto::BrokerEntryMetadata& brokerE
} else {
impl_->metadata.clear_sequence_id();
}

if (singleMetadata.null_value()) {
impl_->metadata.set_null_value(true);
} else {
impl_->metadata.clear_null_value();
}
}

const MessageId& Message::getMessageId() const {
Expand Down Expand Up @@ -177,6 +183,13 @@ const std::string& Message::getOrderingKey() const {
return impl_->getOrderingKey();
}

bool Message::hasNullValue() const {
if (impl_) {
return impl_->metadata.null_value();
}
return false;
}

const std::string& Message::getTopicName() const {
if (!impl_) {
return emptyString;
Expand Down
13 changes: 13 additions & 0 deletions lib/MessageBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,29 +60,35 @@ void MessageBuilder::checkMetadata() {
MessageBuilder& MessageBuilder::setContent(const void* data, size_t size) {
checkMetadata();
impl_->payload = SharedBuffer::copy((char*)data, size);
impl_->metadata.clear_null_value();
return *this;
}

MessageBuilder& MessageBuilder::setAllocatedContent(void* data, size_t size) {
checkMetadata();
impl_->payload = SharedBuffer::wrap((char*)data, size);
impl_->metadata.clear_null_value();
return *this;
}

MessageBuilder& MessageBuilder::setContent(const std::string& data) {
checkMetadata();
impl_->payload = SharedBuffer::copy((char*)data.c_str(), data.length());
impl_->metadata.clear_null_value();
return *this;
}

MessageBuilder& MessageBuilder::setContent(std::string&& data) {
checkMetadata();
impl_->payload = SharedBuffer::take(std::move(data));
impl_->metadata.clear_null_value();
return *this;
}

MessageBuilder& MessageBuilder::setContent(const KeyValue& data) {
checkMetadata();
impl_->keyValuePtr = data.impl_;
impl_->metadata.clear_null_value();
return *this;
}

Expand Down Expand Up @@ -157,6 +163,13 @@ MessageBuilder& MessageBuilder::disableReplication(bool flag) {
return *this;
}

MessageBuilder& MessageBuilder::setNullValue() {
checkMetadata();
Comment thread
Bhargavkonidena marked this conversation as resolved.
impl_->metadata.set_null_value(true);
impl_->payload = SharedBuffer();
return *this;
}

const char* MessageBuilder::data() const {
assert(impl_->payload.data());
return impl_->payload.data();
Expand Down
4 changes: 4 additions & 0 deletions lib/c/c_Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ void pulsar_message_disable_replication(pulsar_message_t *message, int flag) {
message->builder.disableReplication(flag);
}

void pulsar_message_set_null_value(pulsar_message_t *message) { message->builder.setNullValue(); }

int pulsar_message_has_property(pulsar_message_t *message, const char *name) {
return message->message.hasProperty(name);
}
Expand Down Expand Up @@ -148,3 +150,5 @@ void pulsar_message_set_schema_version(pulsar_message_t *message, const char *sc
const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
return message->message.getProducerName().c_str();
}

int pulsar_message_has_null_value(pulsar_message_t *message) { return message->message.hasNullValue(); }
30 changes: 30 additions & 0 deletions tests/BatchMessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,36 @@ TEST(BatchMessageTest, testParseMessageBatchEntry) {
}
}

TEST(BatchMessageTest, testParseMessageBatchEntryWithNullValue) {
std::vector<Message> msgs;
msgs.emplace_back(MessageBuilder().setPartitionKey("key1").setNullValue().build());
msgs.emplace_back(MessageBuilder().setContent("content2").setPartitionKey("key2").build());
msgs.emplace_back(MessageBuilder().setPartitionKey("key3").setNullValue().build());

SharedBuffer payload;
Commands::serializeSingleMessagesToBatchPayload(payload, msgs);
ASSERT_EQ(payload.writableBytes(), 0);

MessageBatch messageBatch;
auto fakeId = MessageIdBuilder().ledgerId(6000L).entryId(20L).partition(0).build();
messageBatch.withMessageId(fakeId).parseFrom(payload, static_cast<uint32_t>(msgs.size()));
const std::vector<Message>& messages = messageBatch.messages();

ASSERT_EQ(messages.size(), 3);

ASSERT_TRUE(messages[0].hasNullValue());
ASSERT_EQ(messages[0].getPartitionKey(), "key1");
ASSERT_EQ(messages[0].getLength(), 0);

ASSERT_FALSE(messages[1].hasNullValue());
ASSERT_EQ(messages[1].getPartitionKey(), "key2");
ASSERT_EQ(messages[1].getDataAsString(), "content2");

ASSERT_TRUE(messages[2].hasNullValue());
ASSERT_EQ(messages[2].getPartitionKey(), "key3");
ASSERT_EQ(messages[2].getLength(), 0);
}

TEST(BatchMessageTest, testSendCallback) {
const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";

Expand Down
43 changes: 43 additions & 0 deletions tests/MessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,46 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) {
auto msg = MessageBuilder().setContent("test").build();
ASSERT_TRUE(msg.getTopicName().empty());
}

TEST(MessageTest, testNullValueMessage) {
{
auto msg = MessageBuilder().setContent("test").build();
ASSERT_FALSE(msg.hasNullValue());
}

{
auto msg = MessageBuilder().setNullValue().setPartitionKey("key1").build();
ASSERT_TRUE(msg.hasNullValue());
ASSERT_EQ(msg.getLength(), 0);
ASSERT_EQ(msg.getPartitionKey(), "key1");
}

{
auto msg = MessageBuilder().setPartitionKey("key2").setNullValue().build();
ASSERT_TRUE(msg.hasNullValue());
ASSERT_EQ(msg.getPartitionKey(), "key2");
}
}

TEST(MessageTest, testEmptyMessage) {
auto msg = MessageBuilder().build();
ASSERT_FALSE(msg.hasNullValue());
ASSERT_EQ(msg.getLength(), 0);
}

TEST(MessageTest, testEmptyStringNotNullValue) {
// Empty string message - has content set to ""
auto emptyStringMsg = MessageBuilder().setContent("").build();
ASSERT_FALSE(emptyStringMsg.hasNullValue());
ASSERT_EQ(emptyStringMsg.getLength(), 0);
ASSERT_EQ(emptyStringMsg.getDataAsString(), "");

// Null value message - explicitly marked as null
auto nullValueMsg = MessageBuilder().setNullValue().setPartitionKey("key").build();
ASSERT_TRUE(nullValueMsg.hasNullValue());
ASSERT_EQ(nullValueMsg.getLength(), 0);

// Both have length 0, but they are semantically different
// Empty string: the value IS an empty string
// Null value: the value does not exist (tombstone for compaction)
}
130 changes: 130 additions & 0 deletions tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1045,5 +1045,135 @@ TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) {
client.close();
}

TEST(ReaderTest, testReadCompactedWithNullValue) {
Client client(serviceUrl);

const std::string topicName =
"persistent://public/default/testReadCompactedWithNullValue-" + std::to_string(time(nullptr));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

// Send messages with keys
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build()));
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build()));
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build()));

// Send a tombstone (null value) for key2
auto tombstone = MessageBuilder().setPartitionKey("key2").setNullValue().build();
ASSERT_TRUE(tombstone.hasNullValue());
ASSERT_EQ(tombstone.getLength(), 0);
ASSERT_EQ(ResultOk, producer.send(tombstone));

// Update key1 with a new value
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-updated").build()));

// Trigger compaction via admin API
{
// Build compaction URL directly from topicName to avoid mismatches
// topicName is "persistent://public/default/..." -> need "persistent/public/default/..."
std::string topicPath = topicName;
std::size_t schemePos = topicPath.find("://");
if (schemePos != std::string::npos) {
topicPath.erase(schemePos, 3);
}
std::string compactUrl = adminUrl + "admin/v2/" + topicPath + "/compaction";
int res = makePutRequest(compactUrl, "");
ASSERT_TRUE(res == 204 || res == 409) << "Failed to trigger compaction, res: " << res;
}

// Create a reader with readCompacted enabled
ReaderConfiguration readerConf;
readerConf.setReadCompacted(true);
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

// Read all messages and verify we can detect null values
std::map<std::string, std::string> keyValues;
std::set<std::string> nullValueKeys;

bool hasMessageAvailable = false;
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
while (hasMessageAvailable) {
Message msg;
ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));

std::string key = msg.getPartitionKey();
if (msg.hasNullValue()) {
nullValueKeys.insert(key);
LOG_INFO("Received null value (tombstone) for key: " << key);
} else {
keyValues[key] = msg.getDataAsString();
LOG_INFO("Received message for key: " << key << ", value: " << msg.getDataAsString());
}

ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
}

// Verify we actually read messages (test should not silently succeed with no messages)
ASSERT_FALSE(keyValues.empty() && nullValueKeys.empty()) << "Expected to read at least one message";

// Verify concrete outcomes:
// - key1 should have the updated value "value1-updated"
// - key2 should either be a tombstone (null value) or absent after compaction
// - key3 should have value "value3"
ASSERT_TRUE(keyValues.count("key1") > 0) << "key1 should be present";
ASSERT_EQ(keyValues["key1"], "value1-updated") << "key1 should have the updated value";
ASSERT_TRUE(keyValues.count("key3") > 0) << "key3 should be present";
ASSERT_EQ(keyValues["key3"], "value3") << "key3 should have value3";
ASSERT_TRUE(nullValueKeys.count("key2") > 0 || keyValues.count("key2") == 0)
<< "key2 should either have a null value or be absent after compaction";

producer.close();
reader.close();
client.close();
}

TEST(ReaderTest, testNullValueMessageProperties) {
Client client(serviceUrl);

const std::string topicName =
"persistent://public/default/testNullValueMessageProperties-" + std::to_string(time(nullptr));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

// Send a null value message with properties
auto tombstone = MessageBuilder()
.setPartitionKey("user-123")
.setNullValue()
.setProperty("reason", "account-deleted")
.setProperty("deleted-by", "admin")
.build();

ASSERT_TRUE(tombstone.hasNullValue());
ASSERT_EQ(tombstone.getPartitionKey(), "user-123");
ASSERT_EQ(tombstone.getProperty("reason"), "account-deleted");
ASSERT_EQ(tombstone.getProperty("deleted-by"), "admin");
ASSERT_EQ(tombstone.getLength(), 0);

ASSERT_EQ(ResultOk, producer.send(tombstone));

// Create a reader and verify the message
ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

Message msg;
ASSERT_EQ(ResultOk, reader.readNext(msg, 5000));

// Verify all properties are preserved
ASSERT_TRUE(msg.hasNullValue());
ASSERT_EQ(msg.getPartitionKey(), "user-123");
ASSERT_EQ(msg.getProperty("reason"), "account-deleted");
ASSERT_EQ(msg.getProperty("deleted-by"), "admin");
ASSERT_EQ(msg.getLength(), 0);

producer.close();
reader.close();
client.close();
}

INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));
Loading
Loading