1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.EOFException;
  29 import java.io.IOException;
  30 import java.lang.System.Logger.Level;
  31 import java.nio.ByteBuffer;
  32 import java.util.List;
  33 import java.util.Objects;
  34 import java.util.concurrent.Flow;
  35 import java.util.concurrent.atomic.AtomicLong;
  36 import java.util.concurrent.atomic.AtomicReference;
  37 import java.nio.channels.SelectableChannel;
  38 import java.nio.channels.SelectionKey;
  39 import java.nio.channels.SocketChannel;
  40 import java.util.ArrayList;
  41 import java.util.function.Consumer;
  42 import java.util.function.Supplier;
  43 
  44 import jdk.incubator.http.internal.common.Demand;
  45 import jdk.incubator.http.internal.common.FlowTube;
  46 import jdk.incubator.http.internal.common.SequentialScheduler;
  47 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
  48 import jdk.incubator.http.internal.common.SequentialScheduler.RestartableTask;
  49 import jdk.incubator.http.internal.common.Utils;
  50 
  51 /**
  52  * A SocketTube is a terminal tube plugged directly into the socket.
  53  * The read subscriber should call {@code subscribe} on the SocketTube before
  54  * the SocketTube can be subscribed to the write publisher.
  55  */
  56 final class SocketTube implements FlowTube {
  57 
    // Debug switch, copied from Utils.DEBUG.
    static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag
    // Debug logger for this tube, obtained from Utils.getDebugLogger with
    // this::dbgString and the DEBUG flag.
    final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
    // Counter from which each tube draws its 'id' (see below).
    static final AtomicLong IDS = new AtomicLong();

    // The client owning this tube; used below to register events, to test
    // whether the current thread is the selector thread, and to reach the
    // executor.
    private final HttpClientImpl client;
    // The socket channel that the read and write events are attached to.
    private final SocketChannel channel;
    // Supplier of byte buffers; its use lies outside the visible part of the
    // file (presumably the read path - TODO confirm).
    private final Supplier<ByteBuffer> buffersSource;
    // NOTE(review): no use of this lock is visible in this part of the file.
    private final Object lock = new Object();
    // First error signalled by either the read side or the write side.
    // It is only ever set with compareAndSet(null, error).
    private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    // Read side: publishes the bytes read from the socket.
    private final InternalReadPublisher readPublisher;
    // Write side: receives the bytes to be written to the socket.
    private final InternalWriteSubscriber writeSubscriber;
    // Identifier of this tube, taken from IDS at construction time.
    private final long id = IDS.incrementAndGet();
  70 
    /**
     * Creates a tube plugged into the given socket channel.
     *
     * @param client        the client used to register events, to test for
     *                      the selector thread, and to obtain the executor
     * @param channel       the socket channel to read from and write to
     * @param buffersSource a supplier of byte buffers (its use is outside
     *                      the visible part of this file)
     */
    public SocketTube(HttpClientImpl client, SocketChannel channel,
                      Supplier<ByteBuffer> buffersSource) {
        this.client = client;
        this.channel = channel;
        this.buffersSource = buffersSource;
        // Must come after 'channel' is assigned: the field initializers of
        // both inner classes create events that are passed 'channel'.
        this.readPublisher = new InternalReadPublisher();
        this.writeSubscriber = new InternalWriteSubscriber();
    }
  79 
  80 //    private static Flow.Subscription nopSubscription() {
  81 //        return new Flow.Subscription() {
  82 //            @Override public void request(long n) { }
  83 //            @Override public void cancel() { }
  84 //        };
  85 //    }
  86 
  87     /**
  88      * Returns {@code true} if this flow is finished.
  89      * This happens when this flow internal read subscription is completed,
  90      * either normally (EOF reading) or exceptionally  (EOF writing, or
  91      * underlying socket closed, or some exception occurred while reading or
  92      * writing to the socket).
  93      *
  94      * @return {@code true} if this flow is finished.
  95      */
  96     public boolean isFinished() {
  97         InternalReadPublisher.InternalReadSubscription subscription =
  98                 readPublisher.subscriptionImpl;
  99         return subscription != null && subscription.completed
 100                 || subscription == null && errorRef.get() != null;
 101     }
 102 
 103     // ===================================================================== //
 104     //                       Flow.Publisher                                  //
 105     // ======================================================================//
 106 
 107     /**
 108      * {@inheritDoc }
 109      * @apiNote This method should be called first. In particular, the caller
 110      *          must ensure that this method must be called by the read
 111      *          subscriber before the write publisher can call {@code onSubscribe}.
 112      *          Failure to adhere to this contract may result in assertion errors.
 113      */
 114     @Override
 115     public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
 116         Objects.requireNonNull(s);
 117         assert s instanceof TubeSubscriber : "Expected TubeSubscriber, got:" + s;
 118         readPublisher.subscribe(s);
 119     }
 120 
 121 
 122     // ===================================================================== //
 123     //                       Flow.Subscriber                                 //
 124     // ======================================================================//
 125 
    /**
     * {@inheritDoc }
     * @apiNote The caller must ensure that {@code subscribe} is called by
     *          the read subscriber before {@code onSubscribe} is called by
     *          the write publisher.
     *          Failure to adhere to this contract may result in assertion errors.
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Write side: forwarded as-is to the internal write subscriber.
        writeSubscriber.onSubscribe(subscription);
    }
 137 
    /**
     * Receives the next list of buffers to write to the socket and hands it
     * to the internal write subscriber.
     *
     * @param item the buffers to write
     */
    @Override
    public void onNext(List<ByteBuffer> item) {
        writeSubscriber.onNext(item);
    }
 142 
    /**
     * Receives an error from the write publisher and hands it to the
     * internal write subscriber.
     *
     * @param throwable the error reported by the write publisher
     */
    @Override
    public void onError(Throwable throwable) {
        writeSubscriber.onError(throwable);
    }
 147 
    /**
     * Receives the completion signal from the write publisher and hands it
     * to the internal write subscriber.
     */
    @Override
    public void onComplete() {
        writeSubscriber.onComplete();
    }
 152 
 153     // ===================================================================== //
 154     //                           Events                                      //
 155     // ======================================================================//
 156 
 157     /**
 158      * A restartable task used to process tasks in sequence.
 159      */
 160     private static class SocketFlowTask implements RestartableTask {
 161         final Runnable task;
 162         private final Object monitor = new Object();
 163         SocketFlowTask(Runnable task) {
 164             this.task = task;
 165         }
 166         @Override
 167         public final void run(DeferredCompleter taskCompleter) {
 168             try {
 169                 // non contentious synchronized for visibility.
 170                 synchronized(monitor) {
 171                     task.run();
 172                 }
 173             } finally {
 174                 taskCompleter.complete();
 175             }
 176         }
 177     }
 178 
 179     // This is best effort - there's no guarantee that the printed set
 180     // of values is consistent. It should only be considered as
 181     // weakly accurate - in particular in what concerns the events states,
 182     // especially when displaying a read event state from a write event
 183     // callback and conversely.
 184     void debugState(String when) {
 185         if (debug.isLoggable(Level.DEBUG)) {
 186             StringBuilder state = new StringBuilder();
 187 
 188             InternalReadPublisher.InternalReadSubscription sub =
 189                     readPublisher.subscriptionImpl;
 190             InternalReadPublisher.ReadEvent readEvent =
 191                     sub == null ? null : sub.readEvent;
 192             Demand rdemand = sub == null ? null : sub.demand;
 193             InternalWriteSubscriber.WriteEvent writeEvent =
 194                     writeSubscriber.writeEvent;
 195             AtomicLong wdemand = writeSubscriber.writeDemand;
 196             int rops = readEvent == null ? 0 : readEvent.interestOps();
 197             long rd = rdemand == null ? 0 : rdemand.get();
 198             int wops = writeEvent == null ? 0 : writeEvent.interestOps();
 199             long wd = wdemand == null ? 0 : wdemand.get();
 200 
 201             state.append(when).append(" Reading: [ops=")
 202                     .append(rops).append(", demand=").append(rd)
 203                     .append(", stopped=")
 204                     .append((sub == null ? false : sub.readScheduler.isStopped()))
 205                     .append("], Writing: [ops=").append(wops)
 206                     .append(", demand=").append(wd)
 207                     .append("]");
 208             debug.log(Level.DEBUG, state.toString());
 209         }
 210     }
 211 
 212     /**
 213      * A repeatable event that can be paused or resumed by changing
 214      * its interestOps.
 215      * When the event is fired, it is first paused before being signaled.
 216      * It is the responsibility of the code triggered by {@code signalEvent}
 217      * to resume the event if required.
 218      */
 219     private static abstract class SocketFlowEvent extends AsyncEvent {
 220         final SocketChannel channel;
 221         final int defaultInterest;
 222         volatile int interestOps;
 223         volatile boolean registered;
 224         SocketFlowEvent(int defaultInterest, SocketChannel channel) {
 225             super(AsyncEvent.REPEATING);
 226             this.defaultInterest = defaultInterest;
 227             this.channel = channel;
 228         }
 229         final boolean registered() {return registered;}
 230         final void resume() {
 231             interestOps = defaultInterest;
 232             registered = true;
 233         }
 234         final void pause() {interestOps = 0;}
 235         @Override
 236         public final SelectableChannel channel() {return channel;}
 237         @Override
 238         public final int interestOps() {return interestOps;}
 239 
 240         @Override
 241         public final void handle() {
 242             pause();       // pause, then signal
 243             signalEvent(); // won't be fired again until resumed.
 244         }
 245         @Override
 246         public final void abort(IOException error) {
 247             debug().log(Level.DEBUG, () -> "abort: " + error);
 248             pause();              // pause, then signal
 249             signalError(error);   // should not be resumed after abort (not checked)
 250         }
 251 
 252         protected abstract void signalEvent();
 253         protected abstract void signalError(Throwable error);
 254         abstract System.Logger debug();
 255     }
 256 
 257     // ===================================================================== //
 258     //                              Writing                                  //
 259     // ======================================================================//
 260 
 261     // This class makes the assumption that the publisher will call
 262     // onNext sequentially, and that onNext won't be called if the demand
 263     // has not been incremented by request(1).
 264     // It has a 'queue of 1' meaning that it will call request(1) in
 265     // onSubscribe, and then only after its 'current' buffer list has been
 266     // fully written and current set to null;
 267     private final class InternalWriteSubscriber
 268             implements Flow.Subscriber<List<ByteBuffer>> {
 269 
        // The subscription handed to onSubscribe by the write publisher.
        volatile Flow.Subscription subscription;
        // The buffer list currently being written, or null if there is none.
        volatile List<ByteBuffer> current;
        // Set by onComplete and by signalError; stops requestMore from
        // requesting further items.
        volatile boolean completed;
        // The event resumed when 'current' could not be fully written.
        final WriteEvent writeEvent = new WriteEvent(channel, this);
        // Outstanding write demand: moved from 0 to 1 when request(1) is
        // issued, and decremented once 'current' has been fully written.
        final AtomicLong writeDemand = new AtomicLong();
 275 
 276         @Override
 277         public void onSubscribe(Flow.Subscription subscription) {
 278             Flow.Subscription previous = this.subscription;
 279             this.subscription = subscription;
 280             debug.log(Level.DEBUG, "subscribed for writing");
 281             if (current == null) {
 282                 if (previous == subscription || previous == null) {
 283                     if (writeDemand.compareAndSet(0, 1)) {
 284                         subscription.request(1);
 285                     }
 286                 } else {
 287                     writeDemand.set(1);
 288                     subscription.request(1);
 289                 }
 290             }
 291         }
 292 
 293         @Override
 294         public void onNext(List<ByteBuffer> bufs) {
 295             assert current == null; // this is a queue of 1.
 296             assert subscription != null;
 297             current = bufs;
 298             tryFlushCurrent(client.isSelectorThread()); // may be in selector thread
 299             // For instance in HTTP/2, a received SETTINGS frame might trigger
 300             // the sending of a SETTINGS frame in turn which might cause
 301             // onNext to be called from within the same selector thread that the
 302             // original SETTINGS frames arrived on. If rs is the read-subscriber
 303             // and ws is the write-subscriber then the following can occur:
 304             // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write
 305             // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent
 306             debugState("leaving w.onNext");
 307         }
 308 
 309         // we don't use a SequentialScheduler here: we rely on
 310         // onNext() being called sequentially, and not being called
 311         // if we haven't call request(1)
 312         // onNext is usually called from within a user/executor thread.
 313         // we will perform the initial writing in that thread.
 314         // if for some reason, not all data can be written, the writeEvent
 315         // will be resumed, and the rest of the data will be written from
 316         // the selector manager thread when the writeEvent is fired.
 317         // If we are in the selector manager thread, then we will use the executor
 318         // to call request(1), ensuring that onNext() won't be called from
 319         // within the selector thread.
 320         // If we are not in the selector manager thread, then we don't care.
 321         void tryFlushCurrent(boolean inSelectorThread) {
 322             List<ByteBuffer> bufs = current;
 323             if (bufs == null) return;
 324             try {
 325                 assert inSelectorThread == client.isSelectorThread() :
 326                        "should " + (inSelectorThread ? "" : "not ")
 327                         + " be in the selector thread";
 328                 long remaining = Utils.remaining(bufs);
 329                 debug.log(Level.DEBUG, "trying to write: %d", remaining);
 330                 long written = writeAvailable(bufs);
 331                 debug.log(Level.DEBUG, "wrote: %d", remaining);
 332                 if (written == -1) {
 333                     signalError(new EOFException("EOF reached while writing"));
 334                     return;
 335                 }
 336                 assert written <= remaining;
 337                 if (remaining - written == 0) {
 338                     current = null;
 339                     writeDemand.decrementAndGet();
 340                     Runnable requestMore = this::requestMore;
 341                     if (inSelectorThread) {
 342                         assert client.isSelectorThread();
 343                         client.theExecutor().execute(requestMore);
 344                     } else {
 345                         assert !client.isSelectorThread();
 346                         requestMore.run();
 347                     }
 348                 } else {
 349                     resumeWriteEvent(inSelectorThread);
 350                 }
 351             } catch (Throwable t) {
 352                 signalError(t);
 353                 subscription.cancel();
 354             }
 355         }
 356 
 357         void requestMore() {
 358             try {
 359                 if (completed) return;
 360                 long d =  writeDemand.get();
 361                 if (writeDemand.compareAndSet(0,1)) {
 362                     debug.log(Level.DEBUG, "write: requesting more...");
 363                     subscription.request(1);
 364                 } else {
 365                     debug.log(Level.DEBUG, "write: no need to request more: %d", d);
 366                 }
 367             } catch (Throwable t) {
 368                 debug.log(Level.DEBUG, () ->
 369                         "write: error while requesting more: " + t);
 370                 signalError(t);
 371                 subscription.cancel();
 372             } finally {
 373                 debugState("leaving requestMore: ");
 374             }
 375         }
 376 
        /**
         * Handles an error reported by the write publisher by routing it
         * through {@code signalError}.
         *
         * @param throwable the error reported by the write publisher
         */
        @Override
        public void onError(Throwable throwable) {
            signalError(throwable);
        }
 381 
 382         @Override
 383         public void onComplete() {
 384             completed = true;
 385             // no need to pause the write event here: the write event will
 386             // be paused if there is nothing more to write.
 387             List<ByteBuffer> bufs = current;
 388             long remaining = bufs == null ? 0 : Utils.remaining(bufs);
 389             debug.log(Level.DEBUG,  "write completed, %d yet to send", remaining);
 390             debugState("InternalWriteSubscriber::onComplete");
 391         }
 392 
        // Resumes the write event so that the remaining data gets written
        // when the event is next fired. Errors raised while resuming are
        // handed to signalError.
        // NOTE(review): the inSelectorThread parameter is not used here.
        void resumeWriteEvent(boolean inSelectorThread) {
            debug.log(Level.DEBUG, "scheduling write event");
            resumeEvent(writeEvent, this::signalError);
        }
 397 
 398 //        void pauseWriteEvent() {
 399 //            debug.log(Level.DEBUG, "pausing write event");
 400 //            pauseEvent(writeEvent, this::signalError);
 401 //        }
 402 
        // Called by WriteEvent.signalEvent when the write event fires:
        // tries to write what remains of the current buffer list.
        // tryFlushCurrent is told that it runs in the selector thread.
        void signalWritable() {
            debug.log(Level.DEBUG, "channel is writable");
            tryFlushCurrent(true);
        }
 407 
        // Reports a write-side error: marks this subscriber as completed and
        // passes the error on to the read publisher.
        void signalError(Throwable error) {
            debug.log(Level.DEBUG, () -> "write error: " + error);
            completed = true;
            readPublisher.signalError(error);
        }
 413 
        // A repeatable WriteEvent which is paused after firing and can
        // be resumed if required - see SocketFlowEvent;
        // It is registered for OP_WRITE and forwards its signals to the
        // InternalWriteSubscriber it was created with.
        final class WriteEvent extends SocketFlowEvent {
            // the write subscriber to notify
            final InternalWriteSubscriber sub;
            WriteEvent(SocketChannel channel, InternalWriteSubscriber sub) {
                super(SelectionKey.OP_WRITE, channel);
                this.sub = sub;
            }
            // Invoked from SocketFlowEvent.handle(), after the event has
            // been paused. Any Throwable is routed to the subscriber's
            // signalError.
            @Override
            protected final void signalEvent() {
                try {
                    // presumably tells the client that this event's interest
                    // ops have changed (paused) - TODO confirm
                    client.eventUpdated(this);
                    sub.signalWritable();
                } catch(Throwable t) {
                    sub.signalError(t);
                }
            }

            // Invoked from SocketFlowEvent.abort(), after the event has
            // been paused.
            @Override
            protected void signalError(Throwable error) {
                sub.signalError(error);
            }

            // Uses the enclosing tube's debug logger.
            @Override
            System.Logger debug() {
                return debug;
            }

        }
 443 
 444     }
 445 
 446     // ===================================================================== //
 447     //                              Reading                                  //
 448     // ===================================================================== //
 449 
 450     // The InternalReadPublisher uses a SequentialScheduler to ensure that
 451     // onNext/onError/onComplete are called sequentially on the caller's
 452     // subscriber.
    // However, it relies on the fact that the only time
    // runOrSchedule() is called from a user/executor thread is in signalError,
    // right after the errorRef has been set.
    // Because the sequential scheduler's task always checks for errors first,
    // and always terminates the scheduler on error, it is safe to assume
    // that if it reaches the point where it reads from the channel, then
    // it is running in the SelectorManager thread. This is because all
    // other invocations of runOrSchedule() are triggered from within a
    // ReadEvent.
    //
    // When pausing/resuming the event, some shortcuts can then be taken
    // when we know we're running in the selector manager thread
    // (in that case there's no need to call client.eventUpdated(readEvent)).
    //
 467     private final class InternalReadPublisher
 468             implements Flow.Publisher<List<ByteBuffer>> {
        // The single internal subscription that drives reading from the
        // channel; created with the publisher.
        private final InternalReadSubscription subscriptionImpl
                = new InternalReadSubscription();
        // The subscription most recently created by subscribe(); a later
        // call to subscribe() replaces it and drops the one it displaces.
        // Presumably picked up by handlePending() (not visible here).
        AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>();
        // The subscription currently being served by the read loop.
        private volatile ReadSubscription subscription;
 473 
 474         @Override
 475         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
 476             Objects.requireNonNull(s);
 477 
 478             TubeSubscriber sub = FlowTube.asTubeSubscriber(s);
 479             ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
 480             ReadSubscription previous = pendingSubscription.getAndSet(target);
 481 
 482             if (previous != null && previous != target) {
 483                 debug.log(Level.DEBUG,
 484                         () -> "read publisher: dropping pending subscriber: "
 485                         + previous.subscriber);
 486                 previous.errorRef.compareAndSet(null, errorRef.get());
 487                 previous.signalOnSubscribe();
 488                 if (subscriptionImpl.completed) {
 489                     previous.signalCompletion();
 490                 } else {
 491                     previous.subscriber.dropSubscription();
 492                 }
 493             }
 494 
 495             debug.log(Level.DEBUG, "read publisher got subscriber");
 496             subscriptionImpl.signalSubscribe();
 497             debugState("leaving read.subscribe: ");
 498         }
 499 
 500         void signalError(Throwable error) {
 501             if (!errorRef.compareAndSet(null, error)) {
 502                 return;
 503             }
 504             subscriptionImpl.handleError();
 505         }
 506 
 507         final class ReadSubscription implements Flow.Subscription {
 508             final InternalReadSubscription impl;
 509             final TubeSubscriber  subscriber;
 510             final AtomicReference<Throwable> errorRef = new AtomicReference<>();
 511             volatile boolean subscribed;
 512             volatile boolean cancelled;
 513             volatile boolean completed;
 514 
 515             public ReadSubscription(InternalReadSubscription impl,
 516                                     TubeSubscriber subscriber) {
 517                 this.impl = impl;
 518                 this.subscriber = subscriber;
 519             }
 520 
 521             @Override
 522             public void cancel() {
 523                 cancelled = true;
 524             }
 525 
 526             @Override
 527             public void request(long n) {
 528                 if (!cancelled) {
 529                     impl.request(n);
 530                 } else {
 531                     debug.log(Level.DEBUG,
 532                               "subscription cancelled, ignoring request %d", n);
 533                 }
 534             }
 535 
 536             void signalCompletion() {
 537                 assert subscribed || cancelled;
 538                 if (completed || cancelled) return;
 539                 synchronized (this) {
 540                     if (completed) return;
 541                     completed = true;
 542                 }
 543                 Throwable error = errorRef.get();
 544                 if (error != null) {
 545                     debug.log(Level.DEBUG, () ->
 546                         "forwarding error to subscriber: "
 547                         + error);
 548                     subscriber.onError(error);
 549                 } else {
 550                     debug.log(Level.DEBUG, "completing subscriber");
 551                     subscriber.onComplete();
 552                 }
 553             }
 554 
 555             void signalOnSubscribe() {
 556                 if (subscribed || cancelled) return;
 557                 synchronized (this) {
 558                     if (subscribed || cancelled) return;
 559                     subscribed = true;
 560                 }
 561                 subscriber.onSubscribe(this);
 562                 debug.log(Level.DEBUG, "onSubscribe called");
 563                 if (errorRef.get() != null) {
 564                     signalCompletion();
 565                 }
 566             }
 567         }
 568 
 569         final class InternalReadSubscription implements Flow.Subscription {
 570 
            // Outstanding read demand, increased by request(n) and
            // decremented by the read loop.
            private final Demand demand = new Demand();
            // Runs the read() task.
            final SequentialScheduler readScheduler;
            // Set once reading has terminated.
            private volatile boolean completed;
            // The event resumed when there is demand and paused on
            // cancel or error.
            private final ReadEvent readEvent;
            // The event registered with the client by signalSubscribe();
            // it triggers handleSubscribeEvent().
            private final AsyncEvent subscribeEvent;
 576 
            // Wires up the scheduler and the two events used by this
            // subscription.
            InternalReadSubscription() {
                // the scheduler runs read(), wrapped in a SocketFlowTask
                readScheduler = new SequentialScheduler(new SocketFlowTask(this::read));
                // created with signalError and handleSubscribeEvent as its
                // callbacks
                subscribeEvent = new AsyncTriggerEvent(this::signalError,
                                                       this::handleSubscribeEvent);
                readEvent = new ReadEvent(channel, this);
            }
 583 
 584             /*
 585              * This method must be invoked before any other method of this class.
 586              */
 587             final void signalSubscribe() {
 588                 if (readScheduler.isStopped() || completed) {
 589                     // if already completed or stopped we can handle any
 590                     // pending connection directly from here.
 591                     debug.log(Level.DEBUG,
 592                               "handling pending subscription while completed");
 593                     handlePending();
 594                 } else {
 595                     try {
 596                         debug.log(Level.DEBUG,
 597                                   "registering subscribe event");
 598                         client.registerEvent(subscribeEvent);
 599                     } catch (Throwable t) {
 600                         signalError(t);
 601                         handlePending();
 602                     }
 603                 }
 604             }
 605 
 606             final void handleSubscribeEvent() {
 607                 assert client.isSelectorThread();
 608                 debug.log(Level.DEBUG, "subscribe event raised");
 609                 readScheduler.runOrSchedule();
 610                 if (readScheduler.isStopped() || completed) {
 611                     // if already completed or stopped we can handle any
 612                     // pending connection directly from here.
 613                     debug.log(Level.DEBUG,
 614                               "handling pending subscription when completed");
 615                     handlePending();
 616                 }
 617             }
 618 
 619 
 620             /*
 621              * Although this method is thread-safe, the Reactive-Streams spec seems
 622              * to not require it to be as such. It's a responsibility of the
 623              * subscriber to signal demand in a thread-safe manner.
 624              *
 625              * https://github.com/reactive-streams/reactive-streams-jvm/blob/dd24d2ab164d7de6c316f6d15546f957bec29eaa/README.md
 626              * (rules 2.7 and 3.4)
 627              */
 628             @Override
 629             public final void request(long n) {
 630                 if (n > 0L) {
 631                     boolean wasFulfilled = demand.increase(n);
 632                     if (wasFulfilled) {
 633                         debug.log(Level.DEBUG, "got some demand for reading");
 634                         resumeReadEvent();
 635                         // if demand has been changed from fulfilled
 636                         // to unfulfilled register read event;
 637                     }
 638                 } else {
 639                     signalError(new IllegalArgumentException("non-positive request"));
 640                 }
 641                 debugState("leaving request("+n+"): ");
 642             }
 643 
            /**
             * Cancels reading: pauses the read event, then stops the read
             * scheduler.
             */
            @Override
            public final void cancel() {
                pauseReadEvent();
                readScheduler.stop();
            }
 649 
            // Resumes the read event; errors raised while doing so are
            // handed to signalError.
            private void resumeReadEvent() {
                debug.log(Level.DEBUG, "resuming read event");
                resumeEvent(readEvent, this::signalError);
            }
 654 
            // Pauses the read event; errors raised while doing so are
            // handed to signalError.
            private void pauseReadEvent() {
                debug.log(Level.DEBUG, "pausing read event");
                pauseEvent(readEvent, this::signalError);
            }
 659 
 660 
            // Called by the publisher's signalError once the tube's errorRef
            // has been set: kicks the read scheduler so that the read task
            // picks the error up.
            final void handleError() {
                assert errorRef.get() != null;
                readScheduler.runOrSchedule();
            }
 665 
 666             final void signalError(Throwable error) {
 667                 if (!errorRef.compareAndSet(null, error)) {
 668                     return;
 669                 }
 670                 debug.log(Level.DEBUG, () -> "got read error: " + error);
 671                 readScheduler.runOrSchedule();
 672             }
 673 
            // Kicks the read scheduler; presumably called by the ReadEvent
            // when the channel becomes readable (ReadEvent is not visible
            // in this part of the file).
            final void signalReadable() {
                readScheduler.runOrSchedule();
            }
 677 
 678             /**
                  * The body of the task that runs in SequentialScheduler.
                  *
                  * <p>Each pass of the loop does, in order: return if reading has
                  * already completed; install a pending subscriber and return if
                  * there was one; deliver a recorded error, stop the scheduler and
                  * return; otherwise, if one unit of demand can be taken, perform a
                  * single readAvailable() call and either complete on EOF, hand the
                  * bytes to the subscriber, or give the demand back when nothing was
                  * read. Every outcome leaves the loop except a failure caught
                  * around the read/onNext step, which is recorded and followed by
                  * another pass.
                  */
 679             final void read() {
 680                 // It is important to only call pauseReadEvent() when stopping
 681                 // the scheduler. The event is automatically paused before
 682                 // firing, and trying to pause it again could cause a race
 683                 // condition between this loop, which calls tryDecrementDemand(),
 684                 // and the thread that calls request(n), which will try to resume
 685                 // reading.
 686                 try {
 687                     while(!readScheduler.isStopped()) {
 688                         if (completed) return;
 689 
 690                         // make sure we have a subscriber
 691                         if (handlePending()) {
 692                             debug.log(Level.DEBUG, "pending subscriber subscribed");
 693                             return;
 694                         }
 695 
 696                         // If an error was signaled, we might not be in the
 697                         // selector thread, and that is OK, because we
 698                         // will just call onError and return.
 699                         ReadSubscription current = subscription;
 700                         TubeSubscriber subscriber = current.subscriber;
 701                         Throwable error = errorRef.get();
 702                         if (error != null) {
 703                             completed = true;
 704                             // safe to pause here because we're finished anyway.
 705                             pauseReadEvent();
 706                             debug.log(Level.DEBUG, () -> "Sending error " + error
 707                                   + " to subscriber " + subscriber);
                                 // hand the error to the subscription (unless it
                                 // already holds one) before signalling completion.
 708                             current.errorRef.compareAndSet(null, error);
 709                             current.signalCompletion();
 710                             readScheduler.stop();
 711                             debugState("leaving read() loop with error: ");
 712                             return;
 713                         }
 714 
 715                         // If we reach here then we must be in the selector thread.
 716                         assert client.isSelectorThread();
 717                         if (demand.tryDecrement()) {
 718                             // we have demand.
 719                             try {
 720                                 List<ByteBuffer> bytes = readAvailable();
                                     // EOF is a sentinel list: compared by reference.
 721                                 if (bytes == EOF) {
 722                                     if (!completed) {
 723                                         debug.log(Level.DEBUG, "got read EOF");
 724                                         completed = true;
 725                                         // safe to pause here because we're finished
 726                                         // anyway.
 727                                         pauseReadEvent();
 728                                         current.signalCompletion();
 729                                         readScheduler.stop();
 730                                     }
 731                                     debugState("leaving read() loop after EOF: ");
 732                                     return;
 733                                 } else if (Utils.remaining(bytes) > 0) {
 734                                     // the subscriber is responsible for offloading
 735                                     // to another thread if needed.
 736                                     debug.log(Level.DEBUG, () -> "read bytes: "
 737                                             + Utils.remaining(bytes));
 738                                     assert !current.completed;
 739                                     subscriber.onNext(bytes);
 740                                     // we could continue looping until the demand
 741                                     // reaches 0. However, that would risk starving
 742                                     // other connections (bound to other socket
 743                                     // channels) - as other selected keys activated
 744                                     // by the selector manager thread might be
 745                                     // waiting for this event to terminate.
 746                                     // So resume the read event and return now...
 747                                     resumeReadEvent();
 748                                     debugState("leaving read() loop after onNext: ");
 749                                     return;
 750                                 } else {
 751                                     // nothing available!
 752                                     debug.log(Level.DEBUG, "no more bytes available");
 753                                     // re-increment the demand and resume the read
 754                                     // event. This ensures that this loop is
 755                                     // executed again when the socket becomes
 756                                     // readable again.
 757                                     demand.increase(1);
 758                                     resumeReadEvent();
 759                                     debugState("leaving read() loop with no bytes");
 760                                     return;
 761                                 }
 762                             } catch (Throwable x) {
                                     // record the failure and loop again: if the
                                     // scheduler is still running, the next pass
                                     // reports it through the error branch above.
 763                                 signalError(x);
 764                                 continue;
 765                             }
 766                         } else {
 767                             debug.log(Level.DEBUG, "no more demand for reading");
 768                             // the event is paused just after firing, so it should
 769                             // still be paused here, unless the demand was just
 770                             // incremented from 0 to n, in which case, the
 771                             // event will be resumed, causing this loop to be
 772                             // invoked again when the socket becomes readable:
 773                             // This is what we want.
 774                             // Trying to pause the event here would actually
 775                             // introduce a race condition between this loop and
 776                             // request(n).
 777                             debugState("leaving read() loop with no demand");
 778                             break;
 779                         }
 780                     }
 781                 } catch (Throwable t) {
 782                     debug.log(Level.DEBUG, "Unexpected exception in read loop", t);
 783                     signalError(t);
 784                 } finally {
                         // always check once more for a pending subscription
                         // before leaving, whichever way the loop was exited.
 785                     handlePending();
 786                 }
 787             }
 788 
 789             boolean handlePending() {
 790                 ReadSubscription pending = pendingSubscription.getAndSet(null);
 791                 if (pending == null) return false;
 792                 debug.log(Level.DEBUG, "handling pending subscription for %s",
 793                           pending.subscriber);
 794                 ReadSubscription current = subscription;
 795                 if (current != null && current != pending && !completed) {
 796                     current.subscriber.dropSubscription();
 797                 }
 798                 debug.log(Level.DEBUG, "read demand reset to 0");
 799                 subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to.
 800                 pending.errorRef.compareAndSet(null, errorRef.get());
 801                 if (!readScheduler.isStopped()) {
 802                     subscription = pending;
 803                 } else {
 804                     debug.log(Level.DEBUG, "socket tube is already stopped");
 805                 }
 806                 debug.log(Level.DEBUG, "calling onSubscribe");
 807                 pending.signalOnSubscribe();
 808                 if (completed) {
 809                     pending.errorRef.compareAndSet(null, errorRef.get());
 810                     pending.signalCompletion();
 811                 }
 812                 return true;
 813             }
 814         }
 815 
 816 
 817         // A repeatable ReadEvent which is paused after firing and can
 818         // be resumed if required - see SocketFlowEvent;
 819         final class ReadEvent extends SocketFlowEvent {
 820             final InternalReadSubscription sub;
 821             ReadEvent(SocketChannel channel, InternalReadSubscription sub) {
 822                 super(SelectionKey.OP_READ, channel);
 823                 this.sub = sub;
 824             }
 825             @Override
 826             protected final void signalEvent() {
 827                 try {
 828                     client.eventUpdated(this);
 829                     sub.signalReadable();
 830                 } catch(Throwable t) {
 831                     sub.signalError(t);
 832                 }
 833             }
 834 
 835             @Override
 836             protected final void signalError(Throwable error) {
 837                 sub.signalError(error);
 838             }
 839 
 840             @Override
 841             System.Logger debug() {
 842                 return debug;
 843             }
 844         }
 845 
 846     }
 847 
 848     // ===================================================================== //
 849     //                   Socket Channel Read/Write                           //
 850     // ===================================================================== //
         // Upper bound on the number of buffers that a single call to
         // readAvailable() collects into the list it returns.
 851     static final int MAX_BUFFERS = 3;
         // Sentinel list returned by readAvailable() when the channel reports
         // end-of-stream before any byte was read. The read loop recognizes it
         // by reference (bytes == EOF), not by emptiness.
 852     static final List<ByteBuffer> EOF = List.of();
 853 
 854     private List<ByteBuffer> readAvailable() throws IOException {
 855         ByteBuffer buf = buffersSource.get();
 856         assert buf.hasRemaining();
 857 
 858         int read;
 859         int pos = buf.position();
 860         List<ByteBuffer> list = null;
 861         while (buf.hasRemaining()) {
 862             while ((read = channel.read(buf)) > 0) {
 863                if (!buf.hasRemaining()) break;
 864             }
 865 
 866             // nothing read;
 867             if (buf.position() == pos) {
 868                 // An empty list signal the end of data, and should only be
 869                 // returned if read == -1.
 870                 // If we already read some data, then we must return what we have
 871                 // read, and -1 will be returned next time the caller attempts to
 872                 // read something.
 873                 if (list == null && read == -1) {  // eof
 874                     list = EOF;
 875                     break;
 876                 }
 877             }
 878             buf.limit(buf.position());
 879             buf.position(pos);
 880             if (list == null) {
 881                 list = List.of(buf);
 882             } else {
 883                 if (!(list instanceof ArrayList)) {
 884                     list = new ArrayList<>(list);
 885                 }
 886                 list.add(buf);
 887             }
 888             if (read <= 0 || list.size() == MAX_BUFFERS) break;
 889             buf = buffersSource.get();
 890             pos = buf.position();
 891             assert buf.hasRemaining();
 892         }
 893         return list;
 894     }
 895 
 896     private long writeAvailable(List<ByteBuffer> bytes) throws IOException {
 897         ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY);
 898         final long remaining = Utils.remaining(srcs);
 899         long written = 0;
 900         while (remaining > written) {
 901             long w = channel.write(srcs);
 902             if (w == -1 && written == 0) return -1;
 903             if (w == 0) break;
 904             written += w;
 905         }
 906         return written;
 907     }
 908 
 909     private void resumeEvent(SocketFlowEvent event,
 910                              Consumer<Throwable> errorSignaler) {
 911         boolean registrationRequired;
 912         synchronized(lock) {
 913             registrationRequired = !event.registered();
 914             event.resume();
 915         }
 916         try {
 917             if (registrationRequired) {
 918                 client.registerEvent(event);
 919              } else {
 920                 client.eventUpdated(event);
 921             }
 922         } catch(Throwable t) {
 923             errorSignaler.accept(t);
 924         }
 925    }
 926 
 927     private void pauseEvent(SocketFlowEvent event,
 928                             Consumer<Throwable> errorSignaler) {
 929         synchronized(lock) {
 930             event.pause();
 931         }
 932         try {
 933             client.eventUpdated(event);
 934         } catch(Throwable t) {
 935             errorSignaler.accept(t);
 936         }
 937     }
 938 
 939     @Override
 940     public void connectFlows(TubePublisher writePublisher,
 941                              TubeSubscriber readSubscriber) {
 942         debug.log(Level.DEBUG, "connecting flows");
 943         this.subscribe(readSubscriber);
 944         writePublisher.subscribe(this);
 945     }
 946 
 947 
 948     @Override
 949     public String toString() {
 950         return dbgString();
 951     }
 952 
 953     final String dbgString() {
 954         return "SocketTube("+id+")";
 955     }
 956 }