diff --git a/src/versioned_db.rs b/src/versioned_db.rs index 11e3ee6..263151c 100644 --- a/src/versioned_db.rs +++ b/src/versioned_db.rs @@ -273,12 +273,33 @@ impl VersionedSchemaBatch where S::Key: Ord, { - /// Puts a key-value pair into the batch. - pub fn put_versioned(&mut self, key: S::Key, value: S::Value) { + /// Puts a non-empty key-value pair into the batch. + /// + /// # Panics + /// + /// Panics if `value` serializes to zero bytes. Empty-byte values collide + /// with the tombstone encoding used by the archival column family and + /// would be silently dropped on historical reads. If you intend to remove + /// the key, call [`Self::delete_versioned`] instead. + pub fn put_versioned(&mut self, key: S::Key, value: S::Value) + where + S::Value: AsRef<[u8]>, + { + assert!( + !value.as_ref().is_empty(), + "Versioned values may not be zero-length: empty-byte values collide \ + with the tombstone encoding used by the archival column family and \ + would be silently dropped by `get_historical_value_raw`. If you \ + intend to delete the key, call `delete_versioned` instead.", + ); self.versioned_table_writes.insert(key, Some(value)); } /// Deletes a key from the batch. + /// + /// `put_versioned` rejects zero-length values because they collide with + /// the archival tombstone encoding, so deletion must go through this + /// method. pub fn delete_versioned(&mut self, key: S::Key) { self.versioned_table_writes.insert(key, None); } @@ -622,7 +643,13 @@ where // println!("Writing live key with version: {}. {:?}", version, key.as_ref()); match value { Some(value) => { - assert!(!key_with_version.live_key().is_empty(), "Live values may not have zero-length. This prevents confusion with placholders for deleted values."); + assert!( + !value.as_ref().is_empty(), + "Versioned values may not have zero-length: empty-byte values collide \ + with the tombstone placeholder used for deleted values. This indicates \ + a caller constructed a VersionedSchemaBatch bypassing `put_versioned` \ + (e.g. via the `From` impl or a future direct-insert helper).", + ); // println!("Inserting live key: {:?}", key_with_version.live_key()); live_put_bytes += key_with_version.live_key().len() + value.as_ref().len(); live_db_batch.put_cf(live_cf_handle, key_with_version.live_key(), value); diff --git a/tests/versioned_db_inner/empty_value.rs b/tests/versioned_db_inner/empty_value.rs new file mode 100644 index 0000000..d25a51c --- /dev/null +++ b/tests/versioned_db_inner/empty_value.rs @@ -0,0 +1,54 @@ +use rockbound::versioned_db::VersionedSchemaBatch; + +use crate::versioned_db_inner::{setup_bytevec_db, ByteVecSchema, TestByteVec, TestKey}; + +#[test] +#[should_panic(expected = "Versioned values may not be zero-length")] +fn test_put_versioned_empty_value_panics() { + let mut batch = VersionedSchemaBatch::::default(); + batch.put_versioned(TestKey::from(b"k".to_vec()), TestByteVec(Vec::new())); +} + +#[test] +fn test_put_versioned_non_empty_round_trip() { + let (_tmpdir, vdb) = setup_bytevec_db(); + let key = TestKey::from(b"k".to_vec()); + let value = TestByteVec(vec![0x42]); + + let mut batch = VersionedSchemaBatch::::default(); + batch.put_versioned(key.clone(), value.clone()); + vdb.commit(&batch, 0).unwrap(); + + assert_eq!(vdb.get_live_value(&key).unwrap(), Some(value.clone())); + assert_eq!(vdb.get_historical_value(&key, 0).unwrap(), Some(value)); +} + +#[test] +fn test_delete_versioned_still_hides_key() { + let (_tmpdir, vdb) = setup_bytevec_db(); + let key = TestKey::from(b"k".to_vec()); + let value_v0 = TestByteVec(vec![0x01]); + + let mut b0 = VersionedSchemaBatch::::default(); + b0.put_versioned(key.clone(), value_v0.clone()); + vdb.commit(&b0, 0).unwrap(); + + let mut b1 = VersionedSchemaBatch::::default(); + b1.delete_versioned(key.clone()); + vdb.commit(&b1, 1).unwrap(); + + assert_eq!(vdb.get_historical_value(&key, 0).unwrap(), Some(value_v0)); + assert_eq!(vdb.get_historical_value(&key, 1).unwrap(), None); + assert_eq!(vdb.get_live_value(&key).unwrap(), None); +} + +#[test] +#[should_panic(expected = "Versioned values may not have zero-length")] +fn test_commit_loop_empty_value_panics_if_bypassed() { + let (_tmpdir, vdb) = setup_bytevec_db(); + let batch = VersionedSchemaBatch::::from([( + TestKey::from(b"k".to_vec()), + Some(TestByteVec(Vec::new())), + )]); + let _ = vdb.commit(&batch, 0); +} diff --git a/tests/versioned_db_inner/mod.rs b/tests/versioned_db_inner/mod.rs index c74b976..0d0252e 100644 --- a/tests/versioned_db_inner/mod.rs +++ b/tests/versioned_db_inner/mod.rs @@ -1,5 +1,6 @@ mod cache; mod delta_reader; +mod empty_value; mod iterator; use std::sync::Arc; @@ -130,6 +131,10 @@ fn get_column_families() -> Vec { LiveKeys::HISTORICAL_COLUMN_FAMILY_NAME, LiveKeys::PRUNING_COLUMN_FAMILY_NAME, LiveKeys::VERSION_METADATA_COLUMN_FAMILY_NAME, + ByteVecSchema::COLUMN_FAMILY_NAME, + ByteVecSchema::HISTORICAL_COLUMN_FAMILY_NAME, + ByteVecSchema::PRUNING_COLUMN_FAMILY_NAME, + ByteVecSchema::VERSION_METADATA_COLUMN_FAMILY_NAME, ] } @@ -268,3 +273,91 @@ impl CacheForVersionedDB for VersionedDbCache { Some(RwLockReadGuard::map(lock, |c| c)) } } + +/// Variable-length value type for exercising edge cases in the versioned-db +/// encoding. Unlike `TestField` (fixed 4 bytes), this can round-trip payloads +/// of arbitrary length including zero, which is needed to verify that +/// `put_versioned` rejects empty values. +#[derive(Debug, Default, Clone, PartialOrd, Ord, PartialEq, Eq)] +pub struct TestByteVec(pub Vec); + +impl AsRef<[u8]> for TestByteVec { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl ValueCodec for TestByteVec { + fn encode_value(&self) -> Result, CodecError> { + Ok(self.0.clone()) + } + + fn decode_value(data: &[u8]) -> Result { + Ok(Self(data.to_vec())) + } +} + +#[derive(Debug, Default, Clone, PartialOrd, Ord, PartialEq, Eq)] +pub struct ByteVecSchema; + +impl Schema for ByteVecSchema { + type Key = TestKey; + type Value = TestByteVec; + const COLUMN_FAMILY_NAME: ColumnFamilyName = "ByteVecLiveCF"; +} + +impl SchemaWithVersion for ByteVecSchema { + const HISTORICAL_COLUMN_FAMILY_NAME: ColumnFamilyName = "ByteVecHistoricalCF"; + const PRUNING_COLUMN_FAMILY_NAME: ColumnFamilyName = "ByteVecPruningCF"; + const VERSION_METADATA_COLUMN_FAMILY_NAME: ColumnFamilyName = "ByteVecVersionMetadataCF"; +} + +impl KeyEncoder for TestKey { + fn encode_key(&self) -> Result, CodecError> { + Ok(self.0.as_ref().to_vec()) + } +} + +impl KeyDecoder for TestKey { + fn decode_key(data: &[u8]) -> Result { + Ok(TestKey::from(data.to_vec())) + } +} + +impl CacheForVersionedDB for VersionedDbCache { + fn write(&self) -> parking_lot::MappedRwLockWriteGuard<'_, CacheForSchema> { + RwLockWriteGuard::map(self.cache.write(), |c| c) + } + + fn read(&self) -> parking_lot::MappedRwLockReadGuard<'_, CacheForSchema> { + RwLockReadGuard::map(self.cache.read(), |c| c) + } + + fn try_read( + &self, + ) -> Option>> { + let lock = self.cache.try_read()?; + Some(RwLockReadGuard::map(lock, |c| c)) + } +} + +/// Creates an on-disk `VersionedDB` backed by `ByteVecSchema`. The returned +/// `TempDir` must be kept alive for the lifetime of the returned `VersionedDB`; +/// callers typically bind it to a `_tmpdir` local. +pub fn setup_bytevec_db() -> ( + TempDir, + Arc>>, +) { + let tmpdir = tempfile::tempdir().unwrap(); + let db = Arc::new(open_db(&tmpdir)); + let cache = VersionedDbCache::::new(1_000); + let vdb = Arc::new( + VersionedDB::>::from_dbs( + db.clone(), + db.clone(), + cache, + ) + .unwrap(), + ); + (tmpdir, vdb) +}