I often meet with the task of transferring state between various successive calls, which are solved by a simple iteration in synchronous execution and which make it very painful in asynchronous. For example, in the case of writing an API client, it may be necessary to pass on what kind of account is an attempt to execute a request, in order to break off vain attempts and simply return an exception if the limit is exceeded. The synchronous version may look like this (the example is simplified):

HttpResponse execute(HttpRequest request) throws TimeoutException { Stopwatch timer = Stopwatch.createStarted(); for (int i = 0; i < MAX_RETRIES; i++) { if (timer.elapsed(TimeUnit.MILLISECONDS) > REQUEST_TIMEOUT) { throw new TimeoutException(); } try { return tryExecute(request); } catch (ConnectionException e) { // cycle one more time } } throw new UnreachableHostException(); } 

In the case of asynchronous pain begins: one way or another, the attempt number and the execution timer must fall into the method body. Here I see two solutions (and they are both so-so ):

  1. Make a separate private state class in which to transfer everything you need, and update / re-create it for each call:

     CompletableFuture<HttpResponse> execute(HttpRequest request, State state) { if (state.getTimer().elapsed(TimeUnit.MILLISECONDS) > REQUEST_TIMEOUT) { CompletableFuture<HttpResponse> synchronizer = new CompletableFuture<>(); synchronizer.completeExceptionally(new TimeoutException()); return synchronizer; } if (state.getAttempt() >= MAX_ATTEMPTS) { CompletableFuture<HttpResponse> synchronizer = new CompletableFuture<>(); synchronizer.completeExceptionally(new UnreachableHostException()); return synchronizer; } return tryExecute(request) // о да, мы идем прямо в ад. прошу не обращать // внимание на саму сложность конструкции .handle((result, throwable) -> { if (result != null) { return CompletableFuture.completedFuture(result); } if (throwable instanceof ConnectionException) { state.setAttempt(state.getAttempt() + 1); return execute(request, state); } CompletableFuture synchronizer = new CompletableFuture<>(); synchronizer.completeExceptionally(throwable); return synchronizer; }) .thenCompose(f -> f); } 
  2. Make a separate private one-time request class in which to place the method itself and the required state in the form of fields. It will look about the same (only calls instead of the state will go to this ), so I will not give the code itself.

Both approaches look awful, require an insane amount of code, and don't impress me. The fact that the iterative version looks so simple tells me that I just didn’t get very well into the programming paradigm and misunderstand something. How to correctly convey the state in such calls?

  • one
    If you understand the task correctly, then bring all the logic with repetitions / caching to the manager, which maintains the queue, starts / restarts requests if necessary. And to the request itself information about how many times it has been launched to anything ... - Yura Ivanov
  • @YuraIvanov honestly, I don’t understand very well how a manager should look like in that case. He should keep the map <request, contest>, which you constantly pull? - etki
  • 3
    Yeah, like that. In fact, you should have someone who runs this execute and someone who already holds state for each request, let it be a manager. for example, a runnable arrives at the input, the manager runs a task in executor, receives a response, positive or exception, repeats the request if necessary ... For example, this is how volley works: caches requests, retry, one callback comes out. - Yura Ivanov
  • @etki Do not consider the possibility of using the model of actors? Very convenient model of a multithreading, easily keeps within in the head and random access memory. For example, in the Akka framework, which implements the model of actors, timeouts, retracts, attempt counters, and an exponential foldback are very easy and organic. - iTollu
  • @iTollu in the standard library? - etki

1 answer 1

It is necessary to transfer such logic to the manager, which will keep the state.

Java7 / 8 example : AsyncRetryExecutor based on ScheduledExecutorService

 ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); RetryExecutor executor = new AsyncRetryExecutor(scheduler) .retryOn(ConnectionException.class) .withExponentialBackoff(500, 2) //500ms х2 каждую попытку .withMaxDelay(10_000) //10 сек макс. .withUniformJitter() // +/- 100ms случайно .withMaxRetries(10); final CompletableFuture<HttpResponse> future = executor.getWithRetry(() -> new HttpResponse(...) ); future.thenAccept(response -> System.out.println("response received") ); 
  • Ok, and what is the device inside the manager? - etki