From f010f192078d7370f1d35b64a995f4b5e91824af Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 30 May 2024 21:54:21 -0700 Subject: [PATCH 1/4] engine: input grace period delay shutdown for pending tasks & chunks 1. Input grace period Currently, Fluent Bit pauses all inputs 1 second after SIGTERM. This change creates an input grace period, which by default is one half the total Grace setting. This means that half way through the grace period Fluent Bit stops accepting any new logs and only sends logs pending in the buffers. 2. Check pending chunks on shutdown Previously the engine shutdown immediately if there were no pending tasks. A task is created from a chunk in the buffer. If there is a new chunk, but no task yet, the engine should keep running until the task is created and completed. This change makes the engine wait on shutdown for all pending chunks until the max grace period has expired. Signed-off-by: Wesley Pettit Co-authored-by: Anuj Singh --- include/fluent-bit/flb_config.h | 7 +++--- include/fluent-bit/flb_engine.h | 1 + include/fluent-bit/flb_storage.h | 2 ++ src/flb_config.c | 1 + src/flb_engine.c | 42 +++++++++++++++++++++++++++++--- src/flb_storage.c | 10 ++++++++ 6 files changed, 56 insertions(+), 7 deletions(-) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index c2b7efdbc0a..e31987c095b 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -67,9 +67,10 @@ struct flb_config { * shutdown when all remaining tasks are flushed */ int grace; - int grace_count; /* Count of grace shutdown tries */ - flb_pipefd_t flush_fd; /* Timer FD associated to flush */ - int convert_nan_to_null; /* convert null to nan ? */ + int grace_count; /* Count of grace shutdown tries */ + int grace_input; /* Shutdown grace to keep inputs ingesting */ + flb_pipefd_t flush_fd; /* Timer FD associated to flush */ + int convert_nan_to_null; /* Convert null to nan ? */ int daemon; /* Run as a daemon ? */ flb_pipefd_t shutdown_fd; /* Shutdown FD, 5 seconds */ diff --git a/include/fluent-bit/flb_engine.h b/include/fluent-bit/flb_engine.h index d242cc61a43..b10bed4202c 100644 --- a/include/fluent-bit/flb_engine.h +++ b/include/fluent-bit/flb_engine.h @@ -37,6 +37,7 @@ int flb_engine_exit_status(struct flb_config *config, int status); int flb_engine_shutdown(struct flb_config *config); int flb_engine_destroy_tasks(struct mk_list *tasks); void flb_engine_reschedule_retries(struct flb_config *config); +void flb_engine_stop_ingestion(struct flb_config *config); /* Engine event loop */ void flb_engine_evl_init(); diff --git a/include/fluent-bit/flb_storage.h b/include/fluent-bit/flb_storage.h index 220568ae41d..57b50e19016 100644 --- a/include/fluent-bit/flb_storage.h +++ b/include/fluent-bit/flb_storage.h @@ -83,4 +83,6 @@ struct flb_storage_metrics *flb_storage_metrics_create(struct flb_config *ctx); /* cmetrics */ int flb_storage_metrics_update(struct flb_config *config, struct flb_storage_metrics *sm); +void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks); + #endif diff --git a/src/flb_config.c b/src/flb_config.c index 168324cbd84..e32eda10766 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -246,6 +246,7 @@ struct flb_config *flb_config_init() config->verbose = 3; config->grace = 5; config->grace_count = 0; + config->grace_input = config->grace / 2; config->exit_status_code = 0; /* json */ diff --git a/src/flb_engine.c b/src/flb_engine.c index e1069a4c4cd..8a743d15385 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -694,6 +694,9 @@ int sb_segregate_chunks(struct flb_config *config) int flb_engine_start(struct flb_config *config) { int ret; + int tasks = 0; + int fs_chunks = 0; + int mem_chunks = 0; uint64_t ts; char tmp[16]; int rb_flush_flag; @@ -977,6 +980,9 @@ int flb_engine_start(struct flb_config *config) return -2; } + config->grace_input = config->grace / 2; + flb_info("[engine] Shutdown Grace Period=%d, Shutdown Input Grace Period=%d", config->grace, config->grace_input); + while (1) { rb_flush_flag = FLB_FALSE; @@ -1045,19 +1051,36 @@ int flb_engine_start(struct flb_config *config) * If grace period is set to -1, keep trying to shut down until all * tasks and retries get flushed. */ - ret = flb_task_running_count(config); + tasks = 0; + mem_chunks = 0; + fs_chunks = 0; + tasks = flb_task_running_count(config); + flb_storage_chunk_count(config, &mem_chunks, &fs_chunks); + ret = tasks + mem_chunks + fs_chunks; if (ret > 0 && (config->grace_count < config->grace || config->grace == -1)) { if (config->grace_count == 1) { flb_task_running_print(config); } - flb_engine_exit(config); + if ((mem_chunks + fs_chunks) > 0) { + flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d", + mem_chunks, fs_chunks); + } + if (config->grace_count < config->grace_input) { + flb_engine_exit(config); + } else { + flb_engine_stop_ingestion(config); + } } else { - if (ret > 0) { + if (tasks > 0) { flb_task_running_print(config); } + if ((mem_chunks + fs_chunks) > 0) { + flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d", + mem_chunks, fs_chunks); + } flb_info("[engine] service has stopped (%i pending tasks)", - ret); + tasks); ret = config->exit_status_code; flb_engine_shutdown(config); config = NULL; @@ -1158,6 +1181,7 @@ int flb_engine_shutdown(struct flb_config *config) struct flb_sched_timer_coro_cb_params *sched_params; config->is_running = FLB_FALSE; + config->is_ingestion_active = FLB_FALSE; flb_input_pause_all(config); #ifdef FLB_HAVE_STREAM_PROCESSOR @@ -1226,6 +1250,16 @@ int flb_engine_exit(struct flb_config *config) return ret; } +/* Stop ingestion and pause all inputs */ +void flb_engine_stop_ingestion(struct flb_config *config) +{ + config->is_ingestion_active = FLB_FALSE; + config->is_shutting_down = FLB_TRUE; + + flb_info("[engine] pausing all inputs.."); + flb_input_pause_all(config); +} + int flb_engine_exit_status(struct flb_config *config, int status) { config->exit_status_code = status; diff --git a/src/flb_storage.c b/src/flb_storage.c index 4148be66781..4662e9d064c 100644 --- a/src/flb_storage.c +++ b/src/flb_storage.c @@ -710,6 +710,16 @@ int flb_storage_create(struct flb_config *ctx) return 0; } +void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks) +{ + struct cio_stats storage_st; + + cio_stats_get(ctx->cio, &storage_st); + + *mem_chunks = storage_st.chunks_mem; + *fs_chunks = storage_st.chunks_fs; +} + void flb_storage_destroy(struct flb_config *ctx) { struct cio_ctx *cio; From ff457fced2c119d6520421f13419c80fbd313bb6 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Wed, 12 Jun 2024 21:01:09 -0700 Subject: [PATCH 2/4] engine: force flush on shutdown to create tasks for pending chunks Signed-off-by: Wesley Pettit --- src/flb_engine.c | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/flb_engine.c b/src/flb_engine.c index 8a743d15385..534f89bf671 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -707,6 +707,7 @@ int flb_engine_start(struct flb_config *config) struct flb_sched *sched; struct flb_net_dns dns_ctx; struct flb_notification *notification; + int exiting = FLB_FALSE; /* Initialize the networking layer */ flb_net_lib_init(); @@ -1062,13 +1063,21 @@ int flb_engine_start(struct flb_config *config) flb_task_running_print(config); } if ((mem_chunks + fs_chunks) > 0) { - flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d", - mem_chunks, fs_chunks); + flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d", + mem_chunks, fs_chunks, config->grace_count); } + + /* Create new tasks for pending chunks */ + flb_engine_flush(config, NULL); if (config->grace_count < config->grace_input) { - flb_engine_exit(config); + if (exiting == FLB_FALSE) { + flb_engine_exit(config); + exiting = FLB_TRUE; + } } else { - flb_engine_stop_ingestion(config); + if (config->is_ingestion_active == FLB_TRUE) { + flb_engine_stop_ingestion(config); + } } } else { @@ -1076,8 +1085,8 @@ int flb_engine_start(struct flb_config *config) flb_task_running_print(config); } if ((mem_chunks + fs_chunks) > 0) { - flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d", - mem_chunks, fs_chunks); + flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d", + mem_chunks, fs_chunks, config->grace_count); } flb_info("[engine] service has stopped (%i pending tasks)", tasks); From c460714ac4a04c05f38e2218b83842b2aaf8696c Mon Sep 17 00:00:00 2001 From: Anuj Singh Date: Fri, 2 May 2025 11:48:40 -0400 Subject: [PATCH 3/4] config: add new global 'storage.backlog.shutdown_flush' property (default: off) Signed-off-by: Anuj Singh --- include/fluent-bit/flb_config.h | 3 +++ src/flb_config.c | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index e31987c095b..083a3b3e123 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -244,6 +244,7 @@ struct flb_config { int storage_max_chunks_up; /* max number of chunks 'up' in memory */ int storage_del_bad_chunks; /* delete irrecoverable chunks */ char *storage_bl_mem_limit; /* storage backlog memory limit */ + int storage_bl_flush_on_shutdown; /* enable/disable backlog chunks flush on shutdown */ struct flb_storage_metrics *storage_metrics_ctx; /* storage metrics context */ int storage_trim_files; /* enable/disable file trimming */ @@ -393,6 +394,8 @@ enum conf_type { #define FLB_CONF_STORAGE_METRICS "storage.metrics" #define FLB_CONF_STORAGE_CHECKSUM "storage.checksum" #define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit" +#define FLB_CONF_STORAGE_BL_FLUSH_ON_SHUTDOWN \ + "storage.backlog.flush_on_shutdown" #define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up" #define FLB_CONF_STORAGE_DELETE_IRRECOVERABLE_CHUNKS \ "storage.delete_irrecoverable_chunks" diff --git a/src/flb_config.c b/src/flb_config.c index e32eda10766..d00f068935e 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -145,6 +145,9 @@ struct flb_service_config service_configs[] = { {FLB_CONF_STORAGE_BL_MEM_LIMIT, FLB_CONF_TYPE_STR, offsetof(struct flb_config, storage_bl_mem_limit)}, + {FLB_CONF_STORAGE_BL_FLUSH_ON_SHUTDOWN, + FLB_CONF_TYPE_BOOL, + offsetof(struct flb_config, storage_bl_flush_on_shutdown)}, {FLB_CONF_STORAGE_MAX_CHUNKS_UP, FLB_CONF_TYPE_INT, offsetof(struct flb_config, storage_max_chunks_up)}, @@ -287,6 +290,7 @@ struct flb_config *flb_config_init() config->storage_path = NULL; config->storage_input_plugin = NULL; config->storage_metrics = FLB_TRUE; + config->storage_bl_flush_on_shutdown = FLB_FALSE; config->sched_cap = FLB_SCHED_CAP; config->sched_base = FLB_SCHED_BASE; From 38850e25f81ef896f02086ddaadd6ca17e75ee65 Mon Sep 17 00:00:00 2001 From: Anuj Singh Date: Fri, 2 May 2025 11:49:15 -0400 Subject: [PATCH 4/4] engine: send backlog chunks on shutdown Signed-off-by: Anuj Singh Co-authored-by: Wesley Pettit --- src/flb_engine.c | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/flb_engine.c b/src/flb_engine.c index 534f89bf671..cc057ad30af 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -976,7 +976,8 @@ int flb_engine_start(struct flb_config *config) ret = sb_segregate_chunks(config); - if (ret) { + if (ret < 0) + { flb_error("[engine] could not segregate backlog chunks"); return -2; } @@ -1061,6 +1062,18 @@ int flb_engine_start(struct flb_config *config) if (ret > 0 && (config->grace_count < config->grace || config->grace == -1)) { if (config->grace_count == 1) { flb_task_running_print(config); + /* + * If storage.backlog.shutdown_flush is enabled, attempt to flush pending + * filesystem chunks during shutdown. This is particularly useful in scenarios + * where Fluent Bit cannot restart to ensure buffered data is not lost. + */ + if (config->storage_bl_flush_on_shutdown) { + ret = sb_segregate_chunks(config); + if (ret < 0) { + flb_error("[engine] could not segregate backlog chunks during shutdown"); + return -2; + } + } } if ((mem_chunks + fs_chunks) > 0) { flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d",