Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 182 additions & 1 deletion arrow-select/src/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,72 @@ fn concat_lists<OffsetSize: OffsetSizeTrait>(
Ok(Arc::new(array))
}

/// Concatenates several map arrays into a single [`MapArray`].
///
/// * `arrays` - the inputs; each one is viewed as a map array via `as_map()`.
/// * `field` - the entries field handed to `MapArray::try_new` for the output.
/// * `ordered` - the "keys sorted" flag handed to `MapArray::try_new` for the output.
///
/// The entries of every input are concatenated with `concat`, the offsets are
/// rebuilt from the per-row entry counts, and a null buffer is produced only
/// when at least one input reports nulls.
///
/// # Errors
///
/// Propagates any error returned by concatenating the entries or by
/// `MapArray::try_new`.
fn concat_maps(

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This copy the logic for List, I don't know can we share a offset based template impl here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

arrays: &[&dyn Array],
field: &FieldRef,
ordered: bool,
) -> Result<ArrayRef, ArrowError> {
// Statistics gathered in a single pass over the inputs (see `inspect` below).
let mut output_len = 0;
let mut map_has_nulls = false;
let mut map_has_slices = false;

let maps = arrays
.iter()
.map(|x| x.as_map())
.inspect(|m| {
// Total number of map rows in the output.
output_len += m.len();
map_has_nulls |= m.null_count() != 0;
// An input counts as "sliced" when its offsets do not span the whole
// entries array: the first offset is past 0 or the last offset stops
// before the end of the entries.
map_has_slices |=
m.offsets()[0] > 0 || m.offsets().last().unwrap().as_usize() < m.entries().len();
})
.collect::<Vec<_>>();

// Build a combined validity buffer only if some input actually has nulls.
let map_nulls = map_has_nulls.then(|| {
let mut nulls = BooleanBufferBuilder::new(output_len);
for m in &maps {
match m.nulls() {
Some(n) => nulls.append_buffer(n.inner()),
// Inputs without a null buffer contribute `len` valid bits.
None => nulls.append_n(m.len(), true),
}
}
NullBuffer::new(nulls.finish())
});

// If any of the maps have slices, we need to slice the entries
// to ensure that the offsets are correct
// `sliced_entries` is declared here so the sliced arrays outlive the
// `&dyn Array` references collected into `entries`.
let mut sliced_entries: Vec<ArrayRef>;
let entries: Vec<&dyn Array> = if map_has_slices {
sliced_entries = Vec::with_capacity(maps.len());
for m in &maps {
let offsets = m.offsets();
// Keep only the entries range [first offset, last offset) referenced by this map.
let start_offset = offsets[0].as_usize();
let end_offset = offsets.last().unwrap().as_usize();
let entries_arr: &dyn Array = m.entries();
sliced_entries.push(entries_arr.slice(start_offset, end_offset - start_offset));
}
sliced_entries.iter().map(|a| a.as_ref()).collect()
} else {
// No input is sliced: the entries arrays can be used as they are.
maps.iter().map(|m| m.entries() as &dyn Array).collect()
};

let concatenated_entries = concat(entries.as_slice())?;

// Merge value offsets from the maps
// The output offsets are rebuilt from the per-row entry counts of every
// input, taken in input order.
let value_offset_buffer =
OffsetBuffer::<i32>::from_lengths(maps.iter().flat_map(|m| m.offsets().lengths()));

let array = MapArray::try_new(
Arc::clone(field),
value_offset_buffer,
// Note: Map entries are always StructArrays, so this downcast is expected to succeed
concatenated_entries.as_struct().clone(),
map_nulls,
ordered,
)?;

Ok(Arc::new(array))
}

fn concat_list_view<OffsetSize: OffsetSizeTrait>(
arrays: &[&dyn Array],
field: &FieldRef,
Expand Down Expand Up @@ -482,6 +548,7 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
DataType::LargeList(field) => concat_lists::<i64>(arrays, field),
DataType::ListView(field) => concat_list_view::<i32>(arrays, field),
DataType::LargeListView(field) => concat_list_view::<i64>(arrays, field),
DataType::Map(field, ordered) => concat_maps(arrays, field, *ordered),
DataType::Struct(fields) => concat_structs(arrays, fields),
DataType::Utf8 => concat_bytes::<Utf8Type>(arrays),
DataType::LargeUtf8 => concat_bytes::<LargeUtf8Type>(arrays),
Expand Down Expand Up @@ -561,7 +628,8 @@ pub fn concat_batches<'a>(
mod tests {
use super::*;
use arrow_array::builder::{
GenericListBuilder, Int64Builder, ListViewBuilder, StringDictionaryBuilder,
GenericListBuilder, Int32Builder as Int32ArrayBuilder, Int64Builder, ListViewBuilder,
MapBuilder, StringBuilder, StringDictionaryBuilder,
};
use arrow_schema::{Field, Schema};
use std::fmt::Debug;
Expand Down Expand Up @@ -1900,4 +1968,117 @@ mod tests {
assert_eq!(values.values(), &[10, 20, 30]);
assert_eq!(&[2, 3, 5], run_ends);
}

/// One row of a {String -> Int32} map used by the map concat tests.
///
/// `None` stands for a null row; `Some(entries)` holds the row's
/// `(key, optional value)` pairs, where an empty vector is a valid, empty map.
type StringIntMapRow<'a> = Option<Vec<(&'a str, Option<i32>)>>;

/// Test helper: builds a {String -> Int32} `MapArray` with one map row per element of `rows`.
///
/// A `None` element becomes a null row; a `Some` element becomes a valid row
/// holding the given `(key, optional value)` entries.
fn build_string_int_map(rows: Vec<StringIntMapRow>) -> MapArray {
    let mut map_builder = MapBuilder::new(None, StringBuilder::new(), Int32ArrayBuilder::new());
    for row in rows {
        // Remember validity before the row is consumed below.
        let is_valid = row.is_some();
        // A null row yields no entries, so nothing is appended for it.
        for (key, value) in row.into_iter().flatten() {
            map_builder.keys().append_value(key);
            map_builder.values().append_option(value);
        }
        map_builder.append(is_valid).unwrap();
    }
    map_builder.finish()
}

/// Concatenating two unsliced map arrays keeps row order, nulls, offsets,
/// keys and values.
#[test]
fn test_concat_map_arrays() {
    let first = build_string_int_map(vec![
        Some(vec![("a", Some(1)), ("b", Some(2))]),
        Some(vec![("c", Some(3))]),
    ]);
    let second = build_string_int_map(vec![
        Some(vec![("d", Some(4)), ("e", Some(5))]),
        None,
        Some(vec![("f", Some(6))]),
    ]);

    let concatenated = concat(&[&first, &second]).unwrap();
    let merged = concatenated.as_map();

    // 2 + 3 rows, with the single null row coming from `second`.
    assert_eq!(merged.len(), 5);
    assert_eq!(merged.null_count(), 1);

    // The null row contributes a zero-length entry range (5..5).
    assert_eq!(merged.value_offsets(), &[0, 2, 3, 5, 5, 6]);

    // Keys appear in input order.
    let key_array = merged.keys().as_string::<i32>();
    let actual_keys: Vec<&str> = key_array.iter().map(Option::unwrap).collect();
    let expected_keys: Vec<&str> = vec!["a", "b", "c", "d", "e", "f"];
    assert_eq!(actual_keys, expected_keys);

    // Values appear in input order.
    let value_array = merged.values().as_primitive::<Int32Type>();
    assert_eq!(value_array.values(), &[1, 2, 3, 4, 5, 6]);
}

/// Concatenating a sliced map array only carries over the entries that the
/// slice references, and the output offsets restart at zero.
#[test]
fn test_concat_map_arrays_sliced() {
    let full = build_string_int_map(vec![
        Some(vec![("a", Some(1))]),
        Some(vec![("b", Some(2)), ("c", Some(3))]),
        Some(vec![("d", Some(4))]),
        Some(vec![("e", Some(5))]),
    ]);

    // Rows 1 and 2 only: [("b", 2), ("c", 3)] and [("d", 4)].
    let middle = full.slice(1, 2);

    let tail = build_string_int_map(vec![Some(vec![("f", Some(6))])]);

    let concatenated = concat(&[&middle, &tail]).unwrap();
    let merged = concatenated.as_map();

    assert_eq!(merged.len(), 3);
    assert_eq!(merged.value_offsets(), &[0, 2, 3, 4]);

    // "a" and "e" lie outside the slice and must not show up.
    let key_array = merged.keys().as_string::<i32>();
    let actual_keys: Vec<&str> = key_array.iter().map(Option::unwrap).collect();
    assert_eq!(actual_keys, vec!["b", "c", "d", "f"]);
}

/// Null rows from every input end up at the matching positions of the output.
#[test]
fn test_concat_map_arrays_with_nulls() {
    let first = build_string_int_map(vec![Some(vec![("a", Some(1))]), None]);
    let second = build_string_int_map(vec![None, Some(vec![("b", Some(2))])]);

    let concatenated = concat(&[&first, &second]).unwrap();
    let merged = concatenated.as_map();

    assert_eq!(merged.len(), 4);
    assert_eq!(merged.null_count(), 2);

    // Expected layout: valid, null, null, valid.
    let validity: Vec<bool> = (0..4).map(|row| merged.is_valid(row)).collect();
    assert_eq!(validity, [true, false, false, true]);
}

/// Valid but empty map rows survive concatenation as zero-length offset ranges.
#[test]
fn test_concat_map_arrays_empty_maps() {
    let first = build_string_int_map(vec![Some(vec![]), Some(vec![("a", Some(1))])]);
    let second = build_string_int_map(vec![
        Some(vec![]),
        Some(vec![("b", Some(2)), ("c", Some(3))]),
    ]);

    let concatenated = concat(&[&first, &second]).unwrap();
    let merged = concatenated.as_map();

    // Empty rows are valid rows, so nothing is null.
    assert_eq!(merged.len(), 4);
    assert_eq!(merged.null_count(), 0);
    // Rows 0 and 2 are the empty maps (0..0 and 1..1).
    assert_eq!(merged.value_offsets(), &[0, 0, 1, 1, 3]);
}
}
Loading