Use what you already have: Building a message queue on Postgres
August 2024
See the code at github.com/oliverlambson/pgmq.
When you’re building a decently-sized webapp, you will often end up wanting a way to shift work off of your webserver: you don’t want it getting bogged down in “heavy” work, and you don’t want to have to wait for non-critical things to run before you can send a response. Normally, this is done with a message queue: your webserver publishes instructions to somewhere, and you set up another server (a “worker”) that listens for those instructions and does the hard work.
Why? This must be a solved problem?
There are lots of solutions to this problem. Most/all are probably better. But I am a meme-driven-developer, so here we are.
Seriously though, the solutions I’m aware of are either overkill or too constraining: Rabbit/SQS/etc. mean I have to maintain another piece of infrastructure, and Celery/Symfony Messenger/whatever else lock me into a language- or framework-specific implementation, which is annoying. I’m already running a Postgres server, can’t I just use that? It only has to be good-enough and I’ll be happy.
This solution is under 250 lines of Python and SQL, it runs on my existing infrastructure, and is easy to understand. Maybe one day I’ll need a “proper” solution, but until then this seems fine.
What’s the simplest solution?
Add messages to a table. Set up a worker to poll it. Once the work is done, delete the row.
You could have a table called messages
that looks like this:
id | message |
---|---|
1 | { "task": "send-email", "payload": { "kind": "account-statement", "to": "ollie@example.com" } } |
2 | { "task": "send-email", "payload": { "kind": "account-statement", "to": "elliot@ecorp.com" } } |
3 | { "task": "reverse-transaction", "payload": { "transaction-id": 7, "reason": "declined" } } |
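If you want to follow along, the table itself can be tiny. Here's a sketch of the DDL (the column types are my assumption; the real schema is in the repo):

CREATE TABLE messages (
    id BIGSERIAL PRIMARY KEY,
    message JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now() -- handy for the monitoring we add later
);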
And your worker code could be something like this:
from time import sleep

while True:
    sleep(0.1)
    # db_conn is whichever database client you're using
    row = db_conn.fetch_one("SELECT id, message FROM messages LIMIT 1;")
    if row is None:
        continue
    message = row.message
    match message.task:
        case "send-email": send_email(message.payload)
        case "reverse-transaction": reverse_transaction(message.payload)
        case _: print(f"Unknown task: {message.task}")
    db_conn.execute("DELETE FROM messages WHERE id = $1;", row.id)
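Publishing from the webserver is then just an INSERT (assuming the JSONB column from the sketch above):

INSERT INTO messages (message)
VALUES ('{"task": "send-email", "payload": {"kind": "account-statement", "to": "ollie@example.com"}}');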
This is probably fine, it might even meet the criteria for “good enough”, but it doesn’t feel good. What if my worker fails? What if I want multiple workers? What if I want messages to timeout? What if I want history of messages that were processed? Ew, why are we using polling? How do I know that messages are being processed ok? How will I know when a message fails? How will I know when a message is bad? How do I retry failed messages?
Making it less garbage
Polling
Even though the polling is probably the least-bad part, let’s fix that first because it feels the worst. Postgres has a built-in asynchronous notification mechanism with the NOTIFY/LISTEN commands. We can use this to tell our worker when a new message is added to the messages table, and only then read the row, meaning we don’t have to constantly poll the table for messages1.
We can set up a trigger on the messages table to send the id of the new row whenever one is inserted:
CREATE OR REPLACE FUNCTION new_message_notify() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('new_message', NEW.id::TEXT);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER new_message_trigger
    AFTER INSERT ON messages
    FOR EACH ROW
    EXECUTE FUNCTION new_message_notify();
And modify the worker to listen to the new_message
channel and run the message
processing whenever a notification comes through:
def callback(db_conn, payload):
    row_id = int(payload)
    row = db_conn.fetch_one("SELECT message FROM messages WHERE id = $1;", row_id)
    if row is None:
        return
    message = row.message
    match message.task:
        case "send-email": send_email(message.payload)
        case "reverse-transaction": reverse_transaction(message.payload)
        case _: print(f"Unknown task: {message.task}")
    db_conn.execute("DELETE FROM messages WHERE id = $1;", row_id)

# ------- NEW! -------
db_conn.add_listener(channel="new_message", callback=callback)
await asyncio.Event().wait()  # block forever so the listener doesn't close
# ----- end NEW! -----
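The db_conn.add_listener above is pseudocode. If you happened to use asyncpg (an assumption on my part, not necessarily what the repo does), the wiring might look roughly like this:

import asyncio
import asyncpg

async def main() -> None:
    conn = await asyncpg.connect("postgresql://app@localhost/app")  # hypothetical DSN

    def on_notify(connection, pid, channel, payload):
        # asyncpg hands listeners (connection, pid, channel, payload)
        callback(connection, payload)

    await conn.add_listener("new_message", on_notify)
    await asyncio.Event().wait()  # block forever so the listener doesn't close

asyncio.run(main())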
Locking
Now, imagine things are going well and we have a lot of messages to process: we’re going to need multiple workers. Right now if we have two workers, they’ll both process all the messages, which is not what we want. Let’s mark them as “locked” when they’re being processed, and only allow one worker to process a message at a time. We’ll also give this lock a timeout so the message can escape the worker if it gets stuck.
Modify the messages
table to have a lock_expires_at
column:
id | message | lock_expires_at |
---|---|---|
1 | { "task": "send-email", "payload": { "kind": "account-statement", "to": "ollie@example.com" } } | 2024-08-02 14:56:03 |
2 | { "task": "send-email", "payload": { "kind": "account-statement", "to": "elliot@ecorp.com" } } | null |
3 | { "task": "reverse-transaction", "payload": { "transaction-id": 7, "reason": "declined" } } | null |
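Against the sketch schema from earlier, that's a one-line migration (NULL meaning "not locked"):

ALTER TABLE messages ADD COLUMN lock_expires_at TIMESTAMPTZ;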
Modify the worker to set the lock when it starts processing a message:
def callback(db_conn, payload):
    row_id = int(payload)
    # ------- NEW! -------
    row = db_conn.fetch_one("""
        UPDATE messages
        SET lock_expires_at = CURRENT_TIMESTAMP + INTERVAL '1 minute'
        WHERE
            id = $1
            AND (lock_expires_at IS NULL OR lock_expires_at < CURRENT_TIMESTAMP)
        RETURNING *;
        """,
        row_id,
    )
    # ----- end NEW! -----
    if row is None:
        return
    message = row.message
    match message.task:
        case "send-email": send_email(message.payload)
        case "reverse-transaction": reverse_transaction(message.payload)
        case _: print(f"Unknown task: {message.task}")
    db_conn.execute("DELETE FROM messages WHERE id = $1;", row_id)

db_conn.add_listener(channel="new_message", callback=callback)
await asyncio.Event().wait()  # block forever so the listener doesn't close
By doing an UPDATE
with a RETURNING
clause, we can ensure the row is only
acquired by one worker at a time. If two workers try to acquire the same row at
the same time, the one that gets there first will lock the row, and the second
will not find the row when it tries to lock it due to the WHERE
clause2.
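Footnote 2 mentions a slightly more efficient variant using SELECT ... FOR UPDATE NOWAIT inside a transaction; roughly, that would look like this (a sketch, not taken from the repo):

BEGIN;
-- errors immediately (lock_not_available, 55P03) if another worker already holds the row lock
SELECT id FROM messages
WHERE
    id = $1
    AND (lock_expires_at IS NULL OR lock_expires_at < CURRENT_TIMESTAMP)
FOR UPDATE NOWAIT;

UPDATE messages
SET lock_expires_at = CURRENT_TIMESTAMP + INTERVAL '1 minute'
WHERE id = $1
RETURNING *;
COMMIT;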
History
After processing a message, we delete it from the messages
table as that is
the “live”/“active” queue. We can write the completed messages to a
message_archive
table.
id | message | result | archived_at |
---|---|---|---|
1 | { "task": "send-email", "payload": { "kind": "account-statement", "to": "ollie@example.com" } } | success | 2024-08-02 14:53:02 |
2 | { "task": "send-email", "payload": { "kind": "account-statement", "to": "elliot@ecorp.com" } } | success | 2024-08-02 14:53:05 |
3 | { "task": "reverse-transaction", "payload": { "transaction-id": 7, "reason": "declined" } } | failed | 2024-08-02 14:54:24 |
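Again, a sketch of what that table might look like (column types are my assumption):

CREATE TABLE message_archive (
    id BIGSERIAL PRIMARY KEY,
    message JSONB NOT NULL,
    result TEXT NOT NULL,
    archived_at TIMESTAMPTZ NOT NULL DEFAULT now()
);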
This is done in a transaction to ensure the delete from messages
and the
insert to message_archive
happen together:
def callback(db_conn, payload):
    row_id = int(payload)
    row = db_conn.fetch_one("""
        UPDATE messages
        SET lock_expires_at = CURRENT_TIMESTAMP + INTERVAL '1 minute'
        WHERE
            id = $1
            AND (lock_expires_at IS NULL OR lock_expires_at < CURRENT_TIMESTAMP)
        RETURNING *;
        """,
        row_id,
    )
    if row is None:
        return
    message = row.message
    match message.task:
        case "send-email": send_email(message.payload)
        case "reverse-transaction": reverse_transaction(message.payload)
        case _: print(f"Unknown task: {message.task}")
    # ------- NEW! -------
    with db_conn.transaction():
        db_conn.execute("DELETE FROM messages WHERE id = $1;", row_id)
        db_conn.execute(
            "INSERT INTO message_archive (message, result) VALUES ($1, 'success');",
            message,
        )
    # ----- end NEW! -----

db_conn.add_listener(channel="new_message", callback=callback)
await asyncio.Event().wait()  # block forever so the listener doesn't close
Dead messages
Similar to the archive table, we can have a message_dead table to store messages that failed to be processed. Unlike the archive table, these are considered “bad”: the messages are not in a terminal state, and we need to decide what to do with them. It’s up to you whether they should be retried, fixed and re-added to the queue, or ignored and written to the archive table.
id | message | failure_reason | details | failed_at |
---|---|---|---|---|
1 | { "task": "receive-email", "payload": { "kind": "account-statement", "to": "ollie@example.com" } } | rejected | unknown task: receive-email | 2024-08-02 14:53:02 |
2 | { "task": "send-email", "payload": { "kind": "account-statement", "to": "elliot@ecorp.com" } } | runtime_error | Customer not found. Traceback: ... | 2024-08-02 14:53:05 |
3 | { "task": "reverse-transaction", "payload": { "transaction-id": 7, "reason": "declined" } } | lock_expired | null | 2024-08-02 14:54:24 |
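A sketch of the table (again, column types are my assumption):

CREATE TABLE message_dead (
    id BIGSERIAL PRIMARY KEY,
    message JSONB NOT NULL,
    failure_reason TEXT NOT NULL,
    details TEXT,
    failed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);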
Now, when deleting from messages, we insert into message_archive if the message was processed successfully, or into message_dead if we had an error:
def callback(db_conn, payload):
    row_id = int(payload)
    row = db_conn.fetch_one("""
        UPDATE messages
        SET lock_expires_at = CURRENT_TIMESTAMP + INTERVAL '1 minute'
        WHERE
            id = $1
            AND (lock_expires_at IS NULL OR lock_expires_at < CURRENT_TIMESTAMP)
        RETURNING *;
        """,
        row_id,
    )
    if row is None:
        return
    message = row.message
    match message.task:
        case "send-email": status, err = send_email(message.payload)
        case "reverse-transaction": status, err = reverse_transaction(message.payload)
        case _: status, err = "rejected", f"unknown task: {message.task}"
    with db_conn.transaction():
        db_conn.execute("DELETE FROM messages WHERE id = $1;", row_id)
        # ------- NEW! -------
        if status == "success":
            db_conn.execute(
                "INSERT INTO message_archive (message, result) VALUES ($1, 'success');",
                message,
            )
        else:
            db_conn.execute(
                """INSERT INTO message_dead (message, failure_reason, details)
                VALUES ($1, $2, $3);""",
                message,
                status,
                err,
            )
        # ----- end NEW! -----

db_conn.add_listener(channel="new_message", callback=callback)
await asyncio.Event().wait()  # block forever so the listener doesn't close
Failed workers
Our workers rely on NOTIFY events to pick up messages, which means we need to re-notify them about existing messages whose locks have expired. We can use pg_cron to check every minute for expired locks and send the notifications.
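pg_cron is an extension, so it has to be installed on the server (and listed in shared_preload_libraries) and enabled before the schedule below will work; this assumes your Postgres has it available:

CREATE EXTENSION IF NOT EXISTS pg_cron;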
CREATE OR REPLACE FUNCTION expired_lock_renotify() RETURNS void AS $$
DECLARE
    message_record RECORD;
BEGIN
    FOR message_record IN
        SELECT id FROM messages WHERE lock_expires_at < NOW()
    LOOP
        PERFORM pg_notify('new_message', message_record.id::TEXT);
    END LOOP;
END;
$$ LANGUAGE plpgsql;
SELECT cron.schedule(
'messages:expired_lock_renotify',
'* * * * *',
$$ SELECT expired_lock_renotify(); $$
);
While we’re at it, we should also make sure a worker doesn’t continue to process a job if it exceeds the timeout, even if it hasn’t hit an error3.
async def callback(db_conn, payload):
    row_id = int(payload)
    row = db_conn.fetch_one("""
        UPDATE messages
        SET lock_expires_at = CURRENT_TIMESTAMP + INTERVAL '1 minute'
        WHERE
            id = $1
            AND (lock_expires_at IS NULL OR lock_expires_at < CURRENT_TIMESTAMP)
        RETURNING *;
        """,
        row_id,
    )
    if row is None:
        return
    message = row.message
    # ------- NEW! -------
    try:
        # the task handlers are assumed to be coroutines here, so the timeout can actually cancel them
        async with asyncio.timeout(60):
            match message.task:
                case "send-email": status, err = await send_email(message.payload)
                case "reverse-transaction": status, err = await reverse_transaction(message.payload)
                case _: status, err = "rejected", f"unknown task: {message.task}"
    except asyncio.TimeoutError:
        status, err = "lock_expired", "still processing at lock expiry time"
    # ----- end NEW! -----
    with db_conn.transaction():
        db_conn.execute("DELETE FROM messages WHERE id = $1;", row_id)
        if status == "success":
            db_conn.execute(
                "INSERT INTO message_archive (message, result) VALUES ($1, 'success');",
                message,
            )
        else:
            db_conn.execute(
                """INSERT INTO message_dead (message, failure_reason, details)
                VALUES ($1, $2, $3);""",
                message,
                status,
                err,
            )

db_conn.add_listener(channel="new_message", callback=callback)
await asyncio.Event().wait()  # block forever so the listener doesn't close
Error visibility
I really hate silent failures. Right now, if a message fails and is put in the message_dead table, or a lock times out and a message has to be re-processed, or no workers are picking up messages, we have no way of knowing about it.
I don’t want these notifications/logs to be dependent on my worker code, because I want to know about the health of the system which includes when the workers are unhealthy. We’re relying on Postgres as a single point of failure here already (if our database goes down we have bigger problems), so we can use it to do the monitoring too.
We can emit a warning log to the Postgres logs when re-notifying for expired locks:
CREATE OR REPLACE FUNCTION expired_lock_renotify() RETURNS void AS $$
DECLARE
    message_record RECORD;
BEGIN
    FOR message_record IN
        SELECT id FROM messages WHERE lock_expires_at < NOW()
    LOOP
        --- NEW ---
        RAISE WARNING 'Error: message_record.id=% lock expired, re-notifying', message_record.id;
        --- end NEW ---
        PERFORM pg_notify('new_message', message_record.id::TEXT);
    END LOOP;
END;
$$ LANGUAGE plpgsql;
SELECT cron.schedule(
'messages:expired_lock_renotify',
'* * * * *',
$$ SELECT expired_lock_renotify(); $$
);
We can do the same whenever a message is inserted into the message_dead
table:
CREATE OR REPLACE FUNCTION dead_message_notify() RETURNS trigger AS $$
BEGIN
    RAISE WARNING 'Error: message_dead.id=% has failure_reason=%', NEW.id, NEW.failure_reason;
    -- we could set up workers to automatically try to reprocess these messages:
    PERFORM pg_notify('dead_message', NEW.id::TEXT);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER dead_message_trigger
    AFTER INSERT ON message_dead
    FOR EACH ROW
    EXECUTE FUNCTION dead_message_notify();
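If you did want workers to act on that dead_message channel, a listener could re-queue the failed message automatically (a sketch in the same pseudocode style as the worker above; whether a dead message is safe to retry really depends on its failure_reason):

def dead_message_callback(db_conn, payload):
    row_id = int(payload)
    row = db_conn.fetch_one("SELECT message, failure_reason FROM message_dead WHERE id = $1;", row_id)
    if row is None or row.failure_reason == "rejected":
        return  # nothing to retry, or the message itself is bad
    with db_conn.transaction():
        db_conn.execute("DELETE FROM message_dead WHERE id = $1;", row_id)
        db_conn.execute("INSERT INTO messages (message) VALUES ($1);", row.message)

db_conn.add_listener(channel="dead_message", callback=dead_message_callback)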
And we can do the same when a message hasn’t been picked up by a worker at all:
CREATE OR REPLACE FUNCTION non_locked_notify() RETURNS void AS $$
DECLARE
    message RECORD;
BEGIN
    FOR message IN
        SELECT id FROM messages
        WHERE lock_expires_at IS NULL
        AND date_trunc('minute', created_at) < date_trunc('minute', now())
    LOOP
        RAISE WARNING 'Error: message.id=% not picked up, re-notifying', message.id;
        PERFORM pg_notify('new_message', message.id::TEXT);
    END LOOP;
END;
$$ LANGUAGE plpgsql;
SELECT cron.schedule(
'messages:non_locked_notify',
'* * * * *',
$$ SELECT non_locked_notify(); $$
);
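These RAISE WARNINGs end up in the Postgres server log (log_min_messages defaults to warning), so whatever you already use to ship and alert on Postgres logs will surface them. If the log level has been raised past warning, you can put it back:

ALTER SYSTEM SET log_min_messages = 'warning';
SELECT pg_reload_conf();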
Is that good enough?
I think so.
Check out the full code on github, it’s more complete and it actually runs.
1. Why not just use NOTIFY/LISTEN as our queue? We could, but: there’s no persistence, we don’t get any of the other stuff we do later on in this article, and the payload size is limited to 8kB (what if we want big messages?).
2. We can make this more efficient by doing a SELECT ... FOR UPDATE NOWAIT before the UPDATE ... RETURNING inside a transaction, but this simpler way is fine too.
3. This will prevent a worker from holding onto a message forever if it’s stuck in a loop, but it will also prevent a worker from processing a message that takes a long time; in that case we would need to increase the timeout.