From 41e7e8c7dc364e481749a5295a9f928e0b90a76a Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Wed, 5 Apr 2023 14:15:20 +0200 Subject: [PATCH] bottomless: add read-only replicas that read from S3 This commit adds support for using bottomless in "read-only replica" mode. In this mode, all data is read exclusively from S3 and nothing gets restored to disk. FIXME: it also contains a terrible, terrible hack in order to work - on startup, the first page of the database is restored from S3, and the database file is truncated to match the original DB size, thus making libSQL think it's a properly structured database file. After that, all data is read only from S3. A proper solution is to ship bottomless WAL with a bottomless VFS implementation - with the VFS set up to read pages from S3. Effectively, it will only ever read the first db page from there, and from then on it will just read everything from S3. --- Cargo.lock | 6 +- bottomless-cli/src/replicator_extras.rs | 28 ++- bottomless/Cargo.toml | 2 + bottomless/Makefile | 4 +- bottomless/src/ffi.rs | 5 +- bottomless/src/lib.rs | 253 ++++++++++++++++++++++-- bottomless/src/replicator.rs | 124 +++++++++++- bottomless/test/restore_test.sql | 1 + bottomless/test/smoke_test.sql | 1 - sqld-libsql-bindings/src/ffi/mod.rs | 5 +- 10 files changed, 397 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 19810780..139e1293 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -721,7 +721,7 @@ dependencies = [ [[package]] name = "bottomless" -version = "0.1.17" +version = "0.1.18" dependencies = [ "anyhow", "async-compression", @@ -2273,7 +2273,7 @@ dependencies = [ [[package]] name = "libsqlite3-sys" version = "0.26.0" -source = "git+https://github.com/psarna/rusqlite?rev=63b7aabfccbc21738#63b7aabfccbc217384225604cc419f2cf792b8d1" +source = "git+https://github.com/psarna/rusqlite?rev=a6332e530f30dc2d47110#a6332e530f30dc2d471103eed96a650407a73c7a" dependencies = [ "bindgen", "cc", @@ -3455,7 +3455,7 @@ dependencies = [ 
[[package]] name = "rusqlite" version = "0.29.0" -source = "git+https://github.com/psarna/rusqlite?rev=63b7aabfccbc21738#63b7aabfccbc217384225604cc419f2cf792b8d1" +source = "git+https://github.com/psarna/rusqlite?rev=a6332e530f30dc2d47110#a6332e530f30dc2d471103eed96a650407a73c7a" dependencies = [ "bitflags 2.0.2", "fallible-iterator", diff --git a/bottomless-cli/src/replicator_extras.rs b/bottomless-cli/src/replicator_extras.rs index a8d8d912..64e8c86c 100644 --- a/bottomless-cli/src/replicator_extras.rs +++ b/bottomless-cli/src/replicator_extras.rs @@ -60,7 +60,33 @@ impl Replicator { ); } Err(aws_sdk_s3::types::SdkError::ServiceError(err)) if err.err().is_no_such_key() => { - println!("\tno main database snapshot file found") + match self + .client + .get_object_attributes() + .bucket(&self.bucket) + .key(format!("{}-{}/db.gz", self.db_name, generation)) + .object_attributes(aws_sdk_s3::model::ObjectAttributes::ObjectSize) + .send() + .await { + Ok(attrs) => { + println!("\tmain database snapshot:"); + println!("\t\tobject size: {}", attrs.object_size()); + println!( + "\t\tlast modified: {}", + attrs + .last_modified() + .map(|s| s + .fmt(aws_smithy_types::date_time::Format::DateTime) + .unwrap_or_else(|e| e.to_string())) + .as_deref() + .unwrap_or("never") + ); + } + Err(aws_sdk_s3::types::SdkError::ServiceError(err)) if err.err().is_no_such_key() => { + println!("\tmain database snapshot: not found"); + } + Err(e) => println!("\tfailed to fetch main database snapshot info: {e}"), + } } Err(e) => println!("\tfailed to fetch main database snapshot info: {e}"), }; diff --git a/bottomless/Cargo.toml b/bottomless/Cargo.toml index 3b91729e..82b69912 100644 --- a/bottomless/Cargo.toml +++ b/bottomless/Cargo.toml @@ -13,6 +13,7 @@ anyhow = "1.0.66" async-compression = { version = "0.3.15", features = ["tokio", "gzip"] } aws-config = { version = "0.52.0" } aws-sdk-s3 = { version = "0.22.0" } +byteorder = "1.4.3" bytes = "1" crc = "3.0.0" futures = { version = "0.3.25" 
} @@ -24,6 +25,7 @@ uuid = { version = "1.3", features = ["v7"] } [features] libsql_linked_statically = [] +init_tracing_statically = [] [lib] crate-type = ["rlib", "staticlib"] diff --git a/bottomless/Makefile b/bottomless/Makefile index 7575cc91..6e2ce2d3 100644 --- a/bottomless/Makefile +++ b/bottomless/Makefile @@ -1,10 +1,10 @@ all: debug debug: bottomless.c src/lib.rs - cargo build -p bottomless && clang -Wall -fPIC -shared -DLIBSQL_ENABLE_BOTTOMLESS_WAL bottomless.c -I${LIBSQL_DIR} ../target/debug/libbottomless.a -o ../target/debug/bottomless.so + cargo build -p bottomless -Finit_tracing_statically && clang -Wall -fPIC -shared -DLIBSQL_ENABLE_BOTTOMLESS_WAL bottomless.c -I${LIBSQL_DIR} ../target/debug/libbottomless.a -o ../target/debug/bottomless.so release: bottomless.c src/lib.rs - cargo build -p bottomless -j1 --quiet --release && \ + cargo build -p bottomless -Finit_tracing_statically -j1 --quiet --release && \ clang -fPIC -shared -DLIBSQL_ENABLE_BOTTOMLESS_WAL bottomless.c -I${LIBSQL_DIR} ../target/release/libbottomless.a \ -o ../target/release/bottomless.so diff --git a/bottomless/src/ffi.rs b/bottomless/src/ffi.rs index e8dd2239..1c694b4a 100644 --- a/bottomless/src/ffi.rs +++ b/bottomless/src/ffi.rs @@ -1,6 +1,7 @@ pub use sqld_libsql_bindings::ffi::{ - libsql_wal_methods, sqlite3, sqlite3_file, sqlite3_vfs, PageHdrIter, PgHdr, Wal, WalIndexHdr, - SQLITE_CANTOPEN, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR_WRITE, SQLITE_OK, + libsql_wal, libsql_wal_methods, sqlite3, sqlite3_file, sqlite3_vfs, PageHdrIter, PgHdr, Wal, + WalIndexHdr, SQLITE_CANTOPEN, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR_READ, + SQLITE_IOERR_WRITE, SQLITE_OK, SQLITE_READONLY, }; #[repr(C)] diff --git a/bottomless/src/lib.rs b/bottomless/src/lib.rs index e290fdde..fb86b138 100644 --- a/bottomless/src/lib.rs +++ b/bottomless/src/lib.rs @@ -49,12 +49,75 @@ pub extern "C" fn xOpen( std::ffi::CStr::from_ptr(wal_name).to_str().unwrap() }); + let readonly_replica = 
std::env::var("LIBSQL_BOTTOMLESS_READONLY_REPLICA") + .map(|v| v == "1" || v == "true") + .unwrap_or(false); + + tracing::info!("Readonly replica: {readonly_replica}"); + let orig_methods = unsafe { &*(*(methods as *mut bottomless_methods)).underlying_methods }; - let rc = unsafe { - (orig_methods.xOpen.unwrap())(vfs, db_file, wal_name, no_shm_mode, max_size, methods, wal) - }; - if rc != ffi::SQLITE_OK { - return rc; + + if readonly_replica { + tracing::debug!("Allocating new readonly WAL"); + let new_wal = Box::leak(Box::new(ffi::libsql_wal { + pVfs: vfs, + pDbFd: db_file, + pWalFd: std::ptr::null_mut(), + iCallback: 0, + mxWalSize: max_size, + szFirstBlock: 4096, + nWiData: 0, + apWiData: std::ptr::null_mut(), + szPage: 4096, + readLock: 0, + syncFlags: 0, + exclusiveMode: 1, + writeLock: 0, + ckptLock: 0, + readOnly: 1, + truncateOnCommit: 0, + syncHeader: 0, + padToSectorBoundary: 0, + bShmUnreliable: 1, + hdr: ffi::WalIndexHdr { + iVersion: 1, + unused: 0, + iChange: 0, + isInit: 0, + bigEndCksum: 0, + szPage: 4096, + mxFrame: u32::MAX, + nPage: 1, + aFrameCksum: [0, 0], + aSalt: [0, 0], + aCksum: [0, 0], + }, + minFrame: 1, + iReCksum: 0, + zWalName: "bottomless\0".as_ptr() as *const c_char, + nCkpt: 0, + lockError: 0, + pSnapshot: std::ptr::null_mut(), + db: std::ptr::null_mut(), + pMethods: methods, + pMethodsData: std::ptr::null_mut(), // fixme: validate + })); + unsafe { *wal = new_wal as *mut ffi::libsql_wal } + } else { + let rc = unsafe { + (orig_methods.xOpen.unwrap())( + vfs, + db_file, + wal_name, + no_shm_mode, + max_size, + methods, + wal, + ) + }; + if rc != ffi::SQLITE_OK { + return rc; + } } if !is_regular(vfs) { @@ -78,7 +141,15 @@ pub extern "C" fn xOpen( } }; - let replicator = block_on!(runtime, replicator::Replicator::new()); + let replicator = block_on!( + runtime, + replicator::Replicator::create(replicator::Options { + create_bucket_if_not_exists: false, + verify_crc: true, + use_compression: false, + readonly_replica, + }) + ); let 
mut replicator = match replicator { Ok(repl) => repl, Err(e) => { @@ -110,7 +181,7 @@ pub extern "C" fn xOpen( }; let context_ptr = Box::into_raw(Box::new(context)) as *mut c_void; unsafe { (*(*wal)).pMethodsData = context_ptr }; - + tracing::debug!("bottomless WAL initialized"); ffi::SQLITE_OK } @@ -132,8 +203,17 @@ pub extern "C" fn xClose( z_buf: *mut u8, ) -> i32 { tracing::debug!("Closing wal"); + let ctx = get_replicator_context(wal); let orig_methods = get_orig_methods(wal); let methods_data = unsafe { (*wal).pMethodsData as *mut replicator::Context }; + + if ctx.replicator.readonly_replica { + tracing::debug!("Read-only replica, skipping WAL close"); + let _box = unsafe { Box::from_raw(methods_data) }; + let _wal = unsafe { Box::from_raw(wal) }; + return ffi::SQLITE_OK; + } + let rc = unsafe { (orig_methods.xClose.unwrap())(wal, db, sync_flags, n_buf, z_buf) }; if rc != ffi::SQLITE_OK { return rc; @@ -151,27 +231,81 @@ pub extern "C" fn xLimit(wal: *mut Wal, limit: i64) { pub extern "C" fn xBeginReadTransaction(wal: *mut Wal, changed: *mut i32) -> i32 { let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xBeginReadTransaction.unwrap())(wal, changed) } + let ctx = get_replicator_context(wal); + if ctx.replicator.readonly_replica { + tracing::debug!( + "Started read transaction in readonly replica mode. 
changed == {}", + unsafe { *changed } + ); + unsafe { *changed = 1 } + tracing::debug!("Fetching latest generation information"); + let generation = block_on!(ctx.runtime, ctx.replicator.find_newest_generation()); + match generation { + Some(gen) => { + tracing::debug!("Latest generation: {gen}"); + ctx.replicator.set_generation(gen); + } + None => { + tracing::debug!("No generation found"); + return ffi::SQLITE_IOERR_READ; + } + } + ffi::SQLITE_OK + } else { + unsafe { (orig_methods.xBeginReadTransaction.unwrap())(wal, changed) } + } } pub extern "C" fn xEndReadTransaction(wal: *mut Wal) { + let ctx = get_replicator_context(wal); + if ctx.replicator.readonly_replica { + tracing::debug!("Ended read transaction in readonly replica mode"); + return; + } let orig_methods = get_orig_methods(wal); unsafe { (orig_methods.xEndReadTransaction.unwrap())(wal) } } pub extern "C" fn xFindFrame(wal: *mut Wal, pgno: u32, frame: *mut u32) -> i32 { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xFindFrame.unwrap())(wal, pgno, frame) } + let ctx = get_replicator_context(wal); + if ctx.replicator.readonly_replica { + tracing::debug!("Finding frame for page {pgno}"); + unsafe { *frame = pgno }; + ffi::SQLITE_OK + } else { + let orig_methods = get_orig_methods(wal); + unsafe { (orig_methods.xFindFrame.unwrap())(wal, pgno, frame) } + } } pub extern "C" fn xReadFrame(wal: *mut Wal, frame: u32, n_out: i32, p_out: *mut u8) -> i32 { - let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xReadFrame.unwrap())(wal, frame, n_out, p_out) } + let ctx = get_replicator_context(wal); + if ctx.replicator.readonly_replica { + tracing::debug!("Reading newest page {frame} from the replicator. 
n_out == {n_out}"); + match block_on!(ctx.runtime, ctx.replicator.read_newest_page(frame)) { + Ok(page) => { + unsafe { + std::ptr::copy_nonoverlapping(page.as_ptr(), p_out, page.len()); + }; + tracing::trace!("Copied {} bytes", page.len()); + ffi::SQLITE_OK + } + Err(e) => { + tracing::error!("Failed to read page {frame}: {e}"); + ffi::SQLITE_IOERR_READ + } + } + } else { + let orig_methods = get_orig_methods(wal); + unsafe { (orig_methods.xReadFrame.unwrap())(wal, frame, n_out, p_out) } + } } pub extern "C" fn xDbsize(wal: *mut Wal) -> u32 { let orig_methods = get_orig_methods(wal); - unsafe { (orig_methods.xDbsize.unwrap())(wal) } + let size = unsafe { (orig_methods.xDbsize.unwrap())(wal) }; + tracing::debug!("DB size: {size}"); + size } pub extern "C" fn xBeginWriteTransaction(wal: *mut Wal) -> i32 { @@ -240,8 +374,15 @@ pub extern "C" fn xFrames( sync_flags: i32, ) -> i32 { let mut last_consistent_frame = 0; + let ctx = get_replicator_context(wal); + + tracing::warn!("READONLY {}", ctx.replicator.readonly_replica); + if !is_local() && ctx.replicator.readonly_replica { + tracing::error!("Attempt to write to a read-only replica"); + return ffi::SQLITE_READONLY; + } + if !is_local() { - let ctx = get_replicator_context(wal); let last_valid_frame = unsafe { (*wal).hdr.mxFrame }; ctx.replicator.register_last_valid_frame(last_valid_frame); // In theory it's enough to set the page size only once, but in practice @@ -288,7 +429,6 @@ pub extern "C" fn xFrames( return rc; } - let ctx = get_replicator_context(wal); if is_commit != 0 { let frame_checksum = unsafe { (*wal).hdr.aFrameCksum }; @@ -374,6 +514,11 @@ pub extern "C" fn xCheckpoint( return ffi::SQLITE_OK; } + if ctx.replicator.readonly_replica { + tracing::debug!("Read-only replica, not snapshotting"); + return ffi::SQLITE_OK; + } + ctx.replicator.new_generation(); tracing::debug!("Snapshotting after checkpoint"); let result = block_on!(ctx.runtime, ctx.replicator.snapshot_main_db_file()); @@ -429,6 +574,11 @@ 
pub extern "C" fn xGetPathname(buf: *mut c_char, orig: *const c_char, orig_len: } async fn try_restore(replicator: &mut replicator::Replicator) -> i32 { + if replicator.readonly_replica { + tracing::debug!("Read-only replica, not restoring"); + return ffi::SQLITE_OK; + } + match replicator.restore().await { Ok(replicator::RestoreAction::None) => (), Ok(replicator::RestoreAction::SnapshotMainDbFile) => { @@ -493,6 +643,7 @@ pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const create_bucket_if_not_exists: true, verify_crc: true, use_compression: false, + readonly_replica: false, }) ); let mut replicator = match replicator { @@ -504,6 +655,76 @@ pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const }; replicator.register_db(path); + + let readonly_replica = std::env::var("LIBSQL_BOTTOMLESS_READONLY_REPLICA") + .map(|v| v == "1" || v == "true") + .unwrap_or(false); + + if readonly_replica { + use byteorder::ReadBytesExt; + use std::io::{Seek, Write}; + + let generation = block_on!(runtime, replicator.find_newest_generation()); + match generation { + Some(gen) => { + tracing::debug!("Latest generation: {gen}"); + replicator.set_generation(gen); + } + None => { + tracing::debug!("No generation found"); + return ffi::SQLITE_IOERR_READ; + } + } + + tracing::info!("Running in read-only replica mode"); + tracing::warn!("Rewriting the database with only its first page restored from S3. 
That needs to happen with a specialized VFS instead"); + let page = match block_on!(runtime, replicator.read_newest_page(1)) { + Ok(page) => page, + Err(e) => { + tracing::error!("Failed to read page 1: {e}"); + return ffi::SQLITE_CANTOPEN; + } + }; + + let mut file = match std::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&replicator.db_path) + { + Ok(file) => file, + Err(e) => { + tracing::error!("Failed to open the main database file for writing: {}", e); + return ffi::SQLITE_CANTOPEN; + } + }; + + file.write_all(&page).ok(); + + // FIXME: terrible hack - download page 1 and truncate the database to proper size + // in order to trick SQLite into thinking that the database is in a consistent state. + // A proper solution is to ship with a custom VFS that can read page 1 from S3 directly. + let page_size = file + .seek(std::io::SeekFrom::Start(16)) + .and_then(|_| file.read_u16::<byteorder::BigEndian>()) + .unwrap_or(4096); + let page_size = if page_size == 1 { + 65536 + } else { + page_size as u32 + }; + let db_file_size = file + .seek(std::io::SeekFrom::Start(28)) + .and_then(|_| file.read_u32::<byteorder::BigEndian>()) + .unwrap_or(0); + tracing::warn!("Page size: {page_size}, db file size: {db_file_size}"); + + file.set_len(db_file_size as u64 * page_size as u64).ok(); + tracing::warn!("Just overwritten file {}", &replicator.db_path); + + return ffi::SQLITE_OK; + } + block_on!(runtime, try_restore(&mut replicator)) } @@ -578,6 +799,8 @@ pub mod static_init { static INIT: std::sync::Once = std::sync::Once::new(); INIT.call_once(|| { crate::bottomless_init(); + #[cfg(feature = "init_tracing_statically")] + crate::bottomless_tracing_init(); let orig_methods = unsafe { libsql_wal_methods_find(std::ptr::null()) }; if orig_methods.is_null() {} let methods = crate::bottomless_methods(orig_methods); diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 87a2ddf6..97312620 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -32,6
+32,7 @@ pub struct Replicator { pub db_name: String, use_compression: bool, + pub readonly_replica: bool, } #[derive(Debug)] @@ -52,16 +53,19 @@ pub struct Options { pub create_bucket_if_not_exists: bool, pub verify_crc: bool, pub use_compression: bool, + pub readonly_replica: bool, } impl Replicator { pub const UNSET_PAGE_SIZE: usize = usize::MAX; + pub const ENCODED_FRAME_BASE: u32 = u32::MAX; pub async fn new() -> Result<Self> { Self::create(Options { create_bucket_if_not_exists: false, verify_crc: true, use_compression: false, + readonly_replica: false, }) .await } @@ -109,6 +113,7 @@ impl Replicator { db_path: String::new(), db_name: String::new(), use_compression: options.use_compression, + readonly_replica: options.readonly_replica, }) } @@ -246,9 +251,10 @@ impl Replicator { tracing::warn!("Unexpected truncated page of size {}", data.len()) } + let encoded_frame = Self::ENCODED_FRAME_BASE - frame; let key = format!( "{}-{}/{:012}-{:012}-{:016x}", - self.db_name, self.generation, frame, pgno, crc + self.db_name, self.generation, pgno, encoded_frame, crc ); let body: ByteStream = if self.use_compression { @@ -306,6 +312,107 @@ impl Replicator { Ok(()) } + async fn read_newest_page_from_wal(&mut self, pgno: u32) -> Result<Vec<u8>> { + use tokio::io::AsyncReadExt; + + let prefix = format!("{}-{}/{:012}-", self.db_name, self.generation, pgno); + tracing::trace!("Reading newest page from wal: {prefix}"); + let response = self + .client + .list_objects() + .bucket(&self.bucket) + .prefix(prefix) + .max_keys(1) + .send() + .await?; + + let objs = response + .contents + .ok_or_else(|| anyhow::anyhow!("No pages found"))?; + let key = objs + .first() + .ok_or_else(|| anyhow::anyhow!("No pages found"))?
+ .key + .as_ref() + .ok_or_else(|| anyhow::anyhow!("No pages found"))?; + tracing::trace!("Found key {key}"); + let obj = self + .client + .get_object() + .bucket(&self.bucket) + .key(key) + .send() + .await?; + let reader = obj.body.into_async_read(); + let mut buf = Vec::with_capacity(self.page_size); + let mut reader = tokio::io::BufReader::new(reader); + if self.use_compression { + let mut reader = async_compression::tokio::bufread::GzipDecoder::new(reader); + reader.read_to_end(&mut buf).await?; + } else { + reader.read_to_end(&mut buf).await?; + } + + tracing::trace!("Read page {pgno} from wal: {}B", buf.len()); + Ok(buf) + } + + async fn read_newest_page_from_main_db_file(&mut self, pgno: u32) -> Result<Vec<u8>> { + use tokio::io::AsyncReadExt; + + if self.use_compression { + return Err(anyhow::anyhow!( + "Cannot read pages from main db file when compression is enabled" + )); + } + + let range_start = (pgno - 1) * self.page_size as u32; + let range_end = pgno * self.page_size as u32 - 1; + + tracing::debug!( + "Reading page {} from main db file. Range: {}-{}", + pgno, + range_start, + range_end + ); + let obj = self + .client + .get_object() + .bucket(&self.bucket) + .range(format!("bytes={range_start}-{range_end}")) + .key(format!("{}-{}/db.db", self.db_name, self.generation)) + .send() + .await?; + + let reader = obj.body.into_async_read(); + + let mut buf: Vec<u8> = Vec::with_capacity(self.page_size); + let mut reader = tokio::io::BufReader::new(reader); + reader.read_to_end(&mut buf).await?; + + tracing::trace!("Read {} bytes", buf.len()); + Ok(buf) + } + + pub async fn read_newest_page(&mut self, pgno: u32) -> Result<Vec<u8>> { + if self.page_size == Self::UNSET_PAGE_SIZE { + let page_size = std::env::var("LIBSQL_BOTTOMLESS_PAGE_SIZE") + .ok() + .and_then(|s| s.parse::<usize>().ok()) + .unwrap_or(4096); + tracing::warn!("Setting page size to {page_size}.
If this is not correct, please set it with LIBSQL_BOTTOMLESS_PAGE_SIZE env variable."); + self.set_page_size(page_size).ok(); + } + + match self.read_newest_page_from_wal(pgno).await { + Ok(page) => Ok(page), + Err(e) => { + tracing::debug!("Failed to read page from WAL: {e}. Let's try main db file"); + self.read_newest_page_from_main_db_file(pgno).await + } + } + } + // Drops uncommitted frames newer than given last valid frame pub fn rollback_to_frame(&mut self, last_valid_frame: u32) { // NOTICE: O(size), can be optimized to O(removed) if ever needed @@ -579,13 +686,14 @@ impl Replicator { } // Parses the frame and page number from given key. - // Format: <db-name>-<generation>/<frame-number>-<page-number>-<crc64> + // Format: <db-name>-<generation>/<page-number>-<encoded-frame-number>-<crc64> fn parse_frame_page_crc(key: &str) -> Option<(u32, i32, u64)> { let checksum_delim = key.rfind('-')?; - let page_delim = key[0..checksum_delim].rfind('-')?; - let frame_delim = key[0..page_delim].rfind('/')?; - let frameno = key[frame_delim + 1..page_delim].parse::<u32>().ok()?; - let pgno = key[page_delim + 1..checksum_delim].parse::<i32>().ok()?; + let frame_delim = key[0..checksum_delim].rfind('-')?; + let page_delim = key[0..frame_delim].rfind('/')?; + let frameno = + Self::ENCODED_FRAME_BASE - key[frame_delim + 1..checksum_delim].parse::<u32>().ok()?; + let pgno = key[page_delim + 1..frame_delim].parse::<i32>().ok()?; let crc = u64::from_str_radix(&key[checksum_delim + 1..], 16).ok()?; tracing::debug!(frameno, pgno, crc); Some((frameno, pgno, crc)) @@ -822,6 +930,10 @@ impl Replicator { // Restores the database state from newest remote generation pub async fn restore(&mut self) -> Result<RestoreAction> { + if self.readonly_replica { + tracing::info!("Read-only replica, nothing to restore"); + return Ok(RestoreAction::None); + } let newest_generation = match self.find_newest_generation().await { Some(gen) => gen, None => { diff --git a/bottomless/test/restore_test.sql b/bottomless/test/restore_test.sql index 2027a423..ec414887 100644 --- a/bottomless/test/restore_test.sql +++ b/bottomless/test/restore_test.sql @@ -2,5
+2,6 @@ .echo on .load ../../target/debug/bottomless .open file:test.db?wal=bottomless&immutable=1 +pragma journal_mode; .mode column SELECT v, length(v) FROM test; diff --git a/bottomless/test/smoke_test.sql b/bottomless/test/smoke_test.sql index 7398575d..ec790d4d 100644 --- a/bottomless/test/smoke_test.sql +++ b/bottomless/test/smoke_test.sql @@ -2,7 +2,6 @@ .echo on .load ../../target/debug/bottomless .open file:test.db?wal=bottomless -PRAGMA page_size=65536; PRAGMA journal_mode=wal; PRAGMA page_size; DROP TABLE IF EXISTS test; diff --git a/sqld-libsql-bindings/src/ffi/mod.rs b/sqld-libsql-bindings/src/ffi/mod.rs index 9588ba6b..d6f5a78c 100644 --- a/sqld-libsql-bindings/src/ffi/mod.rs +++ b/sqld-libsql-bindings/src/ffi/mod.rs @@ -3,9 +3,10 @@ pub mod types; pub use rusqlite::ffi::{ - libsql_wal_methods, libsql_wal_methods_find, libsql_wal_methods_register, sqlite3, + libsql_wal, libsql_wal_methods, libsql_wal_methods_find, libsql_wal_methods_register, sqlite3, sqlite3_file, sqlite3_io_methods, sqlite3_vfs, WalIndexHdr, SQLITE_CANTOPEN, - SQLITE_CHECKPOINT_FULL, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR_WRITE, SQLITE_OK, + SQLITE_CHECKPOINT_FULL, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR_READ, SQLITE_IOERR_WRITE, + SQLITE_OK, SQLITE_READONLY, }; pub use rusqlite::ffi::libsql_pghdr as PgHdr;