From 24f236bdc6f8f8c1637cedbb10171e2f4e35e5bf Mon Sep 17 00:00:00 2001 From: mwish Date: Sat, 6 Jun 2026 16:50:08 +0800 Subject: [PATCH 1/5] For performance, split to two fns --- parquet/src/arrow/arrow_writer/levels.rs | 207 +++++++++-------------- 1 file changed, 76 insertions(+), 131 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs index e577cf73a807..f5c7f327c881 100644 --- a/parquet/src/arrow/arrow_writer/levels.rs +++ b/parquet/src/arrow/arrow_writer/levels.rs @@ -352,131 +352,103 @@ impl LevelInfoBuilder { return; } - // Fast path for "last-level list": when the child has no nested rep_levels, - // each child element produces exactly one rep_level entry. We can batch - // contiguous non-empty list slots into a single child.write() call, then - // fix up the rep_levels at list-slot boundaries using offsets directly. - // - // Kept as a separate function so the compiler can optimize write_list's + // Dispatch to separate functions so the compiler can optimize each // hot loop independently (function body size affects codegen quality). if is_last_level { - Self::write_list_last_level(child, ctx, offsets, nulls, range); - return; + Self::write_list_direct(child, ctx, offsets, nulls, range); + } else { + Self::write_list_scan(child, ctx, offsets, nulls, range); } + } - let offsets = &offsets[range.start..range.end + 1]; - - let write_non_null_slice = - |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| { - child.write(start_idx..end_idx); - child.visit_leaves(|leaf| { - let rep_levels = leaf.rep_levels.materialize_mut().unwrap(); - let mut rev = rep_levels.iter_mut().rev(); - let mut remaining = end_idx - start_idx; + /// Batch write for lists whose child has no nested repetition. + /// Each child element produces exactly one rep_level entry, so list-start + /// positions are directly computable from offsets. + fn write_list_direct( + child: &mut LevelInfoBuilder, + ctx: &LevelContext, + offsets: &[O], + nulls: Option<&NullBuffer>, + range: Range, + ) { + let list_start_rep = ctx.rep_level - 1; - loop { - let next = rev.next().unwrap(); - if *next > ctx.rep_level { - // Nested element - ignore - continue; - } + let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: &[O]| { + let values_start = run_offsets[0].as_usize(); + let values_end = run_offsets[run_offsets.len() - 1].as_usize(); - remaining -= 1; - if remaining == 0 { - *next = ctx.rep_level - 1; - break; - } - } - }) - }; + child.write(values_start..values_end); - // In a list column, each row falls into one of three categories: - // - "null": the list slot is absent (!is_valid), encoded at def_level - 2 - // - "empty": the list slot is present but has zero elements - // (offsets[i] == offsets[i+1]), encoded at def_level - 1 - // - non-empty: the list slot has child values, which are recursed into - // - // Consecutive runs of null or empty rows are batched and written together. - let write_null_run = |child: &mut LevelInfoBuilder, count: usize| { - if count > 0 { - child.visit_leaves(|leaf| { - leaf.append_rep_level_run(ctx.rep_level - 1, count); - leaf.append_def_level_run(ctx.def_level - 2, count); - }); - } + child.visit_leaves(|leaf| { + let rep_levels = leaf.rep_levels.materialize_mut().unwrap(); + let batch_len = values_end - values_start; + let batch_base = rep_levels.len() - batch_len; + for slot_offset in run_offsets.iter().take(run_offsets.len() - 1) { + let pos = batch_base + (slot_offset.as_usize() - values_start); + rep_levels[pos] = list_start_rep; + } + }); }; - let write_empty_run = |child: &mut LevelInfoBuilder, count: usize| { - if count > 0 { - child.visit_leaves(|leaf| { - leaf.append_rep_level_run(ctx.rep_level - 1, count); - leaf.append_def_level_run(ctx.def_level - 1, count); - }); - } - }; + Self::write_list_impl(child, ctx, offsets, nulls, range, emit_non_empty_run); + } - match nulls { - Some(nulls) => { - let null_offset = range.start; - let mut pending_nulls: usize = 0; - let mut pending_empties: usize = 0; + /// Batch write for lists whose child has nested repetition. + /// After batch-writing child elements, scans backward through rep_levels + /// counting child-element starts to find and stamp slot boundaries. + fn write_list_scan( + child: &mut LevelInfoBuilder, + ctx: &LevelContext, + offsets: &[O], + nulls: Option<&NullBuffer>, + range: Range, + ) { + let list_start_rep = ctx.rep_level - 1; - // TODO: Faster bitmask iteration (#1757) - for (idx, w) in offsets.windows(2).enumerate() { - let is_valid = nulls.is_valid(idx + null_offset); - let start_idx = w[0].as_usize(); - let end_idx = w[1].as_usize(); + let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: &[O]| { + let values_start = run_offsets[0].as_usize(); + let values_end = run_offsets[run_offsets.len() - 1].as_usize(); - if !is_valid { - write_empty_run(child, pending_empties); - pending_empties = 0; - pending_nulls += 1; - } else if start_idx == end_idx { - write_null_run(child, pending_nulls); - pending_nulls = 0; - pending_empties += 1; - } else { - write_null_run(child, pending_nulls); - pending_nulls = 0; - write_empty_run(child, pending_empties); - pending_empties = 0; - write_non_null_slice(child, start_idx, end_idx); - } - } - write_null_run(child, pending_nulls); - write_empty_run(child, pending_empties); - } - None => { - let mut pending_empties: usize = 0; - for w in offsets.windows(2) { - let start_idx = w[0].as_usize(); - let end_idx = w[1].as_usize(); - if start_idx == end_idx { - pending_empties += 1; - } else { - write_empty_run(child, pending_empties); - pending_empties = 0; - write_non_null_slice(child, start_idx, end_idx); + child.write(values_start..values_end); + + // Backward scan: count child-element starts (rep <= ctx.rep_level) + // and stamp list_start_rep at slot boundaries. + child.visit_leaves(|leaf| { + let rep_levels = leaf.rep_levels.materialize_mut().unwrap(); + let mut seen = 0usize; + let mut next_slot_rev = run_offsets.len() - 2; + let mut next_stamp_at = values_end - run_offsets[next_slot_rev].as_usize(); + + for rep in rep_levels.iter_mut().rev() { + if *rep <= ctx.rep_level { + seen += 1; + if seen == next_stamp_at { + *rep = list_start_rep; + if next_slot_rev > 0 { + next_slot_rev -= 1; + next_stamp_at = values_end - run_offsets[next_slot_rev].as_usize(); + } else { + break; + } + } } } - write_empty_run(child, pending_empties); - } - } + }); + }; + + Self::write_list_impl(child, ctx, offsets, nulls, range, emit_non_empty_run); } - /// Optimized write path for lists whose child has no nested repetition levels. - /// - /// When the child is a leaf (or a struct of leaves), each child element maps to - /// exactly one rep_level entry. This lets us batch contiguous non-empty list - /// slots into a single `child.write()` call, then stamp the list-start markers - /// at positions computed directly from offsets — avoiding per-slot `write` + - /// reverse-scan overhead. - fn write_list_last_level( + /// Shared run-classification loop for write_list_direct and write_list_scan. + /// Monomorphized per `emit_non_empty_run` closure type, giving the compiler + /// separate optimization contexts for each backfill strategy. + fn write_list_impl( child: &mut LevelInfoBuilder, ctx: &LevelContext, offsets: &[O], nulls: Option<&NullBuffer>, range: Range, + mut emit_non_empty_run: impl FnMut(&mut LevelInfoBuilder, &[O]), ) { let null_offset = range.start; let offsets = &offsets[range.start..range.end + 1]; @@ -496,33 +468,6 @@ impl LevelInfoBuilder { }); }; - let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: &[O]| { - debug_assert!(run_offsets.len() >= 2); - let values_start = run_offsets[0].as_usize(); - let values_end = run_offsets[run_offsets.len() - 1].as_usize(); - debug_assert!(values_end > values_start); - - // Write all leaf values in one batch. Since the child has no nested - // rep, this emits (values_end - values_start) rep_levels all equal - // to ctx.rep_level (= "continuation within list"). - child.write(values_start..values_end); - - // The first element of each list slot needs rep_level = - // list_start_rep to mark a new list boundary. Because there's a 1:1 - // mapping between child elements and rep_level entries, the position - // of each slot's first element is directly computable from offsets. - child.visit_leaves(|leaf| { - let rep_levels = leaf.rep_levels.materialize_mut().unwrap(); - let batch_len = values_end - values_start; - let batch_base = rep_levels.len() - batch_len; - - for slot_offset in run_offsets.iter().take(run_offsets.len() - 1) { - let list_start_pos = batch_base + (slot_offset.as_usize() - values_start); - rep_levels[list_start_pos] = list_start_rep; - } - }); - }; - // Classify each slot, detect run boundaries, flush on transition. #[derive(Clone, Copy, PartialEq)] enum SlotKind { From d5764044db10157f8fbce4240cef47c7b24b4d55 Mon Sep 17 00:00:00 2001 From: mwish Date: Sat, 6 Jun 2026 17:21:00 +0800 Subject: [PATCH 2/5] More comments and checking --- parquet/src/arrow/arrow_writer/levels.rs | 61 ++++++++++++++++-------- 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs index f5c7f327c881..93bb9eec4133 100644 --- a/parquet/src/arrow/arrow_writer/levels.rs +++ b/parquet/src/arrow/arrow_writer/levels.rs @@ -361,9 +361,6 @@ impl LevelInfoBuilder { } } - /// Batch write for lists whose child has no nested repetition. - /// Each child element produces exactly one rep_level entry, so list-start - /// positions are directly computable from offsets. fn write_list_direct( child: &mut LevelInfoBuilder, ctx: &LevelContext, @@ -374,12 +371,19 @@ impl LevelInfoBuilder { let list_start_rep = ctx.rep_level - 1; let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: &[O]| { + debug_assert!(run_offsets.len() >= 2); let values_start = run_offsets[0].as_usize(); let values_end = run_offsets[run_offsets.len() - 1].as_usize(); + debug_assert!(values_end > values_start); child.write(values_start..values_end); + // The first element of each list slot needs rep_level = + // list_start_rep to mark a new list boundary. Because there's a 1:1 + // mapping between child elements and rep_level entries, the position + // of each slot's first element is directly computable from offsets. child.visit_leaves(|leaf| { + debug_assert!(leaf.max_rep_level == ctx.rep_level); let rep_levels = leaf.rep_levels.materialize_mut().unwrap(); let batch_len = values_end - values_start; let batch_base = rep_levels.len() - batch_len; @@ -394,8 +398,11 @@ impl LevelInfoBuilder { } /// Batch write for lists whose child has nested repetition. + /// /// After batch-writing child elements, scans backward through rep_levels /// counting child-element starts to find and stamp slot boundaries. + /// + /// Scan backward because we don't know start offset before writing. fn write_list_scan( child: &mut LevelInfoBuilder, ctx: &LevelContext, @@ -406,29 +413,45 @@ impl LevelInfoBuilder { let list_start_rep = ctx.rep_level - 1; let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: &[O]| { + debug_assert!(run_offsets.len() >= 2); let values_start = run_offsets[0].as_usize(); let values_end = run_offsets[run_offsets.len() - 1].as_usize(); + debug_assert!(values_end > values_start); child.write(values_start..values_end); - // Backward scan: count child-element starts (rep <= ctx.rep_level) - // and stamp list_start_rep at slot boundaries. child.visit_leaves(|leaf| { let rep_levels = leaf.rep_levels.materialize_mut().unwrap(); - let mut seen = 0usize; - let mut next_slot_rev = run_offsets.len() - 2; - let mut next_stamp_at = values_end - run_offsets[next_slot_rev].as_usize(); - - for rep in rep_levels.iter_mut().rev() { - if *rep <= ctx.rep_level { - seen += 1; - if seen == next_stamp_at { - *rep = list_start_rep; - if next_slot_rev > 0 { - next_slot_rev -= 1; - next_stamp_at = values_end - run_offsets[next_slot_rev].as_usize(); - } else { - break; + + if leaf.max_rep_level == ctx.rep_level { + // This algorithm is same as write_list_direct. + // Use separate function because the branch code size would affect codegen + // quality of the hot loop of write_list_direct. + let batch_len = values_end - values_start; + let batch_base = rep_levels.len() - batch_len; + for slot_offset in run_offsets.iter().take(run_offsets.len() - 1) { + let pos = batch_base + (slot_offset.as_usize() - values_start); + rep_levels[pos] = list_start_rep; + } + } else { + // Backward scan: count child-element starts (rep <= ctx.rep_level) + // and stamp list_start_rep at slot boundaries. + let mut slot_bounds = run_offsets[..run_offsets.len() - 1].iter().rev(); + let mut next_stamp_at = + values_end - slot_bounds.next().unwrap().as_usize(); + let mut seen = 0usize; + + for rep in rep_levels.iter_mut().rev() { + if *rep <= ctx.rep_level { + seen += 1; + if seen == next_stamp_at { + *rep = list_start_rep; + match slot_bounds.next() { + Some(offset) => { + next_stamp_at = values_end - offset.as_usize() + } + None => break, + } } } } From 7f52901e4e71d2eb1de632e41d193003ce7b459b Mon Sep 17 00:00:00 2001 From: mwish Date: Sat, 6 Jun 2026 17:28:49 +0800 Subject: [PATCH 3/5] Enhancement --- parquet/src/arrow/arrow_writer/levels.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs index 93bb9eec4133..2e8cf787b27b 100644 --- a/parquet/src/arrow/arrow_writer/levels.rs +++ b/parquet/src/arrow/arrow_writer/levels.rs @@ -361,6 +361,9 @@ impl LevelInfoBuilder { } } + /// Batch write for lists whose child not has nested repetition. + /// + /// direct means direct write the child rep-levels by offsets without scan. fn write_list_direct( child: &mut LevelInfoBuilder, ctx: &LevelContext, @@ -437,19 +440,18 @@ impl LevelInfoBuilder { // Backward scan: count child-element starts (rep <= ctx.rep_level) // and stamp list_start_rep at slot boundaries. let mut slot_bounds = run_offsets[..run_offsets.len() - 1].iter().rev(); - let mut next_stamp_at = - values_end - slot_bounds.next().unwrap().as_usize(); + let mut next_stamp_at = values_end - slot_bounds.next().unwrap().as_usize(); let mut seen = 0usize; for rep in rep_levels.iter_mut().rev() { + // This can uses `==`, since list write is recursive and the child is written + // before the parent. if *rep <= ctx.rep_level { seen += 1; if seen == next_stamp_at { *rep = list_start_rep; match slot_bounds.next() { - Some(offset) => { - next_stamp_at = values_end - offset.as_usize() - } + Some(offset) => next_stamp_at = values_end - offset.as_usize(), None => break, } } From 22f7991adc826c74dc7706804faf6f30147eb636 Mon Sep 17 00:00:00 2001 From: mwish Date: Sat, 6 Jun 2026 19:31:30 +0800 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- parquet/src/arrow/arrow_writer/levels.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs index 2e8cf787b27b..6e9ebe79ff6e 100644 --- a/parquet/src/arrow/arrow_writer/levels.rs +++ b/parquet/src/arrow/arrow_writer/levels.rs @@ -361,9 +361,9 @@ impl LevelInfoBuilder { } } - /// Batch write for lists whose child not has nested repetition. + /// Batch write for lists whose child has no nested repetition. /// - /// direct means direct write the child rep-levels by offsets without scan. + /// "direct" means writing the child rep levels using offsets without scanning. fn write_list_direct( child: &mut LevelInfoBuilder, ctx: &LevelContext, @@ -427,8 +427,8 @@ impl LevelInfoBuilder { let rep_levels = leaf.rep_levels.materialize_mut().unwrap(); if leaf.max_rep_level == ctx.rep_level { - // This algorithm is same as write_list_direct. - // Use separate function because the branch code size would affect codegen + // This algorithm is the same as write_list_direct. + // Use a separate function because the branch code size would affect codegen // quality of the hot loop of write_list_direct. let batch_len = values_end - values_start; let batch_base = rep_levels.len() - batch_len; From 9e980e65a13bafca7114ac9fdffba05e732878d6 Mon Sep 17 00:00:00 2001 From: mwish Date: Sat, 6 Jun 2026 19:32:05 +0800 Subject: [PATCH 5/5] Update parquet/src/arrow/arrow_writer/levels.rs --- parquet/src/arrow/arrow_writer/levels.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs index 6e9ebe79ff6e..80901105385b 100644 --- a/parquet/src/arrow/arrow_writer/levels.rs +++ b/parquet/src/arrow/arrow_writer/levels.rs @@ -444,8 +444,11 @@ impl LevelInfoBuilder { let mut seen = 0usize; for rep in rep_levels.iter_mut().rev() { + // Count element starts by skipping nested reps (rep > ctx.rep_level). + // // This can uses `==`, since list write is recursive and the child is written - // before the parent. + // before the parent. However, benchmark shows there is no differences + // between them, so uses `<=` here. if *rep <= ctx.rep_level { seen += 1; if seen == next_stamp_at {