diff --git a/cpp/arcticdb/pipeline/column_stats.cpp b/cpp/arcticdb/pipeline/column_stats.cpp index 2c9047bf253..17ae2e939fb 100644 --- a/cpp/arcticdb/pipeline/column_stats.cpp +++ b/cpp/arcticdb/pipeline/column_stats.cpp @@ -96,6 +96,10 @@ std::string type_to_operator_string(ColumnStatTypeInternal type) { return "v1_MIN"; case ColumnStatTypeInternal::MAX_V1: return "v1_MAX"; + case ColumnStatTypeInternal::NAN_COUNT_V1: + return "v1_NAN_COUNT"; + case ColumnStatTypeInternal::NULL_COUNT_V1: + return "v1_NULL_COUNT"; default: internal::raise("Unknown column stat type requested"); } @@ -160,6 +164,8 @@ ColumnStats::ColumnStats( switch (entry.type()) { case MIN_V1: case MAX_V1: + case NAN_COUNT_V1: + case NULL_COUNT_V1: external_type = ColumnStatType::MINMAX; break; case UNKNOWN: @@ -209,7 +215,10 @@ namespace { std::unordered_set external_to_internal(ColumnStatType type) { switch (type) { case ColumnStatType::MINMAX: - return {ColumnStatTypeInternal::MIN_V1, ColumnStatTypeInternal::MAX_V1}; + return {ColumnStatTypeInternal::MIN_V1, + ColumnStatTypeInternal::MAX_V1, + ColumnStatTypeInternal::NAN_COUNT_V1, + ColumnStatTypeInternal::NULL_COUNT_V1}; default: internal::raise("Unknown column stat type"); } @@ -294,7 +303,13 @@ std::optional ColumnStats::clause() const { ), ColumnName( to_segment_column_name(name_and_stat_types.mangled_name, ColumnStatTypeInternal::MAX_V1) - ) + ), + ColumnName(to_segment_column_name( + name_and_stat_types.mangled_name, ColumnStatTypeInternal::NAN_COUNT_V1 + )), + ColumnName(to_segment_column_name( + name_and_stat_types.mangled_name, ColumnStatTypeInternal::NULL_COUNT_V1 + )) )); break; default: diff --git a/cpp/arcticdb/processing/unsorted_aggregation.cpp b/cpp/arcticdb/processing/unsorted_aggregation.cpp index 48df8684cbe..7cb173f11fd 100644 --- a/cpp/arcticdb/processing/unsorted_aggregation.cpp +++ b/cpp/arcticdb/processing/unsorted_aggregation.cpp @@ -25,6 +25,15 @@ void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) { using type_info = ScalarTypeInfo; using RawType = typename type_info::RawType; if constexpr (!is_sequence_type(type_info::data_type)) { + // Sparse-map gaps are real nulls (e.g. from Arrow validity bitmaps) that the dense + // for_each below never visits. Count them from metadata so they reach null_count_. + if (input_column.column_->is_sparse()) { + const auto sparse_gap_count = + input_column.column_->last_row() + 1 - input_column.column_->row_count(); + if (sparse_gap_count > 0) { + null_count_ += static_cast(sparse_gap_count); + } + } auto is_nat_or_nan = []([[maybe_unused]] RawType v) { if constexpr (is_floating_point_type(type_info::data_type)) { return std::isnan(v); @@ -46,9 +55,15 @@ void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) { [[maybe_unused]] bool any_nan{false}; arcticdb::for_each(*input_column.column_, [&](auto value) { const auto& curr = static_cast(value); - if constexpr (is_floating_point_type(type_info::data_type) || is_time_type(type_info::data_type)) { - // Skip NaN/NaT as they don't generate a stable ordering + if constexpr (is_floating_point_type(type_info::data_type)) { + if (is_nat_or_nan(curr)) { + ++nan_count_; + any_nan = true; + return; + } + } else if constexpr (is_time_type(type_info::data_type)) { if (is_nat_or_nan(curr)) { + ++null_count_; any_nan = true; return; } @@ -78,8 +93,8 @@ void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) { SegmentInMemory MinMaxAggregatorData::finalize(const std::vector& output_column_names) const { internal::check( - output_column_names.size() == 2, - "Expected 2 output column names in MinMaxAggregatorData::finalize, but got {}", + output_column_names.size() == 4, + "Expected 4 output column names in MinMaxAggregatorData::finalize, but got {}", output_column_names.size() ); SegmentInMemory seg; @@ -93,6 +108,12 @@ SegmentInMemory MinMaxAggregatorData::finalize(const std::vector& ou auto max_col = std::make_shared(make_scalar_type(max_->data_type()), Sparsity::PERMITTED); max_col->push_back(max_->get()); + auto nan_count_col = std::make_shared(make_scalar_type(DataType::UINT64), Sparsity::PERMITTED); + nan_count_col->push_back(nan_count_); + + auto null_count_col = std::make_shared(make_scalar_type(DataType::UINT64), Sparsity::PERMITTED); + null_count_col->push_back(null_count_); + auto& entry_list = (*header.mutable_stats_by_column())[data_col_offset_]; auto* min_entry = entry_list.add_entries(); min_entry->set_stats_seg_offset(0); @@ -100,9 +121,17 @@ SegmentInMemory MinMaxAggregatorData::finalize(const std::vector& ou auto* max_entry = entry_list.add_entries(); max_entry->set_stats_seg_offset(1); max_entry->set_type(arcticc::pb2::column_stats_pb2::MAX_V1); + auto* nan_entry = entry_list.add_entries(); + nan_entry->set_stats_seg_offset(2); + nan_entry->set_type(arcticc::pb2::column_stats_pb2::NAN_COUNT_V1); + auto* null_entry = entry_list.add_entries(); + null_entry->set_stats_seg_offset(3); + null_entry->set_type(arcticc::pb2::column_stats_pb2::NULL_COUNT_V1); seg.add_column(scalar_field(min_col->type().data_type(), output_column_names[0].value), min_col); seg.add_column(scalar_field(max_col->type().data_type(), output_column_names[1].value), max_col); + seg.add_column(scalar_field(DataType::UINT64, output_column_names[2].value), nan_count_col); + seg.add_column(scalar_field(DataType::UINT64, output_column_names[3].value), null_count_col); }); } diff --git a/cpp/arcticdb/processing/unsorted_aggregation.hpp b/cpp/arcticdb/processing/unsorted_aggregation.hpp index 1290b363616..be7d4e07274 100644 --- a/cpp/arcticdb/processing/unsorted_aggregation.hpp +++ b/cpp/arcticdb/processing/unsorted_aggregation.hpp @@ -24,6 +24,8 @@ class MinMaxAggregatorData { private: std::optional min_; std::optional max_; + uint64_t nan_count_{0}; + uint64_t null_count_{0}; size_t data_col_offset_; }; @@ -31,18 +33,24 @@ class MinMaxAggregator { public: explicit MinMaxAggregator( ColumnName column_name, size_t data_col_offset, ColumnName output_column_name_min, - ColumnName output_column_name_max + ColumnName output_column_name_max, ColumnName output_column_name_nan_count, + ColumnName output_column_name_null_count ) : column_name_(std::move(column_name)), data_col_offset_(data_col_offset), output_column_name_min_(std::move(output_column_name_min)), - output_column_name_max_(std::move(output_column_name_max)) {} + output_column_name_max_(std::move(output_column_name_max)), + output_column_name_nan_count_(std::move(output_column_name_nan_count)), + output_column_name_null_count_(std::move(output_column_name_null_count)) {} ARCTICDB_MOVE_COPY_DEFAULT(MinMaxAggregator) [[nodiscard]] ColumnName get_input_column_name() const { return column_name_; } [[nodiscard]] std::vector get_output_column_names() const { - return {output_column_name_min_, output_column_name_max_}; + return {output_column_name_min_, + output_column_name_max_, + output_column_name_nan_count_, + output_column_name_null_count_}; } [[nodiscard]] MinMaxAggregatorData get_aggregator_data() const { return MinMaxAggregatorData(data_col_offset_); } @@ -51,6 +59,8 @@ class MinMaxAggregator { size_t data_col_offset_; ColumnName output_column_name_min_; ColumnName output_column_name_max_; + ColumnName output_column_name_nan_count_; + ColumnName output_column_name_null_count_; }; class AggregatorDataBase { diff --git a/cpp/proto/arcticc/pb2/column_stats.proto b/cpp/proto/arcticc/pb2/column_stats.proto index e1c6e049548..8a1e7d07d73 100644 --- a/cpp/proto/arcticc/pb2/column_stats.proto +++ b/cpp/proto/arcticc/pb2/column_stats.proto @@ -23,6 +23,8 @@ enum ColumnStatsType { // misinterpret the new statistics format. MIN_V1 = 1; MAX_V1 = 2; + NAN_COUNT_V1 = 3; + NULL_COUNT_V1 = 4; } message StatEntry { diff --git a/python/tests/unit/arcticdb/test_column_stats_creation.py b/python/tests/unit/arcticdb/test_column_stats_creation.py index 39cb85a7135..18233847604 100644 --- a/python/tests/unit/arcticdb/test_column_stats_creation.py +++ b/python/tests/unit/arcticdb/test_column_stats_creation.py @@ -55,6 +55,12 @@ def assert_stats_equal(received, expected): assert isinstance(received, pa.Table) assert isinstance(expected, pl.DataFrame) received_pl = pl.from_arrow(received) + # The C++ aggregator always emits v1_NAN_COUNT and v1_NULL_COUNT columns alongside MIN/MAX. + # Tests that aren't exercising the count behaviour omit those columns from `expected`; + # subselect `received` down to the expected columns so the comparison stays focused. + missing = set(expected.columns) - set(received_pl.columns) + assert not missing, f"Expected columns missing from received: {missing}" + received_pl = received_pl.select(expected.columns) pl_assert_frame_equal(received_pl, expected, check_column_order=False, check_dtypes=False) @@ -274,6 +280,70 @@ def test_column_stats_only_nat_values(lmdb_version_store, any_output_format): assert raw_stats["v1_MAX(col_1)"].values.view("int64")[0] == nat_sentinel +def test_column_stats_nan_and_null_counts(lmdb_version_store, any_output_format): + lib = lmdb_version_store + lib._set_output_format_for_pipeline_tests(any_output_format) + sym = "test_column_stats_nan_and_null_counts" + + # Each write/append produces a separate segment, so we get one row per dataframe in the stats. + # float_col counts toward v1_NAN_COUNT, ts_col counts toward v1_NULL_COUNT. + df0 = pd.DataFrame( + {"float_col": [1.0, 2.0], "ts_col": [pd.Timestamp("2020-01-01"), pd.Timestamp("2020-06-01")]}, + index=pd.date_range("2000-01-01", periods=2), + ) + df1 = pd.DataFrame( + {"float_col": [np.nan, 5.0], "ts_col": [pd.NaT, pd.Timestamp("2021-01-01")]}, + index=pd.date_range("2000-01-03", periods=2), + ) + df2 = pd.DataFrame( + {"float_col": [np.nan, np.nan], "ts_col": [pd.NaT, pd.NaT]}, + index=pd.date_range("2000-01-05", periods=2), + ) + df3 = pd.DataFrame( + {"float_col": [1.0, np.nan, 2.0], "ts_col": [pd.Timestamp("2022-01-01"), pd.NaT, pd.Timestamp("2022-06-01")]}, + index=pd.date_range("2000-01-07", periods=3), + ) + lib.write(sym, df0) + lib.append(sym, df1) + lib.append(sym, df2) + lib.append(sym, df3) + + column_stats_dict = {"float_col": {"MINMAX"}, "ts_col": {"MINMAX"}} + lib.create_column_stats(sym, column_stats_dict) + + expected_column_stats = index_columns_to_pl(lib, sym).with_columns( + pl.Series("v1_MIN(float_col)", [1.0, 5.0, np.nan, 1.0]), + pl.Series("v1_MAX(float_col)", [2.0, 5.0, np.nan, 2.0]), + pl.Series("v1_NAN_COUNT(float_col)", [0, 1, 2, 1], dtype=pl.UInt64), + pl.Series("v1_NULL_COUNT(float_col)", [0, 0, 0, 0], dtype=pl.UInt64), + pl.Series( + "v1_MIN(ts_col)", + [ + pd.Timestamp("2020-01-01").value, + pd.Timestamp("2021-01-01").value, + None, + pd.Timestamp("2022-01-01").value, + ], + dtype=pl.Int64, + ).cast(pl.Datetime("ns")), + pl.Series( + "v1_MAX(ts_col)", + [ + pd.Timestamp("2020-06-01").value, + pd.Timestamp("2021-01-01").value, + None, + pd.Timestamp("2022-06-01").value, + ], + dtype=pl.Int64, + ).cast(pl.Datetime("ns")), + pl.Series("v1_NAN_COUNT(ts_col)", [0, 0, 0, 0], dtype=pl.UInt64), + pl.Series("v1_NULL_COUNT(ts_col)", [0, 1, 2, 1], dtype=pl.UInt64), + ) + + column_stats = lib.read_column_stats(sym) + assert_stats_equal(column_stats, expected_column_stats) + + def test_column_stats_as_of(version_store_factory, lib_name, encoding_version, any_output_format): lib = version_store_factory( column_group_size=2, @@ -920,18 +990,29 @@ def test_column_stats_header_metadata(version_store_factory, lib_name, encoding_ sym = "test_column_stats_header_metadata" generate_symbol(lib, sym) + # MINMAX always emits 4 stat entries: MIN, MAX, NAN_COUNT, NULL_COUNT. + minmax_types = { + ColumnStatsType.MIN_V1, + ColumnStatsType.MAX_V1, + ColumnStatsType.NAN_COUNT_V1, + ColumnStatsType.NULL_COUNT_V1, + } + field_name_by_type = { + ColumnStatsType.MIN_V1: "v1_MIN", + ColumnStatsType.MAX_V1: "v1_MAX", + ColumnStatsType.NAN_COUNT_V1: "v1_NAN_COUNT", + ColumnStatsType.NULL_COUNT_V1: "v1_NULL_COUNT", + } + # Create stats for col_1 lib.create_column_stats(sym, {"col_1": {"MINMAX"}}) header = read_column_stats_header(lib, sym) assert header.version == 1 - assert header_stat_count(header) == 2 - assert header_stat_pairs(header) == { - (2, ColumnStatsType.MIN_V1), - (2, ColumnStatsType.MAX_V1), - } + assert header_stat_count(header) == 4 + assert header_stat_pairs(header) == {(2, t) for t in minmax_types} offsets = [entry.stats_seg_offset for _, entry in header_all_entries(header)] - assert len(set(offsets)) == 2 + assert len(set(offsets)) == 4 # Verify descriptor field names match the offsets lib_tool = lib.library_tool() @@ -939,25 +1020,17 @@ def test_column_stats_header_metadata(version_store_factory, lib_name, encoding_ fields = lib_tool.read_descriptor(keys[0]).fields() for _, entry in header_all_entries(header): field_name = fields[entry.stats_seg_offset].name - if entry.type == ColumnStatsType.MIN_V1: - assert field_name == "v1_MIN(col_1)" - else: - assert field_name == "v1_MAX(col_1)" + assert field_name == f"{field_name_by_type[entry.type]}(col_1)" # Create stats for col_2 over existing col_1 stats lib.create_column_stats(sym, {"col_2": {"MINMAX"}}) header = read_column_stats_header(lib, sym) assert header.version == 1 - assert header_stat_count(header) == 4 - assert header_stat_pairs(header) == { - (2, ColumnStatsType.MIN_V1), - (2, ColumnStatsType.MAX_V1), - (3, ColumnStatsType.MIN_V1), - (3, ColumnStatsType.MAX_V1), - } + assert header_stat_count(header) == 8 + assert header_stat_pairs(header) == {(2, t) for t in minmax_types} | {(3, t) for t in minmax_types} offsets = [entry.stats_seg_offset for _, entry in header_all_entries(header)] - assert len(set(offsets)) == 4 + assert len(set(offsets)) == 8 # Drop col_1 stats lib.drop_column_stats(sym, {"col_1": {"MINMAX"}}) @@ -966,20 +1039,14 @@ def test_column_stats_header_metadata(version_store_factory, lib_name, encoding_ assert header.version == 1 # if you change the structure, consider whether you need to change header.version too assert len(header.ListFields()) == 2 - assert header_stat_count(header) == 2 - assert header_stat_pairs(header) == { - (3, ColumnStatsType.MIN_V1), - (3, ColumnStatsType.MAX_V1), - } + assert header_stat_count(header) == 4 + assert header_stat_pairs(header) == {(3, t) for t in minmax_types} keys = lib_tool.find_keys_for_symbol(KeyType.COLUMN_STATS, sym) fields = lib_tool.read_descriptor(keys[0]).fields() for _, entry in header_all_entries(header): field_name = fields[entry.stats_seg_offset].name - if entry.type == ColumnStatsType.MIN_V1: - assert field_name == "v1_MIN(col_2)" - else: - assert field_name == "v1_MAX(col_2)" + assert field_name == f"{field_name_by_type[entry.type]}(col_2)" def test_column_stats_duplicated_column_names(version_store_factory, lib_name, encoding_version, any_output_format):