1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.nio.ByteBuffer; 30 import java.util.List; 31 import java.util.concurrent.CompletableFuture; 32 import java.util.concurrent.Executor; 33 import jdk.incubator.http.internal.common.Log; 34 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1; 35 36 /** 37 * Handles a HTTP/1.1 response in two blocking calls. readHeaders() and 38 * readBody(). There can be more than one of these per Http exchange. 39 */ 40 class Http1Response<T> { 41 42 private volatile ResponseContent content; 43 private final HttpRequestImpl request; 44 private Response response; 45 private final HttpConnection connection; 46 private ResponseHeaders headers; 47 private int responseCode; 48 private ByteBuffer buffer; 49 private final Http1Exchange<T> exchange; 50 private final boolean redirecting; // redirecting 51 private boolean return2Cache; // return connection to cache when finished 52 53 Http1Response(HttpConnection conn, Http1Exchange<T> exchange) { 54 this.request = exchange.request(); 55 this.exchange = exchange; 56 this.connection = conn; 57 this.redirecting = false; 58 buffer = exchange.getBuffer(); 59 } 60 61 @SuppressWarnings("unchecked") 62 public void readHeaders() throws IOException { 63 String statusline = readStatusLine(); 64 if (statusline == null) { 65 if (Log.errors()) { 66 Log.logError("Connection closed. Retry"); 67 } 68 connection.close(); 69 // connection was closed 70 throw new IOException("Connection closed"); 71 } 72 if (!statusline.startsWith("HTTP/1.")) { 73 throw new IOException("Invalid status line: " + statusline); 74 } 75 if (Log.trace()) { 76 Log.logTrace("Statusline: {0}", statusline); 77 } 78 char c = statusline.charAt(7); 79 responseCode = Integer.parseInt(statusline.substring(9, 12)); 80 81 headers = new ResponseHeaders(connection, buffer); 82 if (Log.headers()) { 83 logHeaders(headers); 84 } 85 response = new Response( 86 request, exchange.getExchange(), 87 headers, responseCode, HTTP_1_1); 88 } 89 90 private boolean finished; 91 92 synchronized void completed() { 93 finished = true; 94 } 95 96 synchronized boolean finished() { 97 return finished; 98 } 99 100 ByteBuffer getBuffer() { 101 return buffer; 102 } 103 104 int fixupContentLen(int clen) { 105 if (request.method().equalsIgnoreCase("HEAD")) { 106 return 0; 107 } 108 if (clen == -1) { 109 if (headers.firstValue("Transfer-encoding").orElse("") 110 .equalsIgnoreCase("chunked")) { 111 return -1; 112 } 113 return 0; 114 } 115 return clen; 116 } 117 118 public CompletableFuture<T> readBody(HttpResponse.BodyProcessor<T> p, 119 boolean return2Cache, 120 Executor executor) { 121 BlockingPushPublisher<List<ByteBuffer>> publisher = new BlockingPushPublisher<>(); 122 return readBody(p, return2Cache, publisher, executor); 123 } 124 125 private CompletableFuture<T> readBody(HttpResponse.BodyProcessor<T> p, 126 boolean return2Cache, 127 AbstractPushPublisher<List<ByteBuffer>> publisher, 128 Executor executor) { 129 this.return2Cache = return2Cache; 130 final HttpResponse.BodyProcessor<T> pusher = p; 131 final CompletableFuture<T> cf = p.getBody().toCompletableFuture(); 132 133 int clen0; 134 try { 135 clen0 = headers.getContentLength(); 136 } catch (IOException ex) { 137 cf.completeExceptionally(ex); 138 return cf; 139 } 140 final int clen = fixupContentLen(clen0); 141 142 executor.execute(() -> { 143 try { 144 content = new ResponseContent( 145 connection, clen, headers, pusher, 146 publisher.asDataConsumer(), 147 (t -> { 148 publisher.acceptError(t); 149 connection.close(); 150 cf.completeExceptionally(t); 151 }), 152 () -> onFinished() 153 ); 154 publisher.subscribe(p); 155 if (cf.isCompletedExceptionally()) { 156 // if an error occurs during subscription 157 connection.close(); 158 return; 159 } 160 content.pushBody(buffer); 161 } catch (Throwable t) { 162 cf.completeExceptionally(t); 163 } 164 }); 165 return cf; 166 } 167 168 private void onFinished() { 169 if (return2Cache) { 170 Log.logTrace("Returning connection to the pool: {0}", connection); 171 connection.returnToCache(headers); 172 } 173 } 174 175 private void logHeaders(ResponseHeaders headers) { 176 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); 177 Log.dumpHeaders(sb, " ", headers); 178 Log.logHeaders(sb.toString()); 179 } 180 181 Response response() { 182 return response; 183 } 184 185 boolean redirecting() { 186 return redirecting; 187 } 188 189 HttpHeaders responseHeaders() { 190 return headers; 191 } 192 193 int responseCode() { 194 return responseCode; 195 } 196 197 static final char CR = '\r'; 198 static final char LF = '\n'; 199 200 private int obtainBuffer() throws IOException { 201 int n = buffer.remaining(); 202 203 if (n == 0) { 204 buffer = connection.read(); 205 if (buffer == null) { 206 return -1; 207 } 208 n = buffer.remaining(); 209 } 210 return n; 211 } 212 213 String readStatusLine() throws IOException { 214 boolean cr = false; 215 StringBuilder statusLine = new StringBuilder(128); 216 while ((obtainBuffer()) != -1) { 217 byte[] buf = buffer.array(); 218 int offset = buffer.position(); 219 int len = buffer.limit() - offset; 220 221 for (int i = 0; i < len; i++) { 222 char c = (char) buf[i+offset]; 223 224 if (cr) { 225 if (c == LF) { 226 buffer.position(i + 1 + offset); 227 return statusLine.toString(); 228 } else { 229 throw new IOException("invalid status line"); 230 } 231 } 232 if (c == CR) { 233 cr = true; 234 } else { 235 statusLine.append(c); 236 } 237 } 238 // unlikely, but possible, that multiple reads required 239 buffer.position(buffer.limit()); 240 } 241 return null; 242 } 243 }