Cluster nodes maintain a Write-Ahead Log. Each log entry stores
the state required for consensus along with the user request.
The cluster nodes coordinate to build consensus over log entries,
so that all of them have exactly the same Write-Ahead Log.
The requests are then executed sequentially following the log.
Because all cluster nodes agree on each log entry, they execute the same
requests in the same order. This ensures that all the cluster nodes
share the same state.
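To make the structure concrete, here is a minimal sketch of what such a log entry might look like, using the WALEntry accessors that appear in the code snippets below; the exact fields and constructor are assumptions for illustration.

    class WALEntry {
        private final long entryIndex;  // position of the entry in the Write-Ahead Log
        private final long generation;  // Generation Clock value under which the entry was appended
        private final byte[] data;      // the serialized user request

        WALEntry(long entryIndex, long generation, byte[] data) {
            this.entryIndex = entryIndex;
            this.generation = generation;
            this.data = data;
        }

        long getEntryIndex() { return entryIndex; }
        long getGeneration() { return generation; }
        byte[] getData() { return data; }
    }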
Executing two phases for each state change request is not efficient.
So cluster nodes select a leader at startup.
The leader election phase establishes the Generation Clock
and detects all log entries in the previous Quorum.
(The entries the previous leader might have copied to the majority of
the cluster nodes.)
Once there is a stable leader, only the leader co-ordinates the replication.
Clients communicate with the leader.
The leader adds each request to the log and makes sure that it’s replicated
on all the followers. Consensus is reached once a log entry is successfully
replicated to the majority of the followers.
This way, only one phase is needed to reach consensus
for each state change operation when there is a stable leader.
The following sections describe how Raft implements a replicated log.
Replicating client requests
Figure 1: Replication
For each log entry, the leader appends it to its local Write-Ahead log
and then sends it to all the followers.
leader (class ReplicatedLog…)
    private Long appendAndReplicate(byte[] data) {
        Long lastLogEntryIndex = appendToLocalLog(data);
        replicateOnFollowers(lastLogEntryIndex);
        return lastLogEntryIndex;
    }

    private void replicateOnFollowers(Long entryAtIndex) {
        for (final FollowerHandler follower : followers) {
            replicateOn(follower, entryAtIndex); //send replication requests to followers
        }
    }
The followers handle the replication request and append the log entries to their local logs.
After successfully appending the log entries, they respond to the leader with the index of the
latest log entry they have.
The response also includes the current Generation Clock of the server.
The followers also check whether the entries already exist, or whether there are entries
beyond the ones being replicated.
They ignore entries which are already present, but if there are entries from a different generation,
they remove the conflicting entries.
follower (class ReplicatedLog…)
    void maybeTruncate(ReplicationRequest replicationRequest) {
        replicationRequest.getEntries().stream()
                .filter(entry -> wal.getLastLogIndex() >= entry.getEntryIndex() &&
                        entry.getGeneration() != wal.readAt(entry.getEntryIndex()).getGeneration())
                .forEach(entry -> wal.truncate(entry.getEntryIndex()));
    }
follower (class ReplicatedLog…)
    private ReplicationResponse appendEntries(ReplicationRequest replicationRequest) {
        List<WALEntry> entries = replicationRequest.getEntries();
        entries.stream()
                .filter(e -> !wal.exists(e))
                .forEach(e -> wal.writeEntry(e));
        return new ReplicationResponse(SUCCEEDED, serverId(),
                replicationState.getGeneration(), wal.getLastLogIndex());
    }
The follower rejects the replication request when the generation number in the request
is lower than the latest generation the server knows about.
This notifies the leader that it should step down and become a follower.
follower (class ReplicatedLog…)
    Long currentGeneration = replicationState.getGeneration();
    if (currentGeneration > request.getGeneration()) {
        return new ReplicationResponse(FAILED, serverId(), currentGeneration, wal.getLastLogIndex());
    }
The leader keeps track of the log index replicated at each server as responses are received.
It uses this to find the highest log index which has been successfully copied to the quorum,
and tracks that index as the commitIndex. The commitIndex is the High-Water Mark in the log.
leader (class ReplicatedLog…)
logger.info("Updating matchIndex for " + response.getServerId() + " to " + response.getReplicatedLogIndex());
updateMatchingLogIndex(response.getServerId(), response.getReplicatedLogIndex());
var logIndexAtQuorum = computeHighwaterMark(logIndexesAtAllServers(), config.numberOfServers());
var currentHighWaterMark = replicationState.getHighWaterMark();
if (logIndexAtQuorum > currentHighWaterMark && logIndexAtQuorum != 0)
applyLogEntries(currentHighWaterMark, logIndexAtQuorum);
replicationState.setHighWaterMark(logIndexAtQuorum);
leader (class ReplicatedLog…)
    Long computeHighwaterMark(List<Long> serverLogIndexes, int noOfServers) {
        serverLogIndexes.sort(Long::compareTo);
        return serverLogIndexes.get(noOfServers / 2);
    }
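For example, with five servers whose replicated log indexes sort to [5, 7, 9, 10, 10], the element at index 5 / 2 = 2 is 9; at least three servers (a majority) have replicated entries up to index 9, so 9 becomes the High-Water Mark.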
leader (class ReplicatedLog…)
    private void updateMatchingLogIndex(int serverId, long replicatedLogIndex) {
        FollowerHandler follower = getFollowerHandler(serverId);
        follower.updateLastReplicationIndex(replicatedLogIndex);
    }
leader (class ReplicatedLog…)
    public void updateLastReplicationIndex(long lastReplicatedLogIndex) {
        this.matchIndex = lastReplicatedLogIndex;
    }
Full replication
It is important to ensure that all the cluster nodes
receive all the log entries from the leader, even when
they are disconnected or they crash and come back up.
Raft has a mechanism to make sure all the cluster nodes receive
all the log entries from the leader.
With every replication request in Raft, the leader also sends the log
index and generation of the log entry which immediately precedes
the new entries being replicated. If the previous log index and
generation do not match its local log, the follower rejects the request.
This indicates to the leader that the follower log needs to be synced
for some of the older entries.
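A possible sketch of the leader side of this, assuming a hypothetical ReplicationRequest constructor and an entriesFrom helper on the WAL (neither is shown in the original code), could look like the following:

    private ReplicationRequest buildReplicationRequest(FollowerHandler follower) {
        long nextIndex = follower.getNextIndex();           // first entry the follower still needs
        long prevLogIndex = nextIndex - 1;                  // entry immediately preceding the new ones
        long prevLogGeneration = prevLogIndex >= wal.getLogStartIndex()
                ? wal.readAt(prevLogIndex).getGeneration() : 0;
        List<WALEntry> entries = wal.entriesFrom(nextIndex); // hypothetical helper: entries from nextIndex onwards
        return new ReplicationRequest(replicationState.getGeneration(), serverId(),
                prevLogIndex, prevLogGeneration, entries,
                replicationState.getHighWaterMark());
    }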
follower (class ReplicatedLog…)
    if (!wal.isEmpty() && request.getPrevLogIndex() >= wal.getLogStartIndex() &&
            generationAt(request.getPrevLogIndex()) != request.getPrevLogGeneration()) {
        return new ReplicationResponse(FAILED, serverId(), replicationState.getGeneration(), wal.getLastLogIndex());
    }
follower (class ReplicatedLog…)
    private Long generationAt(long prevLogIndex) {
        WALEntry walEntry = wal.readAt(prevLogIndex);
        return walEntry.getGeneration();
    }
So the leader decrements the nextIndex for that follower and tries sending
log entries at the lower index. This continues until the follower
accepts the replication request.
leader (class ReplicatedLog…)
    //rejected because of conflicting entries, decrement nextIndex
    FollowerHandler peer = getFollowerHandler(response.getServerId());
    logger.info("decrementing nextIndex for peer " + peer.getId() + " from " + peer.getNextIndex());
    peer.decrementNextIndex();
    replicateOn(peer, peer.getNextIndex());
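The per-follower bookkeeping on the leader can be pictured as a small holder of nextIndex and matchIndex. This is a sketch based on the methods used above (getNextIndex, decrementNextIndex, updateLastReplicationIndex); the exact field layout is an assumption.

    class FollowerHandler {
        private final int id;    // server id of the follower
        private long nextIndex;  // index of the next log entry to send to this follower
        private long matchIndex; // highest log index known to be replicated on this follower

        FollowerHandler(int id, long nextIndex) {
            this.id = id;
            this.nextIndex = nextIndex;
        }

        int getId() { return id; }

        long getNextIndex() { return nextIndex; }

        void decrementNextIndex() {
            if (nextIndex > 0) {
                nextIndex--; // retry replication from one entry earlier
            }
        }

        void updateLastReplicationIndex(long lastReplicatedLogIndex) {
            this.matchIndex = lastReplicatedLogIndex;
        }
    }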
This check on the previous log index and generation
allows the leader to detect two things.

- If the follower log has missing entries.
  For example, if the follower log has only one entry
  and the leader starts replicating the third entry,
  the requests will be rejected until the leader replicates
  the second entry.
- If the previous entries in the log are from a different
  generation, higher or lower than the corresponding entries
  in the leader log. The leader will try replicating entries
  from lower indexes until the requests get accepted.
  The followers truncate the entries for which the generation
  does not match.
This way, the leader tries to push its own log to all the followers
continuously by using the previous index to detect missing entries
or conflicting entries.
This makes sure that all the cluster nodes eventually
receive all the log entries from the leader even when they
are disconnected for some time.
Raft does not have a separate commit message, but sends the commitIndex as part
of the normal replication requests.
Empty replication requests are also sent as heartbeats,
so the commitIndex reaches the followers as part of the heartbeat requests as well.
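A minimal sketch of the leader's heartbeat loop under these assumptions could look like the following; the scheduler field, the heartbeat interval and the reuse of replicateOn for heartbeats are illustrative, not taken from the original code.

    // requires java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
    private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();

    private void startHeartbeatTimer(long heartbeatIntervalMillis) {
        heartbeatScheduler.scheduleAtFixedRate(this::sendHeartbeats,
                0, heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
    }

    private void sendHeartbeats() {
        for (FollowerHandler follower : followers) {
            // A follower which is fully caught up receives an empty entry list,
            // so the request acts purely as a heartbeat carrying the leader's
            // generation and the current commitIndex (High-Water Mark).
            replicateOn(follower, wal.getLastLogIndex());
        }
    }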
Log entries are executed in log order
Once the leader updates its commitIndex, it executes the log entries in order,
from the last value of the commitIndex to the latest value of the commitIndex.
The client requests are completed and the response is returned to the client
once the log entries are executed.
class ReplicatedLog…
    private void applyLogEntries(Long previousCommitIndex, Long commitIndex) {
        for (long index = previousCommitIndex + 1; index <= commitIndex; index++) {
            WALEntry walEntry = wal.readAt(index);
            var responses = stateMachine.applyEntries(Arrays.asList(walEntry));
            completeActiveProposals(index, responses);
        }
    }
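One way to picture completeActiveProposals, not taken from the original code: the leader could keep a map from log index to a CompletableFuture handed back to the client when the entry was appended, and complete it with the state machine's response once the entry is executed.

    // requires java.util.{List, Map} and java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
    private final Map<Long, CompletableFuture<Object>> activeProposals = new ConcurrentHashMap<>();

    private void completeActiveProposals(long index, List<Object> responses) {
        CompletableFuture<Object> pendingRequest = activeProposals.remove(index);
        if (pendingRequest != null) {
            // Hand the state machine's response for this entry back to the waiting client.
            pendingRequest.complete(responses.get(0));
        }
    }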
The leader also sends the commitIndex with the heartbeat requests it sends to the followers.
The followers update the commitIndex and apply the entries the same way.
class ReplicatedLog…
    private void updateHighWaterMark(ReplicationRequest request) {
        if (request.getHighWaterMark() > replicationState.getHighWaterMark()) {
            var previousHighWaterMark = replicationState.getHighWaterMark();
            replicationState.setHighWaterMark(request.getHighWaterMark());
            applyLogEntries(previousHighWaterMark, request.getHighWaterMark());
        }
    }
Leader Election
Leader election is the phase where log entries committed in the previous quorum
are detected.
Every cluster node operates in one of three states: candidate, leader or follower.
The cluster nodes start in the follower state, expecting
a HeartBeat from an existing leader.
If a follower doesn't hear from any leader within a predetermined time period,
it moves to the candidate state and starts a leader election.
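A sketch of that timeout check, with assumed field names (lastHeartbeatReceivedAtMillis, electionTimeoutMillis) and a hypothetical becomeCandidate method, might look like this:

    private void checkLeaderLiveness(long nowMillis) {
        if (nowMillis - lastHeartbeatReceivedAtMillis > electionTimeoutMillis) {
            // No heartbeat from a leader within the timeout: become a candidate
            // and start a new election.
            becomeCandidate();
            startLeaderElection();
        }
    }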
The leader election algorithm establishes a new Generation Clock
value. Raft refers to the Generation Clock as term.
The leader election mechanism also makes sure that the elected leader's log is
at least as up-to-date as the logs of the quorum that elected it, so it already
holds all the entries which could have been committed.
This is an optimization done by Raft
which avoids log entries from previous quorums
having to be transferred to the new leader once it is elected.
A new leader election is started by sending each of the peer servers
a message requesting a vote.
class ReplicatedLog…
    private void startLeaderElection() {
        replicationState.setGeneration(replicationState.getGeneration() + 1);
        registerSelfVote();
        requestVoteFrom(followers);
    }
Once a server has voted for a candidate in a given Generation Clock,
the same vote is always returned for that generation.
This ensures that another server requesting a vote for the
same generation is not elected when a successful election has already
happened.
The handling of the vote request happens as follows:
class ReplicatedLog…
    VoteResponse handleVoteRequest(VoteRequest voteRequest) {
        //for higher generation request become follower.
        // But we do not know who the leader is yet.
        if (voteRequest.getGeneration() > replicationState.getGeneration()) {
            becomeFollower(LEADER_NOT_KNOWN, voteRequest.getGeneration());
        }

        VoteTracker voteTracker = replicationState.getVoteTracker();
        if (voteRequest.getGeneration() == replicationState.getGeneration() && !replicationState.hasLeader()) {
            if (isUptoDate(voteRequest) && !voteTracker.alreadyVoted()) {
                voteTracker.registerVote(voteRequest.getServerId());
                return grantVote();
            }
            if (voteTracker.alreadyVoted()) {
                return voteTracker.votedFor == voteRequest.getServerId() ?
                        grantVote() : rejectVote();
            }
        }
        return rejectVote();
    }

    private boolean isUptoDate(VoteRequest voteRequest) {
        boolean result = voteRequest.getLastLogEntryGeneration() > wal.getLastLogEntryGeneration()
                || (voteRequest.getLastLogEntryGeneration() == wal.getLastLogEntryGeneration() &&
                    voteRequest.getLastLogEntryIndex() >= wal.getLastLogIndex());
        return result;
    }
The server which receives votes from the majority of the servers
transitions to the leader state. The majority is determined as discussed
in Quorum. Once elected, the leader continuously
sends a HeartBeat to all of the followers.
If the followers don’t receive a HeartBeat
in a specified time interval,
a new leader election is triggered.
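A sketch of how the vote responses might be tallied on the candidate, assuming an isVoteGranted accessor on VoteResponse and a becomeLeader method (both illustrative, not from the original code):

    private int votesReceived = 1; // the candidate's own vote, registered in startLeaderElection

    private void handleVoteResponse(VoteResponse response) {
        if (response.isVoteGranted()) {
            votesReceived++;
        }
        // Majority as discussed in Quorum: strictly more than half of the servers.
        if (votesReceived > config.numberOfServers() / 2) {
            becomeLeader(); // transition to leader and start sending HeartBeats
        }
    }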
Log entries from previous generation
As discussed in the section above, the first phase of consensus
algorithms detects the existing values which had been copied
in previous runs of the algorithm. The other key aspect is that
these values are proposed as values of the leader's latest generation.
The second phase decides that a value is committed
only if it was proposed in the current generation.
Raft never updates generation numbers for the existing entries
in the log. So if the leader has log entries from the older generation
which are missing from some of the followers,
it can not mark those entries as committed just based on
the majority quorum.
That is because some other server, which may not be available now,
can have an entry at the same index with a higher generation.
If the leader goes down without replicating an entry from
its current generation, those entries can get overwritten by the new leader.
So in Raft, the new leader must commit at least one entry in its term.
It can then safely commit all the previous entries.
Most practical implementations of Raft try to commit a no-op entry
immediately after a leader election, before the leader is considered
ready to serve client requests.
Refer to [raft-phd] section 3.6.1 for details.
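A sketch of that idea, assuming a becomeLeader method and treating an empty payload as the no-op entry (both are illustrative, not from the original code):

    private void becomeLeader() {
        replicationState.setLeader(serverId()); // hypothetical: record ourselves as the current leader
        // Appending and replicating an entry in the new generation lets the leader
        // safely advance the commitIndex over all earlier entries once this entry
        // reaches the quorum.
        appendAndReplicate(new byte[0]); // empty payload acts as the no-op entry
    }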
An example leader-election
Consider five servers, athens, byzantium, cyrene, delphi and ephesus.
ephesus is the leader for generation 1. It has replicated entries to
itself, delphi and athens.
Figure 2: Lost heartbeat triggers an election
At this point, ephesus and delphi get disconnected from the rest of the cluster.
byzantium has the smallest election timeout, so it
triggers the election first by incrementing its Generation Clock to 2.
cyrene's generation is less than 2 and it has the same log entries as byzantium,
so it grants the vote. But athens has an extra entry in its log, so it rejects the vote.
Because byzantium can not get a majority of 3 votes, it loses the election
and moves back to the follower state.
Figure 3: Lost election because log is not up-to-date
athens times out and triggers the election next. It increments the Generation Clock
to 3 and sends vote requests to byzantium and cyrene.
Because both byzantium and cyrene have a lower generation number and fewer log entries than
athens, they both grant the vote to athens.
Once athens gets the majority of the votes, it becomes the leader and starts
sending HeartBeats to byzantium and cyrene.
Once byzantium and cyrene receive a heartbeat from the leader at a higher generation,
they update their generation to that of the leader. This confirms the leadership of athens.
athens then replicates its own log to byzantium and cyrene.
Figure 4: Node with up-to-date log wins election
athens now replicates Entry2 from generation 1 to byzantium and cyrene.
But because it’s an entry from the previous generation,
it does not update the commitIndex even when Entry2 is successfully replicated
on the majority quorum.
athens then appends a no-op entry to its local log.
After this new entry in generation 3 is successfully replicated,
it updates the commitIndex.
If ephesus comes back up or restores network connectivity, it might send
a request to cyrene. Because cyrene is now at generation 3, it rejects the request.
ephesus gets the new generation in the rejection response and steps down to become a follower.
Figure 7: Leader step-down