Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use query_type: :text for cancel_all_jobs in Dolphin engine #1223

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions lib/oban/engines/dolphin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -279,17 +279,31 @@ defmodule Oban.Engines.Dolphin do

@impl Engine
def cancel_all_jobs(%Config{} = conf, queryable) do
query =
queryable
|> where([j], j.state not in ~w(cancelled completed discarded))
|> select([j], map(j, [:id, :queue, :state]))
|> lock("FOR UPDATE SKIP LOCKED")

# Using literal SQL allows us to use `query_type: text`, which is required in environments
# such as Planetscale where the binary protocol is incompatible with prepared statements.
{sql, params} = Ecto.Adapters.SQL.to_sql(:all, conf.repo, query)

sql =
Enum.reduce(params, sql, fn param, acc ->
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love this, but I couldn't figure out a better way to do it. query_type: :text does not allow params, so the full SQL needs to be passed to Repo.query! without any params.

String.replace(acc, "?", to_string(param), global: false)
end)

Repo.transaction(conf, fn ->
jobs =
queryable
|> where([j], j.state not in ~w(cancelled completed discarded))
|> select([j], map(j, [:id, :queue, :state]))
|> lock("FOR UPDATE SKIP LOCKED")
|> then(&Repo.all(conf, &1))
%{columns: columns, rows: rows} = Repo.query!(conf, sql, [], query_type: :text)

jobs = Enum.map(rows, &conf.repo.load(Job, {columns, &1}))

query = where(Job, [j], j.id in ^Enum.map(jobs, & &1.id))
updates = [set: [state: "cancelled", cancelled_at: utc_now()]]

Repo.update_all(conf, query, set: [state: "cancelled", cancelled_at: utc_now()])
# MySQL doesn't support selecting in an update. To accomplish the required functionality
# we have to select, then update.
Repo.update_all(conf, where(Job, [j], j.id in ^Enum.map(jobs, & &1.id)), updates)

jobs
end)
Expand Down