1 /*
   2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.EOFException;
  29 import java.io.IOException;
  30 import java.lang.System.Logger.Level;
  31 import java.nio.ByteBuffer;
  32 import java.util.Arrays;
  33 import java.util.HashSet;
  34 import java.util.List;
  35 import java.util.Set;
  36 import java.util.concurrent.ConcurrentLinkedDeque;
  37 import java.util.concurrent.Executor;
  38 import java.util.concurrent.Flow;
  39 import java.util.concurrent.atomic.AtomicBoolean;
  40 import java.util.concurrent.atomic.AtomicLong;
  41 import java.util.concurrent.atomic.AtomicReference;
  42 import java.util.stream.Collectors;
  43 import jdk.incubator.http.internal.common.Demand;
  44 import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
  45 import jdk.incubator.http.internal.common.SequentialScheduler;
  46 import jdk.incubator.http.internal.common.ConnectionExpiredException;
  47 import jdk.incubator.http.internal.common.Utils;
  48 
  49 
  50 /**
  51  * A helper class that will queue up incoming data until the receiving
  52  * side is ready to handle it.
  53  */
  54 class Http1AsyncReceiver {
  55 
  56     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
  57     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
  58 
  59     /**
  60      * A delegate that can asynchronously receive data from an upstream flow,
  61      * parse, it, then possibly transform it and either store it (response
  62      * headers) or possibly pass it to a downstream subscriber (response body).
  63      * Usually, there will be one Http1AsyncDelegate in charge of receiving
  64      * and parsing headers, and another one in charge of receiving, parsing,
  65      * and forwarding body. Each will sequentially subscribe with the
  66      * Http1AsyncReceiver in turn. There may be additional delegates which
  67      * subscribe to the Http1AsyncReceiver, mainly for the purpose of handling
  68      * errors while the connection is busy transmitting the request body and the
  69      * Http1Exchange::readBody method hasn't been called yet, and response
  70      * delegates haven't subscribed yet.
  71      */
  72     static interface Http1AsyncDelegate {
  73         /**
  74          * Receives and handles a byte buffer reference.
  75          * @param ref A byte buffer reference coming from upstream.
  76          * @return false, if the byte buffer reference should be kept in the queue.
  77          *         Usually, this means that either the byte buffer reference
  78          *         was handled and parsing is finished, or that the receiver
  79          *         didn't handle the byte reference at all.
  80          *         There may or may not be any remaining data in the
  81          *         byte buffer, and the byte buffer reference must not have
  82          *         been cleared.
  83          *         true, if the byte buffer reference was fully read and
  84          *         more data can be received.
  85          */
  86         public boolean tryAsyncReceive(ByteBuffer ref);
  87 
  88         /**
  89          * Called when an exception is raised.
  90          * @param ex The raised Throwable.
  91          */
  92         public void onReadError(Throwable ex);
  93 
  94         /**
  95          * Must be called before any other method on the delegate.
  96          * The subscription can be either used directly by the delegate
  97          * to request more data (e.g. if the delegate is a header parser),
  98          * or can be forwarded to a downstream subscriber (if the delegate
  99          * is a body parser that wraps a response BodySubscriber).
 100          * In all cases, it is the responsibility of the delegate to ensure
 101          * that request(n) and demand.tryDecrement() are called appropriately.
 102          * No data will be sent to {@code tryAsyncReceive} unless
 103          * the subscription has some demand.
 104          *
 105          * @param s A subscription that allows the delegate to control the
 106          *          data flow.
 107          */
 108         public void onSubscribe(AbstractSubscription s);
 109 
 110         /**
 111          * Returns the subscription that was passed to {@code onSubscribe}
 112          * @return the subscription that was passed to {@code onSubscribe}..
 113          */
 114         public AbstractSubscription subscription();
 115 
 116     }
 117 
 118     /**
 119      * A simple subclass of AbstractSubscription that ensures the
 120      * SequentialScheduler will be run when request() is called and demand
 121      * becomes positive again.
 122      */
 123     private static final class Http1AsyncDelegateSubscription
 124             extends AbstractSubscription
 125     {
 126         private final Runnable onCancel;
 127         private final SequentialScheduler scheduler;
 128         Http1AsyncDelegateSubscription(SequentialScheduler scheduler,
 129                                        Runnable onCancel) {
 130             this.scheduler = scheduler;
 131             this.onCancel = onCancel;
 132         }
 133         @Override
 134         public void request(long n) {
 135             final Demand demand = demand();
 136             if (demand.increase(n)) {
 137                 scheduler.runOrSchedule();
 138             }
 139         }
 140         @Override
 141         public void cancel() { onCancel.run();}
 142     }
 143 
 144     private final ConcurrentLinkedDeque<ByteBuffer> queue
 145             = new ConcurrentLinkedDeque<>();
 146     private final SequentialScheduler scheduler =
 147             SequentialScheduler.synchronizedScheduler(this::flush);
 148     private final Executor executor;
 149     private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
 150     private final AtomicReference<Http1AsyncDelegate> pendingDelegateRef;
 151     private final AtomicLong received = new AtomicLong();
 152     final AtomicBoolean canRequestMore = new AtomicBoolean();
 153 
 154     private volatile Throwable error;
 155     private volatile Http1AsyncDelegate delegate;
 156     // This reference is only used to prevent early GC of the exchange.
 157     private volatile Http1Exchange<?>  owner;
 158     // Only used for checking whether we run on the selector manager thread.
 159     private final HttpClientImpl client;
 160     private boolean retry;
 161 
 162     public Http1AsyncReceiver(Executor executor, Http1Exchange<?> owner) {
 163         this.pendingDelegateRef = new AtomicReference<>();
 164         this.executor = executor;
 165         this.owner = owner;
 166         this.client = owner.client;
 167     }
 168 
 169     // This is the main loop called by the SequentialScheduler.
 170     // It attempts to empty the queue until the scheduler is stopped,
 171     // or the delegate is unregistered, or the delegate is unable to
 172     // process the data (because it's not ready or already done), which
 173     // it signals by returning 'true';
 174     private void flush() {
 175         ByteBuffer buf;
 176         try {
 177             assert !client.isSelectorThread() :
 178                     "Http1AsyncReceiver::flush should not run in the selector: "
 179                     + Thread.currentThread().getName();
 180 
 181             // First check whether we have a pending delegate that has
 182             // just subscribed, and if so, create a Subscription for it
 183             // and call onSubscribe.
 184             handlePendingDelegate();
 185 
 186             // Then start emptying the queue, if possible.
 187             while ((buf = queue.peek()) != null) {
 188                 Http1AsyncDelegate delegate = this.delegate;
 189                 debug.log(Level.DEBUG, "Got %s bytes for delegate %s",
 190                                        buf.remaining(), delegate);
 191                 if (!hasDemand(delegate)) {
 192                     // The scheduler will be invoked again later when the demand
 193                     // becomes positive.
 194                     return;
 195                 }
 196 
 197                 assert delegate != null;
 198                 debug.log(Level.DEBUG, "Forwarding %s bytes to delegate %s",
 199                           buf.remaining(), delegate);
 200                 // The delegate has demand: feed it the next buffer.
 201                 if (!delegate.tryAsyncReceive(buf)) {
 202                     final long remaining = buf.remaining();
 203                     debug.log(Level.DEBUG, () -> {
 204                         // If the scheduler is stopped, the queue may already
 205                         // be empty and the reference may already be released.
 206                         String remstr = scheduler.isStopped() ? "" :
 207                                 " remaining in ref: "
 208                                 + remaining;
 209                         remstr =  remstr
 210                                 + " total remaining: " + remaining();
 211                         return "Delegate done: " + remaining;
 212                     });
 213                     canRequestMore.set(false);
 214                     // The last buffer parsed may have remaining unparsed bytes.
 215                     // Don't take it out of the queue.
 216                     return; // done.
 217                 }
 218 
 219                 // removed parsed buffer from queue, and continue with next
 220                 // if available
 221                 ByteBuffer parsed = queue.remove();
 222                 canRequestMore.set(queue.isEmpty());
 223                 assert parsed == buf;
 224             }
 225 
 226             // queue is empty: let's see if we should request more
 227             checkRequestMore();
 228 
 229         } catch (Throwable t) {
 230             Throwable x = error;
 231             if (x == null) error = t; // will be handled in the finally block
 232             debug.log(Level.DEBUG, "Unexpected error caught in flush()", t);
 233         } finally {
 234             // Handles any pending error.
 235             // The most recently subscribed delegate will get the error.
 236             checkForErrors();
 237         }
 238     }
 239 
 240     /**
 241      * Must be called from within the scheduler main loop.
 242      * Handles any pending errors by calling delegate.onReadError().
 243      * If the error can be forwarded to the delegate, stops the scheduler.
 244      */
 245     private void checkForErrors() {
 246         // Handles any pending error.
 247         // The most recently subscribed delegate will get the error.
 248         // If the delegate is null, the error will be handled by the next
 249         // delegate that subscribes.
 250         // If the queue is not empty, wait until it it is empty before
 251         // handling the error.
 252         Http1AsyncDelegate delegate = pendingDelegateRef.get();
 253         if (delegate == null) delegate = this.delegate;
 254         Throwable x = error;
 255         if (delegate != null && x != null && queue.isEmpty()) {
 256             // forward error only after emptying the queue.
 257             final Object captured = delegate;
 258             debug.log(Level.DEBUG, () -> "flushing " + x
 259                     + "\n\t delegate: " + captured
 260                     + "\t\t queue.isEmpty: " + queue.isEmpty());
 261             scheduler.stop();
 262             delegate.onReadError(x);
 263         }
 264     }
 265 
 266     /**
 267      * Must be called from within the scheduler main loop.
 268      * Figure out whether more data should be requested from the
 269      * Http1TubeSubscriber.
 270      */
 271     private void checkRequestMore() {
 272         Http1AsyncDelegate delegate = this.delegate;
 273         boolean more = this.canRequestMore.get();
 274         boolean hasDemand = hasDemand(delegate);
 275         debug.log(Level.DEBUG, () -> "checkRequestMore: "
 276                   + "canRequestMore=" + more + ", hasDemand=" + hasDemand
 277                   + (delegate == null ? ", delegate=null" : ""));
 278         if (hasDemand) {
 279             subscriber.requestMore();
 280         }
 281     }
 282 
 283     /**
 284      * Must be called from within the scheduler main loop.
 285      * Return true if the delegate is not null and has some demand.
 286      * @param delegate The Http1AsyncDelegate delegate
 287      * @return true if the delegate is not null and has some demand
 288      */
 289     private boolean hasDemand(Http1AsyncDelegate delegate) {
 290         if (delegate == null) return false;
 291         AbstractSubscription subscription = delegate.subscription();
 292         long demand = subscription.demand().get();
 293         debug.log(Level.DEBUG, "downstream subscription demand is %s", demand);
 294         return demand > 0;
 295     }
 296 
 297     /**
 298      * Must be called from within the scheduler main loop.
 299      * Handles pending delegate subscription.
 300      * Return true if there was some pending delegate subscription and a new
 301      * delegate was subscribed, false otherwise.
 302      *
 303      * @return true if there was some pending delegate subscription and a new
 304      *         delegate was subscribed, false otherwise.
 305      */
 306     private boolean handlePendingDelegate() {
 307         Http1AsyncDelegate pending = pendingDelegateRef.get();
 308         if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) {
 309             Http1AsyncDelegate delegate = this.delegate;
 310             if (delegate != null) unsubscribe(delegate);
 311             Runnable cancel = () -> {
 312                 debug.log(Level.DEBUG, "Downstream subscription cancelled by %s", pending);
 313                 // The connection should be closed, as some data may
 314                 // be left over in the stream.
 315                 try {
 316                     setRetryOnError(false);
 317                     onReadError(new IOException("subscription cancelled"));
 318                     unsubscribe(pending);
 319                 } finally {
 320                     Http1Exchange<?> exchg = owner;
 321                     stop();
 322                     if (exchg != null) exchg.connection().close();
 323                 }
 324             };
 325             // The subscription created by a delegate is only loosely
 326             // coupled with the upstream subscription. This is partly because
 327             // the header/body parser work with a flow of ByteBuffer, whereas
 328             // we have a flow List<ByteBuffer> upstream.
 329             Http1AsyncDelegateSubscription subscription =
 330                     new Http1AsyncDelegateSubscription(scheduler, cancel);
 331             pending.onSubscribe(subscription);
 332             this.delegate = delegate = pending;
 333             final Object captured = delegate;
 334             debug.log(Level.DEBUG, () -> "delegate is now " + captured
 335                   + ", demand=" + subscription.demand().get()
 336                   + ", canRequestMore=" + canRequestMore.get()
 337                   + ", queue.isEmpty=" + queue.isEmpty());
 338             return true;
 339         }
 340         return false;
 341     }
 342 
 343     synchronized void setRetryOnError(boolean retry) {
 344         this.retry = retry;
 345     }
 346 
 347     void clear() {
 348         debug.log(Level.DEBUG, "cleared");
 349         this.pendingDelegateRef.set(null);
 350         this.delegate = null;
 351         this.owner = null;
 352     }
 353 
 354     void subscribe(Http1AsyncDelegate delegate) {
 355         synchronized(this) {
 356             pendingDelegateRef.set(delegate);
 357         }
 358         if (queue.isEmpty()) {
 359             canRequestMore.set(true);
 360         }
 361         debug.log(Level.DEBUG, () ->
 362                 "Subscribed pending " + delegate + " queue.isEmpty: "
 363                 + queue.isEmpty());
 364         // Everything may have been received already. Make sure
 365         // we parse it.
 366         if (client.isSelectorThread()) {
 367             scheduler.deferOrSchedule(executor);
 368         } else {
 369             scheduler.runOrSchedule();
 370         }
 371     }
 372 
 373     // Used for debugging only!
 374     long remaining() {
 375         return Utils.remaining(queue.toArray(Utils.EMPTY_BB_ARRAY));
 376     }
 377 
 378     void unsubscribe(Http1AsyncDelegate delegate) {
 379         synchronized(this) {
 380             if (this.delegate == delegate) {
 381                 debug.log(Level.DEBUG, "Unsubscribed %s", delegate);
 382                 this.delegate = null;
 383             }
 384         }
 385     }
 386 
 387     // Callback: Consumer of ByteBuffer
 388     private void asyncReceive(ByteBuffer buf) {
 389         debug.log(Level.DEBUG, "Putting %s bytes into the queue", buf.remaining());
 390         received.addAndGet(buf.remaining());
 391         queue.offer(buf);
 392 
 393         // This callback is called from within the selector thread.
 394         // Use an executor here to avoid doing the heavy lifting in the
 395         // selector.
 396         scheduler.deferOrSchedule(executor);
 397     }
 398 
 399     // Callback: Consumer of Throwable
 400     void onReadError(Throwable ex) {
 401         Http1AsyncDelegate delegate;
 402         Throwable recorded;
 403         debug.log(Level.DEBUG, "onError: %s", (Object) ex);
 404         synchronized (this) {
 405             delegate = this.delegate;
 406             recorded = error;
 407             if (recorded == null) {
 408                 // retry is set to true by HttpExchange when the connection is
 409                 // already connected, which means it's been retrieved from
 410                 // the pool.
 411                 if (retry && (ex instanceof IOException)) {
 412                     // could be either EOFException, or
 413                     // IOException("connection reset by peer), or
 414                     // SSLHandshakeException resulting from the server having
 415                     // closed the SSL session.
 416                     if (received.get() == 0) {
 417                         // If we receive such an exception before having
 418                         // received any byte, then in this case, we will
 419                         // throw ConnectionExpiredException
 420                         // to try & force a retry of the request.
 421                         retry = false;
 422                         ex = new ConnectionExpiredException(
 423                                 "subscription is finished", ex);
 424                     }
 425                 }
 426                 error = ex;
 427             }
 428             final Throwable t = (recorded == null ? ex : recorded);
 429             debug.log(Level.DEBUG, () -> "recorded " + t
 430                     + "\n\t delegate: " + delegate
 431                     + "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
 432         }
 433         if (queue.isEmpty() || pendingDelegateRef.get() != null) {
 434             // This callback is called from within the selector thread.
 435             // Use an executor here to avoid doing the heavy lifting in the
 436             // selector.
 437             scheduler.deferOrSchedule(executor);
 438         }
 439     }
 440 
 441     void stop() {
 442         debug.log(Level.DEBUG, "stopping");
 443         scheduler.stop();
 444         delegate = null;
 445         owner  = null;
 446     }
 447 
 448     /**
 449      * Returns the TubeSubscriber for reading from the connection flow.
 450      * @return the TubeSubscriber for reading from the connection flow.
 451      */
 452     TubeSubscriber subscriber() {
 453         return subscriber;
 454     }
 455 
 456     /**
 457      * A simple tube subscriber for reading from the connection flow.
 458      */
 459     final class Http1TubeSubscriber implements TubeSubscriber {
 460         volatile Flow.Subscription subscription;
 461         volatile boolean completed;
 462         volatile boolean dropped;
 463 
 464         public void onSubscribe(Flow.Subscription subscription) {
 465             // supports being called multiple time.
 466             // doesn't cancel the previous subscription, since that is
 467             // most probably the same as the new subscription.
 468             assert this.subscription == null || dropped == false;
 469             this.subscription = subscription;
 470             dropped = false;
 471             canRequestMore.set(true);
 472             if (delegate != null) {
 473                 scheduler.deferOrSchedule(executor);
 474             }
 475         }
 476 
 477         void requestMore() {
 478             Flow.Subscription s = subscription;
 479             if (s == null) return;
 480             if (canRequestMore.compareAndSet(true, false)) {
 481                 if (!completed && !dropped) {
 482                     debug.log(Level.DEBUG,
 483                         "Http1TubeSubscriber: requesting one more from upstream");
 484                     s.request(1);
 485                     return;
 486                 }
 487             }
 488             debug.log(Level.DEBUG, "Http1TubeSubscriber: no need to request more");
 489         }
 490 
 491         @Override
 492         public void onNext(List<ByteBuffer> item) {
 493             canRequestMore.set(item.isEmpty());
 494             for (ByteBuffer buffer : item) {
 495                 asyncReceive(buffer);
 496             }
 497         }
 498 
 499         @Override
 500         public void onError(Throwable throwable) {
 501             onReadError(throwable);
 502             completed = true;
 503         }
 504 
 505         @Override
 506         public void onComplete() {
 507             onReadError(new EOFException("EOF reached while reading"));
 508             completed = true;
 509         }
 510 
 511         public void dropSubscription() {
 512             debug.log(Level.DEBUG, "Http1TubeSubscriber: dropSubscription");
 513             // we could probably set subscription to null here...
 514             // then we might not need the 'dropped' boolean?
 515             dropped = true;
 516         }
 517 
 518     }
 519 
 520     // Drains the content of the queue into a single ByteBuffer.
 521     // The scheduler must be permanently stopped before calling drain().
 522     ByteBuffer drain(ByteBuffer initial) {
 523         // Revisit: need to clean that up.
 524         //
 525         ByteBuffer b = initial = (initial == null ? Utils.EMPTY_BYTEBUFFER : initial);
 526         assert scheduler.isStopped();
 527 
 528         if (queue.isEmpty()) return b;
 529 
 530         // sanity check: we shouldn't have queued the same
 531         // buffer twice.
 532         ByteBuffer[] qbb = queue.toArray(new ByteBuffer[queue.size()]);
 533         assert java.util.stream.Stream.of(qbb)
 534                 .collect(Collectors.toSet())
 535                 .size() == qbb.length : debugQBB(qbb);
 536 
 537         // compute the number of bytes in the queue, the number of bytes
 538         // in the initial buffer
 539         // TODO: will need revisiting - as it is not guaranteed that all
 540         // data will fit in single BB!
 541         int size = Utils.remaining(qbb, Integer.MAX_VALUE);
 542         int remaining = b.remaining();
 543         int free = b.capacity() - b.position() - remaining;
 544         debug.log(Level.DEBUG,
 545             "Flushing %s bytes from queue into initial buffer (remaining=%s, free=%s)",
 546             size, remaining, free);
 547 
 548         // check whether the initial buffer has enough space
 549         if (size > free) {
 550             debug.log(Level.DEBUG,
 551                     "Allocating new buffer for initial: %s", (size + remaining));
 552             // allocates a new buffer and copy initial to it
 553             b = ByteBuffer.allocate(size + remaining);
 554             Utils.copy(initial, b);
 555             assert b.position() == remaining;
 556             b.flip();
 557             assert b.position() == 0;
 558             assert b.limit() == remaining;
 559             assert b.remaining() == remaining;
 560         }
 561 
 562         // store position and limit
 563         int pos = b.position();
 564         int limit = b.limit();
 565         assert limit - pos == remaining;
 566         assert b.capacity() >= remaining + size
 567                 : "capacity: " + b.capacity()
 568                 + ", remaining: " + b.remaining()
 569                 + ", size: " + size;
 570 
 571         // prepare to copy the content of the queue
 572         b.position(limit);
 573         b.limit(pos + remaining + size);
 574         assert b.remaining() >= size :
 575                 "remaining: " + b.remaining() + ", size: " + size;
 576 
 577         // copy the content of the queue
 578         int count = 0;
 579         for (int i=0; i<qbb.length; i++) {
 580             ByteBuffer b2 = qbb[i];
 581             int r = b2.remaining();
 582             assert b.remaining() >= r : "need at least " + r + " only "
 583                     + b.remaining() + " available";
 584             int copied = Utils.copy(b2, b);
 585             assert copied == r : "copied="+copied+" available="+r;
 586             assert b2.remaining() == 0;
 587             count += copied;
 588         }
 589         assert count == size;
 590         assert b.position() == pos + remaining + size :
 591                 "b.position="+b.position()+" != "+pos+"+"+remaining+"+"+size;
 592 
 593         // reset limit and position
 594         b.limit(limit+size);
 595         b.position(pos);
 596 
 597         // we can clear the refs
 598         queue.clear();
 599         final ByteBuffer bb = b;
 600         debug.log(Level.DEBUG, () -> "Initial buffer now has " + bb.remaining()
 601                 + " pos=" + bb.position() + " limit=" + bb.limit());
 602 
 603         return b;
 604     }
 605 
 606     private String debugQBB(ByteBuffer[] qbb) {
 607         StringBuilder msg = new StringBuilder();
 608         List<ByteBuffer> lbb = Arrays.asList(qbb);
 609         Set<ByteBuffer> sbb = new HashSet<>(Arrays.asList(qbb));
 610 
 611         int uniquebb = sbb.size();
 612         msg.append("qbb: ").append(lbb.size())
 613            .append(" (unique: ").append(uniquebb).append("), ")
 614            .append("duplicates: ");
 615         String sep = "";
 616         for (ByteBuffer b : lbb) {
 617             if (!sbb.remove(b)) {
 618                 msg.append(sep)
 619                    .append(String.valueOf(b))
 620                    .append("[remaining=")
 621                    .append(b.remaining())
 622                    .append(", position=")
 623                    .append(b.position())
 624                    .append(", capacity=")
 625                    .append(b.capacity())
 626                    .append("]");
 627                 sep = ", ";
 628             }
 629         }
 630         return msg.toString();
 631     }
 632 
 633     volatile String dbgTag;
 634     String dbgString() {
 635         String tag = dbgTag;
 636         if (tag == null) {
 637             String flowTag = null;
 638             Http1Exchange<?> exchg = owner;
 639             Object flow = (exchg != null)
 640                     ? exchg.connection().getConnectionFlow()
 641                     : null;
 642             flowTag = tag = flow == null ? null: (String.valueOf(flow));
 643             if (flowTag != null) {
 644                 dbgTag = tag = flowTag + " Http1AsyncReceiver";
 645             } else {
 646                 tag = "Http1AsyncReceiver";
 647             }
 648         }
 649         return tag;
 650     }
 651 }