From 03a3d5e0ede95955d49dd56e37bab76fccc49138 Mon Sep 17 00:00:00 2001 From: MrCroxx Date: Thu, 26 Feb 2026 17:30:46 +0800 Subject: [PATCH] stash, todo: rebase me Signed-off-by: MrCroxx --- foyer-storage/src/iov2/bytes.rs | 352 +++++++++++++++++ foyer-storage/src/iov2/device/file.rs | 205 ++++++++++ foyer-storage/src/iov2/device/fs.rs | 124 ++++++ foyer-storage/src/iov2/device/mod.rs | 33 ++ foyer-storage/src/iov2/device/utils.rs | 64 ++++ foyer-storage/src/iov2/engine/mod.rs | 157 ++++++++ foyer-storage/src/iov2/engine/monitor.rs | 77 ++++ foyer-storage/src/iov2/engine/noop.rs | 40 ++ foyer-storage/src/iov2/engine/psync.rs | 276 ++++++++++++++ foyer-storage/src/iov2/engine/uring.rs | 462 +++++++++++++++++++++++ foyer-storage/src/iov2/mod.rs | 19 + foyer-storage/src/lib.rs | 1 + io.md | 17 + 13 files changed, 1827 insertions(+) create mode 100644 foyer-storage/src/iov2/bytes.rs create mode 100644 foyer-storage/src/iov2/device/file.rs create mode 100644 foyer-storage/src/iov2/device/fs.rs create mode 100644 foyer-storage/src/iov2/device/mod.rs create mode 100644 foyer-storage/src/iov2/device/utils.rs create mode 100644 foyer-storage/src/iov2/engine/mod.rs create mode 100644 foyer-storage/src/iov2/engine/monitor.rs create mode 100644 foyer-storage/src/iov2/engine/noop.rs create mode 100644 foyer-storage/src/iov2/engine/psync.rs create mode 100644 foyer-storage/src/iov2/engine/uring.rs create mode 100644 foyer-storage/src/iov2/mod.rs create mode 100644 io.md diff --git a/foyer-storage/src/iov2/bytes.rs b/foyer-storage/src/iov2/bytes.rs new file mode 100644 index 00000000..0aeada8b --- /dev/null +++ b/foyer-storage/src/iov2/bytes.rs @@ -0,0 +1,352 @@ +// Copyright 2026 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{
+    any::Any,
+    fmt::Debug,
+    ops::{Deref, DerefMut, RangeBounds},
+    ptr::NonNull,
+    slice::{from_raw_parts, from_raw_parts_mut},
+    sync::Arc,
+};
+
+use allocator_api2::alloc::{Allocator, Global, Layout, handle_alloc_error};
+use foyer_common::bits;
+
+use super::PAGE;
+
+/// Base trait of every I/O buffer: a byte slice that can also be exposed as a raw pointer and length.
+pub trait IoB: Deref<Target = [u8]> + Send + Sync + 'static + Debug + Any {
+    /// Get the start pointer and the length in bytes of the buffer.
+    fn as_raw_parts(&self) -> (*mut u8, usize);
+}
+
+/// 4K-aligned immutable buf for direct I/O.
+pub trait IoBuf: Deref<Target = [u8]> + IoB {
+    // TODO(MrCroxx): Remove this after bump MSRV to 1.86.0+
+    // https://blog.rust-lang.org/2025/04/03/Rust-1.86.0/
+    /// Upcast the boxed buffer into a boxed [`IoB`].
+    fn into_iob(self: Box<Self>) -> Box<dyn IoB>;
+}
+
+/// 4K-aligned mutable buf for direct I/O.
+pub trait IoBufMut: DerefMut<Target = [u8]> + IoB {
+    // TODO(MrCroxx): Remove this after bump MSRV to 1.86.0+
+    // https://blog.rust-lang.org/2025/04/03/Rust-1.86.0/
+    /// Upcast the boxed buffer into a boxed [`IoB`].
+    fn into_iob(self: Box<Self>) -> Box<dyn IoB>;
+}
+
+/// 4K-aligned raw bytes.
+#[derive(Debug)]
+pub struct Raw {
+    ptr: *mut u8,
+    cap: usize,
+}
+
+// `Raw` exclusively owns the allocation behind `ptr`, so it can be moved to and shared between threads.
+unsafe impl Send for Raw {}
+unsafe impl Sync for Raw {}
+
+impl Raw {
+    /// Allocate an 4K-aligned [`Raw`] with **AT LEAST** `capacity` bytes.
+    ///
+    /// # Safety
+    ///
+    /// The returned buffer contains uninitialized memory. Callers must initialize
+    /// the memory before reading from it to avoid undefined behavior.
+    pub fn new(capacity: usize) -> Self {
+        let capacity = bits::align_up(PAGE, capacity);
+        // SAFETY: NOTE(review): assumes `PAGE` is a non-zero power of two and that the rounded-up
+        // capacity does not overflow `isize` -- confirm against the definition of `PAGE`.
+        let layout = unsafe { Layout::from_size_align_unchecked(capacity, PAGE) };
+        let mut nonnull = match Global.allocate(layout) {
+            Ok(nonnull) => nonnull,
+            Err(_) => handle_alloc_error(layout),
+        };
+        // The allocator may hand back more than requested; record what it actually returned.
+        let slice = unsafe { nonnull.as_mut() };
+        let ptr = slice.as_mut_ptr();
+        let cap = slice.len();
+        Self { ptr, cap }
+    }
+
+    /// Consume [`Raw`] and get the raw pointer and the capacity.
+    ///
+    /// # Safety
+    ///
+    /// [`Raw::from_raw_parts`] must be called later. Otherwise the buffer memory will leak.
+    pub fn into_raw_parts(self) -> (*mut u8, usize) {
+        let res = (self.ptr, self.cap);
+        // Skip `Drop` so the allocation stays alive for the caller.
+        std::mem::forget(self);
+        res
+    }
+
+    /// Construct [`Raw`] with the raw pointer and the capacity.
+    ///
+    /// # Safety
+    ///
+    /// The `ptr` and `cap` must be returned by [`Raw::into_raw_parts`].
+    pub unsafe fn from_raw_parts(ptr: *mut u8, cap: usize) -> Self {
+        Self { ptr, cap }
+    }
+}
+
+impl Clone for Raw {
+    /// Allocate a new buffer of the same capacity and copy the bytes over.
+    fn clone(&self) -> Self {
+        let mut buf = Raw::new(self.cap);
+        assert_eq!(buf.cap, self.cap);
+        buf.copy_from_slice(self);
+        buf
+    }
+}
+
+impl Drop for Raw {
+    fn drop(&mut self) {
+        // Rebuild the layout with the same alignment that `Raw::new` allocated with.
+        let layout = unsafe { Layout::from_size_align_unchecked(self.cap, PAGE) };
+        unsafe { Global.deallocate(NonNull::new_unchecked(self.ptr), layout) };
+    }
+}
+
+impl Deref for Raw {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        unsafe { from_raw_parts(self.ptr, self.cap) }
+    }
+}
+
+impl DerefMut for Raw {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        unsafe { from_raw_parts_mut(self.ptr, self.cap) }
+    }
+}
+
+impl AsRef<[u8]> for Raw {
+    fn as_ref(&self) -> &[u8] {
+        self
+    }
+}
+
+impl AsMut<[u8]> for Raw {
+    fn as_mut(&mut self) -> &mut [u8] {
+        &mut *self
+    }
+}
+
+impl PartialEq for Raw {
+    /// Two buffers are equal when their bytes are equal.
+    fn eq(&self, other: &Self) -> bool {
+        self.as_ref() == other.as_ref()
+    }
+}
+
+impl Eq for Raw {}
+
+impl IoB for Raw {
+    fn as_raw_parts(&self) -> (*mut u8, usize) {
+        (self.ptr, self.cap)
+    }
+}
+
+impl IoBuf for Raw {
+    fn into_iob(self: Box<Self>) -> Box<dyn IoB> {
+        self
+    }
+}
+
+impl IoBufMut for Raw {
+    fn into_iob(self: Box<Self>) -> Box<dyn IoB> {
+        self
+    }
+}
+
+/// A 4K-aligned slice on the io buffer that can be shared.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct IoSlice {
+    raw: Arc<Raw>,
+    /// Start offset (inclusive) into `raw`.
+    start: usize,
+    /// End offset (exclusive) into `raw`.
+    end: usize,
+}
+
+impl From<Raw> for IoSlice {
+    /// Wrap the whole buffer into a shareable slice.
+    fn from(value: Raw) -> Self {
+        let len = value.len();
+        Self {
+            raw: Arc::new(value),
+            start: 0,
+            end: len,
+        }
+    }
+}
+
+impl From<IoSliceMut> for IoSlice {
+    /// Freeze the mutable slice into a shareable slice covering the whole buffer.
+    fn from(value: IoSliceMut) -> Self {
+        let len = value.len();
+        Self {
+            raw: Arc::new(value.raw),
+            start: 0,
+            end: len,
+        }
+    }
+}
+
+impl Deref for IoSlice {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        &self.raw[self.start..self.end]
+    }
+}
+
+impl AsRef<[u8]> for IoSlice {
+    fn as_ref(&self) -> &[u8] {
+        self
+    }
+}
+
+impl IoB for IoSlice {
+    fn as_raw_parts(&self) -> (*mut u8, usize) {
+        // `start..end` is kept within the parent range by `IoSlice::slice`.
+        let ptr = unsafe { self.raw.ptr.add(self.start) };
+        let len = self.end - self.start;
+        (ptr, len)
+    }
+}
+
+impl IoBuf for IoSlice {
+    fn into_iob(self: Box<Self>) -> Box<dyn IoB> {
+        self
+    }
+}
+
+impl IoSlice {
+    /// Get a sub-slice that shares the same underlying buffer.
+    ///
+    /// `range` is relative to this slice.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the range starts after it ends, if it reaches beyond the end of this slice,
+    /// or if either bound is rejected by `bits::assert_aligned(PAGE, ..)`.
+    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
+        use std::ops::Bound;
+
+        let s = match range.start_bound() {
+            Bound::Included(i) => self.start + *i,
+            Bound::Excluded(i) => self.start + *i + 1,
+            Bound::Unbounded => self.start,
+        };
+
+        let e = match range.end_bound() {
+            Bound::Included(i) => self.start + *i + 1,
+            Bound::Excluded(i) => self.start + *i,
+            Bound::Unbounded => self.end,
+        };
+
+        if s > e {
+            panic!("slice index starts at {s} but ends at {e}");
+        }
+        // `as_raw_parts` does unchecked pointer arithmetic, so the sub-slice must never
+        // escape the range of its parent.
+        if e > self.end {
+            panic!("slice end index {e} is out of range for slice ending at {}", self.end);
+        }
+
+        bits::assert_aligned(PAGE, s);
+        bits::assert_aligned(PAGE, e);
+
+        Self {
+            raw: self.raw.clone(),
+            start: s,
+            end: e,
+        }
+    }
+
+    /// Convert into [`IoSliceMut`], if the [`IoSlice`] has exactly one reference.
+    ///
+    /// NOTE(review): the returned [`IoSliceMut`] covers the whole underlying buffer, the
+    /// `start..end` range of this slice is discarded -- confirm this is intended for sub-slices.
+    pub fn try_into_io_slice_mut(self) -> Option<IoSliceMut> {
+        let raw = Arc::into_inner(self.raw)?;
+        Some(IoSliceMut { raw })
+    }
+}
+
+/// A 4K-aligned mutable slice.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct IoSliceMut {
+    raw: Raw,
+}
+
+impl IoSliceMut {
+    /// Allocate a mutable slice with at least `capacity` bytes, see [`Raw::new`].
+    pub fn new(capacity: usize) -> Self {
+        let raw = Raw::new(capacity);
+        Self { raw }
+    }
+
+    /// Get the length in bytes, which is the capacity of the underlying buffer.
+    pub fn len(&self) -> usize {
+        self.raw.cap
+    }
+}
+
+impl Deref for IoSliceMut {
+    type Target = [u8];
+
+    fn deref(&self) -> &Self::Target {
+        &self.raw
+    }
+}
+
+impl DerefMut for IoSliceMut {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.raw
+    }
+}
+
+impl AsRef<[u8]> for IoSliceMut {
+    fn as_ref(&self) -> &[u8] {
+        self
+    }
+}
+
+impl AsMut<[u8]> for IoSliceMut {
+    fn as_mut(&mut self) -> &mut [u8] {
+        &mut self.raw
+    }
+}
+
+impl IoB for IoSliceMut {
+    fn as_raw_parts(&self) -> (*mut u8, usize) {
+        (self.raw.ptr, self.raw.cap)
+    }
+}
+
+impl IoBuf for IoSliceMut {
+    fn into_iob(self: Box<Self>) -> Box<dyn IoB> {
+        self
+    }
+}
+
+impl IoBufMut for IoSliceMut {
+    fn into_iob(self: Box<Self>) -> Box<dyn IoB> {
+        self
+    }
+}
+
+impl IoSliceMut {
+    /// Freeze into a shareable [`IoSlice`] covering the whole buffer.
+    pub fn into_io_slice(self) -> IoSlice {
+        let start = 0;
+        let end = self.raw.cap;
+        let raw = Arc::new(self.raw);
+        IoSlice { raw, start, end }
+    }
+}
+
+impl dyn IoB {
+    /// Convert into concrete type [`Box<IoSlice>`] if the underlying type fits.
+    pub fn try_into_io_slice(self: Box<Self>) -> Option<Box<IoSlice>> {
+        let any: Box<dyn Any> = self;
+        any.downcast::<IoSlice>().ok()
+    }
+
+    /// Convert into concrete type [`Box<IoSliceMut>`] if the underlying type fits.
+    pub fn try_into_io_slice_mut(self: Box<Self>) -> Option<Box<IoSliceMut>> {
+        let any: Box<dyn Any> = self;
+        any.downcast::<IoSliceMut>().ok()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_dyn() {
+        let raw = Raw::new(4096);
+        let _: Box<dyn IoBuf> = Box::new(raw.clone());
+        let _: Box<dyn IoBufMut> = Box::new(raw.clone());
+    }
+}
diff --git a/foyer-storage/src/iov2/device/file.rs b/foyer-storage/src/iov2/device/file.rs
new file mode 100644
index 00000000..85a804dc
--- /dev/null
+++ b/foyer-storage/src/iov2/device/file.rs
@@ -0,0 +1,205 @@
+// Copyright 2026 foyer Project Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{
+    fs::{File, OpenOptions, create_dir_all},
+    path::{Path, PathBuf},
+    sync::{Arc, RwLock},
+};
+
+use foyer_common::error::{Error, Result};
+use fs4::free_space;
+
+use crate::{
+    RawFile, Statistics, Throttle,
+    iov2::{
+        PAGE,
+        device::{Device, DeviceBuilder},
+    },
+};
+
+/// Builder for a file-based device that manages a single file or a raw block device.
+#[derive(Debug)]
+pub struct FileDeviceBuilder {
+    path: PathBuf,
+    capacity: Option<usize>,
+    throttle: Throttle,
+    #[cfg(target_os = "linux")]
+    direct: bool,
+}
+
+impl FileDeviceBuilder {
+    /// Use the given file path as the file device path.
+    pub fn new(path: impl AsRef<Path>) -> Self {
+        Self {
+            path: path.as_ref().into(),
+            capacity: None,
+            throttle: Throttle::default(),
+            #[cfg(target_os = "linux")]
+            direct: false,
+        }
+    }
+
+    /// Set the capacity of the file device.
+    ///
+    /// The given capacity may be modified on build for alignment.
+    ///
+    /// The file device uses 80% of the current free disk space by default.
+    pub fn with_capacity(mut self, capacity: usize) -> Self {
+        self.capacity = Some(capacity);
+        self
+    }
+
+    /// Set the throttle of the file device.
+    pub fn with_throttle(mut self, throttle: Throttle) -> Self {
+        self.throttle = throttle;
+        self
+    }
+
+    /// Set whether the file device should use direct I/O.
+    #[cfg(target_os = "linux")]
+    pub fn with_direct(mut self, direct: bool) -> Self {
+        self.direct = direct;
+        self
+    }
+}
+
+impl FileDeviceBuilder {
+    /// Pick the capacity to use when none is configured.
+    ///
+    /// Uses the device capacity if `path` refers to a raw block device, otherwise 80% of the
+    /// free space of the parent directory (which is created if missing).
+    fn default_capacity(path: &Path) -> Result<usize> {
+        // Try to get the capacity if `path` refer to a raw block device.
+        #[cfg(unix)]
+        if let Ok(metadata) = std::fs::metadata(path) {
+            use std::os::unix::fs::FileTypeExt;
+            if metadata.file_type().is_block_device() {
+                return super::utils::get_dev_capacity(path);
+            }
+        }
+
+        let dir = match path.parent() {
+            Some(parent) if !parent.as_os_str().is_empty() => parent,
+            // A bare file name lives in the current directory.
+            Some(_) => Path::new("."),
+            None => {
+                return Err(Error::io_error(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    "path must point to a file",
+                )));
+            }
+        };
+
+        // Create an empty directory if needed before to get free space.
+        create_dir_all(dir).map_err(Error::io_error)?;
+        let free = free_space(dir).map_err(Error::io_error)?;
+        Ok(free as usize / 10 * 8)
+    }
+}
+
+impl DeviceBuilder for FileDeviceBuilder {
+    /// Open (and create if needed) the backing file and build the [`FileDevice`] upon it.
+    ///
+    /// I/O failures are returned as errors instead of panicking.
+    fn build(self) -> Result<Arc<dyn Device>> {
+        // Normalize configurations.
+
+        let capacity = match self.capacity {
+            Some(capacity) => capacity,
+            None => Self::default_capacity(&self.path)?,
+        };
+        // Round the capacity down to a multiple of the page size.
+        let size = capacity - capacity % PAGE;
+
+        // Build device.
+
+        let mut opts = OpenOptions::new();
+        opts.create(true).write(true).read(true);
+        #[cfg(target_os = "linux")]
+        if self.direct {
+            use std::os::unix::fs::OpenOptionsExt;
+            opts.custom_flags(libc::O_DIRECT | libc::O_NOATIME);
+        }
+
+        let file = opts.open(&self.path).map_err(Error::io_error)?;
+
+        // Only a regular file can (and needs to) be resized; a block device has a fixed size.
+        if file.metadata().map_err(Error::io_error)?.is_file() {
+            tracing::warn!(
+                "{} {} {}",
+                "It seems a `DirectFileDevice` is used within a normal file system, which is inefficient.",
+                "Please use `DirectFileDevice` directly on a raw block device.",
+                "Or use `DirectFsDevice` within a normal file system.",
+            );
+            file.set_len(size as _).map_err(Error::io_error)?;
+        }
+        let file = Arc::new(file);
+
+        let statistics = Arc::new(Statistics::new(self.throttle));
+
+        let device = FileDevice {
+            file,
+            offset: 0,
+            size,
+            statistics,
+        };
+        let device: Arc<dyn Device> = Arc::new(device);
+        Ok(device)
+    }
+}
+
+/// A device upon a single file or a raw block device.
+#[derive(Debug)]
+pub struct FileDevice {
+    file: Arc<File>,
+    /// Offset of this device within the underlying file.
+    offset: u64,
+    /// Size of this device in bytes.
+    size: usize,
+    statistics: Arc<Statistics>,
+}
+
+impl FileDevice {
+    /// Resolve the raw file handle and offset for the given position.
+    pub fn resolve(&self, pos: u64) -> (RawFile, u64) {
+        #[cfg(any(target_family = "unix", target_family = "wasm"))]
+        let raw = {
+            use std::os::fd::AsRawFd;
+            RawFile(self.file.as_raw_fd())
+        };
+
+        #[cfg(target_family = "windows")]
+        let raw = {
+            use std::os::windows::io::AsRawHandle;
+            RawFile(self.file.as_raw_handle())
+        };
+
+        // Translate the device-relative position into a position in the underlying file.
+        let pos = self.offset + pos;
+
+        (raw, pos)
+    }
+
+    /// Split the file device into two at the given position.
+    ///
+    /// Both halves share the underlying file and the statistics.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `pos` is larger than the size of the device.
+    pub fn split(self: &Arc<Self>, pos: u64) -> (Arc<Self>, Arc<Self>) {
+        assert!(
+            pos <= self.size as u64,
+            "split position {pos} exceeds device size {}",
+            self.size
+        );
+        // Lossless: `pos` is not larger than `self.size`, which is a `usize`.
+        let left_size = pos as usize;
+
+        let left = Arc::new(Self {
+            file: self.file.clone(),
+            offset: self.offset,
+            size: left_size,
+            statistics: self.statistics.clone(),
+        });
+
+        let right = Arc::new(Self {
+            file: self.file.clone(),
+            offset: self.offset + pos,
+            size: self.size - left_size,
+            statistics: self.statistics.clone(),
+        });
+
+        (left, right)
+    }
+
+    /// Get the offset of the file device to the underlying file.
+    pub fn offset(&self) -> u64 {
+        self.offset
+    }
+
+    /// Get the size of the file device.
+    pub fn size(&self) -> usize {
+        self.size
+    }
+
+    /// Get the statistics of the file device.
+    pub fn statistics(&self) -> &Arc<Statistics> {
+        &self.statistics
+    }
+}
+
+impl Device for FileDevice {}
diff --git a/foyer-storage/src/iov2/device/fs.rs b/foyer-storage/src/iov2/device/fs.rs
new file mode 100644
index 00000000..f70502a1
--- /dev/null
+++ b/foyer-storage/src/iov2/device/fs.rs
@@ -0,0 +1,124 @@
+// Copyright 2026 foyer Project Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{
+    fs::{File, OpenOptions, create_dir_all},
+    path::{Path, PathBuf},
+    sync::{Arc, RwLock},
+};
+
+use foyer_common::error::{Error, Result};
+use fs4::free_space;
+
+use crate::{
+    RawFile, Statistics, Throttle,
+    iov2::{
+        PAGE,
+        device::{Device, DeviceBuilder},
+    },
+};
+
+/// Builder for a filesystem-based device that manages files in a directory.
+#[derive(Debug)]
+pub struct FsDeviceBuilder {
+    dir: PathBuf,
+    capacity: Option<usize>,
+    throttle: Throttle,
+    #[cfg(target_os = "linux")]
+    direct: bool,
+}
+
+impl FsDeviceBuilder {
+    /// Use the given file path as the file device path.
+    pub fn new(dir: impl AsRef<Path>) -> Self {
+        Self {
+            dir: dir.as_ref().into(),
+            capacity: None,
+            throttle: Throttle::default(),
+            #[cfg(target_os = "linux")]
+            direct: false,
+        }
+    }
+
+    /// Set the capacity of the file device.
+    ///
+    /// The given capacity may be modified on build for alignment.
+    ///
+    /// The file device uses 80% of the current free disk space by default.
+    pub fn with_capacity(mut self, capacity: usize) -> Self {
+        self.capacity = Some(capacity);
+        self
+    }
+
+    /// Set the throttle of the file device.
+    pub fn with_throttle(mut self, throttle: Throttle) -> Self {
+        self.throttle = throttle;
+        self
+    }
+
+    /// Set whether the file device should use direct I/O.
+    #[cfg(target_os = "linux")]
+    pub fn with_direct(mut self, direct: bool) -> Self {
+        self.direct = direct;
+        self
+    }
+}
+
+impl DeviceBuilder for FsDeviceBuilder {
+    /// Create the directory if needed and build the [`FsDevice`] upon it.
+    ///
+    /// I/O failures are returned as errors instead of panicking.
+    fn build(self) -> Result<Arc<dyn Device>> {
+        // Normalize configurations.
+
+        // Only probe the file system when no capacity is configured.
+        let capacity = match self.capacity {
+            Some(capacity) => capacity,
+            None => {
+                // Create an empty directory before to get free space.
+                create_dir_all(&self.dir).map_err(Error::io_error)?;
+                let free = free_space(&self.dir).map_err(Error::io_error)?;
+                free as usize / 10 * 8
+            }
+        };
+        // Round the capacity down to a multiple of the page size.
+        let capacity = capacity - capacity % PAGE;
+
+        let statistics = Arc::new(Statistics::new(self.throttle));
+
+        // Build device.
+
+        if !self.dir.exists() {
+            create_dir_all(&self.dir).map_err(Error::io_error)?;
+        }
+
+        let device = FsDevice {
+            capacity,
+            statistics,
+            dir: self.dir,
+            #[cfg(target_os = "linux")]
+            direct: self.direct,
+        };
+        let device: Arc<dyn Device> = Arc::new(device);
+        Ok(device)
+    }
+}
+
+/// A device upon a directory of a file system.
+#[derive(Debug)]
+pub struct FsDevice {
+    capacity: usize,
+    statistics: Arc<Statistics>,
+    dir: PathBuf,
+    #[cfg(target_os = "linux")]
+    direct: bool,
+}
+
+impl FsDevice {
+    // pub fn
+}
+
+impl Device for FsDevice {}
diff --git a/foyer-storage/src/iov2/device/mod.rs b/foyer-storage/src/iov2/device/mod.rs
new file mode 100644
index 00000000..37259e0d
--- /dev/null
+++ b/foyer-storage/src/iov2/device/mod.rs
@@ -0,0 +1,33 @@
+// Copyright 2026 foyer Project Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use foyer_common::error::Result;
+
+use std::{any::Any, fmt::Debug, sync::Arc};
+
+/// Device builder trait.
+pub trait DeviceBuilder: Send + Sync + 'static + Debug {
+    /// Build a device from the given configuration.
+    fn build(self) -> Result<Arc<dyn Device>>;
+}
+
+/// Device trait.
+///
+/// Because different device types may share almost no common properties,
+/// the device only serves as base of `Any` trait for downcasting.
+pub trait Device: Debug + Send + Sync + Any + 'static {}
+
+mod file;
+mod fs;
+mod utils;
diff --git a/foyer-storage/src/iov2/device/utils.rs b/foyer-storage/src/iov2/device/utils.rs
new file mode 100644
index 00000000..cacadb9a
--- /dev/null
+++ b/foyer-storage/src/iov2/device/utils.rs
@@ -0,0 +1,64 @@
+// Copyright 2026 foyer Project Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/// Get the capacity in bytes of the raw block device at `path`.
+///
+/// # Errors
+///
+/// Returns an error if the device cannot be opened, if the size ioctl fails, or if the
+/// platform is not one of linux, freebsd or macos.
+#[cfg(unix)]
+pub fn get_dev_capacity(path: impl AsRef<std::path::Path>) -> foyer_common::error::Result<usize> {
+    use foyer_common::error::Error;
+
+    const BLKGETSIZE64: u64 = 0x80081272;
+    const DIOCGMEDIASIZE: u64 = 0x40086481;
+    const DKIOCGETBLOCKSIZE: u64 = 0x40046418;
+    // `_IOR('d', 25, uint64_t)`: the argument is 8 bytes wide, so the size field is 0x08.
+    const DKIOCGETBLOCKCOUNT: u64 = 0x40086419;
+
+    use std::{fs::File, os::fd::AsRawFd};
+
+    let file = File::open(path.as_ref())?;
+    let fd = file.as_raw_fd();
+
+    // `ioctl` reports failure by returning -1 and setting `errno`, so the error must be
+    // taken from `errno` rather than from the return value.
+    let os_error = || -> Error { std::io::Error::last_os_error().into() };
+
+    if cfg!(target_os = "linux") {
+        let mut size: u64 = 0;
+        let res = unsafe { libc::ioctl(fd, BLKGETSIZE64, &mut size) };
+        if res != 0 {
+            return Err(os_error());
+        }
+        Ok(size as usize)
+    } else if cfg!(target_os = "freebsd") {
+        // `DIOCGMEDIASIZE` writes an `off_t` (8 bytes, as encoded in the request code).
+        let mut size: u64 = 0;
+        let res = unsafe { libc::ioctl(fd, DIOCGMEDIASIZE, &mut size) };
+        if res != 0 {
+            return Err(os_error());
+        }
+        Ok(size as usize)
+    } else if cfg!(target_os = "macos") {
+        // The block size is a 4-byte value, the block count is an 8-byte value.
+        let mut block_size: u32 = 0;
+        let mut block_count: u64 = 0;
+        let res = unsafe { libc::ioctl(fd, DKIOCGETBLOCKSIZE, &mut block_size) };
+        if res != 0 {
+            return Err(os_error());
+        }
+        let res = unsafe { libc::ioctl(fd, DKIOCGETBLOCKCOUNT, &mut block_count) };
+        if res != 0 {
+            return Err(os_error());
+        }
+        let size = u64::from(block_size) * block_count;
+        Ok(size as usize)
+    } else {
+        use foyer_common::error::ErrorKind;
+
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "get_dev_capacity() is not supported on this platform".to_string(),
+        ))
+    }
+}
diff --git a/foyer-storage/src/iov2/engine/mod.rs b/foyer-storage/src/iov2/engine/mod.rs
new file mode 100644
index 00000000..d67494f3
--- /dev/null
+++ b/foyer-storage/src/iov2/engine/mod.rs
@@ -0,0 +1,157 @@
+// Copyright 2026 foyer Project Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod monitor;
+pub mod noop;
+pub mod psync;
+
+#[cfg(target_os = "linux")]
+pub mod uring;
+
+use crate::{Statistics, iov2::bytes::IoB};
+
+#[cfg(feature = "tracing")]
+use fastrace::{future::InSpan, prelude::*};
+use foyer_common::{error::Result, spawn::Spawner};
+use futures_core::future::BoxFuture;
+use pin_project::pin_project;
+
+use std::{
+    any::Any,
+    fmt::Debug,
+    future::Future,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll, ready},
+};
+
+/// Raw os file resource.
+///
+/// Use `fd` with unix and wasm, use `handle` with windows.
+#[cfg(any(target_family = "unix", target_family = "wasm"))]
+#[derive(Debug)]
+pub struct RawFile(pub std::os::fd::RawFd);
+
+/// Raw os file resource.
+///
+/// Use `fd` with unix and wasm, use `handle` with windows.
+#[cfg(target_family = "windows")]
+#[derive(Debug)]
+pub struct RawFile(pub std::os::windows::io::RawHandle);
+
+unsafe impl Send for RawFile {}
+unsafe impl Sync for RawFile {}
+
+/// The target that an I/O unit operates on.
+#[derive(Debug)]
+pub enum IoTarget {
+    /// A raw file together with the offset to operate at.
+    File { raw: RawFile, offset: u64 },
+    /// A named object.
+    Object { name: String },
+}
+
+/// The kind of an I/O operation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum IoOp {
+    Read,
+    Write,
+}
+
+/// A single I/O request that is submitted to an [`IoEngine`].
+#[derive(Debug)]
+pub struct IoUnit {
+    /// Where to read from or write to.
+    pub target: IoTarget,
+    /// Whether to read or to write.
+    pub op: IoOp,
+    /// The buffer of the operation. It is handed back on completion of the [`IoHandle`].
+    pub buf: Box<dyn IoB>,
+
+    /// The statistics to record the operation with.
+    pub statistics: Arc<Statistics>,
+}
+
+/// I/O engine trait.
+pub trait IoEngine: Send + Sync + 'static + Debug + Any {
+    /// Submit an I/O unit and get a handle that resolves when the operation finishes.
+    fn submit(&self, unit: IoUnit) -> IoHandle;
+}
+
+#[cfg(not(feature = "tracing"))]
+type IoHandleInner = BoxFuture<'static, (Box<dyn IoB>, Result<()>)>;
+#[cfg(feature = "tracing")]
+type IoHandleInner = InSpan<BoxFuture<'static, (Box<dyn IoB>, Result<()>)>>;
+
+/// A detached I/O handle that can be polled for completion.
+#[pin_project]
+pub struct IoHandle {
+    #[pin]
+    inner: IoHandleInner,
+    /// Invoked once, right after the inner future becomes ready.
+    callback: Option<Box<dyn FnOnce() + Send + 'static>>,
+}
+
+impl Debug for IoHandle {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("IoHandle").finish()
+    }
+}
+
+#[cfg(not(feature = "tracing"))]
+impl From<BoxFuture<'static, (Box<dyn IoB>, Result<()>)>> for IoHandle {
+    fn from(inner: BoxFuture<'static, (Box<dyn IoB>, Result<()>)>) -> Self {
+        Self { inner, callback: None }
+    }
+}
+
+#[cfg(feature = "tracing")]
+impl From<BoxFuture<'static, (Box<dyn IoB>, Result<()>)>> for IoHandle {
+    fn from(inner: BoxFuture<'static, (Box<dyn IoB>, Result<()>)>) -> Self {
+        let inner = inner.in_span(Span::enter_with_local_parent("foyer::storage::io::io_handle"));
+        Self { inner, callback: None }
+    }
+}
+
+impl IoHandle {
+    /// Attach a callback that runs when the I/O completes.
+    ///
+    /// # Panics
+    ///
+    /// Panics if a callback is already set.
+    pub(crate) fn with_callback<F>(mut self, callback: F) -> Self
+    where
+        F: FnOnce() + Send + 'static,
+    {
+        assert!(self.callback.is_none(), "io handle callback can only be set once");
+        self.callback = Some(Box::new(callback));
+        self
+    }
+}
+
+impl Future for IoHandle {
+    type Output = (Box<dyn IoB>, Result<()>);
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.project();
+        let res = ready!(this.inner.poll(cx));
+        // `take()` guarantees the callback runs at most once.
+        if let Some(callback) = this.callback.take() {
+            callback();
+        }
+        Poll::Ready(res)
+    }
+}
+
+/// Context for building the disk cache io engine.
+pub struct IoEngineBuildContext {
+    /// The runtime for the disk cache engine.
+    pub spawner: Spawner,
+}
+
+/// I/O engine config trait.
+pub trait IoEngineConfig: Send + Sync + 'static + Debug {
+    /// Build an I/O engine from the given configuration.
+    fn build(self: Box<Self>, ctx: IoEngineBuildContext) -> BoxFuture<'static, Result<Arc<dyn IoEngine>>>;
+
+    /// Box the config.
+    fn boxed(self) -> Box<Self>
+    where
+        Self: Sized,
+    {
+        Box::new(self)
+    }
+}
diff --git a/foyer-storage/src/iov2/engine/monitor.rs b/foyer-storage/src/iov2/engine/monitor.rs
new file mode 100644
index 00000000..33b29ebe
--- /dev/null
+++ b/foyer-storage/src/iov2/engine/monitor.rs
@@ -0,0 +1,77 @@
+// Copyright 2026 foyer Project Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{fmt::Debug, sync::Arc, time::Instant};
+
+use foyer_common::metrics::Metrics;
+
+use crate::iov2::engine::{IoEngine, IoHandle, IoOp, IoUnit};
+
+#[derive(Debug)]
+struct Inner {
+    io_engine: Arc<dyn IoEngine>,
+    metrics: Arc<Metrics>,
+}
+
+/// An I/O engine wrapper that records statistics and metrics for every I/O of the wrapped engine.
+#[derive(Clone)]
+pub struct MonitoredIoEngine {
+    inner: Arc<Inner>,
+}
+
+impl MonitoredIoEngine {
+    /// Wrap `io_engine` so that its I/Os are reported to `metrics`.
+    pub fn new(io_engine: Arc<dyn IoEngine>, metrics: Arc<Metrics>) -> Arc<Self> {
+        let inner = Inner { io_engine, metrics };
+        Arc::new(Self { inner: Arc::new(inner) })
+    }
+}
+
+impl Debug for MonitoredIoEngine {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("MonitoredIoEngine")
+            .field("engine", &self.inner.io_engine)
+            .finish()
+    }
+}
+
+impl IoEngine for MonitoredIoEngine {
+    /// Forward the unit to the wrapped engine and record the I/O when the handle completes.
+    ///
+    /// The recorded duration is measured from submission to completion.
+    #[cfg_attr(
+        feature = "tracing",
+        fastrace::trace(name = "foyer::storage::io::engine::monitor::submit")
+    )]
+    fn submit(&self, unit: IoUnit) -> IoHandle {
+        let now = Instant::now();
+
+        // Capture everything the callback needs before the unit is moved into the engine.
+        let bytes = unit.buf.len();
+        let op = unit.op;
+
+        let statistics = unit.statistics.clone();
+        let metrics = self.inner.metrics.clone();
+
+        let handle = self.inner.io_engine.submit(unit);
+        handle.with_callback(move || match op {
+            IoOp::Read => {
+                statistics.record_disk_read(bytes);
+                metrics.storage_disk_read.increase(1);
+                metrics.storage_disk_read_bytes.increase(bytes as u64);
+                metrics.storage_disk_read_duration.record(now.elapsed().as_secs_f64());
+            }
+            IoOp::Write => {
+                statistics.record_disk_write(bytes);
+                metrics.storage_disk_write.increase(1);
+                metrics.storage_disk_write_bytes.increase(bytes as u64);
+                metrics.storage_disk_write_duration.record(now.elapsed().as_secs_f64());
+            }
+        })
+    }
+}
diff --git a/foyer-storage/src/iov2/engine/noop.rs b/foyer-storage/src/iov2/engine/noop.rs
new file mode 100644
index 00000000..fca67b67
--- /dev/null
+++ b/foyer-storage/src/iov2/engine/noop.rs
@@ -0,0 +1,40 @@
+// Copyright 2026 foyer Project Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in
compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use foyer_common::error::Result;
+use futures_core::future::BoxFuture;
+use futures_util::FutureExt;
+
+use crate::iov2::engine::{IoEngine, IoEngineBuildContext, IoEngineConfig, IoHandle, IoUnit};
+
+/// Config for a no-operation mock I/O engine.
+#[derive(Debug, Default)]
+pub struct NoopIoEngineConfig;
+
+impl IoEngineConfig for NoopIoEngineConfig {
+    /// Build a [`NoopIoEngine`]. The build context is not used.
+    fn build(self: Box<Self>, _: IoEngineBuildContext) -> BoxFuture<'static, Result<Arc<dyn IoEngine>>> {
+        async move { Ok(Arc::new(NoopIoEngine) as Arc<dyn IoEngine>) }.boxed()
+    }
+}
+
+/// A mock I/O engine that does nothing.
+#[derive(Debug)]
+pub struct NoopIoEngine;
+
+impl IoEngine for NoopIoEngine {
+    /// Hand the buffer back untouched and report success without performing any I/O.
+    fn submit(&self, unit: IoUnit) -> IoHandle {
+        async move { (unit.buf, Ok(())) }.boxed().into()
+    }
+}
diff --git a/foyer-storage/src/iov2/engine/psync.rs b/foyer-storage/src/iov2/engine/psync.rs
new file mode 100644
index 00000000..2d5ae558
--- /dev/null
+++ b/foyer-storage/src/iov2/engine/psync.rs
@@ -0,0 +1,276 @@
+// Copyright 2026 foyer Project Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + fmt::Debug, + fs::File, + mem::ManuallyDrop, + ops::{Deref, DerefMut}, + sync::Arc, +}; + +#[cfg(feature = "tracing")] +use fastrace::prelude::*; +use foyer_common::{ + error::{Error, Result}, + spawn::Spawner, +}; +use futures_core::future::BoxFuture; +use futures_util::FutureExt; + +use crate::iov2::{ + bytes::{IoB, IoBuf, IoBufMut, Raw}, + engine::{IoEngine, IoEngineBuildContext, IoEngineConfig, IoHandle, IoOp, IoTarget, IoUnit, RawFile}, +}; + +#[derive(Debug)] +struct FileHandle(ManuallyDrop); + +#[cfg(target_family = "windows")] +impl From for FileHandle { + fn from(raw: RawFile) -> Self { + use std::os::windows::io::FromRawHandle; + let file = unsafe { File::from_raw_handle(raw.0) }; + let file = ManuallyDrop::new(file); + Self(file) + } +} + +#[cfg(target_family = "unix")] +impl From for FileHandle { + fn from(raw: RawFile) -> Self { + use std::os::unix::io::FromRawFd; + let file = unsafe { File::from_raw_fd(raw.0) }; + let file = ManuallyDrop::new(file); + Self(file) + } +} + +impl Deref for FileHandle { + type Target = File; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for FileHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// Config for synchronous I/O engine with pread(2)/pwrite(2). +#[derive(Debug)] +pub struct PsyncIoEngineConfig { + #[cfg(any(test, feature = "test_utils"))] + write_io_latency: Option>, + + #[cfg(any(test, feature = "test_utils"))] + read_io_latency: Option>, +} + +impl Default for PsyncIoEngineConfig { + fn default() -> Self { + Self::new() + } +} + +impl From for Box { + fn from(builder: PsyncIoEngineConfig) -> Self { + builder.boxed() + } +} + +impl PsyncIoEngineConfig { + /// Create a new synchronous I/O engine config with default configurations. 
+ pub fn new() -> Self { + Self { + #[cfg(any(test, feature = "test_utils"))] + write_io_latency: None, + #[cfg(any(test, feature = "test_utils"))] + read_io_latency: None, + } + } + + /// Set the simulated additional write I/O latency for testing purposes. + #[cfg(any(test, feature = "test_utils"))] + pub fn with_write_io_latency(mut self, latency: std::ops::Range) -> Self { + self.write_io_latency = Some(latency); + self + } + + /// Set the simulated additional read I/O latency for testing purposes. + #[cfg(any(test, feature = "test_utils"))] + pub fn with_read_io_latency(mut self, latency: std::ops::Range) -> Self { + self.read_io_latency = Some(latency); + self + } +} + +impl IoEngineConfig for PsyncIoEngineConfig { + /// Build the psync engine from this config, handing it the spawner from the build context. + fn build(self: Box, ctx: IoEngineBuildContext) -> BoxFuture<'static, Result>> { + async move { + let engine = PsyncIoEngine { + spawner: ctx.spawner, + // Forward the simulated latencies stored on the config. Hard-coding `None` here + // silently discarded `with_write_io_latency` / `with_read_io_latency`. + #[cfg(any(test, feature = "test_utils"))] + write_io_latency: self.write_io_latency, + #[cfg(any(test, feature = "test_utils"))] + read_io_latency: self.read_io_latency, + }; + let engine: Arc = Arc::new(engine); + Ok(engine) + } + .boxed() + } +} + +/// The synchronous I/O engine that uses pread(2)/pwrite(2) and tokio thread pool for reading and writing. 
+pub struct PsyncIoEngine { + spawner: Spawner, + + #[cfg(any(test, feature = "test_utils"))] + write_io_latency: Option>, + #[cfg(any(test, feature = "test_utils"))] + read_io_latency: Option>, +} + +impl Debug for PsyncIoEngine { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PsyncIoEngine").finish() + } +} + +impl PsyncIoEngine { + #[cfg_attr( + feature = "tracing", + fastrace::trace(name = "foyer::storage::io::engine::psync::read") + )] + fn read(&self, buf: Box, file: FileHandle, offset: u64) -> IoHandle { + let runtime = self.spawner.clone(); + + #[cfg(feature = "tracing")] + let span = Span::enter_with_local_parent("foyer::storage::io::engine::psync::read::io"); + + #[cfg(any(test, feature = "test_utils"))] + let read_io_latency = self.read_io_latency.clone(); + + async move { + let (buf, res) = match runtime + .spawn_blocking(move || { + let (ptr, len) = buf.as_raw_parts(); + let slice = unsafe { std::slice::from_raw_parts_mut(ptr, len) }; + let res = { + #[cfg(target_family = "windows")] + { + use std::os::windows::fs::FileExt; + file.seek_read(slice, offset).map(|_| ()).map_err(Error::io_error) + } + #[cfg(target_family = "unix")] + { + use std::os::unix::fs::FileExt; + file.read_exact_at(slice, offset).map_err(Error::io_error) + } + }; + #[cfg(any(test, feature = "test_utils"))] + if let Some(lat) = read_io_latency { + std::thread::sleep(rand::random_range(lat)); + } + (buf, res) + }) + .await + { + Ok((buf, res)) => { + #[cfg(feature = "tracing")] + drop(span); + (buf, res) + } + Err(e) => return (Box::new(Raw::new(0)) as Box, Err(e)), + }; + (buf, res) + } + .boxed() + .into() + } + + #[cfg_attr( + feature = "tracing", + fastrace::trace(name = "foyer::storage::io::engine::psync::write") + )] + fn write(&self, buf: Box, file: FileHandle, offset: u64) -> IoHandle { + let runtime = self.spawner.clone(); + + #[cfg(feature = "tracing")] + let span = 
Span::enter_with_local_parent("foyer::storage::io::engine::psync::write::io"); + + #[cfg(any(test, feature = "test_utils"))] + let write_io_latency = self.write_io_latency.clone(); + async move { + let (buf, res) = match runtime + .spawn_blocking(move || { + let (ptr, len) = buf.as_raw_parts(); + // SAFETY: `buf` is owned by this closure and returned at its end, so it outlives + // `slice`; this relies on `as_raw_parts` yielding a pointer valid for `len` bytes + // of reads. Writing to the file only reads the buffer, so a shared slice is + // enough and no `&mut` alias of the buffer is created. + let slice = unsafe { std::slice::from_raw_parts(ptr, len) }; + let res = { + #[cfg(target_family = "windows")] + { + use std::os::windows::fs::FileExt; + file.seek_write(slice, offset).map(|_| ()).map_err(Error::io_error) + } + #[cfg(target_family = "unix")] + { + use std::os::unix::fs::FileExt; + file.write_all_at(slice, offset).map_err(Error::io_error) + } + }; + #[cfg(any(test, feature = "test_utils"))] + if let Some(lat) = write_io_latency { + std::thread::sleep(rand::random_range(lat)); + } + (buf, res) + }) + .await + { + Ok((buf, res)) => { + #[cfg(feature = "tracing")] + drop(span); + (buf, res) + } + Err(e) => return (Box::new(Raw::new(0)) as Box, Err(e)), + }; + (buf, res) + } + .boxed() + .into() + } +} + +impl IoEngine for PsyncIoEngine { + #[cfg_attr( + feature = "tracing", + fastrace::trace(name = "foyer::storage::io::engine::psync::submit") + )] + fn submit(&self, unit: IoUnit) -> IoHandle { + let IoTarget::File { raw, offset } = unit.target else { + // FIXME(MrCroxx): Refine the hint. Raise error or panic? + panic!("psync io engine only supports file io"); + }; + + let file = FileHandle::from(raw); + + match unit.op { + IoOp::Read => self.read(unit.buf, file, offset), + IoOp::Write => self.write(unit.buf, file, offset), + } + } +} diff --git a/foyer-storage/src/iov2/engine/uring.rs b/foyer-storage/src/iov2/engine/uring.rs new file mode 100644 index 00000000..8f4ba615 --- /dev/null +++ b/foyer-storage/src/iov2/engine/uring.rs @@ -0,0 +1,462 @@ +// Copyright 2026 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + fmt::Debug, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + mpsc, + }, +}; + +use core_affinity::CoreId; +#[cfg(feature = "tracing")] +use fastrace::prelude::*; +use foyer_common::error::{Error, ErrorKind, Result}; +use futures_core::future::BoxFuture; +use futures_util::FutureExt; +use io_uring::{IoUring, opcode, types::Fd}; +use mea::oneshot; + +use crate::iov2::{ + bytes::IoB, + engine::{IoEngine, IoEngineBuildContext, IoEngineConfig, IoHandle, IoOp, IoTarget, IoUnit, RawFile}, +}; + +/// Config for io_uring based I/O engine. +#[derive(Debug)] +pub struct UringIoEngineConfig { + threads: usize, + cpus: Vec, + io_depth: usize, + sqpoll: bool, + sqpoll_cpus: Vec, + sqpoll_idle: u32, + iopoll: bool, + weight: f64, + + #[cfg(any(test, feature = "test_utils"))] + write_io_latency: Option>, + #[cfg(any(test, feature = "test_utils"))] + read_io_latency: Option>, +} + +impl Default for UringIoEngineConfig { + fn default() -> Self { + Self::new() + } +} + +impl UringIoEngineConfig { + /// Create a new io_uring based I/O engine config with default configurations. + pub fn new() -> Self { + Self { + threads: 1, + cpus: vec![], + io_depth: 64, + sqpoll: false, + sqpoll_cpus: vec![], + sqpoll_idle: 10, + iopoll: false, + weight: 1.0, + #[cfg(any(test, feature = "test_utils"))] + write_io_latency: None, + #[cfg(any(test, feature = "test_utils"))] + read_io_latency: None, + } + } + + /// Set the number of threads to use for the I/O engine. 
+ pub fn with_threads(mut self, threads: usize) -> Self { + self.threads = threads; + self + } + + /// Bind the engine threads to specific CPUs. + /// + /// The length of `cpus` must be equal to the number of threads. + pub fn with_cpus(mut self, cpus: Vec) -> Self { + self.cpus = cpus; + self + } + + /// Set the I/O depth for each thread. + pub fn with_io_depth(mut self, io_depth: usize) -> Self { + self.io_depth = io_depth; + self + } + + /// Enable or disable I/O polling. + /// + /// FYI: + /// + /// - [io_uring_setup(2)](https://man7.org/linux/man-pages/man2/io_uring_setup.2.html) + /// - [crate - io-uring](https://docs.rs/io-uring/latest/io_uring/struct.Builder.html#method.setup_iopoll) + /// + /// Related syscall flag: `IORING_SETUP_IOPOLL`. + /// + /// NOTE: + /// + /// - If this feature is enabled, the underlying device MUST be opened with the `O_DIRECT` flag. + /// - If this feature is enabled, the underlying device MUST support io polling. + /// + /// Default: `false`. + pub fn with_iopoll(mut self, iopoll: bool) -> Self { + self.iopoll = iopoll; + self + } + + /// Set the weight of read/write priorities. + /// + /// The engine will try to keep the read/write iodepth ratio as close to the specified weight as possible. + pub fn with_weight(mut self, weight: f64) -> Self { + self.weight = weight; + self + } + + /// Enable or disable SQ polling. + /// + /// FYI: + /// + /// - [io_uring_setup(2)](https://man7.org/linux/man-pages/man2/io_uring_setup.2.html) + /// - [crate - io-uring](https://docs.rs/io-uring/latest/io_uring/struct.Builder.html#method.setup_sqpoll) + /// + /// Related syscall flag: `IORING_SETUP_SQPOLL`. + /// + /// NOTE: If this feature is enabled, the underlying device must be opened with the `O_DIRECT` flag. + /// + /// NOTE(review): the `O_DIRECT` requirement above repeats the one on [`Self::with_iopoll`]; confirm it really + /// applies to SQ polling. + /// + /// Default: `false`. + pub fn with_sqpoll(mut self, sqpoll: bool) -> Self { + self.sqpoll = sqpoll; + self + } + + /// Bind the kernel’s SQ poll thread to the specified cpu. 
+ /// + /// This flag is only meaningful when [`Self::with_sqpoll`] is enabled. + /// + /// The length of `cpus` must be equal to the number of threads. + pub fn with_sqpoll_cpus(mut self, cpus: Vec) -> Self { + self.sqpoll_cpus = cpus; + self + } + + /// After idle milliseconds, the kernel thread will go to sleep and you will have to wake it up again with a system + /// call. + /// + /// This flag is only meaningful when [`Self::with_sqpoll`] is enabled. + pub fn with_sqpoll_idle(mut self, idle: u32) -> Self { + self.sqpoll_idle = idle; + self + } + + /// Set the simulated additional write I/O latency for testing purposes. + #[cfg(any(test, feature = "test_utils"))] + pub fn with_write_io_latency(mut self, latency: std::ops::Range) -> Self { + self.write_io_latency = Some(latency); + self + } + + /// Set the simulated additional read I/O latency for testing purposes. + #[cfg(any(test, feature = "test_utils"))] + pub fn with_read_io_latency(mut self, latency: std::ops::Range) -> Self { + self.read_io_latency = Some(latency); + self + } +} + +impl IoEngineConfig for UringIoEngineConfig { + /// Validate the config, then create one io_uring instance and one worker thread per configured thread. + fn build(self: Box, _: IoEngineBuildContext) -> BoxFuture<'static, Result>> { + async move { + if self.threads == 0 { + return Err(Error::new(ErrorKind::Config, "threads must be greater than 0") + .with_context("threads", self.threads)); + } + + // `cpus` and `sqpoll_cpus` are indexed by thread id when the shards are created, so a + // non-empty list with fewer entries than `threads` is rejected here instead of panicking. + if !self.cpus.is_empty() && self.cpus.len() < self.threads { + return Err(Error::new(ErrorKind::Config, "cpus must provide one entry per thread") + .with_context("cpus", self.cpus.len()) + .with_context("threads", self.threads)); + } + if self.sqpoll && !self.sqpoll_cpus.is_empty() && self.sqpoll_cpus.len() < self.threads { + return Err(Error::new(ErrorKind::Config, "sqpoll_cpus must provide one entry per thread") + .with_context("sqpoll_cpus", self.sqpoll_cpus.len()) + .with_context("threads", self.threads)); + } + + let (read_txs, read_rxs): (Vec>, Vec>) = (0..self.threads) + .map(|_| { + let (tx, rx) = mpsc::sync_channel(4096); + (tx, rx) + }) + .unzip(); + + let (write_txs, write_rxs): (Vec>, Vec>) = (0..self.threads) + .map(|_| { + let (tx, rx) = mpsc::sync_channel(4096); + (tx, rx) + }) + .unzip(); + + for (i, (read_rx, write_rx)) in read_rxs.into_iter().zip(write_rxs.into_iter()).enumerate() { + let mut builder = IoUring::builder(); + if self.iopoll { + builder.setup_iopoll(); + } + if self.sqpoll { + builder.setup_sqpoll(self.sqpoll_idle); + if !self.sqpoll_cpus.is_empty() { + let cpu = self.sqpoll_cpus[i]; + builder.setup_sqpoll_cpu(cpu); + } + 
} + let cpu = if self.cpus.is_empty() { None } else { Some(self.cpus[i]) }; + let uring = builder.build(self.io_depth as _).map_err(Error::io_error)?; + let shard = UringIoEngineShard { + read_rx, + write_rx, + uring, + io_depth: self.io_depth, + weight: self.weight, + read_inflight: 0, + write_inflight: 0, + }; + + std::thread::Builder::new() + .name(format!("foyer-uring-{i}")) + .spawn(move || { + if let Some(cpu) = cpu { + core_affinity::set_for_current(CoreId { id: cpu as _ }); + } + shard.run(); + }) + .map_err(Error::io_error)?; + } + + let engine = UringIoEngine { + read_txs, + write_txs, + sequence: AtomicUsize::new(0), + }; + let engine = Arc::new(engine); + Ok(engine as Arc) + } + .boxed() + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum UringIoType { + Read, + Write, +} + +struct RawBuf { + ptr: *mut u8, + len: usize, +} + +unsafe impl Send for RawBuf {} +unsafe impl Sync for RawBuf {} + +struct RawFileAddress { + file: RawFile, + offset: u64, +} + +struct UringIoCtx { + tx: oneshot::Sender>, + io_type: UringIoType, + rbuf: RawBuf, + addr: RawFileAddress, + #[cfg(feature = "tracing")] + span: fastrace::Span, +} + +struct UringIoEngineShard { + read_rx: mpsc::Receiver, + write_rx: mpsc::Receiver, + weight: f64, + uring: IoUring, + io_depth: usize, + read_inflight: usize, + write_inflight: usize, +} + +impl UringIoEngineShard { + fn run(mut self) { + loop { + 'prepare: loop { + if self.read_inflight + self.write_inflight >= self.io_depth { + break 'prepare; + } + + let ctx = if (self.read_inflight as f64) < self.write_inflight as f64 * self.weight { + match self.read_rx.try_recv() { + Err(mpsc::TryRecvError::Disconnected) => return, + Ok(ctx) => Some(ctx), + Err(mpsc::TryRecvError::Empty) => match self.write_rx.try_recv() { + Err(mpsc::TryRecvError::Disconnected) => return, + Ok(ctx) => Some(ctx), + Err(mpsc::TryRecvError::Empty) => None, + }, + } + } else { + match self.write_rx.try_recv() { + Err(mpsc::TryRecvError::Disconnected) => return, + 
Ok(ctx) => Some(ctx), + Err(mpsc::TryRecvError::Empty) => match self.read_rx.try_recv() { + Err(mpsc::TryRecvError::Disconnected) => return, + Ok(ctx) => Some(ctx), + Err(mpsc::TryRecvError::Empty) => None, + }, + } + }; + + let ctx = match ctx { + Some(ctx) => ctx, + None => break 'prepare, + }; + + let ctx = Box::new(ctx); + + let fd = Fd(ctx.addr.file.0); + let sqe = match ctx.io_type { + UringIoType::Read => { + self.read_inflight += 1; + opcode::Read::new(fd, ctx.rbuf.ptr, ctx.rbuf.len as _) + .offset(ctx.addr.offset) + .build() + } + UringIoType::Write => { + self.write_inflight += 1; + opcode::Write::new(fd, ctx.rbuf.ptr, ctx.rbuf.len as _) + .offset(ctx.addr.offset) + .build() + } + }; + let data = Box::into_raw(ctx) as u64; + let sqe = sqe.user_data(data); + unsafe { self.uring.submission().push(&sqe).unwrap() } + } + + if self.read_inflight + self.write_inflight > 0 { + self.uring.submit().unwrap(); + } + + for cqe in self.uring.completion() { + let data = cqe.user_data(); + let ctx = unsafe { Box::from_raw(data as *mut UringIoCtx) }; + + match ctx.io_type { + UringIoType::Read => self.read_inflight -= 1, + UringIoType::Write => self.write_inflight -= 1, + } + + let res = cqe.result(); + if res < 0 { + let err = Error::raw_os_io_error(res); + let _ = ctx.tx.send(Err(err)); + } else { + let _ = ctx.tx.send(Ok(())); + } + + #[cfg(feature = "tracing")] + drop(ctx.span); + } + } + } +} + +/// The io_uring based I/O engine. 
+pub struct UringIoEngine { + read_txs: Vec>, + write_txs: Vec>, + sequence: AtomicUsize, +} + +impl Debug for UringIoEngine { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("UringIoEngine").finish() + } +} + +impl UringIoEngine { + #[cfg_attr( + feature = "tracing", + fastrace::trace(name = "foyer::storage::io::engine::uring::read") + )] + fn read(&self, buf: Box, file: RawFile, offset: u64, tid: usize) -> IoHandle { + let (tx, rx) = oneshot::channel(); + let shard = &self.read_txs[tid]; + let (ptr, len) = buf.as_raw_parts(); + let rbuf = RawBuf { ptr, len }; + let addr = RawFileAddress { file, offset }; + #[cfg(feature = "tracing")] + let span = Span::enter_with_local_parent("foyer::storage::io::engine::uring::read::io"); + let _ = shard.send(UringIoCtx { + tx, + io_type: UringIoType::Read, + rbuf, + addr, + #[cfg(feature = "tracing")] + span, + }); + async move { + let res = match rx.await { + Ok(res) => res, + Err(e) => Err(Error::new(ErrorKind::ChannelClosed, "io completion channel closed").with_source(e)), + }; + (buf, res) + } + .boxed() + .into() + } + + #[cfg_attr( + feature = "tracing", + fastrace::trace(name = "foyer::storage::io::engine::uring::write") + )] + fn write(&self, buf: Box, file: RawFile, offset: u64, tid: usize) -> IoHandle { + let (tx, rx) = oneshot::channel(); + let shard = &self.write_txs[tid]; + let (ptr, len) = buf.as_raw_parts(); + let rbuf = RawBuf { ptr, len }; + let addr = RawFileAddress { file, offset }; + #[cfg(feature = "tracing")] + let span = Span::enter_with_local_parent("foyer::storage::io::engine::uring::write::io"); + let _ = shard.send(UringIoCtx { + tx, + io_type: UringIoType::Write, + rbuf, + addr, + #[cfg(feature = "tracing")] + span, + }); + async move { + let res = match rx.await { + Ok(res) => res, + Err(e) => Err(Error::new(ErrorKind::ChannelClosed, "io completion channel closed").with_source(e)), + }; + (buf, res) + } + .boxed() + .into() + } +} + +impl IoEngine for 
UringIoEngine { + fn submit(&self, unit: IoUnit) -> IoHandle { + let IoTarget::File { raw, offset } = unit.target else { + // FIXME(MrCroxx): Refine the hint. Raise error or panic? + panic!("uring io engine only supports file io"); + }; + let seq = self.sequence.fetch_add(1, Ordering::Relaxed); + + match unit.op { + IoOp::Read => self.read(unit.buf, raw, offset, seq % self.read_txs.len()), + IoOp::Write => self.write(unit.buf, raw, offset, seq % self.write_txs.len()), + } + } +} diff --git a/foyer-storage/src/iov2/mod.rs b/foyer-storage/src/iov2/mod.rs new file mode 100644 index 00000000..6a60351e --- /dev/null +++ b/foyer-storage/src/iov2/mod.rs @@ -0,0 +1,19 @@ +// Copyright 2026 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +mod bytes; +mod device; +mod engine; + +const PAGE: usize = 4096; diff --git a/foyer-storage/src/lib.rs b/foyer-storage/src/lib.rs index e9440eca..1941bc5b 100644 --- a/foyer-storage/src/lib.rs +++ b/foyer-storage/src/lib.rs @@ -21,6 +21,7 @@ mod compress; mod engine; mod filter; mod io; +mod iov2; mod keeper; mod serde; mod store; diff --git a/io.md b/io.md new file mode 100644 index 00000000..a34bca2b --- /dev/null +++ b/io.md @@ -0,0 +1,17 @@ +| Cache Engine | Addressing Pattern | +| :-: | :-: | +| Block-based | (fd, offset, buffer) | +| KV-based | (key, buffer) | + + +| Io Engine | Compatible Devices | Acceptable Addressing Pattern | +| :-: | :-: | :-: | +| Psync | FS, File | (fd, offset, buffer) | +| Uring | FS, File | (fd, offset, buffer) | +| OpenDAL | (x, OpenDAL only) | (key, buffer) | + +| Device | Acceptable Addressing Pattern | +| :-: | :-: | +| Fs | (fd, offset, buffer), (key, buffer) | +| File | (fd, offset, buffer) | +| OpenDAL | (key, buffer) | \ No newline at end of file