Calls to Write() after calling Close() in sync mode reopen the connection #104

Closed
@akerouanton

Currently, in sync mode, the code reopens the connection when Write() is called after Close(). This can happen in two ways:

  1. The client code calls these functions serially from a single goroutine;
  2. Two goroutines call these functions concurrently, and the connection mutex serializes them in that specific order (Close() first, then Write()).

In the first case, where Write() is called serially after Close(), it may be reasonable to expect the caller to call Close() once more. In the second case, however, callers should not have to call Close() several times to be sure the connection is really closed.
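The reopening behavior can be illustrated with a minimal, self-contained sketch. It is not the library's code: `lazyClient`, its `open` and `dials` fields, and the simulated connection are hypothetical names that only mimic a client which connects lazily inside Write().

```go
package main

import (
	"fmt"
	"sync"
)

// lazyClient is a hypothetical stand-in for a sync-mode logger. It connects
// lazily on Write and has no notion of being closed for good.
type lazyClient struct {
	mu    sync.Mutex
	open  bool // true while the simulated connection is open
	dials int  // number of times the connection was (re)established
}

// Write connects if needed, then pretends to send msg.
func (c *lazyClient) Write(msg string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.open {
		// Nothing records that Close was called, so we simply redial.
		c.open = true
		c.dials++
	}
	return nil
}

// Close drops the connection but leaves the client usable.
func (c *lazyClient) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.open = false
	return nil
}

func main() {
	c := &lazyClient{}
	_ = c.Write("first")
	_ = c.Close()
	_ = c.Write("second") // silently reconnects after Close
	fmt.Println(c.open, c.dials) // prints "true 2"
}
```

Whether the two calls come from one goroutine or from two goroutines serialized by the mutex, the outcome is the same: the connection ends up open after Close() has returned.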

This doesn't happen in async mode because f.Close() sets f.chanClosed = true, and f.appendBuffer() returns an error when f.chanClosed is true.

func (f *Fluent) Close() (err error) {
	if f.Config.Async {
		f.pendingMutex.Lock()
		if f.chanClosed {
			f.pendingMutex.Unlock()
			return nil
		}
		f.chanClosed = true
		// ... (excerpt ends here)

func (f *Fluent) appendBuffer(msg *msgToSend) error {
	f.pendingMutex.RLock()
	defer f.pendingMutex.RUnlock()
	if f.chanClosed {
		return fmt.Errorf("fluent#appendBuffer: Logger already closed")
	}
	// ... (excerpt ends here)

I suggest changing the sync-mode code so that it no longer accepts new messages once f.Close() has been called, and documenting that change (as well as the thread-safety improvement made in #82).

I've written the following test to confirm my findings (although I'm not sure how to test the inverse behavior). I ran it against the current master branch and against pre-#82 code to make sure this behavior wasn't introduced by that change. Both runs trigger the error.

func TestSyncWriteAfterCloseFails(t *testing.T) {
	d := newTestDialer()

	go func() {
		var f *Fluent
		var err error
		if f, err = newWithDialer(Config{
			Async: false,
		}, d); err != nil {
			t.Errorf("Unexpected error: %v", err)
			return // f is nil here; continuing would panic
		}

		_ = f.PostWithTime("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})

		if err := f.Close(); err != nil {
			t.Errorf("Unexpected error: %v", err)
		}

		_ = f.PostWithTime("tag_name", time.Unix(1482493050, 0), map[string]string{"fluentd": "is awesome"})
	}()

	conn := d.waitForNextDialing(true, false)
	conn.waitForNextWrite(true, "")

	conn = d.waitForNextDialing(true, false)
	conn.waitForNextWrite(true, "")

	// isOpen is a field added to Conn specifically for this test
	if conn.isOpen {
		t.Error("Connection has been reopened.") // This is currently triggered
	}
}
