Skip to content

Commit fadcef5

Browse files
committed
Pipe event callback registration concept
1 parent 59a3c45 commit fadcef5

File tree

7 files changed

+168
-8
lines changed

7 files changed

+168
-8
lines changed

NAMESPACE

+1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ export(nng_version)
8181
export(opt)
8282
export(parse_url)
8383
export(pipe_notify)
84+
export(pipe_register)
8485
export(random)
8586
export(read_monitor)
8687
export(reap)

R/sync.R

+40
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ cv_signal <- function(cv) invisible(.Call(rnng_cv_signal, cv))
155155
#' socket. The underlying transport may be closed at this point, and it is not
156156
#' possible to communicate using this pipe.
157157
#'
158+
#' Note: this function cannot be used in conjunction with [pipe_register()].
159+
#'
158160
#' @param socket a Socket.
159161
#' @param cv a 'conditionVariable' to signal, or NULL to cancel a previously set
160162
#' signal.
@@ -197,6 +199,44 @@ cv_signal <- function(cv) invisible(.Call(rnng_cv_signal, cv))
197199
pipe_notify <- function(socket, cv, add = FALSE, remove = FALSE, flag = FALSE)
198200
invisible(.Call(rnng_pipe_notify, socket, cv, add, remove, flag))
199201

202+
203+
#' Pipe Register
204+
#'
205+
#' Register a callback to run whenever pipes (individual connections) are
206+
#' added or removed at a socket.
207+
#'
208+
#' For add: this event occurs after the pipe is fully added to the socket. Prior
209+
#' to this time, it is not possible to communicate over the pipe with the
210+
#' socket.
211+
#'
212+
#' For remove: this event occurs after the pipe has been removed from the
213+
#' socket. The underlying transport may be closed at this point, and it is not
214+
#' possible to communicate using this pipe.
215+
#'
216+
#' Note: this function cannot be used in conjunction with [pipe_notify()].
217+
#'
218+
#' @param socket a Socket.
219+
#' @param add \[default NULL\] an R function callback to be run whenever a pipe
220+
#' is added. If NULL, any previously-registered callback is cancelled.
221+
#' @param remove \[default NULL\] an R function callback to be run whenever a
222+
#' pipe is removed. If NULL, any previously-registered callback is cancelled.
223+
#'
224+
#' @return Invisibly, zero on success (will otherwise error).
225+
#'
226+
#' @examplesIf requireNamespace("later", quietly = TRUE)
227+
#' s <- socket(listen = "inproc://nanopipecb")
228+
#' pipe_register(s, function() print("hi"), function() print("bye"))
229+
#'
230+
#' s1 <- socket(dial = "inproc://nanopipecb")
231+
#' close(s1)
232+
#'
233+
#' close(s)
234+
#'
235+
#' @export
236+
#'
237+
pipe_register <- function(socket, add = NULL, remove = NULL)
238+
invisible(.Call(rnng_pipe_register, socket, add, remove))
239+
200240
#' Signal Forwarder
201241
#'
202242
#' Forwards signals from one 'conditionVariable' to another.

man/pipe_notify.Rd

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/pipe_register.Rd

+46
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/init.c

+1
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ static const R_CallMethodDef callMethods[] = {
149149
{"rnng_ncurl_session_close", (DL_FUNC) &rnng_ncurl_session_close, 1},
150150
{"rnng_ncurl_transact", (DL_FUNC) &rnng_ncurl_transact, 1},
151151
{"rnng_pipe_notify", (DL_FUNC) &rnng_pipe_notify, 5},
152+
{"rnng_pipe_register", (DL_FUNC) &rnng_pipe_register, 3},
152153
{"rnng_protocol_open", (DL_FUNC) &rnng_protocol_open, 6},
153154
{"rnng_random", (DL_FUNC) &rnng_random, 2},
154155
{"rnng_reap", (DL_FUNC) &rnng_reap, 1},

src/nanonext.h

+2
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ extern int R_interrupts_pending;
119119

120120
#define ERROR_OUT(xc) Rf_error("%d | %s", xc, nng_strerror(xc))
121121
#define ERROR_RET(xc) { Rf_warning("%d | %s", xc, nng_strerror(xc)); return mk_error(xc); }
122+
#define NANONEXT_ENSURE_LATER if (eln2 == NULL) nano_load_later()
122123
#define NANONEXT_INIT_BUFSIZE 4096
123124
#define NANONEXT_SERIAL_VER 3
124125
#define NANONEXT_SERIAL_THR 134217728
@@ -345,6 +346,7 @@ SEXP rnng_ncurl_session(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
345346
SEXP rnng_ncurl_session_close(SEXP);
346347
SEXP rnng_ncurl_transact(SEXP);
347348
SEXP rnng_pipe_notify(SEXP, SEXP, SEXP, SEXP, SEXP);
349+
SEXP rnng_pipe_register(SEXP, SEXP, SEXP);
348350
SEXP rnng_protocol_open(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
349351
SEXP rnng_random(SEXP, SEXP);
350352
SEXP rnng_reap(SEXP);

src/sync.c

+76-8
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,29 @@
33
#define NANONEXT_SIGNALS
44
#include "nanonext.h"
55

6+
// internals -------------------------------------------------------------------
7+
8+
static void nano_load_later(void) {
9+
10+
SEXP str, call;
11+
PROTECT(str = Rf_mkString("later"));
12+
PROTECT(call = Rf_lang2(Rf_install("loadNamespace"), str));
13+
Rf_eval(call, R_BaseEnv);
14+
UNPROTECT(2);
15+
eln2 = (void (*)(void (*)(void *), void *, double, int)) R_GetCCallable("later", "execLaterNative2");
16+
17+
}
18+
19+
static void nano_eval_later (void *arg) {
20+
21+
SEXP call, node = (SEXP) arg;
22+
node = R_WeakRefValue(node);
23+
PROTECT(call = Rf_lcons(node, R_NilValue));
24+
(void) Rf_eval(call, R_GlobalEnv);
25+
UNPROTECT(1);
26+
27+
}
28+
629
// aio completion callbacks ----------------------------------------------------
730

831
static void sendaio_complete(void *arg) {
@@ -101,6 +124,12 @@ void pipe_cb_signal(nng_pipe p, nng_pipe_ev ev, void *arg) {
101124

102125
}
103126

127+
void pipe_cb_eval(nng_pipe p, nng_pipe_ev ev, void *arg) {
128+
129+
later2(nano_eval_later, arg);
130+
131+
}
132+
104133
static void pipe_cb_monitor(nng_pipe p, nng_pipe_ev ev, void *arg) {
105134

106135
nano_monitor *monitor = (nano_monitor *) arg;
@@ -495,14 +524,7 @@ SEXP rnng_set_promise_context(SEXP x, SEXP ctx) {
495524

496525
nano_aio *raio = (nano_aio *) NANO_PTR(aio);
497526

498-
if (eln2 == NULL) {
499-
SEXP str, call;
500-
PROTECT(str = Rf_mkString("later"));
501-
PROTECT(call = Rf_lang2(Rf_install("loadNamespace"), str));
502-
Rf_eval(call, R_BaseEnv);
503-
UNPROTECT(2);
504-
eln2 = (void (*)(void (*)(void *), void *, double, int)) R_GetCCallable("later", "execLaterNative2");
505-
}
527+
NANONEXT_ENSURE_LATER;
506528

507529
switch (raio->type) {
508530
case REQAIO:
@@ -569,6 +591,52 @@ SEXP rnng_pipe_notify(SEXP socket, SEXP cv, SEXP add, SEXP remove, SEXP flag) {
569591

570592
}
571593

594+
SEXP rnng_pipe_register(SEXP socket, SEXP add, SEXP remove) {
595+
596+
if (NANO_PTR_CHECK(socket, nano_SocketSymbol))
597+
Rf_error("'socket' is not a valid Socket");
598+
599+
int xc;
600+
nng_socket *sock = (nng_socket *) NANO_PTR(socket);
601+
602+
NANONEXT_ENSURE_LATER;
603+
604+
if (add != R_NilValue) {
605+
606+
if (TYPEOF(add) != CLOSXP)
607+
Rf_error("'add' is not a valid R closure function");
608+
609+
SEXP ref = R_MakeWeakRef(socket, add, R_NilValue, FALSE);
610+
if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_POST, pipe_cb_eval, ref)))
611+
ERROR_OUT(xc);
612+
613+
} else {
614+
615+
if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_POST, NULL, NULL)))
616+
ERROR_OUT(xc);
617+
618+
}
619+
620+
if (remove != R_NilValue) {
621+
622+
if (TYPEOF(remove) != CLOSXP)
623+
Rf_error("'remove' is not a valid R closure function");
624+
625+
SEXP ref = R_MakeWeakRef(socket, remove, R_NilValue, FALSE);
626+
if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_REM_POST, pipe_cb_eval, ref)))
627+
ERROR_OUT(xc);
628+
629+
} else {
630+
631+
if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_REM_POST, NULL, NULL)))
632+
ERROR_OUT(xc);
633+
634+
}
635+
636+
return nano_success;
637+
638+
}
639+
572640
// monitors --------------------------------------------------------------------
573641

574642
SEXP rnng_monitor_create(SEXP socket, SEXP cv) {

0 commit comments

Comments
 (0)