Skip to content

Commit 58f5d5b

Browse files
authored
Merge pull request #3 from neandertech/cancelation
Add logic to propagate CE-cancelation via channels
2 parents 9f9e974 + 67a0e40 commit 58f5d5b

File tree

7 files changed

+133
-311
lines changed

7 files changed

+133
-311
lines changed

core/src/jsonrpclib/internals/CallId.scala renamed to core/src/jsonrpclib/CallId.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package jsonrpclib.internals
1+
package jsonrpclib
22

33
import com.github.plokhotnyuk.jsoniter_scala.core._
44
import scala.util.Try

core/src/jsonrpclib/RPCCoreBis.scala

Lines changed: 0 additions & 294 deletions
This file was deleted.

core/src/jsonrpclib/internals/FutureBaseChannel.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import scala.util.Try
1111
abstract class FutureBasedChannel(endpoints: List[Endpoint[Future]])(implicit ec: ExecutionContext)
1212
extends MessageDispatcher[Future] {
1313

14-
override def createPromise[A](): Future[(Try[A] => Future[Unit], () => Future[A])] = Future.successful {
14+
override def createPromise[A](callId: CallId): Future[(Try[A] => Future[Unit], () => Future[A])] = Future.successful {
1515
val promise = Promise[A]()
1616
val fulfill: Try[A] => Future[Unit] = (a: Try[A]) => Future.successful(promise.complete(a))
1717
val future: () => Future[A] = () => promise.future

core/src/jsonrpclib/internals/MessageDispatcher.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ private[jsonrpclib] abstract class MessageDispatcher[F[_]](implicit F: Monadic[F
1212

1313
import F._
1414

15-
protected def background[A](fa: F[A]): F[Unit]
15+
protected def background[A](maybeCallId: Option[CallId], fa: F[A]): F[Unit]
1616
protected def reportError(params: Option[Payload], error: ProtocolError, method: String): F[Unit]
1717
protected def getEndpoint(method: String): F[Option[Endpoint[F]]]
1818
protected def sendMessage(message: Message): F[Unit]
1919
protected def nextCallId(): F[CallId]
20-
protected def createPromise[A](): F[(Try[A] => F[Unit], () => F[A])]
20+
protected def createPromise[A](callId: CallId): F[(Try[A] => F[Unit], () => F[A])]
2121
protected def storePendingCall(callId: CallId, handle: OutputMessage => F[Unit]): F[Unit]
2222
protected def removePendingCall(callId: CallId): F[Option[OutputMessage => F[Unit]]]
2323

@@ -34,7 +34,7 @@ private[jsonrpclib] abstract class MessageDispatcher[F[_]](implicit F: Monadic[F
3434
val encoded = inCodec.encode(input)
3535
doFlatMap(nextCallId()) { callId =>
3636
val message = InputMessage.RequestMessage(method, callId, Some(encoded))
37-
doFlatMap(createPromise[Either[Err, Out]]()) { case (fulfill, future) =>
37+
doFlatMap(createPromise[Either[Err, Out]](callId)) { case (fulfill, future) =>
3838
val pc = createPendingCall(errCodec, outCodec, fulfill)
3939
doFlatMap(storePendingCall(callId, pc))(_ => doFlatMap(sendMessage(message))(_ => future()))
4040
}
@@ -45,7 +45,7 @@ private[jsonrpclib] abstract class MessageDispatcher[F[_]](implicit F: Monadic[F
4545
Codec.decode[Message](Some(payload)).map {
4646
case im: InputMessage =>
4747
doFlatMap(getEndpoint(im.method)) {
48-
case Some(ep) => background(executeInputMessage(im, ep))
48+
case Some(ep) => background(im.maybeCallId, executeInputMessage(im, ep))
4949
case None =>
5050
im.maybeCallId match {
5151
case None =>
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package jsonrpclib.fs2
2+
3+
import jsonrpclib.Codec
4+
import jsonrpclib.CallId
5+
6+
/** A cancelation template that represents the RPC method by which cancelation
7+
*
8+
* @param template
9+
* : the notification template for the cancelation method
10+
* @param fromCallId
11+
* : a function to create a cancelation request out of a call id
12+
* @param toCallId
13+
* : a function to extract a call id from a cancelation request
14+
*/
15+
trait CancelTemplate {
16+
type C
17+
def method: String
18+
def codec: Codec[C]
19+
def fromCallId(callId: CallId): C
20+
def toCallId(cancelRequest: C): CallId
21+
22+
}
23+
24+
object CancelTemplate {
25+
26+
def make[CancelRequest: Codec](
27+
cancelMethod: String,
28+
toId: CancelRequest => CallId,
29+
fromId: CallId => CancelRequest
30+
): CancelTemplate =
31+
new CancelTemplate {
32+
type C = CancelRequest
33+
34+
def method: String = cancelMethod
35+
36+
def codec: Codec[CancelRequest] = implicitly[Codec[CancelRequest]]
37+
38+
def fromCallId(callId: CallId): CancelRequest = fromId(callId)
39+
40+
def toCallId(cancelRequest: CancelRequest): CallId = toId(cancelRequest)
41+
}
42+
43+
}

0 commit comments

Comments
 (0)