Skip to content

Commit 8e6224e

Browse files
bernard-wagnerEvalir
authored andcommitted
feat(cast): subscribe to logs using websockets (foundry-rs#5743)
* feat(cast): subscribe to logs * undo generic signal * fix tokio signal feature --------- Co-authored-by: Enrique Ortiz <[email protected]>
1 parent 8bfc145 commit 8e6224e

File tree

3 files changed

+186
-43
lines changed

3 files changed

+186
-43
lines changed

crates/cast/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ regex = { version = "1", default-features = false }
6060
rpassword = "7"
6161
semver = "1"
6262
tempfile = "3"
63-
tokio = { version = "1", features = ["macros"] }
63+
tokio = { version = "1", features = ["macros", "signal"] }
6464
tracing = "0.1"
6565
yansi = "0.5"
6666

crates/cast/bin/cmd/logs.rs

+40-41
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
1+
use std::{io, str::FromStr};
2+
13
use cast::Cast;
24
use clap::Parser;
3-
use ethers::{
5+
use ethers::{providers::Middleware, types::NameOrAddress};
6+
use ethers_core::{
47
abi::{Address, Event, RawTopicFilter, Topic, TopicFilter},
5-
providers::Middleware,
6-
types::{BlockId, BlockNumber, Filter, FilterBlockOption, NameOrAddress, ValueOrArray, H256},
8+
types::{BlockId, BlockNumber, Filter, FilterBlockOption, ValueOrArray, H256},
79
};
810
use eyre::Result;
911
use foundry_cli::{opts::EthereumOpts, utils};
1012
use foundry_common::abi::{get_event, parse_tokens};
1113
use foundry_config::Config;
1214
use itertools::Itertools;
13-
use std::str::FromStr;
1415

1516
/// CLI arguments for `cast logs`.
1617
#[derive(Debug, Parser)]
@@ -44,7 +45,12 @@ pub struct LogsArgs {
4445
#[clap(value_name = "TOPICS_OR_ARGS")]
4546
topics_or_args: Vec<String>,
4647

47-
/// Print the logs as JSON.
48+
/// If the RPC type and endpoints supports `eth_subscribe` stream logs instead of printing and
49+
/// exiting. Will continue until interrupted or TO_BLOCK is reached.
50+
#[clap(long)]
51+
subscribe: bool,
52+
53+
/// Print the logs as JSON.s
4854
#[clap(long, short, help_heading = "Display options")]
4955
json: bool,
5056

@@ -55,12 +61,21 @@ pub struct LogsArgs {
5561
impl LogsArgs {
5662
pub async fn run(self) -> Result<()> {
5763
let LogsArgs {
58-
from_block, to_block, address, topics_or_args, sig_or_topic, json, eth, ..
64+
from_block,
65+
to_block,
66+
address,
67+
sig_or_topic,
68+
topics_or_args,
69+
subscribe,
70+
json,
71+
eth,
5972
} = self;
6073

6174
let config = Config::from(&eth);
6275
let provider = utils::get_provider(&config)?;
6376

77+
let cast = Cast::new(&provider);
78+
6479
let address = match address {
6580
Some(address) => {
6681
let address = match address {
@@ -72,48 +87,29 @@ impl LogsArgs {
7287
None => None,
7388
};
7489

75-
let from_block = convert_block_number(&provider, from_block).await?;
76-
let to_block = convert_block_number(&provider, to_block).await?;
77-
78-
let cast = Cast::new(&provider);
90+
let from_block = cast.convert_block_number(from_block).await?;
91+
let to_block = cast.convert_block_number(to_block).await?;
7992

8093
let filter = build_filter(from_block, to_block, address, sig_or_topic, topics_or_args)?;
8194

82-
let logs = cast.filter_logs(filter, json).await?;
95+
if !subscribe {
96+
let logs = cast.filter_logs(filter, json).await?;
8397

84-
println!("{}", logs);
98+
println!("{}", logs);
8599

86-
Ok(())
87-
}
88-
}
100+
return Ok(())
101+
}
89102

90-
/// Converts a block identifier into a block number.
91-
///
92-
/// If the block identifier is a block number, then this function returns the block number. If the
93-
/// block identifier is a block hash, then this function returns the block number of that block
94-
/// hash. If the block identifier is `None`, then this function returns `None`.
95-
async fn convert_block_number<M: Middleware>(
96-
provider: M,
97-
block: Option<BlockId>,
98-
) -> Result<Option<BlockNumber>, eyre::Error>
99-
where
100-
M::Error: 'static,
101-
{
102-
match block {
103-
Some(block) => match block {
104-
BlockId::Number(block_number) => Ok(Some(block_number)),
105-
BlockId::Hash(hash) => {
106-
let block = provider.get_block(hash).await?;
107-
Ok(block.map(|block| block.number.unwrap()).map(BlockNumber::from))
108-
}
109-
},
110-
None => Ok(None),
103+
let mut stdout = io::stdout();
104+
cast.subscribe(filter, &mut stdout, json).await?;
105+
106+
Ok(())
111107
}
112108
}
113109

114-
// First tries to parse the `sig_or_topic` as an event signature. If successful, `topics_or_args` is
115-
// parsed as indexed inputs and converted to topics. Otherwise, `sig_or_topic` is prepended to
116-
// `topics_or_args` and used as raw topics.
110+
/// Builds a Filter by first trying to parse the `sig_or_topic` as an event signature. If
111+
/// successful, `topics_or_args` is parsed as indexed inputs and converted to topics. Otherwise,
112+
/// `sig_or_topic` is prepended to `topics_or_args` and used as raw topics.
117113
fn build_filter(
118114
from_block: Option<BlockNumber>,
119115
to_block: Option<BlockNumber>,
@@ -154,7 +150,7 @@ fn build_filter(
154150
Ok(filter)
155151
}
156152

157-
// Creates a TopicFilter for the given event signature and arguments.
153+
/// Creates a TopicFilter from the given event signature and arguments.
158154
fn build_filter_event_sig(event: Event, args: Vec<String>) -> Result<TopicFilter, eyre::Error> {
159155
let args = args.iter().map(|arg| arg.as_str()).collect::<Vec<_>>();
160156

@@ -195,7 +191,7 @@ fn build_filter_event_sig(event: Event, args: Vec<String>) -> Result<TopicFilter
195191
Ok(event.filter(raw)?)
196192
}
197193

198-
// Creates a TopicFilter from raw topic hashes.
194+
/// Creates a TopicFilter from raw topic hashes.
199195
fn build_filter_topics(topics: Vec<String>) -> Result<TopicFilter, eyre::Error> {
200196
let mut topics = topics
201197
.into_iter()
@@ -214,8 +210,11 @@ fn build_filter_topics(topics: Vec<String>) -> Result<TopicFilter, eyre::Error>
214210

215211
#[cfg(test)]
216212
mod tests {
213+
use std::str::FromStr;
214+
217215
use super::*;
218216
use ethers::types::H160;
217+
use ethers_core::types::H256;
219218

220219
const ADDRESS: &str = "0x4D1A2e2bB4F88F0250f26Ffff098B0b30B26BF38";
221220
const TRANSFER_SIG: &str = "Transfer(address indexed,address indexed,uint256)";

crates/cast/src/lib.rs

+145-1
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,25 @@ use ethers_core::{
1313
},
1414
};
1515
use ethers_etherscan::{errors::EtherscanError, Client};
16-
use ethers_providers::{Middleware, PendingTransaction};
16+
use ethers_providers::{Middleware, PendingTransaction, PubsubClient};
1717
use evm_disassembler::{disassemble_bytes, disassemble_str, format_operations};
1818
use eyre::{Context, Result};
1919
use foundry_common::{abi::encode_args, fmt::*, TransactionReceiptWithRevertReason};
2020
pub use foundry_evm::*;
21+
use futures::{future::Either, FutureExt, StreamExt};
2122
use rayon::prelude::*;
2223
pub use rusoto_core::{
2324
credential::ChainProvider as AwsChainProvider, region::Region as AwsRegion,
2425
request::HttpClient as AwsHttpClient, Client as AwsClient,
2526
};
2627
pub use rusoto_kms::KmsClient;
2728
use std::{
29+
io,
2830
path::PathBuf,
2931
str::FromStr,
3032
sync::atomic::{AtomicBool, Ordering},
3133
};
34+
use tokio::signal::ctrl_c;
3235
pub use tx::TxBuilder;
3336
use tx::{TxBuilderOutput, TxBuilderPeekOutput};
3437

@@ -816,6 +819,147 @@ where
816819
};
817820
Ok(res)
818821
}
822+
823+
/// Converts a block identifier into a block number.
824+
///
825+
/// If the block identifier is a block number, then this function returns the block number. If
826+
/// the block identifier is a block hash, then this function returns the block number of
827+
/// that block hash. If the block identifier is `None`, then this function returns `None`.
828+
///
829+
/// # Example
830+
///
831+
/// ```no_run
832+
/// use cast::Cast;
833+
/// use ethers_providers::{Provider, Http};
834+
/// use ethers_core::types::{BlockId, BlockNumber};
835+
/// use std::convert::TryFrom;
836+
///
837+
/// # async fn foo() -> eyre::Result<()> {
838+
/// let provider = Provider::<Http>::try_from("http://localhost:8545")?;
839+
/// let cast = Cast::new(provider);
840+
///
841+
/// let block_number = cast.convert_block_number(Some(BlockId::Number(BlockNumber::from(5)))).await?;
842+
/// assert_eq!(block_number, Some(BlockNumber::from(5)));
843+
///
844+
/// let block_number = cast.convert_block_number(Some(BlockId::Hash("0x1234".parse().unwrap()))).await?;
845+
/// assert_eq!(block_number, Some(BlockNumber::from(1234)));
846+
///
847+
/// let block_number = cast.convert_block_number(None).await?;
848+
/// assert_eq!(block_number, None);
849+
/// # Ok(())
850+
/// # }
851+
/// ```
852+
pub async fn convert_block_number(
853+
&self,
854+
block: Option<BlockId>,
855+
) -> Result<Option<BlockNumber>, eyre::Error> {
856+
match block {
857+
Some(block) => match block {
858+
BlockId::Number(block_number) => Ok(Some(block_number)),
859+
BlockId::Hash(hash) => {
860+
let block = self.provider.get_block(hash).await?;
861+
Ok(block.map(|block| block.number.unwrap()).map(BlockNumber::from))
862+
}
863+
},
864+
None => Ok(None),
865+
}
866+
}
867+
868+
/// Sets up a subscription to the given filter and writes the logs to the given output.
869+
///
870+
/// # Example
871+
///
872+
/// ```no_run
873+
/// use cast::Cast;
874+
/// use ethers_core::abi::Address;
875+
/// use ethers_providers::{Provider, Ws};
876+
/// use ethers_core::types::Filter;
877+
/// use std::{str::FromStr, convert::TryFrom};
878+
/// use std::io;
879+
///
880+
/// # async fn foo() -> eyre::Result<()> {
881+
/// let provider = Provider::new(Ws::connect("wss://localhost:8545").await?);
882+
/// let cast = Cast::new(provider);
883+
///
884+
/// let filter = Filter::new().address(Address::from_str("0x00000000006c3852cbEf3e08E8dF289169EdE581")?);
885+
/// let mut output = io::stdout();
886+
/// cast.subscribe(filter, &mut output, false).await?;
887+
/// # Ok(())
888+
/// # }
889+
/// ```
890+
pub async fn subscribe(
891+
&self,
892+
filter: Filter,
893+
output: &mut dyn io::Write,
894+
to_json: bool,
895+
) -> Result<()>
896+
where
897+
<M as Middleware>::Provider: PubsubClient,
898+
{
899+
// Initialize the subscription stream for logs
900+
let mut subscription = self.provider.subscribe_logs(&filter).await?;
901+
902+
// Check if a to_block is specified, if so, subscribe to blocks
903+
let mut block_subscription = if filter.get_to_block().is_some() {
904+
Some(self.provider.subscribe_blocks().await?)
905+
} else {
906+
None
907+
};
908+
909+
let to_block_number = filter.get_to_block();
910+
911+
// If output should be JSON, start with an opening bracket
912+
if to_json {
913+
write!(output, "[")?;
914+
}
915+
916+
let mut first = true;
917+
918+
loop {
919+
tokio::select! {
920+
// If block subscription is present, listen to it to avoid blocking indefinitely past the desired to_block
921+
block = if let Some(bs) = &mut block_subscription {
922+
Either::Left(bs.next().fuse())
923+
} else {
924+
Either::Right(futures::future::pending())
925+
} => {
926+
if let (Some(block), Some(to_block)) = (block, to_block_number) {
927+
if block.number.map_or(false, |bn| bn > to_block) {
928+
break;
929+
}
930+
}
931+
},
932+
// Process incoming log
933+
log = subscription.next() => {
934+
if to_json {
935+
if !first {
936+
write!(output, ",")?;
937+
}
938+
first = false;
939+
let log_str = serde_json::to_string(&log).unwrap();
940+
write!(output, "{}", log_str)?;
941+
} else {
942+
let log_str = log.pretty()
943+
.replacen('\n', "- ", 1) // Remove empty first line
944+
.replace('\n', "\n "); // Indent
945+
writeln!(output, "{}", log_str)?;
946+
}
947+
},
948+
// Break on cancel signal, to allow for closing JSON bracket
949+
_ = ctrl_c() => {
950+
break;
951+
},
952+
else => break,
953+
}
954+
}
955+
956+
// If output was JSON, end with a closing bracket
957+
if to_json {
958+
write!(output, "]")?;
959+
}
960+
961+
Ok(())
962+
}
819963
}
820964

821965
pub struct InterfaceSource {

0 commit comments

Comments
 (0)