Skip to content

Commit

Permalink
coprocessor: disable coproccesor when dir config is not presented. (t…
Browse files Browse the repository at this point in the history
…ikv#10922)

* coprocessor: disable coprocessor when plugin dir is not presented.

Signed-off-by: Peng Guanwen <[email protected]>

* format code

Signed-off-by: Peng Guanwen <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
peng1999 and ti-chi-bot authored Sep 14, 2021
1 parent fa10c34 commit 0a23699
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
3 changes: 1 addition & 2 deletions etc/config-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,7 @@
[coprocessor-v2]
## Path to the directory where compiled coprocessor plugins are located.
## Plugins in this directory will be automatically loaded by TiKV.
## If a coprocessor plugin file is deleted, it will be automatically unloaded.
## If the config value is not set, hot-reloading will be disabled.
## If the config value is not set, the coprocessor plugin will be disabled.
# coprocessor-plugin-directory = "./coprocessors"

[rocksdb]
Expand Down
54 changes: 30 additions & 24 deletions src/coprocessor_v2/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,32 @@ enum CoprocessorError {
/// A pool to build and run Coprocessor request handlers.
#[derive(Clone)]
pub struct Endpoint {
plugin_registry: Arc<PluginRegistry>,
plugin_registry: Option<Arc<PluginRegistry>>,
}

impl tikv_util::AssertSend for Endpoint {}

impl Endpoint {
pub fn new(copr_cfg: &Config) -> Self {
let mut plugin_registry = PluginRegistry::new();

// Enable hot-reloading of plugins if the user configured a directory.
if let Some(plugin_directory) = &copr_cfg.coprocessor_plugin_directory {
let r = plugin_registry.start_hot_reloading(plugin_directory);
if let Err(err) = r {
warn!("unable to start hot-reloading for coprocessor plugins.";
"coprocessor_directory" => plugin_directory.display(),
"error" => ?err);
}
}

Self {
plugin_registry: Arc::new(plugin_registry),
}
let plugin_registry =
copr_cfg
.coprocessor_plugin_directory
.as_ref()
.map(|plugin_directory| {
let mut plugin_registry = PluginRegistry::new();

if let Err(err) = plugin_registry.start_hot_reloading(plugin_directory) {
warn!(
"unable to start hot-reloading for coprocessor plugins.";
"coprocessor_directory" => plugin_directory.display(),
"error" => ?err
);
}

Arc::new(plugin_registry)
});

Self { plugin_registry }
}

/// Handles a request to the coprocessor framework.
Expand Down Expand Up @@ -72,15 +76,17 @@ impl Endpoint {
storage: &Storage<E, L>,
mut req: kvrpcpb::RawCoprocessorRequest,
) -> Result<RawResponse, CoprocessorError> {
let plugin = self
let plugin_registry = self
.plugin_registry
.get_plugin(&req.copr_name)
.ok_or_else(|| {
CoprocessorError::Other(format!(
"No registered coprocessor with name '{}'",
req.copr_name
))
})?;
.as_ref()
.ok_or_else(|| CoprocessorError::Other("Coprocessor plugin is disabled!".to_owned()))?;

let plugin = plugin_registry.get_plugin(&req.copr_name).ok_or_else(|| {
CoprocessorError::Other(format!(
"No registered coprocessor with name '{}'",
req.copr_name
))
})?;

// Check whether the found plugin satisfies the version constraint.
let version_req = VersionReq::parse(&req.copr_version_req)
Expand Down

0 comments on commit 0a23699

Please sign in to comment.