Use what you already have: Building a message queue on Postgres

August 2024

See the code at github.com/oliverlambson/pgmq.

When you’re building a decently-sized webapp, you will often end up wanting a way to shift work off of your webserver: you don’t want it getting bogged down in “heavy” work, and you don’t want to have to wait for non-critical things to run before you can send a response. Normally, this is done with a message queue: your webserver publishes instructions to somewhere, and you set up another server (a “worker”) that listens for those instructions and does the hard work.

Why? This must be a solved problem?

There are lots of solutions to this problem. Most/all are probably better. But I am a meme-driven-developer, so here we are.

Seriously though, the solutions I’m aware of are either overkill or lock me in: Rabbit/SQS/etc mean I have to maintain another piece of infrastructure, and Celery/Symfony Messenger/whatever else tie me to a language- or framework-specific implementation, which is annoying. I’m already running a Postgres server, can’t I just use that? It only has to be good-enough and I’ll be happy.

This solution is under 250 lines of Python and SQL, runs on my existing infrastructure, and is easy to understand. Maybe one day I’ll need a “proper” solution, but until then this seems fine.

What’s the simplest solution?

Add messages to a table. Set up a worker to poll it. Once the work is done, delete the row.

You could have a table called messages that looks like this:

id | message
---+----------------------------------------------------------------------------------------------
1  | {"task": "send-email", "payload": {"kind": "account-statement", "to": "ollie@example.com"}}
2  | {"task": "send-email", "payload": {"kind": "account-statement", "to": "elliot@ecorp.com"}}
3  | {"task": "reverse-transaction", "payload": {"transaction-id": 7, "reason": "declined"}}
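
The schema for this is tiny. A minimal sketch (the exact column types are my assumption; the repo is the source of truth):

CREATE TABLE messages (
    id      BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    message JSONB NOT NULL
);

Using jsonb for the message means arbitrary task payloads can go in without schema changes.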

And your worker code could be something like this:

while True:
    sleep(0.1)

    row = db_conn.fetch_one("SELECT id, message FROM messages LIMIT 1;")
    if row is None:
        continue

    message = row.message
    match message.task:
        case "send-email": send_email(message.payload)
        case "reverse-transaction": reverse_transaction(message.payload)
        case _: print(f"Unknown task: {message.task}")

    db_conn.execute("DELETE FROM messages WHERE id = $1;", row.id)

This is probably fine, it might even meet the criteria for “good enough”, but it doesn’t feel good. What if my worker fails? What if I want multiple workers? What if I want messages to timeout? What if I want history of messages that were processed? Ew, why are we using polling? How do I know that messages are being processed ok? How will I know when a message fails? How will I know when a message is bad? How do I retry failed messages?

Making it less garbage

Polling

Even though the polling is probably the least-bad part, let’s fix that first because it feels the worst. Postgres has a built-in asynchronous notification mechanism in the NOTIFY/LISTEN commands. We can use this to tell our worker when a new message is added to the messages table, and only then read the row, meaning we don’t have to constantly poll the table for messages1.

We can set up a trigger on the messages table to send the id of the new row whenever one is inserted:

CREATE OR REPLACE FUNCTION new_message_notify() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('new_message', NEW.id::TEXT);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER new_message_trigger
AFTER INSERT ON messages
FOR EACH ROW
EXECUTE FUNCTION new_message_notify();
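
Publishing a message is now just an INSERT; nothing about the webserver changes except which table it writes to. You can watch the notification fire from a psql session (a sketch, assuming the table above):

LISTEN new_message;

INSERT INTO messages (message)
VALUES ('{"task": "send-email", "payload": {"kind": "account-statement", "to": "ollie@example.com"}}');
-- psql then prints something like:
--   Asynchronous notification "new_message" with payload "4" received from server process with PID ...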

And modify the worker to listen to the new_message channel and run the message processing whenever a notification comes through:

def callback(db_conn, payload):
    row_id = int(payload)

    row = db_conn.fetch_one("SELECT message FROM messages WHERE id = $1;", row_id)
    if row is None:
        return

    message = row.message
    match message.task:
        case "send-email": send_email(message.payload)
        case "reverse-transaction": reverse_transaction(message.payload)
        case _: print(f"Unknown task: {message.task}")

    db_conn.execute("DELETE FROM messages WHERE id = $1;", row_id)

# ------- NEW! -------
db_conn.add_listener(channel="new_message", callback=callback)
await asyncio.Event().wait() # block forever so the listener doesn't close
# ----- end NEW! -----

Locking

Now, imagine things are going well and we have a lot of messages to process: we’re going to need multiple workers. Right now, if we have two workers, they’ll both process every message, which is not what we want. Let’s mark a message as “locked” while it’s being processed, so only one worker handles it at a time. We’ll also give this lock a timeout so the message can escape the worker if it gets stuck.

Modify the messages table to have a lock_expires_at column:

id | message                                                                                      | lock_expires_at
---+----------------------------------------------------------------------------------------------+--------------------
1  | {"task": "send-email", "payload": {"kind": "account-statement", "to": "ollie@example.com"}}  | 2024-08-02 14:56:03
2  | {"task": "send-email", "payload": {"kind": "account-statement", "to": "elliot@ecorp.com"}}   | null
3  | {"task": "reverse-transaction", "payload": {"transaction-id": 7, "reason": "declined"}}      | null
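
Adding the column is a one-liner, a sketch (the timestamp type is my assumption; it’s nullable on purpose, since null means “not locked”):

ALTER TABLE messages ADD COLUMN lock_expires_at TIMESTAMPTZ;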

Modify the worker to set the lock when it starts processing a message:

def callback(db_conn, payload):
    row_id = int(payload)

    # ------- NEW! -------
    row = db_conn.fetch_one("""
        UPDATE messages
        SET lock_expires_at = CURRENT_TIMESTAMP + INTERVAL '1 minute'
        WHERE
          id = $1
          AND (lock_expires_at IS NULL OR lock_expires_at < CURRENT_TIMESTAMP)
        RETURNING *;
        """,
        row_id,
    )
    # ----- end NEW! -----
    if row is None:
        return

    message = row.message
    match message.task:
        case "send-email": send_email(message.payload)
        case "reverse-transaction": reverse_transaction(message.payload)
        case _: print(f"Unknown task: {message.task}")

    db_conn.execute("DELETE FROM messages WHERE id = $1;", row_id)

db_conn.add_listener(channel="new_message", callback=callback)
await asyncio.Event().wait() # block forever so the listener doesn't close

By doing an UPDATE with a RETURNING clause, we ensure the row is only acquired by one worker at a time. If two workers try to acquire the same row at the same time, the first one takes the row lock; the second worker’s UPDATE waits for that lock, re-checks the WHERE clause once the first commits, finds lock_expires_at is now in the future, and so returns no row2.
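
Footnote 2 mentions the more explicit variant: take the row lock with SELECT ... FOR UPDATE NOWAIT inside a transaction before setting the lock timestamp. Roughly (my sketch of the idea, not the repo’s code):

BEGIN;

-- errors immediately with "could not obtain lock" if another worker
-- already holds the row, instead of waiting for it
SELECT id FROM messages
WHERE id = $1
  AND (lock_expires_at IS NULL OR lock_expires_at < CURRENT_TIMESTAMP)
FOR UPDATE NOWAIT;

UPDATE messages
SET lock_expires_at = CURRENT_TIMESTAMP + INTERVAL '1 minute'
WHERE id = $1;

COMMIT;

The worker treats the lock error the same way as finding no row: someone else got there first.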

History

After processing a message, we delete it from the messages table as that is the “live”/“active” queue. We can write the completed messages to a message_archive table.

id | message                                                                                      | result  | archived_at
---+----------------------------------------------------------------------------------------------+---------+--------------------
1  | {"task": "send-email", "payload": {"kind": "account-statement", "to": "ollie@example.com"}}  | success | 2024-08-02 14:53:02
2  | {"task": "send-email", "payload": {"kind": "account-statement", "to": "elliot@ecorp.com"}}   | success | 2024-08-02 14:53:05
3  | {"task": "reverse-transaction", "payload": {"transaction-id": 7, "reason": "declined"}}      | failed  | 2024-08-02 14:54:24
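
A sketch of the archive table’s shape (column types assumed from the values above):

CREATE TABLE message_archive (
    id          BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    message     JSONB NOT NULL,
    result      TEXT NOT NULL,
    archived_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);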

This is done in a transaction to ensure the delete from messages and the insert to message_archive happen together:

def callback(db_conn, payload):
    row_id = int(payload)

    row = db_conn.fetch_one("""
        UPDATE messages
        SET lock_expires_at = CURRENT_TIMESTAMP + INTERVAL '1 minute'
        WHERE
          id = $1
          AND (lock_expires_at IS NULL OR lock_expires_at < CURRENT_TIMESTAMP)
        RETURNING *;
        """,
        row_id,
    )
    if row is None:
        return

    message = row.message
    match message.task:
        case "send-email": send_email(message.payload)
        case "reverse-transaction": reverse_transaction(message.payload)
        case _: print(f"Unknown task: {message.task}")

    # ------- NEW! -------
    with db_conn.transaction():
        db_conn.execute("DELETE FROM messages WHERE id = $1;", row_id)
        db_conn.execute(
            "INSERT INTO message_archive (message, result) VALUES ($1, 'success');",
            message,
        )
    # ----- end NEW! -----

db_conn.add_listener(channel="new_message", callback=callback)
await asyncio.Event().wait() # block forever so the listener doesn't close

Dead messages

Similar to the archive table, we can have a message_dead table to store messages that failed to be processed. Unlike the archive table, these are considered “bad”: the messages are not in a terminal state, and we need to decide what to do with them. It’s up to you whether they need to be retried, fixed and re-added to the queue, or ignored and written to the archive table.

id | message                                                                                         | failure_reason | details                            | failed_at
---+-------------------------------------------------------------------------------------------------+----------------+------------------------------------+--------------------
1  | {"task": "receive-email", "payload": {"kind": "account-statement", "to": "ollie@example.com"}} | rejected       | unknown task: receive-email        | 2024-08-02 14:53:02
2  | {"task": "send-email", "payload": {"kind": "account-statement", "to": "elliot@ecorp.com"}}     | runtime_error  | Customer not found. Traceback: ... | 2024-08-02 14:53:05
3  | {"task": "reverse-transaction", "payload": {"transaction-id": 7, "reason": "declined"}}        | lock_expired   | null                               | 2024-08-02 14:54:24
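
Its shape mirrors the archive table; a sketch with assumed column types:

CREATE TABLE message_dead (
    id             BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    message        JSONB NOT NULL,
    failure_reason TEXT NOT NULL,
    details        TEXT,
    failed_at      TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);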

Now, when deleting from messages, we insert into message_archive if the task succeeded, or into message_dead if we hit an error:

def callback(db_conn, payload):
    row_id = int(payload)

    row = db_conn.fetch_one("""
        UPDATE messages
        SET lock_expires_at = CURRENT_TIMESTAMP + INTERVAL '1 minute'
        WHERE
          id = $1
          AND (lock_expires_at IS NULL OR lock_expires_at < CURRENT_TIMESTAMP)
        RETURNING *;
        """,
        row_id,
    )
    if row is None:
        return

    message = row.message
    match message.task:
        case "send-email": status, err = send_email(message.payload)
        case "reverse-transaction": status, err = reverse_transaction(message.payload)
        case _: status, err = "rejected", f"unknown task: {message.task}"

    with db_conn.transaction():
        db_conn.execute("DELETE FROM messages WHERE id = $1;", row_id)
        # ------- NEW! -------
        if status == "success":
            db_conn.execute(
                "INSERT INTO message_archive (message, result) VALUES ($1, 'success');",
                message,
            )
        else:
            db_conn.execute(
                """INSERT INTO message_dead (message, failure_reason, details)
                VALUES ($1, $2, $3);""",
                message,
                status,
                err,
            )
        # ----- end NEW! -----

db_conn.add_listener(channel="new_message", callback=callback)
await asyncio.Event().wait() # block forever so the listener doesn't close

Failed workers

Our workers rely on NOTIFY events to pick up messages, so nothing will re-deliver a message once its lock has expired. We can use pg_cron to check every minute for expired locks and re-send the notifications.

CREATE OR REPLACE FUNCTION expired_lock_renotify() RETURNS void AS $$
DECLARE
    message_record RECORD;
BEGIN
    FOR message_record IN
        SELECT id FROM messages WHERE lock_expires_at < NOW()
    LOOP
        PERFORM pg_notify('new_message', message_record.id::TEXT);
    END LOOP;
END;
$$ LANGUAGE plpgsql;

SELECT cron.schedule(
    'messages:expired_lock_renotify',
    '* * * * *',
    $$ SELECT expired_lock_renotify(); $$
);
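
pg_cron does need to be enabled before cron.schedule will work; the gist (the database name here is a stand-in for wherever your messages table lives):

-- postgresql.conf (pg_cron must be preloaded at server start):
--   shared_preload_libraries = 'pg_cron'
--   cron.database_name = 'app'  -- assumption: the database holding the queue
-- then, connected to that database:
CREATE EXTENSION IF NOT EXISTS pg_cron;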

While we’re at it, we should also make sure a worker doesn’t continue to process a job once it exceeds the timeout, even if it hasn’t hit an error3.

async def callback(db_conn, payload):
    row_id = int(payload)

    row = db_conn.fetch_one("""
        UPDATE messages
        SET lock_expires_at = CURRENT_TIMESTAMP + INTERVAL '1 minute'
        WHERE
          id = $1
          AND (lock_expires_at IS NULL OR lock_expires_at < CURRENT_TIMESTAMP)
        RETURNING *;
        """,
        row_id,
    )
    if row is None:
        return

    message = row.message

    # ------- NEW! -------
    try:
        async with asyncio.timeout(60):
            # the handlers need to be coroutines that actually await, so the
            # timeout has a chance to cancel them
            match message.task:
                case "send-email": status, err = await send_email(message.payload)
                case "reverse-transaction": status, err = await reverse_transaction(message.payload)
                case _: status, err = "rejected", f"unknown task: {message.task}"
    except asyncio.TimeoutError:
        status, err = "lock_expired", "still processing at lock expiry time"
    # ----- end NEW! -----

    with db_conn.transaction():
        db_conn.execute("DELETE FROM messages WHERE id = $1;", row_id)
        if status == "success":
            db_conn.execute(
                "INSERT INTO message_archive (message, result) VALUES ($1, 'success');",
                message,
            )
        else:
            db_conn.execute(
                """INSERT INTO message_dead (message, failure_reason, details)
                VALUES ($1, $2, $3);""",
                message,
                status,
                err,
            )

db_conn.add_listener(channel="new_message", callback=callback)
await asyncio.Event().wait() # block forever so the listener doesn't close

Error visibility

I really hate silent failures. Right now, if a message fails and ends up in the message_dead table, or a lock times out and a message has to be re-processed, or no workers are picking up messages at all, we have no way of knowing about it.

I don’t want these notifications/logs to be dependent on my worker code, because I want to know about the health of the system which includes when the workers are unhealthy. We’re relying on Postgres as a single point of failure here already (if our database goes down we have bigger problems), so we can use it to do the monitoring too.

We can emit a warning log to the Postgres logs when re-notifying for expired locks:

CREATE OR REPLACE FUNCTION expired_lock_renotify() RETURNS void AS $$
DECLARE
    message_record RECORD;
BEGIN
    FOR message_record IN
        SELECT id FROM messages WHERE lock_expires_at < NOW()
    LOOP
        --- NEW ---
        RAISE WARNING 'Error: message_record.id=% lock expired, re-notifying', message_record.id;
        --- end NEW ---
        PERFORM pg_notify('new_message', message_record.id::TEXT);
    END LOOP;
END;
$$ LANGUAGE plpgsql;

SELECT cron.schedule(
    'messages:expired_lock_renotify',
    '* * * * *',
    $$ SELECT expired_lock_renotify(); $$
);

We can do the same whenever a message is inserted into the message_dead table:

CREATE OR REPLACE FUNCTION dead_message_notify() RETURNS trigger AS $$
BEGIN
    RAISE WARNING 'Error: message_dead.id=% has failure_reason=%', NEW.id, NEW.failure_reason;
    -- we could set up workers to automatically try reprocess these messages:
    PERFORM pg_notify('dead_message', NEW.id::TEXT);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER dead_message_trigger
AFTER INSERT ON message_dead
FOR EACH ROW
EXECUTE FUNCTION dead_message_notify();

And we can do the same when a message hasn’t been picked up by a worker at all:

CREATE OR REPLACE FUNCTION non_locked_notify() RETURNS void AS $$
DECLARE
    message RECORD;
BEGIN
    FOR message IN
        SELECT id FROM messages
        WHERE lock_expires_at IS NULL
          AND date_trunc('minute', created_at) < date_trunc('minute', now())
    LOOP
        RAISE WARNING 'Error: message.id=% not picked up, re-notifying', message.id;
        PERFORM pg_notify('new_message', message.id::TEXT);
    END LOOP;
END;
$$ LANGUAGE plpgsql;

SELECT cron.schedule(
    'messages:non_locked_notify',
    '* * * * *',
    $$ SELECT non_locked_notify(); $$
);

Is that good enough?

I think so.

Check out the full code on github, it’s more complete and it actually runs.


  1. Why not just use these as our queue? We could, but: no persistence, we don’t get any of the other stuff we do later on in this article, and the payload size is limited to 8kB (what if we want big messages?).

  2. We can make this more efficient by doing a SELECT ... FOR UPDATE NOWAIT before the UPDATE ... RETURNING inside a transaction, but this simpler way is fine too.

  3. This will prevent a worker from holding onto a message forever if it’s stuck in a loop, but it will also prevent a worker from processing a message that takes a long time—in that case we would need to increase the timeout.