Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -366,17 +366,21 @@ private List<LanceSplit> pruneByZonemapStats(List<LanceSplit> allSplits) {
*/
@Override
public Partitioning outputPartitioning() {
if (partitionInfo != null) {
// Use partition info fragment count — available before
// planInputPartitions() is called. This allows
// V2ScanPartitioningAndOrdering to see the partitioning
// early enough for SPJ.
int partCount =
numPartitions >= 0 ? numPartitions : partitionInfo.getFragmentPartitionValues().size();
Expression[] keys = new Expression[] {FieldReference.apply(partitionInfo.getColumnName())};
return new KeyGroupedPartitioning(keys, partCount);
if (partitionInfo == null
|| partitionInfo.isSoftCapped()
|| !LanceScanBuilder.readReportingEnabledConf()) {
return new UnknownPartitioning(numPartitions >= 0 ? numPartitions : 0);
}
return new UnknownPartitioning(numPartitions >= 0 ? numPartitions : 0);
List<String> colNames = partitionInfo.getColumnNames();
if (colNames.size() > 1 && !SparkVersionUtil.supportsMultiKeySpj()) {
return new UnknownPartitioning(numPartitions >= 0 ? numPartitions : 0);
}
int partCount = numPartitions >= 0 ? numPartitions : partitionInfo.size();
Expression[] keys = new Expression[colNames.size()];
for (int i = 0; i < colNames.size(); i++) {
keys[i] = FieldReference.apply(colNames.get(i));
}
return new KeyGroupedPartitioning(keys, partCount);
}

@Override
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.spark.read;

import org.apache.spark.package$;

/**
* Runtime Spark-version gates for features whose behavior differs across supported Spark versions.
*
* <p>Kept in a single helper so pre-merge findings on multi-key SPJ (Spark 3.4 vs 3.5+) can be
* adjusted in one place rather than scattered across the codebase.
*/
final class SparkVersionUtil {

private SparkVersionUtil() {}

/**
* Whether the running Spark version reliably honors multi-key {@code KeyGroupedPartitioning}
* without silently falling back to a shuffle. Uses an explicit allowlist (Spark 3.5.x, 4.x+)
* rather than a denylist so custom forks and unexpected version strings default to the safe "fall
* back to UnknownPartitioning" behavior.
*/
static boolean supportsMultiKeySpj() {
return supportsMultiKeySpj(package$.MODULE$.SPARK_VERSION());
}

/**
* Package-private pure-function overload for unit tests. Inputs the version string directly so
* the parse/allowlist logic can be exercised without a running SparkContext.
*/
static boolean supportsMultiKeySpj(String version) {
if (version == null) {
return false;
}
// Accept "3.5.x" or any 4.x+ build. Reject everything else (3.4.x, 3.3.x, custom strings).
if (version.startsWith("3.5.")) {
return true;
}
int dot = version.indexOf('.');
if (dot <= 0) {
return false;
}
try {
int major = Integer.parseInt(version.substring(0, dot));
return major >= 4;
} catch (NumberFormatException e) {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.spark.sql.sources.LessThanOrEqual;
import org.apache.spark.sql.sources.Not;
import org.apache.spark.sql.sources.Or;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -343,47 +345,174 @@ private enum ComparisonType {
}

/**
* Result of partition detection: the partition column name and a map from fragment ID to the
* partition value for that fragment.
* Result of partition detection: the ordered list of partition column names, a parallel list of
* Spark {@link DataType}s (used to encode each fragment's tuple into an {@link InternalRow}), and
* a map from fragment ID to the partition tuple (one value per declared column, in declaration
* order).
*
* <p>Width invariants (enforced by the constructor): {@code columnTypes.size()} equals {@code
* columnNames.size()}; every tuple has length {@code columnNames.size()}; column names are
* distinct and non-empty.
*/
public static final class PartitionInfo implements Serializable {
private static final long serialVersionUID = 1L;
// Bumped from the single-column shape (1L on upstream main): adds columnTypes and uses
// tuple storage (Map<Integer, Comparable<?>[]>) for multi-column type-aware encoding.
private static final long serialVersionUID = 2L;

private final List<String> columnNames;
private final List<DataType> columnTypes;
private final Map<Integer, Comparable<?>[]> fragmentPartitionKeys;
private final boolean softCapped;

public PartitionInfo(
List<String> columnNames,
List<DataType> columnTypes,
Map<Integer, Comparable<?>[]> fragmentPartitionKeys) {
this(columnNames, columnTypes, fragmentPartitionKeys, false);
}

private final String columnName;
private final Map<Integer, Comparable<?>> fragmentPartitionValues;
public PartitionInfo(
List<String> columnNames,
List<DataType> columnTypes,
Map<Integer, Comparable<?>[]> fragmentPartitionKeys,
boolean softCapped) {
if (columnNames == null || columnNames.isEmpty()) {
throw new IllegalArgumentException("columnNames must be non-empty");
}
if (columnTypes == null || columnTypes.size() != columnNames.size()) {
throw new IllegalArgumentException("columnTypes must have the same size as columnNames");
}
if (new HashSet<>(columnNames).size() != columnNames.size()) {
throw new IllegalArgumentException("columnNames must be distinct: " + columnNames);
}
int width = columnNames.size();
Map<Integer, Comparable<?>[]> copy = new HashMap<>();
for (Map.Entry<Integer, Comparable<?>[]> e : fragmentPartitionKeys.entrySet()) {
Comparable<?>[] tuple = e.getValue();
if (tuple == null || tuple.length != width) {
throw new IllegalArgumentException(
"tuple for fragment " + e.getKey() + " must have length " + width);
}
copy.put(e.getKey(), tuple.clone());
}
this.columnNames = Collections.unmodifiableList(new java.util.ArrayList<>(columnNames));
this.columnTypes = Collections.unmodifiableList(new java.util.ArrayList<>(columnTypes));
this.fragmentPartitionKeys = Collections.unmodifiableMap(copy);
this.softCapped = softCapped;
}

public PartitionInfo(String columnName, Map<Integer, Comparable<?>> fragmentPartitionValues) {
this.columnName = columnName;
this.fragmentPartitionValues = Collections.unmodifiableMap(fragmentPartitionValues);
/**
* Factory for the single-column case. Wraps each scalar partition value into a length-1 tuple
* and delegates to the list-form constructor.
*/
public static PartitionInfo forSingleColumn(
String columnName, DataType columnType, Map<Integer, Comparable<?>> valueByFragment) {
Map<Integer, Comparable<?>[]> tupleMap = new HashMap<>();
for (Map.Entry<Integer, Comparable<?>> e : valueByFragment.entrySet()) {
tupleMap.put(e.getKey(), new Comparable<?>[] {e.getValue()});
}
return new PartitionInfo(
Collections.singletonList(columnName), Collections.singletonList(columnType), tupleMap);
}

public String getColumnName() {
return columnName;
public List<String> getColumnNames() {
return columnNames;
}

public Map<Integer, Comparable<?>> getFragmentPartitionValues() {
return fragmentPartitionValues;
public List<DataType> getColumnTypes() {
return columnTypes;
}

/**
* Returns a partition key {@link InternalRow} for the given fragment ID. The row contains a
* single column with the partition value, converted to a Spark-compatible type.
* Returns the fragment-id → tuple map as an unmodifiable snapshot; each tuple array is
* defensively cloned on every call so mutating the returned arrays cannot corrupt internal
* state. Prefer {@link #partitionKeyForFragment(int)} for hot paths — this getter exists for
* inspection, equality checks, and serialization round-trip tests.
*/
public Map<Integer, Comparable<?>[]> getFragmentPartitionKeys() {
Map<Integer, Comparable<?>[]> snapshot = new HashMap<>(fragmentPartitionKeys.size());
for (Map.Entry<Integer, Comparable<?>[]> e : fragmentPartitionKeys.entrySet()) {
snapshot.put(e.getKey(), e.getValue().clone());
}
return Collections.unmodifiableMap(snapshot);
}

public int size() {
return fragmentPartitionKeys.size();
}

public boolean isSoftCapped() {
return softCapped;
}

/**
* Returns a new PartitionInfo restricted to the given fragment-id set. Preserves column order
* and tuple shape. The {@code softCapped} flag is NOT carried over because the cap decision is
* a function of size; if the restricted size still exceeds the cap, the caller must re-apply it
* via {@link #withSoftCapped()}. Used after filter pushdown narrows the surviving fragment set.
*/
public PartitionInfo restrictTo(Set<Integer> survivingFragmentIds) {
Map<Integer, Comparable<?>[]> restricted = new HashMap<>();
for (Map.Entry<Integer, Comparable<?>[]> e : fragmentPartitionKeys.entrySet()) {
if (survivingFragmentIds.contains(e.getKey())) {
restricted.put(e.getKey(), e.getValue());
}
}
return new PartitionInfo(columnNames, columnTypes, restricted, false);
}

/** Marks this info as soft-capped, returning a new instance (immutability preserved). */
public PartitionInfo withSoftCapped() {
return new PartitionInfo(columnNames, columnTypes, fragmentPartitionKeys, true);
}

/**
* Returns a partition key {@link InternalRow} for the given fragment ID. The row contains one
* or more columns (in declaration order), each converted to a Spark-compatible type.
*/
public InternalRow partitionKeyForFragment(int fragmentId) {
Comparable<?> value = fragmentPartitionValues.get(fragmentId);
Object sparkValue = toSparkValue(value);
return new GenericInternalRow(new Object[] {sparkValue});
Comparable<?>[] tuple = fragmentPartitionKeys.get(fragmentId);
int width = columnNames.size();
Object[] out = new Object[width];
if (tuple == null) {
return new GenericInternalRow(out);
}
for (int i = 0; i < width; i++) {
out[i] = toSparkValue(tuple[i], columnTypes.get(i));
}
return new GenericInternalRow(out);
}

private static Object toSparkValue(Comparable<?> value) {
/**
* Converts a ZoneStats value to the exact Java class Spark's {@link InternalRow} expects for
* the target slot. ZoneStats returns {@code Long} for every integral Arrow width (int8/16/32
* included) and for Date (epoch-days) / Timestamp (epoch-micros); those need explicit narrowing
* to match {@code getByte}/{@code getShort}/{@code getInt} accessors. Boolean is already typed;
* Strings are wrapped in {@link UTF8String}.
*/
private static Object toSparkValue(Comparable<?> value, DataType type) {
if (value == null) {
return null;
}
if (value instanceof String) {
if (DataTypes.BooleanType.equals(type)) {
return value;
}
if (DataTypes.ByteType.equals(type)) {
return ((Number) value).byteValue();
}
if (DataTypes.ShortType.equals(type)) {
return ((Number) value).shortValue();
}
if (DataTypes.IntegerType.equals(type) || DataTypes.DateType.equals(type)) {
return ((Number) value).intValue();
}
if (DataTypes.LongType.equals(type) || DataTypes.TimestampType.equals(type)) {
return ((Number) value).longValue();
}
if (DataTypes.StringType.equals(type)) {
return UTF8String.fromString((String) value);
}
// Long, Double, Boolean, Integer are already compatible
return value;
throw new IllegalArgumentException("Unsupported partition column type: " + type);
}
}

Expand Down
Loading
Loading