< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java
Print this page
*** 1,7 ****
/*
! * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
--- 1,7 ----
/*
! * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
*** 24,124 ****
*/
package jdk.incubator.http;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
- import java.util.Set;
import java.net.InetSocketAddress;
! import jdk.incubator.http.HttpConnection.Mode;
! import java.nio.charset.StandardCharsets;
! import static java.nio.charset.StandardCharsets.US_ASCII;
! import java.util.concurrent.CompletableFuture;
! import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;
import jdk.incubator.http.internal.common.HttpHeadersImpl;
import jdk.incubator.http.internal.common.Log;
- import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;
/**
! * A HTTP/1.1 request.
! *
! * send() -> Writes the request + body to the given channel, in one blocking
! * operation.
*/
class Http1Request {
! final HttpClientImpl client;
! final HttpRequestImpl request;
! final HttpConnection chan;
! // Multiple buffers are used to hold different parts of request
! // See line 206 and below for description
! final ByteBuffer[] buffers;
! final HttpRequest.BodyProcessor requestProc;
! final HttpHeaders userHeaders;
! final HttpHeadersImpl systemHeaders;
! boolean streaming;
! long contentLength;
! final CompletableFuture<Void> cf;
! Http1Request(HttpRequestImpl request, HttpClientImpl client, HttpConnection connection)
throws IOException
{
- this.client = client;
this.request = request;
! this.chan = connection;
! buffers = new ByteBuffer[5]; // TODO: check
! this.requestProc = request.requestProcessor;
this.userHeaders = request.getUserHeaders();
this.systemHeaders = request.getSystemHeaders();
- this.cf = new MinimalFuture<>();
}
! private void logHeaders() throws IOException {
! StringBuilder sb = new StringBuilder(256);
! sb.append("REQUEST HEADERS:\n");
! Log.dumpHeaders(sb, " ", systemHeaders);
! Log.dumpHeaders(sb, " ", userHeaders);
! Log.logHeaders(sb.toString());
! }
! private void dummy(long x) {
! // not used in this class
}
-
- private void collectHeaders0() throws IOException {
- if (Log.headers()) {
- logHeaders();
}
! StringBuilder sb = new StringBuilder(256);
! collectHeaders1(sb, request, systemHeaders);
! collectHeaders1(sb, request, userHeaders);
sb.append("\r\n");
- String headers = sb.toString();
- buffers[1] = ByteBuffer.wrap(headers.getBytes(StandardCharsets.US_ASCII));
}
! private void collectHeaders1(StringBuilder sb,
! HttpRequestImpl request,
! HttpHeaders headers)
! throws IOException
! {
! Map<String,List<String>> h = headers.map();
! Set<Map.Entry<String,List<String>>> entries = h.entrySet();
!
! for (Map.Entry<String,List<String>> entry : entries) {
String key = entry.getKey();
List<String> values = entry.getValue();
for (String value : values) {
! sb.append(key)
! .append(": ")
! .append(value)
! .append("\r\n");
}
}
}
private String getPathAndQuery(URI uri) {
--- 24,100 ----
*/
package jdk.incubator.http;
import java.io.IOException;
+ import java.lang.System.Logger.Level;
import java.net.URI;
import java.nio.ByteBuffer;
+ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.net.InetSocketAddress;
! import java.util.Objects;
import java.util.concurrent.Flow;
+ import jdk.incubator.http.Http1Exchange.Http1BodySubscriber;
import jdk.incubator.http.internal.common.HttpHeadersImpl;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.Utils;
+ import static java.nio.charset.StandardCharsets.US_ASCII;
+
/**
! * An HTTP/1.1 request.
*/
class Http1Request {
! private final HttpRequestImpl request;
! private final Http1Exchange<?> http1Exchange;
! private final HttpConnection connection;
! private final HttpRequest.BodyPublisher requestPublisher;
! private final HttpHeaders userHeaders;
! private final HttpHeadersImpl systemHeaders;
! private volatile boolean streaming;
! private volatile long contentLength;
! Http1Request(HttpRequestImpl request,
! Http1Exchange<?> http1Exchange)
throws IOException
{
this.request = request;
! this.http1Exchange = http1Exchange;
! this.connection = http1Exchange.connection();
! this.requestPublisher = request.requestPublisher; // may be null
this.userHeaders = request.getUserHeaders();
this.systemHeaders = request.getSystemHeaders();
}
! private void logHeaders(String completeHeaders) {
! if (Log.headers()) {
! //StringBuilder sb = new StringBuilder(256);
! //sb.append("REQUEST HEADERS:\n");
! //Log.dumpHeaders(sb, " ", systemHeaders);
! //Log.dumpHeaders(sb, " ", userHeaders);
! //Log.logHeaders(sb.toString());
! String s = completeHeaders.replaceAll("\r\n", "\n");
! Log.logHeaders("REQUEST HEADERS:\n" + s);
}
}
!
! private void collectHeaders0(StringBuilder sb) {
! collectHeaders1(sb, systemHeaders);
! collectHeaders1(sb, userHeaders);
sb.append("\r\n");
}
! private void collectHeaders1(StringBuilder sb, HttpHeaders headers) {
! for (Map.Entry<String,List<String>> entry : headers.map().entrySet()) {
String key = entry.getKey();
List<String> values = entry.getValue();
for (String value : values) {
! sb.append(key).append(": ").append(value).append("\r\n");
}
}
}
private String getPathAndQuery(URI uri) {
*** 164,174 ****
private String requestURI() {
URI uri = request.uri();
String method = request.method();
! if ((request.proxy(client) == null && !method.equals("CONNECT"))
|| request.isWebSocket()) {
return getPathAndQuery(uri);
}
if (request.secure()) {
if (request.method().equals("CONNECT")) {
--- 140,150 ----
private String requestURI() {
URI uri = request.uri();
String method = request.method();
! if ((request.proxy() == null && !method.equals("CONNECT"))
|| request.isWebSocket()) {
return getPathAndQuery(uri);
}
if (request.secure()) {
if (request.method().equals("CONNECT")) {
*** 177,205 ****
} else {
// requests over tunnel do not require full URL
return getPathAndQuery(uri);
}
}
! return uri == null? authorityString(request.authority()) : uri.toString();
! }
!
! void sendHeadersOnly() throws IOException {
! collectHeaders();
! chan.write(buffers, 0, 2);
}
! void sendRequest() throws IOException {
! collectHeaders();
! chan.configureMode(Mode.BLOCKING);
! if (contentLength == 0) {
! chan.write(buffers, 0, 2);
! } else if (contentLength > 0) {
! writeFixedContent(true);
! } else {
! writeStreamedContent(true);
! }
! setFinished();
}
private boolean finished;
synchronized boolean finished() {
--- 153,168 ----
} else {
// requests over tunnel do not require full URL
return getPathAndQuery(uri);
}
}
! if (request.method().equals("CONNECT")) {
! // use authority for connect itself
! return authorityString(request.authority());
}
! return uri == null? authorityString(request.authority()) : uri.toString();
}
private boolean finished;
synchronized boolean finished() {
*** 208,474 ****
synchronized void setFinished() {
finished = true;
}
! private void collectHeaders() throws IOException {
if (Log.requests() && request != null) {
Log.logRequest(request.toString());
}
String uriString = requestURI();
StringBuilder sb = new StringBuilder(64);
sb.append(request.method())
.append(' ')
.append(uriString)
.append(" HTTP/1.1\r\n");
- String cmd = sb.toString();
- buffers[0] = ByteBuffer.wrap(cmd.getBytes(StandardCharsets.US_ASCII));
URI uri = request.uri();
if (uri != null) {
systemHeaders.setHeader("Host", hostString());
}
! if (request == null) {
! // this is not a user request. No content
contentLength = 0;
} else {
! contentLength = requestProc.contentLength();
}
if (contentLength == 0) {
systemHeaders.setHeader("Content-Length", "0");
- collectHeaders0();
} else if (contentLength > 0) {
! /* [0] request line [1] headers [2] body */
! systemHeaders.setHeader("Content-Length",
! Integer.toString((int) contentLength));
streaming = false;
- collectHeaders0();
- buffers[2] = getBuffer();
} else {
- /* Chunked:
- *
- * [0] request line [1] headers [2] chunk header [3] chunk data [4]
- * final chunk header and trailing CRLF of previous chunks
- *
- * 2,3,4 used repeatedly */
streaming = true;
systemHeaders.setHeader("Transfer-encoding", "chunked");
- collectHeaders0();
- buffers[3] = getBuffer();
- }
}
!
! private ByteBuffer getBuffer() {
! return ByteBuffer.allocate(Utils.BUFSIZE);
}
! // The following two methods used by Http1Exchange to handle expect continue
!
! void continueRequest() throws IOException {
if (streaming) {
! writeStreamedContent(false);
} else {
! writeFixedContent(false);
}
! setFinished();
}
! class StreamSubscriber implements Flow.Subscriber<ByteBuffer> {
! volatile Flow.Subscription subscription;
! volatile boolean includeHeaders;
!
! StreamSubscriber(boolean includeHeaders) {
! this.includeHeaders = includeHeaders;
! }
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (this.subscription != null) {
! throw new IllegalStateException("already subscribed");
! }
this.subscription = subscription;
! subscription.request(1);
}
@Override
public void onNext(ByteBuffer item) {
! int startbuf, nbufs;
!
! if (cf.isDone()) {
! throw new IllegalStateException("subscription already completed");
! }
!
! if (includeHeaders) {
! startbuf = 0;
! nbufs = 5;
} else {
- startbuf = 2;
- nbufs = 3;
- }
int chunklen = item.remaining();
! buffers[3] = item;
! buffers[2] = getHeader(chunklen);
! buffers[4] = CRLF_BUFFER();
! try {
! chan.write(buffers, startbuf, nbufs);
! } catch (IOException e) {
! subscription.cancel();
! cf.completeExceptionally(e);
}
- includeHeaders = false;
- subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
! if (cf.isDone()) {
return;
! }
subscription.cancel();
! cf.completeExceptionally(throwable);
}
@Override
public void onComplete() {
! if (cf.isDone()) {
! throw new IllegalStateException("subscription already completed");
! }
! buffers[3] = EMPTY_CHUNK_HEADER();
! buffers[4] = CRLF_BUFFER();
! try {
! chan.write(buffers, 3, 2);
! } catch (IOException ex) {
! cf.completeExceptionally(ex);
! return;
! }
! cf.complete(null);
! }
! }
- private void waitForCompletion() throws IOException {
- try {
- cf.join();
- } catch (CompletionException e) {
- throw Utils.getIOException(e);
}
}
-
- /* Entire request is sent, or just body only */
- private void writeStreamedContent(boolean includeHeaders)
- throws IOException
- {
- StreamSubscriber subscriber = new StreamSubscriber(includeHeaders);
- requestProc.subscribe(subscriber);
- waitForCompletion();
}
! class FixedContentSubscriber implements Flow.Subscriber<ByteBuffer>
! {
! volatile Flow.Subscription subscription;
! volatile boolean includeHeaders;
! volatile long contentWritten = 0;
! FixedContentSubscriber(boolean includeHeaders) {
! this.includeHeaders = includeHeaders;
! }
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (this.subscription != null) {
! throw new IllegalStateException("already subscribed");
! }
this.subscription = subscription;
! subscription.request(1);
}
@Override
public void onNext(ByteBuffer item) {
! int startbuf, nbufs;
! long headersLength;
! if (includeHeaders) {
! startbuf = 0;
! nbufs = 3;
! headersLength = buffers[0].remaining() + buffers[1].remaining();
! } else {
! startbuf = 2;
! nbufs = 1;
! headersLength = 0;
! }
! buffers[2] = item;
! try {
! long writing = buffers[2].remaining() + headersLength;
! contentWritten += buffers[2].remaining();
! chan.checkWrite(writing, buffers, startbuf, nbufs);
!
! if (contentWritten > contentLength) {
! String msg = "Too many bytes in request body. Expected: " +
! Long.toString(contentLength) + " Sent: " +
! Long.toString(contentWritten);
! throw new IOException(msg);
! }
! subscription.request(1);
! } catch (IOException e) {
subscription.cancel();
! cf.completeExceptionally(e);
}
}
@Override
public void onError(Throwable throwable) {
! if (cf.isDone()) {
return;
! }
subscription.cancel();
! cf.completeExceptionally(throwable);
}
@Override
public void onComplete() {
! if (cf.isDone()) {
! throw new IllegalStateException("subscription already completed");
! }
!
! if (contentLength > contentWritten) {
subscription.cancel();
! Exception e = new IOException("Too few bytes returned by the processor");
! cf.completeExceptionally(e);
} else {
! cf.complete(null);
! }
}
}
-
- /* Entire request is sent, or just body only */
- private void writeFixedContent(boolean includeHeaders)
- throws IOException {
- if (contentLength == 0) {
- return;
}
- FixedContentSubscriber subscriber = new FixedContentSubscriber(includeHeaders);
- requestProc.subscribe(subscriber);
- waitForCompletion();
}
private static final byte[] CRLF = {'\r', '\n'};
private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'};
! private ByteBuffer CRLF_BUFFER() {
! return ByteBuffer.wrap(CRLF);
! }
!
! private ByteBuffer EMPTY_CHUNK_HEADER() {
! return ByteBuffer.wrap(EMPTY_CHUNK_BYTES);
! }
!
! /* Returns a header for a particular chunk size */
! private static ByteBuffer getHeader(int size){
String hexStr = Integer.toHexString(size);
byte[] hexBytes = hexStr.getBytes(US_ASCII);
byte[] header = new byte[hexStr.length()+2];
System.arraycopy(hexBytes, 0, header, 0, hexBytes.length);
header[hexBytes.length] = CRLF[0];
header[hexBytes.length+1] = CRLF[1];
return ByteBuffer.wrap(header);
}
}
--- 171,374 ----
synchronized void setFinished() {
finished = true;
}
! List<ByteBuffer> headers() {
if (Log.requests() && request != null) {
Log.logRequest(request.toString());
}
String uriString = requestURI();
StringBuilder sb = new StringBuilder(64);
sb.append(request.method())
.append(' ')
.append(uriString)
.append(" HTTP/1.1\r\n");
URI uri = request.uri();
if (uri != null) {
systemHeaders.setHeader("Host", hostString());
}
! if (request == null || requestPublisher == null) {
! // Not a user request, or maybe a method, e.g. GET, with no body.
contentLength = 0;
} else {
! contentLength = requestPublisher.contentLength();
}
if (contentLength == 0) {
systemHeaders.setHeader("Content-Length", "0");
} else if (contentLength > 0) {
! systemHeaders.setHeader("Content-Length", Long.toString(contentLength));
streaming = false;
} else {
streaming = true;
systemHeaders.setHeader("Transfer-encoding", "chunked");
}
! collectHeaders0(sb);
! String hs = sb.toString();
! logHeaders(hs);
! ByteBuffer b = ByteBuffer.wrap(hs.getBytes(US_ASCII));
! return List.of(b);
}
! Http1BodySubscriber continueRequest() {
! Http1BodySubscriber subscriber;
if (streaming) {
! subscriber = new StreamSubscriber();
! requestPublisher.subscribe(subscriber);
} else {
! if (contentLength == 0)
! return null;
!
! subscriber = new FixedContentSubscriber();
! requestPublisher.subscribe(subscriber);
}
! return subscriber;
}
! class StreamSubscriber extends Http1BodySubscriber {
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (this.subscription != null) {
! Throwable t = new IllegalStateException("already subscribed");
! http1Exchange.appendToOutgoing(t);
! } else {
this.subscription = subscription;
! }
}
@Override
public void onNext(ByteBuffer item) {
! Objects.requireNonNull(item);
! if (complete) {
! Throwable t = new IllegalStateException("subscription already completed");
! http1Exchange.appendToOutgoing(t);
} else {
int chunklen = item.remaining();
! ArrayList<ByteBuffer> l = new ArrayList<>(3);
! l.add(getHeader(chunklen));
! l.add(item);
! l.add(ByteBuffer.wrap(CRLF));
! http1Exchange.appendToOutgoing(l);
}
}
@Override
public void onError(Throwable throwable) {
! if (complete)
return;
!
subscription.cancel();
! http1Exchange.appendToOutgoing(throwable);
}
@Override
public void onComplete() {
! if (complete) {
! Throwable t = new IllegalStateException("subscription already completed");
! http1Exchange.appendToOutgoing(t);
! } else {
! ArrayList<ByteBuffer> l = new ArrayList<>(2);
! l.add(ByteBuffer.wrap(EMPTY_CHUNK_BYTES));
! l.add(ByteBuffer.wrap(CRLF));
! complete = true;
! //setFinished();
! http1Exchange.appendToOutgoing(l);
! http1Exchange.appendToOutgoing(COMPLETED);
! setFinished(); // TODO: before or after,? does it matter?
}
}
}
! class FixedContentSubscriber extends Http1BodySubscriber {
! private volatile long contentWritten;
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (this.subscription != null) {
! Throwable t = new IllegalStateException("already subscribed");
! http1Exchange.appendToOutgoing(t);
! } else {
this.subscription = subscription;
! }
}
@Override
public void onNext(ByteBuffer item) {
! debug.log(Level.DEBUG, "onNext");
! Objects.requireNonNull(item);
! if (complete) {
! Throwable t = new IllegalStateException("subscription already completed");
! http1Exchange.appendToOutgoing(t);
! } else {
! long writing = item.remaining();
! long written = (contentWritten += writing);
! if (written > contentLength) {
subscription.cancel();
! String msg = connection.getConnectionFlow()
! + " [" + Thread.currentThread().getName() +"] "
! + "Too many bytes in request body. Expected: "
! + contentLength + ", got: " + written;
! http1Exchange.appendToOutgoing(new IOException(msg));
! } else {
! http1Exchange.appendToOutgoing(List.of(item));
! }
}
}
@Override
public void onError(Throwable throwable) {
! debug.log(Level.DEBUG, "onError");
! if (complete) // TODO: error?
return;
!
subscription.cancel();
! http1Exchange.appendToOutgoing(throwable);
}
@Override
public void onComplete() {
! debug.log(Level.DEBUG, "onComplete");
! if (complete) {
! Throwable t = new IllegalStateException("subscription already completed");
! http1Exchange.appendToOutgoing(t);
! } else {
! complete = true;
! long written = contentWritten;
! if (contentLength > written) {
subscription.cancel();
! Throwable t = new IOException(connection.getConnectionFlow()
! + " [" + Thread.currentThread().getName() +"] "
! + "Too few bytes returned by the publisher ("
! + written + "/"
! + contentLength + ")");
! http1Exchange.appendToOutgoing(t);
} else {
! http1Exchange.appendToOutgoing(COMPLETED);
}
}
}
}
private static final byte[] CRLF = {'\r', '\n'};
private static final byte[] EMPTY_CHUNK_BYTES = {'0', '\r', '\n'};
! /** Returns a header for a particular chunk size */
! private static ByteBuffer getHeader(int size) {
String hexStr = Integer.toHexString(size);
byte[] hexBytes = hexStr.getBytes(US_ASCII);
byte[] header = new byte[hexStr.length()+2];
System.arraycopy(hexBytes, 0, header, 0, hexBytes.length);
header[hexBytes.length] = CRLF[0];
header[hexBytes.length+1] = CRLF[1];
return ByteBuffer.wrap(header);
}
+
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ final System.Logger debug = Utils.getDebugLogger(this::toString, DEBUG);
+
}
< prev index next >