Skip to content

Commit e960980

Browse files
committed
Add support for sendmmsg(2) on linux
https://man7.org/linux/man-pages/man2/sendmmsg.2.html Partially addresses #1156. Signed-off-by: Colin Marc <[email protected]>
1 parent 77f3f20 commit e960980

File tree

8 files changed

+301
-9
lines changed

8 files changed

+301
-9
lines changed

src/backend/libc/net/msghdr.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use crate::backend::net::write_sockaddr::{encode_sockaddr_v4, encode_sockaddr_v6
1212
use crate::io::{self, IoSlice, IoSliceMut};
1313
#[cfg(target_os = "linux")]
1414
use crate::net::xdp::SocketAddrXdp;
15-
use crate::net::{RecvAncillaryBuffer, SendAncillaryBuffer, SocketAddrV4, SocketAddrV6};
15+
use crate::net::{
16+
RawSocketAddr, RecvAncillaryBuffer, SendAncillaryBuffer, SocketAddrV4, SocketAddrV6,
17+
};
1618
use crate::utils::as_ptr;
1719

1820
use core::mem::{size_of, zeroed, MaybeUninit};

src/backend/libc/net/syscalls.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use super::msghdr::with_xdp_msghdr;
77
#[cfg(target_os = "linux")]
88
use super::write_sockaddr::encode_sockaddr_xdp;
99
use crate::backend::c;
10+
#[cfg(target_os = "linux")]
11+
use crate::backend::conv::ret_u32;
1012
use crate::backend::conv::{borrowed_fd, ret, ret_owned_fd, ret_send_recv, send_recv_len};
1113
use crate::fd::{BorrowedFd, OwnedFd};
1214
use crate::io;
@@ -482,6 +484,23 @@ pub(crate) fn sendmsg_raw(
482484
})
483485
}
484486

487+
#[cfg(target_os = "linux")]
488+
pub(crate) fn sendmmsg(
489+
sockfd: BorrowedFd<'_>,
490+
msgs: &mut [MMsgHdr<'_>],
491+
flags: SendFlags,
492+
) -> io::Result<usize> {
493+
unsafe {
494+
ret_u32(c::sendmmsg(
495+
borrowed_fd(sockfd),
496+
msgs.as_mut_ptr() as _,
497+
msgs.len().try_into().unwrap_or(c::c_uint::MAX),
498+
bitflags_bits!(flags),
499+
))
500+
.map(|ret| ret as usize)
501+
}
502+
}
503+
485504
#[cfg(not(any(
486505
apple,
487506
windows,

src/backend/linux_raw/c.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ pub(crate) use linux_raw_sys::{
5656
general::{O_CLOEXEC as SOCK_CLOEXEC, O_NONBLOCK as SOCK_NONBLOCK},
5757
if_ether::*,
5858
net::{
59-
linger, msghdr, sockaddr, sockaddr_in, sockaddr_in6, sockaddr_un, socklen_t, AF_DECnet,
6059
__kernel_sa_family_t as sa_family_t, __kernel_sockaddr_storage as sockaddr_storage,
61-
cmsghdr, in6_addr, in_addr, ip_mreq, ip_mreq_source, ip_mreqn, ipv6_mreq, AF_APPLETALK,
62-
AF_ASH, AF_ATMPVC, AF_ATMSVC, AF_AX25, AF_BLUETOOTH, AF_BRIDGE, AF_CAN, AF_ECONET,
63-
AF_IEEE802154, AF_INET, AF_INET6, AF_IPX, AF_IRDA, AF_ISDN, AF_IUCV, AF_KEY, AF_LLC,
64-
AF_NETBEUI, AF_NETLINK, AF_NETROM, AF_PACKET, AF_PHONET, AF_PPPOX, AF_RDS, AF_ROSE,
60+
cmsghdr, in6_addr, in_addr, ip_mreq, ip_mreq_source, ip_mreqn, ipv6_mreq, linger, mmsghdr,
61+
msghdr, sockaddr, sockaddr_in, sockaddr_in6, sockaddr_un, socklen_t, AF_DECnet,
62+
AF_APPLETALK, AF_ASH, AF_ATMPVC, AF_ATMSVC, AF_AX25, AF_BLUETOOTH, AF_BRIDGE, AF_CAN,
63+
AF_ECONET, AF_IEEE802154, AF_INET, AF_INET6, AF_IPX, AF_IRDA, AF_ISDN, AF_IUCV, AF_KEY,
64+
AF_LLC, AF_NETBEUI, AF_NETLINK, AF_NETROM, AF_PACKET, AF_PHONET, AF_PPPOX, AF_RDS, AF_ROSE,
6565
AF_RXRPC, AF_SECURITY, AF_SNA, AF_TIPC, AF_UNIX, AF_UNSPEC, AF_WANPIPE, AF_X25, AF_XDP,
6666
IP6T_SO_ORIGINAL_DST, IPPROTO_FRAGMENT, IPPROTO_ICMPV6, IPPROTO_MH, IPPROTO_ROUTING,
6767
IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP, IPV6_FREEBIND, IPV6_MULTICAST_HOPS,

src/backend/linux_raw/net/msghdr.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ use crate::backend::c;
99
#[cfg(target_os = "linux")]
1010
use crate::backend::net::write_sockaddr::encode_sockaddr_xdp;
1111
use crate::backend::net::write_sockaddr::{encode_sockaddr_v4, encode_sockaddr_v6};
12-
1312
use crate::io::{self, IoSlice, IoSliceMut};
1413
#[cfg(target_os = "linux")]
1514
use crate::net::xdp::SocketAddrXdp;
15+
#[cfg(target_os = "linux")]
16+
use crate::net::RawSocketAddr;
1617
use crate::net::{RecvAncillaryBuffer, SendAncillaryBuffer, SocketAddrV4, SocketAddrV6};
1718
use crate::utils::as_ptr;
1819

src/backend/linux_raw/net/syscalls.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ use super::send_recv::{RecvFlags, SendFlags};
1717
use super::write_sockaddr::encode_sockaddr_xdp;
1818
use super::write_sockaddr::{encode_sockaddr_v4, encode_sockaddr_v6};
1919
use crate::backend::c;
20+
#[cfg(target_os = "linux")]
21+
use crate::backend::conv::slice_mut;
2022
use crate::backend::conv::{
2123
by_mut, by_ref, c_int, c_uint, pass_usize, ret, ret_owned_fd, ret_usize, size_of, slice,
2224
socklen_t, zero,
@@ -25,6 +27,8 @@ use crate::fd::{BorrowedFd, OwnedFd};
2527
use crate::io::{self, IoSlice, IoSliceMut};
2628
#[cfg(target_os = "linux")]
2729
use crate::net::xdp::SocketAddrXdp;
30+
#[cfg(target_os = "linux")]
31+
use crate::net::MMsgHdr;
2832
use crate::net::{
2933
AddressFamily, Protocol, RawSocketAddr, RecvAncillaryBuffer, RecvMsgReturn,
3034
SendAncillaryBuffer, Shutdown, SocketAddrAny, SocketAddrUnix, SocketAddrV4, SocketAddrV6,
@@ -38,8 +42,8 @@ use {
3842
crate::backend::reg::{ArgReg, SocketArg},
3943
linux_raw_sys::net::{
4044
SYS_ACCEPT, SYS_ACCEPT4, SYS_BIND, SYS_CONNECT, SYS_GETPEERNAME, SYS_GETSOCKNAME,
41-
SYS_LISTEN, SYS_RECV, SYS_RECVFROM, SYS_RECVMSG, SYS_SEND, SYS_SENDMSG, SYS_SENDTO,
42-
SYS_SHUTDOWN, SYS_SOCKET, SYS_SOCKETPAIR,
45+
SYS_LISTEN, SYS_RECV, SYS_RECVFROM, SYS_RECVMSG, SYS_SEND, SYS_SENDMMSG, SYS_SENDMSG,
46+
SYS_SENDTO, SYS_SHUTDOWN, SYS_SOCKET, SYS_SOCKETPAIR,
4347
},
4448
};
4549

@@ -478,6 +482,30 @@ pub(crate) fn sendmsg_raw(
478482
})
479483
}
480484

485+
#[cfg(target_os = "linux")]
486+
#[inline]
487+
pub(crate) fn sendmmsg(
488+
sockfd: BorrowedFd<'_>,
489+
msgs: &mut [MMsgHdr<'_>],
490+
flags: SendFlags,
491+
) -> io::Result<usize> {
492+
let (msgs, len) = slice_mut(msgs);
493+
494+
#[cfg(not(target_arch = "x86"))]
495+
let result = unsafe { ret_usize(syscall!(__NR_sendmmsg, sockfd, msgs, len, flags)) };
496+
497+
#[cfg(target_arch = "x86")]
498+
let result = unsafe {
499+
ret_usize(syscall!(
500+
__NR_socketcall,
501+
x86_sys(SYS_SENDMMSG),
502+
slice_just_addr::<ArgReg<'_, SocketArg>, _>(&[sockfd.into(), msgs, len, flags.into()])
503+
))
504+
};
505+
506+
result
507+
}
508+
481509
#[inline]
482510
pub(crate) fn shutdown(fd: BorrowedFd<'_>, how: Shutdown) -> io::Result<()> {
483511
#[cfg(not(target_arch = "x86"))]

src/net/send_recv/msg.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
33
#![allow(unsafe_code)]
44

5+
#[cfg(target_os = "linux")]
6+
use crate::backend::net::msghdr::{with_noaddr_msghdr, with_raw_msghdr};
57
use crate::backend::{self, c};
68
use crate::fd::{AsFd, BorrowedFd, OwnedFd};
79
use crate::io::{self, IoSlice, IoSliceMut};
@@ -592,6 +594,48 @@ impl<'buf> Iterator for AncillaryDrain<'buf> {
592594

593595
impl FusedIterator for AncillaryDrain<'_> {}
594596

597+
/// An ABI-compatible wrapper for `mmsghdr`, for sending multiple messages with
598+
/// [sendmmsg].
599+
#[cfg(target_os = "linux")]
600+
#[repr(transparent)]
601+
pub struct MMsgHdr<'a> {
602+
raw: c::mmsghdr,
603+
_phantom: PhantomData<&'a mut ()>,
604+
}
605+
606+
#[cfg(target_os = "linux")]
607+
impl<'a> MMsgHdr<'a> {
608+
/// Constructs a new message with no destination address.
609+
pub fn new(iov: &'a [IoSlice<'_>], control: &'a mut SendAncillaryBuffer<'_, '_, '_>) -> Self {
610+
with_noaddr_msghdr(iov, control, Self::wrap)
611+
}
612+
613+
/// Constructs a new message to a specific address.
614+
pub fn new_with_addr(
615+
addr: &'a RawSocketAddr,
616+
iov: &'a [IoSlice<'_>],
617+
control: &'a mut SendAncillaryBuffer<'_, '_, '_>,
618+
) -> MMsgHdr<'a> {
619+
with_raw_msghdr(addr, iov, control, Self::wrap)
620+
}
621+
622+
fn wrap(msg_hdr: c::msghdr) -> Self {
623+
Self {
624+
raw: c::mmsghdr {
625+
msg_hdr,
626+
msg_len: 0,
627+
},
628+
_phantom: PhantomData,
629+
}
630+
}
631+
632+
/// Returns the number of bytes sent. This will return 0 until after a
633+
/// successful call to [sendmmsg].
634+
pub fn bytes_sent(&self) -> usize {
635+
self.raw.msg_len as _
636+
}
637+
}
638+
595639
/// `sendmsg(msghdr)`—Sends a message on a socket.
596640
///
597641
/// # References
@@ -818,6 +862,22 @@ pub fn sendmsg_raw(
818862
}
819863
}
820864

865+
/// `sendmmsg(msghdr)`—Sends multiple messages on a socket.
866+
///
867+
/// # References
868+
/// - [Linux]
869+
///
870+
/// [Linux]: https://man7.org/linux/man-pages/man2/sendmmsg.2.html
871+
#[inline]
872+
#[cfg(target_os = "linux")]
873+
pub fn sendmmsg(
874+
socket: impl AsFd,
875+
msgs: &mut [MMsgHdr<'_>],
876+
flags: SendFlags,
877+
) -> io::Result<usize> {
878+
backend::net::syscalls::sendmmsg(socket.as_fd(), msgs, flags)
879+
}
880+
821881
/// `recvmsg(msghdr)`—Receives a message from a socket.
822882
///
823883
/// # References

tests/net/v4.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,91 @@ fn test_v4_msg() {
194194
client.join().unwrap();
195195
server.join().unwrap();
196196
}
197+
198+
#[test]
199+
#[cfg(target_os = "linux")]
200+
fn test_v4_sendmmsg() {
201+
crate::init();
202+
203+
use std::net::TcpStream;
204+
205+
use rustix::io::IoSlice;
206+
use rustix::net::{sendmmsg, MMsgHdr};
207+
208+
fn server(ready: Arc<(Mutex<u16>, Condvar)>) {
209+
let connection_socket = socket(AddressFamily::INET, SocketType::STREAM, None).unwrap();
210+
211+
let name = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0);
212+
bind_v4(&connection_socket, &name).unwrap();
213+
214+
let who = match getsockname(&connection_socket).unwrap() {
215+
SocketAddrAny::V4(addr) => addr,
216+
_ => panic!(),
217+
};
218+
219+
listen(&connection_socket, 1).unwrap();
220+
221+
{
222+
let (lock, cvar) = &*ready;
223+
let mut port = lock.lock().unwrap();
224+
*port = who.port();
225+
cvar.notify_all();
226+
}
227+
228+
let mut buffer = vec![0; 13];
229+
let mut data_socket: TcpStream = accept(&connection_socket).unwrap().into();
230+
231+
std::io::Read::read_exact(&mut data_socket, &mut buffer).unwrap();
232+
assert_eq!(String::from_utf8_lossy(&buffer), "hello...world");
233+
}
234+
235+
fn client(ready: Arc<(Mutex<u16>, Condvar)>) {
236+
let port = {
237+
let (lock, cvar) = &*ready;
238+
let mut port = lock.lock().unwrap();
239+
while *port == 0 {
240+
port = cvar.wait(port).unwrap();
241+
}
242+
*port
243+
};
244+
245+
let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port);
246+
let data_socket = socket(AddressFamily::INET, SocketType::STREAM, None).unwrap();
247+
connect_v4(&data_socket, &addr).unwrap();
248+
249+
let mut off = 0;
250+
while off < 2 {
251+
let sent = sendmmsg(
252+
&data_socket,
253+
&mut [
254+
MMsgHdr::new(&[IoSlice::new(b"hello")], &mut Default::default()),
255+
MMsgHdr::new(&[IoSlice::new(b"...world")], &mut Default::default()),
256+
][off..],
257+
SendFlags::empty(),
258+
)
259+
.unwrap();
260+
261+
off += sent;
262+
}
263+
}
264+
265+
let ready = Arc::new((Mutex::new(0_u16), Condvar::new()));
266+
let ready_clone = Arc::clone(&ready);
267+
268+
let server = thread::Builder::new()
269+
.name("server".to_string())
270+
.spawn(move || {
271+
server(ready);
272+
})
273+
.unwrap();
274+
275+
let client = thread::Builder::new()
276+
.name("client".to_string())
277+
.spawn(move || {
278+
client(ready_clone);
279+
})
280+
.unwrap();
281+
282+
client.join().unwrap();
283+
server.join().unwrap();
284+
}

0 commit comments

Comments
 (0)