Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 98 additions & 30 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,23 +147,76 @@ std::vector<std::variant<ResultValueType, DataError>> transform_batch_items_or_t
}

folly::Future<folly::Unit> LocalVersionedEngine::delete_unreferenced_pruned_indexes(
std::vector<AtomKey>&& pruned_indexes, const AtomKey& key_to_keep
const std::shared_ptr<VersionMapEntry>& entry, VersionId anchor_version
) {
try {
if (!pruned_indexes.empty() && !cfg().write_options().delayed_deletes()) {
// TODO: the following function will load all snapshots, which will be horrifyingly inefficient when called
// multiple times from batch_*
auto [not_in_snaps, in_snaps] = get_index_keys_partitioned_by_inclusion_in_snapshots(
store(), pruned_indexes.begin()->id(), std::move(pruned_indexes)
);
in_snaps.insert(key_to_keep);
PreDeleteChecks checks{false, false, false, false, std::move(in_snaps)};
return delete_trees_responsibly(store(), version_map(), not_in_snaps, {}, {}, checks)
.thenValueInline([](auto&&) { return folly::Unit{}; }) // drop the DeleteTreesStats
.thenError(folly::tag_t<std::exception>{}, [](auto const& ex) {
log::version().warn("Failed to clean up pruned previous versions due to: {}", ex.what());
});
// Phase 2 of prune (sweep). write_and_prune_previous has individually tombstoned the pruned
// versions, leaving them present-but-invisible above the TOMBSTONE_ALL line, and handed us the
// loaded entry (so no extra storage round-trip is needed). Collect that above-line tombstoned
// set, then:
// 1. Retain the anchor (the just-superseded previous head, passed in as anchor_version so a
// concurrent appender on it does not lose its base) and any version still inside the
// protection window. The anchor is the caller's previous head rather than max(tombstoned)
// because an earlier delete_version of the head can leave a higher tombstoned version
// that was never anyone's append base.
// 2. Physically delete the rest, except snapshotted keys; protect the retained keys'
// shared data (append-inheritance / dedup) via could_share_data.
// 3. Advance the TOMBSTONE_ALL line past the now-removed contiguous bottom block so a later
// prune does not revisit it — this is what keeps per-prune work bounded.
// Under delayed_deletes the physical delete is skipped (the background tool reclaims via its
// own reference check), but the line is still advanced so the version chain stays bounded.
const StreamId stream_id = entry->head_->id();
auto tombstoned = entry->get_tombstoned_indexes();
if (tombstoned.empty())
return folly::Unit();

const auto protection_secs = ConfigsMap::instance()->get_int("VersionStore.PrunePreviousProtectionSecs", 600);
const timestamp cutoff =
store()->current_timestamp() - static_cast<timestamp>(protection_secs) * 1'000'000'000LL;

std::vector<AtomKey> to_delete;
std::vector<AtomKey> kept;
for (auto& k : tombstoned) {
if (k.version_id() == anchor_version || (protection_secs > 0 && k.creation_ts() >= cutoff))
kept.emplace_back(std::move(k));
else
to_delete.emplace_back(std::move(k));
}
// The anchor should always be present (the caller just tombstoned it), but guard against a
// racing reload that lost it: without a retained key we cannot place the line safely.
if (to_delete.empty() || kept.empty())
return folly::Unit();

const VersionId line = std::min_element(
kept.begin(),
kept.end(),
[](const auto& a, const auto& b) { return a.version_id() < b.version_id(); }
)->version_id() -
1;

if (cfg().write_options().delayed_deletes()) {
version_map()->advance_tombstone_all(store(), entry, line);
return folly::Unit();
}

// TODO: the following function will load all snapshots, which will be horrifyingly inefficient when called
// multiple times from batch_*
auto [not_in_snaps, in_snaps] =
get_index_keys_partitioned_by_inclusion_in_snapshots(store(), stream_id, std::move(to_delete));
for (auto& k : kept)
in_snaps.insert(std::move(k));
PreDeleteChecks checks{false, false, false, false, std::move(in_snaps)};
// Return the chain rather than blocking with .get(): this runs inside a threadpool task, and
// waiting on threadpool work from within it can deadlock. Only advance the line once the
// delete succeeds — on failure the keys stay above the line and a later prune retries them.
return delete_trees_responsibly(store(), version_map(), not_in_snaps, {}, {}, checks)
.thenValue([this, entry, line](auto&&) {
version_map()->advance_tombstone_all(store(), entry, line);
return folly::Unit{};
})
.thenError(folly::tag_t<std::exception>{}, [](auto const& ex) {
log::version().warn("Failed to clean up pruned previous versions due to: {}", ex.what());
});
} catch (const std::exception& ex) {
// Best-effort so deliberately swallow
log::version().warn("Failed to clean up pruned previous versions due to: {}", ex.what());
Expand Down Expand Up @@ -341,15 +394,27 @@ std::optional<VersionedItem> LocalVersionedEngine::get_specific_version(
std::optional<VersionedItem> LocalVersionedEngine::get_version_at_time(
const StreamId& stream_id, timestamp as_of, const VersionQuery& version_query
) {

auto index_key = load_index_key_from_time(store(), version_map(), stream_id, as_of);
if (!index_key && std::get<TimestampVersionQuery>(version_query.content_).iterate_snapshots_if_tombstoned) {
auto index_keys = get_index_keys_in_snapshots(store(), stream_id);
auto vector_index_keys = std::vector<AtomKey>(index_keys.begin(), index_keys.end());
std::sort(std::begin(vector_index_keys), std::end(vector_index_keys), [](auto& k1, auto& k2) {
return k1.creation_ts() > k2.creation_ts();
});
index_key = get_index_key_from_time(as_of, vector_index_keys);
LoadStrategy load_strategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, as_of};
auto entry = version_map()->check_reload(store(), stream_id, load_strategy, __FUNCTION__);

auto live_key = get_index_key_from_time(as_of, entry->get_indexes(false));
auto index_key = live_key;

if (std::get<TimestampVersionQuery>(version_query.content_).iterate_snapshots_if_tombstoned &&
(!entry->tombstones_.empty() || entry->tombstone_all_.has_value())) {
// A deleted version may be more recent than the live one at this timestamp.
// get_indexes(true) reuses the already-loaded entry — no extra I/O.
auto best_key = get_index_key_from_time(as_of, entry->get_indexes(true));
if (!live_key || (best_key && best_key->version_id() > live_key->version_id())) {
auto index_keys = get_index_keys_in_snapshots(store(), stream_id);
auto vector_index_keys = std::vector<AtomKey>(index_keys.begin(), index_keys.end());
std::sort(std::begin(vector_index_keys), std::end(vector_index_keys), [](auto& k1, auto& k2) {
return k1.creation_ts() > k2.creation_ts();
});
auto snap_key = get_index_key_from_time(as_of, vector_index_keys);
if (snap_key && (!live_key || snap_key->version_id() > live_key->version_id()))
index_key = snap_key;
}
}

if (!index_key) {
Expand Down Expand Up @@ -1654,8 +1719,9 @@ void LocalVersionedEngine::write_version_and_prune_previous(
bool prune_previous_versions, const AtomKey& new_version, const std::optional<IndexTypeKey>& previous_key
) {
if (prune_previous_versions) {
auto pruned_indexes = version_map()->write_and_prune_previous(store(), new_version, previous_key);
delete_unreferenced_pruned_indexes(std::move(pruned_indexes), new_version).get();
auto entry = version_map()->write_and_prune_previous(store(), new_version);
if (previous_key)
delete_unreferenced_pruned_indexes(entry, previous_key->version_id()).get();
} else {
version_map()->write_version(store(), new_version, previous_key);
}
Expand All @@ -1669,15 +1735,17 @@ folly::Future<VersionedItem> LocalVersionedEngine::write_index_key_to_version_ma
folly::Future<folly::Unit> write_version_fut;

if (prune_previous_versions) {
const auto anchor_version = stream_update_info.previous_index_key_
? std::make_optional(stream_update_info.previous_index_key_->version_id())
: std::nullopt;
write_version_fut =
async::submit_io_task(
WriteAndPrunePreviousTask{
store(), version_map, index_key, std::move(stream_update_info.previous_index_key_)
}
WriteAndPrunePreviousTask{store(), version_map, index_key}
)
.via(&async::cpu_executor())
.thenValue([this, index_key](auto&& atom_key_vec) {
return delete_unreferenced_pruned_indexes(std::move(atom_key_vec), index_key);
.thenValue([this, anchor_version](std::shared_ptr<VersionMapEntry>&& entry) {
return anchor_version ? delete_unreferenced_pruned_indexes(entry, *anchor_version)
: folly::makeFuture(folly::Unit{});
});
} else {
write_version_fut = async::submit_io_task(
Expand Down
14 changes: 10 additions & 4 deletions cpp/arcticdb/version/local_versioned_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,13 +438,19 @@ class LocalVersionedEngine : public VersionedEngine {
) override;

/**
* Take tombstoned indexes that have been pruned in the version map and perform the actual deletion
* for indexes that are safe to delete (eg indexes contained in a snapshot are skipped).
* Phase 2 of prune (sweep). write_and_prune_previous has individually tombstoned the pruned
* versions above the TOMBSTONE_ALL line. This collects that above-line tombstoned set, retains
* the anchor + within-window versions, physically deletes the rest (unless delayed_deletes or
* snapshotted), and advances the TOMBSTONE_ALL line past the removed block to keep the chain
* bounded. Best-effort; failures are logged and swallowed.
*
* @param pruned_indexes Must all share the same id() and should be tombstoned.
* @param entry The loaded+mutated version-map entry from the mark phase, reused here to avoid a
* second chain read.
* @param anchor_version The just-superseded previous head (the caller's previous_key). Always
* retained regardless of age so a concurrent appender on it keeps its base.
*/
folly::Future<folly::Unit> delete_unreferenced_pruned_indexes(
std::vector<AtomKey>&& pruned_indexes, const AtomKey& key_to_keep
const std::shared_ptr<VersionMapEntry>& entry, VersionId anchor_version
);

std::shared_ptr<Store>& store() override { return store_; }
Expand Down
52 changes: 47 additions & 5 deletions cpp/arcticdb/version/test/test_version_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -918,9 +918,12 @@ TEST(VersionMap, FollowingVersionChainWithWriteAndPrunePrevious) {
version_map->do_write(store, key, entry);
write_symbol_ref(store, key, std::nullopt, entry->head_.value());

// write a new version and prune the previous versions
// Phase 1 (mark): individually tombstone v0, v1, v2; only v3 stays visible. No TOMBSTONE_ALL yet.
auto key2 = atom_key_with_version(id, 3, 3);
version_map->write_and_prune_previous(store, key2, key);
auto pruned_entry = version_map->write_and_prune_previous(store, key2);
// Phase 2 (sweep): the engine would delete the aged-out block and advance the line. Simulate it
// retaining v2 as the anchor and advancing the line to v1.
version_map->advance_tombstone_all(store, pruned_entry, VersionId{1});

auto ref_entry = VersionMapEntry{};
read_symbol_ref(store, id, ref_entry);
Expand All @@ -942,9 +945,10 @@ TEST(VersionMap, FollowingVersionChainWithWriteAndPrunePrevious) {
}) {
follow_result->clear();
version_map->follow_version_chain(store, ref_entry, follow_result, load_strategy);
// When loading with any of the specified load strategies with include_deleted=false we should end following the
// version chain early at version 2 because that's when we encounter the TOMBSTONE_ALL.
ASSERT_EQ(follow_result->load_progress_.oldest_loaded_index_version_, VersionId{2});
// With include_deleted=false the load stops at the TOMBSTONE_ALL high-water mark. The anchor
// v2 is individually tombstoned above the line at v1, so the load traverses v3, v2, then
// stops at v1 (the line).
ASSERT_EQ(follow_result->load_progress_.oldest_loaded_index_version_, VersionId{1});
}

for (auto load_strategy :
Expand All @@ -959,6 +963,44 @@ TEST(VersionMap, FollowingVersionChainWithWriteAndPrunePrevious) {
}
}

TEST(VersionMap, PrunePreviousTombstonesAllPreExisting) {
    // Checks what write_and_prune_previous leaves behind for a two-version chain: the older
    // version (V0) must drop out of the live listing, yet remain individually tombstoned and
    // discoverable through an INCLUDE_DELETED load, and no TOMBSTONE_ALL may have been written.
    PilotedClock::reset();
    auto store = std::make_shared<InMemoryStore>();
    auto version_map = std::make_shared<VersionMap>();
    StreamId id{"test_concurrent_prune"};

    // Shared predicate: does this index key belong to version 0?
    const auto is_version_zero = [](const AtomKey& k) { return k.version_id() == 0; };

    // Seed the chain with V0 and point the symbol ref at it.
    auto first_key = atom_key_with_version(id, 0, PilotedClock::nanos_since_epoch());
    LoadStrategy initial_strategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED};
    auto seed_entry = version_map->check_reload(store, id, initial_strategy, __FUNCTION__);
    version_map->do_write(store, first_key, seed_entry);
    write_symbol_ref(store, first_key, std::nullopt, seed_entry->head_.value());

    // Write V1 with pruning of everything that came before it.
    auto second_key = atom_key_with_version(id, 1, PilotedClock::nanos_since_epoch());
    version_map->write_and_prune_previous(store, second_key);

    // V0 must not appear among the live versions any more.
    auto live_versions = get_all_versions(store, version_map, id);
    auto v0_alive = std::any_of(live_versions.begin(), live_versions.end(), is_version_zero);
    EXPECT_FALSE(v0_alive) << "V0 should be tombstoned in the chain after write_and_prune_previous";

    // A full INCLUDE_DELETED reload must still surface V0 as an individually tombstoned index,
    // with no TOMBSTONE_ALL recorded on the entry.
    LoadStrategy full_strategy{LoadType::ALL, LoadObjective::INCLUDE_DELETED};
    auto reloaded = version_map->check_reload(store, id, full_strategy, __FUNCTION__);
    EXPECT_TRUE(reloaded->is_tombstoned(VersionId{0})) << "V0 must be individually tombstoned by the mark phase";
    EXPECT_FALSE(reloaded->tombstone_all_.has_value()) << "the mark phase must not write a TOMBSTONE_ALL";

    auto tombstoned_keys = reloaded->get_tombstoned_indexes();
    auto v0_present = std::any_of(tombstoned_keys.begin(), tombstoned_keys.end(), is_version_zero);
    EXPECT_TRUE(v0_present) << "V0 must still be present for the engine sweep to find and delete";
}

TEST(VersionMap, HasCachedEntry) {
ScopedConfig sc("VersionMap.ReloadInterval", std::numeric_limits<int64_t>::max());
// Set up the version chain v0 <- v1(tombstone_all) <- v2 <- v3(tombstoned)
Expand Down
15 changes: 11 additions & 4 deletions cpp/arcticdb/version/test/test_version_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,19 @@ TEST(PythonVersionStore, WriteWithPruneVersions) {
using namespace arcticdb::stream;
using namespace arcticdb::pipelines;

// Disable the protection window so pruning is not blocked by PilotedClock's tiny timestamps.
ScopedConfig no_protection("VersionStore.PrunePreviousProtectionSecs", 0);

auto [version_store, mock_store] = python_version_store_in_memory();

write_version_frame({"test_versioned_engine_delete"}, 0, version_store, 30, true);
write_version_frame({"test_versioned_engine_delete"}, 1, version_store, 30, true, 0, std::nullopt, true);
// Should have pruned the previous version and have just one version
ASSERT_EQ(mock_store->num_atom_keys_of_type(KeyType::TABLE_INDEX), 1);
auto k0 = write_version_frame({"test_versioned_engine_delete"}, 0, version_store, 30, true);
auto k1 = write_version_frame({"test_versioned_engine_delete"}, 1, version_store, 30, true, 0, k0, true);
// After two writes: v0 is the just-superseded head, so it is retained as the anchor — nothing pruned yet.
ASSERT_EQ(mock_store->num_atom_keys_of_type(KeyType::TABLE_INDEX), 2);

write_version_frame({"test_versioned_engine_delete"}, 2, version_store, 30, true, 0, k1, true);
// After the third write: v1 is now the anchor (kept), v0 has aged out and is pruned, v2 is latest — 2 survive.
ASSERT_EQ(mock_store->num_atom_keys_of_type(KeyType::TABLE_INDEX), 2);
}

TEST(PythonVersionStore, DeleteAllVersions) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/version/test/version_map_model.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct MapStorePair {
auto version_id = prev ? prev->version_id() + 1 : 0;

if (tombstones_)
map_->write_and_prune_previous(store_, make_test_index_key(id, version_id, KeyType::TABLE_INDEX), prev);
map_->write_and_prune_previous(store_, make_test_index_key(id, version_id, KeyType::TABLE_INDEX));
else
backwards_compat_write_and_prune_previous(
store_, map_, make_test_index_key(id, version_id, KeyType::TABLE_INDEX)
Expand Down
15 changes: 15 additions & 0 deletions cpp/arcticdb/version/version_functions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,21 @@ inline folly::Future<version_store::TombstoneVersionResult> finalize_tombstone_a
version_store::TombstoneVersionResult res{true, entry->head_->id()};
res.keys_to_delete = std::move(tombstone_result.second);

// Also delete any versions a prior prune tombstoned but retained in storage (the anchor /
// within-window keys sitting individually-tombstoned above the TOMBSTONE_ALL line). They are
// already loaded in this UNDELETED_ONLY entry, so no extra storage round-trip is needed. Without
// this they would leak when the whole symbol is deleted. Filter on the *individual* tombstone:
// versions buried by a TOMBSTONE_ALL (a prior delete_all) are already physically deleted, so
// re-adding them would just issue wasted delete ops.
for (const auto& key : entry->keys_) {
if (is_index_key_type(key.type()) && entry->has_individual_tombstone(key.version_id()) &&
std::none_of(res.keys_to_delete.begin(), res.keys_to_delete.end(), [&](const AtomKey& k) {
return k.version_id() == key.version_id();
})) {
res.keys_to_delete.push_back(key);
}
}

res.no_undeleted_left = true;
res.latest_version_ = tombstone_result.first;
return res;
Expand Down
Loading
Loading