Skip to content
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:

check:
name: check
runs-on: buildjet-4vcpu-ubuntu-2204
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- uses: actions/checkout@v3
Expand All @@ -48,7 +48,7 @@ jobs:
# Check that every combination of features is working properly.
features:
name: features
runs-on: buildjet-8vcpu-ubuntu-2204
runs-on: ubuntu-latest
timeout-minutes: 120
steps:
- uses: actions/checkout@v4
Expand All @@ -63,7 +63,7 @@ jobs:
run: cargo hack check --workspace --feature-powerset --all-targets
test:
name: test
runs-on: buildjet-4vcpu-ubuntu-2204
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- uses: actions/checkout@v3
Expand Down
48 changes: 48 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ impl Default for RocksdbConfig {

/// Build the [`rocksdb::Options`] described by a [`RocksdbConfig`].
///
/// This is the no-customization entry point: it forwards to
/// [`gen_rocksdb_options_with`] with a callback that leaves the options untouched.
pub fn gen_rocksdb_options(config: &RocksdbConfig, readonly: bool) -> rocksdb::Options {
    // Callers of this function get exactly what the shared builder produces.
    let leave_untouched = |_: &mut rocksdb::Options| {};
    gen_rocksdb_options_with(config, readonly, leave_untouched)
}

/// Generate [`rocksdb::Options`] from [`RocksdbConfig`] and then allow callers to apply
/// additional upstream RocksDB settings without replacing the existing default behavior.
pub fn gen_rocksdb_options_with(
config: &RocksdbConfig,
readonly: bool,
customize: impl FnOnce(&mut rocksdb::Options),
) -> rocksdb::Options {
let mut db_opts = rocksdb::Options::default();
db_opts.set_max_open_files(config.max_open_files);
db_opts.set_max_total_wal_size(config.max_total_wal_size);
Expand All @@ -52,5 +62,43 @@ pub fn gen_rocksdb_options(config: &RocksdbConfig, readonly: bool) -> rocksdb::O
// Default: false
}

customize(&mut db_opts);

db_opts
}

#[cfg(test)]
mod tests {
    use crate::{DB, DEFAULT_COLUMN_FAMILY_NAME};

    use super::*;

    /// Exercises both option builders against real on-disk databases:
    /// the plain builder in read-only mode must refuse to open a database that
    /// does not exist, and the customizing builder must still yield options
    /// that can open a fresh writable database.
    #[test]
    fn gen_rocksdb_options_with_preserves_existing_defaults_and_allows_customization() {
        let scratch_dir = tempfile::tempdir().unwrap();
        let base_config = RocksdbConfig::default();
        let cf_names = vec![DEFAULT_COLUMN_FAMILY_NAME];

        // Read-only options pointed at a path that was never created.
        let absent_path = scratch_dir.path().join("missing-db");
        let readonly_opts = gen_rocksdb_options(&base_config, true);
        let readonly_attempt =
            DB::open(&absent_path, "missing-db", cf_names.clone(), &readonly_opts);
        assert!(readonly_attempt.is_err());

        // Writable options with one extra upstream setting applied via the callback.
        let fresh_path = scratch_dir.path().join("writable-db");
        let customized_opts =
            gen_rocksdb_options_with(&base_config, false, |opts: &mut rocksdb::Options| {
                opts.set_keep_log_file_num(7)
            });
        DB::open(&fresh_path, "writable-db", cf_names, &customized_opts).unwrap();
    }
}
90 changes: 86 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ mod config;
#[cfg(feature = "test-utils")]
pub mod test;

pub use config::{gen_rocksdb_options, RocksdbConfig};
pub use config::{gen_rocksdb_options, gen_rocksdb_options_with, RocksdbConfig};
pub use versioned_db::VersionedColumnFamilyKind;

use std::{path::Path, sync::Arc};

Expand Down Expand Up @@ -97,9 +98,71 @@ pub struct DB {

/// Returns the default column family descriptor. Includes LZ4 compression.
pub fn default_cf_descriptor(cf_name: impl Into<String>) -> rocksdb::ColumnFamilyDescriptor {
let mut cf_opts = rocksdb::Options::default();
cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
rocksdb::ColumnFamilyDescriptor::new(cf_name, cf_opts)
default_cf_descriptor_with(cf_name, |_, _| {})
}

/// Mutable builder for a column family descriptor, including optional block-based table options.
///
/// The builder accumulates column family options and, separately, block-based table
/// options. The table options are only attached to the column family options when
/// [`CfDescriptorBuilder::finish`] is called, so they stay editable until then.
#[derive(Default)]
pub struct CfDescriptorBuilder {
/// Column family options handed to `rocksdb::ColumnFamilyDescriptor::new` by `finish`.
cf_opts: rocksdb::Options,
/// Block-based table options; `None` until `block_based_table_options_mut` is first called.
table_opts: Option<rocksdb::BlockBasedOptions>,
}

impl CfDescriptorBuilder {
/// Create a new builder with default RocksDB options.
pub fn new() -> Self {
Self::default()
}

/// Access the mutable RocksDB column family options.
pub fn options_mut(&mut self) -> &mut rocksdb::Options {
&mut self.cf_opts
}

/// Access block-based table options, creating them if they do not exist yet.
pub fn block_based_table_options_mut(&mut self) -> &mut rocksdb::BlockBasedOptions {
self.table_opts
.get_or_insert_with(rocksdb::BlockBasedOptions::default)
}

/// Apply RocksDB's point-lookup tuning while keeping the block-based table options mutable.
Comment thread
preston-evans98 marked this conversation as resolved.
pub fn optimize_for_point_lookup(&mut self, block_cache_size_mb: u64) {
    const BYTES_PER_MB: usize = 1024 * 1024;

    // Convert the megabyte count to bytes up front; panics (before anything is
    // mutated) if the value cannot be represented as a `usize`.
    let cache_bytes = match usize::try_from(block_cache_size_mb) {
        Ok(megabytes) => megabytes.checked_mul(BYTES_PER_MB),
        Err(_) => None,
    }
    .expect("point-lookup block cache size must fit in usize");

    // Memtable-side settings live on the column family options.
    self.cf_opts.set_memtable_prefix_bloom_ratio(0.02);
    self.cf_opts.set_memtable_whole_key_filtering(true);

    // Table-side settings go on the (lazily created) block-based table options,
    // which remain editable until the builder is finished.
    let table = self.block_based_table_options_mut();
    table.set_data_block_index_type(rocksdb::DataBlockIndexType::BinaryAndHash);
    table.set_data_block_hash_ratio(0.75);
    table.set_bloom_filter(10.0, true);
    let lru_cache = rocksdb::Cache::new_lru_cache(cache_bytes);
    table.set_block_cache(&lru_cache);
}

/// Finalize the builder into a RocksDB column family descriptor.
pub fn finish(mut self, cf_name: impl Into<String>) -> rocksdb::ColumnFamilyDescriptor {
if let Some(table_opts) = self.table_opts.take() {
self.cf_opts.set_block_based_table_factory(&table_opts);
}

rocksdb::ColumnFamilyDescriptor::new(cf_name, self.cf_opts)
}
}

/// Returns the default column family descriptor and lets callers customize the RocksDB options
/// before the descriptor is finalized.
Comment thread
preston-evans98 marked this conversation as resolved.
Outdated
pub fn default_cf_descriptor_with(
    cf_name: impl Into<String>,
    customize: impl FnOnce(&str, &mut CfDescriptorBuilder),
) -> rocksdb::ColumnFamilyDescriptor {
    let owned_name: String = cf_name.into();
    let mut descriptor = CfDescriptorBuilder::new();

    // Baseline for every default column family: LZ4 compression. It is applied
    // before the callback runs, so the callback sees it and may change it.
    let base_opts = descriptor.options_mut();
    base_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);

    customize(owned_name.as_str(), &mut descriptor);
    descriptor.finish(owned_name)
}

impl DB {
Expand All @@ -121,6 +184,25 @@ impl DB {
Ok(db)
}

/// Opens a RocksDB-backed database whose column families all use the default descriptor,
/// passing each descriptor through the same `customize_cf` callback first.
///
/// The callback receives the column family name and the descriptor builder. Descriptors
/// are produced by a lazy iterator, so the callback is invoked as that iterator is consumed.
#[tracing::instrument(skip_all, level = "error")]
pub fn open_with_default_cfs(
    path: impl AsRef<Path>,
    name: &'static str,
    column_families: impl IntoIterator<Item = impl Into<String>>,
    db_opts: &rocksdb::Options,
    mut customize_cf: impl FnMut(&str, &mut CfDescriptorBuilder),
) -> anyhow::Result<Self> {
    let lazy_descriptors = column_families.into_iter().map(|raw_name| {
        let owned_name: String = raw_name.into();
        default_cf_descriptor_with(owned_name, |cf_name, builder| {
            customize_cf(cf_name, builder)
        })
    });

    let opened = DB::open_with_cfds(db_opts, path, name, lazy_descriptors)?;
    Ok(opened)
}

/// Open RocksDB with the provided column family descriptors.
/// This allows the caller to configure options for each column family.
#[tracing::instrument(skip_all, level = "error")]
Expand Down
109 changes: 90 additions & 19 deletions src/versioned_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ use quick_cache::sync::Cache;
use rocksdb::ColumnFamilyDescriptor;

use crate::{
default_cf_descriptor,
default_cf_descriptor_with,
iterator::{RawDbIter, ScanDirection},
metrics::{SCHEMADB_BATCH_COMMIT_BYTES, SCHEMADB_DELETES, SCHEMADB_PUT_BYTES},
schema::{ColumnFamilyName, KeyDecoder, KeyEncoder, ValueCodec},
BasicWeighter, CacheForSchema, CodecError, Schema, DB,
BasicWeighter, CacheForSchema, CfDescriptorBuilder, CodecError, Schema, DB,
};

#[derive(Debug, Default)]
Expand Down Expand Up @@ -284,12 +284,28 @@ where
}
}

fn live_versioned_column_family_descriptor(name: &str) -> ColumnFamilyDescriptor {
let mut cf_opts: rocksdb::Options = rocksdb::Options::default();
cf_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
// Use a 1GB block cache. TODO: Tune this value
cf_opts.optimize_for_point_lookup(1024);
rocksdb::ColumnFamilyDescriptor::new(name, cf_opts)
/// The role of a versioned column family within a live or archival RocksDB instance.
///
/// A value of this type is passed to descriptor-customization callbacks alongside the
/// column family name, so a callback can tune each role differently.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VersionedColumnFamilyKind {
/// The live key/value column family.
/// Its descriptor is tuned for point lookups before the callback runs.
Live,
/// The historical versioned values column family.
/// Only registered for the archival db.
Historical,
/// The pruning keys column family.
/// Only registered for the archival db.
Pruning,
/// The version metadata column family.
/// Registered for both the live and the archival db.
Metadata,
}

/// Builds the descriptor for the live versioned column family: the default descriptor,
/// tuned for point lookups, then passed to `customize` tagged as
/// [`VersionedColumnFamilyKind::Live`].
fn live_versioned_column_family_descriptor_with(
    name: &str,
    customize: impl FnOnce(&str, VersionedColumnFamilyKind, &mut CfDescriptorBuilder),
) -> ColumnFamilyDescriptor {
    // Use a 1GB block cache. TODO: Tune this value
    const LIVE_BLOCK_CACHE_MB: u64 = 1024;

    let tune_then_customize = move |cf_name: &str, builder: &mut CfDescriptorBuilder| {
        // Point-lookup tuning goes first so the caller's callback can override it.
        builder.optimize_for_point_lookup(LIVE_BLOCK_CACHE_MB);
        customize(cf_name, VersionedColumnFamilyKind::Live, builder);
    };

    default_cf_descriptor_with(name, tune_then_customize)
}

/// A marker trait showing that a type **HAS NOOP SERIALIZATION** implements `Clone` and `AsRef<[u8]>`, is cheaply cloneable.
Expand Down Expand Up @@ -372,20 +388,42 @@ where
/// Adds the column families for the live db.
pub fn add_live_db_column_families(
existing_column_families: &mut Vec<ColumnFamilyDescriptor>,
) -> anyhow::Result<()> {
Self::add_live_db_column_families_with(existing_column_families, |_, _, _| {})
}

/// Adds the column families for the live db, allowing callers to customize each descriptor.
pub fn add_live_db_column_families_with(
existing_column_families: &mut Vec<ColumnFamilyDescriptor>,
mut customize: impl FnMut(&str, VersionedColumnFamilyKind, &mut CfDescriptorBuilder),
) -> anyhow::Result<()> {
let live_column_family = V::COLUMN_FAMILY_NAME;
let metadata_column_family = V::VERSION_METADATA_COLUMN_FAMILY_NAME;

Self::validate_column_families(existing_column_families)?;

existing_column_families.push(live_versioned_column_family_descriptor(live_column_family));
existing_column_families.push(default_cf_descriptor(metadata_column_family));
existing_column_families.push(live_versioned_column_family_descriptor_with(
live_column_family,
|cf_name, kind, builder| customize(cf_name, kind, builder),
));
existing_column_families.push(default_cf_descriptor_with(
metadata_column_family,
|cf_name, builder| customize(cf_name, VersionedColumnFamilyKind::Metadata, builder),
));
Ok(())
}

/// Adds the column families for the archival db.
pub fn add_archival_db_column_families(
existing_column_families: &mut Vec<ColumnFamilyDescriptor>,
) -> anyhow::Result<()> {
Self::add_archival_db_column_families_with(existing_column_families, |_, _, _| {})
}

/// Adds the column families for the archival db, allowing callers to customize each descriptor.
pub fn add_archival_db_column_families_with(
existing_column_families: &mut Vec<ColumnFamilyDescriptor>,
mut customize: impl FnMut(&str, VersionedColumnFamilyKind, &mut CfDescriptorBuilder),
) -> anyhow::Result<()> {
let historical_versioned_column_family = V::HISTORICAL_COLUMN_FAMILY_NAME;
let pruning_column_family = V::PRUNING_COLUMN_FAMILY_NAME;
Expand All @@ -394,10 +432,22 @@ where

Self::validate_column_families(existing_column_families)?;

existing_column_families.push(default_cf_descriptor(historical_versioned_column_family));
existing_column_families.push(default_cf_descriptor(pruning_column_family));
existing_column_families.push(live_versioned_column_family_descriptor(live_column_family));
existing_column_families.push(default_cf_descriptor(metadata_column_family));
existing_column_families.push(default_cf_descriptor_with(
historical_versioned_column_family,
|cf_name, builder| customize(cf_name, VersionedColumnFamilyKind::Historical, builder),
));
existing_column_families.push(default_cf_descriptor_with(
pruning_column_family,
|cf_name, builder| customize(cf_name, VersionedColumnFamilyKind::Pruning, builder),
));
existing_column_families.push(live_versioned_column_family_descriptor_with(
live_column_family,
|cf_name, kind, builder| customize(cf_name, kind, builder),
));
existing_column_families.push(default_cf_descriptor_with(
metadata_column_family,
|cf_name, builder| customize(cf_name, VersionedColumnFamilyKind::Metadata, builder),
));
Ok(())
}

Expand Down Expand Up @@ -959,12 +1009,33 @@ where
///
/// Note that the returned iterator holds a read lock over the DB, so no writes can complete while it is active.
pub fn iter_with_prefix(&self, prefix: V::Key) -> anyhow::Result<VersionedDbIterator<'_, V>> {
    // No cursor: iteration is requested for the prefix without a skip-until key.
    let no_cursor: Option<V::Key> = None;
    self.iter_with_prefix_and_cursor(prefix, no_cursor)
}

/// Construct an iterator over the versioned DB with a given prefix, skipping until the given key is reached. Note that
/// any values exactly equal to the cursor *will* be included in the iterator.
///
/// Note that the returned iterator holds a read lock over the DB, so no writes can complete while it is active.
pub fn iter_with_prefix_and_cursor(
&self,
prefix: V::Key,
skip_until: Option<V::Key>,
) -> anyhow::Result<VersionedDbIterator<'_, V>> {
let read_lock = self.db.versioned_db_cache.read();
let version_on_disk = self.db.get_committed_version_live_db()?;
// println!("Version on disk: {:?}", version_on_disk);
let raw_prefix = prefix.clone()..;
let encoded_prefix = prefix.encode_key()?;
Comment thread
preston-evans98 marked this conversation as resolved.
let range = encoded_prefix.clone()..;

let (encoded_range, raw_range_start): (_, V::Key) = if let Some(skip_until) = skip_until {
if !skip_until.has_prefix(&prefix) {
return Err(anyhow::anyhow!("cursor must have appropriate prefix"));
Comment thread
preston-evans98 marked this conversation as resolved.
Outdated
}
let encoded_skip_until = skip_until.encode_key()?;
(encoded_skip_until.clone().., skip_until.clone())
} else {
let encoded_prefix = prefix.encode_key()?;
(encoded_prefix.clone().., prefix.clone())
};

let latest_version = self.latest_version();
if version_on_disk.is_some_and(|v| v > latest_version.unwrap_or(0)) {
return Err(anyhow::anyhow!("Version on disk is newer than the latest version in the reader. Cannot create an iterator which is guaranteed to be consistent with the version of this storage."));
Expand All @@ -990,7 +1061,7 @@ where
Some(
snapshot
.versioned_table_writes
.range(raw_prefix.clone())
.range(raw_range_start.clone()..)
.peekable(),
)
})
Expand All @@ -999,7 +1070,7 @@ where
let db_iterator = Some(
self.db
.live_db
.iter_range_allow_cached::<V>(&read_lock, range, ScanDirection::Forward)?
.iter_range_allow_cached::<V>(&read_lock, encoded_range, ScanDirection::Forward)?
.peekable(),
);
Ok(VersionedDbIterator {
Expand Down
Loading
Loading