1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package jdk.incubator.http.internal.common;
  26 
  27 import jdk.incubator.http.internal.frame.DataFrame;
  28 import jdk.incubator.http.internal.frame.Http2Frame;
  29 
  30 import java.io.Closeable;
  31 import java.io.IOException;
  32 import java.util.concurrent.BlockingQueue;
  33 import java.util.concurrent.Executor;
  34 import java.util.concurrent.LinkedBlockingQueue;
  35 import java.util.concurrent.atomic.AtomicInteger;
  36 import java.util.function.Consumer;
  37 
  38 /**
  39  * Http2Frame Producer-Consumer queue which either allows to consume all frames in blocking way
  40  * or allows to consume it asynchronously. In the latter case put operation from the producer thread
  41  * executes consume operation in the given executor.
  42  */
  43 public class AsyncDataReadQueue implements Closeable {
  44 
  45     @FunctionalInterface
  46     public interface DataConsumer {
  47         /**
  48          *
  49          * @param t - frame
  50          * @return true if consuming should be continued. false when END_STREAM was received.
  51          * @throws Throwable
  52          */
  53         boolean accept(Http2Frame t) throws Throwable;
  54     }
  55 
  56     private static final int BLOCKING = 0;
  57     private static final int FLUSHING = 1;
  58     private static final int REFLUSHING = 2;
  59     private static final int ASYNC  = 3;
  60     private static final int CLOSED = 4;
  61 
  62 
  63     private final AtomicInteger state = new AtomicInteger(BLOCKING);
  64     private final BlockingQueue<Http2Frame> queue = new LinkedBlockingQueue<>();
  65     private Executor executor;
  66     private DataConsumer onData;
  67     private Consumer<Throwable> onError;
  68 
  69     public AsyncDataReadQueue() {
  70     }
  71 
  72     public boolean tryPut(Http2Frame f) {
  73         if(state.get() == CLOSED) {
  74             return false;
  75         } else {
  76             queue.offer(f);
  77             flushAsync(false);
  78             return true;
  79         }
  80     }
  81 
  82     public void put(Http2Frame f) throws IOException {
  83         if(!tryPut(f))
  84             throw new IOException("stream closed");
  85     }
  86 
  87     public void blockingReceive(DataConsumer onData, Consumer<Throwable> onError) {
  88         if (state.get() == CLOSED) {
  89             onError.accept(new IOException("stream closed"));
  90             return;
  91         }
  92         assert state.get() == BLOCKING;
  93         try {
  94             while (onData.accept(queue.take()));
  95             assert state.get() == CLOSED;
  96         } catch (Throwable e) {
  97             onError.accept(e);
  98         }
  99     }
 100 
 101     public void asyncReceive(Executor executor, DataConsumer onData,
 102                              Consumer<Throwable> onError) {
 103         if (state.get() == CLOSED) {
 104             onError.accept(new IOException("stream closed"));
 105             return;
 106         }
 107 
 108         assert state.get() == BLOCKING;
 109 
 110         // Validates that fields not already set.
 111         if (!checkCanSet("executor", this.executor, onError)
 112             || !checkCanSet("onData", this.onData, onError)
 113             || !checkCanSet("onError", this.onError, onError)) {
 114             return;
 115         }
 116 
 117         this.executor = executor;
 118         this.onData = onData;
 119         this.onError = onError;
 120 
 121         // This will report an error if asyncReceive is called twice,
 122         // because we won't be in BLOCKING state if that happens
 123         if (!this.state.compareAndSet(BLOCKING, ASYNC)) {
 124             onError.accept(new IOException(
 125                   new IllegalStateException("State: "+this.state.get())));
 126             return;
 127         }
 128 
 129         flushAsync(true);
 130     }
 131 
 132     private static <T> boolean checkCanSet(String name, T oldval, Consumer<Throwable> onError) {
 133         if (oldval != null) {
 134             onError.accept(new IOException(
 135                      new IllegalArgumentException(name)));
 136             return false;
 137         }
 138         return true;
 139     }
 140 
 141     @Override
 142     public void close() {
 143         int prevState = state.getAndSet(CLOSED);
 144         if(prevState == BLOCKING) {
 145             // wake up blocked take()
 146             queue.offer(new DataFrame(0, DataFrame.END_STREAM, new ByteBufferReference[0]));
 147         }
 148     }
 149 
 150     private void flushAsync(boolean alreadyInExecutor) {
 151         while(true) {
 152             switch (state.get()) {
 153                 case BLOCKING:
 154                 case CLOSED:
 155                 case REFLUSHING:
 156                     return;
 157                 case ASYNC:
 158                     if(state.compareAndSet(ASYNC, FLUSHING)) {
 159                         if(alreadyInExecutor) {
 160                             flushLoop();
 161                         } else {
 162                             executor.execute(this::flushLoop);
 163                         }
 164                         return;
 165                     }
 166                     break;
 167                 case FLUSHING:
 168                     if(state.compareAndSet(FLUSHING, REFLUSHING)) {
 169                         return;
 170                     }
 171                     break;
 172             }
 173         }
 174     }
 175 
 176     private void flushLoop() {
 177         try {
 178             while(true) {
 179                 Http2Frame frame = queue.poll();
 180                 while (frame != null) {
 181                     if(!onData.accept(frame)) {
 182                         assert state.get() == CLOSED;
 183                         return; // closed
 184                     }
 185                     frame = queue.poll();
 186                 }
 187                 switch (state.get()) {
 188                     case BLOCKING:
 189                         assert false;
 190                         break;
 191                     case ASYNC:
 192                         throw new RuntimeException("Shouldn't happen");
 193                     case FLUSHING:
 194                         if(state.compareAndSet(FLUSHING, ASYNC)) {
 195                             return;
 196                         }
 197                         break;
 198                     case REFLUSHING:
 199                         // We need to check if new elements were put after last
 200                         // poll() and do graceful exit
 201                         state.compareAndSet(REFLUSHING, FLUSHING);
 202                         break;
 203                     case CLOSED:
 204                         return;
 205                 }
 206             }
 207         } catch (Throwable e) {
 208             onError.accept(e);
 209             close();
 210         }
 211     }
 212 }