Skip to content

Commit

Permalink
Calculate index metrics from last ack.
Browse files Browse the repository at this point in the history
  • Loading branch information
tpendragon committed Nov 20, 2024
1 parent d84c753 commit 46bee0d
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 32 deletions.
125 changes: 100 additions & 25 deletions lib/dpul_collections/index_metrics_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,20 @@ defmodule DpulCollections.IndexMetricsTracker do
alias DpulCollections.IndexingPipeline.Figgy

def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end

@impl true
def init(_) do
:ok =
:telemetry.attach(
"metrics-ack-tracker",
[:database_producer, :ack, :done],
&handle_ack_received/4,
nil
)

{:ok, %{}}
end

def register_fresh_index(source) do
Expand All @@ -19,49 +32,111 @@ defmodule DpulCollections.IndexMetricsTracker do
Metrics.index_metrics(source.processor_marker_key(), "full_index")
end

@impl true
def init(_) do
{:ok, %{}}
end

@impl true
def handle_call({:fresh_index, source}, _, state) do
new_state = put_in(state, [source], %{start_time: :erlang.monotonic_time()})
new_state =
put_in(state, [source.processor_marker_key()], %{
start_time: :erlang.monotonic_time(),
acked_count: 0
})

{:reply, nil, new_state}
end

def handle_call({:poll_started, source}, _, state) do
if get_in(state, [source, :start_time]) != nil && get_in(state, [source, :end_time]) == nil do
state = put_in(state, [source, :end_time], :erlang.monotonic_time())
duration = state[source][:end_time] - state[source][:start_time]

:telemetry.execute(
[:dpulc, :indexing_pipeline, event(source), :time_to_poll],
%{duration: duration},
%{source: source}
)

Metrics.create_index_metric(%{
type: source.processor_marker_key(),
measurement_type: "full_index",
duration: System.convert_time_unit(duration, :native, :second)
})
if get_in(state, [source.processor_marker_key(), :start_time]) != nil &&
get_in(state, [source.processor_marker_key(), :end_time]) == nil do
state = put_in(state, [source.processor_marker_key(), :request_end], true)

{:reply, nil, state}
else
{:reply, nil, state}
end
end

def event(Figgy.HydrationProducerSource) do
def handle_call(
{:ack_received, metadata = %{processor_marker_key: processor_marker_key}},
_,
state
) do
state =
state
|> put_in(
[processor_marker_key],
handle_ack_received(metadata, Map.get(state, processor_marker_key))
)

{:reply, nil, state}
end

# If there's no stored info yet, do nothing.
defp handle_ack_received(_event, nil), do: nil
# If there's a start and end time, do nothing
defp handle_ack_received(
_event,
processor_state = %{start_time: _start_time, end_time: _end_time}
),
do: processor_state

# If there's a start, trigger for end time, and the unacked_count is 0, create the IndexMetric.
defp handle_ack_received(
%{
processor_marker_key: processor_marker_key,
acked_count: new_acked_count,
unacked_count: 0
},
processor_state = %{
start_time: _start_time,
request_end: true,
acked_count: old_acked_count
}
) do
processor_state =
processor_state
|> put_in([:end_time], :erlang.monotonic_time())
|> Map.delete(:request_end)
|> put_in([:acked_count], old_acked_count + new_acked_count)

duration = processor_state[:end_time] - processor_state[:start_time]

:telemetry.execute(
[:dpulc, :indexing_pipeline, event(processor_marker_key), :time_to_poll],
%{duration: duration},
%{source: processor_marker_key}
)

Metrics.create_index_metric(%{
type: processor_marker_key,
measurement_type: "full_index",
duration: System.convert_time_unit(duration, :native, :second),
records_acked: processor_state[:acked_count]
})

processor_state
end

# If there's a start time, record the acked_count
defp handle_ack_received(
%{acked_count: new_acked_count},
processor_state = %{start_time: _start_time, acked_count: old_acked_count}
) do
processor_state
|> put_in([:acked_count], old_acked_count + new_acked_count)
end

def event("figgy_hydrator") do
:hydrator
end

def event(Figgy.TransformationProducerSource) do
def event("figgy_transformer") do
:transformer
end

def event(Figgy.IndexingProducerSource) do
def event("figgy_indexer") do
:indexer
end

defp handle_ack_received([:database_producer, :ack, :done], _measurements, metadata, _config) do
GenServer.call(__MODULE__, {:ack_received, metadata})
end
end
6 changes: 3 additions & 3 deletions lib/dpul_collections/indexing_pipeline/index_metric.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ defmodule DpulCollections.IndexingPipeline.IndexMetric do
field :measurement_type, :string
# Duration in seconds
field :duration, :integer
field :records_acked, :integer
field :records_acked, :integer, default: 0

timestamps(type: :utc_datetime_usec)
end

@doc false
def changeset(index_metric, attrs) do
index_metric
|> cast(attrs, [:type, :measurement_type, :duration])
|> validate_required([:type, :measurement_type, :duration])
|> cast(attrs, [:type, :measurement_type, :duration, :records_acked])
|> validate_required([:type, :measurement_type, :duration, :records_acked])
end
end
3 changes: 2 additions & 1 deletion lib/dpul_collections/indexing_pipeline/metrics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ defmodule DpulCollections.IndexingPipeline.Metrics do
def index_metrics(type, measurement_type) do
query =
from r in IndexMetric,
where: r.type == ^type and r.measurement_type == ^measurement_type
where: r.type == ^type and r.measurement_type == ^measurement_type,
order_by: [desc: r.inserted_at]

Repo.all(query)
end
Expand Down
16 changes: 15 additions & 1 deletion test/dpul_collections/index_metrics_tracker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,20 @@ defmodule DpulCollections.IndexMetricsTrackerTest do
processor_marker_key: HydrationProducerSource.processor_marker_key()
}
)

IndexMetricsTracker.register_polling_started(HydrationProducerSource)
# Send an ack done with unacked_count 1, this tracks ack but doesn't
# finish.
:telemetry.execute(
[:database_producer, :ack, :done],
%{},
%{
acked_count: 1,
unacked_count: 1,
processor_marker_key: HydrationProducerSource.processor_marker_key()
}
)

# Send an ack done with unacked_count 0, this triggers an index time
# create.
:telemetry.execute(
Expand All @@ -31,12 +44,13 @@ defmodule DpulCollections.IndexMetricsTrackerTest do
processor_marker_key: HydrationProducerSource.processor_marker_key()
}
)

[metric = %IndexMetric{}] = IndexMetricsTracker.index_times(HydrationProducerSource)

# Assert
# This is 0 because it takes less than a second to run.
assert metric.duration == 0
assert metric.records_acked == 2
assert metric.records_acked == 3
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ defmodule DpulCollections.IndexingPipeline.FiggyFullIntegrationTest do
# Ensure metrics are being sent.
assert_receive {:hydrator_time_to_poll_hit, %{duration: _}}
[hydration_metric_1 | _] = IndexMetricsTracker.index_times(HydrationProducerSource)
# This is 0 because hydration production takes less than a second to run.
assert hydration_metric_1.duration == 0
assert hydration_metric_1.duration > 0
end

test "indexes expected fields" do
Expand Down

0 comments on commit 46bee0d

Please sign in to comment.