Skip to content

Commit

Permalink
Always restart non-group worker processes that exited abnormally
Browse files Browse the repository at this point in the history
The new behaviour is less surprising and consistent with the way group
consumer workers behave.

Fixes bbalser#104
  • Loading branch information
jtrees committed Apr 25, 2023
1 parent 53eb831 commit 2c04b64
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 1 deletion.
24 changes: 23 additions & 1 deletion lib/elsa/consumer/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Elsa.Consumer.Worker do
passed in from the manager before calling the ack function to
notify the cluster the messages have been successfully processed.
"""
use GenServer, restart: :temporary, shutdown: 10_000
use GenServer
require Logger

import Elsa.Supervisor, only: [registry: 1]
Expand Down Expand Up @@ -55,6 +55,28 @@ defmodule Elsa.Consumer.Worker do
GenServer.start_link(__MODULE__, init_args)
end

def child_spec(arg) do
{worker_type, init_arg} = Keyword.pop!(arg, :worker_type)

# Group workers are managed via the `Elsa.Group.Supervisor`
# supervision tree processes. If a process needs restarting, it gets handled there.
#
# Non-group consumers are on their own though and need their OTP supervisor
# to restart them if they crash.
restart =
case worker_type do
:group -> :temporary
:non_group -> :transient
end

%{
id: __MODULE__,
start: {__MODULE__, :start_link, [init_arg]},
restart: restart,
shutdown: 10_000
}
end

def init(init_args) do
Process.flag(:trap_exit, true)

Expand Down
1 change: 1 addition & 0 deletions lib/elsa/group/manager/worker_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ defmodule Elsa.Group.Manager.WorkerManager do
assignment = Enum.into(brod_received_assignment(assignment), %{})

init_args = [
worker_type: :group,
topic: assignment.topic,
partition: assignment.partition,
generation_id: generation_id,
Expand Down
1 change: 1 addition & 0 deletions lib/elsa/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ defmodule Elsa.Supervisor do

consumer_args =
args
|> Keyword.put(:worker_type, :non_group)
|> Keyword.put(:registry, registry)
|> Keyword.put(:connection, connection)
|> Keyword.put(:topics, topics)
Expand Down

0 comments on commit 2c04b64

Please sign in to comment.