Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fold all conditions into the pending workload query #18

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 45 additions & 50 deletions lib/gouda/workload.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,53 @@ class Gouda::Workload < ActiveRecord::Base
ZOMBIE_MAX_THRESHOLD = "5 minutes"

self.table_name = "gouda_workloads"

# GoodJob calls these "enqueued" but they are more like
# "waiting to start" - jobs which have been scheduled past now,
# or haven't been scheduled to a particular time, are in the "enqueued"
# state and match the queue constraint
# state and match the queue constraint. Jobs which have fuses set are
# excluded from this scope, as well as jobs which have been scheduled
# but are no longer present in the schedule.
# The scope can be used to call `count()` to obtain the queue depth, which can
# be employed as a driving metric for autoscaling.
scope :waiting_to_start, ->(queue_constraint: Gouda::AnyQueue) {
# Check whether this workload was enqueued with a scheduler key, but no longer is in the cron table.
# If that is the case (we are trying to execute a workload which has a scheduler key, but the scheduler
# does not know about that key) it means that the workload has been removed from the cron table and must not run.
# Moreover: running it can be dangerous because it was likely removed from the table for a reason.
# Should that be the case, mark the job "finished" and return `nil` to get to the next poll. If the deployed worker still has
# the workload in its scheduler table, but a new deploy removed it - this is a race condition, but we are willing to accept it.
# Note that we are already "just not enqueueing" that job when the cron table gets loaded - this already happens.
#
# Removing jobs from the queue forcibly when we load the cron table is nice, but not enough, because our system can be in a state
# of partial deployment:
#
# [ release 1 does have some_job_hourly crontab entry ]
# [ release 2 no longer does ]
# ^ --- race conditions possible here --^
#
# So even if we remove the crontabled workloads during app boot, it does not give us a guarantee
# that release 1 won't reinsert them.
# This is why this safeguard is needed.
known_scheduler_keys = Gouda::Scheduler.known_scheduler_keys.to_a
known_scheduler_keys_condition = known_scheduler_keys.map { |key| connection.quote(key) }.join(", ")
condition_for_ready_to_execute_jobs = <<~SQL
#{queue_constraint.to_sql}
AND execution_concurrency_key NOT IN (
SELECT execution_concurrency_key FROM #{quoted_table_name} WHERE state = 'executing' AND execution_concurrency_key IS NOT NULL
AND NOT EXISTS (
SELECT NULL
FROM gouda_job_fuses
WHERE gouda_job_fuses.active_job_class_name = #{quoted_table_name}.active_job_class_name
)
AND NOT EXISTS (
SELECT NULL
FROM #{quoted_table_name} AS currently_executing_workloads
WHERE currently_executing_workloads.state = 'executing'
AND currently_executing_workloads.execution_concurrency_key = #{quoted_table_name}.execution_concurrency_key
)
AND (scheduler_key IS NULL OR scheduler_key IN (#{known_scheduler_keys_condition}))
AND state = 'enqueued'
AND (scheduled_at <= clock_timestamp())
SQL

where(Arel.sql(condition_for_ready_to_execute_jobs))
}

Expand Down Expand Up @@ -89,63 +122,25 @@ def self.reap_zombie_workloads

# Lock the next workload and mark it as executing
def self.checkout_and_lock_one(executing_on:, queue_constraint: Gouda::AnyQueue)
where_query = <<~SQL
#{queue_constraint.to_sql}
AND workloads.state = 'enqueued'
AND NOT EXISTS (
SELECT NULL
FROM #{quoted_table_name} AS concurrent
WHERE concurrent.state = 'executing'
AND concurrent.execution_concurrency_key = workloads.execution_concurrency_key
)
AND workloads.scheduled_at <= clock_timestamp()
SQL
# Enter a txn just to mark this job as being executed "by us". This allows us to avoid any
# locks during execution itself, including advisory locks
workloads = Gouda::Workload
.select("workloads.*")
.from("#{quoted_table_name} AS workloads")
.where(where_query)
.order("workloads.priority ASC NULLS LAST")
workloads_rel = Gouda::Workload
.waiting_to_start
.order("#{quoted_table_name}.priority ASC NULLS LAST")
.lock("FOR UPDATE SKIP LOCKED")
.limit(1)

_first_available_workload = ActiveSupport::Notifications.instrument(:checkout_and_lock_one, {queue_constraint: queue_constraint.to_sql}) do |payload|
payload[:condition_sql] = workloads.to_sql
payload[:condition_sql] = workloads_rel.to_sql
payload[:retried_checkouts_due_to_concurrent_exec] = 0
uncached do # Necessary because we SELECT with a clock_timestamp() which otherwise gets cached by ActiveRecord query cache
transaction do
workload = Gouda.suppressing_sql_logs { workloads.first } # Silence SQL output as this gets called very frequently
workload = Gouda.suppressing_sql_logs { workloads_rel.first } # Silence SQL output as this gets called very frequently
return nil unless workload

if workload.scheduler_key && !Gouda::Scheduler.known_scheduler_keys.include?(workload.scheduler_key)
# Check whether this workload was enqueued with a scheduler key, but no longer is in the cron table.
# If that is the case (we are trying to execute a workload which has a scheduler key, but the scheduler
# does not know about that key) it means that the workload has been removed from the cron table and must not run.
# Moreover: running it can be dangerous because it was likely removed from the table for a reason.
# Should that be the case, mark the job "finished" and return `nil` to get to the next poll. If the deployed worker still has
# the workload in its scheduler table, but a new deploy removed it - this is a race condition, but we are willing to accept it.
# Note that we are already "just not enqueueing" that job when the cron table gets loaded - this already happens.
#
# Removing jobs from the queue forcibly when we load the cron table is nice, but not enough, because our system can be in a state
# of partial deployment:
#
# [ release 1 does have some_job_hourly crontab entry ]
# [ release 2 no longer does ]
# ^ --- race conditions possible here --^
#
# So even if we remove the crontabled workloads during app boot, it does not give us a guarantee that release 1 won't reinsert them.
# This is why this safeguard is needed.
error = {class_name: "WorkloadSkippedError", message: "Skipped as scheduler_key was no longer in the cron table"}
workload.update!(state: "finished", error:)
# And return nil. This will cause a brief "sleep" in the polling routine since the caller may think there are no more workloads
# in the queue, but only for a brief moment.
nil
else
# Once we have verified this job is OK to execute
workload.update!(state: "executing", executing_on: executing_on, last_execution_heartbeat_at: Time.now.utc, execution_started_at: Time.now.utc)
workload
end
# Once we have verified this job is OK to execute
workload.update!(state: "executing", executing_on: executing_on, last_execution_heartbeat_at: Time.now.utc, execution_started_at: Time.now.utc)
workload
rescue ActiveRecord::RecordNotUnique
# It can happen that due to a race the `execution_concurrency_key NOT IN` does not capture
# a job which _just_ entered the "executing" state, apparently after we do our SELECT. This will happen regardless
Expand Down
Loading