diff --git a/rust/lance-index/src/vector/v3/shuffler.rs b/rust/lance-index/src/vector/v3/shuffler.rs index 0bf714df237..a342e2247e2 100644 --- a/rust/lance-index/src/vector/v3/shuffler.rs +++ b/rust/lance-index/src/vector/v3/shuffler.rs @@ -67,6 +67,8 @@ pub trait Shuffler: Send + Sync { &self, data: Box, ) -> Result>; + + fn as_any_ref(&self) -> &dyn std::any::Any; } pub struct IvfShuffler { @@ -102,6 +104,10 @@ impl IvfShuffler { #[async_trait::async_trait] impl Shuffler for IvfShuffler { + fn as_any_ref(&self) -> &dyn std::any::Any { + self + } + async fn shuffle( &self, data: Box, @@ -301,12 +307,15 @@ impl ShuffleReader for EmptyReader { /// one file per partition. /// /// An optional `progress` callback can be provided to receive shuffle progress -/// updates. +/// updates. An optional `memory_budget_bytes` sizes the [`TwoFileShuffler`] +/// batch via [`batch_size_from_budget`]; pass `None` to use the default +/// (128 MiB or `LANCE_SHUFFLE_BATCH_BYTES` env var). pub fn create_ivf_shuffler( output_dir: Path, num_partitions: usize, format_version: LanceFileVersion, progress: Option>, + memory_budget_bytes: Option, ) -> Box { let use_legacy = std::env::var("LANCE_LEGACY_SHUFFLER") .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) @@ -320,6 +329,9 @@ pub fn create_ivf_shuffler( Box::new(shuffler) } else { let mut shuffler = TwoFileShuffler::new(output_dir, num_partitions); + if let Some(budget) = memory_budget_bytes { + shuffler = shuffler.with_batch_size_bytes(batch_size_from_budget(budget)); + } if let Some(progress) = progress { shuffler = shuffler.with_progress(progress); } @@ -351,6 +363,15 @@ fn shuffle_batch_bytes() -> usize { } } +/// Derive the shuffler batch size from a memory budget. +/// +/// Uses half the budget as the target, with a floor of +/// `DEFAULT_SHUFFLE_BATCH_BYTES` so very small budgets don't produce +/// pathologically tiny batches. +pub(crate) fn batch_size_from_budget(memory_budget_bytes: usize) -> usize { + (memory_budget_bytes / 2).max(DEFAULT_SHUFFLE_BATCH_BYTES) +} + /// A shuffler that writes all data to just two files (data + offsets) instead /// of one file per partition. This avoids hitting OS file descriptor limits /// when there are many partitions. @@ -388,8 +409,7 @@ impl TwoFileShuffler { self } - #[cfg(test)] - fn with_batch_size_bytes(mut self, batch_size_bytes: usize) -> Self { + pub fn with_batch_size_bytes(mut self, batch_size_bytes: usize) -> Self { self.batch_size_bytes = batch_size_bytes; self } @@ -397,6 +417,10 @@ impl TwoFileShuffler { #[async_trait::async_trait] impl Shuffler for TwoFileShuffler { + fn as_any_ref(&self) -> &dyn std::any::Any { + self + } + async fn shuffle( &self, data: Box, @@ -753,6 +777,50 @@ mod tests { Some(arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap()) } + #[test] + fn test_batch_size_from_budget() { + // Half the budget, but not below the default floor. + assert_eq!(batch_size_from_budget(512 * 1024 * 1024), 256 * 1024 * 1024); + // Budget smaller than 2× floor → floor wins. + assert_eq!( + batch_size_from_budget(64 * 1024 * 1024), + DEFAULT_SHUFFLE_BATCH_BYTES, + ); + // Budget exactly 2× default → half equals default. + assert_eq!( + batch_size_from_budget(2 * DEFAULT_SHUFFLE_BATCH_BYTES), + DEFAULT_SHUFFLE_BATCH_BYTES, + ); + // Large budget. + assert_eq!( + batch_size_from_budget(4 * 1024 * 1024 * 1024), + 2 * 1024 * 1024 * 1024, + ); + } + + #[test] + fn test_create_ivf_shuffler_batch_size_from_budget() { + let dir = TempStrDir::default(); + let path = Path::from(dir.as_ref()); + let budget = 512 * 1024 * 1024_usize; + let shuffler = + create_ivf_shuffler(path.clone(), 4, LanceFileVersion::V2_0, None, Some(budget)); + // Downcast to verify batch size (use_legacy is false by default) + let two_file = shuffler + .as_any_ref() + .downcast_ref::() + .expect("expected TwoFileShuffler"); + assert_eq!(two_file.batch_size_bytes, batch_size_from_budget(budget)); + + // No budget → env-var or default path, which we just check is > 0. + let shuffler2 = create_ivf_shuffler(path, 4, LanceFileVersion::V2_0, None, None); + let two_file2 = shuffler2 + .as_any_ref() + .downcast_ref::() + .expect("expected TwoFileShuffler"); + assert!(two_file2.batch_size_bytes > 0); + } + #[tokio::test] async fn test_two_file_shuffler_round_trip() { let dir = TempStrDir::default(); @@ -876,7 +944,7 @@ mod tests { let output_dir = Path::from(dir.as_ref()); let num_partitions = 3; - // Use a very small batch size to force multiple write batches + // Use a very small batch size to force multiple write batches. // Each i32 is 4 bytes, each u32 is 4 bytes, so ~8 bytes/row. // With a small batch_size_bytes, we get multiple rechunked batches. let batch1 = make_batch(&[0, 1, 2], &[10, 20, 30], Some(1.0)); diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs index c5c9038403e..271567d5a98 100644 --- a/rust/lance/src/index/vector.rs +++ b/rust/lance/src/index/vector.rs @@ -32,6 +32,7 @@ use lance_index::vector::ivf::builder::recommended_num_partitions; use lance_index::vector::ivf::storage::IvfModel; use object_store::path::Path; +use datafusion::execution::memory_pool::MemoryPool; use lance_arrow::FixedSizeListArrayExt; use lance_index::vector::pq::ProductQuantizer; use lance_index::vector::quantizer::QuantizationType; @@ -53,7 +54,7 @@ use lance_linalg::distance::*; use lance_table::format::{IndexMetadata, list_index_files_with_sizes}; use serde::Serialize; use tracing::instrument; -use utils::get_vector_type; +use utils::{get_vector_type, make_index_memory_pool}; use uuid::Uuid; use super::{DatasetIndexExt, DatasetIndexInternalExt, IndexParams, pb, vector_index_details}; @@ -340,7 +341,13 @@ async fn prepare_vector_segment_build( progress: Arc, mode: &str, require_precomputed_ivf: bool, -) -> Result<(DataType, IndexType, IvfBuildParams, Box)> { +) -> Result<( + DataType, + IndexType, + IvfBuildParams, + Box, + Arc, +)> { let stages = ¶ms.stages; if stages.is_empty() { @@ -386,14 +393,16 @@ async fn prepare_vector_segment_build( let format_version = dataset_format_version(dataset); let temp_dir = TempStdDir::default(); let temp_dir_path = Path::from_filesystem_path(&temp_dir)?; + let (memory_pool, memory_budget) = make_index_memory_pool(); let shuffler = create_ivf_shuffler( temp_dir_path, num_partitions, format_version, Some(progress), + memory_budget, ); - Ok((element_type, index_type, ivf_params, shuffler)) + Ok((element_type, index_type, ivf_params, shuffler, memory_pool)) } /// Build a Distributed Vector Index for specific fragments @@ -409,15 +418,16 @@ pub(crate) async fn build_distributed_vector_index( fragment_ids: &[u32], progress: Arc, ) -> Result { - let (element_type, index_type, ivf_params, shuffler) = prepare_vector_segment_build( - dataset, - column, - params, - progress.clone(), - "Build Distributed Vector Index", - true, - ) - .await?; + let (element_type, index_type, ivf_params, shuffler, memory_pool) = + prepare_vector_segment_build( + dataset, + column, + params, + progress.clone(), + "Build Distributed Vector Index", + true, + ) + .await?; let stages = ¶ms.stages; let ivf_centroids = ivf_params @@ -488,6 +498,7 @@ pub(crate) async fn build_distributed_vector_index( .with_ivf(ivf_model) .with_fragment_filter(fragment_filter) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -508,6 +519,7 @@ pub(crate) async fn build_distributed_vector_index( .with_ivf(ivf_model) .with_fragment_filter(fragment_filter) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -556,6 +568,7 @@ pub(crate) async fn build_distributed_vector_index( .with_transpose(false) .with_fragment_filter(fragment_filter) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -582,6 +595,7 @@ pub(crate) async fn build_distributed_vector_index( )? .with_fragment_filter(fragment_filter) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -609,6 +623,7 @@ pub(crate) async fn build_distributed_vector_index( )? .with_fragment_filter(fragment_filter) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -626,6 +641,7 @@ pub(crate) async fn build_distributed_vector_index( )? .with_fragment_filter(fragment_filter) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -667,6 +683,7 @@ pub(crate) async fn build_distributed_vector_index( .with_transpose(false) .with_fragment_filter(fragment_filter) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -697,6 +714,7 @@ pub(crate) async fn build_distributed_vector_index( )? .with_fragment_filter(fragment_filter) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -731,15 +749,16 @@ pub(crate) async fn build_vector_index( frag_reuse_index: Option>, progress: Arc, ) -> Result<()> { - let (element_type, index_type, ivf_params, shuffler) = prepare_vector_segment_build( - dataset, - column, - params, - progress.clone(), - "Build Vector Index", - false, - ) - .await?; + let (element_type, index_type, ivf_params, shuffler, memory_pool) = + prepare_vector_segment_build( + dataset, + column, + params, + progress.clone(), + "Build Vector Index", + false, + ) + .await?; let stages = ¶ms.stages; match index_type { @@ -757,6 +776,7 @@ pub(crate) async fn build_vector_index( frag_reuse_index, )? .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -773,6 +793,7 @@ pub(crate) async fn build_vector_index( frag_reuse_index, )? .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -822,6 +843,7 @@ pub(crate) async fn build_vector_index( builder .with_transpose(!params.skip_transpose) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -847,6 +869,7 @@ pub(crate) async fn build_vector_index( frag_reuse_index, )? .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -873,6 +896,7 @@ pub(crate) async fn build_vector_index( builder .with_transpose(!params.skip_transpose) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -897,6 +921,7 @@ pub(crate) async fn build_vector_index( frag_reuse_index, )? .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -913,6 +938,7 @@ pub(crate) async fn build_vector_index( frag_reuse_index, )? .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -943,6 +969,7 @@ pub(crate) async fn build_vector_index( frag_reuse_index, )? .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -971,6 +998,7 @@ pub(crate) async fn build_vector_index( frag_reuse_index, )? .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -1040,11 +1068,13 @@ pub(crate) async fn build_vector_index_incremental( let temp_dir = TempStdDir::default(); let temp_dir_path = Path::from_filesystem_path(&temp_dir)?; + let (memory_pool, memory_budget) = make_index_memory_pool(); let shuffler = create_ivf_shuffler( temp_dir_path, ivf_model.num_partitions(), format_version, Some(progress.clone()), + memory_budget, ); let index_dir = dataset.indices_dir().child(uuid); @@ -1069,6 +1099,7 @@ pub(crate) async fn build_vector_index_incremental( .with_ivf(ivf_model) .with_quantizer(quantizer.try_into()?) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -1086,6 +1117,7 @@ pub(crate) async fn build_vector_index_incremental( .with_ivf(ivf_model) .with_quantizer(quantizer.try_into()?) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -1113,6 +1145,7 @@ pub(crate) async fn build_vector_index_incremental( .with_quantizer(quantizer.try_into()?) .with_transpose(!params.skip_transpose) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -1131,6 +1164,7 @@ pub(crate) async fn build_vector_index_incremental( .with_ivf(ivf_model) .with_quantizer(quantizer.try_into()?) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -1151,6 +1185,7 @@ pub(crate) async fn build_vector_index_incremental( .with_quantizer(quantizer.try_into()?) .with_transpose(!params.skip_transpose) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -1179,6 +1214,7 @@ pub(crate) async fn build_vector_index_incremental( .with_ivf(ivf_model) .with_quantizer(quantizer.try_into()?) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -1196,6 +1232,7 @@ pub(crate) async fn build_vector_index_incremental( .with_ivf(ivf_model) .with_quantizer(quantizer.try_into()?) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -1214,6 +1251,7 @@ pub(crate) async fn build_vector_index_incremental( .with_ivf(ivf_model) .with_quantizer(quantizer.try_into()?) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } @@ -1231,6 +1269,7 @@ pub(crate) async fn build_vector_index_incremental( .with_ivf(ivf_model) .with_quantizer(quantizer.try_into()?) .with_progress(progress.clone()) + .with_memory_pool(memory_pool.clone()) .build() .await?; } diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 017d50319f8..2f7ec8e12f7 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -16,6 +16,7 @@ use arrow_array::{ RecordBatch, UInt32Array, UInt64Array, }; use arrow_schema::{DataType, Field, Fields}; +use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool, UnboundedMemoryPool}; use futures::{FutureExt, stream}; use futures::{ Stream, @@ -155,6 +156,9 @@ pub struct IvfIndexBuilder { format_version: LanceFileVersion, progress: Arc, + + // per-build memory pool for tracking and limiting memory usage during the build + memory_pool: Arc, } type BuildStream = @@ -200,6 +204,7 @@ impl IvfIndexBuilder transpose_codes: true, format_version, progress: Arc::new(NoopIndexBuildProgress), + memory_pool: Arc::new(UnboundedMemoryPool::default()), }) } @@ -266,6 +271,7 @@ impl IvfIndexBuilder transpose_codes: true, format_version, progress: Arc::new(NoopIndexBuildProgress), + memory_pool: Arc::new(UnboundedMemoryPool::default()), }) } @@ -384,6 +390,17 @@ impl IvfIndexBuilder self } + /// Set the memory pool for this build. + /// + /// Each partition build acquires a [`MemoryReservation`] from this pool before + /// loading partition data. [`MemoryPool::try_grow`] returning `Err` is the + /// spill signal (see issue #7300 for the actual spill reaction). When no pool + /// is provided the builder defaults to an [`UnboundedMemoryPool`] (no limit). + pub fn with_memory_pool(&mut self, pool: Arc) -> &mut Self { + self.memory_pool = pool; + self + } + #[instrument(name = "load_or_build_ivf", level = "debug", skip_all)] async fn load_or_build_ivf(&self) -> Result { match &self.ivf { @@ -837,6 +854,7 @@ impl IvfIndexBuilder let distance_type = self.distance_type; let column = self.column.clone(); let frag_reuse_index = self.frag_reuse_index.clone(); + let memory_pool = self.memory_pool.clone(); let build_iter = assign_batches .into_iter() @@ -849,6 +867,7 @@ impl IvfIndexBuilder let sub_index_params = sub_index_params.clone(); let column = column.clone(); let frag_reuse_index = frag_reuse_index.clone(); + let memory_pool = memory_pool.clone(); let skip_existing_batches = partition_adjustment == Some(PartitionAdjustment::Split(partition)); let partition = match partition_adjustment { @@ -871,7 +890,31 @@ impl IvfIndexBuilder .await? }; - spawn_cpu(move || { + // Acquire a memory reservation for this partition's working set. + // try_grow returning Err is the spill signal (see #7300 for the + // actual spill reaction; for now we log a warning and proceed). + let batch_bytes: usize = + batches.iter().map(|b| b.get_array_memory_size()).sum(); + let mut reservation = MemoryConsumer::new(format!( + "IvfPartition[{}]", + partition + )) + .register(&memory_pool); + if batch_bytes > 0 { + if let Err(e) = reservation.try_grow(batch_bytes) { + log::warn!( + "memory pressure building partition {}: {}; continuing without reservation", + partition, + e + ); + } + } + + let result = spawn_cpu(move || { + // reservation is held for the duration of the CPU-bound build + // and freed on drop when this closure returns. + let _reservation = reservation; + if let Some((assign_batch, deleted_row_ids)) = assign_batch { if !deleted_row_ids.is_empty() { let deleted_row_ids = HashSet::::from_iter( @@ -911,7 +954,8 @@ impl IvfIndexBuilder )?; Ok(Some((storage, sub_index, loss))) }) - .await + .await; + result } }); Ok(stream::iter(build_iter) diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index b67e6ea8e81..3dae40c6bee 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -10,7 +10,7 @@ use super::{ }; use crate::dataset::index::dataset_format_version; use crate::index::DatasetIndexInternalExt; -use crate::index::vector::utils::{get_vector_dim, get_vector_type}; +use crate::index::vector::utils::{get_vector_dim, get_vector_type, make_index_memory_pool}; use crate::{ dataset::Dataset, index::{INDEX_FILE_NAME, pb, prefilter::PreFilter, vector::ivf::io::write_pq_partitions}, @@ -394,7 +394,14 @@ pub(crate) async fn optimize_vector_indices_v2( let temp_dir = lance_core::utils::tempfile::TempStdDir::default(); let temp_dir_path = Path::from_filesystem_path(&temp_dir)?; - let shuffler = create_ivf_shuffler(temp_dir_path, num_partitions, format_version, None); + let (memory_pool, memory_budget) = make_index_memory_pool(); + let shuffler = create_ivf_shuffler( + temp_dir_path, + num_partitions, + format_version, + None, + memory_budget, + ); let (_, element_type) = get_vector_type(dataset.schema(), vector_column)?; let merged_num = match index_type { @@ -416,6 +423,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_existing_indices(existing_indices.clone()) .shuffle_data(unindexed) .await? + .with_memory_pool(memory_pool.clone()) .build() .await? } else { @@ -434,6 +442,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_existing_indices(existing_indices.clone()) .shuffle_data(unindexed) .await? + .with_memory_pool(memory_pool.clone()) .build() .await? } @@ -455,6 +464,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_existing_indices(existing_indices.clone()) .shuffle_data(unindexed) .await? + .with_memory_pool(memory_pool.clone()) .build() .await? } @@ -475,6 +485,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_existing_indices(existing_indices.clone()) .shuffle_data(unindexed) .await? + .with_memory_pool(memory_pool.clone()) .build() .await? } @@ -494,6 +505,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_existing_indices(existing_indices.clone()) .shuffle_data(unindexed) .await? + .with_memory_pool(memory_pool.clone()) .build() .await? } @@ -515,6 +527,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_existing_indices(existing_indices.clone()) .shuffle_data(unindexed) .await? + .with_memory_pool(memory_pool.clone()) .build() .await? } else { @@ -533,6 +546,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_existing_indices(existing_indices.clone()) .shuffle_data(unindexed) .await? + .with_memory_pool(memory_pool.clone()) .build() .await? } @@ -554,6 +568,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_existing_indices(existing_indices.clone()) .shuffle_data(unindexed) .await? + .with_memory_pool(memory_pool.clone()) .build() .await? } @@ -574,6 +589,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_existing_indices(existing_indices.clone()) .shuffle_data(unindexed) .await? + .with_memory_pool(memory_pool.clone()) .build() .await? } diff --git a/rust/lance/src/index/vector/utils.rs b/rust/lance/src/index/vector/utils.rs index f00a81b764d..d12d94c9119 100644 --- a/rust/lance/src/index/vector/utils.rs +++ b/rust/lance/src/index/vector/utils.rs @@ -8,6 +8,7 @@ use arrow::datatypes::DataType; use arrow_array::new_empty_array; use arrow_array::{Array, ArrayRef, FixedSizeListArray, RecordBatch, UInt32Array, cast::AsArray}; use arrow_buffer::{Buffer, MutableBuffer}; +use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool}; use futures::StreamExt; use lance_arrow::DataTypeExt; use lance_core::datatypes::Schema; @@ -21,6 +22,28 @@ use tokio::sync::Mutex; use crate::dataset::Dataset; use crate::{Error, Result}; +/// Create a memory pool for a single index build. +/// +/// Reads `LANCE_INDEX_MEMORY_BUDGET` (bytes). If set to a non-zero value, +/// returns a [`GreedyMemoryPool`] capped at that limit. Otherwise returns +/// an [`UnboundedMemoryPool`] that imposes no limit (existing behavior). +/// +/// The returned budget, if any, should be passed to `create_ivf_shuffler` so +/// that the shuffler's batch size is derived from the same limit. +pub(crate) fn make_index_memory_pool() -> (Arc, Option) { + match std::env::var("LANCE_INDEX_MEMORY_BUDGET") + .ok() + .and_then(|s| s.parse::().ok()) + .filter(|&b| b > 0) + { + Some(budget) => { + log::info!("IVF index build: memory pool budget = {} bytes", budget); + (Arc::new(GreedyMemoryPool::new(budget)), Some(budget)) + } + None => (Arc::new(UnboundedMemoryPool::default()), None), + } +} + /// Helper function to extract a column from a RecordBatch, supporting nested field paths. /// /// This function handles: