From 29dacc318a68e08bb6c03e99ba7990e2a2ad4d2f Mon Sep 17 00:00:00 2001 From: Jackson Newhouse Date: Thu, 2 Jan 2025 09:23:18 -0800 Subject: [PATCH] feat(processing_engine): Add REST API endpoints for activating and deactivating triggers. (#25711) --- influxdb3_catalog/src/catalog.rs | 110 +++- influxdb3_catalog/src/serialize.rs | 3 + influxdb3_server/src/http.rs | 76 ++- .../src/system_tables/python_call.rs | 9 +- influxdb3_wal/src/lib.rs | 17 +- influxdb3_wal/src/object_store.rs | 8 +- influxdb3_write/src/write_buffer/mod.rs | 523 +++++++++++++++++- influxdb3_write/src/write_buffer/plugins.rs | 57 +- .../src/write_buffer/queryable_buffer.rs | 74 ++- 9 files changed, 814 insertions(+), 63 deletions(-) diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index da00824fa41..91711916743 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -9,9 +9,10 @@ use hashbrown::HashMap; use indexmap::IndexMap; use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId}; use influxdb3_wal::{ - CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeleteTableDefinition, FieldAdditions, - FieldDefinition, LastCacheDefinition, LastCacheDelete, MetaCacheDefinition, MetaCacheDelete, - OrderedCatalogBatch, PluginDefinition, TriggerDefinition, + CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeletePluginDefinition, + DeleteTableDefinition, FieldAdditions, FieldDefinition, LastCacheDefinition, LastCacheDelete, + MetaCacheDefinition, MetaCacheDelete, OrderedCatalogBatch, PluginDefinition, TriggerDefinition, + TriggerIdentifier, }; use influxdb_line_protocol::FieldValue; use iox_time::Time; @@ -99,6 +100,18 @@ pub enum Error { trigger_name: String, }, + #[error( + "Cannot delete plugin {} in database {} because it is used by trigger {}", + plugin_name, + database_name, + trigger_name + )] + ProcessingEnginePluginInUse { + database_name: String, + plugin_name: String, + trigger_name: String, + }, + #[error( "Processing Engine Plugin {} not in DB schema for {}", plugin_name, @@ -317,8 +330,14 @@ impl Catalog { .flat_map(|schema| { schema .processing_engine_triggers - .keys() - .map(move |key| (schema.name.to_string(), key.to_string())) + .iter() + .filter_map(move |(key, trigger)| { + if trigger.disabled { + None + } else { + Some((schema.name.to_string(), key.to_string())) + } + }) }) .collect(); result @@ -692,8 +711,15 @@ impl UpdateDatabaseSchema for CatalogOp { } CatalogOp::DeleteDatabase(delete_database) => delete_database.update_schema(schema), CatalogOp::DeleteTable(delete_table) => delete_table.update_schema(schema), + CatalogOp::DeletePlugin(delete_plugin) => delete_plugin.update_schema(schema), CatalogOp::CreatePlugin(create_plugin) => create_plugin.update_schema(schema), CatalogOp::CreateTrigger(create_trigger) => create_trigger.update_schema(schema), + CatalogOp::EnableTrigger(trigger_identifier) => { + EnableTrigger(trigger_identifier.clone()).update_schema(schema) + } + CatalogOp::DisableTrigger(trigger_identifier) => { + DisableTrigger(trigger_identifier.clone()).update_schema(schema) + } } } } @@ -760,6 +786,29 @@ impl UpdateDatabaseSchema for DeleteTableDefinition { } } +impl UpdateDatabaseSchema for DeletePluginDefinition { + fn update_schema<'a>( + &self, + mut schema: Cow<'a, DatabaseSchema>, + ) -> Result> { + // check that there aren't any triggers with this name. 
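+ // (more precisely: refuse to delete a plugin that any trigger still references;
+ // the caller gets ProcessingEnginePluginInUse back and must drop the trigger first)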
+ for (trigger_name, trigger) in &schema.processing_engine_triggers { + if trigger.plugin_name == self.plugin_name { + return Err(Error::ProcessingEnginePluginInUse { + database_name: schema.name.to_string(), + plugin_name: self.plugin_name.to_string(), + trigger_name: trigger_name.to_string(), + }); + } + } + schema + .to_mut() + .processing_engine_plugins + .remove(&self.plugin_name); + Ok(schema) + } +} + impl UpdateDatabaseSchema for PluginDefinition { fn update_schema<'a>( &self, @@ -785,6 +834,57 @@ impl UpdateDatabaseSchema for PluginDefinition { } } +struct EnableTrigger(TriggerIdentifier); +struct DisableTrigger(TriggerIdentifier); + +impl UpdateDatabaseSchema for EnableTrigger { + fn update_schema<'a>( + &self, + mut schema: Cow<'a, DatabaseSchema>, + ) -> Result> { + let Some(trigger) = schema.processing_engine_triggers.get(&self.0.trigger_name) else { + return Err(Error::ProcessingEngineTriggerNotFound { + database_name: self.0.db_name.to_string(), + trigger_name: self.0.trigger_name.to_string(), + }); + }; + if !trigger.disabled { + return Ok(schema); + } + let mut_trigger = schema + .to_mut() + .processing_engine_triggers + .get_mut(&self.0.trigger_name) + .expect("already checked containment"); + mut_trigger.disabled = false; + Ok(schema) + } +} + +impl UpdateDatabaseSchema for DisableTrigger { + fn update_schema<'a>( + &self, + mut schema: Cow<'a, DatabaseSchema>, + ) -> Result> { + let Some(trigger) = schema.processing_engine_triggers.get(&self.0.trigger_name) else { + return Err(Error::ProcessingEngineTriggerNotFound { + database_name: self.0.db_name.to_string(), + trigger_name: self.0.trigger_name.to_string(), + }); + }; + if trigger.disabled { + return Ok(schema); + } + let mut_trigger = schema + .to_mut() + .processing_engine_triggers + .get_mut(&self.0.trigger_name) + .expect("already checked containment"); + mut_trigger.disabled = true; + Ok(schema) + } +} + impl UpdateDatabaseSchema for TriggerDefinition { fn update_schema<'a>( &self, diff --git a/influxdb3_catalog/src/serialize.rs b/influxdb3_catalog/src/serialize.rs index 738ae3b76b6..a6b6fbcfb3d 100644 --- a/influxdb3_catalog/src/serialize.rs +++ b/influxdb3_catalog/src/serialize.rs @@ -105,6 +105,7 @@ impl From for DatabaseSchema { plugin_name: plugin.plugin_name.to_string(), plugin, trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(), + disabled: trigger.disabled, }, ) }) @@ -171,6 +172,7 @@ struct ProcessingEngineTriggerSnapshot { pub trigger_name: String, pub plugin_name: String, pub trigger_specification: String, + pub disabled: bool, } /// Representation of Arrow's `DataType` for table snapshots. @@ -434,6 +436,7 @@ impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot { plugin_name: trigger.plugin_name.to_string(), trigger_specification: serde_json::to_string(&trigger.trigger) .expect("should be able to serialize trigger specification"), + disabled: trigger.disabled, } } } diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index ef0ac007141..8b70b6ea9ef 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -973,6 +973,19 @@ where .body(Body::empty())?) } + async fn delete_processing_engine_plugin(&self, req: Request) -> Result> { + let ProcessingEnginePluginDeleteRequest { db, plugin_name } = + if let Some(query) = req.uri().query() { + serde_urlencoded::from_str(query)? + } else { + self.read_body_json(req).await? 
+ }; + self.write_buffer.delete_plugin(&db, &plugin_name).await?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) + } + async fn configure_processing_engine_trigger( &self, req: Request, @@ -982,6 +995,7 @@ where plugin_name, trigger_name, trigger_specification, + disabled, } = if let Some(query) = req.uri().query() { serde_urlencoded::from_str(query)? } else { @@ -993,13 +1007,47 @@ where trigger_name.clone(), plugin_name, trigger_specification, + disabled, ) .await?; + if !disabled { + self.write_buffer + .run_trigger( + Arc::clone(&self.write_buffer), + db.as_str(), + trigger_name.as_str(), + ) + .await?; + } + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) + } + + async fn deactivate_processing_engine_trigger( + &self, + req: Request, + ) -> Result> { + let query = req.uri().query().unwrap_or(""); + let delete_req = serde_urlencoded::from_str::(query)?; self.write_buffer - .run_trigger( + .deactivate_trigger(delete_req.db.as_str(), delete_req.trigger_name.as_str()) + .await?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) + } + async fn activate_processing_engine_trigger( + &self, + req: Request, + ) -> Result> { + let query = req.uri().query().unwrap_or(""); + let delete_req = serde_urlencoded::from_str::(query)?; + self.write_buffer + .activate_trigger( Arc::clone(&self.write_buffer), - db.as_str(), - trigger_name.as_str(), + delete_req.db.as_str(), + delete_req.trigger_name.as_str(), ) .await?; Ok(Response::builder() @@ -1441,6 +1489,12 @@ struct ProcessingEnginePluginCreateRequest { plugin_type: PluginType, } +#[derive(Debug, Deserialize)] +struct ProcessingEnginePluginDeleteRequest { + db: String, + plugin_name: String, +} + /// Request definition for `POST /api/v3/configure/processing_engine_trigger` API #[derive(Debug, Deserialize)] struct ProcessEngineTriggerCreateRequest { @@ -1448,6 +1502,13 @@ struct ProcessEngineTriggerCreateRequest { plugin_name: String, trigger_name: String, trigger_specification: TriggerSpecificationDefinition, + disabled: bool, +} + +#[derive(Debug, Deserialize)] +struct ProcessingEngineTriggerIdentifier { + db: String, + trigger_name: String, } #[derive(Debug, Deserialize)] @@ -1563,6 +1624,15 @@ pub(crate) async fn route_request( (Method::POST, "/api/v3/configure/processing_engine_plugin") => { http_server.configure_processing_engine_plugin(req).await } + (Method::DELETE, "/api/v3/configure/processing_engine_plugin") => { + http_server.delete_processing_engine_plugin(req).await + } + (Method::POST, "/api/v3/configure/processing_engine_trigger/deactivate") => { + http_server.deactivate_processing_engine_trigger(req).await + } + (Method::POST, "/api/v3/configure/processing_engine_trigger/activate") => { + http_server.activate_processing_engine_trigger(req).await + } (Method::POST, "/api/v3/configure/processing_engine_trigger") => { http_server.configure_processing_engine_trigger(req).await } diff --git a/influxdb3_server/src/system_tables/python_call.rs b/influxdb3_server/src/system_tables/python_call.rs index d56bea42e7e..9ae58292044 100644 --- a/influxdb3_server/src/system_tables/python_call.rs +++ b/influxdb3_server/src/system_tables/python_call.rs @@ -1,4 +1,4 @@ -use arrow_array::{ArrayRef, RecordBatch, StringArray}; +use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use async_trait::async_trait; use datafusion::common::Result; @@ -94,6 +94,7 @@ fn trigger_schema() -> SchemaRef { 
Field::new("trigger_name", DataType::Utf8, false), Field::new("plugin_name", DataType::Utf8, false), Field::new("trigger_specification", DataType::Utf8, false), + Field::new("disabled", DataType::Boolean, false), ]; Schema::new(columns).into() } @@ -124,10 +125,16 @@ impl IoxSystemTable for ProcessingEngineTriggerTable { .iter() .map(|trigger| serde_json::to_string(&trigger.trigger).ok()) .collect::(); + let disabled = self + .triggers + .iter() + .map(|trigger| Some(trigger.disabled)) + .collect::(); let columns: Vec = vec![ Arc::new(trigger_column), Arc::new(plugin_column), Arc::new(specification_column), + Arc::new(disabled), ]; Ok(RecordBatch::try_new(Arc::clone(&self.schema), columns)?) } diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index 532748e8b56..ed523996613 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -106,7 +106,7 @@ pub trait Wal: Debug + Send + Sync + 'static { #[async_trait] pub trait WalFileNotifier: Debug + Send + Sync + 'static { /// Notify the handler that a new WAL file has been persisted with the given contents. - fn notify(&self, write: WalContents); + async fn notify(&self, write: WalContents); /// Notify the handler that a new WAL file has been persisted with the given contents and tell /// it to snapshot the data. The returned receiver will be signalled when the snapshot is complete. @@ -301,7 +301,10 @@ pub enum CatalogOp { DeleteDatabase(DeleteDatabaseDefinition), DeleteTable(DeleteTableDefinition), CreatePlugin(PluginDefinition), + DeletePlugin(DeletePluginDefinition), CreateTrigger(TriggerDefinition), + EnableTrigger(TriggerIdentifier), + DisableTrigger(TriggerIdentifier), } #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] @@ -587,6 +590,11 @@ pub struct PluginDefinition { pub plugin_type: PluginType, } +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] +pub struct DeletePluginDefinition { + pub plugin_name: String, +} + #[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize, Copy)] #[serde(rename_all = "snake_case")] pub enum PluginType { @@ -600,6 +608,13 @@ pub struct TriggerDefinition { pub trigger: TriggerSpecificationDefinition, // TODO: decide whether this should be populated from a reference rather than stored on its own. pub plugin: PluginDefinition, + pub disabled: bool, +} + +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] +pub struct TriggerIdentifier { + pub db_name: String, + pub trigger_name: String, } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index 741911a4abd..22fbc5a5c42 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -134,7 +134,7 @@ impl WalObjectStore { match wal_contents.snapshot { // This branch uses so much time - None => self.file_notifier.notify(wal_contents), + None => self.file_notifier.notify(wal_contents).await, Some(snapshot_details) => { let snapshot_info = { let mut buffer = self.flush_buffer.lock().await; @@ -151,7 +151,7 @@ impl WalObjectStore { if snapshot_details.snapshot_sequence_number <= last_snapshot_sequence_number { // Instead just notify about the WAL, as this snapshot has already been taken // and WAL files may have been cleared. 
- self.file_notifier.notify(wal_contents); + self.file_notifier.notify(wal_contents).await; } else { let snapshot_done = self .file_notifier @@ -297,7 +297,7 @@ impl WalObjectStore { "notify sent to buffer for wal file {}", wal_contents.wal_file_number.as_u64() ); - self.file_notifier.notify(wal_contents); + self.file_notifier.notify(wal_contents).await; None } }; @@ -1100,7 +1100,7 @@ mod tests { #[async_trait] impl WalFileNotifier for TestNotifier { - fn notify(&self, write: WalContents) { + async fn notify(&self, write: WalContents) { self.notified_writes.lock().push(write); } diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 7f390937b1e..29fdbaa8682 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -30,10 +30,9 @@ use influxdb3_cache::last_cache::{self, LastCacheProvider}; use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MetaCacheProvider}; use influxdb3_cache::parquet_cache::ParquetCacheOracle; use influxdb3_catalog::catalog; +use influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; use influxdb3_id::{ColumnId, DbId, TableId}; -use influxdb3_wal::FieldDataType; -use influxdb3_wal::TableDefinition; use influxdb3_wal::{ object_store::WalObjectStore, DeleteDatabaseDefinition, PluginDefinition, PluginType, TriggerDefinition, TriggerSpecificationDefinition, WalContents, @@ -44,6 +43,8 @@ use influxdb3_wal::{ }; use influxdb3_wal::{CatalogOp::CreateLastCache, DeleteTableDefinition}; use influxdb3_wal::{DatabaseDefinition, FieldDefinition}; +use influxdb3_wal::{DeletePluginDefinition, TableDefinition}; +use influxdb3_wal::{FieldDataType, TriggerIdentifier}; use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges}; use iox_query::QueryChunk; use iox_time::{Time, TimeProvider}; @@ -59,13 +60,11 @@ use schema::Schema; use std::sync::Arc; use std::time::Duration; use thiserror::Error; +use tokio::sync::oneshot; use tokio::sync::watch::Receiver; #[cfg(feature = "system-py")] -use { - crate::write_buffer::plugins::PluginContext, - influxdb3_catalog::catalog::Error::ProcessingEngineTriggerNotFound, -}; +use crate::write_buffer::plugins::PluginContext; #[derive(Debug, Error)] pub enum Error { @@ -848,12 +847,38 @@ impl ProcessingEngineManager for WriteBufferImpl { Ok(()) } + async fn delete_plugin(&self, db: &str, plugin_name: &str) -> crate::Result<(), Error> { + let (db_id, db_schema) = + self.catalog + .db_id_and_schema(db) + .ok_or_else(|| Error::DatabaseNotFound { + db_name: db.to_string(), + })?; + let catalog_op = CatalogOp::DeletePlugin(DeletePluginDefinition { + plugin_name: plugin_name.to_string(), + }); + let catalog_batch = CatalogBatch { + time_ns: self.time_provider.now().timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![catalog_op], + }; + + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(catalog_batch)? 
{ + self.wal + .write_ops(vec![WalOp::Catalog(catalog_batch)]) + .await?; + } + Ok(()) + } + async fn insert_trigger( &self, db_name: &str, trigger_name: String, plugin_name: String, trigger_specification: TriggerSpecificationDefinition, + disabled: bool, ) -> crate::Result<(), Error> { let Some((db_id, db_schema)) = self.catalog.db_id_and_schema(db_name) else { return Err(Error::DatabaseNotFound { @@ -872,6 +897,7 @@ impl ProcessingEngineManager for WriteBufferImpl { plugin_name, plugin: plugin.clone(), trigger: trigger_specification, + disabled, }); let creation_time = self.time_provider.now(); let catalog_batch = CatalogBatch { @@ -910,7 +936,10 @@ impl ProcessingEngineManager for WriteBufferImpl { trigger_name: trigger_name.to_string(), })? .clone(); - let trigger_rx = self.buffer.subscribe_to_plugin_events(); + let trigger_rx = self + .buffer + .subscribe_to_plugin_events(trigger_name.to_string()) + .await; let plugin_context = PluginContext { trigger_rx, write_buffer, @@ -920,12 +949,103 @@ impl ProcessingEngineManager for WriteBufferImpl { Ok(()) } + + async fn deactivate_trigger( + &self, + db_name: &str, + trigger_name: &str, + ) -> std::result::Result<(), Error> { + let (db_id, db_schema) = + self.catalog + .db_id_and_schema(db_name) + .ok_or_else(|| Error::DatabaseNotFound { + db_name: db_name.to_string(), + })?; + let trigger = db_schema + .processing_engine_triggers + .get(trigger_name) + .ok_or_else(|| ProcessingEngineTriggerNotFound { + database_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + })?; + // Already disabled, so this is a no-op + if trigger.disabled { + return Ok(()); + }; + + let mut deactivated = trigger.clone(); + deactivated.disabled = true; + let catalog_op = CatalogOp::DisableTrigger(TriggerIdentifier { + db_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + }); + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(CatalogBatch { + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + time_ns: self.time_provider.now().timestamp_nanos(), + ops: vec![catalog_op], + })? { + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + } + // TODO: handle processing engine errors + self.buffer + .deactivate_trigger(trigger_name.to_string()) + .await + .unwrap(); + Ok(()) + } + + async fn activate_trigger( + &self, + write_buffer: Arc, + db_name: &str, + trigger_name: &str, + ) -> std::result::Result<(), Error> { + let (db_id, db_schema) = + self.catalog + .db_id_and_schema(db_name) + .ok_or_else(|| Error::DatabaseNotFound { + db_name: db_name.to_string(), + })?; + let trigger = db_schema + .processing_engine_triggers + .get(trigger_name) + .ok_or_else(|| ProcessingEngineTriggerNotFound { + database_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + })?; + // Already disabled, so this is a no-op + if !trigger.disabled { + return Ok(()); + }; + + let mut activated = trigger.clone(); + activated.disabled = false; + let catalog_op = CatalogOp::EnableTrigger(TriggerIdentifier { + db_name: db_name.to_string(), + trigger_name: trigger_name.to_string(), + }); + if let Some(catalog_batch) = self.catalog.apply_catalog_batch(CatalogBatch { + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + time_ns: self.time_provider.now().timestamp_nanos(), + ops: vec![catalog_op], + })? 
{ + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + } + + self.run_trigger(write_buffer, db_name, trigger_name) + .await?; + Ok(()) + } } -#[derive(Clone)] #[allow(unused)] pub(crate) enum PluginEvent { WriteWalContents(Arc), + Shutdown(oneshot::Sender<()>), } impl WriteBuffer for WriteBufferImpl {} @@ -2681,4 +2801,391 @@ mod tests { } batches } + + #[tokio::test] + async fn test_create_plugin() -> Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let (write_buffer, _, _) = + setup_cache_optional(start_time, test_store, wal_config, false).await; + + write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + let empty_udf = r#"def example(iterator, output): + return"#; + + write_buffer + .insert_plugin( + "foo", + "my_plugin".to_string(), + empty_udf.to_string(), + "example".to_string(), + PluginType::WalRows, + ) + .await?; + + let plugin = write_buffer + .catalog + .db_schema("foo") + .expect("should have db named foo") + .processing_engine_plugins + .get("my_plugin") + .unwrap() + .clone(); + let expected = PluginDefinition { + plugin_name: "my_plugin".to_string(), + code: empty_udf.to_string(), + function_name: "example".to_string(), + plugin_type: PluginType::WalRows, + }; + assert_eq!(expected, plugin); + + // confirm that creating it again is a no-op. + write_buffer + .insert_plugin( + "foo", + "my_plugin".to_string(), + empty_udf.to_string(), + "example".to_string(), + PluginType::WalRows, + ) + .await?; + + // confirm that a different argument is an error + let Err(Error::CatalogUpdateError(catalog::Error::ProcessingEngineCallExists { .. })) = + write_buffer + .insert_plugin( + "foo", + "my_plugin".to_string(), + empty_udf.to_string(), + "bad_example".to_string(), + PluginType::WalRows, + ) + .await + else { + panic!("failed to insert plugin"); + }; + + // Confirm the same contents can be added to a new name. + write_buffer + .insert_plugin( + "foo", + "my_second_plugin".to_string(), + empty_udf.to_string(), + "example".to_string(), + PluginType::WalRows, + ) + .await?; + Ok(()) + } + #[tokio::test] + async fn test_delete_plugin() -> Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let (write_buffer, _, _) = + setup_cache_optional(start_time, test_store, wal_config, false).await; + + // Create the DB by inserting a line. 
+ write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + // First create a plugin + write_buffer + .insert_plugin( + "foo", + "test_plugin".to_string(), + "def process(iterator, output): pass".to_string(), + "process".to_string(), + PluginType::WalRows, + ) + .await?; + + // Then delete it + write_buffer.delete_plugin("foo", "test_plugin").await?; + + // Verify plugin is gone from schema + let schema = write_buffer.catalog().db_schema("foo").unwrap(); + assert!(!schema.processing_engine_plugins.contains_key("test_plugin")); + + // Verify we can add a newly named plugin + write_buffer + .insert_plugin( + "foo", + "test_plugin".to_string(), + "def new_process(iterator, output): pass".to_string(), + "new_process".to_string(), + PluginType::WalRows, + ) + .await?; + + Ok(()) + } + + #[tokio::test] + async fn test_delete_plugin_with_active_trigger() -> Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let (write_buffer, _, _) = + setup_cache_optional(start_time, test_store, wal_config, false).await; + + // Create the DB by inserting a line. + write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + // Create a plugin + write_buffer + .insert_plugin( + "foo", + "test_plugin".to_string(), + "def process(iterator, output): pass".to_string(), + "process".to_string(), + PluginType::WalRows, + ) + .await + .unwrap(); + + // Create a trigger using the plugin + write_buffer + .insert_trigger( + "foo", + "test_trigger".to_string(), + "test_plugin".to_string(), + TriggerSpecificationDefinition::AllTablesWalWrite, + false, + ) + .await + .unwrap(); + + // Try to delete the plugin - should fail because trigger exists + let result = write_buffer.delete_plugin("foo", "test_plugin").await; + assert!(matches!( + result, + Err(Error::CatalogUpdateError(catalog::Error::ProcessingEnginePluginInUse { + database_name, + plugin_name, + trigger_name, + })) if database_name == "foo" && plugin_name == "test_plugin" && trigger_name == "test_trigger" + )); + Ok(()) + } + + #[tokio::test] + async fn test_trigger_lifecycle() -> Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let (write_buffer, _, _) = + setup_cache_optional(start_time, test_store, wal_config, false).await; + + // convert to Arc + let write_buffer: Arc = write_buffer.clone(); + + // Create the DB by inserting a line. 
+ write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + // Create a plugin + write_buffer + .insert_plugin( + "foo", + "test_plugin".to_string(), + "def process(iterator, output): pass".to_string(), + "process".to_string(), + PluginType::WalRows, + ) + .await?; + + // Create an enabled trigger + write_buffer + .insert_trigger( + "foo", + "test_trigger".to_string(), + "test_plugin".to_string(), + TriggerSpecificationDefinition::AllTablesWalWrite, + false, + ) + .await?; + // Run the trigger + write_buffer + .run_trigger(Arc::clone(&write_buffer), "foo", "test_trigger") + .await?; + + // Deactivate the trigger + let result = write_buffer.deactivate_trigger("foo", "test_trigger").await; + assert!(result.is_ok()); + + // Verify trigger is disabled in schema + let schema = write_buffer.catalog().db_schema("foo").unwrap(); + let trigger = schema + .processing_engine_triggers + .get("test_trigger") + .unwrap(); + assert!(trigger.disabled); + + // Activate the trigger + let result = write_buffer + .activate_trigger(Arc::clone(&write_buffer), "foo", "test_trigger") + .await; + assert!(result.is_ok()); + + // Verify trigger is enabled and running + let schema = write_buffer.catalog().db_schema("foo").unwrap(); + let trigger = schema + .processing_engine_triggers + .get("test_trigger") + .unwrap(); + assert!(!trigger.disabled); + Ok(()) + } + + #[tokio::test] + async fn test_create_disabled_trigger() -> Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let (write_buffer, _, _) = + setup_cache_optional(start_time, test_store, wal_config, false).await; + + // Create the DB by inserting a line. + write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + // Create a plugin + write_buffer + .insert_plugin( + "foo", + "test_plugin".to_string(), + "def process(iterator, output): pass".to_string(), + "process".to_string(), + PluginType::WalRows, + ) + .await?; + + // Create a disabled trigger + write_buffer + .insert_trigger( + "foo", + "test_trigger".to_string(), + "test_plugin".to_string(), + TriggerSpecificationDefinition::AllTablesWalWrite, + true, + ) + .await?; + + // Verify trigger is created but disabled + let schema = write_buffer.catalog().db_schema("foo").unwrap(); + let trigger = schema + .processing_engine_triggers + .get("test_trigger") + .unwrap(); + assert!(trigger.disabled); + + // Verify trigger is not in active triggers list + assert!(write_buffer.catalog().triggers().is_empty()); + Ok(()) + } + + #[tokio::test] + async fn test_activate_nonexistent_trigger() -> Result<()> { + let start_time = Time::from_rfc3339("2024-11-14T11:00:00+00:00").unwrap(); + let test_store = Arc::new(InMemory::new()); + let wal_config = WalConfig { + gen1_duration: Gen1Duration::new_1m(), + max_write_buffer_size: 100, + flush_interval: Duration::from_millis(10), + snapshot_size: 1, + }; + let (write_buffer, _, _) = + setup_cache_optional(start_time, test_store, wal_config, false).await; + + let write_buffer: Arc = write_buffer.clone(); + + // Create the DB by inserting a line. 
+ write_buffer + .write_lp( + NamespaceName::new("foo").unwrap(), + "cpu,warehouse=us-east,room=01a,device=10001 reading=37\n", + start_time, + false, + Precision::Nanosecond, + ) + .await?; + + let result = write_buffer + .activate_trigger(Arc::clone(&write_buffer), "foo", "nonexistent_trigger") + .await; + + assert!(matches!( + result, + Err(Error::CatalogUpdateError(catalog::Error::ProcessingEngineTriggerNotFound { + database_name, + trigger_name, + })) if database_name == "foo" && trigger_name == "nonexistent_trigger" + )); + Ok(()) + } } diff --git a/influxdb3_write/src/write_buffer/plugins.rs b/influxdb3_write/src/write_buffer/plugins.rs index 0ba393c5b7f..83b9d07309a 100644 --- a/influxdb3_write/src/write_buffer/plugins.rs +++ b/influxdb3_write/src/write_buffer/plugins.rs @@ -1,11 +1,10 @@ use crate::write_buffer::PluginEvent; use crate::{write_buffer, WriteBuffer}; use influxdb3_wal::{PluginType, TriggerDefinition, TriggerSpecificationDefinition}; -use observability_deps::tracing::warn; use std::fmt::Debug; use std::sync::Arc; use thiserror::Error; -use tokio::sync::broadcast::error::RecvError; +use tokio::sync::mpsc; #[derive(Debug, Error)] pub enum Error { @@ -18,6 +17,9 @@ pub enum Error { #[error(transparent)] WriteBufferError(#[from] write_buffer::Error), + + #[error("failed to send shutdown message back")] + FailedToShutdown, } /// `[ProcessingEngineManager]` is used to interact with the processing engine, @@ -35,12 +37,19 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static { plugin_type: PluginType, ) -> crate::Result<(), write_buffer::Error>; + async fn delete_plugin( + &self, + db: &str, + plugin_name: &str, + ) -> crate::Result<(), write_buffer::Error>; + async fn insert_trigger( &self, db_name: &str, trigger_name: String, plugin_name: String, trigger_specification: TriggerSpecificationDefinition, + disabled: bool, ) -> crate::Result<(), write_buffer::Error>; /// Starts running the trigger, which will run in the background. @@ -50,6 +59,19 @@ pub trait ProcessingEngineManager: Debug + Send + Sync + 'static { db_name: &str, trigger_name: &str, ) -> crate::Result<(), write_buffer::Error>; + + async fn deactivate_trigger( + &self, + db_name: &str, + trigger_name: &str, + ) -> Result<(), write_buffer::Error>; + + async fn activate_trigger( + &self, + write_buffer: Arc, + db_name: &str, + trigger_name: &str, + ) -> Result<(), write_buffer::Error>; } #[cfg(feature = "system-py")] @@ -72,31 +94,26 @@ pub(crate) fn run_plugin( pub(crate) struct PluginContext { // tokio channel for inputs - pub(crate) trigger_rx: tokio::sync::broadcast::Receiver, + pub(crate) trigger_rx: mpsc::Receiver, // handler to write data back to the DB. pub(crate) write_buffer: Arc, } #[async_trait::async_trait] trait RunnablePlugin { + // Returns true if it should exit async fn process_event( &self, event: PluginEvent, write_buffer: Arc, - ) -> Result<(), Error>; + ) -> Result; async fn run_plugin(&self, context: &mut PluginContext) -> Result<(), Error> { - loop { - match context.trigger_rx.recv().await { - Err(RecvError::Closed) => { - break; - } - Err(RecvError::Lagged(_)) => { - warn!("plugin lagged"); - } - Ok(event) => { - self.process_event(event, context.write_buffer.clone()) - .await?; - } + while let Some(event) = context.trigger_rx.recv().await { + if self + .process_event(event, context.write_buffer.clone()) + .await? 
+ { + break; } } Ok(()) @@ -124,7 +141,7 @@ mod python_plugin { &self, event: PluginEvent, write_buffer: Arc, - ) -> Result<(), Error> { + ) -> Result { let Some(schema) = write_buffer.catalog().db_schema(self.db_name.as_str()) else { return Err(Error::MissingDb); }; @@ -168,6 +185,10 @@ mod python_plugin { } } } + PluginEvent::Shutdown(sender) => { + sender.send(()).map_err(|_| Error::FailedToShutdown)?; + return Ok(true); + } } if !output_lines.is_empty() { let ingest_time = SystemTime::now() @@ -184,7 +205,7 @@ mod python_plugin { .await?; } - Ok(()) + Ok(false) } } } diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 1e6cc4fabf1..27410488ccb 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -29,7 +29,7 @@ use iox_query::frontend::reorg::ReorgPlanner; use iox_query::QueryChunk; use object_store::path::Path; use observability_deps::tracing::{error, info}; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use parquet::format::FileMetaData; use schema::sort::SortKey; use schema::Schema; @@ -37,7 +37,7 @@ use std::any::Any; use std::sync::Arc; use std::time::Duration; use tokio::sync::oneshot::Receiver; -use tokio::sync::{broadcast, oneshot}; +use tokio::sync::{mpsc, oneshot, Mutex}; #[derive(Debug)] pub struct QueryableBuffer { @@ -52,7 +52,7 @@ pub struct QueryableBuffer { /// Sends a notification to this watch channel whenever a snapshot info is persisted persisted_snapshot_notify_rx: tokio::sync::watch::Receiver>, persisted_snapshot_notify_tx: tokio::sync::watch::Sender>, - plugin_event_tx: Mutex>>, + plugin_event_tx: Mutex>>, } pub struct QueryableBufferArgs { @@ -91,7 +91,7 @@ impl QueryableBuffer { parquet_cache, persisted_snapshot_notify_rx, persisted_snapshot_notify_tx, - plugin_event_tx: Mutex::new(None), + plugin_event_tx: Mutex::new(HashMap::new()), } } @@ -388,26 +388,55 @@ impl QueryableBuffer { } #[cfg(feature = "system-py")] - pub(crate) fn subscribe_to_plugin_events(&self) -> broadcast::Receiver { - let mut sender = self.plugin_event_tx.lock(); + pub(crate) async fn subscribe_to_plugin_events( + &self, + trigger_name: String, + ) -> mpsc::Receiver { + let mut senders = self.plugin_event_tx.lock().await; + + // TODO: should we be checking for replacements? + let (plugin_tx, plugin_rx) = mpsc::channel(4); + senders.insert(trigger_name, plugin_tx); + plugin_rx + } - if sender.is_none() { - let (tx, rx) = broadcast::channel(1024); - *sender = Some(tx); - return rx; + /// Deactivates a running trigger by sending it a oneshot sender. It should send back a message and then immediately shut down. 
+ pub(crate) async fn deactivate_trigger( + &self, + #[allow(unused)] trigger_name: String, + ) -> Result<(), anyhow::Error> { + #[cfg(feature = "system-py")] + { + let Some(sender) = self.plugin_event_tx.lock().await.remove(&trigger_name) else { + anyhow::bail!("no trigger named '{}' found", trigger_name); + }; + let (oneshot_tx, oneshot_rx) = oneshot::channel(); + sender.send(PluginEvent::Shutdown(oneshot_tx)).await?; + oneshot_rx.await?; + } + Ok(()) + } + + async fn send_to_plugins(&self, wal_contents: &WalContents) { + let senders = self.plugin_event_tx.lock().await; + if !senders.is_empty() { + let wal_contents = Arc::new(wal_contents.clone()); + for (plugin, sender) in senders.iter() { + if let Err(err) = sender + .send(PluginEvent::WriteWalContents(Arc::clone(&wal_contents))) + .await + { + error!("failed to send plugin event to plugin {}: {}", plugin, err); + } + } } - sender.as_ref().unwrap().subscribe() } } #[async_trait] impl WalFileNotifier for QueryableBuffer { - fn notify(&self, write: WalContents) { - if let Some(sender) = self.plugin_event_tx.lock().as_ref() { - if let Err(err) = sender.send(PluginEvent::WriteWalContents(Arc::new(write.clone()))) { - error!(%err, "Error sending WAL content to plugins"); - } - } + async fn notify(&self, write: WalContents) { + self.send_to_plugins(&write).await; self.buffer_contents(write) } @@ -416,11 +445,7 @@ impl WalFileNotifier for QueryableBuffer { write: WalContents, snapshot_details: SnapshotDetails, ) -> Receiver { - if let Some(sender) = self.plugin_event_tx.lock().as_ref() { - if let Err(err) = sender.send(PluginEvent::WriteWalContents(Arc::new(write.clone()))) { - error!(%err, "Error sending WAL content to plugins"); - } - } + self.send_to_plugins(&write).await; self.buffer_contents_and_persist_snapshotted_data(write, snapshot_details) .await } @@ -537,6 +562,9 @@ impl BufferState { } CatalogOp::CreatePlugin(_) => {} CatalogOp::CreateTrigger(_) => {} + CatalogOp::EnableTrigger(_) => {} + CatalogOp::DisableTrigger(_) => {} + CatalogOp::DeletePlugin(_) => {} } } } @@ -786,7 +814,7 @@ mod tests { wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64; // write the lp into the buffer - queryable_buffer.notify(wal_contents); + queryable_buffer.notify(wal_contents).await; // now force a snapshot, persisting the data to parquet file. Also, buffer up a new write let snapshot_sequence_number = SnapshotSequenceNumber::new(1);
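
For reference, below is a minimal sketch (not part of the patch) of how a client might exercise the endpoints this commit adds. The paths and query parameters mirror the route handlers and request structs above; the server address, port, and the use of the reqwest and tokio crates are assumptions made purely for illustration.

// Sketch only: drive the new trigger/plugin endpoints over HTTP.
// Assumes a running influxdb3 server at http://localhost:8181 and the `reqwest` and
// `tokio` crates; these are illustrative choices, not part of the diff.
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = Client::new();
    let base = "http://localhost:8181/api/v3/configure";

    // Deactivate a running trigger:
    // POST .../processing_engine_trigger/deactivate?db=...&trigger_name=...
    client
        .post(format!("{base}/processing_engine_trigger/deactivate"))
        .query(&[("db", "foo"), ("trigger_name", "test_trigger")])
        .send()
        .await?
        .error_for_status()?;

    // Re-activate it; on the server side activate_trigger re-runs run_trigger for the
    // now-enabled trigger.
    client
        .post(format!("{base}/processing_engine_trigger/activate"))
        .query(&[("db", "foo"), ("trigger_name", "test_trigger")])
        .send()
        .await?
        .error_for_status()?;

    // A plugin can only be deleted once no trigger references it.
    client
        .delete(format!("{base}/processing_engine_plugin"))
        .query(&[("db", "foo"), ("plugin_name", "test_plugin")])
        .send()
        .await?
        .error_for_status()?;

    Ok(())
}

The switch in QueryableBuffer from a single broadcast channel to a per-trigger mpsc channel is what makes deactivation workable: each running trigger owns its own receiver, so a Shutdown event (carrying a oneshot sender for the acknowledgement) can be delivered to exactly one trigger without disturbing the others.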