Hacker Newsnew | past | comments | ask | show | jobs | submit | galeaspablo's commentslogin

Some input from previously working on a superset of this problem. And being in a similar position.

Mature projects have too much bureacracy, and even spending time talking to you = opportunity cost. So making a case for why you're going to solve a problem for them is tough.

New projects (whether at big companies or small companies) have 20 other things to worry about, so the problem isn't big enough.

I wrote about this in our blog if you're curious: https://ambar.cloud/blog/a-new-path-for-ambar


Thanks. Interesting read, and an interesting product / service. Have been thinking about the same approach myself…


It is useful, but it is not generally applicable.

Given an arbitrary causality graph between n messages, it would be ideal if you could consume your messages in topological order. And that you could do so in O(n log n).

No queuing system in the world does arbitrary causality graphs without O(n^2) costs. I dream of the day where this changes.

And because of this, we’ve adapted our message causality topologies to cope with the consuming mechanisms of Kafka et al

To make this less abstract, imagine you have two bank accounts, each with a stream. MoneyOut in Bob’s account should come BEFORE MoneyIn when he transfers to Alice’s account, despite each bank account having different partition keys.


Can you elaborate on how you have “adapted…message causality topologies to cope with consuming mechanisms” in relation to the example of a bank account? The causality topology being what here, couldn’t one day MoneyIn should come before else there can be now true MoneyOut?


Right on, great question. Some examples:

Example Option 1

You give up on the guarantees across partition keys (bank accounts), and you accept that balances will not reflect a causally consistent state of the past.

E.g., Bob deposits 100, Bob sends 50 to Alice.

Balances: Bob 0 Alice 50 # the source system was never in this state Bob 100 Alice 50 # the source system was never in this state Bob 50 Alice 50 # eventually consistent final state

Example Option 2

You give up on parallelism, and consume in total order (i.e., one single partition / unit of parallelism - e.g., in Kafka set a partitioner that always hashes to the same value).

Example Option 3

In the consumer you "wait" whenever you get a message that violates causal order.

E.g., Bob deposits 100 Bob sends 50 to Alice (Bob-MoneyOut 50 -> Alice-MoneyIn 50).

If we attempt to consume Alice-MoneyIn before Bob-MoneyOut, we exponentially back off from the partition containing Alice-MoneyIn.

(Option 3 is terrible because of O(n^2) processing times in the worst case and the possibility for deadlocks (two partitions are waiting for one another))


Thanks. With these of examples of messages appearance in time and in physical location in Kafka, how have you adapted your consumers? Which scenario / architectural decision (one of the examples?) have you moved forward with and creating support to yield your desired causality handling?


Option 1, but after so many years banging our heads against the wall reasoning about this, we hoped someone would eventually give us a queue that supports arbitrary causal dependency graphs.

We thought about building it ourselves, because we know the data structures, high level algorithms, and disk optimizations required. BUT we pivoted our company, so we've postponed this for the foreseeable future. After all, theory is relatively easy, but a true production grade implementation takes years.


hmmm... could this be solved by "vector clocks"? if producers are emitting something that depends on a previous event they send the id of the previous event. (so like capabilities, you need proof of "data access".)

or the problem is that again this is O(n^2)? (because then the consumers now need to buffer [potentially] n key streams (and then search for them every time - so "n" times)?


Great intuition. Exactly right. More details in one of my other comments, here: https://news.ycombinator.com/item?id=43796688


Agreed. The head of line problem is worth solving for certain use cases.

But today, all streaming systems (or workarounds) with per message key acknowledgements incur O(n^2) costs in either computation, bandwidth, or storage per n messages. This applies to Pulsar for example, which is often used for this feature.

Now, now, this degenerate time/space complexity might not show up every day, but when it does, you’re toast, and you have to wait it out.

My colleagues and I have studied this problem in depth for years, and our conclusion is that a fundamental architectural change is needed to support scalable per message key acknowledgements. Furthermore, the architecture will fundamentally require a sorted index, meaning that any such a queuing / streaming system will process n messages in O (n log n).

We’ve wanted to blog about this for a while, but never found the time. I hope this comment helps out if you’re thinking of relying on per message key acknowledgments; you should expect sporadic outages / delays.


Check out the parallel consumer: https://github.com/confluentinc/parallel-consumer

It processes unrelated keys in parallel within a partition. It has to track what offsets have been processed between the last committed offset of the partition and the tip (i.e. only what's currently processed out of order). When it commits, it saves this state in the commit metadata highly compressed.

Most of the time, it was only processing a small number of records out of order so this bookkeeping was insignificant, but if one key gets stuck, it would scale to at least 100,000 offsets ahead, at which point enough alarms would go off that we would do something. That's definitely a huge improvement to head of line blocking.


Disclosure (given this is from Confluent): I'm ex MSK (Managed Streaming for Kafka at AWS) and my current company was competing with Confluent before we pivoted.

Yup, this is one more example, just like Pulsar. There are definitely great optimizations to be made on the average case. In the case of parallel consumer, if you'd like to keep ordering guarantees, you retain O(n^2) processing time in the worst case.

The issues arise when you try to traverse arbitrary dependency topologies in your messages. So you're left with two options:

1. Make damn sure that causal dependencies don't exhibit O(n^2) behavior, which requires formal models to be 100% sure. 2. Give up ordering or make some other nasty tradeoff.

At a high level the problem boils down to traversing a DAG in topological order. From computer science theory, we know that this requires a sorted index. And if you're implementing an index on top of Kafka, you might as well embed your data into and consume directly from the index. Of course, this is easier said than done, and that's why no one has cracked this problem yet. We were going to try, but alas we pivoted :)

Edit: Topological sort does not required a sorted index (or similar) if you don't care about concurrency. But then you've lost the advantages of your queue.


> traverse arbitrary dependency topologies

Is there another way to state this? It’s very difficult for me to grok.

> DAG

Directed acyclic graph right?


Apologies, we've been so deep into this problem that we take our slang for granted :)

A graphical representation might be worth a thousand words, keeping in mind it's just one example. Imagine you're traversing the following.

A1 -> A2 -> A3...

|

v

B1 -> B2 -> B3...

|

v

C1 -> C2 -> C3...

|

v

D1 -> D2 -> D3...

|

v

E1 -> E2 -> E3...

|

v

F1 -> F2 -> F3...

|

v

...

Efficient concurrent consumption of these messages (while respecting causal dependency) would take O(w + h), where w = the _width_ (left to right) of the longest sequence, and h = the _height_ (top to bottom of the first column)

But Pulsar, Kafka + parallel consumer, Et al. would take O(n^2) either in processing time or in space complexity. This is because at a fundamental level, the underlying data storages store looks like this

A1 -> A2 -> A3...

B1 -> B2 -> B3...

C1 -> C2 -> C3...

D1 -> D2 -> D3...

E1 -> E2 -> E3...

F1 -> F2 -> F3...

Notice that the underlying data storage loses information about nodes with multiple children (e.g., A1 previously parented both A2 and B1)

If we want to respect order, the consumer will be responsible for declining to process messages that don't respect causal order. E.g., attempting to process F1 before E1. Thus we could get into a situation where we try to process F1, then E1, then D1, then C1, then B1, then A1. Now that A1 is processed, kafka tries again, but it tries F1, then E1, then D1, then C1, then B1... And so on and so forth. This is O(n^2) behavior.

Without changing the underlying data storage architecture, you will either:

1. Incur O(n^2) space or time complexity

2. Reimplement the queuing mechanism at the consumer level, but then you might as well not even use Kafka (or others) at all. In practice this is not practical (my evidence being that no one has pulled it off).

3. Face other nasty issues (e.g., in Kafka parallel consumer you can run out of memory or your processing time can become O(n^2)).


Wanted to say thanks so much for writing this all out - I've always thought of ordering as being sort of inherently against the point of parallel streams, so its interesting to hear about the state of the art and the benefits that are trying to be gleaned! I'm not thinking in stream processors terribly often so I wasn't aware of how dependencies are mapped.

If you don't mind another followup (and your patience with my ignorance hasn't run out :P), wouldn't the efficient concurrent consumption imply knowing the dependency graph before the events are processed? IE, is it possible in any instance to get to O(w+h) in a stream?


No problem. :)

Yes, order needs to be known.

So no, it’s not possible to do O(w+h) with streams partitioned by key. Unless, of course you use a supplementary index, but then you might as well not use the streams storage at all and store the records in the same storage as the index.

It’s worth noting that Pulsar does something like this (supplementary way to keep track of acknowledged messages), but their implementation has O(n^2) edge cases.


Do you have an example use case for this? This does seem like something unsuited to kafka, but I'm having a hard time imagining why you would structure something like this.


Great follow up question, thank you. I could talk about this "topic" for days, so I appreciate the opportunity to expand. :)

Let's imagine ourselves as a couple of engineers at Acme Foreign Exchange House. We'd like to track Acme's net cash position across multiple currencies, and execute trades accordingly (e.g., heding). And we'd like to retrospectively analyze our hedges, to assess their effectiveness.

Let's say I have this set of transactions (for accounts A, B, C, D, E, F, etc.)

A1 -> A2 -> A3 -> A4

B1 -> B2 -> B3 -> B4

C1-> C2

D1 -> D2 -> D3 -> D4

E1 -> E2

F1

Let's say that that:

- E1 was a deposit made into account E for $2M USD.

- E2 was an outgoing transfer of $2M USD sent to account F (incoming £1.7M GBP at F1).

If we consume our transactions and partiton our consumption by account id, we could get into a state where E1 and F1 are reflected in our net position, but E2 isn't. That is, our calculation has both $2M USD and £1.7M GBP, when in reality we only ever held either $2M USD or £1.7M GBP.

So what could we do?

1. Make sure that we respect causality order. I.e., there's no F1 reflected in our net position if we haven't processed E2.

2. Make sure that pairs of transactions (e.g., E2 and F1) update our net position atomically.

This is otherwise known as a "consistent cut" (see slide 25 here https://www.cs.cornell.edu/courses/cs6410/2011fa/lectures/19...).

Opinion: the world is causally ordered in arbitrary ways as above. But the tools, frameworks, and infrastructure more readily available to us struggle at modeling arbitrary partially ordered causality graphs. So we shrug our shoulders, and we learn to live with the edge cases. But it doesn't have to be so.


I suppose it depends on your message volume. To me, processing 100k messages and then getting a page however long later as the broker (or whatever) falls apart sounds much worse than head of line blocking and seeing the problem directly in my consumer. If I need to not do head of line blocking, I can build whatever failsafe mechanisms I need for the problematic data and defer to some other queueing system (typically, just add an attempt counter and replay the message to the same kafka topic and then if attempts > X, send it off to wherever)

I'd rather debug a worker problem than an infra scaling problem every day of the week and twice on Sundays.


It's interesting you say that, since this turned an infra scaling problem into a worker problem for us. Previously, we would get terrible head-of-line throughput issues, so we would use an egregious number of partitions to try to alleviate that. Lots of partitions is hard to manage since resizing topics is operationally tedious and it puts a lot of strain on brokers. But no matter how many partitions you have, the head-of-line still blocks. Even cases where certain keys had slightly slower throughput would clog up the whole partition with normal consumers.

The parallel consumer nearly entirely solved this problem. Only the most egregious cases where keys were ~3000 times slower than other keys would cause an issue, and then you could solve it by disabling that key for a while.


Yeah I'd say kafka is not a great technology if your median and 99ths (or 999ths if volume is large enough) are wildly different which sounds like your situation. I use kafka in contexts where 99ths going awry usually aren't key dependent so I don't have the issues you see.

I tend to prefer other queueing mechanisms in those cases, although I still work hard to make 99ths and medians align as it can still cause issues (especially for monitoring)


Follow on: If you're using kafka to publish messages to multiple consumers, this is even worse as now you're infecting every consumer with data processing issues from every other consumer. Bad juju


There's also a similar project from Line https://github.com/line/decaton.


> Furthermore, the architecture will fundamentally require a sorted index, meaning that any such a queuing / streaming system will process n messages in O (n log n).

Would using a sorted index have an impact on the measured servicing time of each message? (Not worst-case, something more like average-cass). It's made extremely clear in the Kafka docs that Kafka's relies heavily on the operating systems filesystem cache for performance, and that seeking through events on disk turns out to be very slow compared to just processing events in-order.


Let’s separate two advantages in the average case with Kafka.

1. Sequential IO when reading from disk.

2. Use of disk cache (instead of reading from disk) when re-reading recently read events.

#2 helps when you have many consumer groups reading from the tail. And this advantage would extend to index-based streaming.

But #1 would not fully extend to index-based streaming.

When does this matter? When adding a new consumer group you would lose the speed advantage of sequential IO, because it consumes from the beginning (which isn’t in disk cache).

BUT this has become less important now that SSDs are so prevalent and affordable. Additionally, in practice, the bottleneck isn’t in disk IO. Consumers tend to perform IO in other systems that incur O(log n) per insert. Or network cards can get saturated way before disk IO is the limiting factor.

I speculate that we got Kafka et al because we didn’t have such abundance of SSDs in the early 2010’s.

So, returning to your question, you wouldn’t notice the difference in the average case, as long as there are SSDs under the hood.


> streaming system will process n messages in O (n log n)

I'm guessing this is mostly around how backed up the stream is. n isn't the total number of messages but rather the current number of unacked messages.

Would a radix structure work better here? If you throw something like a UUID7 on the messages and store them in a radix structure you should be able to get O(n) performance here correct? Or am I not understanding the problem well.


I think the problem is that if you want quick access to all messages with a particular key then you have to maintain some kind of index over all persisted messages. So n would be total number of persisted messages as I read it, which can be quite large. But even storing them in the first place is O(n), so O(n log n) might not be so bad.


That's correct. And keep in mind that you might have new consumers starting from the beginning come into play, so you have to permanently retain the indexes.

And yes, O(n log n ) is not bad at all. Sorted database indexes (whether SQL, NoSQL, or AcmeVendorSQL, etc.) already take O(n log n) to insert n elements into data storage or to read n elements from data storage.


Thanks for posting the NPR source.

I read about the anesthesia case from VOX, and they claimed the opposite, namely that there would be no additional costs for the patients https://www.vox.com/policy/390031/anthem-blue-cross-blue-shi...


Many engineers don’t truly care about the correctness issue, until it’s too late. Similar to security.

Or they care but don’t bother checking whether what they’re doing is correct.

For example, in my field, where microservices/actors/processes pass messages between each other over a network, I dare say >95% of implementations I see have edge cases where messages might be lost or processed out of order.

But there isn’t an alignment of incentives that fixes this problem. Ie the payment structures for executives and engineers aren’t aligned with the best outcome for customers and shareholders.


> there isn’t an alignment of incentives that fixes this problem

"Microservices" itself is often a symptom of this problem.

Everyone and their dog wants to introduce a network boundary in between function calls for no good reason just so they can subsequently have endless busywork writing HTTP (or gRPC if you're lucky) servers, clients & JSON (de?)serializers for said function calls and try to reimplement things like distributed transactions across said network boundary and dealing with the inevitable "spooky action at a distance" that this will yield.


I've worked with microservices at scale and it was fantastic. We couldn't break backwards compatibility with our API without a lot of coordination. Outside of that, you could deploy as frequently as needed and other services could update as needed to make use of new features.

The monoliths I have worked in, very contrastingly, have had issues coordinating changes within the codebases, code crosses boundaries it should not and datastores get shared and coupled to (what should be) different domains leading to slow, inefficient code and ossified options for product changes.


If you're hand-writing clients/servers/serializers instead of generating them from schema definitions then you have more fundamental issues than using microservices.


We hand wrote clients at my last microservice based gig. It was marginally slower than automated clients and we did run into a few cases of teams "waisting" their time writing their own clients; that was fixed by the authoring service team also authoring clients. It wasn't a big issue


The path to fixing this requires first measuring and monitoring it, then establishing service level objectives that represent customer experience. Product and engineering teams have to agree on them. If the SLOs become violated, focus shifts towards system stability.

Getting everyone onboard is hard and that is why good leadership is needed. When customers start to churn because bugs pop up and new features are slow or non existent, then the case is very easy to make quality part of the process. Mature leaders get ahead of that as early as possible.


Good leadership is spot on! Agreed. The cynic part of me sees incentives that discourage mature leadership styles.

Leaders tend to be impatient and think of this quarter’s OKRs as opposed to the business’ long term financial health. In other word the leaders of leaders use standard MBA prescribed incentive structures.


> 95% of implementations I see have edge cases where messages might be lost or processed out of order.

Eek. This sort of thing can end up with innocent people in jail, or dead.

[0] https://en.wikipedia.org/wiki/British_Post_Office_scandal


The problem (or the solution, depending on which side you're on) is that innocent people are in jail or dead. The people that knowingly allowed this to happen are still free and wealthy.

So I'm not particularly sure this is a good example - if anything, it sets the opposite incentives, that even jailing people or driving them to suicide won't actually have any consequences for you.


I think there's a bit of an alignment of incentives: the edge cases are tricky enough that your programmers probably need to handle a lot of support tickets, which isn't good for anyone.

But I don't see anyway to convince yesterday's managers to give us time to build it right.


If anything, even if he’s aware, I’d say it’s pretty much in line with the spirit.

Albeit, I can’t speak for Tim O’Reilly, but just like I don’t expect to become a billionaire from writing an open source framework, I don’t go around crediting absolutely everything or tipping back. If I assume best intentions (in line with the spirit of HN), it doesn’t look like DHH is trying to make a claim to fame on the back of this phrase.


I can’t help but read this under the lens of my own experiences. For me open source is really about capturing less than I create, much like it’s written in the article. Heck, I’m a founder and we haven’t open sourced our product because we don’t want to do a “pretend” open source like others do, where it’s really just a marketing tool for enterprise.

It does help that I’ve always been able to make ends meet, so that keeps my idealistic vision of open source alive.

In any case, this was an excellent response to https://ma.tt/2024/10/on-dhh/


Hey Denis, I haven’t run into you before. But hi, this is Luis, Ambar’s founder. Nice to meet a fellow data streamer.

When I started writing Ambar I thought streaming from a database was a solved problem. But in operational use cases where ordering and delivery guarantees are assumptions developers need, it isn’t a solved problem. The first version of Ambar was just Debezium under the hood, but guess what, it failed and failed hard. Like you described Kafka. Hence we built Ambar :)

FYI we’ve considered using Redpanda under the hood instead of Kafka, but didn’t dare make the jump yet.


Ah okay, so is Ambar more of a way to finally replace Debezium then?


Yes, for operational use cases. Eg event driven microservices communication. Keeping in mind we replace the sink as well, which allows us to do cool things such as https://ambar.cloud/blog/optimal-consumption-with-adaptive-l...

For analytics (eg copy your PG database to Snowflake), Debezium is still relevant.


We use Kafka under the hood. We stream instead of poll. We used to work at the Kafka team in AWS :)

Our thesis is that a big blocker is the PhD the whole team needs in Kafka. For example, if you want to set up an API similar to Ambar with tools like Kafka connect, the connectors have failure modes that will bite you once a year and bite you hard. Eg losing your changelog in MySQL, and having to start from scratch or risk losing ordering guarantees.


Disclosure: this post is from my colleague.

A: You have managed vendors that simplify Kafka, such as MSK, Confluent, and Redpanda. And other software like Pulsar.

But we believe the solution to the time sink exposed in the article lies one level of abstraction higher. In the case of data analytics, there are tools/companies such as Decodable/Streamkap/Airbyte that simplify your life as an engineer.

In the case of operational streaming, we (Ambar) are making a bet on the tried and tested outbox/inbox pattern as a replacement for producing directly into Kafka et al, and thus managing all of its quirks and complexities. That’s the alternative we offer, but of course there are other folks in this space.

Admittedly, we didn’t dive deep into alternatives in the post. But we did explain at the end that we’ll cover it in another post. I’ll add a link at the bottom later pointing to some alternatives. :)

Thanks for reading!


Guidelines | FAQ | Lists | API | Security | Legal | Apply to YC | Contact

Search: