Skip to content

Commit 4244323

Browse files
committed
JoinSide: store 'Vec<PV>'s instead
1 parent ceef3fd commit 4244323

File tree

1 file changed

+51
-95
lines changed

1 file changed

+51
-95
lines changed

crates/core/src/subscription/subscription.rs

Lines changed: 51 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use spacetimedb_lib::identity::AuthCtx;
4040
use spacetimedb_lib::ProductValue;
4141
use spacetimedb_primitives::TableId;
4242
use spacetimedb_sats::db::auth::{StAccess, StTableType};
43-
use spacetimedb_sats::relation::{DbTable, Header};
43+
use spacetimedb_sats::relation::DbTable;
4444
use spacetimedb_vm::expr::{self, IndexJoin, Query, QueryExpr, SourceSet};
4545
use spacetimedb_vm::rel_ops::RelOps;
4646
use spacetimedb_vm::relation::MemTable;
@@ -210,31 +210,21 @@ pub struct IncrementalJoin {
210210
/// One side of an [`IncrementalJoin`].
211211
///
212212
/// Holds the "physical" [`DbTable`] this side of the join operates on, as well
213-
/// as the [`DatabaseTableUpdate`]s pertaining that table.
213+
/// as the updates pertaining to that table.
214214
struct JoinSide {
215-
table_id: TableId,
216-
table_name: String,
217-
inserts: Vec<TableOp>,
218-
deletes: Vec<TableOp>,
215+
inserts: Vec<ProductValue>,
216+
deletes: Vec<ProductValue>,
219217
}
220218

221219
impl JoinSide {
222-
/// Return a [`DatabaseTableUpdate`] consisting of only insert operations.
223-
pub fn inserts(&self) -> DatabaseTableUpdate {
224-
DatabaseTableUpdate {
225-
table_id: self.table_id,
226-
table_name: self.table_name.clone(),
227-
ops: self.inserts.to_vec(),
228-
}
220+
/// Return a list of updates consisting of only insert operations.
221+
pub fn inserts(&self) -> Vec<ProductValue> {
222+
self.inserts.clone()
229223
}
230224

231-
/// Return a [`DatabaseTableUpdate`] with only delete operations.
232-
pub fn deletes(&self) -> DatabaseTableUpdate {
233-
DatabaseTableUpdate {
234-
table_id: self.table_id,
235-
table_name: self.table_name.clone(),
236-
ops: self.deletes.to_vec(),
237-
}
225+
/// Return a list of updates with only delete operations.
226+
pub fn deletes(&self) -> Vec<ProductValue> {
227+
self.deletes.clone()
238228
}
239229

240230
/// Does this table update include inserts?
@@ -249,18 +239,6 @@ impl JoinSide {
249239
}
250240

251241
impl IncrementalJoin {
252-
/// Construct an empty [`DatabaseTableUpdate`] with the schema of `table`
253-
/// to use as a source when pre-compiling `eval_incr` queries.
254-
fn dummy_table_update(table: &DbTable) -> DatabaseTableUpdate {
255-
let table_id = table.table_id;
256-
let table_name = table.head.table_name.clone();
257-
DatabaseTableUpdate {
258-
table_id,
259-
table_name,
260-
ops: vec![],
261-
}
262-
}
263-
264242
fn optimize_query(join: IndexJoin) -> QueryExpr {
265243
let expr = QueryExpr::from(join);
266244
// Because (at least) one of the two tables will be a `MemTable`,
@@ -313,21 +291,15 @@ impl IncrementalJoin {
313291
.context("expected a physical database table")?
314292
.clone();
315293

316-
let (virtual_index_plan, _sources) =
317-
with_delta_table(join.clone(), Some(Self::dummy_table_update(&index_table)), None);
294+
let (virtual_index_plan, _sources) = with_delta_table(join.clone(), Some(Vec::new()), None);
318295
debug_assert_eq!(_sources.len(), 1);
319296
let virtual_index_plan = Self::optimize_query(virtual_index_plan);
320297

321-
let (virtual_probe_plan, _sources) =
322-
with_delta_table(join.clone(), None, Some(Self::dummy_table_update(&probe_table)));
298+
let (virtual_probe_plan, _sources) = with_delta_table(join.clone(), None, Some(Vec::new()));
323299
debug_assert_eq!(_sources.len(), 1);
324300
let virtual_probe_plan = Self::optimize_query(virtual_probe_plan);
325301

326-
let (virtual_plan, _sources) = with_delta_table(
327-
join.clone(),
328-
Some(Self::dummy_table_update(&index_table)),
329-
Some(Self::dummy_table_update(&probe_table)),
330-
);
302+
let (virtual_plan, _sources) = with_delta_table(join.clone(), Some(Vec::new()), Some(Vec::new()));
331303
debug_assert_eq!(_sources.len(), 2);
332304
let virtual_plan = virtual_plan.to_inner_join();
333305

@@ -360,41 +332,48 @@ impl IncrementalJoin {
360332
&self,
361333
updates: impl IntoIterator<Item = &'a DatabaseTableUpdate>,
362334
) -> Option<(JoinSide, JoinSide)> {
363-
let mut lhs_ops = Vec::new();
364-
let mut rhs_ops = Vec::new();
335+
let mut lhs_inserts = Vec::new();
336+
let mut lhs_deletes = Vec::new();
337+
let mut rhs_inserts = Vec::new();
338+
let mut rhs_deletes = Vec::new();
339+
340+
// Partitions deletes of `update` into `ds` and inserts into `is`.
341+
let partition_into = |ds: &mut Vec<_>, is: &mut Vec<_>, updates: &DatabaseTableUpdate| {
342+
for update in &updates.ops {
343+
if update.op_type == 0 { &mut *ds } else { &mut *is }.push(update.row.clone());
344+
}
345+
};
365346

347+
// Partitions all updates into the `l/rhs_insert/delete_ops` above.
366348
for update in updates {
367349
if update.table_id == self.lhs.table_id {
368-
lhs_ops.extend(update.ops.iter());
350+
partition_into(&mut lhs_deletes, &mut lhs_inserts, update);
369351
} else if update.table_id == self.rhs.table_id {
370-
rhs_ops.extend(update.ops.iter());
352+
partition_into(&mut rhs_deletes, &mut rhs_inserts, update);
371353
}
372354
}
373355

374-
if lhs_ops.is_empty() && rhs_ops.is_empty() {
356+
// No updates at all? Return `None`.
357+
if [&lhs_inserts, &lhs_deletes, &rhs_inserts, &rhs_deletes]
358+
.iter()
359+
.all(|ops| ops.is_empty())
360+
{
375361
return None;
376362
}
377363

378-
let join_side = |table: &DbTable, ops: Vec<&TableOp>| {
379-
let (deletes, inserts) = ops.into_iter().cloned().partition(|op| op.op_type == 0);
380-
JoinSide {
381-
table_id: table.table_id,
382-
table_name: table.head.table_name.clone(),
383-
deletes,
384-
inserts,
385-
}
386-
};
387-
Some((join_side(&self.lhs, lhs_ops), join_side(&self.rhs, rhs_ops)))
364+
// Stich together the `JoinSide`s.
365+
let join_side = |deletes, inserts| JoinSide { deletes, inserts };
366+
Some((join_side(lhs_deletes, lhs_inserts), join_side(rhs_deletes, rhs_inserts)))
388367
}
389368

390369
/// Evaluate join plan for lhs updates.
391370
fn eval_lhs(
392371
&self,
393372
db: &RelationalDB,
394373
tx: &Tx,
395-
lhs: DatabaseTableUpdate,
374+
lhs: Vec<ProductValue>,
396375
) -> Result<impl Iterator<Item = ProductValue>, DBError> {
397-
let lhs = to_mem_table(self.lhs.head.clone(), self.lhs.table_access, lhs);
376+
let lhs = MemTable::new(self.lhs.head.clone(), self.lhs.table_access, lhs);
398377
let mut sources = SourceSet::default();
399378
sources.add_mem_table(lhs);
400379
eval_updates(db, tx, self.plan_for_delta_lhs(), sources)
@@ -405,9 +384,9 @@ impl IncrementalJoin {
405384
&self,
406385
db: &RelationalDB,
407386
tx: &Tx,
408-
rhs: DatabaseTableUpdate,
387+
rhs: Vec<ProductValue>,
409388
) -> Result<impl Iterator<Item = ProductValue>, DBError> {
410-
let rhs = to_mem_table(self.rhs.head.clone(), self.rhs.table_access, rhs);
389+
let rhs = MemTable::new(self.rhs.head.clone(), self.rhs.table_access, rhs);
411390
let mut sources = SourceSet::default();
412391
sources.add_mem_table(rhs);
413392
eval_updates(db, tx, self.plan_for_delta_rhs(), sources)
@@ -418,11 +397,11 @@ impl IncrementalJoin {
418397
&self,
419398
db: &RelationalDB,
420399
tx: &Tx,
421-
lhs: DatabaseTableUpdate,
422-
rhs: DatabaseTableUpdate,
400+
lhs: Vec<ProductValue>,
401+
rhs: Vec<ProductValue>,
423402
) -> Result<impl Iterator<Item = ProductValue>, DBError> {
424-
let lhs = to_mem_table(self.lhs.head.clone(), self.lhs.table_access, lhs);
425-
let rhs = to_mem_table(self.rhs.head.clone(), self.rhs.table_access, rhs);
403+
let lhs = MemTable::new(self.lhs.head.clone(), self.lhs.table_access, lhs);
404+
let rhs = MemTable::new(self.rhs.head.clone(), self.rhs.table_access, rhs);
426405
let mut sources = SourceSet::default();
427406
let (index_side, probe_side) = if self.return_index_rows { (lhs, rhs) } else { (rhs, lhs) };
428407
sources.add_mem_table(index_side);
@@ -566,39 +545,25 @@ impl IncrementalJoin {
566545
}
567546
}
568547

569-
/// Construct a [`MemTable`] containing the updates from `delta`,
570-
/// which must be derived from a table with `head` and `table_access`.
571-
fn to_mem_table(head: Arc<Header>, table_access: StAccess, delta: DatabaseTableUpdate) -> MemTable {
572-
MemTable::new(
573-
head,
574-
table_access,
575-
delta.ops.into_iter().map(|op| op.row).collect::<Vec<_>>(),
576-
)
577-
}
578-
579548
/// Replace an [IndexJoin]'s scan or fetch operation with a delta table.
580549
/// A delta table consists purely of updates or changes to the base table.
581550
fn with_delta_table(
582551
mut join: IndexJoin,
583-
index_side: Option<DatabaseTableUpdate>,
584-
probe_side: Option<DatabaseTableUpdate>,
552+
index_side: Option<Vec<ProductValue>>,
553+
probe_side: Option<Vec<ProductValue>>,
585554
) -> (IndexJoin, SourceSet) {
586555
let mut sources = SourceSet::default();
587556

588557
if let Some(index_side) = index_side {
589558
let head = join.index_side.head().clone();
590559
let table_access = join.index_side.table_access();
591-
let mem_table = to_mem_table(head, table_access, index_side);
592-
let source_expr = sources.add_mem_table(mem_table);
593-
join.index_side = source_expr;
560+
join.index_side = sources.add_mem_table(MemTable::new(head, table_access, index_side));
594561
}
595562

596563
if let Some(probe_side) = probe_side {
597564
let head = join.probe_side.source.head().clone();
598565
let table_access = join.probe_side.source.table_access();
599-
let mem_table = to_mem_table(head, table_access, probe_side);
600-
let source_expr = sources.add_mem_table(mem_table);
601-
join.probe_side.source = source_expr;
566+
join.probe_side.source = sources.add_mem_table(MemTable::new(head, table_access, probe_side));
602567
}
603568

604569
(join, sources)
@@ -715,7 +680,6 @@ pub(crate) fn get_all(relational_db: &RelationalDB, tx: &Tx, auth: &AuthCtx) ->
715680
mod tests {
716681
use super::*;
717682
use crate::db::relational_db::tests_utils::make_test_db;
718-
use crate::host::module_host::TableOp;
719683
use crate::sql::compiler::compile_sql;
720684
use spacetimedb_lib::error::ResultTest;
721685
use spacetimedb_sats::relation::{DbTable, FieldName};
@@ -731,7 +695,7 @@ mod tests {
731695
// Create table [lhs] with index on [b]
732696
let schema = &[("a", AlgebraicType::U64), ("b", AlgebraicType::U64)];
733697
let indexes = &[(1.into(), "b")];
734-
let lhs_id = db.create_table_for_test("lhs", schema, indexes)?;
698+
let _ = db.create_table_for_test("lhs", schema, indexes)?;
735699

736700
// Create table [rhs] with index on [b, c]
737701
let schema = &[
@@ -761,11 +725,7 @@ mod tests {
761725
};
762726

763727
// Create an insert for an incremental update.
764-
let delta = DatabaseTableUpdate {
765-
table_id: lhs_id,
766-
table_name: String::from("lhs"),
767-
ops: vec![TableOp::insert(product![0u64, 0u64])],
768-
};
728+
let delta = vec![product![0u64, 0u64]];
769729

770730
// Optimize the query plan for the incremental update.
771731
let (expr, _sources) = with_delta_table(join, Some(delta), None);
@@ -829,7 +789,7 @@ mod tests {
829789
("d", AlgebraicType::U64),
830790
];
831791
let indexes = &[(0.into(), "b"), (1.into(), "c")];
832-
let rhs_id = db.create_table_for_test("rhs", schema, indexes)?;
792+
let _ = db.create_table_for_test("rhs", schema, indexes)?;
833793

834794
let tx = db.begin_tx();
835795
// Should generate an index join since there is an index on `lhs.b`.
@@ -850,11 +810,7 @@ mod tests {
850810
};
851811

852812
// Create an insert for an incremental update.
853-
let delta = DatabaseTableUpdate {
854-
table_id: rhs_id,
855-
table_name: String::from("rhs"),
856-
ops: vec![TableOp::insert(product![0u64, 0u64, 0u64])],
857-
};
813+
let delta = vec![product![0u64, 0u64, 0u64]];
858814

859815
// Optimize the query plan for the incremental update.
860816
let (expr, _sources) = with_delta_table(join, None, Some(delta));

0 commit comments

Comments
 (0)