Skip to content

Commit 8036c99

Browse files
authored
feat: implement ReleaseContext for all pool types (#387)
Fixes #384
1 parent 0b77168 commit 8036c99

5 files changed

Lines changed: 361 additions & 4 deletions

File tree

ants.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ func ReleaseTimeout(timeout time.Duration) error {
139139
return defaultAntsPool.ReleaseTimeout(timeout)
140140
}
141141

142+
// ReleaseContext is like Release but with a context, it waits all workers to exit before the context is done.
143+
//
144+
// Note that if the context is nil, it is the same as Release,
145+
// just return immediately without waiting for all workers to exit.
146+
func ReleaseContext(ctx context.Context) error {
147+
return defaultAntsPool.ReleaseContext(ctx)
148+
}
149+
142150
// Reboot reboots the default pool.
143151
func Reboot() {
144152
defaultAntsPool.Reboot()
@@ -398,12 +406,31 @@ func (p *poolCommon) Release() {
398406

399407
// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
400408
func (p *poolCommon) ReleaseTimeout(timeout time.Duration) error {
409+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
410+
defer cancel()
411+
err := p.ReleaseContext(ctx)
412+
if errors.Is(err, context.DeadlineExceeded) {
413+
return ErrTimeout
414+
}
415+
return err
416+
}
417+
418+
// ReleaseContext is like Release but with a context, it waits all workers to exit before the context is done.
419+
//
420+
// Note that if the context is nil, it is the same as Release,
421+
// just return immediately without waiting for all workers to exit.
422+
func (p *poolCommon) ReleaseContext(ctx context.Context) error {
401423
if p.IsClosed() || (!p.options.DisablePurge && p.stopPurge == nil) || p.stopTicktock == nil {
402424
return ErrPoolClosed
403425
}
404426

405427
p.Release()
406428

429+
// Don't wait for all workers to exit, just return immediately if the context is nil.
430+
if ctx == nil {
431+
return nil
432+
}
433+
407434
var purgeCh <-chan struct{}
408435
if !p.options.DisablePurge {
409436
purgeCh = p.purgeCtx.Done()
@@ -417,12 +444,10 @@ func (p *poolCommon) ReleaseTimeout(timeout time.Duration) error {
417444
})
418445
}
419446

420-
timer := time.NewTimer(timeout)
421-
defer timer.Stop()
422447
for {
423448
select {
424-
case <-timer.C:
425-
return ErrTimeout
449+
case <-ctx.Done():
450+
return ctx.Err()
426451
case <-p.allDone:
427452
<-purgeCh
428453
<-p.ticktockCtx.Done()

ants_test.go

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
package ants_test
2424

2525
import (
26+
"context"
2627
"log"
2728
"os"
2829
"runtime"
@@ -1351,6 +1352,34 @@ func TestDefaultPoolReleaseTimeout(t *testing.T) {
13511352
require.NoError(t, err)
13521353
}
13531354

1355+
func TestDefaultPoolReleaseContext(t *testing.T) {
1356+
ants.Reboot()
1357+
for i := 0; i < 5; i++ {
1358+
_ = ants.Submit(func() {
1359+
time.Sleep(time.Second)
1360+
})
1361+
}
1362+
require.NotZero(t, ants.Running())
1363+
err := ants.ReleaseContext(context.Background())
1364+
require.NoError(t, err)
1365+
}
1366+
1367+
func TestReleaseContextWithNil(t *testing.T) {
1368+
p, err := ants.NewPool(10)
1369+
require.NoError(t, err)
1370+
for i := 0; i < 5; i++ {
1371+
_ = p.Submit(func() {
1372+
time.Sleep(time.Second)
1373+
})
1374+
}
1375+
require.NotZero(t, p.Running())
1376+
1377+
// Passing nil context should release immediately without waiting for workers to exit.
1378+
err = p.ReleaseContext(nil) //nolint:staticcheck
1379+
require.NoError(t, err)
1380+
require.True(t, p.IsClosed())
1381+
}
1382+
13541383
func TestMultiPool(t *testing.T) {
13551384
_, err := ants.NewMultiPool(-1, 10, 8)
13561385
require.ErrorIs(t, err, ants.ErrInvalidMultiPoolSize)
@@ -1536,6 +1565,189 @@ func TestMultiPoolWithFuncGeneric(t *testing.T) {
15361565
mp.Tune(10)
15371566
}
15381567

1568+
func TestMultiPoolReleaseContext(t *testing.T) {
1569+
mp, err := ants.NewMultiPool(10, 5, ants.RoundRobin)
1570+
require.NoError(t, err)
1571+
1572+
for i := 0; i < 50; i++ {
1573+
err = mp.Submit(longRunningFunc)
1574+
require.NoError(t, err)
1575+
}
1576+
require.EqualValues(t, 50, mp.Running())
1577+
1578+
// Signal workers to stop, then release with a background context.
1579+
atomic.StoreInt32(&stopLongRunningFunc, 1)
1580+
err = mp.ReleaseContext(context.Background())
1581+
require.NoError(t, err)
1582+
require.Zero(t, mp.Running())
1583+
require.True(t, mp.IsClosed())
1584+
atomic.StoreInt32(&stopLongRunningFunc, 0)
1585+
1586+
// Calling ReleaseContext on a closed pool should return ErrPoolClosed.
1587+
require.ErrorIs(t, mp.ReleaseContext(context.Background()), ants.ErrPoolClosed)
1588+
1589+
// Test with LeastTasks strategy.
1590+
mp, err = ants.NewMultiPool(10, 5, ants.LeastTasks)
1591+
require.NoError(t, err)
1592+
for i := 0; i < 50; i++ {
1593+
err = mp.Submit(longRunningFunc)
1594+
require.NoError(t, err)
1595+
}
1596+
require.EqualValues(t, 50, mp.Running())
1597+
1598+
atomic.StoreInt32(&stopLongRunningFunc, 1)
1599+
err = mp.ReleaseContext(context.Background())
1600+
require.NoError(t, err)
1601+
require.Zero(t, mp.Running())
1602+
require.True(t, mp.IsClosed())
1603+
atomic.StoreInt32(&stopLongRunningFunc, 0)
1604+
1605+
// Test that a cancelled context returns an error.
1606+
mp, err = ants.NewMultiPool(10, 5, ants.RoundRobin)
1607+
require.NoError(t, err)
1608+
for i := 0; i < 50; i++ {
1609+
err = mp.Submit(longRunningFunc)
1610+
require.NoError(t, err)
1611+
}
1612+
ctx, cancel := context.WithCancel(context.Background())
1613+
cancel() // cancel immediately
1614+
err = mp.ReleaseContext(ctx)
1615+
require.Error(t, err)
1616+
atomic.StoreInt32(&stopLongRunningFunc, 1)
1617+
require.Eventually(t, func() bool {
1618+
return mp.Running() == 0
1619+
}, 3*time.Second, 100*time.Millisecond)
1620+
atomic.StoreInt32(&stopLongRunningFunc, 0)
1621+
1622+
// Test reboot after ReleaseContext.
1623+
mp, err = ants.NewMultiPool(10, 5, ants.RoundRobin)
1624+
require.NoError(t, err)
1625+
for i := 0; i < 50; i++ {
1626+
err = mp.Submit(longRunningFunc)
1627+
require.NoError(t, err)
1628+
}
1629+
atomic.StoreInt32(&stopLongRunningFunc, 1)
1630+
err = mp.ReleaseContext(context.Background())
1631+
require.NoError(t, err)
1632+
atomic.StoreInt32(&stopLongRunningFunc, 0)
1633+
1634+
mp.Reboot()
1635+
require.False(t, mp.IsClosed())
1636+
for i := 0; i < 50; i++ {
1637+
err = mp.Submit(longRunningFunc)
1638+
require.NoError(t, err)
1639+
}
1640+
require.EqualValues(t, 50, mp.Running())
1641+
atomic.StoreInt32(&stopLongRunningFunc, 1)
1642+
err = mp.ReleaseContext(context.Background())
1643+
require.NoError(t, err)
1644+
atomic.StoreInt32(&stopLongRunningFunc, 0)
1645+
}
1646+
1647+
func TestMultiPoolWithFuncReleaseContext(t *testing.T) {
1648+
ch := make(chan struct{})
1649+
mp, err := ants.NewMultiPoolWithFunc(10, 5, longRunningPoolFunc, ants.RoundRobin)
1650+
require.NoError(t, err)
1651+
1652+
for i := 0; i < 50; i++ {
1653+
err = mp.Invoke(ch)
1654+
require.NoError(t, err)
1655+
}
1656+
require.EqualValues(t, 50, mp.Running())
1657+
1658+
close(ch)
1659+
err = mp.ReleaseContext(context.Background())
1660+
require.NoError(t, err)
1661+
require.Zero(t, mp.Running())
1662+
require.True(t, mp.IsClosed())
1663+
1664+
// Calling ReleaseContext on a closed pool should return ErrPoolClosed.
1665+
require.ErrorIs(t, mp.ReleaseContext(context.Background()), ants.ErrPoolClosed)
1666+
1667+
// Test with LeastTasks strategy.
1668+
ch = make(chan struct{})
1669+
mp, err = ants.NewMultiPoolWithFunc(10, 5, longRunningPoolFunc, ants.LeastTasks)
1670+
require.NoError(t, err)
1671+
for i := 0; i < 50; i++ {
1672+
err = mp.Invoke(ch)
1673+
require.NoError(t, err)
1674+
}
1675+
close(ch)
1676+
err = mp.ReleaseContext(context.Background())
1677+
require.NoError(t, err)
1678+
require.Zero(t, mp.Running())
1679+
require.True(t, mp.IsClosed())
1680+
1681+
// Test that a cancelled context returns an error.
1682+
ch = make(chan struct{})
1683+
mp, err = ants.NewMultiPoolWithFunc(10, 5, longRunningPoolFunc, ants.RoundRobin)
1684+
require.NoError(t, err)
1685+
for i := 0; i < 50; i++ {
1686+
err = mp.Invoke(ch)
1687+
require.NoError(t, err)
1688+
}
1689+
ctx, cancel := context.WithCancel(context.Background())
1690+
cancel()
1691+
err = mp.ReleaseContext(ctx)
1692+
require.Error(t, err)
1693+
close(ch)
1694+
require.Eventually(t, func() bool {
1695+
return mp.Running() == 0
1696+
}, 3*time.Second, 100*time.Millisecond)
1697+
}
1698+
1699+
func TestMultiPoolWithFuncGenericReleaseContext(t *testing.T) {
1700+
ch := make(chan struct{})
1701+
mp, err := ants.NewMultiPoolWithFuncGeneric(10, 5, longRunningPoolFuncCh, ants.RoundRobin)
1702+
require.NoError(t, err)
1703+
1704+
for i := 0; i < 50; i++ {
1705+
err = mp.Invoke(ch)
1706+
require.NoError(t, err)
1707+
}
1708+
require.EqualValues(t, 50, mp.Running())
1709+
1710+
close(ch)
1711+
err = mp.ReleaseContext(context.Background())
1712+
require.NoError(t, err)
1713+
require.Zero(t, mp.Running())
1714+
require.True(t, mp.IsClosed())
1715+
1716+
// Calling ReleaseContext on a closed pool should return ErrPoolClosed.
1717+
require.ErrorIs(t, mp.ReleaseContext(context.Background()), ants.ErrPoolClosed)
1718+
1719+
// Test with LeastTasks strategy.
1720+
ch = make(chan struct{})
1721+
mp, err = ants.NewMultiPoolWithFuncGeneric(10, 5, longRunningPoolFuncCh, ants.LeastTasks)
1722+
require.NoError(t, err)
1723+
for i := 0; i < 50; i++ {
1724+
err = mp.Invoke(ch)
1725+
require.NoError(t, err)
1726+
}
1727+
close(ch)
1728+
err = mp.ReleaseContext(context.Background())
1729+
require.NoError(t, err)
1730+
require.Zero(t, mp.Running())
1731+
require.True(t, mp.IsClosed())
1732+
1733+
// Test that a cancelled context returns an error.
1734+
ch = make(chan struct{})
1735+
mp, err = ants.NewMultiPoolWithFuncGeneric(10, 5, longRunningPoolFuncCh, ants.RoundRobin)
1736+
require.NoError(t, err)
1737+
for i := 0; i < 50; i++ {
1738+
err = mp.Invoke(ch)
1739+
require.NoError(t, err)
1740+
}
1741+
ctx, cancel := context.WithCancel(context.Background())
1742+
cancel()
1743+
err = mp.ReleaseContext(ctx)
1744+
require.Error(t, err)
1745+
close(ch)
1746+
require.Eventually(t, func() bool {
1747+
return mp.Running() == 0
1748+
}, 3*time.Second, 100*time.Millisecond)
1749+
}
1750+
15391751
func TestRebootNewPoolCalc(t *testing.T) {
15401752
atomic.StoreInt32(&sum, 0)
15411753
runTimes := 1000

multipool.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
package ants
2424

2525
import (
26+
"context"
2627
"errors"
2728
"fmt"
2829
"math"
@@ -218,6 +219,45 @@ func (mp *MultiPool) ReleaseTimeout(timeout time.Duration) error {
218219
return errors.New(strings.TrimSuffix(errStr.String(), " | "))
219220
}
220221

222+
// ReleaseContext closes the multi-pool with a context,
223+
// it waits all pools to be closed before the context is done.
224+
func (mp *MultiPool) ReleaseContext(ctx context.Context) error {
225+
if !atomic.CompareAndSwapInt32(&mp.state, OPENED, CLOSED) {
226+
return ErrPoolClosed
227+
}
228+
229+
errCh := make(chan error, len(mp.pools))
230+
var wg errgroup.Group
231+
for i, pool := range mp.pools {
232+
func(p *Pool, idx int) {
233+
wg.Go(func() error {
234+
err := p.ReleaseContext(ctx)
235+
if err != nil {
236+
err = fmt.Errorf("pool %d: %v", idx, err)
237+
}
238+
errCh <- err
239+
return err
240+
})
241+
}(pool, i)
242+
}
243+
244+
_ = wg.Wait()
245+
246+
var errStr strings.Builder
247+
for i := 0; i < len(mp.pools); i++ {
248+
if err := <-errCh; err != nil {
249+
errStr.WriteString(err.Error())
250+
errStr.WriteString(" | ")
251+
}
252+
}
253+
254+
if errStr.Len() == 0 {
255+
return nil
256+
}
257+
258+
return errors.New(strings.TrimSuffix(errStr.String(), " | "))
259+
}
260+
221261
// Reboot reboots a released multi-pool.
222262
func (mp *MultiPool) Reboot() {
223263
if atomic.CompareAndSwapInt32(&mp.state, CLOSED, OPENED) {

0 commit comments

Comments
 (0)