1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.nio.ByteBuffer;
  30 import java.util.List;
  31 import java.util.concurrent.CompletableFuture;
  32 import java.util.concurrent.Executor;
  33 import jdk.incubator.http.internal.common.Log;
  34 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
  35 
  36 /**
  37  * Handles a HTTP/1.1 response in two blocking calls. readHeaders() and
  38  * readBody(). There can be more than one of these per Http exchange.
  39  */
  40 class Http1Response<T> {
  41 
  42     private volatile ResponseContent content;
  43     private final HttpRequestImpl request;
  44     private Response response;
  45     private final HttpConnection connection;
  46     private ResponseHeaders headers;
  47     private int responseCode;
  48     private ByteBuffer buffer;
  49     private final Http1Exchange<T> exchange;
  50     private final boolean redirecting; // redirecting
  51     private boolean return2Cache; // return connection to cache when finished
  52 
  53     Http1Response(HttpConnection conn, Http1Exchange<T> exchange) {
  54         this.request = exchange.request();
  55         this.exchange = exchange;
  56         this.connection = conn;
  57         this.redirecting = false;
  58         buffer = exchange.getBuffer();
  59     }
  60 
  61     @SuppressWarnings("unchecked")
  62     public void readHeaders() throws IOException {
  63         String statusline = readStatusLine();
  64         if (statusline == null) {
  65             if (Log.errors()) {
  66                 Log.logError("Connection closed. Retry");
  67             }
  68             connection.close();
  69             // connection was closed
  70             throw new IOException("Connection closed");
  71         }
  72         if (!statusline.startsWith("HTTP/1.")) {
  73             throw new IOException("Invalid status line: " + statusline);
  74         }
  75         if (Log.trace()) {
  76             Log.logTrace("Statusline: {0}", statusline);
  77         }
  78         char c = statusline.charAt(7);
  79         responseCode = Integer.parseInt(statusline.substring(9, 12));
  80 
  81         headers = new ResponseHeaders(connection, buffer);
  82         if (Log.headers()) {
  83             logHeaders(headers);
  84         }
  85         response = new Response(
  86                 request, exchange.getExchange(),
  87                 headers, responseCode, HTTP_1_1);
  88     }
  89 
  90     private boolean finished;
  91 
  92     synchronized void completed() {
  93         finished = true;
  94     }
  95 
  96     synchronized boolean finished() {
  97         return finished;
  98     }
  99 
 100     ByteBuffer getBuffer() {
 101         return buffer;
 102     }
 103 
 104     int fixupContentLen(int clen) {
 105         if (request.method().equalsIgnoreCase("HEAD")) {
 106             return 0;
 107         }
 108         if (clen == -1) {
 109             if (headers.firstValue("Transfer-encoding").orElse("")
 110                        .equalsIgnoreCase("chunked")) {
 111                 return -1;
 112             }
 113             return 0;
 114         }
 115         return clen;
 116     }
 117 
 118     public CompletableFuture<T> readBody(HttpResponse.BodyProcessor<T> p,
 119                                          boolean return2Cache,
 120                                          Executor executor) {
 121         BlockingPushPublisher<List<ByteBuffer>> publisher = new BlockingPushPublisher<>();
 122         return readBody(p, return2Cache, publisher, executor);
 123     }
 124 
 125     private CompletableFuture<T> readBody(HttpResponse.BodyProcessor<T> p,
 126                                           boolean return2Cache,
 127                                           AbstractPushPublisher<List<ByteBuffer>> publisher,
 128                                           Executor executor) {
 129         this.return2Cache = return2Cache;
 130         final HttpResponse.BodyProcessor<T> pusher = p;
 131         final CompletableFuture<T> cf = p.getBody().toCompletableFuture();
 132 
 133         int clen0;
 134         try {
 135             clen0 = headers.getContentLength();
 136         } catch (IOException ex) {
 137             cf.completeExceptionally(ex);
 138             return cf;
 139         }
 140         final int clen = fixupContentLen(clen0);
 141 
 142         executor.execute(() -> {
 143             try {
 144                 content = new ResponseContent(
 145                         connection, clen, headers, pusher,
 146                         publisher.asDataConsumer(),
 147                         (t -> {
 148                             publisher.acceptError(t);
 149                             connection.close();
 150                             cf.completeExceptionally(t);
 151                         }),
 152                         () -> onFinished()
 153                 );
 154                 publisher.subscribe(p);
 155                 if (cf.isCompletedExceptionally()) {
 156                     // if an error occurs during subscription
 157                     connection.close();
 158                     return;
 159                 }
 160                 content.pushBody(buffer);
 161             } catch (Throwable t) {
 162                 cf.completeExceptionally(t);
 163             }
 164         });
 165         return cf;
 166     }
 167 
 168     private void onFinished() {
 169         if (return2Cache) {
 170             Log.logTrace("Returning connection to the pool: {0}", connection);
 171             connection.returnToCache(headers);
 172         }
 173     }
 174 
 175     private void logHeaders(ResponseHeaders headers) {
 176         StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
 177         Log.dumpHeaders(sb, "    ", headers);
 178         Log.logHeaders(sb.toString());
 179     }
 180 
 181     Response response() {
 182         return response;
 183     }
 184 
 185     boolean redirecting() {
 186         return redirecting;
 187     }
 188 
 189     HttpHeaders responseHeaders() {
 190         return headers;
 191     }
 192 
 193     int responseCode() {
 194         return responseCode;
 195     }
 196 
 197     static final char CR = '\r';
 198     static final char LF = '\n';
 199 
 200     private int obtainBuffer() throws IOException {
 201         int n = buffer.remaining();
 202 
 203         if (n == 0) {
 204             buffer = connection.read();
 205             if (buffer == null) {
 206                 return -1;
 207             }
 208             n = buffer.remaining();
 209         }
 210         return n;
 211     }
 212 
 213     String readStatusLine() throws IOException {
 214         boolean cr = false;
 215         StringBuilder statusLine = new StringBuilder(128);
 216         while ((obtainBuffer()) != -1) {
 217             byte[] buf = buffer.array();
 218             int offset = buffer.position();
 219             int len = buffer.limit() - offset;
 220 
 221             for (int i = 0; i < len; i++) {
 222                 char c = (char) buf[i+offset];
 223 
 224                 if (cr) {
 225                     if (c == LF) {
 226                         buffer.position(i + 1 + offset);
 227                         return statusLine.toString();
 228                     } else {
 229                         throw new IOException("invalid status line");
 230                     }
 231                 }
 232                 if (c == CR) {
 233                     cr = true;
 234                 } else {
 235                     statusLine.append(c);
 236                 }
 237             }
 238             // unlikely, but possible, that multiple reads required
 239             buffer.position(buffer.limit());
 240         }
 241         return null;
 242     }
 243 }