Skip to content

Commit b32c54a

Browse files
authored
pass context to rpc and worker (#205)
### TL;DR Added context propagation throughout the codebase to improve cancellation handling and resource management. ### What changed? This PR adds proper context propagation throughout the codebase by: 1. Adding context parameters to all RPC client interface methods: - `GetFullBlocks` - `GetBlocks` - `GetTransactions` - `GetLatestBlockNumber` 2. Updating all orchestrator components to pass context to RPC calls: - ChainTracker - Committer - FailureRecoverer - Poller - ReorgHandler 3. Propagating context through the worker package to ensure proper cancellation handling 4. Updating all batch RPC operations to respect context cancellation ### How to test? 1. Run the application and verify that it functions normally 2. Test context cancellation by: - Starting the application - Triggering a graceful shutdown - Verifying that in-progress operations are properly canceled ### Why make this change? This change improves resource management and application behavior by: 1. Ensuring proper cancellation of in-progress operations when the application is shutting down 2. Preventing resource leaks by propagating context through all layers of the application 3. Following Go best practices for handling context propagation 4. Enabling better timeout and cancellation handling for RPC operations <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added support for cancellation and timeout controls to blockchain data fetching and processing operations, improving responsiveness and robustness of long-running tasks. - Introduced context propagation to search functionality for better request handling and cancellation support. - **Bug Fixes** - Enhanced cancellation and timeout handling across multiple components to prevent unresponsive or hanging processes. - **Tests** - Updated tests to validate context-aware operations and ensure proper handling of cancellations and timeouts. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents c8cf165 + dce6992 commit b32c54a

File tree

12 files changed

+189
-166
lines changed

12 files changed

+189
-166
lines changed

internal/handlers/search_handlers.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package handlers
22

33
import (
4+
"context"
45
"encoding/hex"
56
"fmt"
67
"math/big"
@@ -76,7 +77,7 @@ func Search(c *gin.Context) {
7677
return
7778
}
7879

79-
result, err := executeSearch(mainStorage, chainId, searchInput)
80+
result, err := executeSearch(c.Request.Context(), mainStorage, chainId, searchInput)
8081
if err != nil {
8182
log.Error().Err(err).Msg("Error executing search")
8283
api.InternalErrorHandler(c)
@@ -124,7 +125,7 @@ func isValidHashWithLength(input string, length int) bool {
124125
return false
125126
}
126127

127-
func executeSearch(storage storage.IMainStorage, chainId *big.Int, input SearchInput) (SearchResultModel, error) {
128+
func executeSearch(ctx context.Context, storage storage.IMainStorage, chainId *big.Int, input SearchInput) (SearchResultModel, error) {
128129
switch {
129130
case input.BlockNumber != nil:
130131
block, err := searchByBlockNumber(storage, chainId, input.BlockNumber)
@@ -134,7 +135,7 @@ func executeSearch(storage storage.IMainStorage, chainId *big.Int, input SearchI
134135
return searchByHash(storage, chainId, input.Hash)
135136

136137
case input.Address != "":
137-
return searchByAddress(storage, chainId, input.Address)
138+
return searchByAddress(ctx, storage, chainId, input.Address)
138139

139140
case input.FunctionSignature != "":
140141
transactions, err := searchByFunctionSelectorOptimistically(storage, chainId, input.FunctionSignature)
@@ -329,9 +330,9 @@ func searchByHash(mainStorage storage.IMainStorage, chainId *big.Int, hash strin
329330
}
330331
}
331332

332-
func searchByAddress(mainStorage storage.IMainStorage, chainId *big.Int, address string) (SearchResultModel, error) {
333+
func searchByAddress(ctx context.Context, mainStorage storage.IMainStorage, chainId *big.Int, address string) (SearchResultModel, error) {
333334
searchResult := SearchResultModel{Type: SearchResultTypeAddress}
334-
contractCode, err := checkIfContractHasCode(chainId, address)
335+
contractCode, err := checkIfContractHasCode(ctx, chainId, address)
335336
if err != nil {
336337
return searchResult, err
337338
}
@@ -437,14 +438,14 @@ const (
437438
ContractCodeDoesNotExist
438439
)
439440

440-
func checkIfContractHasCode(chainId *big.Int, address string) (ContractCodeState, error) {
441+
func checkIfContractHasCode(ctx context.Context, chainId *big.Int, address string) (ContractCodeState, error) {
441442
if config.Cfg.API.Thirdweb.ClientId != "" {
442443
rpcUrl := fmt.Sprintf("https://%s.rpc.thirdweb.com/%s", chainId.String(), config.Cfg.API.Thirdweb.ClientId)
443444
r, err := rpc.InitializeSimpleRPCWithUrl(rpcUrl)
444445
if err != nil {
445446
return ContractCodeUnknown, err
446447
}
447-
hasCode, err := r.HasCode(address)
448+
hasCode, err := r.HasCode(ctx, address)
448449
if err != nil {
449450
return ContractCodeUnknown, err
450451
}

internal/orchestrator/chain_tracker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func (ct *ChainTracker) Start(ctx context.Context) {
3535
log.Info().Msg("Chain tracker shutting down")
3636
return
3737
case <-ticker.C:
38-
latestBlockNumber, err := ct.rpc.GetLatestBlockNumber()
38+
latestBlockNumber, err := ct.rpc.GetLatestBlockNumber(ctx)
3939
if err != nil {
4040
log.Error().Err(err).Msg("Error getting latest block number")
4141
continue

internal/orchestrator/committer.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (c *Committer) Start(ctx context.Context) {
6363
return
6464
default:
6565
time.Sleep(interval)
66-
blockDataToCommit, err := c.getSequentialBlockDataToCommit()
66+
blockDataToCommit, err := c.getSequentialBlockDataToCommit(ctx)
6767
if err != nil {
6868
log.Error().Err(err).Msg("Error getting block data to commit")
6969
continue
@@ -72,7 +72,7 @@ func (c *Committer) Start(ctx context.Context) {
7272
log.Debug().Msg("No block data to commit")
7373
continue
7474
}
75-
if err := c.commit(blockDataToCommit); err != nil {
75+
if err := c.commit(ctx, blockDataToCommit); err != nil {
7676
log.Error().Err(err).Msg("Error committing blocks")
7777
}
7878
}
@@ -108,7 +108,7 @@ func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
108108
return blockNumbers, nil
109109
}
110110

111-
func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error) {
111+
func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]common.BlockData, error) {
112112
blocksToCommit, err := c.getBlockNumbersToCommit()
113113
if err != nil {
114114
return nil, fmt.Errorf("error determining blocks to commit: %v", err)
@@ -123,7 +123,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
123123
}
124124
if len(blocksData) == 0 {
125125
log.Warn().Msgf("Committer didn't find the following range in staging: %v - %v", blocksToCommit[0].Int64(), blocksToCommit[len(blocksToCommit)-1].Int64())
126-
c.handleMissingStagingData(blocksToCommit)
126+
c.handleMissingStagingData(ctx, blocksToCommit)
127127
return nil, nil
128128
}
129129

@@ -133,7 +133,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
133133
})
134134

135135
if blocksData[0].Block.Number.Cmp(blocksToCommit[0]) != 0 {
136-
return nil, c.handleGap(blocksToCommit[0], blocksData[0].Block)
136+
return nil, c.handleGap(ctx, blocksToCommit[0], blocksData[0].Block)
137137
}
138138

139139
var sequentialBlockData []common.BlockData
@@ -161,7 +161,7 @@ func (c *Committer) getSequentialBlockDataToCommit() ([]common.BlockData, error)
161161
return sequentialBlockData, nil
162162
}
163163

164-
func (c *Committer) commit(blockData []common.BlockData) error {
164+
func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
165165
blockNumbers := make([]*big.Int, len(blockData))
166166
for i, block := range blockData {
167167
blockNumbers[i] = block.Block.Number
@@ -199,7 +199,7 @@ func (c *Committer) commit(blockData []common.BlockData) error {
199199
return nil
200200
}
201201

202-
func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
202+
func (c *Committer) handleGap(ctx context.Context, expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
203203
// increment the gap counter in prometheus
204204
metrics.GapCounter.Inc()
205205
// record the first missed block number in prometheus
@@ -220,11 +220,11 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
220220
}
221221

222222
log.Debug().Msgf("Polling %d blocks while handling gap: %v", len(missingBlockNumbers), missingBlockNumbers)
223-
poller.Poll(missingBlockNumbers)
223+
poller.Poll(ctx, missingBlockNumbers)
224224
return fmt.Errorf("first block number (%s) in commit batch does not match expected (%s)", actualFirstBlock.Number.String(), expectedStartBlockNumber.String())
225225
}
226226

227-
func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
227+
func (c *Committer) handleMissingStagingData(ctx context.Context, blocksToCommit []*big.Int) {
228228
// Checks if there are any blocks in staging after the current range end
229229
lastStagedBlockNumber, err := c.storage.StagingStorage.GetLastStagedBlockNumber(c.rpc.GetChainID(), blocksToCommit[len(blocksToCommit)-1], big.NewInt(0))
230230
if err != nil {
@@ -242,6 +242,6 @@ func (c *Committer) handleMissingStagingData(blocksToCommit []*big.Int) {
242242
if len(blocksToCommit) > int(poller.blocksPerPoll) {
243243
blocksToPoll = blocksToCommit[:int(poller.blocksPerPoll)]
244244
}
245-
poller.Poll(blocksToPoll)
245+
poller.Poll(ctx, blocksToPoll)
246246
log.Debug().Msgf("Polled %d blocks due to committer detecting them as missing. Range: %s - %s", len(blocksToPoll), blocksToPoll[0].String(), blocksToPoll[len(blocksToPoll)-1].String())
247247
}

internal/orchestrator/committer_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ func TestGetSequentialBlockDataToCommit(t *testing.T) {
254254
BlockNumbers: []*big.Int{big.NewInt(101), big.NewInt(102), big.NewInt(103)},
255255
}).Return(blockData, nil)
256256

257-
result, err := committer.getSequentialBlockDataToCommit()
257+
result, err := committer.getSequentialBlockDataToCommit(context.Background())
258258

259259
assert.NoError(t, err)
260260
assert.NotNil(t, result)
@@ -290,7 +290,7 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) {
290290
BlockNumbers: []*big.Int{big.NewInt(101), big.NewInt(102), big.NewInt(103)},
291291
}).Return(blockData, nil)
292292

293-
result, err := committer.getSequentialBlockDataToCommit()
293+
result, err := committer.getSequentialBlockDataToCommit(context.Background())
294294

295295
assert.NoError(t, err)
296296
assert.NotNil(t, result)
@@ -320,7 +320,7 @@ func TestCommit(t *testing.T) {
320320
mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil)
321321
mockStagingStorage.EXPECT().DeleteStagingData(blockData).Return(nil)
322322

323-
err := committer.commit(blockData)
323+
err := committer.commit(context.Background(), blockData)
324324

325325
assert.NoError(t, err)
326326
}
@@ -343,7 +343,7 @@ func TestHandleGap(t *testing.T) {
343343
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
344344
Blocks: 5,
345345
})
346-
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{
346+
mockRPC.EXPECT().GetFullBlocks(context.Background(), []*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{
347347
{BlockNumber: big.NewInt(100), Data: common.BlockData{Block: common.Block{Number: big.NewInt(100)}}},
348348
{BlockNumber: big.NewInt(101), Data: common.BlockData{Block: common.Block{Number: big.NewInt(101)}}},
349349
{BlockNumber: big.NewInt(102), Data: common.BlockData{Block: common.Block{Number: big.NewInt(102)}}},
@@ -352,7 +352,7 @@ func TestHandleGap(t *testing.T) {
352352
})
353353
mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil)
354354

355-
err := committer.handleGap(expectedStartBlockNumber, actualFirstBlock)
355+
err := committer.handleGap(context.Background(), expectedStartBlockNumber, actualFirstBlock)
356356

357357
assert.Error(t, err)
358358
assert.Contains(t, err.Error(), "first block number (105) in commit batch does not match expected (100)")
@@ -463,7 +463,7 @@ func TestHandleMissingStagingData(t *testing.T) {
463463
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
464464
Blocks: 100,
465465
})
466-
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}).Return([]rpc.GetFullBlockResult{
466+
mockRPC.EXPECT().GetFullBlocks(context.Background(), []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)}).Return([]rpc.GetFullBlockResult{
467467
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
468468
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
469469
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
@@ -482,7 +482,7 @@ func TestHandleMissingStagingData(t *testing.T) {
482482
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
483483
}).Return(blockData, nil)
484484

485-
result, err := committer.getSequentialBlockDataToCommit()
485+
result, err := committer.getSequentialBlockDataToCommit(context.Background())
486486

487487
assert.NoError(t, err)
488488
assert.Nil(t, result)
@@ -509,7 +509,7 @@ func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
509509
mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{
510510
Blocks: 3,
511511
})
512-
mockRPC.EXPECT().GetFullBlocks([]*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2)}).Return([]rpc.GetFullBlockResult{
512+
mockRPC.EXPECT().GetFullBlocks(context.Background(), []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2)}).Return([]rpc.GetFullBlockResult{
513513
{BlockNumber: big.NewInt(0), Data: common.BlockData{Block: common.Block{Number: big.NewInt(0)}}},
514514
{BlockNumber: big.NewInt(1), Data: common.BlockData{Block: common.Block{Number: big.NewInt(1)}}},
515515
{BlockNumber: big.NewInt(2), Data: common.BlockData{Block: common.Block{Number: big.NewInt(2)}}},
@@ -526,7 +526,7 @@ func TestHandleMissingStagingDataIsPolledWithCorrectBatchSize(t *testing.T) {
526526
BlockNumbers: []*big.Int{big.NewInt(0), big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)},
527527
}).Return(blockData, nil)
528528

529-
result, err := committer.getSequentialBlockDataToCommit()
529+
result, err := committer.getSequentialBlockDataToCommit(context.Background())
530530

531531
assert.NoError(t, err)
532532
assert.Nil(t, result)

internal/orchestrator/failure_recoverer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (fr *FailureRecoverer) Start(ctx context.Context) {
7575
// Trigger worker for recovery
7676
log.Debug().Msgf("Triggering Failure Recoverer for blocks: %v", blocksToTrigger)
7777
worker := worker.NewWorker(fr.rpc)
78-
results := worker.Run(blocksToTrigger)
78+
results := worker.Run(ctx, blocksToTrigger)
7979
fr.handleWorkerResults(blockFailures, results)
8080

8181
// Track recovery activity

internal/orchestrator/poller.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (p *Poller) Start(ctx context.Context) {
103103
return
104104
}
105105
blockRangeMutex.Lock()
106-
blockNumbers, err := p.getNextBlockRange()
106+
blockNumbers, err := p.getNextBlockRange(pollCtx)
107107
blockRangeMutex.Unlock()
108108

109109
if pollCtx.Err() != nil {
@@ -117,7 +117,7 @@ func (p *Poller) Start(ctx context.Context) {
117117
continue
118118
}
119119

120-
lastPolledBlock := p.Poll(blockNumbers)
120+
lastPolledBlock := p.Poll(pollCtx, blockNumbers)
121121
if p.reachedPollLimit(lastPolledBlock) {
122122
log.Debug().Msg("Reached poll limit, exiting poller")
123123
cancel()
@@ -146,7 +146,7 @@ func (p *Poller) Start(ctx context.Context) {
146146
}
147147
}
148148

149-
func (p *Poller) Poll(blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
149+
func (p *Poller) Poll(ctx context.Context, blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
150150
if len(blockNumbers) < 1 {
151151
log.Debug().Msg("No blocks to poll, skipping")
152152
return
@@ -161,7 +161,7 @@ func (p *Poller) Poll(blockNumbers []*big.Int) (lastPolledBlock *big.Int) {
161161
metrics.PollerLastTriggeredBlock.Set(endBlockNumberFloat)
162162

163163
worker := worker.NewWorker(p.rpc)
164-
results := worker.Run(blockNumbers)
164+
results := worker.Run(ctx, blockNumbers)
165165
p.handleWorkerResults(results)
166166
return endBlock
167167
}
@@ -170,8 +170,8 @@ func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
170170
return blockNumber == nil || (p.pollUntilBlock.Sign() > 0 && blockNumber.Cmp(p.pollUntilBlock) >= 0)
171171
}
172172

173-
func (p *Poller) getNextBlockRange() ([]*big.Int, error) {
174-
latestBlock, err := p.rpc.GetLatestBlockNumber()
173+
func (p *Poller) getNextBlockRange(ctx context.Context) ([]*big.Int, error) {
174+
latestBlock, err := p.rpc.GetLatestBlockNumber(ctx)
175175
if err != nil {
176176
return nil, err
177177
}

internal/orchestrator/reorg_handler.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (rh *ReorgHandler) Start(ctx context.Context) {
8383
rh.publisher.Close()
8484
return
8585
case <-ticker.C:
86-
mostRecentBlockChecked, err := rh.RunFromBlock(rh.lastCheckedBlock)
86+
mostRecentBlockChecked, err := rh.RunFromBlock(ctx, rh.lastCheckedBlock)
8787
if err != nil {
8888
log.Error().Err(err).Msgf("Error during reorg handling: %s", err.Error())
8989
continue
@@ -99,7 +99,7 @@ func (rh *ReorgHandler) Start(ctx context.Context) {
9999
}
100100
}
101101

102-
func (rh *ReorgHandler) RunFromBlock(latestCheckedBlock *big.Int) (lastCheckedBlock *big.Int, err error) {
102+
func (rh *ReorgHandler) RunFromBlock(ctx context.Context, latestCheckedBlock *big.Int) (lastCheckedBlock *big.Int, err error) {
103103
fromBlock, toBlock, err := rh.getReorgCheckRange(latestCheckedBlock)
104104
if err != nil {
105105
return nil, err
@@ -130,7 +130,7 @@ func (rh *ReorgHandler) RunFromBlock(latestCheckedBlock *big.Int) (lastCheckedBl
130130

131131
metrics.ReorgCounter.Inc()
132132
reorgedBlockNumbers := make([]*big.Int, 0)
133-
err = rh.findReorgedBlockNumbers(blockHeaders[firstMismatchIndex:], &reorgedBlockNumbers)
133+
err = rh.findReorgedBlockNumbers(ctx, blockHeaders[firstMismatchIndex:], &reorgedBlockNumbers)
134134
if err != nil {
135135
return nil, fmt.Errorf("error finding reorged block numbers: %w", err)
136136
}
@@ -140,7 +140,7 @@ func (rh *ReorgHandler) RunFromBlock(latestCheckedBlock *big.Int) (lastCheckedBl
140140
return mostRecentBlockHeader.Number, nil
141141
}
142142

143-
err = rh.handleReorg(reorgedBlockNumbers)
143+
err = rh.handleReorg(ctx, reorgedBlockNumbers)
144144
if err != nil {
145145
return nil, fmt.Errorf("error while handling reorg: %w", err)
146146
}
@@ -190,8 +190,8 @@ func findIndexOfFirstHashMismatch(blockHeadersDescending []common.BlockHeader) (
190190
return -1, nil
191191
}
192192

193-
func (rh *ReorgHandler) findReorgedBlockNumbers(blockHeadersDescending []common.BlockHeader, reorgedBlockNumbers *[]*big.Int) error {
194-
newBlocksByNumber, err := rh.getNewBlocksByNumber(blockHeadersDescending)
193+
func (rh *ReorgHandler) findReorgedBlockNumbers(ctx context.Context, blockHeadersDescending []common.BlockHeader, reorgedBlockNumbers *[]*big.Int) error {
194+
newBlocksByNumber, err := rh.getNewBlocksByNumber(ctx, blockHeadersDescending)
195195
if err != nil {
196196
return err
197197
}
@@ -219,12 +219,12 @@ func (rh *ReorgHandler) findReorgedBlockNumbers(blockHeadersDescending []common.
219219
sort.Slice(nextHeadersBatch, func(i, j int) bool {
220220
return nextHeadersBatch[i].Number.Cmp(nextHeadersBatch[j].Number) > 0
221221
})
222-
return rh.findReorgedBlockNumbers(nextHeadersBatch, reorgedBlockNumbers)
222+
return rh.findReorgedBlockNumbers(ctx, nextHeadersBatch, reorgedBlockNumbers)
223223
}
224224
return nil
225225
}
226226

227-
func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader) (map[string]common.Block, error) {
227+
func (rh *ReorgHandler) getNewBlocksByNumber(ctx context.Context, blockHeaders []common.BlockHeader) (map[string]common.Block, error) {
228228
blockNumbers := make([]*big.Int, 0, len(blockHeaders))
229229
for _, header := range blockHeaders {
230230
blockNumbers = append(blockNumbers, header.Number)
@@ -241,7 +241,7 @@ func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader)
241241
wg.Add(1)
242242
go func(chunk []*big.Int) {
243243
defer wg.Done()
244-
resultsCh <- rh.rpc.GetBlocks(chunk)
244+
resultsCh <- rh.rpc.GetBlocks(ctx, chunk)
245245
if config.Cfg.RPC.Blocks.BatchDelay > 0 {
246246
time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
247247
}
@@ -264,9 +264,9 @@ func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader)
264264
return fetchedBlocksByNumber, nil
265265
}
266266

267-
func (rh *ReorgHandler) handleReorg(reorgedBlockNumbers []*big.Int) error {
267+
func (rh *ReorgHandler) handleReorg(ctx context.Context, reorgedBlockNumbers []*big.Int) error {
268268
log.Debug().Msgf("Handling reorg for blocks %v", reorgedBlockNumbers)
269-
results := rh.worker.Run(reorgedBlockNumbers)
269+
results := rh.worker.Run(ctx, reorgedBlockNumbers)
270270
data := make([]common.BlockData, 0, len(results))
271271
blocksToDelete := make([]*big.Int, 0, len(results))
272272
for _, result := range results {

0 commit comments

Comments
 (0)