Skip to content

Commit c74c4f2

Browse files
committed
Merge branch 'bat/feat/wait-for-genesis-logs' (#2502)
* origin/bat/feat/wait-for-genesis-logs: [chore]: Add changelog [feat]: Reworks the way the ledger waits for genesis start. It now fully initializes the node and outputs logs before sleeping until genesis start time
2 parents b7f083d + 90266c5 commit c74c4f2

File tree

5 files changed

+56
-52
lines changed

5 files changed

+56
-52
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
- Reworks the way the ledger waits for genesis start. It now fully initializes the node and
2+
outputs logs before sleeping until genesis start time. Previously it would not start any
3+
processes until genesis times, giving no feedback to users until genesis time was reached.
4+
([\#2502](https://github.com/anoma/namada/pull/2502))

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ test-e2e:
168168
NAMADA_E2E_USE_PREBUILT_BINARIES=$(NAMADA_E2E_USE_PREBUILT_BINARIES) \
169169
NAMADA_E2E_DEBUG=$(NAMADA_E2E_DEBUG) \
170170
RUST_BACKTRACE=$(RUST_BACKTRACE) \
171-
$(cargo) +$(nightly) test e2e::$(TEST_FILTER) \
171+
$(cargo) +$(nightly) test $(jobs) e2e::$(TEST_FILTER) \
172172
-Z unstable-options \
173173
-- \
174174
--test-threads=1 \

crates/apps/src/lib/node/ledger/broadcaster.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::net::SocketAddr;
22
use std::ops::ControlFlow;
33

44
use namada::types::control_flow::time;
5+
use namada::types::time::{DateTimeUtc, Utc};
56
use tokio::sync::mpsc::UnboundedReceiver;
67

78
use crate::facade::tendermint_rpc::{Client, HttpClient};
@@ -27,7 +28,15 @@ impl Broadcaster {
2728

2829
/// Loop forever, broadcasting messages that have been received
2930
/// by the receiver
30-
async fn run_loop(&mut self) {
31+
async fn run_loop(&mut self, genesis_time: DateTimeUtc) {
32+
// wait for start time if necessary
33+
if let Ok(sleep_time) =
34+
genesis_time.0.signed_duration_since(Utc::now()).to_std()
35+
{
36+
if !sleep_time.is_zero() {
37+
tokio::time::sleep(sleep_time).await;
38+
}
39+
}
3140
let result = time::Sleep {
3241
strategy: time::ExponentialBackoff {
3342
base: 2,
@@ -62,6 +71,8 @@ impl Broadcaster {
6271
if let Err(()) = result {
6372
tracing::error!("Broadcaster failed to connect to CometBFT node");
6473
return;
74+
} else {
75+
tracing::info!("Broadcaster successfully started.");
6576
}
6677
loop {
6778
if let Some(msg) = self.receiver.recv().await {
@@ -75,10 +86,11 @@ impl Broadcaster {
7586
pub async fn run(
7687
&mut self,
7788
abort_recv: tokio::sync::oneshot::Receiver<()>,
89+
genesis_time: DateTimeUtc,
7890
) {
7991
tracing::info!("Starting broadcaster.");
8092
tokio::select! {
81-
_ = self.run_loop() => {
93+
_ = self.run_loop(genesis_time) => {
8294
tracing::error!("Broadcaster unexpectedly shut down.");
8395
tracing::info!("Shutting down broadcaster...");
8496
},

crates/apps/src/lib/node/ledger/mod.rs

Lines changed: 4 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use futures::future::TryFutureExt;
1717
use namada::eth_bridge::ethers::providers::{Http, Provider};
1818
use namada::governance::storage::keys as governance_storage;
1919
use namada::types::storage::Key;
20-
use namada::types::time::{DateTimeUtc, Utc};
20+
use namada::types::time::DateTimeUtc;
2121
use namada_sdk::tendermint::abci::request::CheckTxKind;
2222
use once_cell::unsync::Lazy;
2323
use sysinfo::{RefreshKind, System, SystemExt};
@@ -242,12 +242,6 @@ pub fn rollback(config: config::Ledger) -> Result<(), shell::Error> {
242242
///
243243
/// All must be alive for correct functioning.
244244
async fn run_aux(config: config::Ledger, wasm_dir: PathBuf) {
245-
// wait for genesis time
246-
let genesis_time = DateTimeUtc::try_from(config.genesis_time.clone())
247-
.expect("Should be able to parse genesis time");
248-
if let std::ops::ControlFlow::Break(_) = sleep_until(genesis_time).await {
249-
return;
250-
}
251245
let setup_data = run_aux_setup(&config, &wasm_dir).await;
252246

253247
// Create an `AbortableSpawner` for signalling shut down from the shell or
@@ -441,7 +435,8 @@ fn start_abci_broadcaster_shell(
441435
// Channels for validators to send protocol txs to be broadcast to the
442436
// broadcaster service
443437
let (broadcaster_sender, broadcaster_receiver) = mpsc::unbounded_channel();
444-
438+
let genesis_time = DateTimeUtc::try_from(config.genesis_time.clone())
439+
.expect("Should be able to parse genesis time");
445440
// Start broadcaster
446441
let broadcaster = if matches!(
447442
config.shell.tendermint_mode,
@@ -456,7 +451,7 @@ fn start_abci_broadcaster_shell(
456451
// the ledger
457452
let mut broadcaster =
458453
Broadcaster::new(rpc_address, broadcaster_receiver);
459-
broadcaster.run(bc_abort_recv).await;
454+
broadcaster.run(bc_abort_recv, genesis_time).await;
460455
tracing::info!("Broadcaster is no longer running.");
461456

462457
drop(aborter);
@@ -788,36 +783,3 @@ pub fn test_genesis_files(
788783
fn spawn_dummy_task<T: Send + 'static>(ready: T) -> task::JoinHandle<T> {
789784
tokio::spawn(async { std::future::ready(ready).await })
790785
}
791-
792-
/// Sleep until the genesis time if necessary.
793-
async fn sleep_until(time: DateTimeUtc) -> std::ops::ControlFlow<()> {
794-
// Sleep until start time if needed
795-
let sleep = async {
796-
if let Ok(sleep_time) =
797-
time.0.signed_duration_since(Utc::now()).to_std()
798-
{
799-
if !sleep_time.is_zero() {
800-
tracing::info!(
801-
"Waiting for ledger genesis time: {:?}, time left: {:?}",
802-
time,
803-
sleep_time
804-
);
805-
tokio::time::sleep(sleep_time).await
806-
}
807-
}
808-
};
809-
let shutdown_signal = async {
810-
let (tx, rx) = tokio::sync::oneshot::channel();
811-
namada_sdk::control_flow::shutdown_send(tx).await;
812-
rx.await
813-
};
814-
tokio::select! {
815-
_ = shutdown_signal => {
816-
std::ops::ControlFlow::Break(())
817-
}
818-
_ = sleep => {
819-
tracing::info!("Genesis time reached, starting ledger");
820-
std::ops::ControlFlow::Continue(())
821-
}
822-
}
823-
}

crates/apps/src/lib/node/ledger/shims/abcipp_shim.rs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use namada::tx::Tx;
1111
use namada::types::hash::Hash;
1212
use namada::types::key::tm_raw_hash_to_string;
1313
use namada::types::storage::{BlockHash, BlockHeight};
14+
use namada::types::time::Utc;
15+
use namada_sdk::types::time::DateTimeUtc;
1416
use tokio::sync::broadcast;
1517
use tokio::sync::mpsc::UnboundedSender;
1618
use tower::Service;
@@ -182,6 +184,7 @@ impl AbcippShim {
182184
Err(err) => Err(err),
183185
},
184186
};
187+
185188
let resp = resp.map_err(|e| e.into());
186189
if resp_sender.send(resp).is_err() {
187190
tracing::info!("ABCI response channel is closed")
@@ -292,20 +295,43 @@ impl AbciService {
292295
/// forward it normally.
293296
fn forward_request(&mut self, req: Req) -> <Self as Service<Req>>::Future {
294297
let (resp_send, recv) = tokio::sync::oneshot::channel();
295-
let result = self.shell_send.send((req, resp_send));
296-
298+
let shell_send = self.shell_send.clone();
297299
async move {
300+
let genesis_time = if let Req::InitChain(ref init) = req {
301+
Some(
302+
DateTimeUtc::try_from(init.time)
303+
.expect("Should be able to parse genesis time."),
304+
)
305+
} else {
306+
None
307+
};
308+
let result = shell_send.send((req, resp_send));
298309
if let Err(err) = result {
299310
// The shell has shut-down
300311
return Err(err.into());
301312
}
302-
match recv.await {
303-
Ok(resp) => resp,
304-
Err(err) => {
313+
recv.await
314+
.unwrap_or_else(|err| {
305315
tracing::info!("ABCI response channel didn't respond");
306316
Err(err.into())
307-
}
308-
}
317+
})
318+
.map(|res| {
319+
// emit a log line stating that we are sleeping until
320+
// genesis.
321+
if let Some(Ok(sleep_time)) = genesis_time
322+
.map(|t| t.0.signed_duration_since(Utc::now()).to_std())
323+
{
324+
if !sleep_time.is_zero() {
325+
tracing::info!(
326+
"Waiting for ledger genesis time: {:?}, time \
327+
left: {:?}",
328+
genesis_time.unwrap(),
329+
sleep_time
330+
);
331+
}
332+
}
333+
res
334+
})
309335
}
310336
.boxed()
311337
}

0 commit comments

Comments
 (0)