diff --git a/include/fluent-bit/flb_input_chunk.h b/include/fluent-bit/flb_input_chunk.h index 08caaf8539b..bbd0e087eed 100644 --- a/include/fluent-bit/flb_input_chunk.h +++ b/include/fluent-bit/flb_input_chunk.h @@ -162,5 +162,6 @@ int flb_input_chunk_down(struct flb_input_chunk *ic); int flb_input_chunk_is_up(struct flb_input_chunk *ic); void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic, size_t chunk_size); +size_t flb_input_chunk_get_total_ring_buffer_size(const struct flb_config *config); #endif diff --git a/include/fluent-bit/flb_input_thread.h b/include/fluent-bit/flb_input_thread.h index 4abfc1511ea..d85baf363a8 100644 --- a/include/fluent-bit/flb_input_thread.h +++ b/include/fluent-bit/flb_input_thread.h @@ -26,6 +26,7 @@ #include #include #include +#include #define BUFFER_SIZE 65535 @@ -89,6 +90,13 @@ struct flb_input_thread_instance { int input_coro_id; struct mk_list input_coro_list; struct mk_list input_coro_list_destroy; + + /* + * Pause state flag for shutdown synchronization. + * Set to FLB_TRUE in handle_input_event() after cb_pause runs; reset to FLB_FALSE after cb_resume. + * Read by all_threaded_inputs_paused() in src/flb_engine.c. NOTE(review): volatile is not a memory barrier - confirm this suffices. 
+ */ + volatile sig_atomic_t is_paused; }; int flb_input_thread_instance_init(struct flb_config *config, diff --git a/include/fluent-bit/flb_ring_buffer.h b/include/fluent-bit/flb_ring_buffer.h index 9ee3a133ff9..f3c04f3ed99 100644 --- a/include/fluent-bit/flb_ring_buffer.h +++ b/include/fluent-bit/flb_ring_buffer.h @@ -40,5 +40,6 @@ int flb_ring_buffer_add_event_loop(struct flb_ring_buffer *rb, void *evl, uint8_ int flb_ring_buffer_write(struct flb_ring_buffer *rb, void *ptr, size_t size); int flb_ring_buffer_read(struct flb_ring_buffer *rb, void *ptr, size_t size); +size_t flb_ring_buffer_get_used(struct flb_ring_buffer *rb); #endif diff --git a/src/flb_engine.c b/src/flb_engine.c index 9d998b1b702..aaffa754586 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -33,9 +33,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -800,12 +802,42 @@ int sb_segregate_chunks(struct flb_config *config) } #endif +/* Return FLB_TRUE once every threaded input that can acknowledge a pause has thi->is_paused set, FLB_FALSE otherwise */ +static int all_threaded_inputs_paused(struct flb_config *config) +{ + struct mk_list *head; + struct flb_input_instance *in; + + mk_list_foreach(head, &config->inputs) { + in = mk_list_entry(head, struct flb_input_instance, _head); + + if (in->is_threaded && in->thi) { + /* + * Skip inputs that cannot acknowledge pause: + * - cb_pause or cb_resume is NULL + * - No context (plugin initialization failed) + */ + if (in->p->cb_pause == NULL || in->p->cb_resume == NULL || + in->context == NULL) { + continue; + } + + if (in->thi->is_paused == FLB_FALSE) { + return FLB_FALSE; + } + } + } + + return FLB_TRUE; +} + int flb_engine_start(struct flb_config *config) { int ret; int tasks = 0; int fs_chunks = 0; int mem_chunks = 0; + size_t rb_size = 0; uint64_t ts; char tmp[16]; int rb_flush_flag; @@ -1164,6 +1196,7 @@ int flb_engine_start(struct flb_config *config) fs_chunks = 0; tasks = flb_task_running_count(config); flb_storage_chunk_count(config, 
&mem_chunks, &fs_chunks); + rb_size = flb_input_chunk_get_total_ring_buffer_size(config); if ((mem_chunks + fs_chunks) > 0) { flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d", @@ -1174,7 +1207,21 @@ int flb_engine_start(struct flb_config *config) flb_task_running_print(config); } - ret = tasks + mem_chunks + fs_chunks; + ret = tasks + mem_chunks + fs_chunks + (rb_size > 0); /* any pending ring buffer bytes count as one unit of outstanding work */ + + if (rb_size > 0) { + flb_info("[engine] ring buffer pending: %zu bytes", rb_size); + flb_input_chunk_ring_buffer_collector(config, NULL); + } + + /* Count an unacknowledged threaded-input pause as pending work, but only when nothing else is pending (ret == 0) */ + if (ret == 0) { + if (!all_threaded_inputs_paused(config)) { + ret++; + flb_debug("[engine] waiting for threaded inputs to complete pause"); + } + } + if (ret > 0 && (config->grace_count < config->grace || config->grace == -1)) { if (config->grace_count == 1) { /* diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c index 58d98779782..1df764bf43b 100644 --- a/src/flb_input_chunk.c +++ b/src/flb_input_chunk.c @@ -2413,6 +2413,7 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) if (flb_input_chunk_is_mem_overlimit(in) == FLB_FALSE && in->config->is_running == FLB_TRUE && in->config->is_ingestion_active == FLB_TRUE && + in->config->is_shutting_down == FLB_FALSE && in->mem_buf_status == FLB_INPUT_PAUSED) { in->mem_buf_status = FLB_INPUT_RUNNING; if (in->p->cb_resume) { @@ -2426,6 +2427,7 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in) if (flb_input_chunk_is_storage_overlimit(in) == FLB_FALSE && in->config->is_running == FLB_TRUE && in->config->is_ingestion_active == FLB_TRUE && + in->config->is_shutting_down == FLB_FALSE && in->storage_buf_status == FLB_INPUT_PAUSED) { in->storage_buf_status = FLB_INPUT_RUNNING; if (in->p->cb_resume) { @@ -2667,11 +2669,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in, } } - /* Check if the input plugin has been paused */ - if (flb_input_buf_paused(in) == 
FLB_TRUE) { - flb_debug("[input chunk] %s is paused, cannot append records", - flb_input_name(in)); - return -1; + /* + * Check if the input plugin has been paused. + * During shutdown (is_shutting_down=TRUE), we must accept data to flush + * remaining ring buffer contents, so we skip the pause check. + */ + if (in->config->is_shutting_down == FLB_FALSE) { + if (flb_input_buf_paused(in) == FLB_TRUE) { + flb_debug("[input chunk] %s is paused, cannot append records", + flb_input_name(in)); + return -1; + } } if (buf_size == 0) { @@ -3063,8 +3071,21 @@ void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data) cr = NULL; while (1) { - if (flb_input_buf_paused(ins) == FLB_TRUE) { - break; + /* + * During normal operation we respect the pause state to maintain + * backpressure: if the input is paused we stop consuming from + * the ring buffer. + * + * During shutdown (is_shutting_down == FLB_TRUE) we intentionally + * skip this pause check so the ring buffer can be fully drained, + * even when backpressure would normally prevent further reads. + * This is critical to flush all enqueued records and avoid data + * loss during graceful shutdown. + */ + if (ctx->is_shutting_down == FLB_FALSE) { + if (flb_input_buf_paused(ins) == FLB_TRUE) { + break; + } } ret = flb_ring_buffer_read(ins->rb, @@ -3286,3 +3307,23 @@ void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic, } } } + +/* + * Sum the bytes in use (flb_ring_buffer_get_used) across the ring buffers of all threaded input instances. + * Returns 0 when no threaded input has a ring buffer or every one reports 0 used bytes. 
+ */ +size_t flb_input_chunk_get_total_ring_buffer_size(const struct flb_config *config) +{ + size_t total_size = 0; + struct mk_list *head; + struct flb_input_instance *ins; + + mk_list_foreach(head, &config->inputs) { + ins = mk_list_entry(head, struct flb_input_instance, _head); + if (flb_input_is_threaded(ins) && ins->rb) { + total_size += flb_ring_buffer_get_used(ins->rb); + } + } + + return total_size; +} diff --git a/src/flb_input_thread.c b/src/flb_input_thread.c index 8604a407081..3f8177deb0f 100644 --- a/src/flb_input_thread.c +++ b/src/flb_input_thread.c @@ -77,11 +77,21 @@ static inline int handle_input_event(flb_pipefd_t fd, struct flb_input_instance if (operation == FLB_INPUT_THREAD_PAUSE) { if (ins->p->cb_pause && ins->context) { ins->p->cb_pause(ins->context, ins->config); + + /* Record that cb_pause has run; all_threaded_inputs_paused() in src/flb_engine.c reads this flag */ + if (ins->is_threaded && ins->thi) { + ins->thi->is_paused = FLB_TRUE; + } } } else if (operation == FLB_INPUT_THREAD_RESUME) { - if (ins->p->cb_resume) { + if (ins->p->cb_resume && ins->context) { ins->p->cb_resume(ins->context, ins->config); + + /* Clear paused flag on resume */ + if (ins->is_threaded && ins->thi) { + ins->thi->is_paused = FLB_FALSE; + } } } else if (operation == FLB_INPUT_THREAD_EXIT) { diff --git a/src/flb_ring_buffer.c b/src/flb_ring_buffer.c index 00195603f2c..3e0e3f57647 100644 --- a/src/flb_ring_buffer.c +++ b/src/flb_ring_buffer.c @@ -202,4 +202,8 @@ int flb_ring_buffer_read(struct flb_ring_buffer *rb, void *ptr, size_t size) return 0; } +size_t flb_ring_buffer_get_used(struct flb_ring_buffer *rb) +{ + return lwrb_get_full(rb->ctx); /* rb is dereferenced unchecked, so callers must pass non-NULL; result is whatever lwrb_get_full() reports */ +}