1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.net.URI;
  31 import java.nio.ByteBuffer;
  32 import java.util.ArrayList;
  33 import java.util.List;
  34 import java.util.Map;
  35 import java.net.InetSocketAddress;
  36 import java.util.Objects;
  37 import java.util.concurrent.Flow;
  38 
  39 import jdk.incubator.http.Http1Exchange.Http1BodySubscriber;
  40 import jdk.incubator.http.internal.common.HttpHeadersImpl;
  41 import jdk.incubator.http.internal.common.Log;
  42 import jdk.incubator.http.internal.common.Utils;
  43 
  44 import static java.nio.charset.StandardCharsets.US_ASCII;
  45 
  46 /**
  47  *  An HTTP/1.1 request.
  48  */
  49 class Http1Request {
  50     private final HttpRequestImpl request;
  51     private final Http1Exchange<?> http1Exchange;
  52     private final HttpConnection connection;
  53     private final HttpRequest.BodyPublisher requestPublisher;
  54     private final HttpHeaders userHeaders;
  55     private final HttpHeadersImpl systemHeaders;
  56     private volatile boolean streaming;
  57     private volatile long contentLength;
  58 
  59     Http1Request(HttpRequestImpl request,
  60                  Http1Exchange<?> http1Exchange)
  61         throws IOException
  62     {
  63         this.request = request;
  64         this.http1Exchange = http1Exchange;
  65         this.connection = http1Exchange.connection();
  66         this.requestPublisher = request.requestPublisher;  // may be null
  67         this.userHeaders = request.getUserHeaders();
  68         this.systemHeaders = request.getSystemHeaders();
  69     }
  70 
  71     private void logHeaders(String completeHeaders) {
  72         if (Log.headers()) {
  73             //StringBuilder sb = new StringBuilder(256);
  74             //sb.append("REQUEST HEADERS:\n");
  75             //Log.dumpHeaders(sb, "    ", systemHeaders);
  76             //Log.dumpHeaders(sb, "    ", userHeaders);
  77             //Log.logHeaders(sb.toString());
  78 
  79             String s = completeHeaders.replaceAll("\r\n", "\n");
  80             Log.logHeaders("REQUEST HEADERS:\n" + s);
  81         }
  82     }
  83 
  84     private void collectHeaders0(StringBuilder sb) {
  85         collectHeaders1(sb, systemHeaders);
  86         collectHeaders1(sb, userHeaders);
  87         sb.append("\r\n");
  88     }
  89 
  90     private void collectHeaders1(StringBuilder sb, HttpHeaders headers) {
  91         for (Map.Entry<String,List<String>> entry : headers.map().entrySet()) {
  92             String key = entry.getKey();
  93             List<String> values = entry.getValue();
  94             for (String value : values) {
  95                 sb.append(key).append(": ").append(value).append("\r\n");
  96             }
  97         }
  98     }
  99 
 100     private String getPathAndQuery(URI uri) {
 101         String path = uri.getPath();
 102         String query = uri.getQuery();
 103         if (path == null || path.equals("")) {
 104             path = "/";
 105         }
 106         if (query == null) {
 107             query = "";
 108         }
 109         if (query.equals("")) {
 110             return path;
 111         } else {
 112             return path + "?" + query;
 113         }
 114     }
 115 
 116     private String authorityString(InetSocketAddress addr) {
 117         return addr.getHostString() + ":" + addr.getPort();
 118     }
 119 
 120     private String hostString() {
 121         URI uri = request.uri();
 122         int port = uri.getPort();
 123         String host = uri.getHost();
 124 
 125         boolean defaultPort;
 126         if (port == -1) {
 127             defaultPort = true;
 128         } else if (request.secure()) {
 129             defaultPort = port == 443;
 130         } else {
 131             defaultPort = port == 80;
 132         }
 133 
 134         if (defaultPort) {
 135             return host;
 136         } else {
 137             return host + ":" + Integer.toString(port);
 138         }
 139     }
 140 
 141     private String requestURI() {
 142         URI uri = request.uri();
 143         String method = request.method();
 144 
 145         if ((request.proxy() == null && !method.equals("CONNECT"))
 146                 || request.isWebSocket()) {
 147             return getPathAndQuery(uri);
 148         }
 149         if (request.secure()) {
 150             if (request.method().equals("CONNECT")) {
 151                 // use authority for connect itself
 152                 return authorityString(request.authority());
 153             } else {
 154                 // requests over tunnel do not require full URL
 155                 return getPathAndQuery(uri);
 156             }
 157         }
 158         if (request.method().equals("CONNECT")) {
 159             // use authority for connect itself
 160             return authorityString(request.authority());
 161         }
 162 
 163         return uri == null? authorityString(request.authority()) : uri.toString();
 164     }
 165 
 166     private boolean finished;
 167 
 168     synchronized boolean finished() {
 169         return  finished;
 170     }
 171 
 172     synchronized void setFinished() {
 173         finished = true;
 174     }
 175 
 176     List<ByteBuffer> headers() {
 177         if (Log.requests() && request != null) {
 178             Log.logRequest(request.toString());
 179         }
 180         String uriString = requestURI();
 181         StringBuilder sb = new StringBuilder(64);
 182         sb.append(request.method())
 183           .append(' ')
 184           .append(uriString)
 185           .append(" HTTP/1.1\r\n");
 186 
 187         URI uri = request.uri();
 188         if (uri != null) {
 189             systemHeaders.setHeader("Host", hostString());
 190         }
 191         if (request == null || requestPublisher == null) {
 192             // Not a user request, or maybe a method, e.g. GET, with no body.
 193             contentLength = 0;
 194         } else {
 195             contentLength = requestPublisher.contentLength();
 196         }
 197 
 198         if (contentLength == 0) {
 199             systemHeaders.setHeader("Content-Length", "0");
 200         } else if (contentLength > 0) {
 201             systemHeaders.setHeader("Content-Length", Long.toString(contentLength));
 202             streaming = false;
 203         } else {
 204             streaming = true;
 205             systemHeaders.setHeader("Transfer-encoding", "chunked");
 206         }
 207         collectHeaders0(sb);
 208         String hs = sb.toString();
 209         logHeaders(hs);
 210         ByteBuffer b = ByteBuffer.wrap(hs.getBytes(US_ASCII));
 211         return List.of(b);
 212     }
 213 
 214     Http1BodySubscriber continueRequest()  {
 215         Http1BodySubscriber subscriber;
 216         if (streaming) {
 217             subscriber = new StreamSubscriber();
 218             requestPublisher.subscribe(subscriber);
 219         } else {
 220             if (contentLength == 0)
 221                 return null;
 222 
 223             subscriber = new FixedContentSubscriber();
 224             requestPublisher.subscribe(subscriber);
 225         }
 226         return subscriber;
 227     }
 228 
 229     class StreamSubscriber extends Http1BodySubscriber {
 230 
 231         @Override
 232         public void onSubscribe(Flow.Subscription subscription) {
 233             if (this.subscription != null) {
 234                 Throwable t = new IllegalStateException("already subscribed");
 235                 http1Exchange.appendToOutgoing(t);
 236             } else {
 237                 this.subscription = subscription;
 238             }
 239         }
 240 
 241         @Override
 242         public void onNext(ByteBuffer item) {
 243             Objects.requireNonNull(item);
 244             if (complete) {
 245                 Throwable t = new IllegalStateException("subscription already completed");
 246                 http1Exchange.appendToOutgoing(t);
 247             } else {
 248                 int chunklen = item.remaining();
 249                 ArrayList<ByteBuffer> l = new ArrayList<>(3);
 250                 l.add(getHeader(chunklen));
 251                 l.add(item);
 252                 l.add(ByteBuffer.wrap(CRLF));
 253                 http1Exchange.appendToOutgoing(l);
 254             }
 255         }
 256 
 257         @Override
 258         public void onError(Throwable throwable) {
 259             if (complete)
 260                 return;
 261 
 262             subscription.cancel();
 263             http1Exchange.appendToOutgoing(throwable);
 264         }
 265 
 266         @Override
 267         public void onComplete() {
 268             if (complete) {
 269                 Throwable t = new IllegalStateException("subscription already completed");
 270                 http1Exchange.appendToOutgoing(t);
 271             } else {
 272                 ArrayList<ByteBuffer> l = new ArrayList<>(2);
 273                 l.add(ByteBuffer.wrap(EMPTY_CHUNK_BYTES));
 274                 l.add(ByteBuffer.wrap(CRLF));
 275                 complete = true;
 276                 //setFinished();
 277                 http1Exchange.appendToOutgoing(l);
 278                 http1Exchange.appendToOutgoing(COMPLETED);
 279                 setFinished();  // TODO: before or after,? does it matter?
 280 
 281             }
 282         }
 283     }
 284 
 285     class FixedContentSubscriber extends Http1BodySubscriber {
 286 
 287         private volatile long contentWritten;
 288 
 289         @Override
 290         public void onSubscribe(Flow.Subscription subscription) {
 291             if (this.subscription != null) {
 292                 Throwable t = new IllegalStateException("already subscribed");
 293                 http1Exchange.appendToOutgoing(t);
 294             } else {
 295                 this.subscription = subscription;
 296             }
 297         }
 298 
 299         @Override
 300         public void onNext(ByteBuffer item) {
 301             debug.log(Level.DEBUG, "onNext");
 302             Objects.requireNonNull(item);
 303             if (complete) {
 304                 Throwable t = new IllegalStateException("subscription already completed");
 305                 http1Exchange.appendToOutgoing(t);
 306             } else {
 307                 long writing = item.remaining();
 308                 long written = (contentWritten += writing);
 309 
 310                 if (written > contentLength) {
 311                     subscription.cancel();
 312                     String msg = connection.getConnectionFlow()
 313                                   + " [" + Thread.currentThread().getName() +"] "
 314                                   + "Too many bytes in request body. Expected: "
 315                                   + contentLength + ", got: " + written;
 316                     http1Exchange.appendToOutgoing(new IOException(msg));
 317                 } else {
 318                     http1Exchange.appendToOutgoing(List.of(item));
 319                 }
 320             }
 321         }
 322 
 323         @Override
 324         public void onError(Throwable throwable) {
 325             debug.log(Level.DEBUG, "onError");
 326             if (complete)  // TODO: error?
 327                 return;
 328 
 329             subscription.cancel();
 330             http1Exchange.appendToOutgoing(throwable);
 331         }
 332 
 333         @Override
 334         public void onComplete() {
 335             debug.log(Level.DEBUG, "onComplete");
 336             if (complete) {
 337                 Throwable t = new IllegalStateException("subscription already completed");
 338                 http1Exchange.appendToOutgoing(t);
 339             } else {
 340                 complete = true;
 341                 long written = contentWritten;
 342                 if (contentLength > written) {
 343                     subscription.cancel();
 344                     Throwable t = new IOException(connection.getConnectionFlow()
 345                                          + " [" + Thread.currentThread().getName() +"] "
 346                                          + "Too few bytes returned by the publisher ("
 347                                                   + written + "/"
 348                                                   + contentLength + ")");
 349                     http1Exchange.appendToOutgoing(t);
 350                 } else {
 351                     http1Exchange.appendToOutgoing(COMPLETED);
 352                 }
 353             }
 354         }
 355     }
 356 
 357     private static final byte[] CRLF = {'\r', '\n'};
 358     private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'};
 359 
 360     /** Returns a header for a particular chunk size */
 361     private static ByteBuffer getHeader(int size) {
 362         String hexStr = Integer.toHexString(size);
 363         byte[] hexBytes = hexStr.getBytes(US_ASCII);
 364         byte[] header = new byte[hexStr.length()+2];
 365         System.arraycopy(hexBytes, 0, header, 0, hexBytes.length);
 366         header[hexBytes.length] = CRLF[0];
 367         header[hexBytes.length+1] = CRLF[1];
 368         return ByteBuffer.wrap(header);
 369     }
 370 
 371     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
 372     final System.Logger  debug = Utils.getDebugLogger(this::toString, DEBUG);
 373 
 374 }