From d6918202b985fc89b628d37e95650253f5407b97 Mon Sep 17 00:00:00 2001 From: Ivo Date: Tue, 28 Apr 2026 11:50:14 +0100 Subject: [PATCH 1/8] Resampling C++ benchmark --- cpp/arcticdb/CMakeLists.txt | 1 + .../processing/test/benchmark_resample.cpp | 194 ++++++++++++++++++ 2 files changed, 195 insertions(+) create mode 100644 cpp/arcticdb/processing/test/benchmark_resample.cpp diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index dd9fd672f3e..3be56265f17 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -1240,6 +1240,7 @@ if(${TEST}) processing/test/benchmark_binary.cpp processing/test/benchmark_clause.cpp processing/test/benchmark_common.cpp + processing/test/benchmark_resample.cpp processing/test/benchmark_ternary.cpp util/test/benchmark_bitset.cpp version/test/benchmark_write.cpp diff --git a/cpp/arcticdb/processing/test/benchmark_resample.cpp b/cpp/arcticdb/processing/test/benchmark_resample.cpp new file mode 100644 index 00000000000..2bc9ce0bae1 --- /dev/null +++ b/cpp/arcticdb/processing/test/benchmark_resample.cpp @@ -0,0 +1,194 @@ +/* Copyright 2026 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software + * will be governed by the Apache License, version 2.0. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include + +using namespace arcticdb; +using namespace arcticdb::pipelines; +using namespace arcticdb::stream; + +// run like: --benchmark_time_unit=ms --benchmark_filter=BM_resample.* --benchmark_min_time=5x +// +// Isolates ResampleClause::process from the rest of the read pipeline. 
Segments are constructed in memory and +// fed directly into the clause, so timings reflect the cost of bucket-boundary advancement, output index +// construction, and per-column aggregation only. +// +// Args (per benchmark): {rows_per_segment, num_segments, num_buckets, num_value_cols} +// +// num_buckets together with the total row count selects the layout: +// * num_buckets <= num_segments -> each bucket spans several row-slices +// * num_segments < num_buckets <= total_rows -> many rows per bucket, all inside a single row-slice +// * num_buckets > total_rows -> bucket size smaller than the row spacing, most empty +// +// Row spacing is derived: if num_buckets > total_rows the rows are spread out so that 1ns buckets still +// cover the full data span; otherwise rows are 1ns apart. + +namespace { + +constexpr DataType kValueDataType = DataType::INT64; +const std::string kValuePrefix = "col_"; + +ResampleClause::BucketGeneratorT make_bucket_generator(std::vector boundaries) { + return [boundaries = std::move(boundaries)]( + timestamp, timestamp, std::string_view, ResampleBoundary, timestamp, ResampleOrigin + ) { return boundaries; }; +} + +StreamDescriptor make_descriptor(size_t num_value_cols) { + std::vector fields; + fields.reserve(num_value_cols); + for (size_t c = 0; c < num_value_cols; ++c) { + fields.emplace_back(TypeDescriptor(kValueDataType, Dimension::Dim0), kValuePrefix + std::to_string(c)); + } + return TimeseriesIndex::default_index().create_stream_descriptor("Resample", std::move(fields)); +} + +std::shared_ptr make_segment( + const StreamDescriptor& desc, size_t num_rows, timestamp start_ts, timestamp step, size_t num_value_cols +) { + auto seg = std::make_shared( + desc, num_rows, AllocationType::PRESIZED, Sparsity::NOT_PERMITTED, std::nullopt + ); + std::vector index_data(num_rows); + for (size_t row = 0; row < num_rows; ++row) { + index_data[row] = start_ts + step * static_cast(row); + } + fill_dense_column_data(*seg, 0, index_data); + 
std::vector col_data(num_rows); + for (size_t c = 0; c < num_value_cols; ++c) { + for (size_t row = 0; row < num_rows; ++row) { + col_data[row] = static_cast((row + c) % 1024); + } + fill_dense_column_data(*seg, c + 1, col_data); + } + return seg; +} + +ResampleClause make_resample_clause( + std::vector bucket_boundaries, timestamp date_range_start, timestamp date_range_end, + const std::shared_ptr& component_manager, size_t num_value_cols +) { + ResampleClause clause{ + "dummy", ResampleBoundary::LEFT, make_bucket_generator(bucket_boundaries), 0, 0 + }; + clause.bucket_boundaries_ = std::move(bucket_boundaries); + clause.date_range_ = TimestampRange{date_range_start, date_range_end}; + clause.set_component_manager(component_manager); + clause.set_processing_config(ProcessingConfig{false, 0, IndexDescriptor::Type::TIMESTAMP}); + std::vector aggs; + aggs.reserve(num_value_cols); + for (size_t c = 0; c < num_value_cols; ++c) { + const auto col = kValuePrefix + std::to_string(c); + aggs.emplace_back("sum", col, col); + } + clause.set_aggregations(aggs); + return clause; +} + +struct Layout { + std::vector> segments; + std::vector bucket_boundaries; + timestamp date_range_start; + timestamp date_range_end; +}; + +Layout build_layout(size_t rows_per_segment, size_t num_segments, size_t num_buckets, size_t num_value_cols) { + const size_t total_rows = rows_per_segment * num_segments; + // Pick row_step so the data spans at least num_buckets ns. If num_buckets <= total_rows the rows are 1ns + // apart and buckets are bigger than 1ns. If num_buckets > total_rows the rows are spread out so 1ns + // buckets still cover everything, leaving most buckets empty. 
+ const auto target_span = std::max(total_rows, num_buckets); + const auto row_step = static_cast(target_span / total_rows); + const auto total_span = static_cast(total_rows) * row_step; + const auto bucket_size = std::max(1, total_span / static_cast(num_buckets)); + + auto desc = make_descriptor(num_value_cols); + std::vector> segments; + segments.reserve(num_segments); + for (size_t s = 0; s < num_segments; ++s) { + const auto start_ts = static_cast(s * rows_per_segment) * row_step; + segments.push_back(make_segment(desc, rows_per_segment, start_ts, row_step, num_value_cols)); + } + + std::vector boundaries; + boundaries.reserve(num_buckets + 1); + for (timestamp b = 0; b <= total_span; b += bucket_size) { + boundaries.push_back(b); + } + if (boundaries.back() <= total_span - row_step) { + boundaries.push_back(boundaries.back() + bucket_size); + } + + return Layout{ + std::move(segments), std::move(boundaries), 0, total_span - row_step + }; +} + +std::vector register_segments( + ComponentManager& component_manager, const std::vector>& segments, + size_t num_value_cols +) { + auto ids = component_manager.get_new_entity_ids(segments.size()); + size_t row_offset = 0; + for (size_t i = 0; i < segments.size(); ++i) { + const auto& seg = segments[i]; + const auto rows = seg->row_count(); + auto rr = std::make_shared(row_offset, row_offset + rows); + auto cr = std::make_shared(1, num_value_cols + 1); + component_manager.add_entity(ids[i], seg, rr, cr, EntityFetchCount{1}); + row_offset += rows; + } + return ids; +} + +} // namespace + +static void BM_resample(benchmark::State& state) { + const auto rows_per_segment = static_cast(state.range(0)); + const auto num_segments = static_cast(state.range(1)); + const auto num_buckets = static_cast(state.range(2)); + const auto num_value_cols = static_cast(state.range(3)); + + auto layout = build_layout(rows_per_segment, num_segments, num_buckets, num_value_cols); + + for (auto _ : state) { + state.PauseTiming(); + auto 
component_manager = std::make_shared(); + auto clause = make_resample_clause( + layout.bucket_boundaries, + layout.date_range_start, + layout.date_range_end, + component_manager, + num_value_cols + ); + auto ids = register_segments(*component_manager, layout.segments, num_value_cols); + state.ResumeTiming(); + auto out = clause.process(std::move(ids)); + benchmark::DoNotOptimize(out); + } +} + +// 1m rows total across all regimes; num_segments and num_buckets carry the variation. +// 1000 rows per bucket, all rows in a single row-slice. +BENCHMARK(BM_resample)->Args({100'000, 10, 1'000, 1})->Args({100'000, 10, 1'000, 100}); +// 100 rows per bucket, all rows in a single row-slice. +BENCHMARK(BM_resample)->Args({100'000, 10, 10'000, 1})->Args({100'000, 10, 10'000, 100}); +// 10 rows per bucket, all rows in a single row-slice. +BENCHMARK(BM_resample)->Args({100'000, 10, 100'000, 1})->Args({100'000, 10, 100'000, 100}); +// Each bucket spans several row-slices. +BENCHMARK(BM_resample)->Args({2'000, 500, 100, 1})->Args({2'000, 500, 100, 100}); +// Bucket size smaller than row spacing, most buckets empty. 
+BENCHMARK(BM_resample)->Args({100'000, 10, 10'000'000, 1})->Args({100'000, 10, 10'000'000, 100}); From 5ed1265ebdea8b61649ed95eb40430700401e616 Mon Sep 17 00:00:00 2001 From: Ivo Date: Tue, 28 Apr 2026 13:34:27 +0100 Subject: [PATCH 2/8] Pure code move of `generate_output_index_column` to sorted_aggregation --- cpp/arcticdb/processing/clause.hpp | 5 -- cpp/arcticdb/processing/clause_resample.cpp | 63 +--------------- .../processing/sorted_aggregation.cpp | 73 +++++++++++++++++++ .../processing/sorted_aggregation.hpp | 8 ++ 4 files changed, 82 insertions(+), 67 deletions(-) diff --git a/cpp/arcticdb/processing/clause.hpp b/cpp/arcticdb/processing/clause.hpp index 734dd24a409..288720e06e7 100644 --- a/cpp/arcticdb/processing/clause.hpp +++ b/cpp/arcticdb/processing/clause.hpp @@ -438,11 +438,6 @@ struct ResampleClause { std::vector generate_bucket_boundaries( timestamp first_ts, timestamp last_ts, bool responsible_for_first_overlapping_bucket ) const; - - std::shared_ptr generate_output_index_column( - const std::vector>& input_index_columns, - const std::vector& bucket_boundaries - ) const; }; template diff --git a/cpp/arcticdb/processing/clause_resample.cpp b/cpp/arcticdb/processing/clause_resample.cpp index dbab71bb5c3..9a2827fe66c 100644 --- a/cpp/arcticdb/processing/clause_resample.cpp +++ b/cpp/arcticdb/processing/clause_resample.cpp @@ -328,7 +328,7 @@ std::vector ResampleClause::process(std::vectorat(0)->column_ptr(0)); } - const auto output_index_column = generate_output_index_column(input_index_columns, bucket_boundaries); + const auto output_index_column = generate_output_index_column(input_index_columns, bucket_boundaries, *date_range_, label_boundary_); // Bucket boundaries can be wider than the date range specified by the user, narrow the first and last buckets // here if necessary bucket_boundaries.front( @@ -435,67 +435,6 @@ std::vector ResampleClause::generate_bucket_boundari return bucket_boundaries; } -template -std::shared_ptr 
ResampleClause::generate_output_index_column( - const std::vector>& input_index_columns, const std::vector& bucket_boundaries -) const { - constexpr auto data_type = DataType::NANOSECONDS_UTC64; - using IndexTDT = ScalarTagType>; - - const auto max_index_column_bytes = (bucket_boundaries.size() - 1) * get_type_size(data_type); - auto output_index_column = std::make_shared( - TypeDescriptor(data_type, Dimension::Dim0), - Sparsity::NOT_PERMITTED, - ChunkedBuffer::presized_in_blocks(max_index_column_bytes) - ); - auto output_index_column_data = output_index_column->data(); - auto output_index_column_it = output_index_column_data.template begin(); - size_t output_index_column_row_count{0}; - - auto bucket_end_it = std::next(bucket_boundaries.cbegin()); - Bucket current_bucket{*std::prev(bucket_end_it), *bucket_end_it}; - bool current_bucket_added_to_index{false}; - // Only include buckets that have at least one index value in range - for (const auto& input_index_column : input_index_columns) { - auto index_column_data = input_index_column->data(); - const auto cend = index_column_data.cend(); - auto it = index_column_data.cbegin(); - // In case the passed date_range does not span the whole segment we need to skip the index values - // which are before the date range start. - while (it != cend && *it < date_range_->first) { - ++it; - } - for (; it != cend && *it <= date_range_->second; ++it) { - if (ARCTICDB_LIKELY(current_bucket.contains(*it))) { - if (ARCTICDB_UNLIKELY(!current_bucket_added_to_index)) { - *output_index_column_it++ = - label_boundary_ == ResampleBoundary::LEFT ? 
*std::prev(bucket_end_it) : *bucket_end_it; - ++output_index_column_row_count; - current_bucket_added_to_index = true; - } - } else { - advance_boundary_past_value(bucket_boundaries, bucket_end_it, *it); - if (ARCTICDB_UNLIKELY(bucket_end_it == bucket_boundaries.end())) { - break; - } else { - current_bucket.set_boundaries(*std::prev(bucket_end_it), *bucket_end_it); - current_bucket_added_to_index = false; - if (ARCTICDB_LIKELY(current_bucket.contains(*it))) { - *output_index_column_it++ = - label_boundary_ == ResampleBoundary::LEFT ? *std::prev(bucket_end_it) : *bucket_end_it; - ++output_index_column_row_count; - current_bucket_added_to_index = true; - } - } - } - } - } - const auto actual_index_column_bytes = output_index_column_row_count * get_type_size(data_type); - output_index_column->buffer().trim(actual_index_column_bytes); - output_index_column->set_row_data(output_index_column_row_count - 1); - return output_index_column; -} - template struct ResampleClause; template struct ResampleClause; diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index 0c2d1e42898..9401664bebb 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -80,6 +81,78 @@ bool value_past_bucket_start(const timestamp bucket_start, const timestamp value return value > bucket_start; } +template +std::shared_ptr generate_output_index_column( + const std::vector>& input_index_columns, + const std::vector& bucket_boundaries, const TimestampRange& date_range, + const ResampleBoundary label_boundary +) { + constexpr auto data_type = DataType::NANOSECONDS_UTC64; + using IndexTDT = ScalarTagType>; + + const auto max_index_column_bytes = (bucket_boundaries.size() - 1) * get_type_size(data_type); + auto output_index_column = std::make_shared( + TypeDescriptor(data_type, Dimension::Dim0), + 
Sparsity::NOT_PERMITTED, + ChunkedBuffer::presized_in_blocks(max_index_column_bytes) + ); + auto output_index_column_data = output_index_column->data(); + auto output_index_column_it = output_index_column_data.template begin(); + size_t output_index_column_row_count{0}; + + auto bucket_end_it = std::next(bucket_boundaries.cbegin()); + Bucket current_bucket{*std::prev(bucket_end_it), *bucket_end_it}; + bool current_bucket_added_to_index{false}; + // Only include buckets that have at least one index value in range + for (const auto& input_index_column : input_index_columns) { + auto index_column_data = input_index_column->data(); + const auto cend = index_column_data.cend(); + auto it = index_column_data.cbegin(); + // In case the passed date_range does not span the whole segment we need to skip the index values + // which are before the date range start. + while (it != cend && *it < date_range.first) { + ++it; + } + for (; it != cend && *it <= date_range.second; ++it) { + if (ARCTICDB_LIKELY(current_bucket.contains(*it))) { + if (ARCTICDB_UNLIKELY(!current_bucket_added_to_index)) { + *output_index_column_it++ = + label_boundary == ResampleBoundary::LEFT ? *std::prev(bucket_end_it) : *bucket_end_it; + ++output_index_column_row_count; + current_bucket_added_to_index = true; + } + } else { + advance_boundary_past_value(bucket_boundaries, bucket_end_it, *it); + if (ARCTICDB_UNLIKELY(bucket_end_it == bucket_boundaries.end())) { + break; + } else { + current_bucket.set_boundaries(*std::prev(bucket_end_it), *bucket_end_it); + current_bucket_added_to_index = false; + if (ARCTICDB_LIKELY(current_bucket.contains(*it))) { + *output_index_column_it++ = + label_boundary == ResampleBoundary::LEFT ? 
*std::prev(bucket_end_it) : *bucket_end_it; + ++output_index_column_row_count; + current_bucket_added_to_index = true; + } + } + } + } + } + const auto actual_index_column_bytes = output_index_column_row_count * get_type_size(data_type); + output_index_column->buffer().trim(actual_index_column_bytes); + output_index_column->set_row_data(output_index_column_row_count - 1); + return output_index_column; +} + +template std::shared_ptr generate_output_index_column( + const std::vector>&, const std::vector&, const TimestampRange&, + ResampleBoundary +); +template std::shared_ptr generate_output_index_column( + const std::vector>&, const std::vector&, const TimestampRange&, + ResampleBoundary +); + template std::optional SortedAggregator::generate_resampling_output_column( [[maybe_unused]] const std::span> input_index_columns, diff --git a/cpp/arcticdb/processing/sorted_aggregation.hpp b/cpp/arcticdb/processing/sorted_aggregation.hpp index 4087a018afb..16afe18bd3a 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.hpp +++ b/cpp/arcticdb/processing/sorted_aggregation.hpp @@ -15,6 +15,7 @@ #include #include +#include #include #include @@ -89,6 +90,13 @@ class Bucket { timestamp end_; }; +template +std::shared_ptr generate_output_index_column( + const std::vector>& input_index_columns, + const std::vector& bucket_boundaries, const TimestampRange& date_range, + ResampleBoundary label_boundary +); + enum class AggregationOperator { SUM, MEAN, MIN, MAX, FIRST, LAST, COUNT }; template From 2548927213361903a5bade57499f14121a8bdcde Mon Sep 17 00:00:00 2001 From: Ivo Date: Wed, 29 Apr 2026 15:52:31 +0100 Subject: [PATCH 3/8] Construct and use `ResampleMapping` for all resampling operations Previously each of `generate_output_index_column`, `generate_resample_output_column` and `aggregate` had complicated logic to identify which row corresponds to which output column. 
This is simplified by creating a `ResampleMapping` when building the output index column to store which output row corresponds to which input values. Then `ResampleMapping` is used in the other methods. --- cpp/arcticdb/processing/clause_resample.cpp | 20 +- .../processing/sorted_aggregation.cpp | 293 ++++++++---------- .../processing/sorted_aggregation.hpp | 39 ++- .../processing/test/benchmark_resample.cpp | 4 +- .../processing/test/test_resample.cpp | 161 +++++----- 5 files changed, 233 insertions(+), 284 deletions(-) diff --git a/cpp/arcticdb/processing/clause_resample.cpp b/cpp/arcticdb/processing/clause_resample.cpp index 9a2827fe66c..577852b3687 100644 --- a/cpp/arcticdb/processing/clause_resample.cpp +++ b/cpp/arcticdb/processing/clause_resample.cpp @@ -328,13 +328,9 @@ std::vector ResampleClause::process(std::vectorat(0)->column_ptr(0)); } - const auto output_index_column = generate_output_index_column(input_index_columns, bucket_boundaries, *date_range_, label_boundary_); - // Bucket boundaries can be wider than the date range specified by the user, narrow the first and last buckets - // here if necessary - bucket_boundaries.front( - ) = std::max(bucket_boundaries.front(), date_range_->first - (closed_boundary == ResampleBoundary::RIGHT ? 1 : 0)); - bucket_boundaries.back( - ) = std::min(bucket_boundaries.back(), date_range_->second + (closed_boundary == ResampleBoundary::LEFT ? 
1 : 0)); + const auto [output_index_column, mapping] = generate_output_index_column( + input_index_columns, bucket_boundaries, *date_range_, label_boundary_ + ); SegmentInMemory seg; RowRange output_row_range( row_slices.front().row_ranges_->at(0)->start(), @@ -369,14 +365,8 @@ std::vector ResampleClause::process(std::vector aggregated = aggregator.aggregate( - input_index_columns, - input_agg_columns, - bucket_boundaries, - *output_index_column, - string_pool, - label_boundary_ - ); + std::optional aggregated = + aggregator.aggregate(input_index_columns, input_agg_columns, mapping, string_pool); if (aggregated) { seg.add_column( scalar_field(aggregated->type().data_type(), aggregator.get_output_column_name().value), diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index 9401664bebb..e0abd7a20ea 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -74,15 +74,7 @@ SortedAggregatorOutputColumnInfo SortedAggregator -bool value_past_bucket_start(const timestamp bucket_start, const timestamp value) { - if constexpr (closed_boundary == ResampleBoundary::LEFT) { - return value >= bucket_start; - } - return value > bucket_start; -} - -template -std::shared_ptr generate_output_index_column( +std::pair, ResampleMapping> generate_output_index_column( const std::vector>& input_index_columns, const std::vector& bucket_boundaries, const TimestampRange& date_range, const ResampleBoundary label_boundary @@ -100,30 +92,42 @@ std::shared_ptr generate_output_index_column( auto output_index_column_it = output_index_column_data.template begin(); size_t output_index_column_row_count{0}; + ResampleMapping mapping; + mapping.reserve(bucket_boundaries.size()); + auto bucket_end_it = std::next(bucket_boundaries.cbegin()); Bucket current_bucket{*std::prev(bucket_end_it), *bucket_end_it}; bool current_bucket_added_to_index{false}; - // Only include buckets that have at 
least one index value in range - for (const auto& input_index_column : input_index_columns) { + bool reached_end_of_buckets{false}; + ResampleInputCursor close_cursor{input_index_columns.size(), 0}; + for (auto&& [col_idx, input_index_column] : folly::enumerate(input_index_columns)) { + if (reached_end_of_buckets) { + break; + } auto index_column_data = input_index_column->data(); - const auto cend = index_column_data.cend(); - auto it = index_column_data.cbegin(); + const auto cend = index_column_data.template cend(); + auto it = index_column_data.template cbegin(); + size_t offset_in_col{0}; // In case the passed date_range does not span the whole segment we need to skip the index values // which are before the date range start. while (it != cend && *it < date_range.first) { ++it; + ++offset_in_col; } - for (; it != cend && *it <= date_range.second; ++it) { + for (; it != cend && *it <= date_range.second; ++it, ++offset_in_col) { if (ARCTICDB_LIKELY(current_bucket.contains(*it))) { if (ARCTICDB_UNLIKELY(!current_bucket_added_to_index)) { *output_index_column_it++ = label_boundary == ResampleBoundary::LEFT ? *std::prev(bucket_end_it) : *bucket_end_it; ++output_index_column_row_count; + mapping.push_back({col_idx, offset_in_col}); current_bucket_added_to_index = true; } } else { advance_boundary_past_value(bucket_boundaries, bucket_end_it, *it); if (ARCTICDB_UNLIKELY(bucket_end_it == bucket_boundaries.end())) { + close_cursor = {col_idx, offset_in_col}; + reached_end_of_buckets = true; break; } else { current_bucket.set_boundaries(*std::prev(bucket_end_it), *bucket_end_it); @@ -132,90 +136,68 @@ std::shared_ptr generate_output_index_column( *output_index_column_it++ = label_boundary == ResampleBoundary::LEFT ? 
*std::prev(bucket_end_it) : *bucket_end_it; ++output_index_column_row_count; + mapping.push_back({col_idx, offset_in_col}); current_bucket_added_to_index = true; } } } } + if (!reached_end_of_buckets && it != cend) { + // Stopped because *it > date_range.second; subsequent values do not feed any bucket. + close_cursor = {col_idx, offset_in_col}; + reached_end_of_buckets = true; + } } + mapping.push_back(close_cursor); const auto actual_index_column_bytes = output_index_column_row_count * get_type_size(data_type); output_index_column->buffer().trim(actual_index_column_bytes); output_index_column->set_row_data(output_index_column_row_count - 1); - return output_index_column; + return {std::move(output_index_column), std::move(mapping)}; } -template std::shared_ptr generate_output_index_column( +template std::pair, ResampleMapping> generate_output_index_column( const std::vector>&, const std::vector&, const TimestampRange&, ResampleBoundary ); -template std::shared_ptr generate_output_index_column( +template std::pair, ResampleMapping> generate_output_index_column( const std::vector>&, const std::vector&, const TimestampRange&, ResampleBoundary ); template std::optional SortedAggregator::generate_resampling_output_column( - [[maybe_unused]] const std::span> input_index_columns, - const std::span> input_agg_columns, const Column& output_index_column, - [[maybe_unused]] const ResampleBoundary label + const std::span> input_agg_columns, const ResampleMapping& mapping ) const { - using IndexTDT = ScalarTagType>; const SortedAggregatorOutputColumnInfo type_info = generate_common_input_type(input_agg_columns); - if (!type_info.data_type_) { return std::nullopt; } - + const int64_t output_row_count = static_cast(mapping.size()) - 1; if (!type_info.maybe_sparse_) { return Column( make_scalar_type(generate_output_data_type(*type_info.data_type_)), - output_index_column.row_count(), + output_row_count, AllocationType::PRESIZED, Sparsity::NOT_PERMITTED ); } - auto output_data = 
output_index_column.data(); - const auto output_accessor = random_accessor(&output_data); - util::BitSet sparse_map(output_index_column.row_count()); - int64_t output_row = 0; - int64_t output_row_prev = 0; - - for (auto&& [col_index, input_index_column] : folly::enumerate(input_index_columns)) { - // Skip all labels that come before the first index value in the input column - const timestamp first_index_value = *(input_index_column->template begin()); - while (output_row < output_index_column.row_count() && - value_past_bucket_start(output_accessor.at(output_row), first_index_value)) { - ++output_row; - } - // If label is left this means the "bucket" is represented by the start of the interval, thus the loop above - // skipped to the beginning of the next bucket. - output_row = std::max(int64_t{0}, output_row - (label == ResampleBoundary::LEFT)); - output_row_prev = output_row; - - // Compute how many output index values does the column span - const timestamp last_index_value = - *(input_index_column->template begin() + (input_index_column->row_count() - 1)); - while (output_row < output_index_column.row_count() && - value_past_bucket_start(output_accessor.at(output_row), last_index_value)) { - ++output_row; - } - output_row = std::max(int64_t{0}, output_row - (label == ResampleBoundary::LEFT)); - - if (input_agg_columns[col_index]) { - // This can happen when a column has values in the last bucket and there are columns after that which are - // also in the last bucket. There's no need to iterate over the rest columns as we only need to know - // if there's something in the bucket. - if (output_row >= sparse_map.size()) { - sparse_map.set_range(output_row_prev, sparse_map.size() - 1); + util::BitSet sparse_map(output_row_count); + for (int64_t out_row = 0; out_row < output_row_count; ++out_row) { + const auto& start = mapping[out_row]; + const auto& end = mapping[out_row + 1]; + const size_t last_contributing_exclusive = end.input_column_idx + (end.offset > 0 ? 
1 : 0); + for (size_t col_idx = start.input_column_idx; + col_idx < last_contributing_exclusive && col_idx < input_agg_columns.size(); + ++col_idx) { + if (input_agg_columns[col_idx].has_value()) { + sparse_map.set(out_row); break; } - sparse_map.set_range(output_row_prev, output_row); } - output_row_prev = output_row; } const Sparsity sparsity = sparse_map.count() == sparse_map.size() ? Sparsity::NOT_PERMITTED : Sparsity::PERMITTED; - const int64_t row_count = sparsity == Sparsity::PERMITTED ? sparse_map.count() : output_index_column.row_count(); + const int64_t row_count = sparsity == Sparsity::PERMITTED ? sparse_map.count() : output_row_count; Column result( make_scalar_type(generate_output_data_type(*type_info.data_type_)), row_count, @@ -225,54 +207,76 @@ std::optional SortedAggregator::g if (sparsity == Sparsity::PERMITTED) { result.set_sparse_map(std::move(sparse_map)); } - result.set_row_data(output_index_column.row_count() - 1); + result.set_row_data(output_row_count - 1); return result; } template std::optional SortedAggregator::aggregate( const std::vector>& input_index_columns, - const std::vector>& input_agg_columns, - const std::vector& bucket_boundaries, const Column& output_index_column, StringPool& string_pool, - const ResampleBoundary label + const std::vector>& input_agg_columns, const ResampleMapping& mapping, + StringPool& string_pool ) const { - using IndexTDT = ScalarTagType>; - std::optional res = - generate_resampling_output_column(input_index_columns, input_agg_columns, output_index_column, label); + std::optional res = generate_resampling_output_column(input_agg_columns, mapping); if (!res) { return std::nullopt; } details::visit_type(res->type().data_type(), [&](auto output_type_desc_tag) { using output_type_info = ScalarTypeInfo; - auto output_data = res->data(); - auto output_it = output_data.begin(); - auto output_end_it = output_data.end(); // Need this here to only generate valid get_bucket_aggregator code, exception will have been 
thrown earlier at // runtime if constexpr (is_aggregation_allowed(aggregation_operator)) { auto bucket_aggregator = get_bucket_aggregator(); - bool reached_end_of_buckets{false}; - auto bucket_start_it = bucket_boundaries.cbegin(); - auto bucket_end_it = std::next(bucket_start_it); - Bucket current_bucket(*bucket_start_it, *bucket_end_it); - bool bucket_has_values{false}; - const auto bucket_boundaries_end = bucket_boundaries.cend(); - for (auto [idx, input_agg_column] : folly::enumerate(input_agg_columns)) { - // If input_agg_column is std::nullopt this means that the aggregated column is missing from the - // segment. This means that there is no way we can push in the aggregator. The only thing that must - // be done is skipping buckets and (if needed) finalize the aggregator but that is covered by the - // else if (index_value_past_end_of_bucket(*index_it, current_bucket.end())) && output_it != - // output_end_it) below. This works because the sparse structure of the output column is precomputed by - // generate_resampling_output_column and the column data is pre-allocated. 
- if (input_agg_column.has_value()) { + auto output_data = res->data(); + const auto run = [&]() { + auto output_it = output_data.template begin< + typename output_type_info::TDT, + IteratorType::ENUMERATED, + output_density>(); + const auto output_end = + output_data + .template end( + ); + if (output_it == output_end) { + return; + } + size_t start_col_idx = mapping[output_it->idx()].input_column_idx; + size_t start_col_offset = mapping[output_it->idx()].offset; + size_t end_col_idx = mapping[output_it->idx() + 1].input_column_idx; + size_t end_col_offset = mapping[output_it->idx() + 1].offset; + const auto advance_output = [&]() { + output_it->value() = + finalize_aggregator(bucket_aggregator, string_pool); + ++output_it; + if (output_it != output_end) { + start_col_idx = mapping[output_it->idx()].input_column_idx; + start_col_offset = mapping[output_it->idx()].offset; + end_col_idx = mapping[output_it->idx() + 1].input_column_idx; + end_col_offset = mapping[output_it->idx() + 1].offset; + } + }; + for (size_t col_idx = 0; col_idx < input_agg_columns.size() && output_it != output_end; ++col_idx) { + // Finalise any buckets whose exclusive end falls at or before the start of this column. 
+ while (output_it != output_end && + (col_idx > end_col_idx || (col_idx == end_col_idx && end_col_offset == 0))) { + advance_output(); + } + if (output_it == output_end) { + break; + } + if (col_idx < start_col_idx) { + continue; + } + const auto& opt_input_agg_column = input_agg_columns[col_idx]; + if (!opt_input_agg_column.has_value()) { + continue; + } + const auto& agg_column = *opt_input_agg_column; + const auto& input_index_column = input_index_columns[col_idx]; details::visit_type( - input_agg_column->column_->type().data_type(), - [&, - &agg_column = *input_agg_column, - &input_index_column = input_index_columns.at(idx)](auto input_type_desc_tag) { + agg_column.column_->type().data_type(), + [&, &agg_column = agg_column](auto input_type_desc_tag) { using input_type_info = ScalarTypeInfo; - // Again, only needed to generate valid code below, exception will have been thrown - // earlier at runtime if constexpr (is_aggregation_allowed( aggregation_operator )) { @@ -283,76 +287,47 @@ std::optional SortedAggregator::a "resampling.", get_input_column_name().value ); - auto index_data = input_index_column->data(); - const auto index_cend = index_data.template cend(); auto agg_data = agg_column.column_->data(); - auto agg_it = agg_data.template cbegin(); - for (auto index_it = index_data.template cbegin(); - index_it != index_cend && !reached_end_of_buckets; - ++index_it, ++agg_it) { - if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { - push_to_aggregator( - bucket_aggregator, *agg_it, agg_column - ); - bucket_has_values = true; - } else if (ARCTICDB_LIKELY(index_value_past_end_of_bucket( - *index_it, current_bucket.end() - )) && - output_it != output_end_it) { - if (bucket_has_values) { - *output_it = finalize_aggregator( - bucket_aggregator, string_pool - ); - ++output_it; - } - // The following code is equivalent to: - // if constexpr (closed_boundary == ResampleBoundary::LEFT) { - // bucket_end_it = std::upper_bound(bucket_end_it, - // 
bucket_boundaries_end, *index_it); - // } else { - // bucket_end_it = std::upper_bound(bucket_end_it, - // bucket_boundaries_end, *index_it, std::less_equal{}); - // } - // bucket_start_it = std::prev(bucket_end_it); - // reached_end_of_buckets = bucket_end_it == bucket_boundaries_end; - // The above code will be more performant when the vast majority of buckets - // are empty See comment in ResampleClause::advance_boundary_past_value for - // mathematical and experimental bounds - ++bucket_start_it; - if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { - reached_end_of_buckets = true; - } else { - while (ARCTICDB_UNLIKELY( - index_value_past_end_of_bucket(*index_it, *bucket_end_it) - )) { - ++bucket_start_it; - if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { - reached_end_of_buckets = true; - break; - } - } - } - if (ARCTICDB_LIKELY(!reached_end_of_buckets)) { - bucket_has_values = false; - current_bucket.set_boundaries(*bucket_start_it, *bucket_end_it); - if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { - push_to_aggregator( - bucket_aggregator, *agg_it, agg_column - ); - bucket_has_values = true; - } - } + auto col_it = agg_data.template cbegin< + typename input_type_info::TDT, + IteratorType::ENUMERATED, + IteratorDensity::DENSE>(); + const auto col_end = agg_data.template cend< + typename input_type_info::TDT, + IteratorType::ENUMERATED, + IteratorDensity::DENSE>(); + for (; col_it != col_end && output_it != output_end; ++col_it) { + const auto idx = static_cast(col_it->idx()); + // After advance_output, the next bucket may not include this column. + if (col_idx < start_col_idx) { + break; + } + // Skip rows before the bucket's start (e.g., right-closed bucket excluding + // its leftmost edge, or date_range trimming). 
+ if (col_idx == start_col_idx && idx < start_col_offset) { + continue; + } + push_to_aggregator( + bucket_aggregator, col_it->value(), agg_column + ); + if (col_idx == end_col_idx && idx + 1 == end_col_offset) { + advance_output(); } } } } ); } - } - // We were in the middle of aggregating a bucket when we ran out of index values - if (output_it != output_end_it) { - *output_it = finalize_aggregator(bucket_aggregator, string_pool); - ++output_it; + // The trailing bucket (whose exclusive end is past the last input column) is finalised here. + if (output_it != output_end) { + advance_output(); + } + util::check(output_it == output_end, "Resample aggregation finished without consuming all output rows"); + }; + if (res->is_sparse()) { + run.template operator()(); + } else { + run.template operator()(); } } }); @@ -393,18 +368,6 @@ std::optional SortedAggregator::ge } } -template -bool SortedAggregator::index_value_past_end_of_bucket( - timestamp index_value, timestamp bucket_end -) const { - if constexpr (closed_boundary == ResampleBoundary::LEFT) { - return index_value >= bucket_end; - } else { - // closed_boundary == ResampleBoundary::RIGHT - return index_value > bucket_end; - } -} - template class SortedAggregator; template class SortedAggregator; template class SortedAggregator; diff --git a/cpp/arcticdb/processing/sorted_aggregation.hpp b/cpp/arcticdb/processing/sorted_aggregation.hpp index 16afe18bd3a..ca176199556 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.hpp +++ b/cpp/arcticdb/processing/sorted_aggregation.hpp @@ -23,6 +23,18 @@ namespace arcticdb { enum class ResampleBoundary { LEFT, RIGHT }; +// Identifies a row position within a sequence of sorted-row-slice index columns. `input_column_idx` indexes into the +// outer `input_index_columns` vector; `offset` is the row offset within that column. +struct ResampleInputCursor { + size_t input_column_idx; + size_t offset; +}; + +// Mapping from output row to input range. 
For output row i, the input values feeding it are +// `[mapping[i], mapping[i+1])` interpreted as a contiguous range across the input index columns. The vector has length +// N+1 where N is the number of non-empty buckets (= output rows). The trailing cursor is the one-past-the-end position. +using ResampleMapping = std::vector; + struct ISortedAggregator { template struct Interface : Base { @@ -30,19 +42,10 @@ struct ISortedAggregator { [[nodiscard]] ColumnName get_output_column_name() const { return folly::poly_call<1>(*this); }; [[nodiscard]] std::optional aggregate( const std::vector>& input_index_columns, - const std::vector>& input_agg_columns, - const std::vector& bucket_boundaries, const Column& output_index_column, - StringPool& string_pool, ResampleBoundary label + const std::vector>& input_agg_columns, const ResampleMapping& mapping, + StringPool& string_pool ) const { - return folly::poly_call<2>( - *this, - input_index_columns, - input_agg_columns, - bucket_boundaries, - output_index_column, - string_pool, - label - ); + return folly::poly_call<2>(*this, input_index_columns, input_agg_columns, mapping, string_pool); } void check_aggregator_supported_with_data_type(DataType data_type) const { folly::poly_call<3>(*this, data_type); @@ -91,7 +94,7 @@ class Bucket { }; template -std::shared_ptr generate_output_index_column( +std::pair, ResampleMapping> generate_output_index_column( const std::vector>& input_index_columns, const std::vector& bucket_boundaries, const TimestampRange& date_range, ResampleBoundary label_boundary @@ -396,25 +399,21 @@ class SortedAggregator { [[nodiscard]] std::optional aggregate( const std::vector>& input_index_columns, - const std::vector>& input_agg_columns, - const std::vector& bucket_boundaries, const Column& output_index_column, StringPool& string_pool, - ResampleBoundary label + const std::vector>& input_agg_columns, const ResampleMapping& mapping, + StringPool& string_pool ) const; void 
check_aggregator_supported_with_data_type(DataType data_type) const; [[nodiscard]] std::optional get_default_value(DataType common_input_data_type) const; [[nodiscard]] DataType generate_output_data_type(const DataType common_input_data_type) const; [[nodiscard]] std::optional generate_resampling_output_column( - const std::span> input_index_columns, - const std::span> input_agg_columns, - const Column& output_index_column, const ResampleBoundary label + std::span> input_agg_columns, const ResampleMapping& mapping ) const; private: [[nodiscard]] SortedAggregatorOutputColumnInfo generate_common_input_type(std::span< const std::optional>) const; - [[nodiscard]] bool index_value_past_end_of_bucket(timestamp index_value, timestamp bucket_end) const; template void push_to_aggregator( diff --git a/cpp/arcticdb/processing/test/benchmark_resample.cpp b/cpp/arcticdb/processing/test/benchmark_resample.cpp index 2bc9ce0bae1..8d24d766b3b 100644 --- a/cpp/arcticdb/processing/test/benchmark_resample.cpp +++ b/cpp/arcticdb/processing/test/benchmark_resample.cpp @@ -132,9 +132,7 @@ Layout build_layout(size_t rows_per_segment, size_t num_segments, size_t num_buc boundaries.push_back(boundaries.back() + bucket_size); } - return Layout{ - std::move(segments), std::move(boundaries), 0, total_span - row_step - }; + return Layout{std::move(segments), std::move(boundaries), 0, total_span - row_step}; } std::vector register_segments( diff --git a/cpp/arcticdb/processing/test/test_resample.cpp b/cpp/arcticdb/processing/test/test_resample.cpp index 8606a06ca8c..a89c31bb631 100644 --- a/cpp/arcticdb/processing/test/test_resample.cpp +++ b/cpp/arcticdb/processing/test/test_resample.cpp @@ -455,6 +455,19 @@ void assert_column_is_sparse(const Column& c) { ASSERT_TRUE(c.opt_sparse_map().has_value()); } +template +std::optional compute_output_column( + const AggregatorT& aggregator, std::span> input_index_columns, + std::span> input_agg_columns, + const std::vector& bucket_boundaries, 
ResampleBoundary label_boundary +) { + const std::vector> idx_cols(input_index_columns.begin(), input_index_columns.end()); + const TimestampRange full_range{std::numeric_limits::min(), std::numeric_limits::max()}; + auto [_, mapping] = + generate_output_index_column(idx_cols, bucket_boundaries, full_range, label_boundary); + return aggregator.generate_resampling_output_column(input_agg_columns, mapping); +} + // The aggregation operator does not matter for this case. Just pick one that's applicable to all column types. using AggregatorTypes = ::testing::Types< AggregatorAndLabel< @@ -474,47 +487,48 @@ TYPED_TEST(SortedAggregatorSparseStructure, NoMissingInputColumnsProducesDenseCo ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10, 20, 30, 40, 50}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10, 20, 30, 40, 50}; + // Values are monotonically increasing across the two input columns (sorted-input invariant) and span all 5 buckets. 
const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{1, 2, 3})), - std::make_shared(create_dense_column(std::array{11, 21, 31, 41})) + std::make_shared(create_dense_column(std::array{1, 2, 11, 12, 21})), + std::make_shared(create_dense_column(std::array{22, 31, 32, 41, 42})) }; const std::array input_agg_columns{ std::make_optional(ColumnWithStrings{ - create_dense_column>>(std::array{0, 5, 6}), + create_dense_column>>(std::array{0, 1, 2, 3, 4}), nullptr, "col1" }), std::make_optional(ColumnWithStrings{ - create_dense_column>>(std::array{10, 35, 56, 1, 2}), + create_dense_column>>(std::array{10, 11, 12, 13, 14}), nullptr, "col1" }) }; { - // Test single input column - const std::optional output = aggregator.generate_resampling_output_column( - std::span{input_index_columns.begin(), 1}, - std::span{input_agg_columns.begin(), 1}, - output_index_column, - label + // Test multiple input columns — values span all 5 buckets + const std::optional output = compute_output_column( + aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label ); EXPECT_TRUE(output.has_value()); assert_column_is_dense(*output); - ASSERT_EQ(output->row_count(), output_index.size()); + ASSERT_EQ(output->row_count(), 5); } { - // Test multiple input columns - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label + // Single input column — only the first column, which covers buckets 0, 1, and 2. 
+ const std::optional output = compute_output_column( + aggregator, + std::span{input_index_columns.begin(), 1}, + std::span{input_agg_columns.begin(), 1}, + bucket_boundaries, + label ); EXPECT_TRUE(output.has_value()); assert_column_is_dense(*output); - ASSERT_EQ(output->row_count(), output_index.size()); + ASSERT_EQ(output->row_count(), 3); } } @@ -524,12 +538,11 @@ TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnExistSecondIsMissing) { ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10, 20}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - const Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10, 20}; const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{0, 2, 3})), - std::make_shared(create_dense_column(std::array{11, 21, 22, 24})) + std::make_shared(create_dense_column(std::array{1, 2, 3})), + std::make_shared(create_dense_column(std::array{11, 12, 13, 14})) }; const std::array input_agg_columns{ std::make_optional(ColumnWithStrings{ @@ -539,14 +552,13 @@ TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnExistSecondIsMissing) { }), std::optional{} }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); ASSERT_TRUE(output.has_value()); assert_column_is_sparse(*output); const util::BitSet& sparse_map = output->sparse_map(); ASSERT_EQ(output->row_count(), 1); - ASSERT_EQ(output->last_row(), output_index.size() - 1); + ASSERT_EQ(output->last_row(), 1); ASSERT_EQ(sparse_map.size(), 2); 
ASSERT_EQ(sparse_map.count(), 1); ASSERT_EQ(sparse_map[0], true); @@ -560,13 +572,7 @@ TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnExistWithValueOnRightBoun }; constexpr ResampleBoundary label = TypeParam::label; constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; - const Column output_index_column = []() { - if constexpr (label == ResampleBoundary::LEFT) { - return create_dense_column(std::array{0, 10, 30}); - } else { - return create_dense_column(std::array{10, 20, 40}); - } - }(); + const std::vector bucket_boundaries{0, 10, 20, 30, 40}; const std::array input_index_columns{ std::make_shared(create_dense_column(std::array{0, 2, 10})), std::make_shared(create_dense_column(std::array{35, 36})) @@ -579,23 +585,26 @@ TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnExistWithValueOnRightBoun }), std::optional{} }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); ASSERT_TRUE(output.has_value()); assert_column_is_sparse(*output); const util::BitSet& sparse_map = output->sparse_map(); if constexpr (closed == ResampleBoundary::LEFT) { + // Buckets [0,10), [10,20), [30,40) are non-empty; [20,30) is dropped. + ASSERT_EQ(sparse_map.size(), 3); ASSERT_EQ(sparse_map.count(), 2); ASSERT_EQ(sparse_map[0], true); ASSERT_EQ(sparse_map[1], true); ASSERT_EQ(sparse_map[2], false); } else if constexpr (closed == ResampleBoundary::RIGHT) { + // Buckets (0,10] (containing 2 and 10) and (30,40] are non-empty; value 0 is not in any + // right-closed bucket and (10,20], (20,30] are empty. 
+ ASSERT_EQ(sparse_map.size(), 2); ASSERT_EQ(sparse_map.count(), 1); ASSERT_EQ(sparse_map[0], true); ASSERT_EQ(sparse_map[1], false); - ASSERT_EQ(sparse_map[2], false); } } @@ -607,12 +616,11 @@ TYPED_TEST(SortedAggregatorSparseStructure, ReturnDenseInCaseOutputIndexIsFilled ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10, 20}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - const Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10, 20}; const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{0, 2, 12})), - std::make_shared(create_dense_column(std::array{15, 16, 18, 20})) + std::make_shared(create_dense_column(std::array{1, 2, 12})), + std::make_shared(create_dense_column(std::array{15, 16, 18})) }; const std::array input_agg_columns{ std::make_optional(ColumnWithStrings{ @@ -622,12 +630,11 @@ TYPED_TEST(SortedAggregatorSparseStructure, ReturnDenseInCaseOutputIndexIsFilled }), std::optional{} }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); EXPECT_TRUE(output.has_value()); assert_column_is_dense(*output); - ASSERT_EQ(output->row_count(), output_index_column.row_count()); + ASSERT_EQ(output->row_count(), 2); } TYPED_TEST(SortedAggregatorSparseStructure, ReturnDenseInCaseOutputIndexIsFilledFirstColumnMissing) { @@ -638,11 +645,10 @@ TYPED_TEST(SortedAggregatorSparseStructure, ReturnDenseInCaseOutputIndexIsFilled ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary 
label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10, 20}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - const Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10, 20}; const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{0, 2, 5})), + std::make_shared(create_dense_column(std::array{1, 2, 5})), std::make_shared(create_dense_column(std::array{7, 8, 9, 15})) }; const std::array input_agg_columns{ @@ -653,12 +659,11 @@ TYPED_TEST(SortedAggregatorSparseStructure, ReturnDenseInCaseOutputIndexIsFilled "col1" }), }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); EXPECT_TRUE(output.has_value()); assert_column_is_dense(*output); - ASSERT_EQ(output->row_count(), output_index_column.row_count()); + ASSERT_EQ(output->row_count(), 2); } TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnIsMissing) { @@ -667,11 +672,10 @@ TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnIsMissing) { ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10, 20}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - const Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10, 20}; const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{0, 2, 3})), + std::make_shared(create_dense_column(std::array{1, 2, 3})), 
std::make_shared(create_dense_column(std::array{11, 15, 16, 17})) }; const std::array input_agg_columns{ @@ -683,14 +687,13 @@ TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnIsMissing) { }), }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); ASSERT_TRUE(output.has_value()); assert_column_is_sparse(*output); const util::BitSet& sparse_map = output->sparse_map(); ASSERT_EQ(output->row_count(), 1); - ASSERT_EQ(output->last_row(), output_index.size() - 1); + ASSERT_EQ(output->last_row(), 1); ASSERT_EQ(sparse_map.size(), 2); ASSERT_EQ(sparse_map.count(), 1); ASSERT_EQ(sparse_map[0], false); @@ -703,14 +706,13 @@ TYPED_TEST(SortedAggregatorSparseStructure, ThreeSegmentsInABucketMiddleIsMissin ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - const Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10}; const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{0, 1})), - std::make_shared(create_dense_column(std::array{2})), - std::make_shared(create_dense_column(std::array{3})) + std::make_shared(create_dense_column(std::array{1, 2})), + std::make_shared(create_dense_column(std::array{3})), + std::make_shared(create_dense_column(std::array{4})) }; const std::array input_agg_columns{ std::make_optional(ColumnWithStrings{ @@ -721,9 +723,8 @@ TYPED_TEST(SortedAggregatorSparseStructure, ThreeSegmentsInABucketMiddleIsMissin create_dense_column>>(std::array{3, 4}), 
nullptr, "col1" }), }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); ASSERT_TRUE(output.has_value()); assert_column_is_dense(*output); ASSERT_EQ(output->row_count(), 1); @@ -735,14 +736,13 @@ TYPED_TEST(SortedAggregatorSparseStructure, ThreeSegmentsInABuckeOnlyMiddleIsPre ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - const Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10}; const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{0, 1})), - std::make_shared(create_dense_column(std::array{2})), - std::make_shared(create_dense_column(std::array{3})) + std::make_shared(create_dense_column(std::array{1, 2})), + std::make_shared(create_dense_column(std::array{3})), + std::make_shared(create_dense_column(std::array{4})) }; const std::array input_agg_columns{ std::optional{}, @@ -751,9 +751,8 @@ TYPED_TEST(SortedAggregatorSparseStructure, ThreeSegmentsInABuckeOnlyMiddleIsPre }), std::optional{} }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); ASSERT_TRUE(output.has_value()); assert_column_is_dense(*output); ASSERT_EQ(output->row_count(), 1); From 1030d414c0fb9bd9945bbf7ee6f24e5fc96c9d19 Mon Sep 17 00:00:00 2001 From: Ivo 
Date: Thu, 30 Apr 2026 08:14:04 +0100 Subject: [PATCH 4/8] Use exponential search in `generate_output_index_column` A lot of resampling runtime was spent during generation of the output index column. This can be sped up significantly in the common case where the number of buckets is much smaller than input rows by using exponential binary search. --- .../column_store/column_algorithms.hpp | 2 + .../processing/sorted_aggregation.cpp | 76 +++++++++++-------- .../processing/test/test_resample.cpp | 2 +- 3 files changed, 46 insertions(+), 34 deletions(-) diff --git a/cpp/arcticdb/column_store/column_algorithms.hpp b/cpp/arcticdb/column_store/column_algorithms.hpp index 4c2d59a6ef5..81c1c74e76a 100644 --- a/cpp/arcticdb/column_store/column_algorithms.hpp +++ b/cpp/arcticdb/column_store/column_algorithms.hpp @@ -8,7 +8,9 @@ #pragma once +#include #include +#include #include #include diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index e0abd7a20ea..c31debf48a7 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -6,6 +6,7 @@ * will be governed by the Apache License, version 2.0. */ +#include #include #include #include @@ -95,6 +96,16 @@ std::pair, ResampleMapping> generate_output_index_column ResampleMapping mapping; mapping.reserve(bucket_boundaries.size()); + // Largest value contained in the bucket that ends at `bucket_end_value`. + // LEFT-closed [_, end) → end - 1; RIGHT-closed (_, end] → end. 
+ constexpr auto inclusive_bucket_end = [](timestamp bucket_end_value) { + if constexpr (closed_boundary == ResampleBoundary::LEFT) { + return bucket_end_value - 1; + } else { + return bucket_end_value; + } + }; + auto bucket_end_it = std::next(bucket_boundaries.cbegin()); Bucket current_bucket{*std::prev(bucket_end_it), *bucket_end_it}; bool current_bucket_added_to_index{false}; @@ -105,46 +116,45 @@ std::pair, ResampleMapping> generate_output_index_column break; } auto index_column_data = input_index_column->data(); - const auto cend = index_column_data.template cend(); - auto it = index_column_data.template cbegin(); - size_t offset_in_col{0}; - // In case the passed date_range does not span the whole segment we need to skip the index values - // which are before the date range start. - while (it != cend && *it < date_range.first) { + const auto cend = index_column_data.template cend(); + auto it = index_column_data.template cbegin(); + // Skip rows before the date range start and rows before the current bucket + it = exponential_upper_bound( + it, cend, std::max(date_range.first - 1, inclusive_bucket_end(*std::prev(bucket_end_it))) + ); + + // Advances `it` to beginning of next bucket or first element after `date_range.second` + auto advance_it = [&]() { + // At the end of `for` it is guaranteed that it points before bucket_end_it, so we need to advance by at + // least one. ++it; - ++offset_in_col; - } - for (; it != cend && *it <= date_range.second; ++it, ++offset_in_col) { - if (ARCTICDB_LIKELY(current_bucket.contains(*it))) { - if (ARCTICDB_UNLIKELY(!current_bucket_added_to_index)) { - *output_index_column_it++ = - label_boundary == ResampleBoundary::LEFT ? 
*std::prev(bucket_end_it) : *bucket_end_it; - ++output_index_column_row_count; - mapping.push_back({col_idx, offset_in_col}); - current_bucket_added_to_index = true; - } - } else { - advance_boundary_past_value(bucket_boundaries, bucket_end_it, *it); - if (ARCTICDB_UNLIKELY(bucket_end_it == bucket_boundaries.end())) { - close_cursor = {col_idx, offset_in_col}; + it = exponential_upper_bound(it, cend, std::min(date_range.second, inclusive_bucket_end(*bucket_end_it))); + }; + for (; it != cend && it->value() <= date_range.second; advance_it()) { + if (!current_bucket.contains(it->value())) { + advance_boundary_past_value(bucket_boundaries, bucket_end_it, it->value()); + + if (bucket_end_it == bucket_boundaries.end()) { + close_cursor = {col_idx, static_cast(it->idx())}; reached_end_of_buckets = true; break; - } else { - current_bucket.set_boundaries(*std::prev(bucket_end_it), *bucket_end_it); - current_bucket_added_to_index = false; - if (ARCTICDB_LIKELY(current_bucket.contains(*it))) { - *output_index_column_it++ = - label_boundary == ResampleBoundary::LEFT ? *std::prev(bucket_end_it) : *bucket_end_it; - ++output_index_column_row_count; - mapping.push_back({col_idx, offset_in_col}); - current_bucket_added_to_index = true; - } } + + current_bucket.set_boundaries(*std::prev(bucket_end_it), *bucket_end_it); + current_bucket_added_to_index = false; + } + + if (!current_bucket_added_to_index) { + *output_index_column_it++ = + label_boundary == ResampleBoundary::LEFT ? *std::prev(bucket_end_it) : *bucket_end_it; + ++output_index_column_row_count; + mapping.push_back({col_idx, static_cast(it->idx())}); + current_bucket_added_to_index = true; } } if (!reached_end_of_buckets && it != cend) { - // Stopped because *it > date_range.second; subsequent values do not feed any bucket. 
- close_cursor = {col_idx, offset_in_col}; + // Stopped because *it > date_range.second + close_cursor = {col_idx, static_cast(it->idx())}; reached_end_of_buckets = true; } } diff --git a/cpp/arcticdb/processing/test/test_resample.cpp b/cpp/arcticdb/processing/test/test_resample.cpp index a89c31bb631..e78f0a0803a 100644 --- a/cpp/arcticdb/processing/test/test_resample.cpp +++ b/cpp/arcticdb/processing/test/test_resample.cpp @@ -462,7 +462,7 @@ std::optional compute_output_column( const std::vector& bucket_boundaries, ResampleBoundary label_boundary ) { const std::vector> idx_cols(input_index_columns.begin(), input_index_columns.end()); - const TimestampRange full_range{std::numeric_limits::min(), std::numeric_limits::max()}; + const TimestampRange full_range{std::numeric_limits::min() + 1, std::numeric_limits::max()}; auto [_, mapping] = generate_output_index_column(idx_cols, bucket_boundaries, full_range, label_boundary); return aggregator.generate_resampling_output_column(input_agg_columns, mapping); From cc25a4ba866f1f2901182d4fca9b1d667e920c8d Mon Sep 17 00:00:00 2001 From: Ivo Date: Thu, 30 Apr 2026 08:45:46 +0100 Subject: [PATCH 5/8] Preallocate min(num_buckets, num_input_rows) Helps speed up and decrease memory usage for the very rare case where num_buckets >> num_input_rows. 
--- cpp/arcticdb/processing/sorted_aggregation.cpp | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index c31debf48a7..8b7d2c4dd89 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -83,7 +83,13 @@ std::pair, ResampleMapping> generate_output_index_column constexpr auto data_type = DataType::NANOSECONDS_UTC64; using IndexTDT = ScalarTagType>; - const auto max_index_column_bytes = (bucket_boundaries.size() - 1) * get_type_size(data_type); + // The output row count is bounded by both the number of buckets and the number of input rows + size_t total_input_rows{0}; + for (const auto& col : input_index_columns) { + total_input_rows += col->row_count(); + } + const auto max_output_rows = std::min(bucket_boundaries.size() - 1, total_input_rows); + const auto max_index_column_bytes = max_output_rows * get_type_size(data_type); auto output_index_column = std::make_shared( TypeDescriptor(data_type, Dimension::Dim0), Sparsity::NOT_PERMITTED, @@ -94,7 +100,7 @@ std::pair, ResampleMapping> generate_output_index_column size_t output_index_column_row_count{0}; ResampleMapping mapping; - mapping.reserve(bucket_boundaries.size()); + mapping.reserve(max_output_rows + 1); // Largest value contained in the bucket that ends at `bucket_end_value`. // LEFT-closed [_, end) → end - 1; RIGHT-closed (_, end] → end. From 831b1aa70cdb1df69024e2e4acb2e7384b7eee4f Mon Sep 17 00:00:00 2001 From: Ivo Date: Thu, 14 May 2026 12:21:41 +0100 Subject: [PATCH 6/8] Use heuristic for choosing between exponential_search and linear_scan With benchmarking of various rows_per_bucket it was confirmed that exponential_search becomes faster than linear scan at around 32 elements. For <32 rows per bucket the linear pass is faster. For >32 the exponential search is faster. 
--- .../processing/sorted_aggregation.cpp | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index 8b7d2c4dd89..b225fcc35d6 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -89,6 +89,10 @@ std::pair, ResampleMapping> generate_output_index_column total_input_rows += col->row_count(); } const auto max_output_rows = std::min(bucket_boundaries.size() - 1, total_input_rows); + // Below this rows/bucket threshold, element-by-element iteration benchmarked to beat galloping search. + constexpr size_t linear_scan_threshold = 32; + const size_t num_buckets = bucket_boundaries.size() > 1 ? bucket_boundaries.size() - 1 : 1; + const bool use_linear_scan = (total_input_rows / num_buckets) < linear_scan_threshold; const auto max_index_column_bytes = max_output_rows * get_type_size(data_type); auto output_index_column = std::make_shared( TypeDescriptor(data_type, Dimension::Dim0), @@ -129,15 +133,19 @@ std::pair, ResampleMapping> generate_output_index_column it, cend, std::max(date_range.first - 1, inclusive_bucket_end(*std::prev(bucket_end_it))) ); - // Advances `it` to beginning of next bucket or first element after `date_range.second` auto advance_it = [&]() { - // At the end of `for` it is guaranteed that it points before bucket_end_it, so we need to advance by at - // least one. 
- ++it; - it = exponential_upper_bound(it, cend, std::min(date_range.second, inclusive_bucket_end(*bucket_end_it))); + const auto advance_until = std::min(date_range.second, inclusive_bucket_end(*bucket_end_it)); + if (use_linear_scan) { + while (ARCTICDB_LIKELY(it != cend && it->value() <= advance_until)) { + ++it; + } + } else { + ++it; + it = exponential_upper_bound(it, cend, advance_until); + } }; for (; it != cend && it->value() <= date_range.second; advance_it()) { - if (!current_bucket.contains(it->value())) { + if (ARCTICDB_LIKELY(!current_bucket.contains(it->value()))) { advance_boundary_past_value(bucket_boundaries, bucket_end_it, it->value()); if (bucket_end_it == bucket_boundaries.end()) { @@ -150,7 +158,7 @@ std::pair, ResampleMapping> generate_output_index_column current_bucket_added_to_index = false; } - if (!current_bucket_added_to_index) { + if (ARCTICDB_LIKELY(!current_bucket_added_to_index)) { *output_index_column_it++ = label_boundary == ResampleBoundary::LEFT ? *std::prev(bucket_end_it) : *bucket_end_it; ++output_index_column_row_count; From a45ac0a564917e1e77d751c91ea265dc731b937f Mon Sep 17 00:00:00 2001 From: Ivo Date: Thu, 30 Apr 2026 10:13:33 +0100 Subject: [PATCH 7/8] Enable resampling on sparse agg columns Construct output agg column based on rs_index of input sparse columns. Then use sparse iterators to populate the values. 
--- cpp/arcticdb/processing/clause_resample.cpp | 3 +- .../processing/sorted_aggregation.cpp | 96 ++++++++++++------- .../processing/sorted_aggregation.hpp | 4 +- .../version_store/test_arrow_sparse.py | 83 ++++++++++++---- .../arcticdb/version_store/test_resample.py | 20 ---- 5 files changed, 129 insertions(+), 77 deletions(-) diff --git a/cpp/arcticdb/processing/clause_resample.cpp b/cpp/arcticdb/processing/clause_resample.cpp index 577852b3687..6705d8a257d 100644 --- a/cpp/arcticdb/processing/clause_resample.cpp +++ b/cpp/arcticdb/processing/clause_resample.cpp @@ -365,8 +365,7 @@ std::vector ResampleClause::process(std::vector aggregated = - aggregator.aggregate(input_index_columns, input_agg_columns, mapping, string_pool); + std::optional aggregated = aggregator.aggregate(input_agg_columns, mapping, string_pool); if (aggregated) { seg.add_column( scalar_field(aggregated->type().data_type(), aggregator.get_output_column_name().value), diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index b225fcc35d6..50e8660970b 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -67,6 +67,9 @@ SortedAggregatorOutputColumnInfo SortedAggregatorcolumn_->type().data_type(); check_aggregator_supported_with_data_type(input_data_type); add_data_type_impl(input_data_type, output_column_info.data_type_); + if (opt_input_agg_column->column_->is_sparse()) { + output_column_info.maybe_sparse_ = true; + } } else { output_column_info.maybe_sparse_ = true; } @@ -206,6 +209,8 @@ std::optional SortedAggregator::g ); } + util::BitIndex rs_index; + ssize_t rs_index_built_for_idx = -1; util::BitSet sparse_map(output_row_count); for (int64_t out_row = 0; out_row < output_row_count; ++out_row) { const auto& start = mapping[out_row]; @@ -214,7 +219,30 @@ std::optional SortedAggregator::g for (size_t col_idx = start.input_column_idx; col_idx < last_contributing_exclusive && 
col_idx < input_agg_columns.size(); ++col_idx) { - if (input_agg_columns[col_idx].has_value()) { + const auto& opt_col = input_agg_columns[col_idx]; + if (!opt_col.has_value()) { + continue; + } + const auto& col = *opt_col->column_; + if (!col.is_sparse()) { + sparse_map.set(out_row); + break; + } + if (static_cast(col_idx) != rs_index_built_for_idx) { + // Build the rs_index just once per column + col.sparse_map().build_rs_index(&rs_index); + rs_index_built_for_idx = static_cast(col_idx); + } + const size_t range_start = (col_idx == start.input_column_idx) ? start.offset : 0; + const size_t range_end_exclusive = + (col_idx == end.input_column_idx) ? end.offset : static_cast(col.last_row()) + 1; + if (range_start >= range_end_exclusive) { + continue; + } + const auto cnt = col.sparse_map().count_range_no_check( + bv_size(range_start), bv_size(range_end_exclusive - 1), rs_index + ); + if (cnt > 0) { sparse_map.set(out_row); break; } @@ -237,7 +265,6 @@ std::optional SortedAggregator::g template std::optional SortedAggregator::aggregate( - const std::vector>& input_index_columns, const std::vector>& input_agg_columns, const ResampleMapping& mapping, StringPool& string_pool ) const { @@ -296,7 +323,6 @@ std::optional SortedAggregator::a continue; } const auto& agg_column = *opt_input_agg_column; - const auto& input_index_column = input_index_columns[col_idx]; details::visit_type( agg_column.column_->type().data_type(), [&, &agg_column = agg_column](auto input_type_desc_tag) { @@ -304,39 +330,41 @@ std::optional SortedAggregator::a if constexpr (is_aggregation_allowed( aggregation_operator )) { - schema::check( - !agg_column.column_->is_sparse() && - agg_column.column_->row_count() == input_index_column->row_count(), - "Not implemented yet: Cannot aggregate sparse column '{}' during " - "resampling.", - get_input_column_name().value - ); auto agg_data = agg_column.column_->data(); - auto col_it = agg_data.template cbegin< - typename input_type_info::TDT, - 
IteratorType::ENUMERATED, - IteratorDensity::DENSE>(); - const auto col_end = agg_data.template cend< - typename input_type_info::TDT, - IteratorType::ENUMERATED, - IteratorDensity::DENSE>(); - for (; col_it != col_end && output_it != output_end; ++col_it) { - const auto idx = static_cast(col_it->idx()); - // After advance_output, the next bucket may not include this column. - if (col_idx < start_col_idx) { - break; - } - // Skip rows before the bucket's start (e.g., right-closed bucket excluding - // its leftmost edge, or date_range trimming). - if (col_idx == start_col_idx && idx < start_col_offset) { - continue; - } - push_to_aggregator( - bucket_aggregator, col_it->value(), agg_column - ); - if (col_idx == end_col_idx && idx + 1 == end_col_offset) { - advance_output(); + const auto run_iter = [&]() { + auto col_it = agg_data.template cbegin< + typename input_type_info::TDT, + IteratorType::ENUMERATED, + input_density>(); + const auto col_end = agg_data.template cend< + typename input_type_info::TDT, + IteratorType::ENUMERATED, + input_density>(); + for (; col_it != col_end && output_it != output_end; ++col_it) { + const auto idx = static_cast(col_it->idx()); + // Finalise any buckets whose exclusive end falls at or before this row. + while (output_it != output_end && + (col_idx > end_col_idx || + (col_idx == end_col_idx && idx >= end_col_offset))) { + advance_output(); + } + if (output_it == output_end || col_idx < start_col_idx) { + break; + } + // Skip rows before the bucket's start (e.g., right-closed bucket excluding + // its leftmost edge, or date_range trimming). 
+ if (col_idx == start_col_idx && idx < start_col_offset) { + continue; + } + push_to_aggregator( + bucket_aggregator, col_it->value(), agg_column + ); } + }; + if (agg_column.column_->is_sparse()) { + run_iter.template operator()(); + } else { + run_iter.template operator()(); } } } diff --git a/cpp/arcticdb/processing/sorted_aggregation.hpp b/cpp/arcticdb/processing/sorted_aggregation.hpp index ca176199556..377b25f7c71 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.hpp +++ b/cpp/arcticdb/processing/sorted_aggregation.hpp @@ -41,11 +41,10 @@ struct ISortedAggregator { [[nodiscard]] ColumnName get_input_column_name() const { return folly::poly_call<0>(*this); }; [[nodiscard]] ColumnName get_output_column_name() const { return folly::poly_call<1>(*this); }; [[nodiscard]] std::optional aggregate( - const std::vector>& input_index_columns, const std::vector>& input_agg_columns, const ResampleMapping& mapping, StringPool& string_pool ) const { - return folly::poly_call<2>(*this, input_index_columns, input_agg_columns, mapping, string_pool); + return folly::poly_call<2>(*this, input_agg_columns, mapping, string_pool); } void check_aggregator_supported_with_data_type(DataType data_type) const { folly::poly_call<3>(*this, data_type); @@ -398,7 +397,6 @@ class SortedAggregator { [[nodiscard]] ColumnName get_output_column_name() const { return output_column_name_; } [[nodiscard]] std::optional aggregate( - const std::vector>& input_index_columns, const std::vector>& input_agg_columns, const ResampleMapping& mapping, StringPool& string_pool ) const; diff --git a/python/tests/unit/arcticdb/version_store/test_arrow_sparse.py b/python/tests/unit/arcticdb/version_store/test_arrow_sparse.py index 650402a4adc..e5d8a4c5e0e 100644 --- a/python/tests/unit/arcticdb/version_store/test_arrow_sparse.py +++ b/python/tests/unit/arcticdb/version_store/test_arrow_sparse.py @@ -766,39 +766,70 @@ def test_named_aggs(self, group_col): _check_query_result(self.lib, self.sym, q, 
expected, count_columns=count_columns, check_row_order=False) -@pytest.mark.xfail( - reason="Resample rejects sparse columns. (monday ref: 11679866800)", - raises=Exception, -) class TestSparseArrowResample: sym = "test_sparse_resample" - # TODO: Also add testing for: - # - Aggregating string, datetime columns - # - Other resampling kwargs like offset, origin + # TODO: `origin` = one of (start_day, end, end_day, explicit timestamp) doesn't have polars equivalent. + # Add sparse coverage for those origin values + + @staticmethod + def _polars_agg_expr(col, op, alias=None): + # Applies `op` on the resampled column `col` after `drop_nulls()`. + # `drop_nulls` is required to match pandas resampling behavior for `first` and `last`. + expr = getattr(pl.col(col).drop_nulls(), op)() + return expr.alias(alias) if alias is not None else expr @pytest.fixture(autouse=True) def setup(self, in_memory_store_factory): self.lib = in_memory_store_factory(segment_row_size=4) self.lib.set_output_format(OutputFormat.POLARS) self.lib._set_allow_arrow_input() - # Bucket 2 (hours 6-8) is fully null for both columns when resampled at 3h + # First timestamp is 00:15 (mid-bucket) so `origin="start"` produces different bucket + # boundaries than the default `origin="epoch"`. 
+ # Bucket 2 (hours 6-8) is fully null for every column when resampled at 3h + ts = pd.date_range("2025-01-01", periods=12, freq="h").to_list() + ts[0] = pd.Timestamp("2025-01-01 00:15") self.pldf = pl.DataFrame( { - "ts": pd.date_range("2025-01-01", periods=12, freq="h"), + "ts": ts, "int_col": [1, None, 3, None, 5, None, None, None, None, None, 11, None], "float_col": [None, 2.0, None, 4.0, None, 6.0, None, None, None, 10.0, None, 12.0], + "datetime_col": [ + None, + pd.Timestamp("2025-02-02"), + None, + pd.Timestamp("2025-02-04"), + None, + pd.Timestamp("2025-02-06"), + None, + None, + None, + pd.Timestamp("2025-02-10"), + None, + pd.Timestamp("2025-02-12"), + ], + "str_col": [None, "b", None, "d", None, "f", None, None, None, "j", None, "l"], }, - schema_overrides={"ts": pl.Datetime("ns")}, + schema_overrides={"ts": pl.Datetime("ns"), "datetime_col": pl.Datetime("ns")}, ) self.lib.write(self.sym, self.pldf, index_column=True) @pytest.mark.parametrize("agg_op", ["sum", "mean", "min", "max", "first", "last", "count"]) - @pytest.mark.parametrize("agg_col", ["int_col", "float_col"]) + @pytest.mark.parametrize("agg_col", ["int_col", "float_col", "datetime_col"]) @pytest.mark.parametrize("rule", ["3h", "6h"]) def test_single_agg(self, agg_op, agg_col, rule): + if agg_col == "datetime_col" and agg_op == "sum": + pytest.skip("sum is not a valid aggregation on a datetime column") q = QueryBuilder().resample(rule).agg({agg_col: agg_op}) - expected = self.pldf.group_by_dynamic("ts", every=rule).agg(getattr(pl.col(agg_col), agg_op)()) + expected = self.pldf.group_by_dynamic("ts", every=rule).agg(self._polars_agg_expr(agg_col, agg_op)) + count_columns = [agg_col] if agg_op == "count" else None + _check_query_result(self.lib, self.sym, q, expected, count_columns=count_columns) + + @pytest.mark.parametrize("agg_op", ["first", "last"]) + @pytest.mark.parametrize("rule", ["3h", "6h"]) + def test_string_agg(self, agg_op, rule): + q = QueryBuilder().resample(rule).agg({"str_col": 
agg_op}) + expected = self.pldf.group_by_dynamic("ts", every=rule).agg(self._polars_agg_expr("str_col", agg_op)) _check_query_result(self.lib, self.sym, q, expected) @pytest.mark.parametrize("rule", ["3h", "6h"]) @@ -808,11 +839,15 @@ def test_named_aggs(self, rule): "float_max": ("float_col", "max"), "int_count": ("int_col", "count"), "float_first": ("float_col", "first"), + "datetime_mean": ("datetime_col", "mean"), + "datetime_first": ("datetime_col", "first"), + "str_last": ("str_col", "last"), } q = QueryBuilder().resample(rule).agg(agg_dict) - exprs = [getattr(pl.col(col), op)().alias(name) for name, (col, op) in agg_dict.items()] + exprs = [self._polars_agg_expr(col, op, name) for name, (col, op) in agg_dict.items()] expected = self.pldf.group_by_dynamic("ts", every=rule).agg(exprs) - _check_query_result(self.lib, self.sym, q, expected) + count_columns = [name for name, (_, op) in agg_dict.items() if op == "count"] or None + _check_query_result(self.lib, self.sym, q, expected, count_columns=count_columns) @pytest.mark.parametrize("closed", ["left", "right"]) @pytest.mark.parametrize("label", ["left", "right"]) @@ -821,6 +856,22 @@ def test_closed_label(self, closed, label): expected = self.pldf.group_by_dynamic("ts", every="3h", closed=closed, label=label).agg(pl.col("int_col").sum()) _check_query_result(self.lib, self.sym, q, expected) + # pandas/arcticdb use "min" as the minute unit; polars uses "m". Same offset, different spelling. 
+ @pytest.mark.parametrize("pd_offset,pl_offset", [("30min", "30m"), ("1h", "1h")]) + @pytest.mark.parametrize("rule", ["3h", "6h"]) + def test_offset(self, pd_offset, pl_offset, rule): + q = QueryBuilder().resample(rule, offset=pd_offset).agg({"int_col": "sum"}) + expected = self.pldf.group_by_dynamic("ts", every=rule, offset=pl_offset).agg(pl.col("int_col").sum()) + _check_query_result(self.lib, self.sym, q, expected) + + @pytest.mark.parametrize("rule", ["3h", "6h"]) + def test_origin_start(self, rule): + # Uses the shared pldf — its first timestamp is 00:15 so bucket boundaries with + # origin="start" (00:15, 03:15, ...) differ from the default origin="epoch" (00:00, 03:00, ...). + q = QueryBuilder().resample(rule, origin="start").agg({"int_col": "sum"}) + expected = self.pldf.group_by_dynamic("ts", every=rule, start_by="datapoint").agg(pl.col("int_col").sum()) + _check_query_result(self.lib, self.sym, q, expected) + def test_with_date_range(self): start, end = pd.Timestamp("2025-01-01 03:00"), pd.Timestamp("2025-01-01 08:00") q = QueryBuilder().date_range((start, end)).resample("3h").agg({"int_col": "sum"}) @@ -899,10 +950,6 @@ def test_concat_with_index(self): expected = pl.concat([t1, t2]) polars_assert_frame_equal(received, expected) - @pytest.mark.xfail( - reason="Resample rejects sparse columns: sorted_aggregation.cpp 'Cannot aggregate column as it is sparse'", - raises=Exception, - ) def test_concat_with_resample(self): t1 = pl.DataFrame( {"ts": pd.date_range("2025-01-01", periods=6, freq="h"), "val": [1, None, 3, None, 5, None]}, diff --git a/python/tests/unit/arcticdb/version_store/test_resample.py b/python/tests/unit/arcticdb/version_store/test_resample.py index 42a74ffd8bb..2bc3c6e6f72 100644 --- a/python/tests/unit/arcticdb/version_store/test_resample.py +++ b/python/tests/unit/arcticdb/version_store/test_resample.py @@ -566,26 +566,6 @@ def test_resampling_unsupported_aggregation_type_combos(lmdb_version_store_v1, a lib.read(sym, query_builder=q) 
-def test_resampling_sparse_data(lmdb_version_store_v1, any_output_format): - lib = lmdb_version_store_v1 - lib._set_output_format_for_pipeline_tests(any_output_format) - sym = "test_resampling_sparse_data" - - # col_1 will be dense, but with fewer rows than the index column, and so semantically sparse - data = {"col_0": [np.nan, 1.0], "col_1": [2.0, np.nan]} - lib.write(sym, pd.DataFrame(data, index=[pd.Timestamp(0), pd.Timestamp(1000)]), sparsify_floats=True) - - q = QueryBuilder() - q = q.resample("us").agg({"col_0": "sum"}) - with pytest.raises(SchemaException): - lib.read(sym, query_builder=q) - - q = QueryBuilder() - q = q.resample("s").agg({"col_1": "sum"}) - with pytest.raises(SchemaException): - lib.read(sym, query_builder=q) - - def test_resampling_empty_type_column(lmdb_version_store_empty_types_v1, any_output_format): lib = lmdb_version_store_empty_types_v1 lib._set_output_format_for_pipeline_tests(any_output_format) From 89d9fd838c06ffd77b2b050caa295257374ca912 Mon Sep 17 00:00:00 2001 From: Ivo Date: Thu, 21 May 2026 15:18:41 +0100 Subject: [PATCH 8/8] Address Alex's comments --- cpp/arcticdb/processing/sorted_aggregation.cpp | 17 +++++++++++------ cpp/arcticdb/processing/test/test_resample.cpp | 1 + 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index 50e8660970b..afed9a656ad 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -87,15 +87,20 @@ std::pair, ResampleMapping> generate_output_index_column using IndexTDT = ScalarTagType>; // The output row count is bounded by both the number of buckets and the number of input rows - size_t total_input_rows{0}; - for (const auto& col : input_index_columns) { - total_input_rows += col->row_count(); - } + const size_t total_input_rows = std::accumulate( + input_index_columns.begin(), + input_index_columns.end(), + size_t{0}, + [](size_t 
acc, const auto& col) { return acc + col->row_count(); } + ); const auto max_output_rows = std::min(bucket_boundaries.size() - 1, total_input_rows); // Below this rows/bucket threshold, element-by-element iteration benchmarked to beat galloping search. + // The threshold assumes rows are distributed uniformly across buckets; the less uniform the distribution, the more + // galloping search wins, so this is a conservative threshold. constexpr size_t linear_scan_threshold = 32; - const size_t num_buckets = bucket_boundaries.size() > 1 ? bucket_boundaries.size() - 1 : 1; - const bool use_linear_scan = (total_input_rows / num_buckets) < linear_scan_threshold; + const size_t num_buckets = bucket_boundaries.size() - 1; + // Equivalent to `total_input_rows / num_buckets < linear_scan_threshold` and avoids potential division by zero. + const bool use_linear_scan = total_input_rows < linear_scan_threshold * num_buckets; const auto max_index_column_bytes = max_output_rows * get_type_size(data_type); auto output_index_column = std::make_shared( TypeDescriptor(data_type, Dimension::Dim0), diff --git a/cpp/arcticdb/processing/test/test_resample.cpp b/cpp/arcticdb/processing/test/test_resample.cpp index e78f0a0803a..fec832fbc1c 100644 --- a/cpp/arcticdb/processing/test/test_resample.cpp +++ b/cpp/arcticdb/processing/test/test_resample.cpp @@ -462,6 +462,7 @@ std::optional compute_output_column( const std::vector& bucket_boundaries, ResampleBoundary label_boundary ) { const std::vector> idx_cols(input_index_columns.begin(), input_index_columns.end()); + // Using `min()+1` to avoid underflow when calculating exclusive min timestamp in resample code. const TimestampRange full_range{std::numeric_limits::min() + 1, std::numeric_limits::max()}; auto [_, mapping] = generate_output_index_column(idx_cols, bucket_boundaries, full_range, label_boundary);