1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.lang.System.Logger.Level; 30 import java.net.InetSocketAddress; 31 import jdk.incubator.http.HttpResponse.BodyHandler; 32 import jdk.incubator.http.HttpResponse.BodySubscriber; 33 import java.nio.ByteBuffer; 34 import java.util.Objects; 35 import java.util.concurrent.CompletableFuture; 36 import java.util.LinkedList; 37 import java.util.List; 38 import java.util.concurrent.ConcurrentLinkedDeque; 39 import java.util.concurrent.Executor; 40 import java.util.concurrent.Flow; 41 import jdk.incubator.http.internal.common.Demand; 42 import jdk.incubator.http.internal.common.Log; 43 import jdk.incubator.http.internal.common.FlowTube; 44 import jdk.incubator.http.internal.common.SequentialScheduler; 45 import jdk.incubator.http.internal.common.MinimalFuture; 46 import jdk.incubator.http.internal.common.Utils; 47 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1; 48 49 /** 50 * Encapsulates one HTTP/1.1 request/response exchange. 51 */ 52 class Http1Exchange<T> extends ExchangeImpl<T> { 53 54 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. 55 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); 56 private static final System.Logger DEBUG_LOGGER = 57 Utils.getDebugLogger("Http1Exchange"::toString, DEBUG); 58 59 final HttpRequestImpl request; // main request 60 final Http1Request requestAction; 61 private volatile Http1Response<T> response; 62 final HttpConnection connection; 63 final HttpClientImpl client; 64 final Executor executor; 65 private final Http1AsyncReceiver asyncReceiver; 66 67 /** Records a possible cancellation raised before any operation 68 * has been initiated, or an error received while sending the request. */ 69 private Throwable failed; 70 private final List<CompletableFuture<?>> operations; // used for cancel 71 72 /** Must be held when operating on any internal state or data. */ 73 private final Object lock = new Object(); 74 75 /** Holds the outgoing data, either the headers or a request body part. Or 76 * an error from the request body publisher. At most there can be ~2 pieces 77 * of outgoing data ( onComplete|onError can be invoked without demand ).*/ 78 final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>(); 79 80 /** The write publisher, responsible for writing the complete request ( both 81 * headers and body ( if any ). */ 82 private final Http1Publisher writePublisher = new Http1Publisher(); 83 84 /** Completed when the header have been published, or there is an error */ 85 private volatile CompletableFuture<ExchangeImpl<T>> headersSentCF = new MinimalFuture<>(); 86 /** Completed when the body has been published, or there is an error */ 87 private volatile CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>(); 88 89 /** The subscriber to the request's body published. Maybe null. */ 90 private volatile Http1BodySubscriber bodySubscriber; 91 92 enum State { INITIAL, 93 HEADERS, 94 BODY, 95 ERROR, // terminal state 96 COMPLETING, 97 COMPLETED } // terminal state 98 99 private State state = State.INITIAL; 100 101 /** A carrier for either data or an error. Used to carry data, and communicate 102 * errors from the request ( both headers and body ) to the exchange. */ 103 static class DataPair { 104 Throwable throwable; 105 List<ByteBuffer> data; 106 DataPair(List<ByteBuffer> data, Throwable throwable){ 107 this.data = data; 108 this.throwable = throwable; 109 } 110 @Override 111 public String toString() { 112 return "DataPair [data=" + data + ", throwable=" + throwable + "]"; 113 } 114 } 115 116 /** An abstract supertype for HTTP/1.1 body subscribers. There are two 117 * concrete implementations: {@link Http1Request.StreamSubscriber}, and 118 * {@link Http1Request.FixedContentSubscriber}, for receiving chunked and 119 * fixed length bodies, respectively. */ 120 static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> { 121 protected volatile Flow.Subscription subscription; 122 protected volatile boolean complete; 123 124 /** Final sentinel in the stream of request body. */ 125 static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0)); 126 127 void request(long n) { 128 DEBUG_LOGGER.log(Level.DEBUG, () -> 129 "Http1BodySubscriber requesting " + n + ", from " + subscription); 130 subscription.request(n); 131 } 132 133 static Http1BodySubscriber completeSubscriber() { 134 return new Http1BodySubscriber() { 135 @Override public void onSubscribe(Flow.Subscription subscription) { error(); } 136 @Override public void onNext(ByteBuffer item) { error(); } 137 @Override public void onError(Throwable throwable) { error(); } 138 @Override public void onComplete() { error(); } 139 private void error() { 140 throw new InternalError("should not reach here"); 141 } 142 }; 143 } 144 } 145 146 @Override 147 public String toString() { 148 return "HTTP/1.1 " + request.toString(); 149 } 150 151 HttpRequestImpl request() { 152 return request; 153 } 154 155 Http1Exchange(Exchange<T> exchange, HttpConnection connection) 156 throws IOException 157 { 158 super(exchange); 159 this.request = exchange.request(); 160 this.client = exchange.client(); 161 this.executor = exchange.executor(); 162 this.operations = new LinkedList<>(); 163 operations.add(headersSentCF); 164 operations.add(bodySentCF); 165 if (connection != null) { 166 this.connection = connection; 167 } else { 168 InetSocketAddress addr = request.getAddress(); 169 this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1); 170 } 171 this.requestAction = new Http1Request(request, this); 172 this.asyncReceiver = new Http1AsyncReceiver(executor, this); 173 asyncReceiver.subscribe(new InitialErrorReceiver()); 174 } 175 176 /** An initial receiver that handles no data, but cancels the request if 177 * it receives an error. Will be replaced when reading response body. */ 178 final class InitialErrorReceiver implements Http1AsyncReceiver.Http1AsyncDelegate { 179 volatile AbstractSubscription s; 180 @Override 181 public boolean tryAsyncReceive(ByteBuffer ref) { 182 return false; // no data has been processed, leave it in the queue 183 } 184 185 @Override 186 public void onReadError(Throwable ex) { 187 cancelImpl(ex); 188 } 189 190 @Override 191 public void onSubscribe(AbstractSubscription s) { 192 this.s = s; 193 } 194 195 public AbstractSubscription subscription() { 196 return s; 197 } 198 } 199 200 @Override 201 HttpConnection connection() { 202 return connection; 203 } 204 205 private void connectFlows(HttpConnection connection) { 206 FlowTube tube = connection.getConnectionFlow(); 207 debug.log(Level.DEBUG, "%s connecting flows", tube); 208 209 // Connect the flow to our Http1TubeSubscriber: 210 // asyncReceiver.subscriber(). 211 tube.connectFlows(writePublisher, 212 asyncReceiver.subscriber()); 213 } 214 215 @Override 216 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { 217 // create the response before sending the request headers, so that 218 // the response can set the appropriate receivers. 219 debug.log(Level.DEBUG, "Sending headers only"); 220 if (response == null) { 221 response = new Http1Response<>(connection, this, asyncReceiver); 222 } 223 224 debug.log(Level.DEBUG, "response created in advance"); 225 // If the first attempt to read something triggers EOF, or 226 // IOException("channel reset by peer"), we're going to retry. 227 // Instruct the asyncReceiver to throw ConnectionExpiredException 228 // to force a retry. 229 asyncReceiver.setRetryOnError(true); 230 231 CompletableFuture<Void> connectCF; 232 if (!connection.connected()) { 233 debug.log(Level.DEBUG, "initiating connect async"); 234 connectCF = connection.connectAsync(); 235 synchronized (lock) { 236 operations.add(connectCF); 237 } 238 } else { 239 connectCF = new MinimalFuture<>(); 240 connectCF.complete(null); 241 } 242 243 return connectCF 244 .thenCompose(unused -> { 245 CompletableFuture<Void> cf = new MinimalFuture<>(); 246 try { 247 connectFlows(connection); 248 249 debug.log(Level.DEBUG, "requestAction.headers"); 250 List<ByteBuffer> data = requestAction.headers(); 251 synchronized (lock) { 252 state = State.HEADERS; 253 } 254 debug.log(Level.DEBUG, "setting outgoing with headers"); 255 assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing; 256 appendToOutgoing(data); 257 cf.complete(null); 258 return cf; 259 } catch (Throwable t) { 260 debug.log(Level.DEBUG, "Failed to send headers: %s", t); 261 connection.close(); 262 cf.completeExceptionally(t); 263 return cf; 264 } }) 265 .thenCompose(unused -> headersSentCF); 266 } 267 268 @Override 269 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 270 assert headersSentCF.isDone(); 271 try { 272 bodySubscriber = requestAction.continueRequest(); 273 if (bodySubscriber == null) { 274 bodySubscriber = Http1BodySubscriber.completeSubscriber(); 275 appendToOutgoing(Http1BodySubscriber.COMPLETED); 276 } else { 277 bodySubscriber.request(1); // start 278 } 279 } catch (Throwable t) { 280 connection.close(); 281 bodySentCF.completeExceptionally(t); 282 } 283 return bodySentCF; 284 } 285 286 @Override 287 CompletableFuture<Response> getResponseAsync(Executor executor) { 288 CompletableFuture<Response> cf = response.readHeadersAsync(executor); 289 Throwable cause; 290 synchronized (lock) { 291 operations.add(cf); 292 cause = failed; 293 failed = null; 294 } 295 296 if (cause != null) { 297 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]" 298 + "\n\tCompleting exceptionally with {2}\n", 299 request.uri(), 300 request.timeout().isPresent() ? 301 // calling duration.toMillis() can throw an exception. 302 // this is just debugging, we don't care if it overflows. 303 (request.timeout().get().getSeconds() * 1000 304 + request.timeout().get().getNano() / 1000000) : -1, 305 cause); 306 boolean acknowledged = cf.completeExceptionally(cause); 307 debug.log(Level.DEBUG, 308 () -> acknowledged 309 ? ("completed response with " + cause) 310 : ("response already completed, ignoring " + cause)); 311 } 312 return cf; 313 } 314 315 @Override 316 CompletableFuture<T> readBodyAsync(BodyHandler<T> handler, 317 boolean returnConnectionToPool, 318 Executor executor) 319 { 320 BodySubscriber<T> bs = handler.apply(response.responseCode(), 321 response.responseHeaders()); 322 CompletableFuture<T> bodyCF = response.readBody(bs, 323 returnConnectionToPool, 324 executor); 325 return bodyCF; 326 } 327 328 @Override 329 CompletableFuture<Void> ignoreBody() { 330 return response.ignoreBody(executor); 331 } 332 333 ByteBuffer drainLeftOverBytes() { 334 synchronized (lock) { 335 asyncReceiver.stop(); 336 return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER); 337 } 338 } 339 340 void released() { 341 Http1Response<T> resp = this.response; 342 if (resp != null) resp.completed(); 343 asyncReceiver.clear(); 344 } 345 346 void completed() { 347 Http1Response<T> resp = this.response; 348 if (resp != null) resp.completed(); 349 } 350 351 /** 352 * Cancel checks to see if request and responseAsync finished already. 353 * If not it closes the connection and completes all pending operations 354 */ 355 @Override 356 void cancel() { 357 cancelImpl(new IOException("Request cancelled")); 358 } 359 360 /** 361 * Cancel checks to see if request and responseAsync finished already. 362 * If not it closes the connection and completes all pending operations 363 */ 364 @Override 365 void cancel(IOException cause) { 366 cancelImpl(cause); 367 } 368 369 private void cancelImpl(Throwable cause) { 370 LinkedList<CompletableFuture<?>> toComplete = null; 371 int count = 0; 372 synchronized (lock) { 373 if (failed == null) 374 failed = cause; 375 if (requestAction != null && requestAction.finished() 376 && response != null && response.finished()) { 377 return; 378 } 379 connection.close(); // TODO: ensure non-blocking if holding the lock 380 writePublisher.writeScheduler.stop(); 381 if (operations.isEmpty()) { 382 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation." 383 + "\n\tCan''t cancel yet with {2}", 384 request.uri(), 385 request.timeout().isPresent() ? 386 // calling duration.toMillis() can throw an exception. 387 // this is just debugging, we don't care if it overflows. 388 (request.timeout().get().getSeconds() * 1000 389 + request.timeout().get().getNano() / 1000000) : -1, 390 cause); 391 } else { 392 for (CompletableFuture<?> cf : operations) { 393 if (!cf.isDone()) { 394 if (toComplete == null) toComplete = new LinkedList<>(); 395 toComplete.add(cf); 396 count++; 397 } 398 } 399 operations.clear(); 400 } 401 } 402 Log.logError("Http1Exchange.cancel: count=" + count); 403 if (toComplete != null) { 404 // We might be in the selector thread in case of timeout, when 405 // the SelectorManager calls purgeTimeoutsAndReturnNextDeadline() 406 // There may or may not be other places that reach here 407 // from the SelectorManager thread, so just make sure we 408 // don't complete any CF from within the selector manager 409 // thread. 410 Executor exec = client.isSelectorThread() 411 ? executor 412 : this::runInline; 413 while (!toComplete.isEmpty()) { 414 CompletableFuture<?> cf = toComplete.poll(); 415 exec.execute(() -> { 416 if (cf.completeExceptionally(cause)) { 417 debug.log(Level.DEBUG, "completed cf with %s", 418 (Object) cause); 419 } 420 }); 421 } 422 } 423 } 424 425 private void runInline(Runnable run) { 426 assert !client.isSelectorThread(); 427 run.run(); 428 } 429 430 /** Returns true if this exchange was canceled. */ 431 boolean isCanceled() { 432 synchronized (lock) { 433 return failed != null; 434 } 435 } 436 437 /** Returns the cause for which this exchange was canceled, if available. */ 438 Throwable getCancelCause() { 439 synchronized (lock) { 440 return failed; 441 } 442 } 443 444 /** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */ 445 void appendToOutgoing(Throwable throwable) { 446 appendToOutgoing(new DataPair(null, throwable)); 447 } 448 449 /** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */ 450 void appendToOutgoing(List<ByteBuffer> item) { 451 appendToOutgoing(new DataPair(item, null)); 452 } 453 454 private void appendToOutgoing(DataPair dp) { 455 debug.log(Level.DEBUG, "appending to outgoing " + dp); 456 outgoing.add(dp); 457 writePublisher.writeScheduler.runOrSchedule(); 458 } 459 460 /** Tells whether, or not, there is any outgoing data that can be published, 461 * or if there is an error. */ 462 private boolean hasOutgoing() { 463 return !outgoing.isEmpty(); 464 } 465 466 // Invoked only by the publisher 467 // ALL tasks should execute off the Selector-Manager thread 468 /** Returns the next portion of the HTTP request, or the error. */ 469 private DataPair getOutgoing() { 470 final Executor exec = client.theExecutor(); 471 final DataPair dp = outgoing.pollFirst(); 472 473 if (dp == null) // publisher has not published anything yet 474 return null; 475 476 synchronized (lock) { 477 if (dp.throwable != null) { 478 state = State.ERROR; 479 exec.execute(() -> { 480 connection.close(); 481 headersSentCF.completeExceptionally(dp.throwable); 482 bodySentCF.completeExceptionally(dp.throwable); 483 }); 484 return dp; 485 } 486 487 switch (state) { 488 case HEADERS: 489 state = State.BODY; 490 // completeAsync, since dependent tasks should run in another thread 491 debug.log(Level.DEBUG, "initiating completion of headersSentCF"); 492 headersSentCF.completeAsync(() -> this, exec); 493 break; 494 case BODY: 495 if (dp.data == Http1BodySubscriber.COMPLETED) { 496 state = State.COMPLETING; 497 debug.log(Level.DEBUG, "initiating completion of bodySentCF"); 498 bodySentCF.completeAsync(() -> this, exec); 499 } else { 500 debug.log(Level.DEBUG, "requesting more body from the subscriber"); 501 exec.execute(() -> bodySubscriber.request(1)); 502 } 503 break; 504 case INITIAL: 505 case ERROR: 506 case COMPLETING: 507 case COMPLETED: 508 default: 509 assert false : "Unexpected state:" + state; 510 } 511 512 return dp; 513 } 514 } 515 516 /** A Publisher of HTTP/1.1 headers and request body. */ 517 final class Http1Publisher implements FlowTube.TubePublisher { 518 519 final System.Logger debug = Utils.getDebugLogger(this::dbgString); 520 volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber; 521 volatile boolean cancelled; 522 final Http1WriteSubscription subscription = new Http1WriteSubscription(); 523 final Demand demand = new Demand(); 524 final SequentialScheduler writeScheduler = 525 SequentialScheduler.synchronizedScheduler(new WriteTask()); 526 527 @Override 528 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) { 529 assert state == State.INITIAL; 530 Objects.requireNonNull(s); 531 assert subscriber == null; 532 533 subscriber = s; 534 debug.log(Level.DEBUG, "got subscriber: %s", s); 535 s.onSubscribe(subscription); 536 } 537 538 volatile String dbgTag; 539 String dbgString() { 540 String tag = dbgTag; 541 Object flow = connection.getConnectionFlow(); 542 if (tag == null && flow != null) { 543 dbgTag = tag = "Http1Publisher(" + flow + ")"; 544 } else if (tag == null) { 545 tag = "Http1Publisher(?)"; 546 } 547 return tag; 548 } 549 550 final class WriteTask implements Runnable { 551 @Override 552 public void run() { 553 assert state != State.COMPLETED : "Unexpected state:" + state; 554 debug.log(Level.DEBUG, "WriteTask"); 555 if (subscriber == null) { 556 debug.log(Level.DEBUG, "no subscriber yet"); 557 return; 558 } 559 debug.log(Level.DEBUG, () -> "hasOutgoing = " + hasOutgoing()); 560 if (hasOutgoing() && demand.tryDecrement()) { 561 DataPair dp = getOutgoing(); 562 563 if (dp.throwable != null) { 564 debug.log(Level.DEBUG, "onError"); 565 // Do not call the subscriber's onError, it is not required. 566 writeScheduler.stop(); 567 } else { 568 List<ByteBuffer> data = dp.data; 569 if (data == Http1BodySubscriber.COMPLETED) { 570 synchronized (lock) { 571 assert state == State.COMPLETING : "Unexpected state:" + state; 572 state = State.COMPLETED; 573 } 574 debug.log(Level.DEBUG, 575 "completed, stopping %s", writeScheduler); 576 writeScheduler.stop(); 577 // Do nothing more. Just do not publish anything further. 578 // The next Subscriber will eventually take over. 579 580 } else { 581 debug.log(Level.DEBUG, () -> 582 "onNext with " + Utils.remaining(data) + " bytes"); 583 subscriber.onNext(data); 584 } 585 } 586 } 587 } 588 } 589 590 final class Http1WriteSubscription implements Flow.Subscription { 591 592 @Override 593 public void request(long n) { 594 if (cancelled) 595 return; //no-op 596 demand.increase(n); 597 debug.log(Level.DEBUG, 598 "subscription request(%d), demand=%s", n, demand); 599 writeScheduler.deferOrSchedule(client.theExecutor()); 600 } 601 602 @Override 603 public void cancel() { 604 debug.log(Level.DEBUG, "subscription cancelled"); 605 if (cancelled) 606 return; //no-op 607 cancelled = true; 608 writeScheduler.stop(); 609 } 610 } 611 } 612 613 String dbgString() { 614 return "Http1Exchange"; 615 } 616 }