@@ -66,8 +66,9 @@ static int cb_doris_init(struct flb_output_instance *ins,
66
66
}
67
67
68
68
static int http_put (struct flb_out_doris * ctx ,
69
- const void * body , size_t body_len ,
70
- const char * tag , int tag_len )
69
+ const char * host , int port ,
70
+ const void * body , size_t body_len ,
71
+ const char * tag , int tag_len )
71
72
{
72
73
int ret ;
73
74
int out_ret = FLB_OK ;
@@ -79,7 +80,16 @@ static int http_put(struct flb_out_doris *ctx,
79
80
struct flb_http_client * c ;
80
81
81
82
/* Get upstream context and connection */
82
- u = ctx -> u ;
83
+ if (strcmp (host , ctx -> host ) == 0 && port == ctx -> port ) {
84
+ u = ctx -> u ;
85
+ }
86
+ else {
87
+ u = flb_upstream_create (ctx -> u -> base .config ,
88
+ host ,
89
+ port ,
90
+ ctx -> u -> base .flags ,
91
+ ctx -> u -> base .tls_context );
92
+ }
83
93
u_conn = flb_upstream_conn_get (u );
84
94
if (!u_conn ) {
85
95
flb_plg_error (ctx -> ins , "no upstream connections available to %s:%i" ,
@@ -94,7 +104,7 @@ static int http_put(struct flb_out_doris *ctx,
94
104
/* Create HTTP client context */
95
105
c = flb_http_client (u_conn , FLB_HTTP_PUT , ctx -> uri ,
96
106
payload_buf , payload_size ,
97
- ctx -> host , ctx -> port ,
107
+ host , port ,
98
108
NULL , 0 );
99
109
100
110
/*
@@ -120,13 +130,32 @@ static int http_put(struct flb_out_doris *ctx,
120
130
121
131
ret = flb_http_do (c , & b_sent );
122
132
if (ret == 0 ) {
123
- flb_plg_info (ctx -> ins , "%s:%i, HTTP status=%i\n%s\n" ,
124
- ctx -> host , ctx -> port ,
125
- c -> resp .status , c -> resp .payload );
126
- if (c -> resp .payload_size > 0 &&
127
- (strstr (c -> resp .payload , "\"Status\": \"Success\"" ) != NULL ||
128
- strstr (c -> resp .payload , "\"Status\": \"Publish Timeout\"" ) != NULL )) {
133
+ flb_plg_debug (ctx -> ins , "%s:%i, HTTP status=%i\n%s\n" ,
134
+ host , port ,
135
+ c -> resp .status , c -> resp .payload );
136
+ if (c -> resp .status == 307 ) { // redict
137
+ // example: Location: http://admin:[email protected] :8040/api/d_fb/t_fb/_stream_load?
138
+ char * location = strstr (c -> resp .data , "Location:" );
139
+ char * start = strstr (location , "@" ) + 1 ;
140
+ char * mid = strstr (start , ":" );
141
+ char * end = strstr (mid , "/api" );
142
+ char redict_host [50 ] = {0 };
143
+ memcpy (redict_host , start , mid - start );
144
+ char redict_port [10 ] = {0 };
145
+ memcpy (redict_port , mid + 1 , end - (mid + 1 ));
146
+
147
+ out_ret = http_put (ctx , redict_host , atoi (redict_port ),
148
+ body , body_len , tag , tag_len );
149
+ }
150
+ else if (c -> resp .status == 200 ) {
151
+ if (c -> resp .payload_size > 0 &&
152
+ (strstr (c -> resp .payload , "\"Status\": \"Success\"" ) != NULL ||
153
+ strstr (c -> resp .payload , "\"Status\": \"Publish Timeout\"" ) != NULL )) {
129
154
// continue
155
+ }
156
+ else {
157
+ out_ret = FLB_RETRY ;
158
+ }
130
159
}
131
160
else {
132
161
out_ret = FLB_RETRY ;
@@ -154,6 +183,11 @@ static int http_put(struct flb_out_doris *ctx,
154
183
/* Release the TCP connection */
155
184
flb_upstream_conn_release (u_conn );
156
185
186
+ /* Release flb_upstream */
187
+ if (u != ctx -> u ) {
188
+ flb_upstream_destroy (u );
189
+ }
190
+
157
191
return out_ret ;
158
192
}
159
193
@@ -169,7 +203,7 @@ static int compose_payload(struct flb_out_doris *ctx,
169
203
encoded = flb_pack_msgpack_to_json_format (in_body ,
170
204
in_size ,
171
205
FLB_PACK_JSON_FORMAT_JSON ,
172
- FLB_PACK_JSON_DATE_DOUBLE ,
206
+ FLB_PACK_JSON_DATE_EPOCH ,
173
207
ctx -> time_key );
174
208
if (encoded == NULL ) {
175
209
flb_plg_error (ctx -> ins , "failed to convert json" );
@@ -178,7 +212,7 @@ static int compose_payload(struct flb_out_doris *ctx,
178
212
* out_body = (void * )encoded ;
179
213
* out_size = flb_sds_len (encoded );
180
214
181
- flb_plg_info (ctx -> ins , "%s" , (char * ) * out_body );
215
+ flb_plg_debug (ctx -> ins , "http body: %s" , (char * ) * out_body );
182
216
183
217
return FLB_OK ;
184
218
}
@@ -202,8 +236,8 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk,
202
236
FLB_OUTPUT_RETURN (ret );
203
237
}
204
238
205
- ret = http_put (ctx , out_body , out_size ,
206
- event_chunk -> tag , flb_sds_len (event_chunk -> tag ));
239
+ ret = http_put (ctx , ctx -> host , ctx -> port , out_body , out_size ,
240
+ event_chunk -> tag , flb_sds_len (event_chunk -> tag ));
207
241
flb_sds_destroy (out_body );
208
242
209
243
FLB_OUTPUT_RETURN (ret );
@@ -219,8 +253,6 @@ static int cb_doris_exit(void *data, struct flb_config *config)
219
253
220
254
/* Configuration properties map */
221
255
static struct flb_config_map config_map [] = {
222
- // host
223
- // port
224
256
// user
225
257
{
226
258
FLB_CONFIG_MAP_STR , "user" , NULL ,
0 commit comments