< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java

Print this page

        

@@ -1,7 +1,7 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 only, as
  * published by the Free Software Foundation.  Oracle designates this

@@ -24,101 +24,77 @@
  */
 
 package jdk.incubator.http;
 
 import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.net.InetSocketAddress;
-import jdk.incubator.http.HttpConnection.Mode;
-import java.nio.charset.StandardCharsets;
-import static java.nio.charset.StandardCharsets.US_ASCII;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
+import java.util.Objects;
 import java.util.concurrent.Flow;
 
+import jdk.incubator.http.Http1Exchange.Http1BodySubscriber;
 import jdk.incubator.http.internal.common.HttpHeadersImpl;
 import jdk.incubator.http.internal.common.Log;
-import jdk.incubator.http.internal.common.MinimalFuture;
 import jdk.incubator.http.internal.common.Utils;
 
+import static java.nio.charset.StandardCharsets.US_ASCII;
+
 /**
- *  A HTTP/1.1 request.
- *
- * send() -> Writes the request + body to the given channel, in one blocking
- * operation.
+ *  An HTTP/1.1 request.
  */
 class Http1Request {
-    final HttpClientImpl client;
-    final HttpRequestImpl request;
-    final HttpConnection chan;
-    // Multiple buffers are used to hold different parts of request
-    // See line 206 and below for description
-    final ByteBuffer[] buffers;
-    final HttpRequest.BodyProcessor requestProc;
-    final HttpHeaders userHeaders;
-    final HttpHeadersImpl systemHeaders;
-    boolean streaming;
-    long contentLength;
-    final CompletableFuture<Void> cf;
+    private final HttpRequestImpl request;
+    private final Http1Exchange<?> http1Exchange;  // sink for outgoing buffers, errors and the completion marker
+    private final HttpConnection connection;
+    private final HttpRequest.BodyPublisher requestPublisher;  // null when the request carries no body publisher
+    private final HttpHeaders userHeaders;
+    private final HttpHeadersImpl systemHeaders;
+    private volatile boolean streaming;  // set by headers(): true when the body goes out chunked
+    private volatile long contentLength;  // set by headers(): 0 = nothing to send, > 0 = fixed length, < 0 = chunked
 
-    Http1Request(HttpRequestImpl request, HttpClientImpl client, HttpConnection connection)
+    Http1Request(HttpRequestImpl request,
+                 Http1Exchange<?> http1Exchange)
         throws IOException
     {
-        this.client = client;
         this.request = request;
-        this.chan = connection;
-        buffers = new ByteBuffer[5]; // TODO: check
-        this.requestProc = request.requestProcessor;
+        this.http1Exchange = http1Exchange;
+        this.connection = http1Exchange.connection();
+        this.requestPublisher = request.requestPublisher;  // may be null
         this.userHeaders = request.getUserHeaders();
         this.systemHeaders = request.getSystemHeaders();
-        this.cf = new MinimalFuture<>();
     }
 
-    private void logHeaders() throws IOException {
-        StringBuilder sb = new StringBuilder(256);
-        sb.append("REQUEST HEADERS:\n");
-        Log.dumpHeaders(sb, "    ", systemHeaders);
-        Log.dumpHeaders(sb, "    ", userHeaders);
-        Log.logHeaders(sb.toString());
-    }
+    private void logHeaders(String completeHeaders) {
+        if (Log.headers()) {
+            // completeHeaders already holds the request line and every
+            // header exactly as they will be written out, so it is logged
+            // directly instead of dumping systemHeaders and userHeaders
+            // again. CR LF pairs are collapsed to LF only to keep the log
+            // readable; a literal replace is enough, no regex is needed.
 
-    private void dummy(long x) {
-        // not used in this class
+            String s = completeHeaders.replace("\r\n", "\n");
+            Log.logHeaders("REQUEST HEADERS:\n" + s);
     }
-
-    private void collectHeaders0() throws IOException {
-        if (Log.headers()) {
-            logHeaders();
         }
-        StringBuilder sb = new StringBuilder(256);
-        collectHeaders1(sb, request, systemHeaders);
-        collectHeaders1(sb, request, userHeaders);
+
+    private void collectHeaders0(StringBuilder sb) {  // appends system headers, then user headers, then the blank line ending the header section
+        collectHeaders1(sb, systemHeaders);
+        collectHeaders1(sb, userHeaders);
         sb.append("\r\n");
-        String headers = sb.toString();
-        buffers[1] = ByteBuffer.wrap(headers.getBytes(StandardCharsets.US_ASCII));
     }
 
-    private void collectHeaders1(StringBuilder sb,
-                                 HttpRequestImpl request,
-                                 HttpHeaders headers)
-        throws IOException
-    {
-        Map<String,List<String>> h = headers.map();
-        Set<Map.Entry<String,List<String>>> entries = h.entrySet();
-
-        for (Map.Entry<String,List<String>> entry : entries) {
+    private void collectHeaders1(StringBuilder sb, HttpHeaders headers) {  // appends every entry of 'headers' to sb
+        for (Map.Entry<String,List<String>> entry : headers.map().entrySet()) {
             String key = entry.getKey();
             List<String> values = entry.getValue();
             for (String value : values) {
-                sb.append(key)
-                  .append(": ")
-                  .append(value)
-                  .append("\r\n");
+                sb.append(key).append(": ").append(value).append("\r\n");  // a multi-valued header yields one "name: value" line per value
             }
         }
     }
 
     private String getPathAndQuery(URI uri) {

@@ -164,11 +140,11 @@
 
     private String requestURI() {
         URI uri = request.uri();
         String method = request.method();
 
-        if ((request.proxy(client) == null && !method.equals("CONNECT"))
+        if ((request.proxy() == null && !method.equals("CONNECT"))
                 || request.isWebSocket()) {
             return getPathAndQuery(uri);
         }
         if (request.secure()) {
             if (request.method().equals("CONNECT")) {

@@ -177,29 +153,16 @@
             } else {
                 // requests over tunnel do not require full URL
                 return getPathAndQuery(uri);
             }
         }
-        return uri == null? authorityString(request.authority()) : uri.toString();
-    }
-
-    void sendHeadersOnly() throws IOException {
-        collectHeaders();
-        chan.write(buffers, 0, 2);
+        if (request.method().equals("CONNECT")) {
+            // use authority for connect itself
+            return authorityString(request.authority());
     }
 
-    void sendRequest() throws IOException {
-        collectHeaders();
-        chan.configureMode(Mode.BLOCKING);
-        if (contentLength == 0) {
-            chan.write(buffers, 0, 2);
-        } else if (contentLength > 0) {
-            writeFixedContent(true);
-        } else {
-            writeStreamedContent(true);
-        }
-        setFinished();
+        return uri == null? authorityString(request.authority()) : uri.toString();
     }
 
     private boolean finished;
 
     synchronized boolean finished() {

@@ -208,267 +171,204 @@
 
     synchronized void setFinished() {
         finished = true;
     }
 
-    private void collectHeaders() throws IOException {
+    List<ByteBuffer> headers() {  // builds the request line plus all headers and returns them as one US-ASCII encoded buffer
         if (Log.requests() && request != null) {
             Log.logRequest(request.toString());
         }
         String uriString = requestURI();
         StringBuilder sb = new StringBuilder(64);
         sb.append(request.method())
           .append(' ')
           .append(uriString)
           .append(" HTTP/1.1\r\n");
-        String cmd = sb.toString();
 
-        buffers[0] = ByteBuffer.wrap(cmd.getBytes(StandardCharsets.US_ASCII));
         URI uri = request.uri();
         if (uri != null) {
             systemHeaders.setHeader("Host", hostString());
         }
-        if (request == null) {
-            // this is not a user request. No content
+        if (request == null || requestPublisher == null) {
+            // No body to send, e.g. a GET without a publisher. NOTE(review): request was already dereferenced above, so its null test looks redundant.
             contentLength = 0;
         } else {
-            contentLength = requestProc.contentLength();
+            contentLength = requestPublisher.contentLength();
         }
 
         if (contentLength == 0) {
             systemHeaders.setHeader("Content-Length", "0");
-            collectHeaders0();
         } else if (contentLength > 0) {
-            /* [0] request line [1] headers [2] body  */
-            systemHeaders.setHeader("Content-Length",
-                                    Integer.toString((int) contentLength));
+            systemHeaders.setHeader("Content-Length", Long.toString(contentLength));  // contentLength is a long: format it without narrowing to int
             streaming = false;
-            collectHeaders0();
-            buffers[2] = getBuffer();
         } else {
-            /* Chunked:
-             *
-             * [0] request line [1] headers [2] chunk header [3] chunk data [4]
-             * final chunk header and trailing CRLF of previous chunks
-             *
-             * 2,3,4 used repeatedly */
             streaming = true;
             systemHeaders.setHeader("Transfer-encoding", "chunked");
-            collectHeaders0();
-            buffers[3] = getBuffer();
-        }
     }
-
-    private ByteBuffer getBuffer() {
-        return ByteBuffer.allocate(Utils.BUFSIZE);
+        collectHeaders0(sb);  // sb now holds: request line, system headers, user headers, blank line
+        String hs = sb.toString();
+        logHeaders(hs);
+        ByteBuffer b = ByteBuffer.wrap(hs.getBytes(US_ASCII));
+        return List.of(b);
     }
 
-    // The following two methods used by Http1Exchange to handle expect continue
-
-    void continueRequest() throws IOException {
+    Http1BodySubscriber continueRequest()  {  // subscribes a body subscriber to the request publisher; returns null when there is no body
+        Http1BodySubscriber subscriber;
         if (streaming) {
-            writeStreamedContent(false);
+            subscriber = new StreamSubscriber();  // length unknown: body is framed as chunks
+            requestPublisher.subscribe(subscriber);
         } else {
-            writeFixedContent(false);
+            if (contentLength == 0)
+                return null;  // nothing to send; a null requestPublisher always leaves contentLength at 0, so it ends up here
+
+            subscriber = new FixedContentSubscriber();  // known length: body is forwarded unframed and its size is checked
+            requestPublisher.subscribe(subscriber);
         }
-        setFinished();
+        return subscriber;
     }
 
-    class StreamSubscriber implements Flow.Subscriber<ByteBuffer> {
-        volatile Flow.Subscription subscription;
-        volatile boolean includeHeaders;
-
-        StreamSubscriber(boolean includeHeaders) {
-            this.includeHeaders = includeHeaders;
-        }
+    class StreamSubscriber extends Http1BodySubscriber {  // body subscriber for chunked transfer: frames each item as one chunk
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
             if (this.subscription != null) {
-                throw new IllegalStateException("already subscribed");
-            }
+                Throwable t = new IllegalStateException("already subscribed");
+                http1Exchange.appendToOutgoing(t);  // reported through the exchange instead of being thrown
+            } else {
             this.subscription = subscription;
-            subscription.request(1);
+            }
         }
 
         @Override
         public void onNext(ByteBuffer item) {
-            int startbuf, nbufs;
-
-            if (cf.isDone()) {
-                throw new IllegalStateException("subscription already completed");
-            }
-
-            if (includeHeaders) {
-                startbuf = 0;
-                nbufs = 5;
+            Objects.requireNonNull(item);
+            if (complete) {
+                Throwable t = new IllegalStateException("subscription already completed");
+                http1Exchange.appendToOutgoing(t);
             } else {
-                startbuf = 2;
-                nbufs = 3;
-            }
             int chunklen = item.remaining();
-            buffers[3] = item;
-            buffers[2] = getHeader(chunklen);
-            buffers[4] = CRLF_BUFFER();
-            try {
-                chan.write(buffers, startbuf, nbufs);
-            } catch (IOException e) {
-                subscription.cancel();
-                cf.completeExceptionally(e);
+                ArrayList<ByteBuffer> l = new ArrayList<>(3);
+                l.add(getHeader(chunklen));  // chunk-size line: hex length followed by CRLF
+                l.add(item);
+                l.add(ByteBuffer.wrap(CRLF));  // CRLF that closes the chunk data
+                http1Exchange.appendToOutgoing(l);  // NOTE(review): a zero-length item is framed as "0\r\n\r\n", same bytes as the final chunk - confirm publishers never emit empty buffers
             }
-            includeHeaders = false;
-            subscription.request(1);
         }
 
         @Override
         public void onError(Throwable throwable) {
-            if (cf.isDone()) {
+            if (complete)
                 return;
-            }
+
             subscription.cancel();
-            cf.completeExceptionally(throwable);
+            http1Exchange.appendToOutgoing(throwable);  // hand the publisher's failure to the exchange
         }
 
         @Override
         public void onComplete() {
-            if (cf.isDone()) {
-                throw new IllegalStateException("subscription already completed");
-            }
-            buffers[3] = EMPTY_CHUNK_HEADER();
-            buffers[4] = CRLF_BUFFER();
-            try {
-                chan.write(buffers, 3, 2);
-            } catch (IOException ex) {
-                cf.completeExceptionally(ex);
-                return;
-            }
-            cf.complete(null);
-        }
-    }
+            if (complete) {
+                Throwable t = new IllegalStateException("subscription already completed");
+                http1Exchange.appendToOutgoing(t);
+            } else {
+                ArrayList<ByteBuffer> l = new ArrayList<>(2);
+                l.add(ByteBuffer.wrap(EMPTY_CHUNK_BYTES));  // zero-size chunk: "0" CRLF
+                l.add(ByteBuffer.wrap(CRLF));  // closing CRLF after the zero-size chunk
+                complete = true;
+                //setFinished();
+                http1Exchange.appendToOutgoing(l);
+                http1Exchange.appendToOutgoing(COMPLETED);  // completion marker, queued after the final chunk
+                setFinished();  // TODO: call this before or after appendToOutgoing? Does the order matter?
 
-    private void waitForCompletion() throws IOException {
-        try {
-            cf.join();
-        } catch (CompletionException e) {
-            throw Utils.getIOException(e);
         }
     }
-
-    /* Entire request is sent, or just body only  */
-    private void writeStreamedContent(boolean includeHeaders)
-        throws IOException
-    {
-        StreamSubscriber subscriber = new StreamSubscriber(includeHeaders);
-        requestProc.subscribe(subscriber);
-        waitForCompletion();
     }
 
-    class FixedContentSubscriber implements Flow.Subscriber<ByteBuffer>
-    {
-        volatile Flow.Subscription subscription;
-        volatile boolean includeHeaders;
-        volatile long contentWritten = 0;
+    class FixedContentSubscriber extends Http1BodySubscriber {  // body subscriber for a known Content-Length: forwards items unframed and checks the total
 
-        FixedContentSubscriber(boolean includeHeaders) {
-            this.includeHeaders = includeHeaders;
-        }
+        private volatile long contentWritten;  // running total of body bytes received from the publisher
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
             if (this.subscription != null) {
-                throw new IllegalStateException("already subscribed");
-            }
+                Throwable t = new IllegalStateException("already subscribed");
+                http1Exchange.appendToOutgoing(t);  // reported through the exchange instead of being thrown
+            } else {
             this.subscription = subscription;
-            subscription.request(1);
+            }
         }
 
         @Override
         public void onNext(ByteBuffer item) {
-            int startbuf, nbufs;
-            long headersLength;
+            debug.log(Level.DEBUG, "onNext");
+            Objects.requireNonNull(item);
+            if (complete) {
+                Throwable t = new IllegalStateException("subscription already completed");
+                http1Exchange.appendToOutgoing(t);
+            } else {
+                long writing = item.remaining();
+                long written = (contentWritten += writing);  // NOTE(review): += on a volatile is not atomic; presumably relies on onNext calls being serialized - confirm
 
-            if (includeHeaders) {
-                startbuf = 0;
-                nbufs = 3;
-                headersLength = buffers[0].remaining() + buffers[1].remaining();
-            } else {
-                startbuf = 2;
-                nbufs = 1;
-                headersLength = 0;
-            }
-            buffers[2] = item;
-            try {
-                long writing = buffers[2].remaining() + headersLength;
-                contentWritten += buffers[2].remaining();
-                chan.checkWrite(writing, buffers, startbuf, nbufs);
-
-                if (contentWritten > contentLength) {
-                    String msg = "Too many bytes in request body. Expected: " +
-                        Long.toString(contentLength) + " Sent: " +
-                        Long.toString(contentWritten);
-                    throw new IOException(msg);
-                }
-                subscription.request(1);
-            } catch (IOException e) {
+                if (written > contentLength) {  // publisher produced more than the advertised Content-Length
                 subscription.cancel();
-                cf.completeExceptionally(e);
+                    String msg = connection.getConnectionFlow()
+                                  + " [" + Thread.currentThread().getName() +"] "
+                                  + "Too many bytes in request body. Expected: "
+                                  + contentLength + ", got: " + written;
+                    http1Exchange.appendToOutgoing(new IOException(msg));
+                } else {
+                    http1Exchange.appendToOutgoing(List.of(item));  // still within contentLength: forward the buffer as-is
+                }
             }
         }
 
         @Override
         public void onError(Throwable throwable) {
-            if (cf.isDone()) {
+            debug.log(Level.DEBUG, "onError");
+            if (complete)  // already complete: ignored for now; TODO: report as an error instead?
                 return;
-            }
+
             subscription.cancel();
-            cf.completeExceptionally(throwable);
+            http1Exchange.appendToOutgoing(throwable);  // hand the publisher's failure to the exchange
         }
 
         @Override
         public void onComplete() {
-            if (cf.isDone()) {
-                throw new IllegalStateException("subscription already completed");
-            }
-
-            if (contentLength > contentWritten) {
+            debug.log(Level.DEBUG, "onComplete");
+            if (complete) {
+                Throwable t = new IllegalStateException("subscription already completed");
+                http1Exchange.appendToOutgoing(t);
+            } else {
+                complete = true;
+                long written = contentWritten;
+                if (contentLength > written) {  // publisher completed before supplying contentLength bytes
                 subscription.cancel();
-                Exception e = new IOException("Too few bytes returned by the processor");
-                cf.completeExceptionally(e);
+                    Throwable t = new IOException(connection.getConnectionFlow()
+                                         + " [" + Thread.currentThread().getName() +"] "
+                                         + "Too few bytes returned by the publisher ("
+                                                  + written + "/"
+                                                  + contentLength + ")");
+                    http1Exchange.appendToOutgoing(t);
             } else {
-                cf.complete(null);
-            }
+                    http1Exchange.appendToOutgoing(COMPLETED);  // nothing missing: queue the completion marker
         }
     }
-
-    /* Entire request is sent, or just body only */
-    private void writeFixedContent(boolean includeHeaders)
-            throws IOException {
-        if (contentLength == 0) {
-            return;
         }
-        FixedContentSubscriber subscriber = new FixedContentSubscriber(includeHeaders);
-        requestProc.subscribe(subscriber);
-        waitForCompletion();
     }
 
     private static final byte[] CRLF = {'\r', '\n'};
     private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'};
 
-    private ByteBuffer CRLF_BUFFER() {
-        return ByteBuffer.wrap(CRLF);
-    }
-
-    private ByteBuffer EMPTY_CHUNK_HEADER() {
-        return ByteBuffer.wrap(EMPTY_CHUNK_BYTES);
-    }
-
-    /* Returns a header for a particular chunk size */
-    private static ByteBuffer getHeader(int size){
+    /** Returns the chunk-size line for a chunk of {@code size} bytes: the size in lowercase hex, US-ASCII encoded, followed by CRLF. */
+    private static ByteBuffer getHeader(int size) {
         String hexStr =  Integer.toHexString(size);
         byte[] hexBytes = hexStr.getBytes(US_ASCII);
         byte[] header = new byte[hexStr.length()+2];
         System.arraycopy(hexBytes, 0, header, 0, hexBytes.length);
         header[hexBytes.length] = CRLF[0];
         header[hexBytes.length+1] = CRLF[1];
         return ByteBuffer.wrap(header);
     }
+
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+    final System.Logger  debug = Utils.getDebugLogger(this::toString, DEBUG);
+
 }
< prev index next >