Distributing consistency

Andras Gerlits
Feb 1, 2021

In my previous life, I worked for a number of different multi-national banks as a freelancer. I had the opportunity to see their everyday operations and was tasked with providing solutions within their existing frameworks. These frameworks were built around enterprise software stacks like Java containers, SQL databases and messaging frameworks, often with some “secret sauce” framework, usually cooked up in-house. What was common in each of them was that we had a fairly easy time deploying new software and had very mature workflows around it. This was mostly before Kubernetes, and even before Docker went mainstream at these places (which are admittedly very conservative), but this was still never really a problem.

Our issues were largely centered around data.

We had a number of problems with it. The first was that our databases were single-server solutions, which meant that replicating new data without lagging behind the primary was a challenge. This requirement was not just common sense; it was also set in stone in the disaster-recovery protocols mandated by the regulators. We had to juggle it, together with the network latency caused by geographical distribution, against the need for throughput and correctness.

We regularly needed to synchronise information between our London, Hong Kong, New York and Tokyo servers. Since there were no communication protocols which allowed for reliable, online synchronisation between them, we made do with what we had and mostly ran overnight batches. I was woken up by support to fix failed batches more times than I care to remember.

These problems were even worse if one system needed information from another during the day. In those cases, we specified specialised communication protocols between them and monitored their execution, so that if we recognised something had gone wrong, we could intervene as soon as possible. This happened a lot. In fact, most of the support issues I was involved with had something to do with data missing or being malformed somewhere, which was more often than not the result of a hardware failure or a miscommunication between systems.

If someone non-technical asked us whether something could be done about such issues, we usually gave a knowing smile and pointed at the Wikipedia article explaining the CAP theorem and the inherent limitations of distributed systems.

I always had a niggling feeling that there must be a better solution than what we had, but it remained just that: a source of annoyance. So, when I left the industry, I decided to try creating a new solution, one which could provide the sort of guarantees we would have needed.

The system would have to:

  • be resilient enough to withstand the loss of some computers
  • provide delivery guarantees between different computers
  • allow a global ordering between any two pieces of information
  • work across large geographical distances
  • not be bottlenecked locally by these constraints

A blank sheet of paper

I started out by looking at the technology available (in 2015). By that time, we had already built a lot of things on event sourcing. I was fascinated by the idea of managing data as a single, sequenced array of events, and by how much simpler it was to stream those events to remote servers than to communicate the end results of what a process had written into the database. The problem was that there is no obvious way to establish a single, global order between events spread across multiple such sequences. You could either have a single sequence, which would only work locally and become a bottleneck both when communicating with remote servers and when scaling services, or a distributed set of sequences, which leaves you with what is called “eventual consistency”: a technical term for saying “your data will be correct sometime in the near future, but we don’t know exactly when”.
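
To make that starting point concrete, here is a minimal sketch of an append-only event log of the kind described above. The class and method names are mine and purely illustrative; this is not code from any real implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal illustration of the event-sourcing idea: state is never updated in
// place, it is derived by replaying a single, sequenced list of events.
// Everything here is illustrative, not taken from a real system.
final class EventLog {
    record Event(long sequence, String payload) {}

    private final List<Event> events = new ArrayList<>();

    // Appending is the only write operation; the sequence number gives every
    // event an unambiguous position in this node's local order.
    synchronized Event append(String payload) {
        Event event = new Event(events.size(), payload);
        events.add(event);
        return event;
    }

    // A replica (local or remote) rebuilds state by replaying the same events
    // in the same order; streaming this list is simpler than shipping results.
    synchronized List<Event> replay() {
        return List.copyOf(events);
    }
}
```

The hard part, as described above, is not this local log but agreeing on a single order across many of them.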

“A man with a watch knows what time it is. A man with two watches is never sure.” (Segal’s law)

The first idea was that if:

  • I had timestamps which could be compared to other such timestamps, and
  • each write process recorded its information under a single such timestamp taken from the future, and
  • each read process knew what the current timestamp was, and hence which information could be treated as reliable,

Then I could filter out any information that was entered with a timestamp higher than the reliable timestamp assigned to the read process. If we did that, we would never give the client data that was half-written by another transaction: the timestamp the reading process received was in the past, so any write visible under it must have finished before the read process had even started.
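
As a rough illustration of that filtering rule, here is a toy versioned store, assuming each committed value is tagged with the single timestamp its write process used and each reader is handed a “reliable” timestamp. The in-memory layout and the names are assumptions made for this sketch only, not a description of the real storage engine.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Toy sketch of timestamp-filtered reads: writes "from the future" are simply
// invisible to a reader holding an older, reliable timestamp.
final class VersionedStore {
    // key -> (write timestamp -> value)
    private final Map<String, ConcurrentSkipListMap<Long, String>> data =
            new ConcurrentHashMap<>();

    // A write process stores its value under the single timestamp assigned to
    // its transaction.
    void write(String key, long writeTimestamp, String value) {
        data.computeIfAbsent(key, k -> new ConcurrentSkipListMap<>())
            .put(writeTimestamp, value);
    }

    // A read process only sees the newest version whose timestamp is not
    // higher than its own reliable timestamp, so half-written data stays hidden.
    String read(String key, long readTimestamp) {
        ConcurrentSkipListMap<Long, String> versions = data.get(key);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, String> visible = versions.floorEntry(readTimestamp);
        return visible == null ? null : visible.getValue();
    }
}
```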

Bearing in mind the requirements listed earlier, this sounds like science fiction. Conventional wisdom holds that such a global clock (or ordering) requires either a single source or very tightly synchronised clocks, which are notoriously hard to maintain over long distances. Either way, this would bottleneck performance, as already mentioned. I had to find another way.

A distributed, global clock

An atomic operation is something a computer processes in a single step. The only atomic operation in an event-sourcing-based system is the processing of a single event, so I started using these events as the atomic markers on each node. By arranging them into a circular progression, I could derive a value which grows monotonically within a single set of nodes.
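
One way to picture this (my reading of it, not the exact algorithm from the whitepaper): the nodes of a group process a marker event in a fixed circular order, and the group’s logical clock ticks once per completed round, so the value grows using nothing but processed events.

```java
// Illustrative ring clock: a group of nodes passes a marker event around a
// fixed ring and the group's logical timestamp ticks once per full round.
// The tick depends only on processed events, never on wall-clock time.
// This is a simplification for illustration, not the actual algorithm.
final class RingClock {
    private final int groupSize; // number of nodes in the group
    private int nextNode = 0;    // whose marker event is expected next
    private long tick = 0;       // monotonically growing group timestamp

    RingClock(int groupSize) {
        this.groupSize = groupSize;
    }

    // Called when node `nodeId` processes its marker event; markers arriving
    // out of turn are ignored in this toy version.
    synchronized long onMarkerEvent(int nodeId) {
        if (nodeId == nextNode) {
            nextNode = (nextNode + 1) % groupSize;
            if (nextNode == 0) {
                tick++; // one full round of the ring has completed
            }
        }
        return tick;
    }

    synchronized long current() {
        return tick;
    }
}
```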

This was a start, but it still only allowed me to scale our “clock” to a single group of such nodes. To grow further, I created hierarchies of these groups, where associating their clocks with each other is itself defined as an atomic event. This allowed me to expose values to different groups at a different “granularity” of time, so that groups further away only see information residing on more distant computers in larger chunks.
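
Sticking with the same toy model, the “granularity” idea can be sketched as a wrapper that exposes only every Nth local tick to more distant groups. The two-level layout and the chunking factor are assumptions made purely for illustration.

```java
import java.util.function.LongSupplier;

// Sketch of coarse-grained exposure: a local group keeps its fine-grained
// clock, but faraway groups only ever see that clock in larger chunks, so
// remote observers receive coarser (yet still monotonic) timestamps.
final class HierarchicalClock {
    private final LongSupplier localTicks; // e.g. RingClock::current from above
    private final long chunkSize;          // local ticks per exposed tick

    HierarchicalClock(LongSupplier localTicks, long chunkSize) {
        this.localTicks = localTicks;
        this.chunkSize = chunkSize;
    }

    // Fine-grained time, used by nodes inside the group.
    long localTime() {
        return localTicks.getAsLong();
    }

    // Coarse-grained time, the only value exposed to distant groups.
    long exposedTime() {
        return localTicks.getAsLong() / chunkSize;
    }
}
```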

This gave me a single timestamp which moves quickly between computers close to each other and slowly between ones farther apart, and which fulfils all the requirements listed earlier. It removed a large obstacle to letting distant computers speak to each other reliably. It also meant that we don’t have to rely on external factors, such as a computer’s own wall clock, when establishing information in the system, so replication, data recovery and even debugging become orders of magnitude easier.

For more details on this, take a look at the technical article discussing these clocks, or the whitepaper, which explains the whole concept in full detail.

Overlapping transactions

The last major problem I was left with was overlapping write processes. In plain English, I needed to make sure that processes trying to modify the same information at the same time are somehow detected and managed. In other words, how do I make sure that two users cannot both think they reserved the same airplane seat or bought the last lawnmower on sale, and that the system knows everywhere who won and who lost? I also needed to preserve the property that a single node needs nothing but its events to work the same way, so that meant no clocks, no randomness and no reliance on any other external factors.

I started by looking at the problem as a set of interconnected points. Each point is a write process, and each such process can be in a conflicting state with a number of others. This relationship is symmetric: if process A overlaps B, then B also overlaps A (since they are trying to update the same information). I can solve the problem by making sure that, between A and B, only one is allowed to finish and the other is cancelled. In that case

  • B’s data is deleted and the client API is notified, so it can either retry or report the failure to its client, and
  • A can finalise its information, so that subsequent read processes can find its data.

For this, I devised a simple algorithm which makes sure that, between any number of overlapping write processes, at most one is finalised and all the others are rolled back. This algorithm is explained in the technical article and detailed in the whitepaper.
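
The actual algorithm is the one described in the whitepaper; the sketch below only illustrates the shape of the guarantee. The tie-breaking rule used here (lowest transaction id wins) is a stand-in I chose so that every node reaches the same verdict without clocks, randomness or any other external input; it is not the real rule.

```java
import java.util.Set;

// Toy illustration: among any group of mutually overlapping write processes,
// at most one is finalised and all the others are rolled back. The winner is
// picked deterministically, so every node evaluating the same group agrees.
final class ConflictResolver {
    record WriteProcess(long transactionId) {}

    enum Outcome { FINALISE, ROLL_BACK }

    // `conflictGroup` is the full set of overlapping writes, including the
    // candidate itself.
    static Outcome decide(WriteProcess candidate, Set<WriteProcess> conflictGroup) {
        long winningId = conflictGroup.stream()
                .mapToLong(WriteProcess::transactionId)
                .min()
                .orElse(candidate.transactionId());
        return candidate.transactionId() == winningId
                ? Outcome.FINALISE
                : Outcome.ROLL_BACK;
    }
}
```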

Where this leads us

Let’s revisit our original requirements.

resilient enough to withstand the loss of some computers

Each node in the system is made up of a number of different physical computers. They can be replicated locally by employing a consensus group, which is rarely problematic over short distances. In fact, that’s exactly how these logs are usually maintained now.

provide delivery guarantees between different computers

We provide atomic transactions even over large distances and, unlike current systems, this doesn’t slow down other processes or generally hurt performance.

allow a global ordering between any two pieces of information

Our clock provides an atomic value for each set of nodes, and any two nodes in the system will agree on whether the information represented by one such value comes before or after another.

work across large geographical distances

Our transactions do not need to wait for others to finish (except in rare corner cases), so nothing stops the cluster from growing to an arbitrary size and across arbitrary distances.

not be bottlenecked locally by these constraints

Even if databases are distributed globally, a process which only touches local nodes will not have to communicate with distant computers.

Conclusion

We’re not saying that this system is perfect or that there are no special considerations to be made when using it, especially across large distances. What we are saying is that it’s much simpler to use and maintain when compared with other distributed systems, that it’s capable of providing the sort of guarantees which would have made our lives much easier and that this technology can open the doors to building global-scale databases.

We’ve built this system, so it’s not just a thought experiment. We’ve also implemented the TPC-C benchmark, so we know it’s fast even when compared to others. If you’d like to know more about it, you can reach out to us at info@dianemodb.com.

If you like, you can take a closer look at how we manage failovers, replication and availability.

Or look at the article examining who would benefit from such a solution.

