Skip to content
This repository was archived by the owner on Sep 28, 2022. It is now read-only.

Commit d8d2108

Browse files
committed
add egpool to limit number of live goroutines in errgroup
use in importbatch
1 parent 4b65f78 commit d8d2108

File tree

3 files changed

+100
-4
lines changed

3 files changed

+100
-4
lines changed

egpool/egpool.go

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package egpool
2+
3+
import (
4+
"sync"
5+
6+
"golang.org/x/sync/errgroup"
7+
)
8+
9+
type Group struct {
10+
PoolSize int
11+
12+
jobs chan func() error
13+
14+
errMu sync.Mutex
15+
firstErr error
16+
poolEG errgroup.Group
17+
}
18+
19+
func (eg *Group) Go(f func() error) {
20+
if eg.PoolSize == 0 {
21+
eg.PoolSize = 1
22+
}
23+
24+
if eg.jobs == nil {
25+
eg.jobs = make(chan func() error, eg.PoolSize*2)
26+
eg.startProcessJobsPool()
27+
}
28+
29+
eg.jobs <- f
30+
}
31+
32+
func (eg *Group) startProcessJobsPool() {
33+
eg.poolEG = errgroup.Group{}
34+
for i := 0; i < eg.PoolSize; i++ {
35+
eg.poolEG.Go(eg.processJobs)
36+
}
37+
}
38+
39+
func (eg *Group) processJobs() error {
40+
for jobFn := range eg.jobs {
41+
err := jobFn()
42+
if err != nil {
43+
eg.errMu.Lock()
44+
if eg.firstErr == nil {
45+
eg.firstErr = err
46+
}
47+
eg.errMu.Unlock()
48+
}
49+
}
50+
return nil
51+
}
52+
53+
func (eg *Group) Wait() error {
54+
if eg.jobs == nil {
55+
return nil
56+
}
57+
close(eg.jobs)
58+
_ = eg.poolEG.Wait() // never returns err
59+
return eg.firstErr
60+
}

egpool/egpool_test.go

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package egpool_test
2+
3+
import (
4+
"errors"
5+
"testing"
6+
7+
"github.com/pilosa/go-pilosa/egpool"
8+
)
9+
10+
func TestEGPool(t *testing.T) {
11+
eg := egpool.Group{}
12+
13+
a := make([]int, 10)
14+
15+
for i := 0; i < 10; i++ {
16+
i := i
17+
eg.Go(func() error {
18+
a[i] = i
19+
if i == 7 {
20+
return errors.New("blah")
21+
}
22+
return nil
23+
})
24+
}
25+
26+
err := eg.Wait()
27+
if err == nil || err.Error() != "blah" {
28+
t.Errorf("expected err blah, got: %v", err)
29+
}
30+
31+
for i := 0; i < 10; i++ {
32+
if a[i] != i {
33+
t.Errorf("expected a[%d] to be %d, but is %d", i, i, a[i])
34+
}
35+
}
36+
}

gpexp/importbatch.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import (
44
"time"
55

66
"github.com/pilosa/go-pilosa"
7+
"github.com/pilosa/go-pilosa/egpool"
78
"github.com/pilosa/pilosa/roaring"
89
"github.com/pkg/errors"
9-
"golang.org/x/sync/errgroup"
1010
)
1111

1212
// TODO if using column translation, column ids might get way out of
@@ -633,7 +633,7 @@ func (b *Batch) doTranslation() error {
633633
}
634634

635635
func (b *Batch) doImport() error {
636-
eg := errgroup.Group{}
636+
eg := egpool.Group{PoolSize: 50}
637637

638638
frags, clearFrags, err := b.makeFragments()
639639
if err != nil {
@@ -792,7 +792,7 @@ func (b *Batch) importValueData() error {
792792
if shardWidth == 0 {
793793
shardWidth = pilosa.DefaultShardWidth
794794
}
795-
eg := errgroup.Group{}
795+
eg := egpool.Group{PoolSize: 50}
796796
ids := make([]uint64, len(b.ids))
797797
for field, values := range b.values {
798798
// grow our temp ids slice to full length
@@ -864,7 +864,7 @@ func (b *Batch) importMutexData() error {
864864
if shardWidth == 0 {
865865
shardWidth = pilosa.DefaultShardWidth
866866
}
867-
eg := errgroup.Group{}
867+
eg := egpool.Group{PoolSize: 50}
868868
ids := make([]uint64, 0, len(b.ids))
869869

870870
for findex, rowIDs := range b.rowIDs {

0 commit comments

Comments
 (0)