diff --git a/Cargo.lock b/Cargo.lock index 6e3399dfb0d..6475581beba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3270,6 +3270,7 @@ dependencies = [ "async-trait", "influxdb3_catalog", "influxdb3_wal", + "log", "parking_lot", "pyo3", "schema", @@ -3304,6 +3305,7 @@ dependencies = [ "influxdb-line-protocol", "influxdb3_cache", "influxdb3_catalog", + "influxdb3_client", "influxdb3_id", "influxdb3_process", "influxdb3_sys_events", @@ -3328,6 +3330,7 @@ dependencies = [ "parquet_file", "pin-project-lite", "pretty_assertions", + "pyo3", "schema", "secrecy", "serde", @@ -3457,6 +3460,7 @@ dependencies = [ "influxdb-line-protocol", "influxdb3_cache", "influxdb3_catalog", + "influxdb3_client", "influxdb3_id", "influxdb3_py_api", "influxdb3_telemetry", diff --git a/influxdb3/Cargo.toml b/influxdb3/Cargo.toml index 457163c4541..43da370c3bc 100644 --- a/influxdb3/Cargo.toml +++ b/influxdb3/Cargo.toml @@ -74,7 +74,7 @@ tokio_console = ["console-subscriber", "tokio/tracing", "observability_deps/rele # Use jemalloc as the default allocator. 
jemalloc_replacing_malloc = ["influxdb3_process/jemalloc_replacing_malloc"] -system-py = ["influxdb3_write/system-py"] +system-py = ["influxdb3_write/system-py", "influxdb3_server/system-py"] [dev-dependencies] # Core Crates diff --git a/influxdb3/src/commands/plugin_test/mod.rs b/influxdb3/src/commands/plugin_test/mod.rs new file mode 100644 index 00000000000..45ad961b397 --- /dev/null +++ b/influxdb3/src/commands/plugin_test/mod.rs @@ -0,0 +1,21 @@ +use std::error::Error; + +pub mod wal; + +#[derive(Debug, clap::Parser)] +pub(crate) struct Config { + #[clap(subcommand)] + command: Command, +} + +#[derive(Debug, clap::Parser)] +enum Command { + /// Test a plugin triggered by WAL writes + Wal(wal::Config), +} + +pub(crate) async fn command(config: Config) -> Result<(), Box> { + match config.command { + Command::Wal(config) => wal::command(config).await, + } +} diff --git a/influxdb3/src/commands/plugin_test/wal.rs b/influxdb3/src/commands/plugin_test/wal.rs new file mode 100644 index 00000000000..46b429da29c --- /dev/null +++ b/influxdb3/src/commands/plugin_test/wal.rs @@ -0,0 +1,68 @@ +use std::error::Error; +use secrecy::ExposeSecret; +use influxdb3_client::plugin_test::WalPluginTestRequest; +use crate::commands::common::InfluxDb3Config; + +#[derive(Debug, clap::Parser)] +pub struct Config { + #[clap(flatten)] + influxdb3_config: InfluxDb3Config, + + #[clap(flatten)] + wal_plugin_test: WalPluginTest, +} + +#[derive(Debug, clap::Parser)] +pub struct WalPluginTest { + /// The name of the plugin, which should match its file name on the server /.py + #[clap(short = 'n', long = "name")] + pub name: String, + /// If given, pass this line protocol as input + #[clap(long = "lp")] + pub input_lp: Option, + /// If given, pass this file of LP as input from on the server /_test/ + #[clap(long = "file")] + pub input_file: Option, + /// If given, save the output to this file on the server in /_test/ + #[clap(long = "save-output-to-file")] + pub save_output_to_file: Option, + 
/// If given, validate the output against this file on the server in /_test/ + #[clap(long = "validate-output-file")] + pub validate_output_file: Option, +} + +impl Into for WalPluginTest { + fn into(self) -> WalPluginTestRequest { + WalPluginTestRequest { + name: self.name, + input_lp: self.input_lp, + input_file: self.input_file, + save_output_to_file: self.save_output_to_file, + validate_output_file: self.validate_output_file, + } + } +} + +pub(super) async fn command(config: Config) -> Result<(), Box> { + let InfluxDb3Config { + host_url, + auth_token, + .. + } = config.influxdb3_config; + + let wal_plugin_test_request: WalPluginTestRequest = config.wal_plugin_test.into(); + + let mut client = influxdb3_client::Client::new(host_url)?; + if let Some(t) = auth_token { + client = client.with_auth_token(t.expose_secret()); + } + let response = client.wal_plugin_test(wal_plugin_test_request).await?; + + // pretty print the response + println!( + "RESPONSE:\n{}", + serde_json::to_string_pretty(&response).expect("serialize wal plugin test response as JSON") + ); + + Ok(()) +} diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index bc1a433b6f4..1b85e315370 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -38,6 +38,7 @@ use panic_logging::SendPanicsToTracing; use parquet_file::storage::{ParquetStorage, StorageId}; use std::{collections::HashMap, path::Path, str::FromStr}; use std::{num::NonZeroUsize, sync::Arc}; +use std::path::PathBuf; use thiserror::Error; use tokio::net::TcpListener; use tokio::time::Instant; @@ -293,6 +294,14 @@ pub struct Config { action )] pub meta_cache_eviction_interval: humantime::Duration, + + /// The local directory that has python plugins and their test files. 
+ #[clap( + long = "plugin-dir", + env = "INFLUXDB3_PLUGIN_DIR", + action + )] + pub plugin_dir: Option, } /// Specified size of the Parquet cache in megabytes (MB) @@ -485,6 +494,7 @@ pub async fn command(config: Config) -> Result<()> { wal_config, parquet_cache, metric_registry: Arc::clone(&metrics), + plugin_dir: config.plugin_dir, }) .await .map_err(|e| Error::WriteBufferInit(e.into()))?; diff --git a/influxdb3/src/main.rs b/influxdb3/src/main.rs index 393d8ba9174..ba1710c4929 100644 --- a/influxdb3/src/main.rs +++ b/influxdb3/src/main.rs @@ -29,6 +29,7 @@ mod commands { pub mod serve; pub mod token; pub mod write; + pub mod plugin_test; } enum ReturnCode { @@ -101,6 +102,9 @@ enum Command { /// Manage table (delete only for the moment) Table(commands::manage::table::Config), + + /// Test Python plugins for processing WAL writes, persistence Snapshots, requests, or scheduled tasks. + PluginTest(commands::plugin_test::Config), } fn main() -> Result<(), std::io::Error> { @@ -177,6 +181,12 @@ fn main() -> Result<(), std::io::Error> { std::process::exit(ReturnCode::Failure as _) } } + Some(Command::PluginTest(config)) => { + if let Err(e) = commands::plugin_test::command(config).await { + eprintln!("Plugin Test command failed: {e}"); + std::process::exit(ReturnCode::Failure as _) + } + } } }); diff --git a/influxdb3_client/Cargo.toml b/influxdb3_client/Cargo.toml index d3dd6211cdb..5e7d53cb37a 100644 --- a/influxdb3_client/Cargo.toml +++ b/influxdb3_client/Cargo.toml @@ -14,6 +14,7 @@ bytes.workspace = true reqwest.workspace = true secrecy.workspace = true serde.workspace = true +serde_json.workspace = true thiserror.workspace = true url.workspace = true diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs index dc7b851a390..9fafbbc4c14 100644 --- a/influxdb3_client/src/lib.rs +++ b/influxdb3_client/src/lib.rs @@ -1,3 +1,5 @@ +pub mod plugin_test; + use std::{ collections::HashMap, fmt::Display, num::NonZeroUsize, string::FromUtf8Error, 
time::Duration, }; @@ -8,6 +10,7 @@ use reqwest::{Body, IntoUrl, Method, StatusCode}; use secrecy::{ExposeSecret, Secret}; use serde::{Deserialize, Serialize}; use url::Url; +use crate::plugin_test::{WalPluginTestRequest, WalPluginTestResponse}; /// Primary error type for the [`Client`] #[derive(Debug, thiserror::Error)] @@ -466,6 +469,32 @@ impl Client { } } + /// Make a request to the `POST /api/v3/plugin_test/wal` API + pub async fn wal_plugin_test(&self, wal_plugin_test_request: WalPluginTestRequest) -> Result { + let api_path = "/api/v3/plugin_test/wal"; + + let url = self.base_url.join(api_path)?; + + let mut req = self.http_client.post(url).json(&wal_plugin_test_request); + + if let Some(token) = &self.auth_token { + req = req.bearer_auth(token.expose_secret()); + } + let resp = req + .send() + .await + .map_err(|src| Error::request_send(Method::POST, api_path, src))?; + + if resp.status().is_success() { + resp.json().await.map_err(Error::Json) + } else { + Err(Error::ApiError { + code: resp.status(), + message: resp.text().await.map_err(Error::Text)?, + }) + } + } + /// Send a `/ping` request to the target `influxdb3` server to check its /// status and gather `version` and `revision` information pub async fn ping(&self) -> Result { diff --git a/influxdb3_client/src/plugin_test.rs b/influxdb3_client/src/plugin_test.rs new file mode 100644 index 00000000000..8c60d01b7a2 --- /dev/null +++ b/influxdb3_client/src/plugin_test.rs @@ -0,0 +1,22 @@ +//! 
Request structs for the /api/v3/plugin_test API + +use std::collections::HashMap; +use serde::{Deserialize, Serialize}; + +/// Request definition for `POST /api/v3/plugin_test/wal` API +#[derive(Debug, Serialize, Deserialize)] +pub struct WalPluginTestRequest { + pub name: String, + pub input_lp: Option, + pub input_file: Option, + pub save_output_to_file: Option, + pub validate_output_file: Option, +} + +/// Response definition for `POST /api/v3/plugin_test/wal` API +#[derive(Debug, Serialize, Deserialize)] +pub struct WalPluginTestResponse { + pub log_lines: Vec, + pub database_writes: HashMap>, + pub errors: Vec, +} \ No newline at end of file diff --git a/influxdb3_py_api/Cargo.toml b/influxdb3_py_api/Cargo.toml index a76817489e8..c7fdea35d65 100644 --- a/influxdb3_py_api/Cargo.toml +++ b/influxdb3_py_api/Cargo.toml @@ -14,6 +14,7 @@ influxdb3_catalog = {path = "../influxdb3_catalog"} async-trait.workspace = true schema.workspace = true parking_lot.workspace = true +log = "0.4.22" [dependencies.pyo3] version = "0.23.3" diff --git a/influxdb3_py_api/src/lib.rs b/influxdb3_py_api/src/lib.rs index efb3333444b..3c9980d6a3a 100644 --- a/influxdb3_py_api/src/lib.rs +++ b/influxdb3_py_api/src/lib.rs @@ -1,2 +1,2 @@ #[cfg(feature = "system-py")] -pub mod system_py; +pub mod system_py; \ No newline at end of file diff --git a/influxdb3_py_api/src/system_py.rs b/influxdb3_py_api/src/system_py.rs index 2c0b7e1caf7..ce2b81d071d 100644 --- a/influxdb3_py_api/src/system_py.rs +++ b/influxdb3_py_api/src/system_py.rs @@ -1,12 +1,13 @@ -use influxdb3_catalog::catalog::{DatabaseSchema, TableDefinition}; +use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition}; use influxdb3_wal::{FieldData, Row, WriteBatch}; use parking_lot::Mutex; use pyo3::exceptions::PyValueError; use pyo3::prelude::{PyAnyMethods, PyModule, PyModuleMethods}; -use pyo3::{pyclass, pymethods, pymodule, Bound, IntoPyObject, PyErr, PyObject, PyResult, Python}; -use schema::InfluxColumnType; +use 
pyo3::{pyclass, pymethods, pymodule, Bound, IntoPy, IntoPyObject, PyErr, PyObject, PyResult, Python}; +use schema::{InfluxColumnType, Schema}; use std::ffi::CString; use std::sync::Arc; +use pyo3::types::{PyDict, PyList}; #[pyclass] #[derive(Debug)] @@ -183,6 +184,131 @@ impl PyWriteBatch { } } +#[pyclass] +#[derive(Debug)] +struct PyPluginCallApi { + schema: Arc, + catalog: Arc, + return_state: Arc>, +} + +#[derive(Debug, Default)] +struct ReturnState { + log_lines: Vec, +} + +enum LogLine { + Info(String), + Warn(String), + Error(String), +} + +impl std::fmt::Display for LogLine { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + LogLine::Info(s) => write!(f, "INFO: {}", s), + LogLine::Warn(s) => write!(f, "WARN: {}", s), + LogLine::Error(s) => write!(f, "ERROR: {}", s), + } + } +} + +impl std::fmt::Debug for LogLine { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(self, f) + } +} + + +#[pymethods] +impl PyPluginCallApi { + fn info(&self, line: &str) -> PyResult<()> { + self.return_state.lock().log_lines.push(LogLine::Info(line.to_string())); + Ok(()) + } + + fn warn(&self, line: &str) -> PyResult<()> { + self.return_state.lock().log_lines.push(LogLine::Warn(line.to_string())); + Ok(()) + } + + fn error(&self, line: &str) -> PyResult<()> { + self.return_state.lock().log_lines.push(LogLine::Error(line.to_string())); + Ok(()) + } +} + +pub fn execute_python_with_batch(code: &str, write_batch: &WriteBatch, schema: Arc, catalog: Arc) -> PyResult> { + Python::with_gil(|py| { + let mut table_batches = Vec::with_capacity(write_batch.table_chunks.len()); + + for (table_id, table_chunks) in &write_batch.table_chunks { + let table_def = schema.tables.get(table_id).unwrap(); + + let dict = PyDict::new(py); + dict.set_item("table_name", table_def.table_name.as_ref()).unwrap(); + + let mut rows: Vec = Vec::new(); + for chunk in table_chunks.chunk_time_to_chunk.values() { + for row in 
&chunk.rows { + let py_row = PyDict::new(py); + py_row.set_item("time", row.time).unwrap(); + let mut fields = Vec::with_capacity(row.fields.len()); + for field in &row.fields { + let field_data = match &field.value { + FieldData::String(s) => s.into_py(py), + FieldData::Integer(i) => i.into_py(py), + FieldData::UInteger(u) => u.into_py(py), + FieldData::Float(f) => f.into_py(py), + FieldData::Boolean(b) => b.into_py(py), + FieldData::Tag(t) => t.into_py(py), + FieldData::Key(k) => k.into_py(py), + FieldData::Timestamp(ts) => ts.into_py(py), + }; + let field_name = table_def.column_id_to_name(&field.id).unwrap(); + if field_name.as_ref() == "time" { + continue; + } + + let field = PyDict::new(py); + field.set_item("name", field_name.as_ref()).unwrap(); + field.set_item("value", field_data).unwrap(); + fields.push(field.unbind()); + } + let fields = PyList::new(py, fields).unwrap(); + py_row.set_item("fields", fields.unbind()).unwrap(); + + rows.push(py_row.into()); + } + } + + let rows = PyList::new(py, rows).unwrap(); + + dict.set_item("rows", rows.unbind()).unwrap(); + table_batches.push(dict); + } + + let py_batches = PyList::new(py, table_batches).unwrap(); + + let locals = PyDict::new(py); + locals.set_item("table_batches", py_batches.unbind()).unwrap(); + + let api = PyPluginCallApi { + schema, + catalog, + return_state: Default::default(), + }; + let return_state = Arc::clone(&api.return_state); + locals.set_item("api", api.into_py(py)).unwrap(); + + py.run(&CString::new(code).unwrap(), None, Some(&locals)).unwrap(); + + let log_lines = return_state.lock().log_lines.iter().map(|line| line.to_string()).collect(); + + Ok(log_lines) + }) +} + // Module initialization #[pymodule] fn influxdb3_py_api(m: &Bound<'_, PyModule>) -> PyResult<()> { diff --git a/influxdb3_server/Cargo.toml b/influxdb3_server/Cargo.toml index ffd3fa8b2f0..c1aa58fb96b 100644 --- a/influxdb3_server/Cargo.toml +++ b/influxdb3_server/Cargo.toml @@ -32,6 +32,7 @@ tracker.workspace = true # 
Local Deps influxdb3_cache = { path = "../influxdb3_cache" } influxdb3_catalog = { path = "../influxdb3_catalog" } +influxdb3_client = { path = "../influxdb3_client" } influxdb3_id = { path = "../influxdb3_id" } influxdb3_process = { path = "../influxdb3_process", default-features = false } influxdb3_wal = { path = "../influxdb3_wal"} @@ -75,6 +76,15 @@ tonic.workspace = true tower.workspace = true unicode-segmentation.workspace = true +[dependencies.pyo3] +version = "0.23.3" +# this is necessary to automatically initialize the Python interpreter +features = ["auto-initialize"] +optional = true + +[features] +system-py = ["pyo3"] + [dev-dependencies] # Core Crates parquet.workspace = true diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 5624115cb43..bb74558f9aa 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -49,6 +49,7 @@ use std::sync::Arc; use std::time::Duration; use thiserror::Error; use unicode_segmentation::UnicodeSegmentation; +use influxdb3_client::plugin_test::WalPluginTestRequest; mod v1; @@ -206,6 +207,9 @@ pub enum Error { #[error(transparent)] Catalog(#[from] CatalogError), + + #[error("Python plugins not enabled on this server")] + PythonPluginsNotEnabled, } #[derive(Debug, Error)] @@ -1030,6 +1034,24 @@ where .unwrap()) } + /// Endpoint for testing a plugin that will be trigger on WAL writes. + #[cfg(feature = "system-py")] + async fn test_processing_engine_wal_plugin(&self, req: Request) -> Result> { + let request: WalPluginTestRequest = self.read_body_json(req).await?; + + let output = self.write_buffer.test_wal_plugin(request).await?; + let body = serde_json::to_string(&output)?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(body))?) 
+ } + + #[cfg(not(feature = "system-py"))] + async fn test_processing_engine_wal_plugin(&self, _req: Request) -> Result> { + Err(Error::PythonPluginsNotEnabled) + } + async fn delete_database(&self, req: Request) -> Result> { let query = req.uri().query().unwrap_or(""); let delete_req = serde_urlencoded::from_str::(query)?; @@ -1558,6 +1580,7 @@ pub(crate) async fn route_request( (Method::POST, "/api/v3/configure/table") => http_server.create_table(req).await, // TODO: make table delete to use path param (DELETE db/foodb/table/bar) (Method::DELETE, "/api/v3/configure/table") => http_server.delete_table(req).await, + (Method::POST, "/api/v3/plugin_test/wal") => http_server.test_processing_engine_wal_plugin(req).await, _ => { let body = Body::from("not found"); Ok(Response::builder() diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml index 6d0fa9b45c0..22a7f6ee171 100644 --- a/influxdb3_write/Cargo.toml +++ b/influxdb3_write/Cargo.toml @@ -26,6 +26,7 @@ schema.workspace = true # Local deps influxdb3_cache = { path = "../influxdb3_cache" } influxdb3_catalog = { path = "../influxdb3_catalog" } +influxdb3_client = { path = "../influxdb3_client" } influxdb3_id = { path = "../influxdb3_id" } influxdb3_test_helpers = { path = "../influxdb3_test_helpers" } influxdb3_wal = { path = "../influxdb3_wal" } diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 7fcc64e414d..b79bf100821 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -8,6 +8,8 @@ pub mod queryable_buffer; mod table_buffer; pub mod validator; +use std::fmt::format; +use std::path::PathBuf; use crate::persister::Persister; use crate::write_buffer::persisted_files::PersistedFiles; use crate::write_buffer::queryable_buffer::QueryableBuffer; @@ -58,6 +60,7 @@ use queryable_buffer::QueryableBufferArgs; use schema::Schema; use std::sync::Arc; use std::time::Duration; +use anyhow::Context; use 
thiserror::Error; use tokio::sync::watch::Receiver; @@ -66,6 +69,7 @@ use { crate::write_buffer::plugins::PluginContext, influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound, }; +use influxdb3_client::plugin_test::{WalPluginTestRequest, WalPluginTestResponse}; #[derive(Debug, Error)] pub enum Error { @@ -133,6 +137,12 @@ pub enum Error { #[error("error in metadata cache: {0}")] MetaCacheError(#[from] meta_cache::ProviderError), + + #[error("error: {0}")] + AnyhowError(#[from] anyhow::Error), + + #[error("reading plugin file: {0}")] + ReadPluginError(#[from] std::io::Error), } pub type Result = std::result::Result; @@ -160,6 +170,7 @@ pub struct WriteBufferImpl { metrics: WriteMetrics, meta_cache: Arc, last_cache: Arc, + plugin_dir: Option, } /// The maximum number of snapshots to load on start @@ -176,6 +187,7 @@ pub struct WriteBufferImplArgs { pub wal_config: WalConfig, pub parquet_cache: Option>, pub metric_registry: Arc, + pub plugin_dir: Option, } impl WriteBufferImpl { @@ -190,6 +202,7 @@ impl WriteBufferImpl { wal_config, parquet_cache, metric_registry, + plugin_dir, }: WriteBufferImplArgs, ) -> Result> { // load snapshots and replay the wal into the in memory buffer @@ -248,6 +261,7 @@ impl WriteBufferImpl { persisted_files, buffer: queryable_buffer, metrics: WriteMetrics::new(&metric_registry), + plugin_dir, }); let write_buffer: Arc = result.clone(); let triggers = result.catalog().triggers(); @@ -407,6 +421,13 @@ impl WriteBufferImpl { Ok(chunks) } + + + fn read_plugin_code(&self, name: &str) -> Result { + let plugin_dir = self.plugin_dir.clone().context("plugin dir not set")?; + let path = plugin_dir.join(format!("{}.py", name)); + Ok(std::fs::read_to_string(path)?) 
+ } } pub fn parquet_chunk_from_file( @@ -954,6 +975,20 @@ impl ProcessingEngineManager for WriteBufferImpl { Ok(()) } + + #[cfg_attr(not(feature = "system-py"), allow(unused))] + async fn test_wal_plugin(&self, request: WalPluginTestRequest) -> crate::Result { + // create a copy of the catalog so we don't modify the original + let catalog = Arc::new(Catalog::from_inner(self.catalog.clone_inner())); + let now = self.time_provider.now(); + + let code = self.read_plugin_code(&request.name)?; + + #[cfg(feature = "system-py")] + return Ok(plugins::run_test_wal_plugin(now, catalog, code, request).unwrap()); + + Err(Error::AnyhowError(anyhow::anyhow!("system-py feature not enabled"))) + } } #[derive(Clone)] @@ -1042,6 +1077,7 @@ mod tests { wal_config: WalConfig::test_config(), parquet_cache: Some(Arc::clone(&parquet_cache)), metric_registry: Default::default(), + plugin_dir: None, }) .await .unwrap(); @@ -1126,6 +1162,7 @@ mod tests { }, parquet_cache: Some(Arc::clone(&parquet_cache)), metric_registry: Default::default(), + plugin_dir: None, }) .await .unwrap(); @@ -1194,6 +1231,7 @@ mod tests { }, parquet_cache: wbuf.parquet_cache.clone(), metric_registry: Default::default(), + plugin_dir: None, }) .await .unwrap() @@ -1421,6 +1459,7 @@ mod tests { }, parquet_cache: write_buffer.parquet_cache.clone(), metric_registry: Default::default(), + plugin_dir: None, }) .await .unwrap(); @@ -2661,6 +2700,7 @@ mod tests { wal_config, parquet_cache, metric_registry: Arc::clone(&metric_registry), + plugin_dir: None, }) .await .unwrap(); diff --git a/influxdb3_write/src/write_buffer/plugins.rs b/influxdb3_write/src/write_buffer/plugins.rs index 0ba393c5b7f..fdb9e6f9d36 100644 --- a/influxdb3_write/src/write_buffer/plugins.rs +++ b/influxdb3_write/src/write_buffer/plugins.rs @@ -1,11 +1,18 @@ use crate::write_buffer::PluginEvent; -use crate::{write_buffer, WriteBuffer}; -use influxdb3_wal::{PluginType, TriggerDefinition, TriggerSpecificationDefinition}; +use crate::{write_buffer, 
Precision, WriteBuffer}; +use influxdb3_wal::{Gen1Duration, PluginType, TriggerDefinition, TriggerSpecificationDefinition}; use observability_deps::tracing::warn; use std::fmt::Debug; +use std::path::PathBuf; use std::sync::Arc; +use anyhow::Context; +use data_types::NamespaceName; +use iox_time::Time; use thiserror::Error; use tokio::sync::broadcast::error::RecvError; +use influxdb3_catalog::catalog::Catalog; +use influxdb3_client::plugin_test::{WalPluginTestRequest, WalPluginTestResponse}; +use crate::write_buffer::validator::WriteValidator; #[derive(Debug, Error)] pub enum Error { @@ -18,6 +25,9 @@ pub enum Error { #[error(transparent)] WriteBufferError(#[from] write_buffer::Error), + + #[error(transparent)] + AnyhowError(#[from] anyhow::Error), } /// `[ProcessingEngineManager]` is used to interact with the processing engine, @@ -50,6 +60,11 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static { db_name: &str, trigger_name: &str, ) -> crate::Result<(), write_buffer::Error>; + + async fn test_wal_plugin( + &self, + request: WalPluginTestRequest, + ) -> crate::Result; } #[cfg(feature = "system-py")] @@ -188,3 +203,57 @@ mod python_plugin { } } } + +#[cfg(feature = "system-py")] +pub(crate) fn run_test_wal_plugin(now_time: Time, catalog: Arc, code: String, request: WalPluginTestRequest) -> Result { + // parse the lp into a write batch + let namespace = NamespaceName::new("_testdb").unwrap(); + let validator = WriteValidator::initialize(namespace, Arc::clone(&catalog), now_time.timestamp_nanos())?; + let data = validator.v1_parse_lines_and_update_schema(&request.input_lp.unwrap(), false, now_time, Precision::Nanosecond)?; + let data = data.convert_lines_to_buffer(Gen1Duration::new_1m()); + let db = catalog.db_schema("_testdb").unwrap(); + + let log_lines = influxdb3_py_api::system_py::execute_python_with_batch(&code, &data.valid_data, db, catalog)?; + Ok(WalPluginTestResponse{ + log_lines, + database_writes: Default::default(), + errors: vec![], + }) 
+} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_wal_plugin() { + let now = Time::from_timestamp_nanos(1); + let catalog = Catalog::new("foo".into(), "bar".into()); + let code = r#" +for table_batch in table_batches: + api.info("table: " + table_batch["table_name"]) + + for row in table_batch["rows"]: + api.info("row: " + str(row)) + +api.info("done") + ""#; + + let request = WalPluginTestRequest { + name: "test".into(), + input_lp: Some("cpu,host=A,region=west usage=1i,system=23.2 100".into()), + input_file: None, + save_output_to_file: None, + validate_output_file: None, + }; + + let response = run_test_wal_plugin(now, Arc::new(catalog), code.to_string(), request).unwrap(); + + let expected_log_lines = vec![ + "table: cpu", + "row: {\"host\": \"A\", \"region\": \"west\", \"usage\": 1, \"system\": 23.2, \"_time\": 100}", + "done", + ]; + assert_eq!(response.log_lines, expected_log_lines); + } +} \ No newline at end of file