Skip to content

Commit ba3382a

Browse files
committed
pipeline: outputs: es: support overriding the most plugin parameters with Upstream node configuration
For Elastic cloud authentication these parameters are always taken from plugin configuration and never from Upstream node configuration: cloud_id. For AWS authentication these parameters are always taken from plugin configuration and never from Upstream node configuration: http_proxy, no_proxy, tls*. Signed-off-by: Marat Abrarov <[email protected]>
1 parent ec59b32 commit ba3382a

File tree

6 files changed

+615
-139
lines changed

6 files changed

+615
-139
lines changed

plugins/out_es/es.c

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
#include "es.h"
3737
#include "es_conf.h"
38+
#include "es_conf_prop.h"
3839
#include "es_bulk.h"
3940
#include "murmur3.h"
4041

@@ -1030,86 +1031,86 @@ static int cb_es_exit(void *data, struct flb_config *config)
10301031
/* Configuration properties map */
10311032
static struct flb_config_map config_map[] = {
10321033
{
1033-
FLB_CONFIG_MAP_STR, "index", FLB_ES_DEFAULT_INDEX,
1034+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_INDEX, FLB_ES_DEFAULT_INDEX,
10341035
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, index),
10351036
"Set an index name"
10361037
},
10371038
{
1038-
FLB_CONFIG_MAP_STR, "type", FLB_ES_DEFAULT_TYPE,
1039+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TYPE, FLB_ES_DEFAULT_TYPE,
10391040
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, type),
10401041
"Set the document type property"
10411042
},
10421043
{
1043-
FLB_CONFIG_MAP_BOOL, "suppress_type_name", "false",
1044+
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_SUPPRESS_TYPE_NAME, "false",
10441045
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, suppress_type_name),
10451046
"If true, mapping types is removed. (for v7.0.0 or later)"
10461047
},
10471048

10481049
/* HTTP Authentication */
10491050
{
1050-
FLB_CONFIG_MAP_STR, "http_user", NULL,
1051+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_HTTP_USER, NULL,
10511052
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_user),
10521053
"Optional username credential for Elastic X-Pack access"
10531054
},
10541055
{
1055-
FLB_CONFIG_MAP_STR, "http_passwd", "",
1056+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_HTTP_PASSWD, "",
10561057
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, http_passwd),
10571058
"Password for user defined in HTTP_User"
10581059
},
10591060

10601061
/* HTTP Compression */
10611062
{
1062-
FLB_CONFIG_MAP_STR, "compress", NULL,
1063+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_COMPRESS, NULL,
10631064
0, FLB_FALSE, 0,
10641065
"Set payload compression mechanism. Option available is 'gzip'"
10651066
},
10661067

10671068
/* Cloud Authentication */
10681069
{
1069-
FLB_CONFIG_MAP_STR, "cloud_id", NULL,
1070+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_CLOUD_ID, NULL,
10701071
0, FLB_FALSE, 0,
10711072
"Elastic cloud ID of the cluster to connect to"
10721073
},
10731074
{
1074-
FLB_CONFIG_MAP_STR, "cloud_auth", NULL,
1075+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_CLOUD_AUTH, NULL,
10751076
0, FLB_FALSE, 0,
10761077
"Elastic cloud authentication credentials"
10771078
},
10781079

10791080
/* AWS Authentication */
10801081
#ifdef FLB_HAVE_AWS
10811082
{
1082-
FLB_CONFIG_MAP_BOOL, "aws_auth", "false",
1083+
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_AWS_AUTH, "false",
10831084
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, has_aws_auth),
10841085
"Enable AWS Sigv4 Authentication"
10851086
},
10861087
{
1087-
FLB_CONFIG_MAP_STR, "aws_region", NULL,
1088+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_REGION, NULL,
10881089
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_region),
10891090
"AWS Region of your Amazon OpenSearch Service cluster"
10901091
},
10911092
{
1092-
FLB_CONFIG_MAP_STR, "aws_sts_endpoint", NULL,
1093+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_STS_ENDPOINT, NULL,
10931094
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_sts_endpoint),
10941095
"Custom endpoint for the AWS STS API, used with the AWS_Role_ARN option"
10951096
},
10961097
{
1097-
FLB_CONFIG_MAP_STR, "aws_role_arn", NULL,
1098+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_ROLE_ARN, NULL,
10981099
0, FLB_FALSE, 0,
10991100
"AWS IAM Role to assume to put records to your Amazon OpenSearch cluster"
11001101
},
11011102
{
1102-
FLB_CONFIG_MAP_STR, "aws_external_id", NULL,
1103+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_EXTERNAL_ID, NULL,
11031104
0, FLB_FALSE, 0,
11041105
"External ID for the AWS IAM Role specified with `aws_role_arn`"
11051106
},
11061107
{
1107-
FLB_CONFIG_MAP_STR, "aws_service_name", "es",
1108+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_SERVICE_NAME, "es",
11081109
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_service_name),
11091110
"AWS Service Name"
11101111
},
11111112
{
1112-
FLB_CONFIG_MAP_STR, "aws_profile", NULL,
1113+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_AWS_PROFILE, NULL,
11131114
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, aws_profile),
11141115
"AWS Profile name. AWS Profiles can be configured with AWS CLI and are usually stored in "
11151116
"$HOME/.aws/ directory."
@@ -1118,68 +1119,68 @@ static struct flb_config_map config_map[] = {
11181119

11191120
/* Logstash compatibility */
11201121
{
1121-
FLB_CONFIG_MAP_BOOL, "logstash_format", "false",
1122+
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_LOGSTASH_FORMAT, "false",
11221123
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_format),
11231124
"Enable Logstash format compatibility"
11241125
},
11251126
{
1126-
FLB_CONFIG_MAP_STR, "logstash_prefix", FLB_ES_DEFAULT_PREFIX,
1127+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX, FLB_ES_DEFAULT_PREFIX,
11271128
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix),
11281129
"When Logstash_Format is enabled, the Index name is composed using a prefix "
11291130
"and the date, e.g: If Logstash_Prefix is equals to 'mydata' your index will "
11301131
"become 'mydata-YYYY.MM.DD'. The last string appended belongs to the date "
11311132
"when the data is being generated"
11321133
},
11331134
{
1134-
FLB_CONFIG_MAP_STR, "logstash_prefix_separator", "-",
1135+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_SEPARATOR, "-",
11351136
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_separator),
11361137
"Set a separator between logstash_prefix and date."
11371138
},
11381139
{
1139-
FLB_CONFIG_MAP_STR, "logstash_prefix_key", NULL,
1140+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_PREFIX_KEY, NULL,
11401141
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_prefix_key),
11411142
"When included: the value in the record that belongs to the key will be looked "
11421143
"up and over-write the Logstash_Prefix for index generation. If the key/value "
11431144
"is not found in the record then the Logstash_Prefix option will act as a "
11441145
"fallback. Nested keys are supported through record accessor pattern"
11451146
},
11461147
{
1147-
FLB_CONFIG_MAP_STR, "logstash_dateformat", FLB_ES_DEFAULT_TIME_FMT,
1148+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_LOGSTASH_DATEFORMAT, FLB_ES_DEFAULT_TIME_FMT,
11481149
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, logstash_dateformat),
11491150
"Time format (based on strftime) to generate the second part of the Index name"
11501151
},
11511152

11521153
/* Custom Time and Tag keys */
11531154
{
1154-
FLB_CONFIG_MAP_STR, "time_key", FLB_ES_DEFAULT_TIME_KEY,
1155+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TIME_KEY, FLB_ES_DEFAULT_TIME_KEY,
11551156
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key),
11561157
"When Logstash_Format is enabled, each record will get a new timestamp field. "
11571158
"The Time_Key property defines the name of that field"
11581159
},
11591160
{
1160-
FLB_CONFIG_MAP_STR, "time_key_format", FLB_ES_DEFAULT_TIME_KEYF,
1161+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TIME_KEY_FORMAT, FLB_ES_DEFAULT_TIME_KEYF,
11611162
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_format),
11621163
"When Logstash_Format is enabled, this property defines the format of the "
11631164
"timestamp"
11641165
},
11651166
{
1166-
FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false",
1167+
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TIME_KEY_NANOS, "false",
11671168
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, time_key_nanos),
11681169
"When Logstash_Format is enabled, enabling this property sends nanosecond "
11691170
"precision timestamps"
11701171
},
11711172
{
1172-
FLB_CONFIG_MAP_BOOL, "include_tag_key", "false",
1173+
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_INCLUDE_TAG_KEY, "false",
11731174
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, include_tag_key),
11741175
"When enabled, it append the Tag name to the record"
11751176
},
11761177
{
1177-
FLB_CONFIG_MAP_STR, "tag_key", FLB_ES_DEFAULT_TAG_KEY,
1178+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_TAG_KEY, FLB_ES_DEFAULT_TAG_KEY,
11781179
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, tag_key),
11791180
"When Include_Tag_Key is enabled, this property defines the key name for the tag"
11801181
},
11811182
{
1182-
FLB_CONFIG_MAP_SIZE, "buffer_size", FLB_ES_DEFAULT_HTTP_MAX,
1183+
FLB_CONFIG_MAP_SIZE, FLB_ES_CONFIG_PROPERTY_BUFFER_SIZE, FLB_ES_DEFAULT_HTTP_MAX,
11831184
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, buffer_size),
11841185
"Specify the buffer size used to read the response from the Elasticsearch HTTP "
11851186
"service. This option is useful for debugging purposes where is required to read "
@@ -1190,64 +1191,64 @@ static struct flb_config_map config_map[] = {
11901191

11911192
/* Elasticsearch specifics */
11921193
{
1193-
FLB_CONFIG_MAP_STR, "path", NULL,
1194+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_PATH, NULL,
11941195
0, FLB_FALSE, 0,
11951196
"Elasticsearch accepts new data on HTTP query path '/_bulk'. But it is also "
11961197
"possible to serve Elasticsearch behind a reverse proxy on a subpath. This "
11971198
"option defines such path on the fluent-bit side. It simply adds a path "
11981199
"prefix in the indexing HTTP POST URI"
11991200
},
12001201
{
1201-
FLB_CONFIG_MAP_STR, "pipeline", NULL,
1202+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_PIPELINE, NULL,
12021203
0, FLB_FALSE, 0,
12031204
"Newer versions of Elasticsearch allows to setup filters called pipelines. "
12041205
"This option allows to define which pipeline the database should use. For "
12051206
"performance reasons is strongly suggested to do parsing and filtering on "
12061207
"Fluent Bit side, avoid pipelines"
12071208
},
12081209
{
1209-
FLB_CONFIG_MAP_BOOL, "generate_id", "false",
1210+
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_GENERATE_ID, "false",
12101211
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, generate_id),
12111212
"When enabled, generate _id for outgoing records. This prevents duplicate "
12121213
"records when retrying ES"
12131214
},
12141215
{
1215-
FLB_CONFIG_MAP_STR, "write_operation", "create",
1216+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_WRITE_OPERATION, "create",
12161217
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, write_operation),
12171218
"Operation to use to write in bulk requests"
12181219
},
12191220
{
1220-
FLB_CONFIG_MAP_STR, "id_key", NULL,
1221+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_ID_KEY, NULL,
12211222
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, id_key),
12221223
"If set, _id will be the value of the key from incoming record."
12231224
},
12241225
{
1225-
FLB_CONFIG_MAP_BOOL, "replace_dots", "false",
1226+
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_REPLACE_DOTS, "false",
12261227
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, replace_dots),
12271228
"When enabled, replace field name dots with underscore, required by Elasticsearch "
12281229
"2.0-2.3."
12291230
},
12301231

12311232
{
1232-
FLB_CONFIG_MAP_BOOL, "current_time_index", "false",
1233+
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_CURRENT_TIME_INDEX, "false",
12331234
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, current_time_index),
12341235
"Use current time for index generation instead of message record"
12351236
},
12361237

12371238
/* Trace */
12381239
{
1239-
FLB_CONFIG_MAP_BOOL, "trace_output", "false",
1240+
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TRACE_OUTPUT, "false",
12401241
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_output),
12411242
"When enabled print the Elasticsearch API calls to stdout (for diag only)"
12421243
},
12431244
{
1244-
FLB_CONFIG_MAP_BOOL, "trace_error", "false",
1245+
FLB_CONFIG_MAP_BOOL, FLB_ES_CONFIG_PROPERTY_TRACE_ERROR, "false",
12451246
0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, trace_error),
12461247
"When enabled print the Elasticsearch exception to stderr (for diag only)"
12471248
},
12481249

12491250
{
1250-
FLB_CONFIG_MAP_STR, "upstream", NULL,
1251+
FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_UPSTREAM, NULL,
12511252
0, FLB_FALSE, 0,
12521253
"Path to 'upstream' configuration file (define multiple nodes)"
12531254
},

plugins/out_es/es.h

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,26 +34,24 @@
3434
#define FLB_ES_DEFAULT_TAG_KEY "flb-key"
3535
#define FLB_ES_DEFAULT_HTTP_MAX "512k"
3636
#define FLB_ES_DEFAULT_HTTPS_PORT 443
37-
#define FLB_ES_WRITE_OP_INDEX "index"
38-
#define FLB_ES_WRITE_OP_CREATE "create"
39-
#define FLB_ES_WRITE_OP_UPDATE "update"
40-
#define FLB_ES_WRITE_OP_UPSERT "upsert"
4137

4238
struct flb_elasticsearch_config {
4339
/* Elasticsearch index (database) and type (table) */
4440
char *index;
4541
int own_index;
4642
char *type;
4743
int own_type;
48-
char suppress_type_name;
44+
int suppress_type_name;
4945

5046
/* HTTP Auth */
5147
char *http_user;
5248
char *http_passwd;
5349

5450
/* Elastic Cloud Auth */
5551
char *cloud_user;
52+
int own_cloud_user;
5653
char *cloud_passwd;
54+
int own_cloud_passwd;
5755

5856
/* AWS Auth */
5957
#ifdef FLB_HAVE_AWS
@@ -62,12 +60,17 @@ struct flb_elasticsearch_config {
6260
char *aws_sts_endpoint;
6361
char *aws_profile;
6462
struct flb_aws_provider *aws_provider;
63+
int own_aws_provider;
6564
struct flb_aws_provider *base_aws_provider;
65+
int own_base_aws_provider;
6666
/* tls instances can't be re-used; aws provider requires a separate one */
6767
struct flb_tls *aws_tls;
68+
int own_aws_tls;
6869
struct flb_tls *aws_sts_tls;
70+
int own_aws_sts_tls;
6971
char *aws_service_name;
7072
struct mk_list *aws_unsigned_headers;
73+
int own_aws_unsigned_headers;
7174
#endif
7275

7376
/* HTTP Client Setup */
@@ -114,11 +117,12 @@ struct flb_elasticsearch_config {
114117
/* write operation */
115118
flb_sds_t write_operation;
116119
/* write operation elasticsearch operation */
117-
flb_sds_t es_action;
120+
const char *es_action;
118121

119122
/* id_key */
120123
flb_sds_t id_key;
121124
struct flb_record_accessor *ra_id_key;
125+
int own_ra_id_key;
122126

123127
/* include_tag_key */
124128
int include_tag_key;
@@ -128,6 +132,7 @@ struct flb_elasticsearch_config {
128132
char uri[256];
129133

130134
struct flb_record_accessor *ra_prefix_key;
135+
int own_ra_prefix_key;
131136

132137
/* Compression mode (gzip) */
133138
int compress_gzip;

0 commit comments

Comments
 (0)