/*
 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http.internal.common;

import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import jdk.incubator.http.internal.common.SubscriberWrapper.SchedulingAction;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;

/**
 * An implementation of FlowTube that wraps another FlowTube in an
 * SSL flow.
 * <p>
 * The following diagram shows a typical usage of the SSLTube, where
 * the SSLTube wraps a SocketTube on the right hand side, and is connected
 * to an HttpConnection on the left hand side.
 *
 * <preformatted>{@code
 *                  +---------- SSLTube -------------------------+
 *                  |                                            |
 *                  |                    +---SSLFlowDelegate---+ |
 *  HttpConnection  |                    |                     | |   SocketTube
 *    read sink  <- SSLSubscriberW.   <- Reader <- upstreamR.() <---- read source
 *  (a subscriber)  |                    |    \         /      | |  (a publisher)
 *                  |                    |     SSLEngine       | |
 *  HttpConnection  |                    |    /         \      | |   SocketTube
 *  write source -> SSLSubscriptionW. -> upstreamW.() -> Writer ----> write sink
 *  (a publisher)   |                    |                     | |  (a subscriber)
 *                  |                    +---------------------+ |
 *                  |                                            |
 *                  +---------------------------------------------+
 * }</preformatted>
 */
public class SSLTube implements FlowTube {

    static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag.
    final System.Logger debug =
            Utils.getDebugLogger(this::dbgString, DEBUG);

    private final FlowTube tube;
    private final SSLSubscriberWrapper readSubscriber;
    private final SSLSubscriptionWrapper writeSubscription;
    private final SSLFlowDelegate sslDelegate;
    private final SSLEngine engine;
    // Set once a terminal signal (onError/onComplete) has been handled on
    // the read side; exposed through isFinished().
    private volatile boolean finished;

    /**
     * Creates an SSLTube wrapping the given tube.
     *
     * @param engine   the SSLEngine used by the SSL flow delegate
     * @param executor the executor handed to the SSL flow delegate
     * @param tube     the wrapped (right hand side) tube
     * @throws NullPointerException if any argument is null
     */
    public SSLTube(SSLEngine engine, Executor executor, FlowTube tube) {
        Objects.requireNonNull(engine);
        Objects.requireNonNull(executor);
        this.tube = Objects.requireNonNull(tube);
        // The two wrappers must exist before the delegate is created:
        // they are referenced while the delegate connects its flows.
        writeSubscription = new SSLSubscriptionWrapper();
        readSubscriber = new SSLSubscriberWrapper();
        this.engine = engine;
        sslDelegate = new SSLTubeFlowDelegate(engine,
                                              executor,
                                              readSubscriber,
                                              tube);
    }

    /**
     * The SSLFlowDelegate used by this tube. It wires the delegate's
     * reader/writer to the read subscriber wrapper, the wrapped tube and
     * the write subscription wrapper of the enclosing SSLTube.
     */
    final class SSLTubeFlowDelegate extends SSLFlowDelegate {
        SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
                            SSLSubscriberWrapper readSubscriber,
                            FlowTube tube) {
            super(engine, executor, readSubscriber, tube);
        }

        // Gives the read subscriber wrapper a chance to subscribe any
        // pending delegate, then lets scheduling continue.
        protected SchedulingAction enterReadScheduling() {
            readSubscriber.processPendingSubscriber();
            return SchedulingAction.CONTINUE;
        }

        void connect(Flow.Subscriber<? super List<ByteBuffer>> downReader,
                     Flow.Subscriber<? super List<ByteBuffer>> downWriter) {
            assert downWriter == tube;
            assert downReader == readSubscriber;

            // Connect the read sink first. That's the left-hand side
            // downstream subscriber from the HttpConnection (or more
            // accurately, the SSLSubscriberWrapper that will wrap it
            // when SSLTube::connectFlows is called).
            reader.subscribe(downReader);

            // Connect the right hand side tube (the socket tube).
            //
            // The SSLFlowDelegate.writer publishes ByteBuffer to
            // the SocketTube for writing on the socket, and the
            // SSLFlowDelegate::upstreamReader subscribes to the
            // SocketTube to receive ByteBuffers read from the socket.
            //
            // Basically this method is equivalent to:
            //     // connect the read source:
            //     //   subscribe the SSLFlowDelegate upstream reader
            //     //   to the socket tube publisher.
            //     tube.subscribe(upstreamReader());
            //     // connect the write sink:
            //     //   subscribe the socket tube write subscriber
            //     //   with the SSLFlowDelegate downstream writer.
            //     writer.subscribe(tube);
            tube.connectFlows(FlowTube.asTubePublisher(writer),
                              FlowTube.asTubeSubscriber(upstreamReader()));

            // Finally connect the write source. That's the left
            // hand side publisher which will push ByteBuffer for
            // writing and encryption to the SSLFlowDelegate.
            // The writeSubscription is in fact the SSLSubscriptionWrapper
            // that will wrap the subscription provided by the
            // HttpConnection publisher when SSLTube::connectFlows
            // is called.
            upstreamWriter().onSubscribe(writeSubscription);
        }
    }

    /**
     * Returns the ALPN future exposed by the SSL flow delegate.
     *
     * @return the value of {@code sslDelegate.alpn()}
     */
    public CompletableFuture<String> getALPN() {
        return sslDelegate.alpn();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
        readSubscriber.dropSubscription();
        readSubscriber.setDelegate(s);
        s.onSubscribe(readSubscription);
    }

    /**
     * Tells whether, or not, this FlowTube has finished receiving data.
     *
     * @return true when one of this FlowTube Subscriber's OnError or onComplete
     * methods have been invoked
     */
    @Override
    public boolean isFinished() {
        return finished;
    }

    // The subscription received by the SSLSubscriberWrapper; null until
    // SSLSubscriberWrapper::onSubscribe has been called.
    private volatile Flow.Subscription readSubscription;

    // The DelegateWrapper wraps a subscribed {@code Flow.Subscriber} and
    // tracks the subscriber's state. In particular it makes sure that
    // onComplete/onError are not called before onSubscribed.
    static final class DelegateWrapper implements FlowTube.TubeSubscriber {
        private final FlowTube.TubeSubscriber delegate;
        private final System.Logger debug;
        volatile boolean subscribedCalled;
        volatile boolean subscribedDone;
        volatile boolean completed;
        volatile Throwable error;

        DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate,
                        System.Logger debug) {
            this.delegate = FlowTube.asTubeSubscriber(delegate);
            this.debug = debug;
        }

        @Override
        public void dropSubscription() {
            // Only meaningful for a delegate that has been handed a
            // subscription and has not yet been terminated.
            if (subscribedCalled && !completed) {
                delegate.dropSubscription();
            }
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            assert subscribedCalled;
            delegate.onNext(item);
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            onSubscribe(delegate::onSubscribe, subscription);
        }

        // Invokes the given onSubscribe method, then forwards any terminal
        // signal that was recorded while the subscription was in progress.
        private void onSubscribe(Consumer<Flow.Subscription> method,
                                 Flow.Subscription subscription) {
            subscribedCalled = true;
            method.accept(subscription);
            Throwable x;
            boolean finished;
            synchronized (this) {
                subscribedDone = true;
                x = error;
                finished = completed;
            }
            if (x != null) {
                debug.log(Level.DEBUG,
                          "Subscriber completed before subscribe: forwarding %s",
                          (Object)x);
                delegate.onError(x);
            } else if (finished) {
                debug.log(Level.DEBUG,
                          "Subscriber completed before subscribe: calling onComplete()");
                delegate.onComplete();
            }
        }

        @Override
        public void onError(Throwable t) {
            if (completed) {
                debug.log(Level.DEBUG,
                          "Subscriber already completed: ignoring %s",
                          (Object)t);
                return;
            }
            boolean subscribed;
            synchronized (this) {
                // re-check under the lock: only the first terminal signal wins
                if (completed) return;
                error = t;
                completed = true;
                subscribed = subscribedDone;
            }
            if (subscribed) {
                delegate.onError(t);
            } else {
                // onSubscribe(...) will forward the stored error once done
                debug.log(Level.DEBUG,
                          "Subscriber not yet subscribed: stored %s",
                          (Object)t);
            }
        }

        @Override
        public void onComplete() {
            if (completed) return;
            boolean subscribed;
            synchronized (this) {
                // re-check under the lock: only the first terminal signal wins
                if (completed) return;
                completed = true;
                subscribed = subscribedDone;
            }
            if (subscribed) {
                delegate.onComplete();
            } else {
                // onSubscribe(...) will call onComplete() once done
                debug.log(Level.DEBUG,
                          "Subscriber not yet subscribed: stored completed=true");
            }
        }

        @Override
        public String toString() {
            return "DelegateWrapper:" + delegate.toString();
        }

    }

    // Used to read data from the SSLTube.
    final class SSLSubscriberWrapper implements FlowTube.TubeSubscriber {
        // The delegate registered by setDelegate that has not yet been
        // handed a subscription.
        private final AtomicReference<DelegateWrapper> pendingDelegate =
                new AtomicReference<>();
        // The delegate that currently receives the signals.
        private volatile DelegateWrapper subscribed;
        // True once a regular (non handshake-failure) onComplete has been
        // received from upstream, whether or not a delegate was subscribed
        // at that time.
        private volatile boolean onCompleteReceived;
        private final AtomicReference<Throwable> errorRef
                = new AtomicReference<>();

        // setDelegate can be called asynchronously when the SSLTube flow
        // is connected. At this time the permanent subscriber (this class)
        // may already be subscribed (readSubscription != null) or not.
        // 1. If it's already subscribed (readSubscription != null), we
        //    are going to signal the SSLFlowDelegate reader, and make sure
        //    onSubscribed is called within the reader flow
        // 2. If it's not yet subscribed (readSubscription == null), then
        //    we're going to wait for onSubscribe to be called.
        //
        void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
            debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
                      delegate);
            assert delegate != null;
            DelegateWrapper delegateWrapper = new DelegateWrapper(delegate, debug);
            DelegateWrapper previous;
            Flow.Subscription subscription;
            boolean handleNow;
            synchronized (this) {
                previous = pendingDelegate.getAndSet(delegateWrapper);
                subscription = readSubscription;
                // If a terminal signal was already received - including an
                // onComplete that arrived while no delegate was subscribed -
                // the read flow will not run again: the new delegate must
                // be processed from here.
                handleNow = this.errorRef.get() != null
                        || finished
                        || onCompleteReceived;
            }
            if (previous != null) {
                previous.dropSubscription();
            }
            if (subscription == null) {
                debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) no subscription yet");
                return;
            }
            if (handleNow || !sslDelegate.resumeReader()) {
                processPendingSubscriber();
            }
        }

        // Can be called outside of the flow if an error has already been
        // raised. Otherwise, must be called within the SSLFlowDelegate
        // downstream reader flow.
        // If there is a subscription, and if there is a pending delegate,
        // calls dropSubscription() on the previous delegate (if any),
        // then subscribe the pending delegate.
        void processPendingSubscriber() {
            Flow.Subscription subscription;
            DelegateWrapper delegateWrapper, previous;
            synchronized (this) {
                delegateWrapper = pendingDelegate.get();
                if (delegateWrapper == null) return;
                subscription = readSubscription;
                previous = subscribed;
            }
            if (subscription == null) {
                debug.log(Level.DEBUG,
                          "SSLSubscriberWrapper (reader) %s",
                          "processPendingSubscriber: no subscription yet");
                return;
            }
            // Atomically take ownership of the pending delegate: a
            // concurrent caller will observe null and return.
            delegateWrapper = pendingDelegate.getAndSet(null);
            if (delegateWrapper == null) return;
            if (previous != null) {
                previous.dropSubscription();
            }
            onNewSubscription(delegateWrapper, subscription);
        }

        @Override
        public void dropSubscription() {
            DelegateWrapper subscriberImpl = subscribed;
            if (subscriberImpl != null) {
                subscriberImpl.dropSubscription();
            }
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            debug.log(Level.DEBUG,
                      "SSLSubscriberWrapper (reader) onSubscribe(%s)",
                      subscription);
            onSubscribeImpl(subscription);
        }

        // called in the reader flow, from onSubscribe.
        private void onSubscribeImpl(Flow.Subscription subscription) {
            assert subscription != null;
            DelegateWrapper subscriberImpl, pending;
            synchronized (this) {
                readSubscription = subscription;
                subscriberImpl = subscribed;
                pending = pendingDelegate.get();
            }

            if (subscriberImpl == null && pending == null) {
                debug.log(Level.DEBUG,
                          "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
                          "no delegate yet");
                return;
            }

            if (pending == null) {
                // There is no pending delegate, but we have a previously
                // subscribed delegate. This is obviously a re-subscribe.
                // We are in the downstream reader flow, so we should call
                // onSubscribe directly.
                debug.log(Level.DEBUG,
                          "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
                          "resubscribing");
                onNewSubscription(subscriberImpl, subscription);
            } else {
                // We have some pending subscriber: subscribe it now that we have
                // a subscription. If we already had a previous delegate then
                // it will get a dropSubscription().
                debug.log(Level.DEBUG,
                          "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
                          "subscribing pending");
                processPendingSubscriber();
            }
        }

        // Hands the subscription to the given delegate, makes it the
        // current delegate, then replays any terminal signal that was
        // received before the delegate got subscribed.
        private void onNewSubscription(DelegateWrapper subscriberImpl,
                                       Flow.Subscription subscription) {
            assert subscriberImpl != null;
            assert subscription != null;

            Throwable failed;
            boolean completed;
            // reset any demand that may have been made by the previous
            // subscriber
            sslDelegate.resetReaderDemand();
            // send the subscription to the subscriber.
            subscriberImpl.onSubscribe(subscription);

            // The following twisted logic is just here that we don't invoke
            // onError before onSubscribe. It also prevents race conditions
            // if onError is invoked concurrently with setDelegate.
            synchronized (this) {
                failed = this.errorRef.get();
                // onCompleteReceived covers the case where upstream
                // completed while no delegate was subscribed: in that case
                // 'finished' was never set and the completion would
                // otherwise be lost.
                completed = finished || onCompleteReceived;
                subscribed = subscriberImpl;
            }
            if (failed != null) {
                subscriberImpl.onError(failed);
            } else if (completed) {
                // the completion is now being delivered downstream
                finished = true;
                subscriberImpl.onComplete();
            }
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            subscribed.onNext(item);
        }

        public void onErrorImpl(Throwable throwable) {
            // The following twisted logic is just here that we don't invoke
            // onError before onSubscribe. It also prevents race conditions
            // if onError is invoked concurrently with setDelegate.
            // See setDelegate.

            // Only the first error is retained; later ones are forwarded
            // as the first one.
            errorRef.compareAndSet(null, throwable);
            Throwable failed = errorRef.get();
            finished = true;
            debug.log(Level.DEBUG, "%s: onErrorImpl: %s", this, throwable);
            DelegateWrapper subscriberImpl;
            synchronized (this) {
                subscriberImpl = subscribed;
            }
            if (subscriberImpl != null) {
                subscriberImpl.onError(failed);
            } else {
                debug.log(Level.DEBUG, "%s: delegate null, stored %s", this, failed);
            }
            // now if we have any pending subscriber, we should forward
            // the error to them immediately as the read scheduler will
            // already be stopped.
            processPendingSubscriber();
        }

        @Override
        public void onError(Throwable throwable) {
            assert !finished && !onCompleteReceived;
            onErrorImpl(throwable);
        }

        // True unless the engine reports NOT_HANDSHAKING or FINISHED.
        private boolean handshaking() {
            HandshakeStatus hs = engine.getHandshakeStatus();
            return !(hs == NOT_HANDSHAKING || hs == FINISHED);
        }

        private boolean handshakeFailed() {
            // sslDelegate can be null if we reach here
            // during the initial handshake, as that happens
            // within the SSLFlowDelegate constructor.
            // In that case we will want to raise an exception.
            return handshaking()
                    && (sslDelegate == null
                    || !sslDelegate.closeNotifyReceived());
        }

        @Override
        public void onComplete() {
            assert !finished && !onCompleteReceived;
            DelegateWrapper subscriberImpl;
            synchronized(this) {
                subscriberImpl = subscribed;
            }

            if (handshakeFailed()) {
                debug.log(Level.DEBUG,
                          "handshake: %s, inbound done: %s outbound done: %s",
                          engine.getHandshakeStatus(),
                          engine.isInboundDone(),
                          engine.isOutboundDone());
                // onCompleteReceived is deliberately left unset here so
                // that a delegate being subscribed concurrently cannot
                // observe a clean completion instead of the failure.
                onErrorImpl(new SSLHandshakeException(
                        "Remote host terminated the handshake"));
            } else if (subscriberImpl != null) {
                onCompleteReceived = finished = true;
                subscriberImpl.onComplete();
            } else {
                // No delegate yet: remember the completion so that
                // onNewSubscription can deliver it to the next delegate.
                onCompleteReceived = true;
            }
            // now if we have any pending subscriber, we should complete
            // them immediately as the read scheduler will already be stopped.
            processPendingSubscriber();
        }
    }

    @Override
    public void connectFlows(TubePublisher writePub,
                             TubeSubscriber readSub) {
        debug.log(Level.DEBUG, "connecting flows");
        readSubscriber.setDelegate(readSub);
        writePub.subscribe(this);
    }

    /** Outstanding write demand from the SSL Flow Delegate. */
    private final Demand writeDemand = new Demand();

    /**
     * The subscription handed to the SSL flow delegate's upstream writer.
     * It records the demand in {@code writeDemand} and forwards it to the
     * subscription supplied by the write publisher, once there is one.
     */
    final class SSLSubscriptionWrapper implements Flow.Subscription {

        volatile Flow.Subscription delegate;

        void setSubscription(Flow.Subscription sub) {
            long demand = writeDemand.get(); // FIXME: isn't it a racy way of passing the demand?
            delegate = sub;
            debug.log(Level.DEBUG, "setSubscription: demand=%d", demand);
            // replay the demand accumulated before the subscription existed
            if (demand > 0)
                sub.request(demand);
        }

        @Override
        public void request(long n) {
            writeDemand.increase(n);
            debug.log(Level.DEBUG, "request: n=%d", n);
            Flow.Subscription sub = delegate;
            if (sub != null && n > 0) {
                sub.request(n);
            }
        }

        @Override
        public void cancel() {
            // TODO:  no-op or error?
        }
    }

    /* Subscriber - writing side */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        Objects.requireNonNull(subscription);
        // cancel the subscription being replaced, if any
        Flow.Subscription x = writeSubscription.delegate;
        if (x != null)
            x.cancel();

        writeSubscription.setSubscription(subscription);
    }

    @Override
    public void onNext(List<ByteBuffer> item) {
        Objects.requireNonNull(item);
        boolean decremented = writeDemand.tryDecrement();
        assert decremented : "Unexpected writeDemand: ";
        debug.log(Level.DEBUG,
                  "sending %d buffers to SSL flow delegate", item.size());
        sslDelegate.upstreamWriter().onNext(item);
    }

    @Override
    public void onError(Throwable throwable) {
        Objects.requireNonNull(throwable);
        sslDelegate.upstreamWriter().onError(throwable);
    }

    @Override
    public void onComplete() {
        sslDelegate.upstreamWriter().onComplete();
    }

    @Override
    public String toString() {
        return dbgString();
    }

    final String dbgString() {
        return "SSLTube(" + tube + ")";
    }

}