diff --git a/lib/elsa/consumer/worker.ex b/lib/elsa/consumer/worker.ex index 504b986..d0a8cc2 100644 --- a/lib/elsa/consumer/worker.ex +++ b/lib/elsa/consumer/worker.ex @@ -6,7 +6,7 @@ defmodule Elsa.Consumer.Worker do passed in from the manager before calling the ack function to notify the cluster the messages have been successfully processed. """ - use GenServer, restart: :temporary, shutdown: 10_000 + use GenServer require Logger import Elsa.Supervisor, only: [registry: 1] @@ -55,6 +55,28 @@ defmodule Elsa.Consumer.Worker do GenServer.start_link(__MODULE__, init_args) end + def child_spec(arg) do + {worker_type, init_arg} = Keyword.pop!(arg, :worker_type) + + # Group workers are managed via the `Elsa.Group.Supervisor` + # supervision tree processes. If a process needs restarting, it gets handled there. + # + # Non-group consumers are on their own though and need their OTP supervisor + # to restart them if they crash. + restart = + case worker_type do + :group -> :temporary + :non_group -> :transient + end + + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [init_arg]}, + restart: restart, + shutdown: 10_000 + } + end + def init(init_args) do Process.flag(:trap_exit, true) diff --git a/lib/elsa/group/manager/worker_manager.ex b/lib/elsa/group/manager/worker_manager.ex index 01e7b85..222c63f 100644 --- a/lib/elsa/group/manager/worker_manager.ex +++ b/lib/elsa/group/manager/worker_manager.ex @@ -75,6 +75,7 @@ defmodule Elsa.Group.Manager.WorkerManager do assignment = Enum.into(brod_received_assignment(assignment), %{}) init_args = [ + worker_type: :group, topic: assignment.topic, partition: assignment.partition, generation_id: generation_id, diff --git a/lib/elsa/supervisor.ex b/lib/elsa/supervisor.ex index ffbc690..45de2ae 100644 --- a/lib/elsa/supervisor.ex +++ b/lib/elsa/supervisor.ex @@ -196,6 +196,7 @@ defmodule Elsa.Supervisor do consumer_args = args + |> Keyword.put(:worker_type, :non_group) |> Keyword.put(:registry, registry) |> Keyword.put(:connection, connection) |> Keyword.put(:topics, topics)