< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java

Print this page




   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;

  29 import java.time.Duration;
  30 import java.util.List;
  31 import java.security.AccessControlContext;
  32 import java.security.AccessController;
  33 import java.util.concurrent.CompletableFuture;
  34 import java.util.concurrent.CompletionException;
  35 import java.util.concurrent.ExecutionException;
  36 import java.util.function.BiFunction;
  37 import java.util.concurrent.Executor;
  38 import java.util.function.UnaryOperator;
  39 

  40 import jdk.incubator.http.internal.common.Log;
  41 import jdk.incubator.http.internal.common.MinimalFuture;
  42 import jdk.incubator.http.internal.common.Pair;
  43 import jdk.incubator.http.internal.common.Utils;
  44 import static jdk.incubator.http.internal.common.Pair.pair;

  45 
  46 /**
  47  * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
  48  * - manages filters
  49  * - retries due to filters.
  50  * - I/O errors and most other exceptions get returned directly to user
  51  *
  52  * Creates a new Exchange for each request/response interaction
  53  */
  54 class MultiExchange<U,T> {
  55 




  56     private final HttpRequest userRequest; // the user request
  57     private final HttpRequestImpl request; // a copy of the user request
  58     final AccessControlContext acc;
  59     final HttpClientImpl client;
  60     final HttpResponse.BodyHandler<T> responseHandler;
  61     final ExecutorWrapper execWrapper;
  62     final Executor executor;
  63     final HttpResponse.MultiProcessor<U,T> multiResponseHandler;

  64     HttpRequestImpl currentreq; // used for async only
  65     Exchange<T> exchange; // the current exchange
  66     Exchange<T> previous;
  67     int attempts;



  68     // Maximum number of times a request will be retried/redirected
  69     // for any reason
  70 
  71     static final int DEFAULT_MAX_ATTEMPTS = 5;
  72     static final int max_attempts = Utils.getIntegerNetProperty(
  73             "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
  74     );
  75 
  76     private final List<HeaderFilter> filters;
  77     TimedEvent timedEvent;
  78     volatile boolean cancelled;
  79     final PushGroup<U,T> pushGroup;
  80 
  81     /**
  82      * Filter fields. These are attached as required by filters
  83      * and only used by the filter implementations. This could be
  84      * generalised into Objects that are passed explicitly to the filters
  85      * (one per MultiExchange object, and one per Exchange object possibly)
  86      */
  87     volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
  88     // RedirectHandler
  89     volatile int numberOfRedirects = 0;
  90 
  91     /**
  92      * MultiExchange with one final response.
  93      */
  94     MultiExchange(HttpRequest req,

  95                   HttpClientImpl client,
  96                   HttpResponse.BodyHandler<T> responseHandler) {

  97         this.previous = null;
  98         this.userRequest = req;
  99         this.request = new HttpRequestImpl(req);
 100         this.currentreq = request;
 101         this.attempts = 0;
 102         this.client = client;
 103         this.filters = client.filterChain();
 104         if (System.getSecurityManager() != null) {
 105             this.acc = AccessController.getContext();
 106         } else {
 107             this.acc = null;
 108         }
 109         this.execWrapper = new ExecutorWrapper(client.executor(), acc);
 110         this.executor = execWrapper.executor();
 111         this.responseHandler = responseHandler;





 112         this.exchange = new Exchange<>(request, this);
 113         this.multiResponseHandler = null;
 114         this.pushGroup = null;
 115     }
 116 
 117     /**
 118      * MultiExchange with multiple responses (HTTP/2 server pushes).
 119      */
 120     MultiExchange(HttpRequest req,

 121                   HttpClientImpl client,
 122                   HttpResponse.MultiProcessor<U, T> multiResponseHandler) {

 123         this.previous = null;
 124         this.userRequest = req;
 125         this.request = new HttpRequestImpl(req);
 126         this.currentreq = request;
 127         this.attempts = 0;
 128         this.client = client;
 129         this.filters = client.filterChain();
 130         if (System.getSecurityManager() != null) {
 131             this.acc = AccessController.getContext();
 132         } else {
 133             this.acc = null;
 134         }
 135         this.execWrapper = new ExecutorWrapper(client.executor(), acc);
 136         this.executor = execWrapper.executor();
 137         this.multiResponseHandler = multiResponseHandler;
 138         this.pushGroup = new PushGroup<>(multiResponseHandler, request);
 139         this.exchange = new Exchange<>(request, this);
 140         this.responseHandler = pushGroup.mainResponseHandler();
 141     }
 142 
 143     public HttpResponseImpl<T> response() throws IOException, InterruptedException {
 144         HttpRequestImpl r = request;
 145         if (r.duration() != null) {
 146             timedEvent = new TimedEvent(r.duration());
 147             client.registerTimer(timedEvent);
 148         }
 149         while (attempts < max_attempts) {
 150             try {
 151                 attempts++;
 152                 Exchange<T> currExchange = getExchange();
 153                 requestFilters(r);
 154                 Response response = currExchange.response();
 155                 HttpRequestImpl newreq = responseFilters(response);
 156                 if (newreq == null) {
 157                     if (attempts > 1) {
 158                         Log.logError("Succeeded on attempt: " + attempts);
 159                     }
 160                     T body = currExchange.readBody(responseHandler);
 161                     cancelTimer();
 162                     return new HttpResponseImpl<>(userRequest, response, body, currExchange);
 163                 }
 164                 //response.body(HttpResponse.ignoreBody());
 165                 setExchange(new Exchange<>(newreq, this, acc));
 166                 r = newreq;
 167             } catch (IOException e) {
 168                 if (cancelled) {
 169                     throw new HttpTimeoutException("Request timed out");
 170                 }
 171                 throw e;
 172             }
 173         }
 174         cancelTimer();
 175         throw new IOException("Retry limit exceeded");
 176     }
 177 
 178     CompletableFuture<Void> multiCompletionCF() {
 179         return pushGroup.groupResult();
 180     }
 181 
 182     private synchronized Exchange<T> getExchange() {
 183         return exchange;
 184     }
 185 
 186     HttpClientImpl client() {
 187         return client;
 188     }
 189 
 190     HttpClient.Redirect followRedirects() {
 191         return client.followRedirects();
 192     }
 193 
 194     HttpClient.Version version() {
 195         return request.version().orElse(client.version());
 196     }
 197 
 198     private synchronized void setExchange(Exchange<T> exchange) {



 199         this.exchange = exchange;
 200     }
 201 
 202     private void cancelTimer() {
 203         if (timedEvent != null) {
 204             client.cancelTimer(timedEvent);
 205         }
 206     }
 207 
 208     private void requestFilters(HttpRequestImpl r) throws IOException {
 209         Log.logTrace("Applying request filters");
 210         for (HeaderFilter filter : filters) {
 211             Log.logTrace("Applying {0}", filter);
 212             filter.request(r, this);
 213         }
 214         Log.logTrace("All filters applied");
 215     }
 216 
 217     private HttpRequestImpl responseFilters(Response response) throws IOException
 218     {
 219         Log.logTrace("Applying response filters");
 220         for (HeaderFilter filter : filters) {
 221             Log.logTrace("Applying {0}", filter);
 222             HttpRequestImpl newreq = filter.response(response);
 223             if (newreq != null) {
 224                 Log.logTrace("New request: stopping filters");
 225                 return newreq;
 226             }
 227         }
 228         Log.logTrace("All filters applied");
 229         return null;
 230     }
 231 
 232     public void cancel() {
 233         cancelled = true;
 234         getExchange().cancel();
 235     }
 236 
 237     public void cancel(IOException cause) {
 238         cancelled = true;
 239         getExchange().cancel(cause);
 240     }
 241 
 242     public CompletableFuture<HttpResponseImpl<T>> responseAsync() {
 243         CompletableFuture<Void> start = new MinimalFuture<>();
 244         CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
 245         start.completeAsync( () -> null, executor); // trigger execution
 246         return cf;
 247     }
 248 
 249     private CompletableFuture<HttpResponseImpl<T>> responseAsync0(CompletableFuture<Void> start) {

 250         return start.thenCompose( v -> responseAsyncImpl())
 251             .thenCompose((Response r) -> {
 252                 Exchange<T> exch = getExchange();
 253                 return exch.readBodyAsync(responseHandler)
 254                         .thenApply((T body) ->  new HttpResponseImpl<>(userRequest, r, body, exch));




 255             });
 256     }
 257 
 258     CompletableFuture<U> multiResponseAsync() {
 259         CompletableFuture<Void> start = new MinimalFuture<>();
 260         CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
 261         CompletableFuture<HttpResponse<T>> mainResponse =
 262                 cf.thenApply((HttpResponseImpl<T> b) -> {
 263                       multiResponseHandler.onResponse(b);
 264                       return (HttpResponse<T>)b;
 265                    });
 266 
 267         pushGroup.setMainResponse(mainResponse);
 268         // set up house-keeping related to multi-response
 269         mainResponse.thenAccept((r) -> {
 270             // All push promises received by now.
 271             pushGroup.noMorePushes(true);
 272         });
 273         CompletableFuture<U> res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());


 274         start.completeAsync( () -> null, executor); // trigger execution
 275         return res;
 276     }
 277 
 278     private CompletableFuture<Response> responseAsyncImpl() {
 279         CompletableFuture<Response> cf;
 280         if (++attempts > max_attempts) {
 281             cf = MinimalFuture.failedFuture(new IOException("Too many retries"));
 282         } else {
 283             if (currentreq.duration() != null) {
 284                 timedEvent = new TimedEvent(currentreq.duration());
 285                 client.registerTimer(timedEvent);
 286             }
 287             try {
 288                 // 1. Apply request filters
 289                 requestFilters(currentreq);
 290             } catch (IOException e) {
 291                 return MinimalFuture.failedFuture(e);
 292             }
 293             Exchange<T> exch = getExchange();
 294             // 2. get response
 295             cf = exch.responseAsync()
 296                 .thenCompose((Response response) -> {
 297                     HttpRequestImpl newrequest = null;
 298                     try {
 299                         // 3. Apply response filters
 300                         newrequest = responseFilters(response);
 301                     } catch (IOException e) {
 302                         return MinimalFuture.failedFuture(e);
 303                     }
 304                     // 4. Check filter result and repeat or continue
 305                     if (newrequest == null) {
 306                         if (attempts > 1) {
 307                             Log.logError("Succeeded on attempt: " + attempts);
 308                         }
 309                         return MinimalFuture.completedFuture(response);
 310                     } else {




 311                         currentreq = newrequest;

 312                         setExchange(new Exchange<>(currentreq, this, acc));
 313                         //reads body off previous, and then waits for next response
 314                         return responseAsyncImpl();
 315                     }
 316                 })
 317             // 5. Handle errors and cancel any timer set
 318             .handle((response, ex) -> {

 319                 cancelTimer();
 320                 if (ex == null) {
 321                     assert response != null;
 322                     return MinimalFuture.completedFuture(response);
 323                 }
 324                 // all exceptions thrown are handled here
 325                 CompletableFuture<Response> error = getExceptionalCF(ex);
 326                 if (error == null) {
 327                     return responseAsyncImpl();
 328                 } else {
 329                     return error;
 330                 }
 331             })
 332             .thenCompose(UnaryOperator.identity());
 333         }
 334         return cf;
 335     }
 336 
 337     /**
 338      * Take a Throwable and return a suitable CompletableFuture that is
 339      * completed exceptionally.
 340      */
 341     private CompletableFuture<Response> getExceptionalCF(Throwable t) {
 342         if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
 343             if (t.getCause() != null) {
 344                 t = t.getCause();
 345             }
 346         }
 347         if (cancelled && t instanceof IOException) {
 348             t = new HttpTimeoutException("request timed out");
















 349         }
 350         return MinimalFuture.failedFuture(t);
 351     }
 352 
 353     class TimedEvent extends TimeoutEvent {
 354         TimedEvent(Duration duration) {
 355             super(duration);
 356         }
 357         @Override
 358         public void handle() {



 359             cancel(new HttpTimeoutException("request timed out"));
 360         }
 361     }
 362 }


   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.time.Duration;
  31 import java.util.List;
  32 import java.security.AccessControlContext;

  33 import java.util.concurrent.CompletableFuture;
  34 import java.util.concurrent.CompletionException;
  35 import java.util.concurrent.ExecutionException;

  36 import java.util.concurrent.Executor;
  37 import java.util.concurrent.atomic.AtomicInteger;
  38 import java.util.function.Function;
  39 import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
  40 import jdk.incubator.http.internal.common.Log;
  41 import jdk.incubator.http.internal.common.MinimalFuture;
  42 import jdk.incubator.http.internal.common.ConnectionExpiredException;
  43 import jdk.incubator.http.internal.common.Utils;
  44 import static jdk.incubator.http.internal.common.MinimalFuture.completedFuture;
  45 import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
  46 
  47 /**
  48  * Encapsulates multiple Exchanges belonging to one HttpRequestImpl.
  49  * - manages filters
  50  * - retries due to filters.
  51  * - I/O errors and most other exceptions get returned directly to user
  52  *
  53  * Creates a new Exchange for each request/response interaction
  54  */
  55 class MultiExchange<U,T> {
  56 
  57     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
  58     static final System.Logger DEBUG_LOGGER =
  59             Utils.getDebugLogger("MultiExchange"::toString, DEBUG);
  60 
  61     private final HttpRequest userRequest; // the user request
  62     private final HttpRequestImpl request; // a copy of the user request
  63     final AccessControlContext acc;
  64     final HttpClientImpl client;
  65     final HttpResponse.BodyHandler<T> responseHandler;

  66     final Executor executor;
  67     final HttpResponse.MultiSubscriber<U,T> multiResponseSubscriber;
  68     final AtomicInteger attempts = new AtomicInteger();
  69     HttpRequestImpl currentreq; // used for async only
  70     Exchange<T> exchange; // the current exchange
  71     Exchange<T> previous;
  72     volatile Throwable retryCause;
  73     volatile boolean expiredOnce;
  74     volatile HttpResponse<T> response = null;
  75 
  76     // Maximum number of times a request will be retried/redirected
  77     // for any reason
  78 
  79     static final int DEFAULT_MAX_ATTEMPTS = 5;
  80     static final int max_attempts = Utils.getIntegerNetProperty(
  81             "jdk.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
  82     );
  83 
  84     private final List<HeaderFilter> filters;
  85     TimedEvent timedEvent;
  86     volatile boolean cancelled;
  87     final PushGroup<U,T> pushGroup;
  88 
  89     /**
  90      * Filter fields. These are attached as required by filters
  91      * and only used by the filter implementations. This could be
  92      * generalised into Objects that are passed explicitly to the filters
  93      * (one per MultiExchange object, and one per Exchange object possibly)
  94      */
  95     volatile AuthenticationFilter.AuthInfo serverauth, proxyauth;
  96     // RedirectHandler
  97     volatile int numberOfRedirects = 0;
  98 
  99     /**
 100      * MultiExchange with one final response.
 101      */
 102     MultiExchange(HttpRequest userRequest,
 103                   HttpRequestImpl requestImpl,
 104                   HttpClientImpl client,
 105                   HttpResponse.BodyHandler<T> responseHandler,
 106                   AccessControlContext acc) {
 107         this.previous = null;
 108         this.userRequest = userRequest;
 109         this.request = requestImpl;
 110         this.currentreq = request;

 111         this.client = client;
 112         this.filters = client.filterChain();
 113         this.acc = acc;
 114         this.executor = client.theExecutor();





 115         this.responseHandler = responseHandler;
 116         if (acc != null) {
 117             // Restricts the file publisher with the senders ACC, if any
 118             if (responseHandler instanceof UntrustedBodyHandler)
 119                 ((UntrustedBodyHandler)this.responseHandler).setAccessControlContext(acc);
 120         }
 121         this.exchange = new Exchange<>(request, this);
 122         this.multiResponseSubscriber = null;
 123         this.pushGroup = null;
 124     }
 125 
 126     /**
 127      * MultiExchange with multiple responses (HTTP/2 server pushes).
 128      */
 129     MultiExchange(HttpRequest userRequest,
 130                   HttpRequestImpl requestImpl,
 131                   HttpClientImpl client,
 132                   HttpResponse.MultiSubscriber<U, T> multiResponseSubscriber,
 133                   AccessControlContext acc) {
 134         this.previous = null;
 135         this.userRequest = userRequest;
 136         this.request = requestImpl;
 137         this.currentreq = request;

 138         this.client = client;
 139         this.filters = client.filterChain();
 140         this.acc = acc;
 141         this.executor = client.theExecutor();
 142         this.multiResponseSubscriber = multiResponseSubscriber;
 143         this.pushGroup = new PushGroup<>(multiResponseSubscriber, request, acc);





 144         this.exchange = new Exchange<>(request, this);
 145         this.responseHandler = pushGroup.mainResponseHandler();
 146     }
 147 
 148 //    CompletableFuture<Void> multiCompletionCF() {
 149 //        return pushGroup.groupResult();
 150 //    }



































 151 
 152     private synchronized Exchange<T> getExchange() {
 153         return exchange;
 154     }
 155 
 156     HttpClientImpl client() {
 157         return client;
 158     }
 159 
 160 //    HttpClient.Redirect followRedirects() {
 161 //        return client.followRedirects();
 162 //    }
 163 
 164     HttpClient.Version version() {
 165         return request.version().orElse(client.version());
 166     }
 167 
 168     private synchronized void setExchange(Exchange<T> exchange) {
 169         if (this.exchange != null && exchange != this.exchange) {
 170             this.exchange.released();
 171         }
 172         this.exchange = exchange;
 173     }
 174 
 175     private void cancelTimer() {
 176         if (timedEvent != null) {
 177             client.cancelTimer(timedEvent);
 178         }
 179     }
 180 
 181     private void requestFilters(HttpRequestImpl r) throws IOException {
 182         Log.logTrace("Applying request filters");
 183         for (HeaderFilter filter : filters) {
 184             Log.logTrace("Applying {0}", filter);
 185             filter.request(r, this);
 186         }
 187         Log.logTrace("All filters applied");
 188     }
 189 
 190     private HttpRequestImpl responseFilters(Response response) throws IOException
 191     {
 192         Log.logTrace("Applying response filters");
 193         for (HeaderFilter filter : filters) {
 194             Log.logTrace("Applying {0}", filter);
 195             HttpRequestImpl newreq = filter.response(response);
 196             if (newreq != null) {
 197                 Log.logTrace("New request: stopping filters");
 198                 return newreq;
 199             }
 200         }
 201         Log.logTrace("All filters applied");
 202         return null;
 203     }
 204 
 205 //    public void cancel() {
 206 //        cancelled = true;
 207 //        getExchange().cancel();
 208 //    }
 209 
 210     public void cancel(IOException cause) {
 211         cancelled = true;
 212         getExchange().cancel(cause);
 213     }
 214 
 215     public CompletableFuture<HttpResponse<T>> responseAsync() {
 216         CompletableFuture<Void> start = new MinimalFuture<>();
 217         CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
 218         start.completeAsync( () -> null, executor); // trigger execution
 219         return cf;
 220     }
 221 
 222     private CompletableFuture<HttpResponse<T>>
 223     responseAsync0(CompletableFuture<Void> start) {
 224         return start.thenCompose( v -> responseAsyncImpl())
 225                     .thenCompose((Response r) -> {
 226                         Exchange<T> exch = getExchange();
 227                         return exch.readBodyAsync(responseHandler)
 228                             .thenApply((T body) -> {
 229                                 this.response =
 230                                     new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
 231                                 return this.response;
 232                             });
 233                     });
 234     }
 235 
 236     CompletableFuture<U> multiResponseAsync() {
 237         CompletableFuture<Void> start = new MinimalFuture<>();
 238         CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);
 239         CompletableFuture<HttpResponse<T>> mainResponse =
 240                 cf.thenApply(b -> {
 241                         multiResponseSubscriber.onResponse(b);







 242                         pushGroup.noMorePushes(true);
 243                         return b; });
 244         pushGroup.setMainResponse(mainResponse);
 245         CompletableFuture<U> res = multiResponseSubscriber.completion(pushGroup.groupResult(),
 246                                                                       pushGroup.pushesCF());
 247         start.completeAsync( () -> null, executor); // trigger execution
 248         return res;
 249     }
 250 
 251     private CompletableFuture<Response> responseAsyncImpl() {
 252         CompletableFuture<Response> cf;
 253         if (attempts.incrementAndGet() > max_attempts) {
 254             cf = failedFuture(new IOException("Too many retries", retryCause));
 255         } else {
 256             if (currentreq.timeout().isPresent()) {
 257                 timedEvent = new TimedEvent(currentreq.timeout().get());
 258                 client.registerTimer(timedEvent);
 259             }
 260             try {
 261                 // 1. apply request filters
 262                 requestFilters(currentreq);
 263             } catch (IOException e) {
 264                 return failedFuture(e);
 265             }
 266             Exchange<T> exch = getExchange();
 267             // 2. get response
 268             cf = exch.responseAsync()
 269                      .thenCompose((Response response) -> {
 270                         HttpRequestImpl newrequest;
 271                         try {
 272                             // 3. apply response filters
 273                             newrequest = responseFilters(response);
 274                         } catch (IOException e) {
 275                             return failedFuture(e);
 276                         }
 277                         // 4. check filter result and repeat or continue
 278                         if (newrequest == null) {
 279                             if (attempts.get() > 1) {
 280                                 Log.logError("Succeeded on attempt: " + attempts);
 281                             }
 282                             return completedFuture(response);
 283                         } else {
 284                             this.response =
 285                                 new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
 286                             Exchange<T> oldExch = exch;
 287                             return exch.ignoreBody().handle((r,t) -> {
 288                                 currentreq = newrequest;
 289                                 expiredOnce = false;
 290                                 setExchange(new Exchange<>(currentreq, this, acc));

 291                                 return responseAsyncImpl();
 292                             }).thenCompose(Function.identity());
 293                         } })

 294                      .handle((response, ex) -> {
 295                         // 5. handle errors and cancel any timer set
 296                         cancelTimer();
 297                         if (ex == null) {
 298                             assert response != null;
 299                             return completedFuture(response);
 300                         }
 301                         // all exceptions thrown are handled here
 302                         CompletableFuture<Response> errorCF = getExceptionalCF(ex);
 303                         if (errorCF == null) {
 304                             return responseAsyncImpl();
 305                         } else {
 306                             return errorCF;
 307                         } })
 308                      .thenCompose(Function.identity());

 309         }
 310         return cf;
 311     }
 312 
 313     /**
 314      * Takes a Throwable and returns a suitable CompletableFuture that is
 315      * completed exceptionally, or null.
 316      */
 317     private CompletableFuture<Response> getExceptionalCF(Throwable t) {
 318         if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
 319             if (t.getCause() != null) {
 320                 t = t.getCause();
 321             }
 322         }
 323         if (cancelled && t instanceof IOException) {
 324             t = new HttpTimeoutException("request timed out");
 325         } else if (t instanceof ConnectionExpiredException) {
 326             // allow the retry mechanism to do its work
 327             // ####: method (GET,HEAD, not POST?), no bytes written or read ( differentiate? )
 328             if (t.getCause() != null) retryCause = t.getCause();
 329             if (!expiredOnce) {
 330                 DEBUG_LOGGER.log(Level.DEBUG,
 331                     "MultiExchange: ConnectionExpiredException (async): retrying...",
 332                     t);
 333                 expiredOnce = true;
 334                 return null;
 335             } else {
 336                 DEBUG_LOGGER.log(Level.DEBUG,
 337                     "MultiExchange: ConnectionExpiredException (async): already retried once.",
 338                     t);
 339                 if (t.getCause() != null) t = t.getCause();
 340             }
 341         }
 342         return failedFuture(t);
 343     }
 344 
 345     class TimedEvent extends TimeoutEvent {
 346         TimedEvent(Duration duration) {
 347             super(duration);
 348         }
 349         @Override
 350         public void handle() {
 351             DEBUG_LOGGER.log(Level.DEBUG,
 352                     "Cancelling MultiExchange due to timeout for request %s",
 353                      request);
 354             cancel(new HttpTimeoutException("request timed out"));
 355         }
 356     }
 357 }
< prev index next >