Skip to content

Commit

Permalink
feat(processing_engine): Add REST API endpoints for activating and de…
Browse files Browse the repository at this point in the history
…activating triggers. (#25711)
  • Loading branch information
jacksonrnewhouse authored Jan 2, 2025
1 parent de227b9 commit 29dacc3
Show file tree
Hide file tree
Showing 9 changed files with 814 additions and 63 deletions.
110 changes: 105 additions & 5 deletions influxdb3_catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use hashbrown::HashMap;
use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeleteTableDefinition, FieldAdditions,
FieldDefinition, LastCacheDefinition, LastCacheDelete, MetaCacheDefinition, MetaCacheDelete,
OrderedCatalogBatch, PluginDefinition, TriggerDefinition,
CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeletePluginDefinition,
DeleteTableDefinition, FieldAdditions, FieldDefinition, LastCacheDefinition, LastCacheDelete,
MetaCacheDefinition, MetaCacheDelete, OrderedCatalogBatch, PluginDefinition, TriggerDefinition,
TriggerIdentifier,
};
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
Expand Down Expand Up @@ -99,6 +100,18 @@ pub enum Error {
trigger_name: String,
},

#[error(
"Cannot delete plugin {} in database {} because it is used by trigger {}",
plugin_name,
database_name,
trigger_name
)]
ProcessingEnginePluginInUse {
database_name: String,
plugin_name: String,
trigger_name: String,
},

#[error(
"Processing Engine Plugin {} not in DB schema for {}",
plugin_name,
Expand Down Expand Up @@ -317,8 +330,14 @@ impl Catalog {
.flat_map(|schema| {
schema
.processing_engine_triggers
.keys()
.map(move |key| (schema.name.to_string(), key.to_string()))
.iter()
.filter_map(move |(key, trigger)| {
if trigger.disabled {
None
} else {
Some((schema.name.to_string(), key.to_string()))
}
})
})
.collect();
result
Expand Down Expand Up @@ -692,8 +711,15 @@ impl UpdateDatabaseSchema for CatalogOp {
}
CatalogOp::DeleteDatabase(delete_database) => delete_database.update_schema(schema),
CatalogOp::DeleteTable(delete_table) => delete_table.update_schema(schema),
CatalogOp::DeletePlugin(delete_plugin) => delete_plugin.update_schema(schema),
CatalogOp::CreatePlugin(create_plugin) => create_plugin.update_schema(schema),
CatalogOp::CreateTrigger(create_trigger) => create_trigger.update_schema(schema),
CatalogOp::EnableTrigger(trigger_identifier) => {
EnableTrigger(trigger_identifier.clone()).update_schema(schema)
}
CatalogOp::DisableTrigger(trigger_identifier) => {
DisableTrigger(trigger_identifier.clone()).update_schema(schema)
}
}
}
}
Expand Down Expand Up @@ -760,6 +786,29 @@ impl UpdateDatabaseSchema for DeleteTableDefinition {
}
}

impl UpdateDatabaseSchema for DeletePluginDefinition {
fn update_schema<'a>(
&self,
mut schema: Cow<'a, DatabaseSchema>,
) -> Result<Cow<'a, DatabaseSchema>> {
// check that there aren't any triggers with this name.
for (trigger_name, trigger) in &schema.processing_engine_triggers {
if trigger.plugin_name == self.plugin_name {
return Err(Error::ProcessingEnginePluginInUse {
database_name: schema.name.to_string(),
plugin_name: self.plugin_name.to_string(),
trigger_name: trigger_name.to_string(),
});
}
}
schema
.to_mut()
.processing_engine_plugins
.remove(&self.plugin_name);
Ok(schema)
}
}

impl UpdateDatabaseSchema for PluginDefinition {
fn update_schema<'a>(
&self,
Expand All @@ -785,6 +834,57 @@ impl UpdateDatabaseSchema for PluginDefinition {
}
}

struct EnableTrigger(TriggerIdentifier);
struct DisableTrigger(TriggerIdentifier);

impl UpdateDatabaseSchema for EnableTrigger {
fn update_schema<'a>(
&self,
mut schema: Cow<'a, DatabaseSchema>,
) -> Result<Cow<'a, DatabaseSchema>> {
let Some(trigger) = schema.processing_engine_triggers.get(&self.0.trigger_name) else {
return Err(Error::ProcessingEngineTriggerNotFound {
database_name: self.0.db_name.to_string(),
trigger_name: self.0.trigger_name.to_string(),
});
};
if !trigger.disabled {
return Ok(schema);
}
let mut_trigger = schema
.to_mut()
.processing_engine_triggers
.get_mut(&self.0.trigger_name)
.expect("already checked containment");
mut_trigger.disabled = false;
Ok(schema)
}
}

impl UpdateDatabaseSchema for DisableTrigger {
fn update_schema<'a>(
&self,
mut schema: Cow<'a, DatabaseSchema>,
) -> Result<Cow<'a, DatabaseSchema>> {
let Some(trigger) = schema.processing_engine_triggers.get(&self.0.trigger_name) else {
return Err(Error::ProcessingEngineTriggerNotFound {
database_name: self.0.db_name.to_string(),
trigger_name: self.0.trigger_name.to_string(),
});
};
if trigger.disabled {
return Ok(schema);
}
let mut_trigger = schema
.to_mut()
.processing_engine_triggers
.get_mut(&self.0.trigger_name)
.expect("already checked containment");
mut_trigger.disabled = true;
Ok(schema)
}
}

impl UpdateDatabaseSchema for TriggerDefinition {
fn update_schema<'a>(
&self,
Expand Down
3 changes: 3 additions & 0 deletions influxdb3_catalog/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
plugin_name: plugin.plugin_name.to_string(),
plugin,
trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(),
disabled: trigger.disabled,
},
)
})
Expand Down Expand Up @@ -171,6 +172,7 @@ struct ProcessingEngineTriggerSnapshot {
pub trigger_name: String,
pub plugin_name: String,
pub trigger_specification: String,
pub disabled: bool,
}

/// Representation of Arrow's `DataType` for table snapshots.
Expand Down Expand Up @@ -434,6 +436,7 @@ impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot {
plugin_name: trigger.plugin_name.to_string(),
trigger_specification: serde_json::to_string(&trigger.trigger)
.expect("should be able to serialize trigger specification"),
disabled: trigger.disabled,
}
}
}
Expand Down
76 changes: 73 additions & 3 deletions influxdb3_server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,19 @@ where
.body(Body::empty())?)
}

async fn delete_processing_engine_plugin(&self, req: Request<Body>) -> Result<Response<Body>> {
let ProcessingEnginePluginDeleteRequest { db, plugin_name } =
if let Some(query) = req.uri().query() {
serde_urlencoded::from_str(query)?
} else {
self.read_body_json(req).await?
};
self.write_buffer.delete_plugin(&db, &plugin_name).await?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)
}

async fn configure_processing_engine_trigger(
&self,
req: Request<Body>,
Expand All @@ -982,6 +995,7 @@ where
plugin_name,
trigger_name,
trigger_specification,
disabled,
} = if let Some(query) = req.uri().query() {
serde_urlencoded::from_str(query)?
} else {
Expand All @@ -993,13 +1007,47 @@ where
trigger_name.clone(),
plugin_name,
trigger_specification,
disabled,
)
.await?;
if !disabled {
self.write_buffer
.run_trigger(
Arc::clone(&self.write_buffer),
db.as_str(),
trigger_name.as_str(),
)
.await?;
}
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)
}

async fn deactivate_processing_engine_trigger(
&self,
req: Request<Body>,
) -> Result<Response<Body>> {
let query = req.uri().query().unwrap_or("");
let delete_req = serde_urlencoded::from_str::<ProcessingEngineTriggerIdentifier>(query)?;
self.write_buffer
.run_trigger(
.deactivate_trigger(delete_req.db.as_str(), delete_req.trigger_name.as_str())
.await?;
Ok(Response::builder()
.status(StatusCode::OK)
.body(Body::empty())?)
}
async fn activate_processing_engine_trigger(
&self,
req: Request<Body>,
) -> Result<Response<Body>> {
let query = req.uri().query().unwrap_or("");
let delete_req = serde_urlencoded::from_str::<ProcessingEngineTriggerIdentifier>(query)?;
self.write_buffer
.activate_trigger(
Arc::clone(&self.write_buffer),
db.as_str(),
trigger_name.as_str(),
delete_req.db.as_str(),
delete_req.trigger_name.as_str(),
)
.await?;
Ok(Response::builder()
Expand Down Expand Up @@ -1441,13 +1489,26 @@ struct ProcessingEnginePluginCreateRequest {
plugin_type: PluginType,
}

#[derive(Debug, Deserialize)]
struct ProcessingEnginePluginDeleteRequest {
db: String,
plugin_name: String,
}

/// Request definition for `POST /api/v3/configure/processing_engine_trigger` API
#[derive(Debug, Deserialize)]
struct ProcessEngineTriggerCreateRequest {
db: String,
plugin_name: String,
trigger_name: String,
trigger_specification: TriggerSpecificationDefinition,
disabled: bool,
}

#[derive(Debug, Deserialize)]
struct ProcessingEngineTriggerIdentifier {
db: String,
trigger_name: String,
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -1563,6 +1624,15 @@ pub(crate) async fn route_request<T: TimeProvider>(
(Method::POST, "/api/v3/configure/processing_engine_plugin") => {
http_server.configure_processing_engine_plugin(req).await
}
(Method::DELETE, "/api/v3/configure/processing_engine_plugin") => {
http_server.delete_processing_engine_plugin(req).await
}
(Method::POST, "/api/v3/configure/processing_engine_trigger/deactivate") => {
http_server.deactivate_processing_engine_trigger(req).await
}
(Method::POST, "/api/v3/configure/processing_engine_trigger/activate") => {
http_server.activate_processing_engine_trigger(req).await
}
(Method::POST, "/api/v3/configure/processing_engine_trigger") => {
http_server.configure_processing_engine_trigger(req).await
}
Expand Down
9 changes: 8 additions & 1 deletion influxdb3_server/src/system_tables/python_call.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow_array::{ArrayRef, RecordBatch, StringArray};
use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::common::Result;
Expand Down Expand Up @@ -94,6 +94,7 @@ fn trigger_schema() -> SchemaRef {
Field::new("trigger_name", DataType::Utf8, false),
Field::new("plugin_name", DataType::Utf8, false),
Field::new("trigger_specification", DataType::Utf8, false),
Field::new("disabled", DataType::Boolean, false),
];
Schema::new(columns).into()
}
Expand Down Expand Up @@ -124,10 +125,16 @@ impl IoxSystemTable for ProcessingEngineTriggerTable {
.iter()
.map(|trigger| serde_json::to_string(&trigger.trigger).ok())
.collect::<StringArray>();
let disabled = self
.triggers
.iter()
.map(|trigger| Some(trigger.disabled))
.collect::<BooleanArray>();
let columns: Vec<ArrayRef> = vec![
Arc::new(trigger_column),
Arc::new(plugin_column),
Arc::new(specification_column),
Arc::new(disabled),
];
Ok(RecordBatch::try_new(Arc::clone(&self.schema), columns)?)
}
Expand Down
17 changes: 16 additions & 1 deletion influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub trait Wal: Debug + Send + Sync + 'static {
#[async_trait]
pub trait WalFileNotifier: Debug + Send + Sync + 'static {
/// Notify the handler that a new WAL file has been persisted with the given contents.
fn notify(&self, write: WalContents);
async fn notify(&self, write: WalContents);

/// Notify the handler that a new WAL file has been persisted with the given contents and tell
/// it to snapshot the data. The returned receiver will be signalled when the snapshot is complete.
Expand Down Expand Up @@ -301,7 +301,10 @@ pub enum CatalogOp {
DeleteDatabase(DeleteDatabaseDefinition),
DeleteTable(DeleteTableDefinition),
CreatePlugin(PluginDefinition),
DeletePlugin(DeletePluginDefinition),
CreateTrigger(TriggerDefinition),
EnableTrigger(TriggerIdentifier),
DisableTrigger(TriggerIdentifier),
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
Expand Down Expand Up @@ -587,6 +590,11 @@ pub struct PluginDefinition {
pub plugin_type: PluginType,
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct DeletePluginDefinition {
pub plugin_name: String,
}

#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize, Copy)]
#[serde(rename_all = "snake_case")]
pub enum PluginType {
Expand All @@ -600,6 +608,13 @@ pub struct TriggerDefinition {
pub trigger: TriggerSpecificationDefinition,
// TODO: decide whether this should be populated from a reference rather than stored on its own.
pub plugin: PluginDefinition,
pub disabled: bool,
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct TriggerIdentifier {
pub db_name: String,
pub trigger_name: String,
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
Expand Down
Loading

0 comments on commit 29dacc3

Please sign in to comment.