diff --git a/src/adapters/framed_tcp.rs b/src/adapters/framed_tcp.rs index 1b17a0c..c5003ba 100644 --- a/src/adapters/framed_tcp.rs +++ b/src/adapters/framed_tcp.rs @@ -2,7 +2,7 @@ use crate::network::adapter::{ Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo, ListeningInfo, PendingStatus, }; -use crate::network::{RemoteAddr, Readiness}; +use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen}; use crate::util::encoding::{self, Decoder, MAX_ENCODED_SIZE}; use mio::net::{TcpListener, TcpStream}; @@ -45,7 +45,10 @@ impl Resource for RemoteResource { } impl Remote for RemoteResource { - fn connect(remote_addr: RemoteAddr) -> io::Result> { + fn connect_with( + _: TransportConnect, + remote_addr: RemoteAddr, + ) -> io::Result> { let peer_addr = *remote_addr.socket_addr(); let stream = TcpStream::connect(peer_addr)?; let local_addr = stream.local_addr()?; @@ -129,7 +132,7 @@ impl Resource for LocalResource { impl Local for LocalResource { type Remote = RemoteResource; - fn listen(addr: SocketAddr) -> io::Result> { + fn listen_with(_: TransportListen, addr: SocketAddr) -> io::Result> { let listener = TcpListener::bind(addr)?; let local_addr = listener.local_addr().unwrap(); Ok(ListeningInfo { local: { LocalResource { listener } }, local_addr }) diff --git a/src/adapters/tcp.rs b/src/adapters/tcp.rs index d77e22e..d8a1062 100644 --- a/src/adapters/tcp.rs +++ b/src/adapters/tcp.rs @@ -2,7 +2,7 @@ use crate::network::adapter::{ Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo, ListeningInfo, PendingStatus, }; -use crate::network::{RemoteAddr, Readiness}; +use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen}; use mio::net::{TcpListener, TcpStream}; use mio::event::{Source}; @@ -40,7 +40,10 @@ impl Resource for RemoteResource { } impl Remote for RemoteResource { - fn connect(remote_addr: RemoteAddr) -> io::Result> { + fn connect_with( + _: TransportConnect, + remote_addr: RemoteAddr, + ) -> io::Result> { let peer_addr = *remote_addr.socket_addr(); let stream = TcpStream::connect(peer_addr)?; let local_addr = stream.local_addr()?; @@ -131,7 +134,7 @@ impl Resource for LocalResource { impl Local for LocalResource { type Remote = RemoteResource; - fn listen(addr: SocketAddr) -> io::Result> { + fn listen_with(_: TransportListen, addr: SocketAddr) -> io::Result> { let listener = TcpListener::bind(addr)?; let local_addr = listener.local_addr().unwrap(); Ok(ListeningInfo { local: { LocalResource { listener } }, local_addr }) diff --git a/src/adapters/template.rs b/src/adapters/template.rs index 46815c4..738c0a2 100644 --- a/src/adapters/template.rs +++ b/src/adapters/template.rs @@ -4,7 +4,7 @@ use crate::network::adapter::{ Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo, ListeningInfo, PendingStatus, }; -use crate::network::{RemoteAddr, Readiness}; +use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen}; use mio::event::{Source}; @@ -25,7 +25,10 @@ impl Resource for RemoteResource { } impl Remote for RemoteResource { - fn connect(remote_addr: RemoteAddr) -> io::Result> { + fn connect_with( + config: TransportConnect, + remote_addr: RemoteAddr, + ) -> io::Result> { todo!() } @@ -52,7 +55,7 @@ impl Resource for LocalResource { impl Local for LocalResource { type Remote = RemoteResource; - fn listen(addr: SocketAddr) -> io::Result> { + fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result> { todo!() } diff --git a/src/adapters/udp.rs b/src/adapters/udp.rs index 99b4a0e..4639285 100644 --- a/src/adapters/udp.rs +++ b/src/adapters/udp.rs @@ -2,7 +2,7 @@ use crate::network::adapter::{ Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo, ListeningInfo, PendingStatus, }; -use crate::network::{RemoteAddr, Readiness}; +use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen}; use mio::net::{UdpSocket}; use mio::event::{Source}; @@ -26,6 +26,20 @@ pub const MAX_LOCAL_PAYLOAD_LEN: usize = 65535 - 20 - 8; #[cfg(target_os = "macos")] pub const MAX_LOCAL_PAYLOAD_LEN: usize = 9216 - 20 - 8; +#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)] +pub struct UdpConnectConfig { + /// Enables the socket capabilities to send broadcast messages. + pub broadcast: bool, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug, Default)] +pub struct UdpListenConfig { + /// Enables the socket capabilities to send broadcast messages when the listening socket is + /// also used for sending with + /// [`Endpoint::from_listener`](crate::network::Endpoint::from_listener). + pub broadcast: bool, +} + pub(crate) struct UdpAdapter; impl Adapter for UdpAdapter { type Remote = RemoteResource; @@ -43,9 +57,22 @@ impl Resource for RemoteResource { } impl Remote for RemoteResource { - fn connect(remote_addr: RemoteAddr) -> io::Result> { + fn connect_with( + config: TransportConnect, + remote_addr: RemoteAddr, + ) -> io::Result> { + let config = match config { + TransportConnect::Udp(config) => config, + _ => panic!("Internal error: Got wrong config"), + }; + let socket = UdpSocket::bind("0.0.0.0:0".parse().unwrap())?; let peer_addr = *remote_addr.socket_addr(); + + if config.broadcast { + socket.set_broadcast(true)?; + } + socket.connect(peer_addr)?; let local_addr = socket.local_addr()?; Ok(ConnectionInfo { remote: RemoteResource { socket }, local_addr, peer_addr }) @@ -95,7 +122,12 @@ impl Resource for LocalResource { impl Local for LocalResource { type Remote = RemoteResource; - fn listen(addr: SocketAddr) -> io::Result> { + fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result> { + let config = match config { + TransportListen::Udp(config) => config, + _ => panic!("Internal error: Got wrong config"), + }; + let socket = match addr { SocketAddr::V4(addr) if addr.ip().is_multicast() => { let listening_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, addr.port()); @@ -112,6 +144,10 @@ impl Local for LocalResource { _ => UdpSocket::bind(addr)?, }; + if config.broadcast { + socket.set_broadcast(true)?; + } + let local_addr = socket.local_addr().unwrap(); Ok(ListeningInfo { local: { LocalResource { socket } }, local_addr }) } diff --git a/src/adapters/ws.rs b/src/adapters/ws.rs index 690ee02..a6b3a31 100644 --- a/src/adapters/ws.rs +++ b/src/adapters/ws.rs @@ -4,6 +4,7 @@ use crate::network::adapter::{ }; use crate::network::{RemoteAddr, Readiness}; use crate::util::thread::{OTHER_THREAD_ERR}; +use crate::network::{TransportConnect, TransportListen}; use mio::event::{Source}; use mio::net::{TcpStream, TcpListener}; @@ -76,7 +77,10 @@ impl Resource for RemoteResource { } impl Remote for RemoteResource { - fn connect(remote_addr: RemoteAddr) -> io::Result> { + fn connect_with( + _: TransportConnect, + remote_addr: RemoteAddr, + ) -> io::Result> { let (peer_addr, url) = match remote_addr { RemoteAddr::Socket(addr) => { (addr, Url::parse(&format!("ws://{addr}/message-io-default")).unwrap()) @@ -328,7 +332,7 @@ impl Resource for LocalResource { impl Local for LocalResource { type Remote = RemoteResource; - fn listen(addr: SocketAddr) -> io::Result> { + fn listen_with(_: TransportListen, addr: SocketAddr) -> io::Result> { let listener = TcpListener::bind(addr)?; let local_addr = listener.local_addr().unwrap(); Ok(ListeningInfo { local: LocalResource { listener }, local_addr }) diff --git a/src/network.rs b/src/network.rs index 6287d1b..0dbf1ac 100644 --- a/src/network.rs +++ b/src/network.rs @@ -17,7 +17,7 @@ pub use adapter::{SendStatus}; pub use resource_id::{ResourceId, ResourceType}; pub use endpoint::{Endpoint}; pub use remote_addr::{RemoteAddr, ToRemoteAddr}; -pub use transport::{Transport}; +pub use transport::{Transport, TransportConnect, TransportListen}; pub use driver::{NetEvent}; pub use poll::{Readiness}; @@ -101,12 +101,65 @@ impl NetworkController { &self, transport: Transport, addr: impl ToRemoteAddr, + ) -> io::Result<(Endpoint, SocketAddr)> { + self.connect_with(transport.into(), addr) + } + + /// Creates a connection to the specified address with custom transport options for transports + /// that support it. + /// The endpoint, an identifier of the new connection, will be returned. + /// This function will generate a [`NetEvent::Connected`] event with the result of the + /// connection. This call will **NOT** block to perform the connection. + /// + /// Note that this function can return an error in the case the internal socket + /// could not be binded or open in the OS, but never will return an error regarding + /// the connection itself. + /// If you want to check if the connection has been established or not you have to read the + /// boolean indicator in the [`NetEvent::Connected`] event. + /// + /// Example + /// ``` + /// use message_io::node::{self, NodeEvent}; + /// use message_io::network::{TransportConnect, NetEvent}; + /// use message_io::adapters::udp::{UdpConnectConfig}; + /// + /// let (handler, listener) = node::split(); + /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1)); + /// + /// let config = UdpConnectConfig { broadcast: true }; + /// let addr = "255.255.255.255:7777"; + /// let (conn_endpoint, _) = handler.network().connect_with(TransportConnect::Udp(config), addr).unwrap(); + /// // The socket could not be able to send yet. + /// + /// listener.for_each(move |event| match event { + /// NodeEvent::Network(net_event) => match net_event { + /// NetEvent::Connected(endpoint, established) => { + /// assert_eq!(conn_endpoint, endpoint); + /// if established { + /// println!("Connected!"); + /// handler.network().send(endpoint, &[42]); + /// } + /// else { + /// println!("Could not connect"); + /// } + /// }, + /// _ => (), + /// } + /// NodeEvent::Signal(_) => handler.stop(), + /// }); + /// ``` + pub fn connect_with( + &self, + transport_connect: TransportConnect, + addr: impl ToRemoteAddr, ) -> io::Result<(Endpoint, SocketAddr)> { let addr = addr.to_remote_addr().unwrap(); - self.controllers[transport.id() as usize].connect(addr).map(|(endpoint, addr)| { - log::trace!("Connect to {}", endpoint); - (endpoint, addr) - }) + self.controllers[transport_connect.id() as usize].connect_with(transport_connect, addr).map( + |(endpoint, addr)| { + log::trace!("Connect to {}", endpoint); + (endpoint, addr) + }, + ) } /// Creates a connection to the specified address. @@ -147,7 +200,51 @@ impl NetworkController { transport: Transport, addr: impl ToRemoteAddr, ) -> io::Result<(Endpoint, SocketAddr)> { - let (endpoint, addr) = self.connect(transport, addr)?; + self.connect_sync_with(transport.into(), addr) + } + + /// Creates a connection to the specified address with custom transport options for transports + /// that support it. + /// This function is similar to [`NetworkController::connect_with()`] but will block + /// until for the connection is ready. + /// If the connection can not be established, a `ConnectionRefused` error will be returned. + /// + /// Note that the `Connect` event will be also generated. + /// + /// Since this function blocks the current thread, it must NOT be used inside + /// the network callback because the internal event could not be processed. + /// + /// In order to get the best scalability and performance, use the non-blocking + /// [`NetworkController::connect_with()`] version. + /// + /// Example + /// ``` + /// use message_io::node::{self, NodeEvent}; + /// use message_io::network::{TransportConnect, NetEvent}; + /// use message_io::adapters::udp::{UdpConnectConfig}; + /// + /// let (handler, listener) = node::split(); + /// handler.signals().send_with_timer((), std::time::Duration::from_secs(1)); + /// + /// let config = UdpConnectConfig { broadcast: true }; + /// let addr = "255.255.255.255:7777"; + /// match handler.network().connect_sync_with(TransportConnect::Udp(config), addr) { + /// Ok((endpoint, _)) => { + /// println!("Connected!"); + /// handler.network().send(endpoint, &[42]); + /// } + /// Err(err) if err.kind() == std::io::ErrorKind::ConnectionRefused => { + /// println!("Could not connect"); + /// } + /// Err(err) => println!("An OS error creating the socket"), + /// } + /// ``` + pub fn connect_sync_with( + &self, + transport_connect: TransportConnect, + addr: impl ToRemoteAddr, + ) -> io::Result<(Endpoint, SocketAddr)> { + let (endpoint, addr) = self.connect_with(transport_connect, addr)?; loop { std::thread::sleep(Duration::from_millis(1)); match self.is_ready(endpoint.resource_id()) { @@ -164,7 +261,7 @@ impl NetworkController { } /// Listen messages from specified transport. - /// The giver address will be used as interface and listening port. + /// The given address will be used as interface and listening port. /// If the port can be opened, a [ResourceId] identifying the listener is returned /// along with the local address, or an error if not. /// The address is returned despite you passed as parameter because @@ -173,12 +270,29 @@ impl NetworkController { &self, transport: Transport, addr: impl ToSocketAddrs, + ) -> io::Result<(ResourceId, SocketAddr)> { + self.listen_with(transport.into(), addr) + } + + /// Listen messages from specified transport with custom transport options for transports that + /// support it. + /// The given address will be used as interface and listening port. + /// If the port can be opened, a [ResourceId] identifying the listener is returned + /// along with the local address, or an error if not. + /// The address is returned despite you passed as parameter because + /// when a `0` port is specified, the OS will give choose the value. + pub fn listen_with( + &self, + transport_listen: TransportListen, + addr: impl ToSocketAddrs, ) -> io::Result<(ResourceId, SocketAddr)> { let addr = addr.to_socket_addrs().unwrap().next().unwrap(); - self.controllers[transport.id() as usize].listen(addr).map(|(resource_id, addr)| { - log::trace!("Listening at {} by {}", addr, resource_id); - (resource_id, addr) - }) + self.controllers[transport_listen.id() as usize].listen_with(transport_listen, addr).map( + |(resource_id, addr)| { + log::trace!("Listening at {} by {}", addr, resource_id); + (resource_id, addr) + }, + ) } /// Send the data message thought the connection represented by the given endpoint. diff --git a/src/network/adapter.rs b/src/network/adapter.rs index 2befe6a..cb5333c 100644 --- a/src/network/adapter.rs +++ b/src/network/adapter.rs @@ -1,3 +1,5 @@ +use crate::network::transport::{TransportConnect, TransportListen}; + use super::remote_addr::{RemoteAddr}; use super::poll::{Readiness}; @@ -31,7 +33,7 @@ pub trait Resource: Send + Sync { fn source(&mut self) -> &mut dyn Source; } -/// Plain struct used as a returned value of [`Remote::connect()`] +/// Plain struct used as a returned value of [`Remote::connect_with()`] pub struct ConnectionInfo { /// The new created remote resource pub remote: R, @@ -43,7 +45,7 @@ pub struct ConnectionInfo { pub peer_addr: SocketAddr, } -/// Plain struct used as a returned value of [`Local::listen()`] +/// Plain struct used as a returned value of [`Local::listen_with()`] pub struct ListeningInfo { /// The new created local resource pub local: L, @@ -117,10 +119,16 @@ pub enum PendingStatus { pub trait Remote: Resource + Sized { /// Called when the user performs a connection request to an specific remote address. /// The **implementator** is in change of creating the corresponding remote resource. + /// The [`TransportConnect`] wraps custom transport options for transports that support it. It + /// is guaranteed by the upper level to be of the variant matching the adapter. Therefore other + /// variants can be safely ignored. /// The [`RemoteAddr`] contains either a [`SocketAddr`] or a [`url::Url`]. /// It is in charge of deciding what to do in both cases. /// It also must return the extracted address as `SocketAddr`. - fn connect(remote_addr: RemoteAddr) -> io::Result>; + fn connect_with( + config: TransportConnect, + remote_addr: RemoteAddr, + ) -> io::Result>; /// Called when a remote resource received an event. /// The resource must be *ready* to receive this call. @@ -193,7 +201,10 @@ pub trait Local: Resource + Sized { /// The **implementator** is in change of creating the corresponding local resource. /// It also must returned the listening address since it could not be the same as param `addr` /// (e.g. listening from port `0`). - fn listen(addr: SocketAddr) -> io::Result>; + /// The [`TransportListen`] wraps custom transport options for transports that support it. It + /// is guaranteed by the upper level to be of the variant matching the adapter. Therefore other + /// variants can be safely ignored. + fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result>; /// Called when a local resource received an event. /// It means that some resource have tried to connect. diff --git a/src/network/driver.rs b/src/network/driver.rs index cc9aabb..fae9fbf 100644 --- a/src/network/driver.rs +++ b/src/network/driver.rs @@ -4,6 +4,7 @@ use super::poll::{Poll, Readiness}; use super::registry::{ResourceRegistry, Register}; use super::remote_addr::{RemoteAddr}; use super::adapter::{Adapter, Remote, Local, SendStatus, AcceptedType, ReadStatus, PendingStatus}; +use super::transport::{TransportConnect, TransportListen}; use std::net::{SocketAddr}; use std::sync::{ @@ -68,8 +69,16 @@ impl std::fmt::Debug for NetEvent<'_> { } pub trait ActionController: Send + Sync { - fn connect(&self, addr: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)>; - fn listen(&self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)>; + fn connect_with( + &self, + config: TransportConnect, + addr: RemoteAddr, + ) -> io::Result<(Endpoint, SocketAddr)>; + fn listen_with( + &self, + config: TransportListen, + addr: SocketAddr, + ) -> io::Result<(ResourceId, SocketAddr)>; fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus; fn remove(&self, id: ResourceId) -> bool; fn is_ready(&self, id: ResourceId) -> Option; @@ -136,8 +145,12 @@ impl Clone for Driver { } impl ActionController for Driver { - fn connect(&self, addr: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)> { - R::connect(addr).map(|info| { + fn connect_with( + &self, + config: TransportConnect, + addr: RemoteAddr, + ) -> io::Result<(Endpoint, SocketAddr)> { + R::connect_with(config, addr).map(|info| { let id = self.remote_registry.register( info.remote, RemoteProperties::new(info.peer_addr, None), @@ -147,8 +160,12 @@ impl ActionController for Driver { }) } - fn listen(&self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)> { - L::listen(addr).map(|info| { + fn listen_with( + &self, + config: TransportListen, + addr: SocketAddr, + ) -> io::Result<(ResourceId, SocketAddr)> { + L::listen_with(config, addr).map(|info| { let id = self.local_registry.register(info.local, LocalProperties, false); (id, info.local_addr) }) diff --git a/src/network/loader.rs b/src/network/loader.rs index b775d2b..b50effb 100644 --- a/src/network/loader.rs +++ b/src/network/loader.rs @@ -1,3 +1,5 @@ +use crate::network::{TransportConnect, TransportListen}; + use super::endpoint::{Endpoint}; use super::resource_id::{ResourceId}; use super::poll::{Poll, Readiness}; @@ -62,11 +64,19 @@ const UNIMPLEMENTED_DRIVER_ERR: &str = struct UnimplementedDriver; impl ActionController for UnimplementedDriver { - fn connect(&self, _: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)> { + fn connect_with( + &self, + _: TransportConnect, + _: RemoteAddr, + ) -> io::Result<(Endpoint, SocketAddr)> { panic!("{}", UNIMPLEMENTED_DRIVER_ERR); } - fn listen(&self, _: SocketAddr) -> io::Result<(ResourceId, SocketAddr)> { + fn listen_with( + &self, + _: TransportListen, + _: SocketAddr, + ) -> io::Result<(ResourceId, SocketAddr)> { panic!("{}", UNIMPLEMENTED_DRIVER_ERR); } diff --git a/src/network/transport.rs b/src/network/transport.rs index 75e3864..c804e96 100644 --- a/src/network/transport.rs +++ b/src/network/transport.rs @@ -5,7 +5,7 @@ use crate::adapters::tcp::{TcpAdapter}; #[cfg(feature = "tcp")] use crate::adapters::framed_tcp::{FramedTcpAdapter}; #[cfg(feature = "udp")] -use crate::adapters::udp::{self, UdpAdapter}; +use crate::adapters::udp::{self, UdpAdapter, UdpConnectConfig, UdpListenConfig}; #[cfg(feature = "websocket")] use crate::adapters::ws::{self, WsAdapter}; @@ -156,3 +156,89 @@ impl std::fmt::Display for Transport { write!(f, "{self:?}") } } + +pub enum TransportConnect { + #[cfg(feature = "tcp")] + Tcp, + #[cfg(feature = "tcp")] + FramedTcp, + #[cfg(feature = "udp")] + Udp(UdpConnectConfig), + #[cfg(feature = "websocket")] + Ws, +} + +impl TransportConnect { + pub fn id(&self) -> u8 { + let transport = match self { + #[cfg(feature = "tcp")] + Self::Tcp => Transport::Tcp, + #[cfg(feature = "tcp")] + Self::FramedTcp => Transport::FramedTcp, + #[cfg(feature = "udp")] + Self::Udp(_) => Transport::Udp, + #[cfg(feature = "websocket")] + Self::Ws => Transport::Ws, + }; + + transport.id() + } +} + +impl From for TransportConnect { + fn from(transport: Transport) -> Self { + match transport { + #[cfg(feature = "tcp")] + Transport::Tcp => Self::Tcp, + #[cfg(feature = "tcp")] + Transport::FramedTcp => Self::FramedTcp, + #[cfg(feature = "udp")] + Transport::Udp => Self::Udp(UdpConnectConfig::default()), + #[cfg(feature = "websocket")] + Transport::Ws => Self::Ws, + } + } +} + +pub enum TransportListen { + #[cfg(feature = "tcp")] + Tcp, + #[cfg(feature = "tcp")] + FramedTcp, + #[cfg(feature = "udp")] + Udp(UdpListenConfig), + #[cfg(feature = "websocket")] + Ws, +} + +impl TransportListen { + pub fn id(&self) -> u8 { + let transport = match self { + #[cfg(feature = "tcp")] + Self::Tcp => Transport::Tcp, + #[cfg(feature = "tcp")] + Self::FramedTcp => Transport::FramedTcp, + #[cfg(feature = "udp")] + Self::Udp(_) => Transport::Udp, + #[cfg(feature = "websocket")] + Self::Ws => Transport::Ws, + }; + + transport.id() + } +} + +impl From for TransportListen { + fn from(transport: Transport) -> Self { + match transport { + #[cfg(feature = "tcp")] + Transport::Tcp => Self::Tcp, + #[cfg(feature = "tcp")] + Transport::FramedTcp => Self::FramedTcp, + #[cfg(feature = "udp")] + Transport::Udp => Self::Udp(UdpListenConfig::default()), + #[cfg(feature = "websocket")] + Transport::Ws => Self::Ws, + } + } +}