Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions core/binary_protocol/src/consensus/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub enum Operation {
CreatePartitions = 136,
DeletePartitions = 137,
// TODO: DeleteSegments is a partition operation (is_partition() == true) but its
// discriminant sits in the metadata range (128-147). Should be moved to 162 once
// discriminant sits in the metadata range (128-147). Should be moved to 163 once
// iggy_common's Operation enum is removed and wire compat is no longer a concern.
DeleteSegments = 138,
CreateConsumerGroup = 139,
Expand All @@ -53,6 +53,7 @@ pub enum Operation {
// Partition operations (routed by namespace)
SendMessages = 160,
StoreConsumerOffset = 161,
DeleteConsumerOffset = 162,
}

impl Operation {
Expand Down Expand Up @@ -90,7 +91,10 @@ impl Operation {
pub const fn is_partition(&self) -> bool {
matches!(
self,
Self::SendMessages | Self::StoreConsumerOffset | Self::DeleteSegments
Self::SendMessages
| Self::StoreConsumerOffset
| Self::DeleteConsumerOffset
| Self::DeleteSegments
)
}

Expand Down Expand Up @@ -122,7 +126,8 @@ impl Operation {
| Self::CreatePersonalAccessToken
| Self::DeletePersonalAccessToken
| Self::SendMessages
| Self::StoreConsumerOffset => match crate::dispatch::lookup_by_operation(*self) {
| Self::StoreConsumerOffset
| Self::DeleteConsumerOffset => match crate::dispatch::lookup_by_operation(*self) {
Some(meta) => Some(meta.code),
None => None,
},
Expand Down Expand Up @@ -170,6 +175,7 @@ mod tests {
Operation::DeletePersonalAccessToken,
Operation::SendMessages,
Operation::StoreConsumerOffset,
Operation::DeleteConsumerOffset,
];
for op in ops {
let code = op
Expand Down Expand Up @@ -202,5 +208,6 @@ mod tests {
assert!(Operation::SendMessages.is_partition());
assert!(!Operation::SendMessages.is_metadata());
assert!(Operation::DeleteSegments.is_partition());
assert!(Operation::DeleteConsumerOffset.is_partition());
}
}
8 changes: 7 additions & 1 deletion core/binary_protocol/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ pub const COMMAND_TABLE: &[CommandMeta] = &[
"consumer_offset.store",
Operation::StoreConsumerOffset,
),
CommandMeta::non_replicated(DELETE_CONSUMER_OFFSET_CODE, "consumer_offset.delete"),
CommandMeta::replicated(
DELETE_CONSUMER_OFFSET_CODE,
"consumer_offset.delete",
Operation::DeleteConsumerOffset,
),
// Streams
CommandMeta::non_replicated(GET_STREAM_CODE, "stream.get"),
CommandMeta::non_replicated(GET_STREAMS_CODE, "stream.list"),
Expand Down Expand Up @@ -260,6 +264,7 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta>
Operation::DeletePersonalAccessToken => 18,
Operation::SendMessages => 21,
Operation::StoreConsumerOffset => 24,
Operation::DeleteConsumerOffset => 25,
Operation::Reserved => return None,
};
Some(&COMMAND_TABLE[idx])
Expand Down Expand Up @@ -378,6 +383,7 @@ mod tests {
Operation::DeletePersonalAccessToken,
Operation::SendMessages,
Operation::StoreConsumerOffset,
Operation::DeleteConsumerOffset,
];
for op in replicated_ops {
let meta = lookup_by_operation(op)
Expand Down
1 change: 1 addition & 0 deletions core/consensus/src/observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ pub const fn operation_as_str(operation: Operation) -> &'static str {
Operation::DeletePersonalAccessToken => "delete_personal_access_token",
Operation::SendMessages => "send_messages",
Operation::StoreConsumerOffset => "store_consumer_offset",
Operation::DeleteConsumerOffset => "delete_consumer_offset",
}
}

Expand Down
247 changes: 211 additions & 36 deletions core/partitions/src/iggy_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::journal::{
MessageLookup, PartitionJournal, PartitionJournalMemStorage, QueryableJournal,
};
use crate::log::SegmentedLog;
use crate::offset_storage::{delete_persisted_offset, persist_offset};
use crate::{
AppendResult, Partition, PartitionOffsets, PollFragments, PollQueryResult, PollingArgs,
PollingConsumer,
Expand All @@ -30,6 +31,7 @@ use iggy_common::{
send_messages2::stamp_prepare_for_persistence,
};
use journal::Journal as _;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex as TokioMutex;
Expand All @@ -52,6 +54,57 @@ pub struct IggyPartition {
pub revision_id: u64,
pub should_increment_offset: bool,
pub write_lock: Arc<TokioMutex<()>>,
consumer_offsets_path: Option<String>,
consumer_group_offsets_path: Option<String>,
pending_consumer_offset_commits: HashMap<u64, PendingConsumerOffsetCommit>,
}

/// A consumer-offset mutation that has been prepared for an operation but not
/// yet applied to the in-memory offset maps.
#[derive(Debug, Clone, Copy, PartialEq)]
struct PendingConsumerOffsetCommit {
    // Distinguishes single-consumer offsets from consumer-group offsets.
    kind: ConsumerKind,
    // Raw consumer (or group) id; interpreted according to `kind`.
    consumer_id: u32,
    // The staged change: write a new offset value, or remove the entry.
    mutation: PendingConsumerOffsetMutation,
}

/// The kind of staged consumer-offset change.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PendingConsumerOffsetMutation {
    /// Set the consumer's offset to the contained value.
    Upsert(u64),
    /// Remove the consumer's stored offset entirely.
    Delete,
}

impl PendingConsumerOffsetCommit {
    /// Builds a commit that writes `offset` for the given consumer.
    const fn upsert(kind: ConsumerKind, consumer_id: u32, offset: u64) -> Self {
        Self {
            kind,
            consumer_id,
            mutation: PendingConsumerOffsetMutation::Upsert(offset),
        }
    }

    /// Builds a commit that removes the stored offset for the given consumer.
    const fn delete(kind: ConsumerKind, consumer_id: u32) -> Self {
        Self {
            kind,
            consumer_id,
            mutation: PendingConsumerOffsetMutation::Delete,
        }
    }

    /// Converts a polling consumer into an upsert commit for `offset`.
    ///
    /// Both consumer and group ids arrive as `usize`; either variant fails the
    /// same way (`InvalidCommand`) when the id does not fit in `u32`, so the
    /// conversion is hoisted out of the match.
    fn try_from_polling_consumer(
        consumer: PollingConsumer,
        offset: u64,
    ) -> Result<Self, IggyError> {
        let (kind, raw_id) = match consumer {
            PollingConsumer::Consumer(id, _) => (ConsumerKind::Consumer, id),
            PollingConsumer::ConsumerGroup(group_id, _) => {
                (ConsumerKind::ConsumerGroup, group_id)
            }
        };
        let consumer_id = u32::try_from(raw_id).map_err(|_| IggyError::InvalidCommand)?;
        Ok(Self::upsert(kind, consumer_id, offset))
    }
}

impl IggyPartition {
Expand All @@ -67,6 +120,158 @@ impl IggyPartition {
revision_id: 0,
should_increment_offset: false,
write_lock: Arc::new(TokioMutex::new(())),
consumer_offsets_path: None,
consumer_group_offsets_path: None,
pending_consumer_offset_commits: HashMap::new(),
}
}

/// Wires up durable consumer-offset storage for this partition: records the
/// filesystem paths used for persistence and replaces the in-memory offset
/// maps with the (typically recovered) ones supplied by the caller.
pub fn configure_consumer_offset_storage(
    &mut self,
    consumer_offsets_path: String,
    consumer_group_offsets_path: String,
    consumer_offsets: ConsumerOffsets,
    consumer_group_offsets: ConsumerGroupOffsets,
) {
    self.consumer_offsets_path = Some(consumer_offsets_path);
    self.consumer_group_offsets_path = Some(consumer_group_offsets_path);
    self.consumer_offsets = Arc::new(consumer_offsets);
    self.consumer_group_offsets = Arc::new(consumer_group_offsets);
}

pub(crate) async fn persist_and_stage_consumer_offset_upsert(
&mut self,
op: u64,
kind: ConsumerKind,
consumer_id: u32,
offset: u64,
) -> Result<(), IggyError> {
let pending = PendingConsumerOffsetCommit::upsert(kind, consumer_id, offset);
self.persist_consumer_offset_commit(pending).await?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consumer offsets are persisted to disk here during the prepare phase (before quorum), unlike SendMessages which only writes to the in-memory journal during prepare and persists to disk during commit (commit_messages).

if the prepare never commits (leader crash, view change), disk has uncommitted values with no rollback mechanism. on restart, the uncommitted offset file is loaded as if it were committed.

recommended fix: move the persist_consumer_offset_commit call from persist_and_stage_* (prepare phase) to commit_consumer_offset_entry (commit phase). during prepare, only stage in-memory. this eliminates the rollback problem entirely and aligns with the SendMessages pattern.

Copy link
Copy Markdown
Contributor Author

@numinnex numinnex Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the behavior of treating a written offset as committed by default is fine: in the scenario where we restart a replica after a crash, we perform state transfer, which would overwrite the currently committed offset for that particular consumer.

self.pending_consumer_offset_commits.insert(op, pending);
Ok(())
}

/// Prepares a consumer-offset delete for operation `op`: durably records the
/// delete first, then stages it so a later commit of `op` can apply it to the
/// in-memory maps.
pub(crate) async fn persist_and_stage_consumer_offset_delete(
    &mut self,
    op: u64,
    kind: ConsumerKind,
    consumer_id: u32,
) -> Result<(), IggyError> {
    // Persist before staging so a staged entry always has a durable record.
    let staged = PendingConsumerOffsetCommit::delete(kind, consumer_id);
    self.persist_consumer_offset_commit(staged).await?;
    self.pending_consumer_offset_commits.insert(op, staged);
    Ok(())
}

/// Commits the consumer-offset mutation previously staged under `op`.
///
/// The staged entry is consumed; an unknown `op` (nothing was prepared for
/// it) is rejected with `InvalidCommand`.
pub(crate) fn apply_staged_consumer_offset_commit(&mut self, op: u64) -> Result<(), IggyError> {
    let Some(staged) = self.pending_consumer_offset_commits.remove(&op) else {
        return Err(IggyError::InvalidCommand);
    };
    self.apply_consumer_offset_commit(staged);
    Ok(())
}

/// Writes (or removes) the durable file backing `pending`.
///
/// Persistence is optional: when no storage path has been configured for the
/// consumer kind, the mutation stays memory-only and this is a no-op.
async fn persist_consumer_offset_commit(
    &self,
    pending: PendingConsumerOffsetCommit,
) -> Result<(), IggyError> {
    if let Some(path) = self.persisted_offset_path(pending.kind, pending.consumer_id) {
        match pending.mutation {
            PendingConsumerOffsetMutation::Upsert(offset) => {
                persist_offset(&path, offset).await?;
            }
            PendingConsumerOffsetMutation::Delete => {
                delete_persisted_offset(&path).await?;
            }
        }
    }
    Ok(())
}

/// Applies a prepared consumer-offset mutation to the in-memory offset maps.
///
/// The match on `(kind, mutation)` is exhaustive: the previous guard-based
/// arms (`Upsert(_) if kind == ...`) required a silent `_ => {}` catch-all,
/// so a future `ConsumerKind` or mutation variant would have been dropped
/// without any compile-time signal. `ConsumerKind` has exactly the
/// `Consumer`/`ConsumerGroup` variants (see `persisted_offset_path`, which
/// matches it exhaustively with those two arms).
fn apply_consumer_offset_commit(&self, pending: PendingConsumerOffsetCommit) {
    match (pending.kind, pending.mutation) {
        (ConsumerKind::Consumer, PendingConsumerOffsetMutation::Upsert(offset)) => {
            let id = pending.consumer_id;
            let guard = self.consumer_offsets.pin();
            // u32 always fits usize on supported (>= 32-bit) targets.
            let key = usize::try_from(id).expect("u32 consumer id must fit usize");
            if let Some(existing) = guard.get(&key) {
                existing.offset.store(offset, Ordering::Relaxed);
            } else {
                // Without a configured storage path, fall back to a
                // memory-only entry with an empty path.
                let created = self.consumer_offsets_path.as_deref().map_or_else(
                    || ConsumerOffset::new(ConsumerKind::Consumer, id, 0, String::new()),
                    |path| ConsumerOffset::default_for_consumer(id, path),
                );
                created.offset.store(offset, Ordering::Relaxed);
                guard.insert(key, created);
            }
        }
        (ConsumerKind::ConsumerGroup, PendingConsumerOffsetMutation::Upsert(offset)) => {
            let group_id = pending.consumer_id;
            let guard = self.consumer_group_offsets.pin();
            let key = ConsumerGroupId(
                usize::try_from(group_id).expect("u32 group id must fit usize"),
            );
            if let Some(existing) = guard.get(&key) {
                existing.offset.store(offset, Ordering::Relaxed);
            } else {
                let created = self.consumer_group_offsets_path.as_deref().map_or_else(
                    || {
                        ConsumerOffset::new(
                            ConsumerKind::ConsumerGroup,
                            group_id,
                            0,
                            String::new(),
                        )
                    },
                    |path| ConsumerOffset::default_for_consumer_group(key, path),
                );
                created.offset.store(offset, Ordering::Relaxed);
                guard.insert(key, created);
            }
        }
        (ConsumerKind::Consumer, PendingConsumerOffsetMutation::Delete) => {
            let key = usize::try_from(pending.consumer_id)
                .expect("u32 consumer id must fit usize");
            let guard = self.consumer_offsets.pin();
            // Deleting a missing entry is a no-op by design.
            let _ = guard.remove(&key);
        }
        (ConsumerKind::ConsumerGroup, PendingConsumerOffsetMutation::Delete) => {
            let key = ConsumerGroupId(
                usize::try_from(pending.consumer_id).expect("u32 group id must fit usize"),
            );
            let guard = self.consumer_group_offsets.pin();
            let _ = guard.remove(&key);
        }
    }
}

/// Stores `offset` for `consumer`, writing the durable record before
/// updating the in-memory view (mirroring the prepare-then-commit ordering).
async fn store_consumer_offset_and_persist(
    &self,
    consumer: PollingConsumer,
    offset: u64,
) -> Result<(), IggyError> {
    let commit = PendingConsumerOffsetCommit::try_from_polling_consumer(consumer, offset)?;
    self.persist_consumer_offset_commit(commit).await?;
    self.apply_consumer_offset_commit(commit);
    Ok(())
}

/// Resolves the on-disk path for a consumer's offset file, or `None` when no
/// storage path has been configured for that consumer kind.
fn persisted_offset_path(&self, kind: ConsumerKind, consumer_id: u32) -> Option<String> {
    let base = match kind {
        ConsumerKind::Consumer => self.consumer_offsets_path.as_ref(),
        ConsumerKind::ConsumerGroup => self.consumer_group_offsets_path.as_ref(),
    };
    base.map(|path| format!("{path}/{consumer_id}"))
}
}
Expand Down Expand Up @@ -191,7 +396,10 @@ impl Partition for IggyPartition {
if args.auto_commit && !fragments.is_empty() {
let last_offset =
last_matching_offset.expect("non-empty poll result must have a last offset");
if let Err(err) = self.store_consumer_offset(consumer, last_offset) {
if let Err(err) = self
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

poll_messages with auto_commit=true calls store_consumer_offset_and_persist which persists and applies directly without going through consensus replication. this creates three divergent write paths for consumer offsets:

  1. replicated via StoreConsumerOffset operation (new in this PR) - full prepare/commit cycle
  2. local-only persist + apply via auto-commit here - no replication
  3. in-memory-only via Partition::store_consumer_offset trait method (line 473) - no persistence, no replication

on failover, the new leader has no record of auto-committed offsets since they were never replicated. this is a pre-existing pattern, but now inconsistent since explicit StoreConsumerOffset IS replicated. worth documenting whether this is intentional (local optimization for consumer progress) or should be migrated to the consensus path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for auto-commit polls, at the end of the command_handler we should construct a new Message<Prepare> with a StoreConsumerOffset command and run it through the on_message handler, the same way a StoreConsumerOffset client request would be.

Additionally, I think we should expand the StoreConsumerOffset command to allow for different levels of consistency, such as no_ack or quorum — something that Kafka does for SendMessages. This way the auto_commit from the PollMessages command could inherit that as well.

.store_consumer_offset_and_persist(consumer, last_offset)
.await
{
// warning for now.
warn!(
target: "iggy.partitions.diag",
Expand All @@ -212,41 +420,8 @@ impl Partition for IggyPartition {
consumer: PollingConsumer,
offset: u64,
) -> Result<(), IggyError> {
match consumer {
PollingConsumer::Consumer(id, _) => {
let guard = self.consumer_offsets.pin();
if let Some(existing) = guard.get(&id) {
existing.offset.store(offset, Ordering::Relaxed);
} else {
guard.insert(
id,
ConsumerOffset::new(
ConsumerKind::Consumer,
id as u32,
offset,
String::new(),
),
);
}
}
PollingConsumer::ConsumerGroup(group_id, _) => {
let guard = self.consumer_group_offsets.pin();
let key = ConsumerGroupId(group_id);
if let Some(existing) = guard.get(&key) {
existing.offset.store(offset, Ordering::Relaxed);
} else {
guard.insert(
key,
ConsumerOffset::new(
ConsumerKind::ConsumerGroup,
group_id as u32,
offset,
String::new(),
),
);
}
}
}
let pending = PendingConsumerOffsetCommit::try_from_polling_consumer(consumer, offset)?;
self.apply_consumer_offset_commit(pending);
Ok(())
}

Expand Down
Loading
Loading