diff --git a/Cargo.lock b/Cargo.lock index 7037e35fe2..bf275b32f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -208,7 +208,7 @@ dependencies = [ "pin-project-lite", "pin-utils", "slab", - "wasm-bindgen-futures", + "wasm-bindgen-futures 0.4.24", ] [[package]] @@ -228,6 +228,17 @@ dependencies = [ "syn", ] +[[package]] +name = "async_io_stream" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d5ad740b7193a31e80950ab7fece57c38d426fcd23a729d9d7f4cf15bb63f94" +dependencies = [ + "futures 0.3.15", + "pharos", + "rustc_version 0.3.3", +] + [[package]] name = "atoi" version = "0.4.0" @@ -486,6 +497,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "console_error_panic_hook" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8d976903543e0c48546a91908f21588a680a8c8f984df9a5d69feccb2b2a211" +dependencies = [ + "cfg-if 0.1.10", + "wasm-bindgen", +] + [[package]] name = "const_fn" version = "0.4.8" @@ -850,6 +871,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7" +[[package]] +name = "futures" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" + [[package]] name = "futures" version = "0.3.15" @@ -875,12 +902,27 @@ dependencies = [ "futures-sink", ] +[[package]] +name = "futures-channel-preview" +version = "0.3.0-alpha.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5e5f4df964fa9c1c2f8bddeb5c3611631cacd93baf810fc8bb2fb4b495c263a" +dependencies = [ + "futures-core-preview", +] + [[package]] name = "futures-core" version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0402f765d8a89a26043b889b26ce3c4679d268fa6bb22cd7c6aad98340e179d1" +[[package]] +name = "futures-core-preview" +version = "0.3.0-alpha.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b35b6263fb1ef523c3056565fa67b1d16f0a8604ff12b11b08c25f28a734c60a" + [[package]] name = "futures-executor" version = "0.3.15" @@ -970,6 +1012,17 @@ dependencies = [ "slab", ] +[[package]] +name = "futures-util-preview" +version = "0.3.0-alpha.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce968633c17e5f97936bd2797b6e38fb56cf16a7422319f7ec2e30d3c470e8d" +dependencies = [ + "futures-core-preview", + "pin-utils", + "slab", +] + [[package]] name = "generic-array" version = "0.14.4" @@ -987,8 +1040,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" dependencies = [ "cfg-if 1.0.0", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1181,7 +1236,7 @@ dependencies = [ "anyhow", "async-std", "dotenv", - "futures", + "futures 0.3.15", "paw", "serde", "serde_json", @@ -1691,6 +1746,16 @@ dependencies = [ "ucd-trie", ] +[[package]] +name = "pharos" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e5f09cc3d3cb227536487dfe7fb8c246eccc6c8a32d03daa30ba4cf3212917" +dependencies = [ + "futures 0.3.15", + "rustc_version 0.2.3", +] + [[package]] name = "pin-project-lite" version = "0.2.6" @@ -2113,6 +2178,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "scoped-tls" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" + [[package]] name = "scopeguard" version = "1.1.0" @@ -2185,6 +2256,12 @@ dependencies = [ "pest", ] +[[package]] +name = "send_wrapper" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "930c0acf610d3fdb5e2ab6213019aaa04e227ebe9547b0649ba599b16d788bd7" + [[package]] name = "serde" version = "1.0.126" @@ -2339,7 +2416,7 @@ dependencies = [ "async-std", "dotenv", "env_logger 0.8.3", - "futures", + "futures 0.3.15", "paste", "serde", "serde_json", @@ -2375,7 +2452,7 @@ dependencies = [ "clap_derive", "console", "dotenv", - "futures", + "futures 0.3.15", "glob", "openssl", "promptly", @@ -2414,6 +2491,7 @@ dependencies = [ "futures-intrusive", "futures-util", "generic-array", + "getrandom", "git2", "hashlink", "hex", @@ -2460,7 +2538,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-std", - "futures", + "futures 0.3.15", "paw", "sqlx", "structopt", @@ -2471,7 +2549,7 @@ name = "sqlx-example-postgres-listen" version = "0.1.0" dependencies = [ "async-std", - "futures", + "futures 0.3.15", "sqlx", ] @@ -2483,7 +2561,7 @@ dependencies = [ "async-std", "async-trait", "dotenv", - "futures", + "futures 0.3.15", "mockall", "paw", "sqlx", @@ -2497,7 +2575,7 @@ dependencies = [ "anyhow", "async-std", "dotenv", - "futures", + "futures 0.3.15", "paw", "sqlx", "structopt", @@ -2518,7 +2596,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-std", - "futures", + "futures 0.3.15", "paw", "sqlx", "structopt", @@ -2552,11 +2630,17 @@ dependencies = [ "async-native-tls", "async-rustls", "async-std", + "async_io_stream", + "futures-util", "native-tls", "once_cell", "tokio", "tokio-native-tls", "tokio-rustls", + "wasm-bindgen", + "wasm-bindgen-futures 0.3.27", + "web-sys", + "ws_stream_wasm", ] [[package]] @@ -2571,6 +2655,24 @@ dependencies = [ "tokio", ] +[[package]] +name = "sqlx-wasm-test" +version = "0.1.0" +dependencies = [ + "futures 0.3.15", + "instant", + "paste", + "serde", + "serde_json", + "sqlx", + "time 0.2.26", + "wasm-bindgen", + "wasm-bindgen-futures 0.3.27", + "wasm-bindgen-test", + "web-sys", + "ws_stream_wasm", +] + [[package]] name = "standback" version = "0.2.17" @@ -3113,6 +3215,22 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83420b37346c311b9ed822af41ec2e82839bfe99867ec6c54e2da43b7538771c" +dependencies = [ + "cfg-if 0.1.10", + "futures 0.1.31", + "futures-channel-preview", + "futures-util-preview", + "js-sys", + "lazy_static", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-futures" version = "0.4.24" @@ -3154,6 +3272,30 @@ version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7cff876b8f18eed75a66cf49b65e7f967cb354a7aa16003fb55dbfd25b44b4f" +[[package]] +name = "wasm-bindgen-test" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cab416a9b970464c2882ed92d55b0c33046b08e0bdc9d59b3b718acd4e1bae8" +dependencies = [ + "console_error_panic_hook", + "js-sys", + "scoped-tls", + "wasm-bindgen", + "wasm-bindgen-futures 0.4.24", + "wasm-bindgen-test-macro", +] + +[[package]] +name = "wasm-bindgen-test-macro" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd4543fc6cf3541ef0d98bf720104cc6bd856d7eba449fd2aa365ef4fed0e782" +dependencies = [ + "proc-macro2", + "quote", +] + [[package]] name = "web-sys" version = "0.3.51" @@ -3233,6 +3375,24 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "ws_stream_wasm" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9528a8ec27f348ae922b166e86670ef8be9f0427642d1f8725289f07e5207cd7" +dependencies = [ + "async_io_stream", + "futures 0.3.15", + "js-sys", + "pharos", + "rustc_version 0.3.3", + "send_wrapper", + "thiserror", + "wasm-bindgen", + "wasm-bindgen-futures 0.4.24", + "web-sys", +] + [[package]] name = "wyz" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index aab4f3b613..a27f8848e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "sqlx-rt", "sqlx-macros", "sqlx-test", + "sqlx-wasm-test", "sqlx-cli", "sqlx-bench", "examples/mysql/todos", @@ -256,6 +257,26 @@ name = "postgres-derives" path = "tests/postgres/derives.rs" required-features = ["postgres", "macros"] +[[test]] +name = "pg-deletes-bench" +path = "tests/postgres/deletes_custom_bench.rs" +required-features = ["postgres"] + +[[test]] +name = "pg-updates-bench" +path = "tests/postgres/updates_custom_bench.rs" +required-features = ["postgres"] + +[[test]] +name = "pg-inserts-bench" +path = "tests/postgres/inserts_custom_bench.rs" +required-features = ["postgres"] + +[[test]] +name = "pg-selects-bench" +path = "tests/postgres/selects_custom_bench.rs" +required-features = ["postgres"] + # # Microsoft SQL Server (MSSQL) # diff --git a/sqlx-bench/Cargo.toml b/sqlx-bench/Cargo.toml index 0b288ca892..167311ac53 100644 --- a/sqlx-bench/Cargo.toml +++ b/sqlx-bench/Cargo.toml @@ -45,3 +45,8 @@ sqlx-rt = { version = "0.5", path = "../sqlx-rt", default-features = false } name = "pg_pool" harness = false required-features = ["postgres"] + +[[bench]] +name = "wasm_querying" +harness = false +required-features = ["postgres"] diff --git a/sqlx-bench/benches/wasm_querying.rs b/sqlx-bench/benches/wasm_querying.rs new file mode 100644 index 0000000000..0c6e89facd --- /dev/null +++ b/sqlx-bench/benches/wasm_querying.rs @@ -0,0 +1,26 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use sqlx::Row; +use sqlx::{postgres::PgRow, Connection}; +use sqlx::{Database, PgConnection, Postgres}; +use sqlx_rt::spawn; + +const URL: &str = "postgresql://paul:pass123@127.0.0.1:8080/jetasap_dev"; + +fn select() { + spawn(async { + let mut conn = ::Connection::connect(URL) + .await + .unwrap(); + + let airports = sqlx::query("select * from airports") + .fetch_all(&mut conn) + .await; + }); +} + +fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("fib 20", |b| b.iter(|| select())); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index b024aa3d16..a6bd78f37d 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -164,3 +164,6 @@ bstr = { version = "0.2.14", default-features = false, features = ["std"], optio git2 = { version = "0.13.20", default-features = false, optional = true } hashlink = "0.7.0" indexmap = "1.6.2" + +[target.'cfg(target_arch = "wasm32")'.dependencies] +getrandom = { version = "0.2.2", features = ["js"] } diff --git a/sqlx-core/src/arguments.rs b/sqlx-core/src/arguments.rs index 8867176264..2e76433a33 100644 --- a/sqlx-core/src/arguments.rs +++ b/sqlx-core/src/arguments.rs @@ -5,6 +5,7 @@ use crate::encode::Encode; use crate::types::Type; /// A tuple of arguments to be sent to the database. +#[cfg(not(target_arch = "wasm32"))] pub trait Arguments<'q>: Send + Sized + Default { type Database: Database; @@ -18,10 +19,30 @@ pub trait Arguments<'q>: Send + Sized + Default { T: 'q + Send + Encode<'q, Self::Database> + Type; } +#[cfg(target_arch = "wasm32")] +pub trait Arguments<'q>: Sized + Default { + type Database: Database; + + /// Reserves the capacity for at least `additional` more values (of `size` total bytes) to + /// be added to the arguments without a reallocation. + fn reserve(&mut self, additional: usize, size: usize); + + /// Add the value to the end of the arguments. + fn add(&mut self, value: T) + where + T: 'q + Encode<'q, Self::Database> + Type; +} + +#[cfg(not(target_arch = "wasm32"))] pub trait IntoArguments<'q, DB: HasArguments<'q>>: Sized + Send { fn into_arguments(self) -> >::Arguments; } +#[cfg(target_arch = "wasm32")] +pub trait IntoArguments<'q, DB: HasArguments<'q>>: Sized { + fn into_arguments(self) -> >::Arguments; +} + // NOTE: required due to lack of lazy normalization #[allow(unused_macros)] macro_rules! impl_into_arguments_for_arguments { diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index 402a198c9e..07dc191e1b 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -1,13 +1,20 @@ use crate::database::{Database, HasStatementCache}; use crate::error::Error; use crate::transaction::Transaction; + +#[cfg(not(target_arch = "wasm32"))] use futures_core::future::BoxFuture; + +#[cfg(target_arch = "wasm32")] +use futures_core::future::LocalBoxFuture as BoxFuture; + use log::LevelFilter; use std::fmt::Debug; use std::str::FromStr; use std::time::Duration; /// Represents a single database connection. +#[cfg(not(target_arch = "wasm32"))] pub trait Connection: Send { type Database: Database; @@ -125,6 +132,123 @@ pub trait Connection: Send { } } +#[cfg(target_arch = "wasm32")] +pub trait Connection { + type Database: Database; + + type Options: ConnectOptions; + + /// Explicitly close this database connection. + /// + /// This method is **not required** for safe and consistent operation. However, it is + /// recommended to call it instead of letting a connection `drop` as the database backend + /// will be faster at cleaning up resources. + fn close(self) -> BoxFuture<'static, Result<(), Error>>; + + /// Checks if a connection to the database is still valid. + fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>>; + + /// Begin a new transaction or establish a savepoint within the active transaction. + /// + /// Returns a [`Transaction`] for controlling and tracking the new transaction. + fn begin(&mut self) -> BoxFuture<'_, Result, Error>> + where + Self: Sized; + + /// Execute the function inside a transaction. + /// + /// If the function returns an error, the transaction will be rolled back. If it does not + /// return an error, the transaction will be committed. + /// + /// # Example + /// + /// ```rust + /// use sqlx_core::connection::Connection; + /// use sqlx_core::error::Error; + /// use sqlx_core::executor::Executor; + /// use sqlx_core::postgres::{PgConnection, PgRow}; + /// use sqlx_core::query::query; + /// + /// # pub async fn _f(conn: &mut PgConnection) -> Result, Error> { + /// conn.transaction(|conn|Box::pin(async move { + /// query("select * from ..").fetch_all(conn).await + /// })).await + /// # } + /// ``` + fn transaction(&mut self, callback: F) -> BoxFuture<'_, Result> + where + for<'c> F: FnOnce(&'c mut Transaction<'_, Self::Database>) -> BoxFuture<'c, Result> + + 'static + + Send + + Sync, + Self: Sized, + R: Send, + E: From + Send, + { + Box::pin(async move { + let mut transaction = self.begin().await?; + let ret = callback(&mut transaction).await; + + match ret { + Ok(ret) => { + transaction.commit().await?; + + Ok(ret) + } + Err(err) => { + transaction.rollback().await?; + + Err(err) + } + } + }) + } + + /// The number of statements currently cached in the connection. + fn cached_statements_size(&self) -> usize + where + Self::Database: HasStatementCache, + { + 0 + } + + /// Removes all statements from the cache, closing them on the server if + /// needed. + fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> + where + Self::Database: HasStatementCache, + { + Box::pin(async move { Ok(()) }) + } + + #[doc(hidden)] + fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>>; + + #[doc(hidden)] + fn should_flush(&self) -> bool; + + /// Establish a new database connection. + /// + /// A value of [`Options`][Self::Options] is parsed from the provided connection string. This parsing + /// is database-specific. + #[inline] + fn connect(url: &str) -> BoxFuture<'static, Result> + where + Self: Sized, + { + let options = url.parse(); + Box::pin(async move { Ok(Self::connect_with(&options?).await?) }) + } + + /// Establish a new database connection with the provided options. + fn connect_with(options: &Self::Options) -> BoxFuture<'_, Result> + where + Self: Sized, + { + options.connect() + } +} + #[derive(Clone, Debug)] pub(crate) struct LogSettings { pub(crate) statements_level: LevelFilter, @@ -152,6 +276,7 @@ impl LogSettings { } } +#[cfg(not(target_arch = "wasm32"))] pub trait ConnectOptions: 'static + Send + Sync + FromStr + Debug { type Connection: Connection + ?Sized; @@ -173,3 +298,26 @@ pub trait ConnectOptions: 'static + Send + Sync + FromStr + Debug { .log_slow_statements(LevelFilter::Off, Duration::default()) } } + +#[cfg(target_arch = "wasm32")] +pub trait ConnectOptions: 'static + FromStr + Debug { + type Connection: Connection + ?Sized; + + /// Establish a new database connection with the options specified by `self`. + fn connect(&self) -> BoxFuture<'_, Result> + where + Self::Connection: Sized; + + /// Log executed statements with the specified `level` + fn log_statements(&mut self, level: LevelFilter) -> &mut Self; + + /// Log executed statements with a duration above the specified `duration` + /// at the specified `level`. + fn log_slow_statements(&mut self, level: LevelFilter, duration: Duration) -> &mut Self; + + /// Entirely disables statement logging (both slow and regular). + fn disable_statement_logging(&mut self) -> &mut Self { + self.log_statements(LevelFilter::Off) + .log_slow_statements(LevelFilter::Off, Duration::default()) + } +} diff --git a/sqlx-core/src/database.rs b/sqlx-core/src/database.rs index e1788597fb..ea5fadf23d 100644 --- a/sqlx-core/src/database.rs +++ b/sqlx-core/src/database.rs @@ -68,6 +68,7 @@ use crate::value::{Value, ValueRef}; /// /// This trait encapsulates a complete set of traits that implement a driver for a /// specific database (e.g., MySQL, PostgreSQL). +#[cfg(not(target_arch = "wasm32"))] pub trait Database: 'static + Sized @@ -100,6 +101,38 @@ pub trait Database: type Value: Value + 'static; } +#[cfg(target_arch = "wasm32")] +pub trait Database: + 'static + + Sized + + Debug + + for<'r> HasValueRef<'r, Database = Self> + + for<'q> HasArguments<'q, Database = Self> + + for<'q> HasStatement<'q, Database = Self> +{ + /// The concrete `Connection` implementation for this database. + type Connection: Connection; + + /// The concrete `TransactionManager` implementation for this database. + type TransactionManager: TransactionManager; + + /// The concrete `Row` implementation for this database. + type Row: Row; + + /// The concrete `QueryResult` implementation for this database. + type QueryResult: 'static + Sized + Sync + Default + Extend; + + /// The concrete `Column` implementation for this database. + type Column: Column; + + /// The concrete `TypeInfo` implementation for this database. + type TypeInfo: TypeInfo; + + /// The concrete type used to hold an owned copy of the not-yet-decoded value that was + /// received from the database. + type Value: Value + 'static; +} + /// Associate [`Database`] with a [`ValueRef`](crate::value::ValueRef) of a generic lifetime. /// /// --- diff --git a/sqlx-core/src/error.rs b/sqlx-core/src/error.rs index 6a152520db..245ffff178 100644 --- a/sqlx-core/src/error.rs +++ b/sqlx-core/src/error.rs @@ -100,7 +100,7 @@ pub enum Error { #[error("attempted to communicate with a crashed background worker")] WorkerCrashed, - #[cfg(feature = "migrate")] + #[cfg(all(feature = "migrate", not(target_arch = "wasm32")))] #[error("{0}")] Migrate(#[source] Box), } @@ -237,7 +237,7 @@ where } } -#[cfg(feature = "migrate")] +#[cfg(all(feature = "migrate", not(target_arch = "wasm32")))] impl From for Error { #[inline] fn from(error: crate::migrate::MigrateError) -> Self { diff --git a/sqlx-core/src/executor.rs b/sqlx-core/src/executor.rs index 2b0e27c219..c73892fcad 100644 --- a/sqlx-core/src/executor.rs +++ b/sqlx-core/src/executor.rs @@ -2,8 +2,17 @@ use crate::database::{Database, HasArguments, HasStatement}; use crate::describe::Describe; use crate::error::Error; use either::Either; + +#[cfg(not(target_arch = "wasm32"))] use futures_core::future::BoxFuture; +#[cfg(target_arch = "wasm32")] +use futures_core::future::LocalBoxFuture as BoxFuture; + +#[cfg(not(target_arch = "wasm32"))] use futures_core::stream::BoxStream; +#[cfg(target_arch = "wasm32")] +use futures_core::stream::LocalBoxStream as BoxStream; + use futures_util::{future, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use std::fmt::Debug; @@ -22,6 +31,7 @@ use std::fmt::Debug; /// * [`&mut PoolConnection`](super::pool::PoolConnection) /// * [`&mut Connection`](super::connection::Connection) /// +#[cfg(not(target_arch = "wasm32"))] pub trait Executor<'c>: Send + Debug + Sized { type Database: Database; @@ -175,6 +185,160 @@ pub trait Executor<'c>: Send + Debug + Sized { 'c: 'e; } +#[cfg(target_arch = "wasm32")] +pub trait Executor<'c>: Debug + Sized { + type Database: Database; + + /// Execute the query and return the total number of rows affected. + fn execute<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> BoxFuture<'e, Result<::QueryResult, Error>> + where + 'c: 'e, + E: Execute<'q, Self::Database>, + { + self.execute_many(query).try_collect().boxed_local() + } + + /// Execute multiple queries and return the rows affected from each query, in a stream. + fn execute_many<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> BoxStream<'e, Result<::QueryResult, Error>> + where + 'c: 'e, + E: Execute<'q, Self::Database>, + { + self.fetch_many(query) + .try_filter_map(|step| async move { + Ok(match step { + Either::Left(rows) => Some(rows), + Either::Right(_) => None, + }) + }) + .boxed_local() + } + + /// Execute the query and return the generated results as a stream. + fn fetch<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> BoxStream<'e, Result<::Row, Error>> + where + 'c: 'e, + E: Execute<'q, Self::Database>, + { + self.fetch_many(query) + .try_filter_map(|step| async move { + Ok(match step { + Either::Left(_) => None, + Either::Right(row) => Some(row), + }) + }) + .boxed_local() + } + + /// Execute multiple queries and return the generated results as a stream + /// from each query, in a stream. + fn fetch_many<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> BoxStream< + 'e, + Result< + Either<::QueryResult, ::Row>, + Error, + >, + > + where + 'c: 'e, + E: Execute<'q, Self::Database>; + + /// Execute the query and return all the generated results, collected into a [`Vec`]. + fn fetch_all<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> BoxFuture<'e, Result::Row>, Error>> + where + 'c: 'e, + E: Execute<'q, Self::Database>, + { + self.fetch(query).try_collect().boxed_local() + } + + /// Execute the query and returns exactly one row. + fn fetch_one<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> BoxFuture<'e, Result<::Row, Error>> + where + 'c: 'e, + E: Execute<'q, Self::Database>, + { + self.fetch_optional(query) + .and_then(|row| match row { + Some(row) => future::ok(row), + None => future::err(Error::RowNotFound), + }) + .boxed_local() + } + + /// Execute the query and returns at most one row. + fn fetch_optional<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> BoxFuture<'e, Result::Row>, Error>> + where + 'c: 'e, + E: Execute<'q, Self::Database>; + + /// Prepare the SQL query to inspect the type information of its parameters + /// and results. + /// + /// Be advised that when using the `query`, `query_as`, or `query_scalar` functions, the query + /// is transparently prepared and executed. + /// + /// This explicit API is provided to allow access to the statement metadata available after + /// it prepared but before the first row is returned. + #[inline] + fn prepare<'e, 'q: 'e>( + self, + query: &'q str, + ) -> BoxFuture<'e, Result<>::Statement, Error>> + where + 'c: 'e, + { + self.prepare_with(query, &[]) + } + + /// Prepare the SQL query, with parameter type information, to inspect the + /// type information about its parameters and results. + /// + /// Only some database drivers (PostgreSQL, MSSQL) can take advantage of + /// this extra information to influence parameter type inference. + fn prepare_with<'e, 'q: 'e>( + self, + sql: &'q str, + parameters: &'e [::TypeInfo], + ) -> BoxFuture<'e, Result<>::Statement, Error>> + where + 'c: 'e; + + /// Describe the SQL query and return type information about its parameters + /// and results. + /// + /// This is used by compile-time verification in the query macros to + /// power their type inference. + #[doc(hidden)] + fn describe<'e, 'q: 'e>( + self, + sql: &'q str, + ) -> BoxFuture<'e, Result, Error>> + where + 'c: 'e; +} + /// A type that may be executed against a database connection. /// /// Implemented for the following: @@ -182,6 +346,7 @@ pub trait Executor<'c>: Send + Debug + Sized { /// * [`&str`](std::str) /// * [`Query`](super::query::Query) /// +#[cfg(not(target_arch = "wasm32"))] pub trait Execute<'q, DB: Database>: Send + Sized { /// Gets the SQL that will be executed. fn sql(&self) -> &'q str; @@ -200,6 +365,25 @@ pub trait Execute<'q, DB: Database>: Send + Sized { fn persistent(&self) -> bool; } +#[cfg(target_arch = "wasm32")] +pub trait Execute<'q, DB: Database>: Sized { + /// Gets the SQL that will be executed. + fn sql(&self) -> &'q str; + + /// Gets the previously cached statement, if available. + fn statement(&self) -> Option<&>::Statement>; + + /// Returns the arguments to be bound against the query string. + /// + /// Returning `None` for `Arguments` indicates to use a "simple" query protocol and to not + /// prepare the query. Returning `Some(Default::default())` is an empty arguments object that + /// will be prepared (and cached) before execution. + fn take_arguments(&mut self) -> Option<>::Arguments>; + + /// Returns `true` if the statement should be cached. + fn persistent(&self) -> bool; +} + // NOTE: `Execute` is explicitly not implemented for String and &String to make it slightly more // involved to write `conn.execute(format!("SELECT {}", val))` impl<'q, DB: Database> Execute<'q, DB> for &'q str { diff --git a/sqlx-core/src/ext/async_stream.rs b/sqlx-core/src/ext/async_stream.rs index 54c0d0e3de..1f24732da2 100644 --- a/sqlx-core/src/ext/async_stream.rs +++ b/sqlx-core/src/ext/async_stream.rs @@ -3,7 +3,12 @@ use std::pin::Pin; use std::task::{Context, Poll}; use futures_channel::mpsc; + +#[cfg(not(target_arch = "wasm32"))] use futures_core::future::BoxFuture; +#[cfg(target_arch = "wasm32")] +use futures_core::future::LocalBoxFuture as BoxFuture; + use futures_core::stream::Stream; use futures_util::{pin_mut, FutureExt, SinkExt}; @@ -14,6 +19,7 @@ pub struct TryAsyncStream<'a, T> { future: BoxFuture<'a, Result<(), Error>>, } +#[cfg(not(target_arch = "wasm32"))] impl<'a, T> TryAsyncStream<'a, T> { pub fn new(f: F) -> Self where @@ -38,6 +44,31 @@ impl<'a, T> TryAsyncStream<'a, T> { } } +#[cfg(target_arch = "wasm32")] +impl<'a, T> TryAsyncStream<'a, T> { + pub fn new(f: F) -> Self + where + F: FnOnce(mpsc::Sender>) -> Fut, + Fut: 'a + Future>, + T: 'a, + { + let (mut sender, receiver) = mpsc::channel(0); + + let future = f(sender.clone()); + let future = async move { + if let Err(error) = future.await { + let _ = sender.send(Err(error)).await; + } + + Ok(()) + } + .fuse() + .boxed_local(); + + Self { future, receiver } + } +} + impl<'a, T> Stream for TryAsyncStream<'a, T> { type Item = Result; diff --git a/sqlx-core/src/lib.rs b/sqlx-core/src/lib.rs index 9e50d0d842..963dc46904 100644 --- a/sqlx-core/src/lib.rs +++ b/sqlx-core/src/lib.rs @@ -48,6 +48,7 @@ pub mod types; #[macro_use] pub mod query; +#[cfg(not(target_arch = "wasm32"))] #[macro_use] pub mod acquire; @@ -63,6 +64,7 @@ pub mod describe; pub mod executor; pub mod from_row; mod io; +#[cfg(not(target_arch = "wasm32"))] mod logger; mod net; pub mod query_as; @@ -71,7 +73,7 @@ pub mod row; pub mod type_info; pub mod value; -#[cfg(feature = "migrate")] +#[cfg(all(feature = "migrate", not(target_arch = "wasm32")))] pub mod migrate; #[cfg(all( diff --git a/sqlx-core/src/net/mod.rs b/sqlx-core/src/net/mod.rs index 6b8371ef50..7f712a6869 100644 --- a/sqlx-core/src/net/mod.rs +++ b/sqlx-core/src/net/mod.rs @@ -1,16 +1,20 @@ mod socket; + +#[cfg(not(target_arch = "wasm32"))] mod tls; pub use socket::Socket; + +#[cfg(not(target_arch = "wasm32"))] pub use tls::{CertificateInput, MaybeTlsStream}; -#[cfg(feature = "_rt-async-std")] +#[cfg(any(feature = "_rt-async-std", target_arch = "wasm32"))] type PollReadBuf<'a> = [u8]; #[cfg(any(feature = "_rt-actix", feature = "_rt-tokio"))] type PollReadBuf<'a> = sqlx_rt::ReadBuf<'a>; -#[cfg(feature = "_rt-async-std")] +#[cfg(any(feature = "_rt-async-std", target_arch = "wasm32"))] type PollReadOut = usize; #[cfg(any(feature = "_rt-actix", feature = "_rt-tokio"))] diff --git a/sqlx-core/src/net/socket.rs b/sqlx-core/src/net/socket.rs index 06d5575c01..8453ac4e98 100644 --- a/sqlx-core/src/net/socket.rs +++ b/sqlx-core/src/net/socket.rs @@ -5,22 +5,33 @@ use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; +#[cfg(not(target_arch = "wasm32"))] use sqlx_rt::{AsyncRead, AsyncWrite, TcpStream}; +#[cfg(target_arch = "wasm32")] +use sqlx_rt::{AsyncRead, AsyncWrite, IoStream, WsMeta, WsStreamIo}; +#[cfg(target_arch = "wasm32")] +type WSIoStream = IoStream>; + #[derive(Debug)] pub enum Socket { + #[cfg(not(target_arch = "wasm32"))] Tcp(TcpStream), - #[cfg(unix)] + #[cfg(all(unix, not(target_arch = "wasm32")))] Unix(sqlx_rt::UnixStream), + + #[cfg(target_arch = "wasm32")] + WS((WsMeta, WSIoStream)), } impl Socket { + #[cfg(not(target_arch = "wasm32"))] pub async fn connect_tcp(host: &str, port: u16) -> io::Result { TcpStream::connect((host, port)).await.map(Socket::Tcp) } - #[cfg(unix)] + #[cfg(all(unix, not(target_arch = "wasm32")))] pub async fn connect_uds(path: impl AsRef) -> io::Result { sqlx_rt::UnixStream::connect(path.as_ref()) .await @@ -35,15 +46,23 @@ impl Socket { )) } + #[cfg(target_arch = "wasm32")] + pub async fn connect_ws(url: impl AsRef) -> io::Result { + WsMeta::connect(url, None) + .await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "can't connect to ws stream")) + .map(|(m, s)| Socket::WS((m, s.into_io()))) + } + pub async fn shutdown(&mut self) -> io::Result<()> { - #[cfg(feature = "_rt-async-std")] + #[cfg(all(feature = "_rt-async-std", not(target_arch = "wasm32")))] { use std::net::Shutdown; match self { Socket::Tcp(s) => s.shutdown(Shutdown::Both), - #[cfg(unix)] + #[cfg(all(unix, not(target_arch = "wasm32")))] Socket::Unix(s) => s.shutdown(Shutdown::Both), } } @@ -59,6 +78,15 @@ impl Socket { Socket::Unix(s) => s.shutdown().await, } } + + #[cfg(target_arch = "wasm32")] + { + let Socket::WS((m, _)) = self; + m.close() + .await + .map_err(|_| io::Error::new(io::ErrorKind::Other, "error closing ws stream")) + .map(|_| ()) + } } } @@ -69,9 +97,13 @@ impl AsyncRead for Socket { buf: &mut super::PollReadBuf<'_>, ) -> Poll> { match &mut *self { + #[cfg(not(target_arch = "wasm32"))] Socket::Tcp(s) => Pin::new(s).poll_read(cx, buf), - #[cfg(unix)] + #[cfg(target_arch = "wasm32")] + Socket::WS((_, s)) => Pin::new(s).poll_read(cx, buf), + + #[cfg(all(unix, not(target_arch = "wasm32")))] Socket::Unix(s) => Pin::new(s).poll_read(cx, buf), } } @@ -84,18 +116,28 @@ impl AsyncWrite for Socket { buf: &[u8], ) -> Poll> { match &mut *self { + #[cfg(not(target_arch = "wasm32"))] Socket::Tcp(s) => Pin::new(s).poll_write(cx, buf), - #[cfg(unix)] + #[cfg(target_arch = "wasm32")] + Socket::WS((_, s)) => Pin::new(s).poll_write(cx, buf), + + #[cfg(all(unix, not(target_arch = "wasm32")))] Socket::Unix(s) => Pin::new(s).poll_write(cx, buf), } } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &mut *self { + #[cfg(not(target_arch = "wasm32"))] Socket::Tcp(s) => Pin::new(s).poll_flush(cx), - #[cfg(unix)] + #[cfg(target_arch = "wasm32")] + Socket::WS((_, s)) => Pin::new(s) + .poll_flush(cx) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "error flushing ws stream")), + + #[cfg(all(unix, not(target_arch = "wasm32")))] Socket::Unix(s) => Pin::new(s).poll_flush(cx), } } @@ -103,19 +145,32 @@ impl AsyncWrite for Socket { #[cfg(any(feature = "_rt-actix", feature = "_rt-tokio"))] fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &mut *self { + #[cfg(not(target_arch = "wasm32"))] Socket::Tcp(s) => Pin::new(s).poll_shutdown(cx), - #[cfg(unix)] + #[cfg(all(unix, not(target_arch = "wasm32")))] Socket::Unix(s) => Pin::new(s).poll_shutdown(cx), } } - #[cfg(feature = "_rt-async-std")] + #[cfg(all(feature = "_rt-async-std", not(target_arch = "wasm32")))] fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &mut *self { Socket::Tcp(s) => Pin::new(s).poll_close(cx), - #[cfg(unix)] + #[cfg(all(unix, not(target_arch = "wasm32")))] + Socket::Unix(s) => Pin::new(s).poll_close(cx), + } + } + + #[cfg(target_arch = "wasm32")] + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &mut *self { + Socket::WS((_, s)) => Pin::new(s) + .poll_close(cx) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "error closing ws stream")), + + #[cfg(all(unix, not(target_arch = "wasm32")))] Socket::Unix(s) => Pin::new(s).poll_close(cx), } } diff --git a/sqlx-core/src/pool/maybe.rs b/sqlx-core/src/pool/maybe.rs index 43c7f3457d..3064ba77f1 100644 --- a/sqlx-core/src/pool/maybe.rs +++ b/sqlx-core/src/pool/maybe.rs @@ -1,10 +1,13 @@ use crate::database::Database; + +#[cfg(not(target_arch = "wasm32"))] use crate::pool::PoolConnection; use std::ops::{Deref, DerefMut}; pub(crate) enum MaybePoolConnection<'c, DB: Database> { #[allow(dead_code)] Connection(&'c mut DB::Connection), + #[cfg(not(target_arch = "wasm32"))] PoolConnection(PoolConnection), } @@ -15,6 +18,7 @@ impl<'c, DB: Database> Deref for MaybePoolConnection<'c, DB> { fn deref(&self) -> &Self::Target { match self { MaybePoolConnection::Connection(v) => v, + #[cfg(not(target_arch = "wasm32"))] MaybePoolConnection::PoolConnection(v) => v, } } @@ -25,6 +29,7 @@ impl<'c, DB: Database> DerefMut for MaybePoolConnection<'c, DB> { fn deref_mut(&mut self) -> &mut Self::Target { match self { MaybePoolConnection::Connection(v) => v, + #[cfg(not(target_arch = "wasm32"))] MaybePoolConnection::PoolConnection(v) => v, } } @@ -33,6 +38,7 @@ impl<'c, DB: Database> DerefMut for MaybePoolConnection<'c, DB> { #[allow(unused_macros)] macro_rules! impl_into_maybe_pool { ($DB:ident, $C:ident) => { + #[cfg(not(target_arch = "wasm32"))] impl<'c> From> for crate::pool::MaybePoolConnection<'c, $DB> { diff --git a/sqlx-core/src/pool/mod.rs b/sqlx-core/src/pool/mod.rs index 400c07ba39..826e6534c6 100644 --- a/sqlx-core/src/pool/mod.rs +++ b/sqlx-core/src/pool/mod.rs @@ -54,28 +54,43 @@ //! [`Pool::acquire`] or //! [`Pool::begin`]. +#[cfg(not(target_arch = "wasm32"))] use self::inner::SharedPool; +#[cfg(not(target_arch = "wasm32"))] use crate::connection::Connection; +#[cfg(not(target_arch = "wasm32"))] use crate::database::Database; +#[cfg(not(target_arch = "wasm32"))] use crate::error::Error; +#[cfg(not(target_arch = "wasm32"))] use crate::transaction::Transaction; +#[cfg(not(target_arch = "wasm32"))] use std::fmt; +#[cfg(not(target_arch = "wasm32"))] use std::future::Future; +#[cfg(not(target_arch = "wasm32"))] use std::sync::Arc; +#[cfg(not(target_arch = "wasm32"))] use std::time::{Duration, Instant}; +#[cfg(not(target_arch = "wasm32"))] #[macro_use] mod executor; #[macro_use] mod maybe; +#[cfg(not(target_arch = "wasm32"))] mod connection; +#[cfg(not(target_arch = "wasm32"))] mod inner; +#[cfg(not(target_arch = "wasm32"))] mod options; +#[cfg(not(target_arch = "wasm32"))] pub use self::connection::PoolConnection; pub(crate) use self::maybe::MaybePoolConnection; +#[cfg(not(target_arch = "wasm32"))] pub use self::options::PoolOptions; /// An asynchronous pool of SQLx database connections. @@ -225,8 +240,11 @@ pub use self::options::PoolOptions; /// /// Depending on the database server, a connection will have caches for all kinds of other data as /// well and queries will generally benefit from these caches being "warm" (populated with data). + +#[cfg(not(target_arch = "wasm32"))] pub struct Pool(pub(crate) Arc>); +#[cfg(not(target_arch = "wasm32"))] impl Pool { /// Creates a new connection pool with a default pool configuration and /// the given connection URI; and, immediately establishes one connection. @@ -338,12 +356,14 @@ impl Pool { } /// Returns a new [Pool] tied to the same shared connection pool. +#[cfg(not(target_arch = "wasm32"))] impl Clone for Pool { fn clone(&self) -> Self { Self(Arc::clone(&self.0)) } } +#[cfg(not(target_arch = "wasm32"))] impl fmt::Debug for Pool { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Pool") @@ -358,6 +378,7 @@ impl fmt::Debug for Pool { /// get the time between the deadline and now and use that as our timeout /// /// returns `Error::PoolTimedOut` if the deadline is in the past +#[cfg(not(target_arch = "wasm32"))] fn deadline_as_timeout(deadline: Instant) -> Result { deadline .checked_duration_since(Instant::now()) diff --git a/sqlx-core/src/postgres/connection/describe.rs b/sqlx-core/src/postgres/connection/describe.rs index 6d7a3f7dc0..a14e2a1a69 100644 --- a/sqlx-core/src/postgres/connection/describe.rs +++ b/sqlx-core/src/postgres/connection/describe.rs @@ -8,7 +8,10 @@ use crate::query_as::query_as; use crate::query_scalar::{query_scalar, query_scalar_with}; use crate::types::Json; use crate::HashMap; +#[cfg(not(target_arch = "wasm32"))] use futures_core::future::BoxFuture; +#[cfg(target_arch = "wasm32")] +use futures_core::future::LocalBoxFuture as BoxFuture; use std::convert::TryFrom; use std::fmt::Write; use std::sync::Arc; diff --git a/sqlx-core/src/postgres/connection/establish.rs b/sqlx-core/src/postgres/connection/establish.rs index 59e3727c24..7bf760a2d5 100644 --- a/sqlx-core/src/postgres/connection/establish.rs +++ b/sqlx-core/src/postgres/connection/establish.rs @@ -3,7 +3,10 @@ use crate::HashMap; use crate::common::StatementCache; use crate::error::Error; use crate::io::Decode; -use crate::postgres::connection::{sasl, stream::PgStream, tls}; +#[cfg(not(target_arch = "wasm32"))] +use crate::postgres::connection::tls; +use crate::postgres::connection::{sasl, stream::PgStream}; + use crate::postgres::message::{ Authentication, BackendKeyData, MessageFormat, Password, ReadyForQuery, Startup, }; @@ -17,6 +20,7 @@ impl PgConnection { let mut stream = PgStream::connect(options).await?; // Upgrade to TLS if we were asked to and the server supports it + #[cfg(not(target_arch = "wasm32"))] tls::maybe_upgrade(&mut stream, options).await?; // To begin a session, a frontend opens a connection to the server diff --git a/sqlx-core/src/postgres/connection/executor.rs b/sqlx-core/src/postgres/connection/executor.rs index a1e27a6a8b..33d3948989 100644 --- a/sqlx-core/src/postgres/connection/executor.rs +++ b/sqlx-core/src/postgres/connection/executor.rs @@ -1,6 +1,7 @@ use crate::describe::Describe; use crate::error::Error; use crate::executor::{Execute, Executor}; +#[cfg(not(target_arch = "wasm32"))] use crate::logger::QueryLogger; use crate::postgres::message::{ self, Bind, Close, CommandComplete, DataRow, MessageFormat, ParameterDescription, Parse, Query, @@ -13,8 +14,17 @@ use crate::postgres::{ PgValueFormat, Postgres, }; use either::Either; + +#[cfg(not(target_arch = "wasm32"))] use futures_core::future::BoxFuture; +#[cfg(target_arch = "wasm32")] +use futures_core::future::LocalBoxFuture as BoxFuture; + +#[cfg(not(target_arch = "wasm32"))] use futures_core::stream::BoxStream; +#[cfg(target_arch = "wasm32")] +use futures_core::stream::LocalBoxStream as BoxStream; + use futures_core::Stream; use futures_util::{pin_mut, TryStreamExt}; use std::{borrow::Cow, sync::Arc}; @@ -199,6 +209,7 @@ impl PgConnection { persistent: bool, metadata_opt: Option>, ) -> Result, Error>> + 'e, Error> { + #[cfg(not(target_arch = "wasm32"))] let mut logger = QueryLogger::new(query, self.log_settings.clone()); // before we continue, wait until we are "ready" to accept more queries @@ -301,6 +312,7 @@ impl PgConnection { } MessageFormat::DataRow => { + #[cfg(not(target_arch = "wasm32"))] logger.increment_rows(); // one of the set of rows returned by a SELECT, FETCH, etc query diff --git a/sqlx-core/src/postgres/connection/mod.rs b/sqlx-core/src/postgres/connection/mod.rs index 5241af0210..09b2c02bd2 100644 --- a/sqlx-core/src/postgres/connection/mod.rs +++ b/sqlx-core/src/postgres/connection/mod.rs @@ -2,7 +2,12 @@ use std::fmt::{self, Debug, Formatter}; use std::sync::Arc; use crate::HashMap; + +#[cfg(not(target_arch = "wasm32"))] use futures_core::future::BoxFuture; +#[cfg(target_arch = "wasm32")] +use futures_core::future::LocalBoxFuture as BoxFuture; + use futures_util::{FutureExt, TryFutureExt}; use crate::common::StatementCache; @@ -25,6 +30,8 @@ mod establish; mod executor; mod sasl; mod stream; + +#[cfg(not(target_arch = "wasm32"))] mod tls; /// A connection to a PostgreSQL database. @@ -129,10 +136,16 @@ impl Connection for PgConnection { }) } + #[cfg(not(target_arch = "wasm32"))] fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> { // By sending a comment we avoid an error if the connection was in the middle of a rowset self.execute("/* SQLx ping */").map_ok(|_| ()).boxed() } + #[cfg(target_arch = "wasm32")] + fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> { + // By sending a comment we avoid an error if the connection was in the middle of a rowset + self.execute("/* SQLx ping */").map_ok(|_| ()).boxed_local() + } fn begin(&mut self) -> BoxFuture<'_, Result, Error>> where @@ -169,9 +182,14 @@ impl Connection for PgConnection { } #[doc(hidden)] + #[cfg(not(target_arch = "wasm32"))] fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> { self.wait_until_ready().boxed() } + #[cfg(target_arch = "wasm32")] + fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> { + self.wait_until_ready().boxed_local() + } #[doc(hidden)] fn should_flush(&self) -> bool { @@ -190,6 +208,7 @@ impl PgConnectionInfo for PgConnection { } } +#[cfg(not(target_arch = "wasm32"))] impl PgConnectionInfo for crate::pool::PoolConnection { fn server_version_num(&self) -> Option { self.stream.server_version_num diff --git a/sqlx-core/src/postgres/connection/stream.rs b/sqlx-core/src/postgres/connection/stream.rs index 950d8d0969..2b3d8c3b70 100644 --- a/sqlx-core/src/postgres/connection/stream.rs +++ b/sqlx-core/src/postgres/connection/stream.rs @@ -9,7 +9,12 @@ use log::Level; use crate::error::Error; use crate::io::{BufStream, Decode, Encode}; -use crate::net::{MaybeTlsStream, Socket}; + +#[cfg(not(target_arch = "wasm32"))] +use crate::net::MaybeTlsStream; + +use crate::net::Socket; + use crate::postgres::message::{Message, MessageFormat, Notice, Notification, ParameterStatus}; use crate::postgres::{PgConnectOptions, PgDatabaseError, PgSeverity}; @@ -23,8 +28,10 @@ use crate::postgres::{PgConnectOptions, PgDatabaseError, PgSeverity}; // is fully prepared to receive queries pub struct PgStream { + #[cfg(not(target_arch = "wasm32"))] inner: BufStream>, - + #[cfg(target_arch = "wasm32")] + inner: BufStream, // buffer of unreceived notification messages from `PUBLISH` // this is set when creating a PgListener and only written to if that listener is // re-used for query execution in-between receiving messages @@ -37,19 +44,37 @@ pub struct PgStream { impl PgStream { pub(super) async fn connect(options: &PgConnectOptions) -> Result { - let socket = match options.fetch_socket() { - Some(ref path) => Socket::connect_uds(path).await?, - None => Socket::connect_tcp(&options.host, options.port).await?, - }; - - let inner = BufStream::new(MaybeTlsStream::Raw(socket)); + #[cfg(target_arch = "wasm32")] + { + let socket = match options.fetch_socket() { + Some(ref path) => Socket::connect_ws(path).await?, + None => return Err(Error::Configuration("no ws url set".into())), + }; + let inner = BufStream::new(socket); + + Ok(Self { + inner, + notifications: None, + parameter_statuses: BTreeMap::default(), + server_version_num: None, + }) + } - Ok(Self { - inner, - notifications: None, - parameter_statuses: BTreeMap::default(), - server_version_num: None, - }) + #[cfg(not(target_arch = "wasm32"))] + { + let socket = match options.fetch_socket() { + Some(ref path) => Socket::connect_uds(path).await?, + None => Socket::connect_tcp(&options.host, options.port).await?, + }; + let inner = BufStream::new(MaybeTlsStream::Raw(socket)); + + Ok(Self { + inner, + notifications: None, + parameter_statuses: BTreeMap::default(), + server_version_num: None, + }) + } } pub(crate) async fn send<'en, T>(&mut self, message: T) -> Result<(), Error> @@ -170,7 +195,10 @@ impl PgStream { } impl Deref for PgStream { + #[cfg(not(target_arch = "wasm32"))] type Target = BufStream>; + #[cfg(target_arch = "wasm32")] + type Target = BufStream; #[inline] fn deref(&self) -> &Self::Target { diff --git a/sqlx-core/src/postgres/copy.rs b/sqlx-core/src/postgres/copy.rs index babdecca47..ddff8ce5d0 100644 --- a/sqlx-core/src/postgres/copy.rs +++ b/sqlx-core/src/postgres/copy.rs @@ -1,13 +1,20 @@ use crate::error::{Error, Result}; use crate::ext::async_stream::TryAsyncStream; +#[cfg(not(target_arch = "wasm32"))] use crate::pool::{Pool, PoolConnection}; use crate::postgres::connection::PgConnection; use crate::postgres::message::{ CommandComplete, CopyData, CopyDone, CopyFail, CopyResponse, MessageFormat, Query, }; +#[cfg(not(target_arch = "wasm32"))] use crate::postgres::Postgres; use bytes::{BufMut, Bytes}; + +#[cfg(not(target_arch = "wasm32"))] use futures_core::stream::BoxStream; +#[cfg(target_arch = "wasm32")] +use futures_core::stream::LocalBoxStream as BoxStream; + use smallvec::alloc::borrow::Cow; use sqlx_rt::{AsyncRead, AsyncReadExt, AsyncWriteExt}; use std::convert::TryFrom; @@ -59,6 +66,7 @@ impl PgConnection { } } +#[cfg(not(target_arch = "wasm32"))] impl Pool { /// Issue a `COPY FROM STDIN` statement and begin streaming data to Postgres. /// This is a more efficient way to import data into Postgres as compared to diff --git a/sqlx-core/src/postgres/mod.rs b/sqlx-core/src/postgres/mod.rs index cac6c004b7..8482315d23 100644 --- a/sqlx-core/src/postgres/mod.rs +++ b/sqlx-core/src/postgres/mod.rs @@ -9,7 +9,12 @@ mod copy; mod database; mod error; mod io; + +#[cfg(not(target_arch = "wasm32"))] mod listener; +#[cfg(target_arch = "wasm32")] +mod ws_listener; + mod message; mod options; mod query_result; @@ -20,7 +25,7 @@ mod type_info; pub mod types; mod value; -#[cfg(feature = "migrate")] +#[cfg(all(feature = "migrate", not(target_arch = "wasm32")))] mod migrate; pub use arguments::{PgArgumentBuffer, PgArguments}; @@ -29,7 +34,12 @@ pub use connection::{PgConnection, PgConnectionInfo}; pub use copy::PgCopyIn; pub use database::Postgres; pub use error::{PgDatabaseError, PgErrorPosition}; + +#[cfg(not(target_arch = "wasm32"))] pub use listener::{PgListener, PgNotification}; +#[cfg(target_arch = "wasm32")] +pub use ws_listener::PgListener; + pub use message::PgSeverity; pub use options::{PgConnectOptions, PgSslMode}; pub use query_result::PgQueryResult; @@ -40,9 +50,11 @@ pub use type_info::{PgTypeInfo, PgTypeKind}; pub use value::{PgValue, PgValueFormat, PgValueRef}; /// An alias for [`Pool`][crate::pool::Pool], specialized for Postgres. +#[cfg(not(target_arch = "wasm32"))] pub type PgPool = crate::pool::Pool; /// An alias for [`PoolOptions`][crate::pool::PoolOptions], specialized for Postgres. +#[cfg(not(target_arch = "wasm32"))] pub type PgPoolOptions = crate::pool::PoolOptions; /// An alias for [`Executor<'_, Database = Postgres>`][Executor]. @@ -50,8 +62,12 @@ pub trait PgExecutor<'c>: Executor<'c, Database = Postgres> {} impl<'c, T: Executor<'c, Database = Postgres>> PgExecutor<'c> for T {} impl_into_arguments_for_arguments!(PgArguments); + +#[cfg(not(target_arch = "wasm32"))] impl_executor_for_pool_connection!(Postgres, PgConnection, PgRow); impl_executor_for_transaction!(Postgres, PgRow); + +#[cfg(not(target_arch = "wasm32"))] impl_acquire!(Postgres, PgConnection); impl_column_index_for_row!(PgRow); impl_column_index_for_statement!(PgStatement); diff --git a/sqlx-core/src/postgres/options/connect.rs b/sqlx-core/src/postgres/options/connect.rs index 5c98598dd6..c9f4b02344 100644 --- a/sqlx-core/src/postgres/options/connect.rs +++ b/sqlx-core/src/postgres/options/connect.rs @@ -1,7 +1,12 @@ use crate::connection::ConnectOptions; use crate::error::Error; use crate::postgres::{PgConnectOptions, PgConnection}; + +#[cfg(not(target_arch = "wasm32"))] use futures_core::future::BoxFuture; +#[cfg(target_arch = "wasm32")] +use futures_core::future::LocalBoxFuture as BoxFuture; + use log::LevelFilter; use std::time::Duration; diff --git a/sqlx-core/src/postgres/options/mod.rs b/sqlx-core/src/postgres/options/mod.rs index 1770959fdf..37c85a768e 100644 --- a/sqlx-core/src/postgres/options/mod.rs +++ b/sqlx-core/src/postgres/options/mod.rs @@ -5,7 +5,12 @@ mod connect; mod parse; mod pgpass; mod ssl_mode; -use crate::{connection::LogSettings, net::CertificateInput}; + +#[cfg(not(target_arch = "wasm32"))] +use crate::net::CertificateInput; + +use crate::connection::LogSettings; + pub use ssl_mode::PgSslMode; /// Options and flags which can be used to configure a PostgreSQL connection. @@ -81,10 +86,14 @@ pub struct PgConnectOptions { pub(crate) password: Option, pub(crate) database: Option, pub(crate) ssl_mode: PgSslMode, + #[cfg(not(target_arch = "wasm32"))] pub(crate) ssl_root_cert: Option, pub(crate) statement_cache_capacity: usize, pub(crate) application_name: Option, pub(crate) log_settings: LogSettings, + + #[cfg(target_arch = "wasm32")] + pub(crate) ws_url: Option, } impl Default for PgConnectOptions { @@ -137,6 +146,8 @@ impl PgConnectOptions { username, password, database, + + #[cfg(not(target_arch = "wasm32"))] ssl_root_cert: var("PGSSLROOTCERT").ok().map(CertificateInput::from), ssl_mode: var("PGSSLMODE") .ok() @@ -145,6 +156,8 @@ impl PgConnectOptions { statement_cache_capacity: 100, application_name: var("PGAPPNAME").ok(), log_settings: Default::default(), + #[cfg(target_arch = "wasm32")] + ws_url: None, } } @@ -240,6 +253,14 @@ impl PgConnectOptions { self } + /// Sets the websocket url. + /// + #[cfg(target_arch = "wasm32")] + pub fn ws_url(mut self) -> Self { + self.ws_url = Some(format!("ws://{}:{}", self.host, self.port)); + self + } + /// Sets whether or with what priority a secure SSL TCP/IP connection will be negotiated /// with the server. /// @@ -273,6 +294,7 @@ impl PgConnectOptions { /// .ssl_mode(PgSslMode::VerifyCa) /// .ssl_root_cert("./ca-certificate.crt"); /// ``` + #[cfg(not(target_arch = "wasm32"))] pub fn ssl_root_cert(mut self, cert: impl AsRef) -> Self { self.ssl_root_cert = Some(CertificateInput::File(cert.as_ref().to_path_buf())); self @@ -289,6 +311,7 @@ impl PgConnectOptions { /// .ssl_mode(PgSslMode::VerifyCa) /// .ssl_root_cert_from_pem(vec![]); /// ``` + #[cfg(not(target_arch = "wasm32"))] pub fn ssl_root_cert_from_pem(mut self, pem_certificate: Vec) -> Self { self.ssl_root_cert = Some(CertificateInput::Inline(pem_certificate)); self @@ -322,16 +345,24 @@ impl PgConnectOptions { /// We try using a socket if hostname starts with `/` or if socket parameter /// is specified. pub(crate) fn fetch_socket(&self) -> Option { - match self.socket { - Some(ref socket) => { - let full_path = format!("{}/.s.PGSQL.{}", socket.display(), self.port); - Some(full_path) - } - None if self.host.starts_with('/') => { - let full_path = format!("{}/.s.PGSQL.{}", self.host, self.port); - Some(full_path) + #[cfg(target_arch = "wasm32")] + { + self.ws_url.as_ref().cloned() + } + + #[cfg(not(target_arch = "wasm32"))] + { + match self.socket { + Some(ref socket) => { + let full_path = format!("{}/.s.PGSQL.{}", socket.display(), self.port); + Some(full_path) + } + None if self.host.starts_with('/') => { + let full_path = format!("{}/.s.PGSQL.{}", self.host, self.port); + Some(full_path) + } + _ => None, } - _ => None, } } } diff --git a/sqlx-core/src/postgres/options/parse.rs b/sqlx-core/src/postgres/options/parse.rs index 5c5cd71ee8..8971283a0d 100644 --- a/sqlx-core/src/postgres/options/parse.rs +++ b/sqlx-core/src/postgres/options/parse.rs @@ -25,6 +25,11 @@ impl FromStr for PgConnectOptions { options = options.port(port); } + #[cfg(target_arch = "wasm32")] + { + options = options.ws_url(); + } + let username = url.username(); if !username.is_empty() { options = options.username( @@ -53,6 +58,7 @@ impl FromStr for PgConnectOptions { options = options.ssl_mode(value.parse().map_err(Error::config)?); } + #[cfg(not(target_arch = "wasm32"))] "sslrootcert" | "ssl-root-cert" | "ssl-ca" => { options = options.ssl_root_cert(&*value); } diff --git a/sqlx-core/src/postgres/transaction.rs b/sqlx-core/src/postgres/transaction.rs index efb11b8223..e88bec3386 100644 --- a/sqlx-core/src/postgres/transaction.rs +++ b/sqlx-core/src/postgres/transaction.rs @@ -1,4 +1,7 @@ +#[cfg(not(target_arch = "wasm32"))] use futures_core::future::BoxFuture; +#[cfg(target_arch = "wasm32")] +use futures_core::future::LocalBoxFuture as BoxFuture; use crate::error::Error; use crate::executor::Executor; diff --git a/sqlx-core/src/postgres/ws_listener.rs b/sqlx-core/src/postgres/ws_listener.rs new file mode 100644 index 0000000000..4fe9c99b18 --- /dev/null +++ b/sqlx-core/src/postgres/ws_listener.rs @@ -0,0 +1,355 @@ +use crate::describe::Describe; +use crate::executor::{Execute, Executor}; +use crate::postgres::message::{MessageFormat, Notification}; +use crate::postgres::{PgConnection, PgQueryResult, PgRow, PgStatement, PgTypeInfo, Postgres}; +use crate::{connection::Connection, error::Error}; +use either::Either; +use futures_channel::mpsc; +use futures_core::future::LocalBoxFuture as BoxFuture; +use futures_core::stream::LocalBoxStream as BoxStream; +use futures_core::stream::Stream; +use std::fmt::{self, Debug}; +use std::io; +use std::str::from_utf8; + +/// Represents a connection to a Postgres db over a websocket connection +pub struct PgListener { + connection: Option, + buffer_rx: mpsc::UnboundedReceiver, + buffer_tx: Option>, + channels: Vec, + url: String, +} + +/// An asynchronous notification from Postgres. +pub struct PgNotification(Notification); + +impl PgListener { + /// Connects to a PG instance over a websocket connection + pub async fn connect(url: &str) -> Result { + let mut connection = PgConnection::connect(url).await?; + let (sender, receiver) = mpsc::unbounded(); + connection.stream.notifications = Some(sender); + + Ok(Self { + connection: Some(connection), + buffer_rx: receiver, + buffer_tx: None, + channels: Vec::new(), + url: url.into(), + }) + } + + /// Starts listening for notifications on a channel. + /// The channel name is quoted here to ensure case sensitivity. + pub async fn listen(&mut self, channel: &str) -> Result<(), Error> { + self.connection() + .execute(&*format!(r#"LISTEN "{}""#, ident(channel))) + .await?; + + self.channels.push(channel.to_owned()); + + Ok(()) + } + + /// Starts listening for notifications on all channels. + pub async fn listen_all( + &mut self, + channels: impl IntoIterator, + ) -> Result<(), Error> { + let beg = self.channels.len(); + self.channels.extend(channels.into_iter().map(|s| s.into())); + + self.connection + .as_mut() + .unwrap() + .execute(&*build_listen_all_query(&self.channels[beg..])) + .await?; + + Ok(()) + } + + /// Stops listening for notifications on a channel. + /// The channel name is quoted here to ensure case sensitivity. + pub async fn unlisten(&mut self, channel: &str) -> Result<(), Error> { + self.connection() + .execute(&*format!(r#"UNLISTEN "{}""#, ident(channel))) + .await?; + + if let Some(pos) = self.channels.iter().position(|s| s == channel) { + self.channels.remove(pos); + } + + Ok(()) + } + + /// Stops listening for notifications on all channels. + pub async fn unlisten_all(&mut self) -> Result<(), Error> { + self.connection().execute("UNLISTEN *").await?; + + self.channels.clear(); + + Ok(()) + } + + #[inline] + async fn connect_if_needed(&mut self) -> Result<(), Error> { + if self.connection.is_none() { + let mut connection = PgConnection::connect(&self.url).await?; + connection.stream.notifications = self.buffer_tx.take(); + + connection + .execute(&*build_listen_all_query(&self.channels)) + .await?; + + self.connection = Some(connection); + } + + Ok(()) + } + + #[inline] + fn connection(&mut self) -> &mut PgConnection { + self.connection.as_mut().unwrap() + } + + /// Receives the next notification available from any of the subscribed channels. + /// + /// If the connection to PostgreSQL is lost, it is automatically reconnected on the next + /// call to `recv()`, and should be entirely transparent (as long as it was just an + /// intermittent network failure or long-lived connection reaper). + /// + /// As notifications are transient, any received while the connection was lost, will not + /// be returned. If you'd prefer the reconnection to be explicit and have a chance to + /// do something before, please see [`try_recv`](Self::try_recv). + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx_core::postgres::PgListener; + /// # use sqlx_core::error::Error; + /// # + /// # #[cfg(feature = "_rt-async-std")] + /// # sqlx_rt::block_on::<_, Result<(), Error>>(async move { + /// # let mut listener = PgListener::connect("postgres:// ...").await?; + /// loop { + /// // ask for next notification, re-connecting (transparently) if needed + /// let notification = listener.recv().await?; + /// + /// // handle notification, do something interesting + /// } + /// # Ok(()) + /// # }).unwrap(); + /// ``` + pub async fn recv(&mut self) -> Result { + loop { + if let Some(notification) = self.try_recv().await? { + return Ok(notification); + } + } + } + + /// Receives the next notification available from any of the subscribed channels. + /// + /// If the connection to PostgreSQL is lost, `None` is returned, and the connection is + /// reconnected on the next call to `try_recv()`. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx_core::postgres::PgListener; + /// # use sqlx_core::error::Error; + /// # + /// # #[cfg(feature = "_rt-async-std")] + /// # sqlx_rt::block_on::<_, Result<(), Error>>(async move { + /// # let mut listener = PgListener::connect("postgres:// ...").await?; + /// loop { + /// // start handling notifications, connecting if needed + /// while let Some(notification) = listener.try_recv().await? { + /// // handle notification + /// } + /// + /// // connection lost, do something interesting + /// } + /// # Ok(()) + /// # }).unwrap(); + /// ``` + pub async fn try_recv(&mut self) -> Result, Error> { + // Flush the buffer first, if anything + // This would only fill up if this listener is used as a connection + if let Ok(Some(notification)) = self.buffer_rx.try_next() { + return Ok(Some(PgNotification(notification))); + } + + loop { + // Ensure we have an active connection to work with. + self.connect_if_needed().await?; + + let message = match self.connection().stream.recv_unchecked().await { + Ok(message) => message, + + // The connection is dead, ensure that it is dropped, + // update self state, and loop to try again. + Err(Error::Io(err)) if err.kind() == io::ErrorKind::ConnectionAborted => { + self.buffer_tx = self.connection().stream.notifications.take(); + self.connection = None; + + // lost connection + return Ok(None); + } + + // Forward other errors + Err(error) => { + return Err(error); + } + }; + + match message.format { + // We've received an async notification, return it. + MessageFormat::NotificationResponse => { + return Ok(Some(PgNotification(message.decode()?))); + } + + // Mark the connection as ready for another query + MessageFormat::ReadyForQuery => { + self.connection().pending_ready_for_query_count -= 1; + } + + // Ignore unexpected messages + _ => {} + } + } + } + + /// Consume this listener, returning a `Stream` of notifications. + /// + /// The backing connection will be automatically reconnected should it be lost. + /// + /// This has the same potential drawbacks as [`recv`](PgListener::recv). + /// + pub fn into_stream(mut self) -> impl Stream> + Unpin { + Box::pin(try_stream! { + loop { + r#yield!(self.recv().await?); + } + }) + } +} + +impl<'c> Executor<'c> for &'c mut PgListener { + type Database = Postgres; + + fn fetch_many<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> BoxStream<'e, Result, Error>> + where + 'c: 'e, + E: Execute<'q, Self::Database>, + { + self.connection().fetch_many(query) + } + + fn fetch_optional<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> BoxFuture<'e, Result, Error>> + where + 'c: 'e, + E: Execute<'q, Self::Database>, + { + self.connection().fetch_optional(query) + } + + fn prepare_with<'e, 'q: 'e>( + self, + query: &'q str, + parameters: &'e [PgTypeInfo], + ) -> BoxFuture<'e, Result, Error>> + where + 'c: 'e, + { + self.connection().prepare_with(query, parameters) + } + + #[doc(hidden)] + fn describe<'e, 'q: 'e>( + self, + query: &'q str, + ) -> BoxFuture<'e, Result, Error>> + where + 'c: 'e, + { + self.connection().describe(query) + } +} + +impl PgNotification { + /// The process ID of the notifying backend process. + #[inline] + pub fn process_id(&self) -> u32 { + self.0.process_id + } + + /// The channel that the notify has been raised on. This can be thought + /// of as the message topic. + #[inline] + pub fn channel(&self) -> &str { + from_utf8(&self.0.channel).unwrap() + } + + /// The payload of the notification. An empty payload is received as an + /// empty string. + #[inline] + pub fn payload(&self) -> &str { + from_utf8(&self.0.payload).unwrap() + } +} + +impl Debug for PgListener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PgListener").finish() + } +} + +impl Debug for PgNotification { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PgNotification") + .field("process_id", &self.process_id()) + .field("channel", &self.channel()) + .field("payload", &self.payload()) + .finish() + } +} + +fn ident(mut name: &str) -> String { + // If the input string contains a NUL byte, we should truncate the + // identifier. + if let Some(index) = name.find('\0') { + name = &name[..index]; + } + + // Any double quotes must be escaped + name.replace('"', "\"\"") +} + +fn build_listen_all_query(channels: impl IntoIterator>) -> String { + channels.into_iter().fold(String::new(), |mut acc, chan| { + acc.push_str(r#"LISTEN ""#); + acc.push_str(&ident(chan.as_ref())); + acc.push_str(r#"";"#); + acc + }) +} + +#[test] +fn test_build_listen_all_query_with_single_channel() { + let output = build_listen_all_query(&["test"]); + assert_eq!(output.as_str(), r#"LISTEN "test";"#); +} + +#[test] +fn test_build_listen_all_query_with_multiple_channels() { + let output = build_listen_all_query(&["channel.0", "channel.1"]); + assert_eq!(output.as_str(), r#"LISTEN "channel.0";LISTEN "channel.1";"#); +} diff --git a/sqlx-core/src/query.rs b/sqlx-core/src/query.rs index b3e30dc52c..ad01f24e4c 100644 --- a/sqlx-core/src/query.rs +++ b/sqlx-core/src/query.rs @@ -1,7 +1,12 @@ use std::marker::PhantomData; use either::Either; + +#[cfg(not(target_arch = "wasm32"))] use futures_core::stream::BoxStream; +#[cfg(target_arch = "wasm32")] +use futures_core::stream::LocalBoxStream as BoxStream; + use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt}; use crate::arguments::{Arguments, IntoArguments}; @@ -36,6 +41,7 @@ pub struct Map<'q, DB: Database, F, A> { mapper: F, } +#[cfg(not(target_arch = "wasm32"))] impl<'q, DB, A> Execute<'q, DB> for Query<'q, DB, A> where DB: Database, @@ -67,6 +73,38 @@ where } } +#[cfg(target_arch = "wasm32")] +impl<'q, DB, A> Execute<'q, DB> for Query<'q, DB, A> +where + DB: Database, + A: IntoArguments<'q, DB>, +{ + #[inline] + fn sql(&self) -> &'q str { + match self.statement { + Either::Right(ref statement) => statement.sql(), + Either::Left(sql) => sql, + } + } + + fn statement(&self) -> Option<&>::Statement> { + match self.statement { + Either::Right(ref statement) => Some(&statement), + Either::Left(_) => None, + } + } + + #[inline] + fn take_arguments(&mut self) -> Option<>::Arguments> { + self.arguments.take().map(IntoArguments::into_arguments) + } + + #[inline] + fn persistent(&self) -> bool { + self.persistent + } +} + impl<'q, DB: Database> Query<'q, DB, >::Arguments> { /// Bind a value for use with this SQL query. /// @@ -76,6 +114,7 @@ impl<'q, DB: Database> Query<'q, DB, >::Arguments> { /// /// There is no validation that the value is of the type expected by the query. Most SQL /// flavors will perform type coercion (Postgres will return a database error). + #[cfg(not(target_arch = "wasm32"))] pub fn bind + Type>(mut self, value: T) -> Self { if let Some(arguments) = &mut self.arguments { arguments.add(value); @@ -83,6 +122,15 @@ impl<'q, DB: Database> Query<'q, DB, >::Arguments> { self } + + #[cfg(target_arch = "wasm32")] + pub fn bind + Type>(mut self, value: T) -> Self { + if let Some(arguments) = &mut self.arguments { + arguments.add(value); + } + + self + } } impl<'q, DB, A> Query<'q, DB, A> @@ -103,6 +151,7 @@ where } } +#[cfg(not(target_arch = "wasm32"))] impl<'q, DB, A: Send> Query<'q, DB, A> where DB: Database, @@ -227,6 +276,129 @@ where } } +#[cfg(target_arch = "wasm32")] +impl<'q, DB, A> Query<'q, DB, A> +where + DB: Database, + A: 'q + IntoArguments<'q, DB>, +{ + /// Map each row in the result to another type. + /// + /// See [`try_map`](Query::try_map) for a fallible version of this method. + /// + /// The [`query_as`](super::query_as::query_as) method will construct a mapped query using + /// a [`FromRow`](super::from_row::FromRow) implementation. + #[inline] + pub fn map(self, mut f: F) -> Map<'q, DB, impl FnMut(DB::Row) -> Result, A> + where + F: FnMut(DB::Row) -> O, + O: Unpin, + { + self.try_map(move |row| Ok(f(row))) + } + + /// Map each row in the result to another type. + /// + /// The [`query_as`](super::query_as::query_as) method will construct a mapped query using + /// a [`FromRow`](super::from_row::FromRow) implementation. + #[inline] + pub fn try_map(self, f: F) -> Map<'q, DB, F, A> + where + F: FnMut(DB::Row) -> Result, + O: Unpin, + { + Map { + inner: self, + mapper: f, + } + } + + /// Execute the query and return the total number of rows affected. + #[inline] + pub async fn execute<'e, 'c: 'e, E>(self, executor: E) -> Result + where + 'q: 'e, + A: 'e, + E: Executor<'c, Database = DB>, + { + executor.execute(self).await + } + + /// Execute multiple queries and return the rows affected from each query, in a stream. + #[inline] + pub async fn execute_many<'e, 'c: 'e, E>( + self, + executor: E, + ) -> BoxStream<'e, Result> + where + 'q: 'e, + A: 'e, + E: Executor<'c, Database = DB>, + { + executor.execute_many(self) + } + + /// Execute the query and return the generated results as a stream. + #[inline] + pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result> + where + 'q: 'e, + A: 'e, + E: Executor<'c, Database = DB>, + { + executor.fetch(self) + } + + /// Execute multiple queries and return the generated results as a stream + /// from each query, in a stream. + #[inline] + pub fn fetch_many<'e, 'c: 'e, E>( + self, + executor: E, + ) -> BoxStream<'e, Result, Error>> + where + 'q: 'e, + A: 'e, + E: Executor<'c, Database = DB>, + { + executor.fetch_many(self) + } + + /// Execute the query and return all the generated results, collected into a [`Vec`]. + #[inline] + pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result, Error> + where + 'q: 'e, + A: 'e, + E: Executor<'c, Database = DB>, + { + executor.fetch_all(self).await + } + + /// Execute the query and returns exactly one row. + #[inline] + pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result + where + 'q: 'e, + A: 'e, + E: Executor<'c, Database = DB>, + { + executor.fetch_one(self).await + } + + /// Execute the query and returns at most one row. + #[inline] + pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result, Error> + where + 'q: 'e, + A: 'e, + E: Executor<'c, Database = DB>, + { + executor.fetch_optional(self).await + } +} + +#[cfg(not(target_arch = "wasm32"))] impl<'q, DB, F: Send, A: Send> Execute<'q, DB> for Map<'q, DB, F, A> where DB: Database, @@ -253,6 +425,34 @@ where } } +#[cfg(target_arch = "wasm32")] +impl<'q, DB, F, A> Execute<'q, DB> for Map<'q, DB, F, A> +where + DB: Database, + A: IntoArguments<'q, DB>, +{ + #[inline] + fn sql(&self) -> &'q str { + self.inner.sql() + } + + #[inline] + fn statement(&self) -> Option<&>::Statement> { + self.inner.statement() + } + + #[inline] + fn take_arguments(&mut self) -> Option<>::Arguments> { + self.inner.take_arguments() + } + + #[inline] + fn persistent(&self) -> bool { + self.inner.arguments.is_some() + } +} + +#[cfg(not(target_arch = "wasm32"))] impl<'q, DB, F, O, A> Map<'q, DB, F, A> where DB: Database, @@ -394,6 +594,142 @@ where } } +#[cfg(target_arch = "wasm32")] +impl<'q, DB, F, O, A> Map<'q, DB, F, A> +where + DB: Database, + F: FnMut(DB::Row) -> Result, + O: Unpin, + A: 'q + IntoArguments<'q, DB>, +{ + /// Map each row in the result to another type. + /// + /// See [`try_map`](Map::try_map) for a fallible version of this method. + /// + /// The [`query_as`](super::query_as::query_as) method will construct a mapped query using + /// a [`FromRow`](super::from_row::FromRow) implementation. + #[inline] + pub fn map(self, mut g: G) -> Map<'q, DB, impl FnMut(DB::Row) -> Result, A> + where + G: FnMut(O) -> P, + P: Unpin, + { + self.try_map(move |data| Ok(g(data))) + } + + /// Map each row in the result to another type. + /// + /// The [`query_as`](super::query_as::query_as) method will construct a mapped query using + /// a [`FromRow`](super::from_row::FromRow) implementation. + #[inline] + pub fn try_map(self, mut g: G) -> Map<'q, DB, impl FnMut(DB::Row) -> Result, A> + where + G: FnMut(O) -> Result, + P: Unpin, + { + let mut f = self.mapper; + Map { + inner: self.inner, + mapper: move |row| f(row).and_then(|o| g(o)), + } + } + + /// Execute the query and return the generated results as a stream. + pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result> + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + F: 'e, + O: 'e, + { + self.fetch_many(executor) + .try_filter_map(|step| async move { + Ok(match step { + Either::Left(_) => None, + Either::Right(o) => Some(o), + }) + }) + .boxed_local() + } + + /// Execute multiple queries and return the generated results as a stream + /// from each query, in a stream. + pub fn fetch_many<'e, 'c: 'e, E>( + mut self, + executor: E, + ) -> BoxStream<'e, Result, Error>> + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + F: 'e, + O: 'e, + { + Box::pin(try_stream! { + let mut s = executor.fetch_many(self.inner); + + while let Some(v) = s.try_next().await? { + r#yield!(match v { + Either::Left(v) => Either::Left(v), + Either::Right(row) => { + Either::Right((self.mapper)(row)?) + } + }); + } + + Ok(()) + }) + } + + /// Execute the query and return all the generated results, collected into a [`Vec`]. + pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result, Error> + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + F: 'e, + O: 'e, + { + self.fetch(executor).try_collect().await + } + + /// Execute the query and returns exactly one row. + pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + F: 'e, + O: 'e, + { + self.fetch_optional(executor) + .and_then(|row| match row { + Some(row) => future::ok(row), + None => future::err(Error::RowNotFound), + }) + .await + } + + /// Execute the query and returns at most one row. + pub async fn fetch_optional<'e, 'c: 'e, E>(mut self, executor: E) -> Result, Error> + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + F: 'e, + O: 'e, + { + let row = executor.fetch_optional(self.inner).await?; + + if let Some(row) = row { + (self.mapper)(row).map(Some) + } else { + Ok(None) + } + } +} + // Make a SQL query from a statement. pub(crate) fn query_statement<'q, DB>( statement: &'q >::Statement, diff --git a/sqlx-core/src/query_as.rs b/sqlx-core/src/query_as.rs index 60d06698bc..e23d3f2581 100644 --- a/sqlx-core/src/query_as.rs +++ b/sqlx-core/src/query_as.rs @@ -1,7 +1,12 @@ use std::marker::PhantomData; use either::Either; + +#[cfg(not(target_arch = "wasm32"))] use futures_core::stream::BoxStream; +#[cfg(target_arch = "wasm32")] +use futures_core::stream::LocalBoxStream as BoxStream; + use futures_util::{StreamExt, TryStreamExt}; use crate::arguments::IntoArguments; @@ -21,6 +26,7 @@ pub struct QueryAs<'q, DB: Database, O, A> { pub(crate) output: PhantomData, } +#[cfg(not(target_arch = "wasm32"))] impl<'q, DB, O: Send, A: Send> Execute<'q, DB> for QueryAs<'q, DB, O, A> where DB: Database, @@ -47,14 +53,48 @@ where } } +#[cfg(target_arch = "wasm32")] +impl<'q, DB, O: Send, A> Execute<'q, DB> for QueryAs<'q, DB, O, A> +where + DB: Database, + A: 'q + IntoArguments<'q, DB>, +{ + #[inline] + fn sql(&self) -> &'q str { + self.inner.sql() + } + + #[inline] + fn statement(&self) -> Option<&>::Statement> { + self.inner.statement() + } + + #[inline] + fn take_arguments(&mut self) -> Option<>::Arguments> { + self.inner.take_arguments() + } + + #[inline] + fn persistent(&self) -> bool { + self.inner.persistent() + } +} + impl<'q, DB: Database, O> QueryAs<'q, DB, O, >::Arguments> { /// Bind a value for use with this SQL query. /// /// See [`Query::bind`](Query::bind). + #[cfg(not(target_arch = "wasm32"))] pub fn bind + Type>(mut self, value: T) -> Self { self.inner = self.inner.bind(value); self } + + #[cfg(target_arch = "wasm32")] + pub fn bind + Type>(mut self, value: T) -> Self { + self.inner = self.inner.bind(value); + self + } } impl<'q, DB, O, A> QueryAs<'q, DB, O, A> @@ -77,6 +117,7 @@ where // FIXME: This is very close, nearly 1:1 with `Map` // noinspection DuplicatedCode +#[cfg(not(target_arch = "wasm32"))] impl<'q, DB, O, A> QueryAs<'q, DB, O, A> where DB: Database, @@ -169,6 +210,99 @@ where } } +#[cfg(target_arch = "wasm32")] +impl<'q, DB, O, A> QueryAs<'q, DB, O, A> +where + DB: Database, + A: 'q + IntoArguments<'q, DB>, + O: Unpin + for<'r> FromRow<'r, DB::Row>, +{ + /// Execute the query and return the generated results as a stream. + pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result> + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + O: 'e, + A: 'e, + { + self.fetch_many(executor) + .try_filter_map(|step| async move { Ok(step.right()) }) + .boxed_local() + } + + /// Execute multiple queries and return the generated results as a stream + /// from each query, in a stream. + pub fn fetch_many<'e, 'c: 'e, E>( + self, + executor: E, + ) -> BoxStream<'e, Result, Error>> + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + O: 'e, + A: 'e, + { + Box::pin(try_stream! { + let mut s = executor.fetch_many(self.inner); + + while let Some(v) = s.try_next().await? { + r#yield!(match v { + Either::Left(v) => Either::Left(v), + Either::Right(row) => Either::Right(O::from_row(&row)?), + }); + } + + Ok(()) + }) + } + + /// Execute the query and return all the generated results, collected into a [`Vec`]. + #[inline] + pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result, Error> + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + O: 'e, + A: 'e, + { + self.fetch(executor).try_collect().await + } + + /// Execute the query and returns exactly one row. + pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + O: 'e, + A: 'e, + { + self.fetch_optional(executor) + .await + .and_then(|row| row.ok_or(Error::RowNotFound)) + } + + /// Execute the query and returns at most one row. + pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result, Error> + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + O: 'e, + A: 'e, + { + let row = executor.fetch_optional(self.inner).await?; + if let Some(row) = row { + O::from_row(&row).map(Some) + } else { + Ok(None) + } + } +} + /// Make a SQL query that is mapped to a concrete type /// using [`FromRow`]. #[inline] diff --git a/sqlx-core/src/query_scalar.rs b/sqlx-core/src/query_scalar.rs index 19a78287bc..8898ccf654 100644 --- a/sqlx-core/src/query_scalar.rs +++ b/sqlx-core/src/query_scalar.rs @@ -1,5 +1,10 @@ use either::Either; + +#[cfg(not(target_arch = "wasm32"))] use futures_core::stream::BoxStream; +#[cfg(target_arch = "wasm32")] +use futures_core::stream::LocalBoxStream as BoxStream; + use futures_util::{StreamExt, TryFutureExt, TryStreamExt}; use crate::arguments::IntoArguments; @@ -20,6 +25,7 @@ pub struct QueryScalar<'q, DB: Database, O, A> { inner: QueryAs<'q, DB, (O,), A>, } +#[cfg(not(target_arch = "wasm32"))] impl<'q, DB: Database, O: Send, A: Send> Execute<'q, DB> for QueryScalar<'q, DB, O, A> where A: 'q + IntoArguments<'q, DB>, @@ -44,14 +50,46 @@ where } } +#[cfg(target_arch = "wasm32")] +impl<'q, DB: Database, O: Send, A> Execute<'q, DB> for QueryScalar<'q, DB, O, A> +where + A: 'q + IntoArguments<'q, DB>, +{ + #[inline] + fn sql(&self) -> &'q str { + self.inner.sql() + } + + fn statement(&self) -> Option<&>::Statement> { + self.inner.statement() + } + + #[inline] + fn take_arguments(&mut self) -> Option<>::Arguments> { + self.inner.take_arguments() + } + + #[inline] + fn persistent(&self) -> bool { + self.inner.persistent() + } +} + impl<'q, DB: Database, O> QueryScalar<'q, DB, O, >::Arguments> { /// Bind a value for use with this SQL query. /// /// See [`Query::bind`](crate::query::Query::bind). + #[cfg(not(target_arch = "wasm32"))] pub fn bind + Type>(mut self, value: T) -> Self { self.inner = self.inner.bind(value); self } + + #[cfg(target_arch = "wasm32")] + pub fn bind + Type>(mut self, value: T) -> Self { + self.inner = self.inner.bind(value); + self + } } impl<'q, DB, O, A> QueryScalar<'q, DB, O, A> @@ -74,6 +112,7 @@ where // FIXME: This is very close, nearly 1:1 with `Map` // noinspection DuplicatedCode +#[cfg(not(target_arch = "wasm32"))] impl<'q, DB, O, A> QueryScalar<'q, DB, O, A> where DB: Database, @@ -158,6 +197,91 @@ where } } +#[cfg(target_arch = "wasm32")] +impl<'q, DB, O, A> QueryScalar<'q, DB, O, A> +where + DB: Database, + O: Unpin, + A: 'q + IntoArguments<'q, DB>, + (O,): Unpin + for<'r> FromRow<'r, DB::Row>, +{ + /// Execute the query and return the generated results as a stream. + #[inline] + pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result> + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + A: 'e, + O: 'e, + { + self.inner.fetch(executor).map_ok(|it| it.0).boxed_local() + } + + /// Execute multiple queries and return the generated results as a stream + /// from each query, in a stream. + #[inline] + pub fn fetch_many<'e, 'c: 'e, E>( + self, + executor: E, + ) -> BoxStream<'e, Result, Error>> + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + A: 'e, + O: 'e, + { + self.inner + .fetch_many(executor) + .map_ok(|v| v.map_right(|it| it.0)) + .boxed_local() + } + + /// Execute the query and return all the generated results, collected into a [`Vec`]. + #[inline] + pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result, Error> + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + (O,): 'e, + A: 'e, + { + self.inner + .fetch(executor) + .map_ok(|it| it.0) + .try_collect() + .await + } + + /// Execute the query and returns exactly one row. + #[inline] + pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + O: 'e, + A: 'e, + { + self.inner.fetch_one(executor).map_ok(|it| it.0).await + } + + /// Execute the query and returns at most one row. + #[inline] + pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result, Error> + where + 'q: 'e, + E: 'e + Executor<'c, Database = DB>, + DB: 'e, + O: 'e, + A: 'e, + { + Ok(self.inner.fetch_optional(executor).await?.map(|it| it.0)) + } +} + /// Make a SQL query that is mapped to a single concrete type /// using [`FromRow`]. #[inline] diff --git a/sqlx-core/src/transaction.rs b/sqlx-core/src/transaction.rs index 76d5ac85ba..77b2a3570f 100644 --- a/sqlx-core/src/transaction.rs +++ b/sqlx-core/src/transaction.rs @@ -2,7 +2,10 @@ use std::borrow::Cow; use std::fmt::{self, Debug, Formatter}; use std::ops::{Deref, DerefMut}; +#[cfg(not(target_arch = "wasm32"))] use futures_core::future::BoxFuture; +#[cfg(target_arch = "wasm32")] +use futures_core::future::LocalBoxFuture as BoxFuture; use crate::database::Database; use crate::error::Error; @@ -95,6 +98,7 @@ where } // NOTE: required due to lack of lazy normalization +#[cfg(not(target_arch = "wasm32"))] #[allow(unused_macros)] macro_rules! impl_executor_for_transaction { ($DB:ident, $Row:ident) => { @@ -165,6 +169,77 @@ macro_rules! impl_executor_for_transaction { }; } +#[cfg(target_arch = "wasm32")] +#[allow(unused_macros)] +macro_rules! impl_executor_for_transaction { + ($DB:ident, $Row:ident) => { + impl<'c, 't> crate::executor::Executor<'t> + for &'t mut crate::transaction::Transaction<'c, $DB> + { + type Database = $DB; + + fn fetch_many<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> futures_core::stream::LocalBoxStream< + 'e, + Result< + either::Either<<$DB as crate::database::Database>::QueryResult, $Row>, + crate::error::Error, + >, + > + where + 't: 'e, + E: crate::executor::Execute<'q, Self::Database>, + { + (&mut **self).fetch_many(query) + } + + fn fetch_optional<'e, 'q: 'e, E: 'q>( + self, + query: E, + ) -> futures_core::future::LocalBoxFuture<'e, Result, crate::error::Error>> + where + 't: 'e, + E: crate::executor::Execute<'q, Self::Database>, + { + (&mut **self).fetch_optional(query) + } + + fn prepare_with<'e, 'q: 'e>( + self, + sql: &'q str, + parameters: &'e [::TypeInfo], + ) -> futures_core::future::LocalBoxFuture< + 'e, + Result< + >::Statement, + crate::error::Error, + >, + > + where + 't: 'e, + { + (&mut **self).prepare_with(sql, parameters) + } + + #[doc(hidden)] + fn describe<'e, 'q: 'e>( + self, + query: &'q str, + ) -> futures_core::future::LocalBoxFuture< + 'e, + Result, crate::error::Error>, + > + where + 't: 'e, + { + (&mut **self).describe(query) + } + } + }; +} + impl<'c, DB> Debug for Transaction<'c, DB> where DB: Database, diff --git a/sqlx-macros/src/lib.rs b/sqlx-macros/src/lib.rs index c1f173e655..8a4ea4a248 100644 --- a/sqlx-macros/src/lib.rs +++ b/sqlx-macros/src/lib.rs @@ -21,7 +21,7 @@ mod database; mod derives; mod query; -#[cfg(feature = "migrate")] +#[cfg(all(feature = "migrate", not(target_arch = "wasm32")))] mod migrate; #[proc_macro] @@ -78,7 +78,7 @@ pub fn derive_from_row(input: TokenStream) -> TokenStream { } } -#[cfg(feature = "migrate")] +#[cfg(all(feature = "migrate", not(target_arch = "wasm32")))] #[proc_macro] pub fn migrate(input: TokenStream) -> TokenStream { use syn::LitStr; diff --git a/sqlx-macros/src/query/mod.rs b/sqlx-macros/src/query/mod.rs index aa862bd743..58c5dc5f34 100644 --- a/sqlx-macros/src/query/mod.rs +++ b/sqlx-macros/src/query/mod.rs @@ -12,6 +12,8 @@ use quote::{format_ident, quote}; use sqlx_core::connection::Connection; use sqlx_core::database::Database; use sqlx_core::{column::Column, describe::Describe, type_info::TypeInfo}; + +#[cfg(not(target_arch = "wasm32"))] use sqlx_rt::block_on; use crate::database::DatabaseExt; @@ -19,6 +21,9 @@ use crate::query::data::QueryData; use crate::query::input::RecordType; use either::Either; +#[cfg(target_arch = "wasm32")] +use {futures::channel::oneshot, sqlx_rt::spawn}; + mod args; mod data; mod input; @@ -164,7 +169,7 @@ fn expand_from_db(input: QueryMacroInput, db_url: &str) -> crate::Result { let data = block_on(async { let mut conn = sqlx_core::postgres::PgConnection::connect(db_url.as_str()).await?; @@ -174,6 +179,25 @@ fn expand_from_db(input: QueryMacroInput, db_url: &str) -> crate::Result { + let (tx, mut rx) = oneshot::channel(); + let src = input.src.clone(); + spawn(async move { + let mut conn = match sqlx_core::postgres::PgConnection::connect(db_url.as_str()).await { + Ok(conn) => conn, + _ => return + }; + let _ = tx.send(QueryData::from_db(&mut conn, &src).await); + }); + + if let Some(Ok(data)) = rx.try_recv()? { + expand_with_data(input, data, false) + } else { + Err("unable to connect to database".into()) + } + }, + #[cfg(not(feature = "postgres"))] "postgres" | "postgresql" => Err("database URL has the scheme of a PostgreSQL database but the `postgres` feature is not enabled".into()), diff --git a/sqlx-rt/Cargo.toml b/sqlx-rt/Cargo.toml index 7a42ed3107..5d9d8d523a 100644 --- a/sqlx-rt/Cargo.toml +++ b/sqlx-rt/Cargo.toml @@ -40,6 +40,15 @@ tokio-rustls = { version = "0.22.0", optional = true } native-tls = { version = "0.2.4", optional = true } once_cell = { version = "1.4", features = ["std"], optional = true } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm-bindgen = { version = "0.2.71" } +wasm-bindgen-futures = { version = "^0.3", features = ["futures_0_3"] } +futures-util = { version = "0.3.5", features = ["sink", "io"] } +ws_stream_wasm = { version = "0.7" } +async_io_stream = { version = "0.3.1" } +web-sys = { version = "*" } + [dependencies.tokio] version = "1.0.1" features = ["fs", "net", "rt", "rt-multi-thread", "time", "io-util"] diff --git a/sqlx-rt/src/lib.rs b/sqlx-rt/src/lib.rs index 39c50855ec..ed28d79c73 100644 --- a/sqlx-rt/src/lib.rs +++ b/sqlx-rt/src/lib.rs @@ -138,6 +138,7 @@ macro_rules! blocking { #[cfg(all( feature = "_rt-async-std", not(any(feature = "_rt-actix", feature = "_rt-tokio")), + not(target_arch = "wasm32") ))] pub use async_std::{ self, fs, future::timeout, io::prelude::ReadExt as AsyncReadExt, @@ -194,3 +195,19 @@ pub use async_native_tls::{TlsConnector, TlsStream}; )), ))] pub use async_rustls::{client::TlsStream, TlsConnector}; + +// +// wasm-bindgen +// +#[cfg(target_arch = "wasm32")] +pub use { + async_io_stream::IoStream, + futures_util::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + pin_mut, + sink::Sink, + }, + wasm_bindgen_futures::futures_0_3::spawn_local as spawn, + web_sys::console, + ws_stream_wasm::{WsMeta, WsStream, WsStreamIo}, +}; diff --git a/sqlx-test/src/lib.rs b/sqlx-test/src/lib.rs index 052e157271..da70155e5f 100644 --- a/sqlx-test/src/lib.rs +++ b/sqlx-test/src/lib.rs @@ -222,3 +222,125 @@ macro_rules! Postgres_query_for_test_prepared_type { "SELECT ({0} is not distinct from $1)::int4, {0}, $2" }; } + +#[macro_export] +macro_rules! time_delete_query { + ($n:expr, $count:literal) => { + let mut conn = new::().await.unwrap(); + + conn.execute("create temp table bench_deletes (id integer, descr text, primary key(id))") + .await; + + conn.execute("create bitmap index id_idx on bench_deletes (id)") + .await; + + let _ = sqlx::query(&format!( + "insert into bench_deletes (id, descr) select generate_series(1,{}) AS id, md5(random()::text) AS descr", + $count + )) + + .execute(&mut conn) + .await; + + let start = Instant::now(); + for _ in 0..3u8 { + for i in 1..$count { + let _ = sqlx::query(&format!( + "delete from bench_deletes where id = {}", + i + )) + .execute(&mut conn) + .await; + } + } + + let end = Instant::now(); + + println!("{}: Avg time is {}", $n, end.duration_since(start).as_millis() / 3u128); + }; +} + +#[macro_export] +macro_rules! time_update_query { + ($n:expr, $count:literal) => { + let mut conn = new::().await.unwrap(); + + conn.execute("create temp table bench_updates (id integer, descr text, primary key(id))") + .await; + + conn.execute("create bitmap index id_idx on bench_updates (id)") + .await; + + let _ = sqlx::query(&format!( + "insert into bench_updates (id, descr) select generate_series(1,{}) AS id, md5(random()::text) AS descr", + $count + )) + .execute(&mut conn) + .await; + + let start = Instant::now(); + for _ in 0..3u8 { + for i in 1..$count { + let _ = sqlx::query(&format!( + "update bench_updates set descr = md5(random()::text) where id = {}", + i + )) + .execute(&mut conn) + .await; + } + } + + let end = Instant::now(); + println!("{}: Avg time is {}", $n, end.duration_since(start).as_millis() / 3u128); + }; +} + +#[macro_export] +macro_rules! time_insert_query { + ($n:expr, $count:literal) => { + let mut conn = new::().await.unwrap(); + conn.execute("create temp table bench_inserts (id integer, descr text)") + .await; + + let start = Instant::now(); + + for _ in 0..3u8 { + for i in 0..$count { + let _ = sqlx::query(&format!( + "insert into bench_inserts (id, desc) values ({}, md5(random()::text))", + i + )) + .execute(&mut conn) + .await; + } + } + + let end = Instant::now(); + println!( + "{}: Avg time is {}", + $n, + end.duration_since(start).as_millis() / 3u128 + ); + }; +} + +#[macro_export] +macro_rules! time_query { + ($n:expr, $q:expr) => { + let mut conn = new::().await.unwrap(); + + let start = Instant::now(); + + for _ in 0..3u8 { + let _ = sqlx::query($q).fetch_all(&mut conn).await; + } + + let end = Instant::now(); + + println!( + "{}: Avg time is {}", + $n, + end.duration_since(start).as_millis() / 3u128 + ); + }; +} diff --git a/sqlx-wasm-test/.cargo/config b/sqlx-wasm-test/.cargo/config new file mode 100644 index 0000000000..4ec2f3b862 --- /dev/null +++ b/sqlx-wasm-test/.cargo/config @@ -0,0 +1,2 @@ +[target.wasm32-unknown-unknown] +runner = 'wasm-bindgen-test-runner' diff --git a/sqlx-wasm-test/.gitignore b/sqlx-wasm-test/.gitignore new file mode 100644 index 0000000000..96ef6c0b94 --- /dev/null +++ b/sqlx-wasm-test/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/sqlx-wasm-test/Cargo.toml b/sqlx-wasm-test/Cargo.toml new file mode 100644 index 0000000000..8697ae7d69 --- /dev/null +++ b/sqlx-wasm-test/Cargo.toml @@ -0,0 +1,103 @@ +[package] +name = "sqlx-wasm-test" +version = "0.1.0" +authors = ["abhi"] +edition = "2018" + +[lib] +crate-type = ["cdylib", "rlib"] +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +sqlx = { path = "..", features = ["postgres", "decimal", "bigdecimal", "time", "chrono", "bit-vec", "ipnetwork", "uuid", "json"] } +wasm-bindgen-futures = { version = "^0.3", features = ["futures_0_3"] } +wasm-bindgen = { version = "0.2.73" } +ws_stream_wasm = "0.7" +wasm-bindgen-test = "0.3.0" +instant = "0.1.9" +web-sys = { version = "0.3.50", features = ["console", "Performance"] } +futures = "0.3.14" +serde = "1.0.117" +serde_json = { version = "1.0.51", features = ["raw_value"] } +paste = "1.0.1" +time = { version = "0.2.26" } + +[[test]] +name = "selects" +path = "src/selects_bench.rs" + +[[test]] +name = "inserts" +path = "src/inserts_bench.rs" + +[[test]] +name = "updates" +path = "src/updates_bench.rs" + +[[test]] +name = "deletes" +path = "src/deletes_bench.rs" + +[[test]] +name = "pg_wasm_tests" +path = "src/pg_wasm_tests.rs" + +[[test]] +name = "pg_types_tests" +path = "src/pg_types_tests.rs" + +[[test]] +name = "pg_types_tests_2" +path = "src/pg_types_tests_2.rs" + +[[test]] +name = "pg_types_tests_3" +path = "src/pg_types_tests_3.rs" + +[[test]] +name = "pg_types_tests_4" +path = "src/pg_types_tests_4.rs" + +[[test]] +name = "pg_types_tests_uuid" +path = "src/pg_types_tests_uuid.rs" + +[[test]] +name = "pg_types_tests_ipnetwork" +path = "src/pg_types_tests_ipnetwork.rs" + +[[test]] +name = "pg_types_tests_bitvec" +path = "src/pg_types_tests_bitvec.rs" + +[[test]] +name = "pg_types_tests_chrono" +path = "src/pg_types_tests_chrono.rs" + +[[test]] +name = "pg_types_tests_time" +path = "src/pg_types_tests_time.rs" + +[[test]] +name = "pg_types_tests_json" +path = "src/pg_types_tests_json.rs" + +[[test]] +name = "pg_types_tests_bigdecimal" +path = "src/pg_types_tests_bigdecimal.rs" + +[[test]] +name = "pg_types_tests_decimal" +path = "src/pg_types_tests_decimal.rs" + +[[test]] +name = "pg_types_tests_money" +path = "src/pg_types_tests_money.rs" + +[[test]] +name = "pg_types_tests_range" +path = "src/pg_types_tests_range.rs" + +[[test]] +name = "pg_types_tests_interval" +path = "src/pg_types_tests_interval.rs" diff --git a/sqlx-wasm-test/README.md b/sqlx-wasm-test/README.md new file mode 100644 index 0000000000..cbee8d1229 --- /dev/null +++ b/sqlx-wasm-test/README.md @@ -0,0 +1,13 @@ +# Setup +1. Make sure postgres is installed and listening at port 5432. +2. Start a websocket-tcp proxy using [websocat](https://github.com/vi/websocat) + `$ websocat --binary ws-l:127.0.0.1:8080 tcp:127.0.0.1:5432` + +# Running +From the root folder of this crate: +1. `wasm-pack test --firefox -- --test` +2. Launch Firefox and navigate to [http://127.0.0.1:8000](http://127.0.0.1:8000) + +Corresponding native queries' benchmarking is done in `../tests/postgres/*_custom_bench.rs` files and they can be run by executing (insert benchmarking for example) - +`$ cargo test --no-default-features --features postgres,runtime-async-std-rustls --test pg-inserts-bench -- --test-threads=1 --nocapture` +from the root of this repo. diff --git a/sqlx-wasm-test/setup.sql b/sqlx-wasm-test/setup.sql new file mode 100644 index 0000000000..3d738e449b --- /dev/null +++ b/sqlx-wasm-test/setup.sql @@ -0,0 +1,3 @@ +CREATE DOMAIN month_id AS INT2 CHECK (1 <= value AND value <= 12); +CREATE TYPE year_month AS (year INT4, month month_id); +CREATE DOMAIN winter_year_month AS year_month CHECK ((value).month <= 3); diff --git a/sqlx-wasm-test/src/deletes_bench.rs b/sqlx-wasm-test/src/deletes_bench.rs new file mode 100644 index 0000000000..90aec3b9db --- /dev/null +++ b/sqlx-wasm-test/src/deletes_bench.rs @@ -0,0 +1,18 @@ +use sqlx::Executor; +use sqlx_wasm_test::time_delete_query; +use wasm_bindgen_test::*; + +#[wasm_bindgen_test] +async fn deletes_query_small() { + time_delete_query!("small", 100u32); +} + +#[wasm_bindgen_test] +async fn deletes_query_medium() { + time_delete_query!("medium", 1000u32); +} + +#[wasm_bindgen_test] +async fn deletes_query_large() { + time_delete_query!("large", 10000u32); +} diff --git a/sqlx-wasm-test/src/inserts_bench.rs b/sqlx-wasm-test/src/inserts_bench.rs new file mode 100644 index 0000000000..173880206d --- /dev/null +++ b/sqlx-wasm-test/src/inserts_bench.rs @@ -0,0 +1,18 @@ +use sqlx::Executor; +use sqlx_wasm_test::time_insert_query; +use wasm_bindgen_test::*; + +#[wasm_bindgen_test] +async fn insert_query_small() { + time_insert_query!("small", 100u32); +} + +#[wasm_bindgen_test] +async fn insert_query_medium() { + time_insert_query!("medium", 1000u32); +} + +#[wasm_bindgen_test] +async fn insert_query_large() { + time_insert_query!("large", 10000u32); +} diff --git a/sqlx-wasm-test/src/lib.rs b/sqlx-wasm-test/src/lib.rs new file mode 100644 index 0000000000..c06c0ad15c --- /dev/null +++ b/sqlx-wasm-test/src/lib.rs @@ -0,0 +1,286 @@ +#![feature(test)] + +extern crate test; + +wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + +use sqlx::Connection; +use sqlx::{Database, PgConnection, Postgres}; + +pub const URL: &str = "postgresql://paul:pass123@127.0.0.1:8080/jetasap_dev"; + +pub async fn new() -> PgConnection { + ::Connection::connect(URL) + .await + .unwrap() +} + +#[macro_export] +macro_rules! time_query { + ($n:expr, $q:expr) => { + let mut conn = sqlx_wasm_test::new().await; + + let performance = web_sys::window().unwrap().performance().unwrap(); + let start = performance.now(); + + for _ in 0..3u8 { + let _ = sqlx::query($q).fetch_all(&mut conn).await; + } + + let end = performance.now(); + web_sys::console::log_1(&format!("{}: Avg time is {}", $n, (end - start) / 3f64).into()); + }; +} + +#[macro_export] +macro_rules! time_insert_query { + ($n:expr, $count:literal) => { + let mut conn = sqlx_wasm_test::new().await; + let _ = conn + .execute("create temp table bench_inserts (id integer, descr text)") + .await; + + let performance = web_sys::window().unwrap().performance().unwrap(); + let start = performance.now(); + + for _ in 0..3u8 { + for i in 0..$count { + let _ = sqlx::query(&format!( + "insert into bench_inserts (id, desc) values ({}, md5(random()::text))", + i + )) + .execute(&mut conn) + .await; + } + } + + let end = performance.now(); + web_sys::console::log_1(&format!("{}: Avg time is {}", $n, (end - start) / 3f64).into()); + }; +} + +#[macro_export] +macro_rules! time_update_query { + ($n:expr, $count:literal) => { + let mut conn = sqlx_wasm_test::new().await; + let _ = conn.execute("create temp table bench_updates (id integer, descr text, primary key(id))") + .await; + let _ = conn.execute("create bitmap index id_idx on bench_updates (id)") + .await; + + let _ = sqlx::query(&format!( + "insert into bench_updates (id, descr) select generate_series(1,{}) AS id, md5(random()::text) AS descr", + $count + )) + .execute(&mut conn) + .await; + + let performance = web_sys::window().unwrap().performance().unwrap(); + let start = performance.now(); + + for _ in 0..3u8 { + for i in 1..$count { + let _ = sqlx::query(&format!( + "update bench_updates set descr = md5(random()::text) where id = {}", + i + )) + .execute(&mut conn) + .await; + } + } + + let end = performance.now(); + web_sys::console::log_1(&format!("{}: Avg time is {}", $n, (end - start) / 3f64).into()); + }; +} + +#[macro_export] +macro_rules! time_delete_query { + ($n:expr, $count:literal) => { + let mut conn = sqlx_wasm_test::new().await; + let _ = conn.execute("create temp table bench_deletes (id integer, descr text, primary key(id))") + .await; + + let _ = conn.execute("create bitmap index id_idx on bench_deletes (id)") + .await; + + let _ = sqlx::query(&format!( + "insert into bench_deletes (id, descr) select generate_series(1,{}) AS id, md5(random()::text) AS descr", + $count + )) + + .execute(&mut conn) + .await; + let performance = web_sys::window().unwrap().performance().unwrap(); + let start = performance.now(); + + for _ in 0..3u8 { + for i in 1..$count { + let _ = sqlx::query(&format!( + "delete from bench_deletes where id = {}", + i + )) + .execute(&mut conn) + .await; + } + } + + let end = performance.now(); + web_sys::console::log_1(&format!("{}: Avg time is {}", $n, (end - start) / 3f64).into()); + }; +} + +#[macro_export] +macro_rules! test_type { + ($name:ident<$ty:ty>($db:ident, $sql:literal, $($text:literal == $value:expr),+ $(,)?)) => { + $crate::__test_prepared_type!($name<$ty>($db, $sql, $($text == $value),+)); + $crate::test_unprepared_type!($name<$ty>($db, $($text == $value),+)); + }; + + ($name:ident<$ty:ty>($db:ident, $($text:literal == $value:expr),+ $(,)?)) => { + paste::item! { + $crate::__test_prepared_type!($name<$ty>($db, $crate::[< $db _query_for_test_prepared_type >]!(), $($text == $value),+)); + $crate::test_unprepared_type!($name<$ty>($db, $($text == $value),+)); + } + }; + + ($name:ident($db:ident, $($text:literal == $value:expr),+ $(,)?)) => { + $crate::test_type!($name<$name>($db, $($text == $value),+)); + }; +} + +#[macro_export] +macro_rules! test_decode_type { + ($name:ident<$ty:ty>($db:ident, $($text:literal == $value:expr),+ $(,)?)) => { + $crate::__test_prepared_decode_type!($name<$ty>($db, $($text == $value),+)); + $crate::test_unprepared_type!($name<$ty>($db, $($text == $value),+)); + }; + + ($name:ident($db:ident, $($text:literal == $value:expr),+ $(,)?)) => { + $crate::test_decode_type!($name<$name>($db, $($text == $value),+)); + }; +} + +#[macro_export] +macro_rules! test_unprepared_type { + ($name:ident<$ty:ty>($db:ident, $($text:literal == $value:expr),+ $(,)?)) => { + paste::item! { + #[wasm_bindgen_test::wasm_bindgen_test] + async fn [< test_unprepared_type_ $name >] () { + use sqlx::prelude::*; + use futures::TryStreamExt; + + let mut conn = sqlx_wasm_test::new().await; + + $( + let query = format!("SELECT {}", $text); + let mut s = conn.fetch(&*query); + let row = s.try_next().await.unwrap().unwrap(); + let rec = row.try_get::<$ty, _>(0).unwrap(); + + assert!($value == rec); + + drop(s); + )+ + } + } + } +} + +#[macro_export] +macro_rules! __test_prepared_type { + ($name:ident<$ty:ty>($db:ident, $sql:expr, $($text:literal == $value:expr),+ $(,)?)) => { + paste::item! { + #[wasm_bindgen_test::wasm_bindgen_test] + async fn [< test_prepared_type_ $name >] () { + use sqlx::Row; + + let mut conn = sqlx_wasm_test::new().await; + + $( + let query = format!($sql, $text); + + let row = sqlx::query(&query) + .bind($value) + .bind($value) + .fetch_one(&mut conn) + .await.unwrap(); + + let matches: i32 = row.try_get(0).unwrap(); + let returned: $ty = row.try_get(1).unwrap(); + let round_trip: $ty = row.try_get(2).unwrap(); + + assert!(matches != 0, + "[1] DB value mismatch; given value: {:?}\n\ + as returned: {:?}\n\ + round-trip: {:?}", + $value, returned, round_trip); + + assert_eq!($value, returned, + "[2] DB value mismatch; given value: {:?}\n\ + as returned: {:?}\n\ + round-trip: {:?}", + $value, returned, round_trip); + + assert_eq!($value, round_trip, + "[3] DB value mismatch; given value: {:?}\n\ + as returned: {:?}\n\ + round-trip: {:?}", + $value, returned, round_trip); + )+ + } + } + }; +} + +// Test type encoding and decoding +#[macro_export] +macro_rules! test_prepared_type { + ($name:ident<$ty:ty>($db:ident, $sql:literal, $($text:literal == $value:expr),+ $(,)?)) => { + $crate::__test_prepared_type!($name<$ty>($db, $sql, $($text == $value),+)); + }; + + ($name:ident<$ty:ty>($db:ident, $($text:literal == $value:expr),+ $(,)?)) => { + paste::item! { + $crate::__test_prepared_type!($name<$ty>($db, $crate::[< $db _query_for_test_prepared_type >]!(), $($text == $value),+)); + } + }; + + ($name:ident($db:ident, $($text:literal == $value:expr),+ $(,)?)) => { + $crate::__test_prepared_type!($name<$name>($db, $($text == $value),+)); + }; +} + +// Test type decoding only for the prepared query API +#[macro_export] +macro_rules! __test_prepared_decode_type { + ($name:ident<$ty:ty>($db:ident, $($text:literal == $value:expr),+ $(,)?)) => { + paste::item! { + #[wasm_bindgen_test::wasm_bindgen_test] + async fn [< test_prepared_decode_type_ $name >] () { + use sqlx::Row; + + let mut conn = sqlx_wasm_test::new().await; + + $( + let query = format!("SELECT {}", $text); + + let row = sqlx::query(&query) + .fetch_one(&mut conn) + .await.unwrap(); + + let rec: $ty = row.try_get(0).unwrap(); + + assert!($value == rec); + )+ + } + } + }; +} + +#[macro_export] +macro_rules! Postgres_query_for_test_prepared_type { + () => { + "SELECT ({0} is not distinct from $1)::int4, {0}, $2" + }; +} diff --git a/sqlx-wasm-test/src/pg_types_tests.rs b/sqlx-wasm-test/src/pg_types_tests.rs new file mode 100644 index 0000000000..c7cf06014f --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests.rs @@ -0,0 +1,41 @@ +use sqlx_wasm_test::test_type; + +test_type!(i8( + Postgres, + "0::\"char\"" == 0_i8, + "120::\"char\"" == 120_i8, +)); + +test_type!(u32(Postgres, "325235::oid" == 325235_u32,)); + +test_type!(i16( + Postgres, + "-2144::smallint" == -2144_i16, + "821::smallint" == 821_i16, +)); + +test_type!(i32( + Postgres, + "94101::int" == 94101_i32, + "-5101::int" == -5101_i32 +)); + +test_type!(i32_vec>(Postgres, + "'{5,10,50,100}'::int[]" == vec![5_i32, 10, 50, 100], + "'{1050}'::int[]" == vec![1050_i32], + "'{}'::int[]" == Vec::::new(), + "'{1,3,-5}'::int[]" == vec![1_i32, 3, -5] +)); + +test_type!(i64(Postgres, "9358295312::bigint" == 9358295312_i64)); + +test_type!(f32(Postgres, "9419.122::real" == 9419.122_f32)); + +test_type!(f64( + Postgres, + "939399419.1225182::double precision" == 939399419.1225182_f64 +)); + +test_type!(f64_vec>(Postgres, + "'{939399419.1225182,-12.0}'::float8[]" == vec![939399419.1225182_f64, -12.0] +)); diff --git a/sqlx-wasm-test/src/pg_types_tests_2.rs b/sqlx-wasm-test/src/pg_types_tests_2.rs new file mode 100644 index 0000000000..04899da96d --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests_2.rs @@ -0,0 +1,32 @@ +use sqlx_wasm_test::{test_prepared_type, test_type}; + +// BYTEA cannot be decoded by-reference from a simple query as postgres sends it as hex +test_prepared_type!(byte_slice<&[u8]>(Postgres, + "E'\\\\xDEADBEEF'::bytea" + == &[0xDE_u8, 0xAD, 0xBE, 0xEF][..], + "E'\\\\x0000000052'::bytea" + == &[0_u8, 0, 0, 0, 0x52][..] +)); + +test_type!(str<&str>(Postgres, + "'this is foo'" == "this is foo", + "''" == "", + "'identifier'::name" == "identifier", + "'five'::char(4)" == "five", + "'more text'::varchar" == "more text", +)); + +test_type!(string(Postgres, + "'this is foo'" == format!("this is foo"), +)); + +test_type!(string_vec>(Postgres, + "array['one','two','three']::text[]" + == vec!["one","two","three"], + + "array['', '\"']::text[]" + == vec!["", "\""], + + "array['Hello, World', '', 'Goodbye']::text[]" + == vec!["Hello, World", "", "Goodbye"] +)); diff --git a/sqlx-wasm-test/src/pg_types_tests_3.rs b/sqlx-wasm-test/src/pg_types_tests_3.rs new file mode 100644 index 0000000000..b1f87a8c6c --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests_3.rs @@ -0,0 +1,27 @@ +use sqlx_wasm_test::test_type; + +test_type!(null>(Postgres, + "NULL::int2" == None:: +)); + +test_type!(null_vec>>(Postgres, + "array[10,NULL,50]::int2[]" == vec![Some(10_i16), None, Some(50)], +)); + +test_type!(bool(Postgres, + "false::boolean" == false, + "true::boolean" == true +)); + +test_type!(bool_vec>(Postgres, + "array[true,false,true]::bool[]" == vec![true, false, true], +)); + +test_type!(byte_vec>(Postgres, + "E'\\\\xDEADBEEF'::bytea" + == vec![0xDE_u8, 0xAD, 0xBE, 0xEF], + "E'\\\\x'::bytea" + == Vec::::new(), + "E'\\\\x0000000052'::bytea" + == vec![0_u8, 0, 0, 0, 0x52] +)); diff --git a/sqlx-wasm-test/src/pg_types_tests_4.rs b/sqlx-wasm-test/src/pg_types_tests_4.rs new file mode 100644 index 0000000000..5525ab8ed7 --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests_4.rs @@ -0,0 +1,18 @@ +use sqlx_wasm_test::test_decode_type; + +test_decode_type!(bool_tuple<(bool,)>(Postgres, "row(true)" == (true,))); + +test_decode_type!(num_tuple<(i32, i64, f64,)>(Postgres, "row(10,515::int8,3.124::float8)" == (10,515,3.124))); + +test_decode_type!(empty_tuple<()>(Postgres, "row()" == ())); + +test_decode_type!(string_tuple<(String, String, String)>(Postgres, + "row('one','two','three')" + == ("one".to_string(), "two".to_string(), "three".to_string()), + + "row('', '\"', '\"\"\"\"\"\"')" + == ("".to_string(), "\"".to_string(), "\"\"\"\"\"\"".to_string()), + + "row('Hello, World', '', 'Goodbye')" + == ("Hello, World".to_string(), "".to_string(), "Goodbye".to_string()) +)); diff --git a/sqlx-wasm-test/src/pg_types_tests_bigdecimal.rs b/sqlx-wasm-test/src/pg_types_tests_bigdecimal.rs new file mode 100644 index 0000000000..e562733b9c --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests_bigdecimal.rs @@ -0,0 +1,22 @@ +use sqlx_wasm_test::test_type; + +test_type!(bigdecimal(Postgres, + + // https://github.com/launchbadge/sqlx/issues/283 + "0::numeric" == "0".parse::().unwrap(), + + "1::numeric" == "1".parse::().unwrap(), + "10000::numeric" == "10000".parse::().unwrap(), + "0.1::numeric" == "0.1".parse::().unwrap(), + "0.01::numeric" == "0.01".parse::().unwrap(), + "0.012::numeric" == "0.012".parse::().unwrap(), + "0.0123::numeric" == "0.0123".parse::().unwrap(), + "0.01234::numeric" == "0.01234".parse::().unwrap(), + "0.012345::numeric" == "0.012345".parse::().unwrap(), + "0.0123456::numeric" == "0.0123456".parse::().unwrap(), + "0.01234567::numeric" == "0.01234567".parse::().unwrap(), + "0.012345678::numeric" == "0.012345678".parse::().unwrap(), + "0.0123456789::numeric" == "0.0123456789".parse::().unwrap(), + "12.34::numeric" == "12.34".parse::().unwrap(), + "12345.6789::numeric" == "12345.6789".parse::().unwrap(), +)); diff --git a/sqlx-wasm-test/src/pg_types_tests_bitvec.rs b/sqlx-wasm-test/src/pg_types_tests_bitvec.rs new file mode 100644 index 0000000000..8431b0ce9c --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests_bitvec.rs @@ -0,0 +1,25 @@ +use sqlx_wasm_test::test_type; + +test_type!(bitvec( + Postgres, + // A full byte VARBIT + "B'01101001'" == sqlx::types::BitVec::from_bytes(&[0b0110_1001]), + // A VARBIT value missing five bits from a byte + "B'110'" == { + let mut bit_vec = sqlx::types::BitVec::with_capacity(4); + bit_vec.push(true); + bit_vec.push(true); + bit_vec.push(false); + bit_vec + }, + // A BIT value + "B'01101'::bit(5)" == { + let mut bit_vec = sqlx::types::BitVec::with_capacity(5); + bit_vec.push(false); + bit_vec.push(true); + bit_vec.push(true); + bit_vec.push(false); + bit_vec.push(true); + bit_vec + }, +)); diff --git a/sqlx-wasm-test/src/pg_types_tests_chrono.rs b/sqlx-wasm-test/src/pg_types_tests_chrono.rs new file mode 100644 index 0000000000..81f18b6cb8 --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests_chrono.rs @@ -0,0 +1,54 @@ +use sqlx::types::chrono::{ + DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc, +}; +use sqlx_wasm_test::test_type; + +type PgTimeTz = sqlx::postgres::types::PgTimeTz; + +test_type!(chrono_date(Postgres, + "DATE '2001-01-05'" == NaiveDate::from_ymd(2001, 1, 5), + "DATE '2050-11-23'" == NaiveDate::from_ymd(2050, 11, 23) +)); + +test_type!(chrono_time(Postgres, + "TIME '05:10:20.115100'" == NaiveTime::from_hms_micro(5, 10, 20, 115100) +)); + +test_type!(chrono_date_time(Postgres, + "'2019-01-02 05:10:20'::timestamp" == NaiveDate::from_ymd(2019, 1, 2).and_hms(5, 10, 20) +)); + +test_type!(chrono_date_time_vec>(Postgres, + "array['2019-01-02 05:10:20']::timestamp[]" + == vec![NaiveDate::from_ymd(2019, 1, 2).and_hms(5, 10, 20)] +)); + +test_type!(chrono_date_time_tz_utc>(Postgres, + "TIMESTAMPTZ '2019-01-02 05:10:20.115100'" + == DateTime::::from_utc( + NaiveDate::from_ymd(2019, 1, 2).and_hms_micro(5, 10, 20, 115100), + Utc, + ) +)); + +test_type!(chrono_date_time_tz>(Postgres, + "TIMESTAMPTZ '2019-01-02 05:10:20.115100+06:30'" + == FixedOffset::east(60 * 60 * 6 + 1800).ymd(2019, 1, 2).and_hms_micro(5, 10, 20, 115100) +)); + +test_type!(chrono_date_time_tz_vec>>(Postgres, + "array['2019-01-02 05:10:20.115100']::timestamptz[]" + == vec![ + DateTime::::from_utc( + NaiveDate::from_ymd(2019, 1, 2).and_hms_micro(5, 10, 20, 115100), + Utc, + ) + ] +)); + +test_type!(chrono_time_tz(Postgres, + "TIMETZ '05:10:20.115100+00'" == PgTimeTz { time: NaiveTime::from_hms_micro(5, 10, 20, 115100), offset: FixedOffset::east(0) }, + "TIMETZ '05:10:20.115100+06:30'" == PgTimeTz { time: NaiveTime::from_hms_micro(5, 10, 20, 115100), offset: FixedOffset::east(60 * 60 * 6 + 1800) }, + "TIMETZ '05:10:20.115100-05'" == PgTimeTz { time: NaiveTime::from_hms_micro(5, 10, 20, 115100), offset: FixedOffset::west(60 * 60 * 5) }, + "TIMETZ '05:10:20+02'" == PgTimeTz { time: NaiveTime::from_hms(5, 10, 20), offset: FixedOffset::east(60 * 60 * 2 )} +)); diff --git a/sqlx-wasm-test/src/pg_types_tests_decimal.rs b/sqlx-wasm-test/src/pg_types_tests_decimal.rs new file mode 100644 index 0000000000..6534b94753 --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests_decimal.rs @@ -0,0 +1,13 @@ +use sqlx_wasm_test::test_type; + +use std::str::FromStr; + +test_type!(decimal(Postgres, + "0::numeric" == sqlx::types::Decimal::from_str("0").unwrap(), + "1::numeric" == sqlx::types::Decimal::from_str("1").unwrap(), + "10000::numeric" == sqlx::types::Decimal::from_str("10000").unwrap(), + "0.1::numeric" == sqlx::types::Decimal::from_str("0.1").unwrap(), + "0.01234::numeric" == sqlx::types::Decimal::from_str("0.01234").unwrap(), + "12.34::numeric" == sqlx::types::Decimal::from_str("12.34").unwrap(), + "12345.6789::numeric" == sqlx::types::Decimal::from_str("12345.6789").unwrap(), +)); diff --git a/sqlx-wasm-test/src/pg_types_tests_interval.rs b/sqlx-wasm-test/src/pg_types_tests_interval.rs new file mode 100644 index 0000000000..fa2c4b11f2 --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests_interval.rs @@ -0,0 +1,30 @@ +use sqlx::postgres::types::PgInterval; +use sqlx_wasm_test::test_prepared_type; + +test_prepared_type!(interval( + Postgres, + "INTERVAL '1h'" + == PgInterval { + months: 0, + days: 0, + microseconds: 3_600_000_000 + }, + "INTERVAL '-1 hours'" + == PgInterval { + months: 0, + days: 0, + microseconds: -3_600_000_000 + }, + "INTERVAL '3 months 12 days 1h 15 minutes 10 second '" + == PgInterval { + months: 3, + days: 12, + microseconds: (3_600 + 15 * 60 + 10) * 1_000_000 + }, + "INTERVAL '03:10:20.116100'" + == PgInterval { + months: 0, + days: 0, + microseconds: (3 * 3_600 + 10 * 60 + 20) * 1_000_000 + 116100 + }, +)); diff --git a/sqlx-wasm-test/src/pg_types_tests_ipnetwork.rs b/sqlx-wasm-test/src/pg_types_tests_ipnetwork.rs new file mode 100644 index 0000000000..71e18bff8b --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests_ipnetwork.rs @@ -0,0 +1,52 @@ +use sqlx_wasm_test::test_unprepared_type; + +macro_rules! test_type { + ($name:ident<$ty:ty>($db:ident, $sql:literal, $($text:literal == $value:expr),+ $(,)?)) => { + $crate::test_unprepared_type!($name<$ty>($db, $($text == $value),+)); + }; + + ($name:ident<$ty:ty>($db:ident, $($text:literal == $value:expr),+ $(,)?)) => { + paste::item! { + $crate::test_unprepared_type!($name<$ty>($db, $($text == $value),+)); + } + }; + + ($name:ident($db:ident, $($text:literal == $value:expr),+ $(,)?)) => { + $crate::test_type!($name<$name>($db, $($text == $value),+)); + }; +} + +test_type!(ipnetwork(Postgres, + "'127.0.0.1'::inet" + == "127.0.0.1" + .parse::() + .unwrap(), + "'8.8.8.8/24'::inet" + == "8.8.8.8/24" + .parse::() + .unwrap(), + "'::ffff:1.2.3.0'::inet" + == "::ffff:1.2.3.0" + .parse::() + .unwrap(), + "'2001:4f8:3:ba::/64'::inet" + == "2001:4f8:3:ba::/64" + .parse::() + .unwrap(), + "'192.168'::cidr" + == "192.168.0.0/24" + .parse::() + .unwrap(), + "'::ffff:1.2.3.0/120'::cidr" + == "::ffff:1.2.3.0/120" + .parse::() + .unwrap(), +)); + +test_type!(ipnetwork_vec>(Postgres, + "'{127.0.0.1,8.8.8.8/24}'::inet[]" + == vec![ + "127.0.0.1".parse::().unwrap(), + "8.8.8.8/24".parse::().unwrap() + ] +)); diff --git a/sqlx-wasm-test/src/pg_types_tests_json.rs b/sqlx-wasm-test/src/pg_types_tests_json.rs new file mode 100644 index 0000000000..8ff6c26231 --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests_json.rs @@ -0,0 +1,80 @@ +use serde_json::value::RawValue as JsonRawValue; +use serde_json::{json, Value as JsonValue}; +use sqlx::postgres::PgRow; +use sqlx::types::Json; +use sqlx::{Executor, Row}; +use sqlx_wasm_test::test_type; + +// When testing JSON, coerce to JSONB for `=` comparison as `JSON = JSON` is not +// supported in PostgreSQL + +test_type!(json( + Postgres, + "SELECT ({0}::jsonb is not distinct from $1::jsonb)::int4, {0} as _2, $2 as _3", + "'\"Hello, World\"'::json" == json!("Hello, World"), + "'\"😎\"'::json" == json!("😎"), + "'\"🙋‍♀️\"'::json" == json!("🙋‍♀️"), + "'[\"Hello\", \"World!\"]'::json" == json!(["Hello", "World!"]) +)); + +test_type!(json_array>( + Postgres, + "SELECT ({0}::jsonb[] is not distinct from $1::jsonb[])::int4, {0} as _2, $2 as _3", + "array['\"😎\"'::json, '\"🙋‍♀️\"'::json]::json[]" == vec![json!("😎"), json!("🙋‍♀️")], +)); + +test_type!(jsonb( + Postgres, + "'\"Hello, World\"'::jsonb" == json!("Hello, World"), + "'\"😎\"'::jsonb" == json!("😎"), + "'\"🙋‍♀️\"'::jsonb" == json!("🙋‍♀️"), + "'[\"Hello\", \"World!\"]'::jsonb" == json!(["Hello", "World!"]) +)); + +test_type!(jsonb_array>( + Postgres, + "array['\"😎\"'::jsonb, '\"🙋‍♀️\"'::jsonb]::jsonb[]" == vec![json!("😎"), json!("🙋‍♀️")], +)); + +#[derive(serde::Deserialize, serde::Serialize, Debug, PartialEq)] +struct Friend { + name: String, + age: u32, +} + +test_type!(json_struct>(Postgres, + "'{\"name\":\"Joe\",\"age\":33}'::jsonb" == Json(Friend { name: "Joe".to_string(), age: 33 }) +)); + +test_type!(json_struct_vec>>(Postgres, + "array['{\"name\":\"Joe\",\"age\":33}','{\"name\":\"Bob\",\"age\":22}']::jsonb[]" + == vec![ + Json(Friend { name: "Joe".to_string(), age: 33 }), + Json(Friend { name: "Bob".to_string(), age: 22 }), + ] +)); + +#[wasm_bindgen_test::wasm_bindgen_test] +async fn test_json_raw_value() { + let mut conn = sqlx_wasm_test::new().await; + + // unprepared, text API + let row: PgRow = conn + .fetch_one("SELECT '{\"hello\": \"world\"}'::jsonb") + .await + .unwrap(); + + let value: &JsonRawValue = row.try_get(0).unwrap(); + + assert_eq!(value.get(), "{\"hello\": \"world\"}"); + + // prepared, binary API + let row: PgRow = conn + .fetch_one(sqlx::query("SELECT '{\"hello\": \"world\"}'::jsonb")) + .await + .unwrap(); + + let value: &JsonRawValue = row.try_get(0).unwrap(); + + assert_eq!(value.get(), "{\"hello\": \"world\"}"); +} diff --git a/sqlx-wasm-test/src/pg_types_tests_money.rs b/sqlx-wasm-test/src/pg_types_tests_money.rs new file mode 100644 index 0000000000..a5dc1f7144 --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests_money.rs @@ -0,0 +1,9 @@ +use sqlx_wasm_test::test_prepared_type; + +use sqlx::postgres::types::PgMoney; + +test_prepared_type!(money(Postgres, "123.45::money" == PgMoney(12345))); + +test_prepared_type!(money_vec>(Postgres, + "array[123.45,420.00,666.66]::money[]" == vec![PgMoney(12345), PgMoney(42000), PgMoney(66666)], +)); diff --git a/sqlx-wasm-test/src/pg_types_tests_range.rs b/sqlx-wasm-test/src/pg_types_tests_range.rs new file mode 100644 index 0000000000..120c76529a --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests_range.rs @@ -0,0 +1,27 @@ +use sqlx::postgres::types::PgRange; +use sqlx_wasm_test::test_type; +use std::ops::Bound; + +const EXC2: Bound = Bound::Excluded(2); +const EXC3: Bound = Bound::Excluded(3); +const INC1: Bound = Bound::Included(1); +const INC2: Bound = Bound::Included(2); +const UNB: Bound = Bound::Unbounded; + +test_type!(int4range>(Postgres, + "'(,)'::int4range" == PgRange::from((UNB, UNB)), + "'(,]'::int4range" == PgRange::from((UNB, UNB)), + "'(,2)'::int4range" == PgRange::from((UNB, EXC2)), + "'(,2]'::int4range" == PgRange::from((UNB, EXC3)), + "'(1,)'::int4range" == PgRange::from((INC2, UNB)), + "'(1,]'::int4range" == PgRange::from((INC2, UNB)), + "'(1,2]'::int4range" == PgRange::from((INC2, EXC3)), + "'[,)'::int4range" == PgRange::from((UNB, UNB)), + "'[,]'::int4range" == PgRange::from((UNB, UNB)), + "'[,2)'::int4range" == PgRange::from((UNB, EXC2)), + "'[,2]'::int4range" == PgRange::from((UNB, EXC3)), + "'[1,)'::int4range" == PgRange::from((INC1, UNB)), + "'[1,]'::int4range" == PgRange::from((INC1, UNB)), + "'[1,2)'::int4range" == PgRange::from((INC1, EXC2)), + "'[1,2]'::int4range" == PgRange::from((INC1, EXC3)), +)); diff --git a/sqlx-wasm-test/src/pg_types_tests_time.rs b/sqlx-wasm-test/src/pg_types_tests_time.rs new file mode 100644 index 0000000000..7f1f94f34b --- /dev/null +++ b/sqlx-wasm-test/src/pg_types_tests_time.rs @@ -0,0 +1,39 @@ +use sqlx_wasm_test::{test_prepared_type, test_type}; + +use sqlx::types::time::{Date, OffsetDateTime, PrimitiveDateTime, Time, UtcOffset}; +use time::{date, time}; + +type PgTimeTz = sqlx::postgres::types::PgTimeTz; + +test_type!(time_date( + Postgres, + "DATE '2001-01-05'" == date!(2001 - 1 - 5), + "DATE '2050-11-23'" == date!(2050 - 11 - 23) +)); + +test_type!(time_time