Skip to content

aws: add integrations for CloudWatch Explore related feature #10321

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
*~
_book/
lib/jemalloc
cmake-build-debug/
tests/internal/flb_tests_internal.h
tests/runtime/flb_tests_runtime.h
tests/internal/cmake-build-debug/
tests/runtime/cmake-build-debug/
build/*
include/fluent-bit/flb_info.h
include/fluent-bit/flb_plugins.h
Expand Down
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_hash_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct flb_hash_table {
int total_count;
int cache_ttl;
int case_sensitivity;
int force_remove_pointer;
size_t size;
struct mk_list entries;
struct flb_hash_table_chain *table;
Expand All @@ -66,6 +67,8 @@ struct flb_hash_table {
struct flb_hash_table *flb_hash_table_create(int evict_mode, size_t size, int max_entries);
struct flb_hash_table *flb_hash_table_create_with_ttl(int cache_ttl, int evict_mode,
size_t size, int max_entries);
struct flb_hash_table *flb_hash_table_create_with_ttl_force_destroy(int cache_ttl, int evict_mode,
size_t size, int max_entries);
void flb_hash_table_destroy(struct flb_hash_table *ht);

void flb_hash_table_set_case_sensitivity(struct flb_hash_table *ht, int status);
Expand Down
49 changes: 48 additions & 1 deletion plugins/filter_aws/aws.c
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ static int ec2_metadata_group_should_fetch(struct flb_filter_aws *ctx,

interval = now - group->last_fetch_attempt;

if (group->last_fetch_attempt > 0 &&
if (group->last_fetch_attempt > 0 &&
interval < required_interval) {
return FLB_FALSE;
}
Expand Down Expand Up @@ -926,6 +926,30 @@ static int get_ec2_metadata(struct flb_filter_aws *ctx)
ctx->metadata_retrieved = FLB_TRUE;
}

if (ctx->enable_entity) {
if (!ctx->account_id) {
ret = flb_aws_imds_request_by_key(ctx->client_imds, FLB_AWS_IMDS_ACCOUNT_ID_PATH,
&ctx->account_id, &ctx->account_id_len,
"accountId");

if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to get Account ID");
return -1;
}
}

if (!ctx->instance_id) {
ret = flb_aws_imds_request(ctx->client_imds, FLB_AWS_IMDS_INSTANCE_ID_PATH,
&ctx->instance_id,
&ctx->instance_id_len);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to get instance ID");
return -1;
}
}
}

ctx->metadata_retrieved = FLB_TRUE;
return 0;
}

Expand Down Expand Up @@ -1104,6 +1128,23 @@ static int cb_aws_filter(const void *data, size_t bytes,
}
}

if (ctx->enable_entity &&
ctx->instance_id &&
ctx->account_id &&
ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_append_body_values(
&log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE(FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY),
FLB_LOG_EVENT_STRING_VALUE(ctx->instance_id,
ctx->instance_id_len));
ret = flb_log_event_encoder_append_body_values(
&log_encoder,
FLB_LOG_EVENT_CSTRING_VALUE(FLB_FILTER_AWS_ENTITY_ACCOUNT_ID_KEY),
FLB_LOG_EVENT_STRING_VALUE(ctx->account_id,
ctx->account_id_len));
}


if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = flb_log_event_encoder_commit_record(&log_encoder);
}
Expand Down Expand Up @@ -1273,6 +1314,12 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_filter_aws, retry_required_interval),
"Defines minimum duration between retries for fetching metadata groups"
},
{
FLB_CONFIG_MAP_BOOL, "enable_entity", "false",
0, FLB_TRUE, offsetof(struct flb_filter_aws, enable_entity),
"Enable entity prefix for fields used for constructing entity."
"This currently only affects instance ID"
},
{0}
};

Expand Down
9 changes: 9 additions & 0 deletions plugins/filter_aws/aws.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#define FLB_FILTER_AWS_AVAILABILITY_ZONE_KEY_LEN 2
#define FLB_FILTER_AWS_INSTANCE_ID_KEY "ec2_instance_id"
#define FLB_FILTER_AWS_INSTANCE_ID_KEY_LEN 15
#define FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY "aws_entity_ec2_instance_id"
#define FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN 26
#define FLB_FILTER_AWS_INSTANCE_TYPE_KEY "ec2_instance_type"
#define FLB_FILTER_AWS_INSTANCE_TYPE_KEY_LEN 17
#define FLB_FILTER_AWS_PRIVATE_IP_KEY "private_ip"
Expand All @@ -37,6 +39,8 @@
#define FLB_FILTER_AWS_AMI_ID_KEY_LEN 6
#define FLB_FILTER_AWS_ACCOUNT_ID_KEY "account_id"
#define FLB_FILTER_AWS_ACCOUNT_ID_KEY_LEN 10
#define FLB_FILTER_AWS_ENTITY_ACCOUNT_ID_KEY "aws_entity_account_id"
#define FLB_FILTER_AWS_ENTITY_ACCOUNT_ID_KEY_LEN 21
#define FLB_FILTER_AWS_HOSTNAME_KEY "hostname"
#define FLB_FILTER_AWS_HOSTNAME_KEY_LEN 8

Expand Down Expand Up @@ -110,6 +114,11 @@ struct flb_filter_aws {
/* tags_* fields are related to exposing EC2 tags in log labels
* tags_enabled defines if EC2 tags functionality is enabled */
int tags_enabled;
/*
* Enable entity prefix appending. This appends
* 'aws_entity' to relevant keys
*/
int enable_entity;

/* tags_fetched defines if tag keys and values were fetched successfully
* and might be used to inject into msgpack */
Expand Down
24 changes: 24 additions & 0 deletions plugins/filter_kubernetes/kube_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ struct flb_kube *flb_kube_conf_create(struct flb_filter_instance *ins,
flb_plg_info(ctx->ins, "https=%i host=%s port=%i",
ctx->api_https, ctx->api_host, ctx->api_port);
}

ctx->pod_hash_table = flb_hash_table_create_with_ttl_force_destroy(ctx->pod_service_map_ttl,
FLB_HASH_TABLE_EVICT_OLDER,
FLB_HASH_TABLE_SIZE,
FLB_HASH_TABLE_SIZE);
return ctx;
}

Expand All @@ -206,6 +211,10 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
flb_hash_table_destroy(ctx->namespace_hash_table);
}

if (ctx->pod_hash_table) {
flb_hash_table_destroy(ctx->pod_hash_table);
}

if (ctx->merge_log == FLB_TRUE) {
flb_free(ctx->unesc_buf);
}
Expand All @@ -214,6 +223,9 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
if (ctx->parser == NULL && ctx->regex) {
flb_regex_destroy(ctx->regex);
}
if (ctx->deploymentRegex) {
flb_regex_destroy(ctx->deploymentRegex);
}

flb_free(ctx->api_host);
flb_free(ctx->token);
Expand All @@ -228,6 +240,18 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
flb_upstream_destroy(ctx->kube_api_upstream);
}

if(ctx->pod_association_tls) {
flb_tls_destroy(ctx->pod_association_tls);
}

if (ctx->pod_association_upstream) {
flb_upstream_destroy(ctx->pod_association_upstream);
}

if (ctx->platform) {
flb_free(ctx->platform);
}

#ifdef FLB_HAVE_TLS
if (ctx->tls) {
flb_tls_destroy(ctx->tls);
Expand Down
68 changes: 68 additions & 0 deletions plugins/filter_kubernetes/kube_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,40 @@
#define FLB_KUBE_TAG_PREFIX "kube.var.log.containers."
#endif

/*
* Maximum attribute length for Entity's KeyAttributes
* values
* https://docs.aws.amazon.com/applicationsignals/latest/APIReference/API_Service.html#:~:text=Maximum%20length%20of%201024.
*/
#define KEY_ATTRIBUTES_MAX_LEN 1024
#define SERVICE_NAME_SOURCE_MAX_LEN 64

/*
* Configmap used for verifying whether if FluentBit is
* on EKS or native Kubernetes
*/
#define KUBE_SYSTEM_NAMESPACE "kube-system"
#define AWS_AUTH_CONFIG_MAP "aws-auth"

/*
* Possible platform values for Kubernetes plugin
*/
#define NATIVE_KUBERNETES_PLATFORM "k8s"
#define EKS_PLATFORM "eks"

struct kube_meta;

struct service_attributes {
char name[KEY_ATTRIBUTES_MAX_LEN];
int name_len;
char environment[KEY_ATTRIBUTES_MAX_LEN];
int environment_len;
char name_source[SERVICE_NAME_SOURCE_MAX_LEN];
int name_source_len;
int fields;

};

/* Filter context */
struct flb_kube {
/* Configuration parameters */
Expand Down Expand Up @@ -124,6 +156,7 @@ struct flb_kube {

/* Regex context to parse records */
struct flb_regex *regex;
struct flb_regex *deploymentRegex;
struct flb_parser *parser;

/* TLS CA certificate file */
Expand Down Expand Up @@ -165,6 +198,41 @@ struct flb_kube {
int kube_meta_cache_ttl;
int kube_meta_namespace_cache_ttl;

/* Configuration used for enabling pod to service name mapping*/
int use_pod_association;
char *pod_association_host;
char *pod_association_endpoint;
int pod_association_port;

/*
* TTL is used to check how long should the mapped entry
* remain in the hash table
*/
struct flb_hash_table *pod_hash_table;
int pod_service_map_ttl;
int pod_service_map_refresh_interval;
flb_sds_t pod_service_preload_cache_path;
struct flb_upstream *pod_association_upstream;
/*
* This variable holds the Kubernetes platform type
* Current checks for EKS or Native Kuberentes
*/
char *platform;
/*
* This value is used for holding the platform config
* value. Platform will be overriden with this variable
* if it's set
*/
char *set_platform;

//Agent TLS certs
struct flb_tls *pod_association_tls;
char *pod_association_host_server_ca_file;
char *pod_association_host_client_cert_file;
char *pod_association_host_client_key_file;
int pod_association_host_tls_debug;
int pod_association_host_tls_verify;

struct flb_tls *tls;
struct flb_tls *kubelet_tls;

Expand Down
Loading
Loading