Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@ private List<ColumnVector> buildSparkOrderedVectors(
throw new IllegalStateException(
"Lance scan did not return expected field '" + fieldName + "'");
}
// Pass the Spark field so the column vector reports the blob v2 descriptor schema when
// applicable and binds nested struct children by the Spark schema's names and order.
// Lance's native scan does not push down nested struct projection, so Arrow always
// carries on-disk struct children in physical order; schema-aware binding is required
// when the partition schema differs from that order (e.g. nested struct pruning). See
// GitHub issue #499. The trailing false keeps the vector reusable across batches (it is
// owned and closed by the reader, not this column vector). See GitHub issue #545.
LanceArrowColumnVector colVec = new LanceArrowColumnVector(vector, false, field);

// Set blob reference context so getBinary() produces blob references
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.LanceArrowUtils;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
Expand Down Expand Up @@ -81,12 +82,55 @@ public LanceArrowColumnVector(ValueVector vector, boolean closeVectorOnClose) {

public LanceArrowColumnVector(
ValueVector vector, boolean closeVectorOnClose, StructField sparkField) {
super(
BlobUtils.isBlobV2SparkField(sparkField)
? BlobUtils.BLOB_DESCRIPTOR_STRUCT
: computeDataType(vector));
super(reportedSparkType(vector, sparkField));
this.closeVectorOnClose = closeVectorOnClose;
// Schema-aware nested struct projection (#499). Lance's native scan does not push down nested
// struct projection, so a StructVector always carries its on-disk children in physical order.
// When the Spark field is a plain (non-blob) struct, bind Arrow children by the Spark schema's
// field names and order so a pruned or reordered projection reads correctly. Blob descriptor
// structs keep the by-ordinal binding handled in initFromVector.
if (sparkField != null
&& sparkField.dataType() instanceof StructType
&& vector instanceof StructVector
&& !BlobUtils.isBlobArrowField(vector.getField())
&& !BlobUtils.isBlobV2SparkField(sparkField)) {
structAccessor =
new LanceStructAccessor((StructVector) vector, (StructType) sparkField.dataType());
return;
}
initFromVector(vector, sparkField);
}

public LanceArrowColumnVector(ValueVector vector, DataType sparkType) {
this(vector, sparkType, true);
}

/**
* Schema-aware constructor. When {@code sparkType} is a {@link StructType} and the underlying
* Arrow vector is a {@link StructVector} (and not a Blob struct), children are bound by name in
* {@code sparkType}'s field order rather than by physical Arrow ordinal. This lets the same Arrow
* vector serve a pruned or reordered Spark schema correctly — see GitHub issue #499.
*
* <p>For all other vector / type combinations the dispatch is identical to the single-argument
* constructor; {@code sparkType} only changes the reported {@link #dataType()}.
*
* <p>{@code closeVectorOnClose} controls whether {@link #close()} releases the underlying Arrow
* vector; pass {@code false} when the vector is reused across batches — see GitHub issue #545.
*/
public LanceArrowColumnVector(
ValueVector vector, DataType sparkType, boolean closeVectorOnClose) {
super(sparkType);
this.closeVectorOnClose = closeVectorOnClose;
if (sparkType instanceof StructType
&& vector instanceof StructVector
&& !BlobUtils.isBlobArrowField(vector.getField())) {
structAccessor = new LanceStructAccessor((StructVector) vector, (StructType) sparkType);
return;
}
initFromVector(vector, null);
}

private void initFromVector(ValueVector vector, StructField sparkField) {
if (vector instanceof UInt1Vector) {
uInt1Accessor = new UInt1Accessor((UInt1Vector) vector);
} else if (vector instanceof UInt2Vector) {
Expand Down Expand Up @@ -559,6 +603,25 @@ private static DataType computeDataType(ValueVector vector) {
return LanceArrowUtils.fromArrowField(vector.getField());
}

/**
* Computes the Spark type this column reports. Blob v2 descriptor fields report the descriptor
* struct; a plain (non-blob) struct field reports its Spark schema so a pruned or reordered
* nested projection is described correctly (#499); everything else derives the type from the
* Arrow field.
*/
private static DataType reportedSparkType(ValueVector vector, StructField sparkField) {
if (BlobUtils.isBlobV2SparkField(sparkField)) {
return BlobUtils.BLOB_DESCRIPTOR_STRUCT;
}
if (sparkField != null
&& sparkField.dataType() instanceof StructType
&& vector instanceof StructVector
&& !BlobUtils.isBlobArrowField(vector.getField())) {
return sparkField.dataType();
}
return computeDataType(vector);
}

private static class LanceDecimalAccessor {
private static final int DECIMAL128_BYTE_WIDTH = 16;
private final DecimalVector vector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
*/
package org.lance.spark.vectorized;

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnVector;

/**
Expand All @@ -37,6 +40,31 @@ public LanceStructAccessor(StructVector vector) {
}
}

/**
* Schema-aware constructor that maps Arrow children by name to {@code sparkStructType}'s field
* order. Use this when the caller's Spark schema may be a pruned or reordered subset of the Arrow
* vector's on-disk children — Spark's generated projection accesses children by the pruned-schema
* ordinal, so binding by physical Arrow ordinal causes a type mismatch (see GitHub issue #499).
*/
public LanceStructAccessor(StructVector vector, StructType sparkStructType) {
this.accessor = vector;

StructField[] sparkFields = sparkStructType.fields();
this.childColumns = new LanceArrowColumnVector[sparkFields.length];
for (int i = 0; i < sparkFields.length; i++) {
StructField sparkField = sparkFields[i];
FieldVector arrowChild = vector.getChild(sparkField.name(), FieldVector.class);
if (arrowChild == null) {
throw new IllegalArgumentException(
"Arrow struct vector "
+ vector.getField().getName()
+ " is missing required field: "
+ sparkField.name());
}
childColumns[i] = new LanceArrowColumnVector(arrowChild, sparkField.dataType());
}
}

public boolean isNullAt(int rowId) {
return this.accessor.isNull(rowId);
}
Expand Down
Loading
Loading