From a226f566002b58d618787f1121388b3df65be9c2 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Thu, 16 Jan 2014 16:56:21 +0100 Subject: [PATCH 1/3] Implement Unix domain sockets in libnative --- src/libnative/io/mod.rs | 4 +- src/libnative/io/net.rs | 356 ++++++++++++++++++++++++++++++++++++++++ src/libstd/libc.rs | 24 +++ src/libstd/rt/rtio.rs | 5 + 4 files changed, 387 insertions(+), 2 deletions(-) diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index 69ef10ac11bea..dc134aa95feed 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -197,10 +197,10 @@ impl rtio::IoFactory for IoFactory { net::UdpSocket::bind(addr).map(|u| ~u as ~RtioUdpSocket) } fn unix_bind(&mut self, _path: &CString) -> IoResult<~RtioUnixListener> { - Err(unimpl()) + net::UnixListener::bind(_path).map(|s| ~s as ~RtioUnixListener) } fn unix_connect(&mut self, _path: &CString) -> IoResult<~RtioPipe> { - Err(unimpl()) + net::UnixStream::connect(_path, libc::SOCK_STREAM).map(|s| ~s as ~RtioPipe) } fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, hint: Option) -> IoResult<~[ai::Info]> { diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index b33b54862dc2b..62753b66af05c 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -8,6 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use std::c_str::CString; use std::cast; use std::io::net::ip; use std::io; @@ -15,6 +16,7 @@ use std::libc; use std::mem; use std::rt::rtio; use std::sync::arc::UnsafeArc; +use std::unstable::intrinsics; use super::{IoResult, retry}; use super::file::keep_going; @@ -88,6 +90,30 @@ fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) { } } +fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> { + // the sun_path length is limited to SUN_LEN (with null) + if addr.len() > libc::sun_len -1 { + return Err(io::IoError { + kind: io::OtherIoError, + desc: "path must be smaller than SUN_LEN", + detail: None, + }) + } + unsafe { + let storage: libc::sockaddr_storage = intrinsics::init(); + let s: *mut libc::sockaddr_un = cast::transmute(&storage); + (*s).sun_family = libc::AF_UNIX as libc::sa_family_t; + let mut i = 0; + for c in addr.iter() { + (*s).sun_path[i] = c; + i += 1; + } + + let len = mem::size_of::() + i + 1; //count the null terminator + return Ok((storage, len)); + } +} + fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult { unsafe { let fam = match addr.ip { @@ -101,6 +127,15 @@ fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult { } } +fn unix_socket(ty: libc::c_int) -> IoResult { + unsafe { + match libc::socket(libc::AF_UNIX, ty, 0) { + -1 => Err(super::last_error()), + fd => Ok(fd) + } + } +} + fn setsockopt(fd: sock_t, opt: libc::c_int, val: libc::c_int, payload: T) -> IoResult<()> { unsafe { @@ -193,6 +228,24 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, } } +fn sockaddr_to_unix(storage: &libc::sockaddr_storage, + len: uint) -> IoResult { + match storage.ss_family as libc::c_int { + libc::AF_UNIX => { + assert!(len as uint <= mem::size_of::()); + let storage: &libc::sockaddr_un = unsafe { + cast::transmute(storage) + }; + unsafe { + Ok(CString::new(storage.sun_path.to_owned().as_ptr(), false)) + } + } + _ => { + Err(io::standard_error(io::OtherIoError)) + } + } +} + #[cfg(unix)] pub fn init() {} @@ -632,3 +685,306 @@ impl rtio::RtioUdpSocket for UdpSocket { ~UdpSocket { inner: self.inner.clone() } as ~rtio::RtioUdpSocket } } + + +#[cfg(not(windows))] +//////////////////////////////////////////////////////////////////////////////// +// Unix +//////////////////////////////////////////////////////////////////////////////// + + +//////////////////////////////////////////////////////////////////////////////// +// Unix streams +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixStream { + priv fd: sock_t, +} + +impl UnixStream { + pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult { + unsafe { + unix_socket(ty).and_then(|fd| { + match addr_to_sockaddr_un(addr) { + Err(e) => return Err(e), + Ok((addr, len)) => { + let ret = UnixStream{ fd: fd }; + let addrp = &addr as *libc::sockaddr_storage; + match retry(|| { + libc::connect(fd, addrp as *libc::sockaddr, + len as libc::socklen_t) + }) { + -1 => return Err(super::last_error()), + _ => return Ok(ret) + } + } + } + }) + } + } + + pub fn fd(&self) -> sock_t { self.fd } +} + +impl rtio::RtioPipe for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> IoResult { + let ret = retry(|| { + unsafe { + libc::recv(self.fd, + buf.as_ptr() as *mut libc::c_void, + buf.len() as wrlen, + 0) as libc::c_int + } + }); + if ret == 0 { + Err(io::standard_error(io::EndOfFile)) + } else if ret < 0 { + Err(super::last_error()) + } else { + Ok(ret as uint) + } + } + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + let ret = keep_going(buf, |buf, len| { + unsafe { + libc::send(self.fd, + buf as *mut libc::c_void, + len as wrlen, + 0) as i64 + } + }); + if ret < 0 { + Err(super::last_error()) + } else { + Ok(()) + } + } +} + +impl Drop for UnixStream { + fn drop(&mut self) { unsafe { close(self.fd); } } +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Datagram +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixDatagram { + priv fd: sock_t, +} + +impl UnixDatagram { + pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult { + unsafe { + unix_socket(ty).and_then(|fd| { + match addr_to_sockaddr_un(addr) { + Err(e) => return Err(e), + Ok((addr, len)) => { + let ret = UnixDatagram{ fd: fd }; + let addrp = &addr as *libc::sockaddr_storage; + match retry(|| { + libc::connect(fd, addrp as *libc::sockaddr, + len as libc::socklen_t) + }) { + -1 => return Err(super::last_error()), + _ => return Ok(ret) + } + } + } + }) + } + } + + pub fn bind(addr: &CString) -> IoResult { + unsafe { + unix_socket(libc::SOCK_DGRAM).and_then(|fd| { + match addr_to_sockaddr_un(addr) { + Err(e) => return Err(e), + Ok((addr, len)) => { + let ret = UnixDatagram{ fd: fd }; + let addrp = &addr as *libc::sockaddr_storage; + match libc::bind(fd, addrp as *libc::sockaddr, + len as libc::socklen_t) { + -1 => return Err(super::last_error()), + _ => return Ok(ret) + } + } + } + }) + } + } + + pub fn fd(&self) -> sock_t { self.fd } +} + +impl rtio::RtioPipe for UnixDatagram { + fn read(&mut self, buf: &mut [u8]) -> IoResult { + let ret = retry(|| { + unsafe { + libc::recv(self.fd, + buf.as_ptr() as *mut libc::c_void, + buf.len() as wrlen, + 0) as libc::c_int + } + }); + if ret == 0 { + Err(io::standard_error(io::EndOfFile)) + } else if ret < 0 { + Err(super::last_error()) + } else { + Ok(ret as uint) + } + } + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + let ret = keep_going(buf, |buf, len| { + unsafe { + libc::send(self.fd, + buf as *mut libc::c_void, + len as wrlen, + 0) as i64 + } + }); + if ret < 0 { + Err(super::last_error()) + } else { + Ok(()) + } + } +} + +impl rtio::RtioDatagramPipe for UnixDatagram { + fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> { + unsafe { + let mut storage: libc::sockaddr_storage = intrinsics::init(); + let storagep = &mut storage as *mut libc::sockaddr_storage; + let mut addrlen: libc::socklen_t = + mem::size_of::() as libc::socklen_t; + let ret = retry(|| { + libc::recvfrom(self.fd, + buf.as_ptr() as *mut libc::c_void, + buf.len() as msglen_t, + 0, + storagep as *mut libc::sockaddr, + &mut addrlen) as libc::c_int + }); + if ret < 0 { return Err(super::last_error()) } + sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| { + Ok((ret as uint, addr)) + }) + } + } + + fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> { + match addr_to_sockaddr_un(dst) { + Err(e) => Err(e), + Ok((dst, len)) => { + let dstp = &dst as *libc::sockaddr_storage; + unsafe { + let ret = retry(|| { + libc::sendto(self.fd, + buf.as_ptr() as *libc::c_void, + buf.len() as msglen_t, + 0, + dstp as *libc::sockaddr, + len as libc::socklen_t) as libc::c_int + }); + match ret { + -1 => Err(super::last_error()), + n if n as uint != buf.len() => { + Err(io::IoError { + kind: io::OtherIoError, + desc: "couldn't send entire packet at once", + detail: None, + }) + } + _ => Ok(()) + } + } + } + } + } +} + +impl Drop for UnixDatagram { + fn drop(&mut self) { unsafe { close(self.fd); } } +} +//////////////////////////////////////////////////////////////////////////////// +// Unix Listener +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixListener { + priv fd: sock_t, +} + +impl UnixListener { + pub fn bind(addr: &CString) -> IoResult { + unsafe { + unix_socket(libc::SOCK_STREAM).and_then(|fd| { + match addr_to_sockaddr_un(addr) { + Err(e) => return Err(e), + Ok((addr, len)) => { + let ret = UnixListener{ fd: fd }; + let addrp = &addr as *libc::sockaddr_storage; + match libc::bind(fd, addrp as *libc::sockaddr, + len as libc::socklen_t) { + -1 => return Err(super::last_error()), + _ => return Ok(ret) + } + } + } + }) + } + } + + pub fn fd(&self) -> sock_t { self.fd } + + pub fn native_listen(self, backlog: int) -> IoResult { + match unsafe { libc::listen(self.fd, backlog as libc::c_int) } { + -1 => Err(super::last_error()), + _ => Ok(UnixAcceptor { listener: self }) + } + } +} + +impl rtio::RtioUnixListener for UnixListener { + fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor> { + self.native_listen(128).map(|a| ~a as ~rtio::RtioUnixAcceptor) + } +} + +impl Drop for UnixListener { + fn drop(&mut self) { unsafe { close(self.fd); } } +} + +pub struct UnixAcceptor { + priv listener: UnixListener, +} + +impl UnixAcceptor { + pub fn fd(&self) -> sock_t { self.listener.fd } + + pub fn native_accept(&mut self) -> IoResult { + unsafe { + let mut storage: libc::sockaddr_storage = intrinsics::init(); + let storagep = &mut storage as *mut libc::sockaddr_storage; + let size = mem::size_of::(); + let mut size = size as libc::socklen_t; + match retry(|| { + libc::accept(self.fd(), + storagep as *mut libc::sockaddr, + &mut size as *mut libc::socklen_t) as libc::c_int + }) as sock_t { + -1 => Err(super::last_error()), + fd => Ok(UnixStream { fd: fd }) + } + } + } +} + +impl rtio::RtioUnixAcceptor for UnixAcceptor { + fn accept(&mut self) -> IoResult<~rtio::RtioPipe> { + self.native_accept().map(|s| ~s as ~rtio::RtioPipe) + } +} + + diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs index 7dc4c692f6319..c42a889605312 100644 --- a/src/libstd/libc.rs +++ b/src/libstd/libc.rs @@ -269,6 +269,7 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::{c_char, c_int, c_uint}; + pub static sun_len:uint = 108; pub type socklen_t = u32; pub type sa_family_t = u16; pub type in_port_t = u16; @@ -319,6 +320,10 @@ pub mod types { ai_canonname: *c_char, ai_next: *addrinfo } + pub struct sockaddr_un { + sun_family: sa_family_t, + sun_path: [c_char, ..108] + } } } @@ -636,6 +641,7 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::{c_char, c_int, c_uint}; + pub static sun_len:uint = 104; pub type socklen_t = u32; pub type sa_family_t = u8; pub type in_port_t = u16; @@ -691,6 +697,11 @@ pub mod types { ai_addr: *sockaddr, ai_next: *addrinfo } + pub struct sockaddr_un { + sun_len: u8, + sun_family: sa_family_t, + sun_path: [c_char, ..104] + } } } @@ -833,6 +844,7 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::{c_char, c_int, c_uint, size_t}; + pub static sun_len:uint = 108; pub type SOCKET = c_uint; pub type socklen_t = c_int; pub type sa_family_t = u16; @@ -884,6 +896,10 @@ pub mod types { ai_addr: *sockaddr, ai_next: *addrinfo } + pub struct sockaddr_un { + sun_family: sa_family_t, + sun_path: [c_char, ..108] + } } } @@ -1197,6 +1213,7 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::{c_char, c_int, c_uint}; + pub static sun_len:uint = 104; pub type socklen_t = c_int; pub type sa_family_t = u8; pub type in_port_t = u16; @@ -1252,6 +1269,11 @@ pub mod types { ai_addr: *sockaddr, ai_next: *addrinfo } + pub struct sockaddr_un { + sun_len: u8, + sun_family: sa_family_t, + sun_path: [c_char, ..104] + } } } @@ -2310,6 +2332,7 @@ pub mod consts { pub static MADV_UNMERGEABLE : c_int = 13; pub static MADV_HWPOISON : c_int = 100; + pub static AF_UNIX: c_int = 1; pub static AF_INET: c_int = 2; pub static AF_INET6: c_int = 10; pub static SOCK_STREAM: c_int = 1; @@ -3137,6 +3160,7 @@ pub mod consts { pub static MINCORE_REFERENCED_OTHER : c_int = 0x8; pub static MINCORE_MODIFIED_OTHER : c_int = 0x10; + pub static AF_UNIX: c_int = 1; pub static AF_INET: c_int = 2; pub static AF_INET6: c_int = 30; pub static SOCK_STREAM: c_int = 1; diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 5573f8ec02eb3..578ace2ba86a0 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -260,6 +260,11 @@ pub trait RtioPipe { fn clone(&self) -> ~RtioPipe; } +pub trait RtioDatagramPipe : RtioPipe { + fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, CString), IoError>; + fn sendto(&mut self, buf: &[u8], dst: &CString) -> Result<(), IoError>; +} + pub trait RtioUnixListener { fn listen(~self) -> Result<~RtioUnixAcceptor, IoError>; } From 94b2d9dc4dd864b481bcf279921bc7ea796355e5 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 7 Feb 2014 10:10:48 -0800 Subject: [PATCH 2/3] Move unix pipes implementation to pipe_unix.rs The windows named pipes implementation will have almost nothing to do with unix pipes, so I think it's best if they live in separate files. --- src/libnative/io/mod.rs | 16 +- src/libnative/io/net.rs | 303 ---------------------------------- src/libnative/io/pipe_unix.rs | 300 +++++++++++++++++++++++++++++++++ 3 files changed, 312 insertions(+), 307 deletions(-) create mode 100644 src/libnative/io/pipe_unix.rs diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index dc134aa95feed..ad0d7270c1a23 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -60,6 +60,14 @@ pub mod timer; #[path = "timer_win32.rs"] pub mod timer; +#[cfg(unix)] +#[path = "path_unix.rs"] +pub mod pipe; + +#[cfg(windows)] +#[path = "path_win32.rs"] +pub mod pipe; + mod timer_helper; pub type IoResult = Result; @@ -196,11 +204,11 @@ impl rtio::IoFactory for IoFactory { fn udp_bind(&mut self, addr: SocketAddr) -> IoResult<~RtioUdpSocket> { net::UdpSocket::bind(addr).map(|u| ~u as ~RtioUdpSocket) } - fn unix_bind(&mut self, _path: &CString) -> IoResult<~RtioUnixListener> { - net::UnixListener::bind(_path).map(|s| ~s as ~RtioUnixListener) + fn unix_bind(&mut self, path: &CString) -> IoResult<~RtioUnixListener> { + pipe::UnixListener::bind(path).map(|s| ~s as ~RtioUnixListener) } - fn unix_connect(&mut self, _path: &CString) -> IoResult<~RtioPipe> { - net::UnixStream::connect(_path, libc::SOCK_STREAM).map(|s| ~s as ~RtioPipe) + fn unix_connect(&mut self, path: &CString) -> IoResult<~RtioPipe> { + pipe::UnixStream::connect(path).map(|s| ~s as ~RtioPipe) } fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>, hint: Option) -> IoResult<~[ai::Info]> { diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index 62753b66af05c..dce890dc12961 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -685,306 +685,3 @@ impl rtio::RtioUdpSocket for UdpSocket { ~UdpSocket { inner: self.inner.clone() } as ~rtio::RtioUdpSocket } } - - -#[cfg(not(windows))] -//////////////////////////////////////////////////////////////////////////////// -// Unix -//////////////////////////////////////////////////////////////////////////////// - - -//////////////////////////////////////////////////////////////////////////////// -// Unix streams -//////////////////////////////////////////////////////////////////////////////// - -pub struct UnixStream { - priv fd: sock_t, -} - -impl UnixStream { - pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult { - unsafe { - unix_socket(ty).and_then(|fd| { - match addr_to_sockaddr_un(addr) { - Err(e) => return Err(e), - Ok((addr, len)) => { - let ret = UnixStream{ fd: fd }; - let addrp = &addr as *libc::sockaddr_storage; - match retry(|| { - libc::connect(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) - }) { - -1 => return Err(super::last_error()), - _ => return Ok(ret) - } - } - } - }) - } - } - - pub fn fd(&self) -> sock_t { self.fd } -} - -impl rtio::RtioPipe for UnixStream { - fn read(&mut self, buf: &mut [u8]) -> IoResult { - let ret = retry(|| { - unsafe { - libc::recv(self.fd, - buf.as_ptr() as *mut libc::c_void, - buf.len() as wrlen, - 0) as libc::c_int - } - }); - if ret == 0 { - Err(io::standard_error(io::EndOfFile)) - } else if ret < 0 { - Err(super::last_error()) - } else { - Ok(ret as uint) - } - } - fn write(&mut self, buf: &[u8]) -> IoResult<()> { - let ret = keep_going(buf, |buf, len| { - unsafe { - libc::send(self.fd, - buf as *mut libc::c_void, - len as wrlen, - 0) as i64 - } - }); - if ret < 0 { - Err(super::last_error()) - } else { - Ok(()) - } - } -} - -impl Drop for UnixStream { - fn drop(&mut self) { unsafe { close(self.fd); } } -} - -//////////////////////////////////////////////////////////////////////////////// -// Unix Datagram -//////////////////////////////////////////////////////////////////////////////// - -pub struct UnixDatagram { - priv fd: sock_t, -} - -impl UnixDatagram { - pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult { - unsafe { - unix_socket(ty).and_then(|fd| { - match addr_to_sockaddr_un(addr) { - Err(e) => return Err(e), - Ok((addr, len)) => { - let ret = UnixDatagram{ fd: fd }; - let addrp = &addr as *libc::sockaddr_storage; - match retry(|| { - libc::connect(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) - }) { - -1 => return Err(super::last_error()), - _ => return Ok(ret) - } - } - } - }) - } - } - - pub fn bind(addr: &CString) -> IoResult { - unsafe { - unix_socket(libc::SOCK_DGRAM).and_then(|fd| { - match addr_to_sockaddr_un(addr) { - Err(e) => return Err(e), - Ok((addr, len)) => { - let ret = UnixDatagram{ fd: fd }; - let addrp = &addr as *libc::sockaddr_storage; - match libc::bind(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) { - -1 => return Err(super::last_error()), - _ => return Ok(ret) - } - } - } - }) - } - } - - pub fn fd(&self) -> sock_t { self.fd } -} - -impl rtio::RtioPipe for UnixDatagram { - fn read(&mut self, buf: &mut [u8]) -> IoResult { - let ret = retry(|| { - unsafe { - libc::recv(self.fd, - buf.as_ptr() as *mut libc::c_void, - buf.len() as wrlen, - 0) as libc::c_int - } - }); - if ret == 0 { - Err(io::standard_error(io::EndOfFile)) - } else if ret < 0 { - Err(super::last_error()) - } else { - Ok(ret as uint) - } - } - fn write(&mut self, buf: &[u8]) -> IoResult<()> { - let ret = keep_going(buf, |buf, len| { - unsafe { - libc::send(self.fd, - buf as *mut libc::c_void, - len as wrlen, - 0) as i64 - } - }); - if ret < 0 { - Err(super::last_error()) - } else { - Ok(()) - } - } -} - -impl rtio::RtioDatagramPipe for UnixDatagram { - fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> { - unsafe { - let mut storage: libc::sockaddr_storage = intrinsics::init(); - let storagep = &mut storage as *mut libc::sockaddr_storage; - let mut addrlen: libc::socklen_t = - mem::size_of::() as libc::socklen_t; - let ret = retry(|| { - libc::recvfrom(self.fd, - buf.as_ptr() as *mut libc::c_void, - buf.len() as msglen_t, - 0, - storagep as *mut libc::sockaddr, - &mut addrlen) as libc::c_int - }); - if ret < 0 { return Err(super::last_error()) } - sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| { - Ok((ret as uint, addr)) - }) - } - } - - fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> { - match addr_to_sockaddr_un(dst) { - Err(e) => Err(e), - Ok((dst, len)) => { - let dstp = &dst as *libc::sockaddr_storage; - unsafe { - let ret = retry(|| { - libc::sendto(self.fd, - buf.as_ptr() as *libc::c_void, - buf.len() as msglen_t, - 0, - dstp as *libc::sockaddr, - len as libc::socklen_t) as libc::c_int - }); - match ret { - -1 => Err(super::last_error()), - n if n as uint != buf.len() => { - Err(io::IoError { - kind: io::OtherIoError, - desc: "couldn't send entire packet at once", - detail: None, - }) - } - _ => Ok(()) - } - } - } - } - } -} - -impl Drop for UnixDatagram { - fn drop(&mut self) { unsafe { close(self.fd); } } -} -//////////////////////////////////////////////////////////////////////////////// -// Unix Listener -//////////////////////////////////////////////////////////////////////////////// - -pub struct UnixListener { - priv fd: sock_t, -} - -impl UnixListener { - pub fn bind(addr: &CString) -> IoResult { - unsafe { - unix_socket(libc::SOCK_STREAM).and_then(|fd| { - match addr_to_sockaddr_un(addr) { - Err(e) => return Err(e), - Ok((addr, len)) => { - let ret = UnixListener{ fd: fd }; - let addrp = &addr as *libc::sockaddr_storage; - match libc::bind(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) { - -1 => return Err(super::last_error()), - _ => return Ok(ret) - } - } - } - }) - } - } - - pub fn fd(&self) -> sock_t { self.fd } - - pub fn native_listen(self, backlog: int) -> IoResult { - match unsafe { libc::listen(self.fd, backlog as libc::c_int) } { - -1 => Err(super::last_error()), - _ => Ok(UnixAcceptor { listener: self }) - } - } -} - -impl rtio::RtioUnixListener for UnixListener { - fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor> { - self.native_listen(128).map(|a| ~a as ~rtio::RtioUnixAcceptor) - } -} - -impl Drop for UnixListener { - fn drop(&mut self) { unsafe { close(self.fd); } } -} - -pub struct UnixAcceptor { - priv listener: UnixListener, -} - -impl UnixAcceptor { - pub fn fd(&self) -> sock_t { self.listener.fd } - - pub fn native_accept(&mut self) -> IoResult { - unsafe { - let mut storage: libc::sockaddr_storage = intrinsics::init(); - let storagep = &mut storage as *mut libc::sockaddr_storage; - let size = mem::size_of::(); - let mut size = size as libc::socklen_t; - match retry(|| { - libc::accept(self.fd(), - storagep as *mut libc::sockaddr, - &mut size as *mut libc::socklen_t) as libc::c_int - }) as sock_t { - -1 => Err(super::last_error()), - fd => Ok(UnixStream { fd: fd }) - } - } - } -} - -impl rtio::RtioUnixAcceptor for UnixAcceptor { - fn accept(&mut self) -> IoResult<~rtio::RtioPipe> { - self.native_accept().map(|s| ~s as ~rtio::RtioPipe) - } -} - - diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs new file mode 100644 index 0000000000000..1160bc196d8f0 --- /dev/null +++ b/src/libnative/io/pipe_unix.rs @@ -0,0 +1,300 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//////////////////////////////////////////////////////////////////////////////// +// Unix +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixStream { + priv fd: sock_t, +} + +impl UnixStream { + pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult { + unix_socket(ty).and_then(|fd| { + match addr_to_sockaddr_un(addr) { + Err(e) => return Err(e), + Ok((addr, len)) => { + let ret = UnixStream{ fd: fd }; + let addrp = &addr as *libc::sockaddr_storage; + match retry(|| { + libc::connect(fd, addrp as *libc::sockaddr, + len as libc::socklen_t) + }) { + -1 => return Err(super::last_error()), + _ => return Ok(ret) + } + } + } + }) + } + + pub fn fd(&self) -> sock_t { self.fd } +} + +impl rtio::RtioPipe for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> IoResult { + let ret = retry(|| { + unsafe { + libc::recv(self.fd, + buf.as_ptr() as *mut libc::c_void, + buf.len() as wrlen, + 0) as libc::c_int + } + }); + if ret == 0 { + Err(io::standard_error(io::EndOfFile)) + } else if ret < 0 { + Err(super::last_error()) + } else { + Ok(ret as uint) + } + } + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + let ret = keep_going(buf, |buf, len| { + unsafe { + libc::send(self.fd, + buf as *mut libc::c_void, + len as wrlen, + 0) as i64 + } + }); + if ret < 0 { + Err(super::last_error()) + } else { + Ok(()) + } + } +} + +impl Drop for UnixStream { + fn drop(&mut self) { unsafe { close(self.fd); } } +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Datagram +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixDatagram { + priv fd: sock_t, +} + +impl UnixDatagram { + pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult { + unsafe { + unix_socket(ty).and_then(|fd| { + match addr_to_sockaddr_un(addr) { + Err(e) => return Err(e), + Ok((addr, len)) => { + let ret = UnixDatagram{ fd: fd }; + let addrp = &addr as *libc::sockaddr_storage; + match retry(|| { + libc::connect(fd, addrp as *libc::sockaddr, + len as libc::socklen_t) + }) { + -1 => return Err(super::last_error()), + _ => return Ok(ret) + } + } + } + }) + } + } + + pub fn bind(addr: &CString) -> IoResult { + unsafe { + unix_socket(libc::SOCK_DGRAM).and_then(|fd| { + match addr_to_sockaddr_un(addr) { + Err(e) => return Err(e), + Ok((addr, len)) => { + let ret = UnixDatagram{ fd: fd }; + let addrp = &addr as *libc::sockaddr_storage; + match libc::bind(fd, addrp as *libc::sockaddr, + len as libc::socklen_t) { + -1 => return Err(super::last_error()), + _ => return Ok(ret) + } + } + } + }) + } + } + + pub fn fd(&self) -> sock_t { self.fd } +} + +impl rtio::RtioPipe for UnixDatagram { + fn read(&mut self, buf: &mut [u8]) -> IoResult { + let ret = retry(|| { + unsafe { + libc::recv(self.fd, + buf.as_ptr() as *mut libc::c_void, + buf.len() as wrlen, + 0) as libc::c_int + } + }); + if ret == 0 { + Err(io::standard_error(io::EndOfFile)) + } else if ret < 0 { + Err(super::last_error()) + } else { + Ok(ret as uint) + } + } + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + let ret = keep_going(buf, |buf, len| { + unsafe { + libc::send(self.fd, + buf as *mut libc::c_void, + len as wrlen, + 0) as i64 + } + }); + if ret < 0 { + Err(super::last_error()) + } else { + Ok(()) + } + } +} + +impl rtio::RtioDatagramPipe for UnixDatagram { + fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> { + unsafe { + let mut storage: libc::sockaddr_storage = intrinsics::init(); + let storagep = &mut storage as *mut libc::sockaddr_storage; + let mut addrlen: libc::socklen_t = + mem::size_of::() as libc::socklen_t; + let ret = retry(|| { + libc::recvfrom(self.fd, + buf.as_ptr() as *mut libc::c_void, + buf.len() as msglen_t, + 0, + storagep as *mut libc::sockaddr, + &mut addrlen) as libc::c_int + }); + if ret < 0 { return Err(super::last_error()) } + sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| { + Ok((ret as uint, addr)) + }) + } + } + + fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> { + match addr_to_sockaddr_un(dst) { + Err(e) => Err(e), + Ok((dst, len)) => { + let dstp = &dst as *libc::sockaddr_storage; + unsafe { + let ret = retry(|| { + libc::sendto(self.fd, + buf.as_ptr() as *libc::c_void, + buf.len() as msglen_t, + 0, + dstp as *libc::sockaddr, + len as libc::socklen_t) as libc::c_int + }); + match ret { + -1 => Err(super::last_error()), + n if n as uint != buf.len() => { + Err(io::IoError { + kind: io::OtherIoError, + desc: "couldn't send entire packet at once", + detail: None, + }) + } + _ => Ok(()) + } + } + } + } + } +} + +impl Drop for UnixDatagram { + fn drop(&mut self) { unsafe { close(self.fd); } } +} +//////////////////////////////////////////////////////////////////////////////// +// Unix Listener +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixListener { + priv fd: sock_t, +} + +impl UnixListener { + pub fn bind(addr: &CString) -> IoResult { + unsafe { + unix_socket(libc::SOCK_STREAM).and_then(|fd| { + match addr_to_sockaddr_un(addr) { + Err(e) => return Err(e), + Ok((addr, len)) => { + let ret = UnixListener{ fd: fd }; + let addrp = &addr as *libc::sockaddr_storage; + match libc::bind(fd, addrp as *libc::sockaddr, + len as libc::socklen_t) { + -1 => return Err(super::last_error()), + _ => return Ok(ret) + } + } + } + }) + } + } + + pub fn fd(&self) -> sock_t { self.fd } + + pub fn native_listen(self, backlog: int) -> IoResult { + match unsafe { libc::listen(self.fd, backlog as libc::c_int) } { + -1 => Err(super::last_error()), + _ => Ok(UnixAcceptor { listener: self }) + } + } +} + +impl rtio::RtioUnixListener for UnixListener { + fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor> { + self.native_listen(128).map(|a| ~a as ~rtio::RtioUnixAcceptor) + } +} + +impl Drop for UnixListener { + fn drop(&mut self) { unsafe { close(self.fd); } } +} + +pub struct UnixAcceptor { + priv listener: UnixListener, +} + +impl UnixAcceptor { + pub fn fd(&self) -> sock_t { self.listener.fd } + + pub fn native_accept(&mut self) -> IoResult { + let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; + let storagep = &mut storage as *mut libc::sockaddr_storage; + let size = mem::size_of::(); + let mut size = size as libc::socklen_t; + match retry(|| unsafe { + libc::accept(self.fd(), + storagep as *mut libc::sockaddr, + &mut size as *mut libc::socklen_t) as libc::c_int + }) as sock_t { + -1 => Err(super::last_error()), + fd => Ok(UnixStream { fd: fd }) + } + } +} + +impl rtio::RtioUnixAcceptor for UnixAcceptor { + fn accept(&mut self) -> IoResult<~rtio::RtioPipe> { + self.native_accept().map(|s| ~s as ~rtio::RtioPipe) + } +} + From a526aa139ec1c95cbad4c1d3187c437eb45d4bae Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 7 Feb 2014 10:37:58 -0800 Subject: [PATCH 3/3] Implement named pipes for windows, touch up unix * Implementation of pipe_win32 filled out for libnative * Reorganize pipes to be clone-able * Fix a few file descriptor leaks on error * Factor out some common code into shared functions * Make use of the if_ok!() macro for less indentation Closes #11201 --- src/libnative/io/mod.rs | 9 +- src/libnative/io/net.rs | 53 ---- src/libnative/io/pipe_unix.rs | 349 +++++++++++------------ src/libnative/io/pipe_win32.rs | 492 +++++++++++++++++++++++++++++++++ src/libstd/io/net/mod.rs | 2 +- src/libstd/io/net/unix.rs | 101 ++++--- src/libstd/libc.rs | 57 +++- src/libstd/rt/rtio.rs | 5 - 8 files changed, 780 insertions(+), 288 deletions(-) create mode 100644 src/libnative/io/pipe_win32.rs diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index ad0d7270c1a23..0f9439b3eb5b8 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -61,11 +61,11 @@ pub mod timer; pub mod timer; #[cfg(unix)] -#[path = "path_unix.rs"] +#[path = "pipe_unix.rs"] pub mod pipe; #[cfg(windows)] -#[path = "path_win32.rs"] +#[path = "pipe_win32.rs"] pub mod pipe; mod timer_helper; @@ -85,6 +85,9 @@ fn translate_error(errno: i32, detail: bool) -> IoError { fn get_err(errno: i32) -> (io::IoErrorKind, &'static str) { match errno { libc::EOF => (io::EndOfFile, "end of file"), + libc::ERROR_NO_DATA => (io::BrokenPipe, "the pipe is being closed"), + libc::ERROR_FILE_NOT_FOUND => (io::FileNotFound, "file not found"), + libc::ERROR_INVALID_NAME => (io::InvalidInput, "invalid file name"), libc::WSAECONNREFUSED => (io::ConnectionRefused, "connection refused"), libc::WSAECONNRESET => (io::ConnectionReset, "connection reset"), libc::WSAEACCES => (io::PermissionDenied, "permission denied"), @@ -94,6 +97,7 @@ fn translate_error(errno: i32, detail: bool) -> IoError { libc::WSAECONNABORTED => (io::ConnectionAborted, "connection aborted"), libc::WSAEADDRNOTAVAIL => (io::ConnectionRefused, "address not available"), libc::WSAEADDRINUSE => (io::ConnectionRefused, "address in use"), + libc::ERROR_BROKEN_PIPE => (io::BrokenPipe, "the pipe has ended"), x => { debug!("ignoring {}: {}", x, os::last_os_error()); @@ -116,6 +120,7 @@ fn translate_error(errno: i32, detail: bool) -> IoError { libc::ECONNABORTED => (io::ConnectionAborted, "connection aborted"), libc::EADDRNOTAVAIL => (io::ConnectionRefused, "address not available"), libc::EADDRINUSE => (io::ConnectionRefused, "address in use"), + libc::ENOENT => (io::FileNotFound, "no such file or directory"), // These two constants can have the same value on some systems, but // different values on others, so we can't use a match clause diff --git a/src/libnative/io/net.rs b/src/libnative/io/net.rs index dce890dc12961..b33b54862dc2b 100644 --- a/src/libnative/io/net.rs +++ b/src/libnative/io/net.rs @@ -8,7 +8,6 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::c_str::CString; use std::cast; use std::io::net::ip; use std::io; @@ -16,7 +15,6 @@ use std::libc; use std::mem; use std::rt::rtio; use std::sync::arc::UnsafeArc; -use std::unstable::intrinsics; use super::{IoResult, retry}; use super::file::keep_going; @@ -90,30 +88,6 @@ fn addr_to_sockaddr(addr: ip::SocketAddr) -> (libc::sockaddr_storage, uint) { } } -fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> { - // the sun_path length is limited to SUN_LEN (with null) - if addr.len() > libc::sun_len -1 { - return Err(io::IoError { - kind: io::OtherIoError, - desc: "path must be smaller than SUN_LEN", - detail: None, - }) - } - unsafe { - let storage: libc::sockaddr_storage = intrinsics::init(); - let s: *mut libc::sockaddr_un = cast::transmute(&storage); - (*s).sun_family = libc::AF_UNIX as libc::sa_family_t; - let mut i = 0; - for c in addr.iter() { - (*s).sun_path[i] = c; - i += 1; - } - - let len = mem::size_of::() + i + 1; //count the null terminator - return Ok((storage, len)); - } -} - fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult { unsafe { let fam = match addr.ip { @@ -127,15 +101,6 @@ fn socket(addr: ip::SocketAddr, ty: libc::c_int) -> IoResult { } } -fn unix_socket(ty: libc::c_int) -> IoResult { - unsafe { - match libc::socket(libc::AF_UNIX, ty, 0) { - -1 => Err(super::last_error()), - fd => Ok(fd) - } - } -} - fn setsockopt(fd: sock_t, opt: libc::c_int, val: libc::c_int, payload: T) -> IoResult<()> { unsafe { @@ -228,24 +193,6 @@ pub fn sockaddr_to_addr(storage: &libc::sockaddr_storage, } } -fn sockaddr_to_unix(storage: &libc::sockaddr_storage, - len: uint) -> IoResult { - match storage.ss_family as libc::c_int { - libc::AF_UNIX => { - assert!(len as uint <= mem::size_of::()); - let storage: &libc::sockaddr_un = unsafe { - cast::transmute(storage) - }; - unsafe { - Ok(CString::new(storage.sun_path.to_owned().as_ptr(), false)) - } - } - _ => { - Err(io::standard_error(io::OtherIoError)) - } - } -} - #[cfg(unix)] pub fn init() {} diff --git a/src/libnative/io/pipe_unix.rs b/src/libnative/io/pipe_unix.rs index 1160bc196d8f0..a6d75d93d6761 100644 --- a/src/libnative/io/pipe_unix.rs +++ b/src/libnative/io/pipe_unix.rs @@ -8,46 +8,124 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use std::c_str::CString; +use std::cast; +use std::io; +use std::libc; +use std::mem; +use std::rt::rtio; +use std::sync::arc::UnsafeArc; +use std::unstable::intrinsics; + +use super::{IoResult, retry}; +use super::file::{keep_going, fd_t}; + +fn unix_socket(ty: libc::c_int) -> IoResult { + match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } { + -1 => Err(super::last_error()), + fd => Ok(fd) + } +} + +fn addr_to_sockaddr_un(addr: &CString) -> IoResult<(libc::sockaddr_storage, uint)> { + // the sun_path length is limited to SUN_LEN (with null) + assert!(mem::size_of::() >= + mem::size_of::()); + let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; + let s: &mut libc::sockaddr_un = unsafe { cast::transmute(&mut storage) }; + + let len = addr.len(); + if len > s.sun_path.len() - 1 { + return Err(io::IoError { + kind: io::InvalidInput, + desc: "path must be smaller than SUN_LEN", + detail: None, + }) + } + s.sun_family = libc::AF_UNIX as libc::sa_family_t; + for (slot, value) in s.sun_path.mut_iter().zip(addr.iter()) { + *slot = value; + } + + // count the null terminator + let len = mem::size_of::() + len + 1; + return Ok((storage, len)); +} + +fn sockaddr_to_unix(storage: &libc::sockaddr_storage, + len: uint) -> IoResult { + match storage.ss_family as libc::c_int { + libc::AF_UNIX => { + assert!(len as uint <= mem::size_of::()); + let storage: &libc::sockaddr_un = unsafe { + cast::transmute(storage) + }; + unsafe { + Ok(CString::new(storage.sun_path.as_ptr(), false).clone()) + } + } + _ => Err(io::standard_error(io::InvalidInput)) + } +} + +struct Inner { + fd: fd_t, +} + +impl Drop for Inner { + fn drop(&mut self) { unsafe { let _ = libc::close(self.fd); } } +} + +fn connect(addr: &CString, ty: libc::c_int) -> IoResult { + let (addr, len) = if_ok!(addr_to_sockaddr_un(addr)); + let inner = Inner { fd: if_ok!(unix_socket(ty)) }; + let addrp = &addr as *libc::sockaddr_storage; + match retry(|| unsafe { + libc::connect(inner.fd, addrp as *libc::sockaddr, + len as libc::socklen_t) + }) { + -1 => Err(super::last_error()), + _ => Ok(inner) + } +} + +fn bind(addr: &CString, ty: libc::c_int) -> IoResult { + let (addr, len) = if_ok!(addr_to_sockaddr_un(addr)); + let inner = Inner { fd: if_ok!(unix_socket(ty)) }; + let addrp = &addr as *libc::sockaddr_storage; + match unsafe { + libc::bind(inner.fd, addrp as *libc::sockaddr, len as libc::socklen_t) + } { + -1 => Err(super::last_error()), + _ => Ok(inner) + } +} + //////////////////////////////////////////////////////////////////////////////// -// Unix +// Unix Streams //////////////////////////////////////////////////////////////////////////////// pub struct UnixStream { - priv fd: sock_t, + priv inner: UnsafeArc, } impl UnixStream { - pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult { - unix_socket(ty).and_then(|fd| { - match addr_to_sockaddr_un(addr) { - Err(e) => return Err(e), - Ok((addr, len)) => { - let ret = UnixStream{ fd: fd }; - let addrp = &addr as *libc::sockaddr_storage; - match retry(|| { - libc::connect(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) - }) { - -1 => return Err(super::last_error()), - _ => return Ok(ret) - } - } - } + pub fn connect(addr: &CString) -> IoResult { + connect(addr, libc::SOCK_STREAM).map(|inner| { + UnixStream { inner: UnsafeArc::new(inner) } }) } - pub fn fd(&self) -> sock_t { self.fd } + fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } } } impl rtio::RtioPipe for UnixStream { fn read(&mut self, buf: &mut [u8]) -> IoResult { - let ret = retry(|| { - unsafe { - libc::recv(self.fd, - buf.as_ptr() as *mut libc::c_void, - buf.len() as wrlen, - 0) as libc::c_int - } + let ret = retry(|| unsafe { + libc::recv(self.fd(), + buf.as_ptr() as *mut libc::c_void, + buf.len() as libc::size_t, + 0) as libc::c_int }); if ret == 0 { Err(io::standard_error(io::EndOfFile)) @@ -57,14 +135,13 @@ impl rtio::RtioPipe for UnixStream { Ok(ret as uint) } } + fn write(&mut self, buf: &[u8]) -> IoResult<()> { - let ret = keep_going(buf, |buf, len| { - unsafe { - libc::send(self.fd, - buf as *mut libc::c_void, - len as wrlen, - 0) as i64 - } + let ret = keep_going(buf, |buf, len| unsafe { + libc::send(self.fd(), + buf as *mut libc::c_void, + len as libc::size_t, + 0) as i64 }); if ret < 0 { Err(super::last_error()) @@ -72,10 +149,10 @@ impl rtio::RtioPipe for UnixStream { Ok(()) } } -} -impl Drop for UnixStream { - fn drop(&mut self) { unsafe { close(self.fd); } } + fn clone(&self) -> ~rtio::RtioPipe { + ~UnixStream { inner: self.inner.clone() } as ~rtio::RtioPipe + } } //////////////////////////////////////////////////////////////////////////////// @@ -83,176 +160,89 @@ impl Drop for UnixStream { //////////////////////////////////////////////////////////////////////////////// pub struct UnixDatagram { - priv fd: sock_t, + priv inner: UnsafeArc, } impl UnixDatagram { - pub fn connect(addr: &CString, ty: libc::c_int) -> IoResult { - unsafe { - unix_socket(ty).and_then(|fd| { - match addr_to_sockaddr_un(addr) { - Err(e) => return Err(e), - Ok((addr, len)) => { - let ret = UnixDatagram{ fd: fd }; - let addrp = &addr as *libc::sockaddr_storage; - match retry(|| { - libc::connect(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) - }) { - -1 => return Err(super::last_error()), - _ => return Ok(ret) - } - } - } - }) - } + pub fn connect(addr: &CString) -> IoResult { + connect(addr, libc::SOCK_DGRAM).map(|inner| { + UnixDatagram { inner: UnsafeArc::new(inner) } + }) } pub fn bind(addr: &CString) -> IoResult { - unsafe { - unix_socket(libc::SOCK_DGRAM).and_then(|fd| { - match addr_to_sockaddr_un(addr) { - Err(e) => return Err(e), - Ok((addr, len)) => { - let ret = UnixDatagram{ fd: fd }; - let addrp = &addr as *libc::sockaddr_storage; - match libc::bind(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) { - -1 => return Err(super::last_error()), - _ => return Ok(ret) - } - } - } - }) - } + bind(addr, libc::SOCK_DGRAM).map(|inner| { + UnixDatagram { inner: UnsafeArc::new(inner) } + }) } - pub fn fd(&self) -> sock_t { self.fd } -} + fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } } -impl rtio::RtioPipe for UnixDatagram { - fn read(&mut self, buf: &mut [u8]) -> IoResult { - let ret = retry(|| { - unsafe { - libc::recv(self.fd, + pub fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> { + let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; + let storagep = &mut storage as *mut libc::sockaddr_storage; + let mut addrlen: libc::socklen_t = + mem::size_of::() as libc::socklen_t; + let ret = retry(|| unsafe { + libc::recvfrom(self.fd(), buf.as_ptr() as *mut libc::c_void, - buf.len() as wrlen, - 0) as libc::c_int - } - }); - if ret == 0 { - Err(io::standard_error(io::EndOfFile)) - } else if ret < 0 { - Err(super::last_error()) - } else { - Ok(ret as uint) - } - } - fn write(&mut self, buf: &[u8]) -> IoResult<()> { - let ret = keep_going(buf, |buf, len| { - unsafe { - libc::send(self.fd, - buf as *mut libc::c_void, - len as wrlen, - 0) as i64 - } + buf.len() as libc::size_t, + 0, + storagep as *mut libc::sockaddr, + &mut addrlen) as libc::c_int }); - if ret < 0 { - Err(super::last_error()) - } else { - Ok(()) - } + if ret < 0 { return Err(super::last_error()) } + sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| { + Ok((ret as uint, addr)) + }) } -} -impl rtio::RtioDatagramPipe for UnixDatagram { - fn recvfrom(&mut self, buf: &mut [u8]) -> IoResult<(uint, CString)> { - unsafe { - let mut storage: libc::sockaddr_storage = intrinsics::init(); - let storagep = &mut storage as *mut libc::sockaddr_storage; - let mut addrlen: libc::socklen_t = - mem::size_of::() as libc::socklen_t; - let ret = retry(|| { - libc::recvfrom(self.fd, - buf.as_ptr() as *mut libc::c_void, - buf.len() as msglen_t, - 0, - storagep as *mut libc::sockaddr, - &mut addrlen) as libc::c_int - }); - if ret < 0 { return Err(super::last_error()) } - sockaddr_to_unix(&storage, addrlen as uint).and_then(|addr| { - Ok((ret as uint, addr)) - }) + pub fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> { + let (dst, len) = if_ok!(addr_to_sockaddr_un(dst)); + let dstp = &dst as *libc::sockaddr_storage; + let ret = retry(|| unsafe { + libc::sendto(self.fd(), + buf.as_ptr() as *libc::c_void, + buf.len() as libc::size_t, + 0, + dstp as *libc::sockaddr, + len as libc::socklen_t) as libc::c_int + }); + match ret { + -1 => Err(super::last_error()), + n if n as uint != buf.len() => { + Err(io::IoError { + kind: io::OtherIoError, + desc: "couldn't send entire packet at once", + detail: None, + }) + } + _ => Ok(()) } } - fn sendto(&mut self, buf: &[u8], dst: &CString) -> IoResult<()> { - match addr_to_sockaddr_un(dst) { - Err(e) => Err(e), - Ok((dst, len)) => { - let dstp = &dst as *libc::sockaddr_storage; - unsafe { - let ret = retry(|| { - libc::sendto(self.fd, - buf.as_ptr() as *libc::c_void, - buf.len() as msglen_t, - 0, - dstp as *libc::sockaddr, - len as libc::socklen_t) as libc::c_int - }); - match ret { - -1 => Err(super::last_error()), - n if n as uint != buf.len() => { - Err(io::IoError { - kind: io::OtherIoError, - desc: "couldn't send entire packet at once", - detail: None, - }) - } - _ => Ok(()) - } - } - } - } + pub fn clone(&mut self) -> UnixDatagram { + UnixDatagram { inner: self.inner.clone() } } } -impl Drop for UnixDatagram { - fn drop(&mut self) { unsafe { close(self.fd); } } -} //////////////////////////////////////////////////////////////////////////////// // Unix Listener //////////////////////////////////////////////////////////////////////////////// pub struct UnixListener { - priv fd: sock_t, + priv inner: Inner, } impl UnixListener { pub fn bind(addr: &CString) -> IoResult { - unsafe { - unix_socket(libc::SOCK_STREAM).and_then(|fd| { - match addr_to_sockaddr_un(addr) { - Err(e) => return Err(e), - Ok((addr, len)) => { - let ret = UnixListener{ fd: fd }; - let addrp = &addr as *libc::sockaddr_storage; - match libc::bind(fd, addrp as *libc::sockaddr, - len as libc::socklen_t) { - -1 => return Err(super::last_error()), - _ => return Ok(ret) - } - } - } - }) - } + bind(addr, libc::SOCK_STREAM).map(|fd| UnixListener { inner: fd }) } - pub fn fd(&self) -> sock_t { self.fd } + fn fd(&self) -> fd_t { self.inner.fd } pub fn native_listen(self, backlog: int) -> IoResult { - match unsafe { libc::listen(self.fd, backlog as libc::c_int) } { + match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } { -1 => Err(super::last_error()), _ => Ok(UnixAcceptor { listener: self }) } @@ -265,16 +255,12 @@ impl rtio::RtioUnixListener for UnixListener { } } -impl Drop for UnixListener { - fn drop(&mut self) { unsafe { close(self.fd); } } -} - pub struct UnixAcceptor { priv listener: UnixListener, } impl UnixAcceptor { - pub fn fd(&self) -> sock_t { self.listener.fd } + fn fd(&self) -> fd_t { self.listener.fd() } pub fn native_accept(&mut self) -> IoResult { let mut storage: libc::sockaddr_storage = unsafe { intrinsics::init() }; @@ -285,9 +271,9 @@ impl UnixAcceptor { libc::accept(self.fd(), storagep as *mut libc::sockaddr, &mut size as *mut libc::socklen_t) as libc::c_int - }) as sock_t { + }) { -1 => Err(super::last_error()), - fd => Ok(UnixStream { fd: fd }) + fd => Ok(UnixStream { inner: UnsafeArc::new(Inner { fd: fd }) }) } } } @@ -297,4 +283,3 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor { self.native_accept().map(|s| ~s as ~rtio::RtioPipe) } } - diff --git a/src/libnative/io/pipe_win32.rs b/src/libnative/io/pipe_win32.rs new file mode 100644 index 0000000000000..83731cc02a6b6 --- /dev/null +++ b/src/libnative/io/pipe_win32.rs @@ -0,0 +1,492 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Named pipes implementation for windows +//! +//! If are unfortunate enough to be reading this code, I would like to first +//! apologize. This was my first encounter with windows named pipes, and it +//! didn't exactly turn out very cleanly. If you, too, are new to named pipes, +//! read on as I'll try to explain some fun things that I ran into. +//! +//! # Unix pipes vs Named pipes +//! +//! As with everything else, named pipes on windows are pretty different from +//! unix pipes on unix. On unix, you use one "server pipe" to accept new client +//! pipes. So long as this server pipe is active, new children pipes can +//! connect. On windows, you instead have a number of "server pipes", and each +//! of these server pipes can throughout their lifetime be attached to a client +//! or not. Once attached to a client, a server pipe may then disconnect at a +//! later date. +//! +//! # Accepting clients +//! +//! As with most other I/O interfaces, our Listener/Acceptor/Stream interfaces +//! are built around the unix flavors. This means that we have one "server +//! pipe" to which many clients can connect. In order to make this compatible +//! with the windows model, each connected client consumes ownership of a server +//! pipe, and then a new server pipe is created for the next client. +//! +//! Note that the server pipes attached to clients are never given back to the +//! listener for recycling. This could possibly be implemented with a channel so +//! the listener half can re-use server pipes, but for now I err'd on the simple +//! side of things. Each stream accepted by a listener will destroy the server +//! pipe after the stream is dropped. +//! +//! This model ends up having a small race or two, and you can find more details +//! on the `native_accept` method. +//! +//! # Simultaneous reads and writes +//! +//! In testing, I found that two simultaneous writes and two simultaneous reads +//! on a pipe ended up working out just fine, but problems were encountered when +//! a read was executed simultaneously with a write. After some googling around, +//! it sounded like named pipes just weren't built for this kind of interaction, +//! and the suggested solution was to use overlapped I/O. +//! +//! I don't realy know what overlapped I/O is, but my basic understanding after +//! reading about it is that you have an external Event which is used to signal +//! I/O completion, passed around in some OVERLAPPED structures. As to what this +//! is, I'm not exactly sure. +//! +//! This problem implies that all named pipes are created with the +//! FILE_FLAG_OVERLAPPED option. This means that all of their I/O is +//! asynchronous. Each I/O operation has an associated OVERLAPPED structure, and +//! inside of this structure is a HANDLE from CreateEvent. After the I/O is +//! determined to be pending (may complete in the future), the +//! GetOverlappedResult function is used to block on the event, waiting for the +//! I/O to finish. +//! +//! This scheme ended up working well enough. There were two snags that I ran +//! into, however: +//! +//! * Each UnixStream instance needs its own read/write events to wait on. These +//! can't be shared among clones of the same stream because the documentation +//! states that it unsets the event when the I/O is started (would possibly +//! corrupt other events simultaneously waiting). For convenience's sake, +//! these events are lazily initialized. +//! +//! * Each server pipe needs to be created with FILE_FLAG_OVERLAPPED in addition +//! to all pipes created through `connect`. Notably this means that the +//! ConnectNamedPipe function is nonblocking, implying that the Listener needs +//! to have yet another event to do the actual blocking. +//! +//! # Conclusion +//! +//! The conclusion here is that I probably don't know the best way to work with +//! windows named pipes, but the solution here seems to work well enough to get +//! the test suite passing (the suite is in libstd), and that's good enough for +//! me! + +use std::c_str::CString; +use std::libc; +use std::os::win32::as_utf16_p; +use std::ptr; +use std::rt::rtio; +use std::sync::arc::UnsafeArc; +use std::unstable::intrinsics; + +use super::IoResult; + +struct Event(libc::HANDLE); + +impl Event { + fn new(manual_reset: bool, initial_state: bool) -> IoResult { + let event = unsafe { + libc::CreateEventW(ptr::mut_null(), + manual_reset as libc::BOOL, + initial_state as libc::BOOL, + ptr::null()) + }; + if event as uint == 0 { + Err(super::last_error()) + } else { + Ok(Event(event)) + } + } + + fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle } +} + +impl Drop for Event { + fn drop(&mut self) { + unsafe { let _ = libc::CloseHandle(self.handle()); } + } +} + +struct Inner { + handle: libc::HANDLE, +} + +impl Drop for Inner { + fn drop(&mut self) { + unsafe { + let _ = libc::FlushFileBuffers(self.handle); + let _ = libc::CloseHandle(self.handle); + } + } +} + +unsafe fn pipe(name: *u16, init: bool) -> libc::HANDLE { + libc::CreateNamedPipeW( + name, + libc::PIPE_ACCESS_DUPLEX | + if init {libc::FILE_FLAG_FIRST_PIPE_INSTANCE} else {0} | + libc::FILE_FLAG_OVERLAPPED, + libc::PIPE_TYPE_BYTE | libc::PIPE_READMODE_BYTE | + libc::PIPE_WAIT, + libc::PIPE_UNLIMITED_INSTANCES, + 65536, + 65536, + 0, + ptr::mut_null() + ) +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Streams +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixStream { + priv inner: UnsafeArc, + priv write: Option, + priv read: Option, +} + +impl UnixStream { + fn try_connect(p: *u16) -> Option { + // Note that most of this is lifted from the libuv implementation. + // The idea is that if we fail to open a pipe in read/write mode + // that we try afterwards in just read or just write + let mut result = unsafe { + libc::CreateFileW(p, + libc::GENERIC_READ | libc::GENERIC_WRITE, + 0, + ptr::mut_null(), + libc::OPEN_EXISTING, + libc::FILE_FLAG_OVERLAPPED, + ptr::mut_null()) + }; + if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE { + return Some(result) + } + + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_ACCESS_DENIED as libc::DWORD { + result = unsafe { + libc::CreateFileW(p, + libc::GENERIC_READ | libc::FILE_WRITE_ATTRIBUTES, + 0, + ptr::mut_null(), + libc::OPEN_EXISTING, + libc::FILE_FLAG_OVERLAPPED, + ptr::mut_null()) + }; + if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE { + return Some(result) + } + } + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_ACCESS_DENIED as libc::DWORD { + result = unsafe { + libc::CreateFileW(p, + libc::GENERIC_WRITE | libc::FILE_READ_ATTRIBUTES, + 0, + ptr::mut_null(), + libc::OPEN_EXISTING, + libc::FILE_FLAG_OVERLAPPED, + ptr::mut_null()) + }; + if result != libc::INVALID_HANDLE_VALUE as libc::HANDLE { + return Some(result) + } + } + None + } + + pub fn connect(addr: &CString) -> IoResult { + as_utf16_p(addr.as_str().unwrap(), |p| { + loop { + match UnixStream::try_connect(p) { + Some(handle) => { + let inner = Inner { handle: handle }; + let mut mode = libc::PIPE_TYPE_BYTE | + libc::PIPE_READMODE_BYTE | + libc::PIPE_WAIT; + let ret = unsafe { + libc::SetNamedPipeHandleState(inner.handle, + &mut mode, + ptr::mut_null(), + ptr::mut_null()) + }; + return if ret == 0 { + Err(super::last_error()) + } else { + Ok(UnixStream { + inner: UnsafeArc::new(inner), + read: None, + write: None, + }) + } + } + None => {} + } + + // On windows, if you fail to connect, you may need to call the + // `WaitNamedPipe` function, and this is indicated with an error + // code of ERROR_PIPE_BUSY. + let code = unsafe { libc::GetLastError() }; + if code as int != libc::ERROR_PIPE_BUSY as int { + return Err(super::last_error()) + } + + // An example I found on microsoft's website used 20 seconds, + // libuv uses 30 seconds, hence we make the obvious choice of + // waiting for 25 seconds. + if unsafe { libc::WaitNamedPipeW(p, 25000) } == 0 { + return Err(super::last_error()) + } + } + }) + } + + fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } } +} + +impl rtio::RtioPipe for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> IoResult { + if self.read.is_none() { + self.read = Some(if_ok!(Event::new(true, false))); + } + + let mut bytes_read = 0; + let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() }; + overlapped.hEvent = self.read.get_ref().handle(); + + let ret = unsafe { + libc::ReadFile(self.handle(), + buf.as_ptr() as libc::LPVOID, + buf.len() as libc::DWORD, + &mut bytes_read, + &mut overlapped) + }; + if ret == 0 { + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_IO_PENDING as libc::DWORD { + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_read, + libc::TRUE) + }; + if ret == 0 { + return Err(super::last_error()) + } + } else { + return Err(super::last_error()) + } + } + + Ok(bytes_read as uint) + } + + fn write(&mut self, buf: &[u8]) -> IoResult<()> { + if self.write.is_none() { + self.write = Some(if_ok!(Event::new(true, false))); + } + + let mut offset = 0; + let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() }; + overlapped.hEvent = self.write.get_ref().handle(); + + while offset < buf.len() { + let mut bytes_written = 0; + let ret = unsafe { + libc::WriteFile(self.handle(), + buf.slice_from(offset).as_ptr() as libc::LPVOID, + (buf.len() - offset) as libc::DWORD, + &mut bytes_written, + &mut overlapped) + }; + if ret == 0 { + let err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_IO_PENDING as libc::DWORD { + let ret = unsafe { + libc::GetOverlappedResult(self.handle(), + &mut overlapped, + &mut bytes_written, + libc::TRUE) + }; + if ret == 0 { + return Err(super::last_error()) + } + } else { + return Err(super::last_error()) + } + } + offset += bytes_written as uint; + } + Ok(()) + } + + fn clone(&self) -> ~rtio::RtioPipe { + ~UnixStream { + inner: self.inner.clone(), + read: None, + write: None, + } as ~rtio::RtioPipe + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Unix Listener +//////////////////////////////////////////////////////////////////////////////// + +pub struct UnixListener { + priv handle: libc::HANDLE, + priv name: CString, +} + +impl UnixListener { + pub fn bind(addr: &CString) -> IoResult { + // Although we technically don't need the pipe until much later, we + // create the initial handle up front to test the validity of the name + // and such. + as_utf16_p(addr.as_str().unwrap(), |p| { + let ret = unsafe { pipe(p, true) }; + if ret == libc::INVALID_HANDLE_VALUE as libc::HANDLE { + Err(super::last_error()) + } else { + Ok(UnixListener { handle: ret, name: addr.clone() }) + } + }) + } + + pub fn native_listen(self) -> IoResult { + Ok(UnixAcceptor { + listener: self, + event: if_ok!(Event::new(true, false)), + }) + } +} + +impl Drop for UnixListener { + fn drop(&mut self) { + unsafe { let _ = libc::CloseHandle(self.handle); } + } +} + +impl rtio::RtioUnixListener for UnixListener { + fn listen(~self) -> IoResult<~rtio::RtioUnixAcceptor> { + self.native_listen().map(|a| ~a as ~rtio::RtioUnixAcceptor) + } +} + +pub struct UnixAcceptor { + priv listener: UnixListener, + priv event: Event, +} + +impl UnixAcceptor { + pub fn native_accept(&mut self) -> IoResult { + // This function has some funky implementation details when working with + // unix pipes. On windows, each server named pipe handle can be + // connected to a one or zero clients. To the best of my knowledge, a + // named server is considered active and present if there exists at + // least one server named pipe for it. + // + // The model of this function is to take the current known server + // handle, connect a client to it, and then transfer ownership to the + // UnixStream instance. The next time accept() is invoked, it'll need a + // different server handle to connect a client to. + // + // Note that there is a possible race here. Once our server pipe is + // handed off to a `UnixStream` object, the stream could be closed, + // meaning that there would be no active server pipes, hence even though + // we have a valid `UnixAcceptor`, no one can connect to it. For this + // reason, we generate the next accept call's server pipe at the end of + // this function call. + // + // This provides us an invariant that we always have at least one server + // connection open at a time, meaning that all connects to this acceptor + // should succeed while this is active. + // + // The actual implementation of doing this is a little tricky. Once a + // server pipe is created, a client can connect to it at any time. I + // assume that which server a client connects to is nondeterministic, so + // we also need to guarantee that the only server able to be connected + // to is the one that we're calling ConnectNamedPipe on. This means that + // we have to create the second server pipe *after* we've already + // accepted a connection. In order to at least somewhat gracefully + // handle errors, this means that if the second server pipe creation + // fails that we disconnect the connected client and then just keep + // using the original server pipe. + let handle = self.listener.handle; + + // Once we've got a "server handle", we need to wait for a client to + // connect. The ConnectNamedPipe function will block this thread until + // someone on the other end connects. This function can "fail" if a + // client connects after we created the pipe but before we got down + // here. Thanks windows. + let mut overlapped: libc::OVERLAPPED = unsafe { intrinsics::init() }; + overlapped.hEvent = self.event.handle(); + if unsafe { libc::ConnectNamedPipe(handle, &mut overlapped) == 0 } { + let mut err = unsafe { libc::GetLastError() }; + if err == libc::ERROR_IO_PENDING as libc::DWORD { + let ret = unsafe { + let mut transfer = 0; + libc::GetOverlappedResult(handle, + &mut overlapped, + &mut transfer, + libc::TRUE) + }; + if ret == 0 { + err = unsafe { libc::GetLastError() }; + } else { + // we succeeded, bypass the check below + err = libc::ERROR_PIPE_CONNECTED as libc::DWORD; + } + } + if err != libc::ERROR_PIPE_CONNECTED as libc::DWORD { + return Err(super::last_error()) + } + } + + // Now that we've got a connected client to our handle, we need to + // create a second server pipe. If this fails, we disconnect the + // connected client and return an error (see comments above). + let new_handle = as_utf16_p(self.listener.name.as_str().unwrap(), |p| { + unsafe { pipe(p, false) } + }); + if new_handle == libc::INVALID_HANDLE_VALUE as libc::HANDLE { + let ret = Err(super::last_error()); + // If our disconnection fails, then there's not really a whole lot + // that we can do, so fail the task. + let err = unsafe { libc::DisconnectNamedPipe(handle) }; + assert!(err != 0); + return ret; + } else { + self.listener.handle = new_handle; + } + + // Transfer ownership of our handle into this stream + Ok(UnixStream { + inner: UnsafeArc::new(Inner { handle: handle }), + read: None, + write: None, + }) + } +} + +impl rtio::RtioUnixAcceptor for UnixAcceptor { + fn accept(&mut self) -> IoResult<~rtio::RtioPipe> { + self.native_accept().map(|s| ~s as ~rtio::RtioPipe) + } +} + diff --git a/src/libstd/io/net/mod.rs b/src/libstd/io/net/mod.rs index cf109167089d4..436156a12190b 100644 --- a/src/libstd/io/net/mod.rs +++ b/src/libstd/io/net/mod.rs @@ -14,5 +14,5 @@ pub mod addrinfo; pub mod tcp; pub mod udp; pub mod ip; -#[cfg(unix)] +// FIXME(#12093) - this should not be called unix pub mod unix; diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index 23c01aa635444..a1f3cbbe32643 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -134,7 +134,7 @@ mod tests { use io::*; use io::test::*; - fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) { + pub fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) { let path1 = next_test_unix(); let path2 = path1.clone(); let (port, chan) = Chan::new(); @@ -149,25 +149,32 @@ mod tests { server(acceptor.accept().unwrap()); } - #[test] - fn bind_error() { - match UnixListener::bind(&("path/to/nowhere")) { + iotest!(fn bind_error() { + let path = "path/to/nowhere"; + match UnixListener::bind(&path) { Ok(..) => fail!(), - Err(e) => assert_eq!(e.kind, PermissionDenied), + Err(e) => { + assert!(e.kind == PermissionDenied || e.kind == FileNotFound || + e.kind == InvalidInput); + } } - } - - #[test] - fn connect_error() { - match UnixStream::connect(&("path/to/nowhere")) { + }) + + iotest!(fn connect_error() { + let path = if cfg!(windows) { + r"\\.\pipe\this_should_not_exist_ever" + } else { + "path/to/nowhere" + }; + match UnixStream::connect(&path) { Ok(..) => fail!(), - Err(e) => assert_eq!(e.kind, - if cfg!(windows) {OtherIoError} else {FileNotFound}) + Err(e) => { + assert!(e.kind == FileNotFound || e.kind == OtherIoError); + } } - } + }) - #[test] - fn smoke() { + iotest!(fn smoke() { smalltest(proc(mut server) { let mut buf = [0]; server.read(buf).unwrap(); @@ -175,10 +182,9 @@ mod tests { }, proc(mut client) { client.write([99]).unwrap(); }) - } + }) - #[test] - fn read_eof() { + iotest!(fn read_eof() { smalltest(proc(mut server) { let mut buf = [0]; assert!(server.read(buf).is_err()); @@ -186,17 +192,18 @@ mod tests { }, proc(_client) { // drop the client }) - } + }) - #[test] - fn write_begone() { + iotest!(fn write_begone() { smalltest(proc(mut server) { let buf = [0]; loop { match server.write(buf) { Ok(..) => {} Err(e) => { - assert!(e.kind == BrokenPipe || e.kind == NotConnected, + assert!(e.kind == BrokenPipe || + e.kind == NotConnected || + e.kind == ConnectionReset, "unknown error {:?}", e); break; } @@ -205,10 +212,9 @@ mod tests { }, proc(_client) { // drop the client }) - } + }) - #[test] - fn accept_lots() { + iotest!(fn accept_lots() { let times = 10; let path1 = next_test_unix(); let path2 = path1.clone(); @@ -218,38 +224,49 @@ mod tests { port.recv(); for _ in range(0, times) { let mut stream = UnixStream::connect(&path2); - stream.write([100]).unwrap(); + match stream.write([100]) { + Ok(..) => {} + Err(e) => fail!("failed write: {}", e) + } } }); - let mut acceptor = UnixListener::bind(&path1).listen(); + let mut acceptor = match UnixListener::bind(&path1).listen() { + Ok(a) => a, + Err(e) => fail!("failed listen: {}", e), + }; chan.send(()); for _ in range(0, times) { let mut client = acceptor.accept(); let mut buf = [0]; - client.read(buf).unwrap(); + match client.read(buf) { + Ok(..) => {} + Err(e) => fail!("failed read/accept: {}", e), + } assert_eq!(buf[0], 100); } - } + }) - #[test] - fn path_exists() { + #[cfg(unix)] + iotest!(fn path_exists() { let path = next_test_unix(); let _acceptor = UnixListener::bind(&path).listen(); assert!(path.exists()); - } + }) - #[test] - fn unix_clone_smoke() { + iotest!(fn unix_clone_smoke() { let addr = next_test_unix(); let mut acceptor = UnixListener::bind(&addr).listen(); spawn(proc() { let mut s = UnixStream::connect(&addr); let mut buf = [0, 0]; + debug!("client reading"); assert_eq!(s.read(buf), Ok(1)); assert_eq!(buf[0], 1); + debug!("client writing"); s.write([2]).unwrap(); + debug!("client dropping"); }); let mut s1 = acceptor.accept().unwrap(); @@ -260,17 +277,20 @@ mod tests { spawn(proc() { let mut s2 = s2; p1.recv(); + debug!("writer writing"); s2.write([1]).unwrap(); + debug!("writer done"); c2.send(()); }); c1.send(()); let mut buf = [0, 0]; + debug!("reader reading"); assert_eq!(s1.read(buf), Ok(1)); + debug!("reader done"); p2.recv(); - } + }) - #[test] - fn unix_clone_two_read() { + iotest!(fn unix_clone_two_read() { let addr = next_test_unix(); let mut acceptor = UnixListener::bind(&addr).listen(); let (p, c) = Chan::new(); @@ -300,10 +320,9 @@ mod tests { c.send(()); p.recv(); - } + }) - #[test] - fn unix_clone_two_write() { + iotest!(fn unix_clone_two_write() { let addr = next_test_unix(); let mut acceptor = UnixListener::bind(&addr).listen(); @@ -326,5 +345,5 @@ mod tests { s1.write([2]).unwrap(); p.recv(); - } + }) } diff --git a/src/libstd/libc.rs b/src/libstd/libc.rs index c42a889605312..73bf4a1e69a88 100644 --- a/src/libstd/libc.rs +++ b/src/libstd/libc.rs @@ -269,7 +269,6 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::{c_char, c_int, c_uint}; - pub static sun_len:uint = 108; pub type socklen_t = u32; pub type sa_family_t = u16; pub type in_port_t = u16; @@ -641,7 +640,6 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::{c_char, c_int, c_uint}; - pub static sun_len:uint = 104; pub type socklen_t = u32; pub type sa_family_t = u8; pub type in_port_t = u16; @@ -844,7 +842,6 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::{c_char, c_int, c_uint, size_t}; - pub static sun_len:uint = 108; pub type SOCKET = c_uint; pub type socklen_t = c_int; pub type sa_family_t = u16; @@ -1213,7 +1210,6 @@ pub mod types { pub mod bsd44 { use libc::types::os::arch::c95::{c_char, c_int, c_uint}; - pub static sun_len:uint = 104; pub type socklen_t = c_int; pub type sa_family_t = u8; pub type in_port_t = u16; @@ -1627,11 +1623,19 @@ pub mod consts { pub static O_NOINHERIT: c_int = 128; pub static ERROR_SUCCESS : c_int = 0; + pub static ERROR_FILE_NOT_FOUND: c_int = 2; + pub static ERROR_ACCESS_DENIED: c_int = 5; pub static ERROR_INVALID_HANDLE : c_int = 6; + pub static ERROR_BROKEN_PIPE: c_int = 109; pub static ERROR_DISK_FULL : c_int = 112; pub static ERROR_INSUFFICIENT_BUFFER : c_int = 122; + pub static ERROR_INVALID_NAME : c_int = 123; pub static ERROR_ALREADY_EXISTS : c_int = 183; + pub static ERROR_PIPE_BUSY: c_int = 231; + pub static ERROR_NO_DATA: c_int = 232; pub static ERROR_INVALID_ADDRESS : c_int = 487; + pub static ERROR_PIPE_CONNECTED: c_int = 535; + pub static ERROR_IO_PENDING: c_int = 997; pub static ERROR_FILE_INVALID : c_int = 1006; pub static INVALID_HANDLE_VALUE : c_int = -1; @@ -1770,6 +1774,7 @@ pub mod consts { pub static FILE_FLAG_SESSION_AWARE: DWORD = 0x00800000; pub static FILE_FLAG_SEQUENTIAL_SCAN: DWORD = 0x08000000; pub static FILE_FLAG_WRITE_THROUGH: DWORD = 0x80000000; + pub static FILE_FLAG_FIRST_PIPE_INSTANCE: DWORD = 0x00080000; pub static FILE_NAME_NORMALIZED: DWORD = 0x0; pub static FILE_NAME_OPENED: DWORD = 0x8; @@ -1783,6 +1788,8 @@ pub mod consts { pub static GENERIC_WRITE: DWORD = 0x40000000; pub static GENERIC_EXECUTE: DWORD = 0x20000000; pub static GENERIC_ALL: DWORD = 0x10000000; + pub static FILE_WRITE_ATTRIBUTES: DWORD = 0x00000100; + pub static FILE_READ_ATTRIBUTES: DWORD = 0x00000080; pub static FILE_BEGIN: DWORD = 0; pub static FILE_CURRENT: DWORD = 1; @@ -1794,6 +1801,19 @@ pub mod consts { pub static DETACHED_PROCESS: DWORD = 0x00000008; pub static CREATE_NEW_PROCESS_GROUP: DWORD = 0x00000200; + + pub static PIPE_ACCESS_DUPLEX: DWORD = 0x00000003; + pub static PIPE_ACCESS_INBOUND: DWORD = 0x00000001; + pub static PIPE_ACCESS_OUTBOUND: DWORD = 0x00000002; + pub static PIPE_TYPE_BYTE: DWORD = 0x00000000; + pub static PIPE_TYPE_MESSAGE: DWORD = 0x00000004; + pub static PIPE_READMODE_BYTE: DWORD = 0x00000000; + pub static PIPE_READMODE_MESSAGE: DWORD = 0x00000002; + pub static PIPE_WAIT: DWORD = 0x00000000; + pub static PIPE_NOWAIT: DWORD = 0x00000001; + pub static PIPE_ACCEPT_REMOTE_CLIENTS: DWORD = 0x00000000; + pub static PIPE_REJECT_REMOTE_CLIENTS: DWORD = 0x00000008; + pub static PIPE_UNLIMITED_INSTANCES: DWORD = 255; } pub mod sysconf { } @@ -2784,6 +2804,7 @@ pub mod consts { pub static AF_INET: c_int = 2; pub static AF_INET6: c_int = 28; + pub static AF_UNIX: c_int = 1; pub static SOCK_STREAM: c_int = 1; pub static SOCK_DGRAM: c_int = 2; pub static IPPROTO_TCP: c_int = 6; @@ -4177,6 +4198,34 @@ pub mod funcs { lpPerformanceCount: *mut LARGE_INTEGER) -> BOOL; pub fn GetCurrentProcessId() -> DWORD; + pub fn CreateNamedPipeW( + lpName: LPCWSTR, + dwOpenMode: DWORD, + dwPipeMode: DWORD, + nMaxInstances: DWORD, + nOutBufferSize: DWORD, + nInBufferSize: DWORD, + nDefaultTimeOut: DWORD, + lpSecurityAttributes: LPSECURITY_ATTRIBUTES + ) -> HANDLE; + pub fn ConnectNamedPipe(hNamedPipe: HANDLE, + lpOverlapped: LPOVERLAPPED) -> BOOL; + pub fn WaitNamedPipeW(lpNamedPipeName: LPCWSTR, + nTimeOut: DWORD) -> BOOL; + pub fn SetNamedPipeHandleState(hNamedPipe: HANDLE, + lpMode: LPDWORD, + lpMaxCollectionCount: LPDWORD, + lpCollectDataTimeout: LPDWORD) + -> BOOL; + pub fn CreateEventW(lpEventAttributes: LPSECURITY_ATTRIBUTES, + bManualReset: BOOL, + bInitialState: BOOL, + lpName: LPCWSTR) -> HANDLE; + pub fn GetOverlappedResult(hFile: HANDLE, + lpOverlapped: LPOVERLAPPED, + lpNumberOfBytesTransferred: LPDWORD, + bWait: BOOL) -> BOOL; + pub fn DisconnectNamedPipe(hNamedPipe: HANDLE) -> BOOL; } } diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 578ace2ba86a0..5573f8ec02eb3 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -260,11 +260,6 @@ pub trait RtioPipe { fn clone(&self) -> ~RtioPipe; } -pub trait RtioDatagramPipe : RtioPipe { - fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, CString), IoError>; - fn sendto(&mut self, buf: &[u8], dst: &CString) -> Result<(), IoError>; -} - pub trait RtioUnixListener { fn listen(~self) -> Result<~RtioUnixAcceptor, IoError>; }