Conversation

@dan-j dan-j commented Apr 15, 2024

Keen to get your feedback on this. I've branched from your original feature/backport-426-to-main branch so there are conflicts, but essentially I've addressed all the issues I've created.

If you agree with the general idea, I will try to get it properly rebased on top of either your branch or upstream/main, and write some extra tests, specifically around the changing of finalizers.

If the size of this is putting you off looking, I can try to split it into smaller PRs, but I've been pushed for time and wanted to get the ideas committed while they were fresh in my head. I'll get a few more hours this week but need to focus on other things.

There's a bit of unused code around the NumBatchGoRoutines constant, but I think its comment explains a different idea I had for limiting the number of concurrent messages being handled, so I've left it in for now. The idea was explained in knative-extensions#542, but I back-tracked and kept spawning goroutines unrestricted.
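For readers skimming the diff, this is roughly the bounded-concurrency idea the NumBatchGoRoutines comment refers to, sketched with a buffered-channel semaphore. The msgs channel, the wg WaitGroup in scope, and c.handleMessage are illustrative stand-ins, not code from this PR:

// Hypothetical sketch of the NumBatchGoRoutines idea: a buffered channel
// acting as a semaphore caps how many message handlers run at once.
sem := make(chan struct{}, NumBatchGoRoutines)
for msg := range msgs { // msgs: some <-chan jetstream.Msg (illustrative)
	sem <- struct{}{} // blocks once NumBatchGoRoutines handlers are in flight
	wg.Add(1)
	go func(m jetstream.Msg) {
		defer wg.Done()
		defer func() { <-sem }()
		c.handleMessage(ctx, m) // illustrative handler
	}(msg)
}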

for {
	batch, err := c.natsConsumer.FetchNoWait(FetchBatchSize)
	if err != nil {
		return err
@astelmashenko astelmashenko (Owner) commented Apr 16, 2024

Consider the situation where the js client loses its connection to JetStream: does this lead to an error and a return from the consumer's Start method?
Should it instead keep trying to fetch, in the hope that the client restores the JetStream connection?
If the connection is ultimately not restored, the dispatcher will panic and be restarted by k8s.
The current implementation just tries to reconnect and continues working.

@dan-j dan-j (Author) replied

Good point, I need to think about reconnects where we don't actually want the consumer to stop. There is a TODO comment in the dispatcher about whether we should re-reconcile the consumers if Start() returns an error.
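One shape this could take, sketched against the loop above rather than taken from the PR: retry the fetch with a capped backoff and only give up when the context is done, assuming the underlying NATS client keeps trying to restore the JetStream connection. Standard-library imports (time, errors, io) and the surrounding identifiers are assumed; the backoff values are illustrative.

// Sketch only: retry FetchNoWait with a capped backoff instead of returning.
backoff := 100 * time.Millisecond
for {
	batch, err := c.natsConsumer.FetchNoWait(FetchBatchSize)
	if err != nil {
		logging.FromContext(ctx).Warnw("fetch failed, retrying", zap.Error(err))
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
		if backoff < 5*time.Second {
			backoff *= 2
		}
		continue
	}
	backoff = 100 * time.Millisecond

	if err := c.consumeMessages(ctx, batch, &wg); err != nil {
		if errors.Is(err, io.EOF) {
			return nil // consumer was closed
		}
		return err
	}
}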


	if err := c.consumeMessages(ctx, batch, &wg); err != nil {
		if errors.Is(err, io.EOF) {
			return nil
@astelmashenko astelmashenko (Owner) commented Apr 16, 2024

Maybe just continue the loop and iterate again. There is no wait, though, so it may produce a lot of errors if the connection is lost. Just thoughts.

@dan-j dan-j (Author) replied

I'll think about this; it ties in with my response above and the TODO comment in the dispatcher around where we call Start(). We definitely need some solution to prevent infinite loops and floods of errors.
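A related sketch of the "don't loop forever" side of this: tolerate transient errors with a short pause, but cap the number of consecutive failures so the error still reaches the dispatcher (and its TODO about re-reconciling). maxConsecutiveErrors and the one-second pause are made-up knobs, and the surrounding identifiers come from the excerpts above.

// Sketch: continue on transient errors but bound how long we spin.
const maxConsecutiveErrors = 10 // illustrative

consecutive := 0
for {
	batch, err := c.natsConsumer.FetchNoWait(FetchBatchSize)
	if err != nil {
		consecutive++
		if consecutive >= maxConsecutiveErrors {
			return fmt.Errorf("%d consecutive fetch errors, giving up: %w", consecutive, err)
		}
		time.Sleep(time.Second) // avoid a tight error loop
		continue
	}
	consecutive = 0

	if err := c.consumeMessages(ctx, batch, &wg); err != nil {
		if errors.Is(err, io.EOF) {
			return nil // Close() was called
		}
		consecutive++
		time.Sleep(time.Second)
	}
}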


d.consumers[uid] = consumer

go func() {
@astelmashenko astelmashenko (Owner)

My understanding is that this is now a separate goroutine with a polling consumer. I like the idea; it is easier to control back pressure on the dispatcher side!
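For readers without the full diff, the shape being described is roughly the following. The body of the goroutine and the exact signature of Start are guesses for illustration, not the PR's code:

// Illustrative only: the dispatcher tracks the consumer and runs its polling
// loop in a separate goroutine. What to do when Start returns an error is
// exactly the open TODO discussed above.
d.consumers[uid] = consumer

go func() {
	if err := consumer.Start(ctx); err != nil { // Start(ctx) signature assumed
		logging.FromContext(ctx).Errorw("consumer stopped", zap.Error(err))
		// TODO: possibly trigger re-reconciliation of the consumer here.
	}
}()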

// Returning as a result of Consumer.Close results in an io.EOF error.
func (c *Consumer) consumeMessages(ctx context.Context, batch jetstream.MessageBatch, wg *sync.WaitGroup) error {
	for {
		select {
@astelmashenko astelmashenko (Owner) commented Apr 16, 2024

Do we need this select? The FetchNoWait docs say:

"Messages channel is always closed, thus it is safe to range over it without additional checks."

Should we just use a plain range loop?

@dan-j dan-j (Author) replied

This allows us to stop immediately when someone calls Consumer.Close(). You could continue to process the batch since ctx will be cancelled anyway, but if you've got a batch of 100 when closing, you might end up with 100 context-deadline-exceeded errors.
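To make that trade-off concrete, a rough sketch of the two shapes being discussed; closedCh stands in for whatever Consumer.Close() signals on, and c.handleMessage is an illustrative handler, neither taken from the PR.

// Shape 1 (select, as in the PR): stop mid-batch as soon as the consumer is
// closed; per the doc comment above, Start then sees this as io.EOF.
for {
	select {
	case <-closedCh:
		return io.EOF
	case msg, ok := <-batch.Messages():
		if !ok {
			return nil // batch fully drained
		}
		c.handleMessage(ctx, msg)
	}
}

// Shape 2 (plain range, as suggested): simpler, but a batch in flight when
// Close() is called still gets walked, and each dispatch can then fail with
// "context deadline exceeded" once ctx is cancelled.
for msg := range batch.Messages() {
	c.handleMessage(ctx, msg)
}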

logging.FromContext(ctx).Errorw("failed to mark message as in progress", zap.Error(err))
}
// wait for handlers to finish
defer wg.Wait()
@astelmashenko astelmashenko (Owner)

Why is this waiting here? Should it be inside the fetch loop below (where FetchNoWait is called), so that it waits for the previous batch to be processed?

@dan-j dan-j (Author) replied

Messages are processed by consumeMessages in new goroutines, and then we immediately fetch a new batch. We don't wait for the existing messages to be handled before fetching the next batch (that was my first approach, which my comment on the NumBatchGoRoutines constant is referencing; I can add it back in if you want to see what I did there).

As such, wg tracks how many messages are being handled by consumeMessages in total, and we wait for all of them to exit when the consumer is closed.
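Spelled out as a sketch of the lifecycle being described, not the PR's exact code: a plain range is used for brevity where the PR's consumeMessages uses a select, and c.handleMessage is illustrative.

// Sketch of the WaitGroup lifecycle: one Add per message, one Wait on shutdown.
func (c *Consumer) Start(ctx context.Context) error {
	var wg sync.WaitGroup
	// Runs when Start returns: waits for every in-flight handler,
	// across all batches ever fetched, to finish.
	defer wg.Wait()

	for {
		batch, err := c.natsConsumer.FetchNoWait(FetchBatchSize)
		if err != nil {
			return err
		}
		if err := c.consumeMessages(ctx, batch, &wg); err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}
			return err
		}
	}
}

// consumeMessages spawns one goroutine per message, adding to wg before each.
func (c *Consumer) consumeMessages(ctx context.Context, batch jetstream.MessageBatch, wg *sync.WaitGroup) error {
	for msg := range batch.Messages() {
		wg.Add(1)
		go func(m jetstream.Msg) {
			defer wg.Done()
			c.handleMessage(ctx, m) // illustrative handler
		}(msg)
	}
	return batch.Error()
}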

@astelmashenko astelmashenko (Owner)

If it does not wait for the previous batch to complete before taking the next batch, it will behave the same way as push-based consumers: it will quickly consume all available CPU if there are a lot of messages in the stream.
If there are other consumers, it will affect those as well; we noticed high CPU consumption in the dispatcher during load tests. To control back pressure with push-based consumers we set a lower maxAckPending limit and cap the maximum number of in-flight messages.
We need to think about something like this to control back pressure per consumer.
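For comparison with the push-based setup, the same lever exists per consumer in the new jetstream API: MaxAckPending on the consumer config caps in-flight (unacknowledged) deliveries. A sketch of where that knob lives, with illustrative stream/consumer names and limit; whether it is the right fit for this pull loop is exactly the open question.

// Sketch: JetStream stops delivering once MaxAckPending messages are
// outstanding, which bounds work per consumer regardless of stream depth.
cfg := jetstream.ConsumerConfig{
	Durable:       "my-consumer", // illustrative
	AckPolicy:     jetstream.AckExplicitPolicy,
	MaxAckPending: 100, // illustrative cap on unacked, in-flight messages
}
cons, err := js.CreateOrUpdateConsumer(ctx, "my-stream", cfg) // js: jetstream.JetStream
if err != nil {
	return err
}
_ = cons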


te := TypeExtractorTransformer("")

dispatchExecutionInfo, err := c.dispatcher.SendMessage(
@astelmashenko astelmashenko (Owner)

The main problem here is:

  • you do not know whether it failed to send to the Subscriber or to the ReplyUrl; if it failed to send to the ReplyUrl, it will nack the msg, right?
  • if you pass noRetry, it is applied to all targets (Subscriber, DLQ, ReplyURL); we only want noRetry for the send to the target

@dan-j dan-j (Author) replied

Ahhhh good point! I hadn't given any thought to the ReplyURL; this does change things considerably.
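One possible shape for addressing both bullets, purely as a sketch: split the dispatch so the no-retry behaviour applies only to the subscriber leg, and decide ack/nak based on which leg failed. sendToSubscriber and sendReply are hypothetical helpers, not functions from this PR or from kncloudevents, and whether a failed reply should still ack is part of what needs deciding.

// Hypothetical sketch only; msg is the jetstream.Msg being handled.
resp, err := sendToSubscriber(ctx, msg, subscriberURL /* no retries here */)
if err != nil {
	_ = msg.Nak() // subscriber never processed the event; let JetStream redeliver
	return err
}
if replyURL != nil {
	if err := sendReply(ctx, resp, replyURL /* retries allowed here */); err != nil {
		// The subscriber already processed the event, so naking now would
		// redeliver it; logging (or routing to the DLQ) may be preferable.
		logging.FromContext(ctx).Errorw("failed to deliver reply", zap.Error(err))
	}
}
return msg.Ack()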
