Skip to content

Commit

Permalink
Merge pull request #3055 from alphagov/flaky-test
Browse files Browse the repository at this point in the history
Fix broken GovukIndex::PageTrafficJob and flaky tests
  • Loading branch information
kevindew authored Nov 20, 2024
2 parents 073869a + 34304d2 commit 3520d7b
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 22 deletions.
18 changes: 0 additions & 18 deletions lib/base_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,6 @@ def self.notify_of_failures
end
end

# Wait for all tasks for the given queue/job class combination to be
# completed before continuing
def self.wait_until_processed(max_timeout: 2 * 60 * 60)
Timeout.timeout(max_timeout) do
# wait for all queued tasks to be started
sleep 1 while Sidekiq::Queue.new(self::QUEUE_NAME).any? { |job| job.display_class == to_s }

# wait for started tasks to be finished
sleep 1 while active_jobs?
end
end

def self.active_jobs?
Sidekiq::Job.jobs.any? do |_, _, work|
work["queue"] == self::QUEUE_NAME && work["payload"]["class"] == to_s
end
end

private

def indexes(index_name)
Expand Down
20 changes: 20 additions & 0 deletions lib/govuk_index/page_traffic_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,26 @@ class PageTrafficJob < BaseJob
QUEUE_NAME = "bulk".freeze
sidekiq_options queue: QUEUE_NAME

# Wait for all tasks for the given queue/job class combination to be
# completed before continuing
def self.wait_until_processed(max_timeout: 2 * 60 * 60)
Timeout.timeout(max_timeout) do
# wait for all queued tasks to be started
sleep 1 while Sidekiq::Queue.new(self::QUEUE_NAME).any? { |job| job.klass == to_s }

# wait for started tasks to be finished
sleep 1 while active_jobs?
end
end

def self.active_jobs?
# This code makes use of Sidekiq::API which is not advised for application
# usage and is hard to test, be very careful changing it.
Sidekiq::WorkSet.new.any? do |_, _, work|
work.queue == self::QUEUE_NAME && work.payload["class"] == to_s
end
end

def self.perform_async(records, destination_index, cluster_key)
data = Base64.encode64(Zlib::Deflate.deflate(Sidekiq.dump_json(records)))
super(data, destination_index, cluster_key)
Expand Down
6 changes: 2 additions & 4 deletions spec/integration/indexer/amend_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@
let(:updates) { { "title" => "New title" } }
let(:cluster_count) { Clusters.count }

before do
Sidekiq::Job.clear_all
end

it "amends documents" do
commit_document(index_name, document)

Expand Down Expand Up @@ -41,6 +37,8 @@
job.perform(index_name, link, updates)
}.to change { described_class.jobs.count }.by(1)
end
# clear the side effects of Sidekiq::Testing.fake!
Sidekiq::Job.clear_all
end

it "forwards to failure queue" do
Expand Down
92 changes: 92 additions & 0 deletions spec/unit/govuk_index/page_traffic_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
require "spec_helper"

RSpec.describe GovukIndex::PageTrafficJob do
describe ".wait_until_processed" do
before do
allow(described_class).to receive(:sleep)
allow(described_class).to receive(:active_jobs?).and_return(false)
allow(Sidekiq::Queue)
.to receive(:new)
.with(described_class::QUEUE_NAME)
.and_return([])
end

it "doesn't wait if there are no queued items and no active jobs" do
described_class.wait_until_processed

expect(described_class).not_to have_received(:sleep)
end

it "waits if there are queued items for this queue and job class" do
job = Sidekiq::JobRecord.new({ "class" => described_class.name }.to_json,
described_class::QUEUE_NAME)
matching_jobs = [job, job]
no_matching_jobs = []

allow(Sidekiq::Queue)
.to receive(:new)
.with(described_class::QUEUE_NAME)
.and_return(matching_jobs, matching_jobs, no_matching_jobs)

described_class.wait_until_processed

expect(described_class).to have_received(:sleep).with(1).exactly(2).times
end

it "waits if there are active jobs" do
allow(described_class).to receive(:active_jobs?).and_return(true, true, false)

described_class.wait_until_processed

expect(described_class).to have_received(:sleep).with(1).exactly(2).times
end

it "raises an error if it waits longer than the max_timeout" do
allow(described_class).to receive(:active_jobs?).and_return(true)
allow(described_class).to receive(:sleep).and_call_original

expect { described_class.wait_until_processed(max_timeout: 0.01) }
.to raise_error(Timeout::Error)
end
end

describe ".active_jobs?" do
let(:key) { SecureRandom.base64(16) }
let(:thread_id) { rand(1000..2000) }

it "returns false if there are no Sidekiq jobs running" do
allow(Sidekiq::WorkSet).to receive(:new).and_return([])

expect(described_class.active_jobs?).to be(false)
end

it "returns false if Sidekiq jobs aren't for this job class or queue" do
different_class_work = Sidekiq::Work.new(
key,
thread_id,
{ "queue" => described_class::QUEUE_NAME, "payload" => { "class" => "SomeOtherJob" } },
)
different_queue_work = Sidekiq::Work.new(
key,
thread_id,
{ "queue" => "different-queue", "payload" => { "class" => described_class.name } },
)
jobs = [[key, thread_id, different_class_work], [key, thread_id, different_queue_work]]
allow(Sidekiq::WorkSet).to receive(:new).and_return(jobs)

expect(described_class.active_jobs?).to be(false)
end

it "returns true if there are Sidekiq jobs running on this queue for this class" do
work = Sidekiq::Work.new(
key,
thread_id,
{ "queue" => described_class::QUEUE_NAME, "payload" => { "class" => described_class.name } },
)
jobs = [[key, thread_id, work]]
allow(Sidekiq::WorkSet).to receive(:new).and_return(jobs)

expect(described_class.active_jobs?).to be(true)
end
end
end

0 comments on commit 3520d7b

Please sign in to comment.