1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.lang.System.Logger.Level; 30 import java.nio.ByteBuffer; 31 import java.util.ArrayList; 32 import java.util.List; 33 import java.util.function.Consumer; 34 import jdk.incubator.http.internal.common.Utils; 35 36 /** 37 * Implements chunked/fixed transfer encodings of HTTP/1.1 responses. 38 * 39 * Call pushBody() to read the body (blocking). Data and errors are provided 40 * to given Consumers. After final buffer delivered, empty optional delivered 41 */ 42 class ResponseContent { 43 44 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. 45 46 final HttpResponse.BodySubscriber<?> pusher; 47 final int contentLength; 48 final HttpHeaders headers; 49 // this needs to run before we complete the body 50 // so that connection can be returned to pool 51 private final Runnable onFinished; 52 private final String dbgTag; 53 54 ResponseContent(HttpConnection connection, 55 int contentLength, 56 HttpHeaders h, 57 HttpResponse.BodySubscriber<?> userSubscriber, 58 Runnable onFinished) 59 { 60 this.pusher = userSubscriber; 61 this.contentLength = contentLength; 62 this.headers = h; 63 this.onFinished = onFinished; 64 this.dbgTag = connection.dbgString() + "/ResponseContent"; 65 } 66 67 static final int LF = 10; 68 static final int CR = 13; 69 70 private boolean chunkedContent, chunkedContentInitialized; 71 72 boolean contentChunked() throws IOException { 73 if (chunkedContentInitialized) { 74 return chunkedContent; 75 } 76 if (contentLength == -1) { 77 String tc = headers.firstValue("Transfer-Encoding") 78 .orElse(""); 79 if (!tc.equals("")) { 80 if (tc.equalsIgnoreCase("chunked")) { 81 chunkedContent = true; 82 } else { 83 throw new IOException("invalid content"); 84 } 85 } else { 86 chunkedContent = false; 87 } 88 } 89 chunkedContentInitialized = true; 90 return chunkedContent; 91 } 92 93 interface BodyParser extends Consumer<ByteBuffer> { 94 void onSubscribe(AbstractSubscription sub); 95 } 96 97 // Returns a parser that will take care of parsing the received byte 98 // buffers and forward them to the BodySubscriber. 99 // When the parser is done, it will call onComplete. 100 // If parsing was successful, the throwable parameter will be null. 101 // Otherwise it will be the exception that occurred 102 // Note: revisit: it might be better to use a CompletableFuture than 103 // a completion handler. 104 BodyParser getBodyParser(Consumer<Throwable> onComplete) 105 throws IOException { 106 if (contentChunked()) { 107 return new ChunkedBodyParser(onComplete); 108 } else { 109 return new FixedLengthBodyParser(contentLength, onComplete); 110 } 111 } 112 113 114 static enum ChunkState {READING_LENGTH, READING_DATA, DONE} 115 class ChunkedBodyParser implements BodyParser { 116 final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER; 117 final Consumer<Throwable> onComplete; 118 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); 119 final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser"; 120 121 volatile Throwable closedExceptionally; 122 volatile int partialChunklen = 0; // partially read chunk len 123 volatile int chunklen = -1; // number of bytes in chunk 124 volatile int bytesremaining; // number of bytes in chunk left to be read incl CRLF 125 volatile boolean cr = false; // tryReadChunkLength has found CR 126 volatile int bytesToConsume; // number of bytes that still need to be consumed before proceeding 127 volatile ChunkState state = ChunkState.READING_LENGTH; // current state 128 volatile AbstractSubscription sub; 129 ChunkedBodyParser(Consumer<Throwable> onComplete) { 130 this.onComplete = onComplete; 131 } 132 133 String dbgString() { 134 return dbgTag; 135 } 136 137 @Override 138 public void onSubscribe(AbstractSubscription sub) { 139 debug.log(Level.DEBUG, () -> "onSubscribe: " 140 + pusher.getClass().getName()); 141 pusher.onSubscribe(this.sub = sub); 142 } 143 144 @Override 145 public void accept(ByteBuffer b) { 146 if (closedExceptionally != null) { 147 debug.log(Level.DEBUG, () -> "already closed: " 148 + closedExceptionally); 149 return; 150 } 151 boolean completed = false; 152 try { 153 List<ByteBuffer> out = new ArrayList<>(); 154 do { 155 if (tryPushOneHunk(b, out)) { 156 // We're done! (true if the final chunk was parsed). 157 if (!out.isEmpty()) { 158 // push what we have and complete 159 // only reduce demand if we actually push something. 160 // we would not have come here if there was no 161 // demand. 162 boolean hasDemand = sub.demand().tryDecrement(); 163 assert hasDemand; 164 pusher.onNext(out); 165 } 166 debug.log(Level.DEBUG, () -> "done!"); 167 assert closedExceptionally == null; 168 assert state == ChunkState.DONE; 169 onFinished.run(); 170 pusher.onComplete(); 171 completed = true; 172 onComplete.accept(closedExceptionally); // should be null 173 break; 174 } 175 // the buffer may contain several hunks, and therefore 176 // we must loop while it's not exhausted. 177 } while (b.hasRemaining()); 178 179 if (!completed && !out.isEmpty()) { 180 // push what we have. 181 // only reduce demand if we actually push something. 182 // we would not have come here if there was no 183 // demand. 184 boolean hasDemand = sub.demand().tryDecrement(); 185 assert hasDemand; 186 pusher.onNext(out); 187 } 188 assert state == ChunkState.DONE || !b.hasRemaining(); 189 } catch(Throwable t) { 190 closedExceptionally = t; 191 if (!completed) onComplete.accept(t); 192 } 193 } 194 195 // reads and returns chunklen. Position of chunkbuf is first byte 196 // of chunk on return. chunklen includes the CR LF at end of chunk 197 // returns -1 if needs more bytes 198 private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException { 199 assert state == ChunkState.READING_LENGTH; 200 while (chunkbuf.hasRemaining()) { 201 int c = chunkbuf.get(); 202 if (cr) { 203 if (c == LF) { 204 return partialChunklen; 205 } else { 206 throw new IOException("invalid chunk header"); 207 } 208 } 209 if (c == CR) { 210 cr = true; 211 } else { 212 int digit = toDigit(c); 213 partialChunklen = partialChunklen * 16 + digit; 214 } 215 } 216 return -1; 217 } 218 219 220 // try to consume as many bytes as specified by bytesToConsume. 221 // returns the number of bytes that still need to be consumed. 222 // In practice this method is only called to consume one CRLF pair 223 // with bytesToConsume set to 2, so it will only return 0 (if completed), 224 // 1, or 2 (if chunkbuf doesn't have the 2 chars). 225 private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException { 226 int n = bytesToConsume; 227 if (n > 0) { 228 int e = Math.min(chunkbuf.remaining(), n); 229 230 // verifies some assertions 231 // this methods is called only to consume CRLF 232 if (Utils.ASSERTIONSENABLED) { 233 assert n <= 2 && e <= 2; 234 ByteBuffer tmp = chunkbuf.slice(); 235 // if n == 2 assert that we will first consume CR 236 assert (n == 2 && e > 0) ? tmp.get() == CR : true; 237 // if n == 1 || n == 2 && e == 2 assert that we then consume LF 238 assert (n == 1 || e == 2) ? tmp.get() == LF : true; 239 } 240 241 chunkbuf.position(chunkbuf.position() + e); 242 n -= e; 243 bytesToConsume = n; 244 } 245 assert n >= 0; 246 return n; 247 } 248 249 /** 250 * Returns a ByteBuffer containing chunk of data or a "hunk" of data 251 * (a chunk of a chunk if the chunk size is larger than our ByteBuffers). 252 * If the given chunk does not have enough data this method return 253 * an empty ByteBuffer (READMORE). 254 * If we encounter the final chunk (an empty chunk) this method 255 * returns null. 256 */ 257 ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException { 258 int unfulfilled = bytesremaining; 259 int toconsume = bytesToConsume; 260 ChunkState st = state; 261 if (st == ChunkState.READING_LENGTH && chunklen == -1) { 262 debug.log(Level.DEBUG, () -> "Trying to read chunk len" 263 + " (remaining in buffer:"+chunk.remaining()+")"); 264 int clen = chunklen = tryReadChunkLen(chunk); 265 if (clen == -1) return READMORE; 266 debug.log(Level.DEBUG, "Got chunk len %d", clen); 267 cr = false; partialChunklen = 0; 268 unfulfilled = bytesremaining = clen; 269 if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk 270 else st = state = ChunkState.READING_DATA; // read the data 271 } 272 273 if (toconsume > 0) { 274 debug.log(Level.DEBUG, 275 "Trying to consume bytes: %d (remaining in buffer: %s)", 276 toconsume, chunk.remaining()); 277 if (tryConsumeBytes(chunk) > 0) { 278 return READMORE; 279 } 280 } 281 282 toconsume = bytesToConsume; 283 assert toconsume == 0; 284 285 286 if (st == ChunkState.READING_LENGTH) { 287 // we will come here only if chunklen was 0, after having 288 // consumed the trailing CRLF 289 int clen = chunklen; 290 assert clen == 0; 291 debug.log(Level.DEBUG, "No more chunks: %d", clen); 292 // the DONE state is not really needed but it helps with 293 // assertions... 294 state = ChunkState.DONE; 295 return null; 296 } 297 298 int clen = chunklen; 299 assert clen > 0; 300 assert st == ChunkState.READING_DATA; 301 302 ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk 303 if (unfulfilled > 0) { 304 int bytesread = chunk.remaining(); 305 debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d", 306 bytesread, unfulfilled); 307 308 int bytes2return = Math.min(bytesread, unfulfilled); 309 debug.log(Level.DEBUG, "Returning chunk bytes: %d", bytes2return); 310 returnBuffer = Utils.slice(chunk, bytes2return); 311 unfulfilled = bytesremaining -= bytes2return; 312 if (unfulfilled == 0) bytesToConsume = 2; 313 } 314 315 assert unfulfilled >= 0; 316 317 if (unfulfilled == 0) { 318 debug.log(Level.DEBUG, 319 "No more bytes to read - %d yet to consume.", 320 unfulfilled); 321 // check whether the trailing CRLF is consumed, try to 322 // consume it if not. If tryConsumeBytes needs more bytes 323 // then we will come back here later - skipping the block 324 // that reads data because remaining==0, and finding 325 // that the two bytes are now consumed. 326 if (tryConsumeBytes(chunk) == 0) { 327 // we're done for this chunk! reset all states and 328 // prepare to read the next chunk. 329 chunklen = -1; 330 partialChunklen = 0; 331 cr = false; 332 state = ChunkState.READING_LENGTH; 333 debug.log(Level.DEBUG, "Ready to read next chunk"); 334 } 335 } 336 if (returnBuffer == READMORE) { 337 debug.log(Level.DEBUG, "Need more data"); 338 } 339 return returnBuffer; 340 } 341 342 343 // Attempt to parse and push one hunk from the buffer. 344 // Returns true if the final chunk was parsed. 345 // Returns false if we need to push more chunks. 346 private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out) 347 throws IOException { 348 assert state != ChunkState.DONE; 349 ByteBuffer b1 = tryReadOneHunk(b); 350 if (b1 != null) { 351 //assert b1.hasRemaining() || b1 == READMORE; 352 if (b1.hasRemaining()) { 353 debug.log(Level.DEBUG, "Sending chunk to consumer (%d)", 354 b1.remaining()); 355 out.add(b1); 356 debug.log(Level.DEBUG, "Chunk sent."); 357 } 358 return false; // we haven't parsed the final chunk yet. 359 } else { 360 return true; // we're done! the final chunk was parsed. 361 } 362 } 363 364 private int toDigit(int b) throws IOException { 365 if (b >= 0x30 && b <= 0x39) { 366 return b - 0x30; 367 } 368 if (b >= 0x41 && b <= 0x46) { 369 return b - 0x41 + 10; 370 } 371 if (b >= 0x61 && b <= 0x66) { 372 return b - 0x61 + 10; 373 } 374 throw new IOException("Invalid chunk header byte " + b); 375 } 376 377 } 378 379 class FixedLengthBodyParser implements BodyParser { 380 final int contentLength; 381 final Consumer<Throwable> onComplete; 382 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); 383 final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser"; 384 volatile int remaining; 385 volatile Throwable closedExceptionally; 386 volatile AbstractSubscription sub; 387 FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) { 388 this.contentLength = this.remaining = contentLength; 389 this.onComplete = onComplete; 390 } 391 392 String dbgString() { 393 return dbgTag; 394 } 395 396 @Override 397 public void onSubscribe(AbstractSubscription sub) { 398 debug.log(Level.DEBUG, () -> "length=" 399 + contentLength +", onSubscribe: " 400 + pusher.getClass().getName()); 401 pusher.onSubscribe(this.sub = sub); 402 try { 403 if (contentLength == 0) { 404 pusher.onComplete(); 405 onComplete.accept(null); 406 } 407 } catch (Throwable t) { 408 closedExceptionally = t; 409 try { 410 pusher.onError(t); 411 } finally { 412 onComplete.accept(t); 413 } 414 } 415 } 416 417 @Override 418 public void accept(ByteBuffer b) { 419 if (closedExceptionally != null) { 420 debug.log(Level.DEBUG, () -> "already closed: " 421 + closedExceptionally); 422 return; 423 } 424 boolean completed = false; 425 try { 426 int unfulfilled = remaining; 427 debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)", 428 b.remaining(), unfulfilled, contentLength); 429 assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0; 430 431 if (unfulfilled == 0 && contentLength > 0) return; 432 433 if (b.hasRemaining() && unfulfilled > 0) { 434 // only reduce demand if we actually push something. 435 // we would not have come here if there was no 436 // demand. 437 boolean hasDemand = sub.demand().tryDecrement(); 438 assert hasDemand; 439 int amount = Math.min(b.remaining(), unfulfilled); 440 unfulfilled = remaining -= amount; 441 ByteBuffer buffer = Utils.slice(b, amount); 442 pusher.onNext(List.of(buffer)); 443 } 444 if (unfulfilled == 0) { 445 // We're done! All data has been received. 446 assert closedExceptionally == null; 447 onFinished.run(); 448 pusher.onComplete(); 449 completed = true; 450 onComplete.accept(closedExceptionally); // should be null 451 } else { 452 assert b.remaining() == 0; 453 } 454 } catch (Throwable t) { 455 debug.log(Level.DEBUG, "Unexpected exception", t); 456 closedExceptionally = t; 457 if (!completed) { 458 onComplete.accept(t); 459 } 460 } 461 } 462 } 463 }