Skip to content

Commit 64e3b2b

Browse files
committed
fluent: check if context is cancelled before performing I/O on the connection
There are a number of places in the code where we do not check if the context is cancelled before reading from or writing to the connection. This commit adds those checks. Signed-off-by: Anirudh Aithal <[email protected]> cr: https://code.amazon.com/reviews/CR-194284782
1 parent f3ccfb8 commit 64e3b2b

File tree

1 file changed

+15
-4
lines changed

1 file changed

+15
-4
lines changed

fluent/fluent.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -594,10 +594,15 @@ func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error {
594594
return fmt.Errorf("fluent#write: failed to write after %d attempts", f.Config.MaxRetry)
595595
}
596596

597-
func (f *Fluent) syncWriteMessage(msg *msgToSend) error {
597+
func (f *Fluent) syncWriteMessage(ctx context.Context, msg *msgToSend) error {
598598
f.muconn.RLock()
599599
defer f.muconn.RUnlock()
600600

601+
// Check if context is cancelled. If it is, we can return early here.
602+
if err := ctx.Err(); err != nil {
603+
return errIsClosing
604+
}
605+
601606
if f.conn == nil {
602607
return fmt.Errorf("connection has been closed before writing to it")
603608
}
@@ -613,12 +618,18 @@ func (f *Fluent) syncWriteMessage(msg *msgToSend) error {
613618
return err
614619
}
615620

616-
func (f *Fluent) syncReadAck() (*AckResp, error) {
621+
func (f *Fluent) syncReadAck(ctx context.Context) (*AckResp, error) {
617622
f.muconn.RLock()
618623
defer f.muconn.RUnlock()
619624

620625
resp := &AckResp{}
621626
var err error
627+
628+
// Check if context is cancelled. If it is, we can return early here.
629+
if err := ctx.Err(); err != nil {
630+
return resp, errIsClosing
631+
}
632+
622633
if f.Config.MarshalAsJSON {
623634
dec := json.NewDecoder(f.conn)
624635
err = dec.Decode(resp)
@@ -642,14 +653,14 @@ func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) {
642653
return false, fmt.Errorf("fluent#write: %v", err)
643654
}
644655

645-
if err := f.syncWriteMessage(msg); err != nil {
656+
if err := f.syncWriteMessage(ctx, msg); err != nil {
646657
f.syncClose(false)
647658
return true, fmt.Errorf("fluent#write: %v", err)
648659
}
649660

650661
// Acknowledgment check
651662
if msg.ack != "" {
652-
resp, err := f.syncReadAck()
663+
resp, err := f.syncReadAck(ctx)
653664
if err != nil {
654665
fmt.Fprintf(os.Stderr, "fluent#write: error reading message response ack %v", err)
655666
f.syncClose(false)

0 commit comments

Comments
 (0)