Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions cpp/arcticdb/pipeline/column_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ std::string type_to_operator_string(ColumnStatTypeInternal type) {
return "v1_MIN";
case ColumnStatTypeInternal::MAX_V1:
return "v1_MAX";
case ColumnStatTypeInternal::NAN_COUNT_V1:
return "v1_NAN_COUNT";
case ColumnStatTypeInternal::NAT_COUNT_V1:
return "v1_NAT_COUNT";
default:
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Unknown column stat type requested");
}
Expand Down Expand Up @@ -160,6 +164,8 @@ ColumnStats::ColumnStats(
switch (entry.type()) {
case MIN_V1:
case MAX_V1:
case NAN_COUNT_V1:
case NAT_COUNT_V1:
external_type = ColumnStatType::MINMAX;
break;
case UNKNOWN:
Expand Down Expand Up @@ -209,7 +215,10 @@ namespace {
std::unordered_set<ColumnStatTypeInternal> external_to_internal(ColumnStatType type) {
switch (type) {
case ColumnStatType::MINMAX:
return {ColumnStatTypeInternal::MIN_V1, ColumnStatTypeInternal::MAX_V1};
return {ColumnStatTypeInternal::MIN_V1,
ColumnStatTypeInternal::MAX_V1,
ColumnStatTypeInternal::NAN_COUNT_V1,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

external_to_internal(MINMAX) now unconditionally returns 4 internal stat types. drop() calls this to construct the list of column names to remove (v1_MIN, v1_MAX, v1_NAN_COUNT, v1_NAT_COUNT). For column stats segments that were written by an older client (only v1_MIN and v1_MAX columns exist), dropping will produce names for columns that aren't in the segment.

Please verify what the downstream consumer of dropped_names does when asked to drop a non-existent column — if it raises, this is a backward-compatibility break (new client, old segment) that needs handling; if it silently ignores, please add a test that creates column stats with the old format and then drops them with the new client.

ColumnStatTypeInternal::NAT_COUNT_V1};
default:
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Unknown column stat type");
}
Expand Down Expand Up @@ -294,7 +303,13 @@ std::optional<Clause> ColumnStats::clause() const {
),
ColumnName(
to_segment_column_name(name_and_stat_types.mangled_name, ColumnStatTypeInternal::MAX_V1)
)
),
ColumnName(to_segment_column_name(
name_and_stat_types.mangled_name, ColumnStatTypeInternal::NAN_COUNT_V1
)),
ColumnName(to_segment_column_name(
name_and_stat_types.mangled_name, ColumnStatTypeInternal::NAT_COUNT_V1
))
));
break;
default:
Expand Down
28 changes: 24 additions & 4 deletions cpp/arcticdb/processing/unsorted_aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,15 @@ void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) {
[[maybe_unused]] bool any_nan{false};
arcticdb::for_each<typename type_info::TDT>(*input_column.column_, [&](auto value) {
const auto& curr = static_cast<RawType>(value);
if constexpr (is_floating_point_type(type_info::data_type) || is_time_type(type_info::data_type)) {
// Skip NaN/NaT as they don't generate a stable ordering
if constexpr (is_floating_point_type(type_info::data_type)) {
if (is_nat_or_nan(curr)) {
++nan_count_;
any_nan = true;
return;
}
} else if constexpr (is_time_type(type_info::data_type)) {
if (is_nat_or_nan(curr)) {
++nat_count_;
any_nan = true;
return;
}
Expand Down Expand Up @@ -78,8 +84,8 @@ void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) {

SegmentInMemory MinMaxAggregatorData::finalize(const std::vector<ColumnName>& output_column_names) const {
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
output_column_names.size() == 2,
"Expected 2 output column names in MinMaxAggregatorData::finalize, but got {}",
output_column_names.size() == 4,
"Expected 4 output column names in MinMaxAggregatorData::finalize, but got {}",
output_column_names.size()
);
SegmentInMemory seg;
Expand All @@ -93,16 +99,30 @@ SegmentInMemory MinMaxAggregatorData::finalize(const std::vector<ColumnName>& ou
auto max_col = std::make_shared<Column>(make_scalar_type(max_->data_type()), Sparsity::PERMITTED);
max_col->push_back<RawType>(max_->get<RawType>());

auto nan_count_col = std::make_shared<Column>(make_scalar_type(DataType::UINT64), Sparsity::PERMITTED);
nan_count_col->push_back<uint64_t>(nan_count_);

auto nat_count_col = std::make_shared<Column>(make_scalar_type(DataType::UINT64), Sparsity::PERMITTED);
nat_count_col->push_back<uint64_t>(nat_count_);

auto& entry_list = (*header.mutable_stats_by_column())[data_col_offset_];
auto* min_entry = entry_list.add_entries();
min_entry->set_stats_seg_offset(0);
min_entry->set_type(arcticc::pb2::column_stats_pb2::MIN_V1);
auto* max_entry = entry_list.add_entries();
max_entry->set_stats_seg_offset(1);
max_entry->set_type(arcticc::pb2::column_stats_pb2::MAX_V1);
auto* nan_entry = entry_list.add_entries();
nan_entry->set_stats_seg_offset(2);
nan_entry->set_type(arcticc::pb2::column_stats_pb2::NAN_COUNT_V1);
auto* nat_entry = entry_list.add_entries();
nat_entry->set_stats_seg_offset(3);
nat_entry->set_type(arcticc::pb2::column_stats_pb2::NAT_COUNT_V1);

seg.add_column(scalar_field(min_col->type().data_type(), output_column_names[0].value), min_col);
seg.add_column(scalar_field(max_col->type().data_type(), output_column_names[1].value), max_col);
seg.add_column(scalar_field(DataType::UINT64, output_column_names[2].value), nan_count_col);
seg.add_column(scalar_field(DataType::UINT64, output_column_names[3].value), nat_count_col);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finalize now unconditionally writes 4 columns (min, max, nan_count, nat_count) and 4 header entries for the MINMAX stat. This changes the on-disk shape of every MINMAX column-stats segment.

Two concerns that should be addressed before merge:

  1. Backwards-compatibility test missing. There is no test that creates column stats with a pinned older ArcticDB version and then reads/drops/recreates them with this new client. Old segments only contain 2 stat columns; merge_column_stats_segments and ColumnStats::ColumnStats(header, tsd) should be exercised against that shape.
  2. Old client reading new data. Although the proto comment relies on the UNKNOWN = 0 fallback in proto3 for forward-compat, the new client also adds two extra real columns to the segment. Please confirm that an older release reading a segment with the extra v1_NAN_COUNT/v1_NAT_COUNT fields doesn't fail descriptor/field-count assertions, and document the result in the PR description.

});
}

Expand Down
16 changes: 13 additions & 3 deletions cpp/arcticdb/processing/unsorted_aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,33 @@ class MinMaxAggregatorData {
private:
std::optional<Value> min_;
std::optional<Value> max_;
uint64_t nan_count_{0};
uint64_t nat_count_{0};
size_t data_col_offset_;
};

class MinMaxAggregator {
public:
explicit MinMaxAggregator(
ColumnName column_name, size_t data_col_offset, ColumnName output_column_name_min,
ColumnName output_column_name_max
ColumnName output_column_name_max, ColumnName output_column_name_nan_count,
ColumnName output_column_name_nat_count
) :
column_name_(std::move(column_name)),
data_col_offset_(data_col_offset),
output_column_name_min_(std::move(output_column_name_min)),
output_column_name_max_(std::move(output_column_name_max)) {}
output_column_name_max_(std::move(output_column_name_max)),
output_column_name_nan_count_(std::move(output_column_name_nan_count)),
output_column_name_nat_count_(std::move(output_column_name_nat_count)) {}

ARCTICDB_MOVE_COPY_DEFAULT(MinMaxAggregator)

[[nodiscard]] ColumnName get_input_column_name() const { return column_name_; }
[[nodiscard]] std::vector<ColumnName> get_output_column_names() const {
return {output_column_name_min_, output_column_name_max_};
return {output_column_name_min_,
output_column_name_max_,
output_column_name_nan_count_,
output_column_name_nat_count_};
}
[[nodiscard]] MinMaxAggregatorData get_aggregator_data() const { return MinMaxAggregatorData(data_col_offset_); }

Expand All @@ -51,6 +59,8 @@ class MinMaxAggregator {
size_t data_col_offset_;
ColumnName output_column_name_min_;
ColumnName output_column_name_max_;
ColumnName output_column_name_nan_count_;
ColumnName output_column_name_nat_count_;
};

class AggregatorDataBase {
Expand Down
2 changes: 2 additions & 0 deletions cpp/proto/arcticc/pb2/column_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ enum ColumnStatsType {
// misinterpret the new statistics format.
MIN_V1 = 1;
MAX_V1 = 2;
NAN_COUNT_V1 = 3;
NAT_COUNT_V1 = 4;
}

message StatEntry {
Expand Down
64 changes: 64 additions & 0 deletions python/tests/unit/arcticdb/test_column_stats_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,70 @@ def test_column_stats_only_nat_values(lmdb_version_store, any_output_format):
assert raw_stats["v1_MAX(col_1)"].values.view("int64")[0] == nat_sentinel


def test_column_stats_nan_and_nat_counts(lmdb_version_store, any_output_format):
lib = lmdb_version_store
lib._set_output_format_for_pipeline_tests(any_output_format)
sym = "test_column_stats_nan_and_nat_counts"

# Each write/append produces a separate segment, so we get one row per dataframe in the stats.
# float_col counts toward v1_NAN_COUNT, ts_col counts toward v1_NAT_COUNT.
df0 = pd.DataFrame(
{"float_col": [1.0, 2.0], "ts_col": [pd.Timestamp("2020-01-01"), pd.Timestamp("2020-06-01")]},
index=pd.date_range("2000-01-01", periods=2),
)
df1 = pd.DataFrame(
{"float_col": [np.nan, 5.0], "ts_col": [pd.NaT, pd.Timestamp("2021-01-01")]},
index=pd.date_range("2000-01-03", periods=2),
)
df2 = pd.DataFrame(
{"float_col": [np.nan, np.nan], "ts_col": [pd.NaT, pd.NaT]},
index=pd.date_range("2000-01-05", periods=2),
)
df3 = pd.DataFrame(
{"float_col": [1.0, np.nan, 2.0], "ts_col": [pd.Timestamp("2022-01-01"), pd.NaT, pd.Timestamp("2022-06-01")]},
index=pd.date_range("2000-01-07", periods=3),
)
lib.write(sym, df0)
lib.append(sym, df1)
lib.append(sym, df2)
lib.append(sym, df3)

column_stats_dict = {"float_col": {"MINMAX"}, "ts_col": {"MINMAX"}}
lib.create_column_stats(sym, column_stats_dict)

expected_column_stats = index_columns_to_pl(lib, sym).with_columns(
pl.Series("v1_MIN(float_col)", [1.0, 5.0, np.nan, 1.0]),
pl.Series("v1_MAX(float_col)", [2.0, 5.0, np.nan, 2.0]),
pl.Series("v1_NAN_COUNT(float_col)", [0, 1, 2, 1], dtype=pl.UInt64),
pl.Series("v1_NAT_COUNT(float_col)", [0, 0, 0, 0], dtype=pl.UInt64),
pl.Series(
"v1_MIN(ts_col)",
[
pd.Timestamp("2020-01-01").value,
pd.Timestamp("2021-01-01").value,
None,
pd.Timestamp("2022-01-01").value,
],
dtype=pl.Int64,
).cast(pl.Datetime("ns")),
pl.Series(
"v1_MAX(ts_col)",
[
pd.Timestamp("2020-06-01").value,
pd.Timestamp("2021-01-01").value,
None,
pd.Timestamp("2022-06-01").value,
],
dtype=pl.Int64,
).cast(pl.Datetime("ns")),
pl.Series("v1_NAN_COUNT(ts_col)", [0, 0, 0, 0], dtype=pl.UInt64),
pl.Series("v1_NAT_COUNT(ts_col)", [0, 1, 2, 1], dtype=pl.UInt64),
)

column_stats = lib.read_column_stats(sym)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new test only covers writing-then-reading with the same (new) client. Please add a test (or update an existing one) so that the all-NaN / all-NaT segments verify the new v1_NAN_COUNT / v1_NAT_COUNT values directly (e.g. extend test_column_stats_only_nat_values), and add a test for backwards compatibility — column stats written before this change (where only v1_MIN/v1_MAX columns exist) must still be readable, droppable, and mergeable by the new code path.

assert_stats_equal(column_stats, expected_column_stats)


def test_column_stats_as_of(version_store_factory, lib_name, encoding_version, any_output_format):
lib = version_store_factory(
column_group_size=2,
Expand Down
Loading