Indexing scheduling unbalanced for Kafka source #5747

Open
rdettai opened this issue Apr 15, 2025 · 8 comments · May be fixed by #5755
Labels
bug Something isn't working

Comments

@rdettai
Collaborator

rdettai commented Apr 15, 2025

Describe the bug
When using a SourceToScheduleType::NonSharded source (e.g. Kafka), the current implementation of the indexing scheduler seems to systematically collocate all pipelines of a given source on the same indexer. For the Kafka source, this prevents distributing the indexing load of a given topic across indexers.

Note that this problem was already reported here. The proposed solution of setting a small cpu_capacity does not work because the scheduler scales the capacities to fit the workload before assigning the pipelines to the nodes.

Steps to reproduce (if applicable)
See test in comments.

Expected behavior
Pipelines with high throughputs should be more or less evenly distributed across indexers.

Possible solutions

  1. measure the actual load for each Kafka source (currently hardcoded to 4 CPUs) and use that for scheduling. This increases the risk of entering a rebalancing ping pong between the control plane and the Kafka rebalancing protocol.
  2. for each source, first limit the max number of pipelines that can be assigned to each node according to its unscaled original capacity (see the sketch after this list).
  3. (variant of 2) re-introduce a source parameter like max_num_pipelines_per_indexer so that users can at least manually force the distribution of the load for a given source/topic across nodes. This parameter would be pretty hard to configure properly (and hard to maintain for fluctuating workloads).

EDIT:
4) add a "num cpu per pipeline" parameter to the source, to make it possible to inform Quickwit that some Kafka topics do not require such a large amount of CPU.
5) (variant of 4) add an "average data rate" parameter to the source, which would have the same effect as "num cpu per pipeline" but would be easier for the user to configure (Quickwit internally converts the bandwidth to CPUs).
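
A minimal sketch of option 2, deriving a per-indexer pipeline cap from the unscaled node capacity before the scheduler scales capacities to fit the workload. The helper name and numbers below are hypothetical illustrations, not the actual scheduler API:

// Max pipelines of a single source that one indexer should host, given its
// original (unscaled) CPU capacity and the per-pipeline load estimate.
fn max_pipelines_per_indexer(indexer_cpu_millis: u32, pipeline_cpu_millis: u32) -> u32 {
    // Always allow at least one pipeline so small nodes remain usable.
    (indexer_cpu_millis / pipeline_cpu_millis).max(1)
}

fn main() {
    // An 8-CPU indexer with pipelines estimated at the hardcoded 4 CPUs:
    // the cap is 2, so a 4-pipeline source has to span at least 2 indexers.
    assert_eq!(max_pipelines_per_indexer(8_000, 4_000), 2);
}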

Configuration:
Main (but same behavior in 0.8).

@rdettai rdettai added the bug Something isn't working label Apr 15, 2025
@rdettai
Collaborator Author

rdettai commented Apr 15, 2025

Rationale behind the current behavior:

  • Colocating pipelines for a source on the same node is important to improve merge quality
  • We favor a constant default for the estimated pipeline resource because we'd rather have a stable indexing plan than take the risk of entering rebalancing ping pong between the control plane and Kafka

@rdettai rdettai changed the title Indexing scheduling broken for Kafka source Indexing scheduling unbalanced for Kafka source Apr 16, 2025
@rdettai
Collaborator Author

rdettai commented Apr 16, 2025

Here is an interesting behavior of the scheduler.

Let's assign one source with four pipelines to a cluster with 4 nodes (8 CPUs each):

let mut problem: SchedulingProblem = SchedulingProblem::with_indexer_cpu_capacities(vec![
    mcpu(8000),
    mcpu(8000),
    mcpu(8000),
    mcpu(8000),
]);
problem.add_source(
    4,
    NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis()).unwrap(),
);
let old_solution = problem.new_solution();
let solution = solve(problem, old_solution);
for assignment in &solution.indexer_assignments {
    println!("{:?}", assignment);
}

Output:
IndexerAssignment { indexer_ord: 0, num_shards_per_source: {0: 1} }
IndexerAssignment { indexer_ord: 1, num_shards_per_source: {0: 1} }
IndexerAssignment { indexer_ord: 2, num_shards_per_source: {0: 1} }
IndexerAssignment { indexer_ord: 3, num_shards_per_source: {0: 1} }

This shows that when the capacity is not exceeded, pipelines are evenly balanced across nodes. We should have colocated at least 2 pipelines per indexer according to the principle stated above:

Colocating pipelines for a source on the same node is important to improve merge quality

Now if we increase the number of sources to 4 (same cluster capacity):

let mut problem: SchedulingProblem = SchedulingProblem::with_indexer_cpu_capacities(vec![
    mcpu(8000),
    mcpu(8000),
    mcpu(8000),
    mcpu(8000),
]);
for _ in 0..4 {
    problem.add_source(
        4,
        NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis()).unwrap(),
    );
}
let old_solution = problem.new_solution();
let solution = solve(problem, old_solution);
for assignment in &solution.indexer_assignments {
    println!("{:?}", assignment);
}

Output:
IndexerAssignment { indexer_ord: 0, num_shards_per_source: {3: 4} }
IndexerAssignment { indexer_ord: 1, num_shards_per_source: {2: 4} }
IndexerAssignment { indexer_ord: 2, num_shards_per_source: {1: 4} }
IndexerAssignment { indexer_ord: 3, num_shards_per_source: {0: 4} }

Each source gets all its partitions assigned to a single node, presumably because the total requested load (16 pipelines × 4 CPUs = 64 CPUs) now exceeds the cluster capacity (32 CPUs): the capacities are scaled to fit the workload, and the colocation preference takes over.

@hardboiled

Hey @rdettai, would there be a benefit in adding some sort of affinity config for pipelines? If you know that some topics will be pushing a lot more data than others, you could instruct the Quickwit indexers to share the load more; whereas, if the ingest variance across topics is fairly uniform, the dev would just leave the default config.

@rdettai
Collaborator Author

rdettai commented Apr 23, 2025

Do you have a specific UX in mind? One would be to specify the expected bandwidth for the topic. But I feel it would be "stressful" for users who know that their workloads vary a lot over time.

The benefit of this PR is that it doesn't add an extra configuration, and it effectively allows spreading the load. For instance, if you have 8-CPU nodes, setting a source to num_pipeline=8 would spread it across up to 4 nodes (a pipeline is estimated at 4 CPUs, so capacity forces spreading as soon as a node would otherwise host more than 2 pipelines), as sketched below.
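
To make the arithmetic concrete, here is a sketch of that scenario reusing the same test helpers as the snippets above (one source with 8 pipelines on four 8-CPU nodes). I have not run it, so the 2-pipelines-per-node outcome is the expected result rather than a verified one:

let mut problem: SchedulingProblem = SchedulingProblem::with_indexer_cpu_capacities(vec![
    mcpu(8000),
    mcpu(8000),
    mcpu(8000),
    mcpu(8000),
]);
// One Kafka-like source with 8 pipelines, each estimated at the full 4 CPUs.
problem.add_source(
    8,
    NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis()).unwrap(),
);
let old_solution = problem.new_solution();
let solution = solve(problem, old_solution);
for assignment in &solution.indexer_assignments {
    // Expected: each indexer ends up with {0: 2}, i.e. 2 pipelines per node.
    println!("{:?}", assignment);
}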

@fulmicoton-dd
Collaborator

fulmicoton-dd commented Apr 23, 2025

I think the ticket does not really finish describing the problem.
The reason we are collocating pipelines from the same index on the same indexer is to get better pruning at search time.
This is not a bug: this is a feature.

The purpose of the CPU capacity parameter on indexers is to act as a throttle: we accept some imbalance of load precisely when it helps improve this pruning. Setting it to 0 has the effect of favoring balance.

Unfortunately this balance is only measured in terms of pipelines. Quickwit has a mechanism to assess the throughput of a pipeline, but we never use it for Kafka. We judge that, even more than ingest v2, Kafka is more subject to finding itself in catch-up mode rather than indexing at the horizon.

So the issue with collocating pipelines from the same index on the same indexer is that the load of Kafka pipelines can be very unequal, which leads to an imbalanced workload across the different nodes.
This problem is not caused by that behavior, it is amplified by it.

I advise against 1. for the reason already discussed.

2 only partially solves the problem (e.g. 4 nodes, 3 indexes with 1 pipeline each indexing at 20 MB/s, and 12 indexes with a throughput of 100 KB/s).

For exhaustiveness: right now I believe we do not know how to run two merges concurrently, so a merge pipeline could rapidly end up being the bottleneck if we had ~3 indexing pipelines for the same index on the same node.
I cannot remember whether the control plane already has logic preventing this.

I would add:

Solution 4: add a "num cpu per pipeline" parameter, to make it possible to inform Quickwit that some Kafka topics do not require such a large amount of CPU.

@daniele-br

daniele-br commented Apr 28, 2025

Solution 4: add a "num cpu per pipeline" parameter, to make it possible to inform Quickwit that some Kafka topics do not require such a large amount of CPU.

OK, but just to be clear, we're in the opposite situation. We have 1-2 topics that have 70% of the data and a lower amount of data on the other 20 topics. Not sure what implications that has.

@fulmicoton-dd
Collaborator

@daniele-br This was my understanding. You would then have to mark all the topics with a lower amount of data as having a lower amount of data. I assume your comment was about reporting the oddity of having to "special case" the "majority".

@rdettai
Collaborator Author

rdettai commented Apr 29, 2025

2 only partially solves the problem (e.g. 4 nodes, 3 indexes with 1 pipeline each indexing at 20 MB/s, and 12 indexes with a throughput of 100 KB/s).

@fulmicoton-dd Yes, it does not address this specific workload.

  • Don't you think that adding this extra logic is still worth it? Even if we add an extra parameter (either (4) or (5)), it would still be useful to improve the default behavior when the parameter is not specified by the user. I agree that it makes the scheduling more complex, which is something we might want to avoid, but besides that I think it is a net improvement.
  • If we add an extra parameter to hint the CPU usage of each pipeline, should we have expected_cpus_per_pipeline: CpuCapacity or expected_throughput_per_pipeline: ByteSize? Exposing the config in terms of CPU implies that the user is more aware of the internals of how pipelines use CPUs. It makes sense if we want to force the user to dig deeper before tweaking this. Exposing the config as expected bandwidth is closer to the user's perspective and thus easier for them to understand (a rough conversion sketch follows below).
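
For what option (5) could look like internally, here is a rough conversion sketch; the 20 MB/s nominal pipeline throughput, the clamping floor, and all names are assumptions for illustration, not actual Quickwit constants:

// CPU millis a pipeline would be scheduled with, given the user-provided
// expected throughput for the topic (in bytes per second).
fn cpu_millis_for_throughput(expected_bytes_per_sec: u64) -> u32 {
    const FULL_PIPELINE_CPU_MILLIS: u64 = 4_000; // the current hardcoded 4 CPUs
    const FULL_PIPELINE_BYTES_PER_SEC: u64 = 20 * 1024 * 1024; // assumed nominal throughput
    let cpu = expected_bytes_per_sec * FULL_PIPELINE_CPU_MILLIS / FULL_PIPELINE_BYTES_PER_SEC;
    // Clamp so a pipeline is never scheduled below a small floor or above a full pipeline.
    cpu.clamp(250, FULL_PIPELINE_CPU_MILLIS) as u32
}

fn main() {
    // A 2 MB/s topic would book ~400 mCPU instead of the full 4000 mCPU.
    println!("{} mCPU", cpu_millis_for_throughput(2 * 1024 * 1024));
}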
