From 9e7e8d735b91fd6896c243cb725e02f82ba56fea Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Mon, 2 Sep 2024 22:33:04 +0100 Subject: [PATCH 1/5] rest server concept --- NAMESPACE | 1 + NEWS.md | 1 + R/ncurl.R | 43 ++++++++ man/server.Rd | 49 +++++++++ src/init.c | 1 + src/nanonext.h | 2 + src/server.c | 280 +++++++++++++++++++++++++++++++++++++++++++++++++ src/thread.c | 3 +- 8 files changed, 379 insertions(+), 1 deletion(-) create mode 100644 man/server.Rd create mode 100644 src/server.c diff --git a/NAMESPACE b/NAMESPACE index eede0b96e..2883ad092 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -91,6 +91,7 @@ export(request) export(send) export(send_aio) export(serial_config) +export(server) export(set_promise_context) export(socket) export(stat) diff --git a/NEWS.md b/NEWS.md index f30e823aa..51cefe82c 100644 --- a/NEWS.md +++ b/NEWS.md @@ -4,6 +4,7 @@ * Adds support for threaded dispatcher in `mirai`. * Adds 'recvAio' method for `promises::as.promise()` and `promises::is.promising()` to enable 'recvAio' promises. +* Adds `server()`, an HTTP REST server for evaluation of R expressions (experimental). #### Updates diff --git a/R/ncurl.R b/R/ncurl.R index e58a8d035..c9c94ffca 100644 --- a/R/ncurl.R +++ b/R/ncurl.R @@ -271,3 +271,46 @@ as.promise.ncurlAio <- function(x) { #' @exportS3Method promises::is.promising #' is.promising.ncurlAio <- function(x) TRUE + +#' Start REST Server +#' +#' Creates an instance of an HTTP REST server which evaluates R expressions sent +#' to it [EXPERIMENTAL]. As arbitrary R expressions are evaluated, this +#' should only be deployed on the local machine (using the 127.0.0.1 +#' loopback address) in a trusted environment. +#' +#' @param url full http address including hostname, port and path at which to +#' host the server. +#' +#' @details Query the API with an HTTP client using the \sQuote{POST} method, +#' with the request data being the R expression as a text string. The +#' received response body will consist of the serialized evaluation result. +#' Unserialize to return an R object. +#' +#' Use only in a new session. Use \sQuote{ctrl + \\} to forcibly quit +#' when finished as the function blocks with no means of interruption. +#' +#' Currently still experimental as the server lacks error handling. Sending +#' an invalid R expression will cause the server to exit. +#' +#' @return This function never returns. +#' +#' @examples +#' if (interactive()) { +#' +#' # run in a new session: +#' # nanonext::server() +#' +#' res <- ncurl("http://127.0.0.1:5555/api/rest", +#' convert = FALSE, +#' method = "POST", +#' data = "Sys.time()") +#' +#' if (!is_error_value(res$data)) unserialize(res$data) +#' +#' } +#' +#' @export +#' +server <- function(url = "http://127.0.0.1:5555/api/rest") + .Call(rnng_rest_server, url) diff --git a/man/server.Rd b/man/server.Rd new file mode 100644 index 000000000..11b298896 --- /dev/null +++ b/man/server.Rd @@ -0,0 +1,49 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/ncurl.R +\name{server} +\alias{server} +\title{Start REST Server} +\usage{ +server(url = "http://127.0.0.1:5555/api/rest") +} +\arguments{ +\item{url}{full http address including hostname, port and path at which to +host the server.} +} +\value{ +This function never returns. +} +\description{ +Creates an instance of an HTTP REST server which evaluates R expressions sent + to it [EXPERIMENTAL]. As arbitrary R expressions are evaluated, this + should only be deployed on the local machine (using the 127.0.0.1 + loopback address) in a trusted environment. +} +\details{ +Query the API with an HTTP client using the \sQuote{POST} method, + with the request data being the R expression as a text string. The + received response body will consist of the serialized evaluation result. + Unserialize to return an R object. + + Use only in a new session. Use \sQuote{ctrl + \\} to forcibly quit + when finished as the function blocks with no means of interruption. + + Currently still experimental as the server lacks error handling. Sending + an invalid R expression will cause the server to exit. +} +\examples{ +if (interactive()) { + +# run in a new session: +# nanonext::server() + +res <- ncurl("http://127.0.0.1:5555/api/rest", + convert = FALSE, + method = "POST", + data = "Sys.time()") + +if (!is_error_value(res$data)) unserialize(res$data) + +} + +} diff --git a/src/init.c b/src/init.c index f03fb3339..d9a83cc17 100644 --- a/src/init.c +++ b/src/init.c @@ -167,6 +167,7 @@ static const R_CallMethodDef callMethods[] = { {"rnng_recv", (DL_FUNC) &rnng_recv, 4}, {"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 6}, {"rnng_request", (DL_FUNC) &rnng_request, 7}, + {"rnng_rest_server", (DL_FUNC) &rnng_rest_server, 1}, {"rnng_send", (DL_FUNC) &rnng_send, 4}, {"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 5}, {"rnng_serial_config", (DL_FUNC) &rnng_serial_config, 4}, diff --git a/src/nanonext.h b/src/nanonext.h index 8acf5791e..ac9b577bc 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -294,6 +294,7 @@ int nano_matchargs(const SEXP); void pipe_cb_signal(nng_pipe, nng_pipe_ev, void *); void tls_finalizer(SEXP); +void nano_printf(const int, const char *, ...); SEXP rnng_advance_rng_state(void); SEXP rnng_aio_call(SEXP); @@ -347,6 +348,7 @@ SEXP rnng_reap(SEXP); SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP); SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP rnng_rest_server(SEXP); SEXP rnng_send(SEXP, SEXP, SEXP, SEXP); SEXP rnng_send_aio(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rnng_serial_config(SEXP, SEXP, SEXP, SEXP); diff --git a/src/server.c b/src/server.c new file mode 100644 index 000000000..18d35fb29 --- /dev/null +++ b/src/server.c @@ -0,0 +1,280 @@ +// Copyright (C) 2022-2024 Hibiki AI Limited +// +// This file is part of nanonext. +// +// nanonext is free software: you can redistribute it and/or modify it under the +// terms of the GNU General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later +// version. +// +// nanonext is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +// A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// nanonext. If not, see . + +// nanonext - HTTP REST Sever -------------------------------------------------- + +#define NANONEXT_HTTP +#define NANONEXT_PROTOCOLS +#include "nanonext.h" + +// REST server ----------------------------------------------------------------- + +// This file contains modified code with the following licence: +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +typedef enum { + SEND_REQ, + RECV_REP, +} job_state; + +typedef struct rest_job { + nng_aio * http_aio; // aio from HTTP we must reply to + nng_http_res * http_res; // HTTP response object + job_state state; // 0 = sending, 1 = receiving + nng_msg * msg; // request message + nng_aio * aio; // request flow + nng_ctx ctx; // context on the request socket + struct rest_job *next; // next on the freelist +} rest_job; + +nng_mtx * job_lock; +rest_job *job_freelist; +nng_socket req_sock; + +static void fatal(const char *reason, int xc) { + nano_printf(1, "%s: %s\n", reason, nng_strerror(xc)); +} + +static void rest_recycle_job(rest_job *job) { + + if (job->http_res != NULL) { + nng_http_res_free(job->http_res); + job->http_res = NULL; + } + if (job->msg != NULL) { + nng_msg_free(job->msg); + job->msg = NULL; + } + if (nng_ctx_id(job->ctx) != 0) { + nng_ctx_close(job->ctx); + } + + nng_mtx_lock(job_lock); + job->next = job_freelist; + job_freelist = job; + nng_mtx_unlock(job_lock); + +} + +static void rest_http_fatal(rest_job *job, const char *fmt, int rv) { + + char buf[128]; + nng_aio *aio = job->http_aio; + nng_http_res *res = job->http_res; + + job->http_res = NULL; + job->http_aio = NULL; + snprintf(buf, sizeof(buf), fmt, nng_strerror(rv)); + nng_http_res_set_status(res, NNG_HTTP_STATUS_INTERNAL_SERVER_ERROR); + nng_http_res_set_reason(res, buf); + nng_aio_set_output(aio, 0, res); + nng_aio_finish(aio, 0); + rest_recycle_job(job); + +} + +static void rest_job_cb(void *arg) { + + rest_job *job = arg; + nng_aio *aio = job->aio; + int xc; + + switch (job->state) { + case SEND_REQ: + if ((xc = nng_aio_result(aio))) { + rest_http_fatal(job, "send REQ failed: %s", xc); + return; + } + job->msg = NULL; + nng_aio_set_msg(aio, NULL); + job->state = RECV_REP; + nng_ctx_recv(job->ctx, aio); + break; + case RECV_REP: + if ((xc = nng_aio_result(aio))) { + rest_http_fatal(job, "recv reply failed: %s", xc); + return; + } + job->msg = nng_aio_get_msg(aio); + if ((xc = nng_http_res_copy_data(job->http_res, + nng_msg_body(job->msg), + nng_msg_len(job->msg)))) { + rest_http_fatal(job, "nng_http_res_copy_data: %s", xc); + return; + } + nng_aio_set_output(job->http_aio, 0, job->http_res); + nng_aio_finish(job->http_aio, 0); + job->http_aio = NULL; + job->http_res = NULL; + rest_recycle_job(job); + break; + default: + fatal("bad case", NNG_ESTATE); + break; + } + +} + +static rest_job *rest_get_job(void) { + + rest_job *job; + + nng_mtx_lock(job_lock); + if ((job = job_freelist) != NULL) { + job_freelist = job->next; + nng_mtx_unlock(job_lock); + job->next = NULL; + return (job); + } + nng_mtx_unlock(job_lock); + if ((job = calloc(1, sizeof(*job))) == NULL) { + return (NULL); + } + if (nng_aio_alloc(&job->aio, rest_job_cb, job) != 0) { + free(job); + return (NULL); + } + return (job); + +} + +void rest_handle(nng_aio *aio) { + + struct rest_job *job; + nng_http_req *req = nng_aio_get_input(aio, 0); + void *data; + size_t sz; + int xc; + + if ((job = rest_get_job()) == NULL) { + nng_aio_finish(aio, NNG_ENOMEM); + return; + } + if ((xc = nng_http_res_alloc(&job->http_res)) || + (xc = nng_ctx_open(&job->ctx, req_sock))) { + rest_recycle_job(job); + nng_aio_finish(aio, xc); + return; + } + + nng_http_req_get_data(req, &data, &sz); + job->http_aio = aio; + + if ((xc = nng_msg_alloc(&job->msg, sz))) { + rest_http_fatal(job, "nng_msg_alloc: %s", xc); + return; + } + + memcpy(nng_msg_body(job->msg), data, sz); + nng_aio_set_msg(job->aio, job->msg); + job->state = SEND_REQ; + nng_ctx_send(job->ctx, job->aio); + +} + +void rest_start(void *arg) { + + const char **addr = (const char **) arg; + nng_http_server *server; + nng_http_handler *handler; + nng_url *url; + int xc; + + if ((xc = nng_mtx_alloc(&job_lock))) + fatal("nng_mtx_alloc", xc); + + job_freelist = NULL; + + if ((xc = nng_url_parse(&url, addr[0]))) + fatal("nng_url_parse", xc); + + if ((xc = nng_req0_open(&req_sock))) + fatal("nng_req0_open", xc); + + if ((xc = nng_dial(req_sock, addr[1], NULL, NNG_FLAG_NONBLOCK))) + fatal("nng_dial(inproc_url)", xc); + + if ((xc = nng_http_server_hold(&server, url))) + fatal("nng_http_server_hold", xc); + + if ((xc = nng_http_handler_alloc(&handler, url->u_path, rest_handle))) + fatal("nng_http_handler_alloc", xc); + + if ((xc = nng_http_handler_set_method(handler, "POST"))) + fatal("nng_http_handler_set_method", xc); + + if ((xc = nng_http_handler_collect_body(handler, true, 1024 * 128))) + fatal("nng_http_handler_collect_body", xc); + + if ((xc = nng_http_server_add_handler(server, handler))) + fatal("nng_http_handler_add_handler", xc); + + if ((xc = nng_http_server_start(server))) + fatal("nng_http_server_start", xc); + + nng_url_free(url); + +} + +void inproc_server(const char* url) { + + nng_socket s; + nng_msg *msg; + int xc; + + if ((xc = nng_rep0_open(&s)) || (xc = nng_listen(s, url, NULL, 0))) + fatal("unable to set up inproc", xc); + + for (;;) { + if ((xc = nng_recvmsg(s, &msg, 0))) + fatal("inproc recvmsg", xc); + + const char *body = nng_msg_body(msg); + nano_buf buf; + SEXP res = R_ParseEvalString(body, R_GlobalEnv); + nano_serialize(&buf, res, R_NilValue); + nng_msg_clear(msg); + nng_msg_append(msg, buf.buf, buf.cur); + if ((xc = nng_sendmsg(s, msg, 0))) + fatal("inproc sendmsg", xc); + + } + +} + +SEXP rnng_rest_server(SEXP url) { + + const char *addr[2] = {CHAR(STRING_ELT(url, 0)), "inproc://n-a-n-o-serv"}; + nng_thread *thr; + int xc; + + if ((xc = nng_thread_create(&thr, rest_start, (void *) addr))) + ERROR_OUT(xc); + + inproc_server(addr[1]); + + nng_thread_destroy(thr); + return R_NilValue; + +} diff --git a/src/thread.c b/src/thread.c index fbde57ba3..98e8323a3 100644 --- a/src/thread.c +++ b/src/thread.c @@ -25,7 +25,8 @@ // # nocov start // tested interactively -static void nano_printf(const int err, const char *fmt, ...) { +void nano_printf(const int err, const char *fmt, ...) { + char buf[NANONEXT_INIT_BUFSIZE]; va_list arg_ptr; From 636dca5e4f6047951cd97329fb2497ca7fb1f86a Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Tue, 3 Sep 2024 10:28:56 +0100 Subject: [PATCH 2/5] support returning strings for cross-client compatibility --- R/ncurl.R | 43 ------------------------------- R/server.R | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++ man/server.Rd | 31 ++++++++++++++-------- src/server.c | 8 +++++- 4 files changed, 99 insertions(+), 54 deletions(-) create mode 100644 R/server.R diff --git a/R/ncurl.R b/R/ncurl.R index c9c94ffca..e58a8d035 100644 --- a/R/ncurl.R +++ b/R/ncurl.R @@ -271,46 +271,3 @@ as.promise.ncurlAio <- function(x) { #' @exportS3Method promises::is.promising #' is.promising.ncurlAio <- function(x) TRUE - -#' Start REST Server -#' -#' Creates an instance of an HTTP REST server which evaluates R expressions sent -#' to it [EXPERIMENTAL]. As arbitrary R expressions are evaluated, this -#' should only be deployed on the local machine (using the 127.0.0.1 -#' loopback address) in a trusted environment. -#' -#' @param url full http address including hostname, port and path at which to -#' host the server. -#' -#' @details Query the API with an HTTP client using the \sQuote{POST} method, -#' with the request data being the R expression as a text string. The -#' received response body will consist of the serialized evaluation result. -#' Unserialize to return an R object. -#' -#' Use only in a new session. Use \sQuote{ctrl + \\} to forcibly quit -#' when finished as the function blocks with no means of interruption. -#' -#' Currently still experimental as the server lacks error handling. Sending -#' an invalid R expression will cause the server to exit. -#' -#' @return This function never returns. -#' -#' @examples -#' if (interactive()) { -#' -#' # run in a new session: -#' # nanonext::server() -#' -#' res <- ncurl("http://127.0.0.1:5555/api/rest", -#' convert = FALSE, -#' method = "POST", -#' data = "Sys.time()") -#' -#' if (!is_error_value(res$data)) unserialize(res$data) -#' -#' } -#' -#' @export -#' -server <- function(url = "http://127.0.0.1:5555/api/rest") - .Call(rnng_rest_server, url) diff --git a/R/server.R b/R/server.R new file mode 100644 index 000000000..f2912f4e3 --- /dev/null +++ b/R/server.R @@ -0,0 +1,71 @@ +# Copyright (C) 2022-2024 Hibiki AI Limited +# +# This file is part of nanonext. +# +# nanonext is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# nanonext is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along with +# nanonext. If not, see . + +# nanonext - server - HTTP REST Server ----------------------------------------- + +#' Start REST Server +#' +#' Creates an instance of an HTTP REST server which evaluates R expressions sent +#' to it [EXPERIMENTAL]. As arbitrary R expressions are evaluated, this +#' should only be deployed on the local machine (using the 127.0.0.1 +#' loopback address) in a trusted environment. +#' +#' @param url full http address including hostname, port and path at which to +#' host the server. +#' +#' @details Query the API with an HTTP client using the \sQuote{POST} method, +#' with the request data being the R expression as a text string. The +#' received response body will consist of the evaluation result as a text +#' string (if of the appropriate type), or otherwise a serialized R object, +#' which should be passed to \code{\link{unserialize}}. +#' +#' Use only in a new session. Use \sQuote{ctrl + \\} to forcibly quit +#' when finished as the function blocks with no means of interruption. +#' +#' Currently still experimental as the server lacks error handling. Sending +#' an invalid R expression will cause the server to quit. +#' +#' @return This function never returns. +#' +#' @examples +#' if (interactive()) { +#' +#' # run server in a new session: +#' # Rscript -e 'nanonext::server()' +#' +#' # query using curl: +#' # curl -X POST http://127.0.0.1:5555/api/rest -d 'format(Sys.time())' +#' +#' ncurl( +#' "http://127.0.0.1:5555/api/rest", +#' method = "POST", +#' data = "format(Sys.time())" +#' ) +#' +#' res <- ncurl( +#' "http://127.0.0.1:5555/api/rest", +#' convert = FALSE, +#' method = "POST", +#' data = "data.frame(random = nanonext::random(3))" +#' ) +#' if (!is_error_value(res$data)) unserialize(res$data) +#' +#' } +#' +#' @export +#' +server <- function(url = "http://127.0.0.1:5555/api/rest") + .Call(rnng_rest_server, url) diff --git a/man/server.Rd b/man/server.Rd index 11b298896..5c12025a5 100644 --- a/man/server.Rd +++ b/man/server.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/ncurl.R +% Please edit documentation in R/server.R \name{server} \alias{server} \title{Start REST Server} @@ -22,26 +22,37 @@ Creates an instance of an HTTP REST server which evaluates R expressions sent \details{ Query the API with an HTTP client using the \sQuote{POST} method, with the request data being the R expression as a text string. The - received response body will consist of the serialized evaluation result. - Unserialize to return an R object. + received response body will consist of the evaluation result as a text + string (if of the appropriate type), or otherwise a serialized R object, + which should be passed to \code{\link{unserialize}}. Use only in a new session. Use \sQuote{ctrl + \\} to forcibly quit when finished as the function blocks with no means of interruption. Currently still experimental as the server lacks error handling. Sending - an invalid R expression will cause the server to exit. + an invalid R expression will cause the server to quit. } \examples{ if (interactive()) { -# run in a new session: -# nanonext::server() +# run server in a new session: +# Rscript -e 'nanonext::server()' -res <- ncurl("http://127.0.0.1:5555/api/rest", - convert = FALSE, - method = "POST", - data = "Sys.time()") +# query using curl: +# curl -X POST http://127.0.0.1:5555/api/rest -d 'format(Sys.time())' +ncurl( + "http://127.0.0.1:5555/api/rest", + method = "POST", + data = "format(Sys.time())" +) + +res <- ncurl( + "http://127.0.0.1:5555/api/rest", + convert = FALSE, + method = "POST", + data = "data.frame(random = nanonext::random(3))" +) if (!is_error_value(res$data)) unserialize(res$data) } diff --git a/src/server.c b/src/server.c index 18d35fb29..582895402 100644 --- a/src/server.c +++ b/src/server.c @@ -253,7 +253,13 @@ void inproc_server(const char* url) { const char *body = nng_msg_body(msg); nano_buf buf; SEXP res = R_ParseEvalString(body, R_GlobalEnv); - nano_serialize(&buf, res, R_NilValue); + if (TYPEOF(res) == STRSXP) { + const char *string = NANO_STRING(res); + buf.buf = (unsigned char *) string; + buf.cur = strlen(string); + } else { + nano_serialize(&buf, res, R_NilValue); + } nng_msg_clear(msg); nng_msg_append(msg, buf.buf, buf.cur); if ((xc = nng_sendmsg(s, msg, 0))) From e928ee7e067099024a2697f2b5b25200ceeab1e9 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Tue, 3 Sep 2024 11:48:36 +0100 Subject: [PATCH 3/5] use R_TopLevelExec() for server error handling --- R/server.R | 13 ++++++++++--- man/server.Rd | 13 ++++++++++--- src/server.c | 19 +++++++++++++++---- 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/R/server.R b/R/server.R index f2912f4e3..062b1bd63 100644 --- a/R/server.R +++ b/R/server.R @@ -35,8 +35,8 @@ #' Use only in a new session. Use \sQuote{ctrl + \\} to forcibly quit #' when finished as the function blocks with no means of interruption. #' -#' Currently still experimental as the server lacks error handling. Sending -#' an invalid R expression will cause the server to quit. +#' If the expression could not be parsed or evaluated, the response will be +#' returned with a status code of 500 and a blank body. #' #' @return This function never returns. #' @@ -44,7 +44,7 @@ #' if (interactive()) { #' #' # run server in a new session: -#' # Rscript -e 'nanonext::server()' +#' # Rscript -e 'nanonext::server("http://127.0.0.1:5555/api/rest")' #' #' # query using curl: #' # curl -X POST http://127.0.0.1:5555/api/rest -d 'format(Sys.time())' @@ -55,6 +55,13 @@ #' data = "format(Sys.time())" #' ) #' +#' # error will return status of 500 +#' ncurl( +#' "http://127.0.0.1:5555/api/rest", +#' method = "POST", +#' data = "not_valid()" +#' ) +#' #' res <- ncurl( #' "http://127.0.0.1:5555/api/rest", #' convert = FALSE, diff --git a/man/server.Rd b/man/server.Rd index 5c12025a5..5abcaf32a 100644 --- a/man/server.Rd +++ b/man/server.Rd @@ -29,14 +29,14 @@ Query the API with an HTTP client using the \sQuote{POST} method, Use only in a new session. Use \sQuote{ctrl + \\} to forcibly quit when finished as the function blocks with no means of interruption. - Currently still experimental as the server lacks error handling. Sending - an invalid R expression will cause the server to quit. + If the expression could not be parsed or evaluated, the response will be + returned with a status code of 500 and a blank body. } \examples{ if (interactive()) { # run server in a new session: -# Rscript -e 'nanonext::server()' +# Rscript -e 'nanonext::server("http://127.0.0.1:5555/api/rest")' # query using curl: # curl -X POST http://127.0.0.1:5555/api/rest -d 'format(Sys.time())' @@ -47,6 +47,13 @@ ncurl( data = "format(Sys.time())" ) +# error will return status of 500 +ncurl( + "http://127.0.0.1:5555/api/rest", + method = "POST", + data = "not_valid()" +) + res <- ncurl( "http://127.0.0.1:5555/api/rest", convert = FALSE, diff --git a/src/server.c b/src/server.c index 582895402..978fb4ec8 100644 --- a/src/server.c +++ b/src/server.c @@ -237,6 +237,12 @@ void rest_start(void *arg) { } +SEXP nano_eval_res; + +void parse_eval_safe(void *data) { + nano_eval_res = R_ParseEvalString((const char *) data, R_GlobalEnv); +} + void inproc_server(const char* url) { nng_socket s; @@ -246,22 +252,27 @@ void inproc_server(const char* url) { if ((xc = nng_rep0_open(&s)) || (xc = nng_listen(s, url, NULL, 0))) fatal("unable to set up inproc", xc); + nano_eval_res = R_BlankScalarString; + for (;;) { if ((xc = nng_recvmsg(s, &msg, 0))) fatal("inproc recvmsg", xc); const char *body = nng_msg_body(msg); nano_buf buf; - SEXP res = R_ParseEvalString(body, R_GlobalEnv); - if (TYPEOF(res) == STRSXP) { - const char *string = NANO_STRING(res); + + R_ToplevelExec(parse_eval_safe, (void *) body); + + if (TYPEOF(nano_eval_res) == STRSXP) { + const char *string = NANO_STRING(nano_eval_res); buf.buf = (unsigned char *) string; buf.cur = strlen(string); } else { - nano_serialize(&buf, res, R_NilValue); + nano_serialize(&buf, nano_eval_res, R_NilValue); } nng_msg_clear(msg); nng_msg_append(msg, buf.buf, buf.cur); + nano_eval_res = R_BlankScalarString; if ((xc = nng_sendmsg(s, msg, 0))) fatal("inproc sendmsg", xc); From 952ccfead1fa055e8233bffee41a12b1c1fe6340 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Tue, 3 Sep 2024 16:09:33 +0100 Subject: [PATCH 4/5] restrict scope of eval parse result --- src/server.c | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/server.c b/src/server.c index 978fb4ec8..f6bce6c22 100644 --- a/src/server.c +++ b/src/server.c @@ -237,10 +237,10 @@ void rest_start(void *arg) { } -SEXP nano_eval_res; +static SEXP nano_parse_eval_res; void parse_eval_safe(void *data) { - nano_eval_res = R_ParseEvalString((const char *) data, R_GlobalEnv); + nano_parse_eval_res = R_ParseEvalString((const char *) data, R_GlobalEnv); } void inproc_server(const char* url) { @@ -252,8 +252,6 @@ void inproc_server(const char* url) { if ((xc = nng_rep0_open(&s)) || (xc = nng_listen(s, url, NULL, 0))) fatal("unable to set up inproc", xc); - nano_eval_res = R_BlankScalarString; - for (;;) { if ((xc = nng_recvmsg(s, &msg, 0))) fatal("inproc recvmsg", xc); @@ -261,18 +259,18 @@ void inproc_server(const char* url) { const char *body = nng_msg_body(msg); nano_buf buf; + nano_parse_eval_res = R_BlankScalarString; R_ToplevelExec(parse_eval_safe, (void *) body); - if (TYPEOF(nano_eval_res) == STRSXP) { - const char *string = NANO_STRING(nano_eval_res); + if (TYPEOF(nano_parse_eval_res) == STRSXP) { + const char *string = NANO_STRING(nano_parse_eval_res); buf.buf = (unsigned char *) string; buf.cur = strlen(string); } else { - nano_serialize(&buf, nano_eval_res, R_NilValue); + nano_serialize(&buf, nano_parse_eval_res, R_NilValue); } nng_msg_clear(msg); nng_msg_append(msg, buf.buf, buf.cur); - nano_eval_res = R_BlankScalarString; if ((xc = nng_sendmsg(s, msg, 0))) fatal("inproc sendmsg", xc); From 58b231aba8e2b2b56910ea44b4a156e9568c3056 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Wed, 11 Sep 2024 21:51:55 +0100 Subject: [PATCH 5/5] allow user interrupts --- R/server.R | 7 ++++--- man/server.Rd | 7 ++++--- src/server.c | 32 +++++++++++++------------------- 3 files changed, 21 insertions(+), 25 deletions(-) diff --git a/R/server.R b/R/server.R index 062b1bd63..131aead96 100644 --- a/R/server.R +++ b/R/server.R @@ -32,12 +32,13 @@ #' string (if of the appropriate type), or otherwise a serialized R object, #' which should be passed to \code{\link{unserialize}}. #' -#' Use only in a new session. Use \sQuote{ctrl + \\} to forcibly quit -#' when finished as the function blocks with no means of interruption. -#' #' If the expression could not be parsed or evaluated, the response will be #' returned with a status code of 500 and a blank body. #' +#' User interrupts will only be processed after the next query has been +#' completed, hence return from the function may not be immediate. Use +#' \sQuote{ctrl + \\} to forcibly quit the entire R session if required. +#' #' @return This function never returns. #' #' @examples diff --git a/man/server.Rd b/man/server.Rd index 5abcaf32a..c724480b5 100644 --- a/man/server.Rd +++ b/man/server.Rd @@ -26,11 +26,12 @@ Query the API with an HTTP client using the \sQuote{POST} method, string (if of the appropriate type), or otherwise a serialized R object, which should be passed to \code{\link{unserialize}}. - Use only in a new session. Use \sQuote{ctrl + \\} to forcibly quit - when finished as the function blocks with no means of interruption. - If the expression could not be parsed or evaluated, the response will be returned with a status code of 500 and a blank body. + + User interrupts will only be processed after the next query has been + completed, hence return from the function may not be immediate. Use + \sQuote{ctrl + \\} to forcibly quit the entire R session if required. } \examples{ if (interactive()) { diff --git a/src/server.c b/src/server.c index f6bce6c22..2d793c7f8 100644 --- a/src/server.c +++ b/src/server.c @@ -243,25 +243,30 @@ void parse_eval_safe(void *data) { nano_parse_eval_res = R_ParseEvalString((const char *) data, R_GlobalEnv); } -void inproc_server(const char* url) { +SEXP rnng_rest_server(SEXP url) { + const char *addr[2] = {CHAR(STRING_ELT(url, 0)), "inproc://n-a-n-o-serv"}; + nng_thread *thr; nng_socket s; nng_msg *msg; int xc; - if ((xc = nng_rep0_open(&s)) || (xc = nng_listen(s, url, NULL, 0))) + if ((xc = nng_thread_create(&thr, rest_start, (void *) addr))) + ERROR_OUT(xc); + + if ((xc = nng_rep0_open(&s)) || + (xc = nng_listen(s, addr[1], NULL, 0))) fatal("unable to set up inproc", xc); for (;;) { + if ((xc = nng_recvmsg(s, &msg, 0))) fatal("inproc recvmsg", xc); - const char *body = nng_msg_body(msg); - nano_buf buf; - nano_parse_eval_res = R_BlankScalarString; - R_ToplevelExec(parse_eval_safe, (void *) body); + R_ToplevelExec(parse_eval_safe, (void *) nng_msg_body(msg)); + nano_buf buf; if (TYPEOF(nano_parse_eval_res) == STRSXP) { const char *string = NANO_STRING(nano_parse_eval_res); buf.buf = (unsigned char *) string; @@ -274,20 +279,9 @@ void inproc_server(const char* url) { if ((xc = nng_sendmsg(s, msg, 0))) fatal("inproc sendmsg", xc); - } + R_CheckUserInterrupt(); -} - -SEXP rnng_rest_server(SEXP url) { - - const char *addr[2] = {CHAR(STRING_ELT(url, 0)), "inproc://n-a-n-o-serv"}; - nng_thread *thr; - int xc; - - if ((xc = nng_thread_create(&thr, rest_start, (void *) addr))) - ERROR_OUT(xc); - - inproc_server(addr[1]); + } nng_thread_destroy(thr); return R_NilValue;