1 /* 2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package jdk.incubator.http.internal.common; 26 27 import jdk.incubator.http.internal.frame.DataFrame; 28 import jdk.incubator.http.internal.frame.Http2Frame; 29 30 import java.io.Closeable; 31 import java.io.IOException; 32 import java.util.concurrent.BlockingQueue; 33 import java.util.concurrent.Executor; 34 import java.util.concurrent.LinkedBlockingQueue; 35 import java.util.concurrent.atomic.AtomicInteger; 36 import java.util.function.Consumer; 37 38 /** 39 * Http2Frame Producer-Consumer queue which either allows to consume all frames in blocking way 40 * or allows to consume it asynchronously. In the latter case put operation from the producer thread 41 * executes consume operation in the given executor. 42 */ 43 public class AsyncDataReadQueue implements Closeable { 44 45 @FunctionalInterface 46 public interface DataConsumer { 47 /** 48 * 49 * @param t - frame 50 * @return true if consuming should be continued. false when END_STREAM was received. 51 * @throws Throwable 52 */ 53 boolean accept(Http2Frame t) throws Throwable; 54 } 55 56 private static final int BLOCKING = 0; 57 private static final int FLUSHING = 1; 58 private static final int REFLUSHING = 2; 59 private static final int ASYNC = 3; 60 private static final int CLOSED = 4; 61 62 63 private final AtomicInteger state = new AtomicInteger(BLOCKING); 64 private final BlockingQueue<Http2Frame> queue = new LinkedBlockingQueue<>(); 65 private Executor executor; 66 private DataConsumer onData; 67 private Consumer<Throwable> onError; 68 69 public AsyncDataReadQueue() { 70 } 71 72 public boolean tryPut(Http2Frame f) { 73 if(state.get() == CLOSED) { 74 return false; 75 } else { 76 queue.offer(f); 77 flushAsync(false); 78 return true; 79 } 80 } 81 82 public void put(Http2Frame f) throws IOException { 83 if(!tryPut(f)) 84 throw new IOException("stream closed"); 85 } 86 87 public void blockingReceive(DataConsumer onData, Consumer<Throwable> onError) { 88 if (state.get() == CLOSED) { 89 onError.accept(new IOException("stream closed")); 90 return; 91 } 92 assert state.get() == BLOCKING; 93 try { 94 while (onData.accept(queue.take())); 95 assert state.get() == CLOSED; 96 } catch (Throwable e) { 97 onError.accept(e); 98 } 99 } 100 101 public void asyncReceive(Executor executor, DataConsumer onData, 102 Consumer<Throwable> onError) { 103 if (state.get() == CLOSED) { 104 onError.accept(new IOException("stream closed")); 105 return; 106 } 107 108 assert state.get() == BLOCKING; 109 110 // Validates that fields not already set. 111 if (!checkCanSet("executor", this.executor, onError) 112 || !checkCanSet("onData", this.onData, onError) 113 || !checkCanSet("onError", this.onError, onError)) { 114 return; 115 } 116 117 this.executor = executor; 118 this.onData = onData; 119 this.onError = onError; 120 121 // This will report an error if asyncReceive is called twice, 122 // because we won't be in BLOCKING state if that happens 123 if (!this.state.compareAndSet(BLOCKING, ASYNC)) { 124 onError.accept(new IOException( 125 new IllegalStateException("State: "+this.state.get()))); 126 return; 127 } 128 129 flushAsync(true); 130 } 131 132 private static <T> boolean checkCanSet(String name, T oldval, Consumer<Throwable> onError) { 133 if (oldval != null) { 134 onError.accept(new IOException( 135 new IllegalArgumentException(name))); 136 return false; 137 } 138 return true; 139 } 140 141 @Override 142 public void close() { 143 int prevState = state.getAndSet(CLOSED); 144 if(prevState == BLOCKING) { 145 // wake up blocked take() 146 queue.offer(new DataFrame(0, DataFrame.END_STREAM, new ByteBufferReference[0])); 147 } 148 } 149 150 private void flushAsync(boolean alreadyInExecutor) { 151 while(true) { 152 switch (state.get()) { 153 case BLOCKING: 154 case CLOSED: 155 case REFLUSHING: 156 return; 157 case ASYNC: 158 if(state.compareAndSet(ASYNC, FLUSHING)) { 159 if(alreadyInExecutor) { 160 flushLoop(); 161 } else { 162 executor.execute(this::flushLoop); 163 } 164 return; 165 } 166 break; 167 case FLUSHING: 168 if(state.compareAndSet(FLUSHING, REFLUSHING)) { 169 return; 170 } 171 break; 172 } 173 } 174 } 175 176 private void flushLoop() { 177 try { 178 while(true) { 179 Http2Frame frame = queue.poll(); 180 while (frame != null) { 181 if(!onData.accept(frame)) { 182 assert state.get() == CLOSED; 183 return; // closed 184 } 185 frame = queue.poll(); 186 } 187 switch (state.get()) { 188 case BLOCKING: 189 assert false; 190 break; 191 case ASYNC: 192 throw new RuntimeException("Shouldn't happen"); 193 case FLUSHING: 194 if(state.compareAndSet(FLUSHING, ASYNC)) { 195 return; 196 } 197 break; 198 case REFLUSHING: 199 // We need to check if new elements were put after last 200 // poll() and do graceful exit 201 state.compareAndSet(REFLUSHING, FLUSHING); 202 break; 203 case CLOSED: 204 return; 205 } 206 } 207 } catch (Throwable e) { 208 onError.accept(e); 209 close(); 210 } 211 } 212 }