Skip to content

Commit

Permalink
Fix keyword args passing in lower ruby versions
Browse files Browse the repository at this point in the history
  • Loading branch information
svanhesteren committed Jun 10, 2024
1 parent 79fd37d commit c3fcbfb
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 5 deletions.
2 changes: 1 addition & 1 deletion lib/gouda/active_job_extensions/concurrency.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def gouda_control_concurrency_with(total_limit: nil, perform_limit: nil, enqueue
enqueue_limit = total_limit
end

self.gouda_concurrency_config = {perform_limit:, enqueue_limit:, key:}
self.gouda_concurrency_config = {perform_limit: perform_limit, enqueue_limit:, key:}
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/gouda/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def self.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_co
break if check_shutdown.call

did_process = Gouda.config.app_executor.wrap do
Gouda::Workload.checkout_and_perform_one(executing_on: worker_id_and_thread_id, queue_constraint:, in_progress: executing_workload_ids)
Gouda::Workload.checkout_and_perform_one(executing_on: worker_id_and_thread_id, queue_constraint: queue_constraint, in_progress: executing_workload_ids)
end

# If no job was retrieved the queue is likely empty. Relax the polling then and ease off.
Expand Down
6 changes: 3 additions & 3 deletions lib/gouda/workload.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def self.reap_zombie_workloads
# Appsignal.increment_counter("gouda_workloads_revived", 1, job_class: workload.active_job_class_name)

interrupted_at = workload.last_execution_heartbeat_at
workload.update!(state: "finished", interrupted_at:, last_execution_heartbeat_at: Time.now.utc, execution_finished_at: Time.now.utc)
workload.update!(state: "finished", interrupted_at: interrupted_at, last_execution_heartbeat_at: Time.now.utc, execution_finished_at: Time.now.utc)
revived_job = ActiveJob::Base.deserialize(workload.active_job_data)
# Save the interrupted_at timestamp so that upon execution the new job will raise a Gouda::Interrpupted exception.
# The exception can then be handled like any other ActiveJob exception (using rescue_from or similar).
Expand Down Expand Up @@ -116,7 +116,7 @@ def self.checkout_and_lock_one(executing_on:, queue_constraint: Gouda::AnyQueue)
uncached do # Necessary because we SELECT with a clock_timestamp() which otherwise gets cached by ActiveRecord query cache
transaction do
jobs.first.tap do |job|
job&.update!(state: "executing", executing_on:, last_execution_heartbeat_at: Time.now.utc, execution_started_at: Time.now.utc)
job&.update!(state: "executing", executing_on: executing_on, last_execution_heartbeat_at: Time.now.utc, execution_started_at: Time.now.utc)
end
rescue ActiveRecord::RecordNotUnique
# It can happen that due to a race the `execution_concurrency_key NOT IN` does not capture
Expand All @@ -133,7 +133,7 @@ def self.checkout_and_lock_one(executing_on:, queue_constraint: Gouda::AnyQueue)
# @param in_progress[#add,#delete] Used for tracking work in progress for heartbeats
def self.checkout_and_perform_one(executing_on:, queue_constraint: Gouda::AnyQueue, in_progress: Set.new)
# Select a job and mark it as "executing" which will make it unavailable to any other
workload = checkout_and_lock_one(executing_on:, queue_constraint:)
workload = checkout_and_lock_one(executing_on: executing_on, queue_constraint: queue_constraint)
if workload
in_progress.add(workload.id)
workload.perform_and_update_state!
Expand Down

0 comments on commit c3fcbfb

Please sign in to comment.