--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java 2017-11-30 04:04:47.970335073 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java 2017-11-30 04:04:47.766317242 -0800 @@ -25,10 +25,12 @@ package jdk.incubator.http.internal.websocket; +import jdk.incubator.http.internal.common.Demand; +import jdk.incubator.http.internal.common.SequentialScheduler; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; -import java.util.concurrent.atomic.AtomicLong; /* * Receives incoming data from the channel on demand and converts it into a @@ -49,15 +51,15 @@ * * even if `request(long n)` is called from inside these invocations. */ -final class Receiver { +public class Receiver { private final MessageStreamConsumer messageConsumer; private final RawChannel channel; private final FrameConsumer frameConsumer; private final Frame.Reader reader = new Frame.Reader(); private final RawChannel.RawEvent event = createHandler(); - private final AtomicLong demand = new AtomicLong(); - private final CooperativeHandler handler; + protected final Demand demand = new Demand(); /* Exposed for testing purposes */ + private final SequentialScheduler pushScheduler; private ByteBuffer data; private volatile int state; @@ -66,7 +68,7 @@ private static final int AVAILABLE = 1; private static final int WAITING = 2; - Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) { + public Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) { this.messageConsumer = messageConsumer; this.channel = channel; this.frameConsumer = new FrameConsumer(this.messageConsumer); @@ -74,7 +76,12 @@ // To ensure the initial non-final `data` will be visible // (happens-before) when `handler` invokes `pushContinuously` // the following assignment is done last: - handler = new CooperativeHandler(this::pushContinuously); + pushScheduler = createScheduler(); + } + + /* Exposed for testing purposes */ + protected SequentialScheduler createScheduler() { + return new SequentialScheduler(new PushContinuouslyTask()); } private RawChannel.RawEvent createHandler() { @@ -88,21 +95,25 @@ @Override public void handle() { state = AVAILABLE; - handler.handle(); + pushScheduler.runOrSchedule(); } }; } - void request(long n) { - if (n < 0L) { - throw new IllegalArgumentException("Negative: " + n); + public void request(long n) { + if (demand.increase(n)) { + pushScheduler.runOrSchedule(); } - demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i); - handler.handle(); } + /* + * Why is this method needed? Since Receiver operates through callbacks + * this method allows to abstract out what constitutes as a message being + * received (i.e. to decide outside this type when exactly one should + * decrement the demand). + */ void acknowledge() { - long x = demand.decrementAndGet(); + long x = demand.decreaseAndGet(1); if (x < 0) { throw new InternalError(String.valueOf(x)); } @@ -112,61 +123,66 @@ * Stops the machinery from reading and delivering messages permanently, * regardless of the current demand and data availability. */ - void close() { - handler.stop(); - } - - private void pushContinuously() { - while (!handler.isStopped()) { - if (data.hasRemaining()) { - if (demand.get() > 0) { - try { - int oldPos = data.position(); - reader.readFrame(data, frameConsumer); - int newPos = data.position(); - assert oldPos != newPos : data; // reader always consumes bytes - } catch (FailWebSocketException e) { - handler.stop(); - messageConsumer.onError(e); + public void close() throws IOException { + pushScheduler.stop(); + channel.shutdownInput(); + } + + private class PushContinuouslyTask + extends SequentialScheduler.CompleteRestartableTask + { + @Override + public void run() { + while (!pushScheduler.isStopped()) { + if (data.hasRemaining()) { + if (!demand.isFulfilled()) { + try { + int oldPos = data.position(); + reader.readFrame(data, frameConsumer); + int newPos = data.position(); + assert oldPos != newPos : data; // reader always consumes bytes + } catch (Throwable e) { + pushScheduler.stop(); + messageConsumer.onError(e); + } + continue; } - continue; + break; } - break; - } - switch (state) { - case WAITING: - return; - case UNREGISTERED: - try { - state = WAITING; - channel.registerEvent(event); - } catch (IOException e) { - handler.stop(); - messageConsumer.onError(e); - } - return; - case AVAILABLE: - try { - data = channel.read(); - } catch (IOException e) { - handler.stop(); - messageConsumer.onError(e); + switch (state) { + case WAITING: return; - } - if (data == null) { // EOF - handler.stop(); - messageConsumer.onComplete(); + case UNREGISTERED: + try { + state = WAITING; + channel.registerEvent(event); + } catch (Throwable e) { + pushScheduler.stop(); + messageConsumer.onError(e); + } return; - } else if (!data.hasRemaining()) { // No data at the moment - // Pretty much a "goto", reusing the existing code path - // for registration - state = UNREGISTERED; - } - continue; - default: - throw new InternalError(String.valueOf(state)); + case AVAILABLE: + try { + data = channel.read(); + } catch (Throwable e) { + pushScheduler.stop(); + messageConsumer.onError(e); + return; + } + if (data == null) { // EOF + pushScheduler.stop(); + messageConsumer.onComplete(); + return; + } else if (!data.hasRemaining()) { // No data at the moment + // Pretty much a "goto", reusing the existing code path + // for registration + state = UNREGISTERED; + } + continue; + default: + throw new InternalError(String.valueOf(state)); + } } } } } -