/* * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package java.net.http; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Flow; import static java.lang.System.Logger.Level.ERROR; import static java.lang.System.Logger.Level.TRACE; import static java.net.http.Utils.EMPTY_BYTE_BUFFER; import static java.net.http.Utils.logger; import static java.util.Objects.requireNonNull; // // Receives pairs of (Shared<ByteBuffer>[], CompletableFuture<Void>). Writes // buffers to the channel, disposing them as soon as they have been written. // Finally completing the completable future (that came in the pair) if not // 'null', when all have been written.
In case if an error has occurred, // completes this and possibly _all other_ incoming completable futures // exceptionally with this error. // final class Writer implements Flow.Subscriber[], CompletableFuture>> { // To not tie up a thread in the write loop indefinitely without any // progress we define a limit for the number of consecutive 0-writes private static final int MAX_INEFFECTIVE_WRITES = 3; private final SignalHandler handler; private final RawChannel channel; private final RawChannel.NonBlockingEvent channelEvent; // TODO: explain (copy from the existing source) private long bytesRemaining; private int disposalIndex; private ByteBuffer[] byteBuffers; private boolean initialized; private volatile Flow.Subscription subscription; private volatile boolean closed; private volatile Exception exception; private volatile Pair[], CompletableFuture> pair; private volatile boolean writable; Writer(Executor executor, RawChannel channel) { this.handler = new SignalHandler(executor, this::write); this.channel = requireNonNull(channel); this.channelEvent = new RawChannel.NonBlockingEvent() { @Override public int interestOps() { return SelectionKey.OP_WRITE; } @Override public void handle() { logger.log(TRACE, "The channel can be written to"); writable = true; handler.signal(); } @Override public String toString() { return "WebSocket writing"; } }; } void start() throws UncheckedIOException { logger.log(TRACE, "->Writer.start()"); // This registration is located outside the constructor to not allow // unsafe publication of OutgoingWebSocket to some other thread that // serves channel events synchronized (this) { try { registerWriteEvent(); } catch (IOException e) { throw new UncheckedIOException(e); } } logger.log(TRACE, "<-Writer.start"); } private void registerWriteEvent() throws IOException { logger.log(TRACE, "->Registering write event"); channel.registerEvent(channelEvent); logger.log(TRACE, "<-Registering write event"); } private void write() { synchronized 
(this) { while (writable && pair != null && !closed) { if (logger.isLoggable(TRACE)) { logger.log(TRACE, "{0}, {1}, {2}, {3}", writable, pair, closed, bytesRemaining); } Shared[] buffers = pair.first; if (!initialized) { disposalIndex = 0; byteBuffers = new ByteBuffer[buffers.length]; for (int i = 0; i < buffers.length; i++) { Shared b = buffers[i]; final int rem = b.remaining(); bytesRemaining += rem; byteBuffers[i] = b.buffer(); if (logger.isLoggable(TRACE)) { logger.log(TRACE, "{0} byte(s) in the buffer #{1}", rem, i); } } initialized = true; if (logger.isLoggable(TRACE)) { logger.log(TRACE, "Total {0} byte(s) in {1} buffer(s) to be written", bytesRemaining, buffers.length); } } // FIXME: throw CME if noticed outer modification of payload buffer // TODO: use pre-created direct buffers for short writes final CompletableFuture cf = pair.second; int numIneffectiveWrites = MAX_INEFFECTIVE_WRITES; try { while (bytesRemaining > 0) { long bytesWritten = channel.write(byteBuffers, 0, byteBuffers.length); if (logger.isLoggable(TRACE)) { logger.log(TRACE, "{0} byte(s) has been written", bytesWritten); } if (bytesWritten == 0) { if (--numIneffectiveWrites != 0) { if (logger.isLoggable(TRACE)) { logger.log(TRACE, "Ineffective writes left {0}", numIneffectiveWrites); } Thread.yield(); } else { logger.log(TRACE, "No more data can be written to the channel at the moment"); writable = false; registerWriteEvent(); break; } } else { // Reset the inefficiency counter, since we've progressed numIneffectiveWrites = MAX_INEFFECTIVE_WRITES; bytesRemaining -= bytesWritten; while (disposalIndex < byteBuffers.length && !byteBuffers[disposalIndex].hasRemaining()) { if (logger.isLoggable(TRACE)) { logger.log(TRACE, "Disposing buffer #{0} as it''s been fully written from", disposalIndex); } // Must replace the buffer in the array. // Otherwise it will be cleared, and then, and // since it's still in the byteBuffers array, // may be used in the next write operation. 
// Clearing means hasRemaining() may become true byteBuffers[disposalIndex] = EMPTY_BYTE_BUFFER; buffers[disposalIndex++].dispose(); } } } assert bytesRemaining >= 0 : bytesRemaining; if (bytesRemaining == 0) { logger.log(TRACE, "The buffers have been fully written"); pair = null; initialized = false; if (cf != null) { cf.complete(null); } subscription.request(1); } } catch (IOException e) { logger.log(ERROR, "Error writing from buffers", e); closed = true; exception = e; if (cf != null) { cf.completeExceptionally(e); } pair = null; } } } } @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = requireNonNull(subscription); this.subscription.request(1); } @Override public void onNext(Pair[], CompletableFuture> p) { logger.log(TRACE, "->Writer.onNext(''{0}'')", p); if (!closed) { pair = p; handler.signal(); } else { logger.log(TRACE, "Completing incoming {0} exceptionally", p.second); if (p.second != null) { p.second.completeExceptionally(exception); } } logger.log(TRACE, "<-Writer.onNext", p); } @Override public void onError(Throwable throwable) { closed = true; } @Override public void onComplete() { closed = true; } }