Skip to content

Commit 54d80b8

Browse files
committed
✨ feat: ⚡ add worker pool sample
✨ add worker pool sample
0 parents  commit 54d80b8

File tree

10 files changed

+485
-0
lines changed

10 files changed

+485
-0
lines changed

.github/workflows/go.yml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
name: Go
2+
3+
on:
4+
push:
5+
branches:
6+
- master
7+
pull_request:
8+
branches:
9+
- master
10+
jobs:
11+
build_and_test:
12+
runs-on: ubuntu-latest
13+
steps:
14+
- name: Setup go-task
15+
uses: pnorton5432/setup-task@v1
16+
with:
17+
task-version: 3.29.1
18+
- name: Checkout
19+
uses: actions/checkout@v4
20+
- name: Setup Go
21+
uses: actions/setup-go@v5
22+
with:
23+
go-version: 'stable'
24+
check-latest: true
25+
- name: Task Build
26+
run: task build
27+
- name: Task Build for mage
28+
run: task build-gg
29+
- name: Test with gg build
30+
run: ./gg build

.gitignore

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# If you prefer the allow list template instead of the deny list, see community template:
2+
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
3+
#
4+
# Binaries for programs and plugins
5+
*.exe
6+
*.exe~
7+
*.dll
8+
*.so
9+
*.dylib
10+
11+
# Test binary, built with `go test -c`
12+
*.test
13+
14+
# Output of the go coverage tool, specifically when used with LiteIDE
15+
*.out
16+
17+
# Dependency directories (remove the comment below to include it)
18+
# vendor/
19+
20+
# Go workspace file
21+
go.work
22+
go.work.sum
23+
24+
# env file
25+
.env
26+
gg
27+
bin
28+
mage

README.md

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# golang-worker-pool-management-with-tunny
2+
3+
This repository is demo how to use tunny library to implementing worker-pool management
4+
5+
## implementation
6+
7+
1. simple worker pool with create function
8+
9+
```golang
10+
package main
11+
12+
import (
13+
"errors"
14+
"fmt"
15+
"runtime"
16+
"time"
17+
18+
"github.com/Jeffail/tunny"
19+
)
20+
21+
func SendEmail(email string, subject string, body string) {
22+
fmt.Printf("Sending email to %s\n", email)
23+
fmt.Printf("Subject %s\n Body: %s\n", subject, body)
24+
// Simulate sending email
25+
time.Sleep(2 * time.Second)
26+
}
27+
28+
func main() {
29+
numCPUs := runtime.NumCPU()
30+
fmt.Printf("Number of CPUs: %d\n\n", numCPUs)
31+
32+
pool := tunny.NewFunc(numCPUs, func(payload any) any {
33+
m, ok := payload.(map[string]any)
34+
if !ok {
35+
return errors.New("unable to extract map")
36+
}
37+
38+
// Extract the fields
39+
email, ok := m["email"].(string)
40+
if !ok {
41+
return errors.New("email field is missing or not a string")
42+
}
43+
44+
subject, ok := m["subject"].(string)
45+
if !ok {
46+
return errors.New("subject field is missing or not a string")
47+
}
48+
49+
body, ok := m["body"].(string)
50+
if !ok {
51+
return errors.New("body field is missing or not a string")
52+
}
53+
54+
SendEmail(email, subject, body)
55+
return nil
56+
})
57+
defer pool.Close()
58+
59+
for i := 0; i < 100; i++ {
60+
var data any = map[string]any{
61+
"email": fmt.Sprintf("email%d@sample.io", i+1),
62+
"subject": "Welcome",
63+
"body": "Thank you for signing up",
64+
}
65+
go func() {
66+
result := pool.Process(data)
67+
if result == nil {
68+
fmt.Println("Mail sent!")
69+
}
70+
}()
71+
}
72+
73+
for {
74+
qLen := pool.QueueLength()
75+
fmt.Printf("----------------- Queue Length: %d\n", qLen)
76+
if qLen == 0 {
77+
break
78+
}
79+
time.Sleep(1 * time.Second)
80+
}
81+
time.Sleep(3 * time.Second)
82+
}
83+
84+
```
85+
86+
2. manage state by create worker structure with each state hook
87+
88+
```golang
89+
package main
90+
91+
import (
92+
"fmt"
93+
"runtime"
94+
"time"
95+
96+
"github.com/Jeffail/tunny"
97+
)
98+
99+
type myWorker struct {
100+
jobID int
101+
state string
102+
}
103+
104+
func (w myWorker) Process(payload any) any {
105+
w.jobID, _ = payload.(int)
106+
w.state = "processing"
107+
fmt.Printf("Processing job %v, state: %s\n", payload, w.state)
108+
time.Sleep(2 * time.Second)
109+
return nil
110+
}
111+
func (w myWorker) BlockUntilReady() {
112+
w.state = "starting"
113+
fmt.Printf("State: %s\n", w.state)
114+
time.Sleep(10 * time.Millisecond)
115+
}
116+
117+
func (w myWorker) Interrupt() {
118+
w.state = "interrputed"
119+
fmt.Printf("State: %s\n", w.state)
120+
time.Sleep(10 * time.Millisecond)
121+
}
122+
123+
func (w myWorker) Terminate() {
124+
w.state = "terminated"
125+
fmt.Printf("State: %s\n", w.state)
126+
}
127+
128+
func main() {
129+
numCPUs := runtime.NumCPU()
130+
pool := tunny.New(numCPUs, func() tunny.Worker {
131+
return myWorker{}
132+
})
133+
defer pool.Close()
134+
135+
for i := 0; i < 10; i++ {
136+
go func() {
137+
var data any = i
138+
result := pool.Process(data)
139+
if result == nil {
140+
fmt.Println("success!")
141+
} else {
142+
fmt.Println("failure!")
143+
}
144+
}()
145+
}
146+
for {
147+
qLen := pool.QueueLength()
148+
fmt.Printf("------------------- Queue Length: %d\n", qLen)
149+
if qLen == 0 {
150+
break
151+
}
152+
time.Sleep(1 * time.Second)
153+
}
154+
155+
time.Sleep(5 * time.Second)
156+
fmt.Println("Done!")
157+
}
158+
159+
```

Taskfile.yml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
version: '3'
2+
3+
tasks:
4+
default:
5+
cmds:
6+
- echo "This is task cmd"
7+
silent: true
8+
9+
build:
10+
cmds:
11+
- CGO_ENABLED=0 GOOS=linux go build -o bin/simple cmd/simple/main.go
12+
silent: true
13+
run:
14+
cmds:
15+
- ./bin/simple
16+
deps:
17+
- build
18+
silent: true
19+
20+
build-mage:
21+
cmds:
22+
- CGO_ENABLED=0 GOOS=linux go build -o ./mage mage-tools/mage.go
23+
silent: true
24+
25+
build-gg:
26+
cmds:
27+
- ./mage -d mage-tools -compile ../gg
28+
deps:
29+
- build-mage
30+
silent: true
31+
32+
coverage:
33+
cmds:
34+
- go test -v -cover ./...
35+
silent: true
36+
test:
37+
cmds:
38+
- go test -v ./...
39+
silent: true
40+

cmd/simple/main.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package main
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"runtime"
7+
"time"
8+
9+
"github.com/Jeffail/tunny"
10+
)
11+
12+
func SendEmail(email string, subject string, body string) {
13+
fmt.Printf("Sending email to %s\n", email)
14+
fmt.Printf("Subject %s\n Body: %s\n", subject, body)
15+
// Simulate sending email
16+
time.Sleep(2 * time.Second)
17+
}
18+
19+
func main() {
20+
numCPUs := runtime.NumCPU()
21+
fmt.Printf("Number of CPUs: %d\n\n", numCPUs)
22+
23+
pool := tunny.NewFunc(numCPUs, func(payload any) any {
24+
m, ok := payload.(map[string]any)
25+
if !ok {
26+
return errors.New("unable to extract map")
27+
}
28+
29+
// Extract the fields
30+
email, ok := m["email"].(string)
31+
if !ok {
32+
return errors.New("email field is missing or not a string")
33+
}
34+
35+
subject, ok := m["subject"].(string)
36+
if !ok {
37+
return errors.New("subject field is missing or not a string")
38+
}
39+
40+
body, ok := m["body"].(string)
41+
if !ok {
42+
return errors.New("body field is missing or not a string")
43+
}
44+
45+
SendEmail(email, subject, body)
46+
return nil
47+
})
48+
defer pool.Close()
49+
50+
for i := 0; i < 100; i++ {
51+
var data any = map[string]any{
52+
"email": fmt.Sprintf("email%[email protected]", i+1),
53+
"subject": "Welcome",
54+
"body": "Thank you for signing up",
55+
}
56+
go func() {
57+
result := pool.Process(data)
58+
if result == nil {
59+
fmt.Println("Mail sent!")
60+
}
61+
}()
62+
}
63+
64+
for {
65+
qLen := pool.QueueLength()
66+
fmt.Printf("----------------- Queue Length: %d\n", qLen)
67+
if qLen == 0 {
68+
break
69+
}
70+
time.Sleep(1 * time.Second)
71+
}
72+
time.Sleep(3 * time.Second)
73+
}

0 commit comments

Comments
 (0)