FanOut/Until Discussion #86
Comments
Hi @thrawn01! Thanks for reaching out. Starting with `Until()`: here is how the standard-library pattern compares with the `conc` equivalent.

```go
package main

import (
	"context"
	"sync"
	"time"

	"github.com/sourcegraph/conc/pool"
)

func main() {
// Golang std pattern
{
done := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for {
select {
case <-time.After(time.Second):
case <-done:
wg.Done()
return
}
}
}()
close(done)
wg.Wait()
}
// conc version
{
ctx, cancel := context.WithCancel(context.Background())
p := pool.New().WithContext(ctx)
p.Go(func(ctx context.Context) error {
for {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return ctx.Err()
}
}
})
cancel()
p.Wait()
}
}
```
Yes, this is a rather...annoying...problem when it comes to the ergonomics of the library and goroutines/for loops in general. Interestingly, there has been some discussion recently about changing these semantics. The classic way to solve this is by redefining the variable inside the loop:

```go
func main() {
var wg conc.WaitGroup
for i := 0; i < 100; i++ {
i := i // redefined the variable to avoid loop aliasing
wg.Go(func() {
println(i)
})
}
wg.Wait()
}
```

Alternatively, if you have a pre-collected list of things you're iterating over, you can use the `iter` package:

```go
func main() {
tasks := make([]int, 100)
for i := 0; i < 100; i++ {
tasks[i] = i
}
iter.ForEach(tasks, func(i *int) {
println(*i)
})
}
```
This isn't so much a question of how it can be achieved, as there is always the standard implementation. The question is "Does [...]?" Your proposed [...]. I hope that makes sense.
The `ContextPool` could expose a `Stop()` method that cancels its context and then waits:

```go
func (p *ContextPool) Stop() error {
p.cancel()
return p.Wait()
}
```

Here is how it would look:

```go
func main() {
// Golang std pattern
{
done := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for {
select {
case <-time.After(time.Second):
case <-done:
wg.Done()
return
}
}
}()
close(done)
wg.Wait()
}
// conc version
{
p := pool.New().WithContext(context.Background())
p.Go(func(ctx context.Context) error {
for {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return ctx.Err()
}
}
})
p.Stop()
}
}
```
I like the use of `Stop()`. I expect the exception to this would be reading from a channel to feed the pool. Thoughts?

Thinking out loud: I recall listening to Rob Pike lamenting the lack of generics in early versions of golang, and because of this, there was no standard library version of [...].
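To make the "feed the pool from a channel" case concrete, here is a minimal sketch (my own illustration, not an API from `conc` or `holster`) of `ContextPool` workers draining a channel until it is closed or the context is cancelled:

```go
package main

import (
	"context"
	"fmt"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	events := make(chan int)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	p := pool.New().WithContext(ctx)
	for i := 0; i < 4; i++ {
		p.Go(func(ctx context.Context) error {
			for {
				select {
				case e, ok := <-events:
					if !ok {
						return nil // channel closed: worker exits cleanly
					}
					fmt.Println("handled", e)
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		})
	}

	// Producer side: feed the pool, then close the channel to drain and stop.
	for i := 0; i < 10; i++ {
		events <- i
	}
	close(events)
	_ = p.Wait()
}
```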
Ah, I see. Thanks for clarifying here! I like the idea of [...]. In the provided example, we're calling [...]. So what calls `Stop()`? I'm struggling to put words to why this pattern feels a little off, so bear with me 🙁

```go
func doTheThing(ctx context.Context) {
p := pool.New().WithContext(ctx)
for i := 0; i < 10; i++ {
p.Go(func(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
})
}
// What happens here that makes us decide to call p.Stop()?
// <-ctx.Done()? Why not just pass that context directly into
// the pool?
p.Stop()
}
```
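To spell out the alternative hinted at in the comment above (my sketch, not from the thread, using the same `context` and `pool` imports as the snippet it mirrors): if the caller's context already decides when the work should stop, the pool can simply wait on it and no explicit `Stop()` call is needed.

```go
func doTheThing(ctx context.Context) {
	p := pool.New().WithContext(ctx)
	for i := 0; i < 10; i++ {
		p.Go(func(ctx context.Context) error {
			<-ctx.Done()
			return ctx.Err()
		})
	}
	// The workers exit when the caller cancels ctx, so waiting is enough.
	_ = p.Wait()
}
```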
Yes, it would definitely be possible. Not terribly difficult to implement either. And it does provide an easy way to concurrently process streams of unknown length with [...]. However... I'm pretty allergic to making channels part of a public API, so I'd like to make sure the design is thought through first.

For the sake of discussion, let's take the following stripped-down design:

```go
func ForEachCh[T any](input chan T, f func(T)) {
task := func() {
for v := range input {
f(v)
}
}
var wg conc.WaitGroup
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
wg.Go(task)
}
wg.Wait()
}
```

Some arguments I can think of against adding a channel-based API to the library: [...]
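For reference, a usage sketch of the stripped-down `ForEachCh` above, with a producer goroutine closing the channel once the stream is exhausted (my illustration; `ForEachCh` is only a hypothetical design at this point):

```go
package main

import (
	"fmt"
	"runtime"

	"github.com/sourcegraph/conc"
)

// ForEachCh is the hypothetical design from the comment above, repeated so
// this example is self-contained.
func ForEachCh[T any](input chan T, f func(T)) {
	task := func() {
		for v := range input {
			f(v)
		}
	}
	var wg conc.WaitGroup
	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
		wg.Go(task)
	}
	wg.Wait()
}

func main() {
	lines := make(chan string)

	// Producer: a stream of unknown length, closed when exhausted.
	go func() {
		defer close(lines)
		for i := 0; i < 5; i++ {
			lines <- fmt.Sprintf("line %d", i)
		}
	}()

	// Blocks until the producer closes the channel and all workers finish.
	ForEachCh(lines, func(s string) {
		fmt.Println("processed", s)
	})
}
```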
Sure! Our services own the entire problem domain they are trying to solve, which includes async work, cron, and data movement between packages & services. These often require long-running goroutines which span the lifetime of the service. This pattern is so common we use a naming scheme: if a struct/package creates a long-running goroutine, we give it a `Spawn` prefix (as in the `SpawnEventProcessor` example below).

Kafka-Pixy is a public repo of ours (quite old) which has some examples of this pattern. While we no longer use this Factory interface, and no longer call everything [...].
I also have an allergy to using channels in a Public API.
No amount of sugar or documentation can make developers respect and use channels correctly.
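As a concrete illustration of the kind of mistake no amount of API sugar can prevent (a hypothetical snippet, not taken from the original comment): sending on a channel that has already been closed panics at runtime.

```go
package main

func main() {
	ch := make(chan int, 1)

	// One side decides the work is finished and closes the channel...
	close(ch)

	// ...while another sender, unaware of that, still tries to send.
	// This panics with "send on closed channel".
	ch <- 1
}
```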
Again, as much as I would want to, we can't keep devs from making silly mistakes. However, if we can, we should encourage good behavior, but this is nearly impossible to do with channels (one of the reasons I avoid them in my APIs). The best way to think about this is to ask "Is the alternative to this sugar a magnitude more painful than the possible pitfalls provided by my API?", which, I'll admit, is often a judgement call.

Here is a complete example of what we currently have today, which hopefully explains some of the common use cases:

```go
package cmd
import (
"log"
"time"
"github.com/mailgun/holster/v4/syncutil"
)
type Event struct {
AccountID string
}
type EventProcessor struct {
eventCh chan Event
wg syncutil.WaitGroup
fan *syncutil.FanOut
}
func (p *EventProcessor) SpawnEventProcessor() {
// Since Cassandra performs better with lots of concurrency, we want
// to handle each event with as much concurrency as possible and avoid
// synchronous handling of events. Allow 30 concurrent handling of events.
// Create the event channel before HandleEvent() can be used
p.eventCh = make(chan Event)
p.fan = syncutil.NewFanOut(30)
// Run until we ask it to stop or until the channel is closed
p.wg.Until(func(done chan struct{}) bool {
select {
case e, ok := <-p.eventCh:
// This should be the common case
if !ok {
return false
}
p.fan.Run(func(obj interface{}) error {
event := obj.(Event)
if err := writeEvent(event); err != nil {
log.Printf("err: %s\n", err)
return nil
}
return nil
}, e)
case <-done:
return false
}
return true
})
}
// HandleEvent is called by our kafka consumer to pass the event to the processor
func (p *EventProcessor) HandleEvent(e Event) {
p.eventCh <- e
}
func (p *EventProcessor) Stop() {
close(p.eventCh)
wait := make(chan struct{})
go func() {
p.fan.Wait()
wait <- struct{}{}
}()
// It's possible that `writeEvent()` is stuck talking to an external system because something
// catastrophic happened. If that is the case we don't want to wait around forever as k8s will
// get mad at us and force kill us, thus possibly not providing time to flush the data other from
// processors in this service. So we wait for a reasonable amount of time, then force the work
// group to close.
select {
case <-time.After(time.Minute):
return
case <-wait:
p.wg.Stop()
}
}
func writeEvent(e Event) error {
// Talk to Cassandra and update a datapoint or something
return nil
}
```
Here is a standard golang implementation, which isn't horrible. Both implementations require the user to handle channels properly. I.e., if we called `HandleEvent()` after `Stop()` has already closed the event channel, we would panic.

```go
package cmd
import (
"log"
"sync"
"time"
)
type Event struct {
AccountID string
}
type EventProcessor struct {
concurrentCh chan struct{}
eventCh chan Event
doneCh chan struct{}
wg sync.WaitGroup
fan sync.WaitGroup
}
func (p *EventProcessor) SpawnEventProcessor() {
// Since Cassandra performs better with lots of concurrency, we want
// to handle each event with as much concurrency as possible and avoid
// synchronous handling of events. Allow 30 concurrent handling of events.
// Initialize the channels before they are used
p.eventCh = make(chan Event)
p.doneCh = make(chan struct{})
p.concurrentCh = make(chan struct{}, 30)
// Run until we ask it to stop or until the channel is closed
p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case e, ok := <-p.eventCh:
if !ok {
return
}
p.concurrentCh <- struct{}{}
p.fan.Add(1)
go func(event Event) {
defer func() {
<-p.concurrentCh
p.fan.Done()
}()
if err := writeEvent(event); err != nil {
log.Printf("err: %s\n", err)
}
}(e)
case <-p.doneCh:
return
}
}
}()
}
// HandleEvent is called by our kafka consumer to pass the event to the processor
func (p *EventProcessor) HandleEvent(e Event) {
p.eventCh <- e
}
func (p *EventProcessor) Stop() {
close(p.eventCh)
wait := make(chan struct{})
go func() {
p.fan.Wait()
wait <- struct{}{}
}()
// It's possible that `writeEvent()` is stuck talking to an external system because something
// catastrophic happened. If that is the case we don't want to wait around forever as k8s will
// get mad at us and force kill us, thus possibly not providing time to flush the data other from
// processors in this service. So we wait for a reasonable amount of time, then force the work
// group to close.
select {
case <-time.After(time.Minute):
return
case <-wait:
close(p.doneCh)
p.wg.Wait()
}
}
func writeEvent(e Event) error {
// Talk to Cassandra and update a datapoint or something
return nil
}
```

RE: "Is the alternative to this sugar a magnitude more painful than the possible pitfalls provided by my API?" If a dev uses the standard golang implementation over [...].

As an exercise, I'll implement this with `conc`.
I think the use cases around [...]. That aside, in the example above, you could manage the two sets of long-running goroutines with two context-backed pools and their cancel functions, something like this:

```go
type EventProcessor struct {
eventCh chan Event
until *pool.ContextPool
cancelUntil func()
fan *pool.ContextPool
cancelFan func()
}
func (p *EventProcessor) Stop() {
close(p.eventCh)
wait := make(chan struct{})
go func() {
p.cancelFan()
_ = p.fan.Wait()
wait <- struct{}{}
}()
select {
case <-time.After(time.Minute):
return
case <-wait:
p.cancelUntil()
_ = p.until.Wait()
}
}
```

In this case, `cancelUntil` and `cancelFan` are the `context.CancelFunc`s for the contexts the two pools were created with.
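A sketch of how those fields might get wired up in the spawn step (my guess at the construction; it assumes the `context` and `github.com/sourcegraph/conc/pool` imports plus the `Event` and `writeEvent` definitions from the earlier examples):

```go
func (p *EventProcessor) SpawnEventProcessor() {
	p.eventCh = make(chan Event)

	// One context per pool so the fan-out workers and the consuming loop can
	// be cancelled independently, matching the Stop() above.
	untilCtx, cancelUntil := context.WithCancel(context.Background())
	p.until = pool.New().WithContext(untilCtx)
	p.cancelUntil = cancelUntil

	fanCtx, cancelFan := context.WithCancel(context.Background())
	p.fan = pool.New().WithMaxGoroutines(30).WithContext(fanCtx)
	p.cancelFan = cancelFan

	p.until.Go(func(ctx context.Context) error {
		for {
			select {
			case e, ok := <-p.eventCh:
				if !ok {
					return nil // event channel closed by Stop()
				}
				event := e // capture the local variable for the closure
				p.fan.Go(func(ctx context.Context) error {
					return writeEvent(event)
				})
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})
}
```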
Maybe I'm missing something from your requirements for `FanOut`, but passing a local variable into the closure can be done by redefining it inside the loop:

```go
for i := 0; i < 100; i++ {
i := i // each iteration gets its own var
pool.Go(func() { /* use i as normal */ })
}
```
I like what you have built here, it's almost exactly what we need! We at mailgun have been using https://github.com/mailgun/holster/tree/master/syncutil for quite some time. However, there are two things missing from this library which I would like to open for discussion.
Until

`conc.WaitGroup.Until()`
https://github.com/mailgun/holster/blob/master/syncutil/waitgroup.go#L56

`Until()` simplifies running and stopping many long-running goroutines. You can chain `Until()` multiple times and call `Stop()` to signal all the goroutines which are looping to end. With this pattern we save state by not needing the obligatory `done` channel every time we start a new goroutine. It also saves a single indent, as we no longer need the `for` loop. Our typical use case is to have many goroutines running which all need to be shut down when the service ends. Avoiding the need for a `done` variable every time we use this pattern has been nice. Combined with the panic handling in `conc`, `Until` would be even more useful.
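For illustration, a rough sketch of the pattern being described, using the `func(done chan struct{}) bool` callback signature from the `syncutil.WaitGroup` example earlier in this thread (the details here are my own, not a concrete proposal for `conc`):

```go
package main

import (
	"fmt"
	"time"

	"github.com/mailgun/holster/v4/syncutil"
)

func main() {
	var wg syncutil.WaitGroup

	// Each Until() loop runs until it returns false or Stop() is called,
	// so there is no per-goroutine done channel to declare and select on.
	wg.Until(func(done chan struct{}) bool {
		select {
		case <-time.After(time.Second):
			fmt.Println("tick")
			return true // keep looping
		case <-done:
			return false // Stop() was called
		}
	})

	time.Sleep(3 * time.Second)
	wg.Stop() // signals every Until() loop to end
}
```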
Fanout

It appears that `conc.pool` provides almost everything our current implementation of `FanOut` does and more. However, we have a VERY common use case where we need to pass in a local variable to the closure to avoid race conditions.

This isn't a request to add these features so much as a wish to start a conversation around them and other common use cases which might not be covered here.