Skip to content

fix: change detection for fixpoint queries #836

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl<A: Accumulator> Ingredient for IngredientImpl<A> {
_db: &dyn Database,
_input: Id,
_revision: Revision,
_in_cycle: bool,
) -> VerifyResult {
panic!("nothing should ever depend on an accumulator directly")
}
Expand Down
14 changes: 14 additions & 0 deletions src/active_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ pub(crate) struct ActiveQuery {
}

impl ActiveQuery {
pub(super) fn seed_iteration(
&mut self,
durability: Durability,
changed_at: Revision,
edges: &[QueryEdge],
untracked_read: bool,
) {
assert!(self.input_outputs.is_empty());
self.input_outputs = edges.iter().cloned().collect();
self.durability = self.durability.min(durability);
self.changed_at = self.changed_at.max(changed_at);
self.untracked_read |= untracked_read;
}

pub(super) fn add_read(
&mut self,
input: DatabaseKeyIndex,
Expand Down
4 changes: 2 additions & 2 deletions src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,11 @@ where
db: &dyn Database,
input: Id,
revision: Revision,
in_cycle: bool,
) -> VerifyResult {
// SAFETY: The `db` belongs to the ingredient as per caller invariant
let db = unsafe { self.view_caster.downcast_unchecked(db) };
self.maybe_changed_after(db, input, revision)
self.maybe_changed_after(db, input, revision, in_cycle)
}

/// True if the input `input` contains a memo that cites itself as a cycle head.
Expand Down Expand Up @@ -285,7 +286,6 @@ where
_db: &dyn Database,
_executor: DatabaseKeyIndex,
_stale_output_key: crate::Id,
_provisional: bool,
) {
// This function is invoked when a query Q specifies the value for `stale_output_key` in rev 1,
// but not in rev 2. We don't do anything in this case, we just leave the (now stale) memo.
Expand Down
4 changes: 3 additions & 1 deletion src/function/backdate.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::function::memo::Memo;
use crate::function::{Configuration, IngredientImpl};
use crate::zalsa_local::QueryRevisions;
use crate::DatabaseKeyIndex;

impl<C> IngredientImpl<C>
where
Expand All @@ -12,6 +13,7 @@ where
pub(super) fn backdate_if_appropriate<'db>(
&self,
old_memo: &Memo<C::Output<'db>>,
index: DatabaseKeyIndex,
revisions: &mut QueryRevisions,
value: &C::Output<'db>,
) {
Expand All @@ -24,7 +26,7 @@ where
&& C::values_equal(old_value, value)
{
tracing::debug!(
"value is equal, back-dating to {:?}",
"{index:?} value is equal, back-dating to {:?}",
old_memo.revisions.changed_at,
);

Expand Down
6 changes: 2 additions & 4 deletions src/function/diff_outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ where
key: DatabaseKeyIndex,
old_memo: &Memo<C::Output<'_>>,
revisions: &mut QueryRevisions,
provisional: bool,
) {
// Iterate over the outputs of the `old_memo` and put them into a hashset
let mut old_outputs: FxIndexSet<_> = old_memo.revisions.origin.outputs().collect();
Expand All @@ -50,7 +49,7 @@ where
});

for old_output in old_outputs {
Self::report_stale_output(zalsa, db, key, old_output, provisional);
Self::report_stale_output(zalsa, db, key, old_output);
}
}

Expand All @@ -59,14 +58,13 @@ where
db: &C::DbView,
key: DatabaseKeyIndex,
output: DatabaseKeyIndex,
provisional: bool,
) {
db.salsa_event(&|| {
Event::new(EventKind::WillDiscardStaleOutput {
execute_key: key,
output_key: output,
})
});
output.remove_stale_output(zalsa, db.as_dyn_database(), key, provisional);
output.remove_stale_output(zalsa, db.as_dyn_database(), key);
}
}
46 changes: 21 additions & 25 deletions src/function/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,11 @@ where
// really change, even if some of its inputs have. So we can
// "backdate" its `changed_at` revision to be the same as the
// old value.
self.backdate_if_appropriate(old_memo, &mut revisions, &new_value);
self.backdate_if_appropriate(old_memo, database_key_index, &mut revisions, &new_value);

// Diff the new outputs with the old, to discard any no-longer-emitted
// outputs and update the tracked struct IDs for seeding the next revision.
let provisional = !revisions.cycle_heads.is_empty();
self.diff_outputs(
zalsa,
db,
database_key_index,
old_memo,
&mut revisions,
provisional,
);
self.diff_outputs(zalsa, db, database_key_index, old_memo, &mut revisions);
}
self.insert_memo(
zalsa,
Expand Down Expand Up @@ -142,8 +134,14 @@ where
// only when a cycle is actually encountered.
let mut opt_last_provisional: Option<&Memo<<C as Configuration>::Output<'db>>> = None;
loop {
let (mut new_value, mut revisions) =
Self::execute_query(db, active_query, opt_old_memo, zalsa.current_revision(), id);
let previous_memo = opt_last_provisional.or(opt_old_memo);
let (mut new_value, mut revisions) = Self::execute_query(
db,
active_query,
previous_memo,
zalsa.current_revision(),
id,
);

// Did the new result we got depend on our own provisional value, in a cycle?
if revisions.cycle_heads.contains(&database_key_index) {
Expand Down Expand Up @@ -255,27 +253,25 @@ where
current_revision: Revision,
id: Id,
) -> (C::Output<'db>, QueryRevisions) {
// If we already executed this query once, then use the tracked-struct ids from the
// previous execution as the starting point for the new one.
if let Some(old_memo) = opt_old_memo {
// If we already executed this query once, then use the tracked-struct ids from the
// previous execution as the starting point for the new one.
active_query.seed_tracked_struct_ids(&old_memo.revisions.tracked_struct_ids);

// Copy over all inputs and outputs from a previous iteration.
// This is necessary to:
// * ensure that tracked struct created during the previous iteration
// (and are owned by the query) are alive even if the query in this iteration no longer creates them.
// * ensure the final returned memo depends on all inputs from all iterations.
if old_memo.may_be_provisional() && old_memo.verified_at.load() == current_revision {
active_query.seed_iteration(&old_memo.revisions);
}
}

// Query was not previously executed, or value is potentially
// stale, or value is absent. Let's execute!
let new_value = C::execute(db, C::id_to_input(db, id));

if let Some(old_memo) = opt_old_memo {
// Copy over all outputs from a previous iteration.
// This is necessary to ensure that tracked struct created during the previous iteration
// (and are owned by the query) are alive even if the query in this iteration no longer creates them.
// The query not re-creating the tracked struct doesn't guarantee that there
// aren't any other queries depending on it.
if old_memo.may_be_provisional() && old_memo.verified_at.load() == current_revision {
active_query.append_outputs(old_memo.revisions.origin.outputs());
}
}

(new_value, active_query.pop())
}
}
15 changes: 5 additions & 10 deletions src/function/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,10 @@ where
"hit cycle at {database_key_index:#?}, \
inserting and returning fixpoint initial value"
);
let revisions = QueryRevisions::fixpoint_initial(
database_key_index,
zalsa.current_revision(),
);
let initial_value = self.initial_value(db, id).expect(
"`CycleRecoveryStrategy::Fixpoint` \
should have initial_value",
);
let revisions = QueryRevisions::fixpoint_initial(database_key_index);
let initial_value = self
.initial_value(db, id)
.expect("`CycleRecoveryStrategy::Fixpoint` should have initial_value");
Some(self.insert_memo(
zalsa,
id,
Expand All @@ -159,8 +155,7 @@ where
);
let active_query = db.zalsa_local().push_query(database_key_index, 0);
let fallback_value = self.initial_value(db, id).expect(
"`CycleRecoveryStrategy::FallbackImmediate` \
should have initial_value",
"`CycleRecoveryStrategy::FallbackImmediate` should have initial_value",
);
let mut revisions = active_query.pop();
revisions.cycle_heads = CycleHeads::initial(database_key_index);
Expand Down
Loading