diff --git a/src/pipelines/exec/mod.rs b/src/pipelines/exec/mod.rs
index 7f1f5d7..3c48c42 100644
--- a/src/pipelines/exec/mod.rs
+++ b/src/pipelines/exec/mod.rs
@@ -13,416 +13,234 @@
 //! [`BlockBuilder::apply_pre_execution_changes`]: crate::reth::evm::execute::BlockBuilder::apply_pre_execution_changes

 use {
-  super::{StepInstance, service::ServiceContext},
+  super::{StepInstance, metrics},
   crate::{prelude::*, reth},
-  core::{
-    pin::Pin,
-    task::{Context, Poll},
-  },
+  core::pin::Pin,
   futures::FutureExt,
   navi::{StepNavigator, StepPath},
   reth::payload::builder::{PayloadBuilderError, PayloadId},
   scope::RootScope,
-  std::{sync::Arc, time::Instant},
+  std::sync::Arc,
   tracing::{debug, trace},
 };

 pub(super) mod navi;
 pub(super) mod scope;

-type PipelineOutput<P> =
+pub(super) type PipelineOutput<P> =
   Result<types::BuiltPayload<P>, Arc<PayloadBuilderError>>;

+pub(super) type PipelineFuture<P> =
+  Pin<Box<dyn Future<Output = PipelineOutput<P>> + Send>>;
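A note on the two new aliases: `PipelineOutput<P>` is the built payload or a shared error, and `PipelineFuture<P>` is that same output behind a boxed, type-erased future, which is what lets an `async fn` be stored in a plain struct field. A minimal standalone sketch of the pattern, with stand-in types rather than the crate's own:

```rust
use futures::{FutureExt, future::BoxFuture};

// stand-ins for PipelineOutput<P> / PipelineFuture<P>
type Output = Result<u64, String>;
// BoxFuture<'static, T> is Pin<Box<dyn Future<Output = T> + Send + 'static>>
type PipelineFuture = BoxFuture<'static, Output>;

async fn execute() -> Output {
  Ok(42)
}

// an async fn erases into the boxed alias via FutureExt::boxed
fn into_future() -> PipelineFuture {
  execute().boxed()
}

fn main() {
  assert_eq!(futures::executor::block_on(into_future()), Ok(42));
}
```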
+
 /// This type is responsible for executing a single run of a pipeline.
 ///
-/// The execution is driven by the future poll that it implements. Each call to
-/// `poll` will run one step of the pipeline at a time, or parts of a step if
-/// the step is async and needs many polls before it completes. The executor
-/// future will resolve when the whole pipeline has been executed, or when an
-/// error occurs.
-pub(super) struct PipelineExecutor<P, Provider>
+/// The execution is driven by an async function. Call `into_future()` to get
+/// the future that executes the pipeline.
+pub(super) struct PipelineExecutor<P>
 where
   P: Platform,
-  Provider: traits::ProviderBounds<P>,
 {
-  /// The current state of the executor state machine.
-  cursor: Cursor<P>,
-
-  // The pipeline that is being executed.
-  pipeline: Arc<Pipeline<P>>,
-
-  // The block context that is being built.
-  block: BlockContext<P>,
-
-  // The reth payload builder service context that is running this payload job.
-  service: Arc<ServiceContext<P, Provider>>,
-
-  /// Execution scopes. This root scope represents the top-level pipeline that
-  /// may contain nested scopes for each nested pipeline.
-  scope: Arc<RootScope<P>>,
+  payload_id: PayloadId,
+  future: PipelineFuture<P>,
 }

-impl<P: Platform, Provider: traits::ProviderBounds<P>>
-  PipelineExecutor<P, Provider>
-{
-  /// Begins the execution of a pipeline for a new block/payload job.
+impl<P: Platform> PipelineExecutor<P> {
   pub(super) fn run(
     pipeline: Arc<Pipeline<P>>,
     block: BlockContext<P>,
-    service: Arc<ServiceContext<P, Provider>>,
+    metrics: Arc<metrics::Payload>,
   ) -> Self {
-    // Emit a system event for this new payload job and record initial metrics.
     pipeline.events.publish(PayloadJobStarted(block.clone()));
-    service.metrics().jobs_started.increment(1);
-    service
-      .metrics()
-      .record_payload_job_attributes::<P>(block.attributes());
+    metrics.jobs_started.increment(1);
+    metrics.record_payload_job_attributes::<P>(block.attributes());

-    // Create the initial payload checkpoint, this will implicitly capture the
-    // time we started executing the pipeline for this payload job.
-    let checkpoint = block.start();
+    let payload_id = block.payload_id();
+    let future = Self::execute(pipeline, block, metrics).boxed();

-    // initialize pipeline scopes
-    let root = Arc::new(RootScope::new(&pipeline, &checkpoint));
-
-    // Initially set the execution cursor to initializing state, that will call
-    // all `before_job` methods of the steps in the pipeline.
-    Self {
-      cursor: Cursor::<P>::Initializing({
-        let block = block.clone();
-        let pipeline = Arc::clone(&pipeline);
-        let scope = Arc::clone(&root);
-
-        async move {
-          for step in pipeline.iter_steps() {
-            let navi = step.navigator(&pipeline).expect(
-              "Invalid step path. This is a bug in the pipeline executor \
-               implementation.",
-            );
-            let limits = scope.limits_of(&step).expect("invalid step path");
-            let ctx = StepContext::new(&block, &navi, limits, None);
-            navi.instance().before_job(ctx).await?;
-          }
-
-          Ok(checkpoint)
-        }
-        .boxed()
-      }),
-      pipeline,
-      block,
-      service,
-      scope: root,
-    }
+    Self { payload_id, future }
   }

-  /// Returns the payload id for which we are building a payload.
   pub(super) fn payload_id(&self) -> PayloadId {
-    self.block.payload_id()
+    self.payload_id
   }
-}

-/// private implementation details for the `PipelineExecutor`.
-impl<P: Platform, Provider: traits::ProviderBounds<P>>
-  PipelineExecutor<P, Provider>
-{
-  /// This method creates a future that encapsulates the execution as an async
-  /// step. The created future will be held inside `Cursor::StepInProgress` and
-  /// polled until it resolves.
-  ///
-  /// It will prepare the step context and all the information a pipeline step
-  /// needs to execute, then create a future object that will be stored in the
-  /// cursor and polled whenever the executor is polled.
-  fn execute_step(
-    &self,
-    path: &StepPath,
-    input: Checkpoint<P>,
-  ) -> Pin<Box<dyn Future<Output = ControlFlow<P>> + Send>> {
-    let limits = self.scope.limits_of(path).expect(
-      "Invalid step path. This is a bug in the pipeline executor \
-       implementation.",
-    );
-
-    let navi = path.navigator(&self.pipeline).expect(
-      "Invalid step path. This is a bug in the pipeline executor \
-       implementation.",
-    );
-
-    let entered_at = self.scope.entered_at(path);
-    let ctx = StepContext::new(&self.block, &navi, limits, entered_at);
-    let step = Arc::clone(navi.instance());
-    async move { step.step(input, ctx).await }.boxed()
+  pub(super) fn into_future(self) -> PipelineFuture<P> {
+    self.future
   }
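For orientation, a self-contained analogue of the new `run()`/`into_future()` split (the real `Pipeline`, `BlockContext` and metrics types are assumed and stubbed out here): construction is eager, so job-started side effects happen immediately, while the pipeline itself runs only when the returned future is awaited.

```rust
use futures::{FutureExt, future::BoxFuture};

struct Executor {
  id: u64,
  future: BoxFuture<'static, Result<u64, String>>,
}

impl Executor {
  // eager construction: record metrics, capture the id, box the deferred work
  fn run(id: u64) -> Self {
    let future = async move { Ok(id * 2) }.boxed();
    Self { id, future }
  }

  // available before any execution happens, like payload_id()
  fn payload_id(&self) -> u64 {
    self.id
  }

  // hand the deferred work to the caller, like into_future()
  fn into_future(self) -> BoxFuture<'static, Result<u64, String>> {
    self.future
  }
}

#[tokio::main]
async fn main() {
  let exec = Executor::run(7);
  assert_eq!(exec.payload_id(), 7);
  assert_eq!(exec.into_future().await, Ok(14));
}
```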
-  /// This method handles the control flow of the pipeline execution.
-  ///
-  /// Once a step is executed it determines the next step to execute based on
-  /// the output of the step, the current cursor state and the pipeline
-  /// structure.
-  fn advance_cursor(
-    &self,
-    current_path: &StepPath,
-    output: ControlFlow<P>,
-  ) -> Cursor<P> {
-    // we need this type to determine the next step to execute
-    // based on the current step output.
-    let navigator = current_path.navigator(&self.pipeline).expect(
-      "Step path is unreachable. This is a bug in the pipeline executor \
-       implementation.",
-    );
-
-    // identify the next step to execute based on the output of the previously
-    // executed step.
-    let (step, input) = match output {
-      // If the step output is a failure, we terminate the execution of the
-      // whole pipeline and return the error as the final output on next future
-      // poll.
-      ControlFlow::Fail(payload_builder_error) => {
-        return Cursor::Finalizing(
-          self.finalize(Err(Arc::new(payload_builder_error))),
-        );
-      }
+  async fn execute(
+    pipeline: Arc<Pipeline<P>>,
+    block: BlockContext<P>,
+    metrics: Arc<metrics::Payload>,
+  ) -> PipelineOutput<P> {
+    let checkpoint = block.start();
+    let scope = Arc::new(RootScope::new(&pipeline, &checkpoint));

-      // not a failure, chose the next step based on the control flow output
-      ControlFlow::Break(checkpoint) => (navigator.next_break(), checkpoint),
-      ControlFlow::Ok(checkpoint) => (navigator.next_ok(), checkpoint),
-    };
+    let init_result =
+      Self::initialize(&pipeline, &block, &scope, checkpoint).await;

-    let Some(step) = step else {
-      // If there is no next step, we are done with the pipeline execution.
-      // We can finalize the pipeline and return the output as the final
-      // result of the pipeline run.
-      return Cursor::Finalizing(
-        self.finalize(input.build_payload().map_err(Arc::new)),
-      );
+    let output = match init_result {
+      Ok(checkpoint) => {
+        Self::run_steps(&pipeline, &block, &scope, checkpoint).await
+      }
+      Err(e) => Err(Arc::new(e)),
     };

-    // there is a next step to be executed, create a cursor that will
-    // start running the next step with the output of the current step
-    // as input on next executor future poll
+    Self::finalize(&pipeline, &block, &metrics, &scope, output).await
+  }

-    // enter the scope of the next step
-    self.scope.switch_context(step.path(), &input);
+  async fn initialize(
+    pipeline: &Arc<Pipeline<P>>,
+    block: &BlockContext<P>,
+    scope: &Arc<RootScope<P>>,
+    checkpoint: Checkpoint<P>,
+  ) -> Result<Checkpoint<P>, PayloadBuilderError> {
+    for step in pipeline.iter_steps() {
+      let navi = step
+        .navigator(pipeline)
+        .expect("Invalid step path in pipeline executor");
+      let limits = scope.limits_of(&step).expect("invalid step path");
+      let ctx = StepContext::new(block, &navi, limits, None);
+      navi.instance().before_job(ctx).await?;
+    }

-    // schedule execution on next future poll
-    Cursor::BeforeStep(step.into(), input)
+    trace!("{pipeline} initialized successfully");
+    Ok(checkpoint)
   }
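The hook sequence that the executor drives (`before_job` on every step, then `step` repeatedly, then `after_job` on every step) implies a step interface roughly like the sketch below. This shape is hypothetical, inferred only from the call sites in this diff; the crate's real trait is async and takes a `StepContext`:

```rust
use std::sync::Arc;

// hypothetical, inferred from the call sites; not the crate's definition
enum ControlFlow<C, E> {
  Ok(C),    // continue with the next step
  Break(C), // leave the current (sub)pipeline early
  Fail(E),  // abort the whole payload job
}

trait Step: Send + Sync {
  /// Runs once per payload job, before any step executes.
  fn before_job(&self) -> Result<(), String> {
    Ok(())
  }

  /// Runs while the pipeline advances; the checkpoint threads through.
  fn step(&self, checkpoint: u64) -> ControlFlow<u64, String>;

  /// Runs once at the end, observing the shared final output.
  fn after_job(&self, output: Arc<Result<u64, String>>) -> Result<(), String> {
    let _ = output;
    Ok(())
  }
}
```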
-  /// After pipeline steps are initialized, this method will identify the first
-  /// step to execute in the pipeline and prepare the cursor to run it.
-  fn first_step(&self, checkpoint: Checkpoint<P>) -> Cursor<P> {
-    let Some(navigator) = StepNavigator::entrypoint(&self.pipeline) else {
+  async fn run_steps(
+    pipeline: &Arc<Pipeline<P>>,
+    block: &BlockContext<P>,
+    scope: &Arc<RootScope<P>>,
+    mut checkpoint: Checkpoint<P>,
+  ) -> PipelineOutput<P> {
+    let Some(mut navigator) = StepNavigator::entrypoint(pipeline) else {
       debug!(
         "empty pipeline, building empty payload for attributes: {:?}",
-        self.block.attributes()
-      );
-
-      return Cursor::<P>::Finalizing(
-        self.finalize(self.block.start().build_payload().map_err(Arc::new)),
+        block.attributes()
       );
+      return block.start().build_payload().map_err(Arc::new);
     };

-    // enter the scope of the root pipeline
-    self.scope.enter(&checkpoint);
+    scope.enter(&checkpoint);

-    // Begin executing the first step of the pipeline in the next future poll
-    Cursor::BeforeStep(navigator.into(), checkpoint)
-  }
+    loop {
+      let path: StepPath = navigator.path().clone();
+      scope.switch_context(&path, &checkpoint);

-  /// This method will walk through the pipeline steps and invoke the
-  /// `after_job` method of each step in the pipeline with the final output.
-  fn finalize(
-    &self,
-    output: PipelineOutput<P>,
-  ) -> Pin<Box<dyn Future<Output = PipelineOutput<P>> + Send>> {
-    let output = Arc::new(output.map_err(|e| clone_payload_error_lossy(&e)));
-    let pipeline = Arc::clone(&self.pipeline);
-    let block = self.block.clone();
-    let pipeline = Arc::clone(&pipeline);
-    let scope = Arc::clone(&self.scope);
-
-    async move {
-      // invoke the `after_job` method of each step in the pipeline
-      // if any of them fails, we fail the pipeline execution, otherwise
-      // we return the output of the pipeline.
-      for step in pipeline.iter_steps() {
-        let navi = step.navigator(&pipeline).expect(
-          "Invalid step path. This is a bug in the pipeline executor \
-           implementation.",
-        );
-        let limits = scope.limits_of(&step).expect("invalid step path");
-        let ctx = StepContext::new(&block, &navi, limits, None);
-        navi.instance().after_job(ctx, output.clone()).await?;
-      }
+      let limits = scope.limits_of(&path).expect("invalid step path");
+      let entered_at = scope.entered_at(&path);
+      let ctx = StepContext::new(block, &navigator, limits, entered_at);
+      let step = Arc::clone(navigator.instance());

-      // leave the scope of the root pipeline (if entered, we never enter the
-      // root scope only in empty pipelines).
-      if scope.is_active() {
-        scope.leave();
-      }
+      trace!("{pipeline} will execute step {path}");
+      let output = step.step(checkpoint, ctx).await;
+      trace!("{pipeline} step {path:?} completed with output: {output:#?}");

-      Arc::into_inner(output)
-        .expect("unexpected > 1 strong reference count")
-        .map_err(Arc::new)
-    }
-    .boxed()
-  }
-}
+      let (next_navigator, next_checkpoint) = match output {
+        ControlFlow::Ok(cp) => (navigator.next_ok(), cp),
+        ControlFlow::Break(cp) => (navigator.next_break(), cp),
+        ControlFlow::Fail(e) => return Err(Arc::new(e)),
+      };

-impl<P, Provider> Future for PipelineExecutor<P, Provider>
-where
-  P: Platform,
-  Provider: traits::ProviderBounds<P>,
-{
-  type Output = PipelineOutput<P>;
-
-  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-    let executor = self.get_mut();
-
-    // The executor has not run any steps yet, it is invoking the `before_job`
-    // method of each step in the pipeline.
-    if let Cursor::Initializing(ref mut future) = executor.cursor {
-      if let Poll::Ready(output) = future.as_mut().poll_unpin(cx) {
-        match output {
-          Ok(checkpoint) => {
-            trace!("{} initialized successfully", executor.pipeline);
-            executor.cursor = executor.first_step(checkpoint);
-          }
-          Err(error) => {
-            trace!(
-              "{} initialization failed with error: {error:?}",
-              executor.pipeline
-            );
-            // If the initialization failed, we immediately finalize the
-            // pipeline with the error that occurred during initialization
-            // and not attempt to run any steps.
-            executor.cursor =
-              Cursor::Finalizing(executor.finalize(Err(error.into())));
-          }
+      checkpoint = next_checkpoint;
+
+      match next_navigator {
+        Some(next) => {
+          navigator = next;
+          // Cooperative yielding: the `run_steps()` loop can execute multiple
+          // pipeline steps in a tight sequence without yielding back to
+          // the tokio scheduler, especially when steps complete quickly
+          // (synchronously). This can cause executor starvation where other
+          // tasks are delayed.
+          //
+          // The `consume_budget().await` here provides a yield point to
+          // prevent starvation. This yield point is not at a fixed location;
+          // it could be placed at several alternative points in the execution
+          // loop. For example:
+          // - After step execution completes
+          // - Before executing the next step
+          // - At the end of each loop iteration
+          //
+          // The critical requirement is that SOME yield point exists in the
+          // loop to allow the tokio scheduler to run other tasks. The
+          // exact placement is a trade-off that hasn't been fully explored:
+          // - Here: minimal overhead (one yield per step), but may yield
+          //   unnecessarily
+          // - Before the step: never misses a yield, but always adds overhead
+          // - Conditional (e.g., "if the step completed synchronously"):
+          //   optimal, but requires additional bookkeeping/changes to the
+          //   step API
+          //
+          // If executor starvation persists, consider:
+          // 1. Adding yield points to the `initialize()` and `finalize()`
+          //    loops (which do not otherwise guarantee a yield between
+          //    iterations)
+          // 2. Moving the yield point to a different location in this loop
+          tokio::task::consume_budget().await;
         }
-        trace!("{} initializing completed", executor.pipeline);
+        None => return checkpoint.build_payload().map_err(Arc::new),
       }
-
-      // tell the async runtime to poll again because we are still initializing
-      cx.waker().wake_by_ref();
     }
+  }
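Background on the yield point chosen above, shown in isolation: `tokio::task::consume_budget().await` (tokio 1.20+, `rt` feature) is nearly free while the task still has cooperative budget and only actually suspends once the budget is exhausted, unlike `tokio::task::yield_now()`, which suspends on every call:

```rust
#[tokio::main]
async fn main() {
  let fair = tokio::spawn(async {
    for i in 0..1_000_000u64 {
      std::hint::black_box(i); // cheap synchronous "step"
      // yields to the scheduler only when the task's budget is exhausted,
      // keeping the loop fair without a forced context switch per iteration
      tokio::task::consume_budget().await;
    }
  });
  fair.await.unwrap();
}
```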
-    // the pipeline has completed executing all steps or encountered and error.
-    // Now we are running the `after_job` of each step in the pipeline.
-    if let Cursor::Finalizing(ref mut future) = executor.cursor {
-      if let Poll::Ready(output) = future.as_mut().poll_unpin(cx) {
-        trace!("{} completed with output: {output:#?}", executor.pipeline);
-
-        // Execution of this pipeline has completed, This resolves the
-        // executor future with the final output of the pipeline. Also
-        // emit an appropriate system event and record metrics.
-
-        let payload_id = executor.block.payload_id();
-        let events_bus = &executor.pipeline.events;
-        let metrics = executor.service.metrics();
-
-        match &output {
-          Ok(built_payload) => {
-            events_bus.publish(PayloadJobCompleted::<P> {
-              payload_id,
-              built_payload: built_payload.clone(),
-            });
-            metrics.jobs_completed.increment(1);
-            metrics.record_payload::<P>(built_payload, &executor.block);
-          }
-          Err(error) => {
-            events_bus.publish(PayloadJobFailed {
-              payload_id,
-              error: error.clone(),
-            });
-            metrics.jobs_failed.increment(1);
-          }
-        }
+  async fn finalize(
+    pipeline: &Arc<Pipeline<P>>,
+    block: &BlockContext<P>,
+    metrics: &Arc<metrics::Payload>,
+    scope: &Arc<RootScope<P>>,
+    output: PipelineOutput<P>,
+  ) -> PipelineOutput<P> {
+    let output = Arc::new(output.map_err(|e| clone_payload_error_lossy(&e)));

-        return Poll::Ready(output);
+    for step in pipeline.iter_steps() {
+      let navi = step
+        .navigator(pipeline)
+        .expect("Invalid step path in pipeline executor");
+      let limits = scope.limits_of(&step).expect("invalid step path");
+      let step_ctx = StepContext::new(block, &navi, limits, None);
+
+      if let Err(e) = navi.instance().after_job(step_ctx, output.clone()).await
+      {
+        trace!("{pipeline} finalization failed with error: {e:?}");
+        return Err(Arc::new(e));
       }
+    }

-      // tell the async runtime to poll again because we are still finalizing
-      cx.waker().wake_by_ref();
+    if scope.is_active() {
+      scope.leave();
     }

-    if matches!(executor.cursor, Cursor::BeforeStep(_, _)) {
-      // If the cursor is in the BeforeStep state, we need to run the next
-      // step of the pipeline. Steps are async futures, so we need to store
-      // their instance while they are running and being polled until resolved.
+    let result = Arc::into_inner(output)
+      .expect("unexpected > 1 strong reference count")
+      .map_err(Arc::new);

-      let Cursor::BeforeStep(path, input) =
-        std::mem::replace(&mut executor.cursor, Cursor::PreparingStep)
-      else {
-        unreachable!("bug in PipelineExecutor state machine");
-      };
+    trace!("{pipeline} completed with output: {result:#?}");

-      trace!("{} will execute step {path}", executor.pipeline);
-      let future = executor.execute_step(&path, input);
-      executor.cursor = Cursor::StepInProgress(path, future);
-      cx.waker().wake_by_ref(); // tell the async runtime to poll again
-    }
+    let payload_id = block.payload_id();
+    let events_bus = &pipeline.events;

-    if let Cursor::StepInProgress(ref path, ref mut future) = executor.cursor {
-      // If the cursor is in the StepInProgress state, we to poll the
-      // future instance of that step to see if it has completed.
-      if let Poll::Ready(output) = future.as_mut().poll_unpin(cx) {
-        trace!(
-          "{} step {path:?} completed with output: {output:#?}",
-          executor.pipeline
-        );
-
-        // step has completed, we can advance the cursor
-        executor.cursor = executor.advance_cursor(path, output);
+    match &result {
+      Ok(built_payload) => {
+        events_bus.publish(PayloadJobCompleted::<P> {
+          payload_id,
+          built_payload: built_payload.clone(),
+        });
+        metrics.jobs_completed.increment(1);
+        metrics.record_payload::<P>(built_payload, block);
+      }
+      Err(error) => {
+        events_bus.publish(PayloadJobFailed {
+          payload_id,
+          error: error.clone(),
+        });
+        metrics.jobs_failed.increment(1);
       }
-
-      cx.waker().wake_by_ref(); // tell the async runtime to poll again
     }

-    Poll::Pending
+    result
   }
 }
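One detail of the new `finalize()` worth calling out: the output is handed to every `after_job` hook as an `Arc` clone, and `Arc::into_inner` (stable since Rust 1.70) can recover ownership afterwards only because each hook drops its clone before returning, which is what the `expect("unexpected > 1 strong reference count")` asserts. In isolation:

```rust
use std::sync::Arc;

fn main() {
  let output = Arc::new(String::from("payload"));

  for _ in 0..3 {
    // simulated after_job hook: borrows the shared output, then drops it
    let shared = Arc::clone(&output);
    assert_eq!(shared.as_str(), "payload");
  } // each clone is dropped here

  // exactly one strong reference remains, so we get the value back by move
  let owned = Arc::into_inner(output).expect("single owner");
  assert_eq!(owned, "payload");
}
```

If a hook stashed a clone of the output somewhere, `Arc::into_inner` would return `None` and the executor would panic, so the single-owner invariant is effectively part of the `after_job` contract.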
-/// Keeps track of the current pipeline execution progress.
-enum Cursor<P: Platform> {
-  /// The pipeline will execute this step on the next iteration.
-  BeforeStep(StepPath, Checkpoint<P>),
-
-  /// a pipeline step execution is in progress.
-  ///
-  /// This state is set when the pipeline executor has began executing a step
-  /// but has not yet completed it between two polls. Steps are asynchronous
-  /// and may span multiple polls. Until the step future is resolved, we need
-  /// to store its instance and poll it.
-  ///
-  /// Here we store the step path that is currently being executed
-  /// and the pinned future that is executing the step.
-  StepInProgress(
-    StepPath,
-    Pin<Box<dyn Future<Output = ControlFlow<P>> + Send>>,
-  ),
-
-  /// The pipeline is currently initializing all steps for a new payload job.
-  ///
-  /// This happens once before any step is executed, and it calls the
-  /// `before_job` method of each step in the pipeline.
-  Initializing(
-    Pin<
-      Box<
-        dyn Future<Output = Result<Checkpoint<P>, PayloadBuilderError>> + Send,
-      >,
-    >,
-  ),
-
-  /// This state occurs after the `Completed` state is reached. It calls
-  /// the `after_job` method of each step in the pipeline with the output.
-  Finalizing(Pin<Box<dyn Future<Output = PipelineOutput<P>> + Send>>),
-
-  /// The pipeline is currently preparing to execute the next step.
-  /// We are in this state only for a brief moment inside the `poll` method, and
-  /// it will never be seen by the `run_step` method.
-  PreparingStep,
-}
-
 pub(crate) fn clone_payload_error_lossy(
   error: &PayloadBuilderError,
 ) -> PayloadBuilderError {
diff --git a/src/pipelines/exec/scope.rs b/src/pipelines/exec/scope.rs
index af8c0bb..ed9a7af 100644
--- a/src/pipelines/exec/scope.rs
+++ b/src/pipelines/exec/scope.rs
@@ -48,10 +48,10 @@
 use {
   super::*,
+  ::metrics::{Counter, Histogram},
   core::{cell::RefCell, time::Duration},
-  metrics::{Counter, Histogram},
   parking_lot::RwLock,
-  std::collections::HashMap,
+  std::{collections::HashMap, time::Instant},
 };

 /// Keeps track of the currently active pipeline execution scope relative to the
diff --git a/src/pipelines/job.rs b/src/pipelines/job.rs
index 133f4b0..a07517b 100644
--- a/src/pipelines/job.rs
+++ b/src/pipelines/job.rs
@@ -1,8 +1,8 @@
 use {
-  super::traits,
+  super::metrics,
   crate::{
     alloy,
-    pipelines::{exec::PipelineExecutor, service::ServiceContext},
+    pipelines::exec::{PipelineExecutor, PipelineFuture},
     prelude::*,
     reth,
   },
@@ -10,6 +10,7 @@ use {
   core::{
     pin::Pin,
     task::{Context, Poll},
+    time::Duration,
   },
   futures::{FutureExt, future::Shared},
   reth::{
@@ -35,26 +36,18 @@ use {
 ///
 /// This job will automatically terminate after a deadline even if
 /// `resolve_kind` is not called.
-pub(super) struct PayloadJob<P, Provider>
-where
-  P: Platform,
-  Provider: traits::ProviderBounds<P>,
-{
+pub(super) struct PayloadJob<P: Platform> {
   block: BlockContext<P>,
-  fut: ExecutorFuture<P, Provider>,
-  /// The deadline when this job should resolve.
+  fut: ExecutorFuture<P>,
   deadline: Pin<Box<Sleep>>,
 }

-impl<P, Provider> PayloadJob<P, Provider>
-where
-  P: Platform,
-  Provider: traits::ProviderBounds<P>,
-{
+impl<P: Platform> PayloadJob<P> {
   pub(super) fn new(
     pipeline: &Arc<Pipeline<P>>,
     block: BlockContext<P>,
-    service: &Arc<ServiceContext<P, Provider>>,
+    metrics: Arc<metrics::Payload>,
+    deadline: Duration,
   ) -> Self {
     debug!(
       "New Payload Job {} with block context: {block:#?}",
@@ -64,32 +57,21 @@ where
     let fut = ExecutorFuture::new(PipelineExecutor::run(
       Arc::clone(pipeline),
       block.clone(),
-      Arc::clone(service),
+      metrics,
     ));

-    // Job should complete within deadline (12s) even if GetPayload is never
-    // called. This prevents job accumulation.
-    // TODO: when FCU update avalanche is fixed we could replace
-    // builder.deadline with payload.attributes.timeout
-    let deadline =
-      Box::pin(tokio::time::sleep(service.node_config().builder.deadline));
-
     Self {
       block,
       fut,
-      deadline,
+      deadline: Box::pin(tokio::time::sleep(deadline)),
     }
   }
 }
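The poll body that races the job against this deadline is elided from the hunk, so the exact mechanism is assumed here; the shape is a race between the pipeline future and the boxed `Sleep`, which is what bounds job lifetime even if GetPayload never arrives. A sketch of that shape with `tokio::select!`:

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
  let work = async {
    tokio::time::sleep(Duration::from_millis(50)).await;
    "payload built"
  };

  // whichever side finishes first wins; a 12s deadline caps the job
  // even if the result is never requested
  let result = tokio::select! {
    payload = work => Some(payload),
    _ = tokio::time::sleep(Duration::from_secs(12)) => None,
  };

  assert_eq!(result, Some("payload built"));
}
```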
-impl<P, Provider> RethPayloadJobTrait for PayloadJob<P, Provider>
-where
-  P: Platform,
-  Provider: traits::ProviderBounds<P>,
-{
+impl<P: Platform> RethPayloadJobTrait for PayloadJob<P> {
   type BuiltPayload = types::BuiltPayload<P>;
   type PayloadAttributes = types::PayloadBuilderAttributes<P>;
-  type ResolvePayloadFuture = ExecutorFuture<P, Provider>;
+  type ResolvePayloadFuture = ExecutorFuture<P>;

   fn best_payload(&self) -> Result<Self::BuiltPayload, PayloadBuilderError> {
     unimplemented!("PayloadJob::best_payload is not implemented");
   }
@@ -128,11 +110,7 @@ where
 /// This future is polled for the first time by the Reth runtime when the
 /// `PayloadJob` is created. Here we want to immediately start executing
 /// the pipeline instead of waiting for the `resolve_kind` to be called.
-impl<P, Provider> Future for PayloadJob<P, Provider>
-where
-  P: Platform,
-  Provider: traits::ProviderBounds<P>,
-{
+impl<P: Platform> Future for PayloadJob<P> {
   type Output = Result<(), PayloadBuilderError>;

   fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -173,14 +151,13 @@ where
 /// This future wraps the `PipelineExecutor` and is used to poll the
 /// internal executor of the pipeline. Once this future is resolved, it
 /// can be polled again and will return a copy of the resolved payload.
-pub(super) struct ExecutorFuture<P, Provider>
+pub(super) struct ExecutorFuture<P>
 where
   P: Platform,
-  Provider: traits::ProviderBounds<P>,
 {
   payload_id: PayloadId,
   started_at: Instant,
-  state: ExecutorFutureState<P, Provider>,
+  state: ExecutorFutureState<P>,
 }

 /// This enum allows us to wrap the `PipelineExecutor` future
@@ -191,25 +168,23 @@ where
 /// Whenever any of the copies of the future is polled, it will poll the
 /// executor; if any copy resolves, all copies will also resolve with the same
 /// result.
-enum ExecutorFutureState<P, Provider>
+enum ExecutorFutureState<P>
 where
   P: Platform,
-  Provider: traits::ProviderBounds<P>,
 {
   Ready(Result<types::BuiltPayload<P>, Arc<PayloadBuilderError>>),
-  Future(Shared<PipelineExecutor<P, Provider>>),
+  Future(Shared<PipelineFuture<P>>),
 }
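`Shared` is what makes the `Future` variant cloneable: all clones poll the same underlying future, the first completion is cached, and every clone resolves to the same output. The output must be `Clone`, which is why the error side is stored as `Arc<PayloadBuilderError>`. A minimal demo:

```rust
use futures::FutureExt;

#[tokio::main]
async fn main() {
  // the inner future runs once; both handles observe its output
  let shared = async { 42u64 }.shared();
  let (a, b) = (shared.clone(), shared);

  assert_eq!(tokio::join!(a, b), (42, 42));
}
```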
-impl<P, Provider> ExecutorFuture<P, Provider>
+impl<P> ExecutorFuture<P>
 where
   P: Platform,
-  Provider: traits::ProviderBounds<P>,
 {
-  pub(super) fn new(executor: PipelineExecutor<P, Provider>) -> Self {
+  pub(super) fn new(executor: PipelineExecutor<P>) -> Self {
     Self {
       started_at: Instant::now(),
       payload_id: executor.payload_id(),
-      state: ExecutorFutureState::Future(executor.shared()),
+      state: ExecutorFutureState::Future(executor.into_future().shared()),
     }
   }
@@ -261,10 +236,9 @@ where
   }
 }

-impl<P, Provider> Future for ExecutorFuture<P, Provider>
+impl<P> Future for ExecutorFuture<P>
 where
   P: Platform,
-  Provider: traits::ProviderBounds<P>,
 {
   type Output = Result<types::BuiltPayload<P>, PayloadBuilderError>;
@@ -279,8 +253,8 @@ where
       ),

       // we are still in progress. keep polling the inner executor future.
-      ExecutorFutureState::Future(ref mut executor) => {
-        match executor.poll_unpin(cx) {
+      ExecutorFutureState::Future(ref mut future) => {
+        match future.poll_unpin(cx) {
           Poll::Ready(result) => {
             // got a result. All future polls will return the result directly
             // without polling the executor again.
@@ -304,10 +278,9 @@ where
 /// We want this to be cloneable because the `resolve_kind` method could
 /// potentially return multiple copies of the future, and we want all of them
 /// to resolve with the same result at the same time.
-impl<P, Provider> Clone for ExecutorFuture<P, Provider>
+impl<P> Clone for ExecutorFuture<P>
 where
   P: Platform,
-  Provider: traits::ProviderBounds<P>,
 {
   fn clone(&self) -> Self {
     Self {
diff --git a/src/pipelines/service.rs b/src/pipelines/service.rs
index 4cb2337..384ea9e 100644
--- a/src/pipelines/service.rs
+++ b/src/pipelines/service.rs
@@ -56,13 +56,12 @@ where
     let provider = Arc::new(ctx.provider().clone());

+    let metrics =
+      metrics::Payload::with_scope(&format!("{}_payloads", pipeline.name()));
+
     let service = ServiceContext {
       provider: Arc::clone(&provider),
       node_config: ctx.config().clone(),
-      metrics: metrics::Payload::with_scope(&format!(
-        "{}_payloads",
-        pipeline.name()
-      )),
     };

     // assign metric names to each step in the pipeline.
@@ -86,7 +85,7 @@ where
     }

     let (service, builder) = PayloadBuilderService::new(
-      JobGenerator::new(pipeline, service),
+      JobGenerator::new(pipeline, service, metrics),
       ctx.provider().canonical_state_stream(),
     );
@@ -101,14 +100,13 @@ where
 /// There is one service context instance per reth node. This type gives
 /// individual jobs access to the node state, transaction pool and other
 /// runtime facilities that are managed by reth.
-pub(super) struct ServiceContext<Plat, Provider>
+struct ServiceContext<Plat, Provider>
 where
   Plat: Platform,
   Provider: traits::ProviderBounds<Plat>,
 {
   provider: Arc<Provider>,
   node_config: NodeConfig<Plat::ChainSpec>,
-  metrics: metrics::Payload,
 }

 impl<Plat, Provider> ServiceContext<Plat, Provider>
@@ -116,23 +114,17 @@ where
   Plat: Platform,
   Provider: traits::ProviderBounds<Plat>,
 {
-  pub(super) fn provider(&self) -> &Provider {
+  fn provider(&self) -> &Provider {
     &self.provider
   }

-  pub(super) const fn node_config(
-    &self,
-  ) -> &NodeConfig<Plat::ChainSpec> {
+  const fn node_config(&self) -> &NodeConfig<Plat::ChainSpec> {
     &self.node_config
   }

-  pub(super) const fn chain_spec(&self) -> &Arc<Plat::ChainSpec> {
+  const fn chain_spec(&self) -> &Arc<Plat::ChainSpec> {
     &self.node_config().chain
   }
-
-  pub(super) const fn metrics(&self) -> &metrics::Payload {
-    &self.metrics
-  }
 }

 /// This type is stored inside the [`PayloadBuilderService`] type in Reth.
@@ -149,6 +141,7 @@ where
 {
   pipeline: Arc<Pipeline<Plat>>,
   service: Arc<ServiceContext<Plat, Provider>>,
+  metrics: Arc<metrics::Payload>,
   pre_cached: Option<PrecachedState>,
 }

 impl<Plat, Provider> JobGenerator<Plat, Provider>
@@ -157,9 +150,10 @@ where
   Plat: Platform,
   Provider: traits::ProviderBounds<Plat>,
 {
-  pub(super) fn new(
+  fn new(
     pipeline: Pipeline<Plat>,
     service: ServiceContext<Plat, Provider>,
+    metrics: metrics::Payload,
   ) -> Self {
     let pipeline = Arc::new(pipeline);
     let service = Arc::new(service);
@@ -167,6 +161,7 @@ where
     Self {
       pipeline,
       service,
+      metrics: Arc::new(metrics),
       pre_cached: None,
     }
   }
@@ -190,7 +185,7 @@ where
   Plat: Platform,
   Provider: traits::ProviderBounds<Plat>,
 {
-  type Job = PayloadJob<Plat, Provider>;
+  type Job = PayloadJob<Plat>;

   fn new_payload_job(
     &self,
@@ -218,7 +213,18 @@ where
     )
     .map_err(PayloadBuilderError::other)?;

-    Ok(PayloadJob::new(&self.pipeline, block_ctx, &self.service))
+    // Job should complete within deadline (12s) even if GetPayload is never
+    // called. This prevents job accumulation.
+    // TODO: when the FCU update avalanche is fixed we could replace
+    // builder.deadline with payload.attributes.timeout
+    let deadline = self.service.node_config().builder.deadline;
+
+    Ok(PayloadJob::new(
+      &self.pipeline,
+      block_ctx,
+      self.metrics.clone(),
+      deadline,
+    ))
   }

   fn on_new_state(