diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index 9bf3983b9c8..d400beea2d8 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -1234,7 +1234,8 @@ if(${TEST}) processing/test/benchmark_common.cpp processing/test/benchmark_ternary.cpp util/test/benchmark_bitset.cpp - version/test/benchmark_write.cpp) + version/test/benchmark_write.cpp + version/test/benchmark_symbol_list.cpp) add_executable(benchmarks ${benchmark_srcs}) diff --git a/cpp/arcticdb/version/symbol_list.cpp b/cpp/arcticdb/version/symbol_list.cpp index 438cb47022a..eeafb4d61aa 100644 --- a/cpp/arcticdb/version/symbol_list.cpp +++ b/cpp/arcticdb/version/symbol_list.cpp @@ -91,59 +91,6 @@ std::vector load_previous_from_version_keys( return symbols; } -std::vector get_all_symbol_list_keys( - const std::shared_ptr& store, SymbolListData& data, WillAttemptCompaction will_attempt_compaction -) { - std::vector output; - uint64_t uncompacted_keys_found = 0; - store->iterate_type( - KeyType::SYMBOL_LIST, - [&data, &output, &uncompacted_keys_found, will_attempt_compaction](auto&& key) { - auto atom_key = to_atom(std::forward(key)); - if (atom_key.id() != compaction_id) { - uncompacted_keys_found++; - } - if (uncompacted_keys_found == warning_threshold() && !data.warned_expected_slowdown_) { - log::symbol().warn( - "`list_symbols` may take longer than expected as there have been many modifications " - "since `list_symbols` was last called. \n\n" - "See here for more information: " - "https://docs.arcticdb.io/latest/technical/on_disk_storage/#symbol-list-caching\n\n" - "To resolve, run `list_symbols` through to completion frequently. " - "Note: write access to storage is required for compaction. " - "{}.\n" - "Note: This warning will only appear once.\n", - will_attempt_compaction - ); - - data.warned_expected_slowdown_ = true; - } - - output.push_back(atom_key); - } - ); - - std::sort(output.begin(), output.end(), [](const AtomKey& left, const AtomKey& right) { - // Some very old symbol list keys have a non-zero version number, but with different semantics to the new style, - // so ignore it. See arcticdb-man#116. Most old style symbol list keys have version ID 0 anyway. - auto left_version = is_new_style_key(left) ? left.version_id() : 0; - auto right_version = is_new_style_key(right) ? right.version_id() : 0; - return std::tie(left.start_index(), left_version, left.creation_ts()) < - std::tie(right.start_index(), right_version, right.creation_ts()); - }); - return output; -} - -MaybeCompaction last_compaction(const std::vector& keys) { - auto pos = std::find_if(keys.rbegin(), keys.rend(), [](const auto& key) { return key.id() == compaction_id; }); - - if (pos == keys.rend()) { - return std::nullopt; - } else { - return {(pos + 1).base()}; // reverse_iterator -> forward itr has an offset of 1 per docs - } -} - // The below string_at and scalar_at functions should be used for symbol list cache segments instead of the ones // provided in SegmentInMemory, because the symbol list structure is the only place where columns can have more entries // than the segment has rows. Hence, we need to bypass the checks inside SegmentInMemory's function and directly call @@ -186,30 +133,24 @@ DataType get_symbol_data_type(const SegmentInMemory& seg) { return data_type; } -std::vector read_old_style_list_from_storage(const SegmentInMemory& seg) { - std::vector output; - if (seg.empty()) - return output; +/// Iterates a compacted segment's entries (additions and deletions), calling the visitor for each. +/// Visitor signature: void(StreamId&&, VersionId, timestamp, ActionType) +template +void for_each_segment_entry(const SegmentInMemory& seg, Visitor&& visitor) { + if (seg.row_count() == 0 || seg.descriptor().field_count() == 0) + return; const auto data_type = get_symbol_data_type(seg); - for (auto row : seg) - output.emplace_back(stream_id_from_segment(data_type, seg, row.row_id_, 0), 0, 0, ActionType::ADD); - - return output; -} - -std::vector read_new_style_list_from_storage(const SegmentInMemory& seg) { - std::vector output; - if (seg.empty()) - return output; - - const auto data_type = get_symbol_data_type(seg); + if (seg.descriptor().field_count() == 1) { + // Old-style: single column, all ADD + for (auto row : seg) + visitor(stream_id_from_segment(data_type, seg, row.row_id_, 0), VersionId{0}, timestamp{0}, ActionType::ADD + ); + return; + } - // Because we need to be backwards compatible with the old style symbol list, the additions and deletions are - // in separate columns. The first three columns are the symbol, version and timestamp for the additions, and the - // next three are the same for the deletions. Old-style symbol lists will ignore everything but the first column - // which will mean that they can't do any conflict resolution but will get the correct data. + // New-style: columns 0-2 are additions, columns 3-5 are deletions util::check( seg.column(0).row_count() == seg.column(1).row_count() && seg.column(0).row_count() == seg.column(2).row_count(), @@ -218,15 +159,11 @@ std::vector read_new_style_list_from_storage(const SegmentInMem seg.column(1).row_count(), seg.column(2).row_count() ); - for (auto i = 0L; i < seg.column(0).row_count(); ++i) { - auto stream_id = stream_id_from_segment(data_type, seg, i, 0); - auto reference_id = VersionId{scalar_at(seg, i, 1)}; - auto reference_time = timestamp{scalar_at(seg, i, 2)}; - ARCTICDB_RUNTIME_DEBUG( - log::symbol(), "Reading added symbol {}: {}@{}", stream_id, reference_id, reference_time - ); - output.emplace_back(stream_id, reference_id, reference_time, ActionType::ADD); + visitor(stream_id_from_segment(data_type, seg, i, 0), + VersionId{scalar_at(seg, i, 1)}, + timestamp{scalar_at(seg, i, 2)}, + ActionType::ADD); } if (seg.descriptor().field_count() == 6) { @@ -238,19 +175,13 @@ std::vector read_new_style_list_from_storage(const SegmentInMem seg.column(4).row_count(), seg.column(5).row_count() ); - for (auto i = 0L; i < seg.column(3).row_count(); ++i) { - auto stream_id = stream_id_from_segment(data_type, seg, i, 3); - auto reference_id = VersionId{scalar_at(seg, i, 4)}; - auto reference_time = timestamp{scalar_at(seg, i, 5)}; - ARCTICDB_RUNTIME_DEBUG( - log::symbol(), "Reading deleted symbol {}: {}@{}", stream_id, reference_id, reference_time - ); - output.emplace_back(stream_id, reference_id, reference_time, ActionType::DELETE); + visitor(stream_id_from_segment(data_type, seg, i, 3), + VersionId{scalar_at(seg, i, 4)}, + timestamp{scalar_at(seg, i, 5)}, + ActionType::DELETE); } } - - return output; } std::vector read_from_storage(const std::shared_ptr& store, const AtomKey& key) { @@ -263,26 +194,98 @@ std::vector read_from_storage(const std::shared_ptr 0, "Expected at least one column in symbol list with key {}", key ); - if (seg.descriptor().field_count() == 1) - return read_old_style_list_from_storage(seg); - else - return read_new_style_list_from_storage(seg); + std::vector output; + auto total = seg.column(0).row_count(); + if (seg.descriptor().field_count() >= 6) + total += seg.column(3).row_count(); + output.reserve(total); + + for_each_segment_entry( + seg, + [&](StreamId&& stream_id, VersionId reference_id, timestamp reference_time, ActionType action) { + ARCTICDB_RUNTIME_DEBUG( + log::symbol(), "Reading {} symbol {}: {}@{}", action, stream_id, reference_id, reference_time + ); + output.emplace_back(std::move(stream_id), reference_id, reference_time, action); + } + ); + + return output; } -MapType load_journal_keys(const std::vector& keys) { - MapType map; - for (const auto& key : keys) { - const auto& action_id = key.id(); - if (action_id == compaction_id) - continue; +struct StreamingJournalResult { + std::optional compaction_key; + MapType update_map; + size_t total_key_count = 0; + std::vector all_keys; +}; - const auto& symbol = key.start_index(); - const auto version_id = is_new_style_key(key) ? key.version_id() : unknown_version_id; - const auto timestamp = key.creation_ts(); - ActionType action = std::get(action_id) == DeleteSymbol ? ActionType::DELETE : ActionType::ADD; - map[symbol].emplace_back(version_id, timestamp, action); +void add_update_map_entry(MapType& update_map, const AtomKey& atom_key) { + const auto& symbol = atom_key.start_index(); + const auto version_id = is_new_style_key(atom_key) ? atom_key.version_id() : unknown_version_id; + const auto timestamp = atom_key.creation_ts(); + const auto& action_id_val = atom_key.id(); + ActionType action = std::get(action_id_val) == DeleteSymbol ? ActionType::DELETE : ActionType::ADD; + update_map[symbol].emplace_back(version_id, timestamp, action); +} + +void sort_update_map_entries(MapType& update_map) { + for (auto& [symbol, entries] : update_map) { + std::sort(entries.begin(), entries.end(), [](const SymbolEntryData& a, const SymbolEntryData& b) { + auto a_ver = a.reference_id_ == unknown_version_id ? VersionId{0} : a.reference_id_; + auto b_ver = b.reference_id_ == unknown_version_id ? VersionId{0} : b.reference_id_; + return std::tie(a_ver, a.timestamp_) < std::tie(b_ver, b.timestamp_); + }); } - return map; +} + +auto key_sort_comparator = [](const AtomKey& l, const AtomKey& r) { + auto l_ver = is_new_style_key(l) ? l.version_id() : VersionId{0}; + auto r_ver = is_new_style_key(r) ? r.version_id() : VersionId{0}; + return std::tie(l.start_index(), l_ver, l.creation_ts()) < std::tie(r.start_index(), r_ver, r.creation_ts()); +}; + +StreamingJournalResult load_journal_streaming( + const std::shared_ptr& store, SymbolListData& data, WillAttemptCompaction will_attempt_compaction, + bool collect_keys +) { + StreamingJournalResult result; + size_t uncompacted_keys_found = 0; + + store->iterate_type(KeyType::SYMBOL_LIST, [&](auto&& key) { + auto atom_key = to_atom(std::forward(key)); + result.total_key_count++; + + if (atom_key.id() == compaction_id) { + if (!result.compaction_key || atom_key.creation_ts() > result.compaction_key->creation_ts()) + result.compaction_key = atom_key; + } else { + uncompacted_keys_found++; + if (uncompacted_keys_found == warning_threshold() && !data.warned_expected_slowdown_) { + log::symbol().warn( + "`list_symbols` may take longer than expected as there have been many modifications " + "since `list_symbols` was last called. \n\n" + "See here for more information: " + "https://docs.arcticdb.io/latest/technical/on_disk_storage/#symbol-list-caching\n\n" + "To resolve, run `list_symbols` through to completion frequently. " + "Note: write access to storage is required for compaction. " + "{}.\n" + "Note: This warning will only appear once.\n", + will_attempt_compaction + ); + data.warned_expected_slowdown_ = true; + } + + add_update_map_entry(result.update_map, atom_key); + } + + if (collect_keys) + result.all_keys.emplace_back(std::move(atom_key)); + }); + + sort_update_map_entries(result.update_map); + + return result; } auto tail_range(const std::vector& updated) { @@ -393,23 +396,27 @@ ProblematicResult is_problematic( return ProblematicResult{latest}; } -CollectionType merge_existing_with_journal_keys( - const std::shared_ptr& version_map, const std::shared_ptr& store, - const std::vector& keys, std::vector&& existing -) { - auto existing_keys = std::move(existing); - auto update_map = load_journal_keys(keys); +using ProblematicSymbolMap = std::map>; +struct ExistingKeysMergeResult { CollectionType symbols; - std::map> problematic_symbols; - const auto min_allowed_interval = ConfigsMap::instance()->get_int("SymbolList.MinIntervalNs", 100'000'000LL); + ProblematicSymbolMap problematic; +}; + +/// Merges existing (compacted) entries with journal updates, consuming the existing entries. +/// Matched entries are resolved or flagged as problematic; unmatched existing ADDs are kept as-is. +/// Consumed entries are erased from update_map. +ExistingKeysMergeResult merge_existing_entries( + std::vector existing_keys, MapType& update_map, timestamp min_allowed_interval +) { + ExistingKeysMergeResult result; for (auto& previous_entry : existing_keys) { const auto& stream_id = previous_entry.stream_id_; auto updated = update_map.find(stream_id); if (updated == std::end(update_map)) { if (previous_entry.action_ == ActionType::ADD) - symbols.emplace_back(std::move(previous_entry)); + result.symbols.emplace_back(std::move(previous_entry)); else util::check( previous_entry.action_ == ActionType::DELETE, @@ -420,26 +427,43 @@ CollectionType merge_existing_with_journal_keys( util::check(!updated->second.empty(), "Unexpected empty entry for symbol {}", updated->first); if (auto problematic_entry = is_problematic(previous_entry, updated->second, min_allowed_interval); problematic_entry) { - problematic_symbols.try_emplace( - stream_id, std::make_pair(problematic_entry.reference_id(), problematic_entry.time()) + result.problematic.try_emplace( + std::move(previous_entry.stream_id_), + std::make_pair(problematic_entry.reference_id(), problematic_entry.time()) ); } else { const auto& last_entry = updated->second.rbegin(); - symbols.emplace_back( - updated->first, last_entry->reference_id_, last_entry->timestamp_, last_entry->action_ + result.symbols.emplace_back( + std::move(previous_entry.stream_id_), + last_entry->reference_id_, + last_entry->timestamp_, + last_entry->action_ ); } update_map.erase(updated); } } + return result; +} + +CollectionType merge_existing_with_journal_map( + const std::shared_ptr& version_map, const std::shared_ptr& store, MapType& update_map, + std::vector&& existing +) { + const auto min_allowed_interval = ConfigsMap::instance()->get_int("SymbolList.MinIntervalNs", 100'000'000LL); + + auto merge_result = merge_existing_entries(std::move(existing), update_map, min_allowed_interval); + auto& symbols = merge_result.symbols; + auto& problematic_symbols = merge_result.problematic; + for (const auto& [symbol, entries] : update_map) { ARCTICDB_DEBUG(log::symbol(), "{} {}", symbol, entries); if (auto problematic_entry = is_problematic(entries, min_allowed_interval); problematic_entry) { problematic_symbols.try_emplace(symbol, problematic_entry.reference_id(), problematic_entry.time()); } else { - const auto& last_entry = entries.rbegin(); - symbols.emplace_back(symbol, last_entry->reference_id_, last_entry->timestamp_, last_entry->action_); + const auto& last_entry = *entries.rbegin(); + symbols.emplace_back(symbol, last_entry.reference_id_, last_entry.timestamp_, last_entry.action_); } } @@ -489,55 +513,46 @@ CollectionType merge_existing_with_journal_keys( return symbols; } -CollectionType load_from_symbol_list_keys( - const std::shared_ptr& version_map, const std::shared_ptr& store, - const std::vector& keys, const Compaction& compaction -) { - ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from symbol list keys"); - - auto previous_compaction = read_from_storage(store, *compaction); - return merge_existing_with_journal_keys(version_map, store, keys, std::move(previous_compaction)); -} - -CollectionType load_from_version_keys( - const std::shared_ptr& version_map, const std::shared_ptr& store, - const std::vector& keys, SymbolListData& data, WillAttemptCompaction will_attempt_compaction -) { - ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from version keys"); - auto previous_entries = load_previous_from_version_keys(store, data, will_attempt_compaction); - return merge_existing_with_journal_keys(version_map, store, keys, std::move(previous_entries)); -} - LoadResult attempt_load( const std::shared_ptr& version_map, const std::shared_ptr& store, SymbolListData& data, WillAttemptCompaction will_attempt_compaction ) { ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Symbol list load attempt"); - LoadResult load_result; - load_result.symbol_list_keys_ = get_all_symbol_list_keys(store, data, will_attempt_compaction); - load_result.maybe_previous_compaction = last_compaction(load_result.symbol_list_keys_); + const bool collect_keys = will_attempt_compaction == WillAttemptCompaction::YES; + auto journal = load_journal_streaming(store, data, will_attempt_compaction, collect_keys); - if (load_result.maybe_previous_compaction) - load_result.symbols_ = load_from_symbol_list_keys( - version_map, store, load_result.symbol_list_keys_, *load_result.maybe_previous_compaction - ); - else { - load_result.symbols_ = load_from_version_keys( - version_map, store, load_result.symbol_list_keys_, data, will_attempt_compaction - ); - std::unordered_set keys_in_versions; - for (const auto& entry : load_result.symbols_) - keys_in_versions.emplace(entry.stream_id_); - - for (const auto& key : load_result.symbol_list_keys_) - util::check( - keys_in_versions.find(StreamId{std::get(key.start_index())}) != keys_in_versions.end(), - "Would delete unseen key {}", - key - ); + LoadResult load_result; + load_result.compaction_key_ = journal.compaction_key; + load_result.total_key_count_ = journal.total_key_count; + load_result.symbol_list_keys_ = std::move(journal.all_keys); + + if (journal.compaction_key) { + ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from symbol list keys"); + auto existing = read_from_storage(store, *journal.compaction_key); + load_result.symbols_ = + merge_existing_with_journal_map(version_map, store, journal.update_map, std::move(existing)); + } else { + ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from version keys"); + auto previous_entries = load_previous_from_version_keys(store, data, will_attempt_compaction); + load_result.symbols_ = + merge_existing_with_journal_map(version_map, store, journal.update_map, std::move(previous_entries)); + + // Verify every journal key we'd delete during compaction corresponds to a symbol in the + // merged output. Guards against silent data loss from merge bugs. + if (collect_keys) { + std::unordered_set symbols_in_merge; + for (const auto& entry : load_result.symbols_) + symbols_in_merge.emplace(entry.stream_id_); + + for (const auto& key : load_result.symbol_list_keys_) + util::check( + symbols_in_merge.count(StreamId{std::get(to_atom(key).start_index())}) > 0, + "Would delete unseen key {}", + key + ); + } } - load_result.timestamp_ = store->current_timestamp(); return load_result; } @@ -624,12 +639,12 @@ StreamDescriptor delete_symbol_stream_descriptor(const StreamId& stream_id, cons } bool SymbolList::needs_compaction(const LoadResult& load_result) const { - if (!load_result.maybe_previous_compaction) { + if (!load_result.compaction_key_) { log::version().debug("Symbol list: needs_compaction=[true] as no previous compaction"); return true; } - auto n_keys = static_cast(load_result.symbol_list_keys_.size()); + auto n_keys = static_cast(load_result.total_key_count_); if (auto fixed = ConfigsMap::instance()->get_int("SymbolList.MaxDelta")) { auto result = n_keys > *fixed; log::version().debug( @@ -744,7 +759,7 @@ SegmentInMemory create_empty_segment(const StreamId& stream_id) { } VariantKey write_symbols( - const std::shared_ptr& store, const CollectionType& symbols, const StreamId& stream_id, + const std::shared_ptr& store, CollectionType symbols, const StreamId& stream_id, const StreamId& type_holder ) { ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Writing {} symbols to symbol list cache", symbols.size()); @@ -764,36 +779,25 @@ VariantKey write_symbols( return store->write_sync(KeyType::SYMBOL_LIST, 0, stream_id, NumericIndex{0}, NumericIndex{0}, std::move(segment)); } -std::vector delete_keys( - const std::shared_ptr& store, std::vector&& remove, const AtomKey& exclude -) { - auto to_remove = std::move(remove); - std::vector variant_keys; - variant_keys.reserve(to_remove.size()); - for (auto& atom_key : to_remove) { - // Corner case: if the newly written Compaction key (exclude) has the same timestamp as an existing one - // (e.g. when a previous compaction round failed in the deletion step), we don't want to delete the former +void delete_keys(const std::shared_ptr& store, std::vector&& remove, const AtomKey& exclude) { + std::vector to_remove; + to_remove.reserve(remove.size()); + for (auto& atom_key : remove) { if (atom_key != exclude) - variant_keys.emplace_back(atom_key); + to_remove.emplace_back(std::move(atom_key)); } - - return store->remove_keys_sync(variant_keys); + if (!to_remove.empty()) + store->remove_keys_sync(to_remove); } -bool has_recent_compaction( - const std::shared_ptr& store, - const std::optional::const_iterator>& maybe_previous_compaction -) { +bool has_recent_compaction(const std::shared_ptr& store, const std::optional& compaction_key) { bool found_last = false; bool has_newer = false; - if (maybe_previous_compaction.has_value()) { - // Symbol list keys source + if (compaction_key) { store->iterate_type( KeyType::SYMBOL_LIST, - [&found_last, - &has_newer, - &last_compaction_key = *maybe_previous_compaction.value()](const VariantKey& key) { + [&found_last, &has_newer, &last_compaction_key = *compaction_key](const VariantKey& key) { const auto& atom = to_atom(key); if (atom == last_compaction_key) found_last = true; @@ -803,7 +807,6 @@ bool has_recent_compaction( std::get(compaction_id) ); } else { - // Version keys source store->iterate_type( KeyType::SYMBOL_LIST, [&has_newer](const VariantKey&) { has_newer = true; }, @@ -811,7 +814,7 @@ bool has_recent_compaction( ); } - return (maybe_previous_compaction && !found_last) || has_newer; + return (compaction_key && !found_last) || has_newer; } size_t SymbolList::compact(const std::shared_ptr& store) { @@ -819,7 +822,7 @@ size_t SymbolList::compact(const std::shared_ptr& store) { LoadResult load_result = ExponentialBackoff(100, 2000).go([this, &version_map, &store]() { return attempt_load(version_map, store, data_, WillAttemptCompaction::YES); }); - auto num_symbol_list_keys = load_result.symbol_list_keys_.size(); + auto num_symbol_list_keys = load_result.total_key_count_; ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Forcing compaction. Obtaining lock..."); StorageLock lock{StringId{CompactionLockName}}; @@ -832,18 +835,33 @@ size_t SymbolList::compact(const std::shared_ptr& store) { } void SymbolList::compact_internal(const std::shared_ptr& store, LoadResult& load_result) const { - if (has_recent_compaction(store, load_result.maybe_previous_compaction)) { - // legacy arcticc symbol list entries don't get correctly listed when doing `iterate_type`, so can mess - // up racing symbol list compaction detection. + if (has_recent_compaction(store, load_result.compaction_key_)) { ARCTICDB_RUNTIME_DEBUG( log::symbol(), "Symbol list compaction will be skipped: either a concurrent compaction was detected " "or there are legacy arcticc symbol list entries that cannot be verified." ); - } else { - auto written = write_symbols(store, load_result.symbols_, compaction_id, data_.type_holder_); - delete_keys(store, load_result.detach_symbol_list_keys(), std::get(written)); + return; } + + auto written = write_symbols(store, std::move(load_result.symbols_), compaction_id, data_.type_holder_); + auto written_key = std::get(written); + + // Sort pre-collected keys for storage backend performance, then delete excluding the newly written key + auto& keys = load_result.symbol_list_keys_; + std::sort(keys.begin(), keys.end(), [](const VariantKey& l, const VariantKey& r) { + return key_sort_comparator(to_atom(l), to_atom(r)); + }); + keys.erase( + std::remove_if( + keys.begin(), + keys.end(), + [&written_key](const VariantKey& vk) { return to_atom(vk) == written_key; } + ), + keys.end() + ); + if (!keys.empty()) + store->remove_keys_sync(std::move(keys)); } } // namespace arcticdb diff --git a/cpp/arcticdb/version/symbol_list.hpp b/cpp/arcticdb/version/symbol_list.hpp index f0096b1c92a..8312eabaed3 100644 --- a/cpp/arcticdb/version/symbol_list.hpp +++ b/cpp/arcticdb/version/symbol_list.hpp @@ -21,8 +21,6 @@ struct SymbolListEntry; struct SymbolEntryData; using MapType = std::unordered_map>; -using Compaction = std::vector::const_iterator; -using MaybeCompaction = std::optional; using CollectionType = std::vector; enum class WillAttemptCompaction : uint8_t { @@ -32,12 +30,10 @@ enum class WillAttemptCompaction : uint8_t { }; struct LoadResult { - std::vector symbol_list_keys_; - MaybeCompaction maybe_previous_compaction; + std::optional compaction_key_; CollectionType symbols_; - timestamp timestamp_ = 0L; - - std::vector&& detach_symbol_list_keys() { return std::move(symbol_list_keys_); } + size_t total_key_count_ = 0; + std::vector symbol_list_keys_; }; struct SymbolListData { @@ -154,12 +150,20 @@ class SymbolList { return WillAttemptCompaction::NO_INSUFFICIENT_PERMISSIONS; return WillAttemptCompaction::YES; }(); + LoadResult load_result = ExponentialBackoff(100, 2000).go( [this, &version_map, &store, will_attempt_compaction]() { return attempt_load(version_map, store, data_, will_attempt_compaction); } ); + // Build output before compaction — compact_internal frees symbols_ after writing them to storage + R output; + for (const auto& entry : load_result.symbols_) { + if (entry.action_ == ActionType::ADD) + output.insert(entry.stream_id_); + } + if (will_attempt_compaction == WillAttemptCompaction::YES && needs_compaction(load_result)) { ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Compaction necessary. Obtaining lock..."); try { @@ -180,12 +184,6 @@ class SymbolList { } } - R output; - for (const auto& entry : load_result.symbols_) { - if (entry.action_ == ActionType::ADD) - output.insert(entry.stream_id_); - } - return output; } @@ -215,9 +213,7 @@ class SymbolList { [[nodiscard]] bool needs_compaction(const LoadResult& load_result) const; }; -std::vector delete_keys( - const std::shared_ptr& store, std::vector&& remove, const AtomKey& exclude -); +void delete_keys(const std::shared_ptr& store, std::vector&& remove, const AtomKey& exclude); struct WriteSymbolTask : async::BaseTask { const std::shared_ptr store_; diff --git a/cpp/arcticdb/version/test/benchmark_symbol_list.cpp b/cpp/arcticdb/version/test/benchmark_symbol_list.cpp new file mode 100644 index 00000000000..16946c7a7f4 --- /dev/null +++ b/cpp/arcticdb/version/test/benchmark_symbol_list.cpp @@ -0,0 +1,334 @@ +/* Copyright 2026 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software + * will be governed by the Apache License, version 2.0. + */ + +#include +#include +#include + +#ifdef __linux__ +#include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace arcticdb; + +std::shared_ptr create_s3_mock_store(const std::string& lib_name = "benchmark_sl") { + storage::s3::S3ApiInstance::instance(); + arcticdb::proto::storage::VariantStorage vs; + arcticdb::proto::s3_storage::Config cfg; + cfg.set_bucket_name("test-bucket"); + cfg.set_use_mock_storage_for_testing(true); + util::pack_to_any(cfg, *vs.mutable_config()); + + storage::LibraryPath path{lib_name.c_str(), "store"}; + auto storages = storage::create_storages(path, storage::OpenMode::DELETE, {vs}); + auto library = std::make_shared(path, std::move(storages)); + return std::make_shared>( + async::AsyncStore(library, codec::default_lz4_codec(), EncodingVersion::V1) + ); +} + +// Tracks peak heap usage by polling mallinfo2 from a background thread. +// Only functional on Linux with glibc; on other platforms it reports zero. +struct PeakHeapTracker { + std::atomic running_{false}; + std::atomic peak_uordblks_{0}; + size_t baseline_{0}; + std::thread sampler_; + + static size_t current_heap_bytes() { +#ifdef __linux__ +#if __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 33) + return mallinfo2().uordblks; +#else + return static_cast(mallinfo().uordblks); +#endif +#else + return 0; +#endif + } + + void start() { +#ifdef __linux__ + malloc_trim(0); +#endif + baseline_ = current_heap_bytes(); + peak_uordblks_ = baseline_; + running_ = true; + sampler_ = std::thread([this] { + while (running_.load(std::memory_order_relaxed)) { + auto current = current_heap_bytes(); + auto prev = peak_uordblks_.load(std::memory_order_relaxed); + while (current > prev && !peak_uordblks_.compare_exchange_weak(prev, current, std::memory_order_relaxed) + ) + ; + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + }); + } + + size_t stop() { + running_ = false; + if (sampler_.joinable()) + sampler_.join(); + auto current = current_heap_bytes(); + auto prev = peak_uordblks_.load(std::memory_order_relaxed); + if (current > prev) + peak_uordblks_.store(current, std::memory_order_relaxed); + return peak_uordblks_.load() - baseline_; + } +}; + +// Benchmarks peak memory usage of symbol list loading via the compaction path. +// +// Run with: --benchmark_time_unit=ms --benchmark_filter=BM_symbol_list_load + +static void BM_symbol_list_load(benchmark::State& state) { + auto store = std::make_shared(); + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + + // Write journal entries for num_symbols symbols + for (int64_t i = 0; i < num_symbols; ++i) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, 0); + } + + // Force compaction on first load to create the compacted segment + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map}; + sl.load>(version_map, store, false); + } + + // Add a few journal entries to exercise the merge path + for (int64_t i = 0; i < 100; ++i) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, 1); + } + + // Disable compaction during the benchmark iterations + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, true); + + auto peak_delta = tracker.stop(); + auto retained = PeakHeapTracker::current_heap_bytes() - tracker.baseline_; + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["RetainedMB"] = benchmark::Counter(static_cast(retained) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + + benchmark::DoNotOptimize(result); + } +} + +BENCHMARK(BM_symbol_list_load)->Arg(100'000)->Arg(300'000)->Unit(benchmark::kMillisecond)->Iterations(3); +// Disabled in CI: 1M-symbol setup requires writing+compacting 1M journal entries (~30 s on CI hardware). +// Run locally: make bench-cpp FILTER=BM_symbol_list_load/1000000 +// BENCHMARK(BM_symbol_list_load)->Arg(1'000'000)->Unit(benchmark::kMillisecond)->Iterations(3); + +// Benchmarks peak memory with many uncompacted journal entries per symbol. +// This is the scenario where the AtomKey vector elimination matters most. +// +// Setup: N_SYMBOLS symbols, each with ENTRIES_PER_SYMBOL journal entries. + +static void BM_symbol_list_load_many_entries(benchmark::State& state) { + auto store = std::make_shared(); + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + auto entries_per_symbol = state.range(1); + + // Write journal entries: each symbol gets multiple versions + for (int64_t i = 0; i < num_symbols; ++i) { + for (int64_t v = 0; v < entries_per_symbol; ++v) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, v); + } + } + + // Force compaction to create the compacted segment + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map}; + sl.load>(version_map, store, false); + } + + // Write more journal entries (uncompacted) + for (int64_t i = 0; i < num_symbols; ++i) { + for (int64_t v = 0; v < entries_per_symbol; ++v) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, entries_per_symbol + v); + } + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, true); + + auto peak_delta = tracker.stop(); + auto retained = PeakHeapTracker::current_heap_bytes() - tracker.baseline_; + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["RetainedMB"] = benchmark::Counter(static_cast(retained) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + state.counters["TotalEntries"] = benchmark::Counter(static_cast(num_symbols * entries_per_symbol)); + + benchmark::DoNotOptimize(result); + } +} + +BENCHMARK(BM_symbol_list_load_many_entries)->Args({1'000, 1'000})->Unit(benchmark::kMillisecond)->Iterations(1); +// Disabled in CI: 300K-symbol setup writes 600K journal entries (~25 s on CI hardware). +// Run locally: make bench-cpp FILTER=BM_symbol_list_load_many_entries/300000 +// BENCHMARK(BM_symbol_list_load_many_entries)->Args({300'000, 1})->Unit(benchmark::kMillisecond)->Iterations(1); + +// Benchmarks peak memory during a load that triggers compaction. +// This measures the compaction path where keys are collected for deletion. +// +// Setup: N_SYMBOLS symbols with ENTRIES_PER_SYMBOL uncompacted journal entries each. +// MaxDelta=0 forces compaction on every load. + +static void BM_symbol_list_compaction(benchmark::State& state) { + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + auto entries_per_symbol = state.range(1); + + // Setup outside the benchmark loop — write journal entries once + auto store = std::make_shared(); + for (int64_t i = 0; i < num_symbols; ++i) { + for (int64_t v = 0; v < entries_per_symbol; ++v) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, v); + } + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, false); + + auto peak_delta = tracker.stop(); + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + state.counters["TotalEntries"] = benchmark::Counter(static_cast(num_symbols * entries_per_symbol)); + + benchmark::DoNotOptimize(result); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); +} + +BENCHMARK(BM_symbol_list_compaction)->Args({1'000, 100})->Unit(benchmark::kMillisecond)->Iterations(1); +// Disabled in CI: 1M-entry cases each take ~20 s; 300K×1 setup writes 600K entries. +// Run locally: make bench-cpp FILTER=BM_symbol_list_compaction +// BENCHMARK(BM_symbol_list_compaction) +// ->Args({1'000, 1'000}) +// ->Args({10'000, 100}) +// ->Args({300'000, 1}) +// ->Unit(benchmark::kMillisecond) +// ->Iterations(1); + +// ---- S3 mock variants: exercise the full S3 storage layer (serialization, key parsing) ---- + +static void BM_symbol_list_load_s3(benchmark::State& state) { + auto store = create_s3_mock_store(); + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + + for (int64_t i = 0; i < num_symbols; ++i) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, 0); + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map}; + sl.load>(version_map, store, false); + } + + for (int64_t i = 0; i < 100; ++i) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, 1); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, true); + + auto peak_delta = tracker.stop(); + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + + benchmark::DoNotOptimize(result); + } +} + +BENCHMARK(BM_symbol_list_load_s3)->Arg(10'000)->Unit(benchmark::kMillisecond)->Iterations(1); + +[[maybe_unused]] static void BM_symbol_list_compaction_s3(benchmark::State& state) { + auto version_map = std::make_shared(); + auto num_symbols = state.range(0); + auto entries_per_symbol = state.range(1); + + auto store = create_s3_mock_store(); + for (int64_t i = 0; i < num_symbols; ++i) { + for (int64_t v = 0; v < entries_per_symbol; ++v) { + SymbolList::add_symbol(store, StreamId{fmt::format("symbol_{:06d}", i)}, v); + } + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + + for (auto _ : state) { + PeakHeapTracker tracker; + tracker.start(); + + SymbolList sl{version_map}; + auto result = sl.load>(version_map, store, false); + + auto peak_delta = tracker.stop(); + + state.counters["PeakMB"] = benchmark::Counter(static_cast(peak_delta) / (1024.0 * 1024.0)); + state.counters["NumSymbols"] = benchmark::Counter(static_cast(result.size())); + state.counters["TotalEntries"] = benchmark::Counter(static_cast(num_symbols * entries_per_symbol)); + + benchmark::DoNotOptimize(result); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); +} + +// Disabled in CI: S3 mock compaction takes ~40 s even for 1K×100 entries. +// Run locally: make bench-cpp FILTER=BM_symbol_list_compaction_s3 +// BENCHMARK(BM_symbol_list_compaction_s3)->Args({1'000, 100})->Unit(benchmark::kMillisecond)->Iterations(1); +// BENCHMARK(BM_symbol_list_compaction_s3)->Args({10'000, 10})->Unit(benchmark::kMillisecond)->Iterations(1); diff --git a/cpp/arcticdb/version/test/test_symbol_list.cpp b/cpp/arcticdb/version/test/test_symbol_list.cpp index 6dac896a74b..3d2be66d510 100644 --- a/cpp/arcticdb/version/test/test_symbol_list.cpp +++ b/cpp/arcticdb/version/test/test_symbol_list.cpp @@ -992,6 +992,98 @@ TEST_F(SymbolListSuite, AddAndCompact) { collect(futures).get(); } +TEST_F(SymbolListSuite, DirectPathEquivalence) { + // Setup: add symbols and force compaction + for (int i = 0; i < 50; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::add_symbol(store_, StreamId{symbol}, 0); + auto key = atom_key_builder().build(symbol, KeyType::TABLE_INDEX); + version_map_->write_version(store_, key, std::nullopt); + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map_}; + sl.load>(version_map_, store_, false); + } + + // Add journal entries: new symbols + deletes of existing ones + for (int i = 40; i < 60; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::add_symbol(store_, StreamId{symbol}, 1); + auto key = atom_key_builder().version_id(1).build(symbol, KeyType::TABLE_INDEX); + version_map_->write_version(store_, key, std::nullopt); + } + for (int i = 0; i < 10; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::remove_symbol(store_, StreamId{symbol}, 1); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + // Load via compaction-eligible path (will not actually compact since threshold is default) + SymbolList sl1{version_map_}; + auto compaction_result = sl1.load>(version_map_, store_, false); + + // Load via direct path (no_compaction=true) + SymbolList sl2{version_map_}; + auto direct_result = sl2.load>(version_map_, store_, true); + + EXPECT_EQ(compaction_result, direct_result); + // 50 original + 10 new = 60. The remove_symbol journal entries are resolved + // as ADD because the version map still shows those symbols as existing. + EXPECT_EQ(direct_result.size(), 60u); +} + +TEST_F(SymbolListSuite, DirectPathEquivalenceWithTrueDeletes) { + // Setup: add symbols and force compaction + for (int i = 0; i < 50; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::add_symbol(store_, StreamId{symbol}, 0); + auto key = atom_key_builder().build(symbol, KeyType::TABLE_INDEX); + version_map_->write_version(store_, key, std::nullopt); + } + + ConfigsMap::instance()->set_int("SymbolList.MaxDelta", 0); + { + SymbolList sl{version_map_}; + sl.load>(version_map_, store_, false); + } + + // Add new symbols + for (int i = 50; i < 60; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::add_symbol(store_, StreamId{symbol}, 1); + auto key = atom_key_builder().version_id(1).build(symbol, KeyType::TABLE_INDEX); + version_map_->write_version(store_, key, std::nullopt); + } + + // Delete symbols 0-9 via both the symbol list journal AND the version map + for (int i = 0; i < 10; ++i) { + auto symbol = fmt::format("sym_{}", i); + SymbolList::remove_symbol(store_, StreamId{symbol}, 1); + version_map_->tombstone_from_key_or_all(store_, StreamId{symbol}); + } + + ConfigsMap::instance()->unset_int("SymbolList.MaxDelta"); + + SymbolList sl1{version_map_}; + auto compaction_result = sl1.load>(version_map_, store_, false); + + SymbolList sl2{version_map_}; + auto direct_result = sl2.load>(version_map_, store_, true); + + EXPECT_EQ(compaction_result, direct_result); + // 50 original - 10 deleted + 10 new = 50 + EXPECT_EQ(direct_result.size(), 50u); + // Verify deleted symbols are absent + for (int i = 0; i < 10; ++i) + EXPECT_EQ(direct_result.count(StreamId{fmt::format("sym_{}", i)}), 0u); + // Verify new symbols are present + for (int i = 50; i < 60; ++i) + EXPECT_EQ(direct_result.count(StreamId{fmt::format("sym_{}", i)}), 1u); +} + struct SymbolListRace : SymbolListSuite, testing::WithParamInterface> {}; TEST_P(SymbolListRace, Run) { diff --git a/python/.asv/results/benchmarks.json b/python/.asv/results/benchmarks.json index d90bd305ebc..2a69ff1e61d 100644 --- a/python/.asv/results/benchmarks.json +++ b/python/.asv/results/benchmarks.json @@ -2151,6 +2151,174 @@ "version": "c292f6f8757182d8ac67ddb990a81389675b5d264b32a5b2f720514bf703f6a9", "warmup_time": -1 }, + "list_symbols.ListSymbolsCompaction.peakmem_list_symbols": { + "code": "class ListSymbolsCompaction:\n def peakmem_list_symbols(self, *args):\n self.lib.list_symbols()\n\n def setup(self, lib_for_storage, num_symbols, num_versions, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n \n simple_df = pd.DataFrame({\"a\": [1]})\n \n # Write multiple versions per symbol to create many journal entries\n # Total entries = num_symbols * num_versions\n for v in range(num_versions):\n batch_size = 1000\n for start in range(0, num_symbols, batch_size):\n end = min(start + batch_size, num_symbols)\n payloads = [WritePayload(f\"sym_{i}\", simple_df) for i in range(start, end)]\n self.lib.write_batch(payloads)\n\n def setup_cache(self):\n return create_libraries_across_storages(self.storages)", + "name": "list_symbols.ListSymbolsCompaction.peakmem_list_symbols", + "param_names": [ + "num_symbols", + "num_versions", + "storage" + ], + "params": [ + [ + "1000" + ], + [ + "10" + ], + [ + "", + "" + ] + ], + "setup_cache_key": "list_symbols:154", + "timeout": 1200, + "type": "peakmemory", + "unit": "bytes", + "version": "94ba7991182f7c4fecbb447270b9eed2beeeaedd57165a2aba287d9b413530d6" + }, + "list_symbols.ListSymbolsCompaction.time_list_symbols": { + "code": "class ListSymbolsCompaction:\n def time_list_symbols(self, *args):\n # This triggers compaction since there are many uncompacted entries\n self.lib.list_symbols()\n\n def setup(self, lib_for_storage, num_symbols, num_versions, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n \n simple_df = pd.DataFrame({\"a\": [1]})\n \n # Write multiple versions per symbol to create many journal entries\n # Total entries = num_symbols * num_versions\n for v in range(num_versions):\n batch_size = 1000\n for start in range(0, num_symbols, batch_size):\n end = min(start + batch_size, num_symbols)\n payloads = [WritePayload(f\"sym_{i}\", simple_df) for i in range(start, end)]\n self.lib.write_batch(payloads)\n\n def setup_cache(self):\n return create_libraries_across_storages(self.storages)", + "min_run_count": 2, + "name": "list_symbols.ListSymbolsCompaction.time_list_symbols", + "number": 1, + "param_names": [ + "num_symbols", + "num_versions", + "storage" + ], + "params": [ + [ + "1000" + ], + [ + "10" + ], + [ + "", + "" + ] + ], + "repeat": 0, + "rounds": 1, + "sample_time": 0.01, + "setup_cache_key": "list_symbols:154", + "timeout": 1200, + "type": "time", + "unit": "seconds", + "version": "3df9d467f50b70b0623edeeff1dc919e3311985ae79b9de3a2e121fc19fdc0e9", + "warmup_time": 0 + }, + "list_symbols.ListSymbolsWithCompactedCache.peakmem_list_symbols": { + "code": "class ListSymbolsWithCompactedCache:\n def peakmem_list_symbols(self, *args):\n self.lib.list_symbols()\n\n def setup(self, lib_for_storage, num_symbols, num_journal_entries, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n \n simple_df = pd.DataFrame({\"a\": [1]})\n \n # Write symbols in batches to avoid very large payloads\n batch_size = 1000\n for start in range(0, num_symbols, batch_size):\n end = min(start + batch_size, num_symbols)\n payloads = [WritePayload(f\"sym_{i}\", simple_df) for i in range(start, end)]\n self.lib.write_batch(payloads)\n \n # Trigger compaction by calling list_symbols\n self.lib.list_symbols()\n \n # Add journal entries on top of the compacted cache\n for i in range(num_journal_entries):\n self.lib.write(f\"sym_{i}\", simple_df)\n\n def setup_cache(self):\n return create_libraries_across_storages(self.storages)", + "name": "list_symbols.ListSymbolsWithCompactedCache.peakmem_list_symbols", + "param_names": [ + "num_symbols", + "num_journal_entries", + "storage" + ], + "params": [ + [ + "1000" + ], + [ + "0", + "100" + ], + [ + "", + "" + ] + ], + "setup_cache_key": "list_symbols:97", + "timeout": 600, + "type": "peakmemory", + "unit": "bytes", + "version": "b4be3096a68b9744d0576580f1dd94c8426b2828826419b4a881a1e2275c4087" + }, + "list_symbols.ListSymbolsWithCompactedCache.time_list_symbols": { + "code": "class ListSymbolsWithCompactedCache:\n def time_list_symbols(self, *args):\n self.lib.list_symbols()\n\n def setup(self, lib_for_storage, num_symbols, num_journal_entries, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n \n simple_df = pd.DataFrame({\"a\": [1]})\n \n # Write symbols in batches to avoid very large payloads\n batch_size = 1000\n for start in range(0, num_symbols, batch_size):\n end = min(start + batch_size, num_symbols)\n payloads = [WritePayload(f\"sym_{i}\", simple_df) for i in range(start, end)]\n self.lib.write_batch(payloads)\n \n # Trigger compaction by calling list_symbols\n self.lib.list_symbols()\n \n # Add journal entries on top of the compacted cache\n for i in range(num_journal_entries):\n self.lib.write(f\"sym_{i}\", simple_df)\n\n def setup_cache(self):\n return create_libraries_across_storages(self.storages)", + "min_run_count": 2, + "name": "list_symbols.ListSymbolsWithCompactedCache.time_list_symbols", + "number": 3, + "param_names": [ + "num_symbols", + "num_journal_entries", + "storage" + ], + "params": [ + [ + "1000" + ], + [ + "0", + "100" + ], + [ + "", + "" + ] + ], + "repeat": 0, + "rounds": 1, + "sample_time": 0.01, + "setup_cache_key": "list_symbols:97", + "timeout": 600, + "type": "time", + "unit": "seconds", + "version": "f9910a09469fa71a142bc057314945fe8c50394fcf7def322ad0e3dec941ce2e", + "warmup_time": 0 + }, + "list_symbols.ListSymbolsWithDeletes.peakmem_list_symbols": { + "code": "class ListSymbolsWithDeletes:\n def peakmem_list_symbols(self, *args):\n self.lib.list_symbols()\n\n def setup(self, lib_for_storage, num_symbols, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n \n simple_df = pd.DataFrame({\"a\": [1]})\n \n batch_size = 1000\n for start in range(0, num_symbols, batch_size):\n end = min(start + batch_size, num_symbols)\n payloads = [WritePayload(f\"sym_{i}\", simple_df) for i in range(start, end)]\n self.lib.write_batch(payloads)\n \n # Compact\n self.lib.list_symbols()\n \n # Delete 10% of symbols and add new ones\n num_to_delete = num_symbols // 10\n for i in range(num_to_delete):\n self.lib.delete(f\"sym_{i}\")\n for i in range(num_to_delete):\n self.lib.write(f\"sym_new_{i}\", simple_df)\n\n def setup_cache(self):\n return create_libraries_across_storages(self.storages)", + "name": "list_symbols.ListSymbolsWithDeletes.peakmem_list_symbols", + "param_names": [ + "num_symbols", + "storage" + ], + "params": [ + [ + "1000" + ], + [ + "", + "" + ] + ], + "setup_cache_key": "list_symbols:205", + "timeout": 600, + "type": "peakmemory", + "unit": "bytes", + "version": "e3fc772bf4b75f473b65d8a843dd3599cb8ca503b251067b12f0e22e6283a51c" + }, + "list_symbols.ListSymbolsWithDeletes.time_list_symbols": { + "code": "class ListSymbolsWithDeletes:\n def time_list_symbols(self, *args):\n self.lib.list_symbols()\n\n def setup(self, lib_for_storage, num_symbols, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n \n simple_df = pd.DataFrame({\"a\": [1]})\n \n batch_size = 1000\n for start in range(0, num_symbols, batch_size):\n end = min(start + batch_size, num_symbols)\n payloads = [WritePayload(f\"sym_{i}\", simple_df) for i in range(start, end)]\n self.lib.write_batch(payloads)\n \n # Compact\n self.lib.list_symbols()\n \n # Delete 10% of symbols and add new ones\n num_to_delete = num_symbols // 10\n for i in range(num_to_delete):\n self.lib.delete(f\"sym_{i}\")\n for i in range(num_to_delete):\n self.lib.write(f\"sym_new_{i}\", simple_df)\n\n def setup_cache(self):\n return create_libraries_across_storages(self.storages)", + "min_run_count": 2, + "name": "list_symbols.ListSymbolsWithDeletes.time_list_symbols", + "number": 3, + "param_names": [ + "num_symbols", + "storage" + ], + "params": [ + [ + "1000" + ], + [ + "", + "" + ] + ], + "repeat": 0, + "rounds": 1, + "sample_time": 0.01, + "setup_cache_key": "list_symbols:205", + "timeout": 600, + "type": "time", + "unit": "seconds", + "version": "d0da3af49ac919286ca7bdabb468c07c7fa93074dba87e05db3ddd8c33c2a831", + "warmup_time": 0 + }, "list_symbols.ListSymbolsWithoutCache.peakmem_list_symbols": { "code": "class ListSymbolsWithoutCache:\n def peakmem_list_symbols(self, *args):\n self._check_test_counter()\n self.lib.list_symbols()\n\n def setup(self, lib_for_storage, num_symbols, storage):\n self.lib = lib_for_storage[storage]\n if self.lib is None:\n raise SkipNotImplemented\n self.test_counter = 1\n \n simple_df = pd.DataFrame({\"a\": [1]})\n write_payloads = [WritePayload(f\"{i}\", simple_df) for i in range(num_symbols)]\n self.lib.write_batch(write_payloads)\n\n def setup_cache(self):\n lib_for_storage = create_libraries_across_storages(self.storages)\n return lib_for_storage", "name": "list_symbols.ListSymbolsWithoutCache.peakmem_list_symbols", @@ -2160,7 +2328,6 @@ ], "params": [ [ - "100", "1000" ], [ @@ -2185,7 +2352,6 @@ ], "params": [ [ - "100", "1000" ], [ @@ -2214,7 +2380,6 @@ ], "params": [ [ - "100", "1000" ], [ diff --git a/python/benchmarks/list_symbols.py b/python/benchmarks/list_symbols.py index ee515d355f7..1dc2eadc40e 100644 --- a/python/benchmarks/list_symbols.py +++ b/python/benchmarks/list_symbols.py @@ -24,7 +24,7 @@ class ListSymbolsWithoutCache: warmup_time = 0 storages = [Storage.LMDB, Storage.AMAZON] - num_symbols = [100, 1000] + num_symbols = [1000] params = [num_symbols, storages] param_names = ["num_symbols", "storage"] @@ -70,3 +70,170 @@ def time_has_symbol(self, *args): def _check_test_counter(self): assert self.test_counter == 1 self.test_counter += 1 + + +class ListSymbolsWithCompactedCache: + """Measure list_symbols when the cache is already compacted with some journal entries on top. + + This is the common production scenario: the cache has been compacted and a moderate number + of writes/deletes have happened since the last compaction.""" + + rounds = 1 + number = 3 + timeout = 600 + warmup_time = 0 + + storages = [Storage.LMDB, Storage.AMAZON] + num_symbols = [1_000] + num_journal_entries = [0, 100] + + params = [num_symbols, num_journal_entries, storages] + param_names = ["num_symbols", "num_journal_entries", "storage"] + + def __init__(self): + self.logger = get_logger() + self.lib = None + + def setup_cache(self): + return create_libraries_across_storages(self.storages) + + def teardown(self, *args): + if self.lib is not None: + self.lib._nvs.version_store.clear() + + def setup(self, lib_for_storage, num_symbols, num_journal_entries, storage): + self.lib = lib_for_storage[storage] + if self.lib is None: + raise SkipNotImplemented + + simple_df = pd.DataFrame({"a": [1]}) + + # Write symbols in batches to avoid very large payloads + batch_size = 1000 + for start in range(0, num_symbols, batch_size): + end = min(start + batch_size, num_symbols) + payloads = [WritePayload(f"sym_{i}", simple_df) for i in range(start, end)] + self.lib.write_batch(payloads) + + # Trigger compaction by calling list_symbols + self.lib.list_symbols() + + # Add journal entries on top of the compacted cache + for i in range(num_journal_entries): + self.lib.write(f"sym_{i}", simple_df) + + def time_list_symbols(self, *args): + self.lib.list_symbols() + + def peakmem_list_symbols(self, *args): + self.lib.list_symbols() + + +class ListSymbolsCompaction: + """Measure list_symbols when compaction is triggered with many uncompacted journal entries. + + This is the scenario where streaming optimization matters most: many journal entries + need to be merged during compaction.""" + + rounds = 1 + number = 1 + timeout = 1200 + warmup_time = 0 + + storages = [Storage.LMDB, Storage.AMAZON] + num_symbols = [1_000] + num_versions = [10] + + params = [num_symbols, num_versions, storages] + param_names = ["num_symbols", "num_versions", "storage"] + + def __init__(self): + self.logger = get_logger() + self.lib = None + + def setup_cache(self): + return create_libraries_across_storages(self.storages) + + def teardown(self, *args): + if self.lib is not None: + self.lib._nvs.version_store.clear() + + def setup(self, lib_for_storage, num_symbols, num_versions, storage): + self.lib = lib_for_storage[storage] + if self.lib is None: + raise SkipNotImplemented + + simple_df = pd.DataFrame({"a": [1]}) + + # Write multiple versions per symbol to create many journal entries + # Total entries = num_symbols * num_versions + for v in range(num_versions): + batch_size = 1000 + for start in range(0, num_symbols, batch_size): + end = min(start + batch_size, num_symbols) + payloads = [WritePayload(f"sym_{i}", simple_df) for i in range(start, end)] + self.lib.write_batch(payloads) + + def time_list_symbols(self, *args): + # This triggers compaction since there are many uncompacted entries + self.lib.list_symbols() + + def peakmem_list_symbols(self, *args): + self.lib.list_symbols() + + +class ListSymbolsWithDeletes: + """Measure list_symbols with a mix of adds and deletes since last compaction. + + Tests the merge path where some symbols have been deleted and new ones added.""" + + rounds = 1 + number = 3 + timeout = 600 + warmup_time = 0 + + storages = [Storage.LMDB, Storage.AMAZON] + num_symbols = [1_000] + + params = [num_symbols, storages] + param_names = ["num_symbols", "storage"] + + def __init__(self): + self.logger = get_logger() + self.lib = None + + def setup_cache(self): + return create_libraries_across_storages(self.storages) + + def teardown(self, *args): + if self.lib is not None: + self.lib._nvs.version_store.clear() + + def setup(self, lib_for_storage, num_symbols, storage): + self.lib = lib_for_storage[storage] + if self.lib is None: + raise SkipNotImplemented + + simple_df = pd.DataFrame({"a": [1]}) + + batch_size = 1000 + for start in range(0, num_symbols, batch_size): + end = min(start + batch_size, num_symbols) + payloads = [WritePayload(f"sym_{i}", simple_df) for i in range(start, end)] + self.lib.write_batch(payloads) + + # Compact + self.lib.list_symbols() + + # Delete 10% of symbols and add new ones + num_to_delete = num_symbols // 10 + for i in range(num_to_delete): + self.lib.delete(f"sym_{i}") + for i in range(num_to_delete): + self.lib.write(f"sym_new_{i}", simple_df) + + def time_list_symbols(self, *args): + self.lib.list_symbols() + + def peakmem_list_symbols(self, *args): + self.lib.list_symbols()