Skip to content

Commit c7add16

Browse files
committed
iterors extracted to separate package
1 parent 5b5c9e9 commit c7add16

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+432
-728
lines changed

.Rbuildignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
^_pkgdown\.yml$
77
^docs$
88
^pkgdown$
9+
^graphs$
910
^vignettes/README.md$
1011
^tests/testthat/graphs
1112
^ctz.svg$

DESCRIPTION

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Package: async
22
Title: Coroutines: Generators / Yield, Async / Await, and Streams
3-
Version: 0.3.1
3+
Version: 0.3.2
44
Date: 2023-03-01
55
Authors@R:
66
person(given = "Peter",
@@ -25,6 +25,7 @@ Encoding: UTF-8
2525
LazyData: true
2626
Depends: R (>= 4.1)
2727
Imports:
28+
iterors,
2829
nseval (>= 0.4.3),
2930
later,
3031
promises,
@@ -34,14 +35,12 @@ Suggests:
3435
rmarkdown,
3536
knitr,
3637
magrittr,
37-
iterators,
3838
audio,
3939
profvis,
4040
microbenchmark
4141
Collate:
4242
'async-package.R'
4343
'util.R'
44-
'iteror.R'
4544
'cps.R'
4645
'signals.R'
4746
'syntax.R'

NAMESPACE

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,10 @@
22

33
S3method(all_indices,environment)
44
S3method(all_indices,list)
5-
S3method(as.list,iteror)
6-
S3method(as.vector,iteror)
75
S3method(compile,coroutine)
86
S3method(debugAsync,coroutine)
97
S3method(format,channel)
108
S3method(format,coroutine)
11-
S3method(format,iteror)
129
S3method(getEntry,coroutine)
1310
S3method(getPump,async)
1411
S3method(getPump,generator)
@@ -23,14 +20,7 @@ S3method(getStop,coroutine)
2320
S3method(getStop,stream)
2421
S3method(is.channel,channel)
2522
S3method(is.channel,default)
26-
S3method(iterators::nextElem,iteror)
27-
S3method(iteror,"function")
28-
S3method(iteror,default)
29-
S3method(iteror,iter)
30-
S3method(iteror,iteror)
3123
S3method(make_graph,coroutine)
32-
S3method(nextOr,funiteror)
33-
S3method(nextOr,iter)
3424
S3method(nextThen,channel)
3525
S3method(print,channel)
3626
S3method(print,coroutine)
@@ -46,6 +36,7 @@ S3method(summary,stream)
4636
export(async)
4737
export(await)
4838
export(awaitNext)
39+
export(collect)
4940
export(collector)
5041
export(combine)
5142
export(debugAsync)
@@ -54,9 +45,7 @@ export(gather)
5445
export(gen)
5546
export(goto)
5647
export(graphAsync)
57-
export(ilimit)
5848
export(is.channel)
59-
export(iseq)
6049
export(iteror)
6150
export(nextOr)
6251
export(nextThen)
@@ -71,3 +60,5 @@ import(nseval)
7160
import(promises)
7261
import(testthat)
7362
import(utils)
63+
importFrom(iterors,iteror)
64+
importFrom(iterors,nextOr)

NEWS.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# async 0.3.1
22

3+
* `iteror` and associated functions have been extracted to a new package [`iterors`](http://github.com/crowding/iterors), which includes
4+
5+
# async 0.3.1
6+
37
* `debugAsync` now accepts a `trace` argument to control printing of execution traces to console.
48
Execution traces now print node addresses.
59
* Various inspection methods `getNode`, `getState`, `getEnv`, `getOrig` have been consolidated into the method `summary.coroutine`.

R/all_names.R

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -164,29 +164,34 @@ visit_function <- function(fn, yield, nonTail=TRUE, forGraph=FALSE) {
164164
visit_weird_call(expr, inTail, NULL, yield)
165165
} else if (exists(head, env, inherits=FALSE)) {
166166
peek <- get0(head, envir=env, ifnotfound=NULL, inherits=FALSE)
167-
if (!all(c("cont") %in% names(formals(peek)))) {
167+
if (is.function(peek)) {
168+
if (!all(c("cont") %in% names(formals(peek)))) {
169+
visit_ordinary_call(expr, inTail, orig, yield)
170+
} else {
171+
# A trampoline-indirect call! Register both the
172+
# target and the indirect.
173+
handl <- match.call(peek, expr, expand.dots=FALSE,
174+
envir=as.environment(list(`...`=NULL)))
175+
if ("winding" %in% names(handl)) {
176+
# windup takes TWO function pointers
177+
woundup <- as.call(list(handl$winding))
178+
windup <- TRUE
179+
handl$winding <- NULL
180+
yield(as.character(woundup[[1]]), "wind")
181+
yield(c(list(woundup, expr), orig), "windup")
182+
visit_weird_call(woundup, inTail, c(list(expr), orig), yield)}
183+
trampoline_args <- names(handl) %in% c("cont", "val")
184+
trampolined <- as.call(handl[trampoline_args])
185+
handl <- handl[!trampoline_args]
186+
yield(as.character(trampolined[[1]]), "tramp")
187+
yield(as.character(expr[[1]]), "hand")
188+
yield(c(list(handl, expr), orig), "handler")
189+
yield(c(list(trampolined, expr), orig), "trampoline")
190+
visit_weird_call(handl, FALSE, c(list(expr), orig), yield)
191+
}
192+
} else { # not a function?
193+
#This comes up if it's an ieration variable, which you write to...
168194
visit_ordinary_call(expr, inTail, orig, yield)
169-
} else {
170-
# A trampoline-indirect call! Register both the
171-
# target and the indirect.
172-
handl <- match.call(peek, expr, expand.dots=FALSE,
173-
envir=as.environment(list(`...`=NULL)))
174-
if ("winding" %in% names(handl)) {
175-
# windup takes TWO function pointers
176-
woundup <- as.call(list(handl$winding))
177-
windup <- TRUE
178-
handl$winding <- NULL
179-
yield(as.character(woundup[[1]]), "wind")
180-
yield(c(list(woundup, expr), orig), "windup")
181-
visit_weird_call(woundup, inTail, c(list(expr), orig), yield)}
182-
trampoline_args <- names(handl) %in% c("cont", "val")
183-
trampolined <- as.call(handl[trampoline_args])
184-
handl <- handl[!trampoline_args]
185-
yield(as.character(trampolined[[1]]), "tramp")
186-
yield(as.character(expr[[1]]), "hand")
187-
yield(c(list(handl, expr), orig), "handler")
188-
yield(c(list(trampolined, expr), orig), "trampoline")
189-
visit_weird_call(handl, FALSE, c(list(expr), orig), yield)
190195
}
191196
} else {
192197
# something that isn't bound in the immediately

R/async-package.R

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,16 @@ trace_ <- function(x) if(getOption("async.verbose")) cat(x)
5050
options(async.compileLevel=getOption("async.compileLevel") %||% 0,
5151
async.paranoid=getOption("async.paranoid") %||% FALSE,
5252
async.verbose=getOption("async.verbose") %||% FALSE,
53-
async.destructive=getOption("async.verbose") %||% TRUE,
54-
async.sendLater=getOption("async.verbose") %||% TRUE)
53+
async.destructive=getOption("async.destructive") %||% TRUE,
54+
async.sendLater=getOption("async.sendLater") %||% TRUE)
5555
}
5656

57+
#' @export
58+
iterors::nextOr
59+
60+
#' @export
61+
iterors::iteror
62+
5763
## usethis namespace: start
5864
## usethis namespace: end
5965
NULL

R/async.R

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -179,10 +179,11 @@ make_async <- function(expr, orig = expr, ...,
179179
nonce <- (function() function() NULL)()
180180
state <- "pending" #print method uses this
181181
value <- nonce
182+
err <- NULL
182183
resolve_ <- NULL
183184
reject_ <- NULL
184185

185-
node(getState <- function() state)
186+
node(getState <- function() list(state=state, err=err))
186187

187188
node(return_ <- function(val) {
188189
state <<- "resolved"
@@ -192,7 +193,7 @@ make_async <- function(expr, orig = expr, ...,
192193
})
193194

194195
node(stop_ <- function(val) {
195-
value <<- val
196+
err <<- val
196197
state <<- "rejected"
197198
reject_(val)
198199
val
@@ -313,10 +314,8 @@ getStartSet.async <- function(x, ...) {
313314
#' "rejected".
314315
#' @exportS3Method
315316
summary.async <- function(object, ...) {
316-
c(list(code=object$orig,
317-
state=object$state$getState(),
318-
node=environment(object$state$pump)$getCont(),
319-
envir=environment(object$state$pump)$targetEnv),
317+
c(list(code=object$orig),
318+
object$state$getState(),
320319
NextMethod("summary"))
321320
}
322321

R/channel.R

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ channel <- function(impl, max_queue=500L, max_awaiting=500L,
220220
break
221221
}))
222222
listener$resolve(val)
223+
odo <<- odo+1
223224
}, error=function(err) {
224225
warning("Unhandled channel error on send: ", err)
225226
})
@@ -237,18 +238,17 @@ channel <- function(impl, max_queue=500L, max_awaiting=500L,
237238
send()
238239
}
239240

240-
nextOr <- function(or) {
241+
nextOr_ <- function(or) {
241242
#subscribe and return a promise.
242243
promise(function(resolve, reject) {
243244
nextThen(resolve, reject, function() reject("StopIteration"))
244245
})
245246
}
246247

247248
impl(emit, reject, close)
248-
structure(list(nextThen=nextThen, nextOr=nextOr,
249-
formatChannel=formatChannel),
250-
class=c("channel", "funiteror", "iteror", "iter"))
251-
249+
structure(add_class(iteror(nextOr_), "channel"),
250+
methods=list(nextThen=nextThen, nextOr=nextOr,
251+
formatChannel=formatChannel))
252252
}
253253

254254
#' @exportS3Method
@@ -258,7 +258,7 @@ print.channel <- function(x, ...) {
258258

259259
#' @exportS3Method
260260
format.channel <- function(x, ...) {
261-
x$formatChannel(...)
261+
attr(x, "methods")$formatChannel(...)
262262
}
263263

264264
#' Receive values from channels by callback.
@@ -278,7 +278,9 @@ format.channel <- function(x, ...) {
278278
#' emitted value. For [subscribe], a function to be called with each
279279
#' emitted value until the stream finishes.
280280
#' @param onError Function to be called if channel stops with an
281-
#' error.
281+
#' error. Note that if you call nextThen multiple times to register
282+
#' multile callbacks, only the first will receive onError; the rest
283+
#' will be called with onClose.
282284
#' @param onClose Function to be called if the channel finishes normally.
283285
#' @param ... Undocumented.
284286
#' @export
@@ -293,7 +295,7 @@ nextThen.channel <- function(x,
293295
stop("Unhandled channel error: ", err),
294296
onClose = function() NULL,
295297
...) {
296-
x$nextThen(onNext, onError, onClose, ...)
298+
attr(x, "methods")$nextThen(onNext, onError, onClose, ...)
297299
}
298300

299301
#' @export
@@ -332,7 +334,7 @@ subscribe.channel <- function(x, onNext, onError, onClose, ...) {
332334
#'
333335
#' `combine(...)` takes any number of [promise] or [channel]
334336
#' objects. It awaits each one, and returns a [channel] object
335-
#' which re-emits every value from its target promises, in whatever
337+
#' which re-emits every value from its targets, in whatever
336338
#' order they are received.
337339
#' @param ... Each argument should be a [promise] or a [channel].
338340
#' @return a [channel] object.

R/collect.R

Lines changed: 3 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,4 @@
1-
#' Gather all values emitted by iterators and channels.
2-
#'
3-
#' @description These functions help collect values from generators, streams or
4-
#' other processes into lists. Note that you can also generate a list
5-
#' of values using [run].
6-
#'
7-
#' @description `as.list.iteror` and `as.vector.iteror` convert iterable
8-
#' objects into vectors of the given mode.
9-
#'
10-
#' @exportS3Method as.list iteror
11-
#' @param ... Undocumented.
12-
#' @return `as.list.iteror` returns a `[list]`.
13-
#' @author Peter Meilstrup
14-
#' @rdname collect
15-
#' @examples
16-
#'
17-
#' as.list(iseq(1,10, by=3))
18-
as.list.iteror <- function(x, ...) {
19-
collect(function(emit) repeat emit(nextOr(x, break)), type=list())
20-
}
21-
22-
#' @param x An [iteror]
23-
#' @param mode The mode of the output; taking the same modes as `vector`.
24-
#' @return `as.vector.iteror(x, mode)` returns a vector of the given mode.
25-
#' @exportS3Method as.vector iteror
26-
#' @rdname collect
27-
#' @examples
28-
#'
29-
#' as.vector(gen(for (i in 1:10) if (i %% 3 != 0) yield(i)), "numeric")
30-
as.vector.iteror <- function(x, mode) {
31-
collect(function(emit)
32-
repeat(emit(nextOr(x, break))),
33-
vector(mode, 0))
34-
}
35-
1+
#' Collect iterator / channel items into a vector.
362
#' @rdname collect
373
#' @description `gather` takes a [channel] as argument and returns a
384
#' [promise]. All values emitted by the channel will be collected
@@ -68,6 +34,7 @@ gather <- function(ch, type=list()) {
6834
#' @rdname collect
6935
#' @description Method `as.promise.channel` is a synonym for `gather`.
7036
#' @exportS3Method promises::as.promise channel
37+
#' @param x a [channel].
7138
as.promise.channel <- function(x) gather(x)
7239

7340
#' @rdname collect
@@ -100,6 +67,7 @@ as.promise.channel <- function(x) gather(x)
10067
#' as.list.iteror <- function(it) {
10168
#' collect(\(yield) repeat yield(nextOr(it, break)))
10269
#' }
70+
#' @export
10371
collect <- function(fn, type=list()) {
10472
collector(function(emit, extract) {fn(emit); extract(TRUE)}, type)
10573
}

R/coroutine.R

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
# not-exported methods
32
getPump <- function(x) UseMethod("getPump")
43
getEntry <- function(x) UseMethod("getEntry")
@@ -80,7 +79,7 @@ getStartSet.coroutine <- function(x) {
8079
runPump = environment(getPump(x))$runPump,
8180
base_winding = environment(getPump(x))$base_winding,
8281
setDebug = environment(getPump(x))$setDebug,
83-
getCont = environment(getPump(x))$getCont)
82+
getPumpState = environment(getPump(x))$getPumpState)
8483
}
8584

8685
#' Query / display coroutine properties and state.
@@ -95,11 +94,11 @@ getStartSet.coroutine <- function(x) {
9594
format.coroutine <- function(x, ...) {
9695
s <- summary(x)
9796
a <- deparse(call(class(x)[[1]], s$code), backtick=TRUE)
98-
b <- format(s$envir, ...)
97+
b <- format(s$targetEnv, ...)
9998
c <- paste0(c("<", class(x)[[1]], " [",
10099
s$state,
101-
" at `", s$node, "`",
102-
if (s$state=="stopped")
100+
" at `", s$cont, "`",
101+
if (s$state %in% c("stopped", "rejected"))
103102
c(": ", capture.output(print(s$err))),
104103
"]>"), collapse="")
105104
d <- NextMethod()
@@ -136,12 +135,8 @@ coroutine_function <- function(arg, head, ...) {
136135
#' * `err`: the error object, if the coroutine caught an error.
137136
#' @export
138137
summary.coroutine <- function(object, ...) {
138+
s <- environment(getPump(object))$getPumpState()
139139
d <- debugAsync(object)
140-
list(
141-
node=environment(getPump(object))$getCont(),
142-
envir=environment(getPump(object))$targetEnv,
143-
err=environment(getPump(object))$err,
144-
debugR=d$R,
145-
debugInternal=d$internal,
146-
trace=d$trace)
140+
c(s,
141+
debug=d)
147142
}

0 commit comments

Comments
 (0)