1 /* 2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 package java.net.http; 26 27 import java.net.http.WSOutgoingMessage.Binary; 28 import java.net.http.WSOutgoingMessage.Close; 29 import java.net.http.WSOutgoingMessage.Ping; 30 import java.net.http.WSOutgoingMessage.Pong; 31 import java.net.http.WSOutgoingMessage.Text; 32 import java.nio.ByteBuffer; 33 import java.nio.CharBuffer; 34 import java.nio.charset.CharacterCodingException; 35 import java.nio.charset.CoderResult; 36 import java.nio.charset.StandardCharsets; 37 import java.util.concurrent.BlockingQueue; 38 import java.util.concurrent.CompletableFuture; 39 import java.util.concurrent.Executor; 40 import java.util.concurrent.LinkedBlockingQueue; 41 import java.util.function.Consumer; 42 43 import static java.lang.String.format; 44 import static java.net.http.Pair.pair; 45 46 /* 47 * Prepares outgoing messages for transmission. Verifies the WebSocket state, 48 * places the message on the outbound queue, and notifies the signal handler. 49 */ 50 final class WSTransmitter { 51 52 private final BlockingQueue<Pair<WSOutgoingMessage, CompletableFuture<WebSocket>>> 53 backlog = new LinkedBlockingQueue<>(); 54 private final WSMessageSender sender; 55 private final WSSignalHandler handler; 56 private final WebSocket webSocket; 57 private boolean previousMessageSent = true; 58 private boolean canSendBinary = true; 59 private boolean canSendText = true; 60 61 WSTransmitter(WebSocket ws, Executor executor, RawChannel channel, Consumer<Throwable> errorHandler) { 62 this.webSocket = ws; 63 this.handler = new WSSignalHandler(executor, this::handleSignal); 64 Consumer<Throwable> sendCompletion = (error) -> { 65 synchronized (this) { 66 if (error == null) { 67 previousMessageSent = true; 68 handler.signal(); 69 } else { 70 errorHandler.accept(error); 71 backlog.forEach(p -> p.second.completeExceptionally(error)); 72 backlog.clear(); 73 } 74 } 75 }; 76 this.sender = new WSMessageSender(channel, sendCompletion); 77 } 78 79 CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast) { 80 checkAndUpdateText(isLast); 81 return acceptMessage(new Text(isLast, message)); 82 } 83 84 CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) { 85 checkAndUpdateBinary(isLast); 86 return acceptMessage(new Binary(isLast, message)); 87 } 88 89 CompletableFuture<WebSocket> sendPing(ByteBuffer message) { 90 checkSize(message.remaining(), 125); 91 return acceptMessage(new Ping(message)); 92 } 93 94 CompletableFuture<WebSocket> sendPong(ByteBuffer message) { 95 checkSize(message.remaining(), 125); 96 return acceptMessage(new Pong(message)); 97 } 98 99 CompletableFuture<WebSocket> sendClose(WebSocket.CloseCode code, CharSequence reason) { 100 return acceptMessage(createCloseMessage(code, reason)); 101 } 102 103 CompletableFuture<WebSocket> sendClose() { 104 return acceptMessage(new Close(ByteBuffer.allocate(0))); 105 } 106 107 private CompletableFuture<WebSocket> acceptMessage(WSOutgoingMessage m) { 108 CompletableFuture<WebSocket> cf = new CompletableFuture<>(); 109 synchronized (this) { 110 backlog.offer(pair(m, cf)); 111 } 112 handler.signal(); 113 return cf; 114 } 115 116 /* Callback for pulling messages from the queue, and initiating the send. */ 117 private void handleSignal() { 118 synchronized (this) { 119 while (!backlog.isEmpty() && previousMessageSent) { 120 previousMessageSent = false; 121 Pair<WSOutgoingMessage, CompletableFuture<WebSocket>> p = backlog.peek(); 122 boolean sent = sender.trySendFully(p.first); 123 if (sent) { 124 backlog.remove(); 125 p.second.complete(webSocket); 126 previousMessageSent = true; 127 } 128 } 129 } 130 } 131 132 private Close createCloseMessage(WebSocket.CloseCode code, CharSequence reason) { 133 // TODO: move to construction of CloseDetail (JDK-8155621) 134 ByteBuffer b = ByteBuffer.allocateDirect(125).putChar((char) code.getCode()); 135 CoderResult result = StandardCharsets.UTF_8.newEncoder() 136 .encode(CharBuffer.wrap(reason), b, true); 137 if (result.isError()) { 138 try { 139 result.throwException(); 140 } catch (CharacterCodingException e) { 141 throw new IllegalArgumentException("Reason is a malformed UTF-16 sequence", e); 142 } 143 } else if (result.isOverflow()) { 144 throw new IllegalArgumentException("Reason is too long"); 145 } 146 return new Close(b.flip()); 147 } 148 149 private void checkSize(int size, int maxSize) { 150 if (size > maxSize) { 151 throw new IllegalArgumentException( 152 format("The message is too long: %s;" + 153 " expected not longer than %s", size, maxSize) 154 ); 155 } 156 } 157 158 private void checkAndUpdateText(boolean isLast) { 159 if (!canSendText) { 160 throw new IllegalStateException("Unexpected text message"); 161 } 162 canSendBinary = isLast; 163 } 164 165 private void checkAndUpdateBinary(boolean isLast) { 166 if (!canSendBinary) { 167 throw new IllegalStateException("Unexpected binary message"); 168 } 169 canSendText = isLast; 170 } 171 }