diff --git a/Cargo.lock b/Cargo.lock index ee79f9c4..ab746a07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1313,6 +1313,7 @@ dependencies = [ "smoltcp", "tabwriter", "usdt", + "uuid", "version_check", "zerocopy 0.8.25", ] @@ -1327,6 +1328,7 @@ dependencies = [ "postcard", "serde", "smoltcp", + "uuid", ] [[package]] diff --git a/bin/opteadm/src/bin/opteadm.rs b/bin/opteadm/src/bin/opteadm.rs index fe74d27e..70d0b146 100644 --- a/bin/opteadm/src/bin/opteadm.rs +++ b/bin/opteadm/src/bin/opteadm.rs @@ -31,6 +31,7 @@ use oxide_vpc::api::ClearVirt2PhysReq; use oxide_vpc::api::DelRouterEntryReq; use oxide_vpc::api::DelRouterEntryResp; use oxide_vpc::api::DhcpCfg; +use oxide_vpc::api::DumpFlowStatsResp; use oxide_vpc::api::ExternalIpCfg; use oxide_vpc::api::Filters as FirewallFilters; use oxide_vpc::api::FirewallAction; @@ -276,6 +277,13 @@ enum Command { #[arg(long = "dir")] direction: Option, }, + + /// XXX TEMP + DumpFlowStats { + /// The OPTE port to read... + #[arg(short)] + port: String, + }, } #[derive(Debug, Parser)] @@ -859,6 +867,12 @@ fn main() -> anyhow::Result<()> { })?; } } + + // XXX TEMP + Command::DumpFlowStats { port } => { + let DumpFlowStatsResp { data } = hdl.dump_flowstats(&port)?; + println!("{data}"); + } } Ok(()) diff --git a/crates/opte-api/Cargo.toml b/crates/opte-api/Cargo.toml index 7c4d2e60..34c47a95 100644 --- a/crates/opte-api/Cargo.toml +++ b/crates/opte-api/Cargo.toml @@ -17,6 +17,7 @@ ingot.workspace = true ipnetwork = { workspace = true, optional = true } postcard.workspace = true serde.workspace = true +uuid.workspace = true [dependencies.smoltcp] workspace = true diff --git a/crates/opte-api/src/cmd.rs b/crates/opte-api/src/cmd.rs index 33be6b40..613045cc 100644 --- a/crates/opte-api/src/cmd.rs +++ b/crates/opte-api/src/cmd.rs @@ -50,6 +50,9 @@ pub enum OpteCmd { SetExternalIps = 80, // set xde external IPs for a port AllowCidr = 90, // allow ip block through gateway tx/rx RemoveCidr = 91, // deny ip block through gateway tx/rx + + // TEMP + DumpFlowStats = 34, } impl TryFrom for OpteCmd { diff --git a/crates/opte-api/src/lib.rs b/crates/opte-api/src/lib.rs index 11cdcc69..a935b74e 100644 --- a/crates/opte-api/src/lib.rs +++ b/crates/opte-api/src/lib.rs @@ -28,6 +28,7 @@ pub mod encap; pub mod ip; pub mod mac; pub mod ndp; +pub mod stat; pub mod tcp; pub mod ulp; @@ -38,6 +39,7 @@ pub use encap::*; pub use ip::*; pub use mac::*; pub use ndp::*; +pub use stat::*; pub use tcp::*; pub use ulp::*; @@ -51,7 +53,7 @@ pub use ulp::*; /// /// We rely on CI and the check-api-version.sh script to verify that /// this number is incremented anytime the oxide-api code changes. -pub const API_VERSION: u64 = 36; +pub const API_VERSION: u64 = 37; /// Major version of the OPTE package. pub const MAJOR_VERSION: u64 = 0; diff --git a/crates/opte-api/src/stat.rs b/crates/opte-api/src/stat.rs new file mode 100644 index 00000000..846bf88f --- /dev/null +++ b/crates/opte-api/src/stat.rs @@ -0,0 +1,38 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2025 Oxide Computer Company + +//! Types for handling flow stats from the ioctl API. + +use crate::Direction; +use alloc::vec::Vec; +use serde::Deserialize; +use serde::Serialize; +use uuid::Uuid; + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct FlowStat { + pub partner: FlowId, + pub dir: Direction, + pub bases: Vec, + pub stats: PacketCounter, +} + +#[derive(Deserialize, Serialize, Debug, Clone, Copy)] +pub struct PacketCounter { + pub created_at: u64, + pub pkts_in: u64, + pub bytes_in: u64, + pub pkts_out: u64, + pub bytes_out: u64, +} + +#[derive(Deserialize, Serialize, Debug, Clone, Copy)] +pub struct FullCounter { + pub allow: u64, + pub deny: u64, + pub hairpin: u64, + pub packets: PacketCounter, +} diff --git a/lib/opte-ioctl/src/lib.rs b/lib/opte-ioctl/src/lib.rs index b12be9e6..8c3efead 100644 --- a/lib/opte-ioctl/src/lib.rs +++ b/lib/opte-ioctl/src/lib.rs @@ -35,6 +35,7 @@ use oxide_vpc::api::DelRouterEntryReq; use oxide_vpc::api::DelRouterEntryResp; use oxide_vpc::api::DeleteXdeReq; use oxide_vpc::api::DhcpCfg; +use oxide_vpc::api::DumpFlowStatsResp; use oxide_vpc::api::DumpVirt2BoundaryReq; use oxide_vpc::api::DumpVirt2BoundaryResp; use oxide_vpc::api::DumpVirt2PhysReq; @@ -383,6 +384,19 @@ impl OpteHdl { Some(&DumpUftReq { port_name: port_name.to_string() }), ) } + + /// TEMP METHOD + pub fn dump_flowstats( + &self, + port_name: &str, + ) -> Result { + let cmd = OpteCmd::DumpFlowStats; + run_cmd_ioctl( + self.device.as_raw_fd(), + cmd, + Some(&DumpUftReq { port_name: port_name.to_string() }), + ) + } } pub fn run_cmd_ioctl( diff --git a/lib/opte-test-utils/src/lib.rs b/lib/opte-test-utils/src/lib.rs index 001c07bb..3336e5c0 100644 --- a/lib/opte-test-utils/src/lib.rs +++ b/lib/opte-test-utils/src/lib.rs @@ -269,11 +269,11 @@ fn oxide_net_builder( let dhcp = base_dhcp_config(); firewall::setup(&mut pb, fw_limit).expect("failed to add firewall layer"); - gateway::setup(&pb, cfg, vpc_map, fw_limit, &dhcp) + gateway::setup(&mut pb, cfg, vpc_map, fw_limit, &dhcp) .expect("failed to setup gateway layer"); - router::setup(&pb, cfg, one_limit).expect("failed to add router layer"); + router::setup(&mut pb, cfg, one_limit).expect("failed to add router layer"); nat::setup(&mut pb, cfg, snat_limit).expect("failed to add nat layer"); - overlay::setup(&pb, cfg, v2p, v2b, one_limit) + overlay::setup(&mut pb, cfg, v2p, v2b, one_limit) .expect("failed to add overlay layer"); pb } diff --git a/lib/opte-test-utils/src/port_state.rs b/lib/opte-test-utils/src/port_state.rs index 4a5d775f..7f03578f 100644 --- a/lib/opte-test-utils/src/port_state.rs +++ b/lib/opte-test-utils/src/port_state.rs @@ -86,6 +86,11 @@ pub fn print_port( write_hr(&mut out)?; writeln!(&mut out, "{:#?}", port.stats_snap())?; + // ================================================================ + // Print the Better Stats + // ================================================================ + writeln!(&mut out, "{}", port.dump_flow_stats().unwrap())?; + write_hrb(&mut out)?; writeln!(&mut out)?; diff --git a/lib/opte/Cargo.toml b/lib/opte/Cargo.toml index 4aa8fc9a..e63d1bda 100644 --- a/lib/opte/Cargo.toml +++ b/lib/opte/Cargo.toml @@ -38,6 +38,7 @@ itertools = { workspace = true, optional = true } postcard.workspace = true serde.workspace = true tabwriter = { workspace = true, optional = true } +uuid.workspace = true usdt = { workspace = true, optional = true } zerocopy = { workspace = true, optional = true } diff --git a/lib/opte/src/engine/layer.rs b/lib/opte/src/engine/layer.rs index b627953c..188b12b7 100644 --- a/lib/opte/src/engine/layer.rs +++ b/lib/opte/src/engine/layer.rs @@ -17,6 +17,7 @@ use super::packet::InnerFlowId; use super::packet::MblkFullParsed; use super::packet::MblkPacketData; use super::packet::Packet; +use super::port::PortBuilder; use super::port::Transforms; use super::port::meta::ActionMeta; use super::rule; @@ -28,7 +29,10 @@ use super::rule::GenBtError; use super::rule::HdrTransformError; use super::rule::Rule; use super::rule::ht_probe; +use super::stat::StatTree; +use super::stat::TableStat; use crate::ExecCtx; +use crate::ExecCtx2; use crate::LogLevel; use crate::d_error::DError; #[cfg(all(not(feature = "std"), not(test)))] @@ -55,6 +59,7 @@ use opte_api::Direction; use opte_api::DumpLayerResp; use opte_api::RuleDump; use opte_api::RuleTableEntryDump; +use uuid::Uuid; #[derive(Debug)] pub enum LayerError { @@ -157,10 +162,31 @@ pub enum LftError { MaxCapacity, } +#[derive(Clone, Debug)] +struct LftInEntry { + action_desc: ActionDescEntry, + stat: Arc, +} + +impl Display for LftInEntry { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.action_desc) + } +} + +impl Dump for LftInEntry { + type DumpVal = ActionDescEntryDump; + + fn dump(&self, hits: u64) -> Self::DumpVal { + ActionDescEntryDump { hits, summary: self.to_string() } + } +} + #[derive(Clone, Debug)] struct LftOutEntry { in_flow_pair: InnerFlowId, action_desc: ActionDescEntry, + stat: Arc, } impl LftOutEntry { @@ -186,7 +212,7 @@ impl Dump for LftOutEntry { struct LayerFlowTable { limit: NonZeroU32, count: u32, - ft_in: FlowTable, + ft_in: FlowTable, ft_out: FlowTable, } @@ -202,11 +228,15 @@ impl LayerFlowTable { action_desc: ActionDescEntry, in_flow: InnerFlowId, out_flow: InnerFlowId, + stat: Arc, ) { // We add unchekced because the limit is now enforced by // LayerFlowTable, not the individual flow tables. - self.ft_in.add_unchecked(in_flow, action_desc.clone()); - let out_entry = LftOutEntry { in_flow_pair: in_flow, action_desc }; + let in_entry = + LftInEntry { action_desc: action_desc.clone(), stat: stat.clone() }; + self.ft_in.add_unchecked(in_flow, in_entry); + let out_entry = + LftOutEntry { in_flow_pair: in_flow, action_desc, stat }; self.ft_out.add_unchecked(out_flow, out_entry); self.count += 1; } @@ -247,10 +277,12 @@ impl LayerFlowTable { match self.ft_in.get(flow) { Some(entry) => { entry.hit(); + let action = entry.state().action_desc.clone(); + let stat = entry.state().stat.clone(); if entry.is_dirty() { - EntryState::Dirty(entry.state().clone()) + EntryState::Dirty(action, stat) } else { - EntryState::Clean(entry.state().clone()) + EntryState::Clean(action, stat) } } @@ -263,10 +295,11 @@ impl LayerFlowTable { Some(entry) => { entry.hit(); let action = entry.state().action_desc.clone(); + let stat = entry.state().stat.clone(); if entry.is_dirty() { - EntryState::Dirty(action) + EntryState::Dirty(action, stat) } else { - EntryState::Clean(action) + EntryState::Clean(action, stat) } } @@ -277,7 +310,7 @@ impl LayerFlowTable { fn remove_in( &mut self, flow: &InnerFlowId, - ) -> Option>> { + ) -> Option>> { self.ft_in.remove(flow) } @@ -337,14 +370,14 @@ impl LayerFlowTable { } /// The result of a flowtable lookup. -pub enum EntryState { +enum EntryState { /// No flow entry was found matching a given flowid. None, /// An existing flow table entry was found. - Clean(ActionDescEntry), + Clean(ActionDescEntry, Arc), /// An existing flow table entry was found, but rule processing must be rerun /// to use the original action or invalidate the underlying entry. - Dirty(ActionDescEntry), + Dirty(ActionDescEntry, Arc), } /// The default action of a layer. @@ -354,8 +387,9 @@ pub enum EntryState { /// reasonable to open this up to be any [`Action`], if such a use /// case were to present itself. For now, we stay conservative, and /// supply only what the current consumers need. -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, Default)] pub enum DefaultAction { + #[default] Allow, StatefulAllow, Deny, @@ -408,7 +442,7 @@ impl Display for ActionDescEntry { /// /// This describes the actions a layer's rules can take as well as the /// [`DefaultAction`] to take when a rule doesn't match. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct LayerActions { /// The list of actions shared among the layer's rules. An action /// doesn't have to be shared, each rule is free to create its @@ -420,9 +454,15 @@ pub struct LayerActions { /// direction. pub default_in: DefaultAction, + /// The stats ID to attach to the default-in action. + pub default_in_stat_id: Option, + /// The default action to take if no rule matches in the outbound /// direction. pub default_out: DefaultAction, + + /// The stats ID to attach to the default-in action. + pub default_out_stat_id: Option, } #[derive(KStatProvider)] @@ -504,8 +544,10 @@ pub struct Layer { actions: Vec, default_in: DefaultAction, default_in_hits: u64, + default_in_stat: Arc, default_out: DefaultAction, default_out_hits: u64, + default_out_stat: Arc, ft: LayerFlowTable, ft_cstr: CString, rules_in: RuleTable, @@ -519,15 +561,20 @@ impl Layer { self.actions.get(idx).cloned() } - pub fn add_rule(&mut self, dir: Direction, rule: Rule) { + pub fn add_rule( + &mut self, + dir: Direction, + rule: Rule, + stats: &mut StatTree, + ) { match dir { Direction::Out => { - self.rules_out.add(rule); + self.rules_out.add(rule, stats); self.stats.vals.out_rules += 1; } Direction::In => { - self.rules_in.add(rule); + self.rules_in.add(rule, stats); self.stats.vals.in_rules += 1; } } @@ -738,18 +785,24 @@ impl Layer { pub fn new( name: &'static str, - port: &str, + port: &mut PortBuilder, actions: LayerActions, ft_limit: NonZeroU32, ) -> Self { - let port_c = CString::new(port).unwrap(); + let stats = port.stats_mut(); + let default_in_stat = stats.root(actions.default_in_stat_id); + let default_out_stat = stats.root(actions.default_out_stat_id); + + let port_name = port.name(); + + let port_c = CString::new(port_name).unwrap(); let name_c = CString::new(name).unwrap(); // Unwrap: We know this is fine because the stat names are // generated from the LayerStats structure. let stats = KStatNamed::new( "xde", - &format!("{}_{}", port, name), + &format!("{}_{}", port_name, name), LayerStats::new(), ) .unwrap(); @@ -760,15 +813,17 @@ impl Layer { actions: actions.actions, default_in: actions.default_in, default_in_hits: 0, + default_in_stat, default_out: actions.default_out, default_out_hits: 0, + default_out_stat, name, name_c, port_c, - ft: LayerFlowTable::new(port, name, ft_limit), + ft: LayerFlowTable::new(port_name, name, ft_limit), ft_cstr: CString::new(format!("ft-{}", name)).unwrap(), - rules_in: RuleTable::new(port, name, Direction::In), - rules_out: RuleTable::new(port, name, Direction::Out), + rules_in: RuleTable::new(port_name, name, Direction::In), + rules_out: RuleTable::new(port_name, name, Direction::Out), rt_cstr: CString::new(format!("rt-{}", name)).unwrap(), stats, } @@ -790,18 +845,18 @@ impl Layer { pub(crate) fn process( &mut self, - ectx: &ExecCtx, + ectx: &mut ExecCtx2, dir: Direction, pkt: &mut Packet, xforms: &mut Transforms, ameta: &mut ActionMeta, ) -> result::Result { - use Direction::*; let flow_before = *pkt.flow(); self.layer_process_entry_probe(dir, pkt.flow()); + pkt.meta_mut().stats.new_layer(); let res = match dir { - Out => self.process_out(ectx, pkt, xforms, ameta), - In => self.process_in(ectx, pkt, xforms, ameta), + Direction::Out => self.process_out(ectx, pkt, xforms, ameta), + Direction::In => self.process_in(ectx, pkt, xforms, ameta), }; self.layer_process_return_probe(dir, &flow_before, pkt.flow(), &res); res @@ -809,7 +864,7 @@ impl Layer { fn process_in( &mut self, - ectx: &ExecCtx, + ectx: &mut ExecCtx2, pkt: &mut Packet, xforms: &mut Transforms, ameta: &mut ActionMeta, @@ -821,23 +876,27 @@ impl Layer { // Do we have a FlowTable entry? If so, use it. let flow = *pkt.flow(); - let action = match self.ft.get_in(&flow) { - EntryState::Dirty(ActionDescEntry::Desc(action)) + let (action, stat) = match self.ft.get_in(&flow) { + EntryState::Dirty(ActionDescEntry::Desc(action), stat) if action.is_valid() => { self.ft.mark_clean(Direction::In, &flow); - Some(ActionDescEntry::Desc(action)) + (Some(ActionDescEntry::Desc(action)), Some(stat)) } - EntryState::Dirty(_) => { + EntryState::Dirty(_, _) => { // NoOps are included in this case as we can't ask the actor whether // it remains valid: the simplest method to do so is to rerun lookup. self.ft.remove_in(&flow); - None + (None, None) } - EntryState::Clean(action) => Some(action), - EntryState::None => None, + EntryState::Clean(action, stat) => (Some(action), Some(stat)), + EntryState::None => (None, None), }; + if let Some(stat) = stat { + pkt.meta_mut().stats.push(stat); + } + match action { Some(ActionDescEntry::NoOp) => { self.stats.vals.in_lft_hit += 1; @@ -879,7 +938,7 @@ impl Layer { fn process_in_rules( &mut self, - ectx: &ExecCtx, + ectx: &mut ExecCtx2, pkt: &mut Packet, xforms: &mut Transforms, ameta: &mut ActionMeta, @@ -889,15 +948,23 @@ impl Layer { self.stats.vals.in_lft_miss += 1; let rule = self.rules_in.find_match(pkt.flow(), pkt.meta(), ameta); - let action = if let Some(rule) = rule { + let (action, stat) = if let Some(rule) = rule { self.stats.vals.in_rule_match += 1; - rule.action() + (rule.rule.action(), rule.stat.clone()) } else { self.stats.vals.in_rule_nomatch += 1; self.default_in_hits += 1; - self.default_in.into() + (self.default_in.into(), self.default_in_stat.clone()) }; + // No LFT to account for. + // TODO: figure out how to have actions push on some IDs + // that then belong to the LFT. + let mut stat = Some(stat); + if !matches!(action, Action::StatefulAllow | Action::Stateful(_)) { + pkt.meta_mut().stats.push(stat.take().unwrap()); + } + match action { Action::Allow => Ok(LayerResult::Allow), @@ -910,13 +977,17 @@ impl Layer { }); } + let stat = + ectx.stats.new_intermediate(vec![stat.take().unwrap()]); + pkt.meta_mut().stats.push(stat.clone()); + // The outbound flow ID mirrors the inbound. Remember, // the "top" of layer represents how the client sees // the traffic, and the "bottom" of the layer // represents how the network sees the traffic. let flow_out = pkt.flow().mirror(); let desc = ActionDescEntry::NoOp; - self.ft.add_pair(desc, *pkt.flow(), flow_out); + self.ft.add_pair(desc, *pkt.flow(), flow_out, stat); self.stats.vals.flows += 1; Ok(LayerResult::Allow) } @@ -960,7 +1031,12 @@ impl Layer { }, Err(e) => { - self.record_gen_ht_failure(ectx, In, pkt.flow(), &e); + self.record_gen_ht_failure( + ectx.user_ctx, + In, + pkt.flow(), + &e, + ); return Err(LayerError::GenHdrTransform { layer: self.name, err: e, @@ -1020,6 +1096,8 @@ impl Layer { }); } + // TODO: how on earth are we plumbing StatTree into here?? + let desc = match action.gen_desc(pkt.flow(), pkt, ameta) { Ok(aord) => match aord { AllowOrDeny::Allow(desc) => desc, @@ -1033,7 +1111,12 @@ impl Layer { }, Err(e) => { - self.record_gen_desc_failure(ectx, In, pkt.flow(), &e); + self.record_gen_desc_failure( + ectx.user_ctx, + In, + pkt.flow(), + &e, + ); return Err(LayerError::GenDesc(e)); } }; @@ -1057,6 +1140,10 @@ impl Layer { } } + let stat = + ectx.stats.new_intermediate(vec![stat.take().unwrap()]); + pkt.meta_mut().stats.push(stat.clone()); + // The outbound flow ID must be calculated _after_ the // header transformation. Remember, the "top" // (outbound) of layer represents how the client sees @@ -1069,6 +1156,7 @@ impl Layer { ActionDescEntry::Desc(desc), flow_before, flow_out, + stat, ); self.stats.vals.flows += 1; Ok(LayerResult::Allow) @@ -1096,7 +1184,7 @@ impl Layer { fn process_out( &mut self, - ectx: &ExecCtx, + ectx: &mut ExecCtx2, pkt: &mut Packet, xforms: &mut Transforms, ameta: &mut ActionMeta, @@ -1108,23 +1196,27 @@ impl Layer { // Do we have a FlowTable entry? If so, use it. let flow = *pkt.flow(); - let action = match self.ft.get_out(&flow) { - EntryState::Dirty(ActionDescEntry::Desc(action)) + let (action, stat) = match self.ft.get_out(&flow) { + EntryState::Dirty(ActionDescEntry::Desc(action), stat) if action.is_valid() => { self.ft.mark_clean(Direction::Out, &flow); - Some(ActionDescEntry::Desc(action)) + (Some(ActionDescEntry::Desc(action)), Some(stat)) } - EntryState::Dirty(_) => { + EntryState::Dirty(_, _) => { // NoOps are included in this case as we can't ask the actor whether // it remains valid: the simplest method to do so is to rerun lookup. self.ft.remove_out(&flow); - None + (None, None) } - EntryState::Clean(action) => Some(action), - EntryState::None => None, + EntryState::Clean(action, stat) => (Some(action), Some(stat)), + EntryState::None => (None, None), }; + if let Some(stat) = stat { + pkt.meta_mut().stats.push(stat); + } + match action { Some(ActionDescEntry::NoOp) => { self.stats.vals.out_lft_hit += 1; @@ -1166,7 +1258,7 @@ impl Layer { fn process_out_rules( &mut self, - ectx: &ExecCtx, + ectx: &mut ExecCtx2, pkt: &mut Packet, xforms: &mut Transforms, ameta: &mut ActionMeta, @@ -1176,15 +1268,23 @@ impl Layer { self.stats.vals.out_lft_miss += 1; let rule = self.rules_out.find_match(pkt.flow(), pkt.meta(), ameta); - let action = if let Some(rule) = rule { + let (action, stat) = if let Some(rule) = rule { self.stats.vals.out_rule_match += 1; - rule.action() + (rule.rule.action(), rule.stat.clone()) } else { self.stats.vals.out_rule_nomatch += 1; self.default_out_hits += 1; - self.default_out.into() + (self.default_out.into(), self.default_out_stat.clone()) }; + // No LFT to account for. + // TODO: figure out how to have actions push on some IDs + // that then belong to the LFT. + let mut stat = Some(stat); + if !matches!(action, Action::StatefulAllow | Action::Stateful(_)) { + pkt.meta_mut().stats.push(stat.take().unwrap()); + } + match action { Action::Allow => Ok(LayerResult::Allow), @@ -1197,6 +1297,10 @@ impl Layer { }); } + let stat = + ectx.stats.new_intermediate(vec![stat.take().unwrap()]); + pkt.meta_mut().stats.push(stat.clone()); + // The inbound flow ID must be calculated _after_ the // header transformation. Remember, the "top" // (outbound) of layer represents how the client sees @@ -1205,7 +1309,12 @@ impl Layer { // The final step is to mirror the IPs and ports to // reflect the traffic direction change. let flow_in = pkt.flow().mirror(); - self.ft.add_pair(ActionDescEntry::NoOp, flow_in, *pkt.flow()); + self.ft.add_pair( + ActionDescEntry::NoOp, + flow_in, + *pkt.flow(), + stat, + ); self.stats.vals.flows += 1; Ok(LayerResult::Allow) } @@ -1249,7 +1358,12 @@ impl Layer { }, Err(e) => { - self.record_gen_ht_failure(ectx, Out, pkt.flow(), &e); + self.record_gen_ht_failure( + ectx.user_ctx, + Out, + pkt.flow(), + &e, + ); return Err(LayerError::GenHdrTransform { layer: self.name, err: e, @@ -1309,6 +1423,10 @@ impl Layer { }); } + let stat = + ectx.stats.new_intermediate(vec![stat.take().unwrap()]); + pkt.meta_mut().stats.push(stat.clone()); + let desc = match action.gen_desc(pkt.flow(), pkt, ameta) { Ok(aord) => match aord { AllowOrDeny::Allow(desc) => desc, @@ -1322,7 +1440,12 @@ impl Layer { }, Err(e) => { - self.record_gen_desc_failure(ectx, Out, pkt.flow(), &e); + self.record_gen_desc_failure( + ectx.user_ctx, + Out, + pkt.flow(), + &e, + ); return Err(LayerError::GenDesc(e)); } }; @@ -1359,6 +1482,7 @@ impl Layer { ActionDescEntry::Desc(desc), flow_in, flow_before, + stat, ); self.stats.vals.flows += 1; Ok(LayerResult::Allow) @@ -1493,9 +1617,10 @@ impl Layer { &mut self, in_rules: Vec>, out_rules: Vec>, + stats: &mut StatTree, ) { self.ft.clear(); - self.set_rules_core(in_rules, out_rules); + self.set_rules_core(in_rules, out_rules, stats); } /// Set all rules at once without clearing the flow table. @@ -1506,18 +1631,20 @@ impl Layer { &mut self, in_rules: Vec>, out_rules: Vec>, + stats: &mut StatTree, ) { self.ft.mark_dirty(); - self.set_rules_core(in_rules, out_rules); + self.set_rules_core(in_rules, out_rules, stats); } fn set_rules_core( &mut self, in_rules: Vec>, out_rules: Vec>, + stats: &mut StatTree, ) { - self.rules_in.set_rules(in_rules); - self.rules_out.set_rules(out_rules); + self.rules_in.set_rules(in_rules, stats); + self.rules_out.set_rules(out_rules, stats); self.stats.vals.set_rules_called += 1; self.stats.vals.in_rules.set(self.rules_in.num_rules() as u64); self.stats.vals.out_rules.set(self.rules_out.num_rules() as u64); @@ -1533,6 +1660,7 @@ struct RuleTableEntry { id: RuleId, hits: u64, rule: Rule, + stat: Arc, } impl From<&RuleTableEntry> for RuleTableEntryDump { @@ -1562,15 +1690,18 @@ pub enum RuleRemoveErr { } impl RuleTable { - fn add(&mut self, rule: Rule) { + fn add(&mut self, rule: Rule, stats: &mut StatTree) { + let stat = stats.root(rule.stat_id().copied()); match self.find_pos(&rule) { RulePlace::End => { - let rte = RuleTableEntry { id: self.next_id, hits: 0, rule }; + let rte = + RuleTableEntry { id: self.next_id, hits: 0, rule, stat }; self.rules.push(rte); } RulePlace::Insert(idx) => { - let rte = RuleTableEntry { id: self.next_id, hits: 0, rule }; + let rte = + RuleTableEntry { id: self.next_id, hits: 0, rule, stat }; self.rules.insert(idx, rte); } } @@ -1590,7 +1721,7 @@ impl RuleTable { ifid: &InnerFlowId, pmeta: &MblkPacketData, ameta: &ActionMeta, - ) -> Option<&Rule> { + ) -> Option<&RuleTableEntry> { for rte in self.rules.iter_mut() { if rte.rule.is_match(pmeta, ameta) { rte.hits += 1; @@ -1601,7 +1732,7 @@ impl RuleTable { ifid, &rte.rule, ); - return Some(&rte.rule); + return Some(rte); } } @@ -1739,10 +1870,14 @@ impl RuleTable { } } - pub fn set_rules(&mut self, new_rules: Vec>) { + pub fn set_rules( + &mut self, + new_rules: Vec>, + stats: &mut StatTree, + ) { self.rules.clear(); for r in new_rules { - self.add(r); + self.add(r, stats); } } } @@ -1826,6 +1961,7 @@ mod test { use crate::engine::predicate::Predicate; use crate::engine::rule; + let mut stats = StatTree::default(); let mut rule_table = RuleTable::new("port", "test", Direction::Out); let mut rule = Rule::new( 1, @@ -1836,7 +1972,7 @@ mod test { Ipv4AddrMatch::Prefix(cidr), ])); - rule_table.add(rule.finalize()); + rule_table.add(rule.finalize(), &mut stats); let mut test_pkt = MsgBlk::new_ethernet_pkt(( Ethernet { ethertype: Ethertype::IPV4, ..Default::default() }, diff --git a/lib/opte/src/engine/mod.rs b/lib/opte/src/engine/mod.rs index 222ac0d2..d881dcf6 100644 --- a/lib/opte/src/engine/mod.rs +++ b/lib/opte/src/engine/mod.rs @@ -30,6 +30,7 @@ pub mod predicate; pub mod print; pub mod rule; pub mod snat; +pub mod stat; #[macro_use] pub mod tcp; pub mod tcp_state; diff --git a/lib/opte/src/engine/packet.rs b/lib/opte/src/engine/packet.rs index 46c259cf..f20247cc 100644 --- a/lib/opte/src/engine/packet.rs +++ b/lib/opte/src/engine/packet.rs @@ -33,6 +33,7 @@ use super::rule::CompiledEncap; use super::rule::CompiledTransform; use super::rule::HdrTransform; use super::rule::HdrTransformError; +use super::stat::FlowStatBuilder; pub use crate::api::AddrPair; pub use crate::api::FLOW_ID_DEFAULT; pub use crate::api::InnerFlowId; @@ -457,6 +458,7 @@ pub struct PacketData { pub(crate) headers: OpteMeta, initial_lens: Option>, body: PktBodyWalker, + pub(crate) stats: FlowStatBuilder, } impl From> for OpteMeta { @@ -768,7 +770,12 @@ where .into(), ); let body = PktBodyWalker::new(last_chunk, data); - let meta = Box::new(PacketData { headers, initial_lens, body }); + let meta = Box::new(PacketData { + headers, + initial_lens, + body, + stats: FlowStatBuilder::new(), + }); Packet { state: FullParsed { diff --git a/lib/opte/src/engine/port/mod.rs b/lib/opte/src/engine/port/mod.rs index 9d3646cc..757cad0c 100644 --- a/lib/opte/src/engine/port/mod.rs +++ b/lib/opte/src/engine/port/mod.rs @@ -45,11 +45,15 @@ use super::rule::HdrTransform; use super::rule::HdrTransformError; use super::rule::Rule; use super::rule::TransformFlags; +use super::stat::Action as StatAction; +use super::stat::FlowStat; +use super::stat::StatTree; use super::tcp::KEEPALIVE_EXPIRE_TTL; use super::tcp::TIME_WAIT_EXPIRE_TTL; use super::tcp_state::TcpFlowState; use super::tcp_state::TcpFlowStateError; use crate::ExecCtx; +use crate::ExecCtx2; use crate::d_error::DError; #[cfg(all(not(feature = "std"), not(test)))] use crate::d_error::LabelBlock; @@ -198,6 +202,16 @@ enum InternalProcessResult { Hairpin(MsgBlk), } +impl InternalProcessResult { + fn stat_action(&self) -> StatAction { + match self { + Self::Modified => StatAction::Allow, + Self::Drop { .. } => StatAction::Deny, + Self::Hairpin(..) => StatAction::Hairpin, + } + } +} + impl From for InternalProcessResult { fn from(hpa: HdlPktAction) -> Self { match hpa { @@ -232,7 +246,8 @@ pub struct PortBuilder { // probes. name_cstr: CString, mac: MacAddr, - layers: KMutex>, + layers: Vec, + flow_stats: StatTree, } #[derive(Clone, Debug)] @@ -267,36 +282,34 @@ impl PortBuilder { /// a packet from the guest. The last is the last to see a packet /// before it is delivered to the guest. pub fn add_layer( - &self, + &mut self, new_layer: Layer, pos: Pos, ) -> result::Result<(), OpteError> { - let mut lock = self.layers.lock(); - match pos { Pos::Last => { - lock.push(new_layer); + self.layers.push(new_layer); return Ok(()); } Pos::First => { - lock.insert(0, new_layer); + self.layers.insert(0, new_layer); return Ok(()); } Pos::Before(name) => { - for (i, layer) in lock.iter().enumerate() { + for (i, layer) in self.layers.iter().enumerate() { if layer.name() == name { - lock.insert(i, new_layer); + self.layers.insert(i, new_layer); return Ok(()); } } } Pos::After(name) => { - for (i, layer) in lock.iter().enumerate() { + for (i, layer) in self.layers.iter().enumerate() { if layer.name() == name { - lock.insert(i + 1, new_layer); + self.layers.insert(i + 1, new_layer); return Ok(()); } } @@ -312,14 +325,14 @@ impl PortBuilder { /// Add a new `Rule` to the layer named by `layer`, if such a /// layer exists. Otherwise, return an error. pub fn add_rule( - &self, + &mut self, layer_name: &str, dir: Direction, rule: Rule, ) -> result::Result<(), OpteError> { - for layer in &mut *self.layers.lock() { + for layer in &mut self.layers { if layer.name() == layer_name { - layer.add_rule(dir, rule); + layer.add_rule(dir, rule, &mut self.flow_stats); return Ok(()); } } @@ -335,9 +348,7 @@ impl PortBuilder { ) -> result::Result, PortCreateError> { let data = PortData { state: PortState::Ready, - // At this point the layer pipeline is immutable, thus we - // move the layers out of the mutex. - layers: self.layers.into_inner(), + layers: self.layers, uft_in: FlowTable::new(&self.name, "uft_in", uft_limit, None), uft_out: FlowTable::new(&self.name, "uft_out", uft_limit, None), tcp_flows: FlowTable::new( @@ -346,6 +357,7 @@ impl PortBuilder { tcp_limit, Some(Box::::default()), ), + flow_stats: self.flow_stats, }; let mut data = KRwLock::new(data); @@ -367,7 +379,7 @@ impl PortBuilder { /// [`Layer`] at the given index. If the layer does not exist, or /// has no action at that index, then `None` is returned. pub fn layer_action(&self, layer: &str, idx: usize) -> Option { - for l in &*self.layers.lock() { + for l in &self.layers { if l.name() == layer { return l.action(idx); } @@ -379,9 +391,8 @@ impl PortBuilder { /// List each [`Layer`] under this port. pub fn list_layers(&self) -> ListLayersResp { let mut tmp = vec![]; - let lock = self.layers.lock(); - for layer in lock.iter() { + for layer in self.layers.iter() { tmp.push(LayerDesc { name: layer.name().to_string(), rules_in: layer.num_rules(Direction::In), @@ -411,22 +422,26 @@ impl PortBuilder { name_cstr, mac, ectx, - layers: KMutex::new(Vec::new()), + layers: vec![], + flow_stats: Default::default(), } } /// Remove the [`Layer`] registered under `name`, if such a layer /// exists. - pub fn remove_layer(&self, name: &str) { - let mut lock = self.layers.lock(); - - for (i, layer) in lock.iter().enumerate() { + pub fn remove_layer(&mut self, name: &str) { + for (i, layer) in self.layers.iter().enumerate() { if layer.name() == name { - let _ = lock.remove(i); + let _ = self.layers.remove(i); return; } } } + + /// Provide access to the inner [`StatTree`]. + pub fn stats_mut(&mut self) -> &mut StatTree { + &mut self.flow_stats + } } /// The current state of the [`Port`]. @@ -551,6 +566,8 @@ pub struct UftEntry { /// Cached reference to a flow's TCP state, if applicable. /// This allows us to maintain up-to-date TCP flow table info tcp_flow: Option>>, + + stat: Arc, } impl Dump for UftEntry { @@ -583,7 +600,8 @@ impl Display for UftEntry { impl fmt::Debug for UftEntry { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let UftEntry { pair: _pair, xforms, l4_hash, epoch, tcp_flow } = self; + let UftEntry { pair: _pair, xforms, l4_hash, epoch, tcp_flow, stat } = + self; f.debug_struct("UftEntry") .field("pair", &"") @@ -591,6 +609,10 @@ impl fmt::Debug for UftEntry { .field("l4_hash", l4_hash) .field("epoch", epoch) .field("tcp_flow", tcp_flow) + .field( + "stats", + &crate::api::FlowStat::::from(stat.as_ref()), + ) .finish() } } @@ -688,6 +710,8 @@ struct PortData { // that we know which inbound UFT/FT entries to retire upon // connection termination. tcp_flows: FlowTable, + + flow_stats: StatTree, } /// A virtual switch port. @@ -885,10 +909,12 @@ impl Port { let mut data = self.data.write(); check_state!(data.state, [PortState::Ready, PortState::Running])?; - for layer in &mut data.layers { + let PortData { layers, flow_stats, .. } = &mut (*data); + + for layer in layers { if layer.name() == layer_name { self.epoch.fetch_add(1, SeqCst); - layer.add_rule(dir, rule); + layer.add_rule(dir, rule, flow_stats); return Ok(()); } } @@ -998,6 +1024,17 @@ impl Port { Ok(DumpTcpFlowsResp { flows: data.tcp_flows.dump() }) } + /// XXX TEST METHOD + pub fn dump_flow_stats(&self) -> Result { + let data = self.data.read(); + check_state!( + data.state, + [PortState::Running, PortState::Paused, PortState::Restored] + )?; + + Ok(data.flow_stats.dump()) + } + /// Clear all entries from the Unified Flow Table (UFT). /// /// # States @@ -1109,6 +1146,8 @@ impl Port { // set TIME_WAIT_EXPIRE_TTL or another state-specific timer lower // than 60s, we'll need to specifically expire the matching UFTs. let _ = data.tcp_flows.expire_flows(now, |_| FLOW_ID_DEFAULT); + + data.flow_stats.expire(now); Ok(()) } @@ -1237,6 +1276,7 @@ impl Port { let process_start = Moment::now(); let flow_before = pkt.flow(); let mblk_addr = pkt.mblk_addr(); + let pkt_len = pkt.len() as u64; // Packet processing is split into a few mechanisms based on // expected speed, based on actions and the size of required metadata: @@ -1338,6 +1378,8 @@ impl Port { // The Fast Path. drop(lock.take()); let xforms = &entry.state().xforms; + entry.state().stat.hit_at(pkt_len, process_start); + let out = if xforms.compiled.is_some() { FastPathDecision::CompiledUft(entry) } else { @@ -1403,7 +1445,7 @@ impl Port { self.name_cstr.as_c_str(), tcp, dir, - pkt.len() as u64, + pkt_len, ufid_in, ) { Ok(TcpState::Closed) => Some(Arc::clone(tcp_flow)), @@ -1670,10 +1712,12 @@ impl Port { let mut data = self.data.write(); check_state!(data.state, [PortState::Ready, PortState::Running])?; - for layer in &mut data.layers { + let PortData { layers, flow_stats, .. } = &mut (*data); + + for layer in layers { if layer.name() == layer_name { self.epoch.fetch_add(1, SeqCst); - layer.set_rules(in_rules, out_rules); + layer.set_rules(in_rules, out_rules, flow_stats); return Ok(()); } } @@ -1691,10 +1735,12 @@ impl Port { let mut data = self.data.write(); check_state!(data.state, [PortState::Ready, PortState::Running])?; - for layer in &mut data.layers { + let PortData { layers, flow_stats, .. } = &mut (*data); + + for layer in layers { if layer.name() == layer_name { self.epoch.fetch_add(1, SeqCst); - layer.set_rules_soft(in_rules, out_rules); + layer.set_rules_soft(in_rules, out_rules, flow_stats); return Ok(()); } } @@ -1981,11 +2027,13 @@ impl Port { xforms: &mut Transforms, ameta: &mut ActionMeta, ) -> result::Result { + let mut ectx = + ExecCtx2 { user_ctx: &self.ectx, stats: &mut data.flow_stats }; + match dir { Direction::Out => { for layer in &mut data.layers { - let res = - layer.process(&self.ectx, dir, pkt, xforms, ameta); + let res = layer.process(&mut ectx, dir, pkt, xforms, ameta); match res { Ok(LayerResult::Allow) => (), @@ -1999,8 +2047,7 @@ impl Port { Direction::In => { for layer in data.layers.iter_mut().rev() { - let res = - layer.process(&self.ectx, dir, pkt, xforms, ameta); + let res = layer.process(&mut ectx, dir, pkt, xforms, ameta); match res { Ok(LayerResult::Allow) => (), @@ -2330,36 +2377,48 @@ impl Port { self.stats.vals.in_uft_miss.incr(1); let mut xforms = Transforms::new(); let res = self.layers_process(data, In, pkt, &mut xforms, ameta); - match res { + + let (ipr, create_flow) = match res { Ok(LayerResult::Allow) => { // If there is no flow ID, then do not create a UFT // entry. - if *ufid_in == FLOW_ID_DEFAULT { - return Ok(InternalProcessResult::Modified); - } + (InternalProcessResult::Modified, *ufid_in != FLOW_ID_DEFAULT) } - Ok(LayerResult::Deny { name, reason }) => { - return Ok(InternalProcessResult::Drop { + Ok(LayerResult::Deny { name, reason }) => ( + InternalProcessResult::Drop { reason: DropReason::Layer { name, reason }, - }); - } + }, + false, + ), Ok(LayerResult::Hairpin(hppkt)) => { - return Ok(InternalProcessResult::Hairpin(hppkt)); + (InternalProcessResult::Hairpin(hppkt), false) } - Ok(LayerResult::HandlePkt) => { - return Ok(InternalProcessResult::from(self.net.handle_pkt( + Ok(LayerResult::HandlePkt) => ( + InternalProcessResult::from(self.net.handle_pkt( In, pkt, &data.uft_in, &data.uft_out, - )?)); - } + )?), + false, + ), + // TODO: Errors as a decision?! Err(e) => return Err(ProcessError::Layer(e)), - } + }; + + let pkt_len = pkt.len() as u64; + let Some(stat_parents) = pkt.meta_mut().stats.terminate( + ipr.stat_action(), + pkt_len, + In, + create_flow, + ) else { + return Ok(ipr); + }; let mut flags = TransformFlags::empty(); if pkt.checksums_dirty() { @@ -2370,12 +2429,16 @@ impl Port { } let ufid_out = pkt.flow().mirror(); + let stat = + data.flow_stats.new_flow(ufid_in, &ufid_out, In, stat_parents); + stat.hit(pkt_len); let mut hte = UftEntry { pair: KMutex::new(Some(ufid_out)), xforms: xforms.compile(flags), epoch, l4_hash: ufid_in.crc32(), tcp_flow: None, + stat, }; // Keep around the comment on the `None` arm @@ -2405,12 +2468,7 @@ impl Port { // For inbound traffic the TCP flow table must be // checked _after_ processing take place. if pkt.meta().is_inner_tcp() { - match self.process_in_tcp( - data, - pkt.meta(), - ufid_in, - pkt.len() as u64, - ) { + match self.process_in_tcp(data, pkt.meta(), ufid_in, pkt_len) { Ok(TcpMaybeClosed::Closed { .. }) => { Ok(InternalProcessResult::Modified) } @@ -2535,6 +2593,8 @@ impl Port { self.stats.vals.out_uft_miss.incr(1); let mut tcp_closed = false; + let pkt_len = pkt.len() as u64; + // For outbound traffic the TCP flow table must be checked // _before_ processing take place. let tcp_flow = if pkt.meta().is_inner_tcp() { @@ -2542,7 +2602,7 @@ impl Port { data, pkt.flow(), pkt.meta(), - pkt.len() as u64, + pkt_len, ) { Ok(TcpMaybeClosed::Closed { ufid_inbound }) => { tcp_closed = true; @@ -2601,46 +2661,76 @@ impl Port { flags |= TransformFlags::INTERNAL_DESTINATION; } + let (ipr, create_flow) = match res { + Ok(LayerResult::Allow) => { + // If there is no flow ID, then do not create a UFT + // entry. + ( + InternalProcessResult::Modified, + flow_before != FLOW_ID_DEFAULT && !tcp_closed, + ) + } + + Ok(LayerResult::Deny { name, reason }) => ( + InternalProcessResult::Drop { + reason: DropReason::Layer { name, reason }, + }, + false, + ), + + Ok(LayerResult::Hairpin(hppkt)) => { + (InternalProcessResult::Hairpin(hppkt), false) + } + + Ok(LayerResult::HandlePkt) => ( + InternalProcessResult::from(self.net.handle_pkt( + Out, + pkt, + &data.uft_in, + &data.uft_out, + )?), + false, + ), + + // TODO: Errors as a decision?! + Err(e) => return Err(ProcessError::Layer(e)), + }; + + let Some(stat_parents) = pkt.meta_mut().stats.terminate( + ipr.stat_action(), + pkt_len, + Out, + create_flow, + ) else { + return Ok(ipr); + }; + + let ufid_out = pkt.flow().mirror(); + let stat = data.flow_stats.new_flow( + &flow_before, + &ufid_out, + Out, + stat_parents, + ); + stat.hit(pkt_len); + let hte = UftEntry { pair: KMutex::new(None), xforms: xforms.compile(flags), epoch, l4_hash: flow_before.crc32(), tcp_flow, + stat, }; - match res { - Ok(LayerResult::Allow) => { - // If there is no Flow ID, then there is no UFT entry. - if flow_before == FLOW_ID_DEFAULT || tcp_closed { - return Ok(InternalProcessResult::Modified); - } - match data.uft_out.add(flow_before, hte) { - Ok(_) => Ok(InternalProcessResult::Modified), - Err(OpteError::MaxCapacity(limit)) => { - Err(ProcessError::FlowTableFull { kind: "UFT", limit }) - } - Err(_) => unreachable!( - "Cannot return other errors from FlowTable::add" - ), - } - } - - Ok(LayerResult::Hairpin(hppkt)) => { - Ok(InternalProcessResult::Hairpin(hppkt)) + match data.uft_out.add(flow_before, hte) { + Ok(_) => Ok(InternalProcessResult::Modified), + Err(OpteError::MaxCapacity(limit)) => { + Err(ProcessError::FlowTableFull { kind: "UFT", limit }) } - - Ok(LayerResult::Deny { name, reason }) => { - Ok(InternalProcessResult::Drop { - reason: DropReason::Layer { name, reason }, - }) + Err(_) => { + unreachable!("Cannot return other errors from FlowTable::add") } - - Ok(LayerResult::HandlePkt) => Ok(InternalProcessResult::from( - self.net.handle_pkt(Out, pkt, &data.uft_in, &data.uft_out)?, - )), - - Err(e) => Err(ProcessError::Layer(e)), } } diff --git a/lib/opte/src/engine/rule.rs b/lib/opte/src/engine/rule.rs index 9ce1030f..f9d9e0e6 100644 --- a/lib/opte/src/engine/rule.rs +++ b/lib/opte/src/engine/rule.rs @@ -68,6 +68,7 @@ use opte_api::Direction; use opte_api::RuleDump; use serde::Deserialize; use serde::Serialize; +use uuid::Uuid; use zerocopy::ByteSliceMut; /// A marker trait indicating a type is an entry acuired from a [`Resource`]. @@ -987,6 +988,7 @@ pub struct Rule { state: S, action: Action, priority: u16, + stat_id: Option, } impl PartialEq for Rule { @@ -1001,6 +1003,10 @@ impl Rule { pub fn action(&self) -> &Action { &self.action } + + pub fn stat_id(&self) -> Option<&Uuid> { + self.stat_id.as_ref() + } } impl Rule { @@ -1010,9 +1016,22 @@ impl Rule { /// any implicit predicates dictated by the action. Additional /// predicates may be added along with the action's implicit ones. pub fn new(priority: u16, action: Action) -> Self { + Rule::new_with_id(priority, action, None) + } + + pub fn new_with_id( + priority: u16, + action: Action, + stat_id: Option, + ) -> Self { let (hdr_preds, data_preds) = action.implicit_preds(); - Rule { state: Ready { hdr_preds, data_preds }, action, priority } + Rule { + state: Ready { hdr_preds, data_preds }, + action, + priority, + stat_id, + } } /// Create a new rule that matches anything. @@ -1023,7 +1042,15 @@ impl Rule { /// useful for making intentions clear that this rule is to match /// anything. pub fn match_any(priority: u16, action: Action) -> Rule { - Rule { state: Finalized { preds: None }, action, priority } + Rule::match_any_with_id(priority, action, None) + } + + pub fn match_any_with_id( + priority: u16, + action: Action, + stat_id: Option, + ) -> Rule { + Rule { state: Finalized { preds: None }, action, priority, stat_id } } /// Add a single [`Predicate`] to the end of the list. @@ -1069,6 +1096,7 @@ impl Rule { state: Finalized { preds }, priority: self.priority, action: self.action, + stat_id: self.stat_id, } } } diff --git a/lib/opte/src/engine/stat.rs b/lib/opte/src/engine/stat.rs new file mode 100644 index 00000000..d03f453c --- /dev/null +++ b/lib/opte/src/engine/stat.rs @@ -0,0 +1,642 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2025 Oxide Computer Company + +//! Flow stat objects modified and tracked as rules and entries are used. + +use crate::api::InnerFlowId; +use crate::ddi::sync::KRwLock; +use crate::ddi::sync::KRwLockType; +use crate::ddi::time::Moment; +use crate::engine::flow_table::Ttl; +use alloc::collections::BTreeMap; +use alloc::collections::BTreeSet; +use alloc::collections::btree_map::Entry; +use alloc::string::String; +use alloc::sync::Arc; +use alloc::sync::Weak; +use alloc::vec::Vec; +use core::sync::atomic::AtomicU64; +use core::sync::atomic::Ordering; +use opte_api::Direction; +use opte_api::FlowStat as ApiFlowStat; +use opte_api::FullCounter as ApiFullCounter; +use opte_api::PacketCounter as ApiPktCounter; +use opte_api::TcpState; +use uuid::Uuid; + +// TODO DELETION ON FLOW CLOSE [and holding onto 'dead flows'] + +/// Opaque identifier for tracking unique stat objects. +#[derive(Copy, Clone, Hash, PartialEq, PartialOrd, Eq, Ord, Debug)] +pub struct StatId(u64); + +impl StatId { + fn new(val: &mut u64) -> Self { + let out = *val; + *val += 1; + StatId(out) + } +} + +/// Reduced form of an action for stats tracking purposes. +#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Default)] +pub enum Action { + #[default] + Allow, + Deny, + Hairpin, +} + +pub struct FlowStat { + /// The direction of this flow half. + pub dir: Direction, + /// The other half of this flow. + pub partner: InnerFlowId, + /// `TableStat`s to whom we must return our own `stats`. + pub parents: Vec>, + /// The cached list of IDs of root `TableStat` entries. + pub bases: BTreeSet, + + /// Actual stats associated with this flow. + pub shared: Arc, + + /// When was this flow last updated? + pub last_hit: AtomicU64, +} + +impl FlowStat { + pub fn hit(&self, pkt_size: u64) { + self.hit_at(pkt_size, Moment::now()); + } + + pub fn hit_at(&self, pkt_size: u64, time: Moment) { + self.last_hit.store(time.raw(), Ordering::Relaxed); + self.shared.stats.hit(self.dir, pkt_size); + } +} + +pub struct SharedFlowStat { + /// Actual stats associated with this flow. + pub stats: PacketCounter, + + /// Tcp? + /// + /// Yeah this needs some rework wrt today... + pub tcp: Option, + + /// パケットはどちらにきましたか。 + pub first_dir: Direction, +} + +impl From<&FlowStat> for ApiFlowStat { + fn from(value: &FlowStat) -> Self { + ApiFlowStat { + partner: value.partner, + dir: value.dir, + bases: value.bases.iter().copied().collect(), + stats: (&value.shared.stats).into(), + } + } +} + +pub struct TableStat { + pub id: Option, + + pub parents: Vec>, + pub children: KRwLock>>, + + /// The actual stats! + pub stats: FullCounter, + + /// When was this flow last updated? + pub last_hit: AtomicU64, +} + +impl core::fmt::Debug for TableStat { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + todo!() + } +} + +impl TableStat { + /// Allow a packet which will track local stats via a UFT entry. + pub fn allow(&self) { + self.allow_at(Moment::now()); + } + + /// Allow a packet (at a given timestamp) which will track local stats via + /// a UFT entry. + pub fn allow_at(&self, time: Moment) { + self.last_hit.store(time.raw(), Ordering::Relaxed); + self.stats.allow.fetch_add(1, Ordering::Relaxed); + } + + /// Record an action for a packet which will ultimately be dropped or + /// hairpinned. + pub fn act(&self, action: Action, pkt_size: u64, direction: Direction) { + self.act_at(action, pkt_size, direction, Moment::now()); + } + + /// Record an action for a packet (at a given time) which will ultimately + /// be dropped or hairpinned. + pub fn act_at( + &self, + action: Action, + pkt_size: u64, + direction: Direction, + time: Moment, + ) { + self.last_hit.store(time.raw(), Ordering::Relaxed); + self.stats.packets.hit(direction, pkt_size); + let stat = match action { + Action::Allow => &self.stats.allow, + Action::Deny => &self.stats.deny, + Action::Hairpin => &self.stats.hairpin, + }; + stat.fetch_add(1, Ordering::Relaxed); + } +} + +pub struct PacketCounter { + pub id: StatId, + pub created_at: Moment, + + pub pkts_in: AtomicU64, + pub bytes_in: AtomicU64, + pub pkts_out: AtomicU64, + pub bytes_out: AtomicU64, +} + +impl PacketCounter { + fn from_next_id(id: &mut u64) -> PacketCounter { + PacketCounter { + id: StatId::new(id), + created_at: Moment::now(), + + pkts_in: 0.into(), + bytes_in: 0.into(), + pkts_out: 0.into(), + bytes_out: 0.into(), + } + } + + #[inline] + fn hit(&self, direction: Direction, pkt_size: u64) { + let (pkts, bytes) = match direction { + Direction::In => (&self.pkts_in, &self.bytes_in), + Direction::Out => (&self.pkts_out, &self.bytes_out), + }; + pkts.fetch_add(1, Ordering::Relaxed); + bytes.fetch_add(pkt_size, Ordering::Relaxed); + } + + fn combine(&self, into: &Self) { + into.pkts_in + .fetch_add(self.pkts_in.load(Ordering::Relaxed), Ordering::Relaxed); + into.bytes_in.fetch_add( + self.bytes_in.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + into.pkts_out.fetch_add( + self.pkts_out.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + into.bytes_out.fetch_add( + self.bytes_out.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + } +} + +impl From<&PacketCounter> for ApiPktCounter { + fn from(val: &PacketCounter) -> Self { + ApiPktCounter { + created_at: val.created_at.raw(), + pkts_in: val.pkts_in.load(Ordering::Relaxed), + bytes_in: val.bytes_in.load(Ordering::Relaxed), + pkts_out: val.pkts_out.load(Ordering::Relaxed), + bytes_out: val.bytes_out.load(Ordering::Relaxed), + } + } +} + +pub struct FullCounter { + pub allow: AtomicU64, + pub deny: AtomicU64, + pub hairpin: AtomicU64, + pub packets: PacketCounter, +} + +impl FullCounter { + fn from_next_id(id: &mut u64) -> FullCounter { + FullCounter { + allow: 0.into(), + deny: 0.into(), + hairpin: 0.into(), + packets: PacketCounter::from_next_id(id), + } + } + + fn combine(&self, into: &Self) { + into.packets.combine(&self.packets); + into.allow + .fetch_add(self.allow.load(Ordering::Relaxed), Ordering::Relaxed); + into.deny + .fetch_add(self.deny.load(Ordering::Relaxed), Ordering::Relaxed); + into.hairpin + .fetch_add(self.hairpin.load(Ordering::Relaxed), Ordering::Relaxed); + } + + #[inline] + fn id(&self) -> StatId { + self.packets.id + } +} + +impl From<&FullCounter> for ApiFullCounter { + fn from(val: &FullCounter) -> Self { + ApiFullCounter { + packets: (&val.packets).into(), + allow: val.allow.load(Ordering::Relaxed), + deny: val.deny.load(Ordering::Relaxed), + hairpin: val.hairpin.load(Ordering::Relaxed), + } + } +} + +pub trait FoldStat: Send + Sync { + fn fold(&self, into: &FullCounter, visited: &mut BTreeSet); +} + +impl FoldStat for FlowStat { + fn fold(&self, into: &FullCounter, visited: &mut BTreeSet) { + if !visited.insert(self.shared.stats.id) { + self.shared.stats.combine(&into.packets); + } + } +} + +impl FoldStat for TableStat { + fn fold(&self, into: &FullCounter, visited: &mut BTreeSet) { + if !visited.insert(self.stats.id()) { + self.stats.combine(into); + } + } +} + +/// Tracking/handling of all stats. +/// +/// ?? Describe? +#[derive(Default)] +pub struct StatTree { + next_id: u64, + roots: BTreeMap>, + intermediate: Vec>, + flows: BTreeMap>, +} + +impl StatTree { + /// Gets or creates the root stat for a given UUID. + /// + /// Allocates a new UUID if none is provided. + pub fn root(&mut self, uuid: Option) -> Arc { + let uuid = uuid.unwrap_or_else(|| Uuid::from_u64_pair(0, self.next_id)); + let ids = &mut self.next_id; + + self.roots + .entry(uuid) + .or_insert_with_key(|id| { + let mut children = KRwLock::new(vec![]); + children.init(KRwLockType::Driver); + + Arc::new(TableStat { + id: Some(*id), + parents: vec![], + children, + stats: FullCounter::from_next_id(ids), + last_hit: Moment::now().raw().into(), + }) + }) + .clone() + } + + pub fn new_intermediate( + &mut self, + parents: Vec>, + ) -> Arc { + let mut children = KRwLock::new(vec![]); + children.init(KRwLockType::Driver); + + let out = Arc::new(TableStat { + id: None, + parents, + children, + stats: FullCounter::from_next_id(&mut self.next_id), + last_hit: Moment::now().raw().into(), + }); + + for parent in &out.parents { + let mut p_children = parent.children.write(); + let weak = Arc::downgrade(&out); + p_children.push(weak); + } + + self.intermediate.push(out.clone()); + + out + } + + pub fn new_flow( + &mut self, + flow_id: &InnerFlowId, + partner_flow: &InnerFlowId, + dir: Direction, + parents: Vec>, + ) -> Arc { + if let Entry::Occupied(e) = self.flows.entry(*flow_id) { + // TODO: what to do with (maybe new) parents & bases?! + // I *think* these should win out, insert, and preserve + // the old stats. Need to think about it. + return e.get().clone(); + } + + let bases = get_base_ids(&parents); + + let out = match self.flows.entry(*partner_flow) { + // Miss, but existing partner. + Entry::Occupied(partner) => Arc::new(FlowStat { + dir, + partner: *partner_flow, + parents, + bases, + shared: partner.get().shared.clone(), + last_hit: Moment::now().raw().into(), + }), + // Miss, no partner. + Entry::Vacant(_) => { + Arc::new(FlowStat { + dir, + partner: *partner_flow, + parents, + bases, + shared: Arc::new(SharedFlowStat { + stats: PacketCounter::from_next_id(&mut self.next_id), + // TODO + tcp: None, + first_dir: dir, + }), + last_hit: Moment::now().raw().into(), + }) + } + }; + + // Proven a miss on flow_id already + let _ = self.flows.insert(*flow_id, out.clone()); + out + } + + pub fn expire(&mut self, now: Moment) { + const EXPIRY_WINDOW: Ttl = Ttl::new_seconds(10); + // Root removal and re-entry? Don't want any gaps. + const ROOT_EXPIRY_WINDOW: Ttl = Ttl::new_seconds(100); + + #[derive(Default, Eq, PartialEq)] + enum Hmm { + #[default] + NotSeen, + SeenKeep, + Seen(InnerFlowId), + } + + #[derive(Default)] + struct Aa { + lhs: Hmm, + rhs: Hmm, + } + + // Flows -- need to account for shared component between arc'd things. + let mut possibly_expired: BTreeMap = BTreeMap::new(); + for (k, v) in &self.flows { + let t_hit = + Moment::from_raw_nanos(v.last_hit.load(Ordering::Relaxed)); + let can_remove = EXPIRY_WINDOW.is_expired(t_hit, now) + && Arc::strong_count(v) == 1; + let base_id = v.shared.stats.id; + let el = possibly_expired.entry(base_id).or_default(); + match (v.dir, can_remove) { + (Direction::In, false) => { + el.lhs = Hmm::SeenKeep; + } + (Direction::Out, false) => { + el.rhs = Hmm::SeenKeep; + } + (Direction::In, true) => { + el.lhs = Hmm::Seen(*k); + } + (Direction::Out, true) => { + el.rhs = Hmm::Seen(*k); + } + } + } + for v in possibly_expired.values() { + let cannot_remove = v.lhs == Hmm::SeenKeep + || v.rhs == Hmm::SeenKeep + || (v.lhs == Hmm::NotSeen && v.rhs == Hmm::NotSeen); + if cannot_remove { + continue; + } + + #[allow(clippy::mutable_key_type)] + let mut parents: BTreeSet = Default::default(); + let mut base_stats = None; + if let Hmm::Seen(id) = v.lhs { + if let Some(flow) = self.flows.remove(&id) { + let flow = Arc::into_inner(flow) + .expect("strong count 1 is enforced above"); + for p_id in flow.parents { + parents.insert(ById(p_id)); + } + base_stats = Some(flow.shared); + } + } + if let Hmm::Seen(id) = v.rhs { + if let Some(flow) = self.flows.remove(&id) { + let flow = Arc::into_inner(flow) + .expect("strong count 1 is enforced above"); + for p_id in flow.parents { + parents.insert(ById(p_id)); + } + base_stats = Some(flow.shared); + } + } + + // At long last, combine! + let base_stats = + base_stats.expect("should not have no parent here!!"); + for parent in parents { + base_stats.stats.combine(&parent.0.stats.packets); + } + } + + // Intermediates. + self.intermediate.retain(|v| { + // Time is... not relevant here. The LFTs are GONE. + if Arc::strong_count(v) == 1 { + for p in &v.parents { + v.stats.combine(&p.stats); + } + false + } else { + true + } + }); + + // Roots may need to be held onto for some time in case rules with the + // same ID come and go in adjacent control plane operations... + self.roots.retain(|_, v| { + let t_hit = + Moment::from_raw_nanos(v.last_hit.load(Ordering::Relaxed)); + Arc::strong_count(v) > 1 + || !ROOT_EXPIRY_WINDOW.is_expired(t_hit, now) + }); + } + + // TEMP + pub fn dump(&self) -> String { + let mut out = String::new(); + out.push_str("--Roots--\n"); + for (id, root) in &self.roots { + let d = ApiFullCounter::from(&root.stats); + out.push_str(&format!("\t{:?}/{id} -> {d:?}\n", root.stats.id())); + } + out.push_str("----\n\n"); + out.push_str("--Ints--\n"); + for root in &self.intermediate { + let d = ApiFullCounter::from(&root.stats); + out.push_str(&format!("\t{:?} -> {d:?}\n", root.stats.id())); + let parents: Vec> = + root.parents.iter().map(|v| v.id).collect(); + out.push_str(&format!("\t\tparents {parents:?}\n\n")); + } + out.push_str("----\n\n"); + out.push_str("--Flows--\n"); + for (id, stat) in &self.flows { + // let d: ApiFlowStat = stat.as_ref().into(); + let d: ApiPktCounter = (&stat.as_ref().shared.stats).into(); + let parents: Vec<_> = + stat.parents.iter().map(|v| v.stats.id()).collect(); + out.push_str(&format!("\t{id}/{} ->\n", stat.dir)); + out.push_str(&format!("\t\t{:?} {d:?}\n", stat.shared.stats.id)); + out.push_str(&format!("\t\tparents {:?}\n", parents)); + out.push_str(&format!("\t\tbases {:?}\n\n", stat.bases)); + } + out.push_str("----\n"); + out + } +} + +fn get_base_ids(parents: &[Arc]) -> BTreeSet { + let mut out = BTreeSet::new(); + + let mut work_set = parents.to_vec(); + while let Some(el) = work_set.pop() { + work_set.extend_from_slice(&el.parents); + if let Some(id) = el.id { + out.insert(id); + } + } + + out +} + +/// XXX holds stats as they arrive on a packet. +pub struct FlowStatBuilder { + parents: Vec>, + layer_end: usize, +} + +impl FlowStatBuilder { + pub fn new() -> Self { + Self { + // TODO: do we want this cfg'able? + parents: Vec::with_capacity(16), + layer_end: 0, + } + } + + /// Push a parent onto this flow. + pub fn push(&mut self, parent: Arc) { + self.parents.push(parent); + } + + /// Mark all current parents as [`Action::Allow`]. + pub fn new_layer(&mut self) { + self.layer_end = self.parents.len(); + } + + /// Return a list of stat parents if this packet is bound for flow creation. + pub fn terminate( + &mut self, + action: Action, + pkt_size: u64, + direction: Direction, + create_flow: bool, + ) -> Option>> { + match action { + Action::Allow if create_flow => { + self.parents.iter().for_each(|v| v.allow()); + // TODO: should *take*? + Some(self.parents.clone()) + } + Action::Allow => { + self.parents + .iter() + .for_each(|v| v.act(action, pkt_size, direction)); + None + } + Action::Deny | Action::Hairpin => { + let (accepted, last_layer) = + self.parents.split_at(self.layer_end); + accepted + .iter() + .for_each(|v| v.act(Action::Allow, pkt_size, direction)); + last_layer + .iter() + .for_each(|v| v.act(action, pkt_size, direction)); + + None + } + } + } +} + +impl Default for FlowStatBuilder { + fn default() -> Self { + Self::new() + } +} + +struct ById(Arc); + +impl PartialOrd for ById { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ById { + fn cmp(&self, other: &Self) -> core::cmp::Ordering { + self.0.stats.id().cmp(&other.0.stats.id()) + } +} + +impl PartialEq for ById { + fn eq(&self, other: &Self) -> bool { + self.0.stats.id() == other.0.stats.id() + } +} + +impl Eq for ById {} diff --git a/lib/opte/src/lib.rs b/lib/opte/src/lib.rs index 33ee13df..a3659214 100644 --- a/lib/opte/src/lib.rs +++ b/lib/opte/src/lib.rs @@ -30,6 +30,8 @@ extern crate self as opte; use alloc::boxed::Box; use core::fmt; use core::fmt::Display; +#[cfg(any(feature = "engine", test))] +use engine::stat::StatTree; pub use ingot; @@ -253,3 +255,9 @@ impl LogProvider for KernelLog { pub struct ExecCtx { pub log: Box, } + +#[cfg(any(feature = "engine", test))] +pub(crate) struct ExecCtx2<'a> { + pub user_ctx: &'a ExecCtx, + pub stats: &'a mut StatTree, +} diff --git a/lib/oxide-vpc/src/api.rs b/lib/oxide-vpc/src/api/mod.rs similarity index 99% rename from lib/oxide-vpc/src/api.rs rename to lib/oxide-vpc/src/api/mod.rs index 7f8464f5..89b05a6f 100644 --- a/lib/oxide-vpc/src/api.rs +++ b/lib/oxide-vpc/src/api/mod.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2024 Oxide Computer Company +// Copyright 2025 Oxide Computer Company use alloc::collections::BTreeMap; use alloc::collections::BTreeSet; @@ -19,6 +19,8 @@ use serde::Deserialize; use serde::Serialize; use uuid::Uuid; +pub mod stat; + /// This is the MAC address that OPTE uses to act as the virtual gateway. pub const GW_MAC_ADDR: MacAddr = MacAddr::from_const([0xA8, 0x40, 0x25, 0xFF, 0x77, 0x77]); @@ -650,6 +652,14 @@ pub struct FirewallRule { pub priority: u16, } +// TEMP +#[derive(Debug, Deserialize, Serialize)] +pub struct DumpFlowStatsResp { + pub data: String, +} + +impl CmdOk for DumpFlowStatsResp {} + impl FromStr for FirewallRule { type Err = String; diff --git a/lib/oxide-vpc/src/api/stat.rs b/lib/oxide-vpc/src/api/stat.rs new file mode 100644 index 00000000..b88abede --- /dev/null +++ b/lib/oxide-vpc/src/api/stat.rs @@ -0,0 +1,33 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2025 Oxide Computer Company + +//! Stat IDs for the Oxide VPC API. + +use uuid::Uuid; + +pub static FW_DEFAULT_IN: Uuid = + Uuid::from_fields(0x01de_f00d, 0x7777, 0x0000, &0u64.to_be_bytes()); +pub static FW_DEFAULT_OUT: Uuid = + Uuid::from_fields(0x01de_f00d, 0x7777, 0x0000, &1u64.to_be_bytes()); + +pub static GATEWAY_NOSPOOF_IN: Uuid = + Uuid::from_fields(0x01de_f00d, 0x7777, 0x0001, &0u64.to_be_bytes()); +pub static GATEWAY_NOSPOOF_OUT: Uuid = + Uuid::from_fields(0x01de_f00d, 0x7777, 0x0001, &1u64.to_be_bytes()); + +pub static ROUTER_NOROUTE: Uuid = + Uuid::from_fields(0x01de_f00d, 0x7777, 0x0002, &0u64.to_be_bytes()); + +pub static NAT_SNAT_V4: Uuid = + Uuid::from_fields(0x01de_f00d, 0x7777, 0x0003, &0u64.to_be_bytes()); +pub static NAT_SNAT_V6: Uuid = + Uuid::from_fields(0x01de_f00d, 0x7777, 0x0003, &1u64.to_be_bytes()); +pub static NAT_VALID_IGW_V4: Uuid = + Uuid::from_fields(0x01de_f00d, 0x7777, 0x0003, &2u64.to_be_bytes()); +pub static NAT_VALID_IGW_V6: Uuid = + Uuid::from_fields(0x01de_f00d, 0x7777, 0x0003, &3u64.to_be_bytes()); +pub static NAT_NONE: Uuid = + Uuid::from_fields(0x01de_f00d, 0x7777, 0x0003, &255u64.to_be_bytes()); diff --git a/lib/oxide-vpc/src/engine/firewall.rs b/lib/oxide-vpc/src/engine/firewall.rs index 9c27b8ce..73f670d3 100644 --- a/lib/oxide-vpc/src/engine/firewall.rs +++ b/lib/oxide-vpc/src/engine/firewall.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2023 Oxide Computer Company +// Copyright 2025 Oxide Computer Company //! The Oxide VPC firewall. //! @@ -18,6 +18,7 @@ use crate::api::Ports; pub use crate::api::ProtoFilter; use crate::api::RemFwRuleReq; use crate::api::SetFwRulesReq; +use crate::api::stat::*; use crate::engine::overlay::ACTION_META_VNI; use alloc::string::ToString; use core::num::NonZeroU32; @@ -48,7 +49,25 @@ pub fn setup( pb: &mut PortBuilder, ft_limit: NonZeroU32, ) -> Result<(), OpteError> { - let fw_layer = Firewall::create_layer(pb.name(), ft_limit); + // The inbound side of the firewall is a filtering layer, only + // traffic explicitly allowed should pass. By setting the + // default inbound action to deny we effectively implement the + // implied "implied deny inbound" rule as speficied in RFD 63 + // §2.8.1. + // + // RFD 63 §2.8.1 also states that all outbond traffic should + // be allowed by default, aka the "implied allow outbound" + // rule. Therefore, we set the default outbound action to + // allow. + let actions = LayerActions { + default_in: DefaultAction::Deny, + default_in_stat_id: Some(FW_DEFAULT_IN), + default_out: DefaultAction::StatefulAllow, + default_out_stat_id: Some(FW_DEFAULT_OUT), + ..Default::default() + }; + + let fw_layer = Layer::new(FW_LAYER_NAME, pb, actions, ft_limit); pb.add_layer(fw_layer, Pos::First) } @@ -124,28 +143,6 @@ pub fn from_fw_rule(fw_rule: FirewallRule, action: Action) -> Rule { rule.finalize() } -impl Firewall { - pub fn create_layer(port_name: &str, ft_limit: NonZeroU32) -> Layer { - // The inbound side of the firewall is a filtering layer, only - // traffic explicitly allowed should pass. By setting the - // default inbound action to deny we effectively implement the - // implied "implied deny inbound" rule as speficied in RFD 63 - // §2.8.1. - // - // RFD 63 §2.8.1 also states that all outbond traffic should - // be allowed by default, aka the "implied allow outbound" - // rule. Therefore, we set the default outbound action to - // allow. - let actions = LayerActions { - actions: vec![], - default_in: DefaultAction::Deny, - default_out: DefaultAction::StatefulAllow, - }; - - Layer::new(FW_LAYER_NAME, port_name, actions, ft_limit) - } -} - impl ProtoFilter { pub fn into_predicate(self) -> Option { match self { diff --git a/lib/oxide-vpc/src/engine/gateway/arp.rs b/lib/oxide-vpc/src/engine/gateway/arp.rs index d530ce16..659bcb0e 100644 --- a/lib/oxide-vpc/src/engine/gateway/arp.rs +++ b/lib/oxide-vpc/src/engine/gateway/arp.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2023 Oxide Computer Company +// Copyright 2025 Oxide Computer Company //! The ARP implementation of the Virtual Gateway. @@ -17,8 +17,13 @@ use opte::engine::predicate::EtherTypeMatch; use opte::engine::predicate::Predicate; use opte::engine::rule::Action; use opte::engine::rule::Rule; +use opte::engine::stat::StatTree; -pub fn setup(layer: &mut Layer, cfg: &VpcCfg) -> Result<(), OpteError> { +pub fn setup( + layer: &mut Layer, + cfg: &VpcCfg, + stats: &mut StatTree, +) -> Result<(), OpteError> { // ================================================================ // Outbound ARP Request for Gateway, from Guest // @@ -33,7 +38,7 @@ pub fn setup(layer: &mut Layer, cfg: &VpcCfg) -> Result<(), OpteError> { )]), Predicate::InnerEtherSrc(vec![EtherAddrMatch::Exact(cfg.guest_mac)]), ]); - layer.add_rule(Direction::Out, rule.finalize()); + layer.add_rule(Direction::Out, rule.finalize(), stats); Ok(()) } diff --git a/lib/oxide-vpc/src/engine/gateway/dhcp.rs b/lib/oxide-vpc/src/engine/gateway/dhcp.rs index d10698e6..e008cbc0 100644 --- a/lib/oxide-vpc/src/engine/gateway/dhcp.rs +++ b/lib/oxide-vpc/src/engine/gateway/dhcp.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2024 Oxide Computer Company +// Copyright 2025 Oxide Computer Company //! The DHCP implementation of the Virtual Gateway. @@ -21,12 +21,14 @@ use opte::engine::ip::v4::Ipv4Cidr; use opte::engine::layer::Layer; use opte::engine::rule::Action; use opte::engine::rule::Rule; +use opte::engine::stat::StatTree; pub fn setup( layer: &mut Layer, cfg: &VpcCfg, ip_cfg: &Ipv4Cfg, dhcp_cfg: DhcpCfg, + stats: &mut StatTree, ) -> Result<(), OpteError> { // All guest interfaces live on a `/32`-network in the Oxide VPC; // restricting the L2 domain to two nodes: the guest NIC and the @@ -91,9 +93,9 @@ pub fn setup( })); let discover_rule = Rule::new(1, offer); - layer.add_rule(Direction::Out, discover_rule.finalize()); + layer.add_rule(Direction::Out, discover_rule.finalize(), stats); let request_rule = Rule::new(1, ack); - layer.add_rule(Direction::Out, request_rule.finalize()); + layer.add_rule(Direction::Out, request_rule.finalize(), stats); Ok(()) } diff --git a/lib/oxide-vpc/src/engine/gateway/dhcpv6.rs b/lib/oxide-vpc/src/engine/gateway/dhcpv6.rs index 00bbec2a..071cd3b3 100644 --- a/lib/oxide-vpc/src/engine/gateway/dhcpv6.rs +++ b/lib/oxide-vpc/src/engine/gateway/dhcpv6.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2023 Oxide Computer Company +// Copyright 2025 Oxide Computer Company //! The DHCPv6 implementation of the Virtual Gateway. @@ -17,11 +17,13 @@ use opte::engine::dhcpv6::LeasedAddress; use opte::engine::layer::Layer; use opte::engine::rule::Action; use opte::engine::rule::Rule; +use opte::engine::stat::StatTree; pub fn setup( layer: &mut Layer, cfg: &VpcCfg, dhcp_cfg: DhcpCfg, + stats: &mut StatTree, ) -> Result<(), OpteError> { let ip_cfg = match cfg.ipv6_cfg() { None => return Ok(()), @@ -44,6 +46,6 @@ pub fn setup( let server = Action::Hairpin(Arc::new(action)); let rule = Rule::new(1, server); - layer.add_rule(Direction::Out, rule.finalize()); + layer.add_rule(Direction::Out, rule.finalize(), stats); Ok(()) } diff --git a/lib/oxide-vpc/src/engine/gateway/icmp.rs b/lib/oxide-vpc/src/engine/gateway/icmp.rs index c4c48550..c08d4067 100644 --- a/lib/oxide-vpc/src/engine/gateway/icmp.rs +++ b/lib/oxide-vpc/src/engine/gateway/icmp.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2023 Oxide Computer Company +// Copyright 2025 Oxide Computer Company //! The ICMP implementation of the Virtual Gateway. @@ -15,11 +15,13 @@ use opte::engine::icmp::v4::IcmpEchoReply; use opte::engine::layer::Layer; use opte::engine::rule::Action; use opte::engine::rule::Rule; +use opte::engine::stat::StatTree; pub fn setup( layer: &mut Layer, cfg: &VpcCfg, ip_cfg: &Ipv4Cfg, + stats: &mut StatTree, ) -> Result<(), OpteError> { // ================================================================ // ICMPv4 Echo Reply @@ -33,6 +35,6 @@ pub fn setup( echo_dst_ip: ip_cfg.gateway_ip, })); let rule = Rule::new(1, reply); - layer.add_rule(Direction::Out, rule.finalize()); + layer.add_rule(Direction::Out, rule.finalize(), stats); Ok(()) } diff --git a/lib/oxide-vpc/src/engine/gateway/icmpv6.rs b/lib/oxide-vpc/src/engine/gateway/icmpv6.rs index 2821325b..ee17ea45 100644 --- a/lib/oxide-vpc/src/engine/gateway/icmpv6.rs +++ b/lib/oxide-vpc/src/engine/gateway/icmpv6.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2023 Oxide Computer Company +// Copyright 2025 Oxide Computer Company //! The ICMPv6 implementation of the Virtual Gateway. @@ -19,6 +19,7 @@ use opte::engine::layer::Layer; use opte::engine::predicate::DataPredicate; use opte::engine::rule::Action; use opte::engine::rule::Rule; +use opte::engine::stat::StatTree; use smoltcp::wire::Icmpv6Message; // Add support for ICMPv6: @@ -38,6 +39,7 @@ pub fn setup( layer: &mut Layer, cfg: &VpcCfg, ip_cfg: &Ipv6Cfg, + stats: &mut StatTree, ) -> Result<(), OpteError> { let dst_ip = Ipv6Addr::from_eui64(&cfg.gateway_mac); let hairpins = [ @@ -87,7 +89,7 @@ pub fn setup( hairpins.into_iter().enumerate().for_each(|(i, action)| { let priority = u16::try_from(i + 1).unwrap(); let rule = Rule::new(priority, action); - layer.add_rule(Direction::Out, rule.finalize()); + layer.add_rule(Direction::Out, rule.finalize(), stats); }); // Filter any uncaught in/out-bound NDP traffic. @@ -99,11 +101,11 @@ pub fn setup( let mut ndp_filter = Rule::new(next_out_prio, Action::Deny); ndp_filter.add_data_predicate(pred); - layer.add_rule(Direction::Out, ndp_filter.finalize()); + layer.add_rule(Direction::Out, ndp_filter.finalize(), stats); let mut ndp_filter = Rule::new(1, Action::Deny); ndp_filter.add_data_predicate(in_pred); - layer.add_rule(Direction::In, ndp_filter.finalize()); + layer.add_rule(Direction::In, ndp_filter.finalize(), stats); Ok(()) } diff --git a/lib/oxide-vpc/src/engine/gateway/mod.rs b/lib/oxide-vpc/src/engine/gateway/mod.rs index b8d6a580..b24e6e50 100644 --- a/lib/oxide-vpc/src/engine/gateway/mod.rs +++ b/lib/oxide-vpc/src/engine/gateway/mod.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2024 Oxide Computer Company +// Copyright 2025 Oxide Computer Company //! The Oxide VPC Virtual Gateway. //! @@ -42,6 +42,7 @@ use crate::api::DhcpCfg; use crate::api::MacAddr; +use crate::api::stat::*; use crate::cfg::Ipv4Cfg; use crate::cfg::Ipv6Cfg; use crate::cfg::VpcCfg; @@ -89,7 +90,7 @@ pub use transit::*; pub const NAME: &str = "gateway"; pub fn setup( - pb: &PortBuilder, + pb: &mut PortBuilder, cfg: &VpcCfg, vpc_mappings: Arc, ft_limit: core::num::NonZeroU32, @@ -104,15 +105,18 @@ pub fn setup( // Since we are acting as a gateway we also rewrite the source MAC address // for inbound traffic to be that of the gateway. let actions = LayerActions { - actions: vec![], default_in: DefaultAction::Deny, + default_in_stat_id: Some(GATEWAY_NOSPOOF_IN), default_out: DefaultAction::Deny, + default_out_stat_id: Some(GATEWAY_NOSPOOF_IN), + ..Default::default() }; - let mut layer = Layer::new(NAME, pb.name(), actions, ft_limit); + let mut layer = Layer::new(NAME, pb, actions, ft_limit); if let Some(ipv4_cfg) = cfg.ipv4_cfg() { setup_ipv4( + pb, &mut layer, cfg, ipv4_cfg, @@ -122,7 +126,14 @@ pub fn setup( } if let Some(ipv6_cfg) = cfg.ipv6_cfg() { - setup_ipv6(&mut layer, cfg, ipv6_cfg, vpc_mappings, dhcp_cfg.clone())?; + setup_ipv6( + pb, + &mut layer, + cfg, + ipv6_cfg, + vpc_mappings, + dhcp_cfg.clone(), + )?; } pb.add_layer(layer, Pos::Before("firewall")) @@ -161,15 +172,17 @@ impl StaticAction for RewriteSrcMac { } fn setup_ipv4( + pb: &mut PortBuilder, layer: &mut Layer, cfg: &VpcCfg, ip_cfg: &Ipv4Cfg, vpc_mappings: Arc, dhcp_cfg: DhcpCfg, ) -> Result<(), OpteError> { - arp::setup(layer, cfg)?; - dhcp::setup(layer, cfg, ip_cfg, dhcp_cfg)?; - icmp::setup(layer, cfg, ip_cfg)?; + let stats = pb.stats_mut(); + arp::setup(layer, cfg, stats)?; + dhcp::setup(layer, cfg, ip_cfg, dhcp_cfg, stats)?; + icmp::setup(layer, cfg, ip_cfg, stats)?; let vpc_meta = Arc::new(VpcMeta::new(vpc_mappings)); @@ -180,7 +193,7 @@ fn setup_ipv4( nospoof_out.add_predicate(Predicate::InnerEtherSrc(vec![ EtherAddrMatch::Exact(cfg.guest_mac), ])); - layer.add_rule(Direction::Out, nospoof_out.finalize()); + layer.add_rule(Direction::Out, nospoof_out.finalize(), stats); let mut unicast_in = Rule::new( 1000, @@ -194,20 +207,22 @@ fn setup_ipv4( unicast_in.add_predicate(Predicate::InnerEtherDst(vec![ EtherAddrMatch::Exact(cfg.guest_mac), ])); - layer.add_rule(Direction::In, unicast_in.finalize()); + layer.add_rule(Direction::In, unicast_in.finalize(), stats); Ok(()) } fn setup_ipv6( + pb: &mut PortBuilder, layer: &mut Layer, cfg: &VpcCfg, ip_cfg: &Ipv6Cfg, vpc_mappings: Arc, dhcp_cfg: DhcpCfg, ) -> Result<(), OpteError> { - icmpv6::setup(layer, cfg, ip_cfg)?; - dhcpv6::setup(layer, cfg, dhcp_cfg)?; + let stats = pb.stats_mut(); + icmpv6::setup(layer, cfg, ip_cfg, stats)?; + dhcpv6::setup(layer, cfg, dhcp_cfg, stats)?; let vpc_meta = Arc::new(VpcMeta::new(vpc_mappings)); let mut nospoof_out = Rule::new(1000, Action::Meta(vpc_meta)); nospoof_out.add_predicate(Predicate::InnerSrcIp6(vec![ @@ -216,7 +231,7 @@ fn setup_ipv6( nospoof_out.add_predicate(Predicate::InnerEtherSrc(vec![ EtherAddrMatch::Exact(cfg.guest_mac), ])); - layer.add_rule(Direction::Out, nospoof_out.finalize()); + layer.add_rule(Direction::Out, nospoof_out.finalize(), stats); let mut unicast_in = Rule::new( 1000, @@ -230,7 +245,7 @@ fn setup_ipv6( unicast_in.add_predicate(Predicate::InnerEtherDst(vec![ EtherAddrMatch::Exact(cfg.guest_mac), ])); - layer.add_rule(Direction::In, unicast_in.finalize()); + layer.add_rule(Direction::In, unicast_in.finalize(), stats); Ok(()) } diff --git a/lib/oxide-vpc/src/engine/nat.rs b/lib/oxide-vpc/src/engine/nat.rs index 2251a246..8fc14063 100644 --- a/lib/oxide-vpc/src/engine/nat.rs +++ b/lib/oxide-vpc/src/engine/nat.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2023 Oxide Computer Company +// Copyright 2025 Oxide Computer Company use super::VpcNetwork; use super::router::ROUTER_LAYER_NAME; @@ -10,6 +10,7 @@ use super::router::RouterTargetClass; use super::router::RouterTargetInternal; use crate::api::ExternalIpCfg; use crate::api::SetExternalIpsReq; +use crate::api::stat::*; use crate::cfg::IpCfg; use crate::cfg::Ipv4Cfg; use crate::cfg::Ipv6Cfg; @@ -101,14 +102,16 @@ pub fn setup( // but no valid replacement source IP must be dropped, otherwise it will // be forwarded to boundary services. let actions = LayerActions { - actions: vec![], default_in: DefaultAction::Allow, + default_in_stat_id: Some(NAT_NONE), default_out: DefaultAction::Allow, + default_out_stat_id: Some(NAT_NONE), + ..Default::default() }; - let mut layer = Layer::new(NAT_LAYER_NAME, pb.name(), actions, ft_limit); + let mut layer = Layer::new(NAT_LAYER_NAME, pb, actions, ft_limit); let (in_rules, out_rules) = create_nat_rules(cfg, None)?; - layer.set_rules(in_rules, out_rules); + layer.set_rules(in_rules, out_rules, pb.stats_mut()); pb.add_layer(layer, Pos::After(ROUTER_LAYER_NAME)) } @@ -288,8 +291,11 @@ fn setup_ipv4_nat( let snat = Arc::new(snat); for igw_id in igw_matches { - let mut rule = - Rule::new(SNAT_PRIORITY, Action::Stateful(snat.clone())); + let mut rule = Rule::new_with_id( + SNAT_PRIORITY, + Action::Stateful(snat.clone()), + Some(NAT_SNAT_V4), + ); rule.add_predicate(Predicate::InnerEtherType(vec![ EtherTypeMatch::Exact(ETHER_TYPE_IPV4), @@ -437,8 +443,11 @@ fn setup_ipv6_nat( let snat = Arc::new(snat); for igw_id in igw_matches { - let mut rule = - Rule::new(SNAT_PRIORITY, Action::Stateful(snat.clone())); + let mut rule = Rule::new_with_id( + SNAT_PRIORITY, + Action::Stateful(snat.clone()), + Some(NAT_SNAT_V6), + ); rule.add_predicate(Predicate::InnerEtherType(vec![ EtherTypeMatch::Exact(ETHER_TYPE_IPV6), diff --git a/lib/oxide-vpc/src/engine/overlay.rs b/lib/oxide-vpc/src/engine/overlay.rs index 396fbc8d..607c55b7 100644 --- a/lib/oxide-vpc/src/engine/overlay.rs +++ b/lib/oxide-vpc/src/engine/overlay.rs @@ -71,7 +71,7 @@ use poptrie::Poptrie; pub const OVERLAY_LAYER_NAME: &str = "overlay"; pub fn setup( - pb: &PortBuilder, + pb: &mut PortBuilder, cfg: &VpcCfg, v2p: Arc, v2b: Arc, @@ -92,14 +92,14 @@ pub fn setup( actions: vec![encap, decap], default_in: DefaultAction::Deny, default_out: DefaultAction::Deny, + ..Default::default() }; - let mut layer = - Layer::new(OVERLAY_LAYER_NAME, pb.name(), actions, ft_limit); + let mut layer = Layer::new(OVERLAY_LAYER_NAME, pb, actions, ft_limit); let encap_rule = Rule::match_any(1, layer.action(0).unwrap()); - layer.add_rule(Direction::Out, encap_rule); + layer.add_rule(Direction::Out, encap_rule, pb.stats_mut()); let decap_rule = Rule::match_any(1, layer.action(1).unwrap()); - layer.add_rule(Direction::In, decap_rule); + layer.add_rule(Direction::In, decap_rule, pb.stats_mut()); // NOTE The First/Last positions cannot fail; perhaps I should // improve the API to avoid the unwrap(). pb.add_layer(layer, Pos::Last) diff --git a/lib/oxide-vpc/src/engine/router.rs b/lib/oxide-vpc/src/engine/router.rs index c05f7882..fbd34458 100644 --- a/lib/oxide-vpc/src/engine/router.rs +++ b/lib/oxide-vpc/src/engine/router.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2024 Oxide Computer Company +// Copyright 2025 Oxide Computer Company //! The Oxide Network VPC Router. //! @@ -13,6 +13,7 @@ use super::firewall as fw; use crate::api::DelRouterEntryResp; use crate::api::RouterClass; use crate::api::RouterTarget; +use crate::api::stat::*; use crate::cfg::VpcCfg; use alloc::string::String; use alloc::string::ToString; @@ -247,7 +248,7 @@ fn compute_rule_priority(cidr: &IpCidr, class: RouterClass) -> u16 { } pub fn setup( - pb: &PortBuilder, + pb: &mut PortBuilder, _cfg: &VpcCfg, ft_limit: core::num::NonZeroU32, ) -> Result<(), OpteError> { @@ -257,12 +258,13 @@ pub fn setup( // Outbound: If there is no matching route, then the packet should // make it no further. let actions = LayerActions { - actions: vec![], default_in: DefaultAction::Allow, default_out: DefaultAction::Deny, + default_out_stat_id: Some(ROUTER_NOROUTE), + ..Default::default() }; - let layer = Layer::new(ROUTER_LAYER_NAME, pb.name(), actions, ft_limit); + let layer = Layer::new(ROUTER_LAYER_NAME, pb, actions, ft_limit); pb.add_layer(layer, Pos::After(fw::FW_LAYER_NAME)) } diff --git a/lib/oxide-vpc/tests/integration_tests.rs b/lib/oxide-vpc/tests/integration_tests.rs index 86587318..082fd283 100644 --- a/lib/oxide-vpc/tests/integration_tests.rs +++ b/lib/oxide-vpc/tests/integration_tests.rs @@ -3134,6 +3134,8 @@ fn uft_lft_invalidation_out() { "incr:stats.port.out_uft_miss", ] ); + + print_port(&g1.port, &g1.vpc_map); } // Verify that changing rules causes invalidation of UFT and LFT diff --git a/xde/src/xde.rs b/xde/src/xde.rs index ae38a1b0..ec64ff3b 100644 --- a/xde/src/xde.rs +++ b/xde/src/xde.rs @@ -690,6 +690,27 @@ unsafe extern "C" fn xde_ioc_opte_cmd(karg: *mut c_void, mode: c_int) -> c_int { let resp = remove_cidr_hdlr(&mut env); hdlr_resp(&mut env, resp) } + + // TEMP + OpteCmd::DumpFlowStats => { + let resp = flow_stats_hdlr(&mut env); + hdlr_resp(&mut env, resp) + } + } +} + +#[unsafe(no_mangle)] +fn flow_stats_hdlr( + env: &mut IoctlEnvelope, +) -> Result { + let req: oxide_vpc::api::DumpUftReq = env.copy_in_req()?; + let devs = xde_devs().read(); + match devs.get_by_name(&req.port_name) { + Some(dev) => dev + .port + .dump_flow_stats() + .map(|data| oxide_vpc::api::DumpFlowStatsResp { data }), + None => Err(OpteError::PortNotFound(req.port_name)), } } @@ -2051,10 +2072,10 @@ fn new_port( // XXX some layers have no need for LFT, perhaps have two types // of Layer: one with, one without? - gateway::setup(&pb, &cfg, vpc_map, FT_LIMIT_ONE, dhcp_cfg)?; - router::setup(&pb, &cfg, FT_LIMIT_ONE)?; + gateway::setup(&mut pb, &cfg, vpc_map, FT_LIMIT_ONE, dhcp_cfg)?; + router::setup(&mut pb, &cfg, FT_LIMIT_ONE)?; nat::setup(&mut pb, &cfg, nat_ft_limit)?; - overlay::setup(&pb, &cfg, v2p, v2b, FT_LIMIT_ONE)?; + overlay::setup(&mut pb, &cfg, v2p, v2b, FT_LIMIT_ONE)?; // Set the overall unified flow and TCP flow table limits based on the total // configuration above, by taking the maximum of size of the individual