-
Notifications
You must be signed in to change notification settings - Fork 0
Speed up streamed-proto query output by distributing work to multiple threads #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
6bd312b
41fbacf
0c9a1d2
913d4a3
9843a5e
9a0efa0
1852be0
89e8b3b
5fc8b13
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -13,18 +13,26 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// limitations under the License. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
package com.google.devtools.build.lib.query2.query.output; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.common.collect.Iterables; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.devtools.build.lib.packages.LabelPrinter; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.devtools.build.lib.packages.Target; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.devtools.build.lib.query2.proto.proto2api.Build; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.protobuf.CodedOutputStream; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.io.IOException; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.io.OutputStream; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.List; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.*; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import java.util.concurrent.atomic.AtomicBoolean; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* An output formatter that outputs a protocol buffer representation of a query result and outputs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* the proto bytes to the output print stream. By taking the bytes and calling {@code mergeFrom()} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* on a {@code Build.QueryResult} object the full result can be reconstructed. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public class StreamedProtoOutputFormatter extends ProtoOutputFormatter { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public String getName() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return "streamed_proto"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -34,13 +42,107 @@ public String getName() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public OutputFormatterCallback<Target> createPostFactoStreamCallback( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return new OutputFormatterCallback<Target>() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private static final int MAX_CHUNKS_IN_QUEUE = Runtime.getRuntime().availableProcessors() * 2; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private static final int TARGETS_PER_CHUNK = 500; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private final LabelPrinter ourLabelPrinter = labelPrinter; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void processOutput(Iterable<Target> partialResult) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throws IOException, InterruptedException { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (Target target : partialResult) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
toTargetProtoBuffer(target, labelPrinter).writeDelimitedTo(out); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ForkJoinTask<?> writeAllTargetsFuture; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try (ForkJoinPool executor = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
new ForkJoinPool( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Runtime.getRuntime().availableProcessors(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ForkJoinPool.defaultForkJoinWorkerThreadFactory, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
null, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// we use asyncMode to ensure the queue is processed FIFO, which maximizes | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// throughput | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
true)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
var stillAddingTargetsToQueue = new AtomicBoolean(true); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
writeAllTargetsFuture = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
executor.submit( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
() -> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Future<List<byte[]>> targets = targetQueue.take(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (byte[] target : targets.get()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
out.write(target); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (InterruptedException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw new WrappedInterruptedException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (IOException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw new WrappedIOException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (ExecutionException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// TODO: figure out what might be in here and propagate | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw new RuntimeException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+79
to
+81
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This Also, consider logging the exception with sufficient context to aid debugging. // TODO: figure out what might be in here and propagate
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw new WrappedIOException((IOException) cause);
} else if (cause instanceof InterruptedException) {
throw new WrappedInterruptedException((InterruptedException) cause);
} else {
throw new RuntimeException("Error during target processing", cause);
} |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets))); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} finally { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
stillAddingTargetsToQueue.set(false); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+88
to
+90
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After setting } finally {
stillAddingTargetsToQueue.set(false);
executor.shutdown(); // Initiate shutdown
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // Wait for tasks to complete
System.err.println("ForkJoinPool did not terminate in the specified timeout.");
// Optionally, log the state of the tasks that are still running.
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
} |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
writeAllTargetsFuture.get(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (ExecutionException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// TODO: propagate | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw new RuntimeException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+94
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the previous } catch (ExecutionException e) {
// TODO: propagate
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw new WrappedIOException((IOException) cause);
} else if (cause instanceof InterruptedException) {
throw new WrappedInterruptedException((InterruptedException) cause);
} else {
throw new RuntimeException("Error during target processing", cause);
} |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private List<byte[]> writeTargetsDelimitedToByteArrays(List<Target> targets) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return targets.stream().map(target -> writeDelimited(toProto(target))).toList(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private Build.Target toProto(Target target) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return toTargetProtoBuffer(target, ourLabelPrinter); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (InterruptedException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw new WrappedInterruptedException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private static byte[] writeDelimited(Build.Target targetProtoBuffer) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
var serializedSize = targetProtoBuffer.getSerializedSize(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
var output = new byte[headerSize + serializedSize]; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
var codedOut = CodedOutputStream.newInstance(output, headerSize, output.length - headerSize); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
targetProtoBuffer.writeTo(codedOut); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
codedOut.flush(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return output; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+114
to
+121
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Length-delimited framing is incorrect – header bytes are never written
- var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
- var output = new byte[headerSize + serializedSize];
- var codedOut = CodedOutputStream.newInstance(output, headerSize, output.length - headerSize);
- targetProtoBuffer.writeTo(codedOut);
+ int headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
+ byte[] output = new byte[headerSize + serializedSize];
+
+ // 1. write the var-int length prefix
+ CodedOutputStream headerOut = CodedOutputStream.newInstance(output, 0, headerSize);
+ headerOut.writeUInt32NoTag(serializedSize);
+ headerOut.flush();
+
+ // 2. write the message bytes immediately after the prefix
+ CodedOutputStream bodyOut =
+ CodedOutputStream.newInstance(output, headerSize, serializedSize);
+ targetProtoBuffer.writeTo(bodyOut);
+ bodyOut.flush(); Without this fix every emitted target is malformed, so downstream tools will silently break. 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} catch (IOException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
throw new WrappedIOException(e); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private static class WrappedIOException extends RuntimeException { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private WrappedIOException(IOException cause) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
super(cause); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public IOException getCause() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return (IOException) super.getCause(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+132
to
+134
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider whether wrapping the @Override
public IOException getCause() {
return cause;
} |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private static class WrappedInterruptedException extends RuntimeException { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private WrappedInterruptedException(InterruptedException cause) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
super(cause); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public InterruptedException getCause() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return (InterruptedException) super.getCause(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+144
to
+146
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to @Override
public InterruptedException getCause() {
return cause;
} |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
ForkJoinPool worker is blocking on
take()
/get()
– high risk of thread starvationThe consumer task runs inside the same
ForkJoinPool
that processes the producer tasks yet:targetQueue.take()
andFuture.get()
),processOutput
) to block onput()
.Because
ForkJoinPool
counts blocked workers towardparallelism
, the pool can dead-lock or under-utilise CPUs when many long-running producer tasks occupy the limited workers while the single consumer waits, or vice-versa.Recommended approaches:
Executors.newSingleThreadExecutor
) outside the FJP, orForkJoinPool.managedBlock
, orExecutorService
that tolerates blocking.This will eliminate the starvation risk and make behaviour more predictable.