diff --git a/docs/src/operations/ddl/optimize.md b/docs/src/operations/ddl/optimize.md index e3149e896..d6b2b232b 100644 --- a/docs/src/operations/ddl/optimize.md +++ b/docs/src/operations/ddl/optimize.md @@ -31,6 +31,16 @@ The `OPTIMIZE` command supports several options to control compaction behavior: | `batch_size` | Long | Batch size for processing | | `defer_index_remap` | Boolean | Whether to defer index remapping | | `max_source_fragments` | Long | Maximum number of source fragments to compact in a single task | +| `compaction_mode` | String | How data is rewritten: `reencode` (default), `try_binary_copy`, or `force_binary_copy` | +| `binary_copy_read_batch_bytes` | Long | Read batch size in bytes used during binary-copy compaction | + +### Compaction modes + +The `compaction_mode` option controls how fragments are rewritten: + +- `reencode` (default) — decode and re-encode the data. +- `try_binary_copy` — use binary copy for compatible fragments (skipping decode/re-encode) and fall back to `reencode` otherwise. This is much cheaper than `reencode` when fragments are compatible. +- `force_binary_copy` — use binary copy, or fail if fragments are not compatible. 
### Examples @@ -53,6 +63,13 @@ Optimize with multiple options: ); ``` +Optimize using binary copy when fragments are compatible: + +=== "SQL" + ```sql + OPTIMIZE lance.db.users WITH (compaction_mode = 'try_binary_copy'); + ``` + ## Output The `OPTIMIZE` command returns statistics about the compaction operation: diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeExec.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeExec.scala index f40dd5363..933dc139a 100644 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeExec.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeExec.scala @@ -18,7 +18,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow} import org.apache.spark.sql.catalyst.plans.logical.{LanceNamedArgument, OptimizeOutputType} import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.util.LanceSerializeUtil.{decode, encode} -import org.lance.compaction.{Compaction, CompactionOptions, CompactionTask, RewriteResult} +import org.lance.compaction.{Compaction, CompactionMode, CompactionOptions, CompactionTask, RewriteResult} import org.lance.spark.{BaseLanceNamespaceSparkCatalog, LanceDataset, LanceSparkReadOptions} import org.lance.spark.utils.Utils @@ -51,6 +51,16 @@ case class OptimizeExec( builder.withDeferIndexRemap(t.value.asInstanceOf[Boolean])) argsMap.get("max_source_fragments").map(t => builder.withMaxSourceFragments(t.value.asInstanceOf[Long])) + argsMap.get("compaction_mode").map { t => + val modeStr = t.value.asInstanceOf[String] + val mode = CompactionMode.values().find(_.getValue == modeStr).getOrElse( + throw new IllegalArgumentException( + s"Unknown compaction_mode '$modeStr'. 
Valid values: " + + CompactionMode.values().map(_.getValue).mkString(", "))) + builder.withCompactionMode(mode) + } + argsMap.get("binary_copy_read_batch_bytes").map(t => + builder.withBinaryCopyReadBatchBytes(t.value.asInstanceOf[Long])) builder.build() } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseOptimizeTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseOptimizeTest.java index f618d328b..bd0dd53d9 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseOptimizeTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseOptimizeTest.java @@ -122,13 +122,41 @@ public void testWithFullArgs() { + "num_threads=2," + "batch_size=2000," + "defer_index_remap=true," - + "max_source_fragments=128" + + "max_source_fragments=128," + + "compaction_mode='try_binary_copy'," + + "binary_copy_read_batch_bytes=1048576" + ")", fullTable)); Assertions.assertEquals("[10,1,10,1]", result.collectAsList().get(0).toString()); } + @Test + public void testCompactionModeTryBinaryCopy() { + prepareDataset(); + + Dataset result = + spark.sql( + String.format( + "optimize %s with (target_rows_per_fragment=20000, " + + "compaction_mode='try_binary_copy')", + fullTable)); + + Assertions.assertEquals("[10,1,10,1]", result.collectAsList().get(0).toString()); + } + + @Test + public void testInvalidCompactionMode() { + prepareDataset(); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + spark + .sql(String.format("optimize %s with (compaction_mode='bogus')", fullTable)) + .collectAsList()); + } + @Test public void testWithoutArgs() { prepareDataset(); diff --git a/pom.xml b/pom.xml index 4c3aa704b..2f5ed6bdd 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ 0.5.0 - 7.0.0 + 8.0.0-beta.7 0.7.5 0.3.0