For several projects I've opted for the even dumber approach that works out of the box with every ORM/query-DSL framework in every language: using a normal table with SELECT FOR UPDATE SKIP LOCKED.
I've done something even simpler without locks (and no transaction logic): I select a row, then try to update a field marking it as taken. If 1 row is affected, it's mine. If 0, someone else got it before me and I select a new row.
I've used this for tasks at big organizations without issue. No need for any special deployments or new infra. Just spin up a few worker threads in your app, and perhaps a thread to reset abandoned tasks. In three years that never actually happened, though, since everything was wrapped in a try/catch that would add the task back to the queue, and our Java app was damn stable.
PSA: This is a read-modify-write pattern, thus it is not safe under concurrency unless a transaction isolation level of SERIALIZABLE is specified, or some locking mechanism is used (select for update etc).
The part about checking the number of affected rows hints at using `UPDATE ... WHERE ...` which should act as an atomic CAS regardless of isolation level.
Edit: To clarify, I mean `SELECT id WHERE used = 0` followed by `UPDATE ... SET used = 1 WHERE id = ... AND used = 0`
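In other words, something like the following sketch, where the `tasks` table and the `$1` parameter are placeholders:

-- step 1: find a candidate (no lock taken)
select id from tasks where used = 0 limit 1;

-- step 2: try to claim it; the "used = 0" check makes this a CAS
update tasks set used = 1 where id = $1 and used = 0;
-- 1 row affected: the task is ours
-- 0 rows affected: another worker got it first; go back to step 1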
I don't get it :(. Why could the same task be executed more than once? From my understanding, if the UPDATE is atomic, only one worker will be able to set `used = 1`. If the update statement is not successful (affected != 1), then the worker should drop the task and do another select.
With a transaction isolation level below SERIALIZABLE you can have two transactions that both read the old row (with `used = 0`) at the time they perform the update (but before they commit the transaction). In that case, both transactions will have performed an update (rows affected = 1).
Why would both transactions see `used = 0`? The DB server tries to isolate transactions and actively hides effects of other transactions that have not committed yet.
This is not true in postgres. When the second transaction tries to update the row, it will wait for the first transaction to commit first and then recheck the WHERE.
This should be safe under SI (other than the ABA issue, which isn't even fixed with serializable). The update forces a W-W conflict, which is sufficient to make the behavior serializable under SI (and therefore, I think but am not sure, PG's RR level too).
I guess you update it with the assigned worker id, where the "taken by" field is currently null? Does it mean that workers have persistent identities, something like an index? How do you deal with workers being replaced, scaled down, etc?
Just curious. We maintained a custom background processing system for years but recently replaced it with off the shelf stuff, so I'm really interested in how others are doing similar stuff.
No, just `UPDATE ... SET taken = 1`. If it changed the row, you claimed it. If it didn't, someone else updated it before you.
Our tasks were quick enough that all fetched tasks could always be completed before a scale-down / new deploy etc.; we stopped fetching new ones when the signal came, so each worker just finished what it had. I updated above: we did have logic to monitor whether a task got taken but never reached a finished status, but I can't remember it ever actually reporting anything.
I would set the taken field to a timestamp. Then you could have a cleanup job that looks for any lingering jobs aged past a reasonable timeout and null out the field.
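Something like this sketch (table and column names are made up, and the timeout is illustrative):

-- claim: stamp the row only if no one holds it
update jobs
set taken_at = now()
where id = $1 and taken_at is null;

-- periodic cleanup: release claims older than a reasonable timeout
update jobs
set taken_at = null
where taken_at < now() - interval '15 minutes'
  and completed_at is null;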
We have a "status flag" column which is either Available, Locked or Processed (A, L and P), an Updated column with a timestamp of when it was last updated, and a Version counter.
When grabbing a new message it selects "Available, or (Locked with an Updated timestamp older than the configured timeout)". If one is found, it immediately tries to set the Locked status and the Updated timestamp and bump the Version counter, with the previous values of Status and Version having to match. If the update fails, it retries getting a new message.
If the Version counter is too high, it moves the message to the associated dead-letter table, and retries getting a new message.
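If I follow, the grab step is roughly this pair of queries (a sketch; column names, status letters, and the timeout value are assumptions based on the description above):

-- find a candidate: available, or locked but stale
select id, status, version
from messages
where status = 'A'
   or (status = 'L' and updated < now() - interval '5 minutes')
order by updated
limit 1;

-- optimistic lock: only wins if status and version are unchanged
update messages
set status = 'L', updated = now(), version = version + 1
where id = $1 and status = $2 and version = $3;
-- 0 rows affected means we lost the race: retry with a new message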
This isn't for high performance. I tested it and got 1000 messages/sec throughput with a handful of producers and consumers against a test DB instance (limited hardware), which is plenty for us.
I wrote it to be simple so we could easily move to something AMQP-ish like RabbitMQ or Azure Service Bus when needed. Overall it was quite easy to implement and has served us well so far.
It won't work with a timestamp, because each write will affect 1 row since the writes happen at different times; setting a boolean is static.
That is the sort of thing that bites you hard when it finally bites. It might run perfectly for years, but one period of flappy downtime at a third party or a slightly misconfigured DNS will get you.
But compared to our rabbit setup where I work now, it was dead stable. No losing tasks or extra engineering effort on maintaining yet another piece of tech. Our rabbit cluster acting up has led to multiple disasters lately.
Agreed, I've had my own rabbit nightmares. But setting up a more robust queue on postgresql is easy, so you can easily gain a lot more guarantees without more complexity.
I've done this successfully with a web service frontend that retrieves jobs from a SQL table queue and sends them to workers for processing. That web service ran without a hitch for a long time, serving about 10 to 50 job consumers on fast, highly concurrent queues.
My approach was:
- Accept the inbound call
- Generate a 20 character random string (used as a signature)
- Execute a SQL query that selects the oldest job without a signature, writes the signature, and returns the primary key of the job that was updated (a sketch of these queries appears below).
- If it errors for any reason, loop back and attempt again, but only 10 times, since beyond that some underlying issue exists (10 collisions is statistically improbable for my use case).
- Fetch the row for the primary key returned by that SQL query and compare its signature to my random one.
- If a hit, return the job to the caller
- If a miss, loop back and start again, incrementing attempts by 1.
The caller has to handle the possibility that a call to this web service won't return anything, either due to no jobs existing, or the collision/error threshold being reached.
In either case, the caller backs off for its configured time, then calls again.
Callers are usually in 'while true' loops, only exiting if they get an external signal to close, or on an uncontrolled crash.
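For illustration, the claim-and-verify steps could look like this (a sketch; the table and column names are my assumptions, not the original implementation):

-- claim the oldest unsigned job by writing our random signature
update jobs
set signature = $1 -- the 20-character random string
where id =
  ( select id
    from jobs
    where signature is null
    order by created_at
    limit 1
  )
returning id;

-- verify: re-read the row and compare signatures
select signature from jobs where id = $2;
-- match: hand the job to the caller
-- mismatch: another worker overwrote it; loop and try again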
If you take this approach, you will have a function or a web service that converts the SQL table into a job queue service. When you do that, you can build metrics on the amount of collisions you get while trying to pull and assign jobs to workers.
I had built-in processes that would sweep through jobs that were assigned (had a job signature) but weren't marked as complete; those were actioned to handle the case of a crashed worker.
There are many, many other features that proper job queues offer, but those usually mean more dependencies, code libraries, and containers, so just build in the functionality you need.
If it is accurate, fast enough, and stable, you've got the best solution for you.
The reason you want SKIP LOCKED is so that Postgres can automatically skip rows that are being concurrently accessed to update their "status". You are right that if you update a "status" field you don't really need to worry about advisory locks or skipping locked rows, but it still helps with performance once you have a decent number of concurrent consumers polling the table.
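Concretely, a poll under that scheme might look like this (a sketch; the schema and status values are assumed):

begin;

-- rows already locked by other consumers are skipped instead of
-- blocking, so concurrent polls don't queue up behind each other
select id from jobs
where status = 'available'
order by created_at
for update skip locked
limit 1;

-- mark the claim and commit to release the row lock
update jobs set status = 'running' where id = $1;

commit;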
I recently got introduced to this system at work, and also built a new job using it. It works fine, but since I had to implement work stealing to deal with abandoned jobs in a timely manner, I wouldn't dare to use it for actions that absolutely must not happen twice.
Exactly-once is only meaningfully possible if you have a rollback for tasks of unknown completion state - for example if the task involves manipulating the same database as the one controlling the task execution. Otherwise, it becomes the (impossible to solve) two-generals problem between updating the task status and performing the task.
There is actually another possibility: there must be a way to check whether the receiving system has received the message. But this only works if there are no "rogue" senders.
Agenda uses this, and we found out the hard way on Mongo 4 that it can lead to Mongo spinning the CPU at 100% if it gets too many at once. No idea if they've fixed it in later versions.
I recently published a manifesto and code snippets for exactly this in Postgres!
-- atomically claim and remove one task; rows locked by
-- concurrent consumers are skipped rather than waited on
delete from task
where task_id in
  ( select task_id
    from task
    order by random() -- use tablesample for better performance
    for update
    skip locked
    limit 1
  )
returning task_id, task_type, params::jsonb as params;
Holding a transaction open for the duration of a request to an external service makes me nervous. I've seen similar code lock up the database and bring down production. Are you using timeouts and circuit breakers to control the length of the transactions?
Yes, you absolutely need to set a reasonable idle transaction timeout to avoid a disaster (bugs in the code happen) - this can also be done globally in the database settings.
This article was written in 2015, a year before idle_in_transaction_session_timeout parameter was added (in Postgres 9.6) - which is unfortunately still disabled by default, but that's the easiest way to make sure no transaction sits idle for too long.
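For example (the 30-second value is only an illustration):

-- per session:
set idle_in_transaction_session_timeout = '30s';

-- or server-wide:
alter system set idle_in_transaction_session_timeout = '30s';
select pg_reload_conf();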
In my experience, a queue system is the worst thing to find out isn't scaling properly, because once you discover your queue system can't architecturally scale, there's no easy fix that avoids data loss. You talk about "several thousand background jobs", but queues are generally measured in terms of Little's Law [1], for which you need rates: the average task arrival (enqueue) rate and the average task duration. For example, 100 tasks/s arriving with an average duration of 2 s means roughly 200 tasks in flight at any moment. Raw numbers don't mean that much.
In the beginning you can do a naive UPDATE ... SET, which locks way too much. You can make your locking more efficient (UPDATE with a SELECT subquery for dequeues, SELECT FOR UPDATE SKIP LOCKED), but eventually your dequeue queries will throttle each other's locks and your queue will grind to a halt. You can try to disable enqueues at that point to give your DB more breathing room, but then you lose data on the rejected enqueues, and it will mostly be your dequeues locking each other out anyway.
You can try to quickly shard out your task tables to avoid the lock contention, and that may work, but it's brittle to roll out across multiple workers and can result in data loss. You can of course drop a random subset of tasks, but that is data loss by definition. Any of these options is not only highly stressful in a production scenario but also very hard to recover from without a ground-up rearchitecture.
Is this kind of a nightmare production scenario really worth choosing Boring Technology? Maybe if you have a handful of customers and are confident you'll be working at tens of tasks per second forever. Having been in the hot seat for one of these I will always choose a real queue technology over a database when possible.
> and are confident you'll be working at tens of tasks per second forever.
It's more like a few thousand per second, and enqueues win, not dequeues like you say... on very small hardware without tuning. If you're at tens of tasks per second, you have a whole lot of breathing room: don't build for 100x current requirements.
That link is just raw enqueue/dequeue performance. Factor in workers that perform real work or execute remote calls and the numbers change. Also, I find that when jobs have high variance in duration, performance degrades significantly.
> This doesn't really make sense to me. To me, the main problem seems to be that you end up having a lot of snapshots around.
The dequeuer needs to know which tasks to "claim", so this requires some form of locking. Eventually this becomes a bottleneck.
> don't build for 100x current requirements
What happens if you get 100x traffic? Popularity spikes can do it, so can attacks. Is the answer to just accept data loss in those situations? Queue systems are super simple to use. I'm counting "NOTIFY/LISTEN" on Postgres as a queue, because it is a queue from the bottom up.
> Factor in workers that perform work or execute remote calls and the numbers change.
These don't occur on the database server, though... This merely affects the number of rows currently claimed.
> The dequeuer needs to know which tasks to "claim", so this requires some form of locking. Eventually this becomes a bottleneck.
These are just try-locks, though; the row locks are not contended. The big thing you run into is having lots of snapshots around and having to skip a lot of claimed rows on each dequeue.
> What happens if you get 100x traffic? Popularity spikes can do it, so can attacks.
If you get 100x the queueing activity for batch jobs, you're going to have stuff break well before the queue. It's probably not too easy to get 100x the drain rate, even if your queue system can handle it.
This scales well beyond 100M batch tasks per day, which gets you to 1M users with 100 tasks/day each.
Throttle the inputs. Rate-limiting doesn’t belong in the data layer.
While throttling due to organic popularity isn’t great, I’d argue the tradeoffs might be worthwhile. If it looks like the spike will last, stand up Redis during the throttling, double-write, and throttle down the Postgres queue until it’s empty. If you really need to, take a 15 minute outage to just copy data over.
What happens when you get 500x the traffic or 50x?
How does the system behave when the traffic rate is higher than what it was designed for or can currently handle? That number will always exist, even in a "scalable" system; you won't be able to add capacity at the same rate the work increases.
NOTIFY/LISTEN isn't a queue; it has broadcast semantics. Postgres queueing is really just the SELECT FOR UPDATE SKIP LOCKED; NOTIFY/LISTEN lets you reduce the latency a bit but isn't essential.
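A sketch of how the two combine (channel, table, and column names are made up):

-- producer: the notification is delivered when the transaction commits
insert into jobs (payload) values ('...');
notify new_jobs;

-- consumer: sleep on the channel instead of polling in a tight loop
listen new_jobs;
-- on wakeup, claim work the usual way:
-- select id from jobs where status = 'available'
-- for update skip locked limit 1;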
If you find yourself in that situation, migrating to a more performant queuing solution is not that much of a leap. You already have an overall system architecture that scales well (async processing with a queue).
_Ideally_ the queuing technology is abstracted from the job-submitters/job-runners anyway. It's a bit more work if multiple services are just writing to the queue table directly.
I agree that the _moment_ the system comes to a screeching halt is definitely not fun.
You are going to have the same scaling issues with your datastore. I don't really understand why you say that your dequeue queries will throttle each other's locks and grind the queue to a halt. Isn't avoiding that the whole point of SKIP LOCKED?
>Applied to job records, this feature enables simple queue processing queries, e.g. SELECT * FROM jobs ORDER BY created_at FOR UPDATE SKIP LOCKED LIMIT 1.
Also, Postgres partial indexes can be quite helpful in situations where you want to persist and query intermediate job-lifecycle state and don't want multiple rows or tables tracking one type of job queue.
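For example, a partial index over just the claimable rows keeps the dequeue lookup fast even when the table is full of finished jobs (column names are assumed):

-- only rows a dequeue can claim are in the index;
-- completed jobs drop out of it automatically
create index jobs_available_idx
on jobs (created_at)
where status = 'available';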
SKIP LOCKED is useful until you have to maintain order for a group of messages sharing a "group_id", so that sets of related messages are sent one after the other.
Then you probably have to write complicated queries or use partitions of some sort.
The article says he also uses an ORDER BY clause, but I wonder if that will severely limit throughput, since all messages need to be sorted on each lookup; this can probably be solved by introducing an index.
It seems strictly worse to use ORDER BY in this case, since if you're using SKIP LOCKED you should be doing parallel processing anyway, and if you're doing parallel processing, ordering is already going out the window.
Unless you can guarantee that the processing time of each job is exactly the same, if you have multiple workers processing the same queue, you can’t order anything except the start time.
You can use locks to effectively break the queue into sub queues so that each sub queue is only being processed by 1 worker. Then you can order that sub queue.
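A sketch of that idea, assuming a separate row per group to lock on (names are made up):

begin;

-- claim a whole group; groups another worker is draining are skipped
select group_id
from message_groups
for update skip locked
limit 1;

-- process that group's messages strictly in enqueue order
select id, payload
from messages
where group_id = $1 and not processed
order by created_at;

commit;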
Jobs should be attempted in the same order/priority they are enqueued; that's the meaning of the word "queue". That they take varying amounts of time is another matter.
Queue can clearly mean "work that needs to be completed" not necessarily 'work completed in order'. Your definition is much stricter than it needs to be for most use cases.
There is clearly a conceptual difference between a set of things from which you pull things out randomly, and a queue. A queue always has intrinsic criteria to select the next item to be pulled out.
There are many times when the start order doesn’t really matter, and the additional sorting overhead isn’t worth it. In those cases people will still tend to refer to the entity holding the jobs to be processed as a queue despite the fact that it doesn’t strictly follow FIFO order.
If they are being technically precise, queue isn’t the correct term, but language changes with context and time. Either way the implementation isn’t wrong if strict start order has been considered and isn’t important.
I’m not confusing anything. I’ve seen random-selection “job queues” implemented many times. As long as you truly don’t care about start order, it’s fine to trade it for increased throughput.
It means that you are telling pg that you don’t care about order, so it is free to optimize the query in whatever way it wants to. The order can change query to query depending on numerous external factors.
I’m not using pg itself as an example. I’m using a specific implementation of a “job queue” built with pg.
I’ve seen and you can search for and find many implementations of “job queues” using relational databases where job start order guarantees are traded away for throughput.
You have no ordering guarantees, so how can order be important? If 4 work items are scheduled on 4 independent workers, you have no guarantee which will start first or finish first.
I think the order matters, at least because you want some FIFO approximation; otherwise some tasks could be stuck in the queue forever and never be picked up.
https://www.pgcasts.com/episodes/the-skip-locked-feature-in-...
It’s not “web scale” but it easily extends to several thousand background jobs in my experience