Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
2915c8f
feat(cpp): add get_streams FFI function and Stream shared struct
seokjin0414 Mar 29, 2026
761e41b
feat(cpp): add send_messages FFI function and IggyMessageToSend share…
seokjin0414 Mar 29, 2026
1a31f58
feat(cpp): add poll_messages FFI function with IggyMessagePolled and …
seokjin0414 Mar 29, 2026
730700c
style(cpp): apply rustfmt formatting
seokjin0414 Mar 29, 2026
fca1ac0
test(cpp): add e2e tests for get_streams, send_messages, and poll_mes…
seokjin0414 Mar 29, 2026
3388f84
fix(cpp): address code review findings for FFI messaging functions
seokjin0414 Mar 29, 2026
cc6ffc0
fix(cpp): apply clang-format and remove fragile stream id assertion
seokjin0414 Mar 29, 2026
b1b347d
refactor(cpp): address review feedback - unify Message struct and imp…
seokjin0414 Mar 29, 2026
61dd065
style(cpp): fix clang-format alignment in message test
seokjin0414 Mar 29, 2026
100d846
refactor(cpp): change new_message from free function to impl Message …
seokjin0414 Mar 30, 2026
ae52d51
test(cpp): move get_streams tests from message/ to stream/ directory
seokjin0414 Mar 30, 2026
f5bcb5e
test(cpp): add comprehensive e2e tests for get_streams, send_messages…
seokjin0414 Mar 30, 2026
9ffc8c6
test(cpp): add overlooked poll_messages e2e tests for partition, coun…
seokjin0414 Apr 1, 2026
9a48392
test(cpp): add large batch send and large count poll e2e tests
seokjin0414 Apr 1, 2026
41a7050
test(cpp): add unit tests for Message.new_message
seokjin0414 Apr 1, 2026
1dbc51f
fix(cpp): fix PollMessagesWithCountZero test — server rejects count=0
seokjin0414 Apr 1, 2026
9656b73
refactor(cpp): address review feedback — rename tests, strengthen ass…
seokjin0414 Apr 4, 2026
596b9d8
fix(cpp): remove unneeded clippy allow on identifier.rs
seokjin0414 Apr 4, 2026
41a3f03
feat(cpp): add join/leave consumer group FFI and address review feedback
seokjin0414 Apr 7, 2026
64d9d08
chore(cpp): add TODO for join/leave consumer group tests
seokjin0414 Apr 8, 2026
af1c9e6
fix(cpp): add id comparison to GetStreamsConsistentWithGetStream test
seokjin0414 Apr 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions foreign/cpp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ ignored = ["cxx-build"]
crate-type = ["staticlib"]

[dependencies]
bytes = "1.11.1"
cxx = "1.0.194"
iggy = { path = "../../core/sdk" }
iggy_common = { path = "../../core/common" }
Expand Down
1 change: 1 addition & 0 deletions foreign/cpp/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ fn main() {
println!("cargo:rerun-if-changed=src/consumer_group.rs");
println!("cargo:rerun-if-changed=src/identifier.rs");
println!("cargo:rerun-if-changed=src/lib.rs");
println!("cargo:rerun-if-changed=src/messages.rs");
println!("cargo:rerun-if-changed=src/stream.rs");
println!("cargo:rerun-if-changed=src/topic.rs");
}
204 changes: 201 additions & 3 deletions foreign/cpp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

use crate::{RUNTIME, ffi};
use iggy::prelude::{
Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm,
ConsumerGroupClient, Identifier as RustIdentifier, IggyClient as RustIggyClient,
Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm, Consumer,
ConsumerGroupClient, ConsumerKind, Identifier as RustIdentifier, IggyClient as RustIggyClient,
IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as RustIggyExpiry,
MaxTopicSize as RustMaxTopicSize, PartitionClient, StreamClient, TopicClient, UserClient,
IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize, MessageClient, PartitionClient,
Partitioning, PollingStrategy, StreamClient, TopicClient, UserClient,
};
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -73,6 +74,17 @@ impl Client {
})
}

pub fn get_streams(&self) -> Result<Vec<ffi::Stream>, String> {
RUNTIME.block_on(async {
let streams = self
.inner
.get_streams()
.await
.map_err(|error| format!("Could not get streams: {error}"))?;
Ok(streams.into_iter().map(ffi::Stream::from).collect())
})
}

pub fn create_stream(&self, stream_name: String) -> Result<(), String> {
RUNTIME.block_on(async {
self.inner
Expand Down Expand Up @@ -127,6 +139,132 @@ impl Client {
// })
// }

#[allow(clippy::too_many_arguments)]
pub fn send_messages(
&self,
stream_id: ffi::Identifier,
topic_id: ffi::Identifier,
partitioning_kind: String,
partitioning_value: Vec<u8>,
messages: Vec<ffi::Message>,
) -> Result<(), String> {
let rust_stream_id = RustIdentifier::try_from(stream_id)
.map_err(|error| format!("Could not send messages: {error}"))?;
let rust_topic_id = RustIdentifier::try_from(topic_id)
.map_err(|error| format!("Could not send messages: {error}"))?;

let partitioning = match partitioning_kind.as_str() {
"balanced" => Partitioning::balanced(),
"partition_id" => {
if partitioning_value.len() < 4 {
return Err(
"Could not send messages: partition_id requires 4 bytes".to_string()
);
}
let id = u32::from_le_bytes(partitioning_value[..4].try_into().map_err(|_| {
"Could not send messages: invalid partition_id value".to_string()
})?);
Partitioning::partition_id(id)
}
"messages_key" => Partitioning::messages_key(&partitioning_value).map_err(|error| {
format!("Could not send messages: invalid messages key: {error}")
})?,
_ => {
return Err(format!(
"Could not send messages: invalid partitioning kind: {partitioning_kind}"
));
}
};

let mut iggy_messages: Vec<IggyMessage> = messages
.into_iter()
.map(IggyMessage::try_from)
.collect::<Result<Vec<_>, _>>()?;

RUNTIME.block_on(async {
self.inner
.send_messages(
&rust_stream_id,
&rust_topic_id,
&partitioning,
&mut iggy_messages,
)
.await
.map_err(|error| format!("Could not send messages: {error}"))?;
Ok(())
})
}

#[allow(clippy::too_many_arguments)]
pub fn poll_messages(
&self,
stream_id: ffi::Identifier,
topic_id: ffi::Identifier,
partition_id: u32,
consumer_kind: String,
consumer_id: ffi::Identifier,
polling_strategy_kind: String,
polling_strategy_value: u64,
count: u32,
auto_commit: bool,
) -> Result<ffi::PolledMessages, String> {
let rust_stream_id = RustIdentifier::try_from(stream_id)
.map_err(|error| format!("Could not poll messages: {error}"))?;
let rust_topic_id = RustIdentifier::try_from(topic_id)
.map_err(|error| format!("Could not poll messages: {error}"))?;
let rust_consumer_id = RustIdentifier::try_from(consumer_id)
.map_err(|error| format!("Could not poll messages: {error}"))?;

let consumer = Consumer {
kind: match consumer_kind.as_str() {
"consumer" => ConsumerKind::Consumer,
"consumer_group" => ConsumerKind::ConsumerGroup,
_ => {
return Err(format!(
"Could not poll messages: invalid consumer kind: {consumer_kind}"
));
}
},
id: rust_consumer_id,
};

let strategy = match polling_strategy_kind.as_str() {
"offset" => PollingStrategy::offset(polling_strategy_value),
"timestamp" => PollingStrategy::timestamp(IggyTimestamp::from(polling_strategy_value)),
"first" => PollingStrategy::first(),
"last" => PollingStrategy::last(),
"next" => PollingStrategy::next(),
_ => {
return Err(format!(
"Could not poll messages: invalid polling strategy: {polling_strategy_kind}"
));
}
};

let opt_partition = if partition_id == u32::MAX {
None
} else {
Some(partition_id)
};

RUNTIME.block_on(async {
let polled = self
.inner
.poll_messages(
&rust_stream_id,
&rust_topic_id,
opt_partition,
&consumer,
&strategy,
count,
auto_commit,
)
.await
.map_err(|error| format!("Could not poll messages: {error}"))?;
Ok(ffi::PolledMessages::from(polled))
})
}

#[allow(clippy::too_many_arguments)]
pub fn create_topic(
&self,
Expand Down Expand Up @@ -378,6 +516,66 @@ impl Client {
Ok(())
})
}

pub fn join_consumer_group(
&self,
stream_id: ffi::Identifier,
topic_id: ffi::Identifier,
group_id: ffi::Identifier,
) -> Result<(), String> {
let rust_stream_id = RustIdentifier::try_from(stream_id).map_err(|error| {
format!("Could not join consumer group: invalid stream identifier: {error}")
})?;
let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| {
format!("Could not join consumer group: invalid topic identifier: {error}")
})?;
let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error| {
format!("Could not join consumer group: invalid group identifier: {error}")
})?;

RUNTIME.block_on(async {
self.inner
.join_consumer_group(&rust_stream_id, &rust_topic_id, &rust_group_id)
.await
.map_err(|error| {
format!(
"Could not join consumer group '{}' for topic '{}' on stream '{}': {error}",
rust_group_id, rust_topic_id, rust_stream_id
)
})?;
Ok(())
})
}

pub fn leave_consumer_group(
&self,
stream_id: ffi::Identifier,
topic_id: ffi::Identifier,
group_id: ffi::Identifier,
) -> Result<(), String> {
let rust_stream_id = RustIdentifier::try_from(stream_id).map_err(|error| {
format!("Could not leave consumer group: invalid stream identifier: {error}")
})?;
let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| {
format!("Could not leave consumer group: invalid topic identifier: {error}")
})?;
let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error| {
format!("Could not leave consumer group: invalid group identifier: {error}")
})?;

RUNTIME.block_on(async {
self.inner
.leave_consumer_group(&rust_stream_id, &rust_topic_id, &rust_group_id)
.await
.map_err(|error| {
format!(
"Could not leave consumer group '{}' for topic '{}' on stream '{}': {error}",
rust_group_id, rust_topic_id, rust_stream_id
)
})?;
Ok(())
})
}
}

pub unsafe fn delete_connection(client: *mut Client) -> Result<(), String> {
Expand Down
70 changes: 70 additions & 0 deletions foreign/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
mod client;
mod consumer_group;
mod identifier;
mod messages;
mod stream;
mod topic;

Expand Down Expand Up @@ -51,6 +52,36 @@ mod ffi {
partitions_count: u32,
}

struct Stream {
id: u32,
created_at: u64,
name: String,
size_bytes: u64,
messages_count: u64,
topics_count: u32,
}

struct Message {
checksum: u64,
id_lo: u64,
id_hi: u64,
offset: u64,
timestamp: u64,
origin_timestamp: u64,
user_headers_length: u32,
payload_length: u32,
reserved: u64,
payload: Vec<u8>,
user_headers: Vec<u8>,
}

struct PolledMessages {
partition_id: u32,
current_offset: u64,
count: u32,
messages: Vec<Message>,
}

struct StreamDetails {
id: u32,
created_at: u64,
Expand Down Expand Up @@ -83,6 +114,7 @@ mod ffi {
fn login_user(self: &Client, username: String, password: String) -> Result<()>;
fn connect(self: &Client) -> Result<()>;
fn create_stream(self: &Client, stream_name: String) -> Result<()>;
fn get_streams(self: &Client) -> Result<Vec<Stream>>;
fn get_stream(self: &Client, stream_id: Identifier) -> Result<StreamDetails>;
fn delete_stream(self: &Client, stream_id: Identifier) -> Result<()>;
// fn purge_stream(&self, stream_id: Identifier) -> Result<()>;
Expand Down Expand Up @@ -129,6 +161,44 @@ mod ffi {
topic_id: Identifier,
group_id: Identifier,
) -> Result<()>;
fn join_consumer_group(
self: &Client,
stream_id: Identifier,
topic_id: Identifier,
group_id: Identifier,
) -> Result<()>;
fn leave_consumer_group(
self: &Client,
stream_id: Identifier,
topic_id: Identifier,
group_id: Identifier,
) -> Result<()>;

#[allow(clippy::too_many_arguments)]
fn poll_messages(
self: &Client,
stream_id: Identifier,
topic_id: Identifier,
partition_id: u32,
consumer_kind: String,
consumer_id: Identifier,
polling_strategy_kind: String,
polling_strategy_value: u64,
count: u32,
auto_commit: bool,
) -> Result<PolledMessages>;

fn new_message(self: &mut Message, payload: Vec<u8>);

#[allow(clippy::too_many_arguments)]
fn send_messages(
self: &Client,
stream_id: Identifier,
topic_id: Identifier,
partitioning_kind: String,
partitioning_value: Vec<u8>,
messages: Vec<Message>,
) -> Result<()>;

unsafe fn delete_connection(client: *mut Client) -> Result<()>;

Expand Down
Loading
Loading