From fff1439a0b84fecf91f1ea9f3fe80ff74484ff7a Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Mon, 15 Jun 2026 15:06:06 -0700 Subject: [PATCH 1/3] perf(fts): push prefilter through scalar index on flat FTS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Flat FTS (no inverted index on the text column) used `scan_fragments` + a manual `LanceFilterExec` to apply the prefilter, bypassing any scalar index on the filter column. Route the scan through `filtered_read` instead, matching the brute-force KNN path, so the pushable part of the prefilter is evaluated inside `FilteredReadExec` (using a scalar index when one exists) and only the unpushable `refine_expr` is reapplied on top. Requires `prefilter(true)` on the scanner — the postfilter branch still sends an empty filter plan down and is unaffected. Co-Authored-By: Claude Opus 4.7 --- rust/lance/src/dataset/scanner.rs | 40 ++++++----- rust/lance/src/dataset/tests/dataset_index.rs | 72 +++++++++++++++++++ 2 files changed, 93 insertions(+), 19 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 1112721bb33..f88a75b539b 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -3591,33 +3591,35 @@ impl Scanner { .clone(); let mut columns = vec![column]; - if let Some(expr) = filter_plan.full_expr.as_ref() { - let filter_columns = Planner::column_names_in_expr(expr); - columns.extend(filter_columns); + if let Some(refine_expr) = filter_plan.refine_expr.as_ref() { + columns.extend(Planner::column_names_in_expr(refine_expr)); } - let flat_fts_scan_schema = Arc::new(self.dataset.schema().project(&columns).unwrap()); - let mut scan_node = self.scan_fragments( - true, - false, - false, - false, - false, - flat_fts_scan_schema, - Arc::new(fragments), - None, - false, - ); + let scan_projection = self + .dataset + .empty_projection() + .with_row_id() + .union_columns(&columns, OnMissing::Error)?; - if let Some(expr) = filter_plan.full_expr.as_ref() { - // If there is a prefilter we need to manually apply it to the new data - scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?); + let PlannedFilteredScan { mut plan, .. } = self + .filtered_read( + filter_plan, + scan_projection, + /*make_deletions_null=*/ false, + Some(Arc::new(fragments)), + None, + /*is_prefilter=*/ true, + ) + .await?; + + if let Some(refine_expr) = filter_plan.refine_expr.as_ref() { + plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?); } let flat_match_plan = Arc::new(FlatMatchQueryExec::new( self.dataset.clone(), query.clone(), params.clone(), - scan_node, + plan, )); Ok(flat_match_plan) } diff --git a/rust/lance/src/dataset/tests/dataset_index.rs b/rust/lance/src/dataset/tests/dataset_index.rs index beb6e2b99fd..b008c37d886 100644 --- a/rust/lance/src/dataset/tests/dataset_index.rs +++ b/rust/lance/src/dataset/tests/dataset_index.rs @@ -1137,6 +1137,78 @@ async fn test_fts_without_index() { assert_eq!(results.num_rows(), 1); } +#[tokio::test] +async fn test_fts_without_index_uses_scalar_index_for_prefilter() { + // Verify that flat FTS (no inverted index on text) routes its prefilter + // through `FilteredReadExec` so a scalar index on the filter column is + // actually used. Six rows with two distinct ids: a prefilter of `id = 1` + // must match exactly the three text rows tagged with id=1. + let text = StringArray::from(vec![ + "alpha bravo", + "charlie delta", + "alpha echo", + "foxtrot", + "alpha golf", + "hotel india", + ]); + let ids = Int32Array::from(vec![1, 1, 1, 2, 2, 2]); + let batch = RecordBatch::try_new( + arrow_schema::Schema::new(vec![ + Field::new("text", text.data_type().to_owned(), false), + Field::new("id", ids.data_type().to_owned(), false), + ]) + .into(), + vec![Arc::new(text) as ArrayRef, Arc::new(ids) as ArrayRef], + ) + .unwrap(); + let schema = batch.schema(); + let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema); + let test_uri = TempStrDir::default(); + let mut dataset = Dataset::write(batches, &test_uri, None).await.unwrap(); + + // Scalar index on `id` only — no FTS index on `text`. + dataset + .create_index( + &["id"], + IndexType::BTree, + None, + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + + let mut scan = dataset.scan(); + scan.prefilter(true) + .full_text_search( + FullTextSearchQuery::new("alpha".to_owned()) + .with_columns(&["text".to_string()]) + .unwrap(), + ) + .unwrap() + .filter("id = 1") + .unwrap(); + + let plan = scan.analyze_plan().await.unwrap(); + // The flat-FTS path now reads via `FilteredReadExec` (prints as `LanceRead`) + // with the prefilter plumbed into it, so the scalar index on `id` is used. + assert_contains!(&plan, "FlatMatchQuery"); + assert_contains!(&plan, "LanceRead"); + assert_contains!(&plan, "full_filter=id = Int32(1)"); + // The legacy plan ran a `LanceScan` wrapped in a manual `LanceFilterExec`; + // make sure we did not regress to that shape. + assert_not_contains!(&plan, "LanceScan:"); + + let results = scan.try_into_batch().await.unwrap(); + // Only rows with id=1 AND text matching "alpha": rows 0 ("alpha bravo") + // and 2 ("alpha echo"). Row 4 ("alpha golf") has id=2 and must be excluded. + assert_eq!( + results.num_rows(), + 2, + "expected the two id=1 rows that match `alpha`, got plan:\n{plan}" + ); +} + #[tokio::test] async fn test_fts_rank() { let params = InvertedIndexParams::default(); From c02d647a2c6e175eeb81a2995852f8a32297c46c Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Mon, 15 Jun 2026 20:34:32 -0700 Subject: [PATCH 2/3] test: update test_plans goldens for flat FTS FilteredRead MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The flat-FTS scan now goes through `FilteredReadExec`, so the golden plans in `test_plans` need to reflect both shapes: - No prefilter: legacy emits `LanceScan` with `ordered=true` (vs. the old hardcoded `ordered=false`); v2 emits `LanceRead` with empty filters. Functionally equivalent — output still feeds an outer SortExec by score. - With prefilter on an indexed column: the BTree now pushes into the unindexed-fragment scan. Legacy uses the `MaterializeIndex` shape, v2 uses `LanceRead` with `full_filter` set — same pushdown the indexed `MatchQuery` side already had. Co-Authored-By: Claude Opus 4.7 --- rust/lance/src/dataset/scanner.rs | 38 ++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index f88a75b539b..d8d5173ea90 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -10280,7 +10280,22 @@ full_filter=name LIKE Utf8(\"test%2\"), refine_filter=name LIKE Utf8(\"test%2\") .await?; log::info!("Test case: Full text search with unindexed rows"); - let expected = r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid] + // The flat-FTS path now reads through `FilteredReadExec`, matching the + // brute-force KNN path. With no prefilter the scan still produces no + // pushdown, but the operator differs by storage version: legacy emits + // a `LanceScan`, v2 emits a `LanceRead` with empty filters. + let expected = if data_storage_version == LanceFileVersion::Legacy { + r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid] + Take: columns="_rowid, _score, (s)" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: expr=[_score@1 DESC NULLS LAST], preserve_partitioning=[false] + CoalescePartitionsExec + UnionExec + MatchQuery: column=s, query=hello + FlatMatchQuery: column=s, query=hello + LanceScan: uri=..., projection=[s], row_id=true, row_addr=false, ordered=true, range=None"# + } else { + r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid] Take: columns="_rowid, _score, (s)" CoalesceBatchesExec: target_batch_size=8192 SortExec: expr=[_score@1 DESC NULLS LAST], preserve_partitioning=[false] @@ -10288,7 +10303,8 @@ full_filter=name LIKE Utf8(\"test%2\"), refine_filter=name LIKE Utf8(\"test%2\") UnionExec MatchQuery: column=s, query=hello FlatMatchQuery: column=s, query=hello - LanceScan: uri=..., projection=[s], row_id=true, row_addr=false, ordered=false, range=None"#; + LanceRead: uri=..., projection=[s], num_fragments=1, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=--, refine_filter=--"# + }; dataset.append_new_data().await?; assert_plan_equals( &dataset.dataset, @@ -10321,6 +10337,10 @@ full_filter=name LIKE Utf8(\"test%2\"), refine_filter=name LIKE Utf8(\"test%2\") .await?; log::info!("Test case: Full text search with unindexed rows and prefilter"); + // After routing flat FTS through `FilteredReadExec`, the BTree on `i` + // pushes into the unindexed-fragment scan too — no more `FilterExec` on + // top of an unfiltered `LanceScan`. Legacy uses the `MaterializeIndex` + // shape, v2 uses `LanceRead` with `full_filter` set. let expected = if data_storage_version == LanceFileVersion::Legacy { r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid] Take: columns="_rowid, _score, (s)" @@ -10336,8 +10356,14 @@ full_filter=name LIKE Utf8(\"test%2\"), refine_filter=name LIKE Utf8(\"test%2\") FilterExec: i@0 > 10 LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false, range=None FlatMatchQuery: column=s, query=hello - FilterExec: i@1 > 10 - LanceScan: uri=..., projection=[s, i], row_id=true, row_addr=false, ordered=false, range=None"# + CoalescePartitionsExec + UnionExec + Take: columns="_rowid, (s)" + CoalesceBatchesExec: target_batch_size=8192 + MaterializeIndex: query=[i > 10]@i_idx(BTree) + ProjectionExec: expr=[_rowid@2 as _rowid, s@1 as s] + FilterExec: i@0 > 10 + LanceScan: uri=..., projection=[i, s], row_id=true, row_addr=false, ordered=false, range=None"# } else { r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid] Take: columns="_rowid, _score, (s)" @@ -10349,8 +10375,8 @@ full_filter=name LIKE Utf8(\"test%2\"), refine_filter=name LIKE Utf8(\"test%2\") LanceRead: uri=..., projection=[], num_fragments=5, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=-- ScalarIndexQuery: query=[i > 10]@i_idx(BTree) FlatMatchQuery: column=s, query=hello - FilterExec: i@1 > 10 - LanceScan: uri=..., projection=[s, i], row_id=true, row_addr=false, ordered=false, range=None"# + LanceRead: uri=..., projection=[s], num_fragments=1, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=-- + ScalarIndexQuery: query=[i > 10]@i_idx(BTree)"# }; assert_plan_equals( &dataset.dataset, From be85be7e4105424e20cb8b82153c87fac1477702 Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Tue, 16 Jun 2026 08:43:41 -0700 Subject: [PATCH 3/3] test(python): update test_partly_indexed_prefiltered_search for FilteredRead The flat-FTS path now reads through `FilteredReadExec` (`LanceRead`), so the unindexed-fragment branch shows `LanceRead` instead of `LanceScan` and the BTree on `id` pushes into the flat scan too. Update the assertion to reflect the new pushdown. Co-Authored-By: Claude Opus 4.7 --- python/python/tests/test_scalar_index.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 7ddfbbc0dc8..13b3de74838 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -648,7 +648,10 @@ def make_fts_search(ds): assert "ScalarIndexQuery" in plan assert "MaterializeIndex" not in plan assert "FlatMatchQuery" in plan - assert "LanceScan" in plan + # Flat FTS now reads via FilteredReadExec (prints as `LanceRead`) so the + # BTree on `id` pushes into the unindexed-fragment scan too. + assert "LanceRead" in plan + assert "LanceScan" not in plan assert make_fts_search(ds).to_table().num_rows == 12 # Update vector index but NOT scalar index