diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index c2b7efdbc0a..083a3b3e123 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -67,9 +67,10 @@ struct flb_config { * shutdown when all remaining tasks are flushed */ int grace; - int grace_count; /* Count of grace shutdown tries */ - flb_pipefd_t flush_fd; /* Timer FD associated to flush */ - int convert_nan_to_null; /* convert null to nan ? */ + int grace_count; /* Count of grace shutdown tries */ + int grace_input; /* Shutdown grace to keep inputs ingesting */ + flb_pipefd_t flush_fd; /* Timer FD associated to flush */ + int convert_nan_to_null; /* Convert NaN to null ? */ int daemon; /* Run as a daemon ? */ flb_pipefd_t shutdown_fd; /* Shutdown FD, 5 seconds */ @@ -243,6 +244,7 @@ struct flb_config { int storage_max_chunks_up; /* max number of chunks 'up' in memory */ int storage_del_bad_chunks; /* delete irrecoverable chunks */ char *storage_bl_mem_limit; /* storage backlog memory limit */ + int storage_bl_flush_on_shutdown; /* enable/disable backlog chunks flush on shutdown */ struct flb_storage_metrics *storage_metrics_ctx; /* storage metrics context */ int storage_trim_files; /* enable/disable file trimming */ @@ -392,6 +394,8 @@ enum conf_type { #define FLB_CONF_STORAGE_METRICS "storage.metrics" #define FLB_CONF_STORAGE_CHECKSUM "storage.checksum" #define FLB_CONF_STORAGE_BL_MEM_LIMIT "storage.backlog.mem_limit" +#define FLB_CONF_STORAGE_BL_FLUSH_ON_SHUTDOWN \ + "storage.backlog.flush_on_shutdown" #define FLB_CONF_STORAGE_MAX_CHUNKS_UP "storage.max_chunks_up" #define FLB_CONF_STORAGE_DELETE_IRRECOVERABLE_CHUNKS \ "storage.delete_irrecoverable_chunks" diff --git a/include/fluent-bit/flb_engine.h b/include/fluent-bit/flb_engine.h index d242cc61a43..b10bed4202c 100644 --- a/include/fluent-bit/flb_engine.h +++ b/include/fluent-bit/flb_engine.h @@ -37,6 +37,7 @@ int flb_engine_exit_status(struct flb_config *config, int status); int 
flb_engine_shutdown(struct flb_config *config); int flb_engine_destroy_tasks(struct mk_list *tasks); void flb_engine_reschedule_retries(struct flb_config *config); +void flb_engine_stop_ingestion(struct flb_config *config); /* Engine event loop */ void flb_engine_evl_init(); diff --git a/include/fluent-bit/flb_storage.h b/include/fluent-bit/flb_storage.h index 220568ae41d..57b50e19016 100644 --- a/include/fluent-bit/flb_storage.h +++ b/include/fluent-bit/flb_storage.h @@ -83,4 +83,6 @@ struct flb_storage_metrics *flb_storage_metrics_create(struct flb_config *ctx); /* cmetrics */ int flb_storage_metrics_update(struct flb_config *config, struct flb_storage_metrics *sm); +void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks); + #endif diff --git a/src/flb_config.c b/src/flb_config.c index 168324cbd84..d00f068935e 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -145,6 +145,9 @@ struct flb_service_config service_configs[] = { {FLB_CONF_STORAGE_BL_MEM_LIMIT, FLB_CONF_TYPE_STR, offsetof(struct flb_config, storage_bl_mem_limit)}, + {FLB_CONF_STORAGE_BL_FLUSH_ON_SHUTDOWN, + FLB_CONF_TYPE_BOOL, + offsetof(struct flb_config, storage_bl_flush_on_shutdown)}, {FLB_CONF_STORAGE_MAX_CHUNKS_UP, FLB_CONF_TYPE_INT, offsetof(struct flb_config, storage_max_chunks_up)}, @@ -246,6 +249,7 @@ struct flb_config *flb_config_init() config->verbose = 3; config->grace = 5; config->grace_count = 0; + config->grace_input = config->grace / 2; config->exit_status_code = 0; /* json */ @@ -286,6 +290,7 @@ struct flb_config *flb_config_init() config->storage_path = NULL; config->storage_input_plugin = NULL; config->storage_metrics = FLB_TRUE; + config->storage_bl_flush_on_shutdown = FLB_FALSE; config->sched_cap = FLB_SCHED_CAP; config->sched_base = FLB_SCHED_BASE; diff --git a/src/flb_engine.c b/src/flb_engine.c index e1069a4c4cd..cc057ad30af 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -694,6 +694,9 @@ int sb_segregate_chunks(struct flb_config 
*config) int flb_engine_start(struct flb_config *config) { int ret; + int tasks = 0; + int fs_chunks = 0; + int mem_chunks = 0; uint64_t ts; char tmp[16]; int rb_flush_flag; @@ -704,6 +707,7 @@ int flb_engine_start(struct flb_config *config) struct flb_sched *sched; struct flb_net_dns dns_ctx; struct flb_notification *notification; + int exiting = FLB_FALSE; /* Initialize the networking layer */ flb_net_lib_init(); @@ -972,11 +976,15 @@ int flb_engine_start(struct flb_config *config) ret = sb_segregate_chunks(config); - if (ret) { + if (ret < 0) + { flb_error("[engine] could not segregate backlog chunks"); return -2; } + config->grace_input = config->grace / 2; + flb_info("[engine] Shutdown Grace Period=%d, Shutdown Input Grace Period=%d", config->grace, config->grace_input); + while (1) { rb_flush_flag = FLB_FALSE; @@ -1045,19 +1053,56 @@ int flb_engine_start(struct flb_config *config) * If grace period is set to -1, keep trying to shut down until all * tasks and retries get flushed. */ - ret = flb_task_running_count(config); + tasks = 0; + mem_chunks = 0; + fs_chunks = 0; + tasks = flb_task_running_count(config); + flb_storage_chunk_count(config, &mem_chunks, &fs_chunks); + ret = tasks + mem_chunks + fs_chunks; if (ret > 0 && (config->grace_count < config->grace || config->grace == -1)) { if (config->grace_count == 1) { flb_task_running_print(config); + /* + * If storage.backlog.flush_on_shutdown is enabled, attempt to flush pending + * filesystem chunks during shutdown. This is particularly useful in scenarios + * where Fluent Bit cannot restart to ensure buffered data is not lost. 
+ */ + if (config->storage_bl_flush_on_shutdown) { + ret = sb_segregate_chunks(config); + if (ret < 0) { + flb_error("[engine] could not segregate backlog chunks during shutdown"); + return -2; + } + } + } + if ((mem_chunks + fs_chunks) > 0) { + flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d", + mem_chunks, fs_chunks, config->grace_count); + } + + /* Create new tasks for pending chunks */ + flb_engine_flush(config, NULL); + if (config->grace_count < config->grace_input) { + if (exiting == FLB_FALSE) { + flb_engine_exit(config); + exiting = FLB_TRUE; + } + } else { + if (config->is_ingestion_active == FLB_TRUE) { + flb_engine_stop_ingestion(config); + } } - flb_engine_exit(config); } else { - if (ret > 0) { + if (tasks > 0) { flb_task_running_print(config); } + if ((mem_chunks + fs_chunks) > 0) { + flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d", + mem_chunks, fs_chunks, config->grace_count); + } flb_info("[engine] service has stopped (%i pending tasks)", - ret); + tasks); ret = config->exit_status_code; flb_engine_shutdown(config); config = NULL; @@ -1158,6 +1203,7 @@ int flb_engine_shutdown(struct flb_config *config) struct flb_sched_timer_coro_cb_params *sched_params; config->is_running = FLB_FALSE; + config->is_ingestion_active = FLB_FALSE; flb_input_pause_all(config); #ifdef FLB_HAVE_STREAM_PROCESSOR @@ -1226,6 +1272,16 @@ int flb_engine_exit(struct flb_config *config) return ret; } +/* Stop ingestion and pause all inputs */ +void flb_engine_stop_ingestion(struct flb_config *config) +{ + config->is_ingestion_active = FLB_FALSE; + config->is_shutting_down = FLB_TRUE; + + flb_info("[engine] pausing all inputs.."); + flb_input_pause_all(config); +} + int flb_engine_exit_status(struct flb_config *config, int status) { config->exit_status_code = status; diff --git a/src/flb_storage.c b/src/flb_storage.c index 4148be66781..4662e9d064c 100644 --- a/src/flb_storage.c +++ b/src/flb_storage.c @@ -710,6 
+710,16 @@ int flb_storage_create(struct flb_config *ctx) return 0; } +void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks) +{ + struct cio_stats storage_st; + + cio_stats_get(ctx->cio, &storage_st); + + *mem_chunks = storage_st.chunks_mem; + *fs_chunks = storage_st.chunks_fs; +} + void flb_storage_destroy(struct flb_config *ctx) { struct cio_ctx *cio;