Skip to content

Implement unsolicited message handling while IDLEing #130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ imap-proto = "0.7"
nom = "4.0"
base64 = "0.10"
chrono = "0.4"
enumset = "0.3.18"

[dev-dependencies]
lettre = "0.9"
Expand Down
40 changes: 35 additions & 5 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use base64;
use bufstream::BufStream;
use enumset::EnumSet;
use native_tls::{TlsConnector, TlsStream};
use nom;
use std::collections::HashSet;
Expand All @@ -13,6 +14,7 @@ use super::error::{Error, ParseError, Result, ValidateError};
use super::extensions;
use super::parse::*;
use super::types::*;
use super::unsolicited_responses::UnsolicitedResponseSender;

static TAG_PREFIX: &'static str = "a";
const INITIAL_TAG: u32 = 0;
Expand Down Expand Up @@ -48,7 +50,7 @@ fn validate_str(value: &str) -> Result<String> {
#[derive(Debug)]
pub struct Session<T: Read + Write> {
conn: Connection<T>,
unsolicited_responses_tx: mpsc::Sender<UnsolicitedResponse>,
unsolicited_responses_tx: UnsolicitedResponseSender,

/// Server responses that are not related to the current command. See also the note on
/// [unilateral server responses in RFC 3501](https://tools.ietf.org/html/rfc3501#section-7).
Expand Down Expand Up @@ -376,10 +378,37 @@ impl<T: Read + Write> Session<T> {
Session {
conn,
unsolicited_responses: rx,
unsolicited_responses_tx: tx,
unsolicited_responses_tx: UnsolicitedResponseSender::new(tx),
}
}

/// Tells which unsolicited responses are required. Defaults to none.
///
/// The server *is* allowed to unilaterally send things to the client for messages in
/// a selected mailbox whose status has changed. See the note on [unilateral server responses
/// in RFC 3501](https://tools.ietf.org/html/rfc3501#section-7). This function tells
/// which events you want to hear about.
///
/// If you request unsolicited responses, you have to regularly check the
/// `unsolicited_responses` channel of the [`Session`](struct.Session.html) for new responses.
pub fn filter_unsolicited_responses(&mut self, mask: EnumSet<UnsolicitedResponseCategory>) {
self.unsolicited_responses_tx
.request(&self.unsolicited_responses, mask);
}

/// Request all unsolicited responses from the server. Convenience method.
///
/// The server *is* allowed to unilaterally send things to the client for messages in
/// a selected mailbox whose status has changed. See the note on [unilateral server responses
/// in RFC 3501](https://tools.ietf.org/html/rfc3501#section-7). This function tells
/// which events you want to hear about.
///
/// If you request unsolicited responses, you have to regularly check the
/// `unsolicited_responses` channel of the [`Session`](struct.Session.html) for new responses.
pub fn request_all_unsolicited_responses(&mut self) {
self.filter_unsolicited_responses(EnumSet::all())
}

/// Selects a mailbox
///
/// The `SELECT` command selects a mailbox so that messages in the mailbox can be accessed.
Expand Down Expand Up @@ -995,7 +1024,8 @@ impl<T: Read + Write> Session<T> {
///
/// See [`extensions::idle::Handle`] for details.
pub fn idle(&mut self) -> Result<extensions::idle::Handle<T>> {
extensions::idle::Handle::make(self)
let sender = self.unsolicited_responses_tx.clone();
extensions::idle::Handle::make(self, sender)
}

/// The [`APPEND` command](https://tools.ietf.org/html/rfc3501#section-6.3.11) appends
Expand Down Expand Up @@ -1228,7 +1258,7 @@ impl<T: Read + Write> Connection<T> {
// Remove CRLF
let len = into.len();
let line = &into[(len - read)..(len - 2)];
eprint!("S: {}\n", String::from_utf8_lossy(line));
eprintln!("S: {}", String::from_utf8_lossy(line));
}

Ok(read)
Expand All @@ -1244,7 +1274,7 @@ impl<T: Read + Write> Connection<T> {
self.stream.write_all(&[CR, LF])?;
self.stream.flush()?;
if self.debug {
eprint!("C: {}\n", String::from_utf8(buf.to_vec()).unwrap());
eprintln!("C: {}", String::from_utf8(buf.to_vec()).unwrap());
}
Ok(())
}
Expand Down
83 changes: 58 additions & 25 deletions src/extensions/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
use client::Session;
use error::{Error, Result};
use native_tls::TlsStream;
use parse;
use std::io::{self, Read, Write};
use std::net::TcpStream;
use std::time::Duration;
use unsolicited_responses::UnsolicitedResponseSender;

/// `Handle` allows a client to block waiting for changes to the remote mailbox.
///
Expand All @@ -26,7 +28,8 @@ use std::time::Duration;
#[derive(Debug)]
pub struct Handle<'a, T: Read + Write + 'a> {
session: &'a mut Session<T>,
keepalive: Duration,
unsolicited_responses_tx: UnsolicitedResponseSender,
keepalive: Option<Duration>,
done: bool,
}

Expand All @@ -45,10 +48,14 @@ pub trait SetReadTimeout {
}

impl<'a, T: Read + Write + 'a> Handle<'a, T> {
pub(crate) fn make(session: &'a mut Session<T>) -> Result<Self> {
pub(crate) fn make(
session: &'a mut Session<T>,
unsolicited_responses_tx: UnsolicitedResponseSender,
) -> Result<Self> {
let mut h = Handle {
session,
keepalive: Duration::from_secs(29 * 60),
unsolicited_responses_tx,
keepalive: None,
done: false,
};
h.init()?;
Expand Down Expand Up @@ -77,31 +84,53 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
unreachable!();
}

fn terminate(&mut self) -> Result<()> {
fn terminate(&mut self, consume_response: bool) -> Result<()> {
if !self.done {
self.done = true;
self.session.write_line(b"DONE")?;
self.session.read_response().map(|_| ())
} else {
Ok(())
if consume_response {
return self.session.read_response().map(|_| ());
}
}
Ok(())
}

/// Internal helper that doesn't consume self.
///
/// This is necessary so that we can keep using the inner `Session` in `wait_keepalive`.
fn wait_inner(&mut self) -> Result<()> {
let mut v = Vec::new();
match self.session.readline(&mut v).map(|_| ()) {
Err(Error::Io(ref e))
if e.kind() == io::ErrorKind::TimedOut || e.kind() == io::ErrorKind::WouldBlock =>
{
// we need to refresh the IDLE connection
self.terminate()?;
self.init()?;
self.wait_inner()
let mut buffer = Vec::new();
loop {
match self.session.readline(&mut buffer).map(|_| ()) {
Err(Error::Io(ref e))
if e.kind() == io::ErrorKind::TimedOut
|| e.kind() == io::ErrorKind::WouldBlock =>
{
if self.keepalive.is_some() {
// we need to refresh the IDLE connection
self.terminate(true)?;
self.init()?;
}
}
Err(err) => return Err(err),
Ok(_) => {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this clause at all. Why do we need this nested loop construct? And why do we need to terminate the IDLE for each iteration?

let _ = parse::parse_idle(&buffer, &mut self.unsolicited_responses_tx)?;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically, shouldn't this use the return value from parse_idle to know whether or not DONE was sent?

self.terminate(false)?;
buffer.truncate(0);

// Unsolicited responses coming in from the server are not multi-line,
// therefore, we don't need to worry about the remaining bytes since
// they should always be terminated at a LF.
loop {
let _ = self.session.readline(&mut buffer)?;
let found_ok =
parse::parse_idle(&buffer, &mut self.unsolicited_responses_tx)?;
if found_ok {
return Ok(());
}
}
}
}
r => r,
}
}

Expand All @@ -114,28 +143,31 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
/// Set the keep-alive interval to use when `wait_keepalive` is called.
///
/// The interval defaults to 29 minutes as dictated by RFC 2177.
/// The interval defaults to 29 minutes as advised by RFC 2177.
pub fn set_keepalive(&mut self, interval: Duration) {
self.keepalive = interval;
self.keepalive = Some(interval);
}

/// Block until the selected mailbox changes.
///
/// This method differs from [`Handle::wait`] in that it will periodically refresh the IDLE
/// connection, to prevent the server from timing out our connection. The keepalive interval is
/// set to 29 minutes by default, as dictated by RFC 2177, but can be changed using
/// set to 29 minutes by default, as advised by RFC 2177, but can be changed using
/// [`Handle::set_keepalive`].
///
/// This is the recommended method to use for waiting.
pub fn wait_keepalive(self) -> Result<()> {
pub fn wait_keepalive(mut self) -> Result<()> {
// The server MAY consider a client inactive if it has an IDLE command
// running, and if such a server has an inactivity timeout it MAY log
// the client off implicitly at the end of its timeout period. Because
// of that, clients using IDLE are advised to terminate the IDLE and
// re-issue it at least every 29 minutes to avoid being logged off.
// This still allows a client to receive immediate mailbox updates even
// though it need only "poll" at half hour intervals.
let keepalive = self.keepalive;
let keepalive = self
.keepalive
.get_or_insert_with(|| Duration::from_secs(29 * 60))
.clone();
self.wait_timeout(keepalive)
}

Expand All @@ -154,7 +186,8 @@ impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
impl<'a, T: Read + Write + 'a> Drop for Handle<'a, T> {
fn drop(&mut self) {
// we don't want to panic here if we can't terminate the Idle
let _ = self.terminate().is_ok();
// If we sent done, then we should suck up the OK.
let _ = self.terminate(true).is_ok();
}
}

Expand All @@ -164,8 +197,8 @@ impl<'a> SetReadTimeout for TcpStream {
}
}

impl<'a> SetReadTimeout for TlsStream<TcpStream> {
impl<'a, T: SetReadTimeout + Read + Write + 'a> SetReadTimeout for TlsStream<T> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) -> Result<()> {
self.get_ref().set_read_timeout(timeout).map_err(Error::Io)
self.get_mut().set_read_timeout(timeout)
}
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ extern crate imap_proto;
extern crate native_tls;
extern crate nom;
extern crate regex;
#[macro_use]
extern crate enumset;

mod parse;
mod unsolicited_responses;

pub mod types;

Expand Down
Loading