From 2b89125cca94fcbd85a062f80fa024c1587f8918 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Tue, 10 Nov 2020 17:10:11 -0500 Subject: [PATCH 1/2] Allow GenConsumer callbacks to return :stop replies --- lib/kafka_ex/gen_consumer.ex | 119 +++++++++++------- .../consumer_group_implementation_test.exs | 70 ++++++++++- 2 files changed, 146 insertions(+), 43 deletions(-) diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index d29febf2..180affd0 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -232,6 +232,7 @@ defmodule KafkaEx.GenConsumer do """ @callback init(topic :: binary, partition :: non_neg_integer) :: {:ok, state :: term} + | {:stop, reason :: term} @doc """ Invoked when the server is started. `start_link/5` will block until it @@ -255,7 +256,7 @@ defmodule KafkaEx.GenConsumer do topic :: binary, partition :: non_neg_integer, extra_args :: map() - ) :: {:ok, state :: term} + ) :: {:ok, state :: term} | {:stop, reason :: term} @doc """ Invoked for each message set consumed from a Kafka topic partition. @@ -287,6 +288,8 @@ defmodule KafkaEx.GenConsumer do """ @callback handle_call(call :: term, from :: GenServer.from(), state :: term) :: {:reply, reply_value :: term, new_state :: term} + | {:stop, reason :: term, reply_value :: term, new_state :: term} + | {:stop, reason :: term, new_state :: term} @doc """ Invoked by `KafkaEx.GenConsumer.cast/2`. @@ -296,6 +299,7 @@ defmodule KafkaEx.GenConsumer do """ @callback handle_cast(cast :: term, state :: term) :: {:noreply, new_state :: term} + | {:stop, reason :: term, new_state :: term} @doc """ Invoked by sending messages to the consumer. @@ -305,6 +309,7 @@ defmodule KafkaEx.GenConsumer do """ @callback handle_info(info :: term, state :: term) :: {:noreply, new_state :: term} + | {:stop, reason :: term, new_state :: term} defmacro __using__(_opts) do quote do @@ -541,44 +546,49 @@ defmodule KafkaEx.GenConsumer do api_versions = Keyword.get(opts, :api_versions, %{}) api_versions = Map.merge(default_api_versions, api_versions) - {:ok, consumer_state} = - consumer_module.init(topic, partition, extra_consumer_args) + case consumer_module.init(topic, partition, extra_consumer_args) do + {:ok, consumer_state} -> + worker_opts = Keyword.take(opts, [:uris, :use_ssl, :ssl_options]) - worker_opts = Keyword.take(opts, [:uris, :use_ssl, :ssl_options]) + {:ok, worker_name} = + KafkaEx.create_worker( + :no_name, + [consumer_group: group_name] ++ worker_opts + ) - {:ok, worker_name} = - KafkaEx.create_worker( - :no_name, - [consumer_group: group_name] ++ worker_opts - ) + default_fetch_options = [ + auto_commit: false, + worker_name: worker_name + ] - default_fetch_options = [ - auto_commit: false, - worker_name: worker_name - ] + given_fetch_options = Keyword.get(opts, :fetch_options, []) - given_fetch_options = Keyword.get(opts, :fetch_options, []) - fetch_options = Keyword.merge(default_fetch_options, given_fetch_options) - - state = %State{ - consumer_module: consumer_module, - consumer_state: consumer_state, - commit_interval: commit_interval, - commit_threshold: commit_threshold, - auto_offset_reset: auto_offset_reset, - worker_name: worker_name, - group: group_name, - topic: topic, - partition: partition, - generation_id: generation_id, - member_id: member_id, - fetch_options: fetch_options, - api_versions: api_versions - } + fetch_options = + Keyword.merge(default_fetch_options, given_fetch_options) + + state = %State{ + consumer_module: consumer_module, + consumer_state: consumer_state, + commit_interval: commit_interval, + commit_threshold: commit_threshold, + auto_offset_reset: auto_offset_reset, + worker_name: worker_name, + group: group_name, + topic: topic, + partition: partition, + generation_id: generation_id, + member_id: member_id, + fetch_options: fetch_options, + api_versions: api_versions + } - Process.flag(:trap_exit, true) + Process.flag(:trap_exit, true) - {:ok, state, 0} + {:ok, state, 0} + + {:stop, reason} -> + {:stop, reason} + end end def handle_call(:partition, _from, state) do @@ -597,14 +607,23 @@ defmodule KafkaEx.GenConsumer do # which we turn into a timeout = 0 clause so that we continue to consume. # any other GenServer flow control could have unintended consequences, # so we leave that for later consideration - {:reply, reply, new_consumer_state} = + consumer_reply = consumer_module.handle_call( message, from, consumer_state ) - {:reply, reply, %{state | consumer_state: new_consumer_state}, 0} + case consumer_reply do + {:reply, reply, new_consumer_state} -> + {:reply, reply, %{state | consumer_state: new_consumer_state}, 0} + + {:stop, reason, new_consumer_state} -> + {:stop, reason, %{state | consumer_state: new_consumer_state}} + + {:stop, reason, reply, new_consumer_state} -> + {:stop, reason, reply, %{state | consumer_state: new_consumer_state}} + end end def handle_cast( @@ -618,13 +637,19 @@ defmodule KafkaEx.GenConsumer do # which we turn into a timeout = 0 clause so that we continue to consume. # any other GenServer flow control could have unintended consequences, # so we leave that for later consideration - {:noreply, new_consumer_state} = + consumer_reply = consumer_module.handle_cast( message, consumer_state ) - {:noreply, %{state | consumer_state: new_consumer_state}, 0} + case consumer_reply do + {:noreply, new_consumer_state} -> + {:noreply, %{state | consumer_state: new_consumer_state}, 0} + + {:stop, reason, new_consumer_state} -> + {:stop, reason, %{state | consumer_state: new_consumer_state}} + end end def handle_info( @@ -660,13 +685,19 @@ defmodule KafkaEx.GenConsumer do # which we turn into a timeout = 0 clause so that we continue to consume. # any other GenServer flow control could have unintended consequences, # so we leave that for later consideration - {:noreply, new_consumer_state} = + consumer_reply = consumer_module.handle_info( message, consumer_state ) - {:noreply, %{state | consumer_state: new_consumer_state}, 0} + case consumer_reply do + {:noreply, new_consumer_state} -> + {:noreply, %{state | consumer_state: new_consumer_state}, 0} + + {:stop, reason, new_consumer_state} -> + {:stop, reason, %{state | consumer_state: new_consumer_state}} + end end def terminate(_reason, %State{} = state) do @@ -689,7 +720,8 @@ defmodule KafkaEx.GenConsumer do KafkaEx.fetch( topic, partition, - Keyword.merge(fetch_options, + Keyword.merge( + fetch_options, offset: offset, api_version: Map.fetch!(state.api_versions, :fetch) ) @@ -850,9 +882,12 @@ defmodule KafkaEx.GenConsumer do # one of these needs to match, depending on which client case partition_response do # old client - ^partition -> :ok + ^partition -> + :ok + # new client - %{error_code: :no_error, partition: ^partition} -> :ok + %{error_code: :no_error, partition: ^partition} -> + :ok end Logger.debug(fn -> diff --git a/test/integration/consumer_group_implementation_test.exs b/test/integration/consumer_group_implementation_test.exs index 6e2cb26e..a7cef07e 100644 --- a/test/integration/consumer_group_implementation_test.exs +++ b/test/integration/consumer_group_implementation_test.exs @@ -1,5 +1,5 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do - use ExUnit.Case + use ExUnit.Case, async: false alias KafkaEx.ConsumerGroup alias KafkaEx.GenConsumer @@ -75,14 +75,30 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do {:reply, Map.get(state, key), state} end + def handle_call({:stop, msg}, _from, state) do + {:stop, :test_stop, msg, state} + end + + def handle_call(:stop, _from, state) do + {:stop, :test_stop, state} + end + def handle_cast({:set, key, value}, state) do {:noreply, Map.put_new(state, key, value)} end + def handle_cast(:stop, state) do + {:stop, :test_stop, state} + end + def handle_info({:set, key, value}, state) do {:noreply, Map.put_new(state, key, value)} end + def handle_info(:stop, state) do + {:stop, :test_stop, state} + end + def handle_message_set(message_set, state) do Logger.debug(fn -> "Consumer #{inspect(self())} handled message set #{inspect(message_set)}" @@ -374,4 +390,56 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do assert :value == TestConsumer.get(consumer_pid, :test_info) end end + + test "handle call stop returns from callbacks", context do + consumer_group_pid = + ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) + + [c1, c2] = GenConsumer.Supervisor.child_pids(consumer_group_pid) + assert :foo = GenConsumer.call(c1, {:stop, :foo}) + + try do + GenConsumer.call(c2, :stop) + catch + _, err -> + assert {:test_stop, _} = err + end + + assert nil == Process.info(c1) + assert nil == Process.info(c2) + end + + test "handle cast stop returns from callbacks", context do + consumer_group_pid = + ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) + + [c1, _c2] = GenConsumer.Supervisor.child_pids(consumer_group_pid) + GenConsumer.cast(c1, :stop) + + try do + :sys.get_state(c1) + catch + _, err -> + assert {:test_stop, _} = err + end + + assert nil == Process.info(c1) + end + + test "handle info stop returns from callbacks", context do + consumer_group_pid = + ConsumerGroup.consumer_supervisor_pid(context[:consumer_group_pid1]) + + [c1, _c2] = GenConsumer.Supervisor.child_pids(consumer_group_pid) + send(c1, :stop) + + try do + :sys.get_state(c1) + catch + _, err -> + assert {:test_stop, _} = err + end + + assert nil == Process.info(c1) + end end From 6523b1fd07b36169fbd438b9d54d10d946b8febb Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Wed, 11 Nov 2020 11:25:14 -0500 Subject: [PATCH 2/2] Add test name to consumer group names to prevent tests affecting each other --- .../consumer_group_implementation_test.exs | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/test/integration/consumer_group_implementation_test.exs b/test/integration/consumer_group_implementation_test.exs index a7cef07e..09b049e1 100644 --- a/test/integration/consumer_group_implementation_test.exs +++ b/test/integration/consumer_group_implementation_test.exs @@ -146,14 +146,14 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do |> length end - setup do + setup context do ports_before = num_open_ports() {:ok, test_partitioner_pid} = TestPartitioner.start_link() {:ok, consumer_group_pid1} = ConsumerGroup.start_link( TestConsumer, - @consumer_group_name, + consumer_group_name(context), [@topic_name], heartbeat_interval: 100, partition_assignment_callback: &TestPartitioner.assign_partitions/2, @@ -163,7 +163,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do {:ok, consumer_group_pid2} = ConsumerGroup.start_link( TestConsumer, - @consumer_group_name, + consumer_group_name(context), [@topic_name], heartbeat_interval: 100, partition_assignment_callback: &TestPartitioner.assign_partitions/2, @@ -199,7 +199,9 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do generation_id2 = ConsumerGroup.generation_id(context[:consumer_group_pid2]) assert generation_id1 == generation_id2 - assert @consumer_group_name == + consumer_group_name = consumer_group_name(context) + + assert consumer_group_name == ConsumerGroup.group_name(context[:consumer_group_pid1]) member1 = ConsumerGroup.member_id(context[:consumer_group_pid1]) @@ -305,7 +307,11 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do for px <- partition_range do wait_for(fn -> ending_offset = - latest_consumer_offset_number(@topic_name, px, @consumer_group_name) + latest_consumer_offset_number( + @topic_name, + px, + consumer_group_name(context) + ) last_offset = Map.get(last_offsets, px) ending_offset == last_offset + 1 @@ -334,7 +340,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do {:ok, consumer_group_pid3} = ConsumerGroup.start_link( TestConsumer, - @consumer_group_name, + consumer_group_name(context), [@topic_name], heartbeat_interval: 100, partition_assignment_callback: &TestPartitioner.assign_partitions/2 @@ -442,4 +448,9 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do assert nil == Process.info(c1) end + + def consumer_group_name(context) do + test_name = context[:test] |> to_string() |> String.replace(" ", "_") + @consumer_group_name <> test_name + end end