From 2b53b8822bb7e8e2641dbbaf5c91f45f98c9e55f Mon Sep 17 00:00:00 2001 From: anant Date: Wed, 18 Jun 2025 13:55:17 +0530 Subject: [PATCH 1/6] Bring targets out of alerts Adds CRUD operations for targets TODO- - Storage of targets depending upon RBAC discussion - Modify alerts to use targets using ID (AlertConfig update) --- src/alerts/mod.rs | 43 ++++++++++++------ src/alerts/target.rs | 61 +++++++++++++++++++++++++ src/handlers/http/alerts.rs | 2 +- src/handlers/http/mod.rs | 1 + src/handlers/http/modal/server.rs | 21 +++++++++ src/handlers/http/targets.rs | 75 +++++++++++++++++++++++++++++++ 6 files changed, 188 insertions(+), 15 deletions(-) create mode 100644 src/handlers/http/targets.rs diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index e9324e96b..d74f47371 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -37,6 +37,7 @@ use ulid::Ulid; pub mod alerts_utils; pub mod target; +use crate::alerts::target::TARGETS; use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::rbac::map::SessionKey; use crate::storage; @@ -513,23 +514,28 @@ pub struct AlertRequest { pub alert_type: AlertType, pub aggregates: Aggregates, pub eval_config: EvalConfig, - pub targets: Vec, + pub targets: Vec, } -impl From for AlertConfig { - fn from(val: AlertRequest) -> AlertConfig { - AlertConfig { +impl AlertRequest { + pub async fn into(self) -> Result { + let mut targets = Vec::new(); + for id in self.targets { + targets.push(TARGETS.get_target_by_id(id).await?); + } + let config = AlertConfig { version: AlertVerison::from(CURRENT_ALERTS_VERSION), id: Ulid::new(), - severity: val.severity, - title: val.title, - stream: val.stream, - alert_type: val.alert_type, - aggregates: val.aggregates, - eval_config: val.eval_config, - targets: val.targets, + severity: self.severity, + title: self.title, + stream: self.stream, + alert_type: self.alert_type, + aggregates: self.aggregates, + eval_config: self.eval_config, + targets, state: AlertState::default(), - } + }; + Ok(config) } } @@ -552,14 +558,20 @@ pub struct AlertConfig { } impl AlertConfig { - pub fn modify(&mut self, alert: AlertRequest) { + pub async fn modify(&mut self, alert: AlertRequest) -> Result<(), AlertError> { + let mut targets = Vec::new(); + for id in alert.targets { + targets.push(TARGETS.get_target_by_id(id).await?); + } + self.title = alert.title; self.stream = alert.stream; self.alert_type = alert.alert_type; self.aggregates = alert.aggregates; self.eval_config = alert.eval_config; - self.targets = alert.targets; + self.targets = targets; self.state = AlertState::default(); + Ok(()) } pub fn get_base_query(&self) -> String { @@ -801,6 +813,8 @@ pub enum AlertError { InvalidAlertModifyRequest, #[error("{0}")] FromStrError(#[from] FromStrError), + #[error("Invalid Target ID- {0}")] + InvalidTargetID(String), } impl actix_web::ResponseError for AlertError { @@ -818,6 +832,7 @@ impl actix_web::ResponseError for AlertError { Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::InvalidAlertModifyRequest => StatusCode::BAD_REQUEST, Self::FromStrError(_) => StatusCode::BAD_REQUEST, + Self::InvalidTargetID(_) => StatusCode::BAD_REQUEST, } } diff --git a/src/alerts/target.rs b/src/alerts/target.rs index 3c8939cca..e46269ede 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -27,14 +27,65 @@ use base64::Engine; use chrono::Utc; use http::{header::AUTHORIZATION, HeaderMap, HeaderValue}; use humantime_serde::re::humantime; +use itertools::Itertools; +use once_cell::sync::Lazy; use reqwest::ClientBuilder; +use tokio::sync::RwLock; use tracing::{error, trace, warn}; +use ulid::Ulid; use url::Url; +use crate::alerts::AlertError; + use super::ALERTS; use super::{AlertState, CallableTarget, Context}; +pub static TARGETS: Lazy = Lazy::new(|| TargetConfigs { + target_configs: RwLock::new(HashMap::new()), +}); + +#[derive(Debug)] +pub struct TargetConfigs { + pub target_configs: RwLock>, +} + +impl TargetConfigs { + pub async fn update(&self, target: Target) -> Result<(), AlertError> { + let id = target.id; + self.target_configs.write().await.insert(id, target); + Ok(()) + } + + pub async fn list(&self) -> Result, AlertError> { + let targets = self + .target_configs + .read() + .await + .iter() + .map(|(_, v)| v.clone()) + .collect_vec(); + Ok(targets) + } + + pub async fn get_target_by_id(&self, target_id: Ulid) -> Result { + self.target_configs + .read() + .await + .get(&target_id) + .ok_or(AlertError::InvalidTargetID(target_id.to_string())) + .cloned() + } + + pub async fn delete(&self, target_id: Ulid) -> Result { + self.target_configs + .write() + .await + .remove(&target_id) + .ok_or(AlertError::InvalidTargetID(target_id.to_string())) + } +} + #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] #[serde(untagged)] @@ -57,9 +108,16 @@ pub struct Target { pub target: TargetType, #[serde(default, rename = "repeat")] pub timeout: Timeout, + #[serde(default = "Ulid::new")] + pub id: Ulid, } impl Target { + pub async fn validate(&self) { + // just check for liveness + // what if the target is not live yet but is added by the user? + } + pub fn call(&self, context: Context) { trace!("target.call context- {context:?}"); let timeout = &self.timeout; @@ -193,6 +251,8 @@ pub struct TargetVerifier { pub target: TargetType, #[serde(default)] pub repeat: Option, + #[serde(default = "Ulid::new")] + pub id: Ulid, } impl TryFrom for Target { @@ -225,6 +285,7 @@ impl TryFrom for Target { Ok(Target { target: value.target, timeout, + id: value.id, }) } } diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index 66e573c18..bbc073785 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -48,7 +48,7 @@ pub async fn post( req: HttpRequest, Json(alert): Json, ) -> Result { - let alert: AlertConfig = alert.into(); + let alert: AlertConfig = alert.into().await?; alert.validate().await?; // validate the incoming alert query diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 874a2aed5..f9417207a 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -47,6 +47,7 @@ pub mod prism_logstream; pub mod query; pub mod rbac; pub mod role; +pub mod targets; pub mod users; pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; pub const API_BASE_PATH: &str = "api"; diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index d22e5de02..94945d437 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -26,6 +26,7 @@ use crate::handlers::http::base_path; use crate::handlers::http::health_check; use crate::handlers::http::prism_base_path; use crate::handlers::http::query; +use crate::handlers::http::targets; use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; use crate::hottier::HotTierManager; @@ -87,6 +88,7 @@ impl ParseableServer for Server { .service(Self::get_roles_webscope()) .service(Self::get_counts_webscope()) .service(Self::get_alerts_webscope()) + .service(Self::get_targets_webscope()) .service(Self::get_metrics_webscope()), ) .service( @@ -253,6 +255,25 @@ impl Server { ) } + pub fn get_targets_webscope() -> Scope { + web::scope("/targets") + .service( + web::resource("") + .route(web::get().to(targets::list).authorize(Action::GetAlert)) + .route(web::post().to(targets::post).authorize(Action::PutAlert)), + ) + .service( + web::resource("/{target_id}") + .route(web::get().to(targets::get).authorize(Action::GetAlert)) + .route(web::put().to(targets::update).authorize(Action::PutAlert)) + .route( + web::delete() + .to(targets::delete) + .authorize(Action::DeleteAlert), + ), + ) + } + // get the dashboards web scope pub fn get_dashboards_webscope() -> Scope { web::scope("/dashboards") diff --git a/src/handlers/http/targets.rs b/src/handlers/http/targets.rs new file mode 100644 index 000000000..80f5443b5 --- /dev/null +++ b/src/handlers/http/targets.rs @@ -0,0 +1,75 @@ +use actix_web::{ + web::{self, Json, Path}, + HttpRequest, Responder, +}; +use ulid::Ulid; + +use crate::alerts::{ + target::{Target, TARGETS}, + AlertError, +}; + +// POST /targets +pub async fn post( + _req: HttpRequest, + Json(target): Json, +) -> Result { + // should check for duplicacy and liveness (??) + target.validate().await; + + // add to the map + TARGETS.update(target.clone()).await?; + + Ok(web::Json(target)) +} + +// GET /targets +pub async fn list(_req: HttpRequest) -> Result { + // add to the map + let list = TARGETS.list().await?; + + Ok(web::Json(list)) +} + +// GET /targets/{target_id} +pub async fn get(_req: HttpRequest, target_id: Path) -> Result { + let target_id = target_id.into_inner(); + + let target = TARGETS.get_target_by_id(target_id).await?; + + Ok(web::Json(target)) +} + +// POST /targets/{target_id} +pub async fn update( + _req: HttpRequest, + target_id: Path, + Json(mut target): Json, +) -> Result { + let target_id = target_id.into_inner(); + + // if target_id does not exist, error + TARGETS.get_target_by_id(target_id).await?; + + // esnure that the supplied target id is assigned to the target config + target.id = target_id; + // should check for duplicacy and liveness (??) + target.validate().await; + + // add to the map + TARGETS.update(target.clone()).await?; + + Ok(web::Json(target)) +} + +// DELETE /targets/{target_id} +pub async fn delete( + _req: HttpRequest, + target_id: Path, +) -> Result { + let target_id = target_id.into_inner(); + + let target = TARGETS.delete(target_id).await?; + + Ok(web::Json(target)) +} From 484a0c710fe7ce976a5ae4794d8643ec254c8462 Mon Sep 17 00:00:00 2001 From: anant Date: Mon, 30 Jun 2025 07:07:48 +0530 Subject: [PATCH 2/6] Integrated Targets with Alerts Alerts creation will require the ULID of the target instead of the target body --- src/alerts/mod.rs | 31 ++++++++++++------------- src/alerts/target.rs | 22 ++++++++++++++---- src/handlers/http/modal/query_server.rs | 1 + src/handlers/http/targets.rs | 31 +++++++++++++++++++------ src/storage/mod.rs | 1 + src/storage/object_storage.rs | 27 +++++++++++++++++++++ 6 files changed, 85 insertions(+), 28 deletions(-) diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index d74f47371..480f94869 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -45,8 +45,6 @@ use crate::storage::ObjectStorageError; use crate::sync::alert_runtime; use crate::utils::user_auth_for_query; -use self::target::Target; - // these types describe the scheduled task for an alert pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>); @@ -519,10 +517,10 @@ pub struct AlertRequest { impl AlertRequest { pub async fn into(self) -> Result { - let mut targets = Vec::new(); - for id in self.targets { - targets.push(TARGETS.get_target_by_id(id).await?); - } + // let mut targets = Vec::new(); + // for id in self.targets { + // targets.push(TARGETS.get_target_by_id(id).await?); + // } let config = AlertConfig { version: AlertVerison::from(CURRENT_ALERTS_VERSION), id: Ulid::new(), @@ -532,7 +530,7 @@ impl AlertRequest { alert_type: self.alert_type, aggregates: self.aggregates, eval_config: self.eval_config, - targets, + targets: self.targets, state: AlertState::default(), }; Ok(config) @@ -551,7 +549,7 @@ pub struct AlertConfig { pub alert_type: AlertType, pub aggregates: Aggregates, pub eval_config: EvalConfig, - pub targets: Vec, + pub targets: Vec, // for new alerts, state should be resolved #[serde(default)] pub state: AlertState, @@ -559,17 +557,16 @@ pub struct AlertConfig { impl AlertConfig { pub async fn modify(&mut self, alert: AlertRequest) -> Result<(), AlertError> { - let mut targets = Vec::new(); - for id in alert.targets { - targets.push(TARGETS.get_target_by_id(id).await?); - } - + // let mut targets = Vec::new(); + // for id in alert.targets { + // targets.push(id); + // } self.title = alert.title; self.stream = alert.stream; self.alert_type = alert.alert_type; self.aggregates = alert.aggregates; self.eval_config = alert.eval_config; - self.targets = targets; + self.targets = alert.targets; self.state = AlertState::default(); Ok(()) } @@ -593,7 +590,8 @@ impl AlertConfig { }; // validate that target repeat notifs !> eval_frequency - for target in &self.targets { + for target_id in &self.targets { + let target = TARGETS.get_target_by_id(target_id).await?; match &target.timeout.times { target::Retry::Infinite => {} target::Retry::Finite(repeat) => { @@ -779,7 +777,8 @@ impl AlertConfig { pub async fn trigger_notifications(&self, message: String) -> Result<(), AlertError> { let mut context = self.get_context(); context.message = message; - for target in &self.targets { + for target_id in &self.targets { + let target = TARGETS.get_target_by_id(target_id).await?; trace!("Target (trigger_notifications)-\n{target:?}"); target.call(context.clone()); } diff --git a/src/alerts/target.rs b/src/alerts/target.rs index e46269ede..4818f93e3 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -35,7 +35,7 @@ use tracing::{error, trace, warn}; use ulid::Ulid; use url::Url; -use crate::alerts::AlertError; +use crate::{alerts::AlertError, parseable::PARSEABLE}; use super::ALERTS; @@ -51,6 +51,18 @@ pub struct TargetConfigs { } impl TargetConfigs { + /// Loads alerts from disk, blocks + pub async fn load(&self) -> anyhow::Result<()> { + let mut map = self.target_configs.write().await; + let store = PARSEABLE.storage.get_object_store(); + + for alert in store.get_targets().await.unwrap_or_default() { + map.insert(alert.id, alert); + } + + Ok(()) + } + pub async fn update(&self, target: Target) -> Result<(), AlertError> { let id = target.id; self.target_configs.write().await.insert(id, target); @@ -68,20 +80,20 @@ impl TargetConfigs { Ok(targets) } - pub async fn get_target_by_id(&self, target_id: Ulid) -> Result { + pub async fn get_target_by_id(&self, target_id: &Ulid) -> Result { self.target_configs .read() .await - .get(&target_id) + .get(target_id) .ok_or(AlertError::InvalidTargetID(target_id.to_string())) .cloned() } - pub async fn delete(&self, target_id: Ulid) -> Result { + pub async fn delete(&self, target_id: &Ulid) -> Result { self.target_configs .write() .await - .remove(&target_id) + .remove(target_id) .ok_or(AlertError::InvalidTargetID(target_id.to_string())) } } diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index b43fa68a9..9cd224526 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -69,6 +69,7 @@ impl ParseableServer for QueryServer { .service(Server::get_counts_webscope()) .service(Server::get_metrics_webscope()) .service(Server::get_alerts_webscope()) + .service(Server::get_targets_webscope()) .service(Self::get_cluster_web_scope()), ) .service( diff --git a/src/handlers/http/targets.rs b/src/handlers/http/targets.rs index 80f5443b5..e6519b5a4 100644 --- a/src/handlers/http/targets.rs +++ b/src/handlers/http/targets.rs @@ -2,11 +2,16 @@ use actix_web::{ web::{self, Json, Path}, HttpRequest, Responder, }; +use bytes::Bytes; use ulid::Ulid; -use crate::alerts::{ - target::{Target, TARGETS}, - AlertError, +use crate::{ + alerts::{ + target::{Target, TARGETS}, + AlertError, + }, + parseable::PARSEABLE, + storage::object_storage::target_json_path, }; // POST /targets @@ -17,6 +22,12 @@ pub async fn post( // should check for duplicacy and liveness (??) target.validate().await; + let path = target_json_path(target.id); + + let store = PARSEABLE.storage.get_object_store(); + let target_bytes = serde_json::to_vec(&target)?; + store.put_object(&path, Bytes::from(target_bytes)).await?; + // add to the map TARGETS.update(target.clone()).await?; @@ -35,12 +46,12 @@ pub async fn list(_req: HttpRequest) -> Result { pub async fn get(_req: HttpRequest, target_id: Path) -> Result { let target_id = target_id.into_inner(); - let target = TARGETS.get_target_by_id(target_id).await?; + let target = TARGETS.get_target_by_id(&target_id).await?; Ok(web::Json(target)) } -// POST /targets/{target_id} +// PUT /targets/{target_id} pub async fn update( _req: HttpRequest, target_id: Path, @@ -49,13 +60,19 @@ pub async fn update( let target_id = target_id.into_inner(); // if target_id does not exist, error - TARGETS.get_target_by_id(target_id).await?; + TARGETS.get_target_by_id(&target_id).await?; // esnure that the supplied target id is assigned to the target config target.id = target_id; // should check for duplicacy and liveness (??) target.validate().await; + let path = target_json_path(target.id); + + let store = PARSEABLE.storage.get_object_store(); + let target_bytes = serde_json::to_vec(&target)?; + store.put_object(&path, Bytes::from(target_bytes)).await?; + // add to the map TARGETS.update(target.clone()).await?; @@ -69,7 +86,7 @@ pub async fn delete( ) -> Result { let target_id = target_id.into_inner(); - let target = TARGETS.delete(target_id).await?; + let target = TARGETS.delete(&target_id).await?; Ok(web::Json(target)) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 63e3803bd..1b0718844 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -59,6 +59,7 @@ pub const STREAM_ROOT_DIRECTORY: &str = ".stream"; pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable"; pub const SCHEMA_FILE_NAME: &str = ".schema"; pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts"; +pub const TARGETS_ROOT_DIRECTORY: &str = ".targets"; pub const MANIFEST_FILE: &str = "manifest.json"; // max concurrent request allowed for datafusion object store diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 00e262631..85c3502a1 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -43,6 +43,7 @@ use tracing::info; use tracing::{error, warn}; use ulid::Ulid; +use crate::alerts::target::Target; use crate::alerts::AlertConfig; use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot}; use crate::correlation::{CorrelationConfig, CorrelationError}; @@ -58,6 +59,7 @@ use crate::option::Mode; use crate::parseable::LogStream; use crate::parseable::PARSEABLE; use crate::stats::FullStats; +use crate::storage::TARGETS_ROOT_DIRECTORY; use super::{ retention::Retention, ObjectStorageError, ObjectStoreFormat, StorageMetadata, @@ -422,6 +424,25 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(alerts) } + async fn get_targets(&self) -> Result, ObjectStorageError> { + let targets_path = RelativePathBuf::from(TARGETS_ROOT_DIRECTORY); + let targets = self + .get_objects( + Some(&targets_path), + Box::new(|file_name| file_name.ends_with(".json")), + ) + .await? + .iter() + .filter_map(|bytes| { + serde_json::from_slice(bytes) + .inspect_err(|err| warn!("Expected compatible json, error = {err}")) + .ok() + }) + .collect(); + + Ok(targets) + } + async fn upsert_stream_metadata( &self, stream_name: &str, @@ -974,6 +995,12 @@ pub fn alert_json_path(alert_id: Ulid) -> RelativePathBuf { RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY, &format!("{alert_id}.json")]) } +/// TODO: Needs to be updated for distributed mode +#[inline(always)] +pub fn target_json_path(target_id: Ulid) -> RelativePathBuf { + RelativePathBuf::from_iter([TARGETS_ROOT_DIRECTORY, &format!("{target_id}.json")]) +} + #[inline(always)] pub fn manifest_path(prefix: &str) -> RelativePathBuf { match &PARSEABLE.options.mode { From cd39b98d150dd2314f834f7fa21add886f1b8f07 Mon Sep 17 00:00:00 2001 From: anant Date: Mon, 30 Jun 2025 07:57:27 +0530 Subject: [PATCH 3/6] Updates: Coderabbit suggestions and validations --- src/alerts/mod.rs | 22 +++++++++++------ src/alerts/target.rs | 46 +++++++++++++++++++++++++++++------ src/handlers/http/targets.rs | 28 +++++---------------- src/storage/mod.rs | 1 + src/storage/object_storage.rs | 12 ++++++--- 5 files changed, 68 insertions(+), 41 deletions(-) diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 4817da3a9..1d794e511 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -535,10 +535,10 @@ pub struct AlertRequest { impl AlertRequest { pub async fn into(self) -> Result { - // let mut targets = Vec::new(); - // for id in self.targets { - // targets.push(TARGETS.get_target_by_id(id).await?); - // } + // Validate that all target IDs exist + for id in &self.targets { + TARGETS.get_target_by_id(id).await?; + } let config = AlertConfig { version: AlertVerison::from(CURRENT_ALERTS_VERSION), id: Ulid::new(), @@ -575,10 +575,10 @@ pub struct AlertConfig { impl AlertConfig { pub async fn modify(&mut self, alert: AlertRequest) -> Result<(), AlertError> { - // let mut targets = Vec::new(); - // for id in alert.targets { - // targets.push(id); - // } + // Validate that all target IDs exist + for id in &alert.targets { + TARGETS.get_target_by_id(id).await?; + } self.title = alert.title; self.stream = alert.stream; self.alert_type = alert.alert_type; @@ -853,6 +853,10 @@ pub enum AlertError { FromStrError(#[from] FromStrError), #[error("Invalid Target ID- {0}")] InvalidTargetID(String), + #[error("Target already exists")] + DuplicateTargetConfig, + #[error("Can't delete a Target which is being used")] + TargetInUse, } impl actix_web::ResponseError for AlertError { @@ -871,6 +875,8 @@ impl actix_web::ResponseError for AlertError { Self::InvalidAlertModifyRequest => StatusCode::BAD_REQUEST, Self::FromStrError(_) => StatusCode::BAD_REQUEST, Self::InvalidTargetID(_) => StatusCode::BAD_REQUEST, + Self::DuplicateTargetConfig => StatusCode::BAD_REQUEST, + Self::TargetInUse => StatusCode::CONFLICT, } } diff --git a/src/alerts/target.rs b/src/alerts/target.rs index 2e72257a8..62301b315 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -24,6 +24,7 @@ use std::{ use async_trait::async_trait; use base64::Engine; +use bytes::Bytes; use chrono::Utc; use http::{header::AUTHORIZATION, HeaderMap, HeaderValue}; use humantime_serde::re::humantime; @@ -35,7 +36,7 @@ use tracing::{error, trace, warn}; use ulid::Ulid; use url::Url; -use crate::{alerts::AlertError, parseable::PARSEABLE}; +use crate::{alerts::AlertError, parseable::PARSEABLE, storage::object_storage::target_json_path}; use super::ALERTS; @@ -65,7 +66,12 @@ impl TargetConfigs { pub async fn update(&self, target: Target) -> Result<(), AlertError> { let id = target.id; - self.target_configs.write().await.insert(id, target); + self.target_configs.write().await.insert(id, target.clone()); + let path = target_json_path(&id); + + let store = PARSEABLE.storage.get_object_store(); + let target_bytes = serde_json::to_vec(&target)?; + store.put_object(&path, Bytes::from(target_bytes)).await?; Ok(()) } @@ -81,24 +87,38 @@ impl TargetConfigs { } pub async fn get_target_by_id(&self, target_id: &Ulid) -> Result { - self.target_configs + let target = self + .target_configs .read() .await .get(target_id) .ok_or(AlertError::InvalidTargetID(target_id.to_string())) - .cloned() + .cloned()?; + + Ok(target) } pub async fn delete(&self, target_id: &Ulid) -> Result { - self.target_configs + // ensure that the target is not being used by any alert + for (_, alert) in ALERTS.alerts.read().await.iter() { + if alert.targets.contains(target_id) { + return Err(AlertError::TargetInUse); + } + } + let target = self + .target_configs .write() .await .remove(target_id) - .ok_or(AlertError::InvalidTargetID(target_id.to_string())) + .ok_or(AlertError::InvalidTargetID(target_id.to_string()))?; + let path = target_json_path(&target.id); + let store = PARSEABLE.storage.get_object_store(); + store.delete_object(&path).await?; + Ok(target) } } -#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq)] #[serde(rename_all = "camelCase")] #[serde(untagged)] pub enum Retry { @@ -125,9 +145,19 @@ pub struct Target { } impl Target { - pub async fn validate(&self) { + pub async fn validate(&self) -> Result<(), AlertError> { // just check for liveness // what if the target is not live yet but is added by the user? + let targets = TARGETS.list().await?; + for target in targets { + if target.target == self.target + && target.timeout.interval == self.timeout.interval + && target.timeout.times == self.timeout.times + { + return Err(AlertError::DuplicateTargetConfig); + } + } + Ok(()) } pub fn call(&self, context: Context) { diff --git a/src/handlers/http/targets.rs b/src/handlers/http/targets.rs index e6519b5a4..1284cd337 100644 --- a/src/handlers/http/targets.rs +++ b/src/handlers/http/targets.rs @@ -2,16 +2,11 @@ use actix_web::{ web::{self, Json, Path}, HttpRequest, Responder, }; -use bytes::Bytes; use ulid::Ulid; -use crate::{ - alerts::{ - target::{Target, TARGETS}, - AlertError, - }, - parseable::PARSEABLE, - storage::object_storage::target_json_path, +use crate::alerts::{ + target::{Target, TARGETS}, + AlertError, }; // POST /targets @@ -20,13 +15,7 @@ pub async fn post( Json(target): Json, ) -> Result { // should check for duplicacy and liveness (??) - target.validate().await; - - let path = target_json_path(target.id); - - let store = PARSEABLE.storage.get_object_store(); - let target_bytes = serde_json::to_vec(&target)?; - store.put_object(&path, Bytes::from(target_bytes)).await?; + target.validate().await?; // add to the map TARGETS.update(target.clone()).await?; @@ -64,14 +53,9 @@ pub async fn update( // esnure that the supplied target id is assigned to the target config target.id = target_id; - // should check for duplicacy and liveness (??) - target.validate().await; - let path = target_json_path(target.id); - - let store = PARSEABLE.storage.get_object_store(); - let target_bytes = serde_json::to_vec(&target)?; - store.put_object(&path, Bytes::from(target_bytes)).await?; + // should check for duplicacy and liveness (??) + target.validate().await?; // add to the map TARGETS.update(target.clone()).await?; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 1b0718844..8cab1690f 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -59,6 +59,7 @@ pub const STREAM_ROOT_DIRECTORY: &str = ".stream"; pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable"; pub const SCHEMA_FILE_NAME: &str = ".schema"; pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts"; +pub const SETTINGS_ROOT_DIRECTORY: &str = ".settings"; pub const TARGETS_ROOT_DIRECTORY: &str = ".targets"; pub const MANIFEST_FILE: &str = "manifest.json"; diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index d226aa00c..f306d86ba 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -79,6 +79,7 @@ use crate::parseable::PARSEABLE; use crate::query::QUERY_SESSION_STATE; use crate::stats::FullStats; use crate::storage::StreamType; +use crate::storage::SETTINGS_ROOT_DIRECTORY; use crate::storage::TARGETS_ROOT_DIRECTORY; use crate::utils::DATASET_STATS_STREAM_NAME; @@ -446,7 +447,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { } async fn get_targets(&self) -> Result, ObjectStorageError> { - let targets_path = RelativePathBuf::from(TARGETS_ROOT_DIRECTORY); + let targets_path = + RelativePathBuf::from_iter([SETTINGS_ROOT_DIRECTORY, TARGETS_ROOT_DIRECTORY]); let targets = self .get_objects( Some(&targets_path), @@ -1360,8 +1362,12 @@ pub fn alert_json_path(alert_id: Ulid) -> RelativePathBuf { /// TODO: Needs to be updated for distributed mode #[inline(always)] -pub fn target_json_path(target_id: Ulid) -> RelativePathBuf { - RelativePathBuf::from_iter([TARGETS_ROOT_DIRECTORY, &format!("{target_id}.json")]) +pub fn target_json_path(target_id: &Ulid) -> RelativePathBuf { + RelativePathBuf::from_iter([ + SETTINGS_ROOT_DIRECTORY, + TARGETS_ROOT_DIRECTORY, + &format!("{target_id}.json"), + ]) } #[inline(always)] From 68de9f0e37e23bdc3b2faa58d59dcf7ba5b80a56 Mon Sep 17 00:00:00 2001 From: anant Date: Tue, 1 Jul 2025 15:50:09 +0530 Subject: [PATCH 4/6] update: add name to target --- src/alerts/target.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/alerts/target.rs b/src/alerts/target.rs index 62301b315..bad5a98b5 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -136,6 +136,7 @@ impl Default for Retry { #[serde(rename_all = "camelCase")] #[serde(try_from = "TargetVerifier")] pub struct Target { + pub name: String, #[serde(flatten)] pub target: TargetType, #[serde(default, rename = "repeat")] @@ -289,6 +290,7 @@ pub struct RepeatVerifier { #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct TargetVerifier { + pub name: String, #[serde(flatten)] pub target: TargetType, #[serde(default)] @@ -325,6 +327,7 @@ impl TryFrom for Target { } Ok(Target { + name: value.name, target: value.target, timeout, id: value.id, From b9ed392ff87ead4e846918c4cb67b9ada2d2ac34 Mon Sep 17 00:00:00 2001 From: anant Date: Tue, 1 Jul 2025 19:32:36 +0530 Subject: [PATCH 5/6] bugfix: coderabbit suggestion --- src/alerts/target.rs | 30 ++++++++++++------------------ src/handlers/http/targets.rs | 4 ---- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/src/alerts/target.rs b/src/alerts/target.rs index bad5a98b5..6b072369c 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -65,9 +65,18 @@ impl TargetConfigs { } pub async fn update(&self, target: Target) -> Result<(), AlertError> { - let id = target.id; - self.target_configs.write().await.insert(id, target.clone()); - let path = target_json_path(&id); + let mut map = self.target_configs.write().await; + if map.values().any(|t| { + t.target == target.target + && t.timeout.interval == target.timeout.interval + && t.timeout.times == target.timeout.times + && t.id != target.id + }) { + return Err(AlertError::DuplicateTargetConfig); + } + map.insert(target.id, target.clone()); + + let path = target_json_path(&target.id); let store = PARSEABLE.storage.get_object_store(); let target_bytes = serde_json::to_vec(&target)?; @@ -146,21 +155,6 @@ pub struct Target { } impl Target { - pub async fn validate(&self) -> Result<(), AlertError> { - // just check for liveness - // what if the target is not live yet but is added by the user? - let targets = TARGETS.list().await?; - for target in targets { - if target.target == self.target - && target.timeout.interval == self.timeout.interval - && target.timeout.times == self.timeout.times - { - return Err(AlertError::DuplicateTargetConfig); - } - } - Ok(()) - } - pub fn call(&self, context: Context) { trace!("target.call context- {context:?}"); let timeout = &self.timeout; diff --git a/src/handlers/http/targets.rs b/src/handlers/http/targets.rs index 1284cd337..391eb53f5 100644 --- a/src/handlers/http/targets.rs +++ b/src/handlers/http/targets.rs @@ -15,8 +15,6 @@ pub async fn post( Json(target): Json, ) -> Result { // should check for duplicacy and liveness (??) - target.validate().await?; - // add to the map TARGETS.update(target.clone()).await?; @@ -55,8 +53,6 @@ pub async fn update( target.id = target_id; // should check for duplicacy and liveness (??) - target.validate().await?; - // add to the map TARGETS.update(target.clone()).await?; From b764f26ce4414069a3f9f5cbf17ded3b4083aab9 Mon Sep 17 00:00:00 2001 From: anant Date: Wed, 2 Jul 2025 11:10:10 +0530 Subject: [PATCH 6/6] bugfix: ignore settings directory for list stream --- src/storage/localfs.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index cc6a82a53..57a7e5a4c 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -41,6 +41,7 @@ use crate::{ metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics}, option::validation, parseable::LogStream, + storage::SETTINGS_ROOT_DIRECTORY, }; use super::{ @@ -336,6 +337,7 @@ impl ObjectStorage for LocalFS { PARSEABLE_ROOT_DIRECTORY, USERS_ROOT_DIR, ALERTS_ROOT_DIRECTORY, + SETTINGS_ROOT_DIRECTORY, ]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); let entries: Vec = directories.try_collect().await?; @@ -356,6 +358,7 @@ impl ObjectStorage for LocalFS { "lost+found", PARSEABLE_ROOT_DIRECTORY, ALERTS_ROOT_DIRECTORY, + SETTINGS_ROOT_DIRECTORY, ]; let directories = ReadDirStream::new(fs::read_dir(&self.root).await?); let entries: Vec = directories.try_collect().await?;