Bug: Non-group workers are not restarted after crash #104

Open
jtrees opened this issue Mar 31, 2022 · 0 comments · May be fixed by #107

In a group supervision tree, workers behave as expected: if their handler crashes, they are restarted and the group continues handling messages.

In a non-group supervision tree, crashed workers are simply ignored and never restarted. This is unintuitive and a pain to deal with, since the outer supervisor (the process returned by Elsa.Supervisor.start_link/1) keeps running as though everything were fine. I would expect the DynamicProcessManager to take care of worker restarts.
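For contrast, here is roughly what the group-based setup (where restarts do work) looks like. The option names below follow the Elsa README rather than this issue, so treat the exact keys, in particular whether handler_init_args is accepted under group_consumer, as assumptions:

Elsa.Supervisor.start_link(
  connection: :name1,
  endpoints: @brokers,
  group_consumer: [
    # "example-group" is an illustrative group name, not taken from this issue.
    group: "example-group",
    topics: [topic],
    handler: Testing.ExampleMessageHandlerWithState,
    handler_init_args: %{pid: self()},
    config: [begin_offset: :earliest]
  ]
)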

Here is an example integration test that fails and demonstrates the problem:

test "restarts a crashed worker that isn't in a group" do
  topic = "consumer-test3"
  Elsa.create_topic(@brokers, topic)

  start_supervised!(
    {Elsa.Supervisor,
     connection: :name1,
     endpoints: @brokers,
     consumer: [
       topic: topic,
       handler: Testing.ExampleMessageHandlerWithState,
       handler_init_args: %{pid: self()},
       begin_offset: :earliest
     ]}
  )

  send_messages(topic, ["message1"])
  send_messages(topic, ["message2"])

  assert_receive {:message, %{topic: ^topic, value: "message1"}}, 5_000
  assert_receive {:message, %{topic: ^topic, value: "message2"}}, 5_000

  kill_worker(topic)

  send_messages(topic, ["message3"])
  send_messages(topic, ["message4"])

  # These assertions fail, because the worker wasn't brought back up.
  assert_receive {:message, %{topic: ^topic, value: "message3"}}, 5_000
  assert_receive {:message, %{topic: ^topic, value: "message4"}}, 5_000
end

defmodule Testing.ExampleMessageHandlerWithState do
  use Elsa.Consumer.MessageHandler

  def init(args) do
    {:ok, args}
  end

  def handle_messages(messages, state) do
    Enum.each(messages, &send(state.pid, {:message, &1}))
    {:ack, state}
  end
end

defp send_messages(topic, messages) do
  :brod.start_link_client(@brokers, :test_client)
  :brod.start_producer(:test_client, topic, [])

  # Alternate between partitions 0 and 1 so more than one worker receives messages.
  messages
  |> Enum.with_index()
  |> Enum.each(fn {msg, index} ->
    partition = rem(index, 2)
    :brod.produce_sync(:test_client, topic, partition, "", msg)
  end)
end

defp kill_worker(topic) do
  partition = 0

  # Resolve the per-partition worker pid via Elsa's process registry
  # (the registry name is derived from the :name1 connection) and kill it.
  worker_pid = Elsa.Registry.whereis_name({:elsa_registry_name1, :"worker_#{topic}_#{partition}"})
  Process.exit(worker_pid, :kill)

  refute Process.alive?(worker_pid)
end
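Once the bug is fixed, the message3/message4 assertions could still race the restart. Below is a minimal sketch of a polling helper that waits for the replacement worker, assuming the same registry naming scheme used in kill_worker/1; await_worker_restart/3 is hypothetical and not part of Elsa or this test:

# Hypothetical helper (not in the original test): poll the registry until the
# worker name resolves to a new, live pid, i.e. the worker has been restarted.
defp await_worker_restart(topic, old_pid, attempts \\ 50) do
  partition = 0
  name = {:elsa_registry_name1, :"worker_#{topic}_#{partition}"}

  case Elsa.Registry.whereis_name(name) do
    new_pid when is_pid(new_pid) and new_pid != old_pid ->
      new_pid

    _ when attempts > 0 ->
      Process.sleep(100)
      await_worker_restart(topic, old_pid, attempts - 1)

    _ ->
      flunk("worker for #{topic} partition #{partition} was not restarted")
  end
end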
jtrees added a commit to jtrees/elsa that referenced this issue Apr 25, 2023
The new behaviour is less surprising and consistent with the way group
consumer workers behave.

Fixes bbalser#104