Queue Despair: Ordering and Poison Messages (openmymind.net)
68 points by tbonesteaks on Dec 8, 2021 | 46 comments



> Adding a timestamp to each message is an easy way for consumers to discard any out-of-order messages.

Not correct, but it's very easy to think timestamps will solve this. Timestamps don't work because system clocks aren't precisely synchronized across machines. Say Producer A creates the first event and Producer B creates a second event 50ms later (imagine a single row being updated twice in quick succession), but Producer A's clock is 100ms ahead of Producer B's, and B's event reaches the consumers first (variable network latency). The event from Producer A will then look like the latest one from a timestamp perspective and will overwrite Producer B's event.

One way to solve it is to not use timestamps at all, but a monotonically increasing version number associated with the row, bumped on every event/update and sent along with the event message payload. The book Designing Data-Intensive Applications goes into this problem in depth; I recommend it to anyone discussing architecture. Issues like this will seem obvious after reading it.
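
A rough sketch of that version-number approach (names are illustrative; a real consumer would persist the version map durably):

  # Last-write-wins by per-row version number (illustrative names).
  # The producer bumps the version atomically on every update and
  # sends it with the message; the consumer discards anything older.
  latest_versions = {}  # row_id -> highest version applied so far

  def apply_if_newer(message, apply_update):
      # apply_update stands in for the consumer's actual write
      row_id, version = message["row_id"], message["version"]
      if version <= latest_versions.get(row_id, 0):
          return False  # stale or duplicate: safe to discard
      latest_versions[row_id] = version
      apply_update(message["payload"])
      return True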


Yep, and there's no straightforward way to know if you missed a message by looking at a timestamp alone. If I receive a message at time "1" then time "3", how will I know I was meant to have seen something at time "2"?

Sequence numbering is how FIX does it, and I think it's quite neat that it does this at a _protocol_ level. This means that a FIX client/engine will typically take care of sequence numbering, out-of-order detection (which can happen during resend requests), buffering any ahead-of-sequence messages, requesting gap fills, etc. It will only present your application-layer code with in-order messages.
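
A simplified sketch of that engine-side behavior (the shape of the idea, not an actual FIX implementation):

  # Deliver in-order messages, buffer ahead-of-sequence ones, and ask
  # for a resend when a gap is detected.
  class Sequencer:
      def __init__(self, request_resend):
          self.expected = 1
          self.pending = {}  # seq -> message that arrived ahead of time
          self.request_resend = request_resend  # callback(from_seq, to_seq)

      def on_message(self, seq, msg):
          if seq < self.expected:
              return []  # duplicate from a resend: drop it
          if seq > self.expected:
              self.pending[seq] = msg
              self.request_resend(self.expected, seq - 1)  # ask for the gap
              return []
          ready = [msg]
          self.expected += 1
          while self.expected in self.pending:
              ready.append(self.pending.pop(self.expected))
              self.expected += 1
          return ready  # only in-order messages reach the app layer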

I'm not aware of any universal pattern for dealing with poison pill messages. Completeness detection and dealing with messages that crash your system are two separate concerns.


The implication in the FIX use case is that you should have a single writer/consumer of these events, at least for sequencing purposes. Attempting to coordinate sequencing across multiple threads (or computers) will just slow you down. Consider that a CAS or volatile write is ~10x slower on modern x86 than a single thread tearing through the same items without any contention. A 10x slowdown is pretty much the best-case scenario if you must have a contended resource shared between multiple threads. You only lose more orders of magnitude the further you go into this rabbit hole. The LMAX Disruptor is a good example of how we can rethink these sorts of problems and solve them in novel ways. I have linked a document that I think provides a really good perspective on this: [0].

The poison pill case is a business logic thing on either side of the queue. You can either do validation up-front before sticking requests into the queue, or after the fact when processing in batches. Either way, you will ultimately need to be able to handle problems on both sides of the fence. Something that doesn't look like "poison" on the way into the queue could become pretty nasty with certain emergent state as events are processed on the other side.

[0]: https://lmax-exchange.github.io/disruptor/disruptor.html


> Not correct, but it's very easy to think timestamps will solve this. Timestamps aren't good because system times aren't synced across different computers precisely.

They can be. That's what Google Spanner does, using GPS and atomic clocks. It's not hard or expensive: an atomic clock plus GPS will set you back under $1000 for one in a nice box (https://www.ebay.com/itm/174750548607), or $200 for a bare PCB (https://www.ebay.com/itm/353611628534). Apparently that gets you to within about 1e-11 seconds, which is roughly 10 picoseconds.

But that doesn't solve the problem. Let's say you have two event producers, and they produce events at times t and t+1. Once both arrive, it's trivial to process them in the right order. The problem is the unreliable network connecting you to these producers. Those atomic clocks can guarantee the events at t and t+1 are distinguishable even if they are just 10ps apart, but how long do you wait for t to arrive before you decide there was no event at t and process the event at t+1? I can absolutely guarantee that whatever waiting time you decide is reasonable, the universe will at some point screw you over and drop the event from t on your doorstep just after you've decided to process t+1.
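
To make the dilemma concrete, here is a sketch of the only real option: a reordering buffer that trades latency for completeness, with anything arriving after its watermark has passed handed off out of band (names are illustrative):

  import heapq, itertools, time

  class ReorderBuffer:
      def __init__(self, max_delay, on_late):
          self.max_delay = max_delay  # how long we're willing to wait for t
          self.on_late = on_late      # called for events behind the watermark
          self.heap, self.tie = [], itertools.count()
          self.watermark = float("-inf")

      def add(self, event_time, event):
          if event_time < self.watermark:
              self.on_late(event)  # arrived after we gave up waiting
          else:
              heapq.heappush(self.heap, (event_time, next(self.tie), event))

      def drain(self, now=None):
          # release, in timestamp order, everything older than now - max_delay
          now = time.time() if now is None else now
          self.watermark = now - self.max_delay
          out = []
          while self.heap and self.heap[0][0] <= self.watermark:
              out.append(heapq.heappop(self.heap)[2])
          return out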

The issue isn't that it's difficult; it's that you are living in a state of sin if you believe the problem is solvable given those premises.

PS: Google Spanner doesn't attempt to do the impossible. In Spanner's case there is a single event producer, and Spanner is "merely" trying to record the event consistently across multiple nodes. If there are multiple event producers it will serialise them in some order, but if there were two disconnected, independent Spanners out there processing the same events from the same producers, Google is not claiming they would decide on the same order. That would need a God more powerful than Google.


> One way to solve is it to use not use timestamps, but use a monotonically increasing version number associated with a row that gets updated for every event/update or whatever and is sent along with the event message payload.

This is the concept known as a vector clock. I suggest taking a look at vector clocks and Lamport clocks (named after Leslie Lamport); they're very useful in distributed systems built on messages.
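
A minimal Lamport clock sketch, for the curious (a vector clock generalizes this to one counter per node):

  # A logical counter that only moves forward, giving a causal
  # (not wall-clock) ordering of events across processes.
  class LamportClock:
      def __init__(self):
          self.time = 0

      def tick(self):  # call before any local event or send
          self.time += 1
          return self.time

      def observe(self, remote_time):  # call on receiving a message
          self.time = max(self.time, remote_time) + 1
          return self.time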


FWIW, GPS clocks and PTP can keep clocks synchronized to well below a microsecond.


Yes, this bit me recently. Had to add an atomic index field instead, as some messages were arriving out of order because their timestamps were the same as other messages.


I think queues are the wrong abstraction for modeling business processes. That's why a trivial issue like a non-recoverable failure while processing a message becomes such a headache. The same goes for ordering. An orchestrator like temporal.io lets you model your business use case with higher-level abstractions that hide all this low-level complexity.

Disclaimer: I'm the tech lead of the temporal.io open source project and the CEO of the affiliated company.


It is a problem only if you are mixing up application layers.

If you keep your queueing system and business process as separate layers, with the queueing system serving only as a means of transporting business events, then you can make it all work correctly.

Think in terms of the IP protocol (as in TCP/IP). It is unsuitable for transmitting financial transactions on its own, yet financial transactions can be made to work on top of it if you separate the layers and treat IP only as a component mechanism for getting data from A to B.


I think we are in agreement here. Temporal does exactly what you described: it uses queues to transport tasks to processes, but it completely hides them from the business process code.

The issue is that 99.9% of developers use queues directly in their business applications.


Hiding this complexity is useful if it also means handling it. What are the key patterns you apply in temporal to hide it? I’ve had a look at temporal and find it really interesting.


Instead of using queues directly, a Temporal Workflow (which is written in plain code) schedules an Activity that the system is responsible for; behind the scenes, the Activity is just an item put on a queue. Activities have retry policies, which are also handled by the system. If an Activity attempt fails and should not be retried according to the policy, an exception is thrown in the Workflow to be handled in code.

Using the TypeScript SDK, you can catch that exception here: https://github.com/temporalio/samples-typescript/blob/9d9108...
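
For readers without the SDK handy, this is roughly the shape of the pattern; note that this is illustrative pseudocode, not the actual Temporal API:

  # Illustrative pseudocode only: NOT the Temporal SDK API.
  class ActivityFailed(Exception):
      pass

  def schedule_activity(task, arg, max_attempts=3):
      # Stand-in for the system: in Temporal the task is an item on a
      # queue, executed by a worker, with retries handled behind the scenes.
      for _ in range(max_attempts):
          try:
              return task(arg)
          except Exception:
              pass  # a real retry policy adds backoff between attempts
      raise ActivityFailed(task.__name__)

  def run_workflow(order, charge_card, notify_support):
      try:
          return schedule_activity(charge_card, order)
      except ActivityFailed:
          # a terminal failure surfaces as an exception in plain workflow code
          schedule_activity(notify_support, order)
          return None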


Ordering is too expensive. Don't ever count on it when using an asynchronous queue. It's akin to storing sessions in a cache: you're mixing your metaphors.

A queue should NEVER drop messages - otherwise it's a shit queue. Or you have a bug in your application code that needs to be fixed.

Poison messages are DEFINITELY a smell. They mean you essentially have a broken interface contract: the code adding messages expects one thing, while the code processing messages expects another. It needs to be fixed by clearly documenting your message queue's expectations and fixing your code. Most likely you need to add clear expectations for the lifetime of a message.
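
One way to enforce that contract is to validate against a shared schema on the producing side, before a message ever enters the queue. A sketch, with illustrative field names:

  REQUIRED = {"id": str, "kind": str, "payload": dict, "expires_at": float}

  def validate(message):
      for field, expected in REQUIRED.items():
          if not isinstance(message.get(field), expected):
              raise ValueError(f"bad message: {field!r} must be {expected.__name__}")

  def enqueue(queue, message):
      validate(message)      # fail fast on the producing side
      queue.append(message)  # queue is a list standing in for the transport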


...it might be a smell, but they are bound to happen eventually: corrupt user state, unhandled corner cases, etc. Better to plan for that in advance.


This is why I prefer putting the "bad" message into a dead-letter queue instead of dropping it entirely (and alerting on messages going into that queue). This unblocks the queue so it can keep working, and lets you decide later whether the message should be dropped or re-processed.
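
A sketch of that pattern (the handler is passed in; in practice most brokers can do this for you):

  MAX_ATTEMPTS = 3

  def consume(main_queue, dead_letters, process):
      # main_queue and dead_letters are lists standing in for broker queues
      while main_queue:
          msg = main_queue.pop(0)
          try:
              process(msg)
          except Exception:
              msg["attempts"] = msg.get("attempts", 0) + 1
              if msg["attempts"] >= MAX_ATTEMPTS:
                  dead_letters.append(msg)  # alert on the depth of this queue
              else:
                  main_queue.append(msg)    # retry later; queue keeps moving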


> Ordering is too expensive. Don't ever count on it when using an asynchronous queue

In a field where precision is absolutely necessary, it's unfortunate to use the term queue to describe something that is not a queue.


Curious, what's your definition of queue?


By definition a queue is FIFO, which requires preserving ordering, or else you could end up with FIRO (first in, random out)


Rejecting a poison message explicitly IS sufficiently processing it.

It's common to have windows of time where two or more sides may not agree on what happened before they lost communication. Most of the problem can be solved with idempotency: when the peer retries, the receiver recognizes it is looking at a duplicate transaction and can discard it while indicating that it succeeded.
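
A sketch of that idempotent receiver (in production the processed set lives in durable storage and is updated atomically with the side effect):

  processed = set()  # transaction ids we have already applied

  def handle(txn_id, payload, apply_transaction):
      if txn_id in processed:
          return "ok"             # duplicate: report success, do nothing
      apply_transaction(payload)  # the actual business side effect
      processed.add(txn_id)
      return "ok"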


A message that gets "skipped" - for lack of a better term - because it's already been processed, and a poison message, are not the same thing.


Yes, this is a hard problem. Not even partitioning by tenant will always help you.

This is fundamentally equivalent to database ACID constraints, and the other modes described are great in the same way that relaxing some of the ACID constraints in your code (say, by not being SERIALIZABLE) gets you nice things in return (like reads never deadlocking).

If you can't know if message N will change the outcome of processing message N+M, then you have to resolve that before you can proceed - as surely as a serializable database will wait for the outcome of transaction N before being able to proceed with N+M.


Apart from your unit tests, there is no such thing as "messages do need to be strictly ordered and messages cannot be lost". You can WISH for messages to come in the right sequence, and even count on it for optimization, but if an event tracks something that happened, and that event comes late, even after a week, your system cannot say "Too bad, I told you, only events in the right order here" (which, at that point, would be invalid in itself, as you'd have missed some).


If you want a reliable system it's very important. These concepts relate closely to Isolation and Consistency in ACID. Just take a look at the different levels at https://jepsen.io/consistency

There are certainly fields with lower correctness requirements where inconsistent data is not such a big deal though.


> Even if we ignore poison messages, strict ordering on its own isn't that easy to pull off. Namely, you're limited to a single consumer and can only fetch one message at a time.

Yup. If you want to be sure, you need to persist, for each entity X, which message you have already processed, and ignore older ones. You also have to handle race conditions where two messages are processed almost simultaneously by two different consumers, e.g. with a lock in a db. Both are annoying; ideally I could just process messages without any care.

Spent all day trying to architect this for a new queue where ordering matters but we need lots and lots of consumers working at the same time, prefetching messages. My case is actually quite similar to the one in the article about a "stream of vehicle positions": I only really care about the latest one. The problem is that it's hard to know which is the latest one without a db to check whether I've already processed something more recent. Any other ideas on how to solve this efficiently? As in, strategies to handle messages arriving out of order, so I can drop that as a requirement.


Have you considered frameworks like Orleans? Presumably you would have an individual actor “grain” for each vehicle and it would serialise the messages to it for you.

The state of the vehicle is kept in memory for some time.
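
For what it's worth, the same idea can be sketched without a framework: hash each vehicle to one worker queue, so a given vehicle's messages are handled serially while different vehicles proceed in parallel (illustrative code):

  import queue, threading

  NUM_WORKERS = 8
  queues = [queue.Queue() for _ in range(NUM_WORKERS)]

  def dispatch(vehicle_id, message):
      # the same vehicle always hashes to the same worker
      queues[hash(vehicle_id) % NUM_WORKERS].put((vehicle_id, message))

  def worker(q, update_position):
      while True:
          vehicle_id, message = q.get()
          update_position(vehicle_id, message)  # your per-vehicle handler

  def start(update_position):
      for q in queues:
          threading.Thread(target=worker, args=(q, update_position),
                           daemon=True).start()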


Sidekiq has a lot of code to deal with both of these issues.

Sidekiq does not guarantee ordering within a queue; that would be a terrible, very expensive guarantee. Developers don't want total ordering of jobs within a queue; they want to know that Job A will fully execute before Job B. There might be 1000 other jobs in the queue that are completely independent of that ordering, but we've screwed ourselves by forcing total queue ordering. Instead, Sidekiq Pro provides a workflow API, Sidekiq::Batch, which lets the developer author higher-level workflows for Job A -> Job B and provides the ordering guarantee that way.

For poison pills, we detect jobs which were running when a Sidekiq process died. If this happens multiple times, the job will be sent to the dead letter queue so the developer can deal with them manually. If they were part of a Batch, the workflow will stall until the developer fixes the issue and executes the job manually to resume the workflow.


Ordering is an expensive property.

I built a system where

  1. Sensor events were picked up by a ZWave device connected to Samsung SmartThings
  2. SmartThings would call an AWS Lambda function I wrote (SmartThings lives in AWS so this is efficient)
  3. My lambda function posts an event to an SQS queue
  4. My home server takes the event off SQS and posts it to RabbitMQ
  5. A queue listener takes events from RabbitMQ and takes an action
So long as I was using ordered SQS queues, I would sometimes get a 5-second delay before the light turned on. When I turned off ordering, the latency was perceptible but didn't make me want to jam the button multiple times.


But why would the signal to switch on the light ever leave the building?


Because SmartThings was born out of a software consultancy that knew how to do backend cloud services, but not embedded. The v1 hub was just a PIC microcontroller that sent raw radio messages up to the cloud for all parsing and processing.


(1) If you are letting people code their own event handlers you wouldn't trust them to run them on a tiny machine like that.

(2) A system like that probably wants to be able to respond to events both inside and outside the house. For instance, turn on your lights remotely with a phone. A cloud component is the reliable way to do that.

(3) At the time I couldn't find decent Zigbee or ZWave hubs other than the SmartThings hub. The cloud dependence is silly, but other than that the hub is great and connects to almost everything. I could go with a Kickstarter hack or try to roll my own but I don't think it would be much better.


> If you are letting people code their own event handlers you wouldn't trust them to run them on a tiny machine like that.

Actually, you can do exactly that now: https://developer-preview.smartthings.com/docs/devices/hub-c... Though that's only for device integrations, so not an exact solution to removing the cloud from the loop on your stuff.

ST is slowly getting away from being totally cloud-centric.


That is a good example. Off topic, but what is the reason to post the event to SQS and then to RabbitMQ? Why not take the events from SQS directly and take the action?


Events that originate from inside the house go into RabbitMQ, which is the central bus for all sorts of things. In the process of diagnosing that latency I found that part was pretty fast.


One thing I will say: if you need queuing, very few people actually need global queuing; they only need per-user queuing. I've fixed a number of systems where EVERY event in the world was going through the same queue, and replaced that with an array-of-queues sort of solution. At the end of the day this usually doesn't even need a real queue, just some database transactions and atomic ordering columns that ensure consistency and order of the events within some very specific scope (like a user). If the most events you'll ever see in a row whose order (with respect to each other) matters is like 5-20, you probably don't even need a real queuing service.
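
A sketch of that database-backed, per-scope ordering (SQLite here for brevity; the composite primary key turns a lost race into a retryable constraint violation instead of a broken order):

  import sqlite3

  db = sqlite3.connect(":memory:")
  db.execute("CREATE TABLE events (user_id TEXT, seq INTEGER, body TEXT,"
             " PRIMARY KEY (user_id, seq))")

  def append_event(user_id, body):
      with db:  # one transaction: read the user's max seq and insert
          (last,) = db.execute(
              "SELECT COALESCE(MAX(seq), 0) FROM events WHERE user_id = ?",
              (user_id,)).fetchone()
          db.execute("INSERT INTO events VALUES (?, ?, ?)",
                     (user_id, last + 1, body))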


If only there were another way to turn on a lamp…


One way to deal with this is to divide your event streams into small streams, say one per order. Those small streams may then be aggregated into a larger stream so that you can process events for all the orders together, for example.

If you hit a poison message, block just that smaller stream, not the aggregated larger stream. Once you fix the problem, reprocess the entire small stream starting from the poison event, or from the next event after it. The "entire" stream here might be just a handful of events.
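
A sketch of that park-just-one-stream behavior (illustrative names; Event Store's actual mechanics differ):

  parked = {}  # stream_id -> messages waiting behind a poison one

  def handle(stream_id, message, process):
      if stream_id in parked:
          parked[stream_id].append(message)  # queue behind the bad message
          return
      try:
          process(message)
      except Exception:
          parked[stream_id] = [message]      # block only this order's stream

  def retry_stream(stream_id, process):
      # after the fix, replay the small stream from the poison event onward
      for message in parked.pop(stream_id, []):
          process(message)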

Greg Young's Event Store (https://www.eventstore.com/) works this way (there's a $by_category projection that produces the aggregated streams).

Caveat: I haven't actually implemented this mechanism because I've been able to get away without it, because we have some legacy event streams that aren't split up in this way, and because nobody else has yet added support for it to the tools I'm using.


RabbitMQ has poison message handling in Quorum queues [0]. They are also FIFO.

[0] https://www.rabbitmq.com/quorum-queues.html#poison-message-h...


Typically with busted messages, an early and easy thing to do is to shunt them off into a "dead-letter queue". That's just the name of another queue whose messages are handled manually.


Now what if the rest of your messages depended on the poison message? Think “inventory in” is the poison message and now all subsequent orders for that product are cancelled.

That’s a toy example; your system corruption could get much worse due to a poison message depending on the application.


In practice this isn’t an issue. You put alarms on that queue and triage it immediately.


I remember trying to productionize an ordered service, and the SREs were banging on about messages-of-death. Their band-aid solution was isolated regions, i.e. DO NOT LET YOUR REGIONAL SERVICES COMMUNICATE. What they were worried about was global cascading failures if/when someone pushes a mistake to prod.

It's kind of a shitty solution to the problem, but there you have it; maybe it's the best that can be done. Roll out code changes gradually in individual regions and make sure a bug doesn't bring everything down.


Total ordering is rarely required, and even more rarely actually possible (without Lamport or atomic clocks, etc).

For more common use cases it is possible to provide the minimum guarantees required to reliably reconstruct a domain object throughout a distributed system, whilst still providing a fuck-ton of scope for concurrency, batching and high throughput through better partition key choice, informed by:

A: The maximum ordering guarantees that can be provided by the data source

B: The minimum ordering guarantees required to reconstruct a domain object


Introduce a processing log to maintain order and avoid losing any messages, even a poisoned one. Faulty messages are processed like any other message (write a 'did msg x: faulted' entry) and then go to a fault-queue.

If you have interdependence between messages, you need a message id scheme that expresses that interdependence. For example, a hierarchical message order can use a dot-delimited scheme. If a poison message is a parent, its subsequent child messages can go to the fault-queue as well.
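
A sketch of that dot-delimited scheme (illustrative names):

  faulted = set()

  def handle(msg_id, message, process, fault_queue, log=print):
      parts = msg_id.split(".")
      ancestors = {".".join(parts[:i]) for i in range(1, len(parts))}
      if ancestors & faulted:
          fault_queue.append((msg_id, message))  # a parent faulted: follow it
          return
      try:
          process(message)
          log(f"did msg {msg_id}")               # the processing log entry
      except Exception:
          faulted.add(msg_id)
          fault_queue.append((msg_id, message))
          log(f"did msg {msg_id}: faulted")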


My understanding is that not losing any messages plus strict ordering corresponds to "exactly once" delivery, which is not possible in the general case.


You'd want / it would be an "at least once" delivery. If you need to restart delivery due to some issue, the consumer might see a message twice; it's not out of order per se, it's just that the delivery/queue system doesn't know for sure whether it got delivered, and is thus redelivering.

The consumer, of course, must be aware of & ready to handle multiple deliveries. (E.g., you keep a "log position" which represents where you've processed the incoming messages up to.) But if you need ordering+not losing messages, it's the mode you need. (Since "at most once" implies "sometimes losing messages".)
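
A sketch of that log-position consumer (in a real system the position must be persisted atomically with the processing side effect):

  class AtLeastOnceConsumer:
      def __init__(self, process):
          self.position = 0      # durable in a real system
          self.process = process

      def on_delivery(self, seq, message):
          if seq <= self.position:
              return             # redelivery of something already handled
          self.process(message)
          self.position = seq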


That last part about a transactional outbox rings very true. I've been at a few places now where people expect the message bus to always be available.
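
For readers unfamiliar with the pattern, a minimal transactional-outbox sketch (SQLite for brevity; names are illustrative): the event commits in the same transaction as the business write, and a relay publishes it whenever the bus is available.

  import sqlite3

  db = sqlite3.connect(":memory:")
  db.executescript("""
      CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT);
      CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT,
                           body TEXT, published INTEGER DEFAULT 0);
  """)

  def place_order(order_id, item):
      with db:  # the event commits or rolls back with the business write
          db.execute("INSERT INTO orders VALUES (?, ?)", (order_id, item))
          db.execute("INSERT INTO outbox (body) VALUES (?)",
                     (f"order_placed:{order_id}",))

  def relay_once(publish):
      # publish pushes to the bus; if the bus is down it raises and we retry
      rows = db.execute("SELECT id, body FROM outbox"
                        " WHERE published = 0 ORDER BY id").fetchall()
      for row_id, body in rows:
          publish(body)
          with db:
              db.execute("UPDATE outbox SET published = 1 WHERE id = ?",
                         (row_id,))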



