/*
 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.EOFException;
import java.io.IOException;
import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import jdk.incubator.http.internal.common.Demand;
import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
import jdk.incubator.http.internal.common.SequentialScheduler;
import jdk.incubator.http.internal.common.ConnectionExpiredException;
import jdk.incubator.http.internal.common.Utils;


/**
 * A helper class that will queue up incoming data until the receiving
 * side is ready to handle it.
 *
 * <p> Buffers delivered by the upstream {@link TubeSubscriber} are appended
 * to an internal queue; a {@link SequentialScheduler} then runs
 * {@link #flush()} to hand them, one at a time, to the currently subscribed
 * {@link Http1AsyncDelegate} for as long as that delegate has demand.
 */
class Http1AsyncReceiver {

    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);

    /**
     * A delegate that can asynchronously receive data from an upstream flow,
     * parse it, then possibly transform it and either store it (response
     * headers) or possibly pass it to a downstream subscriber (response body).
     * Usually, there will be one Http1AsyncDelegate in charge of receiving
     * and parsing headers, and another one in charge of receiving, parsing,
     * and forwarding body. Each will sequentially subscribe with the
     * Http1AsyncReceiver in turn. There may be additional delegates which
     * subscribe to the Http1AsyncReceiver, mainly for the purpose of handling
     * errors while the connection is busy transmitting the request body and the
     * Http1Exchange::readBody method hasn't been called yet, and response
     * delegates haven't subscribed yet.
     */
    static interface Http1AsyncDelegate {
        /**
         * Receives and handles a byte buffer reference.
         * @param ref A byte buffer reference coming from upstream.
         * @return false, if the byte buffer reference should be kept in the queue.
         *            Usually, this means that either the byte buffer reference
         *            was handled and parsing is finished, or that the receiver
         *            didn't handle the byte reference at all.
         *            There may or may not be any remaining data in the
         *            byte buffer, and the byte buffer reference must not have
         *            been cleared.
         *         true, if the byte buffer reference was fully read and
         *            more data can be received.
         */
        public boolean tryAsyncReceive(ByteBuffer ref);

        /**
         * Called when an exception is raised.
         * @param ex The raised Throwable.
         */
        public void onReadError(Throwable ex);

        /**
         * Must be called before any other method on the delegate.
         * The subscription can be either used directly by the delegate
         * to request more data (e.g. if the delegate is a header parser),
         * or can be forwarded to a downstream subscriber (if the delegate
         * is a body parser that wraps a response BodySubscriber).
         * In all cases, it is the responsibility of the delegate to ensure
         * that request(n) and demand.tryDecrement() are called appropriately.
         * No data will be sent to {@code tryAsyncReceive} unless
         * the subscription has some demand.
         *
         * @param s A subscription that allows the delegate to control the
         *          data flow.
         */
        public void onSubscribe(AbstractSubscription s);

        /**
         * Returns the subscription that was passed to {@code onSubscribe}.
         * @return the subscription that was passed to {@code onSubscribe}.
         */
        public AbstractSubscription subscription();

    }

    /**
     * A simple subclass of AbstractSubscription that ensures the
     * SequentialScheduler will be run when request() is called and demand
     * becomes positive again.
     */
    private static final class Http1AsyncDelegateSubscription
            extends AbstractSubscription
    {
        // Action to run when the downstream cancels the subscription.
        private final Runnable onCancel;
        // Scheduler driving the receiver's flush loop.
        private final SequentialScheduler scheduler;

        Http1AsyncDelegateSubscription(SequentialScheduler scheduler,
                                       Runnable onCancel) {
            this.scheduler = scheduler;
            this.onCancel = onCancel;
        }

        /**
         * Adds {@code n} to the demand and, if {@code Demand.increase}
         * reports {@code true}, kicks the scheduler so that queued data
         * can be delivered.
         */
        @Override
        public void request(long n) {
            final Demand demand = demand();
            if (demand.increase(n)) {
                scheduler.runOrSchedule();
            }
        }

        /** Runs the cancellation action supplied at construction time. */
        @Override
        public void cancel() { onCancel.run();}
    }

    // Buffers received from upstream and not yet fully consumed by a delegate.
    private final ConcurrentLinkedDeque<ByteBuffer> queue
            = new ConcurrentLinkedDeque<>();
    // Runs flush(); see SequentialScheduler for the execution guarantees.
    private final SequentialScheduler scheduler =
            SequentialScheduler.synchronizedScheduler(this::flush);
    // Executor used to run the scheduler loop off the calling thread.
    private final Executor executor;
    private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
    // A delegate that has called subscribe() but has not yet been installed
    // by handlePendingDelegate().
    private final AtomicReference<Http1AsyncDelegate> pendingDelegateRef;
    // Total number of bytes queued so far; onReadError() uses it to detect
    // an error raised before any byte was received.
    private final AtomicLong received = new AtomicLong();
    // Consumed (true -> false) by Http1TubeSubscriber.requestMore() before
    // requesting one more item from upstream.
    final AtomicBoolean canRequestMore = new AtomicBoolean();

    // First error recorded by onReadError() or flush(), if any.
    private volatile Throwable error;
    // The delegate currently receiving data, or null.
    private volatile Http1AsyncDelegate delegate;
    // This reference is only used to prevent early GC of the exchange.
    private volatile Http1Exchange<?> owner;
    // Only used for checking whether we run on the selector manager thread.
    private final HttpClientImpl client;
    // Read and written while synchronized on this.
    private boolean retry;

    /**
     * Creates a receiver for the given exchange.
     *
     * @param executor the executor on which the scheduler loop is deferred
     * @param owner    the exchange this receiver works for; must not be null
     *                 since its {@code client} field is read here
     */
    public Http1AsyncReceiver(Executor executor, Http1Exchange<?> owner) {
        this.pendingDelegateRef = new AtomicReference<>();
        this.executor = executor;
        this.owner = owner;
        this.client = owner.client;
    }

    // This is the main loop called by the SequentialScheduler.
    // It attempts to empty the queue until the scheduler is stopped,
    // or the delegate is unregistered, or the delegate is unable to
    // process the data (because it's not ready or already done), which
    // it signals by returning 'false' from tryAsyncReceive.
    private void flush() {
        ByteBuffer buf;
        try {
            assert !client.isSelectorThread() :
                    "Http1AsyncReceiver::flush should not run in the selector: "
                    + Thread.currentThread().getName();

            // First check whether we have a pending delegate that has
            // just subscribed, and if so, create a Subscription for it
            // and call onSubscribe.
            handlePendingDelegate();

            // Then start emptying the queue, if possible.
            while ((buf = queue.peek()) != null) {
                Http1AsyncDelegate delegate = this.delegate;
                debug.log(Level.DEBUG, "Got %s bytes for delegate %s",
                                       buf.remaining(), delegate);
                if (!hasDemand(delegate)) {
                    // The scheduler will be invoked again later when the demand
                    // becomes positive.
                    return;
                }

                assert delegate != null;
                debug.log(Level.DEBUG, "Forwarding %s bytes to delegate %s",
                          buf.remaining(), delegate);
                // The delegate has demand: feed it the next buffer.
                if (!delegate.tryAsyncReceive(buf)) {
                    final long remaining = buf.remaining();
                    debug.log(Level.DEBUG, () -> {
                        // If the scheduler is stopped, the queue may already
                        // be empty and the reference may already be released.
                        String remstr = scheduler.isStopped() ? "" :
                                " remaining in ref: "
                                + remaining;
                        remstr = remstr
                                + " total remaining: " + remaining();
                        // remstr always starts with a space; report the
                        // details that were assembled above.
                        return "Delegate done:" + remstr;
                    });
                    canRequestMore.set(false);
                    // The last buffer parsed may have remaining unparsed bytes.
                    // Don't take it out of the queue.
                    return; // done.
                }

                // removed parsed buffer from queue, and continue with next
                // if available
                ByteBuffer parsed = queue.remove();
                canRequestMore.set(queue.isEmpty());
                assert parsed == buf;
            }

            // queue is empty: let's see if we should request more
            checkRequestMore();

        } catch (Throwable t) {
            Throwable x = error;
            if (x == null) error = t; // will be handled in the finally block
            debug.log(Level.DEBUG, "Unexpected error caught in flush()", t);
        } finally {
            // Handles any pending error.
            // The most recently subscribed delegate will get the error.
            checkForErrors();
        }
    }

    /**
     * Must be called from within the scheduler main loop.
     * Handles any pending errors by calling delegate.onReadError().
     * If the error can be forwarded to the delegate, stops the scheduler.
     */
    private void checkForErrors() {
        // Handles any pending error.
        // The most recently subscribed delegate will get the error.
        // If the delegate is null, the error will be handled by the next
        // delegate that subscribes.
        // If the queue is not empty, wait until it is empty before
        // handling the error.
        Http1AsyncDelegate delegate = pendingDelegateRef.get();
        if (delegate == null) delegate = this.delegate;
        Throwable x = error;
        if (delegate != null && x != null && queue.isEmpty()) {
            // forward error only after emptying the queue.
            final Object captured = delegate;
            debug.log(Level.DEBUG, () -> "flushing " + x
                    + "\n\t delegate: " + captured
                    + "\t\t queue.isEmpty: " + queue.isEmpty());
            scheduler.stop();
            delegate.onReadError(x);
        }
    }

    /**
     * Must be called from within the scheduler main loop.
     * Figure out whether more data should be requested from the
     * Http1TubeSubscriber.
     */
    private void checkRequestMore() {
        Http1AsyncDelegate delegate = this.delegate;
        // 'more' is only reported in the debug trace; the actual
        // canRequestMore check is performed by subscriber.requestMore().
        boolean more = this.canRequestMore.get();
        boolean hasDemand = hasDemand(delegate);
        debug.log(Level.DEBUG, () -> "checkRequestMore: "
                  + "canRequestMore=" + more + ", hasDemand=" + hasDemand
                  + (delegate == null ? ", delegate=null" : ""));
        if (hasDemand) {
            subscriber.requestMore();
        }
    }

    /**
     * Must be called from within the scheduler main loop.
     * Return true if the delegate is not null and has some demand.
     * @param delegate The Http1AsyncDelegate delegate
     * @return true if the delegate is not null and has some demand
     */
    private boolean hasDemand(Http1AsyncDelegate delegate) {
        if (delegate == null) return false;
        AbstractSubscription subscription = delegate.subscription();
        long demand = subscription.demand().get();
        debug.log(Level.DEBUG, "downstream subscription demand is %s", demand);
        return demand > 0;
    }

    /**
     * Must be called from within the scheduler main loop.
     * Handles pending delegate subscription.
     * Return true if there was some pending delegate subscription and a new
     * delegate was subscribed, false otherwise.
     *
     * @return true if there was some pending delegate subscription and a new
     *         delegate was subscribed, false otherwise.
     */
    private boolean handlePendingDelegate() {
        Http1AsyncDelegate pending = pendingDelegateRef.get();
        if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) {
            Http1AsyncDelegate delegate = this.delegate;
            if (delegate != null) unsubscribe(delegate);
            Runnable cancel = () -> {
                debug.log(Level.DEBUG, "Downstream subscription cancelled by %s", pending);
                // The connection should be closed, as some data may
                // be left over in the stream.
                try {
                    setRetryOnError(false);
                    onReadError(new IOException("subscription cancelled"));
                    unsubscribe(pending);
                } finally {
                    // Read owner before stop() clears it.
                    Http1Exchange<?> exchg = owner;
                    stop();
                    if (exchg != null) exchg.connection().close();
                }
            };
            // The subscription created by a delegate is only loosely
            // coupled with the upstream subscription. This is partly because
            // the header/body parser work with a flow of ByteBuffer, whereas
            // we have a flow List<ByteBuffer> upstream.
            Http1AsyncDelegateSubscription subscription =
                    new Http1AsyncDelegateSubscription(scheduler, cancel);
            pending.onSubscribe(subscription);
            this.delegate = delegate = pending;
            final Object captured = delegate;
            debug.log(Level.DEBUG, () -> "delegate is now " + captured
                  + ", demand=" + subscription.demand().get()
                  + ", canRequestMore=" + canRequestMore.get()
                  + ", queue.isEmpty=" + queue.isEmpty());
            return true;
        }
        return false;
    }

    /**
     * Sets whether an {@code IOException} raised before any byte has been
     * received should be reported as a {@code ConnectionExpiredException}
     * (see {@link #onReadError(Throwable)}).
     *
     * @param retry the new value of the retry flag
     */
    synchronized void setRetryOnError(boolean retry) {
        this.retry = retry;
    }

    /**
     * Drops the pending delegate, the current delegate and the owner
     * reference. The queue and the scheduler are left untouched.
     */
    void clear() {
        debug.log(Level.DEBUG, "cleared");
        this.pendingDelegateRef.set(null);
        this.delegate = null;
        this.owner = null;
    }

    /**
     * Registers {@code delegate} as the pending delegate and triggers the
     * scheduler so that it gets installed and fed any data already queued.
     *
     * @param delegate the delegate to subscribe
     */
    void subscribe(Http1AsyncDelegate delegate) {
        synchronized(this) {
            pendingDelegateRef.set(delegate);
        }
        if (queue.isEmpty()) {
            canRequestMore.set(true);
        }
        debug.log(Level.DEBUG, () ->
                "Subscribed pending " + delegate + " queue.isEmpty: "
                + queue.isEmpty());
        // Everything may have been received already. Make sure
        // we parse it.
        if (client.isSelectorThread()) {
            scheduler.deferOrSchedule(executor);
        } else {
            scheduler.runOrSchedule();
        }
    }

    // Used for debugging only!
    long remaining() {
        return Utils.remaining(queue.toArray(Utils.EMPTY_BB_ARRAY));
    }

    /**
     * Clears the current delegate if, and only if, it is {@code delegate}.
     *
     * @param delegate the delegate to unsubscribe
     */
    void unsubscribe(Http1AsyncDelegate delegate) {
        synchronized(this) {
            if (this.delegate == delegate) {
                debug.log(Level.DEBUG, "Unsubscribed %s", delegate);
                this.delegate = null;
            }
        }
    }

    // Callback: Consumer of ByteBuffer
    private void asyncReceive(ByteBuffer buf) {
        debug.log(Level.DEBUG, "Putting %s bytes into the queue", buf.remaining());
        received.addAndGet(buf.remaining());
        queue.offer(buf);

        // This callback is called from within the selector thread.
        // Use an executor here to avoid doing the heavy lifting in the
        // selector.
        scheduler.deferOrSchedule(executor);
    }

    // Callback: Consumer of Throwable
    // Records the first error only; it is forwarded to the delegate by
    // checkForErrors() once the queue has been emptied.
    void onReadError(Throwable ex) {
        Http1AsyncDelegate delegate;
        Throwable recorded;
        debug.log(Level.DEBUG, "onError: %s", (Object) ex);
        synchronized (this) {
            delegate = this.delegate;
            recorded = error;
            if (recorded == null) {
                // retry is set to true by HttpExchange when the connection is
                // already connected, which means it's been retrieved from
                // the pool.
                if (retry && (ex instanceof IOException)) {
                    // could be either EOFException, or
                    // IOException("connection reset by peer"), or
                    // SSLHandshakeException resulting from the server having
                    // closed the SSL session.
                    if (received.get() == 0) {
                        // If we receive such an exception before having
                        // received any byte, then in this case, we will
                        // throw ConnectionExpiredException
                        // to try & force a retry of the request.
                        retry = false;
                        ex = new ConnectionExpiredException(
                                "subscription is finished", ex);
                    }
                }
                error = ex;
            }
            final Throwable t = (recorded == null ? ex : recorded);
            debug.log(Level.DEBUG, () -> "recorded " + t
                    + "\n\t delegate: " + delegate
                    + "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
        }
        if (queue.isEmpty() || pendingDelegateRef.get() != null) {
            // This callback is called from within the selector thread.
            // Use an executor here to avoid doing the heavy lifting in the
            // selector.
            scheduler.deferOrSchedule(executor);
        }
    }

    /**
     * Stops the scheduler and drops the current delegate and the owner
     * reference.
     */
    void stop() {
        debug.log(Level.DEBUG, "stopping");
        scheduler.stop();
        delegate = null;
        owner  = null;
    }

    /**
     * Returns the TubeSubscriber for reading from the connection flow.
     * @return the TubeSubscriber for reading from the connection flow.
     */
    TubeSubscriber subscriber() {
        return subscriber;
    }

    /**
     * A simple tube subscriber for reading from the connection flow.
     */
    final class Http1TubeSubscriber implements TubeSubscriber {
        volatile Flow.Subscription subscription;
        // Set once onError or onComplete has been signalled.
        volatile boolean completed;
        // Set by dropSubscription(), reset by onSubscribe().
        volatile boolean dropped;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // supports being called multiple times.
            // doesn't cancel the previous subscription, since that is
            // most probably the same as the new subscription.
            assert this.subscription == null || dropped == false;
            this.subscription = subscription;
            dropped = false;
            canRequestMore.set(true);
            if (delegate != null) {
                scheduler.deferOrSchedule(executor);
            }
        }

        /**
         * Requests one more item from upstream, provided there is a
         * subscription, {@code canRequestMore} could be flipped from true to
         * false, and this subscriber is neither completed nor dropped.
         */
        void requestMore() {
            Flow.Subscription s = subscription;
            if (s == null) return;
            if (canRequestMore.compareAndSet(true, false)) {
                if (!completed && !dropped) {
                    debug.log(Level.DEBUG,
                        "Http1TubeSubscriber: requesting one more from upstream");
                    s.request(1);
                    return;
                }
            }
            debug.log(Level.DEBUG, "Http1TubeSubscriber: no need to request more");
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            // An empty item queues nothing, so more can be requested
            // straight away.
            canRequestMore.set(item.isEmpty());
            for (ByteBuffer buffer : item) {
                asyncReceive(buffer);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            onReadError(throwable);
            completed = true;
        }

        @Override
        public void onComplete() {
            // End of stream is reported to the delegate as an EOFException.
            onReadError(new EOFException("EOF reached while reading"));
            completed = true;
        }

        public void dropSubscription() {
            debug.log(Level.DEBUG, "Http1TubeSubscriber: dropSubscription");
            // we could probably set subscription to null here...
            // then we might not need the 'dropped' boolean?
            dropped = true;
        }

    }

    // Drains the content of the queue into a single ByteBuffer.
    // The scheduler must be permanently stopped before calling drain().
    //
    // Returns 'initial' (or an empty buffer if 'initial' is null) when the
    // queue is empty; otherwise returns a buffer whose readable content is
    // the remaining bytes of 'initial' followed by the queued bytes. That
    // buffer is 'initial' itself if it has enough free space, or a newly
    // allocated one otherwise. The queue is cleared on return.
    ByteBuffer drain(ByteBuffer initial) {
        // Revisit: need to clean that up.
        //
        ByteBuffer b = initial = (initial == null ? Utils.EMPTY_BYTEBUFFER : initial);
        assert scheduler.isStopped();

        if (queue.isEmpty()) return b;

        // Take an exactly-sized snapshot of the queue. Passing a zero-length
        // array avoids sizing the array from a separate size() call, which
        // could leave trailing null slots if the queue shrank in between.
        ByteBuffer[] qbb = queue.toArray(new ByteBuffer[0]);

        // sanity check: we shouldn't have queued the same
        //               buffer twice.
        assert java.util.stream.Stream.of(qbb)
                .collect(Collectors.toSet())
                .size() == qbb.length : debugQBB(qbb);

        // compute the number of bytes in the queue, the number of bytes
        // in the initial buffer
        // TODO: will need revisiting - as it is not guaranteed that all
        // data will fit in single BB!
        int size = Utils.remaining(qbb, Integer.MAX_VALUE);
        int remaining = b.remaining();
        int free = b.capacity() - b.position() - remaining;
        debug.log(Level.DEBUG,
            "Flushing %s bytes from queue into initial buffer (remaining=%s, free=%s)",
            size, remaining, free);

        // check whether the initial buffer has enough space
        if (size > free) {
            debug.log(Level.DEBUG,
                    "Allocating new buffer for initial: %s", (size + remaining));
            // allocates a new buffer and copy initial to it
            b = ByteBuffer.allocate(size + remaining);
            Utils.copy(initial, b);
            assert b.position() == remaining;
            b.flip();
            assert b.position() == 0;
            assert b.limit() == remaining;
            assert b.remaining() == remaining;
        }

        // store position and limit
        int pos = b.position();
        int limit = b.limit();
        assert limit - pos == remaining;
        assert b.capacity() >= remaining + size
                : "capacity: " + b.capacity()
                + ", remaining: " + b.remaining()
                + ", size: " + size;

        // prepare to copy the content of the queue
        b.position(limit);
        b.limit(pos + remaining + size);
        assert b.remaining() >= size :
                "remaining: " + b.remaining() + ", size: " + size;

        // copy the content of the queue
        int count = 0;
        for (int i=0; i<qbb.length; i++) {
            ByteBuffer b2 = qbb[i];
            int r = b2.remaining();
            assert b.remaining() >= r : "need at least " + r + " only "
                    + b.remaining() + " available";
            int copied = Utils.copy(b2, b);
            assert copied == r : "copied="+copied+" available="+r;
            assert b2.remaining() == 0;
            count += copied;
        }
        assert count == size;
        assert b.position() == pos + remaining + size :
                "b.position="+b.position()+" != "+pos+"+"+remaining+"+"+size;

        // reset limit and position
        b.limit(limit+size);
        b.position(pos);

        // we can clear the refs
        queue.clear();
        final ByteBuffer bb = b;
        debug.log(Level.DEBUG, () -> "Initial buffer now has " + bb.remaining()
                + " pos=" + bb.position() + " limit=" + bb.limit());

        return b;
    }

    // Builds the assertion message used by drain() when the queue snapshot
    // contains buffers that compare equal to one another.
    private String debugQBB(ByteBuffer[] qbb) {
        StringBuilder msg = new StringBuilder();
        List<ByteBuffer> lbb = Arrays.asList(qbb);
        Set<ByteBuffer> sbb = new HashSet<>(Arrays.asList(qbb));

        int uniquebb = sbb.size();
        msg.append("qbb: ").append(lbb.size())
           .append(" (unique: ").append(uniquebb).append("), ")
           .append("duplicates: ");
        String sep = "";
        for (ByteBuffer b : lbb) {
            // The first occurrence removes the buffer from the set; any
            // later occurrence fails to remove it and is reported.
            if (!sbb.remove(b)) {
                msg.append(sep)
                   .append(String.valueOf(b))
                   .append("[remaining=")
                   .append(b.remaining())
                   .append(", position=")
                   .append(b.position())
                   .append(", capacity=")
                   .append(b.capacity())
                   .append("]");
                sep = ", ";
            }
        }
        return msg.toString();
    }

    // Cached debug tag; only set once the connection flow is known.
    volatile String dbgTag;

    // Returns the tag used by the debug logger. The tag is cached only when
    // it could be derived from the owner's connection flow; otherwise a
    // generic tag is returned and the lookup is retried on the next call.
    String dbgString() {
        String tag = dbgTag;
        if (tag == null) {
            String flowTag = null;
            Http1Exchange<?> exchg = owner;
            Object flow = (exchg != null)
                    ? exchg.connection().getConnectionFlow()
                    : null;
            flowTag = tag = flow == null ? null: (String.valueOf(flow));
            if (flowTag != null) {
                dbgTag = tag = flowTag + " Http1AsyncReceiver";
            } else {
                tag = "Http1AsyncReceiver";
            }
        }
        return tag;
    }
}