Skip to content

Commit 20d4033

Browse files
Implement log filters (#420)
* Implement log filters --------- Signed-off-by: Alexandru Cihodaru <[email protected]>
1 parent 848ce17 commit 20d4033

File tree

7 files changed

+1037
-36
lines changed

7 files changed

+1037
-36
lines changed

crates/anvil-polkadot/src/api_server/filters.rs

Lines changed: 195 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,29 @@
1-
use crate::api_server::error::ToRpcResponseResult;
1+
use crate::api_server::error::{Result, ToRpcResponseResult};
22
use anvil_core::eth::subscription::SubscriptionId;
33
use anvil_rpc::response::ResponseResult;
4-
use futures::{Stream, StreamExt};
4+
use futures::{FutureExt, Stream, StreamExt};
5+
use pallet_revive_eth_rpc::client::Client as EthRpcClient;
6+
use polkadot_sdk::pallet_revive::evm::{BlockNumberOrTag, Filter, Log};
57
use std::{collections::HashMap, sync::Arc, task::Poll, time::Duration};
68
use subxt::utils::H256;
79
use tokio::{sync::Mutex, time::Instant};
8-
use tokio_stream::wrappers::BroadcastStream;
10+
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
911

12+
/// Default timeout duration for active filters in seconds.
13+
/// Filters that haven't been polled within this duration will be evicted.
1014
pub const ACTIVE_FILTER_TIMEOUT_SECS: u64 = 60 * 5;
15+
16+
/// Maps filter IDs to tuples of filter and deadline.
1117
type FilterMap = Arc<Mutex<HashMap<String, (EthFilter, Instant)>>>;
18+
19+
/// Type alias for block notification streams.
1220
pub type BlockNotifications = BroadcastStream<H256>;
1321

14-
#[derive(Clone, Debug)]
22+
/// Manages Ethereum style filters for block notifications, logs and pending transactions.
23+
///
24+
/// Maintains active filters and automatically evicts that haven't been polled within the
25+
/// keep alive duration. Each filter is identified by a unique hexa string.
26+
#[derive(Clone)]
1527
pub struct Filters {
1628
/// Currently active filters
1729
active_filters: FilterMap,
@@ -25,7 +37,7 @@ impl Filters {
2537
Self { active_filters: Arc::new(Mutex::new(HashMap::default())), keep_alive }
2638
}
2739

28-
/// Inserts a new Filter
40+
/// Inserts a new Filter and returns its unique identifier.
2941
pub async fn add_filter(&self, filter: EthFilter) -> String {
3042
let id = new_id();
3143
trace!(target: "node::filters", "Adding new filter id {}", id);
@@ -34,15 +46,27 @@ impl Filters {
3446
id
3547
}
3648

37-
/// Poll the filter for updates.
49+
/// Poll the filter for changes since the last call.
50+
///
51+
/// This method retrieves any new data from the specified filter and resets its deadline.
52+
///
53+
/// - Block filters: Returns an array of new block hashes
54+
/// - Log filters: Returns an array of logs matching the filter criteria, including both
55+
/// historic logs on the first poll and new logs from blocks produced since the last poll.
3856
pub async fn get_filter_changes(&self, id: &str) -> ResponseResult {
3957
{
4058
let mut filters = self.active_filters.lock().await;
4159
if let Some((filter, deadline)) = filters.get_mut(id) {
42-
let response = filter
43-
.next()
44-
.await
45-
.unwrap_or_else(|| ResponseResult::success(Vec::<()>::new()));
60+
let response = match filter {
61+
EthFilter::Logs(logs_filter) => {
62+
let logs = logs_filter.drain_logs().await;
63+
Ok(logs).to_rpc_result()
64+
}
65+
_ => filter
66+
.next()
67+
.await
68+
.unwrap_or_else(|| ResponseResult::success(Vec::<()>::new())),
69+
};
4670
*deadline = self.next_deadline();
4771
return response;
4872
}
@@ -51,17 +75,31 @@ impl Filters {
5175
ResponseResult::success(Vec::<()>::new())
5276
}
5377

54-
/// The lifetime of filters
78+
/// Returns the log filter criteria for a given filter ID.
79+
pub async fn get_log_filter(&self, id: &str) -> Option<Filter> {
80+
let filters = self.active_filters.lock().await;
81+
if let Some((EthFilter::Logs(log), _)) = filters.get(id) {
82+
return Some(log.filter.clone());
83+
}
84+
None
85+
}
86+
87+
/// Returns the keepalive duration for filters.
5588
pub fn keep_alive(&self) -> Duration {
5689
self.keep_alive
5790
}
5891

59-
/// Removes the filter associated with the given id.
92+
/// Removes and returns the filter associated with the given identifier.
6093
pub async fn uninstall_filter(&self, id: &str) -> Option<EthFilter> {
6194
trace!(target: "node::filter", "Uninstalling filter id {}", id);
6295
self.active_filters.lock().await.remove(id).map(|(f, _)| f)
6396
}
6497

98+
/// Evicts all filters that have exceeded their keepalive deadline.
99+
///
100+
/// This method is typically called periodically by the eviction task to clean up
101+
/// stale filters that haven't been polled recently. Evicted filters are permanently
102+
/// removed and cannot be recovered.
65103
pub async fn evict(&self) {
66104
trace!(target: "node::filter", "Evicting stale filters");
67105
let now = Instant::now();
@@ -93,6 +131,14 @@ fn new_id() -> String {
93131
SubscriptionId::random_hex().to_string()
94132
}
95133

134+
/// Background task that periodically evicts stale filters.
135+
///
136+
/// This task runs an infinite loop that calls `Filters::evict()` at regular intervals
137+
/// based on the filter keepalive duration. It ensures that filters which haven't been
138+
/// polled are automatically removed to prevent memory leaks.
139+
///
140+
/// The task should be spawned once when the filter system is initialized and will
141+
/// run for the lifetime of the application.
96142
pub async fn eviction_task(filters: Filters) {
97143
let start = filters.next_deadline();
98144
let mut interval = tokio::time::interval_at(start, filters.keep_alive());
@@ -102,9 +148,23 @@ pub async fn eviction_task(filters: Filters) {
102148
}
103149
}
104150

105-
#[derive(Debug)]
151+
/// Implements the Ethereum JSON-RPC filter specification, supporting block
152+
/// log and pending transactions filtering capabilities. Each filter type
153+
/// has different polling behavior and data delivery semantics.
106154
pub enum EthFilter {
155+
/// Block filter that streams new block hashes.
156+
///
157+
/// Emits the hash (H256) of each new block as it's added to the chain.
158+
/// Subscribers receive notifications through the broadcast channel. When polled,
159+
/// returns all block hashes produced since the last poll.
107160
Blocks(BlockNotifications),
161+
/// Log filter that tracks contract event logs.
162+
///
163+
/// Filters logs based on block range, addresses, and topics. Combines historic
164+
/// logs (from the initial query range) with real-time logs from newly produced
165+
/// blocks. The filter applies topic matching with OR logic between topic alternatives
166+
/// and validates block ranges for incoming blocks.
167+
Logs(Box<LogsFilter>),
108168
}
109169

110170
impl Stream for EthFilter {
@@ -131,6 +191,128 @@ impl Stream for EthFilter {
131191
}
132192
Poll::Ready(Some(Ok(new_blocks).to_rpc_result()))
133193
}
194+
// handled directly in get_filter_changes
195+
Self::Logs(_) => Poll::Pending,
196+
}
197+
}
198+
}
199+
200+
/// Filter for tracking and collecting contract event logs.
201+
///
202+
/// Combines historic log queries with real-time log streaming to provide
203+
/// a complete view of logs matching the filter criteria. On creation, it optionally
204+
/// fetches historic logs based on the specified block range. Subsequently, it monitors
205+
/// new blocks and queries them for matching logs.
206+
///
207+
/// The filter validates that incoming blocks are within the specified range (from_block
208+
/// to to_block) before querying them, ensuring efficient operation and correct semantics.
209+
pub struct LogsFilter {
210+
/// Stream of new block notifications
211+
blocks: BlockNotifications,
212+
/// Client for querying Ethereum RPC endpoints
213+
eth_client: EthRpcClient,
214+
/// Filter criteria (addresses, topics, block range)
215+
filter: Filter,
216+
/// Historic logs fetched at filter creation time, returned on first poll
217+
historic: Option<Vec<Log>>,
218+
}
219+
220+
impl LogsFilter {
221+
/// Creates a new log filter with the specified criteria.
222+
///
223+
/// If the filter specifies a block range (from_block, to_block) or specific block hash,
224+
/// this constructor will immediately query for historic logs matching the criteria.
225+
/// These historic logs are stored and returned on the first call to `get_filter_changes`.
226+
///
227+
/// For filters without explicit block constraints, only real-time logs from future
228+
/// blocks will be collected.
229+
pub async fn new(
230+
block_notifier: BlockNotifications,
231+
eth_rpc_client: EthRpcClient,
232+
filter: Filter,
233+
) -> Result<Self> {
234+
let historic = if filter.from_block.is_some()
235+
|| filter.to_block.is_some()
236+
|| filter.block_hash.is_some()
237+
{
238+
eth_rpc_client.logs(Some(filter.clone())).await.ok()
239+
} else {
240+
None
241+
};
242+
Ok(Self { blocks: block_notifier, eth_client: eth_rpc_client, filter, historic })
243+
}
244+
245+
/// Drains all accumulated logs since the last poll.
246+
///
247+
/// This method:
248+
/// 1. Takes any historic logs (returned only on first call)
249+
/// 2. Drains all pending block notifications without blocking
250+
/// 3. For each new block, checks if it's within the filter's block range
251+
/// 4. Queries each relevant block for logs matching the filter criteria
252+
/// 5. Returns the combined set of logs
253+
async fn drain_logs(&mut self) -> Vec<Log> {
254+
let mut logs = self.historic.take().unwrap_or_default();
255+
let mut block_hashes = vec![];
256+
while let Some(result) = self.blocks.next().now_or_never().flatten() {
257+
match result {
258+
Ok(block_hash) => block_hashes.push(block_hash),
259+
Err(BroadcastStreamRecvError::Lagged(blocks)) => {
260+
// Channel overflowed - some blocks were skipped
261+
warn!(target: "node::filter", "Logs filter lagged, skipped {} block notifications", blocks);
262+
// Continue draining what's left in the channel
263+
continue;
264+
}
265+
}
266+
}
267+
268+
// For each block that we were notified about check for logs
269+
for substrate_hash in block_hashes {
270+
// This can be optimized if we also submit the block number
271+
// from subscribe_and_cache_new_blocks
272+
if !self.is_block_in_range(&substrate_hash).await {
273+
continue;
274+
}
275+
let mut block_filter = self.filter.clone();
276+
block_filter.from_block = None;
277+
block_filter.to_block = None;
278+
block_filter.block_hash = self.eth_client.resolve_ethereum_hash(&substrate_hash).await;
279+
if let Ok(block_logs) = self.eth_client.logs(Some(block_filter)).await {
280+
logs.extend(block_logs);
281+
}
282+
}
283+
logs
284+
}
285+
286+
/// Validates both lower bound (from_block) and upper bound (to_block) constraints.
287+
/// Block tags (like "latest", "pending") are always considered in range.
288+
async fn is_block_in_range(&self, substrate_hash: &H256) -> bool {
289+
let Ok(Some(block)) = self.eth_client.block_by_hash(substrate_hash).await else {
290+
return false; // Can't get block, skip it
291+
};
292+
293+
let block_number = block.number();
294+
// Check lower limit (from_block)
295+
if let Some(from_block) = &self.filter.from_block {
296+
match from_block {
297+
BlockNumberOrTag::U256(limit) => {
298+
if block_number < limit.as_u32() {
299+
return false;
300+
}
301+
}
302+
BlockNumberOrTag::BlockTag(_) => {}
303+
}
304+
}
305+
// Check upper limit (to_block)
306+
if let Some(to_block) = &self.filter.to_block {
307+
match to_block {
308+
BlockNumberOrTag::U256(limit) => {
309+
if block_number > limit.as_u32() {
310+
return false;
311+
}
312+
}
313+
BlockNumberOrTag::BlockTag(_) => {}
314+
}
134315
}
316+
true
135317
}
136318
}

crates/anvil-polkadot/src/api_server/server.rs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::{
22
api_server::{
33
ApiRequest,
44
error::{Error, Result, ToRpcResponseResult},
5-
filters::{BlockNotifications, EthFilter, Filters, eviction_task},
5+
filters::{BlockNotifications, EthFilter, Filters, LogsFilter, eviction_task},
66
revive_conversions::{
77
AlloyU256, ReviveAddress, ReviveBlockId, ReviveBlockNumberOrTag, ReviveBytes,
88
ReviveFilter, SubstrateU256, convert_to_generic_transaction,
@@ -57,8 +57,8 @@ use polkadot_sdk::{
5757
pallet_revive::{
5858
ReviveApi,
5959
evm::{
60-
Block, BlockNumberOrTagOrHash, BlockTag, Bytes, FeeHistoryResult, FilterResults,
61-
ReceiptInfo, TransactionInfo, TransactionSigned,
60+
self, Block, BlockNumberOrTagOrHash, BlockTag, Bytes, FeeHistoryResult, FilterResults,
61+
Log, ReceiptInfo, TransactionInfo, TransactionSigned,
6262
},
6363
},
6464
parachains_common::{AccountId, Hash, Nonce},
@@ -395,17 +395,15 @@ impl ApiServer {
395395
EthRequest::NodeInfo(_) => self.anvil_node_info().await.to_rpc_result(),
396396
EthRequest::AnvilMetadata(_) => self.anvil_metadata().await.to_rpc_result(),
397397
// --- Filters ---
398-
EthRequest::EthNewFilter(_filter) => {
399-
Err::<(), _>(Error::RpcUnimplemented).to_rpc_result()
398+
EthRequest::EthNewFilter(filter) => {
399+
self.new_filter(ReviveFilter::from(filter).into_inner()).await.to_rpc_result()
400400
}
401+
EthRequest::EthGetFilterLogs(id) => self.get_filter_logs(&id).await.to_rpc_result(),
401402
EthRequest::EthGetFilterChanges(id) => self.get_filter_changes(&id).await,
402403
EthRequest::EthNewBlockFilter(_) => self.new_block_filter().await.to_rpc_result(),
403404
EthRequest::EthNewPendingTransactionFilter(_) => {
404405
Err::<(), _>(Error::RpcUnimplemented).to_rpc_result()
405406
}
406-
EthRequest::EthGetFilterLogs(_id) => {
407-
Err::<(), _>(Error::RpcUnimplemented).to_rpc_result()
408-
}
409407
EthRequest::EthUninstallFilter(id) => self.uninstall_filter(&id).await.to_rpc_result(),
410408
_ => Err::<(), _>(Error::RpcUnimplemented).to_rpc_result(),
411409
};
@@ -1371,6 +1369,28 @@ impl ApiServer {
13711369
self.filters.get_filter_changes(id).await
13721370
}
13731371

1372+
async fn new_filter(&self, filter: evm::Filter) -> Result<String> {
1373+
node_info!("eth_newFilter");
1374+
let eth_filter = EthFilter::Logs(Box::new(
1375+
LogsFilter::new(
1376+
BlockNotifications::new(self.new_block_notifications()?),
1377+
self.eth_rpc_client.clone(),
1378+
filter,
1379+
)
1380+
.await?,
1381+
));
1382+
Ok(self.filters.add_filter(eth_filter).await)
1383+
}
1384+
1385+
async fn get_filter_logs(&self, id: &str) -> Result<Vec<Log>> {
1386+
node_info!("eth_getFilterLogs");
1387+
if let Some(filter) = self.filters.get_log_filter(id).await {
1388+
Ok(self.eth_rpc_client.logs(Some(filter)).await?)
1389+
} else {
1390+
Ok(Vec::new())
1391+
}
1392+
}
1393+
13741394
// ----- Helpers
13751395
async fn update_block_provider_on_revert(&self, info: &Info<OpaqueBlock>) -> Result<()> {
13761396
let best_block = self.block_provider.block_by_number(info.best_number).await?.ok_or(

0 commit comments

Comments
 (0)