diff --git a/core/binary_protocol/src/consensus/operation.rs b/core/binary_protocol/src/consensus/operation.rs index c17733debc..d275269aeb 100644 --- a/core/binary_protocol/src/consensus/operation.rs +++ b/core/binary_protocol/src/consensus/operation.rs @@ -37,7 +37,7 @@ pub enum Operation { CreatePartitions = 136, DeletePartitions = 137, // TODO: DeleteSegments is a partition operation (is_partition() == true) but its - // discriminant sits in the metadata range (128-147). Should be moved to 162 once + // discriminant sits in the metadata range (128-147). Should be moved to 163 once // iggy_common's Operation enum is removed and wire compat is no longer a concern. DeleteSegments = 138, CreateConsumerGroup = 139, @@ -53,6 +53,7 @@ pub enum Operation { // Partition operations (routed by namespace) SendMessages = 160, StoreConsumerOffset = 161, + DeleteConsumerOffset = 162, } impl Operation { @@ -90,7 +91,10 @@ impl Operation { pub const fn is_partition(&self) -> bool { matches!( self, - Self::SendMessages | Self::StoreConsumerOffset | Self::DeleteSegments + Self::SendMessages + | Self::StoreConsumerOffset + | Self::DeleteConsumerOffset + | Self::DeleteSegments ) } @@ -122,7 +126,8 @@ impl Operation { | Self::CreatePersonalAccessToken | Self::DeletePersonalAccessToken | Self::SendMessages - | Self::StoreConsumerOffset => match crate::dispatch::lookup_by_operation(*self) { + | Self::StoreConsumerOffset + | Self::DeleteConsumerOffset => match crate::dispatch::lookup_by_operation(*self) { Some(meta) => Some(meta.code), None => None, }, @@ -170,6 +175,7 @@ mod tests { Operation::DeletePersonalAccessToken, Operation::SendMessages, Operation::StoreConsumerOffset, + Operation::DeleteConsumerOffset, ]; for op in ops { let code = op @@ -202,5 +208,6 @@ mod tests { assert!(Operation::SendMessages.is_partition()); assert!(!Operation::SendMessages.is_metadata()); assert!(Operation::DeleteSegments.is_partition()); + assert!(Operation::DeleteConsumerOffset.is_partition()); } } 
diff --git a/core/binary_protocol/src/dispatch.rs b/core/binary_protocol/src/dispatch.rs index fcd1dcb69a..1aa9eb8289 100644 --- a/core/binary_protocol/src/dispatch.rs +++ b/core/binary_protocol/src/dispatch.rs @@ -120,7 +120,11 @@ pub const COMMAND_TABLE: &[CommandMeta] = &[ "consumer_offset.store", Operation::StoreConsumerOffset, ), - CommandMeta::non_replicated(DELETE_CONSUMER_OFFSET_CODE, "consumer_offset.delete"), + CommandMeta::replicated( + DELETE_CONSUMER_OFFSET_CODE, + "consumer_offset.delete", + Operation::DeleteConsumerOffset, + ), // Streams CommandMeta::non_replicated(GET_STREAM_CODE, "stream.get"), CommandMeta::non_replicated(GET_STREAMS_CODE, "stream.list"), @@ -260,6 +264,7 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta> Operation::DeletePersonalAccessToken => 18, Operation::SendMessages => 21, Operation::StoreConsumerOffset => 24, + Operation::DeleteConsumerOffset => 25, Operation::Reserved => return None, }; Some(&COMMAND_TABLE[idx]) @@ -378,6 +383,7 @@ mod tests { Operation::DeletePersonalAccessToken, Operation::SendMessages, Operation::StoreConsumerOffset, + Operation::DeleteConsumerOffset, ]; for op in replicated_ops { let meta = lookup_by_operation(op) diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs index 200ee87ad3..4e857f6232 100644 --- a/core/consensus/src/lib.rs +++ b/core/consensus/src/lib.rs @@ -107,8 +107,6 @@ mod impls; pub use impls::*; mod plane_mux; pub use plane_mux::*; -mod namespaced_pipeline; -pub use namespaced_pipeline::*; mod plane_helpers; pub use plane_helpers::*; mod observability; diff --git a/core/consensus/src/namespaced_pipeline.rs b/core/consensus/src/namespaced_pipeline.rs deleted file mode 100644 index 71e18fb712..0000000000 --- a/core/consensus/src/namespaced_pipeline.rs +++ /dev/null @@ -1,580 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::Pipeline; -use crate::impls::{PIPELINE_PREPARE_QUEUE_MAX, PipelineEntry}; -use iggy_binary_protocol::{Message, PrepareHeader}; -use std::collections::{HashMap, VecDeque}; - -/// Pipeline that partitions entries by namespace for independent commit draining. -/// -/// A single global op sequence and hash chain spans all namespaces, but entries -/// are stored in per-namespace `VecDeques`. Each namespace tracks its own commit -/// frontier so `drain_committable_all` drains quorum'd entries per-namespace -/// without waiting for the global commit to advance past unrelated namespaces. -/// -/// The global commit (on `VsrConsensus`) remains a conservative lower bound -/// for the VSR protocol (view change, follower commit piggybacking). It only -/// advances when all ops up to that point are drained. Per-namespace draining -/// can run ahead of the global commit. -/// -/// An alternative (simpler) approach would drain purely by per-entry quorum -/// flag without tracking per-namespace commit numbers, relying solely on -/// `global_commit_frontier` for the protocol commit. We track per-namespace -/// commits explicitly for observability and to make the independence model -/// visible in the data structure. 
-#[derive(Debug)] -pub struct NamespacedPipeline { - queues: HashMap>, - /// Per-namespace commit frontier: highest drained op per namespace. - pub(crate) ns_commits: HashMap, - pub(crate) total_count: usize, - last_push_checksum: u128, - last_push_op: u64, - /// Lower bound of ops pushed to this pipeline instance. - /// Used by `global_commit_frontier` to distinguish "never pushed" from "drained." - first_push_op: u64, -} - -impl Default for NamespacedPipeline { - fn default() -> Self { - Self::new() - } -} - -impl NamespacedPipeline { - #[must_use] - pub fn new() -> Self { - Self { - queues: HashMap::new(), - ns_commits: HashMap::new(), - total_count: 0, - last_push_checksum: 0, - last_push_op: 0, - first_push_op: 0, - } - } - - pub fn register_namespace(&mut self, ns: u64) { - self.queues.entry(ns).or_default(); - self.ns_commits.entry(ns).or_insert(0); - } - - /// Per-namespace commit frontier for the given namespace. - #[must_use] - pub fn ns_commit(&self, ns: u64) -> Option { - self.ns_commits.get(&ns).copied() - } - - /// Drain entries that have achieved quorum, independently per namespace. - /// - /// For each namespace queue, drains from the front while entries have - /// `ok_quorum_received == true`. Returns entries sorted by global op - /// for deterministic processing. - /// - /// # Panics - /// If the front entry exists but `pop_front` returns `None` (unreachable). - pub fn drain_committable_all(&mut self) -> Vec { - let mut drained = Vec::new(); - - let Self { - queues, - ns_commits, - total_count, - .. 
- } = self; - - for (ns, queue) in queues.iter_mut() { - while let Some(front) = queue.front() { - if !front.ok_quorum_received { - break; - } - let entry = queue.pop_front().expect("front exists"); - *total_count -= 1; - if let Some(ns_commit) = ns_commits.get_mut(ns) { - *ns_commit = entry.header.op; - } - drained.push(entry); - } - } - - drained.sort_by_key(|entry| entry.header.op); - drained - } - - /// Compute the global commit frontier after draining. - /// - /// Walks forward from `current_commit + 1`, treating any op not found - /// in the pipeline (already drained) as committed. Stops at the first - /// op still present in a queue or past `last_push_op`. - #[must_use] - pub fn global_commit_frontier(&self, current_commit: u64) -> u64 { - let mut commit = current_commit; - loop { - let next = commit + 1; - if next > self.last_push_op { - break; - } - // Ops below first_push_op were never in this pipeline instance - // and must not be mistaken for drained entries. - if next < self.first_push_op { - break; - } - // Still in a queue means not yet drained - if self.entry_by_op(next).is_some() { - break; - } - commit = next; - } - commit - } -} - -impl Pipeline for NamespacedPipeline { - type Entry = PipelineEntry; - - /// # Panics - /// - If the pipeline is full. - /// - If ops are not globally sequential. - /// - If the hash chain is broken. - /// - If the namespace is not registered. 
- fn push(&mut self, entry: Self::Entry) { - assert!( - self.total_count < PIPELINE_PREPARE_QUEUE_MAX, - "namespaced pipeline full" - ); - - let header = entry.header; - let ns = header.namespace; - - if self.total_count > 0 { - assert_eq!( - header.op, - self.last_push_op + 1, - "global ops must be sequential: expected {}, got {}", - self.last_push_op + 1, - header.op - ); - assert_eq!( - header.parent, self.last_push_checksum, - "parent must chain to previous global checksum" - ); - } else { - self.first_push_op = header.op; - } - - let queue = self - .queues - .get_mut(&ns) - .expect("push_message: namespace not registered"); - if let Some(tail) = queue.back() { - assert!( - header.op > tail.header.op, - "op must increase within namespace queue" - ); - } - - queue.push_back(entry); - self.total_count += 1; - self.last_push_checksum = header.checksum; - self.last_push_op = header.op; - } - - fn pop(&mut self) -> Option { - let min_ns = self - .queues - .iter() - .filter_map(|(ns, q)| q.front().map(|entry| (*ns, entry.header.op))) - .min_by_key(|(_, op)| *op) - .map(|(ns, _)| ns)?; - - let entry = self.queues.get_mut(&min_ns)?.pop_front()?; - self.total_count -= 1; - Some(entry) - } - - fn clear(&mut self) { - for queue in self.queues.values_mut() { - queue.clear(); - } - self.total_count = 0; - self.last_push_checksum = 0; - self.last_push_op = 0; - self.first_push_op = 0; - } - - /// Linear scan all queues. Ops are globally unique; max 8 entries total. 
- fn entry_by_op(&self, op: u64) -> Option<&Self::Entry> { - for queue in self.queues.values() { - for entry in queue { - if entry.header.op == op { - return Some(entry); - } - } - } - None - } - - fn entry_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry> { - for queue in self.queues.values_mut() { - for entry in queue.iter_mut() { - if entry.header.op == op { - return Some(entry); - } - } - } - None - } - - fn entry_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&Self::Entry> { - let entry = self.entry_by_op(op)?; - if entry.header.checksum == checksum { - Some(entry) - } else { - None - } - } - - fn head(&self) -> Option<&Self::Entry> { - self.queues - .values() - .filter_map(|q| q.front()) - .min_by_key(|entry| entry.header.op) - } - - fn is_full(&self) -> bool { - self.total_count >= PIPELINE_PREPARE_QUEUE_MAX - } - - fn is_empty(&self) -> bool { - self.total_count == 0 - } - - fn len(&self) -> usize { - self.total_count - } - - fn verify(&self) { - assert!(self.total_count <= PIPELINE_PREPARE_QUEUE_MAX); - - let actual_count: usize = self.queues.values().map(VecDeque::len).sum(); - assert_eq!(actual_count, self.total_count, "total_count mismatch"); - - // Per-namespace: ops must be monotonically increasing - for queue in self.queues.values() { - let mut prev_op = None; - for entry in queue { - if let Some(prev) = prev_op { - assert!( - entry.header.op > prev, - "ops must increase within namespace queue" - ); - } - prev_op = Some(entry.header.op); - } - } - - // Global: collect all entries, sort by op, verify sequential ops and hash chain - let mut all_entries: Vec<&PipelineEntry> = - self.queues.values().flat_map(|q| q.iter()).collect(); - all_entries.sort_by_key(|e| e.header.op); - - for window in all_entries.windows(2) { - let prev = &window[0].header; - let curr = &window[1].header; - assert_eq!( - curr.op, - prev.op + 1, - "global ops must be sequential: {} -> {}", - prev.op, - curr.op - ); - assert_eq!( - curr.parent, prev.checksum, 
- "global hash chain broken at op {}: parent={} expected={}", - curr.op, curr.parent, prev.checksum - ); - } - } -} - -impl NamespacedPipeline { - #[allow(clippy::needless_pass_by_value)] - pub fn push_message(&mut self, message: Message) { - self.push(PipelineEntry::new(*message.header())); - } - - pub fn pop_message(&mut self) -> Option { - self.pop() - } - - #[must_use] - pub fn message_by_op(&self, op: u64) -> Option<&PipelineEntry> { - self.entry_by_op(op) - } - - pub fn message_by_op_mut(&mut self, op: u64) -> Option<&mut PipelineEntry> { - self.entry_by_op_mut(op) - } - - #[must_use] - pub fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&PipelineEntry> { - self.entry_by_op_and_checksum(op, checksum) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use iggy_binary_protocol::{Command2, Message, PrepareHeader}; - - fn make_prepare( - op: u64, - parent: u128, - checksum: u128, - namespace: u64, - ) -> Message { - Message::::new(std::mem::size_of::()).transmute_header( - |_, new| { - *new = PrepareHeader { - command: Command2::Prepare, - op, - parent, - checksum, - namespace, - ..Default::default() - }; - }, - ) - } - - fn mark_quorum(pipeline: &mut NamespacedPipeline, op: u64) { - pipeline - .message_by_op_mut(op) - .expect("mark_quorum: op not in pipeline") - .ok_quorum_received = true; - } - - #[test] - fn multi_namespace_push_pop() { - let mut pipeline = NamespacedPipeline::new(); - pipeline.register_namespace(100); - pipeline.register_namespace(200); - - pipeline.push_message(make_prepare(1, 0, 10, 100)); - pipeline.push_message(make_prepare(2, 10, 20, 200)); - pipeline.push_message(make_prepare(3, 20, 30, 100)); - pipeline.push_message(make_prepare(4, 30, 40, 200)); - - assert_eq!(pipeline.total_count, 4); - assert!(!pipeline.is_empty()); - - // head is the entry with the smallest op - assert_eq!(pipeline.head().unwrap().header.op, 1); - - // pop returns entries in global op order - 
assert_eq!(pipeline.pop_message().unwrap().header.op, 1); - assert_eq!(pipeline.pop_message().unwrap().header.op, 2); - assert_eq!(pipeline.pop_message().unwrap().header.op, 3); - assert_eq!(pipeline.pop_message().unwrap().header.op, 4); - assert!(pipeline.is_empty()); - } - - #[test] - fn drain_committable_all() { - let mut pipeline = NamespacedPipeline::new(); - pipeline.register_namespace(100); - pipeline.register_namespace(200); - - // Interleaved ops across two namespaces: [ns_a:1, ns_b:2, ns_a:3, ns_b:4] - pipeline.push_message(make_prepare(1, 0, 10, 100)); - pipeline.push_message(make_prepare(2, 10, 20, 200)); - pipeline.push_message(make_prepare(3, 20, 30, 100)); - pipeline.push_message(make_prepare(4, 30, 40, 200)); - - // Mark ops 1,2,3 as quorum'd (not 4) - mark_quorum(&mut pipeline, 1); - mark_quorum(&mut pipeline, 2); - mark_quorum(&mut pipeline, 3); - - // ns_100 drains [1,3], ns_200 drains [2] (stops at non-quorum'd 4) - let drained = pipeline.drain_committable_all(); - let drained_ops: Vec<_> = drained.iter().map(|e| e.header.op).collect(); - assert_eq!(drained_ops, vec![1, 2, 3]); - - assert_eq!(pipeline.total_count, 1); - assert_eq!(pipeline.head().unwrap().header.op, 4); - - // Per-namespace commits track highest drained op - assert_eq!(pipeline.ns_commit(100), Some(3)); - assert_eq!(pipeline.ns_commit(200), Some(2)); - - // Global commit advances past contiguously drained ops - assert_eq!(pipeline.global_commit_frontier(0), 3); - } - - #[test] - fn drain_committable_all_full() { - let mut pipeline = NamespacedPipeline::new(); - pipeline.register_namespace(100); - pipeline.register_namespace(200); - - pipeline.push_message(make_prepare(1, 0, 10, 100)); - pipeline.push_message(make_prepare(2, 10, 20, 200)); - pipeline.push_message(make_prepare(3, 20, 30, 100)); - pipeline.push_message(make_prepare(4, 30, 40, 200)); - - mark_quorum(&mut pipeline, 1); - mark_quorum(&mut pipeline, 2); - mark_quorum(&mut pipeline, 3); - mark_quorum(&mut pipeline, 4); 
- - let drained = pipeline.drain_committable_all(); - assert_eq!(drained.len(), 4); - assert!(pipeline.is_empty()); - assert_eq!(pipeline.global_commit_frontier(0), 4); - } - - #[test] - fn independent_namespace_progress() { - let mut pipeline = NamespacedPipeline::new(); - pipeline.register_namespace(100); - pipeline.register_namespace(200); - - // [ns_a:1, ns_b:2, ns_a:3, ns_b:4] - pipeline.push_message(make_prepare(1, 0, 10, 100)); - pipeline.push_message(make_prepare(2, 10, 20, 200)); - pipeline.push_message(make_prepare(3, 20, 30, 100)); - pipeline.push_message(make_prepare(4, 30, 40, 200)); - - // Only ns_a (ops 1,3) gets quorum, ns_b (ops 2,4) does not - mark_quorum(&mut pipeline, 1); - mark_quorum(&mut pipeline, 3); - - let drained = pipeline.drain_committable_all(); - let drained_ops: Vec<_> = drained.iter().map(|e| e.header.op).collect(); - assert_eq!(drained_ops, vec![1, 3]); - - // ns_a progressed independently, ns_b untouched - assert_eq!(pipeline.ns_commit(100), Some(3)); - assert_eq!(pipeline.ns_commit(200), Some(0)); - assert_eq!(pipeline.total_count, 2); - - // Global commit only advances to 1 (can't skip ns_b's op 2) - assert_eq!(pipeline.global_commit_frontier(0), 1); - - // Now ns_b gets quorum - mark_quorum(&mut pipeline, 2); - mark_quorum(&mut pipeline, 4); - - let drained = pipeline.drain_committable_all(); - let drained_ops: Vec<_> = drained.iter().map(|e| e.header.op).collect(); - assert_eq!(drained_ops, vec![2, 4]); - - // Global commit jumps to 4 (ops 1,3 already drained, 2,4 just drained) - assert_eq!(pipeline.global_commit_frontier(1), 4); - assert_eq!(pipeline.ns_commit(200), Some(4)); - } - - #[test] - fn message_by_op_cross_namespace() { - let mut pipeline = NamespacedPipeline::new(); - pipeline.register_namespace(100); - pipeline.register_namespace(200); - pipeline.register_namespace(300); - - pipeline.push_message(make_prepare(1, 0, 10, 100)); - pipeline.push_message(make_prepare(2, 10, 20, 200)); - 
pipeline.push_message(make_prepare(3, 20, 30, 300)); - - assert_eq!(pipeline.message_by_op(1).unwrap().header.namespace, 100); - assert_eq!(pipeline.message_by_op(2).unwrap().header.namespace, 200); - assert_eq!(pipeline.message_by_op(3).unwrap().header.namespace, 300); - assert!(pipeline.message_by_op(4).is_none()); - } - - #[test] - fn message_by_op_and_checksum() { - let mut pipeline = NamespacedPipeline::new(); - pipeline.register_namespace(100); - pipeline.push_message(make_prepare(1, 0, 10, 100)); - - assert!(pipeline.message_by_op_and_checksum(1, 10).is_some()); - assert!(pipeline.message_by_op_and_checksum(1, 99).is_none()); - assert!(pipeline.message_by_op_and_checksum(2, 10).is_none()); - } - - #[test] - fn verify_passes() { - let mut pipeline = NamespacedPipeline::new(); - pipeline.register_namespace(100); - pipeline.register_namespace(200); - pipeline.push_message(make_prepare(1, 0, 10, 100)); - pipeline.push_message(make_prepare(2, 10, 20, 200)); - pipeline.push_message(make_prepare(3, 20, 30, 100)); - pipeline.verify(); - } - - #[test] - #[allow(clippy::cast_possible_truncation)] - fn is_full() { - let mut pipeline = NamespacedPipeline::new(); - pipeline.register_namespace(0); - pipeline.register_namespace(1); - for i in 0..PIPELINE_PREPARE_QUEUE_MAX as u128 { - let parent = if i == 0 { 0 } else { i * 10 }; - let checksum = (i + 1) * 10; - pipeline.push_message(make_prepare(i as u64 + 1, parent, checksum, i as u64 % 2)); - } - assert!(pipeline.is_full()); - } - - #[test] - #[should_panic(expected = "namespaced pipeline full")] - #[allow(clippy::cast_possible_truncation)] - fn push_when_full_panics() { - let mut pipeline = NamespacedPipeline::new(); - pipeline.register_namespace(0); - for i in 0..PIPELINE_PREPARE_QUEUE_MAX as u128 { - let parent = if i == 0 { 0 } else { i * 10 }; - let checksum = (i + 1) * 10; - pipeline.push_message(make_prepare(i as u64 + 1, parent, checksum, 0)); - } - pipeline.push_message(make_prepare(100, 80, 1000, 0)); - } - - 
#[test] - fn clear_preserves_ns_commits() { - let mut pipeline = NamespacedPipeline::new(); - pipeline.register_namespace(100); - pipeline.register_namespace(200); - pipeline.push_message(make_prepare(1, 0, 10, 100)); - pipeline.push_message(make_prepare(2, 10, 20, 200)); - - // Mark op 1 as committed in ns 100 before clearing - pipeline.ns_commits.insert(100, 1); - - pipeline.clear(); - assert!(pipeline.is_empty()); - assert_eq!(pipeline.total_count, 0); - - // ns_commits must survive clear -- they represent durable knowledge - // about already-drained ops, not pipeline state - assert_eq!(pipeline.ns_commits.get(&100), Some(&1)); - assert_eq!(pipeline.ns_commits.get(&200), Some(&0)); - } -} diff --git a/core/consensus/src/observability.rs b/core/consensus/src/observability.rs index 865935faf8..0ad1a02a7b 100644 --- a/core/consensus/src/observability.rs +++ b/core/consensus/src/observability.rs @@ -19,6 +19,7 @@ use crate::{Pipeline, PipelineEntry, Status, VsrAction, VsrConsensus}; use iggy_binary_protocol::Operation; use iggy_common::sharding::IggyNamespace; use message_bus::MessageBus; +use std::borrow::Cow; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PlaneKind { @@ -337,6 +338,235 @@ pub trait StructuredSimEvent { fn emit(&self, sim_event: SimEventKind); } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PartitionDiagEvent<'a> { + pub replica: ReplicaLogContext, + pub message: &'static str, + pub operation: Option, + pub op: Option, + pub prepare_checksum: Option, + pub reason: Option<&'a str>, + pub error: Option>, +} + +impl<'a> PartitionDiagEvent<'a> { + #[must_use] + pub const fn new(replica: ReplicaLogContext, message: &'static str) -> Self { + Self { + replica, + message, + operation: None, + op: None, + prepare_checksum: None, + reason: None, + error: None, + } + } + + #[must_use] + pub const fn with_operation(mut self, operation: Operation) -> Self { + self.operation = Some(operation); + self + } + + #[must_use] + pub const fn with_op(mut 
self, op: u64) -> Self { + self.op = Some(op); + self + } + + #[must_use] + pub const fn with_prepare_checksum(mut self, prepare_checksum: u128) -> Self { + self.prepare_checksum = Some(prepare_checksum); + self + } + + #[must_use] + pub const fn with_reason(mut self, reason: &'a str) -> Self { + self.reason = Some(reason); + self + } + + #[must_use] + pub fn with_error(mut self, error: E) -> Self + where + E: Into>, + { + self.error = Some(error.into()); + self + } +} + +pub fn emit_partition_diag(level: tracing::Level, event: &PartitionDiagEvent<'_>) { + match level { + tracing::Level::ERROR => emit_partition_diag_error(event), + tracing::Level::WARN => emit_partition_diag_warn(event), + tracing::Level::INFO => emit_partition_diag_info(event), + tracing::Level::DEBUG => emit_partition_diag_debug(event), + tracing::Level::TRACE => emit_partition_diag_trace(event), + } +} + +fn emit_partition_diag_error(event: &PartitionDiagEvent<'_>) { + let ctx = event.replica; + let operation = event.operation.map_or("", operation_as_str); + let op = event.op.unwrap_or_default(); + let prepare_checksum = event.prepare_checksum.unwrap_or_default(); + let reason = event.reason.unwrap_or(""); + let error = event.error.as_deref().unwrap_or(""); + + tracing::event!( + target: "iggy.partitions.diag", + tracing::Level::ERROR, + plane = ctx.plane.as_str(), + cluster_id = ctx.cluster_id, + replica_id = ctx.replica_id, + namespace_raw = ctx.namespace.raw, + stream_id = namespace_component(ctx.namespace.stream_id), + topic_id = namespace_component(ctx.namespace.topic_id), + partition_id = namespace_component(ctx.namespace.partition_id), + view = ctx.view, + log_view = ctx.log_view, + commit = ctx.commit, + status = status_as_str(ctx.status), + role = ctx.role.as_str(), + operation, + op, + prepare_checksum, + reason, + error, + message = event.message, + ); +} + +fn emit_partition_diag_warn(event: &PartitionDiagEvent<'_>) { + let ctx = event.replica; + let operation = 
event.operation.map_or("", operation_as_str); + let op = event.op.unwrap_or_default(); + let prepare_checksum = event.prepare_checksum.unwrap_or_default(); + let reason = event.reason.unwrap_or(""); + let error = event.error.as_deref().unwrap_or(""); + + tracing::event!( + target: "iggy.partitions.diag", + tracing::Level::WARN, + plane = ctx.plane.as_str(), + cluster_id = ctx.cluster_id, + replica_id = ctx.replica_id, + namespace_raw = ctx.namespace.raw, + stream_id = namespace_component(ctx.namespace.stream_id), + topic_id = namespace_component(ctx.namespace.topic_id), + partition_id = namespace_component(ctx.namespace.partition_id), + view = ctx.view, + log_view = ctx.log_view, + commit = ctx.commit, + status = status_as_str(ctx.status), + role = ctx.role.as_str(), + operation, + op, + prepare_checksum, + reason, + error, + message = event.message, + ); +} + +fn emit_partition_diag_info(event: &PartitionDiagEvent<'_>) { + let ctx = event.replica; + let operation = event.operation.map_or("", operation_as_str); + let op = event.op.unwrap_or_default(); + let prepare_checksum = event.prepare_checksum.unwrap_or_default(); + let reason = event.reason.unwrap_or(""); + let error = event.error.as_deref().unwrap_or(""); + + tracing::event!( + target: "iggy.partitions.diag", + tracing::Level::INFO, + plane = ctx.plane.as_str(), + cluster_id = ctx.cluster_id, + replica_id = ctx.replica_id, + namespace_raw = ctx.namespace.raw, + stream_id = namespace_component(ctx.namespace.stream_id), + topic_id = namespace_component(ctx.namespace.topic_id), + partition_id = namespace_component(ctx.namespace.partition_id), + view = ctx.view, + log_view = ctx.log_view, + commit = ctx.commit, + status = status_as_str(ctx.status), + role = ctx.role.as_str(), + operation, + op, + prepare_checksum, + reason, + error, + message = event.message, + ); +} + +fn emit_partition_diag_debug(event: &PartitionDiagEvent<'_>) { + let ctx = event.replica; + let operation = event.operation.map_or("", 
operation_as_str); + let op = event.op.unwrap_or_default(); + let prepare_checksum = event.prepare_checksum.unwrap_or_default(); + let reason = event.reason.unwrap_or(""); + let error = event.error.as_deref().unwrap_or(""); + + tracing::event!( + target: "iggy.partitions.diag", + tracing::Level::DEBUG, + plane = ctx.plane.as_str(), + cluster_id = ctx.cluster_id, + replica_id = ctx.replica_id, + namespace_raw = ctx.namespace.raw, + stream_id = namespace_component(ctx.namespace.stream_id), + topic_id = namespace_component(ctx.namespace.topic_id), + partition_id = namespace_component(ctx.namespace.partition_id), + view = ctx.view, + log_view = ctx.log_view, + commit = ctx.commit, + status = status_as_str(ctx.status), + role = ctx.role.as_str(), + operation, + op, + prepare_checksum, + reason, + error, + message = event.message, + ); +} + +fn emit_partition_diag_trace(event: &PartitionDiagEvent<'_>) { + let ctx = event.replica; + let operation = event.operation.map_or("", operation_as_str); + let op = event.op.unwrap_or_default(); + let prepare_checksum = event.prepare_checksum.unwrap_or_default(); + let reason = event.reason.unwrap_or(""); + let error = event.error.as_deref().unwrap_or(""); + + tracing::event!( + target: "iggy.partitions.diag", + tracing::Level::TRACE, + plane = ctx.plane.as_str(), + cluster_id = ctx.cluster_id, + replica_id = ctx.replica_id, + namespace_raw = ctx.namespace.raw, + stream_id = namespace_component(ctx.namespace.stream_id), + topic_id = namespace_component(ctx.namespace.topic_id), + partition_id = namespace_component(ctx.namespace.partition_id), + view = ctx.view, + log_view = ctx.log_view, + commit = ctx.commit, + status = status_as_str(ctx.status), + role = ctx.role.as_str(), + operation, + op, + prepare_checksum, + reason, + error, + message = event.message, + ); +} + #[must_use] pub const fn status_as_str(status: Status) -> &'static str { match status { @@ -372,6 +602,7 @@ pub const fn operation_as_str(operation: Operation) -> 
&'static str { Operation::DeletePersonalAccessToken => "delete_personal_access_token", Operation::SendMessages => "send_messages", Operation::StoreConsumerOffset => "store_consumer_offset", + Operation::DeleteConsumerOffset => "delete_consumer_offset", } } diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index c49911b3dc..2a20f947c1 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -15,31 +15,55 @@ // specific language governing permissions and limitations // under the License. +use crate::iggy_index_writer::IggyIndexWriter; use crate::journal::{ MessageLookup, PartitionJournal, PartitionJournalMemStorage, QueryableJournal, }; +use crate::log::JournalInfo; use crate::log::SegmentedLog; +use crate::messages_writer::MessagesWriter; +use crate::offset_storage::{delete_persisted_offset, persist_offset}; +use crate::segment::Segment; use crate::{ - AppendResult, Partition, PartitionOffsets, PollFragments, PollQueryResult, PollingArgs, - PollingConsumer, + AppendResult, Partition, PartitionOffsets, PartitionsConfig, PollFragments, PollQueryResult, + PollingArgs, PollingConsumer, }; +use consensus::{ + CommitLogEvent, Consensus, PartitionDiagEvent, Pipeline, PipelineEntry, PlaneKind, Project, + ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus, ack_preflight, + ack_quorum_reached, build_reply_message, drain_committable_prefix, + emit_namespace_progress_event, emit_partition_diag, emit_sim_event, + fence_old_prepare_by_commit, replicate_preflight, replicate_to_next_in_chain, + send_prepare_ok as send_prepare_ok_common, +}; +use iggy_binary_protocol::{GenericHeader, PrepareOkHeader, RequestHeader}; use iggy_binary_protocol::{Message, Operation, PrepareHeader}; use iggy_common::{ ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, ConsumerOffsets, - IggyByteSize, IggyError, IggyTimestamp, PartitionStats, PollingKind, + IggyByteSize, IggyError, 
IggyTimestamp, PartitionStats, PollingKind, SegmentStorage, send_messages2::stamp_prepare_for_persistence, + send_messages2::{convert_request_message, decode_prepare_slice}, + sharding::IggyNamespace, }; +use iobuf::Frozen; use journal::Journal as _; +use message_bus::{IggyMessageBus, MessageBus}; +use std::collections::HashMap; +use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use tokio::sync::Mutex as TokioMutex; -use tracing::warn; +use tracing::{debug, warn}; // This struct aliases in terms of the code contained the `LocalPartition from `core/server/src/streaming/partitions/local_partition.rs`. // // TODO: Fix op deduplication once we move to a consensus-per-partition design. #[derive(Debug)] -pub struct IggyPartition { +pub struct IggyPartition +where + B: MessageBus, +{ + consensus: VsrConsensus, pub log: SegmentedLog, PartitionJournalMemStorage>, /// Highest durably persisted offset. pub offset: Arc, @@ -52,11 +76,69 @@ pub struct IggyPartition { pub revision_id: u64, pub should_increment_offset: bool, pub write_lock: Arc>, + consumer_offsets_path: Option, + consumer_group_offsets_path: Option, + consumer_offset_enforce_fsync: bool, + pending_consumer_offset_commits: HashMap, + observed_view: u32, +} + +#[derive(Debug, Clone, Copy, PartialEq)] +struct PendingConsumerOffsetCommit { + kind: ConsumerKind, + consumer_id: u32, + mutation: PendingConsumerOffsetMutation, } -impl IggyPartition { - pub fn new(stats: Arc) -> Self { +#[derive(Debug, Clone, Copy, PartialEq)] +enum PendingConsumerOffsetMutation { + Upsert(u64), + Delete, +} + +impl PendingConsumerOffsetCommit { + const fn upsert(kind: ConsumerKind, consumer_id: u32, offset: u64) -> Self { Self { + kind, + consumer_id, + mutation: PendingConsumerOffsetMutation::Upsert(offset), + } + } + + const fn delete(kind: ConsumerKind, consumer_id: u32) -> Self { + Self { + kind, + consumer_id, + mutation: PendingConsumerOffsetMutation::Delete, + } + } + + fn 
try_from_polling_consumer( + consumer: PollingConsumer, + offset: u64, + ) -> Result { + let (kind, consumer_id) = match consumer { + PollingConsumer::Consumer(id, _) => ( + ConsumerKind::Consumer, + u32::try_from(id).map_err(|_| IggyError::InvalidCommand)?, + ), + PollingConsumer::ConsumerGroup(group_id, _) => ( + ConsumerKind::ConsumerGroup, + u32::try_from(group_id).map_err(|_| IggyError::InvalidCommand)?, + ), + }; + Ok(Self::upsert(kind, consumer_id, offset)) + } +} + +impl IggyPartition +where + B: MessageBus, +{ + pub fn new(stats: Arc, consensus: VsrConsensus) -> Self { + let observed_view = consensus.view(); + Self { + consensus, log: SegmentedLog::default(), offset: Arc::new(AtomicU64::new(0)), dirty_offset: AtomicU64::new(0), @@ -67,11 +149,255 @@ impl IggyPartition { revision_id: 0, should_increment_offset: false, write_lock: Arc::new(TokioMutex::new(())), + consumer_offsets_path: None, + consumer_group_offsets_path: None, + consumer_offset_enforce_fsync: false, + pending_consumer_offset_commits: HashMap::new(), + observed_view, } } + + #[must_use] + pub const fn consensus(&self) -> &VsrConsensus { + &self.consensus + } + + #[must_use] + pub fn with_in_memory_storage( + stats: Arc, + consensus: VsrConsensus, + segment_size: IggyByteSize, + consumer_offset_enforce_fsync: bool, + ) -> Self { + let mut partition = Self::new(stats, consensus); + partition.consumer_offset_enforce_fsync = consumer_offset_enforce_fsync; + let start_offset = 0; + let segment = Segment::new(start_offset, segment_size); + let storage = SegmentStorage::default(); + partition + .log + .add_persisted_segment(segment, storage, None, None); + partition.offset.store(start_offset, Ordering::Release); + partition + .dirty_offset + .store(start_offset, Ordering::Relaxed); + partition.should_increment_offset = false; + partition.stats.increment_segments_count(1); + partition + } + + pub fn configure_consumer_offset_storage( + &mut self, + consumer_offsets_path: String, + 
consumer_group_offsets_path: String, + consumer_offsets: ConsumerOffsets, + consumer_group_offsets: ConsumerGroupOffsets, + consumer_offset_enforce_fsync: bool, + ) { + self.consumer_offsets = Arc::new(consumer_offsets); + self.consumer_group_offsets = Arc::new(consumer_group_offsets); + self.consumer_offsets_path = Some(consumer_offsets_path); + self.consumer_group_offsets_path = Some(consumer_group_offsets_path); + self.consumer_offset_enforce_fsync = consumer_offset_enforce_fsync; + } + + pub(crate) async fn persist_and_stage_consumer_offset_upsert( + &mut self, + op: u64, + kind: ConsumerKind, + consumer_id: u32, + offset: u64, + ) -> Result<(), IggyError> { + let pending = PendingConsumerOffsetCommit::upsert(kind, consumer_id, offset); + self.persist_consumer_offset_commit(pending).await?; + self.pending_consumer_offset_commits.insert(op, pending); + Ok(()) + } + + pub(crate) async fn persist_and_stage_consumer_offset_delete( + &mut self, + op: u64, + kind: ConsumerKind, + consumer_id: u32, + ) -> Result<(), IggyError> { + self.ensure_consumer_offset_exists(kind, consumer_id)?; + let pending = PendingConsumerOffsetCommit::delete(kind, consumer_id); + self.persist_consumer_offset_commit(pending).await?; + self.pending_consumer_offset_commits.insert(op, pending); + Ok(()) + } + + pub(crate) fn apply_staged_consumer_offset_commit(&mut self, op: u64) -> Result<(), IggyError> { + let pending = self + .pending_consumer_offset_commits + .remove(&op) + .ok_or(IggyError::InvalidCommand)?; + self.apply_consumer_offset_commit(pending)?; + Ok(()) + } + + async fn persist_consumer_offset_commit( + &self, + pending: PendingConsumerOffsetCommit, + ) -> Result<(), IggyError> { + let Some(path) = self.persisted_offset_path(pending.kind, pending.consumer_id) else { + return Ok(()); + }; + match pending.mutation { + PendingConsumerOffsetMutation::Upsert(offset) => { + persist_offset(&path, offset, self.consumer_offset_enforce_fsync).await + } + 
PendingConsumerOffsetMutation::Delete => delete_persisted_offset(&path).await, + } + } + + fn apply_consumer_offset_commit( + &self, + pending: PendingConsumerOffsetCommit, + ) -> Result<(), IggyError> { + match pending.mutation { + PendingConsumerOffsetMutation::Upsert(offset) + if pending.kind == ConsumerKind::Consumer => + { + let id = pending.consumer_id; + let guard = self.consumer_offsets.pin(); + let key = usize::try_from(id).expect("u32 consumer id must fit usize"); + if let Some(existing) = guard.get(&key) { + existing.offset.store(offset, Ordering::Relaxed); + } else { + let created = self.consumer_offsets_path.as_deref().map_or_else( + || ConsumerOffset::new(ConsumerKind::Consumer, id, 0, String::new()), + |path| ConsumerOffset::default_for_consumer(id, path), + ); + created.offset.store(offset, Ordering::Relaxed); + guard.insert(key, created); + } + Ok(()) + } + PendingConsumerOffsetMutation::Upsert(offset) + if pending.kind == ConsumerKind::ConsumerGroup => + { + let group_id = pending.consumer_id; + let guard = self.consumer_group_offsets.pin(); + let key = ConsumerGroupId( + usize::try_from(group_id).expect("u32 group id must fit usize"), + ); + if let Some(existing) = guard.get(&key) { + existing.offset.store(offset, Ordering::Relaxed); + } else { + let created = self.consumer_group_offsets_path.as_deref().map_or_else( + || { + ConsumerOffset::new( + ConsumerKind::ConsumerGroup, + group_id, + 0, + String::new(), + ) + }, + |path| ConsumerOffset::default_for_consumer_group(key, path), + ); + created.offset.store(offset, Ordering::Relaxed); + guard.insert(key, created); + } + Ok(()) + } + PendingConsumerOffsetMutation::Delete if pending.kind == ConsumerKind::Consumer => { + let id = pending.consumer_id; + let guard = self.consumer_offsets.pin(); + let key = usize::try_from(id).expect("u32 consumer id must fit usize"); + let _ = guard + .remove(&key) + .ok_or(IggyError::ConsumerOffsetNotFound(key))?; + Ok(()) + } + PendingConsumerOffsetMutation::Delete 
+ if pending.kind == ConsumerKind::ConsumerGroup => + { + let group_id = pending.consumer_id; + let guard = self.consumer_group_offsets.pin(); + let key = ConsumerGroupId( + usize::try_from(group_id).expect("u32 group id must fit usize"), + ); + let _ = guard + .remove(&key) + .ok_or(IggyError::ConsumerOffsetNotFound(key.0))?; + Ok(()) + } + _ => Ok(()), + } + } + + async fn store_consumer_offset_and_persist( + &self, + consumer: PollingConsumer, + offset: u64, + ) -> Result<(), IggyError> { + let pending = PendingConsumerOffsetCommit::try_from_polling_consumer(consumer, offset)?; + self.persist_consumer_offset_commit(pending).await?; + self.apply_consumer_offset_commit(pending)?; + Ok(()) + } + + fn persisted_offset_path(&self, kind: ConsumerKind, consumer_id: u32) -> Option { + match kind { + ConsumerKind::Consumer => self + .consumer_offsets_path + .as_ref() + .map(|path| format!("{path}/{consumer_id}")), + ConsumerKind::ConsumerGroup => self + .consumer_group_offsets_path + .as_ref() + .map(|path| format!("{path}/{consumer_id}")), + } + } + + fn ensure_consumer_offset_exists( + &self, + kind: ConsumerKind, + consumer_id: u32, + ) -> Result<(), IggyError> { + let found = match kind { + ConsumerKind::Consumer => { + let key = usize::try_from(consumer_id).expect("u32 consumer id must fit usize"); + self.consumer_offsets.pin().contains_key(&key) + } + ConsumerKind::ConsumerGroup => { + let key = ConsumerGroupId( + usize::try_from(consumer_id).expect("u32 group id must fit usize"), + ); + self.consumer_group_offsets.pin().contains_key(&key) + } + }; + + if found { + Ok(()) + } else { + Err(IggyError::ConsumerOffsetNotFound( + usize::try_from(consumer_id).expect("u32 consumer id must fit usize"), + )) + } + } + + #[must_use] + fn diag_ctx(&self) -> ReplicaLogContext { + ReplicaLogContext::from_consensus(self.consensus(), PlaneKind::Partitions) + } + + fn clear_pending_consumer_offset_commits_if_view_changed(&mut self) { + let current_view = self.consensus.view(); + 
if current_view == self.observed_view { + return; + } + + self.pending_consumer_offset_commits.clear(); + self.observed_view = current_view; + } } -impl Partition for IggyPartition { +impl Partition for IggyPartition +where + B: MessageBus, +{ async fn append_messages( &mut self, message: Message, @@ -191,7 +517,10 @@ impl Partition for IggyPartition { if args.auto_commit && !fragments.is_empty() { let last_offset = last_matching_offset.expect("non-empty poll result must have a last offset"); - if let Err(err) = self.store_consumer_offset(consumer, last_offset) { + if let Err(err) = self + .store_consumer_offset_and_persist(consumer, last_offset) + .await + { // warning for now. warn!( target: "iggy.partitions.diag", @@ -212,41 +541,8 @@ impl Partition for IggyPartition { consumer: PollingConsumer, offset: u64, ) -> Result<(), IggyError> { - match consumer { - PollingConsumer::Consumer(id, _) => { - let guard = self.consumer_offsets.pin(); - if let Some(existing) = guard.get(&id) { - existing.offset.store(offset, Ordering::Relaxed); - } else { - guard.insert( - id, - ConsumerOffset::new( - ConsumerKind::Consumer, - id as u32, - offset, - String::new(), - ), - ); - } - } - PollingConsumer::ConsumerGroup(group_id, _) => { - let guard = self.consumer_group_offsets.pin(); - let key = ConsumerGroupId(group_id); - if let Some(existing) = guard.get(&key) { - existing.offset.store(offset, Ordering::Relaxed); - } else { - guard.insert( - key, - ConsumerOffset::new( - ConsumerKind::ConsumerGroup, - group_id as u32, - offset, - String::new(), - ), - ); - } - } - } + let pending = PendingConsumerOffsetCommit::try_from_polling_consumer(consumer, offset)?; + self.apply_consumer_offset_commit(pending)?; Ok(()) } @@ -272,3 +568,911 @@ impl Partition for IggyPartition { ) } } + +impl IggyPartition +where + B: MessageBus, Client = u128>, +{ + #[must_use] + fn namespace(&self) -> IggyNamespace { + IggyNamespace::from_raw(self.consensus.namespace()) + } + + /// Handles a client 
request for this partition and turns it into a prepare. + /// + /// # Panics + /// Panics if called when this partition's consensus instance is not the + /// primary, is not in normal status, or is currently syncing. + #[allow(clippy::future_not_send)] + pub async fn on_request(&mut self, message: Message) { + self.clear_pending_consumer_offset_commits_if_view_changed(); + let namespace = IggyNamespace::from_raw(message.header().namespace); + let prepare = { + let consensus = self.consensus(); + emit_sim_event( + SimEventKind::ClientRequestReceived, + &RequestLogEvent { + replica: ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + client_id: message.header().client, + request_id: message.header().request, + operation: message.header().operation, + }, + ); + + let message = if message.header().operation == Operation::SendMessages { + match convert_request_message(namespace, message) { + Ok(message) => message, + Err(error) => { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new( + ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + "failed to convert send_messages request", + ) + .with_operation(Operation::SendMessages) + .with_error(error.to_string()), + ); + return; + } + } + } else { + message + }; + + if message.header().operation == Operation::DeleteConsumerOffset { + match Self::parse_consumer_offset_request(message.header().operation, &message) + .and_then(|(kind, consumer_id, _)| { + self.ensure_consumer_offset_exists(kind, consumer_id) + }) { + Ok(()) => {} + Err(error) => { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new( + ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + "rejecting delete_consumer_offset for missing offset", + ) + .with_operation(Operation::DeleteConsumerOffset) + .with_error(error.to_string()), + ); + return; + } + } + } + + assert!(!consensus.is_follower(), "on_request: primary only"); + assert!(consensus.is_normal(), 
"on_request: status must be normal"); + assert!(!consensus.is_syncing(), "on_request: must not be syncing"); + + let prepare = message.project(consensus); + consensus.verify_pipeline(); + consensus.pipeline_message(PlaneKind::Partitions, &prepare); + prepare + }; + self.on_replicate(prepare).await; + } + + #[allow(clippy::future_not_send)] + pub async fn on_replicate(&mut self, message: Message) { + self.clear_pending_consumer_offset_commits_if_view_changed(); + let header = *message.header(); + let previous_commit = self.consensus.commit(); + let current_op = { + let consensus = self.consensus(); + let current_op = match replicate_preflight(consensus, &header) { + Ok(current_op) => current_op, + Err(reason) => { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new( + ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + "ignoring prepare during replicate preflight", + ) + .with_operation(header.operation) + .with_op(header.op) + .with_reason(reason.as_str()), + ); + return; + } + }; + + if fence_old_prepare_by_commit(consensus, &header) { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new( + ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + "received old prepare, skipping replication", + ) + .with_operation(header.operation) + .with_op(header.op), + ); + return; + } + + current_op + }; + + debug_assert_eq!(header.op, current_op + 1); + { + let consensus = self.consensus(); + consensus.sequencer().set_sequence(header.op); + consensus.set_last_prepare_checksum(header.checksum); + } + + let message = { + let consensus = self.consensus(); + replicate_to_next_in_chain(consensus, message).await + }; + let replicated_result = self.apply_replicated_operation(message).await; + + let commit = self.consensus.commit(); + if commit > previous_commit + && let Err(error) = self.apply_committed_consumer_offset_commits_up_to(commit) + { + emit_partition_diag( + tracing::Level::WARN, + 
&PartitionDiagEvent::new( + self.diag_ctx(), + "failed to apply committed consumer offset updates after commit advanced", + ) + .with_operation(header.operation) + .with_op(header.op) + .with_error(error.to_string()), + ); + return; + } + + if let Err(error) = replicated_result { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new( + self.diag_ctx(), + "failed to apply replicated partition operation", + ) + .with_operation(header.operation) + .with_op(header.op) + .with_error(error.to_string()), + ); + return; + } + + { + let consensus = self.consensus(); + emit_namespace_progress_event( + SimEventKind::NamespaceProgressUpdated, + &ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + header.op, + consensus.pipeline().borrow().len(), + ); + } + + self.send_prepare_ok(&header).await; + } + + #[allow(clippy::future_not_send)] + pub async fn on_ack(&mut self, message: Message, config: &PartitionsConfig) { + self.clear_pending_consumer_offset_commits_if_view_changed(); + let header = *message.header(); + { + let consensus = self.consensus(); + if let Err(reason) = ack_preflight(consensus) { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new( + ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + "ignoring ack during preflight", + ) + .with_op(header.op) + .with_reason(reason.as_str()), + ); + return; + } + + let pipeline = consensus.pipeline().borrow(); + if pipeline + .entry_by_op_and_checksum(header.op, header.prepare_checksum) + .is_none() + { + emit_partition_diag( + tracing::Level::DEBUG, + &PartitionDiagEvent::new( + ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + "ack target prepare not in pipeline", + ) + .with_op(header.op) + .with_prepare_checksum(header.prepare_checksum), + ); + return; + } + } + + if !ack_quorum_reached(self.consensus(), PlaneKind::Partitions, &header) { + return; + } + + let drained = drain_committable_prefix(self.consensus()); + if 
drained.is_empty() { + return; + } + + self.handle_committed_entries(drained, config).await; + { + let consensus = self.consensus(); + emit_namespace_progress_event( + SimEventKind::NamespaceProgressUpdated, + &ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + consensus.commit(), + consensus.pipeline().borrow().len(), + ); + } + } + + async fn apply_replicated_operation( + &mut self, + message: Message, + ) -> Result<(), IggyError> { + let header = *message.header(); + let replica_id = self.consensus.replica(); + let namespace_raw = self.consensus.namespace(); + + match header.operation { + Operation::SendMessages => { + self.append_send_messages_to_journal(message).await?; + debug!( + target: "iggy.partitions.diag", + plane = "partitions", + replica = replica_id, + op = header.op, + namespace_raw, + operation = ?header.operation, + "replicated send_messages appended to partition journal" + ); + Ok(()) + } + Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset => { + let (kind, consumer_id, offset) = + Self::parse_staged_consumer_offset_commit(header.operation, &message)?; + let write_lock = self.write_lock.clone(); + let _guard = write_lock.lock().await; + match header.operation { + Operation::StoreConsumerOffset => { + self.persist_and_stage_consumer_offset_upsert( + header.op, + kind, + consumer_id, + offset.expect("store_consumer_offset must include offset"), + ) + .await?; + } + Operation::DeleteConsumerOffset => { + self.persist_and_stage_consumer_offset_delete(header.op, kind, consumer_id) + .await?; + } + _ => unreachable!(), + } + + debug!( + target: "iggy.partitions.diag", + plane = "partitions", + replica = replica_id, + op = header.op, + namespace_raw, + operation = ?header.operation, + consumer_kind = ?kind, + consumer_id, + offset = ?offset, + "replicated consumer offset persisted and staged" + ); + Ok(()) + } + _ => { + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + replica = replica_id, + 
namespace_raw, + op = header.op, + operation = ?header.operation, + "unexpected replicated partition operation" + ); + Ok(()) + } + } + } + + async fn append_send_messages_to_journal( + &mut self, + message: Message, + ) -> Result<(), IggyError> { + let write_lock = self.write_lock.clone(); + let _guard = write_lock.lock().await; + self.append_messages(message).await.map(|_| ()) + } + + #[allow(clippy::too_many_lines)] + async fn commit_messages(&mut self, config: &PartitionsConfig) -> Result<(), IggyError> { + let write_lock = self.write_lock.clone(); + let _guard = write_lock.lock().await; + + let journal_info = self.log.journal().info; + if journal_info.messages_count == 0 { + return Ok(()); + } + + let is_full = self.log.active_segment().is_full(); + let unsaved_messages_count_exceeded = + journal_info.messages_count >= config.messages_required_to_save; + let unsaved_messages_size_exceeded = journal_info.size.as_bytes_u64() + >= config.size_of_messages_required_to_save.as_bytes_u64(); + let should_persist = + is_full || unsaved_messages_count_exceeded || unsaved_messages_size_exceeded; + if !should_persist { + return Ok(()); + } + + let (frozen_batches, index_bytes, batch_count) = { + let entries = self.log.journal().inner.entries(); + let segment = self.log.active_segment(); + let mut file_position = segment.size.as_bytes_u64(); + self.log.ensure_indexes(); + let indexes = self.log.active_indexes_mut().expect("indexes must exist"); + let mut flush_index = None; + let mut frozen = Vec::with_capacity(entries.len()); + let mut batch_count = 0u32; + + for entry in entries { + let Ok(batch) = decode_prepare_slice(entry.as_slice()) else { + continue; + }; + let message_count = batch.message_count(); + if message_count == 0 { + continue; + } + + let index = crate::iggy_index::IggyIndex::new( + batch.header.base_offset, + batch.header.base_timestamp, + file_position, + ); + if flush_index.is_none() { + indexes.insert(index.offset, index.timestamp, index.position); + 
flush_index = Some(index); + } + file_position += batch.header.total_size() as u64; + batch_count += message_count; + frozen.push(entry); + } + + let index_bytes = + flush_index.map(|index| crate::iggy_index::IggyIndexCache::serialize(&index)); + + (frozen, index_bytes, batch_count) + }; + + let Some(index_bytes) = index_bytes else { + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + namespace_raw = self.namespace().inner(), + "failed to build sparse index entry from pending journal batches" + ); + return Err(IggyError::InvalidCommand); + }; + + self.persist_frozen_batches_to_disk(frozen_batches, index_bytes, batch_count) + .await?; + + if is_full { + self.rotate_segment(config).await?; + } + + let _ = self.log.journal_mut().inner.commit(); + self.log.journal_mut().info = JournalInfo::default(); + + let segment_index = self.log.segments().len() - 1; + let segment = &mut self.log.segments_mut()[segment_index]; + if segment.end_offset == 0 && journal_info.first_timestamp != 0 { + segment.start_timestamp = journal_info.first_timestamp; + } + segment.end_timestamp = journal_info.end_timestamp; + segment.max_timestamp = segment.max_timestamp.max(journal_info.max_timestamp); + segment.end_offset = journal_info.current_offset; + + self.stats + .increment_size_bytes(journal_info.size.as_bytes_u64()); + self.stats + .increment_messages_count(u64::from(journal_info.messages_count)); + + let durable_offset = journal_info.current_offset; + self.offset.store(durable_offset, Ordering::Release); + self.stats.set_current_offset(durable_offset); + Ok(()) + } + + async fn handle_committed_entries( + &mut self, + drained: Vec, + config: &PartitionsConfig, + ) { + let replica_id = self.consensus.replica(); + let namespace_raw = self.consensus.namespace(); + if let (Some(first), Some(last)) = (drained.first(), drained.last()) { + debug!( + target: "iggy.partitions.diag", + plane = "partitions", + replica_id, + first_op = first.header.op, + last_op = last.header.op, + 
drained_count = drained.len(), + "draining committed partition ops" + ); + } + + let mut failed_commit = false; + let committed_visible_offsets = self.resolve_committed_visible_offsets(&drained).await; + let mut messages_committed = false; + + for PipelineEntry { + header: prepare_header, + .. + } in drained + { + if !self + .commit_partition_entry( + prepare_header, + &mut messages_committed, + &committed_visible_offsets, + &mut failed_commit, + config, + ) + .await + { + continue; + } + + let pipeline_depth = self.consensus.pipeline().borrow().len(); + let event = CommitLogEvent { + replica: ReplicaLogContext::from_consensus(&self.consensus, PlaneKind::Partitions), + op: prepare_header.op, + client_id: prepare_header.client, + request_id: prepare_header.request, + operation: prepare_header.operation, + pipeline_depth, + }; + emit_sim_event(SimEventKind::OperationCommitted, &event); + emit_namespace_progress_event( + SimEventKind::NamespaceProgressUpdated, + &event.replica, + prepare_header.op, + pipeline_depth, + ); + + let reply_buffers = + build_reply_message(&self.consensus, &prepare_header, bytes::Bytes::new()) + .into_generic(); + emit_sim_event(SimEventKind::ClientReplyEmitted, &event); + + if let Err(error) = self + .consensus + .message_bus() + .send_to_client(prepare_header.client, reply_buffers) + .await + { + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + client = prepare_header.client, + op = prepare_header.op, + namespace_raw, + %error, + "failed to send reply to client" + ); + } + } + + if failed_commit { + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + replica_id, + namespace_raw, + "partition failed local commit handling for one or more ops" + ); + } + } + + async fn resolve_committed_visible_offsets( + &self, + drained: &[PipelineEntry], + ) -> HashMap { + let mut committed_visible_offsets = HashMap::new(); + + for entry in drained { + if entry.header.operation != Operation::SendMessages { + continue; + } + 
+ match self.committed_end_offset_for_prepare(&entry.header).await { + Ok(Some(end_offset)) => { + committed_visible_offsets.insert(entry.header.op, end_offset); + } + Ok(None) => {} + Err(error) => { + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = self.consensus.replica(), + namespace_raw = self.namespace().inner(), + op = entry.header.op, + operation = ?entry.header.operation, + %error, + "failed to resolve committed visible offset for partition entry" + ); + } + } + } + + committed_visible_offsets + } + + async fn commit_partition_entry( + &mut self, + prepare_header: PrepareHeader, + messages_committed: &mut bool, + committed_visible_offsets: &HashMap, + failed_commit: &mut bool, + config: &PartitionsConfig, + ) -> bool { + match prepare_header.operation { + Operation::SendMessages => { + if !*messages_committed { + if let Err(error) = self.commit_messages(config).await { + *failed_commit = true; + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = self.consensus.replica(), + namespace_raw = self.namespace().inner(), + op = prepare_header.op, + operation = ?prepare_header.operation, + %error, + "failed to commit partition messages" + ); + return false; + } + *messages_committed = true; + } + + if let Some(visible_offset) = committed_visible_offsets.get(&prepare_header.op) { + self.offset.store(*visible_offset, Ordering::Release); + self.stats.set_current_offset(*visible_offset); + } + !*failed_commit + } + Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset => { + self.commit_consumer_offset_entry(prepare_header, failed_commit) + .await + } + _ => { + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = self.consensus.replica(), + op = prepare_header.op, + namespace_raw = self.namespace().inner(), + operation = ?prepare_header.operation, + "unexpected committed partition operation" + ); + true + } + } + } + + async fn committed_end_offset_for_prepare( + &self, 
+ prepare_header: &PrepareHeader, + ) -> Result, IggyError> { + let Some(entry) = self.log.journal().inner.entry(prepare_header).await else { + return Err(IggyError::InvalidCommand); + }; + let batch = + decode_prepare_slice(entry.as_slice()).map_err(|_| IggyError::InvalidCommand)?; + let message_count = batch.message_count(); + if message_count == 0 { + return Ok(None); + } + + Ok(Some( + batch.header.base_offset + u64::from(message_count) - 1, + )) + } + + fn parse_consumer_offset_request( + operation: Operation, + message: &Message, + ) -> Result<(ConsumerKind, u32, Option), IggyError> { + let total_size = + usize::try_from(message.header().size).map_err(|_| IggyError::InvalidCommand)?; + let body = message + .as_slice() + .get(std::mem::size_of::()..total_size) + .ok_or(IggyError::InvalidCommand)?; + Self::parse_consumer_offset_payload(operation, body) + } + + fn parse_staged_consumer_offset_commit( + operation: Operation, + message: &Message, + ) -> Result<(ConsumerKind, u32, Option), IggyError> { + let total_size = + usize::try_from(message.header().size).map_err(|_| IggyError::InvalidCommand)?; + let body = message + .as_slice() + .get(std::mem::size_of::()..total_size) + .ok_or(IggyError::InvalidCommand)?; + Self::parse_consumer_offset_payload(operation, body) + } + + fn parse_consumer_offset_payload( + operation: Operation, + body: &[u8], + ) -> Result<(ConsumerKind, u32, Option), IggyError> { + let consumer_kind = *body.first().ok_or(IggyError::InvalidCommand)?; + let consumer_id = body + .get(1..5) + .ok_or(IggyError::InvalidCommand) + .and_then(|bytes| { + <[u8; 4]>::try_from(bytes) + .map(u32::from_le_bytes) + .map_err(|_| IggyError::InvalidCommand) + })?; + let kind = ConsumerKind::from_code(consumer_kind)?; + match operation { + Operation::StoreConsumerOffset => { + let offset = + body.get(5..13) + .ok_or(IggyError::InvalidCommand) + .and_then(|bytes| { + <[u8; 8]>::try_from(bytes) + .map(u64::from_le_bytes) + .map_err(|_| IggyError::InvalidCommand) 
+ })?; + Ok((kind, consumer_id, Some(offset))) + } + Operation::DeleteConsumerOffset => Ok((kind, consumer_id, None)), + _ => Err(IggyError::InvalidCommand), + } + } + + fn apply_committed_consumer_offset_commits_up_to( + &mut self, + commit: u64, + ) -> Result<(), IggyError> { + let mut committed_ops: Vec<_> = self + .pending_consumer_offset_commits + .keys() + .copied() + .filter(|op| *op <= commit) + .collect(); + committed_ops.sort_unstable(); + + for op in committed_ops { + self.apply_staged_consumer_offset_commit(op)?; + } + + Ok(()) + } + + async fn commit_consumer_offset_entry( + &mut self, + prepare_header: PrepareHeader, + failed_commit: &mut bool, + ) -> bool { + let write_lock = self.write_lock.clone(); + let _guard = write_lock.lock().await; + + if let Err(error) = self.apply_staged_consumer_offset_commit(prepare_header.op) { + *failed_commit = true; + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = self.consensus.replica(), + op = prepare_header.op, + namespace_raw = self.namespace().inner(), + %error, + "failed to apply staged consumer offset commit" + ); + return false; + } + + debug!( + target: "iggy.partitions.diag", + plane = "partitions", + replica_id = self.consensus.replica(), + op = prepare_header.op, + namespace_raw = self.namespace().inner(), + "consumer offset committed" + ); + true + } + + async fn persist_frozen_batches_to_disk( + &mut self, + frozen_batches: Vec>, + index_bytes: Vec, + batch_count: u32, + ) -> Result<(), IggyError> { + if batch_count == 0 { + return Ok(()); + } + + if !self.log.has_segments() { + return Ok(()); + } + + let stripped_batches: Vec<_> = frozen_batches + .into_iter() + .map(|batch| batch.slice(std::mem::size_of::()..)) + .collect(); + let messages_writer = self + .log + .messages_writers() + .last() + .and_then(|writer| writer.as_ref()) + .cloned(); + let index_writer = self + .log + .index_writers() + .last() + .and_then(|writer| writer.as_ref()) + .cloned(); + + if 
messages_writer.is_none() || index_writer.is_none() { + let saved_bytes = stripped_batches.iter().map(Frozen::len).sum::(); + debug!( + target: "iggy.partitions.diag", + plane = "partitions", + namespace_raw = self.namespace().inner(), + batch_count, + saved_bytes, + "simulated in-memory batch persistence" + ); + + let segment_index = self.log.segments().len() - 1; + let segment = &mut self.log.segments_mut()[segment_index]; + segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved_bytes as u64); + self.log.clear_in_flight(); + return Ok(()); + } + + let messages_writer = messages_writer.expect("checked above"); + let index_writer = index_writer.expect("checked above"); + + let saved = messages_writer + .save_frozen_batches(&stripped_batches) + .await + .map_err(|error| { + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + namespace_raw = self.namespace().inner(), + batch_count, + %error, + "failed to save frozen batches" + ); + error + })?; + + index_writer + .save_indexes(index_bytes) + .await + .map_err(|error| { + warn!( + target: "iggy.partitions.diag", + plane = "partitions", + namespace_raw = self.namespace().inner(), + batch_count, + %error, + "failed to save sparse indexes" + ); + error + })?; + + debug!( + target: "iggy.partitions.diag", + plane = "partitions", + namespace_raw = self.namespace().inner(), + batch_count, + saved_bytes = saved.as_bytes_u64(), + "persisted batches to disk" + ); + + let segment_index = self.log.segments().len() - 1; + let segment = &mut self.log.segments_mut()[segment_index]; + segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64()); + + self.log.clear_in_flight(); + Ok(()) + } + + async fn rotate_segment(&mut self, config: &PartitionsConfig) -> Result<(), IggyError> { + let namespace = self.namespace(); + let old_segment_index = self.log.segments().len() - 1; + let active_segment = self.log.active_segment_mut(); + active_segment.sealed = true; + let start_offset = 
active_segment.end_offset + 1; + + let segment = Segment::new(start_offset, config.segment_size); + let messages_path = config.get_messages_path( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + start_offset, + ); + let index_path = config.get_index_path( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + start_offset, + ); + + let storage = SegmentStorage::new( + &messages_path, + &index_path, + 0, + 0, + config.enforce_fsync, + config.enforce_fsync, + false, + ) + .await + .map_err(|_| IggyError::CannotCreateSegmentLogFile(messages_path.clone()))?; + let messages_size_bytes = storage + .messages_writer + .as_ref() + .ok_or_else(|| IggyError::CannotCreateSegmentLogFile(messages_path.clone()))? + .size_counter(); + let messages_writer = Rc::new( + MessagesWriter::new( + &messages_path, + messages_size_bytes, + config.enforce_fsync, + false, + ) + .await + .map_err(|_| IggyError::CannotCreateSegmentLogFile(messages_path.clone()))?, + ); + let index_writer = Rc::new( + IggyIndexWriter::new( + &index_path, + Rc::new(std::sync::atomic::AtomicU64::new(0)), + config.enforce_fsync, + false, + ) + .await + .map_err(|_| IggyError::CannotCreateSegmentIndexFile(index_path.clone()))?, + ); + + let old_storage = &mut self.log.storages_mut()[old_segment_index]; + let _ = old_storage.shutdown(); + self.log.messages_writers_mut()[old_segment_index] = None; + self.log.index_writers_mut()[old_segment_index] = None; + + self.log + .add_persisted_segment(segment, storage, Some(messages_writer), Some(index_writer)); + self.stats.increment_segments_count(1); + + debug!( + target: "iggy.partitions.diag", + plane = "partitions", + namespace_raw = namespace.inner(), + start_offset, + "rotated to new segment" + ); + Ok(()) + } + + async fn send_prepare_ok(&self, header: &PrepareHeader) { + send_prepare_ok_common(self.consensus(), header, Some(true)).await; + } +} diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs index 5d9d3d6010..ce286a7152 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -18,39 +18,17 @@ #![allow(dead_code)] use crate::IggyPartition; -use crate::Partition; -use crate::PollingConsumer; -use crate::iggy_index_writer::IggyIndexWriter; -use crate::log::JournalInfo; -use crate::messages_writer::MessagesWriter; -use crate::segment::Segment; use crate::types::PartitionsConfig; -use consensus::PlaneIdentity; -use consensus::{ - CommitLogEvent, Consensus, NamespacedPipeline, Pipeline, PipelineEntry, Plane, PlaneKind, - Project, ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus, - ack_preflight, build_reply_message, emit_namespace_progress_event, emit_sim_event, - fence_old_prepare_by_commit, pipeline_prepare_common, replicate_preflight, - replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common, -}; +use consensus::{Consensus, Plane, PlaneIdentity, VsrConsensus}; use iggy_binary_protocol::{ - Command2, ConsensusHeader, GenericHeader, Message, Operation, PrepareHeader, PrepareOkHeader, + Command2, ConsensusHeader, GenericHeader, Message, PrepareHeader, PrepareOkHeader, RequestHeader, }; -use iggy_common::{ - IggyByteSize, IggyError, PartitionStats, SegmentStorage, - send_messages2::{convert_request_message, decode_prepare_slice}, - sharding::{IggyNamespace, LocalIdx, ShardId}, -}; -use iobuf::Frozen; -use journal::Journal as _; +use iggy_common::sharding::{IggyNamespace, LocalIdx, ShardId}; use message_bus::MessageBus; use std::cell::UnsafeCell; -use std::collections::{HashMap, HashSet}; -use std::rc::Rc; -use std::sync::Arc; -use std::sync::atomic::Ordering; -use tracing::{debug, warn}; +use std::collections::HashMap; +use tracing::warn; /// Per-shard collection of all partitions. 
/// @@ -61,29 +39,25 @@ use tracing::{debug, warn}; /// For example, shard 0 might have `partition_ids` [0, 2, 4] while shard 1 /// has `partition_ids` [1, 3, 5]. The `LocalIdx` provides the actual index /// into the `partitions` Vec. -pub struct IggyPartitions { +pub struct IggyPartitions +where + B: MessageBus, +{ shard_id: ShardId, config: PartitionsConfig, - /// Collection of partitions, the index of each partition isn't it's ID, but rather an local index (`LocalIdx`) which is used for lookups. + /// Collection of partitions, the index of each partition isn't its ID, but rather a local index (`LocalIdx`) which is used for lookups. /// /// Wrapped in `UnsafeCell` for interior mutability — matches the single-threaded /// per-shard execution model. Consensus trait methods take `&self` but need to /// mutate partition state (segments, offsets, journal). - /// - /// TODO: Move to more granular synchronization around partition substate and - /// stop exposing `&mut IggyPartition` from `&self`. The long-term shape here - /// should let append/commit paths coordinate on the exact mutable state they - /// need instead of relying on `UnsafeCell` aliasing for the whole partition.
- partitions: UnsafeCell>, + partitions: UnsafeCell>>, namespace_to_local: HashMap, - consensus: Option, -} - -const fn freeze_client_reply(message: Message) -> Message { - message } -impl IggyPartitions { +impl IggyPartitions +where + B: MessageBus, +{ #[must_use] pub fn new(shard_id: ShardId, config: PartitionsConfig) -> Self { Self { @@ -91,7 +65,6 @@ impl IggyPartitions { config, partitions: UnsafeCell::new(Vec::new()), namespace_to_local: HashMap::new(), - consensus: None, } } @@ -102,7 +75,6 @@ impl IggyPartitions { config, partitions: UnsafeCell::new(Vec::with_capacity(capacity)), namespace_to_local: HashMap::with_capacity(capacity), - consensus: None, } } @@ -110,7 +82,7 @@ impl IggyPartitions { &self.config } - fn partitions(&self) -> &Vec { + fn partitions(&self) -> &Vec> { // Safety: single-threaded per-shard model, no concurrent access. unsafe { &*self.partitions.get() } } @@ -128,13 +100,13 @@ impl IggyPartitions { } /// Get partition by local index. - pub fn get(&self, local_idx: LocalIdx) -> Option<&IggyPartition> { + pub fn get(&self, local_idx: LocalIdx) -> Option<&IggyPartition> { self.partitions().get(*local_idx) } /// Get mutable partition by local index. #[allow(clippy::mut_from_ref)] - fn get_mut(&self, local_idx: LocalIdx) -> Option<&mut IggyPartition> { + fn get_mut(&self, local_idx: LocalIdx) -> Option<&mut IggyPartition> { // Safety: single-threaded per-shard model, no concurrent access. unsafe { (&mut *self.partitions.get()).get_mut(*local_idx) } } @@ -145,7 +117,7 @@ impl IggyPartitions { } /// Insert a new partition and return its local index. - pub fn insert(&mut self, namespace: IggyNamespace, partition: IggyPartition) -> LocalIdx { + pub fn insert(&mut self, namespace: IggyNamespace, partition: IggyPartition) -> LocalIdx { let partitions = self.partitions.get_mut(); let local_idx = LocalIdx::new(partitions.len()); partitions.push(partition); @@ -159,21 +131,21 @@ impl IggyPartitions { } /// Get partition by namespace directly. 
- pub fn get_by_ns(&self, namespace: &IggyNamespace) -> Option<&IggyPartition> { + pub fn get_by_ns(&self, namespace: &IggyNamespace) -> Option<&IggyPartition> { let idx = self.namespace_to_local.get(namespace)?; self.partitions().get(**idx) } /// Get mutable partition by namespace directly. #[allow(clippy::mut_from_ref)] - fn get_mut_by_ns(&self, namespace: &IggyNamespace) -> Option<&mut IggyPartition> { + pub fn get_mut_by_ns(&self, namespace: &IggyNamespace) -> Option<&mut IggyPartition> { let idx = self.namespace_to_local.get(namespace)?; // Safety: single-threaded per-shard model, no concurrent access. unsafe { (&mut *self.partitions.get()).get_mut(**idx) } } /// Remove a partition by namespace. Returns the removed partition if found. - pub fn remove(&mut self, namespace: &IggyNamespace) -> Option { + pub fn remove(&mut self, namespace: &IggyNamespace) -> Option> { let local_idx = self.namespace_to_local.remove(namespace)?; let idx = *local_idx; let partitions = self.partitions.get_mut(); @@ -182,12 +154,9 @@ impl IggyPartitions { return None; } - // Swap-remove for O(1) deletion let partition = partitions.swap_remove(idx); - // If we swapped an element, update its index in the map if idx < partitions.len() { - // Find the namespace that was at the last position (now at idx) for lidx in self.namespace_to_local.values_mut() { if **lidx == partitions.len() { *lidx = LocalIdx::new(idx); @@ -200,7 +169,7 @@ impl IggyPartitions { } /// Remove multiple partitions at once. - pub fn remove_many(&mut self, namespaces: &[IggyNamespace]) -> Vec { + pub fn remove_many(&mut self, namespaces: &[IggyNamespace]) -> Vec> { namespaces.iter().filter_map(|ns| self.remove(ns)).collect() } @@ -209,383 +178,72 @@ impl IggyPartitions { self.namespace_to_local.keys() } - /// Iterate over all (namespace, partition) pairs. 
- pub fn iter(&self) -> impl Iterator { - self.namespace_to_local - .iter() - .filter_map(|(ns, idx)| self.partitions().get(**idx).map(|p| (ns, p))) - } - - /// Iterate over all (namespace, partition) pairs mutably. - pub fn iter_mut(&self) -> impl Iterator { - // Safety: single-threaded per-shard model, no concurrent access. - let partitions = unsafe { &mut *self.partitions.get() }; - let partitions_ptr = partitions.as_mut_ptr(); - let partitions_len = partitions.len(); - self.namespace_to_local.iter().filter_map(move |(ns, idx)| { - let i = **idx; - if i < partitions_len { - // Safety: each LocalIdx is unique, so no two iterations alias the same element. - Some((ns, unsafe { &mut *partitions_ptr.add(i) })) - } else { - None - } - }) - } - /// Get partition by namespace, initializing if not present. - pub fn get_or_init(&mut self, namespace: IggyNamespace, init: F) -> &mut IggyPartition + pub fn get_or_init(&mut self, namespace: IggyNamespace, init: F) -> &mut IggyPartition where - F: FnOnce() -> IggyPartition, + F: FnOnce() -> IggyPartition, { - // TODO: get_or_insert if !self.namespace_to_local.contains_key(&namespace) { - let partition = init(); - self.insert(namespace, partition); + self.insert(namespace, init()); } let idx = *self.namespace_to_local[&namespace]; &mut self.partitions.get_mut()[idx] } - - pub const fn consensus(&self) -> Option<&C> { - self.consensus.as_ref() - } - - pub fn set_consensus(&mut self, consensus: C) { - self.consensus = Some(consensus); - } - - /// Initialize a new partition with in-memory storage (for testing/simulation). - /// - /// Idempotent: subsequent calls for the same namespace are no-ops returning - /// the existing index. Consensus must be set separately via `set_consensus`. - /// - /// TODO: Make the log generic over its storage backend to support both - /// in-memory (for testing) and file-backed (for production) storage without - /// needing separate initialization methods. 
- pub fn init_partition_in_memory(&mut self, namespace: IggyNamespace) -> LocalIdx { - if let Some(idx) = self.local_idx(&namespace) { - return idx; - } - - // Create initial segment with default (in-memory) storage - let start_offset = 0; - let segment = Segment::new(start_offset, self.config.segment_size); - let storage = SegmentStorage::default(); - - // Create partition with initialized log - let stats = Arc::new(PartitionStats::default()); - let mut partition = IggyPartition::new(stats); - partition - .log - .add_persisted_segment(segment, storage, None, None); - partition.offset.store(start_offset, Ordering::Release); - partition - .dirty_offset - .store(start_offset, Ordering::Relaxed); - partition.should_increment_offset = false; - partition.stats.increment_segments_count(1); - - self.insert(namespace, partition) - } - - /// Initialize a new partition with file-backed storage. - /// - /// This is the data plane initialization - creates the partition structure, - /// initial segment, and storage. Skips the control plane metadata broadcasting. - /// - /// Corresponds to the "INITIATE PARTITION" phase in the server's flow: - /// 1. Control plane: create `PartitionMeta` (SKIPPED in this method) - /// 2. Control plane: broadcast to shards (SKIPPED in this method) - /// 3. Data plane: INITIATE PARTITION (THIS METHOD) - /// - /// Idempotent: subsequent calls for the same namespace are no-ops. - /// Consensus must be set separately via `set_consensus`. - /// - /// # Errors - /// - /// Returns an `IggyError` if the backing segment storage or writers cannot be created. - pub async fn init_partition( - &mut self, - namespace: IggyNamespace, - ) -> Result { - if let Some(idx) = self.local_idx(&namespace) { - return Ok(idx); - } - - // Create initial segment with storage - let start_offset = 0; - let segment = Segment::new(start_offset, self.config.segment_size); - - // TODO: Waiting for issue to move server config to shared module. 
- // Once complete, paths will come from proper base_path/streams_path/etc config fields. - let messages_path = self.config.get_messages_path( - namespace.stream_id(), - namespace.topic_id(), - namespace.partition_id(), - start_offset, - ); - let index_path = self.config.get_index_path( - namespace.stream_id(), - namespace.topic_id(), - namespace.partition_id(), - start_offset, - ); - - let storage = SegmentStorage::new( - &messages_path, - &index_path, - 0, // messages_size (new segment) - 0, // indexes_size (new segment) - self.config.enforce_fsync, - self.config.enforce_fsync, - false, // file_exists (new segment) - ) - .await - .map_err(|_| IggyError::CannotCreateSegmentLogFile(messages_path.clone()))?; - let messages_size_bytes = storage - .messages_writer - .as_ref() - .ok_or_else(|| IggyError::CannotCreateSegmentLogFile(messages_path.clone()))? - .size_counter(); - let messages_writer = Rc::new( - MessagesWriter::new( - &messages_path, - messages_size_bytes, - self.config.enforce_fsync, - false, - ) - .await - .map_err(|_| IggyError::CannotCreateSegmentLogFile(messages_path.clone()))?, - ); - let index_writer = Rc::new( - IggyIndexWriter::new( - &index_path, - Rc::new(std::sync::atomic::AtomicU64::new(0)), - self.config.enforce_fsync, - false, - ) - .await - .map_err(|_| IggyError::CannotCreateSegmentIndexFile(index_path.clone()))?, - ); - - // Create partition with initialized log - let stats = Arc::new(PartitionStats::default()); - let mut partition = IggyPartition::new(stats); - partition.log.add_persisted_segment( - segment, - storage, - Some(messages_writer), - Some(index_writer), - ); - partition.offset.store(start_offset, Ordering::Release); - partition - .dirty_offset - .store(start_offset, Ordering::Relaxed); - partition.should_increment_offset = false; - partition.stats.increment_segments_count(1); - - Ok(self.insert(namespace, partition)) - } } -impl Plane> for IggyPartitions> +impl Plane> for IggyPartitions where B: MessageBus, Client = u128>, { 
async fn on_request(&self, message: as Consensus>::Message) { let namespace = IggyNamespace::from_raw(message.header().namespace); - let consensus = self - .consensus() - .expect("on_request: consensus not initialized"); - - emit_sim_event( - SimEventKind::ClientRequestReceived, - &RequestLogEvent { - replica: ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), - client_id: message.header().client, - request_id: message.header().request, - operation: message.header().operation, - }, - ); - let message = if message.header().operation == Operation::SendMessages { - match convert_request_message(namespace, message) { - Ok(message) => message, - Err(error) => { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - replica_id = consensus.replica(), - namespace_raw = namespace.inner(), - operation = ?Operation::SendMessages, - error = %error, - "failed to convert send_messages request" - ); - return; - } - } - } else { - message - }; - let prepare = message.project(consensus); - pipeline_prepare_common(consensus, PlaneKind::Partitions, prepare, |prepare| { - self.on_replicate(prepare) - }) - .await; - } - - async fn on_replicate(&self, message: as Consensus>::Message) { - let header = *message.header(); - let namespace = IggyNamespace::from_raw(header.namespace); - let consensus = self - .consensus() - .expect("on_replicate: consensus not initialized"); - - let current_op = match replicate_preflight(consensus, &header) { - Ok(current_op) => current_op, - Err(reason) => { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - replica_id = consensus.replica(), - view = consensus.view(), - op = header.op, - namespace_raw = header.namespace, - operation = ?header.operation, - reason = reason.as_str(), - "ignoring prepare during replicate preflight" - ); - return; - } - }; - - let is_old_prepare = fence_old_prepare_by_commit(consensus, &header); - if is_old_prepare { + let Some(partition) = self.get_mut_by_ns(&namespace) else { 
warn!( target: "iggy.partitions.diag", plane = "partitions", - replica_id = consensus.replica(), - view = consensus.view(), - op = header.op, - commit = consensus.commit(), - namespace_raw = header.namespace, - operation = ?header.operation, - "received old prepare, skipping replication" + namespace_raw = namespace.inner(), + operation = ?message.header().operation, + "partition not initialized for namespace" ); return; - } + }; + partition.on_request(message).await; + } - // TODO: Figure out the flow of the partition operations. - // In metadata layer we assume that when an `on_request` or `on_replicate` is called, it's called from correct shard. - // I think we need to do the same here, which means that the code from below is unfallable, the partition should always exist by now! - let message = self.replicate(message).await; - if let Err(error) = self.apply_replicated_operation(&namespace, message).await { + async fn on_replicate(&self, message: as Consensus>::Message) { + let namespace = IggyNamespace::from_raw(message.header().namespace); + let Some(partition) = self.get_mut_by_ns(&namespace) else { warn!( target: "iggy.partitions.diag", plane = "partitions", - replica_id = consensus.replica(), - op = header.op, namespace_raw = namespace.inner(), - operation = ?header.operation, - %error, - "failed to apply replicated partition operation" + op = message.header().op, + operation = ?message.header().operation, + "partition not initialized for namespace" ); return; - } - - // TODO: Make those assertions be toggleable through a feature flag, so they can be used only by simulator/tests. 
- debug_assert_eq!(header.op, current_op + 1); - consensus.sequencer().set_sequence(header.op); - consensus.set_last_prepare_checksum(header.checksum); - emit_namespace_progress_event( - SimEventKind::NamespaceProgressUpdated, - &ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), - header.op, - consensus.pipeline().borrow().len(), - ); - - self.send_prepare_ok(&header).await; + }; + partition.on_replicate(message).await; } async fn on_ack(&self, message: as Consensus>::Message) { - let header = message.header(); - let consensus = self.consensus().expect("on_ack: consensus not initialized"); - - if let Err(reason) = ack_preflight(consensus) { + let namespace = IggyNamespace::from_raw(message.header().namespace); + let config = self.config.clone(); + let Some(partition) = self.get_mut_by_ns(&namespace) else { warn!( target: "iggy.partitions.diag", plane = "partitions", - replica_id = consensus.replica(), - view = consensus.view(), - op = header.op, - reason = reason.as_str(), - "ignoring ack during preflight" + namespace_raw = namespace.inner(), + op = message.header().op, + "partition not initialized for namespace" ); return; - } - - { - let pipeline = consensus.pipeline().borrow(); - if pipeline - .entry_by_op_and_checksum(header.op, header.prepare_checksum) - .is_none() - { - debug!( - target: "iggy.partitions.diag", - plane = "partitions", - replica_id = consensus.replica(), - op = header.op, - prepare_checksum = header.prepare_checksum, - "ack target prepare not in pipeline" - ); - return; - } - } - - consensus.handle_prepare_ok(PlaneKind::Partitions, header); - - // SAFETY(IGGY-66): Per-namespace drain independent of global commit. - // - // drain_committable_all() drains each namespace queue independently by - // quorum flag, so ns_a ops can be drained and replied to clients while - // ns_b ops block the global commit (e.g., ns_a ops 1,3 drain while - // ns_b op 2 is pending). This is intentional for partition independence. 
- // - // View change risk: if a view change occurs before the global commit - // covers a drained op, the new primary replays from max_commit+1 and - // re-executes it. append_messages is NOT idempotent -- re-execution - // produces duplicate partition data. - // - // Before this path handles real traffic, two guards are required: - // 1. Op-based dedup in apply_replicated_operation: skip append if - // the partition journal already contains data for this op. - // 2. Client reply dedup by (client_id, request_id): prevent - // duplicate replies after view change re-execution. - let drained = { - let mut pipeline = consensus.pipeline().borrow_mut(); - pipeline.drain_committable_all() }; - - if drained.is_empty() { - return; - } - - self.handle_committed_entries(consensus, drained).await; - - let pipeline = consensus.pipeline().borrow(); - let new_commit = pipeline.global_commit_frontier(consensus.commit()); - drop(pipeline); - consensus.advance_commit_number(new_commit); - emit_namespace_progress_event( - SimEventKind::NamespaceProgressUpdated, - &ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), - new_commit, - consensus.pipeline().borrow().len(), - ); + partition.on_ack(message, &config).await; } } -impl PlaneIdentity> for IggyPartitions> +impl PlaneIdentity> for IggyPartitions where B: MessageBus, Client = u128>, { @@ -600,718 +258,3 @@ where message.header().operation().is_partition() } } - -impl IggyPartitions> -where - B: MessageBus, Client = u128>, -{ - /// # Panics - /// Panics if consensus is not initialized. 
- pub fn register_namespace_in_pipeline(&self, ns: u64) { - self.consensus() - .expect("register_namespace_in_pipeline: consensus not initialized") - .pipeline() - .borrow_mut() - .register_namespace(ns); - } - - async fn apply_replicated_operation( - &self, - namespace: &IggyNamespace, - message: Message, - ) -> Result<(), IggyError> { - let consensus = self - .consensus() - .expect("apply_replicated_operation: consensus not initialized"); - let header = *message.header(); - - // TODO: WE have to distinguish between an `message` recv by leader and follower. - // In the follower path, we have to skip the `prepare_for_persistence` path, just append to journal. - match header.operation { - Operation::SendMessages => { - self.append_send_messages_to_journal(namespace, message) - .await?; - debug!( - target: "iggy.partitions.diag", - plane = "partitions", - replica = consensus.replica(), - op = header.op, - namespace_raw = namespace.inner(), - operation = ?header.operation, - "replicated send_messages appended to partition journal" - ); - Ok(()) - } - Operation::StoreConsumerOffset => { - let total_size = header.size() as usize; - let body = &message.as_slice()[std::mem::size_of::()..total_size]; - let consumer_kind = *body.first().ok_or(IggyError::InvalidCommand)?; - let consumer_id = - body.get(1..5) - .ok_or(IggyError::InvalidCommand) - .and_then(|bytes| { - <[u8; 4]>::try_from(bytes) - .map(u32::from_le_bytes) - .map(|value| value as usize) - .map_err(|_| IggyError::InvalidCommand) - })?; - let offset = - body.get(5..13) - .ok_or(IggyError::InvalidCommand) - .and_then(|bytes| { - <[u8; 8]>::try_from(bytes) - .map(u64::from_le_bytes) - .map_err(|_| IggyError::InvalidCommand) - })?; - let consumer = match consumer_kind { - 1 => PollingConsumer::Consumer(consumer_id, 0), - 2 => PollingConsumer::ConsumerGroup(consumer_id, 0), - _ => { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - replica = consensus.replica(), - op = header.op, - namespace_raw = 
namespace.inner(), - operation = ?header.operation, - consumer_kind, - "unknown consumer kind while applying replicated offset update" - ); - return Err(IggyError::InvalidCommand); - } - }; - - let partition = self - .get_by_ns(namespace) - .expect("store_consumer_offset: partition not found for namespace"); - let _ = partition.store_consumer_offset(consumer, offset); - - debug!( - target: "iggy.partitions.diag", - plane = "partitions", - replica = consensus.replica(), - op = header.op, - namespace_raw = namespace.inner(), - operation = ?header.operation, - consumer_kind, - consumer_id, - offset, - "replicated consumer offset stored" - ); - Ok(()) - } - _ => { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - replica = consensus.replica(), - namespace_raw = namespace.inner(), - op = header.op, - operation = ?header.operation, - "unexpected replicated partition operation" - ); - Ok(()) - } - } - } - - /// Append a prepare message to a partition's journal with offset assignment. - /// - /// Updates `segment.current_position` (logical position for indexing) but - /// not `segment.end_offset` or `segment.end_timestamp` (committed state). - /// Those are updated during commit. - /// - /// Uses `dirty_offset` for offset assignment so that multiple prepares - /// can be pipelined before any commit. - async fn append_send_messages_to_journal( - &self, - namespace: &IggyNamespace, - message: Message, - ) -> Result<(), IggyError> { - let write_lock = self - .get_by_ns(namespace) - .expect("append_send_messages_to_journal: partition not found for namespace") - .write_lock - .clone(); - let _guard = write_lock.lock().await; - let partition = self - .get_mut_by_ns(namespace) - .expect("append_send_messages_to_journal: partition not found for namespace"); - partition.append_messages(message).await.map(|_| ()) - } - - /// Replicate a prepare message to the next replica in the chain. - /// - /// Chain replication: primary -> first backup -> ... -> last backup. 
- /// Stops when the next replica would be the primary. - async fn replicate(&self, message: Message) -> Message { - let consensus = self - .consensus() - .expect("replicate: consensus not initialized"); - replicate_to_next_in_chain(consensus, message).await - } - - /// Persist prepared messages to segment storage and advance the durable frontier. - /// - /// Updates segment metadata, stats, flushes journal to disk if thresholds - /// are exceeded, and advances the durable offset last. - #[allow(clippy::too_many_lines)] - async fn commit_messages(&self, namespace: &IggyNamespace) -> Result<(), IggyError> { - let write_lock = self - .get_by_ns(namespace) - .expect("commit_messages: partition not found for namespace") - .write_lock - .clone(); - let _guard = write_lock.lock().await; - - let partition = self - .get_mut_by_ns(namespace) - .expect("commit_messages: partition not found for namespace"); - - let journal = partition.log.journal(); - let journal_info = journal.info; - - if journal_info.messages_count == 0 { - return Ok(()); - } - - // 1. Check flush thresholds. - let is_full = partition.log.active_segment().is_full(); - - let unsaved_messages_count_exceeded = - journal_info.messages_count >= self.config.messages_required_to_save; - let unsaved_messages_size_exceeded = journal_info.size.as_bytes_u64() - >= self.config.size_of_messages_required_to_save.as_bytes_u64(); - let should_persist = - is_full || unsaved_messages_count_exceeded || unsaved_messages_size_exceeded; - - if !should_persist { - return Ok(()); - } - - // Freeze journal batches. 
- let (frozen_batches, index_bytes, batch_count) = { - let entries = partition.log.journal().inner.entries(); - let segment = partition.log.active_segment(); - let mut file_position = segment.size.as_bytes_u64(); - partition.log.ensure_indexes(); - let indexes = partition.log.active_indexes_mut().unwrap(); - let mut flush_index = None; - let mut frozen = Vec::with_capacity(entries.len()); - let mut batch_count = 0u32; - - for entry in entries { - let Ok(batch) = decode_prepare_slice(entry.as_slice()) else { - continue; - }; - let message_count = batch.message_count(); - if message_count == 0 { - continue; - } - - let index = crate::iggy_index::IggyIndex::new( - batch.header.base_offset, - batch.header.base_timestamp, - file_position, - ); - if flush_index.is_none() { - indexes.insert(index.offset, index.timestamp, index.position); - flush_index = Some(index); - } - file_position += batch.header.total_size() as u64; - batch_count += message_count; - frozen.push(entry); - } - - let index_bytes = - flush_index.map(|index| crate::iggy_index::IggyIndexCache::serialize(&index)); - - (frozen, index_bytes, batch_count) - }; - - let Some(index_bytes) = index_bytes else { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - namespace_raw = namespace.inner(), - "failed to build sparse index entry from pending journal batches" - ); - return Err(IggyError::InvalidCommand); - }; - - // Persist to disk. - self.persist_frozen_batches_to_disk(namespace, frozen_batches, index_bytes, batch_count) - .await?; - - if is_full { - self.rotate_segment(namespace).await?; - } - - // Reset journal info after drain. - let partition = self - .get_mut_by_ns(namespace) - .expect("commit_messages: partition not found"); - let _ = partition.log.journal_mut().inner.commit(); - partition.log.journal_mut().info = JournalInfo::default(); - - // Update segment metadata from the just-persisted journal state. 
- let partition = self - .get_mut_by_ns(namespace) - .expect("commit_messages: partition not found"); - let segment_index = partition.log.segments().len() - 1; - let segment = &mut partition.log.segments_mut()[segment_index]; - - if segment.end_offset == 0 && journal_info.first_timestamp != 0 { - segment.start_timestamp = journal_info.first_timestamp; - } - segment.end_timestamp = journal_info.end_timestamp; - segment.max_timestamp = segment.max_timestamp.max(journal_info.max_timestamp); - segment.end_offset = journal_info.current_offset; - - partition - .stats - .increment_size_bytes(journal_info.size.as_bytes_u64()); - partition - .stats - .increment_messages_count(u64::from(journal_info.messages_count)); - - let durable_offset = journal_info.current_offset; - partition.offset.store(durable_offset, Ordering::Release); - partition.stats.set_current_offset(durable_offset); - Ok(()) - } - - async fn handle_committed_entries( - &self, - consensus: &VsrConsensus, - drained: Vec, - ) { - if let (Some(first), Some(last)) = (drained.first(), drained.last()) { - debug!( - target: "iggy.partitions.diag", - plane = "partitions", - replica_id = consensus.replica(), - first_op = first.header.op, - last_op = last.header.op, - drained_count = drained.len(), - "draining committed partition ops" - ); - } - - let mut committed_ns: HashSet = HashSet::new(); - let mut failed_ns: HashSet = HashSet::new(); - let committed_visible_offsets = self - .resolve_committed_visible_offsets(consensus, &drained, &mut failed_ns) - .await; - - for PipelineEntry { - header: prepare_header, - .. 
- } in drained - { - let entry_namespace = IggyNamespace::from_raw(prepare_header.namespace); - if !self - .commit_partition_entry( - consensus, - prepare_header, - entry_namespace, - &mut committed_ns, - &committed_visible_offsets, - &mut failed_ns, - ) - .await - { - continue; - } - - let pipeline_depth = consensus.pipeline().borrow().len(); - let event = CommitLogEvent { - replica: ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), - op: prepare_header.op, - client_id: prepare_header.client, - request_id: prepare_header.request, - operation: prepare_header.operation, - pipeline_depth, - }; - emit_sim_event(SimEventKind::OperationCommitted, &event); - emit_namespace_progress_event( - SimEventKind::NamespaceProgressUpdated, - &event.replica, - prepare_header.op, - pipeline_depth, - ); - - let generic_reply = - build_reply_message(consensus, &prepare_header, bytes::Bytes::new()).into_generic(); - let reply_buffers = freeze_client_reply(generic_reply); - emit_sim_event(SimEventKind::ClientReplyEmitted, &event); - - if let Err(error) = consensus - .message_bus() - .send_to_client(prepare_header.client, reply_buffers) - .await - { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - client = prepare_header.client, - op = prepare_header.op, - namespace_raw = entry_namespace.inner(), - %error, - "failed to send reply to client" - ); - } - } - if !failed_ns.is_empty() { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - replica_id = consensus.replica(), - failed_namespaces = failed_ns.len(), - "some namespaces failed local commit handling" - ); - } - } - - async fn resolve_committed_visible_offsets( - &self, - consensus: &VsrConsensus, - drained: &[PipelineEntry], - failed_ns: &mut HashSet, - ) -> HashMap { - let mut committed_visible_offsets = HashMap::new(); - - for entry in drained { - if entry.header.operation != Operation::SendMessages { - continue; - } - - let entry_namespace = 
IggyNamespace::from_raw(entry.header.namespace); - match self - .committed_end_offset_for_prepare(&entry_namespace, &entry.header) - .await - { - Ok(Some(end_offset)) => { - committed_visible_offsets.insert(entry_namespace, end_offset); - } - Ok(None) => {} - Err(error) => { - failed_ns.insert(entry_namespace); - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - replica_id = consensus.replica(), - namespace_raw = entry_namespace.inner(), - op = entry.header.op, - operation = ?entry.header.operation, - %error, - "failed to resolve committed visible offset for partition entry" - ); - } - } - } - - committed_visible_offsets - } - - async fn commit_partition_entry( - &self, - consensus: &VsrConsensus, - prepare_header: PrepareHeader, - entry_namespace: IggyNamespace, - committed_ns: &mut HashSet, - committed_visible_offsets: &HashMap, - failed_ns: &mut HashSet, - ) -> bool { - match prepare_header.operation { - Operation::SendMessages => { - if committed_ns.insert(entry_namespace) - && let Err(error) = self.commit_messages(&entry_namespace).await - { - failed_ns.insert(entry_namespace); - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - replica_id = consensus.replica(), - namespace_raw = entry_namespace.inner(), - op = prepare_header.op, - operation = ?prepare_header.operation, - %error, - "failed to commit partition messages" - ); - } - - if committed_ns.contains(&entry_namespace) - && !failed_ns.contains(&entry_namespace) - && let Some(visible_offset) = - committed_visible_offsets.get(&entry_namespace).copied() - { - let partition = self - .get_by_ns(&entry_namespace) - .expect("commit_partition_entry: partition not found"); - partition.offset.store(visible_offset, Ordering::Release); - partition.stats.set_current_offset(visible_offset); - } - - !failed_ns.contains(&entry_namespace) - } - Operation::StoreConsumerOffset => { - // TODO: Commit consumer offset update. 
- debug!( - target: "iggy.partitions.diag", - plane = "partitions", - replica_id = consensus.replica(), - op = prepare_header.op, - namespace_raw = entry_namespace.inner(), - "consumer offset committed" - ); - true - } - _ => { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - replica_id = consensus.replica(), - op = prepare_header.op, - namespace_raw = entry_namespace.inner(), - operation = ?prepare_header.operation, - "unexpected committed partition operation" - ); - true - } - } - } - - async fn committed_end_offset_for_prepare( - &self, - namespace: &IggyNamespace, - prepare_header: &PrepareHeader, - ) -> Result, IggyError> { - let partition = self - .get_by_ns(namespace) - .expect("committed_end_offset_for_prepare: partition not found"); - let Some(entry) = partition.log.journal().inner.entry(prepare_header).await else { - return Err(IggyError::InvalidCommand); - }; - let batch = - decode_prepare_slice(entry.as_slice()).map_err(|_| IggyError::InvalidCommand)?; - let message_count = batch.message_count(); - if message_count == 0 { - return Ok(None); - } - - Ok(Some( - batch.header.base_offset + u64::from(message_count) - 1, - )) - } - - /// Persist frozen batches to disk and update segment bookkeeping. 
- async fn persist_frozen_batches_to_disk( - &self, - namespace: &IggyNamespace, - frozen_batches: Vec>, - index_bytes: Vec, - batch_count: u32, - ) -> Result<(), IggyError> { - if batch_count == 0 { - return Ok(()); - } - - let partition = self - .get_by_ns(namespace) - .expect("persist: partition not found"); - - if !partition.log.has_segments() { - return Ok(()); - } - - let stripped_batches: Vec<_> = frozen_batches - .into_iter() - .map(|batch| batch.slice(std::mem::size_of::()..)) - .collect(); - let messages_writer = partition - .log - .messages_writers() - .last() - .and_then(|writer| writer.as_ref()) - .cloned(); - let index_writer = partition - .log - .index_writers() - .last() - .and_then(|writer| writer.as_ref()) - .cloned(); - - if messages_writer.is_none() || index_writer.is_none() { - let saved_bytes = stripped_batches.iter().map(Frozen::len).sum::(); - debug!( - target: "iggy.partitions.diag", - plane = "partitions", - namespace_raw = namespace.inner(), - batch_count, - saved_bytes, - "simulated in-memory batch persistence" - ); - - let partition = self - .get_mut_by_ns(namespace) - .expect("persist: partition not found"); - let segment_index = partition.log.segments().len() - 1; - let segment = &mut partition.log.segments_mut()[segment_index]; - segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved_bytes as u64); - partition.log.clear_in_flight(); - return Ok(()); - } - - let messages_writer = messages_writer.expect("checked above"); - let index_writer = index_writer.expect("checked above"); - - let saved = messages_writer - .save_frozen_batches(&stripped_batches) - .await - .map_err(|error| { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - namespace_raw = namespace.inner(), - batch_count, - %error, - "failed to save frozen batches" - ); - error - })?; - - index_writer - .save_indexes(index_bytes) - .await - .map_err(|error| { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - namespace_raw = 
namespace.inner(), - batch_count, - %error, - "failed to save sparse indexes" - ); - error - })?; - - debug!( - target: "iggy.partitions.diag", - plane = "partitions", - namespace_raw = namespace.inner(), - batch_count, - saved_bytes = saved.as_bytes_u64(), - "persisted batches to disk" - ); - - let partition = self - .get_mut_by_ns(namespace) - .expect("persist: partition not found"); - - // Recalculate index: segment deletion during async I/O shifts indices. - let segment_index = partition.log.segments().len() - 1; - - let segment = &mut partition.log.segments_mut()[segment_index]; - segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64()); - - partition.log.clear_in_flight(); - Ok(()) - } - - /// Rotate to a new segment when the current one is full. - async fn rotate_segment(&self, namespace: &IggyNamespace) -> Result<(), IggyError> { - let partition = self - .get_mut_by_ns(namespace) - .expect("rotate_segment: partition not found"); - - let old_segment_index = partition.log.segments().len() - 1; - let active_segment = partition.log.active_segment_mut(); - active_segment.sealed = true; - let start_offset = active_segment.end_offset + 1; - - let segment = Segment::new(start_offset, self.config.segment_size); - - // TODO: Waiting for issue to move server config to shared module. - // Once complete, paths will come from proper base_path/streams_path/etc config fields. 
- let messages_path = self.config.get_messages_path( - namespace.stream_id(), - namespace.topic_id(), - namespace.partition_id(), - start_offset, - ); - let index_path = self.config.get_index_path( - namespace.stream_id(), - namespace.topic_id(), - namespace.partition_id(), - start_offset, - ); - - let storage = SegmentStorage::new( - &messages_path, - &index_path, - 0, // messages_size (new segment) - 0, // indexes_size (new segment) - self.config.enforce_fsync, - self.config.enforce_fsync, - false, // file_exists (new segment) - ) - .await - .map_err(|_| IggyError::CannotCreateSegmentLogFile(messages_path.clone()))?; - let messages_size_bytes = storage - .messages_writer - .as_ref() - .ok_or_else(|| IggyError::CannotCreateSegmentLogFile(messages_path.clone()))? - .size_counter(); - let messages_writer = Rc::new( - MessagesWriter::new( - &messages_path, - messages_size_bytes, - self.config.enforce_fsync, - false, - ) - .await - .map_err(|_| IggyError::CannotCreateSegmentLogFile(messages_path.clone()))?, - ); - let index_writer = Rc::new( - IggyIndexWriter::new( - &index_path, - Rc::new(std::sync::atomic::AtomicU64::new(0)), - self.config.enforce_fsync, - false, - ) - .await - .map_err(|_| IggyError::CannotCreateSegmentIndexFile(index_path.clone()))?, - ); - - // Close writers for the sealed segment. 
- let old_storage = &mut partition.log.storages_mut()[old_segment_index]; - let _ = old_storage.shutdown(); - partition.log.messages_writers_mut()[old_segment_index] = None; - partition.log.index_writers_mut()[old_segment_index] = None; - - partition.log.add_persisted_segment( - segment, - storage, - Some(messages_writer), - Some(index_writer), - ); - partition.stats.increment_segments_count(1); - - debug!( - target: "iggy.partitions.diag", - plane = "partitions", - namespace_raw = namespace.inner(), - start_offset, - "rotated to new segment" - ); - Ok(()) - } - - async fn send_prepare_ok(&self, header: &PrepareHeader) { - let consensus = self - .consensus() - .expect("send_prepare_ok: consensus not initialized"); - send_prepare_ok_common(consensus, header, Some(true)).await; - } -} diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs index fd5bb0234c..be2b186471 100644 --- a/core/partitions/src/lib.rs +++ b/core/partitions/src/lib.rs @@ -24,6 +24,7 @@ mod iggy_partitions; mod journal; mod log; mod messages_writer; +mod offset_storage; mod segment; mod types; diff --git a/core/partitions/src/offset_storage.rs b/core/partitions/src/offset_storage.rs new file mode 100644 index 0000000000..dd98266293 --- /dev/null +++ b/core/partitions/src/offset_storage.rs @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use compio::{ + fs::{OpenOptions, create_dir_all, remove_file}, + io::AsyncWriteAtExt, +}; +use iggy_common::IggyError; +use std::path::Path; + +pub async fn persist_offset(path: &str, offset: u64, enforce_fsync: bool) -> Result<(), IggyError> { + if let Some(parent) = Path::new(path).parent() + && !parent.exists() + { + create_dir_all(parent).await.map_err(|_| { + IggyError::CannotCreateConsumerOffsetsDirectory(parent.display().to_string()) + })?; + } + + let mut file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path) + .await + .map_err(|_| IggyError::CannotOpenConsumerOffsetsFile(path.to_owned()))?; + let buf = offset.to_le_bytes(); + file.write_all_at(buf, 0) + .await + .0 + .map_err(|_| IggyError::CannotWriteToFile)?; + + if enforce_fsync { + file.sync_data() + .await + .map_err(|_| IggyError::CannotWriteToFile)?; + } + + Ok(()) +} + +pub async fn delete_persisted_offset(path: &str) -> Result<(), IggyError> { + if !Path::new(path).exists() { + return Ok(()); + } + + remove_file(path) + .await + .map_err(|_| IggyError::CannotDeleteConsumerOffsetFile(path.to_owned())) +} diff --git a/core/partitions/src/types.rs b/core/partitions/src/types.rs index dbaf57a595..178922755f 100644 --- a/core/partitions/src/types.rs +++ b/core/partitions/src/types.rs @@ -219,6 +219,16 @@ pub struct PartitionsConfig { } impl PartitionsConfig { + #[must_use] + pub fn get_partition_path( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + ) -> String { + 
format!("/tmp/iggy_stub/streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}",) + } + /// Constructs the file path for segment messages. /// /// TODO: This is a stub waiting for completion of issue to move server config @@ -233,7 +243,8 @@ impl PartitionsConfig { start_offset: u64, ) -> String { format!( - "/tmp/iggy_stub/streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}/{start_offset:0>20}.log", + "{}/{start_offset:0>20}.log", + self.get_partition_path(stream_id, topic_id, partition_id) ) } @@ -251,7 +262,47 @@ impl PartitionsConfig { start_offset: u64, ) -> String { format!( - "/tmp/iggy_stub/streams/{stream_id}/topics/{topic_id}/partitions/{partition_id}/{start_offset:0>20}.index", + "{}/{start_offset:0>20}.index", + self.get_partition_path(stream_id, topic_id, partition_id) + ) + } + + #[must_use] + pub fn get_offsets_path( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + ) -> String { + format!( + "{}/offsets", + self.get_partition_path(stream_id, topic_id, partition_id) + ) + } + + #[must_use] + pub fn get_consumer_offsets_path( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + ) -> String { + format!( + "{}/consumers", + self.get_offsets_path(stream_id, topic_id, partition_id) + ) + } + + #[must_use] + pub fn get_consumer_group_offsets_path( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + ) -> String { + format!( + "{}/groups", + self.get_offsets_path(stream_id, topic_id, partition_id) ) } } diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs index 3e17f4ba2b..acb8655a39 100644 --- a/core/shard/src/lib.rs +++ b/core/shard/src/lib.rs @@ -18,25 +18,57 @@ mod router; pub mod shards_table; -use consensus::{MuxPlane, NamespacedPipeline, PartitionsHandle, Plane, VsrConsensus}; +use consensus::{LocalPipeline, MuxPlane, PartitionsHandle, Plane, VsrConsensus}; use iggy_binary_protocol::{ GenericHeader, Message, MessageBag, PrepareHeader, PrepareOkHeader, 
RequestHeader, }; -use iggy_common::sharding::IggyNamespace; use iggy_common::variadic; +use iggy_common::{PartitionStats, sharding::IggyNamespace}; use journal::{Journal, JournalHandle}; use message_bus::MessageBus; use metadata::IggyMetadata; use metadata::stm::StateMachine; -use partitions::IggyPartitions; +use partitions::{IggyPartition, IggyPartitions}; use shards_table::ShardsTable; +use std::sync::Arc; -pub type ShardPlane = MuxPlane< - variadic!( - IggyMetadata, J, S, M>, - IggyPartitions> - ), ->; +pub type ShardPlane = + MuxPlane, J, S, M>, IggyPartitions)>; + +pub struct ShardIdentity { + pub id: u16, + pub name: String, +} + +impl ShardIdentity { + #[must_use] + pub const fn new(id: u16, name: String) -> Self { + Self { id, name } + } +} + +pub struct PartitionConsensusConfig +where + B: MessageBus, +{ + pub cluster_id: u128, + pub replica_count: u8, + pub bus: B, +} + +impl PartitionConsensusConfig +where + B: MessageBus, +{ + #[must_use] + pub const fn new(cluster_id: u128, replica_count: u8, bus: B) -> Self { + Self { + cluster_id, + replica_count, + bus, + } + } +} /// Bounded mpsc channel sender (blocking send). pub type Sender = crossfire::MTx>; @@ -108,6 +140,8 @@ where /// Partition namespace -> owning shard lookup. shards_table: T, + + partition_consensus: PartitionConsensusConfig, } impl IggyShard @@ -121,16 +155,17 @@ where /// * `inbox` - the receiver that this shard drains in its message pump. /// * `shards_table` - namespace -> shard routing table. 
#[must_use] - pub const fn new( - id: u16, - name: String, + pub fn new( + identity: ShardIdentity, metadata: IggyMetadata, J, S, M>, - partitions: IggyPartitions>, + partitions: IggyPartitions, senders: Vec>>, inbox: Receiver>, shards_table: T, + partition_consensus: PartitionConsensusConfig, ) -> Self { let plane = MuxPlane::new(variadic!(metadata, partitions)); + let ShardIdentity { id, name } = identity; Self { id, name, @@ -138,6 +173,7 @@ where senders, inbox, shards_table, + partition_consensus, } } @@ -147,17 +183,18 @@ where /// via [`on_message`](Self::on_message) instead of through an inbox channel. #[must_use] pub fn without_inbox( - id: u16, - name: String, + identity: ShardIdentity, metadata: IggyMetadata, J, S, M>, - partitions: IggyPartitions>, + partitions: IggyPartitions, shards_table: T, + partition_consensus: PartitionConsensusConfig, ) -> Self { // TODO: previously we used unbounded channel with flume, // but this is not possible with crossfire without mangling types due to Flavor trait in crossfire. // This needs to be revisited in the future. let (_tx, inbox) = channel(1); let plane = MuxPlane::new(variadic!(metadata, partitions)); + let ShardIdentity { id, name } = identity; Self { id, name, @@ -165,6 +202,7 @@ where senders: Vec::new(), inbox, shards_table, + partition_consensus, } } @@ -311,31 +349,64 @@ where } } - if let Some(consensus) = planes.1.0.consensus() { - consensus.drain_loopback_into(buf); - let count = buf.len(); - total += count; - for msg in buf.drain(..) 
{ - let typed: Message = msg - .try_into_typed() - .expect("loopback queue must only contain PrepareOk messages"); - planes.1.0.on_ack(typed).await; - } + let namespaces: Vec<_> = planes.1.0.namespaces().copied().collect(); + for namespace in namespaces { + let partition = planes + .1 + .0 + .get_by_ns(&namespace) + .expect("partition namespace must resolve during loopback drain"); + partition.consensus().drain_loopback_into(buf); + } + let count = buf.len(); + total += count; + for msg in buf.drain(..) { + let typed: Message = msg + .try_into_typed() + .expect("loopback queue must only contain PrepareOk messages"); + planes.1.0.on_ack(typed).await; } total } + /// Initializes a partition and its dedicated consensus instance on this shard. + /// + /// # Panics + /// Panics if the shard id does not fit in `u8`, which is currently required + /// by the partition consensus replica id. pub fn init_partition(&mut self, namespace: IggyNamespace) where B: MessageBus< Replica = u8, Data = iggy_binary_protocol::Message, Client = u128, - >, + > + Clone, { let partitions = self.plane.partitions_mut(); - partitions.init_partition_in_memory(namespace); - partitions.register_namespace_in_pipeline(namespace.inner()); + if partitions.contains(&namespace) { + return; + } + + let replica_id = + u8::try_from(self.id).expect("shard id must fit in u8 for partition consensus"); + let consensus = VsrConsensus::new( + self.partition_consensus.cluster_id, + replica_id, + self.partition_consensus.replica_count, + namespace.inner(), + self.partition_consensus.bus.clone(), + LocalPipeline::new(), + ); + consensus.init(); + + let stats = Arc::new(PartitionStats::default()); + let partition = IggyPartition::with_in_memory_storage( + stats, + consensus, + partitions.config().segment_size, + partitions.config().enforce_fsync, + ); + partitions.insert(namespace, partition); } } diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs index 4d23affff3..4975cee61c 100644 --- 
a/core/simulator/src/client.rs +++ b/core/simulator/src/client.rs @@ -116,6 +116,19 @@ impl SimClient { self.build_request_with_namespace(Operation::StoreConsumerOffset, &payload, namespace) } + pub fn delete_consumer_offset( + &self, + namespace: IggyNamespace, + consumer_kind: u8, + consumer_id: u32, + ) -> Message { + let mut payload = Vec::with_capacity(5); + payload.push(consumer_kind); + payload.extend_from_slice(&consumer_id.to_le_bytes()); + + self.build_request_with_namespace(Operation::DeleteConsumerOffset, &payload, namespace) + } + #[allow(clippy::cast_possible_truncation)] fn build_request_with_namespace( &self, diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs index 730e0af185..0b1610b362 100644 --- a/core/simulator/src/replica.rs +++ b/core/simulator/src/replica.rs @@ -17,7 +17,7 @@ use crate::bus::{SharedSimOutbox, SimOutbox}; use crate::deps::{MemStorage, SimJournal, SimMuxStateMachine, SimSnapshot}; -use consensus::{LocalPipeline, NamespacedPipeline, VsrConsensus}; +use consensus::{LocalPipeline, VsrConsensus}; use iggy_common::IggyByteSize; use iggy_common::sharding::ShardId; use iggy_common::variadic; @@ -68,20 +68,17 @@ pub fn new_replica(id: u8, name: String, bus: &Arc, replica_count: u8 segment_size: IggyByteSize::from(1024 * 1024 * 1024), }; - let mut partitions = IggyPartitions::new(ShardId::new(u16::from(id)), partitions_config); + let partitions = IggyPartitions::new(ShardId::new(u16::from(id)), partitions_config); - // TODO: namespace=0 collides with metadata consensus. Safe for now because the simulator - // routes by Operation type, but a shared view change bus would produce namespace collisions. 
- let partition_consensus = VsrConsensus::new( - CLUSTER_ID, - id, - replica_count, - 0, - SharedSimOutbox(Arc::clone(bus)), - NamespacedPipeline::new(), - ); - partition_consensus.init(); - partitions.set_consensus(partition_consensus); - - shard::IggyShard::without_inbox(u16::from(id), name, metadata, partitions, ()) + shard::IggyShard::without_inbox( + shard::ShardIdentity::new(u16::from(id), name), + metadata, + partitions, + (), + shard::PartitionConsensusConfig::new( + CLUSTER_ID, + replica_count, + SharedSimOutbox(Arc::clone(bus)), + ), + ) }