1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.net.URI;
  30 import java.nio.ByteBuffer;
  31 import java.util.List;
  32 import java.util.Map;
  33 import java.util.Set;
  34 import java.net.InetSocketAddress;
  35 import jdk.incubator.http.HttpConnection.Mode;
  36 import java.nio.charset.StandardCharsets;
  37 import static java.nio.charset.StandardCharsets.US_ASCII;
  38 import java.util.concurrent.CompletableFuture;
  39 import java.util.concurrent.CompletionException;
  40 import java.util.concurrent.Flow;
  41 
  42 import jdk.incubator.http.internal.common.HttpHeadersImpl;
  43 import jdk.incubator.http.internal.common.Log;
  44 import jdk.incubator.http.internal.common.MinimalFuture;
  45 import jdk.incubator.http.internal.common.Utils;
  46 
  47 /**
  48  *  A HTTP/1.1 request.
  49  *
  50  * send() -> Writes the request + body to the given channel, in one blocking
  51  * operation.
  52  */
  53 class Http1Request {
  54     final HttpClientImpl client;
  55     final HttpRequestImpl request;
  56     final HttpConnection chan;
  57     // Multiple buffers are used to hold different parts of request
  58     // See line 206 and below for description
  59     final ByteBuffer[] buffers;
  60     final HttpRequest.BodyProcessor requestProc;
  61     final HttpHeaders userHeaders;
  62     final HttpHeadersImpl systemHeaders;
  63     boolean streaming;
  64     long contentLength;
  65     final CompletableFuture<Void> cf;
  66 
  67     Http1Request(HttpRequestImpl request, HttpClientImpl client, HttpConnection connection)
  68         throws IOException
  69     {
  70         this.client = client;
  71         this.request = request;
  72         this.chan = connection;
  73         buffers = new ByteBuffer[5]; // TODO: check
  74         this.requestProc = request.requestProcessor;
  75         this.userHeaders = request.getUserHeaders();
  76         this.systemHeaders = request.getSystemHeaders();
  77         this.cf = new MinimalFuture<>();
  78     }
  79 
  80     private void logHeaders() throws IOException {
  81         StringBuilder sb = new StringBuilder(256);
  82         sb.append("REQUEST HEADERS:\n");
  83         Log.dumpHeaders(sb, "    ", systemHeaders);
  84         Log.dumpHeaders(sb, "    ", userHeaders);
  85         Log.logHeaders(sb.toString());
  86     }
  87 
  88     private void dummy(long x) {
  89         // not used in this class
  90     }
  91 
  92     private void collectHeaders0() throws IOException {
  93         if (Log.headers()) {
  94             logHeaders();
  95         }
  96         StringBuilder sb = new StringBuilder(256);
  97         collectHeaders1(sb, request, systemHeaders);
  98         collectHeaders1(sb, request, userHeaders);
  99         sb.append("\r\n");
 100         String headers = sb.toString();
 101         buffers[1] = ByteBuffer.wrap(headers.getBytes(StandardCharsets.US_ASCII));
 102     }
 103 
 104     private void collectHeaders1(StringBuilder sb,
 105                                  HttpRequestImpl request,
 106                                  HttpHeaders headers)
 107         throws IOException
 108     {
 109         Map<String,List<String>> h = headers.map();
 110         Set<Map.Entry<String,List<String>>> entries = h.entrySet();
 111 
 112         for (Map.Entry<String,List<String>> entry : entries) {
 113             String key = entry.getKey();
 114             List<String> values = entry.getValue();
 115             for (String value : values) {
 116                 sb.append(key)
 117                   .append(": ")
 118                   .append(value)
 119                   .append("\r\n");
 120             }
 121         }
 122     }
 123 
 124     private String getPathAndQuery(URI uri) {
 125         String path = uri.getPath();
 126         String query = uri.getQuery();
 127         if (path == null || path.equals("")) {
 128             path = "/";
 129         }
 130         if (query == null) {
 131             query = "";
 132         }
 133         if (query.equals("")) {
 134             return path;
 135         } else {
 136             return path + "?" + query;
 137         }
 138     }
 139 
 140     private String authorityString(InetSocketAddress addr) {
 141         return addr.getHostString() + ":" + addr.getPort();
 142     }
 143 
 144     private String hostString() {
 145         URI uri = request.uri();
 146         int port = uri.getPort();
 147         String host = uri.getHost();
 148 
 149         boolean defaultPort;
 150         if (port == -1) {
 151             defaultPort = true;
 152         } else if (request.secure()) {
 153             defaultPort = port == 443;
 154         } else {
 155             defaultPort = port == 80;
 156         }
 157 
 158         if (defaultPort) {
 159             return host;
 160         } else {
 161             return host + ":" + Integer.toString(port);
 162         }
 163     }
 164 
 165     private String requestURI() {
 166         URI uri = request.uri();
 167         String method = request.method();
 168 
 169         if ((request.proxy(client) == null && !method.equals("CONNECT"))
 170                 || request.isWebSocket()) {
 171             return getPathAndQuery(uri);
 172         }
 173         if (request.secure()) {
 174             if (request.method().equals("CONNECT")) {
 175                 // use authority for connect itself
 176                 return authorityString(request.authority());
 177             } else {
 178                 // requests over tunnel do not require full URL
 179                 return getPathAndQuery(uri);
 180             }
 181         }
 182         return uri == null? authorityString(request.authority()) : uri.toString();
 183     }
 184 
 185     void sendHeadersOnly() throws IOException {
 186         collectHeaders();
 187         chan.write(buffers, 0, 2);
 188     }
 189 
 190     void sendRequest() throws IOException {
 191         collectHeaders();
 192         chan.configureMode(Mode.BLOCKING);
 193         if (contentLength == 0) {
 194             chan.write(buffers, 0, 2);
 195         } else if (contentLength > 0) {
 196             writeFixedContent(true);
 197         } else {
 198             writeStreamedContent(true);
 199         }
 200         setFinished();
 201     }
 202 
 203     private boolean finished;
 204 
 205     synchronized boolean finished() {
 206         return  finished;
 207     }
 208 
 209     synchronized void setFinished() {
 210         finished = true;
 211     }
 212 
 213     private void collectHeaders() throws IOException {
 214         if (Log.requests() && request != null) {
 215             Log.logRequest(request.toString());
 216         }
 217         String uriString = requestURI();
 218         StringBuilder sb = new StringBuilder(64);
 219         sb.append(request.method())
 220           .append(' ')
 221           .append(uriString)
 222           .append(" HTTP/1.1\r\n");
 223         String cmd = sb.toString();
 224 
 225         buffers[0] = ByteBuffer.wrap(cmd.getBytes(StandardCharsets.US_ASCII));
 226         URI uri = request.uri();
 227         if (uri != null) {
 228             systemHeaders.setHeader("Host", hostString());
 229         }
 230         if (request == null) {
 231             // this is not a user request. No content
 232             contentLength = 0;
 233         } else {
 234             contentLength = requestProc.contentLength();
 235         }
 236 
 237         if (contentLength == 0) {
 238             systemHeaders.setHeader("Content-Length", "0");
 239             collectHeaders0();
 240         } else if (contentLength > 0) {
 241             /* [0] request line [1] headers [2] body  */
 242             systemHeaders.setHeader("Content-Length",
 243                                     Integer.toString((int) contentLength));
 244             streaming = false;
 245             collectHeaders0();
 246             buffers[2] = getBuffer();
 247         } else {
 248             /* Chunked:
 249              *
 250              * [0] request line [1] headers [2] chunk header [3] chunk data [4]
 251              * final chunk header and trailing CRLF of previous chunks
 252              *
 253              * 2,3,4 used repeatedly */
 254             streaming = true;
 255             systemHeaders.setHeader("Transfer-encoding", "chunked");
 256             collectHeaders0();
 257             buffers[3] = getBuffer();
 258         }
 259     }
 260 
 261     private ByteBuffer getBuffer() {
 262         return ByteBuffer.allocate(Utils.BUFSIZE);
 263     }
 264 
 265     // The following two methods used by Http1Exchange to handle expect continue
 266 
 267     void continueRequest() throws IOException {
 268         if (streaming) {
 269             writeStreamedContent(false);
 270         } else {
 271             writeFixedContent(false);
 272         }
 273         setFinished();
 274     }
 275 
 276     class StreamSubscriber implements Flow.Subscriber<ByteBuffer> {
 277         volatile Flow.Subscription subscription;
 278         volatile boolean includeHeaders;
 279 
 280         StreamSubscriber(boolean includeHeaders) {
 281             this.includeHeaders = includeHeaders;
 282         }
 283 
 284         @Override
 285         public void onSubscribe(Flow.Subscription subscription) {
 286             if (this.subscription != null) {
 287                 throw new IllegalStateException("already subscribed");
 288             }
 289             this.subscription = subscription;
 290             subscription.request(1);
 291         }
 292 
 293         @Override
 294         public void onNext(ByteBuffer item) {
 295             int startbuf, nbufs;
 296 
 297             if (cf.isDone()) {
 298                 throw new IllegalStateException("subscription already completed");
 299             }
 300 
 301             if (includeHeaders) {
 302                 startbuf = 0;
 303                 nbufs = 5;
 304             } else {
 305                 startbuf = 2;
 306                 nbufs = 3;
 307             }
 308             int chunklen = item.remaining();
 309             buffers[3] = item;
 310             buffers[2] = getHeader(chunklen);
 311             buffers[4] = CRLF_BUFFER();
 312             try {
 313                 chan.write(buffers, startbuf, nbufs);
 314             } catch (IOException e) {
 315                 subscription.cancel();
 316                 cf.completeExceptionally(e);
 317             }
 318             includeHeaders = false;
 319             subscription.request(1);
 320         }
 321 
 322         @Override
 323         public void onError(Throwable throwable) {
 324             if (cf.isDone()) {
 325                 return;
 326             }
 327             subscription.cancel();
 328             cf.completeExceptionally(throwable);
 329         }
 330 
 331         @Override
 332         public void onComplete() {
 333             if (cf.isDone()) {
 334                 throw new IllegalStateException("subscription already completed");
 335             }
 336             buffers[3] = EMPTY_CHUNK_HEADER();
 337             buffers[4] = CRLF_BUFFER();
 338             try {
 339                 chan.write(buffers, 3, 2);
 340             } catch (IOException ex) {
 341                 cf.completeExceptionally(ex);
 342                 return;
 343             }
 344             cf.complete(null);
 345         }
 346     }
 347 
 348     private void waitForCompletion() throws IOException {
 349         try {
 350             cf.join();
 351         } catch (CompletionException e) {
 352             throw Utils.getIOException(e);
 353         }
 354     }
 355 
 356     /* Entire request is sent, or just body only  */
 357     private void writeStreamedContent(boolean includeHeaders)
 358         throws IOException
 359     {
 360         StreamSubscriber subscriber = new StreamSubscriber(includeHeaders);
 361         requestProc.subscribe(subscriber);
 362         waitForCompletion();
 363     }
 364 
 365     class FixedContentSubscriber implements Flow.Subscriber<ByteBuffer>
 366     {
 367         volatile Flow.Subscription subscription;
 368         volatile boolean includeHeaders;
 369         volatile long contentWritten = 0;
 370 
 371         FixedContentSubscriber(boolean includeHeaders) {
 372             this.includeHeaders = includeHeaders;
 373         }
 374 
 375         @Override
 376         public void onSubscribe(Flow.Subscription subscription) {
 377             if (this.subscription != null) {
 378                 throw new IllegalStateException("already subscribed");
 379             }
 380             this.subscription = subscription;
 381             subscription.request(1);
 382         }
 383 
 384         @Override
 385         public void onNext(ByteBuffer item) {
 386             int startbuf, nbufs;
 387             long headersLength;
 388 
 389             if (includeHeaders) {
 390                 startbuf = 0;
 391                 nbufs = 3;
 392                 headersLength = buffers[0].remaining() + buffers[1].remaining();
 393             } else {
 394                 startbuf = 2;
 395                 nbufs = 1;
 396                 headersLength = 0;
 397             }
 398             buffers[2] = item;
 399             try {
 400                 long writing = buffers[2].remaining() + headersLength;
 401                 contentWritten += buffers[2].remaining();
 402                 chan.checkWrite(writing, buffers, startbuf, nbufs);
 403 
 404                 if (contentWritten > contentLength) {
 405                     String msg = "Too many bytes in request body. Expected: " +
 406                         Long.toString(contentLength) + " Sent: " +
 407                         Long.toString(contentWritten);
 408                     throw new IOException(msg);
 409                 }
 410                 subscription.request(1);
 411             } catch (IOException e) {
 412                 subscription.cancel();
 413                 cf.completeExceptionally(e);
 414             }
 415         }
 416 
 417         @Override
 418         public void onError(Throwable throwable) {
 419             if (cf.isDone()) {
 420                 return;
 421             }
 422             subscription.cancel();
 423             cf.completeExceptionally(throwable);
 424         }
 425 
 426         @Override
 427         public void onComplete() {
 428             if (cf.isDone()) {
 429                 throw new IllegalStateException("subscription already completed");
 430             }
 431 
 432             if (contentLength > contentWritten) {
 433                 subscription.cancel();
 434                 Exception e = new IOException("Too few bytes returned by the processor");
 435                 cf.completeExceptionally(e);
 436             } else {
 437                 cf.complete(null);
 438             }
 439         }
 440     }
 441 
 442     /* Entire request is sent, or just body only */
 443     private void writeFixedContent(boolean includeHeaders)
 444             throws IOException {
 445         if (contentLength == 0) {
 446             return;
 447         }
 448         FixedContentSubscriber subscriber = new FixedContentSubscriber(includeHeaders);
 449         requestProc.subscribe(subscriber);
 450         waitForCompletion();
 451     }
 452 
 453     private static final byte[] CRLF = {'\r', '\n'};
 454     private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'};
 455 
 456     private ByteBuffer CRLF_BUFFER() {
 457         return ByteBuffer.wrap(CRLF);
 458     }
 459 
 460     private ByteBuffer EMPTY_CHUNK_HEADER() {
 461         return ByteBuffer.wrap(EMPTY_CHUNK_BYTES);
 462     }
 463 
 464     /* Returns a header for a particular chunk size */
 465     private static ByteBuffer getHeader(int size){
 466         String hexStr =  Integer.toHexString(size);
 467         byte[] hexBytes = hexStr.getBytes(US_ASCII);
 468         byte[] header = new byte[hexStr.length()+2];
 469         System.arraycopy(hexBytes, 0, header, 0, hexBytes.length);
 470         header[hexBytes.length] = CRLF[0];
 471         header[hexBytes.length+1] = CRLF[1];
 472         return ByteBuffer.wrap(header);
 473     }
 474 }