/* * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ package jdk.internal.net.http.common; import jdk.internal.net.http.common.SubscriberWrapper.SchedulingAction; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; import java.io.IOException; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.IntBinaryOperator; /** * Implements SSL using two SubscriberWrappers. * *

Constructor takes two Flow.Subscribers: one that receives the network * data (after it has been encrypted by SSLFlowDelegate) data, and one that * receives the application data (before it has been encrypted by SSLFlowDelegate). * *

Methods upstreamReader() and upstreamWriter() return the corresponding * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data. * See diagram below. * *

How Flow.Subscribers are used in this class, and where they come from: *

 * {@code
 *
 *
 *
 * --------->  data flow direction
 *
 *
 *                         +------------------+
 *        upstreamWriter   |                  | downWriter
 *        ---------------> |                  | ------------>
 *  obtained from this     |                  | supplied to constructor
 *                         | SSLFlowDelegate  |
 *        downReader       |                  | upstreamReader
 *        <--------------- |                  | <--------------
 * supplied to constructor |                  | obtained from this
 *                         +------------------+
 *
 * Errors are reported to the downReader Flow.Subscriber
 *
 * }
 * 
*/ public class SSLFlowDelegate { final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); // When handshake is in progress trying to wrap may produce no bytes. private static final ByteBuffer NOTHING = ByteBuffer.allocate(0); private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate"); private static final boolean isMonitored = monProp != null && (monProp.isEmpty() || monProp.equalsIgnoreCase("true")); final Executor exec; final Reader reader; final Writer writer; final SSLEngine engine; final String tubeName; // hack final CompletableFuture alpnCF; // completes on initial handshake final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped volatile boolean close_notify_received; final CompletableFuture readerCF; final CompletableFuture writerCF; final Consumer recycler; static AtomicInteger scount = new AtomicInteger(1); final int id; /** * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each * Flow.Subscriber requires an associated {@link CompletableFuture} * for errors that need to be signaled from downstream to upstream. */ public SSLFlowDelegate(SSLEngine engine, Executor exec, Subscriber> downReader, Subscriber> downWriter) { this(engine, exec, null, downReader, downWriter); } /** * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each * Flow.Subscriber requires an associated {@link CompletableFuture} * for errors that need to be signaled from downstream to upstream. */ public SSLFlowDelegate(SSLEngine engine, Executor exec, Consumer recycler, Subscriber> downReader, Subscriber> downWriter) { this.id = scount.getAndIncrement(); this.tubeName = String.valueOf(downWriter); this.recycler = recycler; this.reader = new Reader(); this.writer = new Writer(); this.engine = engine; this.exec = exec; this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); this.readerCF = reader.completion(); this.writerCF = reader.completion(); readerCF.exceptionally(this::stopOnError); writerCF.exceptionally(this::stopOnError); CompletableFuture.allOf(reader.completion(), writer.completion()) .thenRun(this::normalStop); this.alpnCF = new MinimalFuture<>(); // connect the Reader to the downReader and the // Writer to the downWriter. connect(downReader, downWriter); if (isMonitored) Monitor.add(monitor); } /** * Returns true if the SSLFlowDelegate has detected a TLS * close_notify from the server. * @return true, if a close_notify was detected. */ public boolean closeNotifyReceived() { return close_notify_received; } /** * Connects the read sink (downReader) to the SSLFlowDelegate Reader, * and the write sink (downWriter) to the SSLFlowDelegate Writer. * Called from within the constructor. Overwritten by SSLTube. * * @param downReader The left hand side read sink (typically, the * HttpConnection read subscriber). * @param downWriter The right hand side write sink (typically * the SocketTube write subscriber). */ void connect(Subscriber> downReader, Subscriber> downWriter) { this.reader.subscribe(downReader); this.writer.subscribe(downWriter); } /** * Returns a CompletableFuture which completes after * the initial handshake completes, and which contains the negotiated * alpn. */ public CompletableFuture alpn() { return alpnCF; } private void setALPN() { // Handshake is finished. So, can retrieve the ALPN now if (alpnCF.isDone()) return; String alpn = engine.getApplicationProtocol(); if (debug.on()) debug.log("setALPN = %s", alpn); alpnCF.complete(alpn); } public String monitor() { StringBuilder sb = new StringBuilder(); sb.append("SSL: id ").append(id); sb.append(" ").append(dbgString()); sb.append(" HS state: " + states(handshakeState)); sb.append(" Engine state: " + engine.getHandshakeStatus().toString()); if (stateList != null) { sb.append(" LL : "); for (String s : stateList) { sb.append(s).append(" "); } } sb.append("\r\n"); sb.append("Reader:: ").append(reader.toString()); sb.append("\r\n"); sb.append("Writer:: ").append(writer.toString()); sb.append("\r\n==================================="); return sb.toString(); } protected SchedulingAction enterReadScheduling() { return SchedulingAction.CONTINUE; } /** * Processing function for incoming data. Pass it thru SSLEngine.unwrap(). * Any decrypted buffers returned to be passed downstream. * Status codes: * NEED_UNWRAP: do nothing. Following incoming data will contain * any required handshake data * NEED_WRAP: call writer.addData() with empty buffer * NEED_TASK: delegate task to executor * BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap * BUFFER_UNDERFLOW: keep buffer and wait for more data * OK: return generated buffers. * * Upstream subscription strategy is to try and keep no more than * TARGET_BUFSIZE bytes in readBuf */ final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber { // Maximum record size is 16k. // Because SocketTube can feeds us up to 3 16K buffers, // then setting this size to 16K means that the readBuf // can store up to 64K-1 (16K-1 + 3*16K) static final int TARGET_BUFSIZE = 16 * 1024; final SequentialScheduler scheduler; volatile ByteBuffer readBuf; volatile boolean completing; final Object readBufferLock = new Object(); final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); private final class ReaderDownstreamPusher implements Runnable { @Override public void run() { processData(); } } Reader() { super(); scheduler = SequentialScheduler.synchronizedScheduler( new ReaderDownstreamPusher()); this.readBuf = ByteBuffer.allocate(1024); readBuf.limit(0); // keep in read mode } @Override public boolean supportsRecycling() { return recycler != null; } protected SchedulingAction enterScheduling() { return enterReadScheduling(); } public final String dbgString() { return "SSL Reader(" + tubeName + ")"; } /** * entry point for buffers delivered from upstream Subscriber */ @Override public void incoming(List buffers, boolean complete) { if (debugr.on()) debugr.log("Adding %d bytes to read buffer", Utils.remaining(buffers)); addToReadBuf(buffers, complete); scheduler.runOrSchedule(exec); } @Override public String toString() { return "READER: " + super.toString() + ", readBuf: " + readBuf.toString() + ", count: " + count.toString() + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running") + ", status: " + lastUnwrapStatus; } private void reallocReadBuf() { int sz = readBuf.capacity(); ByteBuffer newb = ByteBuffer.allocate(sz * 2); readBuf.flip(); Utils.copy(readBuf, newb); readBuf = newb; } @Override protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { if (readBuf.remaining() > TARGET_BUFSIZE) { if (debugr.on()) debugr.log("readBuf has more than TARGET_BUFSIZE: %d", readBuf.remaining()); return 0; } else { return super.upstreamWindowUpdate(currentWindow, downstreamQsize); } } // readBuf is kept ready for reading outside of this method private void addToReadBuf(List buffers, boolean complete) { assert Utils.remaining(buffers) > 0 || buffers.isEmpty(); synchronized (readBufferLock) { for (ByteBuffer buf : buffers) { readBuf.compact(); while (readBuf.remaining() < buf.remaining()) reallocReadBuf(); readBuf.put(buf); readBuf.flip(); // should be safe to call inside lock // since the only implementation // offers the buffer to an unbounded queue. // WARNING: do not touch buf after this point! if (recycler != null) recycler.accept(buf); } if (complete) { this.completing = complete; minBytesRequired = 0; } } } void schedule() { scheduler.runOrSchedule(exec); } void stop() { if (debugr.on()) debugr.log("stop"); scheduler.stop(); } AtomicInteger count = new AtomicInteger(0); // minimum number of bytes required to call unwrap. // Usually this is 0, unless there was a buffer underflow. // In this case we need to wait for more bytes than what // we had before calling unwrap() again. volatile int minBytesRequired; // work function where it all happens final void processData() { try { if (debugr.on()) debugr.log("processData:" + " readBuf remaining:" + readBuf.remaining() + ", state:" + states(handshakeState) + ", engine handshake status:" + engine.getHandshakeStatus()); int len; boolean complete = false; while (readBuf.remaining() > (len = minBytesRequired)) { boolean handshaking = false; try { EngineResult result; synchronized (readBufferLock) { complete = this.completing; if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining()); // Unless there is a BUFFER_UNDERFLOW, we should try to // unwrap any number of bytes. Set minBytesRequired to 0: // we only need to do that if minBytesRequired is not already 0. len = len > 0 ? minBytesRequired = 0 : len; result = unwrapBuffer(readBuf); len = readBuf.remaining(); if (debugr.on()) { debugr.log("Unwrapped: result: %s", result.result); debugr.log("Unwrapped: consumed: %s", result.bytesConsumed()); } } if (result.bytesProduced() > 0) { if (debugr.on()) debugr.log("sending %d", result.bytesProduced()); count.addAndGet(result.bytesProduced()); outgoing(result.destBuffer, false); } if (result.status() == Status.BUFFER_UNDERFLOW) { if (debugr.on()) debugr.log("BUFFER_UNDERFLOW"); // not enough data in the read buffer... // no need to try to unwrap again unless we get more bytes // than minBytesRequired = len in the read buffer. synchronized (readBufferLock) { minBytesRequired = len; // more bytes could already have been added... assert readBuf.remaining() >= len; // check if we have received some data, and if so // we can just re-spin the loop if (readBuf.remaining() > len) continue; else if (this.completing) { if (debug.on()) { debugr.log("BUFFER_UNDERFLOW with EOF," + " %d bytes non decrypted.", len); } // The channel won't send us any more data, and // we are in underflow: we need to fail. throw new IOException("BUFFER_UNDERFLOW with EOF, " + len + " bytes non decrypted."); } } // request more data and return. requestMore(); return; } if (complete && result.status() == Status.CLOSED) { if (debugr.on()) debugr.log("Closed: completing"); outgoing(Utils.EMPTY_BB_LIST, true); return; } if (result.handshaking()) { handshaking = true; if (debugr.on()) debugr.log("handshaking"); if (doHandshake(result, READER)) continue; // need unwrap else break; // doHandshake will have triggered the write scheduler if necessary } else { if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { handshaking = false; applicationBufferSize = engine.getSession().getApplicationBufferSize(); packetBufferSize = engine.getSession().getPacketBufferSize(); setALPN(); resumeActivity(); } } } catch (IOException ex) { errorCommon(ex); handleError(ex); return; } if (handshaking && !complete) return; } if (!complete) { synchronized (readBufferLock) { complete = this.completing && !readBuf.hasRemaining(); } } if (complete) { if (debugr.on()) debugr.log("completing"); // Complete the alpnCF, if not already complete, regardless of // whether or not the ALPN is available, there will be no more // activity. setALPN(); outgoing(Utils.EMPTY_BB_LIST, true); } } catch (Throwable ex) { errorCommon(ex); handleError(ex); } } private volatile Status lastUnwrapStatus; EngineResult unwrapBuffer(ByteBuffer src) throws IOException { ByteBuffer dst = getAppBuffer(); int len = src.remaining(); while (true) { SSLEngineResult sslResult = engine.unwrap(src, dst); switch (lastUnwrapStatus = sslResult.getStatus()) { case BUFFER_OVERFLOW: // may happen if app size buffer was changed, or if // our 'adaptiveBufferSize' guess was too small for // the current payload. In that case, update the // value of applicationBufferSize, and allocate a // buffer of that size, which we are sure will be // big enough to decode whatever needs to be // decoded. We will later update adaptiveBufferSize // in OK: below. int appSize = applicationBufferSize = engine.getSession().getApplicationBufferSize(); ByteBuffer b = ByteBuffer.allocate(appSize + dst.position()); dst.flip(); b.put(dst); dst = b; break; case CLOSED: assert dst.position() == 0; return doClosure(new EngineResult(sslResult)); case BUFFER_UNDERFLOW: // handled implicitly by compaction/reallocation of readBuf assert dst.position() == 0; return new EngineResult(sslResult); case OK: int size = dst.position(); if (debug.on()) { debugr.log("Decoded " + size + " bytes out of " + len + " into buffer of " + dst.capacity() + " remaining to decode: " + src.remaining()); } // if the record payload was bigger than what was originally // allocated, then sets the adaptiveAppBufferSize to size // and we will use that new size as a guess for the next app // buffer. if (size > adaptiveAppBufferSize) { adaptiveAppBufferSize = ((size + 7) >>> 3) << 3; } dst.flip(); return new EngineResult(sslResult, dst); } } } } public interface Monitorable { public String getInfo(); } public static class Monitor extends Thread { final List> list; final List finalList; final ReferenceQueue queue = new ReferenceQueue<>(); static Monitor themon; static { themon = new Monitor(); themon.start(); // uncomment to enable Monitor } // An instance used to temporarily store the // last observable state of a monitorable object. // When Monitor.remove(o) is called, we replace // 'o' with a FinalMonitorable whose reference // will be enqueued after the last observable state // has been printed. final class FinalMonitorable implements Monitorable { final String finalState; FinalMonitorable(Monitorable o) { finalState = o.getInfo(); finalList.add(this); } @Override public String getInfo() { finalList.remove(this); return finalState; } } Monitor() { super("Monitor"); setDaemon(true); list = Collections.synchronizedList(new LinkedList<>()); finalList = new ArrayList<>(); // access is synchronized on list above } void addTarget(Monitorable o) { list.add(new WeakReference<>(o, queue)); } void removeTarget(Monitorable o) { // It can take a long time for GC to clean up references. // Calling Monitor.remove() early helps removing noise from the // logs/ synchronized (list) { Iterator> it = list.iterator(); while (it.hasNext()) { Monitorable m = it.next().get(); if (m == null) it.remove(); if (o == m) { it.remove(); break; } } FinalMonitorable m = new FinalMonitorable(o); addTarget(m); Reference.reachabilityFence(m); } } public static void add(Monitorable o) { themon.addTarget(o); } public static void remove(Monitorable o) { themon.removeTarget(o); } @Override public void run() { System.out.println("Monitor starting"); try { while (true) { Thread.sleep(20 * 1000); synchronized (list) { Reference expired; while ((expired = queue.poll()) != null) list.remove(expired); for (WeakReference ref : list) { Monitorable o = ref.get(); if (o == null) continue; if (o instanceof FinalMonitorable) { ref.enqueue(); } System.out.println(o.getInfo()); System.out.println("-------------------------"); } } System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-"); } } catch (InterruptedException e) { System.out.println("Monitor exiting with " + e); } } } /** * Processing function for outgoing data. Pass it thru SSLEngine.wrap() * Any encrypted buffers generated are passed downstream to be written. * Status codes: * NEED_UNWRAP: call reader.addData() with empty buffer * NEED_WRAP: call addData() with empty buffer * NEED_TASK: delegate task to executor * BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap * BUFFER_UNDERFLOW: shouldn't happen on writing side * OK: return generated buffers */ class Writer extends SubscriberWrapper { final SequentialScheduler scheduler; // queues of buffers received from upstream waiting // to be processed by the SSLEngine final List writeList; final Logger debugw = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); volatile boolean completing; boolean completed; // only accessed in processData class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask { @Override public void run() { processData(); } } Writer() { super(); writeList = Collections.synchronizedList(new LinkedList<>()); scheduler = new SequentialScheduler(new WriterDownstreamPusher()); } @Override protected void incoming(List buffers, boolean complete) { assert complete ? buffers == Utils.EMPTY_BB_LIST : true; assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true; if (complete) { if (debugw.on()) debugw.log("adding SENTINEL"); completing = true; writeList.add(SENTINEL); } else { writeList.addAll(buffers); } if (debugw.on()) debugw.log("added " + buffers.size() + " (" + Utils.remaining(buffers) + " bytes) to the writeList"); scheduler.runOrSchedule(); } public final String dbgString() { return "SSL Writer(" + tubeName + ")"; } protected void onSubscribe() { if (debugw.on()) debugw.log("onSubscribe initiating handshaking"); addData(HS_TRIGGER); // initiates handshaking } void schedule() { scheduler.runOrSchedule(); } void stop() { if (debugw.on()) debugw.log("stop"); scheduler.stop(); } @Override public boolean closing() { return closeNotifyReceived(); } private boolean isCompleting() { return completing; } @Override protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { if (writeList.size() > 10) return 0; else return super.upstreamWindowUpdate(currentWindow, downstreamQsize); } private boolean hsTriggered() { synchronized(writeList) { for (ByteBuffer b : writeList) if (b == HS_TRIGGER) return true; return false; } } void triggerWrite() { synchronized (writeList) { if (writeList.isEmpty()) { writeList.add(HS_TRIGGER); } } scheduler.runOrSchedule(); } private void processData() { boolean completing = isCompleting(); try { if (debugw.on()) debugw.log("processData, writeList remaining:" + Utils.remaining(writeList) + ", hsTriggered:" + hsTriggered() + ", needWrap:" + needWrap()); while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) { ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY); EngineResult result = wrapBuffers(outbufs); if (debugw.on()) debugw.log("wrapBuffer returned %s", result.result); if (result.status() == Status.CLOSED) { if (!upstreamCompleted) { upstreamCompleted = true; upstreamSubscription.cancel(); } if (result.bytesProduced() <= 0) return; if (!completing && !completed) { completing = this.completing = true; // There could still be some outgoing data in outbufs. writeList.add(SENTINEL); } } boolean handshaking = false; if (result.handshaking()) { if (debugw.on()) debugw.log("handshaking"); doHandshake(result, WRITER); // ok to ignore return handshaking = true; } else { if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { applicationBufferSize = engine.getSession().getApplicationBufferSize(); packetBufferSize = engine.getSession().getPacketBufferSize(); setALPN(); resumeActivity(); } } cleanList(writeList); // tidy up the source list sendResultBytes(result); if (handshaking) { if (!completing && needWrap()) { continue; } else { return; } } } if (completing && Utils.remaining(writeList) == 0) { if (!completed) { completed = true; writeList.clear(); outgoing(Utils.EMPTY_BB_LIST, true); } return; } if (writeList.isEmpty() && needWrap()) { writer.addData(HS_TRIGGER); } } catch (Throwable ex) { errorCommon(ex); handleError(ex); } } // The SSLEngine insists on being given a buffer that is at least // SSLSession.getPacketBufferSize() long (usually 16K). If given // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only // has 6 bytes to wrap. Typical usage shows that for GET we // usually produce an average of ~ 100 bytes. // To avoid wasting space, and because allocating and zeroing // 16K buffers for encoding 6 bytes is costly, we are reusing the // same writeBuffer to interact with SSLEngine.wrap(). // If the SSLEngine produces less than writeBuffer.capacity() / 2, // then we copy off the bytes to a smaller buffer that we send // downstream. Otherwise, we send the writeBuffer downstream // and will allocate a new one next time. volatile ByteBuffer writeBuffer; private volatile Status lastWrappedStatus; @SuppressWarnings("fallthrough") EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException { long len = Utils.remaining(src); if (debugw.on()) debugw.log("wrapping " + len + " bytes"); ByteBuffer dst = writeBuffer; if (dst == null) dst = writeBuffer = getNetBuffer(); assert dst.position() == 0 : "buffer position is " + dst.position(); assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity(); while (true) { SSLEngineResult sslResult = engine.wrap(src, dst); if (debugw.on()) debugw.log("SSLResult: " + sslResult); switch (lastWrappedStatus = sslResult.getStatus()) { case BUFFER_OVERFLOW: // Shouldn't happen. We allocated buffer with packet size // get it again if net buffer size was changed if (debugw.on()) debugw.log("BUFFER_OVERFLOW"); int netSize = packetBufferSize = engine.getSession().getPacketBufferSize(); ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position()); dst.flip(); b.put(dst); dst = b; break; // try again case CLOSED: if (debugw.on()) debugw.log("CLOSED"); // fallthrough. There could be some remaining data in dst. // CLOSED will be handled by the caller. case OK: final ByteBuffer dest; if (dst.position() == 0) { dest = NOTHING; // can happen if handshake is in progress } else if (dst.position() < dst.capacity() / 2) { // less than half the buffer was used. // copy off the bytes to a smaller buffer, and keep // the writeBuffer for next time. dst.flip(); dest = Utils.copyAligned(dst); dst.clear(); } else { // more than half the buffer was used. // just send that buffer downstream, and we will // get a new writeBuffer next time it is needed. dst.flip(); dest = dst; writeBuffer = null; } if (debugw.on()) debugw.log("OK => produced: %d bytes into %d, not wrapped: %d", dest.remaining(), dest.capacity(), Utils.remaining(src)); return new EngineResult(sslResult, dest); case BUFFER_UNDERFLOW: // Shouldn't happen. Doesn't returns when wrap() // underflow handled externally // assert false : "Buffer Underflow"; if (debug.on()) debug.log("BUFFER_UNDERFLOW"); return new EngineResult(sslResult); default: if (debugw.on()) debugw.log("result: %s", sslResult.getStatus()); assert false : "result:" + sslResult.getStatus(); } } } private boolean needWrap() { return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP; } private void sendResultBytes(EngineResult result) { if (result.bytesProduced() > 0) { if (debugw.on()) debugw.log("Sending %d bytes downstream", result.bytesProduced()); outgoing(result.destBuffer, false); } } @Override public String toString() { return "WRITER: " + super.toString() + ", writeList size: " + Integer.toString(writeList.size()) + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running") + ", status: " + lastWrappedStatus; //" writeList: " + writeList.toString(); } } private void handleError(Throwable t) { if (debug.on()) debug.log("handleError", t); readerCF.completeExceptionally(t); writerCF.completeExceptionally(t); // no-op if already completed alpnCF.completeExceptionally(t); reader.stop(); writer.stop(); } boolean stopped; private synchronized void normalStop() { if (stopped) return; stopped = true; reader.stop(); writer.stop(); if (isMonitored) Monitor.remove(monitor); } private Void stopOnError(Throwable currentlyUnused) { // maybe log, etc normalStop(); return null; } private void cleanList(List l) { synchronized (l) { Iterator iter = l.iterator(); while (iter.hasNext()) { ByteBuffer b = iter.next(); if (!b.hasRemaining() && b != SENTINEL) { iter.remove(); } } } } /** * States for handshake. We avoid races when accessing/updating the AtomicInt * because updates always schedule an additional call to both the read() * and write() functions. */ private static final int NOT_HANDSHAKING = 0; private static final int HANDSHAKING = 1; // Bit flags // a thread is currently executing tasks private static final int DOING_TASKS = 4; // a thread wants to execute tasks, while another thread is executing private static final int REQUESTING_TASKS = 8; private static final int TASK_BITS = 12; // Both bits private static final int READER = 1; private static final int WRITER = 2; private static String states(AtomicInteger state) { int s = state.get(); StringBuilder sb = new StringBuilder(); int x = s & ~TASK_BITS; switch (x) { case NOT_HANDSHAKING: sb.append(" NOT_HANDSHAKING "); break; case HANDSHAKING: sb.append(" HANDSHAKING "); break; default: throw new InternalError(); } if ((s & DOING_TASKS) > 0) sb.append("|DOING_TASKS"); if ((s & REQUESTING_TASKS) > 0) sb.append("|REQUESTING_TASKS"); return sb.toString(); } private void resumeActivity() { reader.schedule(); writer.schedule(); } final AtomicInteger handshakeState; final ConcurrentLinkedQueue stateList = debug.on() ? new ConcurrentLinkedQueue<>() : null; // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS // depending on previous value private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> { if ((current & DOING_TASKS) == 0) return DOING_TASKS | (current & HANDSHAKING); else return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING); }; // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set // clears bits if not. private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> { if ((current & REQUESTING_TASKS) != 0) return DOING_TASKS | (current & HANDSHAKING); // clear both bits return (current & HANDSHAKING); }; private boolean doHandshake(EngineResult r, int caller) { // unconditionally sets the HANDSHAKING bit, while preserving task bits handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS)); if (stateList != null && debug.on()) { stateList.add(r.handshakeStatus().toString()); stateList.add(Integer.toString(caller)); } switch (r.handshakeStatus()) { case NEED_TASK: int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS); if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks return false; } if (debug.on()) debug.log("obtaining and initiating task execution"); List tasks = obtainTasks(); executeTasks(tasks); return false; // executeTasks will resume activity case NEED_WRAP: if (caller == READER) { writer.triggerWrite(); return false; } break; case NEED_UNWRAP: case NEED_UNWRAP_AGAIN: // do nothing else // receiving-side data will trigger unwrap if (caller == WRITER) { reader.schedule(); return false; } break; default: throw new InternalError("Unexpected handshake status:" + r.handshakeStatus()); } return true; } private List obtainTasks() { List l = new ArrayList<>(); Runnable r; while ((r = engine.getDelegatedTask()) != null) { l.add(r); } return l; } private void executeTasks(List tasks) { exec.execute(() -> { try { List nextTasks = tasks; if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size())); do { nextTasks.forEach(Runnable::run); if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { nextTasks = obtainTasks(); } else { int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS); if ((s & DOING_TASKS) != 0) { if (debug.on()) debug.log("re-running tasks (B)"); nextTasks = obtainTasks(); continue; } break; } } while (true); if (debug.on()) debug.log("finished task execution"); resumeActivity(); } catch (Throwable t) { handleError(t); } }); } // FIXME: acknowledge a received CLOSE request from peer EngineResult doClosure(EngineResult r) throws IOException { if (debug.on()) debug.log("doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]", r.result, engine.getHandshakeStatus(), engine.isOutboundDone(), engine.isInboundDone()); if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) { // we have received TLS close_notify and need to send // an acknowledgement back. We're calling doHandshake // to finish the close handshake. if (engine.isInboundDone() && !engine.isOutboundDone()) { if (debug.on()) debug.log("doClosure: close_notify received"); close_notify_received = true; if (!writer.scheduler.isStopped()) { doHandshake(r, READER); } else { // We have received closed notify, but we // won't be able to send the acknowledgement. // Nothing more will come from the socket either, // so mark the reader as completed. synchronized (reader.readBufferLock) { reader.completing = true; } } } } return r; } /** * Returns the upstream Flow.Subscriber of the reading (incoming) side. * This flow must be given the encrypted data read from upstream (eg socket) * before it is decrypted. */ public Flow.Subscriber> upstreamReader() { return reader; } /** * Returns the upstream Flow.Subscriber of the writing (outgoing) side. * This flow contains the plaintext data before it is encrypted. */ public Flow.Subscriber> upstreamWriter() { return writer; } public boolean resumeReader() { return reader.signalScheduling(); } public void resetReaderDemand() { reader.resetDownstreamDemand(); } static class EngineResult { final SSLEngineResult result; final ByteBuffer destBuffer; // normal result EngineResult(SSLEngineResult result) { this(result, null); } EngineResult(SSLEngineResult result, ByteBuffer destBuffer) { this.result = result; this.destBuffer = destBuffer; } boolean handshaking() { HandshakeStatus s = result.getHandshakeStatus(); return s != HandshakeStatus.FINISHED && s != HandshakeStatus.NOT_HANDSHAKING && result.getStatus() != Status.CLOSED; } boolean needUnwrap() { HandshakeStatus s = result.getHandshakeStatus(); return s == HandshakeStatus.NEED_UNWRAP; } int bytesConsumed() { return result.bytesConsumed(); } int bytesProduced() { return result.bytesProduced(); } SSLEngineResult.HandshakeStatus handshakeStatus() { return result.getHandshakeStatus(); } SSLEngineResult.Status status() { return result.getStatus(); } } // The maximum network buffer size negotiated during // the handshake. Usually 16K. volatile int packetBufferSize; final ByteBuffer getNetBuffer() { int netSize = packetBufferSize; if (netSize <= 0) { packetBufferSize = netSize = engine.getSession().getPacketBufferSize(); } return ByteBuffer.allocate(netSize); } // The maximum application buffer size negotiated during // the handshake. Usually close to 16K. volatile int applicationBufferSize; // Despite of the maximum applicationBufferSize negotiated // above, TLS records usually have a much smaller payload. // The adaptativeAppBufferSize records the max payload // ever decoded, and we use that as a guess for how big // a buffer we will need for the next payload. // This avoids allocating and zeroing a 16K buffer for // nothing... volatile int adaptiveAppBufferSize; final ByteBuffer getAppBuffer() { int appSize = applicationBufferSize; if (appSize <= 0) { applicationBufferSize = appSize = engine.getSession().getApplicationBufferSize(); } int size = adaptiveAppBufferSize; if (size <= 0) { size = 512; // start with 512 this is usually enough for handshaking / headers } else if (size > appSize) { size = appSize; } // will cause a BUFFER_OVERFLOW if not big enough, but // that's OK. return ByteBuffer.allocate(size); } final String dbgString() { return "SSLFlowDelegate(" + tubeName + ")"; } }