< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java

Print this page




  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package jdk.incubator.http.internal.common;
  26 
  27 
  28 import java.io.Closeable;
  29 import java.io.IOException;
  30 import java.nio.ByteBuffer;
  31 import java.util.ArrayList;
  32 import java.util.Arrays;
  33 import java.util.Deque;
  34 import java.util.List;
  35 import java.util.concurrent.ConcurrentLinkedDeque;
  36 import java.util.concurrent.atomic.AtomicInteger;
  37 import java.util.function.BiConsumer;
  38 
  39 public class AsyncWriteQueue implements Closeable {
  40 
















  41     private static final int IDLE    = 0;     // no thread is flushing the queue
  42     private static final int FLUSHING = 1;    // exactly one thread is flushing the queue
  43     private static final int REFLUSHING = 2;  // while one thread was flushing the queue,
  44                                               // another thread put data into the queue;
  45                                               // the flushing thread must recheck the queue before switching to the IDLE state.
  46     private static final int DELAYED = 3;     // flushing is delayed,
  47                                               // either by PlainHttpConnection.WriteEvent registration or
  48                                               // by SSL handshaking
  49 
  50     private static final int CLOSED = 4;      // queue is closed; flushLoop() throws IOException when it observes this state
  51 
  52     private final AtomicInteger state = new AtomicInteger(IDLE); // current state, one of the constants above; starts as IDLE
  53     private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>(); // chunks waiting to be handed to consumeAction
  54     private final BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction; // callback invoked by flushLoop() with each drained element and this queue
  55 
  56     // The queue may be processed in one of two modes:
  57     // 1. if(!doFullDrain) - invoke the callback on each chunk
  58     // 2. if(doFullDrain)  - drain the whole queue, merge all chunks into a single array and invoke the callback once
  59     private final boolean doFullDrain;
  60 
  61     private ByteBufferReference[] delayedElement = null; // when non-null, flushLoop() processes it before polling the queue and then resets it to null
  62 
  63     public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction) { // convenience constructor: full-drain mode
  64         this(consumeAction, true); // delegates to the two-argument constructor with doFullDrain = true
  65     }
  66 
  67     public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction, boolean doFullDrain) { // stores both arguments unchanged; no validation is performed
  68         this.consumeAction = consumeAction; // callback that flushLoop() invokes for each drained element
  69         this.doFullDrain = doFullDrain;     // true: merge all queued chunks into one array; false: one callback per chunk
  70     }
  71 
  72     public void put(ByteBufferReference[] e) throws IOException { // enqueues e at the tail of the queue
  73         ensureOpen(); // defined outside this excerpt; presumably throws IOException once the queue is closed -- TODO confirm
  74         queue.addLast(e); // only enqueues; nothing in this method starts a flush
  75     }
  76 
  77     public void putFirst(ByteBufferReference[] e) throws IOException { // enqueues e at the head of the queue, ahead of everything already queued
  78         ensureOpen(); // defined outside this excerpt; presumably throws IOException once the queue is closed -- TODO confirm
  79         queue.addFirst(e); // only enqueues; nothing in this method starts a flush
  80     }
  81 
  82     /**
  83      * Returns true if flushing was performed.
  84      * @return
  85      * @throws IOException
  86      */
  87     public boolean flush() throws IOException {


 139         } else {
 140             return prev;
 141         }
 142     }
 143 
 144     private ByteBufferReference[] drain() { // takes the next element off the queue; returns null when the queue is empty
 145         ByteBufferReference[] next = queue.poll(); // removes and returns the head, or null if there is none
 146         return next == null ? null : drain(next);  // a non-null head goes through the drain(ByteBufferReference[]) overload, whose body is not fully visible in this excerpt
 147     }
 148 
 149     private void flushLoop() throws IOException {
 150         ByteBufferReference[] element;
 151         if (delayedElement != null) {
 152             element = drain(delayedElement);
 153             delayedElement = null;
 154         } else {
 155             element = drain();
 156         }
 157         while(true) {
 158             while (element != null) {
 159                 consumeAction.accept(element, this);
 160                 if (state.get() == DELAYED) {
 161                     return;
 162                 }
 163                 element = drain();
 164             }
 165             switch (state.get()) {
 166                 case IDLE:
 167                 case DELAYED:
 168                     throw new RuntimeException("Shouldn't happen");
 169                 case FLUSHING:
 170                     if(state.compareAndSet(FLUSHING, IDLE)) {
 171                         return;
 172                     }
 173                     break;
 174                 case REFLUSHING:
 175                     // We need to check if new elements were put after last poll() and do graceful exit
 176                     state.compareAndSet(REFLUSHING, FLUSHING);
 177                     break;
 178                 case CLOSED:
 179                     throw new IOException("Queue closed");
 180             }




  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package jdk.incubator.http.internal.common;
  26 
  27 
  28 import java.io.Closeable;
  29 import java.io.IOException;
  30 import java.nio.ByteBuffer;
  31 import java.util.ArrayList;
  32 import java.util.Arrays;
  33 import java.util.Deque;
  34 import java.util.List;
  35 import java.util.concurrent.ConcurrentLinkedDeque;
  36 import java.util.concurrent.atomic.AtomicInteger;
  37 import java.util.function.BiConsumer;
  38 
  39 public class AsyncWriteQueue implements Closeable {
  40     
  41     @FunctionalInterface
  42     public static interface AsyncConsumer { // callback type that flushLoop() invokes for each drained element
  43         /**
  44          * Takes an array of buffer references and attempts to send the data
  45          * downstream. If not all the data can be sent, the implementation pushes
  46          * the data back to the source queue by calling {@code source.setDelayed(buffers)}
  47          * and returns false. If all the data was successfully sent downstream,
  48          * it returns true.
  49          * @param buffers An array of ByteBufferReference containing data
  50          *                to send downstream.
  51          * @param source This AsyncWriteQueue.
  52          * @return true if all the data could be sent downstream, false otherwise.
  53          */
  54         boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source);
  55     }
  56 
  57     private static final int IDLE    = 0;     // no thread is flushing the queue
  58     private static final int FLUSHING = 1;    // exactly one thread is flushing the queue
  59     private static final int REFLUSHING = 2;  // while one thread was flushing the queue,
  60                                               // another thread put data into the queue;
  61                                               // the flushing thread must recheck the queue before switching to the IDLE state.
  62     private static final int DELAYED = 3;     // flushing is delayed,
  63                                               // either by PlainHttpConnection.WriteEvent registration or
  64                                               // by SSL handshaking
  65 
  66     private static final int CLOSED = 4;      // queue is closed; flushLoop() throws IOException when it observes this state
  67 
  68     private final AtomicInteger state = new AtomicInteger(IDLE); // current state, one of the constants above; starts as IDLE
  69     private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>(); // chunks waiting to be handed to consumeAction
  70     private final AsyncConsumer consumeAction; // callback whose trySend() is invoked by flushLoop() with each drained element and this queue
  71 
  72     // The queue may be processed in one of two modes:
  73     // 1. if(!doFullDrain) - invoke the callback on each chunk
  74     // 2. if(doFullDrain)  - drain the whole queue, merge all chunks into a single array and invoke the callback once
  75     private final boolean doFullDrain;
  76 
  77     private ByteBufferReference[] delayedElement = null; // when non-null, flushLoop() processes it before polling the queue and then resets it to null
  78 
  79     public AsyncWriteQueue(AsyncConsumer consumeAction) { // convenience constructor: full-drain mode
  80         this(consumeAction, true); // delegates to the two-argument constructor with doFullDrain = true
  81     }
  82 
  83     public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) { // stores both arguments unchanged; no validation is performed
  84         this.consumeAction = consumeAction; // callback whose trySend() flushLoop() invokes for each drained element
  85         this.doFullDrain = doFullDrain;     // true: merge all queued chunks into one array; false: one callback per chunk
  86     }
  87 
  88     public void put(ByteBufferReference[] e) throws IOException { // enqueues e at the tail of the queue
  89         ensureOpen(); // defined outside this excerpt; presumably throws IOException once the queue is closed -- TODO confirm
  90         queue.addLast(e); // only enqueues; nothing in this method starts a flush
  91     }
  92 
  93     public void putFirst(ByteBufferReference[] e) throws IOException { // enqueues e at the head of the queue, ahead of everything already queued
  94         ensureOpen(); // defined outside this excerpt; presumably throws IOException once the queue is closed -- TODO confirm
  95         queue.addFirst(e); // only enqueues; nothing in this method starts a flush
  96     }
  97 
  98     /**
  99      * Returns true if flushing was performed.
 100      * @return
 101      * @throws IOException
 102      */
 103     public boolean flush() throws IOException {


 155         } else {
 156             return prev;
 157         }
 158     }
 159 
 160     private ByteBufferReference[] drain() { // takes the next element off the queue; returns null when the queue is empty
 161         ByteBufferReference[] next = queue.poll(); // removes and returns the head, or null if there is none
 162         return next == null ? null : drain(next);  // a non-null head goes through the drain(ByteBufferReference[]) overload, whose body is not fully visible in this excerpt
 163     }
 164 
 165     private void flushLoop() throws IOException {
 166         ByteBufferReference[] element;
 167         if (delayedElement != null) {
 168             element = drain(delayedElement);
 169             delayedElement = null;
 170         } else {
 171             element = drain();
 172         }
 173         while(true) {
 174             while (element != null) {
 175                 if (!consumeAction.trySend(element, this)) {

 176                     return;
 177                 }
 178                 element = drain();
 179             }
 180             switch (state.get()) {
 181                 case IDLE:
 182                 case DELAYED:
 183                     throw new RuntimeException("Shouldn't happen");
 184                 case FLUSHING:
 185                     if(state.compareAndSet(FLUSHING, IDLE)) {
 186                         return;
 187                     }
 188                     break;
 189                 case REFLUSHING:
 190                     // We need to check if new elements were put after last poll() and do graceful exit
 191                     state.compareAndSet(REFLUSHING, FLUSHING);
 192                     break;
 193                 case CLOSED:
 194                     throw new IOException("Queue closed");
 195             }


< prev index next >