1 /*
   2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http.common;
  27 
  28 import jdk.internal.net.http.common.SubscriberWrapper.SchedulingAction;
  29 
  30 import javax.net.ssl.SSLEngine;
  31 import javax.net.ssl.SSLEngineResult;
  32 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
  33 import javax.net.ssl.SSLEngineResult.Status;
  34 import javax.net.ssl.SSLException;
  35 import java.io.IOException;
  36 import java.lang.ref.Reference;
  37 import java.lang.ref.ReferenceQueue;
  38 import java.lang.ref.WeakReference;
  39 import java.nio.ByteBuffer;
  40 import java.util.ArrayList;
  41 import java.util.Collections;
  42 import java.util.Iterator;
  43 import java.util.LinkedList;
  44 import java.util.List;
  45 import java.util.concurrent.CompletableFuture;
  46 import java.util.concurrent.ConcurrentLinkedQueue;
  47 import java.util.concurrent.Executor;
  48 import java.util.concurrent.Flow;
  49 import java.util.concurrent.Flow.Subscriber;
  50 import java.util.concurrent.atomic.AtomicInteger;
  51 import java.util.function.Consumer;
  52 import java.util.function.IntBinaryOperator;
  53 
  54 /**
  55  * Implements SSL using two SubscriberWrappers.
  56  *
 * <p> Constructor takes two Flow.Subscribers: one that receives the network
 * data (after it has been encrypted by SSLFlowDelegate), and one that
 * receives the application data (after it has been decrypted by SSLFlowDelegate).
  60  *
  61  * <p> Methods upstreamReader() and upstreamWriter() return the corresponding
  62  * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data.
  63  * See diagram below.
  64  *
  65  * <p> How Flow.Subscribers are used in this class, and where they come from:
  66  * <pre>
  67  * {@code
  68  *
  69  *
  70  *
  71  * --------->  data flow direction
  72  *
  73  *
  74  *                         +------------------+
  75  *        upstreamWriter   |                  | downWriter
  76  *        ---------------> |                  | ------------>
  77  *  obtained from this     |                  | supplied to constructor
  78  *                         | SSLFlowDelegate  |
  79  *        downReader       |                  | upstreamReader
  80  *        <--------------- |                  | <--------------
  81  * supplied to constructor |                  | obtained from this
  82  *                         +------------------+
  83  *
  84  * Errors are reported to the downReader Flow.Subscriber
  85  *
  86  * }
  87  * </pre>
  88  */
  89 public class SSLFlowDelegate {
  90 
  91     final Logger debug =
  92             Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
  93 
  94     private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
  95     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
  96     // When handshake is in progress trying to wrap may produce no bytes.
  97     private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
  98     private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
  99     private static final boolean isMonitored =
 100             monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true"));
 101 
 102     final Executor exec;
 103     final Reader reader;
 104     final Writer writer;
 105     final SSLEngine engine;
 106     final String tubeName; // hack
 107     final CompletableFuture<String> alpnCF; // completes on initial handshake
 108     final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped
 109     volatile boolean close_notify_received;
 110     final CompletableFuture<Void> readerCF;
 111     final CompletableFuture<Void> writerCF;
 112     final Consumer<ByteBuffer> recycler;
 113     static AtomicInteger scount = new AtomicInteger(1);
 114     final int id;
 115 
 116     /**
 117      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
 118      * Flow.Subscriber requires an associated {@link CompletableFuture}
 119      * for errors that need to be signaled from downstream to upstream.
 120      */
 121     public SSLFlowDelegate(SSLEngine engine,
 122                            Executor exec,
 123                            Subscriber<? super List<ByteBuffer>> downReader,
 124                            Subscriber<? super List<ByteBuffer>> downWriter)
 125     {
 126         this(engine, exec, null, downReader, downWriter);
 127     }
 128 
 129     /**
 130      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
 131      * Flow.Subscriber requires an associated {@link CompletableFuture}
 132      * for errors that need to be signaled from downstream to upstream.
 133      */
 134     public SSLFlowDelegate(SSLEngine engine,
 135             Executor exec,
 136             Consumer<ByteBuffer> recycler,
 137             Subscriber<? super List<ByteBuffer>> downReader,
 138             Subscriber<? super List<ByteBuffer>> downWriter)
 139         {
 140         this.id = scount.getAndIncrement();
 141         this.tubeName = String.valueOf(downWriter);
 142         this.recycler = recycler;
 143         this.reader = new Reader();
 144         this.writer = new Writer();
 145         this.engine = engine;
 146         this.exec = exec;
 147         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
 148         this.readerCF = reader.completion();
 149         this.writerCF = reader.completion();
 150         readerCF.exceptionally(this::stopOnError);
 151         writerCF.exceptionally(this::stopOnError);
 152 
 153         CompletableFuture.allOf(reader.completion(), writer.completion())
 154             .thenRun(this::normalStop);
 155         this.alpnCF = new MinimalFuture<>();
 156 
 157         // connect the Reader to the downReader and the
 158         // Writer to the downWriter.
 159         connect(downReader, downWriter);
 160 
 161         if (isMonitored) Monitor.add(monitor);
 162     }
 163 
 164     /**
 165      * Returns true if the SSLFlowDelegate has detected a TLS
 166      * close_notify from the server.
 167      * @return true, if a close_notify was detected.
 168      */
 169     public boolean closeNotifyReceived() {
 170         return close_notify_received;
 171     }
 172 
 173     /**
 174      * Connects the read sink (downReader) to the SSLFlowDelegate Reader,
 175      * and the write sink (downWriter) to the SSLFlowDelegate Writer.
 176      * Called from within the constructor. Overwritten by SSLTube.
 177      *
 178      * @param downReader  The left hand side read sink (typically, the
 179      *                    HttpConnection read subscriber).
 180      * @param downWriter  The right hand side write sink (typically
 181      *                    the SocketTube write subscriber).
 182      */
 183     void connect(Subscriber<? super List<ByteBuffer>> downReader,
 184                  Subscriber<? super List<ByteBuffer>> downWriter) {
 185         this.reader.subscribe(downReader);
 186         this.writer.subscribe(downWriter);
 187     }
 188 
 189    /**
 190     * Returns a CompletableFuture<String> which completes after
 191     * the initial handshake completes, and which contains the negotiated
 192     * alpn.
 193     */
 194     public CompletableFuture<String> alpn() {
 195         return alpnCF;
 196     }
 197 
 198     private void setALPN() {
 199         // Handshake is finished. So, can retrieve the ALPN now
 200         if (alpnCF.isDone())
 201             return;
 202         String alpn = engine.getApplicationProtocol();
 203         if (debug.on()) debug.log("setALPN = %s", alpn);
 204         alpnCF.complete(alpn);
 205     }
 206 
 207     public String monitor() {
 208         StringBuilder sb = new StringBuilder();
 209         sb.append("SSL: id ").append(id);
 210         sb.append(" ").append(dbgString());
 211         sb.append(" HS state: " + states(handshakeState));
 212         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
 213         if (stateList != null) {
 214             sb.append(" LL : ");
 215             for (String s : stateList) {
 216                 sb.append(s).append(" ");
 217             }
 218         }
 219         sb.append("\r\n");
 220         sb.append("Reader:: ").append(reader.toString());
 221         sb.append("\r\n");
 222         sb.append("Writer:: ").append(writer.toString());
 223         sb.append("\r\n===================================");
 224         return sb.toString();
 225     }
 226 
 227     protected SchedulingAction enterReadScheduling() {
 228         return SchedulingAction.CONTINUE;
 229     }
 230 
 231 
 232     /**
 233      * Processing function for incoming data. Pass it thru SSLEngine.unwrap().
 234      * Any decrypted buffers returned to be passed downstream.
 235      * Status codes:
 236      *     NEED_UNWRAP: do nothing. Following incoming data will contain
 237      *                  any required handshake data
 238      *     NEED_WRAP: call writer.addData() with empty buffer
 239      *     NEED_TASK: delegate task to executor
 240      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap
 241      *     BUFFER_UNDERFLOW: keep buffer and wait for more data
 242      *     OK: return generated buffers.
 243      *
 244      * Upstream subscription strategy is to try and keep no more than
 245      * TARGET_BUFSIZE bytes in readBuf
 246      */
 247     final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber {
 248         // Maximum record size is 16k.
 249         // Because SocketTube can feeds us up to 3 16K buffers,
 250         // then setting this size to 16K means that the readBuf
 251         // can store up to 64K-1 (16K-1 + 3*16K)
 252         static final int TARGET_BUFSIZE = 16 * 1024;
 253 
 254         final SequentialScheduler scheduler;
 255         volatile ByteBuffer readBuf;
 256         volatile boolean completing;
 257         final Object readBufferLock = new Object();
 258         final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
 259 
 260         private final class ReaderDownstreamPusher implements Runnable {
 261             @Override
 262             public void run() {
 263                 processData();
 264             }
 265         }
 266 
 267         Reader() {
 268             super();
 269             scheduler = SequentialScheduler.synchronizedScheduler(
 270                     new ReaderDownstreamPusher());
 271             this.readBuf = ByteBuffer.allocate(1024);
 272             readBuf.limit(0); // keep in read mode
 273         }
 274 
 275         @Override
 276         public boolean supportsRecycling() {
 277             return recycler != null;
 278         }
 279 
 280         protected SchedulingAction enterScheduling() {
 281             return enterReadScheduling();
 282         }
 283 
 284         public final String dbgString() {
 285             return "SSL Reader(" + tubeName + ")";
 286         }
 287 
 288         /**
 289          * entry point for buffers delivered from upstream Subscriber
 290          */
 291         @Override
 292         public void incoming(List<ByteBuffer> buffers, boolean complete) {
 293             if (debugr.on())
 294                 debugr.log("Adding %d bytes to read buffer",
 295                         Utils.remaining(buffers));
 296             addToReadBuf(buffers, complete);
 297             scheduler.runOrSchedule(exec);
 298         }
 299 
 300         @Override
 301         public String toString() {
 302             return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
 303                     + ", count: " + count.toString() + ", scheduler: "
 304                     + (scheduler.isStopped() ? "stopped" : "running")
 305                     + ", status: " + lastUnwrapStatus;
 306         }
 307 
 308         private void reallocReadBuf() {
 309             int sz = readBuf.capacity();
 310             ByteBuffer newb = ByteBuffer.allocate(sz * 2);
 311             readBuf.flip();
 312             Utils.copy(readBuf, newb);
 313             readBuf = newb;
 314         }
 315 
 316         @Override
 317         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
 318             if (readBuf.remaining() > TARGET_BUFSIZE) {
 319                 if (debugr.on())
 320                     debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
 321                             readBuf.remaining());
 322                 return 0;
 323             } else {
 324                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
 325             }
 326         }
 327 
 328         // readBuf is kept ready for reading outside of this method
 329         private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
 330             assert Utils.remaining(buffers) > 0 || buffers.isEmpty();
 331             synchronized (readBufferLock) {
 332                 for (ByteBuffer buf : buffers) {
 333                     readBuf.compact();
 334                     while (readBuf.remaining() < buf.remaining())
 335                         reallocReadBuf();
 336                     readBuf.put(buf);
 337                     readBuf.flip();
 338                     // should be safe to call inside lock
 339                     // since the only implementation
 340                     // offers the buffer to an unbounded queue.
 341                     // WARNING: do not touch buf after this point!
 342                     if (recycler != null) recycler.accept(buf);
 343                 }
 344                 if (complete) {
 345                     this.completing = complete;
 346                     minBytesRequired = 0;
 347                 }
 348             }
 349         }
 350 
 351         void schedule() {
 352             scheduler.runOrSchedule(exec);
 353         }
 354 
 355         void stop() {
 356             if (debugr.on()) debugr.log("stop");
 357             scheduler.stop();
 358         }
 359 
 360         AtomicInteger count = new AtomicInteger(0);
 361 
 362         // minimum number of bytes required to call unwrap.
 363         // Usually this is 0, unless there was a buffer underflow.
 364         // In this case we need to wait for more bytes than what
 365         // we had before calling unwrap() again.
 366         volatile int minBytesRequired;
 367 
 368         // work function where it all happens
 369         final void processData() {
 370             try {
 371                 if (debugr.on())
 372                     debugr.log("processData:"
 373                             + " readBuf remaining:" + readBuf.remaining()
 374                             + ", state:" + states(handshakeState)
 375                             + ", engine handshake status:" + engine.getHandshakeStatus());
 376                 int len;
 377                 boolean complete = false;
 378                 while (readBuf.remaining() > (len = minBytesRequired)) {
 379                     boolean handshaking = false;
 380                     try {
 381                         EngineResult result;
 382                         synchronized (readBufferLock) {
 383                             complete = this.completing;
 384                             if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining());
 385                             // Unless there is a BUFFER_UNDERFLOW, we should try to
 386                             // unwrap any number of bytes. Set minBytesRequired to 0:
 387                             // we only need to do that if minBytesRequired is not already 0.
 388                             len = len > 0 ? minBytesRequired = 0 : len;
 389                             result = unwrapBuffer(readBuf);
 390                             len = readBuf.remaining();
 391                             if (debugr.on()) {
 392                                 debugr.log("Unwrapped: result: %s", result.result);
 393                                 debugr.log("Unwrapped: consumed: %s", result.bytesConsumed());
 394                             }
 395                         }
 396                         if (result.bytesProduced() > 0) {
 397                             if (debugr.on())
 398                                 debugr.log("sending %d", result.bytesProduced());
 399                             count.addAndGet(result.bytesProduced());
 400                             outgoing(result.destBuffer, false);
 401                         }
 402                         if (result.status() == Status.BUFFER_UNDERFLOW) {
 403                             if (debugr.on()) debugr.log("BUFFER_UNDERFLOW");
 404                             // not enough data in the read buffer...
 405                             // no need to try to unwrap again unless we get more bytes
 406                             // than minBytesRequired = len in the read buffer.
 407                             synchronized (readBufferLock) {
 408                                 minBytesRequired = len;
 409                                 // more bytes could already have been added...
 410                                 assert readBuf.remaining() >= len;
 411                                 // check if we have received some data, and if so
 412                                 // we can just re-spin the loop
 413                                 if (readBuf.remaining() > len) continue;
 414                                 else if (this.completing) {
 415                                     if (debug.on()) {
 416                                         debugr.log("BUFFER_UNDERFLOW with EOF," +
 417                                                 " %d bytes non decrypted.", len);
 418                                     }
 419                                     // The channel won't send us any more data, and
 420                                     // we are in underflow: we need to fail.
 421                                     throw new IOException("BUFFER_UNDERFLOW with EOF, "
 422                                             + len + " bytes non decrypted.");
 423                                 }
 424                             }
 425                             // request more data and return.
 426                             requestMore();
 427                             return;
 428                         }
 429                         if (complete && result.status() == Status.CLOSED) {
 430                             if (debugr.on()) debugr.log("Closed: completing");
 431                             outgoing(Utils.EMPTY_BB_LIST, true);
 432                             return;
 433                         }
 434                         if (result.handshaking()) {
 435                             handshaking = true;
 436                             if (debugr.on()) debugr.log("handshaking");
 437                             if (doHandshake(result, READER)) continue; // need unwrap
 438                             else break; // doHandshake will have triggered the write scheduler if necessary
 439                         } else {
 440                             if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
 441                                 handshaking = false;
 442                                 applicationBufferSize = engine.getSession().getApplicationBufferSize();
 443                                 packetBufferSize = engine.getSession().getPacketBufferSize();
 444                                 setALPN();
 445                                 resumeActivity();
 446                             }
 447                         }
 448                     } catch (IOException ex) {
 449                         errorCommon(ex);
 450                         handleError(ex);
 451                         return;
 452                     }
 453                     if (handshaking && !complete)
 454                         return;
 455                 }
 456                 if (!complete) {
 457                     synchronized (readBufferLock) {
 458                         complete = this.completing && !readBuf.hasRemaining();
 459                     }
 460                 }
 461                 if (complete) {
 462                     if (debugr.on()) debugr.log("completing");
 463                     // Complete the alpnCF, if not already complete, regardless of
 464                     // whether or not the ALPN is available, there will be no more
 465                     // activity.
 466                     setALPN();
 467                     outgoing(Utils.EMPTY_BB_LIST, true);
 468                 }
 469             } catch (Throwable ex) {
 470                 errorCommon(ex);
 471                 handleError(ex);
 472             }
 473         }
 474 
 475         private volatile Status lastUnwrapStatus;
 476         EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
 477             ByteBuffer dst = getAppBuffer();
 478             int len = src.remaining();
 479             while (true) {
 480                 SSLEngineResult sslResult = engine.unwrap(src, dst);
 481                 switch (lastUnwrapStatus = sslResult.getStatus()) {
 482                     case BUFFER_OVERFLOW:
 483                         // may happen if app size buffer was changed, or if
 484                         // our 'adaptiveBufferSize' guess was too small for
 485                         // the current payload. In that case, update the
 486                         // value of applicationBufferSize, and allocate a
 487                         // buffer of that size, which we are sure will be
 488                         // big enough to decode whatever needs to be
 489                         // decoded. We will later update adaptiveBufferSize
 490                         // in OK: below.
 491                         int appSize = applicationBufferSize =
 492                                 engine.getSession().getApplicationBufferSize();
 493                         ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
 494                         dst.flip();
 495                         b.put(dst);
 496                         dst = b;
 497                         break;
 498                     case CLOSED:
 499                         assert dst.position() == 0;
 500                         return doClosure(new EngineResult(sslResult));
 501                     case BUFFER_UNDERFLOW:
 502                         // handled implicitly by compaction/reallocation of readBuf
 503                         assert dst.position() == 0;
 504                         return new EngineResult(sslResult);
 505                     case OK:
 506                         int size = dst.position();
 507                         if (debug.on()) {
 508                             debugr.log("Decoded " + size + " bytes out of " + len
 509                                     + " into buffer of " + dst.capacity()
 510                                     + " remaining to decode: " + src.remaining());
 511                         }
 512                         // if the record payload was bigger than what was originally
 513                         // allocated, then sets the adaptiveAppBufferSize to size
 514                         // and we will use that new size as a guess for the next app
 515                         // buffer.
 516                         if (size > adaptiveAppBufferSize) {
 517                             adaptiveAppBufferSize = ((size + 7) >>> 3) << 3;
 518                         }
 519                         dst.flip();
 520                         return new EngineResult(sslResult, dst);
 521                 }
 522             }
 523         }
 524     }
 525 
 526     public interface Monitorable {
 527         public String getInfo();
 528     }
 529 
 530     public static class Monitor extends Thread {
 531         final List<WeakReference<Monitorable>> list;
 532         final List<FinalMonitorable> finalList;
 533         final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>();
 534         static Monitor themon;
 535 
 536         static {
 537             themon = new Monitor();
 538             themon.start(); // uncomment to enable Monitor
 539         }
 540 
 541         // An instance used to temporarily store the
 542         // last observable state of a monitorable object.
 543         // When Monitor.remove(o) is called, we replace
 544         // 'o' with a FinalMonitorable whose reference
 545         // will be enqueued after the last observable state
 546         // has been printed.
 547         final class FinalMonitorable implements Monitorable {
 548             final String finalState;
 549             FinalMonitorable(Monitorable o) {
 550                 finalState = o.getInfo();
 551                 finalList.add(this);
 552             }
 553             @Override
 554             public String getInfo() {
 555                 finalList.remove(this);
 556                 return finalState;
 557             }
 558         }
 559 
 560         Monitor() {
 561             super("Monitor");
 562             setDaemon(true);
 563             list = Collections.synchronizedList(new LinkedList<>());
 564             finalList = new ArrayList<>(); // access is synchronized on list above
 565         }
 566 
 567         void addTarget(Monitorable o) {
 568             list.add(new WeakReference<>(o, queue));
 569         }
 570         void removeTarget(Monitorable o) {
 571             // It can take a long time for GC to clean up references.
 572             // Calling Monitor.remove() early helps removing noise from the
 573             // logs/
 574             synchronized (list) {
 575                 Iterator<WeakReference<Monitorable>> it = list.iterator();
 576                 while (it.hasNext()) {
 577                     Monitorable m = it.next().get();
 578                     if (m == null) it.remove();
 579                     if (o == m) {
 580                         it.remove();
 581                         break;
 582                     }
 583                 }
 584                 FinalMonitorable m = new FinalMonitorable(o);
 585                 addTarget(m);
 586                 Reference.reachabilityFence(m);
 587             }
 588         }
 589 
 590         public static void add(Monitorable o) {
 591             themon.addTarget(o);
 592         }
 593         public static void remove(Monitorable o) {
 594             themon.removeTarget(o);
 595         }
 596 
 597         @Override
 598         public void run() {
 599             System.out.println("Monitor starting");
 600             try {
 601                 while (true) {
 602                     Thread.sleep(20 * 1000);
 603                     synchronized (list) {
 604                         Reference<? extends Monitorable> expired;
 605                         while ((expired = queue.poll()) != null) list.remove(expired);
 606                         for (WeakReference<Monitorable> ref : list) {
 607                             Monitorable o = ref.get();
 608                             if (o == null) continue;
 609                             if (o instanceof FinalMonitorable) {
 610                                 ref.enqueue();
 611                             }
 612                             System.out.println(o.getInfo());
 613                             System.out.println("-------------------------");
 614                         }
 615                     }
 616                     System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-");
 617                 }
 618             } catch (InterruptedException e) {
 619                 System.out.println("Monitor exiting with " + e);
 620             }
 621         }
 622     }
 623 
 624     /**
 625      * Processing function for outgoing data. Pass it thru SSLEngine.wrap()
 626      * Any encrypted buffers generated are passed downstream to be written.
 627      * Status codes:
 628      *     NEED_UNWRAP: call reader.addData() with empty buffer
 629      *     NEED_WRAP: call addData() with empty buffer
 630      *     NEED_TASK: delegate task to executor
 631      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap
 632      *     BUFFER_UNDERFLOW: shouldn't happen on writing side
 633      *     OK: return generated buffers
 634      */
 635     class Writer extends SubscriberWrapper {
 636         final SequentialScheduler scheduler;
 637         // queues of buffers received from upstream waiting
 638         // to be processed by the SSLEngine
 639         final List<ByteBuffer> writeList;
 640         final Logger debugw =  Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
 641         volatile boolean completing;
 642         boolean completed; // only accessed in processData
 643 
 644         class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
 645             @Override public void run() { processData(); }
 646         }
 647 
 648         Writer() {
 649             super();
 650             writeList = Collections.synchronizedList(new LinkedList<>());
 651             scheduler = new SequentialScheduler(new WriterDownstreamPusher());
 652         }
 653 
 654         @Override
 655         protected void incoming(List<ByteBuffer> buffers, boolean complete) {
 656             assert complete ? buffers == Utils.EMPTY_BB_LIST : true;
 657             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
 658             if (complete) {
 659                 if (debugw.on()) debugw.log("adding SENTINEL");
 660                 completing = true;
 661                 writeList.add(SENTINEL);
 662             } else {
 663                 writeList.addAll(buffers);
 664             }
 665             if (debugw.on())
 666                 debugw.log("added " + buffers.size()
 667                            + " (" + Utils.remaining(buffers)
 668                            + " bytes) to the writeList");
 669             scheduler.runOrSchedule();
 670         }
 671 
 672         public final String dbgString() {
 673             return "SSL Writer(" + tubeName + ")";
 674         }
 675 
 676         protected void onSubscribe() {
 677             if (debugw.on()) debugw.log("onSubscribe initiating handshaking");
 678             addData(HS_TRIGGER);  // initiates handshaking
 679         }
 680 
        /** Runs or schedules the writer task (processData) on the scheduler. */
        void schedule() {
            scheduler.runOrSchedule();
        }
 684 
        /** Stops the writer's scheduler. */
        void stop() {
            if (debugw.on()) debugw.log("stop");
            scheduler.stop();
        }
 689 
        /** Returns whatever closeNotifyReceived() currently reports. */
        @Override
        public boolean closing() {
            return closeNotifyReceived();
        }
 694 
        /** Reads the volatile {@code completing} flag. */
        private boolean isCompleting() {
            return completing;
        }
 698 
 699         @Override
 700         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
 701             if (writeList.size() > 10)
 702                 return 0;
 703             else
 704                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
 705         }
 706 
 707         private boolean hsTriggered() {
 708             synchronized(writeList) {
 709                 for (ByteBuffer b : writeList)
 710                     if (b == HS_TRIGGER)
 711                         return true;
 712                 return false;
 713             }
 714         }
 715 
        /**
         * Makes sure the writer task has something to act upon: if the
         * writeList is empty the HS_TRIGGER marker is queued, then the
         * writer task is run or scheduled.
         */
        void triggerWrite() {
            // check-then-add must be atomic with respect to other users of the list
            synchronized (writeList) {
                if (writeList.isEmpty()) {
                    writeList.add(HS_TRIGGER);
                }
            }
            scheduler.runOrSchedule();
        }
 724 
        /**
         * Body of the writer task (invoked by WriterDownstreamPusher).
         * Wraps whatever is queued on the writeList with the SSLEngine and
         * pushes the produced bytes downstream, driving the handshake when
         * the engine asks for it. Any Throwable raised while doing so is
         * passed to errorCommon and handleError.
         */
        private void processData() {
            // local snapshot of the volatile flag; may be set to true below
            boolean completing = isCompleting();

            try {
                if (debugw.on())
                    debugw.log("processData, writeList remaining:"
                                + Utils.remaining(writeList) + ", hsTriggered:"
                                + hsTriggered() + ", needWrap:" + needWrap());

                // keep wrapping while there are bytes to encrypt, a handshake
                // trigger is queued, or the engine itself wants to wrap
                while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) {
                    ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
                    EngineResult result = wrapBuffers(outbufs);
                    if (debugw.on())
                        debugw.log("wrapBuffer returned %s", result.result);

                    if (result.status() == Status.CLOSED) {
                        // engine is closed: cancel the upstream subscription (once)
                        if (!upstreamCompleted) {
                            upstreamCompleted = true;
                            upstreamSubscription.cancel();
                        }
                        // nothing was produced by this wrap: nothing left to send
                        if (result.bytesProduced() <= 0)
                            return;

                        if (!completing && !completed) {
                            completing = this.completing = true;
                            // There could still be some outgoing data in outbufs.
                            writeList.add(SENTINEL);
                        }
                    }

                    boolean handshaking = false;
                    if (result.handshaking()) {
                        if (debugw.on()) debugw.log("handshaking");
                        doHandshake(result, WRITER);  // ok to ignore return
                        handshaking = true;
                    } else {
                        // Not handshaking (any more): reset the whole state word to
                        // NOT_HANDSHAKING. If the previous value, ignoring the
                        // DOING_TASKS bit, was HANDSHAKING, refresh the buffer
                        // sizes from the session, set ALPN and resume both sides.
                        if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
                            applicationBufferSize = engine.getSession().getApplicationBufferSize();
                            packetBufferSize = engine.getSession().getPacketBufferSize();
                            setALPN();
                            resumeActivity();
                        }
                    }
                    cleanList(writeList); // tidy up the source list
                    sendResultBytes(result);
                    if (handshaking) {
                        // keep looping only if the engine still needs to wrap
                        // and we are not completing; otherwise stop here
                        if (!completing && needWrap()) {
                            continue;
                        } else {
                            return;
                        }
                    }
                }
                if (completing && Utils.remaining(writeList) == 0) {
                    // everything has been wrapped: signal completion
                    // downstream, exactly once
                    if (!completed) {
                        completed = true;
                        writeList.clear();
                        outgoing(Utils.EMPTY_BB_LIST, true);
                    }
                    return;
                }
                // the engine wants to wrap but nothing is queued: queue a
                // handshake trigger
                if (writeList.isEmpty() && needWrap()) {
                    writer.addData(HS_TRIGGER);
                }
            } catch (Throwable ex) {
                errorCommon(ex);
                handleError(ex);
            }
        }
 794 
        // The SSLEngine insists on being given a buffer that is at least
        // SSLSession.getPacketBufferSize() long (usually 16K). If given
        // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only
        // has 6 bytes to wrap. Typical usage shows that for GET we
        // usually produce an average of ~ 100 bytes.
        // To avoid wasting space, and because allocating and zeroing
        // 16K buffers for encoding 6 bytes is costly, we are reusing the
        // same writeBuffer to interact with SSLEngine.wrap().
        // If the SSLEngine produces less than writeBuffer.capacity() / 2,
        // then we copy off the bytes to a smaller buffer that we send
        // downstream. Otherwise, we send the writeBuffer downstream
        // and will allocate a new one next time.
        volatile ByteBuffer writeBuffer;
        // status returned by the most recent SSLEngine.wrap() call made by
        // wrapBuffers; reported by toString()
        private volatile Status lastWrappedStatus;
 809         @SuppressWarnings("fallthrough")
 810         EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
 811             long len = Utils.remaining(src);
 812             if (debugw.on())
 813                 debugw.log("wrapping " + len + " bytes");
 814 
 815             ByteBuffer dst = writeBuffer;
 816             if (dst == null) dst = writeBuffer = getNetBuffer();
 817             assert dst.position() == 0 : "buffer position is " + dst.position();
 818             assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity();
 819 
 820             while (true) {
 821                 SSLEngineResult sslResult = engine.wrap(src, dst);
 822                 if (debugw.on()) debugw.log("SSLResult: " + sslResult);
 823                 switch (lastWrappedStatus = sslResult.getStatus()) {
 824                     case BUFFER_OVERFLOW:
 825                         // Shouldn't happen. We allocated buffer with packet size
 826                         // get it again if net buffer size was changed
 827                         if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
 828                         int netSize = packetBufferSize
 829                                 = engine.getSession().getPacketBufferSize();
 830                         ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position());
 831                         dst.flip();
 832                         b.put(dst);
 833                         dst = b;
 834                         break; // try again
 835                     case CLOSED:
 836                         if (debugw.on()) debugw.log("CLOSED");
 837                         // fallthrough. There could be some remaining data in dst.
 838                         // CLOSED will be handled by the caller.
 839                     case OK:
 840                         final ByteBuffer dest;
 841                         if (dst.position() == 0) {
 842                             dest = NOTHING; // can happen if handshake is in progress
 843                         } else if (dst.position() < dst.capacity() / 2) {
 844                             // less than half the buffer was used.
 845                             // copy off the bytes to a smaller buffer, and keep
 846                             // the writeBuffer for next time.
 847                             dst.flip();
 848                             dest = Utils.copyAligned(dst);
 849                             dst.clear();
 850                         } else {
 851                             // more than half the buffer was used.
 852                             // just send that buffer downstream, and we will
 853                             // get a new writeBuffer next time it is needed.
 854                             dst.flip();
 855                             dest = dst;
 856                             writeBuffer = null;
 857                         }
 858                         if (debugw.on())
 859                             debugw.log("OK => produced: %d bytes into %d, not wrapped: %d",
 860                                        dest.remaining(),  dest.capacity(), Utils.remaining(src));
 861                         return new EngineResult(sslResult, dest);
 862                     case BUFFER_UNDERFLOW:
 863                         // Shouldn't happen.  Doesn't returns when wrap()
 864                         // underflow handled externally
 865                         // assert false : "Buffer Underflow";
 866                         if (debug.on()) debug.log("BUFFER_UNDERFLOW");
 867                         return new EngineResult(sslResult);
 868                     default:
 869                         if (debugw.on())
 870                             debugw.log("result: %s", sslResult.getStatus());
 871                         assert false : "result:" + sslResult.getStatus();
 872                 }
 873             }
 874         }
 875 
        /** Tells whether the engine's current handshake status is NEED_WRAP. */
        private boolean needWrap() {
            return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
        }
 879 
 880         private void sendResultBytes(EngineResult result) {
 881             if (result.bytesProduced() > 0) {
 882                 if (debugw.on())
 883                     debugw.log("Sending %d bytes downstream",
 884                                result.bytesProduced());
 885                 outgoing(result.destBuffer, false);
 886             }
 887         }
 888 
 889         @Override
 890         public String toString() {
 891             return "WRITER: " + super.toString()
 892                     + ", writeList size: " + Integer.toString(writeList.size())
 893                     + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running")
 894                     + ", status: " + lastWrappedStatus;
 895                     //" writeList: " + writeList.toString();
 896         }
 897     }
 898 
    /**
     * Propagates a failure: completes readerCF, writerCF and alpnCF
     * exceptionally with the given throwable, then stops both the reader
     * and the writer.
     *
     * @param t the cause of the failure
     */
    private void handleError(Throwable t) {
        if (debug.on()) debug.log("handleError", t);
        readerCF.completeExceptionally(t);
        writerCF.completeExceptionally(t);
        // no-op if already completed
        alpnCF.completeExceptionally(t);
        reader.stop();
        writer.stop();
    }
 908 
 909     boolean stopped;
 910 
 911     private synchronized void normalStop() {
 912         if (stopped)
 913             return;
 914         stopped = true;
 915         reader.stop();
 916         writer.stop();
 917         if (isMonitored) Monitor.remove(monitor);
 918     }
 919 
    /**
     * Performs a normal stop. The throwable is currently ignored.
     *
     * @param currentlyUnused the error; not used at the moment
     * @return always {@code null}
     */
    private Void stopOnError(Throwable currentlyUnused) {
        // maybe log, etc
        normalStop();
        return null;
    }
 925 
 926     private void cleanList(List<ByteBuffer> l) {
 927         synchronized (l) {
 928             Iterator<ByteBuffer> iter = l.iterator();
 929             while (iter.hasNext()) {
 930                 ByteBuffer b = iter.next();
 931                 if (!b.hasRemaining() && b != SENTINEL) {
 932                     iter.remove();
 933                 }
 934             }
 935         }
 936     }
 937 
    /**
     * States for handshake. We avoid races when accessing/updating the AtomicInt
     * because updates always schedule an additional call to both the read()
     * and write() functions.
     */
    private static final int NOT_HANDSHAKING = 0;
    private static final int HANDSHAKING = 1;

    // Bit flags
    // a thread is currently executing tasks
    private static final int DOING_TASKS = 4;
    // a thread wants to execute tasks, while another thread is executing
    private static final int REQUESTING_TASKS = 8;
    private static final int TASK_BITS = 12; // Both bits

    // identifies which side is calling doHandshake
    private static final int READER = 1;
    private static final int WRITER = 2;
 955 
 956     private static String states(AtomicInteger state) {
 957         int s = state.get();
 958         StringBuilder sb = new StringBuilder();
 959         int x = s & ~TASK_BITS;
 960         switch (x) {
 961             case NOT_HANDSHAKING:
 962                 sb.append(" NOT_HANDSHAKING ");
 963                 break;
 964             case HANDSHAKING:
 965                 sb.append(" HANDSHAKING ");
 966                 break;
 967             default:
 968                 throw new InternalError();
 969         }
 970         if ((s & DOING_TASKS) > 0)
 971             sb.append("|DOING_TASKS");
 972         if ((s & REQUESTING_TASKS) > 0)
 973             sb.append("|REQUESTING_TASKS");
 974         return sb.toString();
 975     }
 976 
    /** Runs or schedules both sides: the reader first, then the writer. */
    private void resumeActivity() {
        reader.schedule();
        writer.schedule();
    }
 981 
    // NOT_HANDSHAKING or HANDSHAKING, combined with the task bit flags
    final AtomicInteger handshakeState;
    // only allocated when debug logging is on; doHandshake records the
    // handshake status and the caller id of each call in it
    final ConcurrentLinkedQueue<String> stateList =
            debug.on() ? new ConcurrentLinkedQueue<>() : null;
 985 
 986     // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS
 987     // depending on previous value
 988     private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> {
 989         if ((current & DOING_TASKS) == 0)
 990             return DOING_TASKS | (current & HANDSHAKING);
 991         else
 992             return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING);
 993     };
 994 
 995     // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set
 996     // clears bits if not.
 997     private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> {
 998         if ((current & REQUESTING_TASKS) != 0)
 999             return DOING_TASKS | (current & HANDSHAKING);
1000         // clear both bits
1001         return (current & HANDSHAKING);
1002     };
1003 
    /**
     * Marks the delegate as handshaking and reacts to the handshake status
     * carried by the given result.
     *
     * @param r      the engine result whose handshake status is acted upon
     * @param caller READER or WRITER, identifying the calling side
     * @return {@code false} when the work was handed over (delegated tasks
     *         started or left to another thread, writer triggered, reader
     *         scheduled); {@code true} when the calling side is itself the
     *         one that must proceed
     * @throws InternalError if the handshake status is not one of
     *         NEED_TASK, NEED_WRAP, NEED_UNWRAP, NEED_UNWRAP_AGAIN
     */
    private boolean doHandshake(EngineResult r, int caller) {
        // unconditionally sets the HANDSHAKING bit, while preserving task bits
        handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS));
        if (stateList != null && debug.on()) {
            stateList.add(r.handshakeStatus().toString());
            stateList.add(Integer.toString(caller));
        }
        switch (r.handshakeStatus()) {
            case NEED_TASK:
                // either claim task execution, or record a request if
                // another thread is already executing tasks
                int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS);
                if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks
                    return false;
                }

                if (debug.on()) debug.log("obtaining and initiating task execution");
                List<Runnable> tasks = obtainTasks();
                executeTasks(tasks);
                return false;  // executeTasks will resume activity
            case NEED_WRAP:
                // wrapping is the writer's job: hand over if called by the reader
                if (caller == READER) {
                    writer.triggerWrite();
                    return false;
                }
                break;
            case NEED_UNWRAP:
            case NEED_UNWRAP_AGAIN:
                // do nothing else
                // receiving-side data will trigger unwrap
                if (caller == WRITER) {
                    reader.schedule();
                    return false;
                }
                break;
            default:
                throw new InternalError("Unexpected handshake status:"
                                        + r.handshakeStatus());
        }
        return true;
    }
1043 
1044     private List<Runnable> obtainTasks() {
1045         List<Runnable> l = new ArrayList<>();
1046         Runnable r;
1047         while ((r = engine.getDelegatedTask()) != null) {
1048             l.add(r);
1049         }
1050         return l;
1051     }
1052 
    /**
     * Submits to the executor a job that runs the given delegated tasks,
     * and keeps running further tasks for as long as the engine asks for
     * them or another thread has requested task execution in the
     * meantime. Once done, both sides are resumed. Any Throwable raised by
     * the job is passed to handleError.
     *
     * @param tasks the first batch of delegated tasks to run
     */
    private void executeTasks(List<Runnable> tasks) {
        exec.execute(() -> {
            try {
                List<Runnable> nextTasks = tasks;
                if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size()));
                do {
                    nextTasks.forEach(Runnable::run);
                    if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
                        // the engine has more tasks for us
                        nextTasks = obtainTasks();
                    } else {
                        // DOING_TASKS remains set only if a request was
                        // recorded while we were running: go around again
                        int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS);
                        if ((s & DOING_TASKS) != 0) {
                            if (debug.on()) debug.log("re-running tasks (B)");
                            nextTasks = obtainTasks();
                            continue;
                        }
                        break;
                    }
                } while (true);
                if (debug.on()) debug.log("finished task execution");
                resumeActivity();
            } catch (Throwable t) {
                handleError(t);
            }
        });
    }
1079 
1080     // FIXME: acknowledge a received CLOSE request from peer
1081     EngineResult doClosure(EngineResult r) throws IOException {
1082         if (debug.on())
1083             debug.log("doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]",
1084                       r.result, engine.getHandshakeStatus(),
1085                       engine.isOutboundDone(), engine.isInboundDone());
1086         if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
1087             // we have received TLS close_notify and need to send
1088             // an acknowledgement back. We're calling doHandshake
1089             // to finish the close handshake.
1090             if (engine.isInboundDone() && !engine.isOutboundDone()) {
1091                 if (debug.on()) debug.log("doClosure: close_notify received");
1092                 close_notify_received = true;
1093                 if (!writer.scheduler.isStopped()) {
1094                     doHandshake(r, READER);
1095                 } else {
1096                     // We have received closed notify, but we
1097                     // won't be able to send the acknowledgement.
1098                     // Nothing more will come from the socket either,
1099                     // so mark the reader as completed.
1100                     synchronized (reader.readBufferLock) {
1101                         reader.completing = true;
1102                     }
1103                 }
1104             }
1105         }
1106         return r;
1107     }
1108 
    /**
     * Returns the upstream Flow.Subscriber of the reading (incoming) side.
     * This flow must be given the encrypted data read from upstream (eg socket)
     * before it is decrypted.
     *
     * @return the reader
     */
    public Flow.Subscriber<List<ByteBuffer>> upstreamReader() {
        return reader;
    }
1117 
    /**
     * Returns the upstream Flow.Subscriber of the writing (outgoing) side.
     * This flow contains the plaintext data before it is encrypted.
     *
     * @return the writer
     */
    public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() {
        return writer;
    }
1125 
    /**
     * Delegates to the reader's signalScheduling().
     *
     * @return whatever reader.signalScheduling() returns
     */
    public boolean resumeReader() {
        return reader.signalScheduling();
    }
1129 
    /** Delegates to the reader's resetDownstreamDemand(). */
    public void resetReaderDemand() {
        reader.resetDownstreamDemand();
    }
1133 
1134     static class EngineResult {
1135         final SSLEngineResult result;
1136         final ByteBuffer destBuffer;
1137 
1138         // normal result
1139         EngineResult(SSLEngineResult result) {
1140             this(result, null);
1141         }
1142 
1143         EngineResult(SSLEngineResult result, ByteBuffer destBuffer) {
1144             this.result = result;
1145             this.destBuffer = destBuffer;
1146         }
1147 
1148         boolean handshaking() {
1149             HandshakeStatus s = result.getHandshakeStatus();
1150             return s != HandshakeStatus.FINISHED
1151                    && s != HandshakeStatus.NOT_HANDSHAKING
1152                    && result.getStatus() != Status.CLOSED;
1153         }
1154 
1155         boolean needUnwrap() {
1156             HandshakeStatus s = result.getHandshakeStatus();
1157             return s == HandshakeStatus.NEED_UNWRAP;
1158         }
1159 
1160 
1161         int bytesConsumed() {
1162             return result.bytesConsumed();
1163         }
1164 
1165         int bytesProduced() {
1166             return result.bytesProduced();
1167         }
1168 
1169         SSLEngineResult.HandshakeStatus handshakeStatus() {
1170             return result.getHandshakeStatus();
1171         }
1172 
1173         SSLEngineResult.Status status() {
1174             return result.getStatus();
1175         }
1176     }
1177 
1178     // The maximum network buffer size negotiated during
1179     // the handshake. Usually 16K.
1180     volatile int packetBufferSize;
1181     final ByteBuffer getNetBuffer() {
1182         int netSize = packetBufferSize;
1183         if (netSize <= 0) {
1184             packetBufferSize = netSize = engine.getSession().getPacketBufferSize();
1185         }
1186         return ByteBuffer.allocate(netSize);
1187     }
1188 
1189     // The maximum application buffer size negotiated during
1190     // the handshake. Usually close to 16K.
1191     volatile int applicationBufferSize;
1192     // Despite of the maximum applicationBufferSize negotiated
1193     // above, TLS records usually have a much smaller payload.
1194     // The adaptativeAppBufferSize records the max payload
1195     // ever decoded, and we use that as a guess for how big
1196     // a buffer we will need for the next payload.
1197     // This avoids allocating and zeroing a 16K buffer for
1198     // nothing...
1199     volatile int adaptiveAppBufferSize;
1200     final ByteBuffer getAppBuffer() {
1201         int appSize = applicationBufferSize;
1202         if (appSize <= 0) {
1203             applicationBufferSize = appSize
1204                     = engine.getSession().getApplicationBufferSize();
1205         }
1206         int size = adaptiveAppBufferSize;
1207         if (size <= 0) {
1208             size = 512; // start with 512 this is usually enough for handshaking / headers
1209         } else if (size > appSize) {
1210             size = appSize;
1211         }
1212         // will cause a BUFFER_OVERFLOW if not big enough, but
1213         // that's OK.
1214         return ByteBuffer.allocate(size);
1215     }
1216 
    /** Returns the label identifying this delegate, built from the tube name. */
    final String dbgString() {
        return "SSLFlowDelegate(" + tubeName + ")";
    }
1220 }