Skip to content

Commit 17654b7

Browse files
aamcommit-bot@chromium.org
authored andcommitted
[vm/isolates] Introduce sendAndExit.
sendAndExit allows for fast data passing from worker isolate back to parent. ``` | linux x64 | spawnIsolate | sendAndExit | |us per iter | over sync | over send | +------------+--------------+-------------+ IsolateJson.Decode50KBx1(RunTime): 43,175.000 339.83% IsolateJson.SendAndExit_Decode50KBx1(RunTime): 22,070.000 124.83% -48.88% IsolateJson.SyncDecode50KBx1(RunTime): 9,816.284 IsolateJson.Decode50KBx4(RunTime): 77,630.000 104.56% IsolateJson.SendAndExit_Decode50KBx4(RunTime): 46,307.000 22.02% -40.35% IsolateJson.SyncDecode50KBx4(RunTime): 37,949.528 IsolateJson.Decode100KBx1(RunTime): 71,035.000 270.42% IsolateJson.SendAndExit_Decode100KBx1(RunTime): 43,056.000 124.52% -39.39% IsolateJson.SyncDecode100KBx1(RunTime): 19,176.733 IsolateJson.Decode100KBx4(RunTime): 120,915.000 54.66% IsolateJson.SendAndExit_Decode100KBx4(RunTime): 67,101.000 -14.17% -44.51% IsolateJson.SyncDecode100KBx4(RunTime): 78,179.731 IsolateJson.Decode250KBx1(RunTime): 173,574.000 202.52% IsolateJson.SendAndExit_Decode250KBx1(RunTime): 103,334.000 80.10% -40.47% IsolateJson.SyncDecode250KBx1(RunTime): 57,375.314 IsolateJson.Decode250KBx4(RunTime): 292,118.000 20.30% IsolateJson.SendAndExit_Decode250KBx4(RunTime): 168,444.000 -30.63% -42.34% IsolateJson.SyncDecode250KBx4(RunTime): 242,831.000 IsolateJson.Decode1MBx1(RunTime): 631,578.000 166.34% IsolateJson.SendAndExit_Decode1MBx1(RunTime): 371,127.000 56.50% -41.24% IsolateJson.SyncDecode1MBx1(RunTime): 237,135.778 IsolateJson.Decode1MBx4(RunTime): 1,322,789.000 36.16% IsolateJson.SendAndExit_Decode1MBx4(RunTime): 657,179.000 -32.35% -50.32% IsolateJson.SyncDecode1MBx4(RunTime): 971,473.333 ``` Bug: dart-lang#37835 Bug: dart-lang#36097 Change-Id: I386641e1431ed9f2e34fac36f562607a666ee4a8 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/142823 Commit-Queue: Alexander Aprelev <[email protected]> Reviewed-by: Martin Kustermann <[email protected]> Reviewed-by: Ryan Macnak <[email protected]>
1 parent 23cbe39 commit 17654b7

File tree

19 files changed

+562
-31
lines changed

19 files changed

+562
-31
lines changed

benchmarks/IsolateJson/dart/IsolateJson.dart

+43-13
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,32 @@ import 'dart:typed_data';
1111
import 'package:benchmark_harness/benchmark_harness.dart' show BenchmarkBase;
1212
import 'package:meta/meta.dart';
1313

14+
import 'runtime/tests/vm/dart/export_sendAndExit_helper.dart' show sendAndExit;
15+
1416
class JsonDecodingBenchmark {
1517
JsonDecodingBenchmark(this.name,
16-
{@required this.sample, @required this.numTasks});
18+
{@required this.sample,
19+
@required this.numTasks,
20+
@required this.useSendAndExit});
1721

1822
Future<void> report() async {
1923
final stopwatch = Stopwatch()..start();
20-
final decodedFutures = <Future>[];
21-
for (int i = 0; i < numTasks; i++) {
22-
decodedFutures.add(decodeJson(sample));
24+
// Benchmark harness counts 10 iterations as one.
25+
for (int i = 0; i < 10; i++) {
26+
final decodedFutures = <Future>[];
27+
for (int i = 0; i < numTasks; i++) {
28+
decodedFutures.add(decodeJson(useSendAndExit, sample));
29+
}
30+
await Future.wait(decodedFutures);
2331
}
24-
await Future.wait(decodedFutures);
2532

2633
print("$name(RunTime): ${stopwatch.elapsedMicroseconds} us.");
2734
}
2835

2936
final String name;
3037
final Uint8List sample;
3138
final int numTasks;
39+
final bool useSendAndExit;
3240
}
3341

3442
Uint8List createSampleJson(final size) {
@@ -41,27 +49,42 @@ Uint8List createSampleJson(final size) {
4149
}
4250

4351
class JsonDecodeRequest {
52+
final bool useSendAndExit;
4453
final SendPort sendPort;
4554
final Uint8List encodedJson;
46-
const JsonDecodeRequest(this.sendPort, this.encodedJson);
55+
const JsonDecodeRequest(this.useSendAndExit, this.sendPort, this.encodedJson);
4756
}
4857

49-
Future<Map> decodeJson(Uint8List encodedJson) async {
58+
Future<Map> decodeJson(bool useSendAndExit, Uint8List encodedJson) async {
5059
final port = ReceivePort();
5160
final inbox = StreamIterator<dynamic>(port);
52-
final workerExitedPort = ReceivePort();
53-
await Isolate.spawn(
54-
jsonDecodingIsolate, JsonDecodeRequest(port.sendPort, encodedJson),
55-
onExit: workerExitedPort.sendPort);
61+
final completer = Completer<bool>();
62+
final workerExitedPort = RawReceivePort((v) {
63+
completer.complete(true);
64+
});
65+
final workerErroredPort = RawReceivePort((v) {
66+
stderr.writeln('worker errored out $v');
67+
completer.completeError(true);
68+
});
69+
await Isolate.spawn(jsonDecodingIsolate,
70+
JsonDecodeRequest(useSendAndExit, port.sendPort, encodedJson),
71+
onError: workerErroredPort.sendPort, onExit: workerExitedPort.sendPort);
72+
await completer.future;
73+
workerExitedPort.close();
74+
workerErroredPort.close();
5675
await inbox.moveNext();
5776
final decodedJson = inbox.current;
58-
await workerExitedPort.first;
5977
port.close();
6078
return decodedJson;
6179
}
6280

6381
Future<void> jsonDecodingIsolate(JsonDecodeRequest request) async {
64-
request.sendPort.send(json.decode(utf8.decode(request.encodedJson)));
82+
final result = json.decode(utf8.decode(request.encodedJson));
83+
if (request.useSendAndExit) {
84+
sendAndExit(request.sendPort, result);
85+
} else {
86+
request.sendPort.send(result);
87+
}
6588
}
6689

6790
class SyncJsonDecodingBenchmark extends BenchmarkBase {
@@ -118,6 +141,13 @@ Future<void> main() async {
118141
for (final iterations in <int>[1, 4]) {
119142
await JsonDecodingBenchmark(
120143
"IsolateJson.Decode${config.suffix}x$iterations",
144+
useSendAndExit: false,
145+
sample: config.sample,
146+
numTasks: iterations)
147+
.report();
148+
await JsonDecodingBenchmark(
149+
"IsolateJson.SendAndExit_Decode${config.suffix}x$iterations",
150+
useSendAndExit: true,
121151
sample: config.sample,
122152
numTasks: iterations)
123153
.report();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export 'dart:_internal' show sendAndExit;

pkg/vm/bin/kernel_service.dart

+1
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,7 @@ Future _processLoadRequest(request) async {
691691
prepend = ", ";
692692
if (sb.length > 256) break;
693693
}
694+
sb.write("]");
694695
partToString = sb.toString();
695696
} else {
696697
partToString = part.toString();

runtime/lib/isolate.cc

+148
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "vm/dart_api_message.h"
1616
#include "vm/dart_entry.h"
1717
#include "vm/exceptions.h"
18+
#include "vm/hash_table.h"
1819
#include "vm/lockers.h"
1920
#include "vm/longjump.h"
2021
#include "vm/message_handler.h"
@@ -108,6 +109,153 @@ DEFINE_NATIVE_ENTRY(SendPortImpl_sendInternal_, 0, 2) {
108109
return Object::null();
109110
}
110111

112+
class RawObjectPtrSetTraits {
113+
public:
114+
static bool ReportStats() { return false; }
115+
static const char* Name() { return "RawObjectPtrSetTraits"; }
116+
117+
static bool IsMatch(const RawObject* a, const RawObject* b) { return a == b; }
118+
119+
static uword Hash(const RawObject* obj) {
120+
return reinterpret_cast<uword>(obj);
121+
}
122+
};
123+
124+
static RawObject* ValidateMessageObject(Zone* zone,
125+
Isolate* isolate,
126+
const Object& obj) {
127+
TIMELINE_DURATION(Thread::Current(), Isolate, "ValidateMessageObject");
128+
129+
class SendMessageValidator : public ObjectPointerVisitor {
130+
public:
131+
SendMessageValidator(IsolateGroup* isolate_group,
132+
WeakTable* visited,
133+
MallocGrowableArray<RawObject*>* const working_set)
134+
: ObjectPointerVisitor(isolate_group),
135+
visited_(visited),
136+
working_set_(working_set) {}
137+
138+
private:
139+
void VisitPointers(RawObject** from, RawObject** to) {
140+
for (RawObject** raw = from; raw <= to; raw++) {
141+
if (!(*raw)->IsHeapObject() || (*raw)->IsCanonical()) {
142+
continue;
143+
}
144+
if (visited_->GetValueExclusive(*raw) == 1) {
145+
continue;
146+
}
147+
visited_->SetValueExclusive(*raw, 1);
148+
working_set_->Add(*raw);
149+
}
150+
}
151+
152+
WeakTable* visited_;
153+
MallocGrowableArray<RawObject*>* const working_set_;
154+
};
155+
if (!obj.raw()->IsHeapObject() || obj.raw()->IsCanonical()) {
156+
return obj.raw();
157+
}
158+
ClassTable* class_table = isolate->class_table();
159+
160+
Class& klass = Class::Handle(zone);
161+
Closure& closure = Closure::Handle(zone);
162+
163+
MallocGrowableArray<RawObject*> working_set;
164+
std::unique_ptr<WeakTable> visited(new WeakTable());
165+
166+
NoSafepointScope no_safepoint;
167+
SendMessageValidator visitor(isolate->group(), visited.get(), &working_set);
168+
169+
visited->SetValueExclusive(obj.raw(), 1);
170+
working_set.Add(obj.raw());
171+
172+
while (!working_set.is_empty()) {
173+
RawObject* raw = working_set.RemoveLast();
174+
175+
if (visited->GetValueExclusive(raw) > 0) {
176+
continue;
177+
}
178+
visited->SetValueExclusive(raw, 1);
179+
180+
const intptr_t cid = raw->GetClassId();
181+
switch (cid) {
182+
// List below matches the one in raw_object_snapshot.cc
183+
#define MESSAGE_SNAPSHOT_ILLEGAL(type) \
184+
return Exceptions::CreateUnhandledException( \
185+
zone, Exceptions::kArgumentValue, \
186+
"Illegal argument in isolate message : (object is a " #type ")"); \
187+
break;
188+
189+
MESSAGE_SNAPSHOT_ILLEGAL(DynamicLibrary);
190+
MESSAGE_SNAPSHOT_ILLEGAL(MirrorReference);
191+
MESSAGE_SNAPSHOT_ILLEGAL(Pointer);
192+
MESSAGE_SNAPSHOT_ILLEGAL(ReceivePort);
193+
MESSAGE_SNAPSHOT_ILLEGAL(RegExp);
194+
MESSAGE_SNAPSHOT_ILLEGAL(StackTrace);
195+
MESSAGE_SNAPSHOT_ILLEGAL(UserTag);
196+
197+
case kClosureCid: {
198+
closure = Closure::RawCast(raw);
199+
RawFunction* func = closure.function();
200+
// We only allow closure of top level methods or static functions in a
201+
// class to be sent in isolate messages.
202+
if (!Function::IsImplicitStaticClosureFunction(func)) {
203+
return Exceptions::CreateUnhandledException(
204+
zone, Exceptions::kArgumentValue, "Closures are not allowed");
205+
}
206+
break;
207+
}
208+
default:
209+
if (cid >= kNumPredefinedCids) {
210+
klass = class_table->At(cid);
211+
if (klass.num_native_fields() != 0) {
212+
return Exceptions::CreateUnhandledException(
213+
zone, Exceptions::kArgumentValue,
214+
"Objects that extend NativeWrapper are not allowed");
215+
}
216+
}
217+
}
218+
raw->VisitPointers(&visitor);
219+
}
220+
isolate->set_forward_table_new(nullptr);
221+
return obj.raw();
222+
}
223+
224+
DEFINE_NATIVE_ENTRY(SendPortImpl_sendAndExitInternal_, 0, 2) {
225+
GET_NON_NULL_NATIVE_ARGUMENT(SendPort, port, arguments->NativeArgAt(0));
226+
if (!PortMap::IsReceiverInThisIsolateGroup(port.Id(), isolate->group())) {
227+
const auto& error =
228+
String::Handle(String::New("sendAndExit is only supported across "
229+
"isolates spawned via spawnFunction."));
230+
Exceptions::ThrowArgumentError(error);
231+
UNREACHABLE();
232+
}
233+
234+
GET_NON_NULL_NATIVE_ARGUMENT(Instance, obj, arguments->NativeArgAt(1));
235+
236+
Object& validated_result = Object::Handle(zone);
237+
Object& msg_obj = Object::Handle(zone, obj.raw());
238+
validated_result = ValidateMessageObject(zone, isolate, msg_obj);
239+
if (validated_result.IsUnhandledException()) {
240+
Exceptions::PropagateError(Error::Cast(validated_result));
241+
UNREACHABLE();
242+
}
243+
PersistentHandle* handle =
244+
isolate->group()->api_state()->AllocatePersistentHandle();
245+
handle->set_raw(msg_obj);
246+
isolate->bequeath(std::unique_ptr<Bequest>(new Bequest(handle, port.Id())));
247+
// TODO(aam): Ensure there are no dart api calls after this point as we want
248+
// to ensure that validated message won't get tampered with.
249+
Isolate::KillIfExists(isolate, Isolate::LibMsgId::kKillMsg);
250+
// Drain interrupts before running so any IMMEDIATE operations on the current
251+
// isolate happen synchronously.
252+
const Error& error = Error::Handle(thread->HandleInterrupts());
253+
RELEASE_ASSERT(error.IsUnwindError());
254+
Exceptions::PropagateError(error);
255+
// We will never execute dart code again in this isolate.
256+
return Object::null();
257+
}
258+
111259
static void ThrowIsolateSpawnException(const String& message) {
112260
const Array& args = Array::Handle(Array::New(1));
113261
args.SetAt(0, message);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
//
5+
// VMOptions=--enable-isolate-groups
6+
//
7+
// Validates functionality of sendAndExit.
8+
9+
import 'dart:_internal' show sendAndExit;
10+
import 'dart:async';
11+
import 'dart:isolate';
12+
import 'dart:nativewrappers';
13+
14+
import "package:expect/expect.dart";
15+
16+
doNothingWorker(data) {}
17+
18+
spawnWorker(worker, data) async {
19+
Completer completer = Completer();
20+
runZoned(() async {
21+
final isolate = await Isolate.spawn(worker, [data]);
22+
completer.complete(isolate);
23+
}, onError: (e, st) => completer.complete(e));
24+
return await completer.future;
25+
}
26+
27+
verifyCantSendAnonymousClosure() async {
28+
final result = await spawnWorker(doNothingWorker, () {});
29+
Expect.equals(
30+
"Invalid argument(s): Illegal argument in isolate message :"
31+
" (object is a closure - Function '<anonymous closure>': static.)",
32+
result.toString());
33+
}
34+
35+
class NativeWrapperClass extends NativeFieldWrapperClass1 {}
36+
37+
verifyCantSendNative() async {
38+
final result = await spawnWorker(doNothingWorker, NativeWrapperClass());
39+
Expect.isTrue(result.toString().startsWith("Invalid argument(s): "
40+
"Illegal argument in isolate message : "
41+
"(object extends NativeWrapper"));
42+
}
43+
44+
verifyCantSendRegexp() async {
45+
var receivePort = ReceivePort();
46+
final result = await spawnWorker(doNothingWorker, receivePort);
47+
Expect.equals(
48+
"Invalid argument(s): Illegal argument in isolate message : "
49+
"(object is a ReceivePort)",
50+
result.toString());
51+
receivePort.close();
52+
}
53+
54+
class Message {
55+
SendPort sendPort;
56+
Function closure;
57+
58+
Message(this.sendPort, this.closure);
59+
}
60+
61+
add(a, b) => a + b;
62+
63+
worker(Message message) async {
64+
final port = new ReceivePort();
65+
final inbox = new StreamIterator<dynamic>(port);
66+
message.sendPort.send(message.closure(2, 3));
67+
port.close();
68+
}
69+
70+
verifyCanSendStaticMethod() async {
71+
final port = ReceivePort();
72+
final inbox = StreamIterator<dynamic>(port);
73+
final isolate = await Isolate.spawn(worker, Message(port.sendPort, add));
74+
75+
await inbox.moveNext();
76+
Expect.equals(inbox.current, 5);
77+
port.close();
78+
}
79+
80+
main() async {
81+
await verifyCantSendAnonymousClosure();
82+
await verifyCantSendNative();
83+
await verifyCantSendRegexp();
84+
await verifyCanSendStaticMethod();
85+
}

runtime/vm/bootstrap_natives.h

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ namespace dart {
5858
V(SendPortImpl_get_id, 1) \
5959
V(SendPortImpl_get_hashcode, 1) \
6060
V(SendPortImpl_sendInternal_, 2) \
61+
V(SendPortImpl_sendAndExitInternal_, 2) \
6162
V(Smi_bitAndFromSmi, 2) \
6263
V(Smi_bitNegate, 1) \
6364
V(Smi_bitLength, 1) \

runtime/vm/exceptions.cc

+12
Original file line numberDiff line numberDiff line change
@@ -1227,4 +1227,16 @@ RawObject* Exceptions::Create(ExceptionType type, const Array& arguments) {
12271227
*constructor_name, arguments);
12281228
}
12291229

1230+
RawUnhandledException* Exceptions::CreateUnhandledException(Zone* zone,
1231+
ExceptionType type,
1232+
const char* msg) {
1233+
const String& error_str = String::Handle(zone, String::New(msg));
1234+
const Array& args = Array::Handle(zone, Array::New(1));
1235+
args.SetAt(0, error_str);
1236+
1237+
Object& result = Object::Handle(zone, Exceptions::Create(type, args));
1238+
const StackTrace& stacktrace = StackTrace::Handle(zone);
1239+
return UnhandledException::New(Instance::Cast(result), stacktrace);
1240+
}
1241+
12301242
} // namespace dart

0 commit comments

Comments
 (0)