Skip to content

Commit 92af676

Browse files
authored
Merge pull request #206 from thirdweb-dev/05-21-limit_concurrent_rpc_requests_in_worker
limit concurrent rpc requests in worker
2 parents b32c54a + aff8413 commit 92af676

File tree

1 file changed

+16
-11
lines changed

1 file changed

+16
-11
lines changed

internal/worker/worker.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,17 @@ func NewWorker(rpc rpc.IRPCClient) *Worker {
2424
}
2525
}
2626

27-
func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, resultsCh chan<- []rpc.GetFullBlockResult) {
27+
func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, resultsCh chan<- []rpc.GetFullBlockResult, sem chan struct{}) {
2828
select {
2929
case <-ctx.Done():
3030
return
3131
default:
3232
}
3333

34-
defer func() {
35-
time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
36-
}()
37-
38-
// Try with current chunk size
34+
// Acquire semaphore only for the RPC request
35+
sem <- struct{}{}
3936
results := w.rpc.GetFullBlocks(ctx, chunk)
37+
<-sem // Release semaphore immediately after RPC request
4038

4139
if len(chunk) == 1 {
4240
// chunk size 1 is the minimum, so we return whatever we get
@@ -56,6 +54,7 @@ func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, re
5654
}
5755
}
5856

57+
log.Debug().Msgf("Out of %d blocks, %d successful, %d failed", len(results), len(successfulResults), len(failedBlocks))
5958
// If we have successful results, send them
6059
if len(successfulResults) > 0 {
6160
resultsCh <- successfulResults
@@ -68,7 +67,7 @@ func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, re
6867

6968
// can't split any further, so try one last time
7069
if len(failedBlocks) == 1 {
71-
w.processChunkWithRetry(ctx, failedBlocks, resultsCh)
70+
w.processChunkWithRetry(ctx, failedBlocks, resultsCh, sem)
7271
return
7372
}
7473

@@ -84,12 +83,12 @@ func (w *Worker) processChunkWithRetry(ctx context.Context, chunk []*big.Int, re
8483

8584
go func() {
8685
defer wg.Done()
87-
w.processChunkWithRetry(ctx, leftChunk, resultsCh)
86+
w.processChunkWithRetry(ctx, leftChunk, resultsCh, sem)
8887
}()
8988

9089
go func() {
9190
defer wg.Done()
92-
w.processChunkWithRetry(ctx, rightChunk, resultsCh)
91+
w.processChunkWithRetry(ctx, rightChunk, resultsCh, sem)
9392
}()
9493

9594
wg.Wait()
@@ -102,9 +101,15 @@ func (w *Worker) Run(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFull
102101
var wg sync.WaitGroup
103102
resultsCh := make(chan []rpc.GetFullBlockResult, blockCount)
104103

104+
// Create a semaphore channel to limit concurrent goroutines
105+
sem := make(chan struct{}, 20)
106+
105107
log.Debug().Msgf("Worker Processing %d blocks in %d chunks of max %d blocks", blockCount, len(chunks), w.rpc.GetBlocksPerRequest().Blocks)
106108

107-
for _, chunk := range chunks {
109+
for i, chunk := range chunks {
110+
if i > 0 {
111+
time.Sleep(time.Duration(config.Cfg.RPC.Blocks.BatchDelay) * time.Millisecond)
112+
}
108113
select {
109114
case <-ctx.Done():
110115
log.Debug().Msg("Context canceled, stopping Worker")
@@ -116,7 +121,7 @@ func (w *Worker) Run(ctx context.Context, blockNumbers []*big.Int) []rpc.GetFull
116121
wg.Add(1)
117122
go func(chunk []*big.Int) {
118123
defer wg.Done()
119-
w.processChunkWithRetry(ctx, chunk, resultsCh)
124+
w.processChunkWithRetry(ctx, chunk, resultsCh, sem)
120125
}(chunk)
121126
}
122127

0 commit comments

Comments
 (0)