diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentColumnarBatchScanner.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentColumnarBatchScanner.java index 88db979d6..66c1b4fa4 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentColumnarBatchScanner.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceFragmentColumnarBatchScanner.java @@ -156,6 +156,13 @@ private List buildSparkOrderedVectors( throw new IllegalStateException( "Lance scan did not return expected field '" + fieldName + "'"); } + // Pass the Spark field so the column vector reports the blob v2 descriptor schema when + // applicable and binds nested struct children by the Spark schema's names and order. + // Lance's native scan does not push down nested struct projection, so Arrow always + // carries on-disk struct children in physical order; schema-aware binding is required + // when the partition schema differs from that order (e.g. nested struct pruning). See + // GitHub issue #499. The trailing false keeps the vector reusable across batches (it is + // owned and closed by the reader, not this column vector). See GitHub issue #545. LanceArrowColumnVector colVec = new LanceArrowColumnVector(vector, false, field); // Set blob reference context so getBinary() produces blob references diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceArrowColumnVector.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceArrowColumnVector.java index 36d0de12a..d0b54cb92 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceArrowColumnVector.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceArrowColumnVector.java @@ -43,6 +43,7 @@ import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.LanceArrowUtils; import org.apache.spark.sql.vectorized.ArrowColumnVector; import org.apache.spark.sql.vectorized.ColumnVector; @@ -81,12 +82,55 @@ public LanceArrowColumnVector(ValueVector vector, boolean closeVectorOnClose) { public LanceArrowColumnVector( ValueVector vector, boolean closeVectorOnClose, StructField sparkField) { - super( - BlobUtils.isBlobV2SparkField(sparkField) - ? BlobUtils.BLOB_DESCRIPTOR_STRUCT - : computeDataType(vector)); + super(reportedSparkType(vector, sparkField)); this.closeVectorOnClose = closeVectorOnClose; + // Schema-aware nested struct projection (#499). Lance's native scan does not push down nested + // struct projection, so a StructVector always carries its on-disk children in physical order. + // When the Spark field is a plain (non-blob) struct, bind Arrow children by the Spark schema's + // field names and order so a pruned or reordered projection reads correctly. Blob descriptor + // structs keep the by-ordinal binding handled in initFromVector. + if (sparkField != null + && sparkField.dataType() instanceof StructType + && vector instanceof StructVector + && !BlobUtils.isBlobArrowField(vector.getField()) + && !BlobUtils.isBlobV2SparkField(sparkField)) { + structAccessor = + new LanceStructAccessor((StructVector) vector, (StructType) sparkField.dataType()); + return; + } + initFromVector(vector, sparkField); + } + public LanceArrowColumnVector(ValueVector vector, DataType sparkType) { + this(vector, sparkType, true); + } + + /** + * Schema-aware constructor. When {@code sparkType} is a {@link StructType} and the underlying + * Arrow vector is a {@link StructVector} (and not a Blob struct), children are bound by name in + * {@code sparkType}'s field order rather than by physical Arrow ordinal. This lets the same Arrow + * vector serve a pruned or reordered Spark schema correctly — see GitHub issue #499. + * + *

For all other vector / type combinations the dispatch is identical to the single-argument + * constructor; {@code sparkType} only changes the reported {@link #dataType()}. + * + *

{@code closeVectorOnClose} controls whether {@link #close()} releases the underlying Arrow + * vector; pass {@code false} when the vector is reused across batches — see GitHub issue #545. + */ + public LanceArrowColumnVector( + ValueVector vector, DataType sparkType, boolean closeVectorOnClose) { + super(sparkType); + this.closeVectorOnClose = closeVectorOnClose; + if (sparkType instanceof StructType + && vector instanceof StructVector + && !BlobUtils.isBlobArrowField(vector.getField())) { + structAccessor = new LanceStructAccessor((StructVector) vector, (StructType) sparkType); + return; + } + initFromVector(vector, null); + } + + private void initFromVector(ValueVector vector, StructField sparkField) { if (vector instanceof UInt1Vector) { uInt1Accessor = new UInt1Accessor((UInt1Vector) vector); } else if (vector instanceof UInt2Vector) { @@ -559,6 +603,25 @@ private static DataType computeDataType(ValueVector vector) { return LanceArrowUtils.fromArrowField(vector.getField()); } + /** + * Computes the Spark type this column reports. Blob v2 descriptor fields report the descriptor + * struct; a plain (non-blob) struct field reports its Spark schema so a pruned or reordered + * nested projection is described correctly (#499); everything else derives the type from the + * Arrow field. + */ + private static DataType reportedSparkType(ValueVector vector, StructField sparkField) { + if (BlobUtils.isBlobV2SparkField(sparkField)) { + return BlobUtils.BLOB_DESCRIPTOR_STRUCT; + } + if (sparkField != null + && sparkField.dataType() instanceof StructType + && vector instanceof StructVector + && !BlobUtils.isBlobArrowField(vector.getField())) { + return sparkField.dataType(); + } + return computeDataType(vector); + } + private static class LanceDecimalAccessor { private static final int DECIMAL128_BYTE_WIDTH = 16; private final DecimalVector vector; diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceStructAccessor.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceStructAccessor.java index 7c46b0ecd..f2706c787 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceStructAccessor.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/vectorized/LanceStructAccessor.java @@ -13,7 +13,10 @@ */ package org.lance.spark.vectorized; +import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.complex.StructVector; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.vectorized.ColumnVector; /** @@ -37,6 +40,31 @@ public LanceStructAccessor(StructVector vector) { } } + /** + * Schema-aware constructor that maps Arrow children by name to {@code sparkStructType}'s field + * order. Use this when the caller's Spark schema may be a pruned or reordered subset of the Arrow + * vector's on-disk children — Spark's generated projection accesses children by the pruned-schema + * ordinal, so binding by physical Arrow ordinal causes a type mismatch (see GitHub issue #499). + */ + public LanceStructAccessor(StructVector vector, StructType sparkStructType) { + this.accessor = vector; + + StructField[] sparkFields = sparkStructType.fields(); + this.childColumns = new LanceArrowColumnVector[sparkFields.length]; + for (int i = 0; i < sparkFields.length; i++) { + StructField sparkField = sparkFields[i]; + FieldVector arrowChild = vector.getChild(sparkField.name(), FieldVector.class); + if (arrowChild == null) { + throw new IllegalArgumentException( + "Arrow struct vector " + + vector.getField().getName() + + " is missing required field: " + + sparkField.name()); + } + childColumns[i] = new LanceArrowColumnVector(arrowChild, sparkField.dataType()); + } + } + public boolean isNullAt(int rowId) { return this.accessor.isNull(rowId); } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/vectorized/LanceArrowColumnVectorStructProjectionTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/vectorized/LanceArrowColumnVectorStructProjectionTest.java new file mode 100644 index 000000000..a49f351b1 --- /dev/null +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/vectorized/LanceArrowColumnVectorStructProjectionTest.java @@ -0,0 +1,285 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.vectorized; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarRow; +import org.apache.spark.unsafe.types.UTF8String; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Regression tests for the schema-aware {@code LanceArrowColumnVector(ValueVector, DataType)} + * constructor introduced for issue + * #499. Lance's native scan does not push down nested struct projection, so the underlying + * Arrow {@code StructVector} always carries all on-disk children in physical order. Spark's + * generated projection accesses children by the pruned-schema ordinal; binding by physical Arrow + * ordinal therefore causes a type mismatch and an {@code UnsupportedOperationException} in {@code + * getLong} / {@code getInt} / etc. + */ +public class LanceArrowColumnVectorStructProjectionTest { + + /** + * Mirrors the failing case in the bug report: a 4-child struct of differing types is read through + * a 2-field pruned Spark schema that picks the two long children. Without the schema- aware + * constructor, slot 0 would dispatch to the {@code external_path} VarChar accessor and {@code + * getLong(0)} would throw. + */ + @Test + public void prunedStructProjection_issue499() { + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + StructVector reference = createReferenceStruct(allocator)) { + // Row 0: ("path-0", 100, 50, true) + reference.setIndexDefined(0); + writeRow(reference, 0, "path-0", 100L, 50L, true); + // Row 1: ("path-1", 200, 75, false) + reference.setIndexDefined(1); + writeRow(reference, 1, "path-1", 200L, 75L, false); + // Row 2: null struct + reference.setNull(2); + setRowCounts(reference, 3); + + StructType prunedSchema = + new StructType().add("offset", DataTypes.LongType).add("length", DataTypes.LongType); + + try (LanceArrowColumnVector vector = new LanceArrowColumnVector(reference, prunedSchema)) { + assertEquals(prunedSchema, vector.dataType()); + assertTrue(vector.hasNull()); + assertEquals(1, vector.numNulls()); + + ColumnarRow row0 = vector.getStruct(0); + assertEquals(100L, row0.getLong(0)); + assertEquals(50L, row0.getLong(1)); + + ColumnarRow row1 = vector.getStruct(1); + assertEquals(200L, row1.getLong(0)); + assertEquals(75L, row1.getLong(1)); + + assertTrue(vector.isNullAt(2)); + } + } + } + + /** + * The pruned schema reorders children relative to the Arrow on-disk order. Confirms that {@link + * LanceStructAccessor} maps by name, not by ordinal. + */ + @Test + public void schemaAwareConstructor_reorderedChildren() { + Field field = + new Field( + "s", + FieldType.notNullable(ArrowType.Struct.INSTANCE), + Arrays.asList( + new Field("int_field", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("long_field", FieldType.nullable(new ArrowType.Int(64, true)), null), + new Field("string_field", FieldType.nullable(ArrowType.Utf8.INSTANCE), null))); + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + StructVector vector = (StructVector) field.createVector(allocator)) { + vector.allocateNew(); + IntVector intVec = vector.getChild("int_field", IntVector.class); + BigIntVector longVec = vector.getChild("long_field", BigIntVector.class); + VarCharVector stringVec = vector.getChild("string_field", VarCharVector.class); + + vector.setIndexDefined(0); + intVec.setSafe(0, 42); + longVec.setSafe(0, 9000L); + byte[] hello = "hello".getBytes(StandardCharsets.UTF_8); + stringVec.setSafe(0, hello, 0, hello.length); + + intVec.setValueCount(1); + longVec.setValueCount(1); + stringVec.setValueCount(1); + vector.setValueCount(1); + + // Pruned schema: string first, then int; long_field is dropped. + StructType prunedSchema = + new StructType() + .add("string_field", DataTypes.StringType) + .add("int_field", DataTypes.IntegerType); + + try (LanceArrowColumnVector columnVector = new LanceArrowColumnVector(vector, prunedSchema)) { + assertEquals(prunedSchema, columnVector.dataType()); + + ColumnarRow row0 = columnVector.getStruct(0); + assertEquals(UTF8String.fromString("hello"), row0.getUTF8String(0)); + assertEquals(42, row0.getInt(1)); + } + } + } + + /** + * A pruned outer struct contains an inner struct that is itself pruned and reordered relative to + * the Arrow on-disk ordering. Confirms that schema-awareness recurses through nested struct + * children. + */ + @Test + public void schemaAwareConstructor_recursesIntoNestedStructs() { + Field innerField = + new Field( + "inner", + FieldType.notNullable(ArrowType.Struct.INSTANCE), + Arrays.asList( + new Field("a", FieldType.nullable(ArrowType.Utf8.INSTANCE), null), + new Field("b", FieldType.nullable(new ArrowType.Int(64, true)), null), + new Field("c", FieldType.nullable(new ArrowType.Int(64, true)), null), + new Field("d", FieldType.nullable(ArrowType.Bool.INSTANCE), null))); + Field outerField = + new Field( + "outer", + FieldType.notNullable(ArrowType.Struct.INSTANCE), + Arrays.asList( + innerField, + new Field("extra", FieldType.nullable(new ArrowType.Int(32, true)), null))); + + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + StructVector outer = (StructVector) outerField.createVector(allocator)) { + outer.allocateNew(); + StructVector inner = outer.getChild("inner", StructVector.class); + IntVector extra = outer.getChild("extra", IntVector.class); + VarCharVector aVec = inner.getChild("a", VarCharVector.class); + BigIntVector bVec = inner.getChild("b", BigIntVector.class); + BigIntVector cVec = inner.getChild("c", BigIntVector.class); + BitVector dVec = inner.getChild("d", BitVector.class); + + outer.setIndexDefined(0); + inner.setIndexDefined(0); + byte[] alpha = "alpha".getBytes(StandardCharsets.UTF_8); + aVec.setSafe(0, alpha, 0, alpha.length); + bVec.setSafe(0, 11L); + cVec.setSafe(0, 22L); + dVec.setSafe(0, 1); + extra.setSafe(0, 7); + + aVec.setValueCount(1); + bVec.setValueCount(1); + cVec.setValueCount(1); + dVec.setValueCount(1); + inner.setValueCount(1); + extra.setValueCount(1); + outer.setValueCount(1); + + StructType prunedInner = + new StructType().add("b", DataTypes.LongType).add("c", DataTypes.LongType); + StructType prunedOuter = new StructType().add("inner", prunedInner); + + try (LanceArrowColumnVector columnVector = new LanceArrowColumnVector(outer, prunedOuter)) { + assertEquals(prunedOuter, columnVector.dataType()); + + ColumnarRow outerRow = columnVector.getStruct(0); + ColumnarRow innerRow = outerRow.getStruct(0, 2); + assertEquals(11L, innerRow.getLong(0)); + assertEquals(22L, innerRow.getLong(1)); + } + } + } + + /** + * When the pruned schema names a field that does not exist in the Arrow vector, construction + * should fail with an actionable error rather than silently returning bogus data. + */ + @Test + public void schemaAwareConstructor_rejectsMissingField() { + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + StructVector reference = createReferenceStruct(allocator)) { + setRowCounts(reference, 0); + + StructType badSchema = new StructType().add("does_not_exist", DataTypes.LongType); + + IllegalArgumentException ex = + org.junit.jupiter.api.Assertions.assertThrows( + IllegalArgumentException.class, + () -> new LanceArrowColumnVector(reference, badSchema)); + assertTrue(ex.getMessage().contains("does_not_exist")); + } + } + + /** Builds the 4-child struct from the issue: external_path / offset / length / managed. */ + private static StructVector createReferenceStruct(BufferAllocator allocator) { + Field field = + new Field( + "reference", + FieldType.nullable(ArrowType.Struct.INSTANCE), + Arrays.asList( + new Field("external_path", FieldType.nullable(ArrowType.Utf8.INSTANCE), null), + new Field("offset", FieldType.nullable(new ArrowType.Int(64, true)), null), + new Field("length", FieldType.nullable(new ArrowType.Int(64, true)), null), + new Field("managed", FieldType.nullable(ArrowType.Bool.INSTANCE), null))); + StructVector vector = (StructVector) field.createVector(allocator); + vector.allocateNew(); + return vector; + } + + private static void writeRow( + StructVector vector, + int rowId, + String externalPath, + long offset, + long length, + boolean managed) { + VarCharVector externalPathVec = vector.getChild("external_path", VarCharVector.class); + BigIntVector offsetVec = vector.getChild("offset", BigIntVector.class); + BigIntVector lengthVec = vector.getChild("length", BigIntVector.class); + BitVector managedVec = vector.getChild("managed", BitVector.class); + + byte[] pathBytes = externalPath.getBytes(StandardCharsets.UTF_8); + externalPathVec.setSafe(rowId, pathBytes, 0, pathBytes.length); + offsetVec.setSafe(rowId, offset); + lengthVec.setSafe(rowId, length); + managedVec.setSafe(rowId, managed ? 1 : 0); + } + + private static void setRowCounts(StructVector vector, int count) { + for (int i = 0; i < vector.size(); i++) { + vector.getChildByOrdinal(i).setValueCount(count); + } + vector.setValueCount(count); + } + + @Test + public void singleArgConstructor_unchangedForBlobAndPrimitives() { + // Sanity check that adding the schema-aware path didn't break the existing legacy constructor. + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + BigIntVector longVec = new BigIntVector("v", allocator)) { + longVec.allocateNew(2); + longVec.setSafe(0, 7L); + longVec.setSafe(1, 8L); + longVec.setValueCount(2); + + try (LanceArrowColumnVector cv = new LanceArrowColumnVector(longVec)) { + assertEquals(DataTypes.LongType, cv.dataType()); + assertEquals(7L, cv.getLong(0)); + assertEquals(8L, cv.getLong(1)); + assertFalse(cv.hasNull()); + } + } + } +}