Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@
package org.apache.iceberg.mr.hive.udf;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ColStatistics;
import org.apache.hadoop.hive.ql.stats.estimator.StatEstimator;
import org.apache.hadoop.hive.ql.stats.estimator.StatEstimatorProvider;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
Expand Down Expand Up @@ -52,7 +58,7 @@
value = "_FUNC_(value, bucketCount) - " +
"Returns the bucket value calculated by Iceberg bucket transform function ",
extended = "Example:\n > SELECT _FUNC_('A bucket full of ice!', 5);\n 4")
public class GenericUDFIcebergBucket extends GenericUDF {
public class GenericUDFIcebergBucket extends GenericUDF implements StatEstimatorProvider {
private final IntWritable result = new IntWritable();
private int numBuckets = -1;
private transient PrimitiveObjectInspector argumentOI;
Expand Down Expand Up @@ -209,4 +215,34 @@ public Object evaluate(DeferredObject[] arguments) throws HiveException {
/**
 * Returns the textual form of this UDF invocation used in EXPLAIN plans,
 * e.g. {@code iceberg_bucket(col, 5)}.
 *
 * @param children string renderings of the argument expressions
 * @return the standard display string for this function
 */
public String getDisplayString(String[] children) {
  final String functionName = "iceberg_bucket";
  return getStandardDisplayString(functionName, children);
}

@Override
public StatEstimator getStatEstimator() {
  // Expose a stats estimator bound to this UDF instance's bucket count so the
  // optimizer can derive output column statistics without evaluating the UDF.
  final int buckets = numBuckets;
  return new BucketStatEstimator(buckets);
}

/**
 * Estimates column statistics for the output of the Iceberg bucket transform.
 * The transform maps every source value into the integer range
 * {@code [0, numBuckets)}, so the output NDV is bounded by both the source NDV
 * and the bucket count.
 */
static class BucketStatEstimator implements StatEstimator {
  private final int numBuckets;

  BucketStatEstimator(int numBuckets) {
    this.numBuckets = numBuckets;
  }

  /**
   * Derives statistics for the bucket value from the source column statistics.
   *
   * @param argStats statistics of the UDF arguments; the first entry is the
   *                 bucketed source column
   * @return estimated statistics, or empty when no usable input stats exist or
   *         the bucket count is not (yet) known
   */
  @Override
  public Optional<ColStatistics> estimate(List<ColStatistics> argStats) {
    if (argStats.isEmpty() || numBuckets <= 0) {
      return Optional.empty();
    }
    // get(0) instead of getFirst(): List.getFirst() requires Java 21+.
    ColStatistics inputStats = argStats.get(0);
    if (inputStats == null) {
      // No statistics were collected for the source column.
      return Optional.empty();
    }

    long inputNdv = inputStats.getCountDistint();
    if (inputNdv < 0) {
      // A negative NDV conventionally means "unknown" in Hive column stats;
      // min(unknown, numBuckets) would propagate a bogus negative estimate.
      return Optional.empty();
    }

    ColStatistics result = new ColStatistics();
    // Output NDV can exceed neither the source NDV nor the bucket count.
    result.setCountDistint(Math.min(inputNdv, numBuckets));
    // NULL inputs hash to NULL buckets, so the null count carries over.
    result.setNumNulls(inputStats.getNumNulls());
    // The bucket value is a single primitive int.
    result.setAvgColLen(JavaDataModel.get().primitive1());
    result.setRange(0, numBuckets - 1);
    result.setIsEstimated(true);

    return Optional.of(result);
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.mr.hive.udf;

import java.util.Arrays;
import java.util.Optional;
import org.apache.hadoop.hive.ql.plan.ColStatistics;
import org.apache.hadoop.hive.ql.stats.estimator.StatEstimator;
import org.junit.Assert;
import org.junit.Test;

/**
* Tests for the BucketStatEstimator in GenericUDFIcebergBucket.
* Verifies that the StatEstimator correctly narrows NDV based on bucket count.
*/
public class TestGenericUDFIcebergBucketStatEstimator {

@Test
public void testNdvNarrowedByBucketCount() {

Check warning on line 35 in iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/udf/TestGenericUDFIcebergBucketStatEstimator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace these 3 tests with a single Parameterized one.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ1kBKSI0jMYmy4UasF7&open=AZ1kBKSI0jMYmy4UasF7&pullRequest=6389
// source NDV (100) > numBuckets (8) -> output NDV should be 8
Optional<ColStatistics> result = estimateBucket(100, 8);
Assert.assertTrue(result.isPresent());
Assert.assertEquals(8, result.get().getCountDistint());
}

@Test
public void testNdvBelowBucketCount() {
// source NDV (3) < numBuckets (8) -> output NDV should be 3
Optional<ColStatistics> result = estimateBucket(3, 8);
Assert.assertTrue(result.isPresent());
Assert.assertEquals(3, result.get().getCountDistint());
}

@Test
public void testNdvEqualsBucketCount() {
// source NDV (8) == numBuckets (8) -> output NDV should be 8
Optional<ColStatistics> result = estimateBucket(8, 8);
Assert.assertTrue(result.isPresent());
Assert.assertEquals(8, result.get().getCountDistint());
}

@Test
public void testZeroBucketsReturnsEmpty() {
Optional<ColStatistics> result = estimateBucket(100, 0);
Assert.assertFalse(result.isPresent());
}

private static Optional<ColStatistics> estimateBucket(long sourceNdv, int numBuckets) {
ColStatistics sourceStats = new ColStatistics("col", "int");
sourceStats.setCountDistint(sourceNdv);

StatEstimator estimator = new GenericUDFIcebergBucket.BucketStatEstimator(numBuckets);
return estimator.estimate(Arrays.asList(sourceStats));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,31 @@ insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, '
insert into tbl_src values (10, 'EUR', 12), (20, 'EUR', 11), (30, 'USD', 100), (40, 'EUR', 10), (50, 'HUF', 30), (60, 'USD', 12), (70, 'USD', 20), (80, 'PLN', 100), (90, 'PLN', 18), (100, 'CZK', 12), (110, NULL, NULL);

create external table tbl_target_identity (a int) partitioned by (ccy string) stored by iceberg stored as orc;
-- threshold = 0 (default, cost-based): NDV(b) = 5 > MAX_WRITERS (~3) -> sort (ClusteredWriter)
explain insert overwrite table tbl_target_identity select a, b from tbl_src;
insert overwrite table tbl_target_identity select a, b from tbl_src;
select * from tbl_target_identity order by a, ccy;

--bucketed case - should invoke GenericUDFIcebergBucket to calculate buckets before sorting
create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc;
--bucketed case - should invoke GenericUDFIcebergBucket to estimate bucket NDV for sort decision
create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (3, ccy)) stored by iceberg stored as orc;
-- threshold = 0 (default, cost-based): bucket NDV = min(NDV(b) = 5, 3) = 3 <= MAX_WRITERS (~3) -> no sort (FanoutWriter)
explain insert into table tbl_target_bucket select a, b from tbl_src;
insert into table tbl_target_bucket select a, b from tbl_src;
select * from tbl_target_bucket order by a, ccy;

--mixed case - 1 identity + 1 bucket cols
--mixed case - 1 identity + 1 bucket cols: NDV(b) * min(NDV(c) = 8, 3) = 5 * 3 = 15 > MAX_WRITERS (~3) -> sort (ClusteredWriter)
create external table tbl_target_mixed (a int, ccy string, c bigint) partitioned by spec (ccy, bucket (3, c)) stored by iceberg stored as orc;
explain insert into table tbl_target_mixed select * from tbl_src;
insert into table tbl_target_mixed select * from tbl_src;
select * from tbl_target_mixed order by a, ccy;
select `partition` from default.tbl_target_mixed.partitions order by `partition`;
select * from default.tbl_target_mixed.files;

--1 of 2 partition cols is folded with constant - should still sort
--b = 'EUR' folds ccy to constant: bucket NDV = min(NDV(c) = 8, 3) = 3 <= MAX_WRITERS (~3) -> no sort (FanoutWriter)
explain insert into table tbl_target_mixed select * from tbl_src where b = 'EUR';
insert into table tbl_target_mixed select * from tbl_src where b = 'EUR';

--all partitions cols folded - should not sort as it's not needed
--all partition cols folded (b = 'USD', c = 100) -> no sort (FanoutWriter)
explain insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100;
insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100;

Expand Down Expand Up @@ -124,7 +126,7 @@ explain insert into tbl_month_timestamp values (88669, '2018-05-27 11:12:00', 20
insert into tbl_month_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018);
select * from tbl_month_timestamp order by id, date_time_timestamp;

--day case - should invoke GenericUDFIcebergMonth to convert the date/timestamp value to day and use for clustering and sorting
--day case - should invoke GenericUDFIcebergDay to convert the date/timestamp value to day and use for clustering and sorting
create external table tbl_day_date (id string, date_time_date date, year_partition int)
partitioned by spec (year_partition, day(date_time_date))
stored by iceberg stored as parquet
Expand All @@ -143,12 +145,39 @@ explain insert into tbl_day_timestamp values (88669, '2018-05-27 11:12:00', 2018
insert into tbl_day_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018);
select * from tbl_day_timestamp order by id, date_time_timestamp;

--hour case - should invoke GenericUDFIcebergMonth to convert the date/timestamp value to day and use for clustering and sorting
--hour case - should invoke GenericUDFIcebergHour to convert the timestamp value to hour and use for clustering and sorting
create external table tbl_hour_timestamp (id string, date_time_timestamp timestamp, year_partition int)
partitioned by spec (year_partition, hour(date_time_timestamp))
stored by iceberg stored as parquet
tblproperties ('parquet.compression'='snappy','format-version'='2');

explain insert into tbl_hour_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018);
insert into tbl_hour_timestamp values (88669, '2018-05-27 11:12:00', 2018), (40568, '2018-02-12 12:45:56', 2018), (40568, '2018-07-03 06:07:56', 2018);
select * from tbl_hour_timestamp order by id, date_time_timestamp;
select * from tbl_hour_timestamp order by id, date_time_timestamp;

-- threshold = -1: never sort -> FanoutWriter
set hive.optimize.sort.dynamic.partition.threshold=-1;
explain insert into tbl_target_identity select a, b from tbl_src;
explain insert into tbl_target_bucket select a, b from tbl_src;

-- threshold = 1: always sort -> ClusteredWriter
set hive.optimize.sort.dynamic.partition.threshold=1;
explain insert into tbl_target_identity select a, b from tbl_src;
explain insert into tbl_target_bucket select a, b from tbl_src;

-- threshold = 2: bucket NDV = min(NDV(b) = 5, 3) = 3 > 2 -> sort (ClusteredWriter)
set hive.optimize.sort.dynamic.partition.threshold=2;
explain insert into tbl_target_identity select a, b from tbl_src;
explain insert into tbl_target_bucket select a, b from tbl_src;

-- threshold = 100: bucket NDV = min(NDV(b) = 5, 3) = 3 <= 100 -> no sort (FanoutWriter)
set hive.optimize.sort.dynamic.partition.threshold=100;
explain insert into tbl_target_identity select a, b from tbl_src;
explain insert into tbl_target_bucket select a, b from tbl_src;

-- write.fanout.enabled=false: SerDe forces threshold=1 -> always ClusteredWriter
set hive.optimize.sort.dynamic.partition.threshold=0;
drop table if exists tbl_target_nofanout;
create external table tbl_target_nofanout (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc
tblproperties ('write.fanout.enabled'='false');
explain insert into tbl_target_nofanout select a, b from tbl_src;
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
-- Mask iceberg version
--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/

set hive.optimize.sort.dynamic.partition.threshold=1;
set hive.explain.user=false;

create external table ice_parquet_int(
strcol string,
intcol integer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
-- Mask iceberg version
--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/

set hive.optimize.sort.dynamic.partition.threshold=1;
set hive.explain.user=false;

create external table ice_parquet_date_transform_year(
bigintcol bigint,
intcol integer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
-- Mask iceberg version
--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/

set hive.optimize.sort.dynamic.partition.threshold=1;
set hive.explain.user=false;

create external table ice_parquet_int(
strcol string,
intcol integer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
-- SORT_QUERY_RESULTS
set hive.optimize.sort.dynamic.partition.threshold=1;
set hive.explain.user=false;

create external table target_ice(a int, b string, c int) partitioned by spec (bucket(16, a), truncate(3, b)) stored by iceberg stored as orc tblproperties ('format-version'='2');
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
set hive.optimize.sort.dynamic.partition.threshold=1;
set hive.explain.user=false;

drop table if exists tbl_ice;
Expand Down
Loading
Loading