/* * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package java.net.http; import java.io.IOException; import java.net.ProtocolException; import java.net.http.OutgoingMessage.Binary; import java.net.http.OutgoingMessage.BinaryText; import java.net.http.OutgoingMessage.CharacterText; import java.net.http.OutgoingMessage.Close; import java.net.http.OutgoingMessage.Ping; import java.net.http.OutgoingMessage.Pong; import java.net.http.OutgoingMessage.StreamedText; import java.net.http.OutgoingMessage.Visitor; import java.nio.ByteBuffer; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Stream; import static java.lang.System.Logger.Level.ERROR; import static java.lang.System.Logger.Level.TRACE; import static java.lang.System.Logger.Level.WARNING; import static java.net.http.Utils.dump; import static java.net.http.Utils.logger; import static java.util.Objects.requireNonNull; // // +----+ +----+ +---------+ // | \ \ \ \ | // (WS)->| MP / / MS / / Writer |---+ // +----+ +----+ +---------+ | // +->-----------------------------------~ // final class WebSocketImpl implements WebSocket { private final String subprotocol; private final RawChannel channel; private final MessagePublisher messagePublisher; private final MessageSender messageSender; private final Writer writer; private final WSReceiver receiver; private final CompletableFuture closed = new CompletableFuture<>(); private final Visitor, Void> v = createStateVisitor(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Listener listener; private final Executor executor; private volatile State state = State.CONNECTED; private final Object pongLock = new Object(); private CompletableFuture lastSendPongFuture = CompletableFuture.completedFuture(null); static CompletableFuture newInstanceAsync(WebSocketBuilderImpl b) { CompletableFuture result = new OpeningHandshake(b).performAsync(); // Creating a snapshot, since by the time the handshake has been // completed, the builder might have been changed Listener listener = b.getListener(); Executor executor = b.getClient().executorService(); return result.thenApply(r -> { WebSocketImpl ws = new WebSocketImpl(listener, r.subprotocol, r.channel, executor); ws.start(); return ws; }); } private WebSocketImpl(Listener listener, String subprotocol, RawChannel channel, Executor executor) { this.listener = wrapListener(requireNonNull(listener)); this.channel = requireNonNull(channel); this.executor = requireNonNull(executor); this.subprotocol = subprotocol; logger.log(TRACE, "Executor in use: {0}", executor); messagePublisher = new MessagePublisher(1, executor); messageSender = new MessageSender(executor); writer = new Writer(executor, channel); messagePublisher.subscribe(messageSender); messageSender.subscribe(writer); writer.start(); SharedPool pool = new SharedPool<>( () -> ByteBuffer.allocateDirect(32768), 2); receiver = new WSReceiver(this.listener, this, executor, channel, pool); } private void start() { receiver.start(); } @Override public CompletableFuture sendText(ByteBuffer message, boolean isLast) { requireNonNull(message, "message"); return new BinaryText(isLast, message).accept(v, null); } @Override public CompletableFuture sendText(CharSequence message, boolean isLast) { requireNonNull(message, "message"); // We can reuse a ByteBuffer without a UTF-8 check in the case if the // passed CharSequence is in fact a WebSocket.Text. But. Only if we know // that the Text instance represents _a complete Message_, otherwise the // byte buffer might be context-sensitive. i.e. it might represent a // valid _continuation_ of some UTF-8 sequence, not a valid UTF-8 // sequence itself. // I don't know if given the complexity and assumed prevalence of such // cases makes it worth thinking about at all. return new CharacterText(isLast, message).accept(v, null); } @Override public CompletableFuture sendText(Stream message) { requireNonNull(message, "message"); return new StreamedText(message).accept(v, null); } @Override public CompletableFuture sendBinary(ByteBuffer message, boolean isLast) { requireNonNull(message, "message"); return new Binary(isLast, message).accept(v, null); } @Override public CompletableFuture sendPing(ByteBuffer message) { requireNonNull(message, "message"); return new Ping(message).accept(v, null); } @Override public CompletableFuture sendPong(ByteBuffer message) { synchronized (lock) { requireNonNull(message, "message"); lastSendPongFuture = new Pong(message).accept(v, null); } return lastSendPongFuture; } @Override public CompletableFuture sendClose(CloseCode code, CharSequence reason) { requireNonNull(code, "code"); requireNonNull(reason, "reason"); return new Close(code, reason).accept(v, null); } @Override public CompletableFuture sendClose() { return new Close().accept(v, null); } @Override public long request(long n) { if (n < 0L) { throw new IllegalArgumentException ("The number must not be negative: " + n); } return receiver.request(n); } @Override public String getSubprotocol() { return subprotocol; } @Override public boolean isClosed() { return state.isClosed(); } @Override public void abort() throws IOException { lock.writeLock().lock(); try { if (state.isClosed()) { return; } setState(State.ABORTED); channel.close(); } finally { lock.writeLock().unlock(); } } private Visitor, Void> createStateVisitor() { return new Visitor<>() { @Override public CompletableFuture visit(CharacterText message, Void attachment) { return visitNonClose(message); } @Override public CompletableFuture visit(BinaryText message, Void attachment) { return visitNonClose(message); } @Override public CompletableFuture visit(StreamedText message, Void attachment) { return visitNonClose(message); } @Override public CompletableFuture visit(Binary message, Void attachment) { return visitNonClose(message); } @Override public CompletableFuture visit(Ping message, Void attachment) { return visitNonClose(message); } @Override public CompletableFuture visit(Pong message, Void attachment) { return visitNonClose(message); } @Override public CompletableFuture visit(Close message, Void attachment) { lock.writeLock().lock(); try { if (state.isClosed() || state == State.CLOSED_LOCALLY) { throw new IllegalStateException(state.toString()); } else if (state == State.CLOSED_REMOTELY) { setState(State.CLOSED); } else { setState(State.CLOSED_LOCALLY); } return messagePublisher.send(message); } finally { lock.writeLock().unlock(); } } private CompletableFuture visitNonClose(OutgoingMessage message) { lock.readLock().lock(); try { if (state.isClosed()) { throw new IllegalStateException(state.toString()); } return messagePublisher.send(message); } finally { lock.readLock().unlock(); } } }; } private Listener wrapListener(Listener listener) { return new Listener() { @Override public void onOpen(WebSocket webSocket) { listener.onOpen(webSocket); } @Override public CompletionStage onText(WebSocket webSocket, Text message, MessagePart part) { return listener.onText(webSocket, message, part); } @Override public CompletionStage onBinary(WebSocket webSocket, ByteBuffer message, MessagePart part) { return listener.onBinary(webSocket, message, part); } @Override public CompletionStage onPing(WebSocket webSocket, ByteBuffer message) { return listener.onPing(webSocket, message); } @Override public CompletionStage onPong(WebSocket webSocket, ByteBuffer message) { return listener.onPong(webSocket, message); } /** @noinspection OptionalUsedAsFieldOrParameterType*/ @Override public void onClose(WebSocket webSocket, Optional code, String reason) { lock.writeLock().tryLock(); try { if (state == State.CLOSED_REMOTELY || state == State.CLOSED) { throw new IllegalStateException(); } else if (state == State.CLOSED_LOCALLY) { setState(State.CLOSED); } else if (state == State.CONNECTED) { setState(State.CLOSED_REMOTELY); } } finally { lock.writeLock().unlock(); } listener.onClose(webSocket, code, reason); } @Override public void onError(WebSocket webSocket, Throwable error) { if (error instanceof ProtocolException && (error.getCause() instanceof WebSocketProtocolException)) { final WebSocketProtocolException cause = (WebSocketProtocolException) error.getCause(); logger.log(WARNING, "Automatically closing {0}, reason: {1}", webSocket, cause.getMessage() ); final CloseCode cc = cause.getCloseCode(); webSocket.sendClose(cc, "RFC 6455 " + cause.getSection()) .whenComplete((v, t) -> { try { webSocket.abort(); } catch (IOException e) { logger.log(ERROR, e); } }); } listener.onError(webSocket, error); } }; } private void setState(State newState) { state = newState; if (newState.isClosed()) { assert lock.isWriteLockedByCurrentThread() : dump(lock.writeLock(), lock.readLock()); closed.complete(null); } } CompletionStage whenClosed() { return closed; } CompletableFuture lastSentPongFuture() { return lastSendPongFuture; } CompletionStage sendPongIfNoOutstanding(ByteBuffer message) { synchronized (pongLock) { if (lastSendPongFuture.isDone()) { return sendPong(message); } } return CompletableFuture.completedFuture(null); } private enum State { CONNECTED, CLOSED_REMOTELY, CLOSED_LOCALLY, CLOSED, ABORTED; boolean isClosed() { return this == CLOSED || this == ABORTED; } } @Override public String toString() { return super.toString() + "[" + state + "]"; } }