1 /* 2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.EOFException; 29 import java.io.IOException; 30 import java.lang.System.Logger.Level; 31 import java.nio.ByteBuffer; 32 import java.util.List; 33 import java.util.Objects; 34 import java.util.concurrent.Flow; 35 import java.util.concurrent.atomic.AtomicLong; 36 import java.util.concurrent.atomic.AtomicReference; 37 import java.nio.channels.SelectableChannel; 38 import java.nio.channels.SelectionKey; 39 import java.nio.channels.SocketChannel; 40 import java.util.ArrayList; 41 import java.util.function.Consumer; 42 import java.util.function.Supplier; 43 44 import jdk.incubator.http.internal.common.Demand; 45 import jdk.incubator.http.internal.common.FlowTube; 46 import jdk.incubator.http.internal.common.SequentialScheduler; 47 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter; 48 import jdk.incubator.http.internal.common.SequentialScheduler.RestartableTask; 49 import jdk.incubator.http.internal.common.Utils; 50 51 /** 52 * A SocketTube is a terminal tube plugged directly into the socket. 53 * The read subscriber should call {@code subscribe} on the SocketTube before 54 * the SocketTube can be subscribed to the write publisher. 55 */ 56 final class SocketTube implements FlowTube { 57 58 static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag 59 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); 60 static final AtomicLong IDS = new AtomicLong(); 61 62 private final HttpClientImpl client; 63 private final SocketChannel channel; 64 private final Supplier<ByteBuffer> buffersSource; 65 private final Object lock = new Object(); 66 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); 67 private final InternalReadPublisher readPublisher; 68 private final InternalWriteSubscriber writeSubscriber; 69 private final long id = IDS.incrementAndGet(); 70 71 public SocketTube(HttpClientImpl client, SocketChannel channel, 72 Supplier<ByteBuffer> buffersSource) { 73 this.client = client; 74 this.channel = channel; 75 this.buffersSource = buffersSource; 76 this.readPublisher = new InternalReadPublisher(); 77 this.writeSubscriber = new InternalWriteSubscriber(); 78 } 79 80 // private static Flow.Subscription nopSubscription() { 81 // return new Flow.Subscription() { 82 // @Override public void request(long n) { } 83 // @Override public void cancel() { } 84 // }; 85 // } 86 87 /** 88 * Returns {@code true} if this flow is finished. 89 * This happens when this flow internal read subscription is completed, 90 * either normally (EOF reading) or exceptionally (EOF writing, or 91 * underlying socket closed, or some exception occurred while reading or 92 * writing to the socket). 93 * 94 * @return {@code true} if this flow is finished. 95 */ 96 public boolean isFinished() { 97 InternalReadPublisher.InternalReadSubscription subscription = 98 readPublisher.subscriptionImpl; 99 return subscription != null && subscription.completed 100 || subscription == null && errorRef.get() != null; 101 } 102 103 // ===================================================================== // 104 // Flow.Publisher // 105 // ======================================================================// 106 107 /** 108 * {@inheritDoc } 109 * @apiNote This method should be called first. In particular, the caller 110 * must ensure that this method must be called by the read 111 * subscriber before the write publisher can call {@code onSubscribe}. 112 * Failure to adhere to this contract may result in assertion errors. 113 */ 114 @Override 115 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) { 116 Objects.requireNonNull(s); 117 assert s instanceof TubeSubscriber : "Expected TubeSubscriber, got:" + s; 118 readPublisher.subscribe(s); 119 } 120 121 122 // ===================================================================== // 123 // Flow.Subscriber // 124 // ======================================================================// 125 126 /** 127 * {@inheritDoc } 128 * @apiNote The caller must ensure that {@code subscribe} is called by 129 * the read subscriber before {@code onSubscribe} is called by 130 * the write publisher. 131 * Failure to adhere to this contract may result in assertion errors. 132 */ 133 @Override 134 public void onSubscribe(Flow.Subscription subscription) { 135 writeSubscriber.onSubscribe(subscription); 136 } 137 138 @Override 139 public void onNext(List<ByteBuffer> item) { 140 writeSubscriber.onNext(item); 141 } 142 143 @Override 144 public void onError(Throwable throwable) { 145 writeSubscriber.onError(throwable); 146 } 147 148 @Override 149 public void onComplete() { 150 writeSubscriber.onComplete(); 151 } 152 153 // ===================================================================== // 154 // Events // 155 // ======================================================================// 156 157 /** 158 * A restartable task used to process tasks in sequence. 159 */ 160 private static class SocketFlowTask implements RestartableTask { 161 final Runnable task; 162 private final Object monitor = new Object(); 163 SocketFlowTask(Runnable task) { 164 this.task = task; 165 } 166 @Override 167 public final void run(DeferredCompleter taskCompleter) { 168 try { 169 // non contentious synchronized for visibility. 170 synchronized(monitor) { 171 task.run(); 172 } 173 } finally { 174 taskCompleter.complete(); 175 } 176 } 177 } 178 179 // This is best effort - there's no guarantee that the printed set 180 // of values is consistent. It should only be considered as 181 // weakly accurate - in particular in what concerns the events states, 182 // especially when displaying a read event state from a write event 183 // callback and conversely. 184 void debugState(String when) { 185 if (debug.isLoggable(Level.DEBUG)) { 186 StringBuilder state = new StringBuilder(); 187 188 InternalReadPublisher.InternalReadSubscription sub = 189 readPublisher.subscriptionImpl; 190 InternalReadPublisher.ReadEvent readEvent = 191 sub == null ? null : sub.readEvent; 192 Demand rdemand = sub == null ? null : sub.demand; 193 InternalWriteSubscriber.WriteEvent writeEvent = 194 writeSubscriber.writeEvent; 195 AtomicLong wdemand = writeSubscriber.writeDemand; 196 int rops = readEvent == null ? 0 : readEvent.interestOps(); 197 long rd = rdemand == null ? 0 : rdemand.get(); 198 int wops = writeEvent == null ? 0 : writeEvent.interestOps(); 199 long wd = wdemand == null ? 0 : wdemand.get(); 200 201 state.append(when).append(" Reading: [ops=") 202 .append(rops).append(", demand=").append(rd) 203 .append(", stopped=") 204 .append((sub == null ? false : sub.readScheduler.isStopped())) 205 .append("], Writing: [ops=").append(wops) 206 .append(", demand=").append(wd) 207 .append("]"); 208 debug.log(Level.DEBUG, state.toString()); 209 } 210 } 211 212 /** 213 * A repeatable event that can be paused or resumed by changing 214 * its interestOps. 215 * When the event is fired, it is first paused before being signaled. 216 * It is the responsibility of the code triggered by {@code signalEvent} 217 * to resume the event if required. 218 */ 219 private static abstract class SocketFlowEvent extends AsyncEvent { 220 final SocketChannel channel; 221 final int defaultInterest; 222 volatile int interestOps; 223 volatile boolean registered; 224 SocketFlowEvent(int defaultInterest, SocketChannel channel) { 225 super(AsyncEvent.REPEATING); 226 this.defaultInterest = defaultInterest; 227 this.channel = channel; 228 } 229 final boolean registered() {return registered;} 230 final void resume() { 231 interestOps = defaultInterest; 232 registered = true; 233 } 234 final void pause() {interestOps = 0;} 235 @Override 236 public final SelectableChannel channel() {return channel;} 237 @Override 238 public final int interestOps() {return interestOps;} 239 240 @Override 241 public final void handle() { 242 pause(); // pause, then signal 243 signalEvent(); // won't be fired again until resumed. 244 } 245 @Override 246 public final void abort(IOException error) { 247 debug().log(Level.DEBUG, () -> "abort: " + error); 248 pause(); // pause, then signal 249 signalError(error); // should not be resumed after abort (not checked) 250 } 251 252 protected abstract void signalEvent(); 253 protected abstract void signalError(Throwable error); 254 abstract System.Logger debug(); 255 } 256 257 // ===================================================================== // 258 // Writing // 259 // ======================================================================// 260 261 // This class makes the assumption that the publisher will call 262 // onNext sequentially, and that onNext won't be called if the demand 263 // has not been incremented by request(1). 264 // It has a 'queue of 1' meaning that it will call request(1) in 265 // onSubscribe, and then only after its 'current' buffer list has been 266 // fully written and current set to null; 267 private final class InternalWriteSubscriber 268 implements Flow.Subscriber<List<ByteBuffer>> { 269 270 volatile Flow.Subscription subscription; 271 volatile List<ByteBuffer> current; 272 volatile boolean completed; 273 final WriteEvent writeEvent = new WriteEvent(channel, this); 274 final AtomicLong writeDemand = new AtomicLong(); 275 276 @Override 277 public void onSubscribe(Flow.Subscription subscription) { 278 Flow.Subscription previous = this.subscription; 279 this.subscription = subscription; 280 debug.log(Level.DEBUG, "subscribed for writing"); 281 if (current == null) { 282 if (previous == subscription || previous == null) { 283 if (writeDemand.compareAndSet(0, 1)) { 284 subscription.request(1); 285 } 286 } else { 287 writeDemand.set(1); 288 subscription.request(1); 289 } 290 } 291 } 292 293 @Override 294 public void onNext(List<ByteBuffer> bufs) { 295 assert current == null; // this is a queue of 1. 296 assert subscription != null; 297 current = bufs; 298 tryFlushCurrent(client.isSelectorThread()); // may be in selector thread 299 // For instance in HTTP/2, a received SETTINGS frame might trigger 300 // the sending of a SETTINGS frame in turn which might cause 301 // onNext to be called from within the same selector thread that the 302 // original SETTINGS frames arrived on. If rs is the read-subscriber 303 // and ws is the write-subscriber then the following can occur: 304 // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write 305 // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent 306 debugState("leaving w.onNext"); 307 } 308 309 // we don't use a SequentialScheduler here: we rely on 310 // onNext() being called sequentially, and not being called 311 // if we haven't call request(1) 312 // onNext is usually called from within a user/executor thread. 313 // we will perform the initial writing in that thread. 314 // if for some reason, not all data can be written, the writeEvent 315 // will be resumed, and the rest of the data will be written from 316 // the selector manager thread when the writeEvent is fired. 317 // If we are in the selector manager thread, then we will use the executor 318 // to call request(1), ensuring that onNext() won't be called from 319 // within the selector thread. 320 // If we are not in the selector manager thread, then we don't care. 321 void tryFlushCurrent(boolean inSelectorThread) { 322 List<ByteBuffer> bufs = current; 323 if (bufs == null) return; 324 try { 325 assert inSelectorThread == client.isSelectorThread() : 326 "should " + (inSelectorThread ? "" : "not ") 327 + " be in the selector thread"; 328 long remaining = Utils.remaining(bufs); 329 debug.log(Level.DEBUG, "trying to write: %d", remaining); 330 long written = writeAvailable(bufs); 331 debug.log(Level.DEBUG, "wrote: %d", remaining); 332 if (written == -1) { 333 signalError(new EOFException("EOF reached while writing")); 334 return; 335 } 336 assert written <= remaining; 337 if (remaining - written == 0) { 338 current = null; 339 writeDemand.decrementAndGet(); 340 Runnable requestMore = this::requestMore; 341 if (inSelectorThread) { 342 assert client.isSelectorThread(); 343 client.theExecutor().execute(requestMore); 344 } else { 345 assert !client.isSelectorThread(); 346 requestMore.run(); 347 } 348 } else { 349 resumeWriteEvent(inSelectorThread); 350 } 351 } catch (Throwable t) { 352 signalError(t); 353 subscription.cancel(); 354 } 355 } 356 357 void requestMore() { 358 try { 359 if (completed) return; 360 long d = writeDemand.get(); 361 if (writeDemand.compareAndSet(0,1)) { 362 debug.log(Level.DEBUG, "write: requesting more..."); 363 subscription.request(1); 364 } else { 365 debug.log(Level.DEBUG, "write: no need to request more: %d", d); 366 } 367 } catch (Throwable t) { 368 debug.log(Level.DEBUG, () -> 369 "write: error while requesting more: " + t); 370 signalError(t); 371 subscription.cancel(); 372 } finally { 373 debugState("leaving requestMore: "); 374 } 375 } 376 377 @Override 378 public void onError(Throwable throwable) { 379 signalError(throwable); 380 } 381 382 @Override 383 public void onComplete() { 384 completed = true; 385 // no need to pause the write event here: the write event will 386 // be paused if there is nothing more to write. 387 List<ByteBuffer> bufs = current; 388 long remaining = bufs == null ? 0 : Utils.remaining(bufs); 389 debug.log(Level.DEBUG, "write completed, %d yet to send", remaining); 390 debugState("InternalWriteSubscriber::onComplete"); 391 } 392 393 void resumeWriteEvent(boolean inSelectorThread) { 394 debug.log(Level.DEBUG, "scheduling write event"); 395 resumeEvent(writeEvent, this::signalError); 396 } 397 398 // void pauseWriteEvent() { 399 // debug.log(Level.DEBUG, "pausing write event"); 400 // pauseEvent(writeEvent, this::signalError); 401 // } 402 403 void signalWritable() { 404 debug.log(Level.DEBUG, "channel is writable"); 405 tryFlushCurrent(true); 406 } 407 408 void signalError(Throwable error) { 409 debug.log(Level.DEBUG, () -> "write error: " + error); 410 completed = true; 411 readPublisher.signalError(error); 412 } 413 414 // A repeatable WriteEvent which is paused after firing and can 415 // be resumed if required - see SocketFlowEvent; 416 final class WriteEvent extends SocketFlowEvent { 417 final InternalWriteSubscriber sub; 418 WriteEvent(SocketChannel channel, InternalWriteSubscriber sub) { 419 super(SelectionKey.OP_WRITE, channel); 420 this.sub = sub; 421 } 422 @Override 423 protected final void signalEvent() { 424 try { 425 client.eventUpdated(this); 426 sub.signalWritable(); 427 } catch(Throwable t) { 428 sub.signalError(t); 429 } 430 } 431 432 @Override 433 protected void signalError(Throwable error) { 434 sub.signalError(error); 435 } 436 437 @Override 438 System.Logger debug() { 439 return debug; 440 } 441 442 } 443 444 } 445 446 // ===================================================================== // 447 // Reading // 448 // ===================================================================== // 449 450 // The InternalReadPublisher uses a SequentialScheduler to ensure that 451 // onNext/onError/onComplete are called sequentially on the caller's 452 // subscriber. 453 // However, it relies on the fact that the only time where 454 // runOrSchedule() is called from a user/executor thread is in signalError, 455 // right after the errorRef has been set. 456 // Because the sequential scheduler's task always checks for errors first, 457 // and always terminate the scheduler on error, then it is safe to assume 458 // that if it reaches the point where it reads from the channel, then 459 // it is running in the SelectorManager thread. This is because all 460 // other invocation of runOrSchedule() are triggered from within a 461 // ReadEvent. 462 // 463 // When pausing/resuming the event, some shortcuts can then be taken 464 // when we know we're running in the selector manager thread 465 // (in that case there's no need to call client.eventUpdated(readEvent); 466 // 467 private final class InternalReadPublisher 468 implements Flow.Publisher<List<ByteBuffer>> { 469 private final InternalReadSubscription subscriptionImpl 470 = new InternalReadSubscription(); 471 AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>(); 472 private volatile ReadSubscription subscription; 473 474 @Override 475 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) { 476 Objects.requireNonNull(s); 477 478 TubeSubscriber sub = FlowTube.asTubeSubscriber(s); 479 ReadSubscription target = new ReadSubscription(subscriptionImpl, sub); 480 ReadSubscription previous = pendingSubscription.getAndSet(target); 481 482 if (previous != null && previous != target) { 483 debug.log(Level.DEBUG, 484 () -> "read publisher: dropping pending subscriber: " 485 + previous.subscriber); 486 previous.errorRef.compareAndSet(null, errorRef.get()); 487 previous.signalOnSubscribe(); 488 if (subscriptionImpl.completed) { 489 previous.signalCompletion(); 490 } else { 491 previous.subscriber.dropSubscription(); 492 } 493 } 494 495 debug.log(Level.DEBUG, "read publisher got subscriber"); 496 subscriptionImpl.signalSubscribe(); 497 debugState("leaving read.subscribe: "); 498 } 499 500 void signalError(Throwable error) { 501 if (!errorRef.compareAndSet(null, error)) { 502 return; 503 } 504 subscriptionImpl.handleError(); 505 } 506 507 final class ReadSubscription implements Flow.Subscription { 508 final InternalReadSubscription impl; 509 final TubeSubscriber subscriber; 510 final AtomicReference<Throwable> errorRef = new AtomicReference<>(); 511 volatile boolean subscribed; 512 volatile boolean cancelled; 513 volatile boolean completed; 514 515 public ReadSubscription(InternalReadSubscription impl, 516 TubeSubscriber subscriber) { 517 this.impl = impl; 518 this.subscriber = subscriber; 519 } 520 521 @Override 522 public void cancel() { 523 cancelled = true; 524 } 525 526 @Override 527 public void request(long n) { 528 if (!cancelled) { 529 impl.request(n); 530 } else { 531 debug.log(Level.DEBUG, 532 "subscription cancelled, ignoring request %d", n); 533 } 534 } 535 536 void signalCompletion() { 537 assert subscribed || cancelled; 538 if (completed || cancelled) return; 539 synchronized (this) { 540 if (completed) return; 541 completed = true; 542 } 543 Throwable error = errorRef.get(); 544 if (error != null) { 545 debug.log(Level.DEBUG, () -> 546 "forwarding error to subscriber: " 547 + error); 548 subscriber.onError(error); 549 } else { 550 debug.log(Level.DEBUG, "completing subscriber"); 551 subscriber.onComplete(); 552 } 553 } 554 555 void signalOnSubscribe() { 556 if (subscribed || cancelled) return; 557 synchronized (this) { 558 if (subscribed || cancelled) return; 559 subscribed = true; 560 } 561 subscriber.onSubscribe(this); 562 debug.log(Level.DEBUG, "onSubscribe called"); 563 if (errorRef.get() != null) { 564 signalCompletion(); 565 } 566 } 567 } 568 569 final class InternalReadSubscription implements Flow.Subscription { 570 571 private final Demand demand = new Demand(); 572 final SequentialScheduler readScheduler; 573 private volatile boolean completed; 574 private final ReadEvent readEvent; 575 private final AsyncEvent subscribeEvent; 576 577 InternalReadSubscription() { 578 readScheduler = new SequentialScheduler(new SocketFlowTask(this::read)); 579 subscribeEvent = new AsyncTriggerEvent(this::signalError, 580 this::handleSubscribeEvent); 581 readEvent = new ReadEvent(channel, this); 582 } 583 584 /* 585 * This method must be invoked before any other method of this class. 586 */ 587 final void signalSubscribe() { 588 if (readScheduler.isStopped() || completed) { 589 // if already completed or stopped we can handle any 590 // pending connection directly from here. 591 debug.log(Level.DEBUG, 592 "handling pending subscription while completed"); 593 handlePending(); 594 } else { 595 try { 596 debug.log(Level.DEBUG, 597 "registering subscribe event"); 598 client.registerEvent(subscribeEvent); 599 } catch (Throwable t) { 600 signalError(t); 601 handlePending(); 602 } 603 } 604 } 605 606 final void handleSubscribeEvent() { 607 assert client.isSelectorThread(); 608 debug.log(Level.DEBUG, "subscribe event raised"); 609 readScheduler.runOrSchedule(); 610 if (readScheduler.isStopped() || completed) { 611 // if already completed or stopped we can handle any 612 // pending connection directly from here. 613 debug.log(Level.DEBUG, 614 "handling pending subscription when completed"); 615 handlePending(); 616 } 617 } 618 619 620 /* 621 * Although this method is thread-safe, the Reactive-Streams spec seems 622 * to not require it to be as such. It's a responsibility of the 623 * subscriber to signal demand in a thread-safe manner. 624 * 625 * https://github.com/reactive-streams/reactive-streams-jvm/blob/dd24d2ab164d7de6c316f6d15546f957bec29eaa/README.md 626 * (rules 2.7 and 3.4) 627 */ 628 @Override 629 public final void request(long n) { 630 if (n > 0L) { 631 boolean wasFulfilled = demand.increase(n); 632 if (wasFulfilled) { 633 debug.log(Level.DEBUG, "got some demand for reading"); 634 resumeReadEvent(); 635 // if demand has been changed from fulfilled 636 // to unfulfilled register read event; 637 } 638 } else { 639 signalError(new IllegalArgumentException("non-positive request")); 640 } 641 debugState("leaving request("+n+"): "); 642 } 643 644 @Override 645 public final void cancel() { 646 pauseReadEvent(); 647 readScheduler.stop(); 648 } 649 650 private void resumeReadEvent() { 651 debug.log(Level.DEBUG, "resuming read event"); 652 resumeEvent(readEvent, this::signalError); 653 } 654 655 private void pauseReadEvent() { 656 debug.log(Level.DEBUG, "pausing read event"); 657 pauseEvent(readEvent, this::signalError); 658 } 659 660 661 final void handleError() { 662 assert errorRef.get() != null; 663 readScheduler.runOrSchedule(); 664 } 665 666 final void signalError(Throwable error) { 667 if (!errorRef.compareAndSet(null, error)) { 668 return; 669 } 670 debug.log(Level.DEBUG, () -> "got read error: " + error); 671 readScheduler.runOrSchedule(); 672 } 673 674 final void signalReadable() { 675 readScheduler.runOrSchedule(); 676 } 677 678 /** The body of the task that runs in SequentialScheduler. */ 679 final void read() { 680 // It is important to only call pauseReadEvent() when stopping 681 // the scheduler. The event is automatically paused before 682 // firing, and trying to pause it again could cause a race 683 // condition between this loop, which calls tryDecrementDemand(), 684 // and the thread that calls request(n), which will try to resume 685 // reading. 686 try { 687 while(!readScheduler.isStopped()) { 688 if (completed) return; 689 690 // make sure we have a subscriber 691 if (handlePending()) { 692 debug.log(Level.DEBUG, "pending subscriber subscribed"); 693 return; 694 } 695 696 // If an error was signaled, we might not be in the 697 // the selector thread, and that is OK, because we 698 // will just call onError and return. 699 ReadSubscription current = subscription; 700 TubeSubscriber subscriber = current.subscriber; 701 Throwable error = errorRef.get(); 702 if (error != null) { 703 completed = true; 704 // safe to pause here because we're finished anyway. 705 pauseReadEvent(); 706 debug.log(Level.DEBUG, () -> "Sending error " + error 707 + " to subscriber " + subscriber); 708 current.errorRef.compareAndSet(null, error); 709 current.signalCompletion(); 710 readScheduler.stop(); 711 debugState("leaving read() loop with error: "); 712 return; 713 } 714 715 // If we reach here then we must be in the selector thread. 716 assert client.isSelectorThread(); 717 if (demand.tryDecrement()) { 718 // we have demand. 719 try { 720 List<ByteBuffer> bytes = readAvailable(); 721 if (bytes == EOF) { 722 if (!completed) { 723 debug.log(Level.DEBUG, "got read EOF"); 724 completed = true; 725 // safe to pause here because we're finished 726 // anyway. 727 pauseReadEvent(); 728 current.signalCompletion(); 729 readScheduler.stop(); 730 } 731 debugState("leaving read() loop after EOF: "); 732 return; 733 } else if (Utils.remaining(bytes) > 0) { 734 // the subscriber is responsible for offloading 735 // to another thread if needed. 736 debug.log(Level.DEBUG, () -> "read bytes: " 737 + Utils.remaining(bytes)); 738 assert !current.completed; 739 subscriber.onNext(bytes); 740 // we could continue looping until the demand 741 // reaches 0. However, that would risk starving 742 // other connections (bound to other socket 743 // channels) - as other selected keys activated 744 // by the selector manager thread might be 745 // waiting for this event to terminate. 746 // So resume the read event and return now... 747 resumeReadEvent(); 748 debugState("leaving read() loop after onNext: "); 749 return; 750 } else { 751 // nothing available! 752 debug.log(Level.DEBUG, "no more bytes available"); 753 // re-increment the demand and resume the read 754 // event. This ensures that this loop is 755 // executed again when the socket becomes 756 // readable again. 757 demand.increase(1); 758 resumeReadEvent(); 759 debugState("leaving read() loop with no bytes"); 760 return; 761 } 762 } catch (Throwable x) { 763 signalError(x); 764 continue; 765 } 766 } else { 767 debug.log(Level.DEBUG, "no more demand for reading"); 768 // the event is paused just after firing, so it should 769 // still be paused here, unless the demand was just 770 // incremented from 0 to n, in which case, the 771 // event will be resumed, causing this loop to be 772 // invoked again when the socket becomes readable: 773 // This is what we want. 774 // Trying to pause the event here would actually 775 // introduce a race condition between this loop and 776 // request(n). 777 debugState("leaving read() loop with no demand"); 778 break; 779 } 780 } 781 } catch (Throwable t) { 782 debug.log(Level.DEBUG, "Unexpected exception in read loop", t); 783 signalError(t); 784 } finally { 785 handlePending(); 786 } 787 } 788 789 boolean handlePending() { 790 ReadSubscription pending = pendingSubscription.getAndSet(null); 791 if (pending == null) return false; 792 debug.log(Level.DEBUG, "handling pending subscription for %s", 793 pending.subscriber); 794 ReadSubscription current = subscription; 795 if (current != null && current != pending && !completed) { 796 current.subscriber.dropSubscription(); 797 } 798 debug.log(Level.DEBUG, "read demand reset to 0"); 799 subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to. 800 pending.errorRef.compareAndSet(null, errorRef.get()); 801 if (!readScheduler.isStopped()) { 802 subscription = pending; 803 } else { 804 debug.log(Level.DEBUG, "socket tube is already stopped"); 805 } 806 debug.log(Level.DEBUG, "calling onSubscribe"); 807 pending.signalOnSubscribe(); 808 if (completed) { 809 pending.errorRef.compareAndSet(null, errorRef.get()); 810 pending.signalCompletion(); 811 } 812 return true; 813 } 814 } 815 816 817 // A repeatable ReadEvent which is paused after firing and can 818 // be resumed if required - see SocketFlowEvent; 819 final class ReadEvent extends SocketFlowEvent { 820 final InternalReadSubscription sub; 821 ReadEvent(SocketChannel channel, InternalReadSubscription sub) { 822 super(SelectionKey.OP_READ, channel); 823 this.sub = sub; 824 } 825 @Override 826 protected final void signalEvent() { 827 try { 828 client.eventUpdated(this); 829 sub.signalReadable(); 830 } catch(Throwable t) { 831 sub.signalError(t); 832 } 833 } 834 835 @Override 836 protected final void signalError(Throwable error) { 837 sub.signalError(error); 838 } 839 840 @Override 841 System.Logger debug() { 842 return debug; 843 } 844 } 845 846 } 847 848 // ===================================================================== // 849 // Socket Channel Read/Write // 850 // ===================================================================== // 851 static final int MAX_BUFFERS = 3; 852 static final List<ByteBuffer> EOF = List.of(); 853 854 private List<ByteBuffer> readAvailable() throws IOException { 855 ByteBuffer buf = buffersSource.get(); 856 assert buf.hasRemaining(); 857 858 int read; 859 int pos = buf.position(); 860 List<ByteBuffer> list = null; 861 while (buf.hasRemaining()) { 862 while ((read = channel.read(buf)) > 0) { 863 if (!buf.hasRemaining()) break; 864 } 865 866 // nothing read; 867 if (buf.position() == pos) { 868 // An empty list signal the end of data, and should only be 869 // returned if read == -1. 870 // If we already read some data, then we must return what we have 871 // read, and -1 will be returned next time the caller attempts to 872 // read something. 873 if (list == null && read == -1) { // eof 874 list = EOF; 875 break; 876 } 877 } 878 buf.limit(buf.position()); 879 buf.position(pos); 880 if (list == null) { 881 list = List.of(buf); 882 } else { 883 if (!(list instanceof ArrayList)) { 884 list = new ArrayList<>(list); 885 } 886 list.add(buf); 887 } 888 if (read <= 0 || list.size() == MAX_BUFFERS) break; 889 buf = buffersSource.get(); 890 pos = buf.position(); 891 assert buf.hasRemaining(); 892 } 893 return list; 894 } 895 896 private long writeAvailable(List<ByteBuffer> bytes) throws IOException { 897 ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY); 898 final long remaining = Utils.remaining(srcs); 899 long written = 0; 900 while (remaining > written) { 901 long w = channel.write(srcs); 902 if (w == -1 && written == 0) return -1; 903 if (w == 0) break; 904 written += w; 905 } 906 return written; 907 } 908 909 private void resumeEvent(SocketFlowEvent event, 910 Consumer<Throwable> errorSignaler) { 911 boolean registrationRequired; 912 synchronized(lock) { 913 registrationRequired = !event.registered(); 914 event.resume(); 915 } 916 try { 917 if (registrationRequired) { 918 client.registerEvent(event); 919 } else { 920 client.eventUpdated(event); 921 } 922 } catch(Throwable t) { 923 errorSignaler.accept(t); 924 } 925 } 926 927 private void pauseEvent(SocketFlowEvent event, 928 Consumer<Throwable> errorSignaler) { 929 synchronized(lock) { 930 event.pause(); 931 } 932 try { 933 client.eventUpdated(event); 934 } catch(Throwable t) { 935 errorSignaler.accept(t); 936 } 937 } 938 939 @Override 940 public void connectFlows(TubePublisher writePublisher, 941 TubeSubscriber readSubscriber) { 942 debug.log(Level.DEBUG, "connecting flows"); 943 this.subscribe(readSubscriber); 944 writePublisher.subscribe(this); 945 } 946 947 948 @Override 949 public String toString() { 950 return dbgString(); 951 } 952 953 final String dbgString() { 954 return "SocketTube("+id+")"; 955 } 956 }