1 /*
   2  * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.io.*;
  25 import java.nio.ByteBuffer;
  26 
  27 import jdk.incubator.http.internal.common.ByteBufferReference;
  28 import jdk.incubator.http.internal.common.Queue;
  29 import jdk.incubator.http.internal.common.Utils;
  30 import jdk.incubator.http.internal.frame.DataFrame;
  31 import jdk.incubator.http.internal.frame.Http2Frame;
  32 import jdk.incubator.http.internal.frame.ResetFrame;
  33 
/**
 * InputStream that reads frames off the stream's queue and satisfies read
 * demand from any DataFrames it finds. Window updates are sent back on the
 * connection's send queue.
 */
  39 class BodyInputStream extends InputStream {
  40 
  41     final Queue<Http2Frame> q;
  42     final int streamid;
  43     boolean closed;
  44     boolean eof;
  45     final Http2TestServerConnection conn;
  46 
  47     @SuppressWarnings({"rawtypes","unchecked"})
  48     BodyInputStream(Queue q, int streamid, Http2TestServerConnection conn) {
  49         this.q = q;
  50         this.streamid = streamid;
  51         this.conn = conn;
  52     }
  53 
  54     DataFrame df;
  55     ByteBuffer[] buffers;
  56     ByteBuffer buffer;
  57     int nextIndex = -1;
  58 
  59     private DataFrame getData() throws IOException {
  60         if (eof) {
  61             return null;
  62         }
  63         Http2Frame frame;
  64         do {
  65             frame = q.take();
  66             if (frame.type() == ResetFrame.TYPE) {
  67                 conn.handleStreamReset((ResetFrame) frame); // throws IOException
  68             }
  69             // ignoring others for now Wupdates handled elsewhere
  70             if (frame.type() != DataFrame.TYPE) {
  71                 System.out.println("Ignoring " + frame.toString() + " CHECK THIS");
  72             }
  73         } while (frame.type() != DataFrame.TYPE);
  74         df = (DataFrame) frame;
  75         int len = df.payloadLength();
  76         eof = frame.getFlag(DataFrame.END_STREAM);
  77         // acknowledge
  78         conn.sendWindowUpdates(len, streamid);
  79         return (DataFrame) frame;
  80     }
  81 
  82     // null return means EOF
  83     private ByteBuffer getBuffer() throws IOException {
  84         if (buffer == null || !buffer.hasRemaining()) {
  85             if (nextIndex == -1 || nextIndex == buffers.length) {
  86                 DataFrame df = getData();
  87                 if (df == null) {
  88                     return null;
  89                 }
  90                 ByteBufferReference[] data = df.getData();
  91                 int len = Utils.remaining(data);
  92                 if ((len == 0) && eof) {
  93                     return null;
  94                 }
  95 
  96                 buffers = ByteBufferReference.toBuffers(data);
  97                 nextIndex = 0;
  98             }
  99             buffer = buffers[nextIndex++];
 100         }
 101         return buffer;
 102     }
 103 
 104     @Override
 105     public int read(byte[] buf, int offset, int length) throws IOException {
 106         if (closed) {
 107             throw new IOException("closed");
 108         }
 109         ByteBuffer b = getBuffer();
 110         if (b == null) {
 111             return -1;
 112         }
 113         int remaining = b.remaining();
 114         if (remaining < length) {
 115             length = remaining;
 116         }
 117         b.get(buf, offset, length);
 118         return length;
 119     }
 120 
 121     byte[] one = new byte[1];
 122 
 123     @Override
 124     public int read() throws IOException {
 125         int c = read(one, 0, 1);
 126         if (c == -1) {
 127             return -1;
 128         }
 129         return one[0];
 130     }
 131 
 132     @Override
 133     public void close() {
 134         // TODO reset this stream
 135         closed = true;
 136     }
 137 }