
Commit f3ccfb8

fluent: refactor code for muconn lock acquisition
Refactored the code so that the muconn mutex is released in a defer block wherever possible. This should make the code more maintainable and avoid bugs where the lock is accidentally left held on an early return or panic.

Signed-off-by: Anirudh Aithal <[email protected]>
1 parent 82ba6a1 commit f3ccfb8
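The pattern in question, as a minimal sketch: the logger type and its methods below are illustrative only, not code from this repository. An explicit Unlock is reached only if nothing between Lock and Unlock returns early or panics, whereas a deferred Unlock runs on every exit path.

// Minimal sketch (not repository code) of the defer-based unlock pattern
// this commit adopts for muconn.
package sketch

import (
	"net"
	"sync"
)

type logger struct {
	muconn sync.RWMutex
	conn   net.Conn
}

// Before-style: the explicit Unlock is skipped if anything between Lock and
// Unlock returns early or panics, leaving the mutex held forever.
func (l *logger) closeManual() {
	l.muconn.Lock()
	if l.conn != nil {
		l.conn.Close()
		l.conn = nil
	}
	l.muconn.Unlock()
}

// After-style: the deferred Unlock runs on every exit path, which is how the
// new syncClose/syncConnectWithRetry/syncWriteMessage/syncReadAck helpers
// in this commit manage muconn.
func (l *logger) closeDeferred() {
	l.muconn.Lock()
	defer l.muconn.Unlock()

	if l.conn != nil {
		l.conn.Close()
		l.conn = nil
	}
}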

File tree

1 file changed (+69, -55 lines)

fluent/fluent.go

Lines changed: 69 additions & 55 deletions
@@ -398,10 +398,7 @@ func (f *Fluent) Close() (err error) {
 		}
 	}

-	f.muconn.Lock()
-	f.close()
-	atomic.StoreInt32(&f.closed, 1)
-	f.muconn.Unlock()
+	f.syncClose(true)

 	// If ForceStopAsyncSend is true, we shall close the connection before waiting for
 	// run() goroutine to exit to be sure we aren't waiting on ack message that might
@@ -436,6 +433,17 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error {
 	return nil
 }

+func (f *Fluent) syncClose(setClosed bool) {
+	f.muconn.Lock()
+	defer f.muconn.Unlock()
+
+	if setClosed {
+		atomic.StoreInt32(&f.closed, 1)
+	}
+
+	f.close()
+}
+
 // close closes the connection. Callers should take care of locking muconn first.
 func (f *Fluent) close() {
 	if f.conn != nil {
@@ -476,6 +484,17 @@ func (f *Fluent) connect(ctx context.Context) (err error) {

 var errIsClosing = errors.New("fluent logger is closing")

+func (f *Fluent) syncConnectWithRetry(ctx context.Context) error {
+	f.muconn.Lock()
+	defer f.muconn.Unlock()
+
+	if f.conn == nil {
+		return f.connectWithRetry(ctx)
+	}
+
+	return nil
+}
+
 // Caller should take care of locking muconn first.
 func (f *Fluent) connectWithRetry(ctx context.Context) error {
 	// A Time channel is used instead of time.Sleep() to avoid blocking this
@@ -575,75 +594,70 @@ func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error {
 	return fmt.Errorf("fluent#write: failed to write after %d attempts", f.Config.MaxRetry)
 }

+func (f *Fluent) syncWriteMessage(msg *msgToSend) error {
+	f.muconn.RLock()
+	defer f.muconn.RUnlock()
+
+	if f.conn == nil {
+		return fmt.Errorf("connection has been closed before writing to it")
+	}
+
+	t := f.Config.WriteTimeout
+	if time.Duration(0) < t {
+		f.conn.SetWriteDeadline(time.Now().Add(t))
+	} else {
+		f.conn.SetWriteDeadline(time.Time{})
+	}
+
+	_, err := f.conn.Write(msg.data)
+	return err
+}
+
+func (f *Fluent) syncReadAck() (*AckResp, error) {
+	f.muconn.RLock()
+	defer f.muconn.RUnlock()
+
+	resp := &AckResp{}
+	var err error
+	if f.Config.MarshalAsJSON {
+		dec := json.NewDecoder(f.conn)
+		err = dec.Decode(resp)
+	} else {
+		r := msgp.NewReader(f.conn)
+		err = resp.DecodeMsg(r)
+	}
+
+	return resp, err
+}
+
 // write writes the provided msg to fluentd server. Its first return values is
 // a bool indicating whether the write should be retried.
 // This method relies on function literals to execute muconn.Unlock or
 // muconn.RUnlock in deferred calls to ensure the mutex is unlocked even in
 // the case of panic recovering.
 func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) {
-	closer := func() {
-		f.muconn.Lock()
-		defer f.muconn.Unlock()
-
-		f.close()
-	}
-
-	if err := func() (err error) {
-		f.muconn.Lock()
-		defer f.muconn.Unlock()
-
-		if f.conn == nil {
-			err = f.connectWithRetry(ctx)
-		}
-
-		return err
-	}(); err != nil {
+	if err := f.syncConnectWithRetry(ctx); err != nil {
 		// Here, we don't want to retry the write since connectWithRetry already
 		// retries Config.MaxRetry times to connect.
 		return false, fmt.Errorf("fluent#write: %v", err)
 	}

-	if err := func() (err error) {
-		f.muconn.RLock()
-		defer f.muconn.RUnlock()
-
-		if f.conn == nil {
-			return fmt.Errorf("connection has been closed before writing to it")
-		}
-
-		t := f.Config.WriteTimeout
-		if time.Duration(0) < t {
-			f.conn.SetWriteDeadline(time.Now().Add(t))
-		} else {
-			f.conn.SetWriteDeadline(time.Time{})
-		}
-
-		_, err = f.conn.Write(msg.data)
-		return err
-	}(); err != nil {
-		closer()
+	if err := f.syncWriteMessage(msg); err != nil {
+		f.syncClose(false)
 		return true, fmt.Errorf("fluent#write: %v", err)
 	}

 	// Acknowledgment check
 	if msg.ack != "" {
-		f.muconn.Lock()
-
-		resp := &AckResp{}
-		var err error
-		if f.Config.MarshalAsJSON {
-			dec := json.NewDecoder(f.conn)
-			err = dec.Decode(resp)
-		} else {
-			r := msgp.NewReader(f.conn)
-			err = resp.DecodeMsg(r)
+		resp, err := f.syncReadAck()
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "fluent#write: error reading message response ack %v", err)
+			f.syncClose(false)
+			return true, err
 		}
-		f.muconn.Unlock()
-
-		if err != nil || resp.Ack != msg.ack {
+		if resp.Ack != msg.ack {
 			fmt.Fprintf(os.Stderr, "fluent#write: message ack (%s) doesn't match expected one (%s). Closing connection...", resp.Ack, msg.ack)
-
-			closer()
+			f.syncClose(false)
 			return true, err
 		}
 	}
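A side note on the lock modes visible in the diff: syncWriteMessage and syncReadAck take muconn as a read lock (RLock), while syncClose and syncConnectWithRetry take it exclusively (Lock). The self-contained sketch below uses generic names, not repository code, to show how that split behaves: read-locked operations may overlap with one another, and an exclusive close waits for them to finish and is then observed by later users.

// Standalone sketch of the RLock/Lock split used for muconn in this commit;
// the guarded type and its methods are illustrative, not repository code.
package main

import (
	"fmt"
	"sync"
	"time"
)

type guarded struct {
	mu   sync.RWMutex
	live bool
}

// use mimics syncWriteMessage/syncReadAck: shared lock, released via defer.
func (g *guarded) use(id int) {
	g.mu.RLock()
	defer g.mu.RUnlock()

	if !g.live {
		fmt.Printf("user %d: connection already closed\n", id)
		return
	}
	time.Sleep(10 * time.Millisecond) // stand-in for a write or an ack read
	fmt.Printf("user %d: done\n", id)
}

// close mimics syncClose: exclusive lock, waits for all read-lock holders.
func (g *guarded) close() {
	g.mu.Lock()
	defer g.mu.Unlock()

	g.live = false
	fmt.Println("closed")
}

func main() {
	g := &guarded{live: true}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			g.use(id) // read-locked users can overlap
		}(i)
	}
	wg.Wait()
	g.close()
	g.use(99) // sees the closed state and returns early
}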
