< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/frame/FramesDecoder.java

Print this page

        

@@ -1,7 +1,7 @@
 /*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 only, as
  * published by the Free Software Foundation.  Oracle designates this

@@ -23,19 +23,20 @@
  * questions.
  */
 
 package jdk.incubator.http.internal.frame;
 
-import jdk.incubator.http.internal.common.ByteBufferReference;
 import jdk.incubator.http.internal.common.Log;
 import jdk.incubator.http.internal.common.Utils;
 
 import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Queue;
 
 /**
  * Frames Decoder
  * <p>
  * collect buffers until frame decoding is possible,

@@ -44,28 +45,30 @@
  * It's a stateful class due to the fact that FramesDecoder stores buffers inside.
  * Should be allocated only the single instance per connection.
  */
 public class FramesDecoder {
 
-
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+    static final System.Logger DEBUG_LOGGER =
+            Utils.getDebugLogger("FramesDecoder"::toString, DEBUG);
 
     @FunctionalInterface
     public interface FrameProcessor {
         void processFrame(Http2Frame frame) throws IOException;
     }
 
     private final FrameProcessor frameProcessor;
     private final int maxFrameSize;
 
-    private ByteBufferReference currentBuffer; // current buffer either null or hasRemaining
+    private ByteBuffer currentBuffer; // current buffer either null or hasRemaining
 
-    private final java.util.Queue<ByteBufferReference> tailBuffers = new ArrayDeque<>();
+    private final Queue<ByteBuffer> tailBuffers = new ArrayDeque<>();
     private int tailSize = 0;
 
     private boolean slicedToDataFrame = false;
 
-    private final List<ByteBufferReference> prepareToRelease = new ArrayList<>();
+    private final List<ByteBuffer> prepareToRelease = new ArrayList<>();
 
     // if true  - Frame Header was parsed (9 bytes consumed) and subsequent fields have meaning
     // otherwise - stopped at frames boundary
     private boolean frameHeaderParsed = false;
     private int frameLength;

@@ -90,68 +93,103 @@
     public FramesDecoder(FrameProcessor frameProcessor, int maxFrameSize) {
         this.frameProcessor = frameProcessor;
         this.maxFrameSize = Math.min(Math.max(16 * 1024, maxFrameSize), 16 * 1024 * 1024 - 1);
     }
 
+    /** Threshold beyond which data is no longer copied into the current buffer,
+     * if that buffer has enough unused space. */
+    private static final int COPY_THRESHOLD = 8192;
+
     /**
-     * put next buffer into queue,
-     * if frame decoding is possible - decode all buffers and invoke FrameProcessor
+     * Adds the data from the given buffer, and performs frame decoding if
+     * possible.   Either 1) appends the data from the given buffer to the
+     * current buffer ( if there is enough unused space ), or 2) adds it to the
+     * next buffer in the queue.
      *
-     * @param buffer
-     * @throws IOException
+     * If there is enough data to perform frame decoding then, all buffers are
+     * decoded and the FrameProcessor is invoked.
      */
-    public void decode(ByteBufferReference buffer) throws IOException {
-        int remaining = buffer.get().remaining();
+    public void decode(ByteBuffer buffer) throws IOException {
+        int remaining = buffer.remaining();
+        DEBUG_LOGGER.log(Level.DEBUG, "decodes: %d", remaining);
         if (remaining > 0) {
             if (currentBuffer == null) {
                 currentBuffer = buffer;
             } else {
+                int limit = currentBuffer.limit();
+                int freeSpace = currentBuffer.capacity() - limit;
+                if (remaining <= COPY_THRESHOLD && freeSpace >= remaining) {
+                    // append the new data to the unused space in the current buffer
+                    ByteBuffer b = buffer;
+                    int position = currentBuffer.position();
+                    currentBuffer.position(limit);
+                    currentBuffer.limit(limit + b.limit());
+                    currentBuffer.put(b);
+                    currentBuffer.position(position);
+                    DEBUG_LOGGER.log(Level.DEBUG, "copied: %d", remaining);
+                } else {
+                    DEBUG_LOGGER.log(Level.DEBUG, "added: %d", remaining);
                 tailBuffers.add(buffer);
                 tailSize += remaining;
             }
         }
+        }
+        DEBUG_LOGGER.log(Level.DEBUG, "Tail size is now: %d, current=",
+                tailSize,
+                (currentBuffer == null ? 0 :
+                   currentBuffer.remaining()));
         Http2Frame frame;
         while ((frame = nextFrame()) != null) {
+            DEBUG_LOGGER.log(Level.DEBUG, "Got frame: %s", frame);
             frameProcessor.processFrame(frame);
             frameProcessed();
         }
     }
 
     private Http2Frame nextFrame() throws IOException {
         while (true) {
             if (currentBuffer == null) {
                 return null; // no data at all
             }
+            long available = currentBuffer.remaining() + tailSize;
             if (!frameHeaderParsed) {
-                if (currentBuffer.get().remaining() + tailSize >= Http2Frame.FRAME_HEADER_SIZE) {
+                if (available >= Http2Frame.FRAME_HEADER_SIZE) {
                     parseFrameHeader();
                     if (frameLength > maxFrameSize) {
                         // connection error
                         return new MalformedFrame(ErrorFrame.FRAME_SIZE_ERROR,
-                                "Frame type("+frameType+") " +"length("+frameLength+") exceeds MAX_FRAME_SIZE("+ maxFrameSize+")");
+                                "Frame type("+frameType+") "
+                                +"length("+frameLength
+                                +") exceeds MAX_FRAME_SIZE("
+                                + maxFrameSize+")");
                     }
                     frameHeaderParsed = true;
                 } else {
-                    return null; // no data for frame header
+                    DEBUG_LOGGER.log(Level.DEBUG,
+                            "Not enough data to parse header, needs: %d, has: %d",
+                            Http2Frame.FRAME_HEADER_SIZE, available);
                 }
             }
+            available = currentBuffer == null ? 0 : currentBuffer.remaining() + tailSize;
             if ((frameLength == 0) ||
-                    (currentBuffer != null && currentBuffer.get().remaining() + tailSize >= frameLength)) {
+                    (currentBuffer != null && available >= frameLength)) {
                 Http2Frame frame = parseFrameBody();
                 frameHeaderParsed = false;
                 // frame == null means we have to skip this frame and try parse next
                 if (frame != null) {
                     return frame;
                 }
             } else {
+                DEBUG_LOGGER.log(Level.DEBUG,
+                        "Not enough data to parse frame body, needs: %d,  has: %d",
+                        frameLength, available);
                 return null;  // no data for the whole frame header
             }
         }
     }
 
     private void frameProcessed() {
-        prepareToRelease.forEach(ByteBufferReference::clear);
         prepareToRelease.clear();
     }
 
     private void parseFrameHeader() throws IOException {
         int x = getInt();

@@ -163,45 +201,42 @@
         // MUST be ignored when receiving.
     }
 
     // move next buffer from tailBuffers to currentBuffer if required
     private void nextBuffer() {
-        if (!currentBuffer.get().hasRemaining()) {
+        if (!currentBuffer.hasRemaining()) {
             if (!slicedToDataFrame) {
                 prepareToRelease.add(currentBuffer);
             }
             slicedToDataFrame = false;
             currentBuffer = tailBuffers.poll();
             if (currentBuffer != null) {
-                tailSize -= currentBuffer.get().remaining();
+                tailSize -= currentBuffer.remaining();
             }
         }
     }
 
     public int getByte() {
-        ByteBuffer buf = currentBuffer.get();
-        int res = buf.get() & 0xff;
+        int res = currentBuffer.get() & 0xff;
         nextBuffer();
         return res;
     }
 
     public int getShort() {
-        ByteBuffer buf = currentBuffer.get();
-        if (buf.remaining() >= 2) {
-            int res = buf.getShort() & 0xffff;
+        if (currentBuffer.remaining() >= 2) {
+            int res = currentBuffer.getShort() & 0xffff;
             nextBuffer();
             return res;
         }
         int val = getByte();
         val = (val << 8) + getByte();
         return val;
     }
 
     public int getInt() {
-        ByteBuffer buf = currentBuffer.get();
-        if (buf.remaining() >= 4) {
-            int res = buf.getInt();
+        if (currentBuffer.remaining() >= 4) {
+            int res = currentBuffer.getInt();
             nextBuffer();
             return res;
         }
         int val = getByte();
         val = (val << 8) + getByte();

@@ -213,51 +248,48 @@
 
     public byte[] getBytes(int n) {
         byte[] bytes = new byte[n];
         int offset = 0;
         while (n > 0) {
-            ByteBuffer buf = currentBuffer.get();
-            int length = Math.min(n, buf.remaining());
-            buf.get(bytes, offset, length);
+            int length = Math.min(n, currentBuffer.remaining());
+            currentBuffer.get(bytes, offset, length);
             offset += length;
             n -= length;
             nextBuffer();
         }
         return bytes;
 
     }
 
-    private ByteBufferReference[] getBuffers(boolean isDataFrame, int bytecount) {
-        List<ByteBufferReference> res = new ArrayList<>();
+    private List<ByteBuffer> getBuffers(boolean isDataFrame, int bytecount) {
+        List<ByteBuffer> res = new ArrayList<>();
         while (bytecount > 0) {
-            ByteBuffer buf = currentBuffer.get();
-            int remaining = buf.remaining();
+            int remaining = currentBuffer.remaining();
             int extract = Math.min(remaining, bytecount);
             ByteBuffer extractedBuf;
             if (isDataFrame) {
-                extractedBuf = Utils.slice(buf, extract);
+                extractedBuf = Utils.slice(currentBuffer, extract);
                 slicedToDataFrame = true;
             } else {
                 // Header frames here
                 // HPACK decoding should performed under lock and immediately after frame decoding.
                 // in that case it is safe to release original buffer,
                 // because of sliced buffer has a very short life
-                extractedBuf = Utils.slice(buf, extract);
+                extractedBuf = Utils.slice(currentBuffer, extract);
             }
-            res.add(ByteBufferReference.of(extractedBuf));
+            res.add(extractedBuf);
             bytecount -= extract;
             nextBuffer();
         }
-        return res.toArray(new ByteBufferReference[0]);
+        return res;
     }
 
     public void skipBytes(int bytecount) {
         while (bytecount > 0) {
-            ByteBuffer buf = currentBuffer.get();
-            int remaining = buf.remaining();
+            int remaining = currentBuffer.remaining();
             int extract = Math.min(remaining, bytecount);
-            buf.position(buf.position() + extract);
+            currentBuffer.position(currentBuffer.position() + extract);
             bytecount -= remaining;
             nextBuffer();
         }
     }
 

@@ -294,16 +326,17 @@
     }
 
     private Http2Frame parseDataFrame(int frameLength, int streamid, int flags) {
         // non-zero stream
         if (streamid == 0) {
-            return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR, "zero streamId for DataFrame");
+            return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR,
+                                      "zero streamId for DataFrame");
         }
         int padLength = 0;
         if ((flags & DataFrame.PADDED) != 0) {
             padLength = getByte();
-            if(padLength >= frameLength) {
+            if (padLength >= frameLength) {
                 return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR,
                         "the length of the padding is the length of the frame payload or greater");
             }
             frameLength--;
         }

@@ -315,11 +348,12 @@
     }
 
     private Http2Frame parseHeadersFrame(int frameLength, int streamid, int flags) {
         // non-zero stream
         if (streamid == 0) {
-            return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR, "zero streamId for HeadersFrame");
+            return new MalformedFrame(ErrorFrame.PROTOCOL_ERROR,
+                                      "zero streamId for HeadersFrame");
         }
         int padLength = 0;
         if ((flags & HeadersFrame.PADDED) != 0) {
             padLength = getByte();
             frameLength--;
< prev index next >