Skip to content

WIP: Add Rust implementation for DABA #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 182 additions & 0 deletions rust/src/daba/chunked_array_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use std::collections::linked_list::Cursor as ChunkCursor;
use std::collections::LinkedList;
use std::mem::MaybeUninit;

/// An [Unrolled Linked List](https://en.wikipedia.org/wiki/Unrolled_linked_list).
///
/// Elements are stored in fixed-capacity chunks of `SIZE` slots that are
/// linked together, so most pushes/pops touch only an array slot instead of
/// allocating a new list node.
pub struct ChunkedArrayQueue<Elem, const SIZE: usize> {
// Chunks ordered front-to-back; the queue's front element lives in the
// first chunk and its back element in the last chunk.
chunks: LinkedList<Chunk<Elem, SIZE>>,
}

// One node of the unrolled list: a fixed array of possibly-uninitialized
// slots plus the index range currently in use.
//
// NOTE(review): the occupancy convention is not consistent across the methods
// below. `pop_front`/`pop_back` treat `front <= back` as "non-empty", but the
// constructors create chunks with `front == back` pointing at a slot that was
// never written — confirm whether occupied means `[front..=back]` and how an
// empty chunk is meant to be represented.
struct Chunk<Elem, const SIZE: usize> {
// Slot storage; slots outside the occupied range are uninitialized.
elems: [MaybeUninit<Elem>; SIZE],
// Index of the front-most slot in use (presumably — TODO confirm, see above).
front: usize,
// Index of the back-most slot in use (presumably — TODO confirm, see above).
back: usize,
}

// A position inside a `ChunkedArrayQueue`: a linked-list cursor selecting a
// chunk plus an index selecting a slot within that chunk. `Clone` is derived
// so multiple cursors can address the same queue simultaneously (DABA keeps
// several pointers into one buffer).
#[derive(Clone)]
pub(crate) struct Cursor<'i, Elem, const SIZE: usize> {
/// A cursor pointing at a chunk
chunk_cursor: ChunkCursor<'i, Chunk<Elem, SIZE>>,
/// An index pointing at an element inside the chunk
elem_index: usize,
}

impl<Elem, const SIZE: usize> Chunk<Elem, SIZE>
where
Elem: Copy,
{
fn new_middle() -> Self {
Self {
// Unsafe needed so we don't have to initialize every element of the array
elems: unsafe { std::mem::MaybeUninit::uninit().assume_init() },
front: SIZE / 2,
back: SIZE / 2,
}
}
fn new_front() -> Self {
Self {
elems: unsafe { std::mem::MaybeUninit::uninit().assume_init() },
front: SIZE - 1,
back: SIZE - 1,
}
}
fn new_back() -> Self {
Self {
elems: unsafe { std::mem::MaybeUninit::uninit().assume_init() },
front: 0,
back: 0,
}
}
}

impl<Elem, const SIZE: usize> ChunkedArrayQueue<Elem, SIZE>
where
Elem: Copy,
{
/// Creates an empty queue holding one chunk whose indices start in the
/// middle, so it can grow in both directions before allocating.
///
/// NOTE(review): the fresh chunk has `front == back`, which the
/// `front <= back` guards in `pop_front`/`pop_back` treat as "one element
/// present" — popping from a brand-new (empty) queue therefore appears to
/// read an uninitialized slot. TODO confirm the intended empty-state
/// representation.
pub(crate) fn new() -> Self {
let chunk = Chunk::new_middle();
let mut chunks = LinkedList::new();
chunks.push_back(chunk);
Self { chunks }
}
/// Appends `v` at the back of the queue, allocating a new back chunk when
/// the current one has no room to grow rightward.
pub(crate) fn push_back(&mut self, v: Elem) {
match self.chunks.back_mut() {
// Room in the last chunk: advance `back` and write into the new slot.
Some(chunk) if chunk.back < SIZE - 1 => {
chunk.back += 1;
chunk.elems[chunk.back] = MaybeUninit::new(v);
}
// Last chunk full (or list empty): append a fresh chunk and retry.
// Recursion depth is at most one, since the new chunk has room.
_ => {
self.chunks.push_back(Chunk::new_back());
self.push_back(v)
}
}
}
/// Prepends `v` at the front of the queue, allocating a new front chunk
/// when the current one has no room to grow leftward.
pub(crate) fn push_front(&mut self, v: Elem) {
match self.chunks.front_mut() {
// Room in the first chunk: retreat `front` and write into the new slot.
Some(chunk) if chunk.front > 0 => {
chunk.front -= 1;
chunk.elems[chunk.front] = MaybeUninit::new(v);
}
// First chunk full (or list empty): prepend a fresh chunk and retry.
_ => {
self.chunks.push_front(Chunk::new_front());
self.push_front(v)
}
}
}
/// Removes and returns the back element, or `None` when the last chunk
/// reports itself empty.
///
/// NOTE(review): several suspected defects here —
/// 1. `chunk.back -= 1` underflows `usize` when `back == 0` (panics in
///    debug builds).
/// 2. `if chunk.back == 0` discards the chunk even though slot 0 may still
///    hold a live element; the symmetric `pop_front` uses `== SIZE`, so
///    this condition looks off by the same convention.
/// 3. The overwrite with `MaybeUninit::uninit().assume_init()` is a
///    convoluted (if sound, via `MaybeUninit<MaybeUninit<Elem>>`) way of
///    writing plain `MaybeUninit::uninit()`.
/// TODO confirm intended semantics before relying on this path.
pub(crate) fn pop_back(&mut self) -> Option<Elem> {
match self.chunks.back_mut() {
Some(chunk) if chunk.front <= chunk.back => unsafe {
// SAFETY(claimed): the slot at `back` is within the occupied
// range, so it was initialized by a previous push — but see the
// empty-queue caveat on `new()`.
let val = chunk.elems[chunk.back].assume_init();
chunk.elems[chunk.back] = MaybeUninit::uninit().assume_init();
chunk.back -= 1;
if chunk.back == 0 {
self.chunks.pop_back();
}
Some(val)
},
_ => None,
}
}
/// Removes and returns the front element, or `None` when the first chunk
/// reports itself empty.
///
/// NOTE(review): shares the empty-state caveat from `new()`; also, the
/// chunk is dropped when `front` reaches `SIZE`, which can strand `back`
/// pointing into a removed chunk — confirm.
pub(crate) fn pop_front(&mut self) -> Option<Elem> {
match self.chunks.front_mut() {
Some(chunk) if chunk.front <= chunk.back => unsafe {
// SAFETY(claimed): as in `pop_back`, relies on the occupied-range
// invariant holding.
let val = chunk.elems[chunk.front].assume_init();
chunk.elems[chunk.front] = MaybeUninit::uninit().assume_init();
chunk.front += 1;
if chunk.front == SIZE {
self.chunks.pop_front();
}
Some(val)
},
_ => None,
}
}
/// Returns a cursor positioned at the queue's front element.
/// Panics if the chunk list is empty (it never is after `new()`).
pub(crate) fn index_front(&mut self) -> Cursor<'_, Elem, SIZE> {
let chunk_cursor = self.chunks.cursor_front();
let elem_index = self.chunks.front().unwrap().front;
Cursor {
chunk_cursor,
elem_index,
}
}
/// Returns a cursor positioned at the queue's back element.
/// Panics if the chunk list is empty (it never is after `new()`).
pub(crate) fn index_back(&mut self) -> Cursor<'_, Elem, SIZE> {
let chunk_cursor = self.chunks.cursor_back();
let elem_index = self.chunks.back().unwrap().back;
Cursor {
chunk_cursor,
elem_index,
}
}
}

impl<'i, Elem, const SIZE: usize> Cursor<'i, Elem, SIZE> {
/// Advances the cursor one element toward the back, crossing into the next
/// chunk when the current one is exhausted.
///
/// NOTE(review): the guard compares against `front`, but the upper bound of
/// the occupied range is `back` — this looks like it should be
/// `self.elem_index + 1 <= current.back` (or similar). TODO confirm.
pub(crate) fn move_next(&mut self) {
if self.elem_index + 1 < self.chunk_cursor.current().unwrap().front {
self.elem_index += 1;
} else {
self.elem_index = 0;
self.chunk_cursor.move_next()
}
}
/// Moves the cursor one element toward the front, crossing into the
/// previous chunk when the current one is exhausted.
///
/// NOTE(review): the guard compares against `back`, but the lower bound of
/// the occupied range is `front`; also `self.elem_index -= 1` underflows
/// when `elem_index == 0`. Looks like the condition should be
/// `self.elem_index > current.front`. TODO confirm.
pub(crate) fn move_prev(&mut self) {
if self.elem_index < self.chunk_cursor.current().unwrap().back {
self.elem_index -= 1;
} else {
self.elem_index = SIZE - 1;
self.chunk_cursor.move_prev()
}
}
/// Returns a reference to the element under the cursor.
/// (`get_ref` is the nightly `maybe_uninit_ref` API, later stabilized as
/// `assume_init_ref`.)
pub(crate) fn get(&mut self) -> &Elem {
// OK because elem should always be initialized at this point
unsafe { &self.chunk_cursor.current().unwrap().elems[self.elem_index].get_ref() }
}
/// Overwrites the element under the cursor with `v`.
///
/// NOTE(review): casting the `&Chunk` from `current()` to `*mut` and
/// writing through it mutates data behind a shared reference, which is
/// undefined behavior under Rust's aliasing rules even if it "works" —
/// this needs interior mutability (e.g. `UnsafeCell`) or a redesign.
pub(crate) fn set(&mut self, v: Elem) {
// Cursors must be able to mutate elems. Using CursorMut is not possible,
// since the cursor must also be cloneable. Therefore, unsafe is used here.
unsafe {
let ptr = self.chunk_cursor.current().unwrap() as *const Chunk<Elem, SIZE>
as *mut Chunk<Elem, SIZE>;
(*ptr).elems[self.elem_index] = MaybeUninit::new(v);
}
}
}

// Two cursors are equal when they address the same position: same slot index
// and same chunk position within the list (`ChunkCursor::index()` yields the
// chunk's ordinal in its list). NOTE(review): cursors into *different* queues
// at the same position also compare equal — acceptable if comparisons are
// only ever made between cursors of one queue, as DABA does.
impl<'i, Elem, const SIZE: usize> PartialEq for Cursor<'i, Elem, SIZE> {
fn eq(&self, other: &Self) -> bool {
self.elem_index == other.elem_index
&& self.chunk_cursor.index() == other.chunk_cursor.index()
}
}

// Happy-path smoke test: interleave pushes at both ends inside a single
// chunk, then drain and verify FIFO order from the front and LIFO order
// from the back. (Does not exercise chunk overflow or the empty-queue path.)
#[test]
fn test() {
    let mut q = ChunkedArrayQueue::<i32, 512>::new();
    q.push_front(4);
    q.push_front(4);
    q.push_front(7);
    q.push_back(1);
    assert_eq!(q.pop_front(), Some(7));
    assert_eq!(q.pop_front(), Some(4));
    assert_eq!(q.pop_back(), Some(1));
    assert_eq!(q.pop_front(), Some(4));
}
162 changes: 162 additions & 0 deletions rust/src/daba/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
mod chunked_array_queue;
use crate::FifoWindow;
use alga::general::AbstractMonoid;
use alga::general::Operator;
use std::collections::VecDeque;
use std::marker::PhantomData;

// A queued value (`val`) paired with a partial aggregate (`agg`) whose exact
// meaning depends on which DABA segment (delimited by l/r/a/b) the item
// currently occupies — see the invariants documented on `DABA` below.
#[derive(Clone)]
struct Item<Value> {
val: Value,
agg: Value,
}

impl<Value> Item<Value> {
// Pairs a raw value with its initial partial aggregate.
fn new(val: Value, agg: Value) -> Self {
Self { val, agg }
}
}

// De-Amortized Banker's Aggregator: a FIFO sliding-window aggregator over a
// monoid with worst-case O(1) push/pop/query. The deque is partitioned into
// segments by the positions l, r, a, b; each segment caches partial
// aggregates so the window total is always two `operate` calls away.
#[derive(Clone)]
pub struct DABA<Value, BinOp>
where
Value: AbstractMonoid<BinOp> + Clone,
BinOp: Operator,
{
// The ith oldest value in FIFO order is stored at items[i].val
items: VecDeque<Item<Value>>,
// Segment positions, maintained so that 0 ≤ l ≤ r ≤ a ≤ b ≤ items.len().
// (The paper's f and e are implicitly 0 and items.len() here.)
l: usize, // Left, ∀p ∈ l...r−1 : items[p].agg = items[p].val ⊕ ... ⊕ items[r−1].val
r: usize, // Right, ∀p ∈ r...a−1 : items[p].agg = items[r].val ⊕ ... ⊕ items[p].val
a: usize, // Accum, ∀p ∈ a...b−1 : items[p].agg = items[p].val ⊕ ... ⊕ items[b−1].val
b: usize, // Back, ∀p ∈ b...e−1 : items[p].agg = items[b].val ⊕ ... ⊕ items[p].val
// Zero-sized marker tying the struct to its binary operator type.
op: PhantomData<BinOp>,
}

impl<Value, BinOp> FifoWindow<Value, BinOp> for DABA<Value, BinOp>
where
Value: AbstractMonoid<BinOp> + Clone,
BinOp: Operator,
{
// Creates an empty window; all segment positions start at 0.
fn new() -> Self {
Self {
items: VecDeque::new(),
l: 0,
r: 0,
a: 0,
b: 0,
op: PhantomData,
}
}
// Appends `v` to the window. Its cached aggregate extends the running
// back-segment aggregate, then `fixup` restores the segment invariants.
fn push(&mut self, v: Value) {
let agg = self.agg_b().operate(&v);
self.items.push_back(Item::new(v, agg));
self.fixup();
}
fn pop(&mut self) {
if self.items.pop_front().is_some() {
self.l -= 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure decrementing is the right thing here - it may be, but it should be carefully considered.

You've implemented l, r, a and b as indices, which is something we're just now grappling with: we implemented them as iterators, but our presentation in the 2017 paper was kinda loose with are they iterators/pointers or indices. When they're indices, that means f==0 and e==vals.len() is always true, and don't need to explicitly represent them. I think you've correctly done that. But I'm unsure about this decrementing - you're following our published algorithms for fixup(), which did not assume decrementing on eviction. It's also possible there's a subtly I'm missing with the semantics of VecDeque.

Copy link
Contributor Author

@segeljakt segeljakt Jul 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Rust it's possible to overload the indexing operator. The indexing for VecDeque is an alias for get which indexes from the front (including offset). I decrement all indices when removing the front element to shift them left. I believe it should work but I might be mistaken. If the VecDeque was replaced with a regular Vec, whose indexing does not include offset, then I would not decrement.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, now I see what you mean. Calling pop_front() on a VecDeque effectively shifts the entire deque to the left, so if you had an index for location i, accessing q[i] before and after the pop will yield different values. You're decrementing i so that it continues to point at the same value in the deque.

I was wondering if it would be easier to just use an iterator, and I don't think it is. From what I can tell, there's no easy way to "decrement" a standard Rust iterator as is required in the shrink case. So, in effect, you have to do your own iterator management.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what we want is a linked list of arrays with "cursors".

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the concept of a cursor is essentially what C++ style iterators are, and how we ended up presenting the algorithm. The chunked array queue is a linked list, it just has many elements in each node.

self.r -= 1;
self.a -= 1;
self.b -= 1;
self.fixup();
}
}
// The window aggregate: front-segment aggregate ⊕ back-segment aggregate,
// each read in O(1) from the cached partials.
fn query(&self) -> Value {
self.agg_f().operate(&self.agg_b())
}
// Number of values currently in the window.
fn len(&self) -> usize {
self.items.len()
}
// True when the window holds no values.
fn is_empty(&self) -> bool {
self.items.is_empty()
}
}

impl<Value, BinOp> DABA<Value, BinOp>
where
Value: AbstractMonoid<BinOp> + Clone,
BinOp: Operator,
{
// Aggregate of the whole front portion (everything before position b),
// cached at the front item; identity when the window is empty.
#[inline(always)]
fn agg_f(&self) -> Value {
if self.items.is_empty() {
Value::identity()
} else {
self.items.front().unwrap().agg.clone()
}
}
// Aggregate of the back segment [b, len); the back segment stores running
// prefix aggregates, so the last item's agg covers the whole segment.
// Identity when the segment is empty (b == len).
#[inline(always)]
fn agg_b(&self) -> Value {
if self.b == self.items.len() {
Value::identity()
} else {
self.items.back().unwrap().agg.clone()
}
}
// Aggregate of segment [l, r); the L segment stores suffix aggregates, so
// items[l].agg covers the whole segment. Identity when l == r.
#[inline(always)]
fn agg_l(&self) -> Value {
if self.l == self.r {
Value::identity()
} else {
self.items[self.l].agg.clone()
}
}
// Aggregate of segment [r, a); the R segment stores prefix aggregates, so
// items[a-1].agg covers the whole segment. Identity when r == a.
#[inline(always)]
fn agg_r(&self) -> Value {
if self.r == self.a {
Value::identity()
} else {
self.items[self.a - 1].agg.clone()
}
}
// Aggregate of segment [a, b); the A segment stores suffix aggregates, so
// items[a].agg covers the whole segment. Identity when a == b.
#[inline(always)]
fn agg_a(&self) -> Value {
if self.a == self.b {
Value::identity()
} else {
self.items[self.b].agg.clone()
}
}
// Restores the segment invariants after a push or pop by doing O(1) work:
// either re-seed the pointers (empty front), restart a pass (flip), or
// perform one incremental step (shift/shrink). This is the de-amortization
// core of the published DABA algorithm — NOTE(review): kept verbatim; any
// change must be checked against the paper's proof.
#[inline(always)]
fn fixup(&mut self) {
if self.b == 0 {
self.singleton()
} else {
if self.l == self.b {
self.flip()
}
if self.l == self.r {
self.shift()
} else {
self.shrink()
}
}
}
// Front portion is empty: point every position at the end of the deque, so
// all items belong to the back segment.
#[inline(always)]
fn singleton(&mut self) {
self.l = self.items.len();
self.r = self.l;
self.a = self.l;
self.b = self.l;
}
// Start a new pass: the old back segment becomes the new front portion
// (l=0), and a/b move to the end.
#[inline(always)]
fn flip(&mut self) {
self.l = 0;
self.a = self.items.len();
self.b = self.a;
}
// L segment empty: advance l, r, a together by one position.
#[inline(always)]
fn shift(&mut self) {
self.a += 1;
self.r += 1;
self.l += 1;
}
// One incremental combine step: finalize items[l] with the full suffix
// aggregate (L ⊕ R ⊕ A) and fold items[a-1] into the A segment, moving
// both boundaries inward by one.
#[inline(always)]
fn shrink(&mut self) {
self.items[self.l].agg = self.agg_l().operate(&self.agg_r()).operate(&self.agg_a());
self.l += 1;
self.items[self.a - 1].agg = self.items[self.a - 1].val.operate(&self.agg_a());
self.a -= 1;
}
}
7 changes: 7 additions & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
#![feature(linked_list_cursors)]
#![feature(maybe_uninit_ref)]
#![feature(maybe_uninit_extra)]
#![feature(const_generics)]
#![allow(incomplete_features)]
use alga::general::Operator;
use std::ops::Range;

Expand Down Expand Up @@ -57,3 +62,5 @@ pub mod two_stacks;
/// Reactive-Aggregator
pub mod reactive;

/// De-Amortized Banker's Aggregator
pub mod daba;
12 changes: 6 additions & 6 deletions rust/tests/fifo_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ where
}

test_matrix! {
test1 => [ recalc::ReCalc, soe::SoE, reactive::Reactive, two_stacks::TwoStacks ],
test2 => [ recalc::ReCalc, soe::SoE, reactive::Reactive, two_stacks::TwoStacks ],
test3 => [ recalc::ReCalc, reactive::Reactive, two_stacks::TwoStacks ],
test4 => [ recalc::ReCalc, soe::SoE, reactive::Reactive, two_stacks::TwoStacks ],
test5 => [ recalc::ReCalc, soe::SoE, reactive::Reactive, two_stacks::TwoStacks ],
test6 => [ recalc::ReCalc, soe::SoE, reactive::Reactive, two_stacks::TwoStacks ]
test1 => [ recalc::ReCalc, soe::SoE, reactive::Reactive, two_stacks::TwoStacks, daba::DABA ],
test2 => [ recalc::ReCalc, soe::SoE, reactive::Reactive, two_stacks::TwoStacks, daba::DABA ],
test3 => [ recalc::ReCalc, reactive::Reactive, two_stacks::TwoStacks, daba::DABA ],
test4 => [ recalc::ReCalc, soe::SoE, reactive::Reactive, two_stacks::TwoStacks, daba::DABA ],
test5 => [ recalc::ReCalc, soe::SoE, reactive::Reactive, two_stacks::TwoStacks, daba::DABA ],
test6 => [ recalc::ReCalc, soe::SoE, reactive::Reactive, two_stacks::TwoStacks, daba::DABA ]
}