Skip to content

Commit

Permalink
Add stored_demand to HydratorProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
tpendragon committed Sep 24, 2024
1 parent 356a4b2 commit 396fd03
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do
last_queried_marker: Figgy.ResourceMarker.t(),
pulled_records: [Figgy.ResourceMarker.t()],
acked_records: [Figgy.ResourceMarker.t()],
cache_version: Integer
cache_version: Integer,
stored_demand: Integer
}
def init(cache_version) do
last_queried_marker = IndexingPipeline.get_processor_marker!("hydrator", cache_version)
Expand All @@ -26,7 +27,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do
last_queried_marker: last_queried_marker |> Figgy.ResourceMarker.from(),
pulled_records: [],
acked_records: [],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

{:producer, initial_state}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker2
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

assert new_state == expected_state
Expand All @@ -39,7 +40,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker2
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

{:noreply, messages, new_state} = Figgy.HydrationProducer.handle_demand(1, initial_state)
Expand All @@ -56,7 +58,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker3
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

assert new_state == expected_state
Expand All @@ -79,7 +82,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
fabricated_marker
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

{:noreply, messages, new_state} = Figgy.HydrationProducer.handle_demand(1, initial_state)
Expand All @@ -95,7 +99,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker3
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

assert new_state == expected_state
Expand All @@ -114,7 +119,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: fabricated_marker,
pulled_records: [],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

{:noreply, messages, new_state} = Figgy.HydrationProducer.handle_demand(1, initial_state)
Expand All @@ -126,7 +132,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: fabricated_marker,
pulled_records: [],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

assert new_state == expected_state
Expand All @@ -144,7 +151,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker3
],
acked_records: [],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

acked_markers =
Expand All @@ -163,7 +171,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
acked_records: [
marker3
],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -187,7 +196,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: marker3,
pulled_records: [],
acked_records: [],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand Down Expand Up @@ -217,7 +227,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker3
],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

acked_markers =
Expand All @@ -236,7 +247,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
acked_records: [
marker2
],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -257,7 +269,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: marker3,
pulled_records: [],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

acked_markers =
Expand All @@ -270,7 +283,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: marker3,
pulled_records: [],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -296,7 +310,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
acked_records: [
marker2
],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

acked_markers =
Expand All @@ -314,7 +329,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
acked_records: [
marker2
],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -341,7 +357,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker2
],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

first_ack =
Expand All @@ -355,7 +372,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker2
],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -373,7 +391,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: marker2,
pulled_records: [],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand Down

0 comments on commit 396fd03

Please sign in to comment.