defmodule KafkaClient.Consumer do
  @moduledoc """
  Concurrent Kafka consumer.

  This module provides a higher-level implementation which should fit most typical consumer needs.
  In this implementation, records are processed in separate, partition-specific processes. This
  ensures that records on the same partition are processed sequentially, while separate partitions
  are processed concurrently.

  If the process model of this consumer doesn't fit your purposes, you can use the lower-level
  `KafkaClient.Consumer.Poller` abstraction.

  For usage details see `start_link/1` function. Also refer to the `Poller` documentation for
  the explanation of the common behaviour, such as load control, or telemetry.
  """

  use Parent.GenServer
  require Logger
  alias KafkaClient.Consumer.{PartitionProcessor, Poller}

  @type handler :: (notification -> any)
  @type batch_size :: pos_integer() | :infinity

  @type notification ::
          {:assigned, [KafkaClient.topic_partition()]}
          | {:unassigned, [KafkaClient.topic_partition()]}
          | :caught_up
          | {:records, [Poller.record()]}

  @doc """
  Starts the consumer process.

  This function takes all the same options as `Poller.start_link/1`, with one exception. Instead of
  the `:processor` option required by `Poller.start_link/1`, this function requires the `:handler`
  option, which is an anonymous function of arity 1 that is invoked on every notification
  sent from the Kafka poller. The single argument passed to this function is of the type
  `t:notification/0`.

  Example:

      KafkaClient.Consumer.start_link(
        servers: ["localhost:9092"],
        group_id: "mygroup",
        subscriptions: ["topic1", "topic2", ...],

        poll_duration: 10,
        commit_interval: :timer.seconds(5),
        max_batch_size: 10,

        # These parameters are passed directly to the Java client.
        # See https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
        consumer_params: %{
          "heartbeat.interval.ms" => 100,
          "max.poll.interval.ms" => 1000,
        },

        handler: &handle_message/1
      )

  ## Batching

  The handler receives records in batches. Each batch contains all the records currently waiting
  in the mailbox of the partition process. The maximum batch size can be configured with the
  `:max_batch_size` option, which defaults to `:infinity`. To process records one by one, set it
  to 1.
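
  As a rough illustration of the batch shape, the following sketch shows a handler that walks a
  batch in order. The module name `MyApp.BatchHandler` and the `handle_record/1` helper are
  hypothetical. Only the `{:records, records}` notification and the `topic` and `partition`
  record fields are taken from this module.

  ```elixir
  defmodule MyApp.BatchHandler do
    # Hypothetical handler module, not part of KafkaClient.

    # Receives the whole batch as a list. With `max_batch_size: 1` the list is
    # expected to hold a single record.
    def handle_message({:records, records}) do
      Enum.each(records, &handle_record/1)
    end

    # Every other notification is ignored in this sketch.
    def handle_message(_notification), do: :ok

    # Placeholder for real processing; it only prints where the record came from.
    defp handle_record(record) do
      IO.puts("processing a record from " <> record.topic <> "/" <> inspect(record.partition))
    end
  end
  ```

  Such a handler could be passed to `start_link/1` as `handler: &MyApp.BatchHandler.handle_message/1`.
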

  ## Concurrency consideration

  Messages on the same partition are processed sequentially. Messages on different partitions are
  processed concurrently. Internally, the consumer maintains one long-running process per
  assigned partition. This process is started when the partition is assigned to the consumer, and
  it is terminated when the partition is unassigned.

  If the handler is invoked with the argument `{:records, records}`, the invocation takes place
  inside the partition process. All other handler invocations take place inside the main consumer
  process (which is the parent of the partition processes). Avoid long processing, exceptions, and
  exits in these other handler invocations, because they might block the consumer or take it down
  completely. The `{:records, records}` handler invocation may run arbitrarily long, and it may
  safely raise an exception (see [Processing guarantees](#processing-guarantees)).
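
  The split between the two kinds of invocation can be sketched with one function clause per
  notification. The module name `MyApp.NotificationHandler` is hypothetical, and printing stands
  in for whatever the application would actually do.

  ```elixir
  defmodule MyApp.NotificationHandler do
    # Hypothetical handler with one clause per shape of the notification type.

    # The next three clauses run in the main consumer process:
    # keep them short and make sure they cannot raise or exit.
    def handle_message({:assigned, partitions}) do
      IO.puts("assigned: " <> inspect(partitions))
    end

    def handle_message({:unassigned, partitions}) do
      IO.puts("unassigned: " <> inspect(partitions))
    end

    def handle_message(:caught_up), do: IO.puts("caught up")

    # This clause runs in the partition process: it may take long and it may raise.
    def handle_message({:records, records}) do
      Enum.each(records, fn record -> IO.inspect(record, label: "record") end)
    end
  end
  ```
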

  ## Processing guarantees

  The consumer provides at-least-once processing guarantees, i.e. it is guaranteed that the
  `handler({:records, records})` invocation will finish at least once for each batch of records.

  After the handler function finishes, the consumer commits the records to Kafka via
  `Poller.ack/1`. This also happens if the handler function raises an exception. If you wish to
  handle the exception yourself, e.g. by retrying or republishing the records, you must catch the
  exception inside the handler function.
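
  One possible way to catch exceptions inside the handler is a bounded retry, sketched below. The
  module name, the `process/1` function, the retry count, and the `fail?` key (used only to
  trigger the failure path) are illustrative assumptions, not part of this library.

  ```elixir
  defmodule MyApp.RetryingHandler do
    # Hypothetical handler; the retry policy here is only an example.
    @max_attempts 3

    def handle_message({:records, records}) do
      Enum.each(records, &with_retry(&1, @max_attempts))
    end

    def handle_message(_notification), do: :ok

    # Processes the record, retrying on exceptions until the attempts run out.
    defp with_retry(record, attempts_left) do
      process(record)
    rescue
      error ->
        if attempts_left > 1 do
          with_retry(record, attempts_left - 1)
        else
          # Out of attempts: give up on this record. A real handler might
          # republish it to another topic instead.
          IO.puts("giving up on record: " <> Exception.message(error))
          {:error, error}
        end
    end

    # Stand-in for real work; raises for records flagged with `fail?: true`.
    defp process(%{fail?: true}), do: raise("processing failed")
    defp process(_record), do: :ok
  end
  ```

  Because the exception never leaves the handler, the batch is committed once every record has
  either been processed or given up on.
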

  If you wish to commit the records before they are processed, you can asynchronously send them to
  another process, e.g. via `send` or `cast`, and then return from the handler function
  immediately. Alternatively, you can spawn another process to handle the records. This changes
  the processing guarantees to at-most-once, since it is possible that the records are committed
  but never fully processed (e.g. the machine is taken down after the commits are flushed, but
  before the processing finishes).

  If the handler spawns processes, they must be started somewhere else in the application
  supervision tree, not as direct children of the process where the handler is running (the
  partition process). For example, if you wish to handle the records asynchronously in a task, use
  `Task.Supervisor.start_child`, not `Task.start_link`. The latter may cause unexpected `:EXIT`
  messages, in which case the entire consumer will terminate. On the other hand, using `Task.async`
  with `Task.await` in the handler is fine, as long as you can be certain that the tasks won't
  crash and that the `await` won't time out.
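
  A minimal sketch of the asynchronous hand-off, assuming a task supervisor registered under the
  hypothetical name `MyApp.TaskSupervisor` is already running elsewhere in the application
  supervision tree. Note that this gives at-most-once processing, as described above.

  ```elixir
  defmodule MyApp.AsyncHandler do
    # Hypothetical handler that hands records off to tasks supervised elsewhere.
    # `MyApp.TaskSupervisor` is an assumed name; it could be started with the
    # child spec `{Task.Supervisor, name: MyApp.TaskSupervisor}`.

    def handle_message({:records, records}) do
      # The task becomes a child of the task supervisor and is not linked to the
      # partition process, so a crash in the task does not reach the consumer.
      {:ok, _pid} = Task.Supervisor.start_child(MyApp.TaskSupervisor, fn -> process(records) end)
      :ok
    end

    def handle_message(_notification), do: :ok

    # Placeholder for real processing.
    defp process(records), do: Enum.each(records, &IO.inspect/1)
  end
  ```
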

  ### Draining

  If the consumer is being completely stopped (e.g. as a part of the normal system shutdown), it
  waits for the currently running invocations of the `handler` function to finish, up to the
  configured drain time. The remaining messages waiting in the queue will not be processed.

  The default drain time is 5 seconds. Processors taking longer to finish are forcefully
  terminated. The consumer drains its processors concurrently, so a drain time of 5 seconds means
  that the consumer waits at most 5 seconds in total for all of the processors to finish. The
  drain time can be configured via the `:drain` option (in milliseconds). Setting this value to
  zero effectively turns off the draining behaviour.
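
  For instance, a longer drain time could be configured as sketched below. The option values and
  the placeholder handler are illustrative only.

  ```elixir
  # Options intended for `KafkaClient.Consumer.start_link/1`; the values are examples.
  consumer_opts = [
    servers: ["localhost:9092"],
    group_id: "mygroup",
    subscriptions: ["topic1"],
    # Placeholder handler; a real one would process the records.
    handler: fn _notification -> :ok end,
    # Give running handler invocations up to 10 seconds to finish on shutdown.
    # Use `drain: 0` to turn draining off.
    drain: :timer.seconds(10)
  ]
  ```
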

  The consumer will also attempt to flush the committed offsets to Kafka. This happens after the
  processors are drained. The consumer waits up to an additional 5 seconds for the commits to be
  flushed. This behaviour is not configurable.

  Draining also happens when some partitions are lost. However, in this case the offsets of the
  drained messages will not be committed to Kafka (because at this point the partitions are already
  unassigned from the consumer).

  ## Telemetry

  In addition to telemetry events mentioned in the `Poller` docs, the consumer will emit the
  events for the handler invocation:

  - `kafka_client.consumer.records.handler.start.duration`
  - `kafka_client.consumer.records.handler.stop.duration`
  - `kafka_client.consumer.records.handler.exception.duration`
  """
  @spec start_link([
          Poller.option()
          | {:handler, handler}
          | {:max_batch_size, batch_size}
          | {:drain, non_neg_integer}
          | {:name, GenServer.name()}
        ]) ::
          GenServer.on_start()
  def start_link(opts) do
    gen_server_opts = ~w/name/a

    Parent.GenServer.start_link(
      __MODULE__,
      Keyword.drop(opts, gen_server_opts),
      Keyword.take(opts, gen_server_opts)
    )
  end

  @doc "Synchronously stops the consumer process."
  @spec stop(GenServer.server(), timeout) :: :ok | {:error, :not_found}
  def stop(server, timeout \\ :infinity) do
    case GenServer.whereis(server) do
      pid when is_pid(pid) -> GenServer.stop(server, :normal, timeout)
      nil -> {:error, :not_found}
    end
  end

  @impl GenServer
  def init(opts) do
    {handler, opts} = Keyword.pop!(opts, :handler)
    {max_batch_size, opts} = Keyword.pop(opts, :max_batch_size, :infinity)
    {drain, opts} = Keyword.pop(opts, :drain, :timer.seconds(5))

    {:ok, poller} =
      Parent.start_child(
        {Poller, Keyword.put(opts, :processor, self())},
        id: :poller,
        restart: :temporary,
        ephemeral?: true
      )

    {:ok, %{handler: handler, poller: poller, drain: drain, max_batch_size: max_batch_size}}
  end

  @impl GenServer
  def terminate(_reason, state) do
    Parent.children()
    |> Enum.filter(&match?({:processor, _id}, &1.id))
    |> Enum.map(& &1.pid)
    |> stop_processors(state)
  end

  @impl GenServer
  def handle_info({poller, message}, %{poller: poller} = state) do
    handle_poller_message(message, state)
    {:noreply, state}
  end

  @impl GenServer
  def handle_info(_msg, state) do
    {:noreply, state}
  end

  @impl Parent.GenServer
  def handle_stopped_children(children, state) do
    crashed_children = Map.keys(children)
    {:stop, {:children_crashed, crashed_children}, state}
  end

  defp handle_poller_message({:assigned, partitions} = event, state) do
    start_processors(state.handler, state.max_batch_size, partitions)
    state.handler.(event)
  end

  defp handle_poller_message({:unassigned, partitions} = event, state) do
    partitions
    |> Enum.map(fn partition ->
      {:ok, pid} = Parent.child_pid({:processor, partition})
      pid
    end)
    |> stop_processors(state)

    state.handler.(event)
  end

  defp handle_poller_message({:record, record}, _state) do
    {:ok, pid} = Parent.child_pid({:processor, {record.topic, record.partition}})
    PartitionProcessor.process_record(pid, record)
  end

  defp handle_poller_message(message, state),
    do: state.handler.(message)

  defp start_processors(handler, max_batch_size, partitions) do
    Enum.each(
      partitions,
      fn {topic, partition} ->
        {:ok, _pid} =
          Parent.start_child(
            {PartitionProcessor, {handler, max_batch_size}},
            id: {:processor, {topic, partition}},
            restart: :temporary,
            ephemeral?: true
          )
      end
    )
  end

  defp stop_processors(processors, state) do
    if state.drain > 0 do
      processors
      |> Enum.map(&Task.async(PartitionProcessor, :drain, [&1, state.drain]))
      # Infinity is fine, since each drain is performed with a timeout (state.drain)
      |> Enum.each(&Task.await(&1, :infinity))
    end

    # Although the processors might have been stopped at this point, we still need to shut them
    # down via Parent, to ensure they are removed from the Parent's internal structure. Otherwise,
    # we'll receive an unexpected `handle_stopped_children`, and the consumer will stop.
    Enum.each(processors, &Parent.shutdown_child/1)
  end
end