Skip to content

Commit 7a3c856

Browse files
authored
Merge pull request #108 from blackbeam/non-consuming-api
Implement non-consuming API (fix #103)
2 parents 60f94df + 71fc5a7 commit 7a3c856

27 files changed

+1353
-1750
lines changed

src/conn/mod.rs

Lines changed: 350 additions & 348 deletions
Large diffs are not rendered by default.

src/conn/pool/futures/disconnect_pool.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,20 @@ use crate::{
1919

2020
use std::sync::{atomic, Arc};
2121

22-
/// Future that disconnects this pool from server and resolves to `()`.
22+
/// Future that disconnects this pool from a server and resolves to `()`.
2323
///
24-
/// Active connections taken from this pool should be disconnected manually.
25-
/// Also all pending and new `GetConn`'s will resolve to error.
24+
///
25+
/// **Note:** This Future won't resolve until all active connections, taken from it,
26+
/// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error.
2627
pub struct DisconnectPool {
2728
pool_inner: Arc<Inner>,
2829
}
2930

30-
pub fn new(pool: Pool) -> DisconnectPool {
31-
DisconnectPool {
32-
pool_inner: pool.inner,
31+
impl DisconnectPool {
32+
pub(crate) fn new(pool: Pool) -> Self {
33+
Self {
34+
pool_inner: pool.inner,
35+
}
3336
}
3437
}
3538

src/conn/pool/futures/get_conn.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub(crate) enum GetConnInner {
2323
New,
2424
Done(Option<Conn>),
2525
// TODO: one day this should be an existential
26-
Connecting(BoxFuture<Conn>),
26+
Connecting(BoxFuture<'static, Conn>),
2727
}
2828

2929
impl GetConnInner {

src/conn/pool/futures/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
pub(super) use self::get_conn::GetConnInner;
1010
pub use self::{
11-
disconnect_pool::{new as new_disconnect_pool, DisconnectPool},
11+
disconnect_pool::DisconnectPool,
1212
get_conn::{new as new_get_conn, GetConn},
1313
};
1414

src/conn/pool/mod.rs

Lines changed: 77 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,7 @@ use crate::{
2222
conn::{pool::futures::*, Conn},
2323
error::*,
2424
opts::{Opts, PoolOptions},
25-
queryable::{
26-
transaction::{Transaction, TransactionOptions},
27-
Queryable,
28-
},
25+
queryable::transaction::TxStatus,
2926
};
3027

3128
mod recycler;
@@ -97,7 +94,7 @@ pub struct Inner {
9794
#[derive(Clone)]
9895
/// Asynchronous pool of MySql connections.
9996
///
100-
/// Note that you will probably want to await `Pool::disconnect` before dropping the runtime, as
97+
/// Note that you will probably want to await [`Pool::disconnect`] before dropping the runtime, as
10198
/// otherwise you may end up with a number of connections that are not cleanly terminated.
10299
pub struct Pool {
103100
opts: Opts,
@@ -112,7 +109,7 @@ impl fmt::Debug for Pool {
112109
}
113110

114111
impl Pool {
115-
/// Creates new pool of connections.
112+
/// Creates a new pool of connections.
116113
pub fn new<O: Into<Opts>>(opts: O) -> Pool {
117114
let opts = opts.into();
118115
let pool_options = opts.get_pool_options().clone();
@@ -133,29 +130,21 @@ impl Pool {
133130
}
134131
}
135132

136-
/// Creates new pool of connections.
133+
/// Creates a new pool of connections.
137134
pub fn from_url<T: AsRef<str>>(url: T) -> Result<Pool> {
138135
let opts = Opts::from_str(url.as_ref())?;
139136
Ok(Pool::new(opts))
140137
}
141138

142-
/// Returns future that resolves to `Conn`.
139+
/// Returns a future that resolves to [`Conn`].
143140
pub fn get_conn(&self) -> GetConn {
144141
new_get_conn(self)
145142
}
146143

147-
/// Shortcut for `get_conn` followed by `start_transaction`.
148-
pub async fn start_transaction(
149-
&self,
150-
options: TransactionOptions,
151-
) -> Result<Transaction<Conn>> {
152-
Queryable::start_transaction(self.get_conn().await?, options).await
153-
}
154-
155-
/// Returns future that disconnects this pool from server and resolves to `()`.
144+
/// Returns a future that disconnects this pool from the server and resolves to `()`.
156145
///
157-
/// Active connections taken from this pool should be disconnected manually.
158-
/// Also all pending and new `GetConn`'s will resolve to error.
146+
/// **Note:** This Future won't resolve until all active connections, taken from it,
147+
/// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error.
159148
pub fn disconnect(self) -> DisconnectPool {
160149
let was_closed = self.inner.close.swap(true, atomic::Ordering::AcqRel);
161150
if !was_closed {
@@ -166,7 +155,7 @@ impl Pool {
166155
let _ = self.drop.send(None).is_ok();
167156
}
168157

169-
new_disconnect_pool(self)
158+
DisconnectPool::new(self)
170159
}
171160

172161
/// A way to return connection taken from a pool.
@@ -178,7 +167,7 @@ impl Pool {
178167
if conn.inner.stream.is_some()
179168
&& !conn.inner.disconnected
180169
&& !conn.expired()
181-
&& !conn.inner.in_transaction
170+
&& conn.inner.tx_status == TxStatus::None
182171
&& conn.inner.has_result.is_none()
183172
&& !self.inner.close.load(atomic::Ordering::Acquire)
184173
{
@@ -277,6 +266,10 @@ impl Pool {
277266

278267
impl Drop for Conn {
279268
fn drop(&mut self) {
269+
if std::thread::panicking() {
270+
return;
271+
}
272+
280273
if let Some(mut pool) = self.inner.pool.take() {
281274
pool.return_conn(self.take());
282275
} else if self.inner.stream.is_some() && !self.inner.disconnected {
@@ -377,31 +370,31 @@ mod test {
377370
.await?
378371
.drop_query("CREATE TABLE IF NOT EXISTS tmp(id int)")
379372
.await?;
380-
let _ = pool
373+
let mut conn = pool.get_conn().await?;
374+
let mut tx = conn
381375
.start_transaction(TransactionOptions::default())
382-
.await?
383-
.batch_exec("INSERT INTO tmp (id) VALUES (?)", vec![(1,), (2,)])
384-
.await?
385-
.prep_exec("SELECT * FROM tmp", ())
386376
.await?;
377+
tx.batch_exec("INSERT INTO tmp (id) VALUES (?)", vec![(1_u8,), (2_u8,)])
378+
.await?;
379+
tx.prep_exec("SELECT * FROM tmp", ()).await?;
380+
drop(tx);
381+
drop(conn);
387382
let row_opt = pool
388383
.get_conn()
389384
.await?
390385
.first("SELECT COUNT(*) FROM tmp")
391-
.await?
392-
.1;
386+
.await?;
393387
assert_eq!(row_opt, Some((0u8,)));
394388
pool.get_conn().await?.drop_query("DROP TABLE tmp").await?;
395389
pool.disconnect().await?;
396390
Ok(())
397391
}
398392

399393
#[tokio::test]
400-
async fn aa_should_hold_bounds2() -> super::Result<()> {
401-
use std::cmp::min;
402-
394+
async fn should_check_inactive_connection_ttl() -> super::Result<()> {
403395
const POOL_MIN: usize = 5;
404396
const POOL_MAX: usize = 10;
397+
405398
const INACTIVE_CONNECTION_TTL: Duration = Duration::from_millis(500);
406399
const TTL_CHECK_INTERVAL: Duration = Duration::from_secs(1);
407400

@@ -417,6 +410,44 @@ mod test {
417410
let pool_clone = pool.clone();
418411
let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();
419412

413+
let conns = try_join_all(conns).await?;
414+
415+
assert_eq!(ex_field!(pool_clone, exist), POOL_MAX);
416+
drop(conns);
417+
418+
// wait for a bit to let the connections be reclaimed
419+
tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
420+
421+
// check that connections are still in the pool because of inactive_connection_ttl
422+
assert_eq!(ex_field!(pool_clone, available).len(), POOL_MAX);
423+
424+
// then, wait for ttl_check_interval
425+
tokio::time::delay_for(TTL_CHECK_INTERVAL).await;
426+
427+
// check that we have the expected number of connections
428+
assert_eq!(ex_field!(pool_clone, available).len(), POOL_MIN);
429+
430+
Ok(())
431+
}
432+
433+
#[tokio::test]
434+
async fn aa_should_hold_bounds2() -> super::Result<()> {
435+
use std::cmp::min;
436+
437+
const POOL_MIN: usize = 5;
438+
const POOL_MAX: usize = 10;
439+
440+
let constraints = PoolConstraints::new(POOL_MIN, POOL_MAX).unwrap();
441+
let pool_options = PoolOptions::with_constraints(constraints);
442+
443+
// Clean
444+
let mut opts = get_opts();
445+
opts.pool_options(pool_options);
446+
447+
let pool = Pool::new(opts);
448+
let pool_clone = pool.clone();
449+
let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();
450+
420451
let mut conns = try_join_all(conns).await?;
421452

422453
// we want to continuously drop connections
@@ -444,15 +475,6 @@ mod test {
444475
let idle = min(dropped, POOL_MIN);
445476
let expected = conns.len() + idle;
446477

447-
if dropped > POOL_MIN {
448-
// check that connection is still in the pool because of inactive_connection_ttl
449-
let have = ex_field!(pool_clone, exist);
450-
assert_eq!(have, expected + 1);
451-
452-
// then, wait for ttl_check_interval
453-
tokio::time::delay_for(TTL_CHECK_INTERVAL + Duration::from_millis(50)).await;
454-
}
455-
456478
// check that we have the expected number of connections
457479
let have = ex_field!(pool_clone, exist);
458480
assert_eq!(have, expected);
@@ -507,61 +529,34 @@ mod test {
507529
async fn zz_should_check_wait_timeout_on_get_conn() -> super::Result<()> {
508530
let pool = Pool::new(get_opts());
509531

510-
let conn = pool.get_conn().await?;
511-
let (conn, wait_timeout_orig) = conn.first::<_, usize>("SELECT @@wait_timeout").await?;
512-
conn.drop_query("SET GLOBAL wait_timeout = 3")
513-
.await?
514-
.disconnect()
515-
.await?;
532+
let mut conn = pool.get_conn().await?;
533+
let wait_timeout_orig = conn.first::<_, usize>("SELECT @@wait_timeout").await?;
534+
conn.drop_query("SET GLOBAL wait_timeout = 3").await?;
535+
conn.disconnect().await?;
516536

517-
let conn = pool.get_conn().await?;
518-
let (conn, wait_timeout) = conn.first::<_, usize>("SELECT @@wait_timeout").await?;
519-
let (_, id1) = conn.first::<_, usize>("SELECT CONNECTION_ID()").await?;
537+
let mut conn = pool.get_conn().await?;
538+
let wait_timeout = conn.first::<_, usize>("SELECT @@wait_timeout").await?;
539+
let id1 = conn.first::<_, usize>("SELECT CONNECTION_ID()").await?;
540+
drop(conn);
520541

521542
assert_eq!(wait_timeout, Some(3));
522543
assert_eq!(ex_field!(pool, exist), 1);
523544

524545
tokio::time::delay_for(std::time::Duration::from_secs(6)).await;
525546

526-
let conn = pool.get_conn().await?;
527-
let (conn, id2) = conn.first::<_, usize>("SELECT CONNECTION_ID()").await?;
547+
let mut conn = pool.get_conn().await?;
548+
let id2 = conn.first::<_, usize>("SELECT CONNECTION_ID()").await?;
528549
assert_eq!(ex_field!(pool, exist), 1);
529550
assert_ne!(id1, id2);
530551

531552
conn.drop_exec("SET GLOBAL wait_timeout = ?", (wait_timeout_orig,))
532553
.await?;
554+
drop(conn);
533555

534-
pool.disconnect().await
535-
}
536-
537-
/*
538-
#[test]
539-
fn should_hold_bounds_on_get_conn_drop() {
540-
let pool = Pool::new(format!("{}?pool_min=1&pool_max=2", get_opts()));
541-
let mut runtime = tokio::runtime::Runtime::new().unwrap();
556+
pool.disconnect().await?;
542557

543-
// This test is a bit more intricate: we need to poll the connection future once to get the
544-
// pool to set it up, then drop it and make sure that the `exist` count is updated.
545-
//
546-
// We wrap all of it in a lazy future to get us into the tokio context that deals with
547-
// setting up tasks. There might be a better way to do this but I don't remember right
548-
// now. Besides, std::future is just around the corner making this obsolete.
549-
//
550-
// It depends on implementation details of GetConn, but that should be fine.
551-
runtime
552-
.block_on(future::lazy(move || {
553-
let mut conn = pool.get_conn();
554-
assert_eq!(pool.inner.exist.load(atomic::Ordering::SeqCst), 0);
555-
let result = conn.poll().expect("successful first poll");
556-
assert!(result.is_not_ready(), "not ready after first poll");
557-
assert_eq!(pool.inner.exist.load(atomic::Ordering::SeqCst), 1);
558-
drop(conn);
559-
assert_eq!(pool.inner.exist.load(atomic::Ordering::SeqCst), 0);
560-
Ok::<(), ()>(())
561-
}))
562-
.unwrap();
558+
Ok(())
563559
}
564-
*/
565560

566561
#[tokio::test]
567562
async fn droptest() -> super::Result<()> {
@@ -607,7 +602,7 @@ mod test {
607602
let (tx, rx) = tokio::sync::oneshot::channel();
608603
rt.block_on(async move {
609604
let pool = Pool::new(get_opts());
610-
let c = pool.get_conn().await.unwrap();
605+
let mut c = pool.get_conn().await.unwrap();
611606
tokio::spawn(async move {
612607
let _ = rx.await;
613608
let _ = c.drop_query("SELECT 1").await;
@@ -627,7 +622,7 @@ mod test {
627622
let (tx, rx) = tokio::sync::oneshot::channel();
628623
let jh = rt.spawn(async move {
629624
let pool = Pool::new(get_opts());
630-
let c = pool.get_conn().await.unwrap();
625+
let mut c = pool.get_conn().await.unwrap();
631626
tokio::spawn(async move {
632627
let _ = rx.await;
633628
let _ = c.drop_query("SELECT 1").await;
@@ -639,26 +634,6 @@ mod test {
639634
}
640635
}
641636

642-
/*
643-
#[test]
644-
#[ignore]
645-
fn should_not_panic_if_dropped_without_tokio_runtime() {
646-
// NOTE: this test does not work anymore, since the runtime won't be idle until either
647-
//
648-
// - all Pools and Conns are dropped; OR
649-
// - Pool::disconnect is called; OR
650-
// - Runtime::shutdown_now is called
651-
//
652-
// none of these are true in this test, which is why it's been ignored
653-
let pool = Pool::new(get_opts());
654-
run(collect(
655-
(0..10).map(|_| pool.get_conn()).collect::<Vec<_>>(),
656-
))
657-
.unwrap();
658-
// pool will drop here
659-
}
660-
*/
661-
662637
#[cfg(feature = "nightly")]
663638
mod bench {
664639
use futures_util::{future::FutureExt, try_future::TryFutureExt};

0 commit comments

Comments
 (0)