Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: profile signal path addition #9748

Merged
merged 18 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
d1fd492
build: added profile ingestion file
leonardo-albertovich Dec 18, 2024
5d60350
input_profiles: initial commit
leonardo-albertovich Dec 18, 2024
bc1b3fc
event: added profile signal type
leonardo-albertovich Dec 18, 2024
4c6845a
input: added profile event support
leonardo-albertovich Dec 18, 2024
562bbe4
input_chunk: added profile event support
leonardo-albertovich Dec 18, 2024
7cf2cef
input_event: added profile signal type
leonardo-albertovich Dec 18, 2024
403f0ec
output: added profile event support
leonardo-albertovich Dec 18, 2024
c323c63
processor: added profile event support
leonardo-albertovich Dec 18, 2024
f8fb651
router: added profile event support
leonardo-albertovich Dec 18, 2024
cad28cb
in_opentelemetry: added proper profile event ingestion support
leonardo-albertovich Dec 18, 2024
b1f120f
out_stdout: added profile event support
leonardo-albertovich Dec 18, 2024
01f82a2
out_opentelemetry: profiles support added
leonardo-albertovich Dec 19, 2024
992ee77
in_opentelemetry: restored previous default profiles as text behavior
leonardo-albertovich Dec 19, 2024
15d1bc5
output: fixed constant that was meant to be a bit mask
leonardo-albertovich Dec 19, 2024
b6a778b
processor: fixed constant that was meant to be a bit mask
leonardo-albertovich Dec 19, 2024
e67d47d
out_opentelemetry: renamed option as requested
leonardo-albertovich Dec 19, 2024
42f5d58
output: added missing error reporting macro calls
leonardo-albertovich Dec 19, 2024
c9d269d
in_opentelemetry: renamed option I missed in the previous commit
leonardo-albertovich Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#define FLB_EVENT_TYPE_LOGS FLB_INPUT_CHUNK_TYPE_LOGS
#define FLB_EVENT_TYPE_METRICS FLB_INPUT_CHUNK_TYPE_METRICS
#define FLB_EVENT_TYPE_TRACES FLB_INPUT_CHUNK_TYPE_TRACES
#define FLB_EVENT_TYPE_PROFILES FLB_INPUT_CHUNK_TYPE_PROFILES
#define FLB_EVENT_TYPE_BLOBS FLB_INPUT_CHUNK_TYPE_BLOBS

#define FLB_EVENT_TYPE_HAS_TRACE FLB_INPUT_CHUNK_HAS_TRACE
Expand Down
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <fluent-bit/flb_input_log.h>
#include <fluent-bit/flb_input_metric.h>
#include <fluent-bit/flb_input_trace.h>
#include <fluent-bit/flb_input_profiles.h>
#include <fluent-bit/flb_config_format.h>
#include <fluent-bit/flb_processor.h>

Expand Down Expand Up @@ -429,6 +430,7 @@ struct flb_input_instance {
struct flb_hash_table *ht_log_chunks;
struct flb_hash_table *ht_metric_chunks;
struct flb_hash_table *ht_trace_chunks;
struct flb_hash_table *ht_profile_chunks;

/* TLS settings */
int use_tls; /* bool, try to use TLS for I/O */
Expand Down
3 changes: 2 additions & 1 deletion include/fluent-bit/flb_input_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@
#define FLB_INPUT_CHUNK_MAGIC_BYTE_0 (unsigned char) 0xF1
#define FLB_INPUT_CHUNK_MAGIC_BYTE_1 (unsigned char) 0x77

/* Chunk types: Log, Metrics and Traces are supported */
/* Chunk types: Log, Metrics, Traces, Profiles and Blobs are supported */
#define FLB_INPUT_CHUNK_TYPE_LOGS 0
#define FLB_INPUT_CHUNK_TYPE_METRICS 1
#define FLB_INPUT_CHUNK_TYPE_TRACES 2
#define FLB_INPUT_CHUNK_TYPE_BLOBS 3
#define FLB_INPUT_CHUNK_TYPE_PROFILES 4

#ifdef FLB_HAVE_CHUNK_TRACE
#define FLB_INPUT_CHUNK_HAS_TRACE 1 << 31
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_input_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@
#define FLB_INPUT_METRICS 1
#define FLB_INPUT_TRACES 2
#define FLB_INPUT_BLOBS 3
#define FLB_INPUT_PROFILES 4

#endif
36 changes: 36 additions & 0 deletions include/fluent-bit/flb_input_profiles.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2024 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef FLB_INPUT_PROFILES_H
#define FLB_INPUT_PROFILES_H

#include <fluent-bit/flb_info.h>
#include <cprofiles/cprofiles.h>

int flb_input_profiles_append(struct flb_input_instance *ins,
const char *tag, size_t tag_len,
struct cprof *profiles_context);

int flb_input_profiles_append_skip_processor_stages(
struct flb_input_instance *ins,
size_t processor_starting_stage,
const char *tag, size_t tag_len,
struct cprof *profiles_context);

#endif
117 changes: 117 additions & 0 deletions include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@
#include <ctraces/ctr_encode_msgpack.h>
#include <ctraces/ctr_mpack_utils_defs.h>

#include <cprofiles/cprofiles.h>
#include <cprofiles/cprof_decode_msgpack.h>
#include <cprofiles/cprof_encode_msgpack.h>
#include <cprofiles/cprof_mpack_utils_defs.h>

#ifdef FLB_HAVE_REGEX
#include <fluent-bit/flb_regex.h>
#endif
Expand Down Expand Up @@ -90,6 +95,7 @@ int flb_chunk_trace_output(struct flb_chunk_trace *trace, struct flb_output_inst
#define FLB_OUTPUT_METRICS 2
#define FLB_OUTPUT_TRACES 4
#define FLB_OUTPUT_BLOBS 8
#define FLB_OUTPUT_PROFILES 16

#define FLB_OUTPUT_FLUSH_COMPAT_OLD_18() \
const void *data = event_chunk->data; \
Expand Down Expand Up @@ -702,17 +708,20 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
struct flb_event_chunk *tmp;
char *resized_serialization_buffer;
size_t serialization_buffer_offset;
cfl_sds_t serialized_profiles_context_buffer;
char *serialized_context_buffer;
size_t serialized_context_size;
struct cmt *metrics_context;
struct ctrace *trace_context;
struct cprof *profile_context;
size_t chunk_offset;
struct cmt *cmt_out_context = NULL;

/* Custom output coroutine info */
out_flush = (struct flb_output_flush *) flb_calloc(1, sizeof(struct flb_output_flush));
if (!out_flush) {
flb_errno();

return NULL;
}

Expand Down Expand Up @@ -766,6 +775,8 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
p_buf = flb_calloc(evc->size * 2, sizeof(char));

if (p_buf == NULL) {
flb_errno();

flb_coro_destroy(coro);
flb_free(out_flush);

Expand Down Expand Up @@ -825,6 +836,8 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
flb_realloc(p_buf, p_size + serialized_context_size);

if (resized_serialization_buffer == NULL) {
flb_errno();

cmt_encode_msgpack_destroy(serialized_context_buffer);
flb_coro_destroy(coro);
flb_free(out_flush);
Expand Down Expand Up @@ -875,6 +888,8 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
p_buf = flb_calloc(evc->size * 2, sizeof(char));

if (p_buf == NULL) {
flb_errno();

flb_coro_destroy(coro);
flb_free(out_flush);

Expand Down Expand Up @@ -922,6 +937,8 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
flb_realloc(p_buf, p_size + serialized_context_size);

if (resized_serialization_buffer == NULL) {
flb_errno();

ctr_encode_msgpack_destroy(serialized_context_buffer);
flb_coro_destroy(coro);
flb_free(out_flush);
Expand Down Expand Up @@ -952,6 +969,106 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task,
return NULL;
}

out_flush->processed_event_chunk = flb_event_chunk_create(
evc->type,
0,
evc->tag,
flb_sds_len(evc->tag),
p_buf,
p_size);

if (out_flush->processed_event_chunk == NULL) {
flb_coro_destroy(coro);
flb_free(out_flush);
flb_free(p_buf);

return NULL;
}
}
else if (evc->type == FLB_EVENT_TYPE_PROFILES) {
p_buf = flb_calloc(evc->size * 2, sizeof(char));

leonardo-albertovich marked this conversation as resolved.
Show resolved Hide resolved
if (p_buf == NULL) {
flb_errno();

flb_coro_destroy(coro);
flb_free(out_flush);

return NULL;
}

p_size = evc->size;

chunk_offset = 0;
serialization_buffer_offset = 0;

while ((ret = cprof_decode_msgpack_create(
&profile_context,
(unsigned char *) evc->data,
evc->size,
&chunk_offset)) == CPROF_DECODE_MSGPACK_SUCCESS) {
ret = flb_processor_run(o_ins->processor,
0,
FLB_PROCESSOR_PROFILES,
evc->tag,
flb_sds_len(evc->tag),
(char *) profile_context,
0,
NULL,
NULL);

if (ret == 0) {
ret = cprof_encode_msgpack_create(&serialized_profiles_context_buffer,
profile_context);

cprof_destroy(profile_context);

if (ret != 0) {
flb_coro_destroy(coro);
flb_free(out_flush);
flb_free(p_buf);

return NULL;
}

if ((serialization_buffer_offset +
cfl_sds_len(serialized_profiles_context_buffer)) > p_size) {
resized_serialization_buffer = \
flb_realloc(p_buf, p_size + cfl_sds_len(serialized_profiles_context_buffer));

leonardo-albertovich marked this conversation as resolved.
Show resolved Hide resolved
if (resized_serialization_buffer == NULL) {
flb_errno();

cprof_encode_msgpack_destroy(serialized_profiles_context_buffer);
flb_coro_destroy(coro);
flb_free(out_flush);
flb_free(p_buf);

return NULL;
}

p_size += cfl_sds_len(serialized_profiles_context_buffer);
p_buf = resized_serialization_buffer;
}

memcpy(&(((char *) p_buf)[serialization_buffer_offset]),
serialized_profiles_context_buffer,
cfl_sds_len(serialized_profiles_context_buffer));

serialization_buffer_offset += cfl_sds_len(serialized_profiles_context_buffer);

cprof_encode_msgpack_destroy(serialized_profiles_context_buffer);
}
}

if (serialization_buffer_offset == 0) {
flb_coro_destroy(coro);
flb_free(out_flush);
flb_free(p_buf);

return NULL;
}

out_flush->processed_event_chunk = flb_event_chunk_create(
evc->type,
0,
Expand Down
10 changes: 9 additions & 1 deletion include/fluent-bit/flb_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <ctraces/ctraces.h>
#include <cmetrics/cmetrics.h>
#include <cprofiles/cprofiles.h>

/* Processor plugin result values */
#define FLB_PROCESSOR_SUCCESS 0
Expand All @@ -37,6 +38,7 @@
#define FLB_PROCESSOR_LOGS 1
#define FLB_PROCESSOR_METRICS 2
#define FLB_PROCESSOR_TRACES 4
#define FLB_PROCESSOR_PROFILES 8

/* Type of processor unit: 'pipeline filter' or 'native unit' */
#define FLB_PROCESSOR_UNIT_NATIVE 0
Expand Down Expand Up @@ -93,7 +95,7 @@ struct flb_processor_unit {
*/
struct mk_list unused_list;

/* link to struct flb_processor->(logs, metrics, traces) list */
/* link to struct flb_processor->(logs, metrics, traces, profiles) list */
struct mk_list _head;

/* link to parent processor */
Expand All @@ -110,6 +112,7 @@ struct flb_processor {
struct mk_list logs;
struct mk_list metrics;
struct mk_list traces;
struct mk_list profiles;

size_t stage_count;
/*
Expand Down Expand Up @@ -155,6 +158,11 @@ struct flb_processor_plugin {
const char *,
int);

int (*cb_process_profiles) (struct flb_processor_instance *,
struct cprof *,
const char *,
int);

int (*cb_exit) (struct flb_processor_instance *, void *);

/* Notification: this callback will be invoked anytime a notification is received*/
Expand Down
4 changes: 4 additions & 0 deletions include/fluent-bit/flb_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ static inline int flb_router_match_type(int in_event_type,
!(o_ins->event_type & FLB_OUTPUT_TRACES)) {
return FLB_FALSE;
}
else if (in_event_type == FLB_INPUT_PROFILES &&
!(o_ins->event_type & FLB_OUTPUT_PROFILES)) {
return FLB_FALSE;
}
else if (in_event_type == FLB_INPUT_BLOBS &&
!(o_ins->event_type & FLB_OUTPUT_BLOBS)) {
return FLB_FALSE;
Expand Down
5 changes: 5 additions & 0 deletions plugins/in_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ static struct flb_config_map config_map[] = {
"feel free to test it but please do not enable this in production " \
"environments"
},
{
FLB_CONFIG_MAP_BOOL, "encode_profiles_as_log", "true",
0, FLB_TRUE, offsetof(struct flb_opentelemetry, encode_profiles_as_log),
"Encode profiles received as text and ingest them in the logging pipeline"
},

{
FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE,
Expand Down
1 change: 1 addition & 0 deletions plugins/in_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ struct flb_opentelemetry {
int tag_from_uri;
flb_sds_t logs_metadata_key;
int profile_support_enabled;
int encode_profiles_as_log;

struct flb_input_instance *ins;

Expand Down
Loading
Loading