Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -1188,8 +1188,10 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
static inline void flb_output_return(int ret, struct flb_coro *co) {
int n;
int pipe_fd;
int effective_records;
uint32_t set;
uint64_t val;
size_t effective_bytes;
struct flb_task *task;
struct flb_output_flush *out_flush;
struct flb_output_instance *o_ins;
Expand All @@ -1199,8 +1201,21 @@ static inline void flb_output_return(int ret, struct flb_coro *co) {
o_ins = out_flush->o_ins;
task = out_flush->task;

effective_records = 0;
effective_bytes = 0;
if (task->event_chunk != NULL) {
effective_records = task->event_chunk->total_events;
effective_bytes = task->event_chunk->size;
}

if (out_flush->processed_event_chunk != NULL) {
effective_records = out_flush->processed_event_chunk->total_events;
effective_bytes = out_flush->processed_event_chunk->size;
}

flb_task_acquire_lock(task);

flb_task_set_route_metrics(task, o_ins, effective_records, effective_bytes);
flb_task_deactivate_route(task, o_ins);

flb_task_release_lock(task);
Expand Down
50 changes: 50 additions & 0 deletions include/fluent-bit/flb_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@

struct flb_task_route {
int status;
int records;
size_t bytes;
struct flb_output_instance *out;
struct mk_list _head;
};
Expand Down Expand Up @@ -257,6 +259,54 @@ static FLB_INLINE void flb_task_set_route_status(
}
}

static FLB_INLINE void flb_task_set_route_metrics(
struct flb_task *task,
struct flb_output_instance *o_ins,
int records,
size_t bytes)
{
struct mk_list *iterator;
struct flb_task_route *route;

mk_list_foreach(iterator, &task->routes) {
route = mk_list_entry(iterator, struct flb_task_route, _head);

if (route->out == o_ins) {
route->records = records;
route->bytes = bytes;
break;
}
}
}

static FLB_INLINE int flb_task_get_route_metrics(
struct flb_task *task,
struct flb_output_instance *o_ins,
int *records,
size_t *bytes)
{
struct mk_list *iterator;
struct flb_task_route *route;

mk_list_foreach(iterator, &task->routes) {
route = mk_list_entry(iterator, struct flb_task_route, _head);

if (route->out == o_ins) {
if (records != NULL) {
*records = route->records;
}

if (bytes != NULL) {
*bytes = route->bytes;
}

return 0;
}
}

return -1;
}


static FLB_INLINE void flb_task_activate_route(
struct flb_task *task,
Expand Down
45 changes: 25 additions & 20 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,9 @@ static inline int handle_output_event(uint64_t ts,
int retry_seconds;
uint32_t type;
uint32_t key;
int effective_records;
double latency_seconds;
size_t effective_bytes;
char *in_name;
char *out_name;
struct flb_task *task;
Expand Down Expand Up @@ -340,6 +342,9 @@ static inline int handle_output_event(uint64_t ts,
}
in_name = (char *) flb_input_name(task->i_ins);
out_name = (char *) flb_output_name(ins);
effective_records = task->event_chunk->total_events;
effective_bytes = task->event_chunk->size;
flb_task_get_route_metrics(task, ins, &effective_records, &effective_bytes);

/* If we are in synchronous mode, flush the next waiting task */
if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) {
Expand All @@ -351,19 +356,19 @@ static inline int handle_output_event(uint64_t ts,
/* A task has finished, delete it */
if (ret == FLB_OK) {
/* cmetrics */
cmt_counter_add(ins->cmt_proc_records, ts, task->event_chunk->total_events,
cmt_counter_add(ins->cmt_proc_records, ts, effective_records,
1, (char *[]) {out_name});

cmt_counter_add(ins->cmt_proc_bytes, ts, task->event_chunk->size,
cmt_counter_add(ins->cmt_proc_bytes, ts, effective_bytes,
1, (char *[]) {out_name});

if (config->router && task->event_chunk->type == FLB_EVENT_TYPE_LOGS) {
cmt_counter_add(config->router->logs_records_total, ts,
task->event_chunk->total_events,
effective_records,
2, (char *[]) {in_name, out_name});

cmt_counter_add(config->router->logs_bytes_total, ts,
task->event_chunk->size,
effective_bytes,
2, (char *[]) {in_name, out_name});
}

Expand All @@ -378,9 +383,9 @@ static inline int handle_output_event(uint64_t ts,
#ifdef FLB_HAVE_METRICS
if (ins->metrics) {
flb_metrics_sum(FLB_METRIC_OUT_OK_RECORDS,
task->event_chunk->total_events, ins->metrics);
effective_records, ins->metrics);
flb_metrics_sum(FLB_METRIC_OUT_OK_BYTES,
task->event_chunk->size, ins->metrics);
effective_bytes, ins->metrics);
}
#endif
/* Inform the user if a 'retry' succedeed */
Expand Down Expand Up @@ -416,17 +421,17 @@ static inline int handle_output_event(uint64_t ts,
handle_dlq_if_available(config, task, ins, 0);

/* cmetrics: output_dropped_records_total */
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
cmt_counter_add(ins->cmt_dropped_records, ts, effective_records,
1, (char *[]) {out_name});

if (config->router && task->event_chunk &&
task->event_chunk->type == FLB_EVENT_TYPE_LOGS) {
cmt_counter_add(config->router->logs_drop_records_total, ts,
task->records,
effective_records,
2, (char *[]) {in_name, out_name});

cmt_counter_add(config->router->logs_drop_bytes_total, ts,
task->event_chunk->size,
effective_bytes,
2, (char *[]) {in_name, out_name});
}

Expand All @@ -436,7 +441,7 @@ static inline int handle_output_event(uint64_t ts,

/* OLD metrics API */
#ifdef FLB_HAVE_METRICS
flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, task->records, ins->metrics);
flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, effective_records, ins->metrics);
#endif
flb_info("[engine] chunk '%s' is not retried (no retry config): "
"task_id=%i, input=%s > output=%s (out_id=%i)",
Expand Down Expand Up @@ -465,17 +470,17 @@ static inline int handle_output_event(uint64_t ts,

/* cmetrics */
cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {out_name});
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
cmt_counter_add(ins->cmt_dropped_records, ts, effective_records,
1, (char *[]) {out_name});

if (config->router && task->event_chunk &&
task->event_chunk->type == FLB_EVENT_TYPE_LOGS) {
cmt_counter_add(config->router->logs_drop_records_total, ts,
task->records,
effective_records,
2, (char *[]) {in_name, out_name});

cmt_counter_add(config->router->logs_drop_bytes_total, ts,
task->event_chunk->size,
effective_bytes,
2, (char *[]) {in_name, out_name});
}

Expand All @@ -486,7 +491,7 @@ static inline int handle_output_event(uint64_t ts,
/* OLD metrics API */
#ifdef FLB_HAVE_METRICS
flb_metrics_sum(FLB_METRIC_OUT_RETRY_FAILED, 1, ins->metrics);
flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, task->records, ins->metrics);
flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, effective_records, ins->metrics);
#endif
/* Notify about this failed retry */
flb_error("[engine] chunk '%s' cannot be retried: "
Expand Down Expand Up @@ -538,7 +543,7 @@ static inline int handle_output_event(uint64_t ts,

/* cmetrics */
cmt_counter_inc(ins->cmt_retries, ts, 1, (char *[]) {out_name});
cmt_counter_add(ins->cmt_retried_records, ts, task->records,
cmt_counter_add(ins->cmt_retried_records, ts, effective_records,
1, (char *[]) {out_name});

cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts,
Expand All @@ -548,25 +553,25 @@ static inline int handle_output_event(uint64_t ts,
/* OLD metrics API: update the metrics since a new retry is coming */
#ifdef FLB_HAVE_METRICS
flb_metrics_sum(FLB_METRIC_OUT_RETRY, 1, ins->metrics);
flb_metrics_sum(FLB_METRIC_OUT_RETRIED_RECORDS, task->records, ins->metrics);
flb_metrics_sum(FLB_METRIC_OUT_RETRIED_RECORDS, effective_records, ins->metrics);
#endif
}
}
else if (ret == FLB_ERROR) {
handle_dlq_if_available(config, task, ins, 0);
/* cmetrics */
cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {out_name});
cmt_counter_add(ins->cmt_dropped_records, ts, task->records,
cmt_counter_add(ins->cmt_dropped_records, ts, effective_records,
1, (char *[]) {out_name});

if (config->router && task->event_chunk &&
task->event_chunk->type == FLB_EVENT_TYPE_LOGS) {
cmt_counter_add(config->router->logs_drop_records_total, ts,
task->records,
effective_records,
2, (char *[]) {in_name, out_name});

cmt_counter_add(config->router->logs_drop_bytes_total, ts,
task->event_chunk->size,
effective_bytes,
2, (char *[]) {in_name, out_name});
}

Expand All @@ -577,7 +582,7 @@ static inline int handle_output_event(uint64_t ts,
/* OLD API */
#ifdef FLB_HAVE_METRICS
flb_metrics_sum(FLB_METRIC_OUT_ERROR, 1, ins->metrics);
flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, task->records, ins->metrics);
flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, effective_records, ins->metrics);
#endif

flb_task_retry_clean(task, ins);
Expand Down
6 changes: 6 additions & 0 deletions src/flb_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,8 @@ struct flb_task *flb_task_create(uint64_t ref_id,
}

route->status = FLB_TASK_ROUTE_INACTIVE;
route->records = task->event_chunk->total_events;
route->bytes = task->event_chunk->size;
route->out = stored_matches[stored_match_index];
mk_list_add(&route->_head, &task->routes);
direct_count++;
Expand Down Expand Up @@ -810,6 +812,8 @@ struct flb_task *flb_task_create(uint64_t ref_id,
}

route->status = FLB_TASK_ROUTE_INACTIVE;
route->records = task->event_chunk->total_events;
route->bytes = task->event_chunk->size;
route->out = o_ins;
mk_list_add(&route->_head, &task->routes);
direct_count++;
Expand Down Expand Up @@ -856,6 +860,8 @@ struct flb_task *flb_task_create(uint64_t ref_id,
}

route->status = FLB_TASK_ROUTE_INACTIVE;
route->records = task->event_chunk->total_events;
route->bytes = task->event_chunk->size;
route->out = o_ins;
mk_list_add(&route->_head, &task->routes);
count++;
Expand Down
5 changes: 5 additions & 0 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ if (FLB_PROCESSOR_CONTENT_MODIFIER)
FLB_RT_TEST(FLB_PROCESSOR_CONTENT_MODIFIER "processor_content_modifier.c")
endif()

if (FLB_IN_DUMMY AND FLB_IN_FLUENTBIT_METRICS AND FLB_OUT_LIB AND
FLB_OUT_STDOUT AND FLB_FILTER_LUA)
FLB_RT_CORE_TEST(ON "processor_output_counters.c")
endif()

# HTTP Client Debug (requires -DFLB_HTTP_CLIENT_DEBUG=On)
if(FLB_HTTP_CLIENT_DEBUG)
FLB_RT_TEST(FLB_OUT_TD "http_callbacks.c")
Expand Down
Loading