Commit f0cd9c2

[nexus] add instance-updater lock
In order to simplify the instance lifecycle state machine and ensure that instance state updates are processed reliably, we intend to perform instance state updates in a saga (which will be added in PR #5749). This saga will require a notion of mutual exclusion between update sagas for the same instance, in order to avoid race conditions like the following:

1. Sagas `S1` and `S2` start at the same time and observe the same instance/VMM states, which indicate that the instance's active VMM has shut down.
2. `S1` clears all the resources/provisioning counters and marks the instance as `Stopped`.
3. A user restarts the instance.
4. `S2` clears the same instance provisioning counters again.

Presently, these races are avoided by the fact that instance state updates are performed partially in `sled-agent`, which serves as an "actor" with exclusive ownership over the state transition. Moving these state transitions to Nexus requires introducing mutual exclusion.

This commit adds a distributed lock on instance state transitions to the datastore. We add the following fields to the `instance` table:

- `updater_id`: the UUID of the saga currently holding the update lock on the instance (or `NULL` if no saga has locked the instance)
- `updater_gen`: a generation counter that is incremented each time the lock is acquired by a new saga

Using these fields, we can add new datastore methods that try to acquire an instance update lock by doing the following:

1. Generate a UUID for the saga, `saga_lock_id`. This will be performed in the saga itself and isn't part of this PR.
2. Read the instance record and interpret the value of the `updater_id` field as follows:
   - `NULL`: the lock is not held; we can acquire it by incrementing the `updater_gen` field and setting the `updater_id` field to the saga's UUID.
   - `updater_id == saga_id`: this saga already holds the lock, and we can proceed with the update.
   - `updater_id != saga_id`: another saga holds the lock, and we can't proceed with the update. Fail the operation.
3. Attempt to write back the updated instance record with the generation incremented and the `updater_id` set to the saga's ID, conditional on the `updater_gen` field being equal to the generation that was read in step (2). This is equivalent to the atomic compare-and-swap operation that one might use to implement a non-distributed lock in a single address space.
   - If this fails because the generation number is outdated, try again (i.e. goto (2)).
   - If this succeeds, the lock was acquired successfully.

Additionally, we can add a method for unlocking an instance record by clearing the `updater_id` field and incrementing the `updater_gen`. This query is conditional on the `updater_id` field being equal to the saga's UUID, to prevent cases where we accidentally unlock an instance that was locked by a different saga.

Introducing this distributed lock is considered fairly safe, as it will only ever be acquired in a saga, and the reverse action for the saga action that acquires the lock will ensure that the lock is released if the saga unwinds. Essentially, this is equivalent to a RAII guard releasing a lock when a thread panics in a single-threaded Rust program.

Presently, none of these methods are actually used. The saga that uses them will be added in PR #5749. I've factored out this change into its own PR so that we can merge the foundation needed for that branch. Hopefully this makes the diff a bit smaller and easier to review, as well as decreasing merge conflict churn with the schema changes.
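The compare-and-swap protocol in steps (2) and (3) can be sketched against an in-memory record. This is a simplified model of the semantics only, not the actual Diesel query: `InstanceRecord`, `try_lock`, `unlock`, and the `u64` stand-in for the saga's UUID are all hypothetical names for illustration.

```rust
// Simplified in-memory model of the updater lock. The real implementation
// performs the compare-and-swap as a conditional UPDATE in CockroachDB;
// here the same logic runs against a plain struct.
type SagaId = u64; // stand-in for the saga's UUID

#[derive(Debug, Default)]
struct InstanceRecord {
    updater_id: Option<SagaId>,
    updater_gen: u64,
}

impl InstanceRecord {
    /// Step (3): lock only if the caller's snapshot of the generation is
    /// still current and no saga currently holds the lock. Returns whether
    /// the lock was acquired.
    fn try_lock(&mut self, current_gen: u64, saga_id: SagaId) -> bool {
        if self.updater_gen == current_gen && self.updater_id.is_none() {
            self.updater_gen += 1;
            self.updater_id = Some(saga_id);
            true
        } else {
            false
        }
    }

    /// Unlock: only the saga that holds the lock may release it; the
    /// generation is bumped so stale lockers' snapshots are invalidated.
    fn unlock(&mut self, saga_id: SagaId) -> bool {
        if self.updater_id == Some(saga_id) {
            self.updater_gen += 1;
            self.updater_id = None;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut rec = InstanceRecord::default();
    let snapshot = rec.updater_gen;
    assert!(rec.try_lock(snapshot, 1)); // saga 1 acquires the lock
    assert!(!rec.try_lock(snapshot, 2)); // saga 2's snapshot is stale: retry or fail
    assert!(!rec.unlock(2)); // saga 2 cannot release saga 1's lock
    assert!(rec.unlock(1)); // saga 1 releases
    assert!(rec.try_lock(rec.updater_gen, 2)); // now saga 2 can lock
    println!("ok");
}
```

Note that the generation bump on *both* lock and unlock is what makes the write-back in step (3) safe: any saga whose read predates the last transition sees its conditional update fail and must re-read.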
1 parent 1b66988 commit f0cd9c2

File tree

1 file changed (+365 −0)


nexus/db-queries/src/db/datastore/instance.rs

Lines changed: 365 additions & 0 deletions
@@ -718,4 +718,369 @@ impl DataStore {

        Ok(())
    }

    /// Attempts to lock an instance's record to apply state updates in an
    /// instance-update saga, returning the state of the instance when the
    /// lock was acquired.
    ///
    /// # Notes
    ///
    /// This method MUST only be called from the context of a saga! The
    /// calling saga must ensure that the reverse action for the action that
    /// acquires the lock calls [`DataStore::instance_updater_unlock`], so
    /// that the lock is always released if the saga unwinds.
    ///
    /// # Arguments
    ///
    /// - `authz_instance`: the instance to attempt to lock
    /// - `saga_lock_id`: the UUID of the saga that's attempting to lock this
    ///   instance.
    ///
    /// # Returns
    ///
    /// - [`Ok`]`(`[`Instance`]`)` if the lock was acquired.
    /// - [`Err`]`(`[`Error::InternalError`]`)` if the instance was locked by
    ///   another saga.
    /// - [`Err`]`(`[`Error::ObjectNotFound`]`)` if no instance with the
    ///   provided ID exists (or the instance record has been deleted).
    /// - [`Err`]`(other)` if the database query failed for another reason.
    pub async fn instance_updater_lock(
        &self,
        opctx: &OpContext,
        authz_instance: &authz::Instance,
        saga_lock_id: &Uuid,
    ) -> Result<Instance, Error> {
        let mut instance = self.instance_refetch(opctx, authz_instance).await?;
        // `true` if the instance was locked by *this* call to
        // `instance_updater_lock`, `false` in the (rare) case that it was
        // previously locked by *this* saga's ID. This is used only for
        // logging, as this method is idempotent --- if the instance's current
        // updater ID matches the provided saga ID, this method completes
        // successfully.
        //
        // XXX(eliza): I *think* this is the right behavior for sagas, since
        // saga actions are expected to be idempotent...but it also means that
        // a UUID collision would allow two sagas to lock the instance.
        // But...(1) a UUID collision is extremely unlikely, and (2), if a
        // UUID collision *did* occur, the odds are even lower that the same
        // UUID would be assigned to two instance-update sagas which both try
        // to update the *same* instance at the same time. So, idempotency is
        // probably more important than handling that extremely unlikely edge
        // case.
        let mut did_lock = false;
        loop {
            match instance.runtime_state.updater_id {
                // If the `updater_id` field is not null and the ID equals
                // this saga's ID, we already have the lock. We're done here!
                Some(lock_id) if lock_id == *saga_lock_id => {
                    slog::info!(
                        &opctx.log,
                        "instance updater lock acquired!";
                        "instance_id" => %instance.id(),
                        "saga_id" => %saga_lock_id,
                        "already_locked" => !did_lock,
                    );
                    return Ok(instance);
                }
                // The `updater_id` field is set, but it's not our ID. The
                // instance is locked by a different saga, so give up.
                Some(lock_id) => {
                    slog::info!(
                        &opctx.log,
                        "instance is locked by another saga";
                        "instance_id" => %instance.id(),
                        "locked_by" => %lock_id,
                        "saga_id" => %saga_lock_id,
                    );
                    return Err(Error::internal_error(
                        "instance is already locked by another saga",
                    ));
                }
                // No saga's ID is set as the instance's `updater_id`. We can
                // attempt to lock it.
                None => {}
            }
            let gen = instance.runtime_state.updater_gen;
            slog::debug!(
                &opctx.log,
                "attempting to acquire instance updater lock";
                "instance_id" => %instance.id(),
                "saga_id" => %saga_lock_id,
                "current_gen" => ?gen,
            );
            (instance, did_lock) = self
                .instance_updater_try_lock(
                    opctx,
                    authz_instance,
                    gen,
                    saga_lock_id,
                )
                .await?;
        }
    }

    /// Attempts to lock an instance's record to apply state updates in an
    /// instance-update saga, if the provided `current_gen` matches the
    /// instance's current `updater_gen`.
    ///
    /// This function will attempt to set the `updater_id` field on the record
    /// corresponding to the provided `authz_instance` to the provided
    /// `saga_lock_id`. If the instance's `updater_gen` field is equal to
    /// `current_gen`, and the instance's `updater_id` field is null, then the
    /// generation is advanced and the `updater_id` field is set to
    /// `saga_lock_id`, acquiring the lock for the calling saga. Otherwise, if
    /// the generation has advanced since `current_gen` was captured, or if
    /// another saga has locked the instance, the lock is not acquired.
    ///
    /// # Notes
    ///
    /// This method MUST only be called from the context of a saga! The
    /// calling saga must ensure that the reverse action for the action that
    /// acquires the lock calls [`DataStore::instance_updater_unlock`], so
    /// that the lock is always released if the saga unwinds.
    ///
    /// # Arguments
    ///
    /// - `authz_instance`: the instance to attempt to lock
    /// - `current_gen`: the last observed value of the instance's
    ///   `updater_gen` field
    /// - `saga_lock_id`: the UUID of the saga that's attempting to lock this
    ///   instance.
    ///
    /// # Returns
    ///
    /// - [`Ok`]`((`[`Instance`]`, true))` if the lock was acquired.
    /// - [`Ok`]`((`[`Instance`]`, false))` if the lock was not acquired
    ///   because the instance was already locked by another saga, or because
    ///   the generation has advanced since `current_gen` was captured.
    /// - [`Err`]`(`[`Error::ObjectNotFound`]`)` if no instance with the
    ///   provided ID exists (or the instance record has been deleted).
    /// - [`Err`]`(other)` if the database query failed for another reason.
    pub async fn instance_updater_try_lock(
        &self,
        opctx: &OpContext,
        authz_instance: &authz::Instance,
        current_gen: Generation,
        saga_lock_id: &Uuid,
    ) -> Result<(Instance, bool), Error> {
        use db::schema::instance::dsl;

        // The generation to advance to.
        let new_gen = Generation(current_gen.0.next());

        let instance_id = authz_instance.id();

        let locked = diesel::update(dsl::instance)
            .filter(dsl::time_deleted.is_null())
            .filter(dsl::id.eq(instance_id))
            // If the generation is the same as the generation captured when
            // we read the instance record to check that it was not locked,
            // we can lock this instance. Otherwise, the update will fail.
            // This query is equivalent to an atomic compare-and-swap
            // instruction in a non-distributed, single-process mutex.
            .filter(dsl::updater_gen.eq(current_gen))
            .set((
                dsl::updater_gen.eq(new_gen),
                dsl::updater_id.eq(Some(*saga_lock_id)),
            ))
            .check_if_exists::<Instance>(instance_id)
            .execute_and_check(&*self.pool_connection_authorized(opctx).await?)
            .await
            .map(|r| {
                // If we successfully updated the instance record, we have
                // acquired the lock; otherwise, we haven't --- either because
                // our generation is stale, or because the instance is already
                // locked.
                let locked = match r.status {
                    UpdateStatus::Updated => true,
                    UpdateStatus::NotUpdatedButExists => false,
                };
                (r.found, locked)
            })
            .map_err(|e| {
                public_error_from_diesel(
                    e,
                    ErrorHandler::NotFoundByLookup(
                        ResourceType::Instance,
                        LookupType::ById(instance_id),
                    ),
                )
            })?;

        Ok(locked)
    }

    /// Release the instance-updater lock acquired by
    /// [`DataStore::instance_updater_try_lock`].
    pub async fn instance_updater_unlock(
        &self,
        opctx: &OpContext,
        authz_instance: &authz::Instance,
        saga_lock_id: &Uuid,
    ) -> Result<bool, Error> {
        use db::schema::instance::dsl;

        let instance_id = authz_instance.id();

        let unlocked = diesel::update(dsl::instance)
            .filter(dsl::time_deleted.is_null())
            .filter(dsl::id.eq(instance_id))
            // Only unlock the instance if the provided updater ID matches
            // that of the saga that currently holds the lock on this
            // instance.
            .filter(dsl::updater_id.eq(Some(*saga_lock_id)))
            .set((
                dsl::updater_gen.eq(dsl::updater_gen + 1),
                dsl::updater_id.eq(None::<Uuid>),
            ))
            .check_if_exists::<Instance>(instance_id)
            .execute_and_check(&*self.pool_connection_authorized(opctx).await?)
            .await
            .map(|r| match r.status {
                UpdateStatus::Updated => true,
                // TODO(eliza): should this be an error?
                UpdateStatus::NotUpdatedButExists => false,
            })
            .map_err(|e| {
                public_error_from_diesel(
                    e,
                    ErrorHandler::NotFoundByLookup(
                        ResourceType::Instance,
                        LookupType::ById(instance_id),
                    ),
                )
            })?;

        Ok(unlocked)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::datastore::test_utils::datastore_test;
    use crate::db::fixed_data;
    use crate::db::lookup::LookupPath;
    use nexus_db_model::Project;
    use nexus_test_utils::db::test_setup_database;
    use nexus_types::external_api::params;
    use omicron_common::api::external::ByteCount;
    use omicron_common::api::external::IdentityMetadataCreateParams;

    use omicron_test_utils::dev;

    #[tokio::test]
    async fn test_instance_updater_acquires_lock() {
        // Setup
        let logctx =
            dev::test_setup_log("test_instance_updater_acquires_lock");
        let mut db = test_setup_database(&logctx.log).await;
        let (opctx, datastore) = datastore_test(&logctx, &db).await;
        let silo_id = *fixed_data::silo::DEFAULT_SILO_ID;
        let project_id = Uuid::new_v4();
        let instance_id = Uuid::new_v4();
        let saga1 = Uuid::new_v4();
        let saga2 = Uuid::new_v4();

        let (authz_project, _project) = datastore
            .project_create(
                &opctx,
                Project::new_with_id(
                    project_id,
                    silo_id,
                    params::ProjectCreate {
                        identity: IdentityMetadataCreateParams {
                            name: "stuff".parse().unwrap(),
                            description: "Where I keep my stuff".into(),
                        },
                    },
                ),
            )
            .await
            .expect("project must be created successfully");

        let _ = datastore
            .project_create_instance(
                &opctx,
                &authz_project,
                Instance::new(
                    instance_id,
                    project_id,
                    &params::InstanceCreate {
                        identity: IdentityMetadataCreateParams {
                            name: "myinstance".parse().unwrap(),
                            description: "It's an instance".into(),
                        },
                        ncpus: 2i64.try_into().unwrap(),
                        memory: ByteCount::from_gibibytes_u32(16),
                        hostname: "myhostname".try_into().unwrap(),
                        user_data: Vec::new(),
                        network_interfaces:
                            params::InstanceNetworkInterfaceAttachment::None,
                        external_ips: Vec::new(),
                        disks: Vec::new(),
                        ssh_public_keys: None,
                        start: false,
                    },
                ),
            )
            .await
            .expect("instance must be created successfully");

        let (.., authz_instance) = LookupPath::new(&opctx, &datastore)
            .instance_id(instance_id)
            .lookup_for(authz::Action::Modify)
            .await
            .expect("instance must exist");

        macro_rules! assert_locked {
            ($id:expr) => {
                let instance = dbg!(
                    datastore
                        .instance_updater_lock(&opctx, &authz_instance, &$id)
                        .await
                )
                .expect(concat!(
                    "instance must be locked by ",
                    stringify!($id)
                ));
                assert_eq!(
                    instance.runtime_state.updater_id,
                    Some($id),
                    "instance's `updater_id` must be set to {}",
                    stringify!($id),
                );
            };
        }

        macro_rules! assert_not_locked {
            ($id:expr) => {
                let err = dbg!(datastore
                    .instance_updater_lock(&opctx, &authz_instance, &$id)
                    .await)
                .expect_err(
                    "attempting to lock the instance while it is already \
                     locked must fail",
                );
                assert_eq!(
                    err,
                    Error::internal_error(
                        "instance is already locked by another saga"
                    )
                );
            };
        }

        // Attempt to lock the instance from saga 1.
        assert_locked!(saga1);

        // Now, also attempt to lock the instance from saga 2. This must fail.
        assert_not_locked!(saga2);

        // Unlock the instance from saga 1.
        datastore
            .instance_updater_unlock(&opctx, &authz_instance, &saga1)
            .await
            .expect("instance must be unlocked by saga 1");

        // Now, locking the instance from saga 2 should succeed.
        assert_locked!(saga2);

        // Trying to lock the instance again from saga 1 should fail.
        assert_not_locked!(saga1);

        // Clean up.
        db.cleanup().await.unwrap();
        logctx.cleanup_successful();
    }
}
