Skip to content

Commit ba55078

Browse files
committed
clean things up a bit
1 parent 0ac6532 commit ba55078

File tree

2 files changed

+157
-117
lines changed

2 files changed

+157
-117
lines changed

nexus/src/app/sagas/instance_update/mod.rs

Lines changed: 17 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,20 @@ use uuid::Uuid;
1616

1717
mod destroyed;
1818

19-
/// Parameters to the start instance update saga.
20-
#[derive(Debug, Deserialize, Serialize)]
21-
pub(crate) struct Params {
22-
/// Authentication context to use to fetch the instance's current state from
23-
/// the database.
24-
pub serialized_authn: authn::saga::Serialized,
25-
26-
pub authz_instance: authz::Instance,
27-
}
19+
// The public interface to this saga is actually a smaller saga that starts the
20+
// "real" update saga, which inherits the lock from the start saga. This is
21+
// because the decision of which subsaga(s) to run depends on the state of the
22+
// instance record read from the database *once the lock has been acquired*,
23+
// and the saga DAG for the "real" instance update saga may be constructed only
24+
// after the instance state has been fetched. However, since the the instance
25+
// state must be read inside the lock, that *also* needs to happen in a saga,
26+
// so that the lock is always dropped when unwinding. Thus, we have a second,
27+
// smaller saga which starts our real saga, and then the real saga, which
28+
// decides what DAG to build based on the instance fetched by the start saga.
29+
//
30+
// Don't worry, this won't be on the test.
31+
mod start;
32+
pub(crate) use self::start::{Params, SagaInstanceUpdate};
2833

2934
/// Parameters to the "real" instance update saga.
3035
#[derive(Debug, Deserialize, Serialize)]
@@ -43,21 +48,6 @@ const INSTANCE_LOCK_ID: &str = "saga_instance_lock_id";
4348
declare_saga_actions! {
4449
instance_update;
4550

46-
// Acquire the instance updater" lock with this saga's ID if no other saga
47-
// is currently updating the instance.
48-
LOCK_INSTANCE -> "saga_instance_lock_gen" {
49-
+ siu_lock_instance
50-
- siu_lock_instance_undo
51-
}
52-
53-
// Fetch the instance and VMM's state, and start the "real" instance update saga.
54-
// N.B. that this must be performed as a separate action from
55-
// `LOCK_INSTANCE`, so that if the lookup fails, we will still unwind the
56-
// `LOCK_INSTANCE` action and release the lock.
57-
FETCH_STATE_AND_START_REAL_SAGA -> "state" {
58-
+ siu_fetch_state_and_start_real_saga
59-
}
60-
6151
// Become the instance updater
6252
BECOME_UPDATER -> "generation" {
6353
+ siu_become_updater
@@ -70,36 +60,9 @@ declare_saga_actions! {
7060
}
7161

7262
// instance update saga: definition
63+
struct SagaDoActualInstanceUpdate;
7364

74-
#[derive(Debug)]
75-
pub(crate) struct SagaInstanceUpdate;
76-
impl NexusSaga for SagaInstanceUpdate {
77-
const NAME: &'static str = "start-instance-update";
78-
type Params = Params;
79-
80-
fn register_actions(registry: &mut ActionRegistry) {
81-
instance_update_register_actions(registry);
82-
}
83-
84-
fn make_saga_dag(
85-
_params: &Self::Params,
86-
mut builder: DagBuilder,
87-
) -> Result<steno::Dag, super::SagaInitError> {
88-
builder.append(Node::action(
89-
INSTANCE_LOCK_ID,
90-
"GenerateInstanceLockId",
91-
ACTION_GENERATE_ID.as_ref(),
92-
));
93-
builder.append(lock_instance_action());
94-
builder.append(fetch_state_and_start_real_saga_action());
95-
96-
Ok(builder.build()?)
97-
}
98-
}
99-
100-
struct SagaRealInstanceUpdate;
101-
102-
impl NexusSaga for SagaRealInstanceUpdate {
65+
impl NexusSaga for SagaDoActualInstanceUpdate {
10366
const NAME: &'static str = "instance-update";
10467
type Params = RealParams;
10568

@@ -165,58 +128,6 @@ impl NexusSaga for SagaRealInstanceUpdate {
165128
}
166129
}
167130

168-
// instance update saga: action implementations
169-
170-
async fn siu_lock_instance(
171-
sagactx: NexusActionContext,
172-
) -> Result<(), ActionError> {
173-
let osagactx = sagactx.user_data();
174-
let Params { ref serialized_authn, ref authz_instance, .. } =
175-
sagactx.saga_params::<Params>()?;
176-
let lock_id = sagactx.lookup::<Uuid>(INSTANCE_LOCK_ID)?;
177-
let opctx =
178-
crate::context::op_context_for_saga_action(&sagactx, serialized_authn);
179-
slog::info!(
180-
osagactx.log(),
181-
"instance update: attempting to lock instance";
182-
"instance_id" => %authz_instance.id(),
183-
"saga_id" => %lock_id,
184-
);
185-
osagactx
186-
.datastore()
187-
.instance_updater_lock(&opctx, authz_instance, &lock_id)
188-
.await
189-
.map_err(ActionError::action_failed)
190-
.map(|_| ())
191-
}
192-
193-
async fn siu_fetch_state_and_start_real_saga(
194-
sagactx: NexusActionContext,
195-
) -> Result<(), ActionError> {
196-
let osagactx = sagactx.user_data();
197-
let Params { serialized_authn, authz_instance, .. } =
198-
sagactx.saga_params::<Params>()?;
199-
let opctx =
200-
crate::context::op_context_for_saga_action(&sagactx, &serialized_authn);
201-
202-
let state = osagactx
203-
.datastore()
204-
.instance_fetch_with_vmms(&opctx, &authz_instance)
205-
.await
206-
.map_err(ActionError::action_failed)?;
207-
osagactx
208-
.nexus()
209-
.execute_saga::<SagaRealInstanceUpdate>(RealParams {
210-
serialized_authn,
211-
authz_instance,
212-
state,
213-
})
214-
.await
215-
.map_err(ActionError::action_failed)?;
216-
217-
Ok(())
218-
}
219-
220131
async fn siu_become_updater(
221132
sagactx: NexusActionContext,
222133
) -> Result<(), ActionError> {
@@ -244,7 +155,7 @@ async fn siu_become_updater(
244155

245156
slog::info!(
246157
osagactx.log(),
247-
"instance update: became instance updater";
158+
"Now, I am become Updater, the destroyer of VMMs.";
248159
"instance_id" => %authz_instance.id(),
249160
"saga_id" => %lock_id,
250161
"parent_id" => ?state.instance.runtime_state.updater_id,
@@ -271,17 +182,6 @@ async fn siu_unlock_instance(
271182
unlock_instance_inner(serialized_authn, authz_instance, &sagactx).await
272183
}
273184

274-
// N.B. that this has to be a separate function just because the undo action
275-
// must return `anyhow::Error` rather than `ActionError`.
276-
async fn siu_lock_instance_undo(
277-
sagactx: NexusActionContext,
278-
) -> Result<(), anyhow::Error> {
279-
let Params { ref serialized_authn, ref authz_instance, .. } =
280-
sagactx.saga_params::<Params>()?;
281-
unlock_instance_inner(serialized_authn, authz_instance, &sagactx).await?;
282-
Ok(())
283-
}
284-
285185
async fn unlock_instance_inner(
286186
serialized_authn: &authn::saga::Serialized,
287187
authz_instance: &authz::Instance,
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
5+
// instance update start saga
6+
7+
// This Source Code Form is subject to the terms of the Mozilla Public
8+
// License, v. 2.0. If a copy of the MPL was not distributed with this
9+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
10+
11+
use super::{
12+
ActionRegistry, NexusActionContext, NexusSaga, SagaInitError,
13+
ACTION_GENERATE_ID, INSTANCE_LOCK_ID,
14+
};
15+
use crate::app::sagas::declare_saga_actions;
16+
use nexus_db_queries::{authn, authz};
17+
use serde::{Deserialize, Serialize};
18+
use steno::{ActionError, DagBuilder, Node};
19+
use uuid::Uuid;
20+
21+
/// Parameters to the start instance update saga.
22+
#[derive(Debug, Deserialize, Serialize)]
23+
pub(crate) struct Params {
24+
/// Authentication context to use to fetch the instance's current state from
25+
/// the database.
26+
pub serialized_authn: authn::saga::Serialized,
27+
28+
pub authz_instance: authz::Instance,
29+
}
30+
31+
// instance update saga: actions
32+
33+
declare_saga_actions! {
34+
instance_update;
35+
36+
// Acquire the instance updater" lock with this saga's ID if no other saga
37+
// is currently updating the instance.
38+
LOCK_INSTANCE -> "saga_instance_lock_gen" {
39+
+ siu_lock_instance
40+
- siu_lock_instance_undo
41+
}
42+
43+
// Fetch the instance and VMM's state, and start the "real" instance update saga.
44+
// N.B. that this must be performed as a separate action from
45+
// `LOCK_INSTANCE`, so that if the lookup fails, we will still unwind the
46+
// `LOCK_INSTANCE` action and release the lock.
47+
FETCH_STATE_AND_START_REAL_SAGA -> "state" {
48+
+ siu_fetch_state_and_start_real_saga
49+
}
50+
}
51+
52+
// instance update saga: definition
53+
54+
#[derive(Debug)]
55+
pub(crate) struct SagaInstanceUpdate;
56+
impl NexusSaga for SagaInstanceUpdate {
57+
const NAME: &'static str = "start-instance-update";
58+
type Params = Params;
59+
60+
fn register_actions(registry: &mut ActionRegistry) {
61+
instance_update_register_actions(registry);
62+
}
63+
64+
fn make_saga_dag(
65+
_params: &Self::Params,
66+
mut builder: DagBuilder,
67+
) -> Result<steno::Dag, SagaInitError> {
68+
builder.append(Node::action(
69+
INSTANCE_LOCK_ID,
70+
"GenerateInstanceLockId",
71+
ACTION_GENERATE_ID.as_ref(),
72+
));
73+
builder.append(lock_instance_action());
74+
builder.append(fetch_state_and_start_real_saga_action());
75+
76+
Ok(builder.build()?)
77+
}
78+
}
79+
80+
// start instance update saga: action implementations
81+
82+
async fn siu_lock_instance(
83+
sagactx: NexusActionContext,
84+
) -> Result<(), ActionError> {
85+
let osagactx = sagactx.user_data();
86+
let Params { ref serialized_authn, ref authz_instance, .. } =
87+
sagactx.saga_params::<Params>()?;
88+
let lock_id = sagactx.lookup::<Uuid>(INSTANCE_LOCK_ID)?;
89+
let opctx =
90+
crate::context::op_context_for_saga_action(&sagactx, serialized_authn);
91+
slog::info!(
92+
osagactx.log(),
93+
"instance update: attempting to lock instance";
94+
"instance_id" => %authz_instance.id(),
95+
"saga_id" => %lock_id,
96+
);
97+
osagactx
98+
.datastore()
99+
.instance_updater_lock(&opctx, authz_instance, &lock_id)
100+
.await
101+
.map_err(ActionError::action_failed)
102+
.map(|_| ())
103+
}
104+
105+
async fn siu_lock_instance_undo(
106+
sagactx: NexusActionContext,
107+
) -> Result<(), anyhow::Error> {
108+
let Params { ref serialized_authn, ref authz_instance, .. } =
109+
sagactx.saga_params::<Params>()?;
110+
super::unlock_instance_inner(serialized_authn, authz_instance, &sagactx)
111+
.await?;
112+
Ok(())
113+
}
114+
115+
async fn siu_fetch_state_and_start_real_saga(
116+
sagactx: NexusActionContext,
117+
) -> Result<(), ActionError> {
118+
let osagactx = sagactx.user_data();
119+
let Params { serialized_authn, authz_instance, .. } =
120+
sagactx.saga_params::<Params>()?;
121+
let opctx =
122+
crate::context::op_context_for_saga_action(&sagactx, &serialized_authn);
123+
124+
let state = osagactx
125+
.datastore()
126+
.instance_fetch_with_vmms(&opctx, &authz_instance)
127+
.await
128+
.map_err(ActionError::action_failed)?;
129+
osagactx
130+
.nexus()
131+
.execute_saga::<super::SagaDoActualInstanceUpdate>(super::RealParams {
132+
serialized_authn,
133+
authz_instance,
134+
state,
135+
})
136+
.await
137+
.map_err(ActionError::action_failed)?;
138+
139+
Ok(())
140+
}

0 commit comments

Comments
 (0)