Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,7 @@ if(${TEST})
entity/test/test_key_serialization.cpp
entity/test/test_metrics.cpp
entity/test/test_ref_key.cpp
entity/test/test_variant_key.cpp
Comment thread
poodlewars marked this conversation as resolved.
entity/test/test_tensor.cpp
log/test/test_log.cpp
pipeline/test/test_container.hpp
Expand Down
72 changes: 52 additions & 20 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,37 +372,56 @@ class AsyncStore : public Store {
return async::submit_io_task(WriteCompressedBatchTask(std::move(kvs), library_));
}

folly::Future<RemoveKeyResultType> remove_key(const entity::VariantKey& key, storage::RemoveOpts opts) override {
// Schedules the removal of a single key as an IO task.
// Builds a RemoveTask bound to this store's library and returns the future
// produced by submitting it.
folly::Future<folly::Unit> remove_key(const entity::VariantKey& key, storage::RemoveOpts opts) override {
    RemoveTask removal{key, library_, opts};
    return async::submit_io_task(std::move(removal));
}

RemoveKeyResultType remove_key_sync(const entity::VariantKey& key, storage::RemoveOpts opts) override {
return RemoveTask{key, library_, opts}();
// Removes a single key by invoking a RemoveTask directly in the caller,
// rather than submitting it as an IO task.
void remove_key_sync(const entity::VariantKey& key, storage::RemoveOpts opts) override {
    RemoveTask removal{key, library_, opts};
    removal();
}

folly::Future<std::vector<RemoveKeyResultType>> remove_keys(
const std::vector<entity::VariantKey>& keys, storage::RemoveOpts opts
) override {
return keys.empty() ? std::vector<RemoveKeyResultType>()
: async::submit_io_task(RemoveBatchTask{keys, library_, opts});
// Copying overload: the caller keeps ownership of `keys`, so a private copy
// is made and handed to the rvalue-reference overload, which does the work.
folly::Future<folly::Unit> remove_keys(const std::vector<entity::VariantKey>& keys, storage::RemoveOpts opts)
        override {
    std::vector<entity::VariantKey> owned_keys(keys);
    return remove_keys(std::move(owned_keys), opts);
}

folly::Future<std::vector<RemoveKeyResultType>> remove_keys(
std::vector<entity::VariantKey>&& keys, storage::RemoveOpts opts
) override {
return keys.empty() ? std::vector<RemoveKeyResultType>()
: async::submit_io_task(RemoveBatchTask{std::move(keys), library_, opts});
// Asynchronously removes `keys`, consuming the vector.
//
// - An empty vector completes immediately without touching the library.
// - If the library reports no delete batch limit, or the keys fit within it,
//   a single RemoveBatchTask is submitted.
// - Otherwise the keys are split with chunk_keys() and one RemoveBatchTask is
//   submitted per chunk through folly::window, whose window size is the IO
//   thread count.
//
// The returned future completes once every submitted batch has completed.
folly::Future<folly::Unit> remove_keys(std::vector<entity::VariantKey>&& keys, storage::RemoveOpts opts) override {
    if (keys.empty()) {
        return folly::Unit{};
    }
    const auto batch_size = library_->max_delete_batch_size();
    if (!batch_size.has_value() || keys.size() <= *batch_size) {
        return async::submit_io_task(RemoveBatchTask{std::move(keys), library_, opts});
    }
    auto chunks = chunk_keys(std::move(keys), *batch_size);
    auto futs = folly::window(
            std::move(chunks),
            // Capture the library shared_ptr by value instead of `this`: window invokes this
            // callable for later chunks from completion callbacks, which may run after this
            // function has returned and, potentially, after the store has been destroyed.
            [library = library_, opts](std::vector<entity::VariantKey>&& chunk) {
                return async::submit_io_task(RemoveBatchTask{std::move(chunk), library, opts});
            },
            async::TaskScheduler::instance()->io_thread_count()
    );
    // The per-chunk results carry no information; collapse them into a single Unit.
    return folly::collect(std::move(futs)).via(&async::io_executor()).thenValue([](auto&&) {
        return folly::Unit{};
    });
}

std::vector<RemoveKeyResultType> remove_keys_sync(
const std::vector<entity::VariantKey>& keys, storage::RemoveOpts opts
) override {
return keys.empty() ? std::vector<RemoveKeyResultType>() : RemoveBatchTask{keys, library_, opts}();
// Copying overload: duplicates `keys` and forwards the copy to the
// rvalue-reference overload, which performs the removal.
void remove_keys_sync(const std::vector<entity::VariantKey>& keys, storage::RemoveOpts opts) override {
    std::vector<entity::VariantKey> owned_keys(keys);
    remove_keys_sync(std::move(owned_keys), opts);
}

std::vector<RemoveKeyResultType> remove_keys_sync(std::vector<entity::VariantKey>&& keys, storage::RemoveOpts opts)
override {
return keys.empty() ? std::vector<RemoveKeyResultType>() : RemoveBatchTask{std::move(keys), library_, opts}();
// Removes `keys` by running RemoveBatchTask(s) directly in the caller.
//
// Nothing is done for an empty vector. When the library reports a delete
// batch limit that the keys exceed, they are split with chunk_keys() and the
// chunks are removed one after another; otherwise a single batch is issued.
void remove_keys_sync(std::vector<entity::VariantKey>&& keys, storage::RemoveOpts opts) override {
    if (keys.empty()) {
        return;
    }

    const auto limit = library_->max_delete_batch_size();
    const bool fits_in_one_call = !limit.has_value() || keys.size() <= *limit;
    if (fits_in_one_call) {
        RemoveBatchTask{std::move(keys), library_, opts}();
        return;
    }

    auto chunks = chunk_keys(std::move(keys), *limit);
    for (auto& chunk : chunks) {
        RemoveBatchTask{std::move(chunk), library_, opts}();
    }
}

std::vector<folly::Future<VariantKey>> batch_read_compressed(
Expand Down Expand Up @@ -513,6 +532,19 @@ class AsyncStore : public Store {

private:
friend class arcticdb::toolbox::apy::LibraryTool;

// Splits `keys` into consecutive chunks of at most `batch_size` elements,
// preserving order. Elements are moved out of `keys` into the chunks.
//
// Every chunk except possibly the last holds exactly `batch_size` keys. An
// empty input yields no chunks. A `batch_size` of zero is treated as one:
// it would otherwise divide by zero and never advance the loop.
static std::vector<std::vector<entity::VariantKey>> chunk_keys(
        std::vector<entity::VariantKey>&& keys, size_t batch_size
) {
    const size_t chunk_len = std::max<size_t>(batch_size, 1);
    const size_t total = keys.size();

    std::vector<std::vector<entity::VariantKey>> chunks;
    // Ceiling division written so that it cannot overflow for large batch sizes.
    chunks.reserve(total / chunk_len + (total % chunk_len != 0 ? 1 : 0));

    auto first = keys.begin();
    size_t remaining = total;
    while (remaining > 0) {
        const size_t len = std::min(chunk_len, remaining);
        const auto last = first + len;
        chunks.emplace_back(std::make_move_iterator(first), std::make_move_iterator(last));
        first = last;
        remaining -= len;
    }
    return chunks;
}

std::shared_ptr<storage::Library> library_;
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_;
const EncodingVersion encoding_version_;
Expand Down
10 changes: 2 additions & 8 deletions cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,10 +665,7 @@ struct RemoveTask : BaseTask {

ARCTICDB_MOVE_ONLY_DEFAULT(RemoveTask)

stream::StreamSink::RemoveKeyResultType operator()() {
lib_->remove(std::move(key_), opts_);
return {};
}
// Runs the removal: hands the stored key (moved out of this task) and the
// stored options to the library.
void operator()() {
    lib_->remove(std::move(key_), opts_);
}
};

struct RemoveBatchTask : BaseTask {
Expand All @@ -685,10 +682,7 @@ struct RemoveBatchTask : BaseTask {

ARCTICDB_MOVE_ONLY_DEFAULT(RemoveBatchTask)

std::vector<stream::StreamSink::RemoveKeyResultType> operator()() {
lib_->remove(std::span(keys_), opts_);
return {};
}
// Runs the batch removal: passes a span over the stored keys, together with
// the stored options, to the library.
void operator()() {
    lib_->remove(std::span(keys_), opts_);
}
};

struct VisitObjectSizesTask : BaseTask {
Expand Down
143 changes: 143 additions & 0 deletions cpp/arcticdb/async/test/test_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <arcticdb/storage/mock/s3_mock_client.hpp>
#include <arcticdb/storage/s3/detail-inl.hpp>
#include <arcticdb/storage/mock/storage_mock_client.hpp>
#include <arcticdb/util/configs_map.hpp>
#include <arcticdb/util/key_utils.hpp>
#include <aws/core/Aws.h>

using namespace arcticdb;
Expand Down Expand Up @@ -671,3 +673,144 @@ TEST(Async, CopyCompressedInterStoreKeyExistsCheckFailureWithRetry) {
ASSERT_EQ(std::get<RefKey>(read_result_2.first), key);
ASSERT_EQ(read_result_2.second.row_count(), row_count);
}

namespace remove_batch_test {

// Test double for as::Storage that performs no real I/O.
//
// Each remove call is recorded as the number of keys it was given, so tests
// can assert how deletions were batched. Iteration replays a list of keys
// seeded by the test. Write, update and read operations raise, as the tests
// are not expected to reach them.
class RecordingStorage final : public as::Storage {
  public:
    RecordingStorage(const as::LibraryPath& lib, as::OpenMode mode, std::optional<size_t> batch_size) :
        Storage(lib, mode),
        batch_size_(batch_size) {}

    std::string name() const final { return "recording_storage"; }

    // Delete batch limit supplied at construction; std::nullopt means none was given.
    std::optional<size_t> max_delete_batch_size() const override { return batch_size_; }

    // Returns a copy of the key counts recorded so far, in the order they were recorded.
    std::vector<size_t> recorded_call_sizes() {
        std::scoped_lock lock(mutex_);
        return recorded_;
    }

    // Replaces the keys that do_iterate_type_until_match() will visit.
    void seed_iter_keys(std::vector<entity::VariantKey> keys) {
        std::scoped_lock lock(mutex_);
        iter_keys_ = std::move(keys);
    }

  private:
    // Operations the tests never exercise.
    void do_write(as::KeySegmentPair&) final { util::raise_rte("unused"); }
    void do_write_if_none(as::KeySegmentPair&) final { util::raise_rte("unused"); }
    void do_update(as::KeySegmentPair&, as::UpdateOpts) final { util::raise_rte("unused"); }
    void do_read(entity::VariantKey&&, const as::ReadVisitor&, as::ReadKeyOpts) final { util::raise_rte("unused"); }
    as::KeySegmentPair do_read(entity::VariantKey&&, as::ReadKeyOpts) final { util::raise_rte("unused"); }

    // A single-key removal is recorded as a call of size one.
    void do_remove(entity::VariantKey&&, as::RemoveOpts) final {
        std::scoped_lock lock(mutex_);
        recorded_.push_back(1);
    }

    // A batch removal is recorded as the number of keys in the span.
    void do_remove(std::span<entity::VariantKey> keys, as::RemoveOpts) final {
        std::scoped_lock lock(mutex_);
        recorded_.push_back(keys.size());
    }

    bool do_key_exists(const entity::VariantKey&) final { return false; }
    bool do_supports_prefix_matching() const final { return false; }
    as::SupportsAtomicWrites do_supports_atomic_writes() const final { return as::SupportsAtomicWrites::NO; }
    bool do_fast_delete() final { return false; }

    // Visits a copy of each seeded key in order and stops at the first one the visitor
    // accepts. The key type and prefix arguments are ignored.
    bool do_iterate_type_until_match(entity::KeyType, const as::IterateTypePredicate& visitor, const std::string&)
            final {
        // Take the snapshot under the lock so the visitor runs without holding the mutex.
        const auto snapshot = [this] {
            std::scoped_lock lock(mutex_);
            return iter_keys_;
        }();
        for (const auto& seeded_key : snapshot) {
            if (visitor(entity::VariantKey{seeded_key})) {
                return true;
            }
        }
        return false;
    }

    std::string do_key_path(const entity::VariantKey&) const final { return {}; }

    std::optional<size_t> batch_size_;
    std::mutex mutex_; // guards recorded_ and iter_keys_
    std::vector<size_t> recorded_;
    std::vector<entity::VariantKey> iter_keys_;
};

inline std::shared_ptr<aa::AsyncStore<>> build_async_store(std::shared_ptr<RecordingStorage> storage) {
auto storages = std::make_shared<as::Storages>(as::Storages::StorageVector{storage}, as::OpenMode::DELETE);
auto library = std::make_shared<as::Library>(as::LibraryPath{"a", "b"}, std::move(storages));
return std::make_shared<aa::AsyncStore<>>(std::move(library), proto::encoding::VariantCodec{}, EncodingVersion::V1);
}

inline std::vector<entity::VariantKey> make_keys(size_t n) {
std::vector<entity::VariantKey> keys;
keys.reserve(n);
for (size_t i = 0; i < n; ++i) {
keys.emplace_back(arcticdb::atom_key_builder().build(fmt::format("k_{}", i), entity::KeyType::TABLE_DATA));
}
return keys;
}

} // namespace remove_batch_test

// Ten keys with a delete batch limit of three: the async path must issue
// removal calls of sizes 3, 3, 3 and 1.
TEST(AsyncStoreRemoveKeys, AsyncSplitsIntoChunksRespectingBatchSize) {
    using namespace remove_batch_test;
    auto recorder = std::make_shared<RecordingStorage>(as::LibraryPath{"a", "b"}, as::OpenMode::DELETE, 3);
    auto async_store = build_async_store(recorder);

    async_store->remove_keys(make_keys(10), as::RemoveOpts{}).get();

    // The batches may be recorded in any order, so compare the sorted sizes.
    auto observed = recorder->recorded_call_sizes();
    std::sort(observed.begin(), observed.end());
    const std::vector<size_t> expected{1, 3, 3, 3};
    ASSERT_EQ(observed, expected);
}

// Ten keys with a delete batch limit of three: the sync path must issue
// removal calls of sizes 3, 3, 3 and 1, in exactly that order.
TEST(AsyncStoreRemoveKeys, SyncSplitsIntoChunksRespectingBatchSize) {
    using namespace remove_batch_test;
    auto recorder = std::make_shared<RecordingStorage>(as::LibraryPath{"a", "b"}, as::OpenMode::DELETE, 3);
    auto async_store = build_async_store(recorder);

    async_store->remove_keys_sync(make_keys(10), as::RemoveOpts{});

    const std::vector<size_t> expected{3, 3, 3, 1};
    const auto observed = recorder->recorded_call_sizes();
    ASSERT_EQ(observed, expected);
}

// With no delete batch limit, all 100 keys must go out in one removal call.
TEST(AsyncStoreRemoveKeys, NulloptBatchSizeSendsSingleCall) {
    using namespace remove_batch_test;
    auto recorder =
            std::make_shared<RecordingStorage>(as::LibraryPath{"a", "b"}, as::OpenMode::DELETE, std::nullopt);
    auto async_store = build_async_store(recorder);

    async_store->remove_keys(make_keys(100), as::RemoveOpts{}).get();

    const std::vector<size_t> expected{100};
    const auto observed = recorder->recorded_call_sizes();
    ASSERT_EQ(observed, expected);
}

// Removing an empty set of keys must not reach the storage at all.
TEST(AsyncStoreRemoveKeys, EmptyKeysIsNoOp) {
    using namespace remove_batch_test;
    auto recorder = std::make_shared<RecordingStorage>(as::LibraryPath{"a", "b"}, as::OpenMode::DELETE, 3);
    auto async_store = build_async_store(recorder);

    std::vector<entity::VariantKey> no_keys;
    async_store->remove_keys(std::move(no_keys), as::RemoveOpts{}).get();

    const auto observed = recorder->recorded_call_sizes();
    ASSERT_TRUE(observed.empty());
}

// With "Storage.DeletePendingBufferSize" set to 3 and 11 matching keys,
// delete_keys_of_type_if must issue removals of sizes 3, 3, 3 and 2 in order.
TEST(KeyUtilsDeleteBatching, FlushesPendingBufferAtThreshold) {
    using namespace remove_batch_test;
    ScopedConfig scoped("Storage.DeletePendingBufferSize", 3);

    // No storage-level batch limit, so any splitting seen comes from the pending buffer.
    auto recorder =
            std::make_shared<RecordingStorage>(as::LibraryPath{"a", "b"}, as::OpenMode::DELETE, std::nullopt);
    recorder->seed_iter_keys(make_keys(11));
    auto async_store = build_async_store(recorder);

    auto match_everything = [](const entity::VariantKey&) { return true; };
    arcticdb::delete_keys_of_type_if(async_store, match_everything, entity::KeyType::TABLE_DATA);

    const std::vector<size_t> expected{3, 3, 3, 2};
    ASSERT_EQ(recorder->recorded_call_sizes(), expected);
}
67 changes: 67 additions & 0 deletions cpp/arcticdb/entity/test/test_variant_key.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/* Copyright 2026 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software
* will be governed by the Apache License, version 2.0.
*/

#include <gtest/gtest.h>

#include <arcticdb/entity/atom_key.hpp>
#include <arcticdb/entity/ref_key.hpp>
#include <arcticdb/entity/variant_key.hpp>

using namespace arcticdb;
using namespace arcticdb::entity;

namespace {
// Builds an AtomKey of type `kt` with numeric id `id`. All other fields
// (version, creation timestamp, content hash, start/end index) are zero.
AtomKey make_atom(KeyType kt, int64_t id) {
    const NumericId zero_index{0};
    const NumericId stream_id{id};
    return atom_key_builder()
            .version_id(0)
            .creation_ts(0)
            .content_hash(0)
            .start_index(zero_index)
            .end_index(zero_index)
            .build(stream_id, kt);
}
} // namespace

// An empty key collection has no key types.
TEST(VariantKey, KeyTypesEmpty) {
    const std::vector<VariantKey> no_keys;
    const auto types = unique_key_types(no_keys);
    ASSERT_TRUE(types.empty());
}

// Several keys of the same type collapse to a single entry.
TEST(VariantKey, KeyTypesSingleType) {
    std::vector<VariantKey> keys;
    keys.emplace_back(make_atom(KeyType::TABLE_DATA, 1));
    keys.emplace_back(make_atom(KeyType::TABLE_DATA, 2));

    const auto types = unique_key_types(keys);

    const std::vector<KeyType> expected{KeyType::TABLE_DATA};
    ASSERT_EQ(types, expected);
}

// Repeated types are reported once each, regardless of input order.
TEST(VariantKey, KeyTypesMultipleTypesDeduped) {
    std::vector<VariantKey> keys;
    keys.emplace_back(make_atom(KeyType::TABLE_DATA, 1));
    keys.emplace_back(make_atom(KeyType::TABLE_INDEX, 2));
    keys.emplace_back(make_atom(KeyType::TABLE_DATA, 3));
    keys.emplace_back(make_atom(KeyType::VERSION, 4));
    keys.emplace_back(make_atom(KeyType::TABLE_INDEX, 5));

    const auto types = unique_key_types(keys);

    // Results are returned in KeyType enum order.
    const std::vector<KeyType> expected{KeyType::TABLE_DATA, KeyType::TABLE_INDEX, KeyType::VERSION};
    ASSERT_EQ(types, expected);
}

// Atom keys and ref keys held in the same collection both contribute their type.
TEST(VariantKey, KeyTypesAtomAndRefMix) {
    std::vector<VariantKey> keys;
    keys.emplace_back(make_atom(KeyType::TABLE_DATA, 1));
    keys.emplace_back(RefKey{"sym", KeyType::VERSION_REF});

    const auto types = unique_key_types(keys);

    const std::vector<KeyType> expected{KeyType::TABLE_DATA, KeyType::VERSION_REF};
    ASSERT_EQ(types, expected);
}
18 changes: 18 additions & 0 deletions cpp/arcticdb/entity/variant_key.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

#include <arcticdb/entity/atom_key.hpp>
#include <arcticdb/entity/ref_key.hpp>
#include <array>
#include <variant>
#include <vector>
#include <ranges>

namespace arcticdb::entity {
Expand Down Expand Up @@ -48,6 +50,22 @@ inline KeyType variant_key_type(const VariantKey& vk) {
return std::visit([](const auto& key) { return key.type(); }, vk);
}

// Returns the distinct key types present in `keys`, each once, ordered by
// their underlying KeyType enum value.
//
// Presence is tracked in a table with one slot per enum value below
// KeyType::UNDEFINED. A key whose type falls outside that table makes the
// bounds-checked `at` throw std::out_of_range.
template<std::ranges::range R>
requires std::same_as<std::remove_cvref_t<std::ranges::range_value_t<R>>, VariantKey>
std::vector<KeyType> unique_key_types(R&& keys) {
    constexpr auto num_slots = static_cast<size_t>(KeyType::UNDEFINED);
    std::array<bool, num_slots> seen{};
    for (const auto& key : keys) {
        const auto slot = static_cast<size_t>(variant_key_type(key));
        seen.at(slot) = true;
    }

    std::vector<KeyType> types;
    size_t ordinal = 0;
    for (const bool is_present : seen) {
        if (is_present) {
            types.push_back(static_cast<KeyType>(ordinal));
        }
        ++ordinal;
    }
    return types;
}

inline const StreamId& variant_key_id(const VariantKey& vk) {
return std::visit([](const auto& key) -> const StreamId& { return key.id(); }, vk);
}
Expand Down
Loading
Loading