diff --git a/foyer-memory/Cargo.toml b/foyer-memory/Cargo.toml index 890caf63..cf40ede5 100644 --- a/foyer-memory/Cargo.toml +++ b/foyer-memory/Cargo.toml @@ -15,7 +15,7 @@ readme = { workspace = true } ahash = { workspace = true } arc-swap = "1" bitflags = "2" -cmsketch = "0.2.1" +cmsketch = "0.2.2" equivalent = { workspace = true } fastrace = { workspace = true } foyer-common = { workspace = true } diff --git a/foyer-memory/src/eviction/fifo.rs b/foyer-memory/src/eviction/fifo.rs index 2d2ede1b..1166f395 100644 --- a/foyer-memory/src/eviction/fifo.rs +++ b/foyer-memory/src/eviction/fifo.rs @@ -18,7 +18,7 @@ use foyer_common::code::{Key, Value}; use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink}; use serde::{Deserialize, Serialize}; -use super::{Eviction, Op}; +use super::{Context, Eviction, Op}; use crate::{ error::Result, record::{CacheHint, Record}, @@ -52,6 +52,17 @@ pub struct FifoState { intrusive_adapter! { Adapter = Arc>>: Record> { ?offset = Record::>::STATE_OFFSET + offset_of!(FifoState, link) => LinkedListAtomicLink } where K: Key, V: Value } +#[derive(Debug, Clone)] +pub struct FifoContext; + +impl Context for FifoContext { + type Config = FifoConfig; + + fn init(_: &Self::Config) -> Self { + Self + } +} + pub struct Fifo where K: Key, @@ -70,8 +81,9 @@ where type Value = V; type Hint = FifoHint; type State = FifoState; + type Context = FifoContext; - fn new(_capacity: usize, _config: &Self::Config) -> Self + fn new(_: usize, _: &Self::Config, _: &Self::Context) -> Self where Self: Sized, { @@ -156,7 +168,7 @@ pub mod tests { .collect_vec(); let r = |i: usize| rs[i].clone(); - let mut fifo = TestFifo::new(100, &FifoConfig {}); + let mut fifo = TestFifo::new(100, &FifoConfig {}, &FifoContext); // 0, 1, 2, 3 fifo.push(r(0)); diff --git a/foyer-memory/src/eviction/lfu.rs b/foyer-memory/src/eviction/lfu.rs index dd72d879..f67e0b3a 100644 --- a/foyer-memory/src/eviction/lfu.rs +++ b/foyer-memory/src/eviction/lfu.rs @@ -14,7 +14,7 @@ use std::{mem::offset_of, sync::Arc}; -use cmsketch::CMSketchU16; +use cmsketch::CMSketchAtomicU16; use foyer_common::{ code::{Key, Value}, strict_assert, strict_assert_eq, strict_assert_ne, @@ -22,7 +22,7 @@ use foyer_common::{ use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink}; use serde::{Deserialize, Serialize}; -use super::{Eviction, Op}; +use super::{Context, Eviction, Op, OpCtx}; use crate::{ error::{Error, Result}, record::{CacheHint, Record}, @@ -105,6 +105,21 @@ pub struct LfuState { intrusive_adapter! { Adapter = Arc>>: Record> { ?offset = Record::>::STATE_OFFSET + offset_of!(LfuState, link) => LinkedListAtomicLink } where K: Key, V: Value } +#[derive(Debug, Clone)] +pub struct LfuContext { + frequencies: Arc, +} + +impl Context for LfuContext { + type Config = LfuConfig; + + fn init(config: &Self::Config) -> Self { + let frequencies = CMSketchAtomicU16::new(config.cmsketch_eps, config.cmsketch_confidence); + let frequencies = Arc::new(frequencies); + Self { frequencies } + } +} + /// This implementation is inspired by [Caffeine](https://github.com/ben-manes/caffeine) under Apache License 2.0 /// /// A new and hot entry is kept in `window`. @@ -133,8 +148,7 @@ where window_weight_capacity: usize, protected_weight_capacity: usize, - // TODO(MrCroxx): use a count-min-sketch impl with atomic u16 - frequencies: CMSketchU16, + frequencies: Arc, step: usize, decay: usize, @@ -185,8 +199,9 @@ where type Value = V; type Hint = LfuHint; type State = LfuState; + type Context = LfuContext; - fn new(capacity: usize, config: &Self::Config) -> Self + fn new(capacity: usize, config: &Self::Config, context: &Self::Context) -> Self where Self: Sized, { @@ -212,7 +227,7 @@ where let window_weight_capacity = (capacity as f64 * config.window_capacity_ratio) as usize; let protected_weight_capacity = (capacity as f64 * config.protected_capacity_ratio) as usize; - let frequencies = CMSketchU16::new(config.cmsketch_eps, config.cmsketch_confidence); + let frequencies = context.frequencies.clone(); let decay = frequencies.width(); Self { @@ -284,7 +299,6 @@ where record.set_in_eviction(true); state.queue = Queue::Window; self.increase_queue_weight(Queue::Window, record.weight()); - self.update_frequencies(record.hash()); self.window.push_back(record); // If `window` weight exceeds the capacity, overflow entry from `window` to `probation`. @@ -365,48 +379,57 @@ where } fn acquire() -> Op { - Op::mutable(|this: &mut Self, record| { - // Update frequency by access. - this.update_frequencies(record.hash()); - - if !record.is_in_eviction() { - return; - } - - let state = unsafe { &mut *record.state().get() }; - - strict_assert!(state.link.is_linked()); + Op::mutable_ctx(|this: &mut Self, ctx| { + match ctx { + OpCtx::Hit { record } => { + // Update frequency by access. + this.update_frequencies(record.hash()); + + if !record.is_in_eviction() { + return; + } - match state.queue { - Queue::None => unreachable!(), - Queue::Window => { - // Move to MRU position of `window`. - let r = unsafe { this.window.remove_from_ptr(Arc::as_ptr(record)) }; - this.window.push_back(r); - } - Queue::Probation => { - // Promote to MRU position of `protected`. - let r = unsafe { this.probation.remove_from_ptr(Arc::as_ptr(record)) }; - this.decrease_queue_weight(Queue::Probation, record.weight()); - state.queue = Queue::Protected; - this.increase_queue_weight(Queue::Protected, record.weight()); - this.protected.push_back(r); - - // If `protected` weight exceeds the capacity, overflow entry from `protected` to `probation`. - while this.protected_weight > this.protected_weight_capacity { - strict_assert!(!this.protected.is_empty()); - let r = this.protected.pop_front().unwrap(); - let s = unsafe { &mut *r.state().get() }; - this.decrease_queue_weight(Queue::Protected, r.weight()); - s.queue = Queue::Probation; - this.increase_queue_weight(Queue::Probation, r.weight()); - this.probation.push_back(r); + let state = unsafe { &mut *record.state().get() }; + + strict_assert!(state.link.is_linked()); + + match state.queue { + Queue::None => unreachable!(), + Queue::Window => { + // Move to MRU position of `window`. + let r = unsafe { this.window.remove_from_ptr(Arc::as_ptr(record)) }; + this.window.push_back(r); + } + Queue::Probation => { + // Promote to MRU position of `protected`. + let r = unsafe { this.probation.remove_from_ptr(Arc::as_ptr(record)) }; + this.decrease_queue_weight(Queue::Probation, record.weight()); + state.queue = Queue::Protected; + this.increase_queue_weight(Queue::Protected, record.weight()); + this.protected.push_back(r); + + // If `protected` weight exceeds the capacity, overflow entry from `protected` to + // `probation`. + while this.protected_weight > this.protected_weight_capacity { + strict_assert!(!this.protected.is_empty()); + let r = this.protected.pop_front().unwrap(); + let s = unsafe { &mut *r.state().get() }; + this.decrease_queue_weight(Queue::Protected, r.weight()); + s.queue = Queue::Probation; + this.increase_queue_weight(Queue::Probation, r.weight()); + this.probation.push_back(r); + } + } + Queue::Protected => { + // Move to MRU position of `protected`. + let r = unsafe { this.protected.remove_from_ptr(Arc::as_ptr(record)) }; + this.protected.push_back(r); + } } } - Queue::Protected => { - // Move to MRU position of `protected`. - let r = unsafe { this.protected.remove_from_ptr(Arc::as_ptr(record)) }; - this.protected.push_back(r); + OpCtx::Miss { hash } => { + // Update frequency by access. + this.update_frequencies(hash); } } }) @@ -495,7 +518,8 @@ mod tests { cmsketch_eps: 0.01, cmsketch_confidence: 0.95, }; - let mut lfu = TestLfu::new(10, &config); + let context = LfuContext::init(&config); + let mut lfu = TestLfu::new(10, &config, &context); assert_eq!(lfu.window_weight_capacity, 2); assert_eq!(lfu.protected_weight_capacity, 6); diff --git a/foyer-memory/src/eviction/lru.rs b/foyer-memory/src/eviction/lru.rs index 17496e3b..406163ec 100644 --- a/foyer-memory/src/eviction/lru.rs +++ b/foyer-memory/src/eviction/lru.rs @@ -21,7 +21,7 @@ use foyer_common::{ use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink}; use serde::{Deserialize, Serialize}; -use super::{Eviction, Op}; +use super::{Context, Eviction, Op}; use crate::{ error::{Error, Result}, record::{CacheHint, Record}, @@ -90,6 +90,17 @@ pub struct LruState { intrusive_adapter! { Adapter = Arc>>: Record> { ?offset = Record::>::STATE_OFFSET + offset_of!(LruState, link) => LinkedListAtomicLink } where K: Key, V: Value } +#[derive(Debug, Clone)] +pub struct LruContext; + +impl Context for LruContext { + type Config = LruConfig; + + fn init(_: &Self::Config) -> Self { + Self + } +} + pub struct Lru where K: Key, @@ -135,8 +146,9 @@ where type Value = V; type Hint = LruHint; type State = LruState; + type Context = LruContext; - fn new(capacity: usize, config: &Self::Config) -> Self + fn new(capacity: usize, config: &Self::Config, _: &Self::Context) -> Self where Self: Sized, { @@ -400,7 +412,7 @@ pub mod tests { let config = LruConfig { high_priority_pool_ratio: 0.5, }; - let mut lru = TestLru::new(8, &config); + let mut lru = TestLru::new(8, &config, &LruContext); assert_eq!(lru.high_priority_weight_capacity, 4); @@ -477,7 +489,7 @@ pub mod tests { let config = LruConfig { high_priority_pool_ratio: 0.5, }; - let mut lru = TestLru::new(8, &config); + let mut lru = TestLru::new(8, &config, &LruContext); assert_eq!(lru.high_priority_weight_capacity, 4); diff --git a/foyer-memory/src/eviction/mod.rs b/foyer-memory/src/eviction/mod.rs index 4f351de4..e0ce614e 100644 --- a/foyer-memory/src/eviction/mod.rs +++ b/foyer-memory/src/eviction/mod.rs @@ -31,6 +31,11 @@ impl State for T where T: Send + Sync + 'static + Default {} pub trait Config: Send + Sync + 'static + Clone + Serialize + DeserializeOwned + Default {} impl Config for T where T: Send + Sync + 'static + Clone + Serialize + DeserializeOwned + Default {} +pub trait Context: Send + Sync + 'static + Clone { + type Config: Config; + fn init(config: &Self::Config) -> Self; +} + /// Wrapper for one of the three kind of operations for the eviction container: /// /// 1. no operation @@ -43,10 +48,22 @@ where { /// no operation Noop, - /// immutable operation + /// perform immutable operation on cache hit Immutable(Box>) + Send + Sync + 'static>), - /// mutable operation + /// perform mutable operation on cache hit Mutable(Box>) + Send + Sync + 'static>), + /// perform immutable operation on both hit and miss + ImmutableCtx(Box) + Send + Sync + 'static>), + /// perform mutable operation on both hit and miss + MutableCtx(Box) + Send + Sync + 'static>), +} + +pub enum OpCtx<'a, E> +where + E: Eviction, +{ + Hit { record: &'a Arc> }, + Miss { hash: u64 }, } impl Op @@ -58,7 +75,7 @@ where Self::Noop } - /// immutable operation + /// perform immutable operation on cache hit pub fn immutable(f: F) -> Self where F: Fn(&E, &Arc>) + Send + Sync + 'static, @@ -66,13 +83,29 @@ where Self::Immutable(Box::new(f)) } - /// mutable operation + /// perform mutable operation on cache hit pub fn mutable(f: F) -> Self where F: FnMut(&mut E, &Arc>) + Send + Sync + 'static, { Self::Mutable(Box::new(f)) } + + /// perform immutable operation on both hit and miss + pub fn immutable_ctx(f: F) -> Self + where + F: Fn(&E, OpCtx<'_, E>) + Send + Sync + 'static, + { + Self::ImmutableCtx(Box::new(f)) + } + + /// perform mutable operation on both hit and miss + pub fn mutable_ctx(f: F) -> Self + where + F: FnMut(&mut E, OpCtx<'_, E>) + Send + Sync + 'static, + { + Self::MutableCtx(Box::new(f)) + } } /// Cache eviction algorithm abstraction. @@ -96,9 +129,11 @@ pub trait Eviction: Send + Sync + 'static + Sized { type Hint: Hint; /// State for a cache entry. Mutable state for maintaining the cache eviction algorithm implementation. type State: State; + /// Shared context for all evction shards. + type Context: Context; /// Create a new cache eviction algorithm instance with the given arguments. - fn new(capacity: usize, config: &Self::Config) -> Self; + fn new(capacity: usize, config: &Self::Config, context: &Self::Context) -> Self; /// Update the arguments of the ache eviction algorithm instance. fn update(&mut self, capacity: usize, config: Option<&Self::Config>) -> Result<()>; @@ -140,6 +175,53 @@ pub trait Eviction: Send + Sync + 'static + Sized { fn release() -> Op; } +/// Provides helper functions for developers. +pub trait EvictionExt: Eviction { + fn acquire_immutable(&self, record: &Arc>) { + match Self::acquire() { + Op::Immutable(f) => f(self, record), + _ => unreachable!(), + } + } + + fn acquire_mutable(&mut self, record: &Arc>) { + match Self::acquire() { + Op::Mutable(mut f) => f(self, record), + _ => unreachable!(), + } + } + + fn acquire_immutable_ctx(&self, ctx: OpCtx<'_, Self>) { + match Self::acquire() { + Op::ImmutableCtx(f) => f(self, ctx), + _ => unreachable!(), + } + } + + fn acquire_mutable_ctx(&mut self, ctx: OpCtx<'_, Self>) { + match Self::acquire() { + Op::MutableCtx(mut f) => f(self, ctx), + _ => unreachable!(), + } + } + + fn release_immutable(&self, record: &Arc>) { + match Self::release() { + Op::Immutable(f) => f(self, record), + _ => unreachable!(), + } + } + + fn release_mutable(&mut self, record: &Arc>) { + match Self::release() { + Op::Mutable(mut f) => f(self, record), + _ => unreachable!(), + } + } +} + +impl EvictionExt for E where E: Eviction {} + pub mod fifo; pub mod lfu; pub mod lru; diff --git a/foyer-memory/src/eviction/s3fifo.rs b/foyer-memory/src/eviction/s3fifo.rs index 566265ae..9ae078f5 100644 --- a/foyer-memory/src/eviction/s3fifo.rs +++ b/foyer-memory/src/eviction/s3fifo.rs @@ -28,7 +28,7 @@ use foyer_common::{ use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink}; use serde::{Deserialize, Serialize}; -use super::{Eviction, Op}; +use super::{Context, Eviction, Op}; use crate::{ error::{Error, Result}, record::{CacheHint, Record}, @@ -120,6 +120,17 @@ impl S3FifoState { intrusive_adapter! { Adapter = Arc>>: Record> { ?offset = Record::>::STATE_OFFSET + offset_of!(S3FifoState, link) => LinkedListAtomicLink } where K: Key, V: Value } +#[derive(Debug, Clone)] +pub struct S3FifoContext; + +impl Context for S3FifoContext { + type Config = S3FifoConfig; + + fn init(_: &Self::Config) -> Self { + Self + } +} + pub struct S3Fifo where K: Key, @@ -215,8 +226,9 @@ where type Value = V; type Hint = S3FifoHint; type State = S3FifoState; + type Context = S3FifoContext; - fn new(capacity: usize, config: &Self::Config) -> Self + fn new(capacity: usize, config: &Self::Config, _: &Self::Context) -> Self where Self: Sized, { @@ -459,7 +471,7 @@ mod tests { ghost_queue_capacity_ratio: 10.0, small_to_main_freq_threshold: 2, }; - let mut s3fifo = TestS3Fifo::new(8, &config); + let mut s3fifo = TestS3Fifo::new(8, &config, &S3FifoContext); assert_eq!(s3fifo.small_weight_capacity, 2); diff --git a/foyer-memory/src/raw.rs b/foyer-memory/src/raw.rs index 122dd6b8..3ff9c64e 100644 --- a/foyer-memory/src/raw.rs +++ b/foyer-memory/src/raw.rs @@ -20,7 +20,7 @@ use std::{ ops::Deref, pin::Pin, sync::Arc, - task::{Context, Poll}, + task::Poll, }; use arc_swap::ArcSwap; @@ -46,7 +46,7 @@ use tokio::{sync::oneshot, task::JoinHandle}; use crate::{ error::{Error, Result}, - eviction::{Eviction, Op}, + eviction::{Context, Eviction, EvictionExt, Op, OpCtx}, indexer::{hash_table::HashTableIndexer, sentry::Sentry, Indexer}, pipe::NoopPipe, record::{Data, Record}, @@ -215,7 +215,7 @@ where Q: Hash + Equivalent + ?Sized, { self.get_inner(hash, key) - .inspect(|record| self.acquire_immutable(record)) + .inspect(|record| self.eviction.acquire_immutable(record)) } #[fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")] @@ -223,7 +223,36 @@ where where Q: Hash + Equivalent + ?Sized, { - self.get_inner(hash, key).inspect(|record| self.acquire_mutable(record)) + self.get_inner(hash, key) + .inspect(|record| self.eviction.acquire_mutable(record)) + } + + #[fastrace::trace(name = "foyer::memory::raw::shard::get_immutable")] + fn get_immutable_ctx(&self, hash: u64, key: &Q) -> Option>> + where + Q: Hash + Equivalent + ?Sized, + { + let res = self.get_inner(hash, key); + let ctx = match res.as_ref() { + Some(record) => OpCtx::Hit { record }, + None => OpCtx::Miss { hash }, + }; + self.eviction.acquire_immutable_ctx(ctx); + res + } + + #[fastrace::trace(name = "foyer::memory::raw::shard::get_mutable")] + fn get_mutable_ctx(&mut self, hash: u64, key: &Q) -> Option>> + where + Q: Hash + Equivalent + ?Sized, + { + let res = self.get_inner(hash, key); + let ctx = match res.as_ref() { + Some(record) => OpCtx::Hit { record }, + None => OpCtx::Miss { hash }, + }; + self.eviction.acquire_mutable_ctx(ctx); + res } #[fastrace::trace(name = "foyer::memory::raw::shard::get_inner")] @@ -269,44 +298,36 @@ where self.metrics.memory_remove.increase(count); } - #[fastrace::trace(name = "foyer::memory::raw::shard::acquire_immutable")] - fn acquire_immutable(&self, record: &Arc>) { - match E::acquire() { - Op::Immutable(f) => f(&self.eviction, record), - _ => unreachable!(), + #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_noop")] + fn fetch_noop(&self, hash: u64, key: &E::Key) -> RawShardFetch + where + E::Key: Clone, + { + if let Some(record) = self.get_noop(hash, key) { + return RawShardFetch::Hit(record); } - } - #[fastrace::trace(name = "foyer::memory::raw::shard::acquire_mutable")] - fn acquire_mutable(&mut self, record: &Arc>) { - match E::acquire() { - Op::Mutable(mut f) => f(&mut self.eviction, record), - _ => unreachable!(), - } + self.fetch_queue(key.clone()) } - #[fastrace::trace(name = "foyer::memory::raw::shard::release_immutable")] - fn release_immutable(&self, record: &Arc>) { - match E::release() { - Op::Immutable(f) => f(&self.eviction, record), - _ => unreachable!(), + #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_immutable")] + fn fetch_immutable(&self, hash: u64, key: &E::Key) -> RawShardFetch + where + E::Key: Clone, + { + if let Some(record) = self.get_immutable(hash, key) { + return RawShardFetch::Hit(record); } - } - #[fastrace::trace(name = "foyer::memory::raw::shard::release_mutable")] - fn release_mutable(&mut self, record: &Arc>) { - match E::release() { - Op::Mutable(mut f) => f(&mut self.eviction, record), - _ => unreachable!(), - } + self.fetch_queue(key.clone()) } - #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_noop")] - fn fetch_noop(&self, hash: u64, key: &E::Key) -> RawShardFetch + #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_mutable")] + fn fetch_mutable(&mut self, hash: u64, key: &E::Key) -> RawShardFetch where E::Key: Clone, { - if let Some(record) = self.get_noop(hash, key) { + if let Some(record) = self.get_mutable(hash, key) { return RawShardFetch::Hit(record); } @@ -314,11 +335,11 @@ where } #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_immutable")] - fn fetch_immutable(&self, hash: u64, key: &E::Key) -> RawShardFetch + fn fetch_immutable_ctx(&self, hash: u64, key: &E::Key) -> RawShardFetch where E::Key: Clone, { - if let Some(record) = self.get_immutable(hash, key) { + if let Some(record) = self.get_immutable_ctx(hash, key) { return RawShardFetch::Hit(record); } @@ -326,11 +347,11 @@ where } #[fastrace::trace(name = "foyer::memory::raw::shard::fetch_mutable")] - fn fetch_mutable(&mut self, hash: u64, key: &E::Key) -> RawShardFetch + fn fetch_mutable_ctx(&mut self, hash: u64, key: &E::Key) -> RawShardFetch where E::Key: Clone, { - if let Some(record) = self.get_mutable(hash, key) { + if let Some(record) = self.get_mutable_ctx(hash, key) { return RawShardFetch::Hit(record); } @@ -441,9 +462,11 @@ where pub fn new(config: RawCacheConfig) -> Self { let shard_capacity = config.capacity / config.shards; + let context = E::Context::init(&config.eviction_config); + let shards = (0..config.shards) .map(|_| RawCacheShard { - eviction: E::new(shard_capacity, &config.eviction_config), + eviction: E::new(shard_capacity, &config.eviction_config, &context), indexer: Sentry::default(), usage: 0, capacity: shard_capacity, @@ -692,6 +715,12 @@ where Op::Mutable(_) => self.inner.shards[self.shard(hash)] .write() .with(|mut shard| shard.get_mutable(hash, key)), + Op::ImmutableCtx(_) => self.inner.shards[self.shard(hash)] + .read() + .with(|shard| shard.get_immutable_ctx(hash, key)), + Op::MutableCtx(_) => self.inner.shards[self.shard(hash)] + .write() + .with(|mut shard| shard.get_mutable_ctx(hash, key)), }?; Some(RawCacheEntry { @@ -727,6 +756,12 @@ where Op::Mutable(_) => self.inner.shards[self.shard(hash)] .write() .with(|mut shard| shard.get_mutable(hash, key)), + Op::ImmutableCtx(_) => self.inner.shards[self.shard(hash)] + .read() + .with(|shard| shard.get_immutable_ctx(hash, key)), + Op::MutableCtx(_) => self.inner.shards[self.shard(hash)] + .write() + .with(|mut shard| shard.get_mutable_ctx(hash, key)), } .is_some() } @@ -799,8 +834,13 @@ where if self.record.dec_refs(1) == 0 { match E::release() { Op::Noop => {} - Op::Immutable(_) => shard.read().with(|shard| shard.release_immutable(&self.record)), - Op::Mutable(_) => shard.write().with(|mut shard| shard.release_mutable(&self.record)), + Op::Immutable(_) => shard + .read() + .with(|shard| shard.eviction.release_immutable(&self.record)), + Op::Mutable(_) => shard + .write() + .with(|mut shard| shard.eviction.release_mutable(&self.record)), + Op::ImmutableCtx(_) | Op::MutableCtx(_) => unreachable!(), } if self.record.is_ephemeral() { @@ -977,7 +1017,7 @@ where { type Output = Diversion, ER>, FetchContext>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { match self.project() { RawFetchInnerProj::Hit(opt) => Poll::Ready(Ok(opt.take().unwrap()).into()), RawFetchInnerProj::Wait(waiter) => waiter.poll(cx).map_err(|err| err.into()).map(Diversion::from), @@ -1040,6 +1080,12 @@ where Op::Noop => self.inner.shards[self.shard(hash)].read().fetch_noop(hash, &key), Op::Immutable(_) => self.inner.shards[self.shard(hash)].read().fetch_immutable(hash, &key), Op::Mutable(_) => self.inner.shards[self.shard(hash)].write().fetch_mutable(hash, &key), + Op::ImmutableCtx(_) => self.inner.shards[self.shard(hash)] + .read() + .fetch_immutable_ctx(hash, &key), + Op::MutableCtx(_) => self.inner.shards[self.shard(hash)] + .write() + .fetch_mutable_ctx(hash, &key), }; match raw {