diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/udf/GenericUDFIcebergBucket.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/udf/GenericUDFIcebergBucket.java index f23bfdfe0aea..0091887caa48 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/udf/GenericUDFIcebergBucket.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/udf/GenericUDFIcebergBucket.java @@ -19,12 +19,18 @@ package org.apache.iceberg.mr.hive.udf; import java.nio.ByteBuffer; +import java.util.List; +import java.util.Optional; import java.util.function.Function; import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ColStatistics; +import org.apache.hadoop.hive.ql.stats.estimator.StatEstimator; +import org.apache.hadoop.hive.ql.stats.estimator.StatEstimatorProvider; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.io.DateWritableV2; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -52,7 +58,7 @@ value = "_FUNC_(value, bucketCount) - " + "Returns the bucket value calculated by Iceberg bucket transform function ", extended = "Example:\n > SELECT _FUNC_('A bucket full of ice!', 5);\n 4") -public class GenericUDFIcebergBucket extends GenericUDF { +public class GenericUDFIcebergBucket extends GenericUDF implements StatEstimatorProvider { private final IntWritable result = new IntWritable(); private int numBuckets = -1; private transient PrimitiveObjectInspector argumentOI; @@ -209,4 +215,34 @@ public Object evaluate(DeferredObject[] arguments) throws HiveException { public String getDisplayString(String[] 
children) { return getStandardDisplayString("iceberg_bucket", children); } + + @Override + public StatEstimator getStatEstimator() { + return new BucketStatEstimator(numBuckets); + } + + static class BucketStatEstimator implements StatEstimator { + private final int numBuckets; + + BucketStatEstimator(int numBuckets) { + this.numBuckets = numBuckets; + } + + @Override + public Optional<ColStatistics> estimate(List<ColStatistics> argStats) { + if (argStats.isEmpty() || numBuckets <= 0) { + return Optional.empty(); + } + ColStatistics inputStats = argStats.get(0); + + ColStatistics result = new ColStatistics(); + result.setCountDistint(Math.min(inputStats.getCountDistint(), numBuckets)); + result.setNumNulls(inputStats.getNumNulls()); + result.setAvgColLen(JavaDataModel.get().primitive1()); + result.setRange(0, numBuckets - 1); + result.setIsEstimated(true); + + return Optional.of(result); + } + } } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/udf/TestGenericUDFIcebergBucketStatEstimator.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/udf/TestGenericUDFIcebergBucketStatEstimator.java new file mode 100644 index 000000000000..584d67be5004 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/udf/TestGenericUDFIcebergBucketStatEstimator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.udf; + +import java.util.Arrays; +import java.util.Optional; +import org.apache.hadoop.hive.ql.plan.ColStatistics; +import org.apache.hadoop.hive.ql.stats.estimator.StatEstimator; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for the BucketStatEstimator in GenericUDFIcebergBucket. + * Verifies that the StatEstimator correctly narrows NDV based on bucket count. + */ +public class TestGenericUDFIcebergBucketStatEstimator { + + @Test + public void testNdvNarrowedByBucketCount() { + // source NDV (100) > numBuckets (8) -> output NDV should be 8 + Optional<ColStatistics> result = estimateBucket(100, 8); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(8, result.get().getCountDistint()); + } + + @Test + public void testNdvBelowBucketCount() { + // source NDV (3) < numBuckets (8) -> output NDV should be 3 + Optional<ColStatistics> result = estimateBucket(3, 8); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(3, result.get().getCountDistint()); + } + + @Test + public void testNdvEqualsBucketCount() { + // source NDV (8) == numBuckets (8) -> output NDV should be 8 + Optional<ColStatistics> result = estimateBucket(8, 8); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(8, result.get().getCountDistint()); + } + + @Test + public void testZeroBucketsReturnsEmpty() { + Optional<ColStatistics> result = estimateBucket(100, 0); + Assert.assertFalse(result.isPresent()); + } + + private static Optional<ColStatistics> estimateBucket(long sourceNdv, int numBuckets) { + ColStatistics sourceStats = new ColStatistics("col", "int"); + 
sourceStats.setCountDistint(sourceNdv); + + StatEstimator estimator = new GenericUDFIcebergBucket.BucketStatEstimator(numBuckets); + return estimator.estimate(Arrays.asList(sourceStats)); + } +} diff --git a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q index 54e46a2a2609..f32529f13716 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q @@ -25,17 +25,19 @@ insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, ' insert into tbl_src values (10, 'EUR', 12), (20, 'EUR', 11), (30, 'USD', 100), (40, 'EUR', 10), (50, 'HUF', 30), (60, 'USD', 12), (70, 'USD', 20), (80, 'PLN', 100), (90, 'PLN', 18), (100, 'CZK', 12), (110, NULL, NULL); create external table tbl_target_identity (a int) partitioned by (ccy string) stored by iceberg stored as orc; +-- threshold = 0 (default, cost-based): NDV(b) = 5 > MAX_WRITERS (~3) -> sort (ClusteredWriter) explain insert overwrite table tbl_target_identity select a, b from tbl_src; insert overwrite table tbl_target_identity select a, b from tbl_src; select * from tbl_target_identity order by a, ccy; ---bucketed case - should invoke GenericUDFIcebergBucket to calculate buckets before sorting -create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc; +--bucketed case - should invoke GenericUDFIcebergBucket to estimate bucket NDV for sort decision +create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (3, ccy)) stored by iceberg stored as orc; +-- threshold = 0 (default, cost-based): bucket NDV = min(NDV(b) = 5, 3) = 3 <= MAX_WRITERS (~3) -> no sort (FanoutWriter) explain insert into table tbl_target_bucket select a, b from tbl_src; insert into table tbl_target_bucket select a, b from 
tbl_src; select * from tbl_target_bucket order by a, ccy; ---mixed case - 1 identity + 1 bucket cols +--mixed case - 1 identity + 1 bucket cols: NDV(b) * min(NDV(c) = 8, 3) = 5 * 3 = 15 > MAX_WRITERS (~3) -> sort (ClusteredWriter) create external table tbl_target_mixed (a int, ccy string, c bigint) partitioned by spec (ccy, bucket (3, c)) stored by iceberg stored as orc; explain insert into table tbl_target_mixed select * from tbl_src; insert into table tbl_target_mixed select * from tbl_src; @@ -43,11 +45,11 @@ select * from tbl_target_mixed order by a, ccy; select `partition` from default.tbl_target_mixed.partitions order by `partition`; select * from default.tbl_target_mixed.files; ---1 of 2 partition cols is folded with constant - should still sort +--b = 'EUR' folds ccy to constant: bucket NDV = min(NDV(c) = 8, 3) = 3 <= MAX_WRITERS (~3) -> no sort (FanoutWriter) explain insert into table tbl_target_mixed select * from tbl_src where b = 'EUR'; insert into table tbl_target_mixed select * from tbl_src where b = 'EUR'; ---all partitions cols folded - should not sort as it's not needed +--all partition cols folded (b = 'USD', c = 100) -> no sort (FanoutWriter) explain insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100; insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100; @@ -124,7 +126,7 @@ explain insert into tbl_month_timestamp values (88669, '2018-05-27 11:12:00', 20 insert into tbl_month_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018); select * from tbl_month_timestamp order by id, date_time_timestamp; ---day case - should invoke GenericUDFIcebergMonth to convert the date/timestamp value to day and use for clustering and sorting +--day case - should invoke GenericUDFIcebergDay to convert the date/timestamp value to day and use for clustering and sorting create external table tbl_day_date (id string, date_time_date 
date, year_partition int) partitioned by spec (year_partition, day(date_time_date)) stored by iceberg stored as parquet @@ -143,7 +145,7 @@ explain insert into tbl_day_timestamp values (88669, '2018-05-27 11:12:00', 2018 insert into tbl_day_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018); select * from tbl_day_timestamp order by id, date_time_timestamp; ---hour case - should invoke GenericUDFIcebergMonth to convert the date/timestamp value to day and use for clustering and sorting +--hour case - should invoke GenericUDFIcebergHour to convert the timestamp value to hour and use for clustering and sorting create external table tbl_hour_timestamp (id string, date_time_timestamp timestamp, year_partition int) partitioned by spec (year_partition, hour(date_time_timestamp)) stored by iceberg stored as parquet @@ -151,4 +153,31 @@ tblproperties ('parquet.compression'='snappy','format-version'='2'); explain insert into tbl_hour_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018); insert into tbl_hour_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018); -select * from tbl_hour_timestamp order by id, date_time_timestamp; \ No newline at end of file +select * from tbl_hour_timestamp order by id, date_time_timestamp; + +-- threshold = -1: never sort -> FanoutWriter +set hive.optimize.sort.dynamic.partition.threshold=-1; +explain insert into tbl_target_identity select a, b from tbl_src; +explain insert into tbl_target_bucket select a, b from tbl_src; + +-- threshold = 1: always sort -> ClusteredWriter +set hive.optimize.sort.dynamic.partition.threshold=1; +explain insert into tbl_target_identity select a, b from tbl_src; +explain insert into tbl_target_bucket select a, b from tbl_src; + +-- threshold = 2: bucket NDV = min(NDV(b) = 5, 3) = 3 > 2 
-> sort (ClusteredWriter) +set hive.optimize.sort.dynamic.partition.threshold=2; +explain insert into tbl_target_identity select a, b from tbl_src; +explain insert into tbl_target_bucket select a, b from tbl_src; + +-- threshold = 100: bucket NDV = min(NDV(b) = 5, 3) = 3 <= 100 -> no sort (FanoutWriter) +set hive.optimize.sort.dynamic.partition.threshold=100; +explain insert into tbl_target_identity select a, b from tbl_src; +explain insert into tbl_target_bucket select a, b from tbl_src; + +-- write.fanout.enabled=false: SerDe forces threshold=1 -> always ClusteredWriter +set hive.optimize.sort.dynamic.partition.threshold=0; +drop table if exists tbl_target_nofanout; +create external table tbl_target_nofanout (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc + tblproperties ('write.fanout.enabled'='false'); +explain insert into tbl_target_nofanout select a, b from tbl_src; diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition.q index 5b76277c9453..9792d09a3f0e 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition.q @@ -18,7 +18,9 @@ -- Mask iceberg version --! 
qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/ +set hive.optimize.sort.dynamic.partition.threshold=1; set hive.explain.user=false; + create external table ice_parquet_int( strcol string, intcol integer diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition_transforms.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition_transforms.q index e5be1af2632c..4684079c0080 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition_transforms.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_into_partition_transforms.q @@ -18,7 +18,9 @@ -- Mask iceberg version --! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/ +set hive.optimize.sort.dynamic.partition.threshold=1; set hive.explain.user=false; + create external table ice_parquet_date_transform_year( bigintcol bigint, intcol integer, diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_overwrite_partition.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_overwrite_partition.q index 613d77964631..4fedbf00f183 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_overwrite_partition.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_insert_overwrite_partition.q @@ -18,7 +18,9 @@ -- Mask iceberg version --! 
qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/ +set hive.optimize.sort.dynamic.partition.threshold=1; set hive.explain.user=false; + create external table ice_parquet_int( strcol string, intcol integer diff --git a/iceberg/iceberg-handler/src/test/queries/positive/merge_iceberg_partitioned_orc.q b/iceberg/iceberg-handler/src/test/queries/positive/merge_iceberg_partitioned_orc.q index f59761621383..379e336f608b 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/merge_iceberg_partitioned_orc.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/merge_iceberg_partitioned_orc.q @@ -1,4 +1,5 @@ -- SORT_QUERY_RESULTS +set hive.optimize.sort.dynamic.partition.threshold=1; set hive.explain.user=false; create external table target_ice(a int, b string, c int) partitioned by spec (bucket(16, a), truncate(3, b)) stored by iceberg stored as orc tblproperties ('format-version'='2'); diff --git a/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_copy_on_write_partitioned.q b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_copy_on_write_partitioned.q index 4e9632944426..5bf5b3eb27f5 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_copy_on_write_partitioned.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/update_iceberg_copy_on_write_partitioned.q @@ -1,3 +1,4 @@ +set hive.optimize.sort.dynamic.partition.threshold=1; set hive.explain.user=false; drop table if exists tbl_ice; diff --git a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out index 6d8325af0158..0c58a118a7ce 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out @@ -207,11 +207,11 @@ POSTHOOK: Output: hdfs://### HDFS PATH ### 90 PLN 100 CZK 110 
NULL -PREHOOK: query: create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc +PREHOOK: query: create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (3, ccy)) stored by iceberg stored as orc PREHOOK: type: CREATETABLE PREHOOK: Output: database:default PREHOOK: Output: default@tbl_target_bucket -POSTHOOK: query: create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc +POSTHOOK: query: create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (3, ccy)) stored by iceberg stored as orc POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@tbl_target_bucket @@ -227,7 +227,6 @@ Plan optimized by CBO. Vertex dependency in root stage Reducer 2 <- Map 1 (SIMPLE_EDGE) -Reducer 3 <- Map 1 (SIMPLE_EDGE) Stage-3 Stats Work{} @@ -238,31 +237,25 @@ Stage-3 Dependency Collection{} Stage-1 Reducer 2 vectorized - File Output Operator [FS_18] - table:{"name:":"default.tbl_target_bucket"} - Select Operator [SEL_17] - Output:["_col0","_col1","iceberg_bucket(_col1, 2)"] - <-Map 1 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_13] - PartitionCols:iceberg_bucket(_col1, 2) - Select Operator [SEL_12] (rows=22 width=87) - Output:["_col0","_col1"] - TableScan [TS_0] (rows=22 width=87) - default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] - Reducer 3 vectorized - File Output Operator [FS_21] - Select Operator [SEL_20] (rows=5 width=574) + File Output Operator [FS_17] + Select Operator [SEL_16] (rows=3 width=574) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] - Group By Operator [GBY_19] (rows=5 width=336) + Group By Operator [GBY_15] (rows=3 width=336) 
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 <-Map 1 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_16] + File Output Operator [FS_11] + table:{"name:":"default.tbl_target_bucket"} + Select Operator [SEL_10] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + SHUFFLE [RS_14] PartitionCols:_col0 - Group By Operator [GBY_15] (rows=5 width=404) - Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) - Select Operator [SEL_14] (rows=22 width=87) + Group By Operator [GBY_13] (rows=4 width=404) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) + Select Operator [SEL_12] (rows=22 width=87) Output:["a","ccy"] - Please refer to the previous Select Operator [SEL_12] + Please refer to the previous Select Operator [SEL_10] PREHOOK: query: insert into table tbl_target_bucket select a, b from tbl_src PREHOOK: type: QUERY @@ -458,7 +451,6 @@ Plan optimized by CBO. 
Vertex dependency in root stage Reducer 2 <- Map 1 (SIMPLE_EDGE) -Reducer 3 <- Map 1 (SIMPLE_EDGE) Stage-3 Stats Work{} @@ -469,33 +461,27 @@ Stage-3 Dependency Collection{} Stage-1 Reducer 2 vectorized - File Output Operator [FS_21] - table:{"name:":"default.tbl_target_mixed"} - Select Operator [SEL_20] - Output:["_col0","_col1","_col2","_col1","iceberg_bucket(_col2, 3)"] - <-Map 1 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_16] - PartitionCols:_col1, iceberg_bucket(_col2, 3) - Select Operator [SEL_15] (rows=4 width=99) - Output:["_col0","_col1","_col2"] - Filter Operator [FIL_14] (rows=4 width=99) - predicate:(b = 'EUR') - TableScan [TS_0] (rows=22 width=94) - default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"] - Reducer 3 vectorized - File Output Operator [FS_24] - Select Operator [SEL_23] (rows=4 width=1030) + File Output Operator [FS_20] + Select Operator [SEL_19] (rows=3 width=1030) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17","_col18"] - Group By Operator [GBY_22] (rows=4 width=591) + Group By Operator [GBY_18] (rows=3 width=591) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)","min(VALUE._col9)","max(VALUE._col10)","count(VALUE._col11)","compute_bit_vector_hll(VALUE._col12)"],keys:KEY._col0, KEY._col1 <-Map 1 [SIMPLE_EDGE] vectorized - SHUFFLE [RS_19] + File Output Operator [FS_14] + table:{"name:":"default.tbl_target_mixed"} + Select Operator [SEL_13] (rows=4 width=99) + Output:["_col0","_col1","_col2"] + Filter Operator [FIL_12] (rows=4 width=99) + predicate:(b = 'EUR') + TableScan [TS_0] (rows=22 width=94) + 
default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"] + SHUFFLE [RS_17] PartitionCols:_col0, _col1 - Group By Operator [GBY_18] (rows=4 width=659) + Group By Operator [GBY_16] (rows=3 width=659) Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)","min(c)","max(c)","count(c)","compute_bit_vector_hll(c)"],keys:ccy, iceberg_bucket(c, 3) - Select Operator [SEL_17] (rows=4 width=99) + Select Operator [SEL_15] (rows=4 width=99) Output:["a","ccy","c"] - Please refer to the previous Select Operator [SEL_15] + Please refer to the previous Select Operator [SEL_13] PREHOOK: query: insert into table tbl_target_mixed select * from tbl_src where b = 'EUR' PREHOOK: type: QUERY @@ -1653,3 +1639,432 @@ POSTHOOK: Output: hdfs://### HDFS PATH ### 40568 2018-02-12 12:45:56 2018 40568 2018-07-03 06:07:56 2018 88669 2018-05-27 11:12:00 2018 +PREHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_identity +POSTHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_identity +Plan optimized by CBO. 
+ +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_identity"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_17] + Select Operator [SEL_16] (rows=6 width=754) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_15] (rows=6 width=419) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + File Output Operator [FS_11] + table:{"name:":"default.tbl_target_identity"} + Select Operator [SEL_10] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + SHUFFLE [RS_14] + PartitionCols:_col0 + Group By Operator [GBY_13] (rows=6 width=487) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:ccy + Select Operator [SEL_12] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_10] + +PREHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_bucket +POSTHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_bucket +Plan optimized by CBO. 
+ +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_bucket"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_17] + Select Operator [SEL_16] (rows=3 width=574) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_15] (rows=3 width=336) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + File Output Operator [FS_11] + table:{"name:":"default.tbl_target_bucket"} + Select Operator [SEL_10] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + SHUFFLE [RS_14] + PartitionCols:_col0 + Group By Operator [GBY_13] (rows=4 width=404) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) + Select Operator [SEL_12] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_10] + +PREHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_identity +POSTHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_identity +Plan 
optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) +Reducer 3 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_identity"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_18] + table:{"name:":"default.tbl_target_identity"} + Select Operator [SEL_17] + Output:["_col0","_col1","_col1"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_13] + PartitionCols:_col1 + Select Operator [SEL_12] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + Reducer 3 vectorized + File Output Operator [FS_21] + Select Operator [SEL_20] (rows=6 width=754) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_19] (rows=6 width=419) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_16] + PartitionCols:_col0 + Group By Operator [GBY_15] (rows=6 width=487) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:ccy + Select Operator [SEL_14] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_12] + +PREHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_bucket 
+POSTHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_bucket +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) +Reducer 3 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_bucket"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_18] + table:{"name:":"default.tbl_target_bucket"} + Select Operator [SEL_17] + Output:["_col0","_col1","iceberg_bucket(_col1, 3)"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_13] + PartitionCols:iceberg_bucket(_col1, 3) + Select Operator [SEL_12] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + Reducer 3 vectorized + File Output Operator [FS_21] + Select Operator [SEL_20] (rows=3 width=574) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_19] (rows=3 width=336) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_16] + PartitionCols:_col0 + Group By Operator [GBY_15] (rows=4 width=404) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) + Select Operator [SEL_14] (rows=22 width=87) + Output:["a","ccy"] + 
Please refer to the previous Select Operator [SEL_12] + +PREHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_identity +POSTHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_identity +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) +Reducer 3 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_identity"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_18] + table:{"name:":"default.tbl_target_identity"} + Select Operator [SEL_17] + Output:["_col0","_col1","_col1"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_13] + PartitionCols:_col1 + Select Operator [SEL_12] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + Reducer 3 vectorized + File Output Operator [FS_21] + Select Operator [SEL_20] (rows=6 width=754) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_19] (rows=6 width=419) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_16] + PartitionCols:_col0 + Group By Operator [GBY_15] (rows=6 width=487) + 
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:ccy + Select Operator [SEL_14] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_12] + +PREHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_bucket +POSTHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_bucket +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) +Reducer 3 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_bucket"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_18] + table:{"name:":"default.tbl_target_bucket"} + Select Operator [SEL_17] + Output:["_col0","_col1","iceberg_bucket(_col1, 3)"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_13] + PartitionCols:iceberg_bucket(_col1, 3) + Select Operator [SEL_12] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + Reducer 3 vectorized + File Output Operator [FS_21] + Select Operator [SEL_20] (rows=3 width=574) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_19] (rows=3 width=336) + 
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_16] + PartitionCols:_col0 + Group By Operator [GBY_15] (rows=4 width=404) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) + Select Operator [SEL_14] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_12] + +PREHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_identity +POSTHOOK: query: explain insert into tbl_target_identity select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_identity +Plan optimized by CBO. 
+ +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_identity"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_17] + Select Operator [SEL_16] (rows=6 width=754) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_15] (rows=6 width=419) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + File Output Operator [FS_11] + table:{"name:":"default.tbl_target_identity"} + Select Operator [SEL_10] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + SHUFFLE [RS_14] + PartitionCols:_col0 + Group By Operator [GBY_13] (rows=6 width=487) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:ccy + Select Operator [SEL_12] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_10] + +PREHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_bucket +POSTHOOK: query: explain insert into tbl_target_bucket select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_bucket +Plan optimized by CBO. 
+ +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_bucket"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_17] + Select Operator [SEL_16] (rows=3 width=574) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_15] (rows=3 width=336) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + File Output Operator [FS_11] + table:{"name:":"default.tbl_target_bucket"} + Select Operator [SEL_10] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + SHUFFLE [RS_14] + PartitionCols:_col0 + Group By Operator [GBY_13] (rows=4 width=404) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 3) + Select Operator [SEL_12] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_10] + +PREHOOK: query: drop table if exists tbl_target_nofanout +PREHOOK: type: DROPTABLE +PREHOOK: Output: database:default +POSTHOOK: query: drop table if exists tbl_target_nofanout +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: database:default +PREHOOK: query: create external table tbl_target_nofanout (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored 
as orc + tblproperties ('write.fanout.enabled'='false') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tbl_target_nofanout +POSTHOOK: query: create external table tbl_target_nofanout (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc + tblproperties ('write.fanout.enabled'='false') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tbl_target_nofanout +PREHOOK: query: explain insert into tbl_target_nofanout select a, b from tbl_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_src +PREHOOK: Output: default@tbl_target_nofanout +POSTHOOK: query: explain insert into tbl_target_nofanout select a, b from tbl_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_src +POSTHOOK: Output: default@tbl_target_nofanout +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE) +Reducer 3 <- Map 1 (SIMPLE_EDGE) + +Stage-3 + Stats Work{} + Stage-0 + Move Operator + table:{"name:":"default.tbl_target_nofanout"} + Stage-2 + Dependency Collection{} + Stage-1 + Reducer 2 vectorized + File Output Operator [FS_18] + table:{"name:":"default.tbl_target_nofanout"} + Select Operator [SEL_17] + Output:["_col0","_col1","iceberg_bucket(_col1, 2)"] + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_13] + PartitionCols:iceberg_bucket(_col1, 2) + Select Operator [SEL_12] (rows=22 width=87) + Output:["_col0","_col1"] + TableScan [TS_0] (rows=22 width=87) + default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b"] + Reducer 3 vectorized + File Output Operator [FS_21] + Select Operator [SEL_20] (rows=2 width=574) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"] + Group By Operator [GBY_19] (rows=2 width=336) + 
Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)"],keys:KEY._col0 + <-Map 1 [SIMPLE_EDGE] vectorized + SHUFFLE [RS_16] + PartitionCols:_col0 + Group By Operator [GBY_15] (rows=3 width=404) + Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)"],keys:iceberg_bucket(ccy, 2) + Select Operator [SEL_14] (rows=22 width=87) + Output:["a","ccy"] + Please refer to the previous Select Operator [SEL_12] + diff --git a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out index c9938d7a2236..1f837c4b586e 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out @@ -53,8 +53,8 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola', POSTHOOK: type: QUERY POSTHOOK: Input: _dummy_database@_dummy_table POSTHOOK: Output: default@tbl_ice -Warning: Shuffle Join MERGEJOIN[66][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product -Warning: Shuffle Join MERGEJOIN[68][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product +Warning: Shuffle Join MERGEJOIN[64][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product +Warning: Shuffle Join MERGEJOIN[66][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product PREHOOK: query: update tbl_ice set 
b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800) PREHOOK: type: QUERY PREHOOK: Input: default@tbl_ice diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java index a057f4137e37..f5431fa34934 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.orc.OrcConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -210,10 +211,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, LinkedList customSortOrder = new LinkedList<>(dpCtx.getCustomSortOrder()); LinkedList customNullOrder = new LinkedList<>(dpCtx.getCustomSortNullOrder()); - // If custom expressions (partition or sort) are present, there is an explicit requirement to do sorting - if (customPartitionExprs.isEmpty() && customSortExprs.isEmpty() && !shouldDo(partitionPositions, fsParent)) { + // If custom sort expressions are present, there is an explicit requirement to do sorting. + // Custom partition expressions are evaluated inside shouldDo based on column stats. + if (customSortExprs.isEmpty() && !shouldDo(partitionPositions, customPartitionExprs, fsParent, allRSCols)) { return null; } + // if RS is inserted by enforce bucketing or sorting, we need to remove it // since ReduceSinkDeDuplication will not merge them to single RS. 
// RS inserted by enforce bucketing/sorting will have bucketing column in @@ -228,6 +231,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, return null; } + // Mark that sorting will be applied with custom partition expressions, so the writer layer + // (e.g. Iceberg) knows the input is ordered and can use a clustered writer. + if (!customPartitionExprs.isEmpty()) { + dpCtx.setHasCustomPartitionOrSortExpression(true); + } + // unlink connection between FS and its parent fsParent = fsOp.getParentOperators().get(0); // store the index of the file sink operator to later insert the modified operator with RS at the same position @@ -853,22 +862,25 @@ private ArrayList getPositionsToExprNodes(List pos, // The way max number of writers allowed are computed based on // (executor/container memory) * (percentage of memory taken by orc) // and dividing that by max memory (stripe size) taken by a single writer. - private boolean shouldDo(List partitionPos, Operator fsParent) { + private boolean shouldDo(List partitionPos, + List, ExprNodeDesc>> customPartitionExprs, + Operator fsParent, + ArrayList allRSCols) { int threshold = HiveConf.getIntVar(this.parseCtx.getConf(), HiveConf.ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD); long MAX_WRITERS = -1; switch (threshold) { - case -1: - return false; - case 0: - break; - case 1: - return true; - default: - MAX_WRITERS = threshold; - break; + case -1: + return false; + case 0: + break; + case 1: + return true; + default: + MAX_WRITERS = threshold; + break; } Statistics tStats = fsParent.getStatistics(); @@ -880,34 +892,73 @@ private boolean shouldDo(List partitionPos, Operator sort + return true; } if (MAX_WRITERS < 0) { - double orcMemPool = this.parseCtx.getConf().getDouble(OrcConf.MEMORY_POOL.getHiveConfName(), - (Double) OrcConf.MEMORY_POOL.getDefaultValue()); - long orcStripSize = this.parseCtx.getConf().getLong(OrcConf.STRIPE_SIZE.getHiveConfName(), - (Long) OrcConf.STRIPE_SIZE.getDefaultValue()); 
- MemoryInfo memoryInfo = new MemoryInfo(this.parseCtx.getConf()); - LOG.debug("Memory info during SDPO opt: {}", memoryInfo); - long executorMem = memoryInfo.getMaxExecutorMemory(); - MAX_WRITERS = (long) (executorMem * orcMemPool) / orcStripSize; + MAX_WRITERS = computeMaxWriters(); + } + return partCardinality > MAX_WRITERS; + } + + private long computeMaxWriters() { + double orcMemPool = this.parseCtx.getConf().getDouble(OrcConf.MEMORY_POOL.getHiveConfName(), + (Double) OrcConf.MEMORY_POOL.getDefaultValue()); + long orcStripSize = this.parseCtx.getConf().getLong(OrcConf.STRIPE_SIZE.getHiveConfName(), + (Long) OrcConf.STRIPE_SIZE.getDefaultValue()); + MemoryInfo memoryInfo = new MemoryInfo(this.parseCtx.getConf()); + LOG.debug("Memory info during SDPO opt: {}", memoryInfo); + long executorMem = memoryInfo.getMaxExecutorMemory(); + return (long) (executorMem * orcMemPool) / orcStripSize; + } + + /** + * Computes the partition cardinality based on column NDV statistics. + * @return positive value = estimated cardinality, 0 = no partition columns, -1 = stats unavailable + */ + private long computePartCardinality(List partitionPos, + List, ExprNodeDesc>> customPartitionExprs, + Statistics tStats, Operator fsParent, + ArrayList allRSCols) { + + long partCardinality = 1; + if (!partitionPos.isEmpty()) { + for (Integer idx : partitionPos) { + ColumnInfo ci = fsParent.getSchema().getSignature().get(idx); + ColStatistics partStats = tStats.getColumnStatisticsFromColName(ci.getInternalName()); + if (partStats == null) { + return -1; + } + partCardinality *= partStats.getCountDistint(); + } + return partCardinality; } - if (partCardinality <= MAX_WRITERS) { - return false; + + if (!customPartitionExprs.isEmpty()) { + for (Function, ExprNodeDesc> expr : customPartitionExprs) { + ExprNodeDesc resolved = expr.apply(allRSCols); + // Use StatsUtils to get accurate output stats, which leverages StatEstimator + // implementations on UDFs (e.g. 
iceberg_bucket reports min(inputNDV, numBuckets)) + ColStatistics exprStats = StatsUtils.getColStatisticsFromExpression( + this.parseCtx.getConf(), tStats, resolved); + if (exprStats == null) { + return -1; + } + partCardinality *= exprStats.getCountDistint(); + } + return partCardinality; } - return true; + + return 0; } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java index 61c519aa62f6..2c8bb6399bf6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java @@ -252,7 +252,6 @@ public List, ExprNodeDesc>> getCustomPartitionExpres public void addCustomPartitionExpressions( List, ExprNodeDesc>> customPartitionExpressions) { if (!org.apache.commons.collections.CollectionUtils.isEmpty(customPartitionExpressions)) { - this.hasCustomPartitionOrSortExpr = true; this.customPartitionExpressions.addAll(customPartitionExpressions); } } @@ -290,4 +289,8 @@ public void setCustomSortNullOrder(List customSortNullOrder) { public boolean hasCustomPartitionOrSortExpression() { return hasCustomPartitionOrSortExpr; } + + public void setHasCustomPartitionOrSortExpression(boolean hasCustomPartitionOrSortExpr) { + this.hasCustomPartitionOrSortExpr = hasCustomPartitionOrSortExpr; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/estimator/StatEstimatorProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/estimator/StatEstimatorProvider.java index 96865d194c6e..7c391332df6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/stats/estimator/StatEstimatorProvider.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/estimator/StatEstimatorProvider.java @@ -25,5 +25,5 @@ public interface StatEstimatorProvider { /** * Returns the {@link StatEstimator} for the given UDF instance. */ - public StatEstimator getStatEstimator(); + StatEstimator getStatEstimator(); }