1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 import java.io.IOException; 27 import java.util.LinkedList; 28 import java.util.stream.Stream; 29 30 // Each stream has one of these for input. Each Http2Connection has one 31 // for output. Can be used blocking or asynchronously. 32 33 public class Queue<T> implements ExceptionallyCloseable { 34 35 private final LinkedList<T> q = new LinkedList<>(); 36 private boolean closed = false; 37 private boolean closing = false; 38 private Throwable exception = null; 39 private Runnable callback; 40 private boolean callbackDisabled = false; 41 private int waiters; // true if someone waiting 42 private final T closeSentinel; 43 44 Queue(T closeSentinel) { 45 this.closeSentinel = closeSentinel; 46 } 47 48 public synchronized int size() { 49 return q.size(); 50 } 51 52 public synchronized void put(T obj) throws IOException { 53 if (closed || closing) { 54 throw new IOException("stream closed"); 55 } 56 57 q.add(obj); 58 59 if (waiters > 0) { 60 notifyAll(); 61 } 62 63 if (callbackDisabled) { 64 return; 65 } 66 67 if (q.size() > 0 && callback != null) { 68 // Note: calling callback while holding the lock is 69 // dangerous and may lead to deadlocks. 70 callback.run(); 71 } 72 } 73 74 // Other close() variants are immediate and abortive 75 // This allows whatever is on Q to be processed first. 76 77 public synchronized void orderlyClose() { 78 if (closing || closed) 79 return; 80 try { 81 put(closeSentinel); 82 } catch (IOException e) { 83 e.printStackTrace(); 84 } 85 closing = true; 86 } 87 88 @Override 89 public synchronized void close() { 90 closed = true; 91 notifyAll(); 92 } 93 94 @Override 95 public synchronized void closeExceptionally(Throwable t) { 96 if (exception == null) exception = t; 97 else if (t != null && t != exception) { 98 if (!Stream.of(exception.getSuppressed()) 99 .filter(x -> x == t) 100 .findFirst() 101 .isPresent()) 102 { 103 exception.addSuppressed(t); 104 } 105 } 106 close(); 107 } 108 109 public synchronized T take() throws IOException { 110 if (closed) { 111 throw newIOException("stream closed"); 112 } 113 try { 114 while (q.size() == 0) { 115 waiters++; 116 wait(); 117 if (closed) { 118 throw newIOException("Queue closed"); 119 } 120 waiters--; 121 } 122 T item = q.removeFirst(); 123 if (item.equals(closeSentinel)) { 124 closed = true; 125 assert q.isEmpty(); 126 } 127 return item; 128 } catch (InterruptedException ex) { 129 throw new IOException(ex); 130 } 131 } 132 133 public synchronized T poll() throws IOException { 134 if (closed) { 135 throw newIOException("stream closed"); 136 } 137 138 if (q.isEmpty()) { 139 return null; 140 } 141 return take(); 142 } 143 144 private IOException newIOException(String msg) { 145 if (exception == null) { 146 return new IOException(msg); 147 } else { 148 return new IOException(msg, exception); 149 } 150 } 151 }