Skip to content

Commit 6d18ee3

Browse files
committed
Change UX to be more compositional
1 parent 0a8291b commit 6d18ee3

File tree

5 files changed

+58
-81
lines changed

5 files changed

+58
-81
lines changed

examples/client/src/examples/client/ClientMain.scala

+5-5
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ object ClientMain extends IOApp.Simple {
3939
// Starting the server
4040
rp <- ChildProcess.spawn[IO]("java", "-jar", serverJar)
4141
// Creating a channel that will be used to communicate to the server
42-
fs2Channel <- FS2Channel
43-
.lspCompliant[IO](rp.stdout, rp.stdin, cancelTemplate = cancelEndpoint.some)
44-
.concurrently(rp.stderr.through(fs2.io.stderr))
45-
// Opening the stream to be able to send and receive data
46-
_ <- fs2Channel.openStream
42+
fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some)
43+
_ <- Stream(())
44+
.concurrently(fs2Channel.output.through(lsp.encodePayloads).through(rp.stdin))
45+
.concurrently(rp.stdout.through(lsp.decodePayloads).through(fs2Channel.input))
46+
.concurrently(rp.stderr.through(fs2.io.stderr[IO]))
4747
// Creating a `IntWrapper => IO[IntWrapper]` stub that can call the server
4848
increment = fs2Channel.simpleStub[IntWrapper, IntWrapper]("increment")
4949
result1 <- Stream.eval(increment(IntWrapper(0)))

examples/server/src/examples/server/ServerMain.scala

+7-3
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,14 @@ object ServerMain extends IOApp.Simple {
2929
def run: IO[Unit] = {
3030
// Using errorln as stdout is used by the RPC channel
3131
IO.consoleForIO.errorln("Starting server") >>
32-
FS2Channel
33-
.lspCompliant[IO](fs2.io.stdin[IO](bufSize = 512), fs2.io.stdout[IO], cancelTemplate = cancelEndpoint.some)
32+
FS2Channel[IO](cancelTemplate = Some(cancelEndpoint))
3433
.flatMap(_.withEndpointStream(increment)) // mounting an endpoint onto the channel
35-
.flatMap(_.openStreamForever) // starts the communication
34+
.flatMap(channel =>
35+
fs2.Stream
36+
.eval(IO.never) // running the server forever
37+
.concurrently(stdin[IO](512).through(lsp.decodePayloads).through(channel.input))
38+
.concurrently(channel.output.through(lsp.encodePayloads).through(stdout[IO]))
39+
)
3640
.compile
3741
.drain
3842
.guarantee(IO.consoleForIO.errorln("Terminating server"))

fs2/src/jsonrpclib/fs2/FS2Channel.scala

+19-30
Original file line numberDiff line numberDiff line change
@@ -16,50 +16,45 @@ import jsonrpclib.internals.MessageDispatcher
1616
import jsonrpclib.internals._
1717

1818
import scala.util.Try
19-
import _root_.fs2.concurrent.SignallingRef
2019

2120
trait FS2Channel[F[_]] extends Channel[F] {
21+
22+
def input: Pipe[F, Payload, Unit]
23+
def output: Stream[F, Payload]
24+
2225
def withEndpoint(endpoint: Endpoint[F])(implicit F: Functor[F]): Resource[F, FS2Channel[F]] =
2326
Resource.make(mountEndpoint(endpoint))(_ => unmountEndpoint(endpoint.method)).map(_ => this)
2427

2528
def withEndpointStream(endpoint: Endpoint[F])(implicit F: MonadCancelThrow[F]): Stream[F, FS2Channel[F]] =
2629
Stream.resource(withEndpoint(endpoint))
2730

2831
def withEndpoints(endpoint: Endpoint[F], rest: Endpoint[F]*)(implicit F: Monad[F]): Resource[F, FS2Channel[F]] =
29-
(endpoint :: rest.toList).traverse_(withEndpoint).as(this)
32+
withEndpoints(endpoint +: rest)
33+
34+
def withEndpoints(endpoints: Seq[Endpoint[F]])(implicit F: Monad[F]): Resource[F, FS2Channel[F]] =
35+
endpoints.toList.traverse_(withEndpoint).as(this)
3036

3137
def withEndpointStream(endpoint: Endpoint[F], rest: Endpoint[F]*)(implicit
3238
F: MonadCancelThrow[F]
3339
): Stream[F, FS2Channel[F]] =
3440
Stream.resource(withEndpoints(endpoint, rest: _*))
3541

36-
def open: Resource[F, Unit]
37-
def openStream: Stream[F, Unit]
38-
def openStreamForever: Stream[F, Nothing]
42+
def withEndpointsStream(endpoints: Seq[Endpoint[F]])(implicit F: MonadCancelThrow[F]): Stream[F, FS2Channel[F]] =
43+
Stream.resource(withEndpoints(endpoints))
44+
3945
}
4046

4147
object FS2Channel {
4248

43-
def lspCompliant[F[_]: Concurrent](
44-
byteStream: Stream[F, Byte],
45-
byteSink: Pipe[F, Byte, Unit],
46-
bufferSize: Int = 512,
47-
cancelTemplate: Option[CancelTemplate] = None
48-
): Stream[F, FS2Channel[F]] = internals.LSP.writeSink(byteSink, bufferSize).flatMap { sink =>
49-
apply[F](internals.LSP.readStream(byteStream), sink, cancelTemplate)
50-
}
51-
5249
def apply[F[_]: Concurrent](
53-
payloadStream: Stream[F, Payload],
54-
payloadSink: Payload => F[Unit],
50+
bufferSize: Int = 2048,
5551
cancelTemplate: Option[CancelTemplate] = None
5652
): Stream[F, FS2Channel[F]] = {
5753
for {
5854
supervisor <- Stream.resource(Supervisor[F])
5955
ref <- Ref[F].of(State[F](Map.empty, Map.empty, Map.empty, 0)).toStream
60-
isOpen <- SignallingRef[F].of(false).toStream
61-
awaitingSink = isOpen.waitUntil(identity) >> payloadSink(_: Payload)
62-
impl = new Impl(awaitingSink, ref, isOpen, supervisor, cancelTemplate)
56+
queue <- cats.effect.std.Queue.bounded[F, Payload](bufferSize).toStream
57+
impl = new Impl(queue, ref, supervisor, cancelTemplate)
6358

6459
// Creating a bespoke endpoint to receive cancelation requests
6560
maybeCancelEndpoint: Option[Endpoint[F]] = cancelTemplate.map { ct =>
@@ -71,10 +66,6 @@ object FS2Channel {
7166
}
7267
// mounting the cancelation endpoint
7368
_ <- maybeCancelEndpoint.traverse_(ep => impl.mountEndpoint(ep)).toStream
74-
_ <- Stream(()).concurrently {
75-
// Gatekeeping the pull until the channel is actually marked as open
76-
payloadStream.pauseWhen(isOpen.map(b => !b)).evalMap(impl.handleReceivedPayload)
77-
}
7869
} yield impl
7970
}
8071

@@ -107,15 +98,17 @@ object FS2Channel {
10798
}
10899

109100
private class Impl[F[_]](
110-
private val sink: Payload => F[Unit],
101+
private val queue: cats.effect.std.Queue[F, Payload],
111102
private val state: Ref[F, FS2Channel.State[F]],
112-
private val isOpen: SignallingRef[F, Boolean],
113103
supervisor: Supervisor[F],
114104
maybeCancelTemplate: Option[CancelTemplate]
115105
)(implicit F: Concurrent[F])
116106
extends MessageDispatcher[F]
117107
with FS2Channel[F] {
118108

109+
def output: Stream[F, Payload] = Stream.fromQueueUnterminated(queue)
110+
def input: Pipe[F, Payload, Unit] = _.evalMap(handleReceivedPayload)
111+
119112
def mountEndpoint(endpoint: Endpoint[F]): F[Unit] = state
120113
.modify(s =>
121114
s.mountEndpoint(endpoint) match {
@@ -127,10 +120,6 @@ object FS2Channel {
127120

128121
def unmountEndpoint(method: String): F[Unit] = state.update(_.removeEndpoint(method))
129122

130-
def open: Resource[F, Unit] = Resource.make[F, Unit](isOpen.set(true))(_ => isOpen.set(false))
131-
def openStream: Stream[F, Unit] = Stream.resource(open)
132-
def openStreamForever: Stream[F, Nothing] = openStream.evalMap(_ => F.never)
133-
134123
protected[fs2] def cancel(callId: CallId): F[Unit] = state.get.map(_.runningCalls.get(callId)).flatMap {
135124
case None => F.unit
136125
case Some(fiber) => fiber.cancel
@@ -147,7 +136,7 @@ object FS2Channel {
147136
}
148137
protected def reportError(params: Option[Payload], error: ProtocolError, method: String): F[Unit] = ???
149138
protected def getEndpoint(method: String): F[Option[Endpoint[F]]] = state.get.map(_.endpoints.get(method))
150-
protected def sendMessage(message: Message): F[Unit] = sink(Codec.encode(message))
139+
protected def sendMessage(message: Message): F[Unit] = queue.offer(Codec.encode(message))
151140

152141
protected def nextCallId(): F[CallId] = state.modify(_.nextCallId)
153142
protected def createPromise[A](callId: CallId): F[(Try[A] => F[Unit], () => F[A])] = Deferred[F, Try[A]].map {

fs2/src/jsonrpclib/fs2/internals/LSP.scala renamed to fs2/src/jsonrpclib/fs2/lsp.scala

+7-14
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,26 @@
1-
package jsonrpclib.fs2.internals
1+
package jsonrpclib.fs2
22

33
import cats.MonadThrow
4-
import cats.effect.Concurrent
5-
import cats.effect.std.Queue
64
import cats.implicits._
75
import fs2.Chunk
86
import fs2.Stream
7+
import fs2.Pipe
98
import jsonrpclib.Payload
109

1110
import java.nio.charset.Charset
1211
import java.nio.charset.StandardCharsets
1312

14-
object LSP {
13+
object lsp {
1514

16-
def writeSink[F[_]: Concurrent](
17-
writePipe: fs2.Pipe[F, Byte, Unit],
18-
bufferSize: Int
19-
): Stream[F, Payload => F[Unit]] =
20-
Stream.eval(Queue.bounded[F, Payload](bufferSize)).flatMap { queue =>
21-
val payloads = fs2.Stream.fromQueueUnterminated(queue, bufferSize)
22-
Stream(queue.offer(_)).concurrently(payloads.map(writeChunk).flatMap(Stream.chunk(_)).through(writePipe))
23-
}
15+
def encodePayloads[F[_]]: Pipe[F, Payload, Byte] =
16+
(_: Stream[F, Payload]).map(writeChunk).flatMap(Stream.chunk(_))
2417

2518
/** Split a stream of bytes into payloads by extracting each frame based on information contained in the headers.
2619
*
2720
* See https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#contentPart
2821
*/
29-
def readStream[F[_]: MonadThrow](bytes: Stream[F, Byte]): Stream[F, Payload] =
30-
bytes
22+
def decodePayloads[F[_]: MonadThrow]: Pipe[F, Byte, Payload] =
23+
(_: Stream[F, Byte])
3124
.scanChunks(ScanState.starting) { case (state, chunk) =>
3225
val (ns, maybeResult) = loop(state.concatChunk(chunk))
3326
(ns, Chunk(maybeResult))

fs2/test/src/jsonrpclib/fs2/FS2ChannelSpec.scala

+20-29
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package jsonrpclib.fs2
22

33
import cats.effect.IO
4-
import cats.effect.implicits._
5-
import cats.effect.std.Queue
64
import cats.syntax.all._
75
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
86
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
@@ -28,18 +26,28 @@ object FS2ChannelSpec extends SimpleIOSuite {
2826
def testRes(name: TestName)(run: Stream[IO, Expectations]): Unit =
2927
test(name)(run.compile.lastOrError.timeout(10.second))
3028

29+
type ClientSideChannel = FS2Channel[IO]
30+
def setup(endpoints: Endpoint[IO]*) = setupAux(endpoints, None)
31+
def setup(cancelTemplate: CancelTemplate, endpoints: Endpoint[IO]*) = setupAux(endpoints, Some(cancelTemplate))
32+
def setupAux(endpoints: Seq[Endpoint[IO]], cancelTemplate: Option[CancelTemplate]): Stream[IO, ClientSideChannel] = {
33+
for {
34+
serverSideChannel <- FS2Channel[IO](cancelTemplate = cancelTemplate)
35+
clientSideChannel <- FS2Channel[IO](cancelTemplate = cancelTemplate)
36+
_ <- serverSideChannel.withEndpointsStream(endpoints)
37+
_ <- Stream(())
38+
.concurrently(clientSideChannel.output.through(serverSideChannel.input))
39+
.concurrently(serverSideChannel.output.through(clientSideChannel.input))
40+
} yield {
41+
clientSideChannel
42+
}
43+
}
44+
3145
testRes("Round trip") {
3246
val endpoint: Endpoint[IO] = Endpoint[IO]("inc").simple((int: IntWrapper) => IO(IntWrapper(int.int + 1)))
3347

3448
for {
35-
stdout <- Queue.bounded[IO, Payload](10).toStream
36-
stdin <- Queue.bounded[IO, Payload](10).toStream
37-
serverSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdin), stdout.offer)
38-
clientSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdout), stdin.offer)
39-
_ <- serverSideChannel.withEndpoint(endpoint).asStream
49+
clientSideChannel <- setup(endpoint)
4050
remoteFunction = clientSideChannel.simpleStub[IntWrapper, IntWrapper]("inc")
41-
_ <- serverSideChannel.open.asStream
42-
_ <- clientSideChannel.open.asStream
4351
result <- remoteFunction(IntWrapper(1)).toStream
4452
} yield {
4553
expect.same(result, IntWrapper(2))
@@ -49,13 +57,8 @@ object FS2ChannelSpec extends SimpleIOSuite {
4957
testRes("Endpoint not mounted") {
5058

5159
for {
52-
stdout <- Queue.bounded[IO, Payload](10).toStream
53-
stdin <- Queue.bounded[IO, Payload](10).toStream
54-
serverSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdin), stdout.offer)
55-
clientSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdout), stdin.offer)
60+
clientSideChannel <- setup()
5661
remoteFunction = clientSideChannel.simpleStub[IntWrapper, IntWrapper]("inc")
57-
_ <- serverSideChannel.open.asStream
58-
_ <- clientSideChannel.open.asStream
5962
result <- remoteFunction(IntWrapper(1)).attempt.toStream
6063
} yield {
6164
expect.same(result, Left(ErrorPayload(-32601, "Method inc not found", None)))
@@ -70,14 +73,8 @@ object FS2ChannelSpec extends SimpleIOSuite {
7073
}
7174

7275
for {
73-
stdout <- Queue.bounded[IO, Payload](10).toStream
74-
stdin <- Queue.bounded[IO, Payload](10).toStream
75-
serverSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdin), payload => stdout.offer(payload))
76-
clientSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdout), payload => stdin.offer(payload))
77-
_ <- serverSideChannel.withEndpoint(endpoint).asStream
76+
clientSideChannel <- setup(endpoint)
7877
remoteFunction = clientSideChannel.simpleStub[IntWrapper, IntWrapper]("inc")
79-
_ <- serverSideChannel.open.asStream
80-
_ <- clientSideChannel.open.asStream
8178
timedResults <- (1 to 10).toList.map(IntWrapper(_)).parTraverse(remoteFunction).timed.toStream
8279
} yield {
8380
val (time, results) = timedResults
@@ -95,14 +92,8 @@ object FS2ChannelSpec extends SimpleIOSuite {
9592
IO.never.as(int).onCancel(canceledPromise.complete(int).void)
9693
)
9794

98-
stdin <- Queue.bounded[IO, Payload](10).toStream
99-
stdout <- Queue.bounded[IO, Payload](10).toStream
100-
serverSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdin), stdout.offer, Some(cancelTemplate))
101-
clientSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdout), stdin.offer, Some(cancelTemplate))
102-
_ <- serverSideChannel.withEndpoint(endpoint).asStream
95+
clientSideChannel <- setup(cancelTemplate, endpoint)
10396
remoteFunction = clientSideChannel.simpleStub[IntWrapper, IntWrapper]("never")
104-
_ <- serverSideChannel.open.asStream
105-
_ <- clientSideChannel.open.asStream
10697
// Timeing-out client-call to verify cancelation progagates to server
10798
_ <- IO.race(remoteFunction(IntWrapper(23)), IO.sleep(1.second)).toStream
10899
result <- canceledPromise.get.toStream

0 commit comments

Comments
 (0)