Skip to content

Support for handle_continue #234

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/consumer_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule ConsumerSupervisor do
to a producer named `Producer` and starts a new process for each event
received from the producer. Each new process will be started by calling
`Printer.start_link/1`, which simply starts a task that will print the
incoming event to the terminal.
incoming event to the terminal.

defmodule Consumer do
use ConsumerSupervisor
Expand Down
87 changes: 69 additions & 18 deletions lib/gen_stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -831,8 +831,8 @@ defmodule GenStage do
end

The returned tuple may also contain 3 or 4 elements. The third
element may be the `:hibernate` atom or a set of options defined
below.
element may be the `:hibernate` atom, the `{:continue, term}` tuple
or a set of options defined below.

Returning `:ignore` will cause `start_link/3` to return `:ignore`
and the process will exit normally without entering the loop or
Expand Down Expand Up @@ -882,11 +882,17 @@ defmodule GenStage do

@callback init(args :: term) ::
{:producer, state}
| {:producer, state, :hibernate | {:continue, term}}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current PR doesn't handle hiberrnate, does it? In any case, there are other discussions we need to have regarding hibernate, so probably better to focus on continue for now. :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but the existing documentation for the init function mentions hibernate, which confused me. I'll remove the hibernate reference from the docs (around lines 833-835) and from this callback.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove the hibernate reference from the docs (around lines 833-835) and from this callback.

Fantastic, thank you!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, actually, the handle_info callback (and the others) also contains :hibernate. I guess that was a copy paste from the GenServer docs?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Actually we do support hibernate there but not on init. So I guess we crossed this line anyway. So I would go ahead and support it on init too!

| {:producer, state, [producer_option]}
| {:producer, state, :hibernate | {:continue, term}, [producer_option]}
| {:producer_consumer, state}
| {:producer_consumer, state, :hibernate | {:continue, term}}
| {:producer_consumer, state, [producer_consumer_option]}
| {:producer_consumer, state, :hibernate | {:continue, term}, [producer_consumer_option]}
| {:consumer, state}
| {:consumer, state, :hibernate | {:continue, term}}
| {:consumer, state, [consumer_option]}
| {:consumer, state, :hibernate | {:continue, term}, [consumer_option]}
| :ignore
| {:stop, reason :: any}
when state: any
Expand Down Expand Up @@ -1094,6 +1100,25 @@ defmodule GenStage do
| {:stop, reason :: term, new_state}
when new_state: term, event: term

@doc """
Invoked to handle `continue` instructions.

It is useful for performing work after initialization or for splitting the work
in a callback in multiple steps, updating the process state along the way.

Return values are the same as `c:handle_cast/2`.

This callback is optional. If one is not implemented, the server will fail
if a continue instruction is used.

This callback is only supported on Erlang/OTP 21+.
"""
@callback handle_continue(continue :: term, state :: term) ::
{:noreply, [event], new_state}
| {:noreply, [event], new_state, :hibernate}
| {:stop, reason :: term, new_state}
when new_state: term, event: term

@doc """
The same as `c:GenServer.terminate/2`.
"""
Expand Down Expand Up @@ -1127,6 +1152,7 @@ defmodule GenStage do
handle_call: 3,
handle_cast: 2,
handle_info: 2,
handle_continue: 2,
terminate: 2
]

Expand Down Expand Up @@ -1698,22 +1724,40 @@ defmodule GenStage do
def init({mod, args}) do
case mod.init(args) do
{:producer, state} ->
init_producer(mod, [], state)
init_producer(mod, [], state, :no_continue)

{:producer, state, opts} when is_list(opts) ->
init_producer(mod, opts, state)
init_producer(mod, opts, state, :no_continue)

{:producer, state, continue_or_hibernate} ->
init_producer(mod, [], state, continue_or_hibernate)

{:producer, state, continue_or_hibernate, opts} when is_list(opts) ->
init_producer(mod, opts, state, continue_or_hibernate)

{:producer_consumer, state} ->
init_producer_consumer(mod, [], state)
init_producer_consumer(mod, [], state, :no_continue)

{:producer_consumer, state, opts} when is_list(opts) ->
init_producer_consumer(mod, opts, state)
init_producer_consumer(mod, opts, state, :no_continue)

{:producer_consumer, state, continue_or_hibernate} ->
init_producer_consumer(mod, [], state, continue_or_hibernate)

{:producer_consumer, state, continue_or_hibernate, opts} when is_list(opts) ->
init_producer_consumer(mod, opts, state, continue_or_hibernate)

{:consumer, state} ->
init_consumer(mod, [], state)
init_consumer(mod, [], state, :no_continue)

{:consumer, state, opts} when is_list(opts) ->
init_consumer(mod, opts, state)
init_consumer(mod, opts, state, :no_continue)

{:consumer, state, continue_or_hibernate} ->
init_consumer(mod, [], state, continue_or_hibernate)

{:consumer, state, continue_or_hibernate, opts} when is_list(opts) ->
init_consumer(mod, opts, state, continue_or_hibernate)

{:stop, _} = stop ->
stop
Expand All @@ -1726,7 +1770,7 @@ defmodule GenStage do
end
end

defp init_producer(mod, opts, state) do
defp init_producer(mod, opts, state, continue_or_hibernate) do
with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts),
{:ok, buffer_size, opts} <-
Utils.validate_integer(opts, :buffer_size, 10000, 0, :infinity, true),
Expand All @@ -1746,7 +1790,10 @@ defmodule GenStage do
dispatcher_state: dispatcher_state
}

{:ok, stage}
case continue_or_hibernate do
:no_continue -> {:ok, stage}
continue_or_hibernate -> {:ok, stage, continue_or_hibernate}
end
else
{:error, message} -> {:stop, {:bad_opts, message}}
end
Expand All @@ -1768,7 +1815,7 @@ defmodule GenStage do
end
end

defp init_producer_consumer(mod, opts, state) do
defp init_producer_consumer(mod, opts, state, continue_or_hibernate) do
with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts),
{:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []),
{:ok, buffer_size, opts} <-
Expand All @@ -1787,17 +1834,17 @@ defmodule GenStage do
dispatcher_state: dispatcher_state
}

consumer_init_subscribe(subscribe_to, stage)
consumer_init_subscribe(subscribe_to, stage, continue_or_hibernate)
else
{:error, message} -> {:stop, {:bad_opts, message}}
end
end

defp init_consumer(mod, opts, state) do
defp init_consumer(mod, opts, state, continue_or_hibernate) do
with {:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []),
:ok <- Utils.validate_no_opts(opts) do
stage = %GenStage{mod: mod, state: state, type: :consumer}
consumer_init_subscribe(subscribe_to, stage)
consumer_init_subscribe(subscribe_to, stage, continue_or_hibernate)
else
{:error, message} -> {:stop, {:bad_opts, message}}
end
Expand Down Expand Up @@ -2360,11 +2407,11 @@ defmodule GenStage do

## Consumer helpers

defp consumer_init_subscribe(producers, stage) do
defp consumer_init_subscribe(producers, stage, continue_or_hibernate) do
fold_fun = fn
to, {:ok, stage} ->
to, {:ok, stage, continue_or_hibernate} ->
case consumer_subscribe(to, stage) do
{:reply, _, stage} -> {:ok, stage}
{:reply, _, stage} -> {:ok, stage, continue_or_hibernate}
{:stop, reason, _, _} -> {:stop, reason}
{:stop, reason, _} -> {:stop, reason}
end
Expand All @@ -2373,7 +2420,11 @@ defmodule GenStage do
{:stop, reason}
end

:lists.foldl(fold_fun, {:ok, stage}, producers)
:lists.foldl(fold_fun, {:ok, stage, continue_or_hibernate}, producers)
|> case do
{:ok, stage, :no_continue} -> {:ok, stage}
otherwise -> otherwise
end
end

defp consumer_receive({_, ref} = from, {producer_id, cancel, {demand, min, max}}, events, stage) do
Expand Down
2 changes: 1 addition & 1 deletion test/gen_stage_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ defmodule GenStageTest do

defmodule Postponer do
@moduledoc """
Discards all events.
Postpones all events.
"""

use GenStage
Expand Down