1 /*
   2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.incubator.http;
  27 
  28 import java.io.IOException;
  29 import java.lang.System.Logger.Level;
  30 import java.nio.ByteBuffer;
  31 import java.util.ArrayList;
  32 import java.util.List;
  33 import java.util.function.Consumer;
  34 import jdk.incubator.http.internal.common.Utils;
  35 
  36 /**
  37  * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
  38  *
  39  * Call pushBody() to read the body (blocking). Data and errors are provided
  40  * to given Consumers. After final buffer delivered, empty optional delivered
  41  */
  42 class ResponseContent {
  43 
  44     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
  45 
  46     final HttpResponse.BodySubscriber<?> pusher;
  47     final int contentLength;
  48     final HttpHeaders headers;
  49     // this needs to run before we complete the body
  50     // so that connection can be returned to pool
  51     private final Runnable onFinished;
  52     private final String dbgTag;
  53 
  54     ResponseContent(HttpConnection connection,
  55                     int contentLength,
  56                     HttpHeaders h,
  57                     HttpResponse.BodySubscriber<?> userSubscriber,
  58                     Runnable onFinished)
  59     {
  60         this.pusher = userSubscriber;
  61         this.contentLength = contentLength;
  62         this.headers = h;
  63         this.onFinished = onFinished;
  64         this.dbgTag = connection.dbgString() + "/ResponseContent";
  65     }
  66 
  67     static final int LF = 10;
  68     static final int CR = 13;
  69 
  70     private boolean chunkedContent, chunkedContentInitialized;
  71 
  72     boolean contentChunked() throws IOException {
  73         if (chunkedContentInitialized) {
  74             return chunkedContent;
  75         }
  76         if (contentLength == -1) {
  77             String tc = headers.firstValue("Transfer-Encoding")
  78                                .orElse("");
  79             if (!tc.equals("")) {
  80                 if (tc.equalsIgnoreCase("chunked")) {
  81                     chunkedContent = true;
  82                 } else {
  83                     throw new IOException("invalid content");
  84                 }
  85             } else {
  86                 chunkedContent = false;
  87             }
  88         }
  89         chunkedContentInitialized = true;
  90         return chunkedContent;
  91     }
  92 
  93     interface BodyParser extends Consumer<ByteBuffer> {
  94         void onSubscribe(AbstractSubscription sub);
  95     }
  96 
  97     // Returns a parser that will take care of parsing the received byte
  98     // buffers and forward them to the BodySubscriber.
  99     // When the parser is done, it will call onComplete.
 100     // If parsing was successful, the throwable parameter will be null.
 101     // Otherwise it will be the exception that occurred
 102     // Note: revisit: it might be better to use a CompletableFuture than
 103     //       a completion handler.
 104     BodyParser getBodyParser(Consumer<Throwable> onComplete)
 105         throws IOException {
 106         if (contentChunked()) {
 107             return new ChunkedBodyParser(onComplete);
 108         } else {
 109             return new FixedLengthBodyParser(contentLength, onComplete);
 110         }
 111     }
 112 
 113 
 114     static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
 115     class ChunkedBodyParser implements BodyParser {
 116         final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
 117         final Consumer<Throwable> onComplete;
 118         final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
 119         final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";
 120 
 121         volatile Throwable closedExceptionally;
 122         volatile int partialChunklen = 0; // partially read chunk len
 123         volatile int chunklen = -1;  // number of bytes in chunk
 124         volatile int bytesremaining;  // number of bytes in chunk left to be read incl CRLF
 125         volatile boolean cr = false;  // tryReadChunkLength has found CR
 126         volatile int bytesToConsume;  // number of bytes that still need to be consumed before proceeding
 127         volatile ChunkState state = ChunkState.READING_LENGTH; // current state
 128         volatile AbstractSubscription sub;
 129         ChunkedBodyParser(Consumer<Throwable> onComplete) {
 130             this.onComplete = onComplete;
 131         }
 132 
 133         String dbgString() {
 134             return dbgTag;
 135         }
 136 
 137         @Override
 138         public void onSubscribe(AbstractSubscription sub) {
 139             debug.log(Level.DEBUG, () ->  "onSubscribe: "
 140                         + pusher.getClass().getName());
 141             pusher.onSubscribe(this.sub = sub);
 142         }
 143 
 144         @Override
 145         public void accept(ByteBuffer b) {
 146             if (closedExceptionally != null) {
 147                 debug.log(Level.DEBUG, () ->  "already closed: "
 148                             + closedExceptionally);
 149                 return;
 150             }
 151             boolean completed = false;
 152             try {
 153                 List<ByteBuffer> out = new ArrayList<>();
 154                 do {
 155                     if (tryPushOneHunk(b, out))  {
 156                         // We're done! (true if the final chunk was parsed).
 157                         if (!out.isEmpty()) {
 158                             // push what we have and complete
 159                             // only reduce demand if we actually push something.
 160                             // we would not have come here if there was no
 161                             // demand.
 162                             boolean hasDemand = sub.demand().tryDecrement();
 163                             assert hasDemand;
 164                             pusher.onNext(out);
 165                         }
 166                         debug.log(Level.DEBUG, () ->  "done!");
 167                         assert closedExceptionally == null;
 168                         assert state == ChunkState.DONE;
 169                         onFinished.run();
 170                         pusher.onComplete();
 171                         completed = true;
 172                         onComplete.accept(closedExceptionally); // should be null
 173                         break;
 174                     }
 175                     // the buffer may contain several hunks, and therefore
 176                     // we must loop while it's not exhausted.
 177                 } while (b.hasRemaining());
 178 
 179                 if (!completed && !out.isEmpty()) {
 180                     // push what we have.
 181                     // only reduce demand if we actually push something.
 182                     // we would not have come here if there was no
 183                     // demand.
 184                     boolean hasDemand = sub.demand().tryDecrement();
 185                     assert hasDemand;
 186                     pusher.onNext(out);
 187                 }
 188                 assert state == ChunkState.DONE || !b.hasRemaining();
 189             } catch(Throwable t) {
 190                 closedExceptionally = t;
 191                 if (!completed) onComplete.accept(t);
 192             }
 193         }
 194 
 195         // reads and returns chunklen. Position of chunkbuf is first byte
 196         // of chunk on return. chunklen includes the CR LF at end of chunk
 197         // returns -1 if needs more bytes
 198         private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
 199             assert state == ChunkState.READING_LENGTH;
 200             while (chunkbuf.hasRemaining()) {
 201                 int c = chunkbuf.get();
 202                 if (cr) {
 203                     if (c == LF) {
 204                         return partialChunklen;
 205                     } else {
 206                         throw new IOException("invalid chunk header");
 207                     }
 208                 }
 209                 if (c == CR) {
 210                     cr = true;
 211                 } else {
 212                     int digit = toDigit(c);
 213                     partialChunklen = partialChunklen * 16 + digit;
 214                 }
 215             }
 216             return -1;
 217         }
 218 
 219 
 220         // try to consume as many bytes as specified by bytesToConsume.
 221         // returns the number of bytes that still need to be consumed.
 222         // In practice this method is only called to consume one CRLF pair
 223         // with bytesToConsume set to 2, so it will only return 0 (if completed),
 224         // 1, or 2 (if chunkbuf doesn't have the 2 chars).
 225         private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
 226             int n = bytesToConsume;
 227             if (n > 0) {
 228                 int e = Math.min(chunkbuf.remaining(), n);
 229 
 230                 // verifies some assertions
 231                 // this methods is called only to consume CRLF
 232                 if (Utils.ASSERTIONSENABLED) {
 233                     assert n <= 2 && e <= 2;
 234                     ByteBuffer tmp = chunkbuf.slice();
 235                     // if n == 2 assert that we will first consume CR
 236                     assert (n == 2 && e > 0) ? tmp.get() == CR : true;
 237                     // if n == 1 || n == 2 && e == 2 assert that we then consume LF
 238                     assert (n == 1 || e == 2) ? tmp.get() == LF : true;
 239                 }
 240 
 241                 chunkbuf.position(chunkbuf.position() + e);
 242                 n -= e;
 243                 bytesToConsume = n;
 244             }
 245             assert n >= 0;
 246             return n;
 247         }
 248 
 249         /**
 250          * Returns a ByteBuffer containing chunk of data or a "hunk" of data
 251          * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
 252          * If the given chunk does not have enough data this method return
 253          * an empty ByteBuffer (READMORE).
 254          * If we encounter the final chunk (an empty chunk) this method
 255          * returns null.
 256          */
 257         ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
 258             int unfulfilled = bytesremaining;
 259             int toconsume = bytesToConsume;
 260             ChunkState st = state;
 261             if (st == ChunkState.READING_LENGTH && chunklen == -1) {
 262                 debug.log(Level.DEBUG, () ->  "Trying to read chunk len"
 263                         + " (remaining in buffer:"+chunk.remaining()+")");
 264                 int clen = chunklen = tryReadChunkLen(chunk);
 265                 if (clen == -1) return READMORE;
 266                 debug.log(Level.DEBUG, "Got chunk len %d", clen);
 267                 cr = false; partialChunklen = 0;
 268                 unfulfilled = bytesremaining =  clen;
 269                 if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
 270                 else st = state = ChunkState.READING_DATA; // read the data
 271             }
 272 
 273             if (toconsume > 0) {
 274                 debug.log(Level.DEBUG,
 275                         "Trying to consume bytes: %d (remaining in buffer: %s)",
 276                         toconsume, chunk.remaining());
 277                 if (tryConsumeBytes(chunk) > 0) {
 278                     return READMORE;
 279                 }
 280             }
 281 
 282             toconsume = bytesToConsume;
 283             assert toconsume == 0;
 284 
 285 
 286             if (st == ChunkState.READING_LENGTH) {
 287                 // we will come here only if chunklen was 0, after having
 288                 // consumed the trailing CRLF
 289                 int clen = chunklen;
 290                 assert clen == 0;
 291                 debug.log(Level.DEBUG, "No more chunks: %d", clen);
 292                 // the DONE state is not really needed but it helps with
 293                 // assertions...
 294                 state = ChunkState.DONE;
 295                 return null;
 296             }
 297 
 298             int clen = chunklen;
 299             assert clen > 0;
 300             assert st == ChunkState.READING_DATA;
 301 
 302             ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
 303             if (unfulfilled > 0) {
 304                 int bytesread = chunk.remaining();
 305                 debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d",
 306                           bytesread, unfulfilled);
 307 
 308                 int bytes2return = Math.min(bytesread, unfulfilled);
 309                 debug.log(Level.DEBUG,  "Returning chunk bytes: %d", bytes2return);
 310                 returnBuffer = Utils.slice(chunk, bytes2return);
 311                 unfulfilled = bytesremaining -= bytes2return;
 312                 if (unfulfilled == 0) bytesToConsume = 2;
 313             }
 314 
 315             assert unfulfilled >= 0;
 316 
 317             if (unfulfilled == 0) {
 318                 debug.log(Level.DEBUG,
 319                         "No more bytes to read - %d yet to consume.",
 320                         unfulfilled);
 321                 // check whether the trailing CRLF is consumed, try to
 322                 // consume it if not. If tryConsumeBytes needs more bytes
 323                 // then we will come back here later - skipping the block
 324                 // that reads data because remaining==0, and finding
 325                 // that the two bytes are now consumed.
 326                 if (tryConsumeBytes(chunk) == 0) {
 327                     // we're done for this chunk! reset all states and
 328                     // prepare to read the next chunk.
 329                     chunklen = -1;
 330                     partialChunklen = 0;
 331                     cr = false;
 332                     state = ChunkState.READING_LENGTH;
 333                     debug.log(Level.DEBUG, "Ready to read next chunk");
 334                 }
 335             }
 336             if (returnBuffer == READMORE) {
 337                 debug.log(Level.DEBUG, "Need more data");
 338             }
 339             return returnBuffer;
 340         }
 341 
 342 
 343         // Attempt to parse and push one hunk from the buffer.
 344         // Returns true if the final chunk was parsed.
 345         // Returns false if we need to push more chunks.
 346         private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
 347                 throws IOException {
 348             assert state != ChunkState.DONE;
 349             ByteBuffer b1 = tryReadOneHunk(b);
 350             if (b1 != null) {
 351                 //assert b1.hasRemaining() || b1 == READMORE;
 352                 if (b1.hasRemaining()) {
 353                     debug.log(Level.DEBUG, "Sending chunk to consumer (%d)",
 354                               b1.remaining());
 355                     out.add(b1);
 356                     debug.log(Level.DEBUG, "Chunk sent.");
 357                 }
 358                 return false; // we haven't parsed the final chunk yet.
 359             } else {
 360                 return true; // we're done! the final chunk was parsed.
 361             }
 362         }
 363 
 364         private int toDigit(int b) throws IOException {
 365             if (b >= 0x30 && b <= 0x39) {
 366                 return b - 0x30;
 367             }
 368             if (b >= 0x41 && b <= 0x46) {
 369                 return b - 0x41 + 10;
 370             }
 371             if (b >= 0x61 && b <= 0x66) {
 372                 return b - 0x61 + 10;
 373             }
 374             throw new IOException("Invalid chunk header byte " + b);
 375         }
 376 
 377     }
 378 
 379     class FixedLengthBodyParser implements BodyParser {
 380         final int contentLength;
 381         final Consumer<Throwable> onComplete;
 382         final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
 383         final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
 384         volatile int remaining;
 385         volatile Throwable closedExceptionally;
 386         volatile AbstractSubscription sub;
 387         FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) {
 388             this.contentLength = this.remaining = contentLength;
 389             this.onComplete = onComplete;
 390         }
 391 
 392         String dbgString() {
 393             return dbgTag;
 394         }
 395 
 396         @Override
 397         public void onSubscribe(AbstractSubscription sub) {
 398             debug.log(Level.DEBUG, () -> "length="
 399                         + contentLength +", onSubscribe: "
 400                         + pusher.getClass().getName());
 401             pusher.onSubscribe(this.sub = sub);
 402             try {
 403                 if (contentLength == 0) {
 404                     pusher.onComplete();
 405                     onComplete.accept(null);
 406                 }
 407             } catch (Throwable t) {
 408                 closedExceptionally = t;
 409                 try {
 410                     pusher.onError(t);
 411                 } finally {
 412                     onComplete.accept(t);
 413                 }
 414             }
 415         }
 416 
 417         @Override
 418         public void accept(ByteBuffer b) {
 419             if (closedExceptionally != null) {
 420                 debug.log(Level.DEBUG, () -> "already closed: "
 421                             + closedExceptionally);
 422                 return;
 423             }
 424             boolean completed = false;
 425             try {
 426                 int unfulfilled = remaining;
 427                 debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)",
 428                         b.remaining(), unfulfilled, contentLength);
 429                 assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0;
 430 
 431                 if (unfulfilled == 0 && contentLength > 0) return;
 432 
 433                 if (b.hasRemaining() && unfulfilled > 0) {
 434                     // only reduce demand if we actually push something.
 435                     // we would not have come here if there was no
 436                     // demand.
 437                     boolean hasDemand = sub.demand().tryDecrement();
 438                     assert hasDemand;
 439                     int amount = Math.min(b.remaining(), unfulfilled);
 440                     unfulfilled = remaining -= amount;
 441                     ByteBuffer buffer = Utils.slice(b, amount);
 442                     pusher.onNext(List.of(buffer));
 443                 }
 444                 if (unfulfilled == 0) {
 445                     // We're done! All data has been received.
 446                     assert closedExceptionally == null;
 447                     onFinished.run();
 448                     pusher.onComplete();
 449                     completed = true;
 450                     onComplete.accept(closedExceptionally); // should be null
 451                 } else {
 452                     assert b.remaining() == 0;
 453                 }
 454             } catch (Throwable t) {
 455                 debug.log(Level.DEBUG, "Unexpected exception", t);
 456                 closedExceptionally = t;
 457                 if (!completed) {
 458                     onComplete.accept(t);
 459                 }
 460             }
 461         }
 462     }
 463 }