Skip to content

Commit 484a0c7

Browse files
committed
Integrated Targets with Alerts
Alerts creation will require the ULID of the target instead of the target body
1 parent 2b53b88 commit 484a0c7

File tree

6 files changed

+85
-28
lines changed

6 files changed

+85
-28
lines changed

src/alerts/mod.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ use crate::storage::ObjectStorageError;
4545
use crate::sync::alert_runtime;
4646
use crate::utils::user_auth_for_query;
4747

48-
use self::target::Target;
49-
5048
// these types describe the scheduled task for an alert
5149
pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>);
5250

@@ -519,10 +517,10 @@ pub struct AlertRequest {
519517

520518
impl AlertRequest {
521519
pub async fn into(self) -> Result<AlertConfig, AlertError> {
522-
let mut targets = Vec::new();
523-
for id in self.targets {
524-
targets.push(TARGETS.get_target_by_id(id).await?);
525-
}
520+
// let mut targets = Vec::new();
521+
// for id in self.targets {
522+
// targets.push(TARGETS.get_target_by_id(id).await?);
523+
// }
526524
let config = AlertConfig {
527525
version: AlertVerison::from(CURRENT_ALERTS_VERSION),
528526
id: Ulid::new(),
@@ -532,7 +530,7 @@ impl AlertRequest {
532530
alert_type: self.alert_type,
533531
aggregates: self.aggregates,
534532
eval_config: self.eval_config,
535-
targets,
533+
targets: self.targets,
536534
state: AlertState::default(),
537535
};
538536
Ok(config)
@@ -551,25 +549,24 @@ pub struct AlertConfig {
551549
pub alert_type: AlertType,
552550
pub aggregates: Aggregates,
553551
pub eval_config: EvalConfig,
554-
pub targets: Vec<Target>,
552+
pub targets: Vec<Ulid>,
555553
// for new alerts, state should be resolved
556554
#[serde(default)]
557555
pub state: AlertState,
558556
}
559557

560558
impl AlertConfig {
561559
pub async fn modify(&mut self, alert: AlertRequest) -> Result<(), AlertError> {
562-
let mut targets = Vec::new();
563-
for id in alert.targets {
564-
targets.push(TARGETS.get_target_by_id(id).await?);
565-
}
566-
560+
// let mut targets = Vec::new();
561+
// for id in alert.targets {
562+
// targets.push(id);
563+
// }
567564
self.title = alert.title;
568565
self.stream = alert.stream;
569566
self.alert_type = alert.alert_type;
570567
self.aggregates = alert.aggregates;
571568
self.eval_config = alert.eval_config;
572-
self.targets = targets;
569+
self.targets = alert.targets;
573570
self.state = AlertState::default();
574571
Ok(())
575572
}
@@ -593,7 +590,8 @@ impl AlertConfig {
593590
};
594591

595592
// validate that target repeat notifs !> eval_frequency
596-
for target in &self.targets {
593+
for target_id in &self.targets {
594+
let target = TARGETS.get_target_by_id(target_id).await?;
597595
match &target.timeout.times {
598596
target::Retry::Infinite => {}
599597
target::Retry::Finite(repeat) => {
@@ -779,7 +777,8 @@ impl AlertConfig {
779777
pub async fn trigger_notifications(&self, message: String) -> Result<(), AlertError> {
780778
let mut context = self.get_context();
781779
context.message = message;
782-
for target in &self.targets {
780+
for target_id in &self.targets {
781+
let target = TARGETS.get_target_by_id(target_id).await?;
783782
trace!("Target (trigger_notifications)-\n{target:?}");
784783
target.call(context.clone());
785784
}

src/alerts/target.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use tracing::{error, trace, warn};
3535
use ulid::Ulid;
3636
use url::Url;
3737

38-
use crate::alerts::AlertError;
38+
use crate::{alerts::AlertError, parseable::PARSEABLE};
3939

4040
use super::ALERTS;
4141

@@ -51,6 +51,18 @@ pub struct TargetConfigs {
5151
}
5252

5353
impl TargetConfigs {
54+
/// Loads alerts from disk, blocks
55+
pub async fn load(&self) -> anyhow::Result<()> {
56+
let mut map = self.target_configs.write().await;
57+
let store = PARSEABLE.storage.get_object_store();
58+
59+
for alert in store.get_targets().await.unwrap_or_default() {
60+
map.insert(alert.id, alert);
61+
}
62+
63+
Ok(())
64+
}
65+
5466
pub async fn update(&self, target: Target) -> Result<(), AlertError> {
5567
let id = target.id;
5668
self.target_configs.write().await.insert(id, target);
@@ -68,20 +80,20 @@ impl TargetConfigs {
6880
Ok(targets)
6981
}
7082

71-
pub async fn get_target_by_id(&self, target_id: Ulid) -> Result<Target, AlertError> {
83+
pub async fn get_target_by_id(&self, target_id: &Ulid) -> Result<Target, AlertError> {
7284
self.target_configs
7385
.read()
7486
.await
75-
.get(&target_id)
87+
.get(target_id)
7688
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))
7789
.cloned()
7890
}
7991

80-
pub async fn delete(&self, target_id: Ulid) -> Result<Target, AlertError> {
92+
pub async fn delete(&self, target_id: &Ulid) -> Result<Target, AlertError> {
8193
self.target_configs
8294
.write()
8395
.await
84-
.remove(&target_id)
96+
.remove(target_id)
8597
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))
8698
}
8799
}

src/handlers/http/modal/query_server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ impl ParseableServer for QueryServer {
6969
.service(Server::get_counts_webscope())
7070
.service(Server::get_metrics_webscope())
7171
.service(Server::get_alerts_webscope())
72+
.service(Server::get_targets_webscope())
7273
.service(Self::get_cluster_web_scope()),
7374
)
7475
.service(

src/handlers/http/targets.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,16 @@ use actix_web::{
22
web::{self, Json, Path},
33
HttpRequest, Responder,
44
};
5+
use bytes::Bytes;
56
use ulid::Ulid;
67

7-
use crate::alerts::{
8-
target::{Target, TARGETS},
9-
AlertError,
8+
use crate::{
9+
alerts::{
10+
target::{Target, TARGETS},
11+
AlertError,
12+
},
13+
parseable::PARSEABLE,
14+
storage::object_storage::target_json_path,
1015
};
1116

1217
// POST /targets
@@ -17,6 +22,12 @@ pub async fn post(
1722
// should check for duplicacy and liveness (??)
1823
target.validate().await;
1924

25+
let path = target_json_path(target.id);
26+
27+
let store = PARSEABLE.storage.get_object_store();
28+
let target_bytes = serde_json::to_vec(&target)?;
29+
store.put_object(&path, Bytes::from(target_bytes)).await?;
30+
2031
// add to the map
2132
TARGETS.update(target.clone()).await?;
2233

@@ -35,12 +46,12 @@ pub async fn list(_req: HttpRequest) -> Result<impl Responder, AlertError> {
3546
pub async fn get(_req: HttpRequest, target_id: Path<Ulid>) -> Result<impl Responder, AlertError> {
3647
let target_id = target_id.into_inner();
3748

38-
let target = TARGETS.get_target_by_id(target_id).await?;
49+
let target = TARGETS.get_target_by_id(&target_id).await?;
3950

4051
Ok(web::Json(target))
4152
}
4253

43-
// POST /targets/{target_id}
54+
// PUT /targets/{target_id}
4455
pub async fn update(
4556
_req: HttpRequest,
4657
target_id: Path<Ulid>,
@@ -49,13 +60,19 @@ pub async fn update(
4960
let target_id = target_id.into_inner();
5061

5162
// if target_id does not exist, error
52-
TARGETS.get_target_by_id(target_id).await?;
63+
TARGETS.get_target_by_id(&target_id).await?;
5364

5465
// esnure that the supplied target id is assigned to the target config
5566
target.id = target_id;
5667
// should check for duplicacy and liveness (??)
5768
target.validate().await;
5869

70+
let path = target_json_path(target.id);
71+
72+
let store = PARSEABLE.storage.get_object_store();
73+
let target_bytes = serde_json::to_vec(&target)?;
74+
store.put_object(&path, Bytes::from(target_bytes)).await?;
75+
5976
// add to the map
6077
TARGETS.update(target.clone()).await?;
6178

@@ -69,7 +86,7 @@ pub async fn delete(
6986
) -> Result<impl Responder, AlertError> {
7087
let target_id = target_id.into_inner();
7188

72-
let target = TARGETS.delete(target_id).await?;
89+
let target = TARGETS.delete(&target_id).await?;
7390

7491
Ok(web::Json(target))
7592
}

src/storage/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub const STREAM_ROOT_DIRECTORY: &str = ".stream";
5959
pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable";
6060
pub const SCHEMA_FILE_NAME: &str = ".schema";
6161
pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts";
62+
pub const TARGETS_ROOT_DIRECTORY: &str = ".targets";
6263
pub const MANIFEST_FILE: &str = "manifest.json";
6364

6465
// max concurrent request allowed for datafusion object store

src/storage/object_storage.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use tracing::info;
4343
use tracing::{error, warn};
4444
use ulid::Ulid;
4545

46+
use crate::alerts::target::Target;
4647
use crate::alerts::AlertConfig;
4748
use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot};
4849
use crate::correlation::{CorrelationConfig, CorrelationError};
@@ -58,6 +59,7 @@ use crate::option::Mode;
5859
use crate::parseable::LogStream;
5960
use crate::parseable::PARSEABLE;
6061
use crate::stats::FullStats;
62+
use crate::storage::TARGETS_ROOT_DIRECTORY;
6163

6264
use super::{
6365
retention::Retention, ObjectStorageError, ObjectStoreFormat, StorageMetadata,
@@ -422,6 +424,25 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
422424
Ok(alerts)
423425
}
424426

427+
async fn get_targets(&self) -> Result<Vec<Target>, ObjectStorageError> {
428+
let targets_path = RelativePathBuf::from(TARGETS_ROOT_DIRECTORY);
429+
let targets = self
430+
.get_objects(
431+
Some(&targets_path),
432+
Box::new(|file_name| file_name.ends_with(".json")),
433+
)
434+
.await?
435+
.iter()
436+
.filter_map(|bytes| {
437+
serde_json::from_slice(bytes)
438+
.inspect_err(|err| warn!("Expected compatible json, error = {err}"))
439+
.ok()
440+
})
441+
.collect();
442+
443+
Ok(targets)
444+
}
445+
425446
async fn upsert_stream_metadata(
426447
&self,
427448
stream_name: &str,
@@ -974,6 +995,12 @@ pub fn alert_json_path(alert_id: Ulid) -> RelativePathBuf {
974995
RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY, &format!("{alert_id}.json")])
975996
}
976997

998+
/// TODO: Needs to be updated for distributed mode
999+
#[inline(always)]
1000+
pub fn target_json_path(target_id: Ulid) -> RelativePathBuf {
1001+
RelativePathBuf::from_iter([TARGETS_ROOT_DIRECTORY, &format!("{target_id}.json")])
1002+
}
1003+
9771004
#[inline(always)]
9781005
pub fn manifest_path(prefix: &str) -> RelativePathBuf {
9791006
match &PARSEABLE.options.mode {

0 commit comments

Comments
 (0)