diff --git a/plugins/in_systemd/systemd.c b/plugins/in_systemd/systemd.c index 073b5a5c072..1279458df28 100644 --- a/plugins/in_systemd/systemd.c +++ b/plugins/in_systemd/systemd.c @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include "systemd_config.h" #include "systemd_db.h" @@ -70,6 +72,65 @@ static int tag_compose(const char *tag, const char *unit_name, return 0; } +/* Helper function to unpack and repack msgpack data from parser output */ +static int flb_systemd_repack_map(struct flb_log_event_encoder *encoder, + char *data, + size_t data_size) +{ + msgpack_unpacked source_map; + size_t offset; + int result; + size_t index; + msgpack_object value; + msgpack_object key; + + result = FLB_EVENT_ENCODER_SUCCESS; + + if (data_size > 0) { + msgpack_unpacked_init(&source_map); + + offset = 0; + result = msgpack_unpack_next(&source_map, + data, + data_size, + &offset); + + if (result == MSGPACK_UNPACK_SUCCESS) { + if (source_map.data.type == MSGPACK_OBJECT_MAP) { + result = FLB_EVENT_ENCODER_SUCCESS; + } + else { + result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE; + } + } + else { + result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE; + } + + for (index = 0; + index < source_map.data.via.map.size && + result == FLB_EVENT_ENCODER_SUCCESS; + index++) { + key = source_map.data.via.map.ptr[index].key; + value = source_map.data.via.map.ptr[index].val; + + result = flb_log_event_encoder_append_body_msgpack_object( + encoder, + &key); + + if (result == FLB_EVENT_ENCODER_SUCCESS) { + result = flb_log_event_encoder_append_body_msgpack_object( + encoder, + &value); + } + } + + msgpack_unpacked_destroy(&source_map); + } + + return result; +} + static int append_enumerate_data(struct flb_systemd_config *ctx, struct cfl_kvlist *kvlist) { int i; @@ -129,7 +190,10 @@ static int systemd_enumerate_data_store(struct flb_config *config, struct flb_input_instance *ins, void *plugin_context, void *format_context, - const void *data, size_t data_size) + const void *data, size_t data_size, + struct flb_parser *parser, + void **out_buf, size_t *out_size, + struct flb_time *out_time) { int i; int len; @@ -155,6 +219,25 @@ static int systemd_enumerate_data_store(struct flb_config *config, len = (sep - key); key_len = len; + + /* Skip FLUENT_BIT_PARSER field - it's metadata, not log content + * Return -1 so it doesn't count toward max_fields */ + if (strncmp(key, "FLUENT_BIT_PARSER", key_len) == 0) { + return -1; + } + + /* If this is MESSAGE field and parser is specified, apply parser */ + if (parser && strncmp(key, "MESSAGE", key_len) == 0) { + val = sep + 1; + len = length - (sep - key) - 1; + int ret_parser = flb_parser_do(parser, val, len, out_buf, out_size, out_time); + if (ret_parser != -1) { + /* Return special code to indicate parsed content should be added */ + return -3; + } + /* If parser failed, continue with unparsed message */ + } + list_key = flb_sds_create_len(key, key_len); if (!list_key) { @@ -261,19 +344,23 @@ static int in_systemd_collect(struct flb_input_instance *ins, long nsec; uint64_t usec; size_t length; + size_t plength; const char *key; #ifdef FLB_HAVE_SQLDB char *cursor = NULL; #endif char *tag = NULL; + char *name; char new_tag[PATH_MAX]; char last_tag[PATH_MAX] = {0}; size_t tag_len; size_t last_tag_len = 0; const void *data; + void *pbuf = NULL; struct flb_systemd_config *ctx = in_context; struct flb_time tm; struct cfl_kvlist *kvlist = NULL; + struct flb_parser *parser; /* Restricted by mem_buf_limit */ if (flb_input_buf_paused(ins) == FLB_TRUE) { @@ -337,6 +424,23 @@ static int in_systemd_collect(struct flb_input_instance *ins, tag_len = ctx->ins->tag_len; } + /* Find the parser, if specified */ + parser = NULL; + ret = sd_journal_get_data(ctx->j, "FLUENT_BIT_PARSER", &data, &length); + if (ret == 0) { + name = flb_strndup((const char *)(data+18), length-18); + if (name == NULL) { + flb_plg_error(ctx->ins, "failed to allocate parser name"); + } + else { + parser = flb_parser_get(name, config); + if (!parser) { + flb_plg_error(ctx->ins, "no such parser: '%s'", name); + } + flb_free(name); + } + } + if (last_tag_len == 0) { strncpy(last_tag, tag, tag_len); last_tag_len = tag_len; @@ -412,11 +516,22 @@ static int in_systemd_collect(struct flb_input_instance *ins, ret = systemd_enumerate_data_store(config, ctx->ins, (void *)ctx, (void *)kvlist, - key, length); + key, length, parser, &pbuf, &plength, &tm); if (ret == -2) { skip_entries++; continue; } + else if (ret == -3) { + /* Parsed content - add it to encoder as msgpack */ + ret = flb_systemd_repack_map(ctx->log_encoder, pbuf, plength); + flb_free(pbuf); + pbuf = NULL; + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + continue; + } + entries++; + continue; + } else if (ret == -1) { continue; } @@ -688,7 +803,8 @@ static int cb_systemd_format_test(struct flb_config *config, cur = cfl_list_entry(head, struct cfl_split_entry, _head); ret = systemd_enumerate_data_store(config, ctx->ins, (void *)ctx, (void *)kvlist, - cur->value, cur->len); + cur->value, cur->len, + NULL, NULL, NULL, &tm); if (ret == -2 || ret == -1) { continue;