1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http.internal.websocket; 27 28 import java.io.IOException; 29 import java.nio.ByteBuffer; 30 import java.nio.channels.SelectionKey; 31 import java.util.concurrent.atomic.AtomicLong; 32 33 /* 34 * Receives incoming data from the channel on demand and converts it into a 35 * stream of WebSocket messages which are then delivered to the supplied message 36 * consumer in a strict sequential order and non-recursively. In other words, 37 * 38 * onText() 39 * onText() 40 * onBinary() 41 * ... 42 * 43 * instead of 44 * 45 * onText() 46 * onText() 47 * onBinary() 48 * ... 49 * 50 * even if `request(long n)` is called from inside these invocations. 51 */ 52 final class Receiver { 53 54 private final MessageStreamConsumer messageConsumer; 55 private final RawChannel channel; 56 private final FrameConsumer frameConsumer; 57 private final Frame.Reader reader = new Frame.Reader(); 58 private final RawChannel.RawEvent event = createHandler(); 59 private final AtomicLong demand = new AtomicLong(); 60 private final CooperativeHandler handler; 61 62 private ByteBuffer data; 63 private volatile int state; 64 65 private static final int UNREGISTERED = 0; 66 private static final int AVAILABLE = 1; 67 private static final int WAITING = 2; 68 69 Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) { 70 this.messageConsumer = messageConsumer; 71 this.channel = channel; 72 this.frameConsumer = new FrameConsumer(this.messageConsumer); 73 this.data = channel.initialByteBuffer(); 74 // To ensure the initial non-final `data` will be visible 75 // (happens-before) when `handler` invokes `pushContinuously` 76 // the following assignment is done last: 77 handler = new CooperativeHandler(this::pushContinuously); 78 } 79 80 private RawChannel.RawEvent createHandler() { 81 return new RawChannel.RawEvent() { 82 83 @Override 84 public int interestOps() { 85 return SelectionKey.OP_READ; 86 } 87 88 @Override 89 public void handle() { 90 state = AVAILABLE; 91 handler.handle(); 92 } 93 }; 94 } 95 96 void request(long n) { 97 if (n < 0L) { 98 throw new IllegalArgumentException("Negative: " + n); 99 } 100 demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i); 101 handler.handle(); 102 } 103 104 void acknowledge() { 105 long x = demand.decrementAndGet(); 106 if (x < 0) { 107 throw new InternalError(String.valueOf(x)); 108 } 109 } 110 111 /* 112 * Stops the machinery from reading and delivering messages permanently, 113 * regardless of the current demand and data availability. 114 */ 115 void close() { 116 handler.stop(); 117 } 118 119 private void pushContinuously() { 120 while (!handler.isStopped()) { 121 if (data.hasRemaining()) { 122 if (demand.get() > 0) { 123 try { 124 int oldPos = data.position(); 125 reader.readFrame(data, frameConsumer); 126 int newPos = data.position(); 127 assert oldPos != newPos : data; // reader always consumes bytes 128 } catch (FailWebSocketException e) { 129 handler.stop(); 130 messageConsumer.onError(e); 131 } 132 continue; 133 } 134 break; 135 } 136 switch (state) { 137 case WAITING: 138 return; 139 case UNREGISTERED: 140 try { 141 state = WAITING; 142 channel.registerEvent(event); 143 } catch (IOException e) { 144 handler.stop(); 145 messageConsumer.onError(e); 146 } 147 return; 148 case AVAILABLE: 149 try { 150 data = channel.read(); 151 } catch (IOException e) { 152 handler.stop(); 153 messageConsumer.onError(e); 154 return; 155 } 156 if (data == null) { // EOF 157 handler.stop(); 158 messageConsumer.onComplete(); 159 return; 160 } else if (!data.hasRemaining()) { // No data at the moment 161 // Pretty much a "goto", reusing the existing code path 162 // for registration 163 state = UNREGISTERED; 164 } 165 continue; 166 default: 167 throw new InternalError(String.valueOf(state)); 168 } 169 } 170 } 171 } 172