diff --git a/include/fluent-bit/flb_api.h b/include/fluent-bit/flb_api.h index 0559a5b2e58..4b87adde413 100644 --- a/include/fluent-bit/flb_api.h +++ b/include/fluent-bit/flb_api.h @@ -22,6 +22,7 @@ #include #include +#include struct flb_api { const char *(*output_get_property) (const char *, struct flb_output_instance *); @@ -33,6 +34,11 @@ struct flb_api { void (*log_print) (int, const char*, int, const char*, ...); int (*input_log_check) (struct flb_input_instance *, int); int (*output_log_check) (struct flb_output_instance *, int); + + /* To preserve ABI, we need to add these APIs after the + * input/output definitions. */ + const char *(*custom_get_property) (const char *, struct flb_custom_instance *); + int (*custom_log_check) (struct flb_custom_instance *, int); }; #ifdef FLB_CORE diff --git a/include/fluent-bit/flb_custom.h b/include/fluent-bit/flb_custom.h index 4c296f8ff04..c589e90f411 100644 --- a/include/fluent-bit/flb_custom.h +++ b/include/fluent-bit/flb_custom.h @@ -31,9 +31,20 @@ #define FLB_CUSTOM_NET_CLIENT 1 /* custom may use upstream net.* properties */ #define FLB_CUSTOM_NET_SERVER 2 /* custom may use downstream net.* properties */ +/* Custom plugin types */ +#define FLB_CUSTOM_PLUGIN_CORE 0 +#define FLB_CUSTOM_PLUGIN_PROXY 1 + struct flb_custom_instance; struct flb_custom_plugin { + /* + * The type defines if this is a core-based plugin or it's handled by + * some specific proxy. + */ + int type; + void *proxy; + int flags; /* Flags (not available at the moment */ char *name; /* Custom plugin short name */ char *description; /* Description */ @@ -96,5 +107,6 @@ int flb_custom_plugin_property_check(struct flb_custom_instance *ins, int flb_custom_init_all(struct flb_config *config); void flb_custom_set_context(struct flb_custom_instance *ins, void *context); void flb_custom_instance_destroy(struct flb_custom_instance *ins); +int flb_custom_log_check(struct flb_custom_instance *ins, int l); #endif diff --git a/include/fluent-bit/flb_plugin_proxy.h b/include/fluent-bit/flb_plugin_proxy.h index 97ceb7ff5d2..3eb885562b1 100644 --- a/include/fluent-bit/flb_plugin_proxy.h +++ b/include/fluent-bit/flb_plugin_proxy.h @@ -28,6 +28,7 @@ /* Plugin Types */ #define FLB_PROXY_INPUT_PLUGIN 1 #define FLB_PROXY_OUTPUT_PLUGIN 2 +#define FLB_PROXY_CUSTOM_PLUGIN 3 /* Proxies available */ #define FLB_PROXY_GOLANG 11 diff --git a/src/flb_api.c b/src/flb_api.c index c11c1712c28..e7e84f349f8 100644 --- a/src/flb_api.c +++ b/src/flb_api.c @@ -24,6 +24,7 @@ #include #include +#include struct flb_api *flb_api_create() { @@ -37,6 +38,7 @@ struct flb_api *flb_api_create() api->output_get_property = flb_output_get_property; api->input_get_property = flb_input_get_property; + api->custom_get_property = flb_custom_get_property; #ifdef FLB_HAVE_METRICS api->output_get_cmt_instance = flb_output_get_cmt_instance; @@ -46,6 +48,7 @@ struct flb_api *flb_api_create() api->log_print = flb_log_print; api->input_log_check = flb_input_log_check; api->output_log_check = flb_output_log_check; + api->custom_log_check = flb_custom_log_check; return api; } diff --git a/src/flb_custom.c b/src/flb_custom.c index 94b6c180444..80ec9484b22 100644 --- a/src/flb_custom.c +++ b/src/flb_custom.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -183,6 +184,24 @@ struct flb_custom_instance *flb_custom_new(struct flb_config *config, snprintf(instance->name, sizeof(instance->name) - 1, "%s.%i", plugin->name, id); + if (plugin->type == FLB_CUSTOM_PLUGIN_CORE) { + instance->context = NULL; + } + else { + struct flb_plugin_proxy_context *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context)); + if (!ctx) { + flb_errno(); + flb_free(instance); + return NULL; + } + + ctx->proxy = plugin->proxy; + + instance->context = ctx; + } + instance->id = id; instance->alias = NULL; instance->p = plugin; @@ -352,3 +371,15 @@ void flb_custom_set_context(struct flb_custom_instance *ins, void *context) { ins->context = context; } + +/* Check custom plugin's log level. + * Not for core plugins but for Golang plugins. + * Golang plugins do not have thread-local flb_worker_ctx information. */ +int flb_custom_log_check(struct flb_custom_instance *ins, int l) +{ + if (ins->log_level < l) { + return FLB_FALSE; + } + + return FLB_TRUE; +} diff --git a/src/flb_plugin_proxy.c b/src/flb_plugin_proxy.c index c8f318afbbd..018674ed992 100644 --- a/src/flb_plugin_proxy.c +++ b/src/flb_plugin_proxy.c @@ -33,6 +33,7 @@ #include #include #include +#include /* Proxies */ #include "proxy/go/go.h" @@ -422,6 +423,42 @@ static int flb_proxy_register_input(struct flb_plugin_proxy *proxy, return 0; } +int flb_proxy_custom_cb_init(struct flb_custom_instance *c_ins, + struct flb_config *config, void *data); + +static int flb_proxy_custom_cb_exit(void *custom_context, + struct flb_config *config); + +static int flb_proxy_register_custom(struct flb_plugin_proxy *proxy, + struct flb_plugin_proxy_def *def, + struct flb_config *config) +{ + struct flb_custom_plugin *custom; + + custom = flb_calloc(1, sizeof(struct flb_custom_plugin)); + if (!custom) { + flb_errno(); + return -1; + } + + /* Plugin registration */ + custom->type = FLB_CUSTOM_PLUGIN_PROXY; + custom->proxy = proxy; + custom->flags = def->flags; + custom->name = flb_strdup(def->name); + custom->description = def->description; + mk_list_add(&custom->_head, &config->custom_plugins); + + /* + * Set proxy callbacks: external plugins which are not following + * the core plugins specs, have a different callback approach, so + * we put our proxy-middle callbacks to do the translation properly. + */ + custom->cb_init = flb_proxy_custom_cb_init; + custom->cb_exit = flb_proxy_custom_cb_exit; + return 0; +} + void *flb_plugin_proxy_symbol(struct flb_plugin_proxy *proxy, const char *symbol) { @@ -487,6 +524,9 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy, } else if (def->type == FLB_PROXY_INPUT_PLUGIN) { ret = proxy_go_input_register(proxy, def); + } + else if (def->type == FLB_PROXY_CUSTOM_PLUGIN) { + ret = proxy_go_custom_register(proxy, def); } #endif } @@ -501,6 +541,9 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy, else if (def->type == FLB_PROXY_INPUT_PLUGIN) { flb_proxy_register_input(proxy, def, config); } + else if (def->type == FLB_PROXY_CUSTOM_PLUGIN) { + flb_proxy_register_custom(proxy, def, config); + } } return 0; @@ -616,3 +659,51 @@ int flb_plugin_proxy_set(struct flb_plugin_proxy_def *def, int type, return 0; } + +int flb_proxy_custom_cb_init(struct flb_custom_instance *c_ins, + struct flb_config *config, void *data) +{ + int ret = -1; + struct flb_plugin_proxy_context *pc; + struct flb_plugin_proxy *proxy; + + pc = (struct flb_plugin_proxy_context *)(c_ins->context); + proxy = pc->proxy; + + /* Before to initialize, set the instance reference */ + pc->proxy->instance = c_ins; + + if (proxy->def->proxy == FLB_PROXY_GOLANG) { +#ifdef FLB_HAVE_PROXY_GO + ret = proxy_go_custom_init(proxy); +#endif + } + + if (ret == -1) { + flb_error("[custom] could not initialize '%s' plugin", + c_ins->p->name); + return -1; + } + + return 0; +} + +int flb_proxy_custom_cb_exit(void *custom_context, + struct flb_config *config) +{ + int ret = -1; + struct flb_plugin_proxy_context *ctx = custom_context; + struct flb_plugin_proxy *proxy = (ctx->proxy); + if (!custom_context) { + return ret; + } + + if (proxy->def->proxy == FLB_PROXY_GOLANG) { +#ifdef FLB_HAVE_PROXY_GO + ret = proxy_go_custom_destroy(ctx); +#endif + } + + flb_free(ctx); + return ret; +} diff --git a/src/proxy/go/go.c b/src/proxy/go/go.c index 540005a8d9d..2045124df2c 100644 --- a/src/proxy/go/go.c +++ b/src/proxy/go/go.c @@ -21,6 +21,7 @@ #include #include #include +#include #include "./go.h" /* @@ -41,7 +42,7 @@ * * - name: shortname of the plugin. * - description: plugin description. - * - type: input, output, filter, whatever. + * - type: input, output, filter, custom, whatever. * - proxy: type of proxy e.g. GOLANG * - flags: optional flags, not used by Go plugins at the moment. * @@ -286,3 +287,87 @@ void proxy_go_input_unregister(void *data) { flb_free(plugin->name); flb_free(plugin); } + +int proxy_go_custom_register(struct flb_plugin_proxy *proxy, + struct flb_plugin_proxy_def *def) +{ + struct flbgo_custom_plugin *plugin; + + plugin = flb_malloc(sizeof(struct flbgo_custom_plugin)); + if (!plugin) { + flb_errno(); + return -1; + } + + /* + * Lookup the entry point function: + * + * - FLBPluginInit + * - FLBPluginExit + * + * note: registration callback FLBPluginRegister() is resolved by the + * parent proxy interface. + */ + + plugin->cb_init = flb_plugin_proxy_symbol(proxy, "FLBPluginInit"); + if (!plugin->cb_init) { + flb_error("[go proxy]: could not load FLBPluginInit symbol"); + flb_free(plugin); + return -1; + } + + plugin->cb_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginExit"); + + plugin->name = flb_strdup(def->name); + + /* This Go plugin context is an opaque data for the parent proxy */ + proxy->data = plugin; + + return 0; +} + +int proxy_go_custom_init(struct flb_plugin_proxy *proxy) +{ + int ret = 0; + struct flbgo_custom_plugin *plugin = proxy->data; + + /* set the API */ + plugin->api = proxy->api; + plugin->i_ins = proxy->instance; + /* In order to avoid having the whole instance as part of the ABI we */ + /* copy the context pointer into the plugin. */ + plugin->context = ((struct flb_custom_instance *)proxy->instance)->context; + + ret = plugin->cb_init(plugin); + if (ret <= 0) { + flb_error("[go proxy]: plugin '%s' failed to initialize", + plugin->name); + flb_free(plugin); + return -1; + } + + return ret; +} + +int proxy_go_custom_destroy(struct flb_plugin_proxy_context *ctx) +{ + int ret = 0; + struct flbgo_custom_plugin *plugin; + + plugin = (struct flbgo_custom_plugin *) ctx->proxy->data; + + if (plugin->cb_exit) { + ret = plugin->cb_exit(); + } + + return ret; +} + +void proxy_go_custom_unregister(void *data) { + struct flbgo_custom_plugin *plugin; + + plugin = (struct flbgo_custom_plugin *) data; + flb_free(plugin->name); + flb_free(plugin); +} + diff --git a/src/proxy/go/go.h b/src/proxy/go/go.h index eed328e24bf..0d011bc323a 100644 --- a/src/proxy/go/go.h +++ b/src/proxy/go/go.h @@ -48,6 +48,16 @@ struct flbgo_input_plugin { int (*cb_exit)(); }; +struct flbgo_custom_plugin { + char *name; + void *api; + void *i_ins; + struct flb_plugin_proxy_context *context; + + int (*cb_init)(); + int (*cb_exit)(); +}; + int proxy_go_output_register(struct flb_plugin_proxy *proxy, struct flb_plugin_proxy_def *def); @@ -69,4 +79,12 @@ int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx, void *allocated_data); int proxy_go_input_destroy(struct flb_plugin_input_proxy_context *ctx); void proxy_go_input_unregister(void *data); + +int proxy_go_custom_register(struct flb_plugin_proxy *proxy, + struct flb_plugin_proxy_def *def); + +int proxy_go_custom_init(struct flb_plugin_proxy *proxy); + +int proxy_go_custom_destroy(struct flb_plugin_proxy_context *ctx); +void proxy_go_custom_unregister(void *data); #endif