Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions cpp/arcticdb/pipeline/column_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ std::string type_to_operator_string(ColumnStatTypeInternal type) {
return "v1_MIN";
case ColumnStatTypeInternal::MAX_V1:
return "v1_MAX";
case ColumnStatTypeInternal::NAN_COUNT_V1:
return "v1_NAN_COUNT";
case ColumnStatTypeInternal::NULL_COUNT_V1:
return "v1_NULL_COUNT";
default:
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Unknown column stat type requested");
}
Expand Down Expand Up @@ -160,6 +164,8 @@ ColumnStats::ColumnStats(
switch (entry.type()) {
case MIN_V1:
case MAX_V1:
case NAN_COUNT_V1:
case NULL_COUNT_V1:
external_type = ColumnStatType::MINMAX;
break;
case UNKNOWN:
Expand Down Expand Up @@ -209,7 +215,10 @@ namespace {
std::unordered_set<ColumnStatTypeInternal> external_to_internal(ColumnStatType type) {
switch (type) {
case ColumnStatType::MINMAX:
return {ColumnStatTypeInternal::MIN_V1, ColumnStatTypeInternal::MAX_V1};
return {ColumnStatTypeInternal::MIN_V1,
ColumnStatTypeInternal::MAX_V1,
ColumnStatTypeInternal::NAN_COUNT_V1,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

external_to_internal(MINMAX) now unconditionally returns 4 internal stat types. drop() calls this to construct the list of column names to remove (v1_MIN, v1_MAX, v1_NAN_COUNT, v1_NULL_COUNT). For column stats segments that were written by an older client (only v1_MIN and v1_MAX columns exist), dropping will produce names for columns that aren't in the segment.

Please verify what the downstream consumer of dropped_names does when asked to drop a non-existent column — if it raises, this is a forward-compatibility break that needs handling; if it silently ignores, please add a test that creates column stats with the old format and then drops them with the new client.

ColumnStatTypeInternal::NULL_COUNT_V1};
default:
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Unknown column stat type");
}
Expand Down Expand Up @@ -294,7 +303,13 @@ std::optional<Clause> ColumnStats::clause() const {
),
ColumnName(
to_segment_column_name(name_and_stat_types.mangled_name, ColumnStatTypeInternal::MAX_V1)
)
),
ColumnName(to_segment_column_name(
name_and_stat_types.mangled_name, ColumnStatTypeInternal::NAN_COUNT_V1
)),
ColumnName(to_segment_column_name(
name_and_stat_types.mangled_name, ColumnStatTypeInternal::NULL_COUNT_V1
))
));
break;
default:
Expand Down
37 changes: 33 additions & 4 deletions cpp/arcticdb/processing/unsorted_aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) {
using type_info = ScalarTypeInfo<decltype(col_tag)>;
using RawType = typename type_info::RawType;
if constexpr (!is_sequence_type(type_info::data_type)) {
// Sparse-map gaps are real nulls (e.g. from Arrow validity bitmaps) that the dense
// for_each below never visits. Count them from metadata so they reach null_count_.
if (input_column.column_->is_sparse()) {
const auto sparse_gap_count =
input_column.column_->last_row() + 1 - input_column.column_->row_count();
if (sparse_gap_count > 0) {
null_count_ += static_cast<uint64_t>(sparse_gap_count);
}
}
Comment on lines +28 to +36
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sparse-map gap counting is a new behavioural code path, but no test in this PR exercises it — test_column_stats_nan_and_null_counts only uses dense pandas frames with in-band NaN/NaT, and test_column_stats_dynamic_schema_missing_data covers slices where the column is wholly absent (a different mechanism — the column isn't aggregated at all, so the expected stats are None rather than non-zero counts). Please add a test that produces a single segment with a sparse column containing gaps (e.g. via an Arrow input with a validity bitmap, or whatever existing fixture produces column->is_sparse() == true with last_row() + 1 > row_count()) and asserts the resulting v1_NULL_COUNT matches the number of gaps.

Also, please double-check the formula: last_row() + 1 - row_count() assumes last_row() (i.e. last_logical_row_) reflects the full logical length of the slice. If the trailing rows of a segment are null, last_logical_row_ may be set to the last present row rather than the segment's logical length, which would under-count trailing nulls. The test should cover that case explicitly.

auto is_nat_or_nan = []([[maybe_unused]] RawType v) {
if constexpr (is_floating_point_type(type_info::data_type)) {
return std::isnan(v);
Expand All @@ -46,9 +55,15 @@ void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) {
[[maybe_unused]] bool any_nan{false};
arcticdb::for_each<typename type_info::TDT>(*input_column.column_, [&](auto value) {
const auto& curr = static_cast<RawType>(value);
if constexpr (is_floating_point_type(type_info::data_type) || is_time_type(type_info::data_type)) {
// Skip NaN/NaT as they don't generate a stable ordering
if constexpr (is_floating_point_type(type_info::data_type)) {
if (is_nat_or_nan(curr)) {
++nan_count_;
any_nan = true;
return;
}
} else if constexpr (is_time_type(type_info::data_type)) {
if (is_nat_or_nan(curr)) {
++null_count_;
any_nan = true;
return;
}
Expand Down Expand Up @@ -78,8 +93,8 @@ void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) {

SegmentInMemory MinMaxAggregatorData::finalize(const std::vector<ColumnName>& output_column_names) const {
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
output_column_names.size() == 2,
"Expected 2 output column names in MinMaxAggregatorData::finalize, but got {}",
output_column_names.size() == 4,
"Expected 4 output column names in MinMaxAggregatorData::finalize, but got {}",
output_column_names.size()
);
SegmentInMemory seg;
Expand All @@ -93,16 +108,30 @@ SegmentInMemory MinMaxAggregatorData::finalize(const std::vector<ColumnName>& ou
auto max_col = std::make_shared<Column>(make_scalar_type(max_->data_type()), Sparsity::PERMITTED);
max_col->push_back<RawType>(max_->get<RawType>());

auto nan_count_col = std::make_shared<Column>(make_scalar_type(DataType::UINT64), Sparsity::PERMITTED);
nan_count_col->push_back<uint64_t>(nan_count_);

auto null_count_col = std::make_shared<Column>(make_scalar_type(DataType::UINT64), Sparsity::PERMITTED);
null_count_col->push_back<uint64_t>(null_count_);

auto& entry_list = (*header.mutable_stats_by_column())[data_col_offset_];
auto* min_entry = entry_list.add_entries();
min_entry->set_stats_seg_offset(0);
min_entry->set_type(arcticc::pb2::column_stats_pb2::MIN_V1);
auto* max_entry = entry_list.add_entries();
max_entry->set_stats_seg_offset(1);
max_entry->set_type(arcticc::pb2::column_stats_pb2::MAX_V1);
auto* nan_entry = entry_list.add_entries();
nan_entry->set_stats_seg_offset(2);
nan_entry->set_type(arcticc::pb2::column_stats_pb2::NAN_COUNT_V1);
auto* null_entry = entry_list.add_entries();
null_entry->set_stats_seg_offset(3);
null_entry->set_type(arcticc::pb2::column_stats_pb2::NULL_COUNT_V1);

seg.add_column(scalar_field(min_col->type().data_type(), output_column_names[0].value), min_col);
seg.add_column(scalar_field(max_col->type().data_type(), output_column_names[1].value), max_col);
seg.add_column(scalar_field(DataType::UINT64, output_column_names[2].value), nan_count_col);
seg.add_column(scalar_field(DataType::UINT64, output_column_names[3].value), null_count_col);
});
}

Expand Down
16 changes: 13 additions & 3 deletions cpp/arcticdb/processing/unsorted_aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,33 @@ class MinMaxAggregatorData {
private:
std::optional<Value> min_;
std::optional<Value> max_;
uint64_t nan_count_{0};
uint64_t null_count_{0};
size_t data_col_offset_;
};

class MinMaxAggregator {
public:
explicit MinMaxAggregator(
ColumnName column_name, size_t data_col_offset, ColumnName output_column_name_min,
ColumnName output_column_name_max
ColumnName output_column_name_max, ColumnName output_column_name_nan_count,
ColumnName output_column_name_null_count
) :
column_name_(std::move(column_name)),
data_col_offset_(data_col_offset),
output_column_name_min_(std::move(output_column_name_min)),
output_column_name_max_(std::move(output_column_name_max)) {}
output_column_name_max_(std::move(output_column_name_max)),
output_column_name_nan_count_(std::move(output_column_name_nan_count)),
output_column_name_null_count_(std::move(output_column_name_null_count)) {}

ARCTICDB_MOVE_COPY_DEFAULT(MinMaxAggregator)

[[nodiscard]] ColumnName get_input_column_name() const { return column_name_; }
[[nodiscard]] std::vector<ColumnName> get_output_column_names() const {
return {output_column_name_min_, output_column_name_max_};
return {output_column_name_min_,
output_column_name_max_,
output_column_name_nan_count_,
output_column_name_null_count_};
}
[[nodiscard]] MinMaxAggregatorData get_aggregator_data() const { return MinMaxAggregatorData(data_col_offset_); }

Expand All @@ -51,6 +59,8 @@ class MinMaxAggregator {
size_t data_col_offset_;
ColumnName output_column_name_min_;
ColumnName output_column_name_max_;
ColumnName output_column_name_nan_count_;
ColumnName output_column_name_null_count_;
};

class AggregatorDataBase {
Expand Down
2 changes: 2 additions & 0 deletions cpp/proto/arcticc/pb2/column_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ enum ColumnStatsType {
// misinterpret the new statistics format.
MIN_V1 = 1;
MAX_V1 = 2;
NAN_COUNT_V1 = 3;
NULL_COUNT_V1 = 4;
}

message StatEntry {
Expand Down
121 changes: 94 additions & 27 deletions python/tests/unit/arcticdb/test_column_stats_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ def assert_stats_equal(received, expected):
assert isinstance(received, pa.Table)
assert isinstance(expected, pl.DataFrame)
received_pl = pl.from_arrow(received)
# The C++ aggregator always emits v1_NAN_COUNT and v1_NULL_COUNT columns alongside MIN/MAX.
# Tests that aren't exercising the count behaviour omit those columns from `expected`;
# subselect `received` down to the expected columns so the comparison stays focused.
missing = set(expected.columns) - set(received_pl.columns)
assert not missing, f"Expected columns missing from received: {missing}"
received_pl = received_pl.select(expected.columns)
pl_assert_frame_equal(received_pl, expected, check_column_order=False, check_dtypes=False)


Expand Down Expand Up @@ -274,6 +280,70 @@ def test_column_stats_only_nat_values(lmdb_version_store, any_output_format):
assert raw_stats["v1_MAX(col_1)"].values.view("int64")[0] == nat_sentinel


def test_column_stats_nan_and_null_counts(lmdb_version_store, any_output_format):
lib = lmdb_version_store
lib._set_output_format_for_pipeline_tests(any_output_format)
sym = "test_column_stats_nan_and_null_counts"

# Each write/append produces a separate segment, so we get one row per dataframe in the stats.
# float_col counts toward v1_NAN_COUNT, ts_col counts toward v1_NULL_COUNT.
df0 = pd.DataFrame(
{"float_col": [1.0, 2.0], "ts_col": [pd.Timestamp("2020-01-01"), pd.Timestamp("2020-06-01")]},
index=pd.date_range("2000-01-01", periods=2),
)
df1 = pd.DataFrame(
{"float_col": [np.nan, 5.0], "ts_col": [pd.NaT, pd.Timestamp("2021-01-01")]},
index=pd.date_range("2000-01-03", periods=2),
)
df2 = pd.DataFrame(
{"float_col": [np.nan, np.nan], "ts_col": [pd.NaT, pd.NaT]},
index=pd.date_range("2000-01-05", periods=2),
)
df3 = pd.DataFrame(
{"float_col": [1.0, np.nan, 2.0], "ts_col": [pd.Timestamp("2022-01-01"), pd.NaT, pd.Timestamp("2022-06-01")]},
index=pd.date_range("2000-01-07", periods=3),
)
lib.write(sym, df0)
lib.append(sym, df1)
lib.append(sym, df2)
lib.append(sym, df3)

column_stats_dict = {"float_col": {"MINMAX"}, "ts_col": {"MINMAX"}}
lib.create_column_stats(sym, column_stats_dict)

expected_column_stats = index_columns_to_pl(lib, sym).with_columns(
pl.Series("v1_MIN(float_col)", [1.0, 5.0, np.nan, 1.0]),
pl.Series("v1_MAX(float_col)", [2.0, 5.0, np.nan, 2.0]),
pl.Series("v1_NAN_COUNT(float_col)", [0, 1, 2, 1], dtype=pl.UInt64),
pl.Series("v1_NULL_COUNT(float_col)", [0, 0, 0, 0], dtype=pl.UInt64),
pl.Series(
"v1_MIN(ts_col)",
[
pd.Timestamp("2020-01-01").value,
pd.Timestamp("2021-01-01").value,
None,
pd.Timestamp("2022-01-01").value,
],
dtype=pl.Int64,
).cast(pl.Datetime("ns")),
pl.Series(
"v1_MAX(ts_col)",
[
pd.Timestamp("2020-06-01").value,
pd.Timestamp("2021-01-01").value,
None,
pd.Timestamp("2022-06-01").value,
],
dtype=pl.Int64,
).cast(pl.Datetime("ns")),
pl.Series("v1_NAN_COUNT(ts_col)", [0, 0, 0, 0], dtype=pl.UInt64),
pl.Series("v1_NULL_COUNT(ts_col)", [0, 1, 2, 1], dtype=pl.UInt64),
)

column_stats = lib.read_column_stats(sym)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new test only covers writing-then-reading with the same (new) client. Please add a test (or update an existing one, e.g. by extending test_column_stats_only_nat_values) so that the new v1_NAN_COUNT / v1_NULL_COUNT values are verified directly for all-NaN / all-NaT segments, and add a test for backwards compatibility — column stats written before this change (where only v1_MIN/v1_MAX columns exist) must still be readable, droppable, and mergeable by the new code path.

assert_stats_equal(column_stats, expected_column_stats)


def test_column_stats_as_of(version_store_factory, lib_name, encoding_version, any_output_format):
lib = version_store_factory(
column_group_size=2,
Expand Down Expand Up @@ -920,44 +990,47 @@ def test_column_stats_header_metadata(version_store_factory, lib_name, encoding_
sym = "test_column_stats_header_metadata"
generate_symbol(lib, sym)

# MINMAX always emits 4 stat entries: MIN, MAX, NAN_COUNT, NULL_COUNT.
minmax_types = {
ColumnStatsType.MIN_V1,
ColumnStatsType.MAX_V1,
ColumnStatsType.NAN_COUNT_V1,
ColumnStatsType.NULL_COUNT_V1,
}
field_name_by_type = {
ColumnStatsType.MIN_V1: "v1_MIN",
ColumnStatsType.MAX_V1: "v1_MAX",
ColumnStatsType.NAN_COUNT_V1: "v1_NAN_COUNT",
ColumnStatsType.NULL_COUNT_V1: "v1_NULL_COUNT",
}

# Create stats for col_1
lib.create_column_stats(sym, {"col_1": {"MINMAX"}})
header = read_column_stats_header(lib, sym)

assert header.version == 1
assert header_stat_count(header) == 2
assert header_stat_pairs(header) == {
(2, ColumnStatsType.MIN_V1),
(2, ColumnStatsType.MAX_V1),
}
assert header_stat_count(header) == 4
assert header_stat_pairs(header) == {(2, t) for t in minmax_types}
offsets = [entry.stats_seg_offset for _, entry in header_all_entries(header)]
assert len(set(offsets)) == 2
assert len(set(offsets)) == 4

# Verify descriptor field names match the offsets
lib_tool = lib.library_tool()
keys = lib_tool.find_keys_for_symbol(KeyType.COLUMN_STATS, sym)
fields = lib_tool.read_descriptor(keys[0]).fields()
for _, entry in header_all_entries(header):
field_name = fields[entry.stats_seg_offset].name
if entry.type == ColumnStatsType.MIN_V1:
assert field_name == "v1_MIN(col_1)"
else:
assert field_name == "v1_MAX(col_1)"
assert field_name == f"{field_name_by_type[entry.type]}(col_1)"

# Create stats for col_2 over existing col_1 stats
lib.create_column_stats(sym, {"col_2": {"MINMAX"}})
header = read_column_stats_header(lib, sym)

assert header.version == 1
assert header_stat_count(header) == 4
assert header_stat_pairs(header) == {
(2, ColumnStatsType.MIN_V1),
(2, ColumnStatsType.MAX_V1),
(3, ColumnStatsType.MIN_V1),
(3, ColumnStatsType.MAX_V1),
}
assert header_stat_count(header) == 8
assert header_stat_pairs(header) == {(2, t) for t in minmax_types} | {(3, t) for t in minmax_types}
offsets = [entry.stats_seg_offset for _, entry in header_all_entries(header)]
assert len(set(offsets)) == 4
assert len(set(offsets)) == 8

# Drop col_1 stats
lib.drop_column_stats(sym, {"col_1": {"MINMAX"}})
Expand All @@ -966,20 +1039,14 @@ def test_column_stats_header_metadata(version_store_factory, lib_name, encoding_
assert header.version == 1
# if you change the structure, consider whether you need to change header.version too
assert len(header.ListFields()) == 2
assert header_stat_count(header) == 2
assert header_stat_pairs(header) == {
(3, ColumnStatsType.MIN_V1),
(3, ColumnStatsType.MAX_V1),
}
assert header_stat_count(header) == 4
assert header_stat_pairs(header) == {(3, t) for t in minmax_types}

keys = lib_tool.find_keys_for_symbol(KeyType.COLUMN_STATS, sym)
fields = lib_tool.read_descriptor(keys[0]).fields()
for _, entry in header_all_entries(header):
field_name = fields[entry.stats_seg_offset].name
if entry.type == ColumnStatsType.MIN_V1:
assert field_name == "v1_MIN(col_2)"
else:
assert field_name == "v1_MAX(col_2)"
assert field_name == f"{field_name_by_type[entry.type]}(col_2)"


def test_column_stats_duplicated_column_names(version_store_factory, lib_name, encoding_version, any_output_format):
Expand Down
Loading