1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.net.InetSocketAddress;
  30 import jdk.incubator.http.HttpResponse.BodyHandler;
  31 import jdk.incubator.http.HttpResponse.BodyProcessor;
  32 import java.nio.ByteBuffer;
  33 import java.util.concurrent.CompletableFuture;
  34 import java.util.Collections;
  35 import java.util.LinkedList;
  36 import java.util.List;
  37 import java.util.concurrent.CompletionException;
  38 import java.util.concurrent.Executor;
  39 import jdk.incubator.http.internal.common.Log;
  40 import jdk.incubator.http.internal.common.MinimalFuture;
  41 import jdk.incubator.http.internal.common.Utils;
  42 
  43 /**
  44  * Encapsulates one HTTP/1.1 request/responseAsync exchange.
  45  */
  46 class Http1Exchange<T> extends ExchangeImpl<T> {
  47 
  48     final HttpRequestImpl request;        // main request
  49     private final List<CompletableFuture<?>> operations; // used for cancel
  50     final Http1Request requestAction;
  51     private volatile Http1Response<T> response;
  52     // use to record possible cancellation raised before any operation
  53     // has been initiated.
  54     private IOException failed;
  55     final HttpConnection connection;
  56     final HttpClientImpl client;
  57     final Executor executor;
  58     volatile ByteBuffer buffer; // used for receiving
  59 
  60     @Override
  61     public String toString() {
  62         return request.toString();
  63     }
  64 
  65     HttpRequestImpl request() {
  66         return request;
  67     }
  68 
  69     Http1Exchange(Exchange<T> exchange, HttpConnection connection)
  70         throws IOException
  71     {
  72         super(exchange);
  73         this.request = exchange.request();
  74         this.client = exchange.client();
  75         this.executor = exchange.executor();
  76         this.operations = new LinkedList<>();
  77         this.buffer = Utils.EMPTY_BYTEBUFFER;
  78         if (connection != null) {
  79             this.connection = connection;
  80         } else {
  81             InetSocketAddress addr = request.getAddress(client);
  82             this.connection = HttpConnection.getConnection(addr, client, request);
  83         }
  84         this.requestAction = new Http1Request(request, client, this.connection);
  85     }
  86 
  87 
  88     HttpConnection connection() {
  89         return connection;
  90     }
  91 
  92 
  93     @Override
  94     T readBody(BodyHandler<T> handler, boolean returnConnectionToPool)
  95         throws IOException
  96     {
  97         BodyProcessor<T> processor = handler.apply(response.responseCode(),
  98                                                    response.responseHeaders());
  99         CompletableFuture<T> bodyCF = response.readBody(processor,
 100                                                         returnConnectionToPool,
 101                                                         this::executeInline);
 102         try {
 103             return bodyCF.join();
 104         } catch (CompletionException e) {
 105             throw Utils.getIOException(e);
 106         }
 107     }
 108 
 109     private void executeInline(Runnable r) {
 110         r.run();
 111     }
 112 
 113     synchronized ByteBuffer getBuffer() {
 114         return buffer;
 115     }
 116 
 117     @Override
 118     CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
 119                                        boolean returnConnectionToPool,
 120                                        Executor executor)
 121     {
 122         BodyProcessor<T> processor = handler.apply(response.responseCode(),
 123                                                    response.responseHeaders());
 124         CompletableFuture<T> bodyCF = response.readBody(processor,
 125                                                         returnConnectionToPool,
 126                                                         executor);
 127         return bodyCF;
 128     }
 129 
 130     @Override
 131     void sendHeadersOnly() throws IOException, InterruptedException {
 132         try {
 133             if (!connection.connected()) {
 134                 connection.connect();
 135             }
 136             requestAction.sendHeadersOnly();
 137         } catch (Throwable e) {
 138             connection.close();
 139             throw e;
 140         }
 141     }
 142 
 143     @Override
 144     void sendBody() throws IOException {
 145         try {
 146             requestAction.continueRequest();
 147         } catch (Throwable e) {
 148             connection.close();
 149             throw e;
 150         }
 151     }
 152 
 153     @Override
 154     Response getResponse() throws IOException {
 155         try {
 156             response = new Http1Response<>(connection, this);
 157             response.readHeaders();
 158             Response r = response.response();
 159             buffer = response.getBuffer();
 160             return r;
 161         } catch (Throwable t) {
 162             connection.close();
 163             throw t;
 164         }
 165     }
 166 
 167     private void closeConnection() {
 168         connection.close();
 169     }
 170 
 171     /**
 172      * Cancel checks to see if request and responseAsync finished already.
 173      * If not it closes the connection and completes all pending operations
 174      */
 175     @Override
 176     void cancel() {
 177         cancel(new IOException("Request cancelled"));
 178     }
 179 
 180     /**
 181      * Cancel checks to see if request and responseAsync finished already.
 182      * If not it closes the connection and completes all pending operations
 183      */
 184     @Override
 185     synchronized void cancel(IOException cause) {
 186         if (requestAction != null && requestAction.finished()
 187                 && response != null && response.finished()) {
 188             return;
 189         }
 190         connection.close();
 191         int count = 0;
 192         if (operations.isEmpty()) {
 193             failed = cause;
 194             Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation."
 195                          + "\n\tCan''t cancel yet with {2}",
 196                          request.uri(),
 197                          request.duration() == null ? -1 :
 198                          // calling duration.toMillis() can throw an exception.
 199                          // this is just debugging, we don't care if it overflows.
 200                          (request.duration().getSeconds() * 1000
 201                           + request.duration().getNano() / 1000000),
 202                          cause);
 203         } else {
 204             for (CompletableFuture<?> cf : operations) {
 205                 cf.completeExceptionally(cause);
 206                 count++;
 207             }
 208         }
 209         Log.logError("Http1Exchange.cancel: count=" + count);
 210     }
 211 
 212     CompletableFuture<Response> getResponseAsyncImpl(Executor executor) {
 213         return MinimalFuture.supply( () -> {
 214             response = new Http1Response<>(connection, Http1Exchange.this);
 215             response.readHeaders();
 216             Response r = response.response();
 217             buffer = response.getBuffer();
 218             return r;
 219         }, executor);
 220     }
 221 
 222     @Override
 223     CompletableFuture<Response> getResponseAsync(Executor executor) {
 224         CompletableFuture<Response> cf =
 225             connection.whenReceivingResponse()
 226                       .thenCompose((v) -> getResponseAsyncImpl(executor));
 227         IOException cause;
 228         synchronized(this) {
 229             operations.add(cf);
 230             cause = failed;
 231             failed = null;
 232         }
 233         if (cause != null) {
 234             Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]"
 235                          + "\n\tCompleting exceptionally with {2}\n",
 236                          request.uri(),
 237                          request.duration() == null ? -1 :
 238                          // calling duration.toMillis() can throw an exception.
 239                          // this is just debugging, we don't care if it overflows.
 240                          (request.duration().getSeconds() * 1000
 241                           + request.duration().getNano() / 1000000),
 242                          cause);
 243             cf.completeExceptionally(cause);
 244         }
 245         return cf;
 246     }
 247 }