-
Notifications
You must be signed in to change notification settings - Fork 1.9k
in_systemd: allow a parser to be specified as part of the systemd unit #11353
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,8 @@ | |
| #include <fluent-bit/flb_input_plugin.h> | ||
| #include <fluent-bit/flb_config.h> | ||
| #include <fluent-bit/flb_time.h> | ||
| #include <fluent-bit/flb_parser.h> | ||
| #include <fluent-bit/flb_log_event_decoder.h> | ||
|
|
||
| #include "systemd_config.h" | ||
| #include "systemd_db.h" | ||
|
|
@@ -70,6 +72,65 @@ static int tag_compose(const char *tag, const char *unit_name, | |
| return 0; | ||
| } | ||
|
|
||
| /* Helper function to unpack and repack msgpack data from parser output */ | ||
| static int flb_systemd_repack_map(struct flb_log_event_encoder *encoder, | ||
| char *data, | ||
| size_t data_size) | ||
| { | ||
| msgpack_unpacked source_map; | ||
| size_t offset; | ||
| int result; | ||
| size_t index; | ||
| msgpack_object value; | ||
| msgpack_object key; | ||
|
|
||
| result = FLB_EVENT_ENCODER_SUCCESS; | ||
|
|
||
| if (data_size > 0) { | ||
| msgpack_unpacked_init(&source_map); | ||
|
|
||
| offset = 0; | ||
| result = msgpack_unpack_next(&source_map, | ||
| data, | ||
| data_size, | ||
| &offset); | ||
|
|
||
| if (result == MSGPACK_UNPACK_SUCCESS) { | ||
| if (source_map.data.type == MSGPACK_OBJECT_MAP) { | ||
| result = FLB_EVENT_ENCODER_SUCCESS; | ||
| } | ||
| else { | ||
| result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE; | ||
| } | ||
| } | ||
| else { | ||
| result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE; | ||
| } | ||
|
|
||
| for (index = 0; | ||
| index < source_map.data.via.map.size && | ||
| result == FLB_EVENT_ENCODER_SUCCESS; | ||
| index++) { | ||
| key = source_map.data.via.map.ptr[index].key; | ||
| value = source_map.data.via.map.ptr[index].val; | ||
|
|
||
| result = flb_log_event_encoder_append_body_msgpack_object( | ||
| encoder, | ||
| &key); | ||
|
|
||
| if (result == FLB_EVENT_ENCODER_SUCCESS) { | ||
| result = flb_log_event_encoder_append_body_msgpack_object( | ||
| encoder, | ||
| &value); | ||
| } | ||
| } | ||
|
|
||
| msgpack_unpacked_destroy(&source_map); | ||
| } | ||
|
|
||
| return result; | ||
| } | ||
|
|
||
| static int append_enumerate_data(struct flb_systemd_config *ctx, struct cfl_kvlist *kvlist) | ||
| { | ||
| int i; | ||
|
|
@@ -129,7 +190,10 @@ static int systemd_enumerate_data_store(struct flb_config *config, | |
| struct flb_input_instance *ins, | ||
| void *plugin_context, | ||
| void *format_context, | ||
| const void *data, size_t data_size) | ||
| const void *data, size_t data_size, | ||
| struct flb_parser *parser, | ||
| void **out_buf, size_t *out_size, | ||
| struct flb_time *out_time) | ||
| { | ||
| int i; | ||
| int len; | ||
|
|
@@ -155,6 +219,25 @@ static int systemd_enumerate_data_store(struct flb_config *config, | |
|
|
||
| len = (sep - key); | ||
| key_len = len; | ||
|
|
||
| /* Skip FLUENT_BIT_PARSER field - it's metadata, not log content | ||
| * Return -1 so it doesn't count toward max_fields */ | ||
| if (strncmp(key, "FLUENT_BIT_PARSER", key_len) == 0) { | ||
| return -1; | ||
| } | ||
|
|
||
| /* If this is MESSAGE field and parser is specified, apply parser */ | ||
| if (parser && strncmp(key, "MESSAGE", key_len) == 0) { | ||
| val = sep + 1; | ||
| len = length - (sep - key) - 1; | ||
| int ret_parser = flb_parser_do(parser, val, len, out_buf, out_size, out_time); | ||
| if (ret_parser != -1) { | ||
| /* Return special code to indicate parsed content should be added */ | ||
| return -3; | ||
| } | ||
| /* If parser failed, continue with unparsed message */ | ||
| } | ||
|
|
||
| list_key = flb_sds_create_len(key, key_len); | ||
|
|
||
| if (!list_key) { | ||
|
|
@@ -261,19 +344,23 @@ static int in_systemd_collect(struct flb_input_instance *ins, | |
| long nsec; | ||
| uint64_t usec; | ||
| size_t length; | ||
| size_t plength; | ||
| const char *key; | ||
| #ifdef FLB_HAVE_SQLDB | ||
| char *cursor = NULL; | ||
| #endif | ||
| char *tag = NULL; | ||
| char *name; | ||
| char new_tag[PATH_MAX]; | ||
| char last_tag[PATH_MAX] = {0}; | ||
| size_t tag_len; | ||
| size_t last_tag_len = 0; | ||
| const void *data; | ||
| void *pbuf = NULL; | ||
| struct flb_systemd_config *ctx = in_context; | ||
| struct flb_time tm; | ||
| struct cfl_kvlist *kvlist = NULL; | ||
| struct flb_parser *parser; | ||
|
|
||
| /* Restricted by mem_buf_limit */ | ||
| if (flb_input_buf_paused(ins) == FLB_TRUE) { | ||
|
|
@@ -337,6 +424,23 @@ static int in_systemd_collect(struct flb_input_instance *ins, | |
| tag_len = ctx->ins->tag_len; | ||
| } | ||
|
|
||
| /* Find the parser, if specified */ | ||
| parser = NULL; | ||
| ret = sd_journal_get_data(ctx->j, "FLUENT_BIT_PARSER", &data, &length); | ||
| if (ret == 0) { | ||
| name = flb_strndup((const char *)(data+18), length-18); | ||
| if (name == NULL) { | ||
| flb_plg_error(ctx->ins, "failed to allocate parser name"); | ||
| } | ||
| else { | ||
| parser = flb_parser_get(name, config); | ||
| if (!parser) { | ||
| flb_plg_error(ctx->ins, "no such parser: '%s'", name); | ||
| } | ||
| flb_free(name); | ||
| } | ||
| } | ||
|
|
||
| if (last_tag_len == 0) { | ||
| strncpy(last_tag, tag, tag_len); | ||
| last_tag_len = tag_len; | ||
|
|
@@ -412,11 +516,22 @@ static int in_systemd_collect(struct flb_input_instance *ins, | |
|
|
||
| ret = systemd_enumerate_data_store(config, ctx->ins, | ||
| (void *)ctx, (void *)kvlist, | ||
| key, length); | ||
| key, length, parser, &pbuf, &plength, &tm); | ||
| if (ret == -2) { | ||
| skip_entries++; | ||
| continue; | ||
| } | ||
| else if (ret == -3) { | ||
| /* Parsed content - add it to encoder as msgpack */ | ||
| ret = flb_systemd_repack_map(ctx->log_encoder, pbuf, plength); | ||
| flb_free(pbuf); | ||
| pbuf = NULL; | ||
| if (ret != FLB_EVENT_ENCODER_SUCCESS) { | ||
| continue; | ||
| } | ||
| entries++; | ||
| continue; | ||
| } | ||
|
Comment on lines
+524
to
+534
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add NULL check before repacking parsed content.
🐛 Proposed fix to add defensive NULL check else if (ret == -3) {
/* Parsed content - add it to encoder as msgpack */
+ if (pbuf != NULL) {
- ret = flb_systemd_repack_map(ctx->log_encoder, pbuf, plength);
- flb_free(pbuf);
- pbuf = NULL;
- if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ ret = flb_systemd_repack_map(ctx->log_encoder, pbuf, plength);
+ flb_free(pbuf);
+ pbuf = NULL;
+ if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+ continue;
+ }
+ entries++;
+ }
+ else {
+ flb_plg_error(ctx->ins, "parser succeeded but returned NULL buffer");
continue;
}
- entries++;
continue;
}🤖 Prompt for AI Agents |
||
| else if (ret == -1) { | ||
| continue; | ||
| } | ||
|
|
@@ -688,7 +803,8 @@ static int cb_systemd_format_test(struct flb_config *config, | |
| cur = cfl_list_entry(head, struct cfl_split_entry, _head); | ||
| ret = systemd_enumerate_data_store(config, ctx->ins, | ||
| (void *)ctx, (void *)kvlist, | ||
| cur->value, cur->len); | ||
| cur->value, cur->len, | ||
| NULL, NULL, NULL, &tm); | ||
|
|
||
| if (ret == -2 || ret == -1) { | ||
| continue; | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.