diff --git a/lib/consumer_supervisor.ex b/lib/consumer_supervisor.ex index 0553426..53af2cb 100644 --- a/lib/consumer_supervisor.ex +++ b/lib/consumer_supervisor.ex @@ -24,7 +24,7 @@ defmodule ConsumerSupervisor do to a producer named `Producer` and starts a new process for each event received from the producer. Each new process will be started by calling `Printer.start_link/1`, which simply starts a task that will print the - incoming event to the terminal. + incoming event to the terminal. defmodule Consumer do use ConsumerSupervisor diff --git a/lib/gen_stage.ex b/lib/gen_stage.ex index b6abe6e..6ee97f0 100644 --- a/lib/gen_stage.ex +++ b/lib/gen_stage.ex @@ -831,8 +831,8 @@ defmodule GenStage do end The returned tuple may also contain 3 or 4 elements. The third - element may be the `:hibernate` atom or a set of options defined - below. + element may be the `:hibernate` atom, the `{:continue, term}` tuple + or a set of options defined below. Returning `:ignore` will cause `start_link/3` to return `:ignore` and the process will exit normally without entering the loop or @@ -882,11 +882,17 @@ defmodule GenStage do @callback init(args :: term) :: {:producer, state} + | {:producer, state, :hibernate | {:continue, term}} | {:producer, state, [producer_option]} + | {:producer, state, :hibernate | {:continue, term}, [producer_option]} | {:producer_consumer, state} + | {:producer_consumer, state, :hibernate | {:continue, term}} | {:producer_consumer, state, [producer_consumer_option]} + | {:producer_consumer, state, :hibernate | {:continue, term}, [producer_consumer_option]} | {:consumer, state} + | {:consumer, state, :hibernate | {:continue, term}} | {:consumer, state, [consumer_option]} + | {:consumer, state, :hibernate | {:continue, term}, [consumer_option]} | :ignore | {:stop, reason :: any} when state: any @@ -1094,6 +1100,25 @@ defmodule GenStage do | {:stop, reason :: term, new_state} when new_state: term, event: term + @doc """ + Invoked to handle `continue` instructions. + + It is useful for performing work after initialization or for splitting the work + in a callback in multiple steps, updating the process state along the way. + + Return values are the same as `c:handle_cast/2`. + + This callback is optional. If one is not implemented, the server will fail + if a continue instruction is used. + + This callback is only supported on Erlang/OTP 21+. + """ + @callback handle_continue(continue :: term, state :: term) :: + {:noreply, [event], new_state} + | {:noreply, [event], new_state, :hibernate} + | {:stop, reason :: term, new_state} + when new_state: term, event: term + @doc """ The same as `c:GenServer.terminate/2`. """ @@ -1127,6 +1152,7 @@ defmodule GenStage do handle_call: 3, handle_cast: 2, handle_info: 2, + handle_continue: 2, terminate: 2 ] @@ -1698,22 +1724,40 @@ defmodule GenStage do def init({mod, args}) do case mod.init(args) do {:producer, state} -> - init_producer(mod, [], state) + init_producer(mod, [], state, :no_continue) {:producer, state, opts} when is_list(opts) -> - init_producer(mod, opts, state) + init_producer(mod, opts, state, :no_continue) + + {:producer, state, continue_or_hibernate} -> + init_producer(mod, [], state, continue_or_hibernate) + + {:producer, state, continue_or_hibernate, opts} when is_list(opts) -> + init_producer(mod, opts, state, continue_or_hibernate) {:producer_consumer, state} -> - init_producer_consumer(mod, [], state) + init_producer_consumer(mod, [], state, :no_continue) {:producer_consumer, state, opts} when is_list(opts) -> - init_producer_consumer(mod, opts, state) + init_producer_consumer(mod, opts, state, :no_continue) + + {:producer_consumer, state, continue_or_hibernate} -> + init_producer_consumer(mod, [], state, continue_or_hibernate) + + {:producer_consumer, state, continue_or_hibernate, opts} when is_list(opts) -> + init_producer_consumer(mod, opts, state, continue_or_hibernate) {:consumer, state} -> - init_consumer(mod, [], state) + init_consumer(mod, [], state, :no_continue) {:consumer, state, opts} when is_list(opts) -> - init_consumer(mod, opts, state) + init_consumer(mod, opts, state, :no_continue) + + {:consumer, state, continue_or_hibernate} -> + init_consumer(mod, [], state, continue_or_hibernate) + + {:consumer, state, continue_or_hibernate, opts} when is_list(opts) -> + init_consumer(mod, opts, state, continue_or_hibernate) {:stop, _} = stop -> stop @@ -1726,7 +1770,7 @@ defmodule GenStage do end end - defp init_producer(mod, opts, state) do + defp init_producer(mod, opts, state, continue_or_hibernate) do with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts), {:ok, buffer_size, opts} <- Utils.validate_integer(opts, :buffer_size, 10000, 0, :infinity, true), @@ -1746,7 +1790,10 @@ defmodule GenStage do dispatcher_state: dispatcher_state } - {:ok, stage} + case continue_or_hibernate do + :no_continue -> {:ok, stage} + continue_or_hibernate -> {:ok, stage, continue_or_hibernate} + end else {:error, message} -> {:stop, {:bad_opts, message}} end @@ -1768,7 +1815,7 @@ defmodule GenStage do end end - defp init_producer_consumer(mod, opts, state) do + defp init_producer_consumer(mod, opts, state, continue_or_hibernate) do with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts), {:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []), {:ok, buffer_size, opts} <- @@ -1787,17 +1834,17 @@ defmodule GenStage do dispatcher_state: dispatcher_state } - consumer_init_subscribe(subscribe_to, stage) + consumer_init_subscribe(subscribe_to, stage, continue_or_hibernate) else {:error, message} -> {:stop, {:bad_opts, message}} end end - defp init_consumer(mod, opts, state) do + defp init_consumer(mod, opts, state, continue_or_hibernate) do with {:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []), :ok <- Utils.validate_no_opts(opts) do stage = %GenStage{mod: mod, state: state, type: :consumer} - consumer_init_subscribe(subscribe_to, stage) + consumer_init_subscribe(subscribe_to, stage, continue_or_hibernate) else {:error, message} -> {:stop, {:bad_opts, message}} end @@ -2360,11 +2407,11 @@ defmodule GenStage do ## Consumer helpers - defp consumer_init_subscribe(producers, stage) do + defp consumer_init_subscribe(producers, stage, continue_or_hibernate) do fold_fun = fn - to, {:ok, stage} -> + to, {:ok, stage, continue_or_hibernate} -> case consumer_subscribe(to, stage) do - {:reply, _, stage} -> {:ok, stage} + {:reply, _, stage} -> {:ok, stage, continue_or_hibernate} {:stop, reason, _, _} -> {:stop, reason} {:stop, reason, _} -> {:stop, reason} end @@ -2373,7 +2420,11 @@ defmodule GenStage do {:stop, reason} end - :lists.foldl(fold_fun, {:ok, stage}, producers) + :lists.foldl(fold_fun, {:ok, stage, continue_or_hibernate}, producers) + |> case do + {:ok, stage, :no_continue} -> {:ok, stage} + otherwise -> otherwise + end end defp consumer_receive({_, ref} = from, {producer_id, cancel, {demand, min, max}}, events, stage) do diff --git a/test/gen_stage_test.exs b/test/gen_stage_test.exs index 39c69fc..7b062e5 100644 --- a/test/gen_stage_test.exs +++ b/test/gen_stage_test.exs @@ -143,7 +143,7 @@ defmodule GenStageTest do defmodule Postponer do @moduledoc """ - Discards all events. + Postpones all events. """ use GenStage