Skip to content

Commit 64f313d

Browse files
authored
Merge pull request #4 from neandertech/docs-and-examples
Docs and examples
2 parents 58f5d5b + 8e9a54d commit 64f313d

File tree

10 files changed

+316
-91
lines changed

10 files changed

+316
-91
lines changed

.scalafmt.conf

+14
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
11
version = "3.5.3"
22
runner.dialect = scala213
33
maxColumn = 120
4+
fileOverride {
5+
"glob:**/fs2/src/**" {
6+
runner.dialect = scala213source3
7+
}
8+
"glob:**/fs2/test/src/**" {
9+
runner.dialect = scala213source3
10+
}
11+
"glob:**/core/test/src-jvm-native/**" {
12+
runner.dialect = scala213source3
13+
}
14+
"glob:**/core/src/**" {
15+
runner.dialect = scala213source3
16+
}
17+
}

README.md

+29-11
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,41 @@
11
[![CI](https://github.com/neandertech/jsonrpclib/actions/workflows/ci.yml/badge.svg)](https://github.com/neandertech/jsonrpclib/actions/workflows/ci.yml)
22

3+
[![jsonrpclib-fs2 Scala version support](https://index.scala-lang.org/neandertech/jsonrpclib/jsonrpclib-fs2/latest-by-scala-version.svg?platform=jvm)](https://index.scala-lang.org/neandertech/jsonrpclib/jsonrpclib-fs2)
4+
5+
[![jsonrpclib-fs2 Scala version support](https://index.scala-lang.org/neandertech/jsonrpclib/jsonrpclib-fs2/latest-by-scala-version.svg?platform=sjs1)](https://index.scala-lang.org/neandertech/jsonrpclib/jsonrpclib-fs2)
6+
7+
38
# jsonrpclib
49

5-
This is a cross-platform, cross-scala-version [jsonrpc](https://www.jsonrpc.org/) library that provides construct for bidirectional communication using
6-
the jsonrpc protocol.
10+
This is a cross-platform, cross-scala-version library that provides construct for bidirectional communication using the [jsonrpc](https://www.jsonrpc.org/) protocol. It is built on top of [fs2](https://fs2.io/#/) and [jsoniter-scala](https://github.com/plokhotnyuk/jsoniter-scala)
11+
12+
This library does not enforce any transport, and can work on top of stdin/stdout or other channels.
13+
14+
## Installation
15+
16+
The dependencies below are following [cross-platform semantics](http://youforgotapercentagesignoracolon.com/).
17+
Adapt according to your needs
18+
19+
### SBT
720

8-
This library does not enforce any transport, and works as long as you can provide input/output byte streams.
21+
```scala
22+
libraryDependencies += "tech.neander" %%% "jsonrpclib-fs2" % version
23+
```
924

25+
### Mill
1026

11-
## Dev Notes
27+
```scala
28+
override def ivyDeps = super.ivyDeps() ++ Agg(ivy"tech.neander::jsonrpclib-fs2::$version")
29+
```
1230

13-
### Scala-native
31+
### Scala-cli
1432

15-
See
16-
* https://github.com/scala-native/scala-native/blob/63d07093f6d0a6e9de28cd8f9fb6bc1d6596c6ec/test-interface/src/main/scala/scala/scalanative/testinterface/NativeRPC.scala
33+
```scala
34+
//> using lib "tech.neander::jsonrpclib-fs2:<VERSION>"
35+
```
1736

37+
## Usage
1838

19-
### Scala-js
39+
**/!\ Please be aware that this library is in its early days and offers strictly no guarantee with regards to backward compatibility**
2040

21-
See
22-
* https://github.com/scala-js/scala-js-js-envs/blob/main/nodejs-env/src/main/scala/org/scalajs/jsenv/nodejs/ComSupport.scala#L245
23-
* https://github.com/scala-js/scala-js/blob/0708917912938714d52be1426364f78a3d1fd269/test-bridge/src/main/scala/org/scalajs/testing/bridge/JSRPC.scala
41+
See the examples folder

build.sc

+24-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import mill.define.Target
2+
import mill.util.Jvm
13
import $ivy.`com.lihaoyi::mill-contrib-bloop:$MILL_VERSION`
24
import $ivy.`io.github.davidgregory084::mill-tpolecat::0.3.1`
35
import $ivy.`de.tototec::de.tobiasroeser.mill.vcs.version::0.1.4`
@@ -20,6 +22,7 @@ object versions {
2022
val scalaNativeVersion = "0.4.4"
2123
val munitVersion = "0.7.29"
2224
val munitNativeVersion = "1.0.0-M6"
25+
val fs2 = "3.2.11"
2326

2427
val scala213 = "2.13"
2528
val scala212 = "2.12"
@@ -59,7 +62,7 @@ object fs2 extends RPCCrossPlatformModule { cross =>
5962

6063
override def crossPlatformModuleDeps = Seq(core)
6164
def crossPlatformIvyDeps: T[Agg[Dep]] = Agg(
62-
ivy"co.fs2::fs2-core::3.2.8"
65+
ivy"co.fs2::fs2-core::${versions.fs2}"
6366
)
6467

6568
object jvm extends mill.Cross[JvmModule](scala213, scala3)
@@ -74,6 +77,26 @@ object fs2 extends RPCCrossPlatformModule { cross =>
7477

7578
}
7679

80+
object examples extends mill.define.Module {
81+
82+
object server extends ScalaModule {
83+
def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}")
84+
def moduleDeps = Seq(fs2.jvm(versions.scala213))
85+
def scalaVersion = versions.scala213Version
86+
}
87+
88+
object client extends ScalaModule {
89+
def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}")
90+
def moduleDeps = Seq(fs2.jvm(versions.scala213))
91+
def scalaVersion = versions.scala213Version
92+
def forkEnv: Target[Map[String, String]] = T {
93+
val assembledServer = server.assembly()
94+
super.forkEnv() ++ Map("SERVER_JAR" -> assembledServer.path.toString())
95+
}
96+
}
97+
98+
}
99+
77100
// #############################################################################
78101
// COMMON SETUP
79102
// #############################################################################

devnotes.md

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Dev Notes
2+
3+
In case somebody wants to implement future-based channels, here are some source of inspiration :
4+
5+
### Scala-native
6+
7+
See
8+
* https://github.com/scala-native/scala-native/blob/63d07093f6d0a6e9de28cd8f9fb6bc1d6596c6ec/test-interface/src/main/scala/scala/scalanative/testinterface/NativeRPC.scala
9+
10+
11+
### Scala-js
12+
13+
See
14+
* https://github.com/scala-js/scala-js-js-envs/blob/main/nodejs-env/src/main/scala/org/scalajs/jsenv/nodejs/ComSupport.scala#L245
15+
* https://github.com/scala-js/scala-js/blob/0708917912938714d52be1426364f78a3d1fd269/test-bridge/src/main/scala/org/scalajs/testing/bridge/JSRPC.scala
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package examples.client
2+
3+
import fs2.Stream
4+
import cats.effect._
5+
import cats.syntax.all._
6+
import scala.jdk.CollectionConverters._
7+
import java.io.OutputStream
8+
9+
trait ChildProcess[F[_]] {
10+
def stdin: fs2.Pipe[F, Byte, Unit]
11+
def stdout: Stream[F, Byte]
12+
def stderr: Stream[F, Byte]
13+
}
14+
15+
object ChildProcess {
16+
17+
def spawn[F[_]: Async](command: String*): Stream[F, ChildProcess[F]] =
18+
Stream.bracket(start[F](command))(_._2).map(_._1)
19+
20+
val readBufferSize = 512
21+
private def start[F[_]: Async](command: Seq[String]) = Async[F].interruptible {
22+
val p =
23+
new java.lang.ProcessBuilder(command.asJava)
24+
.start() // .directory(new java.io.File(wd)).start()
25+
val done = Async[F].fromCompletableFuture(Sync[F].delay(p.onExit()))
26+
27+
val terminate: F[Unit] = Sync[F].interruptible(p.destroy())
28+
29+
import cats._
30+
val onGlobal = new (F ~> F) {
31+
def apply[A](fa: F[A]): F[A] = Async[F].evalOn(fa, scala.concurrent.ExecutionContext.global)
32+
}
33+
34+
val cp = new ChildProcess[F] {
35+
def stdin: fs2.Pipe[F, Byte, Unit] =
36+
writeOutputStreamFlushingChunks[F](Sync[F].interruptible(p.getOutputStream()))
37+
38+
def stdout: fs2.Stream[F, Byte] = fs2.io
39+
.readInputStream[F](Sync[F].interruptible(p.getInputStream()), chunkSize = readBufferSize)
40+
.translate(onGlobal)
41+
42+
def stderr: fs2.Stream[F, Byte] = fs2.io
43+
.readInputStream[F](Sync[F].blocking(p.getErrorStream()), chunkSize = readBufferSize)
44+
.translate(onGlobal)
45+
// Avoids broken pipe - we cut off when the program ends.
46+
// Users can decide what to do with the error logs using the exitCode value
47+
.interruptWhen(done.void.attempt)
48+
}
49+
(cp, terminate)
50+
}
51+
52+
/** Adds a flush after each chunk
53+
*/
54+
def writeOutputStreamFlushingChunks[F[_]](
55+
fos: F[OutputStream],
56+
closeAfterUse: Boolean = true
57+
)(implicit F: Sync[F]): fs2.Pipe[F, Byte, Nothing] =
58+
s => {
59+
def useOs(os: OutputStream): Stream[F, Nothing] =
60+
s.chunks.foreach(c => F.interruptible(os.write(c.toArray)) >> F.blocking(os.flush()))
61+
62+
val os =
63+
if (closeAfterUse) Stream.bracket(fos)(os => F.blocking(os.close()))
64+
else Stream.eval(fos)
65+
os.flatMap(os => useOs(os) ++ Stream.exec(F.blocking(os.flush())))
66+
}
67+
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package examples.server
2+
3+
import jsonrpclib.CallId
4+
import jsonrpclib.fs2._
5+
import cats.effect._
6+
import fs2.io._
7+
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
8+
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
9+
import jsonrpclib.Endpoint
10+
import cats.syntax.all._
11+
import fs2.Stream
12+
import jsonrpclib.StubTemplate
13+
import cats.effect.std.Dispatcher
14+
import cats.effect.implicits._
15+
import java.io.OutputStream
16+
import java.io.InputStream
17+
import examples.client.ChildProcess
18+
19+
object ClientMain extends IOApp.Simple {
20+
21+
// Reserving a method for cancelation.
22+
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity)
23+
24+
// Creating a datatype that'll serve as a request (and response) of an endpoint
25+
case class IntWrapper(value: Int)
26+
object IntWrapper {
27+
implicit val jcodec: JsonValueCodec[IntWrapper] = JsonCodecMaker.make
28+
}
29+
30+
type IOStream[A] = fs2.Stream[IO, A]
31+
def log(str: String): IOStream[Unit] = Stream.eval(IO.consoleForIO.errorln(str))
32+
33+
def run: IO[Unit] = {
34+
import scala.concurrent.duration._
35+
// Using errorln as stdout is used by the RPC channel
36+
val run = for {
37+
_ <- log("Starting client")
38+
serverJar <- sys.env.get("SERVER_JAR").liftTo[IOStream](new Exception("SERVER_JAR env var does not exist"))
39+
// Starting the server
40+
rp <- ChildProcess.spawn[IO]("java", "-jar", serverJar)
41+
// Creating a channel that will be used to communicate to the server
42+
fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some)
43+
_ <- Stream(())
44+
.concurrently(fs2Channel.output.through(lsp.encodePayloads).through(rp.stdin))
45+
.concurrently(rp.stdout.through(lsp.decodePayloads).through(fs2Channel.input))
46+
.concurrently(rp.stderr.through(fs2.io.stderr[IO]))
47+
// Creating a `IntWrapper => IO[IntWrapper]` stub that can call the server
48+
increment = fs2Channel.simpleStub[IntWrapper, IntWrapper]("increment")
49+
result1 <- Stream.eval(increment(IntWrapper(0)))
50+
_ <- log(s"Client received $result1")
51+
result2 <- Stream.eval(increment(result1))
52+
_ <- log(s"Client received $result2")
53+
_ <- log("Terminating client")
54+
} yield ()
55+
run.compile.drain
56+
}
57+
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package examples.server
2+
3+
import jsonrpclib.CallId
4+
import jsonrpclib.fs2._
5+
import cats.effect._
6+
import fs2.io._
7+
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
8+
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
9+
import jsonrpclib.Endpoint
10+
import cats.syntax.all._
11+
12+
object ServerMain extends IOApp.Simple {
13+
14+
// Reserving a method for cancelation.
15+
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity)
16+
17+
// Creating a datatype that'll serve as a request (and response) of an endpoint
18+
case class IntWrapper(value: Int)
19+
object IntWrapper {
20+
implicit val jcodec: JsonValueCodec[IntWrapper] = JsonCodecMaker.make
21+
}
22+
23+
// Implementing an incrementation endpoint
24+
val increment = Endpoint[IO]("increment").simple { in: IntWrapper =>
25+
IO.consoleForIO.errorln(s"Server received $in") >>
26+
IO.pure(in.copy(value = in.value + 1))
27+
}
28+
29+
def run: IO[Unit] = {
30+
// Using errorln as stdout is used by the RPC channel
31+
IO.consoleForIO.errorln("Starting server") >>
32+
FS2Channel[IO](cancelTemplate = Some(cancelEndpoint))
33+
.flatMap(_.withEndpointStream(increment)) // mounting an endpoint onto the channel
34+
.flatMap(channel =>
35+
fs2.Stream
36+
.eval(IO.never) // running the server forever
37+
.concurrently(stdin[IO](512).through(lsp.decodePayloads).through(channel.input))
38+
.concurrently(channel.output.through(lsp.encodePayloads).through(stdout[IO]))
39+
)
40+
.compile
41+
.drain
42+
.guarantee(IO.consoleForIO.errorln("Terminating server"))
43+
}
44+
45+
}

0 commit comments

Comments
 (0)