From bab67896bff53b72e7123f0b34fe6de0ae4064f7 Mon Sep 17 00:00:00 2001 From: Alexander 'z33ky' Hirsch <1zeeky@gmail.com> Date: Mon, 27 Jan 2025 22:33:17 +0100 Subject: [PATCH] Keep track of absolute IDLE time for timeout The code used to apply the set timeout value to the TcpStream before entering the IDLE loop. This effectively resets the timeout after receiving and handling incoming messages, which nullifies the purpose of the timeout when messages are received. This change remembers when the IDLE command is sent initially and uses that value to set the remaining time for the TcpStream timeout. This allows the IDLE loop to reconnect the IDLE connection at the appropriate time. Fixes #300. --- src/extensions/idle.rs | 137 ++++++++++++++++++++++------------------- 1 file changed, 74 insertions(+), 63 deletions(-) diff --git a/src/extensions/idle.rs b/src/extensions/idle.rs index 4df5689..d08fdf4 100644 --- a/src/extensions/idle.rs +++ b/src/extensions/idle.rs @@ -13,7 +13,7 @@ use rustls_connector::TlsStream as RustlsStream; use std::io::{self, Read, Write}; use std::net::TcpStream; use std::ops::DerefMut; -use std::time::Duration; +use std::time::{Duration, Instant}; /// `Handle` allows a client to block waiting for changes to the remote mailbox. /// @@ -58,7 +58,7 @@ pub struct Handle<'a, T: Read + Write> { session: &'a mut Session, timeout: Duration, keepalive: bool, - done: bool, + last_idle: Option, } /// The result of a wait on a [`Handle`] @@ -91,11 +91,13 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> { session, timeout: Duration::from_secs(29 * 60), keepalive: true, - done: false, + last_idle: None, } } fn init(&mut self) -> Result<()> { + self.last_idle = Some(Instant::now()); + // https://tools.ietf.org/html/rfc2177 // // The IDLE command takes no arguments. @@ -108,39 +110,97 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> { let mut v = Vec::new(); self.session.readline(&mut v)?; if v.starts_with(b"+") { - self.done = false; return Ok(()); } + self.last_idle = None; self.session.read_response_onto(&mut v)?; // We should *only* get a continuation on an error (i.e., it gives BAD or NO). unreachable!(); } fn terminate(&mut self) -> Result<()> { - if !self.done { - self.done = true; + if let Some(_) = self.last_idle.take() { self.session.write_line(b"DONE")?; self.session.read_response().map(|_| ()) } else { Ok(()) } } +} - /// Internal helper that doesn't consume self. +impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> { + /// Set the timeout duration on the connection. This will also set the frequency + /// at which the connection is refreshed. /// - /// This is necessary so that we can keep using the inner `Session` in `wait_while`. - fn wait_inner(&mut self, reconnect: bool, mut callback: F) -> Result + /// The interval defaults to 29 minutes as given in RFC 2177. + pub fn timeout(&mut self, interval: Duration) -> &mut Self { + self.timeout = interval; + self + } + + /// Do not continuously refresh the IDLE connection in the background. + /// + /// By default, connections will periodically be refreshed in the background using the + /// timeout duration set by [`Handle::timeout`]. If you do not want this behaviour, call + /// this function and the connection will simply IDLE until `wait_while` returns or + /// the timeout expires. + pub fn keepalive(&mut self, keepalive: bool) -> &mut Self { + self.keepalive = keepalive; + self + } + + /// Block until the given callback returns `false`, or until a response + /// arrives that is not explicitly handled by [`UnsolicitedResponse`]. + pub fn wait_while(&mut self, mut callback: F) -> Result where F: FnMut(UnsolicitedResponse) -> bool, { let mut v = Vec::new(); let result = loop { - match self.session.readline(&mut v) { + match { + // The server MAY consider a client inactive if it has an IDLE command + // running, and if such a server has an inactivity timeout it MAY log + // the client off implicitly at the end of its timeout period. Because + // of that, clients using IDLE are advised to terminate the IDLE and + // re-issue it at least every 29 minutes to avoid being logged off. + // This still allows a client to receive immediate mailbox updates even + // though it need only "poll" at half hour intervals. + self.last_idle + // Check if the time since last_idle has exceeded the timeout. + .map(|last_idle| { + let time_since_idle = last_idle.elapsed(); + if time_since_idle >= self.timeout { + Err(Error::Io(io::ErrorKind::TimedOut.into())) + } else { + Ok(time_since_idle) + } + }) + // If there's no self.last_idle, initialize the connection (and return a 0 time since idle). + .unwrap_or_else(|| self.init().map(|()| Duration::ZERO)) + // Finally, if no error occurred, read from the stream. + .map(|time_since_idle| { + self.session + .stream + .get_mut() + .set_read_timeout(Some(self.timeout - time_since_idle)) + .expect("cannot be Some(0) since time is monotonically increasing"); + self.session.readline(&mut v) + }) + } { Err(Error::Io(ref e)) if e.kind() == io::ErrorKind::TimedOut || e.kind() == io::ErrorKind::WouldBlock => { + if self.keepalive { + match self.terminate() { + Ok(()) => { + // The connection gets initialized again on the next iteration. + continue; + } + Err(e) => break Err(e), + } + } break Ok(WaitOutcome::TimedOut); } Ok(_len) => { @@ -183,60 +243,11 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> { }; }; - // Reconnect on timeout if needed - match (reconnect, result) { - (true, Ok(WaitOutcome::TimedOut)) => { - self.terminate()?; - self.init()?; - self.wait_inner(reconnect, callback) - } - (_, result) => result, - } - } -} - -impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> { - /// Set the timeout duration on the connection. This will also set the frequency - /// at which the connection is refreshed. - /// - /// The interval defaults to 29 minutes as given in RFC 2177. - pub fn timeout(&mut self, interval: Duration) -> &mut Self { - self.timeout = interval; - self - } - - /// Do not continuously refresh the IDLE connection in the background. - /// - /// By default, connections will periodically be refreshed in the background using the - /// timeout duration set by [`Handle::timeout`]. If you do not want this behaviour, call - /// this function and the connection will simply IDLE until `wait_while` returns or - /// the timeout expires. - pub fn keepalive(&mut self, keepalive: bool) -> &mut Self { - self.keepalive = keepalive; - self - } + // set_read_timeout() can fail if the argument is Some(0), which can never be the + // case here. + self.session.stream.get_mut().set_read_timeout(None).unwrap(); - /// Block until the given callback returns `false`, or until a response - /// arrives that is not explicitly handled by [`UnsolicitedResponse`]. - pub fn wait_while(&mut self, callback: F) -> Result - where - F: FnMut(UnsolicitedResponse) -> bool, - { - self.init()?; - // The server MAY consider a client inactive if it has an IDLE command - // running, and if such a server has an inactivity timeout it MAY log - // the client off implicitly at the end of its timeout period. Because - // of that, clients using IDLE are advised to terminate the IDLE and - // re-issue it at least every 29 minutes to avoid being logged off. - // This still allows a client to receive immediate mailbox updates even - // though it need only "poll" at half hour intervals. - self.session - .stream - .get_mut() - .set_read_timeout(Some(self.timeout))?; - let res = self.wait_inner(self.keepalive, callback); - let _ = self.session.stream.get_mut().set_read_timeout(None).is_ok(); - res + result } }