7 changes: 5 additions & 2 deletions src/lib.rs
@@ -126,6 +126,8 @@ impl CfDescriptorBuilder {
}

/// Apply RocksDB's point-lookup tuning while keeping the block-based table options mutable.
///
/// These options are taken directly from the rocksdb source: https://github.com/facebook/rocksdb/blob/809ed26be58e390d5e4dd271bfd4de5ecd23576b/options/options.cc#L647
pub fn optimize_for_point_lookup(&mut self, block_cache_size_mb: u64) {
let block_cache_size = usize::try_from(block_cache_size_mb)
.ok()
@@ -134,7 +136,7 @@ impl CfDescriptorBuilder {
let block_opts = self.block_based_table_options_mut();
block_opts.set_data_block_index_type(rocksdb::DataBlockIndexType::BinaryAndHash);
block_opts.set_data_block_hash_ratio(0.75);
block_opts.set_bloom_filter(10.0, true);
block_opts.set_bloom_filter(10.0, false);
block_opts.set_block_cache(&rocksdb::Cache::new_lru_cache(block_cache_size));
self.cf_opts.set_memtable_prefix_bloom_ratio(0.02);
self.cf_opts.set_memtable_whole_key_filtering(true);
@@ -151,7 +153,8 @@
}

/// Returns the default column family descriptor and lets callers customize the RocksDB options
/// before the descriptor is finalized.
/// before the descriptor is finalized. LZ4 compression is enabled by default before the
/// customization function is called; any compression type it sets supersedes that default.
pub fn default_cf_descriptor_with(
cf_name: impl Into<String>,
customize: impl FnOnce(&str, &mut CfDescriptorBuilder),
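A minimal usage sketch tying the two changes above together. The column-family name and cache size are illustrative assumptions, and `default_cf_descriptor_with` is written as a free call because its enclosing scope falls outside the displayed hunks:

```rust
// Sketch only: build the default CF descriptor (LZ4 compression enabled),
// applying the point-lookup tuning from this PR during customization.
let descriptor = default_cf_descriptor_with("accounts", |_cf_name, builder| {
    // Hash data-block index, full (non-block-based) bloom filter, and a
    // dedicated 64 MiB LRU block cache, per optimize_for_point_lookup.
    builder.optimize_for_point_lookup(64);
});
```

Any compression type set inside the closure would supersede the LZ4 default, per the doc comment above.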
43 changes: 37 additions & 6 deletions src/versioned_db.rs
@@ -1003,18 +1003,49 @@ impl<V: SchemaWithVersion, C: CacheForVersionedDB<V>> VersionedDeltaReader<V, C>
where
V::Value: Clone + AsRef<[u8]>,
V: Ord,
V::Key: Eq + std::hash::Hash + KeyEncoder<V> + KeyDecoder<V> + HasPrefix + Ord + AsRef<[u8]>,
V::Key: Eq
+ std::hash::Hash
+ KeyEncoder<V>
+ KeyDecoder<V>
+ HasPrefix
+ Ord
+ AsRef<[u8]>
+ std::fmt::Debug,
{
/// Construct an iterator over the versioned DB with a given prefix.
///
/// Note that the returned iterator holds a read lock over the DB, so no writes can complete while it is active.
pub fn iter_with_prefix(&self, prefix: V::Key) -> anyhow::Result<VersionedDbIterator<'_, V>> {
self.iter_with_prefix_and_cursor(prefix, None)
}

/// Construct an iterator over the versioned DB with a given prefix, skipping entries until
/// the given cursor key is reached. Entries exactly equal to the cursor *will* be included in the iterator.
///
/// Note that the returned iterator holds a read lock over the DB, so no writes can complete while it is active.
pub fn iter_with_prefix_and_cursor(
&self,
prefix: V::Key,
skip_until: Option<V::Key>,
) -> anyhow::Result<VersionedDbIterator<'_, V>> {
let read_lock = self.db.versioned_db_cache.read();
let version_on_disk = self.db.get_committed_version_live_db()?;
let raw_prefix = prefix.clone()..;
let encoded_prefix = prefix.encode_key()?;
Comment thread
preston-evans98 marked this conversation as resolved.
let range = encoded_prefix.clone()..;

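// Reject cursors outside the requested prefix up front; otherwise the
// iterator could yield keys that escape the prefix range.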
let (encoded_range, raw_range_start): (_, V::Key) = if let Some(skip_until) = skip_until {
if !skip_until.has_prefix(&prefix) {
return Err(anyhow::anyhow!(
"cursor must have appropriate prefix {:?}, got {:?}",
prefix,
skip_until
));
}
let encoded_skip_until = skip_until.encode_key()?;
(encoded_skip_until.clone().., skip_until.clone())
} else {
(encoded_prefix.clone().., prefix.clone())
};

let latest_version = self.latest_version();
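// If the DB has already committed past this reader's view, no iterator can be consistent with it.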
if version_on_disk.is_some_and(|v| v > latest_version.unwrap_or(0)) {
return Err(anyhow::anyhow!("Version on disk is newer than the latest version in the reader. Cannot create an iterator which is guaranteed to be consistent with the version of this storage."));
@@ -1040,7 +1071,7 @@ where
Some(
snapshot
.versioned_table_writes
.range(raw_prefix.clone())
.range(raw_range_start.clone()..)
.peekable(),
)
})
@@ -1049,7 +1080,7 @@
let db_iterator = Some(
self.db
.live_db
.iter_range_allow_cached::<V>(&read_lock, range, ScanDirection::Forward)?
.iter_range_allow_cached::<V>(&read_lock, encoded_range, ScanDirection::Forward)?
.peekable(),
);
Ok(VersionedDbIterator {
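A consumer-side sketch of the new cursor API. The types and key values mirror the test file below; the entry-processing body is elided because the iterator's item type is not shown in these hunks:

```rust
// Sketch: resume a prefix scan from a saved position.
fn resume_scan(
    delta_reader: &VersionedDeltaReader<LiveKeys, VersionedDbCache<LiveKeys>>,
) -> anyhow::Result<()> {
    // The cursor must share the prefix; an entry whose key equals the
    // cursor is yielded again (the cursor is inclusive).
    let iter = delta_reader.iter_with_prefix_and_cursor(
        TestKey::from(b"key".to_vec()),
        Some(TestKey::from(b"key12".to_vec())),
    )?;
    for entry in iter {
        // ... process entries from b"key12" (inclusive) onward ...
        let _ = entry;
    }
    // The iterator held a read lock on the DB; dropping it unblocks writers.
    Ok(())
}
```

Because the cursor is inclusive, pagination built on this API should drop the first entry when resuming from the last key of a previous page.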
223 changes: 222 additions & 1 deletion tests/versioned_db_inner/iterator.rs
@@ -15,9 +15,18 @@ fn check_iterator(
delta_reader: &VersionedDeltaReader<LiveKeys, VersionedDbCache<LiveKeys>>,
prefix: &[u8],
expected_values: &[(&[u8], u32)],
) {
check_iterator_with_cursor(delta_reader, prefix, expected_values, None);
}

fn check_iterator_with_cursor(
delta_reader: &VersionedDeltaReader<LiveKeys, VersionedDbCache<LiveKeys>>,
prefix: &[u8],
expected_values: &[(&[u8], u32)],
cursor: Option<TestKey>,
) {
let mut iter = delta_reader
.iter_with_prefix(TestKey::from(prefix.to_vec()))
.iter_with_prefix_and_cursor(TestKey::from(prefix.to_vec()), cursor)
.unwrap();
for (key, value) in expected_values {
let next = iter.next();
@@ -165,6 +174,218 @@ fn test_iteration() {
.is_err());
}

#[test]
fn test_iteration_with_cursor() {
let test_db = TestDB::new();
let db = Arc::new(test_db.db);

let versioned_db_cache = VersionedDbCache::new(10_000);
let versioned_db = Arc::new(
VersionedDB::<LiveKeys, VersionedDbCache<LiveKeys>>::from_dbs(
db.clone(),
db.clone(),
versioned_db_cache,
)
.unwrap(),
);

// Check iteration against an empty DB.
let version = versioned_db.get_committed_version_live_db().unwrap();
assert_eq!(version, None);
let delta_reader = VersionedDeltaReader::<LiveKeys, VersionedDbCache<LiveKeys>>::new(
versioned_db.clone(),
None,
vec![],
);
let mut iter = delta_reader
.iter_with_prefix(TestKey::from(b"key".to_vec()))
.unwrap();
assert_eq!(iter.next(), None);
drop(iter);
let mut iter = delta_reader
.iter_with_prefix_and_cursor(
TestKey::from(b"key".to_vec()),
Some(TestKey::from(b"key11".to_vec())),
)
.unwrap();
assert_eq!(iter.next(), None);
drop(iter);

put_keys(
&versioned_db,
&[(b"key11", 0), (b"key19", 0), (b"key20", 0)],
0,
);

// Check iteration against prefixes "key" and "key1".
let delta_reader = VersionedDeltaReader::<LiveKeys, VersionedDbCache<LiveKeys>>::new(
versioned_db.clone(),
Some(0),
vec![],
);
check_iterator_with_cursor(
&delta_reader,
b"key",
&[(b"key11", 0), (b"key19", 0), (b"key20", 0)],
Some(TestKey::from(b"key11".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key",
&[(b"key11", 0), (b"key19", 0), (b"key20", 0)],
Some(TestKey::from(b"key10".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key",
&[(b"key19", 0), (b"key20", 0)],
Some(TestKey::from(b"key12".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key1",
&[(b"key11", 0), (b"key19", 0)],
Some(TestKey::from(b"key10".to_vec())),
);

let mut snapshot = VersionedSchemaBatch::<LiveKeys>::default();
snapshot.put_versioned(TestKey::from(b"key12".to_vec()), TestField::new(1));
snapshot.put_versioned(TestKey::from(b"key19".to_vec()), TestField::new(1));

let snapshot_1 = Arc::new(snapshot);
let mut snapshots = vec![snapshot_1.clone()];
let delta_reader = VersionedDeltaReader::<LiveKeys, VersionedDbCache<LiveKeys>>::new(
versioned_db.clone(),
Some(0),
snapshots.clone(),
);
check_iterator_with_cursor(
&delta_reader,
b"key",
&[(b"key11", 0), (b"key12", 1), (b"key19", 1), (b"key20", 0)],
Some(TestKey::from(b"key11".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key1",
&[(b"key11", 0), (b"key12", 1), (b"key19", 1)],
Some(TestKey::from(b"key10".to_vec())),
);

check_iterator_with_cursor(
&delta_reader,
b"key",
&[(b"key12", 1), (b"key19", 1), (b"key20", 0)],
Some(TestKey::from(b"key12".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key1",
&[(b"key12", 1), (b"key19", 1)],
Some(TestKey::from(b"key12".to_vec())),
);

let mut snapshot_2 = VersionedSchemaBatch::<LiveKeys>::default();
snapshot_2.put_versioned(TestKey::from(b"key12".to_vec()), TestField::new(2));
snapshot_2.delete_versioned(TestKey::from(b"key19".to_vec()));
let snapshot_2 = Arc::new(snapshot_2);
snapshots.push(snapshot_2.clone());

let delta_reader = VersionedDeltaReader::<LiveKeys, VersionedDbCache<LiveKeys>>::new(
versioned_db.clone(),
Some(0),
snapshots.clone(),
);
check_iterator_with_cursor(
&delta_reader,
b"key",
&[(b"key11", 0), (b"key12", 2), (b"key20", 0)],
Some(TestKey::from(b"key11".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key1",
&[(b"key11", 0), (b"key12", 2)],
Some(TestKey::from(b"key11".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key1",
&[(b"key12", 2)],
Some(TestKey::from(b"key12".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key",
&[(b"key11", 0), (b"key12", 2), (b"key20", 0)],
Some(TestKey::from(b"key11".to_vec())),
);

// Committing the oldest snapshot should not change the iterator.
commit_batch(&versioned_db, &snapshot_1, 1);
check_iterator_with_cursor(
&delta_reader,
b"key",
&[(b"key11", 0), (b"key12", 2), (b"key20", 0)],
Some(TestKey::from(b"key11".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key1",
&[(b"key11", 0), (b"key12", 2)],
Some(TestKey::from(b"key11".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key1",
&[(b"key12", 2)],
Some(TestKey::from(b"key12".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key",
&[(b"key11", 0), (b"key12", 2), (b"key20", 0)],
Some(TestKey::from(b"key11".to_vec())),
);

// Committing the second snapshot should not change the iterator.
commit_batch(&versioned_db, &snapshot_2, 2);
check_iterator_with_cursor(
&delta_reader,
b"key",
&[(b"key11", 0), (b"key12", 2), (b"key20", 0)],
Some(TestKey::from(b"key11".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key1",
&[(b"key11", 0), (b"key12", 2)],
Some(TestKey::from(b"key11".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key1",
&[(b"key12", 2)],
Some(TestKey::from(b"key12".to_vec())),
);
check_iterator_with_cursor(
&delta_reader,
b"key",
&[(b"key11", 0), (b"key12", 2), (b"key20", 0)],
Some(TestKey::from(b"key11".to_vec())),
);

// Committing another batch should make it impossible to create an iterator.
commit_batch(
&versioned_db,
&VersionedSchemaBatch::<LiveKeys>::default(),
3,
);
assert!(delta_reader
.iter_with_prefix(TestKey::from(b"key".to_vec()))
.is_err());
}

#[test]
fn test_open_iterator_blocks_writes() {
let test_db = TestDB::new();