Notice: Function _load_textdomain_just_in_time was called incorrectly. Translation loading for the jetpack domain was triggered too early. This is usually an indicator for some code in the plugin or theme running too early. Translations should be loaded at the init action or later. Please see Debugging in WordPress for more information. (This message was added in version 6.7.0.) in /home/feedavenue.com/public_html/wp-includes/functions.php on line 6114
Request Batch - Feedavenue
Thursday, January 9, 2025

Request Batch

Date:

Related stories

spot_imgspot_img


Solution

Combine multiple requests together into a single request batch.
The batch of the request will be sent to the cluster node for processing.
with each request processed in exactly the same manner as
an individual request. It will then respond with the batch of the responses.

As an example, consider a distributed key-value store,
where the client sends requests to store
multiple key-values on the server.
When the client receives a call to send the request, it does not immediately
send it over the network; instead, it keeps a queue of requests to be sent.

class Client…

  LinkedBlockingQueue<RequestEntry> requests = new LinkedBlockingQueue<>();

  public CompletableFuture send(SetValueRequest setValueRequest) 
      int requestId = enqueueRequest(setValueRequest);
      CompletableFuture responseFuture = trackPendingRequest(requestId);
      return responseFuture;
  

  private int enqueueRequest(SetValueRequest setValueRequest) 
      int requestId = nextRequestId();
      byte[] requestBytes = serialize(setValueRequest, requestId);
      requests.add(new RequestEntry(requestBytes, clock.nanoTime()));
      return requestId;
  
  private int nextRequestId() 
      return requestNumber++;
  

The time at which the request is enqued is tracked; this is later used
to decide if the request can be sent as part of the batch.

class RequestEntry…

  class RequestEntry {
      byte[] serializedRequest;
      long createdTime;
  
      public RequestEntry(byte[] serializedRequest, long createdTime) 
          this.serializedRequest = serializedRequest;
          this.createdTime = createdTime;
      

It then tracks the pending requests to be completed when a
response is received. Each request will be assigned a unique request number
which can be used to map the response and complete the requests.

class Client…

  Map<Integer, CompletableFuture> pendingRequests = new ConcurrentHashMap<>();

  private CompletableFuture trackPendingRequest(Integer correlationId) 
      CompletableFuture responseFuture = new CompletableFuture();
      pendingRequests.put(correlationId, responseFuture);
      return responseFuture;
  

The client starts a separate task which continuously tracks the
queued requests.

class Client…

  public Client(Config config, InetAddressAndPort serverAddress, SystemClock clock) 
      this.clock = clock;
      this.sender = new Sender(config, serverAddress, clock);
      this.sender.start();
  

class Sender…

  @Override
  public void run() {
      while (isRunning) 
  }

  private RequestBatch createBatch(LinkedBlockingQueue<RequestEntry> requests) 
      RequestBatch batch = new RequestBatch(MAX_BATCH_SIZE_BYTES);
      RequestEntry entry = requests.peek();
      while (entry != null && batch.hasSpaceFor(entry.getRequest())) 
          batch.add(entry.getRequest());
          requests.remove(entry);
          entry = requests.peek();
      
      return batch;
  

class RequestBatch…

  public boolean hasSpaceFor(byte[] requestBytes) 
      return batchSize() + requestBytes.length <= maxSize;
  
  private int batchSize() 
      return requests.stream().map(r->r.length).reduce(0, Integer::sum);
  

There are two checks which are generally done.

  • If enough requests have accumulated to fill the batch to the maximum
    configured size.
  • class Sender…

  private boolean maxBatchSizeReached(Queue<RequestEntry> requests) 
      return accumulatedRequestSize(requests) > MAX_BATCH_SIZE_BYTES;
  

  private int accumulatedRequestSize(Queue<RequestEntry> requests) 
      return requests.stream().map(re -> re.size()).reduce((r1, r2) -> r1 + r2).orElse(0);
  
  • Because we cannot wait forever for the batch to be filled in,
    we can configure a small amount of wait time. The sender task waits
    and then checks if the request has been added before the
    maximum wait time.
  • class Sender…

      private boolean requestsWaitedFor(long batchingWindowInMs) 
          RequestEntry oldestPendingRequest = requests.peek();
          if (oldestPendingRequest == null) 
              return false;
          
          long oldestEntryWaitTime = clock.nanoTime() - oldestPendingRequest.createdTime;
          return oldestEntryWaitTime > batchingWindowInMs;
      

    Once any of these conditions has been fulfilled the batch request can
    then be sent to the server.
    The server unpacks the batch request, and processes each of the
    individual requests.

    class Server…

      private void handleBatchRequest(RequestOrResponse batchRequest, ClientConnection clientConnection) 
          RequestBatch batch = JsonSerDes.deserialize(batchRequest.getMessageBodyJson(), RequestBatch.class);
          List<RequestOrResponse> requests = batch.getPackedRequests();
          List<RequestOrResponse> responses = new ArrayList<>();
          for (RequestOrResponse request : requests) 
              RequestOrResponse response = handleSetValueRequest(request);
              responses.add(response);
          
          sendResponse(batchRequest, clientConnection, new BatchResponse(responses));
      
    
      private RequestOrResponse handleSetValueRequest(RequestOrResponse request) 
          SetValueRequest setValueRequest = JsonSerDes.deserialize(request.getMessageBodyJson(), SetValueRequest.class);
          kv.put(setValueRequest.getKey(), setValueRequest.getValue());
          RequestOrResponse response = new RequestOrResponse(RequestId.SetValueResponse.getId(), "Success".getBytes(), request.getCorrelationId());
          return response;
      

    The client receives the batch response and completes all the pending requests.

    class Sender…

      private void handleResponse(BatchResponse batchResponse) 
          List<RequestOrResponse> responseList = batchResponse.getResponseList();
          logger.debug("Completing requests from " + responseList.get(0).getCorrelationId() + " to " + responseList.get(responseList.size() - 1).getCorrelationId());
          responseList.stream().forEach(r -> 
              CompletableFuture completableFuture = pendingRequests.remove(r.getCorrelationId());
              if (completableFuture != null) 
                  completableFuture.complete(r);
               else 
                  logger.error("no pending request for " + r.getCorrelationId());
              
          );
      

    Technical Considerations

    The batch size should be chosen based on the size of
    individual messages and available network bandwidth as well
    as the observed latency and throughput improvements based on the
    real life load. These are configured to some sensible defaults assuming
    smaller message sizes and the optimal batch size for
    server side processing. For example, Kafka has a
    default batch size of 16Kb. It also has a configuration parameter called
    “linger.ms” with the default value of 0.
    However if the size of the messages are bigger a higher batch size might work better.

    Having too large a batch size will likely only offer diminishing returns.
    For example
    having a batch size in MBs can add further overheads in terms of
    processing. This is why the batch size parameter
    is typically tuned according to observations made through
    performance testing.

    A request batch is generally used along with Request Pipeline
    to improve overall throughput and latency.

    When the retry-backoff policy is used to send requests to cluster nodes,
    the entire batch request will be retried. The cluster node might have processed
    part of the batch already; so to ensure the retry works without
    any issues, you should implement Idempotent Receiver.



    Source link

    Latest stories

    spot_img