feat(partitions): implement StoreConsumerOffset and DeleteConsumerOffset #3071
Codecov Report

❌ Your patch check has failed because the patch coverage (0.22%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files:

@@ Coverage Diff @@
## master #3071 +/- ##
=============================================
- Coverage 70.67% 58.53% -12.15%
Complexity 943 943
=============================================
Files 1114 1114
Lines 94780 94577 -203
Branches 71980 71794 -186
=============================================
- Hits 66989 55361 -11628
- Misses 25319 36833 +11514
+ Partials 2472 2383 -89
    Self::parse_staged_consumer_offset_commit(header.operation, &message)?;
    let write_lock = self
        .get_by_ns(namespace)
        .expect("store_consumer_offset: partition not found for namespace")
Return IggyError instead of using expect.
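a minimal sketch of the suggested change, assuming a PartitionNotFound error variant and a simplified get_by_ns signature (both stand-ins, not the actual crate API):

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum IggyError {
    PartitionNotFound, // stand-in variant for illustration
}

struct Store {
    by_ns: HashMap<String, u64>,
}

impl Store {
    // Before: self.get_by_ns(namespace).expect("...") panics on a missing
    // namespace. After: propagate a typed error the caller can handle.
    fn get_by_ns(&self, namespace: &str) -> Result<&u64, IggyError> {
        self.by_ns
            .get(namespace)
            .ok_or(IggyError::PartitionNotFound)
    }
}

fn main() {
    let store = Store { by_ns: HashMap::new() };
    // A missing namespace now yields Err instead of panicking.
    assert_eq!(store.get_by_ns("missing"), Err(IggyError::PartitionNotFound));
    println!("ok");
}
```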
    );
    let _ = guard.remove(&key);
    }
    _ => {}
Instead of a silent catch-all arm, use unreachable! so unexpected operations fail loudly.
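sketch of the idea with an invented Operation enum: with an exhaustive match the catch-all can be dropped entirely, and if one must remain, unreachable! turns a swallowed bug into a loud one.

```rust
// Hypothetical operation enum; the real variants live in the iggy codebase.
enum Operation {
    StoreConsumerOffset,
    DeleteConsumerOffset,
}

fn apply(op: Operation) -> &'static str {
    match op {
        Operation::StoreConsumerOffset => "store",
        Operation::DeleteConsumerOffset => "delete",
        // Before: `_ => {}` silently ignored unknown operations.
        // With an exhaustive enum no catch-all is needed; if one must stay:
        // _ => unreachable!("unexpected operation in consumer offset path"),
    }
}

fn main() {
    assert_eq!(apply(Operation::StoreConsumerOffset), "store");
    assert_eq!(apply(Operation::DeleteConsumerOffset), "delete");
    println!("ok");
}
```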
    file.write_all_at(buf, 0)
        .await
        .0
        .map_err(|_| IggyError::CannotWriteToFile)?;
persist_offset writes via write_all_at without sync_data() or sync_all(). the rest of the codebase (messages_writer.rs:90-91, iggy_index_writer.rs:92-93) conditionally calls self.fsync().await when enforce_fsync is true. send_prepare_ok is called with Some(true) at iggy_partition.rs:1322, which tells the primary this replica has durably persisted - but the data may only be in the page cache. this is a false durability claim to the consensus layer.
also: OpenOptions is missing .truncate(true) - safe today (always 8 bytes at offset 0), but fragile if the format changes.
fix: accept enforce_fsync from PartitionsConfig and call file.sync_data().await when enabled.
    pub write_lock: Arc<TokioMutex<()>>,
    consumer_offsets_path: Option<String>,
    consumer_group_offsets_path: Option<String>,
    pending_consumer_offset_commits: HashMap<u64, PendingConsumerOffsetCommit>,
pending_consumer_offset_commits has two compounding problems:

1. unbounded growth on followers: entries are inserted during on_replicate (lines 203, 215) but only removed via on_ack -> handle_committed_entries -> apply_staged_consumer_offset_commit (line 222). followers never reach that path because ack_preflight (plane_helpers.rs:161) returns Err(NotPrimary). the HashMap grows monotonically on every follower (~88 bytes/entry; 1M consumer offset ops = ~88MB leaked).

2. orphaned on view change: reset_view_change_state clears the pipeline but has no hook into IggyPartition. after view change, orphaned entries with reused op numbers could shadow new ones.
this also means followers never apply consumer offset commits to their in-memory state (consumer_offsets / consumer_group_offsets), so follower reads are stale/missing. the metadata plane has commit_journal() called on followers (metadata.rs:417-419) but the partition plane has no equivalent follower-side commit path.
fix: (a) followers should apply consumer offset commits directly during on_replicate (they don't need staging since they don't send client replies), (b) add a view-change cleanup hook that clears this HashMap.
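a minimal sketch of both fixes; the struct, field, and hook names are invented stand-ins for the real IggyPartition internals:

```rust
use std::collections::HashMap;

// Stand-in for the real staged-commit value type.
struct PendingConsumerOffsetCommit {
    consumer_id: u32,
    offset: u64,
}

#[derive(Default)]
struct IggyPartition {
    pending: HashMap<u64, PendingConsumerOffsetCommit>, // op -> staged commit
    applied: HashMap<u32, u64>,                         // consumer_id -> offset
}

impl IggyPartition {
    // (a) Followers apply directly during on_replicate: no staging needed
    // because followers never send client replies.
    fn on_replicate_as_follower(&mut self, commit: PendingConsumerOffsetCommit) {
        self.applied.insert(commit.consumer_id, commit.offset);
    }

    // Primaries stage; entries are drained when the op commits.
    fn stage(&mut self, op: u64, commit: PendingConsumerOffsetCommit) {
        self.pending.insert(op, commit);
    }

    // (b) View-change hook: clear staged entries so reused op numbers
    // after a view change cannot collide with orphans.
    fn on_view_change(&mut self) {
        self.pending.clear();
    }
}

fn main() {
    let mut p = IggyPartition::default();
    p.stage(1, PendingConsumerOffsetCommit { consumer_id: 7, offset: 99 });
    p.on_view_change();
    assert!(p.pending.is_empty()); // no orphans survive a view change
    p.on_replicate_as_follower(PendingConsumerOffsetCommit { consumer_id: 7, offset: 100 });
    assert_eq!(p.applied.get(&7), Some(&100)); // follower state is not stale
    println!("ok");
}
```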
    offset: u64,
    ) -> Result<(), IggyError> {
    let pending = PendingConsumerOffsetCommit::upsert(kind, consumer_id, offset);
    self.persist_consumer_offset_commit(pending).await?;
consumer offsets are persisted to disk here during the prepare phase (before quorum), unlike SendMessages which only writes to the in-memory journal during prepare and persists to disk during commit (commit_messages).
if the prepare never commits (leader crash, view change), disk has uncommitted values with no rollback mechanism. on restart, the uncommitted offset file is loaded as if it were committed.
recommended fix: move the persist_consumer_offset_commit call from persist_and_stage_* (prepare phase) to commit_consumer_offset_entry (commit phase). during prepare, only stage in-memory. this eliminates the rollback problem entirely and aligns with the SendMessages pattern.
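a sketch of the recommended split, with invented stand-in types (staged map for the in-memory journal, a Vec standing in for the on-disk offset file):

```rust
use std::collections::HashMap;

#[derive(Default)]
struct Partition {
    staged: HashMap<u64, u64>, // op -> offset, volatile prepare-phase state
    durable: Vec<u64>,         // stand-in for the on-disk offset file
}

impl Partition {
    // Prepare phase: memory only. A leader crash or view change here
    // leaves nothing durable behind, so no rollback is ever needed.
    fn persist_and_stage(&mut self, op: u64, offset: u64) {
        self.staged.insert(op, offset);
    }

    // Commit phase (post-quorum): only now is it safe to hit disk,
    // matching the SendMessages pattern.
    fn commit_consumer_offset_entry(&mut self, op: u64) {
        if let Some(offset) = self.staged.remove(&op) {
            self.durable.push(offset);
        }
    }
}

fn main() {
    let mut p = Partition::default();
    p.persist_and_stage(1, 42);
    assert!(p.durable.is_empty()); // nothing on disk before quorum
    p.commit_consumer_offset_entry(1);
    assert_eq!(p.durable, vec![42]); // persisted only after commit
    println!("ok");
}
```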
    let consensus = self.consensus();
    replicate_to_next_in_chain(consensus, message).await
    };
    if let Err(error) = self.apply_replicated_operation(message).await {
if apply_replicated_operation fails here, the early return at line 625 skips sequencer().set_sequence(header.op) at line 630 and send_prepare_ok at line 640. but replicate_to_next_in_chain at line 610 already forwarded the prepare to the next replica BEFORE this point.
result: chain continues, quorum may be reached, op commits cluster-wide - but this replica permanently misses it. debug_assert_eq!(header.op, current_op + 1) at line 627 fires in debug, but in release the replica silently diverges. subsequent prepares arrive with op numbers that no longer match the sequencer.
this is a design divergence from the metadata plane, which advances the sequencer unconditionally before the journal append (metadata.rs:394-396).
fix: advance the sequencer before apply_replicated_operation (matching the metadata plane), or advance it unconditionally regardless of apply outcome. if the operation truly can't be applied, the replica should request state transfer rather than silently diverging.
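a sketch of the ordering fix; Sequencer and the apply closure are invented stand-ins. the sequencer advances before the apply attempt, so a failed apply cannot desynchronize op numbers on this replica:

```rust
struct Sequencer {
    current: u64,
}

fn on_replicate(
    seq: &mut Sequencer,
    op: u64,
    apply: impl FnOnce() -> Result<(), &'static str>,
) -> Result<(), &'static str> {
    // Advance first (matching the metadata plane): later prepares still
    // line up with the sequencer even if this apply fails.
    debug_assert_eq!(op, seq.current + 1);
    seq.current = op;
    if let Err(e) = apply() {
        // A replica that truly cannot apply should request state transfer
        // here instead of silently diverging; sketched as an error return.
        return Err(e);
    }
    Ok(())
}

fn main() {
    let mut seq = Sequencer { current: 0 };
    let _ = on_replicate(&mut seq, 1, || Err("apply failed"));
    // The sequencer advanced even though the apply failed, so the next
    // prepare (op 2) still matches current + 1.
    assert_eq!(seq.current, 1);
    println!("ok");
}
```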
    operation: Operation,
    message: &Message<PrepareHeader>,
    ) -> Result<(ConsumerKind, u32, Option<u64>), IggyError> {
    let total_size = message.header().size as usize;
header.size as usize is used to slice message.as_slice() without bounds validation. if total_size > message.as_slice().len() this panics. the call chain is on the replication hot path: on_replicate -> apply_replicated_operation -> parse_staged_consumer_offset_commit. a malformed prepare from the primary (or protocol version mismatch during rolling upgrade) would crash every follower.
note: the subsequent body parsing (lines 1085-1103) correctly uses checked access (.first(), .get(1..5), .get(5..13)). only this initial slice creation lacks bounds checking.
fix: message.as_slice().get(std::mem::size_of::<PrepareHeader>()..total_size).ok_or(IggyError::InvalidCommand)?
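a self-contained sketch of that checked slice; HEADER_SIZE stands in for size_of::<PrepareHeader>() and the error enum is a stand-in for the real IggyError:

```rust
const HEADER_SIZE: usize = 16; // stand-in for size_of::<PrepareHeader>()

#[derive(Debug, PartialEq)]
enum IggyError {
    InvalidCommand,
}

fn payload(buf: &[u8], total_size: usize) -> Result<&[u8], IggyError> {
    // Before: &buf[HEADER_SIZE..total_size] panics when total_size > buf.len().
    // After: checked range access returns a typed error instead.
    buf.get(HEADER_SIZE..total_size)
        .ok_or(IggyError::InvalidCommand)
}

fn main() {
    let buf = vec![0u8; 24];
    assert_eq!(payload(&buf, 24).unwrap().len(), 8);
    // A malformed header claiming a larger size no longer crashes the follower.
    assert_eq!(payload(&buf, 64), Err(IggyError::InvalidCommand));
    println!("ok");
}
```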
    let last_offset =
        last_matching_offset.expect("non-empty poll result must have a last offset");
    if let Err(err) = self.store_consumer_offset(consumer, last_offset) {
    if let Err(err) = self
poll_messages with auto_commit=true calls store_consumer_offset_and_persist which persists and applies directly without going through consensus replication. this creates three divergent write paths for consumer offsets:
- replicated via StoreConsumerOffset operation (new in this PR): full prepare/commit cycle
- local-only persist + apply via auto-commit here: no replication
- in-memory-only via Partition::store_consumer_offset trait method (line 473): no persistence, no replication
on failover, the new leader has no record of auto-committed offsets since they were never replicated. this is a pre-existing pattern, but now inconsistent since explicit StoreConsumerOffset IS replicated. worth documenting whether this is intentional (local optimization for consumer progress) or should be migrated to the consensus path.
Implement StoreConsumerOffset and DeleteConsumerOffset as replicated operations on partition.