Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically upsert cron workloads #2

Merged
merged 1 commit into from
Jun 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/gouda.rb
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ class ConcurrencyExceededError < StandardError
end

def self.start
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!

queue_constraint = if ENV["GOUDA_QUEUES"]
Gouda.parse_queue_constraint(ENV["GOUDA_QUEUES"])
5 changes: 3 additions & 2 deletions lib/gouda/railtie.rb
Original file line number Diff line number Diff line change
@@ -34,8 +34,6 @@ class Railtie < Rails::Railtie
# The `to_prepare` block which is executed once in production
# and before each request in development.
config.to_prepare do
Gouda::Scheduler.update_schedule_from_config!

if defined?(Rails) && Rails.respond_to?(:application)
config_from_rails = Rails.application.config.try(:gouda)
if config_from_rails
@@ -52,6 +50,9 @@ class Railtie < Rails::Railtie
Gouda.config.polling_sleep_interval_seconds = 0.2
Gouda.config.logger.level = Gouda.config.log_level
end

Gouda::Scheduler.build_scheduler_entries_list!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end
end
end
48 changes: 46 additions & 2 deletions lib/gouda/scheduler.rb
Original file line number Diff line number Diff line change
@@ -53,7 +53,33 @@ def kwargs_value
end
end

def self.update_schedule_from_config!(cron_table_hash = nil)
# Takes in a Hash formatted with cron entries in the format similar
# to good_job, and builds a table of scheduler entries. A scheduler
# entry references a particular job class name, the set of arguments to
# be passed to the job when performing it, and either the interval
# to repeat the job after or a cron pattern. This method does not
# insert the actual Workloads into the database but just builds the
# table of the entries. That table gets consulted when workloads finish
# to determine whether the workload that just ran was scheduled or ad-hoc,
# and whether the subsequent workload has to be enqueued.
#
# If no table is given the method will attempt to read the table from
# Rails application config from `[:gouda][:cron]`.
#
# The table is a Hash of entries, and the keys are the names of the workload
# to be enqueued - those keys are also used to ensure scheduled workloads
# only get scheduled once.
#
# @param cron_table_hash[Hash] a hash of the following shape:
# {
# download_invoices_every_minute: {
# cron: "* * * * *",
# class: "DownloadInvoicesJob",
# args: ["immediate"]
# }
# }
# @return Array[Entry]
def self.build_scheduler_entries_list!(cron_table_hash = nil)
Gouda.logger.info "Updating scheduled workload entries..."
if cron_table_hash.blank?
config_from_rails = Rails.application.config.try(:gouda)
@@ -76,6 +102,12 @@ def self.update_schedule_from_config!(cron_table_hash = nil)
end
end

# Once a workload has finished (doesn't matter whether it raised an exception
# or completed successfully), it is going to be passed to this method to enqueue
# the next scheduled workload
#
# @param finished_workload[Gouda::Workload]
# @return void
def self.enqueue_next_scheduled_workload_for(finished_workload)
return unless finished_workload.scheduler_key

@@ -86,11 +118,23 @@ def self.enqueue_next_scheduled_workload_for(finished_workload)
Gouda.enqueue_jobs_via_their_adapters([timer_entry.build_active_job])
end

# Returns the list of entries of the scheduler which are currently known. Normally the
# scheduler will hold the list of entries loaded from the Rails config.
#
# @return Array[Entry]
def self.entries
@cron_table || []
end

def self.update_scheduled_workloads!
# Will upsert (`INSERT ... ON CONFLICT UPDATE`) workloads for all entries which are in the scheduler entries
# table (the table needs to be read or hydrated first using `build_scheduler_entries_list!`). This is done
# in a transaction. Any workloads which have been previously inserted from the scheduled entries, but no
# longer have a corresponding scheduler entry, will be deleted from the database. If there already are workloads
# with the corresponding scheduler key they will not be touched and will be performed with their previously-defined
# arguments.
#
# @return void
def self.upsert_workloads_from_entries_list!
table_entries = @cron_table || []

# Remove any cron keyed workloads which no longer match config-wise
26 changes: 13 additions & 13 deletions test/gouda/scheduler_test.rb
Original file line number Diff line number Diff line change
@@ -44,8 +44,8 @@ def perform
}

assert_nothing_raised do
Gouda::Scheduler.update_schedule_from_config!(tab)
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.build_scheduler_entries_list!(tab)
Gouda::Scheduler.upsert_workloads_from_entries_list!
end

assert_equal 1, Gouda::Workload.enqueued.count
@@ -67,12 +67,12 @@ def perform
}

assert_nothing_raised do
Gouda::Scheduler.update_schedule_from_config!(tab)
Gouda::Scheduler.build_scheduler_entries_list!(tab)
end

assert_changes_by(-> { Gouda::Workload.count }, exactly: 1) do
3.times do
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end
end

@@ -85,7 +85,7 @@ def perform

assert_changes_by(-> { Gouda::Workload.count }, exactly: 1) do
3.times do
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end
end

@@ -111,11 +111,11 @@ def perform
}

assert_nothing_raised do
Gouda::Scheduler.update_schedule_from_config!(tab)
Gouda::Scheduler.build_scheduler_entries_list!(tab)
end

assert_changes_by(-> { Gouda::Workload.count }, exactly: 1) do
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end

assert_equal [nil, nil], Gouda::Workload.first.serialized_params["arguments"]
@@ -149,13 +149,13 @@ def perform
}
}
assert_nothing_raised do
Gouda::Scheduler.update_schedule_from_config!(tab)
Gouda::Scheduler.build_scheduler_entries_list!(tab)
end

travel_to Time.utc(2023, 6, 23, 20, 0)
assert_changes_by(-> { Gouda::Workload.count }, exactly: 4) do
3.times do
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end
end

@@ -165,15 +165,15 @@ def perform
kwargs: {mandatory: "good"}
}

Gouda::Scheduler.update_schedule_from_config!(tab)
Gouda::Scheduler.build_scheduler_entries_list!(tab)
assert_changes_by(-> { Gouda::Workload.count }, exactly: 1) do
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end

assert tab.delete(:fifth)
Gouda::Scheduler.update_schedule_from_config!(tab)
Gouda::Scheduler.build_scheduler_entries_list!(tab)
assert_changes_by(-> { Gouda::Workload.count }, exactly: -1) do
Gouda::Scheduler.update_scheduled_workloads!
Gouda::Scheduler.upsert_workloads_from_entries_list!
end

Gouda::Workload.all.each(&:perform_and_update_state!)