diff --git a/Cargo.lock b/Cargo.lock index 89c5f06306..a6be45d4c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6644,6 +6644,21 @@ dependencies = [ "uuid", ] +[[package]] +name = "nexus-webhooks" +version = "0.1.0" +dependencies = [ + "async-trait", + "nexus-auth", + "nexus-db-model", + "omicron-common", + "omicron-rpaths", + "omicron-uuid-kinds", + "omicron-workspace-hack", + "schemars", + "serde", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -7319,6 +7334,7 @@ dependencies = [ "nexus-test-utils", "nexus-test-utils-macros", "nexus-types", + "nexus-webhooks", "num-integer", "omicron-common", "omicron-passwords", diff --git a/Cargo.toml b/Cargo.toml index abd6f0104c..b75aa15bb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -105,6 +105,7 @@ members = [ "nexus/test-utils-macros", "nexus/test-utils", "nexus/types", + "nexus/webhooks", "oximeter/api", "oximeter/collector", "oximeter/db", @@ -257,6 +258,7 @@ default-members = [ "nexus/test-utils-macros", "nexus/test-utils", "nexus/types", + "nexus/webhooks", "oximeter/api", "oximeter/collector", "oximeter/db", @@ -534,6 +536,7 @@ nexus-test-interface = { path = "nexus/test-interface" } nexus-test-utils-macros = { path = "nexus/test-utils-macros" } nexus-test-utils = { path = "nexus/test-utils" } nexus-types = { path = "nexus/types" } +nexus-webhooks = { path = "nexus/webhooks" } nix = { version = "0.29", features = ["net"] } nom = "7.1.3" num-integer = "0.1.46" diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index e2b4a74876..40bca515d7 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -64,6 +64,7 @@ nexus-mgs-updates.workspace = true nexus-networking.workspace = true nexus-saga-recovery.workspace = true nexus-test-interface.workspace = true +nexus-webhooks.workspace = true num-integer.workspace = true openssl.workspace = true oximeter-client.workspace = true @@ -151,6 +152,7 @@ nexus-db-queries = { workspace = true, features = ["testing"] } nexus-client.workspace = true nexus-test-utils-macros.workspace = true nexus-test-utils.workspace = true +nexus-webhooks = { workspace = true, features = ["test-events"] } omicron-sled-agent.workspace = true omicron-test-utils.workspace = true openapi-lint.workspace = true diff --git a/nexus/db-model/src/webhook_event.rs b/nexus/db-model/src/webhook_event.rs index 9865567c66..9dcb6b294b 100644 --- a/nexus/db-model/src/webhook_event.rs +++ b/nexus/db-model/src/webhook_event.rs @@ -2,6 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +use crate::SqlU32; use crate::WebhookEventClass; use chrono::{DateTime, Utc}; use db_macros::Asset; @@ -39,6 +40,9 @@ pub struct WebhookEvent { pub event: serde_json::Value, pub num_dispatched: i64, + + /// The version of the JSON schema for `event`. + pub payload_schema_version: SqlU32, } impl WebhookEvent { diff --git a/nexus/db-model/src/webhook_event_class.rs b/nexus/db-model/src/webhook_event_class.rs index dc7005ccf7..f2d1e85316 100644 --- a/nexus/db-model/src/webhook_event_class.rs +++ b/nexus/db-model/src/webhook_event_class.rs @@ -84,6 +84,19 @@ impl WebhookEventClass { ::VARIANTS; } +// Alphabetical ordering +impl Ord for WebhookEventClass { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.as_str().cmp(other.as_str()) + } +} + +impl PartialOrd for WebhookEventClass { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + impl fmt::Display for WebhookEventClass { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.as_str()) diff --git a/nexus/db-queries/src/db/datastore/webhook_delivery.rs b/nexus/db-queries/src/db/datastore/webhook_delivery.rs index e4593810e7..0785ed0c08 100644 --- a/nexus/db-queries/src/db/datastore/webhook_delivery.rs +++ b/nexus/db-queries/src/db/datastore/webhook_delivery.rs @@ -52,15 +52,6 @@ pub struct DeliveryConfig { pub lease_timeout: TimeDelta, } -/// A record from the [`WebhookDelivery`] table along with the event class and -/// data of the corresponding [`WebhookEvent`] record. -#[derive(Debug, Clone)] -pub struct DeliveryAndEvent { - pub delivery: WebhookDelivery, - pub event_class: WebhookEventClass, - pub event: serde_json::Value, -} - impl DataStore { pub async fn webhook_delivery_create_batch( &self, @@ -203,8 +194,7 @@ impl DataStore { opctx: &OpContext, rx_id: &WebhookReceiverUuid, cfg: &DeliveryConfig, - ) -> Result + 'static, Error> - { + ) -> Result, Error> { let conn = self.pool_connection_authorized(opctx).await?; let now = diesel::dsl::now.into_sql::(); @@ -249,17 +239,11 @@ impl DataStore { .inner_join( event_dsl::webhook_event.on(event_dsl::id.eq(dsl::event_id)), ) - .select(( - WebhookDelivery::as_select(), - event_dsl::event_class, - event_dsl::event, - )) + .select((WebhookDelivery::as_select(), WebhookEvent::as_select())) .load_async(&*conn) .await .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?; - Ok(rows.into_iter().map(|(delivery, event_class, event)| { - DeliveryAndEvent { delivery, event_class, event } - })) + Ok(rows) } pub async fn webhook_delivery_start_attempt( @@ -503,6 +487,7 @@ mod test { &opctx, event_id, WebhookEventClass::TestFoo, + 1, serde_json::json!({ "answer": 42, }), diff --git a/nexus/db-queries/src/db/datastore/webhook_event.rs b/nexus/db-queries/src/db/datastore/webhook_event.rs index 1cdf6cde7e..ef7d095d8e 100644 --- a/nexus/db-queries/src/db/datastore/webhook_event.rs +++ b/nexus/db-queries/src/db/datastore/webhook_event.rs @@ -26,6 +26,7 @@ impl DataStore { opctx: &OpContext, id: WebhookEventUuid, event_class: WebhookEventClass, + payload_schema_version: u32, event: serde_json::Value, ) -> CreateResult { let conn = self.pool_connection_authorized(&opctx).await?; @@ -34,6 +35,7 @@ impl DataStore { identity: WebhookEventIdentity::new(id), time_dispatched: None, event_class, + payload_schema_version: payload_schema_version.into(), event, num_dispatched: 0, }) diff --git a/nexus/db-queries/src/db/datastore/webhook_rx.rs b/nexus/db-queries/src/db/datastore/webhook_rx.rs index 6f0fae5c82..621066571c 100644 --- a/nexus/db-queries/src/db/datastore/webhook_rx.rs +++ b/nexus/db-queries/src/db/datastore/webhook_rx.rs @@ -1212,7 +1212,13 @@ mod test { ) -> (authz::WebhookEvent, crate::db::model::WebhookEvent) { let id = WebhookEventUuid::new_v4(); datastore - .webhook_event_create(opctx, id, event_class, serde_json::json!({})) + .webhook_event_create( + opctx, + id, + event_class, + 1, + serde_json::json!({}), + ) .await .expect("cant create ye event"); LookupPath::new(opctx, datastore) diff --git a/nexus/db-schema/src/schema.rs b/nexus/db-schema/src/schema.rs index ba699b579a..e0b5a045ae 100644 --- a/nexus/db-schema/src/schema.rs +++ b/nexus/db-schema/src/schema.rs @@ -2266,6 +2266,7 @@ table! { event -> Jsonb, time_dispatched -> Nullable, num_dispatched -> Int8, + payload_schema_version -> Int8, } } diff --git a/nexus/src/app/background/tasks/webhook_deliverator.rs b/nexus/src/app/background/tasks/webhook_deliverator.rs index 41f202a5d7..ddaa0a3686 100644 --- a/nexus/src/app/background/tasks/webhook_deliverator.rs +++ b/nexus/src/app/background/tasks/webhook_deliverator.rs @@ -35,10 +35,10 @@ use crate::app::webhook::ReceiverClient; use futures::future::BoxFuture; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::DataStore; -use nexus_db_queries::db::datastore::webhook_delivery::DeliveryAndEvent; use nexus_db_queries::db::datastore::webhook_delivery::DeliveryAttemptState; pub use nexus_db_queries::db::datastore::webhook_delivery::DeliveryConfig; use nexus_db_queries::db::model::WebhookDeliveryAttemptResult; +use nexus_db_queries::db::model::WebhookEvent; use nexus_db_queries::db::model::WebhookReceiverConfig; use nexus_db_queries::db::pagination::Paginator; use nexus_types::identity::Resource; @@ -230,7 +230,11 @@ impl WebhookDeliverator { ..Default::default() }; - for DeliveryAndEvent { delivery, event_class, event } in deliveries { + for (delivery, event) in deliveries { + let WebhookEvent { + event_class, event, payload_schema_version, .. + } = event; + let event_version = payload_schema_version.into(); let attempt = (*delivery.attempts) + 1; let delivery_id = WebhookDeliveryUuid::from(delivery.id); match self @@ -248,6 +252,7 @@ impl WebhookDeliverator { "webhook event delivery attempt started"; "event_id" => %delivery.event_id, "event_class" => %event_class, + "event_version" => %event_version, "delivery_id" => %delivery_id, "attempt" => ?attempt, ); @@ -259,6 +264,7 @@ impl WebhookDeliverator { at {time:?}"; "event_id" => %delivery.event_id, "event_class" => %event_class, + "event_version" => %event_version, "delivery_id" => %delivery_id, "time_completed" => ?time, ); @@ -272,6 +278,7 @@ impl WebhookDeliverator { another Nexus"; "event_id" => %delivery.event_id, "event_class" => %event_class, + "event_version" => %event_version, "delivery_id" => %delivery_id, "nexus_id" => %nexus_id, "time_started" => ?started, @@ -286,6 +293,7 @@ impl WebhookDeliverator { delivery attempt"; "event_id" => %delivery.event_id, "event_class" => %event_class, + "event_version" => %event_version, "delivery_id" => %delivery_id, "error" => %error, ); @@ -298,7 +306,13 @@ impl WebhookDeliverator { // okay, actually do the thing... let delivery_attempt = match client - .send_delivery_request(opctx, &delivery, event_class, &event) + .send_delivery_request( + opctx, + &delivery, + event_class, + event_version, + &event, + ) .await { Ok(delivery) => delivery, @@ -326,6 +340,7 @@ impl WebhookDeliverator { "{MSG}"; "event_id" => %delivery.event_id, "event_class" => %event_class, + "event_version" => %event_version, "delivery_id" => %delivery_id, "error" => %e, ); diff --git a/nexus/src/app/background/tasks/webhook_dispatcher.rs b/nexus/src/app/background/tasks/webhook_dispatcher.rs index 17f9f0d570..c103be5489 100644 --- a/nexus/src/app/background/tasks/webhook_dispatcher.rs +++ b/nexus/src/app/background/tasks/webhook_dispatcher.rs @@ -454,6 +454,7 @@ mod test { &opctx, event_id, db::model::WebhookEventClass::TestQuuxBar, + 1, serde_json::json!({"msg": "help im trapped in a webhook event factory"}), ) .await diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index 1291f369c7..2f64c4e49d 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -99,7 +99,7 @@ mod volume; mod vpc; mod vpc_router; mod vpc_subnet; -mod webhook; +pub mod webhook; // Sagas are not part of the "Nexus" implementation, but they are // application logic. @@ -253,6 +253,9 @@ pub struct Nexus { /// reports status of pending MGS-managed updates mgs_update_status_rx: watch::Receiver, + + /// Collection of JSON schemas for webhook event classes and versions. + webhook_schemas: webhook::EventSchemaRegistry, } impl Nexus { @@ -480,6 +483,7 @@ impl Nexus { )), tuf_artifact_replication_tx, mgs_update_status_rx, + webhook_schemas: webhook::event_schemas(), }; // TODO-cleanup all the extra Arcs here seems wrong diff --git a/nexus/src/app/webhook.rs b/nexus/src/app/webhook.rs index 0b98ff7558..81984ea6f4 100644 --- a/nexus/src/app/webhook.rs +++ b/nexus/src/app/webhook.rs @@ -180,9 +180,37 @@ use std::time::Duration; use std::time::Instant; use uuid::Uuid; +pub use nexus_webhooks::{Event, EventSchemaRegistry}; + +impl nexus_webhooks::PublishEvent for Nexus { + async fn publish_event( + &self, + opctx: &OpContext, + id: WebhookEventUuid, + event: E, + ) -> Result { + self.webhook_event_publish(opctx, id, event).await + } +} + +pub(crate) fn event_schemas() -> EventSchemaRegistry { + let mut registry = EventSchemaRegistry::new(); + + #[cfg(debug_assertions)] + nexus_webhooks::events::test::register_all(&mut registry); + + // WHEN ADDING NEW WEBHOOK EVENT CLASSES OR NEW SCHEMA VERSIONS, REMEMBER TO + // REGISTER THEM HERE! + + registry +} + impl Nexus { - /// Publish a new webhook event, with the provided `id`, `event_class`, and - /// JSON data payload. + /// Publish a new webhook event. + /// + /// The event payload is represented by a type that implements the [`Event`] + /// trait defined in this module. Publishing the event converts it to a JSON + /// object that's stored in the database. /// /// If this method returns `Ok`, the event has been durably recorded in /// CockroachDB. Once the new event record is inserted into the database, @@ -190,22 +218,63 @@ impl Nexus { /// event to receivers. However, if (for whatever reason) this Nexus fails /// to do that, the event remains durably in the database to be dispatched /// and delivered by someone else. - pub async fn webhook_event_publish( + pub async fn webhook_event_publish( &self, opctx: &OpContext, id: WebhookEventUuid, - event_class: WebhookEventClass, - event: serde_json::Value, + event: E, ) -> Result { + #[cfg(debug_assertions)] + { + // In test builds, assert that this is a schema that we know about. + let versions = match self + .webhook_schemas + .schema_versions_for(E::CLASS) + { + Some(versions) => versions, + None => panic!( + "You have attempted to publish a webhook event type whose \ + event class was not added to the webhook event schema \ + registry in `nexus::app::webhook::event_schemas()`! This \ + means that the event type's schema will not be returned \ + by the /v1/webhooks/event-classes endpoint. This is \ + probably a mistake. Since I am a test build, I will now \ + panic!\n event class: {}", + E::CLASS, + ), + }; + + if !versions.contains_key(&E::VERSION) { + panic!( + "You have attempted to publish a webhook event type whose \ + schema version is not present in the webhook event schema \ + registry in `nexus::app::webhook::event_schemas()`! This \ + is probably a mistake. Since I am a test build, I will \ + now panic!\n event class: {}\n schema version: {}", + E::CLASS, + E::VERSION, + ); + } + } + + let json = + serde_json::to_value(event).map_err(|e| Error::InternalError { + internal_message: format!( + "failed to convert {} (class: {}) to JSON: {e}", + std::any::type_name::(), + E::CLASS + ), + })?; let event = self .datastore() - .webhook_event_create(opctx, id, event_class, event) + .webhook_event_create(opctx, id, E::CLASS, E::VERSION, json) .await?; slog::debug!( &opctx.log, "enqueued webhook event"; "event_id" => ?id, - "event_class" => %event.event_class, + "event_class" => %E::CLASS, + "event_version" => E::VERSION, "time_created" => ?event.identity.time_created, ); @@ -528,7 +597,7 @@ impl Nexus { LazyLock::new(|| serde_json::json!({})); let attempt = match client - .send_delivery_request(opctx, &delivery, CLASS, &DATA) + .send_delivery_request(opctx, &delivery, CLASS, 1, &DATA) .await { Ok(attempt) => attempt, @@ -809,6 +878,7 @@ impl<'a> ReceiverClient<'a> { opctx: &OpContext, delivery: &WebhookDelivery, event_class: WebhookEventClass, + event_version: u32, data: &serde_json::Value, ) -> Result { const HDR_DELIVERY_ID: HeaderName = @@ -819,6 +889,8 @@ impl<'a> ReceiverClient<'a> { HeaderName::from_static("x-oxide-event-id"); const HDR_EVENT_CLASS: HeaderName = HeaderName::from_static("x-oxide-event-class"); + const HDR_EVENT_VERSION: HeaderName = + HeaderName::from_static("x-oxide-event-version"); const HDR_SIG: HeaderName = HeaderName::from_static("x-oxide-signature"); @@ -826,6 +898,7 @@ impl<'a> ReceiverClient<'a> { struct Payload<'a> { event_class: WebhookEventClass, event_id: WebhookEventUuid, + event_version: u32, data: &'a serde_json::Value, delivery: DeliveryMetadata<'a>, } @@ -843,6 +916,7 @@ impl<'a> ReceiverClient<'a> { let sent_at = time_attempted.to_rfc3339(); let payload = Payload { event_class, + event_version, event_id: delivery.event_id.into(), data, delivery: DeliveryMetadata { @@ -892,6 +966,7 @@ impl<'a> ReceiverClient<'a> { .header(HDR_DELIVERY_ID, delivery.id.to_string()) .header(HDR_EVENT_ID, delivery.event_id.to_string()) .header(HDR_EVENT_CLASS, event_class.to_string()) + .header(HDR_EVENT_VERSION, event_version.to_string()) .header(http::header::CONTENT_TYPE, "application/json"); // For each secret assigned to this webhook, calculate the HMAC and add a signature header. diff --git a/nexus/tests/integration_tests/webhooks.rs b/nexus/tests/integration_tests/webhooks.rs index 7b4a559954..0591b95ad6 100644 --- a/nexus/tests/integration_tests/webhooks.rs +++ b/nexus/tests/integration_tests/webhooks.rs @@ -7,7 +7,6 @@ use dropshot::test_util::ClientTestContext; use hmac::{Hmac, Mac}; use httpmock::prelude::*; -use nexus_db_model::WebhookEventClass; use nexus_db_queries::context::OpContext; use nexus_test_utils::background::activate_background_task; use nexus_test_utils::http_testing::AuthnMode; @@ -26,6 +25,8 @@ use sha2::Sha256; use std::time::Duration; use uuid::Uuid; +use nexus_webhooks::events::test as test_events; + type ControlPlaneTestContext = nexus_test_utils::ControlPlaneTestContext; @@ -536,8 +537,7 @@ async fn test_event_delivery(cptestctx: &ControlPlaneTestContext) { .webhook_event_publish( &opctx, id, - WebhookEventClass::TestFoo, - serde_json::json!({"hello_world": true}), + test_events::Foo { hello_world: true }, ) .await .expect("event should be published successfully"); @@ -658,8 +658,7 @@ async fn test_multiple_secrets(cptestctx: &ControlPlaneTestContext) { .webhook_event_publish( &opctx, id, - WebhookEventClass::TestFoo, - serde_json::json!({"hello_world": true}), + test_events::Foo { hello_world: true }, ) .await .expect("event should be published successfully"); @@ -819,8 +818,7 @@ async fn test_multiple_receivers(cptestctx: &ControlPlaneTestContext) { .webhook_event_publish( &opctx, bar_event_id, - WebhookEventClass::TestFooBar, - serde_json::json!({"lol": "webhooked on phonics"}), + test_events::FooBar { hello: "emeryville" }, ) .await .expect("event should be published successfully"); @@ -830,8 +828,7 @@ async fn test_multiple_receivers(cptestctx: &ControlPlaneTestContext) { .webhook_event_publish( &opctx, baz_event_id, - WebhookEventClass::TestFooBaz, - serde_json::json!({"lol": "webhook, line, and sinker"}), + test_events::FooBaz { hello: "san francisco" }, ) .await .expect("event should be published successfully"); @@ -901,8 +898,7 @@ async fn test_retry_backoff(cptestctx: &ControlPlaneTestContext) { .webhook_event_publish( &opctx, id, - WebhookEventClass::TestFoo, - serde_json::json!({"hello_world": true}), + test_events::Foo { hello_world: true }, ) .await .expect("event should be published successfully"); @@ -1295,8 +1291,7 @@ async fn test_probe_resends_failed_deliveries( .webhook_event_publish( &opctx, event1_id, - WebhookEventClass::TestFoo, - serde_json::json!({"hello": "world"}), + test_events::Foo { hello_world: true } ) .await .expect("event1 should be published successfully") @@ -1306,8 +1301,7 @@ async fn test_probe_resends_failed_deliveries( .webhook_event_publish( &opctx, event2_id, - WebhookEventClass::TestFoo, - serde_json::json!({"hello": "emeryville"}), + test_events::Foo { hello_world: false } ) .await .expect("event2 should be published successfully") @@ -1451,8 +1445,7 @@ async fn test_api_resends_failed_deliveries( .webhook_event_publish( &opctx, event1_id, - WebhookEventClass::TestFoo, - serde_json::json!({"hello_world": true}), + test_events::Foo { hello_world: true }, ) .await .expect("event should be published successfully"); @@ -1463,8 +1456,7 @@ async fn test_api_resends_failed_deliveries( .webhook_event_publish( &opctx, event2_id, - WebhookEventClass::TestQuuxBar, - serde_json::json!({"hello_world": true}), + test_events::QuuxBar { a: true, b: 42 }, ) .await .expect("event should be published successfully"); @@ -1567,7 +1559,7 @@ async fn subscription_add_test( "event_class": "test.foo.bar", "event_id": id2, "data": { - "hello_world": true, + "hello": "emeryville", } }) .to_string(); @@ -1591,8 +1583,7 @@ async fn subscription_add_test( .webhook_event_publish( &opctx, id1, - WebhookEventClass::TestFooBar, - serde_json::json!({"hello_world": false}), + test_events::FooBar { hello: "san francisco" }, ) .await .expect("event should be published successfully"); @@ -1624,8 +1615,7 @@ async fn subscription_add_test( .webhook_event_publish( &opctx, id2, - WebhookEventClass::TestFooBar, - serde_json::json!({"hello_world": true}), + test_events::FooBar { hello: "emeryville" }, ) .await .expect("event should be published successfully"); @@ -1699,7 +1689,7 @@ async fn subscription_remove_test( "event_class": "test.foo.bar", "event_id": id1, "data": { - "hello_world": true, + "hello": "emeryville", } }) .to_string(); @@ -1723,8 +1713,7 @@ async fn subscription_remove_test( .webhook_event_publish( &opctx, id1, - WebhookEventClass::TestFooBar, - serde_json::json!({"hello_world": true}), + test_events::FooBar { hello: "emeryville" }, ) .await .expect("event should be published successfully"); @@ -1755,8 +1744,7 @@ async fn subscription_remove_test( .webhook_event_publish( &opctx, id2, - WebhookEventClass::TestFooBar, - serde_json::json!({"hello_world": false}), + test_events::FooBar { hello: "san francisco" }, ) .await .expect("event should be published successfully"); @@ -1780,7 +1768,7 @@ async fn subscription_remove_test( "event_class": "test.foo", "event_id": id3, "data": { - "whatever": 1 + "hello_world": true, } }) .to_string(); @@ -1802,8 +1790,7 @@ async fn subscription_remove_test( .webhook_event_publish( &opctx, id3, - WebhookEventClass::TestFoo, - serde_json::json!({"whatever": 1}), + test_events::Foo { hello_world: true }, ) .await .expect("event should be published successfully"); diff --git a/nexus/webhooks/Cargo.toml b/nexus/webhooks/Cargo.toml new file mode 100644 index 0000000000..3ba2e2692f --- /dev/null +++ b/nexus/webhooks/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "nexus-webhooks" +version = "0.1.0" +edition = "2024" + +[lints] +workspace = true + +[build-dependencies] +omicron-rpaths.workspace = true + +[dependencies] +async-trait.workspace = true +nexus-db-model.workspace = true +omicron-common.workspace = true +omicron-uuid-kinds.workspace = true +nexus-auth.workspace = true +schemars.workspace = true +serde.workspace = true + +omicron-workspace-hack.workspace = true + +[features] +test-events = [] diff --git a/nexus/webhooks/src/events.rs b/nexus/webhooks/src/events.rs new file mode 100644 index 0000000000..0d449b6fdb --- /dev/null +++ b/nexus/webhooks/src/events.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "test-events")] +pub mod test; diff --git a/nexus/webhooks/src/events/test.rs b/nexus/webhooks/src/events/test.rs new file mode 100644 index 0000000000..5fd310801c --- /dev/null +++ b/nexus/webhooks/src/events/test.rs @@ -0,0 +1,54 @@ +use crate::{Event, EventClass}; +use schemars::JsonSchema; +use serde::Serialize; + +pub fn register_all(registry: &mut crate::EventSchemaRegistry) { + registry.register::(); + registry.register::(); + registry.register::(); + registry.register::(); +} + +// +// Define test event class types +// +#[derive(Debug, Serialize, JsonSchema)] +pub struct Foo { + pub hello_world: bool, +} + +impl Event for Foo { + const CLASS: EventClass = EventClass::TestFoo; + const VERSION: u32 = 1; +} + +#[derive(Debug, Serialize, JsonSchema)] +pub struct FooBar { + pub hello: &'static str, +} + +impl Event for FooBar { + const CLASS: EventClass = EventClass::TestFooBar; + const VERSION: u32 = 1; +} + +#[derive(Debug, Serialize, JsonSchema)] +pub struct FooBaz { + pub hello: &'static str, +} + +impl Event for FooBaz { + const CLASS: EventClass = EventClass::TestFooBaz; + const VERSION: u32 = 1; +} + +#[derive(Debug, Serialize, JsonSchema)] +pub struct QuuxBar { + pub a: bool, + pub b: usize, +} + +impl Event for QuuxBar { + const CLASS: EventClass = EventClass::TestQuuxBar; + const VERSION: u32 = 1; +} diff --git a/nexus/webhooks/src/lib.rs b/nexus/webhooks/src/lib.rs new file mode 100644 index 0000000000..aae001ef62 --- /dev/null +++ b/nexus/webhooks/src/lib.rs @@ -0,0 +1,85 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +pub use nexus_db_model::WebhookEventClass as EventClass; +pub use omicron_uuid_kinds::WebhookEventUuid as EventUuid; + +use nexus_auth::context::OpContext; +use omicron_common::api::external::Error; +use schemars::{JsonSchema, schema::Schema}; +use serde::Serialize; +use std::collections::BTreeMap; + +pub mod events; + +/// Trait implemented by types that represent the payload of a webhook event. +pub trait Event: Serialize + JsonSchema { + /// The event's event class. + const CLASS: EventClass; + /// The version number of the event's payload. + const VERSION: u32; +} + +/// The interface for publishing webhook events. +/// +/// This is represented as a separate trait, so that code in crates downstream +/// of `omicron-nexus` can publish events using the `Nexus` struct's +/// `webhook_event_publish` method, without depending on `nexus` itself. This +/// allows code in crates that are dependencies of `omicron-nexus` to publish +/// events. + +#[allow(async_fn_in_trait)] +pub trait PublishEvent { + async fn publish_event( + &self, + opctx: &OpContext, + id: EventUuid, + event: E, + ) -> Result; +} + +#[derive(Default)] +pub struct EventSchemaRegistry { + schemas: BTreeMap>, +} + +impl EventSchemaRegistry { + pub fn register(&mut self) { + let class = E::CLASS; + let version = E::VERSION; + let mut schema_gen = schemars::SchemaGenerator::new(Default::default()); + let schema = E::json_schema(&mut schema_gen); + + if self + .schemas + .entry(class) + .or_insert_with(BTreeMap::new) + .insert(version, schema) + .is_some() + { + panic!( + "Attempted to register two event class schemas for {class} v{version}!" + ); + } + } + + pub fn new() -> Self { + Self::default() + } + + pub fn schema_versions_for( + &self, + class: EventClass, + ) -> Option<&BTreeMap> { + self.schemas.get(&class) + } + + pub fn schema_for( + &self, + class: EventClass, + version: u32, + ) -> Option<&Schema> { + self.schema_versions_for(class)?.get(&version) + } +} diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index d0b0e28004..5c2e202a67 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -5284,6 +5284,9 @@ CREATE TABLE IF NOT EXISTS omicron.public.webhook_event ( -- The number of receivers that this event was dispatched to. num_dispatched INT8 NOT NULL, + -- The version number of the JSON schema for this event class. + payload_schema_version INT8 NOT NULL, + CONSTRAINT time_dispatched_set_if_dispatched CHECK ( (num_dispatched = 0) OR (time_dispatched IS NOT NULL) ), @@ -5301,7 +5304,8 @@ INSERT INTO omicron.public.webhook_event ( event_class, event, time_dispatched, - num_dispatched + num_dispatched, + payload_schema_version ) VALUES ( -- NOTE: this UUID is duplicated in nexus_db_model::webhook_event. '001de000-7768-4000-8000-000000000001', @@ -5312,7 +5316,8 @@ INSERT INTO omicron.public.webhook_event ( -- Pretend to be dispatched so we won't show up in "list events needing -- dispatch" queries NOW(), - 0 + 0, + 1 ) ON CONFLICT DO NOTHING; -- Look up webhook events in need of dispatching. diff --git a/schema/crdb/webhook-event-payload-schema-version/up01.sql b/schema/crdb/webhook-event-payload-schema-version/up01.sql new file mode 100644 index 0000000000..00ca8c8694 --- /dev/null +++ b/schema/crdb/webhook-event-payload-schema-version/up01.sql @@ -0,0 +1,2 @@ +ALTER TABLE omicron.public.webhook_event +ADD COLUMN IF NOT EXISTS payload_schema_version INT8 NOT NULL DEFAULT 1; diff --git a/schema/crdb/webhook-event-payload-schema-version/up02.sql b/schema/crdb/webhook-event-payload-schema-version/up02.sql new file mode 100644 index 0000000000..1a37761b87 --- /dev/null +++ b/schema/crdb/webhook-event-payload-schema-version/up02.sql @@ -0,0 +1,3 @@ +ALTER TABLE omicron.public.webhook_event +ALTER COLUMN payload_schema_version +DROP DEFAULT;