Solution
The essence of two phase commit, unsurprisingly, is that it carries out an
update in two phases:
- the first, prepare, asks each node if it's able to promise to carry out the update.
- the second, commit, actually carries it out.
As part of the prepare phase, each node participating in the transaction
acquires whatever it needs to assure that it will be able to do the
commit in the second phase, for instance any locks that are required.
Once each node is able to ensure it can commit in the second phase, it lets
the coordinator know, effectively promising the coordinator that it can and will
commit in the second phase. If any node is unable to make that promise, then
the coordinator tells all nodes to rollback, releasing any locks they have,
and the transaction is aborted. Only if all the participants agree to go
ahead does the second phase commence, at which point it’s expected they will
all successfully update.
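Before diving into the implementation, the overall flow can be summarized as a minimal sketch. The Participant interface and twoPhaseCommit method below are illustrative assumptions; the concrete coordinator and participant code is developed in the rest of this section.

  //A high-level sketch of the two phases (illustrative, assumed shapes).
  interface Participant {
      boolean prepare();  //acquire locks and promise to commit
      void commit();      //make the changes visible
      void rollback();    //release locks and discard pending changes
  }

  boolean twoPhaseCommit(List<Participant> participants) {
      //Phase 1: every participant must promise it can commit.
      for (Participant participant : participants) {
          if (!participant.prepare()) {
              participants.forEach(Participant::rollback);
              return false; //transaction aborted
          }
      }
      //Phase 2: all participants promised, so all of them commit.
      participants.forEach(Participant::commit);
      return true;
  }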
Considering a simple distributed key value store implementation,
the two phase commit protocol works as follows.
The transactional client creates a unique identifier called a transaction identifier.
The client also keeps track of other details like the transaction start time.
This is used, as described later in the locking mechanism, to prevent deadlocks.
The unique id, along with additional details like the start timestamp,
is used to refer to the transaction across the cluster nodes.
The client maintains a transaction reference as follows, which is passed along
with every request from the client to other cluster nodes.
class TransactionRef…
  private UUID txnId;
  private long startTimestamp;

  public TransactionRef(long startTimestamp) {
      this.txnId = UUID.randomUUID();
      this.startTimestamp = startTimestamp;
  }
class TransactionClient…
  TransactionRef transactionRef;

  public TransactionClient(ReplicaMapper replicaMapper, SystemClock systemClock) {
      this.clock = systemClock;
      this.transactionRef = new TransactionRef(clock.now());
      this.replicaMapper = replicaMapper;
  }
One of the cluster nodes acts as a coordinator, which tracks the status of the
transaction on behalf of the client.
In a key-value store, this is generally one of the cluster nodes holding data
for the transaction's keys; typically, the node storing data for the first key
used by the client.
Before storing any value, the client communicates with the coordinator to notify
it about the start of the transaction.
Because the coordinator is one of the cluster nodes storing values,
it is picked up dynamically when the client initiates a get or put operation
with a specific key.
class TransactionClient…
  private TransactionalKVStore coordinator;

  private void maybeBeginTransaction(String key) {
      if (coordinator == null) {
          coordinator = replicaMapper.serverFor(key);
          coordinator.begin(transactionRef);
      }
  }
The transaction coordinator keeps track of the status of the transaction.
It records every change in a Write-Ahead Log to make sure
that the details are available in case of a crash.
class TransactionCoordinator…
  Map<TransactionRef, TransactionMetadata> transactions = new ConcurrentHashMap<>();
  WriteAheadLog transactionLog;

  public void begin(TransactionRef transactionRef) {
      TransactionMetadata txnMetadata = new TransactionMetadata(transactionRef, systemClock, transactionTimeoutMs);
      transactionLog.writeEntry(txnMetadata.serialize());
      transactions.put(transactionRef, txnMetadata);
  }
class TransactionMetadata…
  private TransactionRef txn;
  private List<String> participatingKeys = new ArrayList<>();
  private TransactionStatus transactionStatus;
The client sends each key which is part of the transaction to the coordinator.
This way the coordinator tracks all the keys which are part of the transaction.
The coordinator records the keys which are part of the transaction in
the transaction metadata. The keys can then be used to find all of the
cluster nodes which are part of the transaction.
Because each key-value is generally replicated with the
Replicated Log,
the leader server handling the requests for a particular key might change
over the lifetime of the transaction, so the keys are tracked instead of
the actual server addresses.
The client then sends the put or get requests to the server holding the data
for the key. The server is picked based on the partitioning strategy.
The thing to note is that the client directly communicates with the server
and not through the coordinator. This avoids sending data twice over the network,
from client to coordinator, and then from coordinator to the respective server.
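How a key maps to a server is an implementation detail of the partitioning strategy. A minimal sketch of a ReplicaMapper, assuming simple hash-based partitioning, might look as follows; the class name and the modulo scheme are illustrative, not the actual implementation.

  //Illustrative hash-partitioned ReplicaMapper (an assumption):
  //serverFor resolves the node currently responsible for a key, which is
  //why transactions track keys rather than server addresses.
  class HashBasedReplicaMapper implements ReplicaMapper {
      private final List<TransactionalKVStore> servers;

      HashBasedReplicaMapper(List<TransactionalKVStore> servers) {
          this.servers = servers;
      }

      @Override
      public TransactionalKVStore serverFor(String key) {
          int partition = Math.floorMod(key.hashCode(), servers.size());
          return servers.get(partition);
      }
  }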
class TransactionClient…
  public CompletableFuture<String> get(String key) {
      maybeBeginTransaction(key);
      coordinator.addKeyToTransaction(transactionRef, key);
      TransactionalKVStore kvStore = replicaMapper.serverFor(key);
      return kvStore.get(transactionRef, key);
  }

  public void put(String key, String value) {
      maybeBeginTransaction(key);
      coordinator.addKeyToTransaction(transactionRef, key);
      replicaMapper.serverFor(key).put(transactionRef, key, value);
  }
class TransactionCoordinator…
  public synchronized void addKeyToTransaction(TransactionRef transactionRef, String key) {
      TransactionMetadata metadata = transactions.get(transactionRef);
      if (!metadata.getParticipatingKeys().contains(key)) {
          metadata.addKey(key);
          transactionLog.writeEntry(metadata.serialize());
      }
  }
The cluster node handling the request detects, from the transaction ID, that the
request is part of a transaction. It manages the state of the transaction,
storing the key and the value from the request. The key-values are not
directly made available to the key-value store, but are stored separately.
class TransactionalKVStore…
  public void put(TransactionRef transactionRef, String key, String value) {
      TransactionState state = getOrCreateTransactionState(transactionRef);
      state.addPendingUpdates(key, value);
  }
Locks and Transaction Isolation
The requests also take a lock on the keys.
Particularly, the get requests take a read lock and
the put requests take a write lock. The read locks are taken as the
values are read.
class TransactionalKVStore…
  public CompletableFuture<String> get(TransactionRef txn, String key) {
      CompletableFuture<TransactionRef> lockFuture
              = lockManager.acquire(txn, key, LockMode.READ);
      return lockFuture.thenApply(transactionRef -> {
          getOrCreateTransactionState(transactionRef);
          return kv.get(key);
      });
  }

  synchronized TransactionState getOrCreateTransactionState(TransactionRef txnRef) {
      TransactionState state = this.ongoingTransactions.get(txnRef);
      if (state == null) {
          state = new TransactionState();
          this.ongoingTransactions.put(txnRef, state);
      }
      return state;
  }
The write locks can be taken only when the transaction is about to commit
and the values are to be made visible in the key value store. Until then, the
cluster node can just track the modified values as pending operations.
Delaying locking decreases the chances of conflicting transactions.
class TransactionalKVStore…
  public void put(TransactionRef transactionRef, String key, String value) {
      TransactionState state = getOrCreateTransactionState(transactionRef);
      state.addPendingUpdates(key, value);
  }
It is important to note that the locks are long lived and not released
when the request completes. They are released only when the transaction commits.
This technique of holding locks for the duration of the transaction
and releasing them only when the transaction commits or rolls back is
called two-phase-locking.
Two-phase locking is critical in providing the serializable isolation level.
Serializable means that the effects of the transactions are visible as
if they were executed one at a time.
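To make the locking discipline concrete, here is a minimal sketch of a transaction's lock lifetime, reusing the LockManager calls shown in this section; the key names are illustrative.

  //Sketch of two-phase locking: locks are only acquired while the
  //transaction runs (growing phase), and are all released together at
  //commit or rollback (shrinking phase).
  void runWithTwoPhaseLocking(TransactionRef txn) {
      lockManager.acquire(txn, "key1", LockMode.READ).join();      //growing phase
      lockManager.acquire(txn, "key2", LockMode.READWRITE).join(); //growing phase
      //...reads happen and writes accumulate as pending updates;
      //no lock is released before the transaction outcome is decided...
      lockManager.release(txn, List.of("key1", "key2"));           //shrinking phase
  }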
Deadlock Prevention
Usage of locks can cause deadlocks where two transactions wait for
each other to release their locks. Deadlocks can be avoided if transactions
are not allowed to wait, and are instead aborted when conflicts are detected.
There are different strategies used to decide which transactions are
aborted and which are allowed to continue.
The lock manager implements these wait policies as follows:
class LockManager…
WaitPolicy waitPolicy;
The WaitPolicy decides what to do when there are conflicting requests.
  public enum WaitPolicy {
      WoundWait,
      WaitDie,
      Error
  }
The lock is an object which tracks the transactions which currently
own the lock and the ones which are waiting for the lock.
class Lock…
  Queue<LockRequest> waitQueue = new LinkedList<>();
  List<TransactionRef> owners = new ArrayList<>();
  LockMode lockMode;
When a transaction requests to acquire a lock, the lock manager grants
the lock immediately if there are no conflicting transactions already
owning the lock.
class LockManager…
  public synchronized CompletableFuture<TransactionRef> acquire(TransactionRef txn, String key, LockMode lockMode) {
      return acquire(txn, key, lockMode, new CompletableFuture<>());
  }

  CompletableFuture<TransactionRef> acquire(TransactionRef txnRef,
                                            String key,
                                            LockMode askedLockMode,
                                            CompletableFuture<TransactionRef> lockFuture) {
      Lock lock = getOrCreateLock(key);
      logger.debug("acquiring lock for = " + txnRef + " on key = " + key + " with lock mode = " + askedLockMode);
      if (lock.isCompatible(txnRef, askedLockMode)) {
          lock.addOwner(txnRef, askedLockMode);
          lockFuture.complete(txnRef);
          logger.debug("acquired lock for = " + txnRef);
          return lockFuture;
      }
      return handleConflict(lock, txnRef, key, askedLockMode, lockFuture);
  }
class Lock…
  public boolean isCompatible(TransactionRef txnRef, LockMode lockMode) {
      if (hasOwner()) {
          //read locks are shared; otherwise only a lock upgrade by an
          //existing owner is compatible.
          return (this.lockMode == LockMode.READ && lockMode == LockMode.READ)
                  || isUpgrade(txnRef, lockMode);
      }
      return true;
  }
If there are conflicts,
the lock manager acts depending on the wait policy.
Error On Conflict
If the wait policy is to error out, it will throw an error and the calling
transaction will roll back and retry after a random timeout.
class LockManager…
  private CompletableFuture<TransactionRef> handleConflict(Lock lock,
                                                           TransactionRef txnRef,
                                                           String key,
                                                           LockMode askedLockMode,
                                                           CompletableFuture<TransactionRef> lockFuture) {
      switch (waitPolicy) {
          case Error: {
              lockFuture.completeExceptionally(new WriteConflictException(txnRef, key, lock.owners));
              return lockFuture;
          }
          case WoundWait: {
              return lock.woundWait(txnRef, key, askedLockMode, lockFuture, this);
          }
          case WaitDie: {
              return lock.waitDie(txnRef, key, askedLockMode, lockFuture, this);
          }
      }
      throw new IllegalArgumentException("Unknown waitPolicy " + waitPolicy);
  }
In case of contention, when there are a lot of user transactions
trying to acquire locks, if all of them need to restart it severely
limits the system's throughput.
Data stores try to make sure that there are minimal transaction restarts.
A common technique is to assign a unique ID to transactions and order
them. For example, Spanner assigns unique IDs to transactions
in such a way that they can be ordered.
The technique is very similar to the one discussed in
Paxos to order requests across cluster nodes.
Once the transactions can be ordered, there are two techniques used
to avoid deadlock while still allowing transactions to continue without
restarting: wound-wait and wait-die, described below.
The transaction reference is created in such a way that it can be
compared and ordered with other transaction references. The easiest
method is to assign a timestamp to each transaction and compare based
on the timestamp.
class TransactionRef…
  boolean after(TransactionRef otherTransactionRef) {
      return this.startTimestamp > otherTransactionRef.startTimestamp;
  }
But in distributed systems,
wall clocks are not monotonic, so a different method is used:
transactions are assigned unique IDs in such a way that
they can be ordered. Along with the ordered IDs, the age of each
transaction is tracked, so that transactions can be ordered by age.
Spanner orders transactions by tracking the age of each
transaction in the system.
To be able to order all the transactions, each cluster node is assigned
a unique ID. The client picks the coordinator at the start of
the transaction and gets the transaction ID from the coordinator.
The cluster node acting as a coordinator generates transaction
IDs as follows.
class TransactionCoordinator…
  private int requestId;

  public MonotonicId begin() {
      return new MonotonicId(requestId++, config.getServerId());
  }
class MonotonicId…
  public class MonotonicId implements Comparable<MonotonicId> {
      public int requestId;
      int serverId;

      public MonotonicId(int requestId, int serverId) {
          this.serverId = serverId;
          this.requestId = requestId;
      }

      public static MonotonicId empty() {
          return new MonotonicId(-1, -1);
      }

      public boolean isAfter(MonotonicId other) {
          if (this.requestId == other.requestId) {
              return this.serverId > other.serverId;
          }
          return this.requestId > other.requestId;
      }
class TransactionClient…
  private void beginTransaction(String key) {
      if (coordinator == null) {
          coordinator = replicaMapper.serverFor(key);
          MonotonicId transactionId = coordinator.begin();
          transactionRef = new TransactionRef(transactionId, clock.nanoTime());
      }
  }
The client tracks the age of the transaction by recording
the elapsed time since the beginning of the transaction.
class TransactionRef…
  public void incrementAge(SystemClock clock) {
      age = clock.nanoTime() - startTimestamp;
  }
The client increments the age every time a get or a put request
is sent to the servers. The transactions are then ordered by
their age. The transaction id is used to break ties between
transactions of the same age.
class TransactionRef…
  public boolean isAfter(TransactionRef other) {
      return age == other.age
              ? this.id.isAfter(other.id)
              : this.age > other.age;
  }
Wound-Wait
In the wound-wait method, if there is a conflict,
the transaction reference asking for the lock is compared to all the
transactions currently owning the lock. If the lock owners are all
younger than the transaction asking for the lock, all of those transactions are aborted.
But if the transaction asking for the lock is younger than the ones owning
the lock, it waits for the lock.
class Lock…
  public CompletableFuture<TransactionRef> woundWait(TransactionRef txnRef,
                                                     String key,
                                                     LockMode askedLockMode,
                                                     CompletableFuture<TransactionRef> lockFuture,
                                                     LockManager lockManager) {
      if (allOwningTransactionsStartedAfter(txnRef) && !anyOwnerIsPrepared(lockManager)) {
          abortAllOwners(lockManager, key, txnRef);
          return lockManager.acquire(txnRef, key, askedLockMode, lockFuture);
      }

      LockRequest lockRequest = new LockRequest(txnRef, key, askedLockMode, lockFuture);
      lockManager.logger.debug("Adding to wait queue = " + lockRequest);
      addToWaitQueue(lockRequest);
      return lockFuture;
  }
class Lock…
  private boolean allOwningTransactionsStartedAfter(TransactionRef txn) {
      return owners.stream()
              .filter(o -> !o.equals(txn))
              .allMatch(owner -> owner.after(txn));
  }
One of the key things to notice is that if the transaction owning
the lock is already in the prepared state of two-phase-commit, it is
not aborted.
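The check itself does not appear in the listing above. A minimal sketch of it could look as follows; lockManager.isPrepared is an assumed helper, not part of the code shown in this article.

  //Illustrative sketch of the prepared-owner check used by woundWait:
  //an owner that has already promised to commit must not be wounded.
  private boolean anyOwnerIsPrepared(LockManager lockManager) {
      return owners.stream().anyMatch(owner -> lockManager.isPrepared(owner));
  }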
Wait-Die
The wait-die method works in the opposite way
to wound-wait.
If the lock owners are all younger than the transaction
asking for the lock, then the transaction waits for the lock.
But if the transaction asking for the lock is younger than the ones owning
the lock, the transaction is aborted.
class Lock…
  public CompletableFuture<TransactionRef> waitDie(TransactionRef txnRef,
                                                   String key,
                                                   LockMode askedLockMode,
                                                   CompletableFuture<TransactionRef> lockFuture,
                                                   LockManager lockManager) {
      if (allOwningTransactionsStartedAfter(txnRef)) {
          addToWaitQueue(new LockRequest(txnRef, key, askedLockMode, lockFuture));
          return lockFuture;
      }

      lockManager.abort(txnRef, key);
      lockFuture.completeExceptionally(new WriteConflictException(txnRef, key, owners));
      return lockFuture;
  }
The wound-wait mechanism generally has fewer restarts
compared to the wait-die method,
so data stores like Spanner use the wound-wait method.
When a transaction owning the lock releases it,
the waiting transactions are granted the lock.
class LockManager…
  private void release(TransactionRef txn, String key) {
      Optional<Lock> lock = getLock(key);
      lock.ifPresent(l -> {
          l.release(txn, this);
      });
  }
class Lock…
  public void release(TransactionRef txn, LockManager lockManager) {
      removeOwner(txn);
      if (hasWaiters()) {
          LockRequest lockRequest = getFirst(lockManager.waitPolicy);
          lockManager.acquire(lockRequest.txn, lockRequest.key, lockRequest.lockMode, lockRequest.future);
      }
  }
Commit and Rollback
Once the client successfully reads and writes all the key-values
without facing any conflicts, it initiates the commit by sending
a commit request to the coordinator.
class TransactionClient…
  public CompletableFuture<Boolean> commit() {
      return coordinator.commit(transactionRef);
  }
The transaction coordinator records the state of the transaction as
preparing to commit. The coordinator implements the commit handling in
two phases.
- It first sends the prepare request to each of the participants.
- Once it receives a successful response from all the participants,
the coordinator marks the transaction as prepared to complete.
Then it sends the commit request to all the participants.
class TransactionCoordinator…
  public CompletableFuture<Boolean> commit(TransactionRef transactionRef) {
      TransactionMetadata metadata = transactions.get(transactionRef);
      metadata.markPreparingToCommit(transactionLog);
      List<CompletableFuture<Boolean>> allPrepared = sendPrepareRequestToParticipants(transactionRef);
      CompletableFuture<List<Boolean>> futureList = sequence(allPrepared);
      return futureList.thenApply(result -> {
          if (!result.stream().allMatch(r -> r)) {
              logger.info("Rolling back = " + transactionRef);
              rollback(transactionRef);
              return false;
          }
          metadata.markPrepared(transactionLog);
          sendCommitMessageToParticipants(transactionRef);
          metadata.markCommitComplete(transactionLog);
          return true;
      });
  }

  public List<CompletableFuture<Boolean>> sendPrepareRequestToParticipants(TransactionRef transactionRef) {
      TransactionMetadata transactionMetadata = transactions.get(transactionRef);
      var transactionParticipants = getParticipants(transactionMetadata.getParticipatingKeys());
      return transactionParticipants.keySet()
              .stream()
              .map(server -> server.handlePrepare(transactionRef))
              .collect(Collectors.toList());
  }

  private void sendCommitMessageToParticipants(TransactionRef transactionRef) {
      TransactionMetadata transactionMetadata = transactions.get(transactionRef);
      var participantsForKeys = getParticipants(transactionMetadata.getParticipatingKeys());
      participantsForKeys.keySet().stream()
              .forEach(kvStore -> {
                  List<String> keys = participantsForKeys.get(kvStore);
                  kvStore.handleCommit(transactionRef, keys);
              });
  }

  private Map<TransactionalKVStore, List<String>> getParticipants(List<String> participatingKeys) {
      return participatingKeys.stream()
              .map(k -> Pair.of(serverFor(k), k))
              .collect(Collectors.groupingBy(Pair::getKey,
                      Collectors.mapping(Pair::getValue, Collectors.toList())));
  }
The cluster node receiving the prepare request does two things:
- It tries to grab the write locks for all of the keys.
- Once successful, it writes all of the changes to the write-ahead log.
If it can successfully do these,
it can guarantee that there are no conflicting transactions,
and even in the case of a crash the cluster node can recover all the
required state to complete the transaction.
class TransactionalKVStore…
  public synchronized CompletableFuture<Boolean> handlePrepare(TransactionRef txn) {
      try {
          TransactionState state = getTransactionState(txn);
          if (state.isPrepared()) {
              return CompletableFuture.completedFuture(true); //already prepared.
          }

          if (state.isAborted()) {
              return CompletableFuture.completedFuture(false); //aborted by another transaction.
          }

          Optional<Map<String, String>> pendingUpdates = state.getPendingUpdates();
          CompletableFuture<Boolean> prepareFuture = prepareUpdates(txn, pendingUpdates);
          return prepareFuture.thenApply(ignored -> {
              Map<String, Lock> locksHeldByTxn = lockManager.getAllLocksFor(txn);
              state.markPrepared();
              writeToWAL(new TransactionMarker(txn, locksHeldByTxn, TransactionStatus.PREPARED));
              return true;
          });

      } catch (TransactionException e) {
          return CompletableFuture.completedFuture(false); //prepare fails on conflict.
      }
  }

  private CompletableFuture<Boolean> prepareUpdates(TransactionRef txn, Optional<Map<String, String>> pendingUpdates) {
      if (pendingUpdates.isPresent()) {
          Map<String, String> pendingKVs = pendingUpdates.get();
          CompletableFuture<List<TransactionRef>> lockFuture = acquireLocks(txn, pendingKVs.keySet());
          return lockFuture.thenApply(ignored -> {
              writeToWAL(txn, pendingKVs);
              return true;
          });
      }
      return CompletableFuture.completedFuture(true);
  }

  TransactionState getTransactionState(TransactionRef txnRef) {
      return ongoingTransactions.get(txnRef);
  }

  private void writeToWAL(TransactionRef txn, Map<String, String> pendingUpdates) {
      for (String key : pendingUpdates.keySet()) {
          String value = pendingUpdates.get(key);
          wal.writeEntry(new SetValueCommand(txn, key, value).serialize());
      }
  }

  private CompletableFuture<List<TransactionRef>> acquireLocks(TransactionRef txn, Set<String> keys) {
      List<CompletableFuture<TransactionRef>> lockFutures = new ArrayList<>();
      for (String key : keys) {
          CompletableFuture<TransactionRef> lockFuture = lockManager.acquire(txn, key, LockMode.READWRITE);
          lockFutures.add(lockFuture);
      }
      return sequence(lockFutures);
  }
When the cluster node receives the commit message from the coordinator,
it is safe to make the key-value changes visible.
The cluster node does three things while committing the changes:
- It marks the transaction as committed. Should the cluster node fail at this point, it knows the outcome of the transaction, and can repeat the following steps.
- It applies all the changes to the key-value storage.
- It releases all the acquired locks.
class TransactionalKVStore…
  public synchronized void handleCommit(TransactionRef transactionRef, List<String> keys) {
      if (!ongoingTransactions.containsKey(transactionRef)) {
          return; //this is a no-op. Already committed.
      }

      if (!lockManager.hasLocksFor(transactionRef, keys)) {
          throw new IllegalStateException("Transaction " + transactionRef + " should hold all the required locks for keys " + keys);
      }

      writeToWAL(new TransactionMarker(transactionRef, TransactionStatus.COMMITTED, keys));

      applyPendingUpdates(transactionRef);

      releaseLocks(transactionRef, keys);
  }

  private void removeTransactionState(TransactionRef txnRef) {
      ongoingTransactions.remove(txnRef);
  }

  private void applyPendingUpdates(TransactionRef txnRef) {
      TransactionState state = getTransactionState(txnRef);
      Optional<Map<String, String>> pendingUpdates = state.getPendingUpdates();
      apply(txnRef, pendingUpdates);
  }

  private void apply(TransactionRef txnRef, Optional<Map<String, String>> pendingUpdates) {
      if (pendingUpdates.isPresent()) {
          Map<String, String> pendingKv = pendingUpdates.get();
          apply(pendingKv);
      }
      removeTransactionState(txnRef);
  }

  private void apply(Map<String, String> pendingKv) {
      for (String key : pendingKv.keySet()) {
          String value = pendingKv.get(key);
          kv.put(key, value);
      }
  }

  private void releaseLocks(TransactionRef txn, List<String> keys) {
      lockManager.release(txn, keys);
  }

  private Long writeToWAL(TransactionMarker transactionMarker) {
      return wal.writeEntry(transactionMarker.serialize());
  }
The rollback is implemented in a similar way. If there is any failure,
the client communicates with the coordinator to rollback the transaction.
class TransactionClient…
  public void rollback() {
      coordinator.rollback(transactionRef);
  }
The transaction coordinator records the state of the transaction as preparing
to rollback. Then it forwards the rollback request to all of the servers
which stored the values for the given transaction.
Once all of the requests are successful, the coordinator marks the transaction
rollback as complete. If the coordinator crashes after the transaction
is marked as 'prepared to rollback', it can keep on sending the rollback
messages to all the participating cluster nodes.
class TransactionCoordinator…
  public void rollback(TransactionRef transactionRef) {
      transactions.get(transactionRef).markPrepareToRollback(this.transactionLog);

      sendRollbackMessageToParticipants(transactionRef);

      transactions.get(transactionRef).markRollbackComplete(this.transactionLog);
  }

  private void sendRollbackMessageToParticipants(TransactionRef transactionRef) {
      TransactionMetadata transactionMetadata = transactions.get(transactionRef);
      var participants = getParticipants(transactionMetadata.getParticipatingKeys());
      for (TransactionalKVStore kvStore : participants.keySet()) {
          List<String> keys = participants.get(kvStore);
          kvStore.handleRollback(transactionMetadata.getTxn(), keys);
      }
  }
The cluster node receiving the rollback request does three things:
- It records the state of the transaction as rolled back in the write-ahead log.
- It discards the transaction state.
- It releases all of the locks.
class TransactionalKVStore…
  public synchronized void handleRollback(TransactionRef transactionRef, List<String> keys) {
      if (!ongoingTransactions.containsKey(transactionRef)) {
          return; //no-op. Already rolled back.
      }
      writeToWAL(new TransactionMarker(transactionRef, TransactionStatus.ROLLED_BACK, keys));
      this.ongoingTransactions.remove(transactionRef);
      this.lockManager.release(transactionRef, keys);
  }
Idempotent Operations
In case of network failures, the coordinator can retry calls to
prepare, commit or abort. So these operations need to be
idempotent.
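The participant-side guards shown earlier, returning early when a transaction is already prepared, committed, or rolled back, are what make such retries safe. As a minimal sketch, assuming a simple retry helper (sendWithRetry is illustrative, not part of the code shown here), the coordinator can simply resend a message on failure:

  //Illustrative retry helper (an assumption): because prepare, commit and
  //rollback are idempotent on the participants, resending a
  //possibly-delivered message is safe.
  private void sendWithRetry(Runnable sendMessage, int maxAttempts) {
      for (int attempt = 1; attempt <= maxAttempts; attempt++) {
          try {
              sendMessage.run();
              return;
          } catch (Exception networkError) {
              //the message may or may not have been delivered;
              //a duplicate delivery is a no-op on the participant
          }
      }
  }

For example, a commit message could be resent with sendWithRetry(() -> kvStore.handleCommit(transactionRef, keys), 3).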
An Example Scenario
Atomic Writes
Consider the following scenario. Paula Blue has a truck and Steven Green
has a backhoe.
The availability and the booking status of the truck and the backhoe
are stored on a distributed key-value store.
Depending on how the keys are mapped to servers,
Blue’s truck and Green’s backhoe bookings are stored on
separate cluster nodes.
Alice is trying to book a truck
and backhoe for the construction work she is planning to start on a Monday.
She needs both the truck and the backhoe to be available.
The booking scenario happens as follows.
Alice checks the availability of Blue's truck and Green's backhoe
by reading the keys 'truck_booking_on_monday' and 'backhoe_booking_on_monday'.
If the values are empty, the booking is free.
She reserves the truck and the backhoe.
It is important that both the values are set atomically.
If there is any failure, then none of the values is set.
The commit happens in two phases. The first server Alice
contacts acts as the coordinator and executes the two phases.
The coordinator is a separate participant in the
protocol, and is shown that way on the sequence diagram. However
usually one of the servers (Blue or Green) acts as
the coordinator, thus playing two roles in the interaction.
Conflicting Transactions
Consider a scenario where another person, Bob, is also trying to book a
truck and backhoe for construction work on the same Monday.
The booking scenario happens as follows:
- Both Alice and Bob read the keys 'truck_booking_on_monday' and 'backhoe_booking_on_monday'.
- Both see that the values are empty, meaning the booking is free.
- Both try to book the truck and the backhoe.
The expectation is that only one of Alice or Bob should be able to book,
because the transactions conflict.
In case of errors, the whole flow needs to be retried, and hopefully
one of them will go ahead with the booking.
But in no situation should the booking be done partially:
either both bookings are done, or neither is.
To check the availability, both Alice and Bob start a transaction
and contact Blue's and Green's servers to check the availability.
Blue holds a read lock for the key "truck_booking_on_monday" and
Green holds a read lock for the key "backhoe_booking_on_monday".
Because read locks are shared, both Alice and Bob can read the values.
Alice and Bob see that both the bookings are available on Monday.
So they reserve by sending the put requests to servers.
Both the servers hold the put requests in the temporary storage.
When Alice and Bob decide to commit the transactions,
assuming that Blue acts as a coordinator, it triggers the two-phase
commit protocol and sends the prepare requests to itself and Green.
For Alice's request it tries to grab a write lock for the key 'truck_booking_on_monday', which
it cannot get, because there is a conflicting read lock grabbed by
another transaction. So Alice's transaction fails in the prepare phase.
The same thing happens with Bob's request.
Transactions can be retried with a retry loop as follows:
class TransactionExecutor…
  public boolean executeWithRetry(Function<TransactionClient, Boolean> txnMethod, ReplicaMapper replicaMapper, SystemClock systemClock) {
      for (int attempt = 1; attempt <= maxRetries; attempt++) {
          TransactionClient client = new TransactionClient(replicaMapper, systemClock);
          try {
              boolean checkPassed = txnMethod.apply(client);
              Boolean successfullyCommitted = client.commit().get();
              return checkPassed && successfullyCommitted;
          } catch (Exception e) {
              logger.error("Write conflict detected while executing." + client.transactionRef + " Retrying attempt " + attempt);
              client.rollback();
              randomWait(); //wait for random interval
          }
      }
      return false;
  }
The example booking code for Alice and Bob will look as follows:
class TransactionalKVStoreTest…
  @Test
  public void retryWhenConflict() {
      //assumed setup: two clients concurrently book the same keys
      List<TransactionalKVStore> allServers = createTestServers(WaitPolicy.Error);
      TransactionExecutor aliceTxn = bookTransactionally(allServers, "Alice", systemClock);
      TransactionExecutor bobTxn = bookTransactionally(allServers, "Bob", systemClock);

      TestUtils.waitUntilTrue(() -> (aliceTxn.isSuccess() && !bobTxn.isSuccess())
                      || (!aliceTxn.isSuccess() && bobTxn.isSuccess()),
              "waiting for one txn to complete", Duration.ofSeconds(50));
  }

  private TransactionExecutor bookTransactionally(List<TransactionalKVStore> allServers, String user, SystemClock systemClock) {
      List<String> bookingKeys = Arrays.asList("truck_booking_on_monday", "backhoe_booking_on_monday");
      TransactionExecutor t1 = new TransactionExecutor(allServers);
      t1.executeAsyncWithRetry(txnClient -> {
          if (txnClient.isAvailable(bookingKeys)) {
              txnClient.reserve(bookingKeys, user);
              return true;
          }
          return false;
      }, systemClock);
      return t1;
  }
In this case one of the transactions will eventually succeed and
the other will back out.
While it is very easy to implement with the Error WaitPolicy,
there will be multiple transaction restarts, reducing the overall
throughput.
As explained in the section above, if the Wound-Wait policy is used
there will be fewer transaction restarts. In the above example,
only one transaction will possibly restart instead of both restarting
in case of conflicts.
Using Versioned Value
It is very constraining to have conflicts for all the read and write
operations, particularly so when the transactions can be read-only.
It is optimal if read-only transactions can work without holding any
locks and still guarantee that the values read in a transaction
do not change with a concurrent read-write transaction.
Data-stores generally store multiple versions of the values,
as described in Versioned Value.
The version used is the timestamp following Lamport Clock.
Mostly a Hybrid Clock is used in databases like
MongoDB or CockroachDB.
To use it with the two-phase commit protocol, the trick is that every server
participating in the transaction sends the timestamp it can write the
values at, as response to the prepare request.
The coordinator chooses the maximum of these timestamps as a
commit timestamp and sends it along with the value.
The participating servers then save the value at the commit timestamp.
This allows read-only requests to be executed without holding locks,
because it’s guaranteed that the value written at a particular timestamp
is never going to change.
Consider a simple example as follows. Philip is running a report to read
all of the bookings that happened until timestamp 2. If it is a long-running
operation holding a lock, Alice, who is trying to book a truck, will be blocked
until Philip’s work completes. With Versioned Value
Philip’s get requests, which are part of a read-only operation, can continue
at timestamp 2, while Alice’s booking continues at timestamp 4.
Note that read requests which are part of a read-write transaction,
still need to hold a lock.
The example code with Lamport Clock looks as follows:
class MvccTransactionalKVStore…
  public String readOnlyGet(String key, long readTimestamp) {
      adjustServerTimestamp(readTimestamp);
      return kv.get(new VersionedKey(key, readTimestamp));
  }

  public CompletableFuture<String> get(TransactionRef txn, String key, long readTimestamp) {
      adjustServerTimestamp(readTimestamp);
      CompletableFuture<TransactionRef> lockFuture = lockManager.acquire(txn, key, LockMode.READ);
      return lockFuture.thenApply(transactionRef -> {
          getOrCreateTransactionState(transactionRef);
          return kv.get(key);
      });
  }

  private void adjustServerTimestamp(long readTimestamp) {
      this.timestamp = readTimestamp > this.timestamp ? readTimestamp : timestamp;
  }

  public void put(TransactionRef txnId, String key, String value) {
      timestamp = timestamp + 1;
      TransactionState transactionState = getOrCreateTransactionState(txnId);
      transactionState.addPendingUpdates(key, value);
  }
class MvccTransactionalKVStore…
  private long prepare(TransactionRef txn, Optional<Map<String, String>> pendingUpdates) throws WriteConflictException, IOException {
      if (pendingUpdates.isPresent()) {
          Map<String, String> pendingKVs = pendingUpdates.get();

          acquireLocks(txn, pendingKVs);

          timestamp = timestamp + 1; //increment the timestamp for write operation.

          writeToWAL(txn, pendingKVs, timestamp);
      }
      return timestamp;
  }
class MvccTransactionCoordinator…
  public long commit(TransactionRef txn) {
      long commitTimestamp = prepare(txn);

      TransactionMetadata transactionMetadata = transactions.get(txn);
      transactionMetadata.markPreparedToCommit(commitTimestamp, this.transactionLog);

      sendCommitMessageToAllTheServers(txn, commitTimestamp, transactionMetadata.getParticipatingKeys());

      transactionMetadata.markCommitComplete(transactionLog);
      return commitTimestamp;
  }

  public long prepare(TransactionRef txn) throws WriteConflictException {
      TransactionMetadata transactionMetadata = transactions.get(txn);
      Map<MvccTransactionalKVStore, List<String>> keysToServers = getParticipants(transactionMetadata.getParticipatingKeys());
      List<Long> prepareTimestamps = new ArrayList<>();
      for (MvccTransactionalKVStore store : keysToServers.keySet()) {
          List<String> keys = keysToServers.get(store);
          long prepareTimestamp = store.prepare(txn, keys);
          prepareTimestamps.add(prepareTimestamp);
      }
      return prepareTimestamps.stream().max(Long::compare).orElse(txn.getStartTimestamp());
  }
All the participating cluster nodes then store the key-values at the
commit timestamp.
class MvccTransactionalKVStore…
  public void commit(TransactionRef txn, List<String> keys, long commitTimestamp) {
      if (!lockManager.hasLocksFor(txn, keys)) {
          throw new IllegalStateException("Transaction should hold all the required locks");
      }

      adjustServerTimestamp(commitTimestamp);

      applyPendingOperations(txn, commitTimestamp);

      lockManager.release(txn, keys);

      logTransactionMarker(new TransactionMarker(txn, TransactionStatus.COMMITTED, commitTimestamp, keys, Collections.EMPTY_MAP));
  }

  private void applyPendingOperations(TransactionRef txnId, long commitTimestamp) {
      Optional<TransactionState> transactionState = getTransactionState(txnId);
      if (transactionState.isPresent()) {
          TransactionState t = transactionState.get();
          Optional<Map<String, String>> pendingUpdates = t.getPendingUpdates();
          apply(txnId, pendingUpdates, commitTimestamp);
      }
  }

  private void apply(TransactionRef txnId, Optional<Map<String, String>> pendingUpdates, long commitTimestamp) {
      if (pendingUpdates.isPresent()) {
          Map<String, String> pendingKv = pendingUpdates.get();
          apply(pendingKv, commitTimestamp);
      }
      ongoingTransactions.remove(txnId);
  }

  private void apply(Map<String, String> pendingKv, long commitTimestamp) {
      for (String key : pendingKv.keySet()) {
          String value = pendingKv.get(key);
          kv.put(new VersionedKey(key, commitTimestamp), value);
      }
  }
Technical Considerations
There is another subtle issue to be tackled here:
once a response has been returned for a read at a given timestamp,
no write should happen at a timestamp lower than that of the
read request.
This is achieved by different techniques.
Google Percolator and
datastores like TiKV inspired by
Percolator use a separate server called Timestamp oracle which is
guaranteed to give monotonic timestamps.
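As a minimal sketch, a timestamp oracle only needs to hand out strictly increasing numbers; the class below is illustrative, not Percolator's actual implementation.

  //Illustrative timestamp oracle (an assumption): a single server hands
  //out strictly monotonic timestamps, so once a read is served at
  //timestamp t, no later write can be assigned a timestamp below t.
  class TimestampOracle {
      private final AtomicLong latestTimestamp = new AtomicLong(0);

      public long nextTimestamp() {
          return latestTimestamp.incrementAndGet();
      }
  }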
Databases like MongoDB or CockroachDB
use a Hybrid Clock to
guarantee it, because every request adjusts the hybrid clock
on each server to be the most up-to-date. The timestamp is also
advanced monotonically with every write request.
Finally, the commit phase picks the maximum timestamp across the set
of participating servers, making sure that the write will always
follow a previous read request.
It is important to note that if the client is reading
at a timestamp value lower than the one at which the server is writing,
it is not an issue. But if the client is reading at a timestamp at which
the server is about to write, then it is a problem. If servers
detect that a client is reading at a timestamp for which the server might have
in-flight writes (the ones which are only prepared), the server rejects
the read. CockroachDB throws an error if a read happens at
a timestamp for which there is an ongoing transaction.
Spanner reads have a phase where the client gets the
time of the last successful write on a particular partition. If a
client reads at a higher timestamp, the read requests wait till the writes
happen at that timestamp.
Using Replicated Log
To improve fault tolerance, cluster nodes use a Replicated Log.
The coordinator uses the Replicated Log to store the transaction log entries.
Considering the example of Alice and Bob in the above section,
the Blue servers will be a group of servers, as are the Green servers.
All the booking data will be replicated across a set of servers.
Each request which is part of the two-phase commit goes to the leader
of the server group. The replication is implemented using
Replicated Log.
The client communicates with the leader of each server group.
The replication is necessary only when the client decides to commit the
transaction, so it happens as part of the prepare request.
The coordinator replicates every state change to the replicated log as well.
In a distributed datastore, each cluster node handles multiple partitions.
A Replicated Log is maintained per partition.
When Raft is used as part of replication it’s sometimes
referred to as multi-raft.
The client communicates with the leader of each partition participating in
the transaction.
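As a rough sketch, assuming a fixed global partition count, a node might route each key to the replicated log of its partition; the names and numbers below are illustrative, not from the implementation above.

  //Illustrative multi-raft layout (an assumption): each cluster node
  //hosts one replicated log per partition it serves.
  class ClusterNode {
      static final int NUM_PARTITIONS = 64; //assumed global partition count

      //only the partitions hosted on this node appear in the map
      Map<Integer, ReplicatedLog> logsByPartition = new HashMap<>();

      ReplicatedLog logFor(String key) {
          return logsByPartition.get(partitionFor(key));
      }

      static int partitionFor(String key) {
          return Math.floorMod(key.hashCode(), NUM_PARTITIONS);
      }
  }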
Failure Handling
The two-phase commit protocol heavily relies on the coordinator node
to communicate the outcome of the transaction.
Until the outcome of the transaction is known,
the individual cluster nodes cannot allow any other transactions
to write to the keys participating in the pending transaction.
The cluster nodes block until the outcome of the transaction is known.
This puts some critical requirements on the coordinator:
it needs to remember the state of the transactions
even in the case of a process crash.
The coordinator uses a Write-Ahead Log to record every update
to the state of the transaction.
This way, when the coordinator crashes and comes back up,
it can continue to work on the transactions which are incomplete.
class TransactionCoordinator…
  public void loadTransactionsFromWAL() throws IOException {
      List<WALEntry> walEntries = this.transactionLog.readAll();
      for (WALEntry walEntry : walEntries) {
          TransactionMetadata txnMetadata = (TransactionMetadata) Command.deserialize(new ByteArrayInputStream(walEntry.getData()));
          transactions.put(txnMetadata.getTxn(), txnMetadata);
      }
      startTransactionTimeoutScheduler();
      completePreparedTransactions();
  }

  private void completePreparedTransactions() throws IOException {
      List<Map.Entry<TransactionRef, TransactionMetadata>> preparedTransactions
              = transactions.entrySet().stream().filter(entry -> entry.getValue().isPrepared()).collect(Collectors.toList());
      for (Map.Entry<TransactionRef, TransactionMetadata> preparedTransaction : preparedTransactions) {
          TransactionMetadata txnMetadata = preparedTransaction.getValue();
          sendCommitMessageToParticipants(txnMetadata.getTxn());
      }
  }
The client can fail before sending the commit message to the coordinator.
The transaction coordinator tracks when each transaction state was updated.
If no state update is received within a configured timeout period,
it triggers a transaction rollback.
class TransactionCoordinator…
  private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
  private ScheduledFuture<?> taskFuture;
  private long transactionTimeoutMs = Long.MAX_VALUE; //for now.

  public void startTransactionTimeoutScheduler() {
      taskFuture = scheduler.scheduleAtFixedRate(() -> timeoutTransactions(),
              transactionTimeoutMs,
              transactionTimeoutMs,
              TimeUnit.MILLISECONDS);
  }

  private void timeoutTransactions() {
      for (TransactionRef txnRef : transactions.keySet()) {
          TransactionMetadata transactionMetadata = transactions.get(txnRef);
          long now = systemClock.nanoTime();
          if (transactionMetadata.hasTimedOut(now)) {
              sendRollbackMessageToParticipants(transactionMetadata.getTxn());
              transactionMetadata.markRollbackComplete(transactionLog);
          }
      }
  }
Transactions across heterogeneous systems
The solution outlined here demonstrates the two-phase commit implementation
in a homogeneous system. Homogeneous here means that all the cluster nodes
are part of the same system and store the same kind of data, for example
a distributed data store like MongoDB or a distributed message broker
like Kafka.
Historically, two-phase commit was mostly discussed in the context of
heterogeneous systems. The most common usage of two-phase commit was
with [XA] transactions. In J2EE servers, it is very
common to use two-phase commit across a message broker and a database.
The most common usage pattern is when a message needs to be produced
on a JMS message broker like ActiveMQ, and a record needs to be
inserted or updated in a database.
As seen in the above sections, the fault tolerance of the coordinator
plays a critical role in two-phase commit implementation. In case of XA
transactions the coordinator is mostly the application process making
the database and message broker calls. The application in most modern
scenarios is a stateless microservice which is running in a containerized
environment. It is not really a suitable place to put the responsibility
of the coordinator. The coordinator needs to maintain state and recover
quickly from failures to commit or rollback, which is difficult to
implement in this case.
This is the reason that while XA transactions seem so attractive, they
often run into issues
in practice and are avoided. In the microservices
world, patterns like [transactional-outbox] are preferred over
XA transactions.
On the other hand, most distributed storage systems implement
two-phase commit across a set of partitions, and it works well in practice.