< prev index next >
1 /*
2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 */
24
25 package java.net.http;
26
27 import java.io.Closeable;
28 import java.io.IOException;
29 import java.util.LinkedList;
30
31 // Each stream has one of these for input. Each Http2Connection has one
32 // for output. Can be used blocking or asynchronously.
33
/**
 * A simple FIFO queue guarded by this object's monitor. Consumers may either
 * block in {@link #take()} or be driven asynchronously through a callback
 * registered with {@link #registerPutCallback(Runnable)}.
 *
 * <p>All methods are {@code synchronized} on the queue instance. Note that the
 * registered callback is run while that monitor is held.
 *
 * @param <T> the type of the queued elements
 */
class Queue<T> implements Closeable {

    /** Backing list; the head is the next element handed out. */
    LinkedList<T> q = new LinkedList<>();

    /** Set by {@link #close()}; once set, put/take/poll fail with IOException. */
    volatile boolean closed = false;

    /** Optional hook run when elements arrive; may be {@code null}. */
    Runnable callback;

    /**
     * Set by the pushback methods so that the next put runs the callback
     * even though the queue was not empty at that point.
     */
    boolean forceCallback;

    /** Number of threads currently blocked inside {@link #take()}. */
    int waiters;

    /**
     * Appends all given elements in array order, wakes any blocked takers and
     * runs the callback if the queue was empty beforehand (or a pushback
     * requested a forced callback).
     *
     * @param objs the elements to append
     * @throws IOException if the queue has been closed
     */
    synchronized void putAll(T[] objs) throws IOException {
        if (closed) {
            throw new IOException("stream closed");
        }
        boolean wasEmpty = q.isEmpty();

        for (T obj : objs) {
            q.add(obj);
        }

        if (waiters > 0) {
            notifyAll();
        }

        if (wasEmpty || forceCallback) {
            forceCallback = false;
            if (callback != null) {
                callback.run();
            }
        }
    }

    /**
     * Returns the number of elements currently queued.
     *
     * @return the current queue length
     */
    synchronized int size() {
        return q.size();
    }

    /**
     * Appends one element, wakes any blocked takers and runs the callback if
     * the element is the only one in the queue (or a pushback requested a
     * forced callback).
     *
     * @param obj the element to append
     * @throws IOException if the queue has been closed
     */
    synchronized void put(T obj) throws IOException {
        if (closed) {
            throw new IOException("stream closed");
        }

        q.add(obj);
        if (waiters > 0) {
            notifyAll();
        }

        if (q.size() == 1 || forceCallback) {
            forceCallback = false;
            if (callback != null) {
                callback.run();
            }
        }
    }

    /**
     * Installs the callback that is run whenever a put finds the queue empty.
     * If elements are already queued, the new callback is run immediately.
     *
     * @param callback the hook to run, or {@code null} to remove the current one
     */
    synchronized void registerPutCallback(Runnable callback) {
        this.callback = callback;
        if (callback != null && q.size() > 0) {
            callback.run();
        }
    }

    /**
     * Marks the queue closed and wakes every thread blocked in
     * {@link #take()} so that it can fail instead of waiting forever.
     */
    @Override
    public synchronized void close() {
        closed = true;
        notifyAll();
    }

    /**
     * Removes and returns the head of the queue, blocking while it is empty.
     *
     * @return the element at the head of the queue
     * @throws IOException if the queue is closed, either on entry or while
     *         waiting, or if the waiting thread is interrupted (the
     *         {@link InterruptedException} is the cause)
     */
    synchronized T take() throws IOException {
        if (closed) {
            throw new IOException("stream closed");
        }
        try {
            while (q.isEmpty()) {
                waiters++;
                try {
                    wait();
                } finally {
                    // Keep the count accurate even when wait() is interrupted.
                    waiters--;
                }
                // close() only notifies; without this check a taker woken by
                // close() would see an empty queue and go back to sleep.
                if (closed) {
                    throw new IOException("stream closed");
                }
            }
            return q.removeFirst();
        } catch (InterruptedException ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Removes and returns the head of the queue without blocking.
     *
     * @return the head element, or {@code null} if the queue is empty
     * @throws IOException if the queue has been closed
     */
    public synchronized T poll() throws IOException {
        if (closed) {
            throw new IOException("stream closed");
        }

        if (q.isEmpty()) {
            return null;
        }
        return q.removeFirst();
    }

    /**
     * Drains the queue into an array and leaves the queue empty. Unlike
     * {@link #poll()}, this does not check whether the queue is closed.
     *
     * @param type array used as for {@link LinkedList#toArray(Object[])}: it is
     *        filled and returned if large enough, otherwise it only supplies
     *        the runtime type of a newly allocated array
     * @return the drained elements in queue order
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public synchronized T[] pollAll(T[] type) throws IOException {
        T[] ret = q.toArray(type);
        q.clear();
        return ret;
    }

    /**
     * Puts an element back at the head of the queue and arranges for the
     * next put to run the callback regardless of the queue length.
     *
     * @param v the element to reinsert at the head
     */
    public synchronized void pushback(T v) {
        forceCallback = true;
        q.addFirst(v);
    }

    /**
     * Puts several elements back at the head of the queue, preserving their
     * array order, and arranges for the next put to run the callback.
     *
     * @param v the elements to reinsert; {@code v[0]} becomes the new head
     */
    public synchronized void pushbackAll(T[] v) {
        forceCallback = true;
        // Insert back-to-front so that v[0] ends up first.
        for (int i = v.length - 1; i >= 0; i--) {
            q.addFirst(v[i]);
        }
    }
}
< prev index next >