Skip to content

Commit 10452dc

Browse files
committed
Add support for sendmmsg(2) on linux
https://man7.org/linux/man-pages/man2/sendmmsg.2.html Partially addresses #1156.
1 parent 22e9043 commit 10452dc

File tree

5 files changed

+270
-5
lines changed

5 files changed

+270
-5
lines changed

src/backend/libc/net/syscalls.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@ use super::msghdr::with_xdp_msghdr;
77
#[cfg(target_os = "linux")]
88
use super::write_sockaddr::encode_sockaddr_xdp;
99
use crate::backend::c;
10+
#[cfg(target_os = "linux")]
11+
use crate::backend::conv::ret_u32;
1012
use crate::backend::conv::{borrowed_fd, ret, ret_owned_fd, ret_send_recv, send_recv_len};
1113
use crate::fd::{BorrowedFd, OwnedFd};
1214
use crate::io;
1315
#[cfg(target_os = "linux")]
1416
use crate::net::xdp::SocketAddrXdp;
17+
#[cfg(target_os = "linux")]
18+
use crate::net::MMsgHdr;
1519
use crate::net::{SocketAddrAny, SocketAddrV4, SocketAddrV6};
1620
use crate::utils::as_ptr;
1721
use core::mem::{size_of, MaybeUninit};
@@ -455,6 +459,23 @@ pub(crate) fn sendmsg_xdp(
455459
})
456460
}
457461

462+
#[cfg(target_os = "linux")]
463+
pub(crate) fn sendmmsg(
464+
sockfd: BorrowedFd<'_>,
465+
msgs: &mut [MMsgHdr],
466+
flags: SendFlags,
467+
) -> io::Result<usize> {
468+
unsafe {
469+
ret_u32(c::sendmmsg(
470+
borrowed_fd(sockfd),
471+
msgs.as_mut_ptr() as _,
472+
msgs.len().try_into().unwrap_or(c::c_uint::MAX),
473+
bitflags_bits!(flags),
474+
))
475+
.map(|ret| ret as usize)
476+
}
477+
}
478+
458479
#[cfg(not(any(
459480
apple,
460481
windows,

src/backend/linux_raw/c.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ pub(crate) use linux_raw_sys::{
5656
general::{O_CLOEXEC as SOCK_CLOEXEC, O_NONBLOCK as SOCK_NONBLOCK},
5757
if_ether::*,
5858
net::{
59-
linger, msghdr, sockaddr, sockaddr_in, sockaddr_in6, sockaddr_un, socklen_t, AF_DECnet,
6059
__kernel_sa_family_t as sa_family_t, __kernel_sockaddr_storage as sockaddr_storage,
61-
cmsghdr, in6_addr, in_addr, ip_mreq, ip_mreq_source, ip_mreqn, ipv6_mreq, AF_APPLETALK,
62-
AF_ASH, AF_ATMPVC, AF_ATMSVC, AF_AX25, AF_BLUETOOTH, AF_BRIDGE, AF_CAN, AF_ECONET,
63-
AF_IEEE802154, AF_INET, AF_INET6, AF_IPX, AF_IRDA, AF_ISDN, AF_IUCV, AF_KEY, AF_LLC,
64-
AF_NETBEUI, AF_NETLINK, AF_NETROM, AF_PACKET, AF_PHONET, AF_PPPOX, AF_RDS, AF_ROSE,
60+
cmsghdr, in6_addr, in_addr, ip_mreq, ip_mreq_source, ip_mreqn, ipv6_mreq, linger, mmsghdr,
61+
msghdr, sockaddr, sockaddr_in, sockaddr_in6, sockaddr_un, socklen_t, AF_DECnet,
62+
AF_APPLETALK, AF_ASH, AF_ATMPVC, AF_ATMSVC, AF_AX25, AF_BLUETOOTH, AF_BRIDGE, AF_CAN,
63+
AF_ECONET, AF_IEEE802154, AF_INET, AF_INET6, AF_IPX, AF_IRDA, AF_ISDN, AF_IUCV, AF_KEY,
64+
AF_LLC, AF_NETBEUI, AF_NETLINK, AF_NETROM, AF_PACKET, AF_PHONET, AF_PPPOX, AF_RDS, AF_ROSE,
6565
AF_RXRPC, AF_SECURITY, AF_SNA, AF_TIPC, AF_UNIX, AF_UNSPEC, AF_WANPIPE, AF_X25, AF_XDP,
6666
IP6T_SO_ORIGINAL_DST, IPPROTO_FRAGMENT, IPPROTO_ICMPV6, IPPROTO_MH, IPPROTO_ROUTING,
6767
IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP, IPV6_FREEBIND, IPV6_MULTICAST_HOPS,

src/backend/linux_raw/net/syscalls.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use super::send_recv::{RecvFlags, SendFlags};
1616
use super::write_sockaddr::encode_sockaddr_xdp;
1717
use super::write_sockaddr::{encode_sockaddr_v4, encode_sockaddr_v6};
1818
use crate::backend::c;
19+
#[cfg(target_os = "linux")]
20+
use crate::backend::conv::slice_mut;
1921
use crate::backend::conv::{
2022
by_mut, by_ref, c_int, c_uint, pass_usize, ret, ret_owned_fd, ret_usize, size_of, slice,
2123
socklen_t, zero,
@@ -24,6 +26,8 @@ use crate::fd::{BorrowedFd, OwnedFd};
2426
use crate::io::{self, IoSlice, IoSliceMut};
2527
#[cfg(target_os = "linux")]
2628
use crate::net::xdp::SocketAddrXdp;
29+
#[cfg(target_os = "linux")]
30+
use crate::net::MMsgHdr;
2731
use crate::net::{
2832
AddressFamily, Protocol, RecvAncillaryBuffer, RecvMsgReturn, SendAncillaryBuffer, Shutdown,
2933
SocketAddrAny, SocketAddrUnix, SocketAddrV4, SocketAddrV6, SocketFlags, SocketType,
@@ -439,6 +443,30 @@ pub(crate) fn sendmsg_xdp(
439443
})
440444
}
441445

446+
#[cfg(target_os = "linux")]
447+
#[inline]
448+
pub(crate) fn sendmmsg(
449+
sockfd: BorrowedFd<'_>,
450+
msgs: &mut [MMsgHdr<'_>],
451+
flags: SendFlags,
452+
) -> io::Result<usize> {
453+
let (msgs, len) = slice_mut(msgs);
454+
455+
#[cfg(not(target_arch = "x86"))]
456+
let result = unsafe { ret_usize(syscall!(__NR_sendmmsg, sockfd, msgs, len, flags)) };
457+
458+
#[cfg(target_arch = "x86")]
459+
let result = unsafe {
460+
ret_usize(syscall!(
461+
__NR_socketcall,
462+
x86_sys(SYS_SENDMMSG),
463+
slice_just_addr::<ArgReg<'_, SocketArg>, _>(&[sockfd.into(), msgs, len, flags.into()])
464+
))
465+
};
466+
467+
result
468+
}
469+
442470
#[inline]
443471
pub(crate) fn shutdown(fd: BorrowedFd<'_>, how: Shutdown) -> io::Result<()> {
444472
#[cfg(not(target_arch = "x86"))]

src/net/send_recv/msg.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,17 @@
22
33
#![allow(unsafe_code)]
44

5+
#[cfg(target_os = "linux")]
6+
use crate::backend::net::msghdr::{
7+
with_noaddr_msghdr, with_unix_msghdr, with_v4_msghdr, with_v6_msghdr, with_xdp_msghdr,
8+
};
59
use crate::backend::{self, c};
610
use crate::fd::{AsFd, BorrowedFd, OwnedFd};
711
use crate::io::{self, IoSlice, IoSliceMut};
812
#[cfg(linux_kernel)]
913
use crate::net::UCred;
14+
#[cfg(target_os = "linux")]
15+
use crate::net::{xdp::SocketAddrXdp, SocketAddrUnix};
1016

1117
use core::iter::FusedIterator;
1218
use core::marker::PhantomData;
@@ -591,6 +597,94 @@ impl<'buf> Iterator for AncillaryDrain<'buf> {
591597

592598
impl FusedIterator for AncillaryDrain<'_> {}
593599

600+
/// An ABI-compatible wrapper for `mmsghdr`, for sending multiple messages with
601+
/// [sendmmsg].
602+
#[cfg(target_os = "linux")]
603+
#[repr(transparent)]
604+
pub struct MMsgHdr<'a> {
605+
raw: c::mmsghdr,
606+
_phantom: PhantomData<&'a mut ()>,
607+
}
608+
609+
#[cfg(target_os = "linux")]
610+
impl<'a> MMsgHdr<'a> {
611+
/// Constructs a new message with no destination address.
612+
pub fn new(iov: &[IoSlice<'a>], control: &mut SendAncillaryBuffer<'_, '_, '_>) -> Self {
613+
with_noaddr_msghdr(iov, control, |msg_hdr| Self {
614+
raw: c::mmsghdr {
615+
msg_hdr,
616+
msg_len: 0,
617+
},
618+
_phantom: PhantomData,
619+
})
620+
}
621+
622+
/// Constructs a new message to a specific IPv4 address.
623+
pub fn new_v4(
624+
addr: &SocketAddrV4,
625+
iov: &[IoSlice<'a>],
626+
control: &mut SendAncillaryBuffer<'_, '_, '_>,
627+
) -> Self {
628+
with_v4_msghdr(addr, iov, control, |msg_hdr| Self {
629+
raw: c::mmsghdr {
630+
msg_hdr,
631+
msg_len: 0,
632+
},
633+
_phantom: PhantomData,
634+
})
635+
}
636+
637+
/// Constructs a new message to a specific IPv6 address.
638+
pub fn new_v6(
639+
addr: &SocketAddrV6,
640+
iov: &[IoSlice<'a>],
641+
control: &mut SendAncillaryBuffer<'_, '_, '_>,
642+
) -> Self {
643+
with_v6_msghdr(addr, iov, control, |msg_hdr| Self {
644+
raw: c::mmsghdr {
645+
msg_hdr,
646+
msg_len: 0,
647+
},
648+
_phantom: PhantomData,
649+
})
650+
}
651+
652+
/// Constructs a new message to a specific Unix-domain address.
653+
pub fn new_unix(
654+
addr: &SocketAddrUnix,
655+
iov: &[IoSlice<'a>],
656+
control: &mut SendAncillaryBuffer<'_, '_, '_>,
657+
) -> Self {
658+
with_unix_msghdr(addr, iov, control, |msg_hdr| Self {
659+
raw: c::mmsghdr {
660+
msg_hdr,
661+
msg_len: 0,
662+
},
663+
_phantom: PhantomData,
664+
})
665+
}
666+
667+
/// Constructs a new message to a specific XDP address.
668+
pub fn new_xdp(
669+
addr: &SocketAddrXdp,
670+
iov: &[IoSlice<'a>],
671+
control: &mut SendAncillaryBuffer<'_, '_, '_>,
672+
) -> Self {
673+
with_xdp_msghdr(addr, iov, control, |msg_hdr| Self {
674+
raw: c::mmsghdr {
675+
msg_hdr,
676+
msg_len: 0,
677+
},
678+
_phantom: PhantomData,
679+
})
680+
}
681+
682+
/// Returns the number of bytes sent by [sendmmsg].
683+
pub fn bytes(&self) -> usize {
684+
self.raw.msg_len as _
685+
}
686+
}
687+
594688
/// `sendmsg(msghdr)`—Sends a message on a socket.
595689
///
596690
/// # References
@@ -781,6 +875,22 @@ pub fn sendmsg_any(
781875
}
782876
}
783877

878+
/// `sendmmsg(msghdr)`—Sends multiple messages on a socket.
879+
///
880+
/// # References
881+
/// - [Linux]
882+
///
883+
/// [Linux]: https://man7.org/linux/man-pages/man2/sendmmsg.2.html
884+
#[inline]
885+
#[cfg(target_os = "linux")]
886+
pub fn sendmmsg(
887+
socket: impl AsFd,
888+
msgs: &mut [MMsgHdr<'_>],
889+
flags: SendFlags,
890+
) -> io::Result<usize> {
891+
backend::net::syscalls::sendmmsg(socket.as_fd(), msgs, flags)
892+
}
893+
784894
/// `recvmsg(msghdr)`—Receives a message from a socket.
785895
///
786896
/// # References

tests/net/v4.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
55
#![cfg(not(any(target_os = "redox", target_os = "wasi")))]
66

7+
#[cfg(target_os = "linux")]
8+
use rustix::net::MMsgHdr;
79
use rustix::net::{
810
accept, bind_v4, connect_v4, getsockname, listen, recv, send, socket, AddressFamily, Ipv4Addr,
911
RecvFlags, SendFlags, SocketAddrAny, SocketAddrV4, SocketType,
@@ -194,3 +196,107 @@ fn test_v4_msg() {
194196
client.join().unwrap();
195197
server.join().unwrap();
196198
}
199+
200+
#[test]
201+
#[cfg(target_os = "linux")]
202+
fn test_v4_sendmmsg() {
203+
crate::init();
204+
205+
use rustix::io::{IoSlice, IoSliceMut};
206+
use rustix::net::{recvmsg, sendmmsg};
207+
208+
fn server(ready: Arc<(Mutex<u16>, Condvar)>) {
209+
let connection_socket = socket(AddressFamily::INET, SocketType::STREAM, None).unwrap();
210+
211+
let name = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0);
212+
bind_v4(&connection_socket, &name).unwrap();
213+
214+
let who = match getsockname(&connection_socket).unwrap() {
215+
SocketAddrAny::V4(addr) => addr,
216+
_ => panic!(),
217+
};
218+
219+
listen(&connection_socket, 1).unwrap();
220+
221+
{
222+
let (lock, cvar) = &*ready;
223+
let mut port = lock.lock().unwrap();
224+
*port = who.port();
225+
cvar.notify_all();
226+
}
227+
228+
let mut buffer = vec![0; BUFFER_SIZE];
229+
let data_socket = accept(&connection_socket).unwrap();
230+
231+
let res = recvmsg(
232+
&data_socket,
233+
&mut [IoSliceMut::new(&mut buffer)],
234+
&mut Default::default(),
235+
RecvFlags::empty(),
236+
)
237+
.unwrap();
238+
assert_eq!(String::from_utf8_lossy(&buffer[..res.bytes]), "hello");
239+
240+
let res = recvmsg(
241+
&data_socket,
242+
&mut [IoSliceMut::new(&mut buffer)],
243+
&mut Default::default(),
244+
RecvFlags::empty(),
245+
)
246+
.unwrap();
247+
assert_eq!(String::from_utf8_lossy(&buffer[..res.bytes]), "...world");
248+
}
249+
250+
fn client(ready: Arc<(Mutex<u16>, Condvar)>) {
251+
let port = {
252+
let (lock, cvar) = &*ready;
253+
let mut port = lock.lock().unwrap();
254+
while *port == 0 {
255+
port = cvar.wait(port).unwrap();
256+
}
257+
*port
258+
};
259+
260+
let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port);
261+
let data_socket = socket(AddressFamily::INET, SocketType::STREAM, None).unwrap();
262+
connect_v4(&data_socket, &addr).unwrap();
263+
264+
let mut off = 0;
265+
loop {
266+
let sent = sendmmsg(
267+
&data_socket,
268+
&mut [
269+
MMsgHdr::new(&[IoSlice::new(b"hello")], &mut Default::default()),
270+
MMsgHdr::new(&[IoSlice::new(b"...world")], &mut Default::default()),
271+
][off..],
272+
SendFlags::empty(),
273+
)
274+
.unwrap();
275+
276+
off += sent;
277+
if off >= 2 {
278+
break;
279+
}
280+
}
281+
}
282+
283+
let ready = Arc::new((Mutex::new(0_u16), Condvar::new()));
284+
let ready_clone = Arc::clone(&ready);
285+
286+
let server = thread::Builder::new()
287+
.name("server".to_string())
288+
.spawn(move || {
289+
server(ready);
290+
})
291+
.unwrap();
292+
293+
let client = thread::Builder::new()
294+
.name("client".to_string())
295+
.spawn(move || {
296+
client(ready_clone);
297+
})
298+
.unwrap();
299+
300+
client.join().unwrap();
301+
server.join().unwrap();
302+
}

0 commit comments

Comments
 (0)