Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ class PULSAR_PUBLIC Message {
*/
bool hasOrderingKey() const;

/**
* Check if the message has a null value.
*
* Messages with null values are used as tombstones on compacted topics
* to delete the message for a specific key.
*
* @return true if the message has a null value (tombstone)
* false if the message has actual payload data
*/
bool hasNullValue() const;

/**
* Get the UTC based timestamp in milliseconds referring to when the message was published by the client
* producer
Expand Down
11 changes: 11 additions & 0 deletions include/pulsar/MessageBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ class PULSAR_PUBLIC MessageBuilder {
*/
MessageBuilder& disableReplication(bool flag);

/**
* Mark the message as having a null value.
*
* This is used for messages on compacted topics where a null value
* acts as a tombstone for a specific key, removing the message from
* the compacted view.
*
* @return the message builder instance
*/
MessageBuilder& setNullValue();

/**
* create a empty message, with no properties or data
*
Expand Down
19 changes: 19 additions & 0 deletions include/pulsar/c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ PULSAR_PUBLIC void pulsar_message_set_replication_clusters(pulsar_message_t *mes
*/
PULSAR_PUBLIC void pulsar_message_disable_replication(pulsar_message_t *message, int flag);

/**
* Mark the message as having a null value.
*
* This is used for messages on compacted topics where a null value
* acts as a tombstone for a specific key, removing the message from
* the compacted view.
*/
PULSAR_PUBLIC void pulsar_message_set_null_value(pulsar_message_t *message);

/// Accessor for built messages

/**
Expand Down Expand Up @@ -221,6 +230,16 @@ PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message,
*/
PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);

/**
* Check if the message has a null value.
*
* Messages with null values are used as tombstones on compacted topics
* to delete the message for a specific key.
*
* @return 1 if the message has a null value, 0 otherwise
*/
PULSAR_PUBLIC int pulsar_message_has_null_value(pulsar_message_t *message);

#ifdef __cplusplus
}
#endif
4 changes: 4 additions & 0 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,10 @@ static std::pair<std::unique_ptr<char[]>, size_t> serializeSingleMessageMetadata
metadata.set_sequence_id(msgMetadata.sequence_id());
}

if (msgMetadata.null_value()) {
metadata.set_null_value(true);
}

size_t size = metadata.ByteSizeLong();
std::unique_ptr<char[]> data{new char[size]};
metadata.SerializeToArray(data.get(), size);
Expand Down
13 changes: 13 additions & 0 deletions lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ Message::Message(const MessageId& messageID, proto::BrokerEntryMetadata& brokerE
} else {
impl_->metadata.clear_sequence_id();
}

if (singleMetadata.null_value()) {
impl_->metadata.set_null_value(true);
} else {
impl_->metadata.clear_null_value();
}
}

const MessageId& Message::getMessageId() const {
Expand Down Expand Up @@ -177,6 +183,13 @@ const std::string& Message::getOrderingKey() const {
return impl_->getOrderingKey();
}

bool Message::hasNullValue() const {
if (impl_) {
return impl_->metadata.null_value();
}
return false;
}

const std::string& Message::getTopicName() const {
if (!impl_) {
return emptyString;
Expand Down
6 changes: 6 additions & 0 deletions lib/MessageBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ MessageBuilder& MessageBuilder::disableReplication(bool flag) {
return *this;
}

MessageBuilder& MessageBuilder::setNullValue() {
checkMetadata();
Comment thread
Bhargavkonidena marked this conversation as resolved.
impl_->metadata.set_null_value(true);
return *this;
}

const char* MessageBuilder::data() const {
assert(impl_->payload.data());
return impl_->payload.data();
Expand Down
4 changes: 4 additions & 0 deletions lib/c/c_Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ void pulsar_message_disable_replication(pulsar_message_t *message, int flag) {
message->builder.disableReplication(flag);
}

void pulsar_message_set_null_value(pulsar_message_t *message) { message->builder.setNullValue(); }

int pulsar_message_has_property(pulsar_message_t *message, const char *name) {
return message->message.hasProperty(name);
}
Expand Down Expand Up @@ -148,3 +150,5 @@ void pulsar_message_set_schema_version(pulsar_message_t *message, const char *sc
const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
return message->message.getProducerName().c_str();
}

int pulsar_message_has_null_value(pulsar_message_t *message) { return message->message.hasNullValue(); }
30 changes: 30 additions & 0 deletions tests/BatchMessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,36 @@ TEST(BatchMessageTest, testParseMessageBatchEntry) {
}
}

TEST(BatchMessageTest, testParseMessageBatchEntryWithNullValue) {
std::vector<Message> msgs;
msgs.emplace_back(MessageBuilder().setPartitionKey("key1").setNullValue().build());
msgs.emplace_back(MessageBuilder().setContent("content2").setPartitionKey("key2").build());
msgs.emplace_back(MessageBuilder().setPartitionKey("key3").setNullValue().build());

SharedBuffer payload;
Commands::serializeSingleMessagesToBatchPayload(payload, msgs);
ASSERT_EQ(payload.writableBytes(), 0);

MessageBatch messageBatch;
auto fakeId = MessageIdBuilder().ledgerId(6000L).entryId(20L).partition(0).build();
messageBatch.withMessageId(fakeId).parseFrom(payload, static_cast<uint32_t>(msgs.size()));
const std::vector<Message>& messages = messageBatch.messages();

ASSERT_EQ(messages.size(), 3);

ASSERT_TRUE(messages[0].hasNullValue());
ASSERT_EQ(messages[0].getPartitionKey(), "key1");
ASSERT_EQ(messages[0].getLength(), 0);

ASSERT_FALSE(messages[1].hasNullValue());
ASSERT_EQ(messages[1].getPartitionKey(), "key2");
ASSERT_EQ(messages[1].getDataAsString(), "content2");

ASSERT_TRUE(messages[2].hasNullValue());
ASSERT_EQ(messages[2].getPartitionKey(), "key3");
ASSERT_EQ(messages[2].getLength(), 0);
}

TEST(BatchMessageTest, testSendCallback) {
const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";

Expand Down
43 changes: 43 additions & 0 deletions tests/MessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,46 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) {
auto msg = MessageBuilder().setContent("test").build();
ASSERT_TRUE(msg.getTopicName().empty());
}

TEST(MessageTest, testNullValueMessage) {
{
auto msg = MessageBuilder().setContent("test").build();
ASSERT_FALSE(msg.hasNullValue());
}

{
auto msg = MessageBuilder().setNullValue().setPartitionKey("key1").build();
ASSERT_TRUE(msg.hasNullValue());
ASSERT_EQ(msg.getLength(), 0);
ASSERT_EQ(msg.getPartitionKey(), "key1");
}

{
auto msg = MessageBuilder().setPartitionKey("key2").setNullValue().build();
ASSERT_TRUE(msg.hasNullValue());
ASSERT_EQ(msg.getPartitionKey(), "key2");
}
}

TEST(MessageTest, testEmptyMessage) {
auto msg = MessageBuilder().build();
ASSERT_FALSE(msg.hasNullValue());
ASSERT_EQ(msg.getLength(), 0);
}

TEST(MessageTest, testEmptyStringNotNullValue) {
// Empty string message - has content set to ""
auto emptyStringMsg = MessageBuilder().setContent("").build();
ASSERT_FALSE(emptyStringMsg.hasNullValue());
ASSERT_EQ(emptyStringMsg.getLength(), 0);
ASSERT_EQ(emptyStringMsg.getDataAsString(), "");

// Null value message - explicitly marked as null
auto nullValueMsg = MessageBuilder().setNullValue().setPartitionKey("key").build();
ASSERT_TRUE(nullValueMsg.hasNullValue());
ASSERT_EQ(nullValueMsg.getLength(), 0);

// Both have length 0, but they are semantically different
// Empty string: the value IS an empty string
// Null value: the value does not exist (tombstone for compaction)
}
125 changes: 125 additions & 0 deletions tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1045,5 +1045,130 @@ TEST(ReaderTest, testReaderWithZeroMessageListenerThreads) {
client.close();
}

TEST(ReaderTest, testReadCompactedWithNullValue) {
Client client(serviceUrl);

const std::string topicName =
"persistent://public/default/testReadCompactedWithNullValue-" + std::to_string(time(nullptr));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

// Send messages with keys
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1").build()));
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key2").setContent("value2").build()));
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key3").setContent("value3").build()));

// Send a tombstone (null value) for key2
auto tombstone = MessageBuilder().setPartitionKey("key2").setNullValue().build();
ASSERT_TRUE(tombstone.hasNullValue());
ASSERT_EQ(tombstone.getLength(), 0);
ASSERT_EQ(ResultOk, producer.send(tombstone));

// Update key1 with a new value
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-updated").build()));

// Trigger compaction via admin API
{
std::string compactUrl =
adminUrl + "admin/v2/persistent/public/default/testReadCompactedWithNullValue-" +
std::to_string(time(nullptr)) + "/compaction";
// Note: Compaction is async, we just trigger it
makePutRequest(compactUrl, "");
Comment thread
Bhargavkonidena marked this conversation as resolved.
Outdated
}

// Create a reader with readCompacted enabled
ReaderConfiguration readerConf;
readerConf.setReadCompacted(true);
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

// Read all messages and verify we can detect null values
std::map<std::string, std::string> keyValues;
std::set<std::string> nullValueKeys;

for (int i = 0; i < 10; i++) {
bool hasMessageAvailable = false;
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
if (!hasMessageAvailable) {
break;
}

Message msg;
Result res = reader.readNext(msg, 3000);
if (res != ResultOk) {
break;
}
Comment thread
Bhargavkonidena marked this conversation as resolved.
Outdated

std::string key = msg.getPartitionKey();
if (msg.hasNullValue()) {
nullValueKeys.insert(key);
LOG_INFO("Received null value (tombstone) for key: " << key);
} else {
keyValues[key] = msg.getDataAsString();
LOG_INFO("Received message for key: " << key << ", value: " << msg.getDataAsString());
}
}

// Verify we received the tombstone for key2
// Note: Without compaction completing, we see all messages including the tombstone
// After compaction, we would only see the latest value for each key
ASSERT_TRUE(nullValueKeys.count("key2") > 0 || keyValues.count("key2") == 0)
<< "key2 should either have a null value or be absent after compaction";

producer.close();
reader.close();
client.close();
}

TEST(ReaderTest, testNullValueMessageProperties) {
Client client(serviceUrl);

const std::string topicName =
"persistent://public/default/testNullValueMessageProperties-" + std::to_string(time(nullptr));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

// Send a null value message with properties
auto tombstone = MessageBuilder()
.setPartitionKey("user-123")
.setNullValue()
.setProperty("reason", "account-deleted")
.setProperty("deleted-by", "admin")
.build();

ASSERT_TRUE(tombstone.hasNullValue());
ASSERT_EQ(tombstone.getPartitionKey(), "user-123");
ASSERT_EQ(tombstone.getProperty("reason"), "account-deleted");
ASSERT_EQ(tombstone.getProperty("deleted-by"), "admin");
ASSERT_EQ(tombstone.getLength(), 0);

ASSERT_EQ(ResultOk, producer.send(tombstone));

// Create a reader and verify the message
ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

Message msg;
ASSERT_EQ(ResultOk, reader.readNext(msg, 5000));

// Verify all properties are preserved
ASSERT_TRUE(msg.hasNullValue());
ASSERT_EQ(msg.getPartitionKey(), "user-123");
ASSERT_EQ(msg.getProperty("reason"), "account-deleted");
ASSERT_EQ(msg.getProperty("deleted-by"), "admin");
ASSERT_EQ(msg.getLength(), 0);

producer.close();
reader.close();
client.close();
}

INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));
Loading
Loading