Distributing consistency

In my previous life, I worked for a number of different multi-national banks as a freelancer. I had the opportunity to see their everyday operations and was tasked with providing solutions within their existing frameworks. These frameworks were built around enterprise software stacks like Java containers, SQL databases and messaging systems, often with some “secret sauce” framework, usually cooked up in-house. What was common in each of them was that we had a fairly easy time deploying new software and had very mature workflows around it. This was mostly before Kubernetes, and even before Docker went mainstream at these places (which are admittedly very conservative), but this was still never really a problem.

Our issues were largely centered around data.

We had a number of problems with it. First, our databases were single-server solutions, which meant that replicating new data without lagging behind the main server was a challenge. This requirement was not just common sense; it was set in stone in the disaster recovery protocols mandated by the regulatory bodies. We had to juggle it, along with the network latency caused by geographical distribution, against the need for throughput and correctness. We regularly needed to synchronise information between our London, Hong Kong, New York and Tokyo servers. Since there were no communication protocols which allowed for reliable, on-line synchronisation between them, we made do with what we had and mostly ran overnight batches. I was woken up by support to fix such batches encountering errors more times than I care to remember.

These problems were even worse when one system needed information from another during the day. In those cases we specified specialised communication protocols between them and monitored their execution, so that if we recognised something had gone wrong, we could intervene as soon as possible. This happened a lot. In fact, most of the support issues I was involved with had something to do with data missing or being malformed somewhere, more often than not the result of a hardware failure or a miscommunication between different systems.

If someone non-technical asked us whether something could be done about such issues, we usually gave a knowing smile and pointed at the Wikipedia article explaining the CAP theorem and the inherent limitations of distributed systems.

I always had a niggling feeling that there must be a better solution than what we had, but it was just that: a source of annoyance. So, when I left the industry, I decided to try creating a new solution, one which could provide the sort of guarantees we would have needed.

The system would have to:

  • be resilient enough to withstand the loss of some computers
  • provide delivery guarantees between different computers
  • allow for a global ordering between any two pieces of information
  • work across large geographical distances
  • not be bottlenecked locally by these constraints

A blank piece of paper

“A man with a watch knows what time it is. A man with two watches is never sure.” (Segal’s law)

I started from a simple thought experiment. If:

  • I had timestamps which I could compare to one another, and
  • each write-process wrote its information using a single such timestamp from the future, and
  • each read-process knew what the current timestamp was, and hence which information could be relied upon

Then I could filter out the information which was entered with a timestamp higher than the reliable timestamp assigned to the read-process. If we did that, we would never give the client data which was half-written by another transaction: the timestamp the reading process received lay in the past, so any write it was allowed to see must already have finished before the read had even started.
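To make that filtering rule concrete, here is a minimal sketch in Python. It is purely an illustration of the idea, not DianemoDB's code, and the names (Record, snapshot_read) are hypothetical: each record version carries the timestamp its write-process stamped on it, and a reader holding a reliable snapshot timestamp simply ignores anything stamped later.

```python
from dataclasses import dataclass
from typing import Iterable, List

@dataclass
class Record:
    key: str
    value: str
    commit_ts: int  # the timestamp the write-process stamped on this version

def snapshot_read(records: Iterable[Record], read_ts: int) -> List[Record]:
    """Return only the versions whose write finished at or before our snapshot.

    Anything stamped later than read_ts may still be half-written from this
    reader's point of view, so it is filtered out.
    """
    return [r for r in records if r.commit_ts <= read_ts]

# A reader holding snapshot timestamp 10 never sees the in-flight write at 12.
versions = [Record("acct:42", "balance=100", 8), Record("acct:42", "balance=250", 12)]
print(snapshot_read(versions, read_ts=10))  # only the version stamped 8
```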

Bearing in mind the requirements listed earlier, this sounds like science fiction. Conventional wisdom holds that such a global clock (or ordering) requires a single source, or very tightly synchronised clocks, which is notoriously hard to achieve over long distances. Either way, this would bottleneck performance, as already mentioned. I had to find another way.

A distributed, global clock

This was a start, but it still only allowed me to scale our “clock” to a single group of such nodes. To grow further, I created hierarchies of such groups, where associating these clocks with each other is itself defined as an atomic event. This allowed me to expose values within different groups using a different “granularity” of time, so that groups farther away would only be exposed to information residing on more distant computers in larger chunks.

This allowed me to establish a single timestamp which moves quickly between computers close to each other and slowly between the ones farther apart, fulfilling all the requirements listed earlier. It removed a large obstacle to letting distant computers speak with each other reliably. It also meant that we didn’t have to rely on external factors, such as the time held by each computer, when establishing the order of information in the system, so replication, data recovery and even debugging become orders of magnitude easier.
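To give a feel for what different “granularities” of time might look like, here is a deliberately simplified toy model in Python. It is my own illustration of the idea, not the algorithm from the whitepaper: each group keeps a cheap, fast-moving local counter, and the coarse value shared with distant groups only moves forward when a whole batch of local ticks is rolled up into it as a single atomic event.

```python
class GroupClock:
    """Toy model of a two-level clock hierarchy (illustration only).

    Local ticks are cheap and frequent; the coarse 'epoch' exposed to distant
    groups only advances when the group rolls up a batch of local ticks, so
    far-away observers see time move in larger chunks.
    """

    def __init__(self, batch_size: int):
        self.batch_size = batch_size
        self.epoch = 0       # coarse value exposed to distant groups
        self.local_tick = 0  # fine-grained value used inside the group

    def tick(self) -> tuple[int, int]:
        """Advance local time; periodically roll it up into a new epoch."""
        self.local_tick += 1
        if self.local_tick % self.batch_size == 0:
            # In the real system this roll-up would be an atomic, agreed-upon event.
            self.epoch += 1
        return (self.epoch, self.local_tick)

    def remote_view(self) -> int:
        """What a distant group sees: only the coarse epoch."""
        return self.epoch

clock = GroupClock(batch_size=100)
for _ in range(250):
    clock.tick()
print(clock.remote_view())  # 2: distant groups only saw two 'chunks' of time
```

Inside the group, the (epoch, local_tick) pairs order every event; across groups, only the coarse epochs need to be compared, which is the sense in which the single timestamp moves quickly between nearby computers and slowly between distant ones.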

For more details on this, take a look at the technical article discussing these clocks, or the whitepaper, which explains the whole concept in full detail.

Overlapping transactions

I started by looking at the problem as a set of interconnected points. Each point is a write-process, and each such process can be in a conflicting state with a number of others. This is a symmetric relationship: if process A overlaps B, then B also overlaps A (since they are trying to update the same information). I can solve the problem by making sure that, between A and B, only one is allowed to finish and the other is cancelled. In that case:

  • B’s data is deleted and the client API is notified, so that it can either retry or notify the client, and
  • A can finalise its information, so that subsequent read processes can find its data.

For this, I devised a simple algorithm which makes sure that, among any number of overlapping writing processes, at most one will be finalised and all the others will be rolled back. This algorithm is explained in the technical article and detailed in the whitepaper.
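The real algorithm is the one described in those documents; the sketch below is only meant to illustrate the invariant it enforces. Treating the write-processes as a conflict graph, it finalises at most one transaction per connected group of overlapping writes and rolls back the rest. The “lowest transaction id wins” tiebreak is an arbitrary assumption made for this example, not DianemoDB’s actual rule.

```python
from collections import defaultdict

def resolve_conflicts(overlaps: list[tuple[int, int]], txns: set[int]) -> dict[int, str]:
    """Finalise at most one transaction per group of mutually overlapping writes.

    overlaps: symmetric pairs (a, b), meaning a and b touch the same data.
    The tiebreak here ('lowest id wins') is an arbitrary choice for illustration.
    """
    graph = defaultdict(set)
    for a, b in overlaps:
        graph[a].add(b)
        graph[b].add(a)

    decisions, seen = {}, set()
    for txn in sorted(txns):
        if txn in seen:
            continue
        # Collect the whole connected component of overlapping writes.
        component, stack = set(), [txn]
        while stack:
            node = stack.pop()
            if node in component:
                continue
            component.add(node)
            stack.extend(graph[node] - component)
        seen |= component
        winner = min(component)
        for member in sorted(component):
            decisions[member] = "finalise" if member == winner else "roll back"
    return decisions

print(resolve_conflicts([(1, 2), (2, 3)], {1, 2, 3, 4}))
# {1: 'finalise', 2: 'roll back', 3: 'roll back', 4: 'finalise'}
```

Note that this sketch is deliberately conservative: in the chain 1-2-3 above, transactions 1 and 3 never touch the same data, yet 3 is still rolled back, whereas a finer-grained rule could let both survive.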

Where this leads us

resilient enough to withstand the loss of some computers

Each node in the system is made up of a number of different physical computers. They can be replicated locally by employing a consensus group, which is rarely problematic over short distances. In fact, that’s exactly how these logs are usually maintained now.

provide delivery guarantees between different computers

We provide atomic transactions, even over large distances, and, unlike current systems, this doesn’t slow down the other processes or generally have a negative impact on performance.

allow for a global ordering between any two pieces of information

Our clock can provide an atomic value for each set of nodes, and any two nodes in the system will agree on whether the information stamped with one such value comes before or after another.

work across large geographical distances

Our transactions do not need to wait for the others to finish (except in rare corner cases), so nothing stops the cluster from growing to an arbitrary size and across arbitrary distances.

not be bottlenecked locally by these constraints

Even if databases are distributed globally, a process which only touches local nodes will not have to communicate with distant computers.

Conclusion

We’ve built this system, so it’s not just a thought experiment. We have also implemented the TPC-C benchmark, so we know it’s fast even when compared to others. If you’d like to know more, you can reach out to us at info@dianemodb.com.

If you like, you can take a closer look at how we manage failovers, replication and availability.

Or look at the article examining who would benefit from such a solution.

Neither distance nor latency nor badly synced clocks stays these transactions from the swift completion of their appointed writes!