diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index dfdd6f5..d944099 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -24,5 +24,5 @@ jobs: with: go-version: ${{ matrix.golang }} - name: Test - run: go test -v ./fluent + run: go test -v -race -cover -covermode=atomic ./fluent shell: bash diff --git a/fluent/fluent.go b/fluent/fluent.go index 7216991..d33012f 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -13,6 +13,7 @@ import ( "reflect" "strconv" "sync" + "sync/atomic" "time" "bytes" @@ -34,12 +35,10 @@ const ( defaultMaxRetryWait = 60000 defaultMaxRetry = 13 defaultReconnectWaitIncreRate = 1.5 - // Default sub-second precision value to false since it is only compatible - // with fluentd versions v0.14 and above. - defaultSubSecondPrecision = false // Default value whether to skip checking insecure certs on TLS connections. defaultTlsInsecureSkipVerify = false + defaultReadTimeout = time.Duration(0) // Read() will not time out ) // randomGenerator is used by getUniqueId to generate ack hashes. Its value is replaced @@ -80,7 +79,10 @@ type Config struct { RequestAck bool `json:"request_ack"` // Flag to skip verifying insecure certs on TLS connections - TlsInsecureSkipVerify bool `json: "tls_insecure_skip_verify"` + TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify"` + + // ReadTimeout specifies the timeout on reads. Currently only acks are read. + ReadTimeout time.Duration `json:"read_timeout"` } type ErrUnknownNetwork struct { @@ -104,20 +106,22 @@ type Fluent struct { Config dialer dialer - // stopRunning is used in async mode to signal to run() it should abort. - stopRunning chan struct{} // cancelDialings is used by Close() to stop any in-progress dialing. cancelDialings context.CancelFunc pending chan *msgToSend - pendingMutex sync.RWMutex - closed bool - wg sync.WaitGroup + // closed indicates if the connection is open or closed. + // 0 = open (false), 1 = closed (true). Since the code is built in CI with + // golang < 1.19, we're using atomic int32 here. Otherwise, atomic.Bool + // could have been used. + closed int32 + wg sync.WaitGroup // time at which the most recent connection to fluentd-address was established. latestReconnectTime time.Time - muconn sync.RWMutex - conn net.Conn + muconn sync.RWMutex + pendingMutex sync.RWMutex + conn net.Conn } type dialer interface { @@ -150,6 +154,9 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { if config.WriteTimeout == 0 { config.WriteTimeout = defaultWriteTimeout } + if config.ReadTimeout == 0 { + config.ReadTimeout = defaultReadTimeout + } if config.BufferLimit == 0 { config.BufferLimit = defaultBufferLimit } @@ -176,20 +183,20 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { f = &Fluent{ Config: config, dialer: d, - stopRunning: make(chan struct{}), cancelDialings: cancel, pending: make(chan *msgToSend, config.BufferLimit), - pendingMutex: sync.RWMutex{}, muconn: sync.RWMutex{}, + pendingMutex: sync.RWMutex{}, } f.wg.Add(1) go f.run(ctx) } else { f = &Fluent{ - Config: config, - dialer: d, - muconn: sync.RWMutex{}, + Config: config, + dialer: d, + muconn: sync.RWMutex{}, + pendingMutex: sync.RWMutex{}, } err = f.connect(context.Background()) } @@ -200,27 +207,26 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { // // Examples: // -// // send map[string] -// mapStringData := map[string]string{ -// "foo": "bar", -// } -// f.Post("tag_name", mapStringData) -// -// // send message with specified time -// mapStringData := map[string]string{ -// "foo": "bar", -// } -// tm := time.Now() -// f.PostWithTime("tag_name", tm, mapStringData) +// // send map[string] +// mapStringData := map[string]string{ +// "foo": "bar", +// } +// f.Post("tag_name", mapStringData) // -// // send struct -// structData := struct { -// Name string `msg:"name"` -// } { -// "john smith", -// } -// f.Post("tag_name", structData) +// // send message with specified time +// mapStringData := map[string]string{ +// "foo": "bar", +// } +// tm := time.Now() +// f.PostWithTime("tag_name", tm, mapStringData) // +// // send struct +// structData := struct { +// Name string `msg:"name"` +// } { +// "john smith", +// } +// f.Post("tag_name", structData) func (f *Fluent) Post(tag string, message interface{}) error { timeNow := time.Now() return f.PostWithTime(tag, timeNow, message) @@ -278,7 +284,7 @@ func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{} var msg *msgToSend var err error if msg, err = f.EncodeData(tag, tm, message); err != nil { - return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err) + return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%w", message, err) } return f.postRawData(msg) } @@ -294,7 +300,7 @@ func (f *Fluent) postRawData(msg *msgToSend) error { } // Synchronous write - if f.closed { + if atomic.LoadInt32(&f.closed) == 1 { return fmt.Errorf("fluent#postRawData: Logger already closed") } return f.writeWithRetry(context.Background(), msg) @@ -371,20 +377,22 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg // running in async mode, the run() goroutine exits before Close() returns. func (f *Fluent) Close() (err error) { if f.Config.Async { + // Use a mutex to ensure thread safety when closing the channel f.pendingMutex.Lock() - if f.closed { + + if atomic.LoadInt32(&f.closed) == 1 { f.pendingMutex.Unlock() return nil } - f.closed = true - f.pendingMutex.Unlock() + atomic.StoreInt32(&f.closed, 1) if f.Config.ForceStopAsyncSend { - close(f.stopRunning) f.cancelDialings() } close(f.pending) + f.pendingMutex.Unlock() + // If ForceStopAsyncSend is false, all logs in the channel have to be sent // before closing the connection. At this point closed is true so no more // logs are written to the channel and f.pending has been closed, so run() @@ -394,10 +402,7 @@ func (f *Fluent) Close() (err error) { } } - f.muconn.Lock() - f.close() - f.closed = true - f.muconn.Unlock() + f.syncClose(true) // If ForceStopAsyncSend is true, we shall close the connection before waiting for // run() goroutine to exit to be sure we aren't waiting on ack message that might @@ -411,11 +416,19 @@ func (f *Fluent) Close() (err error) { // appendBuffer appends data to buffer with lock. func (f *Fluent) appendBuffer(msg *msgToSend) error { - f.pendingMutex.RLock() - defer f.pendingMutex.RUnlock() - if f.closed { + if atomic.LoadInt32(&f.closed) == 1 { return fmt.Errorf("fluent#appendBuffer: Logger already closed") } + + // Use a mutex to ensure thread safety when writing to the channel + f.pendingMutex.Lock() + defer f.pendingMutex.Unlock() + + // Check again after acquiring the lock + if atomic.LoadInt32(&f.closed) == 1 { + return fmt.Errorf("fluent#appendBuffer: Logger already closed") + } + select { case f.pending <- msg: default: @@ -424,6 +437,17 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error { return nil } +func (f *Fluent) syncClose(setClosed bool) { + f.muconn.Lock() + defer f.muconn.Unlock() + + if setClosed { + atomic.StoreInt32(&f.closed, 1) + } + + f.close() +} + // close closes the connection. Callers should take care of locking muconn first. func (f *Fluent) close() { if f.conn != nil { @@ -464,6 +488,17 @@ func (f *Fluent) connect(ctx context.Context) (err error) { var errIsClosing = errors.New("fluent logger is closing") +func (f *Fluent) syncConnectWithRetry(ctx context.Context) error { + f.muconn.Lock() + defer f.muconn.Unlock() + + if f.conn == nil { + return f.connectWithRetry(ctx) + } + + return nil +} + // Caller should take care of locking muconn first. func (f *Fluent) connectWithRetry(ctx context.Context) error { // A Time channel is used instead of time.Sleep() to avoid blocking this @@ -513,7 +548,7 @@ func (f *Fluent) run(ctx context.Context) { for { select { case entry, ok := <-f.pending: - // f.stopRunning is closed before f.pending only when ForceStopAsyncSend + // The context is cancelled before f.pending only when ForceStopAsyncSend // is enabled. Otherwise, f.pending is closed when Close() is called. if !ok { f.wg.Done() @@ -540,9 +575,9 @@ func (f *Fluent) run(ctx context.Context) { } f.AsyncResultCallback(data, err) } - case <-f.stopRunning: + case <-ctx.Done(): + // Context was canceled, which means ForceStopAsyncSend was enabled fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) - f.wg.Done() return } @@ -563,72 +598,99 @@ func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error { return fmt.Errorf("fluent#write: failed to write after %d attempts", f.Config.MaxRetry) } -// write writes the provided msg to fluentd server. Its first return values is -// a bool indicating whether the write should be retried. -// This method relies on function literals to execute muconn.Unlock or -// muconn.RUnlock in deferred calls to ensure the mutex is unlocked even in -// the case of panic recovering. -func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) { - closer := func() { - f.muconn.Lock() - defer f.muconn.Unlock() +func (f *Fluent) syncWriteMessage(ctx context.Context, msg *msgToSend) error { + f.muconn.Lock() + defer f.muconn.Unlock() - f.close() + // Check if context is cancelled. If it is, we can return early here. + if err := ctx.Err(); err != nil { + return errIsClosing } - if err := func() (err error) { - f.muconn.Lock() - defer f.muconn.Unlock() + if f.conn == nil { + return fmt.Errorf("fluent#write: connection has been closed before writing to it") + } - if f.conn == nil { - err = f.connectWithRetry(ctx) - } + t := f.Config.WriteTimeout + var err error + if time.Duration(0) < t { + err = f.conn.SetWriteDeadline(time.Now().Add(t)) + } else { + err = f.conn.SetWriteDeadline(time.Time{}) + } - return err - }(); err != nil { - // Here, we don't want to retry the write since connectWithRetry already - // retries Config.MaxRetry times to connect. - return false, fmt.Errorf("fluent#write: %v", err) + if err != nil { + return fmt.Errorf("fluent#write: failed to set write deadline: %w", err) } + _, err = f.conn.Write(msg.data) + return err +} - if err := func() (err error) { - f.muconn.RLock() - defer f.muconn.RUnlock() +func (f *Fluent) syncReadAck(ctx context.Context) (*AckResp, error) { + f.muconn.Lock() + defer f.muconn.Unlock() - if f.conn == nil { - return fmt.Errorf("connection has been closed before writing to it") - } + resp := &AckResp{} + var err error - t := f.Config.WriteTimeout - if time.Duration(0) < t { - f.conn.SetWriteDeadline(time.Now().Add(t)) - } else { - f.conn.SetWriteDeadline(time.Time{}) - } + if f.conn == nil { + return resp, fmt.Errorf("fluent#read: connection has been closed before reading from it") + } - _, err = f.conn.Write(msg.data) - return err - }(); err != nil { - closer() - return true, fmt.Errorf("fluent#write: %v", err) + // Check if context is cancelled. If it is, we can return early here. + if err := ctx.Err(); err != nil { + return resp, errIsClosing + } + + t := f.Config.ReadTimeout + if time.Duration(0) < t { + err = f.conn.SetReadDeadline(time.Now().Add(t)) + } else { + err = f.conn.SetReadDeadline(time.Time{}) + } + if err != nil { + return resp, fmt.Errorf("fluent#read: failed to set read deadline: %w", err) + } + + if f.Config.MarshalAsJSON { + dec := json.NewDecoder(f.conn) + err = dec.Decode(resp) + } else { + r := msgp.NewReader(f.conn) + err = resp.DecodeMsg(r) + } + + return resp, err +} + +// write writes the provided msg to fluentd server. Its first return values is +// a bool indicating whether the write should be retried. +// This method relies on function literals to execute muconn.Unlock or +// muconn.RUnlock in deferred calls to ensure the mutex is unlocked even in +// the case of panic recovering. +func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) { + if err := f.syncConnectWithRetry(ctx); err != nil { + // Here, we don't want to retry the write since connectWithRetry already + // retries Config.MaxRetry times to connect. + return false, fmt.Errorf("fluent#write: %w", err) + } + + if err := f.syncWriteMessage(ctx, msg); err != nil { + f.syncClose(false) + return true, fmt.Errorf("fluent#write: %w", err) } // Acknowledgment check if msg.ack != "" { - resp := &AckResp{} - var err error - if f.Config.MarshalAsJSON { - dec := json.NewDecoder(f.conn) - err = dec.Decode(resp) - } else { - r := msgp.NewReader(f.conn) - err = resp.DecodeMsg(r) + resp, err := f.syncReadAck(ctx) + if err != nil { + fmt.Fprintf(os.Stderr, "fluent#write: error reading message response ack %v. Closing connection...", err) + f.syncClose(false) + return true, err } - - if err != nil || resp.Ack != msg.ack { + if resp.Ack != msg.ack { fmt.Fprintf(os.Stderr, "fluent#write: message ack (%s) doesn't match expected one (%s). Closing connection...", resp.Ack, msg.ack) - - closer() + f.syncClose(false) return true, err } } diff --git a/fluent/fluent_test.go b/fluent/fluent_test.go index cdd82f2..c8b33e7 100644 --- a/fluent/fluent_test.go +++ b/fluent/fluent_test.go @@ -10,6 +10,9 @@ import ( "net" "reflect" "runtime" + "strconv" + "strings" + "sync" "testing" "time" @@ -44,18 +47,18 @@ func newTestDialer() *testDialer { // For instance, to test an async logger that have to dial 4 times before succeeding, // the test should look like this: // -// d := newTestDialer() // Create a new stubbed dialer -// cfg := Config{ -// Async: true, -// // ... -// } -// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer -// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) +// d := newTestDialer() // Create a new stubbed dialer +// cfg := Config{ +// Async: true, +// // ... +// } +// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer +// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) // -// d.waitForNextDialing(false, false) // 1st dialing attempt fails -// d.waitForNextDialing(false, false) // 2nd attempt fails too -// d.waitForNextDialing(false, false) // 3rd attempt fails too -// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds +// d.waitForNextDialing(false, false) // 1st dialing attempt fails +// d.waitForNextDialing(false, false) // 2nd attempt fails too +// d.waitForNextDialing(false, false) // 3rd attempt fails too +// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds // // Note that in the above example, the logger operates in async mode. As such, // a call to Post, PostWithTime or EncodeAndPostData is needed *before* calling @@ -66,20 +69,20 @@ func newTestDialer() *testDialer { // case, you have to put the calls to newWithDialer() and to EncodeAndPostData() // into their own goroutine. An example: // -// d := newTestDialer() // Create a new stubbed dialer -// cfg := Config{ -// Async: false, -// // ... -// } -// go func() { -// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer -// f.Close() -// }() +// d := newTestDialer() // Create a new stubbed dialer +// cfg := Config{ +// Async: false, +// // ... +// } +// go func() { +// f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer +// f.Close() +// }() // -// d.waitForNextDialing(false, false) // 1st dialing attempt fails -// d.waitForNextDialing(false, false) // 2nd attempt fails too -// d.waitForNextDialing(false, false) // 3rd attempt fails too -// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds +// d.waitForNextDialing(false, false) // 1st dialing attempt fails +// d.waitForNextDialing(false, false) // 2nd attempt fails too +// d.waitForNextDialing(false, false) // 3rd attempt fails too +// d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds // // Moreover, waitForNextDialing() returns a *Conn which extends net.Conn to provide testing // facilities. For instance, you can call waitForNextWrite() on these connections, to @@ -90,24 +93,24 @@ func newTestDialer() *testDialer { // // Here's a full example: // -// d := newTestDialer() -// cfg := Config{Async: true} +// d := newTestDialer() +// cfg := Config{Async: true} // -// f := newWithDialer(cfg, d) -// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) +// f := newWithDialer(cfg, d) +// f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) // -// conn := d.waitForNextDialing(true, false) // Accept the dialing -// conn.waitForNextWrite(false, "") // Discard the 1st attempt to write the message +// conn := d.waitForNextDialing(true, false) // Accept the dialing +// conn.waitForNextWrite(false, "") // Discard the 1st attempt to write the message // -// conn := d.waitForNextDialing(true, false) -// assertReceived(t, // t is *testing.T -// conn.waitForNextWrite(true, ""), -// "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]") +// conn := d.waitForNextDialing(true, false) +// assertReceived(t, // t is *testing.T +// conn.waitForNextWrite(true, ""), +// "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]") // -// f.EncodeAndPostData("something_else", time.Unix(1482493050, 0), map[string]string{"bar": "baz"}) -// assertReceived(t, // t is *testing.T -// conn.waitForNextWrite(true, ""), -// "[\"something_else\",1482493050,{\"bar\":\"baz\"},{}]") +// f.EncodeAndPostData("something_else", time.Unix(1482493050, 0), map[string]string{"bar": "baz"}) +// assertReceived(t, // t is *testing.T +// conn.waitForNextWrite(true, ""), +// "[\"something_else\",1482493050,{\"bar\":\"baz\"},{}]") // // In this example, the 1st connection dialing succeeds but the 1st attempt to write the // message is discarded. As the logger discards the connection whenever a message @@ -269,6 +272,11 @@ func (c *Conn) SetWriteDeadline(t time.Time) error { return nil } +// SetReadDeadline is a nop for our test dialer. +func (c *Conn) SetReadDeadline(time.Time) error { + return nil +} + func (c *Conn) Close() error { if c.delayNextReadCh != nil { close(c.delayNextReadCh) @@ -477,7 +485,10 @@ func TestPostWithTime(t *testing.T) { _ = f.PostWithTime("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) _ = f.PostWithTime("tag_name", time.Unix(1482493050, 0), map[string]string{"fluentd": "is awesome"}) _ = f.PostWithTime("tag_name", time.Unix(1634263200, 0), - struct {Welcome string `msg:"welcome"`; cannot string}{"to use", "see me"}) + struct { + Welcome string `msg:"welcome"` + cannot string + }{"to use", "see me"}) }() conn := d.waitForNextDialing(true, false) @@ -678,6 +689,7 @@ func TestCloseOnFailingAsyncReconnect(t *testing.T) { ForceStopAsyncSend: true, RequestAck: true, }, + "without RequestAck": { Async: true, ForceStopAsyncSend: true, @@ -764,8 +776,8 @@ func TestSyncWriteAfterCloseFails(t *testing.T) { t.Error("expected an error") } - // and also must keep Fluentd closed. - if f.closed != true { + // and also must keep Fluentd closed. true equals 1. + if f.closed != int32(1) { t.Error("expected Fluentd to be kept closed") } }() @@ -774,6 +786,53 @@ func TestSyncWriteAfterCloseFails(t *testing.T) { conn.waitForNextWrite(true, "") } +func TestPendingChannelThreadSafety(t *testing.T) { + f, err := New(Config{ + Async: true, + ForceStopAsyncSend: true, + }) + if err != nil { + t.Fatalf("Failed to create logger: %v", err) + } + + // Start multiple goroutines posting messages + const numGoroutines = 10 + const messagesPerGoroutine = 100 + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + for j := 0; j < messagesPerGoroutine; j++ { + // Post a message + err := f.Post("tag", map[string]string{ + "goroutine": strconv.Itoa(id), + "message": strconv.Itoa(j), + }) + + // If the logger is closed, we expect an error + if err != nil && !strings.Contains(err.Error(), "already closed") { + t.Errorf("Unexpected error: %v", err) + return + } + + // Add a small delay to increase the chance of race conditions + time.Sleep(time.Millisecond) + } + }(i) + } + + // Wait a bit to let some messages be posted + time.Sleep(10 * time.Millisecond) + + // Close the logger while goroutines are still posting + f.Close() + + // Wait for all goroutines to finish + wg.Wait() +} + func Benchmark_PostWithShortMessage(b *testing.B) { b.StopTimer() d := newTestDialer()