diff --git a/foyer/src/hybrid/cache.rs b/foyer/src/hybrid/cache.rs index 4e69a6c1..e08e0e53 100644 --- a/foyer/src/hybrid/cache.rs +++ b/foyer/src/hybrid/cache.rs @@ -771,6 +771,84 @@ where try_cancel!(self, span, record_hybrid_remove_threshold); } + /// Wait for all pending storage operations to complete. + /// + /// This ensures that all in-flight writes, deletes, and flushes have been + /// processed. Useful when you need strict consistency guarantees. + /// + /// # Example + /// + /// ```ignore + /// // Ensure deletion is complete before reading + /// cache.remove(&key); + /// cache.sync().await; + /// assert!(cache.get(&key).await?.is_none()); + /// ``` + pub async fn sync(&self) { + self.inner.storage.wait().await + } + + /// Remove a cached entry with the given key and wait for the operation to complete. + /// + /// This is equivalent to calling `remove()` followed by `sync()`, ensuring + /// that the deletion is fully processed before returning. + /// + /// # Example + /// + /// ```ignore + /// // Atomic delete operation + /// cache.remove_sync(&key).await; + /// // At this point, the key is guaranteed to be removed + /// ``` + pub async fn remove_sync(&self, key: &Q) + where + Q: Hash + Equivalent + ?Sized + Send + Sync + 'static, + { + self.remove(key); + self.sync().await + } + + /// Insert a cache entry and wait for any pending operations to complete. + /// + /// This is equivalent to calling `insert()` followed by `sync()`, ensuring + /// that any previous value with the same key is fully removed before returning. + /// This prevents race conditions when overwriting existing entries. + /// + /// # Example + /// + /// ```ignore + /// // Atomic insert operation that safely overwrites + /// let entry = cache.insert_sync(key, new_value).await; + /// // At this point, the old value (if any) is guaranteed to be gone + /// ``` + pub async fn insert_sync(&self, key: K, value: V) -> HybridCacheEntry { + let entry = self.insert(key, value); + self.sync().await; + entry + } + + /// Insert a cache entry with properties and wait for any pending operations to complete. + /// + /// This is equivalent to calling `insert_with_properties()` followed by `sync()`. + /// + /// # Example + /// + /// ```ignore + /// // Atomic insert with custom properties + /// let props = HybridCacheProperties::default().with_location(Location::InMem); + /// let entry = cache.insert_with_properties_sync(key, value, props).await; + /// ``` + pub async fn insert_with_properties_sync( + &self, + key: K, + value: V, + properties: HybridCacheProperties, + ) -> HybridCacheEntry { + let entry = self.insert_with_properties(key, value, properties); + self.sync().await; + entry + } + /// Check if the hybrid cache contains a cached entry with the given key. /// /// `contains` may return a false-positive result if there is a hash collision with the given key. @@ -1524,4 +1602,174 @@ mod tests { let hybrid = open(&dir).await; assert_eq!(*hybrid.get(&1).await.unwrap().unwrap(), vec![1; 3 * KB]); } + + #[test_log::test(tokio::test)] + async fn test_delete_consistency_with_sync() { + let dir = tempfile::tempdir().unwrap(); + let hybrid = open(dir.path()).await; + + // Test that sync() ensures deletion completes before subsequent operations + for i in 0..100 { + let key = i; + let old_value = vec![i as u8; 1 * KB]; + let new_value = vec![(i + 1) as u8; 1 * KB]; + + // Insert initial value + hybrid.insert(key, old_value.clone()); + + // Delete with sync to ensure consistency + hybrid.remove(&key); + hybrid.sync().await; + + // Reinsert with different value + hybrid.insert(key, new_value.clone()); + + // Read should always get the new value + let retrieved = hybrid.get(&key).await.unwrap().unwrap(); + assert_eq!( + retrieved.value(), + &new_value, + "Got stale value after delete + sync at iteration {}", + i + ); + } + } + + #[test_log::test(tokio::test)] + async fn test_remove_sync_atomicity() { + let dir = tempfile::tempdir().unwrap(); + let hybrid = open(dir.path()).await; + + // Test the convenience method remove_sync + for i in 0..50 { + let key = i; + let value = vec![i as u8; 2 * KB]; + + // Insert and verify + hybrid.insert(key, value.clone()); + assert!(hybrid.contains(&key)); + + // Use atomic remove_sync + hybrid.remove_sync(&key).await; + + // Should be immediately gone + assert!(!hybrid.contains(&key)); + let result = hybrid.get(&key).await.unwrap(); + assert!(result.is_none(), "Key {} should be deleted", key); + + // Reinsert should work without issues + let new_value = vec![(i + 100) as u8; 2 * KB]; + hybrid.insert(key, new_value.clone()); + + let retrieved = hybrid.get(&key).await.unwrap().unwrap(); + assert_eq!(retrieved.value(), &new_value); + } + } + + #[test_log::test(tokio::test)] + async fn test_sync_after_multiple_operations() { + let dir = tempfile::tempdir().unwrap(); + let hybrid = open(dir.path()).await; + + // Perform multiple operations then sync once + let keys: Vec = (0..20).collect(); + + // Insert all + for &key in &keys { + hybrid.insert(key, vec![key as u8; 1 * KB]); + } + + // Delete half + for &key in keys.iter().filter(|&&k| k % 2 == 0) { + hybrid.remove(&key); + } + + // Sync once to ensure all operations complete + hybrid.sync().await; + + // Verify state + for &key in &keys { + let result = hybrid.get(&key).await.unwrap(); + if key % 2 == 0 { + assert!(result.is_none(), "Even key {} should be deleted", key); + } else { + assert!(result.is_some(), "Odd key {} should exist", key); + assert_eq!(result.unwrap().value(), &vec![key as u8; 1 * KB]); + } + } + } + + #[test_log::test(tokio::test)] + async fn test_sync_with_storage_operations() { + let dir = tempfile::tempdir().unwrap(); + let hybrid = open(dir.path()).await; + + // Test sync with storage writer operations + let key = 42u64; + let value1 = vec![1u8; 10 * KB]; + let value2 = vec![2u8; 10 * KB]; + + // Write directly to storage + hybrid.storage_writer(key).insert(value1.clone()).unwrap(); + + // Remove and sync + hybrid.remove(&key); + hybrid.sync().await; + + // Should not find the key + assert!(hybrid.get(&key).await.unwrap().is_none()); + + // Write new value to storage + hybrid.storage_writer(key).insert(value2.clone()).unwrap(); + hybrid.sync().await; + + // Should find the new value + let retrieved = hybrid.get(&key).await.unwrap().unwrap(); + assert_eq!(retrieved.value(), &value2); + } + + #[test_log::test(tokio::test)] + async fn test_insert_sync_atomicity() { + let dir = tempfile::tempdir().unwrap(); + let hybrid = open(dir.path()).await; + + // Test that insert_sync ensures overwrites complete atomically + let key = 999u64; + for i in 0..100 { + let value = vec![i as u8; 2 * KB]; + + // Use insert_sync for atomic overwrites + hybrid.insert_sync(key, value.clone()).await; + + // Read should always get the current value + let retrieved = hybrid.get(&key).await.unwrap().unwrap(); + assert_eq!( + retrieved.value(), + &value, + "Got stale value after insert_sync at iteration {}", + i + ); + } + } + + #[test_log::test(tokio::test)] + async fn test_insert_with_properties_sync() { + let dir = tempfile::tempdir().unwrap(); + let hybrid = open(dir.path()).await; + + let key = 123u64; + let properties = HybridCacheProperties::default().with_location(Location::InMem); + + // Test multiple overwrites with properties + for i in 0..50 { + let value = vec![i as u8; 1 * KB]; + + let entry = hybrid.insert_with_properties_sync(key, value.clone(), properties.clone()).await; + assert_eq!(entry.value(), &value); + + // Verify no stale reads + let retrieved = hybrid.get(&key).await.unwrap().unwrap(); + assert_eq!(retrieved.value(), &value); + } + } }