diff --git a/foreign/cpp/Cargo.toml b/foreign/cpp/Cargo.toml index 477cb0b74d..0b34273974 100644 --- a/foreign/cpp/Cargo.toml +++ b/foreign/cpp/Cargo.toml @@ -27,6 +27,7 @@ ignored = ["cxx-build"] crate-type = ["staticlib"] [dependencies] +bytes = "1.11.1" cxx = "1.0.194" iggy = { path = "../../core/sdk" } iggy_common = { path = "../../core/common" } diff --git a/foreign/cpp/build.rs b/foreign/cpp/build.rs index 476a9d796c..d2cbf0094c 100644 --- a/foreign/cpp/build.rs +++ b/foreign/cpp/build.rs @@ -24,6 +24,7 @@ fn main() { println!("cargo:rerun-if-changed=src/consumer_group.rs"); println!("cargo:rerun-if-changed=src/identifier.rs"); println!("cargo:rerun-if-changed=src/lib.rs"); + println!("cargo:rerun-if-changed=src/messages.rs"); println!("cargo:rerun-if-changed=src/stream.rs"); println!("cargo:rerun-if-changed=src/topic.rs"); } diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs index 341e0c1a05..def43b84a2 100644 --- a/foreign/cpp/src/client.rs +++ b/foreign/cpp/src/client.rs @@ -17,10 +17,11 @@ use crate::{RUNTIME, ffi}; use iggy::prelude::{ - Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm, - ConsumerGroupClient, Identifier as RustIdentifier, IggyClient as RustIggyClient, + Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm, Consumer, + ConsumerGroupClient, ConsumerKind, Identifier as RustIdentifier, IggyClient as RustIggyClient, IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as RustIggyExpiry, - MaxTopicSize as RustMaxTopicSize, PartitionClient, StreamClient, TopicClient, UserClient, + IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize, MessageClient, PartitionClient, + Partitioning, PollingStrategy, StreamClient, TopicClient, UserClient, }; use std::str::FromStr; use std::sync::Arc; @@ -73,6 +74,17 @@ impl Client { }) } + pub fn get_streams(&self) -> Result, String> { + RUNTIME.block_on(async { + let streams = self + .inner + .get_streams() + .await + .map_err(|error| format!("Could not get streams: {error}"))?; + Ok(streams.into_iter().map(ffi::Stream::from).collect()) + }) + } + pub fn create_stream(&self, stream_name: String) -> Result<(), String> { RUNTIME.block_on(async { self.inner @@ -127,6 +139,132 @@ impl Client { // }) // } + #[allow(clippy::too_many_arguments)] + pub fn send_messages( + &self, + stream_id: ffi::Identifier, + topic_id: ffi::Identifier, + partitioning_kind: String, + partitioning_value: Vec, + messages: Vec, + ) -> Result<(), String> { + let rust_stream_id = RustIdentifier::try_from(stream_id) + .map_err(|error| format!("Could not send messages: {error}"))?; + let rust_topic_id = RustIdentifier::try_from(topic_id) + .map_err(|error| format!("Could not send messages: {error}"))?; + + let partitioning = match partitioning_kind.as_str() { + "balanced" => Partitioning::balanced(), + "partition_id" => { + if partitioning_value.len() < 4 { + return Err( + "Could not send messages: partition_id requires 4 bytes".to_string() + ); + } + let id = u32::from_le_bytes(partitioning_value[..4].try_into().map_err(|_| { + "Could not send messages: invalid partition_id value".to_string() + })?); + Partitioning::partition_id(id) + } + "messages_key" => Partitioning::messages_key(&partitioning_value).map_err(|error| { + format!("Could not send messages: invalid messages key: {error}") + })?, + _ => { + return Err(format!( + "Could not send messages: invalid partitioning kind: {partitioning_kind}" + )); + } + }; + + let mut iggy_messages: Vec = messages + .into_iter() + .map(IggyMessage::try_from) + .collect::, _>>()?; + + RUNTIME.block_on(async { + self.inner + .send_messages( + &rust_stream_id, + &rust_topic_id, + &partitioning, + &mut iggy_messages, + ) + .await + .map_err(|error| format!("Could not send messages: {error}"))?; + Ok(()) + }) + } + + #[allow(clippy::too_many_arguments)] + pub fn poll_messages( + &self, + stream_id: ffi::Identifier, + topic_id: ffi::Identifier, + partition_id: u32, + consumer_kind: String, + consumer_id: ffi::Identifier, + polling_strategy_kind: String, + polling_strategy_value: u64, + count: u32, + auto_commit: bool, + ) -> Result { + let rust_stream_id = RustIdentifier::try_from(stream_id) + .map_err(|error| format!("Could not poll messages: {error}"))?; + let rust_topic_id = RustIdentifier::try_from(topic_id) + .map_err(|error| format!("Could not poll messages: {error}"))?; + let rust_consumer_id = RustIdentifier::try_from(consumer_id) + .map_err(|error| format!("Could not poll messages: {error}"))?; + + let consumer = Consumer { + kind: match consumer_kind.as_str() { + "consumer" => ConsumerKind::Consumer, + "consumer_group" => ConsumerKind::ConsumerGroup, + _ => { + return Err(format!( + "Could not poll messages: invalid consumer kind: {consumer_kind}" + )); + } + }, + id: rust_consumer_id, + }; + + let strategy = match polling_strategy_kind.as_str() { + "offset" => PollingStrategy::offset(polling_strategy_value), + "timestamp" => PollingStrategy::timestamp(IggyTimestamp::from(polling_strategy_value)), + "first" => PollingStrategy::first(), + "last" => PollingStrategy::last(), + "next" => PollingStrategy::next(), + _ => { + return Err(format!( + "Could not poll messages: invalid polling strategy: {polling_strategy_kind}" + )); + } + }; + + let opt_partition = if partition_id == u32::MAX { + None + } else { + Some(partition_id) + }; + + RUNTIME.block_on(async { + let polled = self + .inner + .poll_messages( + &rust_stream_id, + &rust_topic_id, + opt_partition, + &consumer, + &strategy, + count, + auto_commit, + ) + .await + .map_err(|error| format!("Could not poll messages: {error}"))?; + Ok(ffi::PolledMessages::from(polled)) + }) + } + #[allow(clippy::too_many_arguments)] pub fn create_topic( &self, @@ -378,6 +516,66 @@ impl Client { Ok(()) }) } + + pub fn join_consumer_group( + &self, + stream_id: ffi::Identifier, + topic_id: ffi::Identifier, + group_id: ffi::Identifier, + ) -> Result<(), String> { + let rust_stream_id = RustIdentifier::try_from(stream_id).map_err(|error| { + format!("Could not join consumer group: invalid stream identifier: {error}") + })?; + let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| { + format!("Could not join consumer group: invalid topic identifier: {error}") + })?; + let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error| { + format!("Could not join consumer group: invalid group identifier: {error}") + })?; + + RUNTIME.block_on(async { + self.inner + .join_consumer_group(&rust_stream_id, &rust_topic_id, &rust_group_id) + .await + .map_err(|error| { + format!( + "Could not join consumer group '{}' for topic '{}' on stream '{}': {error}", + rust_group_id, rust_topic_id, rust_stream_id + ) + })?; + Ok(()) + }) + } + + pub fn leave_consumer_group( + &self, + stream_id: ffi::Identifier, + topic_id: ffi::Identifier, + group_id: ffi::Identifier, + ) -> Result<(), String> { + let rust_stream_id = RustIdentifier::try_from(stream_id).map_err(|error| { + format!("Could not leave consumer group: invalid stream identifier: {error}") + })?; + let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| { + format!("Could not leave consumer group: invalid topic identifier: {error}") + })?; + let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error| { + format!("Could not leave consumer group: invalid group identifier: {error}") + })?; + + RUNTIME.block_on(async { + self.inner + .leave_consumer_group(&rust_stream_id, &rust_topic_id, &rust_group_id) + .await + .map_err(|error| { + format!( + "Could not leave consumer group '{}' for topic '{}' on stream '{}': {error}", + rust_group_id, rust_topic_id, rust_stream_id + ) + })?; + Ok(()) + }) + } } pub unsafe fn delete_connection(client: *mut Client) -> Result<(), String> { diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs index 4d47a5b946..d65e3339db 100644 --- a/foreign/cpp/src/lib.rs +++ b/foreign/cpp/src/lib.rs @@ -17,6 +17,7 @@ mod client; mod consumer_group; mod identifier; +mod messages; mod stream; mod topic; @@ -51,6 +52,36 @@ mod ffi { partitions_count: u32, } + struct Stream { + id: u32, + created_at: u64, + name: String, + size_bytes: u64, + messages_count: u64, + topics_count: u32, + } + + struct Message { + checksum: u64, + id_lo: u64, + id_hi: u64, + offset: u64, + timestamp: u64, + origin_timestamp: u64, + user_headers_length: u32, + payload_length: u32, + reserved: u64, + payload: Vec, + user_headers: Vec, + } + + struct PolledMessages { + partition_id: u32, + current_offset: u64, + count: u32, + messages: Vec, + } + struct StreamDetails { id: u32, created_at: u64, @@ -83,6 +114,7 @@ mod ffi { fn login_user(self: &Client, username: String, password: String) -> Result<()>; fn connect(self: &Client) -> Result<()>; fn create_stream(self: &Client, stream_name: String) -> Result<()>; + fn get_streams(self: &Client) -> Result>; fn get_stream(self: &Client, stream_id: Identifier) -> Result; fn delete_stream(self: &Client, stream_id: Identifier) -> Result<()>; // fn purge_stream(&self, stream_id: Identifier) -> Result<()>; @@ -129,6 +161,44 @@ mod ffi { topic_id: Identifier, group_id: Identifier, ) -> Result<()>; + fn join_consumer_group( + self: &Client, + stream_id: Identifier, + topic_id: Identifier, + group_id: Identifier, + ) -> Result<()>; + fn leave_consumer_group( + self: &Client, + stream_id: Identifier, + topic_id: Identifier, + group_id: Identifier, + ) -> Result<()>; + + #[allow(clippy::too_many_arguments)] + fn poll_messages( + self: &Client, + stream_id: Identifier, + topic_id: Identifier, + partition_id: u32, + consumer_kind: String, + consumer_id: Identifier, + polling_strategy_kind: String, + polling_strategy_value: u64, + count: u32, + auto_commit: bool, + ) -> Result; + + fn new_message(self: &mut Message, payload: Vec); + + #[allow(clippy::too_many_arguments)] + fn send_messages( + self: &Client, + stream_id: Identifier, + topic_id: Identifier, + partitioning_kind: String, + partitioning_value: Vec, + messages: Vec, + ) -> Result<()>; unsafe fn delete_connection(client: *mut Client) -> Result<()>; diff --git a/foreign/cpp/src/messages.rs b/foreign/cpp/src/messages.rs new file mode 100644 index 0000000000..6b513a6c15 --- /dev/null +++ b/foreign/cpp/src/messages.rs @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::ffi; +use bytes::Bytes; +use iggy::prelude::{IggyMessage as RustIggyMessage, PolledMessages as RustPolledMessages}; + +impl ffi::Message { + pub fn new_message(&mut self, payload: Vec) { + let payload_length = payload.len() as u32; + *self = Self { + checksum: 0, + id_lo: 0, + id_hi: 0, + offset: 0, + timestamp: 0, + origin_timestamp: 0, + user_headers_length: 0, + payload_length, + reserved: 0, + payload, + user_headers: Vec::new(), + }; + } +} + +impl From for ffi::Message { + fn from(m: RustIggyMessage) -> Self { + let id = m.header.id; + ffi::Message { + checksum: m.header.checksum, + id_lo: id as u64, + id_hi: (id >> 64) as u64, + offset: m.header.offset, + timestamp: m.header.timestamp, + origin_timestamp: m.header.origin_timestamp, + user_headers_length: m.header.user_headers_length, + payload_length: m.header.payload_length, + reserved: m.header.reserved, + payload: m.payload.to_vec(), + user_headers: m.user_headers.map(|h| h.to_vec()).unwrap_or_default(), + } + } +} + +impl TryFrom for RustIggyMessage { + type Error = String; + + fn try_from(m: ffi::Message) -> Result { + let id = ((m.id_hi as u128) << 64) | (m.id_lo as u128); + let payload = Bytes::from(m.payload); + let msg = if id > 0 { + RustIggyMessage::builder().id(id).payload(payload).build() + } else { + RustIggyMessage::builder().payload(payload).build() + }; + msg.map_err(|error| format!("Could not convert message: {error}")) + } +} + +impl From for ffi::PolledMessages { + fn from(p: RustPolledMessages) -> Self { + ffi::PolledMessages { + partition_id: p.partition_id, + current_offset: p.current_offset, + count: p.count, + messages: p.messages.into_iter().map(ffi::Message::from).collect(), + } + } +} diff --git a/foreign/cpp/src/stream.rs b/foreign/cpp/src/stream.rs index 5c1f6e9d7d..b6e0ae22bf 100644 --- a/foreign/cpp/src/stream.rs +++ b/foreign/cpp/src/stream.rs @@ -16,8 +16,22 @@ // under the License. use crate::ffi; +use iggy::prelude::Stream as RustStream; use iggy::prelude::StreamDetails as RustStreamDetails; +impl From for ffi::Stream { + fn from(s: RustStream) -> Self { + ffi::Stream { + id: s.id, + created_at: s.created_at.as_micros(), + name: s.name, + size_bytes: s.size.as_bytes_u64(), + messages_count: s.messages_count, + topics_count: s.topics_count, + } + } +} + impl From for ffi::StreamDetails { fn from(stream: RustStreamDetails) -> Self { ffi::StreamDetails { diff --git a/foreign/cpp/tests/client/low_level_e2e.cpp b/foreign/cpp/tests/client/low_level_e2e.cpp index 26b82b82f8..7306f0c953 100644 --- a/foreign/cpp/tests/client/low_level_e2e.cpp +++ b/foreign/cpp/tests/client/low_level_e2e.cpp @@ -16,6 +16,7 @@ // under the License. // TODO(slbotbm): create fixture for setup/teardown. +// TODO(slbotbm): Add tests for join_consumer_group() and leave_consumer_group() #include diff --git a/foreign/cpp/tests/common/test_helpers.hpp b/foreign/cpp/tests/common/test_helpers.hpp index 5457c09d7c..15851f08cc 100644 --- a/foreign/cpp/tests/common/test_helpers.hpp +++ b/foreign/cpp/tests/common/test_helpers.hpp @@ -42,3 +42,20 @@ inline iggy::ffi::Client *login_to_server() { client->login_user("iggy", "iggy"); return client; } + +inline rust::Vec to_payload(const std::string &s) { + rust::Vec v; + for (const char c : s) { + v.push_back(static_cast(c)); + } + return v; +} + +inline rust::Vec partition_id_bytes(std::uint32_t id) { + rust::Vec v; + v.push_back(static_cast(id & 0xFF)); + v.push_back(static_cast((id >> 8) & 0xFF)); + v.push_back(static_cast((id >> 16) & 0xFF)); + v.push_back(static_cast((id >> 24) & 0xFF)); + return v; +} diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp b/foreign/cpp/tests/message/low_level_e2e.cpp new file mode 100644 index 0000000000..c092929ef5 --- /dev/null +++ b/foreign/cpp/tests/message/low_level_e2e.cpp @@ -0,0 +1,1266 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +#include + +#include "lib.rs.h" +#include "tests/common/test_helpers.hpp" + +TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) { + RecordProperty("description", "Sends 10 messages and polls them back, verifying count, offsets, and payloads."); + const std::string stream_name = "cpp-msg-roundtrip"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 10; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("test message " + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), + "partition_id", partition_id_bytes(0), std::move(messages))); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 10u); + ASSERT_EQ(polled.messages.size(), 10u); + for (std::uint32_t i = 0; i < 10; i++) { + ASSERT_EQ(polled.messages[i].offset, static_cast(i)); + std::string expected = "test message " + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + ASSERT_EQ(actual, expected) << "Payload mismatch at offset " << i; + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) { + RecordProperty("description", "Verifies that polled message IDs match the sent IDs."); + const std::string stream_name = "cpp-msg-verify-ids"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("id-test-message")); + msg.id_lo = 42; + msg.id_hi = 0; + messages.push_back(std::move(msg)); + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.messages.size(), 1u); + ASSERT_EQ(polled.messages[0].id_lo, 42u); + ASSERT_EQ(polled.messages[0].id_hi, 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesFromEmptyPartition) { + RecordProperty("description", "Verifies polling from an empty partition returns zero messages."); + const std::string stream_name = "cpp-msg-empty-poll"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 0u); + ASSERT_EQ(polled.messages.size(), 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) { + RecordProperty("description", "Verifies send_messages throws when not authenticated."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + ASSERT_NO_THROW(client->connect()); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("should-fail")); + messages.push_back(std::move(msg)); + + ASSERT_THROW(client->send_messages(make_numeric_identifier(1), make_numeric_identifier(1), "partition_id", + partition_id_bytes(0), std::move(messages)), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesWithInvalidStreamId) { + RecordProperty("description", "Throws when sending messages with an invalid stream identifier."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("test")); + messages.push_back(std::move(msg)); + + iggy::ffi::Identifier invalid_id; + invalid_id.kind = "invalid"; + invalid_id.length = 0; + + ASSERT_THROW(client->send_messages(invalid_id, make_numeric_identifier(1), "partition_id", partition_id_bytes(0), + std::move(messages)), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesToNonExistentStream) { + RecordProperty("description", "Throws when sending messages to a non-existent stream."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("test")); + messages.push_back(std::move(msg)); + + ASSERT_THROW(client->send_messages(make_string_identifier("nonexistent-stream-12345"), make_numeric_identifier(0), + "partition_id", partition_id_bytes(0), std::move(messages)), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningKind) { + RecordProperty("description", "Throws when sending messages with an invalid partitioning kind."); + const std::string stream_name = "cpp-msg-invalid-part-kind"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("test")); + messages.push_back(std::move(msg)); + + ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "invalid_kind", + partition_id_bytes(0), std::move(messages)), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningValue) { + RecordProperty("description", "Throws when sending messages with insufficient partitioning value bytes."); + const std::string stream_name = "cpp-msg-invalid-part-val"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("test")); + messages.push_back(std::move(msg)); + + rust::Vec short_bytes; + short_bytes.push_back(0x00); + short_bytes.push_back(0x01); + + ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + std::move(short_bytes), std::move(messages)), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesToSpecificPartitionVerified) { + RecordProperty("description", + "Verifies messages sent to a specific partition are only retrievable from that partition."); + const std::string stream_name = "cpp-msg-specific-part"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 3, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("partition-test-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled_part0 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(1), "offset", 0, 100, false); + ASSERT_EQ(polled_part0.count, 5u); + + auto polled_part1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 1, + "consumer", make_numeric_identifier(1), "offset", 0, 100, false); + ASSERT_EQ(polled_part1.count, 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendEmptyMessageVectorThrows) { + RecordProperty("description", "Throws when sending an empty message vector."); + const std::string stream_name = "cpp-msg-empty-vec"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec empty_messages; + + ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(empty_messages)), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessageWithEmptyPayloadThrows) { + RecordProperty("description", "Throws when sending a message with an empty payload."); + const std::string stream_name = "cpp-msg-empty-payload"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + iggy::ffi::Message msg; + rust::Vec empty_payload; + msg.new_message(std::move(empty_payload)); + messages.push_back(std::move(msg)); + + ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessageWithOversizedPayloadThrows) { + RecordProperty("description", "Throws when sending a message exceeding maximum payload size."); + const std::string stream_name = "cpp-msg-oversized"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec oversized_payload; + for (std::uint32_t i = 0; i < 64000001u; i++) { + oversized_payload.push_back(0x41); + } + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(std::move(oversized_payload)); + messages.push_back(std::move(msg)); + + ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesPreservesOrder) { + RecordProperty("description", "Verifies messages are stored and retrieved in the order they were sent."); + const std::string stream_name = "cpp-msg-order"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 50; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("order-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 50u); + for (std::uint32_t i = 0; i < 50; i++) { + ASSERT_EQ(polled.messages[i].offset, static_cast(i)); + std::string expected = "order-" + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i; + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesWithDuplicateIds) { + RecordProperty("description", "Verifies sending multiple messages with the same ID succeeds."); + const std::string stream_name = "cpp-msg-dup-ids"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 3; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("dup-id-msg-" + std::to_string(i))); + msg.id_lo = 99; + msg.id_hi = 0; + messages.push_back(std::move(msg)); + } + + ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), + "partition_id", partition_id_bytes(0), std::move(messages))); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 3u); + for (std::size_t i = 0; i < polled.messages.size(); i++) { + EXPECT_EQ(polled.messages[i].id_lo, 99u); + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesWithVariousPayloads) { + RecordProperty("description", + "Verifies various payload types including null bytes, UTF-8, and binary data are preserved."); + const std::string stream_name = "cpp-msg-various-payloads"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec payload_null; + payload_null.push_back(0x00); + payload_null.push_back(0x01); + payload_null.push_back(0x00); + payload_null.push_back(0xFF); + + rust::Vec payload_binary; + payload_binary.push_back(0xDE); + payload_binary.push_back(0xAD); + payload_binary.push_back(0xBE); + payload_binary.push_back(0xEF); + + rust::Vec messages; + + iggy::ffi::Message msg0; + msg0.new_message(to_payload("simple ascii")); + messages.push_back(std::move(msg0)); + + iggy::ffi::Message msg1; + msg1.new_message(std::move(payload_null)); + messages.push_back(std::move(msg1)); + + iggy::ffi::Message msg2; + msg2.new_message(to_payload("héllo wörld")); + messages.push_back(std::move(msg2)); + + iggy::ffi::Message msg3; + msg3.new_message(std::move(payload_binary)); + messages.push_back(std::move(msg3)); + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 4u); + + std::string ascii_actual(polled.messages[0].payload.begin(), polled.messages[0].payload.end()); + EXPECT_EQ(ascii_actual, "simple ascii"); + + ASSERT_EQ(polled.messages[1].payload.size(), 4u); + EXPECT_EQ(polled.messages[1].payload[0], 0x00); + EXPECT_EQ(polled.messages[1].payload[1], 0x01); + EXPECT_EQ(polled.messages[1].payload[2], 0x00); + EXPECT_EQ(polled.messages[1].payload[3], 0xFF); + + std::string utf8_actual(polled.messages[2].payload.begin(), polled.messages[2].payload.end()); + EXPECT_EQ(utf8_actual, "héllo wörld"); + + ASSERT_EQ(polled.messages[3].payload.size(), 4u); + EXPECT_EQ(polled.messages[3].payload[0], 0xDE); + EXPECT_EQ(polled.messages[3].payload[1], 0xAD); + EXPECT_EQ(polled.messages[3].payload[2], 0xBE); + EXPECT_EQ(polled.messages[3].payload[3], 0xEF); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesBeforeLoginThrows) { + RecordProperty("description", "Throws when polling messages before authentication."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + ASSERT_NO_THROW(client->connect()); + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(1), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithInvalidStreamIdThrows) { + RecordProperty("description", "Throws when polling messages with an invalid stream identifier."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + iggy::ffi::Identifier invalid_id; + invalid_id.kind = "invalid"; + invalid_id.length = 0; + + ASSERT_THROW(client->poll_messages(invalid_id, make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesFromNonExistentStreamThrows) { + RecordProperty("description", "Throws when polling messages from a non-existent stream."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + ASSERT_THROW(client->poll_messages(make_string_identifier("nonexistent-stream-poll"), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithInvalidConsumerKindThrows) { + RecordProperty("description", "Throws when polling messages with an invalid consumer kind."); + const std::string stream_name = "cpp-msg-invalid-consumer"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "invalid", + make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithInvalidStrategyKindThrows) { + RecordProperty("description", "Throws when polling messages with an invalid polling strategy kind."); + const std::string stream_name = "cpp-msg-invalid-strategy"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "invalid", 0, 10, false), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesCountLessThanAvailable) { + RecordProperty("description", "Returns only the requested count when fewer messages are requested than available."); + const std::string stream_name = "cpp-msg-count-less"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 10; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 5, false); + + ASSERT_EQ(polled.count, 5u); + ASSERT_EQ(polled.messages.size(), 5u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithLargeOffset) { + RecordProperty("description", "Returns zero messages when polling with an offset beyond available messages."); + const std::string stream_name = "cpp-msg-large-offset"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 999999, 100, false); + + ASSERT_EQ(polled.count, 0u); + ASSERT_EQ(polled.messages.size(), 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesFirstStrategy) { + RecordProperty("description", "Verifies first polling strategy returns messages from the beginning."); + const std::string stream_name = "cpp-msg-first-strategy"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 10; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "first", 0, 3, false); + + ASSERT_EQ(polled.count, 3u); + ASSERT_EQ(polled.messages.size(), 3u); + EXPECT_EQ(polled.messages[0].offset, 0u); + for (std::uint32_t i = 0; i < 3; i++) { + EXPECT_EQ(polled.messages[i].offset, static_cast(i)); + std::string expected = "msg-" + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i; + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesLastStrategy) { + RecordProperty("description", "Verifies last polling strategy returns messages from the end."); + const std::string stream_name = "cpp-msg-last-strategy"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 10; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "last", 0, 3, false); + + ASSERT_EQ(polled.count, 3u); + ASSERT_EQ(polled.messages.size(), 3u); + EXPECT_EQ(polled.messages[0].offset, 7u); + EXPECT_EQ(polled.messages[2].offset, 9u); + for (std::uint32_t i = 0; i < 3; i++) { + std::string expected = "msg-" + std::to_string(7 + i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "Payload mismatch at index " << i; + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesNextStrategyNoAutoCommit) { + RecordProperty("description", + "Verifies next strategy without auto-commit returns the same messages on repeated calls."); + const std::string stream_name = "cpp-msg-next-no-commit"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "next", 0, 100, false); + ASSERT_EQ(polled1.count, 5u); + + auto polled2 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "next", 0, 100, false); + ASSERT_EQ(polled2.count, 5u); + for (std::uint32_t i = 0; i < 5; i++) { + EXPECT_EQ(polled1.messages[i].offset, static_cast(i)); + std::string expected = "msg-" + std::to_string(i); + std::string actual(polled1.messages[i].payload.begin(), polled1.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "polled1 payload mismatch at index " << i; + } + for (std::uint32_t i = 0; i < 5; i++) { + EXPECT_EQ(polled2.messages[i].offset, static_cast(i)); + std::string expected = "msg-" + std::to_string(i); + std::string actual(polled2.messages[i].payload.begin(), polled2.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "polled2 payload mismatch at index " << i; + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesNextStrategyAutoCommit) { + RecordProperty("description", "Verifies next strategy with auto-commit advances the offset on subsequent polls."); + const std::string stream_name = "cpp-msg-next-auto-commit"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 10; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "next", 0, 5, true); + ASSERT_EQ(polled1.count, 5u); + EXPECT_EQ(polled1.messages[0].offset, 0u); + EXPECT_EQ(polled1.messages[4].offset, 4u); + + auto polled2 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "next", 0, 5, true); + ASSERT_EQ(polled2.count, 5u); + EXPECT_EQ(polled2.messages[0].offset, 5u); + EXPECT_EQ(polled2.messages[4].offset, 9u); + for (std::uint32_t i = 0; i < 5; i++) { + std::string expected1 = "msg-" + std::to_string(i); + std::string actual1(polled1.messages[i].payload.begin(), polled1.messages[i].payload.end()); + EXPECT_EQ(actual1, expected1) << "polled1 payload mismatch at index " << i; + } + for (std::uint32_t i = 0; i < 5; i++) { + std::string expected2 = "msg-" + std::to_string(5 + i); + std::string actual2(polled2.messages[i].payload.begin(), polled2.messages[i].payload.end()); + EXPECT_EQ(actual2, expected2) << "polled2 payload mismatch at index " << i; + } + + auto polled3 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "next", 0, 5, true); + ASSERT_EQ(polled3.count, 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesConsumerIdIndependence) { + RecordProperty("description", "Verifies different consumer IDs maintain independent offsets."); + const std::string stream_name = "cpp-msg-consumer-indep"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled_c1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(1), "next", 0, 3, true); + ASSERT_EQ(polled_c1.count, 3u); + + auto polled_c2 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(2), "next", 0, 5, true); + ASSERT_EQ(polled_c2.count, 5u); + + auto polled_c1_again = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(1), "next", 0, 5, true); + ASSERT_EQ(polled_c1_again.count, 2u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesMultipleSendsThenPollOrder) { + RecordProperty("description", "Verifies message ordering is preserved across multiple send batches."); + const std::string stream_name = "cpp-msg-multi-batch-order"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec batch1; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("batch1-" + std::to_string(i))); + batch1.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(batch1)); + + rust::Vec batch2; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("batch2-" + std::to_string(i))); + batch2.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(batch2)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 10u); + for (std::uint32_t i = 0; i < 10; i++) { + EXPECT_EQ(polled.messages[i].offset, static_cast(i)) << "Offset mismatch at index " << i; + } + for (std::uint32_t i = 0; i < 5; i++) { + std::string expected = "batch1-" + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "batch1 payload mismatch at index " << i; + } + for (std::uint32_t i = 0; i < 5; i++) { + std::string expected = "batch2-" + std::to_string(i); + std::string actual(polled.messages[5 + i].payload.begin(), polled.messages[5 + i].payload.end()); + EXPECT_EQ(actual, expected) << "batch2 payload mismatch at index " << i; + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesMultipleCustomIds) { + RecordProperty("description", "Verifies multiple messages with distinct custom IDs are all preserved."); + const std::string stream_name = "cpp-msg-multi-custom-ids"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + const std::uint64_t id_values[] = {100, 200, 300, 400, 500}; + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + msg.id_lo = id_values[i]; + msg.id_hi = 0; + messages.push_back(std::move(msg)); + } + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 5u); + for (std::uint32_t i = 0; i < 5; i++) { + EXPECT_EQ(polled.messages[i].id_lo, id_values[i]) << "ID mismatch at index " << i; + EXPECT_EQ(polled.messages[i].id_hi, 0u); + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesAfterStreamDeletedThrows) { + RecordProperty("description", "Throws when polling messages after the stream has been deleted."); + const std::string stream_name = "cpp-msg-deleted-stream"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("test")); + messages.push_back(std::move(msg)); + + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + std::uint32_t saved_stream_id = stream.id; + client->delete_stream(make_numeric_identifier(saved_stream_id)); + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(saved_stream_id), make_numeric_identifier(0), 0, + "consumer", make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithInvalidPartitionIdThrows) { + RecordProperty("description", "Throws when polling with a non-existent partition ID."); + const std::string stream_name = "cpp-msg-invalid-partition"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 9999, "consumer", + make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithCountZeroThrows) { + RecordProperty("description", "Throws when polling with count=0."); + const std::string stream_name = "cpp-msg-count-zero"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 0, false), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithoutSpecifyingPartition) { + RecordProperty("description", + "Verifies polling with partition_id=u32::MAX defaults to partition 0 and returns messages."); + const std::string stream_name = "cpp-msg-no-partition"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), UINT32_MAX, + "consumer", make_numeric_identifier(1), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 5u); + ASSERT_EQ(polled.messages.size(), 5u); + for (std::uint32_t i = 0; i < 5; i++) { + std::string expected = "msg-" + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "Payload mismatch at index " << i; + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesTimestampStrategy) { + RecordProperty("description", + "Verifies timestamp polling strategy returns messages with timestamp >= the specified value."); + const std::string stream_name = "cpp-msg-timestamp-strategy"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec batch1; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("batch1-" + std::to_string(i))); + batch1.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(batch1)); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + rust::Vec batch2; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("batch2-" + std::to_string(i))); + batch2.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(batch2)); + + auto all = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 100, false); + ASSERT_EQ(all.count, 10u); + + std::uint64_t batch2_timestamp = all.messages[5].timestamp; + ASSERT_GT(batch2_timestamp, all.messages[0].timestamp); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(2), "timestamp", batch2_timestamp, 100, false); + + ASSERT_GE(polled.count, 5u); + for (std::size_t i = 0; i < polled.messages.size(); i++) { + EXPECT_GE(polled.messages[i].timestamp, batch2_timestamp) + << "Message at index " << i << " has earlier timestamp"; + } + for (std::size_t i = 0; i < polled.messages.size(); i++) { + std::string expected = "batch2-" + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "Payload mismatch at index " << i; + } + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesMonotonicOffsets) { + RecordProperty("description", + "Verifies offsets are monotonically increasing and continuous across multiple polls."); + const std::string stream_name = "cpp-msg-monotonic-offsets"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 20; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("mono-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + std::uint64_t expected_offset = 0; + for (int chunk = 0; chunk < 4; chunk++) { + auto polled = + client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", expected_offset, 5, false); + + ASSERT_EQ(polled.count, 5u) << "Chunk " << chunk; + ASSERT_EQ(polled.messages.size(), 5u) << "Chunk " << chunk; + + for (std::size_t i = 0; i < polled.messages.size(); i++) { + EXPECT_EQ(polled.messages[i].offset, expected_offset) << "Chunk " << chunk << " index " << i; + expected_offset++; + } + } + + ASSERT_EQ(expected_offset, 20u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesLargeBatch) { + RecordProperty("description", "Verifies sending a large batch of 1000 messages succeeds and all are retrievable."); + const std::string stream_name = "cpp-msg-large-batch"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 1000; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("batch-msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + + ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), + "partition_id", partition_id_bytes(0), std::move(messages))); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + make_numeric_identifier(1), "offset", 0, 1000, false); + + ASSERT_EQ(polled.count, 1000u); + ASSERT_EQ(polled.messages.size(), 1000u); + EXPECT_EQ(polled.messages[0].offset, 0u); + EXPECT_EQ(polled.messages[999].offset, 999u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, SendMessagesWithInvalidTopicIdThrows) { + RecordProperty("description", "Throws when sending messages with an invalid topic identifier."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + rust::Vec messages; + iggy::ffi::Message msg; + msg.new_message(to_payload("test")); + messages.push_back(std::move(msg)); + + iggy::ffi::Identifier invalid_id; + invalid_id.kind = "invalid"; + invalid_id.length = 0; + + ASSERT_THROW(client->send_messages(make_numeric_identifier(1), invalid_id, "partition_id", partition_id_bytes(0), + std::move(messages)), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithInvalidTopicIdThrows) { + RecordProperty("description", "Throws when polling messages with an invalid topic identifier."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + iggy::ffi::Identifier invalid_id; + invalid_id.kind = "invalid"; + invalid_id.length = 0; + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(1), invalid_id, 0, "consumer", + make_numeric_identifier(1), "offset", 0, 10, false), + std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, PollMessagesWithInvalidConsumerIdThrows) { + RecordProperty("description", "Throws when polling messages with an invalid consumer identifier."); + const std::string stream_name = "cpp-msg-invalid-consumer-id"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + iggy::ffi::Identifier invalid_id; + invalid_id.kind = "invalid"; + invalid_id.length = 0; + + ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer", + invalid_id, "offset", 0, 10, false), + std::exception); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Message, ConsumerGroupCreateJoinAndPollMessages) { + RecordProperty("description", + "Creates a consumer group, joins it, sends messages, and polls them using consumer_group kind."); + const std::string stream_name = "cpp-msg-consumer-group"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + auto group = + client->create_consumer_group(make_numeric_identifier(stream.id), make_numeric_identifier(0), "test-group"); + ASSERT_EQ(group.members_count, 0u); + + ASSERT_NO_THROW(client->join_consumer_group(make_numeric_identifier(stream.id), make_numeric_identifier(0), + make_numeric_identifier(group.id))); + + auto group_after_join = client->get_consumer_group(make_numeric_identifier(stream.id), make_numeric_identifier(0), + make_numeric_identifier(group.id)); + ASSERT_EQ(group_after_join.members_count, 1u); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 10; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("cg-msg-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, + "consumer_group", make_numeric_identifier(group.id), "offset", 0, 100, false); + + ASSERT_EQ(polled.count, 10u); + ASSERT_EQ(polled.messages.size(), 10u); + for (std::uint32_t i = 0; i < 10; i++) { + std::string expected = "cg-msg-" + std::to_string(i); + std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end()); + EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i; + } + + ASSERT_NO_THROW(client->leave_consumer_group(make_numeric_identifier(stream.id), make_numeric_identifier(0), + make_numeric_identifier(group.id))); + + auto group_after_leave = client->get_consumer_group(make_numeric_identifier(stream.id), make_numeric_identifier(0), + make_numeric_identifier(group.id)); + ASSERT_EQ(group_after_leave.members_count, 0u); + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} diff --git a/foreign/cpp/tests/message/unit_tests.cpp b/foreign/cpp/tests/message/unit_tests.cpp new file mode 100644 index 0000000000..7e6917b065 --- /dev/null +++ b/foreign/cpp/tests/message/unit_tests.cpp @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include + +#include "lib.rs.h" + +TEST(MessageTest, NewMessageSetsPayloadAndLength) { + RecordProperty("description", "Verifies new_message sets payload and payload_length correctly."); + iggy::ffi::Message msg; + rust::Vec payload; + const std::string text = "hello world"; + for (const char c : text) { + payload.push_back(static_cast(c)); + } + + msg.new_message(std::move(payload)); + + ASSERT_EQ(msg.payload_length, static_cast(text.size())); + ASSERT_EQ(msg.payload.size(), text.size()); + for (std::size_t i = 0; i < text.size(); i++) { + EXPECT_EQ(msg.payload[i], static_cast(text[i])); + } +} + +TEST(MessageTest, NewMessageZerosHeaderFields) { + RecordProperty("description", "Verifies new_message initializes all header fields to zero."); + iggy::ffi::Message msg; + msg.checksum = 999; + msg.id_lo = 999; + msg.id_hi = 999; + msg.offset = 999; + msg.timestamp = 999; + msg.origin_timestamp = 999; + msg.reserved = 999; + + rust::Vec payload; + payload.push_back(0x42); + msg.new_message(std::move(payload)); + + EXPECT_EQ(msg.checksum, 0u); + EXPECT_EQ(msg.id_lo, 0u); + EXPECT_EQ(msg.id_hi, 0u); + EXPECT_EQ(msg.offset, 0u); + EXPECT_EQ(msg.timestamp, 0u); + EXPECT_EQ(msg.origin_timestamp, 0u); + EXPECT_EQ(msg.user_headers_length, 0u); + EXPECT_EQ(msg.reserved, 0u); + EXPECT_TRUE(msg.user_headers.empty()); +} + +TEST(MessageTest, NewMessageWithEmptyPayload) { + RecordProperty("description", "Verifies new_message accepts an empty payload."); + iggy::ffi::Message msg; + rust::Vec empty_payload; + + msg.new_message(std::move(empty_payload)); + + ASSERT_EQ(msg.payload_length, 0u); + ASSERT_EQ(msg.payload.size(), 0u); +} + +TEST(MessageTest, NewMessageWithSingleByte) { + RecordProperty("description", "Verifies new_message works with a single-byte payload."); + iggy::ffi::Message msg; + rust::Vec payload; + payload.push_back(0xFF); + + msg.new_message(std::move(payload)); + + ASSERT_EQ(msg.payload_length, 1u); + ASSERT_EQ(msg.payload.size(), 1u); + EXPECT_EQ(msg.payload[0], 0xFF); +} + +TEST(MessageTest, NewMessageWithNullBytes) { + RecordProperty("description", "Verifies new_message preserves null bytes in payload."); + iggy::ffi::Message msg; + rust::Vec payload; + payload.push_back(0x00); + payload.push_back(0x01); + payload.push_back(0x00); + + msg.new_message(std::move(payload)); + + ASSERT_EQ(msg.payload_length, 3u); + ASSERT_EQ(msg.payload.size(), 3u); + EXPECT_EQ(msg.payload[0], 0x00); + EXPECT_EQ(msg.payload[1], 0x01); + EXPECT_EQ(msg.payload[2], 0x00); +} + +TEST(MessageTest, NewMessageOverwritesPreviousState) { + RecordProperty("description", "Verifies calling new_message twice overwrites previous payload and fields."); + iggy::ffi::Message msg; + + rust::Vec payload1; + payload1.push_back(0xAA); + payload1.push_back(0xBB); + msg.new_message(std::move(payload1)); + msg.id_lo = 42; + + rust::Vec payload2; + payload2.push_back(0xCC); + msg.new_message(std::move(payload2)); + + ASSERT_EQ(msg.payload_length, 1u); + ASSERT_EQ(msg.payload.size(), 1u); + EXPECT_EQ(msg.payload[0], 0xCC); + EXPECT_EQ(msg.id_lo, 0u); +} + +TEST(MessageTest, NewMessageThenSetCustomId) { + RecordProperty("description", "Verifies custom ID can be set after new_message without affecting payload."); + iggy::ffi::Message msg; + rust::Vec payload; + payload.push_back(0x42); + msg.new_message(std::move(payload)); + + msg.id_lo = 100; + msg.id_hi = 200; + + EXPECT_EQ(msg.id_lo, 100u); + EXPECT_EQ(msg.id_hi, 200u); + ASSERT_EQ(msg.payload_length, 1u); + EXPECT_EQ(msg.payload[0], 0x42); +} + +TEST(MessageTest, NewMessageWithLargePayload) { + RecordProperty("description", "Verifies new_message handles a larger payload correctly."); + iggy::ffi::Message msg; + rust::Vec payload; + for (std::uint32_t i = 0; i < 10000; i++) { + payload.push_back(static_cast(i % 256)); + } + + msg.new_message(std::move(payload)); + + ASSERT_EQ(msg.payload_length, 10000u); + ASSERT_EQ(msg.payload.size(), 10000u); + EXPECT_EQ(msg.payload[0], 0u); + EXPECT_EQ(msg.payload[255], 255u); + EXPECT_EQ(msg.payload[256], 0u); +} diff --git a/foreign/cpp/tests/stream/low_level_e2e.cpp b/foreign/cpp/tests/stream/low_level_e2e.cpp index 43d0496dee..30d8a55a1c 100644 --- a/foreign/cpp/tests/stream/low_level_e2e.cpp +++ b/foreign/cpp/tests/stream/low_level_e2e.cpp @@ -19,6 +19,7 @@ #include #include +#include #include @@ -293,3 +294,173 @@ TEST(LowLevelE2E_Stream, GetStreamByNumericIdentifierReturnsStreamDetails) { ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); client = nullptr; } + +TEST(LowLevelE2E_Stream, GetStreamsReturnsEmptyAfterCleanup) { + RecordProperty("description", "Verifies get_streams returns empty vector after cleaning up all streams."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + auto streams = client->get_streams(); + for (const auto &s : streams) { + client->delete_stream(make_numeric_identifier(s.id)); + } + + streams = client->get_streams(); + ASSERT_EQ(streams.size(), 0); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Stream, GetStreamsReturnsStreamAfterCreation) { + RecordProperty("description", "Verifies created stream appears in get_streams result."); + const std::string stream_name = "cpp-stream-get-streams"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto streams = client->get_streams(); + ASSERT_GE(streams.size(), 1); + + bool found = false; + for (const auto &s : streams) { + if (std::string(s.name) == stream_name) { + found = true; + EXPECT_GT(s.created_at, static_cast(0)); + EXPECT_EQ(s.size_bytes, static_cast(0)); + EXPECT_EQ(s.messages_count, static_cast(0)); + EXPECT_EQ(s.topics_count, 0u); + break; + } + } + ASSERT_TRUE(found) << "Stream '" << stream_name << "' not found in get_streams result"; + + client->delete_stream(make_string_identifier(stream_name)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Stream, GetStreamsFieldsVerification) { + RecordProperty("description", + "Verifies get_streams returns correct field values after creating stream with topic and messages."); + const std::string stream_name = "cpp-stream-fields-verify"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + auto stream = client->get_stream(make_string_identifier(stream_name)); + client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0, + "server_default"); + + rust::Vec messages; + for (std::uint32_t i = 0; i < 5; i++) { + iggy::ffi::Message msg; + msg.new_message(to_payload("field-verify-message-" + std::to_string(i))); + messages.push_back(std::move(msg)); + } + client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id", + partition_id_bytes(0), std::move(messages)); + + auto streams = client->get_streams(); + ASSERT_GE(streams.size(), 1u); + + bool found = false; + for (const auto &s : streams) { + if (std::string(s.name) == stream_name) { + found = true; + EXPECT_EQ(s.topics_count, 1u); + EXPECT_EQ(s.messages_count, 5u); + break; + } + } + ASSERT_TRUE(found) << "Stream '" << stream_name << "' not found in get_streams result"; + + client->delete_stream(make_numeric_identifier(stream.id)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); +} + +TEST(LowLevelE2E_Stream, GetStreamsBeforeLoginThrows) { + RecordProperty("description", "Throws when get_streams is called before authentication."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + + ASSERT_THROW(client->get_streams(), std::exception); + ASSERT_NO_THROW(client->connect()); + ASSERT_THROW(client->get_streams(), std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Stream, GetStreamsConsistentWithGetStream) { + RecordProperty("description", "Verifies get_streams result is consistent with get_stream for the same stream."); + const std::string stream_name = "cpp-stream-consistency"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + + std::string list_name; + std::uint32_t list_id = 0; + std::uint32_t list_topics_count = 0; + std::uint64_t list_created_at = 0; + std::uint64_t list_size_bytes = 0; + auto streams = client->get_streams(); + for (const auto &s : streams) { + if (std::string(s.name) == stream_name) { + list_name = std::string(s.name); + list_id = s.id; + list_topics_count = s.topics_count; + list_created_at = s.created_at; + list_size_bytes = s.size_bytes; + break; + } + } + ASSERT_FALSE(list_name.empty()) << "Stream '" << stream_name << "' not found in get_streams result"; + + auto single = client->get_stream(make_string_identifier(stream_name)); + auto single_name = std::string(single.name); + auto single_topics = single.topics_count; + + EXPECT_EQ(list_name, single_name); + EXPECT_EQ(list_id, single.id); + EXPECT_EQ(list_topics_count, single_topics); + EXPECT_EQ(list_created_at, single.created_at); + EXPECT_EQ(list_size_bytes, single.size_bytes); + + client->delete_stream(make_string_identifier(stream_name)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Stream, GetStreamsRepeatedCallsReturnSameResult) { + RecordProperty("description", "Verifies repeated get_streams calls return consistent results."); + const std::string stream_name = "cpp-stream-repeated"; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + client->create_stream(stream_name); + + auto streams1 = client->get_streams(); + auto streams2 = client->get_streams(); + auto streams3 = client->get_streams(); + + ASSERT_EQ(streams1.size(), streams2.size()); + ASSERT_EQ(streams2.size(), streams3.size()); + + auto contains_stream = [&](const rust::Vec &vec) { + for (const auto &s : vec) { + if (std::string(s.name) == stream_name) { + return true; + } + } + return false; + }; + + ASSERT_TRUE(contains_stream(streams1)) << "Stream not found in first call"; + ASSERT_TRUE(contains_stream(streams2)) << "Stream not found in second call"; + ASSERT_TRUE(contains_stream(streams3)) << "Stream not found in third call"; + + client->delete_stream(make_string_identifier(stream_name)); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +}