Skip to content

Commit 67a0e40

Browse files
committed
Add logic to propagate CE-cancelation via channels
Adds a `CancelTemplate` for users to specify a method that should be used for cancelation purposes. This LSP method is implemented automatically. When invoked, the server looks up the list of running calls it contains, and cancels the corresponding one. On the client side, stubs are automatically wired so that canceling a request in flight leads to a cancelation request propagating to the server with the corresponding CallId.
1 parent 9f9e974 commit 67a0e40

File tree

7 files changed

+133
-311
lines changed

7 files changed

+133
-311
lines changed

core/src/jsonrpclib/internals/CallId.scala renamed to core/src/jsonrpclib/CallId.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package jsonrpclib.internals
1+
package jsonrpclib
22

33
import com.github.plokhotnyuk.jsoniter_scala.core._
44
import scala.util.Try

core/src/jsonrpclib/RPCCoreBis.scala

Lines changed: 0 additions & 294 deletions
This file was deleted.

core/src/jsonrpclib/internals/FutureBaseChannel.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import scala.util.Try
1111
abstract class FutureBasedChannel(endpoints: List[Endpoint[Future]])(implicit ec: ExecutionContext)
1212
extends MessageDispatcher[Future] {
1313

14-
override def createPromise[A](): Future[(Try[A] => Future[Unit], () => Future[A])] = Future.successful {
14+
override def createPromise[A](callId: CallId): Future[(Try[A] => Future[Unit], () => Future[A])] = Future.successful {
1515
val promise = Promise[A]()
1616
val fulfill: Try[A] => Future[Unit] = (a: Try[A]) => Future.successful(promise.complete(a))
1717
val future: () => Future[A] = () => promise.future

core/src/jsonrpclib/internals/MessageDispatcher.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ private[jsonrpclib] abstract class MessageDispatcher[F[_]](implicit F: Monadic[F
1212

1313
import F._
1414

15-
protected def background[A](fa: F[A]): F[Unit]
15+
protected def background[A](maybeCallId: Option[CallId], fa: F[A]): F[Unit]
1616
protected def reportError(params: Option[Payload], error: ProtocolError, method: String): F[Unit]
1717
protected def getEndpoint(method: String): F[Option[Endpoint[F]]]
1818
protected def sendMessage(message: Message): F[Unit]
1919
protected def nextCallId(): F[CallId]
20-
protected def createPromise[A](): F[(Try[A] => F[Unit], () => F[A])]
20+
protected def createPromise[A](callId: CallId): F[(Try[A] => F[Unit], () => F[A])]
2121
protected def storePendingCall(callId: CallId, handle: OutputMessage => F[Unit]): F[Unit]
2222
protected def removePendingCall(callId: CallId): F[Option[OutputMessage => F[Unit]]]
2323

@@ -34,7 +34,7 @@ private[jsonrpclib] abstract class MessageDispatcher[F[_]](implicit F: Monadic[F
3434
val encoded = inCodec.encode(input)
3535
doFlatMap(nextCallId()) { callId =>
3636
val message = InputMessage.RequestMessage(method, callId, Some(encoded))
37-
doFlatMap(createPromise[Either[Err, Out]]()) { case (fulfill, future) =>
37+
doFlatMap(createPromise[Either[Err, Out]](callId)) { case (fulfill, future) =>
3838
val pc = createPendingCall(errCodec, outCodec, fulfill)
3939
doFlatMap(storePendingCall(callId, pc))(_ => doFlatMap(sendMessage(message))(_ => future()))
4040
}
@@ -45,7 +45,7 @@ private[jsonrpclib] abstract class MessageDispatcher[F[_]](implicit F: Monadic[F
4545
Codec.decode[Message](Some(payload)).map {
4646
case im: InputMessage =>
4747
doFlatMap(getEndpoint(im.method)) {
48-
case Some(ep) => background(executeInputMessage(im, ep))
48+
case Some(ep) => background(im.maybeCallId, executeInputMessage(im, ep))
4949
case None =>
5050
im.maybeCallId match {
5151
case None =>
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package jsonrpclib.fs2
2+
3+
import jsonrpclib.Codec
4+
import jsonrpclib.CallId
5+
6+
/** A cancelation template that represents the RPC method by which cancelation
7+
*
8+
* @param template
9+
* : the notification template for the cancelation method
10+
* @param fromCallId
11+
* : a function to create a cancelation request out of a call id
12+
* @param toCallId
13+
* : a function to extract a call id from a cancelation request
14+
*/
15+
trait CancelTemplate {
16+
type C
17+
def method: String
18+
def codec: Codec[C]
19+
def fromCallId(callId: CallId): C
20+
def toCallId(cancelRequest: C): CallId
21+
22+
}
23+
24+
object CancelTemplate {
25+
26+
def make[CancelRequest: Codec](
27+
cancelMethod: String,
28+
toId: CancelRequest => CallId,
29+
fromId: CallId => CancelRequest
30+
): CancelTemplate =
31+
new CancelTemplate {
32+
type C = CancelRequest
33+
34+
def method: String = cancelMethod
35+
36+
def codec: Codec[CancelRequest] = implicitly[Codec[CancelRequest]]
37+
38+
def fromCallId(callId: CallId): CancelRequest = fromId(callId)
39+
40+
def toCallId(cancelRequest: CancelRequest): CallId = toId(cancelRequest)
41+
}
42+
43+
}

0 commit comments

Comments
 (0)