Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 133 additions & 1 deletion plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
#include <cfl/cfl.h>
#include <fluent-otel-proto/fluent-otel.h>

#include <cmetrics/cmetrics.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_zstd.h>
#include <fluent-bit/flb_hash_table.h>

#include <cmetrics/cmetrics.h>
#include <cmetrics/cmt_encode_opentelemetry.h>

#include <ctraces/ctraces.h>
Expand Down Expand Up @@ -75,6 +77,115 @@ static int is_http_status_code_retrayable(int http_code)
return FLB_FALSE;
}

static int opentelemetry_is_grpc_status_retryable(int status_code)
{
if (status_code == 1 || /* CANCELLED */
status_code == 4 || /* DEADLINE_EXCEEDED */
status_code == 8 || /* RESOURCE_EXHAUSTED */
status_code == 10 || /* ABORTED */
status_code == 13 || /* INTERNAL */
status_code == 14) { /* UNAVAILABLE */
return FLB_TRUE;
}

return FLB_FALSE;
}

static int opentelemetry_lookup_header_value(struct flb_hash_table *table,
const char *name,
cfl_sds_t *out_value)
{
void *value;
size_t value_length;
int result;

if (table == NULL) {
return FLB_FALSE;
}

result = flb_hash_table_get(table,
name,
strlen(name),
&value,
&value_length);

if (result == -1) {
return FLB_FALSE;
}

*out_value = cfl_sds_create_len((const char *) value, value_length);

if (*out_value == NULL) {
return FLB_FALSE;
}

return FLB_TRUE;
}

static int opentelemetry_check_grpc_status(struct opentelemetry_context *ctx,
struct flb_http_response *response)
{
cfl_sds_t grpc_message;
cfl_sds_t grpc_status_text;
int grpc_status;
int result;

grpc_message = NULL;
grpc_status_text = NULL;
grpc_status = 0;
result = FLB_OK;

/* ref: https://grpc.io/docs/guides/status-codes/ */
if (opentelemetry_lookup_header_value(response->trailer_headers,
"grpc-status",
&grpc_status_text) == FLB_FALSE &&
opentelemetry_lookup_header_value(response->headers,
"grpc-status",
&grpc_status_text) == FLB_FALSE) {

return FLB_OK;
}

grpc_status = strtol(grpc_status_text, NULL, 10);

if (opentelemetry_lookup_header_value(response->trailer_headers,
"grpc-message",
&grpc_message) == FLB_FALSE) {
opentelemetry_lookup_header_value(response->headers,
"grpc-message",
&grpc_message);
}

if (grpc_status != 0) {
if (grpc_message != NULL) {
flb_plg_error(ctx->ins,
"grpc-status=%d, grpc-message=%s",
grpc_status,
grpc_message);
}
else {
flb_plg_error(ctx->ins, "grpc-status=%d", grpc_status);
}

if (opentelemetry_is_grpc_status_retryable(grpc_status)) {
result = FLB_RETRY;
}
else {
result = FLB_ERROR;
}
}

if (grpc_message != NULL) {
cfl_sds_destroy(grpc_message);
}

if (grpc_status_text != NULL) {
cfl_sds_destroy(grpc_status_text);
}

return result;
}

int opentelemetry_legacy_post(struct opentelemetry_context *ctx,
const void *body, size_t body_len,
const char *tag, int tag_len,
Expand Down Expand Up @@ -342,6 +453,20 @@ int opentelemetry_post(struct opentelemetry_context *ctx,

if (request->protocol_version == HTTP_PROTOCOL_VERSION_20 &&
ctx->enable_grpc_flag) {
/* nghttp2 does not automatically add the TE header because it is not
* tied to the gRPC semantics, so we must set the required
* "te: trailers" header explicitly for gRPC-over-HTTP/2.
*/
result = flb_http_request_set_header(request,
"te", 2,
"trailers", 8);

if (result != 0) {
flb_plg_error(ctx->ins, "failed to set gRPC TE header");
flb_http_client_request_destroy(request, FLB_TRUE);

return FLB_RETRY;
}

grpc_body = cfl_sds_create_size(body_len + 5);

Expand Down Expand Up @@ -521,6 +646,13 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
out_ret = FLB_OK;
}

if (ctx->enable_grpc_flag && request->protocol_version == HTTP_PROTOCOL_VERSION_20 && out_ret == FLB_OK) {
result = opentelemetry_check_grpc_status(ctx, response);
if (result != FLB_OK) {
out_ret = result;
}
}

flb_http_client_request_destroy(request, FLB_TRUE);

return out_ret;
Expand Down
Loading