Skip to content

Commit d28b293

Browse files
authored
fix: prevent memory leak from unbounded JoinHandle accumulation during live indexing (#419)
1 parent 35c4461 commit d28b293

4 files changed

Lines changed: 401 additions & 19 deletions

File tree

core/src/indexer/process.rs

Lines changed: 86 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use alloy::primitives::{B256, U64};
22

33
use futures::future::join_all;
4-
use futures::StreamExt;
4+
use futures::stream::FuturesUnordered;
5+
use futures::{poll, StreamExt};
56
use std::{collections::HashMap, sync::Arc, time::Duration};
67
use tokio::sync::Semaphore;
78
use tokio::{
@@ -99,25 +100,26 @@ async fn process_event_logs(
99100

100101
let mut logs_stream =
101102
fetch_logs_stream(Arc::clone(&config), force_no_live_indexing, reorg_coordinator);
102-
let mut tasks = Vec::new();
103+
// Drain inline so handles don't accumulate during infinite live indexing.
104+
let mut in_flight: FuturesUnordered<JoinHandle<()>> = FuturesUnordered::new();
103105

104106
while let Some(result) = logs_stream.next().await {
105107
let task = handle_logs_result(Arc::clone(&config), callback_permits.clone(), result)
106108
.await
107109
.map_err(|e| Box::new(ProviderError::CustomError(e.to_string())))?;
108110

109111
if block_until_indexed {
110-
task.await.map_err(|e| Box::new(ProviderError::CustomError(e.to_string())))?;
112+
task.await.map_err(|e| Box::new(ProviderError::BatchRequestFailed(e)))?;
111113
} else {
112-
tasks.push(task);
114+
in_flight.push(task);
115+
while let std::task::Poll::Ready(Some(joined)) = poll!(in_flight.next()) {
116+
joined.map_err(|e| Box::new(ProviderError::BatchRequestFailed(e)))?;
117+
}
113118
}
114119
}
115120

116-
// Wait for all remaining tasks to complete
117-
if !tasks.is_empty() {
118-
futures::future::try_join_all(tasks)
119-
.await
120-
.map_err(|e| Box::new(ProviderError::CustomError(e.to_string())))?;
121+
while let Some(joined) = in_flight.next().await {
122+
joined.map_err(|e| Box::new(ProviderError::BatchRequestFailed(e)))?;
121123
}
122124

123125
Ok(())
@@ -775,3 +777,78 @@ async fn handle_logs_result(
775777
}
776778
}
777779
}
780+
781+
#[cfg(test)]
782+
mod tests {
783+
//! Regression tests for the in-flight `JoinHandle` handling in
784+
//! `process_event_logs`. Each test exercises the exact pattern used
785+
//! there (push + non-blocking drain + final await-drain) so a future
786+
//! refactor that silently reintroduces the leak will trip a test.
787+
788+
use super::*;
789+
use std::sync::{
790+
atomic::{AtomicUsize, Ordering},
791+
Arc,
792+
};
793+
use std::task::Poll;
794+
795+
#[tokio::test]
796+
async fn inline_drain_keeps_queue_bounded_under_live_indexing() {
797+
let mut in_flight: FuturesUnordered<JoinHandle<()>> = FuturesUnordered::new();
798+
let completed = Arc::new(AtomicUsize::new(0));
799+
let mut drained_inline = 0usize;
800+
801+
for _ in 0..500 {
802+
let completed = Arc::clone(&completed);
803+
in_flight.push(tokio::spawn(async move {
804+
completed.fetch_add(1, Ordering::SeqCst);
805+
}));
806+
tokio::task::yield_now().await;
807+
while let Poll::Ready(Some(joined)) = poll!(in_flight.next()) {
808+
joined.expect("task should not fail");
809+
drained_inline += 1;
810+
}
811+
}
812+
813+
while let Some(joined) = in_flight.next().await {
814+
joined.expect("task should not fail");
815+
}
816+
817+
assert!(
818+
drained_inline > 0,
819+
"inline drain never observed a completed task; test did not exercise the live drain path"
820+
);
821+
assert_eq!(completed.load(Ordering::SeqCst), 500, "all spawned tasks should complete");
822+
assert_eq!(in_flight.len(), 0, "final drain should empty the queue");
823+
}
824+
825+
#[tokio::test]
826+
async fn final_drain_awaits_pending_tasks_after_stream_ends() {
827+
let (tx, rx) = tokio::sync::oneshot::channel();
828+
let mut in_flight: FuturesUnordered<JoinHandle<()>> = FuturesUnordered::new();
829+
in_flight.push(tokio::spawn(async move {
830+
rx.await.expect("oneshot sender dropped");
831+
}));
832+
833+
while let Poll::Ready(Some(joined)) = poll!(in_flight.next()) {
834+
joined.expect("task should not fail");
835+
}
836+
assert_eq!(in_flight.len(), 1, "non-blocking drain must leave pending tasks in place");
837+
838+
tx.send(()).expect("receiver dropped");
839+
while let Some(joined) = in_flight.next().await {
840+
joined.expect("task should not fail");
841+
}
842+
assert_eq!(in_flight.len(), 0);
843+
}
844+
845+
#[tokio::test]
846+
async fn panic_in_spawned_task_surfaces_as_join_error() {
847+
let mut in_flight: FuturesUnordered<JoinHandle<()>> = FuturesUnordered::new();
848+
in_flight.push(tokio::spawn(async { panic!("boom") }));
849+
850+
let result = in_flight.next().await.expect("task should complete");
851+
let err = result.expect_err("panicking task should yield a JoinError");
852+
assert!(err.is_panic(), "expected panic cause, got: {err:?}");
853+
}
854+
}

e2e-tests/src/anvil_setup.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,47 @@ impl AnvilInstance {
310310
Ok(tx_hash)
311311
}
312312

313+
/// Send an ERC20 approve. Returns the tx hash.
314+
/// Does NOT auto-mine — call `mine_block()` afterwards if using manual mining.
315+
pub async fn send_approve(
316+
&self,
317+
contract_address: &str,
318+
spender: &ethers::types::Address,
319+
amount: ethers::types::U256,
320+
) -> Result<String> {
321+
use ethers::middleware::MiddlewareBuilder;
322+
use ethers::providers::{Http, Middleware, Provider};
323+
use ethers::signers::{LocalWallet, Signer};
324+
use ethers::types::TransactionRequest;
325+
326+
let base_provider = Provider::<Http>::try_from(&self.rpc_url)?;
327+
let chain_id = base_provider.get_chainid().await?.as_u64();
328+
329+
let wallet: LocalWallet = ANVIL_DEFAULT_PRIVATE_KEY.parse()?;
330+
let wallet = wallet.with_chain_id(chain_id);
331+
let signer_address = wallet.address();
332+
let provider = base_provider.with_signer(wallet);
333+
334+
let contract_addr: ethers::types::Address = contract_address.parse()?;
335+
let data = encode_approve_call(*spender, amount);
336+
let nonce = provider.get_transaction_count(signer_address, None).await?;
337+
338+
let tx = TransactionRequest {
339+
from: Some(signer_address),
340+
to: Some(contract_addr.into()),
341+
data: Some(data.into()),
342+
gas: Some(100000u64.into()),
343+
nonce: Some(nonce),
344+
gas_price: Some(20000000000u128.into()),
345+
value: None,
346+
chain_id: None,
347+
};
348+
349+
let pending = provider.send_transaction(tx, None).await?;
350+
let tx_hash = format!("{:?}", pending.tx_hash()).to_lowercase();
351+
Ok(tx_hash)
352+
}
353+
313354
/// Send multiple ERC20 transfers without mining between them.
314355
/// All transactions sit in the mempool until the next `mine_block()`.
315356
/// Returns tx hashes in submission order.
@@ -542,3 +583,16 @@ fn encode_transfer_call(to: ethers::types::Address, value: ethers::types::U256)
542583
data.extend_from_slice(&value_bytes);
543584
data
544585
}
586+
587+
/// Encode ERC20 approve(address,uint256) call data.
588+
fn encode_approve_call(spender: ethers::types::Address, value: ethers::types::U256) -> Vec<u8> {
589+
let mut data = vec![0x09, 0x5e, 0xa7, 0xb3]; // approve selector
590+
let mut spender_bytes = [0u8; 32];
591+
spender_bytes[12..].copy_from_slice(spender.as_bytes());
592+
data.extend_from_slice(&spender_bytes);
593+
let mut value_bytes = [0u8; 32];
594+
let value_be: [u8; 32] = value.into();
595+
value_bytes.copy_from_slice(&value_be);
596+
data.extend_from_slice(&value_bytes);
597+
data
598+
}

0 commit comments

Comments
 (0)