1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http; 27 28 import java.io.IOException; 29 import java.nio.ByteBuffer; 30 import java.util.ArrayList; 31 import java.util.Collections; 32 import java.util.List; 33 import java.util.function.Consumer; 34 import java.net.http.HttpHeaders; 35 import java.net.http.HttpResponse; 36 import jdk.internal.net.http.common.Logger; 37 import jdk.internal.net.http.common.Utils; 38 import static java.lang.String.format; 39 40 /** 41 * Implements chunked/fixed transfer encodings of HTTP/1.1 responses. 42 * 43 * Call pushBody() to read the body (blocking). Data and errors are provided 44 * to given Consumers. After final buffer delivered, empty optional delivered 45 */ 46 class ResponseContent { 47 48 final HttpResponse.BodySubscriber<?> pusher; 49 final long contentLength; 50 final HttpHeaders headers; 51 // this needs to run before we complete the body 52 // so that connection can be returned to pool 53 private final Runnable onFinished; 54 private final String dbgTag; 55 56 ResponseContent(HttpConnection connection, 57 long contentLength, 58 HttpHeaders h, 59 HttpResponse.BodySubscriber<?> userSubscriber, 60 Runnable onFinished) 61 { 62 this.pusher = userSubscriber; 63 this.contentLength = contentLength; 64 this.headers = h; 65 this.onFinished = onFinished; 66 this.dbgTag = connection.dbgString() + "/ResponseContent"; 67 } 68 69 static final int LF = 10; 70 static final int CR = 13; 71 72 private boolean chunkedContent, chunkedContentInitialized; 73 74 boolean contentChunked() throws IOException { 75 if (chunkedContentInitialized) { 76 return chunkedContent; 77 } 78 if (contentLength == -2) { 79 // HTTP/1.0 content 80 chunkedContentInitialized = true; 81 chunkedContent = false; 82 return chunkedContent; 83 } 84 if (contentLength == -1) { 85 String tc = headers.firstValue("Transfer-Encoding") 86 .orElse(""); 87 if (!tc.equals("")) { 88 if (tc.equalsIgnoreCase("chunked")) { 89 chunkedContent = true; 90 } else { 91 throw new IOException("invalid content"); 92 } 93 } else { 94 chunkedContent = false; 95 } 96 } 97 chunkedContentInitialized = true; 98 return chunkedContent; 99 } 100 101 interface BodyParser extends Consumer<ByteBuffer> { 102 void onSubscribe(AbstractSubscription sub); 103 // A current-state message suitable for inclusion in an exception 104 // detail message. 105 String currentStateMessage(); 106 } 107 108 // Returns a parser that will take care of parsing the received byte 109 // buffers and forward them to the BodySubscriber. 110 // When the parser is done, it will call onComplete. 111 // If parsing was successful, the throwable parameter will be null. 112 // Otherwise it will be the exception that occurred 113 // Note: revisit: it might be better to use a CompletableFuture than 114 // a completion handler. 115 BodyParser getBodyParser(Consumer<Throwable> onComplete) 116 throws IOException { 117 if (contentChunked()) { 118 return new ChunkedBodyParser(onComplete); 119 } else { 120 return contentLength == -2 121 ? new UnknownLengthBodyParser(onComplete) 122 : new FixedLengthBodyParser(contentLength, onComplete); 123 } 124 } 125 126 127 static enum ChunkState {READING_LENGTH, READING_DATA, DONE} 128 class ChunkedBodyParser implements BodyParser { 129 final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER; 130 final Consumer<Throwable> onComplete; 131 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 132 final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser"; 133 134 volatile Throwable closedExceptionally; 135 volatile int partialChunklen = 0; // partially read chunk len 136 volatile int chunklen = -1; // number of bytes in chunk 137 volatile int bytesremaining; // number of bytes in chunk left to be read incl CRLF 138 volatile boolean cr = false; // tryReadChunkLength has found CR 139 volatile int bytesToConsume; // number of bytes that still need to be consumed before proceeding 140 volatile ChunkState state = ChunkState.READING_LENGTH; // current state 141 volatile AbstractSubscription sub; 142 ChunkedBodyParser(Consumer<Throwable> onComplete) { 143 this.onComplete = onComplete; 144 } 145 146 String dbgString() { 147 return dbgTag; 148 } 149 150 @Override 151 public void onSubscribe(AbstractSubscription sub) { 152 if (debug.on()) 153 debug.log("onSubscribe: " + pusher.getClass().getName()); 154 pusher.onSubscribe(this.sub = sub); 155 } 156 157 @Override 158 public String currentStateMessage() { 159 return format("chunked transfer encoding, state: %s", state); 160 } 161 162 @Override 163 public void accept(ByteBuffer b) { 164 if (closedExceptionally != null) { 165 if (debug.on()) 166 debug.log("already closed: " + closedExceptionally); 167 return; 168 } 169 boolean completed = false; 170 try { 171 List<ByteBuffer> out = new ArrayList<>(); 172 do { 173 if (tryPushOneHunk(b, out)) { 174 // We're done! (true if the final chunk was parsed). 175 if (!out.isEmpty()) { 176 // push what we have and complete 177 // only reduce demand if we actually push something. 178 // we would not have come here if there was no 179 // demand. 180 boolean hasDemand = sub.demand().tryDecrement(); 181 assert hasDemand; 182 pusher.onNext(Collections.unmodifiableList(out)); 183 if (debug.on()) debug.log("Chunks sent"); 184 } 185 if (debug.on()) debug.log("done!"); 186 assert closedExceptionally == null; 187 assert state == ChunkState.DONE; 188 onFinished.run(); 189 pusher.onComplete(); 190 if (debug.on()) debug.log("subscriber completed"); 191 completed = true; 192 onComplete.accept(closedExceptionally); // should be null 193 break; 194 } 195 // the buffer may contain several hunks, and therefore 196 // we must loop while it's not exhausted. 197 } while (b.hasRemaining()); 198 199 if (!completed && !out.isEmpty()) { 200 // push what we have. 201 // only reduce demand if we actually push something. 202 // we would not have come here if there was no 203 // demand. 204 boolean hasDemand = sub.demand().tryDecrement(); 205 assert hasDemand; 206 pusher.onNext(Collections.unmodifiableList(out)); 207 if (debug.on()) debug.log("Chunk sent"); 208 } 209 assert state == ChunkState.DONE || !b.hasRemaining(); 210 } catch(Throwable t) { 211 if (debug.on()) 212 debug.log("Error while processing buffer: %s", (Object)t ); 213 closedExceptionally = t; 214 if (!completed) onComplete.accept(t); 215 } 216 } 217 218 // reads and returns chunklen. Position of chunkbuf is first byte 219 // of chunk on return. chunklen includes the CR LF at end of chunk 220 // returns -1 if needs more bytes 221 private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException { 222 assert state == ChunkState.READING_LENGTH; 223 while (chunkbuf.hasRemaining()) { 224 int c = chunkbuf.get(); 225 if (cr) { 226 if (c == LF) { 227 return partialChunklen; 228 } else { 229 throw new IOException("invalid chunk header"); 230 } 231 } 232 if (c == CR) { 233 cr = true; 234 } else { 235 int digit = toDigit(c); 236 partialChunklen = partialChunklen * 16 + digit; 237 } 238 } 239 return -1; 240 } 241 242 243 // try to consume as many bytes as specified by bytesToConsume. 244 // returns the number of bytes that still need to be consumed. 245 // In practice this method is only called to consume one CRLF pair 246 // with bytesToConsume set to 2, so it will only return 0 (if completed), 247 // 1, or 2 (if chunkbuf doesn't have the 2 chars). 248 private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException { 249 int n = bytesToConsume; 250 if (n > 0) { 251 int e = Math.min(chunkbuf.remaining(), n); 252 253 // verifies some assertions 254 // this methods is called only to consume CRLF 255 if (Utils.ASSERTIONSENABLED) { 256 assert n <= 2 && e <= 2; 257 ByteBuffer tmp = chunkbuf.slice(); 258 // if n == 2 assert that we will first consume CR 259 assert (n == 2 && e > 0) ? tmp.get() == CR : true; 260 // if n == 1 || n == 2 && e == 2 assert that we then consume LF 261 assert (n == 1 || e == 2) ? tmp.get() == LF : true; 262 } 263 264 chunkbuf.position(chunkbuf.position() + e); 265 n -= e; 266 bytesToConsume = n; 267 } 268 assert n >= 0; 269 return n; 270 } 271 272 /** 273 * Returns a ByteBuffer containing chunk of data or a "hunk" of data 274 * (a chunk of a chunk if the chunk size is larger than our ByteBuffers). 275 * If the given chunk does not have enough data this method return 276 * an empty ByteBuffer (READMORE). 277 * If we encounter the final chunk (an empty chunk) this method 278 * returns null. 279 */ 280 ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException { 281 int unfulfilled = bytesremaining; 282 int toconsume = bytesToConsume; 283 ChunkState st = state; 284 if (st == ChunkState.READING_LENGTH && chunklen == -1) { 285 if (debug.on()) debug.log(() -> "Trying to read chunk len" 286 + " (remaining in buffer:"+chunk.remaining()+")"); 287 int clen = chunklen = tryReadChunkLen(chunk); 288 if (clen == -1) return READMORE; 289 if (debug.on()) debug.log("Got chunk len %d", clen); 290 cr = false; partialChunklen = 0; 291 unfulfilled = bytesremaining = clen; 292 if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk 293 else st = state = ChunkState.READING_DATA; // read the data 294 } 295 296 if (toconsume > 0) { 297 if (debug.on()) 298 debug.log("Trying to consume bytes: %d (remaining in buffer: %s)", 299 toconsume, chunk.remaining()); 300 if (tryConsumeBytes(chunk) > 0) { 301 return READMORE; 302 } 303 } 304 305 toconsume = bytesToConsume; 306 assert toconsume == 0; 307 308 309 if (st == ChunkState.READING_LENGTH) { 310 // we will come here only if chunklen was 0, after having 311 // consumed the trailing CRLF 312 int clen = chunklen; 313 assert clen == 0; 314 if (debug.on()) debug.log("No more chunks: %d", clen); 315 // the DONE state is not really needed but it helps with 316 // assertions... 317 state = ChunkState.DONE; 318 return null; 319 } 320 321 int clen = chunklen; 322 assert clen > 0; 323 assert st == ChunkState.READING_DATA; 324 325 ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk 326 if (unfulfilled > 0) { 327 int bytesread = chunk.remaining(); 328 if (debug.on()) 329 debug.log("Reading chunk: available %d, needed %d", 330 bytesread, unfulfilled); 331 332 int bytes2return = Math.min(bytesread, unfulfilled); 333 if (debug.on()) 334 debug.log( "Returning chunk bytes: %d", bytes2return); 335 returnBuffer = Utils.sliceWithLimitedCapacity(chunk, bytes2return).asReadOnlyBuffer(); 336 unfulfilled = bytesremaining -= bytes2return; 337 if (unfulfilled == 0) bytesToConsume = 2; 338 } 339 340 assert unfulfilled >= 0; 341 342 if (unfulfilled == 0) { 343 if (debug.on()) 344 debug.log("No more bytes to read - %d yet to consume.", 345 unfulfilled); 346 // check whether the trailing CRLF is consumed, try to 347 // consume it if not. If tryConsumeBytes needs more bytes 348 // then we will come back here later - skipping the block 349 // that reads data because remaining==0, and finding 350 // that the two bytes are now consumed. 351 if (tryConsumeBytes(chunk) == 0) { 352 // we're done for this chunk! reset all states and 353 // prepare to read the next chunk. 354 chunklen = -1; 355 partialChunklen = 0; 356 cr = false; 357 state = ChunkState.READING_LENGTH; 358 if (debug.on()) debug.log("Ready to read next chunk"); 359 } 360 } 361 if (returnBuffer == READMORE) { 362 if (debug.on()) debug.log("Need more data"); 363 } 364 return returnBuffer; 365 } 366 367 368 // Attempt to parse and push one hunk from the buffer. 369 // Returns true if the final chunk was parsed. 370 // Returns false if we need to push more chunks. 371 private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out) 372 throws IOException { 373 assert state != ChunkState.DONE; 374 ByteBuffer b1 = tryReadOneHunk(b); 375 if (b1 != null) { 376 //assert b1.hasRemaining() || b1 == READMORE; 377 if (b1.hasRemaining()) { 378 if (debug.on()) 379 debug.log("Sending chunk to consumer (%d)", b1.remaining()); 380 out.add(b1); 381 } 382 return false; // we haven't parsed the final chunk yet. 383 } else { 384 return true; // we're done! the final chunk was parsed. 385 } 386 } 387 388 private int toDigit(int b) throws IOException { 389 if (b >= 0x30 && b <= 0x39) { 390 return b - 0x30; 391 } 392 if (b >= 0x41 && b <= 0x46) { 393 return b - 0x41 + 10; 394 } 395 if (b >= 0x61 && b <= 0x66) { 396 return b - 0x61 + 10; 397 } 398 throw new IOException("Invalid chunk header byte " + b); 399 } 400 401 } 402 403 class UnknownLengthBodyParser implements BodyParser { 404 final Consumer<Throwable> onComplete; 405 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 406 final String dbgTag = ResponseContent.this.dbgTag + "/UnknownLengthBodyParser"; 407 volatile Throwable closedExceptionally; 408 volatile AbstractSubscription sub; 409 volatile int breceived = 0; 410 411 UnknownLengthBodyParser(Consumer<Throwable> onComplete) { 412 this.onComplete = onComplete; 413 } 414 415 String dbgString() { 416 return dbgTag; 417 } 418 419 @Override 420 public void onSubscribe(AbstractSubscription sub) { 421 if (debug.on()) 422 debug.log("onSubscribe: " + pusher.getClass().getName()); 423 pusher.onSubscribe(this.sub = sub); 424 } 425 426 @Override 427 public String currentStateMessage() { 428 return format("http1_0 content, bytes received: %d", breceived); 429 } 430 431 @Override 432 public void accept(ByteBuffer b) { 433 if (closedExceptionally != null) { 434 if (debug.on()) 435 debug.log("already closed: " + closedExceptionally); 436 return; 437 } 438 boolean completed = false; 439 try { 440 if (debug.on()) 441 debug.log("Parser got %d bytes ", b.remaining()); 442 443 if (b.hasRemaining()) { 444 // only reduce demand if we actually push something. 445 // we would not have come here if there was no 446 // demand. 447 boolean hasDemand = sub.demand().tryDecrement(); 448 assert hasDemand; 449 breceived += b.remaining(); 450 pusher.onNext(List.of(b.asReadOnlyBuffer())); 451 } 452 } catch (Throwable t) { 453 if (debug.on()) debug.log("Unexpected exception", t); 454 closedExceptionally = t; 455 if (!completed) { 456 onComplete.accept(t); 457 } 458 } 459 } 460 461 /** 462 * Must be called externally when connection has closed 463 * and therefore no more bytes can be read 464 */ 465 public void complete() { 466 // We're done! All data has been received. 467 if (debug.on()) 468 debug.log("Parser got all expected bytes: completing"); 469 assert closedExceptionally == null; 470 onFinished.run(); 471 pusher.onComplete(); 472 onComplete.accept(closedExceptionally); // should be null 473 } 474 } 475 476 class FixedLengthBodyParser implements BodyParser { 477 final long contentLength; 478 final Consumer<Throwable> onComplete; 479 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 480 final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser"; 481 volatile long remaining; 482 volatile Throwable closedExceptionally; 483 volatile AbstractSubscription sub; 484 FixedLengthBodyParser(long contentLength, Consumer<Throwable> onComplete) { 485 this.contentLength = this.remaining = contentLength; 486 this.onComplete = onComplete; 487 } 488 489 String dbgString() { 490 return dbgTag; 491 } 492 493 @Override 494 public void onSubscribe(AbstractSubscription sub) { 495 if (debug.on()) 496 debug.log("length=" + contentLength +", onSubscribe: " 497 + pusher.getClass().getName()); 498 pusher.onSubscribe(this.sub = sub); 499 try { 500 if (contentLength == 0) { 501 onFinished.run(); 502 pusher.onComplete(); 503 onComplete.accept(null); 504 } 505 } catch (Throwable t) { 506 closedExceptionally = t; 507 try { 508 pusher.onError(t); 509 } finally { 510 onComplete.accept(t); 511 } 512 } 513 } 514 515 @Override 516 public String currentStateMessage() { 517 return format("fixed content-length: %d, bytes received: %d", 518 contentLength, contentLength - remaining); 519 } 520 521 @Override 522 public void accept(ByteBuffer b) { 523 if (closedExceptionally != null) { 524 if (debug.on()) 525 debug.log("already closed: " + closedExceptionally); 526 return; 527 } 528 boolean completed = false; 529 try { 530 long unfulfilled = remaining; 531 if (debug.on()) 532 debug.log("Parser got %d bytes (%d remaining / %d)", 533 b.remaining(), unfulfilled, contentLength); 534 assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0; 535 536 if (unfulfilled == 0 && contentLength > 0) return; 537 538 if (b.hasRemaining() && unfulfilled > 0) { 539 // only reduce demand if we actually push something. 540 // we would not have come here if there was no 541 // demand. 542 boolean hasDemand = sub.demand().tryDecrement(); 543 assert hasDemand; 544 int amount = (int)Math.min(b.remaining(), unfulfilled); // safe cast 545 unfulfilled = remaining -= amount; 546 ByteBuffer buffer = Utils.sliceWithLimitedCapacity(b, amount); 547 pusher.onNext(List.of(buffer.asReadOnlyBuffer())); 548 } 549 if (unfulfilled == 0) { 550 // We're done! All data has been received. 551 if (debug.on()) 552 debug.log("Parser got all expected bytes: completing"); 553 assert closedExceptionally == null; 554 onFinished.run(); 555 pusher.onComplete(); 556 completed = true; 557 onComplete.accept(closedExceptionally); // should be null 558 } else { 559 assert b.remaining() == 0; 560 } 561 } catch (Throwable t) { 562 if (debug.on()) debug.log("Unexpected exception", t); 563 closedExceptionally = t; 564 if (!completed) { 565 onComplete.accept(t); 566 } 567 } 568 } 569 } 570 }