--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java 2017-11-30 04:04:01.977314408 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java 2017-11-30 04:04:01.662286869 -0800 @@ -26,20 +26,22 @@ package jdk.incubator.http; import java.io.IOException; -import java.io.UncheckedIOException; +import java.io.InputStream; import java.net.URI; -import jdk.incubator.http.ResponseProcessors.MultiFile; -import jdk.incubator.http.ResponseProcessors.MultiProcessorImpl; +import jdk.incubator.http.ResponseSubscribers.MultiSubscriberImpl; import static jdk.incubator.http.internal.common.Utils.unchecked; import static jdk.incubator.http.internal.common.Utils.charsetFrom; import java.nio.ByteBuffer; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; +import java.nio.channels.FileChannel; import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; -import java.util.Map; +import java.security.AccessControlContext; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -52,7 +54,7 @@ * Represents a response to a {@link HttpRequest}. * {@Incubating} * - *

A {@code HttpResponse} is available when the response status code and + *

A {@code HttpResponse} is available when the response status code and * headers have been received, and typically after the response body has also * been received. This depends on the response body handler provided when * sending the request. In all cases, the response body handler is invoked @@ -61,23 +63,24 @@ * *

Methods are provided in this class for accessing the response headers, * and response body. - *

- * Response handlers and processors - *

- * Response bodies are handled at two levels. Application code supplies a response - * handler ({@link BodyHandler}) which may examine the response status code - * and headers, and which then returns a {@link BodyProcessor} to actually read - * (or discard) the body and convert it into some useful Java object type. The handler - * can return one of the pre-defined processor types, or a custom processor, or - * if the body is to be discarded, it can call {@link BodyProcessor#discard(Object) - * BodyProcessor.discard()} and return a processor which discards the response body. - * Static implementations of both handlers and processors are provided in - * {@link BodyHandler BodyHandler} and {@link BodyProcessor BodyProcessor} respectively. - * In all cases, the handler functions provided are convenience implementations - * which ignore the supplied status code and - * headers and return the relevant pre-defined {@code BodyProcessor}. - *

- * See {@link BodyHandler} for example usage. + * + *

Response handlers and subscribers + * + *

Response bodies are handled at two levels. Application code supplies a + * response handler ({@link BodyHandler}) which may examine the response status + * code and headers, and which then returns a {@link BodySubscriber} to actually + * read (or discard) the body and convert it into some useful Java object type. + * The handler can return one of the pre-defined subscriber types, or a custom + * subscriber, or if the body is to be discarded it can call {@link + * BodySubscriber#discard(Object) discard} and return a subscriber which + * discards the response body. Static implementations of both handlers and + * subscribers are provided in {@linkplain BodyHandler BodyHandler} and + * {@linkplain BodySubscriber BodySubscriber} respectively. In all cases, the + * handler functions provided are convenience implementations which ignore the + * supplied status code and headers and return the relevant pre-defined {@code + * BodySubscriber}. + * + *

See {@link BodyHandler} for example usage. * * @param the response body type * @since 9 @@ -97,19 +100,26 @@ public abstract int statusCode(); /** - * Returns the initial {@link HttpRequest} that initiated the exchange. + * Returns the {@link HttpRequest} corresponding to this response. + * + *

This may not be the original request provided by the caller, + * for example, if that request was redirected. + * + * @see #previousResponse() * * @return the request */ public abstract HttpRequest request(); /** - * Returns the final {@link HttpRequest} that was sent on the wire for the - * exchange ( may, or may not, be the same as the initial request ). + * Returns an {@code Optional} containing the previous intermediate response + * if one was received. An intermediate response is one that is received + * as a result of redirection or authentication. If no previous response + * was received then an empty {@code Optional} is returned. * - * @return the request + * @return an Optional containing the HttpResponse, if any. */ - public abstract HttpRequest finalRequest(); + public abstract Optional> previousResponse(); /** * Returns the received response headers. @@ -119,21 +129,14 @@ public abstract HttpHeaders headers(); /** - * Returns the received response trailers, if there are any, when they - * become available. For many response processor types this will be at the same - * time as the {@code HttpResponse} itself is available. In such cases, the - * returned {@code CompletableFuture} will be already completed. - * - * @return a CompletableFuture of the response trailers (may be empty) - */ - public abstract CompletableFuture trailers(); - - /** - * Returns the body. Depending on the type of {@code T}, the returned body may - * represent the body after it was read (such as {@code byte[]}, or + * Returns the body. Depending on the type of {@code T}, the returned body + * may represent the body after it was read (such as {@code byte[]}, or * {@code String}, or {@code Path}) or it may represent an object with * which the body is read, such as an {@link java.io.InputStream}. * + *

If this {@code HttpResponse} was returned from an invocation of + * {@link #previousResponse()} then this method returns {@code null} + * * @return the body */ public abstract T body(); @@ -161,36 +164,124 @@ */ public abstract HttpClient.Version version(); + + private static String pathForSecurityCheck(Path path) { + return path.toFile().getPath(); + } + + /** A body handler that is further restricted by a given ACC. */ + interface UntrustedBodyHandler extends BodyHandler { + void setAccessControlContext(AccessControlContext acc); + } + + /** + * A Path body handler. + * + * Note: Exists mainly too allow setting of the senders ACC post creation of + * the handler. + */ + static class PathBodyHandler implements UntrustedBodyHandler { + private final Path file; + private final OpenOption[]openOptions; + private volatile AccessControlContext acc; + + PathBodyHandler(Path file, OpenOption... openOptions) { + this.file = file; + this.openOptions = openOptions; + } + + @Override + public void setAccessControlContext(AccessControlContext acc) { + this.acc = acc; + } + + @Override + public BodySubscriber apply(int statusCode, HttpHeaders headers) { + ResponseSubscribers.PathSubscriber bs = (ResponseSubscribers.PathSubscriber) + BodySubscriber.asFileImpl(file, openOptions); + bs.setAccessControlContext(acc); + return bs; + } + } + + // Similar to Path body handler, but for file download. Supports setting ACC. + static class FileDownloadBodyHandler implements UntrustedBodyHandler { + private final Path directory; + private final OpenOption[]openOptions; + private volatile AccessControlContext acc; + + FileDownloadBodyHandler(Path directory, OpenOption... openOptions) { + this.directory = directory; + this.openOptions = openOptions; + } + + @Override + public void setAccessControlContext(AccessControlContext acc) { + this.acc = acc; + } + + @Override + public BodySubscriber apply(int statusCode, HttpHeaders headers) { + String dispoHeader = headers.firstValue("Content-Disposition") + .orElseThrow(() -> unchecked(new IOException("No Content-Disposition"))); + if (!dispoHeader.startsWith("attachment;")) { + throw unchecked(new IOException("Unknown Content-Disposition type")); + } + int n = dispoHeader.indexOf("filename="); + if (n == -1) { + throw unchecked(new IOException("Bad Content-Disposition type")); + } + int lastsemi = dispoHeader.lastIndexOf(';'); + String disposition; + if (lastsemi < n) { + disposition = dispoHeader.substring(n + 9); + } else { + disposition = dispoHeader.substring(n + 9, lastsemi); + } + Path file = Paths.get(directory.toString(), disposition); + + ResponseSubscribers.PathSubscriber bs = (ResponseSubscribers.PathSubscriber) + BodySubscriber.asFileImpl(file, openOptions); + bs.setAccessControlContext(acc); + return bs; + } + } + /** * A handler for response bodies. * {@Incubating} - *

- * This is a function that takes two parameters: the response status code, - * and the response headers, and which returns a {@link BodyProcessor}. + * + *

This is a function that takes two parameters: the response status code, + * and the response headers, and which returns a {@linkplain BodySubscriber}. * The function is always called just before the response body is read. Its * implementation may examine the status code or headers and must decide, * whether to accept the response body or discard it, and if accepting it, * exactly how to handle it. - *

- * Some pre-defined implementations which do not utilize the status code + * + *

Some pre-defined implementations which do not utilize the status code * or headers (meaning the body is always accepted) are defined: *

- *

- * These implementations return the equivalent {@link BodyProcessor}. + *

  • {@link #buffering(BodyHandler, int) + * buffering(BodyHandler,int)}
  • + * + * + *

    These implementations return the equivalent {@link BodySubscriber}. * Alternatively, the handler can be used to examine the status code - * or headers and return different body processors as appropriate. - *

    - * Examples of handler usage - *

    - * The first example uses one of the predefined handler functions which - * ignore the response headers and status, and always process the response + * or headers and return different body subscribers as appropriate. + * + *

    Examples of handler usage + * + *

    The first example uses one of the predefined handler functions which + * ignores the response headers and status, and always process the response * body in the same way. *

          * {@code
    @@ -201,11 +292,11 @@
          * }
          * 
    * Note, that even though these pre-defined handlers ignore the status code - * and headers, this information is still accessible from the {@code HttpResponse} - * when it is returned. - *

    - * In the second example, the function returns a different processor depending - * on the status code. + * and headers, this information is still accessible from the + * {@code HttpResponse} when it is returned. + * + *

    In the second example, the function returns a different subscriber + * depending on the status code. *

          * {@code
          *      HttpResponse resp1 = HttpRequest
    @@ -213,93 +304,134 @@
          *              .GET()
          *              .response(
          *                  (status, headers) -> status == 200
    -     *                      ? BodyProcessor.asFile(Paths.get("/tmp/f"))
    -     *                      : BodyProcessor.discard(Paths.get("/NULL")));
    +     *                      ? BodySubscriber.asFile(Paths.get("/tmp/f"))
    +     *                      : BodySubscriber.discard(Paths.get("/NULL")));
          * }
          * 
    * - * @param the response body type. + * @param the response body type */ @FunctionalInterface public interface BodyHandler { /** - * Returns a {@link BodyProcessor BodyProcessor} considering the given response status - * code and headers. This method is always called before the body is read - * and its implementation can decide to keep the body and store it somewhere - * or else discard it, by returning the {@code BodyProcessor} returned - * from {@link BodyProcessor#discard(java.lang.Object) discard()}. + * Returns a {@link BodySubscriber BodySubscriber} considering the given + * response status code and headers. This method is always called before + * the body is read and its implementation can decide to keep the body + * and store it somewhere, or else discard it by returning the {@code + * BodySubscriber} returned from {@link BodySubscriber#discard(Object) + * discard}. * * @param statusCode the HTTP status code received * @param responseHeaders the response headers received - * @return a response body handler + * @return a body subscriber */ - public BodyProcessor apply(int statusCode, HttpHeaders responseHeaders); + public BodySubscriber apply(int statusCode, HttpHeaders responseHeaders); /** * Returns a response body handler which discards the response body and * uses the given value as a replacement for it. * * @param the response body type - * @param value the value of U to return as the body + * @param value the value of U to return as the body, may be {@code null} * @return a response body handler */ public static BodyHandler discard(U value) { - return (status, headers) -> BodyProcessor.discard(value); + return (status, headers) -> BodySubscriber.discard(value); } /** * Returns a {@code BodyHandler} that returns a - * {@link BodyProcessor BodyProcessor}{@code } obtained from - * {@link BodyProcessor#asString(java.nio.charset.Charset) - * BodyProcessor.asString(Charset)}. If a charset is provided, the - * body is decoded using it. If charset is {@code null} then the processor - * tries to determine the character set from the {@code Content-encoding} - * header. If that charset is not supported then - * {@link java.nio.charset.StandardCharsets#UTF_8 UTF_8} is used. - * - * @param charset the name of the charset to interpret the body as. If - * {@code null} then charset determined from Content-encoding header + * {@link BodySubscriber BodySubscriber}{@code } obtained from + * {@link BodySubscriber#asString(Charset) BodySubscriber.asString(Charset)}. + * If a charset is provided, the body is decoded using it. If charset is + * {@code null} then the handler tries to determine the character set + * from the {@code Content-encoding} header. If that charset is not + * supported then {@link java.nio.charset.StandardCharsets#UTF_8 UTF_8} + * is used. + * + * @param charset The name of the charset to interpret the body as. If + * {@code null} then the charset is determined from the + * Content-encoding header. * @return a response body handler */ public static BodyHandler asString(Charset charset) { return (status, headers) -> { if (charset != null) { - return BodyProcessor.asString(charset); + return BodySubscriber.asString(charset); } - return BodyProcessor.asString(charsetFrom(headers)); + return BodySubscriber.asString(charsetFrom(headers)); }; } + /** + * Returns a {@code BodyHandler} that returns a + * {@link BodySubscriber BodySubscriber}{@code } obtained from + * {@link BodySubscriber#asFile(Path, OpenOption...) + * BodySubscriber.asFile(Path,OpenOption...)}. + * + *

    When the {@code HttpResponse} object is returned, the body has + * been completely written to the file, and {@link #body()} returns a + * reference to its {@link Path}. + * + * @param file the filename to store the body in + * @param openOptions any options to use when opening/creating the file + * @return a response body handler + * @throws SecurityException If a security manager has been installed + * and it denies {@link SecurityManager#checkWrite(String) + * write access} to the file. The {@link + * SecurityManager#checkDelete(String) checkDelete} method is + * invoked to check delete access if the file is opened with + * the {@code DELETE_ON_CLOSE} option. + */ + public static BodyHandler asFile(Path file, OpenOption... openOptions) { + Objects.requireNonNull(file); + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + String fn = pathForSecurityCheck(file); + sm.checkWrite(fn); + List opts = Arrays.asList(openOptions); + if (opts.contains(StandardOpenOption.DELETE_ON_CLOSE)) + sm.checkDelete(fn); + if (opts.contains(StandardOpenOption.READ)) + sm.checkRead(fn); + } + return new PathBodyHandler(file, openOptions); + } /** * Returns a {@code BodyHandler} that returns a - * {@link BodyProcessor BodyProcessor}{@code } obtained from - * {@link BodyProcessor#asFile(Path) BodyProcessor.asFile(Path)}. - *

    - * When the {@code HttpResponse} object is returned, the body has been completely - * written to the file, and {@link #body()} returns a reference to its - * {@link Path}. + * {@link BodySubscriber BodySubscriber}{@code } obtained from + * {@link BodySubscriber#asFile(Path) BodySubscriber.asFile(Path)}. + * + *

    When the {@code HttpResponse} object is returned, the body has + * been completely written to the file, and {@link #body()} returns a + * reference to its {@link Path}. * * @param file the file to store the body in * @return a response body handler + * @throws SecurityException if a security manager has been installed + * and it denies {@link SecurityManager#checkWrite(String) + * write access} to the file */ public static BodyHandler asFile(Path file) { - return (status, headers) -> BodyProcessor.asFile(file); + return BodyHandler.asFile(file, StandardOpenOption.CREATE, + StandardOpenOption.WRITE); } /** * Returns a {@code BodyHandler} that returns a - * {@link BodyProcessor BodyProcessor}<{@link Path}> + * {@link BodySubscriber BodySubscriber}<{@link Path}> * where the download directory is specified, but the filename is * obtained from the {@code Content-Disposition} response header. The - * {@code Content-Disposition} header must specify the attachment type - * and must also contain a - * filename parameter. If the filename specifies multiple path - * components only the final component is used as the filename (with the - * given directory name). When the {@code HttpResponse} object is - * returned, the body has been completely written to the file and {@link - * #body()} returns a {@code Path} object for the file. The returned {@code Path} is the + * {@code Content-Disposition} header must specify the attachment + * type and must also contain a filename parameter. If the + * filename specifies multiple path components only the final component + * is used as the filename (with the given directory name). + * + *

    When the {@code HttpResponse} object is returned, the body has + * been completely written to the file and {@link #body()} returns a + * {@code Path} object for the file. The returned {@code Path} is the * combination of the supplied directory name and the file name supplied * by the server. If the destination directory does not exist or cannot * be written to, then the response will fail with an {@link IOException}. @@ -307,245 +439,355 @@ * @param directory the directory to store the file in * @param openOptions open options * @return a response body handler - */ - public static BodyHandler asFileDownload(Path directory, OpenOption... openOptions) { - return (status, headers) -> { - String dispoHeader = headers.firstValue("Content-Disposition") - .orElseThrow(() -> unchecked(new IOException("No Content-Disposition"))); - if (!dispoHeader.startsWith("attachment;")) { - throw unchecked(new IOException("Unknown Content-Disposition type")); - } - int n = dispoHeader.indexOf("filename="); - if (n == -1) { - throw unchecked(new IOException("Bad Content-Disposition type")); - } - int lastsemi = dispoHeader.lastIndexOf(';'); - String disposition; - if (lastsemi < n) { - disposition = dispoHeader.substring(n + 9); - } else { - disposition = dispoHeader.substring(n + 9, lastsemi); - } - Path file = Paths.get(directory.toString(), disposition); - return BodyProcessor.asFile(file, openOptions); - }; + * @throws SecurityException If a security manager has been installed + * and it denies {@link SecurityManager#checkWrite(String) + * write access} to the file. The {@link + * SecurityManager#checkDelete(String) checkDelete} method is + * invoked to check delete access if the file is opened with + * the {@code DELETE_ON_CLOSE} option. + */ + //####: check if the dir exists and is writable?? + public static BodyHandler asFileDownload(Path directory, + OpenOption... openOptions) { + Objects.requireNonNull(directory); + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + String fn = pathForSecurityCheck(directory); + sm.checkWrite(fn); + List opts = Arrays.asList(openOptions); + if (opts.contains(StandardOpenOption.DELETE_ON_CLOSE)) + sm.checkDelete(fn); + if (opts.contains(StandardOpenOption.READ)) + sm.checkRead(fn); + } + return new FileDownloadBodyHandler(directory, openOptions); } /** - * Returns a {@code BodyHandler} that returns a - * {@link BodyProcessor BodyProcessor}{@code } obtained from - * {@link BodyProcessor#asFile(java.nio.file.Path, java.nio.file.OpenOption...) - * BodyProcessor.asFile(Path,OpenOption...)}. - *

    - * When the {@code HttpResponse} object is returned, the body has been completely - * written to the file, and {@link #body()} returns a reference to its - * {@link Path}. + * Returns a {@code BodyHandler} that returns a + * {@link BodySubscriber BodySubscriber}{@code } obtained + * from {@link BodySubscriber#asInputStream() BodySubscriber.asInputStream}. + * + *

    When the {@code HttpResponse} object is returned, the response + * headers will have been completely read, but the body may not have + * been fully received yet. The {@link #body()} method returns an + * {@link InputStream} from which the body can be read as it is received. + * + * @apiNote See {@link BodySubscriber#asInputStream()} for more information. * - * @param file the filename to store the body in - * @param openOptions any options to use when opening/creating the file * @return a response body handler */ - public static BodyHandler asFile(Path file, OpenOption... openOptions) { - return (status, headers) -> BodyProcessor.asFile(file, openOptions); + public static BodyHandler asInputStream() { + return (status, headers) -> BodySubscriber.asInputStream(); } /** * Returns a {@code BodyHandler} that returns a - * {@link BodyProcessor BodyProcessor}{@code } obtained from - * {@link BodyProcessor#asByteArrayConsumer(java.util.function.Consumer) - * BodyProcessor.asByteArrayConsumer(Consumer)}. - *

    - * When the {@code HttpResponse} object is returned, the body has been completely - * written to the consumer. + * {@link BodySubscriber BodySubscriber}{@code } obtained from + * {@link BodySubscriber#asByteArrayConsumer(Consumer) + * BodySubscriber.asByteArrayConsumer(Consumer)}. + * + *

    When the {@code HttpResponse} object is returned, the body has + * been completely written to the consumer. * * @param consumer a Consumer to accept the response body * @return a response body handler */ public static BodyHandler asByteArrayConsumer(Consumer> consumer) { - return (status, headers) -> BodyProcessor.asByteArrayConsumer(consumer); + return (status, headers) -> BodySubscriber.asByteArrayConsumer(consumer); } /** * Returns a {@code BodyHandler} that returns a - * {@link BodyProcessor BodyProcessor}<{@code byte[]}> obtained - * from {@link BodyProcessor#asByteArray() BodyProcessor.asByteArray()}. - *

    - * When the {@code HttpResponse} object is returned, the body has been completely - * written to the byte array. + * {@link BodySubscriber BodySubscriber}<{@code byte[]}> obtained + * from {@link BodySubscriber#asByteArray() BodySubscriber.asByteArray()}. + * + *

    When the {@code HttpResponse} object is returned, the body has + * been completely written to the byte array. * * @return a response body handler */ public static BodyHandler asByteArray() { - return (status, headers) -> BodyProcessor.asByteArray(); + return (status, headers) -> BodySubscriber.asByteArray(); } /** * Returns a {@code BodyHandler} that returns a - * {@link BodyProcessor BodyProcessor}{@code } obtained from - * {@link BodyProcessor#asString(java.nio.charset.Charset) - * BodyProcessor.asString(Charset)}. The body is + * {@link BodySubscriber BodySubscriber}{@code } obtained from + * {@link BodySubscriber#asString(java.nio.charset.Charset) + * BodySubscriber.asString(Charset)}. The body is * decoded using the character set specified in * the {@code Content-encoding} response header. If there is no such * header, or the character set is not supported, then * {@link java.nio.charset.StandardCharsets#UTF_8 UTF_8} is used. - *

    - * When the {@code HttpResponse} object is returned, the body has been completely - * written to the string. + * + *

    When the {@code HttpResponse} object is returned, the body has + * been completely written to the string. * * @return a response body handler */ public static BodyHandler asString() { - return (status, headers) -> BodyProcessor.asString(charsetFrom(headers)); + return (status, headers) -> BodySubscriber.asString(charsetFrom(headers)); } + + /** + * Returns a {@code BodyHandler} which, when invoked, returns a {@linkplain + * BodySubscriber#buffering(BodySubscriber,int) buffering BodySubscriber} + * that buffers data before delivering it to the downstream subscriber. + * These {@code BodySubscriber} instances are created by calling + * {@linkplain BodySubscriber#buffering(BodySubscriber,int) + * BodySubscriber.buffering} with a subscriber obtained from the given + * downstream handler and the {@code bufferSize} parameter. + * + * @param downstreamHandler the downstream handler + * @param bufferSize the buffer size parameter passed to {@linkplain + * BodySubscriber#buffering(BodySubscriber,int) BodySubscriber.buffering} + * @return a body handler + * @throws IllegalArgumentException if {@code bufferSize <= 0} + */ + public static BodyHandler buffering(BodyHandler downstreamHandler, + int bufferSize) { + if (bufferSize <= 0) + throw new IllegalArgumentException("must be greater than 0"); + return (status, headers) -> BodySubscriber + .buffering(downstreamHandler.apply(status, headers), + bufferSize); + } } /** - * A processor for response bodies. + * A subscriber for response bodies. * {@Incubating} - *

    - * The object acts as a {@link Flow.Subscriber}<{@link ByteBuffer}> to - * the HTTP client implementation which publishes ByteBuffers containing the - * response body. The processor converts the incoming buffers of data to - * some user-defined object type {@code T}. - *

    - * The {@link #getBody()} method returns a {@link CompletionStage}{@code } - * that provides the response body object. The {@code CompletionStage} must - * be obtainable at any time. When it completes depends on the nature - * of type {@code T}. In many cases, when {@code T} represents the entire body after being - * read then it completes after the body has been read. If {@code T} is a streaming - * type such as {@link java.io.InputStream} then it completes before the - * body has been read, because the calling code uses it to consume the data. + * + *

    The object acts as a {@link Flow.Subscriber}<{@link List}<{@link + * ByteBuffer}>> to the HTTP client implementation, which publishes + * unmodifiable lists of ByteBuffers containing the response body. The Flow + * of data, as well as the order of ByteBuffers in the Flow lists, is a + * strictly ordered representation of the response body. Both the Lists and + * the ByteBuffers, once passed to the subscriber, are no longer used by the + * HTTP client. The subscriber converts the incoming buffers of data to some + * user-defined object type {@code T}. + * + *

    The {@link #getBody()} method returns a {@link CompletionStage}{@code + * } that provides the response body object. The {@code CompletionStage} + * must be obtainable at any time. When it completes depends on the nature + * of type {@code T}. In many cases, when {@code T} represents the entire + * body after being read then it completes after the body has been read. If + * {@code T} is a streaming type such as {@link java.io.InputStream} then it + * completes before the body has been read, because the calling code uses it + * to consume the data. + * + * @apiNote To ensure that all resources associated with the + * corresponding exchange are properly released, an implementation + * of {@code BodySubscriber} must ensure to {@linkplain + * Flow.Subscription#request request} more data until {@link + * #onComplete() onComplete} or {@link #onError(Throwable) onError} + * are signalled, or {@linkplain Flow.Subscription#request cancel} its + * {@linkplain #onSubscribe(Flow.Subscription) subscription} + * if unable or unwilling to do so. + * Calling {@code cancel} before exhausting the data may cause + * the underlying HTTP connection to be closed and prevent it + * from being reused for subsequent operations. * * @param the response body type */ - public interface BodyProcessor - extends Flow.Subscriber { + public interface BodySubscriber + extends Flow.Subscriber> { /** - * Returns a {@code CompletionStage} which when completed will return the - * response body object. + * Returns a {@code CompletionStage} which when completed will return + * the response body object. * * @return a CompletionStage for the response body */ public CompletionStage getBody(); /** - * Returns a body processor which stores the response body as a {@code + * Returns a body subscriber which stores the response body as a {@code * String} converted using the given {@code Charset}. - *

    - * The {@link HttpResponse} using this processor is available after the - * entire response has been read. + * + *

    The {@link HttpResponse} using this subscriber is available after + * the entire response has been read. * * @param charset the character set to convert the String with - * @return a body processor + * @return a body subscriber */ - public static BodyProcessor asString(Charset charset) { - return new ResponseProcessors.ByteArrayProcessor<>( + public static BodySubscriber asString(Charset charset) { + return new ResponseSubscribers.ByteArraySubscriber<>( bytes -> new String(bytes, charset) ); } /** - * Returns a {@code BodyProcessor} which stores the response body as a + * Returns a {@code BodySubscriber} which stores the response body as a * byte array. - *

    - * The {@link HttpResponse} using this processor is available after the - * entire response has been read. * - * @return a body processor + *

    The {@link HttpResponse} using this subscriber is available after + * the entire response has been read. + * + * @return a body subscriber */ - public static BodyProcessor asByteArray() { - return new ResponseProcessors.ByteArrayProcessor<>( + public static BodySubscriber asByteArray() { + return new ResponseSubscribers.ByteArraySubscriber<>( Function.identity() // no conversion ); } + // no security check + private static BodySubscriber asFileImpl(Path file, OpenOption... openOptions) { + return new ResponseSubscribers.PathSubscriber(file, openOptions); + } + /** - * Returns a {@code BodyProcessor} which stores the response body in a + * Returns a {@code BodySubscriber} which stores the response body in a * file opened with the given options and name. The file will be opened - * with the given options using - * {@link java.nio.channels.FileChannel#open(java.nio.file.Path,java.nio.file.OpenOption...) - * FileChannel.open} just before the body is read. Any exception thrown will be returned - * or thrown from {@link HttpClient#send(jdk.incubator.http.HttpRequest, - * jdk.incubator.http.HttpResponse.BodyHandler) HttpClient::send} - * or {@link HttpClient#sendAsync(jdk.incubator.http.HttpRequest, - * jdk.incubator.http.HttpResponse.BodyHandler) HttpClient::sendAsync} - * as appropriate. - *

    - * The {@link HttpResponse} using this processor is available after the - * entire response has been read. + * with the given options using {@link FileChannel#open(Path,OpenOption...) + * FileChannel.open} just before the body is read. Any exception thrown + * will be returned or thrown from {@link HttpClient#send(HttpRequest, + * BodyHandler) HttpClient::send} or {@link HttpClient#sendAsync(HttpRequest, + * BodyHandler) HttpClient::sendAsync} as appropriate. + * + *

    The {@link HttpResponse} using this subscriber is available after + * the entire response has been read. * * @param file the file to store the body in * @param openOptions the list of options to open the file with - * @return a body processor + * @return a body subscriber + * @throws SecurityException If a security manager has been installed + * and it denies {@link SecurityManager#checkWrite(String) + * write access} to the file. The {@link + * SecurityManager#checkDelete(String) checkDelete} method is + * invoked to check delete access if the file is opened with the + * {@code DELETE_ON_CLOSE} option. + */ + public static BodySubscriber asFile(Path file, OpenOption... openOptions) { + Objects.requireNonNull(file); + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + String fn = pathForSecurityCheck(file); + sm.checkWrite(fn); + List opts = Arrays.asList(openOptions); + if (opts.contains(StandardOpenOption.DELETE_ON_CLOSE)) + sm.checkDelete(fn); + if (opts.contains(StandardOpenOption.READ)) + sm.checkRead(fn); + } + return asFileImpl(file, openOptions); + } + + /** + * Returns a {@code BodySubscriber} which stores the response body in a + * file opened with the given name. Has the same effect as calling + * {@link #asFile(Path, OpenOption...) asFile} with the standard open + * options {@code CREATE} and {@code WRITE} + * + *

    The {@link HttpResponse} using this subscriber is available after + * the entire response has been read. + * + * @param file the file to store the body in + * @return a body subscriber + * @throws SecurityException if a security manager has been installed + * and it denies {@link SecurityManager#checkWrite(String) + * write access} to the file */ - public static BodyProcessor asFile(Path file, OpenOption... openOptions) { - return new ResponseProcessors.PathProcessor(file, openOptions); + public static BodySubscriber asFile(Path file) { + return asFile(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE); } /** - * Returns a {@code BodyProcessor} which provides the incoming body + * Returns a {@code BodySubscriber} which provides the incoming body * data to the provided Consumer of {@code Optional}. Each * call to {@link Consumer#accept(java.lang.Object) Consumer.accept()} - * will contain a non empty {@code Optional}, except for the final invocation after - * all body data has been read, when the {@code Optional} will be empty. - *

    - * The {@link HttpResponse} using this processor is available after the - * entire response has been read. + * will contain a non empty {@code Optional}, except for the final + * invocation after all body data has been read, when the {@code + * Optional} will be empty. + * + *

    The {@link HttpResponse} using this subscriber is available after + * the entire response has been read. * * @param consumer a Consumer of byte arrays - * @return a BodyProcessor + * @return a BodySubscriber */ - public static BodyProcessor asByteArrayConsumer(Consumer> consumer) { - return new ResponseProcessors.ConsumerProcessor(consumer); + public static BodySubscriber asByteArrayConsumer(Consumer> consumer) { + return new ResponseSubscribers.ConsumerSubscriber(consumer); } /** - * Returns a {@code BodyProcessor} which stores the response body in a - * file opened with the given name. Has the same effect as calling - * {@link #asFile(java.nio.file.Path, java.nio.file.OpenOption...) asFile} - * with the standard open options {@code CREATE} and {@code WRITE} - *

    - * The {@link HttpResponse} using this processor is available after the - * entire response has been read. + * Returns a {@code BodySubscriber} which streams the response body as + * an {@link InputStream}. * - * @param file the file to store the body in - * @return a body processor + *

    The {@link HttpResponse} using this subscriber is available + * immediately after the response headers have been read, without + * requiring to wait for the entire body to be processed. The response + * body can then be read directly from the {@link InputStream}. + * + * @apiNote To ensure that all resources associated with the + * corresponding exchange are properly released the caller must + * ensure to either read all bytes until EOF is reached, or call + * {@link InputStream#close} if it is unable or unwilling to do so. + * Calling {@code close} before exhausting the stream may cause + * the underlying HTTP connection to be closed and prevent it + * from being reused for subsequent operations. + * + * @return a body subscriber that streams the response body as an + * {@link InputStream}. */ - public static BodyProcessor asFile(Path file) { - return new ResponseProcessors.PathProcessor( - file, - StandardOpenOption.CREATE, StandardOpenOption.WRITE); + public static BodySubscriber asInputStream() { + return new ResponseSubscribers.HttpResponseInputStream(); } /** - * Returns a response processor which discards the response body. The + * Returns a response subscriber which discards the response body. The * supplied value is the value that will be returned from * {@link HttpResponse#body()}. * * @param The type of the response body - * @param value the value to return from HttpResponse.body() - * @return a {@code BodyProcessor} + * @param value the value to return from HttpResponse.body(), may be {@code null} + * @return a {@code BodySubscriber} */ - public static BodyProcessor discard(U value) { - return new ResponseProcessors.NullProcessor<>(Optional.ofNullable(value)); + public static BodySubscriber discard(U value) { + return new ResponseSubscribers.NullSubscriber<>(Optional.ofNullable(value)); } + + /** + * Returns a {@code BodySubscriber} which buffers data before delivering + * it to the given downstream subscriber. The subscriber guarantees to + * deliver {@code buffersize} bytes of data to each invocation of the + * downstream's {@linkplain #onNext(Object) onNext} method, except for + * the final invocation, just before {@linkplain #onComplete() onComplete} + * is invoked. The final invocation of {@code onNext} may contain fewer + * than {@code buffersize} bytes. + * + *

    The returned subscriber delegates its {@link #getBody()} method + * to the downstream subscriber. + * + * @param downstream the downstream subscriber + * @param bufferSize the buffer size + * @return a buffering body subscriber + * @throws IllegalArgumentException if {@code bufferSize <= 0} + */ + public static BodySubscriber buffering(BodySubscriber downstream, + int bufferSize) { + if (bufferSize <= 0) + throw new IllegalArgumentException("must be greater than 0"); + return new BufferingSubscriber(downstream, bufferSize); + } } /** - * A response processor for a HTTP/2 multi response. + * A response subscriber for a HTTP/2 multi response. * {@Incubating} - *

    - * A multi response comprises a main response, and zero or more additional + * + *

    A multi response comprises a main response, and zero or more additional * responses. Each additional response is sent by the server in response to - * requests that the server also generates. Additional responses are + * requests (PUSH_PROMISEs) that the server also generates. Additional responses are * typically resources that the server expects the client will need which * are related to the initial request. *

    * Note. Instead of implementing this interface, applications should consider * first using the mechanism (built on this interface) provided by - * {@link MultiProcessor#asMap(java.util.function.Function, boolean) - * MultiProcessor.asMap()} which is a slightly simplified, but + * {@link MultiSubscriber#asMap(java.util.function.Function, boolean) + * MultiSubscriber.asMap()} which is a slightly simplified, but also * general purpose interface. *

    * The server generated requests are also known as push promises. @@ -556,7 +798,7 @@ * the server does not wait for any acknowledgment before sending the * response, this must be done quickly to avoid unnecessary data transmission. * - *

    {@code MultiProcessor}s are parameterized with a type {@code U} which + *

    {@code MultiSubscriber}s are parameterized with a type {@code U} which * represents some meaningful aggregate of the responses received. This * would typically be a collection of response or response body objects. * @@ -565,29 +807,43 @@ * * @since 9 */ - public interface MultiProcessor { + public interface MultiSubscriber { /** - * Called for the main request and each push promise that is received. - * The first call will always be for the main request that was sent - * by the caller. This {@link HttpRequest} parameter - * represents the initial request or subsequent PUSH_PROMISE. The - * implementation must return an {@code Optional} of {@link BodyHandler} for - * the response body. Different handlers (of the same type) can be returned - * for different pushes within the same multi send. If no handler - * (an empty {@code Optional}) is returned, then the push will be canceled. It is - * an error to not return a valid {@code BodyHandler} for the initial (main) request. + * Called for the main request from the user. This {@link HttpRequest} + * parameter is the request that was supplied to {@link + * HttpClient#sendAsync(HttpRequest, MultiSubscriber)}. The + * implementation must return an {@link BodyHandler} for the response + * body. * - * @param request the main request or subsequent push promise + * @param request the request * * @return an optional body handler */ - Optional> onRequest(HttpRequest request); + BodyHandler onRequest(HttpRequest request); + + /** + * Called for each push promise that is received. The {@link HttpRequest} + * parameter represents the PUSH_PROMISE. The implementation must return + * an {@code Optional} of {@link BodyHandler} for the response body. + * Different handlers (of the same type) can be returned for different + * pushes within the same multi send. If no handler (an empty {@code + * Optional}) is returned, then the push will be canceled. If required, + * the {@code CompletableFuture} supplied to the {@code + * onFinalPushPromise} parameter of {@link + * #completion(CompletableFuture, CompletableFuture)} can be used to + * determine when the final PUSH_PROMISE is received. + * + * @param pushPromise the push promise + * + * @return an optional body handler + */ + Optional> onPushPromise(HttpRequest pushPromise); /** * Called for each response received. For each request either one of * onResponse() or onError() is guaranteed to be called, but not both. * - * [Note] The reason for switching to this callback interface rather + *

    Note: The reason for switching to this callback interface rather * than using CompletableFutures supplied to onRequest() is that there * is a subtle interaction between those CFs and the CF returned from * completion() (or when onComplete() was called formerly). The completion() @@ -615,9 +871,11 @@ * Returns a {@link java.util.concurrent.CompletableFuture}{@code } * which completes when the aggregate result object itself is available. * It is expected that the returned {@code CompletableFuture} will depend - * on one of the given {@code CompletableFuture @@ -653,47 +911,50 @@ * generated push promise) is returned as a key of the map. The value * corresponding to each key is a * {@code CompletableFuture>}. - *

    - * There are two ways to use these handlers, depending on the value of - * the completion parameter. If completion is true, then the + * + *

    There are two ways to use these handlers, depending on the value + * of the completion parameter. If completion is true, then the * aggregated result will be available after all responses have * themselves completed. If completion is false, then the * aggregated result will be available immediately after the last push * promise was received. In the former case, this implies that all the * CompletableFutures in the map values will have completed. In the * latter case, they may or may not have completed yet. - *

    - * The simplest way to use these handlers is to set completion to + * + *

    The simplest way to use these handlers is to set completion to * {@code true}, and then all (results) values in the Map will be * accessible without blocking. *

    - * See {@link #asMap(java.util.function.Function, boolean) - * } + * See {@link #asMap(java.util.function.Function, boolean)} * for a code sample of using this interface. * + *

    See {@link #asMap(Function, boolean)} for a code sample of using + * this interface. + * * @param the body type used for all responses - * @param pushHandler a function invoked for each request or push - * promise + * @param reqHandler a function invoked for the user's request and each + * push promise * @param completion {@code true} if the aggregate CompletableFuture - * completes after all responses have been received, or {@code false} - * after all push promises received. + * completes after all responses have been received, + * or {@code false} after all push promises received * - * @return a MultiProcessor + * @return a MultiSubscriber */ - public static MultiProcessor,V> asMap( - Function>> pushHandler, + public static MultiSubscriber,V> asMap( + Function>> reqHandler, boolean completion) { - - return new MultiProcessorImpl(pushHandler, completion); + return new MultiSubscriberImpl(reqHandler.andThen(optv -> optv.get()), + reqHandler, + completion); } /** * Returns a general purpose handler for multi responses. This is a - * convenience method which invokes {@link #asMap(java.util.function.Function,boolean) + * convenience method which invokes {@link #asMap(Function,boolean) * asMap(Function, true)} meaning that the aggregate result * object completes after all responses have been received. - *

    - * Example usage: + * + *

    Example usage: *
    *

              * {@code
    @@ -705,26 +966,26 @@
              *          HttpClient client = HttpClient.newHttpClient();
              *
              *          Map>> results = client
    -         *              .sendAsync(request, MultiProcessor.asMap(
    +         *              .sendAsync(request, MultiSubscriber.asMap(
              *                  (req) -> Optional.of(HttpResponse.BodyHandler.asString())))
              *              .join();
              * }
    - *

    - * The lambda in this example is the simplest possible implementation, + * + *

    The lambda in this example is the simplest possible implementation, * where neither the incoming requests are examined, nor the response * headers, and every push that the server sends is accepted. When the * join() call returns, all {@code HttpResponse}s and their associated * body objects are available. * * @param the body type used for all responses - * @param pushHandler a function invoked for each request or push - * promise - * @return a MultiProcessor + * @param reqHandler a function invoked for each push promise and the + * main request + * @return a MultiSubscriber */ - public static MultiProcessor,V> asMap( - Function>> pushHandler) { + public static MultiSubscriber,V> asMap( + Function>> reqHandler) { - return asMap(pushHandler, true); + return asMap(reqHandler, true); } }