layered cpuset support #1747

Draft: wants to merge 8 commits into main
2 changes: 1 addition & 1 deletion scheds/rust/scx_layered/examples/cpuset.json
@@ -5,7 +5,7 @@
"matches": [
[
{
- "CommPrefix": "geekbench"
+ "CgroupPrefix": "system.slice/docker"
}
]
],
1 change: 1 addition & 0 deletions scheds/rust/scx_layered/src/bpf/intf.h
@@ -30,6 +30,7 @@ enum consts {
MAX_TASKS = 131072,
MAX_PATH = 4096,
MAX_NUMA_NODES = 64,
MAX_CPUSETS = 64,
MAX_LLCS = 64,
MAX_COMM = 16,
MAX_LAYER_MATCH_ORS = 32,
102 changes: 95 additions & 7 deletions scheds/rust/scx_layered/src/bpf/main.bpf.c
@@ -34,9 +34,11 @@ const volatile u64 numa_cpumasks[MAX_NUMA_NODES][MAX_CPUS / 64];
const volatile u32 llc_numa_id_map[MAX_LLCS];
const volatile u32 cpu_llc_id_map[MAX_CPUS];
const volatile u32 nr_layers = 1;
const volatile u32 nr_cpusets = 1;
const volatile u32 nr_nodes = 32; /* !0 for veristat, set during init */
const volatile u32 nr_llcs = 32; /* !0 for veristat, set during init */
const volatile bool smt_enabled = true;
const volatile bool enable_cpuset = true;
const volatile bool has_little_cores = true;
const volatile bool xnuma_preemption = false;
const volatile s32 __sibling_cpu[MAX_CPUS];
@@ -53,6 +55,7 @@ const volatile u64 lo_fb_wait_ns = 5000000; /* !0 for veristat */
const volatile u32 lo_fb_share_ppk = 128; /* !0 for veristat */
const volatile bool percpu_kthread_preempt = true;
volatile u64 layer_refresh_seq_avgruntime;
const volatile u64 cpuset_fakemasks[MAX_CPUSETS][MAX_CPUS / 64];

/* Flag to enable or disable antistall feature */
const volatile bool enable_antistall = true;
@@ -80,6 +83,20 @@ u32 layered_root_tgid = 0;
u32 empty_layer_ids[MAX_LAYERS];
u32 nr_empty_layer_ids;


struct cpumask_wrapper {
struct bpf_cpumask __kptr *mask;
};

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__uint(max_entries, MAX_CPUSETS);
__type(key, u32);
__type(value, struct cpumask_wrapper);
} cpuset_cpumask SEC(".maps");



UEI_DEFINE(uei);

struct task_hint {
@@ -502,6 +519,7 @@ struct task_ctx {
struct bpf_cpumask __kptr *layered_unprotected_mask;
bool all_cpus_allowed;
bool cpus_node_aligned;
bool cpus_cpuset_aligned;
u64 runnable_at;
u64 running_at;
u64 runtime_avg;
@@ -1381,9 +1399,11 @@ void BPF_STRUCT_OPS(layered_enqueue, struct task_struct *p, u64 enq_flags)
* without making the whole scheduler node aware and should only be used
* with open layers on non-saturated machines to avoid possible stalls.
*/
Contributor: Please update the comment to explain cpus_cpuset_aligned.
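A possible wording, sketched here as an assumption about the field's meaning based on the code below (not the author's text):

```c
/*
 * cpus_cpuset_aligned: the task's cpumask exactly equals one of the
 * configured cpusets, so, like node-aligned tasks, it does not need to be
 * routed to the lo fallback DSQ below.
 */
```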

- if ((!taskc->all_cpus_allowed &&
- !(layer->allow_node_aligned && taskc->cpus_node_aligned)) ||
- !layer->nr_cpus) {
+ if ((!taskc->all_cpus_allowed &&
+ !((layer->allow_node_aligned && taskc->cpus_node_aligned) ||
+ (enable_cpuset && taskc->cpus_cpuset_aligned)))
+ || !layer->nr_cpus) {

taskc->dsq_id = task_cpuc->lo_fb_dsq_id;
/*
* Start a new lo fallback queued region if the DSQ is empty.
@@ -2641,7 +2661,7 @@ void BPF_STRUCT_OPS(layered_set_weight, struct task_struct *p, u32 weight)
static void refresh_cpus_flags(struct task_ctx *taskc,
const struct cpumask *cpumask)
{
- u32 node_id;
+ u32 node_id, cpuset_id;

if (!all_cpumask) {
scx_bpf_error("NULL all_cpumask");
@@ -2658,7 +2678,7 @@ static void refresh_cpus_flags(struct task_ctx *taskc,

if (!(nodec = lookup_node_ctx(node_id)) ||
!(node_cpumask = cast_mask(nodec->cpumask)))
- return;
+ break;
Contributor: This is an scx_bpf_error() condition. There's no point in continuing.


/* not llc aligned if partially overlaps */
if (bpf_cpumask_intersects(node_cpumask, cpumask) &&
@@ -2667,6 +2687,21 @@ static void refresh_cpus_flags(struct task_ctx *taskc,
break;
}
}
if (enable_cpuset) {
Contributor: Maybe a blank line above?

bpf_for(cpuset_id, 0, nr_cpusets) {
struct cpumask_wrapper* wrapper;
Contributor: Blank line.

wrapper = bpf_map_lookup_elem(&cpuset_cpumask, &cpuset_id);
if (!wrapper || !wrapper->mask) {
scx_bpf_error("error marking tasks as cpuset aligned");
return;
}
if (bpf_cpumask_equal(cast_mask(wrapper->mask), cpumask)) {
taskc->cpus_cpuset_aligned = true;
return;
Contributor: break; here so that it's consistent with the node-aligned block and the function can be extended in the future? Note that this would require moving the false setting. BTW, why not use the same partial-overlap test used by the node alignment check instead of the equality test? Is that not sufficient for the forward-progress guarantee? If not, it'd probably be worthwhile to explain why.
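A sketch of the overlap-based variant the comment describes, assuming it mirrors the node-alignment block above (not part of the PR; the forward-progress question raised above would still need an answer):

```c
/* Hypothetical: default to aligned and clear the flag on a partial overlap,
 * mirroring the node-alignment test. Sketch only, not part of the PR. */
taskc->cpus_cpuset_aligned = true;
bpf_for(cpuset_id, 0, nr_cpusets) {
	struct cpumask_wrapper *wrapper;

	wrapper = bpf_map_lookup_elem(&cpuset_cpumask, &cpuset_id);
	if (!wrapper || !wrapper->mask) {
		scx_bpf_error("error marking tasks as cpuset aligned");
		return;
	}
	/* not cpuset aligned if the task's mask partially overlaps this cpuset */
	if (bpf_cpumask_intersects(cast_mask(wrapper->mask), cpumask) &&
	    !bpf_cpumask_subset(cpumask, cast_mask(wrapper->mask))) {
		taskc->cpus_cpuset_aligned = false;
		break;
	}
}
```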

}
}
taskc->cpus_cpuset_aligned = false;
}
}

static int init_cached_cpus(struct cached_cpus *ccpus)
@@ -3334,8 +3369,10 @@ static s32 init_cpu(s32 cpu, int *nr_online_cpus,

s32 BPF_STRUCT_OPS_SLEEPABLE(layered_init)
{
- struct bpf_cpumask *cpumask, *tmp_big_cpumask, *tmp_unprotected_cpumask;
- int i, nr_online_cpus, ret;
+ struct bpf_cpumask *cpumask, *tmp_big_cpumask, *tmp_unprotected_cpumask,
+ *tmp_cpuset_cpumask, *tmp_swap_dst_cpumask;
+ int i, j, cpu, nr_online_cpus, ret;
+ struct cpumask_wrapper* cpumask_wrapper;

cpumask = bpf_cpumask_create();
if (!cpumask)
@@ -3377,6 +3414,57 @@ s32 BPF_STRUCT_OPS_SLEEPABLE(layered_init)
if (tmp_unprotected_cpumask)
bpf_cpumask_release(tmp_unprotected_cpumask);



Contributor: Too many blank lines.

if (enable_cpuset) {
bpf_for(i, 0, nr_cpusets) {
cpumask = bpf_cpumask_create();

Contributor: It's also customary not to have a blank line between setting a variable and testing it. In scheduler BPF code, we've been doing if ((var = expression)) a lot, so maybe adopt that style?

if (!cpumask)
return -ENOMEM;
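A minimal sketch of the style the comment refers to, applied to this allocation (an illustration, not part of the PR):

```c
/* combine the assignment and the NULL check, as done elsewhere in the BPF code */
if (!(cpumask = bpf_cpumask_create()))
	return -ENOMEM;
```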

bpf_for(j, 0, MAX_CPUS/64) {
Contributor: Maybe add comments explaining what each block is doing?

bpf_for(cpu, 0, 64) {
if (i < 0 || i >= MAX_CPUSETS) {
bpf_cpumask_release(cpumask);
return -1;
}
if (cpuset_fakemasks[i][j] & (1LLU << cpu)) {
bpf_cpumask_set_cpu((MAX_CPUS/64 - j - 1) * 64 + cpu, cpumask);
Contributor: So AFAICT the cpumask should fit all node cpumasks? This looks like it works because we clobber vmlinux.h to hold 128 64-bit numbers, which is fine to bypass verifier behavior but I'm not sure is great to depend on.

Contributor (author): Ideally we'll rip out all the unprintable trusted-ptr cpumasks in favor of printable arena cpumasks eventually, I think. I really wish these were printable...

Contributor: dump_layer_cpumask() prints the trusted cpumasks. It shouldn't be too difficult to separate out a generic helper from it.

Contributor: This loop is unnecessarily convoluted. Just iterate MAX_CPUS and index cpuset_fakemasks accordingly?

}

}
}
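A sketch of the flatter iteration suggested above. The reversed word order is kept so the result matches how userspace packs cpuset_fakemasks, the existing bounds check on i would still be needed, and the extra check on word is only there to help the verifier (assumptions, not part of the PR):

```c
/* Hypothetical simplification: a single loop over CPU numbers, indexing
 * cpuset_fakemasks[i] by word and bit. Sketch only, not part of the PR. */
bpf_for(cpu, 0, MAX_CPUS) {
	u32 word = MAX_CPUS / 64 - cpu / 64 - 1;	/* keep the PR's reversed word order */

	if (word >= MAX_CPUS / 64)			/* helps the verifier bound the index */
		continue;
	if (cpuset_fakemasks[i][word] & (1LLU << (cpu % 64)))
		bpf_cpumask_set_cpu(cpu, cpumask);
}
```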

// pay init cost once for faster lookups later.
Contributor: Why do we need per-cpu copies? Can't this be part of cpu_ctx? Note that percpu maps have a limited number of hot-caches per task and a performance cliff beyond that.
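For reference, a sketch of the non-per-cpu alternative implied by the question: a plain array map holding one shared copy per cpuset (whether the per-cpu copies buy enough locality to justify themselves is exactly what is being asked). This is an assumption, not part of the PR:

```c
/* Hypothetical alternative: one shared cpumask per cpuset instead of per-CPU
 * copies; lookups would then use bpf_map_lookup_elem() everywhere. */
struct {
	__uint(type, BPF_MAP_TYPE_ARRAY);
	__uint(max_entries, MAX_CPUSETS);
	__type(key, u32);
	__type(value, struct cpumask_wrapper);
} cpuset_cpumask SEC(".maps");
```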

bpf_for(cpu, 0, nr_possible_cpus) {
cpumask_wrapper = bpf_map_lookup_percpu_elem(&cpuset_cpumask, &i, cpu);
tmp_cpuset_cpumask = bpf_cpumask_create();

if (!cpumask || !tmp_cpuset_cpumask || !cpumask_wrapper) {
if (cpumask)
bpf_cpumask_release(cpumask);
if (tmp_cpuset_cpumask)
bpf_cpumask_release(tmp_cpuset_cpumask);
scx_bpf_error("cpumask is null");
return -1;
}

bpf_cpumask_copy(tmp_cpuset_cpumask, cast_mask(cpumask));

tmp_swap_dst_cpumask = bpf_kptr_xchg(&cpumask_wrapper->mask, tmp_cpuset_cpumask);

if (tmp_swap_dst_cpumask)
bpf_cpumask_release(tmp_swap_dst_cpumask);

}

if (cpumask)
bpf_cpumask_release(cpumask);

}
}

bpf_for(i, 0, nr_nodes) {
ret = create_node(i);
if (ret)
6 changes: 3 additions & 3 deletions scheds/rust/scx_layered/src/layer_core_growth.rs
@@ -89,8 +89,8 @@ use std::collections::BTreeSet;

#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct CpuSet {
- cpus: BTreeSet<usize>,
- cores: BTreeSet<usize>,
+ pub cpus: BTreeSet<usize>,
+ pub cores: BTreeSet<usize>,
}

fn parse_cpu_ranges(s: &str) -> Result<BTreeSet<usize>> {
@@ -126,7 +126,7 @@ fn collect_cpuset_effective() -> Result<BTreeSet<BTreeSet<usize>>> {
}

// return cpuset layout.
- fn get_cpusets(topo: &Topology) -> Result<BTreeSet<CpuSet>> {
+ pub fn get_cpusets(topo: &Topology) -> Result<BTreeSet<CpuSet>> {
let mut cpusets: BTreeSet<CpuSet> = BTreeSet::new();
let cpuset_cpus = collect_cpuset_effective()?;
for x in cpuset_cpus {
4 changes: 2 additions & 2 deletions scheds/rust/scx_layered/src/lib.rs
@@ -3,7 +3,7 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod config;
- mod layer_core_growth;
+ pub mod layer_core_growth;

pub mod bpf_intf;

@@ -189,7 +189,7 @@ impl CpuPool {
cpus
}

- fn get_core_topological_id(&self, core: &Core) -> usize {
+ pub fn get_core_topological_id(&self, core: &Core) -> usize {
*self
.core_topology_to_id
.get(&(core.node_id, core.llc_id, core.id))
39 changes: 39 additions & 0 deletions scheds/rust/scx_layered/src/main.rs
@@ -27,6 +27,7 @@ use anyhow::Result;
pub use bpf_skel::*;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use layer_core_growth::get_cpusets;
use lazy_static::lazy_static;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
@@ -67,6 +68,7 @@ const MIN_LAYER_WEIGHT: u32 = bpf_intf::consts_MIN_LAYER_WEIGHT;
const MAX_LAYER_MATCH_ORS: usize = bpf_intf::consts_MAX_LAYER_MATCH_ORS as usize;
const MAX_LAYER_NAME: usize = bpf_intf::consts_MAX_LAYER_NAME as usize;
const MAX_LAYERS: usize = bpf_intf::consts_MAX_LAYERS as usize;
const MAX_CPUS: usize = bpf_intf::consts_MAX_CPUS as usize;
const DEFAULT_LAYER_WEIGHT: u32 = bpf_intf::consts_DEFAULT_LAYER_WEIGHT;
const USAGE_HALF_LIFE: u32 = bpf_intf::consts_USAGE_HALF_LIFE;
const USAGE_HALF_LIFE_F64: f64 = USAGE_HALF_LIFE as f64 / 1_000_000_000.0;
@@ -589,6 +591,10 @@ struct Opts {
#[clap(long, default_value = "false")]
disable_antistall: bool,

/// Enable cpuset support
#[clap(long, default_value = "false")]
enable_cpuset: bool,

/// Maximum task runnable_at delay (in seconds) before antistall turns on
#[clap(long, default_value = "3")]
antistall_sec: u64,
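Assuming clap's default kebab-case flag naming for the enable_cpuset option added above, cpuset support would be enabled at runtime by passing --enable-cpuset to scx_layered; it defaults to off.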
@@ -1424,6 +1430,34 @@ impl<'a> Scheduler<'a> {
Ok(())
}

fn init_cpusets(skel: &mut OpenBpfSkel, topo: &Topology) -> Result<()> {
let cpusets = get_cpusets(topo)?;
for (i, cpuset) in cpusets.iter().enumerate() {
debug!("a cpuset is: {:#?}", cpuset.clone());
let mut cpumask_bitvec: [u64; MAX_CPUS / 64] = [0; MAX_CPUS / 64];
for chunk_idx in 0..cpumask_bitvec.len() {
let mut cpumask_chunk = 0;
for cpu in 0..64 {
if cpuset.cpus.contains(&(chunk_idx * 64 + cpu)) {
cpumask_chunk |= 1 << cpu;
}
}
cpumask_bitvec[cpumask_bitvec.len() - chunk_idx - 1] = cpumask_chunk;
}
let hex_string = cpumask_bitvec
.clone()
.iter()
.map(|b| format!("{:02x}", b))
.collect::<String>();
debug!("a cpuset cpumask is: {}", hex_string);

let cpuset_cpumask_slice = &mut skel.maps.rodata_data.cpuset_fakemasks[i];
cpuset_cpumask_slice.copy_from_slice(&cpumask_bitvec);
}
skel.maps.rodata_data.nr_cpusets = cpusets.len() as u32;
Ok(())
}
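An illustrative walk-through of the packing above (example values only): CPU 5 falls in chunk_idx 0, bit 5, and this loop stores that chunk at cpumask_bitvec[MAX_CPUS/64 - 1]; on the BPF side, layered_init undoes the reversal with (MAX_CPUS/64 - j - 1) * 64 + cpu, so the bit is applied back to CPU 5.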

fn init_nodes(skel: &mut OpenBpfSkel, _opts: &Opts, topo: &Topology) {
skel.maps.rodata_data.nr_nodes = topo.nodes.len() as u32;
skel.maps.rodata_data.nr_llcs = 0;
@@ -1876,6 +1910,7 @@ impl<'a> Scheduler<'a> {
skel.maps.rodata_data.lo_fb_wait_ns = opts.lo_fb_wait_us * 1000;
skel.maps.rodata_data.lo_fb_share_ppk = ((opts.lo_fb_share * 1024.0) as u32).clamp(1, 1024);
skel.maps.rodata_data.enable_antistall = !opts.disable_antistall;
skel.maps.rodata_data.enable_cpuset = opts.enable_cpuset;
skel.maps.rodata_data.enable_gpu_support = opts.enable_gpu_support;

for (cpu, sib) in topo.sibling_cpus().iter().enumerate() {
@@ -1944,6 +1979,10 @@ impl<'a> Scheduler<'a> {
Self::init_layers(&mut skel, &layer_specs, &topo)?;
Self::init_nodes(&mut skel, opts, &topo);

if opts.enable_cpuset {
Self::init_cpusets(&mut skel, &topo)?;
}

// We set the pin path before loading the skeleton. This will ensure
// libbpf creates and pins the map, or reuses the pinned map fd for us,
// so that we can keep reusing the older map already pinned on scheduler