1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package jdk.incubator.http.internal.common;
  26 
  27 
  28 import java.io.Closeable;
  29 import java.io.IOException;
  30 import java.util.ArrayList;
  31 import java.util.Arrays;
  32 import java.util.Deque;
  33 import java.util.List;
  34 import java.util.concurrent.ConcurrentLinkedDeque;
  35 import java.util.concurrent.atomic.AtomicInteger;
  36 
  37 public class AsyncWriteQueue implements Closeable {
  38 
  39     @FunctionalInterface
  40     public static interface AsyncConsumer {
  41         /**
  42          * Takes an array of buffer reference and attempt to send the data
  43          * downstream. If not all the data can be sent, then push back
  44          * to the source queue by calling {@code source.setDelayed(buffers)}
  45          * and return false. If all the data was successfully sent downstream
  46          * then returns true.
  47          * @param buffers An array of ButeBufferReference containing data
  48          *                to send downstream.
  49          * @param source This AsyncWriteQueue.
  50          * @return true if all the data could be sent downstream, false otherwise.
  51          */
  52         boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source);
  53     }
  54 
  55     private static final int IDLE    = 0;     // nobody is flushing from the queue
  56     private static final int FLUSHING = 1;    // there is the only thread flushing from the queue
  57     private static final int REFLUSHING = 2;  // while one thread was flushing from the queue
  58                                               // the other thread put data into the queue.
  59                                               // flushing thread should recheck queue before switching to idle state.
  60     private static final int DELAYED = 3;     // flushing is delayed
  61                                               // either by PlainHttpConnection.WriteEvent registration, or
  62                                               // SSL handshaking
  63 
  64     private static final int CLOSED = 4;      // queue is closed
  65 
  66     private final AtomicInteger state = new AtomicInteger(IDLE);
  67     private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
  68     private final AsyncConsumer consumeAction;
  69 
  70     // Queue may be processed in two modes:
  71     // 1. if(!doFullDrain) - invoke callback on each chunk
  72     // 2. if(doFullDrain)  - drain the whole queue, merge all chunks into the single array and invoke callback
  73     private final boolean doFullDrain;
  74 
  75     private ByteBufferReference[] delayedElement = null;
  76 
  77     public AsyncWriteQueue(AsyncConsumer consumeAction) {
  78         this(consumeAction, true);
  79     }
  80 
  81     public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) {
  82         this.consumeAction = consumeAction;
  83         this.doFullDrain = doFullDrain;
  84     }
  85 
  86     public void put(ByteBufferReference[] e) throws IOException {
  87         ensureOpen();
  88         queue.addLast(e);
  89     }
  90 
  91     public void putFirst(ByteBufferReference[] e) throws IOException {
  92         ensureOpen();
  93         queue.addFirst(e);
  94     }
  95 
  96     /**
  97      * retruns true if flushing was performed
  98      * @return
  99      * @throws IOException
 100      */
 101     public boolean flush() throws IOException {
 102         while(true) {
 103             switch (state.get()) {
 104                 case IDLE:
 105                     if(state.compareAndSet(IDLE, FLUSHING)) {
 106                         flushLoop();
 107                         return true;
 108                     }
 109                     break;
 110                 case FLUSHING:
 111                     if(state.compareAndSet(FLUSHING, REFLUSHING)) {
 112                         return false;
 113                     }
 114                     break;
 115                 case REFLUSHING:
 116                 case DELAYED:
 117                     return false;
 118                 case CLOSED:
 119                     throw new IOException("Queue closed");
 120             }
 121         }
 122     }
 123 
 124     /*
 125      *  race invocations of flushDelayed are not allowed.
 126      *  flushDelayed should be invoked only from:
 127      *   - SelectorManager thread
 128      *   - Handshaking thread
 129      */
 130     public void flushDelayed() throws IOException {
 131         ensureOpen();
 132         if(!state.compareAndSet(DELAYED, FLUSHING)) {
 133             ensureOpen(); // if CAS failed when close was set - throw proper exception
 134             throw new RuntimeException("Shouldn't happen");
 135         }
 136         flushLoop();
 137     }
 138 
 139     private ByteBufferReference[] drain(ByteBufferReference[] prev) {
 140         assert prev != null;
 141         if(doFullDrain) {
 142             ByteBufferReference[] next = queue.poll();
 143             if(next == null) {
 144                 return prev;
 145             }
 146             List<ByteBufferReference> drained = new ArrayList<>();
 147             drained.addAll(Arrays.asList(prev));
 148             drained.addAll(Arrays.asList(next));
 149             while ((next = queue.poll()) != null) {
 150                 drained.addAll(Arrays.asList(next));
 151             }
 152             return drained.toArray(new ByteBufferReference[0]);
 153         } else {
 154             return prev;
 155         }
 156     }
 157 
 158     private ByteBufferReference[] drain() {
 159         ByteBufferReference[] next = queue.poll();
 160         return next == null ? null : drain(next);
 161     }
 162 
 163     private void flushLoop() throws IOException {
 164         ByteBufferReference[] element;
 165         if (delayedElement != null) {
 166             element = drain(delayedElement);
 167             delayedElement = null;
 168         } else {
 169             element = drain();
 170         }
 171         while(true) {
 172             while (element != null) {
 173                 if (!consumeAction.trySend(element, this)) {
 174                     return;
 175                 }
 176                 element = drain();
 177             }
 178             switch (state.get()) {
 179                 case IDLE:
 180                 case DELAYED:
 181                     throw new RuntimeException("Shouldn't happen");
 182                 case FLUSHING:
 183                     if(state.compareAndSet(FLUSHING, IDLE)) {
 184                         return;
 185                     }
 186                     break;
 187                 case REFLUSHING:
 188                     // We need to check if new elements were put after last poll() and do graceful exit
 189                     state.compareAndSet(REFLUSHING, FLUSHING);
 190                     break;
 191                 case CLOSED:
 192                     throw new IOException("Queue closed");
 193             }
 194             element = drain();
 195         }
 196     }
 197 
 198     /*
 199      * The methods returns unprocessed chunk of buffers into beginning of the queue.
 200      * Invocation of the method allowed only inside consume callback,
 201      * and consume callback is invoked only when the queue in FLUSHING or REFLUSHING state.
 202      */
 203     public void setDelayed(ByteBufferReference[] delayedElement) throws IOException {
 204         while(true) {
 205             int state = this.state.get();
 206             switch (state) {
 207                 case IDLE:
 208                 case DELAYED:
 209                     throw new RuntimeException("Shouldn't happen");
 210                 case FLUSHING:
 211                 case REFLUSHING:
 212                     if(this.state.compareAndSet(state, DELAYED)) {
 213                         this.delayedElement = delayedElement;
 214                         return;
 215                     }
 216                     break;
 217                 case CLOSED:
 218                     throw new IOException("Queue closed");
 219             }
 220         }
 221 
 222     }
 223 
 224     private void ensureOpen() throws IOException {
 225         if (state.get() == CLOSED) {
 226             throw new IOException("Queue closed");
 227         }
 228     }
 229 
 230     @Override
 231     public void close() throws IOException {
 232         state.getAndSet(CLOSED);
 233     }
 234 
 235 }