engine: Ring buffer data loss during graceful shutdown with threaded inputs (in_tail) #11338

@jinyongchoi

Bug Report

Describe the bug
Fluent Bit loses data buffered in ring buffers during graceful shutdown when using threaded input plugins. The engine's shutdown sequence doesn't wait for ring buffer contents to be drained before terminating, causing in-flight records to be discarded.

  1. Apply the following patch (it only adds logging) and build Fluent Bit:
diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c
index 3c0e343dd..dc86cd3d2 100644
--- a/src/flb_input_chunk.c
+++ b/src/flb_input_chunk.c
@@ -3064,6 +3064,10 @@ void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data)
 
         while (1) {
             if (flb_input_buf_paused(ins) == FLB_TRUE) {
+                if (ctx->is_shutting_down == FLB_TRUE) {
+                    flb_info("[input chunk] %s collector exiting due to shutdown paused state, ring buffer size=%zu",
+                         flb_input_name(ins), lwrb_get_full(ins->rb->ctx));
+                }
                 break;
             }
 
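To make the failure mode in the patched loop concrete, here is a toy Python model. It is not Fluent Bit code: RingBuffer, collect and the per-record framing are hypothetical stand-ins for lwrb, flb_input_chunk_ring_buffer_collector() and the real chunk format. It only illustrates the shape shown in the diff, where the pause check runs before anything is read, so whatever is still in the ring buffer stays there. If the input remains paused until shutdown, every pass exits the same way and those bytes never become chunks.

```python
from collections import deque
from typing import Callable, List, Optional


class RingBuffer:
    """Toy stand-in for an input's ring buffer (hypothetical, not lwrb)."""

    def __init__(self) -> None:
        self._records: deque = deque()

    def write(self, record: bytes) -> None:
        self._records.append(record)

    def read(self) -> Optional[bytes]:
        return self._records.popleft() if self._records else None

    def full_bytes(self) -> int:
        # Plays the role of lwrb_get_full(): bytes still waiting in the buffer.
        return sum(len(r) for r in self._records)


def collect(rb: RingBuffer, chunks: List[bytes], is_paused: Callable[[], bool]) -> None:
    """One collector pass: move records from the ring buffer into chunks.

    Same shape as the loop in the diff: the pause check runs first and
    breaks out without draining whatever is still buffered.
    """
    while True:
        if is_paused():
            break  # leftover bytes stay in rb; nothing in this pass retries them
        record = rb.read()
        if record is None:
            break  # ring buffer is empty
        chunks.append(record)


if __name__ == "__main__":
    rb = RingBuffer()
    for _ in range(3):
        rb.write(b"x" * 10)

    chunks: List[bytes] = []
    # Pretend the input becomes paused right after the first record is stored.
    collect(rb, chunks, is_paused=lambda: len(chunks) >= 1)
    print(len(chunks), rb.full_bytes())  # 1 20
```

The same leftover-size number is what the added flb_info() line reports as "ring buffer size".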

  2. Create a ~2 GB input log file with this script:
#!/usr/bin/env python3

from datetime import datetime, timezone

# Use UTC so the hard-coded "+0000" offset is actually correct
TIMESTAMP = datetime.now(timezone.utc).strftime("%d/%b/%Y:%H:%M:%S +0000")
PATH = "/api/v1/" + "a" * 1000


def generate_large_log_line(line_number):
    log_line = (
        f"192.168.1.100 - - [{TIMESTAMP}] "
        f'"GET {PATH} HTTP/1.1" 200 {line_number} '
        f'"-" "Mozilla/5.0"\n'
    )

    return log_line


def main():
    output_file = "/tmp/testing.input"
    target_size = 2 * 1024 * 1024 * 1024
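    # Approximate bytes per line; real lines are ~1.1 KB because PATH alone is 1008 chars
    line_size = len(generate_large_log_line(0).encode("utf-8"))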
    total_lines = target_size // line_size

    print(f"Starting log generation: {output_file}")
    print(f"Target size: {target_size / (1024**3):.2f} GB")
    print(f"Expected lines: {total_lines:,}")

    written_size = 0
    line_count = 0

    try:
        with open(output_file, "w") as f:
            while written_size < target_size:
                log_line = generate_large_log_line(line_count)
                f.write(log_line)

                written_size += len(log_line.encode("utf-8"))
                line_count += 1

                if line_count % 100000 == 0:
                    progress = (written_size / target_size) * 100
                    size_mb = written_size / (1024 * 1024)
                    print(
                        f"Progress: {progress:.1f}% - {size_mb:.1f} MB - {line_count:,} lines"
                    )

    except KeyboardInterrupt:
        print("\nLog generation interrupted")
    except Exception as e:
        print(f"Error occurred: {e}")

    final_size_gb = written_size / (1024**3)
    print("\nLog generation completed!")
    print(f"Generated size: {final_size_gb:.2f} GB")
    print(f"Generated lines: {line_count:,}")
    if line_count:  # avoid ZeroDivisionError if nothing was written
        print(f"Average line size: {written_size / line_count:.0f} bytes")


if __name__ == "__main__":
    main()

  3. Run Fluent Bit:
fluent-bit -v -c ./fluentbit.conf
  4. After about 3 seconds, stop Fluent Bit gracefully.
  5. Check the Fluent Bit log:
[2026/01/05 22:33:07.252300551] [ info] [input chunk] input_log collector exiting due to shutdown paused state, ring buffer size=1880
[2026/01/05 22:33:07.502327629] [ info] [input chunk] input_log collector exiting due to shutdown paused state, ring buffer size=1880
[2026/01/05 22:33:07.752300897] [ info] [input chunk] input_log collector exiting due to shutdown paused state, ring buffer size=1880
[2026/01/05 22:33:08.2306863] [ info] [input chunk] input_log collector exiting due to shutdown paused state, ring buffer size=1880
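One way to quantify the loss, sketched in Python under stated assumptions: the generator script writes its line number into the response-size field ("200 {line_number}"), so every record that reaches /tmp/testing.out carries its own sequence number inside the message value. The sketch scans the output for that pattern and reports missing ranges. It assumes the line text survives in the file output (with the inner quotes possibly escaped as \"), and that gaps only become visible once Fluent Bit is restarted with the same DB, so in_tail resumes from its stored offset and skips whatever was only sitting in the ring buffer at shutdown. Loss after the largest number seen is not detectable this way.

```python
import os
import re
from typing import Iterable, List

# The generator writes: ..."GET <PATH> HTTP/1.1" 200 <line_number> ...
# Assumption: in JSON-style output the inner quote may be escaped as \" .
LINE_NO_RE = re.compile(r'HTTP/1\.1\\?" 200 (\d+)')
OUTPUT_PATH = "/tmp/testing.out"


def extract_line_numbers(lines: Iterable[str]) -> List[int]:
    """Return the generator line number embedded in each matching record."""
    numbers = []
    for line in lines:
        match = LINE_NO_RE.search(line)
        if match:
            numbers.append(int(match.group(1)))
    return numbers


def find_gaps(numbers: Iterable[int]) -> List[range]:
    """Return ranges missing between the smallest and largest number seen.

    Duplicates are ignored; loss after the largest number cannot be detected.
    """
    seen = sorted(set(numbers))
    return [range(prev + 1, cur) for prev, cur in zip(seen, seen[1:]) if cur - prev > 1]


if __name__ == "__main__":
    if not os.path.exists(OUTPUT_PATH):
        print(f"{OUTPUT_PATH} not found")
    else:
        with open(OUTPUT_PATH, encoding="utf-8", errors="replace") as f:
            numbers = extract_line_numbers(f)
        gaps = find_gaps(numbers)
        missing = sum(len(g) for g in gaps)
        print(f"records: {len(numbers):,}  gaps: {len(gaps)}  missing lines: {missing:,}")
        for g in gaps[:10]:
            print(f"  missing {g.start}..{g.stop - 1}")
```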

Expected behavior
During graceful shutdown, Fluent Bit should wait for all ring buffer contents to be drained before terminating. Records that have been accepted by threaded input plugins and buffered in ring buffers must be flushed to chunks and processed completely. The shutdown sequence should respect the configured grace period and only exit when all data sources (tasks, chunks, and ring buffers) are empty or the grace timeout is reached. No data loss should occur for records already accepted by the system.
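The exit condition described above can be sketched as a polling loop. This is a hedged Python model, not the engine's implementation: pending_tasks, pending_chunks and ring_buffer_bytes are hypothetical callables standing in for whatever the engine actually inspects, poll_interval is an arbitrary choice, and grace corresponds to the [SERVICE] grace setting (60 seconds in the configuration under Your Environment).

```python
import time
from typing import Callable


def wait_for_drain(
    pending_tasks: Callable[[], int],
    pending_chunks: Callable[[], int],
    ring_buffer_bytes: Callable[[], int],
    grace: float,
    poll_interval: float = 0.25,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Return True once tasks, chunks and ring buffers are all empty,
    or False if the grace period expires first."""
    deadline = clock() + grace
    while True:
        if pending_tasks() == 0 and pending_chunks() == 0 and ring_buffer_bytes() == 0:
            return True
        if clock() >= deadline:
            return False
        sleep(poll_interval)


if __name__ == "__main__":
    # Simulated ring buffer that drains 940 bytes per poll (illustrative numbers).
    remaining = [1880]

    def fake_sleep(seconds: float) -> None:
        remaining[0] = max(0, remaining[0] - 940)

    drained = wait_for_drain(lambda: 0, lambda: 0, lambda: remaining[0],
                             grace=60, sleep=fake_sleep)
    print(drained)  # True
```

The clock and sleep parameters are injectable only so the loop can be exercised without real waiting; the key point is that ring buffer contents count toward "not yet drained" alongside tasks and chunks.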

Screenshots
N/A

Your Environment
Ubuntu 24.04

  • Version used: 4.2.3
  • Configuration:
[SERVICE]
    flush 2
    grace 60
    log_level info
    log_file /tmp/testing/logs/testing.log
    parsers_file /tmp/testing/parsers.conf
    plugins_file /tmp/testing/plugins.conf
    http_server on
    http_listen 0.0.0.0
    http_port 22002

    storage.path /tmp/testing/storage
    storage.metrics on
    storage.max_chunks_up 512
    storage.sync full
    storage.checksum off
    storage.backlog.mem_limit 100M

[INPUT]
    Name tail
    Path /tmp/testing.input
    Exclude_Path *.gz,*.zip
    Tag testing
    Key message
    Offset_Key   log_offset

    Read_from_Head true
    Refresh_Interval 3
    Rotate_Wait 31557600

    Buffer_Chunk_Size 1MB
    Buffer_Max_Size 16MB
    Inotify_Watcher false

    storage.type filesystem
    storage.pause_on_chunks_overlimit true

    DB /tmp/testing/storage/testing.db
    DB.sync normal
    DB.locking false

    Alias input_log

    Threaded true

[OUTPUT]
    Name file
    Match *
    File /tmp/testing.out

Additional context
N/A
