From 5b882a14ad5ef9049915145cc79e0b1fc054acf1 Mon Sep 17 00:00:00 2001 From: Alex Seaton Date: Fri, 15 May 2026 12:47:09 +0100 Subject: [PATCH 1/5] Submit delete batches in parallel --- cpp/arcticdb/async/async_store.hpp | 52 ++++++- cpp/arcticdb/async/test/test_async.cpp | 144 ++++++++++++++++++ cpp/arcticdb/storage/azure/azure_storage.cpp | 52 +++---- cpp/arcticdb/storage/azure/azure_storage.hpp | 2 + cpp/arcticdb/storage/library.hpp | 2 + cpp/arcticdb/storage/s3/detail-inl.hpp | 71 ++++----- .../storage/s3/nfs_backed_storage.cpp | 7 + .../storage/s3/nfs_backed_storage.hpp | 2 + cpp/arcticdb/storage/s3/s3_storage.cpp | 7 + cpp/arcticdb/storage/s3/s3_storage.hpp | 4 + cpp/arcticdb/storage/storage.hpp | 5 + cpp/arcticdb/storage/storages.hpp | 2 + cpp/arcticdb/util/key_utils.hpp | 48 ++++-- .../version_store/test_bulk_delete.py | 63 ++++++++ .../arcticdb/version_store/test_parallel.py | 4 +- 15 files changed, 378 insertions(+), 87 deletions(-) create mode 100644 python/tests/integration/arcticdb/version_store/test_bulk_delete.py diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp index dac1d649033..f239c9bd383 100644 --- a/cpp/arcticdb/async/async_store.hpp +++ b/cpp/arcticdb/async/async_store.hpp @@ -383,26 +383,53 @@ class AsyncStore : public Store { folly::Future> remove_keys( const std::vector& keys, storage::RemoveOpts opts ) override { - return keys.empty() ? std::vector() - : async::submit_io_task(RemoveBatchTask{keys, library_, opts}); + return remove_keys(std::vector{keys}, opts); } folly::Future> remove_keys( std::vector&& keys, storage::RemoveOpts opts ) override { - return keys.empty() ? std::vector() - : async::submit_io_task(RemoveBatchTask{std::move(keys), library_, opts}); + if (keys.empty()) { + return std::vector{}; + } + const auto batch_size = library_->max_delete_batch_size(); + if (!batch_size.has_value() || keys.size() <= *batch_size) { + return async::submit_io_task(RemoveBatchTask{std::move(keys), library_, opts}); + } + auto chunks = chunk_keys(std::move(keys), *batch_size); + auto futs = folly::window( + std::move(chunks), + [this, opts](std::vector&& chunk) { + return async::submit_io_task(RemoveBatchTask{std::move(chunk), library_, opts}); + }, + async::TaskScheduler::instance()->io_thread_count() + ); + return folly::collect(std::move(futs)).via(&async::io_executor()).thenValue([](auto&&) { + return std::vector{}; + }); } std::vector remove_keys_sync( const std::vector& keys, storage::RemoveOpts opts ) override { - return keys.empty() ? std::vector() : RemoveBatchTask{keys, library_, opts}(); + return remove_keys_sync(std::vector{keys}, opts); } std::vector remove_keys_sync(std::vector&& keys, storage::RemoveOpts opts) override { - return keys.empty() ? std::vector() : RemoveBatchTask{std::move(keys), library_, opts}(); + if (keys.empty()) { + return {}; + } + const auto batch_size = library_->max_delete_batch_size(); + if (!batch_size.has_value() || keys.size() <= *batch_size) { + return RemoveBatchTask{std::move(keys), library_, opts}(); + } + std::vector result; + for (auto& chunk : chunk_keys(std::move(keys), *batch_size)) { + auto sub = RemoveBatchTask{std::move(chunk), library_, opts}(); + result.insert(result.end(), std::make_move_iterator(sub.begin()), std::make_move_iterator(sub.end())); + } + return result; } std::vector> batch_read_compressed( @@ -513,6 +540,19 @@ class AsyncStore : public Store { private: friend class arcticdb::toolbox::apy::LibraryTool; + + static std::vector> chunk_keys( + std::vector&& keys, size_t batch_size + ) { + std::vector> chunks; + chunks.reserve((keys.size() + batch_size - 1) / batch_size); + for (size_t i = 0; i < keys.size(); i += batch_size) { + const size_t end = std::min(i + batch_size, keys.size()); + chunks.emplace_back(std::make_move_iterator(keys.begin() + i), std::make_move_iterator(keys.begin() + end)); + } + return chunks; + } + std::shared_ptr library_; std::shared_ptr codec_; const EncodingVersion encoding_version_; diff --git a/cpp/arcticdb/async/test/test_async.cpp b/cpp/arcticdb/async/test/test_async.cpp index b215d2afa08..3cf1ba15a3a 100644 --- a/cpp/arcticdb/async/test/test_async.cpp +++ b/cpp/arcticdb/async/test/test_async.cpp @@ -24,6 +24,8 @@ #include #include #include +#include +#include #include using namespace arcticdb; @@ -671,3 +673,145 @@ TEST(Async, CopyCompressedInterStoreKeyExistsCheckFailureWithRetry) { ASSERT_EQ(std::get(read_result_2.first), key); ASSERT_EQ(read_result_2.second.row_count(), row_count); } + +namespace remove_batch_test { + +class RecordingStorage final : public as::Storage { + public: + RecordingStorage(const as::LibraryPath& lib, as::OpenMode mode, std::optional batch_size) : + Storage(lib, mode), + batch_size_(batch_size) {} + + std::string name() const final { return "recording_storage"; } + + std::optional max_delete_batch_size() const override { return batch_size_; } + + std::vector recorded_call_sizes() { + std::lock_guard guard(mutex_); + return recorded_; + } + + void seed_iter_keys(std::vector keys) { + std::lock_guard guard(mutex_); + iter_keys_ = std::move(keys); + } + + private: + void do_write(as::KeySegmentPair&) final { util::raise_rte("unused"); } + void do_write_if_none(as::KeySegmentPair&) final { util::raise_rte("unused"); } + void do_update(as::KeySegmentPair&, as::UpdateOpts) final { util::raise_rte("unused"); } + void do_read(entity::VariantKey&&, const as::ReadVisitor&, as::ReadKeyOpts) final { util::raise_rte("unused"); } + as::KeySegmentPair do_read(entity::VariantKey&&, as::ReadKeyOpts) final { util::raise_rte("unused"); } + + void do_remove(entity::VariantKey&&, as::RemoveOpts) final { + std::lock_guard guard(mutex_); + recorded_.push_back(1); + } + + void do_remove(std::span keys, as::RemoveOpts) final { + std::lock_guard guard(mutex_); + recorded_.push_back(keys.size()); + } + + bool do_key_exists(const entity::VariantKey&) final { return false; } + bool do_supports_prefix_matching() const final { return false; } + as::SupportsAtomicWrites do_supports_atomic_writes() const final { return as::SupportsAtomicWrites::NO; } + bool do_fast_delete() final { return false; } + bool do_iterate_type_until_match(entity::KeyType, const as::IterateTypePredicate& visitor, const std::string&) + final { + std::vector keys; + { + std::lock_guard guard(mutex_); + keys = iter_keys_; + } + for (auto& key : keys) { + if (visitor(entity::VariantKey{key})) { + return true; + } + } + return false; + } + std::string do_key_path(const entity::VariantKey&) const final { return {}; } + + std::optional batch_size_; + std::mutex mutex_; + std::vector recorded_; + std::vector iter_keys_; +}; + +inline std::shared_ptr> build_async_store(std::shared_ptr storage) { + auto storages = std::make_shared(as::Storages::StorageVector{storage}, as::OpenMode::DELETE); + auto library = std::make_shared(as::LibraryPath{"a", "b"}, std::move(storages)); + return std::make_shared>(std::move(library), proto::encoding::VariantCodec{}, EncodingVersion::V1); +} + +inline std::vector make_keys(size_t n) { + std::vector keys; + keys.reserve(n); + for (size_t i = 0; i < n; ++i) { + keys.emplace_back(arcticdb::atom_key_builder().build(fmt::format("k_{}", i), entity::KeyType::TABLE_DATA)); + } + return keys; +} + +} // namespace remove_batch_test + +TEST(AsyncStoreRemoveKeys, AsyncSplitsIntoChunksRespectingBatchSize) { + using namespace remove_batch_test; + auto storage = std::make_shared(as::LibraryPath{"a", "b"}, as::OpenMode::DELETE, 3); + auto store = build_async_store(storage); + + store->remove_keys(make_keys(10), as::RemoveOpts{}).get(); + + auto sizes = storage->recorded_call_sizes(); + std::sort(sizes.begin(), sizes.end()); + ASSERT_EQ(sizes, std::vector({1, 3, 3, 3})); +} + +TEST(AsyncStoreRemoveKeys, SyncSplitsIntoChunksRespectingBatchSize) { + using namespace remove_batch_test; + auto storage = std::make_shared(as::LibraryPath{"a", "b"}, as::OpenMode::DELETE, 3); + auto store = build_async_store(storage); + + store->remove_keys_sync(make_keys(10), as::RemoveOpts{}); + + auto sizes = storage->recorded_call_sizes(); + ASSERT_EQ(sizes, std::vector({3, 3, 3, 1})); +} + +TEST(AsyncStoreRemoveKeys, NulloptBatchSizeSendsSingleCall) { + using namespace remove_batch_test; + auto storage = std::make_shared(as::LibraryPath{"a", "b"}, as::OpenMode::DELETE, std::nullopt); + auto store = build_async_store(storage); + + store->remove_keys(make_keys(100), as::RemoveOpts{}).get(); + + auto sizes = storage->recorded_call_sizes(); + ASSERT_EQ(sizes, std::vector({100})); +} + +TEST(AsyncStoreRemoveKeys, EmptyKeysIsNoOp) { + using namespace remove_batch_test; + auto storage = std::make_shared(as::LibraryPath{"a", "b"}, as::OpenMode::DELETE, 3); + auto store = build_async_store(storage); + + auto result = store->remove_keys(std::vector{}, as::RemoveOpts{}).get(); + + ASSERT_TRUE(result.empty()); + ASSERT_TRUE(storage->recorded_call_sizes().empty()); +} + +TEST(KeyUtilsDeleteBatching, FlushesPendingBufferAtThreshold) { + using namespace remove_batch_test; + ScopedConfig scoped("Storage.DeletePendingBufferSize", 3); + + auto storage = std::make_shared(as::LibraryPath{"a", "b"}, as::OpenMode::DELETE, std::nullopt); + storage->seed_iter_keys(make_keys(11)); + auto store = build_async_store(storage); + + arcticdb::delete_keys_of_type_if( + store, [](const entity::VariantKey&) { return true; }, entity::KeyType::TABLE_DATA + ); + + ASSERT_EQ(storage->recorded_call_sizes(), std::vector({3, 3, 3, 2})); +} diff --git a/cpp/arcticdb/storage/azure/azure_storage.cpp b/cpp/arcticdb/storage/azure/azure_storage.cpp index faf06ab1f44..36928262ee8 100644 --- a/cpp/arcticdb/storage/azure/azure_storage.cpp +++ b/cpp/arcticdb/storage/azure/azure_storage.cpp @@ -211,41 +211,30 @@ void do_remove_impl( ) { ARCTICDB_SUBSAMPLE(AzureStorageDeleteBatch, 0) auto fmt_db = [](auto&& k) { return variant_key_type(k); }; - std::vector to_delete; - static const size_t delete_object_limit = std::min( - BATCH_SUBREQUEST_LIMIT, - static_cast(ConfigsMap::instance()->get_int("AzureStorage.DeleteBatchSize", BATCH_SUBREQUEST_LIMIT)) - ); - auto submit_batch = [&azure_client, &request_timeout](auto& to_delete) { - try { - azure_client.delete_blobs(to_delete, request_timeout); - } catch (const Azure::Core::RequestFailedException& e) { - std::string failed_objects = fmt::format("{}", fmt::join(to_delete, ", ")); - raise_azure_exception(e, failed_objects); - } - to_delete.clear(); - }; + util::check( + variant_keys.size() <= BATCH_SUBREQUEST_LIMIT, + "Azure do_remove_impl called with {} keys, which exceeds BATCH_SUBREQUEST_LIMIT={}. " + "AsyncStore is responsible for chunking inputs to max_delete_batch_size().", + variant_keys.size(), + BATCH_SUBREQUEST_LIMIT + ); (fg::from(variant_keys) | fg::move | fg::groupBy(fmt_db)) - .foreach ([&root_folder, - b = std::move(bucketizer), - delete_object_limit = delete_object_limit, - &to_delete, - &submit_batch](auto&& group - ) { // bypass incorrect 'set but no used" error for delete_object_limit + .foreach ([&root_folder, &azure_client, &request_timeout, b = std::move(bucketizer)](auto&& group) { auto key_type_dir = key_type_folder(root_folder, group.key()); + std::vector to_delete; + to_delete.reserve(group.size()); for (auto k : folly::enumerate(group.values())) { - auto blob_name = object_path(b.bucketize(key_type_dir, *k), *k); - to_delete.emplace_back(std::move(blob_name)); - if (to_delete.size() == delete_object_limit) { - submit_batch(to_delete); - } + to_delete.emplace_back(object_path(b.bucketize(key_type_dir, *k), *k)); + } + try { + azure_client.delete_blobs(to_delete, request_timeout); + } catch (const Azure::Core::RequestFailedException& e) { + std::string failed_objects = fmt::format("{}", fmt::join(to_delete, ", ")); + raise_azure_exception(e, failed_objects); } }); - if (!to_delete.empty()) { - submit_batch(to_delete); - } } std::string prefix_handler( @@ -359,6 +348,13 @@ void AzureStorage::do_remove(std::span variant_keys, RemoveOpts) { detail::do_remove_impl(std::move(variant_keys), root_folder_, *azure_client_, FlatBucketizer{}, request_timeout_); } +std::optional AzureStorage::max_delete_batch_size() const { + return std::min( + BATCH_SUBREQUEST_LIMIT, + static_cast(ConfigsMap::instance()->get_int("AzureStorage.DeleteBatchSize", BATCH_SUBREQUEST_LIMIT)) + ); +} + bool AzureStorage::do_iterate_type_until_match( KeyType key_type, const IterateTypePredicate& visitor, const std::string& prefix ) { diff --git a/cpp/arcticdb/storage/azure/azure_storage.hpp b/cpp/arcticdb/storage/azure/azure_storage.hpp index 79a47ded6f3..67b07a5cb7e 100644 --- a/cpp/arcticdb/storage/azure/azure_storage.hpp +++ b/cpp/arcticdb/storage/azure/azure_storage.hpp @@ -25,6 +25,8 @@ class AzureStorage final : public Storage { std::string name() const final; + std::optional max_delete_batch_size() const final override; + protected: void do_write(KeySegmentPair& key_seg) final; diff --git a/cpp/arcticdb/storage/library.hpp b/cpp/arcticdb/storage/library.hpp index 9375a59141f..b59d8a4d078 100644 --- a/cpp/arcticdb/storage/library.hpp +++ b/cpp/arcticdb/storage/library.hpp @@ -171,6 +171,8 @@ class Library { bool supports_atomic_writes() const { return storages_->supports_atomic_writes(); } + [[nodiscard]] std::optional max_delete_batch_size() const { return storages_->max_delete_batch_size(); } + [[nodiscard]] const LibraryPath& library_path() const { return library_path_; } [[nodiscard]] OpenMode open_mode() const { return storages_->open_mode(); } diff --git a/cpp/arcticdb/storage/s3/detail-inl.hpp b/cpp/arcticdb/storage/s3/detail-inl.hpp index 8463ee6789a..17eb26eb545 100644 --- a/cpp/arcticdb/storage/s3/detail-inl.hpp +++ b/cpp/arcticdb/storage/s3/detail-inl.hpp @@ -299,62 +299,53 @@ void do_remove_impl( ) { ARCTICDB_SUBSAMPLE(S3StorageDeleteBatch, 0) auto fmt_db = [](auto&& k) { return variant_key_type(k); }; - std::vector to_delete; boost::container::small_vector failed_deletes; - static const size_t delete_object_limit = std::min( - DELETE_OBJECTS_LIMIT, - static_cast(ConfigsMap::instance()->get_int("S3Storage.DeleteBatchSize", 1000)) - ); - to_delete.reserve(std::min(ks.size(), delete_object_limit)); + util::check( + ks.size() <= DELETE_OBJECTS_LIMIT, + "S3 do_remove_impl called with {} keys, which exceeds DELETE_OBJECTS_LIMIT={}. " + "AsyncStore is responsible for chunking inputs to max_delete_batch_size().", + ks.size(), + DELETE_OBJECTS_LIMIT + ); (fg::from(ks) | fg::move | fg::groupBy(fmt_db)) .foreach ([&s3_client, &root_folder, &bucket_name, - &to_delete, b = std::forward(bucketizer), &failed_deletes](auto&& group) { auto key_type_dir = key_type_folder(root_folder, group.key()); + std::vector to_delete; + to_delete.reserve(group.size()); for (auto k : folly::enumerate(group.values())) { - auto s3_object_name = object_path(b.bucketize(key_type_dir, *k), *k); - to_delete.emplace_back(std::move(s3_object_name)); - - if (to_delete.size() == delete_object_limit || k.index + 1 == group.size()) { - auto query_stat_operation_time = query_stats::add_task_count_and_time( - query_stats::TaskType::S3_DeleteObjects, group.key() + to_delete.emplace_back(object_path(b.bucketize(key_type_dir, *k), *k)); + } + auto query_stat_operation_time = + query_stats::add_task_count_and_time(query_stats::TaskType::S3_DeleteObjects, group.key()); + auto delete_object_result = s3_client.delete_objects(to_delete, bucket_name); + if (delete_object_result.is_success()) { + ARCTICDB_RUNTIME_DEBUG( + log::storage(), "Deleted {} objects of type {}", to_delete.size(), group.key() + ); + for (auto& bad_key : delete_object_result.get_output().failed_deletes) { + auto bad_key_name = bad_key.s3_object_name.substr(key_type_dir.size(), std::string::npos); + failed_deletes.emplace_back( + variant_key_from_bytes( + reinterpret_cast(bad_key_name.data()), + bad_key_name.size(), + group.key() + ), + std::move(bad_key.error_message) ); - auto delete_object_result = s3_client.delete_objects(to_delete, bucket_name); - if (delete_object_result.is_success()) { - ARCTICDB_RUNTIME_DEBUG( - log::storage(), - "Deleted {} objects, one of which with key '{}'", - to_delete.size(), - variant_key_view(*k) - ); - for (auto& bad_key : delete_object_result.get_output().failed_deletes) { - auto bad_key_name = - bad_key.s3_object_name.substr(key_type_dir.size(), std::string::npos); - failed_deletes.emplace_back( - variant_key_from_bytes( - reinterpret_cast(bad_key_name.data()), - bad_key_name.size(), - group.key() - ), - std::move(bad_key.error_message) - ); - } - } else { - auto& error = delete_object_result.get_error(); - std::string failed_objects = fmt::format("{}", fmt::join(to_delete, ", ")); - raise_s3_exception(error, failed_objects); - } - to_delete.clear(); } + } else { + auto& error = delete_object_result.get_error(); + std::string failed_objects = fmt::format("{}", fmt::join(to_delete, ", ")); + raise_s3_exception(error, failed_objects); } }); - util::check(to_delete.empty(), "Have {} segment that have not been removed", to_delete.size()); raise_if_failed_deletes(failed_deletes); } diff --git a/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp b/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp index ca97d9f9692..33662cf2563 100644 --- a/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp +++ b/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp @@ -218,6 +218,13 @@ void NfsBackedStorage::do_remove(std::span variant_keys, RemoveOpts) s3::detail::do_remove_impl(std::span(enc), root_folder_, bucket_name_, *s3_client_, NfsBucketizer{}); } +std::optional NfsBackedStorage::max_delete_batch_size() const { + return std::min( + s3::detail::DELETE_OBJECTS_LIMIT, + static_cast(ConfigsMap::instance()->get_int("S3Storage.DeleteBatchSize", 1000)) + ); +} + bool NfsBackedStorage::do_iterate_type_until_match( KeyType key_type, const IterateTypePredicate& visitor, const std::string& prefix ) { diff --git a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp index 9d87e14911c..ad352b48bca 100644 --- a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp +++ b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp @@ -25,6 +25,8 @@ class NfsBackedStorage final : public Storage { bool supports_object_size_calculation() const final override; + std::optional max_delete_batch_size() const final override; + private: void do_write(KeySegmentPair& key_seg) final; diff --git a/cpp/arcticdb/storage/s3/s3_storage.cpp b/cpp/arcticdb/storage/s3/s3_storage.cpp index 8058e0bf437..8ba5232d549 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.cpp +++ b/cpp/arcticdb/storage/s3/s3_storage.cpp @@ -105,6 +105,13 @@ void S3Storage::do_remove(std::span variant_keys, RemoveOpts) { detail::do_remove_impl(variant_keys, root_folder_, bucket_name_, client(), FlatBucketizer{}); } +std::optional S3Storage::max_delete_batch_size() const { + return std::min( + detail::DELETE_OBJECTS_LIMIT, + static_cast(ConfigsMap::instance()->get_int("S3Storage.DeleteBatchSize", 1000)) + ); +} + void S3Storage::do_remove(VariantKey&& variant_key, RemoveOpts) { detail::do_remove_impl(std::move(variant_key), root_folder_, bucket_name_, client(), FlatBucketizer{}); } diff --git a/cpp/arcticdb/storage/s3/s3_storage.hpp b/cpp/arcticdb/storage/s3/s3_storage.hpp index 97079014324..7d16f2b197a 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.hpp +++ b/cpp/arcticdb/storage/s3/s3_storage.hpp @@ -40,6 +40,8 @@ class S3Storage : public Storage, AsyncStorage { bool supports_object_size_calculation() const final; + std::optional max_delete_batch_size() const override; + // These are only public for testing purposes S3ClientInterface& client() { return *s3_client_; } bool directory_bucket() const { return directory_bucket_; } @@ -103,6 +105,8 @@ class GCPXMLStorage : public S3Storage { public: GCPXMLStorage(const LibraryPath& lib, OpenMode mode, const GCPXMLSettings& conf); + std::optional max_delete_batch_size() const override { return std::nullopt; } + protected: void do_remove(std::span variant_keys, RemoveOpts opts) override; void do_remove(VariantKey&& variant_key, RemoveOpts opts) override; diff --git a/cpp/arcticdb/storage/storage.hpp b/cpp/arcticdb/storage/storage.hpp index e728968f040..7924c4ce1ff 100644 --- a/cpp/arcticdb/storage/storage.hpp +++ b/cpp/arcticdb/storage/storage.hpp @@ -15,6 +15,7 @@ #include #include +#include #include namespace arcticdb::storage { @@ -126,6 +127,10 @@ class Storage { bool fast_delete() { return do_fast_delete(); } + // Maximum number of keys this storage will accept in a single do_remove(span) call + // std::nullopt means no limit (e.g. LMDB, memory) + [[nodiscard]] virtual std::optional max_delete_batch_size() const { return std::nullopt; } + virtual void cleanup() {} inline bool key_exists(const VariantKey& key) { return do_key_exists(key); } diff --git a/cpp/arcticdb/storage/storages.hpp b/cpp/arcticdb/storage/storages.hpp index 2f16a1297fa..13c340307ed 100644 --- a/cpp/arcticdb/storage/storages.hpp +++ b/cpp/arcticdb/storage/storages.hpp @@ -203,6 +203,8 @@ class Storages { void remove(std::span variant_keys, storage::RemoveOpts opts) { primary().remove(variant_keys, opts); } + [[nodiscard]] std::optional max_delete_batch_size() const { return primary().max_delete_batch_size(); } + [[nodiscard]] OpenMode open_mode() const { return mode_; } void move_storage(KeyType key_type, timestamp horizon, size_t storage_index = 0) { diff --git a/cpp/arcticdb/util/key_utils.hpp b/cpp/arcticdb/util/key_utils.hpp index 155da9647e2..0e3f9cbb2e4 100644 --- a/cpp/arcticdb/util/key_utils.hpp +++ b/cpp/arcticdb/util/key_utils.hpp @@ -21,25 +21,51 @@ inline void delete_keys_of_type_if( const std::shared_ptr& store, Predicate&& predicate, KeyType key_type, const std::string& prefix = std::string(), bool continue_on_error = false ) { - static const size_t delete_object_limit = ConfigsMap::instance()->get_int("Storage.DeleteBatchSize", 1000); - std::vector keys{}; + const size_t flush_threshold = ConfigsMap::instance()->get_int("Storage.DeletePendingBufferSize", 100'000); + std::vector keys; + keys.reserve(flush_threshold); + size_t total_flushed = 0; try { store->iterate_type( key_type, - [predicate = std::forward(predicate), store = store, &keys](VariantKey&& key) { - if (predicate(key)) - keys.emplace_back(std::move(key)); - - if (keys.size() == delete_object_limit) { - store->remove_keys(keys).get(); - keys.clear(); + [predicate = std::forward(predicate), + &keys, + &total_flushed, + store, + key_type, + flush_threshold](VariantKey&& key) { + if (!predicate(key)) + return; + keys.emplace_back(std::move(key)); + if (keys.size() >= flush_threshold) { + auto batch = std::move(keys); + keys = {}; + keys.reserve(flush_threshold); + const auto batch_size = batch.size(); + store->remove_keys(std::move(batch)).get(); + total_flushed += batch_size; + log::storage().debug( + "delete_keys_of_type_if: flushed {} keys of type {} (cumulative {})", + batch_size, + key_type, + total_flushed + ); } }, prefix ); - if (!keys.empty()) - store->remove_keys(keys).get(); + if (!keys.empty()) { + const auto batch_size = keys.size(); + store->remove_keys(std::move(keys)).get(); + total_flushed += batch_size; + log::storage().debug( + "delete_keys_of_type_if: final flush of {} keys of type {} (cumulative {})", + batch_size, + key_type, + total_flushed + ); + } } catch (const std::exception& ex) { if (continue_on_error) log::storage().warn("Caught exception {} trying to delete key, continuing", ex.what()); diff --git a/python/tests/integration/arcticdb/version_store/test_bulk_delete.py b/python/tests/integration/arcticdb/version_store/test_bulk_delete.py new file mode 100644 index 00000000000..4d7ff236d91 --- /dev/null +++ b/python/tests/integration/arcticdb/version_store/test_bulk_delete.py @@ -0,0 +1,63 @@ +""" +Copyright 2026 Man Group Operations Limited + +Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + +As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. +""" + +import numpy as np +import pandas as pd +import pytest + +from arcticdb_ext.storage import KeyType +from arcticdb.util.test import config_context, query_stats_operation_count +import arcticdb.toolbox.query_stats as qs + + +def _write_versions(lib, symbol, num_versions): + for i in range(num_versions): + df = pd.DataFrame({"x": np.arange(i, i + 5, dtype=np.int64)}) + lib.write(symbol, df) + + +@pytest.mark.storage +def test_bulk_delete_batching(object_and_mem_and_lmdb_version_store, sym): + lib = object_and_mem_and_lmdb_version_store + num_versions = 12 + batch_size = 5 + + with ( + config_context("S3Storage.DeleteBatchSize", batch_size), + config_context("AzureStorage.DeleteBatchSize", batch_size), + ): + _write_versions(lib, sym, num_versions) + + lib_tool = lib.library_tool() + assert len(lib_tool.find_keys_for_symbol(KeyType.TABLE_INDEX, sym)) == num_versions + assert len(lib_tool.find_keys_for_symbol(KeyType.TABLE_DATA, sym)) == num_versions + + lib.delete(sym) + + assert lib.has_symbol(sym) is False + assert lib_tool.find_keys_for_symbol(KeyType.TABLE_INDEX, sym) == [] + assert lib_tool.find_keys_for_symbol(KeyType.TABLE_DATA, sym) == [] + + +def test_bulk_delete_issues_one_storage_op_per_batch(s3_version_store_v1, sym, clear_query_stats): + lib = s3_version_store_v1 + num_versions = 12 + batch_size = 5 + expected_batches_per_key_type = 3 + + with config_context("S3Storage.DeleteBatchSize", batch_size): + _write_versions(lib, sym, num_versions) + + qs.enable() + qs.reset_stats() + lib.delete(sym) + stats = qs.get_query_stats() + + assert query_stats_operation_count(stats, "S3_DeleteObjects", "TABLE_INDEX") == expected_batches_per_key_type + assert query_stats_operation_count(stats, "S3_DeleteObjects", "TABLE_DATA") == expected_batches_per_key_type + assert query_stats_operation_count(stats, "S3_DeleteObjects", "COLUMN_STATS") == expected_batches_per_key_type diff --git a/python/tests/unit/arcticdb/version_store/test_parallel.py b/python/tests/unit/arcticdb/version_store/test_parallel.py index 39acdd3912c..2028aa4aea8 100644 --- a/python/tests/unit/arcticdb/version_store/test_parallel.py +++ b/python/tests/unit/arcticdb/version_store/test_parallel.py @@ -80,7 +80,7 @@ def test_remove_incomplete(arctic_library_v1, batch, batch_size, lib_name): arctic_library_v1._dev_tools.remove_incompletes(["sym"]) return # remove_incompletes not implemented on Mongo 8784267430 - with config_context_multi({"Storage.DeleteBatchSize": batch_size, "S3Storage.DeleteBatchSize": 2 * batch_size}): + with config_context_multi({"S3Storage.DeleteBatchSize": batch_size, "AzureStorage.DeleteBatchSize": batch_size}): lib_tool = lib.library_tool() assert lib_tool.find_keys(KeyType.APPEND_DATA) == [] assert lib.list_symbols_with_incomplete_data() == [] @@ -134,7 +134,7 @@ def test_remove_incompletes(arctic_library_v1, batch_size): arctic_library_v1._dev_tools.remove_incompletes(["sym"]) return # remove_incompletes not implemented on Mongo 8784267430 - with config_context_multi({"Storage.DeleteBatchSize": batch_size, "S3Storage.DeleteBatchSize": 2 * batch_size}): + with config_context_multi({"S3Storage.DeleteBatchSize": batch_size, "AzureStorage.DeleteBatchSize": batch_size}): lib = arctic_library_v1 lib_tool = lib._dev_tools.library_tool() assert lib_tool.find_keys(KeyType.APPEND_DATA) == [] From e1ddc1a4eb0820e817eccbbd80a996e3871675c3 Mon Sep 17 00:00:00 2001 From: Alex Seaton Date: Fri, 22 May 2026 13:06:09 +0100 Subject: [PATCH 2/5] Remove folly::gen from storage backends --- cpp/arcticdb/CMakeLists.txt | 1 + cpp/arcticdb/entity/test/test_variant_key.cpp | 67 ++++++++++++++ cpp/arcticdb/entity/variant_key.hpp | 18 ++++ cpp/arcticdb/storage/azure/azure_storage.cpp | 32 +++---- cpp/arcticdb/storage/mongo/mongo_storage.cpp | 45 +++++----- cpp/arcticdb/storage/s3/detail-inl.hpp | 88 ++++++++----------- 6 files changed, 153 insertions(+), 98 deletions(-) create mode 100644 cpp/arcticdb/entity/test/test_variant_key.cpp diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index c6d1e14f31f..82523e95196 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -1054,6 +1054,7 @@ if(${TEST}) entity/test/test_key_serialization.cpp entity/test/test_metrics.cpp entity/test/test_ref_key.cpp + entity/test/test_variant_key.cpp entity/test/test_tensor.cpp log/test/test_log.cpp pipeline/test/test_container.hpp diff --git a/cpp/arcticdb/entity/test/test_variant_key.cpp b/cpp/arcticdb/entity/test/test_variant_key.cpp new file mode 100644 index 00000000000..830da1add0a --- /dev/null +++ b/cpp/arcticdb/entity/test/test_variant_key.cpp @@ -0,0 +1,67 @@ +/* Copyright 2026 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software + * will be governed by the Apache License, version 2.0. + */ + +#include + +#include +#include +#include + +using namespace arcticdb; +using namespace arcticdb::entity; + +namespace { +AtomKey make_atom(KeyType kt, int64_t id) { + return atom_key_builder() + .version_id(0) + .creation_ts(0) + .content_hash(0) + .start_index(NumericId{0}) + .end_index(NumericId{0}) + .build(NumericId{id}, kt); +} +} // namespace + +TEST(VariantKey, KeyTypesEmpty) { + std::vector keys; + ASSERT_TRUE(key_types(keys).empty()); +} + +TEST(VariantKey, KeyTypesSingleType) { + std::vector keys{make_atom(KeyType::TABLE_DATA, 1), make_atom(KeyType::TABLE_DATA, 2)}; + auto result = key_types(keys); + ASSERT_EQ(result.size(), 1u); + ASSERT_EQ(result.at(0), KeyType::TABLE_DATA); +} + +TEST(VariantKey, KeyTypesMultipleTypesDeduped) { + std::vector keys{ + make_atom(KeyType::TABLE_DATA, 1), + make_atom(KeyType::TABLE_INDEX, 2), + make_atom(KeyType::TABLE_DATA, 3), + make_atom(KeyType::VERSION, 4), + make_atom(KeyType::TABLE_INDEX, 5), + }; + auto result = key_types(keys); + ASSERT_EQ(result.size(), 3u); + // Results are returned in KeyType enum order. + ASSERT_EQ(result.at(0), KeyType::TABLE_DATA); + ASSERT_EQ(result.at(1), KeyType::TABLE_INDEX); + ASSERT_EQ(result.at(2), KeyType::VERSION); +} + +TEST(VariantKey, KeyTypesAtomAndRefMix) { + std::vector keys{ + make_atom(KeyType::TABLE_DATA, 1), + RefKey{"sym", KeyType::VERSION_REF}, + }; + auto result = key_types(keys); + ASSERT_EQ(result.size(), 2u); + ASSERT_EQ(result.at(0), KeyType::TABLE_DATA); + ASSERT_EQ(result.at(1), KeyType::VERSION_REF); +} diff --git a/cpp/arcticdb/entity/variant_key.hpp b/cpp/arcticdb/entity/variant_key.hpp index 9cebdf1754f..76ead468c41 100644 --- a/cpp/arcticdb/entity/variant_key.hpp +++ b/cpp/arcticdb/entity/variant_key.hpp @@ -10,7 +10,9 @@ #include #include +#include #include +#include #include namespace arcticdb::entity { @@ -48,6 +50,22 @@ inline KeyType variant_key_type(const VariantKey& vk) { return std::visit([](const auto& key) { return key.type(); }, vk); } +template +requires std::same_as>, VariantKey> +std::vector key_types(R&& keys) { + std::array(KeyType::UNDEFINED)> present{}; + for (const auto& k : keys) { + present.at(static_cast(variant_key_type(k))) = true; + } + std::vector result; + for (size_t i = 0; i < present.size(); ++i) { + if (present.at(i)) { + result.push_back(static_cast(i)); + } + } + return result; +} + inline const StreamId& variant_key_id(const VariantKey& vk) { return std::visit([](const auto& key) -> const StreamId& { return key.id(); }, vk); } diff --git a/cpp/arcticdb/storage/azure/azure_storage.cpp b/cpp/arcticdb/storage/azure/azure_storage.cpp index 36928262ee8..f00548d6ddf 100644 --- a/cpp/arcticdb/storage/azure/azure_storage.cpp +++ b/cpp/arcticdb/storage/azure/azure_storage.cpp @@ -21,8 +21,6 @@ #include #include -#include - #undef GetMessage namespace arcticdb::storage { @@ -202,15 +200,12 @@ KeySegmentPair do_read_impl( return KeySegmentPair{}; } -namespace fg = folly::gen; - template void do_remove_impl( std::span variant_keys, const std::string& root_folder, AzureClientWrapper& azure_client, KeyBucketizer&& bucketizer, unsigned int request_timeout ) { ARCTICDB_SUBSAMPLE(AzureStorageDeleteBatch, 0) - auto fmt_db = [](auto&& k) { return variant_key_type(k); }; util::check( variant_keys.size() <= BATCH_SUBREQUEST_LIMIT, @@ -220,21 +215,18 @@ void do_remove_impl( BATCH_SUBREQUEST_LIMIT ); - (fg::from(variant_keys) | fg::move | fg::groupBy(fmt_db)) - .foreach ([&root_folder, &azure_client, &request_timeout, b = std::move(bucketizer)](auto&& group) { - auto key_type_dir = key_type_folder(root_folder, group.key()); - std::vector to_delete; - to_delete.reserve(group.size()); - for (auto k : folly::enumerate(group.values())) { - to_delete.emplace_back(object_path(b.bucketize(key_type_dir, *k), *k)); - } - try { - azure_client.delete_blobs(to_delete, request_timeout); - } catch (const Azure::Core::RequestFailedException& e) { - std::string failed_objects = fmt::format("{}", fmt::join(to_delete, ", ")); - raise_azure_exception(e, failed_objects); - } - }); + std::vector to_delete; + to_delete.reserve(variant_keys.size()); + for (const auto& k : variant_keys) { + auto key_type_dir = key_type_folder(root_folder, variant_key_type(k)); + to_delete.emplace_back(object_path(bucketizer.bucketize(key_type_dir, k), k)); + } + try { + azure_client.delete_blobs(to_delete, request_timeout); + } catch (const Azure::Core::RequestFailedException& e) { + std::string failed_objects = fmt::format("{}", fmt::join(to_delete, ", ")); + raise_azure_exception(e, failed_objects); + } } std::string prefix_handler( diff --git a/cpp/arcticdb/storage/mongo/mongo_storage.cpp b/cpp/arcticdb/storage/mongo/mongo_storage.cpp index 5ffab35d802..4b3171e214a 100644 --- a/cpp/arcticdb/storage/mongo/mongo_storage.cpp +++ b/cpp/arcticdb/storage/mongo/mongo_storage.cpp @@ -9,7 +9,6 @@ #include #include -#include #include #include @@ -146,35 +145,31 @@ bool MongoStorage::do_fast_delete() { } void MongoStorage::do_remove(std::span variant_keys, RemoveOpts opts) { - namespace fg = folly::gen; - auto fmt_db = [](auto&& k) { return variant_key_type(k); }; ARCTICDB_SAMPLE(MongoStorageRemove, 0) std::vector keys_not_found; - (fg::from(variant_keys) | fg::move | fg::groupBy(fmt_db)).foreach ([&](auto&& group) { - for (auto& k : group.values()) { - auto collection = collection_name(variant_key_type(k)); - try { - auto result = client_->remove_keyvalue(db_, collection, k); - storage::check( - result.delete_count.has_value(), "Mongo did not acknowledge deletion for key {}", k - ); - util::warn( - result.delete_count.value() == 1, - "Expected to delete a single document with key {} deleted {} documents", - k, - result.delete_count.value() - ); - if (result.delete_count.value() == 0 && !opts.ignores_missing_key_) { - keys_not_found.push_back(k); - } - } catch (const mongocxx::operation_exception& ex) { - // mongo delete does not throw exception if key not found, it returns 0 as delete count - std::string object_name = std::string(variant_key_view(k)); - raise_mongo_exception(ex, object_name); + for (auto& k : variant_keys) { + auto collection = collection_name(variant_key_type(k)); + try { + auto result = client_->remove_keyvalue(db_, collection, k); + storage::check( + result.delete_count.has_value(), "Mongo did not acknowledge deletion for key {}", k + ); + util::warn( + result.delete_count.value() == 1, + "Expected to delete a single document with key {} deleted {} documents", + k, + result.delete_count.value() + ); + if (result.delete_count.value() == 0 && !opts.ignores_missing_key_) { + keys_not_found.push_back(k); } + } catch (const mongocxx::operation_exception& ex) { + // mongo delete does not throw exception if key not found, it returns 0 as delete count + std::string object_name = std::string(variant_key_view(k)); + raise_mongo_exception(ex, object_name); } - }); + } if (!keys_not_found.empty()) { throw KeyNotFoundException(std::move(keys_not_found)); } diff --git a/cpp/arcticdb/storage/s3/detail-inl.hpp b/cpp/arcticdb/storage/s3/detail-inl.hpp index 17eb26eb545..f49a76b9bf1 100644 --- a/cpp/arcticdb/storage/s3/detail-inl.hpp +++ b/cpp/arcticdb/storage/s3/detail-inl.hpp @@ -32,6 +32,8 @@ #include +#include + #undef GetMessage namespace arcticdb::storage { @@ -40,7 +42,6 @@ using namespace object_store_utils; namespace s3 { -namespace fg = folly::gen; namespace detail { static const size_t DELETE_OBJECTS_LIMIT = 1000; @@ -267,12 +268,12 @@ void do_read_impl( } struct FailedDelete { - VariantKey failed_key; + std::string failed_key_name; std::string error_message; - FailedDelete(VariantKey&& failed_key, std::string&& error_message) : - failed_key(failed_key), - error_message(error_message) {} + FailedDelete(std::string failed_key_name, std::string error_message) : + failed_key_name(std::move(failed_key_name)), + error_message(std::move(error_message)) {} }; inline void raise_if_failed_deletes(const boost::container::small_vector& failed_deletes) { @@ -281,7 +282,7 @@ inline void raise_if_failed_deletes(const boost::container::small_vector failed_deletes; util::check( @@ -309,42 +309,33 @@ void do_remove_impl( DELETE_OBJECTS_LIMIT ); - (fg::from(ks) | fg::move | fg::groupBy(fmt_db)) - .foreach ([&s3_client, - &root_folder, - &bucket_name, - b = std::forward(bucketizer), - &failed_deletes](auto&& group) { - auto key_type_dir = key_type_folder(root_folder, group.key()); - std::vector to_delete; - to_delete.reserve(group.size()); - for (auto k : folly::enumerate(group.values())) { - to_delete.emplace_back(object_path(b.bucketize(key_type_dir, *k), *k)); - } - auto query_stat_operation_time = - query_stats::add_task_count_and_time(query_stats::TaskType::S3_DeleteObjects, group.key()); - auto delete_object_result = s3_client.delete_objects(to_delete, bucket_name); - if (delete_object_result.is_success()) { - ARCTICDB_RUNTIME_DEBUG( - log::storage(), "Deleted {} objects of type {}", to_delete.size(), group.key() - ); - for (auto& bad_key : delete_object_result.get_output().failed_deletes) { - auto bad_key_name = bad_key.s3_object_name.substr(key_type_dir.size(), std::string::npos); - failed_deletes.emplace_back( - variant_key_from_bytes( - reinterpret_cast(bad_key_name.data()), - bad_key_name.size(), - group.key() - ), - std::move(bad_key.error_message) - ); - } - } else { - auto& error = delete_object_result.get_error(); - std::string failed_objects = fmt::format("{}", fmt::join(to_delete, ", ")); - raise_s3_exception(error, failed_objects); - } - }); + std::vector to_delete; + to_delete.reserve(ks.size()); + for (const auto& k : ks) { + auto key_type_dir = key_type_folder(root_folder, variant_key_type(k)); + to_delete.emplace_back(object_path(bucketizer.bucketize(key_type_dir, k), k)); + } + + std::vector> stat_timers; + if (query_stats::QueryStats::instance()->is_enabled()) { + auto distinct_key_types = key_types(ks); + stat_timers.reserve(distinct_key_types.size()); + for (auto kt : distinct_key_types) { + stat_timers.emplace_back(query_stats::add_task_count_and_time(query_stats::TaskType::S3_DeleteObjects, kt)); + } + } + + auto delete_object_result = s3_client.delete_objects(to_delete, bucket_name); + if (delete_object_result.is_success()) { + ARCTICDB_RUNTIME_DEBUG(log::storage(), "Deleted {} objects", to_delete.size()); + for (auto& bad_key : delete_object_result.get_output().failed_deletes) { + failed_deletes.emplace_back(std::move(bad_key.s3_object_name), std::move(bad_key.error_message)); + } + } else { + auto& error = delete_object_result.get_error(); + std::string failed_objects = fmt::format("{}", fmt::join(to_delete, ", ")); + raise_s3_exception(error, failed_objects); + } raise_if_failed_deletes(failed_deletes); } @@ -385,16 +376,7 @@ void do_remove_no_batching_impl( } else if (const auto& error = delete_object_result.get_error(); !is_not_found_error(error.GetErrorType())) { auto key_type_dir = key_type_folder(root_folder, variant_key_type(k)); auto s3_object_name = object_path(bucketizer.bucketize(key_type_dir, k), k); - auto bad_key_name = s3_object_name.substr(key_type_dir.size(), std::string::npos); - auto error_message = error.GetMessage(); - failed_deletes.push_back(FailedDelete{ - variant_key_from_bytes( - reinterpret_cast(bad_key_name.data()), - bad_key_name.size(), - variant_key_type(k) - ), - std::move(error_message) - }); + failed_deletes.emplace_back(std::move(s3_object_name), error.GetMessage()); } else { ARCTICDB_RUNTIME_DEBUG( log::storage(), "Acceptable error when deleting object with key '{}'", variant_key_view(k) From 8691a44a83b6da55d1f3bd868f4cca97834a2cf1 Mon Sep 17 00:00:00 2001 From: Alex Seaton Date: Fri, 22 May 2026 14:09:12 +0100 Subject: [PATCH 3/5] Use proper default value --- cpp/arcticdb/storage/s3/nfs_backed_storage.cpp | 2 +- cpp/arcticdb/storage/s3/s3_storage.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp b/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp index 33662cf2563..962c3f99fd3 100644 --- a/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp +++ b/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp @@ -221,7 +221,7 @@ void NfsBackedStorage::do_remove(std::span variant_keys, RemoveOpts) std::optional NfsBackedStorage::max_delete_batch_size() const { return std::min( s3::detail::DELETE_OBJECTS_LIMIT, - static_cast(ConfigsMap::instance()->get_int("S3Storage.DeleteBatchSize", 1000)) + static_cast(ConfigsMap::instance()->get_int("S3Storage.DeleteBatchSize", s3::detail::DELETE_OBJECTS_LIMIT)) ); } diff --git a/cpp/arcticdb/storage/s3/s3_storage.cpp b/cpp/arcticdb/storage/s3/s3_storage.cpp index 8ba5232d549..3cdd5b26191 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.cpp +++ b/cpp/arcticdb/storage/s3/s3_storage.cpp @@ -108,7 +108,7 @@ void S3Storage::do_remove(std::span variant_keys, RemoveOpts) { std::optional S3Storage::max_delete_batch_size() const { return std::min( detail::DELETE_OBJECTS_LIMIT, - static_cast(ConfigsMap::instance()->get_int("S3Storage.DeleteBatchSize", 1000)) + static_cast(ConfigsMap::instance()->get_int("S3Storage.DeleteBatchSize", detail::DELETE_OBJECTS_LIMIT)) ); } From 80451c8be975c5d056e37a5c5a34eace342bb7d0 Mon Sep 17 00:00:00 2001 From: Alex Seaton Date: Fri, 22 May 2026 14:09:27 +0100 Subject: [PATCH 4/5] Use proper default value --- cpp/arcticdb/storage/s3/nfs_backed_storage.cpp | 4 +++- cpp/arcticdb/storage/s3/s3_storage.cpp | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp b/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp index 962c3f99fd3..b57fa0c8e3e 100644 --- a/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp +++ b/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp @@ -221,7 +221,9 @@ void NfsBackedStorage::do_remove(std::span variant_keys, RemoveOpts) std::optional NfsBackedStorage::max_delete_batch_size() const { return std::min( s3::detail::DELETE_OBJECTS_LIMIT, - static_cast(ConfigsMap::instance()->get_int("S3Storage.DeleteBatchSize", s3::detail::DELETE_OBJECTS_LIMIT)) + static_cast( + ConfigsMap::instance()->get_int("S3Storage.DeleteBatchSize", s3::detail::DELETE_OBJECTS_LIMIT) + ) ); } diff --git a/cpp/arcticdb/storage/s3/s3_storage.cpp b/cpp/arcticdb/storage/s3/s3_storage.cpp index 3cdd5b26191..ac407653544 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.cpp +++ b/cpp/arcticdb/storage/s3/s3_storage.cpp @@ -108,7 +108,9 @@ void S3Storage::do_remove(std::span variant_keys, RemoveOpts) { std::optional S3Storage::max_delete_batch_size() const { return std::min( detail::DELETE_OBJECTS_LIMIT, - static_cast(ConfigsMap::instance()->get_int("S3Storage.DeleteBatchSize", detail::DELETE_OBJECTS_LIMIT)) + static_cast( + ConfigsMap::instance()->get_int("S3Storage.DeleteBatchSize", detail::DELETE_OBJECTS_LIMIT) + ) ); } From 76513609b3af6b353eaa151f2f8e237eb484a774 Mon Sep 17 00:00:00 2001 From: Alex Seaton Date: Wed, 3 Jun 2026 14:42:32 +0100 Subject: [PATCH 5/5] Code review comments --- cpp/arcticdb/async/async_store.hpp | 38 ++++++--------- cpp/arcticdb/async/tasks.hpp | 10 +--- cpp/arcticdb/async/test/test_async.cpp | 3 +- cpp/arcticdb/entity/test/test_variant_key.cpp | 8 ++-- cpp/arcticdb/entity/variant_key.hpp | 2 +- cpp/arcticdb/pipeline/write_frame.cpp | 6 +-- cpp/arcticdb/pipeline/write_frame.hpp | 6 +-- cpp/arcticdb/storage/s3/detail-inl.hpp | 2 +- cpp/arcticdb/storage/test/in_memory_store.hpp | 46 ++++++------------- cpp/arcticdb/stream/stream_sink.hpp | 21 +++------ cpp/arcticdb/util/key_utils.hpp | 2 + cpp/arcticdb/version/symbol_list.cpp | 6 +-- cpp/arcticdb/version/symbol_list.hpp | 4 +- cpp/arcticdb/version/version_map.hpp | 2 +- .../version_store/test_bulk_delete.py | 9 ++-- 15 files changed, 58 insertions(+), 107 deletions(-) diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp index f239c9bd383..349cd3635c9 100644 --- a/cpp/arcticdb/async/async_store.hpp +++ b/cpp/arcticdb/async/async_store.hpp @@ -372,25 +372,22 @@ class AsyncStore : public Store { return async::submit_io_task(WriteCompressedBatchTask(std::move(kvs), library_)); } - folly::Future remove_key(const entity::VariantKey& key, storage::RemoveOpts opts) override { + folly::Future remove_key(const entity::VariantKey& key, storage::RemoveOpts opts) override { return async::submit_io_task(RemoveTask{key, library_, opts}); } - RemoveKeyResultType remove_key_sync(const entity::VariantKey& key, storage::RemoveOpts opts) override { - return RemoveTask{key, library_, opts}(); + void remove_key_sync(const entity::VariantKey& key, storage::RemoveOpts opts) override { + RemoveTask{key, library_, opts}(); } - folly::Future> remove_keys( - const std::vector& keys, storage::RemoveOpts opts - ) override { + folly::Future remove_keys(const std::vector& keys, storage::RemoveOpts opts) + override { return remove_keys(std::vector{keys}, opts); } - folly::Future> remove_keys( - std::vector&& keys, storage::RemoveOpts opts - ) override { + folly::Future remove_keys(std::vector&& keys, storage::RemoveOpts opts) override { if (keys.empty()) { - return std::vector{}; + return folly::Unit{}; } const auto batch_size = library_->max_delete_batch_size(); if (!batch_size.has_value() || keys.size() <= *batch_size) { @@ -405,31 +402,26 @@ class AsyncStore : public Store { async::TaskScheduler::instance()->io_thread_count() ); return folly::collect(std::move(futs)).via(&async::io_executor()).thenValue([](auto&&) { - return std::vector{}; + return folly::Unit{}; }); } - std::vector remove_keys_sync( - const std::vector& keys, storage::RemoveOpts opts - ) override { - return remove_keys_sync(std::vector{keys}, opts); + void remove_keys_sync(const std::vector& keys, storage::RemoveOpts opts) override { + remove_keys_sync(std::vector{keys}, opts); } - std::vector remove_keys_sync(std::vector&& keys, storage::RemoveOpts opts) - override { + void remove_keys_sync(std::vector&& keys, storage::RemoveOpts opts) override { if (keys.empty()) { - return {}; + return; } const auto batch_size = library_->max_delete_batch_size(); if (!batch_size.has_value() || keys.size() <= *batch_size) { - return RemoveBatchTask{std::move(keys), library_, opts}(); + RemoveBatchTask{std::move(keys), library_, opts}(); + return; } - std::vector result; for (auto& chunk : chunk_keys(std::move(keys), *batch_size)) { - auto sub = RemoveBatchTask{std::move(chunk), library_, opts}(); - result.insert(result.end(), std::make_move_iterator(sub.begin()), std::make_move_iterator(sub.end())); + RemoveBatchTask{std::move(chunk), library_, opts}(); } - return result; } std::vector> batch_read_compressed( diff --git a/cpp/arcticdb/async/tasks.hpp b/cpp/arcticdb/async/tasks.hpp index 998e8374ade..07eadddd0a3 100644 --- a/cpp/arcticdb/async/tasks.hpp +++ b/cpp/arcticdb/async/tasks.hpp @@ -665,10 +665,7 @@ struct RemoveTask : BaseTask { ARCTICDB_MOVE_ONLY_DEFAULT(RemoveTask) - stream::StreamSink::RemoveKeyResultType operator()() { - lib_->remove(std::move(key_), opts_); - return {}; - } + void operator()() { lib_->remove(std::move(key_), opts_); } }; struct RemoveBatchTask : BaseTask { @@ -685,10 +682,7 @@ struct RemoveBatchTask : BaseTask { ARCTICDB_MOVE_ONLY_DEFAULT(RemoveBatchTask) - std::vector operator()() { - lib_->remove(std::span(keys_), opts_); - return {}; - } + void operator()() { lib_->remove(std::span(keys_), opts_); } }; struct VisitObjectSizesTask : BaseTask { diff --git a/cpp/arcticdb/async/test/test_async.cpp b/cpp/arcticdb/async/test/test_async.cpp index 3cf1ba15a3a..e803085dd70 100644 --- a/cpp/arcticdb/async/test/test_async.cpp +++ b/cpp/arcticdb/async/test/test_async.cpp @@ -795,9 +795,8 @@ TEST(AsyncStoreRemoveKeys, EmptyKeysIsNoOp) { auto storage = std::make_shared(as::LibraryPath{"a", "b"}, as::OpenMode::DELETE, 3); auto store = build_async_store(storage); - auto result = store->remove_keys(std::vector{}, as::RemoveOpts{}).get(); + store->remove_keys(std::vector{}, as::RemoveOpts{}).get(); - ASSERT_TRUE(result.empty()); ASSERT_TRUE(storage->recorded_call_sizes().empty()); } diff --git a/cpp/arcticdb/entity/test/test_variant_key.cpp b/cpp/arcticdb/entity/test/test_variant_key.cpp index 830da1add0a..5d41809595e 100644 --- a/cpp/arcticdb/entity/test/test_variant_key.cpp +++ b/cpp/arcticdb/entity/test/test_variant_key.cpp @@ -29,12 +29,12 @@ AtomKey make_atom(KeyType kt, int64_t id) { TEST(VariantKey, KeyTypesEmpty) { std::vector keys; - ASSERT_TRUE(key_types(keys).empty()); + ASSERT_TRUE(unique_key_types(keys).empty()); } TEST(VariantKey, KeyTypesSingleType) { std::vector keys{make_atom(KeyType::TABLE_DATA, 1), make_atom(KeyType::TABLE_DATA, 2)}; - auto result = key_types(keys); + auto result = unique_key_types(keys); ASSERT_EQ(result.size(), 1u); ASSERT_EQ(result.at(0), KeyType::TABLE_DATA); } @@ -47,7 +47,7 @@ TEST(VariantKey, KeyTypesMultipleTypesDeduped) { make_atom(KeyType::VERSION, 4), make_atom(KeyType::TABLE_INDEX, 5), }; - auto result = key_types(keys); + auto result = unique_key_types(keys); ASSERT_EQ(result.size(), 3u); // Results are returned in KeyType enum order. ASSERT_EQ(result.at(0), KeyType::TABLE_DATA); @@ -60,7 +60,7 @@ TEST(VariantKey, KeyTypesAtomAndRefMix) { make_atom(KeyType::TABLE_DATA, 1), RefKey{"sym", KeyType::VERSION_REF}, }; - auto result = key_types(keys); + auto result = unique_key_types(keys); ASSERT_EQ(result.size(), 2u); ASSERT_EQ(result.at(0), KeyType::TABLE_DATA); ASSERT_EQ(result.at(1), KeyType::VERSION_REF); diff --git a/cpp/arcticdb/entity/variant_key.hpp b/cpp/arcticdb/entity/variant_key.hpp index 76ead468c41..2d45d8c6275 100644 --- a/cpp/arcticdb/entity/variant_key.hpp +++ b/cpp/arcticdb/entity/variant_key.hpp @@ -52,7 +52,7 @@ inline KeyType variant_key_type(const VariantKey& vk) { template requires std::same_as>, VariantKey> -std::vector key_types(R&& keys) { +std::vector unique_key_types(R&& keys) { std::array(KeyType::UNDEFINED)> present{}; for (const auto& k : keys) { present.at(static_cast(variant_key_type(k))) = true; diff --git a/cpp/arcticdb/pipeline/write_frame.cpp b/cpp/arcticdb/pipeline/write_frame.cpp index 32a5c48e67e..175e589d5dd 100644 --- a/cpp/arcticdb/pipeline/write_frame.cpp +++ b/cpp/arcticdb/pipeline/write_frame.cpp @@ -641,9 +641,7 @@ std::vector flatten_and_fix_rows( return output; } -folly::Future remove_slice_and_keys( - std::vector&& slices, StreamSink& sink -) { +folly::Future remove_slice_and_keys(std::vector&& slices, StreamSink& sink) { std::vector keys; std::transform( std::make_move_iterator(std::begin(slices)), @@ -654,7 +652,7 @@ folly::Future remove_slice_and_keys( return sink.remove_keys(std::move(keys)).thenValue([](auto&&) { return folly::makeFuture(); }); } -folly::Future remove_slice_and_keys_batches( +folly::Future remove_slice_and_keys_batches( std::vector>&& slices_batches, StreamSink& sink ) { return folly::collect(folly::window( diff --git a/cpp/arcticdb/pipeline/write_frame.hpp b/cpp/arcticdb/pipeline/write_frame.hpp index 7367497b54c..1ecda480751 100644 --- a/cpp/arcticdb/pipeline/write_frame.hpp +++ b/cpp/arcticdb/pipeline/write_frame.hpp @@ -102,7 +102,7 @@ template requires std::is_same_v || std::is_same_v> folly::SemiFuture> rollback_on_quota_exceeded( std::vector>&& try_slices, - folly::Function(std::vector&&)>&& remove_future + folly::Function(std::vector&&)>&& remove_future ) { std::vector succeeded; std::optional exception; @@ -145,8 +145,6 @@ folly::SemiFuture>> rollback_batches_on_quo const std::shared_ptr& sink ); -folly::Future remove_slice_and_keys( - std::vector&& slices, StreamSink& sink -); +folly::Future remove_slice_and_keys(std::vector&& slices, StreamSink& sink); } // namespace arcticdb::pipelines diff --git a/cpp/arcticdb/storage/s3/detail-inl.hpp b/cpp/arcticdb/storage/s3/detail-inl.hpp index f49a76b9bf1..a03384e0b03 100644 --- a/cpp/arcticdb/storage/s3/detail-inl.hpp +++ b/cpp/arcticdb/storage/s3/detail-inl.hpp @@ -318,7 +318,7 @@ void do_remove_impl( std::vector> stat_timers; if (query_stats::QueryStats::instance()->is_enabled()) { - auto distinct_key_types = key_types(ks); + auto distinct_key_types = unique_key_types(ks); stat_timers.reserve(distinct_key_types.size()); for (auto kt : distinct_key_types) { stat_timers.emplace_back(query_stats::add_task_count_and_time(query_stats::TaskType::S3_DeleteObjects, kt)); diff --git a/cpp/arcticdb/storage/test/in_memory_store.hpp b/cpp/arcticdb/storage/test/in_memory_store.hpp index 6b0b3c1b016..f60b5e339f1 100644 --- a/cpp/arcticdb/storage/test/in_memory_store.hpp +++ b/cpp/arcticdb/storage/test/in_memory_store.hpp @@ -281,7 +281,7 @@ class InMemoryStore : public Store { } } - RemoveKeyResultType remove_key_sync(const entity::VariantKey& key, storage::RemoveOpts opts) override { + void remove_key_sync(const entity::VariantKey& key, storage::RemoveOpts opts) override { StorageFailureSimulator::instance()->go(FailureType::DELETE); std::lock_guard lock{mutex_}; size_t removed = util::variant_match( @@ -293,11 +293,11 @@ class InMemoryStore : public Store { if (removed == 0 && !opts.ignores_missing_key_) { throw storage::KeyNotFoundException(VariantKey(key)); } - return {}; } - folly::Future remove_key(const VariantKey& key, storage::RemoveOpts opts) override { - return folly::makeFuture(remove_key_sync(key, opts)); + folly::Future remove_key(const VariantKey& key, storage::RemoveOpts opts) override { + remove_key_sync(key, opts); + return folly::Unit{}; } timestamp current_timestamp() override { return PilotedClock::nanos_since_epoch(); } @@ -401,47 +401,31 @@ class InMemoryStore : public Store { return output; } - folly::Future> remove_keys( - const std::vector& keys, storage::RemoveOpts opts - ) override { - std::vector output; + folly::Future remove_keys(const std::vector& keys, storage::RemoveOpts opts) + override { for (const auto& key : keys) { - output.emplace_back(remove_key_sync(key, opts)); + remove_key_sync(key, opts); } - - return output; + return folly::Unit{}; } - folly::Future> remove_keys( - std::vector&& keys, storage::RemoveOpts opts - ) override { - std::vector output; + folly::Future remove_keys(std::vector&& keys, storage::RemoveOpts opts) override { for (const auto& key : keys) { - output.emplace_back(remove_key_sync(key, opts)); + remove_key_sync(key, opts); } - - return output; + return folly::Unit{}; } - std::vector remove_keys_sync( - const std::vector& keys, storage::RemoveOpts opts - ) override { - std::vector output; + void remove_keys_sync(const std::vector& keys, storage::RemoveOpts opts) override { for (const auto& key : keys) { - output.emplace_back(remove_key_sync(key, opts)); + remove_key_sync(key, opts); } - - return output; } - std::vector remove_keys_sync(std::vector&& keys, storage::RemoveOpts opts) - override { - std::vector output; + void remove_keys_sync(std::vector&& keys, storage::RemoveOpts opts) override { for (const auto& key : keys) { - output.emplace_back(remove_key_sync(key, opts)); + remove_key_sync(key, opts); } - - return output; } size_t num_atom_keys() const { return seg_by_atom_key_.size(); } diff --git a/cpp/arcticdb/stream/stream_sink.hpp b/cpp/arcticdb/stream/stream_sink.hpp index 781130474d4..3d1c599a075 100644 --- a/cpp/arcticdb/stream/stream_sink.hpp +++ b/cpp/arcticdb/stream/stream_sink.hpp @@ -46,13 +46,6 @@ struct PartialKey { }; struct StreamSink { - /** - The remove_key{,s,sync} methods used to return the key to indicate success/not. However, most implementations - moved() the key internally to avoid expensive string copying, so no key can actually be returned. - In the future, may return a bool. - */ - using RemoveKeyResultType = folly::Unit; - virtual ~StreamSink() = default; [[nodiscard]] virtual folly::Future write( @@ -121,27 +114,25 @@ struct StreamSink { [[nodiscard]] virtual folly::Future batch_write_compressed(std::vector kvs ) = 0; - [[nodiscard]] virtual folly::Future remove_key( + [[nodiscard]] virtual folly::Future remove_key( const entity::VariantKey& key, storage::RemoveOpts opts = storage::RemoveOpts{} ) = 0; - virtual RemoveKeyResultType remove_key_sync( - const entity::VariantKey& key, storage::RemoveOpts opts = storage::RemoveOpts{} - ) = 0; + virtual void remove_key_sync(const entity::VariantKey& key, storage::RemoveOpts opts = storage::RemoveOpts{}) = 0; - [[nodiscard]] virtual folly::Future> remove_keys( + [[nodiscard]] virtual folly::Future remove_keys( const std::vector& keys, storage::RemoveOpts opts = storage::RemoveOpts{} ) = 0; - [[nodiscard]] virtual folly::Future> remove_keys( + [[nodiscard]] virtual folly::Future remove_keys( std::vector&& keys, storage::RemoveOpts opts = storage::RemoveOpts{} ) = 0; - virtual std::vector remove_keys_sync( + virtual void remove_keys_sync( const std::vector& keys, storage::RemoveOpts opts = storage::RemoveOpts{} ) = 0; - virtual std::vector remove_keys_sync( + virtual void remove_keys_sync( std::vector&& keys, storage::RemoveOpts opts = storage::RemoveOpts{} ) = 0; diff --git a/cpp/arcticdb/util/key_utils.hpp b/cpp/arcticdb/util/key_utils.hpp index 0e3f9cbb2e4..5adf643aa3f 100644 --- a/cpp/arcticdb/util/key_utils.hpp +++ b/cpp/arcticdb/util/key_utils.hpp @@ -42,6 +42,8 @@ inline void delete_keys_of_type_if( keys = {}; keys.reserve(flush_threshold); const auto batch_size = batch.size(); + // Async remove_keys spreads the batch out across the IO threadpool, remove_keys_sync would + // delete the keys in serial. We block on the result here to bound memory. store->remove_keys(std::move(batch)).get(); total_flushed += batch_size; log::storage().debug( diff --git a/cpp/arcticdb/version/symbol_list.cpp b/cpp/arcticdb/version/symbol_list.cpp index 438cb47022a..9364754fb13 100644 --- a/cpp/arcticdb/version/symbol_list.cpp +++ b/cpp/arcticdb/version/symbol_list.cpp @@ -764,9 +764,7 @@ VariantKey write_symbols( return store->write_sync(KeyType::SYMBOL_LIST, 0, stream_id, NumericIndex{0}, NumericIndex{0}, std::move(segment)); } -std::vector delete_keys( - const std::shared_ptr& store, std::vector&& remove, const AtomKey& exclude -) { +void delete_keys(const std::shared_ptr& store, std::vector&& remove, const AtomKey& exclude) { auto to_remove = std::move(remove); std::vector variant_keys; variant_keys.reserve(to_remove.size()); @@ -777,7 +775,7 @@ std::vector delete_keys( variant_keys.emplace_back(atom_key); } - return store->remove_keys_sync(variant_keys); + store->remove_keys_sync(variant_keys); } bool has_recent_compaction( diff --git a/cpp/arcticdb/version/symbol_list.hpp b/cpp/arcticdb/version/symbol_list.hpp index f0096b1c92a..723b6e73c40 100644 --- a/cpp/arcticdb/version/symbol_list.hpp +++ b/cpp/arcticdb/version/symbol_list.hpp @@ -215,9 +215,7 @@ class SymbolList { [[nodiscard]] bool needs_compaction(const LoadResult& load_result) const; }; -std::vector delete_keys( - const std::shared_ptr& store, std::vector&& remove, const AtomKey& exclude -); +void delete_keys(const std::shared_ptr& store, std::vector&& remove, const AtomKey& exclude); struct WriteSymbolTask : async::BaseTask { const std::shared_ptr store_; diff --git a/cpp/arcticdb/version/version_map.hpp b/cpp/arcticdb/version/version_map.hpp index 387c602ef4e..aa08a553450 100644 --- a/cpp/arcticdb/version/version_map.hpp +++ b/cpp/arcticdb/version/version_map.hpp @@ -709,7 +709,7 @@ class VersionMapImpl { ); store->remove_key_sync(*entry.head_); } - std::vector> key_futs; + std::vector> key_futs; for (const auto& key : entry.keys_) { if (key.type() == KeyType::VERSION) key_futs.emplace_back(store->remove_key(key.to_atom_key(entry.stream_id_))); diff --git a/python/tests/integration/arcticdb/version_store/test_bulk_delete.py b/python/tests/integration/arcticdb/version_store/test_bulk_delete.py index 4d7ff236d91..8ca1a5d83e8 100644 --- a/python/tests/integration/arcticdb/version_store/test_bulk_delete.py +++ b/python/tests/integration/arcticdb/version_store/test_bulk_delete.py @@ -11,7 +11,7 @@ import pytest from arcticdb_ext.storage import KeyType -from arcticdb.util.test import config_context, query_stats_operation_count +from arcticdb.util.test import config_context, config_context_multi, query_stats_operation_count import arcticdb.toolbox.query_stats as qs @@ -22,15 +22,12 @@ def _write_versions(lib, symbol, num_versions): @pytest.mark.storage -def test_bulk_delete_batching(object_and_mem_and_lmdb_version_store, sym): +def test_delete_removes_all_keys_when_chunked(object_and_mem_and_lmdb_version_store, sym): lib = object_and_mem_and_lmdb_version_store num_versions = 12 batch_size = 5 - with ( - config_context("S3Storage.DeleteBatchSize", batch_size), - config_context("AzureStorage.DeleteBatchSize", batch_size), - ): + with config_context_multi({"S3Storage.DeleteBatchSize": batch_size, "AzureStorage.DeleteBatchSize": batch_size}): _write_versions(lib, sym, num_versions) lib_tool = lib.library_tool()