Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions src/versioned_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,33 @@ impl<S: Schema> VersionedSchemaBatch<S>
where
S::Key: Ord,
{
/// Puts a key-value pair into the batch.
pub fn put_versioned(&mut self, key: S::Key, value: S::Value) {
/// Puts a non-empty key-value pair into the batch.
///
/// # Panics
///
/// Panics if `value` serializes to zero bytes. Empty-byte values collide
/// with the tombstone encoding used by the archival column family and
/// would be silently dropped on historical reads. If you intend to remove
/// the key, call [`Self::delete_versioned`] instead.
pub fn put_versioned(&mut self, key: S::Key, value: S::Value)
where
S::Value: AsRef<[u8]>,
{
assert!(
!value.as_ref().is_empty(),
"Versioned values may not be zero-length: empty-byte values collide \
with the tombstone encoding used by the archival column family and \
would be silently dropped by `get_historical_value_raw`. If you \
intend to delete the key, call `delete_versioned` instead.",
);
self.versioned_table_writes.insert(key, Some(value));
}

/// Deletes a key from the batch.
///
/// `put_versioned` rejects zero-length values because they collide with
/// the archival tombstone encoding, so deletion must go through this
/// method.
pub fn delete_versioned(&mut self, key: S::Key) {
self.versioned_table_writes.insert(key, None);
}
Expand Down Expand Up @@ -622,7 +643,13 @@ where
// println!("Writing live key with version: {}. {:?}", version, key.as_ref());
match value {
Some(value) => {
assert!(!key_with_version.live_key().is_empty(), "Live values may not have zero-length. This prevents confusion with placholders for deleted values.");
assert!(
!value.as_ref().is_empty(),
"Versioned values may not have zero-length: empty-byte values collide \
with the tombstone placeholder used for deleted values. This indicates \
a caller constructed a VersionedSchemaBatch bypassing `put_versioned` \
(e.g. via the `From<IntoIterator>` impl or a future direct-insert helper).",
);
// println!("Inserting live key: {:?}", key_with_version.live_key());
live_put_bytes += key_with_version.live_key().len() + value.as_ref().len();
live_db_batch.put_cf(live_cf_handle, key_with_version.live_key(), value);
Expand Down
54 changes: 54 additions & 0 deletions tests/versioned_db_inner/empty_value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use rockbound::versioned_db::VersionedSchemaBatch;

use crate::versioned_db_inner::{setup_bytevec_db, ByteVecSchema, TestByteVec, TestKey};

#[test]
#[should_panic(expected = "Versioned values may not be zero-length")]
fn test_put_versioned_empty_value_panics() {
let mut batch = VersionedSchemaBatch::<ByteVecSchema>::default();
batch.put_versioned(TestKey::from(b"k".to_vec()), TestByteVec(Vec::new()));
}

#[test]
fn test_put_versioned_non_empty_round_trip() {
let (_tmpdir, vdb) = setup_bytevec_db();
let key = TestKey::from(b"k".to_vec());
let value = TestByteVec(vec![0x42]);

let mut batch = VersionedSchemaBatch::<ByteVecSchema>::default();
batch.put_versioned(key.clone(), value.clone());
vdb.commit(&batch, 0).unwrap();

assert_eq!(vdb.get_live_value(&key).unwrap(), Some(value.clone()));
assert_eq!(vdb.get_historical_value(&key, 0).unwrap(), Some(value));
}

#[test]
fn test_delete_versioned_still_hides_key() {
let (_tmpdir, vdb) = setup_bytevec_db();
let key = TestKey::from(b"k".to_vec());
let value_v0 = TestByteVec(vec![0x01]);

let mut b0 = VersionedSchemaBatch::<ByteVecSchema>::default();
b0.put_versioned(key.clone(), value_v0.clone());
vdb.commit(&b0, 0).unwrap();

let mut b1 = VersionedSchemaBatch::<ByteVecSchema>::default();
b1.delete_versioned(key.clone());
vdb.commit(&b1, 1).unwrap();

assert_eq!(vdb.get_historical_value(&key, 0).unwrap(), Some(value_v0));
assert_eq!(vdb.get_historical_value(&key, 1).unwrap(), None);
assert_eq!(vdb.get_live_value(&key).unwrap(), None);
}

#[test]
#[should_panic(expected = "Versioned values may not have zero-length")]
fn test_commit_loop_empty_value_panics_if_bypassed() {
let (_tmpdir, vdb) = setup_bytevec_db();
let batch = VersionedSchemaBatch::<ByteVecSchema>::from([(
TestKey::from(b"k".to_vec()),
Some(TestByteVec(Vec::new())),
)]);
let _ = vdb.commit(&batch, 0);
}
93 changes: 93 additions & 0 deletions tests/versioned_db_inner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod cache;
mod delta_reader;
mod empty_value;
mod iterator;

use std::sync::Arc;
Expand Down Expand Up @@ -130,6 +131,10 @@ fn get_column_families() -> Vec<ColumnFamilyName> {
LiveKeys::HISTORICAL_COLUMN_FAMILY_NAME,
LiveKeys::PRUNING_COLUMN_FAMILY_NAME,
LiveKeys::VERSION_METADATA_COLUMN_FAMILY_NAME,
ByteVecSchema::COLUMN_FAMILY_NAME,
ByteVecSchema::HISTORICAL_COLUMN_FAMILY_NAME,
ByteVecSchema::PRUNING_COLUMN_FAMILY_NAME,
ByteVecSchema::VERSION_METADATA_COLUMN_FAMILY_NAME,
]
}

Expand Down Expand Up @@ -268,3 +273,91 @@ impl CacheForVersionedDB<LiveKeys> for VersionedDbCache<LiveKeys> {
Some(RwLockReadGuard::map(lock, |c| c))
}
}

/// Variable-length value type for exercising edge cases in the versioned-db
/// encoding. Unlike `TestField` (fixed 4 bytes), this can round-trip payloads
/// of arbitrary length including zero, which is needed to verify that
/// `put_versioned` rejects empty values.
#[derive(Debug, Default, Clone, PartialOrd, Ord, PartialEq, Eq)]
pub struct TestByteVec(pub Vec<u8>);

impl AsRef<[u8]> for TestByteVec {
fn as_ref(&self) -> &[u8] {
&self.0
}
}

impl<S: Schema> ValueCodec<S> for TestByteVec {
fn encode_value(&self) -> Result<Vec<u8>, CodecError> {
Ok(self.0.clone())
}

fn decode_value(data: &[u8]) -> Result<Self, CodecError> {
Ok(Self(data.to_vec()))
}
}

#[derive(Debug, Default, Clone, PartialOrd, Ord, PartialEq, Eq)]
pub struct ByteVecSchema;

impl Schema for ByteVecSchema {
type Key = TestKey;
type Value = TestByteVec;
const COLUMN_FAMILY_NAME: ColumnFamilyName = "ByteVecLiveCF";
}

impl SchemaWithVersion for ByteVecSchema {
const HISTORICAL_COLUMN_FAMILY_NAME: ColumnFamilyName = "ByteVecHistoricalCF";
const PRUNING_COLUMN_FAMILY_NAME: ColumnFamilyName = "ByteVecPruningCF";
const VERSION_METADATA_COLUMN_FAMILY_NAME: ColumnFamilyName = "ByteVecVersionMetadataCF";
}

impl KeyEncoder<ByteVecSchema> for TestKey {
fn encode_key(&self) -> Result<Vec<u8>, CodecError> {
Ok(self.0.as_ref().to_vec())
}
}

impl KeyDecoder<ByteVecSchema> for TestKey {
fn decode_key(data: &[u8]) -> Result<Self, CodecError> {
Ok(TestKey::from(data.to_vec()))
}
}

impl CacheForVersionedDB<ByteVecSchema> for VersionedDbCache<ByteVecSchema> {
fn write(&self) -> parking_lot::MappedRwLockWriteGuard<'_, CacheForSchema<ByteVecSchema>> {
RwLockWriteGuard::map(self.cache.write(), |c| c)
}

fn read(&self) -> parking_lot::MappedRwLockReadGuard<'_, CacheForSchema<ByteVecSchema>> {
RwLockReadGuard::map(self.cache.read(), |c| c)
}

fn try_read(
&self,
) -> Option<parking_lot::MappedRwLockReadGuard<'_, CacheForSchema<ByteVecSchema>>> {
let lock = self.cache.try_read()?;
Some(RwLockReadGuard::map(lock, |c| c))
}
}

/// Creates an on-disk `VersionedDB` backed by `ByteVecSchema`. The returned
/// `TempDir` must be kept alive for the lifetime of the returned `VersionedDB`;
/// callers typically bind it to a `_tmpdir` local.
pub fn setup_bytevec_db() -> (
TempDir,
Arc<VersionedDB<ByteVecSchema, VersionedDbCache<ByteVecSchema>>>,
) {
let tmpdir = tempfile::tempdir().unwrap();
let db = Arc::new(open_db(&tmpdir));
let cache = VersionedDbCache::<ByteVecSchema>::new(1_000);
let vdb = Arc::new(
VersionedDB::<ByteVecSchema, VersionedDbCache<ByteVecSchema>>::from_dbs(
db.clone(),
db.clone(),
cache,
)
.unwrap(),
);
(tmpdir, vdb)
}
Loading