< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java
Print this page
*** 23,36 ****
* questions.
*/
package jdk.incubator.http.internal.websocket;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
- import java.util.concurrent.atomic.AtomicLong;
/*
* Receives incoming data from the channel on demand and converts it into a
* stream of WebSocket messages which are then delivered to the supplied message
* consumer in a strict sequential order and non-recursively. In other words,
--- 23,38 ----
* questions.
*/
package jdk.incubator.http.internal.websocket;
+ import jdk.incubator.http.internal.common.Demand;
+ import jdk.incubator.http.internal.common.SequentialScheduler;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
/*
* Receives incoming data from the channel on demand and converts it into a
* stream of WebSocket messages which are then delivered to the supplied message
* consumer in a strict sequential order and non-recursively. In other words,
*** 47,82 ****
* onBinary()
* ...
*
* even if `request(long n)` is called from inside these invocations.
*/
! final class Receiver {
private final MessageStreamConsumer messageConsumer;
private final RawChannel channel;
private final FrameConsumer frameConsumer;
private final Frame.Reader reader = new Frame.Reader();
private final RawChannel.RawEvent event = createHandler();
! private final AtomicLong demand = new AtomicLong();
! private final CooperativeHandler handler;
private ByteBuffer data;
private volatile int state;
private static final int UNREGISTERED = 0;
private static final int AVAILABLE = 1;
private static final int WAITING = 2;
/*
* Old (replaced) side of this hunk: package-private constructor.
* Stores the message consumer and the channel, wraps the consumer in a
* FrameConsumer, takes the channel's initial buffer as the first `data`
* and finally creates the CooperativeHandler bound to `pushContinuously`.
*/
! Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) {
this.messageConsumer = messageConsumer;
this.channel = channel;
// The frame consumer is what `reader.readFrame` feeds parsed frames into
this.frameConsumer = new FrameConsumer(this.messageConsumer);
// Seed `data` with the channel's initial buffer before any read happens
this.data = channel.initialByteBuffer();
// To ensure the initial non-final `data` will be visible
// (happens-before) when `handler` invokes `pushContinuously`
// the following assignment is done last:
! handler = new CooperativeHandler(this::pushContinuously);
}
private RawChannel.RawEvent createHandler() {
return new RawChannel.RawEvent() {
--- 49,89 ----
* onBinary()
* ...
*
* even if `request(long n)` is called from inside these invocations.
*/
! public class Receiver {
private final MessageStreamConsumer messageConsumer;
private final RawChannel channel;
private final FrameConsumer frameConsumer;
private final Frame.Reader reader = new Frame.Reader();
private final RawChannel.RawEvent event = createHandler();
! protected final Demand demand = new Demand(); /* Exposed for testing purposes */
! private final SequentialScheduler pushScheduler;
private ByteBuffer data;
private volatile int state;
private static final int UNREGISTERED = 0;
private static final int AVAILABLE = 1;
private static final int WAITING = 2;
/*
* New side of this hunk: public constructor.
* Stores the message consumer and the channel, wraps the consumer in a
* FrameConsumer, takes the channel's initial buffer as the first `data`
* and finally obtains the scheduler from createScheduler().
*
* NOTE(review): createScheduler() is protected and this class is not
* final, so an overriding implementation would be invoked from here,
* before the overriding class's own constructor body has run.
*/
! public Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) {
this.messageConsumer = messageConsumer;
this.channel = channel;
// The frame consumer is what `reader.readFrame` feeds parsed frames into
this.frameConsumer = new FrameConsumer(this.messageConsumer);
// Seed `data` with the channel's initial buffer before any read happens
this.data = channel.initialByteBuffer();
// To ensure the initial non-final `data` will be visible
// (happens-before) when `pushScheduler` runs `PushContinuouslyTask`
// the following assignment is done last:
! pushScheduler = createScheduler();
! }
!
! /*
* Exposed for testing purposes.
*
* Returns a new SequentialScheduler wrapping a fresh PushContinuouslyTask.
* The constructor calls this as its last step and stores the result in
* `pushScheduler`.
*/
! protected SequentialScheduler createScheduler() {
! return new SequentialScheduler(new PushContinuouslyTask());
}
private RawChannel.RawEvent createHandler() {
return new RawChannel.RawEvent() {
*** 86,134 ****
}
/*
* Old (replaced) side of this hunk. Event callback: records that the
* channel has data available and then pokes the cooperative handler so
* that the push loop can pick the data up.
*/
@Override
public void handle() {
// Must be set before the handler runs: the push loop switches on `state`
state = AVAILABLE;
! handler.handle();
}
};
}
/*
* Old (replaced) side of this hunk. Adds `n` to the outstanding demand
* and pokes the handler.
*
* Throws IllegalArgumentException if `n` is negative; zero is accepted.
*/
! void request(long n) {
! if (n < 0L) {
! throw new IllegalArgumentException("Negative: " + n);
}
// Saturating add: if the sum overflows to a negative value, clamp the
// demand to Long.MAX_VALUE instead
- demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
- handler.handle();
}
/*
* Old (replaced) side of this hunk. Decrements the demand by one.
* A negative result is treated as a broken internal invariant and is
* reported with an InternalError carrying the resulting value; note that
* the decrement has already been applied at that point.
*/
void acknowledge() {
! long x = demand.decrementAndGet();
if (x < 0) {
throw new InternalError(String.valueOf(x));
}
}
/*
* Stops the machinery from reading and delivering messages permanently,
* regardless of the current demand and data availability.
*
* Old (replaced) side of this hunk: only the handler is stopped; the
* channel itself is not touched here.
*/
! void close() {
! handler.stop();
}
! private void pushContinuously() {
! while (!handler.isStopped()) {
if (data.hasRemaining()) {
! if (demand.get() > 0) {
try {
int oldPos = data.position();
reader.readFrame(data, frameConsumer);
int newPos = data.position();
assert oldPos != newPos : data; // reader always consumes bytes
! } catch (FailWebSocketException e) {
! handler.stop();
messageConsumer.onError(e);
}
continue;
}
break;
--- 93,150 ----
}
/*
* New side of this hunk. Event callback: records that the channel has
* data available and then asks the push scheduler to run (or schedule)
* its task so that the push loop can pick the data up.
*/
@Override
public void handle() {
// Must be set before the task runs: the push loop switches on `state`
state = AVAILABLE;
! pushScheduler.runOrSchedule();
}
};
}
/*
* New side of this hunk. Increases the outstanding demand by `n` via
* Demand.increase and, only if that call returns true, asks the push
* scheduler to run (or schedule) its task.
*
* NOTE(review): the explicit negative-argument check of the old version
* is gone; presumably Demand.increase validates `n` and returns true
* when there was no outstanding demand before the call -- confirm
* against Demand.
*/
! public void request(long n) {
! if (demand.increase(n)) {
! pushScheduler.runOrSchedule();
}
}
+ /*
+ * Why is this method needed? Since Receiver operates through callbacks,
+ * this method makes it possible to decide outside this type what counts
+ * as a message being received (i.e. when exactly the demand should be
+ * decremented).
+ *
+ * Decreases the demand by one. A negative result is treated as a broken
+ * internal invariant and is reported with an InternalError carrying the
+ * resulting value.
+ */
void acknowledge() {
! long x = demand.decreaseAndGet(1);
if (x < 0) {
throw new InternalError(String.valueOf(x));
}
}
/*
* Stops the machinery from reading and delivering messages permanently,
* regardless of the current demand and data availability.
*
* New side of this hunk: the push scheduler is stopped first and then
* channel.shutdownInput() is called. The method is declared to throw
* IOException (presumably raised by shutdownInput() -- confirm against
* RawChannel); the scheduler has already been stopped if that happens.
*/
! public void close() throws IOException {
! pushScheduler.stop();
! channel.shutdownInput();
}
! private class PushContinuouslyTask
! extends SequentialScheduler.CompleteRestartableTask
! {
! @Override
! public void run() {
! while (!pushScheduler.isStopped()) {
if (data.hasRemaining()) {
! if (!demand.isFulfilled()) {
try {
int oldPos = data.position();
reader.readFrame(data, frameConsumer);
int newPos = data.position();
assert oldPos != newPos : data; // reader always consumes bytes
! } catch (Throwable e) {
! pushScheduler.stop();
messageConsumer.onError(e);
}
continue;
}
break;
*** 138,162 ****
return;
case UNREGISTERED:
try {
state = WAITING;
channel.registerEvent(event);
! } catch (IOException e) {
! handler.stop();
messageConsumer.onError(e);
}
return;
case AVAILABLE:
try {
data = channel.read();
! } catch (IOException e) {
! handler.stop();
messageConsumer.onError(e);
return;
}
if (data == null) { // EOF
! handler.stop();
messageConsumer.onComplete();
return;
} else if (!data.hasRemaining()) { // No data at the moment
// Pretty much a "goto", reusing the existing code path
// for registration
--- 154,178 ----
return;
case UNREGISTERED:
try {
state = WAITING;
channel.registerEvent(event);
! } catch (Throwable e) {
! pushScheduler.stop();
messageConsumer.onError(e);
}
return;
case AVAILABLE:
try {
data = channel.read();
! } catch (Throwable e) {
! pushScheduler.stop();
messageConsumer.onError(e);
return;
}
if (data == null) { // EOF
! pushScheduler.stop();
messageConsumer.onComplete();
return;
} else if (!data.hasRemaining()) { // No data at the moment
// Pretty much a "goto", reusing the existing code path
// for registration
*** 166,172 ****
default:
throw new InternalError(String.valueOf(state));
}
}
}
}
-
--- 182,188 ----
default:
throw new InternalError(String.valueOf(state));
}
}
}
+ }
}
< prev index next >