1
+ package io.github.debop.rsocket.server
2
+
3
+ import io.github.debop.rsocket.api.Message
4
+ import kotlinx.coroutines.delay
5
+ import kotlinx.coroutines.flow.Flow
6
+ import kotlinx.coroutines.flow.flow
7
+ import kotlinx.coroutines.runBlocking
8
+ import mu.KLogging
9
+ import org.junit.jupiter.api.Test
10
+ import org.springframework.beans.factory.annotation.Autowired
11
+ import org.springframework.boot.rsocket.context.LocalRSocketServerPort
12
+ import org.springframework.boot.test.context.SpringBootTest
13
+ import org.springframework.boot.test.context.SpringBootTest.WebEnvironment
14
+ import org.springframework.messaging.handler.annotation.MessageMapping
15
+ import org.springframework.messaging.rsocket.RSocketRequester
16
+ import org.springframework.messaging.rsocket.RSocketStrategies
17
+ import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler
18
+ import org.springframework.messaging.rsocket.connectTcpAndAwait
19
+ import org.springframework.messaging.rsocket.retrieveMono
20
+ import java.util.UUID
21
+
22
+
23
+ @SpringBootTest(webEnvironment = WebEnvironment .RANDOM_PORT )
24
+ class RSocketServerApplicationTest (
25
+ @Autowired private val builder : RSocketRequester .Builder ,
26
+ @Autowired private val strategies : RSocketStrategies ,
27
+ @LocalRSocketServerPort private val port : Int ,
28
+ ) {
29
+
30
+ val responder = RSocketMessageHandler .responder(strategies, ClientHandler ())
31
+
32
+ val requester = runBlocking {
33
+ builder
34
+ .setupRoute(" shell-client" )
35
+ .setupData(UUID .randomUUID().toString())
36
+ .rsocketConnector { it.acceptor(responder) }
37
+ .connectTcpAndAwait(" localhost" , port)
38
+ }
39
+
40
+ @Test
41
+ fun `fire and forget` () = runBlocking<Unit > {
42
+ val result = requester.route(" fire-and-forget" )
43
+ .data(Message (" TEST" , " Fire-And-Forget" ))
44
+ .retrieveMono<Void >()
45
+ }
46
+
47
+ class ClientHandler {
48
+
49
+ companion object : KLogging ()
50
+
51
+ @MessageMapping(" client-status" )
52
+ fun statusUpdate (status : String ): Flow <String > {
53
+ logger.info { " Connection: $status " }
54
+ return flow {
55
+ delay(5000 )
56
+ emit(Runtime .getRuntime().freeMemory().toString())
57
+ }
58
+ }
59
+ }
60
+ }
0 commit comments