Skip to content

api: proxy: custom: Implement APIs for custom Go plugins #10299

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/fluent-bit/flb_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_custom.h>

struct flb_api {
const char *(*output_get_property) (const char *, struct flb_output_instance *);
Expand All @@ -33,6 +34,11 @@ struct flb_api {
void (*log_print) (int, const char*, int, const char*, ...);
int (*input_log_check) (struct flb_input_instance *, int);
int (*output_log_check) (struct flb_output_instance *, int);

/* To preserve ABI, we need to add these APIs after the
* input/output definitions. */
const char *(*custom_get_property) (const char *, struct flb_custom_instance *);
int (*custom_log_check) (struct flb_custom_instance *, int);
};

#ifdef FLB_CORE
Expand Down
12 changes: 12 additions & 0 deletions include/fluent-bit/flb_custom.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,20 @@
#define FLB_CUSTOM_NET_CLIENT 1 /* custom may use upstream net.* properties */
#define FLB_CUSTOM_NET_SERVER 2 /* custom may use downstream net.* properties */

/* Custom plugin types */
#define FLB_CUSTOM_PLUGIN_CORE 0
#define FLB_CUSTOM_PLUGIN_PROXY 1

struct flb_custom_instance;

struct flb_custom_plugin {
/*
* The type defines if this is a core-based plugin or it's handled by
* some specific proxy.
*/
int type;
void *proxy;

int flags; /* Flags (not available at the moment */
char *name; /* Custom plugin short name */
char *description; /* Description */
Expand Down Expand Up @@ -96,5 +107,6 @@ int flb_custom_plugin_property_check(struct flb_custom_instance *ins,
int flb_custom_init_all(struct flb_config *config);
void flb_custom_set_context(struct flb_custom_instance *ins, void *context);
void flb_custom_instance_destroy(struct flb_custom_instance *ins);
int flb_custom_log_check(struct flb_custom_instance *ins, int l);

#endif
1 change: 1 addition & 0 deletions include/fluent-bit/flb_plugin_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
/* Plugin Types */
#define FLB_PROXY_INPUT_PLUGIN 1
#define FLB_PROXY_OUTPUT_PLUGIN 2
#define FLB_PROXY_CUSTOM_PLUGIN 3

/* Proxies available */
#define FLB_PROXY_GOLANG 11
Expand Down
3 changes: 3 additions & 0 deletions src/flb_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_custom.h>

struct flb_api *flb_api_create()
{
Expand All @@ -37,6 +38,7 @@ struct flb_api *flb_api_create()

api->output_get_property = flb_output_get_property;
api->input_get_property = flb_input_get_property;
api->custom_get_property = flb_custom_get_property;

#ifdef FLB_HAVE_METRICS
api->output_get_cmt_instance = flb_output_get_cmt_instance;
Expand All @@ -46,6 +48,7 @@ struct flb_api *flb_api_create()
api->log_print = flb_log_print;
api->input_log_check = flb_input_log_check;
api->output_log_check = flb_output_log_check;
api->custom_log_check = flb_custom_log_check;

return api;
}
Expand Down
31 changes: 31 additions & 0 deletions src/flb_custom.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_metrics.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_plugin_proxy.h>
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/flb_downstream.h>
#include <chunkio/chunkio.h>
Expand Down Expand Up @@ -183,6 +184,24 @@ struct flb_custom_instance *flb_custom_new(struct flb_config *config,
snprintf(instance->name, sizeof(instance->name) - 1,
"%s.%i", plugin->name, id);

if (plugin->type == FLB_CUSTOM_PLUGIN_CORE) {
instance->context = NULL;
}
else {
struct flb_plugin_proxy_context *ctx;

ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
if (!ctx) {
flb_errno();
flb_free(instance);
return NULL;
}

ctx->proxy = plugin->proxy;

instance->context = ctx;
}

instance->id = id;
instance->alias = NULL;
instance->p = plugin;
Expand Down Expand Up @@ -352,3 +371,15 @@ void flb_custom_set_context(struct flb_custom_instance *ins, void *context)
{
ins->context = context;
}

/* Check custom plugin's log level.
* Not for core plugins but for Golang plugins.
* Golang plugins do not have thread-local flb_worker_ctx information. */
int flb_custom_log_check(struct flb_custom_instance *ins, int l)
{
if (ins->log_level < l) {
return FLB_FALSE;
}

return FLB_TRUE;
}
91 changes: 91 additions & 0 deletions src/flb_plugin_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_plugin_proxy.h>
#include <fluent-bit/flb_input_log.h>
#include <fluent-bit/flb_custom.h>

/* Proxies */
#include "proxy/go/go.h"
Expand Down Expand Up @@ -422,6 +423,42 @@ static int flb_proxy_register_input(struct flb_plugin_proxy *proxy,
return 0;
}

int flb_proxy_custom_cb_init(struct flb_custom_instance *c_ins,
struct flb_config *config, void *data);

static int flb_proxy_custom_cb_exit(void *custom_context,
struct flb_config *config);

static int flb_proxy_register_custom(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def,
struct flb_config *config)
{
struct flb_custom_plugin *custom;

custom = flb_calloc(1, sizeof(struct flb_custom_plugin));
if (!custom) {
flb_errno();
return -1;
}

/* Plugin registration */
custom->type = FLB_CUSTOM_PLUGIN_PROXY;
custom->proxy = proxy;
custom->flags = def->flags;
custom->name = flb_strdup(def->name);
custom->description = def->description;
mk_list_add(&custom->_head, &config->custom_plugins);

/*
* Set proxy callbacks: external plugins which are not following
* the core plugins specs, have a different callback approach, so
* we put our proxy-middle callbacks to do the translation properly.
*/
custom->cb_init = flb_proxy_custom_cb_init;
custom->cb_exit = flb_proxy_custom_cb_exit;
return 0;
}

void *flb_plugin_proxy_symbol(struct flb_plugin_proxy *proxy,
const char *symbol)
{
Expand Down Expand Up @@ -487,6 +524,9 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
}
else if (def->type == FLB_PROXY_INPUT_PLUGIN) {
ret = proxy_go_input_register(proxy, def);
}
else if (def->type == FLB_PROXY_CUSTOM_PLUGIN) {
ret = proxy_go_custom_register(proxy, def);
}
#endif
}
Expand All @@ -501,6 +541,9 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
else if (def->type == FLB_PROXY_INPUT_PLUGIN) {
flb_proxy_register_input(proxy, def, config);
}
else if (def->type == FLB_PROXY_CUSTOM_PLUGIN) {
flb_proxy_register_custom(proxy, def, config);
}
}

return 0;
Expand Down Expand Up @@ -616,3 +659,51 @@ int flb_plugin_proxy_set(struct flb_plugin_proxy_def *def, int type,

return 0;
}

int flb_proxy_custom_cb_init(struct flb_custom_instance *c_ins,
struct flb_config *config, void *data)
{
int ret = -1;
struct flb_plugin_proxy_context *pc;
struct flb_plugin_proxy *proxy;

pc = (struct flb_plugin_proxy_context *)(c_ins->context);
proxy = pc->proxy;

/* Before to initialize, set the instance reference */
pc->proxy->instance = c_ins;

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
ret = proxy_go_custom_init(proxy);
#endif
}

if (ret == -1) {
flb_error("[custom] could not initialize '%s' plugin",
c_ins->p->name);
return -1;
}

return 0;
}

int flb_proxy_custom_cb_exit(void *custom_context,
struct flb_config *config)
{
int ret = -1;
struct flb_plugin_proxy_context *ctx = custom_context;
struct flb_plugin_proxy *proxy = (ctx->proxy);
if (!custom_context) {
return ret;
}

if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
ret = proxy_go_custom_destroy(ctx);
#endif
}

flb_free(ctx);
return ret;
}
87 changes: 86 additions & 1 deletion src/proxy/go/go.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_plugin_proxy.h>
#include <fluent-bit/flb_output.h>
#include <fluent-bit/flb_custom.h>
#include "./go.h"

/*
Expand All @@ -41,7 +42,7 @@
*
* - name: shortname of the plugin.
* - description: plugin description.
* - type: input, output, filter, whatever.
* - type: input, output, filter, custom, whatever.
* - proxy: type of proxy e.g. GOLANG
* - flags: optional flags, not used by Go plugins at the moment.
*
Expand Down Expand Up @@ -286,3 +287,87 @@ void proxy_go_input_unregister(void *data) {
flb_free(plugin->name);
flb_free(plugin);
}

int proxy_go_custom_register(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def)
{
struct flbgo_custom_plugin *plugin;

plugin = flb_malloc(sizeof(struct flbgo_custom_plugin));
if (!plugin) {
flb_errno();
return -1;
}

/*
* Lookup the entry point function:
*
* - FLBPluginInit
* - FLBPluginExit
*
* note: registration callback FLBPluginRegister() is resolved by the
* parent proxy interface.
*/

plugin->cb_init = flb_plugin_proxy_symbol(proxy, "FLBPluginInit");
if (!plugin->cb_init) {
flb_error("[go proxy]: could not load FLBPluginInit symbol");
flb_free(plugin);
return -1;
}

plugin->cb_exit = flb_plugin_proxy_symbol(proxy, "FLBPluginExit");

plugin->name = flb_strdup(def->name);

/* This Go plugin context is an opaque data for the parent proxy */
proxy->data = plugin;

return 0;
}

int proxy_go_custom_init(struct flb_plugin_proxy *proxy)
{
int ret = 0;
struct flbgo_custom_plugin *plugin = proxy->data;

/* set the API */
plugin->api = proxy->api;
plugin->i_ins = proxy->instance;
/* In order to avoid having the whole instance as part of the ABI we */
/* copy the context pointer into the plugin. */
plugin->context = ((struct flb_custom_instance *)proxy->instance)->context;

ret = plugin->cb_init(plugin);
if (ret <= 0) {
flb_error("[go proxy]: plugin '%s' failed to initialize",
plugin->name);
flb_free(plugin);
return -1;
}

return ret;
}

int proxy_go_custom_destroy(struct flb_plugin_proxy_context *ctx)
{
int ret = 0;
struct flbgo_custom_plugin *plugin;

plugin = (struct flbgo_custom_plugin *) ctx->proxy->data;

if (plugin->cb_exit) {
ret = plugin->cb_exit();
}

return ret;
}

void proxy_go_custom_unregister(void *data) {
struct flbgo_custom_plugin *plugin;

plugin = (struct flbgo_custom_plugin *) data;
flb_free(plugin->name);
flb_free(plugin);
}

18 changes: 18 additions & 0 deletions src/proxy/go/go.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ struct flbgo_input_plugin {
int (*cb_exit)();
};

struct flbgo_custom_plugin {
char *name;
void *api;
void *i_ins;
struct flb_plugin_proxy_context *context;

int (*cb_init)();
int (*cb_exit)();
};

int proxy_go_output_register(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def);

Expand All @@ -69,4 +79,12 @@ int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx,
void *allocated_data);
int proxy_go_input_destroy(struct flb_plugin_input_proxy_context *ctx);
void proxy_go_input_unregister(void *data);

int proxy_go_custom_register(struct flb_plugin_proxy *proxy,
struct flb_plugin_proxy_def *def);

int proxy_go_custom_init(struct flb_plugin_proxy *proxy);

int proxy_go_custom_destroy(struct flb_plugin_proxy_context *ctx);
void proxy_go_custom_unregister(void *data);
#endif
Loading