1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.IOException; 29 import java.net.InetSocketAddress; 30 import java.net.http.HttpResponse.BodyHandler; 31 import java.net.http.HttpResponse.BodySubscriber; 32 import java.nio.ByteBuffer; 33 import java.util.Objects; 34 import java.util.concurrent.CompletableFuture; 35 import java.util.LinkedList; 36 import java.util.List; 37 import java.util.concurrent.ConcurrentLinkedDeque; 38 import java.util.concurrent.Executor; 39 import java.util.concurrent.Flow; 40 import jdk.internal.net.http.common.Demand; 41 import jdk.internal.net.http.common.Log; 42 import jdk.internal.net.http.common.FlowTube; 43 import jdk.internal.net.http.common.Logger; 44 import jdk.internal.net.http.common.SequentialScheduler; 45 import jdk.internal.net.http.common.MinimalFuture; 46 import jdk.internal.net.http.common.Utils; 47 import static java.net.http.HttpClient.Version.HTTP_1_1; 48 import static jdk.internal.net.http.common.Utils.wrapWithExtraDetail; 49 50 /** 51 * Encapsulates one HTTP/1.1 request/response exchange. 52 */ 53 class Http1Exchange<T> extends ExchangeImpl<T> { 54 55 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 56 final HttpRequestImpl request; // main request 57 final Http1Request requestAction; 58 private volatile Http1Response<T> response; 59 final HttpConnection connection; 60 final HttpClientImpl client; 61 final Executor executor; 62 private final Http1AsyncReceiver asyncReceiver; 63 64 /** Records a possible cancellation raised before any operation 65 * has been initiated, or an error received while sending the request. */ 66 private Throwable failed; 67 private final List<CompletableFuture<?>> operations; // used for cancel 68 69 /** Must be held when operating on any internal state or data. */ 70 private final Object lock = new Object(); 71 72 /** Holds the outgoing data, either the headers or a request body part. Or 73 * an error from the request body publisher. At most there can be ~2 pieces 74 * of outgoing data ( onComplete|onError can be invoked without demand ).*/ 75 final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>(); 76 77 /** The write publisher, responsible for writing the complete request ( both 78 * headers and body ( if any ). */ 79 private final Http1Publisher writePublisher = new Http1Publisher(); 80 81 /** Completed when the header have been published, or there is an error */ 82 private final CompletableFuture<ExchangeImpl<T>> headersSentCF = new MinimalFuture<>(); 83 /** Completed when the body has been published, or there is an error */ 84 private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>(); 85 86 /** The subscriber to the request's body published. Maybe null. */ 87 private volatile Http1BodySubscriber bodySubscriber; 88 89 enum State { INITIAL, 90 HEADERS, 91 BODY, 92 ERROR, // terminal state 93 COMPLETING, 94 COMPLETED } // terminal state 95 96 private State state = State.INITIAL; 97 98 /** A carrier for either data or an error. Used to carry data, and communicate 99 * errors from the request ( both headers and body ) to the exchange. */ 100 static class DataPair { 101 Throwable throwable; 102 List<ByteBuffer> data; 103 DataPair(List<ByteBuffer> data, Throwable throwable){ 104 this.data = data; 105 this.throwable = throwable; 106 } 107 @Override 108 public String toString() { 109 return "DataPair [data=" + data + ", throwable=" + throwable + "]"; 110 } 111 } 112 113 /** An abstract supertype for HTTP/1.1 body subscribers. There are two 114 * concrete implementations: {@link Http1Request.StreamSubscriber}, and 115 * {@link Http1Request.FixedContentSubscriber}, for receiving chunked and 116 * fixed length bodies, respectively. */ 117 static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> { 118 final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>(); 119 private volatile Flow.Subscription subscription; 120 volatile boolean complete; 121 private final Logger debug; 122 Http1BodySubscriber(Logger debug) { 123 assert debug != null; 124 this.debug = debug; 125 } 126 127 /** Final sentinel in the stream of request body. */ 128 static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0)); 129 130 final void request(long n) { 131 if (debug.on()) 132 debug.log("Http1BodySubscriber requesting %d, from %s", 133 n, subscription); 134 subscription.request(n); 135 } 136 137 /** A current-state message suitable for inclusion in an exception detail message. */ 138 abstract String currentStateMessage(); 139 140 final boolean isSubscribed() { 141 return subscription != null; 142 } 143 144 final void setSubscription(Flow.Subscription subscription) { 145 this.subscription = subscription; 146 whenSubscribed.complete(subscription); 147 } 148 149 final void cancelSubscription() { 150 try { 151 subscription.cancel(); 152 } catch(Throwable t) { 153 String msg = "Ignoring exception raised when canceling BodyPublisher subscription"; 154 if (debug.on()) debug.log("%s: %s", msg, t); 155 Log.logError("{0}: {1}", msg, (Object)t); 156 } 157 } 158 159 static Http1BodySubscriber completeSubscriber(Logger debug) { 160 return new Http1BodySubscriber(debug) { 161 @Override public void onSubscribe(Flow.Subscription subscription) { error(); } 162 @Override public void onNext(ByteBuffer item) { error(); } 163 @Override public void onError(Throwable throwable) { error(); } 164 @Override public void onComplete() { error(); } 165 @Override String currentStateMessage() { return null; } 166 private void error() { 167 throw new InternalError("should not reach here"); 168 } 169 }; 170 } 171 } 172 173 @Override 174 public String toString() { 175 return "HTTP/1.1 " + request.toString(); 176 } 177 178 HttpRequestImpl request() { 179 return request; 180 } 181 182 Http1Exchange(Exchange<T> exchange, HttpConnection connection) 183 throws IOException 184 { 185 super(exchange); 186 this.request = exchange.request(); 187 this.client = exchange.client(); 188 this.executor = exchange.executor(); 189 this.operations = new LinkedList<>(); 190 operations.add(headersSentCF); 191 operations.add(bodySentCF); 192 if (connection != null) { 193 this.connection = connection; 194 } else { 195 InetSocketAddress addr = request.getAddress(); 196 this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1); 197 } 198 this.requestAction = new Http1Request(request, this); 199 this.asyncReceiver = new Http1AsyncReceiver(executor, this); 200 } 201 202 @Override 203 HttpConnection connection() { 204 return connection; 205 } 206 207 private void connectFlows(HttpConnection connection) { 208 FlowTube tube = connection.getConnectionFlow(); 209 if (debug.on()) debug.log("%s connecting flows", tube); 210 211 // Connect the flow to our Http1TubeSubscriber: 212 // asyncReceiver.subscriber(). 213 tube.connectFlows(writePublisher, 214 asyncReceiver.subscriber()); 215 } 216 217 @Override 218 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { 219 // create the response before sending the request headers, so that 220 // the response can set the appropriate receivers. 221 if (debug.on()) debug.log("Sending headers only"); 222 // If the first attempt to read something triggers EOF, or 223 // IOException("channel reset by peer"), we're going to retry. 224 // Instruct the asyncReceiver to throw ConnectionExpiredException 225 // to force a retry. 226 asyncReceiver.setRetryOnError(true); 227 if (response == null) { 228 response = new Http1Response<>(connection, this, asyncReceiver); 229 } 230 231 if (debug.on()) debug.log("response created in advance"); 232 233 CompletableFuture<Void> connectCF; 234 if (!connection.connected()) { 235 if (debug.on()) debug.log("initiating connect async"); 236 connectCF = connection.connectAsync(exchange) 237 .thenCompose(unused -> connection.finishConnect()); 238 Throwable cancelled; 239 synchronized (lock) { 240 if ((cancelled = failed) == null) { 241 operations.add(connectCF); 242 } 243 } 244 if (cancelled != null) { 245 if (client.isSelectorThread()) { 246 executor.execute(() -> 247 connectCF.completeExceptionally(cancelled)); 248 } else { 249 connectCF.completeExceptionally(cancelled); 250 } 251 } 252 } else { 253 connectCF = new MinimalFuture<>(); 254 connectCF.complete(null); 255 } 256 257 return connectCF 258 .thenCompose(unused -> { 259 CompletableFuture<Void> cf = new MinimalFuture<>(); 260 try { 261 asyncReceiver.whenFinished.whenComplete((r,t) -> { 262 if (t != null) { 263 if (debug.on()) 264 debug.log("asyncReceiver finished (failed=%s)", (Object)t); 265 if (!headersSentCF.isDone()) 266 headersSentCF.completeAsync(() -> this, executor); 267 } 268 }); 269 connectFlows(connection); 270 271 if (debug.on()) debug.log("requestAction.headers"); 272 List<ByteBuffer> data = requestAction.headers(); 273 synchronized (lock) { 274 state = State.HEADERS; 275 } 276 if (debug.on()) debug.log("setting outgoing with headers"); 277 assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing; 278 appendToOutgoing(data); 279 cf.complete(null); 280 return cf; 281 } catch (Throwable t) { 282 if (debug.on()) debug.log("Failed to send headers: %s", t); 283 headersSentCF.completeExceptionally(t); 284 bodySentCF.completeExceptionally(t); 285 connection.close(); 286 cf.completeExceptionally(t); 287 return cf; 288 } }) 289 .thenCompose(unused -> headersSentCF); 290 } 291 292 private void cancelIfFailed(Flow.Subscription s) { 293 asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> { 294 if (debug.on()) 295 debug.log("asyncReceiver finished (failed=%s)", (Object)t); 296 if (t != null) { 297 s.cancel(); 298 // Don't complete exceptionally here as 't' 299 // might not be the right exception: it will 300 // not have been decorated yet. 301 // t is an exception raised by the read side, 302 // an EOFException or Broken Pipe... 303 // We are cancelling the BodyPublisher subscription 304 // and completing bodySentCF to allow the next step 305 // to flow and call readHeaderAsync, which will 306 // get the right exception from the asyncReceiver. 307 bodySentCF.complete(this); 308 } 309 }, executor); 310 } 311 312 @Override 313 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { 314 assert headersSentCF.isDone(); 315 if (debug.on()) debug.log("sendBodyAsync"); 316 try { 317 bodySubscriber = requestAction.continueRequest(); 318 if (debug.on()) debug.log("bodySubscriber is %s", 319 bodySubscriber == null ? null : bodySubscriber.getClass()); 320 if (bodySubscriber == null) { 321 bodySubscriber = Http1BodySubscriber.completeSubscriber(debug); 322 appendToOutgoing(Http1BodySubscriber.COMPLETED); 323 } else { 324 // start 325 bodySubscriber.whenSubscribed 326 .thenAccept((s) -> cancelIfFailed(s)) 327 .thenAccept((s) -> requestMoreBody()); 328 } 329 } catch (Throwable t) { 330 cancelImpl(t); 331 bodySentCF.completeExceptionally(t); 332 } 333 return Utils.wrapForDebug(debug, "sendBodyAsync", bodySentCF); 334 } 335 336 @Override 337 CompletableFuture<Response> getResponseAsync(Executor executor) { 338 if (debug.on()) debug.log("reading headers"); 339 CompletableFuture<Response> cf = response.readHeadersAsync(executor); 340 Throwable cause; 341 synchronized (lock) { 342 operations.add(cf); 343 cause = failed; 344 failed = null; 345 } 346 347 if (cause != null) { 348 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]" 349 + "\n\tCompleting exceptionally with {2}\n", 350 request.uri(), 351 request.timeout().isPresent() ? 352 // calling duration.toMillis() can throw an exception. 353 // this is just debugging, we don't care if it overflows. 354 (request.timeout().get().getSeconds() * 1000 355 + request.timeout().get().getNano() / 1000000) : -1, 356 cause); 357 boolean acknowledged = cf.completeExceptionally(cause); 358 if (debug.on()) 359 debug.log(acknowledged ? ("completed response with " + cause) 360 : ("response already completed, ignoring " + cause)); 361 } 362 return Utils.wrapForDebug(debug, "getResponseAsync", cf); 363 } 364 365 @Override 366 CompletableFuture<T> readBodyAsync(BodyHandler<T> handler, 367 boolean returnConnectionToPool, 368 Executor executor) 369 { 370 BodySubscriber<T> bs = handler.apply(new ResponseInfoImpl(response.responseCode(), 371 response.responseHeaders(), 372 HTTP_1_1)); 373 CompletableFuture<T> bodyCF = response.readBody(bs, 374 returnConnectionToPool, 375 executor); 376 return bodyCF; 377 } 378 379 @Override 380 CompletableFuture<Void> ignoreBody() { 381 return response.ignoreBody(executor); 382 } 383 384 ByteBuffer drainLeftOverBytes() { 385 synchronized (lock) { 386 asyncReceiver.stop(); 387 return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER); 388 } 389 } 390 391 void released() { 392 Http1Response<T> resp = this.response; 393 if (resp != null) resp.completed(); 394 asyncReceiver.clear(); 395 } 396 397 void completed() { 398 Http1Response<T> resp = this.response; 399 if (resp != null) resp.completed(); 400 } 401 402 /** 403 * Cancel checks to see if request and responseAsync finished already. 404 * If not it closes the connection and completes all pending operations 405 */ 406 @Override 407 void cancel() { 408 cancelImpl(new IOException("Request cancelled")); 409 } 410 411 /** 412 * Cancel checks to see if request and responseAsync finished already. 413 * If not it closes the connection and completes all pending operations 414 */ 415 @Override 416 void cancel(IOException cause) { 417 cancelImpl(cause); 418 } 419 420 private void cancelImpl(Throwable cause) { 421 LinkedList<CompletableFuture<?>> toComplete = null; 422 int count = 0; 423 Throwable error; 424 synchronized (lock) { 425 if ((error = failed) == null) { 426 failed = error = cause; 427 } 428 if (debug.on()) { 429 debug.log(request.uri() + ": " + error); 430 } 431 if (requestAction != null && requestAction.finished() 432 && response != null && response.finished()) { 433 return; 434 } 435 writePublisher.writeScheduler.stop(); 436 if (operations.isEmpty()) { 437 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation." 438 + "\n\tCan''t cancel yet with {2}", 439 request.uri(), 440 request.timeout().isPresent() ? 441 // calling duration.toMillis() can throw an exception. 442 // this is just debugging, we don't care if it overflows. 443 (request.timeout().get().getSeconds() * 1000 444 + request.timeout().get().getNano() / 1000000) : -1, 445 cause); 446 } else { 447 for (CompletableFuture<?> cf : operations) { 448 if (!cf.isDone()) { 449 if (toComplete == null) toComplete = new LinkedList<>(); 450 toComplete.add(cf); 451 count++; 452 } 453 } 454 operations.clear(); 455 } 456 } 457 try { 458 Log.logError("Http1Exchange.cancel: count=" + count); 459 if (toComplete != null) { 460 // We might be in the selector thread in case of timeout, when 461 // the SelectorManager calls purgeTimeoutsAndReturnNextDeadline() 462 // There may or may not be other places that reach here 463 // from the SelectorManager thread, so just make sure we 464 // don't complete any CF from within the selector manager 465 // thread. 466 Executor exec = client.isSelectorThread() 467 ? executor 468 : this::runInline; 469 Throwable x = error; 470 while (!toComplete.isEmpty()) { 471 CompletableFuture<?> cf = toComplete.poll(); 472 exec.execute(() -> { 473 if (cf.completeExceptionally(x)) { 474 if (debug.on()) 475 debug.log("%s: completed cf with %s", request.uri(), x); 476 } 477 }); 478 } 479 } 480 } finally { 481 connection.close(); 482 } 483 } 484 485 private void runInline(Runnable run) { 486 assert !client.isSelectorThread(); 487 run.run(); 488 } 489 490 /** Returns true if this exchange was canceled. */ 491 boolean isCanceled() { 492 synchronized (lock) { 493 return failed != null; 494 } 495 } 496 497 /** Returns the cause for which this exchange was canceled, if available. */ 498 Throwable getCancelCause() { 499 synchronized (lock) { 500 return failed; 501 } 502 } 503 504 /** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */ 505 void appendToOutgoing(Throwable throwable) { 506 appendToOutgoing(new DataPair(null, throwable)); 507 } 508 509 /** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */ 510 void appendToOutgoing(List<ByteBuffer> item) { 511 appendToOutgoing(new DataPair(item, null)); 512 } 513 514 private void appendToOutgoing(DataPair dp) { 515 if (debug.on()) debug.log("appending to outgoing " + dp); 516 outgoing.add(dp); 517 writePublisher.writeScheduler.runOrSchedule(); 518 } 519 520 /** Tells whether, or not, there is any outgoing data that can be published, 521 * or if there is an error. */ 522 private boolean hasOutgoing() { 523 return !outgoing.isEmpty(); 524 } 525 526 private void requestMoreBody() { 527 try { 528 if (debug.on()) debug.log("requesting more request body from the subscriber"); 529 bodySubscriber.request(1); 530 } catch (Throwable t) { 531 if (debug.on()) debug.log("Subscription::request failed", t); 532 cancelImpl(t); 533 bodySentCF.completeExceptionally(t); 534 } 535 } 536 537 // Invoked only by the publisher 538 // ALL tasks should execute off the Selector-Manager thread 539 /** Returns the next portion of the HTTP request, or the error. */ 540 private DataPair getOutgoing() { 541 final Executor exec = client.theExecutor(); 542 final DataPair dp = outgoing.pollFirst(); 543 544 if (writePublisher.cancelled) { 545 if (debug.on()) debug.log("cancelling upstream publisher"); 546 if (bodySubscriber != null) { 547 exec.execute(bodySubscriber::cancelSubscription); 548 } else if (debug.on()) { 549 debug.log("bodySubscriber is null"); 550 } 551 headersSentCF.completeAsync(() -> this, exec); 552 bodySentCF.completeAsync(() -> this, exec); 553 return null; 554 } 555 556 if (dp == null) // publisher has not published anything yet 557 return null; 558 559 if (dp.throwable != null) { 560 synchronized (lock) { 561 state = State.ERROR; 562 } 563 exec.execute(() -> { 564 headersSentCF.completeExceptionally(dp.throwable); 565 bodySentCF.completeExceptionally(dp.throwable); 566 connection.close(); 567 }); 568 return dp; 569 } 570 571 switch (state) { 572 case HEADERS: 573 synchronized (lock) { 574 state = State.BODY; 575 } 576 // completeAsync, since dependent tasks should run in another thread 577 if (debug.on()) debug.log("initiating completion of headersSentCF"); 578 headersSentCF.completeAsync(() -> this, exec); 579 break; 580 case BODY: 581 if (dp.data == Http1BodySubscriber.COMPLETED) { 582 synchronized (lock) { 583 state = State.COMPLETING; 584 } 585 if (debug.on()) debug.log("initiating completion of bodySentCF"); 586 bodySentCF.completeAsync(() -> this, exec); 587 } else { 588 exec.execute(this::requestMoreBody); 589 } 590 break; 591 case INITIAL: 592 case ERROR: 593 case COMPLETING: 594 case COMPLETED: 595 default: 596 assert false : "Unexpected state:" + state; 597 } 598 599 return dp; 600 } 601 602 /** A Publisher of HTTP/1.1 headers and request body. */ 603 final class Http1Publisher implements FlowTube.TubePublisher { 604 605 final Logger debug = Utils.getDebugLogger(this::dbgString); 606 volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber; 607 volatile boolean cancelled; 608 final Http1WriteSubscription subscription = new Http1WriteSubscription(); 609 final Demand demand = new Demand(); 610 final SequentialScheduler writeScheduler = 611 SequentialScheduler.synchronizedScheduler(new WriteTask()); 612 613 @Override 614 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) { 615 assert state == State.INITIAL; 616 Objects.requireNonNull(s); 617 assert subscriber == null; 618 619 subscriber = s; 620 if (debug.on()) debug.log("got subscriber: %s", s); 621 s.onSubscribe(subscription); 622 } 623 624 volatile String dbgTag; 625 String dbgString() { 626 String tag = dbgTag; 627 Object flow = connection.getConnectionFlow(); 628 if (tag == null && flow != null) { 629 dbgTag = tag = "Http1Publisher(" + flow + ")"; 630 } else if (tag == null) { 631 tag = "Http1Publisher(?)"; 632 } 633 return tag; 634 } 635 636 final class WriteTask implements Runnable { 637 @Override 638 public void run() { 639 assert state != State.COMPLETED : "Unexpected state:" + state; 640 if (debug.on()) debug.log("WriteTask"); 641 642 if (cancelled) { 643 if (debug.on()) debug.log("handling cancellation"); 644 writeScheduler.stop(); 645 getOutgoing(); 646 return; 647 } 648 649 if (subscriber == null) { 650 if (debug.on()) debug.log("no subscriber yet"); 651 return; 652 } 653 if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing()); 654 while (hasOutgoing() && demand.tryDecrement()) { 655 DataPair dp = getOutgoing(); 656 if (dp == null) 657 break; 658 659 if (dp.throwable != null) { 660 if (debug.on()) debug.log("onError"); 661 // Do not call the subscriber's onError, it is not required. 662 writeScheduler.stop(); 663 } else { 664 List<ByteBuffer> data = dp.data; 665 if (data == Http1BodySubscriber.COMPLETED) { 666 synchronized (lock) { 667 assert state == State.COMPLETING : "Unexpected state:" + state; 668 state = State.COMPLETED; 669 } 670 if (debug.on()) 671 debug.log("completed, stopping %s", writeScheduler); 672 writeScheduler.stop(); 673 // Do nothing more. Just do not publish anything further. 674 // The next Subscriber will eventually take over. 675 676 } else { 677 if (debug.on()) 678 debug.log("onNext with " + Utils.remaining(data) + " bytes"); 679 subscriber.onNext(data); 680 } 681 } 682 } 683 } 684 } 685 686 final class Http1WriteSubscription implements Flow.Subscription { 687 688 @Override 689 public void request(long n) { 690 if (cancelled) 691 return; //no-op 692 demand.increase(n); 693 if (debug.on()) 694 debug.log("subscription request(%d), demand=%s", n, demand); 695 writeScheduler.runOrSchedule(client.theExecutor()); 696 } 697 698 @Override 699 public void cancel() { 700 if (debug.on()) debug.log("subscription cancelled"); 701 if (cancelled) 702 return; //no-op 703 cancelled = true; 704 writeScheduler.runOrSchedule(client.theExecutor()); 705 } 706 } 707 } 708 709 String dbgString() { 710 return "Http1Exchange"; 711 } 712 }