From 06e829eaac6d7125b6d06c776dc298e83784126f Mon Sep 17 00:00:00 2001 From: Julik Tarkhanov Date: Wed, 4 Sep 2024 11:20:30 +0200 Subject: [PATCH] Do not execute removed cron workloads (#17) We need to check whether a workload was enqueued with a scheduler key, but no longer is in the cron table. If that is the case (we are trying to execute a workload which has a scheduler key, but the scheduler does not know about that key) it means that the workload has been removed from the cron table and must not run. Moreover: running it can be dangerous because it was likely removed from the table for a reason. Should that be the case, mark the job "finished" and return `nil` to get to the next poll. If the deployed worker still has the workload in its scheduler table, but a new deploy removed it - this is a race condition, but we are willing to accept it. Note that we are already "just not enqueueing" that job when the cron table gets loaded - but it is not enough. If our system can be in a state of partial deployment: ``` [ release 1 does have some_job_hourly crontab entry ] [ release 2 no longer does ] ^ --- race conditions possible here --^ ``` ...and we remove the crontabled workloads during app boot, it does not give us a guarantee that release 1 won't reenqueue them. For example, via the "reinsert next scheduled" feature when the job is executing. This is why this safeguard is needed. This protects us from a very dangerous failure mode where we would remove an entry from the cron table, deploy the change, and then still have the workload run a day later. Also, mandate Ruby 3.1 as the minimum version and pin the Standardrb syntax to that. --- .github/workflows/ci.yml | 3 +- .standard.yml | 1 + CHANGELOG.md | 62 ++++++++++++++++++++---------------- gouda.gemspec | 4 +-- lib/gouda.rb | 14 ++++---- lib/gouda/adapter.rb | 2 +- lib/gouda/scheduler.rb | 17 ++++++++-- lib/gouda/version.rb | 2 +- lib/gouda/workload.rb | 40 +++++++++++++++++++---- test/gouda/scheduler_test.rb | 39 +++++++++++++++++++++++ test/gouda/test_helper.rb | 17 ++-------- 11 files changed, 137 insertions(+), 64 deletions(-) create mode 100644 .standard.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4949b0a..176aa99 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,8 +14,7 @@ jobs: strategy: matrix: ruby: - - '2.7' - - '3.3' + - '3.1' services: postgres: image: postgres diff --git a/.standard.yml b/.standard.yml new file mode 100644 index 0000000..72b2693 --- /dev/null +++ b/.standard.yml @@ -0,0 +1 @@ +ruby_version: 3.1 diff --git a/CHANGELOG.md b/CHANGELOG.md index c64fcfb..f1897d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,58 +1,64 @@ ## [Unreleased] -## [0.1.0] - 2024-06-10 -- Initial release +## [0.1.13] - 2024-09-03 -## [0.1.1] - 2024-06-10 +- Ensure we won't execute workloads which were scheduled but are no longer present in the cron table entries. -- Fix support for older ruby versions until 2.7 +## [0.1.12] - 2024-07-03 -## [0.1.2] - 2024-06-11 +- When doing polling, suppress DEBUG-level messages. This will stop Gouda spamming the logs with SQL in dev/test environments. -- Updated readme and method renaming in Scheduler +## [0.1.11] - 2024-07-03 -## [0.1.3] - 2024-06-11 +- Fix: make sure the Gouda logger config does not get used during Rails initialization -- Allow the Rails app to boot even if there is no database yet +## [0.1.10] - 2024-07-03 -## [0.1.4] - 2024-06-14 +- Fix: remove logger overrides that Gouda should install, as this causes problems for Rails apps hosting Gouda -- Rescue NoDatabaseError at scheduler update. -- Include tests in gem, for sake of easier debugging. -- Reduce logging in local test runs. -- Bump local ruby version to 3.3.3 +## [0.1.9] - 2024-06-26 -## [0.1.5] - 2024-06-18 +- Fix: cleanup_preserved_jobs_before in Gouda::Workload.prune now points to Gouda.config -- Update documentation -- Don't pass on scheduler keys to retries +## [0.1.8] - 2024-06-21 + +- Move some missed instrumentations to Gouda.instrument + +## [0.1.7] - 2024-06-21 + +- Separate all instrumentation to use ActiveSupport::Notification ## [0.1.6] - 2024-06-18 - Fix: don't upsert workloads twice when starting Gouda. - Add back in Appsignal calls -## [0.1.7] - 2024-06-21 +## [0.1.5] - 2024-06-18 -- Separate all instrumentation to use ActiveSupport::Notification +- Update documentation +- Don't pass on scheduler keys to retries -## [0.1.8] - 2024-06-21 +## [0.1.4] - 2024-06-14 -- Move some missed instrumentations to Gouda.instrument +- Rescue NoDatabaseError at scheduler update. +- Include tests in gem, for sake of easier debugging. +- Reduce logging in local test runs. +- Bump local ruby version to 3.3.3 -## [0.1.9] - 2024-06-26 +## [0.1.3] - 2024-06-11 -- Fix: cleanup_preserved_jobs_before in Gouda::Workload.prune now points to Gouda.config +- Allow the Rails app to boot even if there is no database yet -## [0.1.10] - 2024-07-03 +## [0.1.2] - 2024-06-11 -- Fix: remove logger overrides that Gouda should install, as this causes problems for Rails apps hosting Gouda +- Updated readme and method renaming in Scheduler -## [0.1.11] - 2024-07-03 +## [0.1.1] - 2024-06-10 -- Fix: make sure the Gouda logger config does not get used during Rails initialization +- Fix support for older ruby versions until 2.7 -## [0.1.12] - 2024-07-03 +## [0.1.0] - 2024-06-10 + +- Initial release -- When doing polling, suppress DEBUG-level messages. This will stop Gouda spamming the logs with SQL in dev/test environments. diff --git a/gouda.gemspec b/gouda.gemspec index 6456be3..5559d70 100644 --- a/gouda.gemspec +++ b/gouda.gemspec @@ -9,10 +9,10 @@ Gem::Specification.new do |spec| spec.email = ["sebastian@cheddar.me", "me@julik.nl"] spec.homepage = "https://github.com/cheddar-me/gouda" spec.license = "MIT" - spec.required_ruby_version = Gem::Requirement.new(">= 2.7.0") + spec.required_ruby_version = Gem::Requirement.new(">= 3.1.0") spec.require_paths = ["lib"] - spec.metadata["homepage_uri"] = + spec.metadata["homepage_uri"] = spec.homepage spec.metadata["source_code_uri"] = spec.homepage spec.metadata["changelog_uri"] = "https://github.com/cheddar-me/gouda/CHANGELOG.md" diff --git a/lib/gouda.rb b/lib/gouda.rb index d989280..26ebcec 100644 --- a/lib/gouda.rb +++ b/lib/gouda.rb @@ -64,10 +64,8 @@ def self.configure def self.logger # By default, return a logger that sends data nowhere. The `Rails.logger` method # only becomes available later in the Rails lifecycle. - @fallback_gouda_logger ||= begin - ActiveSupport::Logger.new($stdout).tap do |logger| - logger.level = Logger::WARN - end + @fallback_gouda_logger ||= ActiveSupport::Logger.new($stdout).tap do |logger| + logger.level = Logger::WARN end # We want the Rails-configured loggers to take precedence over ours, since Gouda @@ -81,22 +79,22 @@ def self.logger Rails.try(:logger) || ActiveJob::Base.try(:logger) || @fallback_gouda_logger end - def self.suppressing_sql_logs(&blk) + def self.suppressing_sql_logs(&) # This is used for frequently-called methods that poll the DB. If logging is done at a low level (DEBUG) # those methods print a lot of SQL into the logs, on every poll. While that is useful if # you collect SQL queries from the logs, in most cases - especially if this is used # in a side-thread inside Puma - the output might be quite annoying. So silence the # logger when we poll, but just to INFO. Omitting DEBUG-level messages gets rid of the SQL. if Gouda::Workload.logger - Gouda::Workload.logger.silence(Logger::INFO, &blk) + Gouda::Workload.logger.silence(Logger::INFO, &) else # In tests (and at earlier stages of the Rails boot cycle) the global ActiveRecord logger may be nil yield end end - def self.instrument(channel, options, &block) - ActiveSupport::Notifications.instrument("#{channel}.gouda", options, &block) + def self.instrument(channel, options, &) + ActiveSupport::Notifications.instrument("#{channel}.gouda", options, &) end def self.create_tables(active_record_schema) diff --git a/lib/gouda/adapter.rb b/lib/gouda/adapter.rb index 39a15ca..6a3aa7c 100644 --- a/lib/gouda/adapter.rb +++ b/lib/gouda/adapter.rb @@ -57,7 +57,7 @@ def enqueue_all(active_jobs) # We can't tell Postgres to ignore conflicts on _both_ the scheduler key and the enqueue concurrency key but not on # the ID - it is either "all indexes" or "just one", but never "this index and that index". MERGE https://www.postgresql.org/docs/current/sql-merge.html # is in theory capable of solving this but let's not complicate things all to hastily, the hour is getting late - scheduler_key = active_job.try(:executions) == 0 ? active_job.scheduler_key : nil # only enforce scheduler key on first workload + scheduler_key = (active_job.try(:executions) == 0) ? active_job.scheduler_key : nil # only enforce scheduler key on first workload { active_job_id: active_job.job_id, # Multiple jobs can have the same ID due to retries, job-iteration etc. scheduled_at: active_job.scheduled_at || t_now, diff --git a/lib/gouda/scheduler.rb b/lib/gouda/scheduler.rb index e56b96f..671a0be 100644 --- a/lib/gouda/scheduler.rb +++ b/lib/gouda/scheduler.rb @@ -87,7 +87,7 @@ def kwargs_value # @return Array[Entry] def self.build_scheduler_entries_list!(cron_table_hash = nil) Gouda.logger.info "Updating scheduled workload entries..." - if cron_table_hash.blank? + if cron_table_hash.nil? # An empty hash indicates that an empty crontab will be loaded config_from_rails = Rails.application.config.try(:gouda) cron_table_hash = if config_from_rails.present? @@ -106,6 +106,9 @@ def self.build_scheduler_entries_list!(cron_table_hash = nil) params_with_defaults = defaults.merge(cron_entry_params) Entry.new(name: name, **params_with_defaults) end + @known_scheduler_keys = Set.new(@cron_table.map(&:scheduler_key)) + + @cron_table end # Once a workload has finished (doesn't matter whether it raised an exception @@ -132,6 +135,14 @@ def self.entries @cron_table || [] end + # Returns the set of known scheduler keys that may be present in the workloads table and are defined + # by the current entries. + # + # @return Set[String] + def self.known_scheduler_keys + @known_scheduler_keys || Set.new + end + # Will upsert (`INSERT ... ON CONFLICT UPDATE`) workloads for all entries which are in the scheduler entries # table (the table needs to be read or hydrated first using `build_scheduler_entries_list!`). This is done # in a transaction. Any workloads which have been previously inserted from the scheduled entries, but no @@ -143,9 +154,11 @@ def self.entries def self.upsert_workloads_from_entries_list! table_entries = @cron_table || [] - # Remove any cron keyed workloads which no longer match config-wise + # Remove any cron keyed workloads which no longer match config-wise. + # We do this to keep things clean (but it is not enough, an extra guard is needed in Workload checkout) known_keys = table_entries.map(&:scheduler_key).uniq Gouda::Workload.transaction do + # We do this to keep things a bit clean Gouda::Workload.where.not(scheduler_key: known_keys).delete_all # Insert the next iteration for every "next" entry in the crontab. diff --git a/lib/gouda/version.rb b/lib/gouda/version.rb index 228be7e..7b659de 100644 --- a/lib/gouda/version.rb +++ b/lib/gouda/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Gouda - VERSION = "0.1.12" + VERSION = "0.1.13" end diff --git a/lib/gouda/workload.rb b/lib/gouda/workload.rb index aef39c2..a288682 100644 --- a/lib/gouda/workload.rb +++ b/lib/gouda/workload.rb @@ -95,14 +95,14 @@ def self.checkout_and_lock_one(executing_on:, queue_constraint: Gouda::AnyQueue) AND NOT EXISTS ( SELECT NULL FROM #{quoted_table_name} AS concurrent - WHERE concurrent.state = 'executing' + WHERE concurrent.state = 'executing' AND concurrent.execution_concurrency_key = workloads.execution_concurrency_key ) AND workloads.scheduled_at <= clock_timestamp() SQL # Enter a txn just to mark this job as being executed "by us". This allows us to avoid any # locks during execution itself, including advisory locks - jobs = Gouda::Workload + workloads = Gouda::Workload .select("workloads.*") .from("#{quoted_table_name} AS workloads") .where(where_query) @@ -111,13 +111,41 @@ def self.checkout_and_lock_one(executing_on:, queue_constraint: Gouda::AnyQueue) .limit(1) _first_available_workload = ActiveSupport::Notifications.instrument(:checkout_and_lock_one, {queue_constraint: queue_constraint.to_sql}) do |payload| - payload[:condition_sql] = jobs.to_sql + payload[:condition_sql] = workloads.to_sql payload[:retried_checkouts_due_to_concurrent_exec] = 0 uncached do # Necessary because we SELECT with a clock_timestamp() which otherwise gets cached by ActiveRecord query cache transaction do - job = Gouda.suppressing_sql_logs { jobs.first } # Silence SQL output as this gets called very frequently - job&.update!(state: "executing", executing_on: executing_on, last_execution_heartbeat_at: Time.now.utc, execution_started_at: Time.now.utc) - job + workload = Gouda.suppressing_sql_logs { workloads.first } # Silence SQL output as this gets called very frequently + return nil unless workload + + if workload.scheduler_key && !Gouda::Scheduler.known_scheduler_keys.include?(workload.scheduler_key) + # Check whether this workload was enqueued with a scheduler key, but no longer is in the cron table. + # If that is the case (we are trying to execute a workload which has a scheduler key, but the scheduler + # does not know about that key) it means that the workload has been removed from the cron table and must not run. + # Moreover: running it can be dangerous because it was likely removed from the table for a reason. + # Should that be the case, mark the job "finished" and return `nil` to get to the next poll. If the deployed worker still has + # the workload in its scheduler table, but a new deploy removed it - this is a race condition, but we are willing to accept it. + # Note that we are already "just not enqueueing" that job when the cron table gets loaded - this already happens. + # + # Removing jobs from the queue forcibly when we load the cron table is nice, but not enough, because our system can be in a state + # of partial deployment: + # + # [ release 1 does have some_job_hourly crontab entry ] + # [ release 2 no longer does ] + # ^ --- race conditions possible here --^ + # + # So even if we remove the crontabled workloads during app boot, it does not give us a guarantee that release 1 won't reinsert them. + # This is why this safeguard is needed. + error = {class_name: "WorkloadSkippedError", message: "Skipped as scheduler_key was no longer in the cron table"} + workload.update!(state: "finished", error:) + # And return nil. This will cause a brief "sleep" in the polling routine since the caller may think there are no more workloads + # in the queue, but only for a brief moment. + nil + else + # Once we have verified this job is OK to execute + workload.update!(state: "executing", executing_on: executing_on, last_execution_heartbeat_at: Time.now.utc, execution_started_at: Time.now.utc) + workload + end rescue ActiveRecord::RecordNotUnique # It can happen that due to a race the `execution_concurrency_key NOT IN` does not capture # a job which _just_ entered the "executing" state, apparently after we do our SELECT. This will happen regardless diff --git a/test/gouda/scheduler_test.rb b/test/gouda/scheduler_test.rb index 3e87aaf..937145b 100644 --- a/test/gouda/scheduler_test.rb +++ b/test/gouda/scheduler_test.rb @@ -142,6 +142,45 @@ def perform assert_equal [nil, nil], Gouda::Workload.first.serialized_params["arguments"] end + test "ensures a job that was scheduled but no longer present in the cron table gets force-finished without executing" do + tab = { + first_hourly: { + cron: "@hourly", + class: "GoudaSchedulerTest::TestJob", + args: [nil, nil] + } + } + + assert_nothing_raised do + Gouda::Scheduler.build_scheduler_entries_list!(tab) + end + + Gouda::Workload.delete_all + assert_changes_by(-> { Gouda::Workload.count }, exactly: 1) do + Gouda::Scheduler.upsert_workloads_from_entries_list! + end + + # Update all workloads so that it is already time for it to be executed (as we use clock_timestamp() + # time travel is not possible in those tests) + Gouda::Workload.update_all(scheduled_at: Time.now - 2.minutes) + + workload = Gouda::Workload.checkout_and_lock_one(executing_on: "test") + assert workload # Now this workload does get selected for execution + workload.update(state: "enqueued") # Return it to the queue + + # Erase the crontab. + # No need to enqueue next jobs in this test as there would not be jobs enqueued anyway + assert_nothing_raised do + Gouda::Scheduler.build_scheduler_entries_list!({}) + end + + assert_nil Gouda::Workload.checkout_and_lock_one(executing_on: "test"), "The workload should not be picked for execution now" + just_finished_workload = Gouda::Workload.where(state: "finished").first! + assert_equal "finished", just_finished_workload.state + assert just_finished_workload.error + assert_match(/scheduler/, just_finished_workload.error.fetch("message")) + end + test "is able to accept a crontab" do tab = { first_hourly: { diff --git a/test/gouda/test_helper.rb b/test/gouda/test_helper.rb index e31ad4f..6259b28 100644 --- a/test/gouda/test_helper.rb +++ b/test/gouda/test_helper.rb @@ -56,27 +56,16 @@ def truncate_test_tables ActiveRecord::Base.connection.execute("TRUNCATE TABLE gouda_job_fuses") end - def test_create_tables - ActiveRecord::Base.transaction do - ActiveRecord::Base.connection.execute("DROP TABLE gouda_workloads") - ActiveRecord::Base.connection.execute("DROP TABLE gouda_job_fuses") - # The adapter has to be in a variable as the schema definition is scoped to the migrator, not self - ActiveRecord::Schema.define(version: 1) do |via_definer| - Gouda.create_tables(via_definer) - end - end - end - def subscribed_notification_for(notification) payload = nil - subscription = ActiveSupport::Notifications.subscribe notification do |name, start, finish, id, _payload| - payload = _payload + subscription = ActiveSupport::Notifications.subscribe notification do |name, start, finish, id, local_payload| + payload = local_payload end yield ActiveSupport::Notifications.unsubscribe(subscription) - return payload + payload end end