1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http.internal.common;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.nio.ByteBuffer;
  31 import java.util.concurrent.Executor;
  32 import java.util.concurrent.Flow;
  33 import java.util.concurrent.Flow.Subscriber;
  34 import java.util.List;
  35 import java.util.ArrayList;
  36 import java.util.Collections;
  37 import java.util.Iterator;
  38 import java.util.LinkedList;
  39 import java.util.concurrent.CompletableFuture;
  40 import java.util.concurrent.ConcurrentLinkedQueue;
  41 import java.util.concurrent.atomic.AtomicInteger;
  42 import javax.net.ssl.SSLEngine;
  43 import javax.net.ssl.SSLEngineResult;
  44 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
  45 import javax.net.ssl.SSLEngineResult.Status;
  46 import javax.net.ssl.SSLException;
  47 import jdk.incubator.http.internal.common.SubscriberWrapper.SchedulingAction;
  48 
  49 /**
  50  * Implements SSL using two SubscriberWrappers.
  51  *
  52  * <p> Constructor takes two Flow.Subscribers: one that receives the network
  53  * data (after it has been encrypted by SSLFlowDelegate) data, and one that
  54  * receives the application data (before it has been encrypted by SSLFlowDelegate).
  55  *
  56  * <p> Methods upstreamReader() and upstreamWriter() return the corresponding
  57  * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data.
  58  * See diagram below.
  59  *
  60  * <p> How Flow.Subscribers are used in this class, and where they come from:
  61  * <pre>
  62  * {@code
  63  *
  64  *
  65  *
  66  * --------->  data flow direction
  67  *
  68  *
  69  *                         +------------------+
  70  *        upstreamWriter   |                  | downWriter
  71  *        ---------------> |                  | ------------>
  72  *  obtained from this     |                  | supplied to constructor
  73  *                         | SSLFlowDelegate  |
  74  *        downReader       |                  | upstreamReader
  75  *        <--------------- |                  | <--------------
  76  * supplied to constructor |                  | obtained from this
  77  *                         +------------------+
  78  * }
  79  * </pre>
  80  */
  81 public class SSLFlowDelegate {
  82 
  83     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
  84     final System.Logger debug =
  85             Utils.getDebugLogger(this::dbgString, DEBUG);
  86 
  87     final Executor exec;
  88     final Reader reader;
  89     final Writer writer;
  90     final SSLEngine engine;
  91     final String tubeName; // hack
  92     final CompletableFuture<Void> cf;
  93     final CompletableFuture<String> alpnCF; // completes on initial handshake
  94     final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
  95     volatile boolean close_notify_received;
  96 
  97     /**
  98      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
  99      * Flow.Subscriber requires an associated {@link CompletableFuture}
 100      * for errors that need to be signaled from downstream to upstream.
 101      */
 102     public SSLFlowDelegate(SSLEngine engine,
 103                            Executor exec,
 104                            Subscriber<? super List<ByteBuffer>> downReader,
 105                            Subscriber<? super List<ByteBuffer>> downWriter)
 106     {
 107         this.tubeName = String.valueOf(downWriter);
 108         this.reader = new Reader();
 109         this.writer = new Writer();
 110         this.engine = engine;
 111         this.exec = exec;
 112         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
 113         this.cf = CompletableFuture.allOf(reader.completion(), writer.completion())
 114                                    .thenRun(this::normalStop);
 115         this.alpnCF = new MinimalFuture<>();
 116 
 117         // connect the Reader to the downReader and the
 118         // Writer to the downWriter.
 119         connect(downReader, downWriter);
 120 
 121         //Monitor.add(this::monitor);
 122     }
 123 
 124     /**
 125      * Returns true if the SSLFlowDelegate has detected a TLS
 126      * close_notify from the server.
 127      * @return true, if a close_notify was detected.
 128      */
 129     public boolean closeNotifyReceived() {
 130         return close_notify_received;
 131     }
 132 
 133     /**
 134      * Connects the read sink (downReader) to the SSLFlowDelegate Reader,
 135      * and the write sink (downWriter) to the SSLFlowDelegate Writer.
 136      * Called from within the constructor. Overwritten by SSLTube.
 137      *
 138      * @param downReader  The left hand side read sink (typically, the
 139      *                    HttpConnection read subscriber).
 140      * @param downWriter  The right hand side write sink (typically
 141      *                    the SocketTube write subscriber).
 142      */
 143     void connect(Subscriber<? super List<ByteBuffer>> downReader,
 144                  Subscriber<? super List<ByteBuffer>> downWriter) {
 145         this.reader.subscribe(downReader);
 146         this.writer.subscribe(downWriter);
 147     }
 148 
 149    /**
 150     * Returns a CompletableFuture<String> which completes after
 151     * the initial handshake completes, and which contains the negotiated
 152     * alpn.
 153     */
 154     public CompletableFuture<String> alpn() {
 155         return alpnCF;
 156     }
 157 
 158     private void setALPN() {
 159         // Handshake is finished. So, can retrieve the ALPN now
 160         if (alpnCF.isDone())
 161             return;
 162         String alpn = engine.getApplicationProtocol();
 163         debug.log(Level.DEBUG, "setALPN = %s", alpn);
 164         alpnCF.complete(alpn);
 165     }
 166 
 167     public String monitor() {
 168         StringBuilder sb = new StringBuilder();
 169         sb.append("SSL: HS state: " + states(handshakeState));
 170         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
 171         sb.append(" LL : ");
 172         synchronized(stateList) {
 173             for (String s: stateList) {
 174                 sb.append(s).append(" ");
 175             }
 176         }
 177         sb.append("\r\n");
 178         sb.append("Reader:: ").append(reader.toString());
 179         sb.append("\r\n");
 180         sb.append("Writer:: ").append(writer.toString());
 181         sb.append("\r\n===================================");
 182         return sb.toString();
 183     }
 184 
 185     protected SchedulingAction enterReadScheduling() {
 186         return SchedulingAction.CONTINUE;
 187     }
 188 
 189 
 190     /**
 191      * Processing function for incoming data. Pass it thru SSLEngine.unwrap().
 192      * Any decrypted buffers returned to be passed downstream.
 193      * Status codes:
 194      *     NEED_UNWRAP: do nothing. Following incoming data will contain
 195      *                  any required handshake data
 196      *     NEED_WRAP: call writer.addData() with empty buffer
 197      *     NEED_TASK: delegate task to executor
 198      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap
 199      *     BUFFER_UNDERFLOW: keep buffer and wait for more data
 200      *     OK: return generated buffers.
 201      *
 202      * Upstream subscription strategy is to try and keep no more than
 203      * TARGET_BUFSIZE bytes in readBuf
 204      */
 205     class Reader extends SubscriberWrapper {
 206         final SequentialScheduler scheduler;
 207         static final int TARGET_BUFSIZE = 16 * 1024;
 208         volatile ByteBuffer readBuf;
 209         volatile boolean completing = false;
 210         final Object readBufferLock = new Object();
 211         final System.Logger debugr =
 212             Utils.getDebugLogger(this::dbgString, DEBUG);
 213 
 214         class ReaderDownstreamPusher implements Runnable {
 215             @Override public void run() { processData(); }
 216         }
 217 
 218         Reader() {
 219             super();
 220             scheduler = SequentialScheduler.synchronizedScheduler(
 221                                                 new ReaderDownstreamPusher());
 222             this.readBuf = ByteBuffer.allocate(1024);
 223             readBuf.limit(0); // keep in read mode
 224         }
 225 
 226         protected SchedulingAction enterScheduling() {
 227             return enterReadScheduling();
 228         }
 229 
 230         public final String dbgString() {
 231             return "SSL Reader(" + tubeName + ")";
 232         }
 233 
 234         /**
 235          * entry point for buffers delivered from upstream Subscriber
 236          */
 237         @Override
 238         public void incoming(List<ByteBuffer> buffers, boolean complete) {
 239             debugr.log(Level.DEBUG, () -> "Adding " + Utils.remaining(buffers)
 240                         + " bytes to read buffer");
 241             addToReadBuf(buffers);
 242             if (complete) {
 243                 this.completing = true;
 244             }
 245             scheduler.runOrSchedule();
 246         }
 247 
 248         @Override
 249         public String toString() {
 250             return "READER: " + super.toString() + " readBuf: " + readBuf.toString()
 251                     + " count: " + count.toString();
 252         }
 253 
 254         private void reallocReadBuf() {
 255             int sz = readBuf.capacity();
 256             ByteBuffer newb = ByteBuffer.allocate(sz*2);
 257             readBuf.flip();
 258             Utils.copy(readBuf, newb);
 259             readBuf = newb;
 260         }
 261 
 262         @Override
 263         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
 264             if (readBuf.remaining() > TARGET_BUFSIZE) {
 265                 return 0;
 266             } else {
 267                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
 268             }
 269         }
 270 
 271         // readBuf is kept ready for reading outside of this method
 272         private void addToReadBuf(List<ByteBuffer> buffers) {
 273             synchronized (readBufferLock) {
 274                 for (ByteBuffer buf : buffers) {
 275                     readBuf.compact();
 276                     while (readBuf.remaining() < buf.remaining())
 277                         reallocReadBuf();
 278                     readBuf.put(buf);
 279                     readBuf.flip();
 280                 }
 281             }
 282         }
 283 
 284         void schedule() {
 285             scheduler.runOrSchedule();
 286         }
 287 
 288         void stop() {
 289             debugr.log(Level.DEBUG, "stop");
 290             scheduler.stop();
 291         }
 292 
 293         AtomicInteger count = new AtomicInteger(0);
 294 
 295         // work function where it all happens
 296         void processData() {
 297             try {
 298                 debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining()
 299                            + " bytes to unwrap "
 300                            + states(handshakeState));
 301 
 302                 while (readBuf.hasRemaining()) {
 303                     boolean handshaking = false;
 304                     try {
 305                         EngineResult result;
 306                         synchronized (readBufferLock) {
 307                             result = unwrapBuffer(readBuf);
 308                             debugr.log(Level.DEBUG, "Unwrapped: %s", result.result);
 309                         }
 310                         if (result.status() == Status.BUFFER_UNDERFLOW) {
 311                             debugr.log(Level.DEBUG, "BUFFER_UNDERFLOW");
 312                             return;
 313                         }
 314                         if (completing && result.status() == Status.CLOSED) {
 315                             debugr.log(Level.DEBUG, "Closed: completing");
 316                             outgoing(Utils.EMPTY_BB_LIST, true);
 317                             return;
 318                         }
 319                         if (result.handshaking() && !completing) {
 320                             debugr.log(Level.DEBUG, "handshaking");
 321                             doHandshake(result, READER);
 322                             resumeActivity();
 323                             handshaking = true;
 324                         } else {
 325                             if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
 326                                 setALPN();
 327                                 handshaking = false;
 328                                 resumeActivity();
 329                             }
 330                         }
 331                         if (result.bytesProduced() > 0) {
 332                             debugr.log(Level.DEBUG, "sending %d", result.bytesProduced());
 333                             count.addAndGet(result.bytesProduced());
 334                             outgoing(result.destBuffer, false);
 335                         }
 336                     } catch (IOException ex) {
 337                         errorCommon(ex);
 338                         handleError(ex);
 339                     }
 340                     if (handshaking && !completing)
 341                         return;
 342                 }
 343                 if (completing) {
 344                     debugr.log(Level.DEBUG, "completing");
 345                     // Complete the alpnCF, if not already complete, regardless of
 346                     // whether or not the ALPN is available, there will be no more
 347                     // activity.
 348                     setALPN();
 349                     outgoing(Utils.EMPTY_BB_LIST, true);
 350                 }
 351             } catch (Throwable ex) {
 352                 errorCommon(ex);
 353                 handleError(ex);
 354             }
 355         }
 356     }
 357 
 358     /**
 359      * Returns a CompletableFuture which completes after all activity
 360      * in the delegate is terminated (whether normally or exceptionally).
 361      *
 362      * @return
 363      */
 364     public CompletableFuture<Void> completion() {
 365         return cf;
 366     }
 367 
 368     public interface Monitorable {
 369         public String getInfo();
 370     }
 371 
 372     public static class Monitor extends Thread {
 373         final List<Monitorable> list;
 374         static Monitor themon;
 375 
 376         static {
 377             themon = new Monitor();
 378             themon.start(); // uncomment to enable Monitor
 379         }
 380 
 381         Monitor() {
 382             super("Monitor");
 383             setDaemon(true);
 384             list = Collections.synchronizedList(new LinkedList<>());
 385         }
 386 
 387         void addTarget(Monitorable o) {
 388             list.add(o);
 389         }
 390 
 391         public static void add(Monitorable o) {
 392             themon.addTarget(o);
 393         }
 394 
 395         @Override
 396         public void run() {
 397             System.out.println("Monitor starting");
 398             while (true) {
 399                 try {Thread.sleep(20*1000); } catch (Exception e) {}
 400                 synchronized (list) {
 401                     for (Monitorable o : list) {
 402                         System.out.println(o.getInfo());
 403                         System.out.println("-------------------------");
 404                     }
 405                 }
 406                 System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-");
 407 
 408             }
 409         }
 410     }
 411 
 412     /**
 413      * Processing function for outgoing data. Pass it thru SSLEngine.wrap()
 414      * Any encrypted buffers generated are passed downstream to be written.
 415      * Status codes:
 416      *     NEED_UNWRAP: call reader.addData() with empty buffer
 417      *     NEED_WRAP: call addData() with empty buffer
 418      *     NEED_TASK: delegate task to executor
 419      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap
 420      *     BUFFER_UNDERFLOW: shouldn't happen on writing side
 421      *     OK: return generated buffers
 422      */
 423     class Writer extends SubscriberWrapper {
 424         final SequentialScheduler scheduler;
 425         // queues of buffers received from upstream waiting
 426         // to be processed by the SSLEngine
 427         final List<ByteBuffer> writeList;
 428         final System.Logger debugw =
 429             Utils.getDebugLogger(this::dbgString, DEBUG);
 430 
 431         class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
 432             @Override public void run() { processData(); }
 433         }
 434 
 435         Writer() {
 436             super();
 437             writeList = Collections.synchronizedList(new LinkedList<>());
 438             scheduler = new SequentialScheduler(new WriterDownstreamPusher());
 439         }
 440 
 441         @Override
 442         protected void incoming(List<ByteBuffer> buffers, boolean complete) {
 443             assert complete ? buffers ==  Utils.EMPTY_BB_LIST : true;
 444             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
 445             if (complete) {
 446                 writeList.add(SENTINEL);
 447             } else {
 448                 writeList.addAll(buffers);
 449             }
 450             debugw.log(Level.DEBUG, () -> "added " + buffers.size()
 451                         + " (" + Utils.remaining(buffers)
 452                         + " bytes) to the writeList");
 453             scheduler.runOrSchedule();
 454         }
 455 
 456         public final String dbgString() {
 457             return "SSL Writer(" + tubeName + ")";
 458         }
 459 
 460         protected void onSubscribe() {
 461             doHandshake(EngineResult.INIT, INIT);
 462             resumeActivity();
 463         }
 464 
 465         void schedule() {
 466             scheduler.runOrSchedule();
 467         }
 468 
 469         void stop() {
 470             debugw.log(Level.DEBUG, "stop");
 471             scheduler.stop();
 472         }
 473 
 474         @Override
 475         public boolean closing() {
 476             return closeNotifyReceived();
 477         }
 478 
 479         private boolean isCompleting() {
 480             synchronized(writeList) {
 481                 int lastIndex = writeList.size() - 1;
 482                 if (lastIndex < 0)
 483                     return false;
 484                 return writeList.get(lastIndex) == SENTINEL;
 485             }
 486         }
 487 
 488         @Override
 489         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
 490             if (writeList.size() > 10)
 491                 return 0;
 492             else
 493                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
 494         }
 495 
 496         private boolean hsTriggered() {
 497             synchronized(writeList) {
 498                 for (ByteBuffer b : writeList)
 499                     if (b == HS_TRIGGER)
 500                         return true;
 501                 return false;
 502             }
 503         }
 504 
 505         private void processData() {
 506             boolean completing = isCompleting();
 507 
 508             try {
 509                 debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")");
 510                 while (Utils.remaining(writeList) > 0 || hsTriggered()) {
 511                     ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
 512                     EngineResult result = wrapBuffers(outbufs);
 513                     debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result);
 514 
 515                     if (result.status() == Status.CLOSED) {
 516                         if (result.bytesProduced() <= 0)
 517                             return;
 518 
 519                         completing = true;
 520                         // There could still be some outgoing data in outbufs.
 521                         writeList.add(SENTINEL);
 522                     }
 523 
 524                     boolean handshaking = false;
 525                     if (result.handshaking()) {
 526                         debugw.log(Level.DEBUG, "handshaking");
 527                         doHandshake(result, WRITER);
 528                         handshaking = true;
 529                     } else {
 530                         if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
 531                             setALPN();
 532                             resumeActivity();
 533                         }
 534                     }
 535                     cleanList(writeList); // tidy up the source list
 536                     sendResultBytes(result);
 537                     if (handshaking && !completing) {
 538                         if (writeList.isEmpty() && !result.needUnwrap()) {
 539                             writer.addData(HS_TRIGGER);
 540                         }
 541                         return;
 542                     }
 543                 }
 544                 if (completing && Utils.remaining(writeList) == 0) {
 545                     /*
 546                     System.out.println("WRITER DOO 3");
 547                     engine.closeOutbound();
 548                     EngineResult result = wrapBuffers(Utils.EMPTY_BB_ARRAY);
 549                     sendResultBytes(result);
 550                     */
 551                     outgoing(Utils.EMPTY_BB_LIST, true);
 552                     return;
 553                 }
 554             } catch (Throwable ex) {
 555                 handleError(ex);
 556             }
 557         }
 558 
 559         private void sendResultBytes(EngineResult result) {
 560             if (result.bytesProduced() > 0) {
 561                 debugw.log(Level.DEBUG, "Sending %d bytes downstream",
 562                            result.bytesProduced());
 563                 outgoing(result.destBuffer, false);
 564             }
 565         }
 566 
 567         @Override
 568         public String toString() {
 569             return "WRITER: " + super.toString() +
 570                     " writeList size " + Integer.toString(writeList.size());
 571                     //" writeList: " + writeList.toString();
 572         }
 573     }
 574 
 575     private void handleError(Throwable t) {
 576         debug.log(Level.DEBUG, "handleError", t);
 577         cf.completeExceptionally(t);
 578         // no-op if already completed
 579         alpnCF.completeExceptionally(t);
 580         reader.stop();
 581         writer.stop();
 582     }
 583 
 584     private void normalStop() {
 585         reader.stop();
 586         writer.stop();
 587     }
 588 
 589     private void cleanList(List<ByteBuffer> l) {
 590         synchronized (l) {
 591             Iterator<ByteBuffer> iter = l.iterator();
 592             while (iter.hasNext()) {
 593                 ByteBuffer b = iter.next();
 594                 if (!b.hasRemaining()) {
 595                     iter.remove();
 596                 }
 597             }
 598         }
 599     }
 600 
 601     /**
 602      * States for handshake. We avoid races when accessing/updating the AtomicInt
 603      * because updates always schedule an additional call to both the read()
 604      * and write() functions.
 605      */
 606     private static final int NOT_HANDSHAKING = 0;
 607     private static final int HANDSHAKING = 1;
 608     private static final int INIT = 2;
 609     private static final int DOING_TASKS = 4; // bit added to above state
 610     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
 611 
 612     private static final int READER = 1;
 613     private static final int WRITER = 2;
 614 
 615     private static String states(AtomicInteger state) {
 616         int s = state.get();
 617         StringBuilder sb = new StringBuilder();
 618         int x = s & ~DOING_TASKS;
 619         switch (x) {
 620             case NOT_HANDSHAKING:
 621                 sb.append(" NOT_HANDSHAKING ");
 622                 break;
 623             case HANDSHAKING:
 624                 sb.append(" HANDSHAKING ");
 625                 break;
 626             case INIT:
 627                 sb.append(" INIT ");
 628                 break;
 629             default:
 630                 throw new InternalError();
 631         }
 632         if ((s & DOING_TASKS) > 0)
 633             sb.append("|DOING_TASKS");
 634         return sb.toString();
 635     }
 636 
 637     private void resumeActivity() {
 638         reader.schedule();
 639         writer.schedule();
 640     }
 641 
 642     final AtomicInteger handshakeState;
 643     final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>();
 644 
 645     private void doHandshake(EngineResult r, int caller) {
 646         int s = handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
 647         stateList.add(r.handshakeStatus().toString());
 648         stateList.add(Integer.toString(caller));
 649         switch (r.handshakeStatus()) {
 650             case NEED_TASK:
 651                 if ((s & DOING_TASKS) > 0) // someone else was doing tasks
 652                     return;
 653                 List<Runnable> tasks = obtainTasks();
 654                 executeTasks(tasks);
 655                 break;
 656             case NEED_WRAP:
 657                 writer.addData(HS_TRIGGER);
 658                 break;
 659             case NEED_UNWRAP:
 660             case NEED_UNWRAP_AGAIN:
 661                 // do nothing else
 662                 break;
 663             default:
 664                 throw new InternalError("Unexpected handshake status:"
 665                                         + r.handshakeStatus());
 666         }
 667     }
 668 
 669     private List<Runnable> obtainTasks() {
 670         List<Runnable> l = new ArrayList<>();
 671         Runnable r;
 672         while ((r = engine.getDelegatedTask()) != null) {
 673             l.add(r);
 674         }
 675         return l;
 676     }
 677 
 678     private void executeTasks(List<Runnable> tasks) {
 679         exec.execute(() -> {
 680             handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
 681             try {
 682                 tasks.forEach((r) -> {
 683                     r.run();
 684                 });
 685             } catch (Throwable t) {
 686                 handleError(t);
 687             }
 688             handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
 689             writer.addData(HS_TRIGGER);
 690             resumeActivity();
 691         });
 692     }
 693 
 694 
 695     EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
 696         ByteBuffer dst = getAppBuffer();
 697         while (true) {
 698             SSLEngineResult sslResult = engine.unwrap(src, dst);
 699             switch (sslResult.getStatus()) {
 700                 case BUFFER_OVERFLOW:
 701                     // may happen only if app size buffer was changed.
 702                     // get it again if app buffer size changed
 703                     int appSize = engine.getSession().getApplicationBufferSize();
 704                     ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
 705                     dst.flip();
 706                     b.put(dst);
 707                     dst = b;
 708                     break;
 709                 case CLOSED:
 710                     return doClosure(new EngineResult(sslResult));
 711                 case BUFFER_UNDERFLOW:
 712                     // handled implicitly by compaction/reallocation of readBuf
 713                     return new EngineResult(sslResult);
 714                 case OK:
 715                      dst.flip();
 716                      return new EngineResult(sslResult, dst);
 717             }
 718         }
 719     }
 720 
 721     // FIXME: acknowledge a received CLOSE request from peer
 722     EngineResult doClosure(EngineResult r) throws IOException {
 723         debug.log(Level.DEBUG,
 724                 "doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]",
 725                 r.result, engine.getHandshakeStatus(),
 726                 engine.isOutboundDone(), engine.isInboundDone());
 727         if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
 728             // we have received TLS close_notify and need to send
 729             // an acknowledgement back. We're calling doHandshake
 730             // to finish the close handshake.
 731             if (engine.isInboundDone() && !engine.isOutboundDone()) {
 732                 debug.log(Level.DEBUG, "doClosure: close_notify received");
 733                 close_notify_received = true;
 734                 doHandshake(r, READER);
 735             }
 736         }
 737         return r;
 738     }
 739 
 740     /**
 741      * Returns the upstream Flow.Subscriber of the reading (incoming) side.
 742      * This flow must be given the encrypted data read from upstream (eg socket)
 743      * before it is decrypted.
 744      */
 745     public Flow.Subscriber<List<ByteBuffer>> upstreamReader() {
 746         return reader;
 747     }
 748 
 749     /**
 750      * Returns the upstream Flow.Subscriber of the writing (outgoing) side.
 751      * This flow contains the plaintext data before it is encrypted.
 752      */
 753     public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() {
 754         return writer;
 755     }
 756 
 757     public boolean resumeReader() {
 758         return reader.signalScheduling();
 759     }
 760 
 761     public void resetReaderDemand() {
 762         reader.resetDownstreamDemand();
 763     }
 764 
 765     static class EngineResult {
 766         final SSLEngineResult result;
 767         final ByteBuffer destBuffer;
 768 
 769         // normal result
 770         EngineResult(SSLEngineResult result) {
 771             this(result, null);
 772         }
 773 
 774         EngineResult(SSLEngineResult result, ByteBuffer destBuffer) {
 775             this.result = result;
 776             this.destBuffer = destBuffer;
 777         }
 778 
 779         // Special result used to trigger handshaking in constructor
 780         static EngineResult INIT =
 781             new EngineResult(
 782                 new SSLEngineResult(SSLEngineResult.Status.OK, HandshakeStatus.NEED_WRAP, 0, 0));
 783 
 784         boolean handshaking() {
 785             HandshakeStatus s = result.getHandshakeStatus();
 786             return s != HandshakeStatus.FINISHED
 787                    && s != HandshakeStatus.NOT_HANDSHAKING
 788                    && result.getStatus() != Status.CLOSED;
 789         }
 790 
 791         boolean needUnwrap() {
 792             HandshakeStatus s = result.getHandshakeStatus();
 793             return s == HandshakeStatus.NEED_UNWRAP;
 794         }
 795 
 796 
 797         int bytesConsumed() {
 798             return result.bytesConsumed();
 799         }
 800 
 801         int bytesProduced() {
 802             return result.bytesProduced();
 803         }
 804 
 805         SSLEngineResult.HandshakeStatus handshakeStatus() {
 806             return result.getHandshakeStatus();
 807         }
 808 
 809         SSLEngineResult.Status status() {
 810             return result.getStatus();
 811         }
 812     }
 813 
 814     public ByteBuffer getNetBuffer() {
 815         return ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
 816     }
 817 
 818     private ByteBuffer getAppBuffer() {
 819         return ByteBuffer.allocate(engine.getSession().getApplicationBufferSize());
 820     }
 821 
 822     final String dbgString() {
 823         return "SSLFlowDelegate(" + tubeName + ")";
 824     }
 825 
 826     @SuppressWarnings("fallthrough")
 827     EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
 828         debug.log(Level.DEBUG, () -> "wrapping "
 829                     + Utils.remaining(src) + " bytes");
 830         ByteBuffer dst = getNetBuffer();
 831         while (true) {
 832             SSLEngineResult sslResult = engine.wrap(src, dst);
 833             debug.log(Level.DEBUG, () -> "SSLResult: " + sslResult);
 834             switch (sslResult.getStatus()) {
 835                 case BUFFER_OVERFLOW:
 836                     // Shouldn't happen. We allocated buffer with packet size
 837                     // get it again if net buffer size was changed
 838                     debug.log(Level.DEBUG, "BUFFER_OVERFLOW");
 839                     int appSize = engine.getSession().getApplicationBufferSize();
 840                     ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
 841                     dst.flip();
 842                     b.put(dst);
 843                     dst = b;
 844                     break; // try again
 845                 case CLOSED:
 846                     debug.log(Level.DEBUG, "CLOSED");
 847                     // fallthrough. There could be some remaining data in dst.
 848                     // CLOSED will be handled by the caller.
 849                 case OK:
 850                     dst.flip();
 851                     final ByteBuffer dest = dst;
 852                     debug.log(Level.DEBUG, () -> "OK => produced: "
 853                                            + dest.remaining()
 854                                            + " not wrapped: "
 855                                            + Utils.remaining(src));
 856                     return new EngineResult(sslResult, dest);
 857                 case BUFFER_UNDERFLOW:
 858                     // Shouldn't happen.  Doesn't returns when wrap()
 859                     // underflow handled externally
 860                     // assert false : "Buffer Underflow";
 861                     debug.log(Level.DEBUG, "BUFFER_UNDERFLOW");
 862                     return new EngineResult(sslResult);
 863                 default:
 864                     debug.log(Level.DEBUG, "ASSERT");
 865                     assert false;
 866             }
 867         }
 868     }
 869 }