diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index f6a93d4af..1d794e511 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -37,6 +37,7 @@ use ulid::Ulid; pub mod alerts_utils; pub mod target; +use crate::alerts::target::TARGETS; use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::rbac::map::SessionKey; use crate::storage; @@ -44,8 +45,6 @@ use crate::storage::ObjectStorageError; use crate::sync::alert_runtime; use crate::utils::user_auth_for_query; -use self::target::Target; - // these types describe the scheduled task for an alert pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>); @@ -531,23 +530,28 @@ pub struct AlertRequest { pub alert_type: AlertType, pub aggregates: Aggregates, pub eval_config: EvalConfig, - pub targets: Vec, + pub targets: Vec, } -impl From for AlertConfig { - fn from(val: AlertRequest) -> AlertConfig { - AlertConfig { +impl AlertRequest { + pub async fn into(self) -> Result { + // Validate that all target IDs exist + for id in &self.targets { + TARGETS.get_target_by_id(id).await?; + } + let config = AlertConfig { version: AlertVerison::from(CURRENT_ALERTS_VERSION), id: Ulid::new(), - severity: val.severity, - title: val.title, - stream: val.stream, - alert_type: val.alert_type, - aggregates: val.aggregates, - eval_config: val.eval_config, - targets: val.targets, + severity: self.severity, + title: self.title, + stream: self.stream, + alert_type: self.alert_type, + aggregates: self.aggregates, + eval_config: self.eval_config, + targets: self.targets, state: AlertState::default(), - } + }; + Ok(config) } } @@ -563,14 +567,18 @@ pub struct AlertConfig { pub alert_type: AlertType, pub aggregates: Aggregates, pub eval_config: EvalConfig, - pub targets: Vec, + pub targets: Vec, // for new alerts, state should be resolved #[serde(default)] pub state: AlertState, } impl AlertConfig { - pub fn modify(&mut self, alert: AlertRequest) { + pub async fn modify(&mut self, alert: AlertRequest) -> Result<(), AlertError> { + // Validate that all target IDs exist + for id in &alert.targets { + TARGETS.get_target_by_id(id).await?; + } self.title = alert.title; self.stream = alert.stream; self.alert_type = alert.alert_type; @@ -578,6 +586,7 @@ impl AlertConfig { self.eval_config = alert.eval_config; self.targets = alert.targets; self.state = AlertState::default(); + Ok(()) } pub fn get_base_query(&self) -> String { @@ -599,7 +608,8 @@ impl AlertConfig { }; // validate that target repeat notifs !> eval_frequency - for target in &self.targets { + for target_id in &self.targets { + let target = TARGETS.get_target_by_id(target_id).await?; match &target.timeout.times { target::Retry::Infinite => {} target::Retry::Finite(repeat) => { @@ -806,7 +816,8 @@ impl AlertConfig { pub async fn trigger_notifications(&self, message: String) -> Result<(), AlertError> { let mut context = self.get_context(); context.message = message; - for target in &self.targets { + for target_id in &self.targets { + let target = TARGETS.get_target_by_id(target_id).await?; trace!("Target (trigger_notifications)-\n{target:?}"); target.call(context.clone()); } @@ -840,6 +851,12 @@ pub enum AlertError { InvalidAlertModifyRequest, #[error("{0}")] FromStrError(#[from] FromStrError), + #[error("Invalid Target ID- {0}")] + InvalidTargetID(String), + #[error("Target already exists")] + DuplicateTargetConfig, + #[error("Can't delete a Target which is being used")] + TargetInUse, } impl actix_web::ResponseError for AlertError { @@ -857,6 +874,9 @@ impl actix_web::ResponseError for AlertError { Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::InvalidAlertModifyRequest => StatusCode::BAD_REQUEST, Self::FromStrError(_) => StatusCode::BAD_REQUEST, + Self::InvalidTargetID(_) => StatusCode::BAD_REQUEST, + Self::DuplicateTargetConfig => StatusCode::BAD_REQUEST, + Self::TargetInUse => StatusCode::CONFLICT, } } diff --git a/src/alerts/target.rs b/src/alerts/target.rs index 3c8939cca..6b072369c 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -24,18 +24,110 @@ use std::{ use async_trait::async_trait; use base64::Engine; +use bytes::Bytes; use chrono::Utc; use http::{header::AUTHORIZATION, HeaderMap, HeaderValue}; use humantime_serde::re::humantime; +use itertools::Itertools; +use once_cell::sync::Lazy; use reqwest::ClientBuilder; +use tokio::sync::RwLock; use tracing::{error, trace, warn}; +use ulid::Ulid; use url::Url; +use crate::{alerts::AlertError, parseable::PARSEABLE, storage::object_storage::target_json_path}; + use super::ALERTS; use super::{AlertState, CallableTarget, Context}; -#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)] +pub static TARGETS: Lazy = Lazy::new(|| TargetConfigs { + target_configs: RwLock::new(HashMap::new()), +}); + +#[derive(Debug)] +pub struct TargetConfigs { + pub target_configs: RwLock>, +} + +impl TargetConfigs { + /// Loads alerts from disk, blocks + pub async fn load(&self) -> anyhow::Result<()> { + let mut map = self.target_configs.write().await; + let store = PARSEABLE.storage.get_object_store(); + + for alert in store.get_targets().await.unwrap_or_default() { + map.insert(alert.id, alert); + } + + Ok(()) + } + + pub async fn update(&self, target: Target) -> Result<(), AlertError> { + let mut map = self.target_configs.write().await; + if map.values().any(|t| { + t.target == target.target + && t.timeout.interval == target.timeout.interval + && t.timeout.times == target.timeout.times + && t.id != target.id + }) { + return Err(AlertError::DuplicateTargetConfig); + } + map.insert(target.id, target.clone()); + + let path = target_json_path(&target.id); + + let store = PARSEABLE.storage.get_object_store(); + let target_bytes = serde_json::to_vec(&target)?; + store.put_object(&path, Bytes::from(target_bytes)).await?; + Ok(()) + } + + pub async fn list(&self) -> Result, AlertError> { + let targets = self + .target_configs + .read() + .await + .values() + .cloned() + .collect_vec(); + Ok(targets) + } + + pub async fn get_target_by_id(&self, target_id: &Ulid) -> Result { + let target = self + .target_configs + .read() + .await + .get(target_id) + .ok_or(AlertError::InvalidTargetID(target_id.to_string())) + .cloned()?; + + Ok(target) + } + + pub async fn delete(&self, target_id: &Ulid) -> Result { + // ensure that the target is not being used by any alert + for (_, alert) in ALERTS.alerts.read().await.iter() { + if alert.targets.contains(target_id) { + return Err(AlertError::TargetInUse); + } + } + let target = self + .target_configs + .write() + .await + .remove(target_id) + .ok_or(AlertError::InvalidTargetID(target_id.to_string()))?; + let path = target_json_path(&target.id); + let store = PARSEABLE.storage.get_object_store(); + store.delete_object(&path).await?; + Ok(target) + } +} + +#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq)] #[serde(rename_all = "camelCase")] #[serde(untagged)] pub enum Retry { @@ -53,10 +145,13 @@ impl Default for Retry { #[serde(rename_all = "camelCase")] #[serde(try_from = "TargetVerifier")] pub struct Target { + pub name: String, #[serde(flatten)] pub target: TargetType, #[serde(default, rename = "repeat")] pub timeout: Timeout, + #[serde(default = "Ulid::new")] + pub id: Ulid, } impl Target { @@ -189,10 +284,13 @@ pub struct RepeatVerifier { #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct TargetVerifier { + pub name: String, #[serde(flatten)] pub target: TargetType, #[serde(default)] pub repeat: Option, + #[serde(default = "Ulid::new")] + pub id: Ulid, } impl TryFrom for Target { @@ -223,8 +321,10 @@ impl TryFrom for Target { } Ok(Target { + name: value.name, target: value.target, timeout, + id: value.id, }) } } diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index 66e573c18..bbc073785 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -48,7 +48,7 @@ pub async fn post( req: HttpRequest, Json(alert): Json, ) -> Result { - let alert: AlertConfig = alert.into(); + let alert: AlertConfig = alert.into().await?; alert.validate().await?; // validate the incoming alert query diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 7ea660b53..1d6c78246 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -48,6 +48,7 @@ pub mod query; pub mod rbac; pub mod resource_check; pub mod role; +pub mod targets; pub mod users; pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; pub const API_BASE_PATH: &str = "api"; diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index d40ab73df..595c52833 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -74,6 +74,7 @@ impl ParseableServer for QueryServer { ))) .service(Server::get_metrics_webscope()) .service(Server::get_alerts_webscope()) + .service(Server::get_targets_webscope()) .service(Self::get_cluster_web_scope()), ) .service( diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 648918907..e92ae836d 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -27,6 +27,7 @@ use crate::handlers::http::health_check; use crate::handlers::http::prism_base_path; use crate::handlers::http::query; use crate::handlers::http::resource_check; +use crate::handlers::http::targets; use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; use crate::hottier::HotTierManager; @@ -95,6 +96,7 @@ impl ParseableServer for Server { resource_check::check_resource_utilization_middleware, ))) .service(Self::get_alerts_webscope()) + .service(Self::get_targets_webscope()) .service(Self::get_metrics_webscope()), ) .service( @@ -263,6 +265,25 @@ impl Server { ) } + pub fn get_targets_webscope() -> Scope { + web::scope("/targets") + .service( + web::resource("") + .route(web::get().to(targets::list).authorize(Action::GetAlert)) + .route(web::post().to(targets::post).authorize(Action::PutAlert)), + ) + .service( + web::resource("/{target_id}") + .route(web::get().to(targets::get).authorize(Action::GetAlert)) + .route(web::put().to(targets::update).authorize(Action::PutAlert)) + .route( + web::delete() + .to(targets::delete) + .authorize(Action::DeleteAlert), + ), + ) + } + // get the dashboards web scope pub fn get_dashboards_webscope() -> Scope { web::scope("/dashboards") diff --git a/src/handlers/http/targets.rs b/src/handlers/http/targets.rs new file mode 100644 index 000000000..391eb53f5 --- /dev/null +++ b/src/handlers/http/targets.rs @@ -0,0 +1,72 @@ +use actix_web::{ + web::{self, Json, Path}, + HttpRequest, Responder, +}; +use ulid::Ulid; + +use crate::alerts::{ + target::{Target, TARGETS}, + AlertError, +}; + +// POST /targets +pub async fn post( + _req: HttpRequest, + Json(target): Json, +) -> Result { + // should check for duplicacy and liveness (??) + // add to the map + TARGETS.update(target.clone()).await?; + + Ok(web::Json(target)) +} + +// GET /targets +pub async fn list(_req: HttpRequest) -> Result { + // add to the map + let list = TARGETS.list().await?; + + Ok(web::Json(list)) +} + +// GET /targets/{target_id} +pub async fn get(_req: HttpRequest, target_id: Path) -> Result { + let target_id = target_id.into_inner(); + + let target = TARGETS.get_target_by_id(&target_id).await?; + + Ok(web::Json(target)) +} + +// PUT /targets/{target_id} +pub async fn update( + _req: HttpRequest, + target_id: Path, + Json(mut target): Json, +) -> Result { + let target_id = target_id.into_inner(); + + // if target_id does not exist, error + TARGETS.get_target_by_id(&target_id).await?; + + // esnure that the supplied target id is assigned to the target config + target.id = target_id; + + // should check for duplicacy and liveness (??) + // add to the map + TARGETS.update(target.clone()).await?; + + Ok(web::Json(target)) +} + +// DELETE /targets/{target_id} +pub async fn delete( + _req: HttpRequest, + target_id: Path, +) -> Result { + let target_id = target_id.into_inner(); + + let target = TARGETS.delete(&target_id).await?; + + Ok(web::Json(target)) +} diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index cc6a82a53..57a7e5a4c 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -41,6 +41,7 @@ use crate::{ metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics}, option::validation, parseable::LogStream, + storage::SETTINGS_ROOT_DIRECTORY, }; use super::{ @@ -336,6 +337,7 @@ impl ObjectStorage for LocalFS { PARSEABLE_ROOT_DIRECTORY, USERS_ROOT_DIR, ALERTS_ROOT_DIRECTORY, + SETTINGS_ROOT_DIRECTORY, ]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); let entries: Vec = directories.try_collect().await?; @@ -356,6 +358,7 @@ impl ObjectStorage for LocalFS { "lost+found", PARSEABLE_ROOT_DIRECTORY, ALERTS_ROOT_DIRECTORY, + SETTINGS_ROOT_DIRECTORY, ]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); let entries: Vec = directories.try_collect().await?; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 63e3803bd..8cab1690f 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -59,6 +59,8 @@ pub const STREAM_ROOT_DIRECTORY: &str = ".stream"; pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable"; pub const SCHEMA_FILE_NAME: &str = ".schema"; pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts"; +pub const SETTINGS_ROOT_DIRECTORY: &str = ".settings"; +pub const TARGETS_ROOT_DIRECTORY: &str = ".targets"; pub const MANIFEST_FILE: &str = "manifest.json"; // max concurrent request allowed for datafusion object store diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index f87b98b4b..f306d86ba 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -59,6 +59,7 @@ use tracing::info; use tracing::{error, warn}; use ulid::Ulid; +use crate::alerts::target::Target; use crate::alerts::AlertConfig; use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot}; use crate::correlation::{CorrelationConfig, CorrelationError}; @@ -78,6 +79,8 @@ use crate::parseable::PARSEABLE; use crate::query::QUERY_SESSION_STATE; use crate::stats::FullStats; use crate::storage::StreamType; +use crate::storage::SETTINGS_ROOT_DIRECTORY; +use crate::storage::TARGETS_ROOT_DIRECTORY; use crate::utils::DATASET_STATS_STREAM_NAME; use super::{ @@ -443,6 +446,26 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(alerts) } + async fn get_targets(&self) -> Result, ObjectStorageError> { + let targets_path = + RelativePathBuf::from_iter([SETTINGS_ROOT_DIRECTORY, TARGETS_ROOT_DIRECTORY]); + let targets = self + .get_objects( + Some(&targets_path), + Box::new(|file_name| file_name.ends_with(".json")), + ) + .await? + .iter() + .filter_map(|bytes| { + serde_json::from_slice(bytes) + .inspect_err(|err| warn!("Expected compatible json, error = {err}")) + .ok() + }) + .collect(); + + Ok(targets) + } + async fn upsert_stream_metadata( &self, stream_name: &str, @@ -1337,6 +1360,16 @@ pub fn alert_json_path(alert_id: Ulid) -> RelativePathBuf { RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY, &format!("{alert_id}.json")]) } +/// TODO: Needs to be updated for distributed mode +#[inline(always)] +pub fn target_json_path(target_id: &Ulid) -> RelativePathBuf { + RelativePathBuf::from_iter([ + SETTINGS_ROOT_DIRECTORY, + TARGETS_ROOT_DIRECTORY, + &format!("{target_id}.json"), + ]) +} + #[inline(always)] pub fn manifest_path(prefix: &str) -> RelativePathBuf { match &PARSEABLE.options.mode {