Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 72 additions & 4 deletions rust/lance-index/src/vector/v3/shuffler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub trait Shuffler: Send + Sync {
&self,
data: Box<dyn RecordBatchStream + Unpin + 'static>,
) -> Result<Box<dyn ShuffleReader>>;

/// Returns this shuffler as a `&dyn Any`, which lets a caller holding a
/// `dyn Shuffler` downcast it to the concrete shuffler type.
fn as_any_ref(&self) -> &dyn std::any::Any;
}

pub struct IvfShuffler {
Expand Down Expand Up @@ -102,6 +104,10 @@ impl IvfShuffler {

#[async_trait::async_trait]
impl Shuffler for IvfShuffler {
/// Exposes this `IvfShuffler` as a `&dyn Any` so it can be downcast from a
/// `dyn Shuffler`.
fn as_any_ref(&self) -> &dyn std::any::Any {
    self as &dyn std::any::Any
}

async fn shuffle(
&self,
data: Box<dyn RecordBatchStream + Unpin + 'static>,
Expand Down Expand Up @@ -301,12 +307,15 @@ impl ShuffleReader for EmptyReader {
/// one file per partition.
///
/// An optional `progress` callback can be provided to receive shuffle progress
/// updates.
/// updates. An optional `memory_budget_bytes` sizes the [`TwoFileShuffler`]
/// batch at half the budget, but never below the default batch size; pass
/// `None` to use the default (128 MiB or `LANCE_SHUFFLE_BATCH_BYTES` env var).
pub fn create_ivf_shuffler(
output_dir: Path,
num_partitions: usize,
format_version: LanceFileVersion,
progress: Option<Arc<dyn crate::progress::IndexBuildProgress>>,
memory_budget_bytes: Option<usize>,
) -> Box<dyn Shuffler> {
let use_legacy = std::env::var("LANCE_LEGACY_SHUFFLER")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
Expand All @@ -320,6 +329,9 @@ pub fn create_ivf_shuffler(
Box::new(shuffler)
} else {
let mut shuffler = TwoFileShuffler::new(output_dir, num_partitions);
if let Some(budget) = memory_budget_bytes {
shuffler = shuffler.with_batch_size_bytes(batch_size_from_budget(budget));
}
if let Some(progress) = progress {
shuffler = shuffler.with_progress(progress);
}
Expand Down Expand Up @@ -351,6 +363,15 @@ fn shuffle_batch_bytes() -> usize {
}
}

/// Derive the shuffler batch size from a memory budget.
///
/// Uses half the budget as the target, with a floor of
/// `DEFAULT_SHUFFLE_BATCH_BYTES` so very small budgets don't produce
/// pathologically tiny batches.
pub(crate) fn batch_size_from_budget(memory_budget_bytes: usize) -> usize {
(memory_budget_bytes / 2).max(DEFAULT_SHUFFLE_BATCH_BYTES)
}

/// A shuffler that writes all data to just two files (data + offsets) instead
/// of one file per partition. This avoids hitting OS file descriptor limits
/// when there are many partitions.
Expand Down Expand Up @@ -388,15 +409,18 @@ impl TwoFileShuffler {
self
}

#[cfg(test)]
fn with_batch_size_bytes(mut self, batch_size_bytes: usize) -> Self {
/// Builder-style setter: stores `batch_size_bytes` in the shuffler's
/// `batch_size_bytes` field and returns the updated shuffler by value.
pub fn with_batch_size_bytes(mut self, batch_size_bytes: usize) -> Self {
self.batch_size_bytes = batch_size_bytes;
self
}
}

#[async_trait::async_trait]
impl Shuffler for TwoFileShuffler {
/// Exposes this `TwoFileShuffler` as a `&dyn Any` so it can be downcast from
/// a `dyn Shuffler`.
fn as_any_ref(&self) -> &dyn std::any::Any {
    self as &dyn std::any::Any
}

async fn shuffle(
&self,
data: Box<dyn RecordBatchStream + Unpin + 'static>,
Expand Down Expand Up @@ -753,6 +777,50 @@ mod tests {
Some(arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap())
}

/// Checks that `batch_size_from_budget` returns half the budget, floored at
/// `DEFAULT_SHUFFLE_BATCH_BYTES`.
#[test]
fn test_batch_size_from_budget() {
    const MIB: usize = 1024 * 1024;

    // Half the budget, but not below the default floor.
    assert_eq!(batch_size_from_budget(512 * MIB), 256 * MIB);
    // Budget smaller than 2× floor → floor wins.
    assert_eq!(
        batch_size_from_budget(64 * MIB),
        DEFAULT_SHUFFLE_BATCH_BYTES,
    );
    // Zero budget → floor wins as well.
    assert_eq!(batch_size_from_budget(0), DEFAULT_SHUFFLE_BATCH_BYTES);
    // Budget exactly 2× default → half equals default.
    assert_eq!(
        batch_size_from_budget(2 * DEFAULT_SHUFFLE_BATCH_BYTES),
        DEFAULT_SHUFFLE_BATCH_BYTES,
    );
    // Large budget. 4 GiB does not fit in a 32-bit `usize`, and the constant
    // expression would be rejected at compile time there, so this case is
    // only built on 64-bit targets.
    #[cfg(target_pointer_width = "64")]
    {
        assert_eq!(batch_size_from_budget(4096 * MIB), 2048 * MIB);
    }
}

/// Checks that `create_ivf_shuffler` forwards a memory budget to the
/// `TwoFileShuffler` batch size, and that omitting the budget still yields a
/// non-zero batch size.
#[test]
fn test_create_ivf_shuffler_batch_size_from_budget() {
    let tmp_dir = TempStrDir::default();
    let output_dir = Path::from(tmp_dir.as_ref());
    let budget: usize = 512 << 20; // 512 MiB

    // With a budget: the batch size must be derived from it.
    let budgeted = create_ivf_shuffler(
        output_dir.clone(),
        4,
        LanceFileVersion::V2_0,
        None,
        Some(budget),
    );
    // The non-legacy path is the default, so the concrete type is expected
    // to be `TwoFileShuffler`.
    let budgeted_any = budgeted.as_any_ref();
    let budgeted_two_file = budgeted_any
        .downcast_ref::<TwoFileShuffler>()
        .expect("expected TwoFileShuffler");
    let expected_batch_bytes = batch_size_from_budget(budget);
    assert_eq!(budgeted_two_file.batch_size_bytes, expected_batch_bytes);

    // Without a budget: the env-var or default path applies; only check the
    // resulting batch size is positive.
    let unbudgeted = create_ivf_shuffler(output_dir, 4, LanceFileVersion::V2_0, None, None);
    let unbudgeted_any = unbudgeted.as_any_ref();
    let unbudgeted_two_file = unbudgeted_any
        .downcast_ref::<TwoFileShuffler>()
        .expect("expected TwoFileShuffler");
    assert!(unbudgeted_two_file.batch_size_bytes > 0);
}

#[tokio::test]
async fn test_two_file_shuffler_round_trip() {
let dir = TempStrDir::default();
Expand Down Expand Up @@ -876,7 +944,7 @@ mod tests {
let output_dir = Path::from(dir.as_ref());
let num_partitions = 3;

// Use a very small batch size to force multiple write batches
// Use a very small batch size to force multiple write batches.
// Each i32 is 4 bytes, each u32 is 4 bytes, so ~8 bytes/row.
// With a small batch_size_bytes, we get multiple rechunked batches.
let batch1 = make_batch(&[0, 1, 2], &[10, 20, 30], Some(1.0));
Expand Down
Loading
Loading