Skip to content

Commit

Permalink
coprocessor: Prevent reloading from unloaded plugins (tikv#10875)
Browse files Browse the repository at this point in the history
* coprocessor: Prevent reloading from unloaded plugins

Signed-off-by: Peng Guanwen <[email protected]>

* coprocessor: Don't unload when file deleted/changed

Signed-off-by: Peng Guanwen <[email protected]>

* coprocessor: modify test to adapt new behavior

Signed-off-by: Peng Guanwen <[email protected]>

* coprocessor_v2: Update comment

Co-authored-by: Andy Lok <[email protected]>
Signed-off-by: Peng Guanwen <[email protected]>

* Update outdated doc comments

Signed-off-by: Peng Guanwen <[email protected]>

Co-authored-by: Andy Lok <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
3 people authored Sep 14, 2021
1 parent 0a23699 commit 6be3893
Showing 1 changed file with 27 additions and 13 deletions.
40 changes: 27 additions & 13 deletions src/coprocessor_v2/plugin_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use std::path::{Path, PathBuf};
use std::sync::{mpsc, Arc, RwLock};
use std::thread;
use std::time::Duration;
use std::{collections::HashMap, ops::Range};
use std::{
collections::{HashMap, HashSet},
ops::Range,
};
use thiserror::Error;

#[derive(Debug, Error)]
Expand Down Expand Up @@ -43,6 +46,9 @@ pub enum PluginLoadingError {
plugin_api: String,
tikv_api: String,
},

#[error("unloaded plugin `{path:?}` cannot be reloaded")]
ReloadError { path: OsString },
}

/// Helper function for error handling.
Expand Down Expand Up @@ -126,12 +132,6 @@ impl PluginRegistry {
hot_reload_registry.load_plugin(file).ok();
}
};
let unload = |file: &PathBuf| {
let mut hot_reload_registry = hot_reload_registry.write().unwrap();
if let Some(plugin) = hot_reload_registry.get_plugin_by_path(file) {
hot_reload_registry.unload_plugin(plugin.name());
}
};
let rename = |old_path: &PathBuf, new_path: &PathBuf| {
let mut hot_reload_registry = hot_reload_registry.write().unwrap();
if let Some(plugin) = hot_reload_registry.get_plugin_by_path(old_path) {
Expand All @@ -145,17 +145,17 @@ impl PluginRegistry {
maybe_load(&file);
}
Ok(DebouncedEvent::Remove(file)) => {
unload(&file);
// We cannot do much when the file is deleted. See issue #10854
warn!("a loaded coprocessor plugin is removed. Be aware that original plugin is still running"; "plugin_path" => ?file);
}
Ok(DebouncedEvent::Rename(old_file, new_file)) => {
// If the file is renamed with a different parent directory, we will receive a `Remove` instead.
debug_assert!(old_file.parent() == new_file.parent());
rename(&old_file, &new_file);
}
Ok(DebouncedEvent::Write(file)) => {
warn!("another process is overwriting a coprocessor plugin while the plugin is loaded. This can lead to severe issues!"; "plugin_path" => ?file);
unload(&file);
maybe_load(&file);
// We cannot do much when the file is deleted. See issue #10854
warn!("another process is overwriting a coprocessor plugin while the plugin is loaded. Be aware that original plugin is still running"; "plugin_path" => ?file);
}
Ok(_) => (),
Err(_) => break, // Stop when watcher is dropped.
Expand Down Expand Up @@ -278,6 +278,10 @@ struct PluginRegistryInner {
/// Plugins that are currently loaded.
/// Provides a mapping from the plugin's name to the actual instance.
loaded_plugins: HashMap<String, (OsString, Arc<LoadedPlugin>)>,

/// Original paths that plugins loaded from.
/// Files in this list should not be loaded again.
library_paths: HashSet<OsString>,
}

impl PluginRegistryInner {
Expand Down Expand Up @@ -306,12 +310,22 @@ impl PluginRegistryInner {
&mut self,
filename: P,
) -> Result<String, PluginLoadingError> {
if self.library_paths.contains(filename.as_ref()) {
let err = Err(PluginLoadingError::ReloadError {
path: filename.as_ref().to_owned(),
});
let filename = filename.as_ref().to_string_lossy();
error!("Unloaded plugin should not load again!"; "plugin_path" => ?filename, "error" => ?err);
return err;
}
let plugin = unsafe { LoadedPlugin::new(&filename) };
if let Err(err) = &plugin {
let filename = filename.as_ref().to_string_lossy();
warn!("failed to load coprocessor plugin. Maybe not compiled correctly as a TiKV plugin?"; "plugin_path" => ?filename, "error" => ?err);
}
let plugin = plugin?;
// plugin successfully loaded, add path to library_paths.
self.library_paths.insert(filename.as_ref().to_owned());

let plugin_name = plugin.name().to_string();

Expand Down Expand Up @@ -566,11 +580,11 @@ mod tests {
&library_path_2
);

// trigger unloading
std::fs::remove_file(&library_path_2).unwrap();
// fs watcher detects changes in every 3 seconds, therefore, wait 4 seconds so as to make sure the watcher is triggered.
std::thread::sleep(Duration::from_secs(4));

assert!(registry.get_plugin(plugin_name).is_none());
// plugin will not be unloadad
assert!(registry.get_plugin(plugin_name).is_some());
}
}

0 comments on commit 6be3893

Please sign in to comment.