Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions core/binary_protocol/src/consensus/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub enum Operation {
CreatePartitions = 136,
DeletePartitions = 137,
// TODO: DeleteSegments is a partition operation (is_partition() == true) but its
// discriminant sits in the metadata range (128-147). Should be moved to 162 once
// discriminant sits in the metadata range (128-147). Should be moved to 163 once
// iggy_common's Operation enum is removed and wire compat is no longer a concern.
DeleteSegments = 138,
CreateConsumerGroup = 139,
Expand All @@ -53,6 +53,7 @@ pub enum Operation {
// Partition operations (routed by namespace)
SendMessages = 160,
StoreConsumerOffset = 161,
DeleteConsumerOffset = 162,
}

impl Operation {
Expand Down Expand Up @@ -90,7 +91,10 @@ impl Operation {
pub const fn is_partition(&self) -> bool {
matches!(
self,
Self::SendMessages | Self::StoreConsumerOffset | Self::DeleteSegments
Self::SendMessages
| Self::StoreConsumerOffset
| Self::DeleteConsumerOffset
| Self::DeleteSegments
)
}

Expand Down Expand Up @@ -122,7 +126,8 @@ impl Operation {
| Self::CreatePersonalAccessToken
| Self::DeletePersonalAccessToken
| Self::SendMessages
| Self::StoreConsumerOffset => match crate::dispatch::lookup_by_operation(*self) {
| Self::StoreConsumerOffset
| Self::DeleteConsumerOffset => match crate::dispatch::lookup_by_operation(*self) {
Some(meta) => Some(meta.code),
None => None,
},
Expand Down Expand Up @@ -170,6 +175,7 @@ mod tests {
Operation::DeletePersonalAccessToken,
Operation::SendMessages,
Operation::StoreConsumerOffset,
Operation::DeleteConsumerOffset,
];
for op in ops {
let code = op
Expand Down Expand Up @@ -202,5 +208,6 @@ mod tests {
assert!(Operation::SendMessages.is_partition());
assert!(!Operation::SendMessages.is_metadata());
assert!(Operation::DeleteSegments.is_partition());
assert!(Operation::DeleteConsumerOffset.is_partition());
}
}
8 changes: 7 additions & 1 deletion core/binary_protocol/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ pub const COMMAND_TABLE: &[CommandMeta] = &[
"consumer_offset.store",
Operation::StoreConsumerOffset,
),
CommandMeta::non_replicated(DELETE_CONSUMER_OFFSET_CODE, "consumer_offset.delete"),
CommandMeta::replicated(
DELETE_CONSUMER_OFFSET_CODE,
"consumer_offset.delete",
Operation::DeleteConsumerOffset,
),
// Streams
CommandMeta::non_replicated(GET_STREAM_CODE, "stream.get"),
CommandMeta::non_replicated(GET_STREAMS_CODE, "stream.list"),
Expand Down Expand Up @@ -260,6 +264,7 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta>
Operation::DeletePersonalAccessToken => 18,
Operation::SendMessages => 21,
Operation::StoreConsumerOffset => 24,
Operation::DeleteConsumerOffset => 25,
Operation::Reserved => return None,
};
Some(&COMMAND_TABLE[idx])
Expand Down Expand Up @@ -378,6 +383,7 @@ mod tests {
Operation::DeletePersonalAccessToken,
Operation::SendMessages,
Operation::StoreConsumerOffset,
Operation::DeleteConsumerOffset,
];
for op in replicated_ops {
let meta = lookup_by_operation(op)
Expand Down
2 changes: 0 additions & 2 deletions core/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ mod impls;
pub use impls::*;
mod plane_mux;
pub use plane_mux::*;
mod namespaced_pipeline;
pub use namespaced_pipeline::*;
mod plane_helpers;
pub use plane_helpers::*;
mod observability;
Expand Down
Loading
Loading