Skip to content

Commit e24c442

Browse files
committed
feat: add more doris config
Signed-off-by: composer <[email protected]>
1 parent daa4540 commit e24c442

File tree

3 files changed

+202
-26
lines changed

3 files changed

+202
-26
lines changed

plugins/out_doris/doris.c

Lines changed: 106 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,27 @@ static int http_put(struct flb_out_doris *ctx,
7878
struct flb_upstream *u;
7979
struct flb_connection *u_conn;
8080
struct flb_http_client *c;
81+
struct mk_list *head;
82+
struct flb_config_map_val *mv;
83+
struct flb_slist_entry *key = NULL;
84+
struct flb_slist_entry *val = NULL;
85+
86+
int i;
87+
int root_type;
88+
char *out_buf;
89+
size_t off = 0;
90+
size_t out_size;
91+
msgpack_unpacked result;
92+
msgpack_object root;
93+
msgpack_object msg_key;
94+
msgpack_object msg_val;
8195

8296
/* Get upstream context and connection */
8397
if (strcmp(host, ctx->host) == 0 && port == ctx->port) {
8498
u = ctx->u;
8599
}
86100
else {
101+
// TODO cache
87102
u = flb_upstream_create(ctx->u->base.config,
88103
host,
89104
port,
@@ -117,22 +132,32 @@ static int http_put(struct flb_out_doris *ctx,
117132
flb_http_add_header(c, "format", 6, "json", 4);
118133
flb_http_add_header(c, "Expect", 6, "100-continue", 12);
119134
flb_http_add_header(c, "strip_outer_array", 17, "true", 4);
120-
flb_http_add_header(c, "columns", 7, ctx->columns, strlen(ctx->columns));
121135
flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
122-
if (ctx->timeout_second > 0) {
123-
char timeout[256];
124-
snprintf(timeout, sizeof(timeout) - 1, "%d", ctx->timeout_second);
125-
flb_http_add_header(c, "timeout", 7, timeout, strlen(timeout));
136+
137+
flb_config_map_foreach(head, mv, ctx->headers) {
138+
key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
139+
val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head);
140+
141+
flb_http_add_header(c,
142+
key->str, flb_sds_len(key->str),
143+
val->str, flb_sds_len(val->str));
126144
}
127145

128146
/* Basic Auth headers */
129147
flb_http_basic_auth(c, ctx->user, ctx->password);
130148

131149
ret = flb_http_do(c, &b_sent);
132150
if (ret == 0) {
133-
flb_plg_debug(ctx->ins, "%s:%i, HTTP status=%i\n%s\n",
134-
host, port,
135-
c->resp.status, c->resp.payload);
151+
if (ctx->log_request) {
152+
flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s\n",
153+
host, port,
154+
c->resp.status, c->resp.payload);
155+
} else {
156+
flb_plg_debug(ctx->ins, "%s:%i, HTTP status=%i\n%s\n",
157+
host, port,
158+
c->resp.status, c->resp.payload);
159+
}
160+
136161
if (c->resp.status == 307) { // redict
137162
// example: Location: http://admin:[email protected]:8040/api/d_fb/t_fb/_stream_load?
138163
char* location = strstr(c->resp.data, "Location:");
@@ -147,15 +172,53 @@ static int http_put(struct flb_out_doris *ctx,
147172
out_ret = http_put(ctx, redict_host, atoi(redict_port),
148173
body, body_len, tag, tag_len);
149174
}
150-
else if (c->resp.status == 200) {
151-
if (c->resp.payload_size > 0 &&
152-
(strstr(c->resp.payload, "\"Status\": \"Success\"") != NULL ||
153-
strstr(c->resp.payload, "\"Status\": \"Publish Timeout\"") != NULL)) {
154-
// continue
175+
else if (c->resp.status == 200 && c->resp.payload_size > 0) {
176+
ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
177+
&out_buf, &out_size, &root_type, NULL);
178+
179+
if (ret == -1) {
180+
out_ret = FLB_RETRY;
181+
}
182+
183+
msgpack_unpacked_init(&result);
184+
ret = msgpack_unpack_next(&result, out_buf, out_size, &off);
185+
if (ret != MSGPACK_UNPACK_SUCCESS) {
186+
out_ret = FLB_RETRY;
155187
}
156-
else {
188+
189+
root = result.data;
190+
if (root.type != MSGPACK_OBJECT_MAP) {
157191
out_ret = FLB_RETRY;
158192
}
193+
194+
for (i = 0; i < root.via.map.size; i++) {
195+
msg_key = root.via.map.ptr[i].key;
196+
if (msg_key.type != MSGPACK_OBJECT_STR) {
197+
out_ret = FLB_RETRY;
198+
break;
199+
}
200+
201+
if (msg_key.via.str.size == 6 && strncmp(msg_key.via.str.ptr, "Status", 6) == 0) {
202+
msg_val = root.via.map.ptr[i].val;
203+
if (msg_val.type != MSGPACK_OBJECT_STR) {
204+
out_ret = FLB_RETRY;
205+
break;
206+
}
207+
208+
if (msg_val.via.str.size == 7 && strncmp(msg_val.via.str.ptr, "Success", 7) == 0) {
209+
out_ret = FLB_OK;
210+
break;
211+
}
212+
213+
if (msg_val.via.str.size == 15 && strncmp(msg_val.via.str.ptr, "Publish Timeout", 15) == 0) {
214+
out_ret = FLB_OK;
215+
break;
216+
}
217+
218+
out_ret = FLB_RETRY;
219+
break;
220+
}
221+
}
159222
}
160223
else {
161224
out_ret = FLB_RETRY;
@@ -204,15 +267,19 @@ static int compose_payload(struct flb_out_doris *ctx,
204267
in_size,
205268
FLB_PACK_JSON_FORMAT_JSON,
206269
FLB_PACK_JSON_DATE_EPOCH,
207-
ctx->time_key);
270+
ctx->date_key);
208271
if (encoded == NULL) {
209272
flb_plg_error(ctx->ins, "failed to convert json");
210273
return FLB_ERROR;
211274
}
212275
*out_body = (void*)encoded;
213276
*out_size = flb_sds_len(encoded);
214277

215-
flb_plg_debug(ctx->ins, "http body: %s", (char*) *out_body);
278+
if (ctx->log_request) {
279+
flb_plg_info(ctx->ins, "http body: %s", (char*) *out_body);
280+
} else {
281+
flb_plg_debug(ctx->ins, "http body: %s", (char*) *out_body);
282+
}
216283

217284
return FLB_OK;
218285
}
@@ -233,13 +300,22 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk,
233300
&out_body, &out_size);
234301

235302
if (ret != FLB_OK) {
303+
if (ret == FLB_ERROR) {
304+
__sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events);
305+
}
236306
FLB_OUTPUT_RETURN(ret);
237307
}
238308

239309
ret = http_put(ctx, ctx->host, ctx->port, out_body, out_size,
240310
event_chunk->tag, flb_sds_len(event_chunk->tag));
241311
flb_sds_destroy(out_body);
242312

313+
if (ret == FLB_OK) {
314+
__sync_fetch_and_add(&ctx->reporter->total_bytes, out_size);
315+
__sync_fetch_and_add(&ctx->reporter->total_rows, event_chunk->total_events);
316+
} else if (ret == FLB_ERROR) {
317+
__sync_fetch_and_add(&ctx->reporter->failed_rows, event_chunk->total_events);
318+
}
243319
FLB_OUTPUT_RETURN(ret);
244320
}
245321

@@ -283,17 +359,23 @@ static struct flb_config_map config_map[] = {
283359
0, FLB_TRUE, offsetof(struct flb_out_doris, time_key),
284360
"Specify the name of the date field in output"
285361
},
286-
// columns
362+
// header
363+
{
364+
FLB_CONFIG_MAP_SLIST_1, "header", NULL,
365+
FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_out_doris, headers),
366+
"Add a doris stream load header key/value pair. Multiple headers can be set"
367+
},
368+
// log_request
287369
{
288-
FLB_CONFIG_MAP_STR, "columns", "date,log",
289-
0, FLB_TRUE, offsetof(struct flb_out_doris, columns),
290-
"Set columns"
370+
FLB_CONFIG_MAP_BOOL, "log_request", "true",
371+
0, FLB_TRUE, offsetof(struct flb_out_doris, log_request),
372+
"Specify if the doris stream load request and response should be logged or not"
291373
},
292-
// timeout
374+
// log_progress_interval
293375
{
294-
FLB_CONFIG_MAP_INT, "timeout_second", "60",
295-
0, FLB_TRUE, offsetof(struct flb_out_doris, timeout_second),
296-
"Set timeout in second"
376+
FLB_CONFIG_MAP_INT, "log_progress_interval", "10",
377+
0, FLB_TRUE, offsetof(struct flb_out_doris, log_progress_interval),
378+
"Specify the interval in seconds to log the progress of the doris stream load"
297379
},
298380

299381
/* EOF */

plugins/out_doris/doris.h

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@
2020
#ifndef FLB_OUT_DORIS_H
2121
#define FLB_OUT_DORIS_H
2222

23+
#include <fluent-bit/flb_pthread.h>
24+
25+
struct flb_doris_progress_reporter {
26+
size_t total_bytes;
27+
size_t total_rows;
28+
size_t failed_rows;
29+
};
30+
2331
struct flb_out_doris {
2432
char *host;
2533
int port;
@@ -32,9 +40,16 @@ struct flb_out_doris {
3240
flb_sds_t table;
3341

3442
flb_sds_t time_key;
35-
flb_sds_t columns;
43+
flb_sds_t date_key; /* internal use */
44+
45+
/* doris stream load headers */
46+
struct mk_list *headers;
47+
48+
int log_request;
49+
int log_progress_interval;
3650

37-
int timeout_second;
51+
struct flb_doris_progress_reporter *reporter;
52+
pthread_t reporter_thread;
3853

3954
/* Upstream connection to the backend server */
4055
struct flb_upstream *u;

plugins/out_doris/doris_conf.c

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,60 @@
2323
#include <fluent-bit/flb_sds.h>
2424
#include <fluent-bit/flb_kv.h>
2525
#include <fluent-bit/flb_record_accessor.h>
26+
#include <fluent-bit/flb_pthread.h>
2627
#include "doris.h"
2728
#include "doris_conf.h"
2829

30+
void *report(void *c) {
31+
struct flb_out_doris *ctx = (struct flb_out_doris *) c;
32+
33+
size_t init_time = cfl_time_now() / 1000000000L;
34+
size_t last_time = init_time;
35+
size_t last_bytes = ctx->reporter->total_bytes;
36+
size_t last_rows = ctx->reporter->total_rows;
37+
38+
size_t cur_time, cur_bytes, cur_rows, total_time, total_speed_mbps, total_speed_rps;
39+
size_t inc_bytes, inc_rows, inc_time, inc_speed_mbps, inc_speed_rps;
40+
41+
flb_plg_info(ctx->ins, "Start progress reporter with interval %d", ctx->log_progress_interval);
42+
43+
while (ctx->log_progress_interval > 0) {
44+
sleep(ctx->log_progress_interval);
45+
46+
cur_time = cfl_time_now() / 1000000000L;
47+
cur_bytes = ctx->reporter->total_bytes;
48+
cur_rows = ctx->reporter->total_rows;
49+
total_time = cur_time - init_time;
50+
total_speed_mbps = cur_bytes / 1024 / 1024 / total_time;
51+
total_speed_rps = cur_rows / total_time;
52+
53+
inc_bytes = cur_bytes - last_bytes;
54+
inc_rows = cur_rows - last_rows;
55+
inc_time = cur_time - last_time;
56+
inc_speed_mbps = inc_bytes / 1024 / 1024 / inc_time;
57+
inc_speed_rps = inc_rows / inc_time;
58+
59+
flb_plg_info(ctx->ins, "total %zu MB %zu ROWS, total speed %zu MB/s %zu R/s, last %zu seconds speed %zu MB/s %zu R/s",
60+
cur_bytes/1024/1024, cur_rows, total_speed_mbps, total_speed_rps,
61+
inc_time, inc_speed_mbps, inc_speed_rps);
62+
63+
last_time = cur_time;
64+
last_bytes = cur_bytes;
65+
last_rows = cur_rows;
66+
}
67+
68+
return NULL;
69+
}
70+
2971
struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins,
3072
struct flb_config *config)
3173
{
3274
int ret;
3375
int io_flags = 0;
76+
const char *tmp;
3477
struct flb_upstream *upstream;
3578
struct flb_out_doris *ctx = NULL;
79+
struct flb_doris_progress_reporter *reporter = NULL;
3680

3781
/* Allocate plugin context */
3882
ctx = flb_calloc(1, sizeof(struct flb_out_doris));
@@ -92,13 +136,43 @@ struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins,
92136
/* url: /api/{database}/{table}/_stream_load */
93137
snprintf(ctx->uri, sizeof(ctx->uri) - 1, "/api/%s/%s/_stream_load", ctx->database, ctx->table);
94138

139+
/* Date key */
140+
ctx->date_key = ctx->time_key;
141+
tmp = flb_output_get_property("time_key", ins);
142+
if (tmp) {
143+
/* Just check if we have to disable it */
144+
if (flb_utils_bool(tmp) == FLB_FALSE) {
145+
ctx->date_key = NULL;
146+
}
147+
}
148+
95149
ctx->u = upstream;
96150
ctx->host = ins->host.name;
97151
ctx->port = ins->host.port;
98152

99153
/* Set instance flags into upstream */
100154
flb_output_upstream_set(ctx->u, ins);
101155

156+
/* create and start the progress reporter */
157+
if (ctx->log_progress_interval > 0) {
158+
reporter = flb_calloc(1, sizeof(struct flb_doris_progress_reporter));
159+
if (!reporter) {
160+
flb_plg_error(ins, "failed to create progress reporter");
161+
flb_doris_conf_destroy(ctx);
162+
return NULL;
163+
}
164+
reporter->total_bytes = 0;
165+
reporter->total_rows = 0;
166+
reporter->failed_rows = 0;
167+
ctx->reporter = reporter;
168+
169+
if(pthread_create(&ctx->reporter_thread, NULL, report, (void *) ctx)) {
170+
flb_plg_error(ins, "failed to create progress reporter");
171+
flb_doris_conf_destroy(ctx);
172+
return NULL;
173+
}
174+
}
175+
102176
return ctx;
103177
}
104178

@@ -112,5 +186,10 @@ void flb_doris_conf_destroy(struct flb_out_doris *ctx)
112186
flb_upstream_destroy(ctx->u);
113187
}
114188

189+
if (ctx->reporter) {
190+
pthread_cancel(ctx->reporter_thread);
191+
flb_free(ctx->reporter);
192+
}
193+
115194
flb_free(ctx);
116195
}

0 commit comments

Comments
 (0)