7 changes: 4 additions & 3 deletions backend/tests/relative_imports.rs
@@ -406,7 +406,7 @@ def main():
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base", "dependency_map"))]
async fn relative_imports_test_rename_primary_flow(db: Pool<Postgres>) -> anyhow::Result<()> {
- use windmill_common::{cache::flow::fetch_version, flows::NewFlow};
+ use windmill_common::{cache::flow::fetch_version, flows::NewFlow, worker::to_raw_value};

let (client, port, _s) = init(db.clone()).await;
let flow = fetch_version(&db, 1443253234253454).await.unwrap();
@@ -421,14 +421,14 @@ def main():
path: "f/rel/root_flow_renamed".into(),
summary: "".into(),
description: None,
- value: serde_json::from_str(
+ value: to_raw_value(&serde_json::from_str::<serde_json::Value>(
&serde_json::to_string(flow.value())
.unwrap()
.replace("nstep1", "Foxes")
.replace("nstep2_2", "like")
.replace("nstep_4_1", "Emeralds"),
)
- .unwrap(),
+ .unwrap()),
schema: None,
draft_only: None,
tag: None,
@@ -437,6 +437,7 @@ def main():
deployment_message: None,
visible_to_runner_only: None,
on_behalf_of_email: None,
+ ws_error_handler_muted: None
})
.send()
.await
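The interesting change in this test is the `value` field: the renamed flow JSON is parsed back into a `serde_json::Value` and then wrapped with `to_raw_value`, which suggests `NewFlow.value` now carries raw JSON rather than a `serde_json::Value`. A minimal sketch of that round-trip, assuming `to_raw_value` behaves like serde_json's own raw-value helper (the real Windmill helper may differ):

```rust
// Sketch only: `to_raw_value_sketch` stands in for windmill_common::worker::to_raw_value.
// Requires serde_json with the "raw_value" feature enabled.
use serde::Serialize;
use serde_json::value::RawValue;

fn to_raw_value_sketch<T: Serialize>(v: &T) -> Box<RawValue> {
    serde_json::value::to_raw_value(v).expect("value is serializable")
}

fn main() -> Result<(), serde_json::Error> {
    // Stand-in for the fetched flow value; the real test uses flow.value().
    let original = r#"{"modules":[{"id":"nstep1"},{"id":"nstep2_2"},{"id":"nstep_4_1"}]}"#;

    // Rename steps by plain string replacement, exactly as the test does.
    let renamed = original
        .replace("nstep1", "Foxes")
        .replace("nstep2_2", "like")
        .replace("nstep_4_1", "Emeralds");

    // Parsing into a Value first guarantees the edited string is still valid JSON...
    let value: serde_json::Value = serde_json::from_str(&renamed)?;

    // ...and the boxed RawValue is what the new `value` field appears to expect.
    let raw: Box<RawValue> = to_raw_value_sketch(&value);
    assert!(raw.get().contains("Foxes"));
    Ok(())
}
```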
105 changes: 50 additions & 55 deletions backend/windmill-api/src/flows.rs
@@ -32,7 +32,6 @@ use sql_builder::prelude::*;
use sqlx::{FromRow, Postgres, Transaction};
use windmill_audit::audit_oss::audit_log;
use windmill_audit::ActionKind;
- use windmill_common::flows::FlowValue;
use windmill_common::utils::{query_elems_from_hub, WarnAfterExt};
use windmill_common::worker::{to_raw_value, CLOUD_HOSTED, MIN_VERSION_SUPPORTS_DEBOUNCING};
use windmill_common::HUB_BASE_URL;
@@ -281,33 +280,48 @@ async fn toggle_workspace_error_handler(
let mut tx = user_db.begin(&authed).await?;

let error_handler_maybe: Option<String> = sqlx::query_scalar!(
- "SELECT error_handler FROM workspace_settings WHERE workspace_id = $1",
+ r#"
+ SELECT
+ error_handler
+ FROM
+ workspace_settings
+ WHERE
+ workspace_id = $1
+ "#,
w_id
)
.fetch_optional(&mut *tx)
.await?
.unwrap_or(None);

- return match error_handler_maybe {
+ let response = match error_handler_maybe {
Some(_) => {
sqlx::query_scalar!(
- "UPDATE flow SET ws_error_handler_muted = $3 WHERE path = $1 AND workspace_id = $2",
+ r#"
+ UPDATE
+ flow
+ SET
+ ws_error_handler_muted = $3
+ WHERE
+ path = $1 AND
+ workspace_id = $2
+ "#,
path.to_path(),
w_id,
req.muted,
)
.execute(&mut *tx)
.await?;
- tx.commit().await?;
Ok("".to_string())
}
- None => {
- tx.commit().await?;
- Err(Error::ExecutionErr(
- "Workspace error handler needs to be defined".to_string(),
- ))
- }
+ None => Err(Error::BadRequest(
+ "Workspace error handler needs to be defined".to_string(),
+ )),
};

+ tx.commit().await?;

+ return response;
}
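The refactor above changes control flow more than SQL formatting: the transaction is now committed exactly once after the `match` instead of separately in each arm, and the missing-handler case becomes a `BadRequest` rather than an `ExecutionErr`. A condensed sketch of that shape, using stand-in `Tx` and `Error` types rather than Windmill's:

```rust
// Stand-ins for illustration only; not Windmill's actual types.
#[derive(Debug)]
enum Error {
    BadRequest(String),
}

struct Tx;
impl Tx {
    fn commit(self) -> Result<(), Error> {
        Ok(())
    }
}

fn toggle(error_handler: Option<String>, tx: Tx) -> Result<String, Error> {
    // Build the response first; do not commit inside the arms.
    let response = match error_handler {
        // A workspace error handler exists, so the mute flag can be flipped here.
        Some(_) => Ok("".to_string()),
        // Nothing to mute without a handler: a client error, not an execution error.
        None => Err(Error::BadRequest(
            "Workspace error handler needs to be defined".to_string(),
        )),
    };

    // Single commit point shared by both outcomes.
    tx.commit()?;

    response
}

fn main() {
    println!("{:?}", toggle(Some("f/err/handler".into()), Tx));
    println!("{:?}", toggle(None, Tx));
}
```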

async fn check_path_conflict<'c>(
@@ -378,6 +392,20 @@ async fn list_paths_from_workspace_runnable(
Ok(Json(runnables))
}

+ async fn validate_flow(new_flow: &NewFlow) -> error::Result<()> {
+ #[cfg(not(feature = "enterprise"))]
+ if new_flow.ws_error_handler_muted.is_some_and(|val| val) {
+ return Err(Error::BadRequest(
+ "Muting the error handler for certain flow is only available in enterprise version"
+ .to_string(),
+ ));
+ }
+
+ guard_flow_from_debounce_data(new_flow).await?;
+
+ return Ok(());
+ }
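The new `validate_flow` helper also implies that `ws_error_handler_muted` is now a typed `Option<bool>` field on `NewFlow`, checked with `Option::is_some_and`, instead of being fished out of the untyped JSON `value` as the removed blocks further down did. A small sketch of the two checks side by side, with a hypothetical stand-in struct:

```rust
// `NewFlowSketch` is a hypothetical stand-in; the real NewFlow has more fields.
struct NewFlowSketch {
    ws_error_handler_muted: Option<bool>,
    value: serde_json::Value,
}

// New style: the flag is a typed field, true only when present and set.
fn muted_typed(nf: &NewFlowSketch) -> bool {
    nf.ws_error_handler_muted.is_some_and(|val| val)
}

// Old style: probe the untyped JSON payload for the same flag.
fn muted_untyped(nf: &NewFlowSketch) -> bool {
    nf.value
        .get("ws_error_handler_muted")
        .map(|val| val.as_bool().unwrap_or(false))
        .is_some_and(|val| val)
}

fn main() {
    let nf = NewFlowSketch {
        ws_error_handler_muted: Some(true),
        value: serde_json::json!({ "ws_error_handler_muted": true }),
    };
    assert_eq!(muted_typed(&nf), muted_untyped(&nf));
    println!("muted: {}", muted_typed(&nf));
}
```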

async fn create_flow(
authed: ApiAuthed,
Extension(db): Extension<DB>,
@@ -387,7 +415,7 @@ async fn create_flow(
Json(nf): Json<NewFlow>,
) -> Result<(StatusCode, String)> {
check_scopes(&authed, || format!("flows:write:{}", nf.path))?;
- guard_flow_from_debounce_data(&nf).await?;
+ validate_flow(&nf).await?;
if *CLOUD_HOSTED {
let nb_flows =
sqlx::query_scalar!("SELECT COUNT(*) FROM flow WHERE workspace_id = $1", &w_id)
@@ -414,18 +442,6 @@ async fn create_flow(
));
}
}
- #[cfg(not(feature = "enterprise"))]
- if nf
- .value
- .get("ws_error_handler_muted")
- .map(|val| val.as_bool().unwrap_or(false))
- .is_some_and(|val| val)
- {
- return Err(Error::BadRequest(
- "Muting the error handler for certain flow is only available in enterprise version"
- .to_string(),
- ));
- }

// cron::Schedule::from_str(&ns.schedule).map_err(|e| error::Error::BadRequest(e.to_string()))?;
let authed = maybe_refresh_folders(&nf.path, &w_id, authed, &db).await;
@@ -451,17 +467,13 @@ async fn create_flow(
w_id,
nf.path,
nf.summary,
- nf.description.unwrap_or_else(String::new),
+ nf.description.as_deref().unwrap_or(""),
nf.draft_only,
nf.tag,
nf.dedicated_worker,
nf.visible_to_runner_only.unwrap_or(false),
- if nf.on_behalf_of_email.is_some() {
- Some(&authed.email)
- } else {
- None
- },
- nf.value,
+ nf.on_behalf_of_email.and(Some(&authed.email)),
+ sqlx::types::Json(&nf.value) as _,
schema_str,
&authed.username,
)
@@ -474,7 +486,7 @@
RETURNING id",
w_id,
nf.path,
- nf.value,
+ sqlx::types::Json(nf.value) as _,
schema_str,
&authed.username,
)
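Two recurring substitutions in the insert bindings are worth a gloss: `nf.on_behalf_of_email.and(Some(&authed.email))` collapses the old if/else, since `Option::and` returns its argument only when the receiver is `Some`; and `sqlx::types::Json(&nf.value) as _` presumably wraps the now-raw JSON value so the sqlx macros can bind it. A tiny sketch of the `Option::and` equivalence, with made-up values:

```rust
// Demonstrates that `a.and(b)` is `b` exactly when `a` is Some,
// matching the removed if/else. Emails here are illustrative only.
fn main() {
    let authed_email = String::from("admin@example.com");

    for on_behalf_of_email in [Some("svc-user@example.com".to_string()), None] {
        let old_style = if on_behalf_of_email.is_some() {
            Some(&authed_email)
        } else {
            None
        };
        let new_style = on_behalf_of_email.as_ref().and(Some(&authed_email));
        assert_eq!(old_style, new_style);
    }
    println!("Option::and matches the if/else in both cases");
}
```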
@@ -748,20 +760,7 @@ async fn update_flow(
) -> Result<String> {
let flow_path = flow_path.to_path();
check_scopes(&authed, || format!("flows:write:{}", flow_path))?;
- guard_flow_from_debounce_data(&nf).await?;
-
- #[cfg(not(feature = "enterprise"))]
- if nf
- .value
- .get("ws_error_handler_muted")
- .map(|val| val.as_bool().unwrap_or(false))
- .is_some_and(|val| val)
- {
- return Err(Error::BadRequest(
- "Muting the error handler for certain flow is only available in enterprise version"
- .to_string(),
- ));
- }
+ validate_flow(&nf).await?;

let authed = maybe_refresh_folders(&flow_path, &w_id, authed, &db).await;
let mut tx = user_db.clone().begin(&authed).await?;
@@ -804,16 +803,12 @@
path = $11 AND workspace_id = $12",
if is_new_path { flow_path } else { &nf.path },
nf.summary,
- nf.description.unwrap_or_else(String::new),
+ nf.description.as_deref().unwrap_or(""),
nf.tag,
nf.dedicated_worker,
nf.visible_to_runner_only.unwrap_or(false),
- if nf.on_behalf_of_email.is_some() {
- Some(&authed.email)
- } else {
- None
- },
- nf.value,
+ nf.on_behalf_of_email.and(Some(&authed.email)),
+ sqlx::types::Json(&nf.value) as _,
schema_str,
authed.username,
flow_path,
@@ -902,7 +897,7 @@ async fn update_flow(
"INSERT INTO flow_version (workspace_id, path, value, schema, created_by) VALUES ($1, $2, $3, $4::text::json, $5) RETURNING id",
w_id,
nf.path,
- nf.value,
+ sqlx::types::Json(nf.value) as _,
schema_str,
&authed.username,
)
@@ -1370,8 +1365,8 @@ async fn archive_flow_by_path(
/// Validates that flow debouncing configuration is supported by all workers
/// Returns an error if debouncing is configured but workers are behind required version
async fn guard_flow_from_debounce_data(nf: &NewFlow) -> Result<()> {
+ let flow_value = nf.parse_flow_value()?;
if !*MIN_VERSION_SUPPORTS_DEBOUNCING.read().await && {
- let flow_value: FlowValue = serde_json::from_value(nf.value.clone())?;
flow_value.debounce_key.is_some() || flow_value.debounce_delay_s.is_some()
} {
tracing::warn!(
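Finally, the debouncing guard now calls `nf.parse_flow_value()?` instead of deserializing from `nf.value.clone()`, which fits the rest of the PR: with `value` held as raw JSON, the `FlowValue` has to be parsed from the raw text rather than converted from a `serde_json::Value`. A hedged sketch of what such a helper might look like; `FlowValueSketch` and `parse_flow_value` are illustrative stand-ins, not Windmill's definitions:

```rust
// Requires serde with the "derive" feature and serde_json with "raw_value".
use serde::Deserialize;
use serde_json::value::RawValue;

#[derive(Deserialize)]
struct FlowValueSketch {
    debounce_key: Option<String>,
    debounce_delay_s: Option<u64>,
}

fn parse_flow_value(raw: &RawValue) -> Result<FlowValueSketch, serde_json::Error> {
    // RawValue keeps the original JSON text; `get()` exposes it for parsing.
    serde_json::from_str(raw.get())
}

fn main() -> Result<(), serde_json::Error> {
    let raw: Box<RawValue> =
        serde_json::value::to_raw_value(&serde_json::json!({ "debounce_delay_s": 5 }))?;
    let flow = parse_flow_value(&raw)?;
    // The guard only cares whether either debounce field is set.
    assert!(flow.debounce_key.is_some() || flow.debounce_delay_s.is_some());
    Ok(())
}
```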