8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http.internal.websocket; 27 28 import java.io.IOException; 29 import java.nio.ByteBuffer; 30 import java.nio.channels.SelectionKey; 31 import java.util.concurrent.atomic.AtomicLong; 32 33 /* 34 * Receives incoming data from the channel on demand and converts it into a 35 * stream of WebSocket messages which are then delivered to the supplied message 36 * consumer in a strict sequential order and non-recursively. In other words, 37 * 38 * onText() 39 * onText() 40 * onBinary() 41 * ... 42 * 43 * instead of 44 * 45 * onText() 46 * onText() 47 * onBinary() 48 * ... 49 * 50 * even if `request(long n)` is called from inside these invocations. 51 */ 52 final class Receiver { 53 54 private final MessageStreamConsumer messageConsumer; 55 private final RawChannel channel; 56 private final FrameConsumer frameConsumer; 57 private final Frame.Reader reader = new Frame.Reader(); 58 private final RawChannel.RawEvent event = createHandler(); 59 private final AtomicLong demand = new AtomicLong(); 60 private final CooperativeHandler handler; 61 62 private ByteBuffer data; 63 private volatile int state; 64 65 private static final int UNREGISTERED = 0; 66 private static final int AVAILABLE = 1; 67 private static final int WAITING = 2; 68 69 Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) { 70 this.messageConsumer = messageConsumer; 71 this.channel = channel; 72 this.frameConsumer = new FrameConsumer(this.messageConsumer); 73 this.data = channel.initialByteBuffer(); 74 // To ensure the initial non-final `data` will be visible 75 // (happens-before) when `handler` invokes `pushContinuously` 76 // the following assignment is done last: 77 handler = new CooperativeHandler(this::pushContinuously); 78 } 79 80 private RawChannel.RawEvent createHandler() { 81 return new RawChannel.RawEvent() { 82 83 @Override 84 public int interestOps() { 85 return SelectionKey.OP_READ; 86 } 87 88 @Override 89 public void handle() { 90 state = AVAILABLE; 91 handler.handle(); 92 } 93 }; 94 } 95 96 void request(long n) { 97 if (n < 0L) { 98 throw new IllegalArgumentException("Negative: " + n); 99 } 100 demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i); 101 handler.handle(); 102 } 103 104 void acknowledge() { 105 long x = demand.decrementAndGet(); 106 if (x < 0) { 107 throw new InternalError(String.valueOf(x)); 108 } 109 } 110 111 /* 112 * Stops the machinery from reading and delivering messages permanently, 113 * regardless of the current demand and data availability. 114 */ 115 void close() { 116 handler.stop(); 117 } 118 119 private void pushContinuously() { 120 while (!handler.isStopped()) { 121 if (data.hasRemaining()) { 122 if (demand.get() > 0) { 123 try { 124 int oldPos = data.position(); 125 reader.readFrame(data, frameConsumer); 126 int newPos = data.position(); 127 assert oldPos != newPos : data; // reader always consumes bytes 128 } catch (FailWebSocketException e) { 129 handler.stop(); 130 messageConsumer.onError(e); 131 } 132 continue; 133 } 134 break; 135 } 136 switch (state) { 137 case WAITING: 138 return; 139 case UNREGISTERED: 140 try { 141 state = WAITING; 142 channel.registerEvent(event); 143 } catch (IOException e) { 144 handler.stop(); 145 messageConsumer.onError(e); 146 } 147 return; 148 case AVAILABLE: 149 try { 150 data = channel.read(); 151 } catch (IOException e) { 152 handler.stop(); 153 messageConsumer.onError(e); 154 return; 155 } 156 if (data == null) { // EOF 157 handler.stop(); 158 messageConsumer.onComplete(); 159 return; 160 } else if (!data.hasRemaining()) { // No data at the moment 161 // Pretty much a "goto", reusing the existing code path 162 // for registration 163 state = UNREGISTERED; 164 } 165 continue; 166 default: 167 throw new InternalError(String.valueOf(state)); 168 } 169 } 170 } 171 } 172 | 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http.internal.websocket; 27 28 import jdk.incubator.http.internal.common.Demand; 29 import jdk.incubator.http.internal.common.SequentialScheduler; 30 31 import java.io.IOException; 32 import java.nio.ByteBuffer; 33 import java.nio.channels.SelectionKey; 34 35 /* 36 * Receives incoming data from the channel on demand and converts it into a 37 * stream of WebSocket messages which are then delivered to the supplied message 38 * consumer in a strict sequential order and non-recursively. In other words, 39 * 40 * onText() 41 * onText() 42 * onBinary() 43 * ... 44 * 45 * instead of 46 * 47 * onText() 48 * onText() 49 * onBinary() 50 * ... 51 * 52 * even if `request(long n)` is called from inside these invocations. 53 */ 54 public class Receiver { 55 56 private final MessageStreamConsumer messageConsumer; 57 private final RawChannel channel; 58 private final FrameConsumer frameConsumer; 59 private final Frame.Reader reader = new Frame.Reader(); 60 private final RawChannel.RawEvent event = createHandler(); 61 protected final Demand demand = new Demand(); /* Exposed for testing purposes */ 62 private final SequentialScheduler pushScheduler; 63 64 private ByteBuffer data; 65 private volatile int state; 66 67 private static final int UNREGISTERED = 0; 68 private static final int AVAILABLE = 1; 69 private static final int WAITING = 2; 70 71 public Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) { 72 this.messageConsumer = messageConsumer; 73 this.channel = channel; 74 this.frameConsumer = new FrameConsumer(this.messageConsumer); 75 this.data = channel.initialByteBuffer(); 76 // To ensure the initial non-final `data` will be visible 77 // (happens-before) when `handler` invokes `pushContinuously` 78 // the following assignment is done last: 79 pushScheduler = createScheduler(); 80 } 81 82 /* Exposed for testing purposes */ 83 protected SequentialScheduler createScheduler() { 84 return new SequentialScheduler(new PushContinuouslyTask()); 85 } 86 87 private RawChannel.RawEvent createHandler() { 88 return new RawChannel.RawEvent() { 89 90 @Override 91 public int interestOps() { 92 return SelectionKey.OP_READ; 93 } 94 95 @Override 96 public void handle() { 97 state = AVAILABLE; 98 pushScheduler.runOrSchedule(); 99 } 100 }; 101 } 102 103 public void request(long n) { 104 if (demand.increase(n)) { 105 pushScheduler.runOrSchedule(); 106 } 107 } 108 109 /* 110 * Why is this method needed? Since Receiver operates through callbacks 111 * this method allows to abstract out what constitutes as a message being 112 * received (i.e. to decide outside this type when exactly one should 113 * decrement the demand). 114 */ 115 void acknowledge() { 116 long x = demand.decreaseAndGet(1); 117 if (x < 0) { 118 throw new InternalError(String.valueOf(x)); 119 } 120 } 121 122 /* 123 * Stops the machinery from reading and delivering messages permanently, 124 * regardless of the current demand and data availability. 125 */ 126 public void close() throws IOException { 127 pushScheduler.stop(); 128 channel.shutdownInput(); 129 } 130 131 private class PushContinuouslyTask 132 extends SequentialScheduler.CompleteRestartableTask 133 { 134 @Override 135 public void run() { 136 while (!pushScheduler.isStopped()) { 137 if (data.hasRemaining()) { 138 if (!demand.isFulfilled()) { 139 try { 140 int oldPos = data.position(); 141 reader.readFrame(data, frameConsumer); 142 int newPos = data.position(); 143 assert oldPos != newPos : data; // reader always consumes bytes 144 } catch (Throwable e) { 145 pushScheduler.stop(); 146 messageConsumer.onError(e); 147 } 148 continue; 149 } 150 break; 151 } 152 switch (state) { 153 case WAITING: 154 return; 155 case UNREGISTERED: 156 try { 157 state = WAITING; 158 channel.registerEvent(event); 159 } catch (Throwable e) { 160 pushScheduler.stop(); 161 messageConsumer.onError(e); 162 } 163 return; 164 case AVAILABLE: 165 try { 166 data = channel.read(); 167 } catch (Throwable e) { 168 pushScheduler.stop(); 169 messageConsumer.onError(e); 170 return; 171 } 172 if (data == null) { // EOF 173 pushScheduler.stop(); 174 messageConsumer.onComplete(); 175 return; 176 } else if (!data.hasRemaining()) { // No data at the moment 177 // Pretty much a "goto", reusing the existing code path 178 // for registration 179 state = UNREGISTERED; 180 } 181 continue; 182 default: 183 throw new InternalError(String.valueOf(state)); 184 } 185 } 186 } 187 } 188 } |