Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 61 additions & 29 deletions turbopack/crates/turbo-tasks/src/raw_vc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,37 +408,60 @@ impl Unpin for ResolveRawVcFuture {}

#[must_use]
pub struct ReadRawVcFuture {
/// Phase 1: resolves the [`RawVc`] pointer chain to a [`RawVc::TaskCell`].
resolve: ResolveRawVcFuture,
/// Phase 2: options for the cell read once we have a [`RawVc::TaskCell`].
/// Options for the cell read once we have a [`RawVc::TaskCell`]. Persists across both phases
/// because it is configured by the builder before phase 1 starts.
read_cell_options: ReadCellOptions,
/// Phase 2: the resolved task and cell identity, set when phase 1 completes.
resolved: Option<(TaskId, CellId)>,
/// Phase 2: listener for the cell read wait.
listener: Option<EventListener>,
/// Whether phase 1 was a strongly-consistent read. Needed in phase 2 to re-apply
/// [`suppress_top_level_task_check`]. Stored here so we can drop the [`ResolveRawVcFuture`]
/// once phase 1 completes.
strongly_consistent: bool,
state: ReadRawVcState,
}

/// Phase 1 and phase 2 of [`ReadRawVcFuture`] use disjoint sets of fields. Storing them in an
/// enum keeps the future smaller than holding both sets simultaneously.
enum ReadRawVcState {
/// Phase 1: resolves the [`RawVc`] pointer chain to a [`RawVc::TaskCell`].
Resolving(ResolveRawVcFuture),
/// Phase 2: the resolved task/cell identity plus a listener for the cell read wait.
Reading {
task: TaskId,
index: CellId,
listener: Option<EventListener>,
},
}

impl ReadRawVcFuture {
pub(crate) fn new(vc: RawVc) -> Self {
ReadRawVcFuture {
resolve: ResolveRawVcFuture::new(vc),
read_cell_options: ReadCellOptions::default(),
resolved: None,
listener: None,
strongly_consistent: false,
state: ReadRawVcState::Resolving(ResolveRawVcFuture::new(vc)),
}
}

fn map_resolve(mut self, f: impl FnOnce(ResolveRawVcFuture) -> ResolveRawVcFuture) -> Self {
match self.state {
ReadRawVcState::Resolving(resolve) => {
self.state = ReadRawVcState::Resolving(f(resolve));
}
ReadRawVcState::Reading { .. } => {
unreachable!("builder methods are only called before polling");
}
}
self
}

/// Make reads strongly consistent.
pub fn strongly_consistent(mut self) -> Self {
self.resolve = self.resolve.strongly_consistent();
self
self.strongly_consistent = true;
self.map_resolve(|r| r.strongly_consistent())
}

/// Track the value as a dependency with an key.
pub fn track_with_key(mut self, key: u64) -> Self {
self.resolve = self.resolve.track_with_key();
self.read_cell_options.tracking = ReadCellTracking::Tracked { key: Some(key) };
self
self.map_resolve(|r| r.track_with_key())
}

/// This will not track the value as dependency, but will still track the error as dependency,
Expand All @@ -447,9 +470,8 @@ impl ReadRawVcFuture {
/// INVALIDATION: Be careful with this, it will not track dependencies, so
/// using it could break cache invalidation.
pub fn untracked(mut self) -> Self {
self.resolve = self.resolve.untracked();
self.read_cell_options.tracking = ReadCellTracking::TrackOnlyError;
self
self.map_resolve(|r| r.untracked())
}

/// Hint that this is the final read of the cell content.
Expand All @@ -471,40 +493,50 @@ impl Future for ReadRawVcFuture {
//
// `ResolveRawVcFuture` is `Unpin`, so `Pin::new` is safe.
// It handles `with_turbo_tasks` and `suppress_top_level_task_check` internally.
if this.resolved.is_none() {
match ready!(Pin::new(&mut this.resolve).poll(cx)) {
if let ReadRawVcState::Resolving(resolve) = &mut this.state {
match ready!(Pin::new(resolve).poll(cx)) {
Err(err) => return Poll::Ready(Err(err)),
Ok(RawVc::TaskCell(task, index)) => {
this.resolved = Some((task, index));
this.state = ReadRawVcState::Reading {
task,
index,
listener: None,
};
}
Ok(_) => unreachable!("ResolveRawVcFuture always resolves to a TaskCell"),
}
}

// --- Phase 2: read the cell content ---
//
// At this point `this.resolved` is `Some((task, index))`.
let (task, index) = this.resolved.unwrap();
let ReadRawVcState::Reading {
task,
index,
listener,
} = &mut this.state
else {
unreachable!("phase 1 transitioned to Reading above");
};
let task = *task;
let index = *index;
let read_cell_options = this.read_cell_options;

let poll_fn = |tt: &Arc<dyn TurboTasksApi>| -> Poll<Self::Output> {
loop {
ready!(poll_listener(&mut this.listener, cx));
let listener = match tt.try_read_task_cell(task, index, this.read_cell_options) {
ready!(poll_listener(listener, cx));
let new_listener = match tt.try_read_task_cell(task, index, read_cell_options) {
Ok(Ok(content)) => return Poll::Ready(Ok(content)),
Ok(Err(listener)) => listener,
Ok(Err(l)) => l,
Err(err) => return Poll::Ready(Err(err)),
};
this.listener = Some(listener);
*listener = Some(new_listener);
}
};

// Phase 2 must also suppress the top-level task check when phase 1 was
// strongly-consistent. The suppression from `ResolveRawVcFuture::poll` only lasts for
// the duration of that individual `poll` call and does not carry over to subsequent calls
// or to this phase.
suppress_top_level_task_check(this.resolve.strongly_consistent, || {
with_turbo_tasks(poll_fn)
})
suppress_top_level_task_check(this.strongly_consistent, || with_turbo_tasks(poll_fn))
}
}

Expand Down
10 changes: 4 additions & 6 deletions turbopack/crates/turbo-tasks/src/task/task_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,10 @@ where
self.node.is_transient()
}

async fn resolve_input(&self) -> Result<Self> {
Ok(*(*self).to_resolved().await?)
fn resolve_input(&self) -> impl Future<Output = Result<Self>> + Send + '_ {
// It isn't ideal to use this function but it exactly matches this usecase (resolved but
// still a Vc)
(*self).resolve()
}
}

Expand All @@ -276,10 +278,6 @@ where
fn is_transient(&self) -> bool {
self.node.is_transient()
}

async fn resolve_input(&self) -> Result<Self> {
Ok(*self)
}
}

impl<T> TaskInput for TransientValue<T>
Expand Down
Loading