< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java

Print this page

        

@@ -23,14 +23,16 @@
  * questions.
  */
 
 package jdk.incubator.http.internal.websocket;
 
+import jdk.incubator.http.internal.common.Demand;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.util.concurrent.atomic.AtomicLong;
 
 /*
  * Receives incoming data from the channel on demand and converts it into a
  * stream of WebSocket messages which are then delivered to the supplied message
  * consumer in a strict sequential order and non-recursively. In other words,

@@ -47,36 +49,41 @@
  *         onBinary()
  *     ...
  *
  * even if `request(long n)` is called from inside these invocations.
  */
-final class Receiver {
+public class Receiver {
 
     private final MessageStreamConsumer messageConsumer;
     private final RawChannel channel;
     private final FrameConsumer frameConsumer;
     private final Frame.Reader reader = new Frame.Reader();
     private final RawChannel.RawEvent event = createHandler();
-    private final AtomicLong demand = new AtomicLong();
-    private final CooperativeHandler handler;
+    protected final Demand demand = new Demand(); /* Exposed for testing purposes */
+    private final SequentialScheduler pushScheduler;
 
     private ByteBuffer data;
     private volatile int state;
 
     private static final int UNREGISTERED = 0;
     private static final int AVAILABLE    = 1;
     private static final int WAITING      = 2;
 
-    Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) {
+    public Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) {
         this.messageConsumer = messageConsumer;
         this.channel = channel;
         this.frameConsumer = new FrameConsumer(this.messageConsumer);
         this.data = channel.initialByteBuffer();
         // To ensure the initial non-final `data` will be visible
         // (happens-before) when `handler` invokes `pushContinuously`
         // the following assignment is done last:
-        handler = new CooperativeHandler(this::pushContinuously);
+        pushScheduler = createScheduler();
+    }
+
+    /* Exposed for testing purposes */
+    protected SequentialScheduler createScheduler() {
+        return new SequentialScheduler(new PushContinuouslyTask());
     }
 
     private RawChannel.RawEvent createHandler() {
         return new RawChannel.RawEvent() {
 

@@ -86,49 +93,58 @@
             }
 
             @Override
             public void handle() {
                 state = AVAILABLE;
-                handler.handle();
+                pushScheduler.runOrSchedule();
             }
         };
     }
 
-    void request(long n) {
-        if (n < 0L) {
-            throw new IllegalArgumentException("Negative: " + n);
+    public void request(long n) {
+        if (demand.increase(n)) {
+            pushScheduler.runOrSchedule();
         }
-        demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
-        handler.handle();
     }
 
+    /*
+     * Why is this method needed? Since Receiver operates through callbacks
+     * this method allows to abstract out what constitutes as a message being
+     * received (i.e. to decide outside this type when exactly one should
+     * decrement the demand).
+     */
     void acknowledge() {
-        long x = demand.decrementAndGet();
+        long x = demand.decreaseAndGet(1);
         if (x < 0) {
             throw new InternalError(String.valueOf(x));
         }
     }
 
     /*
      * Stops the machinery from reading and delivering messages permanently,
      * regardless of the current demand and data availability.
      */
-    void close() {
-        handler.stop();
+    public void close() throws IOException {
+        pushScheduler.stop();
+        channel.shutdownInput();
     }
 
-    private void pushContinuously() {
-        while (!handler.isStopped()) {
+    private class PushContinuouslyTask
+        extends SequentialScheduler.CompleteRestartableTask
+    {
+        @Override
+        public void run() {
+            while (!pushScheduler.isStopped()) {
             if (data.hasRemaining()) {
-                if (demand.get() > 0) {
+                    if (!demand.isFulfilled()) {
                     try {
                         int oldPos = data.position();
                         reader.readFrame(data, frameConsumer);
                         int newPos = data.position();
                         assert oldPos != newPos : data; // reader always consumes bytes
-                    } catch (FailWebSocketException e) {
-                        handler.stop();
+                        } catch (Throwable e) {
+                            pushScheduler.stop();
                         messageConsumer.onError(e);
                     }
                     continue;
                 }
                 break;

@@ -138,25 +154,25 @@
                     return;
                 case UNREGISTERED:
                     try {
                         state = WAITING;
                         channel.registerEvent(event);
-                    } catch (IOException e) {
-                        handler.stop();
+                        } catch (Throwable e) {
+                            pushScheduler.stop();
                         messageConsumer.onError(e);
                     }
                     return;
                 case AVAILABLE:
                     try {
                         data = channel.read();
-                    } catch (IOException e) {
-                        handler.stop();
+                        } catch (Throwable e) {
+                            pushScheduler.stop();
                         messageConsumer.onError(e);
                         return;
                     }
                     if (data == null) { // EOF
-                        handler.stop();
+                            pushScheduler.stop();
                         messageConsumer.onComplete();
                         return;
                     } else if (!data.hasRemaining()) { // No data at the moment
                         // Pretty much a "goto", reusing the existing code path
                         // for registration

@@ -166,7 +182,7 @@
                 default:
                     throw new InternalError(String.valueOf(state));
             }
         }
     }
+    }
 }
-
< prev index next >