Skip to content

Commit c24e8fc

Browse files
committed
It works ... but client program doesn't terminate
1 parent ca3b26c commit c24e8fc

File tree

3 files changed

+77
-16
lines changed

3 files changed

+77
-16
lines changed

examples/client/src/examples/client/ClientMain.scala

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,23 @@ import jsonrpclib.CallId
44
import jsonrpclib.fs2._
55
import cats.effect._
66
import fs2.io._
7-
import eu.monniot.process.Process
87
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
98
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
109
import jsonrpclib.Endpoint
1110
import cats.syntax.all._
1211
import fs2.Stream
1312
import jsonrpclib.StubTemplate
13+
import cats.effect.std.Dispatcher
14+
import scala.sys.process.ProcessIO
15+
import cats.effect.implicits._
16+
import scala.sys.process.{Process => SProcess}
17+
import java.io.OutputStream
18+
import java.io.InputStream
1419

1520
object ClientMain extends IOApp.Simple {
1621

1722
// Reserving a method for cancelation.
18-
val cancelTemplate = CancelTemplate.make[CallId]("$/cancel", identity, identity)
23+
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity)
1924

2025
// Creating a datatype that'll serve as a request (and response) of an endpoint
2126
case class IntWrapper(value: Int)
@@ -27,21 +32,75 @@ object ClientMain extends IOApp.Simple {
2732
def log(str: String): IOStream[Unit] = Stream.eval(IO.consoleForIO.errorln(str))
2833

2934
def run: IO[Unit] = {
35+
import scala.concurrent.duration._
3036
// Using errorln as stdout is used by the RPC channel
3137
val run = for {
3238
_ <- log("Starting client")
3339
serverJar <- sys.env.get("SERVER_JAR").liftTo[IOStream](new Exception("SERVER_JAR env var does not exist"))
34-
serverProcess <- Stream.resource(Process.spawn[IO]("java", "-jar", serverJar))
35-
fs2Channel <- FS2Channel.lspCompliant[IO](serverProcess.stdout, serverProcess.stdin)
40+
// Starting the server
41+
(serverStdin, serverStdout, serverStderr) <- Stream.resource(process("java", "-jar", serverJar))
42+
pipeErrors = serverStderr.through(fs2.io.stderr)
43+
// Creating a channel that will be used to communicate to the server
44+
fs2Channel <- FS2Channel
45+
.lspCompliant[IO](serverStdout, serverStdin, cancelTemplate = cancelEndpoint.some)
46+
.concurrently(pipeErrors)
3647
// Opening the stream to be able to send and receive data
3748
_ <- fs2Channel.openStream
3849
// Creating a `IntWrapper => IO[IntWrapper]` stub that can call the server
3950
increment = fs2Channel.simpleStub[IntWrapper, IntWrapper]("increment")
40-
result <- Stream.eval(IO.pure(IntWrapper(0)).flatMap(increment))
51+
result <- Stream.eval(increment(IntWrapper(0)))
4152
_ <- log(s"Client received $result")
4253
_ <- log("Terminating client")
4354
} yield ()
44-
run.compile.drain
55+
run.compile.drain.timeout(2.second)
4556
}
4657

58+
/** Wraps the spawning of a subprocess into fs2 friendly semantics
59+
*/
60+
import scala.concurrent.duration._
61+
def process(command: String*) = for {
62+
dispatcher <- Dispatcher[IO]
63+
stdinPromise <- IO.deferred[fs2.Pipe[IO, Byte, Unit]].toResource
64+
stdoutPromise <- IO.deferred[fs2.Stream[IO, Byte]].toResource
65+
stderrPromise <- IO.deferred[fs2.Stream[IO, Byte]].toResource
66+
makeProcessBuilder = IO(sys.process.stringSeqToProcess(command))
67+
makeProcessIO = IO(
68+
new ProcessIO(
69+
in = { (outputStream: OutputStream) =>
70+
val pipe = writeOutputStreamFlushingChunks(IO(outputStream))
71+
val fulfil = stdinPromise.complete(pipe)
72+
dispatcher.unsafeRunSync(fulfil)
73+
},
74+
out = { (inputStream: InputStream) =>
75+
val stream = fs2.io.readInputStream(IO(inputStream), 512)
76+
val fulfil = stdoutPromise.complete(stream)
77+
dispatcher.unsafeRunSync(fulfil)
78+
},
79+
err = { (inputStream: InputStream) =>
80+
val stream = fs2.io.readInputStream(IO(inputStream), 512)
81+
val fulfil = stderrPromise.complete(stream)
82+
dispatcher.unsafeRunSync(fulfil)
83+
}
84+
)
85+
)
86+
makeProcess = (makeProcessBuilder, makeProcessIO).flatMapN { case (b, io) => IO.blocking(b.run(io)) }
87+
_ <- Resource.make(makeProcess)((runningProcess) => IO.blocking(runningProcess.destroy()))
88+
pipes <- (stdinPromise.get, stdoutPromise.get, stderrPromise.get).tupled.toResource
89+
} yield pipes
90+
91+
/** Adds a flush after each chunk
92+
*/
93+
def writeOutputStreamFlushingChunks[F[_]](
94+
fos: F[OutputStream],
95+
closeAfterUse: Boolean = true
96+
)(implicit F: Sync[F]): fs2.Pipe[F, Byte, Nothing] =
97+
s => {
98+
def useOs(os: OutputStream): Stream[F, Nothing] =
99+
s.chunks.foreach(c => F.interruptible(os.write(c.toArray)) >> F.blocking(os.flush()))
100+
101+
val os =
102+
if (closeAfterUse) Stream.bracket(fos)(os => F.blocking(os.close()))
103+
else Stream.eval(fos)
104+
os.flatMap(os => useOs(os) ++ Stream.exec(F.blocking(os.flush())))
105+
}
47106
}

examples/server/src/examples/server/ServerMain.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ import fs2.io._
77
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
88
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
99
import jsonrpclib.Endpoint
10+
import cats.syntax.all._
1011

1112
object ServerMain extends IOApp.Simple {
1213

1314
// Reserving a method for cancelation.
14-
val cancelTemplate = CancelTemplate.make[CallId]("$/cancel", identity, identity)
15+
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity)
1516

1617
// Creating a datatype that'll serve as a request (and response) of an endpoint
1718
case class IntWrapper(value: Int)
@@ -29,7 +30,7 @@ object ServerMain extends IOApp.Simple {
2930
// Using errorln as stdout is used by the RPC channel
3031
IO.consoleForIO.errorln("Starting server") >>
3132
FS2Channel
32-
.lspCompliant[IO](fs2.io.stdin[IO](bufSize = 512), fs2.io.stdout[IO])
33+
.lspCompliant[IO](fs2.io.stdin[IO](bufSize = 512), fs2.io.stdout[IO], cancelTemplate = cancelEndpoint.some)
3334
.flatMap(_.withEndpointStream(increment)) // mounting an endpoint onto the channel
3435
.flatMap(_.openStreamForever) // starts the communication
3536
.compile

fs2/src/jsonrpclib/fs2/FS2Channel.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,28 +44,28 @@ object FS2Channel {
4444
byteStream: Stream[F, Byte],
4545
byteSink: Pipe[F, Byte, Unit],
4646
bufferSize: Int = 512,
47-
maybeCancelTemplate: Option[CancelTemplate] = None
47+
cancelTemplate: Option[CancelTemplate] = None
4848
): Stream[F, FS2Channel[F]] = internals.LSP.writeSink(byteSink, bufferSize).flatMap { sink =>
49-
apply[F](internals.LSP.readStream(byteStream), sink, maybeCancelTemplate)
49+
apply[F](internals.LSP.readStream(byteStream), sink, cancelTemplate)
5050
}
5151

5252
def apply[F[_]: Concurrent](
5353
payloadStream: Stream[F, Payload],
5454
payloadSink: Payload => F[Unit],
55-
maybeCancelTemplate: Option[CancelTemplate] = None
55+
cancelTemplate: Option[CancelTemplate] = None
5656
): Stream[F, FS2Channel[F]] = {
5757
for {
5858
supervisor <- Stream.resource(Supervisor[F])
5959
ref <- Ref[F].of(State[F](Map.empty, Map.empty, Map.empty, 0)).toStream
6060
isOpen <- SignallingRef[F].of(false).toStream
6161
awaitingSink = isOpen.waitUntil(identity) >> payloadSink(_: Payload)
62-
impl = new Impl(awaitingSink, ref, isOpen, supervisor, maybeCancelTemplate)
62+
impl = new Impl(awaitingSink, ref, isOpen, supervisor, cancelTemplate)
6363

6464
// Creating a bespoke endpoint to receive cancelation requests
65-
maybeCancelEndpoint: Option[Endpoint[F]] = maybeCancelTemplate.map { cancelTemplate =>
66-
implicit val codec: Codec[cancelTemplate.C] = cancelTemplate.codec
67-
Endpoint[F](cancelTemplate.method).notification[cancelTemplate.C] { request =>
68-
val callId = cancelTemplate.toCallId(request)
65+
maybeCancelEndpoint: Option[Endpoint[F]] = cancelTemplate.map { ct =>
66+
implicit val codec: Codec[ct.C] = ct.codec
67+
Endpoint[F](ct.method).notification[ct.C] { request =>
68+
val callId = ct.toCallId(request)
6969
impl.cancel(callId)
7070
}
7171
}
@@ -148,6 +148,7 @@ object FS2Channel {
148148
protected def reportError(params: Option[Payload], error: ProtocolError, method: String): F[Unit] = ???
149149
protected def getEndpoint(method: String): F[Option[Endpoint[F]]] = state.get.map(_.endpoints.get(method))
150150
protected def sendMessage(message: Message): F[Unit] = sink(Codec.encode(message))
151+
151152
protected def nextCallId(): F[CallId] = state.modify(_.nextCallId)
152153
protected def createPromise[A](callId: CallId): F[(Try[A] => F[Unit], () => F[A])] = Deferred[F, Try[A]].map {
153154
promise =>

0 commit comments

Comments
 (0)