Commit 3dec709

HIVE-29544: Fix Vectorized Parquet reading Struct columns with all fields null (#6408)

1 parent 709b9e2 commit 3dec709

9 files changed: +176 -20 lines changed

iceberg/iceberg-handler/src/test/results/positive/iceberg_default_column.q.out

Lines changed: 1 addition & 1 deletion

@@ -174,7 +174,7 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 3	{"x":100,"y":99}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general	NULL
 4	{"x":100,"y":99}	NULL	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general	NULL
 5	{"x":100,"y":99}	custom_name	30	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general	NULL
-6	NULL	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general	NULL
+6	{"x":null,"y":null}	unknown	25	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general	NULL
 7	NULL	null	NULL	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general	NULL
 8	NULL	null	NULL	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general	{"name":"John","address":{"street":"Main St","city":"New York"}}
 9	NULL	null	NULL	50000.0	true	2024-01-01	2024-01-01 10:00:00	100.00	general	{"name":null,"address":{"street":null,"city":"Bangalore"}}

ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/BaseVectorizedColumnReader.java

Lines changed: 11 additions & 0 deletions

@@ -87,6 +87,9 @@ public abstract class BaseVectorizedColumnReader implements VectorizedColumnRead
   protected int definitionLevel;
   protected int repetitionLevel;
 
+  protected int[] currentDefLevels;
+  protected int defLevelIndex = 0;
+
   /**
    * Repetition/Definition/Value readers.
    */
@@ -154,6 +157,9 @@ public BaseVectorizedColumnReader(
   protected void readRepetitionAndDefinitionLevels() {
     repetitionLevel = repetitionLevelColumn.nextInt();
     definitionLevel = definitionLevelColumn.nextInt();
+    if (currentDefLevels != null && defLevelIndex < currentDefLevels.length) {
+      currentDefLevels[defLevelIndex++] = definitionLevel;
+    }
    valuesRead++;
  }
 
@@ -309,4 +315,9 @@ int nextInt() {
       return 0;
     }
   }
+
+  @Override
+  public int[] getDefinitionLevels() {
+    return currentDefLevels;
+  }
 }
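
Why the definition levels are captured at all: in Parquet, every value of a nested column carries a definition level recording how far down its chain of OPTIONAL ancestors was actually present. For the qtest schema further below (st_prim STRUCT<x:INT, y:INT>, stored as an optional group of two optional int32 fields), the struct is defined at level 1 and its leaves at level 2, so a leaf reading a level below 1 means the struct itself was absent. A minimal sketch of that interpretation; the class and constant names are illustrative, not part of the patch:

public class DefLevelDemo {
  // For: optional group st_prim { optional int32 x; optional int32 y; }
  // the struct is defined at level 1 and a present leaf value at level 2.
  static final int STRUCT_DEF_LEVEL = 1;

  static String interpret(int defLevel) {
    if (defLevel < STRUCT_DEF_LEVEL) {
      return "struct itself is NULL";              // e.g. row 2 of the new qtest
    } else if (defLevel == STRUCT_DEF_LEVEL) {
      return "struct present, this field is null"; // e.g. {"x":null,"y":null}
    } else {
      return "struct and field both present";
    }
  }

  public static void main(String[] args) {
    for (int dl : new int[] {0, 1, 2}) {
      System.out.println("defLevel=" + dl + " -> " + interpret(dl));
    }
  }
}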

ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java

Lines changed: 4 additions & 0 deletions

@@ -36,4 +36,8 @@ void readBatch(
       int total,
       ColumnVector column,
       TypeInfo columnType) throws IOException;
+
+  default int[] getDefinitionLevels() {
+    return null;
+  }
 }

ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java

Lines changed: 11 additions & 6 deletions

@@ -459,14 +459,14 @@ private void checkEndOfRowGroup() throws IOException {
          columnReaders[i] =
              buildVectorizedParquetReader(columnTypesList.get(colsToInclude.get(i)), types.get(i),
                  pages, requestedSchema.getColumns(), skipTimestampConversion, writerTimezone, skipProlepticConversion,
-                 legacyConversionEnabled, 0);
+                 legacyConversionEnabled, 0, 0);
        }
      }
    } else {
      for (int i = 0; i < types.size(); ++i) {
        columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(i), types.get(i), pages,
            requestedSchema.getColumns(), skipTimestampConversion, writerTimezone, skipProlepticConversion,
-           legacyConversionEnabled, 0);
+           legacyConversionEnabled, 0, 0);
      }
    }
 
@@ -522,7 +522,12 @@ private VectorizedColumnReader buildVectorizedParquetReader(
      ZoneId writerTimezone,
      boolean skipProlepticConversion,
      boolean legacyConversionEnabled,
-     int depth) throws IOException {
+     int depth, int currentDefLevel) throws IOException {
+
+   int typeDefLevel = currentDefLevel;
+   if (type.isRepetition(Type.Repetition.OPTIONAL) || type.isRepetition(Type.Repetition.REPEATED)) {
+     typeDefLevel++;
+   }
    List<ColumnDescriptor> descriptors =
        getAllColumnDescriptorByType(depth, type, columnDescriptors);
    // Support for schema evolution: if the column from the current
@@ -549,8 +554,8 @@ private VectorizedColumnReader buildVectorizedParquetReader(
        List<Type> types = type.asGroupType().getFields();
        for (int i = 0; i < fieldTypes.size(); i++) {
          VectorizedColumnReader r =
-             buildVectorizedParquetReader(fieldTypes.get(i), types.get(i), pages, descriptors,
-                 skipTimestampConversion, writerTimezone, skipProlepticConversion, legacyConversionEnabled, depth + 1);
+             buildVectorizedParquetReader(fieldTypes.get(i), types.get(i), pages, descriptors, skipTimestampConversion,
+                 writerTimezone, skipProlepticConversion, legacyConversionEnabled, depth + 1, typeDefLevel);
          if (r != null) {
            fieldReaders.add(r);
          } else {
@@ -559,7 +564,7 @@ private VectorizedColumnReader buildVectorizedParquetReader(
              .getTypeName() + " and Parquet type" + types.get(i).toString());
        }
      }
-     return new VectorizedStructColumnReader(fieldReaders);
+     return new VectorizedStructColumnReader(fieldReaders, typeDefLevel);
    case LIST:
      checkListColumnSupport(((ListTypeInfo) typeInfo).getListElementTypeInfo());
      if (columnDescriptors == null || columnDescriptors.isEmpty()) {
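
The builder now threads a running definition level through its recursive schema walk: the level grows by one for each OPTIONAL or REPEATED type on the path, so every struct reader is told the level at which it itself becomes defined. A standalone sketch of that bookkeeping, using a hypothetical Node record in place of Parquet's Type (names and structure here are illustrative, not the patch's API):

import java.util.List;

public class DefLevelWalk {
  enum Repetition { REQUIRED, OPTIONAL, REPEATED }

  // Hypothetical stand-in for org.apache.parquet.schema.Type.
  record Node(String name, Repetition repetition, List<Node> children) {}

  // Mirrors the patch's rule: OPTIONAL and REPEATED each add one level,
  // REQUIRED adds nothing, so a leaf ends up at its maximum definition level.
  static void printDefLevels(Node node, int currentDefLevel) {
    int typeDefLevel = currentDefLevel;
    if (node.repetition() != Repetition.REQUIRED) {
      typeDefLevel++;
    }
    System.out.println(node.name() + " -> defLevel " + typeDefLevel);
    for (Node child : node.children()) {
      printDefLevels(child, typeDefLevel);
    }
  }

  public static void main(String[] args) {
    Node schema = new Node("st_prim", Repetition.OPTIONAL, List.of(
        new Node("x", Repetition.OPTIONAL, List.of()),
        new Node("y", Repetition.OPTIONAL, List.of())));
    printDefLevels(schema, 0);  // st_prim -> 1, x -> 2, y -> 2
  }
}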

ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java

Lines changed: 2 additions & 0 deletions

@@ -64,6 +64,8 @@ public void readBatch(
       int total,
       ColumnVector column,
       TypeInfo columnType) throws IOException {
+    this.currentDefLevels = new int[total];
+    this.defLevelIndex = 0;
     int rowId = 0;
     while (total > 0) {
       // Compute the number of values we want to read in this page.

ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java

Lines changed: 23 additions & 6 deletions

@@ -29,9 +29,11 @@
 public class VectorizedStructColumnReader implements VectorizedColumnReader {
 
   private final List<VectorizedColumnReader> fieldReaders;
+  private final int structDefLevel;
 
-  public VectorizedStructColumnReader(List<VectorizedColumnReader> fieldReaders) {
+  public VectorizedStructColumnReader(List<VectorizedColumnReader> fieldReaders, int structDefLevel) {
     this.fieldReaders = fieldReaders;
+    this.structDefLevel = structDefLevel;
   }
 
   @Override
@@ -46,14 +48,29 @@ public void readBatch(
       fieldReaders.get(i)
           .readBatch(total, vectors[i], structTypeInfo.getAllStructFieldTypeInfos().get(i));
       structColumnVector.isRepeating = structColumnVector.isRepeating && vectors[i].isRepeating;
+    }
+    int[] defLevels = getDefinitionLevels();
 
-      for (int j = 0; j < vectors[i].isNull.length; j++) {
-        structColumnVector.isNull[j] =
-            (i == 0) ? vectors[i].isNull[j] : structColumnVector.isNull[j] && vectors[i].isNull[j];
+    // Evaluate struct nullability using Parquet Definition Levels
+    if (defLevels != null) {
+      for (int j = 0; j < total; j++) {
+        if (defLevels[j] < structDefLevel) {
+          // The Definition Level boundary crossed the struct. The whole struct is null.
+          structColumnVector.isNull[j] = true;
+          structColumnVector.noNulls = false;
+        }
       }
-      structColumnVector.noNulls =
-          (i == 0) ? vectors[i].noNulls : structColumnVector.noNulls && vectors[i].noNulls;
     }
+  }
 
+  @Override
+  public int[] getDefinitionLevels() {
+    for (VectorizedColumnReader reader : fieldReaders) {
+      int[] defLevels = reader.getDefinitionLevels();
+      if (defLevels != null) {
+        return defLevels;
+      }
+    }
+    return null;
+  }
 }
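
The replaced loop ANDed the children's isNull flags together, which cannot tell "struct is null" apart from "struct is present but every field is null" — the root cause of HIVE-29544. The new loop flags a row null only when its recorded definition level falls below the struct's own level. A self-contained sketch of that decision, with a hypothetical minimal stand-in for Hive's StructColumnVector (only the isNull/noNulls semantics are mirrored):

public class StructNullEval {
  // Hypothetical stand-in for Hive's StructColumnVector.
  static class StructVector {
    boolean[] isNull;
    boolean noNulls = true;
    StructVector(int n) { isNull = new boolean[n]; }
  }

  // Mirrors the patch: a definition level below the struct's own level
  // means the null occurred above the fields, i.e. the struct is null.
  static void applyDefLevels(StructVector v, int[] defLevels, int structDefLevel, int total) {
    if (defLevels == null) {
      return;
    }
    for (int j = 0; j < total; j++) {
      if (defLevels[j] < structDefLevel) {
        v.isNull[j] = true;
        v.noNulls = false;
      }
    }
  }

  public static void main(String[] args) {
    // Leaf "x" definition levels for the qtest's four rows:
    // {"x":null,"y":null} -> 1, NULL struct -> 0, {"x":3,...} -> 2, {"x":4,...} -> 2
    int[] defLevels = {1, 0, 2, 2};
    StructVector v = new StructVector(4);
    applyDefLevels(v, defLevels, /* structDefLevel */ 1, 4);
    for (int j = 0; j < 4; j++) {
      // Only row index 1 prints true: the struct itself was absent there.
      System.out.println("row " + j + " struct null? " + v.isNull[j]);
    }
  }
}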

ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java

Lines changed: 8 additions & 7 deletions

@@ -317,14 +317,15 @@ protected static void writeData(ParquetWriter<Group> writer, boolean isDictionar
     g.addGroup("nsf").append("c", intVal).append("d", intVal);
     g.append("e", doubleVal);
 
-    Group some_null_g = group.addGroup("struct_field_some_null");
-    if (i % 2 != 0) {
-      some_null_g.append("f", intVal);
-    }
-    if (i % 3 != 0) {
-      some_null_g.append("g", doubleVal);
+    if (i % 2 != 0 || i % 3 != 0) {
+      Group structFieldWithNulls = group.addGroup("struct_field_some_null");
+      if (i % 2 != 0) {
+        structFieldWithNulls.append("f", intVal);
+      }
+      if (i % 3 != 0) {
+        structFieldWithNulls.append("g", doubleVal);
+      }
     }
-
     Group mapGroup = group.addGroup("map_field");
     if (i % 13 != 1) {
       mapGroup.addGroup("map").append("key", binary).append("value", "abc");

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+-- SORT_QUERY_RESULTS
+SET hive.vectorized.execution.enabled=true;
+set hive.vectorized.execution.reduce.enabled=true;
+SET hive.fetch.task.conversion=none;
+
+CREATE TABLE test_parquet_struct_nulls (
+  id INT,
+  st_prim STRUCT<x:INT, y:INT>
+) STORED AS PARQUET;
+
+INSERT INTO test_parquet_struct_nulls VALUES
+  (1, named_struct('x', CAST(NULL AS INT), 'y', CAST(NULL AS INT))),
+  (2, if(1=0, named_struct('x', 1, 'y', 1), null)),
+  (3, named_struct('x', 3, 'y', CAST(NULL AS INT))),
+  (4, named_struct('x', 4, 'y', 4));
+
+-- Test A: Full table scan to check JSON representation
+SELECT * FROM test_parquet_struct_nulls;
+
+-- Test B: Verify IS NULL evaluates correctly
+SELECT id FROM test_parquet_struct_nulls WHERE st_prim IS NULL;
+
+-- Test C: Verify IS NOT NULL evaluates correctly
+SELECT id FROM test_parquet_struct_nulls WHERE st_prim IS NOT NULL;
+
+-- Test D: Verify field-level null evaluation inside a valid struct
+SELECT id FROM test_parquet_struct_nulls WHERE st_prim IS NOT NULL AND st_prim.x IS NULL;
+
+-- Validate without vectorization
+SET hive.vectorized.execution.enabled=false;
+SELECT * FROM test_parquet_struct_nulls;

Lines changed: 85 additions & 0 deletions

@@ -0,0 +1,85 @@
+PREHOOK: query: CREATE TABLE test_parquet_struct_nulls (
+  id INT,
+  st_prim STRUCT<x:INT, y:INT>
+) STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test_parquet_struct_nulls
+POSTHOOK: query: CREATE TABLE test_parquet_struct_nulls (
+  id INT,
+  st_prim STRUCT<x:INT, y:INT>
+) STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_parquet_struct_nulls
+PREHOOK: query: INSERT INTO test_parquet_struct_nulls VALUES
+  (1, named_struct('x', CAST(NULL AS INT), 'y', CAST(NULL AS INT))),
+  (2, if(1=0, named_struct('x', 1, 'y', 1), null)),
+  (3, named_struct('x', 3, 'y', CAST(NULL AS INT))),
+  (4, named_struct('x', 4, 'y', 4))
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@test_parquet_struct_nulls
+POSTHOOK: query: INSERT INTO test_parquet_struct_nulls VALUES
+  (1, named_struct('x', CAST(NULL AS INT), 'y', CAST(NULL AS INT))),
+  (2, if(1=0, named_struct('x', 1, 'y', 1), null)),
+  (3, named_struct('x', 3, 'y', CAST(NULL AS INT))),
+  (4, named_struct('x', 4, 'y', 4))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@test_parquet_struct_nulls
+POSTHOOK: Lineage: test_parquet_struct_nulls.id SCRIPT []
+POSTHOOK: Lineage: test_parquet_struct_nulls.st_prim SCRIPT []
+PREHOOK: query: SELECT * FROM test_parquet_struct_nulls
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_parquet_struct_nulls
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM test_parquet_struct_nulls
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_parquet_struct_nulls
+#### A masked pattern was here ####
+1	{"x":null,"y":null}
+2	NULL
+3	{"x":3,"y":null}
+4	{"x":4,"y":4}
+PREHOOK: query: SELECT id FROM test_parquet_struct_nulls WHERE st_prim IS NULL
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_parquet_struct_nulls
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT id FROM test_parquet_struct_nulls WHERE st_prim IS NULL
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_parquet_struct_nulls
+#### A masked pattern was here ####
+2
+PREHOOK: query: SELECT id FROM test_parquet_struct_nulls WHERE st_prim IS NOT NULL
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_parquet_struct_nulls
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT id FROM test_parquet_struct_nulls WHERE st_prim IS NOT NULL
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_parquet_struct_nulls
+#### A masked pattern was here ####
+1
+3
+4
+PREHOOK: query: SELECT id FROM test_parquet_struct_nulls WHERE st_prim IS NOT NULL AND st_prim.x IS NULL
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_parquet_struct_nulls
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT id FROM test_parquet_struct_nulls WHERE st_prim IS NOT NULL AND st_prim.x IS NULL
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_parquet_struct_nulls
+#### A masked pattern was here ####
+1
+PREHOOK: query: SELECT * FROM test_parquet_struct_nulls
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test_parquet_struct_nulls
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM test_parquet_struct_nulls
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test_parquet_struct_nulls
+#### A masked pattern was here ####
+1	{"x":null,"y":null}
+2	NULL
+3	{"x":3,"y":null}
+4	{"x":4,"y":4}

0 commit comments