diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala index 731a19330b73..793cb5b9070b 100644 --- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala +++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala @@ -19,7 +19,7 @@ package org.apache.gluten.connector.write import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.execution.IcebergWriteJniWrapper import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators -import org.apache.gluten.proto.{IcebergNestedField, IcebergPartitionField, IcebergPartitionSpec} +import org.apache.gluten.proto.{IcebergNestedField, IcebergPartitionField, IcebergPartitionSpec, IcebergSortingColumnList} import org.apache.gluten.runtime.Runtimes import org.apache.gluten.utils.ArrowAbiUtil @@ -43,6 +43,7 @@ case class IcebergDataWriteFactory( partitionSpec: PartitionSpec, sortOrder: SortOrder, field: IcebergNestedField, + sortingColumnList: IcebergSortingColumnList, queryId: String) extends ColumnarBatchDataWriterFactory with ColumnarStreamingDataWriterFactory { @@ -113,7 +114,9 @@ case class IcebergDataWriteFactory( taskId, operationId, partitionSpec.toByteArray, - field.toByteArray) + field.toByteArray, + sortingColumnList.toByteArray + ) cSchema.close() (writer, jniWrapper) } diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala index 955659359472..da042e7b5078 100644 --- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala +++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala @@ -18,9 +18,12 @@ package org.apache.gluten.execution import org.apache.gluten.IcebergNestedFieldVisitor import org.apache.gluten.connector.write.{ColumnarBatchDataWriterFactory, ColumnarStreamingDataWriterFactory, IcebergDataWriteFactory} +import org.apache.gluten.proto.{IcebergSortingColumn, IcebergSortingColumnList} import org.apache.spark.sql.types.StructType +import org.apache.iceberg.NullOrder.NULLS_FIRST +import org.apache.iceberg.SortDirection.ASC import org.apache.iceberg.spark.source.IcebergWriteUtil import org.apache.iceberg.types.TypeUtil @@ -39,14 +42,29 @@ abstract class AbstractIcebergWriteExec extends IcebergWriteExec { val filteredSchema = StructType( schema.fields.filter(field => writeFieldNames.contains(field.name)) ) + val sortOrder = IcebergWriteUtil.getSortOrder(write) + val sortingColumnList = IcebergSortingColumnList.newBuilder() + sortOrder.fields().forEach { + field => + val sortingColumn = IcebergSortingColumn + .newBuilder() + .setColumnName(IcebergWriteUtil.getWriteSchema(write).findColumnName(field.sourceId())) + .setAscending(field.direction() == ASC) + .setNullsFirst(field.nullOrder() == NULLS_FIRST) + .build() + + sortingColumnList.addFields(sortingColumn) + } + IcebergDataWriteFactory( filteredSchema, getFileFormat(IcebergWriteUtil.getFileFormat(write)), IcebergWriteUtil.getDirectory(write), getCodec, getPartitionSpec, - IcebergWriteUtil.getSortOrder(write), + sortOrder, nestedField, + sortingColumnList.build(), IcebergWriteUtil.getQueryId(write) ) } diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java index 60bab597862f..868e16b7fd9a 100644 --- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java +++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/IcebergWriteJniWrapper.java @@ -35,7 +35,8 @@ public native long init(long cSchema, int format, long taskId, String operationId, byte[] partitionSpec, - byte[] field); + byte[] field, + byte[] sortedBy); public native void write(long writerHandle, long batch); diff --git a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala index e2df119bc0d5..df2c08a42188 100644 --- a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala +++ b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala @@ -21,8 +21,6 @@ import org.apache.gluten.tags.EnhancedFeaturesTest import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.execution.CommandResultExec -import org.apache.spark.sql.execution.GlutenImplicits._ -import org.apache.spark.sql.execution.datasources.v2.AppendDataExec import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.gluten.TestUtils @@ -386,28 +384,20 @@ class VeloxIcebergSuite extends IcebergSuite { } } - test("iceberg native write fallback when validation fails - sort order") { + test("iceberg native write - sort order") { withTable("iceberg_sorted_tbl") { spark.sql("CREATE TABLE iceberg_sorted_tbl (a INT, b STRING) USING iceberg") spark.sql("ALTER TABLE iceberg_sorted_tbl WRITE ORDERED BY a") val df = spark.sql("INSERT INTO iceberg_sorted_tbl VALUES (1, 'hello'), (2, 'world')") - // Should fallback to vanilla Spark's AppendDataExec. val commandPlan = df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan - assert(commandPlan.isInstanceOf[AppendDataExec]) - assert(!commandPlan.isInstanceOf[VeloxIcebergAppendDataExec]) + assert(commandPlan.isInstanceOf[VeloxIcebergAppendDataExec]) checkAnswer( spark.sql("SELECT * FROM iceberg_sorted_tbl ORDER BY a"), Seq(Row(1, "hello"), Row(2, "world"))) - - // Verify fallbackSummary reports the sort order fallback reason. - val summary = df.fallbackSummary() - assert( - summary.fallbackNodeToReason.exists( - _.values.exists(_.contains("Not support write table with sort order")))) } } diff --git a/backends-velox/src/main/resources/org/apache/gluten/proto/IcebergSortingColumnList.proto b/backends-velox/src/main/resources/org/apache/gluten/proto/IcebergSortingColumnList.proto new file mode 100644 index 000000000000..26a3a9e81f6e --- /dev/null +++ b/backends-velox/src/main/resources/org/apache/gluten/proto/IcebergSortingColumnList.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package gluten; + +option java_package = "org.apache.gluten.proto"; +option java_multiple_files = true; + +message IcebergSortingColumn { + string column_name = 1; + bool ascending = 2; + bool nulls_first = 3; +} + +message IcebergSortingColumnList { + repeated IcebergSortingColumn fields = 1; +} \ No newline at end of file diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 7c1276f49abf..527b5de6074c 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -17,6 +17,7 @@ #include "VeloxRuntime.h" +#include #include #include @@ -264,6 +265,7 @@ std::shared_ptr VeloxRuntime::createIcebergWriter( const std::string& operationId, std::shared_ptr spec, const gluten::IcebergNestedField& protoField, + const gluten::IcebergSortingColumnList& protoSortingColumnList, const std::unordered_map& sparkConfs) { auto veloxPool = memoryManager()->getLeafMemoryPool(); auto connectorPool = memoryManager()->getAggregateMemoryPool(); @@ -277,6 +279,7 @@ std::shared_ptr VeloxRuntime::createIcebergWriter( operationId, spec, protoField, + protoSortingColumnList, sparkConfs, veloxPool, connectorPool); diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index 2cb75c5a124c..e19df6b82065 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -83,6 +83,7 @@ class VeloxRuntime final : public Runtime { const std::string& operationId, std::shared_ptr spec, const gluten::IcebergNestedField& protoField, + const gluten::IcebergSortingColumnList& protoSortingColumnList, const std::unordered_map& sparkConfs); #endif diff --git a/cpp/velox/compute/iceberg/IcebergWriter.cc b/cpp/velox/compute/iceberg/IcebergWriter.cc index 576524d49969..e5794c7f686d 100644 --- a/cpp/velox/compute/iceberg/IcebergWriter.cc +++ b/cpp/velox/compute/iceberg/IcebergWriter.cc @@ -17,6 +17,8 @@ #include "IcebergWriter.h" +#include + #include "IcebergPartitionSpec.pb.h" #include "compute/ProtobufUtils.h" #include "compute/iceberg/IcebergFormat.h" @@ -112,6 +114,19 @@ iceberg::IcebergNestedField convertToIcebergNestedField(const gluten::IcebergNes return result; } +std::vector convertToIcebergSortingColumn( + const gluten::IcebergSortingColumnList& protoColumnList) { + std::vector sortingColumns; + sortingColumns.reserve(protoColumnList.fields_size()); + + for (const auto& protoCol : protoColumnList.fields()) { + core::SortOrder sortOrder(protoCol.ascending(), protoCol.nulls_first()); + + sortingColumns.emplace_back(protoCol.column_name(), sortOrder); + } + return sortingColumns; +} + std::shared_ptr createIcebergInsertTableHandle( const RowTypePtr& outputRowType, const std::string& outputDirectoryPath, @@ -120,6 +135,7 @@ std::shared_ptr createIcebergInsertTableHandle( int32_t partitionId, int64_t taskId, const std::string& operationId, + std::vector sortedBy, std::shared_ptr spec, const iceberg::IcebergNestedField& nestedField, facebook::velox::memory::MemoryPool* pool) { @@ -157,7 +173,6 @@ std::shared_ptr createIcebergInsertTableHandle( std::shared_ptr locationHandle = std::make_shared( outputDirectoryPath, outputDirectoryPath, connector::hive::LocationHandle::TableType::kExisting); - const std::vector sortedBy; const std::unordered_map serdeParameters; return std::make_shared( columnHandles, @@ -184,6 +199,7 @@ IcebergWriter::IcebergWriter( const std::string& operationId, std::shared_ptr spec, const gluten::IcebergNestedField& field, + const gluten::IcebergSortingColumnList& sortingColumnList, const std::unordered_map& sparkConfs, std::shared_ptr memoryPool, std::shared_ptr connectorPool) @@ -224,6 +240,7 @@ IcebergWriter::IcebergWriter( partitionId_, taskId_, operationId_, + convertToIcebergSortingColumn(sortingColumnList), spec, field_, pool_.get()), @@ -272,7 +289,6 @@ parseIcebergPartitionSpec(const uint8_t* data, const int32_t length, RowTypePtr gluten::parseProtobuf(data, length, &protoSpec); std::vector fields; fields.reserve(protoSpec.fields_size()); - for (const auto& protoField : protoSpec.fields()) { // Convert protobuf enum to C++ enum iceberg::TransformType transform; diff --git a/cpp/velox/compute/iceberg/IcebergWriter.h b/cpp/velox/compute/iceberg/IcebergWriter.h index 2fa13dcd698c..a3773d4ef21c 100644 --- a/cpp/velox/compute/iceberg/IcebergWriter.h +++ b/cpp/velox/compute/iceberg/IcebergWriter.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "IcebergNestedField.pb.h" #include "memory/VeloxColumnarBatch.h" #include "utils/Metrics.h" @@ -48,6 +50,7 @@ class IcebergWriter { const std::string& operationId, std::shared_ptr spec, const gluten::IcebergNestedField& field, + const gluten::IcebergSortingColumnList& sortingColumnList, const std::unordered_map& sparkConfs, std::shared_ptr memoryPool, std::shared_ptr connectorPool); diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 2435d9f641b2..1d9096ea75fe 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -15,6 +15,7 @@ * limitations under the License. */ +#include #include #include @@ -859,7 +860,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrapper_ jlong taskId, jstring operationId, jbyteArray partition, - jbyteArray fieldBytes) { + jbyteArray fieldBytes, + jbyteArray sortingColumnBytes) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); auto runtime = dynamic_cast(ctx); @@ -874,6 +876,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrapper_ auto safeArrayField = gluten::getByteArrayElementsSafe(env, fieldBytes); gluten::IcebergNestedField protoField; gluten::parseProtobuf(safeArrayField.elems(), safeArrayField.length(), &protoField); + auto safeArraySortingList = gluten::getByteArrayElementsSafe(env, sortingColumnBytes); + gluten::IcebergSortingColumnList protoColumnList; + gluten::parseProtobuf(safeArraySortingList.elems(), safeArraySortingList.length(), &protoColumnList); return ctx->saveObject(runtime->createIcebergWriter( rowType, format, @@ -884,6 +889,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrapper_ jStringToCString(env, operationId), spec, protoField, + protoColumnList, sparkConf)); JNI_METHOD_END(kInvalidObjectHandle) } diff --git a/cpp/velox/tests/iceberg/IcebergWriteTest.cc b/cpp/velox/tests/iceberg/IcebergWriteTest.cc index 740fa66f3f3d..75a7b9cb0979 100644 --- a/cpp/velox/tests/iceberg/IcebergWriteTest.cc +++ b/cpp/velox/tests/iceberg/IcebergWriteTest.cc @@ -53,6 +53,8 @@ TEST_F(VeloxIcebergWriteTest, write) { gluten::IcebergNestedField* child2 = root.add_children(); child2->set_id(2); + gluten::IcebergSortingColumnList list; + auto writer = std::make_unique( asRowType(vector->type()), 1, @@ -63,6 +65,7 @@ TEST_F(VeloxIcebergWriteTest, write) { folly::to(folly::Random::rand64()), // operationId partitionSpec, root, + list, std::unordered_map(), pool_, connectorPool_); diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala index bf99b8bffe08..82bf32a3b383 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala @@ -86,8 +86,16 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec { "Not support write unsupported partition type, or is nested partition column") } } - if (IcebergWriteUtil.getTable(write).sortOrder().isSorted) { - return ValidationResult.failed("Not support write table with sort order") + if ( + IcebergWriteUtil + .getTable(write) + .sortOrder() + .fields() + .stream() + .anyMatch(f => !f.transform().isIdentity) + ) { + return ValidationResult + .failed("Not support write table with sort order transform not identity") } val format = IcebergWriteUtil.getFileFormat(write) if (format != FileFormat.PARQUET) {