Skip to content

Commit

Permalink
Refactor persistent term usage to behaviour (#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziinc authored Nov 29, 2024
1 parent f043234 commit 1151db2
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 15 deletions.
58 changes: 56 additions & 2 deletions lib/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,58 @@ defmodule Broadway do
> Those issues happen regardless of Broadway and solutions to said
> problems almost always need to be addressed outside of Broadway too.
## Configuration Storage
Broadway stores configuration globally in a chosen storage method. Broadway comes with two configuration storage options:
- `:persistent_term`, the default.
- `:ets`
### Persistent Term
A `:persistent_term` backed configuration storage, which is the default storage option used. Configurations are not deleted when the Broadway server process goes down, so as to avoid a global GC.
```elixir
config Broadway, config_storage: :persistent_term
```
### ETS
An ETS-backed configuration storage. Only use this if performance improvements over the default `:persistent_term`-based storage is needed.
To use this configuration storage option, set your application config.exs as so:
```elixir
config Broadway, config_storage: :ets
```
To pass options, use a tuple with a keyword list as so:
```elixir
config Broadway,
config_storage: :ets,
config_storage_opts: [
table_name: :my_table
]
```
Accepted options:
- `:table_name` - configure the table name. Defaults to `:broadway_configs`.
#### Performance Improvements over `:persistent_term`
`:persistent_term` will trigger a global GC on each `put` or `erase`. For situations where there are a large number of dynamically created Broadway pipelines that are created or removed, this may result in the global GC being triggered multiple times. If there is a large number of processes, this may cause the system to be less responsive until all heaps have been scanned.
As `Broadway.ConfigStorage.PersistentTerm` does not perform an erase when the Broadway server process goes down, it may result in memory buildup over time within the `:persistent_term` hash table, especially when dynamic names are used for the Broadway servers.
Furthermore, the speed of storing and updating using `:persistent_term` is proportional to the number of already-created terms in the hash table, as the hash table (and term) is copied.
Using `:ets` as the config storage will allow for a large number of Broadway server configurations to be stored and fetched without the associated performance tradeoffs that `:persistent_term` has.
## Telemetry
Broadway currently exposes following Telemetry events:
Expand Down Expand Up @@ -793,7 +845,7 @@ defmodule Broadway do
"""

alias Broadway.{BatchInfo, Message, Topology}
alias Broadway.{BatchInfo, Message, Topology, ConfigStorage}
alias NimbleOptions.ValidationError

@typedoc """
Expand Down Expand Up @@ -1158,7 +1210,9 @@ defmodule Broadway do
@doc since: "1.0.0"
@spec all_running() :: [name()]
def all_running do
for {{Broadway, name}, %Broadway.Topology{}} <- :persistent_term.get(),
config_storage = ConfigStorage.get_module()

for name <- config_storage.list(),
(try do
GenServer.whereis(name)
rescue
Expand Down
52 changes: 52 additions & 0 deletions lib/broadway/config_storage.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Broadway.ConfigStorage do
@moduledoc false
alias Broadway.ConfigStorage.{Ets, PersistentTerm}

@doc """
Optional setup for the configuration storage
"""
@callback setup() :: :ok

@doc """
Lists all broadway names in the config storage
"""
@callback list() :: [term()]

@doc """
Puts the given key value pair in the underlying storage.
"""
@callback put(server :: term(), value :: %Broadway.Topology{}) :: term()

@doc """
Retrieves a configuration from the underlying storage
"""
@callback get(server :: term()) :: term()

@doc """
Deletes a configuration from the underlying storage
"""
@callback delete(server :: term()) :: boolean()

@optional_callbacks setup: 0

@doc """
Retrieves the configured module based on the `:config_storage` key.
"""
@spec get_module() :: module()
def get_module() do
Application.get_env(Broadway, :config_storage, :persistent_term)
|> case do
:ets -> Ets
:persistent_term -> PersistentTerm
mod -> mod
end
end

@doc """
Retrieves any options set on the `:config_storage` key.
"""
@spec get_options() :: keyword()
def get_options() do
Application.get_env(Broadway, :config_storage_opts) || []
end
end
46 changes: 46 additions & 0 deletions lib/broadway/config_storage/ets.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
defmodule Broadway.ConfigStorage.Ets do
@moduledoc false
alias Broadway.ConfigStorage
@behaviour ConfigStorage

@default_table :broadway_configs

def default_table(), do: @default_table

@impl ConfigStorage
def setup do
if :undefined == :ets.whereis(table()) do
:ets.new(table(), [:named_table, :public, :set, {:read_concurrency, true}])
end

:ok
end

@impl ConfigStorage
def list do
:ets.select(table(), [{{:"$1", :_}, [], [:"$1"]}])
end

@impl ConfigStorage
def get(server) do
case :ets.match(table(), {server, :"$1"}) do
[[topology]] -> topology
_ -> nil
end
end

@impl ConfigStorage
def put(server, topology) do
:ets.insert(table(), {server, topology})
end

@impl ConfigStorage
def delete(server) do
:ets.delete(table(), server)
end

defp table() do
opts = ConfigStorage.get_options()
Keyword.get(opts, :table_name, @default_table)
end
end
43 changes: 43 additions & 0 deletions lib/broadway/config_storage/persistent_term.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
defmodule Broadway.ConfigStorage.PersistentTerm do
@moduledoc false
@behaviour Broadway.ConfigStorage

@impl Broadway.ConfigStorage
def setup do
unless Code.ensure_loaded?(:persistent_term) do
require Logger
Logger.error("Broadway requires Erlang/OTP 21.3+")
raise "Broadway requires Erlang/OTP 21.3+"
end

:ok
end

@impl Broadway.ConfigStorage
def list do
for {{Broadway, name}, %Broadway.Topology{}} <- :persistent_term.get() do
name
end
end

@impl Broadway.ConfigStorage
def get(server) do
:persistent_term.get({Broadway, server}, nil)
end

@impl Broadway.ConfigStorage
def put(server, topology) do
:persistent_term.put({Broadway, server}, topology)
end

@impl Broadway.ConfigStorage
def delete(_server) do
# We don't delete from persistent term on purpose. Since the process is
# named, we can assume it does not start dynamically, so it will either
# restart or the amount of memory it uses is negligibla to justify the
# process purging done by persistent_term. If the repo is restarted and
# stores the same metadata, then no purging happens either.
# :persistent_term.erase({Broadway, server})
true
end
end
26 changes: 13 additions & 13 deletions lib/broadway/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ defmodule Broadway.Topology do
RateLimiter
}

alias Broadway.ConfigStorage

defstruct [:context, :topology, :producer_names, :batchers_names, :rate_limiter_name]

def start_link(module, opts) do
Expand All @@ -34,7 +36,9 @@ defmodule Broadway.Topology do
end

defp config(server) do
:persistent_term.get({Broadway, server}, nil) ||
config_storage = ConfigStorage.get_module()

config_storage.get(server) ||
exit({:noproc, {__MODULE__, :config, [server]}})
end

Expand All @@ -44,10 +48,10 @@ defmodule Broadway.Topology do
def init({module, opts}) do
Process.flag(:trap_exit, true)

unless Code.ensure_loaded?(:persistent_term) do
require Logger
Logger.error("Broadway requires Erlang/OTP 21.3+")
raise "Broadway requires Erlang/OTP 21.3+"
config_storage = ConfigStorage.get_module()

if function_exported?(config_storage, :setup, 0) do
config_storage.setup()
end

# We want to invoke this as early as possible otherwise the
Expand All @@ -59,7 +63,7 @@ defmodule Broadway.Topology do

emit_init_event(opts, supervisor_pid)

:persistent_term.put({Broadway, config.name}, %__MODULE__{
config_storage.put(config.name, %__MODULE__{
context: config.context,
topology: build_topology_details(config),
producer_names: process_names(config, "Producer", config.producer_config),
Expand All @@ -86,19 +90,15 @@ defmodule Broadway.Topology do
end

@impl true
def terminate(reason, %{supervisor_pid: supervisor_pid, terminator: terminator}) do
def terminate(reason, %{name: name, supervisor_pid: supervisor_pid, terminator: terminator}) do
Broadway.Topology.Terminator.trap_exit(terminator)
ref = Process.monitor(supervisor_pid)
Process.exit(supervisor_pid, reason_to_signal(reason))

receive do
{:DOWN, ^ref, _, _, _} ->
# We don't delete from persistent term on purpose. Since the process is
# named, we can assume it does not start dynamically, so it will either
# restart or the amount of memory it uses is negligibla to justify the
# process purging done by persistent_term. If the repo is restarted and
# stores the same metadata, then no purging happens either.
# :persistent_term.erase({Broadway, name})
config_storage = ConfigStorage.get_module()
config_storage.delete(name)
:ok
end

Expand Down
40 changes: 40 additions & 0 deletions test/broadway/config_storage_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule Broadway.ConfigStorageTest do
use ExUnit.Case, async: false
alias Broadway.ConfigStorage.Ets

setup do
prev = Application.get_env(Broadway, :config_storage)
prev_opts = Application.get_env(Broadway, :config_storage_opts)

on_exit(fn ->
Application.put_env(Broadway, :config_storage, prev)
Application.put_env(Broadway, :config_storage_opts, prev_opts)
end)
end

test "ets default options" do
Application.put_env(Broadway, :config_storage, :ets)
Ets.setup()
assert [] = Ets.list()
assert Ets.put("some name", %Broadway.Topology{})
assert ["some name"] = Ets.list()
assert %Broadway.Topology{} = Ets.get("some name")
assert :ets.info(Ets.default_table(), :size) == 1
Ets.delete("some name")
assert :ets.info(Ets.default_table(), :size) == 0
end

test "ets custom name" do
Application.put_env(Broadway, :config_storage, :ets)
Application.put_env(Broadway, :config_storage_opts, table_name: :my_table)
Ets.setup()
assert :ets.info(:my_table, :size) == 0
assert [] = Ets.list()
assert Ets.put("some name", %Broadway.Topology{})
assert ["some name"] = Ets.list()
assert %Broadway.Topology{} = Ets.get("some name")
assert :ets.info(:my_table, :size) == 1
Ets.delete("some name")
assert :ets.info(:my_table, :size) == 0
end
end

0 comments on commit 1151db2

Please sign in to comment.