Skip to content

Commit d731ff3

Browse files
committed
Fix bug in parallel query
1 parent b71ee71 commit d731ff3

File tree

2 files changed

+115
-0
lines changed

2 files changed

+115
-0
lines changed

internal/gen/view.tgo

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,15 @@ func (v *View{{len $element}}[{{join $element ","}}]) MapIdParallel(lambda func(
163163
totalWork += len(lookup.id) // - len(lookup.holes)
164164
}
165165

166+
// Nothing to do if there is no work
167+
if totalWork == 0 { return }
168+
166169
// 2. Calculate number of threads to execute with
167170
numThreads := runtime.NumCPU()
171+
172+
// Ensure that the number of threads we plan to use is <= total amount of work
173+
numThreads = min(totalWork, numThreads)
174+
168175
var waitGroup sync.WaitGroup
169176

170177
type workItem struct{

view_gen.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,17 @@ func (v *View1[A]) MapIdParallel(lambda func(id Id, a *A)) {
158158
totalWork += len(lookup.id) // - len(lookup.holes)
159159
}
160160

161+
// Nothing to do if there is no work
162+
if totalWork == 0 {
163+
return
164+
}
165+
161166
// 2. Calculate number of threads to execute with
162167
numThreads := runtime.NumCPU()
168+
169+
// Ensure that the number of threads we plan to use is <= total amount of work
170+
numThreads = min(totalWork, numThreads)
171+
163172
var waitGroup sync.WaitGroup
164173

165174
type workItem struct {
@@ -459,8 +468,17 @@ func (v *View2[A, B]) MapIdParallel(lambda func(id Id, a *A, b *B)) {
459468
totalWork += len(lookup.id) // - len(lookup.holes)
460469
}
461470

471+
// Nothing to do if there is no work
472+
if totalWork == 0 {
473+
return
474+
}
475+
462476
// 2. Calculate number of threads to execute with
463477
numThreads := runtime.NumCPU()
478+
479+
// Ensure that the number of threads we plan to use is <= total amount of work
480+
numThreads = min(totalWork, numThreads)
481+
464482
var waitGroup sync.WaitGroup
465483

466484
type workItem struct {
@@ -806,8 +824,17 @@ func (v *View3[A, B, C]) MapIdParallel(lambda func(id Id, a *A, b *B, c *C)) {
806824
totalWork += len(lookup.id) // - len(lookup.holes)
807825
}
808826

827+
// Nothing to do if there is no work
828+
if totalWork == 0 {
829+
return
830+
}
831+
809832
// 2. Calculate number of threads to execute with
810833
numThreads := runtime.NumCPU()
834+
835+
// Ensure that the number of threads we plan to use is <= total amount of work
836+
numThreads = min(totalWork, numThreads)
837+
811838
var waitGroup sync.WaitGroup
812839

813840
type workItem struct {
@@ -1199,8 +1226,17 @@ func (v *View4[A, B, C, D]) MapIdParallel(lambda func(id Id, a *A, b *B, c *C, d
11991226
totalWork += len(lookup.id) // - len(lookup.holes)
12001227
}
12011228

1229+
// Nothing to do if there is no work
1230+
if totalWork == 0 {
1231+
return
1232+
}
1233+
12021234
// 2. Calculate number of threads to execute with
12031235
numThreads := runtime.NumCPU()
1236+
1237+
// Ensure that the number of threads we plan to use is <= total amount of work
1238+
numThreads = min(totalWork, numThreads)
1239+
12041240
var waitGroup sync.WaitGroup
12051241

12061242
type workItem struct {
@@ -1638,8 +1674,17 @@ func (v *View5[A, B, C, D, E]) MapIdParallel(lambda func(id Id, a *A, b *B, c *C
16381674
totalWork += len(lookup.id) // - len(lookup.holes)
16391675
}
16401676

1677+
// Nothing to do if there is no work
1678+
if totalWork == 0 {
1679+
return
1680+
}
1681+
16411682
// 2. Calculate number of threads to execute with
16421683
numThreads := runtime.NumCPU()
1684+
1685+
// Ensure that the number of threads we plan to use is <= total amount of work
1686+
numThreads = min(totalWork, numThreads)
1687+
16431688
var waitGroup sync.WaitGroup
16441689

16451690
type workItem struct {
@@ -2123,8 +2168,17 @@ func (v *View6[A, B, C, D, E, F]) MapIdParallel(lambda func(id Id, a *A, b *B, c
21232168
totalWork += len(lookup.id) // - len(lookup.holes)
21242169
}
21252170

2171+
// Nothing to do if there is no work
2172+
if totalWork == 0 {
2173+
return
2174+
}
2175+
21262176
// 2. Calculate number of threads to execute with
21272177
numThreads := runtime.NumCPU()
2178+
2179+
// Ensure that the number of threads we plan to use is <= total amount of work
2180+
numThreads = min(totalWork, numThreads)
2181+
21282182
var waitGroup sync.WaitGroup
21292183

21302184
type workItem struct {
@@ -2654,8 +2708,17 @@ func (v *View7[A, B, C, D, E, F, G]) MapIdParallel(lambda func(id Id, a *A, b *B
26542708
totalWork += len(lookup.id) // - len(lookup.holes)
26552709
}
26562710

2711+
// Nothing to do if there is no work
2712+
if totalWork == 0 {
2713+
return
2714+
}
2715+
26572716
// 2. Calculate number of threads to execute with
26582717
numThreads := runtime.NumCPU()
2718+
2719+
// Ensure that the number of threads we plan to use is <= total amount of work
2720+
numThreads = min(totalWork, numThreads)
2721+
26592722
var waitGroup sync.WaitGroup
26602723

26612724
type workItem struct {
@@ -3231,8 +3294,17 @@ func (v *View8[A, B, C, D, E, F, G, H]) MapIdParallel(lambda func(id Id, a *A, b
32313294
totalWork += len(lookup.id) // - len(lookup.holes)
32323295
}
32333296

3297+
// Nothing to do if there is no work
3298+
if totalWork == 0 {
3299+
return
3300+
}
3301+
32343302
// 2. Calculate number of threads to execute with
32353303
numThreads := runtime.NumCPU()
3304+
3305+
// Ensure that the number of threads we plan to use is <= total amount of work
3306+
numThreads = min(totalWork, numThreads)
3307+
32363308
var waitGroup sync.WaitGroup
32373309

32383310
type workItem struct {
@@ -3854,8 +3926,17 @@ func (v *View9[A, B, C, D, E, F, G, H, I]) MapIdParallel(lambda func(id Id, a *A
38543926
totalWork += len(lookup.id) // - len(lookup.holes)
38553927
}
38563928

3929+
// Nothing to do if there is no work
3930+
if totalWork == 0 {
3931+
return
3932+
}
3933+
38573934
// 2. Calculate number of threads to execute with
38583935
numThreads := runtime.NumCPU()
3936+
3937+
// Ensure that the number of threads we plan to use is <= total amount of work
3938+
numThreads = min(totalWork, numThreads)
3939+
38593940
var waitGroup sync.WaitGroup
38603941

38613942
type workItem struct {
@@ -4523,8 +4604,17 @@ func (v *View10[A, B, C, D, E, F, G, H, I, J]) MapIdParallel(lambda func(id Id,
45234604
totalWork += len(lookup.id) // - len(lookup.holes)
45244605
}
45254606

4607+
// Nothing to do if there is no work
4608+
if totalWork == 0 {
4609+
return
4610+
}
4611+
45264612
// 2. Calculate number of threads to execute with
45274613
numThreads := runtime.NumCPU()
4614+
4615+
// Ensure that the number of threads we plan to use is <= total amount of work
4616+
numThreads = min(totalWork, numThreads)
4617+
45284618
var waitGroup sync.WaitGroup
45294619

45304620
type workItem struct {
@@ -5238,8 +5328,17 @@ func (v *View11[A, B, C, D, E, F, G, H, I, J, K]) MapIdParallel(lambda func(id I
52385328
totalWork += len(lookup.id) // - len(lookup.holes)
52395329
}
52405330

5331+
// Nothing to do if there is no work
5332+
if totalWork == 0 {
5333+
return
5334+
}
5335+
52415336
// 2. Calculate number of threads to execute with
52425337
numThreads := runtime.NumCPU()
5338+
5339+
// Ensure that the number of threads we plan to use is <= total amount of work
5340+
numThreads = min(totalWork, numThreads)
5341+
52435342
var waitGroup sync.WaitGroup
52445343

52455344
type workItem struct {
@@ -5999,8 +6098,17 @@ func (v *View12[A, B, C, D, E, F, G, H, I, J, K, L]) MapIdParallel(lambda func(i
59996098
totalWork += len(lookup.id) // - len(lookup.holes)
60006099
}
60016100

6101+
// Nothing to do if there is no work
6102+
if totalWork == 0 {
6103+
return
6104+
}
6105+
60026106
// 2. Calculate number of threads to execute with
60036107
numThreads := runtime.NumCPU()
6108+
6109+
// Ensure that the number of threads we plan to use is <= total amount of work
6110+
numThreads = min(totalWork, numThreads)
6111+
60046112
var waitGroup sync.WaitGroup
60056113

60066114
type workItem struct {

0 commit comments

Comments
 (0)