1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package java.net.http; 27 28 import java.io.IOException; 29 import java.io.UncheckedIOException; 30 import java.net.ProtocolException; 31 import java.net.http.WebSocket.Listener; 32 import java.nio.ByteBuffer; 33 import java.nio.CharBuffer; 34 import java.nio.channels.SelectionKey; 35 import java.util.Optional; 36 import java.util.concurrent.CompletionStage; 37 import java.util.concurrent.Executor; 38 import java.util.concurrent.atomic.AtomicBoolean; 39 import java.util.concurrent.atomic.AtomicLong; 40 import java.util.function.Supplier; 41 42 import static java.lang.System.Logger.Level.ERROR; 43 import static java.net.http.WSUtils.EMPTY_BYTE_BUFFER; 44 import static java.net.http.WSUtils.logger; 45 46 /* 47 * Receives incoming data from the channel and converts it into a sequence of 48 * messages, which are then passed to the listener. 49 */ 50 final class WSReceiver { 51 52 private final Listener listener; 53 private final WebSocket webSocket; 54 private final Supplier<WSShared<ByteBuffer>> buffersSupplier = 55 new WSSharedPool<>(() -> ByteBuffer.allocateDirect(32768), 2); 56 private final RawChannel channel; 57 private final RawChannel.NonBlockingEvent channelEvent; 58 private final WSSignalHandler handler; 59 private final AtomicLong demand = new AtomicLong(); 60 private final AtomicBoolean readable = new AtomicBoolean(); 61 private boolean started; 62 private volatile boolean closed; 63 private final WSFrame.Reader reader = new WSFrame.Reader(); 64 private final WSFrameConsumer frameConsumer; 65 private WSShared<ByteBuffer> buf = WSShared.wrap(EMPTY_BYTE_BUFFER); 66 private WSShared<ByteBuffer> data; // TODO: initialize with leftovers from the RawChannel 67 68 WSReceiver(Listener listener, WebSocket webSocket, Executor executor, 69 RawChannel channel) { 70 this.listener = listener; 71 this.webSocket = webSocket; 72 this.channel = channel; 73 handler = new WSSignalHandler(executor, this::react); 74 channelEvent = createChannelEvent(); 75 this.frameConsumer = new WSFrameConsumer(new MessageConsumer()); 76 } 77 78 private void react() { 79 synchronized (this) { 80 while (demand.get() > 0 && !closed) { 81 try { 82 if (data == null) { 83 if (!getData()) { 84 break; 85 } 86 } 87 reader.readFrame(data, frameConsumer); 88 if (!data.hasRemaining()) { 89 data.dispose(); 90 data = null; 91 } 92 // In case of exception we don't need to clean any state, 93 // since it's the terminal condition anyway. Nothing will be 94 // retried. 95 } catch (WSProtocolException e) { 96 // Translate into ProtocolException 97 closeExceptionally(new ProtocolException().initCause(e)); 98 } catch (Exception e) { 99 closeExceptionally(e); 100 } 101 } 102 } 103 } 104 105 void request(long n) { 106 long newDemand = demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i); 107 handler.signal(); 108 assert newDemand >= 0 : newDemand; 109 } 110 111 private boolean getData() throws IOException { 112 if (!readable.get()) { 113 return false; 114 } 115 if (!buf.hasRemaining()) { 116 buf.dispose(); 117 buf = buffersSupplier.get(); 118 assert buf.hasRemaining() : buf; 119 } 120 int oldPosition = buf.position(); 121 int oldLimit = buf.limit(); 122 int numRead = channel.read(buf.buffer()); 123 if (numRead > 0) { 124 data = buf.share(oldPosition, oldPosition + numRead); 125 buf.select(buf.limit(), oldLimit); // Move window to the free region 126 return true; 127 } else if (numRead == 0) { 128 readable.set(false); 129 channel.registerEvent(channelEvent); 130 return false; 131 } else { 132 assert numRead < 0 : numRead; 133 throw new WSProtocolException 134 ("7.2.1.", "Stream ended before a Close frame has been received"); 135 } 136 } 137 138 void start() { 139 synchronized (this) { 140 if (started) { 141 throw new IllegalStateException("Already started"); 142 } 143 started = true; 144 try { 145 channel.registerEvent(channelEvent); 146 } catch (IOException e) { 147 throw new UncheckedIOException(e); 148 } 149 try { 150 listener.onOpen(webSocket); 151 } catch (Exception e) { 152 closeExceptionally(new RuntimeException("onOpen threw an exception", e)); 153 } 154 } 155 } 156 157 private void close() { // TODO: move to WS.java 158 closed = true; 159 } 160 161 private void closeExceptionally(Throwable error) { // TODO: move to WS.java 162 close(); 163 try { 164 listener.onError(webSocket, error); 165 } catch (Exception e) { 166 logger.log(ERROR, "onError threw an exception", e); 167 } 168 } 169 170 private final class MessageConsumer implements WSMessageConsumer { 171 172 @Override 173 public void onText(WebSocket.MessagePart part, WSShared<CharBuffer> data) { 174 decrementDemand(); 175 CompletionStage<?> cs; 176 try { 177 cs = listener.onText(webSocket, data.buffer(), part); 178 } catch (Exception e) { 179 closeExceptionally(new RuntimeException("onText threw an exception", e)); 180 return; 181 } 182 follow(cs, data, "onText"); 183 } 184 185 @Override 186 public void onBinary(WebSocket.MessagePart part, WSShared<ByteBuffer> data) { 187 decrementDemand(); 188 CompletionStage<?> cs; 189 try { 190 cs = listener.onBinary(webSocket, data.buffer(), part); 191 } catch (Exception e) { 192 closeExceptionally(new RuntimeException("onBinary threw an exception", e)); 193 return; 194 } 195 follow(cs, data, "onBinary"); 196 } 197 198 @Override 199 public void onPing(WSShared<ByteBuffer> data) { 200 decrementDemand(); 201 CompletionStage<?> cs; 202 try { 203 cs = listener.onPing(webSocket, data.buffer()); 204 } catch (Exception e) { 205 closeExceptionally(new RuntimeException("onPing threw an exception", e)); 206 return; 207 } 208 follow(cs, data, "onPing"); 209 } 210 211 @Override 212 public void onPong(WSShared<ByteBuffer> data) { 213 decrementDemand(); 214 CompletionStage<?> cs; 215 try { 216 cs = listener.onPong(webSocket, data.buffer()); 217 } catch (Exception e) { 218 closeExceptionally(new RuntimeException("onPong threw an exception", e)); 219 return; 220 } 221 follow(cs, data, "onPong"); 222 } 223 224 @Override 225 public void onClose(WebSocket.CloseCode code, CharSequence reason) { 226 decrementDemand(); 227 try { 228 close(); 229 listener.onClose(webSocket, Optional.ofNullable(code), reason.toString()); 230 } catch (Exception e) { 231 logger.log(ERROR, "onClose threw an exception", e); 232 } 233 } 234 } 235 236 private void follow(CompletionStage<?> cs, WSDisposable d, String source) { 237 if (cs == null) { 238 d.dispose(); 239 } else { 240 cs.whenComplete((whatever, error) -> { 241 if (error != null) { 242 String m = "CompletionStage returned by " + source + " completed exceptionally"; 243 closeExceptionally(new RuntimeException(m, error)); 244 } 245 d.dispose(); 246 }); 247 } 248 } 249 250 private void decrementDemand() { 251 long newDemand = demand.decrementAndGet(); 252 assert newDemand >= 0 : newDemand; 253 } 254 255 private RawChannel.NonBlockingEvent createChannelEvent() { 256 return new RawChannel.NonBlockingEvent() { 257 258 @Override 259 public int interestOps() { 260 return SelectionKey.OP_READ; 261 } 262 263 @Override 264 public void handle() { 265 boolean wasNotReadable = readable.compareAndSet(false, true); 266 assert wasNotReadable; 267 handler.signal(); 268 } 269 270 @Override 271 public String toString() { 272 return "Read readiness event [" + channel + "]"; 273 } 274 }; 275 } 276 }