Skip to content

Commit 70f2b1a

Browse files
authored
add ballista plugin manager and udf plugin (#2131)
1 parent 9815ac6 commit 70f2b1a

File tree

9 files changed

+476
-2
lines changed

9 files changed

+476
-2
lines changed

ballista/rust/core/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ datafusion = { path = "../../../datafusion/core", version = "7.0.0" }
4242
datafusion-proto = { path = "../../../datafusion/proto", version = "7.0.0" }
4343
futures = "0.3"
4444
hashbrown = "0.12"
45+
46+
libloading = "0.7.3"
4547
log = "0.4"
48+
once_cell = "1.9.0"
4649

4750
parking_lot = "0.12"
4851
parse_arg = "0.1.3"
@@ -53,9 +56,11 @@ sqlparser = "0.16"
5356
tokio = "1.0"
5457
tonic = "0.6"
5558
uuid = { version = "0.8", features = ["v4"] }
59+
walkdir = "2.3.2"
5660

5761
[dev-dependencies]
5862
tempfile = "3"
5963

6064
[build-dependencies]
65+
rustc_version = "0.4.0"
6166
tonic-build = { version = "0.6" }

ballista/rust/core/build.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ fn main() -> Result<(), String> {
2020
println!("cargo:rerun-if-env-changed=FORCE_REBUILD");
2121

2222
println!("cargo:rerun-if-changed=proto/ballista.proto");
23+
let version = rustc_version::version().unwrap();
24+
println!("cargo:rustc-env=RUSTC_VERSION={}", version);
2325
println!("cargo:rerun-if-changed=proto/datafusion.proto");
2426
tonic_build::configure()
2527
.extern_path(".datafusion", "::datafusion_proto::protobuf")

ballista/rust/core/src/config.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ pub const BALLISTA_REPARTITION_AGGREGATIONS: &str = "ballista.repartition.aggreg
3434
pub const BALLISTA_REPARTITION_WINDOWS: &str = "ballista.repartition.windows";
3535
pub const BALLISTA_PARQUET_PRUNING: &str = "ballista.parquet.pruning";
3636
pub const BALLISTA_WITH_INFORMATION_SCHEMA: &str = "ballista.with_information_schema";
37+
/// Directory containing plugin files; the dynamic libraries found in this directory are loaded when the scheduler state is initialized.
38+
pub const BALLISTA_PLUGIN_DIR: &str = "ballista.plugin_dir";
3739

3840
pub type ParseResult<T> = result::Result<T, String>;
3941

@@ -139,6 +141,9 @@ impl BallistaConfig {
139141
.parse::<bool>()
140142
.map_err(|e| format!("{:?}", e))?;
141143
}
144+
DataType::Utf8 => {
145+
val.to_string();
146+
}
142147
_ => {
143148
return Err(format!("not support data type: {}", data_type));
144149
}
@@ -171,6 +176,9 @@ impl BallistaConfig {
171176
ConfigEntry::new(BALLISTA_WITH_INFORMATION_SCHEMA.to_string(),
172177
"Sets whether enable information_schema".to_string(),
173178
DataType::Boolean,Some("false".to_string())),
179+
ConfigEntry::new(BALLISTA_PLUGIN_DIR.to_string(),
180+
"Sets the plugin dir".to_string(),
181+
DataType::Utf8,Some("".to_string())),
174182
];
175183
entries
176184
.iter()
@@ -186,6 +194,10 @@ impl BallistaConfig {
186194
self.get_usize_setting(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS)
187195
}
188196

197+
/// Returns the value of the `ballista.plugin_dir` setting (`BALLISTA_PLUGIN_DIR`).
///
/// When the setting has not been supplied, the default registered for it in
/// `valid_entries()` is returned instead (an empty string).
pub fn default_plugin_dir(&self) -> String {
198+
self.get_string_setting(BALLISTA_PLUGIN_DIR)
199+
}
200+
189201
pub fn default_batch_size(&self) -> usize {
190202
self.get_usize_setting(BALLISTA_DEFAULT_BATCH_SIZE)
191203
}
@@ -233,6 +245,17 @@ impl BallistaConfig {
233245
v.parse::<bool>().unwrap()
234246
}
235247
}
248+
fn get_string_setting(&self, key: &str) -> String {
249+
if let Some(v) = self.settings.get(key) {
250+
// infallible because we validate all configs in the constructor
251+
v.to_string()
252+
} else {
253+
let entries = Self::valid_entries();
254+
// infallible because we validate all configs in the constructor
255+
let v = entries.get(key).unwrap().default_value.as_ref().unwrap();
256+
v.to_string()
257+
}
258+
}
236259
}
237260

238261
// an enum used to configure the scheduler policy
@@ -266,6 +289,7 @@ mod tests {
266289
let config = BallistaConfig::new()?;
267290
assert_eq!(2, config.default_shuffle_partitions());
268291
assert!(!config.default_with_information_schema());
292+
assert_eq!("", config.default_plugin_dir().as_str());
269293
Ok(())
270294
}
271295

@@ -284,6 +308,7 @@ mod tests {
284308
fn custom_config_invalid() -> Result<()> {
285309
let config = BallistaConfig::builder()
286310
.set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "true")
311+
.set(BALLISTA_PLUGIN_DIR, "test_dir")
287312
.build();
288313
assert!(config.is_err());
289314
assert_eq!("General(\"Failed to parse user-supplied value 'ballista.shuffle.partitions' for configuration setting 'true': ParseIntError { kind: InvalidDigit }\")", format!("{:?}", config.unwrap_err()));
@@ -293,7 +318,6 @@ mod tests {
293318
.build();
294319
assert!(config.is_err());
295320
assert_eq!("General(\"Failed to parse user-supplied value 'ballista.with_information_schema' for configuration setting '123': ParseBoolError\")", format!("{:?}", config.unwrap_err()));
296-
297321
Ok(())
298322
}
299323
}

ballista/rust/core/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub mod config;
2727
pub mod error;
2828
pub mod event_loop;
2929
pub mod execution_plans;
30+
/// Plugin support: the plugin manager and the UDF plugin.
31+
pub mod plugin;
3032
pub mod utils;
3133

3234
#[macro_use]

ballista/rust/core/src/plugin/mod.rs

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::error::Result;
19+
use crate::plugin::udf::UDFPluginManager;
20+
use libloading::Library;
21+
use std::any::Any;
22+
use std::env;
23+
use std::sync::Arc;
24+
25+
/// plugin manager
26+
pub mod plugin_manager;
27+
/// udf plugin
28+
pub mod udf;
29+
30+
/// Version of this crate, taken from the `CARGO_PKG_VERSION` environment variable at compile time.
31+
pub static CORE_VERSION: &str = env!("CARGO_PKG_VERSION");
32+
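/// Version of `rustc` used to build this crate, taken from the `RUSTC_VERSION` environment variable at compile time.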
33+
pub static RUSTC_VERSION: &str = env!("RUSTC_VERSION");
34+
35+
/// Top-level plugin trait.
///
/// Its only method, `as_any`, exposes the plugin as `dyn Any` so that callers can
/// downcast it to a concrete plugin type.
36+
pub trait Plugin {
37+
/// Returns the plugin as [`Any`](std::any::Any) so that it can be
38+
/// downcast to a specific implementation.
39+
fn as_any(&self) -> &dyn Any;
40+
}
41+
42+
/// The kinds of plugin that can be declared and loaded.
///
/// A value of this type is returned by the `extern "C"` function stored in
/// `PluginDeclaration::plugin_type`, so the enum is given a C-compatible
/// representation (`#[repr(C)]`); without a representation hint a Rust enum is not
/// FFI-safe as an `extern "C"` return type.
#[repr(C)]
#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
pub enum PluginEnum {
    /// UDF/UDAF plugin
    UDF,
}
48+
49+
impl PluginEnum {
50+
/// Creates the plugin manager for this plugin kind, boxed as a `PluginRegistrar`.
///
/// For `PluginEnum::UDF` this is a default-constructed `UDFPluginManager`.
51+
pub fn init_plugin_manager(&self) -> Box<dyn PluginRegistrar> {
52+
match self {
53+
PluginEnum::UDF => Box::new(UDFPluginManager::default()),
54+
}
55+
}
56+
}
57+
58+
/// Declaration that every plugin library must provide.
///
/// The `declare_plugin!` macro generates a static of this type named
/// `plugin_declaration`.
59+
#[derive(Copy, Clone)]
60+
pub struct PluginDeclaration {
61+
/// Version of rustc the plugin was compiled with.
///
/// Rust does not have a stable ABI, meaning different compiler versions can generate incompatible code.
62+
/// For this reason, the UDF plugin must be compiled using the same version of rustc as datafusion.
63+
pub rustc_version: &'static str,
64+
65+
/// Core version the plugin was built against. It needs to be the same as the plugin manager's core version.
66+
pub core_version: &'static str,
67+
68+
/// Function returning the kind of this plugin (one of `PluginEnum`).
69+
pub plugin_type: unsafe extern "C" fn() -> PluginEnum,
70+
}
71+
72+
/// Plugin registrar. Every kind of plugin needs an implementation of this trait
/// (for example `UDFPluginManager`, which `PluginEnum::init_plugin_manager` returns
/// as a `Box<dyn PluginRegistrar>`).
73+
pub trait PluginRegistrar: Send + Sync + 'static {
74+
/// Loads a plugin from the given dynamic `library`.
///
/// # Safety
75+
/// NOTE(review): the caller's obligations are not spelled out in this file. Presumably
/// `library` must be a plugin library built with the same rustc and core versions as
/// the host (see `PluginDeclaration`) -- confirm against the implementations.
76+
unsafe fn load(&mut self, library: Arc<Library>) -> Result<()>;
77+
78+
/// Returns the plugin as [`Any`](std::any::Any) so that it can be
79+
/// downcast to a specific implementation.
80+
fn as_any(&self) -> &dyn Any;
81+
}
82+
83+
/// Declares a plugin's `PluginDeclaration`.
///
/// `$plugin_type` is an expression of type `PluginEnum`; it becomes the body of the
/// generated `get_plugin_type` function.
84+
///
85+
/// # Notes
86+
///
87+
/// This works by automatically generating an `extern "C"` function named `get_plugin_type` and a
88+
/// `#[no_mangle]` static named `plugin_declaration`, both with a pre-defined signature and symbol name.
/// The static records this crate's `RUSTC_VERSION` and `CORE_VERSION` together with `get_plugin_type`.
89+
/// Because the symbol names are fixed, you will only be able to declare one plugin per library.
90+
#[macro_export]
91+
macro_rules! declare_plugin {
92+
($plugin_type:expr) => {
93+
#[no_mangle]
94+
pub extern "C" fn get_plugin_type() -> $crate::plugin::PluginEnum {
95+
$plugin_type
96+
}
97+
98+
#[no_mangle]
99+
pub static plugin_declaration: $crate::plugin::PluginDeclaration =
100+
$crate::plugin::PluginDeclaration {
101+
rustc_version: $crate::plugin::RUSTC_VERSION,
102+
core_version: $crate::plugin::CORE_VERSION,
103+
plugin_type: get_plugin_type,
104+
};
105+
};
106+
}
107+
108+
/// Returns the plugin directory, derived from the location of the running executable.
///
/// * If the executable path contains a `/deps/` segment, e.g.
///   `/Users/xxx/workspace/rust/rust_plugin_sty/target/debug/deps/plugins_app-067452b3ff2af70e`,
///   the result is the path up to and including that segment:
///   `/Users/xxx/workspace/rust/rust_plugin_sty/target/debug/deps/` (note the trailing `/`).
/// * Otherwise, e.g. `/Users/xxx/workspace/rust/rust_plugin_sty/target/debug/plugins_app`,
///   the result is the executable's parent directory:
///   `/Users/xxx/workspace/rust/rust_plugin_sty/target/debug`.
///
/// Returns an empty string, rather than panicking, when the executable path cannot be
/// determined or has no parent directory.
pub fn plugin_dir() -> String {
    // The `/deps/` detection is done on the displayed path and is therefore specific to
    // `/`-separated paths; other paths fall through to the parent-directory branch.
    const DEPS_SEGMENT: &str = "/deps/";

    let exe_path = match env::current_exe() {
        Ok(path) => path,
        Err(_) => return String::new(),
    };
    let exe_str = exe_path.display().to_string();

    if let Some(start) = exe_str.find(DEPS_SEGMENT) {
        // Keep everything up to and including the first `/deps/`.
        return exe_str[..start + DEPS_SEGMENT.len()].to_string();
    }

    exe_path
        .parent()
        .map(|dir| dir.display().to_string())
        .unwrap_or_default()
}

0 commit comments

Comments
 (0)