< prev index next >
1 package java.net.http;
2
3 import java.io.*;
4 import java.nio.ByteBuffer;
5
6 /**
7 * OutputStream. Incoming window updates handled by the main connection
8 * reader thread.
9 */
10 @SuppressWarnings({"rawtypes","unchecked"})
11 class BodyOutputStream extends OutputStream {
12 final static byte[] EMPTY_BARRAY = new byte[0];
13
14 final int streamid;
15 int window;
16 boolean closed;
17 boolean goodToGo = false; // not allowed to send until headers sent
18 final Http2TestServerConnection conn;
19 final Queue outputQ;
20
21 BodyOutputStream(int streamid, int initialWindow, Http2TestServerConnection conn) {
22 this.window = initialWindow;
23 this.streamid = streamid;
24 this.conn = conn;
25 this.outputQ = conn.outputQ;
26 conn.registerStreamWindowUpdater(streamid, this::updateWindow);
27 }
28
29 // called from connection reader thread as all incoming window
30 // updates are handled there.
31 synchronized void updateWindow(int update) {
32 window += update;
33 notifyAll();
34 }
35
36 void waitForWindow(int demand) throws InterruptedException {
37 // first wait for the connection window
38 conn.obtainConnectionWindow(demand);
39 // now wait for the stream window
40 synchronized (this) {
41 while (demand > 0) {
42 int n = Math.min(demand, window);
43 demand -= n;
44 window -= n;
45 if (demand > 0) {
46 wait();
47 }
48 }
49 }
50 }
51
52 void goodToGo() {
53 goodToGo = true;
54 }
55
56 @Override
57 public void write(byte[] buf, int offset, int len) throws IOException {
58 if (closed) {
59 throw new IOException("closed");
60 }
61
62 if (!goodToGo) {
63 throw new IllegalStateException("sendResponseHeaders must be called first");
64 }
65 try {
66 waitForWindow(len);
67 send(buf, offset, len, 0);
68 } catch (InterruptedException ex) {
69 throw new IOException(ex);
70 }
71 }
72
73 private void send(byte[] buf, int offset, int len, int flags) throws IOException {
74 ByteBuffer buffer = ByteBuffer.allocate(len);
75 buffer.put(buf, offset, len);
76 buffer.flip();
77 DataFrame df = new DataFrame();
78 assert streamid != 0;
79 df.streamid(streamid);
80 df.setFlags(flags);
81 df.setData(buffer);
82 outputQ.put(df);
83 }
84
85 byte[] one = new byte[1];
86
87 @Override
88 public void write(int b) throws IOException {
89 one[0] = (byte) b;
90 write(one, 0, 1);
91 }
92
93 @Override
94 public void close() {
95 if (closed) {
96 return;
97 }
98 closed = true;
99 try {
100 send(EMPTY_BARRAY, 0, 0, DataFrame.END_STREAM);
101 } catch (IOException ex) {
102 System.err.println("TestServer: OutputStream.close exception: " + ex);
103 ex.printStackTrace();
104 }
105 }
106 }
< prev index next >