From e72785e52ed173bac6dda7823385a749706e9892 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Thu, 11 Jun 2026 07:15:20 +0000 Subject: [PATCH 1/2] feat: expose native Lance scan descriptor --- .../java/org/lance/spark/read/LanceScan.java | 169 ++++++++++++ .../lance/spark/read/LanceScanBuilder.java | 1 + .../LanceNativeScanFallbackReason.java | 43 +++ .../LanceNativeScanFallbackReasonCode.java | 23 ++ .../read/nativeplan/LanceNativeScanPlan.java | 183 +++++++++++++ .../read/nativeplan/LanceNativeScanSplit.java | 42 +++ .../org/lance/spark/read/LanceScanTest.java | 256 ++++++++++++++++++ 7 files changed, 717 insertions(+) create mode 100644 lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanFallbackReason.java create mode 100644 lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanFallbackReasonCode.java create mode 100644 lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanPlan.java create mode 100644 lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanSplit.java diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java index fa8b0602f..d03bddd48 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java @@ -17,6 +17,10 @@ import org.lance.ipc.ColumnOrdering; import org.lance.spark.LanceSparkReadOptions; import org.lance.spark.read.metric.LanceCustomMetrics; +import org.lance.spark.read.nativeplan.LanceNativeScanFallbackReason; +import org.lance.spark.read.nativeplan.LanceNativeScanFallbackReasonCode; +import org.lance.spark.read.nativeplan.LanceNativeScanPlan; +import org.lance.spark.read.nativeplan.LanceNativeScanSplit; import org.lance.spark.sharding.SparkLanceShardingUtils; import org.lance.spark.utils.Optional; @@ -47,8 +51,10 @@ import scala.collection.immutable.Map; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Objects; import java.util.Set; @@ -66,6 +72,7 @@ public class LanceScan private static final long serialVersionUID = 947284762748623947L; private static final Logger LOG = LoggerFactory.getLogger(LanceScan.class); + private final StructType sparkReadSchema; private final StructType schema; private final LanceSparkReadOptions readOptions; private final Optional whereConditions; @@ -144,6 +151,49 @@ public LanceScan( java.util.Map initialStorageOptions, String namespaceImpl, java.util.Map namespaceProperties) { + this( + schema, + schema, + readOptions, + whereConditions, + limit, + offset, + topNSortOrders, + pushedAggregation, + pushedPredicates, + statistics, + zonemapStats, + survivingFragmentIds, + precomputedSplits, + precomputedFragmentRowCounts, + activeShardingExpression, + fragmentShardingKeys, + initialStorageOptions, + namespaceImpl, + namespaceProperties); + } + + public LanceScan( + StructType sparkReadSchema, + StructType schema, + LanceSparkReadOptions readOptions, + Optional whereConditions, + Optional limit, + Optional offset, + Optional> topNSortOrders, + Optional pushedAggregation, + Predicate[] pushedPredicates, + LanceStatistics statistics, + java.util.Map> zonemapStats, + Set survivingFragmentIds, + List precomputedSplits, + java.util.Map precomputedFragmentRowCounts, + Expression activeShardingExpression, + java.util.Map fragmentShardingKeys, + java.util.Map initialStorageOptions, + String namespaceImpl, + java.util.Map namespaceProperties) { + this.sparkReadSchema = sparkReadSchema; this.schema = schema; this.readOptions = readOptions; this.whereConditions = whereConditions; @@ -170,6 +220,125 @@ public LanceScan( this.namespaceProperties = namespaceProperties; } + public java.util.Optional nativeScanPlan() { + if (nativeScanFallbackReason().isPresent()) { + return java.util.Optional.empty(); + } + return java.util.Optional.of( + new LanceNativeScanPlan( + scanId, + readOptions.getDatasetUri(), + readOptions.getVersion(), + sparkReadSchema.json(), + schema.json(), + optionalValue(whereConditions), + optionalValue(limit), + optionalValue(offset), + readOptions.getBatchSize(), + nativeStorageOptions(), + namespaceImpl, + namespaceProperties, + readOptions.getTableId(), + readOptions.getCatalogName(), + nativeSplits())); + } + + public java.util.Optional nativeScanFallbackReason() { + if (pushedAggregation == null) { + return fallback( + LanceNativeScanFallbackReasonCode.UNSAFE_V1_STATE, "pushed aggregation state is missing"); + } + if (pushedAggregation.isPresent()) { + return fallback( + LanceNativeScanFallbackReasonCode.PUSHED_AGGREGATION, + "pushed aggregation is not representable in native scan descriptor v1"); + } + if (topNSortOrders == null) { + return fallback(LanceNativeScanFallbackReasonCode.UNSAFE_V1_STATE, "TopN state is missing"); + } + if (topNSortOrders.isPresent()) { + return fallback( + LanceNativeScanFallbackReasonCode.TOP_N, + "pushed TopN is not representable in native scan descriptor v1"); + } + if (readOptions == null || readOptions.getDatasetUri() == null) { + return fallback( + LanceNativeScanFallbackReasonCode.UNSAFE_V1_STATE, "read options are incomplete"); + } + if (readOptions.getVersion() == null) { + return fallback( + LanceNativeScanFallbackReasonCode.MISSING_RESOLVED_VERSION, + "read options are not pinned to a resolved Lance version"); + } + if (precomputedSplits == null) { + return fallback( + LanceNativeScanFallbackReasonCode.MISSING_SPLIT_STATE, + "scan splits were not planned on the driver"); + } + if (sparkReadSchema == null || schema == null || scanId == null) { + return fallback( + LanceNativeScanFallbackReasonCode.UNSAFE_V1_STATE, "schema or scan identity is missing"); + } + if (whereConditions == null || limit == null || offset == null) { + return fallback( + LanceNativeScanFallbackReasonCode.UNSAFE_V1_STATE, + "filter, limit, or offset state is missing"); + } + return validateSplitsForNativePlan(precomputedSplits); + } + + private java.util.Optional validateSplitsForNativePlan( + List splits) { + for (int splitIndex = 0; splitIndex < splits.size(); splitIndex++) { + LanceSplit split = splits.get(splitIndex); + if (split == null || split.getFragments() == null) { + return fallback( + LanceNativeScanFallbackReasonCode.UNSAFE_V1_STATE, + "split " + splitIndex + " is missing fragment state"); + } + for (Integer fragmentId : split.getFragments()) { + if (fragmentId == null || fragmentId < 0) { + return fallback( + LanceNativeScanFallbackReasonCode.UNSAFE_V1_STATE, + "split " + splitIndex + " contains an invalid fragment id"); + } + } + } + return java.util.Optional.empty(); + } + + private List nativeSplits() { + List prunedSplits = pruneByRowAddrFilters(precomputedSplits); + prunedSplits = pruneByZonemapStats(prunedSplits); + prunedSplits = pruneByLimit(prunedSplits, precomputedFragmentRowCounts); + + List nativeSplits = new ArrayList<>(prunedSplits.size()); + for (int i = 0; i < prunedSplits.size(); i++) { + nativeSplits.add(new LanceNativeScanSplit(i, prunedSplits.get(i).getFragments())); + } + return nativeSplits; + } + + private java.util.Map nativeStorageOptions() { + java.util.Map storageOptions = new HashMap<>(); + if (readOptions.getStorageOptions() != null) { + storageOptions.putAll(readOptions.getStorageOptions()); + } + if (initialStorageOptions != null) { + storageOptions.putAll(initialStorageOptions); + } + return storageOptions; + } + + private static T optionalValue(Optional optional) { + return optional.isPresent() ? optional.get() : null; + } + + private static java.util.Optional fallback( + LanceNativeScanFallbackReasonCode code, String message) { + return java.util.Optional.of(new LanceNativeScanFallbackReason(code, message)); + } + @Override public Batch toBatch() { return this; diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java index 5b7df28fd..c03fed3c1 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScanBuilder.java @@ -258,6 +258,7 @@ public Scan build() { Optional whereCondition = FilterPushDown.compileFiltersToSqlWhereClause(pushedPredicates); return new LanceScan( + fullSchema, schema, resolvedReadOptions, whereCondition, diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanFallbackReason.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanFallbackReason.java new file mode 100644 index 000000000..5bb915de4 --- /dev/null +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanFallbackReason.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.read.nativeplan; + +import java.io.Serializable; +import java.util.Objects; + +/** Explicit reason why a {@code LanceScan} cannot be represented as a v1 native scan plan. */ +public final class LanceNativeScanFallbackReason implements Serializable { + private static final long serialVersionUID = 1L; + + private final LanceNativeScanFallbackReasonCode code; + private final String message; + + public LanceNativeScanFallbackReason(LanceNativeScanFallbackReasonCode code, String message) { + this.code = Objects.requireNonNull(code, "code"); + this.message = Objects.requireNonNull(message, "message"); + } + + public LanceNativeScanFallbackReasonCode getCode() { + return code; + } + + public String getMessage() { + return message; + } + + @Override + public String toString() { + return "LanceNativeScanFallbackReason{" + "code=" + code + ", message='" + message + '\'' + '}'; + } +} diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanFallbackReasonCode.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanFallbackReasonCode.java new file mode 100644 index 000000000..b47afe4a6 --- /dev/null +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanFallbackReasonCode.java @@ -0,0 +1,23 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.read.nativeplan; + +/** Stable v1 reason codes for declining native Lance scan descriptors. */ +public enum LanceNativeScanFallbackReasonCode { + PUSHED_AGGREGATION, + TOP_N, + MISSING_RESOLVED_VERSION, + MISSING_SPLIT_STATE, + UNSAFE_V1_STATE +} diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanPlan.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanPlan.java new file mode 100644 index 000000000..4561657f4 --- /dev/null +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanPlan.java @@ -0,0 +1,183 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.read.nativeplan; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** Immutable v1 native Lance read descriptor for reflection-based consumers. */ +public final class LanceNativeScanPlan implements Serializable { + private static final long serialVersionUID = 1L; + + public static final int DESCRIPTOR_VERSION = 1; + + private final int descriptorVersion; + private final String scanId; + private final String datasetUri; + private final long resolvedVersion; + private final String sparkReadSchemaJson; + private final String projectedReadSchemaJson; + private final String pushedFilterSql; + private final Integer limit; + private final Integer offset; + private final int batchSize; + private final Map storageOptions; + private final String namespaceImpl; + private final Map namespaceProperties; + private final List tableId; + private final String catalogName; + private final List splits; + + public LanceNativeScanPlan( + String scanId, + String datasetUri, + long resolvedVersion, + String sparkReadSchemaJson, + String projectedReadSchemaJson, + String pushedFilterSql, + Integer limit, + Integer offset, + int batchSize, + Map storageOptions, + String namespaceImpl, + Map namespaceProperties, + List tableId, + String catalogName, + List splits) { + this.descriptorVersion = DESCRIPTOR_VERSION; + this.scanId = Objects.requireNonNull(scanId, "scanId"); + this.datasetUri = Objects.requireNonNull(datasetUri, "datasetUri"); + this.resolvedVersion = resolvedVersion; + this.sparkReadSchemaJson = Objects.requireNonNull(sparkReadSchemaJson, "sparkReadSchemaJson"); + this.projectedReadSchemaJson = + Objects.requireNonNull(projectedReadSchemaJson, "projectedReadSchemaJson"); + this.pushedFilterSql = pushedFilterSql; + this.limit = limit; + this.offset = offset; + this.batchSize = batchSize; + this.storageOptions = immutableSortedMap(storageOptions); + this.namespaceImpl = namespaceImpl; + this.namespaceProperties = immutableSortedMap(namespaceProperties); + this.tableId = immutableList(tableId); + this.catalogName = catalogName; + this.splits = immutableList(Objects.requireNonNull(splits, "splits")); + } + + public int getDescriptorVersion() { + return descriptorVersion; + } + + public String getScanId() { + return scanId; + } + + public String getDatasetUri() { + return datasetUri; + } + + public long getResolvedVersion() { + return resolvedVersion; + } + + public String getSparkReadSchemaJson() { + return sparkReadSchemaJson; + } + + public String getProjectedReadSchemaJson() { + return projectedReadSchemaJson; + } + + public boolean hasPushedFilterSql() { + return pushedFilterSql != null; + } + + public String getPushedFilterSql() { + return pushedFilterSql; + } + + public boolean hasLimit() { + return limit != null; + } + + public Integer getLimit() { + return limit; + } + + public boolean hasOffset() { + return offset != null; + } + + public Integer getOffset() { + return offset; + } + + public int getBatchSize() { + return batchSize; + } + + public Map getStorageOptions() { + return storageOptions; + } + + public boolean hasNamespaceImpl() { + return namespaceImpl != null; + } + + public String getNamespaceImpl() { + return namespaceImpl; + } + + public Map getNamespaceProperties() { + return namespaceProperties; + } + + public boolean hasTableId() { + return !tableId.isEmpty(); + } + + public List getTableId() { + return tableId; + } + + public boolean hasCatalogName() { + return catalogName != null; + } + + public String getCatalogName() { + return catalogName; + } + + public List getSplits() { + return splits; + } + + private static Map immutableSortedMap(Map input) { + if (input == null || input.isEmpty()) { + return Collections.emptyMap(); + } + return Collections.unmodifiableMap(new TreeMap<>(input)); + } + + private static List immutableList(List input) { + if (input == null || input.isEmpty()) { + return Collections.emptyList(); + } + return Collections.unmodifiableList(new ArrayList<>(input)); + } +} diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanSplit.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanSplit.java new file mode 100644 index 000000000..11a31220a --- /dev/null +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/nativeplan/LanceNativeScanSplit.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.read.nativeplan; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** Serializable native-read split descriptor containing the Lance fragment IDs in that split. */ +public final class LanceNativeScanSplit implements Serializable { + private static final long serialVersionUID = 1L; + + private final int splitIndex; + private final List fragmentIds; + + public LanceNativeScanSplit(int splitIndex, List fragmentIds) { + this.splitIndex = splitIndex; + this.fragmentIds = + Collections.unmodifiableList(new ArrayList<>(Objects.requireNonNull(fragmentIds))); + } + + public int getSplitIndex() { + return splitIndex; + } + + public List getFragmentIds() { + return fragmentIds; + } +} diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java index 761f7c8f4..c5d879b0a 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java @@ -13,12 +13,20 @@ */ package org.lance.spark.read; +import org.lance.spark.LanceSparkReadOptions; import org.lance.spark.TestUtils; +import org.lance.spark.read.nativeplan.LanceNativeScanFallbackReason; +import org.lance.spark.read.nativeplan.LanceNativeScanFallbackReasonCode; +import org.lance.spark.read.nativeplan.LanceNativeScanPlan; +import org.lance.spark.read.nativeplan.LanceNativeScanSplit; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.expressions.Expression; import org.apache.spark.sql.connector.expressions.Expressions; import org.apache.spark.sql.connector.expressions.FieldReference; +import org.apache.spark.sql.connector.expressions.NullOrdering; +import org.apache.spark.sql.connector.expressions.SortDirection; +import org.apache.spark.sql.connector.expressions.SortOrder; import org.apache.spark.sql.connector.expressions.aggregate.AggregateFunc; import org.apache.spark.sql.connector.expressions.aggregate.Aggregation; import org.apache.spark.sql.connector.expressions.aggregate.CountStar; @@ -33,15 +41,31 @@ import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Supplier; import static org.junit.jupiter.api.Assertions.*; public class LanceScanTest { private static final StructType TEST_SCHEMA = TestUtils.TestTable1Config.schema; + private static final int NATIVE_BATCH_SIZE = 1234; + private static final String CATALOG_NAME = "native_catalog"; + private static final String NAMESPACE_IMPL = "dir"; + private static final String NAMESPACE_PROPERTY_KEY = "root"; + private static final String NAMESPACE_PROPERTY_VALUE = "/tmp/native"; + private static final String STORAGE_OPTION_KEY = "native.option"; + private static final String STORAGE_OPTION_VALUE = "driver"; + private static final String TABLE_NAMESPACE = "default"; + private static final String TABLE_NAME = "table"; private LanceScan buildScan() { return (LanceScan) @@ -54,6 +78,17 @@ private LanceScan buildScan() { .build(); } + private LanceScan buildScan( + LanceSparkReadOptions readOptions, + Map initialStorageOptions, + String namespaceImpl, + Map namespaceProperties) { + return (LanceScan) + new LanceScanBuilder( + TEST_SCHEMA, readOptions, initialStorageOptions, namespaceImpl, namespaceProperties) + .build(); + } + @Test public void testReadSchemaReturnsOriginalSchema() { assertEquals(TEST_SCHEMA, buildScan().readSchema()); @@ -120,6 +155,119 @@ public void testPlanInputPartitionsPropagatesLimit() { assertEquals(2, partition.getLimit().get()); } + @Test + public void testNativeScanPlanForEligibleOrdinaryRead() throws Exception { + LanceSparkReadOptions readOptions = + LanceSparkReadOptions.builder() + .datasetUri(TestUtils.TestTable1Config.datasetUri) + .batchSize(NATIVE_BATCH_SIZE) + .tableId(Arrays.asList(TABLE_NAMESPACE, TABLE_NAME)) + .catalogName(CATALOG_NAME) + .build(); + LanceScan scan = + buildScan( + readOptions, + Collections.singletonMap(STORAGE_OPTION_KEY, STORAGE_OPTION_VALUE), + NAMESPACE_IMPL, + Collections.singletonMap(NAMESPACE_PROPERTY_KEY, NAMESPACE_PROPERTY_VALUE)); + + java.util.Optional maybePlan = scan.nativeScanPlan(); + assertTrue(maybePlan.isPresent()); + assertFalse(scan.nativeScanFallbackReason().isPresent()); + LanceNativeScanPlan plan = maybePlan.get(); + + assertEquals(LanceNativeScanPlan.DESCRIPTOR_VERSION, plan.getDescriptorVersion()); + assertNotNull(plan.getScanId()); + assertEquals(TestUtils.TestTable1Config.datasetUri, plan.getDatasetUri()); + assertTrue(plan.getResolvedVersion() > 0); + assertEquals(TEST_SCHEMA.json(), plan.getSparkReadSchemaJson()); + assertEquals(TEST_SCHEMA.json(), plan.getProjectedReadSchemaJson()); + assertFalse(plan.hasPushedFilterSql()); + assertFalse(plan.hasLimit()); + assertFalse(plan.hasOffset()); + assertEquals(NATIVE_BATCH_SIZE, plan.getBatchSize()); + assertEquals(STORAGE_OPTION_VALUE, plan.getStorageOptions().get(STORAGE_OPTION_KEY)); + assertEquals(NAMESPACE_IMPL, plan.getNamespaceImpl()); + assertEquals( + NAMESPACE_PROPERTY_VALUE, plan.getNamespaceProperties().get(NAMESPACE_PROPERTY_KEY)); + assertEquals(Arrays.asList(TABLE_NAMESPACE, TABLE_NAME), plan.getTableId()); + assertEquals(CATALOG_NAME, plan.getCatalogName()); + assertFalse(plan.getSplits().isEmpty()); + for (LanceNativeScanSplit split : plan.getSplits()) { + assertFalse(split.getFragmentIds().isEmpty()); + } + assertThrows( + UnsupportedOperationException.class, + () -> plan.getStorageOptions().put(STORAGE_OPTION_KEY, STORAGE_OPTION_VALUE)); + assertThrows(UnsupportedOperationException.class, () -> plan.getTableId().add(TABLE_NAME)); + assertThrows( + UnsupportedOperationException.class, + () -> plan.getSplits().add(new LanceNativeScanSplit(0, Collections.singletonList(0)))); + + LanceNativeScanPlan roundTripped = roundTrip(plan); + assertEquals(plan.getDatasetUri(), roundTripped.getDatasetUri()); + assertEquals(plan.getSplits().size(), roundTripped.getSplits().size()); + } + + @Test + public void testNativeScanPlanIncludesProjectedSchemaAndPushedOptions() { + StructType projectedSchema = new StructType().add("x", DataTypes.LongType); + LanceScanBuilder builder = + new LanceScanBuilder( + TEST_SCHEMA, + TestUtils.TestTable1Config.readOptions, + Collections.emptyMap(), + null, + Collections.emptyMap()); + builder.pruneColumns(projectedSchema); + builder.pushPredicates(new Predicate[] {TestPredicates.gt("x", 0L)}); + builder.pushLimit(2); + + LanceScan scan = (LanceScan) builder.build(); + java.util.Optional maybePlan = scan.nativeScanPlan(); + assertTrue(maybePlan.isPresent()); + LanceNativeScanPlan plan = maybePlan.get(); + + assertEquals(TEST_SCHEMA.json(), plan.getSparkReadSchemaJson()); + assertEquals(projectedSchema.json(), plan.getProjectedReadSchemaJson()); + assertTrue(plan.hasPushedFilterSql()); + assertNotNull(plan.getPushedFilterSql()); + assertEquals(Integer.valueOf(2), plan.getLimit()); + assertFalse(plan.hasOffset()); + } + + @Test + public void testNativeScanFallbackReasons() { + FallbackCase[] cases = + new FallbackCase[] { + new FallbackCase( + "aggregation", + this::buildPushedAggregationScan, + LanceNativeScanFallbackReasonCode.PUSHED_AGGREGATION), + new FallbackCase("topN", this::buildTopNScan, LanceNativeScanFallbackReasonCode.TOP_N), + new FallbackCase( + "missing version", + this::buildMissingResolvedVersionScan, + LanceNativeScanFallbackReasonCode.MISSING_RESOLVED_VERSION), + new FallbackCase( + "missing splits", + this::buildMissingSplitStateScan, + LanceNativeScanFallbackReasonCode.MISSING_SPLIT_STATE), + new FallbackCase( + "unsafe split", + this::buildUnsafeSplitStateScan, + LanceNativeScanFallbackReasonCode.UNSAFE_V1_STATE) + }; + + for (FallbackCase testCase : cases) { + LanceScan scan = testCase.scanSupplier.get(); + java.util.Optional reason = scan.nativeScanFallbackReason(); + assertTrue(reason.isPresent(), testCase.name); + assertEquals(testCase.expectedCode, reason.get().getCode(), testCase.name); + assertFalse(scan.nativeScanPlan().isPresent(), testCase.name); + } + } + @Test public void testLimitPrunesPartitions() { LanceScanBuilder builder = @@ -328,4 +476,112 @@ public void testNotEqualWithDifferentSchema() { assertNotEquals(scan1, scan2, "Scans with different schemas should not be equal"); } + + private LanceScan buildPushedAggregationScan() { + LanceScanBuilder builder = + new LanceScanBuilder( + TEST_SCHEMA, + TestUtils.TestTable1Config.readOptions, + Collections.emptyMap(), + null, + Collections.emptyMap()); + builder.pushPredicates(new Predicate[] {TestPredicates.gt("x", 0L)}); + builder.pushAggregation( + new Aggregation(new AggregateFunc[] {new CountStar()}, new Expression[] {})); + return (LanceScan) builder.build(); + } + + private LanceScan buildTopNScan() { + LanceScanBuilder builder = + new LanceScanBuilder( + TEST_SCHEMA, + TestUtils.TestTable1Config.readOptions, + Collections.emptyMap(), + null, + Collections.emptyMap()); + assertTrue( + builder.pushTopN( + new SortOrder[] { + Expressions.sort( + Expressions.column("x"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST) + }, + 10)); + return (LanceScan) builder.build(); + } + + private LanceScan buildMissingResolvedVersionScan() { + LanceSplit.ScanPlanResult plan = LanceSplit.planScan(TestUtils.TestTable1Config.readOptions); + return directScan( + TestUtils.TestTable1Config.readOptions, plan.getSplits(), plan.getFragmentRowCounts()); + } + + private LanceScan buildMissingSplitStateScan() { + LanceSparkReadOptions resolvedReadOptions = resolvedReadOptions(); + return directScan(resolvedReadOptions, null, Collections.emptyMap()); + } + + private LanceScan buildUnsafeSplitStateScan() { + LanceSparkReadOptions resolvedReadOptions = resolvedReadOptions(); + return directScan( + resolvedReadOptions, + Collections.singletonList(new LanceSplit(Collections.singletonList(-1))), + Collections.emptyMap()); + } + + private LanceScan directScan( + LanceSparkReadOptions readOptions, + List splits, + Map fragmentRowCounts) { + return new LanceScan( + TEST_SCHEMA, + readOptions, + org.lance.spark.utils.Optional.empty(), + org.lance.spark.utils.Optional.empty(), + org.lance.spark.utils.Optional.empty(), + org.lance.spark.utils.Optional.empty(), + org.lance.spark.utils.Optional.empty(), + new Predicate[0], + null, + Collections.emptyMap(), + null, + splits, + fragmentRowCounts, + null, + null, + Collections.emptyMap(), + null, + Collections.emptyMap()); + } + + private LanceSparkReadOptions resolvedReadOptions() { + LanceSplit.ScanPlanResult plan = LanceSplit.planScan(TestUtils.TestTable1Config.readOptions); + return TestUtils.TestTable1Config.readOptions.withVersion(plan.getResolvedVersion()); + } + + @SuppressWarnings("unchecked") + private static T roundTrip(T value) throws Exception { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try (ObjectOutputStream out = new ObjectOutputStream(bytes)) { + out.writeObject(value); + } + try (ObjectInputStream in = + new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) { + return (T) in.readObject(); + } + } + + private static class FallbackCase { + private final String name; + private final Supplier scanSupplier; + private final LanceNativeScanFallbackReasonCode expectedCode; + + private FallbackCase( + String name, + Supplier scanSupplier, + LanceNativeScanFallbackReasonCode expectedCode) { + this.name = name; + this.scanSupplier = scanSupplier; + this.expectedCode = expectedCode; + } + } } From 0888e583730d29b947e80bbd99e8887f2f4d4431 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Thu, 11 Jun 2026 07:43:30 +0000 Subject: [PATCH 2/2] fix: preserve native read storage option precedence --- .../java/org/lance/spark/read/LanceScan.java | 6 +++--- .../org/lance/spark/read/LanceScanTest.java | 18 ++++++++++++++---- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java index d03bddd48..b62a622a1 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceScan.java @@ -321,12 +321,12 @@ private List nativeSplits() { private java.util.Map nativeStorageOptions() { java.util.Map storageOptions = new HashMap<>(); - if (readOptions.getStorageOptions() != null) { - storageOptions.putAll(readOptions.getStorageOptions()); - } if (initialStorageOptions != null) { storageOptions.putAll(initialStorageOptions); } + if (readOptions.getStorageOptions() != null) { + storageOptions.putAll(readOptions.getStorageOptions()); + } return storageOptions; } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java index c5d879b0a..5d59d68ba 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/read/LanceScanTest.java @@ -63,7 +63,10 @@ public class LanceScanTest { private static final String NAMESPACE_PROPERTY_KEY = "root"; private static final String NAMESPACE_PROPERTY_VALUE = "/tmp/native"; private static final String STORAGE_OPTION_KEY = "native.option"; - private static final String STORAGE_OPTION_VALUE = "driver"; + private static final String STORAGE_OPTION_INITIAL_VALUE = "initial"; + private static final String STORAGE_OPTION_READ_VALUE = "read"; + private static final String STORAGE_OPTION_INITIAL_ONLY_KEY = "native.initial.only"; + private static final String STORAGE_OPTION_INITIAL_ONLY_VALUE = "driver"; private static final String TABLE_NAMESPACE = "default"; private static final String TABLE_NAME = "table"; @@ -161,13 +164,17 @@ public void testNativeScanPlanForEligibleOrdinaryRead() throws Exception { LanceSparkReadOptions.builder() .datasetUri(TestUtils.TestTable1Config.datasetUri) .batchSize(NATIVE_BATCH_SIZE) + .storageOptions(Collections.singletonMap(STORAGE_OPTION_KEY, STORAGE_OPTION_READ_VALUE)) .tableId(Arrays.asList(TABLE_NAMESPACE, TABLE_NAME)) .catalogName(CATALOG_NAME) .build(); + Map initialStorageOptions = new HashMap<>(); + initialStorageOptions.put(STORAGE_OPTION_KEY, STORAGE_OPTION_INITIAL_VALUE); + initialStorageOptions.put(STORAGE_OPTION_INITIAL_ONLY_KEY, STORAGE_OPTION_INITIAL_ONLY_VALUE); LanceScan scan = buildScan( readOptions, - Collections.singletonMap(STORAGE_OPTION_KEY, STORAGE_OPTION_VALUE), + initialStorageOptions, NAMESPACE_IMPL, Collections.singletonMap(NAMESPACE_PROPERTY_KEY, NAMESPACE_PROPERTY_VALUE)); @@ -186,7 +193,10 @@ public void testNativeScanPlanForEligibleOrdinaryRead() throws Exception { assertFalse(plan.hasLimit()); assertFalse(plan.hasOffset()); assertEquals(NATIVE_BATCH_SIZE, plan.getBatchSize()); - assertEquals(STORAGE_OPTION_VALUE, plan.getStorageOptions().get(STORAGE_OPTION_KEY)); + assertEquals(STORAGE_OPTION_READ_VALUE, plan.getStorageOptions().get(STORAGE_OPTION_KEY)); + assertEquals( + STORAGE_OPTION_INITIAL_ONLY_VALUE, + plan.getStorageOptions().get(STORAGE_OPTION_INITIAL_ONLY_KEY)); assertEquals(NAMESPACE_IMPL, plan.getNamespaceImpl()); assertEquals( NAMESPACE_PROPERTY_VALUE, plan.getNamespaceProperties().get(NAMESPACE_PROPERTY_KEY)); @@ -198,7 +208,7 @@ public void testNativeScanPlanForEligibleOrdinaryRead() throws Exception { } assertThrows( UnsupportedOperationException.class, - () -> plan.getStorageOptions().put(STORAGE_OPTION_KEY, STORAGE_OPTION_VALUE)); + () -> plan.getStorageOptions().put(STORAGE_OPTION_KEY, STORAGE_OPTION_READ_VALUE)); assertThrows(UnsupportedOperationException.class, () -> plan.getTableId().add(TABLE_NAME)); assertThrows( UnsupportedOperationException.class,