1 /* 2 * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.IOException; 29 import java.nio.ByteBuffer; 30 import java.util.List; 31 import java.util.Objects; 32 import java.util.concurrent.Flow; 33 import java.util.concurrent.atomic.AtomicLong; 34 import java.util.concurrent.atomic.AtomicReference; 35 import java.nio.channels.SelectableChannel; 36 import java.nio.channels.SelectionKey; 37 import java.nio.channels.SocketChannel; 38 import java.util.ArrayList; 39 import java.util.function.Consumer; 40 import java.util.function.Supplier; 41 import jdk.internal.net.http.common.BufferSupplier; 42 import jdk.internal.net.http.common.Demand; 43 import jdk.internal.net.http.common.FlowTube; 44 import jdk.internal.net.http.common.Log; 45 import jdk.internal.net.http.common.Logger; 46 import jdk.internal.net.http.common.SequentialScheduler; 47 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter; 48 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask; 49 import jdk.internal.net.http.common.Utils; 50 51 /** 52 * A SocketTube is a terminal tube plugged directly into the socket. 53 * The read subscriber should call {@code subscribe} on the SocketTube before 54 * the SocketTube is subscribed to the write publisher. 55 */ 56 final class SocketTube implements FlowTube { 57 58 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 59 static final AtomicLong IDS = new AtomicLong(); 60 61 private final HttpClientImpl client; 62 private final SocketChannel channel; 63 private final SliceBufferSource sliceBuffersSource; 64 private final Object lock = new Object(); 65 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); 66 private final InternalReadPublisher readPublisher; 67 private final InternalWriteSubscriber writeSubscriber; 68 private final long id = IDS.incrementAndGet(); 69 70 public SocketTube(HttpClientImpl client, SocketChannel channel, 71 Supplier<ByteBuffer> buffersFactory) { 72 this.client = client; 73 this.channel = channel; 74 this.sliceBuffersSource = new SliceBufferSource(buffersFactory); 75 76 this.readPublisher = new InternalReadPublisher(); 77 this.writeSubscriber = new InternalWriteSubscriber(); 78 } 79 80 /** 81 * Returns {@code true} if this flow is finished. 82 * This happens when this flow internal read subscription is completed, 83 * either normally (EOF reading) or exceptionally (EOF writing, or 84 * underlying socket closed, or some exception occurred while reading or 85 * writing to the socket). 86 * 87 * @return {@code true} if this flow is finished. 88 */ 89 public boolean isFinished() { 90 InternalReadPublisher.InternalReadSubscription subscription = 91 readPublisher.subscriptionImpl; 92 return subscription != null && subscription.completed 93 || subscription == null && errorRef.get() != null; 94 } 95 96 // ===================================================================== // 97 // Flow.Publisher // 98 // ======================================================================// 99 100 /** 101 * {@inheritDoc } 102 * @apiNote This method should be called first. In particular, the caller 103 * must ensure that this method must be called by the read 104 * subscriber before the write publisher can call {@code onSubscribe}. 105 * Failure to adhere to this contract may result in assertion errors. 106 */ 107 @Override 108 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) { 109 Objects.requireNonNull(s); 110 assert s instanceof TubeSubscriber : "Expected TubeSubscriber, got:" + s; 111 readPublisher.subscribe(s); 112 } 113 114 115 // ===================================================================== // 116 // Flow.Subscriber // 117 // ======================================================================// 118 119 /** 120 * {@inheritDoc } 121 * @apiNote The caller must ensure that {@code subscribe} is called by 122 * the read subscriber before {@code onSubscribe} is called by 123 * the write publisher. 124 * Failure to adhere to this contract may result in assertion errors. 125 */ 126 @Override 127 public void onSubscribe(Flow.Subscription subscription) { 128 writeSubscriber.onSubscribe(subscription); 129 } 130 131 @Override 132 public void onNext(List<ByteBuffer> item) { 133 writeSubscriber.onNext(item); 134 } 135 136 @Override 137 public void onError(Throwable throwable) { 138 writeSubscriber.onError(throwable); 139 } 140 141 @Override 142 public void onComplete() { 143 writeSubscriber.onComplete(); 144 } 145 146 // ===================================================================== // 147 // Events // 148 // ======================================================================// 149 150 void signalClosed() { 151 // Ensures that the subscriber will be terminated and that future 152 // subscribers will be notified when the connection is closed. 153 if (Log.channel()) { 154 Log.logChannel("Connection close signalled: connection closed locally ({0})", 155 channelDescr()); 156 } 157 readPublisher.subscriptionImpl.signalError( 158 new IOException("connection closed locally")); 159 } 160 161 /** 162 * A restartable task used to process tasks in sequence. 163 */ 164 private static class SocketFlowTask implements RestartableTask { 165 final Runnable task; 166 private final Object monitor = new Object(); 167 SocketFlowTask(Runnable task) { 168 this.task = task; 169 } 170 @Override 171 public final void run(DeferredCompleter taskCompleter) { 172 try { 173 // non contentious synchronized for visibility. 174 synchronized(monitor) { 175 task.run(); 176 } 177 } finally { 178 taskCompleter.complete(); 179 } 180 } 181 } 182 183 // This is best effort - there's no guarantee that the printed set of values 184 // is consistent. It should only be considered as weakly accurate - in 185 // particular in what concerns the events states, especially when displaying 186 // a read event state from a write event callback and conversely. 187 void debugState(String when) { 188 if (debug.on()) { 189 StringBuilder state = new StringBuilder(); 190 191 InternalReadPublisher.InternalReadSubscription sub = 192 readPublisher.subscriptionImpl; 193 InternalReadPublisher.ReadEvent readEvent = 194 sub == null ? null : sub.readEvent; 195 Demand rdemand = sub == null ? null : sub.demand; 196 InternalWriteSubscriber.WriteEvent writeEvent = 197 writeSubscriber.writeEvent; 198 Demand wdemand = writeSubscriber.writeDemand; 199 int rops = readEvent == null ? 0 : readEvent.interestOps(); 200 long rd = rdemand == null ? 0 : rdemand.get(); 201 int wops = writeEvent == null ? 0 : writeEvent.interestOps(); 202 long wd = wdemand == null ? 0 : wdemand.get(); 203 204 state.append(when).append(" Reading: [ops=") 205 .append(rops).append(", demand=").append(rd) 206 .append(", stopped=") 207 .append((sub == null ? false : sub.readScheduler.isStopped())) 208 .append("], Writing: [ops=").append(wops) 209 .append(", demand=").append(wd) 210 .append("]"); 211 debug.log(state.toString()); 212 } 213 } 214 215 /** 216 * A repeatable event that can be paused or resumed by changing its 217 * interestOps. When the event is fired, it is first paused before being 218 * signaled. It is the responsibility of the code triggered by 219 * {@code signalEvent} to resume the event if required. 220 */ 221 private static abstract class SocketFlowEvent extends AsyncEvent { 222 final SocketChannel channel; 223 final int defaultInterest; 224 volatile int interestOps; 225 volatile boolean registered; 226 SocketFlowEvent(int defaultInterest, SocketChannel channel) { 227 super(AsyncEvent.REPEATING); 228 this.defaultInterest = defaultInterest; 229 this.channel = channel; 230 } 231 final boolean registered() {return registered;} 232 final void resume() { 233 interestOps = defaultInterest; 234 registered = true; 235 } 236 final void pause() {interestOps = 0;} 237 @Override 238 public final SelectableChannel channel() {return channel;} 239 @Override 240 public final int interestOps() {return interestOps;} 241 242 @Override 243 public final void handle() { 244 pause(); // pause, then signal 245 signalEvent(); // won't be fired again until resumed. 246 } 247 @Override 248 public final void abort(IOException error) { 249 debug().log(() -> "abort: " + error); 250 pause(); // pause, then signal 251 signalError(error); // should not be resumed after abort (not checked) 252 } 253 254 protected abstract void signalEvent(); 255 protected abstract void signalError(Throwable error); 256 abstract Logger debug(); 257 } 258 259 // ===================================================================== // 260 // Writing // 261 // ======================================================================// 262 263 // This class makes the assumption that the publisher will call onNext 264 // sequentially, and that onNext won't be called if the demand has not been 265 // incremented by request(1). 266 // It has a 'queue of 1' meaning that it will call request(1) in 267 // onSubscribe, and then only after its 'current' buffer list has been 268 // fully written and current set to null; 269 private final class InternalWriteSubscriber 270 implements Flow.Subscriber<List<ByteBuffer>> { 271 272 volatile WriteSubscription subscription; 273 volatile List<ByteBuffer> current; 274 volatile boolean completed; 275 final AsyncTriggerEvent startSubscription = 276 new AsyncTriggerEvent(this::signalError, this::startSubscription); 277 final WriteEvent writeEvent = new WriteEvent(channel, this); 278 final Demand writeDemand = new Demand(); 279 280 @Override 281 public void onSubscribe(Flow.Subscription subscription) { 282 WriteSubscription previous = this.subscription; 283 if (debug.on()) debug.log("subscribed for writing"); 284 try { 285 boolean needEvent = current == null; 286 if (needEvent) { 287 if (previous != null && previous.upstreamSubscription != subscription) { 288 previous.dropSubscription(); 289 } 290 } 291 this.subscription = new WriteSubscription(subscription); 292 if (needEvent) { 293 if (debug.on()) 294 debug.log("write: registering startSubscription event"); 295 client.registerEvent(startSubscription); 296 } 297 } catch (Throwable t) { 298 signalError(t); 299 } 300 } 301 302 @Override 303 public void onNext(List<ByteBuffer> bufs) { 304 assert current == null : dbgString() // this is a queue of 1. 305 + "w.onNext current: " + current; 306 assert subscription != null : dbgString() 307 + "w.onNext: subscription is null"; 308 current = bufs; 309 tryFlushCurrent(client.isSelectorThread()); // may be in selector thread 310 // For instance in HTTP/2, a received SETTINGS frame might trigger 311 // the sending of a SETTINGS frame in turn which might cause 312 // onNext to be called from within the same selector thread that the 313 // original SETTINGS frames arrived on. If rs is the read-subscriber 314 // and ws is the write-subscriber then the following can occur: 315 // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write 316 // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent 317 debugState("leaving w.onNext"); 318 } 319 320 // Don't use a SequentialScheduler here: rely on onNext() being invoked 321 // sequentially, and not being invoked if there is no demand, request(1). 322 // onNext is usually called from within a user / executor thread. 323 // Initial writing will be performed in that thread. If for some reason, 324 // not all the data can be written, a writeEvent will be registered, and 325 // writing will resume in the selector manager thread when the 326 // writeEvent is fired. 327 // 328 // If this method is invoked in the selector manager thread (because of 329 // a writeEvent), then the executor will be used to invoke request(1), 330 // ensuring that onNext() won't be invoked from within the selector 331 // thread. If not in the selector manager thread, then request(1) is 332 // invoked directly. 333 void tryFlushCurrent(boolean inSelectorThread) { 334 List<ByteBuffer> bufs = current; 335 if (bufs == null) return; 336 try { 337 assert inSelectorThread == client.isSelectorThread() : 338 "should " + (inSelectorThread ? "" : "not ") 339 + " be in the selector thread"; 340 long remaining = Utils.remaining(bufs); 341 if (debug.on()) debug.log("trying to write: %d", remaining); 342 long written = writeAvailable(bufs); 343 if (debug.on()) debug.log("wrote: %d", written); 344 assert written >= 0 : "negative number of bytes written:" + written; 345 assert written <= remaining; 346 if (remaining - written == 0) { 347 current = null; 348 if (writeDemand.tryDecrement()) { 349 Runnable requestMore = this::requestMore; 350 if (inSelectorThread) { 351 assert client.isSelectorThread(); 352 client.theExecutor().execute(requestMore); 353 } else { 354 assert !client.isSelectorThread(); 355 requestMore.run(); 356 } 357 } 358 } else { 359 resumeWriteEvent(inSelectorThread); 360 } 361 } catch (Throwable t) { 362 signalError(t); 363 } 364 } 365 366 // Kick off the initial request:1 that will start the writing side. 367 // Invoked in the selector manager thread. 368 void startSubscription() { 369 try { 370 if (debug.on()) debug.log("write: starting subscription"); 371 if (Log.channel()) { 372 Log.logChannel("Start requesting bytes for writing to channel: {0}", 373 channelDescr()); 374 } 375 assert client.isSelectorThread(); 376 // make sure read registrations are handled before; 377 readPublisher.subscriptionImpl.handlePending(); 378 if (debug.on()) debug.log("write: offloading requestMore"); 379 // start writing; 380 client.theExecutor().execute(this::requestMore); 381 } catch(Throwable t) { 382 signalError(t); 383 } 384 } 385 386 void requestMore() { 387 WriteSubscription subscription = this.subscription; 388 subscription.requestMore(); 389 } 390 391 @Override 392 public void onError(Throwable throwable) { 393 signalError(throwable); 394 } 395 396 @Override 397 public void onComplete() { 398 completed = true; 399 // no need to pause the write event here: the write event will 400 // be paused if there is nothing more to write. 401 List<ByteBuffer> bufs = current; 402 long remaining = bufs == null ? 0 : Utils.remaining(bufs); 403 if (debug.on()) 404 debug.log( "write completed, %d yet to send", remaining); 405 debugState("InternalWriteSubscriber::onComplete"); 406 } 407 408 void resumeWriteEvent(boolean inSelectorThread) { 409 if (debug.on()) debug.log("scheduling write event"); 410 resumeEvent(writeEvent, this::signalError); 411 } 412 413 void signalWritable() { 414 if (debug.on()) debug.log("channel is writable"); 415 tryFlushCurrent(true); 416 } 417 418 void signalError(Throwable error) { 419 debug.log(() -> "write error: " + error); 420 if (Log.channel()) { 421 Log.logChannel("Failed to write to channel ({0}: {1})", 422 channelDescr(), error); 423 } 424 completed = true; 425 readPublisher.signalError(error); 426 Flow.Subscription subscription = this.subscription; 427 if (subscription != null) subscription.cancel(); 428 } 429 430 // A repeatable WriteEvent which is paused after firing and can 431 // be resumed if required - see SocketFlowEvent; 432 final class WriteEvent extends SocketFlowEvent { 433 final InternalWriteSubscriber sub; 434 WriteEvent(SocketChannel channel, InternalWriteSubscriber sub) { 435 super(SelectionKey.OP_WRITE, channel); 436 this.sub = sub; 437 } 438 @Override 439 protected final void signalEvent() { 440 try { 441 client.eventUpdated(this); 442 sub.signalWritable(); 443 } catch(Throwable t) { 444 sub.signalError(t); 445 } 446 } 447 448 @Override 449 protected void signalError(Throwable error) { 450 sub.signalError(error); 451 } 452 453 @Override 454 Logger debug() { return debug; } 455 } 456 457 final class WriteSubscription implements Flow.Subscription { 458 final Flow.Subscription upstreamSubscription; 459 volatile boolean cancelled; 460 WriteSubscription(Flow.Subscription subscription) { 461 this.upstreamSubscription = subscription; 462 } 463 464 @Override 465 public void request(long n) { 466 if (cancelled) return; 467 upstreamSubscription.request(n); 468 } 469 470 @Override 471 public void cancel() { 472 if (cancelled) return; 473 if (debug.on()) debug.log("write: cancel"); 474 if (Log.channel()) { 475 Log.logChannel("Cancelling write subscription"); 476 } 477 dropSubscription(); 478 upstreamSubscription.cancel(); 479 } 480 481 void dropSubscription() { 482 synchronized (InternalWriteSubscriber.this) { 483 cancelled = true; 484 if (debug.on()) debug.log("write: resetting demand to 0"); 485 writeDemand.reset(); 486 } 487 } 488 489 void requestMore() { 490 try { 491 if (completed || cancelled) return; 492 boolean requestMore; 493 long d; 494 // don't fiddle with demand after cancel. 495 // see dropSubscription. 496 synchronized (InternalWriteSubscriber.this) { 497 if (cancelled) return; 498 d = writeDemand.get(); 499 requestMore = writeDemand.increaseIfFulfilled(); 500 } 501 if (requestMore) { 502 if (debug.on()) debug.log("write: requesting more..."); 503 upstreamSubscription.request(1); 504 } else { 505 if (debug.on()) 506 debug.log("write: no need to request more: %d", d); 507 } 508 } catch (Throwable t) { 509 if (debug.on()) 510 debug.log("write: error while requesting more: " + t); 511 signalError(t); 512 } finally { 513 debugState("leaving requestMore: "); 514 } 515 } 516 } 517 } 518 519 // ===================================================================== // 520 // Reading // 521 // ===================================================================== // 522 523 // The InternalReadPublisher uses a SequentialScheduler to ensure that 524 // onNext/onError/onComplete are called sequentially on the caller's 525 // subscriber. 526 // However, it relies on the fact that the only time where 527 // runOrSchedule() is called from a user/executor thread is in signalError, 528 // right after the errorRef has been set. 529 // Because the sequential scheduler's task always checks for errors first, 530 // and always terminate the scheduler on error, then it is safe to assume 531 // that if it reaches the point where it reads from the channel, then 532 // it is running in the SelectorManager thread. This is because all 533 // other invocation of runOrSchedule() are triggered from within a 534 // ReadEvent. 535 // 536 // When pausing/resuming the event, some shortcuts can then be taken 537 // when we know we're running in the selector manager thread 538 // (in that case there's no need to call client.eventUpdated(readEvent); 539 // 540 private final class InternalReadPublisher 541 implements Flow.Publisher<List<ByteBuffer>> { 542 private final InternalReadSubscription subscriptionImpl 543 = new InternalReadSubscription(); 544 AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>(); 545 private volatile ReadSubscription subscription; 546 547 @Override 548 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) { 549 Objects.requireNonNull(s); 550 551 TubeSubscriber sub = FlowTube.asTubeSubscriber(s); 552 ReadSubscription target = new ReadSubscription(subscriptionImpl, sub); 553 ReadSubscription previous = pendingSubscription.getAndSet(target); 554 555 if (previous != null && previous != target) { 556 if (debug.on()) 557 debug.log("read publisher: dropping pending subscriber: " 558 + previous.subscriber); 559 previous.errorRef.compareAndSet(null, errorRef.get()); 560 previous.signalOnSubscribe(); 561 if (subscriptionImpl.completed) { 562 previous.signalCompletion(); 563 } else { 564 previous.subscriber.dropSubscription(); 565 } 566 } 567 568 if (debug.on()) debug.log("read publisher got subscriber"); 569 subscriptionImpl.signalSubscribe(); 570 debugState("leaving read.subscribe: "); 571 } 572 573 void signalError(Throwable error) { 574 if (debug.on()) debug.log("error signalled " + error); 575 if (!errorRef.compareAndSet(null, error)) { 576 return; 577 } 578 if (Log.channel()) { 579 Log.logChannel("Error signalled on channel {0}: {1}", 580 channelDescr(), error); 581 } 582 subscriptionImpl.handleError(); 583 } 584 585 final class ReadSubscription implements Flow.Subscription { 586 final InternalReadSubscription impl; 587 final TubeSubscriber subscriber; 588 final AtomicReference<Throwable> errorRef = new AtomicReference<>(); 589 final BufferSource bufferSource; 590 volatile boolean subscribed; 591 volatile boolean cancelled; 592 volatile boolean completed; 593 594 public ReadSubscription(InternalReadSubscription impl, 595 TubeSubscriber subscriber) { 596 this.impl = impl; 597 this.bufferSource = subscriber.supportsRecycling() 598 ? new SSLDirectBufferSource(client) 599 : SocketTube.this.sliceBuffersSource; 600 this.subscriber = subscriber; 601 } 602 603 @Override 604 public void cancel() { 605 cancelled = true; 606 } 607 608 @Override 609 public void request(long n) { 610 if (!cancelled) { 611 impl.request(n); 612 } else { 613 if (debug.on()) 614 debug.log("subscription cancelled, ignoring request %d", n); 615 } 616 } 617 618 void signalCompletion() { 619 assert subscribed || cancelled; 620 if (completed || cancelled) return; 621 synchronized (this) { 622 if (completed) return; 623 completed = true; 624 } 625 Throwable error = errorRef.get(); 626 if (error != null) { 627 if (debug.on()) 628 debug.log("forwarding error to subscriber: " + error); 629 subscriber.onError(error); 630 } else { 631 if (debug.on()) debug.log("completing subscriber"); 632 subscriber.onComplete(); 633 } 634 } 635 636 void signalOnSubscribe() { 637 if (subscribed || cancelled) return; 638 synchronized (this) { 639 if (subscribed || cancelled) return; 640 subscribed = true; 641 } 642 subscriber.onSubscribe(this); 643 if (debug.on()) debug.log("onSubscribe called"); 644 if (errorRef.get() != null) { 645 signalCompletion(); 646 } 647 } 648 } 649 650 final class InternalReadSubscription implements Flow.Subscription { 651 652 private final Demand demand = new Demand(); 653 final SequentialScheduler readScheduler; 654 private volatile boolean completed; 655 private final ReadEvent readEvent; 656 private final AsyncEvent subscribeEvent; 657 658 InternalReadSubscription() { 659 readScheduler = new SequentialScheduler(new SocketFlowTask(this::read)); 660 subscribeEvent = new AsyncTriggerEvent(this::signalError, 661 this::handleSubscribeEvent); 662 readEvent = new ReadEvent(channel, this); 663 } 664 665 /* 666 * This method must be invoked before any other method of this class. 667 */ 668 final void signalSubscribe() { 669 if (readScheduler.isStopped() || completed) { 670 // if already completed or stopped we can handle any 671 // pending connection directly from here. 672 if (debug.on()) 673 debug.log("handling pending subscription while completed"); 674 handlePending(); 675 } else { 676 try { 677 if (debug.on()) debug.log("registering subscribe event"); 678 client.registerEvent(subscribeEvent); 679 } catch (Throwable t) { 680 signalError(t); 681 handlePending(); 682 } 683 } 684 } 685 686 final void handleSubscribeEvent() { 687 assert client.isSelectorThread(); 688 debug.log("subscribe event raised"); 689 if (Log.channel()) Log.logChannel("Start reading from {0}", channelDescr()); 690 readScheduler.runOrSchedule(); 691 if (readScheduler.isStopped() || completed) { 692 // if already completed or stopped we can handle any 693 // pending connection directly from here. 694 if (debug.on()) 695 debug.log("handling pending subscription when completed"); 696 handlePending(); 697 } 698 } 699 700 701 /* 702 * Although this method is thread-safe, the Reactive-Streams spec seems 703 * to not require it to be as such. It's a responsibility of the 704 * subscriber to signal demand in a thread-safe manner. 705 * 706 * See Reactive Streams specification, rules 2.7 and 3.4. 707 */ 708 @Override 709 public final void request(long n) { 710 if (n > 0L) { 711 boolean wasFulfilled = demand.increase(n); 712 if (wasFulfilled) { 713 if (debug.on()) debug.log("got some demand for reading"); 714 resumeReadEvent(); 715 // if demand has been changed from fulfilled 716 // to unfulfilled register read event; 717 } 718 } else { 719 signalError(new IllegalArgumentException("non-positive request")); 720 } 721 debugState("leaving request("+n+"): "); 722 } 723 724 @Override 725 public final void cancel() { 726 pauseReadEvent(); 727 if (Log.channel()) { 728 Log.logChannel("Read subscription cancelled for channel {0}", 729 channelDescr()); 730 } 731 readScheduler.stop(); 732 } 733 734 private void resumeReadEvent() { 735 if (debug.on()) debug.log("resuming read event"); 736 resumeEvent(readEvent, this::signalError); 737 } 738 739 private void pauseReadEvent() { 740 if (debug.on()) debug.log("pausing read event"); 741 pauseEvent(readEvent, this::signalError); 742 } 743 744 745 final void handleError() { 746 assert errorRef.get() != null; 747 readScheduler.runOrSchedule(); 748 } 749 750 final void signalError(Throwable error) { 751 if (!errorRef.compareAndSet(null, error)) { 752 return; 753 } 754 if (debug.on()) debug.log("got read error: " + error); 755 if (Log.channel()) { 756 Log.logChannel("Read error signalled on channel {0}: {1}", 757 channelDescr(), error); 758 } 759 readScheduler.runOrSchedule(); 760 } 761 762 final void signalReadable() { 763 readScheduler.runOrSchedule(); 764 } 765 766 /** The body of the task that runs in SequentialScheduler. */ 767 final void read() { 768 // It is important to only call pauseReadEvent() when stopping 769 // the scheduler. The event is automatically paused before 770 // firing, and trying to pause it again could cause a race 771 // condition between this loop, which calls tryDecrementDemand(), 772 // and the thread that calls request(n), which will try to resume 773 // reading. 774 try { 775 while(!readScheduler.isStopped()) { 776 if (completed) return; 777 778 // make sure we have a subscriber 779 if (handlePending()) { 780 if (debug.on()) 781 debug.log("pending subscriber subscribed"); 782 return; 783 } 784 785 // If an error was signaled, we might not be in the 786 // the selector thread, and that is OK, because we 787 // will just call onError and return. 788 ReadSubscription current = subscription; 789 Throwable error = errorRef.get(); 790 if (current == null) { 791 assert error != null; 792 if (debug.on()) 793 debug.log("error raised before subscriber subscribed: %s", 794 (Object)error); 795 return; 796 } 797 TubeSubscriber subscriber = current.subscriber; 798 if (error != null) { 799 completed = true; 800 // safe to pause here because we're finished anyway. 801 pauseReadEvent(); 802 if (debug.on()) 803 debug.log("Sending error " + error 804 + " to subscriber " + subscriber); 805 if (Log.channel()) { 806 Log.logChannel("Raising error with subscriber for {0}: {1}", 807 channelDescr(), error); 808 } 809 current.errorRef.compareAndSet(null, error); 810 current.signalCompletion(); 811 readScheduler.stop(); 812 debugState("leaving read() loop with error: "); 813 return; 814 } 815 816 // If we reach here then we must be in the selector thread. 817 assert client.isSelectorThread(); 818 if (demand.tryDecrement()) { 819 // we have demand. 820 try { 821 List<ByteBuffer> bytes = readAvailable(current.bufferSource); 822 if (bytes == EOF) { 823 if (!completed) { 824 if (debug.on()) debug.log("got read EOF"); 825 if (Log.channel()) { 826 Log.logChannel("EOF read from channel: {0}", 827 channelDescr()); 828 } 829 completed = true; 830 // safe to pause here because we're finished 831 // anyway. 832 pauseReadEvent(); 833 current.signalCompletion(); 834 readScheduler.stop(); 835 } 836 debugState("leaving read() loop after EOF: "); 837 return; 838 } else if (Utils.remaining(bytes) > 0) { 839 // the subscriber is responsible for offloading 840 // to another thread if needed. 841 if (debug.on()) 842 debug.log("read bytes: " + Utils.remaining(bytes)); 843 assert !current.completed; 844 subscriber.onNext(bytes); 845 // we could continue looping until the demand 846 // reaches 0. However, that would risk starving 847 // other connections (bound to other socket 848 // channels) - as other selected keys activated 849 // by the selector manager thread might be 850 // waiting for this event to terminate. 851 // So resume the read event and return now... 852 resumeReadEvent(); 853 debugState("leaving read() loop after onNext: "); 854 return; 855 } else { 856 // nothing available! 857 if (debug.on()) debug.log("no more bytes available"); 858 // re-increment the demand and resume the read 859 // event. This ensures that this loop is 860 // executed again when the socket becomes 861 // readable again. 862 demand.increase(1); 863 resumeReadEvent(); 864 debugState("leaving read() loop with no bytes"); 865 return; 866 } 867 } catch (Throwable x) { 868 signalError(x); 869 continue; 870 } 871 } else { 872 if (debug.on()) debug.log("no more demand for reading"); 873 // the event is paused just after firing, so it should 874 // still be paused here, unless the demand was just 875 // incremented from 0 to n, in which case, the 876 // event will be resumed, causing this loop to be 877 // invoked again when the socket becomes readable: 878 // This is what we want. 879 // Trying to pause the event here would actually 880 // introduce a race condition between this loop and 881 // request(n). 882 debugState("leaving read() loop with no demand"); 883 break; 884 } 885 } 886 } catch (Throwable t) { 887 if (debug.on()) debug.log("Unexpected exception in read loop", t); 888 signalError(t); 889 } finally { 890 if (readScheduler.isStopped()) { 891 if (debug.on()) debug.log("Read scheduler stopped"); 892 if (Log.channel()) { 893 Log.logChannel("Stopped reading from channel {0}", channelDescr()); 894 } 895 } 896 handlePending(); 897 } 898 } 899 900 boolean handlePending() { 901 ReadSubscription pending = pendingSubscription.getAndSet(null); 902 if (pending == null) return false; 903 if (debug.on()) 904 debug.log("handling pending subscription for %s", 905 pending.subscriber); 906 ReadSubscription current = subscription; 907 if (current != null && current != pending && !completed) { 908 current.subscriber.dropSubscription(); 909 } 910 if (debug.on()) debug.log("read demand reset to 0"); 911 subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to. 912 pending.errorRef.compareAndSet(null, errorRef.get()); 913 if (!readScheduler.isStopped()) { 914 subscription = pending; 915 } else { 916 if (debug.on()) debug.log("socket tube is already stopped"); 917 } 918 if (debug.on()) debug.log("calling onSubscribe"); 919 pending.signalOnSubscribe(); 920 if (completed) { 921 pending.errorRef.compareAndSet(null, errorRef.get()); 922 pending.signalCompletion(); 923 } 924 return true; 925 } 926 } 927 928 929 // A repeatable ReadEvent which is paused after firing and can 930 // be resumed if required - see SocketFlowEvent; 931 final class ReadEvent extends SocketFlowEvent { 932 final InternalReadSubscription sub; 933 ReadEvent(SocketChannel channel, InternalReadSubscription sub) { 934 super(SelectionKey.OP_READ, channel); 935 this.sub = sub; 936 } 937 @Override 938 protected final void signalEvent() { 939 try { 940 client.eventUpdated(this); 941 sub.signalReadable(); 942 } catch(Throwable t) { 943 sub.signalError(t); 944 } 945 } 946 947 @Override 948 protected final void signalError(Throwable error) { 949 sub.signalError(error); 950 } 951 952 @Override 953 Logger debug() { return debug; } 954 } 955 } 956 957 // ===================================================================== // 958 // Buffer Management // 959 // ===================================================================== // 960 961 // This interface is used by readAvailable(BufferSource); 962 public interface BufferSource { 963 /** 964 * Returns a buffer to read data from the socket. 965 * 966 * @implNote 967 * Different implementation can have different strategies, as to 968 * which kind of buffer to return, or whether to return the same 969 * buffer. The only constraints are that: 970 * a. the buffer returned must not be null 971 * b. the buffer position indicates where to start reading 972 * c. the buffer limit indicates where to stop reading. 973 * d. the buffer is 'free' - that is - it is not used 974 * or retained by anybody else 975 * 976 * @return A buffer to read data from the socket. 977 */ 978 ByteBuffer getBuffer(); 979 980 /** 981 * Appends the read-data in {@code buffer} to the list of buffer to 982 * be sent downstream to the subscriber. May return a new 983 * list, or append to the given list. 984 * 985 * @implNote 986 * Different implementation can have different strategies, but 987 * must obviously be consistent with the implementation of the 988 * getBuffer() method. For instance, an implementation could 989 * decide to add the buffer to the list and return a new buffer 990 * next time getBuffer() is called, or could decide to add a buffer 991 * slice to the list and return the same buffer (if remaining 992 * space is available) next time getBuffer() is called. 993 * 994 * @param list The list before adding the data. Can be null. 995 * @param buffer The buffer containing the data to add to the list. 996 * @param start The start position at which data were read. 997 * The current buffer position indicates the end. 998 * @return A possibly new list where a buffer containing the 999 * data read from the socket has been added. 1000 */ 1001 List<ByteBuffer> append(List<ByteBuffer> list, ByteBuffer buffer, int start); 1002 1003 /** 1004 * Returns the given unused {@code buffer}, previously obtained from 1005 * {@code getBuffer}. 1006 * 1007 * @implNote This method can be used, if necessary, to return 1008 * the unused buffer to the pull. 1009 * 1010 * @param buffer The unused buffer. 1011 */ 1012 default void returnUnused(ByteBuffer buffer) { } 1013 } 1014 1015 // An implementation of BufferSource used for unencrypted data. 1016 // This buffer source uses heap buffers and avoids wasting memory 1017 // by forwarding read-only buffer slices downstream. 1018 // Buffers allocated through this source are simply GC'ed when 1019 // they are no longer referenced. 1020 private static final class SliceBufferSource implements BufferSource { 1021 private final Supplier<ByteBuffer> factory; 1022 private volatile ByteBuffer current; 1023 1024 public SliceBufferSource() { 1025 this(Utils::getBuffer); 1026 } 1027 public SliceBufferSource(Supplier<ByteBuffer> factory) { 1028 this.factory = Objects.requireNonNull(factory); 1029 } 1030 1031 // Reuses the same buffer if some space remains available. 1032 // Otherwise, returns a new heap buffer. 1033 @Override 1034 public final ByteBuffer getBuffer() { 1035 ByteBuffer buf = current; 1036 buf = (buf == null || !buf.hasRemaining()) 1037 ? (current = factory.get()) : buf; 1038 assert buf.hasRemaining(); 1039 return buf; 1040 } 1041 1042 // Adds a read-only slice to the list, potentially returning a 1043 // new list with that slice at the end. 1044 @Override 1045 public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) { 1046 // creates a slice to add to the list 1047 int limit = buf.limit(); 1048 buf.limit(buf.position()); 1049 buf.position(start); 1050 ByteBuffer slice = buf.slice(); 1051 1052 // restore buffer state to what it was before creating the slice 1053 buf.position(buf.limit()); 1054 buf.limit(limit); 1055 1056 // add the buffer to the list 1057 return SocketTube.listOf(list, slice.asReadOnlyBuffer()); 1058 } 1059 } 1060 1061 1062 // An implementation of BufferSource used for encrypted data. 1063 // This buffer source uses direct byte buffers that will be 1064 // recycled by the SocketTube subscriber. 1065 // 1066 private static final class SSLDirectBufferSource implements BufferSource { 1067 private final BufferSupplier factory; 1068 private final HttpClientImpl client; 1069 private ByteBuffer current; 1070 1071 public SSLDirectBufferSource(HttpClientImpl client) { 1072 this.client = Objects.requireNonNull(client); 1073 this.factory = Objects.requireNonNull(client.getSSLBufferSupplier()); 1074 } 1075 1076 // Obtains a 'free' byte buffer from the pool, or returns 1077 // the same buffer if nothing was read at the previous cycle. 1078 // The subscriber will be responsible for recycling this 1079 // buffer into the pool (see SSLFlowDelegate.Reader) 1080 @Override 1081 public final ByteBuffer getBuffer() { 1082 assert client.isSelectorThread(); 1083 ByteBuffer buf = current; 1084 if (buf == null) { 1085 buf = current = factory.get(); 1086 } 1087 assert buf.hasRemaining(); 1088 assert buf.position() == 0; 1089 return buf; 1090 } 1091 1092 // Adds the buffer to the list. The buffer will be later returned to the 1093 // pool by the subscriber (see SSLFlowDelegate.Reader). 1094 // The next buffer returned by getBuffer() will be obtained from the 1095 // pool. It might be the same buffer or another one. 1096 // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because 1097 // recycling will happen in the flow before onNext returns, then the 1098 // pool can not grow larger than MAX_BUFFERS = 3 buffers, even though 1099 // it's shared by all SSL connections opened on that client. 1100 @Override 1101 public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) { 1102 assert client.isSelectorThread(); 1103 assert buf.isDirect(); 1104 assert start == 0; 1105 assert current == buf; 1106 current = null; 1107 buf.limit(buf.position()); 1108 buf.position(start); 1109 // add the buffer to the list 1110 return SocketTube.listOf(list, buf); 1111 } 1112 1113 @Override 1114 public void returnUnused(ByteBuffer buffer) { 1115 // if current is null, then the buffer will have been added to the 1116 // list, through append. Otherwise, current is not null, and needs 1117 // to be returned to prevent the buffer supplier pool from growing 1118 // to more than MAX_BUFFERS. 1119 assert buffer == current; 1120 ByteBuffer buf = current; 1121 if (buf != null) { 1122 assert buf.position() == 0; 1123 current = null; 1124 // the supplier assert if buf has remaining 1125 buf.limit(buf.position()); 1126 factory.recycle(buf); 1127 } 1128 } 1129 } 1130 1131 // ===================================================================== // 1132 // Socket Channel Read/Write // 1133 // ===================================================================== // 1134 static final int MAX_BUFFERS = 3; 1135 static final List<ByteBuffer> EOF = List.of(); 1136 static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER); 1137 1138 // readAvailable() will read bytes into the 'current' ByteBuffer until 1139 // the ByteBuffer is full, or 0 or -1 (EOF) is returned by read(). 1140 // When that happens, a slice of the data that has been read so far 1141 // is inserted into the returned buffer list, and if the current buffer 1142 // has remaining space, that space will be used to read more data when 1143 // the channel becomes readable again. 1144 private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException { 1145 ByteBuffer buf = buffersSource.getBuffer(); 1146 assert buf.hasRemaining(); 1147 1148 int read; 1149 int pos = buf.position(); 1150 List<ByteBuffer> list = null; 1151 while (buf.hasRemaining()) { 1152 try { 1153 while ((read = channel.read(buf)) > 0) { 1154 if (!buf.hasRemaining()) 1155 break; 1156 } 1157 } catch (IOException x) { 1158 if (buf.position() == pos && list == null) { 1159 // make sure that the buffer source will recycle 1160 // 'buf' if needed 1161 buffersSource.returnUnused(buf); 1162 // no bytes have been read, just throw... 1163 throw x; 1164 } else { 1165 // some bytes have been read, return them and fail next time 1166 errorRef.compareAndSet(null, x); 1167 read = 0; // ensures outer loop will exit 1168 } 1169 } 1170 1171 // nothing read; 1172 if (buf.position() == pos) { 1173 // An empty list signals the end of data, and should only be 1174 // returned if read == -1. If some data has already been read, 1175 // then it must be returned. -1 will be returned next time 1176 // the caller attempts to read something. 1177 buffersSource.returnUnused(buf); 1178 if (list == null) { 1179 // nothing read - list was null - return EOF or NOTHING 1180 list = read == -1 ? EOF : NOTHING; 1181 } 1182 break; 1183 } 1184 1185 // check whether this buffer has still some free space available. 1186 // if so, we will keep it for the next round. 1187 list = buffersSource.append(list, buf, pos); 1188 1189 if (read <= 0 || list.size() == MAX_BUFFERS) { 1190 break; 1191 } 1192 1193 buf = buffersSource.getBuffer(); 1194 pos = buf.position(); 1195 assert buf.hasRemaining(); 1196 } 1197 return list; 1198 } 1199 1200 private static <T> List<T> listOf(List<T> list, T item) { 1201 int size = list == null ? 0 : list.size(); 1202 switch (size) { 1203 case 0: return List.of(item); 1204 case 1: return List.of(list.get(0), item); 1205 case 2: return List.of(list.get(0), list.get(1), item); 1206 default: // slow path if MAX_BUFFERS > 3 1207 List<T> res = list instanceof ArrayList ? list : new ArrayList<>(list); 1208 res.add(item); 1209 return res; 1210 } 1211 } 1212 1213 private long writeAvailable(List<ByteBuffer> bytes) throws IOException { 1214 ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY); 1215 final long remaining = Utils.remaining(srcs); 1216 long written = 0; 1217 while (remaining > written) { 1218 try { 1219 long w = channel.write(srcs); 1220 assert w >= 0 : "negative number of bytes written:" + w; 1221 if (w == 0) { 1222 break; 1223 } 1224 written += w; 1225 } catch (IOException x) { 1226 if (written == 0) { 1227 // no bytes were written just throw 1228 throw x; 1229 } else { 1230 // return how many bytes were written, will fail next time 1231 break; 1232 } 1233 } 1234 } 1235 return written; 1236 } 1237 1238 private void resumeEvent(SocketFlowEvent event, 1239 Consumer<Throwable> errorSignaler) { 1240 boolean registrationRequired; 1241 synchronized(lock) { 1242 registrationRequired = !event.registered(); 1243 event.resume(); 1244 } 1245 try { 1246 if (registrationRequired) { 1247 client.registerEvent(event); 1248 } else { 1249 client.eventUpdated(event); 1250 } 1251 } catch(Throwable t) { 1252 errorSignaler.accept(t); 1253 } 1254 } 1255 1256 private void pauseEvent(SocketFlowEvent event, 1257 Consumer<Throwable> errorSignaler) { 1258 synchronized(lock) { 1259 event.pause(); 1260 } 1261 try { 1262 client.eventUpdated(event); 1263 } catch(Throwable t) { 1264 errorSignaler.accept(t); 1265 } 1266 } 1267 1268 @Override 1269 public void connectFlows(TubePublisher writePublisher, 1270 TubeSubscriber readSubscriber) { 1271 if (debug.on()) debug.log("connecting flows"); 1272 this.subscribe(readSubscriber); 1273 writePublisher.subscribe(this); 1274 } 1275 1276 1277 @Override 1278 public String toString() { 1279 return dbgString(); 1280 } 1281 1282 final String dbgString() { 1283 return "SocketTube("+id+")"; 1284 } 1285 1286 final String channelDescr() { 1287 return String.valueOf(channel); 1288 } 1289 }