diff --git a/src/limits.rs b/src/limits.rs index 9c8702f..4f793fb 100644 --- a/src/limits.rs +++ b/src/limits.rs @@ -5,14 +5,9 @@ //! flashblock partitioning logic. use { - crate::{ - Flashblocks, - state::{FlashblockNumber, TargetFlashblocks}, - }, + crate::Flashblocks, core::time::Duration, rblib::{alloy::consensus::BlockHeader, prelude::*}, - std::sync::{Arc, Mutex}, - tracing::debug, }; /// Specifies the limits for individual flashblocks. @@ -26,123 +21,31 @@ use { /// deadline by the flashblock interval. #[derive(Debug, Clone, Default)] pub struct FlashblockLimits { - state: Arc>, /// The time interval between flashblocks within one payload job. interval: Duration, } -#[derive(Debug, Clone, Default)] -pub struct FlashblockState { - /// Current block number being built, or `None` if uninitialized. - current_block: Option, - /// Current flashblock number. Used to check if we're on the first - /// flashblock or to adjust the target number of flashblocks for a block. - target_flashblocks: Arc, - /// Duration for the first flashblock, which may be shortened to absorb - /// timing variance. - first_flashblock_interval: Duration, - /// Gas allocated per flashblock (total gas limit divided by flashblock - /// count). - gas_per_flashblock: u64, -} - -impl FlashblockState { - fn current_gas_limit(&self, flashblock_number: &FlashblockNumber) -> u64 { - self - .gas_per_flashblock - .saturating_mul(flashblock_number.current()) - } -} - impl FlashblockLimits { - pub fn new( - interval: Duration, - target_flashblocks: Arc, - ) -> Self { - let state = FlashblockState { - target_flashblocks, - ..Default::default() - }; - FlashblockLimits { - interval, - state: Arc::new(Mutex::new(state)), - } + pub fn new(interval: Duration) -> Self { + FlashblockLimits { interval } } +} - /// Resets state when starting a new block, calculating target flashblock - /// count. - /// - /// If a new block is detected (different block number than current state), - /// initializes the flashblock partition for this block by: - /// - Calculating available time and dividing it into flashblock intervals - /// - Computing gas per flashblock from the total gas limit - /// - Resetting the current flashblock counter to 0 - /// - Adjusting the target number of flashblocks - pub fn update_state( +impl ScopedLimits for FlashblockLimits { + /// Creates the payload limits for the next flashblock in a new payload job. + fn create( &self, payload: &Checkpoint, enclosing: &Limits, - ) { - let mut state = self.state.lock().expect("mutex is not poisoned"); - - if state.current_block != Some(payload.block().number()) { - let payload_deadline = enclosing.deadline.expect( - "Flashblock limit require its enclosing scope to have a deadline", - ); - let elapsed = payload.building_since().elapsed(); - let remaining_time = payload_deadline.saturating_sub(elapsed); - - let (target_flashblocks, first_flashblock_interval) = - self.calculate_flashblocks(payload, remaining_time); - - state.gas_per_flashblock = enclosing - .gas_limit - .checked_div(target_flashblocks) - .unwrap_or(enclosing.gas_limit); - state.current_block = Some(payload.block().number()); - state.first_flashblock_interval = first_flashblock_interval; - state.target_flashblocks.set(target_flashblocks); - - debug!( - target_flashblocks = target_flashblocks, - first_flashblock_interval = ?first_flashblock_interval, - "Set flashblock timing for this block" - ); - } - } - - /// Returns limits for the current flashblock. - /// - /// If all flashblocks have been produced, returns a deadline of 1ms to stop - /// production. - pub fn get_limits( - &self, - enclosing: &Limits, - flashblock_number: &FlashblockNumber, ) -> Limits { - let state = self.state.lock().expect("mutex is not poisoned"); - // If flashblock number == 1, we're building the first flashblock - let deadline = if flashblock_number.current() == 1 { - state.first_flashblock_interval - } else { - self.interval - }; - - enclosing - .with_deadline(deadline) - .with_gas_limit(state.current_gas_limit(flashblock_number)) - } + let flashblock_number = payload.context(); + + let payload_deadline = enclosing.deadline.expect( + "FlashblockLimits requires its enclosing scope to have a deadline", + ); + let elapsed = payload.building_since().elapsed(); + let remaining_time = payload_deadline.saturating_sub(elapsed); - /// Calculates the number of flashblocks and first flashblock interval for - /// this block. - /// - /// Extracts block time from block timestamps, then partitions the remaining - /// time into flashblock intervals. - pub fn calculate_flashblocks( - &self, - payload: &Checkpoint, - remaining_time: Duration, - ) -> (u64, Duration) { let block_time = Duration::from_secs( payload .block() @@ -150,49 +53,19 @@ impl FlashblockLimits { .saturating_sub(payload.block().parent().header().timestamp()), ); - partition_time_into_flashblocks(block_time, remaining_time, self.interval) - } -} - -impl ScopedLimits for FlashblockLimits { - /// Creates the payload limits for the next flashblock in a new payload job. - fn create( - &self, - payload: &Checkpoint, - enclosing: &Limits, - ) -> Limits { - let flashblock_number = payload.context(); - // Check the state and reset if we started building next block - self.update_state(payload, enclosing); - - let limits = self.get_limits(enclosing, flashblock_number); + let (target_flashblocks, deadline) = partition_time_into_flashblocks( + block_time, + remaining_time, + self.interval, + ); - let state = self.state.lock().expect("mutex is not poisoned"); - let flashblock_number = payload.context(); - if flashblock_number.current() <= state.target_flashblocks.get() { - let gas_used = payload.cumulative_gas_used(); - let remaining_gas = enclosing.gas_limit.saturating_sub(gas_used); - tracing::info!( - "Creating flashblocks limits: {}, payload txs: {}, gas used: {} \ - ({}%), gas_remaining: {} ({}%), next_block_gas_limit: {} ({}%), gas \ - per block: {} ({}%), remaining_time: {}ms, gas_limit: {}", - flashblock_number, - payload.history().transactions().count(), - gas_used, - (gas_used * 100 / enclosing.gas_limit), - remaining_gas, - (remaining_gas * 100 / enclosing.gas_limit), - state.current_gas_limit(flashblock_number), - (state.current_gas_limit(flashblock_number) * 100 - / enclosing.gas_limit), - state.gas_per_flashblock, - (state.gas_per_flashblock * 100 / enclosing.gas_limit), - limits.deadline.expect("deadline is set").as_millis(), - limits.gas_limit - ); - } + let gas_limit = enclosing + .gas_limit + .checked_div(target_flashblocks) + .unwrap_or(enclosing.gas_limit) + .saturating_mul(flashblock_number.current()); - limits + enclosing.with_deadline(deadline).with_gas_limit(gas_limit) } } @@ -221,21 +94,21 @@ fn partition_time_into_flashblocks( ) -> (u64, Duration) { let remaining_time = remaining_time.min(block_time); - let remaining_millis = u64::try_from(remaining_time.as_millis()) + let remaining_ms = u64::try_from(remaining_time.as_millis()) .expect("remaining_time should never exceed u64::MAX milliseconds"); - let interval_millis = u64::try_from(flashblock_interval.as_millis()) + let flashblock_interval_ms = u64::try_from(flashblock_interval.as_millis()) .expect("flashblock_interval should never exceed u64::MAX milliseconds"); - let first_offset_millis = remaining_millis % interval_millis; + let first_offset_ms = remaining_ms % flashblock_interval_ms; - if first_offset_millis == 0 { + if first_offset_ms == 0 { // Perfect division: remaining time is exact multiple of interval - (remaining_millis / interval_millis, flashblock_interval) + (remaining_ms / flashblock_interval_ms, flashblock_interval) } else { // Non-perfect division: add extra flashblock with shortened first interval ( - remaining_millis / interval_millis + 1, - Duration::from_millis(first_offset_millis), + remaining_ms / flashblock_interval_ms + 1, + Duration::from_millis(first_offset_ms), ) } } diff --git a/src/main.rs b/src/main.rs index be6c711..70c0633 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,6 @@ use { publish::{PublishFlashblock, WebSocketSink}, rpc::TransactionStatusRpc, signer::BuilderSigner, - state::TargetFlashblocks, stop::BreakAfterMaxFlashblocks, }, platform::Flashblocks, @@ -20,7 +19,6 @@ use { steps::*, }, std::sync::Arc, - tracing::info, }; mod args; @@ -84,8 +82,7 @@ fn build_pipeline( cli_args: &BuilderArgs, pool: &OrderPool, ) -> eyre::Result> { - // how often a flashblock is published - let interval = cli_args.flashblocks_args.interval; + let flashblock_interval = cli_args.flashblocks_args.interval; // time by which flashblocks will be delivered earlier to account for latency let leeway_time = cli_args.flashblocks_args.leeway_time; @@ -104,14 +101,7 @@ fn build_pipeline( .clone() .unwrap_or(BuilderSigner::random()); - let target_flashblocks = Arc::new(TargetFlashblocks::new()); - - info!( - "cli_args.builder_signer.is_some() = {}", - cli_args.builder_signer.is_some() - ); - - let pipeline = Pipeline::::named("top") + let pipeline = Pipeline::::named("block") .with_step(OptimismPrologue) .with_step_if( cli_args.flashtestations.flashtestations_enabled @@ -123,45 +113,31 @@ fn build_pipeline( ) .with_pipeline( Loop, - Pipeline::named("n_flashblocks") + Pipeline::named("flashblocks") .with_pipeline( - Once, + Loop, Pipeline::named("single_flashblock") - .with_pipeline( - Once, - Pipeline::named("flashblock_steps") - .with_pipeline( - Loop, - Pipeline::named("inner_flashblock_steps") - .with_step(AppendOrders::from_pool(pool).with_ok_on_limit()) - .with_step(OrderByPriorityFee::default()) - .with_step_if( - cli_args.revert_protection, - RemoveRevertedTransactions::default(), - ) - .with_step(BreakAfterDeadline), - ) - .with_step_if( - cli_args.builder_signer.is_some(), - BuilderEpilogue::with_signer(builder_signer.clone().into()) - .with_message(|block| { - format!("Block Number: {}", block.number()) - }), - ) - .with_step(PublishFlashblock::new( - ws.clone(), - cli_args.flashblocks_args.calculate_state_root, - )) - .with_limits(FlashblockLimits::new( - interval, - target_flashblocks.clone(), - )), + .with_step(AppendOrders::from_pool(pool).with_ok_on_limit()) + .with_step(OrderByPriorityFee::default()) + .with_step_if( + cli_args.revert_protection, + RemoveRevertedTransactions::default(), ) - .with_step(BreakAfterDeadline), + .with_step(BreakAfterDeadline) + .with_limits(FlashblockLimits::new(flashblock_interval)), ) - .with_step(BreakAfterMaxFlashblocks::new(target_flashblocks)), - ) - .with_limits(Scaled::default().deadline(total_building_time)); + .with_step_if( + cli_args.builder_signer.is_some(), + BuilderEpilogue::with_signer(builder_signer.clone().into()) + .with_message(|block| format!("Block Number: {}", block.number())), + ) + .with_step(PublishFlashblock::new( + ws.clone(), + cli_args.flashblocks_args.calculate_state_root, + )) + .with_step(BreakAfterMaxFlashblocks::new(flashblock_interval)) + .with_limits(Scaled::default().deadline(total_building_time)), + ); ws.watch_shutdown(&pipeline); pool.attach_pipeline(&pipeline); diff --git a/src/publish.rs b/src/publish.rs index d960dbf..26e3f1c 100644 --- a/src/publish.rs +++ b/src/publish.rs @@ -172,7 +172,7 @@ impl Step for PublishFlashblock { self.capture_payload_metrics(&this_block_span); // Increment flashblock number since we've built the flashblock - let next_flashblock_number = flashblock_number.advance(); + let next_flashblock_number = flashblock_number.next(); // Place a barrier after each published flashblock to freeze the contents // of the payload up to this point, since this becomes a publicly committed diff --git a/src/state.rs b/src/state.rs index 0f94683..79d4526 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,20 +1,13 @@ use { crate::Flashblocks, rblib::prelude::CheckpointContext, - std::{ - fmt::Display, - sync::atomic::{AtomicU64, Ordering}, - }, + std::fmt::Display, }; /// Current flashblock number (1-indexed). #[derive(Debug, Clone, PartialEq, Eq)] pub struct FlashblockNumber(u64); -/// Number of flashblocks we're targeting to build for this block. -#[derive(Debug, Default)] -pub struct TargetFlashblocks(AtomicU64); - impl FlashblockNumber { pub fn new() -> Self { Self(1) @@ -30,7 +23,7 @@ impl FlashblockNumber { } #[must_use] - pub fn advance(&self) -> Self { + pub fn next(&self) -> Self { Self(self.0 + 1) } } @@ -48,17 +41,3 @@ impl Display for FlashblockNumber { } impl CheckpointContext for FlashblockNumber {} - -impl TargetFlashblocks { - pub fn new() -> Self { - Self(AtomicU64::default()) - } - - pub fn get(&self) -> u64 { - self.0.load(Ordering::Relaxed) - } - - pub fn set(&self, val: u64) { - self.0.store(val, Ordering::Relaxed); - } -} diff --git a/src/stop.rs b/src/stop.rs index 4da39e3..e9b927e 100644 --- a/src/stop.rs +++ b/src/stop.rs @@ -1,16 +1,14 @@ -use { - crate::{Flashblocks, state::TargetFlashblocks}, - rblib::prelude::{Checkpoint, ControlFlow, Step, StepContext}, - std::sync::Arc, -}; +use {crate::Flashblocks, rblib::prelude::*, std::time::Duration}; pub struct BreakAfterMaxFlashblocks { - target_flashblocks: Arc, + flashblock_interval: Duration, } impl BreakAfterMaxFlashblocks { - pub fn new(target_flashblocks: Arc) -> Self { - Self { target_flashblocks } + pub fn new(flashblock_interval: Duration) -> Self { + Self { + flashblock_interval, + } } } @@ -18,9 +16,31 @@ impl Step for BreakAfterMaxFlashblocks { async fn step( self: std::sync::Arc, payload: Checkpoint, - _: StepContext, + ctx: StepContext, ) -> ControlFlow { - if payload.context().current() <= self.target_flashblocks.get() { + let payload_deadline = ctx.limits().deadline.expect( + "Flashblock limit require its enclosing scope to have a deadline", + ); + + let payload_deadline_ms = u64::try_from(payload_deadline.as_millis()) + .expect("payload_deadline should never exceed u64::MAX milliseconds"); + let flashblock_interval_ms = u64::try_from( + self.flashblock_interval.as_millis(), + ) + .expect("flashblock_interval should never exceed u64::MAX milliseconds"); + + let offset_ms = payload_deadline_ms % flashblock_interval_ms; + + let target_flashblocks = if offset_ms == 0 { + // Perfect division: payload time is exact multiple of interval + payload_deadline_ms / flashblock_interval_ms + } else { + // Non-perfect division: add extra flashblock with shortened first + // interval + payload_deadline_ms / flashblock_interval_ms + 1 + }; + + if payload.context().current() <= target_flashblocks { ControlFlow::Ok(payload) } else { ControlFlow::Break(payload)