1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package jdk.incubator.http.internal.common;
  26 
  27 
  28 import java.io.Closeable;
  29 import java.io.IOException;
  30 import java.nio.ByteBuffer;
  31 import java.util.ArrayList;
  32 import java.util.Arrays;
  33 import java.util.Deque;
  34 import java.util.List;
  35 import java.util.concurrent.ConcurrentLinkedDeque;
  36 import java.util.concurrent.atomic.AtomicInteger;
  37 import java.util.function.BiConsumer;
  38 
  39 public class AsyncWriteQueue implements Closeable {
  40     
  41     @FunctionalInterface
  42     public static interface AsyncConsumer {
  43         /**
  44          * Takes an array of buffer reference and attempt to send the data
  45          * downstream. If not all the data can be sent, then push back
  46          * to the source queue by calling {@code source.setDelayed(buffers)}
  47          * and return false. If all the data was successfully sent downstream
  48          * then returns true.
  49          * @param buffers An array of ButeBufferReference containing data
  50          *                to send downstream.
  51          * @param source This AsyncWriteQueue.
  52          * @return true if all the data could be sent downstream, false otherwise.
  53          */
  54         boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source);
  55     }
  56 
  57     private static final int IDLE    = 0;     // nobody is flushing from the queue
  58     private static final int FLUSHING = 1;    // there is the only thread flushing from the queue
  59     private static final int REFLUSHING = 2;  // while one thread was flushing from the queue
  60                                               // the other thread put data into the queue.
  61                                               // flushing thread should recheck queue before switching to idle state.
  62     private static final int DELAYED = 3;     // flushing is delayed
  63                                               // either by PlainHttpConnection.WriteEvent registration, or
  64                                               // SSL handshaking
  65 
  66     private static final int CLOSED = 4;      // queue is closed
  67 
  68     private final AtomicInteger state = new AtomicInteger(IDLE);
  69     private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
  70     private final AsyncConsumer consumeAction;
  71 
  72     // Queue may be processed in two modes:
  73     // 1. if(!doFullDrain) - invoke callback on each chunk
  74     // 2. if(doFullDrain)  - drain the whole queue, merge all chunks into the single array and invoke callback
  75     private final boolean doFullDrain;
  76 
  77     private ByteBufferReference[] delayedElement = null;
  78 
  79     public AsyncWriteQueue(AsyncConsumer consumeAction) {
  80         this(consumeAction, true);
  81     }
  82 
  83     public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) {
  84         this.consumeAction = consumeAction;
  85         this.doFullDrain = doFullDrain;
  86     }
  87 
  88     public void put(ByteBufferReference[] e) throws IOException {
  89         ensureOpen();
  90         queue.addLast(e);
  91     }
  92 
  93     public void putFirst(ByteBufferReference[] e) throws IOException {
  94         ensureOpen();
  95         queue.addFirst(e);
  96     }
  97 
  98     /**
  99      * retruns true if flushing was performed
 100      * @return
 101      * @throws IOException
 102      */
 103     public boolean flush() throws IOException {
 104         while(true) {
 105             switch (state.get()) {
 106                 case IDLE:
 107                     if(state.compareAndSet(IDLE, FLUSHING)) {
 108                         flushLoop();
 109                         return true;
 110                     }
 111                     break;
 112                 case FLUSHING:
 113                     if(state.compareAndSet(FLUSHING, REFLUSHING)) {
 114                         return false;
 115                     }
 116                     break;
 117                 case REFLUSHING:
 118                 case DELAYED:
 119                     return false;
 120                 case CLOSED:
 121                     throw new IOException("Queue closed");
 122             }
 123         }
 124     }
 125 
 126     /*
 127      *  race invocations of flushDelayed are not allowed.
 128      *  flushDelayed should be invoked only from:
 129      *   - SelectorManager thread
 130      *   - Handshaking thread
 131      */
 132     public void flushDelayed() throws IOException {
 133         ensureOpen();
 134         if(!state.compareAndSet(DELAYED, FLUSHING)) {
 135             ensureOpen(); // if CAS failed when close was set - throw proper exception
 136             throw new RuntimeException("Shouldn't happen");
 137         }
 138         flushLoop();
 139     }
 140 
 141     private ByteBufferReference[] drain(ByteBufferReference[] prev) {
 142         assert prev != null;
 143         if(doFullDrain) {
 144             ByteBufferReference[] next = queue.poll();
 145             if(next == null) {
 146                 return prev;
 147             }
 148             List<ByteBufferReference> drained = new ArrayList<>();
 149             drained.addAll(Arrays.asList(prev));
 150             drained.addAll(Arrays.asList(next));
 151             while ((next = queue.poll()) != null) {
 152                 drained.addAll(Arrays.asList(next));
 153             }
 154             return drained.toArray(new ByteBufferReference[0]);
 155         } else {
 156             return prev;
 157         }
 158     }
 159 
 160     private ByteBufferReference[] drain() {
 161         ByteBufferReference[] next = queue.poll();
 162         return next == null ? null : drain(next);
 163     }
 164 
 165     private void flushLoop() throws IOException {
 166         ByteBufferReference[] element;
 167         if (delayedElement != null) {
 168             element = drain(delayedElement);
 169             delayedElement = null;
 170         } else {
 171             element = drain();
 172         }
 173         while(true) {
 174             while (element != null) {
 175                 if (!consumeAction.trySend(element, this)) {
 176                     return;
 177                 }
 178                 element = drain();
 179             }
 180             switch (state.get()) {
 181                 case IDLE:
 182                 case DELAYED:
 183                     throw new RuntimeException("Shouldn't happen");
 184                 case FLUSHING:
 185                     if(state.compareAndSet(FLUSHING, IDLE)) {
 186                         return;
 187                     }
 188                     break;
 189                 case REFLUSHING:
 190                     // We need to check if new elements were put after last poll() and do graceful exit
 191                     state.compareAndSet(REFLUSHING, FLUSHING);
 192                     break;
 193                 case CLOSED:
 194                     throw new IOException("Queue closed");
 195             }
 196             element = drain();
 197         }
 198     }
 199 
 200     /*
 201      * The methods returns unprocessed chunk of buffers into beginning of the queue.
 202      * Invocation of the method allowed only inside consume callback,
 203      * and consume callback is invoked only when the queue in FLUSHING or REFLUSHING state.
 204      */
 205     public void setDelayed(ByteBufferReference[] delayedElement) throws IOException {
 206         while(true) {
 207             int state = this.state.get();
 208             switch (state) {
 209                 case IDLE:
 210                 case DELAYED:
 211                     throw new RuntimeException("Shouldn't happen");
 212                 case FLUSHING:
 213                 case REFLUSHING:
 214                     if(this.state.compareAndSet(state, DELAYED)) {
 215                         this.delayedElement = delayedElement;
 216                         return;
 217                     }
 218                     break;
 219                 case CLOSED:
 220                     throw new IOException("Queue closed");
 221             }
 222         }
 223 
 224     }
 225 
 226     private void ensureOpen() throws IOException {
 227         if (state.get() == CLOSED) {
 228             throw new IOException("Queue closed");
 229         }
 230     }
 231 
 232     @Override
 233     public void close() throws IOException {
 234         state.getAndSet(CLOSED);
 235     }
 236 
 237 }