diff --git a/plugins/out_es/es.c b/plugins/out_es/es.c index 3b34de865e6..72cb152f1d8 100644 --- a/plugins/out_es/es.c +++ b/plugins/out_es/es.c @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include #include #include @@ -83,7 +85,7 @@ static flb_sds_t add_aws_auth(struct flb_elasticsearch *ctx, static inline int es_pack_map_content(msgpack_packer *tmp_pck, msgpack_object map, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch_config *ec) { int i; char *ptr_key = NULL; @@ -132,7 +134,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck, * * https://goo.gl/R5NMTr */ - if (ctx->replace_dots == FLB_TRUE) { + if (ec->replace_dots == FLB_TRUE) { char *p = ptr_key; char *end = ptr_key + key_size; while (p != end) { @@ -157,7 +159,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck, */ if (v->type == MSGPACK_OBJECT_MAP) { msgpack_pack_map(tmp_pck, v->via.map.size); - es_pack_map_content(tmp_pck, *v, ctx); + es_pack_map_content(tmp_pck, *v, ec); } else { msgpack_pack_object(tmp_pck, *v); @@ -175,7 +177,7 @@ static inline int es_pack_map_content(msgpack_packer *tmp_pck, */ static char *elasticsearch_format(const void *data, size_t bytes, const char *tag, int tag_len, int *out_size, - struct flb_elasticsearch *ctx) + struct flb_elasticsearch_config *ec) { int ret; int len; @@ -245,9 +247,9 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_unpacked_init(&result); /* Copy logstash prefix if logstash format is enabled */ - if (ctx->logstash_format == FLB_TRUE) { - memcpy(logstash_index, ctx->logstash_prefix, flb_sds_len(ctx->logstash_prefix)); - logstash_index[flb_sds_len(ctx->logstash_prefix)] = '\0'; + if (ec->logstash_format == FLB_TRUE) { + memcpy(logstash_index, ec->logstash_prefix, flb_sds_len(ec->logstash_prefix)); + logstash_index[flb_sds_len(ec->logstash_prefix)] = '\0'; } /* @@ -257,17 +259,17 @@ static char *elasticsearch_format(const void *data, size_t bytes, * The header stored in 'j_index' will be used for the all records on * this payload. */ - if (ctx->logstash_format == FLB_FALSE && ctx->generate_id == FLB_FALSE) { + if (ec->logstash_format == FLB_FALSE && ec->generate_id == FLB_FALSE) { flb_time_get(&tms); gmtime_r(&tms.tm.tv_sec, &tm); strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + ec->index, &tm); es_index = index_formatted; index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT, - es_index, ctx->type); + es_index, ec->type); } /* @@ -276,7 +278,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, * in order to prevent generating millions of indexes * we can set to always use current time for index generation */ - if (ctx->current_time_index == FLB_TRUE) { + if (ec->current_time_index == FLB_TRUE) { flb_time_get(&tms); } @@ -293,7 +295,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, } /* Only pop time from record if current_time_index is disabled */ - if (ctx->current_time_index == FLB_FALSE) { + if (ec->current_time_index == FLB_FALSE) { flb_time_pop_from_msgpack(&tms, &result, &obj); } @@ -308,17 +310,17 @@ static char *elasticsearch_format(const void *data, size_t bytes, map_size = map.via.map.size; es_index_custom_len = 0; - if (ctx->logstash_prefix_key) { + if (ec->logstash_prefix_key) { for (i = 0; i < map_size; i++) { key = map.via.map.ptr[i].key; if (key.type != MSGPACK_OBJECT_STR) { continue; } - if (key.via.str.size != flb_sds_len(ctx->logstash_prefix_key)) { + if (key.via.str.size != flb_sds_len(ec->logstash_prefix_key)) { continue; } - if (strncmp(key.via.str.ptr, ctx->logstash_prefix_key, - flb_sds_len(ctx->logstash_prefix_key)) != 0) { + if (strncmp(key.via.str.ptr, ec->logstash_prefix_key, + flb_sds_len(ec->logstash_prefix_key)) != 0) { continue; } val = map.via.map.ptr[i].val; @@ -340,7 +342,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_sbuffer_init(&tmp_sbuf); msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); - if (ctx->include_tag_key == FLB_TRUE) { + if (ec->include_tag_key == FLB_TRUE) { map_size++; } @@ -348,13 +350,13 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_pack_map(&tmp_pck, map_size + 1); /* Append the time key */ - msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->time_key)); - msgpack_pack_str_body(&tmp_pck, ctx->time_key, flb_sds_len(ctx->time_key)); + msgpack_pack_str(&tmp_pck, flb_sds_len(ec->time_key)); + msgpack_pack_str_body(&tmp_pck, ec->time_key, flb_sds_len(ec->time_key)); /* Format the time */ gmtime_r(&tms.tm.tv_sec, &tm); s = strftime(time_formatted, sizeof(time_formatted) - 1, - ctx->time_key_format, &tm); + ec->time_key_format, &tm); len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s, ".%03" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec); @@ -362,40 +364,40 @@ static char *elasticsearch_format(const void *data, size_t bytes, msgpack_pack_str(&tmp_pck, s); msgpack_pack_str_body(&tmp_pck, time_formatted, s); - es_index = ctx->index; - if (ctx->logstash_format == FLB_TRUE) { + es_index = ec->index; + if (ec->logstash_format == FLB_TRUE) { /* Compose Index header */ if (es_index_custom_len > 0) { p = logstash_index + es_index_custom_len; } else { - p = logstash_index + flb_sds_len(ctx->logstash_prefix); + p = logstash_index + flb_sds_len(ec->logstash_prefix); } *p++ = '-'; len = p - logstash_index; s = strftime(p, sizeof(logstash_index) - len - 1, - ctx->logstash_dateformat, &tm); + ec->logstash_dateformat, &tm); p += s; *p++ = '\0'; es_index = logstash_index; - if (ctx->generate_id == FLB_FALSE) { + if (ec->generate_id == FLB_FALSE) { index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT, - es_index, ctx->type); + es_index, ec->type); } } - else if (ctx->current_time_index == FLB_TRUE) { + else if (ec->current_time_index == FLB_TRUE) { /* Make sure we handle index time format for index */ strftime(index_formatted, sizeof(index_formatted) - 1, - ctx->index, &tm); + ec->index, &tm); es_index = index_formatted; } /* Tag Key */ - if (ctx->include_tag_key == FLB_TRUE) { - msgpack_pack_str(&tmp_pck, flb_sds_len(ctx->tag_key)); - msgpack_pack_str_body(&tmp_pck, ctx->tag_key, flb_sds_len(ctx->tag_key)); + if (ec->include_tag_key == FLB_TRUE) { + msgpack_pack_str(&tmp_pck, flb_sds_len(ec->tag_key)); + msgpack_pack_str_body(&tmp_pck, ec->tag_key, flb_sds_len(ec->tag_key)); msgpack_pack_str(&tmp_pck, tag_len); msgpack_pack_str_body(&tmp_pck, tag, tag_len); } @@ -407,7 +409,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, * Elasticsearch have a restriction that key names cannot contain * a dot; if some dot is found, it's replaced with an underscore. */ - ret = es_pack_map_content(&tmp_pck, map, ctx); + ret = es_pack_map_content(&tmp_pck, map, ec); if (ret == -1) { msgpack_unpacked_destroy(&result); msgpack_sbuffer_destroy(&tmp_sbuf); @@ -415,7 +417,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, return NULL; } - if (ctx->generate_id == FLB_TRUE) { + if (ec->generate_id == FLB_TRUE) { MurmurHash3_x64_128(tmp_sbuf.data, tmp_sbuf.size, 42, hash); snprintf(es_uuid, sizeof(es_uuid), "%04x%04x-%04x-%04x-%04x-%04x%04x%04x", @@ -424,7 +426,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, index_len = snprintf(j_index, ES_BULK_HEADER, ES_BULK_INDEX_FMT_ID, - es_index, ctx->type, es_uuid); + es_index, ec->type, es_uuid); } /* Convert msgpack to JSON */ @@ -458,7 +460,7 @@ static char *elasticsearch_format(const void *data, size_t bytes, * return the bulk->ptr buffer */ flb_free(bulk); - if (ctx->trace_output) { + if (ec->trace_output) { fwrite(buf, 1, *out_size, stdout); fflush(stdout); } @@ -469,20 +471,31 @@ static int cb_es_init(struct flb_output_instance *ins, struct flb_config *config, void *data) { + int ret; + const char *tmp; struct flb_elasticsearch *ctx; + (void) data; - ctx = flb_es_conf_create(ins, config); + ctx = flb_calloc(1, sizeof(struct flb_elasticsearch)); if (!ctx) { - flb_plg_error(ins, "cannot initialize plugin"); + flb_errno(); return -1; } + ctx->ins = ins; - flb_plg_debug(ctx->ins, "host=%s port=%i uri=%s index=%s type=%s", - ins->host.name, ins->host.port, ctx->uri, - ctx->index, ctx->type); - + mk_list_init(&ctx->configs); flb_output_set_context(ins, ctx); - return 0; + + /* Configure HA or simple mode ? */ + tmp = flb_output_get_property("upstream", ins); + if (tmp) { + ret = es_config_ha(tmp, ctx, config); + } + else { + ret = es_config_simple(ins, ctx, config); + } + + return ret; } static int elasticsearch_error_check(struct flb_elasticsearch *ctx, @@ -593,31 +606,56 @@ static void cb_es_flush(const void *data, size_t bytes, char *pack; size_t b_sent; struct flb_elasticsearch *ctx = out_context; + struct flb_elasticsearch_config *ec = NULL; struct flb_upstream_conn *u_conn; struct flb_http_client *c; flb_sds_t signature = NULL; + struct flb_upstream_node *node; (void) i_ins; (void) tag; (void) tag_len; + if (ctx->ha_mode == FLB_TRUE) { + node = flb_upstream_ha_node_get(ctx->ha); + if (!node) { + flb_plg_error(ctx->ins, "cannot get an Upstream HA node"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + /* Get forward_config stored in node opaque data */ + ec = flb_upstream_node_get_data(node); + flb_plg_debug(ctx->ins, "trying node %s", node->name); + } + else { + ec = mk_list_entry_first(&ctx->configs, + struct flb_elasticsearch_config, + _head); + } + /* Get upstream connection */ - u_conn = flb_upstream_conn_get(ctx->u); + if (ctx->ha_mode == FLB_TRUE) { + u_conn = flb_upstream_conn_get(node->u); + } + else { + u_conn = flb_upstream_conn_get(ctx->u); + } if (!u_conn) { + flb_plg_error(ctx->ins, "no upstream connections available"); FLB_OUTPUT_RETURN(FLB_RETRY); } /* Convert format */ - pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ctx); + pack = elasticsearch_format(data, bytes, tag, tag_len, &bytes_out, ec); if (!pack) { flb_upstream_conn_release(u_conn); FLB_OUTPUT_RETURN(FLB_ERROR); } /* Compose HTTP Client request */ - c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->uri, + c = flb_http_client(u_conn, FLB_HTTP_POST, ec->uri, pack, bytes_out, NULL, 0, NULL, 0); - flb_http_buffer_size(c, ctx->buffer_size); + flb_http_buffer_size(c, ec->buffer_size); #ifndef FLB_HAVE_SIGNV4 flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); @@ -625,15 +663,15 @@ static void cb_es_flush(const void *data, size_t bytes, flb_http_add_header(c, "Content-Type", 12, "application/x-ndjson", 20); - if (ctx->http_user && ctx->http_passwd) { - flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); + if (ec->http_user && ec->http_passwd) { + flb_http_basic_auth(c, ec->http_user, ec->http_passwd); } #ifdef FLB_HAVE_SIGNV4 - if (ctx->has_aws_auth == FLB_TRUE) { + if (ec->has_aws_auth == FLB_TRUE) { /* User agent for AWS tools must start with "aws-" */ flb_http_add_header(c, "User-Agent", 10, "aws-fluent-bit-plugin", 21); - signature = add_aws_auth(ctx, c, ctx->aws_region); + signature = add_aws_auth(ctx, c, ec->aws_region); if (!signature) { goto retry; } @@ -645,20 +683,20 @@ static void cb_es_flush(const void *data, size_t bytes, ret = flb_http_do(c, &b_sent); if (ret != 0) { - flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ctx->uri); + flb_plg_warn(ctx->ins, "http_do=%i URI=%s", ret, ec->uri); goto retry; } else { /* The request was issued successfully, validate the 'error' field */ - flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ctx->uri); + flb_plg_debug(ctx->ins, "HTTP Status=%i URI=%s", c->resp.status, ec->uri); if (c->resp.status != 200 && c->resp.status != 201) { if (c->resp.payload_size > 0) { flb_plg_error(ctx->ins, "HTTP status=%i URI=%s, response:\n%s\n", - c->resp.status, ctx->uri, c->resp.payload); + c->resp.status, ec->uri, c->resp.payload); } else { flb_plg_error(ctx->ins, "HTTP status=%i URI=%s", - c->resp.status, ctx->uri); + c->resp.status, ec->uri); } goto retry; } @@ -671,7 +709,7 @@ static void cb_es_flush(const void *data, size_t bytes, ret = elasticsearch_error_check(ctx, c); if (ret == FLB_TRUE) { /* we got an error */ - if (ctx->trace_error) { + if (ec->trace_error) { /* * If trace_error is set, trace the actual * input/output to Elasticsearch that caused the problem. @@ -711,8 +749,32 @@ static void cb_es_flush(const void *data, size_t bytes, static int cb_es_exit(void *data, struct flb_config *config) { struct flb_elasticsearch *ctx = data; + struct flb_elasticsearch_config *ec; + struct mk_list *head; + struct mk_list *tmp; + (void) config; + + if (!ctx) { + return 0; + } + + /* Destroy elasticsearch_config contexts */ + mk_list_foreach_safe(head, tmp, &ctx->configs) { + ec = mk_list_entry(head, struct flb_elasticsearch_config, _head); + mk_list_del(&ec->_head); + flb_es_conf_destroy(ec); + } + + if (ctx->ha_mode == FLB_TRUE) { + if (ctx->ha) { + flb_upstream_ha_destroy(ctx->ha); + } + } + else { + flb_upstream_destroy(ctx->u); + } + flb_free(ctx); - flb_es_conf_destroy(ctx); return 0; } @@ -720,24 +782,24 @@ static int cb_es_exit(void *data, struct flb_config *config) static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "index", FLB_ES_DEFAULT_INDEX, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, index), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, index), NULL }, { FLB_CONFIG_MAP_STR, "type", FLB_ES_DEFAULT_TYPE, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, type), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, type), NULL }, /* HTTP Authentication */ { FLB_CONFIG_MAP_STR, "http_user", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_user), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_user), NULL }, { FLB_CONFIG_MAP_STR, "http_passwd", "", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, http_passwd), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_passwd), NULL }, @@ -745,12 +807,12 @@ static struct flb_config_map config_map[] = { #ifdef FLB_HAVE_SIGNV4 { FLB_CONFIG_MAP_BOOL, "aws_auth", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, has_aws_auth), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, has_aws_auth), NULL }, { FLB_CONFIG_MAP_STR, "aws_region", "", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, aws_region), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_region), NULL }, #endif @@ -758,49 +820,49 @@ static struct flb_config_map config_map[] = { /* Logstash compatibility */ { FLB_CONFIG_MAP_BOOL, "logstash_format", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_format), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_format), NULL }, { FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_ES_DEFAULT_PREFIX, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix), NULL }, { FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_prefix_key), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_key), NULL }, { FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_ES_DEFAULT_TIME_FMT, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, logstash_dateformat), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_dateformat), NULL }, /* Custom Time and Tag keys */ { FLB_CONFIG_MAP_STR, "time_key", FLB_ES_DEFAULT_TIME_KEY, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key), NULL }, { FLB_CONFIG_MAP_STR, "time_key_format", FLB_ES_DEFAULT_TIME_KEYF, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_format), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_format), NULL }, { FLB_CONFIG_MAP_BOOL, "include_tag_key", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, include_tag_key), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, include_tag_key), NULL }, { FLB_CONFIG_MAP_STR, "tag_key", FLB_ES_DEFAULT_TAG_KEY, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, tag_key), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, tag_key), NULL }, { FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_ES_DEFAULT_HTTP_MAX, - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, buffer_size), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, buffer_size), NULL }, @@ -817,30 +879,35 @@ static struct flb_config_map config_map[] = { }, { FLB_CONFIG_MAP_BOOL, "generate_id", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, generate_id), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, generate_id), NULL }, { FLB_CONFIG_MAP_BOOL, "replace_dots", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, replace_dots), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, replace_dots), NULL }, { FLB_CONFIG_MAP_BOOL, "current_time_index", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, current_time_index), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, current_time_index), NULL }, /* Trace */ { FLB_CONFIG_MAP_BOOL, "trace_output", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_output), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_output), NULL }, { FLB_CONFIG_MAP_BOOL, "trace_error", "false", - 0, FLB_TRUE, offsetof(struct flb_elasticsearch, trace_error), + 0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_error), + NULL + }, + { + FLB_CONFIG_MAP_STR, "upstream", NULL, + 0, FLB_FALSE, 0, NULL }, @@ -859,5 +926,5 @@ struct flb_output_plugin out_es_plugin = { .config_map = config_map, /* Plugin flags */ - .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, + .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS }; diff --git a/plugins/out_es/es.h b/plugins/out_es/es.h index 4422af3f99a..fdc4f547302 100644 --- a/plugins/out_es/es.h +++ b/plugins/out_es/es.h @@ -32,7 +32,7 @@ #define FLB_ES_DEFAULT_TAG_KEY "flb-key" #define FLB_ES_DEFAULT_HTTP_MAX "4096" -struct flb_elasticsearch { +struct flb_elasticsearch_config { /* Elasticsearch index (database) and type (table) */ char *index; char *type; @@ -91,9 +91,19 @@ struct flb_elasticsearch { /* Elasticsearch HTTP API */ char uri[256]; - /* Upstream connection to the backend server */ - struct flb_upstream *u; + /* Link to list flb_elasticsearch->configs */ + struct mk_list _head; +}; +struct flb_elasticsearch { + /* if HA mode is enabled */ + int ha_mode; /* High Availability mode enabled ? */ + char *ha_upstream; /* Upstream configuration file */ + struct flb_upstream_ha *ha; + + /* Upstream handler and config context for single mode (no HA) */ + struct flb_upstream *u; + struct mk_list configs; /* Plugin output instance reference */ struct flb_output_instance *ins; }; diff --git a/plugins/out_es/es_conf.c b/plugins/out_es/es_conf.c index 2b64e3ef354..06188265ac3 100644 --- a/plugins/out_es/es_conf.c +++ b/plugins/out_es/es_conf.c @@ -23,12 +23,135 @@ #include #include #include +#include +#include +#include #include "es.h" #include "es_conf.h" -struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, - struct flb_config *config) +/* Configure in HA mode */ +int es_config_ha(const char *upstream_file, + struct flb_elasticsearch *ctx, + struct flb_config *config) +{ + int io_flags = 0; + ssize_t ret; + const char *tmp; + const char *path; + struct mk_list *head; + struct flb_uri_field *f_index = NULL; + struct flb_uri_field *f_type = NULL; + struct flb_upstream_node *node; + struct flb_elasticsearch_config *ec = NULL; + + ctx->ha_mode = FLB_TRUE; + ctx->ha = flb_upstream_ha_from_file(upstream_file, config); + if (!ctx->ha) { + flb_plg_error(ctx->ins, "cannot load Upstream file"); + return -1; + } + + /* Iterate nodes and create a forward_config context */ + mk_list_foreach(head, &ctx->ha->nodes) { + node = mk_list_entry(head, struct flb_upstream_node, _head); + /* Allocate context */ + ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!ec) { + flb_errno(); + flb_plg_error(ctx->ins, "failed config allocation"); + continue; + } + + /* Populate context with config map defaults and incoming properties */ + ret = flb_output_config_map_set(ctx->ins, (void *) ec); + if (ret == -1) { + flb_plg_error(ctx->ins, "configuration error"); + flb_es_conf_destroy(ec); + return -1; + } + + /* Is TLS enabled ? */ + if (node->tls_enabled == FLB_TRUE) { + io_flags = FLB_IO_TLS; + } + else { + io_flags = FLB_IO_TCP; + } + + /* Set manual Index and Type */ + tmp = flb_upstream_node_get_property("index", node); + if (tmp) { + ec->index = tmp; + } + else { + ec->index = FLB_ES_DEFAULT_INDEX; + } + + tmp = flb_upstream_node_get_property("type", node); + if (tmp) { + ec->type = tmp; + } + else { + ec->type = FLB_ES_DEFAULT_TYPE; + } + + /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ + if (ec->buffer_size == -1) { + ec->buffer_size = 0; + } + + /* Elasticsearch: Path */ + path = flb_upstream_node_get_property("path", node); + if (!path) { + path = ""; + } + + /* Elasticsearch: Pipeline */ + tmp = flb_upstream_node_get_property("pipeline", node); + if (tmp) { + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); + } + else { + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path); + } + +#ifdef FLB_HAVE_SIGNV4 + /* AWS Auth */ + ec->has_aws_auth = FLB_FALSE; + tmp = flb_upstream_node_get_property("aws_auth", node); + if (tmp) { + if (strncasecmp(tmp, "On", 2) == 0) { + ec->has_aws_auth = FLB_TRUE; + flb_plg_warn(ctx->ins, + "Enabled AWS Auth. Note: Amazon ElasticSearch " + "Service support in Fluent Bit is experimental."); + + tmp = flb_upstream_node_get_property("aws_region", node); + if (!tmp) { + flb_plg_error(ctx->ins, + "aws_auth enabled but aws_region not set"); + flb_es_conf_destroy(ctx); + return NULL; + } + ec->aws_region = tmp; + } + } +#endif + + /* Initialize and validate es_config context */ + mk_list_add(&ec->_head, &ctx->configs); + + /* Set our elasticsearch_config context into the node */ + flb_upstream_node_set_data(ec, node); + } + + return 0; +} + +int es_config_simple(struct flb_output_instance *ins, + struct flb_elasticsearch *ctx, + struct flb_config *config) { int io_flags = 0; @@ -39,15 +162,13 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, struct flb_uri_field *f_index = NULL; struct flb_uri_field *f_type = NULL; struct flb_upstream *upstream; - struct flb_elasticsearch *ctx; + struct flb_elasticsearch_config *ec = NULL; /* Allocate context */ - ctx = flb_calloc(1, sizeof(struct flb_elasticsearch)); - if (!ctx) { - flb_errno(); - return NULL; + ec = flb_calloc(1, sizeof(struct flb_elasticsearch_config)); + if (!ec) { + return -1; } - ctx->ins = ins; if (uri) { if (uri->count >= 2) { @@ -60,10 +181,11 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, flb_output_net_default("127.0.0.1", 9200, ins); /* Populate context with config map defaults and incoming properties */ - ret = flb_output_config_map_set(ins, (void *) ctx); + ret = flb_output_config_map_set(ins, (void *) ec); if (ret == -1) { flb_plg_error(ctx->ins, "configuration error"); - flb_es_conf_destroy(ctx); + flb_es_conf_destroy(ec); + flb_free(ctx); return NULL; } @@ -87,7 +209,8 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, &ins->tls); if (!upstream) { flb_plg_error(ctx->ins, "cannot create Upstream context"); - flb_es_conf_destroy(ctx); + flb_es_conf_destroy(ec); + flb_free(ctx); return NULL; } ctx->u = upstream; @@ -95,18 +218,9 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, /* Set instance flags into upstream */ flb_output_upstream_set(ctx->u, ins); - /* Set manual Index and Type */ - if (f_index) { - ctx->index = flb_strdup(f_index->value); /* FIXME */ - } - - if (f_type) { - ctx->type = flb_strdup(f_type->value); /* FIXME */ - } - /* HTTP Payload (response) maximum buffer size (0 == unlimited) */ - if (ctx->buffer_size == -1) { - ctx->buffer_size = 0; + if (ec->buffer_size == -1) { + ec->buffer_size = 0; } /* Elasticsearch: Path */ @@ -118,19 +232,19 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, /* Elasticsearch: Pipeline */ tmp = flb_output_get_property("pipeline", ins); if (tmp) { - snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk/?pipeline=%s", path, tmp); } else { - snprintf(ctx->uri, sizeof(ctx->uri) - 1, "%s/_bulk", path); + snprintf(ec->uri, sizeof(ec->uri) - 1, "%s/_bulk", path); } #ifdef FLB_HAVE_SIGNV4 /* AWS Auth */ - ctx->has_aws_auth = FLB_FALSE; + ec->has_aws_auth = FLB_FALSE; tmp = flb_output_get_property("aws_auth", ins); if (tmp) { if (strncasecmp(tmp, "On", 2) == 0) { - ctx->has_aws_auth = FLB_TRUE; + ec->has_aws_auth = FLB_TRUE; flb_plg_warn(ctx->ins, "Enabled AWS Auth. Note: Amazon ElasticSearch " "Service support in Fluent Bit is experimental."); @@ -142,24 +256,27 @@ struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, flb_es_conf_destroy(ctx); return NULL; } - ctx->aws_region = (char *) tmp; + ec->aws_region = tmp; } } #endif - return ctx; + mk_list_add(&ec->_head, &ctx->configs); + + flb_plg_debug(ctx->ins, "[out_es] host=%s port=%i uri=%s index=%s type=%s", + ins->host.name, ins->host.port, ec->uri, + ec->index, ec->type); + + return 0; } -int flb_es_conf_destroy(struct flb_elasticsearch *ctx) +int flb_es_conf_destroy(struct flb_elasticsearch_config *ec) { - if (!ctx) { + if (!ec) { return 0; } - if (ctx->u) { - flb_upstream_destroy(ctx->u); - } - flb_free(ctx); + flb_free(ec); return 0; } diff --git a/plugins/out_es/es_conf.h b/plugins/out_es/es_conf.h index 52c37c74570..3af6a3b356c 100644 --- a/plugins/out_es/es_conf.h +++ b/plugins/out_es/es_conf.h @@ -27,8 +27,15 @@ #include "es.h" -struct flb_elasticsearch *flb_es_conf_create(struct flb_output_instance *ins, - struct flb_config *config); -int flb_es_conf_destroy(struct flb_elasticsearch *ctx); + +int es_config_ha(const char *upstream_file, + struct flb_elasticsearch *ctx, + struct flb_config *config); + +int es_config_simple(struct flb_output_instance *ins, + struct flb_elasticsearch *ctx, + struct flb_config *config); + +int flb_es_conf_destroy(struct flb_elasticsearch_config *ec); #endif