Skip to content

Commit 7b44ce6

Browse files
committed
add label config
Signed-off-by: composer <[email protected]>
1 parent e24c442 commit 7b44ce6

File tree

3 files changed

+37
-3
lines changed

3 files changed

+37
-3
lines changed

plugins/out_doris/doris.c

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ static int cb_doris_init(struct flb_output_instance *ins,
6868
static int http_put(struct flb_out_doris *ctx,
6969
const char *host, int port,
7070
const void *body, size_t body_len,
71-
const char *tag, int tag_len)
71+
const char *tag, int tag_len,
72+
const char *label, int label_len)
7273
{
7374
int ret;
7475
int out_ret = FLB_OK;
@@ -134,6 +135,11 @@ static int http_put(struct flb_out_doris *ctx,
134135
flb_http_add_header(c, "strip_outer_array", 17, "true", 4);
135136
flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
136137

138+
if (ctx->add_label) {
139+
flb_http_add_header(c, "label", 5, label, label_len);
140+
flb_plg_debug(ctx->ins, "add label: %s", label);
141+
}
142+
137143
flb_config_map_foreach(head, mv, ctx->headers) {
138144
key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
139145
val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
@@ -170,7 +176,7 @@ static int http_put(struct flb_out_doris *ctx,
170176
memcpy(redict_port, mid + 1, end - (mid + 1));
171177

172178
out_ret = http_put(ctx, redict_host, atoi(redict_port),
173-
body, body_len, tag, tag_len);
179+
body, body_len, tag, tag_len, label, label_len);
174180
}
175181
else if (c->resp.status == 200 && c->resp.payload_size > 0) {
176182
ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
@@ -296,6 +302,9 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk,
296302
size_t out_size;
297303
(void) i_ins;
298304

305+
char label[256] = {0};
306+
int len = 0;
307+
299308
ret = compose_payload(ctx, event_chunk->data, event_chunk->size,
300309
&out_body, &out_size);
301310

@@ -306,8 +315,14 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk,
306315
FLB_OUTPUT_RETURN(ret);
307316
}
308317

318+
if (ctx->add_label) {
319+
len = snprintf(label, sizeof(label) - 1, "%s_%lu_", ctx->label_prefix, cfl_time_now() / 1000000000L);
320+
flb_utils_uuid_v4_gen(label + len);
321+
len += 36;
322+
}
323+
309324
ret = http_put(ctx, ctx->host, ctx->port, out_body, out_size,
310-
event_chunk->tag, flb_sds_len(event_chunk->tag));
325+
event_chunk->tag, flb_sds_len(event_chunk->tag), label, len);
311326
flb_sds_destroy(out_body);
312327

313328
if (ret == FLB_OK) {
@@ -353,6 +368,12 @@ static struct flb_config_map config_map[] = {
353368
0, FLB_TRUE, offsetof(struct flb_out_doris, table),
354369
"Set table"
355370
},
371+
// label_prefix
372+
{
373+
FLB_CONFIG_MAP_STR, "label_prefix", "flubentbit",
374+
0, FLB_TRUE, offsetof(struct flb_out_doris, label_prefix),
375+
"Set label prefix"
376+
},
356377
// time_key
357378
{
358379
FLB_CONFIG_MAP_STR, "time_key", "date",

plugins/out_doris/doris.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ struct flb_out_doris {
3939
flb_sds_t database;
4040
flb_sds_t table;
4141

42+
flb_sds_t label_prefix;
43+
int add_label;
44+
4245
flb_sds_t time_key;
4346
flb_sds_t date_key; /* internal use */
4447

plugins/out_doris/doris_conf.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,16 @@ struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins,
136136
/* url: /api/{database}/{table}/_stream_load */
137137
snprintf(ctx->uri, sizeof(ctx->uri) - 1, "/api/%s/%s/_stream_load", ctx->database, ctx->table);
138138

139+
/* label prefix */
140+
ctx->add_label = 1;
141+
tmp = flb_output_get_property("label_prefix", ins);
142+
if (tmp) {
143+
/* Just check if we have to disable it */
144+
if (flb_utils_bool(tmp) == FLB_FALSE) {
145+
ctx->add_label = 0;
146+
}
147+
}
148+
139149
/* Date key */
140150
ctx->date_key = ctx->time_key;
141151
tmp = flb_output_get_property("time_key", ins);

0 commit comments

Comments
 (0)