Balance pipelines when unscaled capacity of individual nodes is exceeded #5755

@@ -216,12 +216,14 @@ fn attempt_place_unassigned_shards(
) -> Result<SchedulingSolution, NotEnoughCapacity> {
let mut solution = partial_solution.clone();
for source in unassigned_shards {
let indexers_with_most_available_capacity =
compute_indexer_available_capacity(problem, &solution)
.sorted_by_key(|(indexer_ord, capacity)| Reverse((*capacity, *indexer_ord)));
let mut indexers_with_most_available_capacity =
compute_indexer_available_capacity(problem, &solution).collect_vec();
indexers_with_most_available_capacity
.sort_by_key(|(indexer_ord, capacity)| Reverse((*capacity, *indexer_ord)));
place_unassigned_shards_single_source(
source,
indexers_with_most_available_capacity,
problem.unscaled_indexer_cpu_capacities(),
&mut solution,
)?;
}
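
Note on the change above: collecting the capacities into a Vec (instead of consuming a lazily sorted iterator) is what enables the multi-pass placement introduced below, since the remaining capacities now have to be re-traversed and mutated on every iteration.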
@@ -241,7 +243,7 @@ fn place_unassigned_shards_with_affinity(
for source in &unassigned_shards {
// List of indexers with a non-zero affinity and some available capacity, sorted by
// (affinity, available capacity) in that order.
let indexers_with_affinity_and_available_capacity = source
let indexers_with_available_capacity = source
.affinities
.iter()
.filter(|&(_, &affinity)| affinity != 0u32)
@@ -254,10 +256,12 @@
.sorted_by_key(|(indexer_ord, affinity, capacity)| {
Reverse((*affinity, *capacity, *indexer_ord))
})
.map(|(indexer_ord, _, capacity)| (indexer_ord, capacity));
.map(|(indexer_ord, _, capacity)| (indexer_ord, capacity))
.collect_vec();
let _ = place_unassigned_shards_single_source(
source,
indexers_with_affinity_and_available_capacity,
indexers_with_available_capacity,
problem.unscaled_indexer_cpu_capacities(),
solution,
);
}
@@ -346,26 +350,73 @@ struct NotEnoughCapacity;
/// amongst the nodes with their given node capacity.
fn place_unassigned_shards_single_source(
source: &Source,
mut indexer_with_capacities: impl Iterator<Item = (IndexerOrd, CpuCapacity)>,
mut indexer_with_capacities: Vec<(IndexerOrd, CpuCapacity)>,
unscaled_capacities: &[CpuCapacity],
solution: &mut SchedulingSolution,
) -> Result<(), NotEnoughCapacity> {
let mut num_shards = source.num_shards;
while num_shards > 0 {
let Some((indexer_ord, available_capacity)) = indexer_with_capacities.next() else {
return Err(NotEnoughCapacity);
};
let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard;
let num_shards_to_place = num_placable_shards.min(num_shards);
let mut previous_num_shards = u32::MAX;
while previous_num_shards > num_shards {
previous_num_shards = num_shards;
let indexer_with_capacities_iter = indexer_with_capacities
.iter_mut()
.map(|(indexer_ord, available_capacity)| (*indexer_ord, available_capacity));
place_unassigned_shards_single_source_iteration(
source,
&mut num_shards,
indexer_with_capacities_iter,
unscaled_capacities,
solution,
);
if num_shards == 0 {
// All shards have been placed.
return Ok(());
}
}
// The last placement iteration didn't make progress;
// we won't be able to place the remaining shards.
Err(NotEnoughCapacity)
}

/// Places as many shards as possible onto indexers while respecting both the
/// remaining scaled node capacities and the original unscaled node capacities.
fn place_unassigned_shards_single_source_iteration<'a>(
source: &Source,
remaining_shards_to_place: &mut u32,
indexer_with_capacities: impl Iterator<Item = (IndexerOrd, &'a mut CpuCapacity)>,
unscaled_capacities: &[CpuCapacity],
solution: &mut SchedulingSolution,
) {
for (indexer_ord, available_capacity) in indexer_with_capacities {
if *remaining_shards_to_place == 0 {
return;
}
let num_placable_shards_into_scaled_capacity =
available_capacity.cpu_millis() / source.load_per_shard;

// We limit each node's shard allocation per iteration to what fits in
// its original capacity. This distributes shards more evenly across
// nodes when the system capacity is over-subscribed. If a single
// shard's load doesn't fit into the original capacity, we still allow
// one shard to be placed.
let num_placable_shards_into_original_capacity =
(unscaled_capacities[indexer_ord].cpu_millis() / source.load_per_shard).max(1);

let num_shards_to_place = num_placable_shards_into_scaled_capacity
.min(num_placable_shards_into_original_capacity)
.min(*remaining_shards_to_place);

// Update the solution, the shard load, and the number of shards to place.
if num_shards_to_place == 0u32 {
// No need to fill indexer_assignments with empty assignments.
continue;
}
solution.indexer_assignments[indexer_ord]
.add_shards(source.source_ord, num_shards_to_place);
num_shards -= num_shards_to_place;
*remaining_shards_to_place -= num_shards_to_place;
*available_capacity = *available_capacity
- CpuCapacity::from_cpu_millis(num_shards_to_place * source.load_per_shard.get());
}
Ok(())
}

/// Compute the sources/shards that have not been assigned to any indexer yet.
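
To make the capping behavior concrete, here is a minimal, self-contained sketch of the iteration logic above. Everything in it is a simplified stand-in (plain u32 millicpu values instead of CpuCapacity and Source, hypothetical numbers), not the actual quickwit API:

// Simplified stand-in for the two functions above: capacities are plain
// millicpu values, and a "source" is just a shard count plus a load per shard.
fn place_iteration(
    load_per_shard: u32,
    remaining_shards: &mut u32,
    scaled_capacities: &mut [u32], // remaining capacities after solver scaling
    unscaled_capacities: &[u32],   // original node capacities
    assignments: &mut [u32],       // shards placed per node so far
) {
    for ord in 0..scaled_capacities.len() {
        if *remaining_shards == 0 {
            return;
        }
        let fits_scaled = scaled_capacities[ord] / load_per_shard;
        // The cap: one pass never places more than the *original* capacity
        // can hold, but always allows at least one shard.
        let fits_unscaled = (unscaled_capacities[ord] / load_per_shard).max(1);
        let to_place = fits_scaled.min(fits_unscaled).min(*remaining_shards);
        assignments[ord] += to_place;
        scaled_capacities[ord] -= to_place * load_per_shard;
        *remaining_shards -= to_place;
    }
}

fn main() {
    // Two nodes scaled up to 12_000 millicpus (8_000 originally), and one
    // source of 3 shards at 4_000 millicpus per shard.
    let load_per_shard = 4_000u32;
    let mut remaining_shards = 3u32;
    let mut scaled_capacities = [12_000u32, 12_000];
    let unscaled_capacities = [8_000u32, 8_000];
    let mut assignments = [0u32, 0];
    let mut previous = u32::MAX;
    while remaining_shards > 0 && previous > remaining_shards {
        previous = remaining_shards;
        place_iteration(
            load_per_shard,
            &mut remaining_shards,
            &mut scaled_capacities,
            &unscaled_capacities,
            &mut assignments,
        );
    }
    // With the cap, the first pass places 2 shards on node 0 and 1 on node 1.
    // A single greedy pass against the scaled capacity would instead have put
    // all 3 shards on node 0.
    assert_eq!(assignments, [2, 1]);
}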
@@ -419,7 +470,7 @@ mod tests {
use std::num::NonZeroU32;

use proptest::prelude::*;
use quickwit_proto::indexing::mcpu;
use quickwit_proto::indexing::{PIPELINE_FULL_CAPACITY, mcpu};

use super::*;

@@ -783,4 +834,28 @@ mod tests {
solve(problem, solution);
}
}

#[test]
fn test_oversubscribing_sources_get_balanced() {
let mut problem: SchedulingProblem = SchedulingProblem::with_indexer_cpu_capacities(vec![
mcpu(8000),
mcpu(8000),
mcpu(8000),
mcpu(8000),
]);
for _ in 0..12 {
problem.add_source(
4,
NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis()).unwrap(),
);
}

let old_solution = problem.new_solution();
let solution = solve(problem, old_solution);
for assignment in &solution.indexer_assignments {
for &num_shards in assignment.num_shards_per_source.values() {
assert_eq!(num_shards, 2);
}
}
}
}
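
A note on the expected numbers in this test, assuming PIPELINE_FULL_CAPACITY is 4_000 millicpus: the 12 sources of 4 shards each request 192_000 millicpus against 32_000 millicpus of real capacity, so the solver has to scale node capacities well past their original size. The per-iteration cap nevertheless limits each pass to 8_000 / 4_000 = 2 shards per source per node, so each source's 4 shards end up as 2 + 2 on two nodes, which is exactly what the assertion checks.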
@@ -78,6 +78,7 @@ impl Source {
pub struct SchedulingProblem {
sources: Vec<Source>,
indexer_cpu_capacities: Vec<CpuCapacity>,
unscaled_cpu_capacities: Vec<CpuCapacity>,
}

impl SchedulingProblem {
@@ -97,6 +98,7 @@ impl SchedulingProblem {
// TODO assert for affinity.
SchedulingProblem {
sources: Vec::new(),
unscaled_cpu_capacities: indexer_cpu_capacities.clone(),
indexer_cpu_capacities,
}
}
@@ -109,6 +111,11 @@
self.indexer_cpu_capacities[indexer_ord]
}

/// Gets the original cpu capacities before scaling.
pub fn unscaled_indexer_cpu_capacities(&self) -> &[CpuCapacity] {
&self.unscaled_cpu_capacities
}

/// Scales the cpu capacity by the given scaling factor.
///
/// The resulting cpu capacities are ceiled to the next integer millicpu value.
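
A short, hypothetical usage sketch of the new accessor; `scale_node_capacities` is an assumed name for the scaling method whose body is collapsed out of the diff above:

// Hypothetical usage: `scale_node_capacities` is an assumption, the other
// calls appear in the diff above.
let mut problem = SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(4_000), mcpu(8_000)]);
problem.scale_node_capacities(1.5f32);
// Placement decisions work against the scaled capacities...
assert_eq!(problem.indexer_cpu_capacity(0), mcpu(6_000));
// ...while the per-iteration cap reads the original, unscaled ones.
assert_eq!(problem.unscaled_indexer_cpu_capacities()[0], mcpu(4_000));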