Skip to content

Add support for sendmmsg(2) on linux #1171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/backend/libc/net/syscalls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
use super::read_sockaddr::initialize_family_to_unspec;
use super::send_recv::{RecvFlags, SendFlags};
use crate::backend::c;
#[cfg(target_os = "linux")]
use crate::backend::conv::ret_u32;
use crate::backend::conv::{borrowed_fd, ret, ret_owned_fd, ret_send_recv, send_recv_len};
use crate::fd::{BorrowedFd, OwnedFd};
use crate::io;
#[cfg(target_os = "linux")]
use crate::net::MMsgHdr;
use crate::net::SocketAddrBuf;
use crate::net::{
addr::SocketAddrArg, AddressFamily, Protocol, Shutdown, SocketAddrAny, SocketFlags, SocketType,
Expand Down Expand Up @@ -231,6 +235,23 @@ pub(crate) fn sendmsg_addr(
})
}

#[cfg(target_os = "linux")]
pub(crate) fn sendmmsg(
sockfd: BorrowedFd<'_>,
msgs: &mut [MMsgHdr<'_>],
flags: SendFlags,
) -> io::Result<usize> {
unsafe {
ret_u32(c::sendmmsg(
borrowed_fd(sockfd),
msgs.as_mut_ptr() as _,
msgs.len().try_into().unwrap_or(c::c_uint::MAX),
bitflags_bits!(flags),
))
.map(|ret| ret as usize)
}
}

#[cfg(not(any(
apple,
windows,
Expand Down
10 changes: 5 additions & 5 deletions src/backend/linux_raw/c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ pub(crate) use linux_raw_sys::{
general::{O_CLOEXEC as SOCK_CLOEXEC, O_NONBLOCK as SOCK_NONBLOCK},
if_ether::*,
net::{
linger, msghdr, sockaddr, sockaddr_in, sockaddr_in6, sockaddr_un, socklen_t, AF_DECnet,
__kernel_sa_family_t as sa_family_t, __kernel_sockaddr_storage as sockaddr_storage,
cmsghdr, in6_addr, in_addr, ip_mreq, ip_mreq_source, ip_mreqn, ipv6_mreq, AF_APPLETALK,
AF_ASH, AF_ATMPVC, AF_ATMSVC, AF_AX25, AF_BLUETOOTH, AF_BRIDGE, AF_CAN, AF_ECONET,
AF_IEEE802154, AF_INET, AF_INET6, AF_IPX, AF_IRDA, AF_ISDN, AF_IUCV, AF_KEY, AF_LLC,
AF_NETBEUI, AF_NETLINK, AF_NETROM, AF_PACKET, AF_PHONET, AF_PPPOX, AF_RDS, AF_ROSE,
cmsghdr, in6_addr, in_addr, ip_mreq, ip_mreq_source, ip_mreqn, ipv6_mreq, linger, mmsghdr,
msghdr, sockaddr, sockaddr_in, sockaddr_in6, sockaddr_un, socklen_t, AF_DECnet,
AF_APPLETALK, AF_ASH, AF_ATMPVC, AF_ATMSVC, AF_AX25, AF_BLUETOOTH, AF_BRIDGE, AF_CAN,
AF_ECONET, AF_IEEE802154, AF_INET, AF_INET6, AF_IPX, AF_IRDA, AF_ISDN, AF_IUCV, AF_KEY,
AF_LLC, AF_NETBEUI, AF_NETLINK, AF_NETROM, AF_PACKET, AF_PHONET, AF_PPPOX, AF_RDS, AF_ROSE,
AF_RXRPC, AF_SECURITY, AF_SNA, AF_TIPC, AF_UNIX, AF_UNSPEC, AF_WANPIPE, AF_X25, AF_XDP,
IP6T_SO_ORIGINAL_DST, IPPROTO_FRAGMENT, IPPROTO_ICMPV6, IPPROTO_MH, IPPROTO_ROUTING,
IPV6_ADD_MEMBERSHIP, IPV6_DROP_MEMBERSHIP, IPV6_FREEBIND, IPV6_MULTICAST_HOPS,
Expand Down
32 changes: 30 additions & 2 deletions src/backend/linux_raw/net/syscalls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ use super::msghdr::{with_msghdr, with_noaddr_msghdr, with_recv_msghdr};
use super::read_sockaddr::initialize_family_to_unspec;
use super::send_recv::{RecvFlags, ReturnFlags, SendFlags};
use crate::backend::c;
#[cfg(target_os = "linux")]
use crate::backend::conv::slice_mut;
use crate::backend::conv::{
by_mut, by_ref, c_int, c_uint, pass_usize, ret, ret_owned_fd, ret_usize, size_of, slice,
socklen_t, zero,
};
use crate::backend::reg::raw_arg;
use crate::fd::{BorrowedFd, OwnedFd};
use crate::io::{self, IoSlice, IoSliceMut};
#[cfg(target_os = "linux")]
use crate::net::MMsgHdr;
use crate::net::SocketAddrBuf;
use crate::net::{
addr::SocketAddrArg, AddressFamily, Protocol, RecvAncillaryBuffer, RecvMsg,
Expand All @@ -28,8 +32,8 @@ use {
crate::backend::reg::{ArgReg, SocketArg},
linux_raw_sys::net::{
SYS_ACCEPT, SYS_ACCEPT4, SYS_BIND, SYS_CONNECT, SYS_GETPEERNAME, SYS_GETSOCKNAME,
SYS_LISTEN, SYS_RECV, SYS_RECVFROM, SYS_RECVMSG, SYS_SEND, SYS_SENDMSG, SYS_SENDTO,
SYS_SHUTDOWN, SYS_SOCKET, SYS_SOCKETPAIR,
SYS_LISTEN, SYS_RECV, SYS_RECVFROM, SYS_RECVMSG, SYS_SEND, SYS_SENDMMSG, SYS_SENDMSG,
SYS_SENDTO, SYS_SHUTDOWN, SYS_SOCKET, SYS_SOCKETPAIR,
},
};

Expand Down Expand Up @@ -331,6 +335,30 @@ pub(crate) fn sendmsg_addr(
})
}

#[cfg(target_os = "linux")]
#[inline]
pub(crate) fn sendmmsg(
sockfd: BorrowedFd<'_>,
msgs: &mut [MMsgHdr<'_>],
flags: SendFlags,
) -> io::Result<usize> {
let (msgs, len) = slice_mut(msgs);

#[cfg(not(target_arch = "x86"))]
let result = unsafe { ret_usize(syscall!(__NR_sendmmsg, sockfd, msgs, len, flags)) };

#[cfg(target_arch = "x86")]
let result = unsafe {
ret_usize(syscall!(
__NR_socketcall,
x86_sys(SYS_SENDMMSG),
slice_just_addr::<ArgReg<'_, SocketArg>, _>(&[sockfd.into(), msgs, len, flags.into()])
))
};

result
}

#[inline]
pub(crate) fn shutdown(fd: BorrowedFd<'_>, how: Shutdown) -> io::Result<()> {
#[cfg(not(target_arch = "x86"))]
Expand Down
67 changes: 67 additions & 0 deletions src/net/send_recv/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#![allow(unsafe_code)]

#[cfg(target_os = "linux")]
use crate::backend::net::msghdr::{with_msghdr, with_noaddr_msghdr};
use crate::backend::{self, c};
use crate::fd::{AsFd, BorrowedFd, OwnedFd};
use crate::io::{self, IoSlice, IoSliceMut};
Expand Down Expand Up @@ -591,6 +593,55 @@ impl<'buf> Iterator for AncillaryDrain<'buf> {

impl FusedIterator for AncillaryDrain<'_> {}

/// An ABI-compatible wrapper for `mmsghdr`, for sending multiple messages with
/// [sendmmsg].
#[cfg(target_os = "linux")]
#[repr(transparent)]
pub struct MMsgHdr<'a> {
raw: c::mmsghdr,
_phantom: PhantomData<&'a mut ()>,
}

#[cfg(target_os = "linux")]
impl<'a> MMsgHdr<'a> {
/// Constructs a new message with no destination address.
pub fn new(iov: &'a [IoSlice<'_>], control: &'a mut SendAncillaryBuffer<'_, '_, '_>) -> Self {
with_noaddr_msghdr(iov, control, Self::wrap)
}

/// Constructs a new message to a specific address.
///
/// The lifetime of `addr` (and the underlying
/// [SocketAddrStorage](crate::net::addr::SocketAddrStorage)) must be valid
/// until the call to [sendmmsg], so types implementing
/// [SocketAddrArg](crate::net::addr::SocketAddrArg) can't be used here
/// without first being converted using
/// [SocketAddrArg::as_any](crate::net::addr::SocketAddrArg::as_any).
pub fn new_with_addr(
addr: &'a SocketAddrAny,
Copy link
Contributor Author

@colinmarc colinmarc Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to take a &'a impl SockAddrArg here, but it doesn't seem like the trait ensures that the pointer passed to the closure lives as long as the SockAddrArg itself.

We could obviously immediately call as_any, but that would just be making an explicit clone implicit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right. Could you add a comment about this? Something like, "this doesn't use SocketAddrArg because that creates a temporary SocketAddrAny that only lives for the duration of the with_sockaddr call, and we need a SocketAddrAny that lives for 'a".

Another possibility would be to add a with_socket_addr_any function to SocketAddrArg like this:

    fn with_socket_addr_any<R>(&self, f: impl FnOnce(&SocketAddrAny) -> R) -> R {
        let any = self.as_any();
        f(&any)
    }

Then users with a family-specific addr could do

    addr.with_socket_addr_any(|addr| {
        sendmmsg(
            &socket,
            &mut [
                MMsgHdr::new_with_addr(
                    &addr,
                    &[IoSlice::new(b"hello")],
                    &mut Default::default(),
                ),
            ],
            SendFlags::empty(),
        )
    });

and then SocketAddrAny's impl SocketAddrArg could skip the as_any() call:

    fn with_socket_addr_any<R>(&self, f: impl FnOnce(&SocketAddrAny) -> R) -> R {
        f(self)
    }

That way users could avoid calling as_any themselves. We could do that, though it's not that much of a simplification.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a sentence to the docstring pointing users to as_any - let me know what you think or if you think it needs more explication. I tried a few variants and in the end it felt like too much detail for end-users of the library, but maybe it would be useful as an "internal" comment.

iov: &'a [IoSlice<'_>],
control: &'a mut SendAncillaryBuffer<'_, '_, '_>,
) -> MMsgHdr<'a> {
with_msghdr(addr, iov, control, Self::wrap)
}

fn wrap(msg_hdr: c::msghdr) -> Self {
Self {
raw: c::mmsghdr {
msg_hdr,
msg_len: 0,
},
_phantom: PhantomData,
}
}

/// Returns the number of bytes sent. This will return 0 until after a
/// successful call to [sendmmsg].
pub fn bytes_sent(&self) -> usize {
self.raw.msg_len as _
}
}

/// `sendmsg(msghdr)`—Sends a message on a socket.
///
/// This function is for use on connected sockets, as it doesn't have
Expand Down Expand Up @@ -656,6 +707,22 @@ pub fn sendmsg_addr(
backend::net::syscalls::sendmsg_addr(socket.as_fd(), addr, iov, control, flags)
}

/// `sendmmsg(msghdr)`—Sends multiple messages on a socket.
///
/// # References
/// - [Linux]
///
/// [Linux]: https://man7.org/linux/man-pages/man2/sendmmsg.2.html
#[inline]
#[cfg(target_os = "linux")]
pub fn sendmmsg(
socket: impl AsFd,
msgs: &mut [MMsgHdr<'_>],
flags: SendFlags,
) -> io::Result<usize> {
backend::net::syscalls::sendmmsg(socket.as_fd(), msgs, flags)
}

/// `recvmsg(msghdr)`—Receives a message from a socket.
///
/// # References
Expand Down
91 changes: 91 additions & 0 deletions tests/net/v4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,94 @@ fn test_v4_msg() {
client.join().unwrap();
server.join().unwrap();
}

#[test]
#[cfg(target_os = "linux")]
fn test_v4_sendmmsg() {
crate::init();

use std::net::TcpStream;

use rustix::io::IoSlice;
use rustix::net::addr::SocketAddrArg as _;
use rustix::net::{sendmmsg, MMsgHdr};

fn server(ready: Arc<(Mutex<u16>, Condvar)>) {
let connection_socket = socket(AddressFamily::INET, SocketType::STREAM, None).unwrap();

let name = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 0);
bind(&connection_socket, &name).unwrap();

let who = getsockname(&connection_socket).unwrap();
let who = SocketAddrV4::try_from(who).unwrap();

listen(&connection_socket, 1).unwrap();

{
let (lock, cvar) = &*ready;
let mut port = lock.lock().unwrap();
*port = who.port();
cvar.notify_all();
}

let mut buffer = vec![0; 13];
let mut data_socket: TcpStream = accept(&connection_socket).unwrap().into();

std::io::Read::read_exact(&mut data_socket, &mut buffer).unwrap();
assert_eq!(String::from_utf8_lossy(&buffer), "hello...world");
}

fn client(ready: Arc<(Mutex<u16>, Condvar)>) {
let port = {
let (lock, cvar) = &*ready;
let mut port = lock.lock().unwrap();
while *port == 0 {
port = cvar.wait(port).unwrap();
}
*port
};

let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port);
let data_socket = socket(AddressFamily::INET, SocketType::STREAM, None).unwrap();
connect(&data_socket, &addr).unwrap();

let mut off = 0;
while off < 2 {
let sent = sendmmsg(
&data_socket,
&mut [
MMsgHdr::new(&[IoSlice::new(b"hello")], &mut Default::default()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this use new_wiith_addr, like the v6 version does?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my idea was to exercise both code paths. I changed it so that both tests exercise both.

MMsgHdr::new_with_addr(
&addr.as_any(),
&[IoSlice::new(b"...world")],
&mut Default::default(),
),
][off..],
SendFlags::empty(),
)
.unwrap();

off += sent;
}
}

let ready = Arc::new((Mutex::new(0_u16), Condvar::new()));
let ready_clone = Arc::clone(&ready);

let server = thread::Builder::new()
.name("server".to_string())
.spawn(move || {
server(ready);
})
.unwrap();

let client = thread::Builder::new()
.name("client".to_string())
.spawn(move || {
client(ready_clone);
})
.unwrap();

client.join().unwrap();
server.join().unwrap();
}
91 changes: 91 additions & 0 deletions tests/net/v6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,94 @@ fn test_v6_msg() {
client.join().unwrap();
server.join().unwrap();
}

#[test]
#[cfg(target_os = "linux")]
fn test_v6_sendmmsg() {
crate::init();

use std::net::TcpStream;

use rustix::io::IoSlice;
use rustix::net::addr::SocketAddrArg as _;
use rustix::net::{sendmmsg, MMsgHdr};

fn server(ready: Arc<(Mutex<u16>, Condvar)>) {
let connection_socket = socket(AddressFamily::INET6, SocketType::STREAM, None).unwrap();

let name = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), 0, 0, 0);
bind(&connection_socket, &name).unwrap();

let who = getsockname(&connection_socket).unwrap();
let who = SocketAddrV6::try_from(who).unwrap();

listen(&connection_socket, 1).unwrap();

{
let (lock, cvar) = &*ready;
let mut port = lock.lock().unwrap();
*port = who.port();
cvar.notify_all();
}

let mut buffer = vec![0; 13];
let mut data_socket: TcpStream = accept(&connection_socket).unwrap().into();

std::io::Read::read_exact(&mut data_socket, &mut buffer).unwrap();
assert_eq!(String::from_utf8_lossy(&buffer), "hello...world");
}

fn client(ready: Arc<(Mutex<u16>, Condvar)>) {
let port = {
let (lock, cvar) = &*ready;
let mut port = lock.lock().unwrap();
while *port == 0 {
port = cvar.wait(port).unwrap();
}
*port
};

let addr = SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), port, 0, 0);
let data_socket = socket(AddressFamily::INET6, SocketType::STREAM, None).unwrap();
connect(&data_socket, &addr).unwrap();

let mut off = 0;
while off < 2 {
let sent = sendmmsg(
&data_socket,
&mut [
MMsgHdr::new(&[IoSlice::new(b"hello")], &mut Default::default()),
MMsgHdr::new_with_addr(
&addr.as_any(),
&[IoSlice::new(b"...world")],
&mut Default::default(),
),
][off..],
SendFlags::empty(),
)
.unwrap();

off += sent;
}
}

let ready = Arc::new((Mutex::new(0_u16), Condvar::new()));
let ready_clone = Arc::clone(&ready);

let server = thread::Builder::new()
.name("server".to_string())
.spawn(move || {
server(ready);
})
.unwrap();

let client = thread::Builder::new()
.name("client".to_string())
.spawn(move || {
client(ready_clone);
})
.unwrap();

client.join().unwrap();
server.join().unwrap();
}