Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use tracing::{debug, info, info_span, warn};
use ulid::Ulid;

use super::ProcessedParquetBatch;
use super::parquet_merge_messages::ParquetMergeTask;
use super::parquet_packager::{ParquetBatchForPackager, ParquetPackager, PartitionedRecordBatch};
use crate::actors::indexer::OTHER_PARTITION_ID;
use crate::models::{NewPublishLock, NewPublishToken, PublishLock};
Expand Down Expand Up @@ -126,10 +127,10 @@ pub struct ParquetSplitBatch {
/// `None` for the ingest path (packager manages its own temp dir).
/// `Some` for the merge path (executor's scratch directory).
pub _scratch_directory_opt: Option<quickwit_common::temp_dir::TempDirectory>,
/// Merge concurrency permit — carried through to the publisher so the
/// semaphore slot isn't released until the upload completes.
/// Merge task — carried through to the publisher so the planner inventory
/// guard and semaphore permit stay alive until publish completes.
/// `None` for the ingest path. `Some` for the merge path.
pub _merge_permit_opt: Option<crate::actors::MergePermit>,
pub _merge_task_opt: Option<ParquetMergeTask>,
}

impl std::fmt::Debug for ParquetSplitBatch {
Expand All @@ -139,7 +140,7 @@ impl std::fmt::Debug for ParquetSplitBatch {
.field("num_splits", &self.splits.len())
.field("output_dir", &self.output_dir)
.field("replaced_split_ids", &self.replaced_split_ids)
.field("has_merge_permit", &self._merge_permit_opt.is_some())
.field("has_merge_task", &self._merge_task_opt.is_some())
.finish()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tracing::{info, instrument, warn};

use super::ParquetUploader;
use super::parquet_indexer::ParquetSplitBatch;
use super::parquet_merge_messages::ParquetMergeScratch;
use super::parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask};
use crate::models::PublishLock;

/// Executes Parquet merge operations using the Phase 1 k-way merge engine.
Expand Down Expand Up @@ -173,7 +173,10 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
publish_token_opt: None,
replaced_split_ids,
_scratch_directory_opt: Some(scratch.scratch_directory),
_merge_permit_opt: Some(scratch.merge_permit),
_merge_task_opt: Some(ParquetMergeTask {
merge_operation: scratch.merge_operation,
merge_permit: scratch.merge_permit,
}),
};
ctx.send_message(&self.uploader_mailbox, batch).await?;
return Ok(());
Expand Down Expand Up @@ -230,12 +233,15 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
publish_token_opt: None,
replaced_split_ids,
_scratch_directory_opt: Some(scratch.scratch_directory),
_merge_permit_opt: Some(scratch.merge_permit),
_merge_task_opt: Some(ParquetMergeTask {
merge_operation: scratch.merge_operation,
merge_permit: scratch.merge_permit,
}),
};

ctx.send_message(&self.uploader_mailbox, batch).await?;

// The merge permit is now carried by the batch — it will be held
// The merge task is now carried by the batch — it will be held
// through the uploader and released when the publisher drops the
// ParquetSplitsUpdate message.
info!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl Handler<ParquetBatchForPackager> for ParquetPackager {
publish_token_opt,
replaced_split_ids: Vec::new(),
_scratch_directory_opt: None,
_merge_permit_opt: None,
_merge_task_opt: None,
};

ctx.send_message(&self.uploader_mailbox, split_batch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use quickwit_parquet_engine::split::ParquetSplitMetadata;
use quickwit_proto::types::{IndexUid, PublishToken};
use tracing::Span;

use super::parquet_merge_messages::ParquetMergeTask;
use crate::models::PublishLock;

/// Message sent by ParquetUploader to downstream actors after staging and uploading.
Expand All @@ -43,10 +44,10 @@ pub struct ParquetSplitsUpdate {
pub publish_token_opt: Option<PublishToken>,
/// Parent span for tracing.
pub parent_span: Span,
/// Merge concurrency permit — held until the publisher drops this message,
/// ensuring the semaphore slot stays occupied while the merge output is
/// in flight. `None` for the ingest path.
pub _merge_permit_opt: Option<crate::actors::MergePermit>,
/// Merge task — held until the publisher drops this message, ensuring the
/// planner inventory guard and semaphore permit stay alive while the merge
/// output is in flight. `None` for the ingest path.
pub _merge_task_opt: Option<ParquetMergeTask>,
}

impl fmt::Debug for ParquetSplitsUpdate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ use crate::actors::sequencer::{Sequencer, SequencerCommand};
use crate::actors::{Publisher, UploaderCounters, UploaderType};
use crate::metrics::INDEXER_METRICS;

/// Concurrent upload permits for metrics uploader.
/// Uses same permit pool as indexer uploads.
static CONCURRENT_UPLOAD_PERMITS_METRICS: OnceLock<Semaphore> = OnceLock::new();
/// Concurrent upload permits for metrics ingest uploads.
static CONCURRENT_UPLOAD_PERMITS_METRICS_INDEX: OnceLock<Semaphore> = OnceLock::new();
/// Concurrent upload permits for metrics merge uploads.
static CONCURRENT_UPLOAD_PERMITS_METRICS_MERGE: OnceLock<Semaphore> = OnceLock::new();

/// Stage splits in the metastore, dispatching to the correct RPC based on split kind.
async fn stage_splits(
Expand Down Expand Up @@ -120,12 +121,24 @@ impl ParquetUploader {
ctx: &ActorContext<Self>,
) -> anyhow::Result<SemaphorePermit<'static>> {
let _guard = ctx.protect_zone();
let concurrent_upload_permits = CONCURRENT_UPLOAD_PERMITS_METRICS
let (concurrent_upload_permits_once_cell, concurrent_upload_permits_gauge) =
match self.uploader_type {
UploaderType::IndexUploader => (
&CONCURRENT_UPLOAD_PERMITS_METRICS_INDEX,
INDEXER_METRICS
.available_concurrent_upload_permits
.with_label_values(["metrics_indexer"]),
),
UploaderType::MergeUploader | UploaderType::DeleteUploader => (
&CONCURRENT_UPLOAD_PERMITS_METRICS_MERGE,
INDEXER_METRICS
.available_concurrent_upload_permits
.with_label_values(["metrics_merger"]),
),
};
let concurrent_upload_permits = concurrent_upload_permits_once_cell
.get_or_init(|| Semaphore::const_new(self.max_concurrent_uploads));
let gauge = INDEXER_METRICS
.available_concurrent_upload_permits
.with_label_values(["metrics"]);
gauge.set(concurrent_upload_permits.available_permits() as i64);
concurrent_upload_permits_gauge.set(concurrent_upload_permits.available_permits() as i64);
concurrent_upload_permits
.acquire()
.await
Expand Down Expand Up @@ -185,7 +198,7 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
publish_lock: batch.publish_lock,
publish_token_opt: batch.publish_token_opt,
parent_span: tracing::Span::current(),
_merge_permit_opt: batch._merge_permit_opt,
_merge_task_opt: batch._merge_task_opt,
};
if tx.send(SequencerCommand::Proceed(update)).is_err() {
warn!("sequencer receiver dropped for empty batch");
Expand Down Expand Up @@ -223,7 +236,7 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
let publish_token_opt = batch.publish_token_opt;
let splits = batch.splits;
let replaced_split_ids = batch.replaced_split_ids;
let merge_permit_opt = batch._merge_permit_opt;
let merge_task_opt = batch._merge_task_opt;
// Hold the scratch directory alive until the upload task completes.
// For the merge path, this prevents the TempDirectory from being
// cleaned up before the upload task reads the merged files.
Expand Down Expand Up @@ -325,8 +338,8 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
}

// Create ParquetSplitsUpdate and send downstream.
// The merge permit (if present) transfers to the update so it
// stays alive until the publisher drops the message.
// The merge task (if present) transfers to the update so the
// planner guard and semaphore permit stay alive until publish.
let update = ParquetSplitsUpdate {
index_uid,
new_splits: splits,
Expand All @@ -335,7 +348,7 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
publish_lock,
publish_token_opt,
parent_span: Span::current(),
_merge_permit_opt: merge_permit_opt,
_merge_task_opt: merge_task_opt,
};

if tx.send(SequencerCommand::Proceed(update)).is_err() {
Expand Down Expand Up @@ -441,7 +454,7 @@ mod tests {
publish_token_opt: None,
replaced_split_ids: Vec::new(),
_scratch_directory_opt: None,
_merge_permit_opt: None,
_merge_task_opt: None,
};

uploader_mailbox.send_message(batch).await.unwrap();
Expand Down Expand Up @@ -537,7 +550,7 @@ mod tests {
publish_token_opt: None,
replaced_split_ids: Vec::new(),
_scratch_directory_opt: None,
_merge_permit_opt: None,
_merge_task_opt: None,
};

uploader_mailbox.send_message(batch).await.unwrap();
Expand Down Expand Up @@ -614,7 +627,7 @@ mod tests {
publish_token_opt: None,
replaced_split_ids: Vec::new(),
_scratch_directory_opt: None,
_merge_permit_opt: None,
_merge_task_opt: None,
};

uploader_mailbox.send_message(batch).await.unwrap();
Expand Down Expand Up @@ -687,7 +700,7 @@ mod tests {
publish_token_opt: None,
replaced_split_ids: Vec::new(),
_scratch_directory_opt: None,
_merge_permit_opt: None,
_merge_task_opt: None,
};
uploader_mailbox.send_message(batch).await.unwrap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl Handler<ParquetSplitsUpdate> for Publisher {
checkpoint_delta_opt,
publish_lock,
publish_token_opt,
_merge_task_opt,
..
} = split_update;

Expand Down Expand Up @@ -108,6 +109,10 @@ impl Handler<ParquetSplitsUpdate> for Publisher {
} else {
self.counters.num_replace_operations += 1;
}
// Keep the merge task alive until after the metastore publish and
// planner feedback have completed. Dropping it releases both the merge
// semaphore permit and the planner's tracked-operation inventory guard.
drop(_merge_task_opt);
Ok(())
}
}
Expand Down Expand Up @@ -175,7 +180,7 @@ mod tests {
publish_lock: PublishLock::default(),
publish_token_opt: None,
parent_span: Span::none(),
_merge_permit_opt: None,
_merge_task_opt: None,
};

publisher_mailbox.send_message(update).await.unwrap();
Expand Down Expand Up @@ -224,7 +229,7 @@ mod tests {
publish_lock: PublishLock::default(),
publish_token_opt: None,
parent_span: Span::none(),
_merge_permit_opt: None,
_merge_task_opt: None,
};

publisher_mailbox.send_message(update).await.unwrap();
Expand Down Expand Up @@ -270,7 +275,7 @@ mod tests {
publish_lock,
publish_token_opt: None,
parent_span: Span::none(),
_merge_permit_opt: None,
_merge_task_opt: None,
};

publisher_mailbox.send_message(update).await.unwrap();
Expand Down
Loading