1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.nio.ByteBuffer;
  30 import java.util.List;
  31 import java.util.concurrent.CompletableFuture;
  32 import java.util.concurrent.Executor;
  33 import jdk.incubator.http.internal.common.Log;
  34 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
  35 
  36 /**
  37  * Handles a HTTP/1.1 response in two blocking calls. readHeaders() and
  38  * readBody(). There can be more than one of these per Http exchange.
  39  */
  40 class Http1Response<T> {
  41 
  42     private volatile ResponseContent content;
  43     private final HttpRequestImpl request;
  44     private Response response;
  45     private final HttpConnection connection;
  46     private ResponseHeaders headers;
  47     private int responseCode;
  48     private ByteBuffer buffer;
  49     private final Http1Exchange<T> exchange;
  50     private final boolean redirecting; // redirecting
  51     private boolean return2Cache; // return connection to cache when finished
  52 
  53     Http1Response(HttpConnection conn, Http1Exchange<T> exchange) {
  54         this.request = exchange.request();
  55         this.exchange = exchange;
  56         this.connection = conn;
  57         this.redirecting = false;
  58         buffer = exchange.getBuffer();
  59     }
  60 
  61     @SuppressWarnings("unchecked")
  62     public void readHeaders() throws IOException {
  63         String statusline = readStatusLine();
  64         if (statusline == null) {
  65             if (Log.errors()) {
  66                 Log.logError("Connection closed. Retry");
  67             }
  68             connection.close();
  69             // connection was closed
  70             throw new IOException("Connection closed");
  71         }
  72         if (!statusline.startsWith("HTTP/1.")) {
  73             throw new IOException("Invalid status line: " + statusline);
  74         }
  75         if (Log.trace()) {
  76             Log.logTrace("Statusline: {0}", statusline);
  77         }
  78         char c = statusline.charAt(7);
  79         responseCode = Integer.parseInt(statusline.substring(9, 12));
  80 
  81         headers = new ResponseHeaders(connection, buffer);
  82         if (Log.headers()) {
  83             logHeaders(headers);
  84         }
  85         response = new Response(
  86                 request, exchange.getExchange(),
  87                 headers, responseCode, HTTP_1_1);
  88     }
  89 
  90     private boolean finished;
  91 
  92     synchronized void completed() {
  93         finished = true;
  94     }
  95 
  96     synchronized boolean finished() {
  97         return finished;
  98     }
  99 
 100     ByteBuffer getBuffer() {
 101         return buffer;
 102     }
 103 
 104     int fixupContentLen(int clen) {
 105         if (request.method().equalsIgnoreCase("HEAD")) {
 106             return 0;
 107         }
 108         if (clen == -1) {
 109             if (headers.firstValue("Transfer-encoding").orElse("")
 110                        .equalsIgnoreCase("chunked")) {
 111                 return -1;
 112             }
 113             return 0;
 114         }
 115         return clen;
 116     }
 117 
 118     public CompletableFuture<T> readBody(
 119             HttpResponse.BodyProcessor<T> p,
 120             boolean return2Cache,
 121             Executor executor) {
 122         final BlockingPushPublisher<List<ByteBuffer>> publisher = new BlockingPushPublisher<>();
 123         return readBody(p, return2Cache, publisher, executor);
 124     }
 125 
 126     private CompletableFuture<T> readBody(
 127             HttpResponse.BodyProcessor<T> p,
 128             boolean return2Cache,
 129             AbstractPushPublisher<List<ByteBuffer>> publisher,
 130             Executor executor) {
 131         this.return2Cache = return2Cache;
 132         final jdk.incubator.http.HttpResponse.BodyProcessor<T> pusher = p;
 133         final CompletableFuture<T> cf = p.getBody().toCompletableFuture();
 134 
 135         int clen0;
 136         try {
 137             clen0 = headers.getContentLength();
 138         } catch (IOException ex) {
 139             cf.completeExceptionally(ex);
 140             return cf;
 141         }
 142         final int clen = fixupContentLen(clen0);
 143 
 144         executor.execute(() -> {
 145             try {
 146                 content = new ResponseContent(
 147                         connection, clen, headers, pusher,
 148                         publisher.asDataConsumer(),
 149                         (t -> {
 150                             publisher.acceptError(t);
 151                             connection.close();
 152                             cf.completeExceptionally(t);
 153                         }),
 154                         () -> onFinished()
 155                 );
 156                 publisher.subscribe(p);
 157                 if (cf.isCompletedExceptionally()) {
 158                     // if an error occurs during subscription
 159                     connection.close();
 160                     return;
 161                 }
 162                 content.pushBody(buffer);
 163             } catch (Throwable t) {
 164                 cf.completeExceptionally(t);
 165             }
 166         });
 167         return cf;
 168     }
 169 
 170     private void onFinished() {
 171         if (return2Cache) {
 172             Log.logTrace("Returning connection to the pool: {0}", connection);
 173             connection.returnToCache(headers);
 174         }
 175     }
 176 
 177     private void logHeaders(ResponseHeaders headers) {
 178         StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
 179         Log.dumpHeaders(sb, "    ", headers);
 180         Log.logHeaders(sb.toString());
 181     }
 182 
 183     Response response() {
 184         return response;
 185     }
 186 
 187     boolean redirecting() {
 188         return redirecting;
 189     }
 190 
 191     HttpHeaders responseHeaders() {
 192         return headers;
 193     }
 194 
 195     int responseCode() {
 196         return responseCode;
 197     }
 198 
 199     static final char CR = '\r';
 200     static final char LF = '\n';
 201 
 202     private int obtainBuffer() throws IOException {
 203         int n = buffer.remaining();
 204 
 205         if (n == 0) {
 206             buffer = connection.read();
 207             if (buffer == null) {
 208                 return -1;
 209             }
 210             n = buffer.remaining();
 211         }
 212         return n;
 213     }
 214 
 215     String readStatusLine() throws IOException {
 216         boolean cr = false;
 217         StringBuilder statusLine = new StringBuilder(128);
 218         while ((obtainBuffer()) != -1) {
 219             byte[] buf = buffer.array();
 220             int offset = buffer.position();
 221             int len = buffer.limit() - offset;
 222 
 223             for (int i = 0; i < len; i++) {
 224                 char c = (char) buf[i+offset];
 225 
 226                 if (cr) {
 227                     if (c == LF) {
 228                         buffer.position(i + 1 + offset);
 229                         return statusLine.toString();
 230                     } else {
 231                         throw new IOException("invalid status line");
 232                     }
 233                 }
 234                 if (c == CR) {
 235                     cr = true;
 236                 } else {
 237                     statusLine.append(c);
 238                 }
 239             }
 240             // unlikely, but possible, that multiple reads required
 241             buffer.position(buffer.limit());
 242         }
 243         return null;
 244     }
 245 }