1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.nio.ByteBuffer;
  30 import java.util.concurrent.CompletableFuture;
  31 import java.util.concurrent.Executor;
  32 import jdk.incubator.http.internal.common.Log;
  33 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
  34 
  35 /**
  36  * Handles a HTTP/1.1 response in two blocking calls. readHeaders() and
  37  * readBody(). There can be more than one of these per Http exchange.
  38  */
  39 class Http1Response<T> {
  40 
  41     private volatile ResponseContent content;
  42     private final HttpRequestImpl request;
  43     private Response response;
  44     private final HttpConnection connection;
  45     private ResponseHeaders headers;
  46     private int responseCode;
  47     private ByteBuffer buffer;
  48     private final Http1Exchange<T> exchange;
  49     private final boolean redirecting; // redirecting
  50     private boolean return2Cache; // return connection to cache when finished
  51 
  52     Http1Response(HttpConnection conn, Http1Exchange<T> exchange) {
  53         this.request = exchange.request();
  54         this.exchange = exchange;
  55         this.connection = conn;
  56         this.redirecting = false;
  57         buffer = exchange.getBuffer();
  58     }
  59 
  60     @SuppressWarnings("unchecked")
  61     public void readHeaders() throws IOException {
  62         String statusline = readStatusLine();
  63         if (statusline == null) {
  64             if (Log.errors()) {
  65                 Log.logError("Connection closed. Retry");
  66             }
  67             connection.close();
  68             // connection was closed
  69             throw new IOException("Connection closed");
  70         }
  71         if (!statusline.startsWith("HTTP/1.")) {
  72             throw new IOException("Invalid status line: " + statusline);
  73         }
  74         if (Log.trace()) {
  75             Log.logTrace("Statusline: {0}", statusline);
  76         }
  77         char c = statusline.charAt(7);
  78         responseCode = Integer.parseInt(statusline.substring(9, 12));
  79 
  80         headers = new ResponseHeaders(connection, buffer);
  81         if (Log.headers()) {
  82             logHeaders(headers);
  83         }
  84         response = new Response(
  85                 request, exchange.getExchange(),
  86                 headers, responseCode, HTTP_1_1);
  87     }
  88 
  89     private boolean finished;
  90 
  91     synchronized void completed() {
  92         finished = true;
  93     }
  94 
  95     synchronized boolean finished() {
  96         return finished;
  97     }
  98 
  99     ByteBuffer getBuffer() {
 100         return buffer;
 101     }
 102 
 103     int fixupContentLen(int clen) {
 104         if (request.method().equalsIgnoreCase("HEAD")) {
 105             return 0;
 106         }
 107         if (clen == -1) {
 108             if (headers.firstValue("Transfer-encoding").orElse("")
 109                        .equalsIgnoreCase("chunked")) {
 110                 return -1;
 111             }
 112             return 0;
 113         }
 114         return clen;
 115     }
 116 
 117     public CompletableFuture<T> readBody(
 118             HttpResponse.BodyProcessor<T> p,
 119             boolean return2Cache,
 120             Executor executor) {
 121         final BlockingPushPublisher<ByteBuffer> publisher = new BlockingPushPublisher<>();
 122         return readBody(p, return2Cache, publisher, executor);
 123     }
 124 
 125     private CompletableFuture<T> readBody(
 126             HttpResponse.BodyProcessor<T> p,
 127             boolean return2Cache,
 128             AbstractPushPublisher<ByteBuffer> publisher,
 129             Executor executor) {
 130         this.return2Cache = return2Cache;
 131         final jdk.incubator.http.HttpResponse.BodyProcessor<T> pusher = p;
 132         final CompletableFuture<T> cf = p.getBody().toCompletableFuture();
 133 
 134         int clen0;
 135         try {
 136             clen0 = headers.getContentLength();
 137         } catch (IOException ex) {
 138             cf.completeExceptionally(ex);
 139             return cf;
 140         }
 141         final int clen = fixupContentLen(clen0);
 142 
 143         executor.execute(() -> {
 144             try {
 145                 content = new ResponseContent(
 146                         connection, clen, headers, pusher,
 147                         publisher.asDataConsumer(),
 148                         (t -> {
 149                             publisher.acceptError(t);
 150                             connection.close();
 151                             cf.completeExceptionally(t);
 152                         }),
 153                         () -> onFinished()
 154                 );
 155                 publisher.subscribe(p);
 156                 if (cf.isCompletedExceptionally()) {
 157                     // if an error occurs during subscription
 158                     connection.close();
 159                     return;
 160                 }
 161                 content.pushBody(buffer);
 162             } catch (Throwable t) {
 163                 cf.completeExceptionally(t);
 164             }
 165         });
 166         return cf;
 167     }
 168 
 169     private void onFinished() {
 170         if (return2Cache) {
 171             Log.logTrace("Returning connection to the pool: {0}", connection);
 172             connection.returnToCache(headers);
 173         }
 174     }
 175 
 176     private void logHeaders(ResponseHeaders headers) {
 177         StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
 178         Log.dumpHeaders(sb, "    ", headers);
 179         Log.logHeaders(sb.toString());
 180     }
 181 
 182     Response response() {
 183         return response;
 184     }
 185 
 186     boolean redirecting() {
 187         return redirecting;
 188     }
 189 
 190     HttpHeaders responseHeaders() {
 191         return headers;
 192     }
 193 
 194     int responseCode() {
 195         return responseCode;
 196     }
 197 
 198     static final char CR = '\r';
 199     static final char LF = '\n';
 200 
 201     private int obtainBuffer() throws IOException {
 202         int n = buffer.remaining();
 203 
 204         if (n == 0) {
 205             buffer = connection.read();
 206             if (buffer == null) {
 207                 return -1;
 208             }
 209             n = buffer.remaining();
 210         }
 211         return n;
 212     }
 213 
 214     String readStatusLine() throws IOException {
 215         boolean cr = false;
 216         StringBuilder statusLine = new StringBuilder(128);
 217         while ((obtainBuffer()) != -1) {
 218             byte[] buf = buffer.array();
 219             int offset = buffer.position();
 220             int len = buffer.limit() - offset;
 221 
 222             for (int i = 0; i < len; i++) {
 223                 char c = (char) buf[i+offset];
 224 
 225                 if (cr) {
 226                     if (c == LF) {
 227                         buffer.position(i + 1 + offset);
 228                         return statusLine.toString();
 229                     } else {
 230                         throw new IOException("invalid status line");
 231                     }
 232                 }
 233                 if (c == CR) {
 234                     cr = true;
 235                 } else {
 236                     statusLine.append(c);
 237                 }
 238             }
 239             // unlikely, but possible, that multiple reads required
 240             buffer.position(buffer.limit());
 241         }
 242         return null;
 243     }
 244 }