1 /*
   2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.IOException;
  29 import java.nio.ByteBuffer;
  30 import java.util.List;
  31 import java.util.Objects;
  32 import java.util.concurrent.Flow;
  33 import java.util.concurrent.atomic.AtomicLong;
  34 import java.util.concurrent.atomic.AtomicReference;
  35 import java.nio.channels.SelectableChannel;
  36 import java.nio.channels.SelectionKey;
  37 import java.nio.channels.SocketChannel;
  38 import java.util.ArrayList;
  39 import java.util.function.Consumer;
  40 import java.util.function.Supplier;
  41 import jdk.internal.net.http.common.BufferSupplier;
  42 import jdk.internal.net.http.common.Demand;
  43 import jdk.internal.net.http.common.FlowTube;
  44 import jdk.internal.net.http.common.Log;
  45 import jdk.internal.net.http.common.Logger;
  46 import jdk.internal.net.http.common.SequentialScheduler;
  47 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
  48 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask;
  49 import jdk.internal.net.http.common.Utils;
  50 
  51 /**
  52  * A SocketTube is a terminal tube plugged directly into the socket.
  53  * The read subscriber should call {@code subscribe} on the SocketTube before
  54  * the SocketTube is subscribed to the write publisher.
  55  */
  56 final class SocketTube implements FlowTube {
  57 
  58     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
  59     static final AtomicLong IDS = new AtomicLong();
  60 
  61     private final HttpClientImpl client;
  62     private final SocketChannel channel;
  63     private final SliceBufferSource sliceBuffersSource;
  64     private final Object lock = new Object();
  65     private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
  66     private final InternalReadPublisher readPublisher;
  67     private final InternalWriteSubscriber writeSubscriber;
  68     private final long id = IDS.incrementAndGet();
  69 
  70     public SocketTube(HttpClientImpl client, SocketChannel channel,
  71                       Supplier<ByteBuffer> buffersFactory) {
  72         this.client = client;
  73         this.channel = channel;
  74         this.sliceBuffersSource = new SliceBufferSource(buffersFactory);
  75 
  76         this.readPublisher = new InternalReadPublisher();
  77         this.writeSubscriber = new InternalWriteSubscriber();
  78     }
  79 
  80     /**
  81      * Returns {@code true} if this flow is finished.
  82      * This happens when this flow internal read subscription is completed,
  83      * either normally (EOF reading) or exceptionally  (EOF writing, or
  84      * underlying socket closed, or some exception occurred while reading or
  85      * writing to the socket).
  86      *
  87      * @return {@code true} if this flow is finished.
  88      */
  89     public boolean isFinished() {
  90         InternalReadPublisher.InternalReadSubscription subscription =
  91                 readPublisher.subscriptionImpl;
  92         return subscription != null && subscription.completed
  93                 || subscription == null && errorRef.get() != null;
  94     }
  95 
  96     // ===================================================================== //
  97     //                       Flow.Publisher                                  //
  98     // ======================================================================//
  99 
 100     /**
 101      * {@inheritDoc }
 102      * @apiNote This method should be called first. In particular, the caller
 103      *          must ensure that this method must be called by the read
 104      *          subscriber before the write publisher can call {@code onSubscribe}.
 105      *          Failure to adhere to this contract may result in assertion errors.
 106      */
 107     @Override
 108     public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
 109         Objects.requireNonNull(s);
 110         assert s instanceof TubeSubscriber : "Expected TubeSubscriber, got:" + s;
 111         readPublisher.subscribe(s);
 112     }
 113 
 114 
 115     // ===================================================================== //
 116     //                       Flow.Subscriber                                 //
 117     // ======================================================================//
 118 
 119     /**
 120      * {@inheritDoc }
 121      * @apiNote The caller must ensure that {@code subscribe} is called by
 122      *          the read subscriber before {@code onSubscribe} is called by
 123      *          the write publisher.
 124      *          Failure to adhere to this contract may result in assertion errors.
 125      */
 126     @Override
 127     public void onSubscribe(Flow.Subscription subscription) {
 128         writeSubscriber.onSubscribe(subscription);
 129     }
 130 
 131     @Override
 132     public void onNext(List<ByteBuffer> item) {
 133         writeSubscriber.onNext(item);
 134     }
 135 
 136     @Override
 137     public void onError(Throwable throwable) {
 138         writeSubscriber.onError(throwable);
 139     }
 140 
 141     @Override
 142     public void onComplete() {
 143         writeSubscriber.onComplete();
 144     }
 145 
 146     // ===================================================================== //
 147     //                           Events                                      //
 148     // ======================================================================//
 149 
 150     void signalClosed() {
 151         // Ensures that the subscriber will be terminated and that future
 152         // subscribers will be notified when the connection is closed.
 153         if (Log.channel()) {
 154             Log.logChannel("Connection close signalled: connection closed locally ({0})",
 155                     channelDescr());
 156         }
 157         readPublisher.subscriptionImpl.signalError(
 158                 new IOException("connection closed locally"));
 159     }
 160 
 161     /**
 162      * A restartable task used to process tasks in sequence.
 163      */
 164     private static class SocketFlowTask implements RestartableTask {
 165         final Runnable task;
 166         private final Object monitor = new Object();
 167         SocketFlowTask(Runnable task) {
 168             this.task = task;
 169         }
 170         @Override
 171         public final void run(DeferredCompleter taskCompleter) {
 172             try {
 173                 // non contentious synchronized for visibility.
 174                 synchronized(monitor) {
 175                     task.run();
 176                 }
 177             } finally {
 178                 taskCompleter.complete();
 179             }
 180         }
 181     }
 182 
 183     // This is best effort - there's no guarantee that the printed set of values
 184     // is consistent. It should only be considered as weakly accurate - in
 185     // particular in what concerns the events states, especially when displaying
 186     // a read event state from a write event callback and conversely.
 187     void debugState(String when) {
 188         if (debug.on()) {
 189             StringBuilder state = new StringBuilder();
 190 
 191             InternalReadPublisher.InternalReadSubscription sub =
 192                     readPublisher.subscriptionImpl;
 193             InternalReadPublisher.ReadEvent readEvent =
 194                     sub == null ? null : sub.readEvent;
 195             Demand rdemand = sub == null ? null : sub.demand;
 196             InternalWriteSubscriber.WriteEvent writeEvent =
 197                     writeSubscriber.writeEvent;
 198             Demand wdemand = writeSubscriber.writeDemand;
 199             int rops = readEvent == null ? 0 : readEvent.interestOps();
 200             long rd = rdemand == null ? 0 : rdemand.get();
 201             int wops = writeEvent == null ? 0 : writeEvent.interestOps();
 202             long wd = wdemand == null ? 0 : wdemand.get();
 203 
 204             state.append(when).append(" Reading: [ops=")
 205                     .append(rops).append(", demand=").append(rd)
 206                     .append(", stopped=")
 207                     .append((sub == null ? false : sub.readScheduler.isStopped()))
 208                     .append("], Writing: [ops=").append(wops)
 209                     .append(", demand=").append(wd)
 210                     .append("]");
 211             debug.log(state.toString());
 212         }
 213     }
 214 
 215     /**
 216      * A repeatable event that can be paused or resumed by changing its
 217      * interestOps. When the event is fired, it is first paused before being
 218      * signaled. It is the responsibility of the code triggered by
 219      * {@code signalEvent} to resume the event if required.
 220      */
 221     private static abstract class SocketFlowEvent extends AsyncEvent {
 222         final SocketChannel channel;
 223         final int defaultInterest;
 224         volatile int interestOps;
 225         volatile boolean registered;
 226         SocketFlowEvent(int defaultInterest, SocketChannel channel) {
 227             super(AsyncEvent.REPEATING);
 228             this.defaultInterest = defaultInterest;
 229             this.channel = channel;
 230         }
 231         final boolean registered() {return registered;}
 232         final void resume() {
 233             interestOps = defaultInterest;
 234             registered = true;
 235         }
 236         final void pause() {interestOps = 0;}
 237         @Override
 238         public final SelectableChannel channel() {return channel;}
 239         @Override
 240         public final int interestOps() {return interestOps;}
 241 
 242         @Override
 243         public final void handle() {
 244             pause();       // pause, then signal
 245             signalEvent(); // won't be fired again until resumed.
 246         }
 247         @Override
 248         public final void abort(IOException error) {
 249             debug().log(() -> "abort: " + error);
 250             pause();              // pause, then signal
 251             signalError(error);   // should not be resumed after abort (not checked)
 252         }
 253 
 254         protected abstract void signalEvent();
 255         protected abstract void signalError(Throwable error);
 256         abstract Logger debug();
 257     }
 258 
 259     // ===================================================================== //
 260     //                              Writing                                  //
 261     // ======================================================================//
 262 
 263     // This class makes the assumption that the publisher will call onNext
 264     // sequentially, and that onNext won't be called if the demand has not been
 265     // incremented by request(1).
 266     // It has a 'queue of 1' meaning that it will call request(1) in
 267     // onSubscribe, and then only after its 'current' buffer list has been
 268     // fully written and current set to null;
 269     private final class InternalWriteSubscriber
 270             implements Flow.Subscriber<List<ByteBuffer>> {
 271 
 272         volatile WriteSubscription subscription;
 273         volatile List<ByteBuffer> current;
 274         volatile boolean completed;
 275         final AsyncTriggerEvent startSubscription =
 276                 new AsyncTriggerEvent(this::signalError, this::startSubscription);
 277         final WriteEvent writeEvent = new WriteEvent(channel, this);
 278         final Demand writeDemand = new Demand();
 279 
 280         @Override
 281         public void onSubscribe(Flow.Subscription subscription) {
 282             WriteSubscription previous = this.subscription;
 283             if (debug.on()) debug.log("subscribed for writing");
 284             try {
 285                 boolean needEvent = current == null;
 286                 if (needEvent) {
 287                     if (previous != null && previous.upstreamSubscription != subscription) {
 288                         previous.dropSubscription();
 289                     }
 290                 }
 291                 this.subscription = new WriteSubscription(subscription);
 292                 if (needEvent) {
 293                     if (debug.on())
 294                         debug.log("write: registering startSubscription event");
 295                     client.registerEvent(startSubscription);
 296                 }
 297             } catch (Throwable t) {
 298                 signalError(t);
 299             }
 300         }
 301 
 302         @Override
 303         public void onNext(List<ByteBuffer> bufs) {
 304             assert current == null : dbgString() // this is a queue of 1.
 305                     + "w.onNext current: " + current;
 306             assert subscription != null : dbgString()
 307                     + "w.onNext: subscription is null";
 308             current = bufs;
 309             tryFlushCurrent(client.isSelectorThread()); // may be in selector thread
 310             // For instance in HTTP/2, a received SETTINGS frame might trigger
 311             // the sending of a SETTINGS frame in turn which might cause
 312             // onNext to be called from within the same selector thread that the
 313             // original SETTINGS frames arrived on. If rs is the read-subscriber
 314             // and ws is the write-subscriber then the following can occur:
 315             // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write
 316             // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent
 317             debugState("leaving w.onNext");
 318         }
 319 
 320         // Don't use a SequentialScheduler here: rely on onNext() being invoked
 321         // sequentially, and not being invoked if there is no demand, request(1).
 322         // onNext is usually called from within a user / executor thread.
 323         // Initial writing will be performed in that thread. If for some reason,
 324         // not all the data can be written, a writeEvent will be registered, and
 325         // writing will resume in the selector manager thread when the
 326         // writeEvent is fired.
 327         //
 328         // If this method is invoked in the selector manager thread (because of
 329         // a writeEvent), then the executor will be used to invoke request(1),
 330         // ensuring that onNext() won't be invoked from within the selector
 331         // thread. If not in the selector manager thread, then request(1) is
 332         // invoked directly.
 333         void tryFlushCurrent(boolean inSelectorThread) {
 334             List<ByteBuffer> bufs = current;
 335             if (bufs == null) return;
 336             try {
 337                 assert inSelectorThread == client.isSelectorThread() :
 338                        "should " + (inSelectorThread ? "" : "not ")
 339                         + " be in the selector thread";
 340                 long remaining = Utils.remaining(bufs);
 341                 if (debug.on()) debug.log("trying to write: %d", remaining);
 342                 long written = writeAvailable(bufs);
 343                 if (debug.on()) debug.log("wrote: %d", written);
 344                 assert written >= 0 : "negative number of bytes written:" + written;
 345                 assert written <= remaining;
 346                 if (remaining - written == 0) {
 347                     current = null;
 348                     if (writeDemand.tryDecrement()) {
 349                         Runnable requestMore = this::requestMore;
 350                         if (inSelectorThread) {
 351                             assert client.isSelectorThread();
 352                             client.theExecutor().execute(requestMore);
 353                         } else {
 354                             assert !client.isSelectorThread();
 355                             requestMore.run();
 356                         }
 357                     }
 358                 } else {
 359                     resumeWriteEvent(inSelectorThread);
 360                 }
 361             } catch (Throwable t) {
 362                 signalError(t);
 363             }
 364         }
 365 
 366         // Kick off the initial request:1 that will start the writing side.
 367         // Invoked in the selector manager thread.
 368         void startSubscription() {
 369             try {
 370                 if (debug.on()) debug.log("write: starting subscription");
 371                 if (Log.channel()) {
 372                     Log.logChannel("Start requesting bytes for writing to channel: {0}",
 373                             channelDescr());
 374                 }
 375                 assert client.isSelectorThread();
 376                 // make sure read registrations are handled before;
 377                 readPublisher.subscriptionImpl.handlePending();
 378                 if (debug.on()) debug.log("write: offloading requestMore");
 379                 // start writing;
 380                 client.theExecutor().execute(this::requestMore);
 381             } catch(Throwable t) {
 382                 signalError(t);
 383             }
 384         }
 385 
 386         void requestMore() {
 387            WriteSubscription subscription = this.subscription;
 388            subscription.requestMore();
 389         }
 390 
 391         @Override
 392         public void onError(Throwable throwable) {
 393             signalError(throwable);
 394         }
 395 
 396         @Override
 397         public void onComplete() {
 398             completed = true;
 399             // no need to pause the write event here: the write event will
 400             // be paused if there is nothing more to write.
 401             List<ByteBuffer> bufs = current;
 402             long remaining = bufs == null ? 0 : Utils.remaining(bufs);
 403             if (debug.on())
 404                 debug.log( "write completed, %d yet to send", remaining);
 405             debugState("InternalWriteSubscriber::onComplete");
 406         }
 407 
 408         void resumeWriteEvent(boolean inSelectorThread) {
 409             if (debug.on()) debug.log("scheduling write event");
 410             resumeEvent(writeEvent, this::signalError);
 411         }
 412 
 413         void signalWritable() {
 414             if (debug.on()) debug.log("channel is writable");
 415             tryFlushCurrent(true);
 416         }
 417 
 418         void signalError(Throwable error) {
 419             debug.log(() -> "write error: " + error);
 420             if (Log.channel()) {
 421                 Log.logChannel("Failed to write to channel ({0}: {1})",
 422                         channelDescr(), error);
 423             }
 424             completed = true;
 425             readPublisher.signalError(error);
 426             Flow.Subscription subscription = this.subscription;
 427             if (subscription != null) subscription.cancel();
 428         }
 429 
 430         // A repeatable WriteEvent which is paused after firing and can
 431         // be resumed if required - see SocketFlowEvent;
 432         final class WriteEvent extends SocketFlowEvent {
 433             final InternalWriteSubscriber sub;
 434             WriteEvent(SocketChannel channel, InternalWriteSubscriber sub) {
 435                 super(SelectionKey.OP_WRITE, channel);
 436                 this.sub = sub;
 437             }
 438             @Override
 439             protected final void signalEvent() {
 440                 try {
 441                     client.eventUpdated(this);
 442                     sub.signalWritable();
 443                 } catch(Throwable t) {
 444                     sub.signalError(t);
 445                 }
 446             }
 447 
 448             @Override
 449             protected void signalError(Throwable error) {
 450                 sub.signalError(error);
 451             }
 452 
 453             @Override
 454             Logger debug() { return debug; }
 455         }
 456 
 457         final class WriteSubscription implements Flow.Subscription {
 458             final Flow.Subscription upstreamSubscription;
 459             volatile boolean cancelled;
 460             WriteSubscription(Flow.Subscription subscription) {
 461                 this.upstreamSubscription = subscription;
 462             }
 463 
 464             @Override
 465             public void request(long n) {
 466                 if (cancelled) return;
 467                 upstreamSubscription.request(n);
 468             }
 469 
 470             @Override
 471             public void cancel() {
 472                 if (cancelled) return;
 473                 if (debug.on()) debug.log("write: cancel");
 474                 if (Log.channel()) {
 475                     Log.logChannel("Cancelling write subscription");
 476                 }
 477                 dropSubscription();
 478                 upstreamSubscription.cancel();
 479             }
 480 
 481             void dropSubscription() {
 482                 synchronized (InternalWriteSubscriber.this) {
 483                     cancelled = true;
 484                     if (debug.on()) debug.log("write: resetting demand to 0");
 485                     writeDemand.reset();
 486                 }
 487             }
 488 
 489             void requestMore() {
 490                 try {
 491                     if (completed || cancelled) return;
 492                     boolean requestMore;
 493                     long d;
 494                     // don't fiddle with demand after cancel.
 495                     // see dropSubscription.
 496                     synchronized (InternalWriteSubscriber.this) {
 497                         if (cancelled) return;
 498                         d = writeDemand.get();
 499                         requestMore = writeDemand.increaseIfFulfilled();
 500                     }
 501                     if (requestMore) {
 502                         if (debug.on()) debug.log("write: requesting more...");
 503                         upstreamSubscription.request(1);
 504                     } else {
 505                         if (debug.on())
 506                             debug.log("write: no need to request more: %d", d);
 507                     }
 508                 } catch (Throwable t) {
 509                     if (debug.on())
 510                         debug.log("write: error while requesting more: " + t);
 511                     signalError(t);
 512                 } finally {
 513                     debugState("leaving requestMore: ");
 514                 }
 515             }
 516         }
 517     }
 518 
 519     // ===================================================================== //
 520     //                              Reading                                  //
 521     // ===================================================================== //
 522 
 523     // The InternalReadPublisher uses a SequentialScheduler to ensure that
 524     // onNext/onError/onComplete are called sequentially on the caller's
 525     // subscriber.
 526     // However, it relies on the fact that the only time where
 527     // runOrSchedule() is called from a user/executor thread is in signalError,
 528     // right after the errorRef has been set.
 529     // Because the sequential scheduler's task always checks for errors first,
 530     // and always terminate the scheduler on error, then it is safe to assume
 531     // that if it reaches the point where it reads from the channel, then
 532     // it is running in the SelectorManager thread. This is because all
 533     // other invocation of runOrSchedule() are triggered from within a
 534     // ReadEvent.
 535     //
 536     // When pausing/resuming the event, some shortcuts can then be taken
 537     // when we know we're running in the selector manager thread
 538     // (in that case there's no need to call client.eventUpdated(readEvent);
 539     //
 540     private final class InternalReadPublisher
 541             implements Flow.Publisher<List<ByteBuffer>> {
 542         private final InternalReadSubscription subscriptionImpl
 543                 = new InternalReadSubscription();
 544         AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>();
 545         private volatile ReadSubscription subscription;
 546 
 547         @Override
 548         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
 549             Objects.requireNonNull(s);
 550 
 551             TubeSubscriber sub = FlowTube.asTubeSubscriber(s);
 552             ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
 553             ReadSubscription previous = pendingSubscription.getAndSet(target);
 554 
 555             if (previous != null && previous != target) {
 556                 if (debug.on())
 557                     debug.log("read publisher: dropping pending subscriber: "
 558                               + previous.subscriber);
 559                 previous.errorRef.compareAndSet(null, errorRef.get());
 560                 previous.signalOnSubscribe();
 561                 if (subscriptionImpl.completed) {
 562                     previous.signalCompletion();
 563                 } else {
 564                     previous.subscriber.dropSubscription();
 565                 }
 566             }
 567 
 568             if (debug.on()) debug.log("read publisher got subscriber");
 569             subscriptionImpl.signalSubscribe();
 570             debugState("leaving read.subscribe: ");
 571         }
 572 
 573         void signalError(Throwable error) {
 574             if (debug.on()) debug.log("error signalled " + error);
 575             if (!errorRef.compareAndSet(null, error)) {
 576                 return;
 577             }
 578             if (Log.channel()) {
 579                 Log.logChannel("Error signalled on channel {0}: {1}",
 580                         channelDescr(), error);
 581             }
 582             subscriptionImpl.handleError();
 583         }
 584 
 585         final class ReadSubscription implements Flow.Subscription {
 586             final InternalReadSubscription impl;
 587             final TubeSubscriber  subscriber;
 588             final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 589             final BufferSource bufferSource;
 590             volatile boolean subscribed;
 591             volatile boolean cancelled;
 592             volatile boolean completed;
 593 
 594             public ReadSubscription(InternalReadSubscription impl,
 595                                     TubeSubscriber subscriber) {
 596                 this.impl = impl;
 597                 this.bufferSource = subscriber.supportsRecycling()
 598                         ? new SSLDirectBufferSource(client)
 599                         : SocketTube.this.sliceBuffersSource;
 600                 this.subscriber = subscriber;
 601             }
 602 
 603             @Override
 604             public void cancel() {
 605                 cancelled = true;
 606             }
 607 
 608             @Override
 609             public void request(long n) {
 610                 if (!cancelled) {
 611                     impl.request(n);
 612                 } else {
 613                     if (debug.on())
 614                         debug.log("subscription cancelled, ignoring request %d", n);
 615                 }
 616             }
 617 
 618             void signalCompletion() {
 619                 assert subscribed || cancelled;
 620                 if (completed || cancelled) return;
 621                 synchronized (this) {
 622                     if (completed) return;
 623                     completed = true;
 624                 }
 625                 Throwable error = errorRef.get();
 626                 if (error != null) {
 627                     if (debug.on())
 628                         debug.log("forwarding error to subscriber: " + error);
 629                     subscriber.onError(error);
 630                 } else {
 631                     if (debug.on()) debug.log("completing subscriber");
 632                     subscriber.onComplete();
 633                 }
 634             }
 635 
 636             void signalOnSubscribe() {
 637                 if (subscribed || cancelled) return;
 638                 synchronized (this) {
 639                     if (subscribed || cancelled) return;
 640                     subscribed = true;
 641                 }
 642                 subscriber.onSubscribe(this);
 643                 if (debug.on()) debug.log("onSubscribe called");
 644                 if (errorRef.get() != null) {
 645                     signalCompletion();
 646                 }
 647             }
 648         }
 649 
 650         final class InternalReadSubscription implements Flow.Subscription {
 651 
 652             private final Demand demand = new Demand();
 653             final SequentialScheduler readScheduler;
 654             private volatile boolean completed;
 655             private final ReadEvent readEvent;
 656             private final AsyncEvent subscribeEvent;
 657 
 658             InternalReadSubscription() {
 659                 readScheduler = new SequentialScheduler(new SocketFlowTask(this::read));
 660                 subscribeEvent = new AsyncTriggerEvent(this::signalError,
 661                                                        this::handleSubscribeEvent);
 662                 readEvent = new ReadEvent(channel, this);
 663             }
 664 
 665             /*
 666              * This method must be invoked before any other method of this class.
 667              */
 668             final void signalSubscribe() {
 669                 if (readScheduler.isStopped() || completed) {
 670                     // if already completed or stopped we can handle any
 671                     // pending connection directly from here.
 672                     if (debug.on())
 673                         debug.log("handling pending subscription while completed");
 674                     handlePending();
 675                 } else {
 676                     try {
 677                         if (debug.on()) debug.log("registering subscribe event");
 678                         client.registerEvent(subscribeEvent);
 679                     } catch (Throwable t) {
 680                         signalError(t);
 681                         handlePending();
 682                     }
 683                 }
 684             }
 685 
 686             final void handleSubscribeEvent() {
 687                 assert client.isSelectorThread();
 688                 debug.log("subscribe event raised");
 689                 if (Log.channel()) Log.logChannel("Start reading from {0}", channelDescr());
 690                 readScheduler.runOrSchedule();
 691                 if (readScheduler.isStopped() || completed) {
 692                     // if already completed or stopped we can handle any
 693                     // pending connection directly from here.
 694                     if (debug.on())
 695                         debug.log("handling pending subscription when completed");
 696                     handlePending();
 697                 }
 698             }
 699 
 700 
 701             /*
 702              * Although this method is thread-safe, the Reactive-Streams spec seems
 703              * to not require it to be as such. It's a responsibility of the
 704              * subscriber to signal demand in a thread-safe manner.
 705              *
 706              * See Reactive Streams specification, rules 2.7 and 3.4.
 707              */
 708             @Override
 709             public final void request(long n) {
 710                 if (n > 0L) {
 711                     boolean wasFulfilled = demand.increase(n);
 712                     if (wasFulfilled) {
 713                         if (debug.on()) debug.log("got some demand for reading");
 714                         resumeReadEvent();
 715                         // if demand has been changed from fulfilled
 716                         // to unfulfilled register read event;
 717                     }
 718                 } else {
 719                     signalError(new IllegalArgumentException("non-positive request"));
 720                 }
 721                 debugState("leaving request("+n+"): ");
 722             }
 723 
 724             @Override
 725             public final void cancel() {
 726                 pauseReadEvent();
 727                 if (Log.channel()) {
 728                     Log.logChannel("Read subscription cancelled for channel {0}",
 729                             channelDescr());
 730                 }
 731                 readScheduler.stop();
 732             }
 733 
 734             private void resumeReadEvent() {
 735                 if (debug.on()) debug.log("resuming read event");
 736                 resumeEvent(readEvent, this::signalError);
 737             }
 738 
 739             private void pauseReadEvent() {
 740                 if (debug.on()) debug.log("pausing read event");
 741                 pauseEvent(readEvent, this::signalError);
 742             }
 743 
 744 
 745             final void handleError() {
 746                 assert errorRef.get() != null;
 747                 readScheduler.runOrSchedule();
 748             }
 749 
 750             final void signalError(Throwable error) {
 751                 if (!errorRef.compareAndSet(null, error)) {
 752                     return;
 753                 }
 754                 if (debug.on()) debug.log("got read error: " + error);
 755                 if (Log.channel()) {
 756                     Log.logChannel("Read error signalled on channel {0}: {1}",
 757                             channelDescr(), error);
 758                 }
 759                 readScheduler.runOrSchedule();
 760             }
 761 
 762             final void signalReadable() {
 763                 readScheduler.runOrSchedule();
 764             }
 765 
 766             /** The body of the task that runs in SequentialScheduler. */
 767             final void read() {
 768                 // It is important to only call pauseReadEvent() when stopping
 769                 // the scheduler. The event is automatically paused before
 770                 // firing, and trying to pause it again could cause a race
 771                 // condition between this loop, which calls tryDecrementDemand(),
 772                 // and the thread that calls request(n), which will try to resume
 773                 // reading.
 774                 try {
 775                     while(!readScheduler.isStopped()) {
 776                         if (completed) return;
 777 
 778                         // make sure we have a subscriber
 779                         if (handlePending()) {
 780                             if (debug.on())
 781                                 debug.log("pending subscriber subscribed");
 782                             return;
 783                         }
 784 
 785                         // If an error was signaled, we might not be in the
 786                         // the selector thread, and that is OK, because we
 787                         // will just call onError and return.
 788                         ReadSubscription current = subscription;
 789                         Throwable error = errorRef.get();
 790                         if (current == null)  {
 791                             assert error != null;
 792                             if (debug.on())
 793                                 debug.log("error raised before subscriber subscribed: %s",
 794                                           (Object)error);
 795                             return;
 796                         }
 797                         TubeSubscriber subscriber = current.subscriber;
 798                         if (error != null) {
 799                             completed = true;
 800                             // safe to pause here because we're finished anyway.
 801                             pauseReadEvent();
 802                             if (debug.on())
 803                                 debug.log("Sending error " + error
 804                                           + " to subscriber " + subscriber);
 805                             if (Log.channel()) {
 806                                 Log.logChannel("Raising error with subscriber for {0}: {1}",
 807                                         channelDescr(), error);
 808                             }
 809                             current.errorRef.compareAndSet(null, error);
 810                             current.signalCompletion();
 811                             readScheduler.stop();
 812                             debugState("leaving read() loop with error: ");
 813                             return;
 814                         }
 815 
 816                         // If we reach here then we must be in the selector thread.
 817                         assert client.isSelectorThread();
 818                         if (demand.tryDecrement()) {
 819                             // we have demand.
 820                             try {
 821                                 List<ByteBuffer> bytes = readAvailable(current.bufferSource);
 822                                 if (bytes == EOF) {
 823                                     if (!completed) {
 824                                         if (debug.on()) debug.log("got read EOF");
 825                                         if (Log.channel()) {
 826                                             Log.logChannel("EOF read from channel: {0}",
 827                                                         channelDescr());
 828                                         }
 829                                         completed = true;
 830                                         // safe to pause here because we're finished
 831                                         // anyway.
 832                                         pauseReadEvent();
 833                                         current.signalCompletion();
 834                                         readScheduler.stop();
 835                                     }
 836                                     debugState("leaving read() loop after EOF: ");
 837                                     return;
 838                                 } else if (Utils.remaining(bytes) > 0) {
 839                                     // the subscriber is responsible for offloading
 840                                     // to another thread if needed.
 841                                     if (debug.on())
 842                                         debug.log("read bytes: " + Utils.remaining(bytes));
 843                                     assert !current.completed;
 844                                     subscriber.onNext(bytes);
 845                                     // we could continue looping until the demand
 846                                     // reaches 0. However, that would risk starving
 847                                     // other connections (bound to other socket
 848                                     // channels) - as other selected keys activated
 849                                     // by the selector manager thread might be
 850                                     // waiting for this event to terminate.
 851                                     // So resume the read event and return now...
 852                                     resumeReadEvent();
 853                                     debugState("leaving read() loop after onNext: ");
 854                                     return;
 855                                 } else {
 856                                     // nothing available!
 857                                     if (debug.on()) debug.log("no more bytes available");
 858                                     // re-increment the demand and resume the read
 859                                     // event. This ensures that this loop is
 860                                     // executed again when the socket becomes
 861                                     // readable again.
 862                                     demand.increase(1);
 863                                     resumeReadEvent();
 864                                     debugState("leaving read() loop with no bytes");
 865                                     return;
 866                                 }
 867                             } catch (Throwable x) {
 868                                 signalError(x);
 869                                 continue;
 870                             }
 871                         } else {
 872                             if (debug.on()) debug.log("no more demand for reading");
 873                             // the event is paused just after firing, so it should
 874                             // still be paused here, unless the demand was just
 875                             // incremented from 0 to n, in which case, the
 876                             // event will be resumed, causing this loop to be
 877                             // invoked again when the socket becomes readable:
 878                             // This is what we want.
 879                             // Trying to pause the event here would actually
 880                             // introduce a race condition between this loop and
 881                             // request(n).
 882                             debugState("leaving read() loop with no demand");
 883                             break;
 884                         }
 885                     }
 886                 } catch (Throwable t) {
 887                     if (debug.on()) debug.log("Unexpected exception in read loop", t);
 888                     signalError(t);
 889                 } finally {
 890                     if (readScheduler.isStopped()) {
 891                         if (debug.on()) debug.log("Read scheduler stopped");
 892                         if (Log.channel()) {
 893                             Log.logChannel("Stopped reading from channel {0}", channelDescr());
 894                         }
 895                     }
 896                     handlePending();
 897                 }
 898             }
 899 
 900             boolean handlePending() {
 901                 ReadSubscription pending = pendingSubscription.getAndSet(null);
 902                 if (pending == null) return false;
 903                 if (debug.on())
 904                     debug.log("handling pending subscription for %s",
 905                             pending.subscriber);
 906                 ReadSubscription current = subscription;
 907                 if (current != null && current != pending && !completed) {
 908                     current.subscriber.dropSubscription();
 909                 }
 910                 if (debug.on()) debug.log("read demand reset to 0");
 911                 subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to.
 912                 pending.errorRef.compareAndSet(null, errorRef.get());
 913                 if (!readScheduler.isStopped()) {
 914                     subscription = pending;
 915                 } else {
 916                     if (debug.on()) debug.log("socket tube is already stopped");
 917                 }
 918                 if (debug.on()) debug.log("calling onSubscribe");
 919                 pending.signalOnSubscribe();
 920                 if (completed) {
 921                     pending.errorRef.compareAndSet(null, errorRef.get());
 922                     pending.signalCompletion();
 923                 }
 924                 return true;
 925             }
 926         }
 927 
 928 
 929         // A repeatable ReadEvent which is paused after firing and can
 930         // be resumed if required - see SocketFlowEvent;
 931         final class ReadEvent extends SocketFlowEvent {
 932             final InternalReadSubscription sub;
 933             ReadEvent(SocketChannel channel, InternalReadSubscription sub) {
 934                 super(SelectionKey.OP_READ, channel);
 935                 this.sub = sub;
 936             }
 937             @Override
 938             protected final void signalEvent() {
 939                 try {
 940                     client.eventUpdated(this);
 941                     sub.signalReadable();
 942                 } catch(Throwable t) {
 943                     sub.signalError(t);
 944                 }
 945             }
 946 
 947             @Override
 948             protected final void signalError(Throwable error) {
 949                 sub.signalError(error);
 950             }
 951 
 952             @Override
 953             Logger debug() { return debug; }
 954         }
 955     }
 956 
 957     // ===================================================================== //
 958     //                       Buffer Management                               //
 959     // ===================================================================== //
 960 
 961     // This interface is used by readAvailable(BufferSource);
 962     public interface BufferSource {
 963         /**
 964          * Returns a buffer to read data from the socket.
 965          *
 966          * @implNote
 967          * Different implementation can have different strategies, as to
 968          * which kind of buffer to return, or whether to return the same
 969          * buffer. The only constraints are that:
 970          *   a. the buffer returned must not be null
 971          *   b. the buffer position indicates where to start reading
 972          *   c. the buffer limit indicates where to stop reading.
 973          *   d. the buffer is 'free' - that is - it is not used
 974          *      or retained by anybody else
 975          *
 976          * @return A buffer to read data from the socket.
 977          */
 978         ByteBuffer getBuffer();
 979 
 980         /**
 981          * Appends the read-data in {@code buffer} to the list of buffer to
 982          * be sent downstream to the subscriber. May return a new
 983          * list, or append to the given list.
 984          *
 985          * @implNote
 986          * Different implementation can have different strategies, but
 987          * must obviously be consistent with the implementation of the
 988          * getBuffer() method. For instance, an implementation could
 989          * decide to add the buffer to the list and return a new buffer
 990          * next time getBuffer() is called, or could decide to add a buffer
 991          * slice to the list and return the same buffer (if remaining
 992          * space is available) next time getBuffer() is called.
 993          *
 994          * @param list    The list before adding the data. Can be null.
 995          * @param buffer  The buffer containing the data to add to the list.
 996          * @param start   The start position at which data were read.
 997          *                The current buffer position indicates the end.
 998          * @return A possibly new list where a buffer containing the
 999          *         data read from the socket has been added.
1000          */
1001         List<ByteBuffer> append(List<ByteBuffer> list, ByteBuffer buffer, int start);
1002 
1003         /**
1004          * Returns the given unused {@code buffer}, previously obtained from
1005          * {@code getBuffer}.
1006          *
1007          * @implNote This method can be used, if necessary, to return
1008          *  the unused buffer to the pull.
1009          *
1010          * @param buffer The unused buffer.
1011          */
1012         default void returnUnused(ByteBuffer buffer) { }
1013     }
1014 
1015     // An implementation of BufferSource used for unencrypted data.
1016     // This buffer source uses heap buffers and avoids wasting memory
1017     // by forwarding read-only buffer slices downstream.
1018     // Buffers allocated through this source are simply GC'ed when
1019     // they are no longer referenced.
1020     private static final class SliceBufferSource implements BufferSource {
1021         private final Supplier<ByteBuffer> factory;
1022         private volatile ByteBuffer current;
1023 
1024         public SliceBufferSource() {
1025             this(Utils::getBuffer);
1026         }
1027         public SliceBufferSource(Supplier<ByteBuffer> factory) {
1028             this.factory = Objects.requireNonNull(factory);
1029         }
1030 
1031         // Reuses the same buffer if some space remains available.
1032         // Otherwise, returns a new heap buffer.
1033         @Override
1034         public final ByteBuffer getBuffer() {
1035             ByteBuffer buf = current;
1036             buf = (buf == null || !buf.hasRemaining())
1037                     ? (current = factory.get()) : buf;
1038             assert buf.hasRemaining();
1039             return buf;
1040         }
1041 
1042         // Adds a read-only slice to the list, potentially returning a
1043         // new list with that slice at the end.
1044         @Override
1045         public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
1046             // creates a slice to add to the list
1047             int limit = buf.limit();
1048             buf.limit(buf.position());
1049             buf.position(start);
1050             ByteBuffer slice = buf.slice();
1051 
1052             // restore buffer state to what it was before creating the slice
1053             buf.position(buf.limit());
1054             buf.limit(limit);
1055 
1056             // add the buffer to the list
1057             return SocketTube.listOf(list, slice.asReadOnlyBuffer());
1058         }
1059     }
1060 
1061 
1062     // An implementation of BufferSource used for encrypted data.
1063     // This buffer source uses direct byte buffers that will be
1064     // recycled by the SocketTube subscriber.
1065     //
1066     private static final class SSLDirectBufferSource implements BufferSource {
1067         private final BufferSupplier factory;
1068         private final HttpClientImpl client;
1069         private ByteBuffer current;
1070 
1071         public SSLDirectBufferSource(HttpClientImpl client) {
1072             this.client = Objects.requireNonNull(client);
1073             this.factory = Objects.requireNonNull(client.getSSLBufferSupplier());
1074         }
1075 
1076         // Obtains a 'free' byte buffer from the pool, or returns
1077         // the same buffer if nothing was read at the previous cycle.
1078         // The subscriber will be responsible for recycling this
1079         // buffer into the pool (see SSLFlowDelegate.Reader)
1080         @Override
1081         public final ByteBuffer getBuffer() {
1082             assert client.isSelectorThread();
1083             ByteBuffer buf = current;
1084             if (buf == null) {
1085                 buf = current = factory.get();
1086             }
1087             assert buf.hasRemaining();
1088             assert buf.position() == 0;
1089             return buf;
1090         }
1091 
1092         // Adds the buffer to the list. The buffer will be later returned to the
1093         // pool by the subscriber (see SSLFlowDelegate.Reader).
1094         // The next buffer returned by getBuffer() will be obtained from the
1095         // pool. It might be the same buffer or another one.
1096         // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because
1097         // recycling will happen in the flow before onNext returns, then the
1098         // pool can not grow larger than MAX_BUFFERS = 3 buffers, even though
1099         // it's shared by all SSL connections opened on that client.
1100         @Override
1101         public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
1102             assert client.isSelectorThread();
1103             assert buf.isDirect();
1104             assert start == 0;
1105             assert current == buf;
1106             current = null;
1107             buf.limit(buf.position());
1108             buf.position(start);
1109             // add the buffer to the list
1110             return SocketTube.listOf(list, buf);
1111         }
1112 
1113         @Override
1114         public void returnUnused(ByteBuffer buffer) {
1115             // if current is null, then the buffer will have been added to the
1116             // list, through append. Otherwise, current is not null, and needs
1117             // to be returned to prevent the buffer supplier pool from growing
1118             // to more than MAX_BUFFERS.
1119             assert buffer == current;
1120             ByteBuffer buf = current;
1121             if (buf != null) {
1122                 assert buf.position() == 0;
1123                 current = null;
1124                 // the supplier assert if buf has remaining
1125                 buf.limit(buf.position());
1126                 factory.recycle(buf);
1127             }
1128         }
1129     }
1130 
1131     // ===================================================================== //
1132     //                   Socket Channel Read/Write                           //
1133     // ===================================================================== //
1134     static final int MAX_BUFFERS = 3;
1135     static final List<ByteBuffer> EOF = List.of();
1136     static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER);
1137 
1138     // readAvailable() will read bytes into the 'current' ByteBuffer until
1139     // the ByteBuffer is full, or 0 or -1 (EOF) is returned by read().
1140     // When that happens, a slice of the data that has been read so far
1141     // is inserted into the returned buffer list, and if the current buffer
1142     // has remaining space, that space will be used to read more data when
1143     // the channel becomes readable again.
1144     private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException {
1145         ByteBuffer buf = buffersSource.getBuffer();
1146         assert buf.hasRemaining();
1147 
1148         int read;
1149         int pos = buf.position();
1150         List<ByteBuffer> list = null;
1151         while (buf.hasRemaining()) {
1152             try {
1153                 while ((read = channel.read(buf)) > 0) {
1154                     if (!buf.hasRemaining())
1155                         break;
1156                 }
1157             } catch (IOException x) {
1158                 if (buf.position() == pos && list == null) {
1159                     // make sure that the buffer source will recycle
1160                     // 'buf' if needed
1161                     buffersSource.returnUnused(buf);
1162                     // no bytes have been read, just throw...
1163                     throw x;
1164                 } else {
1165                     // some bytes have been read, return them and fail next time
1166                     errorRef.compareAndSet(null, x);
1167                     read = 0; // ensures outer loop will exit
1168                 }
1169             }
1170 
1171             // nothing read;
1172             if (buf.position() == pos) {
1173                 // An empty list signals the end of data, and should only be
1174                 // returned if read == -1. If some data has already been read,
1175                 // then it must be returned. -1 will be returned next time
1176                 // the caller attempts to read something.
1177                 buffersSource.returnUnused(buf);
1178                 if (list == null) {
1179                     // nothing read - list was null - return EOF or NOTHING
1180                     list = read == -1 ? EOF : NOTHING;
1181                 }
1182                 break;
1183             }
1184 
1185             // check whether this buffer has still some free space available.
1186             // if so, we will keep it for the next round.
1187             list = buffersSource.append(list, buf, pos);
1188 
1189             if (read <= 0 || list.size() == MAX_BUFFERS) {
1190                 break;
1191             }
1192 
1193             buf = buffersSource.getBuffer();
1194             pos = buf.position();
1195             assert buf.hasRemaining();
1196         }
1197         return list;
1198     }
1199 
1200     private static <T> List<T> listOf(List<T> list, T item) {
1201         int size = list == null ? 0 : list.size();
1202         switch (size) {
1203             case 0: return List.of(item);
1204             case 1: return List.of(list.get(0), item);
1205             case 2: return List.of(list.get(0), list.get(1), item);
1206             default: // slow path if MAX_BUFFERS > 3
1207                 List<T> res = list instanceof ArrayList ? list : new ArrayList<>(list);
1208                 res.add(item);
1209                 return res;
1210         }
1211     }
1212 
1213     private long writeAvailable(List<ByteBuffer> bytes) throws IOException {
1214         ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY);
1215         final long remaining = Utils.remaining(srcs);
1216         long written = 0;
1217         while (remaining > written) {
1218             try {
1219                 long w = channel.write(srcs);
1220                 assert w >= 0 : "negative number of bytes written:" + w;
1221                 if (w == 0) {
1222                     break;
1223                 }
1224                 written += w;
1225             } catch (IOException x) {
1226                 if (written == 0) {
1227                     // no bytes were written just throw
1228                     throw x;
1229                 } else {
1230                     // return how many bytes were written, will fail next time
1231                     break;
1232                 }
1233             }
1234         }
1235         return written;
1236     }
1237 
1238     private void resumeEvent(SocketFlowEvent event,
1239                              Consumer<Throwable> errorSignaler) {
1240         boolean registrationRequired;
1241         synchronized(lock) {
1242             registrationRequired = !event.registered();
1243             event.resume();
1244         }
1245         try {
1246             if (registrationRequired) {
1247                 client.registerEvent(event);
1248              } else {
1249                 client.eventUpdated(event);
1250             }
1251         } catch(Throwable t) {
1252             errorSignaler.accept(t);
1253         }
1254    }
1255 
1256     private void pauseEvent(SocketFlowEvent event,
1257                             Consumer<Throwable> errorSignaler) {
1258         synchronized(lock) {
1259             event.pause();
1260         }
1261         try {
1262             client.eventUpdated(event);
1263         } catch(Throwable t) {
1264             errorSignaler.accept(t);
1265         }
1266     }
1267 
1268     @Override
1269     public void connectFlows(TubePublisher writePublisher,
1270                              TubeSubscriber readSubscriber) {
1271         if (debug.on()) debug.log("connecting flows");
1272         this.subscribe(readSubscriber);
1273         writePublisher.subscribe(this);
1274     }
1275 
1276 
1277     @Override
1278     public String toString() {
1279         return dbgString();
1280     }
1281 
1282     final String dbgString() {
1283         return "SocketTube("+id+")";
1284     }
1285 
1286     final String channelDescr() {
1287         return String.valueOf(channel);
1288     }
1289 }