Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 248 additions & 0 deletions foyer/src/hybrid/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,84 @@ where
try_cancel!(self, span, record_hybrid_remove_threshold);
}

/// Wait for all pending storage operations to complete.
///
/// This ensures that all in-flight writes, deletes, and flushes have been
/// processed. Useful when you need strict consistency guarantees.
///
/// # Example
///
/// ```ignore
/// // Ensure deletion is complete before reading
/// cache.remove(&key);
/// cache.sync().await;
/// assert!(cache.get(&key).await?.is_none());
/// ```
pub async fn sync(&self) {
self.inner.storage.wait().await
}

/// Remove a cached entry with the given key and wait for the operation to complete.
///
/// This is equivalent to calling `remove()` followed by `sync()`, ensuring
/// that the deletion is fully processed before returning.
///
/// # Example
///
/// ```ignore
/// // Atomic delete operation
/// cache.remove_sync(&key).await;
/// // At this point, the key is guaranteed to be removed
/// ```
pub async fn remove_sync<Q>(&self, key: &Q)
where
Q: Hash + Equivalent<K> + ?Sized + Send + Sync + 'static,
{
self.remove(key);
self.sync().await
}

/// Insert a cache entry and wait for any pending operations to complete.
///
/// This is equivalent to calling `insert()` followed by `sync()`, ensuring
/// that any previous value with the same key is fully removed before returning.
/// This prevents race conditions when overwriting existing entries.
///
/// # Example
///
/// ```ignore
/// // Atomic insert operation that safely overwrites
/// let entry = cache.insert_sync(key, new_value).await;
/// // At this point, the old value (if any) is guaranteed to be gone
/// ```
pub async fn insert_sync(&self, key: K, value: V) -> HybridCacheEntry<K, V, S> {
let entry = self.insert(key, value);
self.sync().await;
entry
}

/// Insert a cache entry with properties and wait for any pending operations to complete.
///
/// This is equivalent to calling `insert_with_properties()` followed by `sync()`.
///
/// # Example
///
/// ```ignore
/// // Atomic insert with custom properties
/// let props = HybridCacheProperties::default().with_location(Location::InMem);
/// let entry = cache.insert_with_properties_sync(key, value, props).await;
/// ```
pub async fn insert_with_properties_sync(
&self,
key: K,
value: V,
properties: HybridCacheProperties,
) -> HybridCacheEntry<K, V, S> {
let entry = self.insert_with_properties(key, value, properties);
self.sync().await;
entry
}

/// Check if the hybrid cache contains a cached entry with the given key.
///
/// `contains` may return a false-positive result if there is a hash collision with the given key.
Expand Down Expand Up @@ -1524,4 +1602,174 @@ mod tests {
let hybrid = open(&dir).await;
assert_eq!(*hybrid.get(&1).await.unwrap().unwrap(), vec![1; 3 * KB]);
}

#[test_log::test(tokio::test)]
async fn test_delete_consistency_with_sync() {
let dir = tempfile::tempdir().unwrap();
let hybrid = open(dir.path()).await;

// Test that sync() ensures deletion completes before subsequent operations
for i in 0..100 {
let key = i;
let old_value = vec![i as u8; 1 * KB];
let new_value = vec![(i + 1) as u8; 1 * KB];

// Insert initial value
hybrid.insert(key, old_value.clone());

// Delete with sync to ensure consistency
hybrid.remove(&key);
hybrid.sync().await;

// Reinsert with different value
hybrid.insert(key, new_value.clone());

// Read should always get the new value
let retrieved = hybrid.get(&key).await.unwrap().unwrap();
assert_eq!(
retrieved.value(),
&new_value,
"Got stale value after delete + sync at iteration {}",
i
);
}
}

#[test_log::test(tokio::test)]
async fn test_remove_sync_atomicity() {
let dir = tempfile::tempdir().unwrap();
let hybrid = open(dir.path()).await;

// Test the convenience method remove_sync
for i in 0..50 {
let key = i;
let value = vec![i as u8; 2 * KB];

// Insert and verify
hybrid.insert(key, value.clone());
assert!(hybrid.contains(&key));

// Use atomic remove_sync
hybrid.remove_sync(&key).await;

// Should be immediately gone
assert!(!hybrid.contains(&key));
let result = hybrid.get(&key).await.unwrap();
assert!(result.is_none(), "Key {} should be deleted", key);

// Reinsert should work without issues
let new_value = vec![(i + 100) as u8; 2 * KB];
hybrid.insert(key, new_value.clone());

let retrieved = hybrid.get(&key).await.unwrap().unwrap();
assert_eq!(retrieved.value(), &new_value);
}
}

#[test_log::test(tokio::test)]
async fn test_sync_after_multiple_operations() {
let dir = tempfile::tempdir().unwrap();
let hybrid = open(dir.path()).await;

// Perform multiple operations then sync once
let keys: Vec<u64> = (0..20).collect();

// Insert all
for &key in &keys {
hybrid.insert(key, vec![key as u8; 1 * KB]);
}

// Delete half
for &key in keys.iter().filter(|&&k| k % 2 == 0) {
hybrid.remove(&key);
}

// Sync once to ensure all operations complete
hybrid.sync().await;

// Verify state
for &key in &keys {
let result = hybrid.get(&key).await.unwrap();
if key % 2 == 0 {
assert!(result.is_none(), "Even key {} should be deleted", key);
} else {
assert!(result.is_some(), "Odd key {} should exist", key);
assert_eq!(result.unwrap().value(), &vec![key as u8; 1 * KB]);
}
}
}

#[test_log::test(tokio::test)]
async fn test_sync_with_storage_operations() {
let dir = tempfile::tempdir().unwrap();
let hybrid = open(dir.path()).await;

// Test sync with storage writer operations
let key = 42u64;
let value1 = vec![1u8; 10 * KB];
let value2 = vec![2u8; 10 * KB];

// Write directly to storage
hybrid.storage_writer(key).insert(value1.clone()).unwrap();

// Remove and sync
hybrid.remove(&key);
hybrid.sync().await;

// Should not find the key
assert!(hybrid.get(&key).await.unwrap().is_none());

// Write new value to storage
hybrid.storage_writer(key).insert(value2.clone()).unwrap();
hybrid.sync().await;

// Should find the new value
let retrieved = hybrid.get(&key).await.unwrap().unwrap();
assert_eq!(retrieved.value(), &value2);
}

#[test_log::test(tokio::test)]
async fn test_insert_sync_atomicity() {
let dir = tempfile::tempdir().unwrap();
let hybrid = open(dir.path()).await;

// Test that insert_sync ensures overwrites complete atomically
let key = 999u64;
for i in 0..100 {
let value = vec![i as u8; 2 * KB];

// Use insert_sync for atomic overwrites
hybrid.insert_sync(key, value.clone()).await;

// Read should always get the current value
let retrieved = hybrid.get(&key).await.unwrap().unwrap();
assert_eq!(
retrieved.value(),
&value,
"Got stale value after insert_sync at iteration {}",
i
);
}
}

#[test_log::test(tokio::test)]
async fn test_insert_with_properties_sync() {
let dir = tempfile::tempdir().unwrap();
let hybrid = open(dir.path()).await;

let key = 123u64;
let properties = HybridCacheProperties::default().with_location(Location::InMem);

// Test multiple overwrites with properties
for i in 0..50 {
let value = vec![i as u8; 1 * KB];

let entry = hybrid.insert_with_properties_sync(key, value.clone(), properties.clone()).await;
assert_eq!(entry.value(), &value);

// Verify no stale reads
let retrieved = hybrid.get(&key).await.unwrap().unwrap();
assert_eq!(retrieved.value(), &value);
}
}
}
Loading