Solution
Combine multiple requests together into a single request batch.
The batch of requests is sent to the cluster node for processing,
with each request processed in exactly the same manner as
an individual request. The node then responds with a batch of responses.
As an example, consider a distributed key-value store,
where the client sends requests to store
multiple key-value pairs on the server.
When the client receives a call to send the request, it does not immediately
send it over the network; instead, it keeps a queue of requests to be sent.
class Client…
LinkedBlockingQueue<RequestEntry> requests = new LinkedBlockingQueue<>();

public CompletableFuture send(SetValueRequest setValueRequest) {
    int requestId = enqueueRequest(setValueRequest);
    CompletableFuture responseFuture = trackPendingRequest(requestId);
    return responseFuture;
}

private int enqueueRequest(SetValueRequest setValueRequest) {
    int requestId = nextRequestId();
    byte[] requestBytes = serialize(setValueRequest, requestId);
    requests.add(new RequestEntry(requestBytes, clock.nanoTime()));
    return requestId;
}

private int nextRequestId() {
    return requestNumber++;
}
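As a quick usage sketch (the key/value constructor of SetValueRequest is an assumption, not shown above), each call to send() returns a future immediately, and calls made in quick succession end up in the same batch:

    // Hypothetical usage; the SetValueRequest constructor arguments are assumed.
    CompletableFuture f1 = client.send(new SetValueRequest("k1", "v1"));
    CompletableFuture f2 = client.send(new SetValueRequest("k2", "v2"));
    // Both requests are queued and are likely to be sent together in one batch.
    CompletableFuture.allOf(f1, f2).join(); // wait until the batched responses arrive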
The time at which the request is enqueued is tracked; it is later used
to decide whether the request can be sent as part of the batch.
class RequestEntry…
class RequestEntry {
    byte[] serializedRequest;
    long createdTime;

    public RequestEntry(byte[] serializedRequest, long createdTime) {
        this.serializedRequest = serializedRequest;
        this.createdTime = createdTime;
    }
}
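Later code calls entry.getRequest() and re.size() on a RequestEntry; those accessors are not shown above, so the following is an assumed completion that would make the calls work:

class RequestEntry…

    // Assumed accessors, matching how RequestEntry is used in createBatch()
    // and accumulatedRequestSize() below.
    public byte[] getRequest() {
        return serializedRequest;
    }

    public int size() {
        return serializedRequest.length;
    }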
It then tracks the pending requests, which are completed when a
response is received. Each request is assigned a unique request number,
which is used to map the response back to the request and complete it.
class Client…
Map<Integer, CompletableFuture> pendingRequests = new ConcurrentHashMap<>();

private CompletableFuture trackPendingRequest(Integer correlationId) {
    CompletableFuture responseFuture = new CompletableFuture();
    pendingRequests.put(correlationId, responseFuture);
    return responseFuture;
}
The client starts a separate task which continuously tracks the
queued requests.
class Client…
public Client(Config config, InetAddressAndPort serverAddress, SystemClock clock) {
    this.clock = clock;
    this.sender = new Sender(config, serverAddress, clock);
    this.sender.start();
}
class Sender…
@Override
public void run() {
    while (isRunning) {
        // The loop body is elided here: it checks the two conditions described
        // below (maximum batch size reached, or maximum wait time elapsed),
        // creates a batch with createBatch(), and sends it to the server.
    }
}

private RequestBatch createBatch(LinkedBlockingQueue<RequestEntry> requests) {
    RequestBatch batch = new RequestBatch(MAX_BATCH_SIZE_BYTES);
    RequestEntry entry = requests.peek();
    while (entry != null && batch.hasSpaceFor(entry.getRequest())) {
        batch.add(entry.getRequest());
        requests.remove(entry);
        entry = requests.peek();
    }
    return batch;
}
class RequestBatch…
public boolean hasSpaceFor(byte[] requestBytes) {
    return batchSize() + requestBytes.length <= maxSize;
}

private int batchSize() {
    return requests.stream().map(r -> r.length).reduce(0, Integer::sum);
}
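The constructor and add() method of RequestBatch are not shown above; a minimal completion consistent with how createBatch() uses the batch might look like this (an assumption, not the original implementation):

class RequestBatch…

    // Assumed completion: the fields follow the methods shown above, the rest is illustrative.
    private final int maxSize;
    private final List<byte[]> requests = new ArrayList<>();

    public RequestBatch(int maxSize) {
        this.maxSize = maxSize;
    }

    public void add(byte[] requestBytes) {
        requests.add(requestBytes);
    }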
There are two checks which are generally done.
- If enough requests have accumulated to fill the batch to the maximum
configured size.
class Sender…
private boolean maxBatchSizeReached(Queue<RequestEntry> requests) {
    return accumulatedRequestSize(requests) > MAX_BATCH_SIZE_BYTES;
}

private int accumulatedRequestSize(Queue<RequestEntry> requests) {
    return requests.stream().map(re -> re.size()).reduce((r1, r2) -> r1 + r2).orElse(0);
}
- If the maximum configured wait time has elapsed. Because the batch cannot wait
  indefinitely to fill up, we can configure a small amount of wait time. The sender
  task waits and then checks whether the oldest request has been queued for longer
  than the maximum wait time.
class Sender…
private boolean requestsWaitedFor(long batchingWindowInMs) {
    RequestEntry oldestPendingRequest = requests.peek();
    if (oldestPendingRequest == null) {
        return false;
    }
    // createdTime is captured with clock.nanoTime(), so the elapsed time is converted
    // to milliseconds before comparing it with the batching window.
    long oldestEntryWaitTimeMs
            = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - oldestPendingRequest.createdTime);
    return oldestEntryWaitTimeMs > batchingWindowInMs;
}
Once either of these conditions is fulfilled, the batch request is sent to the server.
The server unpacks the batch request, and processes each of the
individual requests.
class Server…
private void handleBatchRequest(RequestOrResponse batchRequest, ClientConnection clientConnection) {
    RequestBatch batch = JsonSerDes.deserialize(batchRequest.getMessageBodyJson(), RequestBatch.class);
    List<RequestOrResponse> requests = batch.getPackedRequests();
    List<RequestOrResponse> responses = new ArrayList<>();
    for (RequestOrResponse request : requests) {
        RequestOrResponse response = handleSetValueRequest(request);
        responses.add(response);
    }
    sendResponse(batchRequest, clientConnection, new BatchResponse(responses));
}

private RequestOrResponse handleSetValueRequest(RequestOrResponse request) {
    SetValueRequest setValueRequest = JsonSerDes.deserialize(request.getMessageBodyJson(), SetValueRequest.class);
    kv.put(setValueRequest.getKey(), setValueRequest.getValue());
    RequestOrResponse response = new RequestOrResponse(RequestId.SetValueResponse.getId(),
            "Success".getBytes(), request.getCorrelationId());
    return response;
}
The client receives the batch response and completes all the pending requests.
class Sender…
private void handleResponse(BatchResponse batchResponse) {
    List<RequestOrResponse> responseList = batchResponse.getResponseList();
    logger.debug("Completing requests from " + responseList.get(0).getCorrelationId()
            + " to " + responseList.get(responseList.size() - 1).getCorrelationId());
    responseList.stream().forEach(r -> {
        CompletableFuture completableFuture = pendingRequests.remove(r.getCorrelationId());
        if (completableFuture != null) {
            completableFuture.complete(r);
        } else {
            logger.error("no pending request for " + r.getCorrelationId());
        }
    });
}
Technical Considerations
The batch size should be chosen based on the size of
individual messages and the available network bandwidth, as well
as the latency and throughput improvements observed under
real-life load. These settings default to sensible values that assume
smaller message sizes and an optimal batch size for
server-side processing. For example, Kafka has a
default batch size of 16KB. It also has a configuration parameter called
“linger.ms” with a default value of 0.
However, if the messages are larger, a bigger batch size might work better.
That said, too large a batch size is likely to offer only diminishing returns.
For example, a batch size measured in megabytes can add further overhead
in terms of processing. This is why the batch size parameter
is typically tuned according to observations made through
performance testing.
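To make the Kafka example above concrete, the snippet below sketches the relevant producer settings; batch.size and linger.ms are real Kafka configuration keys, but the values shown are only illustrative, not recommendations.

    // Illustrative Kafka producer settings; the values are examples, not recommendations.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("batch.size", 16384); // maximum batch size in bytes (Kafka's default)
    props.put("linger.ms", 5);      // wait up to 5ms for more records to fill a batch
    Producer<String, String> producer = new KafkaProducer<>(props);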
A request batch is generally used along with Request Pipeline
to improve overall throughput and latency.
When a retry-backoff policy is used to send requests to cluster nodes,
the entire batch request will be retried. The cluster node might already
have processed part of the batch, so to ensure that retries work without
any issues, you should implement Idempotent Receiver.
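As a rough sketch of the idea behind Idempotent Receiver in this context (the field and method below are assumptions for illustration, not part of the implementation above), the server can remember which correlation ids it has already applied, so a retried batch does not apply the same request twice:

class Server…

    // Assumed sketch: remember already-applied correlation ids so retried batches are safe.
    private final Set<Integer> processedRequestIds = ConcurrentHashMap.newKeySet();

    private RequestOrResponse handleSetValueRequestIdempotently(RequestOrResponse request) {
        if (!processedRequestIds.add(request.getCorrelationId())) {
            // Already applied in an earlier attempt; acknowledge again without re-applying.
            return new RequestOrResponse(RequestId.SetValueResponse.getId(),
                    "Success".getBytes(), request.getCorrelationId());
        }
        return handleSetValueRequest(request);
    }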