Skip to content

Commit 5388183

Browse files
pk910paulhauner
andcommitted
Allow per validator fee recipient via flag or file in validator client (similar to graffiti / graffiti-file) (#2924)
## Issue Addressed #2883 ## Proposed Changes * Added `suggested-fee-recipient` & `suggested-fee-recipient-file` flags to validator client (similar to graffiti / graffiti-file implementation). * Added proposer preparation service to VC, which sends the fee-recipient of all known validators to the BN via [/eth/v1/validator/prepare_beacon_proposer](ethereum/beacon-APIs#178) api once per slot * Added [/eth/v1/validator/prepare_beacon_proposer](ethereum/beacon-APIs#178) api endpoint and preparation data caching * Added cleanup routine to remove cached proposer preparations when not updated for 2 epochs ## Additional Info Changed the Implementation following the discussion in #2883. Co-authored-by: pk910 <[email protected]> Co-authored-by: Paul Hauner <[email protected]> Co-authored-by: Philipp K <[email protected]>
1 parent d172c0b commit 5388183

File tree

33 files changed

+1060
-40
lines changed

33 files changed

+1060
-40
lines changed

account_manager/src/validator/import.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,15 @@ pub fn cli_run(matches: &ArgMatches, validator_dir: PathBuf) -> Result<(), Strin
273273
eprintln!("Successfully imported keystore.");
274274
num_imported_keystores += 1;
275275

276-
let validator_def =
277-
ValidatorDefinition::new_keystore_with_password(&dest_keystore, password_opt, None)
278-
.map_err(|e| format!("Unable to create new validator definition: {:?}", e))?;
276+
let graffiti = None;
277+
let suggested_fee_recipient = None;
278+
let validator_def = ValidatorDefinition::new_keystore_with_password(
279+
&dest_keystore,
280+
password_opt,
281+
graffiti,
282+
suggested_fee_recipient,
283+
)
284+
.map_err(|e| format!("Unable to create new validator definition: {:?}", e))?;
279285

280286
defs.push(validator_def);
281287

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3101,7 +3101,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31013101
}
31023102
BeaconState::Merge(_) => {
31033103
let sync_aggregate = get_sync_aggregate()?;
3104-
let execution_payload = get_execution_payload(self, &state)?;
3104+
let execution_payload = get_execution_payload(self, &state, proposer_index)?;
31053105
BeaconBlock::Merge(BeaconBlockMerge {
31063106
slot,
31073107
proposer_index,

beacon_node/beacon_chain/src/execution_payload.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,22 +204,26 @@ pub fn validate_execution_payload_for_gossip<T: BeaconChainTypes>(
204204
pub fn get_execution_payload<T: BeaconChainTypes>(
205205
chain: &BeaconChain<T>,
206206
state: &BeaconState<T::EthSpec>,
207+
proposer_index: u64,
207208
) -> Result<ExecutionPayload<T::EthSpec>, BlockProductionError> {
208-
Ok(prepare_execution_payload_blocking(chain, state)?.unwrap_or_default())
209+
Ok(prepare_execution_payload_blocking(chain, state, proposer_index)?.unwrap_or_default())
209210
}
210211

211212
/// Wraps the async `prepare_execution_payload` function as a blocking task.
212213
pub fn prepare_execution_payload_blocking<T: BeaconChainTypes>(
213214
chain: &BeaconChain<T>,
214215
state: &BeaconState<T::EthSpec>,
216+
proposer_index: u64,
215217
) -> Result<Option<ExecutionPayload<T::EthSpec>>, BlockProductionError> {
216218
let execution_layer = chain
217219
.execution_layer
218220
.as_ref()
219221
.ok_or(BlockProductionError::ExecutionLayerMissing)?;
220222

221223
execution_layer
222-
.block_on_generic(|_| async { prepare_execution_payload(chain, state).await })
224+
.block_on_generic(|_| async {
225+
prepare_execution_payload(chain, state, proposer_index).await
226+
})
223227
.map_err(BlockProductionError::BlockingFailed)?
224228
}
225229

@@ -240,6 +244,7 @@ pub fn prepare_execution_payload_blocking<T: BeaconChainTypes>(
240244
pub async fn prepare_execution_payload<T: BeaconChainTypes>(
241245
chain: &BeaconChain<T>,
242246
state: &BeaconState<T::EthSpec>,
247+
proposer_index: u64,
243248
) -> Result<Option<ExecutionPayload<T::EthSpec>>, BlockProductionError> {
244249
let spec = &chain.spec;
245250
let execution_layer = chain
@@ -300,6 +305,7 @@ pub async fn prepare_execution_payload<T: BeaconChainTypes>(
300305
timestamp,
301306
random,
302307
finalized_block_hash.unwrap_or_else(Hash256::zero),
308+
proposer_index,
303309
)
304310
.await
305311
.map_err(BlockProductionError::GetPayloadFailed)?;

beacon_node/client/src/builder.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,11 @@ where
700700

701701
// Spawn a routine that tracks the status of the execution engines.
702702
execution_layer.spawn_watchdog_routine(beacon_chain.slot_clock.clone());
703+
704+
// Spawn a routine that removes expired proposer preparations.
705+
execution_layer.spawn_clean_proposer_preparation_routine::<TSlotClock, TEthSpec>(
706+
beacon_chain.slot_clock.clone(),
707+
);
703708
}
704709
}
705710

beacon_node/execution_layer/src/lib.rs

Lines changed: 143 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use lru::LruCache;
1010
use sensitive_url::SensitiveUrl;
1111
use slog::{crit, debug, error, info, Logger};
1212
use slot_clock::SlotClock;
13+
use std::collections::HashMap;
1314
use std::future::Future;
1415
use std::sync::Arc;
1516
use std::time::Duration;
@@ -18,7 +19,7 @@ use tokio::{
1819
sync::{Mutex, MutexGuard},
1920
time::{sleep, sleep_until, Instant},
2021
};
21-
use types::ChainSpec;
22+
use types::{ChainSpec, Epoch, ProposerPreparationData};
2223

2324
pub use engine_api::{http::HttpJsonRpc, ExecutePayloadResponseStatus};
2425

@@ -30,6 +31,16 @@ pub mod test_utils;
3031
/// in an LRU cache to avoid redundant lookups. This is the size of that cache.
3132
const EXECUTION_BLOCKS_LRU_CACHE_SIZE: usize = 128;
3233

34+
/// A fee recipient address for use during block production. Only used as a very last resort if
35+
/// there is no address provided by the user.
36+
///
37+
/// ## Note
38+
///
39+
/// This is *not* the zero-address, since Geth has been known to return errors for a coinbase of
40+
/// 0x00..00.
41+
const DEFAULT_SUGGESTED_FEE_RECIPIENT: [u8; 20] =
42+
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1];
43+
3344
#[derive(Debug)]
3445
pub enum Error {
3546
NoEngines,
@@ -46,9 +57,16 @@ impl From<ApiError> for Error {
4657
}
4758
}
4859

60+
#[derive(Clone)]
61+
pub struct ProposerPreparationDataEntry {
62+
update_epoch: Epoch,
63+
preparation_data: ProposerPreparationData,
64+
}
65+
4966
struct Inner {
5067
engines: Engines<HttpJsonRpc>,
5168
suggested_fee_recipient: Option<Address>,
69+
proposer_preparation_data: Mutex<HashMap<u64, ProposerPreparationDataEntry>>,
5270
execution_blocks: Mutex<LruCache<Hash256, ExecutionBlock>>,
5371
executor: TaskExecutor,
5472
log: Logger,
@@ -96,6 +114,7 @@ impl ExecutionLayer {
96114
log: log.clone(),
97115
},
98116
suggested_fee_recipient,
117+
proposer_preparation_data: Mutex::new(HashMap::new()),
99118
execution_blocks: Mutex::new(LruCache::new(EXECUTION_BLOCKS_LRU_CACHE_SIZE)),
100119
executor,
101120
log,
@@ -116,17 +135,18 @@ impl ExecutionLayer {
116135
&self.inner.executor
117136
}
118137

119-
fn suggested_fee_recipient(&self) -> Result<Address, Error> {
120-
self.inner
121-
.suggested_fee_recipient
122-
.ok_or(Error::FeeRecipientUnspecified)
123-
}
124-
125138
/// Note: this function returns a mutex guard, be careful to avoid deadlocks.
126139
async fn execution_blocks(&self) -> MutexGuard<'_, LruCache<Hash256, ExecutionBlock>> {
127140
self.inner.execution_blocks.lock().await
128141
}
129142

143+
/// Note: this function returns a mutex guard, be careful to avoid deadlocks.
144+
async fn proposer_preparation_data(
145+
&self,
146+
) -> MutexGuard<'_, HashMap<u64, ProposerPreparationDataEntry>> {
147+
self.inner.proposer_preparation_data.lock().await
148+
}
149+
130150
fn log(&self) -> &Logger {
131151
&self.inner.log
132152
}
@@ -234,11 +254,124 @@ impl ExecutionLayer {
234254
self.engines().upcheck_not_synced(Logging::Disabled).await;
235255
}
236256

257+
/// Spawns a routine which cleans the cached proposer preparations periodically.
258+
pub fn spawn_clean_proposer_preparation_routine<S: SlotClock + 'static, T: EthSpec>(
259+
&self,
260+
slot_clock: S,
261+
) {
262+
let preparation_cleaner = |el: ExecutionLayer| async move {
263+
// Start the loop to periodically clean proposer preparation cache.
264+
loop {
265+
if let Some(duration_to_next_epoch) =
266+
slot_clock.duration_to_next_epoch(T::slots_per_epoch())
267+
{
268+
// Wait for next epoch
269+
sleep(duration_to_next_epoch).await;
270+
271+
match slot_clock
272+
.now()
273+
.map(|slot| slot.epoch(T::slots_per_epoch()))
274+
{
275+
Some(current_epoch) => el
276+
.clean_proposer_preparation(current_epoch)
277+
.await
278+
.map_err(|e| {
279+
error!(
280+
el.log(),
281+
"Failed to clean proposer preparation cache";
282+
"error" => format!("{:?}", e)
283+
)
284+
})
285+
.unwrap_or(()),
286+
None => error!(el.log(), "Failed to get current epoch from slot clock"),
287+
}
288+
} else {
289+
error!(el.log(), "Failed to read slot clock");
290+
// If we can't read the slot clock, just wait another slot and retry.
291+
sleep(slot_clock.slot_duration()).await;
292+
}
293+
}
294+
};
295+
296+
self.spawn(preparation_cleaner, "exec_preparation_cleanup");
297+
}
298+
237299
/// Returns `true` if there is at least one synced and reachable engine.
238300
pub async fn is_synced(&self) -> bool {
239301
self.engines().any_synced().await
240302
}
241303

304+
/// Updates the proposer preparation data provided by validators
305+
pub fn update_proposer_preparation_blocking(
306+
&self,
307+
update_epoch: Epoch,
308+
preparation_data: &[ProposerPreparationData],
309+
) -> Result<(), Error> {
310+
self.block_on_generic(|_| async move {
311+
self.update_proposer_preparation(update_epoch, preparation_data)
312+
.await
313+
})?
314+
}
315+
316+
/// Updates the proposer preparation data provided by validators
317+
async fn update_proposer_preparation(
318+
&self,
319+
update_epoch: Epoch,
320+
preparation_data: &[ProposerPreparationData],
321+
) -> Result<(), Error> {
322+
let mut proposer_preparation_data = self.proposer_preparation_data().await;
323+
for preparation_entry in preparation_data {
324+
proposer_preparation_data.insert(
325+
preparation_entry.validator_index,
326+
ProposerPreparationDataEntry {
327+
update_epoch,
328+
preparation_data: preparation_entry.clone(),
329+
},
330+
);
331+
}
332+
333+
Ok(())
334+
}
335+
336+
/// Removes expired entries from cached proposer preparations
337+
async fn clean_proposer_preparation(&self, current_epoch: Epoch) -> Result<(), Error> {
338+
let mut proposer_preparation_data = self.proposer_preparation_data().await;
339+
340+
// Keep all entries that have been updated in the last 2 epochs
341+
let retain_epoch = current_epoch.saturating_sub(Epoch::new(2));
342+
proposer_preparation_data.retain(|_validator_index, preparation_entry| {
343+
preparation_entry.update_epoch >= retain_epoch
344+
});
345+
346+
Ok(())
347+
}
348+
349+
/// Returns the fee-recipient address that should be used to build a block
350+
async fn get_suggested_fee_recipient(&self, proposer_index: u64) -> Address {
351+
if let Some(preparation_data_entry) =
352+
self.proposer_preparation_data().await.get(&proposer_index)
353+
{
354+
// The values provided via the API have first priority.
355+
preparation_data_entry.preparation_data.fee_recipient
356+
} else if let Some(address) = self.inner.suggested_fee_recipient {
357+
// If there has been no fee recipient provided via the API, but the BN has been provided
358+
// with a global default address, use that.
359+
address
360+
} else {
361+
// If there is no user-provided fee recipient, use a junk value and complain loudly.
362+
crit!(
363+
self.log(),
364+
"Fee recipient unknown";
365+
"msg" => "the suggested_fee_recipient was unknown during block production. \
366+
a junk address was used, rewards were lost! \
367+
check the --suggested-fee-recipient flag and VC configuration.",
368+
"proposer_index" => ?proposer_index
369+
);
370+
371+
Address::from_slice(&DEFAULT_SUGGESTED_FEE_RECIPIENT)
372+
}
373+
}
374+
242375
/// Maps to the `engine_getPayload` JSON-RPC call.
243376
///
244377
/// However, it will attempt to call `self.prepare_payload` if it cannot find an existing
@@ -254,8 +387,10 @@ impl ExecutionLayer {
254387
timestamp: u64,
255388
random: Hash256,
256389
finalized_block_hash: Hash256,
390+
proposer_index: u64,
257391
) -> Result<ExecutionPayload<T>, Error> {
258-
let suggested_fee_recipient = self.suggested_fee_recipient()?;
392+
let suggested_fee_recipient = self.get_suggested_fee_recipient(proposer_index).await;
393+
259394
debug!(
260395
self.log(),
261396
"Issuing engine_getPayload";

beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,16 @@ impl<T: EthSpec> MockExecutionLayer<T> {
127127
.await
128128
.unwrap();
129129

130+
let validator_index = 0;
130131
let payload = self
131132
.el
132-
.get_payload::<T>(parent_hash, timestamp, random, finalized_block_hash)
133+
.get_payload::<T>(
134+
parent_hash,
135+
timestamp,
136+
random,
137+
finalized_block_hash,
138+
validator_index,
139+
)
133140
.await
134141
.unwrap();
135142
let block_hash = payload.block_hash;

beacon_node/http_api/src/lib.rs

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ use tokio::sync::mpsc::UnboundedSender;
4545
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
4646
use types::{
4747
Attestation, AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch,
48-
EthSpec, ForkName, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBeaconBlock,
49-
SignedContributionAndProof, SignedVoluntaryExit, Slot, SyncCommitteeMessage,
50-
SyncContributionData,
48+
EthSpec, ForkName, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
49+
SignedAggregateAndProof, SignedBeaconBlock, SignedContributionAndProof, SignedVoluntaryExit,
50+
Slot, SyncCommitteeMessage, SyncContributionData,
5151
};
5252
use version::{
5353
add_consensus_version_header, fork_versioned_response, inconsistent_fork_rejection,
@@ -2186,6 +2186,53 @@ pub fn serve<T: BeaconChainTypes>(
21862186
},
21872187
);
21882188

2189+
// POST validator/prepare_beacon_proposer
2190+
let post_validator_prepare_beacon_proposer = eth1_v1
2191+
.and(warp::path("validator"))
2192+
.and(warp::path("prepare_beacon_proposer"))
2193+
.and(warp::path::end())
2194+
.and(not_while_syncing_filter.clone())
2195+
.and(chain_filter.clone())
2196+
.and(warp::addr::remote())
2197+
.and(log_filter.clone())
2198+
.and(warp::body::json())
2199+
.and_then(
2200+
|chain: Arc<BeaconChain<T>>,
2201+
client_addr: Option<SocketAddr>,
2202+
log: Logger,
2203+
preparation_data: Vec<ProposerPreparationData>| {
2204+
blocking_json_task(move || {
2205+
let execution_layer = chain
2206+
.execution_layer
2207+
.as_ref()
2208+
.ok_or(BeaconChainError::ExecutionLayerMissing)
2209+
.map_err(warp_utils::reject::beacon_chain_error)?;
2210+
let current_epoch = chain
2211+
.epoch()
2212+
.map_err(warp_utils::reject::beacon_chain_error)?;
2213+
2214+
debug!(
2215+
log,
2216+
"Received proposer preparation data";
2217+
"count" => preparation_data.len(),
2218+
"client" => client_addr
2219+
.map(|a| a.to_string())
2220+
.unwrap_or_else(|| "unknown".to_string()),
2221+
);
2222+
2223+
execution_layer
2224+
.update_proposer_preparation_blocking(current_epoch, &preparation_data)
2225+
.map_err(|_e| {
2226+
warp_utils::reject::custom_bad_request(
2227+
"error processing proposer preparations".to_string(),
2228+
)
2229+
})?;
2230+
2231+
Ok(())
2232+
})
2233+
},
2234+
);
2235+
21892236
// POST validator/sync_committee_subscriptions
21902237
let post_validator_sync_committee_subscriptions = eth1_v1
21912238
.and(warp::path("validator"))
@@ -2710,6 +2757,7 @@ pub fn serve<T: BeaconChainTypes>(
27102757
.or(post_validator_contribution_and_proofs.boxed())
27112758
.or(post_validator_beacon_committee_subscriptions.boxed())
27122759
.or(post_validator_sync_committee_subscriptions.boxed())
2760+
.or(post_validator_prepare_beacon_proposer.boxed())
27132761
.or(post_lighthouse_liveness.boxed())
27142762
.or(post_lighthouse_database_reconstruct.boxed())
27152763
.or(post_lighthouse_database_historical_blocks.boxed()),

0 commit comments

Comments
 (0)