17 changes: 17 additions & 0 deletions docs/src/operations/ddl/optimize.md
@@ -31,6 +31,16 @@ The `OPTIMIZE` command supports several options to control compaction behavior:
| `batch_size` | Long | Batch size for processing |
| `defer_index_remap` | Boolean | Whether to defer index remapping |
| `max_source_fragments` | Long | Maximum number of source fragments to compact in a single task |
| `compaction_mode` | String | How data is rewritten: `reencode` (default), `try_binary_copy`, or `force_binary_copy` |
| `binary_copy_read_batch_bytes` | Long | Read batch size in bytes used during binary-copy compaction |

### Compaction modes

The `compaction_mode` option controls how fragments are rewritten:

- `reencode` (default): decode and re-encode the data.
- `try_binary_copy`: use binary copy for compatible fragments, skipping decode and re-encode, and fall back to `reencode` otherwise. This avoids the decode and re-encode cost when fragments are compatible.
- `force_binary_copy`: use binary copy, and fail if the fragments are not compatible.

### Examples

@@ -53,6 +63,13 @@ Optimize with multiple options:
);
```

Optimize using binary copy when fragments are compatible:

=== "SQL"
```sql
OPTIMIZE lance.db.users WITH (compaction_mode = 'try_binary_copy');
```

## Output

The `OPTIMIZE` command returns statistics about the compaction operation:
@@ -18,7 +18,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow}
import org.apache.spark.sql.catalyst.plans.logical.{LanceNamedArgument, OptimizeOutputType}
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.util.LanceSerializeUtil.{decode, encode}
import org.lance.compaction.{Compaction, CompactionOptions, CompactionTask, RewriteResult}
import org.lance.compaction.{Compaction, CompactionMode, CompactionOptions, CompactionTask, RewriteResult}
import org.lance.spark.{BaseLanceNamespaceSparkCatalog, LanceDataset, LanceSparkReadOptions}
import org.lance.spark.utils.Utils

@@ -51,6 +51,16 @@ case class OptimizeExec(
builder.withDeferIndexRemap(t.value.asInstanceOf[Boolean]))
argsMap.get("max_source_fragments").map(t =>
builder.withMaxSourceFragments(t.value.asInstanceOf[Long]))
argsMap.get("compaction_mode").map { t =>
val modeStr = t.value.asInstanceOf[String]
val mode = CompactionMode.values().find(_.getValue == modeStr).getOrElse(
throw new IllegalArgumentException(
s"Unknown compaction_mode '$modeStr'. Valid values: " +
CompactionMode.values().map(_.getValue).mkString(", ")))
builder.withCompactionMode(mode)
}
argsMap.get("binary_copy_read_batch_bytes").map(t =>
builder.withBinaryCopyReadBatchBytes(t.value.asInstanceOf[Long]))

builder.build()
}
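
The `compaction_mode` lookup in the hunk above can be tried in isolation. What follows is a minimal Java sketch, not the code in this change: the `Mode` enum is a hypothetical stand-in for `org.lance.compaction.CompactionMode`, and it assumes only that each constant exposes its string form through `getValue()`, as the Scala code relies on. The three string values are taken from the documentation change in this PR.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class CompactionModeLookup {
  // Hypothetical stand-in for org.lance.compaction.CompactionMode (assumption).
  enum Mode {
    REENCODE("reencode"),
    TRY_BINARY_COPY("try_binary_copy"),
    FORCE_BINARY_COPY("force_binary_copy");

    private final String value;

    Mode(String value) {
      this.value = value;
    }

    String getValue() {
      return value;
    }
  }

  // Resolve the user-supplied string to a mode, or fail with a message
  // that lists every accepted value.
  static Mode parse(String modeStr) {
    return Arrays.stream(Mode.values())
        .filter(m -> m.getValue().equals(modeStr))
        .findFirst()
        .orElseThrow(
            () ->
                new IllegalArgumentException(
                    "Unknown compaction_mode '"
                        + modeStr
                        + "'. Valid values: "
                        + Arrays.stream(Mode.values())
                            .map(Mode::getValue)
                            .collect(Collectors.joining(", "))));
  }

  public static void main(String[] args) {
    System.out.println(parse("try_binary_copy"));
    try {
      parse("bogus");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Matching on the string value rather than the constant name keeps the SQL-facing spelling (`try_binary_copy`) independent of how the enum constants are named, and listing the valid values in the exception gives the caller something actionable when a typo is passed.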
@@ -122,13 +122,41 @@ public void testWithFullArgs() {
+ "num_threads=2,"
+ "batch_size=2000,"
+ "defer_index_remap=true,"
+ "max_source_fragments=128"
+ "max_source_fragments=128,"
+ "compaction_mode='try_binary_copy',"
+ "binary_copy_read_batch_bytes=1048576"
+ ")",
fullTable));

Assertions.assertEquals("[10,1,10,1]", result.collectAsList().get(0).toString());
}

@Test
public void testCompactionModeTryBinaryCopy() {
prepareDataset();

Dataset<Row> result =
spark.sql(
String.format(
"optimize %s with (target_rows_per_fragment=20000, "
+ "compaction_mode='try_binary_copy')",
fullTable));

Assertions.assertEquals("[10,1,10,1]", result.collectAsList().get(0).toString());
}

@Test
public void testInvalidCompactionMode() {
prepareDataset();

Assertions.assertThrows(
IllegalArgumentException.class,
() ->
spark
.sql(String.format("optimize %s with (compaction_mode='bogus')", fullTable))
.collectAsList());
}

@Test
public void testWithoutArgs() {
prepareDataset();
2 changes: 1 addition & 1 deletion pom.xml
@@ -51,7 +51,7 @@

<properties>
<lance-spark.version>0.5.0</lance-spark.version>
<lance.version>7.0.0</lance.version>
<lance.version>8.0.0-beta.7</lance.version>
<lance-namespace.version>0.7.5</lance-namespace.version>
<lance-namespace-impl.version>0.3.0</lance-namespace-impl.version>
