Skip to content

Commit

Permalink
Automatically upsert cron workloads
Browse files Browse the repository at this point in the history
and document (and rename for clarity) the methods which work with the list of scheduler entries. It was previously not really clear that one method would not do anything to the database and another method would. It is better to be descriptive in that regard
  • Loading branch information
julik committed Jun 11, 2024
1 parent fff867f commit 1860fce
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 18 deletions.
2 changes: 1 addition & 1 deletion lib/gouda.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ConcurrencyExceededError < StandardError
end

def self.start
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!

queue_constraint = if ENV["GOUDA_QUEUES"]
Gouda.parse_queue_constraint(ENV["GOUDA_QUEUES"])
Expand Down
5 changes: 3 additions & 2 deletions lib/gouda/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ class Railtie < Rails::Railtie
# The `to_prepare` block which is executed once in production
# and before each request in development.
config.to_prepare do
Gouda::Scheduler.update_schedule_from_config!

if defined?(Rails) && Rails.respond_to?(:application)
config_from_rails = Rails.application.config.try(:gouda)
if config_from_rails
Expand All @@ -52,6 +50,9 @@ class Railtie < Rails::Railtie
Gouda.config.polling_sleep_interval_seconds = 0.2
Gouda.config.logger.level = Gouda.config.log_level
end

Gouda::Scheduler.build_scheduler_entries_list!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end
end
end
48 changes: 46 additions & 2 deletions lib/gouda/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,33 @@ def kwargs_value
end
end

def self.update_schedule_from_config!(cron_table_hash = nil)
# Takes in a Hash formatted with cron entries in the format similar
# to good_job, and builds a table of scheduler entries. A scheduler
# entry references a particular job class name, the set of arguments to
# be passed to the job when performing it, and either the interval
# to repeat the job after or a cron pattern. This method does not
# insert the actual Workloads into the database but just builds the
# table of the entries. That table gets consulted when workloads finish
# to determine whether the workload that just ran was scheduled or ad-hoc,
# and whether the subsequent workload has to be enqueued.
#
# If no table is given the method will attempt to read the table from
# Rails application config from `[:gouda][:cron]`.
#
# The table is a Hash of entries, and the keys are the names of the workload
# to be enqueued - those keys are also used to ensure scheduled workloads
# only get scheduled once.
#
# @param cron_table_hash[Hash] a hash of the following shape:
# {
# download_invoices_every_minute: {
# cron: "* * * * *",
# class: "DownloadInvoicesJob",
# args: ["immediate"]
# }
# }
# @return Array[Entry]
def self.build_scheduler_entries_list!(cron_table_hash = nil)
Gouda.logger.info "Updating scheduled workload entries..."
if cron_table_hash.blank?
config_from_rails = Rails.application.config.try(:gouda)
Expand All @@ -76,6 +102,12 @@ def self.update_schedule_from_config!(cron_table_hash = nil)
end
end

# Once a workload has finished (doesn't matter whether it raised an exception
# or completed successfully), it is going to be passed to this method to enqueue
# the next scheduled workload
#
# @param finished_workload[Gouda::Workload]
# @return void
def self.enqueue_next_scheduled_workload_for(finished_workload)
return unless finished_workload.scheduler_key

Expand All @@ -86,11 +118,23 @@ def self.enqueue_next_scheduled_workload_for(finished_workload)
Gouda.enqueue_jobs_via_their_adapters([timer_entry.build_active_job])
end

# Returns the list of entries of the scheduler which are currently known. Normally the
# scheduler will hold the list of entries loaded from the Rails config.
#
# @return Array[Entry]
def self.entries
@cron_table || []
end

def self.update_scheduled_workloads!
# Will upsert (`INSERT ... ON CONFLICT UPDATE`) workloads for all entries which are in the scheduler entries
# table (the table needs to be read or hydrated first using `build_scheduler_entries_list!`). This is done
# in a transaction. Any workloads which have been previously inserted from the scheduled entries, but no
# longer have a corresponding scheduler entry, will be deleted from the database. If there already are workloads
# with the corresponding scheduler key they will not be touched and will be performed with their previously-defined
# arguments.
#
# @return void
def self.upsert_workloads_from_entries_list!
table_entries = @cron_table || []

# Remove any cron keyed workloads which no longer match config-wise
Expand Down
26 changes: 13 additions & 13 deletions test/gouda/scheduler_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ def perform
}

assert_nothing_raised do
Gouda::Scheduler.update_schedule_from_config!(tab)
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.build_scheduler_entries_list!(tab)
Gouda::Scheduler.upsert_workloads_from_entries_list!
end

assert_equal 1, Gouda::Workload.enqueued.count
Expand All @@ -67,12 +67,12 @@ def perform
}

assert_nothing_raised do
Gouda::Scheduler.update_schedule_from_config!(tab)
Gouda::Scheduler.build_scheduler_entries_list!(tab)
end

assert_changes_by(-> { Gouda::Workload.count }, exactly: 1) do
3.times do
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end
end

Expand All @@ -85,7 +85,7 @@ def perform

assert_changes_by(-> { Gouda::Workload.count }, exactly: 1) do
3.times do
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end
end

Expand All @@ -111,11 +111,11 @@ def perform
}

assert_nothing_raised do
Gouda::Scheduler.update_schedule_from_config!(tab)
Gouda::Scheduler.build_scheduler_entries_list!(tab)
end

assert_changes_by(-> { Gouda::Workload.count }, exactly: 1) do
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end

assert_equal [nil, nil], Gouda::Workload.first.serialized_params["arguments"]
Expand Down Expand Up @@ -149,13 +149,13 @@ def perform
}
}
assert_nothing_raised do
Gouda::Scheduler.update_schedule_from_config!(tab)
Gouda::Scheduler.build_scheduler_entries_list!(tab)
end

travel_to Time.utc(2023, 6, 23, 20, 0)
assert_changes_by(-> { Gouda::Workload.count }, exactly: 4) do
3.times do
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end
end

Expand All @@ -165,15 +165,15 @@ def perform
kwargs: {mandatory: "good"}
}

Gouda::Scheduler.update_schedule_from_config!(tab)
Gouda::Scheduler.build_scheduler_entries_list!(tab)
assert_changes_by(-> { Gouda::Workload.count }, exactly: 1) do
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end

assert tab.delete(:fifth)
Gouda::Scheduler.update_schedule_from_config!(tab)
Gouda::Scheduler.build_scheduler_entries_list!(tab)
assert_changes_by(-> { Gouda::Workload.count }, exactly: -1) do
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end

Gouda::Workload.all.each(&:perform_and_update_state!)
Expand Down

0 comments on commit 1860fce

Please sign in to comment.