diff --git a/config/config.exs b/config/config.exs index 45b9e3d2..c7a22718 100644 --- a/config/config.exs +++ b/config/config.exs @@ -61,6 +61,8 @@ config :logger, :console, # Use Jason for JSON parsing in Phoenix config :phoenix, :json_library, Jason +config :dpul_collections, :figgy_hydrator, poll_interval: 60000 + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{config_env()}.exs" diff --git a/config/test.exs b/config/test.exs index e91088b3..e81df83f 100644 --- a/config/test.exs +++ b/config/test.exs @@ -55,3 +55,6 @@ config :dpul_collections, :solr, %{ username: System.get_env("SOLR_USERNAME"), password: System.get_env("SOLR_PASSWORD") } + +# Set this poll interval really small so it triggers in test. +config :dpul_collections, :figgy_hydrator, poll_interval: 50 diff --git a/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer.ex b/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer.ex index 93f65e52..96c0ceb8 100644 --- a/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer.ex +++ b/lib/dpul_collections/indexing_pipeline/figgy/hydration_producer.ex @@ -40,11 +40,13 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do state = %{ last_queried_marker: last_queried_marker, pulled_records: pulled_records, - acked_records: acked_records + acked_records: acked_records, + stored_demand: stored_demand } - ) - when demand > 0 do - records = IndexingPipeline.get_figgy_resources_since!(last_queried_marker, demand) + ) do + total_demand = stored_demand + demand + + records = IndexingPipeline.get_figgy_resources_since!(last_queried_marker, total_demand) new_state = state @@ -57,10 +59,40 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do Enum.concat(pulled_records, Enum.map(records, &Figgy.ResourceMarker.from/1)) ) |> Map.put(:acked_records, acked_records) + |> Map.put(:stored_demand, calculate_stored_demand(total_demand, length(records))) + + # Set a timer to try fulfilling demand again later + if new_state.stored_demand > 0 do + Process.send_after( + self(), + :check_for_updates, + Application.get_env(:dpul_collections, :figgy_hydrator)[:poll_interval] + ) + end {:noreply, Enum.map(records, &wrap_record/1), new_state} end + defp calculate_stored_demand(total_demand, fulfilled_demand) + when total_demand == fulfilled_demand do + 0 + end + + defp calculate_stored_demand(total_demand, fulfilled_demand) + when total_demand > fulfilled_demand do + total_demand - fulfilled_demand + end + + def handle_info(:check_for_updates, state = %{stored_demand: demand}) + when demand > 0 do + new_demand = 0 + handle_demand(new_demand, state) + end + + def handle_info(:check_for_updates, state) do + {:noreply, [], state} + end + @impl GenStage def handle_info({:ack, :figgy_producer_ack, pending_markers}, state) do messages = [] diff --git a/test/dpul_collections/indexing_pipeline/figgy/hydration_producer_test.exs b/test/dpul_collections/indexing_pipeline/figgy/hydration_producer_test.exs index 137fb2bb..446f3f2f 100644 --- a/test/dpul_collections/indexing_pipeline/figgy/hydration_producer_test.exs +++ b/test/dpul_collections/indexing_pipeline/figgy/hydration_producer_test.exs @@ -133,7 +133,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do pulled_records: [], acked_records: [], cache_version: 0, - stored_demand: 0 + stored_demand: 1 } assert new_state == expected_state @@ -405,5 +405,10 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do assert processor_marker == marker2 end + + test ".handle_info(:check_for_updates) with no stored demand" do + assert Figgy.HydrationProducer.handle_info(:check_for_updates, %{stored_demand: 0}) == + {:noreply, [], %{stored_demand: 0}} + end end end