21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package jdk.incubator.http.internal.common;
26
27
28 import java.io.Closeable;
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.Deque;
34 import java.util.List;
35 import java.util.concurrent.ConcurrentLinkedDeque;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.function.BiConsumer;
38
39 public class AsyncWriteQueue implements Closeable {
40
// Values held by the 'state' field below.
private static final int IDLE = 0; // nobody is flushing from the queue
private static final int FLUSHING = 1; // exactly one thread is flushing from the queue
private static final int REFLUSHING = 2; // while one thread was flushing from the queue,
// another thread put data into the queue;
// the flushing thread should recheck the queue before switching to the idle state.
private static final int DELAYED = 3; // flushing is delayed
// either by PlainHttpConnection.WriteEvent registration, or
// SSL handshaking

private static final int CLOSED = 4; // queue is closed

// Current state: one of the constants above. flushLoop() moves it with compareAndSet.
private final AtomicInteger state = new AtomicInteger(IDLE);
// Pending chunks: put() appends at the tail, putFirst() inserts at the head, drain() polls the head.
private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
// Callback that flushLoop() invokes with each drained element and this queue.
private final BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction;

// Queue may be processed in two modes:
// 1. if(!doFullDrain) - invoke callback on each chunk
// 2. if(doFullDrain) - drain the whole queue, merge all chunks into a single array and invoke callback
private final boolean doFullDrain;

// When non-null, flushLoop() takes this element before polling the queue and then resets it to null.
private ByteBufferReference[] delayedElement = null;
62
/**
 * Creates a queue that operates in full-drain mode ({@code doFullDrain == true}).
 *
 * @param consumeAction callback invoked with each drained element and this queue
 */
public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction) {
    // Same effect as delegating to the two-argument constructor with doFullDrain set to true.
    this.consumeAction = consumeAction;
    this.doFullDrain = true;
}
66
/**
 * Creates a queue with an explicit drain mode.
 *
 * @param consumeAction callback invoked with each drained element and this queue
 * @param doFullDrain   {@code true} to drain the whole queue and merge all chunks into a
 *                      single array per callback; {@code false} to invoke the callback
 *                      on each chunk
 */
public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction, boolean doFullDrain) {
    this.doFullDrain = doFullDrain;
    this.consumeAction = consumeAction;
}
71
72 public void put(ByteBufferReference[] e) throws IOException {
73 ensureOpen();
74 queue.addLast(e);
75 }
76
77 public void putFirst(ByteBufferReference[] e) throws IOException {
78 ensureOpen();
79 queue.addFirst(e);
80 }
81
82 /**
83 * returns true if flushing was performed
84 * @return
85 * @throws IOException
86 */
87 public boolean flush() throws IOException {
139 } else {
140 return prev;
141 }
142 }
143
144 private ByteBufferReference[] drain() {
145 ByteBufferReference[] next = queue.poll();
146 return next == null ? null : drain(next);
147 }
148
149 private void flushLoop() throws IOException {
150 ByteBufferReference[] element;
151 if (delayedElement != null) {
152 element = drain(delayedElement);
153 delayedElement = null;
154 } else {
155 element = drain();
156 }
157 while(true) {
158 while (element != null) {
159 consumeAction.accept(element, this);
160 if (state.get() == DELAYED) {
161 return;
162 }
163 element = drain();
164 }
165 switch (state.get()) {
166 case IDLE:
167 case DELAYED:
168 throw new RuntimeException("Shouldn't happen");
169 case FLUSHING:
170 if(state.compareAndSet(FLUSHING, IDLE)) {
171 return;
172 }
173 break;
174 case REFLUSHING:
175 // We need to check if new elements were put after last poll() and do graceful exit
176 state.compareAndSet(REFLUSHING, FLUSHING);
177 break;
178 case CLOSED:
179 throw new IOException("Queue closed");
180 }
|
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25 package jdk.incubator.http.internal.common;
26
27
28 import java.io.Closeable;
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.Deque;
34 import java.util.List;
35 import java.util.concurrent.ConcurrentLinkedDeque;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.function.BiConsumer;
38
39 public class AsyncWriteQueue implements Closeable {
40
41 @FunctionalInterface
42 public static interface AsyncConsumer {
43 /**
44 * Takes an array of buffer reference and attempt to send the data
45 * downstream. If not all the data can be sent, then push back
46 * to the source queue by calling {@code source.setDelayed(buffers)}
47 * and return false. If all the data was successfully sent downstream
48 * then returns true.
49 * @param buffers An array of ButeBufferReference containing data
50 * to send downstream.
51 * @param source This AsyncWriteQueue.
52 * @return true if all the data could be sent downstream, false otherwise.
53 */
54 boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source);
55 }
56
// Values held by the 'state' field below.
private static final int IDLE = 0; // nobody is flushing from the queue
private static final int FLUSHING = 1; // exactly one thread is flushing from the queue
private static final int REFLUSHING = 2; // while one thread was flushing from the queue,
// another thread put data into the queue;
// the flushing thread should recheck the queue before switching to the idle state.
private static final int DELAYED = 3; // flushing is delayed
// either by PlainHttpConnection.WriteEvent registration, or
// SSL handshaking

private static final int CLOSED = 4; // queue is closed

// Current state: one of the constants above. flushLoop() moves it with compareAndSet.
private final AtomicInteger state = new AtomicInteger(IDLE);
// Pending chunks: put() appends at the tail, putFirst() inserts at the head, drain() polls the head.
private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
// Callback whose trySend() is invoked by flushLoop() with each drained element and this queue.
private final AsyncConsumer consumeAction;

// Queue may be processed in two modes:
// 1. if(!doFullDrain) - invoke callback on each chunk
// 2. if(doFullDrain) - drain the whole queue, merge all chunks into a single array and invoke callback
private final boolean doFullDrain;

// When non-null, flushLoop() takes this element before polling the queue and then resets it to null.
private ByteBufferReference[] delayedElement = null;
78
/**
 * Creates a queue that operates in full-drain mode ({@code doFullDrain == true}).
 *
 * @param consumeAction callback whose {@code trySend} receives each drained element
 */
public AsyncWriteQueue(AsyncConsumer consumeAction) {
    // Same effect as delegating to the two-argument constructor with doFullDrain set to true.
    this.consumeAction = consumeAction;
    this.doFullDrain = true;
}
82
/**
 * Creates a queue with an explicit drain mode.
 *
 * @param consumeAction callback whose {@code trySend} receives each drained element
 * @param doFullDrain   {@code true} to drain the whole queue and merge all chunks into a
 *                      single array per callback; {@code false} to invoke the callback
 *                      on each chunk
 */
public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) {
    this.doFullDrain = doFullDrain;
    this.consumeAction = consumeAction;
}
87
88 public void put(ByteBufferReference[] e) throws IOException {
89 ensureOpen();
90 queue.addLast(e);
91 }
92
93 public void putFirst(ByteBufferReference[] e) throws IOException {
94 ensureOpen();
95 queue.addFirst(e);
96 }
97
98 /**
99 * returns true if flushing was performed
100 * @return
101 * @throws IOException
102 */
103 public boolean flush() throws IOException {
155 } else {
156 return prev;
157 }
158 }
159
160 private ByteBufferReference[] drain() {
161 ByteBufferReference[] next = queue.poll();
162 return next == null ? null : drain(next);
163 }
164
165 private void flushLoop() throws IOException {
166 ByteBufferReference[] element;
167 if (delayedElement != null) {
168 element = drain(delayedElement);
169 delayedElement = null;
170 } else {
171 element = drain();
172 }
173 while(true) {
174 while (element != null) {
175 if (!consumeAction.trySend(element, this)) {
176 return;
177 }
178 element = drain();
179 }
180 switch (state.get()) {
181 case IDLE:
182 case DELAYED:
183 throw new RuntimeException("Shouldn't happen");
184 case FLUSHING:
185 if(state.compareAndSet(FLUSHING, IDLE)) {
186 return;
187 }
188 break;
189 case REFLUSHING:
190 // We need to check if new elements were put after last poll() and do graceful exit
191 state.compareAndSet(REFLUSHING, FLUSHING);
192 break;
193 case CLOSED:
194 throw new IOException("Queue closed");
195 }
|