Skip to content

Commit 8e18c03

Browse files
authored
Merge pull request eugenp#5855 from pandachris/master
BAEL-2283
2 parents ec7283c + d151552 commit 8e18c03

File tree

12 files changed

+568
-0
lines changed

12 files changed

+568
-0
lines changed

pom.xml

+2
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@
484484

485485
<module>reactor-core</module>
486486
<module>resteasy</module>
487+
<module>rsocket</module>
487488
<module>rxjava</module>
488489
<module>rxjava-2</module>
489490
<module>rabbitmq</module>
@@ -1039,6 +1040,7 @@
10391040

10401041
<module>reactor-core</module>
10411042
<module>resteasy</module>
1043+
<module>rsocket</module>
10421044
<module>rxjava</module>
10431045
<module>rxjava-2</module>
10441046
<module>rabbitmq</module>

rsocket/pom.xml

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
<artifactId>rsocket</artifactId>
5+
<version>0.0.1-SNAPSHOT</version>
6+
<name>rsocket</name>
7+
8+
<parent>
9+
<groupId>com.baeldung</groupId>
10+
<artifactId>parent-modules</artifactId>
11+
<version>1.0.0-SNAPSHOT</version>
12+
</parent>
13+
<packaging>jar</packaging>
14+
15+
<dependencies>
16+
<dependency>
17+
<groupId>io.rsocket</groupId>
18+
<artifactId>rsocket-core</artifactId>
19+
<version>0.11.13</version>
20+
</dependency>
21+
<dependency>
22+
<groupId>io.rsocket</groupId>
23+
<artifactId>rsocket-transport-netty</artifactId>
24+
<version>0.11.13</version>
25+
</dependency>
26+
<dependency>
27+
<groupId>junit</groupId>
28+
<artifactId>junit</artifactId>
29+
<version>4.12</version>
30+
<scope>test</scope>
31+
</dependency>
32+
<dependency>
33+
<groupId>org.hamcrest</groupId>
34+
<artifactId>hamcrest-core</artifactId>
35+
<version>1.3</version>
36+
<scope>test</scope>
37+
</dependency>
38+
<dependency>
39+
<groupId>ch.qos.logback</groupId>
40+
<artifactId>logback-classic</artifactId>
41+
<version>1.2.3</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>ch.qos.logback</groupId>
45+
<artifactId>logback-core</artifactId>
46+
<version>1.2.3</version>
47+
</dependency>
48+
<dependency>
49+
<groupId>org.slf4j</groupId>
50+
<artifactId>slf4j-api</artifactId>
51+
<version>1.7.25</version>
52+
</dependency>
53+
</dependencies>
54+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.baeldung.rsocket;
2+
3+
import static com.baeldung.rsocket.support.Constants.*;
4+
import com.baeldung.rsocket.support.GameController;
5+
import io.rsocket.RSocket;
6+
import io.rsocket.RSocketFactory;
7+
import io.rsocket.transport.netty.client.TcpClientTransport;
8+
import reactor.core.publisher.Flux;
9+
10+
public class ChannelClient {
11+
12+
private final RSocket socket;
13+
private final GameController gameController;
14+
15+
public ChannelClient() {
16+
this.socket = RSocketFactory.connect()
17+
.transport(TcpClientTransport.create("localhost", TCP_PORT))
18+
.start()
19+
.block();
20+
21+
this.gameController = new GameController("Client Player");
22+
}
23+
24+
public void playGame() {
25+
socket.requestChannel(Flux.from(gameController))
26+
.doOnNext(gameController::processPayload)
27+
.blockLast();
28+
}
29+
30+
public void dispose() {
31+
this.socket.dispose();
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package com.baeldung.rsocket;
2+
3+
import static com.baeldung.rsocket.support.Constants.*;
4+
import io.rsocket.Payload;
5+
import io.rsocket.RSocket;
6+
import io.rsocket.RSocketFactory;
7+
import io.rsocket.transport.netty.client.TcpClientTransport;
8+
import io.rsocket.util.DefaultPayload;
9+
import java.nio.ByteBuffer;
10+
import java.time.Duration;
11+
import java.util.ArrayList;
12+
import java.util.Collections;
13+
import java.util.List;
14+
import java.util.function.Function;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
import reactor.core.publisher.Flux;
18+
19+
public class FireNForgetClient {
20+
21+
private static final Logger LOG = LoggerFactory.getLogger(FireNForgetClient.class);
22+
23+
private final RSocket socket;
24+
private final List<Float> data;
25+
26+
public FireNForgetClient() {
27+
this.socket = RSocketFactory.connect()
28+
.transport(TcpClientTransport.create("localhost", TCP_PORT))
29+
.start()
30+
.block();
31+
this.data = Collections.unmodifiableList(generateData());
32+
}
33+
34+
/**
35+
* Send binary velocity (float) every 50ms
36+
*/
37+
public void sendData() {
38+
Flux.interval(Duration.ofMillis(50))
39+
.take(data.size())
40+
.map(this::createFloatPayload)
41+
.flatMap(socket::fireAndForget)
42+
.blockLast();
43+
}
44+
45+
/**
46+
* Create a binary payload containing a single float value
47+
*
48+
* @param index Index into the data list
49+
* @return Payload ready to send to the server
50+
*/
51+
private Payload createFloatPayload(Long index) {
52+
float velocity = data.get(index.intValue());
53+
ByteBuffer buffer = ByteBuffer.allocate(4).putFloat(velocity);
54+
buffer.rewind();
55+
return DefaultPayload.create(buffer);
56+
}
57+
58+
/**
59+
* Generate sample data
60+
*
61+
* @return List of random floats
62+
*/
63+
private List<Float> generateData() {
64+
List<Float> dataList = new ArrayList<>(WIND_DATA_LENGTH);
65+
float velocity = 0;
66+
for (int i = 0; i < WIND_DATA_LENGTH; i++) {
67+
velocity += Math.random();
68+
dataList.add(velocity);
69+
}
70+
return dataList;
71+
}
72+
73+
/**
74+
* Get the data used for this client.
75+
*
76+
* @return list of data values
77+
*/
78+
public List<Float> getData() {
79+
return data;
80+
}
81+
82+
public void dispose() {
83+
this.socket.dispose();
84+
}
85+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.baeldung.rsocket;
2+
3+
import static com.baeldung.rsocket.support.Constants.*;
4+
import io.rsocket.Payload;
5+
import io.rsocket.RSocket;
6+
import io.rsocket.RSocketFactory;
7+
import io.rsocket.transport.netty.client.TcpClientTransport;
8+
import io.rsocket.util.DefaultPayload;
9+
10+
public class ReqResClient {
11+
12+
private final RSocket socket;
13+
14+
public ReqResClient() {
15+
this.socket = RSocketFactory.connect()
16+
.transport(TcpClientTransport.create("localhost", TCP_PORT))
17+
.start()
18+
.block();
19+
}
20+
21+
public String callBlocking(String string) {
22+
return socket
23+
.requestResponse(DefaultPayload.create(string))
24+
.map(Payload::getDataUtf8)
25+
.onErrorReturn(ERROR_MSG)
26+
.block();
27+
}
28+
29+
public void dispose() {
30+
this.socket.dispose();
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.baeldung.rsocket;
2+
3+
import static com.baeldung.rsocket.support.Constants.*;
4+
import io.rsocket.Payload;
5+
import io.rsocket.RSocket;
6+
import io.rsocket.RSocketFactory;
7+
import io.rsocket.transport.netty.client.TcpClientTransport;
8+
import io.rsocket.util.DefaultPayload;
9+
import reactor.core.publisher.Flux;
10+
11+
public class ReqStreamClient {
12+
13+
private final RSocket socket;
14+
15+
public ReqStreamClient() {
16+
this.socket = RSocketFactory.connect()
17+
.transport(TcpClientTransport.create("localhost", TCP_PORT))
18+
.start()
19+
.block();
20+
}
21+
22+
public Flux<Float> getDataStream() {
23+
return socket
24+
.requestStream(DefaultPayload.create(WIND_DATA_STREAM_NAME))
25+
.map(Payload::getData)
26+
.map(buf -> buf.getFloat())
27+
.onErrorReturn(null);
28+
}
29+
30+
public void dispose() {
31+
this.socket.dispose();
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package com.baeldung.rsocket;
2+
3+
import com.baeldung.rsocket.support.WindDataPublisher;
4+
import static com.baeldung.rsocket.support.Constants.*;
5+
import com.baeldung.rsocket.support.GameController;
6+
import io.rsocket.AbstractRSocket;
7+
import io.rsocket.Payload;
8+
import io.rsocket.RSocketFactory;
9+
import io.rsocket.transport.netty.server.TcpServerTransport;
10+
import org.reactivestreams.Publisher;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
import reactor.core.Disposable;
14+
import reactor.core.publisher.Flux;
15+
import reactor.core.publisher.Mono;
16+
17+
public class Server {
18+
19+
private static final Logger LOG = LoggerFactory.getLogger(Server.class);
20+
21+
private final Disposable server;
22+
private final WindDataPublisher windDataPublisher = new WindDataPublisher();
23+
private final GameController gameController;
24+
25+
public Server() {
26+
this.server = RSocketFactory.receive()
27+
.acceptor((setupPayload, reactiveSocket) -> Mono.just(new RSocketImpl()))
28+
.transport(TcpServerTransport.create("localhost", TCP_PORT))
29+
.start()
30+
.doOnNext(x -> LOG.info("Server started"))
31+
.subscribe();
32+
33+
this.gameController = new GameController("Server Player");
34+
}
35+
36+
public void dispose() {
37+
windDataPublisher.complete();
38+
this.server.dispose();
39+
}
40+
41+
/**
42+
* RSocket implementation
43+
*/
44+
private class RSocketImpl extends AbstractRSocket {
45+
46+
/**
47+
* Handle Request/Response messages
48+
*
49+
* @param payload Message payload
50+
* @return payload response
51+
*/
52+
@Override
53+
public Mono<Payload> requestResponse(Payload payload) {
54+
try {
55+
return Mono.just(payload); // reflect the payload back to the sender
56+
} catch (Exception x) {
57+
return Mono.error(x);
58+
}
59+
}
60+
61+
/**
62+
* Handle Fire-and-Forget messages
63+
*
64+
* @param payload Message payload
65+
* @return nothing
66+
*/
67+
@Override
68+
public Mono<Void> fireAndForget(Payload payload) {
69+
try {
70+
windDataPublisher.publish(payload); // forward the payload
71+
return Mono.empty();
72+
} catch (Exception x) {
73+
return Mono.error(x);
74+
}
75+
}
76+
77+
/**
78+
* Handle Request/Stream messages. Each request returns a new stream.
79+
*
80+
* @param payload Payload that can be used to determine which stream to return
81+
* @return Flux stream containing simulated wind speed data
82+
*/
83+
@Override
84+
public Flux<Payload> requestStream(Payload payload) {
85+
String streamName = payload.getDataUtf8();
86+
if (WIND_DATA_STREAM_NAME.equals(streamName)) {
87+
return Flux.from(windDataPublisher);
88+
}
89+
return Flux.error(new IllegalArgumentException(streamName));
90+
}
91+
92+
/**
93+
* Handle request for bidirectional channel
94+
*
95+
* @param payloads Stream of payloads delivered from the client
96+
* @return
97+
*/
98+
@Override
99+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
100+
Flux.from(payloads)
101+
.subscribe(gameController::processPayload);
102+
Flux<Payload> channel = Flux.from(gameController);
103+
return channel;
104+
}
105+
}
106+
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.baeldung.rsocket.support;
2+
3+
public interface Constants {
4+
5+
int TCP_PORT = 7101;
6+
String ERROR_MSG = "error";
7+
int WIND_DATA_LENGTH = 30;
8+
String WIND_DATA_STREAM_NAME = "wind-data";
9+
int SHOT_COUNT = 10;
10+
}

0 commit comments

Comments
 (0)