1 /*
   2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.IOException;
  29 import java.nio.ByteBuffer;
  30 import java.util.ArrayList;
  31 import java.util.Collections;
  32 import java.util.List;
  33 import java.util.function.Consumer;
  34 import java.net.http.HttpHeaders;
  35 import java.net.http.HttpResponse;
  36 import jdk.internal.net.http.common.Logger;
  37 import jdk.internal.net.http.common.Utils;
  38 import static java.lang.String.format;
  39 
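/**
 * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
 *
 * Call getBodyParser() to obtain a BodyParser for the response body.
 * Received byte buffers are handed to the parser, which forwards the
 * decoded body data to the BodySubscriber. When the parser is done it
 * invokes the completion handler supplied to getBodyParser(), passing
 * null on success or the exception that occurred.
 */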
  46 class ResponseContent {
  47 
  48     final HttpResponse.BodySubscriber<?> pusher;
  49     final long contentLength;
  50     final HttpHeaders headers;
  51     // this needs to run before we complete the body
  52     // so that connection can be returned to pool
  53     private final Runnable onFinished;
  54     private final String dbgTag;
  55 
  56     ResponseContent(HttpConnection connection,
  57                     long contentLength,
  58                     HttpHeaders h,
  59                     HttpResponse.BodySubscriber<?> userSubscriber,
  60                     Runnable onFinished)
  61     {
  62         this.pusher = userSubscriber;
  63         this.contentLength = contentLength;
  64         this.headers = h;
  65         this.onFinished = onFinished;
  66         this.dbgTag = connection.dbgString() + "/ResponseContent";
  67     }
  68 
  69     static final int LF = 10;
  70     static final int CR = 13;
  71 
  72     private boolean chunkedContent, chunkedContentInitialized;
  73 
  74     boolean contentChunked() throws IOException {
  75         if (chunkedContentInitialized) {
  76             return chunkedContent;
  77         }
  78         if (contentLength == -2) {
  79             // HTTP/1.0 content
  80             chunkedContentInitialized = true;
  81             chunkedContent = false;
  82             return chunkedContent;
  83         }
  84         if (contentLength == -1) {
  85             String tc = headers.firstValue("Transfer-Encoding")
  86                                .orElse("");
  87             if (!tc.equals("")) {
  88                 if (tc.equalsIgnoreCase("chunked")) {
  89                     chunkedContent = true;
  90                 } else {
  91                     throw new IOException("invalid content");
  92                 }
  93             } else {
  94                 chunkedContent = false;
  95             }
  96         }
  97         chunkedContentInitialized = true;
  98         return chunkedContent;
  99     }
 100 
 101     interface BodyParser extends Consumer<ByteBuffer> {
 102         void onSubscribe(AbstractSubscription sub);
 103         // A current-state message suitable for inclusion in an exception
 104         // detail message.
 105         String currentStateMessage();
 106     }
 107 
 108     // Returns a parser that will take care of parsing the received byte
 109     // buffers and forward them to the BodySubscriber.
 110     // When the parser is done, it will call onComplete.
 111     // If parsing was successful, the throwable parameter will be null.
 112     // Otherwise it will be the exception that occurred
 113     // Note: revisit: it might be better to use a CompletableFuture than
 114     //       a completion handler.
 115     BodyParser getBodyParser(Consumer<Throwable> onComplete)
 116         throws IOException {
 117         if (contentChunked()) {
 118             return new ChunkedBodyParser(onComplete);
 119         } else {
 120             return contentLength == -2
 121                 ? new UnknownLengthBodyParser(onComplete)
 122                 : new FixedLengthBodyParser(contentLength, onComplete);
 123         }
 124     }
 125 
 126 
 127     static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
 128     class ChunkedBodyParser implements BodyParser {
 129         final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
 130         final Consumer<Throwable> onComplete;
 131         final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
 132         final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";
 133 
 134         volatile Throwable closedExceptionally;
 135         volatile int partialChunklen = 0; // partially read chunk len
 136         volatile int chunklen = -1;  // number of bytes in chunk
 137         volatile int bytesremaining;  // number of bytes in chunk left to be read incl CRLF
 138         volatile boolean cr = false;  // tryReadChunkLength has found CR
 139         volatile int bytesToConsume;  // number of bytes that still need to be consumed before proceeding
 140         volatile ChunkState state = ChunkState.READING_LENGTH; // current state
 141         volatile AbstractSubscription sub;
 142         ChunkedBodyParser(Consumer<Throwable> onComplete) {
 143             this.onComplete = onComplete;
 144         }
 145 
 146         String dbgString() {
 147             return dbgTag;
 148         }
 149 
 150         @Override
 151         public void onSubscribe(AbstractSubscription sub) {
 152             if (debug.on())
 153                 debug.log("onSubscribe: "  + pusher.getClass().getName());
 154             pusher.onSubscribe(this.sub = sub);
 155         }
 156 
 157         @Override
 158         public String currentStateMessage() {
 159             return format("chunked transfer encoding, state: %s", state);
 160         }
 161 
 162         @Override
 163         public void accept(ByteBuffer b) {
 164             if (closedExceptionally != null) {
 165                 if (debug.on())
 166                     debug.log("already closed: " + closedExceptionally);
 167                 return;
 168             }
 169             boolean completed = false;
 170             try {
 171                 List<ByteBuffer> out = new ArrayList<>();
 172                 do {
 173                     if (tryPushOneHunk(b, out))  {
 174                         // We're done! (true if the final chunk was parsed).
 175                         if (!out.isEmpty()) {
 176                             // push what we have and complete
 177                             // only reduce demand if we actually push something.
 178                             // we would not have come here if there was no
 179                             // demand.
 180                             boolean hasDemand = sub.demand().tryDecrement();
 181                             assert hasDemand;
 182                             pusher.onNext(Collections.unmodifiableList(out));
 183                             if (debug.on()) debug.log("Chunks sent");
 184                         }
 185                         if (debug.on()) debug.log("done!");
 186                         assert closedExceptionally == null;
 187                         assert state == ChunkState.DONE;
 188                         onFinished.run();
 189                         pusher.onComplete();
 190                         if (debug.on()) debug.log("subscriber completed");
 191                         completed = true;
 192                         onComplete.accept(closedExceptionally); // should be null
 193                         break;
 194                     }
 195                     // the buffer may contain several hunks, and therefore
 196                     // we must loop while it's not exhausted.
 197                 } while (b.hasRemaining());
 198 
 199                 if (!completed && !out.isEmpty()) {
 200                     // push what we have.
 201                     // only reduce demand if we actually push something.
 202                     // we would not have come here if there was no
 203                     // demand.
 204                     boolean hasDemand = sub.demand().tryDecrement();
 205                     assert hasDemand;
 206                     pusher.onNext(Collections.unmodifiableList(out));
 207                     if (debug.on()) debug.log("Chunk sent");
 208                 }
 209                 assert state == ChunkState.DONE || !b.hasRemaining();
 210             } catch(Throwable t) {
 211                 if (debug.on())
 212                     debug.log("Error while processing buffer: %s", (Object)t );
 213                 closedExceptionally = t;
 214                 if (!completed) onComplete.accept(t);
 215             }
 216         }
 217 
 218         // reads and returns chunklen. Position of chunkbuf is first byte
 219         // of chunk on return. chunklen includes the CR LF at end of chunk
 220         // returns -1 if needs more bytes
 221         private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
 222             assert state == ChunkState.READING_LENGTH;
 223             while (chunkbuf.hasRemaining()) {
 224                 int c = chunkbuf.get();
 225                 if (cr) {
 226                     if (c == LF) {
 227                         return partialChunklen;
 228                     } else {
 229                         throw new IOException("invalid chunk header");
 230                     }
 231                 }
 232                 if (c == CR) {
 233                     cr = true;
 234                 } else {
 235                     int digit = toDigit(c);
 236                     partialChunklen = partialChunklen * 16 + digit;
 237                 }
 238             }
 239             return -1;
 240         }
 241 
 242 
 243         // try to consume as many bytes as specified by bytesToConsume.
 244         // returns the number of bytes that still need to be consumed.
 245         // In practice this method is only called to consume one CRLF pair
 246         // with bytesToConsume set to 2, so it will only return 0 (if completed),
 247         // 1, or 2 (if chunkbuf doesn't have the 2 chars).
 248         private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
 249             int n = bytesToConsume;
 250             if (n > 0) {
 251                 int e = Math.min(chunkbuf.remaining(), n);
 252 
 253                 // verifies some assertions
 254                 // this methods is called only to consume CRLF
 255                 if (Utils.ASSERTIONSENABLED) {
 256                     assert n <= 2 && e <= 2;
 257                     ByteBuffer tmp = chunkbuf.slice();
 258                     // if n == 2 assert that we will first consume CR
 259                     assert (n == 2 && e > 0) ? tmp.get() == CR : true;
 260                     // if n == 1 || n == 2 && e == 2 assert that we then consume LF
 261                     assert (n == 1 || e == 2) ? tmp.get() == LF : true;
 262                 }
 263 
 264                 chunkbuf.position(chunkbuf.position() + e);
 265                 n -= e;
 266                 bytesToConsume = n;
 267             }
 268             assert n >= 0;
 269             return n;
 270         }
 271 
 272         /**
 273          * Returns a ByteBuffer containing chunk of data or a "hunk" of data
 274          * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
 275          * If the given chunk does not have enough data this method return
 276          * an empty ByteBuffer (READMORE).
 277          * If we encounter the final chunk (an empty chunk) this method
 278          * returns null.
 279          */
 280         ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
 281             int unfulfilled = bytesremaining;
 282             int toconsume = bytesToConsume;
 283             ChunkState st = state;
 284             if (st == ChunkState.READING_LENGTH && chunklen == -1) {
 285                 if (debug.on()) debug.log(() ->  "Trying to read chunk len"
 286                         + " (remaining in buffer:"+chunk.remaining()+")");
 287                 int clen = chunklen = tryReadChunkLen(chunk);
 288                 if (clen == -1) return READMORE;
 289                 if (debug.on()) debug.log("Got chunk len %d", clen);
 290                 cr = false; partialChunklen = 0;
 291                 unfulfilled = bytesremaining =  clen;
 292                 if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
 293                 else st = state = ChunkState.READING_DATA; // read the data
 294             }
 295 
 296             if (toconsume > 0) {
 297                 if (debug.on())
 298                     debug.log("Trying to consume bytes: %d (remaining in buffer: %s)",
 299                               toconsume, chunk.remaining());
 300                 if (tryConsumeBytes(chunk) > 0) {
 301                     return READMORE;
 302                 }
 303             }
 304 
 305             toconsume = bytesToConsume;
 306             assert toconsume == 0;
 307 
 308 
 309             if (st == ChunkState.READING_LENGTH) {
 310                 // we will come here only if chunklen was 0, after having
 311                 // consumed the trailing CRLF
 312                 int clen = chunklen;
 313                 assert clen == 0;
 314                 if (debug.on()) debug.log("No more chunks: %d", clen);
 315                 // the DONE state is not really needed but it helps with
 316                 // assertions...
 317                 state = ChunkState.DONE;
 318                 return null;
 319             }
 320 
 321             int clen = chunklen;
 322             assert clen > 0;
 323             assert st == ChunkState.READING_DATA;
 324 
 325             ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
 326             if (unfulfilled > 0) {
 327                 int bytesread = chunk.remaining();
 328                 if (debug.on())
 329                     debug.log("Reading chunk: available %d, needed %d",
 330                               bytesread, unfulfilled);
 331 
 332                 int bytes2return = Math.min(bytesread, unfulfilled);
 333                 if (debug.on())
 334                     debug.log( "Returning chunk bytes: %d", bytes2return);
 335                 returnBuffer = Utils.sliceWithLimitedCapacity(chunk, bytes2return).asReadOnlyBuffer();
 336                 unfulfilled = bytesremaining -= bytes2return;
 337                 if (unfulfilled == 0) bytesToConsume = 2;
 338             }
 339 
 340             assert unfulfilled >= 0;
 341 
 342             if (unfulfilled == 0) {
 343                 if (debug.on())
 344                     debug.log("No more bytes to read - %d yet to consume.",
 345                               unfulfilled);
 346                 // check whether the trailing CRLF is consumed, try to
 347                 // consume it if not. If tryConsumeBytes needs more bytes
 348                 // then we will come back here later - skipping the block
 349                 // that reads data because remaining==0, and finding
 350                 // that the two bytes are now consumed.
 351                 if (tryConsumeBytes(chunk) == 0) {
 352                     // we're done for this chunk! reset all states and
 353                     // prepare to read the next chunk.
 354                     chunklen = -1;
 355                     partialChunklen = 0;
 356                     cr = false;
 357                     state = ChunkState.READING_LENGTH;
 358                     if (debug.on()) debug.log("Ready to read next chunk");
 359                 }
 360             }
 361             if (returnBuffer == READMORE) {
 362                 if (debug.on()) debug.log("Need more data");
 363             }
 364             return returnBuffer;
 365         }
 366 
 367 
 368         // Attempt to parse and push one hunk from the buffer.
 369         // Returns true if the final chunk was parsed.
 370         // Returns false if we need to push more chunks.
 371         private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
 372                 throws IOException {
 373             assert state != ChunkState.DONE;
 374             ByteBuffer b1 = tryReadOneHunk(b);
 375             if (b1 != null) {
 376                 //assert b1.hasRemaining() || b1 == READMORE;
 377                 if (b1.hasRemaining()) {
 378                     if (debug.on())
 379                         debug.log("Sending chunk to consumer (%d)", b1.remaining());
 380                     out.add(b1);
 381                 }
 382                 return false; // we haven't parsed the final chunk yet.
 383             } else {
 384                 return true; // we're done! the final chunk was parsed.
 385             }
 386         }
 387 
 388         private int toDigit(int b) throws IOException {
 389             if (b >= 0x30 && b <= 0x39) {
 390                 return b - 0x30;
 391             }
 392             if (b >= 0x41 && b <= 0x46) {
 393                 return b - 0x41 + 10;
 394             }
 395             if (b >= 0x61 && b <= 0x66) {
 396                 return b - 0x61 + 10;
 397             }
 398             throw new IOException("Invalid chunk header byte " + b);
 399         }
 400 
 401     }
 402 
 403     class UnknownLengthBodyParser implements BodyParser {
 404         final Consumer<Throwable> onComplete;
 405         final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
 406         final String dbgTag = ResponseContent.this.dbgTag + "/UnknownLengthBodyParser";
 407         volatile Throwable closedExceptionally;
 408         volatile AbstractSubscription sub;
 409         volatile int breceived = 0;
 410 
 411         UnknownLengthBodyParser(Consumer<Throwable> onComplete) {
 412             this.onComplete = onComplete;
 413         }
 414 
 415         String dbgString() {
 416             return dbgTag;
 417         }
 418 
 419         @Override
 420         public void onSubscribe(AbstractSubscription sub) {
 421             if (debug.on())
 422                 debug.log("onSubscribe: " + pusher.getClass().getName());
 423             pusher.onSubscribe(this.sub = sub);
 424         }
 425 
 426         @Override
 427         public String currentStateMessage() {
 428             return format("http1_0 content, bytes received: %d", breceived);
 429         }
 430 
 431         @Override
 432         public void accept(ByteBuffer b) {
 433             if (closedExceptionally != null) {
 434                 if (debug.on())
 435                     debug.log("already closed: " + closedExceptionally);
 436                 return;
 437             }
 438             boolean completed = false;
 439             try {
 440                 if (debug.on())
 441                     debug.log("Parser got %d bytes ", b.remaining());
 442 
 443                 if (b.hasRemaining()) {
 444                     // only reduce demand if we actually push something.
 445                     // we would not have come here if there was no
 446                     // demand.
 447                     boolean hasDemand = sub.demand().tryDecrement();
 448                     assert hasDemand;
 449                     breceived += b.remaining();
 450                     pusher.onNext(List.of(b.asReadOnlyBuffer()));
 451                 }
 452             } catch (Throwable t) {
 453                 if (debug.on()) debug.log("Unexpected exception", t);
 454                 closedExceptionally = t;
 455                 if (!completed) {
 456                     onComplete.accept(t);
 457                 }
 458             }
 459         }
 460 
 461         /**
 462          * Must be called externally when connection has closed
 463          * and therefore no more bytes can be read
 464          */
 465         public void complete() {
 466             // We're done! All data has been received.
 467             if (debug.on())
 468                 debug.log("Parser got all expected bytes: completing");
 469             assert closedExceptionally == null;
 470             onFinished.run();
 471             pusher.onComplete();
 472             onComplete.accept(closedExceptionally); // should be null
 473         }
 474     }
 475 
 476     class FixedLengthBodyParser implements BodyParser {
 477         final long contentLength;
 478         final Consumer<Throwable> onComplete;
 479         final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
 480         final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
 481         volatile long remaining;
 482         volatile Throwable closedExceptionally;
 483         volatile AbstractSubscription sub;
 484         FixedLengthBodyParser(long contentLength, Consumer<Throwable> onComplete) {
 485             this.contentLength = this.remaining = contentLength;
 486             this.onComplete = onComplete;
 487         }
 488 
 489         String dbgString() {
 490             return dbgTag;
 491         }
 492 
 493         @Override
 494         public void onSubscribe(AbstractSubscription sub) {
 495             if (debug.on())
 496                 debug.log("length=" + contentLength +", onSubscribe: "
 497                            + pusher.getClass().getName());
 498             pusher.onSubscribe(this.sub = sub);
 499             try {
 500                 if (contentLength == 0) {
 501                     onFinished.run();
 502                     pusher.onComplete();
 503                     onComplete.accept(null);
 504                 }
 505             } catch (Throwable t) {
 506                 closedExceptionally = t;
 507                 try {
 508                     pusher.onError(t);
 509                 } finally {
 510                     onComplete.accept(t);
 511                 }
 512             }
 513         }
 514 
 515         @Override
 516         public String currentStateMessage() {
 517             return format("fixed content-length: %d, bytes received: %d",
 518                           contentLength, contentLength - remaining);
 519         }
 520 
 521         @Override
 522         public void accept(ByteBuffer b) {
 523             if (closedExceptionally != null) {
 524                 if (debug.on())
 525                     debug.log("already closed: " + closedExceptionally);
 526                 return;
 527             }
 528             boolean completed = false;
 529             try {
 530                 long unfulfilled = remaining;
 531                 if (debug.on())
 532                     debug.log("Parser got %d bytes (%d remaining / %d)",
 533                               b.remaining(), unfulfilled, contentLength);
 534                 assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0;
 535 
 536                 if (unfulfilled == 0 && contentLength > 0) return;
 537 
 538                 if (b.hasRemaining() && unfulfilled > 0) {
 539                     // only reduce demand if we actually push something.
 540                     // we would not have come here if there was no
 541                     // demand.
 542                     boolean hasDemand = sub.demand().tryDecrement();
 543                     assert hasDemand;
 544                     int amount = (int)Math.min(b.remaining(), unfulfilled); // safe cast
 545                     unfulfilled = remaining -= amount;
 546                     ByteBuffer buffer = Utils.sliceWithLimitedCapacity(b, amount);
 547                     pusher.onNext(List.of(buffer.asReadOnlyBuffer()));
 548                 }
 549                 if (unfulfilled == 0) {
 550                     // We're done! All data has been received.
 551                     if (debug.on())
 552                         debug.log("Parser got all expected bytes: completing");
 553                     assert closedExceptionally == null;
 554                     onFinished.run();
 555                     pusher.onComplete();
 556                     completed = true;
 557                     onComplete.accept(closedExceptionally); // should be null
 558                 } else {
 559                     assert b.remaining() == 0;
 560                 }
 561             } catch (Throwable t) {
 562                 if (debug.on()) debug.log("Unexpected exception", t);
 563                 closedExceptionally = t;
 564                 if (!completed) {
 565                     onComplete.accept(t);
 566                 }
 567             }
 568         }
 569     }
 570 }