From d252921ba9c44c7fe3b907c3ec3a024650e37560 Mon Sep 17 00:00:00 2001 From: Georgi Petrov Date: Wed, 25 Mar 2026 12:07:47 +0200 Subject: [PATCH 1/8] Optimise SL memory load --- cpp/arcticdb/CMakeLists.txt | 3 +- cpp/arcticdb/version/symbol_list.cpp | 390 +++++++++--------- cpp/arcticdb/version/symbol_list.hpp | 13 +- .../version/test/benchmark_symbol_list.cpp | 306 ++++++++++++++ .../version/test/test_symbol_list.cpp | 43 ++ python/benchmarks/list_symbols.py | 114 +++++ 6 files changed, 670 insertions(+), 199 deletions(-) create mode 100644 cpp/arcticdb/version/test/benchmark_symbol_list.cpp diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index 86b842c9bb3..5e68462641f 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -1237,7 +1237,8 @@ if(${TEST}) processing/test/benchmark_common.cpp processing/test/benchmark_ternary.cpp util/test/benchmark_bitset.cpp - version/test/benchmark_write.cpp) + version/test/benchmark_write.cpp + version/test/benchmark_symbol_list.cpp) add_executable(benchmarks ${benchmark_srcs}) diff --git a/cpp/arcticdb/version/symbol_list.cpp b/cpp/arcticdb/version/symbol_list.cpp index 438cb47022a..115e3285f30 100644 --- a/cpp/arcticdb/version/symbol_list.cpp +++ b/cpp/arcticdb/version/symbol_list.cpp @@ -91,59 +91,6 @@ std::vector load_previous_from_version_keys( return symbols; } -std::vector get_all_symbol_list_keys( - const std::shared_ptr& store, SymbolListData& data, WillAttemptCompaction will_attempt_compaction -) { - std::vector output; - uint64_t uncompacted_keys_found = 0; - store->iterate_type( - KeyType::SYMBOL_LIST, - [&data, &output, &uncompacted_keys_found, will_attempt_compaction](auto&& key) { - auto atom_key = to_atom(std::forward(key)); - if (atom_key.id() != compaction_id) { - uncompacted_keys_found++; - } - if (uncompacted_keys_found == warning_threshold() && !data.warned_expected_slowdown_) { - log::symbol().warn( - "`list_symbols` may take longer than expected as there have 
been many modifications " - "since `list_symbols` was last called. \n\n" - "See here for more information: " - "https://docs.arcticdb.io/latest/technical/on_disk_storage/#symbol-list-caching\n\n" - "To resolve, run `list_symbols` through to completion frequently. " - "Note: write access to storage is required for compaction. " - "{}.\n" - "Note: This warning will only appear once.\n", - will_attempt_compaction - ); - - data.warned_expected_slowdown_ = true; - } - - output.push_back(atom_key); - } - ); - - std::sort(output.begin(), output.end(), [](const AtomKey& left, const AtomKey& right) { - // Some very old symbol list keys have a non-zero version number, but with different semantics to the new style, - // so ignore it. See arcticdb-man#116. Most old style symbol list keys have version ID 0 anyway. - auto left_version = is_new_style_key(left) ? left.version_id() : 0; - auto right_version = is_new_style_key(right) ? right.version_id() : 0; - return std::tie(left.start_index(), left_version, left.creation_ts()) < - std::tie(right.start_index(), right_version, right.creation_ts()); - }); - return output; -} - -MaybeCompaction last_compaction(const std::vector& keys) { - auto pos = std::find_if(keys.rbegin(), keys.rend(), [](const auto& key) { return key.id() == compaction_id; }); - - if (pos == keys.rend()) { - return std::nullopt; - } else { - return {(pos + 1).base()}; // reverse_iterator -> forward itr has an offset of 1 per docs - } -} - // The below string_at and scalar_at functions should be used for symbol list cache segments instead of the ones // provided in SegmentInMemory, because the symbol list structure is the only place where columns can have more entries // than the segment has rows. 
Hence, we need to bypass the checks inside SegmentInMemory's function and directly call @@ -186,71 +133,39 @@ DataType get_symbol_data_type(const SegmentInMemory& seg) { return data_type; } -std::vector read_old_style_list_from_storage(const SegmentInMemory& seg) { - std::vector output; - if (seg.empty()) - return output; - - const auto data_type = get_symbol_data_type(seg); - - for (auto row : seg) - output.emplace_back(stream_id_from_segment(data_type, seg, row.row_id_, 0), 0, 0, ActionType::ADD); - - return output; -} - -std::vector read_new_style_list_from_storage(const SegmentInMemory& seg) { - std::vector output; - if (seg.empty()) - return output; +/// Iterates a compacted segment's entries (additions and deletions), calling the visitor for each. +/// Visitor signature: void(StreamId&&, VersionId, timestamp, ActionType) +template +void for_each_segment_entry(const SegmentInMemory& seg, Visitor&& visitor) { + if (seg.row_count() == 0 || seg.descriptor().field_count() == 0) + return; const auto data_type = get_symbol_data_type(seg); - // Because we need to be backwards compatible with the old style symbol list, the additions and deletions are - // in separate columns. The first three columns are the symbol, version and timestamp for the additions, and the - // next three are the same for the deletions. Old-style symbol lists will ignore everything but the first column - // which will mean that they can't do any conflict resolution but will get the correct data. 
- util::check( - seg.column(0).row_count() == seg.column(1).row_count() && - seg.column(0).row_count() == seg.column(2).row_count(), - "Column mismatch in symbol segment additions: {} {} {}", - seg.column(0).row_count(), - seg.column(1).row_count(), - seg.column(2).row_count() - ); + if (seg.descriptor().field_count() == 1) { + // Old-style: single column, all ADD + for (auto row : seg) + visitor(stream_id_from_segment(data_type, seg, row.row_id_, 0), VersionId{0}, timestamp{0}, ActionType::ADD + ); + return; + } + // New-style: columns 0-2 are additions, columns 3-5 are deletions for (auto i = 0L; i < seg.column(0).row_count(); ++i) { - auto stream_id = stream_id_from_segment(data_type, seg, i, 0); - auto reference_id = VersionId{scalar_at(seg, i, 1)}; - auto reference_time = timestamp{scalar_at(seg, i, 2)}; - ARCTICDB_RUNTIME_DEBUG( - log::symbol(), "Reading added symbol {}: {}@{}", stream_id, reference_id, reference_time - ); - output.emplace_back(stream_id, reference_id, reference_time, ActionType::ADD); + visitor(stream_id_from_segment(data_type, seg, i, 0), + VersionId{scalar_at(seg, i, 1)}, + timestamp{scalar_at(seg, i, 2)}, + ActionType::ADD); } if (seg.descriptor().field_count() == 6) { - util::check( - seg.column(3).row_count() == seg.column(4).row_count() && - seg.column(3).row_count() == seg.column(5).row_count(), - "Column mismatch in symbol segment deletions: {} {} {}", - seg.column(3).row_count(), - seg.column(4).row_count(), - seg.column(5).row_count() - ); - for (auto i = 0L; i < seg.column(3).row_count(); ++i) { - auto stream_id = stream_id_from_segment(data_type, seg, i, 3); - auto reference_id = VersionId{scalar_at(seg, i, 4)}; - auto reference_time = timestamp{scalar_at(seg, i, 5)}; - ARCTICDB_RUNTIME_DEBUG( - log::symbol(), "Reading deleted symbol {}: {}@{}", stream_id, reference_id, reference_time - ); - output.emplace_back(stream_id, reference_id, reference_time, ActionType::DELETE); + visitor(stream_id_from_segment(data_type, seg, i, 3), + 
VersionId{scalar_at(seg, i, 4)}, + timestamp{scalar_at(seg, i, 5)}, + ActionType::DELETE); } } - - return output; } std::vector read_from_storage(const std::shared_ptr& store, const AtomKey& key) { @@ -263,26 +178,111 @@ std::vector read_from_storage(const std::shared_ptr 0, "Expected at least one column in symbol list with key {}", key ); - if (seg.descriptor().field_count() == 1) - return read_old_style_list_from_storage(seg); - else - return read_new_style_list_from_storage(seg); + std::vector output; + auto total = seg.column(0).row_count(); + if (seg.descriptor().field_count() >= 6) + total += seg.column(3).row_count(); + output.reserve(total); + + for_each_segment_entry( + seg, + [&](StreamId&& stream_id, VersionId reference_id, timestamp reference_time, ActionType action) { + ARCTICDB_RUNTIME_DEBUG( + log::symbol(), "Reading {} symbol {}: {}@{}", action, stream_id, reference_id, reference_time + ); + output.emplace_back(std::move(stream_id), reference_id, reference_time, action); + } + ); + + return output; } -MapType load_journal_keys(const std::vector& keys) { - MapType map; - for (const auto& key : keys) { - const auto& action_id = key.id(); - if (action_id == compaction_id) - continue; +struct StreamingJournalResult { + std::optional compaction_key; + MapType update_map; + size_t total_key_count = 0; + std::vector all_keys; // collected for deletion after compaction +}; + +StreamingJournalResult load_journal_streaming( + const std::shared_ptr& store, SymbolListData& data, + WillAttemptCompaction will_attempt_compaction, bool collect_keys +) { + StreamingJournalResult result; + size_t uncompacted_keys_found = 0; + + const auto batch_delete_size = collect_keys + ? 
ConfigsMap::instance()->get_int("SymbolList.BatchDeleteDuringCompaction", 0) + : 0; + std::vector delete_batch; + + store->iterate_type(KeyType::SYMBOL_LIST, [&](auto&& key) { + auto atom_key = to_atom(std::forward(key)); + result.total_key_count++; + + if (atom_key.id() == compaction_id) { + // Keep the latest compaction key by creation timestamp + if (!result.compaction_key || atom_key.creation_ts() > result.compaction_key->creation_ts()) + result.compaction_key = atom_key; + } else { + uncompacted_keys_found++; + if (uncompacted_keys_found == warning_threshold() && !data.warned_expected_slowdown_) { + log::symbol().warn( + "`list_symbols` may take longer than expected as there have been many modifications " + "since `list_symbols` was last called. \n\n" + "See here for more information: " + "https://docs.arcticdb.io/latest/technical/on_disk_storage/#symbol-list-caching\n\n" + "To resolve, run `list_symbols` through to completion frequently. " + "Note: write access to storage is required for compaction. " + "{}.\n" + "Note: This warning will only appear once.\n", + will_attempt_compaction + ); + data.warned_expected_slowdown_ = true; + } + + // Build MapType entry directly — no intermediate sorted key vector + const auto& symbol = atom_key.start_index(); + const auto version_id = is_new_style_key(atom_key) ? atom_key.version_id() : unknown_version_id; + const auto timestamp = atom_key.creation_ts(); + const auto& action_id_val = atom_key.id(); + ActionType action = + std::get(action_id_val) == DeleteSymbol ? 
ActionType::DELETE : ActionType::ADD; + result.update_map[symbol].emplace_back(version_id, timestamp, action); + } + + if (collect_keys) { + if (batch_delete_size > 0) { + delete_batch.push_back(std::move(atom_key)); + if (static_cast(delete_batch.size()) >= batch_delete_size) { + std::vector to_remove(delete_batch.begin(), delete_batch.end()); + store->remove_keys_sync(to_remove); + delete_batch.clear(); + } + } else { + result.all_keys.push_back(std::move(atom_key)); + } + } + }); - const auto& symbol = key.start_index(); - const auto version_id = is_new_style_key(key) ? key.version_id() : unknown_version_id; - const auto timestamp = key.creation_ts(); - ActionType action = std::get(action_id) == DeleteSymbol ? ActionType::DELETE : ActionType::ADD; - map[symbol].emplace_back(version_id, timestamp, action); + // Flush remaining batch + if (!delete_batch.empty()) { + std::vector to_remove(delete_batch.begin(), delete_batch.end()); + store->remove_keys_sync(to_remove); } - return map; + + // Sort each symbol's entries to match the order the old sorted-key-vector approach produced. + // Old-style keys (unknown_version_id) should sort as version 0, not max, to preserve chronological order + // relative to new-style keys. + for (auto& [symbol, entries] : result.update_map) { + std::sort(entries.begin(), entries.end(), [](const SymbolEntryData& a, const SymbolEntryData& b) { + auto a_ver = a.reference_id_ == unknown_version_id ? VersionId{0} : a.reference_id_; + auto b_ver = b.reference_id_ == unknown_version_id ? 
VersionId{0} : b.reference_id_; + return std::tie(a_ver, a.timestamp_) < std::tie(b_ver, b.timestamp_); + }); + } + + return result; } auto tail_range(const std::vector& updated) { @@ -393,23 +393,27 @@ ProblematicResult is_problematic( return ProblematicResult{latest}; } -CollectionType merge_existing_with_journal_keys( - const std::shared_ptr& version_map, const std::shared_ptr& store, - const std::vector& keys, std::vector&& existing -) { - auto existing_keys = std::move(existing); - auto update_map = load_journal_keys(keys); +using ProblematicSymbolMap = std::map>; +struct ExistingKeysMergeResult { CollectionType symbols; - std::map> problematic_symbols; - const auto min_allowed_interval = ConfigsMap::instance()->get_int("SymbolList.MinIntervalNs", 100'000'000LL); + ProblematicSymbolMap problematic; +}; + +/// Merges existing (compacted) entries with journal updates, consuming the existing entries. +/// Matched entries are resolved or flagged as problematic; unmatched existing ADDs are kept as-is. +/// Consumed entries are erased from update_map. 
+ExistingKeysMergeResult merge_existing_entries( + std::vector existing_keys, MapType& update_map, timestamp min_allowed_interval +) { + ExistingKeysMergeResult result; for (auto& previous_entry : existing_keys) { const auto& stream_id = previous_entry.stream_id_; auto updated = update_map.find(stream_id); if (updated == std::end(update_map)) { if (previous_entry.action_ == ActionType::ADD) - symbols.emplace_back(std::move(previous_entry)); + result.symbols.emplace_back(std::move(previous_entry)); else util::check( previous_entry.action_ == ActionType::DELETE, @@ -420,26 +424,43 @@ CollectionType merge_existing_with_journal_keys( util::check(!updated->second.empty(), "Unexpected empty entry for symbol {}", updated->first); if (auto problematic_entry = is_problematic(previous_entry, updated->second, min_allowed_interval); problematic_entry) { - problematic_symbols.try_emplace( - stream_id, std::make_pair(problematic_entry.reference_id(), problematic_entry.time()) + result.problematic.try_emplace( + std::move(previous_entry.stream_id_), + std::make_pair(problematic_entry.reference_id(), problematic_entry.time()) ); } else { const auto& last_entry = updated->second.rbegin(); - symbols.emplace_back( - updated->first, last_entry->reference_id_, last_entry->timestamp_, last_entry->action_ + result.symbols.emplace_back( + std::move(previous_entry.stream_id_), + last_entry->reference_id_, + last_entry->timestamp_, + last_entry->action_ ); } update_map.erase(updated); } } + return result; +} + +CollectionType merge_existing_with_journal_map( + const std::shared_ptr& version_map, const std::shared_ptr& store, MapType& update_map, + std::vector&& existing +) { + const auto min_allowed_interval = ConfigsMap::instance()->get_int("SymbolList.MinIntervalNs", 100'000'000LL); + + auto merge_result = merge_existing_entries(std::move(existing), update_map, min_allowed_interval); + auto& symbols = merge_result.symbols; + auto& problematic_symbols = merge_result.problematic; + for 
(const auto& [symbol, entries] : update_map) { ARCTICDB_DEBUG(log::symbol(), "{} {}", symbol, entries); if (auto problematic_entry = is_problematic(entries, min_allowed_interval); problematic_entry) { problematic_symbols.try_emplace(symbol, problematic_entry.reference_id(), problematic_entry.time()); } else { - const auto& last_entry = entries.rbegin(); - symbols.emplace_back(symbol, last_entry->reference_id_, last_entry->timestamp_, last_entry->action_); + const auto& last_entry = *entries.rbegin(); + symbols.emplace_back(symbol, last_entry.reference_id_, last_entry.timestamp_, last_entry.action_); } } @@ -489,55 +510,46 @@ CollectionType merge_existing_with_journal_keys( return symbols; } -CollectionType load_from_symbol_list_keys( - const std::shared_ptr& version_map, const std::shared_ptr& store, - const std::vector& keys, const Compaction& compaction -) { - ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from symbol list keys"); - - auto previous_compaction = read_from_storage(store, *compaction); - return merge_existing_with_journal_keys(version_map, store, keys, std::move(previous_compaction)); -} - -CollectionType load_from_version_keys( - const std::shared_ptr& version_map, const std::shared_ptr& store, - const std::vector& keys, SymbolListData& data, WillAttemptCompaction will_attempt_compaction -) { - ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from version keys"); - auto previous_entries = load_previous_from_version_keys(store, data, will_attempt_compaction); - return merge_existing_with_journal_keys(version_map, store, keys, std::move(previous_entries)); -} - LoadResult attempt_load( const std::shared_ptr& version_map, const std::shared_ptr& store, SymbolListData& data, WillAttemptCompaction will_attempt_compaction ) { ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Symbol list load attempt"); + const bool collect_keys = will_attempt_compaction == WillAttemptCompaction::YES; + auto journal = load_journal_streaming(store, data, 
will_attempt_compaction, collect_keys); + LoadResult load_result; - load_result.symbol_list_keys_ = get_all_symbol_list_keys(store, data, will_attempt_compaction); - load_result.maybe_previous_compaction = last_compaction(load_result.symbol_list_keys_); + load_result.symbol_list_keys_ = std::move(journal.all_keys); + load_result.compaction_key_ = journal.compaction_key; + load_result.total_key_count_ = journal.total_key_count; + + if (journal.compaction_key) { + ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from symbol list keys"); + auto existing = read_from_storage(store, *journal.compaction_key); + load_result.symbols_ = + merge_existing_with_journal_map(version_map, store, journal.update_map, std::move(existing)); + } else { + ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from version keys"); + auto previous_entries = load_previous_from_version_keys(store, data, will_attempt_compaction); + load_result.symbols_ = + merge_existing_with_journal_map(version_map, store, journal.update_map, std::move(previous_entries)); - if (load_result.maybe_previous_compaction) - load_result.symbols_ = load_from_symbol_list_keys( - version_map, store, load_result.symbol_list_keys_, *load_result.maybe_previous_compaction - ); - else { - load_result.symbols_ = load_from_version_keys( - version_map, store, load_result.symbol_list_keys_, data, will_attempt_compaction - ); std::unordered_set keys_in_versions; - for (const auto& entry : load_result.symbols_) + for (const auto& entry : load_result.symbols_) { keys_in_versions.emplace(entry.stream_id_); + } - for (const auto& key : load_result.symbol_list_keys_) - util::check( - keys_in_versions.find(StreamId{std::get(key.start_index())}) != keys_in_versions.end(), - "Would delete unseen key {}", - key - ); + for (const auto& key : load_result.symbol_list_keys_) { + if (key.id() != compaction_id) { + util::check( + keys_in_versions.find(key.start_index()) != keys_in_versions.end(), + "Would delete unseen key {}", + key + 
); + } + } } - load_result.timestamp_ = store->current_timestamp(); return load_result; } @@ -624,12 +636,12 @@ StreamDescriptor delete_symbol_stream_descriptor(const StreamId& stream_id, cons } bool SymbolList::needs_compaction(const LoadResult& load_result) const { - if (!load_result.maybe_previous_compaction) { + if (!load_result.compaction_key_) { log::version().debug("Symbol list: needs_compaction=[true] as no previous compaction"); return true; } - auto n_keys = static_cast(load_result.symbol_list_keys_.size()); + auto n_keys = static_cast(load_result.total_key_count_); if (auto fixed = ConfigsMap::instance()->get_int("SymbolList.MaxDelta")) { auto result = n_keys > *fixed; log::version().debug( @@ -780,20 +792,14 @@ std::vector delete_keys( return store->remove_keys_sync(variant_keys); } -bool has_recent_compaction( - const std::shared_ptr& store, - const std::optional::const_iterator>& maybe_previous_compaction -) { +bool has_recent_compaction(const std::shared_ptr& store, const std::optional& compaction_key) { bool found_last = false; bool has_newer = false; - if (maybe_previous_compaction.has_value()) { - // Symbol list keys source + if (compaction_key) { store->iterate_type( KeyType::SYMBOL_LIST, - [&found_last, - &has_newer, - &last_compaction_key = *maybe_previous_compaction.value()](const VariantKey& key) { + [&found_last, &has_newer, &last_compaction_key = *compaction_key](const VariantKey& key) { const auto& atom = to_atom(key); if (atom == last_compaction_key) found_last = true; @@ -803,7 +809,7 @@ bool has_recent_compaction( std::get(compaction_id) ); } else { - // Version keys source + // No prior compaction — any compaction key means someone else compacted store->iterate_type( KeyType::SYMBOL_LIST, [&has_newer](const VariantKey&) { has_newer = true; }, @@ -811,7 +817,7 @@ bool has_recent_compaction( ); } - return (maybe_previous_compaction && !found_last) || has_newer; + return (compaction_key && !found_last) || has_newer; } size_t 
SymbolList::compact(const std::shared_ptr& store) { @@ -819,7 +825,7 @@ size_t SymbolList::compact(const std::shared_ptr& store) { LoadResult load_result = ExponentialBackoff(100, 2000).go([this, &version_map, &store]() { return attempt_load(version_map, store, data_, WillAttemptCompaction::YES); }); - auto num_symbol_list_keys = load_result.symbol_list_keys_.size(); + auto num_symbol_list_keys = load_result.total_key_count_; ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Forcing compaction. Obtaining lock..."); StorageLock lock{StringId{CompactionLockName}}; @@ -832,7 +838,7 @@ size_t SymbolList::compact(const std::shared_ptr& store) { } void SymbolList::compact_internal(const std::shared_ptr& store, LoadResult& load_result) const { - if (has_recent_compaction(store, load_result.maybe_previous_compaction)) { + if (has_recent_compaction(store, load_result.compaction_key_)) { // legacy arcticc symbol list entries don't get correctly listed when doing `iterate_type`, so can mess // up racing symbol list compaction detection. 
ARCTICDB_RUNTIME_DEBUG( @@ -842,7 +848,9 @@ void SymbolList::compact_internal(const std::shared_ptr& store, LoadResul ); } else { auto written = write_symbols(store, load_result.symbols_, compaction_id, data_.type_holder_); - delete_keys(store, load_result.detach_symbol_list_keys(), std::get(written)); + if (!load_result.symbol_list_keys_.empty()) { + delete_keys(store, load_result.detach_symbol_list_keys(), std::get(written)); + } } } diff --git a/cpp/arcticdb/version/symbol_list.hpp b/cpp/arcticdb/version/symbol_list.hpp index f0096b1c92a..8f6398d4c00 100644 --- a/cpp/arcticdb/version/symbol_list.hpp +++ b/cpp/arcticdb/version/symbol_list.hpp @@ -21,8 +21,6 @@ struct SymbolListEntry; struct SymbolEntryData; using MapType = std::unordered_map>; -using Compaction = std::vector::const_iterator; -using MaybeCompaction = std::optional; using CollectionType = std::vector; enum class WillAttemptCompaction : uint8_t { @@ -32,10 +30,10 @@ enum class WillAttemptCompaction : uint8_t { }; struct LoadResult { - std::vector symbol_list_keys_; - MaybeCompaction maybe_previous_compaction; + std::vector symbol_list_keys_; // all SYMBOL_LIST keys, for deletion after compaction + std::optional compaction_key_; // the latest compaction key found, if any CollectionType symbols_; - timestamp timestamp_ = 0L; + size_t total_key_count_ = 0; // total number of SYMBOL_LIST keys, for compaction threshold std::vector&& detach_symbol_list_keys() { return std::move(symbol_list_keys_); } }; @@ -154,6 +152,7 @@ class SymbolList { return WillAttemptCompaction::NO_INSUFFICIENT_PERMISSIONS; return WillAttemptCompaction::YES; }(); + LoadResult load_result = ExponentialBackoff(100, 2000).go( [this, &version_map, &store, will_attempt_compaction]() { return attempt_load(version_map, store, data_, will_attempt_compaction); @@ -181,9 +180,9 @@ class SymbolList { } R output; - for (const auto& entry : load_result.symbols_) { + for (auto& entry : load_result.symbols_) { if (entry.action_ == 
ActionType::ADD) - output.insert(entry.stream_id_); + output.insert(std::move(entry.stream_id_)); } return output; diff --git a/cpp/arcticdb/version/test/benchmark_symbol_list.cpp b/cpp/arcticdb/version/test/benchmark_symbol_list.cpp new file mode 100644 index 00000000000..c0e7f22877c --- /dev/null +++ b/cpp/arcticdb/version/test/benchmark_symbol_list.cpp @@ -0,0 +1,306 @@ +/* Copyright 2026 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software + * will be governed by the Apache License, version 2.0. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace arcticdb; + +std::shared_ptr create_s3_mock_store(const std::string& lib_name = "benchmark_sl") { + storage::s3::S3ApiInstance::instance(); + arcticdb::proto::storage::VariantStorage vs; + arcticdb::proto::s3_storage::Config cfg; + cfg.set_bucket_name("test-bucket"); + cfg.set_use_mock_storage_for_testing(true); + util::pack_to_any(cfg, *vs.mutable_config()); + + storage::LibraryPath path{lib_name.c_str(), "store"}; + auto storages = storage::create_storages(path, storage::OpenMode::DELETE, {vs}); + auto library = std::make_shared(path, std::move(storages)); + return std::make_shared>(async::AsyncStore(library, codec::default_lz4_codec(), EncodingVersion::V1)); +} + +// Tracks peak heap usage by polling mallinfo2 from a background thread. 
/// Tracks peak heap usage by polling glibc's mallinfo2() from a background thread.
///
/// Usage: call start(), run the workload to be measured, then call stop() to obtain the
/// peak growth (in bytes) of `uordblks` relative to the value sampled in start().
///
/// The sampler sleeps 100 microseconds between polls, so spikes shorter than that can be
/// missed. The destructor stops and joins the sampler, which makes it safe to destroy a
/// tracker that was started but never stopped (e.g. when the measured code throws).
///
/// NOTE(review): only `uordblks` is sampled; mmap-backed allocations appear to be reported
/// separately (`hblkhd`) by mallinfo2 - confirm against mallinfo(3) if very large single
/// allocations need to be included in the peak.
struct PeakHeapTracker {
    std::atomic<bool> running_{false};
    std::atomic<size_t> peak_uordblks_{0};
    size_t baseline_{0};
    std::thread sampler_;

    /// Joins the sampler thread if it is still running. Without this, destroying a started
    /// tracker would destroy a joinable std::thread, which calls std::terminate.
    ~PeakHeapTracker() { halt_sampler(); }

    /// Records the current heap usage as the baseline and launches the sampler thread.
    /// Calling start() on an already running tracker restarts the measurement.
    void start() {
        // Assigning to a joinable std::thread terminates the program, so stop any previous sampler.
        halt_sampler();
        malloc_trim(0);
        baseline_ = mallinfo2().uordblks;
        peak_uordblks_ = baseline_;
        running_ = true;
        sampler_ = std::thread([this] {
            while (running_.load(std::memory_order_relaxed)) {
                record_sample();
                std::this_thread::sleep_for(std::chrono::microseconds(100));
            }
        });
    }

    /// Stops the sampler, takes one final sample and returns the peak growth in bytes
    /// relative to the baseline captured by start(). Safe to call more than once.
    size_t stop() {
        halt_sampler();
        // One final sample, in case the peak is the state at the moment of stopping.
        record_sample();
        return peak_uordblks_.load() - baseline_;
    }

  private:
    /// Signals the sampler loop to exit and waits for the thread to finish (no-op if not running).
    void halt_sampler() {
        running_ = false;
        if (sampler_.joinable())
            sampler_.join();
    }

    /// Raises the recorded peak to the current `uordblks` value if the latter is larger.
    void record_sample() {
        const size_t current = mallinfo2().uordblks;
        size_t prev = peak_uordblks_.load(std::memory_order_relaxed);
        // compare_exchange_weak reloads `prev` on failure, so the loop ends once the peak is >= current.
        while (current > prev && !peak_uordblks_.compare_exchange_weak(prev, current, std::memory_order_relaxed)) {
        }
    }
};

// Benchmarks peak memory usage of symbol list loading via the compaction path.
+// +// Run with: --benchmark_time_unit=ms --benchmark_filter=BM_symbol_list_load + +static void BM_symbol_list_load(benchmark::State& state) { + auto store = std::make_shared(); + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + + // Write journal entries for num_symbols symbols + for (int64_t i = 0; i < num_symbols; ++i) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, 0); + } + + // Force compaction on first load to create the compacted segment + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map}; + sl.load>(version_map, store, false); + } + + // Add a few journal entries to exercise the merge path + for (int64_t i = 0; i < 100; ++i) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, 1); + } + + // Disable compaction during the benchmark iterations + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, true); + + auto peak_delta = tracker.stop(); + auto retained = mallinfo2().uordblks - tracker.baseline_; + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["RetainedMB"] = benchmark::Counter(static_cast(retained) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + + benchmark::DoNotOptimize(result); + } +} + +BENCHMARK(BM_symbol_list_load)->Arg(100'000)->Arg(1'000'000)->Unit(benchmark::kMillisecond)->Iterations(3); + +// Benchmarks peak memory with many uncompacted journal entries per symbol. +// This is the scenario where the AtomKey vector elimination matters most. +// +// Setup: N_SYMBOLS symbols, each with ENTRIES_PER_SYMBOL journal entries. 
+ +static void BM_symbol_list_load_many_entries(benchmark::State& state) { + auto store = std::make_shared(); + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + auto entries_per_symbol = state.range(1); + + // Write journal entries: each symbol gets multiple versions + for (int64_t i = 0; i < num_symbols; ++i) { + for (int64_t v = 0; v < entries_per_symbol; ++v) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, v); + } + } + + // Force compaction to create the compacted segment + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map}; + sl.load>(version_map, store, false); + } + + // Write more journal entries (uncompacted) + for (int64_t i = 0; i < num_symbols; ++i) { + for (int64_t v = 0; v < entries_per_symbol; ++v) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, entries_per_symbol + v); + } + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, true); + + auto peak_delta = tracker.stop(); + auto retained = mallinfo2().uordblks - tracker.baseline_; + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["RetainedMB"] = benchmark::Counter(static_cast(retained) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + state.counters["TotalEntries"] = benchmark::Counter(static_cast(num_symbols * entries_per_symbol)); + + benchmark::DoNotOptimize(result); + } +} + +BENCHMARK(BM_symbol_list_load_many_entries)->Args({1'000, 1'000})->Unit(benchmark::kMillisecond)->Iterations(1); + +// Benchmarks peak memory during a load that triggers compaction. +// This measures the compaction path where keys are collected for deletion. 
+// +// Setup: N_SYMBOLS symbols with ENTRIES_PER_SYMBOL uncompacted journal entries each. +// MaxDelta=0 forces compaction on every load. + +static void BM_symbol_list_compaction(benchmark::State& state) { + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + auto entries_per_symbol = state.range(1); + + // Setup outside the benchmark loop — write journal entries once + auto store = std::make_shared(); + for (int64_t i = 0; i < num_symbols; ++i) { + for (int64_t v = 0; v < entries_per_symbol; ++v) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, v); + } + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, false); + + auto peak_delta = tracker.stop(); + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + state.counters["TotalEntries"] = + benchmark::Counter(static_cast(num_symbols * entries_per_symbol)); + + benchmark::DoNotOptimize(result); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); +} + +BENCHMARK(BM_symbol_list_compaction) + ->Args({1'000, 100}) + ->Unit(benchmark::kMillisecond) + ->Iterations(1); + +// ---- S3 mock variants: exercise the full S3 storage layer (serialization, key parsing) ---- + +static void BM_symbol_list_load_s3(benchmark::State& state) { + auto store = create_s3_mock_store(); + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + + for (int64_t i = 0; i < num_symbols; ++i) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, 0); + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map}; + sl.load>(version_map, store, false); + } + + for (int64_t i = 0; i < 100; ++i) { + 
SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, 1); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, true); + + auto peak_delta = tracker.stop(); + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + + benchmark::DoNotOptimize(result); + } +} + +BENCHMARK(BM_symbol_list_load_s3)->Arg(10'000)->Unit(benchmark::kMillisecond)->Iterations(1); + +static void BM_symbol_list_compaction_s3(benchmark::State& state) { + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + auto entries_per_symbol = state.range(1); + + auto store = create_s3_mock_store(); + for (int64_t i = 0; i < num_symbols; ++i) { + for (int64_t v = 0; v < entries_per_symbol; ++v) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, v); + } + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, false); + + auto peak_delta = tracker.stop(); + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + state.counters["TotalEntries"] = + benchmark::Counter(static_cast(num_symbols * entries_per_symbol)); + + benchmark::DoNotOptimize(result); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); +} + +BENCHMARK(BM_symbol_list_compaction_s3) + ->Args({1'000, 100}) + ->Unit(benchmark::kMillisecond) + ->Iterations(1); diff --git a/cpp/arcticdb/version/test/test_symbol_list.cpp b/cpp/arcticdb/version/test/test_symbol_list.cpp index 6dac896a74b..464b80fcd2b 100644 --- 
a/cpp/arcticdb/version/test/test_symbol_list.cpp +++ b/cpp/arcticdb/version/test/test_symbol_list.cpp @@ -992,6 +992,49 @@ TEST_F(SymbolListSuite, AddAndCompact) { collect(futures).get(); } +TEST_F(SymbolListSuite, DirectPathEquivalence) { + // Setup: add symbols and force compaction + for (int i = 0; i < 50; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::add_symbol(store_, StreamId{symbol}, 0); + auto key = atom_key_builder().build(symbol, KeyType::TABLE_INDEX); + version_map_->write_version(store_, key, std::nullopt); + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map_}; + sl.load>(version_map_, store_, false); + } + + // Add journal entries: new symbols + deletes of existing ones + for (int i = 40; i < 60; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::add_symbol(store_, StreamId{symbol}, 1); + auto key = atom_key_builder().version_id(1).build(symbol, KeyType::TABLE_INDEX); + version_map_->write_version(store_, key, std::nullopt); + } + for (int i = 0; i < 10; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::remove_symbol(store_, StreamId{symbol}, 1); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + // Load via compaction-eligible path (will not actually compact since threshold is default) + SymbolList sl1{version_map_}; + auto compaction_result = sl1.load>(version_map_, store_, false); + + // Load via direct path (no_compaction=true) + SymbolList sl2{version_map_}; + auto direct_result = sl2.load>(version_map_, store_, true); + + EXPECT_EQ(compaction_result, direct_result); + // 50 original + 10 new = 60. The remove_symbol journal entries are resolved + // as ADD because the version map still shows those symbols as existing. 
+ EXPECT_EQ(direct_result.size(), 60u); +} + struct SymbolListRace : SymbolListSuite, testing::WithParamInterface> {}; TEST_P(SymbolListRace, Run) { diff --git a/python/benchmarks/list_symbols.py b/python/benchmarks/list_symbols.py index ee515d355f7..3b13210d9ec 100644 --- a/python/benchmarks/list_symbols.py +++ b/python/benchmarks/list_symbols.py @@ -70,3 +70,117 @@ def time_has_symbol(self, *args): def _check_test_counter(self): assert self.test_counter == 1 self.test_counter += 1 + + +class ListSymbolsWithCompactedCache: + """Measure list_symbols when the cache is already compacted with some journal entries on top. + + This is the common production scenario: the cache has been compacted and a moderate number + of writes/deletes have happened since the last compaction.""" + + rounds = 1 + number = 3 + timeout = 600 + warmup_time = 0 + + storages = [Storage.LMDB, Storage.AMAZON] + num_symbols = [1_000, 10_000] + num_journal_entries = [0, 100] + + params = [num_symbols, num_journal_entries, storages] + param_names = ["num_symbols", "num_journal_entries", "storage"] + + def __init__(self): + self.logger = get_logger() + self.lib = None + + def setup_cache(self): + return create_libraries_across_storages(self.storages) + + def teardown(self, *args): + if self.lib is not None: + self.lib._nvs.version_store.clear() + + def setup(self, lib_for_storage, num_symbols, num_journal_entries, storage): + self.lib = lib_for_storage[storage] + if self.lib is None: + raise SkipNotImplemented + + simple_df = pd.DataFrame({"a": [1]}) + + # Write symbols in batches to avoid very large payloads + batch_size = 1000 + for start in range(0, num_symbols, batch_size): + end = min(start + batch_size, num_symbols) + payloads = [WritePayload(f"sym_{i}", simple_df) for i in range(start, end)] + self.lib.write_batch(payloads) + + # Trigger compaction by calling list_symbols + self.lib.list_symbols() + + # Add journal entries on top of the compacted cache + for i in range(num_journal_entries): 
+ self.lib.write(f"sym_{i}", simple_df) + + def time_list_symbols(self, *args): + self.lib.list_symbols() + + def peakmem_list_symbols(self, *args): + self.lib.list_symbols() + + +class ListSymbolsWithDeletes: + """Measure list_symbols with a mix of adds and deletes since last compaction. + + Tests the merge path where some symbols have been deleted and new ones added.""" + + rounds = 1 + number = 3 + timeout = 600 + warmup_time = 0 + + storages = [Storage.LMDB, Storage.AMAZON] + num_symbols = [1_000, 10_000] + + params = [num_symbols, storages] + param_names = ["num_symbols", "storage"] + + def __init__(self): + self.logger = get_logger() + self.lib = None + + def setup_cache(self): + return create_libraries_across_storages(self.storages) + + def teardown(self, *args): + if self.lib is not None: + self.lib._nvs.version_store.clear() + + def setup(self, lib_for_storage, num_symbols, storage): + self.lib = lib_for_storage[storage] + if self.lib is None: + raise SkipNotImplemented + + simple_df = pd.DataFrame({"a": [1]}) + + batch_size = 1000 + for start in range(0, num_symbols, batch_size): + end = min(start + batch_size, num_symbols) + payloads = [WritePayload(f"sym_{i}", simple_df) for i in range(start, end)] + self.lib.write_batch(payloads) + + # Compact + self.lib.list_symbols() + + # Delete 10% of symbols and add new ones + num_to_delete = num_symbols // 10 + for i in range(num_to_delete): + self.lib.delete(f"sym_{i}") + for i in range(num_to_delete): + self.lib.write(f"sym_new_{i}", simple_df) + + def time_list_symbols(self, *args): + self.lib.list_symbols() + + def peakmem_list_symbols(self, *args): + self.lib.list_symbols() From 58de69d661bd9fa4af7a829d33a66e1297995427 Mon Sep 17 00:00:00 2001 From: Georgi Petrov Date: Thu, 26 Mar 2026 10:49:12 +0200 Subject: [PATCH 2/8] Fix c++ tests --- cpp/arcticdb/version/symbol_list.cpp | 9 +- .../version/test/benchmark_symbol_list.cpp | 44 +++-- python/.asv/results/benchmarks.json | 174 ++++++++++++++++++ 
python/benchmarks/list_symbols.py | 53 ++++++ 4 files changed, 259 insertions(+), 21 deletions(-) diff --git a/cpp/arcticdb/version/symbol_list.cpp b/cpp/arcticdb/version/symbol_list.cpp index 115e3285f30..1703c868820 100644 --- a/cpp/arcticdb/version/symbol_list.cpp +++ b/cpp/arcticdb/version/symbol_list.cpp @@ -205,15 +205,14 @@ struct StreamingJournalResult { }; StreamingJournalResult load_journal_streaming( - const std::shared_ptr& store, SymbolListData& data, - WillAttemptCompaction will_attempt_compaction, bool collect_keys + const std::shared_ptr& store, SymbolListData& data, WillAttemptCompaction will_attempt_compaction, + bool collect_keys ) { StreamingJournalResult result; size_t uncompacted_keys_found = 0; - const auto batch_delete_size = collect_keys - ? ConfigsMap::instance()->get_int("SymbolList.BatchDeleteDuringCompaction", 0) - : 0; + const auto batch_delete_size = + collect_keys ? ConfigsMap::instance()->get_int("SymbolList.BatchDeleteDuringCompaction", 0) : 0; std::vector delete_batch; store->iterate_type(KeyType::SYMBOL_LIST, [&](auto&& key) { diff --git a/cpp/arcticdb/version/test/benchmark_symbol_list.cpp b/cpp/arcticdb/version/test/benchmark_symbol_list.cpp index c0e7f22877c..95aa78e3e39 100644 --- a/cpp/arcticdb/version/test/benchmark_symbol_list.cpp +++ b/cpp/arcticdb/version/test/benchmark_symbol_list.cpp @@ -7,10 +7,13 @@ */ #include -#include #include #include +#ifdef __linux__ +#include +#endif + #include #include #include @@ -35,24 +38,37 @@ std::shared_ptr create_s3_mock_store(const std::string& lib_name = "bench storage::LibraryPath path{lib_name.c_str(), "store"}; auto storages = storage::create_storages(path, storage::OpenMode::DELETE, {vs}); auto library = std::make_shared(path, std::move(storages)); - return std::make_shared>(async::AsyncStore(library, codec::default_lz4_codec(), EncodingVersion::V1)); + return std::make_shared>( + async::AsyncStore(library, codec::default_lz4_codec(), EncodingVersion::V1) + ); } // Tracks peak 
heap usage by polling mallinfo2 from a background thread. +// Only functional on Linux with glibc; on other platforms it reports zero. struct PeakHeapTracker { std::atomic running_{false}; std::atomic peak_uordblks_{0}; size_t baseline_{0}; std::thread sampler_; + static size_t current_heap_bytes() { +#ifdef __linux__ + return mallinfo2().uordblks; +#else + return 0; +#endif + } + void start() { +#ifdef __linux__ malloc_trim(0); - baseline_ = mallinfo2().uordblks; +#endif + baseline_ = current_heap_bytes(); peak_uordblks_ = baseline_; running_ = true; sampler_ = std::thread([this] { while (running_.load(std::memory_order_relaxed)) { - auto current = mallinfo2().uordblks; + auto current = current_heap_bytes(); auto prev = peak_uordblks_.load(std::memory_order_relaxed); while (current > prev && !peak_uordblks_.compare_exchange_weak(prev, current, std::memory_order_relaxed) ) @@ -66,8 +82,7 @@ struct PeakHeapTracker { running_ = false; if (sampler_.joinable()) sampler_.join(); - // One final sample - auto current = mallinfo2().uordblks; + auto current = current_heap_bytes(); auto prev = peak_uordblks_.load(std::memory_order_relaxed); if (current > prev) peak_uordblks_.store(current, std::memory_order_relaxed); @@ -112,7 +127,7 @@ static void BM_symbol_list_load(benchmark::State& state) { auto result = sl.load>(version_map, store, true); auto peak_delta = tracker.stop(); - auto retained = mallinfo2().uordblks - tracker.baseline_; + auto retained = PeakHeapTracker::current_heap_bytes() - tracker.baseline_; state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); state.counters["RetainedMB"] = benchmark::Counter(static_cast(retained) / (1024.0 * 1024.0)); @@ -166,7 +181,7 @@ static void BM_symbol_list_load_many_entries(benchmark::State& state) { auto result = sl.load>(version_map, store, true); auto peak_delta = tracker.stop(); - auto retained = mallinfo2().uordblks - tracker.baseline_; + auto retained = 
PeakHeapTracker::current_heap_bytes() - tracker.baseline_; state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); state.counters["RetainedMB"] = benchmark::Counter(static_cast(retained) / (1024.0 * 1024.0)); @@ -211,8 +226,7 @@ static void BM_symbol_list_compaction(benchmark::State& state) { state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); - state.counters["TotalEntries"] = - benchmark::Counter(static_cast(num_symbols * entries_per_symbol)); + state.counters["TotalEntries"] = benchmark::Counter(static_cast(num_symbols * entries_per_symbol)); benchmark::DoNotOptimize(result); } @@ -222,6 +236,8 @@ static void BM_symbol_list_compaction(benchmark::State& state) { BENCHMARK(BM_symbol_list_compaction) ->Args({1'000, 100}) + ->Args({1'000, 1'000}) + ->Args({10'000, 100}) ->Unit(benchmark::kMillisecond) ->Iterations(1); @@ -291,8 +307,7 @@ static void BM_symbol_list_compaction_s3(benchmark::State& state) { state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); - state.counters["TotalEntries"] = - benchmark::Counter(static_cast(num_symbols * entries_per_symbol)); + state.counters["TotalEntries"] = benchmark::Counter(static_cast(num_symbols * entries_per_symbol)); benchmark::DoNotOptimize(result); } @@ -300,7 +315,4 @@ static void BM_symbol_list_compaction_s3(benchmark::State& state) { ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); } -BENCHMARK(BM_symbol_list_compaction_s3) - ->Args({1'000, 100}) - ->Unit(benchmark::kMillisecond) - ->Iterations(1); +BENCHMARK(BM_symbol_list_compaction_s3)->Args({1'000, 100})->Unit(benchmark::kMillisecond)->Iterations(1); diff --git a/python/.asv/results/benchmarks.json b/python/.asv/results/benchmarks.json index 626fa120b7b..623187f32ec 100644 --- 
a/python/.asv/results/benchmarks.json +++ b/python/.asv/results/benchmarks.json @@ -2060,6 +2060,180 @@ "version": "c292f6f8757182d8ac67ddb990a81389675b5d264b32a5b2f720514bf703f6a9", "warmup_time": -1 }, + "list_symbols.ListSymbolsCompaction.peakmem_list_symbols": { + "code": "class ListSymbolsCompaction:\n def peakmem_list_symbols(self, *args):\n self.lib.list_symbols()\n\n def setup(self, lib_for_storage, num_symbols, num_versions, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n \n simple_df = pd.DataFrame({\"a\": [1]})\n \n # Write multiple versions per symbol to create many journal entries\n # Total entries = num_symbols * num_versions\n for v in range(num_versions):\n batch_size = 1000\n for start in range(0, num_symbols, batch_size):\n end = min(start + batch_size, num_symbols)\n payloads = [WritePayload(f\"sym_{i}\", simple_df) for i in range(start, end)]\n self.lib.write_batch(payloads)\n\n def setup_cache(self):\n return create_libraries_across_storages(self.storages)", + "name": "list_symbols.ListSymbolsCompaction.peakmem_list_symbols", + "param_names": [ + "num_symbols", + "num_versions", + "storage" + ], + "params": [ + [ + "1000" + ], + [ + "10", + "100" + ], + [ + "", + "" + ] + ], + "setup_cache_key": "list_symbols:154", + "timeout": 1200, + "type": "peakmemory", + "unit": "bytes", + "version": "94ba7991182f7c4fecbb447270b9eed2beeeaedd57165a2aba287d9b413530d6" + }, + "list_symbols.ListSymbolsCompaction.time_list_symbols": { + "code": "class ListSymbolsCompaction:\n def time_list_symbols(self, *args):\n # This triggers compaction since there are many uncompacted entries\n self.lib.list_symbols()\n\n def setup(self, lib_for_storage, num_symbols, num_versions, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n \n simple_df = pd.DataFrame({\"a\": [1]})\n \n # Write multiple versions per symbol to create many journal entries\n # Total entries = num_symbols 
* num_versions\n for v in range(num_versions):\n batch_size = 1000\n for start in range(0, num_symbols, batch_size):\n end = min(start + batch_size, num_symbols)\n payloads = [WritePayload(f\"sym_{i}\", simple_df) for i in range(start, end)]\n self.lib.write_batch(payloads)\n\n def setup_cache(self):\n return create_libraries_across_storages(self.storages)", + "min_run_count": 2, + "name": "list_symbols.ListSymbolsCompaction.time_list_symbols", + "number": 1, + "param_names": [ + "num_symbols", + "num_versions", + "storage" + ], + "params": [ + [ + "1000" + ], + [ + "10", + "100" + ], + [ + "", + "" + ] + ], + "repeat": 0, + "rounds": 1, + "sample_time": 0.01, + "setup_cache_key": "list_symbols:154", + "timeout": 1200, + "type": "time", + "unit": "seconds", + "version": "3df9d467f50b70b0623edeeff1dc919e3311985ae79b9de3a2e121fc19fdc0e9", + "warmup_time": 0 + }, + "list_symbols.ListSymbolsWithCompactedCache.peakmem_list_symbols": { + "code": "class ListSymbolsWithCompactedCache:\n def peakmem_list_symbols(self, *args):\n self.lib.list_symbols()\n\n def setup(self, lib_for_storage, num_symbols, num_journal_entries, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n \n simple_df = pd.DataFrame({\"a\": [1]})\n \n # Write symbols in batches to avoid very large payloads\n batch_size = 1000\n for start in range(0, num_symbols, batch_size):\n end = min(start + batch_size, num_symbols)\n payloads = [WritePayload(f\"sym_{i}\", simple_df) for i in range(start, end)]\n self.lib.write_batch(payloads)\n \n # Trigger compaction by calling list_symbols\n self.lib.list_symbols()\n \n # Add journal entries on top of the compacted cache\n for i in range(num_journal_entries):\n self.lib.write(f\"sym_{i}\", simple_df)\n\n def setup_cache(self):\n return create_libraries_across_storages(self.storages)", + "name": "list_symbols.ListSymbolsWithCompactedCache.peakmem_list_symbols", + "param_names": [ + "num_symbols", + "num_journal_entries", 
+ "storage" + ], + "params": [ + [ + "1000", + "10000" + ], + [ + "0", + "100" + ], + [ + "", + "" + ] + ], + "setup_cache_key": "list_symbols:97", + "timeout": 600, + "type": "peakmemory", + "unit": "bytes", + "version": "b4be3096a68b9744d0576580f1dd94c8426b2828826419b4a881a1e2275c4087" + }, + "list_symbols.ListSymbolsWithCompactedCache.time_list_symbols": { + "code": "class ListSymbolsWithCompactedCache:\n def time_list_symbols(self, *args):\n self.lib.list_symbols()\n\n def setup(self, lib_for_storage, num_symbols, num_journal_entries, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n \n simple_df = pd.DataFrame({\"a\": [1]})\n \n # Write symbols in batches to avoid very large payloads\n batch_size = 1000\n for start in range(0, num_symbols, batch_size):\n end = min(start + batch_size, num_symbols)\n payloads = [WritePayload(f\"sym_{i}\", simple_df) for i in range(start, end)]\n self.lib.write_batch(payloads)\n \n # Trigger compaction by calling list_symbols\n self.lib.list_symbols()\n \n # Add journal entries on top of the compacted cache\n for i in range(num_journal_entries):\n self.lib.write(f\"sym_{i}\", simple_df)\n\n def setup_cache(self):\n return create_libraries_across_storages(self.storages)", + "min_run_count": 2, + "name": "list_symbols.ListSymbolsWithCompactedCache.time_list_symbols", + "number": 3, + "param_names": [ + "num_symbols", + "num_journal_entries", + "storage" + ], + "params": [ + [ + "1000", + "10000" + ], + [ + "0", + "100" + ], + [ + "", + "" + ] + ], + "repeat": 0, + "rounds": 1, + "sample_time": 0.01, + "setup_cache_key": "list_symbols:97", + "timeout": 600, + "type": "time", + "unit": "seconds", + "version": "f9910a09469fa71a142bc057314945fe8c50394fcf7def322ad0e3dec941ce2e", + "warmup_time": 0 + }, + "list_symbols.ListSymbolsWithDeletes.peakmem_list_symbols": { + "code": "class ListSymbolsWithDeletes:\n def peakmem_list_symbols(self, *args):\n self.lib.list_symbols()\n\n def 
setup(self, lib_for_storage, num_symbols, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n \n simple_df = pd.DataFrame({\"a\": [1]})\n \n batch_size = 1000\n for start in range(0, num_symbols, batch_size):\n end = min(start + batch_size, num_symbols)\n payloads = [WritePayload(f\"sym_{i}\", simple_df) for i in range(start, end)]\n self.lib.write_batch(payloads)\n \n # Compact\n self.lib.list_symbols()\n \n # Delete 10% of symbols and add new ones\n num_to_delete = num_symbols // 10\n for i in range(num_to_delete):\n self.lib.delete(f\"sym_{i}\")\n for i in range(num_to_delete):\n self.lib.write(f\"sym_new_{i}\", simple_df)\n\n def setup_cache(self):\n return create_libraries_across_storages(self.storages)", + "name": "list_symbols.ListSymbolsWithDeletes.peakmem_list_symbols", + "param_names": [ + "num_symbols", + "storage" + ], + "params": [ + [ + "1000", + "10000" + ], + [ + "", + "" + ] + ], + "setup_cache_key": "list_symbols:205", + "timeout": 600, + "type": "peakmemory", + "unit": "bytes", + "version": "e3fc772bf4b75f473b65d8a843dd3599cb8ca503b251067b12f0e22e6283a51c" + }, + "list_symbols.ListSymbolsWithDeletes.time_list_symbols": { + "code": "class ListSymbolsWithDeletes:\n def time_list_symbols(self, *args):\n self.lib.list_symbols()\n\n def setup(self, lib_for_storage, num_symbols, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n \n simple_df = pd.DataFrame({\"a\": [1]})\n \n batch_size = 1000\n for start in range(0, num_symbols, batch_size):\n end = min(start + batch_size, num_symbols)\n payloads = [WritePayload(f\"sym_{i}\", simple_df) for i in range(start, end)]\n self.lib.write_batch(payloads)\n \n # Compact\n self.lib.list_symbols()\n \n # Delete 10% of symbols and add new ones\n num_to_delete = num_symbols // 10\n for i in range(num_to_delete):\n self.lib.delete(f\"sym_{i}\")\n for i in range(num_to_delete):\n self.lib.write(f\"sym_new_{i}\", 
simple_df)\n\n def setup_cache(self):\n return create_libraries_across_storages(self.storages)", + "min_run_count": 2, + "name": "list_symbols.ListSymbolsWithDeletes.time_list_symbols", + "number": 3, + "param_names": [ + "num_symbols", + "storage" + ], + "params": [ + [ + "1000", + "10000" + ], + [ + "", + "" + ] + ], + "repeat": 0, + "rounds": 1, + "sample_time": 0.01, + "setup_cache_key": "list_symbols:205", + "timeout": 600, + "type": "time", + "unit": "seconds", + "version": "d0da3af49ac919286ca7bdabb468c07c7fa93074dba87e05db3ddd8c33c2a831", + "warmup_time": 0 + }, "list_symbols.ListSymbolsWithoutCache.peakmem_list_symbols": { "code": "class ListSymbolsWithoutCache:\n def peakmem_list_symbols(self, *args):\n self._check_test_counter()\n self.lib.list_symbols()\n\n def setup(self, lib_for_storage, num_symbols, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n self.test_counter = 1\n \n simple_df = pd.DataFrame({\"a\": [1]})\n write_payloads = [WritePayload(f\"{i}\", simple_df) for i in range(num_symbols)]\n self.lib.write_batch(write_payloads)\n\n def setup_cache(self):\n lib_for_storage = create_libraries_across_storages(self.storages)\n return lib_for_storage", "name": "list_symbols.ListSymbolsWithoutCache.peakmem_list_symbols", diff --git a/python/benchmarks/list_symbols.py b/python/benchmarks/list_symbols.py index 3b13210d9ec..0d4726c8340 100644 --- a/python/benchmarks/list_symbols.py +++ b/python/benchmarks/list_symbols.py @@ -129,6 +129,59 @@ def peakmem_list_symbols(self, *args): self.lib.list_symbols() +class ListSymbolsCompaction: + """Measure list_symbols when compaction is triggered with many uncompacted journal entries. 
+ + This is the scenario where streaming optimization matters most: many journal entries + need to be merged during compaction.""" + + rounds = 1 + number = 1 + timeout = 1200 + warmup_time = 0 + + storages = [Storage.LMDB, Storage.AMAZON] + num_symbols = [1_000] + num_versions = [10, 100] + + params = [num_symbols, num_versions, storages] + param_names = ["num_symbols", "num_versions", "storage"] + + def __init__(self): + self.logger = get_logger() + self.lib = None + + def setup_cache(self): + return create_libraries_across_storages(self.storages) + + def teardown(self, *args): + if self.lib is not None: + self.lib._nvs.version_store.clear() + + def setup(self, lib_for_storage, num_symbols, num_versions, storage): + self.lib = lib_for_storage[storage] + if self.lib is None: + raise SkipNotImplemented + + simple_df = pd.DataFrame({"a": [1]}) + + # Write multiple versions per symbol to create many journal entries + # Total entries = num_symbols * num_versions + for v in range(num_versions): + batch_size = 1000 + for start in range(0, num_symbols, batch_size): + end = min(start + batch_size, num_symbols) + payloads = [WritePayload(f"sym_{i}", simple_df) for i in range(start, end)] + self.lib.write_batch(payloads) + + def time_list_symbols(self, *args): + # This triggers compaction since there are many uncompacted entries + self.lib.list_symbols() + + def peakmem_list_symbols(self, *args): + self.lib.list_symbols() + + class ListSymbolsWithDeletes: """Measure list_symbols with a mix of adds and deletes since last compaction. 
From 1db820481c336e656bf5414a22f3eddc4a33bf05 Mon Sep 17 00:00:00 2001 From: Georgi Petrov Date: Fri, 27 Mar 2026 17:17:55 +0200 Subject: [PATCH 3/8] Fix SL C++ benchmark --- cpp/arcticdb/version/test/benchmark_symbol_list.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/arcticdb/version/test/benchmark_symbol_list.cpp b/cpp/arcticdb/version/test/benchmark_symbol_list.cpp index 95aa78e3e39..7806c202ec7 100644 --- a/cpp/arcticdb/version/test/benchmark_symbol_list.cpp +++ b/cpp/arcticdb/version/test/benchmark_symbol_list.cpp @@ -53,7 +53,11 @@ struct PeakHeapTracker { static size_t current_heap_bytes() { #ifdef __linux__ +#if __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 33) return mallinfo2().uordblks; +#else + return static_cast(mallinfo().uordblks); +#endif #else return 0; #endif From 5844198f6ff4f1d68e9292053a2bff2696c7f7c1 Mon Sep 17 00:00:00 2001 From: Georgi Petrov Date: Tue, 21 Apr 2026 16:12:32 +0300 Subject: [PATCH 4/8] Reduce the list symbols benchmarks --- python/.asv/results/benchmarks.json | 21 ++++++--------------- python/benchmarks/list_symbols.py | 8 ++++---- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/python/.asv/results/benchmarks.json b/python/.asv/results/benchmarks.json index 623187f32ec..4ab93d5bf19 100644 --- a/python/.asv/results/benchmarks.json +++ b/python/.asv/results/benchmarks.json @@ -2073,8 +2073,7 @@ "1000" ], [ - "10", - "100" + "10" ], [ "", @@ -2102,8 +2101,7 @@ "1000" ], [ - "10", - "100" + "10" ], [ "", @@ -2130,8 +2128,7 @@ ], "params": [ [ - "1000", - "10000" + "1000" ], [ "0", @@ -2160,8 +2157,7 @@ ], "params": [ [ - "1000", - "10000" + "1000" ], [ "0", @@ -2191,8 +2187,7 @@ ], "params": [ [ - "1000", - "10000" + "1000" ], [ "", @@ -2216,8 +2211,7 @@ ], "params": [ [ - "1000", - "10000" + "1000" ], [ "", @@ -2243,7 +2237,6 @@ ], "params": [ [ - "100", "1000" ], [ @@ -2268,7 +2261,6 @@ ], "params": [ [ - "100", "1000" ], [ @@ -2297,7 +2289,6 @@ ], "params": [ [ - "100", "1000" ], [ diff 
--git a/python/benchmarks/list_symbols.py b/python/benchmarks/list_symbols.py index 0d4726c8340..1dc2eadc40e 100644 --- a/python/benchmarks/list_symbols.py +++ b/python/benchmarks/list_symbols.py @@ -24,7 +24,7 @@ class ListSymbolsWithoutCache: warmup_time = 0 storages = [Storage.LMDB, Storage.AMAZON] - num_symbols = [100, 1000] + num_symbols = [1000] params = [num_symbols, storages] param_names = ["num_symbols", "storage"] @@ -84,7 +84,7 @@ class ListSymbolsWithCompactedCache: warmup_time = 0 storages = [Storage.LMDB, Storage.AMAZON] - num_symbols = [1_000, 10_000] + num_symbols = [1_000] num_journal_entries = [0, 100] params = [num_symbols, num_journal_entries, storages] @@ -142,7 +142,7 @@ class ListSymbolsCompaction: storages = [Storage.LMDB, Storage.AMAZON] num_symbols = [1_000] - num_versions = [10, 100] + num_versions = [10] params = [num_symbols, num_versions, storages] param_names = ["num_symbols", "num_versions", "storage"] @@ -193,7 +193,7 @@ class ListSymbolsWithDeletes: warmup_time = 0 storages = [Storage.LMDB, Storage.AMAZON] - num_symbols = [1_000, 10_000] + num_symbols = [1_000] params = [num_symbols, storages] param_names = ["num_symbols", "storage"] From 2584213052f9226a3cb0d2b6117312b859fbce45 Mon Sep 17 00:00:00 2001 From: Georgi Petrov Date: Wed, 13 May 2026 13:46:05 +0100 Subject: [PATCH 5/8] Fix small perf gaps --- cpp/arcticdb/version/symbol_list.cpp | 136 ++++++++---------- cpp/arcticdb/version/symbol_list.hpp | 25 ++-- .../version/test/benchmark_symbol_list.cpp | 30 ++-- 3 files changed, 89 insertions(+), 102 deletions(-) diff --git a/cpp/arcticdb/version/symbol_list.cpp b/cpp/arcticdb/version/symbol_list.cpp index 1703c868820..3888633966d 100644 --- a/cpp/arcticdb/version/symbol_list.cpp +++ b/cpp/arcticdb/version/symbol_list.cpp @@ -201,7 +201,32 @@ struct StreamingJournalResult { std::optional compaction_key; MapType update_map; size_t total_key_count = 0; - std::vector all_keys; // collected for deletion after compaction + std::vector 
all_keys; +}; + +void add_update_map_entry(MapType& update_map, const AtomKey& atom_key) { + const auto& symbol = atom_key.start_index(); + const auto version_id = is_new_style_key(atom_key) ? atom_key.version_id() : unknown_version_id; + const auto timestamp = atom_key.creation_ts(); + const auto& action_id_val = atom_key.id(); + ActionType action = std::get(action_id_val) == DeleteSymbol ? ActionType::DELETE : ActionType::ADD; + update_map[symbol].emplace_back(version_id, timestamp, action); +} + +void sort_update_map_entries(MapType& update_map) { + for (auto& [symbol, entries] : update_map) { + std::sort(entries.begin(), entries.end(), [](const SymbolEntryData& a, const SymbolEntryData& b) { + auto a_ver = a.reference_id_ == unknown_version_id ? VersionId{0} : a.reference_id_; + auto b_ver = b.reference_id_ == unknown_version_id ? VersionId{0} : b.reference_id_; + return std::tie(a_ver, a.timestamp_) < std::tie(b_ver, b.timestamp_); + }); + } +} + +auto key_sort_comparator = [](const AtomKey& l, const AtomKey& r) { + auto l_ver = is_new_style_key(l) ? l.version_id() : VersionId{0}; + auto r_ver = is_new_style_key(r) ? r.version_id() : VersionId{0}; + return std::tie(l.start_index(), l_ver, l.creation_ts()) < std::tie(r.start_index(), r_ver, r.creation_ts()); }; StreamingJournalResult load_journal_streaming( @@ -211,16 +236,11 @@ StreamingJournalResult load_journal_streaming( StreamingJournalResult result; size_t uncompacted_keys_found = 0; - const auto batch_delete_size = - collect_keys ? 
ConfigsMap::instance()->get_int("SymbolList.BatchDeleteDuringCompaction", 0) : 0; - std::vector delete_batch; - store->iterate_type(KeyType::SYMBOL_LIST, [&](auto&& key) { auto atom_key = to_atom(std::forward(key)); result.total_key_count++; if (atom_key.id() == compaction_id) { - // Keep the latest compaction key by creation timestamp if (!result.compaction_key || atom_key.creation_ts() > result.compaction_key->creation_ts()) result.compaction_key = atom_key; } else { @@ -240,46 +260,14 @@ StreamingJournalResult load_journal_streaming( data.warned_expected_slowdown_ = true; } - // Build MapType entry directly — no intermediate sorted key vector - const auto& symbol = atom_key.start_index(); - const auto version_id = is_new_style_key(atom_key) ? atom_key.version_id() : unknown_version_id; - const auto timestamp = atom_key.creation_ts(); - const auto& action_id_val = atom_key.id(); - ActionType action = - std::get(action_id_val) == DeleteSymbol ? ActionType::DELETE : ActionType::ADD; - result.update_map[symbol].emplace_back(version_id, timestamp, action); + add_update_map_entry(result.update_map, atom_key); } - if (collect_keys) { - if (batch_delete_size > 0) { - delete_batch.push_back(std::move(atom_key)); - if (static_cast(delete_batch.size()) >= batch_delete_size) { - std::vector to_remove(delete_batch.begin(), delete_batch.end()); - store->remove_keys_sync(to_remove); - delete_batch.clear(); - } - } else { - result.all_keys.push_back(std::move(atom_key)); - } - } + if (collect_keys) + result.all_keys.emplace_back(std::move(atom_key)); }); - // Flush remaining batch - if (!delete_batch.empty()) { - std::vector to_remove(delete_batch.begin(), delete_batch.end()); - store->remove_keys_sync(to_remove); - } - - // Sort each symbol's entries to match the order the old sorted-key-vector approach produced. - // Old-style keys (unknown_version_id) should sort as version 0, not max, to preserve chronological order - // relative to new-style keys. 
- for (auto& [symbol, entries] : result.update_map) { - std::sort(entries.begin(), entries.end(), [](const SymbolEntryData& a, const SymbolEntryData& b) { - auto a_ver = a.reference_id_ == unknown_version_id ? VersionId{0} : a.reference_id_; - auto b_ver = b.reference_id_ == unknown_version_id ? VersionId{0} : b.reference_id_; - return std::tie(a_ver, a.timestamp_) < std::tie(b_ver, b.timestamp_); - }); - } + sort_update_map_entries(result.update_map); return result; } @@ -518,9 +506,9 @@ LoadResult attempt_load( auto journal = load_journal_streaming(store, data, will_attempt_compaction, collect_keys); LoadResult load_result; - load_result.symbol_list_keys_ = std::move(journal.all_keys); load_result.compaction_key_ = journal.compaction_key; load_result.total_key_count_ = journal.total_key_count; + load_result.symbol_list_keys_ = std::move(journal.all_keys); if (journal.compaction_key) { ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from symbol list keys"); @@ -532,21 +520,6 @@ LoadResult attempt_load( auto previous_entries = load_previous_from_version_keys(store, data, will_attempt_compaction); load_result.symbols_ = merge_existing_with_journal_map(version_map, store, journal.update_map, std::move(previous_entries)); - - std::unordered_set keys_in_versions; - for (const auto& entry : load_result.symbols_) { - keys_in_versions.emplace(entry.stream_id_); - } - - for (const auto& key : load_result.symbol_list_keys_) { - if (key.id() != compaction_id) { - util::check( - keys_in_versions.find(key.start_index()) != keys_in_versions.end(), - "Would delete unseen key {}", - key - ); - } - } } return load_result; @@ -755,7 +728,7 @@ SegmentInMemory create_empty_segment(const StreamId& stream_id) { } VariantKey write_symbols( - const std::shared_ptr& store, const CollectionType& symbols, const StreamId& stream_id, + const std::shared_ptr& store, CollectionType symbols, const StreamId& stream_id, const StreamId& type_holder ) { ARCTICDB_RUNTIME_DEBUG(log::symbol(), 
"Writing {} symbols to symbol list cache", symbols.size()); @@ -775,20 +748,15 @@ VariantKey write_symbols( return store->write_sync(KeyType::SYMBOL_LIST, 0, stream_id, NumericIndex{0}, NumericIndex{0}, std::move(segment)); } -std::vector delete_keys( - const std::shared_ptr& store, std::vector&& remove, const AtomKey& exclude -) { - auto to_remove = std::move(remove); - std::vector variant_keys; - variant_keys.reserve(to_remove.size()); - for (auto& atom_key : to_remove) { - // Corner case: if the newly written Compaction key (exclude) has the same timestamp as an existing one - // (e.g. when a previous compaction round failed in the deletion step), we don't want to delete the former +void delete_keys(const std::shared_ptr& store, std::vector&& remove, const AtomKey& exclude) { + std::vector to_remove; + to_remove.reserve(remove.size()); + for (auto& atom_key : remove) { if (atom_key != exclude) - variant_keys.emplace_back(atom_key); + to_remove.emplace_back(std::move(atom_key)); } - - return store->remove_keys_sync(variant_keys); + if (!to_remove.empty()) + store->remove_keys_sync(to_remove); } bool has_recent_compaction(const std::shared_ptr& store, const std::optional& compaction_key) { @@ -808,7 +776,6 @@ bool has_recent_compaction(const std::shared_ptr& store, const std::optio std::get(compaction_id) ); } else { - // No prior compaction — any compaction key means someone else compacted store->iterate_type( KeyType::SYMBOL_LIST, [&has_newer](const VariantKey&) { has_newer = true; }, @@ -838,19 +805,30 @@ size_t SymbolList::compact(const std::shared_ptr& store) { void SymbolList::compact_internal(const std::shared_ptr& store, LoadResult& load_result) const { if (has_recent_compaction(store, load_result.compaction_key_)) { - // legacy arcticc symbol list entries don't get correctly listed when doing `iterate_type`, so can mess - // up racing symbol list compaction detection. 
ARCTICDB_RUNTIME_DEBUG( log::symbol(), "Symbol list compaction will be skipped: either a concurrent compaction was detected " "or there are legacy arcticc symbol list entries that cannot be verified." ); - } else { - auto written = write_symbols(store, load_result.symbols_, compaction_id, data_.type_holder_); - if (!load_result.symbol_list_keys_.empty()) { - delete_keys(store, load_result.detach_symbol_list_keys(), std::get(written)); - } + return; } + + auto written = write_symbols(store, std::move(load_result.symbols_), compaction_id, data_.type_holder_); + auto written_key = std::get(written); + + // Sort pre-collected keys for storage backend performance, then delete excluding the newly written key + auto& keys = load_result.symbol_list_keys_; + std::sort(keys.begin(), keys.end(), [](const VariantKey& l, const VariantKey& r) { + return key_sort_comparator(to_atom(l), to_atom(r)); + }); + keys.erase( + std::remove_if(keys.begin(), keys.end(), [&written_key](const VariantKey& vk) { + return to_atom(vk) == written_key; + }), + keys.end() + ); + if (!keys.empty()) + store->remove_keys_sync(std::move(keys)); } } // namespace arcticdb diff --git a/cpp/arcticdb/version/symbol_list.hpp b/cpp/arcticdb/version/symbol_list.hpp index 8f6398d4c00..8312eabaed3 100644 --- a/cpp/arcticdb/version/symbol_list.hpp +++ b/cpp/arcticdb/version/symbol_list.hpp @@ -30,12 +30,10 @@ enum class WillAttemptCompaction : uint8_t { }; struct LoadResult { - std::vector symbol_list_keys_; // all SYMBOL_LIST keys, for deletion after compaction - std::optional compaction_key_; // the latest compaction key found, if any + std::optional compaction_key_; CollectionType symbols_; - size_t total_key_count_ = 0; // total number of SYMBOL_LIST keys, for compaction threshold - - std::vector&& detach_symbol_list_keys() { return std::move(symbol_list_keys_); } + size_t total_key_count_ = 0; + std::vector symbol_list_keys_; }; struct SymbolListData { @@ -159,6 +157,13 @@ class SymbolList { } ); + // Build 
output before compaction — compact_internal frees symbols_ after writing them to storage + R output; + for (const auto& entry : load_result.symbols_) { + if (entry.action_ == ActionType::ADD) + output.insert(entry.stream_id_); + } + if (will_attempt_compaction == WillAttemptCompaction::YES && needs_compaction(load_result)) { ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Compaction necessary. Obtaining lock..."); try { @@ -179,12 +184,6 @@ class SymbolList { } } - R output; - for (auto& entry : load_result.symbols_) { - if (entry.action_ == ActionType::ADD) - output.insert(std::move(entry.stream_id_)); - } - return output; } @@ -214,9 +213,7 @@ class SymbolList { [[nodiscard]] bool needs_compaction(const LoadResult& load_result) const; }; -std::vector delete_keys( - const std::shared_ptr& store, std::vector&& remove, const AtomKey& exclude -); +void delete_keys(const std::shared_ptr& store, std::vector&& remove, const AtomKey& exclude); struct WriteSymbolTask : async::BaseTask { const std::shared_ptr store_; diff --git a/cpp/arcticdb/version/test/benchmark_symbol_list.cpp b/cpp/arcticdb/version/test/benchmark_symbol_list.cpp index 7806c202ec7..16946c7a7f4 100644 --- a/cpp/arcticdb/version/test/benchmark_symbol_list.cpp +++ b/cpp/arcticdb/version/test/benchmark_symbol_list.cpp @@ -141,7 +141,10 @@ static void BM_symbol_list_load(benchmark::State& state) { } } -BENCHMARK(BM_symbol_list_load)->Arg(100'000)->Arg(1'000'000)->Unit(benchmark::kMillisecond)->Iterations(3); +BENCHMARK(BM_symbol_list_load)->Arg(100'000)->Arg(300'000)->Unit(benchmark::kMillisecond)->Iterations(3); +// Disabled in CI: 1M-symbol setup requires writing+compacting 1M journal entries (~30 s on CI hardware). +// Run locally: make bench-cpp FILTER=BM_symbol_list_load/1000000 +// BENCHMARK(BM_symbol_list_load)->Arg(1'000'000)->Unit(benchmark::kMillisecond)->Iterations(3); // Benchmarks peak memory with many uncompacted journal entries per symbol. 
// This is the scenario where the AtomKey vector elimination matters most. @@ -197,6 +200,9 @@ static void BM_symbol_list_load_many_entries(benchmark::State& state) { } BENCHMARK(BM_symbol_list_load_many_entries)->Args({1'000, 1'000})->Unit(benchmark::kMillisecond)->Iterations(1); +// Disabled in CI: 300K-symbol setup writes 600K journal entries (~25 s on CI hardware). +// Run locally: make bench-cpp FILTER=BM_symbol_list_load_many_entries/300000 +// BENCHMARK(BM_symbol_list_load_many_entries)->Args({300'000, 1})->Unit(benchmark::kMillisecond)->Iterations(1); // Benchmarks peak memory during a load that triggers compaction. // This measures the compaction path where keys are collected for deletion. @@ -238,12 +244,15 @@ static void BM_symbol_list_compaction(benchmark::State& state) { ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); } -BENCHMARK(BM_symbol_list_compaction) - ->Args({1'000, 100}) - ->Args({1'000, 1'000}) - ->Args({10'000, 100}) - ->Unit(benchmark::kMillisecond) - ->Iterations(1); +BENCHMARK(BM_symbol_list_compaction)->Args({1'000, 100})->Unit(benchmark::kMillisecond)->Iterations(1); +// Disabled in CI: 1M-entry cases each take ~20 s; 300K×1 setup writes 600K entries. 
+// Run locally: make bench-cpp FILTER=BM_symbol_list_compaction +// BENCHMARK(BM_symbol_list_compaction) +// ->Args({1'000, 1'000}) +// ->Args({10'000, 100}) +// ->Args({300'000, 1}) +// ->Unit(benchmark::kMillisecond) +// ->Iterations(1); // ---- S3 mock variants: exercise the full S3 storage layer (serialization, key parsing) ---- @@ -286,7 +295,7 @@ static void BM_symbol_list_load_s3(benchmark::State& state) { BENCHMARK(BM_symbol_list_load_s3)->Arg(10'000)->Unit(benchmark::kMillisecond)->Iterations(1); -static void BM_symbol_list_compaction_s3(benchmark::State& state) { +[[maybe_unused]] static void BM_symbol_list_compaction_s3(benchmark::State& state) { auto version_map = std::make_shared(); auto num_symbols = state.range(0); auto entries_per_symbol = state.range(1); @@ -319,4 +328,7 @@ static void BM_symbol_list_compaction_s3(benchmark::State& state) { ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); } -BENCHMARK(BM_symbol_list_compaction_s3)->Args({1'000, 100})->Unit(benchmark::kMillisecond)->Iterations(1); +// Disabled in CI: S3 mock compaction takes ~40 s even for 1K×100 entries. 
+// Run locally: make bench-cpp FILTER=BM_symbol_list_compaction_s3 +// BENCHMARK(BM_symbol_list_compaction_s3)->Args({1'000, 100})->Unit(benchmark::kMillisecond)->Iterations(1); +// BENCHMARK(BM_symbol_list_compaction_s3)->Args({10'000, 10})->Unit(benchmark::kMillisecond)->Iterations(1); From ddbff77b58c8c0df427f37dd71202c79c8e6df4b Mon Sep 17 00:00:00 2001 From: Georgi Petrov Date: Thu, 14 May 2026 10:30:15 +0100 Subject: [PATCH 6/8] Bring back missing checks --- cpp/arcticdb/version/symbol_list.cpp | 43 ++++++++++++++-- .../version/test/test_symbol_list.cpp | 49 +++++++++++++++++++ 2 files changed, 89 insertions(+), 3 deletions(-) diff --git a/cpp/arcticdb/version/symbol_list.cpp b/cpp/arcticdb/version/symbol_list.cpp index 3888633966d..f88f82bcbec 100644 --- a/cpp/arcticdb/version/symbol_list.cpp +++ b/cpp/arcticdb/version/symbol_list.cpp @@ -151,6 +151,14 @@ void for_each_segment_entry(const SegmentInMemory& seg, Visitor&& visitor) { } // New-style: columns 0-2 are additions, columns 3-5 are deletions + util::check( + seg.column(0).row_count() == seg.column(1).row_count() && + seg.column(0).row_count() == seg.column(2).row_count(), + "Column mismatch in symbol segment additions: {} {} {}", + seg.column(0).row_count(), + seg.column(1).row_count(), + seg.column(2).row_count() + ); for (auto i = 0L; i < seg.column(0).row_count(); ++i) { visitor(stream_id_from_segment(data_type, seg, i, 0), VersionId{scalar_at(seg, i, 1)}, @@ -159,6 +167,14 @@ void for_each_segment_entry(const SegmentInMemory& seg, Visitor&& visitor) { } if (seg.descriptor().field_count() == 6) { + util::check( + seg.column(3).row_count() == seg.column(4).row_count() && + seg.column(3).row_count() == seg.column(5).row_count(), + "Column mismatch in symbol segment deletions: {} {} {}", + seg.column(3).row_count(), + seg.column(4).row_count(), + seg.column(5).row_count() + ); for (auto i = 0L; i < seg.column(3).row_count(); ++i) { visitor(stream_id_from_segment(data_type, seg, i, 3), 
VersionId{scalar_at(seg, i, 4)}, @@ -520,6 +536,25 @@ LoadResult attempt_load( auto previous_entries = load_previous_from_version_keys(store, data, will_attempt_compaction); load_result.symbols_ = merge_existing_with_journal_map(version_map, store, journal.update_map, std::move(previous_entries)); + + // Verify every journal key we'd delete during compaction corresponds to a symbol in the + // merged output. Uses binary search (symbols_ is sorted by stream_id_ after merge). + if (collect_keys) { + for (const auto& key : load_result.symbol_list_keys_) { + auto stream_id = StreamId{std::get(to_atom(key).start_index())}; + auto it = std::lower_bound( + load_result.symbols_.begin(), + load_result.symbols_.end(), + stream_id, + [](const SymbolListEntry& entry, const StreamId& id) { return entry.stream_id_ < id; } + ); + util::check( + it != load_result.symbols_.end() && it->stream_id_ == stream_id, + "Would delete unseen key {}", + key + ); + } + } } return load_result; @@ -822,9 +857,11 @@ void SymbolList::compact_internal(const std::shared_ptr& store, LoadResul return key_sort_comparator(to_atom(l), to_atom(r)); }); keys.erase( - std::remove_if(keys.begin(), keys.end(), [&written_key](const VariantKey& vk) { - return to_atom(vk) == written_key; - }), + std::remove_if( + keys.begin(), + keys.end(), + [&written_key](const VariantKey& vk) { return to_atom(vk) == written_key; } + ), keys.end() ); if (!keys.empty()) diff --git a/cpp/arcticdb/version/test/test_symbol_list.cpp b/cpp/arcticdb/version/test/test_symbol_list.cpp index 464b80fcd2b..3d2be66d510 100644 --- a/cpp/arcticdb/version/test/test_symbol_list.cpp +++ b/cpp/arcticdb/version/test/test_symbol_list.cpp @@ -1035,6 +1035,55 @@ TEST_F(SymbolListSuite, DirectPathEquivalence) { EXPECT_EQ(direct_result.size(), 60u); } +TEST_F(SymbolListSuite, DirectPathEquivalenceWithTrueDeletes) { + // Setup: add symbols and force compaction + for (int i = 0; i < 50; ++i) { + auto symbol = fmt::format("sym_{}", i); + 
SymbolList::add_symbol(store_, StreamId{symbol}, 0); + auto key = atom_key_builder().build(symbol, KeyType::TABLE_INDEX); + version_map_->write_version(store_, key, std::nullopt); + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map_}; + sl.load>(version_map_, store_, false); + } + + // Add new symbols + for (int i = 50; i < 60; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::add_symbol(store_, StreamId{symbol}, 1); + auto key = atom_key_builder().version_id(1).build(symbol, KeyType::TABLE_INDEX); + version_map_->write_version(store_, key, std::nullopt); + } + + // Delete symbols 0-9 via both the symbol list journal AND the version map + for (int i = 0; i < 10; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::remove_symbol(store_, StreamId{symbol}, 1); + version_map_->tombstone_from_key_or_all(store_, StreamId{symbol}); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + SymbolList sl1{version_map_}; + auto compaction_result = sl1.load>(version_map_, store_, false); + + SymbolList sl2{version_map_}; + auto direct_result = sl2.load>(version_map_, store_, true); + + EXPECT_EQ(compaction_result, direct_result); + // 50 original - 10 deleted + 10 new = 50 + EXPECT_EQ(direct_result.size(), 50u); + // Verify deleted symbols are absent + for (int i = 0; i < 10; ++i) + EXPECT_EQ(direct_result.count(StreamId{fmt::format("sym_{}", i)}), 0u); + // Verify new symbols are present + for (int i = 50; i < 60; ++i) + EXPECT_EQ(direct_result.count(StreamId{fmt::format("sym_{}", i)}), 1u); +} + struct SymbolListRace : SymbolListSuite, testing::WithParamInterface> {}; TEST_P(SymbolListRace, Run) { From dbf05d6936978e7d5106ae6e7d56940ceaaa36e7 Mon Sep 17 00:00:00 2001 From: Georgi Petrov Date: Thu, 14 May 2026 11:02:15 +0100 Subject: [PATCH 7/8] Fix check --- cpp/arcticdb/version/symbol_list.cpp | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git 
a/cpp/arcticdb/version/symbol_list.cpp b/cpp/arcticdb/version/symbol_list.cpp index f88f82bcbec..337251c518d 100644 --- a/cpp/arcticdb/version/symbol_list.cpp +++ b/cpp/arcticdb/version/symbol_list.cpp @@ -538,18 +538,17 @@ LoadResult attempt_load( merge_existing_with_journal_map(version_map, store, journal.update_map, std::move(previous_entries)); // Verify every journal key we'd delete during compaction corresponds to a symbol in the - // merged output. Uses binary search (symbols_ is sorted by stream_id_ after merge). + // merged output. Guards against silent data loss from merge bugs. + // O(N*M) but only runs once per library (version-keys path, before any compaction exists). if (collect_keys) { for (const auto& key : load_result.symbol_list_keys_) { auto stream_id = StreamId{std::get(to_atom(key).start_index())}; - auto it = std::lower_bound( - load_result.symbols_.begin(), - load_result.symbols_.end(), - stream_id, - [](const SymbolListEntry& entry, const StreamId& id) { return entry.stream_id_ < id; } - ); util::check( - it != load_result.symbols_.end() && it->stream_id_ == stream_id, + std::any_of( + load_result.symbols_.begin(), + load_result.symbols_.end(), + [&stream_id](const SymbolListEntry& e) { return e.stream_id_ == stream_id; } + ), "Would delete unseen key {}", key ); From a20ec8e56b5588956ac453a57fdb0f781e12f0ab Mon Sep 17 00:00:00 2001 From: Georgi Petrov Date: Thu, 14 May 2026 11:11:23 +0100 Subject: [PATCH 8/8] Fix performace of delete unseen check --- cpp/arcticdb/version/symbol_list.cpp | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/cpp/arcticdb/version/symbol_list.cpp b/cpp/arcticdb/version/symbol_list.cpp index 337251c518d..eeafb4d61aa 100644 --- a/cpp/arcticdb/version/symbol_list.cpp +++ b/cpp/arcticdb/version/symbol_list.cpp @@ -539,20 +539,17 @@ LoadResult attempt_load( // Verify every journal key we'd delete during compaction corresponds to a symbol in the // merged output. 
Guards against silent data loss from merge bugs. - // O(N*M) but only runs once per library (version-keys path, before any compaction exists). if (collect_keys) { - for (const auto& key : load_result.symbol_list_keys_) { - auto stream_id = StreamId{std::get(to_atom(key).start_index())}; + std::unordered_set symbols_in_merge; + for (const auto& entry : load_result.symbols_) + symbols_in_merge.emplace(entry.stream_id_); + + for (const auto& key : load_result.symbol_list_keys_) util::check( - std::any_of( - load_result.symbols_.begin(), - load_result.symbols_.end(), - [&stream_id](const SymbolListEntry& e) { return e.stream_id_ == stream_id; } - ), + symbols_in_merge.count(StreamId{std::get(to_atom(key).start_index())}) > 0, "Would delete unseen key {}", key ); - } } }