1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http.internal.common;
  27 
  28 import java.lang.System.Logger.Level;
  29 import java.nio.ByteBuffer;
  30 import java.util.List;
  31 import java.util.Objects;
  32 import java.util.concurrent.CompletableFuture;
  33 import java.util.concurrent.Executor;
  34 import java.util.concurrent.Flow;
  35 import java.util.concurrent.atomic.AtomicReference;
  36 import java.util.function.Consumer;
  37 import javax.net.ssl.SSLEngine;
  38 import javax.net.ssl.SSLHandshakeException;
  39 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
  40 import jdk.incubator.http.internal.common.SubscriberWrapper.SchedulingAction;
  41 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
  42 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
  43 
  44 /**
  45  * An implementation of FlowTube that wraps another FlowTube in an
  46  * SSL flow.
  47  * <p>
  48  * The following diagram shows a typical usage of the SSLTube, where
  49  * the SSLTube wraps a SocketTube on the right hand side, and is connected
  50  * to an HttpConnection on the left hand side.
  51  *
 * <pre>{@code
 *                  +----------  SSLTube -------------------------+
 *                  |                                             |
 *                  |                    +---SSLFlowDelegate---+  |
 *  HttpConnection  |                    |                     |  |   SocketTube
 *    read sink  <- SSLSubscriberW.   <- Reader <- upstreamR.() <---- read source
 *  (a subscriber)  |                    |    \         /      |  |  (a publisher)
 *                  |                    |     SSLEngine       |  |
 *  HttpConnection  |                    |    /         \      |  |   SocketTube
 *  write source -> SSLSubscriptionW. -> upstreamW.() -> Writer ----> write sink
 *  (a publisher)   |                    |                     |  |  (a subscriber)
 *                  |                    +---------------------+  |
 *                  |                                             |
 *                  +---------------------------------------------+
 * }</pre>
  67  */
  68 public class SSLTube implements FlowTube {
  69 
  70     static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag.
  71     final System.Logger debug =
  72             Utils.getDebugLogger(this::dbgString, DEBUG);
  73 
  74     private final FlowTube tube;
  75     private final SSLSubscriberWrapper readSubscriber;
  76     private final SSLSubscriptionWrapper writeSubscription;
  77     private final SSLFlowDelegate sslDelegate;
  78     private final SSLEngine engine;
  79     private volatile boolean finished;
  80 
  81     public SSLTube(SSLEngine engine, Executor executor, FlowTube tube) {
  82         Objects.requireNonNull(engine);
  83         Objects.requireNonNull(executor);
  84         this.tube = Objects.requireNonNull(tube);
  85         writeSubscription = new SSLSubscriptionWrapper();
  86         readSubscriber = new SSLSubscriberWrapper();
  87         this.engine = engine;
  88         sslDelegate = new SSLTubeFlowDelegate(engine,
  89                                               executor,
  90                                               readSubscriber,
  91                                               tube);
  92     }
  93 
  94     final class SSLTubeFlowDelegate extends SSLFlowDelegate {
  95         SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
  96                             SSLSubscriberWrapper readSubscriber,
  97                             FlowTube tube) {
  98             super(engine, executor, readSubscriber, tube);
  99         }
 100         protected SchedulingAction enterReadScheduling() {
 101             readSubscriber.processPendingSubscriber();
 102             return SchedulingAction.CONTINUE;
 103         }
 104         void connect(Flow.Subscriber<? super List<ByteBuffer>> downReader,
 105                      Flow.Subscriber<? super List<ByteBuffer>> downWriter) {
 106             assert downWriter == tube;
 107             assert downReader == readSubscriber;
 108 
 109             // Connect the read sink first. That's the left-hand side
 110             // downstream subscriber from the HttpConnection (or more
 111             // accurately, the SSLSubscriberWrapper that will wrap it
 112             // when SSLTube::connectFlows is called.
 113             reader.subscribe(downReader);
 114 
 115             // Connect the right hand side tube (the socket tube).
 116             //
 117             // The SSLFlowDelegate.writer publishes ByteBuffer to
 118             // the SocketTube for writing on the socket, and the
 119             // SSLFlowDelegate::upstreamReader subscribes to the
 120             // SocketTube to receive ByteBuffers read from the socket.
 121             //
 122             // Basically this method is equivalent to:
 123             //     // connect the read source:
 124             //     //   subscribe the SSLFlowDelegate upstream reader
 125             //     //   to the socket tube publisher.
 126             //     tube.subscribe(upstreamReader());
 127             //     // connect the write sink:
 128             //     //   subscribe the socket tube write subscriber
 129             //     //   with the SSLFlowDelegate downstream writer.
 130             //     writer.subscribe(tube);
 131             tube.connectFlows(FlowTube.asTubePublisher(writer),
 132                               FlowTube.asTubeSubscriber(upstreamReader()));
 133 
 134             // Finally connect the write source. That's the left
 135             // hand side publisher which will push ByteBuffer for
 136             // writing and encryption to the SSLFlowDelegate.
 137             // The writeSubscription is in fact the SSLSubscriptionWrapper
 138             // that will wrap the subscription provided by the
 139             // HttpConnection publisher when SSLTube::connectFlows
 140             // is called.
 141             upstreamWriter().onSubscribe(writeSubscription);
 142         }
 143     }
 144 
 145     public CompletableFuture<String> getALPN() {
 146         return sslDelegate.alpn();
 147     }
 148 
 149     @Override
 150     public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
 151         readSubscriber.dropSubscription();
 152         readSubscriber.setDelegate(s);
 153         s.onSubscribe(readSubscription);
 154     }
 155 
 156     /**
 157      * Tells whether, or not, this FlowTube has finished receiving data.
 158      *
 159      * @return true when one of this FlowTube Subscriber's OnError or onComplete
 160      * methods have been invoked
 161      */
 162     @Override
 163     public boolean isFinished() {
 164         return finished;
 165     }
 166 
 167     private volatile Flow.Subscription readSubscription;
 168 
    // The DelegateWrapper wraps a subscribed {@code Flow.Subscriber} and
    // tracks the subscriber's state. In particular it makes sure that
    // onComplete/onError are not called before onSubscribe.
 172     final static class DelegateWrapper implements FlowTube.TubeSubscriber {
 173         private final FlowTube.TubeSubscriber delegate;
 174         private final System.Logger debug;
 175         volatile boolean subscribedCalled;
 176         volatile boolean subscribedDone;
 177         volatile boolean completed;
 178         volatile Throwable error;
 179         DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate,
 180                         System.Logger debug) {
 181             this.delegate = FlowTube.asTubeSubscriber(delegate);
 182             this.debug = debug;
 183         }
 184 
 185         @Override
 186         public void dropSubscription() {
 187             if (subscribedCalled && !completed) {
 188                 delegate.dropSubscription();
 189             }
 190         }
 191 
 192         @Override
 193         public void onNext(List<ByteBuffer> item) {
 194             assert subscribedCalled;
 195             delegate.onNext(item);
 196         }
 197 
 198         @Override
 199         public void onSubscribe(Flow.Subscription subscription) {
 200             onSubscribe(delegate::onSubscribe, subscription);
 201         }
 202 
 203         private void onSubscribe(Consumer<Flow.Subscription> method,
 204                                  Flow.Subscription subscription) {
 205             subscribedCalled = true;
 206             method.accept(subscription);
 207             Throwable x;
 208             boolean finished;
 209             synchronized (this) {
 210                 subscribedDone = true;
 211                 x = error;
 212                 finished = completed;
 213             }
 214             if (x != null) {
 215                 debug.log(Level.DEBUG,
 216                           "Subscriber completed before subscribe: forwarding %s",
 217                           (Object)x);
 218                 delegate.onError(x);
 219             } else if (finished) {
 220                 debug.log(Level.DEBUG,
 221                           "Subscriber completed before subscribe: calling onComplete()");
 222                 delegate.onComplete();
 223             }
 224         }
 225 
 226         @Override
 227         public void onError(Throwable t) {
 228             if (completed) {
 229                 debug.log(Level.DEBUG,
 230                           "Subscriber already completed: ignoring %s",
 231                           (Object)t);
 232                 return;
 233             }
 234             boolean subscribed;
 235             synchronized (this) {
 236                 if (completed) return;
 237                 error = t;
 238                 completed = true;
 239                 subscribed = subscribedDone;
 240             }
 241             if (subscribed) {
 242                 delegate.onError(t);
 243             } else {
 244                 debug.log(Level.DEBUG,
 245                           "Subscriber not yet subscribed: stored %s",
 246                           (Object)t);
 247             }
 248         }
 249 
 250         @Override
 251         public void onComplete() {
 252             if (completed) return;
 253             boolean subscribed;
 254             synchronized (this) {
 255                 if (completed) return;
 256                 completed = true;
 257                 subscribed = subscribedDone;
 258             }
 259             if (subscribed) {
 260                 delegate.onComplete();
 261             } else {
 262                 debug.log(Level.DEBUG,
 263                           "Subscriber not yet subscribed: stored completed=true");
 264             }
 265         }
 266 
 267         @Override
 268         public String toString() {
 269             return "DelegateWrapper:" + delegate.toString();
 270         }
 271 
 272     }
 273 
 274     // Used to read data from the SSLTube.
 275     final class SSLSubscriberWrapper implements FlowTube.TubeSubscriber {
 276         private AtomicReference<DelegateWrapper> pendingDelegate =
 277                 new AtomicReference<>();
 278         private volatile DelegateWrapper subscribed;
 279         private volatile boolean onCompleteReceived;
 280         private final AtomicReference<Throwable> errorRef
 281                 = new AtomicReference<>();
 282 
 283         // setDelegate can be called asynchronously when the SSLTube flow
 284         // is connected. At this time the permanent subscriber (this class)
 285         // may already be subscribed (readSubscription != null) or not.
 286         // 1. If it's already subscribed (readSubscription != null), we
 287         //    are going to signal the SSLFlowDelegate reader, and make sure
 288         //    onSubscribed is called within the reader flow
 289         // 2. If it's not yet subscribed (readSubscription == null), then
 290         //    we're going to wait for onSubscribe to be called.
 291         //
 292         void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
 293             debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
 294                       delegate);
 295             assert delegate != null;
 296             DelegateWrapper delegateWrapper = new DelegateWrapper(delegate, debug);
 297             DelegateWrapper previous;
 298             Flow.Subscription subscription;
 299             boolean handleNow;
 300             synchronized (this) {
 301                 previous = pendingDelegate.getAndSet(delegateWrapper);
 302                 subscription = readSubscription;
 303                 handleNow = this.errorRef.get() != null || finished;
 304             }
 305             if (previous != null) {
 306                 previous.dropSubscription();
 307             }
 308             if (subscription == null) {
 309                 debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) no subscription yet");
 310                 return;
 311             }
 312             if (handleNow || !sslDelegate.resumeReader()) {
 313                 processPendingSubscriber();
 314             }
 315         }
 316 
 317         // Can be called outside of the flow if an error has already been
 318         // raise. Otherwise, must be called within the SSLFlowDelegate
 319         // downstream reader flow.
 320         // If there is a subscription, and if there is a pending delegate,
 321         // calls dropSubscription() on the previous delegate (if any),
 322         // then subscribe the pending delegate.
 323         void processPendingSubscriber() {
 324             Flow.Subscription subscription;
 325             DelegateWrapper delegateWrapper, previous;
 326             synchronized (this) {
 327                 delegateWrapper = pendingDelegate.get();
 328                 if (delegateWrapper == null) return;
 329                 subscription = readSubscription;
 330                 previous = subscribed;
 331             }
 332             if (subscription == null) {
 333                 debug.log(Level.DEBUG,
 334                          "SSLSubscriberWrapper (reader) %s",
 335                          "processPendingSubscriber: no subscription yet");
 336                 return;
 337             }
 338             delegateWrapper = pendingDelegate.getAndSet(null);
 339             if (delegateWrapper == null) return;
 340             if (previous != null) {
 341                 previous.dropSubscription();
 342             }
 343             onNewSubscription(delegateWrapper, subscription);
 344         }
 345 
 346         @Override
 347         public void dropSubscription() {
 348             DelegateWrapper subscriberImpl = subscribed;
 349             if (subscriberImpl != null) {
 350                 subscriberImpl.dropSubscription();
 351             }
 352         }
 353 
 354         @Override
 355         public void onSubscribe(Flow.Subscription subscription) {
 356             debug.log(Level.DEBUG,
 357                       "SSLSubscriberWrapper (reader) onSubscribe(%s)",
 358                       subscription);
 359             onSubscribeImpl(subscription);
 360         }
 361 
 362         // called in the reader flow, from onSubscribe.
 363         private void onSubscribeImpl(Flow.Subscription subscription) {
 364             assert subscription != null;
 365             DelegateWrapper subscriberImpl, pending;
 366             synchronized (this) {
 367                 readSubscription = subscription;
 368                 subscriberImpl = subscribed;
 369                 pending = pendingDelegate.get();
 370             }
 371 
 372             if (subscriberImpl == null && pending == null) {
 373                 debug.log(Level.DEBUG,
 374                       "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
 375                       "no delegate yet");
 376                 return;
 377             }
 378 
 379             if (pending == null) {
 380                 // There is no pending delegate, but we have a previously
 381                 // subscribed delegate. This is obviously a re-subscribe.
 382                 // We are in the downstream reader flow, so we should call
 383                 // onSubscribe directly.
 384                 debug.log(Level.DEBUG,
 385                       "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
 386                       "resubscribing");
 387                 onNewSubscription(subscriberImpl, subscription);
 388             } else {
 389                 // We have some pending subscriber: subscribe it now that we have
 390                 // a subscription. If we already had a previous delegate then
 391                 // it will get a dropSubscription().
 392                 debug.log(Level.DEBUG,
 393                       "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
 394                       "subscribing pending");
 395                 processPendingSubscriber();
 396             }
 397         }
 398 
 399         private void onNewSubscription(DelegateWrapper subscriberImpl,
 400                                        Flow.Subscription subscription) {
 401             assert subscriberImpl != null;
 402             assert subscription != null;
 403 
 404             Throwable failed;
 405             boolean completed;
 406             // reset any demand that may have been made by the previous
 407             // subscriber
 408             sslDelegate.resetReaderDemand();
 409             // send the subscription to the subscriber.
 410             subscriberImpl.onSubscribe(subscription);
 411 
 412             // The following twisted logic is just here that we don't invoke
 413             // onError before onSubscribe. It also prevents race conditions
 414             // if onError is invoked concurrently with setDelegate.
 415             synchronized (this) {
 416                 failed = this.errorRef.get();
 417                 completed = finished;
 418                 subscribed = subscriberImpl;
 419             }
 420             if (failed != null) {
 421                 subscriberImpl.onError(failed);
 422             } else if (completed) {
 423                 subscriberImpl.onComplete();
 424             }
 425         }
 426 
 427         @Override
 428         public void onNext(List<ByteBuffer> item) {
 429             subscribed.onNext(item);
 430         }
 431 
 432         public void onErrorImpl(Throwable throwable) {
 433             // The following twisted logic is just here that we don't invoke
 434             // onError before onSubscribe. It also prevents race conditions
 435             // if onError is invoked concurrently with setDelegate.
 436             // See setDelegate.
 437 
 438             errorRef.compareAndSet(null, throwable);
 439             Throwable failed = errorRef.get();
 440             finished = true;
 441             debug.log(Level.DEBUG, "%s: onErrorImpl: %s", this, throwable);
 442             DelegateWrapper subscriberImpl;
 443             synchronized (this) {
 444                 subscriberImpl = subscribed;
 445             }
 446             if (subscriberImpl != null) {
 447                 subscriberImpl.onError(failed);
 448             } else {
 449                 debug.log(Level.DEBUG, "%s: delegate null, stored %s", this, failed);
 450             }
 451             // now if we have any pending subscriber, we should forward
 452             // the error to them immediately as the read scheduler will
 453             // already be stopped.
 454             processPendingSubscriber();
 455         }
 456 
 457         @Override
 458         public void onError(Throwable throwable) {
 459             assert !finished && !onCompleteReceived;
 460             onErrorImpl(throwable);
 461         }
 462 
 463         private boolean handshaking() {
 464             HandshakeStatus hs = engine.getHandshakeStatus();
 465             return !(hs == NOT_HANDSHAKING || hs == FINISHED);
 466         }
 467 
 468         private boolean handshakeFailed() {
 469             // sslDelegate can be null if we reach here
 470             // during the initial handshake, as that happens
 471             // within the SSLFlowDelegate constructor.
 472             // In that case we will want to raise an exception.
 473             return handshaking()
 474                     && (sslDelegate == null
 475                     || !sslDelegate.closeNotifyReceived());
 476         }
 477 
 478         @Override
 479         public void onComplete() {
 480             assert !finished && !onCompleteReceived;
 481             onCompleteReceived = true;
 482             DelegateWrapper subscriberImpl;
 483             synchronized(this) {
 484                 subscriberImpl = subscribed;
 485             }
 486 
 487             if (handshakeFailed()) {
 488                 debug.log(Level.DEBUG,
 489                         "handshake: %s, inbound done: %s outbound done: %s",
 490                         engine.getHandshakeStatus(),
 491                         engine.isInboundDone(),
 492                         engine.isOutboundDone());
 493                 onErrorImpl(new SSLHandshakeException(
 494                         "Remote host terminated the handshake"));
 495             } else if (subscriberImpl != null) {
 496                 finished = true;
 497                 subscriberImpl.onComplete();
 498             }
 499             // now if we have any pending subscriber, we should complete
 500             // them immediately as the read scheduler will already be stopped.
 501             processPendingSubscriber();
 502         }
 503     }
 504 
 505     @Override
 506     public void connectFlows(TubePublisher writePub,
 507                              TubeSubscriber readSub) {
 508         debug.log(Level.DEBUG, "connecting flows");
 509         readSubscriber.setDelegate(readSub);
 510         writePub.subscribe(this);
 511     }
 512 
 513     /** Outstanding write demand from the SSL Flow Delegate. */
 514     private final Demand writeDemand = new Demand();
 515 
 516     final class SSLSubscriptionWrapper implements Flow.Subscription {
 517 
 518         volatile Flow.Subscription delegate;
 519 
 520         void setSubscription(Flow.Subscription sub) {
 521             long demand = writeDemand.get(); // FIXME: isn't it a racy way of passing the demand?
 522             delegate = sub;
 523             debug.log(Level.DEBUG, "setSubscription: demand=%d", demand);
 524             if (demand > 0)
 525                 sub.request(demand);
 526         }
 527 
 528         @Override
 529         public void request(long n) {
 530             writeDemand.increase(n);
 531             debug.log(Level.DEBUG, "request: n=%d", n);
 532             Flow.Subscription sub = delegate;
 533             if (sub != null && n > 0) {
 534                 sub.request(n);
 535             }
 536         }
 537 
 538         @Override
 539         public void cancel() {
 540             // TODO:  no-op or error?
 541         }
 542     }
 543 
 544     /* Subscriber - writing side */
 545     @Override
 546     public void onSubscribe(Flow.Subscription subscription) {
 547         Objects.requireNonNull(subscription);
 548         Flow.Subscription x = writeSubscription.delegate;
 549         if (x != null)
 550             x.cancel();
 551 
 552         writeSubscription.setSubscription(subscription);
 553     }
 554 
 555     @Override
 556     public void onNext(List<ByteBuffer> item) {
 557         Objects.requireNonNull(item);
 558         boolean decremented = writeDemand.tryDecrement();
 559         assert decremented : "Unexpected writeDemand: ";
 560         debug.log(Level.DEBUG,
 561                 "sending %d  buffers to SSL flow delegate", item.size());
 562         sslDelegate.upstreamWriter().onNext(item);
 563     }
 564 
 565     @Override
 566     public void onError(Throwable throwable) {
 567         Objects.requireNonNull(throwable);
 568         sslDelegate.upstreamWriter().onError(throwable);
 569     }
 570 
 571     @Override
 572     public void onComplete() {
 573         sslDelegate.upstreamWriter().onComplete();
 574     }
 575 
 576     @Override
 577     public String toString() {
 578         return dbgString();
 579     }
 580 
 581     final String dbgString() {
 582         return "SSLTube(" + tube + ")";
 583     }
 584 
 585 }