@@ -83,22 +83,20 @@ deque <- function(len=64) {
83
83
# ' to be determined. It is something like a combination of a [promise]
84
84
# ' and an [iteror].
85
85
# '
86
- # ' The channel interface could be used to represent and work with data
87
- # ' coming in over a connection, data values being logged over time, a
88
- # ' queue of incoming requests, and things of that nature.
86
+ # ' The channel interface is intended to represent and work with
87
+ # ' asynchronous, live data sources, for instance event logs,
88
+ # ' non-blocking connections, paginated query results, reactive values,
89
+ # ' and other processes that yield a sequence of values over time.
89
90
# '
90
- # ' The friendly way to create a channel and use it in asynchronous
91
- # ' programming is to use a [stream] coroutine. Inside of `stream()`
92
- # ' call [await] to wait on promises, [awaitNext] to wait on other
93
- # ' streams and [yield] to yield values. To signal end of iteration
94
- # ' use `return()` (which will discard its value) and to signal an
95
- # ' error use `stop()`.
91
+ # ' `channel` is an S3 method and will attempt to convert the argument
92
+ # ' `obj` into a channel object according to its class. In particular
93
+ # ' [connection] objects will be wrapped with a connection.
96
94
# '
97
- # ' The friendly way to consume values from a channel is to use
98
- # ' awaitNext within an ` async` or ` stream` coroutine.
95
+ # ' The friendly way to obtain values from a channel is to use
96
+ # ' ` awaitNext` or `for` loops within an [ async] or [ stream] coroutine.
99
97
# '
100
- # ' The low-level interface to request values from a channel is to call
101
- # ' [nextThen]`(ch, onNext=, onError=, onClose=)]`, providing callback
98
+ # ' The low-level interface to obtain values from a channel is to call
99
+ # ' [nextThen]`(ch, onNext=, onError=, onClose=, ... )]`, providing callback
102
100
# ' functions for at least `onNext(val)`. Those callbacks will be
103
101
# ' appended to an internal queue, and will be called as soon as data
104
102
# ' is available, in the order that requests were received.
@@ -108,25 +106,32 @@ deque <- function(len=64) {
108
106
# ' available value. Each promise created this way will be resolved in
109
107
# ' the order that data come in. Note that this way there is no special
110
108
# ' signal for end of iteration; a promise will reject with
111
- # ' the sigil value `"StopIteration"` to signal end of iteration.
109
+ # ' a condition message `"StopIteration"` to signal end of iteration.
112
110
# '
113
111
# ' Be careful with the iterator-over-promises interface though: if you
114
112
# ' call `as.list.iteror(pr)` you may get stuck in an infinite loop, as
115
113
# ' `as.list` keeps calling `nextElem` and receives more promises to
116
114
# ' represent values that exist only hypothetically. This is one
117
115
# ' reason for the `max_listeners` limit.
118
116
# '
119
- # ' The low-level interface to _create_ a channel object is to call
120
- # ' `channel(function(emit, reject, cancel) {...})`, providing your own
121
- # ' function in its argument; your function will receive those three
122
- # ' callback methods as arguments. Then use whatever means to arrange
123
- # ' to call `emit(val)` some time in the future as data comes in. When
124
- # ' you are done emitting values, call the `close()` callback. To
125
- # ' report an error use the callback `reject(err)` The next requestor
126
- # ' will receive the error. If there is more than one listener, other
127
- # ' queued listeners will get a `close` signal.
117
+ # ' The friendly way to create a channel with custom behavior is to use
118
+ # ' a [stream] coroutine. Inside of `stream()` call [await] to wait on
119
+ # ' promises, [awaitNext] to wait on other streams and [yield] to yield
120
+ # ' values. To signal end of iteration use `return()` (which will
121
+ # ' discard its value) and to signal an error use `stop()`.
128
122
# '
129
- # ' @param impl A user-provided function; it will receive three
123
+ # ' The low-level interface to create a channel with custom behavior
124
+ # ' is to call `channel(function(emit, reject, cancel) {...})`,
125
+ # ' providing your own function definition; your function will
126
+ # ' receive those three callback methods as arguments. Then use
127
+ # ' whatever means to arrange to call `emit(val)` some time in the
128
+ # ' future as data comes in. When you are done emitting values, call
129
+ # ' the `close()` callback. To report an error call
130
+ # ' `reject(err)`; the next requestor will receive the error. If there
131
+ # ' is more than one listener, other queued listeners will get a
132
+ # ' `close` signal.
133
+ # '
134
+ # ' @param obj A user-provided function; it will receive three
130
135
# ' callback functions as arguments, in order, `emit(val)`,
131
136
# ' `reject(err)` and `close()`
132
137
# ' @param max_queue The maximum number of outgoing values to store if
@@ -141,10 +146,25 @@ deque <- function(len=64) {
141
146
# ' @return a channel object, supporting methods "nextThen" and "nextElem"
142
147
# '
143
148
# ' @author Peter Meilstrup
144
- channel <- function (impl , max_queue = 500L , max_awaiting = 500L ,
145
- wakeup = function () NULL ) {
149
+ # ' @export
150
+ channel <- function (obj , ... ) {
151
+ UseMethod(" channel" )
152
+ }
153
+
154
+ # ' @exportS3Method
155
+ channel.default <- function (obj , ... ) {
156
+ if (is.function(obj ))
157
+ channel.function(obj , ... )
158
+ else stop(" Don't know how to make channel out of that" )
159
+ }
160
+
161
+ # ' @exportS3Method channel "function"
162
+ # ' @export
163
+ # ' @rdname channel
164
+ channel.function <- function (obj , max_queue = 500L , max_awaiting = 500L ,
165
+ wakeup = function (... ) NULL ) {
146
166
# list of callbacks waiting to be made having yet to be sent
147
- # each is a list(resolve=, reject=, close=
167
+ # each is a list(resolve=, reject=, close= )
148
168
outgoing <- deque()
149
169
# list of values waiting for a callback
150
170
awaiting <- deque()
@@ -203,22 +223,25 @@ channel <- function(impl, max_queue=500L, max_awaiting=500L,
203
223
tryCatch({
204
224
val <- outgoing $ getFirst(
205
225
or = switch (state ,
206
- " error" = {
207
- state <<- " stopped"
208
- listener $ reject(errorValue )
209
- odo <<- odo + 1
210
- break
211
- },
212
- " stopped" ,
213
- " closed" = {
214
- listener $ close()
215
- break
216
- },
217
- " running" = {
218
- awaiting $ prepend(listener )
219
- wakeup()
220
- break
221
- }))
226
+ " error" = {
227
+ state <<- " stopped"
228
+ listener $ reject(errorValue )
229
+ odo <<- odo + 1
230
+ break
231
+ },
232
+ " stopped" ,
233
+ " closed" = {
234
+ listener $ close()
235
+ break
236
+ },
237
+ " running" = {
238
+ awaiting $ prepend(listener )
239
+ # pass along arguments...
240
+ if (length(listener $ args ) > 0 )
241
+ do.call(wakeup , listener $ args )
242
+ else wakeup()
243
+ break
244
+ }))
222
245
listener $ resolve(val )
223
246
odo <<- odo + 1
224
247
}, error = function (err ) {
@@ -230,25 +253,26 @@ channel <- function(impl, max_queue=500L, max_awaiting=500L,
230
253
}
231
254
232
255
nextThen <- function (onNext ,
233
- onError = function (err )
234
- warning(" Unhandled promise_iter error " , err ),
235
- onClose ) {
256
+ onError = function (err )
257
+ warning(" Unhandled channel error " , err ),
258
+ onClose , ... ) {
236
259
if (awaiting $ length() > max_awaiting ) stop(" Channel has too many listeners" )
237
- awaiting $ append(list (resolve = onNext , reject = onError , close = onClose ))
260
+ awaiting $ append(list (resolve = onNext , reject = onError ,
261
+ close = onClose , args = list (... )))
238
262
send()
239
263
}
240
264
241
- nextOr_ <- function (or ) {
265
+ nextOr_ <- function (or , ... ) {
242
266
# subscribe and return a promise.
243
267
promise(function (resolve , reject ) {
244
- nextThen(resolve , reject , function () reject(" StopIteration" ))
268
+ nextThen(resolve , reject , function () reject(simpleError( " StopIteration" )), ... )
245
269
})
246
270
}
247
271
248
- impl (emit , reject , close )
272
+ obj (emit , reject , close )
249
273
structure(add_class(iteror(nextOr_ ), " channel" ),
250
- methods = list (nextThen = nextThen , nextOr = nextOr ,
251
- formatChannel = formatChannel ))
274
+ methods = list (nextThen = nextThen , nextOr = nextOr ,
275
+ formatChannel = formatChannel ))
252
276
}
253
277
254
278
# ' @exportS3Method
@@ -371,3 +395,68 @@ combine <- function(...) {
371
395
else running <- TRUE
372
396
})
373
397
}
398
+
399
+ # The channel method for connections wraps a connection object
400
+ # (which should be opened in non-blocking mode).
401
+ channel.connection <- function (obj , ... ,
402
+ read = {
403
+ if (summary(obj )$ text == " text" )
404
+ c(" lines" , " char" )
405
+ else c(" bin" , " lines" , " char" )
406
+ },
407
+ read_params = {
408
+ switch (read ,
409
+ lines = list (n = 1 ),
410
+ char = list (nchar = 1 ),
411
+ bin = list (what = " raw" , n = 1 ))
412
+ },
413
+ loop = current_loop()) {
414
+ if (! isOpen(obj , " read" ))
415
+ stop(" Need to open the connection for reading before making a channel" )
416
+
417
+ read <- match.arg(read )
418
+ readMethod <- switch (read ,
419
+ lines = readLines ,
420
+ char = readChar ,
421
+ bin = readBin )
422
+
423
+ emit <- identity
424
+ reject <- identity
425
+ close <- function () NULL
426
+
427
+ arguments <- read_params | > names() | > lapply(as.name ) | > structure(names = names(read_params ))
428
+ readCall <- function _(
429
+ c(list (... ), read_params ),
430
+ bquote(splice = TRUE , {
431
+ readMethod(obj , ..(arguments ), ... )
432
+ },
433
+ environment()
434
+ ))
435
+
436
+ doRead <- function (... ) {
437
+ fn <- function () {
438
+ tryCatch({
439
+ result <- readCall(... )
440
+ if (length(result ) == 0 ) {
441
+ cat(" no results...\n " )
442
+ later(fn , 1 )
443
+ } else {
444
+ if (isIncomplete(obj )) {
445
+ cat(" incomplete results...\n " )
446
+ later(fn , 1 )
447
+ } else {
448
+ emit(result )
449
+ }
450
+ }}, error = function (x ) {
451
+ close(obj );
452
+ stop(x )
453
+ })
454
+ }
455
+ fn()
456
+ }
457
+
458
+ channel(\(emit , reject , close ) {
459
+ emit <<- emit ; reject <<- reject ; close <<- close
460
+ }, wakeup = doRead )
461
+
462
+ }
0 commit comments