Skip to content

Commit b8d8d8a

Browse files
authored
Merge pull request #77 from JamesJJ/master
`AsyncStop` option for graceful stop in async mode
2 parents e5d2460 + 005bb40 commit b8d8d8a

File tree

4 files changed

+52
-26
lines changed

4 files changed

+52
-26
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ Since the default is zero value, Write will not time out.
6868
Enable asynchronous I/O (connect and write) for sending events to Fluentd.
6969
The default is false.
7070

71+
### ForceStopAsyncSend
72+
73+
When Async is enabled, immediately discard the event queue on close() and return (instead of trying MaxRetry times for each event in the queue before returning)
74+
The default is false.
75+
7176
### RequestAck
7277

7378
Sets whether to request acknowledgment from Fluentd to increase the reliability

fluent/fluent.go

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,19 @@ const (
3737
)
3838

3939
type Config struct {
40-
FluentPort int `json:"fluent_port"`
41-
FluentHost string `json:"fluent_host"`
42-
FluentNetwork string `json:"fluent_network"`
43-
FluentSocketPath string `json:"fluent_socket_path"`
44-
Timeout time.Duration `json:"timeout"`
45-
WriteTimeout time.Duration `json:"write_timeout"`
46-
BufferLimit int `json:"buffer_limit"`
47-
RetryWait int `json:"retry_wait"`
48-
MaxRetry int `json:"max_retry"`
49-
MaxRetryWait int `json:"max_retry_wait"`
50-
TagPrefix string `json:"tag_prefix"`
51-
Async bool `json:"async"`
40+
FluentPort int `json:"fluent_port"`
41+
FluentHost string `json:"fluent_host"`
42+
FluentNetwork string `json:"fluent_network"`
43+
FluentSocketPath string `json:"fluent_socket_path"`
44+
Timeout time.Duration `json:"timeout"`
45+
WriteTimeout time.Duration `json:"write_timeout"`
46+
BufferLimit int `json:"buffer_limit"`
47+
RetryWait int `json:"retry_wait"`
48+
MaxRetry int `json:"max_retry"`
49+
MaxRetryWait int `json:"max_retry_wait"`
50+
TagPrefix string `json:"tag_prefix"`
51+
Async bool `json:"async"`
52+
ForceStopAsyncSend bool `json:"force_stop_async_send"`
5253
// Deprecated: Use Async instead
5354
AsyncConnect bool `json:"async_connect"`
5455
MarshalAsJSON bool `json:"marshal_as_json"`
@@ -83,8 +84,9 @@ type msgToSend struct {
8384
type Fluent struct {
8485
Config
8586

86-
pending chan *msgToSend
87-
wg sync.WaitGroup
87+
stopRunning chan bool
88+
pending chan *msgToSend
89+
wg sync.WaitGroup
8890

8991
muconn sync.Mutex
9092
conn net.Conn
@@ -305,6 +307,10 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg
305307
// Close closes the connection, waiting for pending logs to be sent
306308
func (f *Fluent) Close() (err error) {
307309
if f.Config.Async {
310+
if f.Config.ForceStopAsyncSend {
311+
f.stopRunning <- true
312+
close(f.stopRunning)
313+
}
308314
close(f.pending)
309315
f.wg.Wait()
310316
}
@@ -347,18 +353,31 @@ func (f *Fluent) connect() (err error) {
347353
}
348354

349355
func (f *Fluent) run() {
356+
drainEvents := false
357+
var emitEventDrainMsg sync.Once
350358
for {
351359
select {
352360
case entry, ok := <-f.pending:
353361
if !ok {
354362
f.wg.Done()
355363
return
356364
}
365+
if drainEvents {
366+
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
367+
continue
368+
}
357369
err := f.write(entry)
358370
if err != nil {
359371
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
360372
}
361373
}
374+
select {
375+
case stopRunning, ok := <-f.stopRunning:
376+
if stopRunning || !ok {
377+
drainEvents = true
378+
}
379+
default:
380+
}
362381
}
363382
}
364383

fluent/fluent_test.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -249,18 +249,19 @@ func TestJsonConfig(t *testing.T) {
249249
}
250250
var got Config
251251
expect := Config{
252-
FluentPort: 8888,
253-
FluentHost: "localhost",
254-
FluentNetwork: "tcp",
255-
FluentSocketPath: "/var/tmp/fluent.sock",
256-
Timeout: 3000,
257-
WriteTimeout: 6000,
258-
BufferLimit: 10,
259-
RetryWait: 5,
260-
MaxRetry: 3,
261-
TagPrefix: "fluent",
262-
Async: false,
263-
MarshalAsJSON: true,
252+
FluentPort: 8888,
253+
FluentHost: "localhost",
254+
FluentNetwork: "tcp",
255+
FluentSocketPath: "/var/tmp/fluent.sock",
256+
Timeout: 3000,
257+
WriteTimeout: 6000,
258+
BufferLimit: 10,
259+
RetryWait: 5,
260+
MaxRetry: 3,
261+
TagPrefix: "fluent",
262+
Async: false,
263+
ForceStopAsyncSend: false,
264+
MarshalAsJSON: true,
264265
}
265266

266267
err = json.Unmarshal(b, &got)

fluent/testdata/config.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@
1010
"max_retry":3,
1111
"tag_prefix":"fluent",
1212
"async": false,
13+
"force_stop_async_send": false,
1314
"marshal_as_json": true
1415
}

0 commit comments

Comments
 (0)