< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java
Print this page
*** 24,35 ****
*/
package jdk.incubator.http;
import java.io.IOException;
import java.nio.ByteBuffer;
! import java.util.Optional;
import java.util.function.Consumer;
import jdk.incubator.http.internal.common.Utils;
/**
* Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
--- 24,37 ----
*/
package jdk.incubator.http;
import java.io.IOException;
+ import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
! import java.util.ArrayList;
! import java.util.List;
import java.util.function.Consumer;
import jdk.incubator.http.internal.common.Utils;
/**
* Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
*** 37,85 ****
* Call pushBody() to read the body (blocking). Data and errors are provided
* to given Consumers. After final buffer delivered, empty optional delivered
*/
class ResponseContent {
! final HttpResponse.BodyProcessor<?> pusher;
! final HttpConnection connection;
final int contentLength;
! ByteBuffer buffer;
! //ByteBuffer lastBufferUsed;
! final ResponseHeaders headers;
! private final Consumer<Optional<ByteBuffer>> dataConsumer;
! private final Consumer<IOException> errorConsumer;
! private final HttpClientImpl client;
// this needs to run before we complete the body
// so that connection can be returned to pool
private final Runnable onFinished;
ResponseContent(HttpConnection connection,
int contentLength,
! ResponseHeaders h,
! HttpResponse.BodyProcessor<?> userProcessor,
! Consumer<Optional<ByteBuffer>> dataConsumer,
! Consumer<IOException> errorConsumer,
Runnable onFinished)
{
! this.pusher = (HttpResponse.BodyProcessor)userProcessor;
! this.connection = connection;
this.contentLength = contentLength;
this.headers = h;
- this.dataConsumer = dataConsumer;
- this.errorConsumer = errorConsumer;
- this.client = connection.client;
this.onFinished = onFinished;
}
static final int LF = 10;
static final int CR = 13;
- static final int SP = 0x20;
- static final int BUF_SIZE = 1024;
! boolean chunkedContent, chunkedContentInitialized;
! private boolean contentChunked() throws IOException {
if (chunkedContentInitialized) {
return chunkedContent;
}
if (contentLength == -1) {
String tc = headers.firstValue("Transfer-Encoding")
--- 39,77 ----
* Call pushBody() to read the body (blocking). Data and errors are provided
* to given Consumers. After final buffer delivered, empty optional delivered
*/
class ResponseContent {
! static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
!
! final HttpResponse.BodySubscriber<?> pusher;
final int contentLength;
! final HttpHeaders headers;
// this needs to run before we complete the body
// so that connection can be returned to pool
private final Runnable onFinished;
+ private final String dbgTag;
ResponseContent(HttpConnection connection,
int contentLength,
! HttpHeaders h,
! HttpResponse.BodySubscriber<?> userSubscriber,
Runnable onFinished)
{
! this.pusher = userSubscriber;
this.contentLength = contentLength;
this.headers = h;
this.onFinished = onFinished;
+ this.dbgTag = connection.dbgString() + "/ResponseContent";
}
static final int LF = 10;
static final int CR = 13;
! private boolean chunkedContent, chunkedContentInitialized;
! boolean contentChunked() throws IOException {
if (chunkedContentInitialized) {
return chunkedContent;
}
if (contentLength == -1) {
String tc = headers.firstValue("Transfer-Encoding")
*** 96,241 ****
}
chunkedContentInitialized = true;
return chunkedContent;
}
! /**
! * Entry point for pusher. b is an initial ByteBuffer that may
! * have some data in it. When this method returns, the body
! * has been fully processed.
! */
! void pushBody(ByteBuffer b) {
! try {
! // TODO: check status
if (contentChunked()) {
! pushBodyChunked(b);
} else {
! pushBodyFixed(b);
}
! } catch (IOException t) {
! errorConsumer.accept(t);
}
}
// reads and returns chunklen. Position of chunkbuf is first byte
// of chunk on return. chunklen includes the CR LF at end of chunk
! int readChunkLen() throws IOException {
! chunklen = 0;
! boolean cr = false;
! while (true) {
! getHunk();
int c = chunkbuf.get();
if (cr) {
if (c == LF) {
! return chunklen + 2;
} else {
throw new IOException("invalid chunk header");
}
}
if (c == CR) {
cr = true;
} else {
int digit = toDigit(c);
! chunklen = chunklen * 16 + digit;
}
}
}
- int chunklen = -1; // number of bytes in chunk (fixed)
- int bytesremaining; // number of bytes in chunk left to be read incl CRLF
- int bytesread;
- ByteBuffer chunkbuf; // initialise
! // make sure we have at least 1 byte to look at
! private void getHunk() throws IOException {
! if (chunkbuf == null || !chunkbuf.hasRemaining()) {
! chunkbuf = connection.read();
! }
}
- private void consumeBytes(int n) throws IOException {
- getHunk();
- while (n > 0) {
- int e = Math.min(chunkbuf.remaining(), n);
chunkbuf.position(chunkbuf.position() + e);
n -= e;
! if (n > 0) {
! getHunk();
! }
}
}
/**
! * Returns a ByteBuffer containing a chunk of data or a "hunk" of data
* (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
! * ByteBuffer returned is obtained from response processor.
*/
! ByteBuffer readChunkedBuffer() throws IOException {
! if (chunklen == -1) {
! // new chunk
! chunklen = readChunkLen() - 2;
! bytesremaining = chunklen;
! if (chunklen == 0) {
! consumeBytes(2);
! return null;
}
}
! getHunk();
! bytesread = chunkbuf.remaining();
! ByteBuffer returnBuffer = Utils.getBuffer();
! int space = returnBuffer.remaining();
!
! int bytes2Copy = Math.min(bytesread, Math.min(bytesremaining, space));
! Utils.copy(chunkbuf, returnBuffer, bytes2Copy);
! returnBuffer.flip();
! bytesremaining -= bytes2Copy;
! if (bytesremaining == 0) {
! consumeBytes(2);
chunklen = -1;
}
- return returnBuffer;
}
!
! ByteBuffer initialBuffer;
! int fixedBytesReturned;
!
! //ByteBuffer getResidue() {
! //return lastBufferUsed;
! //}
!
! private void compactBuffer(ByteBuffer buf) {
! buf.compact()
! .flip();
}
!
! /**
! * Copies inbuf (numBytes from its position) to new buffer. The returned
! * buffer's position is zero and limit is at end (numBytes)
! */
! private ByteBuffer copyBuffer(ByteBuffer inbuf, int numBytes) {
! ByteBuffer b1 = Utils.getBuffer();
! assert b1.remaining() >= numBytes;
! byte[] b = b1.array();
! inbuf.get(b, 0, numBytes);
! b1.limit(numBytes);
! return b1;
}
! private void pushBodyChunked(ByteBuffer b) throws IOException {
! chunkbuf = b;
! while (true) {
! ByteBuffer b1 = readChunkedBuffer();
if (b1 != null) {
if (b1.hasRemaining()) {
! dataConsumer.accept(Optional.of(b1));
}
} else {
! onFinished.run();
! dataConsumer.accept(Optional.empty());
! return;
! }
}
}
private int toDigit(int b) throws IOException {
if (b >= 0x30 && b <= 0x39) {
--- 88,365 ----
}
chunkedContentInitialized = true;
return chunkedContent;
}
! interface BodyParser extends Consumer<ByteBuffer> {
! void onSubscribe(AbstractSubscription sub);
! }
!
! // Returns a parser that will take care of parsing the received byte
! // buffers and forward them to the BodySubscriber.
! // When the parser is done, it will call onComplete.
! // If parsing was successful, the throwable parameter will be null.
! // Otherwise it will be the exception that occurred
! // Note: revisit: it might be better to use a CompletableFuture than
! // a completion handler.
! BodyParser getBodyParser(Consumer<Throwable> onComplete)
! throws IOException {
if (contentChunked()) {
! return new ChunkedBodyParser(onComplete);
} else {
! return new FixedLengthBodyParser(contentLength, onComplete);
}
! }
!
!
! static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
! class ChunkedBodyParser implements BodyParser {
! final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
! final Consumer<Throwable> onComplete;
! final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
! final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";
!
! volatile Throwable closedExceptionally;
! volatile int partialChunklen = 0; // partially read chunk len
! volatile int chunklen = -1; // number of bytes in chunk
! volatile int bytesremaining; // number of bytes in chunk left to be read incl CRLF
! volatile boolean cr = false; // tryReadChunkLength has found CR
! volatile int bytesToConsume; // number of bytes that still need to be consumed before proceeding
! volatile ChunkState state = ChunkState.READING_LENGTH; // current state
! volatile AbstractSubscription sub;
! ChunkedBodyParser(Consumer<Throwable> onComplete) {
! this.onComplete = onComplete;
! }
!
! String dbgString() {
! return dbgTag;
! }
!
! @Override
! public void onSubscribe(AbstractSubscription sub) {
! debug.log(Level.DEBUG, () -> "onSubscribe: "
! + pusher.getClass().getName());
! pusher.onSubscribe(this.sub = sub);
! }
!
! @Override
! public void accept(ByteBuffer b) {
! if (closedExceptionally != null) {
! debug.log(Level.DEBUG, () -> "already closed: "
! + closedExceptionally);
! return;
! }
! boolean completed = false;
! try {
! List<ByteBuffer> out = new ArrayList<>();
! do {
! if (tryPushOneHunk(b, out)) {
! // We're done! (true if the final chunk was parsed).
! if (!out.isEmpty()) {
! // push what we have and complete
! // only reduce demand if we actually push something.
! // we would not have come here if there was no
! // demand.
! boolean hasDemand = sub.demand().tryDecrement();
! assert hasDemand;
! pusher.onNext(out);
! }
! debug.log(Level.DEBUG, () -> "done!");
! assert closedExceptionally == null;
! assert state == ChunkState.DONE;
! onFinished.run();
! pusher.onComplete();
! completed = true;
! onComplete.accept(closedExceptionally); // should be null
! break;
! }
! // the buffer may contain several hunks, and therefore
! // we must loop while it's not exhausted.
! } while (b.hasRemaining());
!
! if (!completed && !out.isEmpty()) {
! // push what we have.
! // only reduce demand if we actually push something.
! // we would not have come here if there was no
! // demand.
! boolean hasDemand = sub.demand().tryDecrement();
! assert hasDemand;
! pusher.onNext(out);
! }
! assert state == ChunkState.DONE || !b.hasRemaining();
! } catch(Throwable t) {
! closedExceptionally = t;
! if (!completed) onComplete.accept(t);
}
}
// reads and returns chunklen. Position of chunkbuf is first byte
// of chunk on return. chunklen includes the CR LF at end of chunk
! // returns -1 if needs more bytes
! private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
! assert state == ChunkState.READING_LENGTH;
! while (chunkbuf.hasRemaining()) {
int c = chunkbuf.get();
if (cr) {
if (c == LF) {
! return partialChunklen;
} else {
throw new IOException("invalid chunk header");
}
}
if (c == CR) {
cr = true;
} else {
int digit = toDigit(c);
! partialChunklen = partialChunklen * 16 + digit;
}
}
+ return -1;
}
! // try to consume as many bytes as specified by bytesToConsume.
! // returns the number of bytes that still need to be consumed.
! // In practice this method is only called to consume one CRLF pair
! // with bytesToConsume set to 2, so it will only return 0 (if completed),
! // 1, or 2 (if chunkbuf doesn't have the 2 chars).
! private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
! int n = bytesToConsume;
! if (n > 0) {
! int e = Math.min(chunkbuf.remaining(), n);
!
! // verifies some assertions
! // this methods is called only to consume CRLF
! if (Utils.ASSERTIONSENABLED) {
! assert n <= 2 && e <= 2;
! ByteBuffer tmp = chunkbuf.slice();
! // if n == 2 assert that we will first consume CR
! assert (n == 2 && e > 0) ? tmp.get() == CR : true;
! // if n == 1 || n == 2 && e == 2 assert that we then consume LF
! assert (n == 1 || e == 2) ? tmp.get() == LF : true;
}
chunkbuf.position(chunkbuf.position() + e);
n -= e;
! bytesToConsume = n;
}
+ assert n >= 0;
+ return n;
}
/**
! * Returns a ByteBuffer containing chunk of data or a "hunk" of data
* (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
! * If the given chunk does not have enough data this method return
! * an empty ByteBuffer (READMORE).
! * If we encounter the final chunk (an empty chunk) this method
! * returns null.
*/
! ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
! int unfulfilled = bytesremaining;
! int toconsume = bytesToConsume;
! ChunkState st = state;
! if (st == ChunkState.READING_LENGTH && chunklen == -1) {
! debug.log(Level.DEBUG, () -> "Trying to read chunk len"
! + " (remaining in buffer:"+chunk.remaining()+")");
! int clen = chunklen = tryReadChunkLen(chunk);
! if (clen == -1) return READMORE;
! debug.log(Level.DEBUG, "Got chunk len %d", clen);
! cr = false; partialChunklen = 0;
! unfulfilled = bytesremaining = clen;
! if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
! else st = state = ChunkState.READING_DATA; // read the data
! }
!
! if (toconsume > 0) {
! debug.log(Level.DEBUG,
! "Trying to consume bytes: %d (remaining in buffer: %s)",
! toconsume, chunk.remaining());
! if (tryConsumeBytes(chunk) > 0) {
! return READMORE;
! }
}
+
+ toconsume = bytesToConsume;
+ assert toconsume == 0;
+
+
+ if (st == ChunkState.READING_LENGTH) {
+ // we will come here only if chunklen was 0, after having
+ // consumed the trailing CRLF
+ int clen = chunklen;
+ assert clen == 0;
+ debug.log(Level.DEBUG, "No more chunks: %d", clen);
+ // the DONE state is not really needed but it helps with
+ // assertions...
+ state = ChunkState.DONE;
+ return null;
}
! int clen = chunklen;
! assert clen > 0;
! assert st == ChunkState.READING_DATA;
!
! ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
! if (unfulfilled > 0) {
! int bytesread = chunk.remaining();
! debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d",
! bytesread, unfulfilled);
!
! int bytes2return = Math.min(bytesread, unfulfilled);
! debug.log(Level.DEBUG, "Returning chunk bytes: %d", bytes2return);
! returnBuffer = Utils.slice(chunk, bytes2return);
! unfulfilled = bytesremaining -= bytes2return;
! if (unfulfilled == 0) bytesToConsume = 2;
! }
!
! assert unfulfilled >= 0;
!
! if (unfulfilled == 0) {
! debug.log(Level.DEBUG,
! "No more bytes to read - %d yet to consume.",
! unfulfilled);
! // check whether the trailing CRLF is consumed, try to
! // consume it if not. If tryConsumeBytes needs more bytes
! // then we will come back here later - skipping the block
! // that reads data because remaining==0, and finding
! // that the two bytes are now consumed.
! if (tryConsumeBytes(chunk) == 0) {
! // we're done for this chunk! reset all states and
! // prepare to read the next chunk.
chunklen = -1;
+ partialChunklen = 0;
+ cr = false;
+ state = ChunkState.READING_LENGTH;
+ debug.log(Level.DEBUG, "Ready to read next chunk");
}
}
! if (returnBuffer == READMORE) {
! debug.log(Level.DEBUG, "Need more data");
}
! return returnBuffer;
}
!
! // Attempt to parse and push one hunk from the buffer.
! // Returns true if the final chunk was parsed.
! // Returns false if we need to push more chunks.
! private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
! throws IOException {
! assert state != ChunkState.DONE;
! ByteBuffer b1 = tryReadOneHunk(b);
if (b1 != null) {
+ //assert b1.hasRemaining() || b1 == READMORE;
if (b1.hasRemaining()) {
! debug.log(Level.DEBUG, "Sending chunk to consumer (%d)",
! b1.remaining());
! out.add(b1);
! debug.log(Level.DEBUG, "Chunk sent.");
}
+ return false; // we haven't parsed the final chunk yet.
} else {
! return true; // we're done! the final chunk was parsed.
}
}
private int toDigit(int b) throws IOException {
if (b >= 0x30 && b <= 0x39) {
*** 248,279 ****
return b - 0x61 + 10;
}
throw new IOException("Invalid chunk header byte " + b);
}
- private void pushBodyFixed(ByteBuffer b) throws IOException {
- int remaining = contentLength;
- while (b.hasRemaining() && remaining > 0) {
- ByteBuffer buffer = Utils.getBuffer();
- int amount = Math.min(b.remaining(), remaining);
- Utils.copy(b, buffer, amount);
- remaining -= amount;
- buffer.flip();
- dataConsumer.accept(Optional.of(buffer));
- }
- while (remaining > 0) {
- ByteBuffer buffer = connection.read();
- if (buffer == null)
- throw new IOException("connection closed");
-
- int bytesread = buffer.remaining();
- // assume for now that pipelining not implemented
- if (bytesread > remaining) {
- throw new IOException("too many bytes read");
}
! remaining -= bytesread;
! dataConsumer.accept(Optional.of(buffer));
}
onFinished.run();
! dataConsumer.accept(Optional.empty());
}
}
--- 372,463 ----
return b - 0x61 + 10;
}
throw new IOException("Invalid chunk header byte " + b);
}
}
!
! class FixedLengthBodyParser implements BodyParser {
! final int contentLength;
! final Consumer<Throwable> onComplete;
! final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
! final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
! volatile int remaining;
! volatile Throwable closedExceptionally;
! volatile AbstractSubscription sub;
! FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) {
! this.contentLength = this.remaining = contentLength;
! this.onComplete = onComplete;
! }
!
! String dbgString() {
! return dbgTag;
! }
!
! @Override
! public void onSubscribe(AbstractSubscription sub) {
! debug.log(Level.DEBUG, () -> "length="
! + contentLength +", onSubscribe: "
! + pusher.getClass().getName());
! pusher.onSubscribe(this.sub = sub);
! try {
! if (contentLength == 0) {
! pusher.onComplete();
! onComplete.accept(null);
! }
! } catch (Throwable t) {
! closedExceptionally = t;
! try {
! pusher.onError(t);
! } finally {
! onComplete.accept(t);
! }
! }
! }
!
! @Override
! public void accept(ByteBuffer b) {
! if (closedExceptionally != null) {
! debug.log(Level.DEBUG, () -> "already closed: "
! + closedExceptionally);
! return;
}
+ boolean completed = false;
+ try {
+ int unfulfilled = remaining;
+ debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)",
+ b.remaining(), unfulfilled, contentLength);
+ assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0;
+
+ if (unfulfilled == 0 && contentLength > 0) return;
+
+ if (b.hasRemaining() && unfulfilled > 0) {
+ // only reduce demand if we actually push something.
+ // we would not have come here if there was no
+ // demand.
+ boolean hasDemand = sub.demand().tryDecrement();
+ assert hasDemand;
+ int amount = Math.min(b.remaining(), unfulfilled);
+ unfulfilled = remaining -= amount;
+ ByteBuffer buffer = Utils.slice(b, amount);
+ pusher.onNext(List.of(buffer));
+ }
+ if (unfulfilled == 0) {
+ // We're done! All data has been received.
+ assert closedExceptionally == null;
onFinished.run();
! pusher.onComplete();
! completed = true;
! onComplete.accept(closedExceptionally); // should be null
! } else {
! assert b.remaining() == 0;
! }
! } catch (Throwable t) {
! debug.log(Level.DEBUG, "Unexpected exception", t);
! closedExceptionally = t;
! if (!completed) {
! onComplete.accept(t);
! }
! }
! }
}
}
< prev index next >