Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 54 additions & 67 deletions datafusion/functions/benches/left_right.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,40 @@
// under the License.

use std::hint::black_box;
use std::ops::Range;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field};
use arrow::util::bench_util::{
create_string_array_with_len, create_string_view_array_with_len,
};
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
use datafusion_functions::unicode::{left, right};

const BATCH_SIZE: usize = 8192;

fn create_args(
size: usize,
str_len: usize,
use_negative: bool,
n_range: Range<i64>,
is_string_view: bool,
) -> Vec<ColumnarValue> {
let string_arg = if is_string_view {
ColumnarValue::Array(Arc::new(create_string_view_array_with_len(
size, 0.1, str_len, true,
BATCH_SIZE, 0.1, str_len, true,
)))
} else {
ColumnarValue::Array(Arc::new(create_string_array_with_len::<i32>(
size, 0.1, str_len,
BATCH_SIZE, 0.1, str_len,
)))
};

// For negative n, we want to trigger the double-iteration code path
let n_values: Vec<i64> = if use_negative {
(0..size).map(|i| -((i % 10 + 1) as i64)).collect()
} else {
(0..size).map(|i| (i % 10 + 1) as i64).collect()
};
let n_span = (n_range.end - n_range.start) as usize;
let n_values: Vec<i64> = (0..BATCH_SIZE)
.map(|i| n_range.start + (i % n_span) as i64)
.collect();
let n_array = Arc::new(Int64Array::from(n_values));

vec![
Expand All @@ -59,68 +59,55 @@ fn create_args(
}

fn criterion_benchmark(c: &mut Criterion) {
let left_function = left();
let right_function = right();
// Short results (1-10 chars) produce inline StringView entries (≤12 bytes).
// Long results (20-29 chars) produce out-of-line entries.
let cases = [
("short_result", 32, 1..11_i64),
("long_result", 32, 20..30_i64),
];

for function in [left_function, right_function] {
for is_string_view in [false, true] {
for is_negative in [false, true] {
for size in [1024, 4096] {
let function_name = function.name();
let mut group =
c.benchmark_group(format!("{function_name} size={size}"));
for function in [left(), right()] {
let mut group = c.benchmark_group(function.name().to_string());

let bench_name = format!(
"{} {} n",
if is_string_view {
"string_view_array"
} else {
"string_array"
},
if is_negative { "negative" } else { "positive" },
);
let return_type = if is_string_view {
DataType::Utf8View
} else {
DataType::Utf8
};

let args = create_args(size, 32, is_negative, is_string_view);
group.bench_function(BenchmarkId::new(bench_name, size), |b| {
let arg_fields = args
.iter()
.enumerate()
.map(|(idx, arg)| {
Field::new(format!("arg_{idx}"), arg.data_type(), true)
.into()
})
.collect::<Vec<_>>();
let config_options = Arc::new(ConfigOptions::default());
for is_string_view in [false, true] {
let array_type = if is_string_view {
"string_view"
} else {
"string"
};

b.iter(|| {
black_box(
function
.invoke_with_args(ScalarFunctionArgs {
args: args.clone(),
arg_fields: arg_fields.clone(),
number_rows: size,
return_field: Field::new(
"f",
return_type.clone(),
true,
)
.into(),
config_options: Arc::clone(&config_options),
})
.expect("should work"),
)
})
});
for (case_name, str_len, n_range) in &cases {
let bench_name = format!("{array_type} {case_name}");
let args = create_args(*str_len, n_range.clone(), is_string_view);
let arg_fields: Vec<_> = args
.iter()
.enumerate()
.map(|(idx, arg)| {
Field::new(format!("arg_{idx}"), arg.data_type(), true).into()
})
.collect();
let config_options = Arc::new(ConfigOptions::default());
let return_field = Field::new("f", DataType::Utf8View, true).into();

group.finish();
}
group.bench_function(&bench_name, |b| {
b.iter(|| {
black_box(
function
.invoke_with_args(ScalarFunctionArgs {
args: args.clone(),
arg_fields: arg_fields.clone(),
number_rows: BATCH_SIZE,
return_field: Arc::clone(&return_field),
config_options: Arc::clone(&config_options),
})
.expect("should work"),
)
})
});
}
}

group.finish();
}
}

Expand Down
181 changes: 112 additions & 69 deletions datafusion/functions/src/unicode/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
//! Common utilities for implementing unicode functions

use arrow::array::{
Array, ArrayAccessor, ArrayIter, ArrayRef, ByteView, GenericStringArray, Int64Array,
OffsetSizeTrait, StringViewArray, make_view,
Array, ArrayRef, ByteView, GenericStringArray, Int64Array, OffsetSizeTrait,
StringViewArray, make_view,
};
use arrow::datatypes::DataType;
use arrow_buffer::{NullBuffer, ScalarBuffer};
use datafusion_common::Result;
use datafusion_common::ScalarValue;
use datafusion_common::cast::{
as_generic_string_array, as_int64_array, as_string_view_array,
Expand Down Expand Up @@ -99,17 +100,17 @@ fn left_right_byte_length(string: &str, n: i64) -> usize {
/// General implementation for `left` and `right` functions
pub(crate) fn general_left_right<F: LeftRightSlicer>(
args: &[ArrayRef],
) -> datafusion_common::Result<ArrayRef> {
) -> Result<ArrayRef> {
let n_array = as_int64_array(&args[1])?;

match args[0].data_type() {
DataType::Utf8 => {
let string_array = as_generic_string_array::<i32>(&args[0])?;
general_left_right_array::<i32, _, F>(string_array, n_array)
general_left_right_array::<i32, F>(string_array, n_array)
}
DataType::LargeUtf8 => {
let string_array = as_generic_string_array::<i64>(&args[0])?;
general_left_right_array::<i64, _, F>(string_array, n_array)
general_left_right_array::<i64, F>(string_array, n_array)
}
DataType::Utf8View => {
let string_view_array = as_string_view_array(&args[0])?;
Expand All @@ -119,83 +120,125 @@ pub(crate) fn general_left_right<F: LeftRightSlicer>(
}
}

/// `general_left_right` implementation for strings
fn general_left_right_array<
'a,
T: OffsetSizeTrait,
V: ArrayAccessor<Item = &'a str>,
F: LeftRightSlicer,
>(
string_array: V,
/// Returns true if all offsets in the array fit in u32, meaning the values
/// buffer can be referenced by StringView's u32 offset field.
fn values_fit_in_u32<T: OffsetSizeTrait>(string_array: &GenericStringArray<T>) -> bool {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also appears in #21366, I'll refactor to avoid code duplication in a followup PR.

string_array
.offsets()
.last()
.map(|offset| offset.as_usize() <= u32::MAX as usize)
.unwrap_or(true)
}

/// `left`/`right` for Utf8/LargeUtf8 input.
///
/// When offsets fit in u32, produces a zero-copy `StringViewArray` with views
/// pointing into the input values buffer. Otherwise falls back to building a
/// `StringViewArray` by copying.
fn general_left_right_array<T: OffsetSizeTrait, F: LeftRightSlicer>(
string_array: &GenericStringArray<T>,
n_array: &Int64Array,
) -> datafusion_common::Result<ArrayRef> {
let iter = ArrayIter::new(string_array);
let result = iter
.zip(n_array.iter())
.map(|(string, n)| match (string, n) {
(Some(string), Some(n)) => {
let range = F::slice(string, n);
// Extract a given range from a byte-indexed slice
Some(&string[range])
}
_ => None,
})
.collect::<GenericStringArray<T>>();
) -> Result<ArrayRef> {
if !values_fit_in_u32(string_array) {
let result = string_array
.iter()
.zip(n_array.iter())
.map(|(string, n)| match (string, n) {
(Some(string), Some(n)) => Some(&string[F::slice(string, n)]),
_ => None,
})
.collect::<StringViewArray>();
return Ok(Arc::new(result) as ArrayRef);
}

let len = string_array.len();
let offsets = string_array.value_offsets();
let nulls = NullBuffer::union(string_array.nulls(), n_array.nulls());

Ok(Arc::new(result) as ArrayRef)
let mut views_buf = Vec::with_capacity(len);
let mut has_out_of_line = false;

for (i, offset) in offsets.iter().enumerate().take(len) {
if nulls.as_ref().is_some_and(|n| !n.is_valid(i)) {
views_buf.push(0);
continue;
}

// SAFETY: we just checked validity above
let string = unsafe { string_array.value_unchecked(i) };
let n = n_array.value(i);
let range = F::slice(string, n);
let result_bytes = &string.as_bytes()[range.clone()];
if result_bytes.len() > 12 {
has_out_of_line = true;
}

let buf_offset = offset.as_usize() as u32 + range.start as u32;
views_buf.push(make_view(result_bytes, 0, buf_offset));
}

let views = ScalarBuffer::from(views_buf);
let data_buffers = if has_out_of_line {
vec![string_array.values().clone()]
} else {
vec![]
};

// SAFETY:
// - Each view is produced by `make_view` with correct bytes and offset
// - Out-of-line views reference buffer index 0, which is the original
// values buffer included in data_buffers when has_out_of_line is true
// - values_fit_in_u32 guarantees all offsets fit in u32
unsafe {
let array = StringViewArray::new_unchecked(views, data_buffers, nulls);
Ok(Arc::new(array) as ArrayRef)
}
}

/// `general_left_right` implementation for StringViewArray
/// `general_left_right` for StringViewArray input.
fn general_left_right_view<F: LeftRightSlicer>(
string_view_array: &StringViewArray,
n_array: &Int64Array,
) -> datafusion_common::Result<ArrayRef> {
let len = n_array.len();

) -> Result<ArrayRef> {
let views = string_view_array.views();
// Every string in StringViewArray has one corresponding view in `views`
debug_assert!(views.len() == string_view_array.len());

// Compose null buffer at once
let string_nulls = string_view_array.nulls();
let n_nulls = n_array.nulls();
let new_nulls = NullBuffer::union(string_nulls, n_nulls);
let new_nulls = NullBuffer::union(string_view_array.nulls(), n_array.nulls());
let len = n_array.len();
let mut has_out_of_line = false;

let new_views = (0..len)
.map(|idx| {
let view = views[idx];

let is_valid = match &new_nulls {
Some(nulls_buf) => nulls_buf.is_valid(idx),
None => true,
};

if is_valid {
let string: &str = string_view_array.value(idx);
let n = n_array.value(idx);

// Input string comes from StringViewArray, so it should fit in 32-bit length
let range = F::slice(string, n);
let result_bytes = &string.as_bytes()[range.clone()];

let byte_view = ByteView::from(view);
// New offset starts at 0 for left, and at `range.start` for right,
// which is encoded in the given range
let new_offset = byte_view.offset + (range.start as u32);
// Reuse buffer
make_view(result_bytes, byte_view.buffer_index, new_offset)
} else {
// For nulls, keep the original view
view
if new_nulls.as_ref().is_some_and(|n| !n.is_valid(idx)) {
return 0;
}

// SAFETY: we just checked validity above
let string: &str = unsafe { string_view_array.value_unchecked(idx) };
let n = n_array.value(idx);

let range = F::slice(string, n);
let result_bytes = &string.as_bytes()[range.clone()];
if result_bytes.len() > 12 {
has_out_of_line = true;
}

let byte_view = ByteView::from(views[idx]);
let new_offset = byte_view.offset + (range.start as u32);
make_view(result_bytes, byte_view.buffer_index, new_offset)
})
.collect::<Vec<u128>>();

// Buffers are unchanged
let result = StringViewArray::try_new(
ScalarBuffer::from(new_views),
Vec::from(string_view_array.data_buffers()),
new_nulls,
)?;
Ok(Arc::new(result) as ArrayRef)
let views = ScalarBuffer::from(new_views);
let data_buffers = if has_out_of_line {
string_view_array.data_buffers().to_vec()
} else {
vec![]
};

// SAFETY:
// - Each view is produced by `make_view` with correct bytes and offset
// - Out-of-line views reuse the original buffer index and adjusted offset
unsafe {
let array = StringViewArray::new_unchecked(views, data_buffers, new_nulls);
Ok(Arc::new(array) as ArrayRef)
}
}
Loading
Loading