/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import jdk.incubator.http.internal.common.Log;
import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;

/**
 * Handles a HTTP/1.1 response in two blocking calls: readHeaders() and
 * readBody(). There can be more than one of these per Http exchange.
 */
class Http1Response<T> {

    static final char CR = '\r';
    static final char LF = '\n';

    // Layout of the fixed-position part of a status line: "HTTP/1.x NNN".
    // The three status-code digits occupy indices [9, 12).
    private static final int STATUS_CODE_START = 9;
    private static final int STATUS_CODE_END = 12;

    // Assigned by the task that readBody() submits to the executor.
    private volatile ResponseContent content;
    private final HttpRequestImpl request;
    // Built by readHeaders(); null until then.
    private Response response;
    private final HttpConnection connection;
    // Built by readHeaders(); null until then.
    private ResponseHeaders headers;
    private int responseCode;
    // Current input buffer. Replaced by obtainBuffer() when exhausted and
    // set to null once connection.read() reports end of stream.
    private ByteBuffer buffer;
    private final Http1Exchange<T> exchange;
    private final boolean redirecting; // always false: only set in the constructor
    private boolean return2Cache; // return connection to cache when finished
    private boolean finished;

    /**
     * Creates a response reader for the given exchange. The request and the
     * initial input buffer are taken from {@code exchange}.
     *
     * @param conn     the connection the response is read from
     * @param exchange the HTTP/1.1 exchange this response belongs to
     */
    Http1Response(HttpConnection conn, Http1Exchange<T> exchange) {
        this.request = exchange.request();
        this.exchange = exchange;
        this.connection = conn;
        this.redirecting = false;
        buffer = exchange.getBuffer();
    }

    /**
     * Reads the status line and the response headers, then builds the
     * {@link Response} returned by {@link #response()}.
     *
     * @throws IOException if the connection is closed before a complete
     *         status line is read (the connection is closed in that case),
     *         if the status line does not start with {@code "HTTP/1."}, or
     *         if it does not carry a three-digit status code at the
     *         expected position
     */
    @SuppressWarnings("unchecked")
    public void readHeaders() throws IOException {
        String statusline = readStatusLine();
        if (statusline == null) {
            if (Log.errors()) {
                Log.logError("Connection closed. Retry");
            }
            connection.close();
            // connection was closed
            throw new IOException("Connection closed");
        }
        if (!statusline.startsWith("HTTP/1.")) {
            throw new IOException("Invalid status line: " + statusline);
        }
        if (Log.trace()) {
            Log.logTrace("Statusline: {0}", statusline);
        }
        responseCode = parseStatusCode(statusline);

        headers = new ResponseHeaders(connection, buffer);
        if (Log.headers()) {
            logHeaders(headers);
        }
        response = new Response(
                request, exchange.getExchange(),
                headers, responseCode, HTTP_1_1);
    }

    /**
     * Extracts the three-digit status code from a status line.
     *
     * The status line comes from the peer, so a truncated or non-numeric
     * code is reported as an IOException rather than being allowed to
     * surface as an unchecked exception.
     *
     * @param statusline a status line already known to start with "HTTP/1."
     * @return the status code found at indices 9 to 11
     * @throws IOException if the line is shorter than 12 characters or any
     *         of the three characters is not an ASCII digit
     */
    private static int parseStatusCode(String statusline) throws IOException {
        if (statusline.length() < STATUS_CODE_END) {
            throw new IOException("Invalid status line: " + statusline);
        }
        int code = 0;
        for (int i = STATUS_CODE_START; i < STATUS_CODE_END; i++) {
            char digit = statusline.charAt(i);
            if (digit < '0' || digit > '9') {
                throw new IOException("Invalid status line: " + statusline);
            }
            code = code * 10 + (digit - '0');
        }
        return code;
    }

    /** Marks this response as finished. Synchronized on this instance. */
    synchronized void completed() {
        finished = true;
    }

    /**
     * Returns whether {@link #completed()} has been called. Synchronized on
     * this instance.
     */
    synchronized boolean finished() {
        return finished;
    }

    /**
     * Returns the current input buffer. This is null after a read reached
     * the end of the stream.
     */
    ByteBuffer getBuffer() {
        return buffer;
    }

    /**
     * Adjusts the content length reported by the headers to the value
     * handed to the body reader.
     *
     * @param clen the content length obtained from the headers; -1 is
     *        presumably "no Content-Length header" - verify against
     *        ResponseHeaders.getContentLength()
     * @return 0 for a HEAD request; for {@code clen == -1}, -1 when the
     *         Transfer-encoding header is "chunked" and 0 otherwise;
     *         {@code clen} unchanged in every other case
     */
    int fixupContentLen(int clen) {
        if (request.method().equalsIgnoreCase("HEAD")) {
            return 0;
        }
        if (clen == -1) {
            if (headers.firstValue("Transfer-encoding").orElse("")
                       .equalsIgnoreCase("chunked")) {
                return -1;
            }
            // NOTE(review): a response with neither a content length nor
            // chunked encoding is treated as having no body. RFC 7230
            // section 3.3.3 delimits such a body by connection close;
            // confirm whether ignoring it is intended.
            return 0;
        }
        return clen;
    }

    /**
     * Starts reading the response body on the given executor, publishing
     * the data to {@code p} through a new BlockingPushPublisher.
     *
     * @param p            the body processor that receives the body
     * @param return2Cache whether the connection is returned to the cache
     *                     once the body has been read
     * @param executor     runs the task that reads the body
     * @return the future obtained from {@code p.getBody()}
     */
    public CompletableFuture<T> readBody(
            HttpResponse.BodyProcessor<T> p,
            boolean return2Cache,
            Executor executor) {
        final BlockingPushPublisher<ByteBuffer> publisher = new BlockingPushPublisher<>();
        return readBody(p, return2Cache, publisher, executor);
    }

    /**
     * Reads the body through the supplied publisher.
     *
     * If the content length cannot be obtained from the headers, the
     * returned future is completed exceptionally and nothing is submitted
     * to the executor.
     */
    private CompletableFuture<T> readBody(
            HttpResponse.BodyProcessor<T> p,
            boolean return2Cache,
            AbstractPushPublisher<ByteBuffer> publisher,
            Executor executor) {
        this.return2Cache = return2Cache;
        final CompletableFuture<T> cf = p.getBody().toCompletableFuture();

        int clen0;
        try {
            clen0 = headers.getContentLength();
        } catch (IOException ex) {
            cf.completeExceptionally(ex);
            return cf;
        }
        final int clen = fixupContentLen(clen0);

        executor.execute(() -> {
            try {
                content = new ResponseContent(
                        connection, clen, headers, p,
                        publisher.asDataConsumer(),
                        (t -> {
                            publisher.acceptError(t);
                            connection.close();
                            cf.completeExceptionally(t);
                        }),
                        () -> onFinished()
                );
                publisher.subscribe(p);
                if (cf.isCompletedExceptionally()) {
                    // if an error occurs during subscription
                    connection.close();
                    return;
                }
                content.pushBody(buffer);
            } catch (Throwable t) {
                // NOTE(review): unlike the error callback above, this path
                // neither closes the connection nor notifies the publisher;
                // confirm that the connection cannot leak here.
                cf.completeExceptionally(t);
            }
        });
        return cf;
    }

    /**
     * Completion callback handed to ResponseContent: returns the connection
     * to the cache if that was requested in readBody().
     */
    private void onFinished() {
        if (return2Cache) {
            Log.logTrace("Returning connection to the pool: {0}", connection);
            connection.returnToCache(headers);
        }
    }

    /** Logs the given response headers through {@link Log#logHeaders}. */
    private void logHeaders(ResponseHeaders headers) {
        StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
        Log.dumpHeaders(sb, "    ", headers);
        Log.logHeaders(sb.toString());
    }

    /** Returns the response built by readHeaders(), or null before that. */
    Response response() {
        return response;
    }

    /** Returns the redirecting flag, which this class always sets to false. */
    boolean redirecting() {
        return redirecting;
    }

    /** Returns the headers read by readHeaders(), or null before that. */
    HttpHeaders responseHeaders() {
        return headers;
    }

    /** Returns the status code parsed by readHeaders(). */
    int responseCode() {
        return responseCode;
    }

    /**
     * Makes sure the current buffer has data, reading a new buffer from the
     * connection when the current one is exhausted.
     *
     * @return the number of bytes remaining in the current buffer, or -1 if
     *         connection.read() returned null
     */
    private int obtainBuffer() throws IOException {
        int n = buffer.remaining();

        if (n == 0) {
            buffer = connection.read();
            if (buffer == null) {
                return -1;
            }
            n = buffer.remaining();
        }
        return n;
    }

    /**
     * Reads one CRLF-terminated line from the input and returns it without
     * the terminator. On return the buffer is positioned just past the LF.
     *
     * Bytes are consumed with relative reads, so the buffer's array offset
     * is irrelevant and buffers without an accessible backing array work.
     * Each byte is mapped to the char with the same unsigned value.
     *
     * @return the line, or null if the input ended before a complete line
     *         was read
     * @throws IOException if a CR is followed by anything other than LF, or
     *         if reading from the connection fails
     */
    String readStatusLine() throws IOException {
        boolean cr = false;
        StringBuilder statusLine = new StringBuilder(128);
        while (obtainBuffer() != -1) {
            while (buffer.hasRemaining()) {
                // Mask to avoid sign extension of bytes >= 0x80.
                char c = (char) (buffer.get() & 0xFF);

                if (cr) {
                    if (c == LF) {
                        return statusLine.toString();
                    }
                    throw new IOException("invalid status line");
                }
                if (c == CR) {
                    cr = true;
                } else {
                    statusLine.append(c);
                }
            }
            // unlikely, but possible, that multiple reads required
        }
        return null;
    }
}