Implement task groups and returning results #84
Have you seen …? Alternatively, a task submitted to a stream returns a closure that will be executed in the order submitted, so something like the following should work:

s := stream.New().WithMaxGoroutines(10)
for query := range searchQueries {
    for pageNumber := 1; pageNumber < 5; pageNumber++ {
        pageNumber := pageNumber
        query := query
        s.Go(func() stream.Callback {
            res, err := getPageResults(query, pageNumber)
            return func() { printGroupResults(query, pageNumber, res, err) }
        })
    }
}
s.Wait()
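For completeness, here is a minimal runnable version of that sketch. It assumes searchQueries is a slice of strings and that getPageResults and printGroupResults are hypothetical helpers, not part of conc:

package main

import (
    "fmt"

    "github.com/sourcegraph/conc/stream"
)

// getPageResults and printGroupResults are hypothetical stand-ins for this sketch.
func getPageResults(query string, page int) ([]string, error) {
    return []string{fmt.Sprintf("%s result, page %d", query, page)}, nil
}

func printGroupResults(query string, page int, res []string, err error) {
    fmt.Println(query, page, res, err)
}

func main() {
    searchQueries := []string{"foo", "bar"}

    s := stream.New().WithMaxGoroutines(10)
    for _, query := range searchQueries {
        query := query
        for pageNumber := 1; pageNumber < 5; pageNumber++ {
            pageNumber := pageNumber
            // The returned callback runs in submission order, so the output
            // stays grouped by query and ordered by page even though the
            // fetches themselves run concurrently.
            s.Go(func() stream.Callback {
                res, err := getPageResults(query, pageNumber)
                return func() { printGroupResults(query, pageNumber, res, err) }
            })
        }
    }
    s.Wait()
}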
I think the use case in the non-synchronous version is similar to what was available in …:

categoriesGroup = group.NewWithStreaming[error]()
// checksLimiter is shared to limit all concurrent checks across categories.
checksLimiter = group.NewBasicLimiter(r.Concurrency)
// ...
for i, category := range r.categories {
    // Run categories concurrently.
    categoriesGroup.Go(func() error {
        // Run all checks for this category concurrently, with a shared
        // limit on how many checks can run together.
        checksGroup := group.New().
            WithErrors().
            WithConcurrencyLimiter(checksLimiter)
        for _, check := range category.Checks {
            // Run checks concurrently.
            checksGroup.Go(func() error { ... })
        }
        return checksGroup.Wait()
    }, ...)
}

This helps us show the user that all categories are running, while only a limited number of checks run at any one time.
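A rough approximation of that nested pattern with conc's current pool API is a hand-rolled shared semaphore. This is only a sketch under that assumption; the check and category types are hypothetical, not part of conc:

package main

import (
    "fmt"

    "github.com/sourcegraph/conc/pool"
)

// check and category are hypothetical types for this sketch.
type check func() error

type category struct {
    name   string
    checks []check
}

// runCategories runs every category concurrently while a shared semaphore
// caps how many individual checks run at once across all categories.
func runCategories(categories []category, concurrency int) error {
    sem := make(chan struct{}, concurrency)

    categoriesPool := pool.New().WithErrors()
    for _, cat := range categories {
        cat := cat
        categoriesPool.Go(func() error {
            checksPool := pool.New().WithErrors()
            for _, c := range cat.checks {
                c := c
                checksPool.Go(func() error {
                    sem <- struct{}{}        // acquire a shared slot
                    defer func() { <-sem }() // release it
                    return c()
                })
            }
            return checksPool.Wait()
        })
    }
    return categoriesPool.Wait()
}

func main() {
    cats := []category{
        {name: "lint", checks: []check{func() error { return nil }}},
        {name: "build", checks: []check{func() error { return fmt.Errorf("build failed") }}},
    }
    if err := runCategories(cats, 4); err != nil {
        fmt.Println("error:", err)
    }
}

Unlike a shared limiter, this still spawns a goroutine per check and merely blocks the extras on the semaphore, so it is less efficient than the limiter-sharing idea discussed below.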
We might be able to do something similar by allowing users to specify a parent pool:

type PoolLike interface {
    unwrap() Pool
}

func (p *Pool) InPool(pool PoolLike) *Pool {
    p.limiter = pool.unwrap().limiter
    return p
}

In the current design, I think the above will "just work" for the most part, since nobody ever closes the limiter channel.

Usage might look like:

s := stream.New().WithMaxGoroutines(10)
for query := range searchQueries {
    query := query
    s.Go(func() stream.Callback {
        // All groups, combined, can use up to the 10 goroutines in s.
        g := pool.NewWithResults[any]().WithErrors().InPool(s)
        for pageNumber := 1; pageNumber < 5; pageNumber++ {
            pageNumber := pageNumber
            g.Go(func() (res any, err error) {
                res, err = getPageResults(query, pageNumber)
                return
            })
        }
        groupResults, err := g.Wait()
        return func() {
            printGroupResults(...)
        }
    })
}
s.Wait()

The awkward bit in this example is that …
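To make the limiter-sharing idea concrete, here is a minimal, self-contained sketch that assumes the limiter is a buffered channel used as a counting semaphore; miniPool and its methods are illustrative only, not conc's API:

package main

import (
    "fmt"
    "sync"
)

// limiter is a buffered channel used as a counting semaphore.
type limiter chan struct{}

// miniPool is an illustrative stand-in for a pool that owns a limiter.
type miniPool struct {
    limit limiter
    wg    sync.WaitGroup
}

func newMiniPool(maxGoroutines int) *miniPool {
    return &miniPool{limit: make(limiter, maxGoroutines)}
}

// inPool makes p share parent's limiter, so both pools together never
// exceed the parent's goroutine budget.
func (p *miniPool) inPool(parent *miniPool) *miniPool {
    p.limit = parent.limit
    return p
}

// submit runs task once a slot is free in the (possibly shared) limiter.
func (p *miniPool) submit(task func()) {
    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        p.limit <- struct{}{}        // acquire a slot
        defer func() { <-p.limit }() // release it
        task()
    }()
}

func (p *miniPool) wait() { p.wg.Wait() }

func main() {
    parent := newMiniPool(10)
    child := newMiniPool(1).inPool(parent) // child now shares parent's 10 slots

    for i := 0; i < 20; i++ {
        i := i
        child.submit(func() { fmt.Println("task", i) })
    }
    child.wait()
}

conc's Pool reuses a bounded set of worker goroutines rather than spawning one per task, so this sketch only mirrors the shared-limiter mechanics of the InPool proposal, not the real scheduling.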
Sometimes we need to run a lot of tasks, grouped into small sets (smaller than the goroutine count). The effective way is to run the groups concurrently, not one by one (they are small), but the results should still be grouped. Similar logic is implemented in alito/pong.

One use case is parsing search results.

Or how can I implement this logic in an easy way with the current functionality?
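One way to approximate this with the current API is to run every task from every group in a single bounded pool and aggregate results per group under a mutex. This is only a sketch; getPageResults, the queries, and the page range are hypothetical:

package main

import (
    "fmt"
    "sync"

    "github.com/sourcegraph/conc/pool"
)

// getPageResults is a hypothetical helper standing in for real parsing.
func getPageResults(query string, page int) (string, error) {
    return fmt.Sprintf("%s: page %d", query, page), nil
}

func main() {
    searchQueries := []string{"foo", "bar", "baz"}

    var (
        mu      sync.Mutex
        grouped = make(map[string][]string) // results keyed by query (group)
    )

    // One bounded pool runs every task from every group concurrently.
    p := pool.New().WithMaxGoroutines(10)
    for _, query := range searchQueries {
        query := query
        for page := 1; page < 5; page++ {
            page := page
            p.Go(func() {
                res, err := getPageResults(query, page)
                if err != nil {
                    return // error handling elided for brevity
                }
                mu.Lock()
                grouped[query] = append(grouped[query], res)
                mu.Unlock()
            })
        }
    }
    p.Wait()

    // Results come out grouped even though the groups ran interleaved.
    for query, results := range grouped {
        fmt.Println(query, results)
    }
}

If the per-group output also needs to be emitted in order as groups finish, the stream-based approach in the first comment above is probably the better fit.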