< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java

Print this page

        

*** 24,35 **** */ package jdk.incubator.http; import java.io.IOException; import java.nio.ByteBuffer; ! import java.util.Optional; import java.util.function.Consumer; import jdk.incubator.http.internal.common.Utils; /** * Implements chunked/fixed transfer encodings of HTTP/1.1 responses. --- 24,37 ---- */ package jdk.incubator.http; import java.io.IOException; + import java.lang.System.Logger.Level; import java.nio.ByteBuffer; ! import java.util.ArrayList; ! import java.util.List; import java.util.function.Consumer; import jdk.incubator.http.internal.common.Utils; /** * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
*** 37,85 **** * Call pushBody() to read the body (blocking). Data and errors are provided * to given Consumers. After final buffer delivered, empty optional delivered */ class ResponseContent { ! final HttpResponse.BodyProcessor<?> pusher; ! final HttpConnection connection; final int contentLength; ! ByteBuffer buffer; ! //ByteBuffer lastBufferUsed; ! final ResponseHeaders headers; ! private final Consumer<Optional<ByteBuffer>> dataConsumer; ! private final Consumer<IOException> errorConsumer; ! private final HttpClientImpl client; // this needs to run before we complete the body // so that connection can be returned to pool private final Runnable onFinished; ResponseContent(HttpConnection connection, int contentLength, ! ResponseHeaders h, ! HttpResponse.BodyProcessor<?> userProcessor, ! Consumer<Optional<ByteBuffer>> dataConsumer, ! Consumer<IOException> errorConsumer, Runnable onFinished) { ! this.pusher = (HttpResponse.BodyProcessor)userProcessor; ! this.connection = connection; this.contentLength = contentLength; this.headers = h; - this.dataConsumer = dataConsumer; - this.errorConsumer = errorConsumer; - this.client = connection.client; this.onFinished = onFinished; } static final int LF = 10; static final int CR = 13; - static final int SP = 0x20; - static final int BUF_SIZE = 1024; ! boolean chunkedContent, chunkedContentInitialized; ! private boolean contentChunked() throws IOException { if (chunkedContentInitialized) { return chunkedContent; } if (contentLength == -1) { String tc = headers.firstValue("Transfer-Encoding") --- 39,77 ---- * Call pushBody() to read the body (blocking). Data and errors are provided * to given Consumers. After final buffer delivered, empty optional delivered */ class ResponseContent { ! static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. ! ! final HttpResponse.BodySubscriber<?> pusher; final int contentLength; ! final HttpHeaders headers; // this needs to run before we complete the body // so that connection can be returned to pool private final Runnable onFinished; + private final String dbgTag; ResponseContent(HttpConnection connection, int contentLength, ! HttpHeaders h, ! HttpResponse.BodySubscriber<?> userSubscriber, Runnable onFinished) { ! this.pusher = userSubscriber; this.contentLength = contentLength; this.headers = h; this.onFinished = onFinished; + this.dbgTag = connection.dbgString() + "/ResponseContent"; } static final int LF = 10; static final int CR = 13; ! private boolean chunkedContent, chunkedContentInitialized; ! boolean contentChunked() throws IOException { if (chunkedContentInitialized) { return chunkedContent; } if (contentLength == -1) { String tc = headers.firstValue("Transfer-Encoding")
*** 96,241 **** } chunkedContentInitialized = true; return chunkedContent; } ! /** ! * Entry point for pusher. b is an initial ByteBuffer that may ! * have some data in it. When this method returns, the body ! * has been fully processed. ! */ ! void pushBody(ByteBuffer b) { ! try { ! // TODO: check status if (contentChunked()) { ! pushBodyChunked(b); } else { ! pushBodyFixed(b); } ! } catch (IOException t) { ! errorConsumer.accept(t); } } // reads and returns chunklen. Position of chunkbuf is first byte // of chunk on return. chunklen includes the CR LF at end of chunk ! int readChunkLen() throws IOException { ! chunklen = 0; ! boolean cr = false; ! while (true) { ! getHunk(); int c = chunkbuf.get(); if (cr) { if (c == LF) { ! return chunklen + 2; } else { throw new IOException("invalid chunk header"); } } if (c == CR) { cr = true; } else { int digit = toDigit(c); ! chunklen = chunklen * 16 + digit; } } } - int chunklen = -1; // number of bytes in chunk (fixed) - int bytesremaining; // number of bytes in chunk left to be read incl CRLF - int bytesread; - ByteBuffer chunkbuf; // initialise ! // make sure we have at least 1 byte to look at ! private void getHunk() throws IOException { ! if (chunkbuf == null || !chunkbuf.hasRemaining()) { ! chunkbuf = connection.read(); ! } } - private void consumeBytes(int n) throws IOException { - getHunk(); - while (n > 0) { - int e = Math.min(chunkbuf.remaining(), n); chunkbuf.position(chunkbuf.position() + e); n -= e; ! if (n > 0) { ! getHunk(); ! } } } /** ! * Returns a ByteBuffer containing a chunk of data or a "hunk" of data * (a chunk of a chunk if the chunk size is larger than our ByteBuffers). ! * ByteBuffer returned is obtained from response processor. */ ! ByteBuffer readChunkedBuffer() throws IOException { ! if (chunklen == -1) { ! // new chunk ! chunklen = readChunkLen() - 2; ! bytesremaining = chunklen; ! if (chunklen == 0) { ! consumeBytes(2); ! return null; } } ! getHunk(); ! bytesread = chunkbuf.remaining(); ! ByteBuffer returnBuffer = Utils.getBuffer(); ! int space = returnBuffer.remaining(); ! ! int bytes2Copy = Math.min(bytesread, Math.min(bytesremaining, space)); ! Utils.copy(chunkbuf, returnBuffer, bytes2Copy); ! returnBuffer.flip(); ! bytesremaining -= bytes2Copy; ! if (bytesremaining == 0) { ! consumeBytes(2); chunklen = -1; } - return returnBuffer; } ! ! ByteBuffer initialBuffer; ! int fixedBytesReturned; ! ! //ByteBuffer getResidue() { ! //return lastBufferUsed; ! //} ! ! private void compactBuffer(ByteBuffer buf) { ! buf.compact() ! .flip(); } ! ! /** ! * Copies inbuf (numBytes from its position) to new buffer. The returned ! * buffer's position is zero and limit is at end (numBytes) ! */ ! private ByteBuffer copyBuffer(ByteBuffer inbuf, int numBytes) { ! ByteBuffer b1 = Utils.getBuffer(); ! assert b1.remaining() >= numBytes; ! byte[] b = b1.array(); ! inbuf.get(b, 0, numBytes); ! b1.limit(numBytes); ! return b1; } ! private void pushBodyChunked(ByteBuffer b) throws IOException { ! chunkbuf = b; ! while (true) { ! ByteBuffer b1 = readChunkedBuffer(); if (b1 != null) { if (b1.hasRemaining()) { ! dataConsumer.accept(Optional.of(b1)); } } else { ! onFinished.run(); ! dataConsumer.accept(Optional.empty()); ! return; ! } } } private int toDigit(int b) throws IOException { if (b >= 0x30 && b <= 0x39) { --- 88,365 ---- } chunkedContentInitialized = true; return chunkedContent; } ! interface BodyParser extends Consumer<ByteBuffer> { ! void onSubscribe(AbstractSubscription sub); ! } ! ! // Returns a parser that will take care of parsing the received byte ! // buffers and forward them to the BodySubscriber. ! // When the parser is done, it will call onComplete. ! // If parsing was successful, the throwable parameter will be null. ! // Otherwise it will be the exception that occurred ! // Note: revisit: it might be better to use a CompletableFuture than ! // a completion handler. ! BodyParser getBodyParser(Consumer<Throwable> onComplete) ! throws IOException { if (contentChunked()) { ! return new ChunkedBodyParser(onComplete); } else { ! return new FixedLengthBodyParser(contentLength, onComplete); } ! } ! ! ! static enum ChunkState {READING_LENGTH, READING_DATA, DONE} ! class ChunkedBodyParser implements BodyParser { ! final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER; ! final Consumer<Throwable> onComplete; ! final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); ! final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser"; ! ! volatile Throwable closedExceptionally; ! volatile int partialChunklen = 0; // partially read chunk len ! volatile int chunklen = -1; // number of bytes in chunk ! volatile int bytesremaining; // number of bytes in chunk left to be read incl CRLF ! volatile boolean cr = false; // tryReadChunkLength has found CR ! volatile int bytesToConsume; // number of bytes that still need to be consumed before proceeding ! volatile ChunkState state = ChunkState.READING_LENGTH; // current state ! volatile AbstractSubscription sub; ! ChunkedBodyParser(Consumer<Throwable> onComplete) { ! this.onComplete = onComplete; ! } ! ! String dbgString() { ! return dbgTag; ! } ! ! @Override ! public void onSubscribe(AbstractSubscription sub) { ! debug.log(Level.DEBUG, () -> "onSubscribe: " ! + pusher.getClass().getName()); ! pusher.onSubscribe(this.sub = sub); ! } ! ! @Override ! public void accept(ByteBuffer b) { ! if (closedExceptionally != null) { ! debug.log(Level.DEBUG, () -> "already closed: " ! + closedExceptionally); ! return; ! } ! boolean completed = false; ! try { ! List<ByteBuffer> out = new ArrayList<>(); ! do { ! if (tryPushOneHunk(b, out)) { ! // We're done! (true if the final chunk was parsed). ! if (!out.isEmpty()) { ! // push what we have and complete ! // only reduce demand if we actually push something. ! // we would not have come here if there was no ! // demand. ! boolean hasDemand = sub.demand().tryDecrement(); ! assert hasDemand; ! pusher.onNext(out); ! } ! debug.log(Level.DEBUG, () -> "done!"); ! assert closedExceptionally == null; ! assert state == ChunkState.DONE; ! onFinished.run(); ! pusher.onComplete(); ! completed = true; ! onComplete.accept(closedExceptionally); // should be null ! break; ! } ! // the buffer may contain several hunks, and therefore ! // we must loop while it's not exhausted. ! } while (b.hasRemaining()); ! ! if (!completed && !out.isEmpty()) { ! // push what we have. ! // only reduce demand if we actually push something. ! // we would not have come here if there was no ! // demand. ! boolean hasDemand = sub.demand().tryDecrement(); ! assert hasDemand; ! pusher.onNext(out); ! } ! assert state == ChunkState.DONE || !b.hasRemaining(); ! } catch(Throwable t) { ! closedExceptionally = t; ! if (!completed) onComplete.accept(t); } } // reads and returns chunklen. Position of chunkbuf is first byte // of chunk on return. chunklen includes the CR LF at end of chunk ! // returns -1 if needs more bytes ! private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException { ! assert state == ChunkState.READING_LENGTH; ! while (chunkbuf.hasRemaining()) { int c = chunkbuf.get(); if (cr) { if (c == LF) { ! return partialChunklen; } else { throw new IOException("invalid chunk header"); } } if (c == CR) { cr = true; } else { int digit = toDigit(c); ! partialChunklen = partialChunklen * 16 + digit; } } + return -1; } ! // try to consume as many bytes as specified by bytesToConsume. ! // returns the number of bytes that still need to be consumed. ! // In practice this method is only called to consume one CRLF pair ! // with bytesToConsume set to 2, so it will only return 0 (if completed), ! // 1, or 2 (if chunkbuf doesn't have the 2 chars). ! private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException { ! int n = bytesToConsume; ! if (n > 0) { ! int e = Math.min(chunkbuf.remaining(), n); ! ! // verifies some assertions ! // this methods is called only to consume CRLF ! if (Utils.ASSERTIONSENABLED) { ! assert n <= 2 && e <= 2; ! ByteBuffer tmp = chunkbuf.slice(); ! // if n == 2 assert that we will first consume CR ! assert (n == 2 && e > 0) ? tmp.get() == CR : true; ! // if n == 1 || n == 2 && e == 2 assert that we then consume LF ! assert (n == 1 || e == 2) ? tmp.get() == LF : true; } chunkbuf.position(chunkbuf.position() + e); n -= e; ! bytesToConsume = n; } + assert n >= 0; + return n; } /** ! * Returns a ByteBuffer containing chunk of data or a "hunk" of data * (a chunk of a chunk if the chunk size is larger than our ByteBuffers). ! * If the given chunk does not have enough data this method return ! * an empty ByteBuffer (READMORE). ! * If we encounter the final chunk (an empty chunk) this method ! * returns null. */ ! ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException { ! int unfulfilled = bytesremaining; ! int toconsume = bytesToConsume; ! ChunkState st = state; ! if (st == ChunkState.READING_LENGTH && chunklen == -1) { ! debug.log(Level.DEBUG, () -> "Trying to read chunk len" ! + " (remaining in buffer:"+chunk.remaining()+")"); ! int clen = chunklen = tryReadChunkLen(chunk); ! if (clen == -1) return READMORE; ! debug.log(Level.DEBUG, "Got chunk len %d", clen); ! cr = false; partialChunklen = 0; ! unfulfilled = bytesremaining = clen; ! if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk ! else st = state = ChunkState.READING_DATA; // read the data ! } ! ! if (toconsume > 0) { ! debug.log(Level.DEBUG, ! "Trying to consume bytes: %d (remaining in buffer: %s)", ! toconsume, chunk.remaining()); ! if (tryConsumeBytes(chunk) > 0) { ! return READMORE; ! } } + + toconsume = bytesToConsume; + assert toconsume == 0; + + + if (st == ChunkState.READING_LENGTH) { + // we will come here only if chunklen was 0, after having + // consumed the trailing CRLF + int clen = chunklen; + assert clen == 0; + debug.log(Level.DEBUG, "No more chunks: %d", clen); + // the DONE state is not really needed but it helps with + // assertions... + state = ChunkState.DONE; + return null; } ! int clen = chunklen; ! assert clen > 0; ! assert st == ChunkState.READING_DATA; ! ! ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk ! if (unfulfilled > 0) { ! int bytesread = chunk.remaining(); ! debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d", ! bytesread, unfulfilled); ! ! int bytes2return = Math.min(bytesread, unfulfilled); ! debug.log(Level.DEBUG, "Returning chunk bytes: %d", bytes2return); ! returnBuffer = Utils.slice(chunk, bytes2return); ! unfulfilled = bytesremaining -= bytes2return; ! if (unfulfilled == 0) bytesToConsume = 2; ! } ! ! assert unfulfilled >= 0; ! ! if (unfulfilled == 0) { ! debug.log(Level.DEBUG, ! "No more bytes to read - %d yet to consume.", ! unfulfilled); ! // check whether the trailing CRLF is consumed, try to ! // consume it if not. If tryConsumeBytes needs more bytes ! // then we will come back here later - skipping the block ! // that reads data because remaining==0, and finding ! // that the two bytes are now consumed. ! if (tryConsumeBytes(chunk) == 0) { ! // we're done for this chunk! reset all states and ! // prepare to read the next chunk. chunklen = -1; + partialChunklen = 0; + cr = false; + state = ChunkState.READING_LENGTH; + debug.log(Level.DEBUG, "Ready to read next chunk"); } } ! if (returnBuffer == READMORE) { ! debug.log(Level.DEBUG, "Need more data"); } ! return returnBuffer; } ! ! // Attempt to parse and push one hunk from the buffer. ! // Returns true if the final chunk was parsed. ! // Returns false if we need to push more chunks. ! private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out) ! throws IOException { ! assert state != ChunkState.DONE; ! ByteBuffer b1 = tryReadOneHunk(b); if (b1 != null) { + //assert b1.hasRemaining() || b1 == READMORE; if (b1.hasRemaining()) { ! debug.log(Level.DEBUG, "Sending chunk to consumer (%d)", ! b1.remaining()); ! out.add(b1); ! debug.log(Level.DEBUG, "Chunk sent."); } + return false; // we haven't parsed the final chunk yet. } else { ! return true; // we're done! the final chunk was parsed. } } private int toDigit(int b) throws IOException { if (b >= 0x30 && b <= 0x39) {
*** 248,279 **** return b - 0x61 + 10; } throw new IOException("Invalid chunk header byte " + b); } - private void pushBodyFixed(ByteBuffer b) throws IOException { - int remaining = contentLength; - while (b.hasRemaining() && remaining > 0) { - ByteBuffer buffer = Utils.getBuffer(); - int amount = Math.min(b.remaining(), remaining); - Utils.copy(b, buffer, amount); - remaining -= amount; - buffer.flip(); - dataConsumer.accept(Optional.of(buffer)); - } - while (remaining > 0) { - ByteBuffer buffer = connection.read(); - if (buffer == null) - throw new IOException("connection closed"); - - int bytesread = buffer.remaining(); - // assume for now that pipelining not implemented - if (bytesread > remaining) { - throw new IOException("too many bytes read"); } ! remaining -= bytesread; ! dataConsumer.accept(Optional.of(buffer)); } onFinished.run(); ! dataConsumer.accept(Optional.empty()); } } --- 372,463 ---- return b - 0x61 + 10; } throw new IOException("Invalid chunk header byte " + b); } } ! ! class FixedLengthBodyParser implements BodyParser { ! final int contentLength; ! final Consumer<Throwable> onComplete; ! final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); ! final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser"; ! volatile int remaining; ! volatile Throwable closedExceptionally; ! volatile AbstractSubscription sub; ! FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) { ! this.contentLength = this.remaining = contentLength; ! this.onComplete = onComplete; ! } ! ! String dbgString() { ! return dbgTag; ! } ! ! @Override ! public void onSubscribe(AbstractSubscription sub) { ! debug.log(Level.DEBUG, () -> "length=" ! + contentLength +", onSubscribe: " ! + pusher.getClass().getName()); ! pusher.onSubscribe(this.sub = sub); ! try { ! if (contentLength == 0) { ! pusher.onComplete(); ! onComplete.accept(null); ! } ! } catch (Throwable t) { ! closedExceptionally = t; ! try { ! pusher.onError(t); ! } finally { ! onComplete.accept(t); ! } ! } ! } ! ! @Override ! public void accept(ByteBuffer b) { ! if (closedExceptionally != null) { ! debug.log(Level.DEBUG, () -> "already closed: " ! + closedExceptionally); ! return; } + boolean completed = false; + try { + int unfulfilled = remaining; + debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)", + b.remaining(), unfulfilled, contentLength); + assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0; + + if (unfulfilled == 0 && contentLength > 0) return; + + if (b.hasRemaining() && unfulfilled > 0) { + // only reduce demand if we actually push something. + // we would not have come here if there was no + // demand. + boolean hasDemand = sub.demand().tryDecrement(); + assert hasDemand; + int amount = Math.min(b.remaining(), unfulfilled); + unfulfilled = remaining -= amount; + ByteBuffer buffer = Utils.slice(b, amount); + pusher.onNext(List.of(buffer)); + } + if (unfulfilled == 0) { + // We're done! All data has been received. + assert closedExceptionally == null; onFinished.run(); ! pusher.onComplete(); ! completed = true; ! onComplete.accept(closedExceptionally); // should be null ! } else { ! assert b.remaining() == 0; ! } ! } catch (Throwable t) { ! debug.log(Level.DEBUG, "Unexpected exception", t); ! closedExceptionally = t; ! if (!completed) { ! onComplete.accept(t); ! } ! } ! } } }
< prev index next >