Skip to content

Commit f2f11c6

Browse files
engine: input grace period delay shutdown for pending tasks & chunks
1. Input grace period: Currently, Fluent Bit pauses all inputs 1 second after SIGTERM. This change creates an input grace period, which by default is one half of the total Grace setting. This means that halfway through the grace period Fluent Bit stops accepting any new logs and only sends the logs pending in the buffers.
2. Check pending chunks on shutdown: Previously the engine shut down immediately if there were no pending tasks. A task is created from a chunk in the buffer. If there is a new chunk, but no task yet, the engine should keep running until the task is created and completed. This change makes the engine wait on shutdown for all pending chunks until the max grace period has expired.
Signed-off-by: Wesley Pettit <[email protected]>
Co-authored-by: Anuj Singh <[email protected]>
1 parent 2fe47eb commit f2f11c6

File tree

6 files changed

+56
-7
lines changed

6 files changed

+56
-7
lines changed

include/fluent-bit/flb_config.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,10 @@ struct flb_config {
6767
* shutdown when all remaining tasks are flushed
6868
*/
6969
int grace;
70-
int grace_count; /* Count of grace shutdown tries */
71-
flb_pipefd_t flush_fd; /* Timer FD associated to flush */
72-
int convert_nan_to_null; /* convert null to nan ? */
70+
int grace_count; /* Count of grace shutdown tries */
71+
int grace_input; /* Shutdown grace to keep inputs ingesting */
72+
flb_pipefd_t flush_fd; /* Timer FD associated to flush */
73+
int convert_nan_to_null; /* Convert null to nan ? */
7374

7475
int daemon; /* Run as a daemon ? */
7576
flb_pipefd_t shutdown_fd; /* Shutdown FD, 5 seconds */

include/fluent-bit/flb_engine.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ int flb_engine_exit_status(struct flb_config *config, int status);
3737
int flb_engine_shutdown(struct flb_config *config);
3838
int flb_engine_destroy_tasks(struct mk_list *tasks);
3939
void flb_engine_reschedule_retries(struct flb_config *config);
40+
void flb_engine_stop_ingestion(struct flb_config *config);
4041

4142
/* Engine event loop */
4243
void flb_engine_evl_init();

include/fluent-bit/flb_storage.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,6 @@ struct flb_storage_metrics *flb_storage_metrics_create(struct flb_config *ctx);
8383
/* cmetrics */
8484
int flb_storage_metrics_update(struct flb_config *config, struct flb_storage_metrics *sm);
8585

86+
void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks);
87+
8688
#endif

src/flb_config.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ struct flb_config *flb_config_init()
241241
config->verbose = 3;
242242
config->grace = 5;
243243
config->grace_count = 0;
244+
config->grace_input = config->grace / 2;
244245
config->exit_status_code = 0;
245246

246247
/* json */

src/flb_engine.c

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,9 @@ int sb_segregate_chunks(struct flb_config *config)
685685
int flb_engine_start(struct flb_config *config)
686686
{
687687
int ret;
688+
int tasks = 0;
689+
int fs_chunks = 0;
690+
int mem_chunks = 0;
688691
uint64_t ts;
689692
char tmp[16];
690693
int rb_flush_flag;
@@ -965,6 +968,9 @@ int flb_engine_start(struct flb_config *config)
965968
return -2;
966969
}
967970

971+
config->grace_input = config->grace / 2;
972+
flb_info("[engine] Shutdown Grace Period=%d, Shutdown Input Grace Period=%d", config->grace, config->grace_input);
973+
968974
while (1) {
969975
rb_flush_flag = FLB_FALSE;
970976

@@ -1033,19 +1039,36 @@ int flb_engine_start(struct flb_config *config)
10331039
* If grace period is set to -1, keep trying to shut down until all
10341040
* tasks and retries get flushed.
10351041
*/
1036-
ret = flb_task_running_count(config);
1042+
tasks = 0;
1043+
mem_chunks = 0;
1044+
fs_chunks = 0;
1045+
tasks = flb_task_running_count(config);
1046+
flb_storage_chunk_count(config, &mem_chunks, &fs_chunks);
1047+
ret = tasks + mem_chunks + fs_chunks;
10371048
if (ret > 0 && (config->grace_count < config->grace || config->grace == -1)) {
10381049
if (config->grace_count == 1) {
10391050
flb_task_running_print(config);
10401051
}
1041-
flb_engine_exit(config);
1052+
if ((mem_chunks + fs_chunks) > 0) {
1053+
flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d",
1054+
mem_chunks, fs_chunks);
1055+
}
1056+
if (config->grace_count < config->grace_input) {
1057+
flb_engine_exit(config);
1058+
} else {
1059+
flb_engine_stop_ingestion(config);
1060+
}
10421061
}
10431062
else {
1044-
if (ret > 0) {
1063+
if (tasks > 0) {
10451064
flb_task_running_print(config);
10461065
}
1066+
if ((mem_chunks + fs_chunks) > 0) {
1067+
flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d",
1068+
mem_chunks, fs_chunks);
1069+
}
10471070
flb_info("[engine] service has stopped (%i pending tasks)",
1048-
ret);
1071+
tasks);
10491072
ret = config->exit_status_code;
10501073
flb_engine_shutdown(config);
10511074
config = NULL;
@@ -1146,6 +1169,7 @@ int flb_engine_shutdown(struct flb_config *config)
11461169
struct flb_sched_timer_coro_cb_params *sched_params;
11471170

11481171
config->is_running = FLB_FALSE;
1172+
config->is_ingestion_active = FLB_FALSE;
11491173
flb_input_pause_all(config);
11501174

11511175
#ifdef FLB_HAVE_STREAM_PROCESSOR
@@ -1214,6 +1238,16 @@ int flb_engine_exit(struct flb_config *config)
12141238
return ret;
12151239
}
12161240

1241+
/* Stop ingestion and pause all inputs */
1242+
void flb_engine_stop_ingestion(struct flb_config *config)
1243+
{
1244+
config->is_ingestion_active = FLB_FALSE;
1245+
config->is_shutting_down = FLB_TRUE;
1246+
1247+
flb_info("[engine] pausing all inputs..");
1248+
flb_input_pause_all(config);
1249+
}
1250+
12171251
int flb_engine_exit_status(struct flb_config *config, int status)
12181252
{
12191253
config->exit_status_code = status;

src/flb_storage.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,16 @@ int flb_storage_create(struct flb_config *ctx)
710710
return 0;
711711
}
712712

713+
void flb_storage_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks)
714+
{
715+
struct cio_stats storage_st;
716+
717+
cio_stats_get(ctx->cio, &storage_st);
718+
719+
*mem_chunks = storage_st.chunks_mem;
720+
*fs_chunks = storage_st.chunks_fs;
721+
}
722+
713723
void flb_storage_destroy(struct flb_config *ctx)
714724
{
715725
struct cio_ctx *cio;

0 commit comments

Comments
 (0)