From e29e3a91327a9757a4f295d29db3ac52082dbc2d Mon Sep 17 00:00:00 2001 From: Vasil Pashov Date: Thu, 14 May 2026 19:11:47 +0300 Subject: [PATCH 1/2] Refactor component manager This PR contains 3 functional changes and one cosmetic change. * There is a special case for component of type EntityFetchCount. They are wrapped in atomic type and stored as atomics in the component manager. There are several constexpr ifs checking this. The problem was that the the EntityFetchCount was always added. So we ended up with one EntityFetchCount and one std::atomic which is wrong. All constexpr ifs must have an else * Implement perfect forwarding. All entities were passed by value. Now the methods take forwarding reference. * There were two implementations of replace_entities one taking vector and one taking template parameter. There are two issues with this. First we cannot use this to place the same vector in all of the entities because the type deduction will pick up the more specific implementation. Second parameters were passed by value. The function taking vector is now replace_entities_zip and moves the elements of the vector. The function taking a single parameter and placing it in all entities is replace_entities and takes a const ref. We cannot move in this case * Cosmetic changes: some const std::vector were changed to std::span so that it can accept wider range of types (e.g. std::array) --- cpp/arcticdb/processing/clause.cpp | 6 +-- cpp/arcticdb/processing/component_manager.hpp | 46 +++++++++++-------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp index 213a45cb621..3ca8d4e51cb 100644 --- a/cpp/arcticdb/processing/clause.cpp +++ b/cpp/arcticdb/processing/clause.cpp @@ -940,7 +940,7 @@ std::vector> RowRangeClause::structure_for_processing( new_row_ranges.emplace_back(std::move(new_row_range)); } - component_manager_->replace_entities>(entity_ids, new_row_ranges); + component_manager_->replace_entities_zip(entity_ids, std::move(new_row_ranges)); auto new_structure_offsets = structure_by_row_slice(ranges_and_entities); return offsets_to_entity_ids(new_structure_offsets, ranges_and_entities); @@ -1130,8 +1130,8 @@ std::vector> ConcatClause::structure_for_processing( new_row_ranges.emplace_back(std::move(new_row_range)); } } - component_manager_->replace_entities>( - util::flatten_vectors(std::move(entity_ids_vec)), new_row_ranges + component_manager_->replace_entities_zip( + util::flatten_vectors(std::move(entity_ids_vec)), std::move(new_row_ranges) ); auto new_structure_offsets = structure_by_row_slice(ranges_and_entities); return offsets_to_entity_ids(new_structure_offsets, ranges_and_entities); diff --git a/cpp/arcticdb/processing/component_manager.hpp b/cpp/arcticdb/processing/component_manager.hpp index a8fa749b3fb..d28ce9cec39 100644 --- a/cpp/arcticdb/processing/component_manager.hpp +++ b/cpp/arcticdb/processing/component_manager.hpp @@ -35,18 +35,19 @@ class ComponentManager { // Add a single entity with the components defined by args template - void add_entity(EntityId id, Args... args) { + void add_entity(EntityId id, Args&&... args) { std::unique_lock lock(mtx_); ( [&] { - registry_.emplace(id, args); // Store the initial entity fetch count component as a "first-class" entity, accessible by // registry_.get(id), as this is external facing (used by resample) // The remaining entity fetch count below will be decremented each time an entity is fetched, but is // never accessed externally. Stored as an atomic to minimise the requirement to take the // shared_mutex with a unique_lock. - if constexpr (std::is_same_v) { - registry_.emplace>(id, args); + if constexpr (std::same_as, EntityFetchCount>) { + registry_.emplace>(id, std::forward(args)); + } else { + registry_.emplace(id, std::forward(args)); } }(), ... @@ -56,7 +57,7 @@ class ComponentManager { // Add a collection of entities. Each element of args should be a collection of components, all of which have the // same number of elements template - std::vector add_entities(Args... args) { + std::vector add_entities(Args&&... args) { std::vector ids; size_t entity_count{0}; ARCTICDB_SAMPLE_DEFAULT(AddEntities) @@ -74,11 +75,15 @@ class ComponentManager { "ComponentManager::add_entities received collections of differing lengths" ); } - registry_.insert(ids.cbegin(), ids.cend(), args.begin()); - if constexpr (std::is_same_v) { + using T = std::decay_t; + if constexpr (std::same_as) { for (auto&& [idx, id] : folly::enumerate(ids)) { - registry_.emplace>(id, args[idx]); + registry_.emplace>( + id, std::forward(args[idx]) + ); } + } else { + registry_.insert(ids.cbegin(), ids.cend(), std::make_move_iterator(args.begin())); } }(), ... @@ -87,29 +92,32 @@ class ComponentManager { } template - void replace_entities(const std::vector& ids, T value) { + void replace_entities(std::span ids, const T& value) { ARCTICDB_SAMPLE_DEFAULT(ReplaceEntities) std::unique_lock lock(mtx_); for (auto id : ids) { - registry_.replace(id, value); - if constexpr (std::is_same_v) { + if constexpr (std::same_as, EntityFetchCount>) { update_entity_fetch_count(id, value); + } else { + registry_.replace(id, value); } } } - template - void replace_entities(const std::vector& ids, const std::vector& values) { + template + void replace_entities_zip(std::span ids, R&& values) { ARCTICDB_SAMPLE_DEFAULT(ReplaceEntityValues) internal::check( ids.size() == values.size(), "Received vectors of differing lengths in ComponentManager::replace_entities" ); + using T = std::ranges::range_value_t; std::unique_lock lock(mtx_); for (auto [idx, id] : folly::enumerate(ids)) { - registry_.replace(id, values[idx]); - if constexpr (std::is_same_v) { - update_entity_fetch_count(id, values[idx]); + if constexpr (std::same_as) { + update_entity_fetch_count(id, std::forward(values)[idx]); + } else { + registry_.replace(id, std::forward(values)[idx]); } } } @@ -118,13 +126,13 @@ class ComponentManager { // Get a collection of entities. Returns a tuple of vectors, one for each component requested via Args template - std::tuple...> get_entities_and_decrement_refcount(const std::vector& ids) { + std::tuple...> get_entities_and_decrement_refcount(std::span ids) { return get_entities_impl(ids, true); } // Get a collection of entities. Returns a tuple of vectors, one for each component requested via Args template - std::tuple...> get_entities(const std::vector& ids) { + std::tuple...> get_entities(std::span ids) { return get_entities_impl(ids, false); } @@ -133,7 +141,7 @@ class ComponentManager { void update_entity_fetch_count(EntityId id, EntityFetchCount count); template - std::tuple...> get_entities_impl(const std::vector& ids, bool decrement_ref_count) { + std::tuple...> get_entities_impl(std::span ids, bool decrement_ref_count) { std::vector> tuple_res; ARCTICDB_SAMPLE_DEFAULT(GetEntities) tuple_res.reserve(ids.size()); From 49f31f9a41ae231d71ff8c3ae91b173fbb48c2e9 Mon Sep 17 00:00:00 2001 From: Vasil Pashov Date: Wed, 20 May 2026 16:58:35 +0300 Subject: [PATCH 2/2] Fix fetch counts --- cpp/arcticdb/processing/component_manager.hpp | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/cpp/arcticdb/processing/component_manager.hpp b/cpp/arcticdb/processing/component_manager.hpp index d28ce9cec39..9cbe007ef25 100644 --- a/cpp/arcticdb/processing/component_manager.hpp +++ b/cpp/arcticdb/processing/component_manager.hpp @@ -39,6 +39,7 @@ class ComponentManager { std::unique_lock lock(mtx_); ( [&] { + registry_.emplace(id, std::forward(args)); // Store the initial entity fetch count component as a "first-class" entity, accessible by // registry_.get(id), as this is external facing (used by resample) // The remaining entity fetch count below will be decremented each time an entity is fetched, but is @@ -46,8 +47,6 @@ class ComponentManager { // shared_mutex with a unique_lock. if constexpr (std::same_as, EntityFetchCount>) { registry_.emplace>(id, std::forward(args)); - } else { - registry_.emplace(id, std::forward(args)); } }(), ... @@ -76,14 +75,13 @@ class ComponentManager { ); } using T = std::decay_t; + registry_.insert(ids.cbegin(), ids.cend(), std::make_move_iterator(args.begin())); if constexpr (std::same_as) { for (auto&& [idx, id] : folly::enumerate(ids)) { registry_.emplace>( id, std::forward(args[idx]) ); } - } else { - registry_.insert(ids.cbegin(), ids.cend(), std::make_move_iterator(args.begin())); } }(), ... @@ -96,10 +94,9 @@ class ComponentManager { ARCTICDB_SAMPLE_DEFAULT(ReplaceEntities) std::unique_lock lock(mtx_); for (auto id : ids) { + registry_.replace(id, value); if constexpr (std::same_as, EntityFetchCount>) { update_entity_fetch_count(id, value); - } else { - registry_.replace(id, value); } } } @@ -114,10 +111,9 @@ class ComponentManager { using T = std::ranges::range_value_t; std::unique_lock lock(mtx_); for (auto [idx, id] : folly::enumerate(ids)) { + registry_.replace(id, std::forward(values)[idx]); if constexpr (std::same_as) { update_entity_fetch_count(id, std::forward(values)[idx]); - } else { - registry_.replace(id, std::forward(values)[idx]); } } }