diff --git a/cpp/velox/compute/iceberg/IcebergWriter.cc b/cpp/velox/compute/iceberg/IcebergWriter.cc
index 576524d49969..6412f5d8c2c1 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.cc
+++ b/cpp/velox/compute/iceberg/IcebergWriter.cc
@@ -236,15 +236,42 @@ void IcebergWriter::write(const VeloxColumnarBatch& batch) {
   auto inputRowVector = batch.getRowVector();
   auto inputRowType = asRowType(inputRowVector->type());
 
+  // Filter columns to match the expected schema (rowType_)
+  // This is needed because Spark 4.0 adds metadata columns like __row_operation, _file, _pos
+  // which are not part of the Iceberg write schema
   if (inputRowType->size() != rowType_->size()) {
-    const auto& children = inputRowVector->children();
-    std::vector<VectorPtr> dataColumns(children.begin() + 1, children.begin() + 1 + rowType_->size());
 
+    std::vector<VectorPtr> filteredChildren;
+    filteredChildren.reserve(rowType_->size());
+    // Build a map of column names to indices in the input
+    std::unordered_map<std::string, size_t> inputColumnIndices;
+    for (size_t i = 0; i < inputRowType->size(); i++) {
+      inputColumnIndices[inputRowType->nameOf(i)] = i;
+    }
+
+    // Extract columns in the order specified by rowType_
+    for (size_t i = 0; i < rowType_->size(); i++) {
+      const auto& columnName = rowType_->nameOf(i);
+      auto it = inputColumnIndices.find(columnName);
+      VELOX_CHECK(
+          it != inputColumnIndices.end(),
+          "Column '{}' not found in input batch. Available columns: {}",
+          columnName,
+          folly::join(", ", inputRowType->names()));
+      filteredChildren.push_back(inputRowVector->childAt(it->second));
+    }
+
+    // Create a new RowVector with filtered columns
     auto filteredRowVector = std::make_shared<RowVector>(
-        pool_.get(), rowType_, inputRowVector->nulls(), inputRowVector->size(), std::move(dataColumns));
+        pool_.get(),
+        rowType_,
+        inputRowVector->nulls(),
+        inputRowVector->size(),
+        std::move(filteredChildren));
 
     dataSink_->appendData(filteredRowVector);
   } else {
+    // No filtering needed, schemas match
     dataSink_->appendData(inputRowVector);
   }
 }