Skip to content

Implement non-consuming API (fix #103) #108

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
698 changes: 350 additions & 348 deletions src/conn/mod.rs

Large diffs are not rendered by default.

15 changes: 9 additions & 6 deletions src/conn/pool/futures/disconnect_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@ use crate::{

use std::sync::{atomic, Arc};

/// Future that disconnects this pool from server and resolves to `()`.
/// Future that disconnects this pool from a server and resolves to `()`.
///
/// Active connections taken from this pool should be disconnected manually.
/// Also all pending and new `GetConn`'s will resolve to error.
///
/// **Note:** This Future won't resolve until all active connections, taken from it,
/// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error.
pub struct DisconnectPool {
pool_inner: Arc<Inner>,
}

pub fn new(pool: Pool) -> DisconnectPool {
DisconnectPool {
pool_inner: pool.inner,
impl DisconnectPool {
pub(crate) fn new(pool: Pool) -> Self {
Self {
pool_inner: pool.inner,
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/conn/pool/futures/get_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub(crate) enum GetConnInner {
New,
Done(Option<Conn>),
// TODO: one day this should be an existential
Connecting(BoxFuture<Conn>),
Connecting(BoxFuture<'static, Conn>),
}

impl GetConnInner {
Expand Down
2 changes: 1 addition & 1 deletion src/conn/pool/futures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

pub(super) use self::get_conn::GetConnInner;
pub use self::{
disconnect_pool::{new as new_disconnect_pool, DisconnectPool},
disconnect_pool::DisconnectPool,
get_conn::{new as new_get_conn, GetConn},
};

Expand Down
179 changes: 77 additions & 102 deletions src/conn/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ use crate::{
conn::{pool::futures::*, Conn},
error::*,
opts::{Opts, PoolOptions},
queryable::{
transaction::{Transaction, TransactionOptions},
Queryable,
},
queryable::transaction::TxStatus,
};

mod recycler;
Expand Down Expand Up @@ -97,7 +94,7 @@ pub struct Inner {
#[derive(Clone)]
/// Asynchronous pool of MySql connections.
///
/// Note that you will probably want to await `Pool::disconnect` before dropping the runtime, as
/// Note that you will probably want to await [`Pool::disconnect`] before dropping the runtime, as
/// otherwise you may end up with a number of connections that are not cleanly terminated.
pub struct Pool {
opts: Opts,
Expand All @@ -112,7 +109,7 @@ impl fmt::Debug for Pool {
}

impl Pool {
/// Creates new pool of connections.
/// Creates a new pool of connections.
pub fn new<O: Into<Opts>>(opts: O) -> Pool {
let opts = opts.into();
let pool_options = opts.get_pool_options().clone();
Expand All @@ -133,29 +130,21 @@ impl Pool {
}
}

/// Creates new pool of connections.
/// Creates a new pool of connections.
pub fn from_url<T: AsRef<str>>(url: T) -> Result<Pool> {
let opts = Opts::from_str(url.as_ref())?;
Ok(Pool::new(opts))
}

/// Returns future that resolves to `Conn`.
/// Returns a future that resolves to [`Conn`].
pub fn get_conn(&self) -> GetConn {
new_get_conn(self)
}

/// Shortcut for `get_conn` followed by `start_transaction`.
pub async fn start_transaction(
&self,
options: TransactionOptions,
) -> Result<Transaction<Conn>> {
Queryable::start_transaction(self.get_conn().await?, options).await
}

/// Returns future that disconnects this pool from server and resolves to `()`.
/// Returns a future that disconnects this pool from the server and resolves to `()`.
///
/// Active connections taken from this pool should be disconnected manually.
/// Also all pending and new `GetConn`'s will resolve to error.
/// **Note:** This Future won't resolve until all active connections, taken from it,
/// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error.
pub fn disconnect(self) -> DisconnectPool {
let was_closed = self.inner.close.swap(true, atomic::Ordering::AcqRel);
if !was_closed {
Expand All @@ -166,7 +155,7 @@ impl Pool {
let _ = self.drop.send(None).is_ok();
}

new_disconnect_pool(self)
DisconnectPool::new(self)
}

/// A way to return connection taken from a pool.
Expand All @@ -178,7 +167,7 @@ impl Pool {
if conn.inner.stream.is_some()
&& !conn.inner.disconnected
&& !conn.expired()
&& !conn.inner.in_transaction
&& conn.inner.tx_status == TxStatus::None
&& conn.inner.has_result.is_none()
&& !self.inner.close.load(atomic::Ordering::Acquire)
{
Expand Down Expand Up @@ -277,6 +266,10 @@ impl Pool {

impl Drop for Conn {
fn drop(&mut self) {
if std::thread::panicking() {
return;
}

if let Some(mut pool) = self.inner.pool.take() {
pool.return_conn(self.take());
} else if self.inner.stream.is_some() && !self.inner.disconnected {
Expand Down Expand Up @@ -377,31 +370,31 @@ mod test {
.await?
.drop_query("CREATE TABLE IF NOT EXISTS tmp(id int)")
.await?;
let _ = pool
let mut conn = pool.get_conn().await?;
let mut tx = conn
.start_transaction(TransactionOptions::default())
.await?
.batch_exec("INSERT INTO tmp (id) VALUES (?)", vec![(1,), (2,)])
.await?
.prep_exec("SELECT * FROM tmp", ())
.await?;
tx.batch_exec("INSERT INTO tmp (id) VALUES (?)", vec![(1_u8,), (2_u8,)])
.await?;
tx.prep_exec("SELECT * FROM tmp", ()).await?;
drop(tx);
drop(conn);
let row_opt = pool
.get_conn()
.await?
.first("SELECT COUNT(*) FROM tmp")
.await?
.1;
.await?;
assert_eq!(row_opt, Some((0u8,)));
pool.get_conn().await?.drop_query("DROP TABLE tmp").await?;
pool.disconnect().await?;
Ok(())
}

#[tokio::test]
async fn aa_should_hold_bounds2() -> super::Result<()> {
use std::cmp::min;

async fn should_check_inactive_connection_ttl() -> super::Result<()> {
const POOL_MIN: usize = 5;
const POOL_MAX: usize = 10;

const INACTIVE_CONNECTION_TTL: Duration = Duration::from_millis(500);
const TTL_CHECK_INTERVAL: Duration = Duration::from_secs(1);

Expand All @@ -417,6 +410,44 @@ mod test {
let pool_clone = pool.clone();
let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();

let conns = try_join_all(conns).await?;

assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);
drop(conns);

// wait for a bit to let the connections be reclaimed
tokio::time::delay_for(std::time::Duration::from_millis(100)).await;

// check that connections are still in the pool because of inactive_connection_ttl
assert_eq!(ex_field!(pool_clone, available).len(), POOL_MAX);

// then, wait for ttl_check_interval
tokio::time::delay_for(TTL_CHECK_INTERVAL).await;

// check that we have the expected number of connections
assert_eq!(ex_field!(pool_clone, available).len(), POOL_MIN);

Ok(())
}

#[tokio::test]
async fn aa_should_hold_bounds2() -> super::Result<()> {
use std::cmp::min;

const POOL_MIN: usize = 5;
const POOL_MAX: usize = 10;

let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
let pool_options = PoolOptions::with_constraints(constraints);

// Clean
let mut opts = get_opts();
opts.pool_options(pool_options);

let pool = Pool::new(opts);
let pool_clone = pool.clone();
let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();

let mut conns = try_join_all(conns).await?;

// we want to continuously drop connections
Expand Down Expand Up @@ -444,15 +475,6 @@ mod test {
let idle = min(dropped, POOL_MIN);
let expected = conns.len() + idle;

if dropped > POOL_MIN {
// check that connection is still in the pool because of inactive_connection_ttl
let have = ex_field!(pool_clone, exist);
assert_eq!(have, expected + 1);

// then, wait for ttl_check_interval
tokio::time::delay_for(TTL_CHECK_INTERVAL + Duration::from_millis(50)).await;
}

// check that we have the expected number of connections
let have = ex_field!(pool_clone, exist);
assert_eq!(have, expected);
Expand Down Expand Up @@ -507,61 +529,34 @@ mod test {
async fn zz_should_check_wait_timeout_on_get_conn() -> super::Result<()> {
let pool = Pool::new(get_opts());

let conn = pool.get_conn().await?;
let (conn, wait_timeout_orig) = conn.first::<_, usize>("SELECT @@wait_timeout").await?;
conn.drop_query("SET GLOBAL wait_timeout = 3")
.await?
.disconnect()
.await?;
let mut conn = pool.get_conn().await?;
let wait_timeout_orig = conn.first::<_, usize>("SELECT @@wait_timeout").await?;
conn.drop_query("SET GLOBAL wait_timeout = 3").await?;
conn.disconnect().await?;

let conn = pool.get_conn().await?;
let (conn, wait_timeout) = conn.first::<_, usize>("SELECT @@wait_timeout").await?;
let (_, id1) = conn.first::<_, usize>("SELECT CONNECTION_ID()").await?;
let mut conn = pool.get_conn().await?;
let wait_timeout = conn.first::<_, usize>("SELECT @@wait_timeout").await?;
let id1 = conn.first::<_, usize>("SELECT CONNECTION_ID()").await?;
drop(conn);

assert_eq!(wait_timeout, Some(3));
assert_eq!(ex_field!(pool, exist), 1);

tokio::time::delay_for(std::time::Duration::from_secs(6)).await;

let conn = pool.get_conn().await?;
let (conn, id2) = conn.first::<_, usize>("SELECT CONNECTION_ID()").await?;
let mut conn = pool.get_conn().await?;
let id2 = conn.first::<_, usize>("SELECT CONNECTION_ID()").await?;
assert_eq!(ex_field!(pool, exist), 1);
assert_ne!(id1, id2);

conn.drop_exec("SET GLOBAL wait_timeout = ?", (wait_timeout_orig,))
.await?;
drop(conn);

pool.disconnect().await
}

/*
#[test]
fn should_hold_bounds_on_get_conn_drop() {
let pool = Pool::new(format!("{}?pool_min=1&pool_max=2", get_opts()));
let mut runtime = tokio::runtime::Runtime::new().unwrap();
pool.disconnect().await?;

// This test is a bit more intricate: we need to poll the connection future once to get the
// pool to set it up, then drop it and make sure that the `exist` count is updated.
//
// We wrap all of it in a lazy future to get us into the tokio context that deals with
// setting up tasks. There might be a better way to do this but I don't remember right
// now. Besides, std::future is just around the corner making this obsolete.
//
// It depends on implementation details of GetConn, but that should be fine.
runtime
.block_on(future::lazy(move || {
let mut conn = pool.get_conn();
assert_eq!(pool.inner.exist.load(atomic::Ordering::SeqCst), 0);
let result = conn.poll().expect("successful first poll");
assert!(result.is_not_ready(), "not ready after first poll");
assert_eq!(pool.inner.exist.load(atomic::Ordering::SeqCst), 1);
drop(conn);
assert_eq!(pool.inner.exist.load(atomic::Ordering::SeqCst), 0);
Ok::<(), ()>(())
}))
.unwrap();
Ok(())
}
*/

#[tokio::test]
async fn droptest() -> super::Result<()> {
Expand Down Expand Up @@ -607,7 +602,7 @@ mod test {
let (tx, rx) = tokio::sync::oneshot::channel();
rt.block_on(async move {
let pool = Pool::new(get_opts());
let c = pool.get_conn().await.unwrap();
let mut c = pool.get_conn().await.unwrap();
tokio::spawn(async move {
let _ = rx.await;
let _ = c.drop_query("SELECT 1").await;
Expand All @@ -627,7 +622,7 @@ mod test {
let (tx, rx) = tokio::sync::oneshot::channel();
let jh = rt.spawn(async move {
let pool = Pool::new(get_opts());
let c = pool.get_conn().await.unwrap();
let mut c = pool.get_conn().await.unwrap();
tokio::spawn(async move {
let _ = rx.await;
let _ = c.drop_query("SELECT 1").await;
Expand All @@ -639,26 +634,6 @@ mod test {
}
}

/*
#[test]
#[ignore]
fn should_not_panic_if_dropped_without_tokio_runtime() {
// NOTE: this test does not work anymore, since the runtime won't be idle until either
//
// - all Pools and Conns are dropped; OR
// - Pool::disconnect is called; OR
// - Runtime::shutdown_now is called
//
// none of these are true in this test, which is why it's been ignored
let pool = Pool::new(get_opts());
run(collect(
(0..10).map(|_| pool.get_conn()).collect::<Vec<_>>(),
))
.unwrap();
// pool will drop here
}
*/

#[cfg(feature = "nightly")]
mod bench {
use futures_util::{future::FutureExt, try_future::TryFutureExt};
Expand Down
Loading