Skip to content

Commit 2d193cf

Browse files
committed
it works!: fix limits usage -- calculate target flashblocks right
1 parent 0d5fae5 commit 2d193cf

5 files changed

Lines changed: 88 additions & 240 deletions

File tree

src/limits.rs

Lines changed: 32 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,9 @@
55
//! flashblock partitioning logic.
66
77
use {
8-
crate::{
9-
Flashblocks,
10-
state::{FlashblockNumber, TargetFlashblocks},
11-
},
8+
crate::Flashblocks,
129
core::time::Duration,
1310
rblib::{alloy::consensus::BlockHeader, prelude::*},
14-
std::sync::{Arc, Mutex},
15-
tracing::debug,
1611
};
1712

1813
/// Specifies the limits for individual flashblocks.
@@ -26,173 +21,51 @@ use {
2621
/// deadline by the flashblock interval.
2722
#[derive(Debug, Clone, Default)]
2823
pub struct FlashblockLimits {
29-
state: Arc<Mutex<FlashblockState>>,
3024
/// The time interval between flashblocks within one payload job.
3125
interval: Duration,
3226
}
3327

34-
#[derive(Debug, Clone, Default)]
35-
pub struct FlashblockState {
36-
/// Current block number being built, or `None` if uninitialized.
37-
current_block: Option<u64>,
38-
/// Current flashblock number. Used to check if we're on the first
39-
/// flashblock or to adjust the target number of flashblocks for a block.
40-
target_flashblocks: Arc<TargetFlashblocks>,
41-
/// Duration for the first flashblock, which may be shortened to absorb
42-
/// timing variance.
43-
first_flashblock_interval: Duration,
44-
/// Gas allocated per flashblock (total gas limit divided by flashblock
45-
/// count).
46-
gas_per_flashblock: u64,
47-
}
48-
49-
impl FlashblockState {
50-
fn current_gas_limit(&self, flashblock_number: &FlashblockNumber) -> u64 {
51-
self
52-
.gas_per_flashblock
53-
.saturating_mul(flashblock_number.current())
54-
}
55-
}
56-
5728
impl FlashblockLimits {
58-
pub fn new(
59-
interval: Duration,
60-
target_flashblocks: Arc<TargetFlashblocks>,
61-
) -> Self {
62-
let state = FlashblockState {
63-
target_flashblocks,
64-
..Default::default()
65-
};
66-
FlashblockLimits {
67-
interval,
68-
state: Arc::new(Mutex::new(state)),
69-
}
29+
pub fn new(interval: Duration) -> Self {
30+
FlashblockLimits { interval }
7031
}
32+
}
7133

72-
/// Resets state when starting a new block, calculating target flashblock
73-
/// count.
74-
///
75-
/// If a new block is detected (different block number than current state),
76-
/// initializes the flashblock partition for this block by:
77-
/// - Calculating available time and dividing it into flashblock intervals
78-
/// - Computing gas per flashblock from the total gas limit
79-
/// - Resetting the current flashblock counter to 0
80-
/// - Adjusting the target number of flashblocks
81-
pub fn update_state(
34+
impl ScopedLimits<Flashblocks> for FlashblockLimits {
35+
/// Creates the payload limits for the next flashblock in a new payload job.
36+
fn create(
8237
&self,
8338
payload: &Checkpoint<Flashblocks>,
8439
enclosing: &Limits<Flashblocks>,
85-
) {
86-
let mut state = self.state.lock().expect("mutex is not poisoned");
87-
88-
if state.current_block != Some(payload.block().number()) {
89-
let payload_deadline = enclosing.deadline.expect(
90-
"Flashblock limit require its enclosing scope to have a deadline",
91-
);
92-
let elapsed = payload.building_since().elapsed();
93-
let remaining_time = payload_deadline.saturating_sub(elapsed);
94-
95-
let (target_flashblocks, first_flashblock_interval) =
96-
self.calculate_flashblocks(payload, remaining_time);
97-
98-
state.gas_per_flashblock = enclosing
99-
.gas_limit
100-
.checked_div(target_flashblocks)
101-
.unwrap_or(enclosing.gas_limit);
102-
state.current_block = Some(payload.block().number());
103-
state.first_flashblock_interval = first_flashblock_interval;
104-
state.target_flashblocks.set(target_flashblocks);
105-
106-
debug!(
107-
target_flashblocks = target_flashblocks,
108-
first_flashblock_interval = ?first_flashblock_interval,
109-
"Set flashblock timing for this block"
110-
);
111-
}
112-
}
113-
114-
/// Returns limits for the current flashblock.
115-
///
116-
/// If all flashblocks have been produced, returns a deadline of 1ms to stop
117-
/// production.
118-
pub fn get_limits(
119-
&self,
120-
enclosing: &Limits<Flashblocks>,
121-
flashblock_number: &FlashblockNumber,
12240
) -> Limits<Flashblocks> {
123-
let state = self.state.lock().expect("mutex is not poisoned");
124-
// If flashblock number == 1, we're building the first flashblock
125-
let deadline = if flashblock_number.current() == 1 {
126-
state.first_flashblock_interval
127-
} else {
128-
self.interval
129-
};
130-
131-
enclosing
132-
.with_deadline(deadline)
133-
.with_gas_limit(state.current_gas_limit(flashblock_number))
134-
}
41+
let flashblock_number = payload.context();
42+
43+
let payload_deadline = enclosing.deadline.expect(
44+
"FlashblockLimits requires its enclosing scope to have a deadline",
45+
);
46+
let elapsed = payload.building_since().elapsed();
47+
let remaining_time = payload_deadline.saturating_sub(elapsed);
13548

136-
/// Calculates the number of flashblocks and first flashblock interval for
137-
/// this block.
138-
///
139-
/// Extracts block time from block timestamps, then partitions the remaining
140-
/// time into flashblock intervals.
141-
pub fn calculate_flashblocks(
142-
&self,
143-
payload: &Checkpoint<Flashblocks>,
144-
remaining_time: Duration,
145-
) -> (u64, Duration) {
14649
let block_time = Duration::from_secs(
14750
payload
14851
.block()
14952
.timestamp()
15053
.saturating_sub(payload.block().parent().header().timestamp()),
15154
);
15255

153-
partition_time_into_flashblocks(block_time, remaining_time, self.interval)
154-
}
155-
}
156-
157-
impl ScopedLimits<Flashblocks> for FlashblockLimits {
158-
/// Creates the payload limits for the next flashblock in a new payload job.
159-
fn create(
160-
&self,
161-
payload: &Checkpoint<Flashblocks>,
162-
enclosing: &Limits<Flashblocks>,
163-
) -> Limits<Flashblocks> {
164-
let flashblock_number = payload.context();
165-
// Check the state and reset if we started building next block
166-
self.update_state(payload, enclosing);
167-
168-
let limits = self.get_limits(enclosing, flashblock_number);
56+
let (target_flashblocks, deadline) = partition_time_into_flashblocks(
57+
block_time,
58+
remaining_time,
59+
self.interval,
60+
);
16961

170-
let state = self.state.lock().expect("mutex is not poisoned");
171-
let flashblock_number = payload.context();
172-
if flashblock_number.current() <= state.target_flashblocks.get() {
173-
let gas_used = payload.cumulative_gas_used();
174-
let remaining_gas = enclosing.gas_limit.saturating_sub(gas_used);
175-
tracing::info!(
176-
"Creating flashblocks limits: {}, payload txs: {}, gas used: {} \
177-
({}%), gas_remaining: {} ({}%), next_block_gas_limit: {} ({}%), gas \
178-
per block: {} ({}%), remaining_time: {}ms, gas_limit: {}",
179-
flashblock_number,
180-
payload.history().transactions().count(),
181-
gas_used,
182-
(gas_used * 100 / enclosing.gas_limit),
183-
remaining_gas,
184-
(remaining_gas * 100 / enclosing.gas_limit),
185-
state.current_gas_limit(flashblock_number),
186-
(state.current_gas_limit(flashblock_number) * 100
187-
/ enclosing.gas_limit),
188-
state.gas_per_flashblock,
189-
(state.gas_per_flashblock * 100 / enclosing.gas_limit),
190-
limits.deadline.expect("deadline is set").as_millis(),
191-
limits.gas_limit
192-
);
193-
}
62+
let gas_limit = enclosing
63+
.gas_limit
64+
.checked_div(target_flashblocks)
65+
.unwrap_or(enclosing.gas_limit)
66+
.saturating_mul(flashblock_number.current());
19467

195-
limits
68+
enclosing.with_deadline(deadline).with_gas_limit(gas_limit)
19669
}
19770
}
19871

@@ -221,21 +94,21 @@ fn partition_time_into_flashblocks(
22194
) -> (u64, Duration) {
22295
let remaining_time = remaining_time.min(block_time);
22396

224-
let remaining_millis = u64::try_from(remaining_time.as_millis())
97+
let remaining_ms = u64::try_from(remaining_time.as_millis())
22598
.expect("remaining_time should never exceed u64::MAX milliseconds");
226-
let interval_millis = u64::try_from(flashblock_interval.as_millis())
99+
let flashblock_interval_ms = u64::try_from(flashblock_interval.as_millis())
227100
.expect("flashblock_interval should never exceed u64::MAX milliseconds");
228101

229-
let first_offset_millis = remaining_millis % interval_millis;
102+
let first_offset_ms = remaining_ms % flashblock_interval_ms;
230103

231-
if first_offset_millis == 0 {
104+
if first_offset_ms == 0 {
232105
// Perfect division: remaining time is exact multiple of interval
233-
(remaining_millis / interval_millis, flashblock_interval)
106+
(remaining_ms / flashblock_interval_ms, flashblock_interval)
234107
} else {
235108
// Non-perfect division: add extra flashblock with shortened first interval
236109
(
237-
remaining_millis / interval_millis + 1,
238-
Duration::from_millis(first_offset_millis),
110+
remaining_ms / flashblock_interval_ms + 1,
111+
Duration::from_millis(first_offset_ms),
239112
)
240113
}
241114
}

src/main.rs

Lines changed: 23 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use {
66
publish::{PublishFlashblock, WebSocketSink},
77
rpc::TransactionStatusRpc,
88
signer::BuilderSigner,
9-
state::TargetFlashblocks,
109
stop::BreakAfterMaxFlashblocks,
1110
},
1211
platform::Flashblocks,
@@ -20,7 +19,6 @@ use {
2019
steps::*,
2120
},
2221
std::sync::Arc,
23-
tracing::info,
2422
};
2523

2624
mod args;
@@ -84,8 +82,7 @@ fn build_pipeline(
8482
cli_args: &BuilderArgs,
8583
pool: &OrderPool<Flashblocks>,
8684
) -> eyre::Result<Pipeline<Flashblocks>> {
87-
// how often a flashblock is published
88-
let interval = cli_args.flashblocks_args.interval;
85+
let flashblock_interval = cli_args.flashblocks_args.interval;
8986

9087
// time by which flashblocks will be delivered earlier to account for latency
9188
let leeway_time = cli_args.flashblocks_args.leeway_time;
@@ -104,14 +101,7 @@ fn build_pipeline(
104101
.clone()
105102
.unwrap_or(BuilderSigner::random());
106103

107-
let target_flashblocks = Arc::new(TargetFlashblocks::new());
108-
109-
info!(
110-
"cli_args.builder_signer.is_some() = {}",
111-
cli_args.builder_signer.is_some()
112-
);
113-
114-
let pipeline = Pipeline::<Flashblocks>::named("top")
104+
let pipeline = Pipeline::<Flashblocks>::named("block")
115105
.with_step(OptimismPrologue)
116106
.with_step_if(
117107
cli_args.flashtestations.flashtestations_enabled
@@ -123,45 +113,31 @@ fn build_pipeline(
123113
)
124114
.with_pipeline(
125115
Loop,
126-
Pipeline::named("n_flashblocks")
116+
Pipeline::named("flashblocks")
127117
.with_pipeline(
128-
Once,
118+
Loop,
129119
Pipeline::named("single_flashblock")
130-
.with_pipeline(
131-
Once,
132-
Pipeline::named("flashblock_steps")
133-
.with_pipeline(
134-
Loop,
135-
Pipeline::named("inner_flashblock_steps")
136-
.with_step(AppendOrders::from_pool(pool).with_ok_on_limit())
137-
.with_step(OrderByPriorityFee::default())
138-
.with_step_if(
139-
cli_args.revert_protection,
140-
RemoveRevertedTransactions::default(),
141-
)
142-
.with_step(BreakAfterDeadline),
143-
)
144-
.with_step_if(
145-
cli_args.builder_signer.is_some(),
146-
BuilderEpilogue::with_signer(builder_signer.clone().into())
147-
.with_message(|block| {
148-
format!("Block Number: {}", block.number())
149-
}),
150-
)
151-
.with_step(PublishFlashblock::new(
152-
ws.clone(),
153-
cli_args.flashblocks_args.calculate_state_root,
154-
))
155-
.with_limits(FlashblockLimits::new(
156-
interval,
157-
target_flashblocks.clone(),
158-
)),
120+
.with_step(AppendOrders::from_pool(pool).with_ok_on_limit())
121+
.with_step(OrderByPriorityFee::default())
122+
.with_step_if(
123+
cli_args.revert_protection,
124+
RemoveRevertedTransactions::default(),
159125
)
160-
.with_step(BreakAfterDeadline),
126+
.with_step(BreakAfterDeadline)
127+
.with_limits(FlashblockLimits::new(flashblock_interval)),
161128
)
162-
.with_step(BreakAfterMaxFlashblocks::new(target_flashblocks)),
163-
)
164-
.with_limits(Scaled::default().deadline(total_building_time));
129+
.with_step_if(
130+
cli_args.builder_signer.is_some(),
131+
BuilderEpilogue::with_signer(builder_signer.clone().into())
132+
.with_message(|block| format!("Block Number: {}", block.number())),
133+
)
134+
.with_step(PublishFlashblock::new(
135+
ws.clone(),
136+
cli_args.flashblocks_args.calculate_state_root,
137+
))
138+
.with_step(BreakAfterMaxFlashblocks::new(flashblock_interval))
139+
.with_limits(Scaled::default().deadline(total_building_time)),
140+
);
165141

166142
ws.watch_shutdown(&pipeline);
167143
pool.attach_pipeline(&pipeline);

src/publish.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ impl Step<Flashblocks> for PublishFlashblock {
172172
self.capture_payload_metrics(&this_block_span);
173173

174174
// Increment flashblock number since we've built the flashblock
175-
let next_flashblock_number = flashblock_number.advance();
175+
let next_flashblock_number = flashblock_number.next();
176176

177177
// Place a barrier after each published flashblock to freeze the contents
178178
// of the payload up to this point, since this becomes a publicly committed

0 commit comments

Comments
 (0)