Skip to content

Bring targets out of alerts #1353

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 38 additions & 18 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@ use ulid::Ulid;
pub mod alerts_utils;
pub mod target;

use crate::alerts::target::TARGETS;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::rbac::map::SessionKey;
use crate::storage;
use crate::storage::ObjectStorageError;
use crate::sync::alert_runtime;
use crate::utils::user_auth_for_query;

use self::target::Target;

// these types describe the scheduled task for an alert
pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>);

Expand Down Expand Up @@ -531,23 +530,28 @@ pub struct AlertRequest {
pub alert_type: AlertType,
pub aggregates: Aggregates,
pub eval_config: EvalConfig,
pub targets: Vec<Target>,
pub targets: Vec<Ulid>,
}

impl From<AlertRequest> for AlertConfig {
fn from(val: AlertRequest) -> AlertConfig {
AlertConfig {
impl AlertRequest {
pub async fn into(self) -> Result<AlertConfig, AlertError> {
// Validate that all target IDs exist
for id in &self.targets {
TARGETS.get_target_by_id(id).await?;
}
let config = AlertConfig {
version: AlertVerison::from(CURRENT_ALERTS_VERSION),
id: Ulid::new(),
severity: val.severity,
title: val.title,
stream: val.stream,
alert_type: val.alert_type,
aggregates: val.aggregates,
eval_config: val.eval_config,
targets: val.targets,
severity: self.severity,
title: self.title,
stream: self.stream,
alert_type: self.alert_type,
aggregates: self.aggregates,
eval_config: self.eval_config,
targets: self.targets,
state: AlertState::default(),
}
};
Ok(config)
}
}

Expand All @@ -563,21 +567,26 @@ pub struct AlertConfig {
pub alert_type: AlertType,
pub aggregates: Aggregates,
pub eval_config: EvalConfig,
pub targets: Vec<Target>,
pub targets: Vec<Ulid>,
// for new alerts, state should be resolved
#[serde(default)]
pub state: AlertState,
}

impl AlertConfig {
pub fn modify(&mut self, alert: AlertRequest) {
pub async fn modify(&mut self, alert: AlertRequest) -> Result<(), AlertError> {
// Validate that all target IDs exist
for id in &alert.targets {
TARGETS.get_target_by_id(id).await?;
}
self.title = alert.title;
self.stream = alert.stream;
self.alert_type = alert.alert_type;
self.aggregates = alert.aggregates;
self.eval_config = alert.eval_config;
self.targets = alert.targets;
self.state = AlertState::default();
Ok(())
}

pub fn get_base_query(&self) -> String {
Expand All @@ -599,7 +608,8 @@ impl AlertConfig {
};

// validate that target repeat notifs !> eval_frequency
for target in &self.targets {
for target_id in &self.targets {
let target = TARGETS.get_target_by_id(target_id).await?;
match &target.timeout.times {
target::Retry::Infinite => {}
target::Retry::Finite(repeat) => {
Expand Down Expand Up @@ -806,7 +816,8 @@ impl AlertConfig {
pub async fn trigger_notifications(&self, message: String) -> Result<(), AlertError> {
let mut context = self.get_context();
context.message = message;
for target in &self.targets {
for target_id in &self.targets {
let target = TARGETS.get_target_by_id(target_id).await?;
trace!("Target (trigger_notifications)-\n{target:?}");
target.call(context.clone());
}
Expand Down Expand Up @@ -840,6 +851,12 @@ pub enum AlertError {
InvalidAlertModifyRequest,
#[error("{0}")]
FromStrError(#[from] FromStrError),
#[error("Invalid Target ID- {0}")]
InvalidTargetID(String),
#[error("Target already exists")]
DuplicateTargetConfig,
#[error("Can't delete a Target which is being used")]
TargetInUse,
}

impl actix_web::ResponseError for AlertError {
Expand All @@ -857,6 +874,9 @@ impl actix_web::ResponseError for AlertError {
Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::InvalidAlertModifyRequest => StatusCode::BAD_REQUEST,
Self::FromStrError(_) => StatusCode::BAD_REQUEST,
Self::InvalidTargetID(_) => StatusCode::BAD_REQUEST,
Self::DuplicateTargetConfig => StatusCode::BAD_REQUEST,
Self::TargetInUse => StatusCode::CONFLICT,
}
}

Expand Down
102 changes: 101 additions & 1 deletion src/alerts/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,110 @@ use std::{

use async_trait::async_trait;
use base64::Engine;
use bytes::Bytes;
use chrono::Utc;
use http::{header::AUTHORIZATION, HeaderMap, HeaderValue};
use humantime_serde::re::humantime;
use itertools::Itertools;
use once_cell::sync::Lazy;
use reqwest::ClientBuilder;
use tokio::sync::RwLock;
use tracing::{error, trace, warn};
use ulid::Ulid;
use url::Url;

use crate::{alerts::AlertError, parseable::PARSEABLE, storage::object_storage::target_json_path};

use super::ALERTS;

use super::{AlertState, CallableTarget, Context};

#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
pub static TARGETS: Lazy<TargetConfigs> = Lazy::new(|| TargetConfigs {
target_configs: RwLock::new(HashMap::new()),
});

#[derive(Debug)]
pub struct TargetConfigs {
pub target_configs: RwLock<HashMap<Ulid, Target>>,
}

impl TargetConfigs {
/// Loads alerts from disk, blocks
pub async fn load(&self) -> anyhow::Result<()> {
let mut map = self.target_configs.write().await;
let store = PARSEABLE.storage.get_object_store();

for alert in store.get_targets().await.unwrap_or_default() {
map.insert(alert.id, alert);
}

Ok(())
}

pub async fn update(&self, target: Target) -> Result<(), AlertError> {
let mut map = self.target_configs.write().await;
if map.values().any(|t| {
t.target == target.target
&& t.timeout.interval == target.timeout.interval
&& t.timeout.times == target.timeout.times
&& t.id != target.id
}) {
return Err(AlertError::DuplicateTargetConfig);
}
map.insert(target.id, target.clone());

let path = target_json_path(&target.id);

let store = PARSEABLE.storage.get_object_store();
let target_bytes = serde_json::to_vec(&target)?;
store.put_object(&path, Bytes::from(target_bytes)).await?;
Ok(())
}

pub async fn list(&self) -> Result<Vec<Target>, AlertError> {
let targets = self
.target_configs
.read()
.await
.values()
.cloned()
.collect_vec();
Ok(targets)
}

pub async fn get_target_by_id(&self, target_id: &Ulid) -> Result<Target, AlertError> {
let target = self
.target_configs
.read()
.await
.get(target_id)
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))
.cloned()?;

Ok(target)
}

pub async fn delete(&self, target_id: &Ulid) -> Result<Target, AlertError> {
// ensure that the target is not being used by any alert
for (_, alert) in ALERTS.alerts.read().await.iter() {
if alert.targets.contains(target_id) {
return Err(AlertError::TargetInUse);
}
}
let target = self
.target_configs
.write()
.await
.remove(target_id)
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))?;
let path = target_json_path(&target.id);
let store = PARSEABLE.storage.get_object_store();
store.delete_object(&path).await?;
Ok(target)
}
}

#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[serde(untagged)]
pub enum Retry {
Expand All @@ -53,10 +145,13 @@ impl Default for Retry {
#[serde(rename_all = "camelCase")]
#[serde(try_from = "TargetVerifier")]
pub struct Target {
pub name: String,
#[serde(flatten)]
pub target: TargetType,
#[serde(default, rename = "repeat")]
pub timeout: Timeout,
#[serde(default = "Ulid::new")]
pub id: Ulid,
}

impl Target {
Expand Down Expand Up @@ -189,10 +284,13 @@ pub struct RepeatVerifier {
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TargetVerifier {
pub name: String,
#[serde(flatten)]
pub target: TargetType,
#[serde(default)]
pub repeat: Option<RepeatVerifier>,
#[serde(default = "Ulid::new")]
pub id: Ulid,
}

impl TryFrom<TargetVerifier> for Target {
Expand Down Expand Up @@ -223,8 +321,10 @@ impl TryFrom<TargetVerifier> for Target {
}

Ok(Target {
name: value.name,
target: value.target,
timeout,
id: value.id,
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub async fn post(
req: HttpRequest,
Json(alert): Json<AlertRequest>,
) -> Result<impl Responder, AlertError> {
let alert: AlertConfig = alert.into();
let alert: AlertConfig = alert.into().await?;
alert.validate().await?;

// validate the incoming alert query
Expand Down
1 change: 1 addition & 0 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub mod query;
pub mod rbac;
pub mod resource_check;
pub mod role;
pub mod targets;
pub mod users;
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
pub const API_BASE_PATH: &str = "api";
Expand Down
1 change: 1 addition & 0 deletions src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl ParseableServer for QueryServer {
)))
.service(Server::get_metrics_webscope())
.service(Server::get_alerts_webscope())
.service(Server::get_targets_webscope())
.service(Self::get_cluster_web_scope()),
)
.service(
Expand Down
21 changes: 21 additions & 0 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::handlers::http::health_check;
use crate::handlers::http::prism_base_path;
use crate::handlers::http::query;
use crate::handlers::http::resource_check;
use crate::handlers::http::targets;
use crate::handlers::http::users::dashboards;
use crate::handlers::http::users::filters;
use crate::hottier::HotTierManager;
Expand Down Expand Up @@ -95,6 +96,7 @@ impl ParseableServer for Server {
resource_check::check_resource_utilization_middleware,
)))
.service(Self::get_alerts_webscope())
.service(Self::get_targets_webscope())
.service(Self::get_metrics_webscope()),
)
.service(
Expand Down Expand Up @@ -263,6 +265,25 @@ impl Server {
)
}

pub fn get_targets_webscope() -> Scope {
web::scope("/targets")
.service(
web::resource("")
.route(web::get().to(targets::list).authorize(Action::GetAlert))
.route(web::post().to(targets::post).authorize(Action::PutAlert)),
)
.service(
web::resource("/{target_id}")
.route(web::get().to(targets::get).authorize(Action::GetAlert))
.route(web::put().to(targets::update).authorize(Action::PutAlert))
.route(
web::delete()
.to(targets::delete)
.authorize(Action::DeleteAlert),
),
)
}

// get the dashboards web scope
pub fn get_dashboards_webscope() -> Scope {
web::scope("/dashboards")
Expand Down
Loading
Loading