1 /*
   2  * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.net.http;
  26 
  27 import java.net.http.WSOutgoingMessage.Binary;
  28 import java.net.http.WSOutgoingMessage.Close;
  29 import java.net.http.WSOutgoingMessage.Ping;
  30 import java.net.http.WSOutgoingMessage.Pong;
  31 import java.net.http.WSOutgoingMessage.Text;
  32 import java.nio.ByteBuffer;
  33 import java.nio.CharBuffer;
  34 import java.nio.charset.CharacterCodingException;
  35 import java.nio.charset.CoderResult;
  36 import java.nio.charset.StandardCharsets;
  37 import java.util.concurrent.BlockingQueue;
  38 import java.util.concurrent.CompletableFuture;
  39 import java.util.concurrent.Executor;
  40 import java.util.concurrent.LinkedBlockingQueue;
  41 import java.util.function.Consumer;
  42 
  43 import static java.lang.String.format;
  44 import static java.net.http.Pair.pair;
  45 
  46 /*
  47  * Prepares outgoing messages for transmission.  Verifies the WebSocket state,
  48  * places the message on the outbound queue, and notifies the signal handler.
  49  */
  50 final class WSTransmitter {
  51 
  52     private final BlockingQueue<Pair<WSOutgoingMessage, CompletableFuture<WebSocket>>>
  53             backlog = new LinkedBlockingQueue<>();
  54     private final WSMessageSender sender;
  55     private final WSSignalHandler handler;
  56     private final WebSocket webSocket;
  57     private boolean previousMessageSent = true;
  58     private boolean canSendBinary = true;
  59     private boolean canSendText = true;
  60 
  61     WSTransmitter(WebSocket ws, Executor executor, RawChannel channel, Consumer<Throwable> errorHandler) {
  62         this.webSocket = ws;
  63         this.handler = new WSSignalHandler(executor, this::handleSignal);
  64         Consumer<Throwable> sendCompletion = (error) -> {
  65             synchronized (this) {
  66                 if (error == null) {
  67                     previousMessageSent = true;
  68                     handler.signal();
  69                 } else {
  70                     errorHandler.accept(error);
  71                     backlog.forEach(p -> p.second.completeExceptionally(error));
  72                     backlog.clear();
  73                 }
  74             }
  75         };
  76         this.sender = new WSMessageSender(channel, sendCompletion);
  77     }
  78 
  79     CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast) {
  80         checkAndUpdateText(isLast);
  81         return acceptMessage(new Text(isLast, message));
  82     }
  83 
  84     CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) {
  85         checkAndUpdateBinary(isLast);
  86         return acceptMessage(new Binary(isLast, message));
  87     }
  88 
  89     CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
  90         checkSize(message.remaining(), 125);
  91         return acceptMessage(new Ping(message));
  92     }
  93 
  94     CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
  95         checkSize(message.remaining(), 125);
  96         return acceptMessage(new Pong(message));
  97     }
  98 
  99     CompletableFuture<WebSocket> sendClose(WebSocket.CloseCode code, CharSequence reason) {
 100         return acceptMessage(createCloseMessage(code, reason));
 101     }
 102 
 103     CompletableFuture<WebSocket> sendClose() {
 104         return acceptMessage(new Close(ByteBuffer.allocate(0)));
 105     }
 106 
 107     private CompletableFuture<WebSocket> acceptMessage(WSOutgoingMessage m) {
 108         CompletableFuture<WebSocket> cf = new CompletableFuture<>();
 109         synchronized (this) {
 110             backlog.offer(pair(m, cf));
 111         }
 112         handler.signal();
 113         return cf;
 114     }
 115 
 116     /* Callback for pulling messages from the queue, and initiating the send. */
 117     private void handleSignal() {
 118         synchronized (this) {
 119             while (!backlog.isEmpty() && previousMessageSent) {
 120                 previousMessageSent = false;
 121                 Pair<WSOutgoingMessage, CompletableFuture<WebSocket>> p = backlog.peek();
 122                 boolean sent = sender.trySendFully(p.first);
 123                 if (sent) {
 124                     backlog.remove();
 125                     p.second.complete(webSocket);
 126                     previousMessageSent = true;
 127                 }
 128             }
 129         }
 130     }
 131 
 132     private Close createCloseMessage(WebSocket.CloseCode code, CharSequence reason) {
 133         // TODO: move to construction of CloseDetail (JDK-8155621)
 134         ByteBuffer b = ByteBuffer.allocateDirect(125).putChar((char) code.getCode());
 135         CoderResult result = StandardCharsets.UTF_8.newEncoder()
 136                 .encode(CharBuffer.wrap(reason), b, true);
 137         if (result.isError()) {
 138             try {
 139                 result.throwException();
 140             } catch (CharacterCodingException e) {
 141                 throw new IllegalArgumentException("Reason is a malformed UTF-16 sequence", e);
 142             }
 143         } else if (result.isOverflow()) {
 144             throw new IllegalArgumentException("Reason is too long");
 145         }
 146         return new Close(b.flip());
 147     }
 148 
 149     private void checkSize(int size, int maxSize) {
 150         if (size > maxSize) {
 151             throw new IllegalArgumentException(
 152                     format("The message is too long: %s;" +
 153                             " expected not longer than %s", size, maxSize)
 154             );
 155         }
 156     }
 157 
 158     private void checkAndUpdateText(boolean isLast) {
 159         if (!canSendText) {
 160             throw new IllegalStateException("Unexpected text message");
 161         }
 162         canSendBinary = isLast;
 163     }
 164 
 165     private void checkAndUpdateBinary(boolean isLast) {
 166         if (!canSendBinary) {
 167             throw new IllegalStateException("Unexpected binary message");
 168         }
 169         canSendText = isLast;
 170     }
 171 }