1 /* 2 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.EOFException; 29 import java.lang.System.Logger.Level; 30 import java.nio.ByteBuffer; 31 import java.util.concurrent.CompletableFuture; 32 import java.util.concurrent.CompletionStage; 33 import java.util.concurrent.Executor; 34 import java.util.function.BiConsumer; 35 import java.util.function.Consumer; 36 import java.util.function.Function; 37 import jdk.incubator.http.ResponseContent.BodyParser; 38 import jdk.incubator.http.internal.common.Log; 39 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1; 40 import jdk.incubator.http.internal.common.MinimalFuture; 41 import jdk.incubator.http.internal.common.Utils; 42 43 /** 44 * Handles a HTTP/1.1 response (headers + body). 45 * There can be more than one of these per Http exchange. 46 */ 47 class Http1Response<T> { 48 49 private volatile ResponseContent content; 50 private final HttpRequestImpl request; 51 private Response response; 52 private final HttpConnection connection; 53 private HttpHeaders headers; 54 private int responseCode; 55 private final Http1Exchange<T> exchange; 56 private boolean return2Cache; // return connection to cache when finished 57 private final HeadersReader headersReader; // used to read the headers 58 private final BodyReader bodyReader; // used to read the body 59 private final Http1AsyncReceiver asyncReceiver; 60 private volatile EOFException eof; 61 // max number of bytes of (fixed length) body to ignore on redirect 62 private final static int MAX_IGNORE = 1024; 63 64 // Revisit: can we get rid of this? 65 static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} 66 private volatile State readProgress = State.INITIAL; 67 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. 68 final System.Logger debug = Utils.getDebugLogger(this.getClass()::getSimpleName, DEBUG); 69 70 71 Http1Response(HttpConnection conn, 72 Http1Exchange<T> exchange, 73 Http1AsyncReceiver asyncReceiver) { 74 this.readProgress = State.INITIAL; 75 this.request = exchange.request(); 76 this.exchange = exchange; 77 this.connection = conn; 78 this.asyncReceiver = asyncReceiver; 79 headersReader = new HeadersReader(this::advance); 80 bodyReader = new BodyReader(this::advance); 81 } 82 83 public CompletableFuture<Response> readHeadersAsync(Executor executor) { 84 debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: " 85 + asyncReceiver.remaining() +") " + readProgress); 86 // with expect continue we will resume reading headers + body. 87 asyncReceiver.unsubscribe(bodyReader); 88 bodyReader.reset(); 89 Http1HeaderParser hd = new Http1HeaderParser(); 90 readProgress = State.READING_HEADERS; 91 headersReader.start(hd); 92 asyncReceiver.subscribe(headersReader); 93 CompletableFuture<State> cf = headersReader.completion(); 94 assert cf != null : "parsing not started"; 95 96 Function<State, Response> lambda = (State completed) -> { 97 assert completed == State.READING_HEADERS; 98 debug.log(Level.DEBUG, () -> 99 "Reading Headers: creating Response object;" 100 + " state is now " + readProgress); 101 asyncReceiver.unsubscribe(headersReader); 102 responseCode = hd.responseCode(); 103 headers = hd.headers(); 104 105 response = new Response(request, 106 exchange.getExchange(), 107 headers, 108 responseCode, 109 HTTP_1_1); 110 return response; 111 }; 112 113 if (executor != null) { 114 return cf.thenApplyAsync(lambda, executor); 115 } else { 116 return cf.thenApply(lambda); 117 } 118 } 119 120 private boolean finished; 121 122 synchronized void completed() { 123 finished = true; 124 } 125 126 synchronized boolean finished() { 127 return finished; 128 } 129 130 int fixupContentLen(int clen) { 131 if (request.method().equalsIgnoreCase("HEAD")) { 132 return 0; 133 } 134 if (clen == -1) { 135 if (headers.firstValue("Transfer-encoding").orElse("") 136 .equalsIgnoreCase("chunked")) { 137 return -1; 138 } 139 return 0; 140 } 141 return clen; 142 } 143 144 /** 145 * Read up to MAX_IGNORE bytes discarding 146 */ 147 public CompletableFuture<Void> ignoreBody(Executor executor) { 148 int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1); 149 if (clen == -1 || clen > MAX_IGNORE) { 150 connection.close(); 151 return MinimalFuture.completedFuture(null); // not treating as error 152 } else { 153 return readBody(HttpResponse.BodySubscriber.discard((Void)null), true, executor); 154 } 155 } 156 157 public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p, 158 boolean return2Cache, 159 Executor executor) { 160 this.return2Cache = return2Cache; 161 final HttpResponse.BodySubscriber<U> pusher = p; 162 final CompletionStage<U> bodyCF = p.getBody(); 163 final CompletableFuture<U> cf = MinimalFuture.of(bodyCF); 164 165 int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1); 166 167 final int clen = fixupContentLen(clen0); 168 169 // expect-continue reads headers and body twice. 170 // if we reach here, we must reset the headersReader state. 171 asyncReceiver.unsubscribe(headersReader); 172 headersReader.reset(); 173 174 executor.execute(() -> { 175 try { 176 HttpClientImpl client = connection.client(); 177 content = new ResponseContent( 178 connection, clen, headers, pusher, 179 this::onFinished 180 ); 181 if (cf.isCompletedExceptionally()) { 182 // if an error occurs during subscription 183 connection.close(); 184 return; 185 } 186 // increment the reference count on the HttpClientImpl 187 // to prevent the SelectorManager thread from exiting until 188 // the body is fully read. 189 client.reference(); 190 bodyReader.start(content.getBodyParser( 191 (t) -> { 192 try { 193 if (t != null) { 194 pusher.onError(t); 195 connection.close(); 196 if (!cf.isDone()) 197 cf.completeExceptionally(t); 198 } 199 } finally { 200 // decrement the reference count on the HttpClientImpl 201 // to allow the SelectorManager thread to exit if no 202 // other operation is pending and the facade is no 203 // longer referenced. 204 client.unreference(); 205 bodyReader.onComplete(t); 206 } 207 })); 208 CompletableFuture<State> bodyReaderCF = bodyReader.completion(); 209 asyncReceiver.subscribe(bodyReader); 210 assert bodyReaderCF != null : "parsing not started"; 211 // Make sure to keep a reference to asyncReceiver from 212 // within this 213 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> { 214 t = Utils.getCompletionCause(t); 215 try { 216 if (t != null) { 217 debug.log(Level.DEBUG, () -> 218 "Finished reading body: " + s); 219 assert s == State.READING_BODY; 220 } 221 if (t != null && !cf.isDone()) { 222 pusher.onError(t); 223 cf.completeExceptionally(t); 224 } 225 } catch (Throwable x) { 226 // not supposed to happen 227 asyncReceiver.onReadError(x); 228 } 229 }); 230 connection.addTrailingOperation(trailingOp); 231 } catch (Throwable t) { 232 debug.log(Level.DEBUG, () -> "Failed reading body: " + t); 233 try { 234 if (!cf.isDone()) { 235 pusher.onError(t); 236 cf.completeExceptionally(t); 237 } 238 } finally { 239 asyncReceiver.onReadError(t); 240 } 241 } 242 }); 243 return cf; 244 } 245 246 247 private void onFinished() { 248 asyncReceiver.clear(); 249 if (return2Cache) { 250 Log.logTrace("Attempting to return connection to the pool: {0}", connection); 251 // TODO: need to do something here? 252 // connection.setAsyncCallbacks(null, null, null); 253 254 // don't return the connection to the cache if EOF happened. 255 debug.log(Level.DEBUG, () -> connection.getConnectionFlow() 256 + ": return to HTTP/1.1 pool"); 257 connection.closeOrReturnToCache(eof == null ? headers : null); 258 } 259 } 260 261 HttpHeaders responseHeaders() { 262 return headers; 263 } 264 265 int responseCode() { 266 return responseCode; 267 } 268 269 // ================ Support for plugging into Http1Receiver ================= 270 // ============================================================================ 271 272 // Callback: Error receiver: Consumer of Throwable. 273 void onReadError(Throwable t) { 274 Log.logError(t); 275 Receiver<?> receiver = receiver(readProgress); 276 if (t instanceof EOFException) { 277 debug.log(Level.DEBUG, "onReadError: received EOF"); 278 eof = (EOFException) t; 279 } 280 CompletableFuture<?> cf = receiver == null ? null : receiver.completion(); 281 debug.log(Level.DEBUG, () -> "onReadError: cf is " 282 + (cf == null ? "null" 283 : (cf.isDone() ? "already completed" 284 : "not yet completed"))); 285 if (cf != null && !cf.isDone()) cf.completeExceptionally(t); 286 else { debug.log(Level.DEBUG, "onReadError", t); } 287 debug.log(Level.DEBUG, () -> "closing connection: cause is " + t); 288 connection.close(); 289 } 290 291 // ======================================================================== 292 293 private State advance(State previous) { 294 assert readProgress == previous; 295 switch(previous) { 296 case READING_HEADERS: 297 asyncReceiver.unsubscribe(headersReader); 298 return readProgress = State.READING_BODY; 299 case READING_BODY: 300 asyncReceiver.unsubscribe(bodyReader); 301 return readProgress = State.DONE; 302 default: 303 throw new InternalError("can't advance from " + previous); 304 } 305 } 306 307 Receiver<?> receiver(State state) { 308 switch(state) { 309 case READING_HEADERS: return headersReader; 310 case READING_BODY: return bodyReader; 311 default: return null; 312 } 313 314 } 315 316 static abstract class Receiver<T> 317 implements Http1AsyncReceiver.Http1AsyncDelegate { 318 abstract void start(T parser); 319 abstract CompletableFuture<State> completion(); 320 // accepts a buffer from upstream. 321 // this should be implemented as a simple call to 322 // accept(ref, parser, cf) 323 public abstract boolean tryAsyncReceive(ByteBuffer buffer); 324 public abstract void onReadError(Throwable t); 325 // handle a byte buffer received from upstream. 326 // this method should set the value of Http1Response.buffer 327 // to ref.get() before beginning parsing. 328 abstract void handle(ByteBuffer buf, T parser, 329 CompletableFuture<State> cf); 330 // resets this objects state so that it can be reused later on 331 // typically puts the reference to parser and completion to null 332 abstract void reset(); 333 334 // accepts a byte buffer received from upstream 335 // returns true if the buffer is fully parsed and more data can 336 // be accepted, false otherwise. 337 final boolean accept(ByteBuffer buf, T parser, 338 CompletableFuture<State> cf) { 339 if (cf == null || parser == null || cf.isDone()) return false; 340 handle(buf, parser, cf); 341 return !cf.isDone(); 342 } 343 public abstract void onSubscribe(AbstractSubscription s); 344 public abstract AbstractSubscription subscription(); 345 346 } 347 348 // Invoked with each new ByteBuffer when reading headers... 349 final class HeadersReader extends Receiver<Http1HeaderParser> { 350 final Consumer<State> onComplete; 351 volatile Http1HeaderParser parser; 352 volatile CompletableFuture<State> cf; 353 volatile long count; // bytes parsed (for debug) 354 volatile AbstractSubscription subscription; 355 356 HeadersReader(Consumer<State> onComplete) { 357 this.onComplete = onComplete; 358 } 359 360 @Override 361 public AbstractSubscription subscription() { 362 return subscription; 363 } 364 365 @Override 366 public void onSubscribe(AbstractSubscription s) { 367 this.subscription = s; 368 s.request(1); 369 } 370 371 @Override 372 void reset() { 373 cf = null; 374 parser = null; 375 count = 0; 376 subscription = null; 377 } 378 379 // Revisit: do we need to support restarting? 380 @Override 381 final void start(Http1HeaderParser hp) { 382 count = 0; 383 cf = new MinimalFuture<>(); 384 parser = hp; 385 } 386 387 @Override 388 CompletableFuture<State> completion() { 389 return cf; 390 } 391 392 @Override 393 public final boolean tryAsyncReceive(ByteBuffer ref) { 394 boolean hasDemand = subscription.demand().tryDecrement(); 395 assert hasDemand; 396 boolean needsMore = accept(ref, parser, cf); 397 if (needsMore) subscription.request(1); 398 return needsMore; 399 } 400 401 @Override 402 public final void onReadError(Throwable t) { 403 Http1Response.this.onReadError(t); 404 } 405 406 @Override 407 final void handle(ByteBuffer b, 408 Http1HeaderParser parser, 409 CompletableFuture<State> cf) { 410 assert cf != null : "parsing not started"; 411 assert parser != null : "no parser"; 412 try { 413 count += b.remaining(); 414 debug.log(Level.DEBUG, () -> "Sending " + b.remaining() 415 + "/" + b.capacity() + " bytes to header parser"); 416 if (parser.parse(b)) { 417 count -= b.remaining(); 418 debug.log(Level.DEBUG, () -> 419 "Parsing headers completed. bytes=" + count); 420 onComplete.accept(State.READING_HEADERS); 421 cf.complete(State.READING_HEADERS); 422 } 423 } catch (Throwable t) { 424 debug.log(Level.DEBUG, 425 () -> "Header parser failed to handle buffer: " + t); 426 cf.completeExceptionally(t); 427 } 428 } 429 } 430 431 // Invoked with each new ByteBuffer when reading bodies... 432 final class BodyReader extends Receiver<BodyParser> { 433 final Consumer<State> onComplete; 434 volatile BodyParser parser; 435 volatile CompletableFuture<State> cf; 436 volatile AbstractSubscription subscription; 437 BodyReader(Consumer<State> onComplete) { 438 this.onComplete = onComplete; 439 } 440 441 @Override 442 void reset() { 443 parser = null; 444 cf = null; 445 subscription = null; 446 } 447 448 // Revisit: do we need to support restarting? 449 @Override 450 final void start(BodyParser parser) { 451 cf = new MinimalFuture<>(); 452 this.parser = parser; 453 } 454 455 @Override 456 CompletableFuture<State> completion() { 457 return cf; 458 } 459 460 @Override 461 public final boolean tryAsyncReceive(ByteBuffer b) { 462 return accept(b, parser, cf); 463 } 464 465 @Override 466 public final void onReadError(Throwable t) { 467 Http1Response.this.onReadError(t); 468 } 469 470 @Override 471 public AbstractSubscription subscription() { 472 return subscription; 473 } 474 475 @Override 476 public void onSubscribe(AbstractSubscription s) { 477 this.subscription = s; 478 parser.onSubscribe(s); 479 } 480 481 @Override 482 final void handle(ByteBuffer b, 483 BodyParser parser, 484 CompletableFuture<State> cf) { 485 assert cf != null : "parsing not started"; 486 assert parser != null : "no parser"; 487 try { 488 debug.log(Level.DEBUG, () -> "Sending " + b.remaining() 489 + "/" + b.capacity() + " bytes to body parser"); 490 parser.accept(b); 491 } catch (Throwable t) { 492 debug.log(Level.DEBUG, 493 () -> "Body parser failed to handle buffer: " + t); 494 if (!cf.isDone()) { 495 cf.completeExceptionally(t); 496 } 497 } 498 } 499 500 final void onComplete(Throwable closedExceptionally) { 501 if (cf.isDone()) return; 502 if (closedExceptionally != null) { 503 cf.completeExceptionally(closedExceptionally); 504 } else { 505 onComplete.accept(State.READING_BODY); 506 cf.complete(State.READING_BODY); 507 } 508 } 509 510 @Override 511 public String toString() { 512 return super.toString() + "/parser=" + String.valueOf(parser); 513 } 514 515 } 516 }