Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,7 @@ if(${TEST})
processing/test/benchmark_binary.cpp
processing/test/benchmark_clause.cpp
processing/test/benchmark_common.cpp
processing/test/benchmark_resample.cpp
processing/test/benchmark_ternary.cpp
util/test/benchmark_bitset.cpp
version/test/benchmark_write.cpp
Expand Down
2 changes: 2 additions & 0 deletions cpp/arcticdb/column_store/column_algorithms.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

#pragma once

#include <algorithm>
#include <concepts>
#include <functional>
#include <arcticdb/column_store/column.hpp>
#include <arcticdb/util/lambda_inlining.hpp>

Expand Down
5 changes: 0 additions & 5 deletions cpp/arcticdb/processing/clause.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,11 +438,6 @@ struct ResampleClause {
std::vector<timestamp> generate_bucket_boundaries(
timestamp first_ts, timestamp last_ts, bool responsible_for_first_overlapping_bucket
) const;

std::shared_ptr<Column> generate_output_index_column(
const std::vector<std::shared_ptr<Column>>& input_index_columns,
const std::vector<timestamp>& bucket_boundaries
) const;
};

template<typename T>
Expand Down
80 changes: 4 additions & 76 deletions cpp/arcticdb/processing/clause_resample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,9 @@ std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<Entit
for (const auto& row_slice : row_slices) {
input_index_columns.emplace_back(row_slice.segments_->at(0)->column_ptr(0));
}
const auto output_index_column = generate_output_index_column(input_index_columns, bucket_boundaries);
// Bucket boundaries can be wider than the date range specified by the user, narrow the first and last buckets
// here if necessary
bucket_boundaries.front(
) = std::max(bucket_boundaries.front(), date_range_->first - (closed_boundary == ResampleBoundary::RIGHT ? 1 : 0));
bucket_boundaries.back(
) = std::min(bucket_boundaries.back(), date_range_->second + (closed_boundary == ResampleBoundary::LEFT ? 1 : 0));
const auto [output_index_column, mapping] = generate_output_index_column<closed_boundary>(
input_index_columns, bucket_boundaries, *date_range_, label_boundary_
);
SegmentInMemory seg;
RowRange output_row_range(
row_slices.front().row_ranges_->at(0)->start(),
Expand Down Expand Up @@ -369,14 +365,7 @@ std::vector<EntityId> ResampleClause<closed_boundary>::process(std::vector<Entit
}
);
}
std::optional<Column> aggregated = aggregator.aggregate(
input_index_columns,
input_agg_columns,
bucket_boundaries,
*output_index_column,
string_pool,
label_boundary_
);
std::optional<Column> aggregated = aggregator.aggregate(input_agg_columns, mapping, string_pool);
if (aggregated) {
seg.add_column(
scalar_field(aggregated->type().data_type(), aggregator.get_output_column_name().value),
Expand Down Expand Up @@ -435,67 +424,6 @@ std::vector<timestamp> ResampleClause<closed_boundary>::generate_bucket_boundari
return bucket_boundaries;
}

// Builds the index column of the resampled output: one label per bucket that contains at least one input index
// value lying inside date_range_.
//
// input_index_columns: index columns to scan, visited in the order given.
// bucket_boundaries:   bucket edges; consecutive pairs of edges delimit one bucket. The first two elements are read
//                      unconditionally, so at least two boundaries must be supplied.
// Returns a NANOSECONDS_UTC64 column (created with Sparsity::NOT_PERMITTED) holding, for every bucket that was hit,
// its left or right edge as selected by label_boundary_.
template<ResampleBoundary closed_boundary>
std::shared_ptr<Column> ResampleClause<closed_boundary>::generate_output_index_column(
        const std::vector<std::shared_ptr<Column>>& input_index_columns, const std::vector<timestamp>& bucket_boundaries
) const {
    constexpr auto data_type = DataType::NANOSECONDS_UTC64;
    using IndexTDT = ScalarTagType<DataTypeTag<data_type>>;

    // Upper bound on the output size: every bucket ends up with a label. The buffer is trimmed afterwards.
    const auto worst_case_bytes = (bucket_boundaries.size() - 1) * get_type_size(data_type);
    auto result = std::make_shared<Column>(
            TypeDescriptor(data_type, Dimension::Dim0),
            Sparsity::NOT_PERMITTED,
            ChunkedBuffer::presized_in_blocks(worst_case_bytes)
    );
    auto result_data = result->data();
    auto write_it = result_data.template begin<IndexTDT>();
    size_t rows_written{0};

    // right_edge always points at the upper boundary of the bucket currently being examined.
    auto right_edge = std::next(bucket_boundaries.cbegin());
    Bucket<closed_boundary> bucket{*std::prev(right_edge), *right_edge};
    bool label_emitted{false};

    // Appends the label of the current bucket to the output and remembers that it has been written.
    const auto emit_label = [&]() {
        *write_it++ = label_boundary_ == ResampleBoundary::LEFT ? *std::prev(right_edge) : *right_edge;
        ++rows_written;
        label_emitted = true;
    };

    for (const auto& index_column : input_index_columns) {
        auto index_data = index_column->data();
        const auto last = index_data.cend<IndexTDT>();
        auto ts_it = index_data.cbegin<IndexTDT>();
        // The date range may start part-way through this column; step over the values preceding it.
        for (; ts_it != last && *ts_it < date_range_->first; ++ts_it) {
        }
        // Stop as soon as a value falls after the end of the date range.
        for (; ts_it != last && *ts_it <= date_range_->second; ++ts_it) {
            if (ARCTICDB_LIKELY(bucket.contains(*ts_it))) {
                // Still inside the current bucket: it only needs labelling the first time it is hit.
                if (ARCTICDB_UNLIKELY(!label_emitted)) {
                    emit_label();
                }
                continue;
            }
            // The value is outside the current bucket, move the boundary iterator forward for this value.
            advance_boundary_past_value<closed_boundary>(bucket_boundaries, right_edge, *ts_it);
            if (ARCTICDB_UNLIKELY(right_edge == bucket_boundaries.end())) {
                // Ran out of buckets; nothing more to do with this column.
                break;
            }
            bucket.set_boundaries(*std::prev(right_edge), *right_edge);
            label_emitted = false;
            if (ARCTICDB_LIKELY(bucket.contains(*ts_it))) {
                emit_label();
            }
        }
    }

    // Shrink the buffer from the worst-case allocation down to the labels actually written.
    result->buffer().trim(rows_written * get_type_size(data_type));
    // NOTE(review): if no label was written, this unsigned subtraction wraps around - confirm that set_row_data
    // treats the resulting value as "no rows".
    result->set_row_data(rows_written - 1);
    return result;
}

// Explicit instantiation definitions of ResampleClause for the LEFT and RIGHT values of ResampleBoundary, so the
// member templates defined in this file are emitted for both.
template struct ResampleClause<ResampleBoundary::LEFT>;
template struct ResampleClause<ResampleBoundary::RIGHT>;

Expand Down
Loading
Loading