-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathpostgres-as-queue.html
193 lines (177 loc) · 10.7 KB
/
postgres-as-queue.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
<html>
<meta content="width=device-width, initial-scale=1.0" name="viewport">
<head>
<title>
leontrolski - postgres as queue
</title>
<style>
body {margin: 5% auto; background: #fff7f7; color: #444444; font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif; font-size: 16px; line-height: 1.8; max-width: 63%;}
@media screen and (max-width: 800px) {body {font-size: 14px; line-height: 1.4; max-width: 90%;}}
pre {width: 100%; border-top: 3px solid gray; border-bottom: 3px solid gray;}
a {border-bottom: 1px solid #444444; color: #444444; text-decoration: none; text-shadow: 0 1px 0 #ffffff; }
a:hover {border-bottom: 0;}
.inline {background: #b3b2b226; padding-left: 0.3em; padding-right: 0.3em; white-space: nowrap;}
blockquote {font-style: italic;color:black;background-color:#f2f2f2;padding:1em;margin:0;}
details {border-bottom:solid 5px gray;}
</style>
<link href="https://unpkg.com/[email protected]/themes/prism-vs.css" rel="stylesheet">
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.20.0/components/prism-core.min.js">
</script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.20.0/plugins/autoloader/prism-autoloader.min.js">
</script>
</head>
<body>
<a href="index.html">
<img src="pic.png" style="height:2em">
⇦
</a>
<p><i>2024-02-09</i></p>
<h1>Postgres as queue</h1>
<p>The team I've been working in for the last year or so have had great success using Postgres-as-queue. We've
managed to <em>avoid</em> the following:</p>
<ul>
<li>Infrastructure/configuration<em> - I'd estimate each line of terraform to be an order of
magnitude more risk/maintenance/faff than each line of Python</em>.</li>
<li>Slow/crunky multi-container testing.</li>
<li>The need for expertise in anything beyond Python + Postgres.</li>
<li>Elaborate retry/dead-letter-queue mechanisms.</li>
<li>Thinking about data serialisation over the wire.</li>
</ul>
<p>In a nut shell, it's simpler - there are just way fewer moving parts.</p>
<p>As we're using a monolithic codebase with a reasonable ORM, we also have none of the <a
href="cmd-click-manifesto.html">CMD-clickability</a> issues that plague ad-hoc SNS/PubSub/Kafka <a
href="manipleservices.html">architectures</a>.</p>
<h1 id="objection">Objection</h1>
<p>The main objection to doing Postgres-as-queue is a performance one, along the lines of "don't put
unnecessary extra load on the db/don't increase row churn". Let's construct a reasonable example demonstrating that queue usage shouldn't introduce much extra load in many cases. <em>As always, before following
anyone's advice on this kind of stuff, profile, profile, profile!</em></p>
<blockquote>
<p>In the (fairly unusual) case that you're doing many tasks, none of which touch the db (say constructing
and sending emails from static data), you can ignore this blog post and get on with life. In another case, you
may be operating at some crazy scale where <a
href="https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/#:~:text=There%20are%20downsides%20to%20using%20SKIP%20LOCKED%20to%20implement%20a%20queue">these
downsides</a> start applying, again, run the numbers and profile.</p>
</blockquote>
<p>Let's imagine the db load introduced by a hypothetical task - I currently work in the energy industry, so
the example might be: a customer submits a meter reading, we queue a task to write the reading and update some
account balance - the load looks like:</p>
<ul>
<li>Receive the message from the broker.</li>
<li>Make 3 primary key <code>SELECT</code>s totalling 0.3ms db time.</li>
<li>Make 2 slightly hairier <code>SELECT</code>s with some <code>JOIN</code>s/<code>GROUP BY</code>s totalling 4ms
db time.</li>
<li>Perform 2 <code>UPDATE</code>s totalling 2ms db time (and some row churn).</li>
<li>ACK the message.</li>
</ul>
<p>In the new Postgres-as-queue world, this looks like:</p>
<ul>
<li><b>Poll for a message that needs processing, on finding one, <code>UPDATE</code> the status, totalling 1ms db time.
</b></li>
<li>Make 3 primary key <code>SELECT</code>s totalling 0.3ms db time.</li>
<li>Make 2 slightly hairier <code>SELECT</code>s with some <code>JOIN</code>s/<code>GROUP BY</code>s totalling 4ms
db time.</li>
<li>Perform 2 <code>UPDATE</code>s totalling 2ms db time (and some row churn).</li>
<li><b>ACK the message by <code>UPDATE</code>ing the status totalling 0.5ms db time (and some row churn).</b></li>
</ul>
<p>In this example, our db time has gone up from 6.3ms per task to 7.8ms. These figures are totally fictional, but
we've demonstrated a reasonable way of thinking about the overhead.</p>
<h1 id="implementation">Implementation</h1>
<p>If we had just one worker polling for tasks, we could ignore locking and transactions, but we want to have many, so
we have to use <code>FOR UPDATE SKIP LOCKED</code>. This atomically locks the row at the point where it selects it -
there's discussion of ins and outs in this <a
href="https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/">excellent blog post
by 2ndQuadrant</a>.</p>
<p>For our example implementation, we have an event table that looks like:</p>
<pre><code class="lang-sql"><span class="hljs-section">id | status | updated_at
------------------------------------------</span>
UUID | SMALLINT | TIMESTAMP WITH TIME ZONE
</code></pre>
<p>We have an <code>INDEX</code> on <code>(status, updated_at)</code>. <em>In reality we have many tables, one per
queue.</em></p>
<p>Our polling workers run a loop like:</p>
<pre><code class="lang-python"><span class="hljs-keyword">for</span> _ <span class="hljs-keyword">in</span> shutdown_handler.loop(): <span class="hljs-comment"># see appendix below</span>
event_meta = get_event_to_process(
where_status_eq=TO_PROCESS,
set_status_to=PROCESSING,
)
<span class="hljs-keyword">if</span> event_meta <span class="hljs-keyword">is</span> <span class="hljs-keyword">None</span>:
time.sleep(x) <span class="hljs-comment"># be gentle on the DB</span>
<span class="hljs-keyword">continue</span>
<span class="hljs-keyword">try</span>:
<span class="hljs-comment"># Perform task!</span>
set_status(event_meta, PROCESSED)
<span class="hljs-keyword">except</span>:
set_status(event_meta, ERRORED, ...)
</code></pre>
<p>And <code>get_event_to_process(...)</code> performs SQL along the lines of:</p>
<pre><code class="lang-sql">WITH ids AS MATERIALIZED (
<span class="hljs-keyword">SELECT</span> <span class="hljs-keyword">id</span> <span class="hljs-keyword">FROM</span> event_queue
<span class="hljs-keyword">WHERE</span> <span class="hljs-keyword">status</span> = {where_status_eq}
<span class="hljs-keyword">ORDER</span> <span class="hljs-keyword">BY</span> updated_at
<span class="hljs-keyword">LIMIT</span> <span class="hljs-number">1</span>
<span class="hljs-keyword">FOR</span> <span class="hljs-keyword">UPDATE</span> <span class="hljs-keyword">SKIP</span> <span class="hljs-keyword">LOCKED</span>
)
<span class="hljs-keyword">UPDATE</span> event_queue
<span class="hljs-keyword">SET</span> status = {set_status_to}, updated_at = {now}
<span class="hljs-keyword">WHERE</span> <span class="hljs-keyword">id</span> = <span class="hljs-keyword">ANY</span>(<span class="hljs-keyword">SELECT</span> <span class="hljs-keyword">id</span> <span class="hljs-keyword">FROM</span> ids)
<span class="hljs-keyword">RETURNING</span> <span class="hljs-keyword">id</span>
</code></pre>
<p><em>Note the use of <code>MATERIALISED</code> to force the CTE to evaluate eagerly before the <code>UPDATE</code>
(aside: I'd like a postgres expert to assert that this query is truly race condition free)</em>.</p>
<p><code>set_status(...)</code> just performs an update of <code>status</code> and <code>updated_at</code> for a
particular row.</p>
<h1 id="bonus-features">Bonus features</h1>
<p>Because you're simply interacting with a persistent table rather that some black-box queue, it's easy to add
bells and whistles as your requirements change.</p>
<h2 id="retrying">Retrying</h2>
<p>Sometimes tasks fail/timeout. We have jobs that periodically poll for old tasks that have weird statuses and attempt
to retry them as appropriate.</p>
<h2 id="ignore-before">Ignore before</h2>
<p>We have one more timestamp column on our <code>event_queue</code> tables - <code>ignore_before</code>. This is useful
in two scenarios:</p>
<ul>
<li>We can represent timeouts (eg. "send an email if we didn't receive inbound x after 10 days") as
regular ol' events.</li>
<li>We want to batch up certain types of outbound event, so we can set their <code>ignore_before</code> to "at
the next whole hour" and bundle up a load of events at dispatch-time.</li>
</ul>
<h2 id="cruft-cleanup">Cruft cleanup</h2>
<p>You may want have cron jobs that delete queue data older than some time.</p>
<h1 id="appendix">Appendix</h1>
<h2 id="shutdown-handler">Shutdown handler</h2>
<p>The following is a nice helper for polling loops that aids with shutdown handling, and times itself out after an hour
of no activity.</p>
<pre><code class="lang-python">import os
import signal
import threading
from types import FrameType
from typing import Iterator
INTERRUPT_TIMEOUT = 60 * 60.0 # 1 hour
class ShutdownHandler:
work_done: threading.Event
def __init__(self) -> None:
self.exit_flag = False
# For ctrl-c from terminal
signal.signal(signal.SIGINT, self.signal_handler)
# for normal Unix usage
signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, signal: int, frame: FrameType | None) -> None:
self.exit_flag = True
def kill_after_timeout(self) -> None:
self.work_done = threading.Event()
if self.work_done.wait(INTERRUPT_TIMEOUT):
return
# If the timeout is reached, kill the parent process
os.kill(os.getpid(), signal.SIGKILL)
def loop(self) -> Iterator[None]:
while True:
if self.exit_flag:
self.work_done.set()
return
# Making the thread a daemon means it will die if the parent thread dies.
threading.Thread(target=self.kill_after_timeout, daemon=True).start()
yield None
self.work_done.set()
</code></pre>
</body>
</html>