< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java
Print this page
@@ -1,7 +1,7 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
@@ -24,101 +24,77 @@
*/
package jdk.incubator.http;
import java.io.IOException;
+import java.lang.System.Logger.Level;
import java.net.URI;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.net.InetSocketAddress;
-import jdk.incubator.http.HttpConnection.Mode;
-import java.nio.charset.StandardCharsets;
-import static java.nio.charset.StandardCharsets.US_ASCII;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
+import java.util.Objects;
import java.util.concurrent.Flow;
+import jdk.incubator.http.Http1Exchange.Http1BodySubscriber;
import jdk.incubator.http.internal.common.HttpHeadersImpl;
import jdk.incubator.http.internal.common.Log;
-import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;
+import static java.nio.charset.StandardCharsets.US_ASCII;
+
/**
- * A HTTP/1.1 request.
- *
- * send() -> Writes the request + body to the given channel, in one blocking
- * operation.
+ * An HTTP/1.1 request.
*/
class Http1Request {
- final HttpClientImpl client;
- final HttpRequestImpl request;
- final HttpConnection chan;
- // Multiple buffers are used to hold different parts of request
- // See line 206 and below for description
- final ByteBuffer[] buffers;
- final HttpRequest.BodyProcessor requestProc;
- final HttpHeaders userHeaders;
- final HttpHeadersImpl systemHeaders;
- boolean streaming;
- long contentLength;
- final CompletableFuture<Void> cf;
+ private final HttpRequestImpl request;
+ private final Http1Exchange<?> http1Exchange;
+ private final HttpConnection connection;
+ private final HttpRequest.BodyPublisher requestPublisher;
+ private final HttpHeaders userHeaders;
+ private final HttpHeadersImpl systemHeaders;
+ private volatile boolean streaming;
+ private volatile long contentLength;
- Http1Request(HttpRequestImpl request, HttpClientImpl client, HttpConnection connection)
+ Http1Request(HttpRequestImpl request,
+ Http1Exchange<?> http1Exchange)
throws IOException
{
- this.client = client;
this.request = request;
- this.chan = connection;
- buffers = new ByteBuffer[5]; // TODO: check
- this.requestProc = request.requestProcessor;
+ this.http1Exchange = http1Exchange;
+ this.connection = http1Exchange.connection();
+ this.requestPublisher = request.requestPublisher; // may be null
this.userHeaders = request.getUserHeaders();
this.systemHeaders = request.getSystemHeaders();
- this.cf = new MinimalFuture<>();
}
- private void logHeaders() throws IOException {
- StringBuilder sb = new StringBuilder(256);
- sb.append("REQUEST HEADERS:\n");
- Log.dumpHeaders(sb, " ", systemHeaders);
- Log.dumpHeaders(sb, " ", userHeaders);
- Log.logHeaders(sb.toString());
- }
+ private void logHeaders(String completeHeaders) {
+ if (Log.headers()) {
+ //StringBuilder sb = new StringBuilder(256);
+ //sb.append("REQUEST HEADERS:\n");
+ //Log.dumpHeaders(sb, " ", systemHeaders);
+ //Log.dumpHeaders(sb, " ", userHeaders);
+ //Log.logHeaders(sb.toString());
- private void dummy(long x) {
- // not used in this class
+ String s = completeHeaders.replaceAll("\r\n", "\n");
+ Log.logHeaders("REQUEST HEADERS:\n" + s);
}
-
- private void collectHeaders0() throws IOException {
- if (Log.headers()) {
- logHeaders();
}
- StringBuilder sb = new StringBuilder(256);
- collectHeaders1(sb, request, systemHeaders);
- collectHeaders1(sb, request, userHeaders);
+
+ private void collectHeaders0(StringBuilder sb) {
+ collectHeaders1(sb, systemHeaders);
+ collectHeaders1(sb, userHeaders);
sb.append("\r\n");
- String headers = sb.toString();
- buffers[1] = ByteBuffer.wrap(headers.getBytes(StandardCharsets.US_ASCII));
}
- private void collectHeaders1(StringBuilder sb,
- HttpRequestImpl request,
- HttpHeaders headers)
- throws IOException
- {
- Map<String,List<String>> h = headers.map();
- Set<Map.Entry<String,List<String>>> entries = h.entrySet();
-
- for (Map.Entry<String,List<String>> entry : entries) {
+ private void collectHeaders1(StringBuilder sb, HttpHeaders headers) {
+ for (Map.Entry<String,List<String>> entry : headers.map().entrySet()) {
String key = entry.getKey();
List<String> values = entry.getValue();
for (String value : values) {
- sb.append(key)
- .append(": ")
- .append(value)
- .append("\r\n");
+ sb.append(key).append(": ").append(value).append("\r\n");
}
}
}
private String getPathAndQuery(URI uri) {
@@ -164,11 +140,11 @@
private String requestURI() {
URI uri = request.uri();
String method = request.method();
- if ((request.proxy(client) == null && !method.equals("CONNECT"))
+ if ((request.proxy() == null && !method.equals("CONNECT"))
|| request.isWebSocket()) {
return getPathAndQuery(uri);
}
if (request.secure()) {
if (request.method().equals("CONNECT")) {
@@ -177,29 +153,16 @@
} else {
// requests over tunnel do not require full URL
return getPathAndQuery(uri);
}
}
- return uri == null? authorityString(request.authority()) : uri.toString();
- }
-
- void sendHeadersOnly() throws IOException {
- collectHeaders();
- chan.write(buffers, 0, 2);
+ if (request.method().equals("CONNECT")) {
+ // use authority for connect itself
+ return authorityString(request.authority());
}
- void sendRequest() throws IOException {
- collectHeaders();
- chan.configureMode(Mode.BLOCKING);
- if (contentLength == 0) {
- chan.write(buffers, 0, 2);
- } else if (contentLength > 0) {
- writeFixedContent(true);
- } else {
- writeStreamedContent(true);
- }
- setFinished();
+ return uri == null? authorityString(request.authority()) : uri.toString();
}
private boolean finished;
synchronized boolean finished() {
@@ -208,267 +171,204 @@
synchronized void setFinished() {
finished = true;
}
- private void collectHeaders() throws IOException {
+ List<ByteBuffer> headers() {
if (Log.requests() && request != null) {
Log.logRequest(request.toString());
}
String uriString = requestURI();
StringBuilder sb = new StringBuilder(64);
sb.append(request.method())
.append(' ')
.append(uriString)
.append(" HTTP/1.1\r\n");
- String cmd = sb.toString();
- buffers[0] = ByteBuffer.wrap(cmd.getBytes(StandardCharsets.US_ASCII));
URI uri = request.uri();
if (uri != null) {
systemHeaders.setHeader("Host", hostString());
}
- if (request == null) {
- // this is not a user request. No content
+ if (request == null || requestPublisher == null) {
+ // Not a user request, or maybe a method, e.g. GET, with no body.
contentLength = 0;
} else {
- contentLength = requestProc.contentLength();
+ contentLength = requestPublisher.contentLength();
}
if (contentLength == 0) {
systemHeaders.setHeader("Content-Length", "0");
- collectHeaders0();
} else if (contentLength > 0) {
- /* [0] request line [1] headers [2] body */
- systemHeaders.setHeader("Content-Length",
- Integer.toString((int) contentLength));
+ systemHeaders.setHeader("Content-Length", Long.toString(contentLength));
streaming = false;
- collectHeaders0();
- buffers[2] = getBuffer();
} else {
- /* Chunked:
- *
- * [0] request line [1] headers [2] chunk header [3] chunk data [4]
- * final chunk header and trailing CRLF of previous chunks
- *
- * 2,3,4 used repeatedly */
streaming = true;
systemHeaders.setHeader("Transfer-encoding", "chunked");
- collectHeaders0();
- buffers[3] = getBuffer();
- }
}
-
- private ByteBuffer getBuffer() {
- return ByteBuffer.allocate(Utils.BUFSIZE);
+ collectHeaders0(sb);
+ String hs = sb.toString();
+ logHeaders(hs);
+ ByteBuffer b = ByteBuffer.wrap(hs.getBytes(US_ASCII));
+ return List.of(b);
}
- // The following two methods used by Http1Exchange to handle expect continue
-
- void continueRequest() throws IOException {
+ Http1BodySubscriber continueRequest() {
+ Http1BodySubscriber subscriber;
if (streaming) {
- writeStreamedContent(false);
+ subscriber = new StreamSubscriber();
+ requestPublisher.subscribe(subscriber);
} else {
- writeFixedContent(false);
+ if (contentLength == 0)
+ return null;
+
+ subscriber = new FixedContentSubscriber();
+ requestPublisher.subscribe(subscriber);
}
- setFinished();
+ return subscriber;
}
- class StreamSubscriber implements Flow.Subscriber<ByteBuffer> {
- volatile Flow.Subscription subscription;
- volatile boolean includeHeaders;
-
- StreamSubscriber(boolean includeHeaders) {
- this.includeHeaders = includeHeaders;
- }
+ class StreamSubscriber extends Http1BodySubscriber {
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (this.subscription != null) {
- throw new IllegalStateException("already subscribed");
- }
+ Throwable t = new IllegalStateException("already subscribed");
+ http1Exchange.appendToOutgoing(t);
+ } else {
this.subscription = subscription;
- subscription.request(1);
+ }
}
@Override
public void onNext(ByteBuffer item) {
- int startbuf, nbufs;
-
- if (cf.isDone()) {
- throw new IllegalStateException("subscription already completed");
- }
-
- if (includeHeaders) {
- startbuf = 0;
- nbufs = 5;
+ Objects.requireNonNull(item);
+ if (complete) {
+ Throwable t = new IllegalStateException("subscription already completed");
+ http1Exchange.appendToOutgoing(t);
} else {
- startbuf = 2;
- nbufs = 3;
- }
int chunklen = item.remaining();
- buffers[3] = item;
- buffers[2] = getHeader(chunklen);
- buffers[4] = CRLF_BUFFER();
- try {
- chan.write(buffers, startbuf, nbufs);
- } catch (IOException e) {
- subscription.cancel();
- cf.completeExceptionally(e);
+ ArrayList<ByteBuffer> l = new ArrayList<>(3);
+ l.add(getHeader(chunklen));
+ l.add(item);
+ l.add(ByteBuffer.wrap(CRLF));
+ http1Exchange.appendToOutgoing(l);
}
- includeHeaders = false;
- subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
- if (cf.isDone()) {
+ if (complete)
return;
- }
+
subscription.cancel();
- cf.completeExceptionally(throwable);
+ http1Exchange.appendToOutgoing(throwable);
}
@Override
public void onComplete() {
- if (cf.isDone()) {
- throw new IllegalStateException("subscription already completed");
- }
- buffers[3] = EMPTY_CHUNK_HEADER();
- buffers[4] = CRLF_BUFFER();
- try {
- chan.write(buffers, 3, 2);
- } catch (IOException ex) {
- cf.completeExceptionally(ex);
- return;
- }
- cf.complete(null);
- }
- }
+ if (complete) {
+ Throwable t = new IllegalStateException("subscription already completed");
+ http1Exchange.appendToOutgoing(t);
+ } else {
+ ArrayList<ByteBuffer> l = new ArrayList<>(2);
+ l.add(ByteBuffer.wrap(EMPTY_CHUNK_BYTES));
+ l.add(ByteBuffer.wrap(CRLF));
+ complete = true;
+ //setFinished();
+ http1Exchange.appendToOutgoing(l);
+ http1Exchange.appendToOutgoing(COMPLETED);
+ setFinished(); // TODO: before or after,? does it matter?
- private void waitForCompletion() throws IOException {
- try {
- cf.join();
- } catch (CompletionException e) {
- throw Utils.getIOException(e);
}
}
-
- /* Entire request is sent, or just body only */
- private void writeStreamedContent(boolean includeHeaders)
- throws IOException
- {
- StreamSubscriber subscriber = new StreamSubscriber(includeHeaders);
- requestProc.subscribe(subscriber);
- waitForCompletion();
}
- class FixedContentSubscriber implements Flow.Subscriber<ByteBuffer>
- {
- volatile Flow.Subscription subscription;
- volatile boolean includeHeaders;
- volatile long contentWritten = 0;
+ class FixedContentSubscriber extends Http1BodySubscriber {
- FixedContentSubscriber(boolean includeHeaders) {
- this.includeHeaders = includeHeaders;
- }
+ private volatile long contentWritten;
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (this.subscription != null) {
- throw new IllegalStateException("already subscribed");
- }
+ Throwable t = new IllegalStateException("already subscribed");
+ http1Exchange.appendToOutgoing(t);
+ } else {
this.subscription = subscription;
- subscription.request(1);
+ }
}
@Override
public void onNext(ByteBuffer item) {
- int startbuf, nbufs;
- long headersLength;
+ debug.log(Level.DEBUG, "onNext");
+ Objects.requireNonNull(item);
+ if (complete) {
+ Throwable t = new IllegalStateException("subscription already completed");
+ http1Exchange.appendToOutgoing(t);
+ } else {
+ long writing = item.remaining();
+ long written = (contentWritten += writing);
- if (includeHeaders) {
- startbuf = 0;
- nbufs = 3;
- headersLength = buffers[0].remaining() + buffers[1].remaining();
- } else {
- startbuf = 2;
- nbufs = 1;
- headersLength = 0;
- }
- buffers[2] = item;
- try {
- long writing = buffers[2].remaining() + headersLength;
- contentWritten += buffers[2].remaining();
- chan.checkWrite(writing, buffers, startbuf, nbufs);
-
- if (contentWritten > contentLength) {
- String msg = "Too many bytes in request body. Expected: " +
- Long.toString(contentLength) + " Sent: " +
- Long.toString(contentWritten);
- throw new IOException(msg);
- }
- subscription.request(1);
- } catch (IOException e) {
+ if (written > contentLength) {
subscription.cancel();
- cf.completeExceptionally(e);
+ String msg = connection.getConnectionFlow()
+ + " [" + Thread.currentThread().getName() +"] "
+ + "Too many bytes in request body. Expected: "
+ + contentLength + ", got: " + written;
+ http1Exchange.appendToOutgoing(new IOException(msg));
+ } else {
+ http1Exchange.appendToOutgoing(List.of(item));
+ }
}
}
@Override
public void onError(Throwable throwable) {
- if (cf.isDone()) {
+ debug.log(Level.DEBUG, "onError");
+ if (complete) // TODO: error?
return;
- }
+
subscription.cancel();
- cf.completeExceptionally(throwable);
+ http1Exchange.appendToOutgoing(throwable);
}
@Override
public void onComplete() {
- if (cf.isDone()) {
- throw new IllegalStateException("subscription already completed");
- }
-
- if (contentLength > contentWritten) {
+ debug.log(Level.DEBUG, "onComplete");
+ if (complete) {
+ Throwable t = new IllegalStateException("subscription already completed");
+ http1Exchange.appendToOutgoing(t);
+ } else {
+ complete = true;
+ long written = contentWritten;
+ if (contentLength > written) {
subscription.cancel();
- Exception e = new IOException("Too few bytes returned by the processor");
- cf.completeExceptionally(e);
+ Throwable t = new IOException(connection.getConnectionFlow()
+ + " [" + Thread.currentThread().getName() +"] "
+ + "Too few bytes returned by the publisher ("
+ + written + "/"
+ + contentLength + ")");
+ http1Exchange.appendToOutgoing(t);
} else {
- cf.complete(null);
- }
+ http1Exchange.appendToOutgoing(COMPLETED);
}
}
-
- /* Entire request is sent, or just body only */
- private void writeFixedContent(boolean includeHeaders)
- throws IOException {
- if (contentLength == 0) {
- return;
}
- FixedContentSubscriber subscriber = new FixedContentSubscriber(includeHeaders);
- requestProc.subscribe(subscriber);
- waitForCompletion();
}
private static final byte[] CRLF = {'\r', '\n'};
private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'};
- private ByteBuffer CRLF_BUFFER() {
- return ByteBuffer.wrap(CRLF);
- }
-
- private ByteBuffer EMPTY_CHUNK_HEADER() {
- return ByteBuffer.wrap(EMPTY_CHUNK_BYTES);
- }
-
- /* Returns a header for a particular chunk size */
- private static ByteBuffer getHeader(int size){
+ /** Returns a header for a particular chunk size */
+ private static ByteBuffer getHeader(int size) {
String hexStr = Integer.toHexString(size);
byte[] hexBytes = hexStr.getBytes(US_ASCII);
byte[] header = new byte[hexStr.length()+2];
System.arraycopy(hexBytes, 0, header, 0, hexBytes.length);
header[hexBytes.length] = CRLF[0];
header[hexBytes.length+1] = CRLF[1];
return ByteBuffer.wrap(header);
}
+
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ final System.Logger debug = Utils.getDebugLogger(this::toString, DEBUG);
+
}
< prev index next >