Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion python/python/tests/test_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,10 @@ def make_fts_search(ds):
assert "ScalarIndexQuery" in plan
assert "MaterializeIndex" not in plan
assert "FlatMatchQuery" in plan
assert "LanceScan" in plan
# Flat FTS now reads via FilteredReadExec (prints as `LanceRead`) so the
# BTree on `id` pushes into the unindexed-fragment scan too.
assert "LanceRead" in plan
assert "LanceScan" not in plan
assert make_fts_search(ds).to_table().num_rows == 12

# Update vector index but NOT scalar index
Expand Down
78 changes: 53 additions & 25 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3591,33 +3591,35 @@ impl Scanner {
.clone();

let mut columns = vec![column];
if let Some(expr) = filter_plan.full_expr.as_ref() {
let filter_columns = Planner::column_names_in_expr(expr);
columns.extend(filter_columns);
if let Some(refine_expr) = filter_plan.refine_expr.as_ref() {
columns.extend(Planner::column_names_in_expr(refine_expr));
}
let flat_fts_scan_schema = Arc::new(self.dataset.schema().project(&columns).unwrap());
let mut scan_node = self.scan_fragments(
true,
false,
false,
false,
false,
flat_fts_scan_schema,
Arc::new(fragments),
None,
false,
);
let scan_projection = self
.dataset
.empty_projection()
.with_row_id()
.union_columns(&columns, OnMissing::Error)?;

if let Some(expr) = filter_plan.full_expr.as_ref() {
// If there is a prefilter we need to manually apply it to the new data
scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?);
let PlannedFilteredScan { mut plan, .. } = self
.filtered_read(
filter_plan,
scan_projection,
/*make_deletions_null=*/ false,
Some(Arc::new(fragments)),
None,
/*is_prefilter=*/ true,
)
.await?;

if let Some(refine_expr) = filter_plan.refine_expr.as_ref() {
plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?);
}

let flat_match_plan = Arc::new(FlatMatchQueryExec::new(
self.dataset.clone(),
query.clone(),
params.clone(),
scan_node,
plan,
));
Ok(flat_match_plan)
}
Expand Down Expand Up @@ -10278,15 +10280,31 @@ full_filter=name LIKE Utf8(\"test%2\"), refine_filter=name LIKE Utf8(\"test%2\")
.await?;

log::info!("Test case: Full text search with unindexed rows");
let expected = r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
// The flat-FTS path now reads through `FilteredReadExec`, matching the
// brute-force KNN path. With no prefilter the scan still produces no
// pushdown, but the operator differs by storage version: legacy emits
// a `LanceScan`, v2 emits a `LanceRead` with empty filters.
let expected = if data_storage_version == LanceFileVersion::Legacy {
r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
Take: columns="_rowid, _score, (s)"
CoalesceBatchesExec: target_batch_size=8192
SortExec: expr=[_score@1 DESC NULLS LAST], preserve_partitioning=[false]
CoalescePartitionsExec
UnionExec
MatchQuery: column=s, query=hello
FlatMatchQuery: column=s, query=hello
LanceScan: uri=..., projection=[s], row_id=true, row_addr=false, ordered=false, range=None"#;
LanceScan: uri=..., projection=[s], row_id=true, row_addr=false, ordered=true, range=None"#
} else {
r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
Take: columns="_rowid, _score, (s)"
CoalesceBatchesExec: target_batch_size=8192
SortExec: expr=[_score@1 DESC NULLS LAST], preserve_partitioning=[false]
CoalescePartitionsExec
UnionExec
MatchQuery: column=s, query=hello
FlatMatchQuery: column=s, query=hello
LanceRead: uri=..., projection=[s], num_fragments=1, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=--, refine_filter=--"#
};
dataset.append_new_data().await?;
assert_plan_equals(
&dataset.dataset,
Expand Down Expand Up @@ -10319,6 +10337,10 @@ full_filter=name LIKE Utf8(\"test%2\"), refine_filter=name LIKE Utf8(\"test%2\")
.await?;

log::info!("Test case: Full text search with unindexed rows and prefilter");
// After routing flat FTS through `FilteredReadExec`, the BTree on `i`
// pushes into the unindexed-fragment scan too — no more `FilterExec` on
// top of an unfiltered `LanceScan`. Legacy uses the `MaterializeIndex`
// shape, v2 uses `LanceRead` with `full_filter` set.
let expected = if data_storage_version == LanceFileVersion::Legacy {
r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
Take: columns="_rowid, _score, (s)"
Expand All @@ -10334,8 +10356,14 @@ full_filter=name LIKE Utf8(\"test%2\"), refine_filter=name LIKE Utf8(\"test%2\")
FilterExec: i@0 > 10
LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false, range=None
FlatMatchQuery: column=s, query=hello
FilterExec: i@1 > 10
LanceScan: uri=..., projection=[s, i], row_id=true, row_addr=false, ordered=false, range=None"#
CoalescePartitionsExec
UnionExec
Take: columns="_rowid, (s)"
CoalesceBatchesExec: target_batch_size=8192
MaterializeIndex: query=[i > 10]@i_idx(BTree)
ProjectionExec: expr=[_rowid@2 as _rowid, s@1 as s]
FilterExec: i@0 > 10
LanceScan: uri=..., projection=[i, s], row_id=true, row_addr=false, ordered=false, range=None"#
} else {
r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
Take: columns="_rowid, _score, (s)"
Expand All @@ -10347,8 +10375,8 @@ full_filter=name LIKE Utf8(\"test%2\"), refine_filter=name LIKE Utf8(\"test%2\")
LanceRead: uri=..., projection=[], num_fragments=5, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=--
ScalarIndexQuery: query=[i > 10]@i_idx(BTree)
FlatMatchQuery: column=s, query=hello
FilterExec: i@1 > 10
LanceScan: uri=..., projection=[s, i], row_id=true, row_addr=false, ordered=false, range=None"#
LanceRead: uri=..., projection=[s], num_fragments=1, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=--
ScalarIndexQuery: query=[i > 10]@i_idx(BTree)"#
};
assert_plan_equals(
&dataset.dataset,
Expand Down
72 changes: 72 additions & 0 deletions rust/lance/src/dataset/tests/dataset_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,78 @@ async fn test_fts_without_index() {
assert_eq!(results.num_rows(), 1);
}

#[tokio::test]
async fn test_fts_without_index_uses_scalar_index_for_prefilter() {
    // Flat FTS (no inverted index on `text`) is expected to route its prefilter
    // through `FilteredReadExec`, so that a scalar index on the filter column
    // is used. The fixture has six rows split across two ids; "alpha" appears
    // in two id=1 rows and in one id=2 row, so the `id = 1` prefilter must
    // drop exactly one otherwise-matching row.
    let text_col = StringArray::from(vec![
        "alpha bravo",
        "charlie delta",
        "alpha echo",
        "foxtrot",
        "alpha golf",
        "hotel india",
    ]);
    let id_col = Int32Array::from(vec![1, 1, 1, 2, 2, 2]);

    // Build the schema once and share it between the batch and the reader.
    let schema = Arc::new(arrow_schema::Schema::new(vec![
        Field::new("text", text_col.data_type().clone(), false),
        Field::new("id", id_col.data_type().clone(), false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(text_col) as ArrayRef, Arc::new(id_col) as ArrayRef],
    )
    .unwrap();
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);

    let test_uri = TempStrDir::default();
    let mut dataset = Dataset::write(reader, &test_uri, None).await.unwrap();

    // Only `id` gets an index (BTree); `text` deliberately has no FTS index.
    let scalar_params = ScalarIndexParams::default();
    dataset
        .create_index(&["id"], IndexType::BTree, None, &scalar_params, true)
        .await
        .unwrap();

    // Configure the scan step by step: prefilter mode, then the text query,
    // then the scalar predicate.
    let fts_query = FullTextSearchQuery::new("alpha".to_owned())
        .with_columns(&["text".to_string()])
        .unwrap();
    let mut scan = dataset.scan();
    scan.prefilter(true);
    scan.full_text_search(fts_query).unwrap();
    scan.filter("id = 1").unwrap();

    let plan = scan.analyze_plan().await.unwrap();
    // The unindexed text path should show up as `FlatMatchQuery` fed by a
    // `LanceRead` node (how `FilteredReadExec` prints) that carries the
    // prefilter as its `full_filter`.
    assert_contains!(&plan, "FlatMatchQuery");
    assert_contains!(&plan, "LanceRead");
    assert_contains!(&plan, "full_filter=id = Int32(1)");
    // The previous plan shape was a `LanceScan` under a manually added
    // `LanceFilterExec`; guard against falling back to it.
    assert_not_contains!(&plan, "LanceScan:");

    // Rows 0 ("alpha bravo") and 2 ("alpha echo") satisfy both the text match
    // and `id = 1`. Row 4 ("alpha golf") matches the text but has id=2, so the
    // prefilter must exclude it.
    let results = scan.try_into_batch().await.unwrap();
    assert_eq!(
        results.num_rows(),
        2,
        "expected the two id=1 rows that match `alpha`, got plan:\n{plan}"
    );
}

#[tokio::test]
async fn test_fts_rank() {
let params = InvertedIndexParams::default();
Expand Down
Loading