/* * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package java.net.http; import java.net.http.OutgoingMessage.Binary; import java.net.http.OutgoingMessage.BinaryText; import java.net.http.OutgoingMessage.CharacterText; import java.net.http.OutgoingMessage.Close; import java.net.http.OutgoingMessage.Ping; import java.net.http.OutgoingMessage.Pong; import java.net.http.OutgoingMessage.StreamedText; import java.net.http.OutgoingMessage.Visitor; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.StandardCharsets; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Flow; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static java.lang.String.format; import static java.lang.System.Logger.Level.ERROR; import static java.lang.System.Logger.Level.TRACE; import static java.net.http.Pair.pair; import static java.net.http.Utils.dump; import static java.net.http.Utils.logger; import static java.util.Objects.requireNonNull; final class MessagePublisher implements Flow.Publisher>> { // 1. The queue (backlog) is for storing messages, not for restricting the // number of them. The number of messages is limited by the quota // (maxPermits) instead. Whether or not WebSocket is ready to accept more // messages for sending depends on current number of permits left, not on // how much space is available in the queue. In other words, available space // in the queue is necessary, but not sufficient for being ready to accept a // message for sending. // 2. If a message is ready to be accepted, the queue must be able to store // it, i.e. it must be the case that: // // backlog.remainingCapacity() >= permits.get() // // at all times. // 3. A new message or an unprocessed one is always at the top of the // backlog. private final BlockingQueue>> backlog = new LinkedBlockingQueue<>(); private final AtomicInteger permits; private final AtomicReference exception = new AtomicReference<>(); private final Visitor checkingVisitor = new CheckingVisitor(); private final SignalHandler handler; private final AtomicLong demand = new AtomicLong(); private volatile Flow.Subscriber>> subscriber; MessagePublisher(int maxPermits, Executor executor) { if (maxPermits < 1) { throw new IllegalArgumentException(String.valueOf(maxPermits)); } this.permits = new AtomicInteger(maxPermits); assert capacityAndPermits(); this.handler = new SignalHandler(executor, this::react); } CompletableFuture send(OutgoingMessage message) { logger.log(TRACE, "send(''{0}'')", message); // noinspection ThrowableResultOfMethodCallIgnored if (exception.get() != null) { throw new IllegalStateException("Already closed with an error", exception.get()); } // Decrement only if there's something left int oldPermits = permits.getAndUpdate((x) -> (x - 1) < 0 ? 0 : x - 1); if (oldPermits == 0) { throw new IllegalStateException("Retry after any outstanding send operations has been completed"); } try { CompletableFuture cf; // This particular synchronization is done for mutual exclusion: to // not allow check-then-act race (since 'checkingVisitor' is // stateful). Otherwise there might be a gap between visiting the // message and enqueueing the pair. synchronized (this) { message.accept(checkingVisitor, null); cf = new CompletableFuture<>(); boolean offered = backlog.offer(pair(message, cf)); // That is a programming error actually. Something is wrong // with the algorithm. There must've been a place for this // message in the queue! assert offered : backlog.size(); } handler.signal(); // The order is important: after the permit has been returned, the // user will observe the completion, not the other way around; // otherwise there might be a race return cf.thenRun(permits::incrementAndGet); } catch (RuntimeException e) { // Return the permit taken earlier for the sake of failure // atomicity; maybe it could've been retried later? permits.incrementAndGet(); throw e; } } private void closeExceptionally(Throwable error) { if (exception.compareAndSet(null, requireNonNull(error))) { handler.signal(); } } // FIXME: provide onComplete call as a reaction to Close message being send private void react() { synchronized (this) { while (true) { Throwable e = exception.get(); if (e != null) { backlog.forEach(p -> p.second.completeExceptionally(e)); backlog.clear(); reportError(e); break; } Pair> p = backlog.poll(); if (p != null && demand.getAndUpdate(x -> x == 0 ? 0 : x - 1) > 0) { try { subscriber.onNext(p); } catch (Exception e1) { reportError(e1); } } else { break; } } } } private void reportError(Throwable error) { try { subscriber.onError(error); } catch (Exception e) { logger.log(ERROR, "onError threw an error", e); } } @Override public void subscribe(Flow.Subscriber>> subscriber) { this.subscriber = requireNonNull(subscriber); this.subscriber.onSubscribe( new Flow.Subscription() { @Override public void request(long n) { if (n < 0) { throw new IllegalArgumentException(String.valueOf(n)); } demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i); handler.signal(); } @Override public void cancel() { closeExceptionally(new IllegalStateException("Cancellation")); } }); } // Performs several checks for incoming messages and updates the state // before they are enqueued for sending. These are only quick checks. private static final class CheckingVisitor implements Visitor { private static final int CLOSE_MESSAGE_SENT = 1; // Once there forever there private static final int TEXT_EXPECTED = 2; private static final int BINARY_EXPECTED = 4; // Initially any kinds of messages are expected private int state = TEXT_EXPECTED | BINARY_EXPECTED; @Override public Void visit(CharacterText message, Void attachment) { checkClose(); checkText(message.isLast); return null; } @Override public Void visit(BinaryText message, Void attachment) { checkClose(); checkText(message.isLast); return null; } @Override public Void visit(StreamedText message, Void attachment) { checkClose(); checkText(true); return null; } @Override public Void visit(Binary message, Void attachment) { checkClose(); if ((state & BINARY_EXPECTED) == 0) { throw new IllegalStateException("Unexpected binary message"); } if (message.isLast) { state |= TEXT_EXPECTED; } else { state &= ~TEXT_EXPECTED; } return null; } @Override public Void visit(Ping ping, Void attachment) { checkClose(); checkSize(ping.bytes.remaining(), 125); return null; } @Override public Void visit(Pong pong, Void attachment) { checkClose(); checkSize(pong.bytes.remaining(), 125); return null; } @Override public Void visit(Close close, Void attachment) { logger.log(TRACE, "Checking Close message ''{0}''", close); checkClose(); state |= CLOSE_MESSAGE_SENT; if (close.reason.length() != 0) { // It would be inappropriate to use // // reason.toString().getBytes(StandardCharsets.UTF_8) // // because it is fault-tolerant by design, i.e. it doesn't // report coding errors ByteBuffer encodedReason; try { encodedReason = StandardCharsets.UTF_8.newEncoder() .encode(CharBuffer.wrap(close.reason)); } catch (CharacterCodingException e) { throw new IllegalArgumentException( "The closure reason is a malformed UTF-16 sequence", e); } int len = encodedReason.remaining(); if (len > 123) { throw new IllegalArgumentException( "The closure reason is too long (max 123 bytes): " + len); } } return null; } private void checkClose() { if ((state & CLOSE_MESSAGE_SENT) != 0) { throw new IllegalStateException ("A Close message has been accepted previously"); } } private void checkSize(int size, int maxSize) { if (size > maxSize) { throw new IllegalArgumentException( format("The message is too long: %s;" + " expected not longer than %s", size, maxSize) ); } } private void checkText(boolean isLast) { if ((state & TEXT_EXPECTED) == 0) { throw new IllegalStateException("Unexpected text message"); } if (isLast) { state |= BINARY_EXPECTED; } else { state &= ~BINARY_EXPECTED; } } } private boolean capacityAndPermits() { int permits = this.permits.get(); int remainingCapacity = this.backlog.remainingCapacity(); assert permits <= remainingCapacity : dump(permits, remainingCapacity); return true; } }