1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 */ 24 25 package java.net.http; 26 27 import java.io.Closeable; 28 import java.io.IOException; 29 import java.util.LinkedList; 30 31 // Each stream has one of these for input. Each Http2Connection has one 32 // for output. Can be used blocking or asynchronously. 
/**
 * A closeable FIFO queue whose operations are all synchronized on the queue
 * instance. Elements can be consumed by blocking ({@link #take()}) or without
 * blocking ({@link #poll()}, {@link #pollAll(Object[])}). An optional callback
 * can be registered to learn when elements become available.
 *
 * <p>The registered callback is run while the queue's monitor is held, on the
 * thread that called {@code put}, {@code putAll} or
 * {@code registerPutCallback}.
 *
 * @param <T> the element type
 */
class Queue<T> implements Closeable {

    private final LinkedList<T> q = new LinkedList<>();
    private volatile boolean closed = false;
    private Runnable callback;
    // Set by pushback/pushbackAll so that the next put/putAll runs the
    // callback even though the queue was not empty beforehand.
    private boolean forceCallback;
    private int waiters; // number of threads currently blocked in take()

    /**
     * Appends every element of {@code objs}, in array order, to the tail of
     * the queue. Wakes any thread blocked in {@link #take()} and runs the
     * registered callback if the queue was empty before the call or if a
     * pushback has happened since the last callback-triggering put.
     *
     * @param objs the elements to append
     * @throws IOException if the queue has been closed
     */
    synchronized void putAll(T[] objs) throws IOException {
        if (closed) {
            throw new IOException("stream closed");
        }
        boolean wasEmpty = q.isEmpty();

        for (T obj : objs) {
            q.add(obj);
        }

        if (waiters > 0) {
            notifyAll();
        }

        if (wasEmpty || forceCallback) {
            forceCallback = false;
            if (callback != null) {
                callback.run();
            }
        }
    }

    /**
     * Returns the number of elements currently in the queue.
     *
     * @return the current element count
     */
    synchronized int size() {
        return q.size();
    }

    /**
     * Appends {@code obj} to the tail of the queue. Wakes any thread blocked
     * in {@link #take()} and runs the registered callback if the queue was
     * empty before the call or if a pushback has happened since the last
     * callback-triggering put.
     *
     * @param obj the element to append
     * @throws IOException if the queue has been closed
     */
    synchronized void put(T obj) throws IOException {
        if (closed) {
            throw new IOException("stream closed");
        }

        q.add(obj);
        if (waiters > 0) {
            notifyAll();
        }

        // size() == 1 right after the add means the queue was empty before.
        if (q.size() == 1 || forceCallback) {
            forceCallback = false;
            if (callback != null) {
                callback.run();
            }
        }
    }

    /**
     * Registers (or, with {@code null}, clears) the put callback. The callback
     * is invoked whenever {@code put}/{@code putAll} is called on an empty
     * queue, and on the first such call following a pushback. If the queue
     * already holds elements at registration time, the callback is run once
     * immediately.
     *
     * @param callback the callback to register, or {@code null} for none
     */
    synchronized void registerPutCallback(Runnable callback) {
        this.callback = callback;
        if (callback != null && q.size() > 0) {
            callback.run();
        }
    }

    /**
     * Marks the queue closed and wakes every thread blocked in
     * {@link #take()}, each of which then fails with an {@code IOException}.
     */
    @Override
    public synchronized void close() {
        closed = true;
        notifyAll();
    }

    /**
     * Removes and returns the head of the queue, blocking while the queue is
     * empty.
     *
     * @return the head element
     * @throws IOException if the queue is closed on entry or while waiting,
     *         or if the calling thread is interrupted while waiting (the
     *         {@code InterruptedException} is the cause and the thread's
     *         interrupt status is restored)
     */
    synchronized T take() throws IOException {
        if (closed) {
            throw new IOException("stream closed");
        }
        try {
            while (q.size() == 0) {
                waiters++;
                try {
                    wait();
                } finally {
                    // Always undo the increment, including when wait() is
                    // interrupted, so the waiter count cannot leak.
                    waiters--;
                }
                if (closed) {
                    throw new IOException("Queue closed");
                }
            }
            return q.removeFirst();
        } catch (InterruptedException ex) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException(ex);
        }
    }

    /**
     * Removes and returns the head of the queue without blocking.
     *
     * @return the head element, or {@code null} if the queue is empty
     * @throws IOException if the queue has been closed
     */
    public synchronized T poll() throws IOException {
        if (closed) {
            throw new IOException("stream closed");
        }

        if (q.isEmpty()) {
            return null;
        }
        return q.removeFirst();
    }

    /**
     * Removes all elements and returns them, in queue order, in an array
     * obtained via {@link LinkedList#toArray(Object[])}. Unlike
     * {@link #poll()}, this method does not check whether the queue is closed.
     *
     * @param type array used to determine the runtime type of the result
     *        (and used as the result itself if it is large enough)
     * @return the drained elements
     * @throws IOException never thrown by this implementation; kept in the
     *         signature for caller compatibility
     */
    public synchronized T[] pollAll(T[] type) throws IOException {
        T[] ret = q.toArray(type);
        q.clear();
        return ret;
    }

    /**
     * Re-inserts {@code v} at the head of the queue and arranges for the next
     * {@code put}/{@code putAll} to run the callback. Any thread blocked in
     * {@link #take()} is woken, since an element is now available.
     *
     * @param v the element to put back
     */
    public synchronized void pushback(T v) {
        forceCallback = true;
        q.addFirst(v);
        if (waiters > 0) {
            notifyAll();
        }
    }

    /**
     * Re-inserts the elements of {@code v} at the head of the queue so that
     * {@code v[0]} becomes the new head, and arranges for the next
     * {@code put}/{@code putAll} to run the callback. Any thread blocked in
     * {@link #take()} is woken if at least one element was put back.
     *
     * @param v the elements to put back, in the order they should be consumed
     */
    public synchronized void pushbackAll(T[] v) {
        forceCallback = true;
        // Insert back-to-front so the array order is preserved at the head.
        for (int i = v.length - 1; i >= 0; i--) {
            q.addFirst(v[i]);
        }
        if (waiters > 0 && v.length > 0) {
            notifyAll();
        }
    }
}