Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

features to pause batch execution #1323

Draft
wants to merge 21 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ Batches track a set of jobs, and enqueue an optional callback job when all of th
- [`GoodJob::Batch`](app/models/good_job/batch.rb) has a number of assignable attributes and methods:

```ruby
batch = GoodJob::Batch.new
batch = GoodJob::Batch.new # or .new(paused: true) to pause all jobs added to the batch
batch.description = "My batch"
batch.on_finish = "MyBatchCallbackJob" # Callback job when all jobs have finished
batch.on_success = "MyBatchCallbackJob" # Callback job when/if all jobs have succeeded
Expand All @@ -734,12 +734,14 @@ batch.add do
MyJob.perform_later
end
batch.enqueue
batch.unpause # Unpauses all jobs within the batch, allowing them to be executed

batch.discarded? # => Boolean
batch.discarded_at # => <DateTime>
batch.finished? # => Boolean
batch.finished_at # => <DateTime>
batch.succeeded? # => Boolean
batch.paused? # => Boolean // TODO: expand on what this method does
batch.active_jobs # => Array of ActiveJob::Base-inherited jobs that are part of the batch

batch = GoodJob::Batch.find(batch.id)
Expand Down Expand Up @@ -831,6 +833,10 @@ end
GoodJob::Batch.enqueue(on_finish: BatchJob)
```

#### Pausing batches

// TODO: document how to pause/unpause a batch (potentially create as an entirely separate section about pausing things?)

#### Other batch details

- Whether to enqueue a callback job is evaluated once the batch is in an `enqueued?`-state by using `GoodJob::Batch.enqueue` or `batch.enqueue`.
Expand Down
40 changes: 39 additions & 1 deletion app/models/good_job/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def enqueue(active_jobs = [], **properties, &block)
def add(active_jobs = nil, &block)
record.save if record.new_record?

buffer = Bulk::Buffer.new
buffer = Bulk::Buffer.new(pause_jobs: properties[:paused] || false)
buffer.add(active_jobs)
buffer.capture(&block) if block

Expand All @@ -130,6 +130,44 @@ def add(active_jobs = nil, &block)
buffer.active_jobs
end

# TODO: document
def paused?
# TODO: consider querying to see if any jobs within the batch are paused, and if/how that should be represented if that result does not match properties[:paused]
# I think there are probably going to need to be separate methods for "is the batch set to pause all jobs within it" and "does the batch contain any paused jobs" as those cases aren't always lined up
properties[:paused] || false
Copy link
Owner

@bensheldon bensheldon Apr 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really want to support (at least right now, initially) having a special property of a batch ("paused") have a side effect or meaning to the library; I consider them arbitrary application data.

I realize it makes it a little less optimal to say: "To create a paused batch, enqueue any job that is paused" but I think that's a fine workaround for now. I sort of see the order here being:

  1. Design and implement the functionality to allow jobs to be paused/unpaused
    • I think you have all the changes for allowing scheduled_at to be nil
    • I think we need to handle the unpausing interface, especially in the dashboard
  2. Handle the situation in which a batch has paused jobs in it

And honestly, other than that, this looks great. I can help polish stuff up if whenever you're ok with where things are at.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you be open to storing a batch's paused state in a separate column/hash/etc to avoid the interference, or would you prefer to remove that entirely?

re: unpausing interface

# public API
j = OtherJob.set(good_job_paused: true).perform_later
j.good_job_unpause # calls unpause(self) on the current adapter

# public API (maybe?) I put this on the adapter by default, not sure where else to put it
GoodJob::Adapter#unpause(jobs_or_ids) # The 'real' method for unpausing arbitrary jobs, loads GoodJob::Job records, calls unpause on individual records

# internal API, basically just what Batch#unpause is doing
r = GoodJob::Job.find(my_job_id)
r.paused?
r.unpause # probably need to provide some option to disable the notifier within the method so the caller can batch notifications

What functionality did you have in mind for the dashboard? I could see a use for unpausing jobs by hand if we also had the capability to pause jobs by hand, but if they're only paused programmatically I don't see as much benefit.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you be open to storing a batch's paused state in a separate column/hash/etc to avoid the interference, or would you prefer to remove that entirely?

I'd like to defer that. So not one way or another in perpetuity, but for right now it looks like we have a path to allow for a pausing feature without changing any existing interfaces or database columns. This is mostly about my personal ability to handle complexity, not the technical stuff.

What functionality did you have in mind for the dashboard? I could see a use for unpausing jobs by hand if we also had the capability to pause jobs by hand, but if they're only paused programmatically I don't see as much benefit.

We'd need to show that the jobs are in a paused state on the dashboard, to answer the question of "why isn't this job running?" And it would be nice to also add options (simple ones) to pause and unpause through the UI.

On the programmitic side, I think the pause/unpause methods should be class methods on the job. e.g. OtherJob.unpause(jobs_or_ids)

end

# TODO
# def pause; end

# TODO: document
def unpause
# TODO: consider raising an exception if the batch isn't paused in the first place

# TODO: consider setting this at the end of the method, or doing something similar to help handle situations where an exception is raised during unpausing
assign_properties(paused: false)

# TODO: this could be implemented with COALESCE and `jsonb_path_query(serialized_params, '$.scheduled_at.datetime()')` to extract the previously scheduled time within a single UPDATE, but that method is not available in PG12 (still supported at the time of writing)
unpaused_count = 0

loop do
jobs = record.jobs.where(scheduled_at: nil).limit(1_000)
break if jobs.empty?

jobs.each do |job|
job.update!(scheduled_at: job.serialized_params['scheduled_at'] || Time.current)
end

jobs.collect(&:queue_name).tally.each do |q, c|
GoodJob::Notifier.notify({ queue_name: q, count: c })
end

unpaused_count += jobs.size
end

unpaused_count
end

def active_jobs
record.jobs.map(&:active_job)
end
Expand Down
6 changes: 4 additions & 2 deletions app/models/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ def self.queue_parser(string)
scope :unfinished, -> { where(finished_at: nil) }

# Get executions that are not scheduled for a later time than now (i.e. jobs that
# are not scheduled or scheduled for earlier than the current time).
# are scheduled for earlier than the current time). Paused jobs are excluded.
# @!method only_scheduled
# @!scope class
# @return [ActiveRecord::Relation]
scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))).or(where(scheduled_at: nil)) }
scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))) }

# Order executions by priority (highest priority first).
# @!method priority_ordered
Expand Down Expand Up @@ -331,6 +331,7 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false
else
execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
execution.make_discrete if discrete_support?
execution.scheduled_at ||= Time.current # set after make_discrete so it can manage assigning both created_at and scheduled_at simultaneously
end

if create_with_advisory_lock
Expand All @@ -342,6 +343,7 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false
end

instrument_payload[:execution] = execution
execution.scheduled_at = nil if active_job.respond_to?(:good_job_paused) && active_job.good_job_paused
execution.save!

if retried
Expand Down
1 change: 1 addition & 0 deletions demo/app/jobs/other_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ class OtherJob < ApplicationJob
JobError = Class.new(StandardError)

def perform(*)
# raise 'nope'
end
end
1 change: 1 addition & 0 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
require_relative "good_job/active_job_extensions/interrupt_errors"
require_relative "good_job/active_job_extensions/labels"
require_relative "good_job/active_job_extensions/notify_options"
require_relative "good_job/active_job_extensions/pauseable"

require_relative "good_job/overridable_connection"
require_relative "good_job/bulk"
Expand Down
52 changes: 52 additions & 0 deletions lib/good_job/active_job_extensions/pauseable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

module GoodJob
module ActiveJobExtensions
# Allows configuring whether the job should start 'paused' when a job is enqueued.
# Configuration will apply either globally to the Job Class, or individually to jobs
# on initial enqueue and subsequent retries.
#
# @example
# class MyJob < ApplicationJob
# self.good_job_paused = true
# end
#
# # Or, configure jobs individually to not notify:
# MyJob.set(good_job_paused: true).perform_later
#
# See also - GoodJob:Batch#new's `paused` option

module Pauseable
extend ActiveSupport::Concern

module Prepends
def enqueue(options = {})
self.good_job_paused = options[:good_job_paused] if options.key?(:good_job_paused)
super
end

# good_job_paused is intentionally excluded from the serialized params so we fully rely on the scheduled_at value once the job is enqueued
# TODO: remove before merge
# def serialize
# super.tap do |job_data|
# # job_data["good_job_paused"] = good_job_paused unless good_job_paused.nil?
# end
# end

# def deserialize(job_data)
# super
# self.good_job_paused = job_data["good_job_paused"]
# end
end

included do
prepend Prepends
class_attribute :good_job_paused, instance_accessor: false, instance_predicate: false, default: nil
attr_accessor :good_job_paused
end
end
end
end

# Jobs can be paused through batches which rely on good_job_paused being available, so this must be included globally
ActiveSupport.on_load(:active_job) { include GoodJob::ActiveJobExtensions::Pauseable }
9 changes: 5 additions & 4 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def enqueue_all(active_jobs)
if GoodJob::Execution.discrete_support?
execution.make_discrete
execution.scheduled_at = current_time if execution.scheduled_at == execution.created_at
execution.scheduled_at = nil if active_job.respond_to?(:good_job_paused) && active_job.good_job_paused
end

execution.created_at = current_time
Expand Down Expand Up @@ -92,7 +93,7 @@ def enqueue_all(active_jobs)
executions = executions.select(&:persisted?) # prune unpersisted executions

if execute_inline?
inline_executions = executions.select { |execution| (execution.scheduled_at.nil? || execution.scheduled_at <= Time.current) }
inline_executions = executions.select { |execution| execution.scheduled_at.present? && execution.scheduled_at <= Time.current }
inline_executions.each(&:advisory_lock!)
end
end
Expand Down Expand Up @@ -146,14 +147,14 @@ def enqueue_all(active_jobs)
# @param timestamp [Integer, nil] the epoch time to perform the job
# @return [GoodJob::Execution]
def enqueue_at(active_job, timestamp)
scheduled_at = timestamp ? Time.zone.at(timestamp) : nil
scheduled_at = timestamp ? Time.zone.at(timestamp) : Time.current

# If there is a currently open Bulk in the current thread, direct the
# job there to be enqueued using enqueue_all
return if GoodJob::Bulk.capture(active_job, queue_adapter: self)

Rails.application.executor.wrap do
will_execute_inline = execute_inline? && (scheduled_at.nil? || scheduled_at <= Time.current)
will_execute_inline = execute_inline? && (scheduled_at.present? && scheduled_at <= Time.current)
will_retry_inline = will_execute_inline && CurrentThread.execution&.active_job_id == active_job.job_id && !CurrentThread.retry_now

if will_retry_inline
Expand All @@ -171,7 +172,7 @@ def enqueue_at(active_job, timestamp)
result = execution.perform

retried_execution = result.retried
while retried_execution && (retried_execution.scheduled_at.nil? || retried_execution.scheduled_at <= Time.current)
while retried_execution && (retried_execution.scheduled_at.present? && retried_execution.scheduled_at <= Time.current)
execution = retried_execution
result = execution.perform
retried_execution = result.retried
Expand Down
6 changes: 5 additions & 1 deletion lib/good_job/bulk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ def self.unbuffer
end

class Buffer
def initialize
def initialize(pause_jobs: false)
@values = []
@pause_jobs = pause_jobs
end

def capture
Expand All @@ -79,6 +80,9 @@ def add(active_jobs, queue_adapter: nil)
adapter = queue_adapter || active_job.class.queue_adapter
raise Error, "Jobs must have a Queue Adapter" unless adapter

# TODO: should explicitly setting `good_job_paused = false` on a job override this?
active_job.good_job_paused = true if @pause_jobs && active_job.respond_to?(:good_job_paused)

[active_job, adapter]
end
@values.append(*new_pairs)
Expand Down
2 changes: 2 additions & 0 deletions spec/app/filters/good_job/jobs_filter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external)
ExampleJob.set(queue: 'cron').perform_later

GoodJob::Job.order(created_at: :asc).last.update!(cron_key: "frequent_cron")

ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline)
Expand All @@ -20,6 +21,7 @@

Timecop.travel 1.hour.ago
ExampleJob.set(queue: 'elephants').perform_later(ExampleJob::DEAD_TYPE)

5.times do
Timecop.travel 5.minutes
GoodJob.perform_inline
Expand Down
14 changes: 8 additions & 6 deletions spec/app/models/good_job/execution_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ def perform(result_value = nil, raise_error: false)
context 'when NOT discrete' do
before { allow(described_class).to receive(:discrete_support?).and_return(false) }

it 'does not assign id, scheduled_at' do
it 'does not assign id, does assign scheduled_at' do
expect { described_class.enqueue(active_job) }.to change(described_class, :count).by(1)

execution = described_class.last
expect(execution.id).not_to eq(active_job.job_id)
expect(execution).to have_attributes(
is_discrete: nil,
active_job_id: active_job.job_id,
scheduled_at: nil
scheduled_at: within(1).of(Time.current)
)
end
end
Expand All @@ -88,7 +88,7 @@ def perform(result_value = nil, raise_error: false)
serialized_params: a_kind_of(Hash),
queue_name: 'test',
priority: 50,
scheduled_at: nil
scheduled_at: within(1).of(Time.current)
)
end

Expand Down Expand Up @@ -182,9 +182,10 @@ def perform(result_value = nil, raise_error: false)

context 'with multiple jobs' do
def job_params
{ active_job_id: SecureRandom.uuid, queue_name: "default", priority: 0, serialized_params: { job_class: "TestJob" } }
{ active_job_id: SecureRandom.uuid, queue_name: "default", priority: 0, serialized_params: { job_class: "TestJob" }, scheduled_at: sched }
end

let!(:sched) { Time.current }
let!(:older_job) { described_class.create!(job_params.merge(created_at: 10.minutes.ago)) }
let!(:newer_job) { described_class.create!(job_params.merge(created_at: 5.minutes.ago)) }
let!(:low_priority_job) { described_class.create!(job_params.merge(priority: 20)) }
Expand All @@ -205,9 +206,10 @@ def job_params

context "with multiple jobs and ordered queues" do
def job_params
{ active_job_id: SecureRandom.uuid, queue_name: "default", priority: 0, serialized_params: { job_class: "TestJob" } }
{ active_job_id: SecureRandom.uuid, queue_name: "default", priority: 0, serialized_params: { job_class: "TestJob" }, scheduled_at: sched }
end

let!(:sched) { Time.current }
let(:parsed_queues) { { include: %w{one two}, ordered_queues: true } }
let!(:queue_two_job) { described_class.create!(job_params.merge(queue_name: "two", created_at: 10.minutes.ago, priority: 100)) }
let!(:queue_one_job) { described_class.create!(job_params.merge(queue_name: "one", created_at: 1.minute.ago, priority: 1)) }
Expand Down Expand Up @@ -698,7 +700,7 @@ def job_params
job_class: good_job.job_class,
queue_name: good_job.queue_name,
created_at: within(0.001).of(good_job.performed_at),
scheduled_at: within(0.001).of(good_job.created_at),
scheduled_at: within(0.1).of(good_job.created_at),
finished_at: within(1.second).of(Time.current),
error: nil,
serialized_params: good_job.serialized_params
Expand Down
2 changes: 1 addition & 1 deletion spec/lib/good_job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@

expect(GoodJob::Execution).to have_received(:enqueue).with(
active_job,
scheduled_at: nil
scheduled_at: be_within(1).of(Time.current)
)
end

Expand Down
2 changes: 2 additions & 0 deletions todo.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
* Indicate paused status in the jobs dashboard