--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/frame/FramesDecoder.java 2017-11-30 04:04:20.833962854 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/frame/FramesDecoder.java 2017-11-30 04:04:20.622944406 -0800 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,15 +25,16 @@ package jdk.incubator.http.internal.frame; -import jdk.incubator.http.internal.common.ByteBufferReference; import jdk.incubator.http.internal.common.Log; import jdk.incubator.http.internal.common.Utils; import java.io.IOException; +import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; +import java.util.Queue; /** * Frames Decoder @@ -46,7 +47,9 @@ */ public class FramesDecoder { - + static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. + static final System.Logger DEBUG_LOGGER = + Utils.getDebugLogger("FramesDecoder"::toString, DEBUG); @FunctionalInterface public interface FrameProcessor { @@ -56,14 +59,14 @@ private final FrameProcessor frameProcessor; private final int maxFrameSize; - private ByteBufferReference currentBuffer; // current buffer either null or hasRemaining + private ByteBuffer currentBuffer; // current buffer either null or hasRemaining - private final java.util.Queue tailBuffers = new ArrayDeque<>(); + private final Queue tailBuffers = new ArrayDeque<>(); private int tailSize = 0; private boolean slicedToDataFrame = false; - private final List prepareToRelease = new ArrayList<>(); + private final List prepareToRelease = new ArrayList<>(); // if true - Frame Header was parsed (9 bytes consumed) and subsequent fields have meaning // otherwise - stopped at frames boundary @@ -92,25 +95,51 @@ this.maxFrameSize = Math.min(Math.max(16 * 1024, maxFrameSize), 16 * 1024 * 1024 - 1); } + /** Threshold beyond which data is no longer copied into the current buffer, + * if that buffer has enough unused space. */ + private static final int COPY_THRESHOLD = 8192; + /** - * put next buffer into queue, - * if frame decoding is possible - decode all buffers and invoke FrameProcessor + * Adds the data from the given buffer, and performs frame decoding if + * possible. Either 1) appends the data from the given buffer to the + * current buffer ( if there is enough unused space ), or 2) adds it to the + * next buffer in the queue. * - * @param buffer - * @throws IOException + * If there is enough data to perform frame decoding then, all buffers are + * decoded and the FrameProcessor is invoked. */ - public void decode(ByteBufferReference buffer) throws IOException { - int remaining = buffer.get().remaining(); + public void decode(ByteBuffer buffer) throws IOException { + int remaining = buffer.remaining(); + DEBUG_LOGGER.log(Level.DEBUG, "decodes: %d", remaining); if (remaining > 0) { if (currentBuffer == null) { currentBuffer = buffer; } else { - tailBuffers.add(buffer); - tailSize += remaining; + int limit = currentBuffer.limit(); + int freeSpace = currentBuffer.capacity() - limit; + if (remaining <= COPY_THRESHOLD && freeSpace >= remaining) { + // append the new data to the unused space in the current buffer + ByteBuffer b = buffer; + int position = currentBuffer.position(); + currentBuffer.position(limit); + currentBuffer.limit(limit + b.limit()); + currentBuffer.put(b); + currentBuffer.position(position); + DEBUG_LOGGER.log(Level.DEBUG, "copied: %d", remaining); + } else { + DEBUG_LOGGER.log(Level.DEBUG, "added: %d", remaining); + tailBuffers.add(buffer); + tailSize += remaining; + } } } + DEBUG_LOGGER.log(Level.DEBUG, "Tail size is now: %d, current=", + tailSize, + (currentBuffer == null ? 0 : + currentBuffer.remaining())); Http2Frame frame; while ((frame = nextFrame()) != null) { + DEBUG_LOGGER.log(Level.DEBUG, "Got frame: %s", frame); frameProcessor.processFrame(frame); frameProcessed(); } @@ -121,21 +150,28 @@ if (currentBuffer == null) { return null; // no data at all } + long available = currentBuffer.remaining() + tailSize; if (!frameHeaderParsed) { - if (currentBuffer.get().remaining() + tailSize >= Http2Frame.FRAME_HEADER_SIZE) { + if (available >= Http2Frame.FRAME_HEADER_SIZE) { parseFrameHeader(); if (frameLength > maxFrameSize) { // connection error return new MalformedFrame(ErrorFrame.FRAME_SIZE_ERROR, - "Frame type("+frameType+") " +"length("+frameLength+") exceeds MAX_FRAME_SIZE("+ maxFrameSize+")"); + "Frame type("+frameType+") " + +"length("+frameLength + +") exceeds MAX_FRAME_SIZE(" + + maxFrameSize+")"); } frameHeaderParsed = true; } else { - return null; // no data for frame header + DEBUG_LOGGER.log(Level.DEBUG, + "Not enough data to parse header, needs: %d, has: %d", + Http2Frame.FRAME_HEADER_SIZE, available); } } + available = currentBuffer == null ? 0 : currentBuffer.remaining() + tailSize; if ((frameLength == 0) || - (currentBuffer != null && currentBuffer.get().remaining() + tailSize >= frameLength)) { + (currentBuffer != null && available >= frameLength)) { Http2Frame frame = parseFrameBody(); frameHeaderParsed = false; // frame == null means we have to skip this frame and try parse next @@ -143,13 +179,15 @@ return frame; } } else { + DEBUG_LOGGER.log(Level.DEBUG, + "Not enough data to parse frame body, needs: %d, has: %d", + frameLength, available); return null; // no data for the whole frame header } } } private void frameProcessed() { - prepareToRelease.forEach(ByteBufferReference::clear); prepareToRelease.clear(); } @@ -165,29 +203,27 @@ // move next buffer from tailBuffers to currentBuffer if required private void nextBuffer() { - if (!currentBuffer.get().hasRemaining()) { + if (!currentBuffer.hasRemaining()) { if (!slicedToDataFrame) { prepareToRelease.add(currentBuffer); } slicedToDataFrame = false; currentBuffer = tailBuffers.poll(); if (currentBuffer != null) { - tailSize -= currentBuffer.get().remaining(); + tailSize -= currentBuffer.remaining(); } } } public int getByte() { - ByteBuffer buf = currentBuffer.get(); - int res = buf.get() & 0xff; + int res = currentBuffer.get() & 0xff; nextBuffer(); return res; } public int getShort() { - ByteBuffer buf = currentBuffer.get(); - if (buf.remaining() >= 2) { - int res = buf.getShort() & 0xffff; + if (currentBuffer.remaining() >= 2) { + int res = currentBuffer.getShort() & 0xffff; nextBuffer(); return res; } @@ -197,9 +233,8 @@ } public int getInt() { - ByteBuffer buf = currentBuffer.get(); - if (buf.remaining() >= 4) { - int res = buf.getInt(); + if (currentBuffer.remaining() >= 4) { + int res = currentBuffer.getInt(); nextBuffer(); return res; } @@ -215,9 +250,8 @@ byte[] bytes = new byte[n]; int offset = 0; while (n > 0) { - ByteBuffer buf = currentBuffer.get(); - int length = Math.min(n, buf.remaining()); - buf.get(bytes, offset, length); + int length = Math.min(n, currentBuffer.remaining()); + currentBuffer.get(bytes, offset, length); offset += length; n -= length; nextBuffer(); @@ -226,36 +260,34 @@ } - private ByteBufferReference[] getBuffers(boolean isDataFrame, int bytecount) { - List res = new ArrayList<>(); + private List getBuffers(boolean isDataFrame, int bytecount) { + List res = new ArrayList<>(); while (bytecount > 0) { - ByteBuffer buf = currentBuffer.get(); - int remaining = buf.remaining(); + int remaining = currentBuffer.remaining(); int extract = Math.min(remaining, bytecount); ByteBuffer extractedBuf; if (isDataFrame) { - extractedBuf = Utils.slice(buf, extract); + extractedBuf = Utils.slice(currentBuffer, extract); slicedToDataFrame = true; } else { // Header frames here // HPACK decoding should performed under lock and immediately after frame decoding. // in that case it is safe to release original buffer, // because of sliced buffer has a very short life - extractedBuf = Utils.slice(buf, extract); + extractedBuf = Utils.slice(currentBuffer, extract); } - res.add(ByteBufferReference.of(extractedBuf)); + res.add(extractedBuf); bytecount -= extract; nextBuffer(); } - return res.toArray(new ByteBufferReference[0]); + return res; } public void skipBytes(int bytecount) { while (bytecount > 0) { - ByteBuffer buf = currentBuffer.get(); - int remaining = buf.remaining(); + int remaining = currentBuffer.remaining(); int extract = Math.min(remaining, bytecount); - buf.position(buf.position() + extract); + currentBuffer.position(currentBuffer.position() + extract); bytecount -= remaining; nextBuffer(); } @@ -296,12 +328,13 @@ private Http2Frame parseDataFrame(int frameLength, int streamid, int flags) { // non-zero stream if (streamid == 0) { - return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR, "zero streamId for DataFrame"); + return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR, + "zero streamId for DataFrame"); } int padLength = 0; if ((flags & DataFrame.PADDED) != 0) { padLength = getByte(); - if(padLength >= frameLength) { + if (padLength >= frameLength) { return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR, "the length of the padding is the length of the frame payload or greater"); } @@ -317,7 +350,8 @@ private Http2Frame parseHeadersFrame(int frameLength, int streamid, int flags) { // non-zero stream if (streamid == 0) { - return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR, "zero streamId for HeadersFrame"); + return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR, + "zero streamId for HeadersFrame"); } int padLength = 0; if ((flags & HeadersFrame.PADDED) != 0) {