diff --git a/Cargo.lock b/Cargo.lock index b1df08777cb05..fd715b0e92dc5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9620,6 +9620,7 @@ dependencies = [ "anyhow", "futures", "rustc-hash 2.1.1", + "smallvec", "tokio", "turbo-tasks", ] diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 06d35fca998e3..e56657fc62feb 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -24,8 +24,8 @@ use smallvec::{SmallVec, smallvec}; use tokio::time::{Duration, Instant}; use tracing::{Span, trace_span}; use turbo_tasks::{ - CellId, FxDashMap, KeyValuePair, RawVc, ReadCellOptions, ReadConsistency, ReadOutputOptions, - ReadTracking, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId, + CellId, FxDashMap, KeyValuePair, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency, + ReadOutputOptions, ReadTracking, TRANSIENT_TASK_BIT, TaskExecutionReason, TaskId, TraitTypeId, TurboTasksBackendApi, ValueTypeId, backend::{ Backend, CachedTaskType, CellContent, TaskExecutionSpec, TransientTaskRoot, @@ -791,12 +791,14 @@ impl TurboTasksBackendInner { reader: Option, reader_task: Option, cell: CellId, + key: Option, ) { if let Some(mut reader_task) = reader_task && (!task.is_immutable() || cfg!(feature = "verify_immutable")) { let _ = task.add(CachedDataItem::CellDependent { cell, + key, task: reader.unwrap(), value: (), }); @@ -812,10 +814,14 @@ impl TurboTasksBackendInner { cell, }; if reader_task - .remove(&CachedDataItemKey::OutdatedCellDependency { target }) + .remove(&CachedDataItemKey::OutdatedCellDependency { target, key }) .is_none() { - let _ = reader_task.add(CachedDataItem::CellDependency { target, value: () }); + let _ = reader_task.add(CachedDataItem::CellDependency { + target, + key, + value: (), + }); } } } @@ -828,7 +834,7 @@ impl TurboTasksBackendInner { let mut ctx = self.execute_context(turbo_tasks); let (mut task, reader_task) = if self.should_track_dependencies() - && !matches!(tracking, ReadTracking::Untracked) + && !matches!(tracking, ReadCellTracking::Untracked) && let Some(reader_id) = reader && reader_id != task_id { @@ -848,7 +854,7 @@ impl TurboTasksBackendInner { }; if let Some(content) = content { if tracking.should_track(false) { - add_cell_dependency(task_id, task, reader, reader_task, cell); + add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key()); } return Ok(Ok(TypedCellContent( cell.type_id, @@ -875,7 +881,7 @@ impl TurboTasksBackendInner { .copied(); let Some(max_id) = max_id else { if tracking.should_track(true) { - add_cell_dependency(task_id, task, reader, reader_task, cell); + add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key()); } bail!( "Cell {cell:?} no longer exists in task {} (no cell of this type exists)", @@ -884,7 +890,7 @@ impl TurboTasksBackendInner { }; if cell.index >= max_id { if tracking.should_track(true) { - add_cell_dependency(task_id, task, reader, reader_task, cell); + add_cell_dependency(task_id, task, reader, reader_task, cell, tracking.key()); } bail!( "Cell {cell:?} no longer exists in task {} (index out of bounds)", @@ -1684,22 +1690,26 @@ impl TurboTasksBackendInner { if self.should_track_dependencies() { // Make all dependencies outdated let outdated_cell_dependencies_to_add = - iter_many!(task, CellDependency { target } => target) + iter_many!(task, CellDependency { target, key } => (target, key)) .collect::>(); let outdated_cell_dependencies_to_remove = - iter_many!(task, OutdatedCellDependency { target } => target) - .filter(|&target| { - !task.has_key(&CachedDataItemKey::CellDependency { target }) + iter_many!(task, OutdatedCellDependency { target, key } => (target, key)) + .filter(|&(target, key)| { + !task.has_key(&CachedDataItemKey::CellDependency { target, key }) }) .collect::>(); task.extend( CachedDataItemType::OutdatedCellDependency, outdated_cell_dependencies_to_add .into_iter() - .map(|target| CachedDataItem::OutdatedCellDependency { target, value: () }), + .map(|(target, key)| CachedDataItem::OutdatedCellDependency { + target, + key, + value: (), + }), ); - for target in outdated_cell_dependencies_to_remove { - task.remove(&CachedDataItemKey::OutdatedCellDependency { target }); + for (target, key) in outdated_cell_dependencies_to_remove { + task.remove(&CachedDataItemKey::OutdatedCellDependency { target, key }); } let outdated_output_dependencies_to_add = @@ -2000,7 +2010,7 @@ impl TurboTasksBackendInner { Some( // Collect all dependencies on tasks to check if all dependencies are immutable iter_many!(task, OutputDependency { target } => target) - .chain(iter_many!(task, CellDependency { target } => target.task)) + .chain(iter_many!(task, CellDependency { target, key: _ } => target.task)) .collect::>(), ) } else { @@ -2054,10 +2064,10 @@ impl TurboTasksBackendInner { ); if self.should_track_dependencies() { - old_edges.extend(iter_many!(task, OutdatedCellDependency { target } => OutdatedEdge::CellDependency(target))); + old_edges.extend(iter_many!(task, OutdatedCellDependency { target, key } => OutdatedEdge::CellDependency(target, key))); old_edges.extend(iter_many!(task, OutdatedOutputDependency { target } => OutdatedEdge::OutputDependency(target))); old_edges.extend( - iter_many!(task, CellDependent { cell, task } => (cell, task)).filter_map( + iter_many!(task, CellDependent { cell, task, key: _ } => (cell, task)).filter_map( |(cell, task)| { if cell_counters .get(&cell.type_id) @@ -2742,6 +2752,7 @@ impl TurboTasksBackendInner { cell: CellId, is_serializable_cell_content: bool, content: CellContent, + updated_key_hashes: Option>, verification_mode: VerificationMode, turbo_tasks: &dyn TurboTasksBackendApi>, ) { @@ -2750,6 +2761,7 @@ impl TurboTasksBackendInner { cell, content, is_serializable_cell_content, + updated_key_hashes, verification_mode, self.execute_context(turbo_tasks), ); @@ -3337,6 +3349,7 @@ impl Backend for TurboTasksBackend { cell: CellId, is_serializable_cell_content: bool, content: CellContent, + updated_key_hashes: Option>, verification_mode: VerificationMode, turbo_tasks: &dyn TurboTasksBackendApi, ) { @@ -3345,6 +3358,7 @@ impl Backend for TurboTasksBackend { cell, is_serializable_cell_content, content, + updated_key_hashes, verification_mode, turbo_tasks, ); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs index e18873d38280c..8407f219356a8 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs @@ -42,7 +42,7 @@ pub enum CleanupOldEdgesOperation { pub enum OutdatedEdge { Child(TaskId), Collectible(CollectibleRef, i32), - CellDependency(CellRef), + CellDependency(CellRef, Option), OutputDependency(TaskId), CollectiblesDependency(CollectiblesRef), RemovedCellDependent { @@ -160,14 +160,18 @@ impl Operation for CleanupOldEdgesOperation { AggregatedDataUpdate::new().collectibles_update(collectibles), )); } - OutdatedEdge::CellDependency(CellRef { - task: cell_task_id, - cell, - }) => { + OutdatedEdge::CellDependency( + CellRef { + task: cell_task_id, + cell, + }, + key, + ) => { { let mut task = ctx.task(cell_task_id, TaskDataCategory::Data); task.remove(&CachedDataItemKey::CellDependent { cell, + key, task: task_id, }); } @@ -178,6 +182,7 @@ impl Operation for CleanupOldEdgesOperation { task: cell_task_id, cell, }, + key, }); } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs index 2383da6ee3914..3c4c528a3a172 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs @@ -64,7 +64,7 @@ impl Operation for InvalidateOperation { make_task_dirty( task_id, #[cfg(feature = "trace_task_dirty")] - cause, + cause.clone(), &mut queue, ctx, ); @@ -90,11 +90,12 @@ impl Operation for InvalidateOperation { } #[cfg(feature = "trace_task_dirty")] -#[derive(Serialize, Deserialize, Clone, Copy, Debug)] +#[derive(Encode, Decode, Clone, Debug)] pub enum TaskDirtyCause { InitialDirty, CellChange { value_type: turbo_tasks::ValueTypeId, + keys: SmallVec<[Option; 2]>, }, CellRemoved { value_type: turbo_tasks::ValueTypeId, @@ -132,12 +133,27 @@ impl<'e, E: ExecuteContext<'e>> std::fmt::Display for TaskDirtyCauseInContext<'_ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self.cause { TaskDirtyCause::InitialDirty => write!(f, "initial dirty"), - TaskDirtyCause::CellChange { value_type } => { - write!( - f, - "{} cell changed", - turbo_tasks::registry::get_value_type(*value_type).name - ) + TaskDirtyCause::CellChange { value_type, keys } => { + if keys.is_empty() { + write!( + f, + "{} cell changed", + turbo_tasks::registry::get_value_type(*value_type).name + ) + } else { + write!( + f, + "{} cell changed (keys: {})", + turbo_tasks::registry::get_value_type(*value_type).name, + keys.iter() + .map(|key| match key { + Some(k) => k.to_string(), + None => "*".to_string(), + }) + .collect::>() + .join(", ") + ) + } } TaskDirtyCause::CellRemoved { value_type } => { write!( diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 961fba638424a..f6f3d34248919 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -243,6 +243,8 @@ where if id.is_transient() { if call_prepared_task_callback_for_transient_tasks { let mut task = self.backend.storage.access_mut(id); + // TODO add is_restoring and avoid concurrent restores and duplicates tasks + // ids in `task_ids` if !task.state().is_restored(category) { task.state_mut().set_restored(TaskDataCategory::All); } @@ -1003,7 +1005,7 @@ impl TaskGuard for TaskGuardImpl<'_, B> { } self.task.state_mut().set_prefetched(true); let map = iter_many!(self, OutputDependency { target } => (target, TaskDataCategory::Meta)) - .chain(iter_many!(self, CellDependency { target } => (target.task, TaskDataCategory::All))) + .chain(iter_many!(self, CellDependency { target, key: _ } => (target.task, TaskDataCategory::All))) .chain(iter_many!(self, CollectiblesDependency { target } => (target.task, TaskDataCategory::All))) .chain(iter_many!(self, Child { task } => (task, TaskDataCategory::All))) .collect::>(); diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs index e5408cb1f1c5f..c76df6b0a594c 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs @@ -1,10 +1,12 @@ use std::mem::take; use bincode::{Decode, Encode}; +use once_cell::unsync::Lazy; +use rustc_hash::FxHashSet; use smallvec::SmallVec; #[cfg(not(feature = "verify_determinism"))] use turbo_tasks::backend::VerificationMode; -use turbo_tasks::{CellId, TaskId, TypedSharedReference, backend::CellContent}; +use turbo_tasks::{CellId, FxIndexMap, TaskId, TypedSharedReference, backend::CellContent}; #[cfg(feature = "trace_task_dirty")] use crate::backend::operation::invalidate::TaskDirtyCause; @@ -26,7 +28,10 @@ pub enum UpdateCellOperation { InvalidateWhenCellDependency { is_serializable_cell_content: bool, cell_ref: CellRef, - dependent_tasks: SmallVec<[TaskId; 4]>, + #[bincode(with = "turbo_bincode::indexmap")] + dependent_tasks: FxIndexMap; 2]>>, + #[cfg(feature = "trace_task_dirty")] + has_updated_key_hashes: bool, content: Option, queue: AggregationUpdateQueue, }, @@ -49,6 +54,7 @@ impl UpdateCellOperation { cell: CellId, content: CellContent, is_serializable_cell_content: bool, + updated_key_hashes: Option>, #[cfg(feature = "verify_determinism")] verification_mode: VerificationMode, #[cfg(not(feature = "verify_determinism"))] _verification_mode: VerificationMode, mut ctx: impl ExecuteContext, @@ -96,17 +102,31 @@ impl UpdateCellOperation { // When not recomputing, we need to notify dependent tasks if the content actually // changes. - let dependent_tasks: SmallVec<[TaskId; 4]> = iter_many!( + #[cfg(feature = "trace_task_dirty")] + let has_updated_key_hashes = updated_key_hashes.is_some(); + let updated_key_hashes_set = updated_key_hashes.map(|updated_key_hashes| { + Lazy::new(|| updated_key_hashes.into_iter().collect::>()) + }); + + let tasks_with_keys = iter_many!( task, - CellDependent { cell: dependent_cell, task } - if dependent_cell == cell - => task + CellDependent { cell: dependent_cell, key, task } + if dependent_cell == cell && key.is_none_or(|key_hash| { + updated_key_hashes_set.as_ref().is_none_or(|set| { + set.contains(&key_hash) + }) + }) + => (task, key) ) - .filter(|&dependent_task_id| { + .filter(|&(dependent_task_id, _)| { // once tasks are never invalidated !ctx.is_once_task(dependent_task_id) - }) - .collect(); + }); + let mut dependent_tasks: FxIndexMap; 2]>> = + FxIndexMap::default(); + for (task, key) in tasks_with_keys { + dependent_tasks.entry(task).or_default().push(key); + } if !dependent_tasks.is_empty() { // Slow path: We need to invalidate tasks depending on this cell. @@ -132,7 +152,7 @@ impl UpdateCellOperation { ctx.prepare_tasks( dependent_tasks - .iter() + .keys() .map(|&id| (id, TaskDataCategory::All)), ); @@ -143,6 +163,8 @@ impl UpdateCellOperation { cell, }, dependent_tasks, + #[cfg(feature = "trace_task_dirty")] + has_updated_key_hashes, content, queue: AggregationUpdateQueue::new(), } @@ -204,27 +226,36 @@ impl Operation for UpdateCellOperation { is_serializable_cell_content, cell_ref, ref mut dependent_tasks, + #[cfg(feature = "trace_task_dirty")] + has_updated_key_hashes, ref mut content, ref mut queue, } => { - if let Some(dependent_task_id) = dependent_tasks.pop() { - let mut make_stale = true; + if let Some((dependent_task_id, keys)) = dependent_tasks.pop() { + let mut make_stale = false; let dependent = ctx.task(dependent_task_id, TaskDataCategory::All); - if dependent.has_key(&CachedDataItemKey::OutdatedCellDependency { - target: cell_ref, - }) { - // cell dependency is outdated, so it hasn't read the cell yet - // and doesn't need to be invalidated. - // But importantly we still need to make the task dirty as it should no - // longer be considered as "recomputation". - make_stale = false; - } else if !dependent - .has_key(&CachedDataItemKey::CellDependency { target: cell_ref }) - { - // cell dependency has been removed, so the task doesn't depend on the - // cell anymore and doesn't need to be - // invalidated - continue; + for key in keys.iter().copied() { + if dependent.has_key(&CachedDataItemKey::OutdatedCellDependency { + target: cell_ref, + key, + }) { + // cell dependency is outdated, so it hasn't read the cell yet + // and doesn't need to be invalidated. + // We do not need to make the task stale in this case. + // But importantly we still need to make the task dirty as it should + // no longer be considered as + // "recomputation". + } else if !dependent.has_key(&CachedDataItemKey::CellDependency { + target: cell_ref, + key, + }) { + // cell dependency has been removed, so the task doesn't depend on + // the cell anymore and doesn't need + // to be invalidated + continue; + } else { + make_stale = true; + } } make_task_dirty_internal( dependent, @@ -233,6 +264,7 @@ impl Operation for UpdateCellOperation { #[cfg(feature = "trace_task_dirty")] TaskDirtyCause::CellChange { value_type: cell_ref.cell.type_id, + keys: has_updated_key_hashes.then_some(keys).unwrap_or_default(), }, queue, ctx, diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index bff8af1c692df..612984cbbd745 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -263,6 +263,7 @@ pub enum CachedDataItem { }, CellDependency { target: CellRef, + key: Option, value: (), }, CollectiblesDependency { @@ -277,6 +278,7 @@ pub enum CachedDataItem { }, CellDependent { cell: CellId, + key: Option, task: TaskId, value: (), }, @@ -363,6 +365,8 @@ pub enum CachedDataItem { #[bincode(skip, default = "unreachable_decode")] target: CellRef, #[bincode(skip, default = "unreachable_decode")] + key: Option, + #[bincode(skip, default = "unreachable_decode")] value: (), }, OutdatedCollectiblesDependency { @@ -669,8 +673,8 @@ mod tests { #[test] fn test_sizes() { assert_eq!(std::mem::size_of::(), 40); - assert_eq!(std::mem::size_of::(), 20); + assert_eq!(std::mem::size_of::(), 32); assert_eq!(std::mem::size_of::(), 32); - assert_eq!(std::mem::size_of::(), 48); + assert_eq!(std::mem::size_of::(), 56); } } diff --git a/turbopack/crates/turbo-tasks-backend/tests/invalidation.rs b/turbopack/crates/turbo-tasks-backend/tests/invalidation.rs new file mode 100644 index 0000000000000..d3b052348e77d --- /dev/null +++ b/turbopack/crates/turbo-tasks-backend/tests/invalidation.rs @@ -0,0 +1,175 @@ +#![feature(arbitrary_self_types)] +#![feature(arbitrary_self_types_pointers)] +#![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this + +use anyhow::Result; +use rustc_hash::{FxHashMap, FxHashSet}; +use turbo_tasks::{OperationVc, ResolvedVc, State, Vc}; +use turbo_tasks_testing::{Registration, register, run}; + +static REGISTRATION: Registration = register!(); + +#[turbo_tasks::value(transparent)] +struct Step(State); + +#[turbo_tasks::function] +fn create_state() -> Vc { + Step(State::new(0)).cell() +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn invalidation_map() { + run(®ISTRATION, || async { + let state = create_state().to_resolved().await?; + state.await?.set(1); + + let map = create_map(state); + let a = get_value(map, "a".to_string()); + let b = get_value(map, "b".to_string()); + let c = get_value(map, "c".to_string()); + + let a_ref = a.read_strongly_consistent().await?; + let b_ref = b.read_strongly_consistent().await?; + let c_ref = c.read_strongly_consistent().await?; + + assert_eq!(a_ref.value, Some(1)); + assert_eq!(b_ref.value, Some(2)); + assert_eq!(c_ref.value, None); + + state.await?.set(2); + + let a_ref2 = a.read_strongly_consistent().await?; + let b_ref2 = b.read_strongly_consistent().await?; + let c_ref2 = c.read_strongly_consistent().await?; + + assert_eq!(a_ref2.value, Some(1)); + assert_eq!(b_ref2.value, Some(22)); + assert_eq!(c_ref2.value, None); + assert_eq!(a_ref.random, a_ref2.random); + assert_eq!(c_ref.random, c_ref2.random); + + state.await?.set(3); + + let a_ref3 = a.read_strongly_consistent().await?; + let b_ref3 = b.read_strongly_consistent().await?; + let c_ref3 = c.read_strongly_consistent().await?; + + assert_eq!(a_ref3.value, None); + assert_eq!(b_ref3.value, Some(22)); + assert_eq!(c_ref3.value, Some(3)); + assert_eq!(b_ref2.random, b_ref3.random); + + anyhow::Ok(()) + }) + .await + .unwrap() +} + +#[turbo_tasks::value(transparent, cell = "keyed")] +struct Map(FxHashMap); + +#[turbo_tasks::function(operation)] +async fn create_map(step: ResolvedVc) -> Result> { + let step = step.await?; + let step_value = step.get(); + + Ok(Vc::cell(match *step_value { + 1 => FxHashMap::from_iter([("a".to_string(), 1), ("b".to_string(), 2)]), + 2 => FxHashMap::from_iter([("a".to_string(), 1), ("b".to_string(), 22)]), + 3 => FxHashMap::from_iter([("c".to_string(), 3), ("b".to_string(), 22)]), + _ => FxHashMap::default(), + })) +} + +#[turbo_tasks::value] +struct GetValueResult { + value: Option, + random: u32, +} + +#[turbo_tasks::function(operation)] +async fn get_value(map: OperationVc, key: String) -> Result> { + let map = map.connect(); + let value = map.get(&key).await?.as_deref().copied(); + let random = rand::random::(); + Ok(GetValueResult { value, random }.cell()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn invalidation_set() { + run(®ISTRATION, || async { + let state = create_state().to_resolved().await?; + state.await?.set(1); + + let set = create_set(state); + let a = has_value(set, "a".to_string()); + let b = has_value(set, "b".to_string()); + let c = has_value(set, "c".to_string()); + + let a_ref = a.read_strongly_consistent().await?; + let b_ref = b.read_strongly_consistent().await?; + let c_ref = c.read_strongly_consistent().await?; + + assert!(a_ref.value); + assert!(b_ref.value); + assert!(!c_ref.value); + + state.await?.set(2); + + let a_ref2 = a.read_strongly_consistent().await?; + let b_ref2 = b.read_strongly_consistent().await?; + let c_ref2 = c.read_strongly_consistent().await?; + + assert!(a_ref2.value); + assert!(b_ref2.value); + assert!(!c_ref2.value); + assert_eq!(a_ref.random, a_ref2.random); + assert_eq!(b_ref.random, b_ref2.random); + assert_eq!(c_ref.random, c_ref2.random); + + state.await?.set(3); + + let a_ref3 = a.read_strongly_consistent().await?; + let b_ref3 = b.read_strongly_consistent().await?; + let c_ref3 = c.read_strongly_consistent().await?; + + assert!(!a_ref3.value); + assert!(b_ref3.value); + assert!(c_ref3.value); + assert_eq!(b_ref2.random, b_ref3.random); + + anyhow::Ok(()) + }) + .await + .unwrap() +} + +#[turbo_tasks::value(transparent, cell = "keyed")] +struct Set(FxHashSet); + +#[turbo_tasks::function(operation)] +async fn create_set(step: ResolvedVc) -> Result> { + let step = step.await?; + let step_value = step.get(); + + Ok(Vc::cell(match *step_value { + 1 => FxHashSet::from_iter(["a".to_string(), "b".to_string()]), + 2 => FxHashSet::from_iter(["e".to_string(), "a".to_string(), "b".to_string()]), + 3 => FxHashSet::from_iter(["c".to_string(), "b".to_string()]), + _ => FxHashSet::default(), + })) +} + +#[turbo_tasks::value] +struct HasValueResult { + value: bool, + random: u32, +} + +#[turbo_tasks::function(operation)] +async fn has_value(set: OperationVc, key: String) -> Result> { + let set = set.connect(); + let value = set.contains_key(&key).await?; + let random = rand::random::(); + Ok(HasValueResult { value, random }.cell()) +} diff --git a/turbopack/crates/turbo-tasks-macros/src/value_macro.rs b/turbopack/crates/turbo-tasks-macros/src/value_macro.rs index 34063874bc894..ee7ee9ea6c91c 100644 --- a/turbopack/crates/turbo-tasks-macros/src/value_macro.rs +++ b/turbopack/crates/turbo-tasks-macros/src/value_macro.rs @@ -15,6 +15,7 @@ use syn::{ use crate::{global_name::global_name, ident::get_value_type_ident}; enum CellMode { + KeyedCompare, Compare, New, } @@ -31,9 +32,13 @@ impl TryFrom for CellMode { fn try_from(lit: LitStr) -> Result { match lit.value().as_str() { + "keyed" => Ok(CellMode::KeyedCompare), "compare" => Ok(CellMode::Compare), "new" => Ok(CellMode::New), - _ => Err(Error::new_spanned(&lit, "expected \"new\" or \"compare\"")), + _ => Err(Error::new_spanned( + &lit, + "expected \"new\", \"keyed\", or \"compare\"", + )), } } } @@ -275,6 +280,9 @@ pub fn value(args: TokenStream, input: TokenStream) -> TokenStream { CellMode::Compare => quote! { turbo_tasks::VcCellCompareMode<#ident> }, + CellMode::KeyedCompare => quote! { + turbo_tasks::VcCellKeyedCompareMode<#ident> + }, }; let cell_struct = quote! { diff --git a/turbopack/crates/turbo-tasks-testing/Cargo.toml b/turbopack/crates/turbo-tasks-testing/Cargo.toml index 827cc45f62f49..ca841f422bb8f 100644 --- a/turbopack/crates/turbo-tasks-testing/Cargo.toml +++ b/turbopack/crates/turbo-tasks-testing/Cargo.toml @@ -17,5 +17,6 @@ workspace = true anyhow = { workspace = true } futures = { workspace = true } rustc-hash = { workspace = true } +smallvec = { workspace = true } tokio = { workspace = true } turbo-tasks = { workspace = true, features = ["non_operation_vc_strongly_consistent"] } diff --git a/turbopack/crates/turbo-tasks-testing/src/lib.rs b/turbopack/crates/turbo-tasks-testing/src/lib.rs index 2ce5d7b3791e9..2fbf2278e603e 100644 --- a/turbopack/crates/turbo-tasks-testing/src/lib.rs +++ b/turbopack/crates/turbo-tasks-testing/src/lib.rs @@ -14,6 +14,7 @@ use std::{ use anyhow::{Result, anyhow}; use futures::FutureExt; use rustc_hash::FxHashMap; +use smallvec::SmallVec; use tokio::sync::mpsc::Receiver; use turbo_tasks::{ CellId, ExecutionId, InvalidationReason, LocalTaskId, MagicAny, RawVc, ReadCellOptions, @@ -267,6 +268,7 @@ impl TurboTasksApi for VcStorage { index: CellId, _is_serializable_cell_content: bool, content: CellContent, + _updated_key_hashes: Option>, _verification_mode: VerificationMode, ) { let mut map = self.cells.lock().unwrap(); diff --git a/turbopack/crates/turbo-tasks/src/backend.rs b/turbopack/crates/turbo-tasks/src/backend.rs index 9a11ce9787da7..77b1808fb527d 100644 --- a/turbopack/crates/turbo-tasks/src/backend.rs +++ b/turbopack/crates/turbo-tasks/src/backend.rs @@ -16,6 +16,7 @@ use bincode::{ impl_borrow_decode, }; use rustc_hash::FxHasher; +use smallvec::SmallVec; use tracing::Span; use turbo_bincode::{ TurboBincodeDecode, TurboBincodeDecoder, TurboBincodeEncode, TurboBincodeEncoder, @@ -508,6 +509,7 @@ pub trait Backend: Sync + Send { index: CellId, is_serializable_cell_content: bool, content: CellContent, + updated_key_hashes: Option>, verification_mode: VerificationMode, turbo_tasks: &dyn TurboTasksBackendApi, ); diff --git a/turbopack/crates/turbo-tasks/src/keyed.rs b/turbopack/crates/turbo-tasks/src/keyed.rs new file mode 100644 index 0000000000000..3824f7d9254f0 --- /dev/null +++ b/turbopack/crates/turbo-tasks/src/keyed.rs @@ -0,0 +1,250 @@ +use std::{ + collections::{HashMap, HashSet}, + hash::{BuildHasher, Hash}, +}; + +use indexmap::{IndexMap, IndexSet}; +use smallvec::SmallVec; + +pub trait Keyed { + type Key; + type Value; + + fn different_keys<'l>(&'l self, other: &'l Self) -> SmallVec<[&'l Self::Key; 2]>; + fn get(&self, key: &Self::Key) -> Option<&Self::Value>; + fn contains_key(&self, key: &Self::Key) -> bool { + self.get(key).is_some() + } +} + +impl Keyed for HashMap { + type Key = K; + type Value = V; + + fn different_keys<'l>(&'l self, other: &'l Self) -> SmallVec<[&'l Self::Key; 2]> { + let mut different_keys = SmallVec::new(); + + for (key, value) in self.iter() { + if let Some(other_value) = other.get(key) { + if value != other_value { + different_keys.push(key); + } + } else { + different_keys.push(key); + } + } + + if other.len() != self.len() || !different_keys.is_empty() { + for key in other.keys() { + if !self.contains_key(key) { + different_keys.push(key); + } + } + } + + different_keys + } + + fn get(&self, key: &Self::Key) -> Option<&Self::Value> { + self.get(key) + } + + fn contains_key(&self, key: &Self::Key) -> bool { + self.contains_key(key) + } +} + +impl Keyed for IndexMap { + type Key = K; + type Value = V; + + fn different_keys<'l>(&'l self, other: &'l Self) -> SmallVec<[&'l Self::Key; 2]> { + let mut different_keys = SmallVec::new(); + + for (key, value) in self.iter() { + if let Some(other_value) = other.get(key) { + if value != other_value { + different_keys.push(key); + } + } else { + different_keys.push(key); + } + } + + if other.len() != self.len() || !different_keys.is_empty() { + for key in other.keys() { + if !self.contains_key(key) { + different_keys.push(key); + } + } + } + + different_keys + } + + fn get(&self, key: &Self::Key) -> Option<&Self::Value> { + self.get(key) + } + + fn contains_key(&self, key: &Self::Key) -> bool { + self.contains_key(key) + } +} + +impl Keyed for HashSet { + type Key = K; + type Value = (); + + fn different_keys<'l>(&'l self, other: &'l Self) -> SmallVec<[&'l Self::Key; 2]> { + let mut different_keys = SmallVec::new(); + + for key in self.iter() { + if !other.contains(key) { + different_keys.push(key); + } + } + + if other.len() != self.len() || !different_keys.is_empty() { + for key in other.iter() { + if !self.contains(key) { + different_keys.push(key); + } + } + } + + different_keys + } + + fn get(&self, key: &Self::Key) -> Option<&Self::Value> { + if self.contains(key) { Some(&()) } else { None } + } + + fn contains_key(&self, key: &Self::Key) -> bool { + self.contains(key) + } +} + +impl Keyed for IndexSet { + type Key = K; + type Value = (); + + fn different_keys<'l>(&'l self, other: &'l Self) -> SmallVec<[&'l Self::Key; 2]> { + let mut different_keys = SmallVec::new(); + + for key in self.iter() { + if !other.contains(key) { + different_keys.push(key); + } + } + + if other.len() != self.len() || !different_keys.is_empty() { + for key in other.iter() { + if !self.contains(key) { + different_keys.push(key); + } + } + } + + different_keys + } + + fn get(&self, key: &Self::Key) -> Option<&Self::Value> { + if self.contains(key) { Some(&()) } else { None } + } + + fn contains_key(&self, key: &Self::Key) -> bool { + self.contains(key) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn abc() -> [(&'static str, i32); 3] { + [("a", 1), ("b", 2), ("c", 3)] + } + + fn abd() -> [(&'static str, i32); 3] { + [("a", 1), ("b", 22), ("d", 4)] + } + + fn ab() -> [(&'static str, i32); 2] { + [("a", 1), ("b", 2)] + } + + fn abcde() -> [(&'static str, i32); 5] { + [("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e", 5)] + } + + fn edcba() -> [(&'static str, i32); 5] { + [("e", 5), ("d", 4), ("c", 3), ("b", 2), ("a", 1)] + } + + fn assert_diff(a: &T, b: &T, expected: &[&T::Key]) + where + T::Key: std::fmt::Debug + PartialEq, + { + let diffs = a.different_keys(b); + assert_eq!(diffs.len(), expected.len()); + for key in expected { + assert!(diffs.contains(key)); + } + } + + #[test] + fn test_hash_map_diff() { + let map1 = HashMap::from(abc()); + let map2 = HashMap::from(abd()); + assert_diff(&map1, &map2, &[&"b", &"c", &"d"]); + } + + #[test] + fn test_index_map_diff() { + let map1 = IndexMap::from(abc()); + let map2 = IndexMap::from(abd()); + assert_diff(&map1, &map2, &[&"b", &"c", &"d"]); + } + + #[test] + fn test_hash_map_equal() { + let map1 = HashMap::from(abcde()); + let map2 = HashMap::from(edcba()); + assert_diff(&map1, &map2, &[]); + } + + #[test] + fn test_index_map_equal() { + let map1 = IndexMap::from(abcde()); + let map2 = IndexMap::from(edcba()); + assert_diff(&map1, &map2, &[]); + } + + #[test] + fn test_hash_map_add_key() { + let map1 = HashMap::from(ab()); + let map2 = HashMap::from(abc()); + assert_diff(&map1, &map2, &[&"c"]); + } + + #[test] + fn test_index_map_add_key() { + let map1 = IndexMap::from(ab()); + let map2 = IndexMap::from(abc()); + assert_diff(&map1, &map2, &[&"c"]); + } + + #[test] + fn test_hash_map_remove_key() { + let map1 = HashMap::from(abc()); + let map2 = HashMap::from(ab()); + assert_diff(&map1, &map2, &[&"c"]); + } + + #[test] + fn test_index_map_remove_key() { + let map1 = IndexMap::from(abc()); + let map2 = IndexMap::from(ab()); + assert_diff(&map1, &map2, &[&"c"]); + } +} diff --git a/turbopack/crates/turbo-tasks/src/keyed_read_ref.rs b/turbopack/crates/turbo-tasks/src/keyed_read_ref.rs new file mode 100644 index 0000000000000..e23800cefb387 --- /dev/null +++ b/turbopack/crates/turbo-tasks/src/keyed_read_ref.rs @@ -0,0 +1,150 @@ +use std::fmt::{Debug, Display}; + +use serde::Serialize; + +use crate::{ + debug::{ValueDebugFormat, ValueDebugFormatString}, + keyed::Keyed, + trace::{TraceRawVcs, TraceRawVcsContext}, +}; + +pub struct MappedReadRef { + value: *const T, + arc: triomphe::Arc, +} + +impl MappedReadRef { + /// # Safety + /// The caller must ensure that the `arc` keeps the value pointed to by `value` alive. + pub unsafe fn new(arc: triomphe::Arc, value: *const T) -> Self { + Self { value, arc } + } +} + +impl MappedReadRef { + pub fn ptr_eq(&self, other: &Self) -> bool { + std::ptr::eq(self.value, other.value) + } + + pub fn ptr(&self) -> *const T { + self.value + } +} + +impl Clone for MappedReadRef { + fn clone(&self) -> Self { + Self { + value: self.value, + arc: self.arc.clone(), + } + } +} + +impl std::ops::Deref for MappedReadRef { + type Target = T; + + fn deref(&self) -> &Self::Target { + unsafe { &*self.value } + } +} + +impl Debug for MappedReadRef +where + T: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + (**self).fmt(f) + } +} + +impl Display for MappedReadRef +where + T: Display, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&**self, f) + } +} + +impl PartialEq for MappedReadRef +where + T: PartialEq, +{ + fn eq(&self, other: &Self) -> bool { + **self == **other + } +} + +impl Eq for MappedReadRef where T: Eq {} + +impl PartialOrd for MappedReadRef +where + T: PartialOrd, +{ + fn partial_cmp(&self, other: &Self) -> Option { + (**self).partial_cmp(&**other) + } +} + +impl Ord for MappedReadRef +where + T: Ord, +{ + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + (**self).cmp(&**other) + } +} + +impl std::hash::Hash for MappedReadRef +where + T: std::hash::Hash, +{ + fn hash(&self, state: &mut H) { + (**self).hash(state); + } +} + +impl ValueDebugFormat for MappedReadRef +where + T: ValueDebugFormat, +{ + fn value_debug_format(&self, depth: usize) -> ValueDebugFormatString<'_> { + let value = &**self; + value.value_debug_format(depth) + } +} + +impl TraceRawVcs for MappedReadRef +where + T: TraceRawVcs, +{ + fn trace_raw_vcs(&self, trace_context: &mut TraceRawVcsContext) { + (**self).trace_raw_vcs(trace_context); + } +} + +impl> IntoIterator for &MappedReadRef +where + T: Keyed, + for<'b> &'b T: IntoIterator, +{ + type Item = I; + + type IntoIter = J; + + fn into_iter(self) -> Self::IntoIter { + (&**self).into_iter() + } +} + +impl Serialize for MappedReadRef +where + T: Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + (**self).serialize(serializer) + } +} diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index 6d00a5a75f267..92265a9ff5696 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -54,6 +54,8 @@ mod id_factory; mod invalidation; mod join_iter_ext; mod key_value_pair; +pub mod keyed; +pub mod keyed_read_ref; #[doc(hidden)] pub mod macro_helpers; mod magic_any; @@ -109,11 +111,11 @@ pub use crate::{ key_value_pair::KeyValuePair, magic_any::MagicAny, manager::{ - CurrentCellRef, ReadConsistency, ReadTracking, TaskPersistence, TurboTasks, TurboTasksApi, - TurboTasksBackendApi, TurboTasksCallApi, Unused, UpdateInfo, dynamic_call, emit, - get_serialization_invalidator, mark_finished, mark_root, mark_session_dependent, - prevent_gc, run, run_once, run_once_with_reason, trait_call, turbo_tasks, - turbo_tasks_scope, turbo_tasks_weak, with_turbo_tasks, + CurrentCellRef, ReadCellTracking, ReadConsistency, ReadTracking, TaskPersistence, + TurboTasks, TurboTasksApi, TurboTasksBackendApi, TurboTasksCallApi, Unused, UpdateInfo, + dynamic_call, emit, get_serialization_invalidator, mark_finished, mark_root, + mark_session_dependent, prevent_gc, run, run_once, run_once_with_reason, trait_call, + turbo_tasks, turbo_tasks_scope, turbo_tasks_weak, with_turbo_tasks, }, output::OutputContent, raw_vc::{CellId, RawVc, ReadRawVcFuture, ResolveTypeError}, @@ -132,9 +134,9 @@ pub use crate::{ value_type::{TraitMethod, TraitType, ValueType}, vc::{ Dynamic, NonLocalValue, OperationValue, OperationVc, OptionVcExt, ReadVcFuture, ResolvedVc, - Upcast, UpcastStrict, ValueDefault, Vc, VcCast, VcCellCompareMode, VcCellNewMode, - VcDefaultRead, VcRead, VcTransparentRead, VcValueTrait, VcValueTraitCast, VcValueType, - VcValueTypeCast, + Upcast, UpcastStrict, ValueDefault, Vc, VcCast, VcCellCompareMode, VcCellKeyedCompareMode, + VcCellNewMode, VcDefaultRead, VcRead, VcTransparentRead, VcValueTrait, VcValueTraitCast, + VcValueType, VcValueTypeCast, }, }; diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index f0b7756d7832c..3d1b2c4a3450a 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -1,7 +1,7 @@ use std::{ fmt::Display, future::Future, - hash::BuildHasherDefault, + hash::{BuildHasher, BuildHasherDefault}, mem::take, pin::Pin, sync::{ @@ -14,8 +14,9 @@ use std::{ use anyhow::{Result, anyhow}; use auto_hash_map::AutoMap; use bincode::{Decode, Encode}; -use rustc_hash::FxHasher; +use rustc_hash::{FxBuildHasher, FxHasher}; use serde::{Deserialize, Serialize}; +use smallvec::SmallVec; use tokio::{select, sync::mpsc::Receiver, task_local}; use tokio_util::task::TaskTracker; use tracing::{Instrument, instrument}; @@ -32,6 +33,7 @@ use crate::{ event::{Event, EventListener}, id::{ExecutionId, LocalTaskId, TRANSIENT_TASK_BIT, TraitTypeId}, id_factory::IdFactoryWithReuse, + keyed::Keyed, macro_helpers::NativeFunction, magic_any::MagicAny, message_queue::{CompilationEvent, CompilationEventQueue}, @@ -162,6 +164,7 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send { index: CellId, is_serializable_cell_content: bool, content: CellContent, + updated_key_hashes: Option>, verification_mode: VerificationMode, ); fn mark_own_task_as_finished(&self, task: TaskId); @@ -305,6 +308,60 @@ pub enum ReadConsistency { Strong, } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ReadCellTracking { + /// Reads are tracked as dependencies of the current task. + Tracked { + /// The key used for the dependency + key: Option, + }, + /// The read is only tracked when there is an error, otherwise it is untracked. + /// + /// INVALIDATION: Be careful with this, it will not track dependencies, so + /// using it could break cache invalidation. + TrackOnlyError, + /// The read is not tracked as a dependency of the current task. + /// + /// INVALIDATION: Be careful with this, it will not track dependencies, so + /// using it could break cache invalidation. + Untracked, +} + +impl ReadCellTracking { + pub fn should_track(&self, is_err: bool) -> bool { + match self { + ReadCellTracking::Tracked { .. } => true, + ReadCellTracking::TrackOnlyError => is_err, + ReadCellTracking::Untracked => false, + } + } + + pub fn key(&self) -> Option { + match self { + ReadCellTracking::Tracked { key } => *key, + ReadCellTracking::TrackOnlyError => None, + ReadCellTracking::Untracked => None, + } + } +} + +impl Default for ReadCellTracking { + fn default() -> Self { + ReadCellTracking::Tracked { key: None } + } +} + +impl Display for ReadCellTracking { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ReadCellTracking::Tracked { key: None } => write!(f, "tracked"), + ReadCellTracking::Tracked { key: Some(key) } => write!(f, "tracked with key {key}"), + ReadCellTracking::TrackOnlyError => write!(f, "track only error"), + ReadCellTracking::Untracked => write!(f, "untracked"), + } + } +} + #[derive(Clone, Copy, Debug, Eq, PartialEq, Default)] pub enum ReadTracking { /// Reads are tracked as dependencies of the current task. @@ -1343,6 +1400,7 @@ impl TurboTasksApi for TurboTasks { index: CellId, is_serializable_cell_content: bool, content: CellContent, + updated_key_hashes: Option>, verification_mode: VerificationMode, ) { self.backend.update_task_cell( @@ -1350,6 +1408,7 @@ impl TurboTasksApi for TurboTasks { index, is_serializable_cell_content, content, + updated_key_hashes, verification_mode, self, ); @@ -1728,28 +1787,36 @@ pub struct CurrentCellRef { } type VcReadRepr = <::Read as VcRead>::Repr; +type VcReadTarget = <::Read as VcRead>::Target; impl CurrentCellRef { /// Updates the cell if the given `functor` returns a value. - pub fn conditional_update(&self, functor: impl FnOnce(Option<&T>) -> Option) - where + fn conditional_update( + &self, + functor: impl FnOnce(Option<&T>) -> Option<(T, Option>)>, + ) where T: VcValueType, { self.conditional_update_with_shared_reference(|old_shared_reference| { let old_ref = old_shared_reference .and_then(|sr| sr.0.downcast_ref::>()) .map(|content| >::repr_to_value_ref(content)); - let new_value = functor(old_ref)?; - Some(SharedReference::new(triomphe::Arc::new( - >::value_to_repr(new_value), - ))) + let (new_value, updated_key_hashes) = functor(old_ref)?; + Some(( + SharedReference::new(triomphe::Arc::new(>::value_to_repr( + new_value, + ))), + updated_key_hashes, + )) }) } /// Updates the cell if the given `functor` returns a `SharedReference`. - pub fn conditional_update_with_shared_reference( + fn conditional_update_with_shared_reference( &self, - functor: impl FnOnce(Option<&SharedReference>) -> Option, + functor: impl FnOnce( + Option<&SharedReference>, + ) -> Option<(SharedReference, Option>)>, ) { let tt = turbo_tasks(); let cell_content = tt @@ -1758,19 +1825,20 @@ impl CurrentCellRef { self.index, ReadCellOptions { // INVALIDATION: Reading our own cell must be untracked - tracking: ReadTracking::Untracked, + tracking: ReadCellTracking::Untracked, is_serializable_cell_content: self.is_serializable_cell_content, final_read_hint: false, }, ) .ok(); let update = functor(cell_content.as_ref().and_then(|cc| cc.1.0.as_ref())); - if let Some(update) = update { + if let Some((update, updated_key_hashes)) = update { tt.update_own_task_cell( self.current_task, self.index, self.is_serializable_cell_content, CellContent(Some(update)), + updated_key_hashes, VerificationMode::EqualityCheck, ) } @@ -1819,7 +1887,7 @@ impl CurrentCellRef { { return None; } - Some(new_value) + Some((new_value, None)) }); } @@ -1835,21 +1903,72 @@ impl CurrentCellRef { where T: VcValueType + PartialEq, { - fn extract_sr_value(sr: &SharedReference) -> &T { - >::repr_to_value_ref( - sr.0.downcast_ref::>() - .expect("cannot update SharedReference of different type"), - ) - } self.conditional_update_with_shared_reference(|old_sr| { if let Some(old_sr) = old_sr { - let old_value: &T = extract_sr_value(old_sr); - let new_value = extract_sr_value(&new_shared_reference); + let old_value = extract_sr_value::(old_sr)?; + let new_value = extract_sr_value::(&new_shared_reference)?; if old_value == new_value { return None; } } - Some(new_shared_reference) + Some((new_shared_reference, None)) + }); + } + + /// See [`Self::compare_and_update`], but selectively update individual keys. + pub fn keyed_compare_and_update(&self, new_value: T) + where + T: PartialEq + VcValueType, + VcReadTarget: Keyed, + as Keyed>::Key: std::hash::Hash, + { + self.conditional_update(|old_value| { + let Some(old_value) = old_value else { + return Some((new_value, None)); + }; + let old_value = ::Read::value_to_target_ref(old_value); + let new_value_ref = ::Read::value_to_target_ref(&new_value); + let updated_keys = old_value.different_keys(new_value_ref); + if updated_keys.is_empty() { + return None; + } + // Duplicates are very unlikely, but ok since the backend is deduplicating them + let updated_key_hashes = updated_keys + .into_iter() + .map(|key| FxBuildHasher.hash_one(key)) + .collect(); + Some((new_value, Some(updated_key_hashes))) + }); + } + + /// See [`Self::compare_and_update_with_shared_reference`], but selectively update individual + /// keys. + pub fn keyed_compare_and_update_with_shared_reference( + &self, + new_shared_reference: SharedReference, + ) where + T: VcValueType + PartialEq, + VcReadTarget: Keyed, + as Keyed>::Key: std::hash::Hash, + { + self.conditional_update_with_shared_reference(|old_sr| { + let Some(old_sr) = old_sr else { + return Some((new_shared_reference, None)); + }; + let old_value = extract_sr_value::(old_sr)?; + let old_value = ::Read::value_to_target_ref(old_value); + let new_value = extract_sr_value::(&new_shared_reference)?; + let new_value = ::Read::value_to_target_ref(new_value); + let updated_keys = old_value.different_keys(new_value); + if updated_keys.is_empty() { + return None; + } + // Duplicates are very unlikely, but ok since the backend is deduplicating them + let updated_key_hashes = updated_keys + .into_iter() + .map(|key| FxBuildHasher.hash_one(key)) + .collect(); + Some((new_shared_reference, Some(updated_key_hashes))) }); } @@ -1866,6 +1985,7 @@ impl CurrentCellRef { CellContent(Some(SharedReference::new(triomphe::Arc::new( >::value_to_repr(new_value), )))), + None, verification_mode, ) } @@ -1891,7 +2011,7 @@ impl CurrentCellRef { self.index, ReadCellOptions { // INVALIDATION: Reading our own cell must be untracked - tracking: ReadTracking::Untracked, + tracking: ReadCellTracking::Untracked, is_serializable_cell_content: self.is_serializable_cell_content, final_read_hint: false, }, @@ -1912,6 +2032,7 @@ impl CurrentCellRef { self.index, self.is_serializable_cell_content, CellContent(Some(shared_ref)), + None, verification_mode, ) } @@ -1924,6 +2045,11 @@ impl From for RawVc { } } +fn extract_sr_value(sr: &SharedReference) -> Option<&T> { + sr.0.downcast_ref::>() + .map(>::repr_to_value_ref) +} + pub fn find_cell_by_type() -> CurrentCellRef { find_cell_by_id(T::get_value_type_id(), T::has_serialization()) } diff --git a/turbopack/crates/turbo-tasks/src/raw_vc.rs b/turbopack/crates/turbo-tasks/src/raw_vc.rs index 2b8553eb135f3..d7a323bf97ec1 100644 --- a/turbopack/crates/turbo-tasks/src/raw_vc.rs +++ b/turbopack/crates/turbo-tasks/src/raw_vc.rs @@ -18,7 +18,8 @@ use crate::{ event::EventListener, id::{ExecutionId, LocalTaskId}, manager::{ - ReadTracking, read_local_output, read_task_cell, read_task_output, with_turbo_tasks, + ReadCellTracking, ReadTracking, read_local_output, read_task_cell, read_task_output, + with_turbo_tasks, }, registry::{self, get_value_type}, turbo_tasks, @@ -212,8 +213,7 @@ impl RawVc { index, ReadCellOptions { is_serializable_cell_content: value_type.bincode.is_some(), - final_read_hint: false, - tracking: ReadTracking::default(), + ..Default::default() }, ) .await @@ -236,8 +236,8 @@ impl RawVc { /// See [`crate::Vc::resolve`]. pub(crate) async fn resolve(self) -> Result { self.resolve_inner(ReadOutputOptions { - tracking: ReadTracking::default(), consistency: ReadConsistency::Eventual, + ..Default::default() }) .await } @@ -245,8 +245,8 @@ impl RawVc { /// See [`crate::Vc::resolve_strongly_consistent`]. pub(crate) async fn resolve_strongly_consistent(self) -> Result { self.resolve_inner(ReadOutputOptions { - tracking: ReadTracking::default(), consistency: ReadConsistency::Strong, + ..Default::default() }) .await } @@ -398,11 +398,19 @@ impl ReadRawVcFuture { } } + /// Make reads strongly consistent. pub fn strongly_consistent(mut self) -> Self { self.read_output_options.consistency = ReadConsistency::Strong; self } + /// Track the value as a dependency with an key. + pub fn track_with_key(mut self, key: u64) -> Self { + self.read_output_options.tracking = ReadTracking::Tracked; + self.read_cell_options.tracking = ReadCellTracking::Tracked { key: Some(key) }; + self + } + /// This will not track the value as dependency, but will still track the error as dependency, /// if there is an error. /// @@ -410,7 +418,7 @@ impl ReadRawVcFuture { /// using it could break cache invalidation. pub fn untracked(mut self) -> Self { self.read_output_options.tracking = ReadTracking::TrackOnlyError; - self.read_cell_options.tracking = ReadTracking::TrackOnlyError; + self.read_cell_options.tracking = ReadCellTracking::TrackOnlyError; self } @@ -421,10 +429,11 @@ impl ReadRawVcFuture { /// using it could break cache invalidation. pub fn untracked_including_errors(mut self) -> Self { self.read_output_options.tracking = ReadTracking::Untracked; - self.read_cell_options.tracking = ReadTracking::Untracked; + self.read_cell_options.tracking = ReadCellTracking::Untracked; self } + /// Hint that this is the final read of the cell content. pub fn final_read_hint(mut self) -> Self { self.read_cell_options.final_read_hint = true; self diff --git a/turbopack/crates/turbo-tasks/src/read_options.rs b/turbopack/crates/turbo-tasks/src/read_options.rs index 49364688c9569..33a1e14115ffd 100644 --- a/turbopack/crates/turbo-tasks/src/read_options.rs +++ b/turbopack/crates/turbo-tasks/src/read_options.rs @@ -1,8 +1,8 @@ -use crate::{ReadConsistency, ReadTracking}; +use crate::{ReadConsistency, ReadTracking, manager::ReadCellTracking}; #[derive(Clone, Copy, Debug, Default)] pub struct ReadCellOptions { - pub tracking: ReadTracking, + pub tracking: ReadCellTracking, pub is_serializable_cell_content: bool, pub final_read_hint: bool, } diff --git a/turbopack/crates/turbo-tasks/src/read_ref.rs b/turbopack/crates/turbo-tasks/src/read_ref.rs index 0478fd6a10623..09f42c0ea0e7b 100644 --- a/turbopack/crates/turbo-tasks/src/read_ref.rs +++ b/turbopack/crates/turbo-tasks/src/read_ref.rs @@ -32,7 +32,7 @@ type VcReadTarget = <::Read as VcRead>::Target; /// certain point in time. /// /// Internally it stores a reference counted reference to a value on the heap. -pub struct ReadRef(triomphe::Arc); +pub struct ReadRef(pub(crate) triomphe::Arc); impl Clone for ReadRef { fn clone(&self) -> Self { @@ -255,6 +255,11 @@ impl ReadRef { &this.0 } + /// Returns the inner `Arc`. + pub fn into_raw_arc(self) -> triomphe::Arc { + self.0 + } + pub fn ptr_eq(&self, other: &ReadRef) -> bool { triomphe::Arc::ptr_eq(&self.0, &other.0) } diff --git a/turbopack/crates/turbo-tasks/src/vc/cell_mode.rs b/turbopack/crates/turbo-tasks/src/vc/cell_mode.rs index cbcc030861044..c0c947c385bdf 100644 --- a/turbopack/crates/turbo-tasks/src/vc/cell_mode.rs +++ b/turbopack/crates/turbo-tasks/src/vc/cell_mode.rs @@ -2,7 +2,7 @@ use std::{any::type_name, marker::PhantomData}; use super::{read::VcRead, traits::VcValueType}; use crate::{ - RawVc, Vc, backend::VerificationMode, manager::find_cell_by_type, + RawVc, Vc, backend::VerificationMode, keyed::Keyed, manager::find_cell_by_type, task::shared_reference::TypedSharedReference, }; @@ -85,6 +85,35 @@ where } } +/// Mode that compares the cell's content with the new value key by key and only updates +/// individual keys if the new value is different. +pub struct VcCellKeyedCompareMode { + _phantom: PhantomData, +} + +impl VcCellMode for VcCellKeyedCompareMode +where + T: VcValueType + PartialEq, + VcReadTarget: Keyed, + as Keyed>::Key: std::hash::Hash, +{ + fn cell(inner: VcReadTarget) -> Vc { + let cell = find_cell_by_type::(); + cell.keyed_compare_and_update(>::target_to_value(inner)); + Vc { + node: cell.into(), + _t: PhantomData, + } + } + + fn raw_cell(content: TypedSharedReference) -> RawVc { + debug_assert_repr::(&content); + let cell = find_cell_by_type::(); + cell.keyed_compare_and_update_with_shared_reference::(content.reference); + cell.into() + } +} + fn debug_assert_repr(content: &TypedSharedReference) { debug_assert!( (*content.reference.0).is::>(), diff --git a/turbopack/crates/turbo-tasks/src/vc/mod.rs b/turbopack/crates/turbo-tasks/src/vc/mod.rs index 2fd35bdefdbb0..e567db9f6786c 100644 --- a/turbopack/crates/turbo-tasks/src/vc/mod.rs +++ b/turbopack/crates/turbo-tasks/src/vc/mod.rs @@ -23,7 +23,7 @@ use shrink_to_fit::ShrinkToFit; pub use self::{ cast::{VcCast, VcValueTraitCast, VcValueTypeCast}, - cell_mode::{VcCellCompareMode, VcCellMode, VcCellNewMode}, + cell_mode::{VcCellCompareMode, VcCellKeyedCompareMode, VcCellMode, VcCellNewMode}, default::ValueDefault, local::NonLocalValue, operation::{OperationValue, OperationVc}, @@ -34,8 +34,10 @@ pub use self::{ use crate::{ CellId, RawVc, ResolveTypeError, debug::{ValueDebug, ValueDebugFormat, ValueDebugFormatString}, + keyed::Keyed, registry, trace::{TraceRawVcs, TraceRawVcsContext}, + vc::read::{ReadContainsKeyedVcFuture, ReadKeyedVcFuture}, }; type VcReadTarget = <::Read as VcRead>::Target; @@ -670,6 +672,30 @@ where } } +impl Vc +where + T: VcValueType, + VcReadTarget: Keyed, + as Keyed>::Key: Hash, +{ + /// Read the value and selects a keyed value from it. Only depends on the used key instead of + /// the full value. + pub fn get<'l>(self, key: &'l as Keyed>::Key) -> ReadKeyedVcFuture<'l, T> { + let future: ReadVcFuture = self.node.into_read(T::has_serialization()).into(); + future.get(key) + } + + /// Read the value and checks if it contains the given key. Only depends on the used key instead + /// of the full value. + pub fn contains_key<'l>( + self, + key: &'l as Keyed>::Key, + ) -> ReadContainsKeyedVcFuture<'l, T> { + let future: ReadVcFuture = self.node.into_read(T::has_serialization()).into(); + future.contains_key(key) + } +} + impl Unpin for Vc where T: ?Sized {} impl Default for Vc diff --git a/turbopack/crates/turbo-tasks/src/vc/read.rs b/turbopack/crates/turbo-tasks/src/vc/read.rs index c0a3f7d3528c3..74edb975f98c1 100644 --- a/turbopack/crates/turbo-tasks/src/vc/read.rs +++ b/turbopack/crates/turbo-tasks/src/vc/read.rs @@ -1,10 +1,22 @@ -use std::{any::Any, marker::PhantomData, mem::ManuallyDrop, pin::Pin, task::Poll}; +use std::{ + any::Any, + hash::{BuildHasher, Hash}, + marker::PhantomData, + mem::ManuallyDrop, + pin::Pin, + task::Poll, +}; use anyhow::Result; use futures::Future; +use pin_project_lite::pin_project; +use rustc_hash::FxBuildHasher; use super::traits::VcValueType; -use crate::{ReadRawVcFuture, ReadRef, VcCast, VcValueTrait, VcValueTraitCast, VcValueTypeCast}; +use crate::{ + ReadRawVcFuture, ReadRef, VcCast, VcValueTrait, VcValueTraitCast, VcValueTypeCast, + keyed::Keyed, keyed_read_ref::MappedReadRef, +}; type VcReadTarget = <::Read as VcRead>::Target; @@ -204,16 +216,21 @@ where T: ?Sized, Cast: VcCast, { + /// Do not use this: Use [`OperationVc::read_strongly_consistent`] instead. pub fn strongly_consistent(mut self) -> Self { self.raw = self.raw.strongly_consistent(); self } + /// Returns a untracked read of the value. This will not invalidate the current function when + /// the read value changed. pub fn untracked(mut self) -> Self { self.raw = self.raw.untracked(); self } + /// Read the value with the hint that this is the final read of the value. This might drop the + /// cell content. Future reads might need to recompute the value. pub fn final_read_hint(mut self) -> Self { self.raw = self.raw.final_read_hint(); self @@ -225,11 +242,39 @@ where T: VcValueType, VcReadTarget: Clone, { + /// Read the value and returns a owned version of it. It might clone the value. pub fn owned(self) -> ReadOwnedVcFuture { ReadOwnedVcFuture { future: self } } } +impl ReadVcFuture> +where + T: VcValueType, + VcReadTarget: Keyed, + as Keyed>::Key: Hash, +{ + /// Read the value and selects a keyed value from it. Only depends on the used key instead of + /// the full value. + pub fn get<'l>(mut self, key: &'l as Keyed>::Key) -> ReadKeyedVcFuture<'l, T> { + self.raw = self.raw.track_with_key(FxBuildHasher.hash_one(key)); + ReadKeyedVcFuture { future: self, key } + } + + /// Read the value and checks if it contains the given key. Only depends on the used key instead + /// of the full value. + /// + /// Note: This is also invalidated when the value of the key changes, not only when the presence + /// of the key changes. + pub fn contains_key<'l>( + mut self, + key: &'l as Keyed>::Key, + ) -> ReadContainsKeyedVcFuture<'l, T> { + self.raw = self.raw.track_with_key(FxBuildHasher.hash_one(key)); + ReadContainsKeyedVcFuture { future: self, key } + } +} + impl From for ReadVcFuture> where T: VcValueType, @@ -295,3 +340,74 @@ where } } } + +pin_project! { + pub struct ReadKeyedVcFuture<'l, T> + where + T: VcValueType, + VcReadTarget: Keyed, + { + #[pin] + future: ReadVcFuture>, + key: &'l as Keyed>::Key, + } +} + +impl<'l, T> Future for ReadKeyedVcFuture<'l, T> +where + T: VcValueType, + VcReadTarget: Keyed, +{ + type Output = Result as Keyed>::Value>>>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + // Safety: We never move the contents of `self` + let this = self.project(); + match this.future.poll(cx) { + Poll::Ready(Ok(result)) => { + let mapped_read_ref = if let Some(value) = (*result).get(this.key) { + let ptr = value as *const _; + Some(unsafe { MappedReadRef::new(result.into_raw_arc(), ptr) }) + } else { + None + }; + Poll::Ready(Ok(mapped_read_ref)) + } + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Pending => Poll::Pending, + } + } +} + +pin_project! { + pub struct ReadContainsKeyedVcFuture<'l, T> + where + T: VcValueType, + VcReadTarget: Keyed, + { + #[pin] + future: ReadVcFuture>, + key: &'l as Keyed>::Key, + } +} + +impl<'l, T> Future for ReadContainsKeyedVcFuture<'l, T> +where + T: VcValueType, + VcReadTarget: Keyed, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + // Safety: We never move the contents of `self` + let this = self.project(); + match this.future.poll(cx) { + Poll::Ready(Ok(result)) => { + let result = (*result).contains_key(this.key); + Poll::Ready(Ok(result)) + } + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Pending => Poll::Pending, + } + } +}