diff --git a/lib/base_job.rb b/lib/base_job.rb index 8166e58ef..85a880a93 100644 --- a/lib/base_job.rb +++ b/lib/base_job.rb @@ -13,24 +13,6 @@ def self.notify_of_failures end end - # Wait for all tasks for the given queue/job class combination to be - # completed before continuing - def self.wait_until_processed(max_timeout: 2 * 60 * 60) - Timeout.timeout(max_timeout) do - # wait for all queued tasks to be started - sleep 1 while Sidekiq::Queue.new(self::QUEUE_NAME).any? { |job| job.display_class == to_s } - - # wait for started tasks to be finished - sleep 1 while active_jobs? - end - end - - def self.active_jobs? - Sidekiq::Job.jobs.any? do |_, _, work| - work["queue"] == self::QUEUE_NAME && work["payload"]["class"] == to_s - end - end - private def indexes(index_name) diff --git a/lib/govuk_index/page_traffic_job.rb b/lib/govuk_index/page_traffic_job.rb index b6307214f..f7812240d 100644 --- a/lib/govuk_index/page_traffic_job.rb +++ b/lib/govuk_index/page_traffic_job.rb @@ -4,6 +4,26 @@ class PageTrafficJob < BaseJob QUEUE_NAME = "bulk".freeze sidekiq_options queue: QUEUE_NAME + # Wait for all tasks for the given queue/job class combination to be + # completed before continuing + def self.wait_until_processed(max_timeout: 2 * 60 * 60) + Timeout.timeout(max_timeout) do + # wait for all queued tasks to be started + sleep 1 while Sidekiq::Queue.new(self::QUEUE_NAME).any? { |job| job.klass == to_s } + + # wait for started tasks to be finished + sleep 1 while active_jobs? + end + end + + def self.active_jobs? + # This code makes use of Sidekiq::API which is not advised for application + # usage and is hard to test, be very careful changing it. + Sidekiq::WorkSet.new.any? do |_, _, work| + work.queue == self::QUEUE_NAME && work.payload["class"] == to_s + end + end + def self.perform_async(records, destination_index, cluster_key) data = Base64.encode64(Zlib::Deflate.deflate(Sidekiq.dump_json(records))) super(data, destination_index, cluster_key) diff --git a/spec/integration/indexer/amend_job_spec.rb b/spec/integration/indexer/amend_job_spec.rb index 5d10e077b..953961d6a 100644 --- a/spec/integration/indexer/amend_job_spec.rb +++ b/spec/integration/indexer/amend_job_spec.rb @@ -9,10 +9,6 @@ let(:updates) { { "title" => "New title" } } let(:cluster_count) { Clusters.count } - before do - Sidekiq::Job.clear_all - end - it "amends documents" do commit_document(index_name, document) @@ -41,6 +37,8 @@ job.perform(index_name, link, updates) }.to change { described_class.jobs.count }.by(1) end + # clear the side effects of Sidekiq::Testing.fake! + Sidekiq::Job.clear_all end it "forwards to failure queue" do diff --git a/spec/unit/govuk_index/page_traffic_job_spec.rb b/spec/unit/govuk_index/page_traffic_job_spec.rb new file mode 100644 index 000000000..b5bc40531 --- /dev/null +++ b/spec/unit/govuk_index/page_traffic_job_spec.rb @@ -0,0 +1,92 @@ +require "spec_helper" + +RSpec.describe GovukIndex::PageTrafficJob do + describe ".wait_until_processed" do + before do + allow(described_class).to receive(:sleep) + allow(described_class).to receive(:active_jobs?).and_return(false) + allow(Sidekiq::Queue) + .to receive(:new) + .with(described_class::QUEUE_NAME) + .and_return([]) + end + + it "doesn't wait if there are no queued items and no active jobs" do + described_class.wait_until_processed + + expect(described_class).not_to have_received(:sleep) + end + + it "waits if there are queued items for this queue and job class" do + job = Sidekiq::JobRecord.new({ "class" => described_class.name }.to_json, + described_class::QUEUE_NAME) + matching_jobs = [job, job] + no_matching_jobs = [] + + allow(Sidekiq::Queue) + .to receive(:new) + .with(described_class::QUEUE_NAME) + .and_return(matching_jobs, matching_jobs, no_matching_jobs) + + described_class.wait_until_processed + + expect(described_class).to have_received(:sleep).with(1).exactly(2).times + end + + it "waits if there are active jobs" do + allow(described_class).to receive(:active_jobs?).and_return(true, true, false) + + described_class.wait_until_processed + + expect(described_class).to have_received(:sleep).with(1).exactly(2).times + end + + it "raises an error if it waits longer than the max_timeout" do + allow(described_class).to receive(:active_jobs?).and_return(true) + allow(described_class).to receive(:sleep).and_call_original + + expect { described_class.wait_until_processed(max_timeout: 0.01) } + .to raise_error(Timeout::Error) + end + end + + describe ".active_jobs?" do + let(:key) { SecureRandom.base64(16) } + let(:thread_id) { rand(1000..2000) } + + it "returns false if there are no Sidekiq jobs running" do + allow(Sidekiq::WorkSet).to receive(:new).and_return([]) + + expect(described_class.active_jobs?).to be(false) + end + + it "returns false if Sidekiq jobs aren't for this job class or queue" do + different_class_work = Sidekiq::Work.new( + key, + thread_id, + { "queue" => described_class::QUEUE_NAME, "payload" => { "class" => "SomeOtherJob" } }, + ) + different_queue_work = Sidekiq::Work.new( + key, + thread_id, + { "queue" => "different-queue", "payload" => { "class" => described_class.name } }, + ) + jobs = [[key, thread_id, different_class_work], [key, thread_id, different_queue_work]] + allow(Sidekiq::WorkSet).to receive(:new).and_return(jobs) + + expect(described_class.active_jobs?).to be(false) + end + + it "returns true if there are Sidekiq jobs running on this queue for this class" do + work = Sidekiq::Work.new( + key, + thread_id, + { "queue" => described_class::QUEUE_NAME, "payload" => { "class" => described_class.name } }, + ) + jobs = [[key, thread_id, work]] + allow(Sidekiq::WorkSet).to receive(:new).and_return(jobs) + + expect(described_class.active_jobs?).to be(true) + end + end +end