/*
 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package jdk.incubator.http.internal.common;


import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

/**
 * A queue of {@code ByteBufferReference[]} chunks that are handed to an
 * {@link AsyncConsumer} by at most one flushing thread at a time.
 *
 * <p>Producers enqueue chunks with {@link #put} / {@link #putFirst} and then
 * call {@link #flush()}. An atomic state word decides which caller actually
 * runs the flush loop; the others merely record that new data arrived. When
 * the consumer cannot send everything it pushes the remainder back with
 * {@link #setDelayed} and the flush is resumed later via
 * {@link #flushDelayed()}.
 */
public class AsyncWriteQueue implements Closeable {

    @FunctionalInterface
    public static interface AsyncConsumer {
        /**
         * Takes an array of buffer references and attempts to send the data
         * downstream. If not all the data can be sent, then push back
         * to the source queue by calling {@code source.setDelayed(buffers)}
         * and return false. If all the data was successfully sent downstream
         * then returns true.
         * @param buffers An array of ByteBufferReference containing data
         *                to send downstream.
         * @param source This AsyncWriteQueue.
         * @return true if all the data could be sent downstream, false otherwise.
         */
        boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source);
    }

    private static final int IDLE = 0;       // nobody is flushing from the queue
    private static final int FLUSHING = 1;   // there is the only thread flushing from the queue
    private static final int REFLUSHING = 2; // while one thread was flushing from the queue
                                             // the other thread put data into the queue.
                                             // flushing thread should recheck queue before switching to idle state.
    private static final int DELAYED = 3;    // flushing is delayed
                                             // either by PlainHttpConnection.WriteEvent registration, or
                                             // SSL handshaking

    private static final int CLOSED = 4;     // queue is closed

    private final AtomicInteger state = new AtomicInteger(IDLE);
    private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
    private final AsyncConsumer consumeAction;

    // Queue may be processed in two modes:
    // 1. if(!doFullDrain) - invoke callback on each chunk
    // 2. if(doFullDrain)  - drain the whole queue, merge all chunks into the single array and invoke callback
    private final boolean doFullDrain;

    // Chunk pushed back by the consumer. It is a plain (non-volatile) field:
    // it is written before the CAS to DELAYED in setDelayed() and read after
    // the CAS out of DELAYED in flushDelayed(), so the atomic state word
    // orders and publishes every access.
    private ByteBufferReference[] delayedElement = null;

    /**
     * Creates a queue that drains fully, i.e. merges all pending chunks into
     * a single array before each callback invocation.
     *
     * @param consumeAction callback that sends the data downstream
     */
    public AsyncWriteQueue(AsyncConsumer consumeAction) {
        this(consumeAction, true);
    }

    /**
     * Creates a queue with an explicit draining mode.
     *
     * @param consumeAction callback that sends the data downstream
     * @param doFullDrain   if true, all pending chunks are merged into one
     *                      array per callback; if false, the callback is
     *                      invoked once per chunk
     */
    public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) {
        this.consumeAction = consumeAction;
        this.doFullDrain = doFullDrain;
    }

    /**
     * Appends a chunk at the tail of the queue. Does not trigger a flush.
     *
     * @param e the chunk to enqueue
     * @throws IOException if the queue is closed
     */
    public void put(ByteBufferReference[] e) throws IOException {
        ensureOpen();
        queue.addLast(e);
    }

    /**
     * Inserts a chunk at the head of the queue. Does not trigger a flush.
     *
     * @param e the chunk to enqueue
     * @throws IOException if the queue is closed
     */
    public void putFirst(ByteBufferReference[] e) throws IOException {
        ensureOpen();
        queue.addFirst(e);
    }

    /**
     * Attempts to flush the queue from the calling thread.
     *
     * @return true if this call ran the flush loop; false if another flush is
     *         already in progress (it is told to re-check the queue) or the
     *         flush is currently delayed
     * @throws IOException if the queue is closed
     */
    public boolean flush() throws IOException {
        while (true) {
            switch (state.get()) {
                case IDLE:
                    if (state.compareAndSet(IDLE, FLUSHING)) {
                        flushLoop();
                        return true;
                    }
                    break; // lost the race, re-read the state
                case FLUSHING:
                    // Ask the active flusher to look at the queue once more
                    // before it goes idle.
                    if (state.compareAndSet(FLUSHING, REFLUSHING)) {
                        return false;
                    }
                    break; // lost the race, re-read the state
                case REFLUSHING:
                case DELAYED:
                    return false;
                case CLOSED:
                    throw new IOException("Queue closed");
            }
        }
    }

    /*
     *  race invocations of flushDelayed are not allowed.
     *  flushDelayed should be invoked only from:
     *   - SelectorManager thread
     *   - Handshaking thread
     */
    /**
     * Resumes a flush that was previously suspended by {@link #setDelayed}.
     *
     * @throws IOException      if the queue is closed
     * @throws RuntimeException if the queue is open but not in the delayed state
     */
    public void flushDelayed() throws IOException {
        ensureOpen();
        if (!state.compareAndSet(DELAYED, FLUSHING)) {
            ensureOpen(); // if CAS failed when close was set - throw proper exception
            throw new RuntimeException("Shouldn't happen");
        }
        flushLoop();
    }

    /**
     * Returns the next batch to send, starting with {@code prev}. In
     * full-drain mode everything currently in the queue is appended to
     * {@code prev}; otherwise {@code prev} is returned unchanged.
     */
    private ByteBufferReference[] drain(ByteBufferReference[] prev) {
        assert prev != null;
        if (!doFullDrain) {
            return prev;
        }
        ByteBufferReference[] next = queue.poll();
        if (next == null) {
            return prev; // nothing to merge, avoid copying
        }
        List<ByteBufferReference> drained = new ArrayList<>(Arrays.asList(prev));
        do {
            drained.addAll(Arrays.asList(next));
        } while ((next = queue.poll()) != null);
        return drained.toArray(new ByteBufferReference[0]);
    }

    /**
     * Polls the head of the queue and returns the next batch to send, or
     * {@code null} if the queue is empty.
     */
    private ByteBufferReference[] drain() {
        ByteBufferReference[] next = queue.poll();
        return next == null ? null : drain(next);
    }

    /**
     * Feeds batches to the consumer until the queue is empty and the state
     * could be switched back to IDLE, or until the consumer reports that it
     * could not send everything. In the latter case the method returns
     * without touching the state; per the {@link AsyncConsumer} contract the
     * consumer is expected to have called {@link #setDelayed}.
     *
     * @throws IOException if the queue was closed while flushing
     */
    private void flushLoop() throws IOException {
        ByteBufferReference[] element;
        if (delayedElement != null) {
            // The pushed-back chunk goes out first, ahead of anything queued.
            element = drain(delayedElement);
            delayedElement = null;
        } else {
            element = drain();
        }
        while (true) {
            while (element != null) {
                if (!consumeAction.trySend(element, this)) {
                    return;
                }
                element = drain();
            }
            switch (state.get()) {
                case IDLE:
                case DELAYED:
                    throw new RuntimeException("Shouldn't happen");
                case FLUSHING:
                    if (state.compareAndSet(FLUSHING, IDLE)) {
                        return;
                    }
                    break;
                case REFLUSHING:
                    // We need to check if new elements were put after last poll() and do graceful exit
                    state.compareAndSet(REFLUSHING, FLUSHING);
                    break;
                case CLOSED:
                    throw new IOException("Queue closed");
            }
            element = drain();
        }
    }

    /*
     * The methods returns unprocessed chunk of buffers into beginning of the queue.
     * Invocation of the method allowed only inside consume callback,
     * and consume callback is invoked only when the queue in FLUSHING or REFLUSHING state.
     */
    /**
     * Stores the unsent chunk and switches the queue to the delayed state.
     *
     * @param delayedElement the buffers that could not be sent
     * @throws IOException      if the queue is closed
     * @throws RuntimeException if the queue is idle or already delayed
     */
    public void setDelayed(ByteBufferReference[] delayedElement) throws IOException {
        while (true) {
            int current = this.state.get();
            switch (current) {
                case IDLE:
                case DELAYED:
                    throw new RuntimeException("Shouldn't happen");
                case FLUSHING:
                case REFLUSHING:
                    // Store the chunk BEFORE publishing the DELAYED state.
                    // Once DELAYED is visible another thread may enter
                    // flushDelayed(), win the DELAYED -> FLUSHING CAS and
                    // read the field; assigning after the CAS would let it
                    // observe null and send newer data ahead of this chunk.
                    // The successful CAS also publishes the plain write.
                    this.delayedElement = delayedElement;
                    if (this.state.compareAndSet(current, DELAYED)) {
                        return;
                    }
                    break; // state moved underneath us, re-read and retry
                case CLOSED:
                    throw new IOException("Queue closed");
            }
        }
    }

    /**
     * @throws IOException if the queue has been closed
     */
    private void ensureOpen() throws IOException {
        if (state.get() == CLOSED) {
            throw new IOException("Queue closed");
        }
    }

    /**
     * Marks the queue as closed. Subsequent operations fail with
     * {@code IOException("Queue closed")}.
     */
    @Override
    public void close() throws IOException {
        state.set(CLOSED);
    }

}