Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.gluten.connector.write
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.IcebergWriteJniWrapper
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.proto.{IcebergNestedField, IcebergPartitionField, IcebergPartitionSpec}
import org.apache.gluten.proto.{IcebergNestedField, IcebergPartitionField, IcebergPartitionSpec, IcebergSortingColumnList}
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.utils.ArrowAbiUtil

Expand All @@ -43,6 +43,7 @@ case class IcebergDataWriteFactory(
partitionSpec: PartitionSpec,
sortOrder: SortOrder,
field: IcebergNestedField,
sortingColumnList: IcebergSortingColumnList,
queryId: String)
extends ColumnarBatchDataWriterFactory
with ColumnarStreamingDataWriterFactory {
Expand Down Expand Up @@ -113,7 +114,9 @@ case class IcebergDataWriteFactory(
taskId,
operationId,
partitionSpec.toByteArray,
field.toByteArray)
field.toByteArray,
sortingColumnList.toByteArray
)
cSchema.close()
(writer, jniWrapper)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package org.apache.gluten.execution

import org.apache.gluten.IcebergNestedFieldVisitor
import org.apache.gluten.connector.write.{ColumnarBatchDataWriterFactory, ColumnarStreamingDataWriterFactory, IcebergDataWriteFactory}
import org.apache.gluten.proto.{IcebergSortingColumn, IcebergSortingColumnList}

import org.apache.spark.sql.types.StructType

import org.apache.iceberg.NullOrder.NULLS_FIRST
import org.apache.iceberg.SortDirection.ASC
import org.apache.iceberg.spark.source.IcebergWriteUtil
import org.apache.iceberg.types.TypeUtil

Expand All @@ -39,14 +42,29 @@ abstract class AbstractIcebergWriteExec extends IcebergWriteExec {
val filteredSchema = StructType(
schema.fields.filter(field => writeFieldNames.contains(field.name))
)
val sortOrder = IcebergWriteUtil.getSortOrder(write)
val sortingColumnList = IcebergSortingColumnList.newBuilder()
sortOrder.fields().forEach {
field =>
val sortingColumn = IcebergSortingColumn
.newBuilder()
.setColumnName(IcebergWriteUtil.getWriteSchema(write).findColumnName(field.sourceId()))
.setAscending(field.direction() == ASC)
.setNullsFirst(field.nullOrder() == NULLS_FIRST)
.build()

sortingColumnList.addFields(sortingColumn)
}

IcebergDataWriteFactory(
filteredSchema,
getFileFormat(IcebergWriteUtil.getFileFormat(write)),
IcebergWriteUtil.getDirectory(write),
getCodec,
getPartitionSpec,
IcebergWriteUtil.getSortOrder(write),
sortOrder,
nestedField,
sortingColumnList.build(),
IcebergWriteUtil.getQueryId(write)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public native long init(long cSchema, int format,
long taskId,
String operationId,
byte[] partitionSpec,
byte[] field);
byte[] field,
byte[] sortedBy);

public native void write(long writerHandle, long batch);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import org.apache.gluten.tags.EnhancedFeaturesTest

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.CommandResultExec
import org.apache.spark.sql.execution.GlutenImplicits._
import org.apache.spark.sql.execution.datasources.v2.AppendDataExec
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.gluten.TestUtils

Expand Down Expand Up @@ -386,28 +384,20 @@ class VeloxIcebergSuite extends IcebergSuite {
}
}

test("iceberg native write fallback when validation fails - sort order") {
test("iceberg native write - sort order") {
withTable("iceberg_sorted_tbl") {
spark.sql("CREATE TABLE iceberg_sorted_tbl (a INT, b STRING) USING iceberg")
spark.sql("ALTER TABLE iceberg_sorted_tbl WRITE ORDERED BY a")

val df = spark.sql("INSERT INTO iceberg_sorted_tbl VALUES (1, 'hello'), (2, 'world')")

// Should fallback to vanilla Spark's AppendDataExec.
val commandPlan =
df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan
assert(commandPlan.isInstanceOf[AppendDataExec])
assert(!commandPlan.isInstanceOf[VeloxIcebergAppendDataExec])
assert(commandPlan.isInstanceOf[VeloxIcebergAppendDataExec])

checkAnswer(
spark.sql("SELECT * FROM iceberg_sorted_tbl ORDER BY a"),
Seq(Row(1, "hello"), Row(2, "world")))

// Verify fallbackSummary reports the sort order fallback reason.
val summary = df.fallbackSummary()
assert(
summary.fallbackNodeToReason.exists(
_.values.exists(_.contains("Not support write table with sort order"))))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

package gluten;

option java_package = "org.apache.gluten.proto";
option java_multiple_files = true;

// Describes one column of an Iceberg table's sort order. Produced on the JVM
// side from org.apache.iceberg.SortField (see AbstractIcebergWriteExec) and
// consumed by the native Velox Iceberg writer over JNI.
message IcebergSortingColumn {
// Column name resolved from the sort field's source id via
// Schema.findColumnName on the write schema.
string column_name = 1;
// True when the sort direction is ASC.
bool ascending = 2;
// True when NULL values sort before non-null values (NULLS FIRST).
bool nulls_first = 3;
}

// Full sort order of a table as a list of sorting columns; empty when the
// table is unsorted. Serialized on the JVM side and parsed by the native
// writer (see IcebergWriteJniWrapper.init's sortedBy argument).
message IcebergSortingColumnList {
// Sorting columns in the order returned by SortOrder.fields() —
// presumably most-significant first; confirm against the producer.
repeated IcebergSortingColumn fields = 1;
}
3 changes: 3 additions & 0 deletions cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "VeloxRuntime.h"

#include <IcebergSortingColumnList.pb.h>
#include <operators/plannodes/RowVectorStream.h>

#include <algorithm>
Expand Down Expand Up @@ -264,6 +265,7 @@ std::shared_ptr<IcebergWriter> VeloxRuntime::createIcebergWriter(
const std::string& operationId,
std::shared_ptr<const facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& protoField,
const gluten::IcebergSortingColumnList& protoSortingColumnList,
const std::unordered_map<std::string, std::string>& sparkConfs) {
auto veloxPool = memoryManager()->getLeafMemoryPool();
auto connectorPool = memoryManager()->getAggregateMemoryPool();
Expand All @@ -277,6 +279,7 @@ std::shared_ptr<IcebergWriter> VeloxRuntime::createIcebergWriter(
operationId,
spec,
protoField,
protoSortingColumnList,
sparkConfs,
veloxPool,
connectorPool);
Expand Down
1 change: 1 addition & 0 deletions cpp/velox/compute/VeloxRuntime.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class VeloxRuntime final : public Runtime {
const std::string& operationId,
std::shared_ptr<const facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& protoField,
const gluten::IcebergSortingColumnList& protoSortingColumnList,
const std::unordered_map<std::string, std::string>& sparkConfs);
#endif

Expand Down
20 changes: 18 additions & 2 deletions cpp/velox/compute/iceberg/IcebergWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "IcebergWriter.h"

#include <IcebergSortingColumnList.pb.h>

#include "IcebergPartitionSpec.pb.h"
#include "compute/ProtobufUtils.h"
#include "compute/iceberg/IcebergFormat.h"
Expand Down Expand Up @@ -112,6 +114,19 @@ iceberg::IcebergNestedField convertToIcebergNestedField(const gluten::IcebergNes
return result;
}

// Converts the protobuf-encoded sort order received over JNI into the
// Velox IcebergSortingColumn vector passed to the Iceberg insert table
// handle. Each proto entry supplies the column name plus ascending /
// nulls-first flags, which map directly onto core::SortOrder.
std::vector<IcebergSortingColumn> convertToIcebergSortingColumn(
const gluten::IcebergSortingColumnList& protoColumnList) {
std::vector<IcebergSortingColumn> sortingColumns;
// Size is known from the parsed message, so reserve once up front.
sortingColumns.reserve(protoColumnList.fields_size());

for (const auto& protoCol : protoColumnList.fields()) {
// SortOrder(ascending, nullsFirst) mirrors the proto field semantics.
core::SortOrder sortOrder(protoCol.ascending(), protoCol.nulls_first());

sortingColumns.emplace_back(protoCol.column_name(), sortOrder);
}
return sortingColumns;
}

std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
const RowTypePtr& outputRowType,
const std::string& outputDirectoryPath,
Expand All @@ -120,6 +135,7 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
int32_t partitionId,
int64_t taskId,
const std::string& operationId,
std::vector<IcebergSortingColumn> sortedBy,
std::shared_ptr<const IcebergPartitionSpec> spec,
const iceberg::IcebergNestedField& nestedField,
facebook::velox::memory::MemoryPool* pool) {
Expand Down Expand Up @@ -157,7 +173,6 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
std::shared_ptr<const connector::hive::LocationHandle> locationHandle =
std::make_shared<connector::hive::LocationHandle>(
outputDirectoryPath, outputDirectoryPath, connector::hive::LocationHandle::TableType::kExisting);
const std::vector<IcebergSortingColumn> sortedBy;
const std::unordered_map<std::string, std::string> serdeParameters;
return std::make_shared<connector::hive::iceberg::IcebergInsertTableHandle>(
columnHandles,
Expand All @@ -184,6 +199,7 @@ IcebergWriter::IcebergWriter(
const std::string& operationId,
std::shared_ptr<const iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& field,
const gluten::IcebergSortingColumnList& sortingColumnList,
const std::unordered_map<std::string, std::string>& sparkConfs,
std::shared_ptr<facebook::velox::memory::MemoryPool> memoryPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> connectorPool)
Expand Down Expand Up @@ -224,6 +240,7 @@ IcebergWriter::IcebergWriter(
partitionId_,
taskId_,
operationId_,
convertToIcebergSortingColumn(sortingColumnList),
spec,
field_,
pool_.get()),
Expand Down Expand Up @@ -272,7 +289,6 @@ parseIcebergPartitionSpec(const uint8_t* data, const int32_t length, RowTypePtr
gluten::parseProtobuf(data, length, &protoSpec);
std::vector<iceberg::IcebergPartitionSpec::Field> fields;
fields.reserve(protoSpec.fields_size());

for (const auto& protoField : protoSpec.fields()) {
// Convert protobuf enum to C++ enum
iceberg::TransformType transform;
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/compute/iceberg/IcebergWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <IcebergSortingColumnList.pb.h>

#include "IcebergNestedField.pb.h"
#include "memory/VeloxColumnarBatch.h"
#include "utils/Metrics.h"
Expand Down Expand Up @@ -48,6 +50,7 @@ class IcebergWriter {
const std::string& operationId,
std::shared_ptr<const facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& field,
const gluten::IcebergSortingColumnList& sortingColumnList,
const std::unordered_map<std::string, std::string>& sparkConfs,
std::shared_ptr<facebook::velox::memory::MemoryPool> memoryPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> connectorPool);
Expand Down
8 changes: 7 additions & 1 deletion cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

#include <IcebergSortingColumnList.pb.h>
#include <jni.h>

#include <folly/executors/CPUThreadPoolExecutor.h>
Expand Down Expand Up @@ -859,7 +860,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrapper_
jlong taskId,
jstring operationId,
jbyteArray partition,
jbyteArray fieldBytes) {
jbyteArray fieldBytes,
jbyteArray sortingColumnBytes) {
JNI_METHOD_START
auto ctx = getRuntime(env, wrapper);
auto runtime = dynamic_cast<VeloxRuntime*>(ctx);
Expand All @@ -874,6 +876,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrapper_
auto safeArrayField = gluten::getByteArrayElementsSafe(env, fieldBytes);
gluten::IcebergNestedField protoField;
gluten::parseProtobuf(safeArrayField.elems(), safeArrayField.length(), &protoField);
auto safeArraySortingList = gluten::getByteArrayElementsSafe(env, sortingColumnBytes);
gluten::IcebergSortingColumnList protoColumnList;
gluten::parseProtobuf(safeArraySortingList.elems(), safeArraySortingList.length(), &protoColumnList);
return ctx->saveObject(runtime->createIcebergWriter(
rowType,
format,
Expand All @@ -884,6 +889,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_execution_IcebergWriteJniWrapper_
jStringToCString(env, operationId),
spec,
protoField,
protoColumnList,
sparkConf));
JNI_METHOD_END(kInvalidObjectHandle)
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/tests/iceberg/IcebergWriteTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ TEST_F(VeloxIcebergWriteTest, write) {
gluten::IcebergNestedField* child2 = root.add_children();
child2->set_id(2);

gluten::IcebergSortingColumnList list;

auto writer = std::make_unique<IcebergWriter>(
asRowType(vector->type()),
1,
Expand All @@ -63,6 +65,7 @@ TEST_F(VeloxIcebergWriteTest, write) {
folly::to<std::string>(folly::Random::rand64()), // operationId
partitionSpec,
root,
list,
std::unordered_map<std::string, std::string>(),
pool_,
connectorPool_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,16 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec {
"Not support write unsupported partition type, or is nested partition column")
}
}
if (IcebergWriteUtil.getTable(write).sortOrder().isSorted) {
return ValidationResult.failed("Not support write table with sort order")
if (
IcebergWriteUtil
.getTable(write)
.sortOrder()
.fields()
.stream()
.anyMatch(f => !f.transform().isIdentity)
) {
return ValidationResult
.failed("Not support write table with sort order transform not identity")
}
val format = IcebergWriteUtil.getFileFormat(write)
if (format != FileFormat.PARQUET) {
Expand Down
Loading