Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cpp/arcticdb/processing/clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ std::vector<std::vector<EntityId>> RowRangeClause::structure_for_processing(
new_row_ranges.emplace_back(std::move(new_row_range));
}

component_manager_->replace_entities<std::shared_ptr<RowRange>>(entity_ids, new_row_ranges);
component_manager_->replace_entities_zip(entity_ids, std::move(new_row_ranges));

auto new_structure_offsets = structure_by_row_slice(ranges_and_entities);
return offsets_to_entity_ids(new_structure_offsets, ranges_and_entities);
Expand Down Expand Up @@ -1130,8 +1130,8 @@ std::vector<std::vector<EntityId>> ConcatClause::structure_for_processing(
new_row_ranges.emplace_back(std::move(new_row_range));
}
}
component_manager_->replace_entities<std::shared_ptr<RowRange>>(
util::flatten_vectors(std::move(entity_ids_vec)), new_row_ranges
component_manager_->replace_entities_zip(
util::flatten_vectors(std::move(entity_ids_vec)), std::move(new_row_ranges)
);
auto new_structure_offsets = structure_by_row_slice(ranges_and_entities);
return offsets_to_entity_ids(new_structure_offsets, ranges_and_entities);
Expand Down
40 changes: 22 additions & 18 deletions cpp/arcticdb/processing/component_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ class ComponentManager {

// Add a single entity with the components defined by args
template<class... Args>
void add_entity(EntityId id, Args... args) {
void add_entity(EntityId id, Args&&... args) {
std::unique_lock lock(mtx_);
(
[&] {
registry_.emplace<Args>(id, args);
registry_.emplace<Args>(id, std::forward<decltype(args)>(args));
// Store the initial entity fetch count component as a "first-class" entity, accessible by
// registry_.get<EntityFetchCount>(id), as this is external facing (used by resample)
// The remaining entity fetch count below will be decremented each time an entity is fetched, but is
// never accessed externally. Stored as an atomic to minimise the requirement to take the
// shared_mutex with a unique_lock.
if constexpr (std::is_same_v<Args, EntityFetchCount>) {
registry_.emplace<std::atomic<EntityFetchCount>>(id, args);
if constexpr (std::same_as<std::decay_t<Args>, EntityFetchCount>) {
registry_.emplace<std::atomic<EntityFetchCount>>(id, std::forward<decltype(args)>(args));
}
Comment on lines +48 to 50
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical: this branch is no longer additive — when Args = EntityFetchCount only the std::atomic<EntityFetchCount> is emplaced, and the plain EntityFetchCount component is no longer added. The comment immediately above still documents that both are intentional: the plain copy is the "first-class" entity "accessible by registry_.get<EntityFetchCount>(id), as this is external facing (used by resample)", and the atomic is the internal counter that gets decremented.

External consumers retrieve the plain value via registry_.view<…, const EntityFetchCount>().get(id):

  • clause_utils.hpp::gather_entitiesget_entities_and_decrement_refcount<…, EntityFetchCount>(...) → the view path.
  • ResampleClause::process in clause_resample.cpp:290 is exactly this on production data.
  • ComponentManager.Simple (test_component_manager.cpp:35) and ResampleClause tests at test_resample.cpp:375 exercise it directly.

With the plain EntityFetchCount storage now empty, view.get(id) reads from a storage that doesn't contain the entity (UB / assertion in entt). The PR description's reasoning "the EntityFetchCount was always added… which is wrong" doesn't hold here — both entries were required.

The same regression applies to the matching else-branches in add_entities, replace_entities, and replace_entities_zip below. They should remain additive for the EntityFetchCount case (emplace/replace both the plain value and the atomic), not exclusive.

}(),
...
Expand All @@ -56,7 +56,7 @@ class ComponentManager {
// Add a collection of entities. Each element of args should be a collection of components, all of which have the
// same number of elements
template<class... Args>
std::vector<EntityId> add_entities(Args... args) {
std::vector<EntityId> add_entities(Args&&... args) {
std::vector<EntityId> ids;
size_t entity_count{0};
ARCTICDB_SAMPLE_DEFAULT(AddEntities)
Expand All @@ -74,10 +74,13 @@ class ComponentManager {
"ComponentManager::add_entities received collections of differing lengths"
);
}
registry_.insert<typename Args::value_type>(ids.cbegin(), ids.cend(), args.begin());
if constexpr (std::is_same_v<typename Args::value_type, EntityFetchCount>) {
using T = std::decay_t<typename Args::value_type>;
registry_.insert<T>(ids.cbegin(), ids.cend(), std::make_move_iterator(args.begin()));
if constexpr (std::same_as<T, EntityFetchCount>) {
for (auto&& [idx, id] : folly::enumerate(ids)) {
registry_.emplace<std::atomic<EntityFetchCount>>(id, args[idx]);
registry_.emplace<std::atomic<EntityFetchCount>>(
id, std::forward<EntityFetchCount>(args[idx])
);
}
}
}(),
Expand All @@ -87,29 +90,30 @@ class ComponentManager {
}

template<typename T>
void replace_entities(const std::vector<EntityId>& ids, T value) {
void replace_entities(std::span<const EntityId> ids, const T& value) {
ARCTICDB_SAMPLE_DEFAULT(ReplaceEntities)
std::unique_lock lock(mtx_);
for (auto id : ids) {
registry_.replace<T>(id, value);
if constexpr (std::is_same_v<T, EntityFetchCount>) {
if constexpr (std::same_as<std::decay_t<T>, EntityFetchCount>) {
update_entity_fetch_count(id, value);
}
}
}

template<typename T>
void replace_entities(const std::vector<EntityId>& ids, const std::vector<T>& values) {
template<std::ranges::random_access_range R>
void replace_entities_zip(std::span<const EntityId> ids, R&& values) {
ARCTICDB_SAMPLE_DEFAULT(ReplaceEntityValues)
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
ids.size() == values.size(),
"Received vectors of differing lengths in ComponentManager::replace_entities"
);
using T = std::ranges::range_value_t<R>;
std::unique_lock lock(mtx_);
for (auto [idx, id] : folly::enumerate(ids)) {
registry_.replace<T>(id, values[idx]);
if constexpr (std::is_same_v<T, EntityFetchCount>) {
update_entity_fetch_count(id, values[idx]);
registry_.replace<T>(id, std::forward<R>(values)[idx]);
if constexpr (std::same_as<T, EntityFetchCount>) {
update_entity_fetch_count(id, std::forward<R>(values)[idx]);
}
}
Comment on lines 113 to 118
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::forward<R>(values)[idx] does not move the element. std::vector::operator[] has no rvalue overload, so even when std::forward<R>(values) produces an rvalue reference to the vector, indexing it returns T& (lvalue) and the elements are copied, not moved. The PR description cites "perfect forwarding" as the motivation, but here it has no effect.

If the intent is to consume the values, use std::move(values[idx]) (or std::ranges::iter_move) inside the loop — keeping in mind that this should only fire when R is an rvalue reference, otherwise it silently moves out of the caller's container.

}
Expand All @@ -118,13 +122,13 @@ class ComponentManager {

// Get a collection of entities. Returns a tuple of vectors, one for each component requested via Args
template<class... Args>
std::tuple<std::vector<Args>...> get_entities_and_decrement_refcount(const std::vector<EntityId>& ids) {
std::tuple<std::vector<Args>...> get_entities_and_decrement_refcount(std::span<const EntityId> ids) {
return get_entities_impl<Args...>(ids, true);
}

// Get a collection of entities. Returns a tuple of vectors, one for each component requested via Args
template<class... Args>
std::tuple<std::vector<Args>...> get_entities(const std::vector<EntityId>& ids) {
std::tuple<std::vector<Args>...> get_entities(std::span<const EntityId> ids) {
return get_entities_impl<Args...>(ids, false);
}

Expand All @@ -133,7 +137,7 @@ class ComponentManager {
void update_entity_fetch_count(EntityId id, EntityFetchCount count);

template<class... Args>
std::tuple<std::vector<Args>...> get_entities_impl(const std::vector<EntityId>& ids, bool decrement_ref_count) {
std::tuple<std::vector<Args>...> get_entities_impl(std::span<const EntityId> ids, bool decrement_ref_count) {
std::vector<std::tuple<Args...>> tuple_res;
ARCTICDB_SAMPLE_DEFAULT(GetEntities)
tuple_res.reserve(ids.size());
Expand Down
Loading