diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 1c48407bf682..65852f1a8553 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -94,6 +94,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.RowLineageUtils; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec; import org.apache.hadoop.hive.ql.parse.AlterTableSnapshotRefSpec; @@ -313,8 +314,9 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map map.put(SessionStateUtil.MISSING_COLUMNS, String.join(",", (HashSet) cols))); - SessionStateUtil.getResource(conf, SessionStateUtil.ROW_LINEAGE) - .ifPresent(v -> map.put(SessionStateUtil.ROW_LINEAGE, v.toString())); + if (RowLineageUtils.isRowLineageInsert(conf)) { + map.put(SessionStateUtil.ROW_LINEAGE, Boolean.toString(true)); + } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java index 9c4a73cc7d0b..ab478176ebf4 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; +import 
org.apache.hadoop.hive.ql.metadata.RowLineageUtils; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.parse.TransformSpec; import org.apache.hadoop.hive.ql.session.SessionState; @@ -69,84 +70,140 @@ public boolean run(CompactorContext context) throws IOException, HiveException, HiveConf conf = new HiveConf(context.getConf()); CompactionInfo ci = context.getCompactionInfo(); - String compactionQuery = buildCompactionQuery(context, compactTableName, conf); + org.apache.hadoop.hive.ql.metadata.Table hiveTable = + new org.apache.hadoop.hive.ql.metadata.Table(context.getTable()); + boolean rowLineageEnabled = RowLineageUtils.supportsRowLineage(hiveTable); + String compactionQuery = buildCompactionQuery(context, compactTableName, conf, rowLineageEnabled); SessionState sessionState = setupQueryCompactionSession(conf, ci, tblProperties); + + if (rowLineageEnabled) { + RowLineageUtils.enableRowLineage(sessionState); + LOG.debug("Row lineage flag set for compaction of table {}", compactTableName); + } + String compactionTarget = "table " + HiveUtils.unparseIdentifier(compactTableName) + (ci.partName != null ? 
", partition " + HiveUtils.unparseIdentifier(ci.partName) : ""); try { - DriverUtils.runOnDriver(conf, sessionState, compactionQuery); + DriverUtils.runOnDriver(sessionState.getConf(), sessionState, compactionQuery); LOG.info("Completed compaction for {}", compactionTarget); return true; } catch (HiveException e) { LOG.error("Failed compacting {}", compactionTarget, e); throw e; } finally { + RowLineageUtils.disableRowLineage(sessionState); sessionState.setCompaction(false); } } - private String buildCompactionQuery(CompactorContext context, String compactTableName, HiveConf conf) + private String buildCompactionQuery(CompactorContext context, String compactTableName, HiveConf conf, + boolean rowLineageEnabled) throws HiveException { CompactionInfo ci = context.getCompactionInfo(); + String rowLineageColumns = RowLineageUtils.getRowLineageSelectColumns(rowLineageEnabled); org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(), context.getTable().getTableName()); Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable()); String orderBy = ci.orderByClause == null ? "" : ci.orderByClause; - String fileSizePredicate = null; - String compactionQuery; - - if (ci.type == CompactionType.MINOR) { - long fileSizeInBytesThreshold = CompactionEvaluator.getFragmentSizeBytes(table.getParameters()); - fileSizePredicate = String.format("%1$s in (select file_path from %2$s.files where file_size_in_bytes < %3$d)", - VirtualColumn.FILE_PATH.getName(), compactTableName, fileSizeInBytesThreshold); - conf.setLong(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD, fileSizeInBytesThreshold); - // IOW query containing a join with Iceberg .files metadata table fails with exception that Iceberg AVRO format - // doesn't support vectorization, hence disabling it in this case. 
- conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false); + String fileSizePredicate = buildMinorFileSizePredicate(ci, compactTableName, conf, table); + + String compactionQuery = (ci.partName == null) ? + buildFullTableCompactionQuery(compactTableName, conf, icebergTable, + rowLineageColumns, fileSizePredicate, orderBy) : + buildPartitionCompactionQuery(ci, compactTableName, conf, icebergTable, + rowLineageColumns, fileSizePredicate, orderBy); + + LOG.info("Compaction query: {}", compactionQuery); + return compactionQuery; + } + + private static String buildMinorFileSizePredicate( + CompactionInfo ci, String compactTableName, HiveConf conf, org.apache.hadoop.hive.ql.metadata.Table table) { + if (ci.type != CompactionType.MINOR) { + return null; } - if (ci.partName == null) { - if (!icebergTable.spec().isPartitioned()) { - HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name()); - compactionQuery = String.format("insert overwrite table %s select * from % 1) { - // Compacting partitions of old partition specs on a partitioned table with partition evolution - HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name()); - // A single filter on a virtual column causes errors during compilation, - // added another filter on file_path as a workaround. - compactionQuery = String.format("insert overwrite table %1$s select * from %1$s " + - "where %2$s != %3$d and %4$s is not null %5$s %6$s", - compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(), - VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? 
"" : "and " + fileSizePredicate, orderBy); - } else { - // Partitioned table without partition evolution with partition spec as null in the compaction request - this - // code branch is not supposed to be reachable - throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION); - } - } else { - HiveConf.setBoolVar(conf, ConfVars.HIVE_CONVERT_JOIN, false); - conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false); + long fileSizeInBytesThreshold = CompactionEvaluator.getFragmentSizeBytes(table.getParameters()); + conf.setLong(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD, fileSizeInBytesThreshold); + // IOW query containing a join with Iceberg .files metadata table fails with exception that Iceberg AVRO format + // doesn't support vectorization, hence disabling it in this case. + conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false); + + return String.format("%1$s in (select file_path from %2$s.files where file_size_in_bytes < %3$d)", + VirtualColumn.FILE_PATH.getName(), compactTableName, fileSizeInBytesThreshold); + } + + private String buildFullTableCompactionQuery( + String compactTableName, + HiveConf conf, + Table icebergTable, + String rowLineageColumns, + String fileSizePredicate, + String orderBy) throws HiveException { + String selectColumns = buildSelectColumnList(icebergTable, conf); + + if (!icebergTable.spec().isPartitioned()) { + HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name()); + return String.format("insert overwrite table %1$s select %2$s%3$s from %1$s %4$s %5$s", + compactTableName, selectColumns, rowLineageColumns, + fileSizePredicate == null ? 
"" : "where " + fileSizePredicate, orderBy); + } + + if (icebergTable.specs().size() > 1) { + // Compacting partitions of old partition specs on a partitioned table with partition evolution HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name()); - conf.set(IcebergCompactionService.PARTITION_PATH, new Path(ci.partName).toString()); - - PartitionSpec spec; - String partitionPredicate; - try { - spec = IcebergTableUtil.getPartitionSpec(icebergTable, ci.partName); - partitionPredicate = buildPartitionPredicate(ci, spec); - } catch (MetaException e) { - throw new HiveException(e); - } + // A single filter on a virtual column causes errors during compilation, + // added another filter on file_path as a workaround. + return String.format("insert overwrite table %1$s select %2$s%3$s from %1$s " + + "where %4$s != %5$d and %6$s is not null %7$s %8$s", + compactTableName, selectColumns, rowLineageColumns, + VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(), + VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? "" : "and " + fileSizePredicate, orderBy); + } + + // Partitioned table without partition evolution with partition spec as null in the compaction request - this + // code branch is not supposed to be reachable + throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION); + } - compactionQuery = String.format("INSERT OVERWRITE TABLE %1$s SELECT * FROM %1$s WHERE %2$s IN " + - "(SELECT FILE_PATH FROM %1$s.FILES WHERE %3$s AND SPEC_ID = %4$d) %5$s %6$s", - compactTableName, VirtualColumn.FILE_PATH.getName(), partitionPredicate, spec.specId(), - fileSizePredicate == null ? 
"" : "AND " + fileSizePredicate, orderBy); + private String buildPartitionCompactionQuery( + CompactionInfo ci, + String compactTableName, + HiveConf conf, + Table icebergTable, + String rowLineageColumns, + String fileSizePredicate, + String orderBy) throws HiveException { + HiveConf.setBoolVar(conf, ConfVars.HIVE_CONVERT_JOIN, false); + conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false); + HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name()); + conf.set(IcebergCompactionService.PARTITION_PATH, new Path(ci.partName).toString()); + + PartitionSpec spec; + String partitionPredicate; + try { + spec = IcebergTableUtil.getPartitionSpec(icebergTable, ci.partName); + partitionPredicate = buildPartitionPredicate(ci, spec); + } catch (MetaException e) { + throw new HiveException(e); } - return compactionQuery; + + return String.format("INSERT OVERWRITE TABLE %1$s SELECT *%2$s FROM %1$s WHERE %3$s IN " + + "(SELECT FILE_PATH FROM %1$s.FILES WHERE %4$s AND SPEC_ID = %5$d) %6$s %7$s", + compactTableName, rowLineageColumns, VirtualColumn.FILE_PATH.getName(), partitionPredicate, spec.specId(), + fileSizePredicate == null ? "" : "AND " + fileSizePredicate, orderBy); + } + + /** + * Builds a comma-separated SELECT list from the Iceberg table schema. 
+ */ + private static String buildSelectColumnList(Table icebergTable, HiveConf conf) { + return icebergTable.schema().columns().stream() + .map(Types.NestedField::name) + .map(col -> HiveUtils.unparseIdentifier(col, conf)) + .collect(Collectors.joining(", ")); } private String buildPartitionPredicate(CompactionInfo ci, PartitionSpec spec) throws MetaException { diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_row_lineage_compactions.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_row_lineage_compactions.q new file mode 100644 index 000000000000..8f802d75b39a --- /dev/null +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_row_lineage_compactions.q @@ -0,0 +1,141 @@ +-- SORT_QUERY_RESULTS + +--! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ +--! qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ +--! qt:replace:/(MINOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ +--! qt:replace:/(MINOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ +-- Mask compaction id as they will be allocated in parallel threads +--! qt:replace:/^(\d+)(\t.*\tmanual\ticeberg\t)/#Masked#$2/ + +set hive.llap.io.enabled=true; +set hive.vectorized.execution.enabled=true; + +create database if not exists ice_comp_all with dbproperties('hive.compactor.worker.pool'='iceberg'); +use ice_comp_all; + +-- Partitioned table with minor and major compaction +create table part_tbl( + id int, + data string +) +partitioned by (dept_id int) +stored by iceberg stored as parquet +tblproperties ( + 'format-version'='3', + 'hive.compactor.worker.pool'='iceberg', + -- Use target.size only to make Parquet data files (~996 bytes) count as fragments. + -- Default fragment ratio is 8, so fragment_size = target_size / 8. + -- Pick target_size > 996 * 8 (7968) so files are treated as fragment files and minor compaction is eligible. 
+ 'compactor.threshold.target.size'='8000', + 'compactor.threshold.min.input.files'='2', + 'compactor.threshold.delete.file.ratio'='0.0' +); + +insert into part_tbl values (1,'p1', 10); +insert into part_tbl values (2,'p2', 10); +insert into part_tbl values (3,'p3', 20); +insert into part_tbl values (4,'p4', 20); + +SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID; + +alter table part_tbl compact 'minor' and wait pool 'iceberg'; + +SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID; + +show compactions; + +-- For MAJOR eligibility, avoid treating files as "fragments" by lowering target.size +alter table part_tbl set tblproperties ('compactor.threshold.target.size'='1500'); + +merge into part_tbl t +using (select 1 as id, 'p1_upd' as data, 10 as dept_id) s +on t.dept_id = s.dept_id and t.id = s.id +when matched then update set data = s.data; + +SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID; + +alter table part_tbl compact 'major' and wait pool 'iceberg'; + +SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID; + +show compactions; + +-- Partition evolution +alter table part_tbl set tblproperties ('compactor.threshold.target.size'='8000'); + +alter table part_tbl set partition spec(dept_id, id); + +insert into part_tbl values (5,'p5', 10); +insert into part_tbl values (6,'p6', 20); + +SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID; + +alter table part_tbl compact 'minor' and wait pool 'iceberg'; + +SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID; + +show compactions; + +-- Unpartitioned table with minor and major compaction +create table unpart_tbl( + id int, + data string +) +stored by iceberg stored as parquet +tblproperties ( + 'format-version'='3', + 
'hive.compactor.worker.pool'='iceberg', + -- Use target.size only to make Parquet data files (~996 bytes) count as fragments (default ratio 8). + 'compactor.threshold.target.size'='8000', + 'compactor.threshold.min.input.files'='2', + 'compactor.threshold.delete.file.ratio'='0.0' +); + +insert into unpart_tbl values (1,'a'); +insert into unpart_tbl values (2,'b'); +insert into unpart_tbl values (3,'c'); +insert into unpart_tbl values (4,'d'); + +SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM unpart_tbl +ORDER BY ROW__LINEAGE__ID; + +alter table unpart_tbl compact 'minor' and wait pool 'iceberg'; + +SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM unpart_tbl +ORDER BY ROW__LINEAGE__ID; + +show compactions; + +-- For MAJOR eligibility, avoid treating files as "fragments" by lowering target.size, then create deletes via MERGE. +alter table unpart_tbl set tblproperties ('compactor.threshold.target.size'='1500'); + +merge into unpart_tbl t +using (select 1 as id, 'a_upd' as data) s +on t.id = s.id +when matched then update set data = s.data; + +SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM unpart_tbl +ORDER BY ROW__LINEAGE__ID; + +alter table unpart_tbl compact 'major' and wait pool 'iceberg'; + +SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM unpart_tbl +ORDER BY ROW__LINEAGE__ID; + +show compactions; diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_row_lineage_compactions.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_row_lineage_compactions.q.out new file mode 100644 index 000000000000..b4d8dcd592d5 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_row_lineage_compactions.q.out @@ -0,0 +1,496 @@ +PREHOOK: query: create database if not exists ice_comp_all with dbproperties('hive.compactor.worker.pool'='iceberg') +PREHOOK: type: CREATEDATABASE +PREHOOK: Output: database:ice_comp_all +POSTHOOK: query: create 
database if not exists ice_comp_all with dbproperties('hive.compactor.worker.pool'='iceberg') +POSTHOOK: type: CREATEDATABASE +POSTHOOK: Output: database:ice_comp_all +PREHOOK: query: use ice_comp_all +PREHOOK: type: SWITCHDATABASE +PREHOOK: Input: database:ice_comp_all +POSTHOOK: query: use ice_comp_all +POSTHOOK: type: SWITCHDATABASE +POSTHOOK: Input: database:ice_comp_all +PREHOOK: query: create table part_tbl( + id int, + data string +) +partitioned by (dept_id int) +stored by iceberg stored as parquet +tblproperties ( + 'format-version'='3', + 'hive.compactor.worker.pool'='iceberg', + -- Use target.size only to make Parquet data files (~996 bytes) count as fragments. + -- Default fragment ratio is 8, so fragment_size = target_size / 8. + -- Pick target_size > 996 * 8 (7968) so files are treated as fragment files and minor compaction is eligible. + 'compactor.threshold.target.size'='8000', + 'compactor.threshold.min.input.files'='2', + 'compactor.threshold.delete.file.ratio'='0.0' +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:ice_comp_all +PREHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: query: create table part_tbl( + id int, + data string +) +partitioned by (dept_id int) +stored by iceberg stored as parquet +tblproperties ( + 'format-version'='3', + 'hive.compactor.worker.pool'='iceberg', + -- Use target.size only to make Parquet data files (~996 bytes) count as fragments. + -- Default fragment ratio is 8, so fragment_size = target_size / 8. + -- Pick target_size > 996 * 8 (7968) so files are treated as fragment files and minor compaction is eligible. 
+ 'compactor.threshold.target.size'='8000', + 'compactor.threshold.min.input.files'='2', + 'compactor.threshold.delete.file.ratio'='0.0' +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:ice_comp_all +POSTHOOK: Output: ice_comp_all@part_tbl +PREHOOK: query: insert into part_tbl values (1,'p1', 10) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: query: insert into part_tbl values (1,'p1', 10) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: ice_comp_all@part_tbl +PREHOOK: query: insert into part_tbl values (2,'p2', 10) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: query: insert into part_tbl values (2,'p2', 10) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: ice_comp_all@part_tbl +PREHOOK: query: insert into part_tbl values (3,'p3', 20) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: query: insert into part_tbl values (3,'p3', 20) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: ice_comp_all@part_tbl +PREHOOK: query: insert into part_tbl values (4,'p4', 20) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: query: insert into part_tbl values (4,'p4', 20) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: ice_comp_all@part_tbl +PREHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID +PREHOOK: type: QUERY +PREHOOK: Input: ice_comp_all@part_tbl +#### A masked pattern was here #### +POSTHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID +POSTHOOK: type: QUERY +POSTHOOK: Input: ice_comp_all@part_tbl 
+#### A masked pattern was here #### +1 p1 10 0 1 +2 p2 10 1 2 +3 p3 20 2 3 +4 p4 20 3 4 +PREHOOK: query: alter table part_tbl compact 'minor' and wait pool 'iceberg' +PREHOOK: type: ALTERTABLE_COMPACT +PREHOOK: Input: ice_comp_all@part_tbl +PREHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: query: alter table part_tbl compact 'minor' and wait pool 'iceberg' +POSTHOOK: type: ALTERTABLE_COMPACT +POSTHOOK: Input: ice_comp_all@part_tbl +POSTHOOK: Output: ice_comp_all@part_tbl +PREHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID +PREHOOK: type: QUERY +PREHOOK: Input: ice_comp_all@part_tbl +#### A masked pattern was here #### +POSTHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID +POSTHOOK: type: QUERY +POSTHOOK: Input: ice_comp_all@part_tbl +#### A masked pattern was here #### +1 p1 10 0 1 +2 p2 10 1 2 +3 p3 20 2 3 +4 p4 20 3 4 +PREHOOK: query: show compactions +PREHOOK: type: SHOW COMPACTIONS +POSTHOOK: query: show compactions +POSTHOOK: type: SHOW COMPACTIONS +CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +#Masked# ice_comp_all part_tbl dept_id=10 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=20 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +PREHOOK: query: alter table part_tbl set tblproperties ('compactor.threshold.target.size'='1500') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: ice_comp_all@part_tbl +PREHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: query: alter table part_tbl set tblproperties ('compactor.threshold.target.size'='1500') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: ice_comp_all@part_tbl +POSTHOOK: Output: ice_comp_all@part_tbl +Warning: Shuffle Join MERGEJOIN[22][tables = [$hdt$_0, 
$hdt$_1]] in Stage 'Reducer 2' is a cross product +PREHOOK: query: merge into part_tbl t +using (select 1 as id, 'p1_upd' as data, 10 as dept_id) s +on t.dept_id = s.dept_id and t.id = s.id +when matched then update set data = s.data +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Input: ice_comp_all@part_tbl +PREHOOK: Output: ice_comp_all@merge_tmp_table +PREHOOK: Output: ice_comp_all@part_tbl +PREHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: query: merge into part_tbl t +using (select 1 as id, 'p1_upd' as data, 10 as dept_id) s +on t.dept_id = s.dept_id and t.id = s.id +when matched then update set data = s.data +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Input: ice_comp_all@part_tbl +POSTHOOK: Output: ice_comp_all@merge_tmp_table +POSTHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(part_tbl)part_tbl.null, ] +PREHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID +PREHOOK: type: QUERY +PREHOOK: Input: ice_comp_all@part_tbl +#### A masked pattern was here #### +POSTHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID +POSTHOOK: type: QUERY +POSTHOOK: Input: ice_comp_all@part_tbl +#### A masked pattern was here #### +1 p1_upd 10 0 7 +2 p2 10 1 2 +3 p3 20 2 3 +4 p4 20 3 4 +PREHOOK: query: alter table part_tbl compact 'major' and wait pool 'iceberg' +PREHOOK: type: ALTERTABLE_COMPACT +PREHOOK: Input: ice_comp_all@part_tbl +PREHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: query: alter table part_tbl compact 'major' and wait pool 'iceberg' +POSTHOOK: type: ALTERTABLE_COMPACT +POSTHOOK: Input: ice_comp_all@part_tbl +POSTHOOK: Output: ice_comp_all@part_tbl +PREHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID +PREHOOK: type: QUERY 
+PREHOOK: Input: ice_comp_all@part_tbl +#### A masked pattern was here #### +POSTHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID +POSTHOOK: type: QUERY +POSTHOOK: Input: ice_comp_all@part_tbl +#### A masked pattern was here #### +1 p1_upd 10 0 7 +2 p2 10 1 2 +3 p3 20 2 3 +4 p4 20 3 4 +PREHOOK: query: show compactions +PREHOOK: type: SHOW COMPACTIONS +POSTHOOK: query: show compactions +POSTHOOK: type: SHOW COMPACTIONS +CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +#Masked# ice_comp_all part_tbl dept_id=10 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=20 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=10 MAJOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=20 MAJOR refused #Masked# manual iceberg 0 0 0 --- +PREHOOK: query: alter table part_tbl set tblproperties ('compactor.threshold.target.size'='8000') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: ice_comp_all@part_tbl +PREHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: query: alter table part_tbl set tblproperties ('compactor.threshold.target.size'='8000') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: ice_comp_all@part_tbl +POSTHOOK: Output: ice_comp_all@part_tbl +PREHOOK: query: alter table part_tbl set partition spec(dept_id, id) +PREHOOK: type: ALTERTABLE_SETPARTSPEC +PREHOOK: Input: ice_comp_all@part_tbl +POSTHOOK: query: alter table part_tbl set partition spec(dept_id, id) +POSTHOOK: type: ALTERTABLE_SETPARTSPEC +POSTHOOK: Input: ice_comp_all@part_tbl +POSTHOOK: Output: ice_comp_all@part_tbl +PREHOOK: query: insert into part_tbl values (5,'p5', 10) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: 
ice_comp_all@part_tbl +POSTHOOK: query: insert into part_tbl values (5,'p5', 10) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: ice_comp_all@part_tbl +PREHOOK: query: insert into part_tbl values (6,'p6', 20) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: query: insert into part_tbl values (6,'p6', 20) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: ice_comp_all@part_tbl +PREHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID +PREHOOK: type: QUERY +PREHOOK: Input: ice_comp_all@part_tbl +#### A masked pattern was here #### +POSTHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID +POSTHOOK: type: QUERY +POSTHOOK: Input: ice_comp_all@part_tbl +#### A masked pattern was here #### +1 p1_upd 10 0 7 +2 p2 10 1 2 +3 p3 20 2 3 +4 p4 20 3 4 +5 p5 10 11 9 +6 p6 20 16 10 +PREHOOK: query: alter table part_tbl compact 'minor' and wait pool 'iceberg' +PREHOOK: type: ALTERTABLE_COMPACT +PREHOOK: Input: ice_comp_all@part_tbl +PREHOOK: Output: ice_comp_all@part_tbl +POSTHOOK: query: alter table part_tbl compact 'minor' and wait pool 'iceberg' +POSTHOOK: type: ALTERTABLE_COMPACT +POSTHOOK: Input: ice_comp_all@part_tbl +POSTHOOK: Output: ice_comp_all@part_tbl +PREHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID +PREHOOK: type: QUERY +PREHOOK: Input: ice_comp_all@part_tbl +#### A masked pattern was here #### +POSTHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM part_tbl +ORDER BY ROW__LINEAGE__ID +POSTHOOK: type: QUERY +POSTHOOK: Input: ice_comp_all@part_tbl +#### A masked pattern was here #### +1 p1_upd 10 0 7 +2 p2 10 1 2 +3 p3 20 2 3 +4 p4 20 3 4 +5 p5 10 11 9 +6 p6 20 16 10 +PREHOOK: query: show compactions 
+PREHOOK: type: SHOW COMPACTIONS +POSTHOOK: query: show compactions +POSTHOOK: type: SHOW COMPACTIONS +CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +#Masked# ice_comp_all part_tbl dept_id=10 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=20 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=10 MAJOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=10/id=5 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=20/id=6 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl --- MINOR refused #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=20 MAJOR refused #Masked# manual iceberg 0 0 0 --- +PREHOOK: query: create table unpart_tbl( + id int, + data string +) +stored by iceberg stored as parquet +tblproperties ( + 'format-version'='3', + 'hive.compactor.worker.pool'='iceberg', + -- Use target.size only to make Parquet data files (~996 bytes) count as fragments (default ratio 8). + 'compactor.threshold.target.size'='8000', + 'compactor.threshold.min.input.files'='2', + 'compactor.threshold.delete.file.ratio'='0.0' +) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:ice_comp_all +PREHOOK: Output: ice_comp_all@unpart_tbl +POSTHOOK: query: create table unpart_tbl( + id int, + data string +) +stored by iceberg stored as parquet +tblproperties ( + 'format-version'='3', + 'hive.compactor.worker.pool'='iceberg', + -- Use target.size only to make Parquet data files (~996 bytes) count as fragments (default ratio 8). 
+ 'compactor.threshold.target.size'='8000', + 'compactor.threshold.min.input.files'='2', + 'compactor.threshold.delete.file.ratio'='0.0' +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:ice_comp_all +POSTHOOK: Output: ice_comp_all@unpart_tbl +PREHOOK: query: insert into unpart_tbl values (1,'a') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: ice_comp_all@unpart_tbl +POSTHOOK: query: insert into unpart_tbl values (1,'a') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: ice_comp_all@unpart_tbl +PREHOOK: query: insert into unpart_tbl values (2,'b') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: ice_comp_all@unpart_tbl +POSTHOOK: query: insert into unpart_tbl values (2,'b') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: ice_comp_all@unpart_tbl +PREHOOK: query: insert into unpart_tbl values (3,'c') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: ice_comp_all@unpart_tbl +POSTHOOK: query: insert into unpart_tbl values (3,'c') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: ice_comp_all@unpart_tbl +PREHOOK: query: insert into unpart_tbl values (4,'d') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: ice_comp_all@unpart_tbl +POSTHOOK: query: insert into unpart_tbl values (4,'d') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: ice_comp_all@unpart_tbl +PREHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM unpart_tbl +ORDER BY ROW__LINEAGE__ID +PREHOOK: type: QUERY +PREHOOK: Input: ice_comp_all@unpart_tbl +#### A masked pattern was here #### +POSTHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM unpart_tbl +ORDER BY ROW__LINEAGE__ID +POSTHOOK: type: QUERY +POSTHOOK: Input: ice_comp_all@unpart_tbl 
+#### A masked pattern was here #### +1 a 0 1 +2 b 1 2 +3 c 2 3 +4 d 3 4 +PREHOOK: query: alter table unpart_tbl compact 'minor' and wait pool 'iceberg' +PREHOOK: type: ALTERTABLE_COMPACT +PREHOOK: Input: ice_comp_all@unpart_tbl +PREHOOK: Output: ice_comp_all@unpart_tbl +POSTHOOK: query: alter table unpart_tbl compact 'minor' and wait pool 'iceberg' +POSTHOOK: type: ALTERTABLE_COMPACT +POSTHOOK: Input: ice_comp_all@unpart_tbl +POSTHOOK: Output: ice_comp_all@unpart_tbl +PREHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM unpart_tbl +ORDER BY ROW__LINEAGE__ID +PREHOOK: type: QUERY +PREHOOK: Input: ice_comp_all@unpart_tbl +#### A masked pattern was here #### +POSTHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM unpart_tbl +ORDER BY ROW__LINEAGE__ID +POSTHOOK: type: QUERY +POSTHOOK: Input: ice_comp_all@unpart_tbl +#### A masked pattern was here #### +1 a 0 1 +2 b 1 2 +3 c 2 3 +4 d 3 4 +PREHOOK: query: show compactions +PREHOOK: type: SHOW COMPACTIONS +POSTHOOK: query: show compactions +POSTHOOK: type: SHOW COMPACTIONS +CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +#Masked# ice_comp_all part_tbl dept_id=10 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=20 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=10 MAJOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=10/id=5 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=20/id=6 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all unpart_tbl --- MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl --- MINOR refused #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=20 MAJOR 
refused #Masked# manual iceberg 0 0 0 --- +PREHOOK: query: alter table unpart_tbl set tblproperties ('compactor.threshold.target.size'='1500') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: ice_comp_all@unpart_tbl +PREHOOK: Output: ice_comp_all@unpart_tbl +POSTHOOK: query: alter table unpart_tbl set tblproperties ('compactor.threshold.target.size'='1500') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: ice_comp_all@unpart_tbl +POSTHOOK: Output: ice_comp_all@unpart_tbl +Warning: Shuffle Join MERGEJOIN[22][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product +PREHOOK: query: merge into unpart_tbl t +using (select 1 as id, 'a_upd' as data) s +on t.id = s.id +when matched then update set data = s.data +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Input: ice_comp_all@unpart_tbl +PREHOOK: Output: ice_comp_all@merge_tmp_table +PREHOOK: Output: ice_comp_all@unpart_tbl +PREHOOK: Output: ice_comp_all@unpart_tbl +POSTHOOK: query: merge into unpart_tbl t +using (select 1 as id, 'a_upd' as data) s +on t.id = s.id +when matched then update set data = s.data +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Input: ice_comp_all@unpart_tbl +POSTHOOK: Output: ice_comp_all@merge_tmp_table +POSTHOOK: Output: ice_comp_all@unpart_tbl +POSTHOOK: Output: ice_comp_all@unpart_tbl +POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(unpart_tbl)unpart_tbl.null, ] +PREHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM unpart_tbl +ORDER BY ROW__LINEAGE__ID +PREHOOK: type: QUERY +PREHOOK: Input: ice_comp_all@unpart_tbl +#### A masked pattern was here #### +POSTHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM unpart_tbl +ORDER BY ROW__LINEAGE__ID +POSTHOOK: type: QUERY +POSTHOOK: Input: ice_comp_all@unpart_tbl +#### A masked pattern was here #### +1 a_upd 0 6 +2 b 1 2 +3 c 2 3 +4 d 3 4 +PREHOOK: query: alter table unpart_tbl compact 'major' and 
wait pool 'iceberg' +PREHOOK: type: ALTERTABLE_COMPACT +PREHOOK: Input: ice_comp_all@unpart_tbl +PREHOOK: Output: ice_comp_all@unpart_tbl +POSTHOOK: query: alter table unpart_tbl compact 'major' and wait pool 'iceberg' +POSTHOOK: type: ALTERTABLE_COMPACT +POSTHOOK: Input: ice_comp_all@unpart_tbl +POSTHOOK: Output: ice_comp_all@unpart_tbl +PREHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM unpart_tbl +ORDER BY ROW__LINEAGE__ID +PREHOOK: type: QUERY +PREHOOK: Input: ice_comp_all@unpart_tbl +#### A masked pattern was here #### +POSTHOOK: query: SELECT *, ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER +FROM unpart_tbl +ORDER BY ROW__LINEAGE__ID +POSTHOOK: type: QUERY +POSTHOOK: Input: ice_comp_all@unpart_tbl +#### A masked pattern was here #### +1 a_upd 0 6 +2 b 1 2 +3 c 2 3 +4 d 3 4 +PREHOOK: query: show compactions +PREHOOK: type: SHOW COMPACTIONS +POSTHOOK: query: show compactions +POSTHOOK: type: SHOW COMPACTIONS +CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId +#Masked# ice_comp_all part_tbl dept_id=10 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=20 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=10 MAJOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=10/id=5 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=20/id=6 MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all unpart_tbl --- MINOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all unpart_tbl --- MAJOR succeeded #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl --- MINOR refused #Masked# manual iceberg 0 0 0 --- +#Masked# ice_comp_all part_tbl dept_id=20 MAJOR refused #Masked# manual iceberg 0 0 0 --- 
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 92e80e6a822d..64a761acfddb 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -458,7 +458,8 @@ iceberg.llap.query.compactor.files=\ iceberg_major_compaction_unpartitioned_w_filter.q,\ iceberg_minor_compaction_bucket.q,\ iceberg_minor_compaction_partition_evolution.q,\ - iceberg_minor_compaction_unpartitioned.q + iceberg_minor_compaction_unpartitioned.q,\ + iceberg_row_lineage_compactions.q iceberg.llap.query.rest.hms.files=\ iceberg_rest_catalog_hms.q diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/RowLineageUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/RowLineageUtils.java index 7db91af977b5..b1beea331515 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/RowLineageUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/RowLineageUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.parse.rewrite.MergeStatement; import org.apache.hadoop.hive.ql.parse.rewrite.sql.MultiInsertSqlGenerator; import org.apache.hadoop.hive.ql.session.SessionStateUtil; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; @@ -63,7 +64,7 @@ public static void addRowLineageColumnsForUpdate(Table table, MultiInsertSqlGene public static void setRowLineageColumns(boolean isRowLineageEnabled, MultiInsertSqlGenerator sqlGenerator, Configuration conf) { if (isRowLineageEnabled) { - SessionStateUtil.addResource(conf, SessionStateUtil.ROW_LINEAGE, true); + setRowLineage(conf, true); // copy ROW_ID sqlGenerator.append(","); sqlGenerator.append(HiveUtils.unparseIdentifier(VirtualColumn.ROW_LINEAGE_ID.getName(), conf)); @@ -81,17 +82,54 @@ public static boolean supportsRowLineage(Table table) { return 
table.getStorageHandler().supportsRowLineage(table.getParameters()); } + /** + * Returns the row lineage virtual columns with the leading comma for string concatenation. + * Example: {@code ", ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER"}. + */ + public static String getRowLineageSelectColumns(boolean rowLineageEnabled) { + return rowLineageEnabled + ? ", " + VirtualColumn.ROW_LINEAGE_ID.getName() + ", " + VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER.getName() + : ""; + } + + /** + * Enables or disables row lineage for the current query/session context. + */ + public static void setRowLineage(Configuration conf, boolean enabled) { + SessionStateUtil.addResource(conf, SessionStateUtil.ROW_LINEAGE, enabled); + } + + private static void setRowLineageConfFlag(Configuration conf, boolean enabled) { + if (enabled) { + conf.setBoolean(SessionStateUtil.ROW_LINEAGE, true); + } else { + conf.unset(SessionStateUtil.ROW_LINEAGE); + } + } + + /** + * Enable the row lineage session flag for the current statement execution. 
+ * The flag is set on the configuration returned by {@code sessionState.getConf()}; this method returns nothing. + */ + public static void enableRowLineage(SessionState sessionState) { + setRowLineageConfFlag(sessionState.getConf(), true); + } + + public static void disableRowLineage(SessionState sessionState) { + setRowLineageConfFlag(sessionState.getConf(), false); + } + public static boolean shouldAddRowLineageColumnsForMerge(MergeStatement mergeStatement, Configuration conf) { boolean shouldAddRowLineageColumns = supportsRowLineage(mergeStatement.getTargetTable()) && mergeStatement.hasWhenMatchedUpdateClause(); - SessionStateUtil.addResource(conf, SessionStateUtil.ROW_LINEAGE, shouldAddRowLineageColumns); + setRowLineage(conf, shouldAddRowLineageColumns); return shouldAddRowLineageColumns; } public static void addSourceColumnsForRowLineage(boolean isRowLineageSupported, MultiInsertSqlGenerator sqlGenerator, String prefix, Configuration conf) { if (isRowLineageSupported) { - SessionStateUtil.addResource(conf, SessionStateUtil.ROW_LINEAGE, true); + setRowLineage(conf, true); sqlGenerator.append(", "); sqlGenerator.append(HiveUtils.unparseIdentifier(prefix + VirtualColumn.ROW_LINEAGE_ID.getName(), conf)); sqlGenerator.append(", "); @@ -134,7 +172,8 @@ public static void initializeRowLineageColumns(VectorizedRowBatchCtx vrbCtx, Vec } public static boolean isRowLineageInsert(Configuration conf) { - return SessionStateUtil.getResource(conf, SessionStateUtil.ROW_LINEAGE).map(Boolean.class::cast).orElse(false); + return SessionStateUtil.getResource(conf, SessionStateUtil.ROW_LINEAGE).map(Boolean.class::cast).orElse(false) || + conf.getBoolean(SessionStateUtil.ROW_LINEAGE, false); } public static MessageType getRequestedSchemaWithRowLineageColumns(VectorizedRowBatchCtx rbCtx, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java index 12f5b77871df..d8b568936edf 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java @@ -462,22 +462,29 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, sb.append(", "); } Dependency exprDep = ExprProcFactory.getExprDependency(lctx, inpOp, expr, outputMap); - if (exprDep != null && !exprDep.getBaseCols().isEmpty()) { - newType = LineageCtx.getNewDependencyType(exprDep.getType(), newType); - bciSet.addAll(exprDep.getBaseCols()); - if (exprDep.getType() == LineageInfo.DependencyType.SIMPLE) { - BaseColumnInfo col = exprDep.getBaseCols().iterator().next(); - Table t = col.getTabAlias().getTable(); - if (t != null) { - sb.append(Warehouse.getQualifiedName(t)).append("."); + BaseColumnInfo col = null; + + if (exprDep != null) { + Set baseCols = exprDep.getBaseCols(); + if (baseCols != null && !baseCols.isEmpty()) { + newType = LineageCtx.getNewDependencyType(exprDep.getType(), newType); + bciSet.addAll(baseCols); + if (exprDep.getType() == LineageInfo.DependencyType.SIMPLE) { + col = baseCols.iterator().next(); } - sb.append(col.getColumn().getName()); } } - if (exprDep == null || exprDep.getBaseCols().isEmpty() - || exprDep.getType() != LineageInfo.DependencyType.SIMPLE) { - sb.append(exprDep != null && exprDep.getExpr() != null ? exprDep.getExpr() : - ExprProcFactory.getExprString(rs, expr, lctx, inpOp, null)); + + if (col != null && col.getColumn() != null) { + Table t = (col.getTabAlias() != null) ? col.getTabAlias().getTable() : null; + if (t != null) { + sb.append(Warehouse.getQualifiedName(t)).append("."); + } + sb.append(col.getColumn().getName()); + } else { + sb.append(exprDep != null && exprDep.getExpr() != null + ? exprDep.getExpr() + : ExprProcFactory.getExprString(rs, expr, lctx, inpOp, null)); } } String expr = sb.toString();