< prev index next >
   1 package java.net.http;
   2 
   3 import java.io.*;
   4 import java.nio.ByteBuffer;
   5 
   6 /**
   7  * OutputStream. Incoming window updates handled by the main connection
   8  * reader thread.
   9  */
  10 @SuppressWarnings({"rawtypes","unchecked"})
  11 class BodyOutputStream extends OutputStream {
  12     final static byte[] EMPTY_BARRAY = new byte[0];
  13 
  14     final int streamid;
  15     int window;
  16     boolean closed;
  17     boolean goodToGo = false; // not allowed to send until headers sent
  18     final Http2TestServerConnection conn;
  19     final Queue outputQ;
  20 
  21     BodyOutputStream(int streamid, int initialWindow, Http2TestServerConnection conn) {
  22         this.window = initialWindow;
  23         this.streamid = streamid;
  24         this.conn = conn;
  25         this.outputQ = conn.outputQ;
  26         conn.registerStreamWindowUpdater(streamid, this::updateWindow);
  27     }
  28 
  29     // called from connection reader thread as all incoming window
  30     // updates are handled there.
  31     synchronized void updateWindow(int update) {
  32         window += update;
  33         notifyAll();
  34     }
  35 
  36     void waitForWindow(int demand) throws InterruptedException {
  37         // first wait for the connection window
  38         conn.obtainConnectionWindow(demand);
  39         // now wait for the stream window
  40         synchronized (this) {
  41             while (demand > 0) {
  42                 int n = Math.min(demand, window);
  43                 demand -= n;
  44                 window -= n;
  45                 if (demand > 0) {
  46                     wait();
  47                 }
  48             }
  49         }
  50     }
  51 
  52     void goodToGo() {
  53         goodToGo = true;
  54     }
  55 
  56     @Override
  57     public void write(byte[] buf, int offset, int len) throws IOException {
  58         if (closed) {
  59             throw new IOException("closed");
  60         }
  61 
  62         if (!goodToGo) {
  63             throw new IllegalStateException("sendResponseHeaders must be called first");
  64         }
  65         try {
  66             waitForWindow(len);
  67             send(buf, offset, len, 0);
  68         } catch (InterruptedException ex) {
  69             throw new IOException(ex);
  70         }
  71     }
  72 
  73     private void send(byte[] buf, int offset, int len, int flags) throws IOException {
  74         ByteBuffer buffer = ByteBuffer.allocate(len);
  75         buffer.put(buf, offset, len);
  76         buffer.flip();
  77         DataFrame df = new DataFrame();
  78         assert streamid != 0;
  79         df.streamid(streamid);
  80         df.setFlags(flags);
  81         df.setData(buffer);
  82         outputQ.put(df);
  83     }
  84 
  85     byte[] one = new byte[1];
  86 
  87     @Override
  88     public void write(int b) throws IOException {
  89         one[0] = (byte) b;
  90         write(one, 0, 1);
  91     }
  92 
  93     @Override
  94     public void close() {
  95         if (closed) {
  96             return;
  97         }
  98         closed = true;
  99         try {
 100             send(EMPTY_BARRAY, 0, 0, DataFrame.END_STREAM);
 101         } catch (IOException ex) {
 102             System.err.println("TestServer: OutputStream.close exception: " + ex);
 103             ex.printStackTrace();
 104         }
 105     }
 106 }
< prev index next >