-
Notifications
You must be signed in to change notification settings - Fork 335
feat: support size() for MapType input #4580
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 5 commits
4f17386
0b3c451
ff01bbc
92789ac
8e8f6c5
206fbd0
22407c5
4ed725f
1921d5c
a7f7529
90d59ca
d5ecc47
678f2d5
3b1c1f8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -198,6 +198,14 @@ fn spark_size_scalar(scalar: &ScalarValue) -> Result<ScalarValue, DataFusionErro | |
| Ok(ScalarValue::Int32(Some(len))) | ||
| } | ||
| } | ||
| ScalarValue::Map(array) => { | ||
| if array.is_null(0) { | ||
| Ok(ScalarValue::Int32(Some(-1))) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } else { | ||
| let len = array.value_length(0) as i32; | ||
| Ok(ScalarValue::Int32(Some(len))) | ||
|
marvelshan marked this conversation as resolved.
|
||
| } | ||
| } | ||
| ScalarValue::Null => { | ||
| Ok(ScalarValue::Int32(Some(-1))) // Spark behavior: return -1 for null | ||
| } | ||
|
|
@@ -276,78 +284,125 @@ mod tests { | |
| assert_eq!(result, ScalarValue::Int32(Some(-1))); | ||
| } | ||
|
|
||
| // TODO: Add map array test once Arrow MapArray API constraints are resolved | ||
| // Currently MapArray doesn't allow nulls in entries which makes testing complex | ||
| // The core size() implementation supports maps correctly | ||
| #[ignore] | ||
| #[test] | ||
| fn test_spark_size_map_array() { | ||
| use arrow::array::{MapArray, StringArray}; | ||
| use arrow::array::{Int32Array, MapArray, StringArray}; | ||
|
|
||
| // Create a simpler test with maps: | ||
| // [{"key1": "value1", "key2": "value2"}, {"key3": "value3"}, {}, null] | ||
| let keys = StringArray::from(vec![Some("key1"), Some("key2"), Some("key3")]); | ||
| let values = Int32Array::from(vec![Some(1), Some(2), Some(3)]); | ||
|
|
||
| // Create keys array for all entries (no nulls) | ||
| let keys = StringArray::from(vec!["key1", "key2", "key3"]); | ||
|
|
||
| // Create values array for all entries (no nulls) | ||
| let values = StringArray::from(vec!["value1", "value2", "value3"]); | ||
|
|
||
| // Create entry offsets: [0, 2, 3, 3, 3] representing: | ||
| // - Map 1: entries 0-1 (2 key-value pairs) | ||
| // - Map 2: entries 2-2 (1 key-value pair) | ||
| // - Map 3: no entries (offsets 3..3, empty map) | ||
| // - Map 4: no entries (offsets 3..3); marked null by the null buffer | ||
| let entry_offsets = arrow::buffer::OffsetBuffer::new(vec![0, 2, 3, 3, 3].into()); | ||
| let entry_offsets = arrow::buffer::OffsetBuffer::new(vec![0i32, 2, 3, 3, 3].into()); | ||
|
|
||
| let key_field = Arc::new(Field::new("key", DataType::Utf8, false)); | ||
| let value_field = Arc::new(Field::new("value", DataType::Utf8, false)); // Make values non-nullable too | ||
| let value_field = Arc::new(Field::new("value", DataType::Int32, true)); | ||
|
|
||
| // Create the entries struct array | ||
| let entries = arrow::array::StructArray::new( | ||
| arrow::datatypes::Fields::from(vec![key_field, value_field]), | ||
| vec![Arc::new(keys), Arc::new(values)], | ||
| None, // No nulls in the entries struct array itself | ||
| None, | ||
| ); | ||
|
|
||
| // Create null buffer for the map array (fourth map is null) | ||
| let mut null_buffer = NullBufferBuilder::new(4); | ||
| null_buffer.append(true); // Map with 2 entries - not null | ||
| null_buffer.append(true); // Map with 1 entry - not null | ||
| null_buffer.append(true); // Empty map - not null | ||
| null_buffer.append(false); // null map | ||
|
|
||
| let map_data_type = DataType::Map( | ||
| Arc::new(Field::new( | ||
| "entries", | ||
| DataType::Struct(arrow::datatypes::Fields::from(vec![ | ||
| Field::new("key", DataType::Utf8, false), | ||
| Field::new("value", DataType::Utf8, false), // Make values non-nullable too | ||
| ])), | ||
| false, | ||
| )), | ||
| false, // keys are not sorted | ||
| ); | ||
|
|
||
| let map_field = Arc::new(Field::new("map", map_data_type, true)); | ||
|
|
||
| let map_array = MapArray::new( | ||
| null_buffer.append(true); | ||
| null_buffer.append(true); | ||
| null_buffer.append(true); | ||
| null_buffer.append(false); | ||
|
|
||
| let map_field = Arc::new(Field::new( | ||
| "entries", | ||
| DataType::Struct(arrow::datatypes::Fields::from(vec![ | ||
| Field::new("key", DataType::Utf8, false), | ||
| Field::new("value", DataType::Int32, true), | ||
| ])), | ||
| false, | ||
| )); | ||
|
|
||
| let map_array = MapArray::try_new( | ||
| map_field, | ||
| entry_offsets, | ||
| entries, | ||
| null_buffer.finish(), | ||
| false, // keys are not sorted | ||
| ); | ||
| false, | ||
| ) | ||
| .unwrap(); | ||
|
|
||
| let array_ref: ArrayRef = Arc::new(map_array); | ||
| let result = spark_size_array(&array_ref).unwrap(); | ||
| let result = result.as_any().downcast_ref::<Int32Array>().unwrap(); | ||
|
|
||
| // Expected: [2, 1, 0, -1] | ||
| assert_eq!(result.value(0), 2); // Map with 2 key-value pairs | ||
| assert_eq!(result.value(1), 1); // Map with 1 key-value pair | ||
| assert_eq!(result.value(2), 0); // empty map has 0 pairs | ||
| assert_eq!(result.value(3), -1); // null map returns -1 | ||
| assert_eq!(result.value(0), 2); | ||
| assert_eq!(result.value(1), 1); | ||
| assert_eq!(result.value(2), 0); | ||
| assert_eq!(result.value(3), -1); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_spark_size_scalar_map() { | ||
| use arrow::array::{Int32Array, MapArray, StringArray}; | ||
|
|
||
| let keys = StringArray::from(vec![Some("a"), Some("b")]); | ||
| let values = Int32Array::from(vec![Some(1), Some(2)]); | ||
| let entry_offsets = arrow::buffer::OffsetBuffer::new(vec![0i32, 2].into()); | ||
|
|
||
| let key_field = Arc::new(Field::new("key", DataType::Utf8, false)); | ||
| let value_field = Arc::new(Field::new("value", DataType::Int32, true)); | ||
|
|
||
| let entries = arrow::array::StructArray::new( | ||
| arrow::datatypes::Fields::from(vec![key_field, value_field]), | ||
| vec![Arc::new(keys), Arc::new(values)], | ||
| None, | ||
| ); | ||
|
|
||
| let map_field = Arc::new(Field::new( | ||
| "entries", | ||
| DataType::Struct(arrow::datatypes::Fields::from(vec![ | ||
| Field::new("key", DataType::Utf8, false), | ||
| Field::new("value", DataType::Int32, true), | ||
| ])), | ||
| false, | ||
| )); | ||
|
|
||
| let map_array = MapArray::try_new(map_field, entry_offsets, entries, None, false).unwrap(); | ||
| let scalar = ScalarValue::Map(Arc::new(map_array)); | ||
| let result = spark_size_scalar(&scalar).unwrap(); | ||
| assert_eq!(result, ScalarValue::Int32(Some(2))); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_spark_size_scalar_null_map() { | ||
| use arrow::array::{Int32Array, MapArray, StringArray}; | ||
|
|
||
| let keys = StringArray::from(vec![Some("a")]); | ||
| let values = Int32Array::from(vec![Some(1)]); | ||
| let entry_offsets = arrow::buffer::OffsetBuffer::new(vec![0i32, 1].into()); | ||
|
|
||
| let key_field = Arc::new(Field::new("key", DataType::Utf8, false)); | ||
| let value_field = Arc::new(Field::new("value", DataType::Int32, true)); | ||
|
|
||
| let entries = arrow::array::StructArray::new( | ||
| arrow::datatypes::Fields::from(vec![key_field, value_field]), | ||
| vec![Arc::new(keys), Arc::new(values)], | ||
| None, | ||
| ); | ||
|
|
||
| let map_field = Arc::new(Field::new( | ||
| "entries", | ||
| DataType::Struct(arrow::datatypes::Fields::from(vec![ | ||
| Field::new("key", DataType::Utf8, false), | ||
| Field::new("value", DataType::Int32, true), | ||
| ])), | ||
| false, | ||
| )); | ||
|
|
||
| let mut null_buffer = NullBufferBuilder::new(1); | ||
| null_buffer.append(false); | ||
|
|
||
| let map_array = | ||
| MapArray::try_new(map_field, entry_offsets, entries, null_buffer.finish(), false) | ||
| .unwrap(); | ||
| let scalar = ScalarValue::Map(Arc::new(map_array)); | ||
| let result = spark_size_scalar(&scalar).unwrap(); | ||
| assert_eq!(result, ScalarValue::Int32(Some(-1))); | ||
| } | ||
|
|
||
| #[test] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,9 +21,21 @@ CREATE TABLE test_size(arr array<int>, m map<string, int>) USING parquet | |
| statement | ||
| INSERT INTO test_size VALUES (array(1, 2, 3), map('a', 1, 'b', 2)), (array(), map()), (NULL, NULL) | ||
|
|
||
| query spark_answer_only | ||
| query | ||
| SELECT size(arr), size(m) FROM test_size | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could you also add literal map cases below the existing literal-args query? Something like:
query
SELECT size(map('a', 1, 'b', 2)), size(map()), size(cast(NULL as map<string,int>))
That way the literal path is covered for both shapes. While you're here, a
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think literal maps are not supported yet.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done — all three items added:
ScalarValue::Map unit tests also added per review (ff01bbc), and stale docs notes cleared (b1d177a). Should I file a follow-up issue for |
||
|
|
||
| -- literal arguments | ||
| -- literal array arguments | ||
| query | ||
| SELECT size(array(1, 2, 3)), size(array()), size(cast(NULL as array<int>)) | ||
|
|
||
| -- literal map via CreateMap (falls back: Comet has no CreateMap serde; | ||
| -- cast(NULL as map) avoids CreateMap and goes through CometLiteral instead) | ||
| query spark_answer_only | ||
| SELECT size(map('a', 1, 'b', 2)), size(map()) | ||
|
|
||
| query | ||
| SELECT size(cast(NULL as map<string,int>)) | ||
|
|
||
| -- cardinality is a SQL alias for size | ||
| query | ||
| SELECT cardinality(arr), cardinality(m) FROM test_size | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| -- Licensed to the Apache Software Foundation (ASF) under one | ||
| -- or more contributor license agreements. See the NOTICE file | ||
| -- distributed with this work for additional information | ||
| -- regarding copyright ownership. The ASF licenses this file | ||
| -- to you under the Apache License, Version 2.0 (the | ||
| -- "License"); you may not use this file except in compliance | ||
| -- with the License. You may obtain a copy of the License at | ||
| -- | ||
| -- http://www.apache.org/licenses/LICENSE-2.0 | ||
| -- | ||
| -- Unless required by applicable law or agreed to in writing, | ||
| -- software distributed under the License is distributed on an | ||
| -- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| -- KIND, either express or implied. See the License for the | ||
| -- specific language governing permissions and limitations | ||
| -- under the License. | ||
|
|
||
| -- Config: spark.sql.legacy.sizeOfNull=false | ||
|
marvelshan marked this conversation as resolved.
Outdated
|
||
|
|
||
| statement | ||
| CREATE TABLE test_size_legacy_off(arr array<int>, m map<string, int>) USING parquet | ||
|
|
||
| statement | ||
| INSERT INTO test_size_legacy_off VALUES (array(1, 2, 3), map('a', 1, 'b', 2)), (array(), map()), (NULL, NULL) | ||
|
|
||
| -- With sizeOfNull=false, size(NULL) returns NULL instead of -1 | ||
| query | ||
| SELECT size(arr), size(m) FROM test_size_legacy_off | ||
|
|
||
| query | ||
| SELECT size(cast(NULL as array<int>)), size(cast(NULL as map<string,int>)) | ||
|
|
||
| query | ||
| SELECT cardinality(arr), cardinality(m) FROM test_size_legacy_off | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -126,23 +126,7 @@ class CometMapExpressionSuite extends CometTestBase { | |
| } | ||
| } | ||
|
|
||
| test("fallback for size with map input") { | ||
| withTempDir { dir => | ||
| withTempView("t1") { | ||
| val path = new Path(dir.toURI.toString, "test.parquet") | ||
| makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = true, 100) | ||
| spark.read.parquet(path.toString).createOrReplaceTempView("t1") | ||
|
|
||
| // Use column references in maps to avoid constant folding | ||
| checkSparkAnswerAndFallbackReason( | ||
| sql("SELECT size(case when _2 < 0 then map(_8, _9) else map() end) from t1"), | ||
| "size does not support map inputs") | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // fails with "map is not supported" | ||
| ignore("size with map input") { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this test still available?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it has been promoted from |
||
| test("size with map input") { | ||
| withTempDir { dir => | ||
| withTempView("t1") { | ||
| val path = new Path(dir.toURI.toString, "test.parquet") | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.