< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java

Print this page

        

*** 23,36 **** * questions. */ package jdk.incubator.http.internal.websocket; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; - import java.util.concurrent.atomic.AtomicLong; /* * Receives incoming data from the channel on demand and converts it into a * stream of WebSocket messages which are then delivered to the supplied message * consumer in a strict sequential order and non-recursively. In other words, --- 23,38 ---- * questions. */ package jdk.incubator.http.internal.websocket; + import jdk.incubator.http.internal.common.Demand; + import jdk.incubator.http.internal.common.SequentialScheduler; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; /* * Receives incoming data from the channel on demand and converts it into a * stream of WebSocket messages which are then delivered to the supplied message * consumer in a strict sequential order and non-recursively. In other words,
*** 47,82 **** * onBinary() * ... * * even if `request(long n)` is called from inside these invocations. */ ! final class Receiver { private final MessageStreamConsumer messageConsumer; private final RawChannel channel; private final FrameConsumer frameConsumer; private final Frame.Reader reader = new Frame.Reader(); private final RawChannel.RawEvent event = createHandler(); ! private final AtomicLong demand = new AtomicLong(); ! private final CooperativeHandler handler; private ByteBuffer data; private volatile int state; private static final int UNREGISTERED = 0; private static final int AVAILABLE = 1; private static final int WAITING = 2; ! Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) { this.messageConsumer = messageConsumer; this.channel = channel; this.frameConsumer = new FrameConsumer(this.messageConsumer); this.data = channel.initialByteBuffer(); // To ensure the initial non-final `data` will be visible // (happens-before) when `handler` invokes `pushContinuously` // the following assignment is done last: ! handler = new CooperativeHandler(this::pushContinuously); } private RawChannel.RawEvent createHandler() { return new RawChannel.RawEvent() { --- 49,89 ---- * onBinary() * ... * * even if `request(long n)` is called from inside these invocations. */ ! public class Receiver { private final MessageStreamConsumer messageConsumer; private final RawChannel channel; private final FrameConsumer frameConsumer; private final Frame.Reader reader = new Frame.Reader(); private final RawChannel.RawEvent event = createHandler(); ! protected final Demand demand = new Demand(); /* Exposed for testing purposes */ ! private final SequentialScheduler pushScheduler; private ByteBuffer data; private volatile int state; private static final int UNREGISTERED = 0; private static final int AVAILABLE = 1; private static final int WAITING = 2; ! public Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) { this.messageConsumer = messageConsumer; this.channel = channel; this.frameConsumer = new FrameConsumer(this.messageConsumer); this.data = channel.initialByteBuffer(); // To ensure the initial non-final `data` will be visible // (happens-before) when `handler` invokes `pushContinuously` // the following assignment is done last: ! pushScheduler = createScheduler(); ! } ! ! /* Exposed for testing purposes */ ! protected SequentialScheduler createScheduler() { ! return new SequentialScheduler(new PushContinuouslyTask()); } private RawChannel.RawEvent createHandler() { return new RawChannel.RawEvent() {
*** 86,134 **** } @Override public void handle() { state = AVAILABLE; ! handler.handle(); } }; } ! void request(long n) { ! if (n < 0L) { ! throw new IllegalArgumentException("Negative: " + n); } - demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i); - handler.handle(); } void acknowledge() { ! long x = demand.decrementAndGet(); if (x < 0) { throw new InternalError(String.valueOf(x)); } } /* * Stops the machinery from reading and delivering messages permanently, * regardless of the current demand and data availability. */ ! void close() { ! handler.stop(); } ! private void pushContinuously() { ! while (!handler.isStopped()) { if (data.hasRemaining()) { ! if (demand.get() > 0) { try { int oldPos = data.position(); reader.readFrame(data, frameConsumer); int newPos = data.position(); assert oldPos != newPos : data; // reader always consumes bytes ! } catch (FailWebSocketException e) { ! handler.stop(); messageConsumer.onError(e); } continue; } break; --- 93,150 ---- } @Override public void handle() { state = AVAILABLE; ! pushScheduler.runOrSchedule(); } }; } ! public void request(long n) { ! if (demand.increase(n)) { ! pushScheduler.runOrSchedule(); } } + /* + * Why is this method needed? Since Receiver operates through callbacks + * this method allows to abstract out what constitutes as a message being + * received (i.e. to decide outside this type when exactly one should + * decrement the demand). + */ void acknowledge() { ! long x = demand.decreaseAndGet(1); if (x < 0) { throw new InternalError(String.valueOf(x)); } } /* * Stops the machinery from reading and delivering messages permanently, * regardless of the current demand and data availability. */ ! public void close() throws IOException { ! pushScheduler.stop(); ! channel.shutdownInput(); } ! private class PushContinuouslyTask ! extends SequentialScheduler.CompleteRestartableTask ! { ! @Override ! public void run() { ! while (!pushScheduler.isStopped()) { if (data.hasRemaining()) { ! if (!demand.isFulfilled()) { try { int oldPos = data.position(); reader.readFrame(data, frameConsumer); int newPos = data.position(); assert oldPos != newPos : data; // reader always consumes bytes ! } catch (Throwable e) { ! pushScheduler.stop(); messageConsumer.onError(e); } continue; } break;
*** 138,162 **** return; case UNREGISTERED: try { state = WAITING; channel.registerEvent(event); ! } catch (IOException e) { ! handler.stop(); messageConsumer.onError(e); } return; case AVAILABLE: try { data = channel.read(); ! } catch (IOException e) { ! handler.stop(); messageConsumer.onError(e); return; } if (data == null) { // EOF ! handler.stop(); messageConsumer.onComplete(); return; } else if (!data.hasRemaining()) { // No data at the moment // Pretty much a "goto", reusing the existing code path // for registration --- 154,178 ---- return; case UNREGISTERED: try { state = WAITING; channel.registerEvent(event); ! } catch (Throwable e) { ! pushScheduler.stop(); messageConsumer.onError(e); } return; case AVAILABLE: try { data = channel.read(); ! } catch (Throwable e) { ! pushScheduler.stop(); messageConsumer.onError(e); return; } if (data == null) { // EOF ! pushScheduler.stop(); messageConsumer.onComplete(); return; } else if (!data.hasRemaining()) { // No data at the moment // Pretty much a "goto", reusing the existing code path // for registration
*** 166,172 **** default: throw new InternalError(String.valueOf(state)); } } } } - --- 182,188 ---- default: throw new InternalError(String.valueOf(state)); } } } + } }
< prev index next >