1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.nio.ByteBuffer; 30 import java.util.List; 31 import java.util.concurrent.CompletableFuture; 32 import java.util.concurrent.Executor; 33 import jdk.incubator.http.internal.common.Log; 34 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1; 35 36 /** 37 * Handles a HTTP/1.1 response in two blocking calls. readHeaders() and 38 * readBody(). There can be more than one of these per Http exchange. 39 */ 40 class Http1Response<T> { 41 42 private volatile ResponseContent content; 43 private final HttpRequestImpl request; 44 private Response response; 45 private final HttpConnection connection; 46 private ResponseHeaders headers; 47 private int responseCode; 48 private ByteBuffer buffer; 49 private final Http1Exchange<T> exchange; 50 private final boolean redirecting; // redirecting 51 private boolean return2Cache; // return connection to cache when finished 52 53 Http1Response(HttpConnection conn, Http1Exchange<T> exchange) { 54 this.request = exchange.request(); 55 this.exchange = exchange; 56 this.connection = conn; 57 this.redirecting = false; 58 buffer = exchange.getBuffer(); 59 } 60 61 @SuppressWarnings("unchecked") 62 public void readHeaders() throws IOException { 63 String statusline = readStatusLine(); 64 if (statusline == null) { 65 if (Log.errors()) { 66 Log.logError("Connection closed. Retry"); 67 } 68 connection.close(); 69 // connection was closed 70 throw new IOException("Connection closed"); 71 } 72 if (!statusline.startsWith("HTTP/1.")) { 73 throw new IOException("Invalid status line: " + statusline); 74 } 75 if (Log.trace()) { 76 Log.logTrace("Statusline: {0}", statusline); 77 } 78 char c = statusline.charAt(7); 79 responseCode = Integer.parseInt(statusline.substring(9, 12)); 80 81 headers = new ResponseHeaders(connection, buffer); 82 if (Log.headers()) { 83 logHeaders(headers); 84 } 85 response = new Response( 86 request, exchange.getExchange(), 87 headers, responseCode, HTTP_1_1); 88 } 89 90 private boolean finished; 91 92 synchronized void completed() { 93 finished = true; 94 } 95 96 synchronized boolean finished() { 97 return finished; 98 } 99 100 ByteBuffer getBuffer() { 101 return buffer; 102 } 103 104 int fixupContentLen(int clen) { 105 if (request.method().equalsIgnoreCase("HEAD")) { 106 return 0; 107 } 108 if (clen == -1) { 109 if (headers.firstValue("Transfer-encoding").orElse("") 110 .equalsIgnoreCase("chunked")) { 111 return -1; 112 } 113 return 0; 114 } 115 return clen; 116 } 117 118 public CompletableFuture<T> readBody( 119 HttpResponse.BodyProcessor<T> p, 120 boolean return2Cache, 121 Executor executor) { 122 final BlockingPushPublisher<List<ByteBuffer>> publisher = new BlockingPushPublisher<>(); 123 return readBody(p, return2Cache, publisher, executor); 124 } 125 126 private CompletableFuture<T> readBody( 127 HttpResponse.BodyProcessor<T> p, 128 boolean return2Cache, 129 AbstractPushPublisher<List<ByteBuffer>> publisher, 130 Executor executor) { 131 this.return2Cache = return2Cache; 132 final jdk.incubator.http.HttpResponse.BodyProcessor<T> pusher = p; 133 final CompletableFuture<T> cf = p.getBody().toCompletableFuture(); 134 135 int clen0; 136 try { 137 clen0 = headers.getContentLength(); 138 } catch (IOException ex) { 139 cf.completeExceptionally(ex); 140 return cf; 141 } 142 final int clen = fixupContentLen(clen0); 143 144 executor.execute(() -> { 145 try { 146 content = new ResponseContent( 147 connection, clen, headers, pusher, 148 publisher.asDataConsumer(), 149 (t -> { 150 publisher.acceptError(t); 151 connection.close(); 152 cf.completeExceptionally(t); 153 }), 154 () -> onFinished() 155 ); 156 publisher.subscribe(p); 157 if (cf.isCompletedExceptionally()) { 158 // if an error occurs during subscription 159 connection.close(); 160 return; 161 } 162 content.pushBody(buffer); 163 } catch (Throwable t) { 164 cf.completeExceptionally(t); 165 } 166 }); 167 return cf; 168 } 169 170 private void onFinished() { 171 if (return2Cache) { 172 Log.logTrace("Returning connection to the pool: {0}", connection); 173 connection.returnToCache(headers); 174 } 175 } 176 177 private void logHeaders(ResponseHeaders headers) { 178 StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n"); 179 Log.dumpHeaders(sb, " ", headers); 180 Log.logHeaders(sb.toString()); 181 } 182 183 Response response() { 184 return response; 185 } 186 187 boolean redirecting() { 188 return redirecting; 189 } 190 191 HttpHeaders responseHeaders() { 192 return headers; 193 } 194 195 int responseCode() { 196 return responseCode; 197 } 198 199 static final char CR = '\r'; 200 static final char LF = '\n'; 201 202 private int obtainBuffer() throws IOException { 203 int n = buffer.remaining(); 204 205 if (n == 0) { 206 buffer = connection.read(); 207 if (buffer == null) { 208 return -1; 209 } 210 n = buffer.remaining(); 211 } 212 return n; 213 } 214 215 String readStatusLine() throws IOException { 216 boolean cr = false; 217 StringBuilder statusLine = new StringBuilder(128); 218 while ((obtainBuffer()) != -1) { 219 byte[] buf = buffer.array(); 220 int offset = buffer.position(); 221 int len = buffer.limit() - offset; 222 223 for (int i = 0; i < len; i++) { 224 char c = (char) buf[i+offset]; 225 226 if (cr) { 227 if (c == LF) { 228 buffer.position(i + 1 + offset); 229 return statusLine.toString(); 230 } else { 231 throw new IOException("invalid status line"); 232 } 233 } 234 if (c == CR) { 235 cr = true; 236 } else { 237 statusLine.append(c); 238 } 239 } 240 // unlikely, but possible, that multiple reads required 241 buffer.position(buffer.limit()); 242 } 243 return null; 244 } 245 }