Intro
Why we need multiple machines
- Scalability → If your data volume, read load, or write load grows bigger than a single machine can handle, you can potentially spread the load across multiple machines.
- Fault-tolerance/high availability → If your application needs to continue working even if one machine goes down, you can use multiple machines to give you redundancy.
- Latency → If you have users around the world, you might want to have servers at various locations worldwide so that each user can be served from a datacenter that is geographically close to them.
Scaling
Vertical Scaling
Vertical scaling (scaling up) means upgrading to a more powerful machine.
There are two strategies:
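- shared-memory → many CPUs, memory chips, and disks are joined under one operating system, so every CPU can access all of the memory and disk
- shared-disk → several machines with independent CPUs and RAM store their data on an array of disks that they all share
But they have several problems:
- The cost grows faster than linearly: doubling the hardware does not double the performance
- A shared-memory machine is limited to a single location
- For shared-disk, contention and the overhead of locking limit scalability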
Horizontal scaling
Horizontal scaling (scaling out) treats every machine as a node. Each node uses its own CPUs, RAM, and disks independently. Any coordination between nodes is done at the software level, using a conventional network.
Because every node is independent, this strategy is called shared-nothing. It usually incurs additional complexity for applications and sometimes limits the expressiveness of the data models you can use.
Replication Vs Partitioning

Replication → Keeping a copy of the same data on several different nodes, potentially in different locations. Replication provides redundancy.
Partitioning → Splitting a big database into smaller subsets called partitions so that different partitions can be assigned to different nodes (also known as sharding).
Leader & Follower
Each node that stores a copy of the database is called a replica. Every write to the database needs to be processed by every replica; otherwise, the replicas would no longer contain the same data. The most common solution for this is called leader-follower/master-slave replication.

- One of the replicas is designated the leader (also known as master). When clients want to write to the database, they must send their requests to the leader.
- The other replicas are known as followers (slaves or hot standbys). Whenever the leader receives a write request, it writes the new data locally and also sends the data change to all of its followers as part of a replication log or change stream. Each follower takes the log from the leader and updates its local copy of the database accordingly, by applying all writes in the same order as they were processed on the leader.
When a client wants to read from the database, it can query either the leader or any of the followers. However, writes are only accepted on the leader.
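The flow above can be sketched in a few lines, assuming an in-memory key-value store. The `Leader` and `Follower` classes and their methods are hypothetical names for illustration, not the API of any real database.

```python
class Follower:
    def __init__(self):
        self.data = {}
        self.applied = 0  # number of log entries applied so far

    def apply(self, entry):
        # Entries must be applied in the same order as on the leader.
        seq, key, value = entry
        assert seq == self.applied + 1, "log entries must arrive in order"
        self.data[key] = value
        self.applied = seq


class Leader:
    def __init__(self, followers):
        self.data = {}
        self.log = []  # replication log: (sequence number, key, value)
        self.followers = followers

    def write(self, key, value):
        # Apply locally, append to the log, then ship the entry to followers.
        self.data[key] = value
        entry = (len(self.log) + 1, key, value)
        self.log.append(entry)
        for follower in self.followers:
            follower.apply(entry)


followers = [Follower(), Follower()]
leader = Leader(followers)
leader.write("x", 1)
leader.write("x", 2)
```

Reads can then go to `leader.data` or to any `follower.data`, while writes only go through `leader.write`.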
Synchronous vs Asynchronous

- Sync → leader waits until follower has confirmed that it received the write before reporting success to the user.
- Async → leader sends the message, but doesn’t wait for a response from the follower.
The differences between sync and async are:
- sync sacrifices high availability to achieve strict consistency
- async sacrifices strict consistency to achieve high availability
Here are three configurations for different availability and consistency requirements:
- fully sync → the leader reports success only after all followers have acknowledged the write
- semi sync → the leader reports success after some of the followers have acknowledged the write
- async → the leader reports success immediately, without waiting for any follower to acknowledge
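As a rough sketch, the three configurations differ only in how many acknowledgments the leader waits for. The function name `write_succeeds` and the `min_acks` parameter are assumptions made for this example.

```python
def write_succeeds(acks_received, total_followers, mode, min_acks=1):
    """Return True if the leader may report success to the client."""
    if mode == "sync":       # fully synchronous: every follower must ack
        return acks_received == total_followers
    if mode == "semi-sync":  # at least min_acks followers must ack
        return acks_received >= min_acks
    if mode == "async":      # the leader does not wait for any follower
        return True
    raise ValueError(f"unknown mode: {mode}")
```

With 3 followers of which one is down, `sync` blocks the write, while `semi-sync` and `async` still accept it.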
Setting Up New Followers
From time to time, you need to set up new followers—perhaps to increase the number of replicas, or to replace failed nodes.
We cannot simply copy the current data files from another node because:
- clients constantly write new data or update old data, so a plain file copy would capture different parts of the database at different points in time and be inconsistent
- if we block clients' write requests during the copy, the data is consistent, but we lose high availability
The correct approach is:
- Take a consistent snapshot of the leader’s database at some point in time.
- Copy the snapshot to the new follower node.
- The follower connects to the leader and requests all the data changes that have happened since the snapshot was taken. This requires that the snapshot is associated with an exact position in the leader's replication log, sometimes called the log sequence number or binlog coordinates.
- When the follower has processed the backlog of data changes since the snapshot, we say it has caught up. It can now continue to process data changes from the leader as they happen.
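The four steps can be sketched as follows, assuming a toy leader whose log position is simply the number of entries written so far. `Leader`, `snapshot`, `changes_since`, and `bootstrap_follower` are hypothetical names for this example.

```python
import copy


class Leader:
    def __init__(self):
        self.data = {}
        self.log = []  # the entry at index i has log position i + 1

    def write(self, key, value):
        self.data[key] = value
        self.log.append((key, value))

    def snapshot(self):
        # A consistent snapshot plus the log position it corresponds to.
        return copy.deepcopy(self.data), len(self.log)

    def changes_since(self, position):
        return self.log[position:]


def bootstrap_follower(leader, snapshot, position):
    data = dict(snapshot)                               # step 2: copy the snapshot
    for key, value in leader.changes_since(position):   # step 3: request the backlog
        data[key] = value                               # step 4: apply until caught up
    return data


leader = Leader()
leader.write("a", 1)
snap, pos = leader.snapshot()   # step 1: snapshot taken at log position 1
leader.write("a", 2)            # writes continue while the snapshot is copied
leader.write("b", 3)
follower_data = bootstrap_follower(leader, snap, pos)
```

The leader never stops accepting writes; the log position is what lets the follower pick up exactly where the snapshot ends.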
Handling Node Outage
Follower: catch-up recovery
Each follower keeps a log of the data changes it has received from the leader.
The follower can recover quite easily: from its log, it knows the last transaction that was processed before the fault occurred. Thus, the follower can connect to the leader and request all the data changes that occurred during the time when the follower was disconnected.
Leader: failover
Failover
- Determining that the leader has failed. Most systems detect failure with a timeout: if a node does not respond for some period of time, it is assumed to be dead.
- Choosing a new leader. This can be done through a leader election or consensus algorithm: the leader is chosen by a majority of the remaining replicas and is usually the replica with the most up-to-date data changes from the old leader.
- Reconfiguring the system to use the new leader. Clients now need to send their write requests to the new leader. The system needs to ensure that the old leader becomes a follower and recognizes the new leader.
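A minimal sketch of the first two steps, assuming each replica reports the replication-log position it has reached. The function names are hypothetical, and the majority vote that a real election needs is left out.

```python
def leader_is_dead(last_heartbeat, now, timeout_s):
    # Timeout-based failure detection: no heartbeat within timeout_s seconds.
    return now - last_heartbeat > timeout_s


def choose_new_leader(replica_positions):
    # Pick the replica with the highest replication-log position,
    # i.e. the one holding the most changes from the old leader.
    return max(replica_positions, key=replica_positions.get)


positions = {"follower-1": 120, "follower-2": 135, "follower-3": 128}
new_leader = choose_new_leader(positions)
```

Choosing the most up-to-date replica minimizes, but does not eliminate, the writes lost under asynchronous replication.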
Problems
If asynchronous replication is used, the new leader may not have received all the writes from the old leader before it failed. If the former leader rejoins the cluster after a new leader has been chosen, the new leader may have received conflicting writes in the meantime. The most common solution is for the old leader’s unreplicated writes to simply be discarded.
Discarding writes is especially dangerous if other storage systems outside of the database need to be coordinated with the database contents.
For example, suppose the system also uses Redis as a cache. The old leader assigned some auto-incrementing primary keys and wrote them into Redis, then crashed. The new leader is not fully up to date, so its counter lags behind and it may hand out primary keys that already exist in Redis, leaving the two stores inconsistent.
In certain fault scenarios, it could happen that two nodes both believe that they are the leader (split brain).
It is hard to choose a reasonable timeout value: a longer timeout means a longer time to recovery, while a shorter timeout can cause unnecessary failovers.
Implementation of Replication Logs
Statement-based replication
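The leader records every write statement it executes and sends it to its followers. The leader acts just like a client to its followers.
For example, the log may contain a record such as
INSERT INTO table (columns) VALUES (values)
Only statements that change data (INSERT, UPDATE, DELETE) need to be forwarded; a read such as SELECT * FROM table WHERE conditions does not.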
The disadvantages:
- Nondeterministic → functions such as NOW() and RAND() are likely to generate a different value on each replica.
- Execution order → if statements use an autoincrementing column, or if they depend on the existing data in the database, they must be executed in exactly the same order on each replica, or else they may have a different effect.
- Side effects → statements with side effects (triggers, stored procedures, user-defined functions) may result in different side effects occurring on each replica, unless the side effects are absolutely deterministic.
Write-Ahead Log (WAL)
WAL is used for recovery:
In the case of a log-structured storage engine, every modification is first written to a write-ahead log so that the memtable can be recovered even if a crash happens.
In the case of a B-tree, which overwrites individual disk blocks, every modification is first written to a write-ahead log so that the index can be restored to a consistent state after a crash.
The WAL is an append-only sequence of bytes. The disadvantage is:
- WAL contains details of which bytes were changed in which disk blocks. This makes replication closely coupled to the storage engine.
If the storage format changes, or the leader and followers run incompatible versions of the storage engine, a follower may be unable to apply the WAL correctly.
Logical Log (binlog in MySQL)
A log should be decoupled from the storage engine.
A logical log for a relational database is usually a sequence of records describing writes to database tables at the granularity of a row:
- For an inserted row, the log contains the new values of all columns.
- For a deleted row, the log contains enough information to uniquely identify the row that was deleted. (tombstone)
- For an updated row, the log contains enough information to uniquely identify the updated row, and the new values of all columns.
- For a transaction, the log also contains a record indicating that the transaction was committed.
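To make the record types concrete, here is a sketch of a row-level log and a replay function. The record layout (`op`, `id`, `row`) is an assumption for this example and is not the actual MySQL binlog format.

```python
log = [
    {"op": "insert", "id": 1, "row": {"name": "Ann", "age": 30}},
    {"op": "update", "id": 1, "row": {"name": "Ann", "age": 31}},
    {"op": "insert", "id": 2, "row": {"name": "Bob", "age": 25}},
    {"op": "delete", "id": 2},
    {"op": "commit"},
]


def apply_log(records):
    """Replay row-level records; buffered changes become visible at commit."""
    table = {}    # primary key -> row
    pending = []  # records of the transaction in progress
    for rec in records:
        if rec["op"] != "commit":
            pending.append(rec)
            continue
        for change in pending:
            if change["op"] == "delete":
                table.pop(change["id"], None)
            else:  # insert and update both carry the new values of all columns
                table[change["id"]] = change["row"]
        pending = []
    return table
```

Because the records describe rows rather than disk blocks, a follower with a different storage engine could replay the same log.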
Problems with Replication Lag
If we have multiple followers, we can get:
- availability → tolerate faults on some machines
- scalability → reads can be distributed, so we can handle more requests
- low latency → a request can take the fastest path, e.g., to the nearest replica
But the question is: how do we make all replicas look the same?
If we choose sync replication, the replicas stay consistent, but if any replica crashes, the service blocks until that replica recovers, which users will complain about and is unacceptable.
So in practice we use async replication. But it has a problem: some replicas may fall behind others.
This leads to apparent inconsistencies in the database: if you run the same query on the leader and a follower at the same time, you may get different results, because not all writes have been reflected in the follower.
Good news is this inconsistency is just a temporary state—if you stop writing to the database and wait a while, the followers will eventually catch up and become consistent with the leader. For that reason, this effect is known as eventual consistency.
Reading Your Own Writes

If a user writes data and then reads it back from a follower that has not yet received the write, it looks as if the write was lost. In this situation, we need read-after-write consistency. This is a guarantee that if the user reloads the page, they will always see any updates they submitted themselves. It makes no promises about other users: other users' updates may not be visible until some later time.
Methods
- When reading something that the user may have modified, read it from the leader; otherwise, read it from a follower.
- Track the time of the last update and, for a certain time threshold after it, make all reads from the leader.
- The client can remember the timestamp of its most recent write—then the system can ensure that the replica serving any reads for that user reflects updates at least until that timestamp.
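The third method can be sketched as a routing function, assuming each replica exposes the timestamp of the newest write it has applied. The function name `choose_replica` and the use of small integers as logical timestamps are assumptions for this example.

```python
def choose_replica(last_write_ts, replicas):
    """Return the name of a replica that reflects the user's last write.

    replicas maps a replica name to the timestamp of the newest write
    that replica has applied.
    """
    for name, applied_ts in replicas.items():
        if name != "leader" and applied_ts >= last_write_ts:
            return name
    return "leader"  # no follower is fresh enough, so fall back to the leader


replicas = {"leader": 50, "f1": 40, "f2": 47}
```

A user whose last write has timestamp 45 can be served by `f2`; a user whose last write has timestamp 49 has to read from the leader.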
Problems
- If one logical user uses multiple physical devices, the timestamp of the last write is hard to share between them, because each device only knows about its own writes.
- If replicas are distributed across different datacenters, there is no guarantee that connections from different devices will be routed to the same datacenter.
Monotonic Reads

Monotonic reads is a weaker guarantee than strong consistency, but a stronger guarantee than eventual consistency. When you read data, you may see an old value; monotonic reads only means that if one user makes several reads in sequence, they will not see time go backward.
For example, suppose the value changed from 1 to 2 and the newest value is now 3. If the first read returns "2", then the following reads by the same user look like "2, 2, 2, …, 3, 3, 3": "1" never appears, and once we have seen "3", "2" never appears again.
Methods
- each user always makes their reads from the same replica (different users can read from different replicas).
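- track the timestamp of the newest data the user has seen, and serve later reads only from replicas that are at least that up to date.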
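The first method can be sketched by hashing the user ID, so that the choice of replica is stable across requests. The function name `replica_for_user` is hypothetical, and the sketch ignores the rerouting needed when the chosen replica fails.

```python
import hashlib


def replica_for_user(user_id, replicas):
    # Hash the user ID so the same user always reads from the same replica.
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return replicas[int.from_bytes(digest[:8], "big") % len(replicas)]


replicas = ["r0", "r1", "r2"]
first = replica_for_user("alice", replicas)
second = replica_for_user("alice", replicas)
```

A hash is used instead of random selection because random selection could send consecutive reads of one user to replicas with different lag.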
Difference with “Reading your own writes”
"Reading your own writes" guarantees that a read after the user's own write sees that write; "monotonic reads" guarantees that a sequence of reads never goes backward in time.
Consistent Prefix Reads (Causal)

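Consistent prefix reads guarantees that if a sequence of writes happens in a certain order, then anyone reading those writes will see them appear in the same order. The guarantee is violated mainly because of partitioning: the order inside one partition is easy to maintain, but there is no global order across partitions.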
Methods
- do not partition the data.
- route all causally related writes to the same partition. The hard part is detecting which requests are causally related.
Solutions for Replication Lag
Transactions! They let the database provide stronger guarantees so that the application can be simpler. We will cover them later.
Multi-leaders
Leader-based replication has one major downside: there is only one leader, and all writes must go through it. So the extension for it is using multiple leaders.

Single leader vs Multiple leaders
| | Single leader | Multiple leaders |
|---|---|---|
| Performance | Every write must go over the internet to the datacenter with the leader. This can add significant latency to writes. | Every write can be processed in the local datacenter and is replicated asynchronously to the other datacenters. |
| Tolerance of datacenter outages | If the datacenter with the leader fails, failover can promote a follower in another datacenter to be leader, but the whole system has to wait until the new leader is ready. | Each datacenter can continue operating independently of the others. The failed datacenter can elect a new local leader on its own, and replication catches up when it comes back online. |
| Tolerance of network problems | Very sensitive to problems on the inter-datacenter link (public internet), because every write has to cross it. | A multi-leader configuration with asynchronous replication can usually tolerate network problems better: a temporary network interruption does not prevent writes being processed. |
Use case
- Clients with offline operation - Consider the calendar apps on your mobile phone, your laptop, and other devices. Every device has a local database that acts as a leader (it accepts write requests), and there is an asynchronous multi-leader replication process (sync) between the replicas of your calendar on all of your devices.
- Database spanning multiple data centers.
- Collaborative editing - e.g., Google Docs.
Downside
But multi-leader replication has a big problem: the same data may be concurrently modified in two different datacenters, and those write conflicts must be resolved. So multi-leader is not a universal choice for all situations.
Handling Write Conflicts

Conflict Detection
In a single-leader database, the second writer will either:
- block and wait for the first write to complete, or
- have its write transaction aborted, forcing the user to retry the write
In a multi-leader setup, both writes are successful, and the conflict is only detected asynchronously at some later point in time. At that time, it may be too late to ask the user to resolve the conflict.
In principle, you could make the conflict detection synchronous—i.e., wait for the write to be replicated to all replicas before telling the user that the write was successful. However, by doing so, you would lose the main advantage of multi-leader replication: allowing each replica to accept writes independently. If you want synchronous conflict detection, you might as well just use single-leader replication.
Conflict Avoidance
The simplest way to handle conflicts is to avoid them.
The core idea is: if the application can ensure that all writes for a particular record go through the same leader, then conflicts cannot occur.
For example, in an application where a user can edit their own data, you can ensure that requests from a particular user are always routed to the same datacenter and use the leader in that datacenter for reading and writing. Different users may have different “home” datacenters, but from any one user’s point of view the configuration is essentially single-leader.
The only problem is that conflicts can still occur when the user's designated datacenter changes, e.g., because the user moved from New York to California or because the original datacenter is down.
Conflict Convergence
A single-leader database applies writes in a sequential order: if there are several updates to the same field, the last write determines the final value of the field. In a multi-leader setup, however, each datacenter may see the writes in a different order, so it is hard to define a universal order.
Convergent → means that all replicas must arrive at the same final value when all changes have been replicated. There are several ways to achieve this:
- Give each write a unique ID (e.g., a timestamp, a long random number, a UUID, or a hash of the key and value), pick the write with the highest ID as the winner. (if a timestamp is used, this technique is known as last write wins (LWW))
- Give each replica a unique ID, and let writes that originated at a higher-numbered replica always take precedence over writes that originated at a lower-numbered replica.
- Merge the values together.
- Record the conflict information and try to solve it later.
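A small sketch of why the highest-ID rule converges: whatever order the conflicting writes arrive in, every replica picks the same winner. The `resolve` and `merge` helpers and the sample IDs are assumptions for illustration.

```python
from itertools import permutations


def resolve(writes):
    # Each write is (write_id, value); IDs are assumed unique.
    # The highest ID wins, whatever the arrival order.
    return max(writes, key=lambda w: w[0])[1]


def merge(values):
    # Alternative: keep every value (sorted, then concatenated)
    # instead of dropping the losing writes.
    return "/".join(sorted(values))


writes = [(17, "B"), (42, "C"), (5, "A")]
results = {resolve(list(order)) for order in permutations(writes)}
```

`results` holds a single value because the rule depends only on the set of writes, not on their order. The price is that the losing writes "A" and "B" are discarded, which `merge` avoids.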
Custom Conflict Resolution
On write
As soon as the database system detects a conflict in the log of replicated changes, it calls the conflict handler (callback function). This handler typically cannot prompt a user—it runs in a background process and it must execute quickly.
On read
When a conflict is detected, all the conflicting writes are stored. The next time the data is read, these multiple versions of the data are returned to the application. The application may prompt the user or automatically resolve the conflict, and write the result back to the database.
Multi-leaders Replication Topologies

| | Circular | Star / Tree | All-to-All |
|---|---|---|---|
| Advantage | The total number of replication messages is about the same as the number of nodes, which is small. | The same as circular. | Fault tolerance: messages can travel along different paths, so there is no single point of failure. |
| Disadvantage | If just one node fails, it can interrupt the flow of replication messages between other nodes, causing them to be unable to communicate until the node is fixed. | The same as circular. | 1. The total number of replication messages is large. 2. Because there is no fixed order (no prior node or next node), variation in the latency of network links may cause causality problems. |
For circular and star topologies, to avoid infinite replication loops (broadcast flooding), each node has a unique identifier, and in the replication log each write is tagged with the identifiers of all the nodes it has passed through.
If network links have different latencies, some replication messages may overtake others, and causality problems may occur.
For example, an update depends on the prior insert of the same row, so we need to make sure that all nodes process the insert first, and then the update. Simply attaching a timestamp to every write is not sufficient, because clocks are difficult to keep in sync. To order these events correctly, we can use a technique called version vectors or vector clocks, which is a kind of logical clock. We will talk about that in the next chapter.
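The node-identifier tagging used in a circular topology can be sketched as follows. The function name `replicate_ring` is hypothetical, and the ring is modeled as a list in which each node forwards to the next one.

```python
def replicate_ring(origin, nodes):
    """Forward one write around a ring; return the nodes that applied it, in order."""
    seen = {origin}    # node IDs recorded on the write so far
    applied = [origin]
    current = (nodes.index(origin) + 1) % len(nodes)
    # A node ignores a write that is already tagged with its own ID,
    # which is what stops the write from circulating forever.
    while nodes[current] not in seen:
        seen.add(nodes[current])
        applied.append(nodes[current])
        current = (current + 1) % len(nodes)
    return applied


path = replicate_ring("B", ["A", "B", "C", "D"])
```

The write visits each node exactly once and stops when it would come back to its origin.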
Leaderless
In leaderless implementations, either:
- the client directly sends its writes to several replicas, or
- a coordinator node does this on behalf of the client. However, unlike a leader database, that coordinator does not enforce a particular ordering of writes.
Writing to the Database When a Node Is Down
In a leaderless configuration, failover does not exist because there is no leader :).
The client sends the write to all n replicas in parallel. Suppose x replicas are available and accept the write, while the n - x unavailable replicas miss it. We set a threshold h: once at least h replicas reply ok, we consider the write successful and do not wait for the remaining replicas. In the example figure, n = 3, x = 2, and h = 2.

When a client reads from the database, it doesn’t just send its request to one replica: read requests are also sent to several nodes in parallel. The client may get different responses from different nodes; i.e., the up-to-date value from one node and a stale value from another. Version numbers are used to determine which value is newer.
Read repair and anti-entropy (AE)
The replication scheme should ensure that eventually all the data is copied to every replica. After an unavailable node comes back online, how does it catch up on the writes that it missed?
Read repair
When a client makes a read from several nodes in parallel, it can detect any stale responses. When the client sees that a replica has a stale value, it writes the newer value back to that replica. This approach works well for values that are frequently read.
Anti-entropy
In addition, some datastores have a background process that constantly looks for differences in the data between replicas and copies any missing data from one replica to another. Unlike the replication log in leader-based replication, this anti-entropy process does not copy writes in any particular order, and there may be a significant delay before data is copied.
In more basic terms, the AE service identifies missing or inconsistent shards and repairs them (it is a diff process).
- AE can only repair a shard when at least one copy of that shard is still available.
- AE will not compare or repair hot shards, i.e., shards with active writes. Hot shards are more prone to change, and at any given moment the arrival of new data affects AE's digest comparison.
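Read repair, as described above, can be sketched with replicas modeled as dictionaries that map a key to a `(version, value)` pair. The function name is hypothetical, versions are assumed to start at 1, the key is assumed to exist on at least one replica, and for simplicity the sketch reads from every replica rather than from a subset.

```python
def read_with_repair(replicas, key):
    """replicas: list of dicts mapping key -> (version, value)."""
    responses = [r[key] for r in replicas if key in r]
    newest = max(responses, key=lambda pair: pair[0])
    for r in replicas:
        if r.get(key, (0, None))[0] < newest[0]:
            r[key] = newest  # write the newer value back to the stale replica
    return newest[1]


replicas = [{"k": (7, "new")}, {"k": (6, "old")}, {}]
value = read_with_repair(replicas, "k")
```

After the read, all three replicas hold version 7. A key that is never read is never repaired this way, which is why anti-entropy exists as a complement.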
Quorums for reading and writing
If there are n replicas, every write must be confirmed by w nodes to be considered successful, and we must query at least r nodes for each read. As long as w + r > n, we expect to get an up-to-date value when reading, because at least one of the r nodes we’re reading from must be up to date. Reads and writes that obey these r and w values are called quorum reads and writes.
Normally, reads and writes are always sent to all n replicas in parallel. The parameters w and r determine how many nodes we wait for.

What's more, w and r are configurable: if the system is write-heavy, we can choose a smaller w so that each write waits for fewer acknowledgments, and a correspondingly larger r to keep w + r > n.
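The condition w + r > n can be checked by brute force: it holds exactly when every possible set of w written nodes shares at least one node with every possible set of r read nodes. The function name `quorums_always_overlap` is an assumption for this sketch.

```python
from itertools import combinations


def quorums_always_overlap(n, w, r):
    # True if every write set of size w intersects every read set of size r.
    nodes = range(n)
    return all(
        set(write_set) & set(read_set)
        for write_set in combinations(nodes, w)
        for read_set in combinations(nodes, r)
    )


# Compare the brute-force check with the formula for all small configurations.
matches_formula = all(
    quorums_always_overlap(n, w, r) == (w + r > n)
    for n in range(1, 6)
    for w in range(1, n + 1)
    for r in range(1, n + 1)
)
```

For n = 3, the common choice w = 2, r = 2 overlaps, while w = 1, r = 2 does not: the single node that took the write may be the one the read skips.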
Limitations of Quorum Consistency
Even with w + r > n, a read can sometimes still return stale values:
- If a sloppy quorum is used, the w writes may end up on nodes outside the designated n, which breaks the overlap that the w + r > n rule relies on.
- If two writes occur concurrently, it is not clear which one happened first. If we use timestamps to pick the winner, clock skew can make replicas inconsistent. In a leader-based system the leader decides the winner and sends it to all replicas, but in a leaderless system every replica may decide on its own.
- If a write happens concurrently with a read, the write may be reflected on only some of the replicas.
- If a write succeeded on some replicas but failed on others and overall succeeded on fewer than w replicas, it is not rolled back on the replicas where it succeeded.
- If a node carrying a new value fails, and its data is restored from a replica carrying an old value, the number of replicas storing the new value may fall below w, breaking the quorum condition.
Monitoring staleness
Even if your application can tolerate stale reads, you need to be aware of the health of your replication. If it falls behind significantly, it should alert you so that you can investigate the cause.
For leader-based replication, the database typically exposes metrics for the replication lag, which you can feed into a monitoring system. This is possible because writes are applied to the leader and to followers in the same order, and each node has a position in the replication log (the number of writes it has applied locally). By subtracting a follower’s current position from the leader’s current position, you can measure the amount of replication lag.
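The subtraction described above, as a small sketch. The function names, the positions, and the alert threshold are made-up values for illustration.

```python
def replication_lag(leader_position, follower_positions):
    # Lag = number of writes the leader has applied that the follower has not.
    return {name: leader_position - pos for name, pos in follower_positions.items()}


def lagging(lags, threshold):
    # Followers whose lag exceeds the alert threshold.
    return sorted(name for name, lag in lags.items() if lag > threshold)


lags = replication_lag(1000, {"f1": 998, "f2": 700})
alerts = lagging(lags, threshold=100)
```

Here `f2` is 300 writes behind and would trigger an alert, while `f1` is only 2 writes behind.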
For leaderless replication, there is no fixed order in which writes are applied, which makes monitoring more difficult. Moreover, if the database only uses read repair (no anti-entropy), there is no limit to how old a value might be — if a value is only infrequently read, the value returned by a stale replica may be ancient.
In a leaderless system, a write may reach only some of the replicas, so the write log on any single replica cannot represent the actual write order of the whole system.
Sloppy Quorums and Hinted Handoff
A network interruption can easily cut off a client from a large number of database nodes. Although those nodes are alive, and other clients may be able to connect to them, to a client that is cut off from the database nodes, they might as well be dead. In this situation, it’s likely that fewer than w or r reachable nodes remain, so the client can no longer reach a quorum.
There are two choices:
- let all writes and reads fail
- accept writes for now, but write them to some nodes that are reachable but are not among the original n nodes
The second choice is called a sloppy quorum.
For example, suppose we originally chose the 10 nodes [1, …, 10]. If nodes [1, 2, 3, 4] are not reachable, we write to [5, …, 10, 11*, 12*, 13*, 14*] instead.
Sloppy quorums are particularly useful for increasing write availability: as long as any w nodes are available, the database can accept writes. However, this means that even when w + r > n, you cannot be sure to read the latest value for a key, because the latest value may have been temporarily written to some nodes outside of n.
Once the network interruption is fixed, any writes that one node temporarily accepted on behalf of another node are sent to the appropriate “home” nodes. This is called hinted handoff.
For example, when nodes 1-4 become available again, 11*-14* leave the quorum and the system goes back to using [1, …, 10]. The problem is that nodes 1-4 do not have up-to-date values. In some systems, 11*-14* send the updates they stored temporarily back to 1-4, so that the original quorum works as if nothing had happened.
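A sketch of both mechanisms, assuming each node's storage is a dictionary and each hint records which home node a spare is covering for. All names (`sloppy_write`, `hinted_handoff`, `stores`, `hints`) are hypothetical.

```python
def sloppy_write(key, value, home_nodes, spare_nodes, reachable, w, stores, hints):
    """Write to the reachable home nodes, then to spares that stand in for the rest."""
    targets = [n for n in home_nodes if n in reachable]
    unreachable_homes = [n for n in home_nodes if n not in reachable]
    spares = [n for n in spare_nodes if n in reachable]
    for home, spare in zip(unreachable_homes, spares):
        hints.append((spare, home, key, value))  # the spare holds the write for home
        targets.append(spare)
    for n in targets:
        stores.setdefault(n, {})[key] = value
    return len(targets) >= w


def hinted_handoff(hints, stores, reachable):
    """Deliver held writes to home nodes that are reachable again."""
    remaining = []
    for spare, home, key, value in hints:
        if home in reachable:
            stores.setdefault(home, {})[key] = value
            stores[spare].pop(key, None)
        else:
            remaining.append((spare, home, key, value))
    return remaining


stores, hints = {}, []
ok = sloppy_write("k", "v", [1, 2, 3], [4, 5], {3, 4, 5}, 2, stores, hints)
hints = hinted_handoff(hints, stores, {1, 2, 3, 4, 5})
```

Between the write and the handoff, a read that only contacts home nodes 1 and 2 would miss the value, which is the staleness risk of sloppy quorums.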
Multi-datacenter operation
The number of replicas n includes nodes in all datacenters, and in the configuration you can specify how many of the n replicas you want to have in each datacenter. Each write from a client is sent to all replicas, regardless of datacenter, but the client usually only waits for acknowledgment from a quorum of nodes within its local datacenter so that it is unaffected by delays and interruptions on the cross-datacenter link.
Alternatively, some systems keep all communication between clients and database nodes local to one datacenter, so n describes the number of replicas within one datacenter. Cross-datacenter replication between database clusters happens asynchronously in the background, in a style that is similar to multi-leader replication.
Detecting Concurrent Writes
The problem is that events may arrive in a different order at different nodes, due to variable network delays and partial failures. In order to become eventually consistent, the replicas should converge toward the same value.
Last Write Wins (LWW)
Even though the writes don't have a natural ordering, we can force an arbitrary order on them. For example, we can attach a timestamp to each write, pick the biggest timestamp as the most "recent," and discard any writes with an earlier timestamp. This conflict resolution algorithm is called last write wins (LWW).
LWW achieves the goal of eventual convergence, but at the cost of durability: if there are several concurrent writes to the same key, even if they were all reported as successful to the client, only one of the writes will survive and the others will be silently discarded. Moreover, LWW may even drop writes that are not concurrent.
If losing data is not acceptable, LWW is a poor choice for conflict resolution.
Causality vs Concurrency
An operation A happens before (causality) another operation B if B knows about A, or depends on A, or builds upon A in some way. So two operations are concurrent if neither happens before the other.
If one operation happened before another, the later operation should overwrite the earlier operation, but if the operations are concurrent, we have a conflict that needs to be resolved.
When defining concurrency, exact time doesn't matter: we simply call two operations concurrent if they are both unaware of each other, regardless of the physical time at which they occurred, because in distributed systems clock skew cannot be avoided.
Capture the causality relationship


The algorithm is:
- The server maintains a version number Vi for every key, increments the version number Vi → Vi+1 every time that key is written, and stores the new version number along with the value written.
- When a client reads a key, the server returns all values that have not been overwritten, as well as the latest version number.
- When a client writes a key, it must include the version number Vx from the prior read, and it must merge together all values that it received in the prior read.
- When the server receives a write with a particular version number Vx, it can overwrite all values with that version number or below V ≤ Vx (since it knows that they have been merged into the new value), but it must keep all values with a higher version number (because those values are concurrent with the incoming write).
- If a client writes a key without any version number, add it into current dataset.
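The algorithm can be sketched for a single key as follows. The class `VersionedKey` and the shopping-cart values are assumptions for this example; `based_on=0` stands for a write without any version number.

```python
class VersionedKey:
    def __init__(self):
        self.version = 0
        self.siblings = []  # (version, value) pairs that have not been overwritten

    def read(self):
        # Return the latest version number and all values not yet overwritten.
        return self.version, [value for _, value in self.siblings]

    def write(self, value, based_on=0):
        # Drop everything the client had already seen (version <= based_on);
        # anything newer is concurrent with this write and is kept.
        self.siblings = [(v, val) for v, val in self.siblings if v > based_on]
        self.version += 1
        self.siblings.append((self.version, value))
        return self.version


cart = VersionedKey()
cart.write(["milk"])                                       # client 1, version 1
cart.write(["eggs"])                                       # client 2, version 2
cart.write(["milk", "flour"], based_on=1)                  # client 1, version 3
cart.write(["eggs", "milk", "ham"], based_on=2)            # client 2, version 4
cart.write(["milk", "flour", "eggs", "bacon"], based_on=3) # client 1, version 5
version, values = cart.read()
```

At the end two siblings remain, because the last two writes were each made without knowledge of the other. The next client to read has to merge them.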
Merging concurrently written values
This algorithm ensures that no data is silently dropped, but it unfortunately requires that the clients do some extra work: if several operations happen concurrently, clients have to clean up afterward by merging the concurrently written values. A simple approach is to just pick one of the values based on a version number or timestamp (last write wins), but that implies losing data.
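When deleting data, we cannot simply remove it from the database: an item removed from one sibling would reappear when the siblings are merged. Instead, we leave a tombstone that marks the item as deleted.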
Version vector
Use a version number per replica as well as per key. Each replica increments its own version number when processing a write, and also keeps track of the version numbers it has seen from each of the other replicas. This information indicates which values to overwrite and which values to keep as siblings. The version vector allows the database to distinguish between overwrites and concurrent writes.
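Comparing two version vectors can be sketched as below, with each vector modeled as a dictionary from replica ID to counter (a missing entry counts as 0). The function name `compare` and the returned labels are assumptions for this example.

```python
def compare(a, b):
    """Compare two version vectors given as dicts replica_id -> counter."""
    replicas = set(a) | set(b)
    a_le_b = all(a.get(r, 0) <= b.get(r, 0) for r in replicas)
    b_le_a = all(b.get(r, 0) <= a.get(r, 0) for r in replicas)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # a happened before b, so b may overwrite a
    if b_le_a:
        return "after"       # b happened before a, so a may overwrite b
    return "concurrent"      # neither dominates: keep both values as siblings
```

For instance, `{"r1": 3}` and `{"r1": 2, "r2": 1}` are concurrent: each has seen a write that the other has not.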


