1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.IOException; 29 import java.net.InetSocketAddress; 30 import java.net.http.HttpResponse.BodyHandler; 31 import java.net.http.HttpResponse.BodySubscriber; 32 import java.nio.ByteBuffer; 33 import java.util.Objects; 34 import java.util.concurrent.CompletableFuture; 35 import java.util.LinkedList; 36 import java.util.List; 37 import java.util.concurrent.ConcurrentLinkedDeque; 38 import java.util.concurrent.Executor; 39 import java.util.concurrent.Flow; 40 import jdk.internal.net.http.common.Demand; 41 import jdk.internal.net.http.common.Log; 42 import jdk.internal.net.http.common.FlowTube; 43 import jdk.internal.net.http.common.Logger; 44 import jdk.internal.net.http.common.SequentialScheduler; 45 import jdk.internal.net.http.common.MinimalFuture; 46 import jdk.internal.net.http.common.Utils; 47 import static java.net.http.HttpClient.Version.HTTP_1_1; 48 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail; 49 50 /** 51 * Encapsulates one HTTP/1.1 request/response exchange. 52 */ 53 class Http1Exchange<T> extends ExchangeImpl<T> { 54 55 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 56 final HttpRequestImpl request; // main request 57 final Http1Request requestAction; 58 private volatile Http1Response<T> response; 59 final HttpConnection connection; 60 final HttpClientImpl client; 61 final Executor executor; 62 private final Http1AsyncReceiver asyncReceiver; 63 64 /** Records a possible cancellation raised before any operation 65 * has been initiated, or an error received while sending the request. */ 66 private Throwable failed; 67 private final List<CompletableFuture<?>> operations; // used for cancel 68 69 /** Must be held when operating on any internal state or data. */ 70 private final Object lock = new Object(); 71 72 /** Holds the outgoing data, either the headers or a request body part. Or 73 * an error from the request body publisher. At most there can be ~2 pieces 74 * of outgoing data ( onComplete|onError can be invoked without demand ).*/ 75 final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>(); 76 77 /** The write publisher, responsible for writing the complete request ( both 78 * headers and body ( if any ). */ 79 private final Http1Publisher writePublisher = new Http1Publisher(); 80 81 /** Completed when the header have been published, or there is an error */ 82 private final CompletableFuture<ExchangeImpl<T>> headersSentCF = new MinimalFuture<>(); 83 /** Completed when the body has been published, or there is an error */ 84 private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>(); 85 86 /** The subscriber to the request's body published. Maybe null. */ 87 private volatile Http1BodySubscriber bodySubscriber; 88 89 enum State { INITIAL, 90 HEADERS, 91 BODY, 92 ERROR, // terminal state 93 COMPLETING, 94 COMPLETED } // terminal state 95 96 private State state = State.INITIAL; 97 98 /** A carrier for either data or an error. Used to carry data, and communicate 99 * errors from the request ( both headers and body ) to the exchange. */ 100 static class DataPair { 101 Throwable throwable; 102 List<ByteBuffer> data; 103 DataPair(List<ByteBuffer> data, Throwable throwable){ 104 this.data = data; 105 this.throwable = throwable; 106 } 107 @Override 108 public String toString() { 109 return "DataPair [data=" + data + ", throwable=" + throwable + "]"; 110 } 111 } 112 113 /** An abstract supertype for HTTP/1.1 body subscribers. There are two 114 * concrete implementations: {@link Http1Request.StreamSubscriber}, and 115 * {@link Http1Request.FixedContentSubscriber}, for receiving chunked and 116 * fixed length bodies, respectively. */ 117 static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> { 118 final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>(); 119 private volatile Flow.Subscription subscription; 120 volatile boolean complete; 121 private final Logger debug; 122 Http1BodySubscriber(Logger debug) { 123 assert debug != null; 124 this.debug = debug; 125 } 126 127 /** Final sentinel in the stream of request body. */ 128 static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0)); 129 130 final void request(long n) { 131 if (debug.on()) 132 debug.log("Http1BodySubscriber requesting %d, from %s", 133 n, subscription); 134 subscription.request(n); 135 } 136 137 /** A current-state message suitable for inclusion in an exception detail message. */ 138 abstract String currentStateMessage(); 139 140 final boolean isSubscribed() { 141 return subscription != null; 142 } 143 144 final void setSubscription(Flow.Subscription subscription) { 145 this.subscription = subscription; 146 whenSubscribed.complete(subscription); 147 } 148 149 final void cancelSubscription() { 150 try { 151 subscription.cancel(); 152 } catch(Throwable t) { 153 String msg = "Ignoring exception raised when canceling BodyPublisher subscription"; 154 if (debug.on()) debug.log("%s: %s", msg, t); 155 Log.logError("{0}: {1}", msg, (Object)t); 156 } 157 } 158 159 static Http1BodySubscriber completeSubscriber(Logger debug) { 160 return new Http1BodySubscriber(debug) { 161 @Override public void onSubscribe(Flow.Subscription subscription) { error(); } 162 @Override public void onNext(ByteBuffer item) { error(); } 163 @Override public void onError(Throwable throwable) { error(); } 164 @Override public void onComplete() { error(); } 165 @Override String currentStateMessage() { return null; } 166 private void error() { 167 throw new InternalError("should not reach here"); 168 } 169 }; 170 } 171 } 172 173 @Override 174 public String toString() { 175 return "HTTP/1.1 " + request.toString(); 176 } 177 178 HttpRequestImpl request() { 179 return request; 180 } 181 182 Http1Exchange(Exchange<T> exchange, HttpConnection connection) 183 throws IOException 184 { 185 super(exchange); 186 this.request = exchange.request(); 187 this.client = exchange.client(); 188 this.executor = exchange.executor(); 189 this.operations = new LinkedList<>(); 190 operations.add(headersSentCF); 191 operations.add(bodySentCF); 192 if (connection != null) { 193 this.connection = connection; 194 } else { 195 InetSocketAddress addr = request.getAddress(); 196 this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1); 197 } 198 this.requestAction = new Http1Request(request, this); 199 this.asyncReceiver = new Http1AsyncReceiver(executor, this); 200 } 201 202 @Override 203 HttpConnection connection() { 204 return connection; 205 } 206 207 private void connectFlows(HttpConnection connection) { 208 FlowTube tube = connection.getConnectionFlow(); 209 if (debug.on()) debug.log("%s connecting flows", tube); 210 211 // Connect the flow to our Http1TubeSubscriber: 212 // asyncReceiver.subscriber(). 213 tube.connectFlows(writePublisher, 214 asyncReceiver.subscriber()); 215 } 216 217 @Override 218 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { 219 // create the response before sending the request headers, so that 220 // the response can set the appropriate receivers. 221 if (debug.on()) debug.log("Sending headers only"); 222 // If the first attempt to read something triggers EOF, or 223 // IOException("channel reset by peer"), we're going to retry. 224 // Instruct the asyncReceiver to throw ConnectionExpiredException 225 // to force a retry. 226 asyncReceiver.setRetryOnError(true); 227 if (response == null) { 228 response = new Http1Response<>(connection, this, asyncReceiver); 229 } 230 231 if (debug.on()) debug.log("response created in advance"); 232 233 CompletableFuture<Void> connectCF; 234 if (!connection.connected()) { 235 if (debug.on()) debug.log("initiating connect async"); 236 connectCF = connection.connectAsync(); 237 Throwable cancelled; 238 synchronized (lock) { 239 if ((cancelled = failed) == null) { 240 operations.add(connectCF); 241 } 242 } 243 if (cancelled != null) { 244 if (client.isSelectorThread()) { 245 executor.execute(() -> 246 connectCF.completeExceptionally(cancelled)); 247 } else { 248 connectCF.completeExceptionally(cancelled); 249 } 250 } 251 } else { 252 connectCF = new MinimalFuture<>(); 253 connectCF.complete(null); 254 } 255 256 return connectCF 257 .thenCompose(unused -> { 258 CompletableFuture<Void> cf = new MinimalFuture<>(); 259 try { 260 asyncReceiver.whenFinished.whenComplete((r,t) -> { 261 if (t != null) { 262 if (debug.on()) 263 debug.log("asyncReceiver finished (failed=%s)", (Object)t); 264 if (!headersSentCF.isDone()) 265 headersSentCF.completeAsync(() -> this, executor); 266 } 267 }); 268 connectFlows(connection); 269 270 if (debug.on()) debug.log("requestAction.headers"); 271 List<ByteBuffer> data = requestAction.headers(); 272 synchronized (lock) { 273 state = State.HEADERS; 274 } 275 if (debug.on()) debug.log("setting outgoing with headers"); 276 assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing; 277 appendToOutgoing(data); 278 cf.complete(null); 279 return cf; 280 } catch (Throwable t) { 281 if (debug.on()) debug.log("Failed to send headers: %s", t); 282 headersSentCF.completeExceptionally(t); 283 bodySentCF.completeExceptionally(t); 284 connection.close(); 285 cf.completeExceptionally(t); 286 return cf; 287 } }) 288 .thenCompose(unused -> headersSentCF); 289 } 290 291 private void cancelIfFailed(Flow.Subscription s) { 292 asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> { 293 if (debug.on()) 294 debug.log("asyncReceiver finished (failed=%s)", (Object)t); 295 if (t != null) { 296 s.cancel(); 297 // Don't complete exceptionally here as 't' 298 // might not be the right exception: it will 299 // not have been decorated yet. 300 // t is an exception raised by the read side, 301 // an EOFException or Broken Pipe... 302 // We are cancelling the BodyPublisher subscription 303 // and completing bodySentCF to allow the next step 304 // to flow and call readHeaderAsync, which will 305 // get the right exception from the asyncReceiver. 306 bodySentCF.complete(this); 307 } 308 }, executor); 309 } 310 311 @Override 312 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 313 assert headersSentCF.isDone(); 314 if (debug.on()) debug.log("sendBodyAsync"); 315 try { 316 bodySubscriber = requestAction.continueRequest(); 317 if (debug.on()) debug.log("bodySubscriber is %s", 318 bodySubscriber == null ? null : bodySubscriber.getClass()); 319 if (bodySubscriber == null) { 320 bodySubscriber = Http1BodySubscriber.completeSubscriber(debug); 321 appendToOutgoing(Http1BodySubscriber.COMPLETED); 322 } else { 323 // start 324 bodySubscriber.whenSubscribed 325 .thenAccept((s) -> cancelIfFailed(s)) 326 .thenAccept((s) -> requestMoreBody()); 327 } 328 } catch (Throwable t) { 329 cancelImpl(t); 330 bodySentCF.completeExceptionally(t); 331 } 332 return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF); 333 } 334 335 @Override 336 CompletableFuture<Response> getResponseAsync(Executor executor) { 337 if (debug.on()) debug.log("reading headers"); 338 CompletableFuture<Response> cf = response.readHeadersAsync(executor); 339 Throwable cause; 340 synchronized (lock) { 341 operations.add(cf); 342 cause = failed; 343 failed = null; 344 } 345 346 if (cause != null) { 347 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]" 348 + "\n\tCompleting exceptionally with {2}\n", 349 request.uri(), 350 request.timeout().isPresent() ? 351 // calling duration.toMillis() can throw an exception. 352 // this is just debugging, we don't care if it overflows. 353 (request.timeout().get().getSeconds() * 1000 354 + request.timeout().get().getNano() / 1000000) : -1, 355 cause); 356 boolean acknowledged = cf.completeExceptionally(cause); 357 if (debug.on()) 358 debug.log(acknowledged ? ("completed response with " + cause) 359 : ("response already completed, ignoring " + cause)); 360 } 361 return Utils.wrapForDebug(debug, "getResponseAsync", cf); 362 } 363 364 @Override 365 CompletableFuture<T> readBodyAsync(BodyHandler<T> handler, 366 boolean returnConnectionToPool, 367 Executor executor) 368 { 369 BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(), 370 response.responseHeaders(), 371 HTTP_1_1)); 372 CompletableFuture<T> bodyCF = response.readBody(bs, 373 returnConnectionToPool, 374 executor); 375 return bodyCF; 376 } 377 378 @Override 379 CompletableFuture<Void> ignoreBody() { 380 return response.ignoreBody(executor); 381 } 382 383 ByteBuffer drainLeftOverBytes() { 384 synchronized (lock) { 385 asyncReceiver.stop(); 386 return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER); 387 } 388 } 389 390 void released() { 391 Http1Response<T> resp = this.response; 392 if (resp != null) resp.completed(); 393 asyncReceiver.clear(); 394 } 395 396 void completed() { 397 Http1Response<T> resp = this.response; 398 if (resp != null) resp.completed(); 399 } 400 401 /** 402 * Cancel checks to see if request and responseAsync finished already. 403 * If not it closes the connection and completes all pending operations 404 */ 405 @Override 406 void cancel() { 407 cancelImpl(new IOException("Request cancelled")); 408 } 409 410 /** 411 * Cancel checks to see if request and responseAsync finished already. 412 * If not it closes the connection and completes all pending operations 413 */ 414 @Override 415 void cancel(IOException cause) { 416 cancelImpl(cause); 417 } 418 419 private void cancelImpl(Throwable cause) { 420 LinkedList<CompletableFuture<?>> toComplete = null; 421 int count = 0; 422 Throwable error; 423 synchronized (lock) { 424 if ((error = failed) == null) { 425 failed = error = cause; 426 } 427 if (debug.on()) { 428 debug.log(request.uri() + ": " + error); 429 } 430 if (requestAction != null && requestAction.finished() 431 && response != null && response.finished()) { 432 return; 433 } 434 writePublisher.writeScheduler.stop(); 435 if (operations.isEmpty()) { 436 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation." 437 + "\n\tCan''t cancel yet with {2}", 438 request.uri(), 439 request.timeout().isPresent() ? 440 // calling duration.toMillis() can throw an exception. 441 // this is just debugging, we don't care if it overflows. 442 (request.timeout().get().getSeconds() * 1000 443 + request.timeout().get().getNano() / 1000000) : -1, 444 cause); 445 } else { 446 for (CompletableFuture<?> cf : operations) { 447 if (!cf.isDone()) { 448 if (toComplete == null) toComplete = new LinkedList<>(); 449 toComplete.add(cf); 450 count++; 451 } 452 } 453 operations.clear(); 454 } 455 } 456 try { 457 Log.logError("Http1Exchange.cancel: count=" + count); 458 if (toComplete != null) { 459 // We might be in the selector thread in case of timeout, when 460 // the SelectorManager calls purgeTimeoutsAndReturnNextDeadline() 461 // There may or may not be other places that reach here 462 // from the SelectorManager thread, so just make sure we 463 // don't complete any CF from within the selector manager 464 // thread. 465 Executor exec = client.isSelectorThread() 466 ? executor 467 : this::runInline; 468 Throwable x = error; 469 while (!toComplete.isEmpty()) { 470 CompletableFuture<?> cf = toComplete.poll(); 471 exec.execute(() -> { 472 if (cf.completeExceptionally(x)) { 473 if (debug.on()) 474 debug.log("%s: completed cf with %s", request.uri(), x); 475 } 476 }); 477 } 478 } 479 } finally { 480 connection.close(); 481 } 482 } 483 484 private void runInline(Runnable run) { 485 assert !client.isSelectorThread(); 486 run.run(); 487 } 488 489 /** Returns true if this exchange was canceled. */ 490 boolean isCanceled() { 491 synchronized (lock) { 492 return failed != null; 493 } 494 } 495 496 /** Returns the cause for which this exchange was canceled, if available. */ 497 Throwable getCancelCause() { 498 synchronized (lock) { 499 return failed; 500 } 501 } 502 503 /** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */ 504 void appendToOutgoing(Throwable throwable) { 505 appendToOutgoing(new DataPair(null, throwable)); 506 } 507 508 /** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */ 509 void appendToOutgoing(List<ByteBuffer> item) { 510 appendToOutgoing(new DataPair(item, null)); 511 } 512 513 private void appendToOutgoing(DataPair dp) { 514 if (debug.on()) debug.log("appending to outgoing " + dp); 515 outgoing.add(dp); 516 writePublisher.writeScheduler.runOrSchedule(); 517 } 518 519 /** Tells whether, or not, there is any outgoing data that can be published, 520 * or if there is an error. */ 521 private boolean hasOutgoing() { 522 return !outgoing.isEmpty(); 523 } 524 525 private void requestMoreBody() { 526 try { 527 if (debug.on()) debug.log("requesting more request body from the subscriber"); 528 bodySubscriber.request(1); 529 } catch (Throwable t) { 530 if (debug.on()) debug.log("Subscription::request failed", t); 531 cancelImpl(t); 532 bodySentCF.completeExceptionally(t); 533 } 534 } 535 536 // Invoked only by the publisher 537 // ALL tasks should execute off the Selector-Manager thread 538 /** Returns the next portion of the HTTP request, or the error. */ 539 private DataPair getOutgoing() { 540 final Executor exec = client.theExecutor(); 541 final DataPair dp = outgoing.pollFirst(); 542 543 if (writePublisher.cancelled) { 544 if (debug.on()) debug.log("cancelling upstream publisher"); 545 if (bodySubscriber != null) { 546 exec.execute(bodySubscriber::cancelSubscription); 547 } else if (debug.on()) { 548 debug.log("bodySubscriber is null"); 549 } 550 headersSentCF.completeAsync(() -> this, exec); 551 bodySentCF.completeAsync(() -> this, exec); 552 return null; 553 } 554 555 if (dp == null) // publisher has not published anything yet 556 return null; 557 558 if (dp.throwable != null) { 559 synchronized (lock) { 560 state = State.ERROR; 561 } 562 exec.execute(() -> { 563 headersSentCF.completeExceptionally(dp.throwable); 564 bodySentCF.completeExceptionally(dp.throwable); 565 connection.close(); 566 }); 567 return dp; 568 } 569 570 switch (state) { 571 case HEADERS: 572 synchronized (lock) { 573 state = State.BODY; 574 } 575 // completeAsync, since dependent tasks should run in another thread 576 if (debug.on()) debug.log("initiating completion of headersSentCF"); 577 headersSentCF.completeAsync(() -> this, exec); 578 break; 579 case BODY: 580 if (dp.data == Http1BodySubscriber.COMPLETED) { 581 synchronized (lock) { 582 state = State.COMPLETING; 583 } 584 if (debug.on()) debug.log("initiating completion of bodySentCF"); 585 bodySentCF.completeAsync(() -> this, exec); 586 } else { 587 exec.execute(this::requestMoreBody); 588 } 589 break; 590 case INITIAL: 591 case ERROR: 592 case COMPLETING: 593 case COMPLETED: 594 default: 595 assert false : "Unexpected state:" + state; 596 } 597 598 return dp; 599 } 600 601 /** A Publisher of HTTP/1.1 headers and request body. */ 602 final class Http1Publisher implements FlowTube.TubePublisher { 603 604 final Logger debug = Utils.getDebugLogger(this::dbgString); 605 volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber; 606 volatile boolean cancelled; 607 final Http1WriteSubscription subscription = new Http1WriteSubscription(); 608 final Demand demand = new Demand(); 609 final SequentialScheduler writeScheduler = 610 SequentialScheduler.synchronizedScheduler(new WriteTask()); 611 612 @Override 613 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) { 614 assert state == State.INITIAL; 615 Objects.requireNonNull(s); 616 assert subscriber == null; 617 618 subscriber = s; 619 if (debug.on()) debug.log("got subscriber: %s", s); 620 s.onSubscribe(subscription); 621 } 622 623 volatile String dbgTag; 624 String dbgString() { 625 String tag = dbgTag; 626 Object flow = connection.getConnectionFlow(); 627 if (tag == null && flow != null) { 628 dbgTag = tag = "Http1Publisher(" + flow + ")"; 629 } else if (tag == null) { 630 tag = "Http1Publisher(?)"; 631 } 632 return tag; 633 } 634 635 final class WriteTask implements Runnable { 636 @Override 637 public void run() { 638 assert state != State.COMPLETED : "Unexpected state:" + state; 639 if (debug.on()) debug.log("WriteTask"); 640 641 if (cancelled) { 642 if (debug.on()) debug.log("handling cancellation"); 643 writeScheduler.stop(); 644 getOutgoing(); 645 return; 646 } 647 648 if (subscriber == null) { 649 if (debug.on()) debug.log("no subscriber yet"); 650 return; 651 } 652 if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing()); 653 while (hasOutgoing() && demand.tryDecrement()) { 654 DataPair dp = getOutgoing(); 655 if (dp == null) 656 break; 657 658 if (dp.throwable != null) { 659 if (debug.on()) debug.log("onError"); 660 // Do not call the subscriber's onError, it is not required. 661 writeScheduler.stop(); 662 } else { 663 List<ByteBuffer> data = dp.data; 664 if (data == Http1BodySubscriber.COMPLETED) { 665 synchronized (lock) { 666 assert state == State.COMPLETING : "Unexpected state:" + state; 667 state = State.COMPLETED; 668 } 669 if (debug.on()) 670 debug.log("completed, stopping %s", writeScheduler); 671 writeScheduler.stop(); 672 // Do nothing more. Just do not publish anything further. 673 // The next Subscriber will eventually take over. 674 675 } else { 676 if (debug.on()) 677 debug.log("onNext with " + Utils.remaining(data) + " bytes"); 678 subscriber.onNext(data); 679 } 680 } 681 } 682 } 683 } 684 685 final class Http1WriteSubscription implements Flow.Subscription { 686 687 @Override 688 public void request(long n) { 689 if (cancelled) 690 return; //no-op 691 demand.increase(n); 692 if (debug.on()) 693 debug.log("subscription request(%d), demand=%s", n, demand); 694 writeScheduler.runOrSchedule(client.theExecutor()); 695 } 696 697 @Override 698 public void cancel() { 699 if (debug.on()) debug.log("subscription cancelled"); 700 if (cancelled) 701 return; //no-op 702 cancelled = true; 703 writeScheduler.runOrSchedule(client.theExecutor()); 704 } 705 } 706 } 707 708 String dbgString() { 709 return "Http1Exchange"; 710 } 711 }