diff --git a/backend/.sqlx/query-08643ecaac35008b37bfd71a136ee42efb674f61bd2f3507278199773d5d6479.json b/backend/.sqlx/query-08643ecaac35008b37bfd71a136ee42efb674f61bd2f3507278199773d5d6479.json deleted file mode 100644 index 3b1b8e3825489..0000000000000 --- a/backend/.sqlx/query-08643ecaac35008b37bfd71a136ee42efb674f61bd2f3507278199773d5d6479.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT usage FROM usage\n WHERE id = $1\n AND is_workspace = FALSE\n AND month_ = EXTRACT(YEAR FROM current_date) * 12 + EXTRACT(MONTH FROM current_date)", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "usage", - "type_info": "Int4" - } - ], - "parameters": { - "Left": [ - "Text" - ] - }, - "nullable": [ - false - ] - }, - "hash": "08643ecaac35008b37bfd71a136ee42efb674f61bd2f3507278199773d5d6479" -} diff --git a/backend/.sqlx/query-23419adcd74c326d716527293eff518b42f4cdb33e034441015494bd26c172d2.json b/backend/.sqlx/query-23419adcd74c326d716527293eff518b42f4cdb33e034441015494bd26c172d2.json index 1cb68e3acbe73..242b358ff561d 100644 --- a/backend/.sqlx/query-23419adcd74c326d716527293eff518b42f4cdb33e034441015494bd26c172d2.json +++ b/backend/.sqlx/query-23419adcd74c326d716527293eff518b42f4cdb33e034441015494bd26c172d2.json @@ -39,7 +39,8 @@ "sqs", "mqtt", "gcp", - "default_email" + "default_email", + "nextcloud" ] } } diff --git a/backend/.sqlx/query-3162ec92bb32af47a71cc41172cc740b5dea1304ce4dfdb4d3d0efa4266f38c5.json b/backend/.sqlx/query-3162ec92bb32af47a71cc41172cc740b5dea1304ce4dfdb4d3d0efa4266f38c5.json index fa013ba585079..36e56c308fc0e 100644 --- a/backend/.sqlx/query-3162ec92bb32af47a71cc41172cc740b5dea1304ce4dfdb4d3d0efa4266f38c5.json +++ b/backend/.sqlx/query-3162ec92bb32af47a71cc41172cc740b5dea1304ce4dfdb4d3d0efa4266f38c5.json @@ -231,7 +231,8 @@ "postgres", "sqs", "gcp", - "mqtt" + "mqtt", + "nextcloud" ] } } diff --git a/backend/.sqlx/query-d1d9a3184f3470949840cc07414d08641527a4fe329204c65c49ab82d9ee4afb.json b/backend/.sqlx/query-3c67a015f3f49d12ef488480aed517a2c8541249234f134f530693ff02e9f92e.json similarity index 83% rename from backend/.sqlx/query-d1d9a3184f3470949840cc07414d08641527a4fe329204c65c49ab82d9ee4afb.json rename to backend/.sqlx/query-3c67a015f3f49d12ef488480aed517a2c8541249234f134f530693ff02e9f92e.json index 2d3fc8b5b716f..c2b1805cd18e9 100644 --- a/backend/.sqlx/query-d1d9a3184f3470949840cc07414d08641527a4fe329204c65c49ab82d9ee4afb.json +++ b/backend/.sqlx/query-3c67a015f3f49d12ef488480aed517a2c8541249234f134f530693ff02e9f92e.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n EXISTS(SELECT 1 FROM websocket_trigger WHERE workspace_id = $1) AS \"websocket_used!\",\n EXISTS(SELECT 1 FROM http_trigger WHERE workspace_id = $1) AS \"http_routes_used!\",\n EXISTS(SELECT 1 FROM kafka_trigger WHERE workspace_id = $1) as \"kafka_used!\",\n EXISTS(SELECT 1 FROM nats_trigger WHERE workspace_id = $1) as \"nats_used!\",\n EXISTS(SELECT 1 FROM postgres_trigger WHERE workspace_id = $1) AS \"postgres_used!\",\n EXISTS(SELECT 1 FROM mqtt_trigger WHERE workspace_id = $1) AS \"mqtt_used!\",\n EXISTS(SELECT 1 FROM sqs_trigger WHERE workspace_id = $1) AS \"sqs_used!\",\n EXISTS(SELECT 1 FROM gcp_trigger WHERE workspace_id = $1) AS \"gcp_used!\",\n EXISTS(SELECT 1 FROM email_trigger WHERE workspace_id = $1) AS \"email_used!\"\n ", + "query": "\n SELECT\n EXISTS(SELECT 1 FROM websocket_trigger WHERE workspace_id = $1) AS \"websocket_used!\",\n EXISTS(SELECT 1 FROM http_trigger WHERE workspace_id = $1) AS \"http_routes_used!\",\n EXISTS(SELECT 1 FROM kafka_trigger WHERE workspace_id = $1) as \"kafka_used!\",\n EXISTS(SELECT 1 FROM nats_trigger WHERE workspace_id = $1) as \"nats_used!\",\n EXISTS(SELECT 1 FROM postgres_trigger WHERE workspace_id = $1) AS \"postgres_used!\",\n EXISTS(SELECT 1 FROM mqtt_trigger WHERE workspace_id = $1) AS \"mqtt_used!\",\n EXISTS(SELECT 1 FROM sqs_trigger WHERE workspace_id = $1) AS \"sqs_used!\",\n EXISTS(SELECT 1 FROM gcp_trigger WHERE workspace_id = $1) AS \"gcp_used!\",\n EXISTS(SELECT 1 FROM email_trigger WHERE workspace_id = $1) AS \"email_used!\",\n EXISTS(SELECT 1 FROM native_triggers WHERE workspace_id = $1 AND service_name = 'nextcloud'::native_trigger_service) AS \"nextcloud_used!\"\n ", "describe": { "columns": [ { @@ -47,6 +47,11 @@ "ordinal": 8, "name": "email_used!", "type_info": "Bool" + }, + { + "ordinal": 9, + "name": "nextcloud_used!", + "type_info": "Bool" } ], "parameters": { @@ -63,8 +68,9 @@ null, null, null, + null, null ] }, - "hash": "d1d9a3184f3470949840cc07414d08641527a4fe329204c65c49ab82d9ee4afb" + "hash": "3c67a015f3f49d12ef488480aed517a2c8541249234f134f530693ff02e9f92e" } diff --git a/backend/.sqlx/query-42b4b73e9d60348e2d90fcade9dcad6d8995242dc20a4e14c1a8fae4fc6a9fd2.json b/backend/.sqlx/query-42b4b73e9d60348e2d90fcade9dcad6d8995242dc20a4e14c1a8fae4fc6a9fd2.json index 52a2d901ec1bf..e6d71f386f5a4 100644 --- a/backend/.sqlx/query-42b4b73e9d60348e2d90fcade9dcad6d8995242dc20a4e14c1a8fae4fc6a9fd2.json +++ b/backend/.sqlx/query-42b4b73e9d60348e2d90fcade9dcad6d8995242dc20a4e14c1a8fae4fc6a9fd2.json @@ -23,7 +23,8 @@ "sqs", "mqtt", "gcp", - "default_email" + "default_email", + "nextcloud" ] } } diff --git a/backend/.sqlx/query-4f547c0fd54f3bc57212ce87810e35adf640d44d607e62a1fb296e38ac3fdd36.json b/backend/.sqlx/query-4f547c0fd54f3bc57212ce87810e35adf640d44d607e62a1fb296e38ac3fdd36.json index cb53599789015..789c0334d729f 100644 --- a/backend/.sqlx/query-4f547c0fd54f3bc57212ce87810e35adf640d44d607e62a1fb296e38ac3fdd36.json +++ b/backend/.sqlx/query-4f547c0fd54f3bc57212ce87810e35adf640d44d607e62a1fb296e38ac3fdd36.json @@ -31,7 +31,8 @@ "sqs", "mqtt", "gcp", - "default_email" + "default_email", + "nextcloud" ] } } @@ -68,7 +69,8 @@ "sqs", "mqtt", "gcp", - "default_email" + "default_email", + "nextcloud" ] } } diff --git a/backend/.sqlx/query-5a219a2532517869578c4504ff3153c43903f929ae5d62fbba12610f89c36d55.json b/backend/.sqlx/query-5a219a2532517869578c4504ff3153c43903f929ae5d62fbba12610f89c36d55.json index 713ccb9dd35d9..36ddb8ab9fd94 100644 --- a/backend/.sqlx/query-5a219a2532517869578c4504ff3153c43903f929ae5d62fbba12610f89c36d55.json +++ b/backend/.sqlx/query-5a219a2532517869578c4504ff3153c43903f929ae5d62fbba12610f89c36d55.json @@ -15,7 +15,7 @@ ] }, "nullable": [ - null + true ] }, "hash": "5a219a2532517869578c4504ff3153c43903f929ae5d62fbba12610f89c36d55" diff --git a/backend/.sqlx/query-615d832a452a6c64de50cd0efada0be238fb16daacb3464bbcf47ca2e21bdaae.json b/backend/.sqlx/query-615d832a452a6c64de50cd0efada0be238fb16daacb3464bbcf47ca2e21bdaae.json index da8704fe30aa1..b0da0f0e69930 100644 --- a/backend/.sqlx/query-615d832a452a6c64de50cd0efada0be238fb16daacb3464bbcf47ca2e21bdaae.json +++ b/backend/.sqlx/query-615d832a452a6c64de50cd0efada0be238fb16daacb3464bbcf47ca2e21bdaae.json @@ -30,7 +30,8 @@ "sqs", "mqtt", "gcp", - "default_email" + "default_email", + "nextcloud" ] } } diff --git a/backend/.sqlx/query-7fbf72d9059fcd77e4c1112fa4fa22e4276c1da653475628889ce17dc904fbaa.json b/backend/.sqlx/query-7fbf72d9059fcd77e4c1112fa4fa22e4276c1da653475628889ce17dc904fbaa.json index 693785ebd0c43..393a920b7c7ac 100644 --- a/backend/.sqlx/query-7fbf72d9059fcd77e4c1112fa4fa22e4276c1da653475628889ce17dc904fbaa.json +++ b/backend/.sqlx/query-7fbf72d9059fcd77e4c1112fa4fa22e4276c1da653475628889ce17dc904fbaa.json @@ -26,7 +26,8 @@ "sqs", "mqtt", "gcp", - "default_email" + "default_email", + "nextcloud" ] } } diff --git a/backend/.sqlx/query-87564a196a1662f524407d853db506bf08c28efe82b68b3d44bafbd3d0e91c29.json b/backend/.sqlx/query-87564a196a1662f524407d853db506bf08c28efe82b68b3d44bafbd3d0e91c29.json index 36927a7fd779e..9350442134dd0 100644 --- a/backend/.sqlx/query-87564a196a1662f524407d853db506bf08c28efe82b68b3d44bafbd3d0e91c29.json +++ b/backend/.sqlx/query-87564a196a1662f524407d853db506bf08c28efe82b68b3d44bafbd3d0e91c29.json @@ -34,7 +34,8 @@ "sqs", "mqtt", "gcp", - "default_email" + "default_email", + "nextcloud" ] } } diff --git a/backend/.sqlx/query-9c50e3a136a8ee3ec56e083f26d3a960b89e02ec40b292f3b5198baf2a1d3dbf.json b/backend/.sqlx/query-9c50e3a136a8ee3ec56e083f26d3a960b89e02ec40b292f3b5198baf2a1d3dbf.json index 94bffa26d0e4b..9759bad4d47ac 100644 --- a/backend/.sqlx/query-9c50e3a136a8ee3ec56e083f26d3a960b89e02ec40b292f3b5198baf2a1d3dbf.json +++ b/backend/.sqlx/query-9c50e3a136a8ee3ec56e083f26d3a960b89e02ec40b292f3b5198baf2a1d3dbf.json @@ -31,7 +31,8 @@ "sqs", "mqtt", "gcp", - "default_email" + "default_email", + "nextcloud" ] } } diff --git a/backend/.sqlx/query-ac9037b8adce156b95390a0ffac04e38ab8474849e0cacb3be1443d7f3265d30.json b/backend/.sqlx/query-ac9037b8adce156b95390a0ffac04e38ab8474849e0cacb3be1443d7f3265d30.json index df39f8f868e22..7587b063d9043 100644 --- a/backend/.sqlx/query-ac9037b8adce156b95390a0ffac04e38ab8474849e0cacb3be1443d7f3265d30.json +++ b/backend/.sqlx/query-ac9037b8adce156b95390a0ffac04e38ab8474849e0cacb3be1443d7f3265d30.json @@ -30,7 +30,8 @@ "sqs", "mqtt", "gcp", - "default_email" + "default_email", + "nextcloud" ] } } diff --git a/backend/.sqlx/query-b3f0595cacba194e08b9a3e244d9e637e9e156cd85b69126c87dfff89a47711d.json b/backend/.sqlx/query-b3f0595cacba194e08b9a3e244d9e637e9e156cd85b69126c87dfff89a47711d.json index d55d9a0a43e9a..54a4e3cd935d4 100644 --- a/backend/.sqlx/query-b3f0595cacba194e08b9a3e244d9e637e9e156cd85b69126c87dfff89a47711d.json +++ b/backend/.sqlx/query-b3f0595cacba194e08b9a3e244d9e637e9e156cd85b69126c87dfff89a47711d.json @@ -23,7 +23,8 @@ "sqs", "mqtt", "gcp", - "default_email" + "default_email", + "nextcloud" ] } } diff --git a/backend/.sqlx/query-bf2163c542fb8c4e173167a8f333ef762fecf782424c5b61b89f32918b8d6971.json b/backend/.sqlx/query-bf2163c542fb8c4e173167a8f333ef762fecf782424c5b61b89f32918b8d6971.json index 46db07e1e0d00..25a24d6491292 100644 --- a/backend/.sqlx/query-bf2163c542fb8c4e173167a8f333ef762fecf782424c5b61b89f32918b8d6971.json +++ b/backend/.sqlx/query-bf2163c542fb8c4e173167a8f333ef762fecf782424c5b61b89f32918b8d6971.json @@ -118,7 +118,8 @@ "postgres", "sqs", "gcp", - "mqtt" + "mqtt", + "nextcloud" ] } } diff --git a/backend/.sqlx/query-db7b39335049f7b5fbb1ba2b99618eeeccdd4b7e14a0c0077af9d978f99ae899.json b/backend/.sqlx/query-db7b39335049f7b5fbb1ba2b99618eeeccdd4b7e14a0c0077af9d978f99ae899.json index 1b48864951515..7ba4c70a26739 100644 --- a/backend/.sqlx/query-db7b39335049f7b5fbb1ba2b99618eeeccdd4b7e14a0c0077af9d978f99ae899.json +++ b/backend/.sqlx/query-db7b39335049f7b5fbb1ba2b99618eeeccdd4b7e14a0c0077af9d978f99ae899.json @@ -24,7 +24,8 @@ "sqs", "mqtt", "gcp", - "default_email" + "default_email", + "nextcloud" ] } } diff --git a/backend/.sqlx/query-dcaf17a826e8f4cba4145abcf72bf749ad1d4381fa3b9df8b5bf534f9c13692e.json b/backend/.sqlx/query-dcaf17a826e8f4cba4145abcf72bf749ad1d4381fa3b9df8b5bf534f9c13692e.json index 95b23e62bf7e1..5f7957c57fab8 100644 --- a/backend/.sqlx/query-dcaf17a826e8f4cba4145abcf72bf749ad1d4381fa3b9df8b5bf534f9c13692e.json +++ b/backend/.sqlx/query-dcaf17a826e8f4cba4145abcf72bf749ad1d4381fa3b9df8b5bf534f9c13692e.json @@ -23,7 +23,8 @@ "sqs", "mqtt", "gcp", - "default_email" + "default_email", + "nextcloud" ] } } diff --git a/backend/.sqlx/query-eac595e19e5c8e70f1514ef29dec35c7342ac9a814c73f6290e1d6ebd3a55423.json b/backend/.sqlx/query-eac595e19e5c8e70f1514ef29dec35c7342ac9a814c73f6290e1d6ebd3a55423.json index c4ee4c1a48af9..57607ae052a78 100644 --- a/backend/.sqlx/query-eac595e19e5c8e70f1514ef29dec35c7342ac9a814c73f6290e1d6ebd3a55423.json +++ b/backend/.sqlx/query-eac595e19e5c8e70f1514ef29dec35c7342ac9a814c73f6290e1d6ebd3a55423.json @@ -23,7 +23,8 @@ "sqs", "mqtt", "gcp", - "default_email" + "default_email", + "nextcloud" ] } } diff --git a/backend/Cargo.lock b/backend/Cargo.lock index b7b69cb93fc7c..520995f76ad6a 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -15264,6 +15264,7 @@ dependencies = [ "sha2 0.10.9", "sql-builder", "sqlx", + "strum 0.27.2", "tempfile", "thiserror 2.0.17", "time", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 239555984d8fa..8faf51344d13a 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -78,6 +78,7 @@ http_trigger = ["windmill-api/http_trigger"] postgres_trigger = ["windmill-api/postgres_trigger"] mcp = ["windmill-api/mcp"] mqtt_trigger = ["windmill-api/mqtt_trigger"] +native_triggers = ["windmill-api/native_triggers"] sqs_trigger = ["windmill-api/sqs_trigger", "windmill-common/aws_auth", "windmill-api/openidconnect"] gcp_trigger = ["windmill-api/gcp_trigger"] smtp = ["windmill-api/smtp", "windmill-common/smtp", "windmill-queue/smtp"] diff --git a/backend/migrations/20251015093553_native_triggers.down.sql b/backend/migrations/20251015093553_native_triggers.down.sql new file mode 100644 index 0000000000000..4d3a4fc16e64e --- /dev/null +++ b/backend/migrations/20251015093553_native_triggers.down.sql @@ -0,0 +1,9 @@ +-- Add down migration script here + +DROP TABLE IF EXISTS native_triggers; + +DROP TYPE IF EXISTS native_trigger_service; + +DROP TYPE IF EXISTS runnable_kind; + +DROP TABLE IF EXISTS workspace_integrations; diff --git a/backend/migrations/20251015093553_native_triggers.up.sql b/backend/migrations/20251015093553_native_triggers.up.sql new file mode 100644 index 0000000000000..cdad824ec0da3 --- /dev/null +++ b/backend/migrations/20251015093553_native_triggers.up.sql @@ -0,0 +1,61 @@ +-- Add up migration script here + +CREATE TYPE native_trigger_service AS ENUM ('nextcloud'); +CREATE TYPE runnable_kind AS ENUM ('script', 'flow'); +ALTER TYPE TRIGGER_KIND ADD VALUE IF NOT EXISTS 'nextcloud'; +ALTER TYPE job_trigger_kind ADD VALUE IF NOT EXISTS 'nextcloud'; + +CREATE TABLE native_triggers ( + id BIGSERIAL PRIMARY KEY, + service_name native_trigger_service NOT NULL, + external_id VARCHAR(255) NOT NULL, + runnable_path VARCHAR(255) NOT NULL, + runnable_kind RUNNABLE_KIND NOT NULL, + event_type JSONB NOT NULL, + workspace_id VARCHAR(50) NOT NULL, + summary TEXT, + metadata JSONB NULL, + edited_by VARCHAR(50) NOT NULL, + email VARCHAR(255) NOT NULL, + edited_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT uq_native_triggers_external UNIQUE (service_name, workspace_id, external_id), + CONSTRAINT uq_native_triggers_internal UNIQUE (service_name, workspace_id, id), + CONSTRAINT fk_native_triggers_workspace FOREIGN KEY (workspace_id) + REFERENCES workspace(id) ON DELETE CASCADE +); + +CREATE INDEX idx_native_triggers_service_workspace_external + ON native_triggers (service_name, workspace_id, external_id); + +CREATE INDEX idx_native_triggers_workspace_and_id + ON native_triggers (service_name, workspace_id, id); + +GRANT ALL ON native_triggers TO windmill_user; +GRANT ALL ON native_triggers TO windmill_admin; + +ALTER TABLE native_triggers ENABLE ROW LEVEL SECURITY; + + + +CREATE TABLE workspace_integrations ( + workspace_id VARCHAR(50) NOT NULL, + service_name native_trigger_service NOT NULL, + oauth_data JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_by VARCHAR(50) NOT NULL, + PRIMARY KEY (workspace_id, service_name), + CONSTRAINT fk_workspace_integrations_workspace FOREIGN KEY (workspace_id) + REFERENCES workspace(id) ON DELETE CASCADE +); + +CREATE INDEX idx_workspace_integrations_workspace + ON workspace_integrations (workspace_id); + +CREATE INDEX idx_workspace_integrations_service + ON workspace_integrations (service_name); + +GRANT ALL ON workspace_integrations TO windmill_user; +GRANT ALL ON workspace_integrations TO windmill_admin; + +ALTER TABLE workspace_integrations ENABLE ROW LEVEL SECURITY; \ No newline at end of file diff --git a/backend/src/monitor.rs b/backend/src/monitor.rs index 9d4a8a5faee88..be3c28377f01b 100644 --- a/backend/src/monitor.rs +++ b/backend/src/monitor.rs @@ -29,6 +29,9 @@ use windmill_api::{ SCIM_TOKEN, }; +#[cfg(feature = "native_triggers")] +use windmill_api::native_triggers::sync::sync_all_triggers; + #[cfg(feature = "enterprise")] use windmill_common::ee_oss::low_disk_alerts; #[cfg(feature = "enterprise")] @@ -1693,6 +1696,33 @@ pub async fn monitor_db( update_min_version(conn).await; }; + let native_triggers_sync_f = async { + #[cfg(feature = "native_triggers")] + if server_mode && iteration.is_some() && iteration.as_ref().unwrap().should_run(20) { + if let Some(db) = conn.as_sql() { + match sync_all_triggers(db).await { + Ok(result) => { + tracing::debug!( + "Native triggers sync completed: {} workspaces, {} deleted, {} errors", + result.workspaces_processed, + result.total_deleted, + result.total_errors + ); + if result.total_errors > 0 { + tracing::warn!( + "Native triggers sync encountered {} errors", + result.total_errors + ); + } + } + Err(e) => { + tracing::error!("Error during native triggers sync: {:#}", e); + } + } + } + } + }; + join!( expired_items_f, zombie_jobs_f, @@ -1708,6 +1738,7 @@ pub async fn monitor_db( cleanup_concurrency_counters_f, cleanup_concurrency_counters_empty_keys_f, cleanup_worker_group_stats_f, + native_triggers_sync_f, ); } @@ -1922,7 +1953,6 @@ pub async fn load_base_url(conn: &Connection) -> error::Result { } else { std_base_url }; - { let mut l = BASE_URL.write().await; *l = base_url.clone(); diff --git a/backend/windmill-api/Cargo.toml b/backend/windmill-api/Cargo.toml index 76d05a7188e8a..29bcc0e9ba3e7 100644 --- a/backend/windmill-api/Cargo.toml +++ b/backend/windmill-api/Cargo.toml @@ -32,6 +32,8 @@ http_trigger = ["dep:matchit", "dep:thiserror", "dep:sha1", "dep:constant_time_e static_frontend = ["dep:rust-embed"] postgres_trigger = ["dep:rust-postgres", "dep:pg_escape", "dep:byteorder", "dep:thiserror", "dep:rust_decimal", "dep:rust-postgres-native-tls"] mqtt_trigger = ["dep:thiserror", "dep:rumqttc"] +native_trigger = ["dep:strum"] +native_triggers = ["native_trigger", "dep:backon", "dep:strum"] sqs_trigger = ["dep:aws-sdk-sqs", "dep:aws-sdk-sts", "dep:aws-sdk-sso", "dep:aws-sdk-ssooidc", "dep:thiserror", "dep:aws-config", "dep:backon"] deno_core = ["dep:deno_core", "dep:deno_error"] gcp_trigger = ["dep:thiserror", "dep:google-cloud-pubsub", "dep:google-cloud-googleapis", "dep:tonic"] @@ -155,5 +157,6 @@ tonic = { workspace = true, optional = true } deno_error = { workspace = true, optional = true } deno_core = { workspace = true, optional = true } backon = {workspace = true, optional = true} +strum = { workspace = true, optional = true } [build-dependencies] deno_core = { workspace = true, optional = true } diff --git a/backend/windmill-api/openapi.yaml b/backend/windmill-api/openapi.yaml index 591f8992a07d4..1af60c7578613 100644 --- a/backend/windmill-api/openapi.yaml +++ b/backend/windmill-api/openapi.yaml @@ -2997,6 +2997,8 @@ paths: type: boolean email_used: type: boolean + nextcloud_used: + type: boolean required: - http_routes_used - websocket_used @@ -3007,6 +3009,7 @@ paths: - gcp_used - sqs_used - email_used + - nextcloud_used /w/{workspace}/users/list: get: summary: list users @@ -10508,6 +10511,414 @@ paths: schema: type: string + /w/{workspace}/native_triggers/integrations/list: + get: + summary: list available native trigger services + operationId: listNativeTriggerServices + tags: + - workspace_integration + parameters: + - $ref: "#/components/parameters/WorkspaceId" + responses: + "200": + description: native trigger services list + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/WorkspaceIntegrations" + + /w/{workspace}/native_triggers/integrations/{service_name}/exists: + get: + summary: check if integrations for a particular service exists + operationId: checkIfNativeTriggersServiceExists + tags: + - workspace_integration + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - name: service_name + in: path + required: true + schema: + $ref: "#/components/schemas/NativeServiceName" + responses: + "200": + description: integration exists + content: + application/json: + schema: + type: boolean + + /w/{workspace}/native_triggers/integrations/{service_name}/create: + post: + summary: create native trigger service + operationId: createNativeTriggerService + tags: + - workspace_integration + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - name: service_name + in: path + required: true + schema: + $ref: "#/components/schemas/NativeServiceName" + requestBody: + description: new native trigger service + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/WorkspaceOAuthConfig" + responses: + "201": + description: native trigger service created + content: + text/plain: + schema: + type: string + + /w/{workspace}/native_triggers/integrations/{service_name}/generate_connect_url: + post: + summary: generate connect url for native trigger service + operationId: generateNativeTriggerServiceConnectUrl + tags: + - workspace_integration + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - name: service_name + in: path + required: true + schema: + $ref: "#/components/schemas/NativeServiceName" + requestBody: + description: redirect_uri + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/RedirectUri" + responses: + "200": + description: native trigger service connect url + content: + application/json: + schema: + type: string + + /w/{workspace}/native_triggers/integrations/{service_name}/delete: + delete: + summary: delete native trigger service + operationId: deleteNativeTriggerService + tags: + - workspace_integration + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - name: service_name + in: path + required: true + schema: + $ref: "#/components/schemas/NativeServiceName" + responses: + "200": + description: native trigger service deleted + content: + text/plain: + schema: + type: string + + /w/{workspace}/native_triggers/integrations/{service_name}/callback/{code}/{state}: + post: + summary: native trigger service oauth callback + operationId: nativeTriggerServiceCallback + tags: + - workspace_integration + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - name: service_name + in: path + required: true + schema: + $ref: "#/components/schemas/NativeServiceName" + - name: code + in: path + required: true + schema: + type: string + - name: state + in: path + required: true + schema: + type: string + requestBody: + description: redirect_uri + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/RedirectUri" + responses: + "200": + description: native trigger service oauth completed + content: + text/plain: + schema: + type: string + + /w/{workspace}/native_triggers/{service_name}/create: + post: + summary: create native trigger + operationId: createNativeTrigger + tags: + - native_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - name: service_name + in: path + required: true + schema: + $ref: "#/components/schemas/NativeServiceName" + requestBody: + description: new native trigger + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/NativeTriggerData" + responses: + "201": + description: native trigger created + content: + application/json: + schema: + $ref: "#/components/schemas/CreateTriggerResponse" + + /w/{workspace}/native_triggers/{service_name}/update/{id}: + post: + summary: update native trigger + operationId: updateNativeTrigger + tags: + - native_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - name: service_name + in: path + required: true + schema: + $ref: "#/components/schemas/NativeServiceName" + - name: id + in: path + required: true + schema: + type: integer + format: int64 + requestBody: + description: updated native trigger + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/NativeTriggerData" + responses: + "200": + description: native trigger updated + content: + text/plain: + schema: + type: string + + /w/{workspace}/native_triggers/{service_name}/get/{id}/{path}: + get: + summary: get native trigger + operationId: getNativeTrigger + tags: + - native_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - name: service_name + in: path + required: true + schema: + $ref: "#/components/schemas/NativeServiceName" + - name: id + in: path + required: true + schema: + type: integer + format: int64 + - $ref: "#/components/parameters/Path" + responses: + "200": + description: native trigger with external configuration + content: + application/json: + schema: + $ref: "#/components/schemas/FullTriggerResponse" + + /w/{workspace}/native_triggers/{service_name}/delete: + delete: + summary: delete native trigger + operationId: deleteNativeTrigger + tags: + - native_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - name: service_name + in: path + required: true + schema: + $ref: "#/components/schemas/NativeServiceName" + requestBody: + description: trigger id and runnable path + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/IdAndRunnablePath" + responses: + "200": + description: native trigger deleted + content: + text/plain: + schema: + type: string + + /w/{workspace}/native_triggers/{service_name}/list: + get: + summary: list native triggers + operationId: listNativeTriggers + tags: + - native_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + required: true + - name: service_name + in: path + required: true + schema: + $ref: "#/components/schemas/NativeServiceName" + - $ref: "#/components/parameters/Page" + - $ref: "#/components/parameters/PerPage" + responses: + "200": + description: native triggers list + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/NativeTrigger" + + /w/{workspace}/native_triggers/{service_name}/exists: + post: + summary: does native trigger exist + operationId: existsNativeTrigger + tags: + - native_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - name: service_name + in: path + required: true + schema: + $ref: "#/components/schemas/NativeServiceName" + requestBody: + description: trigger id and runnable path + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/IdAndRunnablePath" + responses: + "200": + description: native trigger exists + content: + application/json: + schema: + type: boolean + + /w/{workspace}/native_triggers/{service_name}/sync: + post: + summary: sync native triggers with external service + operationId: syncNativeTriggers + tags: + - native_trigger + parameters: + - $ref: "#/components/parameters/WorkspaceId" + - name: service_name + in: path + required: true + schema: + $ref: "#/components/schemas/NativeServiceName" + responses: + "200": + description: sync completed successfully + + /w/{workspace}/native_triggers/nextcloud/events: + get: + summary: list available NextCloud events + operationId: listNextCloudEvents + tags: + - native_trigger + parameters: + - name: workspace + in: path + required: true + schema: + type: string + responses: + "200": + description: list of available NextCloud events + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/NextCloudEventType" + + /native_triggers/{service_name}/w/{workspace_id}/webhook/{internal_id}: + post: + summary: receive webhook from external native trigger service + operationId: nativeTriggerWebhook + tags: + - native_trigger + parameters: + - name: service_name + in: path + required: true + schema: + $ref: "#/components/schemas/NativeServiceName" + - name: workspace_id + in: path + required: true + schema: + type: string + - name: internal_id + in: path + required: true + schema: + type: integer + format: int64 + description: The internal database ID of the trigger + requestBody: + description: webhook payload from external service + required: false + content: + application/json: + schema: + type: object + additionalProperties: true + text/plain: + schema: + type: string + responses: + "200": + description: webhook received successfully + content: + text/plain: + schema: + type: string + /w/{workspace}/mqtt_triggers/create: post: summary: create mqtt trigger @@ -19568,3 +19979,223 @@ components: kind: $ref: "#/components/schemas/AssetKind" required: [path, kind] + + NativeServiceName: + type: string + enum: + - nextcloud + + NativeTrigger: + type: object + properties: + id: + type: integer + format: int64 + service_name: + $ref: "#/components/schemas/NativeServiceName" + external_id: + type: string + workspace_id: + type: string + resource_path: + type: string + runnable_path: + type: string + runnable_kind: + $ref: "#/components/schemas/RunnableKind" + summary: + type: string + metadata: + type: object + edited_by: + type: string + email: + type: string + edited_at: + type: string + format: date-time + required: + - id + - service_name + - external_id + - workspace_id + - resource_path + - runnable_path + - runnable_kind + - summary + - edited_by + - email + - edited_at + + FullTriggerResponse: + type: object + description: Full trigger response containing both Windmill data and external service data + allOf: + - $ref: "#/components/schemas/NativeTrigger" + - type: object + properties: + external_data: + type: object + description: Configuration data from the external service + additionalProperties: true + required: + - external_data + + WorkspaceIntegrations: + type: object + properties: + service_name: + $ref: '#/components/schemas/NativeServiceName' + oauth_data: + nullable: true + $ref: '#/components/schemas/WorkspaceOAuthConfig' + required: + - service_name + + + WorkspaceOAuthConfig: + type: object + properties: + client_id: + type: string + description: The OAuth client ID for the workspace + client_secret: + type: string + description: The OAuth client secret for the workspace + base_url: + type: string + format: uri + description: The base URL of the workspace + redirect_uri: + type: string + format: uri + description: The OAuth redirect URI + required: + - client_id + - client_secret + - base_url + - redirect_uri + + EventType: + oneOf: + - $ref: '#/components/schemas/WebhookEvent' + discriminator: + propertyName: type + mapping: + webhook: '#/components/schemas/WebhookEvent' + + WebhookEvent: + type: object + properties: + type: + type: string + enum: [webhook] + request_type: + $ref: '#/components/schemas/WebhookRequestType' + required: + - type + - request_type + + WebhookRequestType: + type: string + description: The type of webhook request (define possible values here) + enum: + - async + - sync + + RedirectUri: + type: object + properties: + redirect_uri: + type: + string + required: + - redirect_uri + + + NativeTriggerData: + type: object + properties: + runnable_path: + type: string + runnable_kind: + $ref: "#/components/schemas/RunnableKind" + summary: + type: string + external_id: + type: string + event_type: + $ref: "#/components/schemas/EventType" + payload: + type: object + description: Service-specific configuration payload + additionalProperties: true + required: + - runnable_path + - runnable_kind + - resource_path + - external_id + - payload + - event_type + + CreateTriggerResponse: + type: object + properties: + id: + type: integer + format: int64 + description: The internal database ID of the created trigger + required: + - id + + IdAndRunnablePath: + type: object + properties: + id: + type: integer + format: int64 + runnable_path: + type: string + required: + - id + - runnable_path + + SyncResult: + type: object + properties: + already_in_sync: + type: boolean + added_count: + type: integer + added_triggers: + type: array + items: + type: string + total_external: + type: integer + total_windmill: + type: integer + required: + - already_in_sync + - added_count + - added_triggers + - total_external + - total_windmill + + NextCloudEventType: + type: object + properties: + id: + type: string + name: + type: string + description: + type: string + category: + type: string + path: + type: string + required: + - id + - name + - path \ No newline at end of file diff --git a/backend/windmill-api/src/jobs.rs b/backend/windmill-api/src/jobs.rs index 0ff66f67d46b7..6bbaf428ece37 100644 --- a/backend/windmill-api/src/jobs.rs +++ b/backend/windmill-api/src/jobs.rs @@ -1760,6 +1760,8 @@ pub struct RunJobQuery { pub skip_preprocessor: Option, pub poll_delay_ms: Option, pub memory_id: Option, + pub internal_id: Option, + pub service_name: Option, } impl RunJobQuery { @@ -4000,15 +4002,17 @@ pub async fn run_flow_by_path( Query(run_query): Query, args: RawWebhookArgs, ) -> error::Result<(StatusCode, String)> { - let args = args - .to_args_from_runnable( - &authed, - &db, - &w_id, - RunnableId::from_flow_path(flow_path.to_path()), - run_query.skip_preprocessor, - ) - .await?; + let args = generate_args_runnable( + &db, + &authed, + user_db.clone(), + flow_path.to_path(), + &run_query, + &w_id, + args, + true, + ) + .await?; let (uuid, _) = run_flow_by_path_inner(authed, db, user_db, w_id, flow_path, run_query, args).await?; @@ -4233,20 +4237,106 @@ pub async fn run_script_by_path( Query(run_query): Query, args: RawWebhookArgs, ) -> error::Result<(StatusCode, String)> { + let args = generate_args_runnable( + &db, + &authed, + user_db.clone(), + script_path.to_path(), + &run_query, + &w_id, + args, + false, + ) + .await?; + + let (uuid, _) = + run_script_by_path_inner(authed, db, user_db, w_id, script_path, run_query, args).await?; + + Ok((StatusCode::CREATED, uuid.to_string())) +} + +#[allow(unused)] +pub async fn generate_args_runnable( + db: &DB, + authed: &ApiAuthed, + user_db: UserDB, + runnable_path: &str, + run_query: &RunJobQuery, + w_id: &str, + args: RawWebhookArgs, + is_flow: bool, +) -> error::Result { + #[cfg(feature = "native_triggers")] + if let (Some(service_name), Some(internal_id)) = + (&run_query.service_name, &run_query.internal_id) + { + use crate::native_triggers::{handler::get_native_trigger, ServiceName}; + + let service_name = ServiceName::try_from(service_name.to_owned())?; + let id = internal_id + .parse::() + .map_err(|_| Error::BadRequest("Invalid native trigger id".to_string()))?; + let mut tx = user_db.begin(authed).await?; + let native_trigger = get_native_trigger(&mut *tx, &w_id, id, service_name).await?; + tx.commit().await?; + if native_trigger.runnable_path != runnable_path { + return Err(Error::BadRequest(format!( + "Expected to run runnable: {} not: {}", + &native_trigger.runnable_path, runnable_path + ))); + } + + let args = match service_name { + ServiceName::Nextcloud => { + use crate::{ + native_triggers::{ + decrypt_oauth_data, nextcloud::NextCloud, nextcloud::NextCloudOAuthData, + }, + triggers::trigger_helpers::TriggerJobArgs, + }; + + let mut webhook_payload = args.process_args(authed, db, w_id, None).await?; + + let oauth_data: NextCloudOAuthData = + decrypt_oauth_data(db, &db, &w_id, service_name).await?; + + //Uncomment once nextcloud integrate the ephemeral token in the webhook payload + /*webhook_payload + .metadata + .headers + .insert("base_url".to_string(), to_raw_value(&oauth_data.base_url)); + webhook_payload.metadata.headers.insert( + "access_token".to_string(), + to_raw_value(&oauth_data.access_token), + );*/ + + let args = NextCloud::build_job_args( + runnable_path, + is_flow, + w_id, + db, + to_raw_value(&webhook_payload.body), + webhook_payload.metadata.headers, + ) + .await?; + args + } + }; + + return Ok(args); + } + let args = args .to_args_from_runnable( &authed, &db, &w_id, - RunnableId::from_script_path(script_path.to_path()), + RunnableId::from_script_path(runnable_path), run_query.skip_preprocessor, ) .await?; - let (uuid, _) = - run_script_by_path_inner(authed, db, user_db, w_id, script_path, run_query, args).await?; - - Ok((StatusCode::CREATED, uuid.to_string())) + Ok(args) } pub async fn run_script_by_path_inner( diff --git a/backend/windmill-api/src/lib.rs b/backend/windmill-api/src/lib.rs index 587c5665220e1..f8725321cc550 100644 --- a/backend/windmill-api/src/lib.rs +++ b/backend/windmill-api/src/lib.rs @@ -151,6 +151,8 @@ mod smtp_server_oss; pub mod teams_approvals_ee; mod teams_approvals_oss; +#[cfg(feature = "native_triggers")] +pub mod native_triggers; mod static_assets; #[cfg(all(feature = "stripe", feature = "enterprise", feature = "private"))] pub mod stripe_ee; @@ -455,6 +457,18 @@ pub async fn run_server( .nest("/job_metrics", job_metrics::workspaced_service()) .nest("/job_helpers", job_helpers_service) .nest("/jobs", jobs::workspaced_service()) + .nest("/native_triggers", { + #[cfg(feature = "native_triggers")] + { + native_triggers::handler::generate_native_trigger_routers().merge( + native_triggers::workspace_integrations::workspaced_service(), + ) + } + #[cfg(not(feature = "native_triggers"))] + { + axum::Router::new() + } + }) .nest("/oauth", { #[cfg(feature = "oauth2")] { diff --git a/backend/windmill-api/src/native_triggers/handler.rs b/backend/windmill-api/src/native_triggers/handler.rs new file mode 100644 index 0000000000000..aeb9e4d7bbf71 --- /dev/null +++ b/backend/windmill-api/src/native_triggers/handler.rs @@ -0,0 +1,505 @@ +use crate::{ + db::ApiAuthed, + native_triggers::{ + delete_native_trigger, get_workspace_integration, list_native_triggers, + store_native_trigger, update_native_trigger, EventType, External, NativeTrigger, + NativeTriggerData, ServiceName, + }, + users::{create_token_internal, NewToken}, + utils::check_scopes, +}; +use axum::{ + extract::{Path, Query}, + routing::{delete, get, post}, + Extension, Json, Router, +}; +use serde::{Deserialize, Serialize}; +use sqlx::{PgConnection, Postgres}; +use std::sync::Arc; +use windmill_audit::{audit_oss::audit_log, ActionKind}; +use windmill_common::{ + db::UserDB, + error::{Error, JsonResult, Result}, + utils::{rd_string, RunnableKind, StripPath}, + DB, +}; + +#[derive(Debug, Deserialize)] +pub struct ListQuery { + pub page: Option, + pub per_page: Option, +} + +#[derive(Debug, Serialize)] +pub struct FullTriggerResponse { + #[serde(flatten)] + pub windmill_data: NativeTrigger, + pub external_data: T, +} + +#[derive(Debug, Serialize)] +pub struct CreateTriggerResponse { + pub id: i64, +} + +async fn new_webhook_token( + tx: &mut PgConnection, + db: &DB, + authed: &ApiAuthed, + runnable_path: &str, + runnable_kind: RunnableKind, + workspace_id: &str, + service_name: ServiceName, +) -> Result { + let kind = if runnable_kind == RunnableKind::Script { + "scripts" + } else { + "flows" + }; + + let scopes = vec![format!("jobs:run:{kind}:{runnable_path}")]; + let label = format!( + "native-triggers-webhook-{}-{}", + service_name.as_str(), + rd_string(5) + ); + let token_config = NewToken::new( + Some(label), + None, + None, + Some(scopes), + Some(workspace_id.to_owned()), + ); + let token = create_token_internal(&mut *tx, &db, &authed, token_config).await?; + + Ok(token) +} + +async fn create_native_trigger( + Extension(handler): Extension>, + Extension(service_name): Extension, + authed: ApiAuthed, + Extension(db): Extension, + Extension(user_db): Extension, + Path(workspace_id): Path, + Json(mut data): Json>, +) -> JsonResult { + check_scopes(&authed, || { + format!("native_triggers:write:{}", &data.runnable_path) + })?; + + let _ = handler.validate_data_config(&data); + let mut tx = user_db.begin(&authed).await?; + + let EventType::Webhook(webhook) = &mut data.event_type; + + webhook.token = new_webhook_token( + &mut *tx, + &db, + &authed, + &data.runnable_path, + data.runnable_kind, + &workspace_id, + service_name, + ) + .await?; + + let integration = get_workspace_integration(&mut *tx, &workspace_id, service_name).await?; + + let oauth_data: T::OAuthData = serde_json::from_value(integration.oauth_data).map_err(|e| { + Error::InternalErr(format!( + "Failed to parse {} OAuth data: {}", + T::DISPLAY_NAME, + e + )) + })?; + + let trigger_id = + store_native_trigger(&mut *tx, &authed, &workspace_id, service_name, &data).await?; + + let resp = handler + .create(&workspace_id, trigger_id, &oauth_data, &data, &db, &mut tx) + .await?; + + let (external_id, _) = handler.external_id_and_metadata_from_response(&resp); + + sqlx::query!( + r#" + UPDATE + native_triggers + SET + external_id = $1 + WHERE + workspace_id = $2 + AND id = $3 + AND service_name = $4 + "#, + external_id, + &workspace_id, + trigger_id, + service_name as ServiceName + ) + .execute(&mut *tx) + .await?; + + audit_log( + &mut *tx, + &authed, + &format!("native_triggers.{}.create", service_name), + ActionKind::Create, + &workspace_id, + None, + None, + ) + .await?; + + tx.commit().await?; + + Ok(Json(CreateTriggerResponse { id: trigger_id })) +} + +async fn update_native_trigger_handler( + Extension(handler): Extension>, + Extension(service_name): Extension, + authed: ApiAuthed, + Extension(db): Extension, + Extension(user_db): Extension, + Path((workspace_id, id)): Path<(String, i64)>, + Json(mut data): Json>, +) -> Result { + check_scopes(&authed, || { + format!("native_triggers:write:{}", &data.runnable_path) + })?; + let _ = handler.validate_data_config(&data); + + let mut tx = user_db.begin(&authed).await?; + + let EventType::Webhook(webhook) = &mut data.event_type; + + let existing = get_native_trigger(&mut *tx, &workspace_id, id, service_name).await?; + + let EventType::Webhook(exist_webhook_token) = existing.event_type.0; + + webhook.token = exist_webhook_token.token; + + let integration = get_workspace_integration(&mut *tx, &workspace_id, service_name).await?; + + let oauth_data: T::OAuthData = serde_json::from_value(integration.oauth_data).map_err(|e| { + Error::InternalErr(format!( + "Failed to parse {} OAuth data: {}", + T::DISPLAY_NAME, + e + )) + })?; + + handler + .update( + &workspace_id, + id, + &oauth_data, + &existing.external_id, + &data, + &db, + &mut tx, + ) + .await?; + + update_native_trigger(&mut *tx, &authed, &workspace_id, id, service_name, &data).await?; + + audit_log( + &mut *tx, + &authed, + &format!("native_triggers.{}.update", service_name), + ActionKind::Update, + &workspace_id, + None, + None, + ) + .await?; + + tx.commit().await?; + + Ok(format!("Native trigger updated")) +} + +async fn get_native_trigger_handler( + Extension(handler): Extension>, + Extension(service_name): Extension, + authed: ApiAuthed, + Extension(db): Extension, + Extension(user_db): Extension, + Path((workspace_id, id, runnable_path)): Path<(String, i64, StripPath)>, +) -> JsonResult> { + let runnable_path = runnable_path.to_path(); + + check_scopes(&authed, || { + format!("native_triggers:read:{}", runnable_path) + })?; + + let mut tx = user_db.begin(&authed).await?; + + let windmill_trigger = get_native_trigger(&mut *tx, &workspace_id, id, service_name).await?; + + let integration = get_workspace_integration(&mut *tx, &workspace_id, service_name).await?; + + let oauth_data: T::OAuthData = serde_json::from_value(integration.oauth_data).map_err(|e| { + Error::InternalErr(format!( + "Failed to parse {} OAuth data: {}", + T::DISPLAY_NAME, + e + )) + })?; + + let native_trigger = handler + .get( + &workspace_id, + &oauth_data, + &windmill_trigger.external_id, + &db, + &mut tx, + ) + .await; + let native_trigger_config = match native_trigger { + Ok(native_cfg) => native_cfg, + Err(Error::NotFound(_)) => { + tracing::warn!( + "Native trigger no longer exists on external service {}, auto-deleting from database", + service_name + ); + let deleted = delete_native_trigger(&mut *tx, &workspace_id, id, service_name).await?; + + if deleted { + audit_log( + &mut *tx, + &authed, + &format!("native_triggers.{}.auto_delete", service_name), + ActionKind::Delete, + &workspace_id, + Some(&format!("Auto-deleted trigger {} (external_id: {}) because it no longer exists on external service", + windmill_trigger.runnable_path, windmill_trigger.external_id)), + None, + ) + .await?; + + tracing::info!( + "Auto-deleted native trigger {} from database (external_id: {})", + windmill_trigger.runnable_path, + windmill_trigger.external_id + ); + } + + return Err(Error::NotFound(format!( + "Trigger '{}' (external_id: {}) no longer exists on external service {} and has been automatically deleted", + windmill_trigger.runnable_path, + windmill_trigger.external_id, + service_name + ))); + } + Err(e) => return Err(e), + }; + + let full_resp = Json(FullTriggerResponse { + windmill_data: windmill_trigger, + external_data: native_trigger_config, + }); + + Ok(full_resp) +} + +#[derive(Debug, Deserialize, Serialize)] +struct IdAndRunnablePath { + id: i64, + runnable_path: String, +} + +async fn delete_native_trigger_handler( + Extension(handler): Extension>, + Extension(service_name): Extension, + authed: ApiAuthed, + Extension(db): Extension, + Extension(user_db): Extension, + Path(workspace_id): Path, + Json(IdAndRunnablePath { id, runnable_path }): Json, +) -> Result { + check_scopes(&authed, || { + format!("native_triggers:write:{}", runnable_path) + })?; + + let mut tx = user_db.begin(&authed).await?; + + let existing = get_native_trigger(&mut *tx, &workspace_id, id, service_name).await?; + + let integration = get_workspace_integration(&mut *tx, &workspace_id, service_name).await?; + + let oauth_data: T::OAuthData = serde_json::from_value(integration.oauth_data).map_err(|e| { + Error::InternalErr(format!( + "Failed to parse {} OAuth data: {}", + T::DISPLAY_NAME, + e + )) + })?; + + handler + .delete( + &workspace_id, + &oauth_data, + &existing.external_id, + &db, + &mut tx, + ) + .await?; + + let deleted = delete_native_trigger(&mut *tx, &workspace_id, id, service_name).await?; + + if !deleted { + return Err(Error::NotFound(format!("Native trigger not found",))); + } + + audit_log( + &mut *tx, + &authed, + &format!("native_triggers.{}.delete", service_name), + ActionKind::Delete, + &workspace_id, + None, + None, + ) + .await?; + + tx.commit().await?; + + Ok(format!("Native trigger deleted")) +} + +async fn exists_native_trigger_handler( + Extension(service_name): Extension, + authed: ApiAuthed, + Extension(db): Extension, + Path(workspace_id): Path, + Json(IdAndRunnablePath { id, runnable_path }): Json, +) -> JsonResult { + check_scopes(&authed, || { + format!("native_triggers:read:{}", &runnable_path) + })?; + + let exists = sqlx::query_scalar!( + r#" + SELECT EXISTS( + SELECT + 1 + FROM + native_triggers + WHERE + workspace_id = $1 AND + id = $2 AND + service_name = $3 AND + runnable_path = $4 + ) + "#, + workspace_id, + id, + service_name as ServiceName, + runnable_path + ) + .fetch_one(&db) + .await? + .unwrap_or(false); + + Ok(Json(exists)) +} + +async fn list_native_triggers_handler( + Extension(service_name): Extension, + authed: ApiAuthed, + Extension(user_db): Extension, + Path(workspace_id): Path, + Query(query): Query, +) -> JsonResult> { + let mut tx = user_db.begin(&authed).await?; + let triggers = list_native_triggers( + &mut *tx, + &workspace_id, + service_name, + query.page, + query.per_page, + ) + .await?; + tx.commit().await?; + Ok(Json(triggers)) +} + +pub async fn get_native_trigger<'c, E: sqlx::Executor<'c, Database = Postgres>>( + db: E, + workspace_id: &str, + id: i64, + service_name: ServiceName, +) -> Result { + let native_trigger = sqlx::query_as!( + NativeTrigger, + r#" + SELECT + id, + runnable_path, + event_type AS "event_type!: sqlx::types::Json", + runnable_kind AS "runnable_kind!: RunnableKind", + service_name as "service_name!: ServiceName", + external_id, + workspace_id, + summary, + metadata, + edited_by, + email, + edited_at + FROM + native_triggers + WHERE + workspace_id = $1 + AND id = $2 + AND service_name = $3 + "#, + workspace_id, + id, + service_name as ServiceName + ) + .fetch_optional(db) + .await? + .ok_or_else(|| Error::NotFound(format!("Native trigger not found at path"))); + + native_trigger +} + +pub fn service_routes(handler: T) -> Router { + let additional_routes = handler.additional_routes(); + let service_name = T::SERVICE_NAME; + + let handler_arc = Arc::new(handler); + + let standard_routes = Router::new() + .route("/create", post(create_native_trigger::)) + .route("/list", get(list_native_triggers_handler::)) + .route("/get/:id/*path", get(get_native_trigger_handler::)) + .route("/update/:id", post(update_native_trigger_handler::)) + .route("/delete", delete(delete_native_trigger_handler::)) + .route("/exists", post(exists_native_trigger_handler::)); + + standard_routes + .merge(additional_routes) + .layer(Extension(handler_arc)) + .layer(Extension(service_name)) +} + +pub fn generate_native_trigger_routers() -> Router { + let router = Router::new(); + + #[cfg(feature = "native_triggers")] + { + use crate::native_triggers::nextcloud::NextCloud; + + return router.nest("/nextcloud", service_routes(NextCloud)); + } + + #[cfg(not(feature = "native_triggers"))] + { + router + } +} diff --git a/backend/windmill-api/src/native_triggers/mod.rs b/backend/windmill-api/src/native_triggers/mod.rs new file mode 100644 index 0000000000000..c370dfcdf6734 --- /dev/null +++ b/backend/windmill-api/src/native_triggers/mod.rs @@ -0,0 +1,845 @@ +use crate::db::ApiAuthed; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use http::StatusCode; +use itertools::Itertools; +use reqwest::{Client, Method}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde_json::json; +use sqlx::{FromRow, PgConnection, Postgres}; +use std::{collections::HashMap, fmt::Debug}; +use strum::{EnumIter, IntoEnumIterator}; +use tokio::task; +use windmill_common::{ + error::{to_anyhow, Error, Result}, + triggers::TriggerKind, + utils::RunnableKind, + variables::{build_crypt, decrypt, encrypt}, + DB, +}; +use windmill_queue::PushArgsOwned; +pub mod handler; +pub mod sync; +pub mod workspace_integrations; + +#[cfg(feature = "native_triggers")] +pub mod nextcloud; + +#[derive(EnumIter, sqlx::Type, Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[sqlx(type_name = "native_trigger_service", rename_all = "lowercase")] +#[serde(rename_all = "lowercase")] + +pub enum ServiceName { + Nextcloud, +} + +impl TryFrom for ServiceName { + type Error = Error; + fn try_from(value: String) -> std::result::Result { + let service = match value.as_str() { + "nextcloud" => ServiceName::Nextcloud, + _ => { + return Err(anyhow::anyhow!( + "Unknown service, currently supported services are: [{}]", + ServiceName::iter().join(",") + ) + .into()) + } + }; + + Ok(service) + } +} + +impl ServiceName { + pub fn as_str(&self) -> &'static str { + match self { + ServiceName::Nextcloud => "nextcloud", + } + } + pub fn as_trigger_kind(&self) -> TriggerKind { + match self { + ServiceName::Nextcloud => TriggerKind::Nextcloud, + } + } +} + +impl std::fmt::Display for ServiceName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let service_name = match self { + ServiceName::Nextcloud => "nextcloud", + }; + + write!(f, "{}", service_name) + } +} + +#[derive(Debug, Clone, FromRow, Serialize, Deserialize)] +pub struct NativeTrigger { + pub id: i64, + pub service_name: ServiceName, + pub external_id: String, + pub workspace_id: String, + pub runnable_path: String, + pub runnable_kind: RunnableKind, + pub event_type: sqlx::types::Json, + pub summary: Option, + pub metadata: Option, + pub edited_by: String, + pub email: String, + pub edited_at: DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum WebhookRequestType { + Async, + Sync, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WebhookConfig { + pub request_type: WebhookRequestType, + #[serde(default)] + pub token: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "lowercase")] +pub enum EventType { + Webhook(WebhookConfig), +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct NativeTriggerData

{ + pub external_id: String, + pub runnable_path: String, + pub runnable_kind: RunnableKind, + pub summary: Option, + pub event_type: EventType, + pub payload: P, +} + +#[derive(Debug, Clone, FromRow, Serialize, Deserialize)] +pub struct WorkspaceIntegration { + pub workspace_id: String, + pub service_name: ServiceName, + pub oauth_data: serde_json::Value, + pub created_at: DateTime, + pub updated_at: DateTime, + pub created_by: String, +} + +#[async_trait] +pub trait External: Send + Sync + 'static { + type Payload: Debug + DeserializeOwned + Serialize + Send + Sync; + type TriggerData: Debug + Serialize + Send + Sync; + type OAuthData: DeserializeOwned + Serialize + Clone + Send + Sync; + type CreateResponse: DeserializeOwned + Send + Sync; + + const SUPPORT_WEBHOOK: bool; + const SERVICE_NAME: ServiceName; + const DISPLAY_NAME: &'static str; + const TOKEN_ENDPOINT: &'static str; + const REFRESH_ENDPOINT: &'static str; + + async fn create( + &self, + w_id: &str, + internal_id: i64, + oauth_data: &Self::OAuthData, + data: &NativeTriggerData, + db: &DB, + tx: &mut PgConnection, + ) -> Result; + + async fn update( + &self, + w_id: &str, + internal_id: i64, + oauth_data: &Self::OAuthData, + external_id: &str, + data: &NativeTriggerData, + db: &DB, + tx: &mut PgConnection, + ) -> Result<()>; + + async fn get( + &self, + w_id: &str, + oauth_data: &Self::OAuthData, + external_id: &str, + db: &DB, + tx: &mut PgConnection, + ) -> Result; + + async fn delete( + &self, + w_id: &str, + oauth_data: &Self::OAuthData, + external_id: &str, + db: &DB, + tx: &mut PgConnection, + ) -> Result<()>; + + #[allow(unused)] + async fn exists( + &self, + w_id: &str, + oauth_data: &Self::OAuthData, + external_id: &str, + db: &DB, + tx: &mut PgConnection, + ) -> Result; + + async fn list_all( + &self, + w_id: &str, + oauth_data: &Self::OAuthData, + db: &DB, + tx: &mut PgConnection, + ) -> Result>; + + async fn validate_data_config(&self, _data: &NativeTriggerData) -> Result<()> { + Ok(()) + } + + async fn prepare_webhook( + &self, + _db: &DB, + _w_id: &str, + _header: HashMap, + _body: String, + _runnable_path: &str, + _is_flow: bool, + ) -> Result { + Ok(PushArgsOwned { extra: None, args: HashMap::new() }) + } + + fn external_id_and_metadata_from_response( + &self, + resp: &Self::CreateResponse, + ) -> (String, Option); + + fn get_external_id_from_trigger_data(&self, data: &Self::TriggerData) -> String; + + fn additional_routes(&self) -> axum::Router { + axum::Router::new() + } + + async fn http_client_request( + &self, + url: &str, + method: Method, + workspace_id: &str, + tx: &mut PgConnection, + db: &DB, + headers: Option>, + body: Option<&B>, + ) -> Result { + let oauth_config: OAuthConfig = + decrypt_oauth_data(tx, db, workspace_id, Self::SERVICE_NAME).await?; + + let result = make_http_request( + url, + method.clone(), + headers.clone(), + body.as_ref(), + &oauth_config.access_token, + ) + .await; + + match result { + Ok(response) => Ok(response), + Err(err) + if err.status() == Some(StatusCode::UNAUTHORIZED) + || err.status() == Some(StatusCode::FORBIDDEN) => + { + tracing::info!( + "HTTP auth error ({}), attempting token refresh", + err.status().unwrap() + ); + + let refreshed_oauth_config = + refresh_oauth_tokens(&oauth_config, Self::REFRESH_ENDPOINT).await?; + + task::spawn({ + let db_clone = db.clone(); + let workspace_id_clone = workspace_id.to_string(); + let refreshed_json = oauth_config_to_json(&refreshed_oauth_config); + async move { + update_workspace_integration_tokens_helper( + db_clone, + workspace_id_clone, + Self::SERVICE_NAME, + refreshed_json, + ) + .await; + } + }); + + let response = make_http_request( + url, + method, + headers, + body.as_ref(), + &refreshed_oauth_config.access_token, + ) + .await + .map_err(to_anyhow)?; + Ok(response) + } + Err(e) => Err(to_anyhow(e).into()), + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct OAuthConfig { + pub base_url: String, + pub access_token: String, + pub refresh_token: Option, + pub client_id: String, + pub client_secret: String, +} + +pub async fn make_http_request( + url: &str, + method: Method, + headers: Option>, + body: Option<&B>, + access_token: &str, +) -> std::result::Result { + let client = Client::new(); + let mut request = client.request(method, url); + + request = request + .header("Accept", "application/json") + .header("Authorization", format!("Bearer {}", access_token)); + + if body.is_some() { + request = request.header("Content-Type", "application/json"); + } + + if let Some(custom_headers) = headers { + for (key, value) in custom_headers { + request = request.header(key, value); + } + } + + if let Some(body_content) = body { + request = request.json(body_content); + } + + let response = request.send().await?.error_for_status()?; + + let response_json = response.json().await?; + + Ok(response_json) +} + +pub async fn decrypt_oauth_data< + 'c, + E: sqlx::Executor<'c, Database = Postgres>, + T: DeserializeOwned, +>( + tx: E, + db: &DB, + workspace_id: &str, + service_name: ServiceName, +) -> Result { + let integration = get_workspace_integration(tx, workspace_id, service_name).await?; + + let mc = build_crypt(db, workspace_id).await?; + let mut oauth_data: serde_json::Value = integration.oauth_data; + + if let Some(encrypted_access_token) = oauth_data.get("access_token").and_then(|v| v.as_str()) { + let decrypted_access_token = decrypt(&mc, encrypted_access_token.to_string()) + .map_err(|e| Error::InternalErr(format!("Failed to decrypt access token: {}", e)))?; + oauth_data["access_token"] = serde_json::Value::String(decrypted_access_token); + } + + if let Some(encrypted_refresh_token) = oauth_data.get("refresh_token").and_then(|v| v.as_str()) + { + let decrypted_refresh_token = decrypt(&mc, encrypted_refresh_token.to_string()) + .map_err(|e| Error::InternalErr(format!("Failed to decrypt refresh token: {}", e)))?; + oauth_data["refresh_token"] = serde_json::Value::String(decrypted_refresh_token); + } + + serde_json::from_value(oauth_data) + .map_err(|e| Error::InternalErr(format!("Failed to deserialize OAuth data: {}", e))) +} + +#[allow(unused)] +pub fn oauth_data_to_config(oauth_data: &serde_json::Value) -> Result { + let base_url = oauth_data + .get("base_url") + .and_then(|v| v.as_str()) + .ok_or_else(|| Error::InternalErr("No base_url in OAuth data".to_string()))? + .to_string(); + + let access_token = oauth_data + .get("access_token") + .and_then(|v| v.as_str()) + .ok_or_else(|| Error::InternalErr("No access_token in OAuth data".to_string()))? + .to_string(); + + let refresh_token = oauth_data + .get("refresh_token") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + let client_id = oauth_data + .get("client_id") + .and_then(|v| v.as_str()) + .ok_or_else(|| Error::InternalErr("No client_id in OAuth data".to_string()))? + .to_string(); + + let client_secret = oauth_data + .get("client_secret") + .and_then(|v| v.as_str()) + .ok_or_else(|| Error::InternalErr("No client_secret in OAuth data".to_string()))? + .to_string(); + + Ok(OAuthConfig { base_url, access_token, refresh_token, client_id, client_secret }) +} + +#[inline] +pub fn oauth_config_to_json(config: &OAuthConfig) -> serde_json::Value { + let mut json = json!({ + "base_url": config.base_url, + "access_token": config.access_token, + "client_id": config.client_id, + "client_secret": config.client_secret, + }); + + if let Some(refresh_token) = &config.refresh_token { + json["refresh_token"] = serde_json::Value::String(refresh_token.clone()); + } + + json +} + +pub async fn refresh_oauth_tokens( + oauth_config: &OAuthConfig, + refresh_endpoint: &str, +) -> Result { + let refresh_token = oauth_config + .refresh_token + .as_ref() + .ok_or_else(|| Error::InternalErr("No refresh token available".to_string()))?; + + let client = Client::new(); + + let params = [ + ("grant_type", "refresh_token"), + ("client_id", &oauth_config.client_id), + ("client_secret", &oauth_config.client_secret), + ("refresh_token", refresh_token), + ]; + + let response = client + .post(format!("{}{}", oauth_config.base_url, refresh_endpoint)) + .form(¶ms) + .send() + .await + .map_err(to_anyhow)? + .error_for_status() + .map_err(to_anyhow)?; + + let token_data: serde_json::Value = response + .json() + .await + .map_err(|e| Error::InternalErr(format!("Failed to parse token response: {}", e)))?; + + let new_access_token = token_data["access_token"] + .as_str() + .ok_or_else(|| Error::InternalErr("No access_token in refresh response".to_string()))? + .to_string(); + + let new_refresh_token = token_data + .get("refresh_token") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .or_else(|| oauth_config.refresh_token.clone()); + + Ok(OAuthConfig { + base_url: oauth_config.base_url.clone(), + access_token: new_access_token, + refresh_token: new_refresh_token, + client_id: oauth_config.client_id.clone(), + client_secret: oauth_config.client_secret.clone(), + }) +} + +async fn update_workspace_integration_tokens_helper( + db: DB, + workspace_id: String, + service_name: ServiceName, + oauth_data: serde_json::Value, +) { + let result = async { + let mut tx = db.begin().await?; + let mc = build_crypt(&db, &workspace_id).await?; + let mut encrypted_oauth_data = oauth_data; + + if let Some(access_token) = encrypted_oauth_data + .get("access_token") + .and_then(|v| v.as_str()) + { + let encrypted_access_token = encrypt(&mc, access_token); + encrypted_oauth_data["access_token"] = + serde_json::Value::String(encrypted_access_token); + } + + if let Some(refresh_token) = encrypted_oauth_data + .get("refresh_token") + .and_then(|v| v.as_str()) + { + let encrypted_refresh_token = encrypt(&mc, refresh_token); + encrypted_oauth_data["refresh_token"] = + serde_json::Value::String(encrypted_refresh_token); + } + + sqlx::query!( + r#" + UPDATE workspace_integrations + SET oauth_data = $1, updated_at = now() + WHERE workspace_id = $2 AND service_name = $3 + "#, + encrypted_oauth_data, + workspace_id, + service_name as ServiceName, + ) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok::<(), Error>(()) + } + .await; + + if let Err(e) = result { + tracing::error!("Critical error: Failed to update workspace integration tokens for {} in workspace {}: {}", + service_name, workspace_id, e); + } +} + +pub async fn store_native_trigger<'c, E: sqlx::Executor<'c, Database = Postgres>, P>( + db: E, + authed: &ApiAuthed, + workspace_id: &str, + service_name: ServiceName, + native_trigger_data: &NativeTriggerData

, +) -> Result { + let row = sqlx::query!( + r#" + INSERT INTO native_triggers ( + service_name, + event_type, + external_id, + runnable_path, + runnable_kind, + workspace_id, + summary, + metadata, + edited_by, + email, + edited_at + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, now() + ) + RETURNING id + "#, + service_name as ServiceName, + serde_json::to_value(native_trigger_data.event_type.clone()).unwrap(), + &native_trigger_data.external_id, + &native_trigger_data.runnable_path, + native_trigger_data.runnable_kind as RunnableKind, + workspace_id, + native_trigger_data.summary.as_ref(), + Some(serde_json::Value::Null), + authed.username, + authed.email, + ) + .fetch_one(db) + .await?; + + Ok(row.id) +} + +pub async fn update_native_trigger<'c, E: sqlx::Executor<'c, Database = Postgres>, P>( + db: E, + authed: &ApiAuthed, + workspace_id: &str, + id: i64, + service_name: ServiceName, + native_trigger_data: &NativeTriggerData

, +) -> Result<()> { + sqlx::query!( + r#" + UPDATE + native_triggers + SET + runnable_path = $1, + runnable_kind = $2, + summary = $3, + metadata = $4, + edited_by = $5, + email = $6, + edited_at = now() + WHERE + workspace_id = $7 + AND id = $8 + AND service_name = $9 + "#, + native_trigger_data.runnable_path, + native_trigger_data.runnable_kind as RunnableKind, + native_trigger_data.summary, + Some(serde_json::Value::Null), + authed.username, + authed.email, + workspace_id, + id, + service_name as ServiceName + ) + .execute(db) + .await?; + + Ok(()) +} + +pub async fn delete_native_trigger<'c, E: sqlx::Executor<'c, Database = Postgres>>( + db: E, + workspace_id: &str, + id: i64, + service_name: ServiceName, +) -> Result { + let deleted = sqlx::query!( + r#" + DELETE + FROM native_triggers + WHERE + workspace_id = $1 + AND id = $2 + AND service_name = $3 + "#, + workspace_id, + id, + service_name as ServiceName + ) + .execute(db) + .await? + .rows_affected(); + + Ok(deleted > 0) +} +#[allow(unused)] +pub async fn get_native_trigger_by_external_id( + tx: &mut PgConnection, + workspace_id: &str, + service_name: ServiceName, + external_id: &str, +) -> Result> { + let trigger = sqlx::query_as!( + NativeTrigger, + r#" + SELECT + id, + event_type AS "event_type!: sqlx::types::Json", + runnable_path, + runnable_kind AS "runnable_kind!: RunnableKind", + service_name AS "service_name!: ServiceName", + external_id, + workspace_id, + summary, + metadata, + edited_by, + email, + edited_at + FROM + native_triggers + WHERE + workspace_id = $1 + AND service_name = $2 + AND external_id = $3 + "#, + workspace_id, + service_name as ServiceName, + external_id + ) + .fetch_optional(tx) + .await?; + + Ok(trigger) +} + +pub async fn list_native_triggers<'c, E: sqlx::Executor<'c, Database = Postgres>>( + db: E, + workspace_id: &str, + service_name: ServiceName, + page: Option, + per_page: Option, +) -> Result> { + let offset = (page.unwrap_or(0) * per_page.unwrap_or(100)) as i64; + let limit = per_page.unwrap_or(100) as i64; + + let triggers = sqlx::query_as!( + NativeTrigger, + r#" + SELECT + id, + runnable_path, + event_type AS "event_type!: sqlx::types::Json", + runnable_kind AS "runnable_kind!: RunnableKind", + service_name AS "service_name!: ServiceName", + external_id, + workspace_id, + summary, + metadata, + edited_by, + email, + edited_at + FROM + native_triggers + WHERE + workspace_id = $1 AND + service_name = $2 + ORDER BY + edited_at DESC + LIMIT + $3 + OFFSET + $4 + "#, + workspace_id, + service_name as ServiceName, + limit, + offset + ) + .fetch_all(db) + .await?; + + Ok(triggers) +} + +pub async fn store_workspace_integration( + tx: &mut PgConnection, + authed: &ApiAuthed, + workspace_id: &str, + service_name: ServiceName, + oauth_data: serde_json::Value, +) -> Result<()> { + sqlx::query!( + r#" + INSERT INTO workspace_integrations ( + workspace_id, + service_name, + oauth_data, + created_by, + created_at, + updated_at + ) VALUES ( + $1, $2, $3, $4, now(), now() + ) + ON CONFLICT (workspace_id, service_name) + DO UPDATE SET + oauth_data = $3, + updated_at = now() + "#, + workspace_id, + service_name as ServiceName, + oauth_data, + authed.username, + ) + .execute(&mut *tx) + .await?; + + Ok(()) +} + +pub async fn get_workspace_integration<'c, E: sqlx::Executor<'c, Database = Postgres>>( + db: E, + workspace_id: &str, + service_name: ServiceName, +) -> Result { + let integration = sqlx::query_as!( + WorkspaceIntegration, + r#" + SELECT + workspace_id, + service_name AS "service_name!: ServiceName", + oauth_data, + created_at, + updated_at, + created_by + FROM + workspace_integrations + WHERE + workspace_id = $1 + AND service_name = $2 + "#, + workspace_id, + service_name as ServiceName, + ) + .fetch_one(db) + .await?; + + Ok(integration) +} + +pub async fn delete_workspace_integration( + tx: &mut PgConnection, + workspace_id: &str, + service_name: ServiceName, +) -> Result { + let deleted = sqlx::query!( + r#" + DELETE FROM workspace_integrations + WHERE + workspace_id = $1 + AND service_name = $2 + "#, + workspace_id, + service_name as ServiceName, + ) + .execute(&mut *tx) + .await? + .rows_affected(); + + Ok(deleted > 0) +} + +pub fn generate_webhook_service_url( + base_url: &str, + w_id: &str, + runnable_path: &str, + runnable_kind: RunnableKind, + internal_id: &str, + service_name: ServiceName, + webhook_config: &WebhookConfig, +) -> String { + let endpoint_base = match webhook_config.request_type { + WebhookRequestType::Async => "run", + WebhookRequestType::Sync => "run_wait_result", + }; + + let runnable_prefix = match runnable_kind { + RunnableKind::Script => "p", + RunnableKind::Flow => "f", + }; + + let url = format!( + "{}/api/w/{}/jobs/{}/{}/{}?token={}&internal_id={}&service_name={}", + base_url, + w_id, + endpoint_base, + runnable_prefix, + runnable_path, + &webhook_config.token, + internal_id, + service_name.as_str() + ); + + url +} diff --git a/backend/windmill-api/src/native_triggers/nextcloud/external.rs b/backend/windmill-api/src/native_triggers/nextcloud/external.rs new file mode 100644 index 0000000000000..903d54fd96b5d --- /dev/null +++ b/backend/windmill-api/src/native_triggers/nextcloud/external.rs @@ -0,0 +1,328 @@ +use reqwest::Method; +use serde::{Deserialize, Serialize}; +use serde_json::value::RawValue; +use sqlx::PgConnection; +use std::collections::HashMap; +use windmill_common::{ + error::{Error, Result}, + triggers::TriggerKind, + BASE_URL, DB, +}; + +use crate::{ + native_triggers::{ + generate_webhook_service_url, + nextcloud::{ + routes, NextCloud, NextCloudOAuthData, NextCloudPayload, NextCloudTriggerData, + OcsResponse, + }, + EventType, External, NativeTriggerData, ServiceName, + }, + triggers::trigger_helpers::TriggerJobArgs, +}; + +#[allow(unused)] +#[derive(Debug, Serialize)] +#[serde(rename_all(serialize = "lowercase", deserialize = "lowercase"))] +enum AuthMethod { + Null, + None, + Header, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +struct FullNextCloudPayload { + pub http_method: String, + pub uri: String, + pub auth_method: AuthMethod, + #[serde(flatten)] + payload: NextCloudPayload, +} + +impl FullNextCloudPayload { + async fn new( + w_id: &str, + internal_id: &str, + data: &NativeTriggerData, + ) -> FullNextCloudPayload { + let EventType::Webhook(webhook_config) = &data.event_type; + let base_url = &*BASE_URL.read().await; + let uri = generate_webhook_service_url( + base_url, + w_id, + &data.runnable_path, + data.runnable_kind, + internal_id, + ServiceName::Nextcloud, + webhook_config, + ); + + FullNextCloudPayload { + http_method: http::Method::POST.to_string().to_uppercase(), + auth_method: AuthMethod::None, + uri, + payload: data.payload.clone(), + } + } +} + +#[derive(Debug, Deserialize)] +pub struct RegisterWebhookResponse { + pub id: i64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct User { + pub uid: String, + #[serde(rename = "displayName")] + pub display_name: Option, +} +#[derive(Debug, Serialize, Deserialize)] +pub struct WebhookPayload { + pub event: EventPayload, + pub user: User, + pub time: i64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct EventPayload { + pub node: Node, + #[serde(rename = "class")] + pub class_name: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Node { + pub id: i64, + pub path: String, +} + +impl TriggerJobArgs for NextCloud { + const TRIGGER_KIND: TriggerKind = TriggerKind::Nextcloud; + type Payload = Box; + fn v1_payload_fn(payload: &Self::Payload) -> HashMap> { + HashMap::from([("payload".to_owned(), payload.to_owned())]) + } +} + +#[async_trait::async_trait] +impl External for NextCloud { + type Payload = NextCloudPayload; + type TriggerData = NextCloudTriggerData; + type OAuthData = NextCloudOAuthData; + type CreateResponse = RegisterWebhookResponse; + const SERVICE_NAME: ServiceName = ServiceName::Nextcloud; + const DISPLAY_NAME: &'static str = "NextCloud"; + const SUPPORT_WEBHOOK: bool = true; + const TOKEN_ENDPOINT: &'static str = "/apps/oauth2/api/v1/token"; + const REFRESH_ENDPOINT: &'static str = "/apps/oauth2/api/v1/token"; + + async fn create( + &self, + w_id: &str, + internal_id: i64, + oauth_data: &Self::OAuthData, + data: &NativeTriggerData, + db: &DB, + tx: &mut PgConnection, + ) -> Result { + let full_nextcloud_payload = + FullNextCloudPayload::new(w_id, &internal_id.to_string(), data).await; + + let url = format!( + "{}/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks", + oauth_data.base_url + ); + + let mut headers = HashMap::new(); + headers.insert("OCS-APIRequest".to_string(), "true".to_string()); + + let ocs_response = self + .http_client_request::, _>( + &url, + Method::POST, + w_id, + tx, + db, + Some(headers), + Some(&full_nextcloud_payload), + ) + .await?; + + Ok(ocs_response.ocs.data) + } + + async fn update( + &self, + w_id: &str, + internal_id: i64, + oauth_data: &Self::OAuthData, + external_id: &str, + data: &NativeTriggerData, + db: &DB, + tx: &mut PgConnection, + ) -> Result<()> { + let full_nextcloud_payload = + FullNextCloudPayload::new(w_id, &internal_id.to_string(), data).await; + + let url = format!( + "{}/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{}", + oauth_data.base_url, external_id + ); + + let mut headers = HashMap::new(); + headers.insert("OCS-APIRequest".to_string(), "true".to_string()); + + let _ = self + .http_client_request::( + &url, + Method::POST, + w_id, + tx, + db, + Some(headers), + Some(&full_nextcloud_payload), + ) + .await?; + + Ok(()) + } + + async fn validate_data_config(&self, data: &NativeTriggerData) -> Result<()> { + let event_type = &data.event_type; + if !matches!(event_type, &EventType::Webhook(_)) { + return Err(Error::BadRequest( + "Nextcloud native trigger only support webhook event".to_string(), + )); + } + + return Ok(()); + } + + async fn get( + &self, + w_id: &str, + oauth_data: &Self::OAuthData, + external_id: &str, + db: &DB, + tx: &mut PgConnection, + ) -> Result { + let url = format!( + "{}/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{}", + oauth_data.base_url, external_id + ); + + let mut headers = HashMap::new(); + headers.insert("OCS-APIRequest".to_string(), "true".to_string()); + + let ocs_response: OcsResponse = self + .http_client_request::<_, ()>(&url, Method::GET, w_id, tx, db, Some(headers), None) + .await?; + + Ok(ocs_response.ocs.data) + } + + async fn delete( + &self, + w_id: &str, + oauth_data: &Self::OAuthData, + external_id: &str, + db: &DB, + tx: &mut PgConnection, + ) -> Result<()> { + let url = format!( + "{}/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{}", + oauth_data.base_url, external_id + ); + + let mut headers = HashMap::new(); + headers.insert("OCS-APIRequest".to_string(), "true".to_string()); + + let _: serde_json::Value = self + .http_client_request::<_, ()>(&url, Method::DELETE, w_id, tx, db, Some(headers), None) + .await + .or_else(|e| match &e { + Error::InternalErr(msg) if msg.contains("404") => Ok(serde_json::Value::Null), + _ => Err(e), + })?; + + Ok(()) + } + + async fn exists( + &self, + w_id: &str, + oauth_data: &Self::OAuthData, + external_id: &str, + db: &DB, + tx: &mut PgConnection, + ) -> Result { + let url = format!( + "{}/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks/{}", + oauth_data.base_url, external_id + ); + + let mut headers = HashMap::new(); + headers.insert("OCS-APIRequest".to_string(), "true".to_string()); + + let _ = self + .http_client_request::( + &url, + Method::GET, + w_id, + tx, + db, + Some(headers), + None, + ) + .await?; + + Ok(true) + } + + async fn list_all( + &self, + w_id: &str, + oauth_data: &Self::OAuthData, + db: &DB, + tx: &mut PgConnection, + ) -> Result> { + let url = format!( + "{}/ocs/v2.php/apps/webhook_listeners/api/v1/webhooks", + oauth_data.base_url + ); + + let mut headers = HashMap::new(); + headers.insert("OCS-APIRequest".to_string(), "true".to_string()); + + let ocs_response = self + .http_client_request::>, ()>( + &url, + Method::GET, + w_id, + tx, + db, + Some(headers), + None, + ) + .await?; + + Ok(ocs_response.ocs.data) + } + + fn external_id_and_metadata_from_response( + &self, + resp: &Self::CreateResponse, + ) -> (String, Option) { + (resp.id.to_string(), None) + } + + fn get_external_id_from_trigger_data(&self, data: &Self::TriggerData) -> String { + data.id.to_string() + } + + fn additional_routes(&self) -> axum::Router { + routes::nextcloud_routes(self.clone()) + } +} diff --git a/backend/windmill-api/src/native_triggers/nextcloud/mod.rs b/backend/windmill-api/src/native_triggers/nextcloud/mod.rs new file mode 100644 index 0000000000000..36f196a742aeb --- /dev/null +++ b/backend/windmill-api/src/native_triggers/nextcloud/mod.rs @@ -0,0 +1,75 @@ +use serde::{Deserialize, Serialize}; + +pub mod external; +mod routes; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct NextCloudOAuthData { + pub base_url: String, + pub access_token: String, + pub refresh_token: Option, + pub token_expires_at: Option>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct OcsResponse { + pub ocs: OcsData, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Meta { + status: String, + #[serde(rename = "statuscode")] + status_code: u16, + message: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct OcsData { + pub meta: Meta, + pub data: T, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NextCloudEventType { + pub name: String, + pub description: Option, + pub path: Option, + pub parameters: Option, +} + +#[derive(Copy, Clone)] +pub struct NextCloud; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NextCloudPayload { + pub event: String, + #[serde( + rename(serialize = "eventFilter"), + skip_serializing_if = "Option::is_none" + )] + pub event_filter: Option>, + #[serde( + skip_serializing_if = "Option::is_none", + rename(serialize = "userIdFilter") + )] + pub user_id_filter: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub headers: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NextCloudTriggerData { + pub id: i64, + pub uri: String, + pub event: String, + #[serde(rename(deserialize = "eventFilter"))] + pub event_filter: Option>, + #[serde(rename(deserialize = "userIdFilter"))] + pub user_id_filter: Option, + pub headers: Option>, + #[serde(rename(deserialize = "authMethod"))] + pub auth_method: String, + #[serde(rename(deserialize = "authData"))] + pub auth_data: Option>, +} diff --git a/backend/windmill-api/src/native_triggers/nextcloud/routes.rs b/backend/windmill-api/src/native_triggers/nextcloud/routes.rs new file mode 100644 index 0000000000000..40633dc248afb --- /dev/null +++ b/backend/windmill-api/src/native_triggers/nextcloud/routes.rs @@ -0,0 +1,66 @@ +use std::{collections::HashMap, sync::Arc}; + +use axum::{extract::Path, routing::get, Extension, Json, Router}; +use http::Method; +use windmill_common::{ + db::UserDB, + error::{Error, JsonResult}, + DB, +}; + +use crate::{ + db::ApiAuthed, + native_triggers::{ + get_workspace_integration, + nextcloud::{NextCloudEventType, OcsResponse}, + External, OAuthConfig, ServiceName, + }, +}; + +async fn list_available_events( + authed: ApiAuthed, + Extension(handler): Extension>, + Extension(db): Extension, + Extension(user_db): Extension, + Path(workspace_id): Path, +) -> JsonResult> { + let mut tx = user_db.clone().begin(&authed).await?; + let integration = + get_workspace_integration(&mut *tx, &workspace_id, ServiceName::Nextcloud).await?; + + let auth = serde_json::from_value::(integration.oauth_data) + .map_err(|e| Error::InternalErr(format!("Failed to parse NextCloud OAuth data: {}", e)))?; + + let url = format!( + "{}/ocs/v2.php/apps/integration_windmill/api/v1/list/events", + &auth.base_url, + ); + + let mut headers = HashMap::new(); + headers.insert("OCS-APIRequest".to_string(), "true".to_string()); + + let ocs_response = handler + .http_client_request::( + &url, + Method::GET, + &workspace_id, + &mut *tx, + &db, + Some(headers), + None, + ) + .await?; + tx.commit().await?; + + let events = serde_json::from_str(&ocs_response.ocs.data) + .map_err(|e| Error::InternalErr(format!("Failed to parse NextCloud events data: {}", e)))?; + + Ok(Json(events)) +} + +pub fn nextcloud_routes(service: T) -> Router { + let service = Arc::new(service); + Router::new() + .route("/events", get(list_available_events::)) + .layer(Extension(service)) +} diff --git a/backend/windmill-api/src/native_triggers/sync.rs b/backend/windmill-api/src/native_triggers/sync.rs new file mode 100644 index 0000000000000..c15ff680e269d --- /dev/null +++ b/backend/windmill-api/src/native_triggers/sync.rs @@ -0,0 +1,302 @@ +use std::collections::HashMap; +use windmill_audit::{audit_oss::audit_log, ActionKind}; +use windmill_common::{db::UserDB, error::Result, DB}; + +use serde::Serialize; + +use crate::{ + native_triggers::{ + decrypt_oauth_data, delete_native_trigger, list_native_triggers, External, ServiceName, + }, + users::fetch_api_authed, +}; + +#[derive(Debug, Serialize)] +pub struct DeletedTriggerInfo { + pub internal_id: i64, + pub external_id: String, + pub runnable_path: String, + pub reason: String, +} + +#[derive(Debug, Serialize)] +pub struct SyncError { + pub resource_path: String, + pub error_message: String, + pub error_type: String, +} + +#[derive(Debug)] +pub struct BackgroundSyncResult { + pub workspaces_processed: usize, + pub total_deleted: usize, + pub total_errors: usize, + pub service_results: HashMap, +} + +#[derive(Debug)] +pub struct ServiceSyncResult { + pub deleted_triggers: Vec, + pub errors: Vec, +} + +pub const SYNC_INTERVAL: u64 = 10 * 60; + +pub async fn sync_all_triggers(db: &DB) -> Result { + tracing::debug!("Starting native triggers sync"); + + let workspaces = sqlx::query_scalar!( + r#" + SELECT id + FROM workspace + WHERE deleted = false + "# + ) + .fetch_all(db) + .await?; + + let mut service_results: HashMap = HashMap::new(); + let mut total_deleted = 0; + let mut total_errors = 0; + + use crate::native_triggers::nextcloud::NextCloud; + + match sync_service_triggers::(db, &workspaces, NextCloud).await { + Ok(result) => { + total_deleted += result.deleted_triggers.len(); + total_errors += result.errors.len(); + service_results.insert(ServiceName::Nextcloud, result); + } + Err(e) => { + tracing::error!("Error syncing NextCloud triggers: {:#}", e); + service_results.insert( + ServiceName::Nextcloud, + ServiceSyncResult { + deleted_triggers: Vec::new(), + errors: vec![SyncError { + resource_path: "background_sync".to_string(), + error_message: format!("Failed to sync NextCloud triggers: {}", e), + error_type: "service_sync_error".to_string(), + }], + }, + ); + total_errors += 1; + } + } + + let result = BackgroundSyncResult { + workspaces_processed: workspaces.len(), + total_deleted, + total_errors, + service_results, + }; + + tracing::debug!( + "Completed native triggers sync: {} workspaces, {} deleted, {} errors", + result.workspaces_processed, + result.total_deleted, + result.total_errors + ); + + Ok(result) +} +async fn sync_service_triggers( + db: &DB, + workspaces: &[String], + handler: T, +) -> Result { + let mut all_deleted_triggers = Vec::new(); + let mut all_errors = Vec::new(); + + for workspace_id in workspaces { + tracing::debug!( + "Syncing {} triggers for workspace {}", + T::SERVICE_NAME.as_str(), + workspace_id + ); + let sync_result = sync_workspace_triggers::(db, workspace_id, &handler).await; + + match sync_result { + Ok((deleted_triggers, errors)) => { + all_deleted_triggers.extend(deleted_triggers); + all_errors.extend(errors); + } + Err(e) => { + tracing::error!( + "Error syncing {} triggers for workspace {}: {:#}", + T::SERVICE_NAME.as_str(), + workspace_id, + e + ); + all_errors.push(SyncError { + resource_path: format!("workspace:{}", workspace_id), + error_message: format!("Failed to sync workspace: {}", e), + error_type: "workspace_sync_error".to_string(), + }); + } + } + } + + Ok(ServiceSyncResult { deleted_triggers: all_deleted_triggers, errors: all_errors }) +} + +#[cfg(feature = "native_triggers")] +pub async fn sync_workspace_triggers( + db: &DB, + workspace_id: &str, + handler: &T, +) -> Result<(Vec, Vec)> { + tracing::debug!( + "Syncing {} triggers for workspace '{}'", + T::SERVICE_NAME.as_str(), + workspace_id + ); + + let windmill_triggers = + list_native_triggers(db, workspace_id, T::SERVICE_NAME, None, None).await?; + + if windmill_triggers.is_empty() { + tracing::debug!( + "No {} triggers found for workspace '{}'", + T::SERVICE_NAME.as_str(), + workspace_id + ); + return Ok((Vec::new(), Vec::new())); + } + + let mut all_deleted_triggers = Vec::new(); + let mut all_sync_errors = Vec::new(); + + let oauth_data = { + match decrypt_oauth_data(db, db, workspace_id, T::SERVICE_NAME).await { + Ok(oauth_data) => oauth_data, + Err(e) => { + all_sync_errors.push(SyncError { + resource_path: format!("workspace:{}", workspace_id), + error_message: format!("Failed to get workspace integration OAuth data: {}", e), + error_type: "oauth_error".to_string(), + }); + return Ok((Vec::new(), all_sync_errors)); + } + } + }; + + let mut tx = db.begin().await?; + let external_triggers = match handler + .list_all(workspace_id, &oauth_data, db, &mut tx) + .await + { + Ok(triggers) => triggers, + Err(e) => { + all_sync_errors.push(SyncError { + resource_path: format!("workspace:{}", workspace_id), + error_message: format!("Failed to fetch external triggers: {}", e), + error_type: "external_service_error".to_string(), + }); + return Ok((Vec::new(), all_sync_errors)); + } + }; + + let mut external_trigger_ids: HashMap = HashMap::new(); + for external_trigger in &external_triggers { + let external_id = handler.get_external_id_from_trigger_data(external_trigger); + external_trigger_ids.insert(external_id, true); + } + + for trigger in &windmill_triggers { + if !external_trigger_ids.contains_key(&trigger.external_id) { + tracing::info!( + "Trigger '{}' (external_id: '{}') no longer exists in external service, deleting", + trigger.runnable_path, + trigger.external_id + ); + + let authed = match fetch_api_authed( + trigger.edited_by.clone(), + trigger.email.clone(), + workspace_id, + db, + Some("background-sync".to_string()), + ) + .await + { + Ok(authed) => authed, + Err(e) => { + all_sync_errors.push(SyncError { + resource_path: format!("workspace:{}", workspace_id), + error_message: format!( + "Failed to get authentication for trigger {}: {}", + trigger.id, e + ), + error_type: "authentication_error".to_string(), + }); + continue; + } + }; + + let user_db = UserDB::new(db.clone()); + let mut tx = user_db.begin(&authed).await?; + + match delete_native_trigger(&mut *tx, workspace_id, trigger.id, T::SERVICE_NAME).await { + Ok(true) => { + if let Err(audit_err) = audit_log( + &mut *tx, + &authed, + &format!("native_triggers.{}.background_sync_auto_delete", T::SERVICE_NAME.as_str()), + ActionKind::Delete, + workspace_id, + Some(&format!( + "Auto-deleted trigger '{}' (external_id: '{}') during background sync because it no longer exists in external service", + trigger.runnable_path, + trigger.external_id + )), + None, + ).await { + tracing::warn!( + "Failed to log audit for auto-deleted trigger {}: {}", + trigger.id, + audit_err + ); + } + + tx.commit().await?; + + all_deleted_triggers.push(DeletedTriggerInfo { + internal_id: trigger.id, + external_id: trigger.external_id.clone(), + runnable_path: trigger.runnable_path.clone(), + reason: "Trigger no longer exists in external service (background sync)" + .to_string(), + }); + } + Ok(false) => { + tracing::warn!( + "Trigger {} (external_id: '{}') was not found in database during deletion", + trigger.id, + trigger.external_id + ); + } + Err(e) => { + all_sync_errors.push(SyncError { + resource_path: format!("workspace:{}", workspace_id), + error_message: format!( + "Failed to delete trigger '{}' (external_id: '{}'): {}", + trigger.runnable_path, trigger.external_id, e + ), + error_type: "database_deletion_error".to_string(), + }); + } + } + } + } + + tracing::debug!( + "Sync completed for {} in workspace '{}'. Deleted: {}, Errors: {}", + T::SERVICE_NAME.as_str(), + workspace_id, + all_deleted_triggers.len(), + all_sync_errors.len() + ); + + Ok((all_deleted_triggers, all_sync_errors)) +} diff --git a/backend/windmill-api/src/native_triggers/workspace_integrations.rs b/backend/windmill-api/src/native_triggers/workspace_integrations.rs new file mode 100644 index 0000000000000..c170bb81340bd --- /dev/null +++ b/backend/windmill-api/src/native_triggers/workspace_integrations.rs @@ -0,0 +1,425 @@ +use axum::{ + extract::Path, + routing::{delete, get, post}, + Extension, Json, Router, +}; + +#[cfg(feature = "native_triggers")] +use serde_json::to_value; +use sqlx::prelude::FromRow; +use strum::IntoEnumIterator; + +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +#[cfg(feature = "native_triggers")] +use windmill_audit::{audit_oss::audit_log, ActionKind}; +use windmill_common::{ + db::UserDB, + error::{to_anyhow, Error, JsonResult, Result}, + utils::require_admin, + variables::{build_crypt, encrypt}, + DB, +}; + +#[cfg(feature = "native_triggers")] +use crate::{ + db::ApiAuthed, + native_triggers::{delete_workspace_integration, store_workspace_integration, ServiceName}, +}; + +use std::{collections::HashSet, sync::Mutex}; + +lazy_static::lazy_static! { + static ref STATE_STORE: Mutex> = Mutex::new(HashSet::new()); +} + +#[derive(Debug, Serialize)] +pub struct IntegrationStatusResponse { + pub connected: bool, + pub service_name: ServiceName, + pub created_at: Option>, + pub created_by: Option, +} + +#[derive(Debug, Serialize)] +pub struct ListIntegrationsResponse { + pub integrations: Vec, +} + +#[derive(Debug, Serialize)] +pub struct ConnectIntegrationResponse { + pub auth_url: String, +} + +#[derive(FromRow, Debug, Clone, Serialize, Deserialize)] +pub struct WorkspaceOAuthConfig { + pub client_id: String, + pub client_secret: String, + pub base_url: String, + pub access_token: Option +} + +#[derive(Debug, Serialize)] +pub struct OAuthConfigResponse { + pub configured: bool, + pub base_url: Option, + pub redirect_uri: Option, +} + +#[cfg(feature = "native_triggers")] +async fn generate_connect_url( + authed: ApiAuthed, + Extension(db): Extension, + Path((workspace_id, service_name)): Path<(String, ServiceName)>, + Json(RedirectUri { redirect_uri }): Json, +) -> JsonResult { + require_admin(authed.is_admin, &workspace_id)?; + + let oauth_config = + get_workspace_oauth_config_as_oauth_config(&db, &workspace_id, service_name).await?; + let random_state = uuid::Uuid::new_v4().to_string(); + let auth_url = build_authorization_url(&oauth_config, &random_state, &redirect_uri)?; + Ok(Json(auth_url)) +} + +#[cfg(feature = "native_triggers")] +async fn delete_integration( + authed: ApiAuthed, + Extension(user_db): Extension, + Path((workspace_id, service_name)): Path<(String, ServiceName)>, +) -> JsonResult { + require_admin(authed.is_admin, &workspace_id)?; + + let mut tx = user_db.begin(&authed).await?; + + let deleted = delete_workspace_integration(&mut *tx, &workspace_id, service_name).await?; + + if !deleted { + return Err(Error::NotFound(format!( + "{} integration not found for workspace", + service_name + ))); + } + + audit_log( + &mut *tx, + &authed, + &format!("workspace_integrations.{}.disconnect", service_name), + ActionKind::Delete, + &workspace_id, + Some(&format!("Disconnected {} integration", service_name)), + None, + ) + .await?; + + tx.commit().await?; + + Ok(Json(format!( + "{} integration disconnected successfully", + service_name + ))) +} + +#[derive(FromRow, Debug, Deserialize, Serialize)] +struct WorkspaceIntegrations { + service_name: ServiceName, + oauth_data: Option>, +} + +#[cfg(feature = "native_triggers")] +async fn list_integrations( + authed: ApiAuthed, + Extension(db): Extension, + Extension(_user_db): Extension, + Path(workspace_id): Path, +) -> JsonResult> { + require_admin(authed.is_admin, &workspace_id)?; + let mut tx = db.begin().await?; + let integrations = sqlx::query_as!( + WorkspaceIntegrations, + r#" + SELECT + oauth_data as "oauth_data!: sqlx::types::Json", + service_name as "service_name!: ServiceName" + FROM + workspace_integrations + WHERE + workspace_id = $1 + "#, + workspace_id + ) + .fetch_all(&mut *tx) + .await?; + + let key_value = integrations + .into_iter() + .map(|integration| (integration.service_name, integration.oauth_data)) + .collect::>(); + + let integrations = ServiceName::iter() + .map(|service_name| WorkspaceIntegrations { + service_name: service_name, + oauth_data: key_value.get(&service_name).cloned().flatten(), + }) + .collect::>(); + + tx.commit().await?; + + Ok(Json(integrations)) +} + +async fn integration_exist( + authed: ApiAuthed, + Extension(user_db): Extension, + Path((workspace_id, service_name)): Path<(String, ServiceName)>, +) -> JsonResult { + let mut tx = user_db.begin(&authed).await?; + let exists = sqlx::query_scalar!( + r#" + SELECT EXISTS ( + SELECT 1 + FROM workspace_integrations + WHERE workspace_id = $1 + AND service_name = $2 + AND oauth_data IS NOT NULL + ) + "#, + workspace_id, + service_name as ServiceName + ) + .fetch_one(&mut *tx) + .await? + .unwrap_or(false); + + Ok(Json(exists)) +} + +#[derive(Debug, Deserialize)] +struct RedirectUri { + redirect_uri: String, +} + +#[cfg(feature = "native_triggers")] +async fn oauth_callback( + authed: ApiAuthed, + Extension(db): Extension, + Extension(user_db): Extension, + Path((workspace_id, service_name, code, state)): Path<(String, ServiceName, String, String)>, + Json(RedirectUri { redirect_uri }): Json, +) -> JsonResult { + require_admin(authed.is_admin, &workspace_id)?; + + let state_was_present = STATE_STORE.lock().unwrap().remove(&state); + + if !state_was_present { + return Err(Error::BadRequest("Unknown state parameter".to_string())); + } + + let oauth_config = + get_workspace_oauth_config::(&db, &workspace_id, service_name) + .await?; + + let token_response = exchange_code_for_token(&oauth_config, &code, &redirect_uri).await?; + + let mut tx = user_db.begin(&authed).await?; + + let mc = build_crypt(&db, &workspace_id).await?; + let mut oauth_data = serde_json::to_value(oauth_config).unwrap(); + + let encrypted_access_token = encrypt(&mc, &token_response.access_token); + oauth_data["access_token"] = serde_json::Value::String(encrypted_access_token); + + if let Some(refresh_token) = token_response.refresh_token { + let encrypted_refresh_token = encrypt(&mc, &refresh_token); + oauth_data["refresh_token"] = serde_json::Value::String(encrypted_refresh_token); + } + if let Some(expires_at) = token_response.expires_at { + oauth_data["token_expires_at"] = serde_json::Value::String(expires_at.to_rfc3339()); + } + + store_workspace_integration(&mut *tx, &authed, &workspace_id, service_name, oauth_data).await?; + + audit_log( + &mut *tx, + &authed, + &format!("workspace_integrations.{}.connect", service_name), + ActionKind::Create, + &workspace_id, + Some(&format!("Connected {} integration via OAuth", service_name)), + None, + ) + .await?; + + tx.commit().await?; + + Ok(Json(format!( + "{} integration connected successfully via OAuth", + service_name + ))) +} + +#[allow(unused)] +#[derive(Debug, Deserialize)] +struct TokenResponse { + access_token: String, + refresh_token: Option, + expires_in: Option, + expires_at: Option>, +} + +async fn exchange_code_for_token( + config: &WorkspaceOAuthConfig, + code: &str, + redirect_uri: &str, +) -> Result { + let client = reqwest::Client::new(); + + let params = [ + ("grant_type", "authorization_code"), + ("client_id", &config.client_id), + ("client_secret", &config.client_secret), + ("code", code), + ("redirect_uri", redirect_uri), + ]; + + let response = client + .post(format!("{}/apps/oauth2/api/v1/token", config.base_url)) + .form(¶ms) + .send() + .await + .map_err(to_anyhow)? + .error_for_status() + .map_err(to_anyhow)?; + + let token_data: serde_json::Value = response + .json() + .await + .map_err(|e| Error::InternalErr(format!("Failed to parse token response: {}", e)))?; + + let expires_at = if let Some(expires_in) = token_data.get("expires_in").and_then(|v| v.as_u64()) + { + Some(chrono::Utc::now() + chrono::Duration::seconds(expires_in as i64)) + } else { + None + }; + + Ok(TokenResponse { + access_token: token_data["access_token"] + .as_str() + .ok_or_else(|| Error::BadRequest("Missing access_token in response".to_string()))? + .to_string(), + refresh_token: token_data["refresh_token"].as_str().map(|s| s.to_string()), + expires_in: token_data["expires_in"].as_u64(), + expires_at, + }) +} + +async fn get_workspace_oauth_config( + db: &DB, + workspace_id: &str, + service_name: ServiceName, +) -> Result { + let oauth_configs = sqlx::query_scalar!( + r#" + SELECT + oauth_data + FROM + workspace_integrations + WHERE + workspace_id = $1 AND + service_name = $2 + "#, + workspace_id, + service_name as ServiceName + ) + .fetch_optional(db) + .await? + .ok_or(Error::NotFound(format!( + "Integration for service {} not found", + service_name.as_str() + )))?; + + let config = serde_json::from_value::(oauth_configs) + .map_err(|e| Error::InternalErr(format!("Failed to parse OAuth config: {}", e)))?; + + Ok(config) +} + +#[cfg(feature = "native_triggers")] +pub async fn create_workspace_integration( + authed: ApiAuthed, + Extension(user_db): Extension, + Path((workspace_id, service_name)): Path<(String, ServiceName)>, + Json(oauth_data): Json, +) -> Result<()> { + require_admin(authed.is_admin, &workspace_id)?; + + let mut tx = user_db.begin(&authed).await?; + + store_workspace_integration( + &mut tx, + &authed, + &workspace_id, + service_name, + to_value(oauth_data).unwrap(), + ) + .await?; + + tx.commit().await?; + + Ok(()) +} + +#[inline] +async fn get_workspace_oauth_config_as_oauth_config( + db: &DB, + workspace_id: &str, + service_name: ServiceName, +) -> Result { + get_workspace_oauth_config::(db, workspace_id, service_name).await +} + +fn build_authorization_url( + config: &WorkspaceOAuthConfig, + state: &str, + redirect_uri: &str, +) -> Result { + let params = [ + ("response_type", "code"), + ("client_id", &config.client_id), + ("redirect_uri", redirect_uri), + ("state", state), + ("scope", "read write"), + ]; + + let query_string = params + .iter() + .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v))) + .collect::>() + .join("&"); + + { + let mut store = STATE_STORE.lock().unwrap(); + + store.insert(state.to_owned()); + } + + Ok(format!( + "{}/apps/oauth2/authorize?{}", + config.base_url, query_string + )) +} + +pub fn workspaced_service() -> Router { + let router = Router::new() + .route("/list", get(list_integrations)) + .route("/:service_name/exists", get(integration_exist)) + .route("/:service_name/create", post(create_workspace_integration)) + .route( + "/:service_name/generate_connect_url", + post(generate_connect_url), + ) + .route("/:service_name/delete", delete(delete_integration)) + .route("/:service_name/callback/:code/:state", post(oauth_callback)); + + Router::new().nest("/integrations", router) +} diff --git a/backend/windmill-api/src/scopes.rs b/backend/windmill-api/src/scopes.rs index 7984e4b62787d..1c1bdd4f26b43 100644 --- a/backend/windmill-api/src/scopes.rs +++ b/backend/windmill-api/src/scopes.rs @@ -257,6 +257,9 @@ pub enum ScopeDomain { PostgresTriggers, EmailTriggers, + // Native trigger domains + NativeTriggers, + // System domains Audit, Settings, @@ -310,6 +313,7 @@ impl ScopeDomain { Self::GcpTriggers => "gcp_triggers", Self::PostgresTriggers => "postgres_triggers", Self::EmailTriggers => "email_triggers", + Self::NativeTriggers => "native_triggers", Self::Audit => "audit", Self::Settings => "settings", Self::Workers => "workers", @@ -366,6 +370,7 @@ impl ScopeDomain { "ai" => Some(Self::AI), "indexer" | "srch" => Some(Self::Indexer), "teams" => Some(Self::Teams), + "native_triggers" => Some(Self::NativeTriggers), "git_sync" | "github_app" => Some(Self::GitSync), "capture" => Some(Self::Capture), "drafts" => Some(Self::Drafts), diff --git a/backend/windmill-api/src/token.rs b/backend/windmill-api/src/token.rs index f38ce642e6ca3..25978eb873ceb 100644 --- a/backend/windmill-api/src/token.rs +++ b/backend/windmill-api/src/token.rs @@ -124,6 +124,12 @@ fn build_standard_scope_domains() -> Vec { "Git synchronization management", false, ), + ( + "native_triggers", + "Native Triggers", + "Native triggers management", + true, + ), ]; STANDARD_DOMAINS diff --git a/backend/windmill-api/src/users.rs b/backend/windmill-api/src/users.rs index 67a946f16bbc0..6eb6632b0021d 100644 --- a/backend/windmill-api/src/users.rs +++ b/backend/windmill-api/src/users.rs @@ -9,7 +9,7 @@ #![allow(non_snake_case)] use quick_cache::sync::Cache; -use sqlx::{Postgres, Transaction}; +use sqlx::{PgConnection, Postgres, Transaction}; use std::sync::atomic::AtomicBool; use std::sync::Arc; @@ -381,6 +381,19 @@ pub struct NewToken { pub workspace_id: Option, } +#[cfg(feature = "native_triggers")] +impl NewToken { + pub fn new( + label: Option, + expiration: Option>, + impersonate_email: Option, + scopes: Option>, + workspace_id: Option, + ) -> NewToken { + NewToken { label, expiration, impersonate_email, scopes, workspace_id } + } +} + #[derive(Deserialize)] pub struct Login { pub email: String, @@ -2064,13 +2077,13 @@ pub async fn create_session_token<'c>( Ok(token) } -async fn create_token( - Extension(db): Extension, - authed: ApiAuthed, - Json(new_token): Json, -) -> Result<(StatusCode, String)> { +pub async fn create_token_internal( + tx: &mut PgConnection, + db: &DB, + authed: &ApiAuthed, + token_config: NewToken, +) -> Result { let token = rd_string(32); - let mut tx = db.begin().await?; let is_super_admin = sqlx::query_scalar!( "SELECT super_admin FROM password WHERE email = $1", @@ -2082,7 +2095,7 @@ async fn create_token( if *CLOUD_HOSTED { let nb_tokens = sqlx::query_scalar!("SELECT COUNT(*) FROM token WHERE email = $1", &authed.email) - .fetch_one(&db) + .fetch_one(db) .await?; if nb_tokens.unwrap_or(0) >= 10000 { return Err(Error::BadRequest( @@ -2097,18 +2110,18 @@ async fn create_token( VALUES ($1, $2, $3, $4, $5, $6, $7)", token, authed.email, - new_token.label, - new_token.expiration, + token_config.label, + token_config.expiration, is_super_admin, - new_token.scopes.as_ref().map(|x| x.as_slice()), - new_token.workspace_id, + token_config.scopes.as_ref().map(|x| x.as_slice()), + token_config.workspace_id, ) .execute(&mut *tx) .await?; audit_log( &mut *tx, - &authed, + authed, "users.token.create", ActionKind::Create, &"global", @@ -2117,6 +2130,19 @@ async fn create_token( ) .instrument(tracing::info_span!("token", email = &authed.email)) .await?; + + Ok(token) +} + +async fn create_token( + Extension(db): Extension, + authed: ApiAuthed, + Json(token_config): Json, +) -> Result<(StatusCode, String)> { + let mut tx = db.begin().await?; + + let token = create_token_internal(&mut *tx, &db, &authed, token_config).await?; + tx.commit().await?; Ok((StatusCode::CREATED, token)) } diff --git a/backend/windmill-api/src/workspaces.rs b/backend/windmill-api/src/workspaces.rs index ff399ee7c6d79..1707ae5cf7ac2 100644 --- a/backend/windmill-api/src/workspaces.rs +++ b/backend/windmill-api/src/workspaces.rs @@ -2071,6 +2071,7 @@ struct UsedTriggers { pub sqs_used: bool, pub gcp_used: bool, pub email_used: bool, + pub nextcloud_used: bool, } async fn get_used_triggers( @@ -2079,7 +2080,7 @@ async fn get_used_triggers( Path(w_id): Path, ) -> JsonResult { let mut tx = user_db.begin(&authed).await?; - let websocket_used = sqlx::query_as!( + let triggers_used = sqlx::query_as!( UsedTriggers, r#" SELECT @@ -2091,7 +2092,8 @@ async fn get_used_triggers( EXISTS(SELECT 1 FROM mqtt_trigger WHERE workspace_id = $1) AS "mqtt_used!", EXISTS(SELECT 1 FROM sqs_trigger WHERE workspace_id = $1) AS "sqs_used!", EXISTS(SELECT 1 FROM gcp_trigger WHERE workspace_id = $1) AS "gcp_used!", - EXISTS(SELECT 1 FROM email_trigger WHERE workspace_id = $1) AS "email_used!" + EXISTS(SELECT 1 FROM email_trigger WHERE workspace_id = $1) AS "email_used!", + EXISTS(SELECT 1 FROM native_triggers WHERE workspace_id = $1 AND service_name = 'nextcloud'::native_trigger_service) AS "nextcloud_used!" "#, w_id ) @@ -2099,7 +2101,7 @@ async fn get_used_triggers( .await?; tx.commit().await?; - Ok(Json(websocket_used)) + Ok(Json(triggers_used)) } async fn list_workspaces_as_super_admin( diff --git a/backend/windmill-common/src/triggers.rs b/backend/windmill-common/src/triggers.rs index 43c4ae50fc920..eaffc6862f40b 100644 --- a/backend/windmill-common/src/triggers.rs +++ b/backend/windmill-common/src/triggers.rs @@ -19,6 +19,7 @@ pub enum TriggerKind { Sqs, Postgres, Gcp, + Nextcloud, } impl TriggerKind { @@ -35,6 +36,7 @@ impl TriggerKind { TriggerKind::Sqs => "sqs".to_string(), TriggerKind::Postgres => "postgres".to_string(), TriggerKind::Gcp => "gcp".to_string(), + TriggerKind::Nextcloud => "nextcloud".to_string(), } } } @@ -53,6 +55,7 @@ impl fmt::Display for TriggerKind { TriggerKind::Sqs => "sqs", TriggerKind::Postgres => "postgres", TriggerKind::Gcp => "gcp", + TriggerKind::Nextcloud => "nextcloud", }; write!(f, "{}", s) } diff --git a/backend/windmill-common/src/utils.rs b/backend/windmill-common/src/utils.rs index 6dc1d76768a33..7f2be99cf202f 100644 --- a/backend/windmill-common/src/utils.rs +++ b/backend/windmill-common/src/utils.rs @@ -882,7 +882,8 @@ impl Future for WarnAfterFuture { } } -#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(sqlx::Type, Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash)] +#[sqlx(type_name = "runnable_kind", rename_all = "lowercase")] #[serde(rename_all = "lowercase")] pub enum RunnableKind { Script, diff --git a/frontend/src/lib/components/NextcloudSetting.svelte b/frontend/src/lib/components/NextcloudSetting.svelte index 10df756941104..7ab8b843ce331 100644 --- a/frontend/src/lib/components/NextcloudSetting.svelte +++ b/frontend/src/lib/components/NextcloudSetting.svelte @@ -82,7 +82,7 @@ >Client Secret from your Nextcloud OAuth2 app configuration - +

diff --git a/frontend/src/lib/components/graph/renderers/triggers/TriggersBadge.svelte b/frontend/src/lib/components/graph/renderers/triggers/TriggersBadge.svelte index 048f39d9c4034..5ca63d6673c47 100644 --- a/frontend/src/lib/components/graph/renderers/triggers/TriggersBadge.svelte +++ b/frontend/src/lib/components/graph/renderers/triggers/TriggersBadge.svelte @@ -4,12 +4,14 @@ import { Route } from 'lucide-svelte' import { getContext } from 'svelte' import { type TriggerContext } from '$lib/components/triggers' - import { enterpriseLicense } from '$lib/stores' + import { enterpriseLicense, workspaceStore } from '$lib/stores' import { MqttIcon, NatsIcon, KafkaIcon, AwsIcon, GoogleCloudIcon } from '$lib/components/icons' import { type Trigger, type TriggerType } from '$lib/components/triggers/utils' import { Menu, Menubar, MeltButton, MenuItem, Tooltip } from '$lib/components/meltComponents' import { twMerge } from 'tailwind-merge' import SchedulePollIcon from '$lib/components/icons/SchedulePollIcon.svelte' + import { getAvailableNativeTriggerServices, getServiceConfig, getServiceIcon } from '$lib/components/triggers/native/utils' + import type { NativeServiceName } from '$lib/gen/types.gen' import TriggerLabel from '$lib/components/triggers/TriggerLabel.svelte' import CountBadge from '$lib/components/common/badge/CountBadge.svelte' @@ -42,31 +44,67 @@ }: Props = $props() let menuOpen = $state(false) + let availableNativeServices = $state>([]) - const triggerTypeConfig: { - [key in TriggerType]: { - icon: ComponentType | Component - countKey?: string - disabled?: boolean + // Load available native trigger services + async function loadAvailableNativeTriggers() { + try { + const services = await getAvailableNativeTriggerServices($workspaceStore!) + const serviceData = await Promise.all( + services.map(async (service) => ({ + service, + icon: await getServiceIcon(service), + config: getServiceConfig(service) + })) + ) + availableNativeServices = serviceData + } catch (err) { + console.error('Failed to load available native trigger services:', err) + availableNativeServices = [] } - } = { - webhook: { icon: Webhook, countKey: 'webhook_count' }, - schedule: { icon: Calendar, countKey: 'schedule_count' }, - http: { icon: Route, countKey: 'http_routes_count' }, - websocket: { icon: Unplug, countKey: 'websocket_count' }, - postgres: { icon: Database, countKey: 'postgres_count' }, - kafka: { icon: KafkaIcon, countKey: 'kafka_count', disabled: !$enterpriseLicense }, - default_email: { icon: Mail, countKey: 'default_email_count' }, - email: { icon: Mail, countKey: 'email_count' }, - nats: { icon: NatsIcon, countKey: 'nats_count', disabled: !$enterpriseLicense }, - mqtt: { icon: MqttIcon, countKey: 'mqtt_count', disabled: !$enterpriseLicense }, - sqs: { icon: AwsIcon, countKey: 'sqs_count', disabled: !$enterpriseLicense }, - gcp: { icon: GoogleCloudIcon, countKey: 'gcp_count', disabled: !$enterpriseLicense }, - poll: { icon: SchedulePollIcon }, - cli: { icon: Terminal } } - const allTypes = [ + loadAvailableNativeTriggers() + + let triggerTypeConfig = $derived(() => { + const baseConfig: { + [key in TriggerType]?: { + icon: ComponentType | Component + countKey?: string + disabled?: boolean + } + } = { + webhook: { icon: Webhook, countKey: 'webhook_count' }, + schedule: { icon: Calendar, countKey: 'schedule_count' }, + http: { icon: Route, countKey: 'http_routes_count' }, + websocket: { icon: Unplug, countKey: 'websocket_count' }, + postgres: { icon: Database, countKey: 'postgres_count' }, + kafka: { icon: KafkaIcon, countKey: 'kafka_count', disabled: !$enterpriseLicense }, + default_email: { icon: Mail, countKey: 'default_email_count' }, + email: { icon: Mail, countKey: 'email_count' }, + nats: { icon: NatsIcon, countKey: 'nats_count', disabled: !$enterpriseLicense }, + mqtt: { icon: MqttIcon, countKey: 'mqtt_count', disabled: !$enterpriseLicense }, + sqs: { icon: AwsIcon, countKey: 'sqs_count', disabled: !$enterpriseLicense }, + gcp: { icon: GoogleCloudIcon, countKey: 'gcp_count', disabled: !$enterpriseLicense }, + poll: { icon: SchedulePollIcon }, + cli: { icon: Terminal } + } + + // Add native trigger services that are available + for (const { service, icon } of availableNativeServices) { + baseConfig[service as TriggerType] = { icon } + } + + return baseConfig as { + [key in TriggerType]: { + icon: ComponentType | Component + countKey?: string + disabled?: boolean + } + } + }) + + let allTypes = $derived([ 'webhook', 'schedule', 'http', @@ -80,8 +118,9 @@ 'gcp', 'email', 'poll', - 'cli' - ] + 'cli', + ...availableNativeServices.map(({ service }) => service) + ]) function camelCaseToWords(s: string) { const result = s.replace(/([A-Z])/g, ' $1') @@ -124,10 +163,10 @@ return types.filter( (type) => (!showOnlyTriggersWithCount || - ((triggerTypeConfig[type].countKey && - ($triggersCount?.[triggerTypeConfig[type].countKey] ?? 0)) || + ((triggerTypeConfig()[type]?.countKey && + ($triggersCount?.[triggerTypeConfig()[type].countKey] ?? 0)) || 0) > 0) && - !triggerTypeConfig[type].disabled + !triggerTypeConfig()[type]?.disabled ) }) let triggersToDisplay = $derived(limit ? allTriggerTypes.slice(0, limit) : allTriggerTypes) @@ -240,7 +279,7 @@ {#snippet triggerButton({ type, isSelected, meltElement = undefined, singleItem = false })} - {@const { icon: SvelteComponent, countKey } = triggerTypeConfig[type]} + {@const { icon: SvelteComponent, countKey } = triggerTypeConfig()[type] || { icon: Database, countKey: undefined }}
diff --git a/frontend/src/lib/components/icons/NextcloudIcon.svelte b/frontend/src/lib/components/icons/NextcloudIcon.svelte index ff5e3387233ed..04b61d2aad82a 100644 --- a/frontend/src/lib/components/icons/NextcloudIcon.svelte +++ b/frontend/src/lib/components/icons/NextcloudIcon.svelte @@ -1,13 +1,21 @@ -Nextcloud icon
- {#if newToken} + {#if newToken && displayCreateToken} {/if} - {#if newMcpToken} + {#if newMcpToken && displayCreateToken} {/if}
diff --git a/frontend/src/lib/components/sidebar/SidebarContent.svelte b/frontend/src/lib/components/sidebar/SidebarContent.svelte index 26b2bf8a74f38..4ce9da3d134e0 100644 --- a/frontend/src/lib/components/sidebar/SidebarContent.svelte +++ b/frontend/src/lib/components/sidebar/SidebarContent.svelte @@ -55,6 +55,12 @@ import NatsIcon from '../icons/NatsIcon.svelte' import MqttIcon from '../icons/MqttIcon.svelte' import AwsIcon from '../icons/AwsIcon.svelte' + import { + getAvailableNativeTriggerServices, + getServiceConfig, + getServiceIcon + } from '../triggers/native/utils' + import type { NativeServiceName } from '$lib/gen/types.gen' import { Menubar, Menu, @@ -82,6 +88,29 @@ let hasNewChangelogs = $state(false) let recentChangelogs: Changelog[] = $state([]) let lastOpened = localStorage.getItem('changelogsLastOpened') + let availableNativeServices = $state< + Array<{ service: NativeServiceName; icon: any; config: any }> + >([]) + + async function loadAvailableNativeTriggers() { + try { + const services = await getAvailableNativeTriggerServices($workspaceStore!) + console.log({ services }) + const serviceData = await Promise.all( + services.map(async (service) => ({ + service, + icon: await getServiceIcon(service), + config: getServiceConfig(service) + })) + ) + availableNativeServices = serviceData + } catch (err) { + console.error('Failed to load available native trigger services:', err) + availableNativeServices = [] + } + } + + loadAvailableNativeTriggers() onMount(() => { if (lastOpened) { @@ -287,6 +316,21 @@ aiDescription: 'Button to navigate to Email triggers' } ]) + + let nativeTriggerLinks = $derived( + availableNativeServices.map(({ service, icon, config }) => ({ + label: config?.serviceDisplayName || service, + href: `/native_triggers/${service}`, + icon: icon, + disabled: $userStore?.operator, + kind: service, + aiId: `sidebar-menu-link-${service}`, + aiDescription: `Button to navigate to ${config?.serviceDisplayName || service} triggers` + })) + ) + + let allTriggerLinks = $derived([...defaultExtraTriggerLinks, ...nativeTriggerLinks]) + let triggerMenuLinks = $derived([ { label: 'Schedules', @@ -296,12 +340,12 @@ aiId: 'sidebar-menu-link-schedules', aiDescription: 'Button to navigate to schedules' }, - ...defaultExtraTriggerLinks.filter( + ...allTriggerLinks.filter( (link) => $usedTriggerKinds.includes(link.kind) || $page.url.pathname.includes(link.href) ) ]) let extraTriggerLinks = $derived( - defaultExtraTriggerLinks.filter((link) => { + allTriggerLinks.filter((link) => { return !$page.url.pathname.includes(link.href) && !$usedTriggerKinds.includes(link.kind) }) ) diff --git a/frontend/src/lib/components/triggers.ts b/frontend/src/lib/components/triggers.ts index eee4f37e8b1f2..7483b0c76bea7 100644 --- a/frontend/src/lib/components/triggers.ts +++ b/frontend/src/lib/components/triggers.ts @@ -56,6 +56,7 @@ export type TriggerKind = | 'mqtt' | 'sqs' | 'gcp' + | 'nextcloud' export function captureTriggerKindToTriggerKind(kind: CaptureTriggerKind): TriggerKind { switch (kind) { case 'webhook': diff --git a/frontend/src/lib/components/triggers/AddTriggersButton.svelte b/frontend/src/lib/components/triggers/AddTriggersButton.svelte index 6e056ef07de7b..6801eb78859e6 100644 --- a/frontend/src/lib/components/triggers/AddTriggersButton.svelte +++ b/frontend/src/lib/components/triggers/AddTriggersButton.svelte @@ -5,7 +5,8 @@ import type { Placement } from '@floating-ui/core' import { isCloudHosted } from '$lib/cloud' import { CloudOff } from 'lucide-svelte' - import type { Item } from '$lib/utils' + import { isServiceAvailable } from './native/utils' + import { workspaceStore } from '$lib/stores' interface Props { setDropdownWidthToButtonWidth?: boolean @@ -33,73 +34,87 @@ let dropdown: DropdownV2 | undefined const cloudHosted = isCloudHosted() + let nextcloudAvailable = $state(false) - // Dropdown items for adding new triggers - const addTriggerItems: Item[] = [ - { - displayName: 'Schedule', - action: () => onAddDraftTrigger?.('schedule'), - icon: triggerIconMap.schedule - }, - { displayName: 'HTTP', action: () => onAddDraftTrigger?.('http'), icon: triggerIconMap.http }, - { - displayName: 'WebSocket', - action: () => onAddDraftTrigger?.('websocket'), - icon: triggerIconMap.websocket, - extra: cloudHosted ? extra : undefined - }, - { - displayName: 'Postgres', - action: () => onAddDraftTrigger?.('postgres'), - icon: triggerIconMap.postgres, - extra: cloudHosted ? extra : undefined - }, - { - displayName: 'Kafka', - action: () => onAddDraftTrigger?.('kafka'), - icon: triggerIconMap.kafka, - extra: cloudHosted ? extra : undefined - }, - { - displayName: 'NATS', - action: () => onAddDraftTrigger?.('nats'), - icon: triggerIconMap.nats, - extra: cloudHosted ? extra : undefined - }, - { - displayName: 'MQTT', - action: () => onAddDraftTrigger?.('mqtt'), - icon: triggerIconMap.mqtt, - extra: cloudHosted ? extra : undefined - }, - { - displayName: 'SQS', - action: () => onAddDraftTrigger?.('sqs'), - icon: triggerIconMap.sqs, - extra: cloudHosted ? extra : undefined - }, - { - displayName: 'GCP Pub/Sub', - action: () => onAddDraftTrigger?.('gcp'), - icon: triggerIconMap.gcp, - extra: cloudHosted ? extra : undefined - }, - { - displayName: 'Email', - action: () => onAddDraftTrigger?.('email'), - icon: triggerIconMap.email, - extra: cloudHosted ? extra : undefined - }, - { - displayName: 'Scheduled Poll', - action: (e) => { - onAddDraftTrigger?.('poll') - onAddScheduledPoll?.() + async function setNextcloudState() { + nextcloudAvailable = await isServiceAvailable('nextcloud', $workspaceStore!) + } + + setNextcloudState() + + const addTriggerItems = $derived( + [ + { + displayName: 'Schedule', + action: () => onAddDraftTrigger?.('schedule'), + icon: triggerIconMap.schedule + }, + { displayName: 'HTTP', action: () => onAddDraftTrigger?.('http'), icon: triggerIconMap.http }, + { + displayName: 'WebSocket', + action: () => onAddDraftTrigger?.('websocket'), + icon: triggerIconMap.websocket, + extra: cloudHosted ? extra : undefined + }, + { + displayName: 'Postgres', + action: () => onAddDraftTrigger?.('postgres'), + icon: triggerIconMap.postgres, + extra: cloudHosted ? extra : undefined + }, + { + displayName: 'Kafka', + action: () => onAddDraftTrigger?.('kafka'), + icon: triggerIconMap.kafka, + extra: cloudHosted ? extra : undefined + }, + { + displayName: 'NATS', + action: () => onAddDraftTrigger?.('nats'), + icon: triggerIconMap.nats, + extra: cloudHosted ? extra : undefined + }, + { + displayName: 'MQTT', + action: () => onAddDraftTrigger?.('mqtt'), + icon: triggerIconMap.mqtt, + extra: cloudHosted ? extra : undefined + }, + { + displayName: 'SQS', + action: () => onAddDraftTrigger?.('sqs'), + icon: triggerIconMap.sqs, + extra: cloudHosted ? extra : undefined + }, + { + displayName: 'GCP Pub/Sub', + action: () => onAddDraftTrigger?.('gcp'), + icon: triggerIconMap.gcp, + extra: cloudHosted ? extra : undefined + }, + { + displayName: 'Email', + action: () => onAddDraftTrigger?.('email'), + icon: triggerIconMap.email, + extra: cloudHosted ? extra : undefined + }, + { + displayName: 'Scheduled Poll', + action: (e) => { + onAddDraftTrigger?.('poll') + onAddScheduledPoll?.() + }, + icon: SchedulePollIcon, + hidden: !isEditor }, - icon: SchedulePollIcon, - hidden: !isEditor - } - ].filter((item) => !item.hidden) + { + displayName: 'Nextcloud', + action: () => onAddDraftTrigger?.('nextcloud'), + icon: triggerIconMap.nextcloud, + hidden: !nextcloudAvailable + } + ].filter((item) => !item.hidden) + ) let triggersButtonWidth = $state(0) diff --git a/frontend/src/lib/components/triggers/native/NativeTriggerEditor.svelte b/frontend/src/lib/components/triggers/native/NativeTriggerEditor.svelte new file mode 100644 index 0000000000000..cb9ace947456c --- /dev/null +++ b/frontend/src/lib/components/triggers/native/NativeTriggerEditor.svelte @@ -0,0 +1,343 @@ + + + +
+
+

+ {isNew ? 'Create' : 'Edit'} + {serviceConfig?.serviceDisplayName} Trigger +

+ + +
+ +
+
+
+
+

+ Pick a script or flow to be triggered +

+
+ + +
+ {#if errors.runnable_path} +
{errors.runnable_path}
+ {/if} +
+ +
+
+ + +
+ + {#if loadingConfig} +
+

+ {serviceConfig?.serviceDisplayName} Configuration +

+
+
+ Loading configuration from {serviceConfig?.serviceDisplayName}... +
+
+ {:else if ServiceFormComponent} + + {:else} +
+

+ {serviceConfig?.serviceDisplayName} Configuration +

+
+
Failed to load service configuration component for {service}.
+
+ Ensure your workspace has a connected {serviceConfig?.serviceDisplayName} integration. +
+ +
+
+ {/if} +
+
+
+
diff --git a/frontend/src/lib/components/triggers/native/NativeTriggerTable.svelte b/frontend/src/lib/components/triggers/native/NativeTriggerTable.svelte new file mode 100644 index 0000000000000..eda51f55bcb8c --- /dev/null +++ b/frontend/src/lib/components/triggers/native/NativeTriggerTable.svelte @@ -0,0 +1,172 @@ + + +
+ {#if loading} + {#each new Array(6) as _} + + {/each} + {:else if !triggers?.length} +
+ No {serviceConfig?.serviceDisplayName} triggers found +
+ {:else} +
+ {#each triggers as trigger} + {@const isFlow = trigger.runnable_path.includes('/flows/')} + {@const href = `${isFlow ? '/flows/get' : '/scripts/get'}/${trigger.runnable_path}`} +
+
+ {#if service === 'nextcloud'} + + {:else} + + {/if} + + onEdit?.(trigger)} + class="min-w-0 grow hover:underline decoration-gray-400" + > +
+ {trigger.summary || trigger.external_id} +
+
+ external_id: {trigger.external_id} +
+
+ runnable: {trigger.runnable_path} +
+
+ +
+ + + openDeleteConfirmation(trigger) + }, + { + displayName: 'Audit logs', + icon: Eye, + href: `${base}/audit_logs?resource=${trigger.external_id}` + } + ]} + /> +
+
+ +
+ edited by {trigger.edited_by} • {displayDate(trigger.edited_at)} +
+
+ {/each} +
+ {/if} +
+ + +
+ Are you sure you want to delete this trigger? +
+
+
+ Warning: This will permanently delete the trigger from both Windmill and {serviceConfig?.serviceDisplayName}. + This action cannot be undone. +
+
+
+ {#if triggerToDelete} +
+
External ID: {triggerToDelete.external_id}
+
+ {/if} +
+
diff --git a/frontend/src/lib/components/triggers/native/services/nextcloud/NextcloudTriggerForm.svelte b/frontend/src/lib/components/triggers/native/services/nextcloud/NextcloudTriggerForm.svelte new file mode 100644 index 0000000000000..9d962047661fc --- /dev/null +++ b/frontend/src/lib/components/triggers/native/services/nextcloud/NextcloudTriggerForm.svelte @@ -0,0 +1,230 @@ + + +
+
+

Nextcloud Configuration

+ +
+ + {#if showCustomRawEditor} +
+
+ {:else if loadingEvents} +
+
+ Loading available events... +
+ {:else if availableEvents.length === 0} +
+
No events available. Please ensure your workspace has a connected Nextcloud integration.
+
+ + +
+
+ {:else if serviceSchema} + + {:else} +
+ Please ensure NextCloud workspace integration is connected to load available configuration + options. +
+ {/if} +
diff --git a/frontend/src/lib/components/triggers/native/utils.ts b/frontend/src/lib/components/triggers/native/utils.ts new file mode 100644 index 0000000000000..60344003174a9 --- /dev/null +++ b/frontend/src/lib/components/triggers/native/utils.ts @@ -0,0 +1,141 @@ +import type { NativeServiceName, NativeTrigger } from '$lib/gen/types.gen' +import { isCloudHosted } from '$lib/cloud' +import { WorkspaceIntegrationService } from '$lib/gen' + +export interface NativeTriggerConfig { + readonly serviceDisplayName: string + readonly serviceKey: NativeServiceName + readonly supportsSync: boolean + readonly supportsFetchConfig: boolean + readonly isCloudCompatible: boolean + readonly templates?: { + script?: string + flow?: string + } +} + +export const NATIVE_TRIGGER_SERVICES: Record = { + nextcloud: { + serviceDisplayName: 'Nextcloud', + serviceKey: 'nextcloud', + supportsSync: true, + supportsFetchConfig: true, + isCloudCompatible: true, + templates: { + script: '/scripts/add?hub=hub%2F28045', + flow: '/flows/add?hub=73' + } + } +} + +export async function isServiceAvailable( + service: NativeServiceName, + workspace: string +): Promise { + const config = NATIVE_TRIGGER_SERVICES[service] + if (!config) return false + + if (isCloudHosted() && !config.isCloudCompatible) { + return false + } + + try { + const response = await WorkspaceIntegrationService.checkIfNativeTriggersServiceExists({ + workspace, + serviceName: service + }) + + return response + } catch (workspaceErr) { + console.debug(`Workspace integration check failed for ${service}:`, workspaceErr) + return false + } +} + +export function getAvailableServices(): NativeServiceName[] { + return Object.keys(NATIVE_TRIGGER_SERVICES) as NativeServiceName[] +} + +export async function getAvailableNativeTriggerServices( + workspace: string +): Promise { + const services = getAvailableServices() + const availableServices: NativeServiceName[] = [] + + for (const service of services) { + const available = await isServiceAvailable(service, workspace) + if (available) { + availableServices.push(service) + } + } + + return availableServices +} + +export function getServiceConfig(service: NativeServiceName): NativeTriggerConfig | undefined { + return NATIVE_TRIGGER_SERVICES[service] +} + +export type EventTypeVal = 'webhook' + +export interface ExtendedNativeTrigger extends NativeTrigger { + id: number + runnable_path: string + runnable_kind: 'script' | 'flow' +} + +export interface ServiceFormProps { + config: Record + errors: Record + resources: Array<{ path: string; description?: string }> + onConfigChange: (newConfig: Record) => void + onTest?: () => Promise + disabled?: boolean +} + +export function validateCommonFields(config: Record): Record { + const errors: Record = {} + + if (!config.runnable_path?.trim()) { + errors.runnable_path = 'Script/Flow path is required' + } + + return errors +} + +export function formatTriggerDisplayName(trigger: NativeTrigger): string { + const serviceConfig = getServiceConfig(trigger.service_name) + const serviceName = serviceConfig?.serviceDisplayName || trigger.service_name + return `${serviceName} - ${trigger.summary || trigger.external_id}` +} + +export function getTriggerIconName(service: NativeServiceName): string { + switch (service) { + case 'nextcloud': + return 'NextcloudIcon' + default: + return 'Database' + } +} + +export async function getServiceIcon(service: NativeServiceName): Promise { + switch (service) { + case 'nextcloud': + return (await import('$lib/components/icons/NextcloudIcon.svelte')).default + } +} + +export function getServiceTemplates( + service: NativeServiceName +): { script?: string; flow?: string } | undefined { + const config = getServiceConfig(service) + return config?.templates +} + +export function getTemplatePath( + service: NativeServiceName, + type: 'script' | 'flow' +): string | undefined { + const templates = getServiceTemplates(service) + return templates?.[type] +} diff --git a/frontend/src/lib/components/triggers/utils.ts b/frontend/src/lib/components/triggers/utils.ts index 23d37413e14ec..23bf0c38f6a2e 100644 --- a/frontend/src/lib/components/triggers/utils.ts +++ b/frontend/src/lib/components/triggers/utils.ts @@ -26,6 +26,7 @@ import { saveGcpTriggerFromCfg } from './gcp/utils' import type { Triggers } from './triggers.svelte' import { emptyString } from '$lib/utils' import { saveEmailTriggerFromCfg } from './email/utils' +import NextcloudIcon from '$lib/components/icons/NextcloudIcon.svelte' export const CLOUD_DISABLED_TRIGGER_TYPES = [ 'nats', @@ -53,6 +54,7 @@ export type TriggerType = | 'email' | 'poll' | 'cli' + | 'nextcloud' export type Trigger = { type: TriggerType @@ -83,7 +85,8 @@ export const triggerIconMap = { gcp: GoogleCloudIcon, primary_schedule: Calendar, poll: SchedulePollIcon, - cli: Terminal + cli: Terminal, + nextcloud: NextcloudIcon } /** @@ -137,7 +140,8 @@ export function updateTriggersCount( gcp: 'gcp_count', email: 'email_count', poll: undefined, - cli: undefined + cli: undefined, + nextcloud: undefined } const countProperty = countPropertyMap[type] @@ -319,7 +323,8 @@ export async function deployTriggers( usedTriggerKinds ), poll: undefined, - cli: undefined + cli: undefined, + nextcloud: undefined } await Promise.all( diff --git a/frontend/src/lib/components/workspaceSettings/OAuthClientConfig.svelte b/frontend/src/lib/components/workspaceSettings/OAuthClientConfig.svelte new file mode 100644 index 0000000000000..b532ff46f34d6 --- /dev/null +++ b/frontend/src/lib/components/workspaceSettings/OAuthClientConfig.svelte @@ -0,0 +1,212 @@ + + +
+
+

{serviceDisplayName} OAuth Client Configuration

+ {#if existingConfig} +
+ + Configured +
+ {:else} +
Not Configured
+ {/if} +
+ + {#if !existingConfig} + +

+ Before you can connect to {serviceDisplayName}, you need to configure an OAuth client. This + requires: +

+
    +
  1. Create an OAuth2 application in your {serviceDisplayName} instance
  2. +
  3. Configure the redirect URI: {redirectUri || + `${window.location.origin}/workspace_settings?tab=integrations&service=${serviceName}`}
  4. +
  5. Enter the client credentials below
  6. +
+
+ {/if} + +
+
+
+
+ +
+
+ +
+
+ +
+
+ +
+ + + {#if baseUrl} + + {/if} +
+
+
+ + {#if existingConfig} + +

+ OAuth client is configured for {serviceDisplayName}. You can now connect this workspace to + your {serviceDisplayName} instance. +

+
+ {/if} +
diff --git a/frontend/src/lib/components/workspaceSettings/WorkspaceIntegrations.svelte b/frontend/src/lib/components/workspaceSettings/WorkspaceIntegrations.svelte new file mode 100644 index 0000000000000..9237495306646 --- /dev/null +++ b/frontend/src/lib/components/workspaceSettings/WorkspaceIntegrations.svelte @@ -0,0 +1,367 @@ + + +
+
+ + Connect your workspace to external services for native triggers and enhanced functionality. + These connections are shared across all workspace members and are required for native triggers + to work. + + + Learn more about native triggers and workspace integrations. + +
+ + {#if processingCallback} + +

Completing your OAuth connection... Please wait.

+
+ {:else if loading} +
+ {#each new Array(3) as _} + + {/each} +
+ {:else} +
+ {#each Object.entries(supportedServices) as [serviceName, config]} + {@const integration = getIntegrationByService(serviceName)} + {@const isConnecting = connecting === serviceName} + {@const isOAuthConfigured = integration && isConfigured(integration)} + {@const isServiceConnected = integration && isConnected(integration)} + {@const isShowingConfig = showingConfig === serviceName} + +
+
+
+
+ +
+
+
{config.displayName}
+
{config.description}
+
+
+ +
+ {#if isServiceConnected} +
+ + Connected +
+ + + {:else if isOAuthConfigured} +
+ + Configured, Not Connected +
+ + + {:else} +
+ + Not Configured +
+ + {/if} + + {#if config.docsUrl} + + {/if} +
+
+ + {#if isShowingConfig} +
+ { + await createOrUpdateIntegration(serviceName, oauthData) + showingConfig = null + }} + /> +
+ {/if} +
+ {/each} +
+ + +
+

To use native triggers, you need:

+
    +
  1. Configure OAuth client credentials for the desired service (workspace level)
  2. +
  3. Connect your workspace to the external service using the configured OAuth client
  4. +
  5. Appropriate permissions to create and manage triggers
  6. +
+

+ OAuth client configuration is required before you can connect to external services. This + ensures secure authentication between your workspace and the external service. +

+
+
+ + {#if integrations.length === 0} + + Connect to external services above to enable native triggers for your workspace. + + {/if} + {/if} +
diff --git a/frontend/src/routes/(root)/(logged)/+layout.svelte b/frontend/src/routes/(root)/(logged)/+layout.svelte index 4f53e5a3bfb43..75c1898e5f847 100644 --- a/frontend/src/routes/(root)/(logged)/+layout.svelte +++ b/frontend/src/routes/(root)/(logged)/+layout.svelte @@ -202,7 +202,8 @@ sqs_used, mqtt_used, gcp_used, - email_used + email_used, + nextcloud_used } = await WorkspaceService.getUsedTriggers({ workspace: $workspaceStore ?? '' }) @@ -233,6 +234,9 @@ if (email_used) { usedKinds.push('email') } + if (nextcloud_used) { + usedKinds.push('nextcloud') + } $usedTriggerKinds = usedKinds } diff --git a/frontend/src/routes/(root)/(logged)/native_triggers/[service_name]/+page.svelte b/frontend/src/routes/(root)/(logged)/native_triggers/[service_name]/+page.svelte new file mode 100644 index 0000000000000..3511d4e84d0d6 --- /dev/null +++ b/frontend/src/routes/(root)/(logged)/native_triggers/[service_name]/+page.svelte @@ -0,0 +1,250 @@ + + + + + + `${formatTriggerDisplayName(trigger)} ${trigger.path} ${trigger.runnable_path}`} +/> + +{#if $userStore?.operator && $workspaceStore && !$userWorkspaces.find((_) => _.id === $workspaceStore)?.operator_settings?.triggers} + +{:else if !serviceSupported} + +
+

Service Not Supported

+

+ The service "{serviceName}" is not supported for native triggers. +

+

Supported services: nextcloud

+
+
+{:else} + + + {#if serviceAvailable} + + {/if} + + + {#if serviceAvailable === false} + +
+
+

+ {serviceConfig?.serviceDisplayName || serviceName} triggers are not available. This could + be because: +

+
    +
  • The workspace doesn't have a {serviceConfig?.serviceDisplayName || serviceName} integration + connected
  • +
  • The {serviceConfig?.serviceDisplayName || serviceName} OAuth2 integration is not configured + in the instance settings
  • +
+
+ + +
+
+
+
+ {:else if serviceAvailable} +
+
+ +
+ + {#if loading} +
+
Loading {serviceConfig?.serviceDisplayName || serviceName} triggers...
+
+ {:else if !triggers?.length} +
+
No {serviceConfig?.serviceDisplayName || serviceName} triggers found
+ +
+ {:else if filteredItems?.length} + editor?.openEdit(trigger)} + onSync={syncTriggers} + {shareModal} + /> + {:else} + + {/if} +
+ {:else} +
+ +
Checking {serviceConfig?.serviceDisplayName || serviceName} availability...
+
+ {/if} +
+{/if} diff --git a/frontend/src/routes/(root)/(logged)/native_triggers/[service_name]/+page.ts b/frontend/src/routes/(root)/(logged)/native_triggers/[service_name]/+page.ts new file mode 100644 index 0000000000000..e0ae01cc55233 --- /dev/null +++ b/frontend/src/routes/(root)/(logged)/native_triggers/[service_name]/+page.ts @@ -0,0 +1,22 @@ +import { getServiceConfig } from '$lib/components/triggers/native/utils' +import { error } from '@sveltejs/kit' +import type { PageLoad } from './$types' +import type { NativeServiceName } from '$lib/gen' + +export const load: PageLoad = async ({ params }) => { + const serviceName = params.service_name as NativeServiceName + + const serviceConfig = getServiceConfig(serviceName) + + if (!serviceConfig) { + throw error(404, { + message: `Service "${serviceName}" is not supported for native triggers.` + }) + } + + return { + serviceName, + serviceConfig, + stuff: { title: `${serviceConfig.serviceDisplayName} triggers` } + } +} diff --git a/frontend/src/routes/(root)/(logged)/workspace_settings/+page.svelte b/frontend/src/routes/(root)/(logged)/workspace_settings/+page.svelte index 206148381099e..14c1467e8de97 100644 --- a/frontend/src/routes/(root)/(logged)/workspace_settings/+page.svelte +++ b/frontend/src/routes/(root)/(logged)/workspace_settings/+page.svelte @@ -128,6 +128,7 @@ | 'windmill_lfs' | 'git_sync' | 'default_app' + | 'native_triggers' | 'encryption') ?? 'users' ) let usingOpenaiClientCredentialsOauth = $state(false) @@ -595,6 +596,14 @@ >
Windmill AI
+ +
Native Triggers
+
{/key} + {:else if tab == 'native_triggers'} + {#if $workspaceStore} + {#await import('$lib/components/workspaceSettings/WorkspaceIntegrations.svelte') then { default: WorkspaceIntegrations }} + + {/await} + {:else} +
+
Loading workspace...
+
+ {/if} {:else if tab == 'encryption'}