1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package jdk.incubator.http.internal.common; 26 27 28 import java.io.Closeable; 29 import java.io.IOException; 30 import java.util.ArrayList; 31 import java.util.Arrays; 32 import java.util.Deque; 33 import java.util.List; 34 import java.util.concurrent.ConcurrentLinkedDeque; 35 import java.util.concurrent.atomic.AtomicInteger; 36 37 public class AsyncWriteQueue implements Closeable { 38 39 @FunctionalInterface 40 public static interface AsyncConsumer { 41 /** 42 * Takes an array of buffer reference and attempt to send the data 43 * downstream. If not all the data can be sent, then push back 44 * to the source queue by calling {@code source.setDelayed(buffers)} 45 * and return false. If all the data was successfully sent downstream 46 * then returns true. 47 * @param buffers An array of ButeBufferReference containing data 48 * to send downstream. 49 * @param source This AsyncWriteQueue. 50 * @return true if all the data could be sent downstream, false otherwise. 51 */ 52 boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source); 53 } 54 55 private static final int IDLE = 0; // nobody is flushing from the queue 56 private static final int FLUSHING = 1; // there is the only thread flushing from the queue 57 private static final int REFLUSHING = 2; // while one thread was flushing from the queue 58 // the other thread put data into the queue. 59 // flushing thread should recheck queue before switching to idle state. 60 private static final int DELAYED = 3; // flushing is delayed 61 // either by PlainHttpConnection.WriteEvent registration, or 62 // SSL handshaking 63 64 private static final int CLOSED = 4; // queue is closed 65 66 private final AtomicInteger state = new AtomicInteger(IDLE); 67 private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>(); 68 private final AsyncConsumer consumeAction; 69 70 // Queue may be processed in two modes: 71 // 1. if(!doFullDrain) - invoke callback on each chunk 72 // 2. if(doFullDrain) - drain the whole queue, merge all chunks into the single array and invoke callback 73 private final boolean doFullDrain; 74 75 private ByteBufferReference[] delayedElement = null; 76 77 public AsyncWriteQueue(AsyncConsumer consumeAction) { 78 this(consumeAction, true); 79 } 80 81 public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) { 82 this.consumeAction = consumeAction; 83 this.doFullDrain = doFullDrain; 84 } 85 86 public void put(ByteBufferReference[] e) throws IOException { 87 ensureOpen(); 88 queue.addLast(e); 89 } 90 91 public void putFirst(ByteBufferReference[] e) throws IOException { 92 ensureOpen(); 93 queue.addFirst(e); 94 } 95 96 /** 97 * retruns true if flushing was performed 98 * @return 99 * @throws IOException 100 */ 101 public boolean flush() throws IOException { 102 while(true) { 103 switch (state.get()) { 104 case IDLE: 105 if(state.compareAndSet(IDLE, FLUSHING)) { 106 flushLoop(); 107 return true; 108 } 109 break; 110 case FLUSHING: 111 if(state.compareAndSet(FLUSHING, REFLUSHING)) { 112 return false; 113 } 114 break; 115 case REFLUSHING: 116 case DELAYED: 117 return false; 118 case CLOSED: 119 throw new IOException("Queue closed"); 120 } 121 } 122 } 123 124 /* 125 * race invocations of flushDelayed are not allowed. 126 * flushDelayed should be invoked only from: 127 * - SelectorManager thread 128 * - Handshaking thread 129 */ 130 public void flushDelayed() throws IOException { 131 ensureOpen(); 132 if(!state.compareAndSet(DELAYED, FLUSHING)) { 133 ensureOpen(); // if CAS failed when close was set - throw proper exception 134 throw new RuntimeException("Shouldn't happen"); 135 } 136 flushLoop(); 137 } 138 139 private ByteBufferReference[] drain(ByteBufferReference[] prev) { 140 assert prev != null; 141 if(doFullDrain) { 142 ByteBufferReference[] next = queue.poll(); 143 if(next == null) { 144 return prev; 145 } 146 List<ByteBufferReference> drained = new ArrayList<>(); 147 drained.addAll(Arrays.asList(prev)); 148 drained.addAll(Arrays.asList(next)); 149 while ((next = queue.poll()) != null) { 150 drained.addAll(Arrays.asList(next)); 151 } 152 return drained.toArray(new ByteBufferReference[0]); 153 } else { 154 return prev; 155 } 156 } 157 158 private ByteBufferReference[] drain() { 159 ByteBufferReference[] next = queue.poll(); 160 return next == null ? null : drain(next); 161 } 162 163 private void flushLoop() throws IOException { 164 ByteBufferReference[] element; 165 if (delayedElement != null) { 166 element = drain(delayedElement); 167 delayedElement = null; 168 } else { 169 element = drain(); 170 } 171 while(true) { 172 while (element != null) { 173 if (!consumeAction.trySend(element, this)) { 174 return; 175 } 176 element = drain(); 177 } 178 switch (state.get()) { 179 case IDLE: 180 case DELAYED: 181 throw new RuntimeException("Shouldn't happen"); 182 case FLUSHING: 183 if(state.compareAndSet(FLUSHING, IDLE)) { 184 return; 185 } 186 break; 187 case REFLUSHING: 188 // We need to check if new elements were put after last poll() and do graceful exit 189 state.compareAndSet(REFLUSHING, FLUSHING); 190 break; 191 case CLOSED: 192 throw new IOException("Queue closed"); 193 } 194 element = drain(); 195 } 196 } 197 198 /* 199 * The methods returns unprocessed chunk of buffers into beginning of the queue. 200 * Invocation of the method allowed only inside consume callback, 201 * and consume callback is invoked only when the queue in FLUSHING or REFLUSHING state. 202 */ 203 public void setDelayed(ByteBufferReference[] delayedElement) throws IOException { 204 while(true) { 205 int state = this.state.get(); 206 switch (state) { 207 case IDLE: 208 case DELAYED: 209 throw new RuntimeException("Shouldn't happen"); 210 case FLUSHING: 211 case REFLUSHING: 212 if(this.state.compareAndSet(state, DELAYED)) { 213 this.delayedElement = delayedElement; 214 return; 215 } 216 break; 217 case CLOSED: 218 throw new IOException("Queue closed"); 219 } 220 } 221 222 } 223 224 private void ensureOpen() throws IOException { 225 if (state.get() == CLOSED) { 226 throw new IOException("Queue closed"); 227 } 228 } 229 230 @Override 231 public void close() throws IOException { 232 state.getAndSet(CLOSED); 233 } 234 235 }