From 744a81fe580172734fe35014300da7ab4f9dd1ed Mon Sep 17 00:00:00 2001 From: ad hoc Date: Sun, 15 Oct 2023 20:49:15 +0200 Subject: [PATCH] wip --- Cargo.lock | 20 ++ sqld/Cargo.toml | 1 + sqld/proto/replication_log.proto | 7 +- sqld/src/namespace/mod.rs | 5 + sqld/src/replication/primary/mod.rs | 1 + sqld/src/replication/primary/storage/mod.rs | 196 ++++++++++++++++++++ sqld/src/replication/snapshot.rs | 42 ++++- 7 files changed, 267 insertions(+), 5 deletions(-) create mode 100644 sqld/src/replication/primary/storage/mod.rs diff --git a/Cargo.lock b/Cargo.lock index cdb03085..27a0fff8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3516,6 +3516,15 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.22" @@ -3862,6 +3871,7 @@ dependencies = [ "url", "uuid", "vergen", + "walkdir", ] [[package]] @@ -4617,6 +4627,16 @@ dependencies = [ "libc", ] +[[package]] +name = "walkdir" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" diff --git a/sqld/Cargo.toml b/sqld/Cargo.toml index 9ef5e1d2..d3df4ba5 100644 --- a/sqld/Cargo.toml +++ b/sqld/Cargo.toml @@ -72,6 +72,7 @@ async-stream = "0.3.5" libsql = { git = "https://github.com/tursodatabase/libsql", rev = "8847ca05c", optional = true } metrics = "0.21.1" metrics-exporter-prometheus = "0.12.1" +walkdir = "2.4.0" [dev-dependencies] proptest = "1.0.0" diff --git a/sqld/proto/replication_log.proto b/sqld/proto/replication_log.proto index 8ac5db45..3f2da4d0 100644 --- a/sqld/proto/replication_log.proto +++ b/sqld/proto/replication_log.proto @@ -19,9 +19,14 @@ message Frames { repeated Frame frames = 1; } +message BatchEntriesResponse { + Frames frames = 1; + HelloResponse hs_response = 2; +} + service ReplicationLog { rpc Hello(HelloRequest) returns (HelloResponse) {} rpc LogEntries(LogOffset) returns (stream Frame) {} - rpc BatchLogEntries(LogOffset) returns (Frames) {} + rpc BatchLogEntries(LogOffset) returns (BatchEntriesResponse) {} rpc Snapshot(LogOffset) returns (stream Frame) {} } diff --git a/sqld/src/namespace/mod.rs b/sqld/src/namespace/mod.rs index 647a9e3c..afbc64e8 100644 --- a/sqld/src/namespace/mod.rs +++ b/sqld/src/namespace/mod.rs @@ -91,6 +91,11 @@ impl NamespaceName { pub fn as_slice(&self) -> &[u8] { &self.0 } + + /// return the u128 hash of the namespace + pub fn id(&self) -> u128{ + todo!(); + } } impl fmt::Display for NamespaceName { diff --git a/sqld/src/replication/primary/mod.rs b/sqld/src/replication/primary/mod.rs index a7d0eb84..03474d28 100644 --- a/sqld/src/replication/primary/mod.rs +++ b/sqld/src/replication/primary/mod.rs @@ -1,2 +1,3 @@ pub mod frame_stream; pub mod logger; +mod storage; diff --git a/sqld/src/replication/primary/storage/mod.rs b/sqld/src/replication/primary/storage/mod.rs new file mode 100644 index 00000000..7164cb4a --- /dev/null +++ b/sqld/src/replication/primary/storage/mod.rs @@ -0,0 +1,196 @@ +use std::cmp::Ordering; +use std::collections::HashSet; +use std::fs::create_dir_all; +use std::io::{Seek, Write, SeekFrom}; +use std::mem::size_of; +use std::path::{PathBuf, Path}; +use std::pin::Pin; +use std::task::{Poll, Context}; + +use bytemuck::{Zeroable, Pod, bytes_of}; +use futures::StreamExt; +use futures_core::Stream; +use walkdir::{WalkDir, DirEntry}; + +use crate::LIBSQL_PAGE_SIZE; +use crate::replication::frame::{FrameHeader, FrameMut}; +use crate::replication::snapshot::{SnapshotFileHeader, SnapshotFile}; +use crate::replication::{FrameNo, frame::Frame}; +use crate::namespace::NamespaceName; + +pub struct Compactor { + wal: W, + seen: HashSet, +} + +impl Compactor { + /// calls f on the deduplicated frames of the WAL, in reverse frame order. + /// returns the (start_frame_no, end_frame_no, page_count, size_after) + fn frames_with(mut self, mut f: F) -> (FrameNo, FrameNo, u64, u32) + where F: FnMut(&LibsqlFrame, FrameNo) + { + let wal_frame_count = self.wal.frame_count(); + let mut snapshot_frame_count = 0; + let mut size_after = 0; + + while let Some(frame) = self.wal.next_frame_back() { + let frame_no = wal_frame_count - self.seen.len(); + if !self.seen.contains(&frame.header.page_number) { + self.seen.insert(frame.header.page_number); + f(frame, frame_no as FrameNo); + if snapshot_frame_count == 0 { + size_after = frame.header.size_after; + } + snapshot_frame_count += 1; + } + } + + let start_frame_no = self.wal.start_frame_no(); + let end_frame_no = start_frame_no + self.wal.frame_count() as u64; + + (start_frame_no, end_frame_no, snapshot_frame_count, size_after) + } +} + +pub trait Wal { + /// Returns the start frame_no for the current WAL + fn start_frame_no(&self) -> FrameNo; + /// Returns the next frame from the WAL, starting from the end of the WAL. + fn next_frame_back<'a>(&'a mut self) -> Option<&'a LibsqlFrame>; + /// returns the number of frames in the WAL + fn frame_count(&self) -> usize; +} + +pub trait Storage { + type LocateFrameStream: Stream; + + fn checkpoint_wal( + &self, + namespace: NamespaceName, + reader: Compactor, + ) + where W: Wal + Send; + + fn locate(&self, namespace: NamespaceName, frame_no: FrameNo) -> Self::LocateFrameStream; +} + +pub struct FsStorage { + path: PathBuf, +} + +impl Storage for FsStorage { + type LocateFrameStream = FsFrameStreamer; + + fn checkpoint_wal( + &self, + namespace: NamespaceName, + compactor: Compactor, + ) + where W: Wal + Send + { + let mut file = tempfile::NamedTempFile::new().unwrap(); + file.seek(std::io::SeekFrom::Start(size_of::() as _)).unwrap(); + let (start_frame_no, end_frame_no, frame_count, size_after) = compactor.frames_with(|frame, frame_no| { + let frame_header = FrameHeader { + frame_no, + checksum: 0, + page_no: frame.header.page_number, + size_after: 0, + }; + + file.write_all(bytes_of(&frame_header)).unwrap(); + file.write_all(&frame.data).unwrap(); + }); + + let snapshot_header = SnapshotFileHeader { + namespace_id: namespace.id(), + start_frame_no, + end_frame_no, + frame_count, + size_after, + _pad: 0, + }; + + file.seek(SeekFrom::Start(0)).unwrap(); + file.write_all(bytes_of(&snapshot_header)).unwrap(); + + let ns_snapshot_path = self.path.join(namespace.as_str()); + + create_dir_all(&ns_snapshot_path).unwrap(); + + let snapshot_name = format!("{start_frame_no}-{end_frame_no}"); + let snapshot_path = ns_snapshot_path.join(snapshot_name); + + file.persist(snapshot_path).unwrap(); + } + + fn locate(&self, namespace: NamespaceName, mut frame_no: FrameNo) -> Self::LocateFrameStream { + let path = self.path.join(namespace.as_str()); + FsFrameStreamer { + frame_no, + path, + current_snapshot: None, + } + } +} + +struct FsFrameStreamer { + frame_no: FrameNo, + path: PathBuf, + current_snapshot: Option>>>>, +} + +impl Stream for FsFrameStreamer { + type Item = Frame; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.current_snapshot.is_none() { + let Some(snapshot) = locate_snapshot_file(&self.path, self.frame_no) else { return Poll::Ready(None) }; + let stream = snapshot.into_stream_from(self.frame_no).into(); + self.current_snapshot.replace(Box::pin(stream)); + } + + let current = self.current_snapshot.as_mut().unwrap(); + + match current.poll_next_unpin(cx) + } +} + +fn locate_snapshot_file(dir: &Path, frame_no: FrameNo) -> Option { + for entry in WalkDir::new(dir) { + let entry = entry.unwrap(); + let mut split = entry.file_name().to_str().unwrap().split("-"); + let start_fno: FrameNo = split.next().unwrap().parse().unwrap(); + let end_fno: FrameNo = split.next().unwrap().parse().unwrap(); + if (start_fno..=end_fno).contains(&frame_no) { + // FIXME: there is a chance that the snapshot we're trying to open was deleted, we + // should try to relocate the next snapshot + return Some(SnapshotFile::open(entry.path()).unwrap()) + } + } + + None +} + +#[derive(Debug, Copy, Clone, Zeroable, Pod, PartialEq, Eq)] +#[repr(C)] +pub struct LibsqlFrame { + header: LibsqlFrameHeader, + data: [u8; LIBSQL_PAGE_SIZE as usize], +} + +#[derive(Debug, Copy, Clone, Zeroable, Pod, PartialEq, Eq)] +#[repr(C)] +pub struct LibsqlFrameHeader { + page_number: u32, + size_after: u32, + salt1: u32, + salt2: u32, + checksum1: u32, + checksum2: u32, +} + +#[cfg(test)] +mod test { + use super::*; +} diff --git a/sqld/src/replication/snapshot.rs b/sqld/src/replication/snapshot.rs index a9e2fd35..f7593ad8 100644 --- a/sqld/src/replication/snapshot.rs +++ b/sqld/src/replication/snapshot.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::fs::File; use std::io::BufWriter; +use std::io::SeekFrom; use std::io::Write; use std::mem::size_of; use std::os::unix::prelude::FileExt; @@ -14,9 +15,13 @@ use anyhow::Context; use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable}; use bytes::BytesMut; use crossbeam::channel::bounded; +use futures::StreamExt; +use futures_core::Stream; use once_cell::sync::Lazy; use regex::Regex; use tempfile::NamedTempFile; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; use uuid::Uuid; use crate::namespace::NamespaceName; @@ -35,7 +40,7 @@ const MAX_SNAPSHOT_NUMBER: usize = 32; #[repr(C)] pub struct SnapshotFileHeader { /// id of the database - pub log_id: u128, + pub namespace_id: u128, /// first frame in the snapshot pub start_frame_no: u64, /// end frame in the snapshot @@ -174,6 +179,35 @@ impl SnapshotFile { }) } + pub fn into_stream(self) -> impl Stream> { + async_stream::try_stream! { + let mut current_offset = 0; + let mut file = tokio::fs::File::from_std(self.file); + loop { + if current_offset >= self.header.frame_count { + break; + } + let read_offset = size_of::() as u64 + + current_offset * LogFile::FRAME_SIZE as u64; + current_offset += 1; + let mut buf = BytesMut::zeroed(LogFile::FRAME_SIZE); + file.seek(SeekFrom::Start(read_offset)).await?; + file.read_exact(&mut buf).await?; + let frame = FrameMut::try_from(&*buf)?; + yield frame; + } + } + } + + pub fn into_stream_from(self, from: FrameNo) -> impl Stream> { + self.into_stream().take_while(move |frame| std::future::ready({ + match frame { + Ok(f) => f.header().frame_no >= from, + Err(_) => true, + } + })) + } + pub fn header(&self) -> &SnapshotFileHeader { &self.header } @@ -409,7 +443,7 @@ impl SnapshotBuilder { Ok(Self { seen_pages: HashSet::new(), header: SnapshotFileHeader { - log_id: log_id.as_u128(), + namespace_id: log_id.as_u128(), start_frame_no: u64::MAX, end_frame_no: u64::MIN, frame_count: 0, @@ -468,7 +502,7 @@ impl SnapshotBuilder { file.as_file().write_all_at(bytes_of(&self.header), 0)?; let snapshot_name = format!( "{}-{}-{}.snap", - Uuid::from_u128(self.header.log_id), + Uuid::from_u128(self.header.namespace_id), self.header.start_frame_no, self.header.end_frame_no, ); @@ -544,7 +578,7 @@ mod test { assert_eq!(header.start_frame_no, 0); assert_eq!(header.end_frame_no, 49); assert_eq!(header.frame_count, 25); - assert_eq!(header.log_id, log_id.as_u128()); + assert_eq!(header.namespace_id, log_id.as_u128()); assert_eq!(header.size_after, 25); let mut seen_frames = HashSet::new();