From 7a1ba10b2f15e478b89107b8775022dc6a971420 Mon Sep 17 00:00:00 2001 From: Danie Palm Date: Wed, 10 Apr 2019 10:11:33 +0200 Subject: [PATCH 1/4] Handle continue on producer init --- lib/gen_stage.ex | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/lib/gen_stage.ex b/lib/gen_stage.ex index b6abe6e..d17d704 100644 --- a/lib/gen_stage.ex +++ b/lib/gen_stage.ex @@ -882,11 +882,17 @@ defmodule GenStage do @callback init(args :: term) :: {:producer, state} + | {:producer, state, :hibernate | {:continue, term}} | {:producer, state, [producer_option]} + | {:producer, state, :hibernate | {:continue, term}, [producer_option]} | {:producer_consumer, state} + | {:producer_consumer, state, :hibernate | {:continue, term}} | {:producer_consumer, state, [producer_consumer_option]} + | {:producer_consumer, state, :hibernate | {:continue, term}, [producer_consumer_option]} | {:consumer, state} + | {:consumer, state, :hibernate | {:continue, term}} | {:consumer, state, [consumer_option]} + | {:consumer, state, :hibernate | {:continue, term}, [consumer_option]} | :ignore | {:stop, reason :: any} when state: any @@ -1094,6 +1100,25 @@ defmodule GenStage do | {:stop, reason :: term, new_state} when new_state: term, event: term + @doc """ + Invoked to handle `continue` instructions. + + It is useful for performing work after initialization or for splitting the work + in a callback in multiple steps, updating the process state along the way. + + Return values are the same as `c:handle_cast/2`. + + This callback is optional. If one is not implemented, the server will fail + if a continue instruction is used. + + This callback is only supported on Erlang/OTP 21+. + """ + @callback handle_continue(continue :: term, state :: term) :: + {:noreply, [event], new_state} + | {:noreply, [event], new_state, :hibernate} + | {:stop, reason :: term, new_state} + when new_state: term, event: term + @doc """ The same as `c:GenServer.terminate/2`. """ @@ -1127,6 +1152,7 @@ defmodule GenStage do handle_call: 3, handle_cast: 2, handle_info: 2, + handle_continue: 2, terminate: 2 ] @@ -1700,9 +1726,15 @@ defmodule GenStage do {:producer, state} -> init_producer(mod, [], state) + {:producer, state, {:continue, continue}} -> + init_producer(mod, [], state, {:continue, continue}) + {:producer, state, opts} when is_list(opts) -> init_producer(mod, opts, state) + {:producer, state, {:continue, continue}, opts} when is_list(opts) -> + init_producer(mod, opts, state, {:continue, continue}) + {:producer_consumer, state} -> init_producer_consumer(mod, [], state) @@ -1726,7 +1758,7 @@ defmodule GenStage do end end - defp init_producer(mod, opts, state) do + defp init_producer(mod, opts, state, continue \\ :no_continue) do with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts), {:ok, buffer_size, opts} <- Utils.validate_integer(opts, :buffer_size, 10000, 0, :infinity, true), @@ -1746,7 +1778,10 @@ defmodule GenStage do dispatcher_state: dispatcher_state } - {:ok, stage} + case continue do + :no_continue -> {:ok, stage} + continue -> {:ok, stage, continue} + end else {:error, message} -> {:stop, {:bad_opts, message}} end From 67bba002eb25e8eaa1e60a087367b18fd023971b Mon Sep 17 00:00:00 2001 From: Danie Palm Date: Thu, 16 May 2019 20:39:53 +0200 Subject: [PATCH 2/4] support continue for consumers --- lib/gen_stage.ex | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/lib/gen_stage.ex b/lib/gen_stage.ex index d17d704..273269c 100644 --- a/lib/gen_stage.ex +++ b/lib/gen_stage.ex @@ -1738,15 +1738,27 @@ defmodule GenStage do {:producer_consumer, state} -> init_producer_consumer(mod, [], state) + {:producer_consumer, state, {:continue, continue}} -> + init_producer_consumer(mod, [], state, {:continue, continue}) + {:producer_consumer, state, opts} when is_list(opts) -> init_producer_consumer(mod, opts, state) + {:producer_consumer, state, {:continue, continue}, opts} when is_list(opts) -> + init_producer_consumer(mod, opts, state, {:continue, continue}) + {:consumer, state} -> init_consumer(mod, [], state) + {:consumer, state, {:continue, continue}} -> + init_consumer(mod, [], state, {:continue, continue}) + {:consumer, state, opts} when is_list(opts) -> init_consumer(mod, opts, state) + {:consumer, state, {:continue, continue}, opts} when is_list(opts) -> + init_consumer(mod, opts, state, {:continue, continue}) + {:stop, _} = stop -> stop @@ -1803,7 +1815,7 @@ defmodule GenStage do end end - defp init_producer_consumer(mod, opts, state) do + defp init_producer_consumer(mod, opts, state, continue \\ :no_continue) do with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts), {:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []), {:ok, buffer_size, opts} <- @@ -1822,17 +1834,17 @@ defmodule GenStage do dispatcher_state: dispatcher_state } - consumer_init_subscribe(subscribe_to, stage) + consumer_init_subscribe(subscribe_to, stage, continue) else {:error, message} -> {:stop, {:bad_opts, message}} end end - defp init_consumer(mod, opts, state) do + defp init_consumer(mod, opts, state, continue \\ :no_continue) do with {:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []), :ok <- Utils.validate_no_opts(opts) do stage = %GenStage{mod: mod, state: state, type: :consumer} - consumer_init_subscribe(subscribe_to, stage) + consumer_init_subscribe(subscribe_to, stage, continue) else {:error, message} -> {:stop, {:bad_opts, message}} end @@ -2395,20 +2407,27 @@ defmodule GenStage do ## Consumer helpers - defp consumer_init_subscribe(producers, stage) do + defp consumer_init_subscribe(producers, stage, continue) do fold_fun = fn - to, {:ok, stage} -> + to, {:ok, stage, :no_continue} -> case consumer_subscribe(to, stage) do {:reply, _, stage} -> {:ok, stage} {:stop, reason, _, _} -> {:stop, reason} {:stop, reason, _} -> {:stop, reason} end + to, {:ok, stage, continue} -> + case consumer_subscribe(to, stage) do + {:reply, _, stage} -> {:ok, stage, continue} + {:stop, reason, _, _} -> {:stop, reason} + {:stop, reason, _} -> {:stop, reason} + end + _, {:stop, reason} -> {:stop, reason} end - :lists.foldl(fold_fun, {:ok, stage}, producers) + :lists.foldl(fold_fun, {:ok, stage, continue}, producers) end defp consumer_receive({_, ref} = from, {producer_id, cancel, {demand, min, max}}, events, stage) do From 299d17ebc675fd5d8dd56f7251c37f3b03c4ee65 Mon Sep 17 00:00:00 2001 From: Danie Palm Date: Thu, 23 May 2019 22:33:02 +0200 Subject: [PATCH 3/4] most tests pass again --- lib/consumer_supervisor.ex | 2 +- lib/gen_stage.ex | 73 ++++++++++++++++++++------------------ test/gen_stage_test.exs | 2 +- 3 files changed, 41 insertions(+), 36 deletions(-) diff --git a/lib/consumer_supervisor.ex b/lib/consumer_supervisor.ex index 0553426..53af2cb 100644 --- a/lib/consumer_supervisor.ex +++ b/lib/consumer_supervisor.ex @@ -24,7 +24,7 @@ defmodule ConsumerSupervisor do to a producer named `Producer` and starts a new process for each event received from the producer. Each new process will be started by calling `Printer.start_link/1`, which simply starts a task that will print the - incoming event to the terminal. + incoming event to the terminal. defmodule Consumer do use ConsumerSupervisor diff --git a/lib/gen_stage.ex b/lib/gen_stage.ex index 273269c..b16aabc 100644 --- a/lib/gen_stage.ex +++ b/lib/gen_stage.ex @@ -831,8 +831,8 @@ defmodule GenStage do end The returned tuple may also contain 3 or 4 elements. The third - element may be the `:hibernate` atom or a set of options defined - below. + element may be the `:hibernate` atom, the `{:continue, term}` tuple + or a set of options defined below. Returning `:ignore` will cause `start_link/3` to return `:ignore` and the process will exit normally without entering the loop or @@ -1724,40 +1724,40 @@ defmodule GenStage do def init({mod, args}) do case mod.init(args) do {:producer, state} -> - init_producer(mod, [], state) - - {:producer, state, {:continue, continue}} -> - init_producer(mod, [], state, {:continue, continue}) + init_producer(mod, [], state, :no_continue) {:producer, state, opts} when is_list(opts) -> - init_producer(mod, opts, state) + init_producer(mod, opts, state, :no_continue) - {:producer, state, {:continue, continue}, opts} when is_list(opts) -> - init_producer(mod, opts, state, {:continue, continue}) + {:producer, state, continue_or_hibernate} -> + init_producer(mod, [], state, continue_or_hibernate) - {:producer_consumer, state} -> - init_producer_consumer(mod, [], state) + {:producer, state, continue_or_hibernate, opts} when is_list(opts) -> + init_producer(mod, opts, state, continue_or_hibernate) - {:producer_consumer, state, {:continue, continue}} -> - init_producer_consumer(mod, [], state, {:continue, continue}) + {:producer_consumer, state} -> + init_producer_consumer(mod, [], state, :no_continue) {:producer_consumer, state, opts} when is_list(opts) -> - init_producer_consumer(mod, opts, state) + init_producer_consumer(mod, opts, state, :no_continue) - {:producer_consumer, state, {:continue, continue}, opts} when is_list(opts) -> - init_producer_consumer(mod, opts, state, {:continue, continue}) + {:producer_consumer, state, continue_or_hibernate} -> + init_producer_consumer(mod, [], state, continue_or_hibernate) - {:consumer, state} -> - init_consumer(mod, [], state) + {:producer_consumer, state, continue_or_hibernate, opts} when is_list(opts) -> + init_producer_consumer(mod, opts, state, continue_or_hibernate) - {:consumer, state, {:continue, continue}} -> - init_consumer(mod, [], state, {:continue, continue}) + {:consumer, state} -> + init_consumer(mod, [], state, :no_continue) {:consumer, state, opts} when is_list(opts) -> - init_consumer(mod, opts, state) + init_consumer(mod, opts, state, :no_continue) - {:consumer, state, {:continue, continue}, opts} when is_list(opts) -> - init_consumer(mod, opts, state, {:continue, continue}) + {:consumer, state, continue_or_hibernate} -> + init_consumer(mod, [], state, continue_or_hibernate) + + {:consumer, state, continue_or_hibernate, opts} when is_list(opts) -> + init_consumer(mod, opts, state, continue_or_hibernate) {:stop, _} = stop -> stop @@ -1770,7 +1770,7 @@ defmodule GenStage do end end - defp init_producer(mod, opts, state, continue \\ :no_continue) do + defp init_producer(mod, opts, state, continue_or_hibernate) do with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts), {:ok, buffer_size, opts} <- Utils.validate_integer(opts, :buffer_size, 10000, 0, :infinity, true), @@ -1790,9 +1790,9 @@ defmodule GenStage do dispatcher_state: dispatcher_state } - case continue do + case continue_or_hibernate do :no_continue -> {:ok, stage} - continue -> {:ok, stage, continue} + continue_or_hibernate -> {:ok, stage, continue_or_hibernate} end else {:error, message} -> {:stop, {:bad_opts, message}} @@ -1815,7 +1815,7 @@ defmodule GenStage do end end - defp init_producer_consumer(mod, opts, state, continue \\ :no_continue) do + defp init_producer_consumer(mod, opts, state, continue_or_hibernate) do with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts), {:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []), {:ok, buffer_size, opts} <- @@ -1834,17 +1834,17 @@ defmodule GenStage do dispatcher_state: dispatcher_state } - consumer_init_subscribe(subscribe_to, stage, continue) + consumer_init_subscribe(subscribe_to, stage, continue_or_hibernate) else {:error, message} -> {:stop, {:bad_opts, message}} end end - defp init_consumer(mod, opts, state, continue \\ :no_continue) do + defp init_consumer(mod, opts, state, continue_or_hibernate) do with {:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []), :ok <- Utils.validate_no_opts(opts) do stage = %GenStage{mod: mod, state: state, type: :consumer} - consumer_init_subscribe(subscribe_to, stage, continue) + consumer_init_subscribe(subscribe_to, stage, continue_or_hibernate) else {:error, message} -> {:stop, {:bad_opts, message}} end @@ -2407,7 +2407,7 @@ defmodule GenStage do ## Consumer helpers - defp consumer_init_subscribe(producers, stage, continue) do + defp consumer_init_subscribe(producers, stage, continue_or_hibernate) do fold_fun = fn to, {:ok, stage, :no_continue} -> case consumer_subscribe(to, stage) do @@ -2416,9 +2416,9 @@ defmodule GenStage do {:stop, reason, _} -> {:stop, reason} end - to, {:ok, stage, continue} -> + to, {:ok, stage, continue_or_hibernate} -> case consumer_subscribe(to, stage) do - {:reply, _, stage} -> {:ok, stage, continue} + {:reply, _, stage} -> {:ok, stage, continue_or_hibernate} {:stop, reason, _, _} -> {:stop, reason} {:stop, reason, _} -> {:stop, reason} end @@ -2427,7 +2427,12 @@ defmodule GenStage do {:stop, reason} end - :lists.foldl(fold_fun, {:ok, stage, continue}, producers) + :lists.foldl(fold_fun, {:ok, stage, continue_or_hibernate}, producers) + |> case do + {:ok, stage, :no_continue} -> {:ok, stage} + {:ok, stage, continue_or_hibernate} -> {:ok, stage, continue_or_hibernate} + otherwise -> otherwise + end end defp consumer_receive({_, ref} = from, {producer_id, cancel, {demand, min, max}}, events, stage) do diff --git a/test/gen_stage_test.exs b/test/gen_stage_test.exs index 39c69fc..7b062e5 100644 --- a/test/gen_stage_test.exs +++ b/test/gen_stage_test.exs @@ -143,7 +143,7 @@ defmodule GenStageTest do defmodule Postponer do @moduledoc """ - Discards all events. + Postpones all events. """ use GenStage From abb99b249e8c1334bedd1670c9059cd3a0b8c4ed Mon Sep 17 00:00:00 2001 From: Danie Palm Date: Mon, 27 May 2019 17:07:26 +0200 Subject: [PATCH 4/4] simplify consumer init subscribe --- lib/gen_stage.ex | 8 -------- 1 file changed, 8 deletions(-) diff --git a/lib/gen_stage.ex b/lib/gen_stage.ex index b16aabc..6ee97f0 100644 --- a/lib/gen_stage.ex +++ b/lib/gen_stage.ex @@ -2409,13 +2409,6 @@ defmodule GenStage do defp consumer_init_subscribe(producers, stage, continue_or_hibernate) do fold_fun = fn - to, {:ok, stage, :no_continue} -> - case consumer_subscribe(to, stage) do - {:reply, _, stage} -> {:ok, stage} - {:stop, reason, _, _} -> {:stop, reason} - {:stop, reason, _} -> {:stop, reason} - end - to, {:ok, stage, continue_or_hibernate} -> case consumer_subscribe(to, stage) do {:reply, _, stage} -> {:ok, stage, continue_or_hibernate} @@ -2430,7 +2423,6 @@ defmodule GenStage do :lists.foldl(fold_fun, {:ok, stage, continue_or_hibernate}, producers) |> case do {:ok, stage, :no_continue} -> {:ok, stage} - {:ok, stage, continue_or_hibernate} -> {:ok, stage, continue_or_hibernate} otherwise -> otherwise end end