Skip to content

Commit a2e4c35

Browse files
committed
RSocket 통신 관련 예제 추가
1 parent 27ebfa6 commit a2e4c35

File tree

9 files changed

+350
-60
lines changed

9 files changed

+350
-60
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.github.debop.rsocket.client
2+
3+
import kotlinx.coroutines.delay
4+
import kotlinx.coroutines.flow.Flow
5+
import kotlinx.coroutines.flow.flow
6+
import kotlinx.coroutines.flow.onEach
7+
import mu.KLogging
8+
import org.springframework.beans.factory.annotation.Autowired
9+
import org.springframework.beans.factory.annotation.Qualifier
10+
import org.springframework.messaging.rsocket.RSocketRequester
11+
import org.springframework.messaging.rsocket.RSocketStrategies
12+
import org.springframework.shell.standard.ShellComponent
13+
14+
@ShellComponent
15+
class RSocketShellClient(
16+
@Autowired private val builder: RSocketRequester.Builder,
17+
@Qualifier("rSocketStrategies") private val strategies: RSocketStrategies,
18+
) {
19+
20+
companion object: KLogging() {
21+
const val CLIENT = "Client"
22+
const val REQUEST = "Request"
23+
const val FIRE_AND_FORGET = "Fire-And-Forget"
24+
const val STREAM = "Stream"
25+
}
26+
}
27+
28+
class ClientHandler {
29+
30+
companion object: KLogging()
31+
32+
fun statusUpdate(status: String): Flow<String> {
33+
logger.info { "Connection $status" }
34+
return flow {
35+
while (true) {
36+
emit(Runtime.getRuntime().freeMemory().toString())
37+
}
38+
}.onEach { delay(5000L) }
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.github.debop.rsocket.client
2+
3+
import org.springframework.boot.autoconfigure.SpringBootApplication
4+
import org.springframework.boot.autoconfigure.security.reactive.ReactiveSecurityAutoConfiguration
5+
import org.springframework.boot.autoconfigure.security.reactive.ReactiveUserDetailsServiceAutoConfiguration
6+
import org.springframework.boot.autoconfigure.security.rsocket.RSocketSecurityAutoConfiguration
7+
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration
8+
import org.springframework.boot.runApplication
9+
10+
@SpringBootApplication(exclude = [
11+
ReactiveUserDetailsServiceAutoConfiguration::class,
12+
SecurityAutoConfiguration::class,
13+
ReactiveSecurityAutoConfiguration::class,
14+
RSocketSecurityAutoConfiguration::class,
15+
])
16+
class RSocketShellClientApplication
17+
18+
fun main(vararg args: String) {
19+
runApplication<RSocketShellClientApplication>(*args)
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
2+
# To download the RSocket Client CLI
3+
wget -O rsc.jar https://github.com/making/rsc/releases/download/0.6.1/rsc-0.6.1.jar
4+
5+
# To make the client easier to wirk with, set an alias
6+
alias rsc='java -jar rsc.jar'
7+
8+
# for Mac use homebrew
9+
10+
brew install rsc
11+
12+
# To use the client to do request-response against a server on tcp://localhost:7000
13+
rsc --debug --request --data "{\"origin\":\"Client\",\"interaction\":\"Request\"}" --route request-response tcp://localhost:7000
14+
15+
# To use the client to do fire-and-forget against the same server
16+
rsc --debug --fnf --data "{\"origin\":\"Client\",\"interaction\":\"Fire And Forget\"}" --route fire-and-forget tcp://localhost:7000
+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
usage: rsc Uri [Options]
2+
3+
Non-option arguments:
4+
[String: Uri]
5+
6+
Option Description
7+
------ -----------
8+
--channel Shortcut of --im REQUEST_CHANNEL
9+
-d, --data [String] Data. Use '-' to read data from
10+
standard input. (default: )
11+
--dataMimeType, --dmt [String] MimeType for data (default:
12+
application/json)
13+
--debug Enable FrameLogger
14+
--delayElements [Long] Enable delayElements(delay) in milli
15+
seconds
16+
--fnf Shortcut of --im FIRE_AND_FORGET
17+
--help Print help
18+
--im, --interactionModel InteractionModel (default:
19+
[InteractionModel] REQUEST_RESPONSE)
20+
--limitRate [Integer] Enable limitRate(rate)
21+
--log [String] Enable log()
22+
-m, --metadata [String] Metadata (default: )
23+
--metadataMimeType, --mmt [String] MimeType for metadata (default:
24+
text/plain)
25+
-q, --quiet Disable the output on next
26+
-r, --route [String] Routing Metadata Extension
27+
--request Shortcut of --im REQUEST_RESPONSE
28+
--resume [Integer] Enable resume. Resume session duration
29+
can be configured in seconds. Unless
30+
the duration is specified, the
31+
default value (2min) is used.
32+
-s, --setup [String] Setup payload
33+
--show-system-properties Show SystemProperties for troubleshoot
34+
--stacktrace Show Stacktrace when an exception
35+
happens
36+
--stream Shortcut of --im REQUEST_STREAM
37+
--take [Integer] Enable take(n)
38+
-v, --version Print version
39+
-w, --wiretap Enable wiretap
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
POST http://localhost:7000/request-response
2+
Content-Type: application/json
3+
4+
{
5+
"origin": "Client",
6+
"interaction": "Request"
7+
}
8+
9+
###

rsocket/rsocket-communication/server/src/main/kotlin/io/github/debop/rsocket/server/RSocketController.kt

+73
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,21 @@
11
package io.github.debop.rsocket.server
22

33
import io.github.debop.rsocket.api.Message
4+
import kotlinx.coroutines.ExperimentalCoroutinesApi
5+
import kotlinx.coroutines.delay
6+
import kotlinx.coroutines.flow.Flow
7+
import kotlinx.coroutines.flow.flatMapLatest
8+
import kotlinx.coroutines.flow.flow
9+
import kotlinx.coroutines.flow.onCompletion
10+
import kotlinx.coroutines.flow.onEach
411
import mu.KLogging
512
import org.springframework.messaging.handler.annotation.MessageMapping
13+
import org.springframework.messaging.handler.annotation.Payload
614
import org.springframework.messaging.rsocket.RSocketRequester
15+
import org.springframework.messaging.rsocket.annotation.ConnectMapping
16+
import org.springframework.messaging.rsocket.retrieveFlux
717
import org.springframework.stereotype.Controller
18+
import java.time.Duration
819
import javax.annotation.PreDestroy
920

1021
@Controller
@@ -28,6 +39,34 @@ class RSocketController {
2839
logger.info { "Shutting down." }
2940
}
3041

42+
/**
43+
* Shell client 가 접속했을 때, 접속되었음을 알려준다.
44+
* client 에서는 접속 후에 주기적으로 free memory를 서버로 전송한다.
45+
*
46+
* @param requester
47+
* @param client
48+
*/
49+
@ConnectMapping("shell-client")
50+
fun connectShellClientAndAskForTelemetry(requester: RSocketRequester, @Payload client: String) {
51+
requester.rsocket()!!
52+
.onClose()
53+
.doFirst {
54+
// Add all new clients to a client list
55+
logger.info { "Client: $client CONNECTED." }
56+
clients.add(requester)
57+
}
58+
.doOnEach { }
59+
.doFinally { }
60+
.subscribe()
61+
62+
// Callback to client, confirming connection
63+
requester.route("client-status")
64+
.data("OPEN")
65+
.retrieveFlux<String>()
66+
.doOnNext { logger.info { "Client: $client Free Memory: $it" } }
67+
.subscribe()
68+
}
69+
3170
@MessageMapping("request-response")
3271
suspend fun requestResponse(request: Message): Message {
3372
logger.debug { "Received request-response request: $request" }
@@ -38,4 +77,38 @@ class RSocketController {
3877
suspend fun fireAndForget(request: Message) {
3978
logger.debug { "Received fire-and-forget request: $request" }
4079
}
80+
81+
@ExperimentalCoroutinesApi
82+
@MessageMapping("stream")
83+
suspend fun stream(request: Message): Flow<Message> {
84+
logger.info { "Received stream request: $request" }
85+
86+
return flow {
87+
var index = 0L
88+
while (true) {
89+
val message = Message(SERVER, STREAM, index++)
90+
logger.info { "Send stream response: $message" }
91+
emit(message)
92+
}
93+
}
94+
.onEach { delay(1000L) }
95+
}
96+
97+
@ExperimentalCoroutinesApi
98+
@MessageMapping("channel")
99+
suspend fun channel(settings: Flow<Duration>): Flow<Message> {
100+
logger.info { "Received channel request ... " }
101+
102+
return settings
103+
.onEach { logger.info { "Channel frequency setting is ${it.seconds}" } }
104+
.onCompletion { logger.warn("The client cancelled the channel.") }
105+
.flatMapLatest { setting ->
106+
flow {
107+
var index = 0L
108+
while (true) {
109+
emit(Message(SERVER, CHANNEL, index++))
110+
}
111+
}.onEach { delay(setting.toMillis()) }
112+
}
113+
}
41114
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package io.github.debop.rsocket.server
2+
3+
import io.github.debop.rsocket.api.Message
4+
import io.rsocket.SocketAcceptor
5+
import io.rsocket.core.RSocketServer
6+
import io.rsocket.frame.decoder.PayloadDecoder
7+
import io.rsocket.metadata.WellKnownMimeType
8+
import io.rsocket.transport.netty.server.CloseableChannel
9+
import io.rsocket.transport.netty.server.TcpServerTransport
10+
import kotlinx.coroutines.delay
11+
import kotlinx.coroutines.flow.Flow
12+
import kotlinx.coroutines.flow.collect
13+
import kotlinx.coroutines.flow.collectIndexed
14+
import kotlinx.coroutines.flow.flow
15+
import kotlinx.coroutines.flow.take
16+
import kotlinx.coroutines.reactive.asFlow
17+
import kotlinx.coroutines.runBlocking
18+
import mu.KLogging
19+
import org.amshove.kluent.shouldBeEqualTo
20+
import org.junit.jupiter.api.AfterAll
21+
import org.junit.jupiter.api.Test
22+
import org.springframework.beans.factory.getBean
23+
import org.springframework.context.annotation.AnnotationConfigApplicationContext
24+
import org.springframework.context.annotation.Bean
25+
import org.springframework.context.annotation.Configuration
26+
import org.springframework.http.codec.cbor.Jackson2CborDecoder
27+
import org.springframework.http.codec.cbor.Jackson2CborEncoder
28+
import org.springframework.messaging.handler.annotation.MessageMapping
29+
import org.springframework.messaging.rsocket.RSocketRequester
30+
import org.springframework.messaging.rsocket.RSocketStrategies
31+
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler
32+
import org.springframework.messaging.rsocket.retrieveAndAwait
33+
import org.springframework.messaging.rsocket.retrieveAndAwaitOrNull
34+
import org.springframework.messaging.rsocket.retrieveFlow
35+
import org.springframework.util.MimeTypeUtils
36+
import reactor.core.publisher.Flux
37+
import reactor.core.publisher.Mono
38+
import java.time.Duration
39+
40+
class RSocketControllerTest {
41+
42+
companion object: KLogging()
43+
44+
class ClientHandler {
45+
46+
companion object: KLogging()
47+
48+
@MessageMapping("client-status")
49+
fun statusUpdate(status: String): Flow<String> {
50+
logger.info { "Connection: $status" }
51+
return flow {
52+
delay(5000)
53+
emit(Runtime.getRuntime().freeMemory().toString())
54+
}
55+
}
56+
}
57+
58+
@Configuration
59+
class ServerConfig {
60+
@Bean
61+
fun controller(): RSocketController = RSocketController()
62+
63+
@Bean
64+
fun messageHandler(): RSocketMessageHandler = RSocketMessageHandler().apply {
65+
rSocketStrategies = rsocketStrategies()
66+
}
67+
68+
@Bean
69+
fun rsocketStrategies(): RSocketStrategies =
70+
RSocketStrategies.builder()
71+
.encoder(Jackson2CborEncoder())
72+
.decoder(Jackson2CborDecoder())
73+
.build()
74+
}
75+
76+
// 자제적인 Bean 설정을 가지고 테스트를 수행합니다.
77+
private val metadataMimeType = MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.string)
78+
private val context = AnnotationConfigApplicationContext(ServerConfig::class.java)
79+
private val messageHandler = context.getBean<RSocketMessageHandler>()
80+
private val responder: SocketAcceptor = messageHandler.responder()
81+
82+
private val server: CloseableChannel =
83+
RSocketServer.create(responder)
84+
.payloadDecoder(PayloadDecoder.ZERO_COPY)
85+
.bind(TcpServerTransport.create("localhost", 7001))
86+
.block()!!
87+
88+
private val requester: RSocketRequester =
89+
RSocketRequester.builder()
90+
.metadataMimeType(metadataMimeType)
91+
.rsocketStrategies(context.getBean<RSocketStrategies>())
92+
.tcp("localhost", 7001)
93+
94+
@AfterAll
95+
fun cleanup() {
96+
requester.rsocket()?.dispose()
97+
server.dispose()
98+
}
99+
100+
@Test
101+
fun `fire and forget`() = runBlocking<Unit> {
102+
val result = requester.route("fire-and-forget")
103+
.data(Message("TEST", "Fire-And-Forget"))
104+
.retrieveAndAwaitOrNull<Unit>()
105+
}
106+
107+
@Test
108+
fun `request get response`() = runBlocking<Unit> {
109+
val result = requester.route("request-response")
110+
.data(Message("TEST", "Request"))
111+
.retrieveAndAwait<Message>()
112+
113+
result shouldBeEqualTo Message(RSocketController.SERVER, RSocketController.RESPONSE, 0)
114+
}
115+
116+
@Test
117+
fun `request get stream`() = runBlocking<Unit> {
118+
val result = requester.route("stream")
119+
.data(Message("TEST", "Stream"))
120+
.retrieveFlow<Message>()
121+
122+
result.take(5).collectIndexed { i, m ->
123+
m shouldBeEqualTo Message(RSocketController.SERVER, RSocketController.STREAM, i.toLong())
124+
}
125+
}
126+
127+
@Test
128+
fun `channel - stream get stream`() = runBlocking<Unit> {
129+
val setting1 = Mono.just(Duration.ofSeconds(6)).delayElement(Duration.ofSeconds(0))
130+
val setting2 = Mono.just(Duration.ofSeconds(6)).delayElement(Duration.ofSeconds(9))
131+
val settings = Flux.concat(setting1, setting2).asFlow()
132+
133+
val result = requester.route("channel")
134+
.data(settings)
135+
.retrieveFlow<Message>()
136+
137+
try {
138+
result.take(2).collect {
139+
it shouldBeEqualTo Message(RSocketController.SERVER, RSocketController.CHANNEL, 0L)
140+
}
141+
} catch (e: Exception) {
142+
logger.error(e) { "Canceled." }
143+
}
144+
}
145+
146+
147+
}

0 commit comments

Comments
 (0)