< prev index next >
   1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  */
  24 package java.net.http;
  25 
  26 import java.io.IOException;
  27 import java.net.InetSocketAddress;
  28 import java.net.URI;
  29 import java.util.concurrent.CompletableFuture;
  30 import java.util.Collections;
  31 import java.util.LinkedList;
  32 import java.util.List;
  33 
  34 /**
  35  * Encapsulates one HTTP/1.1 request/responseAsync exchange.
  36  */
  37 class Http1Exchange extends ExchangeImpl {
  38 
  39     final HttpRequestImpl request;        // main request
  40     final List<CompletableFuture<?>> operations; // used for cancel
  41     final Http1Request requestAction;
  42     volatile Http1Response response;
  43     final HttpConnection connection;
  44     final HttpClientImpl client;
  45     final ExecutorWrapper executor;
  46 
  47     @Override
  48     public String toString() {
  49         return request.toString();
  50     }
  51 
  52     HttpRequestImpl request() {
  53         return request;
  54     }
  55 
  56     Http1Exchange(Exchange exchange, HttpConnection connection)
  57         throws IOException
  58     {
  59         super(exchange);
  60         this.request = exchange.request();
  61         this.client = request.client();
  62         this.executor = client.executorWrapper();
  63         this.operations = Collections.synchronizedList(new LinkedList<>());
  64         if (connection != null) {
  65             this.connection = connection;
  66         } else {
  67             InetSocketAddress addr = getAddress(request);
  68             this.connection = HttpConnection.getConnection(addr, request);
  69         }
  70         this.requestAction = new Http1Request(request, this.connection);
  71     }
  72 
  73     private static InetSocketAddress getAddress(HttpRequestImpl req) {
  74         URI uri = req.uri();
  75         if (uri == null) {
  76             return req.authority();
  77         }
  78         int port = uri.getPort();
  79         if (port == -1) {
  80             if (uri.getScheme().equalsIgnoreCase("https")) {
  81                 port = 443;
  82             } else {
  83                 port = 80;
  84             }
  85         }
  86         String host = uri.getHost();
  87         if (req.proxy() == null) {
  88             return new InetSocketAddress(host, port);
  89         } else {
  90             return InetSocketAddress.createUnresolved(host, port);
  91         }
  92     }
  93 
  94     HttpConnection connection() {
  95         return connection;
  96     }
  97 
  98     @Override
  99     <T> T responseBody(HttpResponse.BodyProcessor<T> processor)
 100         throws IOException
 101     {
 102         return responseBody(processor, true);
 103     }
 104 
 105     <T> T responseBody(HttpResponse.BodyProcessor<T> processor,
 106                        boolean return2Cache)
 107         throws IOException
 108     {
 109         try {
 110             T body = response.readBody(processor, return2Cache);
 111             return body;
 112         } catch (Throwable t) {
 113             connection.close();
 114             throw t;
 115         }
 116     }
 117 
 118     @Override
 119     <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
 120         CompletableFuture<T> cf = new CompletableFuture<>();
 121         request.client()
 122                .executorWrapper()
 123                .execute(() -> {
 124                             try {
 125                                 T body = responseBody(processor);
 126                                 cf.complete(body);
 127                             } catch (Throwable e) {
 128                                 cf.completeExceptionally(e);
 129                             }
 130                         },
 131                         () -> response.response.getAccessControlContext()); // TODO: fix
 132         return cf;
 133     }
 134 
 135     @Override
 136     void sendHeadersOnly() throws IOException, InterruptedException {
 137         try {
 138             if (!connection.connected()) {
 139                 connection.connect();
 140             }
 141             requestAction.sendHeadersOnly();
 142         } catch (Throwable e) {
 143             connection.close();
 144             throw e;
 145         }
 146     }
 147 
 148     @Override
 149     void sendBody() throws IOException {
 150         try {
 151             requestAction.continueRequest();
 152         } catch (Throwable e) {
 153             connection.close();
 154             throw e;
 155         }
 156     }
 157 
 158     @Override
 159     HttpResponseImpl getResponse() throws IOException {
 160         try {
 161             response = new Http1Response(connection, this);
 162             response.readHeaders();
 163             return response.response();
 164         } catch (Throwable t) {
 165             connection.close();
 166             throw t;
 167         }
 168     }
 169 
 170     @Override
 171     void sendRequest() throws IOException, InterruptedException {
 172         try {
 173             if (!connection.connected()) {
 174                 connection.connect();
 175             }
 176             requestAction.sendRequest();
 177         } catch (Throwable t) {
 178             connection.close();
 179             throw t;
 180         }
 181     }
 182 
 183     private void closeConnection() {
 184         connection.close();
 185     }
 186 
 187     @Override
 188     CompletableFuture<Void> sendHeadersAsync() {
 189         if (!connection.connected()) {
 190             CompletableFuture<Void> op = connection.connectAsync()
 191                     .thenCompose(this::sendHdrsAsyncImpl)
 192                     .whenComplete((Void b, Throwable t) -> {
 193                         if (t != null)
 194                             closeConnection();
 195                     });
 196             operations.add(op);
 197             return op;
 198         } else {
 199             return sendHdrsAsyncImpl(null);
 200         }
 201     }
 202 
 203     private CompletableFuture<Void> sendHdrsAsyncImpl(Void v) {
 204         CompletableFuture<Void> cf = new CompletableFuture<>();
 205         executor.execute(() -> {
 206                             try {
 207                                 requestAction.sendHeadersOnly();
 208                                 cf.complete(null);
 209                             } catch (Throwable e) {
 210                                 cf.completeExceptionally(e);
 211                                 connection.close();
 212                             }
 213                          },
 214                          () -> request.getAccessControlContext());
 215         operations.add(cf);
 216         return cf;
 217     }
 218 
 219     /**
 220      * Cancel checks to see if request and responseAsync finished already.
 221      * If not it closes the connection and completes all pending operations
 222      */
 223     @Override
 224     synchronized void cancel() {
 225         if (requestAction != null && requestAction.finished()
 226                 && response != null && response.finished()) {
 227             return;
 228         }
 229         connection.close();
 230         IOException e = new IOException("Request cancelled");
 231         int count = 0;
 232         for (CompletableFuture<?> cf : operations) {
 233             cf.completeExceptionally(e);
 234             count++;
 235         }
 236         Log.logError("Http1Exchange.cancel: count=" + count);
 237     }
 238 
 239     CompletableFuture<HttpResponseImpl> getResponseAsyncImpl(Void v) {
 240         CompletableFuture<HttpResponseImpl> cf = new CompletableFuture<>();
 241         try {
 242             response = new Http1Response(connection, Http1Exchange.this);
 243             response.readHeaders();
 244             cf.complete(response.response());
 245         } catch (IOException e) {
 246             cf.completeExceptionally(e);
 247         }
 248         return cf;
 249     }
 250 
 251     @Override
 252     CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) {
 253         CompletableFuture<HttpResponseImpl> cf =
 254             connection.whenReceivingResponse()
 255                       .thenCompose(this::getResponseAsyncImpl);
 256 
 257         operations.add(cf);
 258         return cf;
 259     }
 260 
 261     @Override
 262     CompletableFuture<Void> sendBodyAsync() {
 263         final CompletableFuture<Void> cf = new CompletableFuture<>();
 264         executor.execute(() -> {
 265             try {
 266                 requestAction.continueRequest();
 267                 cf.complete(null);
 268             } catch (Throwable e) {
 269                 cf.completeExceptionally(e);
 270                 connection.close();
 271             }
 272         }, () -> request.getAccessControlContext());
 273         operations.add(cf);
 274         return cf;
 275     }
 276 
 277     @Override
 278     CompletableFuture<Void> sendRequestAsync() {
 279         CompletableFuture<Void> op;
 280         if (!connection.connected()) {
 281             op = connection.connectAsync()
 282                 .thenCompose(this::sendRequestAsyncImpl)
 283                 .whenComplete((Void v, Throwable t) -> {
 284                     if (t != null) {
 285                         closeConnection();
 286                     }
 287                 });
 288         } else {
 289             op = sendRequestAsyncImpl(null);
 290         }
 291         operations.add(op);
 292         return op;
 293     }
 294 
 295     CompletableFuture<Void> sendRequestAsyncImpl(Void v) {
 296         CompletableFuture<Void> cf = new CompletableFuture<>();
 297         executor.execute(() -> {
 298             try {
 299                 requestAction.sendRequest();
 300                 cf.complete(null);
 301             } catch (Throwable e) {
 302                 cf.completeExceptionally(e);
 303                 connection.close();
 304             }
 305         }, () -> request.getAccessControlContext());
 306         operations.add(cf);
 307         return cf;
 308     }
 309 }
< prev index next >