Skip to content

Commit cd39b98

Browse files
committed
Updates: Coderabbit suggestions and validations
1 parent cd363c2 commit cd39b98

File tree

5 files changed

+68
-41
lines changed

5 files changed

+68
-41
lines changed

src/alerts/mod.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -535,10 +535,10 @@ pub struct AlertRequest {
535535

536536
impl AlertRequest {
537537
pub async fn into(self) -> Result<AlertConfig, AlertError> {
538-
// let mut targets = Vec::new();
539-
// for id in self.targets {
540-
// targets.push(TARGETS.get_target_by_id(id).await?);
541-
// }
538+
// Validate that all target IDs exist
539+
for id in &self.targets {
540+
TARGETS.get_target_by_id(id).await?;
541+
}
542542
let config = AlertConfig {
543543
version: AlertVerison::from(CURRENT_ALERTS_VERSION),
544544
id: Ulid::new(),
@@ -575,10 +575,10 @@ pub struct AlertConfig {
575575

576576
impl AlertConfig {
577577
pub async fn modify(&mut self, alert: AlertRequest) -> Result<(), AlertError> {
578-
// let mut targets = Vec::new();
579-
// for id in alert.targets {
580-
// targets.push(id);
581-
// }
578+
// Validate that all target IDs exist
579+
for id in &alert.targets {
580+
TARGETS.get_target_by_id(id).await?;
581+
}
582582
self.title = alert.title;
583583
self.stream = alert.stream;
584584
self.alert_type = alert.alert_type;
@@ -853,6 +853,10 @@ pub enum AlertError {
853853
FromStrError(#[from] FromStrError),
854854
#[error("Invalid Target ID- {0}")]
855855
InvalidTargetID(String),
856+
#[error("Target already exists")]
857+
DuplicateTargetConfig,
858+
#[error("Can't delete a Target which is being used")]
859+
TargetInUse,
856860
}
857861

858862
impl actix_web::ResponseError for AlertError {
@@ -871,6 +875,8 @@ impl actix_web::ResponseError for AlertError {
871875
Self::InvalidAlertModifyRequest => StatusCode::BAD_REQUEST,
872876
Self::FromStrError(_) => StatusCode::BAD_REQUEST,
873877
Self::InvalidTargetID(_) => StatusCode::BAD_REQUEST,
878+
Self::DuplicateTargetConfig => StatusCode::BAD_REQUEST,
879+
Self::TargetInUse => StatusCode::CONFLICT,
874880
}
875881
}
876882

src/alerts/target.rs

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::{
2424

2525
use async_trait::async_trait;
2626
use base64::Engine;
27+
use bytes::Bytes;
2728
use chrono::Utc;
2829
use http::{header::AUTHORIZATION, HeaderMap, HeaderValue};
2930
use humantime_serde::re::humantime;
@@ -35,7 +36,7 @@ use tracing::{error, trace, warn};
3536
use ulid::Ulid;
3637
use url::Url;
3738

38-
use crate::{alerts::AlertError, parseable::PARSEABLE};
39+
use crate::{alerts::AlertError, parseable::PARSEABLE, storage::object_storage::target_json_path};
3940

4041
use super::ALERTS;
4142

@@ -65,7 +66,12 @@ impl TargetConfigs {
6566

6667
pub async fn update(&self, target: Target) -> Result<(), AlertError> {
6768
let id = target.id;
68-
self.target_configs.write().await.insert(id, target);
69+
self.target_configs.write().await.insert(id, target.clone());
70+
let path = target_json_path(&id);
71+
72+
let store = PARSEABLE.storage.get_object_store();
73+
let target_bytes = serde_json::to_vec(&target)?;
74+
store.put_object(&path, Bytes::from(target_bytes)).await?;
6975
Ok(())
7076
}
7177

@@ -81,24 +87,38 @@ impl TargetConfigs {
8187
}
8288

8389
pub async fn get_target_by_id(&self, target_id: &Ulid) -> Result<Target, AlertError> {
84-
self.target_configs
90+
let target = self
91+
.target_configs
8592
.read()
8693
.await
8794
.get(target_id)
8895
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))
89-
.cloned()
96+
.cloned()?;
97+
98+
Ok(target)
9099
}
91100

92101
pub async fn delete(&self, target_id: &Ulid) -> Result<Target, AlertError> {
93-
self.target_configs
102+
// ensure that the target is not being used by any alert
103+
for (_, alert) in ALERTS.alerts.read().await.iter() {
104+
if alert.targets.contains(target_id) {
105+
return Err(AlertError::TargetInUse);
106+
}
107+
}
108+
let target = self
109+
.target_configs
94110
.write()
95111
.await
96112
.remove(target_id)
97-
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))
113+
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))?;
114+
let path = target_json_path(&target.id);
115+
let store = PARSEABLE.storage.get_object_store();
116+
store.delete_object(&path).await?;
117+
Ok(target)
98118
}
99119
}
100120

101-
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
121+
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq)]
102122
#[serde(rename_all = "camelCase")]
103123
#[serde(untagged)]
104124
pub enum Retry {
@@ -125,9 +145,19 @@ pub struct Target {
125145
}
126146

127147
impl Target {
128-
pub async fn validate(&self) {
148+
pub async fn validate(&self) -> Result<(), AlertError> {
129149
// just check for liveness
130150
// what if the target is not live yet but is added by the user?
151+
let targets = TARGETS.list().await?;
152+
for target in targets {
153+
if target.target == self.target
154+
&& target.timeout.interval == self.timeout.interval
155+
&& target.timeout.times == self.timeout.times
156+
{
157+
return Err(AlertError::DuplicateTargetConfig);
158+
}
159+
}
160+
Ok(())
131161
}
132162

133163
pub fn call(&self, context: Context) {

src/handlers/http/targets.rs

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,11 @@ use actix_web::{
22
web::{self, Json, Path},
33
HttpRequest, Responder,
44
};
5-
use bytes::Bytes;
65
use ulid::Ulid;
76

8-
use crate::{
9-
alerts::{
10-
target::{Target, TARGETS},
11-
AlertError,
12-
},
13-
parseable::PARSEABLE,
14-
storage::object_storage::target_json_path,
7+
use crate::alerts::{
8+
target::{Target, TARGETS},
9+
AlertError,
1510
};
1611

1712
// POST /targets
@@ -20,13 +15,7 @@ pub async fn post(
2015
Json(target): Json<Target>,
2116
) -> Result<impl Responder, AlertError> {
2217
// should check for duplicacy and liveness (??)
23-
target.validate().await;
24-
25-
let path = target_json_path(target.id);
26-
27-
let store = PARSEABLE.storage.get_object_store();
28-
let target_bytes = serde_json::to_vec(&target)?;
29-
store.put_object(&path, Bytes::from(target_bytes)).await?;
18+
target.validate().await?;
3019

3120
// add to the map
3221
TARGETS.update(target.clone()).await?;
@@ -64,14 +53,9 @@ pub async fn update(
6453

6554
// esnure that the supplied target id is assigned to the target config
6655
target.id = target_id;
67-
// should check for duplicacy and liveness (??)
68-
target.validate().await;
6956

70-
let path = target_json_path(target.id);
71-
72-
let store = PARSEABLE.storage.get_object_store();
73-
let target_bytes = serde_json::to_vec(&target)?;
74-
store.put_object(&path, Bytes::from(target_bytes)).await?;
57+
// should check for duplicacy and liveness (??)
58+
target.validate().await?;
7559

7660
// add to the map
7761
TARGETS.update(target.clone()).await?;

src/storage/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub const STREAM_ROOT_DIRECTORY: &str = ".stream";
5959
pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable";
6060
pub const SCHEMA_FILE_NAME: &str = ".schema";
6161
pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts";
62+
pub const SETTINGS_ROOT_DIRECTORY: &str = ".settings";
6263
pub const TARGETS_ROOT_DIRECTORY: &str = ".targets";
6364
pub const MANIFEST_FILE: &str = "manifest.json";
6465

src/storage/object_storage.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ use crate::parseable::PARSEABLE;
7979
use crate::query::QUERY_SESSION_STATE;
8080
use crate::stats::FullStats;
8181
use crate::storage::StreamType;
82+
use crate::storage::SETTINGS_ROOT_DIRECTORY;
8283
use crate::storage::TARGETS_ROOT_DIRECTORY;
8384
use crate::utils::DATASET_STATS_STREAM_NAME;
8485

@@ -446,7 +447,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
446447
}
447448

448449
async fn get_targets(&self) -> Result<Vec<Target>, ObjectStorageError> {
449-
let targets_path = RelativePathBuf::from(TARGETS_ROOT_DIRECTORY);
450+
let targets_path =
451+
RelativePathBuf::from_iter([SETTINGS_ROOT_DIRECTORY, TARGETS_ROOT_DIRECTORY]);
450452
let targets = self
451453
.get_objects(
452454
Some(&targets_path),
@@ -1360,8 +1362,12 @@ pub fn alert_json_path(alert_id: Ulid) -> RelativePathBuf {
13601362

13611363
/// TODO: Needs to be updated for distributed mode
13621364
#[inline(always)]
1363-
pub fn target_json_path(target_id: Ulid) -> RelativePathBuf {
1364-
RelativePathBuf::from_iter([TARGETS_ROOT_DIRECTORY, &format!("{target_id}.json")])
1365+
pub fn target_json_path(target_id: &Ulid) -> RelativePathBuf {
1366+
RelativePathBuf::from_iter([
1367+
SETTINGS_ROOT_DIRECTORY,
1368+
TARGETS_ROOT_DIRECTORY,
1369+
&format!("{target_id}.json"),
1370+
])
13651371
}
13661372

13671373
#[inline(always)]

0 commit comments

Comments
 (0)