diff --git a/Cargo.lock b/Cargo.lock index f11205a8..c292918d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -924,6 +924,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -1218,7 +1227,7 @@ dependencies = [ "dyn-clone", "heapless", "illumos-sys-hdrs", - "itertools 0.12.1", + "itertools 0.13.0", "kstat-macro", "opte", "opte-api", @@ -1250,7 +1259,7 @@ dependencies = [ "clap", "criterion", "ctor", - "itertools 0.12.1", + "itertools 0.13.0", "nix", "opte", "opte-test-utils", diff --git a/Cargo.toml b/Cargo.toml index 67f739d4..eecd6795 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,7 @@ darling = "0.20" dyn-clone = "1.0" heapless = "0.8" ipnetwork = { version = "0.20", default-features = false } -itertools = { version = "0.12", default-features = false } +itertools = { version = "0.13", default-features = false } libc = "0.2" libnet = { git = "https://github.com/oxidecomputer/netadm-sys" } nix = { version = "0.29", features = ["signal", "user"] } diff --git a/bench/src/kbench/measurement.rs b/bench/src/kbench/measurement.rs index 6ea55e83..b0482344 100644 --- a/bench/src/kbench/measurement.rs +++ b/bench/src/kbench/measurement.rs @@ -182,8 +182,9 @@ pub fn build_flamegraph( } let terms = [ - ("xde_rx", rx_name.unwrap_or("rx")), - ("xde_mc_tx", tx_name.unwrap_or("tx")), + ("xde_rx", rx_name.unwrap_or("in_place")), + ("xde_mc_tx", tx_name.unwrap_or("out_place")), + ("xde_worker", "process"), ]; for (tracked_fn, out_name) in terms { diff --git a/crates/illumos-sys-hdrs/src/kernel.rs b/crates/illumos-sys-hdrs/src/kernel.rs index ec510bc9..f19f0b2c 100644 --- a/crates/illumos-sys-hdrs/src/kernel.rs +++ b/crates/illumos-sys-hdrs/src/kernel.rs @@ -2,7 +2,7 @@ // License, v. 2.0. 
If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2022 Oxide Computer Company +// Copyright 2024 Oxide Computer Company #![allow(clippy::missing_safety_doc)] @@ -80,6 +80,31 @@ impl krw_t { pub const RW_READER_STARVEWRITER: Self = Self(2); } +#[repr(C)] +pub struct kcondvar_t { + pub _opaque: c_ushort, +} + +#[repr(transparent)] +#[derive(Copy, Clone, Eq, PartialEq)] +pub struct kcv_type_t(pub c_int); +impl kcv_type_t { + pub const CV_DEFAULT: Self = Self(0); + pub const CV_DRIVER: Self = Self(1); +} + +#[repr(transparent)] +#[derive(Copy, Clone, Eq, PartialEq)] +pub struct time_res_t(pub c_int); +impl time_res_t { + pub const TR_NANOSEC: Self = Self(0); + pub const TR_MICROSEC: Self = Self(1); + pub const TR_MILLISEC: Self = Self(2); + pub const TR_SEC: Self = Self(3); + pub const TR_CLOCK_TICK: Self = Self(4); + pub const TR_COUNT: Self = Self(5); +} + extern "C" { type module_info; type module_stat; @@ -586,6 +611,40 @@ extern "C" { pub fn rw_tryupgrade(rwlp: *mut krwlock_t); pub fn rw_read_locked(rwlp: *mut krwlock_t); + pub fn cv_init( + cvp: *mut kcondvar_t, + name: *const c_char, + cv_type: kcv_type_t, + arg: *mut c_void, + ); + pub fn cv_destroy(cvp: *mut kcondvar_t); + pub fn cv_wait(cvp: *mut kcondvar_t, mp: *mut kmutex_t); + pub fn cv_signal(cvp: *mut kcondvar_t); + pub fn cv_broadcast(cvp: *mut kcondvar_t); + pub fn cv_wait_sig(cvp: *mut kcondvar_t, mp: *mut kmutex_t) -> c_int; + pub fn cv_timedwait( + cvp: *mut kcondvar_t, + mp: *mut kmutex_t, + timeout: clock_t, + ) -> clock_t; + pub fn cv_timedwait_sig( + cvp: *mut kcondvar_t, + mp: *mut kmutex_t, + timeout: clock_t, + ) -> clock_t; + pub fn cv_reltimedwait( + cvp: *mut kcondvar_t, + mp: *mut kmutex_t, + delta: clock_t, + res: time_res_t, + ) -> clock_t; + pub fn cv_reltimedwait_sig( + cvp: *mut kcondvar_t, + mp: *mut kmutex_t, + delta: clock_t, + res: time_res_t, + ) -> clock_t; + pub fn nochpoll() -> c_int; pub fn nodev() 
-> c_int; pub fn nulldev() -> c_int; diff --git a/crates/illumos-sys-hdrs/src/lib.rs b/crates/illumos-sys-hdrs/src/lib.rs index 1f52a7f7..96ebd66d 100644 --- a/crates/illumos-sys-hdrs/src/lib.rs +++ b/crates/illumos-sys-hdrs/src/lib.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2022 Oxide Computer Company +// Copyright 2024 Oxide Computer Company #![cfg_attr(feature = "kernel", feature(extern_types))] #![allow(non_camel_case_types)] #![no_std] @@ -307,6 +307,7 @@ pub type hrtime_t = c_longlong; // ====================================================================== // uts/common/sys/types.h // ====================================================================== +pub type clock_t = c_long; pub type datalink_id_t = uint32_t; pub type dev_t = c_ulong; pub type id_t = c_int; diff --git a/dtrace/flow.d b/dtrace/flow.d new file mode 100644 index 00000000..7754605b --- /dev/null +++ b/dtrace/flow.d @@ -0,0 +1,16 @@ +mac_client_set_flow_cb:entry { + printf("entry: mip %p mrh %p mp %p", + arg0, arg1, arg2); +} + +mac_client_set_flow_cb:return { + printf("donezo off %p val %p", arg0, arg1); +} + +flow_transport_lport_match:entry { + printf("entry: mip %p mrh %p mp %p", arg0, arg1, arg2); +} + +flow_transport_lport_match:return { + printf("donezo off %p val %p", arg0, arg1); +} diff --git a/dtrace/opte-count-cycles-oneliner.d b/dtrace/opte-count-cycles-oneliner.d new file mode 100644 index 00000000..e386e4b7 --- /dev/null +++ b/dtrace/opte-count-cycles-oneliner.d @@ -0,0 +1 @@ +worker-pkt-start { self->ts = vtimestamp; self->dir = arg0; } worker-pkt-end /self->dir == 1 && self->ts/ { @time["rx"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); self->ts = 0;} worker-pkt-end /self->dir == 2 && self->ts/ {@time["tx"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); self->ts = 0;} END {} \ No newline at end of file diff --git 
a/dtrace/opte-count-cycles-os.d b/dtrace/opte-count-cycles-os.d new file mode 100644 index 00000000..189fc61f --- /dev/null +++ b/dtrace/opte-count-cycles-os.d @@ -0,0 +1 @@ +xde_rx:entry { self->ts = vtimestamp; self->dir = 1; } xde_mc_tx:entry { self->ts = vtimestamp; self->dir = 2; } xde_rx:return /self->ts/ { @time["rx"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); self->ts = 0;} xde_mc_tx:return /self->ts/ {@time["tx"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); self->ts = 0;} END {} \ No newline at end of file diff --git a/dtrace/opte-count-cycles.d b/dtrace/opte-count-cycles.d index 60d98b36..ab271796 100644 --- a/dtrace/opte-count-cycles.d +++ b/dtrace/opte-count-cycles.d @@ -1,21 +1,57 @@ -xde_mc_tx:entry { - self->tx_ts = vtimestamp; +worker-pkt-start { + self->ts = vtimestamp; + self->dir = arg0; +} + +worker-pkt-end /self->dir == 1 && self->ts/ { + @time["rx"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); +} + +worker-pkt-end /self->dir == 2 && self->ts/ { + @time["tx"] = lquantize((vtimestamp - self->ts), 256, 32768, 256); +} + +worker-pkt-end { + self->ts = 0; + self->dir = 0; } xde_rx:entry { - self->rx_ts = vtimestamp; + self->drop_time = vtimestamp; } -xde_mc_tx:return /self->tx_ts/ { - @time["tx"] = lquantize((vtimestamp - self->tx_ts), 256, 32768, 256); - self->tx_ts = 0; +xde_mc_tx:entry { + self->drop_time = vtimestamp; } -xde_rx:return /self->rx_ts/ { - @time["rx"] = lquantize((vtimestamp - self->rx_ts), 256, 32768, 256); - self->rx_ts = 0; +xde_rx:return /self->dir/ { + @time["place_in_inner"] = lquantize((vtimestamp - self->drop_time), 256, 32768, 256); + self->drop_time = 0; } -END { +xde_mc_tx:return /self->dir/ { + @time["place_out_inner"] = lquantize((vtimestamp - self->drop_time), 256, 32768, 256); + self->drop_time = 0; +} +xde_rx:return /!self->dir/ { + @time["place_in"] = lquantize((vtimestamp - self->drop_time), 256, 32768, 256); + self->drop_time = 0; } + +xde_mc_tx:return /!self->dir/ { + @time["place_out"] = 
lquantize((vtimestamp - self->drop_time), 256, 32768, 256); + self->drop_time = 0; +} + +xde_rx:return { + self->drop_time = 0; +} + +xde_mc_tx:return { + self->drop_time = 0; +} + +END { + +} \ No newline at end of file diff --git a/dtrace/opte-tcp-flowdrop.d b/dtrace/opte-tcp-flowdrop.d new file mode 100644 index 00000000..8b430548 --- /dev/null +++ b/dtrace/opte-tcp-flowdrop.d @@ -0,0 +1,31 @@ +/* + * Track bad packets as they happen. + * + * dtrace -L ./lib -I . -Cqs ./opte-tcp-flowdrop.d + */ +#include "common.h" + +#define HDR_FMT "%-12s %-3s %-18s %s\n" +#define LINE_FMT "%-12s %-3s 0x%-16p %s\n" + +BEGIN { + printf(HDR_FMT, "PORT", "DIR", "MBLK", "MSG"); + num = 0; +} + +tcp-err { + this->dir = DIR_STR(arg0); + this->port = stringof(arg1); + this->flow_id = stringof(arg2); + this->mblk = arg3; + this->msg = stringof(arg4); + + if (num >= 10) { + printf(HDR_FMT, "PORT", "DIR", "MBLK", "MSG"); + num = 0; + } + + printf(LINE_FMT, this->port, this->dir, this->mblk, this->msg); + stack(); + num++; +} diff --git a/lib/opte/src/ddi/sync.rs b/lib/opte/src/ddi/sync.rs index 6050a738..5246c1fd 100644 --- a/lib/opte/src/ddi/sync.rs +++ b/lib/opte/src/ddi/sync.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2022 Oxide Computer Company +// Copyright 2024 Oxide Computer Company //! Safe abstractions for synchronization primitives. //! @@ -16,12 +16,16 @@ cfg_if! 
{ use core::cell::UnsafeCell; use core::ptr; use illumos_sys_hdrs::{ - kmutex_t, krw_t, krwlock_t, mutex_enter, mutex_exit, - mutex_destroy, mutex_init, rw_enter, rw_exit, rw_init, + cv_broadcast, cv_destroy, cv_init, cv_signal, cv_wait, + kcondvar_t, kcv_type_t, + kmutex_t, krw_t, krwlock_t, mutex_destroy, mutex_enter, + mutex_exit, + mutex_init, mutex_tryenter, rw_enter, rw_exit, rw_init, rw_destroy }; } else { use std::sync::Mutex; + use std::sync::Condvar; } } @@ -134,10 +138,21 @@ impl KMutex { unsafe { mutex_enter(self.mutex.0.get()) }; KMutexGuard { lock: self } } + + pub fn try_lock(&self) -> Result, LockTaken> { + let try_lock = unsafe { mutex_tryenter(self.mutex.0.get()) }; + if try_lock != 0 { + Ok(KMutexGuard { lock: self }) + } else { + Err(LockTaken) + } + } } -unsafe impl Send for KMutex {} -unsafe impl Sync for KMutex {} +pub struct LockTaken; + +unsafe impl Send for KCondvar {} +unsafe impl Sync for KCondvar {} #[cfg(all(not(feature = "std"), not(test)))] pub struct KMutexGuard<'a, T: 'a> { @@ -212,6 +227,16 @@ impl KMutex { let guard = self.inner.lock().unwrap(); KMutexGuard { guard } } + + pub fn try_lock(&self) -> Result, LockTaken> { + self.inner + .try_lock() + .map_err(|err| match err { + std::sync::TryLockError::Poisoned(_) => panic!("oops"), + std::sync::TryLockError::WouldBlock => LockTaken, + }) + .map(|guard| KMutexGuard { guard }) + } } /// A wrapper around illumos rwlock(9F) @@ -400,3 +425,79 @@ impl KRwLock { KRwLockWriteGuard { guard } } } + +unsafe impl Send for KMutex {} +unsafe impl Sync for KMutex {} + +#[cfg(all(not(feature = "std"), not(test)))] +pub struct KCondvar { + cv: UnsafeCell, +} + +#[cfg(any(feature = "std", test))] +pub struct KCondvar { + cv: Condvar, +} + +#[cfg(all(not(feature = "std"), not(test)))] +impl KCondvar { + pub fn new() -> Self { + let mut cv = kcondvar_t { _opaque: 0 }; + + unsafe { + cv_init( + &mut cv, + ptr::null_mut(), + kcv_type_t::CV_DRIVER, + ptr::null_mut(), + ); + } + + Self { cv: 
UnsafeCell::new(cv) } + } + + pub fn notify_one(&self) { + unsafe { cv_signal(self.cv.get()) } + } + + pub fn notify_all(&self) { + unsafe { cv_broadcast(self.cv.get()) } + } + + pub fn wait<'a, T: 'a>( + &self, + lock: KMutexGuard<'a, T>, + ) -> KMutexGuard<'a, T> { + unsafe { cv_wait(self.cv.get(), lock.lock.mutex.0.get()) } + lock + } +} + +#[cfg(any(feature = "std", test))] +impl KCondvar { + pub fn new() -> Self { + Self { cv: Condvar::new() } + } + + pub fn notify_one(&self) { + self.cv.notify_one() + } + + pub fn notify_all(&self) { + self.cv.notify_all() + } + + pub fn wait<'a, T: 'a>( + &self, + lock: KMutexGuard<'a, T>, + ) -> KMutexGuard<'a, T> { + KMutexGuard { guard: self.cv.wait(lock.guard).unwrap() } + } +} + +#[cfg(all(not(feature = "std"), not(test)))] +impl Drop for KCondvar { + fn drop(&mut self) { + unsafe { cv_destroy(self.cv.get()) }; + } +} diff --git a/lib/opte/src/engine/packet.rs b/lib/opte/src/engine/packet.rs index d9ac2107..7083d922 100644 --- a/lib/opte/src/engine/packet.rs +++ b/lib/opte/src/engine/packet.rs @@ -8,11 +8,6 @@ //! //! TODO //! -//! * Add a PacketChain type to represent a chain of one or more -//! indepenndent packets. Also consider having chains that represent -//! multiple packets for the same flow if it would be advantageous to -//! do so. -//! //! * Add hardware offload information to [`Packet`]. //! 
@@ -49,12 +44,19 @@ use super::ip6::Ipv6HdrError; use super::ip6::Ipv6Meta; use super::NetworkParser; use crate::d_error::DError; +use crate::ddi::sync::KCondvar; +use crate::ddi::sync::KMutex; +use alloc::collections::LinkedList; use core::fmt; use core::fmt::Display; +use core::num::NonZeroUsize; use core::ptr; use core::ptr::NonNull; use core::result; use core::slice; +use core::sync::atomic::AtomicBool; +use core::sync::atomic::AtomicU64; +use core::sync::atomic::AtomicUsize; use crc32fast::Hasher; use dyn_clone::DynClone; use serde::Deserialize; @@ -423,6 +425,7 @@ impl PacketMeta { struct PacketChainInner { head: NonNull, tail: NonNull, + len: usize, } /// A chain of network packets. @@ -460,11 +463,21 @@ impl PacketChain { // Walk the chain to find the tail, and support faster append. let mut tail = head; + let mut len = 1; while let Some(next_ptr) = NonNull::new((*tail.as_ptr()).b_next) { + len += 1; tail = next_ptr; } - Ok(Self { inner: Some(PacketChainInner { head, tail }) }) + Ok(Self { inner: Some(PacketChainInner { head, tail, len }) }) + } + + pub fn len(&self) -> usize { + self.inner.as_ref().map(|v| v.len).unwrap_or_default() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 } /// Removes the next packet from the top of the chain and returns @@ -527,8 +540,10 @@ impl PacketChain { // pkt_p->b_next is already null. } list.tail = pkt; + list.len += 1; } else { - self.inner = Some(PacketChainInner { head: pkt, tail: pkt }); + self.inner = + Some(PacketChainInner { head: pkt, tail: pkt, len: 1 }); } } @@ -538,6 +553,31 @@ impl PacketChain { pub fn unwrap_mblk(mut self) -> Option> { self.inner.take().map(|v| v.head) } + + /// Append another `PacketChain` to the end of this one. + pub fn extend(&mut self, mut other: Self) { + match (&mut self.inner, other.inner.take()) { + // Append them to us. + (Some(my_inner), Some(their_inner)) => { + // link our tail with their head. 
+ let old_tail_p = my_inner.tail.as_ptr(); + let their_head_p = their_inner.head.as_ptr(); + unsafe { + (*old_tail_p).b_next = their_head_p; + (*their_head_p).b_prev = old_tail_p; + } + + my_inner.tail = their_inner.tail; + my_inner.len += their_inner.len; + } + // replace with their inner. + (None, their_inner @ Some(_)) => { + self.inner = their_inner; + } + // Append an empty list: no-op. + (_, None) => {} + } + } } impl Drop for PacketChain { @@ -562,6 +602,170 @@ impl Drop for PacketChain { } } +unsafe impl Send for PacketChain {} +unsafe impl Sync for PacketChain {} + +/// A `PacketChain` plus per-element metadata. +// using linked list, probably want to swap vecdeques in and out. +pub struct PacketChainAnd { + packets: PacketChain, + metadata: LinkedList, +} + +impl PacketChainAnd { + pub fn len(&self) -> usize { + self.packets.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn extend(&mut self, mut other: Self) { + self.packets.extend(other.packets); + self.metadata.append(&mut other.metadata); + } +} + +impl Default for PacketChainAnd { + fn default() -> Self { + Self { packets: PacketChain::empty(), metadata: LinkedList::new() } + } +} + +impl Iterator for PacketChainAnd { + type Item = (Packet, T); + + fn next(&mut self) -> Option { + self.packets + .pop_front() + .and_then(|el| self.metadata.pop_front().map(|meta| (el, meta))) + } +} + +// to test: double-buffer inner with shared tick? + +struct WriteSlot { + inner: KMutex>, + pkts: AtomicUsize, +} + +impl Default for WriteSlot { + fn default() -> Self { + Self { + inner: KMutex::new( + Default::default(), + crate::ddi::sync::KMutexType::Spin, + ), + pkts: 0.into(), + } + } +} + +/// Not quite an SQueue. +pub struct EssQueue { + slots: [WriteSlot; 2], + sleepy: KMutex<()>, + cv: KCondvar, + watermark: Option, + kill: AtomicBool, +} + +// NOTE: this is a really dumb proxy for per-port. 
+ +impl EssQueue { + pub fn new(watermark: Option) -> Self { + Self { + slots: Default::default(), + sleepy: KMutex::new( + Default::default(), + crate::ddi::sync::KMutexType::Spin, + ), + cv: KCondvar::new(), + watermark, + kill: false.into(), + } + } + + // TODO: want in future to maybe have T be derived from the packet (e.g., parsed). + pub fn deliver( + &self, + slot: usize, + packets: PacketChain, + mut f: impl FnMut() -> T, + ) -> Result<(), EssQueueDeliverError> { + let n = packets.len(); + // pre-prepare list of elements to push at back. + let mut els: LinkedList = (0..n).map(|_| f()).collect(); + + let mut workspace = self.slots[slot].inner.lock(); + + if self + .watermark + .map(|w| workspace.packets.len() > w.into()) + .unwrap_or(false) + { + return Err(EssQueueDeliverError::Full); + } + + workspace.packets.extend(packets); + workspace.metadata.append(&mut els); + + self.slots[slot] + .pkts + .fetch_add(n, core::sync::atomic::Ordering::Relaxed); + + // We can notify with/without the lock, but illumos prefers + // we do so with the lock 'for scheduling purposes'. + drop(workspace); + self.cv.notify_one(); + + Ok(()) + } + + // Probably want there to be a timeout here rather than just a kill flag. 
+ pub fn receive(&self) -> Result, EssQueueReceiveError> { + let mut pkts = PacketChainAnd::default(); + + while pkts.is_empty() { + if self.kill.load(core::sync::atomic::Ordering::Relaxed) { + return Err(EssQueueReceiveError::Dead); + } + + for slot in &self.slots { + if slot.pkts.load(core::sync::atomic::Ordering::Relaxed) == 0 { + continue; + } + let mut workspace = slot.inner.lock(); + + let taken = core::mem::take(&mut *workspace); + slot.pkts.store(0, core::sync::atomic::Ordering::Relaxed); + drop(workspace); + + pkts.extend(taken); + } + + // let a = self.sleepy.lock(); + // self.cv.wait(a); + } + + Ok(pkts) + } + + pub fn quiesce(&self) { + self.kill.store(true, core::sync::atomic::Ordering::Relaxed); + self.cv.notify_all(); + } +} + +pub enum EssQueueDeliverError { + Full, +} + +pub enum EssQueueReceiveError { + Dead, +} + /// A network packet. /// /// The [`Packet`] type presents an abstraction for manipulating @@ -4001,6 +4205,8 @@ mod test { let mut chain = unsafe { PacketChain::new(els[0]) }.unwrap(); let pkt = unsafe { Packet::wrap_mblk(new_el) }.unwrap(); + assert_eq!(chain.len(), els.len()); + chain.append(pkt); // Chain head/tail ptrs are correct diff --git a/xde/src/lib.rs b/xde/src/lib.rs index 726f1ef4..a7e87431 100644 --- a/xde/src/lib.rs +++ b/xde/src/lib.rs @@ -47,6 +47,7 @@ mod mac_sys; pub mod route; pub mod secpolicy; pub mod sys; +pub mod thread; pub mod xde; // On alignment, `kmem_alloc(9F)` has this of offer: diff --git a/xde/src/thread.rs b/xde/src/thread.rs new file mode 100644 index 00000000..42f2e6e1 --- /dev/null +++ b/xde/src/thread.rs @@ -0,0 +1,75 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2024 Oxide Computer Company + +//! Illumos kthread support. 
+ +use crate::ip::kthread_t; +use crate::ip::p0; +use crate::ip::thread_create; +use crate::ip::thread_exit; +use crate::ip::thread_join; +use crate::ip::TS_RUN; +use alloc::boxed::Box; +use core::ffi::c_void; +use core::ptr; +use core::ptr::addr_of_mut; +use core::ptr::NonNull; + +unsafe extern "C" fn kthread_body(arg: *mut c_void) { + let arg = arg as *mut Box; + let closure = unsafe { Box::from_raw(arg) }; + + closure(); + // closure used by val, so dropped before thread exit. + + unsafe { + thread_exit(); + } +} + +pub fn spawn(f: F) -> JoinHandle +where + F: FnOnce(), // -> T, + F: Send + 'static, + // T: Send + 'static, +{ + // A bit of an odd dance here -- we need to double box to get a thin + // pointer at the `into_raw` side. + let boxed = Box::new(f) as Box; + let arg = Box::into_raw(Box::new(boxed)); + let handle = unsafe { + thread_create( + ptr::null_mut(), + 0, // pulled up to default stack size. + // Typedef implies no args, reality implies args. Huh. + Some(core::mem::transmute::<_, unsafe extern "C" fn()>( + kthread_body as unsafe extern "C" fn(_), + )), + arg as *mut c_void, + 0, + addr_of_mut!(p0), + TS_RUN as i32, + 60, //minclsyspri + ) + }; + + let handle = NonNull::new(handle).expect( + "thread_create returned a null ptr, \ + but is documented as infallible", + ); + + JoinHandle { handle } +} + +pub struct JoinHandle { + handle: NonNull, +} + +impl JoinHandle { + pub fn join(self) { + unsafe { thread_join((*self.handle.as_ptr()).t_did) } + } +} diff --git a/xde/src/xde.rs b/xde/src/xde.rs index ce859a2e..18c8411e 100644 --- a/xde/src/xde.rs +++ b/xde/src/xde.rs @@ -28,6 +28,8 @@ use crate::route::RouteCache; use crate::route::RouteKey; use crate::secpolicy; use crate::sys; +use crate::thread::spawn; +use crate::thread::JoinHandle; use crate::warn; use alloc::boxed::Box; use alloc::ffi::CString; @@ -37,6 +39,7 @@ use alloc::sync::Arc; use alloc::vec::Vec; use core::ffi::CStr; use core::num::NonZeroU32; +use core::num::NonZeroUsize; use 
core::ptr; use core::ptr::addr_of; use core::ptr::addr_of_mut; @@ -64,6 +67,7 @@ use opte::engine::headers::EncapMeta; use opte::engine::headers::IpAddr; use opte::engine::ioctl::{self as api}; use opte::engine::ip6::Ipv6Addr; +use opte::engine::packet::EssQueue; use opte::engine::packet::Initialized; use opte::engine::packet::InnerFlowId; use opte::engine::packet::Packet; @@ -149,6 +153,14 @@ extern "C" { pub fn __dtrace_probe_hdlr__resp(resp_str: uintptr_t); pub fn __dtrace_probe_rx(mp: uintptr_t); pub fn __dtrace_probe_tx(mp: uintptr_t); + + pub fn __dtrace_probe_worker__start(n_pkts: uintptr_t); + pub fn __dtrace_probe_worker__end(); + + pub fn __dtrace_probe_worker__pkt__start(dir: uintptr_t); + pub fn __dtrace_probe_worker__pkt__end(); + + pub fn __dtrace_probe_worker__no__space(dir: uintptr_t, n_pkts: uintptr_t); } fn bad_packet_parse_probe( @@ -230,6 +242,9 @@ struct XdeState { vpc_map: Arc, v2b: Arc, underlay: KMutex>, + + deliver: Arc>, + deliver_hdl: Option, } struct UnderlayState { @@ -252,11 +267,23 @@ fn get_xde_state() -> &'static XdeState { impl XdeState { fn new() -> Self { let ectx = Arc::new(ExecCtx { log: Box::new(opte::KernelLog {}) }); + + // Completely arbitrary watermark lmao. + let deliver = Arc::new(EssQueue::new(Some( + NonZeroUsize::new(usize::MAX).unwrap(), + ))); + let worker_mailbox = deliver.clone(); + + let deliver_hdl = Some(spawn(|| xde_worker(worker_mailbox))); + XdeState { underlay: KMutex::new(None, KMutexType::Driver), ectx, vpc_map: Arc::new(overlay::VpcMappings::new()), v2b: Arc::new(overlay::Virt2Boundary::new()), + + deliver, + deliver_hdl, } } } @@ -1273,7 +1300,13 @@ unsafe extern "C" fn xde_detach( // Reattach the XdeState to a Box, which takes ownership and will // free it on drop. - drop(Box::from_raw(state)); + let mut xde = Box::from_raw(state); + + // End the worker thread. 
+ xde.deliver.quiesce(); + xde.deliver_hdl.take().unwrap().join(); + + drop(xde); // Remove control device ddi_remove_minor_node(xde_dip, XDE_STR); @@ -1549,8 +1582,19 @@ unsafe extern "C" fn xde_mc_tx( // by the mch they're being targeted to. E.g., either build a list // of chains (u1, u2, port0, port1, ...), or hold tx until another // packet breaks the run targeting the same dest. - while let Some(pkt) = chain.pop_front() { - xde_mc_tx_one(src_dev, pkt); + // while let Some(pkt) = chain.pop_front() { + // xde_mc_tx_one(src_dev, pkt); + // } + + let n_pkts = chain.len(); + + if let Err(_) = get_xde_state() + .deliver + .deliver(1, chain, || Origin::Port(src_dev.devname.clone())) + { + unsafe { + __dtrace_probe_worker__no__space(Direction::Out as _, n_pkts); + } } ptr::null_mut() @@ -1697,6 +1741,85 @@ unsafe fn xde_mc_tx_one( ptr::null_mut() } +// Doing it this way for now because passing Arcs will really +// complicate the detach flow + safety (e.g., Arcs to ports/ulays +// outliving the quiesced callbacks)... +// Strings suck but this is a POC, so w/e. +enum Origin { + Underlay(usize), + Port(String), +} + +// TODO: put somewhere sane + +pub const CPU_BEST: c_int = -4; + +extern "C" { + pub fn affinity_set(affinity: c_int); +} + +fn xde_worker(mailbox: Arc>) { + opte::engine::err!("XDE WORKER STARTED"); + + unsafe { affinity_set(CPU_BEST) }; + + while let Ok(packets) = mailbox.receive() { + unsafe { + __dtrace_probe_worker__start(packets.len()); + } + + let devs = unsafe { xde_devs.read() }; + let xde = get_xde_state(); + let ulay = xde.underlay.lock(); + + // TODO: hold packets out here for batched dispatch. 
+ for (packet, origin) in packets { + let dir = if let Origin::Underlay(_) = &origin { + Direction::In + } else { + Direction::Out + }; + unsafe { + __dtrace_probe_worker__pkt__start(dir as _); + } + + match origin { + Origin::Port(port_name) => { + let my_port = + devs.iter().find(|port| port.devname == port_name); + + if let Some(port) = my_port { + unsafe { + xde_mc_tx_one(port, packet); + } + } + } + Origin::Underlay(0) => unsafe { + if let Some(ulay) = &*ulay { + xde_rx_one(&ulay.u1.mch, ptr::null_mut(), packet); + } + }, + Origin::Underlay(1) => unsafe { + if let Some(ulay) = &*ulay { + xde_rx_one(&ulay.u2.mch, ptr::null_mut(), packet); + } + }, + Origin::Underlay(n) => panic!("I don't have an {n}th underlay"), + }; + + unsafe { + __dtrace_probe_worker__pkt__end(); + } + } + + unsafe { + __dtrace_probe_worker__end(); + } + } + + opte::engine::err!("XDE WORKER EXITED"); +} + /// This is a generic wrapper for references that should be dropped once not in /// use. pub(crate) struct DropRef @@ -1851,8 +1974,20 @@ unsafe extern "C" fn xde_rx( // by the mch they're being targeted to. E.g., either build a list // of chains (port0, port1, ...), or hold tx until another // packet breaks the run targeting the same dest. - while let Some(pkt) = chain.pop_front() { - xde_rx_one(&mch, mrh, pkt); + // while let Some(pkt) = chain.pop_front() { + // xde_rx_one(&mch, mrh, pkt); + // } + + let n_pkts = chain.len(); + + // Eh... The only reason I can get away with this is because we never + // Hairpin anything on the underlay, for good reason lmao. + if let Err(_) = + get_xde_state().deliver.deliver(0, chain, || Origin::Underlay(0)) + { + unsafe { + __dtrace_probe_worker__no__space(Direction::In as _, n_pkts); + } } }