Skip to content

Commit 0e27df0

Browse files
committed
adds *FireAndForgetMono's stress tests
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 3c0f788 commit 0e27df0

9 files changed

+769
-4
lines changed

rsocket-core/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ dependencies {
4242

4343
testImplementation 'org.hamcrest:hamcrest-library'
4444

45+
jcstressImplementation(project(":rsocket-test"))
4546
jcstressImplementation "ch.qos.logback:logback-classic"
4647
}
4748

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package io.rsocket.core;
2+
3+
import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;
4+
5+
import io.netty.buffer.ByteBuf;
6+
import io.rsocket.test.TestDuplexConnection;
7+
import org.openjdk.jcstress.annotations.Actor;
8+
import org.openjdk.jcstress.annotations.Arbiter;
9+
import org.openjdk.jcstress.annotations.JCStressTest;
10+
import org.openjdk.jcstress.annotations.Outcome;
11+
import org.openjdk.jcstress.annotations.State;
12+
import org.openjdk.jcstress.infra.results.LLLL_Result;
13+
14+
public abstract class FireAndForgetRequesterMonoStressTest {
15+
16+
abstract static class BaseStressTest {
17+
18+
final StressSubscriber<ByteBuf> outboundSubscriber = new StressSubscriber<>();
19+
20+
final StressSubscriber<Void> stressSubscriber = new StressSubscriber<>();
21+
22+
final TestDuplexConnection testDuplexConnection =
23+
new TestDuplexConnection(this.outboundSubscriber, false);
24+
25+
final TestRequesterResponderSupport requesterResponderSupport =
26+
new TestRequesterResponderSupport(testDuplexConnection, StreamIdSupplier.clientSupplier());
27+
28+
final FireAndForgetRequesterMono source = source();
29+
30+
abstract FireAndForgetRequesterMono source();
31+
}
32+
33+
@JCStressTest
34+
@Outcome(
35+
id = {"-9223372036854775808, 3, 1, 0"},
36+
expect = ACCEPTABLE)
37+
@State
38+
public static class TwoSubscribesRaceStressTest extends BaseStressTest {
39+
40+
final StressSubscriber<Void> stressSubscriber1 = new StressSubscriber<>();
41+
42+
@Override
43+
FireAndForgetRequesterMono source() {
44+
return new FireAndForgetRequesterMono(
45+
UnpooledByteBufPayload.create(
46+
"test-data", "test-metadata", this.requesterResponderSupport.getAllocator()),
47+
this.requesterResponderSupport);
48+
}
49+
50+
@Actor
51+
public void subscribe1() {
52+
this.source.subscribe(this.stressSubscriber);
53+
}
54+
55+
@Actor
56+
public void subscribe2() {
57+
this.source.subscribe(this.stressSubscriber1);
58+
}
59+
60+
@Arbiter
61+
public void arbiter(LLLL_Result r) {
62+
r.r1 = this.source.state;
63+
r.r2 =
64+
this.stressSubscriber.onCompleteCalls
65+
+ this.stressSubscriber.onErrorCalls * 2
66+
+ this.stressSubscriber1.onCompleteCalls
67+
+ this.stressSubscriber1.onErrorCalls * 2;
68+
r.r3 = this.outboundSubscriber.onNextCalls;
69+
r.r4 = this.source.payload.refCnt();
70+
71+
this.outboundSubscriber.values.forEach(ByteBuf::release);
72+
}
73+
}
74+
75+
@JCStressTest
76+
@Outcome(
77+
id = {"-9223372036854775808, 1, 1, 0"},
78+
expect = ACCEPTABLE,
79+
desc = "frame delivered before cancellation")
80+
@Outcome(
81+
id = {"-9223372036854775808, 0, 0, 0"},
82+
expect = ACCEPTABLE,
83+
desc = "cancellation happened first")
84+
@State
85+
public static class SubscribeAndCancelRaceStressTest extends BaseStressTest {
86+
87+
@Override
88+
FireAndForgetRequesterMono source() {
89+
return new FireAndForgetRequesterMono(
90+
UnpooledByteBufPayload.create(
91+
"test-data", "test-metadata", this.requesterResponderSupport.getAllocator()),
92+
this.requesterResponderSupport);
93+
}
94+
95+
@Actor
96+
public void subscribe() {
97+
this.source.subscribe(this.stressSubscriber);
98+
}
99+
100+
@Actor
101+
public void cancel() {
102+
this.stressSubscriber.cancel();
103+
}
104+
105+
@Arbiter
106+
public void arbiter(LLLL_Result r) {
107+
r.r1 = this.source.state;
108+
r.r2 = this.stressSubscriber.onCompleteCalls + this.stressSubscriber.onErrorCalls * 2;
109+
r.r3 = this.outboundSubscriber.onNextCalls;
110+
r.r4 = this.source.payload.refCnt();
111+
112+
this.outboundSubscriber.values.forEach(ByteBuf::release);
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)