From 1151db21d51433b592b55421cd9689d8c81ca339 Mon Sep 17 00:00:00 2001 From: Ziinc Date: Fri, 29 Nov 2024 17:42:43 +0800 Subject: [PATCH] Refactor persistent term usage to behaviour (#344) --- lib/broadway.ex | 58 ++++++++++++++++++- lib/broadway/config_storage.ex | 52 +++++++++++++++++ lib/broadway/config_storage/ets.ex | 46 +++++++++++++++ .../config_storage/persistent_term.ex | 43 ++++++++++++++ lib/broadway/topology.ex | 26 ++++----- test/broadway/config_storage_test.exs | 40 +++++++++++++ 6 files changed, 250 insertions(+), 15 deletions(-) create mode 100644 lib/broadway/config_storage.ex create mode 100644 lib/broadway/config_storage/ets.ex create mode 100644 lib/broadway/config_storage/persistent_term.ex create mode 100644 test/broadway/config_storage_test.exs diff --git a/lib/broadway.ex b/lib/broadway.ex index b39073a..2eee487 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -607,6 +607,58 @@ defmodule Broadway do > Those issues happen regardless of Broadway and solutions to said > problems almost always need to be addressed outside of Broadway too. + ## Configuration Storage + + Broadway stores configuration globally in a chosen storage method. Broadway comes with two configuration storage options: + + - `:persistent_term`, the default. + - `:ets` + + + + ### Persistent Term + + A `:persistent_term` backed configuration storage, which is the default storage option used. Configurations are not deleted when the Broadway server process goes down, so as to avoid a global GC. + + ```elixir + config Broadway, config_storage: :persistent_term + ``` + + ### ETS + An ETS-backed configuration storage. Only use this if performance improvements over the default `:persistent_term`-based storage is needed. + + To use this configuration storage option, set your application config.exs as so: + + ```elixir + config Broadway, config_storage: :ets + ``` + + To pass options, use a tuple with a keyword list as so: + + ```elixir + config Broadway, + config_storage: :ets, + config_storage_opts: [ + table_name: :my_table + ] + ``` + + Accepted options: + - `:table_name` - configure the table name. Defaults to `:broadway_configs`. + + #### Performance Improvements over `:persistent_term` + `:persistent_term` will trigger a global GC on each `put` or `erase`. For situations where there are a large number of dynamically created Broadway pipelines that are created or removed, this may result in the global GC being triggered multiple times. If there is a large number of processes, this may cause the system to be less responsive until all heaps have been scanned. + + As `Broadway.ConfigStorage.PersistentTerm` does not perform an erase when the Broadway server process goes down, it may result in memory buildup over time within the `:persistent_term` hash table, especially when dynamic names are used for the Broadway servers. + + Furthermore, the speed of storing and updating using `:persistent_term` is proportional to the number of already-created terms in the hash table, as the hash table (and term) is copied. + + Using `:ets` as the config storage will allow for a large number of Broadway server configurations to be stored and fetched without the associated performance tradeoffs that `:persistent_term` has. + + + + + ## Telemetry Broadway currently exposes following Telemetry events: @@ -793,7 +845,7 @@ defmodule Broadway do """ - alias Broadway.{BatchInfo, Message, Topology} + alias Broadway.{BatchInfo, Message, Topology, ConfigStorage} alias NimbleOptions.ValidationError @typedoc """ @@ -1158,7 +1210,9 @@ defmodule Broadway do @doc since: "1.0.0" @spec all_running() :: [name()] def all_running do - for {{Broadway, name}, %Broadway.Topology{}} <- :persistent_term.get(), + config_storage = ConfigStorage.get_module() + + for name <- config_storage.list(), (try do GenServer.whereis(name) rescue diff --git a/lib/broadway/config_storage.ex b/lib/broadway/config_storage.ex new file mode 100644 index 0000000..a08f7cc --- /dev/null +++ b/lib/broadway/config_storage.ex @@ -0,0 +1,52 @@ +defmodule Broadway.ConfigStorage do + @moduledoc false + alias Broadway.ConfigStorage.{Ets, PersistentTerm} + + @doc """ + Optional setup for the configuration storage + """ + @callback setup() :: :ok + + @doc """ + Lists all broadway names in the config storage + """ + @callback list() :: [term()] + + @doc """ + Puts the given key value pair in the underlying storage. + """ + @callback put(server :: term(), value :: %Broadway.Topology{}) :: term() + + @doc """ + Retrieves a configuration from the underlying storage + """ + @callback get(server :: term()) :: term() + + @doc """ + Deletes a configuration from the underlying storage + """ + @callback delete(server :: term()) :: boolean() + + @optional_callbacks setup: 0 + + @doc """ + Retrieves the configured module based on the `:config_storage` key. + """ + @spec get_module() :: module() + def get_module() do + Application.get_env(Broadway, :config_storage, :persistent_term) + |> case do + :ets -> Ets + :persistent_term -> PersistentTerm + mod -> mod + end + end + + @doc """ + Retrieves any options set on the `:config_storage` key. + """ + @spec get_options() :: keyword() + def get_options() do + Application.get_env(Broadway, :config_storage_opts) || [] + end +end diff --git a/lib/broadway/config_storage/ets.ex b/lib/broadway/config_storage/ets.ex new file mode 100644 index 0000000..0402f7d --- /dev/null +++ b/lib/broadway/config_storage/ets.ex @@ -0,0 +1,46 @@ +defmodule Broadway.ConfigStorage.Ets do + @moduledoc false + alias Broadway.ConfigStorage + @behaviour ConfigStorage + + @default_table :broadway_configs + + def default_table(), do: @default_table + + @impl ConfigStorage + def setup do + if :undefined == :ets.whereis(table()) do + :ets.new(table(), [:named_table, :public, :set, {:read_concurrency, true}]) + end + + :ok + end + + @impl ConfigStorage + def list do + :ets.select(table(), [{{:"$1", :_}, [], [:"$1"]}]) + end + + @impl ConfigStorage + def get(server) do + case :ets.match(table(), {server, :"$1"}) do + [[topology]] -> topology + _ -> nil + end + end + + @impl ConfigStorage + def put(server, topology) do + :ets.insert(table(), {server, topology}) + end + + @impl ConfigStorage + def delete(server) do + :ets.delete(table(), server) + end + + defp table() do + opts = ConfigStorage.get_options() + Keyword.get(opts, :table_name, @default_table) + end +end diff --git a/lib/broadway/config_storage/persistent_term.ex b/lib/broadway/config_storage/persistent_term.ex new file mode 100644 index 0000000..345efc3 --- /dev/null +++ b/lib/broadway/config_storage/persistent_term.ex @@ -0,0 +1,43 @@ +defmodule Broadway.ConfigStorage.PersistentTerm do + @moduledoc false + @behaviour Broadway.ConfigStorage + + @impl Broadway.ConfigStorage + def setup do + unless Code.ensure_loaded?(:persistent_term) do + require Logger + Logger.error("Broadway requires Erlang/OTP 21.3+") + raise "Broadway requires Erlang/OTP 21.3+" + end + + :ok + end + + @impl Broadway.ConfigStorage + def list do + for {{Broadway, name}, %Broadway.Topology{}} <- :persistent_term.get() do + name + end + end + + @impl Broadway.ConfigStorage + def get(server) do + :persistent_term.get({Broadway, server}, nil) + end + + @impl Broadway.ConfigStorage + def put(server, topology) do + :persistent_term.put({Broadway, server}, topology) + end + + @impl Broadway.ConfigStorage + def delete(_server) do + # We don't delete from persistent term on purpose. Since the process is + # named, we can assume it does not start dynamically, so it will either + # restart or the amount of memory it uses is negligibla to justify the + # process purging done by persistent_term. If the repo is restarted and + # stores the same metadata, then no purging happens either. + # :persistent_term.erase({Broadway, server}) + true + end +end diff --git a/lib/broadway/topology.ex b/lib/broadway/topology.ex index 4818bd6..7145906 100644 --- a/lib/broadway/topology.ex +++ b/lib/broadway/topology.ex @@ -11,6 +11,8 @@ defmodule Broadway.Topology do RateLimiter } + alias Broadway.ConfigStorage + defstruct [:context, :topology, :producer_names, :batchers_names, :rate_limiter_name] def start_link(module, opts) do @@ -34,7 +36,9 @@ defmodule Broadway.Topology do end defp config(server) do - :persistent_term.get({Broadway, server}, nil) || + config_storage = ConfigStorage.get_module() + + config_storage.get(server) || exit({:noproc, {__MODULE__, :config, [server]}}) end @@ -44,10 +48,10 @@ defmodule Broadway.Topology do def init({module, opts}) do Process.flag(:trap_exit, true) - unless Code.ensure_loaded?(:persistent_term) do - require Logger - Logger.error("Broadway requires Erlang/OTP 21.3+") - raise "Broadway requires Erlang/OTP 21.3+" + config_storage = ConfigStorage.get_module() + + if function_exported?(config_storage, :setup, 0) do + config_storage.setup() end # We want to invoke this as early as possible otherwise the @@ -59,7 +63,7 @@ defmodule Broadway.Topology do emit_init_event(opts, supervisor_pid) - :persistent_term.put({Broadway, config.name}, %__MODULE__{ + config_storage.put(config.name, %__MODULE__{ context: config.context, topology: build_topology_details(config), producer_names: process_names(config, "Producer", config.producer_config), @@ -86,19 +90,15 @@ defmodule Broadway.Topology do end @impl true - def terminate(reason, %{supervisor_pid: supervisor_pid, terminator: terminator}) do + def terminate(reason, %{name: name, supervisor_pid: supervisor_pid, terminator: terminator}) do Broadway.Topology.Terminator.trap_exit(terminator) ref = Process.monitor(supervisor_pid) Process.exit(supervisor_pid, reason_to_signal(reason)) receive do {:DOWN, ^ref, _, _, _} -> - # We don't delete from persistent term on purpose. Since the process is - # named, we can assume it does not start dynamically, so it will either - # restart or the amount of memory it uses is negligibla to justify the - # process purging done by persistent_term. If the repo is restarted and - # stores the same metadata, then no purging happens either. - # :persistent_term.erase({Broadway, name}) + config_storage = ConfigStorage.get_module() + config_storage.delete(name) :ok end diff --git a/test/broadway/config_storage_test.exs b/test/broadway/config_storage_test.exs new file mode 100644 index 0000000..893aabd --- /dev/null +++ b/test/broadway/config_storage_test.exs @@ -0,0 +1,40 @@ +defmodule Broadway.ConfigStorageTest do + use ExUnit.Case, async: false + alias Broadway.ConfigStorage.Ets + + setup do + prev = Application.get_env(Broadway, :config_storage) + prev_opts = Application.get_env(Broadway, :config_storage_opts) + + on_exit(fn -> + Application.put_env(Broadway, :config_storage, prev) + Application.put_env(Broadway, :config_storage_opts, prev_opts) + end) + end + + test "ets default options" do + Application.put_env(Broadway, :config_storage, :ets) + Ets.setup() + assert [] = Ets.list() + assert Ets.put("some name", %Broadway.Topology{}) + assert ["some name"] = Ets.list() + assert %Broadway.Topology{} = Ets.get("some name") + assert :ets.info(Ets.default_table(), :size) == 1 + Ets.delete("some name") + assert :ets.info(Ets.default_table(), :size) == 0 + end + + test "ets custom name" do + Application.put_env(Broadway, :config_storage, :ets) + Application.put_env(Broadway, :config_storage_opts, table_name: :my_table) + Ets.setup() + assert :ets.info(:my_table, :size) == 0 + assert [] = Ets.list() + assert Ets.put("some name", %Broadway.Topology{}) + assert ["some name"] = Ets.list() + assert %Broadway.Topology{} = Ets.get("some name") + assert :ets.info(:my_table, :size) == 1 + Ets.delete("some name") + assert :ets.info(:my_table, :size) == 0 + end +end