--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java 2017-11-30 04:03:54.914696995 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java 2017-11-30 04:03:54.623671554 -0800 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -26,97 +26,73 @@ package jdk.incubator.http; import java.io.IOException; +import java.lang.System.Logger.Level; import java.net.URI; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; import java.net.InetSocketAddress; -import jdk.incubator.http.HttpConnection.Mode; -import java.nio.charset.StandardCharsets; -import static java.nio.charset.StandardCharsets.US_ASCII; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; +import java.util.Objects; import java.util.concurrent.Flow; +import jdk.incubator.http.Http1Exchange.Http1BodySubscriber; import jdk.incubator.http.internal.common.HttpHeadersImpl; import jdk.incubator.http.internal.common.Log; -import jdk.incubator.http.internal.common.MinimalFuture; import jdk.incubator.http.internal.common.Utils; +import static java.nio.charset.StandardCharsets.US_ASCII; + /** - * A HTTP/1.1 request. - * - * send() -> Writes the request + body to the given channel, in one blocking - * operation. + * An HTTP/1.1 request. */ class Http1Request { - final HttpClientImpl client; - final HttpRequestImpl request; - final HttpConnection chan; - // Multiple buffers are used to hold different parts of request - // See line 206 and below for description - final ByteBuffer[] buffers; - final HttpRequest.BodyProcessor requestProc; - final HttpHeaders userHeaders; - final HttpHeadersImpl systemHeaders; - boolean streaming; - long contentLength; - final CompletableFuture cf; + private final HttpRequestImpl request; + private final Http1Exchange http1Exchange; + private final HttpConnection connection; + private final HttpRequest.BodyPublisher requestPublisher; + private final HttpHeaders userHeaders; + private final HttpHeadersImpl systemHeaders; + private volatile boolean streaming; + private volatile long contentLength; - Http1Request(HttpRequestImpl request, HttpClientImpl client, HttpConnection connection) + Http1Request(HttpRequestImpl request, + Http1Exchange http1Exchange) throws IOException { - this.client = client; this.request = request; - this.chan = connection; - buffers = new ByteBuffer[5]; // TODO: check - this.requestProc = request.requestProcessor; + this.http1Exchange = http1Exchange; + this.connection = http1Exchange.connection(); + this.requestPublisher = request.requestPublisher; // may be null this.userHeaders = request.getUserHeaders(); this.systemHeaders = request.getSystemHeaders(); - this.cf = new MinimalFuture<>(); } - private void logHeaders() throws IOException { - StringBuilder sb = new StringBuilder(256); - sb.append("REQUEST HEADERS:\n"); - Log.dumpHeaders(sb, " ", systemHeaders); - Log.dumpHeaders(sb, " ", userHeaders); - Log.logHeaders(sb.toString()); - } + private void logHeaders(String completeHeaders) { + if (Log.headers()) { + //StringBuilder sb = new StringBuilder(256); + //sb.append("REQUEST HEADERS:\n"); + //Log.dumpHeaders(sb, " ", systemHeaders); + //Log.dumpHeaders(sb, " ", userHeaders); + //Log.logHeaders(sb.toString()); - private void dummy(long x) { - // not used in this class + String s = completeHeaders.replaceAll("\r\n", "\n"); + Log.logHeaders("REQUEST HEADERS:\n" + s); + } } - private void collectHeaders0() throws IOException { - if (Log.headers()) { - logHeaders(); - } - StringBuilder sb = new StringBuilder(256); - collectHeaders1(sb, request, systemHeaders); - collectHeaders1(sb, request, userHeaders); + private void collectHeaders0(StringBuilder sb) { + collectHeaders1(sb, systemHeaders); + collectHeaders1(sb, userHeaders); sb.append("\r\n"); - String headers = sb.toString(); - buffers[1] = ByteBuffer.wrap(headers.getBytes(StandardCharsets.US_ASCII)); } - private void collectHeaders1(StringBuilder sb, - HttpRequestImpl request, - HttpHeaders headers) - throws IOException - { - Map> h = headers.map(); - Set>> entries = h.entrySet(); - - for (Map.Entry> entry : entries) { + private void collectHeaders1(StringBuilder sb, HttpHeaders headers) { + for (Map.Entry> entry : headers.map().entrySet()) { String key = entry.getKey(); List values = entry.getValue(); for (String value : values) { - sb.append(key) - .append(": ") - .append(value) - .append("\r\n"); + sb.append(key).append(": ").append(value).append("\r\n"); } } } @@ -166,7 +142,7 @@ URI uri = request.uri(); String method = request.method(); - if ((request.proxy(client) == null && !method.equals("CONNECT")) + if ((request.proxy() == null && !method.equals("CONNECT")) || request.isWebSocket()) { return getPathAndQuery(uri); } @@ -179,25 +155,12 @@ return getPathAndQuery(uri); } } - return uri == null? authorityString(request.authority()) : uri.toString(); - } - - void sendHeadersOnly() throws IOException { - collectHeaders(); - chan.write(buffers, 0, 2); - } - - void sendRequest() throws IOException { - collectHeaders(); - chan.configureMode(Mode.BLOCKING); - if (contentLength == 0) { - chan.write(buffers, 0, 2); - } else if (contentLength > 0) { - writeFixedContent(true); - } else { - writeStreamedContent(true); + if (request.method().equals("CONNECT")) { + // use authority for connect itself + return authorityString(request.authority()); } - setFinished(); + + return uri == null? authorityString(request.authority()) : uri.toString(); } private boolean finished; @@ -210,7 +173,7 @@ finished = true; } - private void collectHeaders() throws IOException { + List headers() { if (Log.requests() && request != null) { Log.logRequest(request.toString()); } @@ -220,250 +183,183 @@ .append(' ') .append(uriString) .append(" HTTP/1.1\r\n"); - String cmd = sb.toString(); - buffers[0] = ByteBuffer.wrap(cmd.getBytes(StandardCharsets.US_ASCII)); URI uri = request.uri(); if (uri != null) { systemHeaders.setHeader("Host", hostString()); } - if (request == null) { - // this is not a user request. No content + if (request == null || requestPublisher == null) { + // Not a user request, or maybe a method, e.g. GET, with no body. contentLength = 0; } else { - contentLength = requestProc.contentLength(); + contentLength = requestPublisher.contentLength(); } if (contentLength == 0) { systemHeaders.setHeader("Content-Length", "0"); - collectHeaders0(); } else if (contentLength > 0) { - /* [0] request line [1] headers [2] body */ - systemHeaders.setHeader("Content-Length", - Integer.toString((int) contentLength)); + systemHeaders.setHeader("Content-Length", Long.toString(contentLength)); streaming = false; - collectHeaders0(); - buffers[2] = getBuffer(); } else { - /* Chunked: - * - * [0] request line [1] headers [2] chunk header [3] chunk data [4] - * final chunk header and trailing CRLF of previous chunks - * - * 2,3,4 used repeatedly */ streaming = true; systemHeaders.setHeader("Transfer-encoding", "chunked"); - collectHeaders0(); - buffers[3] = getBuffer(); } + collectHeaders0(sb); + String hs = sb.toString(); + logHeaders(hs); + ByteBuffer b = ByteBuffer.wrap(hs.getBytes(US_ASCII)); + return List.of(b); } - private ByteBuffer getBuffer() { - return ByteBuffer.allocate(Utils.BUFSIZE); - } - - // The following two methods used by Http1Exchange to handle expect continue - - void continueRequest() throws IOException { + Http1BodySubscriber continueRequest() { + Http1BodySubscriber subscriber; if (streaming) { - writeStreamedContent(false); + subscriber = new StreamSubscriber(); + requestPublisher.subscribe(subscriber); } else { - writeFixedContent(false); + if (contentLength == 0) + return null; + + subscriber = new FixedContentSubscriber(); + requestPublisher.subscribe(subscriber); } - setFinished(); + return subscriber; } - class StreamSubscriber implements Flow.Subscriber { - volatile Flow.Subscription subscription; - volatile boolean includeHeaders; - - StreamSubscriber(boolean includeHeaders) { - this.includeHeaders = includeHeaders; - } + class StreamSubscriber extends Http1BodySubscriber { @Override public void onSubscribe(Flow.Subscription subscription) { if (this.subscription != null) { - throw new IllegalStateException("already subscribed"); + Throwable t = new IllegalStateException("already subscribed"); + http1Exchange.appendToOutgoing(t); + } else { + this.subscription = subscription; } - this.subscription = subscription; - subscription.request(1); } @Override public void onNext(ByteBuffer item) { - int startbuf, nbufs; - - if (cf.isDone()) { - throw new IllegalStateException("subscription already completed"); - } - - if (includeHeaders) { - startbuf = 0; - nbufs = 5; + Objects.requireNonNull(item); + if (complete) { + Throwable t = new IllegalStateException("subscription already completed"); + http1Exchange.appendToOutgoing(t); } else { - startbuf = 2; - nbufs = 3; + int chunklen = item.remaining(); + ArrayList l = new ArrayList<>(3); + l.add(getHeader(chunklen)); + l.add(item); + l.add(ByteBuffer.wrap(CRLF)); + http1Exchange.appendToOutgoing(l); } - int chunklen = item.remaining(); - buffers[3] = item; - buffers[2] = getHeader(chunklen); - buffers[4] = CRLF_BUFFER(); - try { - chan.write(buffers, startbuf, nbufs); - } catch (IOException e) { - subscription.cancel(); - cf.completeExceptionally(e); - } - includeHeaders = false; - subscription.request(1); } @Override public void onError(Throwable throwable) { - if (cf.isDone()) { + if (complete) return; - } + subscription.cancel(); - cf.completeExceptionally(throwable); + http1Exchange.appendToOutgoing(throwable); } @Override public void onComplete() { - if (cf.isDone()) { - throw new IllegalStateException("subscription already completed"); - } - buffers[3] = EMPTY_CHUNK_HEADER(); - buffers[4] = CRLF_BUFFER(); - try { - chan.write(buffers, 3, 2); - } catch (IOException ex) { - cf.completeExceptionally(ex); - return; + if (complete) { + Throwable t = new IllegalStateException("subscription already completed"); + http1Exchange.appendToOutgoing(t); + } else { + ArrayList l = new ArrayList<>(2); + l.add(ByteBuffer.wrap(EMPTY_CHUNK_BYTES)); + l.add(ByteBuffer.wrap(CRLF)); + complete = true; + //setFinished(); + http1Exchange.appendToOutgoing(l); + http1Exchange.appendToOutgoing(COMPLETED); + setFinished(); // TODO: before or after,? does it matter? + } - cf.complete(null); } } - private void waitForCompletion() throws IOException { - try { - cf.join(); - } catch (CompletionException e) { - throw Utils.getIOException(e); - } - } + class FixedContentSubscriber extends Http1BodySubscriber { - /* Entire request is sent, or just body only */ - private void writeStreamedContent(boolean includeHeaders) - throws IOException - { - StreamSubscriber subscriber = new StreamSubscriber(includeHeaders); - requestProc.subscribe(subscriber); - waitForCompletion(); - } - - class FixedContentSubscriber implements Flow.Subscriber - { - volatile Flow.Subscription subscription; - volatile boolean includeHeaders; - volatile long contentWritten = 0; - - FixedContentSubscriber(boolean includeHeaders) { - this.includeHeaders = includeHeaders; - } + private volatile long contentWritten; @Override public void onSubscribe(Flow.Subscription subscription) { if (this.subscription != null) { - throw new IllegalStateException("already subscribed"); + Throwable t = new IllegalStateException("already subscribed"); + http1Exchange.appendToOutgoing(t); + } else { + this.subscription = subscription; } - this.subscription = subscription; - subscription.request(1); } @Override public void onNext(ByteBuffer item) { - int startbuf, nbufs; - long headersLength; - - if (includeHeaders) { - startbuf = 0; - nbufs = 3; - headersLength = buffers[0].remaining() + buffers[1].remaining(); + debug.log(Level.DEBUG, "onNext"); + Objects.requireNonNull(item); + if (complete) { + Throwable t = new IllegalStateException("subscription already completed"); + http1Exchange.appendToOutgoing(t); } else { - startbuf = 2; - nbufs = 1; - headersLength = 0; - } - buffers[2] = item; - try { - long writing = buffers[2].remaining() + headersLength; - contentWritten += buffers[2].remaining(); - chan.checkWrite(writing, buffers, startbuf, nbufs); - - if (contentWritten > contentLength) { - String msg = "Too many bytes in request body. Expected: " + - Long.toString(contentLength) + " Sent: " + - Long.toString(contentWritten); - throw new IOException(msg); + long writing = item.remaining(); + long written = (contentWritten += writing); + + if (written > contentLength) { + subscription.cancel(); + String msg = connection.getConnectionFlow() + + " [" + Thread.currentThread().getName() +"] " + + "Too many bytes in request body. Expected: " + + contentLength + ", got: " + written; + http1Exchange.appendToOutgoing(new IOException(msg)); + } else { + http1Exchange.appendToOutgoing(List.of(item)); } - subscription.request(1); - } catch (IOException e) { - subscription.cancel(); - cf.completeExceptionally(e); } } @Override public void onError(Throwable throwable) { - if (cf.isDone()) { + debug.log(Level.DEBUG, "onError"); + if (complete) // TODO: error? return; - } + subscription.cancel(); - cf.completeExceptionally(throwable); + http1Exchange.appendToOutgoing(throwable); } @Override public void onComplete() { - if (cf.isDone()) { - throw new IllegalStateException("subscription already completed"); - } - - if (contentLength > contentWritten) { - subscription.cancel(); - Exception e = new IOException("Too few bytes returned by the processor"); - cf.completeExceptionally(e); + debug.log(Level.DEBUG, "onComplete"); + if (complete) { + Throwable t = new IllegalStateException("subscription already completed"); + http1Exchange.appendToOutgoing(t); } else { - cf.complete(null); + complete = true; + long written = contentWritten; + if (contentLength > written) { + subscription.cancel(); + Throwable t = new IOException(connection.getConnectionFlow() + + " [" + Thread.currentThread().getName() +"] " + + "Too few bytes returned by the publisher (" + + written + "/" + + contentLength + ")"); + http1Exchange.appendToOutgoing(t); + } else { + http1Exchange.appendToOutgoing(COMPLETED); + } } } } - /* Entire request is sent, or just body only */ - private void writeFixedContent(boolean includeHeaders) - throws IOException { - if (contentLength == 0) { - return; - } - FixedContentSubscriber subscriber = new FixedContentSubscriber(includeHeaders); - requestProc.subscribe(subscriber); - waitForCompletion(); - } - private static final byte[] CRLF = {'\r', '\n'}; private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'}; - private ByteBuffer CRLF_BUFFER() { - return ByteBuffer.wrap(CRLF); - } - - private ByteBuffer EMPTY_CHUNK_HEADER() { - return ByteBuffer.wrap(EMPTY_CHUNK_BYTES); - } - - /* Returns a header for a particular chunk size */ - private static ByteBuffer getHeader(int size){ - String hexStr = Integer.toHexString(size); + /** Returns a header for a particular chunk size */ + private static ByteBuffer getHeader(int size) { + String hexStr = Integer.toHexString(size); byte[] hexBytes = hexStr.getBytes(US_ASCII); byte[] header = new byte[hexStr.length()+2]; System.arraycopy(hexBytes, 0, header, 0, hexBytes.length); @@ -471,4 +367,8 @@ header[hexBytes.length+1] = CRLF[1]; return ByteBuffer.wrap(header); } + + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + final System.Logger debug = Utils.getDebugLogger(this::toString, DEBUG); + }