diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index bc6a4c4c82c..7ac55b0d7aa 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -146,6 +146,58 @@ std::vector> transform_batch_items_or_t return result; } +folly::Future LocalVersionedEngine::delete_unreferenced_pruned_indexes( + const std::shared_ptr& entry +) { + try { + // Phase 2 of prune (sweep). write_and_prune_previous has already persisted the prune in a + // single journal entry: the most recent previous versions are retained as individual + // TOMBSTONEs above the TOMBSTONE_ALL high-water mark, and everything at or below that line is + // buried. It handed us the loaded+mutated entry (no extra storage round-trip needed). Here we: + // 1. Split the tombstoned indexes by the line: the buried block (<= line) is to_delete, the + // retained block (> line) is protected so its shared data (append-inheritance / dedup) + // survives. + // 2. Physically delete to_delete, except snapshotted keys. + // No further version-map write is needed — the line is already on disk, so the chain stays + // bounded. Under delayed_deletes the physical delete is skipped (the background tool reclaims + // via its own reference check). + if (!entry->tombstone_all_) + return folly::Unit(); + const StreamId stream_id = entry->head_->id(); + const VersionId line = entry->tombstone_all_->version_id(); + + std::vector to_delete; + std::vector retained; + for (auto& k : entry->get_tombstoned_indexes()) { + if (k.version_id() <= line) + to_delete.emplace_back(std::move(k)); + else + retained.emplace_back(std::move(k)); + } + if (to_delete.empty() || cfg().write_options().delayed_deletes()) + return folly::Unit(); + + // TODO: the following function will load all snapshots, which will be horrifyingly inefficient when called + // multiple times from batch_* + auto [not_in_snaps, in_snaps] = + get_index_keys_partitioned_by_inclusion_in_snapshots(store(), stream_id, std::move(to_delete)); + for (auto& k : retained) + in_snaps.insert(std::move(k)); + PreDeleteChecks checks{false, false, false, false, std::move(in_snaps)}; + // Return the chain rather than blocking with .get(): this runs inside a threadpool task, and + // waiting on threadpool work from within it can deadlock. + return delete_trees_responsibly(store(), version_map(), not_in_snaps, {}, {}, checks) + .thenValue([](auto&&) { return folly::Unit{}; }) + .thenError(folly::tag_t{}, [](auto const& ex) { + log::version().warn("Failed to clean up pruned previous versions due to: {}", ex.what()); + }); + } catch (const std::exception& ex) { + // Best-effort so deliberately swallow + log::version().warn("Failed to clean up pruned previous versions due to: {}", ex.what()); + } + return folly::Unit(); +} + folly::Future LocalVersionedEngine::delete_unreferenced_pruned_indexes( std::vector&& pruned_indexes, const AtomKey& key_to_keep ) { @@ -159,7 +211,7 @@ folly::Future LocalVersionedEngine::delete_unreferenced_pruned_inde in_snaps.insert(key_to_keep); PreDeleteChecks checks{false, false, false, false, std::move(in_snaps)}; return delete_trees_responsibly(store(), version_map(), not_in_snaps, {}, {}, checks) - .thenValueInline([](auto&&) { return folly::Unit{}; }) // drop the DeleteTreesStats + .thenValue([](auto&&) { return folly::Unit{}; }) .thenError(folly::tag_t{}, [](auto const& ex) { log::version().warn("Failed to clean up pruned previous versions due to: {}", ex.what()); }); @@ -341,15 +393,27 @@ std::optional LocalVersionedEngine::get_specific_version( std::optional LocalVersionedEngine::get_version_at_time( const StreamId& stream_id, timestamp as_of, const VersionQuery& version_query ) { - - auto index_key = load_index_key_from_time(store(), version_map(), stream_id, as_of); - if (!index_key && std::get(version_query.content_).iterate_snapshots_if_tombstoned) { - auto index_keys = get_index_keys_in_snapshots(store(), stream_id); - auto vector_index_keys = std::vector(index_keys.begin(), index_keys.end()); - std::sort(std::begin(vector_index_keys), std::end(vector_index_keys), [](auto& k1, auto& k2) { - return k1.creation_ts() > k2.creation_ts(); - }); - index_key = get_index_key_from_time(as_of, vector_index_keys); + LoadStrategy load_strategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, as_of}; + auto entry = version_map()->check_reload(store(), stream_id, load_strategy, __FUNCTION__); + + auto live_key = get_index_key_from_time(as_of, entry->get_indexes(false)); + auto index_key = live_key; + + if (std::get(version_query.content_).iterate_snapshots_if_tombstoned && + (!entry->tombstones_.empty() || entry->tombstone_all_.has_value())) { + // A deleted version may be more recent than the live one at this timestamp. + // get_indexes(true) reuses the already-loaded entry — no extra I/O. + auto best_key = get_index_key_from_time(as_of, entry->get_indexes(true)); + if (!live_key || (best_key && best_key->version_id() > live_key->version_id())) { + auto index_keys = get_index_keys_in_snapshots(store(), stream_id); + auto vector_index_keys = std::vector(index_keys.begin(), index_keys.end()); + std::sort(std::begin(vector_index_keys), std::end(vector_index_keys), [](auto& k1, auto& k2) { + return k1.creation_ts() > k2.creation_ts(); + }); + auto snap_key = get_index_key_from_time(as_of, vector_index_keys); + if (snap_key && (!live_key || snap_key->version_id() > live_key->version_id())) + index_key = snap_key; + } } if (!index_key) { @@ -1654,8 +1718,8 @@ void LocalVersionedEngine::write_version_and_prune_previous( bool prune_previous_versions, const AtomKey& new_version, const std::optional& previous_key ) { if (prune_previous_versions) { - auto pruned_indexes = version_map()->write_and_prune_previous(store(), new_version, previous_key); - delete_unreferenced_pruned_indexes(std::move(pruned_indexes), new_version).get(); + auto entry = version_map()->write_and_prune_previous(store(), new_version); + delete_unreferenced_pruned_indexes(entry).get(); } else { version_map()->write_version(store(), new_version, previous_key); } @@ -1669,16 +1733,11 @@ folly::Future LocalVersionedEngine::write_index_key_to_version_ma folly::Future write_version_fut; if (prune_previous_versions) { - write_version_fut = - async::submit_io_task( - WriteAndPrunePreviousTask{ - store(), version_map, index_key, std::move(stream_update_info.previous_index_key_) - } - ) - .via(&async::cpu_executor()) - .thenValue([this, index_key](auto&& atom_key_vec) { - return delete_unreferenced_pruned_indexes(std::move(atom_key_vec), index_key); - }); + write_version_fut = async::submit_io_task(WriteAndPrunePreviousTask{store(), version_map, index_key}) + .via(&async::cpu_executor()) + .thenValue([this](std::shared_ptr&& entry) { + return delete_unreferenced_pruned_indexes(entry); + }); } else { write_version_fut = async::submit_io_task( WriteVersionTask{store(), version_map, index_key, stream_update_info.previous_index_key_} diff --git a/cpp/arcticdb/version/local_versioned_engine.hpp b/cpp/arcticdb/version/local_versioned_engine.hpp index ed103ec13b4..ad0c70e2c4d 100644 --- a/cpp/arcticdb/version/local_versioned_engine.hpp +++ b/cpp/arcticdb/version/local_versioned_engine.hpp @@ -438,10 +438,20 @@ class LocalVersionedEngine : public VersionedEngine { ) override; /** - * Take tombstoned indexes that have been pruned in the version map and perform the actual deletion - * for indexes that are safe to delete (eg indexes contained in a snapshot are skipped). + * Phase 2 of prune (sweep), write path. write_and_prune_previous has already persisted the prune: + * the most recent previous versions are retained as individual TOMBSTONEs above the TOMBSTONE_ALL + * line, and everything at or below the line is buried. This physically deletes the buried block + * (unless delayed_deletes or snapshotted), protecting the retained keys' shared data. No further + * version-map write is needed — the line is already on disk. Best-effort; failures are logged. * - * @param pruned_indexes Must all share the same id() and should be tombstoned. + * @param entry The loaded+mutated version-map entry from the mark phase, reused here to avoid a + * second chain read. + */ + folly::Future delete_unreferenced_pruned_indexes(const std::shared_ptr& entry); + + /** + * Phase 2 of prune (sweep), admin path. Physically deletes the supplied pruned index keys (unless + * delayed_deletes or snapshotted), protecting key_to_keep's shared data. Best-effort. */ folly::Future delete_unreferenced_pruned_indexes( std::vector&& pruned_indexes, const AtomKey& key_to_keep diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index ac4bbc99cde..396d81a3606 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -906,6 +906,9 @@ TEST(VersionMap, FollowingVersionChainEndEarlyOnTombstoneAll) { } TEST(VersionMap, FollowingVersionChainWithWriteAndPrunePrevious) { + // Disable the protection window so retention is driven solely by the anchor: only the anchor v2 + // is retained, v0/v1 are buried by the folded TOMBSTONE_ALL. + ScopedConfig protection("VersionStore.PrunePreviousProtectionSecs", 0); auto store = std::make_shared(); auto version_map = std::make_shared(); StreamId id{"test"}; @@ -918,9 +921,11 @@ TEST(VersionMap, FollowingVersionChainWithWriteAndPrunePrevious) { version_map->do_write(store, key, entry); write_symbol_ref(store, key, std::nullopt, entry->head_.value()); - // write a new version and prune the previous versions + // Phase 1 (mark) folds the whole prune into one journal entry: head stays v3 (TABLE_INDEX), the + // anchor v2 (the latest undeleted previous) is retained as an individual TOMBSTONE, and a single + // TOMBSTONE_ALL is placed at v1 (anchor-1), burying v0 and v1. auto key2 = atom_key_with_version(id, 3, 3); - version_map->write_and_prune_previous(store, key2, key); + version_map->write_and_prune_previous(store, key2); auto ref_entry = VersionMapEntry{}; read_symbol_ref(store, id, ref_entry); @@ -942,9 +947,10 @@ TEST(VersionMap, FollowingVersionChainWithWriteAndPrunePrevious) { }) { follow_result->clear(); version_map->follow_version_chain(store, ref_entry, follow_result, load_strategy); - // When loading with any of the specified load strategies with include_deleted=false we should end following the - // version chain early at version 2 because that's when we encounter the TOMBSTONE_ALL. - ASSERT_EQ(follow_result->load_progress_.oldest_loaded_index_version_, VersionId{2}); + // With include_deleted=false the load stops at the TOMBSTONE_ALL high-water mark. The anchor + // v2 is individually tombstoned above the line at v1, so the load traverses v3, v2, then + // stops at v1 (the line). + ASSERT_EQ(follow_result->load_progress_.oldest_loaded_index_version_, VersionId{1}); } for (auto load_strategy : @@ -959,6 +965,46 @@ TEST(VersionMap, FollowingVersionChainWithWriteAndPrunePrevious) { } } +TEST(VersionMap, PrunePreviousTombstonesAllPreExisting) { + // Phase 1 of prune (mark): write_and_prune_previous individually tombstones every pre-existing + // version (so readers see only the latest) but does NOT physically delete or write a + // TOMBSTONE_ALL — that is the engine's sweep phase. This verifies the version-map guarantee: + // V0 is tombstoned (no longer visible) yet still present and reachable via INCLUDE_DELETED. + PilotedClock::reset(); + auto store = std::make_shared(); + auto version_map = std::make_shared(); + StreamId id{"test_concurrent_prune"}; + + auto v0_key = atom_key_with_version(id, 0, PilotedClock::nanos_since_epoch()); + auto entry = version_map->check_reload( + store, id, LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}, __FUNCTION__ + ); + version_map->do_write(store, v0_key, entry); + write_symbol_ref(store, v0_key, std::nullopt, entry->head_.value()); + + auto v1_key = atom_key_with_version(id, 1, PilotedClock::nanos_since_epoch()); + // Anchor is the latest undeleted previous (v0); it is the sole previous version, so it is retained + // as an individual TOMBSTONE and no TOMBSTONE_ALL is written. + version_map->write_and_prune_previous(store, v1_key); + + // V0 is no longer visible as a live version... + auto live = get_all_versions(store, version_map, id); + auto v0_alive = std::any_of(live.begin(), live.end(), [](const AtomKey& k) { return k.version_id() == 0; }); + EXPECT_FALSE(v0_alive) << "V0 should be tombstoned in the chain after write_and_prune_previous"; + + // ...but it is still present (individually tombstoned, not buried under a TOMBSTONE_ALL), so an + // INCLUDE_DELETED load still reaches it and the engine sweep can later reclaim it. + auto reloaded = version_map->check_reload( + store, id, LoadStrategy{LoadType::ALL, LoadObjective::INCLUDE_DELETED}, __FUNCTION__ + ); + EXPECT_TRUE(reloaded->is_tombstoned(VersionId{0})) << "V0 must be individually tombstoned by the mark phase"; + EXPECT_FALSE(reloaded->tombstone_all_.has_value()) << "the mark phase must not write a TOMBSTONE_ALL"; + auto tombstoned = reloaded->get_tombstoned_indexes(); + auto v0_present = + std::any_of(tombstoned.begin(), tombstoned.end(), [](const AtomKey& k) { return k.version_id() == 0; }); + EXPECT_TRUE(v0_present) << "V0 must still be present for the engine sweep to find and delete"; +} + TEST(VersionMap, HasCachedEntry) { ScopedConfig sc("VersionMap.ReloadInterval", std::numeric_limits::max()); // Set up the version chain v0 <- v1(tombstone_all) <- v2 <- v3(tombstoned) diff --git a/cpp/arcticdb/version/test/test_version_store.cpp b/cpp/arcticdb/version/test/test_version_store.cpp index 672a7005080..471e26ac1c7 100644 --- a/cpp/arcticdb/version/test/test_version_store.cpp +++ b/cpp/arcticdb/version/test/test_version_store.cpp @@ -90,12 +90,19 @@ TEST(PythonVersionStore, WriteWithPruneVersions) { using namespace arcticdb::stream; using namespace arcticdb::pipelines; + // Disable the protection window so pruning is not blocked by PilotedClock's tiny timestamps. + ScopedConfig no_protection("VersionStore.PrunePreviousProtectionSecs", 0); + auto [version_store, mock_store] = python_version_store_in_memory(); - write_version_frame({"test_versioned_engine_delete"}, 0, version_store, 30, true); - write_version_frame({"test_versioned_engine_delete"}, 1, version_store, 30, true, 0, std::nullopt, true); - // Should have pruned the previous version and have just one version - ASSERT_EQ(mock_store->num_atom_keys_of_type(KeyType::TABLE_INDEX), 1); + auto k0 = write_version_frame({"test_versioned_engine_delete"}, 0, version_store, 30, true); + auto k1 = write_version_frame({"test_versioned_engine_delete"}, 1, version_store, 30, true, 0, k0, true); + // After two writes: v0 is the just-superseded head, so it is retained as the anchor — nothing pruned yet. + ASSERT_EQ(mock_store->num_atom_keys_of_type(KeyType::TABLE_INDEX), 2); + + write_version_frame({"test_versioned_engine_delete"}, 2, version_store, 30, true, 0, k1, true); + // After the third write: v1 is now the anchor (kept), v0 has aged out and is pruned, v2 is latest — 2 survive. + ASSERT_EQ(mock_store->num_atom_keys_of_type(KeyType::TABLE_INDEX), 2); } TEST(PythonVersionStore, DeleteAllVersions) { diff --git a/cpp/arcticdb/version/test/version_map_model.hpp b/cpp/arcticdb/version/test/version_map_model.hpp index f570664cd34..b3d127e8e8b 100644 --- a/cpp/arcticdb/version/test/version_map_model.hpp +++ b/cpp/arcticdb/version/test/version_map_model.hpp @@ -55,7 +55,7 @@ struct MapStorePair { auto version_id = prev ? prev->version_id() + 1 : 0; if (tombstones_) - map_->write_and_prune_previous(store_, make_test_index_key(id, version_id, KeyType::TABLE_INDEX), prev); + map_->write_and_prune_previous(store_, make_test_index_key(id, version_id, KeyType::TABLE_INDEX)); else backwards_compat_write_and_prune_previous( store_, map_, make_test_index_key(id, version_id, KeyType::TABLE_INDEX) diff --git a/cpp/arcticdb/version/version_functions.hpp b/cpp/arcticdb/version/version_functions.hpp index 949b5e8ab5f..340b599502b 100644 --- a/cpp/arcticdb/version/version_functions.hpp +++ b/cpp/arcticdb/version/version_functions.hpp @@ -225,6 +225,21 @@ inline folly::Future finalize_tombstone_a version_store::TombstoneVersionResult res{true, entry->head_->id()}; res.keys_to_delete = std::move(tombstone_result.second); + // Also delete any versions a prior prune tombstoned but retained in storage (the anchor / + // within-window keys sitting individually-tombstoned above the TOMBSTONE_ALL line). They are + // already loaded in this UNDELETED_ONLY entry, so no extra storage round-trip is needed. Without + // this they would leak when the whole symbol is deleted. Filter on the *individual* tombstone: + // versions buried by a TOMBSTONE_ALL (a prior delete_all) are already physically deleted, so + // re-adding them would just issue wasted delete ops. + for (const auto& key : entry->keys_) { + if (is_index_key_type(key.type()) && entry->has_individual_tombstone(key.version_id()) && + std::none_of(res.keys_to_delete.begin(), res.keys_to_delete.end(), [&](const AtomKey& k) { + return k.version_id() == key.version_id(); + })) { + res.keys_to_delete.push_back(key); + } + } + res.no_undeleted_left = true; res.latest_version_ = tombstone_result.first; return res; diff --git a/cpp/arcticdb/version/version_map.hpp b/cpp/arcticdb/version/version_map.hpp index b200e8cc9b8..cda2b12d1f9 100644 --- a/cpp/arcticdb/version/version_map.hpp +++ b/cpp/arcticdb/version/version_map.hpp @@ -283,35 +283,116 @@ class VersionMapImpl { return entry->dump(); } - std::vector write_and_prune_previous( - std::shared_ptr store, const AtomKey& key, const std::optional& previous_key - ) { + // Phase 1 of prune (mark): write `key` as the new head in a single journal entry that also folds + // in the prune of every currently-live previous version. The most recent previous versions are + // *retained* — the anchor plus anything inside the protection window — written as individual + // TOMBSTONEs, leaving them present-but-invisible *above* the high-water mark so their data stays + // in storage and findable. The anchor is the latest *undeleted* previous version: the base a + // concurrent appender builds on (an appender always extends the latest live version, never a + // tombstoned one), so retaining it is what prevents the data-loss race. Everything below the + // retained floor is buried by a single TOMBSTONE_ALL at floor-1. Because the head of the entry + // remains the new TABLE_INDEX, the symbol ref's first key is an index key and the LATEST read + // fast-path is preserved (no extra VERSION GET). Only one TABLE_INDEX is written, so the do_write + // invariant holds. + // + // Phase 2 (delete_unreferenced_pruned_indexes in the engine) physically deletes the buried block; + // it needs no further version-map write because the line is already persisted here. + // + // Returns the loaded+mutated entry so the sweep can reuse it without re-reading the chain. + std::shared_ptr write_and_prune_previous(std::shared_ptr store, const AtomKey& key) { ARCTICDB_DEBUG(log::version(), "Version map pruning previous versions for stream {}", key.id()); auto entry = check_reload(store, key.id(), LoadStrategy{LoadType::ALL, LoadObjective::UNDELETED_ONLY}, __FUNCTION__); - auto [_, result] = tombstone_from_key_or_all_internal(store, key.id(), previous_key, entry, false); - std::vector keys_to_write; - std::optional tombstone_all_key; - keys_to_write.push_back(key); - if (!result.empty()) { - auto first_key_to_tombstone = previous_key ? previous_key : entry->get_first_index(false).first; - tombstone_all_key = get_tombstone_all_key(first_key_to_tombstone.value(), store->current_timestamp()); - entry->try_set_tombstone_all(tombstone_all_key.value()); - keys_to_write.push_back(tombstone_all_key.value()); + const auto now = store->current_timestamp(); + const auto protection_secs = ConfigsMap::instance()->get_int("VersionStore.PrunePreviousProtectionSecs", 600); + const timestamp cutoff = now - static_cast(protection_secs) * 1'000'000'000LL; + + // The anchor is the latest undeleted previous version (an appender's base is always the latest + // live version). keys_ is in descending version order, so the first undeleted previous index + // is it. + std::optional anchor; + for (const auto& k : entry->keys_) { + if (is_index_key_type(k.type()) && k.version_id() != key.version_id() && !entry->is_tombstoned(k)) { + anchor = k.version_id(); + break; + } + } + + // Partition every previous index key that is still present above the existing high-water mark + // (the UNDELETED_ONLY load stops at the old TOMBSTONE_ALL, so keys_ holds exactly these). We + // include already-individually-tombstoned versions: a prior prune kept them present-but- + // invisible inside the protection window, and once they age out this prune is what finally + // buries and reclaims them. + std::vector retained; + std::vector others; + for (const auto& k : entry->keys_) { + if (is_index_key_type(k.type()) && k.version_id() != key.version_id()) { + if ((anchor && k.version_id() == *anchor) || (protection_secs > 0 && k.creation_ts() >= cutoff)) + retained.push_back(k); + else + others.push_back(k); + } + } + + std::vector keys_to_write{key}; + std::vector tombstones; + for (const auto& k : retained) { + // Already-tombstoned retained versions stay invisible without a fresh tombstone. + if (!entry->is_tombstoned(k)) { + tombstones.push_back(index_to_tombstone(k.version_id(), key.id(), now)); + keys_to_write.push_back(tombstones.back()); + } } - auto previous_index = do_write(store, key.version_id(), key.id(), std::span{keys_to_write}, entry); - write_symbol_ref(store, *entry->keys_.cbegin(), previous_index, entry->head_.value()); + // Bury everything strictly below the retained floor with a single TOMBSTONE_ALL at floor-1. + // A non-retained version that sits *above* the floor (e.g. a delete_version'd former head) is + // left individually tombstoned rather than buried — a TOMBSTONE_ALL low enough to bury it + // would also bury the retained anchor. It is reclaimed by a later prune once the floor rises + // past it. When nothing is retained, bury the whole present block. + std::optional line; + if (retained.empty()) { + if (!others.empty()) + line = std::max_element(others.begin(), others.end(), [](const auto& a, const auto& b) { + return a.version_id() < b.version_id(); + })->version_id(); + } else { + const VersionId floor = + std::min_element(retained.begin(), retained.end(), [](const auto& a, const auto& b) { + return a.version_id() < b.version_id(); + })->version_id(); + if (std::any_of(others.begin(), others.end(), [floor](const auto& k) { return k.version_id() < floor; })) + line = floor - 1; + } + + std::optional tombstone_all_key; + if (line) { + tombstone_all_key = atom_key_builder() + .version_id(*line) + .creation_ts(now) + .content_hash(0) + .start_index(NumericIndex{0}) + .end_index(NumericIndex{0}) + .build(key.id(), KeyType::TOMBSTONE_ALL); + keys_to_write.push_back(*tombstone_all_key); + } + do_write(store, key.version_id(), key.id(), std::span{keys_to_write}, entry); + for (const auto& t : tombstones) + entry->tombstones_.try_emplace(t.version_id(), t); + if (tombstone_all_key) + entry->try_set_tombstone_all(*tombstone_all_key); + write_symbol_ref(store, *entry->keys_.cbegin(), entry->get_second_undeleted_index(), entry->head_.value()); maybe_invalidate_cached_undeleted(*entry); + if (log_changes_) { + for (const auto& t : tombstones) + log_tombstone(store, key.id(), t.version_id()); if (tombstone_all_key) - log_tombstone_all(store, tombstone_all_key.value().id(), tombstone_all_key.value().version_id()); + log_tombstone_all(store, key.id(), tombstone_all_key->version_id()); log_write(store, key.id(), key.version_id()); } - - return result; + return entry; } std::pair> delete_all_versions( @@ -1114,6 +1195,7 @@ class VersionMapImpl { private: FRIEND_TEST(VersionMap, CacheInvalidationWithTombstoneAllAfterLoad); + std::pair> tombstone_from_key_or_all_internal( std::shared_ptr store, const StreamId& stream_id, std::optional first_key_to_tombstone = std::nullopt, diff --git a/cpp/arcticdb/version/version_tasks.hpp b/cpp/arcticdb/version/version_tasks.hpp index 4df34011bbd..a98483ba573 100644 --- a/cpp/arcticdb/version/version_tasks.hpp +++ b/cpp/arcticdb/version/version_tasks.hpp @@ -233,20 +233,15 @@ struct WriteAndPrunePreviousTask : async::BaseTask { const std::shared_ptr store_; const std::shared_ptr version_map_; const AtomKey key_; - const std::optional maybe_prev_; - WriteAndPrunePreviousTask( - std::shared_ptr store, std::shared_ptr version_map, AtomKey key, - std::optional maybe_prev - ) : + WriteAndPrunePreviousTask(std::shared_ptr store, std::shared_ptr version_map, AtomKey key) : store_(std::move(store)), version_map_(std::move(version_map)), - key_(std::move(key)), - maybe_prev_(std::move(maybe_prev)) {} + key_(std::move(key)) {} - folly::Future> operator()() { + folly::Future> operator()() { ScopedLock lock(version_map_->get_lock_object(key_.id())); - return version_map_->write_and_prune_previous(store_, key_, maybe_prev_); + return version_map_->write_and_prune_previous(store_, key_); } }; diff --git a/docs/claude/cpp/VERSIONING.md b/docs/claude/cpp/VERSIONING.md index 28496aea57c..b8484dbb3a0 100644 --- a/docs/claude/cpp/VERSIONING.md +++ b/docs/claude/cpp/VERSIONING.md @@ -208,16 +208,110 @@ Tombstoned versions: ### Hard Delete (Prune) -`prune_previous_versions` physically removes old version data: +Pruning comes in **two flavours with different physical-delete policies**: -```python -# Prune on write -lib.write("sym", df, prune_previous_version=True) +| Trigger | Tombstoning | Physical delete | +|---|---|---| +| Per-write flag `prune_previous_version=True` (on `write`/`append`/`update`/etc.) | Immediate, unconditional | **Windowed + anchored** — concurrent-writer safe (see below) | +| Explicit admin `prune_previous_versions(symbol)` | Immediate, unconditional | **Aggressive** — deletes everything except the latest immediately; no protection window | + +Both tombstone pre-existing non-snapshotted versions immediately so they vanish from readers. They +differ only in *when* the data is physically removed. The windowed path is the interesting one and +is described below; the aggressive admin path is summarised under [Implementation](#implementation). + +#### Per-write prune: single-write fold + delete-only sweep -# Explicit prune -lib.prune_previous_versions("sym") +The per-write path is **two phases**, but the version-chain mutation all happens in **phase 1** as a +single journal entry — the mechanism is **individual `TOMBSTONE`s plus a `TOMBSTONE_ALL` +high-water mark**. The chain on disk ends up looking like: + +``` + index(latest, live) ◄─ head; ref[0] is an index key (LATEST fast-path) + └─ index(retained, individually TOMBSTONE'd) ◄─ above the line: present-but-invisible + └─ TOMBSTONE_ALL(version = floor − 1) ◄─ the line + └─ index(...buried...) ◄─ at/below the line: physically deleted ``` +**Phase 1 — mark + fold** (`version_map::write_and_prune_previous`): in **one journal entry** write +the new head `TABLE_INDEX`, an **individual `TOMBSTONE`** for each *retained* previous version, and +a single `TOMBSTONE_ALL`. Retained = the **anchor** (the latest *undeleted* previous version — the +base a concurrent appender builds on, kept regardless of age) plus any version inside the protection +window (`creation_ts >= now − PrunePreviousProtectionSecs`, default 600 s). The `TOMBSTONE_ALL` is +placed at `floor − 1` where `floor` is the oldest retained version, so it buries everything +*strictly below* the floor. A non-retained version that happens to sit *above* the floor (e.g. a +`delete_version`'d former head) is left individually tombstoned, **not** buried — a line low enough +to bury it would also bury the anchor; a later prune reclaims it once the floor rises past it. +Crucially the entry's head stays the new `TABLE_INDEX`, so the symbol ref's first key is an index +key and the LATEST read fast-path is preserved (no extra VERSION GET). The loaded+mutated entry is +returned so phase 2 can reuse it without re-reading the chain. + +**Phase 2 — sweep** (`local_versioned_engine::delete_unreferenced_pruned_indexes(entry)`): reuses +the phase-1 entry (no reload) and is **delete-only — no version-map write**, because the line is +already persisted. Split the tombstoned index keys by the line: the buried block (`version_id <= +line`) is `to_delete`; the retained block (`> line`) is protected. **Physically delete** `to_delete` +except snapshotted keys. Under `delayed_deletes` (background deletion) this phase early-returns. + +The high-water line is what keeps prune/delete **bounded**: the version map loads with +`UNDELETED_ONLY`, which stops walking the chain at the `TOMBSTONE_ALL` line +(`continue_when_loading_undeleted` in `version_utils.hpp`). So each prune reads only the retained +region above the line plus the live head — proportional to retained data, never to total history. +When retained versions later age out of the window, the **next prune's phase 1** re-partitions them +(it includes already-individually-tombstoned versions in its scan), drops them below the new floor, +and the matching phase 2 deletes them — so a whole burst aged-out together is reclaimed in one +bounded prune. + +`KeyType::TOMBSTONE` (individual) and `KeyType::TOMBSTONE_ALL` already coexist and are understood by +older clients, so the layout `index <- individualTombstones(retained) <- TOMBSTONE_ALL(line)` is +forward/backward compatible. + +#### Dependency safety — what protects shared data + +A retained (anchor / within-window) index key keeps its data segments, and a buried key being +deleted may share those segments — via append-inheritance, via dedup, or transitively. To prevent +dangling references the sweep assembles `PreDeleteChecks::could_share_data` from: + +| Source | Added by | Covers | +|---|---|---| +| All snapshotted index keys for the stream | `get_index_keys_partitioned_by_inclusion_in_snapshots` returns them as `in_snaps` | Anything a snapshot pins | +| The retained anchor + within-window keys | inserted into `in_snaps` (the `retained` block) in `delete_unreferenced_pruned_indexes` | Append/dedup chains that still have at least one retained survivor (the new head is itself retained, so anything it references is covered) | + +`delete_trees_responsibly` then computes +`data_to_delete = recurse_index_keys(to_delete) − recurse_index_keys(could_share_data)`. +Because `recurse_index_keys` walks each protected TABLE_INDEX and emits every data atom key it +lists, transitive chains are covered by their endpoint: a single retained anchor at the end of a +long append chain protects every data segment the chain inherited from older buried versions. + +Failure mode the window bounds: a concurrent writer that started an append on V_K, then took +longer than `PrunePreviousProtectionSecs` to commit, with no retained/snapshotted key in between +holding V_K's references — that writer's commit can reference deleted segments. Tune +`PrunePreviousProtectionSecs` higher than the slowest expected append/dedup-write latency. + +#### Background deletion (delayed deletes) + +When the library is configured with **background deletion**, phase 2 early-returns without physically +deleting (the background tool runs its own reference-check before removing data). The +`TOMBSTONE_ALL` line is still persisted — it was written in phase 1 regardless — so the version +chain stays bounded. The buried index keys remain findable by the background tool via an +`INCLUDE_DELETED` load, which ignores the line. + +#### Implementation + +- `version_map.hpp:write_and_prune_previous()` — per-write phase 1 (mark + fold): writes the new + head, the retained versions' individual `TOMBSTONE`s, and the `TOMBSTONE_ALL` line in one journal + entry. Computes the anchor internally as the latest *undeleted* previous index (not the caller's + `previous_key`, which can point at a tombstoned former head after a `delete_version`). Returns the + loaded+mutated entry so the sweep can reuse it without re-reading the chain. +- `local_versioned_engine.cpp:delete_unreferenced_pruned_indexes(entry)` — per-write phase 2: + delete-only. Reuses the entry, splits the tombstoned indexes by the persisted line, deletes the + buried block (unless `delayed_deletes`), protects retained + snapshotted. No version-map write, so + the whole per-write prune is one chain read + one write. +- `version_store_api.cpp:PythonVersionStore::prune_previous_versions()` — the **separate aggressive + admin path**. It does *not* use `write_and_prune_previous`; it calls + `version_map::tombstone_from_key_or_all` (tombstones everything below the latest) then the + vector-form `delete_unreferenced_pruned_indexes(pruned_indexes, key_to_keep=latest)`, which + physically deletes immediately (except the latest and snapshotted keys). No protection window — + use this only when you explicitly want unconditional reclaim and no concurrent writers are mid-append. + ### TOMBSTONE_ALL Marks all versions before a point as deleted: @@ -334,6 +428,9 @@ ArcticDB does not use locks for symbol writes. Concurrent writes use a **last-wr **Caveats:** - Concurrent appends may appear out of order or one may be dropped - Parallel writes to the same symbol are not recommended for modification operations +- `prune_previous_versions=True` with concurrent writers: protected by a 10-minute window + (see [Hard Delete (Prune)](#hard-delete-prune)); writers that started from the same base version + still have their data accessible while they complete ### Read Concurrency diff --git a/docs/claude/plans/fix-prune-previous/branch-work-log.md b/docs/claude/plans/fix-prune-previous/branch-work-log.md new file mode 100644 index 00000000000..0e65eb27b9d --- /dev/null +++ b/docs/claude/plans/fix-prune-previous/branch-work-log.md @@ -0,0 +1,411 @@ +# Branch Work Log: fix-prune-previous + +## Session 1 (2026-04-13) + +### Goal +Fix a race condition where concurrent writers using `prune_previous_version=True` could produce unreadable versions. Writer 1's prune would delete data that Writer 2's new version still referenced, because both writers started from the same base version. + +### What was done + +- **Created Pegasus venv** `fix-prune-previous` (distribution `311-1`, Python 3.11) and installed ArcticDB in editable mode with `ARCTIC_CMAKE_PRESET=skip ARCTICDB_PROTOC_VERS=4`. + +- **Wrote 4 regression tests** in `python/tests/unit/arcticdb/version_store/test_prune_previous.py`: + - `test_prune_previous_preserves_recent_versions` — prune must not delete versions < 10 min old + - `test_prune_previous_single_preexisting_version_not_pruned` — never prune sole pre-existing version + - `test_prune_previous_prunes_when_old_enough` — with 0-second window, exactly 2 versions survive (anchor + latest) + - `test_concurrent_appends_with_prune_all_readable` — all versions readable after concurrent appends with prune + +- **Confirmed tests failed** against unmodified code (3/4 deterministic tests failed; concurrent test is probabilistic). + +- **Fixed `cpp/arcticdb/version/version_map.hpp`**: + - Added private `get_prune_previous_boundary()` helper that: + 1. Reads `VersionStore.PrunePreviousProtectionSecs` config (default 600 = 10 min) + 2. Finds all undeleted index keys ≤ requested boundary that are older than the cutoff + 3. If fewer than 2 candidates exist, returns nullopt (nothing to prune) + 4. Returns the second-newest candidate as the tombstone boundary, keeping the newest as an anchor + - Modified `write_and_prune_previous()` to call `get_prune_previous_boundary()` and only tombstone if it returns a value; also fixed the `tombstone_all_key` construction to use `effective_tombstone_key` (not `previous_key`) so the logical deletion range is correct. + +- **Ran `make lint`** — clang-format and black reformatted code; all tests still pass 4/4. + +## Session 2 (2026-04-14) + +### Goal +Address code review feedback from Session 1; fix vacuously-passing concurrent Python test; harden the C++ implementation. + +### What was done + +- **Replaced Python concurrent test** (`test_concurrent_appends_with_prune_all_readable`) with a deterministic C++ test `PrunePreviousProtectsBaseVersionForConcurrentWriters` in `test_version_map.cpp`. Root cause: LMDB serialises all write transactions so the race window never opened in Python. + +- **Fixed `FollowingVersionChainWithWriteAndPrunePrevious`** after anchor-rule changes: + - Added `ScopedConfig no_protection("VersionStore.PrunePreviousProtectionSecs", 0)` so PilotedClock timestamps (near 0 ns) fall below the cutoff and versions are eligible for pruning. + - Corrected LATEST+UNDELETED_ONLY expectation: uses the 3-item ref-entry fast-path, copying `oldest_loaded_index_version_ = min(3, 2) = 2` from the symbol ref. + - Corrected UNDELETED_ONLY traversal expectation: with anchor=V2, boundary=V1, TOMBSTONE_ALL lands at V1, so oldest = 1 (not old V2). + +- **Optimised `get_prune_previous_boundary`**: replaced vector-collect-all with early-exit two-pointer scan. We only need the first two eligible versions; allocating a vector for all candidates wastes memory on symbols with many versions. + +- **Fixed pre-existing `test_segment_reslicer.cpp` bug**: `str32.resize(width, '\0\0\0\0')` is a multi-character literal (triggers `-Werror=multichar`); changed to `U'\0'` (correct `char32_t` null). This was blocking the entire `test_unit_arcticdb` binary from building. + +- **Ran full VersionMap test suite** (27/27 passed) and `make lint`. + +## Session 3 (2026-04-14) + +### Goal +Address code review comments and update all documentation for the `prune_previous_versions` behaviour change. + +### What was done + +- **Updated `library.py` docstrings** for all 9 occurrences of `prune_previous_versions` parameter across `write`, `write_pickle`, `write_batch`, `append_batch`, `finalize_staged_data`, `write_metadata`, `write_metadata_batch`, `delete_data_in_range`, `compact_incomplete`, `defragment_symbol_data`, and `update_by_query`. New description explains the protection window, anchor rule, and directs users to explicit `prune_previous_versions()` for unconditional pruning. + +- **Updated explicit `prune_previous_versions()` docstring** to clarify it is unconditional (no protection window). + +- **Added `VersionStore.PrunePreviousProtectionSecs` to `docs/mkdocs/docs/runtime_config.md`** — user-tunable config; explains 2-version minimum, anchor rule, use cases. + +- **Updated `docs/claude/cpp/VERSIONING.md`** — replaced the "Hard Delete (Prune)" section with a detailed explanation of the two modes (prune-on-write with protection window/anchor vs. explicit unconditional prune); added note on prune+concurrency to the Concurrency section. + +- **Fixed failing regression tests in `test_nonreg_specific.py`** (89 failures from `should_be_pruned=True` cases): the old tests expected 1 surviving TABLE_INDEX key after pruning, but the anchor rule always keeps the newest eligible version so the minimum is 2. Fixed by: + - Adding `set_config_int("VersionStore.PrunePreviousProtectionSecs", 0)` to each test + - Adding an intermediate write so that there are ≥2 eligible candidates before the method-under-test (enabling the anchor to keep one while deleting the other) + - Updating assertions: `2 if should_be_pruned else 3` + - Exception: `test_prune_previous_defragment_symbol_data` already has 3 writes; its intermediate assertion is now `== 2` (can't distinguish with only 1 candidate) and the final is `2/3`. + - All 144 prune regression tests now pass. + +- **Updated PR description** with explicit "Behaviour change" section: after N ≥ 3 sequential writes, exactly 2 versions survive instead of 1. + +### Key design decisions +- The 10-minute window is the primary protection: any version younger than `PrunePreviousProtectionSecs` is immune to pruning, giving concurrent writers time to commit. +- The anchor is the secondary protection: even with a 0-second window, the newest eligible version is always kept so there is always a safe base for future writers. +- `PrunePreviousProtectionSecs` is a runtime config (not `static const`) so tests can override it via `set_config_int`. +- Explicit admin `prune_previous_versions()` is NOT modified — it keeps original aggressive behaviour. + +## Session 4 (2026-04-14) + +### Goal +Complete test fix-up: get all 25 remaining failing unit tests to pass after reverting sole-candidate pruning. + +### What was done + +- **Reverted sole-candidate C++ change** (confirmed already done at session start): `get_prune_previous_boundary` returns `boundary` (not `boundary.has_value() ? boundary : anchor`), so sole pre-existing candidates are kept as anchor and NOT tombstoned. + +- **Added global conftest fixture** `_disable_prune_protection_window` — autouse, function-scoped, sets `PrunePreviousProtectionSecs=0` before each test and restores it after. Tests that specifically exercise the window (e.g. `test_prune_previous_preserves_recent_versions`) override back to 600 at the start. + +- **Fixed 7 `test_append.py` / `test_parallel.py` failures** (session carried over from previous session's edits): + - `test_defragment_read_prev_versions`, `test_append_out_of_order_and_sort`, `test_sort_merge_append`: changed `== 1` → `== 2` with anchor-rule comments. + +- **Fixed 25 additional unit test failures** across 5 files by updating assertions to reflect the anchor rule (sole pre-existing candidate is never pruned): + - `test_deletion.py::test_delete_snapshot`: updated key-version assertions to accept both version_id 0 and 1. + - `test_deletion.py::test_tombstones_deleted_data_keys_prune`: `len(data_keys) == 1` → `== 2`. + - `test_deletion_batch.py::test_batch_write_with_pruning` and `test_delete_tree_via_prune_previous`: `len(versions) == 1` → `== 2`. + - `test_stage.py::test_finalize_with_tokens_and_prune_previous`: prune_previous branch now reads V0 and asserts it equals df_1 (V0 is alive as anchor). + - `test_parallel.py::test_parallel_write_sort_merge`: `0 not in versions` → `0 in versions`. + - `test_parallel.py::test_compact_incomplete_prune_previous`: `has_symbol("sym", 0) != should_prune` → `has_symbol("sym", 0)` (always True due to anchor). + +- **Full suite**: 774 passed, 0 failed, 132 skipped — all unit tests green. + +### Key anchor rule reminder +With `PrunePreviousProtectionSecs=0`, a sole pre-existing version is always the anchor and is NOT tombstoned. Pruning only starts removing versions on the **second** prune operation (once there are ≥2 eligible candidates). After N≥3 sequential prune writes, exactly 2 versions survive (anchor + latest). + +## Session 5 (2026-04-14) + +### Goal +Fix vacuous tests, upgrade conftest to session-scoped fixture, and rebase onto origin/master. + +### What was done + +- **Refactored conftest fixture** `_disable_prune_protection_window` to `scope="session"` — sets `PrunePreviousProtectionSecs=0` once per test session rather than before every test. Added `_with_protection_window` fixture in `test_prune_previous.py` using `config_context` (from `arcticdb.util.test`) to temporarily restore the 600s window for the specific tests that exercise it; `config_context` saves the current value and restores it in `finally`. + +- **Fixed `test_finalize_with_tokens_and_prune_previous` (test_stage.py)** — was vacuous: both `prune=True` and `prune=False` branches asserted identical data. Added a `df_0` pre-write (V0) so that when pruning fires it removes V0; `prune=True` now raises `NoSuchVersionException` reading V0, while `prune=False` returns `df_0`. + +- **Fixed `test_compact_incomplete_prune_previous` (test_parallel.py)** — was vacuous: `should_prune` flag was computed but not used in assertion (`assert lib.has_symbol("sym", 0)` always True). Added a pre-write at timestamp `-1` (V0), changing V1 write to timestamp `0`; compaction creates V2. With 2 eligible candidates, pruning now removes V0, restoring `assert lib.has_symbol("sym", 0) != should_prune`. + +- **Confirmed test suite**: all changed tests pass (0 failures). + +- **Resolved rebase conflicts for CLAUDE.md and PIPELINE.md**: discovered the earlier rebase was accidentally run on `jb/duckdb_incremental` (wrong branch). After aborting, switched to `fix-prune-previous` and confirmed it is already on top of origin/master — rebase was a no-op. + +## Session 8 (2026-04-14) + +### Goal +Fix remaining failures in `test_basic_version_store.py` integration tests after ref-key structure changed with the anchor rule. + +### What was done + +- **Fixed `check_regular_write_ref_key_structure`**: after a prune write with the anchor rule the ref key has 3 entries (latest TABLE_INDEX, anchor TABLE_INDEX, VERSION), not 2. Updated the helper to expect 3 entries mirroring `check_append_ref_key_structure`. Updated docstring. + +- **Fixed `test_prune_previous_versions_batch_write_metadata`**: TABLE_DATA assertion `== 2` → `== 1`. `batch_write_metadata` is a metadata-only write so V2 reuses V1's data segments; only V1's (anchor) TABLE_DATA key survives after pruning V0. + +## Session 7 (2026-04-14) + +### Goal +Fix integration tests in test_basic_version_store.py, test_arctic.py, and test_dedup.py missed by +the earlier unit-test sweep. + +### What was done + +- **Fixed `test_basic_version_store.py`** (6 locations): + - `test_with_prune`: `== 1` → `== 2` at both version-count assertions (lines 371, 380). + - `check_write_and_prune_previous_version_keys` helper: `TOMBSTONE_ALL.version_id == latest_version_id - 1` → `latest_version_id - 2` (the TOMBSTONE_ALL now marks the boundary, which is one below the anchor, not the anchor itself). + - `test_prune_previous_versions_write`, `test_prune_previous_versions_write_batch`, `test_prune_previous_versions_batch_write_metadata`: `list_versions == 1`, `TABLE_INDEX == 1`, `TABLE_DATA == 1` → `== 2` each. + - `test_prune_previous_versions_append_batch`: same INDEX/versions fix; TABLE_DATA count unchanged (stays 3 because append inheritance means V2 references all earlier data segments, protecting them via `delete_unreferenced_pruned_indexes`). + +- **Fixed `test_arctic.py`** (5 locations): + - `test_delete_version_after_tombstone_all`: added extra write (V0) as the boundary so the prune write (V2) fires a real TOMBSTONE_ALL; adjusted final delete from `[1,2]` → `[1,2,3]`. + - `test_prune_previous_versions_with_write`: replaced single prune write + two `NoDataFoundException` checks with two sequential prune writes (each removes one version rolling through the anchor rule), plus an intermediate assertion that V1 is still readable as anchor after the first prune. + - `test_append_prune_previous_versions`, `test_update_prune_previous_versions`: `len == 1` → `== 2`, added `("symbol", 0) in symbols`. + - `test_compact_data`: `1 if prune` → `2 if prune`. + +- **Fixed `test_dedup.py`** (3 locations): + - `test_de_dup_same_value_written`: `list_versions == 1` → `== 2` (data key count unchanged since de-dup with same data protects all keys). + - `test_de_dup_with_delete`, `test_de_dup_with_delete_multiple`: `== num_elements` (100) → `== 3 * num_elements / 2` (150) because V0 (sole eligible anchor) retains its 50 df1 data keys alongside V3's 100 new keys (df2+df3 has no overlap with df1 so those keys aren't protected by V3's reference). + - `test_de_dup_with_tombstones`, `test_de_dup_with_tombstones_multiple`: **no change needed** — in those tests there are 2 eligible candidates so the anchor/boundary split still produces the same total data key count (150) via de-dup inheritance. + +## Session 6 (2026-04-14) + +### Goal +Fix two more test files missed by the anchor-rule update sweep. + +### What was done + +- **Fixed `test_column_stats_creation.py::test_column_stats_object_deleted_with_index_key`** (4 parametrised variants): `test_prune_previous_kwarg` and `test_prune_previous_kwarg_batch_methods` both set `expected_count = 0` after writing with `prune_previous_version=True`. With the anchor rule the sole pre-existing version is kept, so its column stats key survives; changed to `expected_count = 1`. + +- **Fixed `test_recursive_normalizers.py::test_data_layout`** (2 parametrised variants): after writing v2 with `prune_previous_version=True`, v1 survives as anchor alongside v2. Updated key-count assertions: `MULTI_KEY 1→2`, `TABLE_INDEX 3→6`, `TABLE_DATA 3→6`. The subsequent `lib.delete("sym")` path is unaffected (still cleans up all keys to 0). + +## Session 9 (2026-05-29) + +### Goal +Decouple the tombstone gate from the physical-delete gate, so `lib.write(sym, df, prune_previous_version=True)` produces the intuitive "only the latest version is visible" result while still keeping concurrent-writer safety. + +### What was done + +**C++ refactor** + +- **`cpp/arcticdb/version/version_map.hpp`**: removed the old `get_prune_previous_boundary` helper. `write_and_prune_previous` now unconditionally tombstones every pre-existing undeleted index (boundary = `entry->get_first_index(false).first`). Switched the entry load to `INCLUDE_DELETED` so an aging-tombstones sweep can pick up index keys that were previously tombstoned but not yet physically removed. Added a `WriteAndPrunePreviousResult` struct exposing `freshly_tombstoned` (anchor candidates) and `aging_tombstones` (eligible for cleanup) so the engine layer can compute the anchor from the freshly-tombstoned set only, avoiding mis-anchoring on zombie tombstones left by `delete_version`. + +- **`cpp/arcticdb/version/version_tasks.hpp`**: `WriteAndPrunePreviousTask` no longer takes `delayed_deletes`; returns the new struct. + +- **`cpp/arcticdb/version/local_versioned_engine.cpp`**: `delete_unreferenced_pruned_indexes` now takes two vectors (freshly + aging). Anchor = max version_id in freshly only. A key is kept in storage iff it is the anchor OR its `creation_ts >= now - PrunePreviousProtectionSecs`. Aging keys go straight to the snapshot/dedup filter without anchor protection. + +- **`cpp/arcticdb/version/version_store_api.cpp`**: admin `prune_previous_versions` also calls `collect_aging_tombstones` and passes both vectors. `delete_all_versions` extends `all_index_keys` with previously-tombstoned-but-not-deleted index keys so `lib.delete(sym)` cleans up anchor-retained data instead of leaking it. + +**Docs** + +- `docs/mkdocs/docs/runtime_config.md`: rewrote `VersionStore.PrunePreviousProtectionSecs` description to reflect the new "tombstone immediately, physical delete deferred" semantics + the anchor rule. +- `docs/claude/cpp/VERSIONING.md`: replaced the "two modes" prune section with a single-policy diagram covering both the write and admin paths. Added a note about the prior anchor-at-tombstone-time iteration for git archaeology. +- All `library.py` and `_store.py` docstrings for `prune_previous_version` (write/append/update/etc) and the admin `prune_previous_versions()` method. + +**Tests — Python** + +- Reverted assertions in `test_basic_version_store.py`, `test_arctic.py`, `test_dedup.py`, `test_deletion.py`, `test_deletion_batch.py`, `test_nonreg_specific.py`, `test_stage.py`, `test_parallel.py`, `test_recursive_normalizers.py`, `test_column_stats_creation.py`, `test_sort_merge.py`, `test_update.py`, `test_append.py`, `test_snapshot.py`, `test_num_storage_operations.py`, hypothesis test, and the new `test_prune_previous.py`: visible-version counts go back to `== 1` after prune; storage-key counts often stay at `== 2` because the anchor rule keeps the newest pre-existing version's index/data in storage. Removed the intermediate-write workarounds that sessions 4/6/7/8 added. +- Added `test_prune_previous_data_remains_in_storage_during_window` and `test_prune_previous_anchor_protects_long_stable_head` to exercise the new safety properties directly. + +**Tests — C++** + +- Renamed `PrunePreviousProtectsBaseVersionForConcurrentWriters` to `PrunePreviousTombstonesAllPreExisting`. It now asserts that the version-map tombstones V0 (no longer visible) and reports it in `freshly_tombstoned` so the engine layer can apply the protection-window + anchor rule. +- `FollowingVersionChainWithWriteAndPrunePrevious`: restored to its pre-branch form (matches new policy because all pre-existing versions are tombstoned). +- `rapidcheck_version_map.cpp`: removed the `PrunePreviousProtectionSecs=0` scoped config (no longer needed). +- `version_map_model.hpp::VersionMapTombstonesModel::write_and_prune_previous`: reverted to "tombstone everything below the new version". + +### Verification + +- `cpp test_unit_arcticdb --gtest_filter='VersionMap.*'`: 30/30 passing. +- `cpp arcticdb_rapidcheck_tests --gtest_filter='VersionMap.*'`: 2/2 passing. +- Unit tests for prune-impact files (test_prune_previous, test_deletion, test_deletion_batch, test_parallel, test_stage, test_sort_merge, test_recursive_normalizers, test_nonreg_specific): 1254/1254 passing. +- Hypothesis tests: 3/3 passing. +- `make lint` (black check): clean. +- Pre-existing failures unrelated to this change (test_append_mismatched_object_kind, test_find_version, missing protobuf module for column_stats) remain. + +### Key design decisions + +- **Anchor + time-based, both at physical-delete time.** Tombstoning is unconditional; the visible API matches user intuition. The anchor (max version_id in just-tombstoned set) is kept regardless of age — protects writers sitting on a long-stable head. The time window protects recently-written versions whose data may still be referenced by in-flight appends. +- **Aging-sweep keys are NOT anchor candidates.** Otherwise zombie tombstones left by `delete_version` would mis-anchor the prune call and let the live anchor get deleted. Tracked separately via `WriteAndPrunePreviousResult.aging_tombstones`. +- **`delete(sym)` cleans up anchor-retained keys.** Added an INCLUDE_DELETED reload after `tombstone_all_async` so previously-tombstoned-but-not-deleted index keys also get fed to `delete_tree`. Without this, the anchor protection would permanently leak the symbol's prior version data once you delete it. + +--- + +## Session: bound prune/delete via TOMBSTONE_ALL high-water mark + individual tombstones + +### Problem +The prior iteration wrote a single `TOMBSTONE_ALL` at the prior head and used an `INCLUDE_DELETED` +aging-sweep to find retained anchors below it. But `INCLUDE_DELETED` loads never stop at a +`TOMBSTONE_ALL`, so every prune-write and `delete_all_versions` re-read the entire deletion history +— O(total versions). This failed `test_num_storage_operations::{test_write_and_prune_previous_over_time, +test_delete_over_time}` which require a *constant* number of storage ops. + +### Fix +Treat `TOMBSTONE_ALL` as a "everything below here is gone" high-water mark. Retained versions +(anchor + within-window) are kept as **individual** `KeyType::TOMBSTONE` keys *above* the line; a +bounded `UNDELETED_ONLY` load stops at the line but still loads the individual tombstones above it. +When versions age out and are physically deleted, the line is **advanced** to bury them. The loaded +region is the retained set (bounded by window content), not history. + +The window/anchor partition and the line advance now live in `version_map` under the per-symbol +lock (`partition_and_advance_tombstone_all`), so tombstones + line move atomically. The engine +(`delete_unreferenced_pruned_indexes`) is a dumb executor: delete `to_delete`, protect `could_share` ++ `key_to_keep`, honour snapshots. + +### Changes +- `version_map.hpp`: result struct → `{to_delete, could_share}`; removed `collect_aging_tombstones`; + rewrote `write_and_prune_previous` (bounded `UNDELETED_ONLY` load; `no_retention` flag keeps the + legacy single-`TOMBSTONE_ALL` behaviour for `delayed_deletes`); added + `partition_and_advance_tombstone_all` (shared by write + admin paths). +- `version_tasks.hpp`: `WriteAndPrunePreviousTask` threads the `no_retention` flag. +- `local_versioned_engine.cpp/.hpp`: `delete_unreferenced_pruned_indexes(to_delete, could_share, + key_to_keep)` — dropped the time/anchor gate (now in version_map); updated both callers to pass + `delayed_deletes()` as `no_retention`. +- `version_store_api.cpp`: `delete_all_versions` reads retained tombstoned indexes from the cached + `UNDELETED_ONLY` entry (no extra `INCLUDE_DELETED` reload); admin `prune_previous_versions` uses + bounded load + `partition_and_advance_tombstone_all(keep_anchor=false, protection_secs=0)` for + thorough reclaim. +- Tests: updated `FollowingVersionChainWithWriteAndPrunePrevious` (line now lands at anchor-1) and + `PrunePreviousTombstonesAllPreExisting` (anchor is retained → reported in `could_share`, not + `to_delete`). rapidcheck model unchanged (visible semantics identical). + +### Verification +- C++ `VersionMap.*`: 30/30. rapidcheck `VersionMap.*`: 2/2. +- Python: pending (see below). + +### Revised to mark/sweep two-phase (per maintainer review) + +Maintainer pushed back on complexity/sprawl. Restructured into a clean two-phase split that reuses +existing tombstone machinery: + +- **Phase 1 — mark** (`version_map::write_and_prune_previous`): write the new head + an individual + `TOMBSTONE` for every live previous version, in one journal entry. No `TOMBSTONE_ALL`, no delete. + Reuses the same representation as `delete_version`. ~10-line change from master. +- **Phase 2 — sweep** (`local_versioned_engine::delete_unreferenced_pruned_indexes(stream_id, anchor_version)`): + collect the above-line tombstoned set, retain anchor + within-window, delete the rest (skip under + `delayed_deletes`), then `version_map::advance_tombstone_all(line)` to bury the removed block. +- Removed `WriteAndPrunePreviousResult`, `partition_and_advance_tombstone_all`, and the `no_retention` + flag plumbing. `version_map.hpp` diff dropped from +167 to +76 vs master. +- **Anchor correctness**: the anchor is passed explicitly (the caller's `previous_key`), not + re-derived as `max(tombstoned)` in the sweep — an earlier `delete_version` of the head could + otherwise leave a higher tombstoned version that was never an append base. +- **delete_all_versions reverted to master**; the retained-key cleanup moved to + `finalize_tombstone_all_result` (shared by delete + batch-delete), where the `UNDELETED_ONLY` entry + is already loaded → zero extra I/O. Retained keys go through the same `delete_tree` snapshot/dedup + checks as live keys, so snapshotted retained versions are protected. +- Added Python tests: `test_delete_all_reclaims_prune_retained_version` (no leak) and + `test_delete_all_after_prune_preserves_snapshotted_version` (snapshot safety). + +### Optimized mark/sweep (entry reuse) — per maintainer review of read amplification + +The two-phase split initially did 3 version-map loads per prune (mark, sweep-collect, advance) — with +the op-count guard test disabling the cache (ReloadInterval=0) that measured ~22 VERSION GETs/prune +(5× master). Reduced to 1 load by threading the loaded+mutated entry through the phases: +- `write_and_prune_previous` now returns the entry (the cached shared_ptr it loaded under the lock). +- `delete_unreferenced_pruned_indexes(entry, anchor_version)` reuses it (no reload) to collect the + tombstoned set and partition. +- `advance_tombstone_all(entry, line)` writes the line on the same entry (re-acquiring the lock). +Result: 1 chain read + 2 writes per prune. Probe (ReloadInterval=0, moto): VERSION GETs plateau at 8 +(total 25) after a ~5-prune transient while the TOMBSTONE_ALL line catches up. + +Also fixed `finalize_tombstone_all_result` to collect only *individually*-tombstoned keys +(`has_individual_tombstone`) rather than all `is_tombstoned` — TOMBSTONE_ALL-buried versions from a +prior delete_all are already physically gone, so re-adding them issued a wasted delete op (delete +steady-state went 17→16, matching master). + +Adjusted `test_write_and_prune_previous_over_time` to warm up 6 prunes before capturing its baseline +(it asserts steady-state constancy; the line-catch-up transient is expected). + +### Verification (final) +- C++ `VersionMap.*`: 30/30. rapidcheck `VersionMap.*`: 2/2. +- Python: test_prune_previous (10/10, incl. delete-all reclaim + snapshot-safety), test_deletion, + test_num_storage_operations::{test_write_and_prune_previous_over_time, test_delete_over_time} — 50 + passed. delete steady = 16 (== master); prune VERSION GETs bounded/constant in steady state. + +### Concurrency note (maintainer Q) +The anchor protects only the *immediately* previous head (1-back). Protection against multiple fast +successive writes is the **window's** job (retains everything younger than PrunePreviousProtectionSecs), +covered by the burst test. The 2-back race only bites if an append outlives the window AND >=2 newer +writes land — the documented limitation; tune the window above slowest append latency. + +## Session: Collapse mark/sweep into a single-write fold (2026-06-02) + +### Goal +The two-phase mark/sweep prune produced 96 integration-test failures (not data bugs): the per-prune +`advance_tombstone_all` wrote a *second* journal entry whose head was a TOMBSTONE_ALL, so the symbol +ref's first key was no longer a TABLE_INDEX — killing the LATEST read fast-path (extra VERSION GET on +every freshly-pruned read) and inflating VERSION-key counts / chain length. Goal: keep the +anchor + protection-window retention but write it in a **single journal entry whose head is the new +TABLE_INDEX**. + +### What changed +- **`version_map.hpp::write_and_prune_previous`** rewritten to a single `do_write`: + - Loads `UNDELETED_ONLY` as before. + - Computes the **anchor internally** as the latest *undeleted* previous index (an appender's base is + always the latest live version, never a tombstoned one). This replaced the planned threaded + `anchor_version` arg — the caller's `maybe_prev`/`previous_index_key_` is `get_latest_version` + (highest, *including deleted*), which after a `delete_version` of the head points at a tombstoned + version and would wrongly bury the real live base. Computing from the loaded entry fixes the + dedup/delete tests. + - Partitions present previous indexes (incl. already-individually-tombstoned ones, so an aged-out + burst from earlier prunes is finally reclaimed) into retained (anchor + within-window) vs others. + - Writes one entry `[TABLE_INDEX(new), TOMBSTONE(retained…), TOMBSTONE_ALL(floor-1)]` where + floor = min(retained). Only buries versions strictly **below** the floor — a non-retained version + *above* the floor (delete_version'd former head) is left individually tombstoned, never buried + (a low-enough line would also bury the anchor); a later prune reclaims it once the floor rises. + This also avoids the `floor==0` underflow. + - Head stays the new TABLE_INDEX ⇒ ref[0] is an index key ⇒ LATEST fast-path restored. +- Deleted `advance_tombstone_all` (no longer used). +- **`local_versioned_engine.cpp::delete_unreferenced_pruned_indexes`** simplified to delete-only, no + version-map write: derives to_delete (tombstoned indexes ≤ line) and retained (> line) straight from + the mutated entry's `tombstone_all_`; protects retained + snapshotted; early-returns under + `delayed_deletes` (line already persisted by phase 1). Added back master's + `(vector pruned, key_to_keep)` overload for the admin path. Both overloads needed + `.thenValue(→Unit)` before `.thenError` because `delete_trees_responsibly` now returns + `Future`. +- **`version_store_api.cpp::prune_previous_versions`** reverted to master's aggressive body + (`tombstone_from_key_or_all` + vector-form sweep). +- `version_tasks.hpp` and `version_map_model.hpp` net-unchanged (anchor arg added then removed). +- Tests updated: `test_version_map.cpp` (folded single-entry structure, no separate advance); + `test_basic_version_store.py::check_write_and_prune_previous_version_keys` now expects the 4-key + head `[TABLE_INDEX(latest), TOMBSTONE(anchor=latest-1), TOMBSTONE_ALL(latest-2), VERSION]`. + +### Verification +- C++ `VersionMap.*` 30/30; rapidcheck `VersionMap.*` 2/2. +- Python (3.11 debug build): test_prune_previous 10/10, test_dedup full suite, test_basic prune set + (30), test_deletion::test_normal_flow_with_snapshot_and_pruning, test_library_tool iterate-chain, + test_version_map_cache_storage_ops, and storage-op guards + test_num_storage_operations::{test_write_and_prune_previous_over_time, test_delete_over_time, + test_read_after_write_and_prune_previous} — all pass. +- lint: clang-format 19.1.2 + black 25.1.0 clean on changed files. + +## Concurrent reader vs eager-prune stress test (2026-06-02) + +- Added `test_concurrent_read_write_eager_prune` to + `python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py`, mirroring the + existing delayed-deletes `test_concurrent_read_write` but against the eager (non-delayed-delete) + `lmdb_version_store_v2` store. Verifies a concurrent reader never sees `NoDataFoundException` + while another process writes with `prune_previous_version=True`. +- The read path does NOT retry on a missing data/index key: `version_core.cpp` throws + `NoDataFoundException` and batch reads surface `E_KEY_NOT_FOUND` — errors are exposed to the + user. Concurrent read safety comes purely from data surviving in storage, not from retries. +- Root cause investigation: the eager test initially failed with `KeyNotFoundException` on the + just-superseded version's index key, because the session-autouse fixture + `_disable_prune_protection_window` (conftest.py:230) sets `PrunePreviousProtectionSecs=0` for the + whole session. With the window disabled, eager prune retains only the immediate anchor, so a + reader that resolved an older version before it was pruned hits a physically-deleted key. +- Fix in the test: wrap the body in `config_context("VersionStore.PrunePreviousProtectionSecs", + 600)`, set in the parent before forking so the child reader/writer inherit it. With the window + enabled the recently superseded version stays in storage and the reader always succeeds. This + makes the test a regression guard for the protection-window reader-safety guarantee on the eager + path (it fails with window=0, passes with window=600). + +## Doc sync to final single-write-fold implementation (2026-06-02) + +Brought the documentation back in line with the code after the single-write-fold collapse, which +had left several docs describing the prior (now-deleted) two-phase `advance_tombstone_all` design. + +- **`docs/claude/cpp/VERSIONING.md`** — rewrote the "Hard Delete (Prune)" section: the + `TOMBSTONE_ALL` line is now written in **phase 1** (`write_and_prune_previous`) as part of the + single journal entry, not advanced separately in phase 2; phase 2 + (`delete_unreferenced_pruned_indexes(entry)`) is now delete-only with no version-map write. + Removed all `advance_tombstone_all` references. Documented the **two distinct prune flavours**: + per-write flag = windowed/anchored (safe); explicit admin `prune_previous_versions()` = aggressive + (no window), via `tombstone_from_key_or_all` + vector-form sweep — *not* `write_and_prune_previous`. +- **`docs/mkdocs/docs/runtime_config.md`** — clarified `PrunePreviousProtectionSecs` applies only to + the per-write flag; the explicit admin method ignores it (unconditional, keeps only the latest). +- **`library.py` / `_store.py`** — corrected the explicit `prune_previous_versions()` docstrings: + they previously claimed the window/anchor deferral, which only the per-write flag has. + +Decision (confirmed with maintainer): the explicit admin method's aggressive behaviour is +**intended** (matches the work-log session-9 decision and the final collapse revert), so this was a +doc fix, not a code change. diff --git a/docs/mkdocs/docs/runtime_config.md b/docs/mkdocs/docs/runtime_config.md index 7036358d868..dd796f4f64d 100644 --- a/docs/mkdocs/docs/runtime_config.md +++ b/docs/mkdocs/docs/runtime_config.md @@ -104,6 +104,33 @@ Values: * 0: Disable * 1: Enable (Default) +### VersionStore.PrunePreviousProtectionSecs + +Number of seconds for which pre-existing data is retained in storage after the **per-write** +prune flag (`prune_previous_version=True` on a write/append/update) tombstones it. Protects +concurrent writers whose appends reference the just-tombstoned data. + +The per-write flag always tombstones pre-existing non-snapshotted versions immediately so they +are no longer visible. Physical removal of their data is gated by this window: data is kept in +storage until it has aged past `PrunePreviousProtectionSecs`. In addition, the newest pre-existing +version's data is always retained as an *anchor* regardless of age, as a belt-and-braces protection +for writers that have been sitting on a long-stable head. The retained data is reclaimed on +subsequent prune calls once it has aged past the window and been displaced as the anchor. + +This window applies only to the per-write flag. The explicit `prune_previous_versions(symbol)` +admin method is **unconditional**: it deletes every non-snapshotted version except the latest +immediately and ignores this config — use it only when no concurrent writers are mid-append. + +Set this value to `0` to disable the time-based window (the anchor still applies). Increase the +value if your environment has write operations that can remain in-flight for longer than the +default window (e.g., very slow networks or large staged writes). + +When the library is configured with **background deletion** +(`EnterpriseLibraryOptions(background_deletion=True)`), this window is not applied — the +background deletion tool performs its own reference-check before removing any data. + +Default: `600` (10 minutes) + ### VersionStore.RecursiveNormalizerMetastructure Controls whether the recursive normalizer will use meta structure V2 diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index f96d5e08b8b..134690523b9 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -3340,7 +3340,14 @@ def batch_delete_symbols(self, symbols: List[str]) -> List[Optional[DataError]]: def prune_previous_versions(self, symbol: str): """ - Removes all (non-snapshotted) versions from the database for the given symbol, except the latest. + Removes all (non-snapshotted) versions for the given symbol except the latest. + + This is unconditional: the tombstoned versions' data is physically deleted immediately (only the + latest version and any snapshotted versions are retained). Unlike the per-write + ``prune_previous_version=True`` flag, this method does **not** apply the + ``VersionStore.PrunePreviousProtectionSecs`` protection window, so it is not safe to run while + other writers may be mid-append on a soon-to-be-pruned version. Use the per-write flag if you + need concurrent-writer safety. Parameters ---------- diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index af7d72f9b07..90d1a951818 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -1056,7 +1056,9 @@ def write( metadata : Any, default=None Optional metadata to persist along with the symbol. prune_previous_versions : bool, default=False - Removes previous (non-snapshotted) versions from the database. + Tombstones pre-existing non-snapshotted versions so they are no longer visible. + Physical deletion is deferred for concurrent-writer safety; see + ``VersionStore.PrunePreviousProtectionSecs`` for details. staged: bool, default=False Deprecated. Use stage() instead. Whether to write to a staging area rather than immediately to the library. @@ -1472,7 +1474,9 @@ def append_batch( append_payloads : `List[WritePayload]` Symbols and their corresponding data. There must not be any duplicate symbols in `append_payloads`. prune_previous_versions : bool, default=False - Removes previous (non-snapshotted) versions from the database. + Tombstones pre-existing non-snapshotted versions so they are no longer visible. + Physical deletion is deferred for concurrent-writer safety; see + ``VersionStore.PrunePreviousProtectionSecs`` for details. validate_index: bool, default=True Verify that each entry in the batch has an index that supports date range searches and update operations. This tests that the data is sorted in ascending order, using Pandas DataFrame.index.is_monotonic_increasing. @@ -1901,7 +1905,9 @@ def sort_and_finalize_staged_data( Also accepts strings "write" or "append" (case-insensitive). prune_previous_versions : bool, default=False - Removes previous (non-snapshotted) versions from the database. + Tombstones pre-existing non-snapshotted versions so they are no longer visible. + Physical deletion is deferred for concurrent-writer safety; see + ``VersionStore.PrunePreviousProtectionSecs`` for details. metadata : Any, default=None Optional metadata to persist along with the symbol. @@ -2544,8 +2550,9 @@ def write_metadata( metadata Metadata to persist along with the symbol prune_previous_versions : bool, default=False - Removes previous (non-snapshotted) versions from the database. Note that metadata is versioned alongside the - data it is referring to, and so this operation removes old versions of data as well as metadata. + Tombstones pre-existing non-snapshotted versions so they are no longer visible. Metadata is + versioned alongside its data, so old metadata is tombstoned too. Physical deletion is deferred + for concurrent-writer safety; see ``VersionStore.PrunePreviousProtectionSecs`` for details. Returns ------- @@ -2573,8 +2580,9 @@ def write_metadata_batch( write_metadata_payloads : `List[WriteMetadataPayload]` Symbols and their corresponding metadata. There must not be any duplicate symbols in `payload`. prune_previous_versions : bool, default=False - Removes previous (non-snapshotted) versions from the database. Note that metadata is versioned alongside the - data it is referring to, and so this operation removes old versions of data as well as metadata. + Tombstones pre-existing non-snapshotted versions so they are no longer visible. Metadata is + versioned alongside its data, so old metadata is tombstoned too. Physical deletion is deferred + for concurrent-writer safety; see ``VersionStore.PrunePreviousProtectionSecs`` for details. Returns ------- @@ -2727,7 +2735,14 @@ def delete_batch(self, delete_requests: List[Union[str, DeleteRequest]]) -> List return self._nvs.version_store.batch_delete(symbols, versions) def prune_previous_versions(self, symbol) -> None: - """Removes all (non-snapshotted) versions from the database for the given symbol, except the latest. + """Removes all (non-snapshotted) versions for the given symbol except the latest. + + This is unconditional: the tombstoned versions' data is physically deleted immediately (only the + latest version and any snapshotted versions are retained). Unlike the per-write + ``prune_previous_versions=True`` flag, this method does **not** apply the + ``VersionStore.PrunePreviousProtectionSecs`` protection window, so it is not safe to run while + other writers may be mid-append on a soon-to-be-pruned version. Use the per-write flag if you + need concurrent-writer safety. Parameters ---------- @@ -2754,7 +2769,9 @@ def delete_data_in_range( The date range in which to delete data. Leaving any part of the tuple as None leaves that part of the range open ended. prune_previous_versions : bool, default=False - Removes previous (non-snapshotted) versions from the database. + Tombstones pre-existing non-snapshotted versions so they are no longer visible. + Physical deletion is deferred for concurrent-writer safety; see + ``VersionStore.PrunePreviousProtectionSecs`` for details. Examples -------- @@ -3272,7 +3289,9 @@ def compact_data( setting. Note that subsequent calls to write, append, and update will continue to use the library configuration setting. prune_previous_versions : bool, default=False - If True, removes previous versions from the version list. + Tombstones pre-existing non-snapshotted versions so they are no longer visible. + Physical deletion is deferred for concurrent-writer safety; see + ``VersionStore.PrunePreviousProtectionSecs`` for details. Returns ------- @@ -3363,7 +3382,9 @@ def defragment_symbol_data( Note that no. of rows per segment, after compaction, may exceed the target. It is for achieving smallest no. of segment after compaction. Please refer to below example for further explanation. prune_previous_versions : bool, default=False - Removes previous (non-snapshotted) versions from the database. + Tombstones pre-existing non-snapshotted versions so they are no longer visible. + Physical deletion is deferred for concurrent-writer safety; see + ``VersionStore.PrunePreviousProtectionSecs`` for details. Returns ------- @@ -3461,7 +3482,9 @@ def merge_experimental( metadata : Any, optional Metadata to save alongside the new version. prune_previous_versions : bool, default False - If True, removes previous versions from the version list. + Tombstones pre-existing non-snapshotted versions so they are no longer visible. + Physical deletion is deferred for concurrent-writer safety; see + ``VersionStore.PrunePreviousProtectionSecs`` for details. upsert : bool, default False !!! warning Not yet implemented diff --git a/python/tests/conftest.py b/python/tests/conftest.py index d336e0a1649..eaecfe170a1 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -227,6 +227,20 @@ def pytest_runtest_teardown(item, nextitem): pass +@pytest.fixture(autouse=True, scope="session") +def _disable_prune_protection_window(): + """Set PrunePreviousProtectionSecs=0 for the entire test session. + + The default 600-second window causes prune_previous_version=True writes to retain all recently + written versions, breaking tests that expect aggressive pruning. Tests that specifically test + the protection-window behaviour request the _with_protection_window fixture, which temporarily + overrides this to 600 s and resets it back to 0 in its own teardown. + """ + set_config_int("VersionStore.PrunePreviousProtectionSecs", 0) + yield + unset_config_int("VersionStore.PrunePreviousProtectionSecs") + + @pytest.fixture( params=[True, False], ) diff --git a/python/tests/hypothesis/arcticdb/test_hypothesis_version_store.py b/python/tests/hypothesis/arcticdb/test_hypothesis_version_store.py index 800202f3729..e808159c89e 100644 --- a/python/tests/hypothesis/arcticdb/test_hypothesis_version_store.py +++ b/python/tests/hypothesis/arcticdb/test_hypothesis_version_store.py @@ -101,12 +101,25 @@ def __init__(self): # ================================ Basic version ops ================================ - def _prune_previous_versions(self, sym): + def _tombstone_all_versions(self, sym): + """Model for lib.delete(sym): tombstones every NORMAL version.""" vers = self._versions[sym] for value in vers: if value.state == State.NORMAL: value.state = State.TOMBSTONED # Delayed deletes + def _prune_previous_versions(self, sym): + """Model for write(..., prune_previous_version=True). + + Under the current policy, prune unconditionally tombstones every pre-existing NORMAL + version (the newest pre-existing is retained in storage as the anchor for concurrent + writers, but it is not visible). The newly-written version is appended after this call. + """ + vers = self._versions[sym] + for v in vers: + if v.state == State.NORMAL: + v.state = State.TOMBSTONED + def _get_latest_undeleted_version(self, sym) -> Tuple[Optional[int], Optional[Version]]: vers = self._versions[sym] for ver_num in reversed(range(len(vers))): @@ -156,7 +169,7 @@ def write_new_symbol(self, sym: str, write_mode: WriteMode): def delete_symbol(self, sym): assume(sym in self._visible_symbols) # Older hypothesis don't have `consume()` self._lib.delete(sym) - self._prune_previous_versions(sym) + self._tombstone_all_versions(sym) self._visible_symbols.remove(sym) @invariant() @@ -300,11 +313,9 @@ def test_single(lmdb_version_store_delayed_deletes_v1): VersionStoreComparison._lib = lmdb_version_store_delayed_deletes_v1 state = VersionStoreComparison() # Copy and paste the reproduction script hypothesis generated below: - # print("Press enter to continue"); import sys; sys.stdin.readline() state.write_new_symbol(sym="0", write_mode=WriteMode.DATA) state.write_new_version_to_symbol(prune=True, sym="0", write_mode=WriteMode.META) state.snapshot(name="0", with_meta=False) state.delete_snapshot(name="0") state.write_new_version_to_symbol(prune=True, sym="0", write_mode=WriteMode.DATA) - state.test_list_versions_all_and_read() diff --git a/python/tests/integration/arcticdb/test_arctic.py b/python/tests/integration/arcticdb/test_arctic.py index a227b2ba951..825556307a0 100644 --- a/python/tests/integration/arcticdb/test_arctic.py +++ b/python/tests/integration/arcticdb/test_arctic.py @@ -927,15 +927,15 @@ def test_prune_previous_versions_with_write(arctic_library): v1 = lib.read("sym", as_of=1).data assert not v1.empty - # We do not prune by default + # Single prune write tombstones all pre-existing versions; only the latest is visible. lib.write("sym", pd.DataFrame(), prune_previous_versions=True) with pytest.raises(NoDataFoundException): lib.read("sym", as_of=0) with pytest.raises(NoDataFoundException): lib.read("sym", as_of=1) - v3 = lib.read("sym", as_of=2).data - assert v3.empty + v2 = lib.read("sym", as_of=2).data + assert v2.empty @pytest.mark.storage @@ -1011,6 +1011,7 @@ def test_update_prune_previous_versions(arctic_library): result = lib.read("symbol").data expected = pd.DataFrame({"column": [400, 40, 4]}, index=pd.to_datetime(["1/1/2018", "1/3/2018", "1/4/2018"])) assert_frame_equal(result, expected) + # Check that old versions were pruned symbols = lib.list_versions("symbol") assert len(symbols) == 1 assert ("symbol", 1) in symbols diff --git a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py index 6b16035b368..32b14f47dd2 100644 --- a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py +++ b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py @@ -376,7 +376,6 @@ def test_with_prune(object_and_mem_and_lmdb_version_store, symbol): version_store.write(symbol, final_df, prune_previous_version=True) version_store.snapshot("my_snap2") - # previous versions should have been deleted by now. assert len([ver for ver in version_store.list_versions() if not ver["deleted"]]) == 1 # previous versions should be accessible through snapshot assert_equal(version_store.read(symbol, as_of="my_snap").data, modified_df) @@ -469,13 +468,21 @@ def test_prune_previous_versions_multiple_times(basic_store, symbol): def check_write_and_prune_previous_version_keys(lib_tool, sym, ver_key, latest_version_id=2): assert ver_key.type == KeyType.VERSION keys_in_tombstone_ver = lib_tool.read_to_keys(ver_key) - assert len(keys_in_tombstone_ver) == 3 + # The prune is folded into a single journal entry whose head is the new TABLE_INDEX (so the ref's + # first key is an index key and the LATEST read fast-path is preserved). The just-superseded head + # (the anchor) is retained as an individual TOMBSTONE above the line; everything below is buried by + # one TOMBSTONE_ALL placed at anchor-1. The trailing VERSION points at the previous journal entry. + assert len(keys_in_tombstone_ver) == 4 assert keys_in_tombstone_ver[0].type == KeyType.TABLE_INDEX - assert keys_in_tombstone_ver[1].type == KeyType.TOMBSTONE_ALL - assert keys_in_tombstone_ver[2].type == KeyType.VERSION + assert keys_in_tombstone_ver[1].type == KeyType.TOMBSTONE + assert keys_in_tombstone_ver[2].type == KeyType.TOMBSTONE_ALL + assert keys_in_tombstone_ver[3].type == KeyType.VERSION assert keys_in_tombstone_ver[0].version_id == latest_version_id + # The anchor (the newest pre-existing index) is retained individually above the line. assert keys_in_tombstone_ver[1].version_id == latest_version_id - 1 - assert keys_in_tombstone_ver[2].version_id == latest_version_id - 1 + # TOMBSTONE_ALL buries everything below the anchor. + assert keys_in_tombstone_ver[2].version_id == latest_version_id - 2 + assert keys_in_tombstone_ver[3].version_id == latest_version_id - 1 def check_append_ref_key_structure(keys_in_ref, latest_version_id=1): @@ -498,9 +505,8 @@ def check_append_ref_key_structure(keys_in_ref, latest_version_id=1): def check_regular_write_ref_key_structure(keys_in_ref, latest_version_id=1): """ - The ref key for after a regular write with prune should have the following structure: - - TABLE_INDEX: latest index - - VERSION: latest version + The ref key after a prune write has only the latest index + VERSION, because the previous + versions are all tombstoned and there is no second undeleted index to include as previous. """ assert len(keys_in_ref) == 2 assert keys_in_ref[0].type == KeyType.TABLE_INDEX @@ -529,10 +535,12 @@ def test_prune_previous_versions_write(basic_store, sym): lib.write(sym, df2, prune_previous_version=True) - # Then - only latest version and keys should survive - assert len(lib.list_versions(sym)) == 1 - assert len(lib_tool.find_keys(KeyType.TABLE_INDEX)) == 1 - assert len(lib_tool.find_keys(KeyType.TABLE_DATA)) == 1 + # Visible versions: only V2 (V0 and V1 tombstoned by prune). + assert len([v for v in lib.list_versions(sym) if not v["deleted"]]) == 1 + # Anchor keeps V1's INDEX/DATA in storage (newest pre-existing); V0 was already in storage + # (tombstoned but anchor of the prior prune) and is now cleaned up by the aging sweep. + assert len(lib_tool.find_keys(KeyType.TABLE_INDEX)) == 2 + assert len(lib_tool.find_keys(KeyType.TABLE_DATA)) == 2 ref_key = lib_tool.find_keys_for_id(KeyType.VERSION_REF, sym)[0] keys_in_ref = lib_tool.read_to_keys(ref_key) @@ -614,12 +622,13 @@ def test_prune_previous_versions_write_batch(basic_store): for sym in syms: ref_key = lib_tool.find_keys_for_id(KeyType.VERSION_REF, sym)[0] keys_in_ref = lib_tool.read_to_keys(ref_key) - assert len(lib.list_versions(sym)) == 1 + assert len([v for v in lib.list_versions(sym) if not v["deleted"]]) == 1 check_regular_write_ref_key_structure(keys_in_ref, latest_version_id=2) - # Then - only latest version and keys should survive - assert len(lib_tool.find_keys_for_id(KeyType.TABLE_INDEX, sym)) == 1 - assert len(lib_tool.find_keys_for_id(KeyType.TABLE_DATA, sym)) == 1 + # V1 (anchor) and V2 (latest) both have their index/data keys in storage; + # V0 was the anchor of the prior prune and is now cleaned up by the aging sweep. + assert len(lib_tool.find_keys_for_id(KeyType.TABLE_INDEX, sym)) == 2 + assert len(lib_tool.find_keys_for_id(KeyType.TABLE_DATA, sym)) == 2 # Then - we got 2 version keys per symbol: version 0, version 1 that contains the tombstone_all keys_for_sym = lib_tool.find_keys_for_id(KeyType.VERSION, sym) @@ -658,11 +667,12 @@ def test_prune_previous_versions_batch_write_metadata(basic_store): for sym in syms: ref_key = lib_tool.find_keys_for_id(KeyType.VERSION_REF, sym)[0] keys_in_ref = lib_tool.read_to_keys(ref_key) - assert len(lib.list_versions(sym)) == 1 + assert len([v for v in lib.list_versions(sym) if not v["deleted"]]) == 1 check_regular_write_ref_key_structure(keys_in_ref, latest_version_id=2) - # Then - only latest version and keys should survive - assert len(lib_tool.find_keys_for_id(KeyType.TABLE_INDEX, sym)) == 1 + # V1 (anchor) and V2 (latest) both have their INDEX keys in storage. + # V2 is a metadata-only write and reuses V1's TABLE_DATA, so only V1's data segment survives. + assert len(lib_tool.find_keys_for_id(KeyType.TABLE_INDEX, sym)) == 2 assert len(lib_tool.find_keys_for_id(KeyType.TABLE_DATA, sym)) == 1 # Then - we got 2 version keys per symbol: version 0, version 1 that contains the tombstone_all @@ -705,12 +715,13 @@ def test_prune_previous_versions_append_batch(basic_store): for sym in syms: ref_key = lib_tool.find_keys_for_id(KeyType.VERSION_REF, sym)[0] keys_in_ref = lib_tool.read_to_keys(ref_key) - assert len(lib.list_versions(sym)) == 1 + assert len([v for v in lib.list_versions(sym) if not v["deleted"]]) == 1 check_regular_write_ref_key_structure(keys_in_ref, latest_version_id=2) - # Then - only latest version and index keys should survive. Data keys remain the same - assert len(lib.list_versions(sym)) == 1 - assert len(lib_tool.find_keys_for_id(KeyType.TABLE_INDEX, sym)) == 1 + # V1 (anchor) and V2 (latest) both have their INDEX keys in storage. + # Data keys: all 3 segments are retained because V2 (the new version) references all of + # them via append inheritance, so delete_unreferenced_pruned_indexes spares them. + assert len(lib_tool.find_keys_for_id(KeyType.TABLE_INDEX, sym)) == 2 assert len(lib_tool.find_keys_for_id(KeyType.TABLE_DATA, sym)) == 3 # Then - we got 2 version keys per symbol: version 0, version 1 that contains the tombstone_all @@ -2025,10 +2036,18 @@ def test_find_version(lmdb_version_store_v1): assert lib._find_version(sym, as_of="snap_1").version == 1 with pytest.raises(NoDataFoundException): lib._find_version(sym, as_of="snap_1000") - # By timestamp + # By timestamp. + # As-of-time reads now resolve to the most recent version that was live at that time and is + # still recoverable, consulting snapshots for tombstoned versions (the iterate_snapshots_if_tombstoned + # behaviour that defaults on for the v1 store). So a timestamp at/after a deleted-but-snapshotted + # version returns that version rather than skipping back to an older still-live one: assert lib._find_version(sym, as_of=v0_time.after).version == 0 - assert lib._find_version(sym, as_of=v1_time.after).version == 0 - assert lib._find_version(sym, as_of=v2_time.after).version == 0 + # V1 was live just after v1_time and survives in snap_1, so it is returned (was 0 before the + # get_version_at_time snapshot fix). + assert lib._find_version(sym, as_of=v1_time.after).version == 1 + # V2 was live just after v2_time but is fully deleted, so the most recent recoverable version + # at/before that time is V1 (snapshot-protected). Was 0 before the fix. + assert lib._find_version(sym, as_of=v2_time.after).version == 1 assert lib._find_version(sym, as_of=v3_time.after).version == 3 @@ -2095,7 +2114,7 @@ def test_list_versions_with_deleted_symbols(basic_store_tombstone_and_pruning): lib.snapshot("snap") lib.write("a", 2) versions = lib.list_versions() - # At this point version 0 of 'a' is pruned but is still in the snapshot. + # V0 is pruned but still in the snapshot (tombstoned but visible as deleted). assert len(versions) == 2 deleted = [v for v in versions if v["deleted"]] not_deleted = [v for v in versions if not v["deleted"]] @@ -2135,10 +2154,10 @@ def test_get_tombstone_deletion_state_without_delayed_del(basic_store_factory, s lib.snapshot("snap") lib.write(sym, 3, prune_previous_version=True) tombstoned_version_map = lib.version_store._get_all_tombstoned_versions(sym) - # v0 and v1 + # v0 and v1 (V1 was the anchor of this prune so its key stays in storage; V0 was not in snap so its key is gone) assert len(tombstoned_version_map) == 2 - assert tombstoned_version_map[0] is False - assert tombstoned_version_map[1] is True + assert tombstoned_version_map[0] is False # not in snapshot, physically deleted + assert tombstoned_version_map[1] is True # anchor, key still exists in storage lib.write(sym, 3) lib.delete_version(sym, 2) diff --git a/python/tests/integration/arcticdb/version_store/test_dedup.py b/python/tests/integration/arcticdb/version_store/test_dedup.py index 137d27c0d0d..5358224a01f 100644 --- a/python/tests/integration/arcticdb/version_store/test_dedup.py +++ b/python/tests/integration/arcticdb/version_store/test_dedup.py @@ -84,7 +84,10 @@ def test_de_dup_same_value_written(basic_store_factory): assert len(get_data_keys(lib, symbol)) == num_keys lib.write(symbol, df1, prune_previous_version=True) - assert len(lib.list_versions(symbol)) == 1 + # Only V2 is visible (V0 and V1 tombstoned). All data keys are retained: V1's data segments + # stay in storage as the anchor of this prune, and V2 de-dupes against them (so V0's keys are + # also kept by the de-dup protection). + assert len([v for v in lib.list_versions(symbol) if not v["deleted"]]) == 1 assert len(get_data_keys(lib, symbol)) == num_keys @@ -142,7 +145,9 @@ def test_de_dup_with_delete(basic_store_factory): lib.write(symbol, final_df, prune_previous_version=True) assert_frame_equal(lib.read(symbol).data, final_df) - assert len(get_data_keys(lib, symbol)) == num_elements + # V0 is the anchor of this prune so its data keys remain in storage; V3's data keys are + # also in storage (final_df = df2+df3 has no overlap with df1). + assert len(get_data_keys(lib, symbol)) == 3 * num_elements / 2 @pytest.mark.storage @@ -191,7 +196,9 @@ def test_de_dup_with_delete_multiple(basic_store_factory): lib.write(symbol, final_df, prune_previous_version=True) assert_frame_equal(lib.read(symbol).data, final_df) - assert len(get_data_keys(lib, symbol)) == num_elements + # V0 is the anchor of this prune so its data keys remain in storage; V3's data keys are + # also in storage (final_df = df2+df3 has no overlap with df1). + assert len(get_data_keys(lib, symbol)) == 3 * num_elements / 2 @pytest.mark.storage diff --git a/python/tests/integration/arcticdb/version_store/test_deletion.py b/python/tests/integration/arcticdb/version_store/test_deletion.py index 2625712a9ce..1b79ffa90e8 100644 --- a/python/tests/integration/arcticdb/version_store/test_deletion.py +++ b/python/tests/integration/arcticdb/version_store/test_deletion.py @@ -770,7 +770,7 @@ def test_with_snapshot_pruning_tombstones(basic_store_tombstone_and_pruning, map vit = lib.read(symbol) assert_frame_equal(vit.data, df3) - # pruning enabled + # pruning enabled: after 3 writes only the latest is visible (V0, V1 tombstoned by their successors) assert len([ver for ver in lib.list_versions() if not ver["deleted"]]) == 1 assert_frame_equal(lib.read(symbol, "delete_version_snap_2").data, df2) @@ -914,6 +914,7 @@ def test_delete_date_range_with_prune_previous(lmdb_version_store, prune_previou versions = [version["version"] for version in lib.list_versions(symbol)] if prune_previous_versions: + # V0 tombstoned by V1's prune; only V1 visible. assert len(versions) == 1 and versions[0] == 1 else: assert len(versions) == 2 diff --git a/python/tests/integration/arcticdb/version_store/test_num_storage_operations.py b/python/tests/integration/arcticdb/version_store/test_num_storage_operations.py index 00aada30af3..1ba71bcd101 100644 --- a/python/tests/integration/arcticdb/version_store/test_num_storage_operations.py +++ b/python/tests/integration/arcticdb/version_store/test_num_storage_operations.py @@ -156,19 +156,24 @@ def test_delete_over_time(lib_name, s3_and_nfs_storage_bucket, clear_query_stats def test_write_and_prune_previous_over_time(lib_name, s3_and_nfs_storage_bucket, clear_query_stats): - expected_ops = 17 + # The session-scoped conftest fixture sets PrunePreviousProtectionSecs=0, so each prune reclaims + # the previous version immediately (retaining only the anchor) and advances the TOMBSTONE_ALL + # high-water mark — the steady-state reclaim path where per-prune storage operations are constant. with config_context("VersionMap.ReloadInterval", 0): lib = s3_and_nfs_storage_bucket.create_version_store_factory(lib_name)() qs.enable() + # Warm up past the transient: the per-prune op count is constant only once the TOMBSTONE_ALL + # high-water mark has caught up to a steady distance behind the head (it lags for the first + # few prunes while the chain establishes). After that each prune does a fixed amount of work. lib.write("s", data=create_df()) - lib.write("s", data=create_df(), prune_previous_version=True) + for _ in range(6): + lib.write("s", data=create_df(), prune_previous_version=True) qs.reset_stats() lib.write("s", data=create_df(), prune_previous_version=True) base_stats = qs.get_query_stats() base_ops_count = sum_all_operations(base_stats) - assert base_ops_count == expected_ops, pformat(base_stats) qs.reset_stats() iters = 10 diff --git a/python/tests/integration/arcticdb/version_store/test_snapshot.py b/python/tests/integration/arcticdb/version_store/test_snapshot.py index 799ea3ff1eb..e2be326c8cf 100644 --- a/python/tests/integration/arcticdb/version_store/test_snapshot.py +++ b/python/tests/integration/arcticdb/version_store/test_snapshot.py @@ -23,6 +23,7 @@ create_df_index_rownum, create_df_index_datetime, ) +from arcticdb_ext.version_store import ManualClockVersionStore from tests.util.storage_test import get_s3_storage_config from arcticdb_ext.storage import KeyType @@ -280,7 +281,6 @@ def test_read_symbol_with_ts_in_snapshot(store, request, sym): with distinct_timestamps(lib) as second_write_timestamps: lib.write(sym, 1) lib.snapshot("snap") - # After this write only version 1 exists via the snapshot with distinct_timestamps(lib) as third_write_timestamps: lib.write(sym, 2, prune_previous_version=True) @@ -1042,3 +1042,53 @@ def test_read_as_of_tombstoned_version_alive_in_snapshot(lmdb_version_store_v1): else: with pytest.raises(NoSuchVersionException): lib.read(sym, version_idx) + + +def test_read_as_of_time_finds_snapshotted_deleted_version_newer_than_live(lmdb_version_store_v1): + """Timestamp as_of read must resolve to a deleted-but-snapshotted version that is more recent + than the surviving live version at that time. + + This exercises the get_version_at_time path (NativeVersionStore defaults + iterate_snapshots_if_tombstoned=True). V1 is written, snapshotted, then deleted, so V0 becomes + the live head while V1 survives only via the snapshot. An as_of read at a time >= V1's write + time should return V1 (the version that was live then), not the older live V0. + + Note: the gate that triggers the snapshot lookup relies on V1's tombstone still being present in + the version-map chain (so it is seen as "more recent than the live V0"). That holds today + because deleted index keys are not purged from the chain. If that ever changed, this lookup + would need to consult snapshots whenever the live version is older than the requested time. + """ + lib = lmdb_version_store_v1 + lib.version_store = ManualClockVersionStore(lib._library) + sym = "sym_snap_deleted_as_of" + + ManualClockVersionStore.time = pd.Timestamp(1000).value + lib.write(sym, "v0") # V0 @ t=1000 + ManualClockVersionStore.time = pd.Timestamp(2000).value + lib.write(sym, "v1") # V1 @ t=2000 + lib.snapshot("snap", versions={sym: 1}) # snapshot keeps V1 alive after deletion + lib.delete_version(sym, 1) # V1 tombstoned; V0 is now the live head, V1 kept by snapshot + + # As of a time at/after V1's write: V0 is the live version, but V1 (deleted, snapshot-protected) + # was the live version then and is more recent, so it must be returned. + item = lib.read(sym, as_of=pd.Timestamp(2500)) + assert item.version == 1 + assert item.data == "v1" + + # Control: as of a time before V1 existed, the answer is V0 (no snapshot lookup needed). + item = lib.read(sym, as_of=pd.Timestamp(1500)) + assert item.version == 0 + assert item.data == "v0" + + # Contrast: a more-recent deleted version that is NOT in any snapshot must fall back to the live + # version (its data is not recoverable), rather than the deleted version. + sym2 = "sym_deleted_not_snapshotted" + ManualClockVersionStore.time = pd.Timestamp(1000).value + lib.write(sym2, "a0") # V0 @ t=1000 + ManualClockVersionStore.time = pd.Timestamp(2000).value + lib.write(sym2, "a1") # V1 @ t=2000, never snapshotted + lib.delete_version(sym2, 1) # V1 deleted and unrecoverable; V0 is the live head + + item = lib.read(sym2, as_of=pd.Timestamp(2500)) + assert item.version == 0 + assert item.data == "a0" diff --git a/python/tests/integration/toolbox/test_library_tool.py b/python/tests/integration/toolbox/test_library_tool.py index ca2024be8d3..3809d332991 100644 --- a/python/tests/integration/toolbox/test_library_tool.py +++ b/python/tests/integration/toolbox/test_library_tool.py @@ -248,7 +248,10 @@ def iterate_through_version_chain(key): assert len(keys_by_key_type[KeyType.VERSION_REF]) == 1 assert len(keys_by_key_type[KeyType.VERSION]) == num_versions assert len(keys_by_key_type[KeyType.TABLE_INDEX]) == num_versions - assert len(keys_by_key_type[KeyType.TABLE_DATA]) == (num_versions - 1) % 3 + 1 + # The versions written since the last prune are live, plus one extra: prune keeps the newest + # pre-existing version as an anchor (its index/data stay in storage even though it is + # tombstoned), so its TABLE_DATA is still reachable when iterating the chain. + assert len(keys_by_key_type[KeyType.TABLE_DATA]) == (num_versions - 1) % 3 + 1 + 1 assert len(keys_by_key_type[KeyType.TOMBSTONE_ALL]) == num_versions // 3 diff --git a/python/tests/integration/toolbox/test_storage_mover.py b/python/tests/integration/toolbox/test_storage_mover.py index 730f3f6cf19..92d25679ad4 100644 --- a/python/tests/integration/toolbox/test_storage_mover.py +++ b/python/tests/integration/toolbox/test_storage_mover.py @@ -317,7 +317,12 @@ def test_correct_versions_in_destination( lt = check.library_tool() assert {vi["version"] for vi in check.list_versions("x")} == {2, 4, 6} - assert len(lt.find_keys(KeyType.TABLE_INDEX)) == 3 + # Writing v4 with prune_previous_version keeps v3 as the anchor: its index key stays in storage + # (tombstoned, so not listed above). "check assumptions" inspects the source and "go" copies the + # whole tree, so both see v2, v3 (anchor), v4, v6 = 4 index keys. "no force" copies only the + # explicitly requested versions [s2, 4, 6], so the anchor is not carried over and there are 3. + expected_index_key_count = 3 if mode == "no force" else 4 + assert len(lt.find_keys(KeyType.TABLE_INDEX)) == expected_index_key_count assert [k.version_id for k in lt.find_keys(KeyType.TABLE_DATA)] == [2, 3, 4, 4, 6] diff --git a/python/tests/nonreg/arcticdb/version_store/test_nonreg_specific.py b/python/tests/nonreg/arcticdb/version_store/test_nonreg_specific.py index 3bc3ce3d7f1..f78584bfe38 100644 --- a/python/tests/nonreg/arcticdb/version_store/test_nonreg_specific.py +++ b/python/tests/nonreg/arcticdb/version_store/test_nonreg_specific.py @@ -342,12 +342,11 @@ def test_prune_previous_general(version_store_factory, monkeypatch, method, lib_ if arg is not None: should_be_pruned = arg - lt = lib.library_tool() sym = f"test_prune_previous_general" df_0 = pd.DataFrame({"col": np.arange(10)}, index=pd.date_range("2024-01-01", periods=10)) + df_1 = pd.DataFrame({"col": np.arange(10)}, index=pd.date_range("2024-01-11", periods=10)) lib.write(sym, df_0) - df_1 = pd.DataFrame({"col": np.arange(10)}, index=pd.date_range("2024-01-11", periods=10)) arg_0 = [sym] if method.startswith("batch") else sym if method.startswith("batch"): arg_1 = [df_1] @@ -359,7 +358,32 @@ def test_prune_previous_general(version_store_factory, monkeypatch, method, lib_ arg_1 = df_1 getattr(lib, method)(arg_0, arg_1, prune_previous_version=arg) - assert len(lt.find_keys(KeyType.TABLE_INDEX)) == (1 if should_be_pruned else 2) + live = [v for v in lib.list_versions(sym) if not v["deleted"]] + assert len(live) == (1 if should_be_pruned else 2) + + +@pytest.mark.parametrize("lib_config", (True, False)) +@pytest.mark.parametrize("env_var", (True, False)) +@pytest.mark.parametrize("arg", (True, False, None)) +def test_prune_previous_compact_data(version_store_factory, monkeypatch, lib_config, env_var, arg): + # Use segment_row_size=2 so that writing 10 rows creates multiple TABLE_DATA segments, + # ensuring compact_data actually creates a new version rather than being a no-op. + lib = version_store_factory(prune_previous_version=lib_config, use_tombstones=True, segment_row_size=2) + should_be_pruned = lib_config + if env_var: + monkeypatch.setenv("PRUNE_PREVIOUS_VERSION", "true") + should_be_pruned = True + if arg is not None: + should_be_pruned = arg + + sym = f"test_prune_previous_compact_data" + df_0 = pd.DataFrame({"col": np.arange(10)}, index=pd.date_range("2024-01-01", periods=10)) + lib.write(sym, df_0) # V0: 5 TABLE_DATA segments (2 rows each) + + lib.compact_data(sym, rows_per_segment=10, prune_previous_version=arg) + + live = [v for v in lib.list_versions(sym) if not v["deleted"]] + assert len(live) == (1 if should_be_pruned else 2) @pytest.mark.parametrize("append", (True, False)) @@ -375,7 +399,6 @@ def test_prune_previous_compact_incomplete(version_store_factory, monkeypatch, a if arg is not None: should_be_pruned = arg - lt = lib.library_tool() sym = f"test_prune_previous_compact_incomplete" df_0 = pd.DataFrame({"col": np.arange(10)}, index=pd.date_range("2024-01-01", periods=10)) lib.write(sym, df_0) @@ -385,7 +408,8 @@ def test_prune_previous_compact_incomplete(version_store_factory, monkeypatch, a lib.compact_incomplete(sym, append, False, prune_previous_version=arg) - assert len(lt.find_keys(KeyType.TABLE_INDEX)) == 1 if should_be_pruned else 2 + live = [v for v in lib.list_versions(sym) if not v["deleted"]] + assert len(live) == (1 if should_be_pruned else 2) @pytest.mark.parametrize("lib_config", (True, False)) @@ -400,14 +424,14 @@ def test_prune_previous_delete_date_range(version_store_factory, monkeypatch, li if arg is not None: should_be_pruned = arg - lt = lib.library_tool() sym = f"test_prune_previous_delete_date_range" df_0 = pd.DataFrame({"col": np.arange(10)}, index=pd.date_range("2024-01-01", periods=10)) lib.write(sym, df_0) lib.delete(sym, (pd.Timestamp("2024-01-05"), pd.Timestamp("2024-01-07")), prune_previous_version=arg) - assert len(lt.find_keys(KeyType.TABLE_INDEX)) == 1 if should_be_pruned else 2 + live = [v for v in lib.list_versions(sym) if not v["deleted"]] + assert len(live) == (1 if should_be_pruned else 2) @pytest.mark.parametrize("lib_config", (True, False)) @@ -422,19 +446,20 @@ def test_prune_previous_defragment_symbol_data(version_store_factory, monkeypatc if arg is not None: should_be_pruned = arg - lt = lib.library_tool() sym = f"test_prune_previous_defragment_symbol_data" df_0 = pd.DataFrame({"col": np.arange(10)}, index=pd.date_range("2024-01-01", periods=10)) lib.write(sym, df_0) df_1 = pd.DataFrame({"col": np.arange(10)}, index=pd.date_range("2024-01-11", periods=10)) lib.append(sym, df_1, prune_previous_version=arg) - assert len(lt.find_keys(KeyType.TABLE_INDEX)) == 1 if should_be_pruned else 2 + live = [v for v in lib.list_versions(sym) if not v["deleted"]] + assert len(live) == (1 if should_be_pruned else 2) set_config_int("SymbolDataCompact.SegmentCount", 1) lib.defragment_symbol_data(sym, prune_previous_version=arg) - assert len(lt.find_keys(KeyType.TABLE_INDEX)) == 1 if should_be_pruned else 3 + live = [v for v in lib.list_versions(sym) if not v["deleted"]] + assert len(live) == (1 if should_be_pruned else 3) @pytest.mark.parametrize("index_start", range(9)) diff --git a/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py b/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py index 4452e8b1b11..c287beed74f 100644 --- a/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py +++ b/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py @@ -2,6 +2,8 @@ import pytest from multiprocessing import Process, Queue +from arcticdb.util.test import config_context + @pytest.fixture def writer_store(lmdb_version_store_delayed_deletes_v2): @@ -13,6 +15,16 @@ def reader_store(lmdb_version_store_delayed_deletes_v2): return lmdb_version_store_delayed_deletes_v2 +@pytest.fixture +def eager_writer_store(lmdb_version_store_v2): + return lmdb_version_store_v2 + + +@pytest.fixture +def eager_reader_store(lmdb_version_store_v2): + return lmdb_version_store_v2 + + def read_repeatedly(version_store, queue: Queue): while True: try: @@ -47,3 +59,29 @@ def test_concurrent_read_write(writer_store, reader_store): reader.terminate() assert exceptions_in_reader.empty() + + +def test_concurrent_read_write_eager_prune(eager_writer_store, eager_reader_store): + """Without delayed deletes, prune_previous physically deletes superseded versions. The + PrunePreviousProtectionSecs window still keeps recently superseded versions in storage long + enough that a concurrent reader, which may have resolved a version just before it was pruned, + can always read its data before it is reclaimed. + + The window must be enabled before the child processes are forked so they inherit it (the test + session otherwise disables it globally via the _disable_prune_protection_window fixture).""" + with config_context("VersionStore.PrunePreviousProtectionSecs", 600): + eager_writer_store.write("sym", [1, 2, 3], prune_previous_version=True) + exceptions_in_reader = Queue() + reader = Process(target=read_repeatedly, args=(eager_reader_store, exceptions_in_reader)) + writer = Process(target=write_repeatedly, args=(eager_writer_store,)) + + try: + reader.start() + writer.start() + reader.join(5) + writer.join(0.001) + finally: + writer.terminate() + reader.terminate() + + assert exceptions_in_reader.empty() diff --git a/python/tests/unit/arcticdb/test_column_stats_creation.py b/python/tests/unit/arcticdb/test_column_stats_creation.py index 39cb85a7135..d4439eaf794 100644 --- a/python/tests/unit/arcticdb/test_column_stats_creation.py +++ b/python/tests/unit/arcticdb/test_column_stats_creation.py @@ -828,7 +828,9 @@ def test_prune_previous_kwarg(): create_stats() assert_column_stats_key_count() getattr(lib, operation)(sym, df1, prune_previous_version=True) - expected_count = 0 + # v0 is the anchor of this prune (newest pre-existing) so its index key stays + # in storage; its column stats key therefore survives. + expected_count = 1 assert_column_stats_key_count() clear() @@ -839,7 +841,9 @@ def test_prune_previous_kwarg_batch_methods(): create_stats() assert_column_stats_key_count() getattr(lib, operation)([sym], [df1], prune_previous_version=True) - expected_count = 0 + # v0 is the anchor of this prune (newest pre-existing) so its index key stays + # in storage; its column stats key therefore survives. + expected_count = 1 assert_column_stats_key_count() clear() @@ -852,6 +856,8 @@ def test_prune_previous_api(): create_stats() assert_column_stats_key_count() lib.prune_previous_versions(sym) + # The admin prune_previous_versions API prunes aggressively (no anchor/window retention): + # only the latest version survives, so v0's index key — and its column stats key — is deleted. expected_count = 1 assert_column_stats_key_count() diff --git a/python/tests/unit/arcticdb/version_store/test_compact_data.py b/python/tests/unit/arcticdb/version_store/test_compact_data.py index 97632764d3f..c3c4c2bfad4 100644 --- a/python/tests/unit/arcticdb/version_store/test_compact_data.py +++ b/python/tests/unit/arcticdb/version_store/test_compact_data.py @@ -624,7 +624,10 @@ def test_compact_data_dynamic_schema_changing_types_three_slices(in_memory_store lib.append(sym, df2) generic_compact_data_test(lib, sym) lib_tool = lib.library_tool() - data_keys = lib_tool.find_keys_for_id(KeyType.TABLE_DATA, sym) + # The pre-compaction version is retained as the prune anchor and append-inherits the original + # (mixed-type) segments, so filter to the compacted version's own data segments. + compacted_version = lib.read(sym).version + data_keys = [k for k in lib_tool.find_keys_for_id(KeyType.TABLE_DATA, sym) if k.version_id == compacted_version] assert len(data_keys) == 2 # These aren't in any particular order types_0 = [field.type for field in lib_tool.read_descriptor(data_keys[0]).fields()] diff --git a/python/tests/unit/arcticdb/version_store/test_deletion.py b/python/tests/unit/arcticdb/version_store/test_deletion.py index aaaa02ce693..bbe905f796e 100644 --- a/python/tests/unit/arcticdb/version_store/test_deletion.py +++ b/python/tests/unit/arcticdb/version_store/test_deletion.py @@ -176,7 +176,9 @@ def test_tombstones_deleted_data_keys_prune(lmdb_version_store_prune_previous, s lib.write(sym, 3) lib_tool = lib.library_tool() data_keys = lib_tool.find_keys_for_id(KeyType.TABLE_DATA, sym) - assert len(data_keys) == 1 + # V0 is the anchor of V1's prune so its data stays in storage; V2's prune then displaces V0 + # (aging sweep) and makes V1 the new anchor — so V1 + V2 data keys remain. + assert len(data_keys) == 2 lib.delete(sym) data_keys = lib_tool.find_keys_for_id(KeyType.TABLE_DATA, sym) diff --git a/python/tests/unit/arcticdb/version_store/test_prune_previous.py b/python/tests/unit/arcticdb/version_store/test_prune_previous.py new file mode 100644 index 00000000000..7e656be983c --- /dev/null +++ b/python/tests/unit/arcticdb/version_store/test_prune_previous.py @@ -0,0 +1,308 @@ +""" +Copyright 2026 Man Group Operations Limited + +Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + +As of the Change Date specified in that file, in accordance with the Business Source License, use of this software +will be governed by the Apache License, version 2.0. +""" + +import pandas as pd +import pytest + +from arcticdb_ext.storage import KeyType +from arcticdb.util.test import assert_frame_equal, config_context + + +@pytest.fixture +def _with_protection_window(): + """Enable the 600-second protection window for a single test, then restore the prior value. + + The session-scoped conftest fixture sets PrunePreviousProtectionSecs=0 once at session start. + This fixture temporarily overrides it to 600 s for tests that exercise window behaviour and + restores the previous value in a finally block so subsequent tests are not affected even if + the test raises. + """ + with config_context("VersionStore.PrunePreviousProtectionSecs", 600): + yield + + +def test_prune_previous_preserves_recent_data_in_storage(version_store_factory, _with_protection_window): + """Writing with prune_previous_version=True must not physically delete data younger than the window. + + Regression test for a race condition: if two writers concurrently append to the same base version, + Writer 1's prune must not delete the base version's data segments before Writer 2 has finished + referencing them. Under the new policy the pre-existing version is tombstoned immediately, but + its data remains in storage until it has aged past the protection window. + """ + lib = version_store_factory() + sym = "test_sym" + + df0 = pd.DataFrame({"x": range(5)}) + df1 = pd.DataFrame({"x": range(5, 10)}) + + lib.write(sym, df0) + # V0 is freshly written (< 10 minutes old). Writing V1 with prune must tombstone V0 but + # keep its data in storage so concurrent writers based on V0 can still finish. + lib.write(sym, df1, prune_previous_version=True) + + # V0 is tombstoned — only V1 is visible. + live_versions = [v for v in lib.list_versions(sym) if not v["deleted"]] + assert len(live_versions) == 1, f"Expected only V1 to be visible (V0 tombstoned), got {live_versions}" + + # But V0's TABLE_DATA segments are still in storage (protected by the window). + lib_tool = lib.library_tool() + data_keys = lib_tool.find_keys(KeyType.TABLE_DATA) + assert len(data_keys) == 2, f"Expected V0 and V1 data segments to remain in storage, got {data_keys}" + + +def test_prune_previous_prunes_sole_preexisting_version(version_store_factory): + """With protection=0 a sole pre-existing version is still tombstoned (no anchor visibility).""" + # conftest sets PrunePreviousProtectionSecs=0 + lib = version_store_factory() + sym = "test_sym" + + df0 = pd.DataFrame({"x": [1, 2, 3]}) + df1 = pd.DataFrame({"x": [4, 5, 6]}) + + lib.write(sym, df0) + lib.write(sym, df1, prune_previous_version=True) + + live_versions = [v for v in lib.list_versions(sym) if not v["deleted"]] + assert len(live_versions) == 1, f"V0 must be tombstoned (no anchor visibility), got {live_versions}" + + +def test_prune_previous_prunes_when_old_enough(version_store_factory): + """With protection=0 every prune tombstones every pre-existing version. + + After N sequential prune writes, exactly 1 live version survives (the latest). + """ + # conftest already sets PrunePreviousProtectionSecs=0 + lib = version_store_factory() + sym = "test_sym" + + lib.write(sym, pd.DataFrame({"x": range(5)})) # V0 + lib.write(sym, pd.DataFrame({"x": range(5, 10)}), prune_previous_version=True) # V1, V0 tombstoned + lib.write(sym, pd.DataFrame({"x": range(10, 15)}), prune_previous_version=True) # V2, V1 tombstoned + lib.write(sym, pd.DataFrame({"x": range(15, 20)}), prune_previous_version=True) # V3, V2 tombstoned + + live_versions = [v for v in lib.list_versions(sym) if not v["deleted"]] + assert len(live_versions) == 1, f"Only the latest version should remain visible, got {live_versions}" + + +def test_prune_previous_data_remains_in_storage_during_window(version_store_factory): + """With a 600 s window, pre-existing data stays in storage even though the version is tombstoned. + + Then once the window is set back to 0 and another prune fires, the previously-held data is + cleaned up by the aging-tombstones sweep. + """ + lib = version_store_factory() + sym = "test_sym" + lib_tool = lib.library_tool() + + df0 = pd.DataFrame({"x": range(5)}) + df1 = pd.DataFrame({"x": range(5, 10)}) + df2 = pd.DataFrame({"x": range(10, 15)}) + + lib.write(sym, df0) + # With window=600 s, V0 is tombstoned but its data stays in storage. + with config_context("VersionStore.PrunePreviousProtectionSecs", 600): + lib.write(sym, df1, prune_previous_version=True) + + live_versions = [v for v in lib.list_versions(sym) if not v["deleted"]] + assert len(live_versions) == 1, f"Only V1 should be visible, got {live_versions}" + # Both V0 and V1 data segments are still in storage (window protects V0, V1 is live). + data_keys_during_window = lib_tool.find_keys(KeyType.TABLE_DATA) + assert ( + len(data_keys_during_window) == 2 + ), f"Expected V0 and V1 data segments during window, got {data_keys_during_window}" + + # Now set the window to 0 and prune again — the aging-tombstones sweep should pick up V0's + # tombstoned-but-not-deleted data. (V1 is the new anchor and stays in storage.) + lib.write(sym, df2, prune_previous_version=True) + + data_keys_after_sweep = lib_tool.find_keys(KeyType.TABLE_DATA) + # V1 (new anchor, tombstoned) + V2 (current latest). V0 should have been swept. + assert len(data_keys_after_sweep) == 2, ( + f"Expected V0 data segments to be swept; only V1 (anchor) and V2 (latest) should remain, " + f"got {data_keys_after_sweep}" + ) + + +def test_prune_previous_burst_within_window_all_cleaned_in_one_sweep(version_store_factory): + """A burst of versions written within the window must all be reclaimed in a single later sweep. + + Regression test for the bounded aging sweep: writing N versions inside the protection window + retains all of them (each is tombstoned but kept). When they later age out of the window + *together*, the next prune must collect the whole still-present block in one sweep — not just + the newest aged version, which would leak the rest (and, because the bounded walk stops at the + first already-cleaned-up key, leak them permanently). The sweep only stops once it reaches a + key that has actually been physically deleted, so an un-deleted burst is fully cleaned. + """ + lib = version_store_factory() + sym = "test_sym" + lib_tool = lib.library_tool() + + # Distinct data per version so there is no dedup/append sharing: each write owns one segment. + dfs = [pd.DataFrame({"x": range(i * 5, i * 5 + 5)}) for i in range(6)] + + # Burst of 5 prune-writes inside the window -> V0..V3 tombstoned-but-kept, V4 live. + with config_context("VersionStore.PrunePreviousProtectionSecs", 600): + for i in range(5): + lib.write(sym, dfs[i], prune_previous_version=(i > 0)) + + assert ( + len(lib_tool.find_keys(KeyType.TABLE_DATA)) == 5 + ), "All 5 versions' data should be retained while within the window" + + # The burst has now aged out (session default window=0). A single prune must reclaim the whole + # aged block, leaving only the new anchor (V4) and the latest (V5). + lib.write(sym, dfs[5], prune_previous_version=True) + data_keys = lib_tool.find_keys(KeyType.TABLE_DATA) + assert len(data_keys) == 2, ( + f"The whole aged-out burst must be reclaimed in one sweep (anchor V4 + latest V5 remain); " f"got {data_keys}" + ) + + # Steady state: a subsequent prune stays bounded and does not re-leak (V5 anchor + V6 latest). + lib.write(sym, dfs[0], prune_previous_version=True) + assert len(lib_tool.find_keys(KeyType.TABLE_DATA)) == 2, "Subsequent prunes must stay bounded at anchor + latest" + + +def test_prune_previous_anchor_protects_long_stable_head(version_store_factory, _with_protection_window): + """The anchor rule keeps the newest pre-existing version in storage even if it is older than the window. + + Scenario: V0 was written long ago and is the head. A concurrent writer is mid-append on V0 + when another writer prunes. V0 is older than protection_secs (stale head), but the anchor + rule retains V0's data in storage so the concurrent writer can still complete. + """ + lib = version_store_factory() + sym = "test_sym" + lib_tool = lib.library_tool() + + df0 = pd.DataFrame({"x": range(5)}) + df1 = pd.DataFrame({"x": range(5, 10)}) + + lib.write(sym, df0) + # Force V0 to look old to the prune path: set a 0-second window for this single call so + # the time guard would otherwise cause physical deletion; only the anchor saves it. + with config_context("VersionStore.PrunePreviousProtectionSecs", 0): + lib.write(sym, df1, prune_previous_version=True) + + # V0 is tombstoned but its data is retained as the anchor (it is the max version_id in + # pruned_indexes at the moment of the prune call). + live_versions = [v for v in lib.list_versions(sym) if not v["deleted"]] + assert len(live_versions) == 1, f"Only V1 should be visible, got {live_versions}" + data_keys = lib_tool.find_keys(KeyType.TABLE_DATA) + assert len(data_keys) == 2, f"Anchor rule must keep V0's data even with the time window at 0; got {data_keys}" + + +def test_prune_previous_anchor_append_chain_keeps_inherited_data(version_store_factory): + """The anchor's data must be preserved when its inherited segments are in aging_tombstones. + + Chain scenario: + V0 = write(df0) -> data {A} + V1 = append(df1, prune_previous_version=True) -> V0 anchor-kept; V1 refs {A, B} + V2 = write(df_unrelated, prune_previous_version=True) -> V1 anchor-kept; V0 in aging. + + With PrunePreviousProtectionSecs=0 (session fixture default), V0 is outside the window and + is not the anchor of V2's prune, so it is eligible for sweep. But V1's TABLE_INDEX (the new + anchor, retained in storage) references V0's data {A} via append-inheritance. The sweep + must not delete {A}, or V1's index dangles. + """ + lib = version_store_factory() + sym = "test_sym" + lib_tool = lib.library_tool() + + df0 = pd.DataFrame({"x": range(5)}, index=pd.date_range("2000-01-01", periods=5)) + df1 = pd.DataFrame({"x": range(5, 10)}, index=pd.date_range("2000-01-06", periods=5)) + # Different column name and date range so there is no append-inheritance or dedup overlap + # with V0 or V1: V2's recurse-set does not cover V0's data. + df_unrelated = pd.DataFrame({"y": [1, 2]}, index=pd.date_range("2050-01-01", periods=2)) + + lib.write(sym, df0) + lib.append(sym, df1, prune_previous_version=True) + lib.write(sym, df_unrelated, prune_previous_version=True) + + v1_index_keys = [k for k in lib_tool.find_keys_for_id(KeyType.TABLE_INDEX, sym) if k.version_id == 1] + assert len(v1_index_keys) == 1, f"V1's index key must be retained as anchor, got {v1_index_keys}" + + v1_referenced_data = [k for k in lib_tool.read_to_keys(v1_index_keys[0]) if k.type == KeyType.TABLE_DATA] + data_in_storage = lib_tool.find_keys_for_id(KeyType.TABLE_DATA, sym) + missing = [k for k in v1_referenced_data if k not in data_in_storage] + assert not missing, ( + f"V1 (anchor) references data keys that were deleted by V2's aging sweep; V1 is now " + f"dangling. Missing keys: {missing}. Present in storage: {data_in_storage}." + ) + + +def test_prune_previous_anchor_dedup_chain_keeps_shared_data(version_store_factory): + """Same hazard as the append chain, but via dedup instead of append-inheritance. + + V0 = write(df_a) -> data {A} + V1 = write(df_a, prune=True) [dedups vs V0] -> V0 anchor; V1 refs {A} + V2 = write(df_b, prune=True) [no dedup match] -> V1 anchor; V0 in aging. + + V1 deduped against V0, so V1's TABLE_INDEX references V0's data atom keys. When V0 ages + out, its data must be preserved because V1 still holds references. + """ + lib = version_store_factory(de_duplication=True) + sym = "test_sym" + lib_tool = lib.library_tool() + + df_a = pd.DataFrame({"x": range(20)}, index=pd.date_range("2000-01-01", periods=20)) + df_b = pd.DataFrame({"y": [1, 2]}, index=pd.date_range("2050-01-01", periods=2)) + + lib.write(sym, df_a) + lib.write(sym, df_a, prune_previous_version=True) # full dedup against V0 + lib.write(sym, df_b, prune_previous_version=True) # no dedup with V1; V0 ages out + + v1_index_keys = [k for k in lib_tool.find_keys_for_id(KeyType.TABLE_INDEX, sym) if k.version_id == 1] + assert len(v1_index_keys) == 1, f"V1's index key must be retained as anchor, got {v1_index_keys}" + + v1_referenced_data = [k for k in lib_tool.read_to_keys(v1_index_keys[0]) if k.type == KeyType.TABLE_DATA] + data_in_storage = lib_tool.find_keys_for_id(KeyType.TABLE_DATA, sym) + missing = [k for k in v1_referenced_data if k not in data_in_storage] + assert not missing, ( + f"V1 (anchor, dedup'd against V0) references data keys that were deleted when V0 was " + f"swept. Missing: {missing}. Present in storage: {data_in_storage}." + ) + + +def test_delete_all_reclaims_prune_retained_version(version_store_factory): + """delete_all_versions must reclaim a version that prune retained (anchor) but did not delete. + + With protection=0 (session default) V0 is tombstoned by V1's prune but kept in storage as the + anchor. Deleting the whole symbol must clean up V0's data too — otherwise it leaks forever. + """ + lib = version_store_factory() + sym = "test_sym" + lib_tool = lib.library_tool() + + lib.write(sym, pd.DataFrame({"x": range(5)})) # V0 + lib.write(sym, pd.DataFrame({"x": range(5, 10)}), prune_previous_version=True) # V1; V0 retained anchor + assert len(lib_tool.find_keys(KeyType.TABLE_DATA)) == 2, "V0 (anchor) + V1 data should be present" + + lib.delete(sym) + assert ( + len(lib_tool.find_keys(KeyType.TABLE_DATA)) == 0 + ), "delete_all must reclaim the prune-retained anchor's data, not leak it" + + +def test_delete_all_after_prune_preserves_snapshotted_version(version_store_factory): + """delete_all_versions must NOT delete a prune-retained version's data if a snapshot pins it. + + Safety counterpart to the reclaim test: when delete_all collects the retained-but-tombstoned + keys, the snapshot pre-delete check must still protect any that a snapshot references. + """ + lib = version_store_factory() + sym = "test_sym" + df0 = pd.DataFrame({"x": range(5)}) + + lib.write(sym, df0) # V0 + lib.snapshot("snap") # snapshot pins V0 + lib.write(sym, pd.DataFrame({"x": range(5, 10)}), prune_previous_version=True) # V1; V0 retained + snapshotted + + lib.delete(sym) + + # V0's data must survive (the snapshot still references it), so the snapshot read works. + assert_frame_equal(lib.read(sym, as_of="snap").data, df0) diff --git a/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py b/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py index ffe55218090..8c008f45981 100644 --- a/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py +++ b/python/tests/unit/arcticdb/version_store/test_recursive_normalizers.py @@ -895,13 +895,16 @@ def test_data_layout(lmdb_version_store_v1, all_recursive_metastructure_versions with pytest.raises(NoSuchVersionException): lib.read("sym__k") - # Check that keys are cleaned up when we prune + # Check that keys are cleaned up when we prune. + # v1 is the anchor of this prune (newest pre-existing) so its keys stay in storage; + # v0 is tombstoned and its keys are physically deleted. lib.write("sym", data, recursive_normalizers=True, prune_previous_version=True) assert len(lt.find_keys(KeyType.VERSION_REF)) == 1 + # v0 write, v1 write, and the v2 prune-write: the prune folds mark+sweep into one journal entry. assert len(lt.find_keys(KeyType.VERSION)) == 3 - assert len(lt.find_keys(KeyType.MULTI_KEY)) == 1 - assert len(lt.find_keys(KeyType.TABLE_INDEX)) == 3 - assert len(lt.find_keys(KeyType.TABLE_DATA)) == 3 + assert len(lt.find_keys(KeyType.MULTI_KEY)) == 2 # v1 (anchor) + v2 + assert len(lt.find_keys(KeyType.TABLE_INDEX)) == 6 # 3 each for v1 (anchor) and v2 + assert len(lt.find_keys(KeyType.TABLE_DATA)) == 6 # 3 each for v1 (anchor) and v2 # Check that keys are cleaned up when we delete lib.delete("sym") diff --git a/python/tests/unit/arcticdb/version_store/test_stage.py b/python/tests/unit/arcticdb/version_store/test_stage.py index 4e046e274f5..00df8b802ad 100644 --- a/python/tests/unit/arcticdb/version_store/test_stage.py +++ b/python/tests/unit/arcticdb/version_store/test_stage.py @@ -457,8 +457,7 @@ def test_finalize_with_tokens_and_prune_previous( with pytest.raises(NoSuchVersionException): lib.read(sym, as_of=0) else: - res = lib.read(sym, as_of=0) - assert_frame_equal(res.data, df_1) + assert_frame_equal(lib.read(sym, as_of=0).data, df_1) @pytest.mark.parametrize("validate_index", (True, False))