1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.net.URI; 30 import java.nio.ByteBuffer; 31 import java.util.List; 32 import java.util.Map; 33 import java.util.Set; 34 import java.net.InetSocketAddress; 35 import jdk.incubator.http.HttpConnection.Mode; 36 import java.nio.charset.StandardCharsets; 37 import static java.nio.charset.StandardCharsets.US_ASCII; 38 import java.util.concurrent.CompletableFuture; 39 import java.util.concurrent.CompletionException; 40 import java.util.concurrent.Flow; 41 42 import jdk.incubator.http.internal.common.HttpHeadersImpl; 43 import jdk.incubator.http.internal.common.Log; 44 import jdk.incubator.http.internal.common.MinimalFuture; 45 import jdk.incubator.http.internal.common.Utils; 46 47 /** 48 * A HTTP/1.1 request. 49 * 50 * send() -> Writes the request + body to the given channel, in one blocking 51 * operation. 52 */ 53 class Http1Request { 54 final HttpClientImpl client; 55 final HttpRequestImpl request; 56 final HttpConnection chan; 57 // Multiple buffers are used to hold different parts of request 58 // See line 206 and below for description 59 final ByteBuffer[] buffers; 60 final HttpRequest.BodyProcessor requestProc; 61 final HttpHeaders userHeaders; 62 final HttpHeadersImpl systemHeaders; 63 boolean streaming; 64 long contentLength; 65 final CompletableFuture<Void> cf; 66 67 Http1Request(HttpRequestImpl request, HttpClientImpl client, HttpConnection connection) 68 throws IOException 69 { 70 this.client = client; 71 this.request = request; 72 this.chan = connection; 73 buffers = new ByteBuffer[5]; // TODO: check 74 this.requestProc = request.requestProcessor; 75 this.userHeaders = request.getUserHeaders(); 76 this.systemHeaders = request.getSystemHeaders(); 77 this.cf = new MinimalFuture<>(); 78 } 79 80 private void logHeaders() throws IOException { 81 StringBuilder sb = new StringBuilder(256); 82 sb.append("REQUEST HEADERS:\n"); 83 Log.dumpHeaders(sb, " ", systemHeaders); 84 Log.dumpHeaders(sb, " ", userHeaders); 85 Log.logHeaders(sb.toString()); 86 } 87 88 private void dummy(long x) { 89 // not used in this class 90 } 91 92 private void collectHeaders0() throws IOException { 93 if (Log.headers()) { 94 logHeaders(); 95 } 96 StringBuilder sb = new StringBuilder(256); 97 collectHeaders1(sb, request, systemHeaders); 98 collectHeaders1(sb, request, userHeaders); 99 sb.append("\r\n"); 100 String headers = sb.toString(); 101 buffers[1] = ByteBuffer.wrap(headers.getBytes(StandardCharsets.US_ASCII)); 102 } 103 104 private void collectHeaders1(StringBuilder sb, 105 HttpRequestImpl request, 106 HttpHeaders headers) 107 throws IOException 108 { 109 Map<String,List<String>> h = headers.map(); 110 Set<Map.Entry<String,List<String>>> entries = h.entrySet(); 111 112 for (Map.Entry<String,List<String>> entry : entries) { 113 String key = entry.getKey(); 114 List<String> values = entry.getValue(); 115 for (String value : values) { 116 sb.append(key) 117 .append(": ") 118 .append(value) 119 .append("\r\n"); 120 } 121 } 122 } 123 124 private String getPathAndQuery(URI uri) { 125 String path = uri.getPath(); 126 String query = uri.getQuery(); 127 if (path == null || path.equals("")) { 128 path = "/"; 129 } 130 if (query == null) { 131 query = ""; 132 } 133 if (query.equals("")) { 134 return path; 135 } else { 136 return path + "?" + query; 137 } 138 } 139 140 private String authorityString(InetSocketAddress addr) { 141 return addr.getHostString() + ":" + addr.getPort(); 142 } 143 144 private String hostString() { 145 URI uri = request.uri(); 146 int port = uri.getPort(); 147 String host = uri.getHost(); 148 149 boolean defaultPort; 150 if (port == -1) { 151 defaultPort = true; 152 } else if (request.secure()) { 153 defaultPort = port == 443; 154 } else { 155 defaultPort = port == 80; 156 } 157 158 if (defaultPort) { 159 return host; 160 } else { 161 return host + ":" + Integer.toString(port); 162 } 163 } 164 165 private String requestURI() { 166 URI uri = request.uri(); 167 String method = request.method(); 168 169 if ((request.proxy(client) == null && !method.equals("CONNECT")) 170 || request.isWebSocket()) { 171 return getPathAndQuery(uri); 172 } 173 if (request.secure()) { 174 if (request.method().equals("CONNECT")) { 175 // use authority for connect itself 176 return authorityString(request.authority()); 177 } else { 178 // requests over tunnel do not require full URL 179 return getPathAndQuery(uri); 180 } 181 } 182 return uri == null? authorityString(request.authority()) : uri.toString(); 183 } 184 185 void sendHeadersOnly() throws IOException { 186 collectHeaders(); 187 chan.write(buffers, 0, 2); 188 } 189 190 void sendRequest() throws IOException { 191 collectHeaders(); 192 chan.configureMode(Mode.BLOCKING); 193 if (contentLength == 0) { 194 chan.write(buffers, 0, 2); 195 } else if (contentLength > 0) { 196 writeFixedContent(true); 197 } else { 198 writeStreamedContent(true); 199 } 200 setFinished(); 201 } 202 203 private boolean finished; 204 205 synchronized boolean finished() { 206 return finished; 207 } 208 209 synchronized void setFinished() { 210 finished = true; 211 } 212 213 private void collectHeaders() throws IOException { 214 if (Log.requests() && request != null) { 215 Log.logRequest(request.toString()); 216 } 217 String uriString = requestURI(); 218 StringBuilder sb = new StringBuilder(64); 219 sb.append(request.method()) 220 .append(' ') 221 .append(uriString) 222 .append(" HTTP/1.1\r\n"); 223 String cmd = sb.toString(); 224 225 buffers[0] = ByteBuffer.wrap(cmd.getBytes(StandardCharsets.US_ASCII)); 226 URI uri = request.uri(); 227 if (uri != null) { 228 systemHeaders.setHeader("Host", hostString()); 229 } 230 if (request == null) { 231 // this is not a user request. No content 232 contentLength = 0; 233 } else { 234 contentLength = requestProc.contentLength(); 235 } 236 237 if (contentLength == 0) { 238 systemHeaders.setHeader("Content-Length", "0"); 239 collectHeaders0(); 240 } else if (contentLength > 0) { 241 /* [0] request line [1] headers [2] body */ 242 systemHeaders.setHeader("Content-Length", 243 Integer.toString((int) contentLength)); 244 streaming = false; 245 collectHeaders0(); 246 buffers[2] = getBuffer(); 247 } else { 248 /* Chunked: 249 * 250 * [0] request line [1] headers [2] chunk header [3] chunk data [4] 251 * final chunk header and trailing CRLF of previous chunks 252 * 253 * 2,3,4 used repeatedly */ 254 streaming = true; 255 systemHeaders.setHeader("Transfer-encoding", "chunked"); 256 collectHeaders0(); 257 buffers[3] = getBuffer(); 258 } 259 } 260 261 private ByteBuffer getBuffer() { 262 return ByteBuffer.allocate(Utils.BUFSIZE); 263 } 264 265 // The following two methods used by Http1Exchange to handle expect continue 266 267 void continueRequest() throws IOException { 268 if (streaming) { 269 writeStreamedContent(false); 270 } else { 271 writeFixedContent(false); 272 } 273 setFinished(); 274 } 275 276 class StreamSubscriber implements Flow.Subscriber<ByteBuffer> { 277 volatile Flow.Subscription subscription; 278 volatile boolean includeHeaders; 279 280 StreamSubscriber(boolean includeHeaders) { 281 this.includeHeaders = includeHeaders; 282 } 283 284 @Override 285 public void onSubscribe(Flow.Subscription subscription) { 286 if (this.subscription != null) { 287 throw new IllegalStateException("already subscribed"); 288 } 289 this.subscription = subscription; 290 subscription.request(1); 291 } 292 293 @Override 294 public void onNext(ByteBuffer item) { 295 int startbuf, nbufs; 296 297 if (cf.isDone()) { 298 throw new IllegalStateException("subscription already completed"); 299 } 300 301 if (includeHeaders) { 302 startbuf = 0; 303 nbufs = 5; 304 } else { 305 startbuf = 2; 306 nbufs = 3; 307 } 308 int chunklen = item.remaining(); 309 buffers[3] = item; 310 buffers[2] = getHeader(chunklen); 311 buffers[4] = CRLF_BUFFER(); 312 try { 313 chan.write(buffers, startbuf, nbufs); 314 } catch (IOException e) { 315 subscription.cancel(); 316 cf.completeExceptionally(e); 317 } 318 includeHeaders = false; 319 subscription.request(1); 320 } 321 322 @Override 323 public void onError(Throwable throwable) { 324 if (cf.isDone()) { 325 return; 326 } 327 subscription.cancel(); 328 cf.completeExceptionally(throwable); 329 } 330 331 @Override 332 public void onComplete() { 333 if (cf.isDone()) { 334 throw new IllegalStateException("subscription already completed"); 335 } 336 buffers[3] = EMPTY_CHUNK_HEADER(); 337 buffers[4] = CRLF_BUFFER(); 338 try { 339 chan.write(buffers, 3, 2); 340 } catch (IOException ex) { 341 cf.completeExceptionally(ex); 342 return; 343 } 344 cf.complete(null); 345 } 346 } 347 348 private void waitForCompletion() throws IOException { 349 try { 350 cf.join(); 351 } catch (CompletionException e) { 352 throw Utils.getIOException(e); 353 } 354 } 355 356 /* Entire request is sent, or just body only */ 357 private void writeStreamedContent(boolean includeHeaders) 358 throws IOException 359 { 360 StreamSubscriber subscriber = new StreamSubscriber(includeHeaders); 361 requestProc.subscribe(subscriber); 362 waitForCompletion(); 363 } 364 365 class FixedContentSubscriber implements Flow.Subscriber<ByteBuffer> 366 { 367 volatile Flow.Subscription subscription; 368 volatile boolean includeHeaders; 369 volatile long contentWritten = 0; 370 371 FixedContentSubscriber(boolean includeHeaders) { 372 this.includeHeaders = includeHeaders; 373 } 374 375 @Override 376 public void onSubscribe(Flow.Subscription subscription) { 377 if (this.subscription != null) { 378 throw new IllegalStateException("already subscribed"); 379 } 380 this.subscription = subscription; 381 subscription.request(1); 382 } 383 384 @Override 385 public void onNext(ByteBuffer item) { 386 int startbuf, nbufs; 387 long headersLength; 388 389 if (includeHeaders) { 390 startbuf = 0; 391 nbufs = 3; 392 headersLength = buffers[0].remaining() + buffers[1].remaining(); 393 } else { 394 startbuf = 2; 395 nbufs = 1; 396 headersLength = 0; 397 } 398 buffers[2] = item; 399 try { 400 long writing = buffers[2].remaining() + headersLength; 401 contentWritten += buffers[2].remaining(); 402 chan.checkWrite(writing, buffers, startbuf, nbufs); 403 404 if (contentWritten > contentLength) { 405 String msg = "Too many bytes in request body. Expected: " + 406 Long.toString(contentLength) + " Sent: " + 407 Long.toString(contentWritten); 408 throw new IOException(msg); 409 } 410 subscription.request(1); 411 } catch (IOException e) { 412 subscription.cancel(); 413 cf.completeExceptionally(e); 414 } 415 } 416 417 @Override 418 public void onError(Throwable throwable) { 419 if (cf.isDone()) { 420 return; 421 } 422 subscription.cancel(); 423 cf.completeExceptionally(throwable); 424 } 425 426 @Override 427 public void onComplete() { 428 if (cf.isDone()) { 429 throw new IllegalStateException("subscription already completed"); 430 } 431 432 if (contentLength > contentWritten) { 433 subscription.cancel(); 434 Exception e = new IOException("Too few bytes returned by the processor"); 435 cf.completeExceptionally(e); 436 } else { 437 cf.complete(null); 438 } 439 } 440 } 441 442 /* Entire request is sent, or just body only */ 443 private void writeFixedContent(boolean includeHeaders) 444 throws IOException { 445 if (contentLength == 0) { 446 return; 447 } 448 FixedContentSubscriber subscriber = new FixedContentSubscriber(includeHeaders); 449 requestProc.subscribe(subscriber); 450 waitForCompletion(); 451 } 452 453 private static final byte[] CRLF = {'\r', '\n'}; 454 private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'}; 455 456 private ByteBuffer CRLF_BUFFER() { 457 return ByteBuffer.wrap(CRLF); 458 } 459 460 private ByteBuffer EMPTY_CHUNK_HEADER() { 461 return ByteBuffer.wrap(EMPTY_CHUNK_BYTES); 462 } 463 464 /* Returns a header for a particular chunk size */ 465 private static ByteBuffer getHeader(int size){ 466 String hexStr = Integer.toHexString(size); 467 byte[] hexBytes = hexStr.getBytes(US_ASCII); 468 byte[] header = new byte[hexStr.length()+2]; 469 System.arraycopy(hexBytes, 0, header, 0, hexBytes.length); 470 header[hexBytes.length] = CRLF[0]; 471 header[hexBytes.length+1] = CRLF[1]; 472 return ByteBuffer.wrap(header); 473 } 474 }