Skip to content

[wip] structured & versioned webhook event payloads #8107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ members = [
"nexus/test-utils-macros",
"nexus/test-utils",
"nexus/types",
"nexus/webhooks",
"oximeter/api",
"oximeter/collector",
"oximeter/db",
Expand Down Expand Up @@ -257,6 +258,7 @@ default-members = [
"nexus/test-utils-macros",
"nexus/test-utils",
"nexus/types",
"nexus/webhooks",
"oximeter/api",
"oximeter/collector",
"oximeter/db",
Expand Down Expand Up @@ -534,6 +536,7 @@ nexus-test-interface = { path = "nexus/test-interface" }
nexus-test-utils-macros = { path = "nexus/test-utils-macros" }
nexus-test-utils = { path = "nexus/test-utils" }
nexus-types = { path = "nexus/types" }
nexus-webhooks = { path = "nexus/webhooks" }
nix = { version = "0.29", features = ["net"] }
nom = "7.1.3"
num-integer = "0.1.46"
Expand Down
2 changes: 2 additions & 0 deletions nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ nexus-mgs-updates.workspace = true
nexus-networking.workspace = true
nexus-saga-recovery.workspace = true
nexus-test-interface.workspace = true
nexus-webhooks.workspace = true
num-integer.workspace = true
openssl.workspace = true
oximeter-client.workspace = true
Expand Down Expand Up @@ -151,6 +152,7 @@ nexus-db-queries = { workspace = true, features = ["testing"] }
nexus-client.workspace = true
nexus-test-utils-macros.workspace = true
nexus-test-utils.workspace = true
nexus-webhooks = { workspace = true, features = ["test-events"] }
omicron-sled-agent.workspace = true
omicron-test-utils.workspace = true
openapi-lint.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions nexus/db-model/src/webhook_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use crate::SqlU32;
use crate::WebhookEventClass;
use chrono::{DateTime, Utc};
use db_macros::Asset;
Expand Down Expand Up @@ -39,6 +40,9 @@ pub struct WebhookEvent {
pub event: serde_json::Value,

pub num_dispatched: i64,

/// The version of the JSON schema for `event`.
pub payload_schema_version: SqlU32,
}

impl WebhookEvent {
Expand Down
13 changes: 13 additions & 0 deletions nexus/db-model/src/webhook_event_class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ impl WebhookEventClass {
<Self as strum::VariantArray>::VARIANTS;
}

// Alphabetical ordering
impl Ord for WebhookEventClass {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.as_str().cmp(other.as_str())
}
}

impl PartialOrd for WebhookEventClass {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl fmt::Display for WebhookEventClass {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.as_str())
Expand Down
23 changes: 4 additions & 19 deletions nexus/db-queries/src/db/datastore/webhook_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,6 @@ pub struct DeliveryConfig {
pub lease_timeout: TimeDelta,
}

/// A record from the [`WebhookDelivery`] table along with the event class and
/// data of the corresponding [`WebhookEvent`] record.
#[derive(Debug, Clone)]
pub struct DeliveryAndEvent {
pub delivery: WebhookDelivery,
pub event_class: WebhookEventClass,
pub event: serde_json::Value,
}

impl DataStore {
pub async fn webhook_delivery_create_batch(
&self,
Expand Down Expand Up @@ -203,8 +194,7 @@ impl DataStore {
opctx: &OpContext,
rx_id: &WebhookReceiverUuid,
cfg: &DeliveryConfig,
) -> Result<impl ExactSizeIterator<Item = DeliveryAndEvent> + 'static, Error>
{
) -> Result<Vec<(WebhookDelivery, WebhookEvent)>, Error> {
let conn = self.pool_connection_authorized(opctx).await?;
let now =
diesel::dsl::now.into_sql::<diesel::pg::sql_types::Timestamptz>();
Expand Down Expand Up @@ -249,17 +239,11 @@ impl DataStore {
.inner_join(
event_dsl::webhook_event.on(event_dsl::id.eq(dsl::event_id)),
)
.select((
WebhookDelivery::as_select(),
event_dsl::event_class,
event_dsl::event,
))
.select((WebhookDelivery::as_select(), WebhookEvent::as_select()))
.load_async(&*conn)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
Ok(rows.into_iter().map(|(delivery, event_class, event)| {
DeliveryAndEvent { delivery, event_class, event }
}))
Ok(rows)
}

pub async fn webhook_delivery_start_attempt(
Expand Down Expand Up @@ -503,6 +487,7 @@ mod test {
&opctx,
event_id,
WebhookEventClass::TestFoo,
1,
serde_json::json!({
"answer": 42,
}),
Expand Down
2 changes: 2 additions & 0 deletions nexus/db-queries/src/db/datastore/webhook_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ impl DataStore {
opctx: &OpContext,
id: WebhookEventUuid,
event_class: WebhookEventClass,
payload_schema_version: u32,
event: serde_json::Value,
) -> CreateResult<WebhookEvent> {
let conn = self.pool_connection_authorized(&opctx).await?;
Expand All @@ -34,6 +35,7 @@ impl DataStore {
identity: WebhookEventIdentity::new(id),
time_dispatched: None,
event_class,
payload_schema_version: payload_schema_version.into(),
event,
num_dispatched: 0,
})
Expand Down
8 changes: 7 additions & 1 deletion nexus/db-queries/src/db/datastore/webhook_rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,13 @@ mod test {
) -> (authz::WebhookEvent, crate::db::model::WebhookEvent) {
let id = WebhookEventUuid::new_v4();
datastore
.webhook_event_create(opctx, id, event_class, serde_json::json!({}))
.webhook_event_create(
opctx,
id,
event_class,
1,
serde_json::json!({}),
)
.await
.expect("cant create ye event");
LookupPath::new(opctx, datastore)
Expand Down
1 change: 1 addition & 0 deletions nexus/db-schema/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2266,6 +2266,7 @@ table! {
event -> Jsonb,
time_dispatched -> Nullable<Timestamptz>,
num_dispatched -> Int8,
payload_schema_version -> Int8,
}
}

Expand Down
21 changes: 18 additions & 3 deletions nexus/src/app/background/tasks/webhook_deliverator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ use crate::app::webhook::ReceiverClient;
use futures::future::BoxFuture;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db::DataStore;
use nexus_db_queries::db::datastore::webhook_delivery::DeliveryAndEvent;
use nexus_db_queries::db::datastore::webhook_delivery::DeliveryAttemptState;
pub use nexus_db_queries::db::datastore::webhook_delivery::DeliveryConfig;
use nexus_db_queries::db::model::WebhookDeliveryAttemptResult;
use nexus_db_queries::db::model::WebhookEvent;
use nexus_db_queries::db::model::WebhookReceiverConfig;
use nexus_db_queries::db::pagination::Paginator;
use nexus_types::identity::Resource;
Expand Down Expand Up @@ -230,7 +230,11 @@ impl WebhookDeliverator {
..Default::default()
};

for DeliveryAndEvent { delivery, event_class, event } in deliveries {
for (delivery, event) in deliveries {
let WebhookEvent {
event_class, event, payload_schema_version, ..
} = event;
let event_version = payload_schema_version.into();
let attempt = (*delivery.attempts) + 1;
let delivery_id = WebhookDeliveryUuid::from(delivery.id);
match self
Expand All @@ -248,6 +252,7 @@ impl WebhookDeliverator {
"webhook event delivery attempt started";
"event_id" => %delivery.event_id,
"event_class" => %event_class,
"event_version" => %event_version,
"delivery_id" => %delivery_id,
"attempt" => ?attempt,
);
Expand All @@ -259,6 +264,7 @@ impl WebhookDeliverator {
at {time:?}";
"event_id" => %delivery.event_id,
"event_class" => %event_class,
"event_version" => %event_version,
"delivery_id" => %delivery_id,
"time_completed" => ?time,
);
Expand All @@ -272,6 +278,7 @@ impl WebhookDeliverator {
another Nexus";
"event_id" => %delivery.event_id,
"event_class" => %event_class,
"event_version" => %event_version,
"delivery_id" => %delivery_id,
"nexus_id" => %nexus_id,
"time_started" => ?started,
Expand All @@ -286,6 +293,7 @@ impl WebhookDeliverator {
delivery attempt";
"event_id" => %delivery.event_id,
"event_class" => %event_class,
"event_version" => %event_version,
"delivery_id" => %delivery_id,
"error" => %error,
);
Expand All @@ -298,7 +306,13 @@ impl WebhookDeliverator {

// okay, actually do the thing...
let delivery_attempt = match client
.send_delivery_request(opctx, &delivery, event_class, &event)
.send_delivery_request(
opctx,
&delivery,
event_class,
event_version,
&event,
)
.await
{
Ok(delivery) => delivery,
Expand Down Expand Up @@ -326,6 +340,7 @@ impl WebhookDeliverator {
"{MSG}";
"event_id" => %delivery.event_id,
"event_class" => %event_class,
"event_version" => %event_version,
"delivery_id" => %delivery_id,
"error" => %e,
);
Expand Down
1 change: 1 addition & 0 deletions nexus/src/app/background/tasks/webhook_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ mod test {
&opctx,
event_id,
db::model::WebhookEventClass::TestQuuxBar,
1,
serde_json::json!({"msg": "help im trapped in a webhook event factory"}),
)
.await
Expand Down
6 changes: 5 additions & 1 deletion nexus/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ mod volume;
mod vpc;
mod vpc_router;
mod vpc_subnet;
mod webhook;
pub mod webhook;

// Sagas are not part of the "Nexus" implementation, but they are
// application logic.
Expand Down Expand Up @@ -253,6 +253,9 @@ pub struct Nexus {

/// reports status of pending MGS-managed updates
mgs_update_status_rx: watch::Receiver<MgsUpdateDriverStatus>,

/// Collection of JSON schemas for webhook event classes and versions.
webhook_schemas: webhook::EventSchemaRegistry,
}

impl Nexus {
Expand Down Expand Up @@ -480,6 +483,7 @@ impl Nexus {
)),
tuf_artifact_replication_tx,
mgs_update_status_rx,
webhook_schemas: webhook::event_schemas(),
};

// TODO-cleanup all the extra Arcs here seems wrong
Expand Down
Loading
Loading