diff --git a/lib/dpul_collections/index_metrics_tracker.ex b/lib/dpul_collections/index_metrics_tracker.ex index 5dd380de..d30edad0 100644 --- a/lib/dpul_collections/index_metrics_tracker.ex +++ b/lib/dpul_collections/index_metrics_tracker.ex @@ -4,7 +4,20 @@ defmodule DpulCollections.IndexMetricsTracker do alias DpulCollections.IndexingPipeline.Figgy def start_link(_) do - GenServer.start_link(__MODULE__, [], name: __MODULE__) + GenServer.start_link(__MODULE__, %{}, name: __MODULE__) + end + + @impl true + def init(_) do + :ok = + :telemetry.attach( + "metrics-ack-tracker", + [:database_producer, :ack, :done], + &handle_ack_received/4, + nil + ) + + {:ok, %{}} end def register_fresh_index(source) do @@ -19,33 +32,21 @@ defmodule DpulCollections.IndexMetricsTracker do Metrics.index_metrics(source.processor_marker_key(), "full_index") end - @impl true - def init(_) do - {:ok, %{}} - end - @impl true def handle_call({:fresh_index, source}, _, state) do - new_state = put_in(state, [source], %{start_time: :erlang.monotonic_time()}) + new_state = + put_in(state, [source.processor_marker_key()], %{ + start_time: :erlang.monotonic_time(), + acked_count: 0 + }) + {:reply, nil, new_state} end def handle_call({:poll_started, source}, _, state) do - if get_in(state, [source, :start_time]) != nil && get_in(state, [source, :end_time]) == nil do - state = put_in(state, [source, :end_time], :erlang.monotonic_time()) - duration = state[source][:end_time] - state[source][:start_time] - - :telemetry.execute( - [:dpulc, :indexing_pipeline, event(source), :time_to_poll], - %{duration: duration}, - %{source: source} - ) - - Metrics.create_index_metric(%{ - type: source.processor_marker_key(), - measurement_type: "full_index", - duration: System.convert_time_unit(duration, :native, :second) - }) + if get_in(state, [source.processor_marker_key(), :start_time]) != nil && + get_in(state, [source.processor_marker_key(), :end_time]) == nil do + state = put_in(state, [source.processor_marker_key(), :request_end], true) {:reply, nil, state} else @@ -53,15 +54,89 @@ defmodule DpulCollections.IndexMetricsTracker do end end - def event(Figgy.HydrationProducerSource) do + def handle_call( + {:ack_received, metadata = %{processor_marker_key: processor_marker_key}}, + _, + state + ) do + state = + state + |> put_in( + [processor_marker_key], + handle_ack_received(metadata, Map.get(state, processor_marker_key)) + ) + + {:reply, nil, state} + end + + # If there's no stored info yet, do nothing. + defp handle_ack_received(_event, nil), do: nil + # If there's a start and end time, do nothing + defp handle_ack_received( + _event, + processor_state = %{start_time: _start_time, end_time: _end_time} + ), + do: processor_state + + # If there's a start, trigger for end time, and the unacked_count is 0, create the IndexMetric. + defp handle_ack_received( + %{ + processor_marker_key: processor_marker_key, + acked_count: new_acked_count, + unacked_count: 0 + }, + processor_state = %{ + start_time: _start_time, + request_end: true, + acked_count: old_acked_count + } + ) do + processor_state = + processor_state + |> put_in([:end_time], :erlang.monotonic_time()) + |> Map.delete(:request_end) + |> put_in([:acked_count], old_acked_count + new_acked_count) + + duration = processor_state[:end_time] - processor_state[:start_time] + + :telemetry.execute( + [:dpulc, :indexing_pipeline, event(processor_marker_key), :time_to_poll], + %{duration: duration}, + %{source: processor_marker_key} + ) + + Metrics.create_index_metric(%{ + type: processor_marker_key, + measurement_type: "full_index", + duration: System.convert_time_unit(duration, :native, :second), + records_acked: processor_state[:acked_count] + }) + + processor_state + end + + # If there's a start time, record the acked_count + defp handle_ack_received( + %{acked_count: new_acked_count}, + processor_state = %{start_time: _start_time, acked_count: old_acked_count} + ) do + processor_state + |> put_in([:acked_count], old_acked_count + new_acked_count) + end + + def event("figgy_hydrator") do :hydrator end - def event(Figgy.TransformationProducerSource) do + def event("figgy_transformer") do :transformer end - def event(Figgy.IndexingProducerSource) do + def event("figgy_indexer") do :indexer end + + defp handle_ack_received([:database_producer, :ack, :done], _measurements, metadata, _config) do + GenServer.call(__MODULE__, {:ack_received, metadata}) + end end diff --git a/lib/dpul_collections/indexing_pipeline/index_metric.ex b/lib/dpul_collections/indexing_pipeline/index_metric.ex index c3b233f3..0ae96c4d 100644 --- a/lib/dpul_collections/indexing_pipeline/index_metric.ex +++ b/lib/dpul_collections/indexing_pipeline/index_metric.ex @@ -7,7 +7,7 @@ defmodule DpulCollections.IndexingPipeline.IndexMetric do field :measurement_type, :string # Duration in seconds field :duration, :integer - field :records_acked, :integer + field :records_acked, :integer, default: 0 timestamps(type: :utc_datetime_usec) end @@ -15,7 +15,7 @@ defmodule DpulCollections.IndexingPipeline.IndexMetric do @doc false def changeset(index_metric, attrs) do index_metric - |> cast(attrs, [:type, :measurement_type, :duration]) - |> validate_required([:type, :measurement_type, :duration]) + |> cast(attrs, [:type, :measurement_type, :duration, :records_acked]) + |> validate_required([:type, :measurement_type, :duration, :records_acked]) end end diff --git a/lib/dpul_collections/indexing_pipeline/metrics.ex b/lib/dpul_collections/indexing_pipeline/metrics.ex index b3ee2c2e..ce15d958 100644 --- a/lib/dpul_collections/indexing_pipeline/metrics.ex +++ b/lib/dpul_collections/indexing_pipeline/metrics.ex @@ -21,7 +21,8 @@ defmodule DpulCollections.IndexingPipeline.Metrics do def index_metrics(type, measurement_type) do query = from r in IndexMetric, - where: r.type == ^type and r.measurement_type == ^measurement_type + where: r.type == ^type and r.measurement_type == ^measurement_type, + order_by: [desc: r.inserted_at] Repo.all(query) end diff --git a/test/dpul_collections/index_metrics_tracker_test.exs b/test/dpul_collections/index_metrics_tracker_test.exs index 8cf9359c..c9a0f575 100644 --- a/test/dpul_collections/index_metrics_tracker_test.exs +++ b/test/dpul_collections/index_metrics_tracker_test.exs @@ -19,7 +19,20 @@ defmodule DpulCollections.IndexMetricsTrackerTest do processor_marker_key: HydrationProducerSource.processor_marker_key() } ) + IndexMetricsTracker.register_polling_started(HydrationProducerSource) + # Send an ack done with unacked_count 1, this tracks ack but doesn't + # finish. + :telemetry.execute( + [:database_producer, :ack, :done], + %{}, + %{ + acked_count: 1, + unacked_count: 1, + processor_marker_key: HydrationProducerSource.processor_marker_key() + } + ) + # Send an ack done with unacked_count 0, this triggers an index time # create. :telemetry.execute( @@ -31,12 +44,13 @@ defmodule DpulCollections.IndexMetricsTrackerTest do processor_marker_key: HydrationProducerSource.processor_marker_key() } ) + [metric = %IndexMetric{}] = IndexMetricsTracker.index_times(HydrationProducerSource) # Assert # This is 0 because it takes less than a second to run. assert metric.duration == 0 - assert metric.records_acked == 2 + assert metric.records_acked == 3 end end end diff --git a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs index 09889ce6..25cf1515 100644 --- a/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs +++ b/test/dpul_collections/indexing_pipeline/integration/full_integration_test.exs @@ -163,8 +163,7 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do # Ensure metrics are being sent. assert_receive {:hydrator_time_to_poll_hit, %{duration: _}} [hydration_metric_1 | _] = IndexMetricsTracker.index_times(HydrationProducerSource) - # This is 0 because hydration production takes less than a second to run. - assert hydration_metric_1.duration == 0 + assert hydration_metric_1.duration > 0 end test "indexes expected fields" do