diff --git a/.sqlx/query-851d4723bbabf1c8b9a512ce9496d5489d04542bd7eb52a4bb840407a567e342.json b/.sqlx/query-851d4723bbabf1c8b9a512ce9496d5489d04542bd7eb52a4bb840407a567e342.json new file mode 100644 index 000000000..10b427889 --- /dev/null +++ b/.sqlx/query-851d4723bbabf1c8b9a512ce9496d5489d04542bd7eb52a4bb840407a567e342.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE cdn_invalidation_queue\n SET\n created_in_cdn = CURRENT_TIMESTAMP,\n cdn_reference = $1\n WHERE\n cdn_distribution_id = $2 AND created_in_cdn IS NULL", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Varchar", + "Text" + ] + }, + "nullable": [] + }, + "hash": "851d4723bbabf1c8b9a512ce9496d5489d04542bd7eb52a4bb840407a567e342" +} diff --git a/.sqlx/query-c0250bce719751f94bfcfd1904652234950624b4e03f5e7d716f2afa79769929.json b/.sqlx/query-c0250bce719751f94bfcfd1904652234950624b4e03f5e7d716f2afa79769929.json new file mode 100644 index 000000000..bfc2943bc --- /dev/null +++ b/.sqlx/query-c0250bce719751f94bfcfd1904652234950624b4e03f5e7d716f2afa79769929.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT queued\n FROM cdn_invalidation_queue\n WHERE cdn_distribution_id = $1 AND created_in_cdn IS NULL\n FOR UPDATE", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "queued", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "c0250bce719751f94bfcfd1904652234950624b4e03f5e7d716f2afa79769929" +} diff --git a/.sqlx/query-eafa89f156c92fefcdc2195a1308e4837885c57b7522e5766d69993d9e917e51.json b/.sqlx/query-eafa89f156c92fefcdc2195a1308e4837885c57b7522e5766d69993d9e917e51.json new file mode 100644 index 000000000..34349e72d --- /dev/null +++ b/.sqlx/query-eafa89f156c92fefcdc2195a1308e4837885c57b7522e5766d69993d9e917e51.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT\n min(queued)\n FROM cdn_invalidation_queue\n WHERE\n cdn_distribution_id = $1 AND\n created_in_cdn IS NULL", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "min", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "eafa89f156c92fefcdc2195a1308e4837885c57b7522e5766d69993d9e917e51" +} diff --git a/src/cdn.rs b/src/cdn.rs index efa5143c6..25be58cd1 100644 --- a/src/cdn.rs +++ b/src/cdn.rs @@ -298,8 +298,67 @@ impl CdnBackend { } } +/// fully invalidate the CDN distribution, also emptying the queue. +#[instrument(skip(conn))] +pub(crate) async fn full_invalidation( + config: &Config, + cdn: &CdnBackend, + metrics: &InstanceMetrics, + conn: &mut sqlx::PgConnection, + distribution_id: &str, +) -> Result<()> { + let mut transaction = conn.begin().await?; + + let now = Utc::now(); + for row in sqlx::query!( + "SELECT queued + FROM cdn_invalidation_queue + WHERE cdn_distribution_id = $1 AND created_in_cdn IS NULL + FOR UPDATE", + distribution_id, + ) + .fetch_all(&mut *transaction) + .await? + { + if let Ok(duration) = (now - row.queued).to_std() { + // This can only fail when the duration is negative, which can't happen anyways + metrics + .cdn_queue_time + .with_label_values(&[distribution_id]) + .observe(duration_to_seconds(duration)); + } + } + + match cdn + .create_invalidation(distribution_id, &["/*"]) + .await + .context("error creating new invalidation") + { + Ok(invalidation) => { + sqlx::query!( + "UPDATE cdn_invalidation_queue + SET + created_in_cdn = CURRENT_TIMESTAMP, + cdn_reference = $1 + WHERE + cdn_distribution_id = $2 AND created_in_cdn IS NULL", + invalidation.invalidation_id, + distribution_id, + ) + .execute(&mut *transaction) + .await?; + + transaction.commit().await?; + } + Err(err) => return Err(err), + }; + + Ok(()) +} + #[instrument(skip(conn))] pub(crate) async fn handle_queued_invalidation_requests( + config: &Config, cdn: &CdnBackend, metrics: &InstanceMetrics, conn: &mut sqlx::PgConnection, @@ -385,6 +444,24 @@ pub(crate) async fn handle_queued_invalidation_requests( return Ok(()); } + if let Some(min_queued) = sqlx::query_scalar!( + "SELECT + min(queued) + FROM cdn_invalidation_queue + WHERE + cdn_distribution_id = $1 AND + created_in_cdn IS NULL", + distribution_id + ) + .fetch_one(&mut *conn) + .await? + { + if (now - min_queued).to_std().unwrap_or_default() >= config.cdn_max_queued_age { + full_invalidation(config, cdn, metrics, conn, distribution_id).await?; + return Ok(()); + } + } + // create new an invalidation for the queued path patterns let mut transaction = conn.begin().await?; let mut path_patterns: Vec = Vec::new(); @@ -566,6 +643,8 @@ pub(crate) async fn queued_or_active_crate_invalidation_count_by_distribution( #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; use crate::test::async_wrapper; @@ -671,6 +750,111 @@ mod tests { }) } + #[test] + fn escalate_to_full_invalidation() { + crate::test::async_wrapper(|env| async move { + env.override_config(|config| { + config.cloudfront_distribution_id_web = Some("distribution_id_web".into()); + config.cloudfront_distribution_id_static = Some("distribution_id_static".into()); + config.cdn_max_queued_age = Duration::from_secs(0); + }); + + let cdn = env.cdn().await; + let config = env.config(); + let mut conn = env.async_db().await.async_conn().await; + assert!(queued_or_active_crate_invalidations(&mut conn) + .await? + .is_empty()); + + queue_crate_invalidation(&mut conn, &env.config(), "krate").await?; + + // invalidation paths are queued. + assert_eq!( + queued_or_active_crate_invalidations(&mut conn) + .await? + .into_iter() + .map(|i| ( + i.cdn_distribution_id, + i.krate, + i.path_pattern, + i.cdn_reference + )) + .collect::>(), + vec![ + ( + "distribution_id_web".into(), + "krate".into(), + "/krate*".into(), + None + ), + ( + "distribution_id_web".into(), + "krate".into(), + "/crate/krate*".into(), + None + ), + ( + "distribution_id_static".into(), + "krate".into(), + "/rustdoc/krate*".into(), + None + ), + ] + ); + + let counts = + queued_or_active_crate_invalidation_count_by_distribution(&mut conn, &config) + .await?; + assert_eq!(counts.len(), 2); + assert_eq!(*counts.get("distribution_id_web").unwrap(), 2); + assert_eq!(*counts.get("distribution_id_static").unwrap(), 1); + + // queueing the invalidation doesn't create it in the CDN + assert!(active_invalidations(&cdn, "distribution_id_web").is_empty()); + assert!(active_invalidations(&cdn, "distribution_id_static").is_empty()); + + let cdn = env.cdn().await; + let config = env.config(); + + // now handle the queued invalidations + handle_queued_invalidation_requests( + &config, + &cdn, + &env.instance_metrics(), + &mut conn, + "distribution_id_web", + ) + .await?; + handle_queued_invalidation_requests( + &config, + &cdn, + &env.instance_metrics(), + &mut conn, + "distribution_id_static", + ) + .await?; + + // which creates them in the CDN + { + let ir_web = active_invalidations(&cdn, "distribution_id_web"); + assert_eq!(ir_web.len(), 1); + assert_eq!(ir_web[0].path_patterns, vec!["/*"]); + + let ir_static = active_invalidations(&cdn, "distribution_id_static"); + assert_eq!(ir_web.len(), 1); + assert_eq!(ir_static[0].path_patterns, vec!["/*"]); + } + + // the queued entries got a CDN reference attached + assert!(queued_or_active_crate_invalidations(&mut conn) + .await? + .iter() + .all(|i| i.cdn_reference.is_some() && i.created_in_cdn.is_some())); + + Ok(()) + }); + } + #[test] fn invalidate_a_crate() { crate::test::async_wrapper(|env| async move { @@ -734,9 +918,11 @@ mod tests { assert!(active_invalidations(&cdn, "distribution_id_static").is_empty()); let cdn = env.cdn().await; + let config = env.config(); // now handle the queued invalidations handle_queued_invalidation_requests( + &config, &cdn, &env.instance_metrics(), &mut conn, @@ -744,6 +930,7 @@ mod tests { ) .await?; handle_queued_invalidation_requests( + &config, &cdn, &env.instance_metrics(), &mut conn, @@ -774,6 +961,7 @@ mod tests { // now handle again handle_queued_invalidation_requests( + &config, &cdn, &env.instance_metrics(), &mut conn, @@ -781,6 +969,7 @@ mod tests { ) .await?; handle_queued_invalidation_requests( + &config, &cdn, &env.instance_metrics(), &mut conn, @@ -849,6 +1038,7 @@ mod tests { // handle the queued invalidations handle_queued_invalidation_requests( + &env.config(), &*env.cdn().await, &env.instance_metrics(), &mut conn, @@ -909,6 +1099,7 @@ mod tests { // handle the queued invalidations handle_queued_invalidation_requests( + &env.config(), &*env.cdn().await, &env.instance_metrics(), &mut conn, @@ -937,6 +1128,7 @@ mod tests { // now handle again handle_queued_invalidation_requests( + &env.config(), &*env.cdn().await, &env.instance_metrics(), &mut conn, @@ -976,6 +1168,7 @@ mod tests { // run the handler handle_queued_invalidation_requests( + &env.config(), &*env.cdn().await, &env.instance_metrics(), &mut conn, diff --git a/src/config.rs b/src/config.rs index 32bbdbef4..60be0b2cb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -90,6 +90,11 @@ pub struct Config { pub(crate) cdn_backend: CdnKind, + /// The maximum age of a queued invalidation request before it is + /// considered too old and we fall back to a full purge of the + /// distributions. + pub(crate) cdn_max_queued_age: Duration, + // CloudFront distribution ID for the web server. // Will be used for invalidation-requests. pub cloudfront_distribution_id_web: Option, @@ -201,6 +206,7 @@ impl Config { cache_invalidatable_responses: env("DOCSRS_CACHE_INVALIDATEABLE_RESPONSES", true)?, cdn_backend: env("DOCSRS_CDN_BACKEND", CdnKind::Dummy)?, + cdn_max_queued_age: Duration::from_secs(env("DOCSRS_CDN_MAX_QUEUED_AGE", 3600)?), cloudfront_distribution_id_web: maybe_env("CLOUDFRONT_DISTRIBUTION_ID_WEB")?, cloudfront_distribution_id_static: maybe_env("CLOUDFRONT_DISTRIBUTION_ID_STATIC")?, diff --git a/src/utils/daemon.rs b/src/utils/daemon.rs index bcd88e14b..94789245c 100644 --- a/src/utils/daemon.rs +++ b/src/utils/daemon.rs @@ -123,6 +123,7 @@ pub fn start_background_cdn_invalidator(context: &dyn Context) -> Result<(), Err let mut conn = pool.get_async().await?; if let Some(distribution_id) = config.cloudfront_distribution_id_web.as_ref() { cdn::handle_queued_invalidation_requests( + &config, &cdn, &metrics, &mut conn, @@ -133,6 +134,7 @@ pub fn start_background_cdn_invalidator(context: &dyn Context) -> Result<(), Err } if let Some(distribution_id) = config.cloudfront_distribution_id_static.as_ref() { cdn::handle_queued_invalidation_requests( + &config, &cdn, &metrics, &mut conn,