diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index c6d1e14f31f..167a5fe2b10 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -1243,6 +1243,7 @@ if(${TEST}) processing/test/benchmark_binary.cpp processing/test/benchmark_clause.cpp processing/test/benchmark_common.cpp + processing/test/benchmark_resample.cpp processing/test/benchmark_ternary.cpp util/test/benchmark_bitset.cpp version/test/benchmark_write.cpp diff --git a/cpp/arcticdb/column_store/column_algorithms.hpp b/cpp/arcticdb/column_store/column_algorithms.hpp index 4c2d59a6ef5..81c1c74e76a 100644 --- a/cpp/arcticdb/column_store/column_algorithms.hpp +++ b/cpp/arcticdb/column_store/column_algorithms.hpp @@ -8,7 +8,9 @@ #pragma once +#include #include +#include #include #include diff --git a/cpp/arcticdb/processing/clause.hpp b/cpp/arcticdb/processing/clause.hpp index 734dd24a409..288720e06e7 100644 --- a/cpp/arcticdb/processing/clause.hpp +++ b/cpp/arcticdb/processing/clause.hpp @@ -438,11 +438,6 @@ struct ResampleClause { std::vector generate_bucket_boundaries( timestamp first_ts, timestamp last_ts, bool responsible_for_first_overlapping_bucket ) const; - - std::shared_ptr generate_output_index_column( - const std::vector>& input_index_columns, - const std::vector& bucket_boundaries - ) const; }; template diff --git a/cpp/arcticdb/processing/clause_resample.cpp b/cpp/arcticdb/processing/clause_resample.cpp index dbab71bb5c3..6705d8a257d 100644 --- a/cpp/arcticdb/processing/clause_resample.cpp +++ b/cpp/arcticdb/processing/clause_resample.cpp @@ -328,13 +328,9 @@ std::vector ResampleClause::process(std::vectorat(0)->column_ptr(0)); } - const auto output_index_column = generate_output_index_column(input_index_columns, bucket_boundaries); - // Bucket boundaries can be wider than the date range specified by the user, narrow the first and last buckets - // here if necessary - bucket_boundaries.front( - ) = std::max(bucket_boundaries.front(), date_range_->first - (closed_boundary == ResampleBoundary::RIGHT ? 1 : 0)); - bucket_boundaries.back( - ) = std::min(bucket_boundaries.back(), date_range_->second + (closed_boundary == ResampleBoundary::LEFT ? 1 : 0)); + const auto [output_index_column, mapping] = generate_output_index_column( + input_index_columns, bucket_boundaries, *date_range_, label_boundary_ + ); SegmentInMemory seg; RowRange output_row_range( row_slices.front().row_ranges_->at(0)->start(), @@ -369,14 +365,7 @@ std::vector ResampleClause::process(std::vector aggregated = aggregator.aggregate( - input_index_columns, - input_agg_columns, - bucket_boundaries, - *output_index_column, - string_pool, - label_boundary_ - ); + std::optional aggregated = aggregator.aggregate(input_agg_columns, mapping, string_pool); if (aggregated) { seg.add_column( scalar_field(aggregated->type().data_type(), aggregator.get_output_column_name().value), @@ -435,67 +424,6 @@ std::vector ResampleClause::generate_bucket_boundari return bucket_boundaries; } -template -std::shared_ptr ResampleClause::generate_output_index_column( - const std::vector>& input_index_columns, const std::vector& bucket_boundaries -) const { - constexpr auto data_type = DataType::NANOSECONDS_UTC64; - using IndexTDT = ScalarTagType>; - - const auto max_index_column_bytes = (bucket_boundaries.size() - 1) * get_type_size(data_type); - auto output_index_column = std::make_shared( - TypeDescriptor(data_type, Dimension::Dim0), - Sparsity::NOT_PERMITTED, - ChunkedBuffer::presized_in_blocks(max_index_column_bytes) - ); - auto output_index_column_data = output_index_column->data(); - auto output_index_column_it = output_index_column_data.template begin(); - size_t output_index_column_row_count{0}; - - auto bucket_end_it = std::next(bucket_boundaries.cbegin()); - Bucket current_bucket{*std::prev(bucket_end_it), *bucket_end_it}; - bool current_bucket_added_to_index{false}; - // Only include buckets that have at least one index value in range - for (const auto& input_index_column : input_index_columns) { - auto index_column_data = input_index_column->data(); - const auto cend = index_column_data.cend(); - auto it = index_column_data.cbegin(); - // In case the passed date_range does not span the whole segment we need to skip the index values - // which are before the date range start. - while (it != cend && *it < date_range_->first) { - ++it; - } - for (; it != cend && *it <= date_range_->second; ++it) { - if (ARCTICDB_LIKELY(current_bucket.contains(*it))) { - if (ARCTICDB_UNLIKELY(!current_bucket_added_to_index)) { - *output_index_column_it++ = - label_boundary_ == ResampleBoundary::LEFT ? *std::prev(bucket_end_it) : *bucket_end_it; - ++output_index_column_row_count; - current_bucket_added_to_index = true; - } - } else { - advance_boundary_past_value(bucket_boundaries, bucket_end_it, *it); - if (ARCTICDB_UNLIKELY(bucket_end_it == bucket_boundaries.end())) { - break; - } else { - current_bucket.set_boundaries(*std::prev(bucket_end_it), *bucket_end_it); - current_bucket_added_to_index = false; - if (ARCTICDB_LIKELY(current_bucket.contains(*it))) { - *output_index_column_it++ = - label_boundary_ == ResampleBoundary::LEFT ? *std::prev(bucket_end_it) : *bucket_end_it; - ++output_index_column_row_count; - current_bucket_added_to_index = true; - } - } - } - } - } - const auto actual_index_column_bytes = output_index_column_row_count * get_type_size(data_type); - output_index_column->buffer().trim(actual_index_column_bytes); - output_index_column->set_row_data(output_index_column_row_count - 1); - return output_index_column; -} - template struct ResampleClause; template struct ResampleClause; diff --git a/cpp/arcticdb/processing/sorted_aggregation.cpp b/cpp/arcticdb/processing/sorted_aggregation.cpp index 0c2d1e42898..afed9a656ad 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.cpp +++ b/cpp/arcticdb/processing/sorted_aggregation.cpp @@ -6,8 +6,10 @@ * will be governed by the Apache License, version 2.0. */ +#include #include #include +#include #include #include #include @@ -65,6 +67,9 @@ SortedAggregatorOutputColumnInfo SortedAggregatorcolumn_->type().data_type(); check_aggregator_supported_with_data_type(input_data_type); add_data_type_impl(input_data_type, output_column_info.data_type_); + if (opt_input_agg_column->column_->is_sparse()) { + output_column_info.maybe_sparse_ = true; + } } else { output_column_info.maybe_sparse_ = true; } @@ -73,76 +78,183 @@ SortedAggregatorOutputColumnInfo SortedAggregator -bool value_past_bucket_start(const timestamp bucket_start, const timestamp value) { - if constexpr (closed_boundary == ResampleBoundary::LEFT) { - return value >= bucket_start; +std::pair, ResampleMapping> generate_output_index_column( + const std::vector>& input_index_columns, + const std::vector& bucket_boundaries, const TimestampRange& date_range, + const ResampleBoundary label_boundary +) { + constexpr auto data_type = DataType::NANOSECONDS_UTC64; + using IndexTDT = ScalarTagType>; + + // The output row count is bounded by both the number of buckets and the number of input rows + const size_t total_input_rows = std::accumulate( + input_index_columns.begin(), + input_index_columns.end(), + size_t{0}, + [](size_t acc, const auto& col) { return acc + col->row_count(); } + ); + const auto max_output_rows = std::min(bucket_boundaries.size() - 1, total_input_rows); + // Below this rows/bucket threshold, element-by-element iteration benchmarked to beat galloping search. + // The threshold assumes rows are distributed uniformly across buckets; the less uniform the distribution, the more + // galloping search wins, so this is a conservative threshold. + constexpr size_t linear_scan_threshold = 32; + const size_t num_buckets = bucket_boundaries.size() - 1; + // Equivalent to `total_input_rows / num_buckets < linear_scan_threshold` and avoids potential division by zero. + const bool use_linear_scan = total_input_rows < linear_scan_threshold * num_buckets; + const auto max_index_column_bytes = max_output_rows * get_type_size(data_type); + auto output_index_column = std::make_shared( + TypeDescriptor(data_type, Dimension::Dim0), + Sparsity::NOT_PERMITTED, + ChunkedBuffer::presized_in_blocks(max_index_column_bytes) + ); + auto output_index_column_data = output_index_column->data(); + auto output_index_column_it = output_index_column_data.template begin(); + size_t output_index_column_row_count{0}; + + ResampleMapping mapping; + mapping.reserve(max_output_rows + 1); + + // Largest value contained in the bucket that ends at `bucket_end_value`. + // LEFT-closed [_, end) → end - 1; RIGHT-closed (_, end] → end. + constexpr auto inclusive_bucket_end = [](timestamp bucket_end_value) { + if constexpr (closed_boundary == ResampleBoundary::LEFT) { + return bucket_end_value - 1; + } else { + return bucket_end_value; + } + }; + + auto bucket_end_it = std::next(bucket_boundaries.cbegin()); + Bucket current_bucket{*std::prev(bucket_end_it), *bucket_end_it}; + bool current_bucket_added_to_index{false}; + bool reached_end_of_buckets{false}; + ResampleInputCursor close_cursor{input_index_columns.size(), 0}; + for (auto&& [col_idx, input_index_column] : folly::enumerate(input_index_columns)) { + if (reached_end_of_buckets) { + break; + } + auto index_column_data = input_index_column->data(); + const auto cend = index_column_data.template cend(); + auto it = index_column_data.template cbegin(); + // Skip rows before the date range start and rows before the current bucket + it = exponential_upper_bound( + it, cend, std::max(date_range.first - 1, inclusive_bucket_end(*std::prev(bucket_end_it))) + ); + + auto advance_it = [&]() { + const auto advance_until = std::min(date_range.second, inclusive_bucket_end(*bucket_end_it)); + if (use_linear_scan) { + while (ARCTICDB_LIKELY(it != cend && it->value() <= advance_until)) { + ++it; + } + } else { + ++it; + it = exponential_upper_bound(it, cend, advance_until); + } + }; + for (; it != cend && it->value() <= date_range.second; advance_it()) { + if (ARCTICDB_LIKELY(!current_bucket.contains(it->value()))) { + advance_boundary_past_value(bucket_boundaries, bucket_end_it, it->value()); + + if (bucket_end_it == bucket_boundaries.end()) { + close_cursor = {col_idx, static_cast(it->idx())}; + reached_end_of_buckets = true; + break; + } + + current_bucket.set_boundaries(*std::prev(bucket_end_it), *bucket_end_it); + current_bucket_added_to_index = false; + } + + if (ARCTICDB_LIKELY(!current_bucket_added_to_index)) { + *output_index_column_it++ = + label_boundary == ResampleBoundary::LEFT ? *std::prev(bucket_end_it) : *bucket_end_it; + ++output_index_column_row_count; + mapping.push_back({col_idx, static_cast(it->idx())}); + current_bucket_added_to_index = true; + } + } + if (!reached_end_of_buckets && it != cend) { + // Stopped because *it > date_range.second + close_cursor = {col_idx, static_cast(it->idx())}; + reached_end_of_buckets = true; + } } - return value > bucket_start; + mapping.push_back(close_cursor); + const auto actual_index_column_bytes = output_index_column_row_count * get_type_size(data_type); + output_index_column->buffer().trim(actual_index_column_bytes); + output_index_column->set_row_data(output_index_column_row_count - 1); + return {std::move(output_index_column), std::move(mapping)}; } +template std::pair, ResampleMapping> generate_output_index_column( + const std::vector>&, const std::vector&, const TimestampRange&, + ResampleBoundary +); +template std::pair, ResampleMapping> generate_output_index_column( + const std::vector>&, const std::vector&, const TimestampRange&, + ResampleBoundary +); + template std::optional SortedAggregator::generate_resampling_output_column( - [[maybe_unused]] const std::span> input_index_columns, - const std::span> input_agg_columns, const Column& output_index_column, - [[maybe_unused]] const ResampleBoundary label + const std::span> input_agg_columns, const ResampleMapping& mapping ) const { - using IndexTDT = ScalarTagType>; const SortedAggregatorOutputColumnInfo type_info = generate_common_input_type(input_agg_columns); - if (!type_info.data_type_) { return std::nullopt; } - + const int64_t output_row_count = static_cast(mapping.size()) - 1; if (!type_info.maybe_sparse_) { return Column( make_scalar_type(generate_output_data_type(*type_info.data_type_)), - output_index_column.row_count(), + output_row_count, AllocationType::PRESIZED, Sparsity::NOT_PERMITTED ); } - auto output_data = output_index_column.data(); - const auto output_accessor = random_accessor(&output_data); - util::BitSet sparse_map(output_index_column.row_count()); - int64_t output_row = 0; - int64_t output_row_prev = 0; - - for (auto&& [col_index, input_index_column] : folly::enumerate(input_index_columns)) { - // Skip all labels that come before the first index value in the input column - const timestamp first_index_value = *(input_index_column->template begin()); - while (output_row < output_index_column.row_count() && - value_past_bucket_start(output_accessor.at(output_row), first_index_value)) { - ++output_row; - } - // If label is left this means the "bucket" is represented by the start of the interval, thus the loop above - // skipped to the beginning of the next bucket. - output_row = std::max(int64_t{0}, output_row - (label == ResampleBoundary::LEFT)); - output_row_prev = output_row; - - // Compute how many output index values does the column span - const timestamp last_index_value = - *(input_index_column->template begin() + (input_index_column->row_count() - 1)); - while (output_row < output_index_column.row_count() && - value_past_bucket_start(output_accessor.at(output_row), last_index_value)) { - ++output_row; - } - output_row = std::max(int64_t{0}, output_row - (label == ResampleBoundary::LEFT)); - - if (input_agg_columns[col_index]) { - // This can happen when a column has values in the last bucket and there are columns after that which are - // also in the last bucket. There's no need to iterate over the rest columns as we only need to know - // if there's something in the bucket. - if (output_row >= sparse_map.size()) { - sparse_map.set_range(output_row_prev, sparse_map.size() - 1); + util::BitIndex rs_index; + ssize_t rs_index_built_for_idx = -1; + util::BitSet sparse_map(output_row_count); + for (int64_t out_row = 0; out_row < output_row_count; ++out_row) { + const auto& start = mapping[out_row]; + const auto& end = mapping[out_row + 1]; + const size_t last_contributing_exclusive = end.input_column_idx + (end.offset > 0 ? 1 : 0); + for (size_t col_idx = start.input_column_idx; + col_idx < last_contributing_exclusive && col_idx < input_agg_columns.size(); + ++col_idx) { + const auto& opt_col = input_agg_columns[col_idx]; + if (!opt_col.has_value()) { + continue; + } + const auto& col = *opt_col->column_; + if (!col.is_sparse()) { + sparse_map.set(out_row); + break; + } + if (static_cast(col_idx) != rs_index_built_for_idx) { + // Build the rs_index just once per column + col.sparse_map().build_rs_index(&rs_index); + rs_index_built_for_idx = static_cast(col_idx); + } + const size_t range_start = (col_idx == start.input_column_idx) ? start.offset : 0; + const size_t range_end_exclusive = + (col_idx == end.input_column_idx) ? end.offset : static_cast(col.last_row()) + 1; + if (range_start >= range_end_exclusive) { + continue; + } + const auto cnt = col.sparse_map().count_range_no_check( + bv_size(range_start), bv_size(range_end_exclusive - 1), rs_index + ); + if (cnt > 0) { + sparse_map.set(out_row); break; } - sparse_map.set_range(output_row_prev, output_row); } - output_row_prev = output_row; } const Sparsity sparsity = sparse_map.count() == sparse_map.size() ? Sparsity::NOT_PERMITTED : Sparsity::PERMITTED; - const int64_t row_count = sparsity == Sparsity::PERMITTED ? sparse_map.count() : output_index_column.row_count(); + const int64_t row_count = sparsity == Sparsity::PERMITTED ? sparse_map.count() : output_row_count; Column result( make_scalar_type(generate_output_data_type(*type_info.data_type_)), row_count, @@ -152,134 +264,127 @@ std::optional SortedAggregator::g if (sparsity == Sparsity::PERMITTED) { result.set_sparse_map(std::move(sparse_map)); } - result.set_row_data(output_index_column.row_count() - 1); + result.set_row_data(output_row_count - 1); return result; } template std::optional SortedAggregator::aggregate( - const std::vector>& input_index_columns, - const std::vector>& input_agg_columns, - const std::vector& bucket_boundaries, const Column& output_index_column, StringPool& string_pool, - const ResampleBoundary label + const std::vector>& input_agg_columns, const ResampleMapping& mapping, + StringPool& string_pool ) const { - using IndexTDT = ScalarTagType>; - std::optional res = - generate_resampling_output_column(input_index_columns, input_agg_columns, output_index_column, label); + std::optional res = generate_resampling_output_column(input_agg_columns, mapping); if (!res) { return std::nullopt; } details::visit_type(res->type().data_type(), [&](auto output_type_desc_tag) { using output_type_info = ScalarTypeInfo; - auto output_data = res->data(); - auto output_it = output_data.begin(); - auto output_end_it = output_data.end(); // Need this here to only generate valid get_bucket_aggregator code, exception will have been thrown earlier at // runtime if constexpr (is_aggregation_allowed(aggregation_operator)) { auto bucket_aggregator = get_bucket_aggregator(); - bool reached_end_of_buckets{false}; - auto bucket_start_it = bucket_boundaries.cbegin(); - auto bucket_end_it = std::next(bucket_start_it); - Bucket current_bucket(*bucket_start_it, *bucket_end_it); - bool bucket_has_values{false}; - const auto bucket_boundaries_end = bucket_boundaries.cend(); - for (auto [idx, input_agg_column] : folly::enumerate(input_agg_columns)) { - // If input_agg_column is std::nullopt this means that the aggregated column is missing from the - // segment. This means that there is no way we can push in the aggregator. The only thing that must - // be done is skipping buckets and (if needed) finalize the aggregator but that is covered by the - // else if (index_value_past_end_of_bucket(*index_it, current_bucket.end())) && output_it != - // output_end_it) below. This works because the sparse structure of the output column is precomputed by - // generate_resampling_output_column and the column data is pre-allocated. - if (input_agg_column.has_value()) { + auto output_data = res->data(); + const auto run = [&]() { + auto output_it = output_data.template begin< + typename output_type_info::TDT, + IteratorType::ENUMERATED, + output_density>(); + const auto output_end = + output_data + .template end( + ); + if (output_it == output_end) { + return; + } + size_t start_col_idx = mapping[output_it->idx()].input_column_idx; + size_t start_col_offset = mapping[output_it->idx()].offset; + size_t end_col_idx = mapping[output_it->idx() + 1].input_column_idx; + size_t end_col_offset = mapping[output_it->idx() + 1].offset; + const auto advance_output = [&]() { + output_it->value() = + finalize_aggregator(bucket_aggregator, string_pool); + ++output_it; + if (output_it != output_end) { + start_col_idx = mapping[output_it->idx()].input_column_idx; + start_col_offset = mapping[output_it->idx()].offset; + end_col_idx = mapping[output_it->idx() + 1].input_column_idx; + end_col_offset = mapping[output_it->idx() + 1].offset; + } + }; + for (size_t col_idx = 0; col_idx < input_agg_columns.size() && output_it != output_end; ++col_idx) { + // Finalise any buckets whose exclusive end falls at or before the start of this column. + while (output_it != output_end && + (col_idx > end_col_idx || (col_idx == end_col_idx && end_col_offset == 0))) { + advance_output(); + } + if (output_it == output_end) { + break; + } + if (col_idx < start_col_idx) { + continue; + } + const auto& opt_input_agg_column = input_agg_columns[col_idx]; + if (!opt_input_agg_column.has_value()) { + continue; + } + const auto& agg_column = *opt_input_agg_column; details::visit_type( - input_agg_column->column_->type().data_type(), - [&, - &agg_column = *input_agg_column, - &input_index_column = input_index_columns.at(idx)](auto input_type_desc_tag) { + agg_column.column_->type().data_type(), + [&, &agg_column = agg_column](auto input_type_desc_tag) { using input_type_info = ScalarTypeInfo; - // Again, only needed to generate valid code below, exception will have been thrown - // earlier at runtime if constexpr (is_aggregation_allowed( aggregation_operator )) { - schema::check( - !agg_column.column_->is_sparse() && - agg_column.column_->row_count() == input_index_column->row_count(), - "Not implemented yet: Cannot aggregate sparse column '{}' during " - "resampling.", - get_input_column_name().value - ); - auto index_data = input_index_column->data(); - const auto index_cend = index_data.template cend(); auto agg_data = agg_column.column_->data(); - auto agg_it = agg_data.template cbegin(); - for (auto index_it = index_data.template cbegin(); - index_it != index_cend && !reached_end_of_buckets; - ++index_it, ++agg_it) { - if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { - push_to_aggregator( - bucket_aggregator, *agg_it, agg_column - ); - bucket_has_values = true; - } else if (ARCTICDB_LIKELY(index_value_past_end_of_bucket( - *index_it, current_bucket.end() - )) && - output_it != output_end_it) { - if (bucket_has_values) { - *output_it = finalize_aggregator( - bucket_aggregator, string_pool - ); - ++output_it; + const auto run_iter = [&]() { + auto col_it = agg_data.template cbegin< + typename input_type_info::TDT, + IteratorType::ENUMERATED, + input_density>(); + const auto col_end = agg_data.template cend< + typename input_type_info::TDT, + IteratorType::ENUMERATED, + input_density>(); + for (; col_it != col_end && output_it != output_end; ++col_it) { + const auto idx = static_cast(col_it->idx()); + // Finalise any buckets whose exclusive end falls at or before this row. + while (output_it != output_end && + (col_idx > end_col_idx || + (col_idx == end_col_idx && idx >= end_col_offset))) { + advance_output(); } - // The following code is equivalent to: - // if constexpr (closed_boundary == ResampleBoundary::LEFT) { - // bucket_end_it = std::upper_bound(bucket_end_it, - // bucket_boundaries_end, *index_it); - // } else { - // bucket_end_it = std::upper_bound(bucket_end_it, - // bucket_boundaries_end, *index_it, std::less_equal{}); - // } - // bucket_start_it = std::prev(bucket_end_it); - // reached_end_of_buckets = bucket_end_it == bucket_boundaries_end; - // The above code will be more performant when the vast majority of buckets - // are empty See comment in ResampleClause::advance_boundary_past_value for - // mathematical and experimental bounds - ++bucket_start_it; - if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { - reached_end_of_buckets = true; - } else { - while (ARCTICDB_UNLIKELY( - index_value_past_end_of_bucket(*index_it, *bucket_end_it) - )) { - ++bucket_start_it; - if (ARCTICDB_UNLIKELY(++bucket_end_it == bucket_boundaries_end)) { - reached_end_of_buckets = true; - break; - } - } + if (output_it == output_end || col_idx < start_col_idx) { + break; } - if (ARCTICDB_LIKELY(!reached_end_of_buckets)) { - bucket_has_values = false; - current_bucket.set_boundaries(*bucket_start_it, *bucket_end_it); - if (ARCTICDB_LIKELY(current_bucket.contains(*index_it))) { - push_to_aggregator( - bucket_aggregator, *agg_it, agg_column - ); - bucket_has_values = true; - } + // Skip rows before the bucket's start (e.g., right-closed bucket excluding + // its leftmost edge, or date_range trimming). + if (col_idx == start_col_idx && idx < start_col_offset) { + continue; } + push_to_aggregator( + bucket_aggregator, col_it->value(), agg_column + ); } + }; + if (agg_column.column_->is_sparse()) { + run_iter.template operator()(); + } else { + run_iter.template operator()(); } } } ); } - } - // We were in the middle of aggregating a bucket when we ran out of index values - if (output_it != output_end_it) { - *output_it = finalize_aggregator(bucket_aggregator, string_pool); - ++output_it; + // The trailing bucket (whose exclusive end is past the last input column) is finalised here. + if (output_it != output_end) { + advance_output(); + } + util::check(output_it == output_end, "Resample aggregation finished without consuming all output rows"); + }; + if (res->is_sparse()) { + run.template operator()(); + } else { + run.template operator()(); } } }); @@ -320,18 +425,6 @@ std::optional SortedAggregator::ge } } -template -bool SortedAggregator::index_value_past_end_of_bucket( - timestamp index_value, timestamp bucket_end -) const { - if constexpr (closed_boundary == ResampleBoundary::LEFT) { - return index_value >= bucket_end; - } else { - // closed_boundary == ResampleBoundary::RIGHT - return index_value > bucket_end; - } -} - template class SortedAggregator; template class SortedAggregator; template class SortedAggregator; diff --git a/cpp/arcticdb/processing/sorted_aggregation.hpp b/cpp/arcticdb/processing/sorted_aggregation.hpp index 4087a018afb..377b25f7c71 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.hpp +++ b/cpp/arcticdb/processing/sorted_aggregation.hpp @@ -15,6 +15,7 @@ #include #include +#include #include #include @@ -22,26 +23,28 @@ namespace arcticdb { enum class ResampleBoundary { LEFT, RIGHT }; +// Identifies a row position within a sequence of sorted-row-slice index columns. `input_column_idx` indexes into the +// outer `input_index_columns` vector; `offset` is the row offset within that column. +struct ResampleInputCursor { + size_t input_column_idx; + size_t offset; +}; + +// Mapping from output row to input range. For output row i, the input values feeding it are +// `[mapping[i], mapping[i+1])` interpreted as a contiguous range across the input index columns. The vector has length +// N+1 where N is the number of non-empty buckets (= output rows). The trailing cursor is the one-past-the-end position. +using ResampleMapping = std::vector; + struct ISortedAggregator { template struct Interface : Base { [[nodiscard]] ColumnName get_input_column_name() const { return folly::poly_call<0>(*this); }; [[nodiscard]] ColumnName get_output_column_name() const { return folly::poly_call<1>(*this); }; [[nodiscard]] std::optional aggregate( - const std::vector>& input_index_columns, - const std::vector>& input_agg_columns, - const std::vector& bucket_boundaries, const Column& output_index_column, - StringPool& string_pool, ResampleBoundary label + const std::vector>& input_agg_columns, const ResampleMapping& mapping, + StringPool& string_pool ) const { - return folly::poly_call<2>( - *this, - input_index_columns, - input_agg_columns, - bucket_boundaries, - output_index_column, - string_pool, - label - ); + return folly::poly_call<2>(*this, input_agg_columns, mapping, string_pool); } void check_aggregator_supported_with_data_type(DataType data_type) const { folly::poly_call<3>(*this, data_type); @@ -89,6 +92,13 @@ class Bucket { timestamp end_; }; +template +std::pair, ResampleMapping> generate_output_index_column( + const std::vector>& input_index_columns, + const std::vector& bucket_boundaries, const TimestampRange& date_range, + ResampleBoundary label_boundary +); + enum class AggregationOperator { SUM, MEAN, MIN, MAX, FIRST, LAST, COUNT }; template @@ -387,26 +397,21 @@ class SortedAggregator { [[nodiscard]] ColumnName get_output_column_name() const { return output_column_name_; } [[nodiscard]] std::optional aggregate( - const std::vector>& input_index_columns, - const std::vector>& input_agg_columns, - const std::vector& bucket_boundaries, const Column& output_index_column, StringPool& string_pool, - ResampleBoundary label + const std::vector>& input_agg_columns, const ResampleMapping& mapping, + StringPool& string_pool ) const; void check_aggregator_supported_with_data_type(DataType data_type) const; [[nodiscard]] std::optional get_default_value(DataType common_input_data_type) const; [[nodiscard]] DataType generate_output_data_type(const DataType common_input_data_type) const; [[nodiscard]] std::optional generate_resampling_output_column( - const std::span> input_index_columns, - const std::span> input_agg_columns, - const Column& output_index_column, const ResampleBoundary label + std::span> input_agg_columns, const ResampleMapping& mapping ) const; private: [[nodiscard]] SortedAggregatorOutputColumnInfo generate_common_input_type(std::span< const std::optional>) const; - [[nodiscard]] bool index_value_past_end_of_bucket(timestamp index_value, timestamp bucket_end) const; template void push_to_aggregator( diff --git a/cpp/arcticdb/processing/test/benchmark_resample.cpp b/cpp/arcticdb/processing/test/benchmark_resample.cpp new file mode 100644 index 00000000000..8d24d766b3b --- /dev/null +++ b/cpp/arcticdb/processing/test/benchmark_resample.cpp @@ -0,0 +1,192 @@ +/* Copyright 2026 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software + * will be governed by the Apache License, version 2.0. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include + +using namespace arcticdb; +using namespace arcticdb::pipelines; +using namespace arcticdb::stream; + +// run like: --benchmark_time_unit=ms --benchmark_filter=BM_resample.* --benchmark_min_time=5x +// +// Isolates ResampleClause::process from the rest of the read pipeline. Segments are constructed in memory and +// fed directly into the clause, so timings reflect the cost of bucket-boundary advancement, output index +// construction, and per-column aggregation only. +// +// Args (per benchmark): {rows_per_segment, num_segments, num_buckets, num_value_cols} +// +// num_buckets together with the total row count selects the layout: +// * num_buckets <= num_segments -> each bucket spans several row-slices +// * num_segments < num_buckets <= total_rows -> many rows per bucket, all inside a single row-slice +// * num_buckets > total_rows -> bucket size smaller than the row spacing, most empty +// +// Row spacing is derived: if num_buckets > total_rows the rows are spread out so that 1ns buckets still +// cover the full data span; otherwise rows are 1ns apart. + +namespace { + +constexpr DataType kValueDataType = DataType::INT64; +const std::string kValuePrefix = "col_"; + +ResampleClause::BucketGeneratorT make_bucket_generator(std::vector boundaries) { + return [boundaries = std::move(boundaries)]( + timestamp, timestamp, std::string_view, ResampleBoundary, timestamp, ResampleOrigin + ) { return boundaries; }; +} + +StreamDescriptor make_descriptor(size_t num_value_cols) { + std::vector fields; + fields.reserve(num_value_cols); + for (size_t c = 0; c < num_value_cols; ++c) { + fields.emplace_back(TypeDescriptor(kValueDataType, Dimension::Dim0), kValuePrefix + std::to_string(c)); + } + return TimeseriesIndex::default_index().create_stream_descriptor("Resample", std::move(fields)); +} + +std::shared_ptr make_segment( + const StreamDescriptor& desc, size_t num_rows, timestamp start_ts, timestamp step, size_t num_value_cols +) { + auto seg = std::make_shared( + desc, num_rows, AllocationType::PRESIZED, Sparsity::NOT_PERMITTED, std::nullopt + ); + std::vector index_data(num_rows); + for (size_t row = 0; row < num_rows; ++row) { + index_data[row] = start_ts + step * static_cast(row); + } + fill_dense_column_data(*seg, 0, index_data); + std::vector col_data(num_rows); + for (size_t c = 0; c < num_value_cols; ++c) { + for (size_t row = 0; row < num_rows; ++row) { + col_data[row] = static_cast((row + c) % 1024); + } + fill_dense_column_data(*seg, c + 1, col_data); + } + return seg; +} + +ResampleClause make_resample_clause( + std::vector bucket_boundaries, timestamp date_range_start, timestamp date_range_end, + const std::shared_ptr& component_manager, size_t num_value_cols +) { + ResampleClause clause{ + "dummy", ResampleBoundary::LEFT, make_bucket_generator(bucket_boundaries), 0, 0 + }; + clause.bucket_boundaries_ = std::move(bucket_boundaries); + clause.date_range_ = TimestampRange{date_range_start, date_range_end}; + clause.set_component_manager(component_manager); + clause.set_processing_config(ProcessingConfig{false, 0, IndexDescriptor::Type::TIMESTAMP}); + std::vector aggs; + aggs.reserve(num_value_cols); + for (size_t c = 0; c < num_value_cols; ++c) { + const auto col = kValuePrefix + std::to_string(c); + aggs.emplace_back("sum", col, col); + } + clause.set_aggregations(aggs); + return clause; +} + +struct Layout { + std::vector> segments; + std::vector bucket_boundaries; + timestamp date_range_start; + timestamp date_range_end; +}; + +Layout build_layout(size_t rows_per_segment, size_t num_segments, size_t num_buckets, size_t num_value_cols) { + const size_t total_rows = rows_per_segment * num_segments; + // Pick row_step so the data spans at least num_buckets ns. If num_buckets <= total_rows the rows are 1ns + // apart and buckets are bigger than 1ns. If num_buckets > total_rows the rows are spread out so 1ns + // buckets still cover everything, leaving most buckets empty. + const auto target_span = std::max(total_rows, num_buckets); + const auto row_step = static_cast(target_span / total_rows); + const auto total_span = static_cast(total_rows) * row_step; + const auto bucket_size = std::max(1, total_span / static_cast(num_buckets)); + + auto desc = make_descriptor(num_value_cols); + std::vector> segments; + segments.reserve(num_segments); + for (size_t s = 0; s < num_segments; ++s) { + const auto start_ts = static_cast(s * rows_per_segment) * row_step; + segments.push_back(make_segment(desc, rows_per_segment, start_ts, row_step, num_value_cols)); + } + + std::vector boundaries; + boundaries.reserve(num_buckets + 1); + for (timestamp b = 0; b <= total_span; b += bucket_size) { + boundaries.push_back(b); + } + if (boundaries.back() <= total_span - row_step) { + boundaries.push_back(boundaries.back() + bucket_size); + } + + return Layout{std::move(segments), std::move(boundaries), 0, total_span - row_step}; +} + +std::vector register_segments( + ComponentManager& component_manager, const std::vector>& segments, + size_t num_value_cols +) { + auto ids = component_manager.get_new_entity_ids(segments.size()); + size_t row_offset = 0; + for (size_t i = 0; i < segments.size(); ++i) { + const auto& seg = segments[i]; + const auto rows = seg->row_count(); + auto rr = std::make_shared(row_offset, row_offset + rows); + auto cr = std::make_shared(1, num_value_cols + 1); + component_manager.add_entity(ids[i], seg, rr, cr, EntityFetchCount{1}); + row_offset += rows; + } + return ids; +} + +} // namespace + +static void BM_resample(benchmark::State& state) { + const auto rows_per_segment = static_cast(state.range(0)); + const auto num_segments = static_cast(state.range(1)); + const auto num_buckets = static_cast(state.range(2)); + const auto num_value_cols = static_cast(state.range(3)); + + auto layout = build_layout(rows_per_segment, num_segments, num_buckets, num_value_cols); + + for (auto _ : state) { + state.PauseTiming(); + auto component_manager = std::make_shared(); + auto clause = make_resample_clause( + layout.bucket_boundaries, + layout.date_range_start, + layout.date_range_end, + component_manager, + num_value_cols + ); + auto ids = register_segments(*component_manager, layout.segments, num_value_cols); + state.ResumeTiming(); + auto out = clause.process(std::move(ids)); + benchmark::DoNotOptimize(out); + } +} + +// 1m rows total across all regimes; num_segments and num_buckets carry the variation. +// 1000 rows per bucket, all rows in a single row-slice. +BENCHMARK(BM_resample)->Args({100'000, 10, 1'000, 1})->Args({100'000, 10, 1'000, 100}); +// 100 rows per bucket, all rows in a single row-slice. +BENCHMARK(BM_resample)->Args({100'000, 10, 10'000, 1})->Args({100'000, 10, 10'000, 100}); +// 10 rows per bucket, all rows in a single row-slice. +BENCHMARK(BM_resample)->Args({100'000, 10, 100'000, 1})->Args({100'000, 10, 100'000, 100}); +// Each bucket spans several row-slices. +BENCHMARK(BM_resample)->Args({2'000, 500, 100, 1})->Args({2'000, 500, 100, 100}); +// Bucket size smaller than row spacing, most buckets empty. +BENCHMARK(BM_resample)->Args({100'000, 10, 10'000'000, 1})->Args({100'000, 10, 10'000'000, 100}); diff --git a/cpp/arcticdb/processing/test/test_resample.cpp b/cpp/arcticdb/processing/test/test_resample.cpp index 8606a06ca8c..fec832fbc1c 100644 --- a/cpp/arcticdb/processing/test/test_resample.cpp +++ b/cpp/arcticdb/processing/test/test_resample.cpp @@ -455,6 +455,20 @@ void assert_column_is_sparse(const Column& c) { ASSERT_TRUE(c.opt_sparse_map().has_value()); } +template +std::optional compute_output_column( + const AggregatorT& aggregator, std::span> input_index_columns, + std::span> input_agg_columns, + const std::vector& bucket_boundaries, ResampleBoundary label_boundary +) { + const std::vector> idx_cols(input_index_columns.begin(), input_index_columns.end()); + // Using `min()+1` to avoid underflow when calculating exclusive min timestamp in resample code. + const TimestampRange full_range{std::numeric_limits::min() + 1, std::numeric_limits::max()}; + auto [_, mapping] = + generate_output_index_column(idx_cols, bucket_boundaries, full_range, label_boundary); + return aggregator.generate_resampling_output_column(input_agg_columns, mapping); +} + // The aggregation operator does not matter for this case. Just pick one that's applicable to all column types. using AggregatorTypes = ::testing::Types< AggregatorAndLabel< @@ -474,47 +488,48 @@ TYPED_TEST(SortedAggregatorSparseStructure, NoMissingInputColumnsProducesDenseCo ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10, 20, 30, 40, 50}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10, 20, 30, 40, 50}; + // Values are monotonically increasing across the two input columns (sorted-input invariant) and span all 5 buckets. const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{1, 2, 3})), - std::make_shared(create_dense_column(std::array{11, 21, 31, 41})) + std::make_shared(create_dense_column(std::array{1, 2, 11, 12, 21})), + std::make_shared(create_dense_column(std::array{22, 31, 32, 41, 42})) }; const std::array input_agg_columns{ std::make_optional(ColumnWithStrings{ - create_dense_column>>(std::array{0, 5, 6}), + create_dense_column>>(std::array{0, 1, 2, 3, 4}), nullptr, "col1" }), std::make_optional(ColumnWithStrings{ - create_dense_column>>(std::array{10, 35, 56, 1, 2}), + create_dense_column>>(std::array{10, 11, 12, 13, 14}), nullptr, "col1" }) }; { - // Test single input column - const std::optional output = aggregator.generate_resampling_output_column( - std::span{input_index_columns.begin(), 1}, - std::span{input_agg_columns.begin(), 1}, - output_index_column, - label + // Test multiple input columns — values span all 5 buckets + const std::optional output = compute_output_column( + aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label ); EXPECT_TRUE(output.has_value()); assert_column_is_dense(*output); - ASSERT_EQ(output->row_count(), output_index.size()); + ASSERT_EQ(output->row_count(), 5); } { - // Test multiple input columns - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label + // Single input column — only the first column, which covers buckets 0, 1, and 2. + const std::optional output = compute_output_column( + aggregator, + std::span{input_index_columns.begin(), 1}, + std::span{input_agg_columns.begin(), 1}, + bucket_boundaries, + label ); EXPECT_TRUE(output.has_value()); assert_column_is_dense(*output); - ASSERT_EQ(output->row_count(), output_index.size()); + ASSERT_EQ(output->row_count(), 3); } } @@ -524,12 +539,11 @@ TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnExistSecondIsMissing) { ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10, 20}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - const Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10, 20}; const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{0, 2, 3})), - std::make_shared(create_dense_column(std::array{11, 21, 22, 24})) + std::make_shared(create_dense_column(std::array{1, 2, 3})), + std::make_shared(create_dense_column(std::array{11, 12, 13, 14})) }; const std::array input_agg_columns{ std::make_optional(ColumnWithStrings{ @@ -539,14 +553,13 @@ TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnExistSecondIsMissing) { }), std::optional{} }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); ASSERT_TRUE(output.has_value()); assert_column_is_sparse(*output); const util::BitSet& sparse_map = output->sparse_map(); ASSERT_EQ(output->row_count(), 1); - ASSERT_EQ(output->last_row(), output_index.size() - 1); + ASSERT_EQ(output->last_row(), 1); ASSERT_EQ(sparse_map.size(), 2); ASSERT_EQ(sparse_map.count(), 1); ASSERT_EQ(sparse_map[0], true); @@ -560,13 +573,7 @@ TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnExistWithValueOnRightBoun }; constexpr ResampleBoundary label = TypeParam::label; constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; - const Column output_index_column = []() { - if constexpr (label == ResampleBoundary::LEFT) { - return create_dense_column(std::array{0, 10, 30}); - } else { - return create_dense_column(std::array{10, 20, 40}); - } - }(); + const std::vector bucket_boundaries{0, 10, 20, 30, 40}; const std::array input_index_columns{ std::make_shared(create_dense_column(std::array{0, 2, 10})), std::make_shared(create_dense_column(std::array{35, 36})) @@ -579,23 +586,26 @@ TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnExistWithValueOnRightBoun }), std::optional{} }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); ASSERT_TRUE(output.has_value()); assert_column_is_sparse(*output); const util::BitSet& sparse_map = output->sparse_map(); if constexpr (closed == ResampleBoundary::LEFT) { + // Buckets [0,10), [10,20), [30,40) are non-empty; [20,30) is dropped. + ASSERT_EQ(sparse_map.size(), 3); ASSERT_EQ(sparse_map.count(), 2); ASSERT_EQ(sparse_map[0], true); ASSERT_EQ(sparse_map[1], true); ASSERT_EQ(sparse_map[2], false); } else if constexpr (closed == ResampleBoundary::RIGHT) { + // Buckets (0,10] (containing 2 and 10) and (30,40] are non-empty; value 0 is not in any + // right-closed bucket and (10,20], (20,30] are empty. + ASSERT_EQ(sparse_map.size(), 2); ASSERT_EQ(sparse_map.count(), 1); ASSERT_EQ(sparse_map[0], true); ASSERT_EQ(sparse_map[1], false); - ASSERT_EQ(sparse_map[2], false); } } @@ -607,12 +617,11 @@ TYPED_TEST(SortedAggregatorSparseStructure, ReturnDenseInCaseOutputIndexIsFilled ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10, 20}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - const Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10, 20}; const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{0, 2, 12})), - std::make_shared(create_dense_column(std::array{15, 16, 18, 20})) + std::make_shared(create_dense_column(std::array{1, 2, 12})), + std::make_shared(create_dense_column(std::array{15, 16, 18})) }; const std::array input_agg_columns{ std::make_optional(ColumnWithStrings{ @@ -622,12 +631,11 @@ TYPED_TEST(SortedAggregatorSparseStructure, ReturnDenseInCaseOutputIndexIsFilled }), std::optional{} }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); EXPECT_TRUE(output.has_value()); assert_column_is_dense(*output); - ASSERT_EQ(output->row_count(), output_index_column.row_count()); + ASSERT_EQ(output->row_count(), 2); } TYPED_TEST(SortedAggregatorSparseStructure, ReturnDenseInCaseOutputIndexIsFilledFirstColumnMissing) { @@ -638,11 +646,10 @@ TYPED_TEST(SortedAggregatorSparseStructure, ReturnDenseInCaseOutputIndexIsFilled ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10, 20}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - const Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10, 20}; const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{0, 2, 5})), + std::make_shared(create_dense_column(std::array{1, 2, 5})), std::make_shared(create_dense_column(std::array{7, 8, 9, 15})) }; const std::array input_agg_columns{ @@ -653,12 +660,11 @@ TYPED_TEST(SortedAggregatorSparseStructure, ReturnDenseInCaseOutputIndexIsFilled "col1" }), }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); EXPECT_TRUE(output.has_value()); assert_column_is_dense(*output); - ASSERT_EQ(output->row_count(), output_index_column.row_count()); + ASSERT_EQ(output->row_count(), 2); } TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnIsMissing) { @@ -667,11 +673,10 @@ TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnIsMissing) { ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10, 20}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - const Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10, 20}; const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{0, 2, 3})), + std::make_shared(create_dense_column(std::array{1, 2, 3})), std::make_shared(create_dense_column(std::array{11, 15, 16, 17})) }; const std::array input_agg_columns{ @@ -683,14 +688,13 @@ TYPED_TEST(SortedAggregatorSparseStructure, FirstColumnIsMissing) { }), }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); ASSERT_TRUE(output.has_value()); assert_column_is_sparse(*output); const util::BitSet& sparse_map = output->sparse_map(); ASSERT_EQ(output->row_count(), 1); - ASSERT_EQ(output->last_row(), output_index.size() - 1); + ASSERT_EQ(output->last_row(), 1); ASSERT_EQ(sparse_map.size(), 2); ASSERT_EQ(sparse_map.count(), 1); ASSERT_EQ(sparse_map[0], false); @@ -703,14 +707,13 @@ TYPED_TEST(SortedAggregatorSparseStructure, ThreeSegmentsInABucketMiddleIsMissin ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - const Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10}; const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{0, 1})), - std::make_shared(create_dense_column(std::array{2})), - std::make_shared(create_dense_column(std::array{3})) + std::make_shared(create_dense_column(std::array{1, 2})), + std::make_shared(create_dense_column(std::array{3})), + std::make_shared(create_dense_column(std::array{4})) }; const std::array input_agg_columns{ std::make_optional(ColumnWithStrings{ @@ -721,9 +724,8 @@ TYPED_TEST(SortedAggregatorSparseStructure, ThreeSegmentsInABucketMiddleIsMissin create_dense_column>>(std::array{3, 4}), nullptr, "col1" }), }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); ASSERT_TRUE(output.has_value()); assert_column_is_dense(*output); ASSERT_EQ(output->row_count(), 1); @@ -735,14 +737,13 @@ TYPED_TEST(SortedAggregatorSparseStructure, ThreeSegmentsInABuckeOnlyMiddleIsPre ColumnName{"input_column_name"}, ColumnName{"output_column_name"} }; constexpr ResampleBoundary label = TypeParam::label; - constexpr static std::array bucket_boundaries{0, 10}; - constexpr static std::array output_index = generate_labels(bucket_boundaries, label); - const Column output_index_column = create_dense_column(output_index); + constexpr ResampleBoundary closed = TypeParam::SortedAggregator::closed; + const std::vector bucket_boundaries{0, 10}; const std::array input_index_columns{ - std::make_shared(create_dense_column(std::array{0, 1})), - std::make_shared(create_dense_column(std::array{2})), - std::make_shared(create_dense_column(std::array{3})) + std::make_shared(create_dense_column(std::array{1, 2})), + std::make_shared(create_dense_column(std::array{3})), + std::make_shared(create_dense_column(std::array{4})) }; const std::array input_agg_columns{ std::optional{}, @@ -751,9 +752,8 @@ TYPED_TEST(SortedAggregatorSparseStructure, ThreeSegmentsInABuckeOnlyMiddleIsPre }), std::optional{} }; - const std::optional output = aggregator.generate_resampling_output_column( - input_index_columns, input_agg_columns, output_index_column, label - ); + const std::optional output = + compute_output_column(aggregator, input_index_columns, input_agg_columns, bucket_boundaries, label); ASSERT_TRUE(output.has_value()); assert_column_is_dense(*output); ASSERT_EQ(output->row_count(), 1); diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index f96d5e08b8b..db97101d4b3 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -4056,10 +4056,6 @@ def compact_data( Note that any fixed-width string columns that are compacted by this method will be coerced to dynamic UTF-8. - !!! warning - Compacting dynamic schema data can produce sparse data, even if the input data was dense, and resampling - does not yet support sparse data. - Parameters ---------- symbol : str diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index af7d72f9b07..92df81fe496 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -3259,10 +3259,6 @@ def compact_data( The metadata from the version being compacted is maintained with the newly created version. - !!! warning - Compacting dynamic schema data can produce sparse data, even if the input data was dense, and resampling - does not yet support sparse data. - Parameters ---------- symbol : str diff --git a/python/arcticdb/version_store/processing.py b/python/arcticdb/version_store/processing.py index 028f015d9d5..15b56e6e3be 100644 --- a/python/arcticdb/version_store/processing.py +++ b/python/arcticdb/version_store/processing.py @@ -736,10 +736,6 @@ def resample( Note that time-buckets which contain no index values in the symbol will NOT be included in the returned DataFrame. This is not the same as Pandas default behaviour. - Resampling is currently not supported with: - - * Dynamic schema where an aggregation column is missing from one or more of the row-slices. - * Sparse data. The resample results match pandas resample with `origin="epoch"`. We plan to add an 'origin' argument in a future release and will then change the default value to '"start_day"' to match the Pandas default. This @@ -787,9 +783,6 @@ def resample( * If the aggregation specified is not compatible with the type of the column being aggregated as specified above. - * The library has dynamic schema enabled, and at least one of the columns being aggregated is missing - from at least one row-slice. - * At least one of the columns being aggregated contains sparse data. UserInputException * `start`, `start_day`, `end`, `end_day` is used in conjunction with `date_range` diff --git a/python/tests/unit/arcticdb/version_store/test_arrow_sparse.py b/python/tests/unit/arcticdb/version_store/test_arrow_sparse.py index 650402a4adc..5e9dba6c193 100644 --- a/python/tests/unit/arcticdb/version_store/test_arrow_sparse.py +++ b/python/tests/unit/arcticdb/version_store/test_arrow_sparse.py @@ -693,6 +693,12 @@ def make_array(values, nulls, typ): assert_frame_equal_with_arrow_for_sparse(expected, received_pandas) +def _polars_agg_expr(col, op, alias=None): + # drop_nulls() is required to match pandas resampling behavior for `first` and `last`. + expr = getattr(pl.col(col).drop_nulls(), op)() + return expr.alias(alias) if alias is not None else expr + + def _assert_polars_equal_for_aggregation(received, expected, count_columns=None, check_row_order=True): if count_columns: # Polars groupby behaves differently to arcticdb groupby in two ways: @@ -766,39 +772,63 @@ def test_named_aggs(self, group_col): _check_query_result(self.lib, self.sym, q, expected, count_columns=count_columns, check_row_order=False) -@pytest.mark.xfail( - reason="Resample rejects sparse columns. (monday ref: 11679866800)", - raises=Exception, -) class TestSparseArrowResample: sym = "test_sparse_resample" - # TODO: Also add testing for: - # - Aggregating string, datetime columns - # - Other resampling kwargs like offset, origin + # TODO: `origin` = one of (start_day, end, end_day, explicit timestamp) doesn't have polars equvalent. + # Add sparse coverage for those origin values @pytest.fixture(autouse=True) def setup(self, in_memory_store_factory): self.lib = in_memory_store_factory(segment_row_size=4) self.lib.set_output_format(OutputFormat.POLARS) self.lib._set_allow_arrow_input() - # Bucket 2 (hours 6-8) is fully null for both columns when resampled at 3h + # First timestamp is 00:15 (mid-bucket) so `origin="start"` produces different bucket + # boundaries than the default `origin="epoch"`. + # Bucket 2 (hours 6-8) is fully null for every column when resampled at 3h + ts = pd.date_range("2025-01-01", periods=12, freq="h").to_list() + ts[0] = pd.Timestamp("2025-01-01 00:15") self.pldf = pl.DataFrame( { - "ts": pd.date_range("2025-01-01", periods=12, freq="h"), + "ts": ts, "int_col": [1, None, 3, None, 5, None, None, None, None, None, 11, None], "float_col": [None, 2.0, None, 4.0, None, 6.0, None, None, None, 10.0, None, 12.0], + "datetime_col": [ + None, + pd.Timestamp("2025-02-02"), + None, + pd.Timestamp("2025-02-04"), + None, + pd.Timestamp("2025-02-06"), + None, + None, + None, + pd.Timestamp("2025-02-10"), + None, + pd.Timestamp("2025-02-12"), + ], + "str_col": [None, "b", None, "d", None, "f", None, None, None, "j", None, "l"], }, - schema_overrides={"ts": pl.Datetime("ns")}, + schema_overrides={"ts": pl.Datetime("ns"), "datetime_col": pl.Datetime("ns")}, ) self.lib.write(self.sym, self.pldf, index_column=True) @pytest.mark.parametrize("agg_op", ["sum", "mean", "min", "max", "first", "last", "count"]) - @pytest.mark.parametrize("agg_col", ["int_col", "float_col"]) + @pytest.mark.parametrize("agg_col", ["int_col", "float_col", "datetime_col"]) @pytest.mark.parametrize("rule", ["3h", "6h"]) def test_single_agg(self, agg_op, agg_col, rule): + if agg_col == "datetime_col" and agg_op == "sum": + pytest.skip("sum is not a valid aggregation on a datetime column") q = QueryBuilder().resample(rule).agg({agg_col: agg_op}) - expected = self.pldf.group_by_dynamic("ts", every=rule).agg(getattr(pl.col(agg_col), agg_op)()) + expected = self.pldf.group_by_dynamic("ts", every=rule).agg(_polars_agg_expr(agg_col, agg_op)) + count_columns = [agg_col] if agg_op == "count" else None + _check_query_result(self.lib, self.sym, q, expected, count_columns=count_columns) + + @pytest.mark.parametrize("agg_op", ["first", "last"]) + @pytest.mark.parametrize("rule", ["3h", "6h"]) + def test_string_agg(self, agg_op, rule): + q = QueryBuilder().resample(rule).agg({"str_col": agg_op}) + expected = self.pldf.group_by_dynamic("ts", every=rule).agg(_polars_agg_expr("str_col", agg_op)) _check_query_result(self.lib, self.sym, q, expected) @pytest.mark.parametrize("rule", ["3h", "6h"]) @@ -808,11 +838,15 @@ def test_named_aggs(self, rule): "float_max": ("float_col", "max"), "int_count": ("int_col", "count"), "float_first": ("float_col", "first"), + "datetime_mean": ("datetime_col", "mean"), + "datetime_first": ("datetime_col", "first"), + "str_last": ("str_col", "last"), } q = QueryBuilder().resample(rule).agg(agg_dict) - exprs = [getattr(pl.col(col), op)().alias(name) for name, (col, op) in agg_dict.items()] + exprs = [_polars_agg_expr(col, op, name) for name, (col, op) in agg_dict.items()] expected = self.pldf.group_by_dynamic("ts", every=rule).agg(exprs) - _check_query_result(self.lib, self.sym, q, expected) + count_columns = [name for name, (_, op) in agg_dict.items() if op == "count"] or None + _check_query_result(self.lib, self.sym, q, expected, count_columns=count_columns) @pytest.mark.parametrize("closed", ["left", "right"]) @pytest.mark.parametrize("label", ["left", "right"]) @@ -821,6 +855,22 @@ def test_closed_label(self, closed, label): expected = self.pldf.group_by_dynamic("ts", every="3h", closed=closed, label=label).agg(pl.col("int_col").sum()) _check_query_result(self.lib, self.sym, q, expected) + # pandas/arcticdb use "min" as the minute unit; polars uses "m". Same offset, different spelling. + @pytest.mark.parametrize("pd_offset,pl_offset", [("30min", "30m"), ("1h", "1h")]) + @pytest.mark.parametrize("rule", ["3h", "6h"]) + def test_offset(self, pd_offset, pl_offset, rule): + q = QueryBuilder().resample(rule, offset=pd_offset).agg({"int_col": "sum"}) + expected = self.pldf.group_by_dynamic("ts", every=rule, offset=pl_offset).agg(pl.col("int_col").sum()) + _check_query_result(self.lib, self.sym, q, expected) + + @pytest.mark.parametrize("rule", ["3h", "6h"]) + def test_origin_start(self, rule): + # Uses the shared pldf — its first timestamp is 00:15 so bucket boundaries with + # origin="start" (00:15, 03:15, ...) differ from the default origin="epoch" (00:00, 03:00, ...). + q = QueryBuilder().resample(rule, origin="start").agg({"int_col": "sum"}) + expected = self.pldf.group_by_dynamic("ts", every=rule, start_by="datapoint").agg(pl.col("int_col").sum()) + _check_query_result(self.lib, self.sym, q, expected) + def test_with_date_range(self): start, end = pd.Timestamp("2025-01-01 03:00"), pd.Timestamp("2025-01-01 08:00") q = QueryBuilder().date_range((start, end)).resample("3h").agg({"int_col": "sum"}) @@ -829,6 +879,42 @@ def test_with_date_range(self): _check_query_result(self.lib, self.sym, q, expected) +@use_of_function_scoped_fixtures_in_hypothesis_checked +@settings(deadline=None) +@given( + data=st.data(), + rule=st.sampled_from(["1h", "2h", "3h", "6h"]), + agg_op=st.sampled_from(["sum", "mean", "min", "max", "count", "first", "last"]), +) +def test_sparse_polars_resample_hypothesis(in_memory_version_store_arrow, data, rule, agg_op): + # Draw a sorted list of unique minute-offsets within 5 days, then matching float values. + # Unique sorted offsets give strictly-increasing timestamps so bucket sizes vary naturally. + offsets = sorted( + data.draw(st.lists(st.integers(min_value=0, max_value=7200), min_size=1, max_size=500, unique=True)) + ) + float_values = data.draw( + st.lists( + st.one_of(st.none(), st.floats(min_value=0, max_value=1000, allow_nan=False)), + min_size=len(offsets), + max_size=len(offsets), + ) + ) + lib = in_memory_version_store_arrow + lib.set_output_format(OutputFormat.POLARS) + sym = "test_sparse_polars_resample_hypothesis" + base = pd.Timestamp("2025-01-01") + ts = [base + pd.Timedelta(minutes=int(m)) for m in offsets] + pldf = pl.DataFrame( + {"ts": ts, "float_col": float_values}, + schema_overrides={"ts": pl.Datetime("ns"), "float_col": pl.Float64}, + ) + lib.write(sym, pldf, index_column=True) + q = QueryBuilder().resample(rule).agg({"float_col": agg_op}) + count_columns = ["float_col"] if agg_op == "count" else None + expected = pldf.group_by_dynamic("ts", every=rule).agg(_polars_agg_expr("float_col", agg_op)) + _check_query_result(lib, sym, q, expected, count_columns=count_columns) + + class TestSparseArrowConcat: @pytest.fixture(autouse=True) def setup(self, mem_library_factory): @@ -899,10 +985,6 @@ def test_concat_with_index(self): expected = pl.concat([t1, t2]) polars_assert_frame_equal(received, expected) - @pytest.mark.xfail( - reason="Resample rejects sparse columns: sorted_aggregation.cpp 'Cannot aggregate column as it is sparse'", - raises=Exception, - ) def test_concat_with_resample(self): t1 = pl.DataFrame( {"ts": pd.date_range("2025-01-01", periods=6, freq="h"), "val": [1, None, 3, None, 5, None]}, diff --git a/python/tests/unit/arcticdb/version_store/test_resample.py b/python/tests/unit/arcticdb/version_store/test_resample.py index 42a74ffd8bb..2bc3c6e6f72 100644 --- a/python/tests/unit/arcticdb/version_store/test_resample.py +++ b/python/tests/unit/arcticdb/version_store/test_resample.py @@ -566,26 +566,6 @@ def test_resampling_unsupported_aggregation_type_combos(lmdb_version_store_v1, a lib.read(sym, query_builder=q) -def test_resampling_sparse_data(lmdb_version_store_v1, any_output_format): - lib = lmdb_version_store_v1 - lib._set_output_format_for_pipeline_tests(any_output_format) - sym = "test_resampling_sparse_data" - - # col_1 will be dense, but with fewer rows than the index column, and so semantically sparse - data = {"col_0": [np.nan, 1.0], "col_1": [2.0, np.nan]} - lib.write(sym, pd.DataFrame(data, index=[pd.Timestamp(0), pd.Timestamp(1000)]), sparsify_floats=True) - - q = QueryBuilder() - q = q.resample("us").agg({"col_0": "sum"}) - with pytest.raises(SchemaException): - lib.read(sym, query_builder=q) - - q = QueryBuilder() - q = q.resample("s").agg({"col_1": "sum"}) - with pytest.raises(SchemaException): - lib.read(sym, query_builder=q) - - def test_resampling_empty_type_column(lmdb_version_store_empty_types_v1, any_output_format): lib = lmdb_version_store_empty_types_v1 lib._set_output_format_for_pipeline_tests(any_output_format)