-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
110 lines (90 loc) · 2.64 KB
/
main.go
File metadata and controls
110 lines (90 loc) · 2.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package main
import (
"context"
"fmt"
"time"
"github.com/davidroman0O/retrypool"
)
// Define the input and output types
type RequestData struct {
ID int
}
type ResponseData struct {
Result string
}
// Implement the Worker interface
type ResponseWorker struct{}
func (w *ResponseWorker) Run(ctx context.Context, data *retrypool.RequestResponse[RequestData, ResponseData]) error {
// Simulate processing time
time.Sleep(time.Millisecond * 200)
var id int
data.ConsultRequest(func(rd RequestData) error {
id = rd.ID
return nil
})
// Simulate an error for a specific ID
if id == 3 {
err := fmt.Errorf("failed to process request ID: %d", id)
data.CompleteWithError(err)
return nil // Return nil to avoid retrying
}
// Create a response
response := ResponseData{
Result: fmt.Sprintf("Processed request ID: %d", id),
}
// Complete the request with the response
data.Complete(response)
return nil
}
func main() {
ctx := context.Background()
// Create a pool with the ResponseWorker
pool := retrypool.New[*retrypool.RequestResponse[RequestData, ResponseData]](
ctx,
[]retrypool.Worker[*retrypool.RequestResponse[RequestData, ResponseData]]{&ResponseWorker{}},
retrypool.WithAttempts[*retrypool.RequestResponse[RequestData, ResponseData]](3),
)
// Number of requests to submit
numRequests := 5
// Slice to hold references to the requests
requests := make([]*retrypool.RequestResponse[RequestData, ResponseData], numRequests)
for i := 1; i <= numRequests; i++ {
// Create a new RequestResponse instance
req := retrypool.NewRequestResponse[RequestData, ResponseData](RequestData{ID: i})
requests[i-1] = req
// Submit the request to the pool
err := pool.Submit(req)
if err != nil {
req.ConsultRequest(func(rd RequestData) error {
fmt.Printf("Error submitting request ID %d: %v\n", rd.ID, err)
return nil
})
continue
}
}
// Wait for all requests to complete and collect responses
for _, req := range requests {
go func(r *retrypool.RequestResponse[RequestData, ResponseData]) {
// Wait with a timeout
ctx, cancel := context.WithTimeout(ctx, time.Second*2)
defer cancel()
resp, err := r.Wait(ctx)
if err != nil {
r.ConsultRequest(func(rd RequestData) error {
fmt.Printf("Request ID %d failed: %v\n", rd.ID, err)
return nil
})
} else {
r.ConsultRequest(func(rd RequestData) error {
fmt.Printf("Request ID %d succeeded: %s\n", rd.ID, resp.Result)
return nil
})
}
}(req)
}
pool.WaitWithCallback(ctx, func(queueSize, processingCount, deadTaskCount int) bool {
return queueSize > 0 || processingCount > 0
}, time.Second)
// Close the pool
pool.Close()
}