Skip to content

Commit 02e47d9

Browse files
authored
Merge pull request #258 from cloneable/absolute-connection-ttl
Option to set an absolute TTL for connections
2 parents e9e037b + ec1a698 commit 02e47d9

File tree

5 files changed

+216
-19
lines changed

5 files changed

+216
-19
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ once_cell = "1.7.2"
2828
pem = "3.0"
2929
percent-encoding = "2.1.0"
3030
pin-project = "1.0.2"
31+
rand = "0.8.5"
3132
serde = "1"
3233
serde_json = "1"
3334
socket2 = "0.5.2"
@@ -74,7 +75,6 @@ optional = true
7475
tempfile = "3.1.0"
7576
socket2 = { version = "0.5.2", features = ["all"] }
7677
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
77-
rand = "0.8.0"
7878

7979
[features]
8080
default = [

src/conn/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ struct ConnInner {
107107
tx_status: TxStatus,
108108
reset_upon_returning_to_a_pool: bool,
109109
opts: Opts,
110+
ttl_deadline: Option<Instant>,
110111
last_io: Instant,
111112
wait_timeout: Duration,
112113
stmt_cache: StmtCache,
@@ -140,6 +141,7 @@ impl fmt::Debug for ConnInner {
140141
impl ConnInner {
141142
/// Constructs an empty connection.
142143
fn empty(opts: Opts) -> ConnInner {
144+
let ttl_deadline = opts.pool_opts().new_connection_ttl_deadline();
143145
ConnInner {
144146
capabilities: opts.get_capabilities(),
145147
status: StatusFlags::empty(),
@@ -157,6 +159,7 @@ impl ConnInner {
157159
stmt_cache: StmtCache::new(opts.stmt_cache_size()),
158160
socket: opts.socket().map(Into::into),
159161
opts,
162+
ttl_deadline,
160163
nonce: Vec::default(),
161164
auth_plugin: AuthPlugin::MysqlNativePassword,
162165
auth_switched: false,
@@ -1088,6 +1091,11 @@ impl Conn {
10881091
/// Returns true if time since last IO exceeds `wait_timeout`
10891092
/// (or `conn_ttl` if specified in opts).
10901093
fn expired(&self) -> bool {
1094+
if let Some(deadline) = self.inner.ttl_deadline {
1095+
if Instant::now() > deadline {
1096+
return true;
1097+
}
1098+
}
10911099
let ttl = self
10921100
.inner
10931101
.opts

src/conn/pool/mod.rs

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ struct IdlingConn {
4444
}
4545

4646
impl IdlingConn {
47+
/// Returns true when this connection has a TTL and it elapsed.
48+
fn expired(&self) -> bool {
49+
self.conn
50+
.inner
51+
.ttl_deadline
52+
.map(|t| Instant::now() > t)
53+
.unwrap_or_default()
54+
}
55+
4756
/// Returns duration elapsed since this connection is idling.
4857
fn elapsed(&self) -> Duration {
4958
self.since.elapsed()
@@ -82,8 +91,11 @@ impl Exchange {
8291
// Spawn the Recycler.
8392
tokio::spawn(Recycler::new(pool_opts.clone(), inner.clone(), dropped));
8493

85-
// Spawn the ttl check interval if `inactive_connection_ttl` isn't `0`
86-
if pool_opts.inactive_connection_ttl() > Duration::from_secs(0) {
94+
// Spawn the ttl check interval if `inactive_connection_ttl` isn't `0` or
95+
// connections have an absolute TTL.
96+
if pool_opts.inactive_connection_ttl() > Duration::ZERO
97+
|| pool_opts.abs_conn_ttl().is_some()
98+
{
8799
tokio::spawn(TtlCheckInterval::new(pool_opts, inner.clone()));
88100
}
89101
}
@@ -1012,6 +1024,41 @@ mod test {
10121024
assert_eq!(0, waitlist.queue.len());
10131025
}
10141026

1027+
#[tokio::test]
1028+
async fn check_absolute_connection_ttl() -> super::Result<()> {
1029+
let constraints = PoolConstraints::new(1, 3).unwrap();
1030+
let pool_opts = PoolOpts::default()
1031+
.with_constraints(constraints)
1032+
.with_inactive_connection_ttl(Duration::from_secs(99))
1033+
.with_ttl_check_interval(Duration::from_secs(1))
1034+
.with_abs_conn_ttl(Some(Duration::from_secs(2)));
1035+
1036+
let pool = Pool::new(get_opts().pool_opts(pool_opts));
1037+
1038+
let conn_ttl0 = pool.get_conn().await?;
1039+
sleep(Duration::from_millis(1000)).await;
1040+
let conn_ttl1 = pool.get_conn().await?;
1041+
sleep(Duration::from_millis(1000)).await;
1042+
let conn_ttl2 = pool.get_conn().await?;
1043+
1044+
drop(conn_ttl0);
1045+
drop(conn_ttl1);
1046+
drop(conn_ttl2);
1047+
assert_eq!(ex_field!(pool, exist), 3);
1048+
1049+
sleep(Duration::from_millis(1500)).await;
1050+
assert_eq!(ex_field!(pool, exist), 2);
1051+
1052+
sleep(Duration::from_millis(1000)).await;
1053+
assert_eq!(ex_field!(pool, exist), 1);
1054+
1055+
// Go even below min pool size.
1056+
sleep(Duration::from_millis(1000)).await;
1057+
assert_eq!(ex_field!(pool, exist), 0);
1058+
1059+
Ok(())
1060+
}
1061+
10151062
#[cfg(feature = "nightly")]
10161063
mod bench {
10171064
use futures_util::future::{FutureExt, TryFutureExt};

src/conn/pool/ttl_check_inerval.rs

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use pin_project::pin_project;
1111
use tokio::time::{self, Interval};
1212

1313
use std::{
14+
collections::VecDeque,
1415
future::Future,
1516
sync::{atomic::Ordering, Arc},
1617
};
@@ -46,24 +47,41 @@ impl TtlCheckInterval {
4647

4748
/// Perform the check.
4849
pub fn check_ttl(&self) {
49-
let mut exchange = self.inner.exchange.lock().unwrap();
50+
let to_be_dropped = {
51+
let mut exchange = self.inner.exchange.lock().unwrap();
5052

51-
let num_idling = exchange.available.len();
52-
let num_to_drop = num_idling.saturating_sub(self.pool_opts.constraints().min());
53+
let num_to_drop = exchange
54+
.available
55+
.len()
56+
.saturating_sub(self.pool_opts.constraints().min());
5357

54-
for _ in 0..num_to_drop {
55-
let idling_conn = exchange.available.pop_front().unwrap();
56-
if idling_conn.elapsed() > self.pool_opts.inactive_connection_ttl() {
57-
assert!(idling_conn.conn.inner.pool.is_none());
58-
let inner = self.inner.clone();
59-
tokio::spawn(idling_conn.conn.disconnect().then(move |_| {
60-
let mut exchange = inner.exchange.lock().unwrap();
61-
exchange.exist -= 1;
62-
ok::<_, ()>(())
63-
}));
64-
} else {
65-
exchange.available.push_back(idling_conn);
58+
let mut to_be_dropped = Vec::<_>::with_capacity(exchange.available.len());
59+
let mut kept_available =
60+
VecDeque::<_>::with_capacity(self.pool_opts.constraints().max());
61+
62+
while let Some(conn) = exchange.available.pop_front() {
63+
if conn.expired() {
64+
to_be_dropped.push(conn);
65+
} else if to_be_dropped.len() < num_to_drop
66+
&& conn.elapsed() > self.pool_opts.inactive_connection_ttl()
67+
{
68+
to_be_dropped.push(conn);
69+
} else {
70+
kept_available.push_back(conn);
71+
}
6672
}
73+
exchange.available = kept_available;
74+
to_be_dropped
75+
};
76+
77+
for idling_conn in to_be_dropped {
78+
assert!(idling_conn.conn.inner.pool.is_none());
79+
let inner = self.inner.clone();
80+
tokio::spawn(idling_conn.conn.disconnect().then(move |_| {
81+
let mut exchange = inner.exchange.lock().unwrap();
82+
exchange.exist -= 1;
83+
ok::<_, ()>(())
84+
}));
6785
}
6886
}
6987
}

src/opts/mod.rs

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub use native_tls_opts::ClientIdentity;
1616
pub use rustls_opts::ClientIdentity;
1717

1818
use percent_encoding::percent_decode;
19+
use rand::Rng;
1920
use url::{Host, Url};
2021

2122
use std::{
@@ -26,7 +27,7 @@ use std::{
2627
path::Path,
2728
str::FromStr,
2829
sync::Arc,
29-
time::Duration,
30+
time::{Duration, Instant},
3031
vec,
3132
};
3233

@@ -209,6 +210,8 @@ pub struct PoolOpts {
209210
constraints: PoolConstraints,
210211
inactive_connection_ttl: Duration,
211212
ttl_check_interval: Duration,
213+
abs_conn_ttl: Option<Duration>,
214+
abs_conn_ttl_jitter: Option<Duration>,
212215
reset_connection: bool,
213216
}
214217

@@ -273,6 +276,49 @@ impl PoolOpts {
273276
self.reset_connection
274277
}
275278

279+
/// Sets an absolute TTL after which a connection is removed from the pool.
280+
/// This may push the pool below the requested minimum pool size and is indepedent of the
281+
/// idle TTL.
282+
/// The absolute TTL is disabled by default.
283+
/// Fractions of seconds are ignored.
284+
pub fn with_abs_conn_ttl(mut self, ttl: Option<Duration>) -> Self {
285+
self.abs_conn_ttl = ttl;
286+
self
287+
}
288+
289+
/// Optionally, the absolute TTL can be extended by a per-connection random amount
290+
/// bounded by `jitter`.
291+
/// Setting `abs_conn_ttl_jitter` without `abs_conn_ttl` has no effect.
292+
/// Fractions of seconds are ignored.
293+
pub fn with_abs_conn_ttl_jitter(mut self, jitter: Option<Duration>) -> Self {
294+
self.abs_conn_ttl_jitter = jitter;
295+
self
296+
}
297+
298+
/// Returns the absolute TTL, if set.
299+
pub fn abs_conn_ttl(&self) -> Option<Duration> {
300+
self.abs_conn_ttl
301+
}
302+
303+
/// Returns the absolute TTL's jitter bound, if set.
304+
pub fn abs_conn_ttl_jitter(&self) -> Option<Duration> {
305+
self.abs_conn_ttl_jitter
306+
}
307+
308+
/// Returns a new deadline that's TTL (+ random jitter) in the future.
309+
pub(crate) fn new_connection_ttl_deadline(&self) -> Option<Instant> {
310+
if let Some(ttl) = self.abs_conn_ttl {
311+
let jitter = if let Some(jitter) = self.abs_conn_ttl_jitter {
312+
Duration::from_secs(rand::thread_rng().gen_range(0..=jitter.as_secs()))
313+
} else {
314+
Duration::ZERO
315+
};
316+
Some(Instant::now() + ttl + jitter)
317+
} else {
318+
None
319+
}
320+
}
321+
276322
/// Pool will recycle inactive connection if it is outside of the lower bound of the pool
277323
/// and if it is idling longer than this value (defaults to
278324
/// [`DEFAULT_INACTIVE_CONNECTION_TTL`]).
@@ -359,6 +405,8 @@ impl Default for PoolOpts {
359405
constraints: DEFAULT_POOL_CONSTRAINTS,
360406
inactive_connection_ttl: DEFAULT_INACTIVE_CONNECTION_TTL,
361407
ttl_check_interval: DEFAULT_TTL_CHECK_INTERVAL,
408+
abs_conn_ttl: None,
409+
abs_conn_ttl_jitter: None,
362410
reset_connection: true,
363411
}
364412
}
@@ -662,6 +710,49 @@ impl Opts {
662710
self.inner.mysql_opts.conn_ttl
663711
}
664712

713+
/// The pool will close a connection when this absolute TTL has elapsed.
714+
/// Disabled by default.
715+
///
716+
/// Enables forced recycling and migration of connections in a guaranteed timeframe.
717+
/// This TTL bypasses pool constraints and an idle pool can go below the min size.
718+
///
719+
/// # Connection URL
720+
///
721+
/// You can use `abs_conn_ttl` URL parameter to set this value (in seconds). E.g.
722+
///
723+
/// ```
724+
/// # use mysql_async::*;
725+
/// # use std::time::Duration;
726+
/// # fn main() -> Result<()> {
727+
/// let opts = Opts::from_url("mysql://localhost/db?abs_conn_ttl=86400")?;
728+
/// assert_eq!(opts.abs_conn_ttl(), Some(Duration::from_secs(24 * 60 * 60)));
729+
/// # Ok(()) }
730+
/// ```
731+
pub fn abs_conn_ttl(&self) -> Option<Duration> {
732+
self.inner.mysql_opts.pool_opts.abs_conn_ttl
733+
}
734+
735+
/// Upper bound of a random value added to the absolute TTL, if enabled.
736+
/// Disabled by default.
737+
///
738+
/// Should be used to prevent connections from closing at the same time.
739+
///
740+
/// # Connection URL
741+
///
742+
/// You can use `abs_conn_ttl_jitter` URL parameter to set this value (in seconds). E.g.
743+
///
744+
/// ```
745+
/// # use mysql_async::*;
746+
/// # use std::time::Duration;
747+
/// # fn main() -> Result<()> {
748+
/// let opts = Opts::from_url("mysql://localhost/db?abs_conn_ttl=7200&abs_conn_ttl_jitter=3600")?;
749+
/// assert_eq!(opts.abs_conn_ttl_jitter(), Some(Duration::from_secs(60 * 60)));
750+
/// # Ok(()) }
751+
/// ```
752+
pub fn abs_conn_ttl_jitter(&self) -> Option<Duration> {
753+
self.inner.mysql_opts.pool_opts.abs_conn_ttl_jitter
754+
}
755+
665756
/// Number of prepared statements cached on the client side (per connection). Defaults to
666757
/// [`DEFAULT_STMT_CACHE_SIZE`].
667758
///
@@ -1444,6 +1535,34 @@ fn mysqlopts_from_url(url: &Url) -> std::result::Result<MysqlOpts, UrlError> {
14441535
});
14451536
}
14461537
}
1538+
} else if key == "abs_conn_ttl" {
1539+
match u64::from_str(&*value) {
1540+
Ok(value) => {
1541+
opts.pool_opts = opts
1542+
.pool_opts
1543+
.with_abs_conn_ttl(Some(Duration::from_secs(value)))
1544+
}
1545+
_ => {
1546+
return Err(UrlError::InvalidParamValue {
1547+
param: "abs_conn_ttl".into(),
1548+
value,
1549+
});
1550+
}
1551+
}
1552+
} else if key == "abs_conn_ttl_jitter" {
1553+
match u64::from_str(&*value) {
1554+
Ok(value) => {
1555+
opts.pool_opts = opts
1556+
.pool_opts
1557+
.with_abs_conn_ttl_jitter(Some(Duration::from_secs(value)))
1558+
}
1559+
_ => {
1560+
return Err(UrlError::InvalidParamValue {
1561+
param: "abs_conn_ttl_jitter".into(),
1562+
value,
1563+
});
1564+
}
1565+
}
14471566
} else if key == "tcp_keepalive" {
14481567
match u32::from_str(&*value) {
14491568
Ok(value) => opts.tcp_keepalive = Some(value),
@@ -1679,6 +1798,11 @@ mod test {
16791798
assert_eq!(url_opts.tcp_nodelay(), builder_opts.tcp_nodelay());
16801799
assert_eq!(url_opts.pool_opts(), builder_opts.pool_opts());
16811800
assert_eq!(url_opts.conn_ttl(), builder_opts.conn_ttl());
1801+
assert_eq!(url_opts.abs_conn_ttl(), builder_opts.abs_conn_ttl());
1802+
assert_eq!(
1803+
url_opts.abs_conn_ttl_jitter(),
1804+
builder_opts.abs_conn_ttl_jitter()
1805+
);
16821806
assert_eq!(url_opts.stmt_cache_size(), builder_opts.stmt_cache_size());
16831807
assert_eq!(url_opts.ssl_opts(), builder_opts.ssl_opts());
16841808
assert_eq!(url_opts.prefer_socket(), builder_opts.prefer_socket());

0 commit comments

Comments
 (0)