< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/frame/FramesDecoder.java
Print this page
@@ -1,7 +1,7 @@
/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
@@ -23,19 +23,20 @@
* questions.
*/
package jdk.incubator.http.internal.frame;
-import jdk.incubator.http.internal.common.ByteBufferReference;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.Utils;
import java.io.IOException;
+import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
/**
* Frames Decoder
* <p>
* collect buffers until frame decoding is possible,
@@ -44,28 +45,30 @@
* It's a stateful class due to the fact that FramesDecoder stores buffers inside.
* Should be allocated only the single instance per connection.
*/
public class FramesDecoder {
-
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ static final System.Logger DEBUG_LOGGER =
+ Utils.getDebugLogger("FramesDecoder"::toString, DEBUG);
@FunctionalInterface
public interface FrameProcessor {
void processFrame(Http2Frame frame) throws IOException;
}
private final FrameProcessor frameProcessor;
private final int maxFrameSize;
- private ByteBufferReference currentBuffer; // current buffer either null or hasRemaining
+ private ByteBuffer currentBuffer; // current buffer either null or hasRemaining
- private final java.util.Queue<ByteBufferReference> tailBuffers = new ArrayDeque<>();
+ private final Queue<ByteBuffer> tailBuffers = new ArrayDeque<>();
private int tailSize = 0;
private boolean slicedToDataFrame = false;
- private final List<ByteBufferReference> prepareToRelease = new ArrayList<>();
+ private final List<ByteBuffer> prepareToRelease = new ArrayList<>();
// if true - Frame Header was parsed (9 bytes consumed) and subsequent fields have meaning
// otherwise - stopped at frames boundary
private boolean frameHeaderParsed = false;
private int frameLength;
@@ -90,68 +93,103 @@
public FramesDecoder(FrameProcessor frameProcessor, int maxFrameSize) {
this.frameProcessor = frameProcessor;
this.maxFrameSize = Math.min(Math.max(16 * 1024, maxFrameSize), 16 * 1024 * 1024 - 1);
}
+ /** Threshold beyond which data is no longer copied into the current buffer,
+ * if that buffer has enough unused space. */
+ private static final int COPY_THRESHOLD = 8192;
+
/**
- * put next buffer into queue,
- * if frame decoding is possible - decode all buffers and invoke FrameProcessor
+ * Adds the data from the given buffer, and performs frame decoding if
+ * possible. Either 1) appends the data from the given buffer to the
+ * current buffer ( if there is enough unused space ), or 2) adds it to the
+ * next buffer in the queue.
*
- * @param buffer
- * @throws IOException
+ * If there is enough data to perform frame decoding then, all buffers are
+ * decoded and the FrameProcessor is invoked.
*/
- public void decode(ByteBufferReference buffer) throws IOException {
- int remaining = buffer.get().remaining();
+ public void decode(ByteBuffer buffer) throws IOException {
+ int remaining = buffer.remaining();
+ DEBUG_LOGGER.log(Level.DEBUG, "decodes: %d", remaining);
if (remaining > 0) {
if (currentBuffer == null) {
currentBuffer = buffer;
} else {
+ int limit = currentBuffer.limit();
+ int freeSpace = currentBuffer.capacity() - limit;
+ if (remaining <= COPY_THRESHOLD && freeSpace >= remaining) {
+ // append the new data to the unused space in the current buffer
+ ByteBuffer b = buffer;
+ int position = currentBuffer.position();
+ currentBuffer.position(limit);
+ currentBuffer.limit(limit + b.limit());
+ currentBuffer.put(b);
+ currentBuffer.position(position);
+ DEBUG_LOGGER.log(Level.DEBUG, "copied: %d", remaining);
+ } else {
+ DEBUG_LOGGER.log(Level.DEBUG, "added: %d", remaining);
tailBuffers.add(buffer);
tailSize += remaining;
}
}
+ }
+ DEBUG_LOGGER.log(Level.DEBUG, "Tail size is now: %d, current=",
+ tailSize,
+ (currentBuffer == null ? 0 :
+ currentBuffer.remaining()));
Http2Frame frame;
while ((frame = nextFrame()) != null) {
+ DEBUG_LOGGER.log(Level.DEBUG, "Got frame: %s", frame);
frameProcessor.processFrame(frame);
frameProcessed();
}
}
private Http2Frame nextFrame() throws IOException {
while (true) {
if (currentBuffer == null) {
return null; // no data at all
}
+ long available = currentBuffer.remaining() + tailSize;
if (!frameHeaderParsed) {
- if (currentBuffer.get().remaining() + tailSize >= Http2Frame.FRAME_HEADER_SIZE) {
+ if (available >= Http2Frame.FRAME_HEADER_SIZE) {
parseFrameHeader();
if (frameLength > maxFrameSize) {
// connection error
return new MalformedFrame(ErrorFrame.FRAME_SIZE_ERROR,
- "Frame type("+frameType+") " +"length("+frameLength+") exceeds MAX_FRAME_SIZE("+ maxFrameSize+")");
+ "Frame type("+frameType+") "
+ +"length("+frameLength
+ +") exceeds MAX_FRAME_SIZE("
+ + maxFrameSize+")");
}
frameHeaderParsed = true;
} else {
- return null; // no data for frame header
+ DEBUG_LOGGER.log(Level.DEBUG,
+ "Not enough data to parse header, needs: %d, has: %d",
+ Http2Frame.FRAME_HEADER_SIZE, available);
}
}
+ available = currentBuffer == null ? 0 : currentBuffer.remaining() + tailSize;
if ((frameLength == 0) ||
- (currentBuffer != null && currentBuffer.get().remaining() + tailSize >= frameLength)) {
+ (currentBuffer != null && available >= frameLength)) {
Http2Frame frame = parseFrameBody();
frameHeaderParsed = false;
// frame == null means we have to skip this frame and try parse next
if (frame != null) {
return frame;
}
} else {
+ DEBUG_LOGGER.log(Level.DEBUG,
+ "Not enough data to parse frame body, needs: %d, has: %d",
+ frameLength, available);
return null; // no data for the whole frame header
}
}
}
private void frameProcessed() {
- prepareToRelease.forEach(ByteBufferReference::clear);
prepareToRelease.clear();
}
private void parseFrameHeader() throws IOException {
int x = getInt();
@@ -163,45 +201,42 @@
// MUST be ignored when receiving.
}
// move next buffer from tailBuffers to currentBuffer if required
private void nextBuffer() {
- if (!currentBuffer.get().hasRemaining()) {
+ if (!currentBuffer.hasRemaining()) {
if (!slicedToDataFrame) {
prepareToRelease.add(currentBuffer);
}
slicedToDataFrame = false;
currentBuffer = tailBuffers.poll();
if (currentBuffer != null) {
- tailSize -= currentBuffer.get().remaining();
+ tailSize -= currentBuffer.remaining();
}
}
}
public int getByte() {
- ByteBuffer buf = currentBuffer.get();
- int res = buf.get() & 0xff;
+ int res = currentBuffer.get() & 0xff;
nextBuffer();
return res;
}
public int getShort() {
- ByteBuffer buf = currentBuffer.get();
- if (buf.remaining() >= 2) {
- int res = buf.getShort() & 0xffff;
+ if (currentBuffer.remaining() >= 2) {
+ int res = currentBuffer.getShort() & 0xffff;
nextBuffer();
return res;
}
int val = getByte();
val = (val << 8) + getByte();
return val;
}
public int getInt() {
- ByteBuffer buf = currentBuffer.get();
- if (buf.remaining() >= 4) {
- int res = buf.getInt();
+ if (currentBuffer.remaining() >= 4) {
+ int res = currentBuffer.getInt();
nextBuffer();
return res;
}
int val = getByte();
val = (val << 8) + getByte();
@@ -213,51 +248,48 @@
public byte[] getBytes(int n) {
byte[] bytes = new byte[n];
int offset = 0;
while (n > 0) {
- ByteBuffer buf = currentBuffer.get();
- int length = Math.min(n, buf.remaining());
- buf.get(bytes, offset, length);
+ int length = Math.min(n, currentBuffer.remaining());
+ currentBuffer.get(bytes, offset, length);
offset += length;
n -= length;
nextBuffer();
}
return bytes;
}
- private ByteBufferReference[] getBuffers(boolean isDataFrame, int bytecount) {
- List<ByteBufferReference> res = new ArrayList<>();
+ private List<ByteBuffer> getBuffers(boolean isDataFrame, int bytecount) {
+ List<ByteBuffer> res = new ArrayList<>();
while (bytecount > 0) {
- ByteBuffer buf = currentBuffer.get();
- int remaining = buf.remaining();
+ int remaining = currentBuffer.remaining();
int extract = Math.min(remaining, bytecount);
ByteBuffer extractedBuf;
if (isDataFrame) {
- extractedBuf = Utils.slice(buf, extract);
+ extractedBuf = Utils.slice(currentBuffer, extract);
slicedToDataFrame = true;
} else {
// Header frames here
// HPACK decoding should performed under lock and immediately after frame decoding.
// in that case it is safe to release original buffer,
// because of sliced buffer has a very short life
- extractedBuf = Utils.slice(buf, extract);
+ extractedBuf = Utils.slice(currentBuffer, extract);
}
- res.add(ByteBufferReference.of(extractedBuf));
+ res.add(extractedBuf);
bytecount -= extract;
nextBuffer();
}
- return res.toArray(new ByteBufferReference[0]);
+ return res;
}
public void skipBytes(int bytecount) {
while (bytecount > 0) {
- ByteBuffer buf = currentBuffer.get();
- int remaining = buf.remaining();
+ int remaining = currentBuffer.remaining();
int extract = Math.min(remaining, bytecount);
- buf.position(buf.position() + extract);
+ currentBuffer.position(currentBuffer.position() + extract);
bytecount -= remaining;
nextBuffer();
}
}
@@ -294,16 +326,17 @@
}
private Http2Frame parseDataFrame(int frameLength, int streamid, int flags) {
// non-zero stream
if (streamid == 0) {
- return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR, "zero streamId for DataFrame");
+ return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR,
+ "zero streamId for DataFrame");
}
int padLength = 0;
if ((flags & DataFrame.PADDED) != 0) {
padLength = getByte();
- if(padLength >= frameLength) {
+ if (padLength >= frameLength) {
return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR,
"the length of the padding is the length of the frame payload or greater");
}
frameLength--;
}
@@ -315,11 +348,12 @@
}
private Http2Frame parseHeadersFrame(int frameLength, int streamid, int flags) {
// non-zero stream
if (streamid == 0) {
- return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR, "zero streamId for HeadersFrame");
+ return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR,
+ "zero streamId for HeadersFrame");
}
int padLength = 0;
if ((flags & HeadersFrame.PADDED) != 0) {
padLength = getByte();
frameLength--;
< prev index next >