diff --git a/plugins/in_tcp/tcp_conn.c b/plugins/in_tcp/tcp_conn.c index bac640b0887..406df726b40 100644 --- a/plugins/in_tcp/tcp_conn.c +++ b/plugins/in_tcp/tcp_conn.c @@ -189,6 +189,7 @@ static ssize_t parse_payload_none(struct tcp_conn *conn) char *buf; char *s; char *separator; + char *source_address; struct flb_in_tcp_config *ctx; ctx = conn->ctx; @@ -214,10 +215,25 @@ static ssize_t parse_payload_none(struct tcp_conn *conn) } if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = flb_log_event_encoder_append_body_values( + source_address = NULL; + if (ctx->source_address_key != NULL) { + source_address = flb_connection_get_remote_address(conn->connection); + } + + if (ctx->source_address_key != NULL && source_address != NULL) { + ret = flb_log_event_encoder_append_body_values( + ctx->log_encoder, + FLB_LOG_EVENT_CSTRING_VALUE("log"), + FLB_LOG_EVENT_STRING_VALUE(buf, len), + FLB_LOG_EVENT_CSTRING_VALUE(ctx->source_address_key), + FLB_LOG_EVENT_CSTRING_VALUE(source_address)); + } + else { + ret = flb_log_event_encoder_append_body_values( ctx->log_encoder, FLB_LOG_EVENT_CSTRING_VALUE("log"), FLB_LOG_EVENT_STRING_VALUE(buf, len)); + } } if (ret == FLB_EVENT_ENCODER_SUCCESS) { diff --git a/tests/runtime/in_tcp.c b/tests/runtime/in_tcp.c index 732af004198..f42f38b1a20 100644 --- a/tests/runtime/in_tcp.c +++ b/tests/runtime/in_tcp.c @@ -547,6 +547,61 @@ void flb_test_format_none_separator() test_ctx_destroy(ctx); } +void flb_test_format_none_with_source_address() +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + flb_sockfd_t fd; + int ret; + int num; + ssize_t w_size; + char *buf = "message\n"; + size_t size = strlen(buf); + clear_output_num(); + cb_data.cb = cb_check_result_json; + cb_data.data = "\"log\":\"message\",\"source_host\":\"tcp://"; + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "source_address_key", "source_host", + NULL); + TEST_CHECK(ret == 0); + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "format", "none", + NULL); + TEST_CHECK(ret == 0); + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + /* use default host/port */ + fd = connect_tcp(NULL, -1); + if (!TEST_CHECK(fd >= 0)) { + exit(EXIT_FAILURE); + } + w_size = send(fd, buf, size, 0); + if (!TEST_CHECK(w_size == size)) { + TEST_MSG("failed to send, errno=%d", errno); + flb_socket_close(fd); + exit(EXIT_FAILURE); + } + /* waiting to flush */ + flb_time_msleep(1500); + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + flb_socket_close(fd); + test_ctx_destroy(ctx); +} + /* * Ingest 64k records. * https://github.com/fluent/fluent-bit/issues/5336 @@ -619,6 +674,7 @@ TEST_LIST = { {"tcp_with_tls", flb_test_tcp_with_tls}, {"format_none", flb_test_format_none}, {"format_none_separator", flb_test_format_none_separator}, + {"format_none_with_source_address", flb_test_format_none_with_source_address}, {"65535_records_issue_5336", flb_test_issue_5336}, {NULL, NULL} };