< prev index next >
   1 /*
   2  * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 package java.net.http;
  26 
  27 import java.io.IOException;
  28 import java.io.InputStream;
  29 import java.net.URI;
  30 import java.nio.ByteBuffer;
  31 import java.nio.channels.FileChannel;
  32 import java.nio.charset.Charset;
  33 import java.nio.charset.StandardCharsets;
  34 import java.nio.file.OpenOption;
  35 import java.nio.file.Path;
  36 import java.nio.file.Paths;
  37 import java.nio.file.StandardOpenOption;
  38 import java.util.HashMap;
  39 import java.util.Map;
  40 import java.util.Set;
  41 import java.util.concurrent.BlockingQueue;
  42 import java.util.concurrent.CompletableFuture;
  43 import java.util.concurrent.LinkedBlockingQueue;
  44 import java.util.function.BiFunction;
  45 import java.util.function.Consumer;
  46 import java.util.function.LongConsumer;
  47 import javax.net.ssl.SSLParameters;
  48 
  49 /**
  50  * Represents a response to a {@link HttpRequest}. A {@code HttpResponse} is
  51  * available when the response status code and headers have been received, but
  52  * before the response body is received.
  53  *
  54  * <p> Methods are provided in this class for accessing the response headers,
  55  * and status code immediately and also methods for retrieving the response body.
  56  * Static methods are provided which implement {@link BodyProcessor} for
  57  * standard body types such as {@code String, byte arrays, files}.
  58  *
  59  * <p> The {@link #body(BodyProcessor) body} or {@link #bodyAsync(BodyProcessor)
  60  * bodyAsync} which retrieve any response body must be called to ensure that the
  61  * TCP connection can be re-used subsequently, and any response trailers
  62  * accessed, if they exist, unless it is known that no response body was received.
  63  *
  64  * @since 9
  65  */
  66 public abstract class HttpResponse {
  67 
  68     HttpResponse() { }
  69 
  70     /**
  71      * Returns the status code for this response.
  72      *
  73      * @return the response code
  74      */
  75     public abstract int statusCode();
  76 
  77     /**
  78      * Returns the {@link HttpRequest} for this response.
  79      *
  80      * @return the request
  81      */
  82     public abstract HttpRequest request();
  83 
  84     /**
  85      * Returns the received response headers.
  86      *
  87      * @return the response headers
  88      */
  89     public abstract HttpHeaders headers();
  90 
  91     /**
  92      * Returns the received response trailers, if there are any. This must only
  93      * be called after the response body has been received.
  94      *
  95      * @return the response trailers (may be empty)
  96      * @throws IllegalStateException if the response body has not been received
  97      *                               yet
  98      */
  99     public abstract HttpHeaders trailers();
 100 
 101     /**
 102      * Returns the body, blocking if necessary. The type T is determined by the
 103      * {@link BodyProcessor} implementation supplied. The body object will be
 104      * returned immediately if it is a type (such as {@link java.io.InputStream}
 105      * which reads the data itself. If the body object represents the fully read
 106      * body then it blocks until it is fully read.
 107      *
 108      * @param <T> the type of the returned body object
 109      * @param processor the processor to handle the response body
 110      * @return the body
 111      * @throws java.io.UncheckedIOException if an I/O error occurs reading the
 112      *                                      response
 113      */
 114     public abstract <T> T body(BodyProcessor<T> processor);
 115 
 116     /**
 117      * Returns a {@link java.util.concurrent.CompletableFuture} of type T. This
 118      * always returns immediately and the future completes when the body object
 119      * is available. The body will be available immediately if it is a type
  120      * (such as {@link java.io.InputStream}) which reads the data itself. If the
 121      * body object represents the fully read body then it will not be available
 122      * until it is fully read.
 123      *
 124      * @param <T> the type of the returned body object
 125      * @param processor the processor to handle the response body
 126      * @return a CompletableFuture
 127      */
 128     public abstract <T> CompletableFuture<T> bodyAsync(BodyProcessor<T> processor);
 129 
 130     /**
 131      * Returns the {@link javax.net.ssl.SSLParameters} in effect for this
 132      * response. Returns {@code null} if this is not a https response.
 133      *
 134      * @return the SSLParameters associated with the response
 135      */
 136     public abstract SSLParameters sslParameters();
 137 
 138     /**
 139      * Returns the URI that the response was received from. This may be
 140      * different from the request URI if redirection occurred.
 141      *
 142      * @return the URI of the response
 143      */
 144     public abstract URI uri();
 145 
 146     /**
 147      * Returns the HTTP protocol version that was used for this response.
 148      *
 149      * @return HTTP protocol version
 150      */
 151     public abstract HttpClient.Version version();
 152 
 153     /**
 154      * Returns a {@link BodyProcessor}&lt;{@link java.nio.file.Path}&gt; where
 155      * the file is created if it does not already exist. When the Path object is
 156      * returned, the body has been completely written to the file.
 157      *
 158      * @param file the file to store the body in
 159      * @return a {@code BodyProcessor}
 160      */
 161     public static BodyProcessor<Path> asFile(Path file) {
 162         return asFile(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
 163     }
 164 
 165     /**
 166      * Returns a {@link BodyProcessor}&lt;{@link java.nio.file.Path}&gt; where
 167      * the download directory is specified, but the filename is obtained from
 168      * the Content-Disposition response header. The Content-Disposition header
 169      * must specify the <i>attachment</i> type and must also contain a
 170      * <i>filename</i> parameter. If the filename specifies multiple path
 171      * components only the final component is used as the filename (with the
 172      * given directory name). When the Path object is returned, the body has
 173      * been completely written to the file. The returned Path is the combination
 174      * of the supplied directory name and the file name supplied by the server.
 175      * If the destination directory does not exist or cannot be written to, then
 176      * the response will fail with an IOException.
 177      *
 178      * @param directory the directory to store the file in
 179      * @param openOptions open options
 180      * @return a {@code BodyProcessor}
 181      */
 182     public static BodyProcessor<Path> asFileDownload(Path directory,
 183                                                      OpenOption... openOptions) {
 184         return new AbstractResponseProcessor<Path>() {
 185 
 186             FileChannel fc;
 187             Path file;
 188 
 189             @Override
 190             public Path onResponseBodyStartImpl(long contentLength,
 191                                                 HttpHeaders headers)
 192                 throws IOException
 193             {
 194                 String dispoHeader = headers.firstValue("Content-Disposition")
 195                         .orElseThrow(() -> new IOException("No Content-Disposition"));
 196                 if (!dispoHeader.startsWith("attachment;")) {
 197                     throw new IOException("Unknown Content-Disposition type");
 198                 }
 199                 int n = dispoHeader.indexOf("filename=");
 200                 if (n == -1) {
 201                     throw new IOException("Bad Content-Disposition type");
 202                 }
 203                 String disposition = dispoHeader.substring(n + 9,
 204                                                            dispoHeader.lastIndexOf(';'));
 205                 file = Paths.get(directory.toString(), disposition);
 206                 fc = FileChannel.open(file, openOptions);
 207                 return null;
 208             }
 209 
 210             @Override
 211             public void onResponseBodyChunkImpl(ByteBuffer b) throws IOException {
 212                 fc.write(b);
 213             }
 214 
 215             @Override
 216             public Path onResponseComplete() throws IOException {
 217                 fc.close();
 218                 return file;
 219             }
 220 
 221             @Override
 222             public void onResponseError(Throwable t) {
 223                 try {
 224                     if (fc != null) {
 225                         fc.close();
 226                     }
 227                 } catch (IOException e) {
 228                 }
 229             }
 230         };
 231     }
 232 
 233     /**
 234      * Returns a {@link BodyProcessor}&lt;{@link java.nio.file.Path}&gt;.
 235      *
 236      * <p> {@link HttpResponse}s returned using this response processor complete
 237      * after the entire response, including body has been read.
 238      *
 239      * @param file the filename to store the body in
 240      * @param openOptions any options to use when opening/creating the file
 241      * @return a {@code BodyProcessor}
 242      */
 243     public static BodyProcessor<Path> asFile(Path file,
 244                                              OpenOption... openOptions) {
 245         return new AbstractResponseProcessor<Path>() {
 246 
 247             FileChannel fc;
 248 
 249             @Override
 250             public Path onResponseBodyStartImpl(long contentLength,
 251                                                 HttpHeaders headers)
 252                 throws IOException
 253             {
 254                 fc = FileChannel.open(file, openOptions);
 255                 return null;
 256             }
 257 
 258             @Override
 259             public void onResponseBodyChunkImpl(ByteBuffer b)
 260                 throws IOException
 261             {
 262                 fc.write(b);
 263             }
 264 
 265             @Override
 266             public Path onResponseComplete() throws IOException {
 267                 fc.close();
 268                 return file;
 269             }
 270 
 271             @Override
 272             public void onResponseError(Throwable t) {
 273                 try {
 274                     if (fc != null) {
 275                         fc.close();
 276                     }
 277                 } catch (IOException e) {
 278                 }
 279             }
 280         };
 281     }
 282 
 283     static class ByteArrayResponseProcessor {
 284 
 285         static final int INITIAL_BUFLEN = 1024;
 286 
 287         byte[] buffer;
 288         int capacity;
 289         boolean knownLength;
 290         int position;
 291 
 292         ByteArrayResponseProcessor() { }
 293 
 294         public byte[] onStart(long contentLength) throws IOException {
 295             if (contentLength > Integer.MAX_VALUE) {
 296                 throw new IllegalArgumentException(
 297                         "byte array response limited to MAX_INT size");
 298             }
 299             capacity = (int) contentLength;
 300             if (capacity != -1) {
 301                 buffer = new byte[capacity];
 302                 knownLength = true;
 303             } else {
 304                 buffer = new byte[INITIAL_BUFLEN];
 305                 capacity = INITIAL_BUFLEN;
 306                 knownLength = false;
 307             }
 308             position = 0;
 309             return null;
 310         }
 311 
 312         public void onBodyContent(ByteBuffer b) throws IOException {
 313             int toCopy = b.remaining();
 314             int size = capacity;
 315             if (toCopy > capacity - position) {
 316                 // resize
 317                 size += toCopy * 2;
 318             }
 319             if (size != capacity) {
 320                 if (knownLength) {
 321                     // capacity should have been right from start
 322                     throw new IOException("Inconsistent content length");
 323                 }
 324                 byte[] newbuf = new byte[size];
 325                 System.arraycopy(buffer, 0, newbuf, 0, position);
 326                 buffer = newbuf;
 327                 capacity = size;
 328             }
 329             int srcposition = b.arrayOffset() + b.position();
 330             System.arraycopy(b.array(), srcposition, buffer, position, toCopy);
 331             b.position(b.limit());
 332             position += toCopy;
 333         }
 334 
 335         public byte[] onComplete() throws IOException {
 336             if (knownLength) {
 337                 if (position != capacity) {
 338                     throw new IOException("Wrong number of bytes received");
 339                 }
 340                 return buffer;
 341             }
 342             byte[] buf1 = new byte[position];
 343             System.arraycopy(buffer, 0, buf1, 0, position);
 344             return buf1;
 345         }
 346 
 347         public void onError(Throwable t) {
 348             // TODO:
 349         }
 350     }
 351 
 352     static final byte[] EMPTY = new byte[0];
 353 
 354     /**
 355      * Returns a response processor which supplies the response body to the
 356      * given Consumer. Each time data is received the consumer is invoked with a
 357      * byte[] containing at least one byte of data. After the final buffer is
 358      * received, the consumer is invoked one last time, with an empty byte
 359      * array.
 360      *
 361      * @param consumer a Consumer to accept the response body
 362      * @return a {@code BodyProcessor}
 363      */
 364     public static BodyProcessor<Void> asByteArrayConsumer(Consumer<byte[]> consumer) {
 365         return new AbstractResponseProcessor<Void>() {
 366             @Override
 367             public Void onResponseBodyStartImpl(long clen,
 368                                                 HttpHeaders h)
 369                 throws IOException
 370             {
 371                 return null;
 372             }
 373 
 374             @Override
 375             public void onResponseError(Throwable t) {
 376             }
 377 
 378             @Override
 379             public void onResponseBodyChunkImpl(ByteBuffer b) throws IOException {
 380                 if (!b.hasRemaining()) {
 381                     return;
 382                 }
 383                 byte[] buf = new byte[b.remaining()];
 384                 b.get(buf);
 385                 consumer.accept(buf);
 386             }
 387 
 388             @Override
 389             public Void onResponseComplete() throws IOException {
 390                 consumer.accept(EMPTY);
 391                 return null;
 392             }
 393         };
 394     }
 395 
 396     /**
 397      * Returns a BodyProcessor which delivers the response data to a
 398      * {@link java.util.concurrent.Flow.Subscriber}{@code ByteBuffer}.
 399      * <p>
 400      * The given {@code Supplier<U>} is invoked when the Flow is completed in
 401      * order to convert the flow data into the U object that is returned as the
 402      * response body.
 403      *
 404      * @param <U> the response body type
 405      * @param subscriber the Flow.Subscriber
 406      * @param bufferSize the maximum number of bytes of data to be supplied in
 407      * each ByteBuffer
 408      * @param bodySupplier an object that converts the received data to the body
 409      * type U.
 410      * @return a BodyProcessor
 411      *
 412      * public static <U> BodyProcessor<Flow.Subscriber<ByteBuffer>>
 413      * asFlowSubscriber() {
 414      *
 415      * return new BodyProcessor<U>() { Flow.Subscriber<ByteBuffer> subscriber;
 416      * LongConsumer flowController; FlowSubscription subscription; Supplier<U>
 417      * bodySupplier; int bufferSize; // down-stream Flow window. long
 418      * buffersWindow; // upstream window long bytesWindow;
 419      * LinkedList<ByteBuffer> buffers = new LinkedList<>();
 420      *
 421      * class FlowSubscription implements Subscription { int recurseLevel = 0;
 422      * @Override public void request(long n) { boolean goodToGo = recurseLevel++
 423      * == 0;
 424      *
 425      * while (goodToGo && buffers.size() > 0 && n > 0) { ByteBuffer buf =
 426      * buffers.get(0); subscriber.onNext(buf); n--; } buffersWindow += n;
 427      * flowController.accept(n * bufferSize); recurseLevel--; }
 428      *
 429      * @Override public void cancel() { // ?? set flag and throw exception on
 430      * next receipt of buffer } }
 431      *
 432      * @Override public U onResponseBodyStart(long contentLength, HttpHeaders
 433      * responseHeaders, LongConsumer flowController) throws IOException {
 434      * this.subscriber = subscriber; this.flowController = flowController;
 435      * this.subscription = new FlowSubscription(); this.bufferSize = bufferSize;
 436      * subscriber.onSubscribe(subscription); return null; }
 437      *
 438      * @Override public void onResponseError(Throwable t) {
 439      * subscriber.onError(t); }
 440      *
 441      * @Override public void onResponseBodyChunk(ByteBuffer b) throws
 442      * IOException { if (buffersWindow > 0) { buffersWindow --;
 443      * subscriber.onNext(b); } else { buffers.add(b); // or could combine
 444      * buffers? } }
 445      *
 446      * @Override public U onResponseComplete() throws IOException {
 447      * subscriber.onComplete(); return bodySupplier.get(); } }; }
 448      */
 449     private static final ByteBuffer EOF = ByteBuffer.allocate(0);
 450     private static final ByteBuffer CLOSED = ByteBuffer.allocate(0);
 451 
 452     // prototype using ByteBuffer based flow control. InputStream feeds off a
  453     // BlockingQueue. Size of Q is determined from the bufsize (bytes) and
 454     // the default ByteBuffer size. bufsize should be a reasonable multiple of
 455     // ByteBuffer size to prevent underflow/starvation. The InputStream updates
 456     // the flowControl window by one as each ByteBuffer is fully consumed.
 457     // Special sentinels are used to indicate stream closed and EOF.
 458     /**
 459      * Returns a response body processor which provides an InputStream to read
 460      * the body.
 461      *
 462      * @implNote This mechanism is provided primarily for backwards
 463      * compatibility for code that expects InputStream. It is recommended for
 464      * better performance to use one of the other response processor
 465      * implementations.
 466      *
 467      * @return a {@code BodyProcessor}
 468      */
 469     public static BodyProcessor<InputStream> asInputStream() {
 470         return new BodyProcessor<InputStream>() {
 471             int queueSize = 2;
 472             private volatile Throwable throwable;
 473 
 474             BlockingQueue<ByteBuffer> queue  = new LinkedBlockingQueue<>();
 475 
 476             private void closeImpl() {
 477                 try {
 478                     queue.put(CLOSED);
 479                 } catch (InterruptedException e) { }
 480             }
 481 
 482             @Override
 483             public InputStream onResponseBodyStart(long contentLength,
 484                                                    HttpHeaders responseHeaders,
 485                                                    LongConsumer flowController)
 486                 throws IOException
 487             {
 488                 flowController.accept(queueSize);
 489 
 490                 return new InputStream() {
 491                     ByteBuffer buffer;
 492 
 493                     @Override
 494                     public int read() throws IOException {
 495                         byte[] bb = new byte[1];
 496                         int n = read(bb, 0, 1);
 497                         if (n == -1) {
 498                             return -1;
 499                         } else {
 500                             return bb[0];
 501                         }
 502                     }
 503 
 504                     @Override
 505                     public int read(byte[] bb) throws IOException {
 506                         return read(bb, 0, bb.length);
 507                     }
 508 
 509                     @Override
 510                     public int read(byte[] bb, int offset, int length)
 511                         throws IOException
 512                     {
 513                         int n;
 514                         if (getBuffer()) {
 515                             return -1; // EOF
 516                         } else {
 517                             int remaining = buffer.remaining();
 518                             if (length >= remaining) {
 519                                 buffer.get(bb, offset, remaining);
 520                                 return remaining;
 521                             } else {
 522                                 buffer.get(bb, offset, length);
 523                                 return length;
 524                             }
 525                         }
 526                     }
 527 
 528                     @Override
 529                     public void close() {
 530                         closeImpl();
 531                     }
 532 
 533                     private boolean getBuffer() throws IOException {
 534                         while (buffer == null || (buffer != EOF &&
 535                                 buffer != CLOSED && !buffer.hasRemaining())) {
 536                             try {
 537                                 buffer = queue.take();
 538                                 flowController.accept(1);
 539                             } catch (InterruptedException e) {
 540                                 throw new IOException(e);
 541                             }
 542                         }
 543                         if (buffer == CLOSED) {
 544                             if (throwable != null) {
 545                                 if (throwable instanceof IOException) {
 546                                     throw (IOException) throwable;
 547                                 } else {
 548                                     throw new IOException(throwable);
 549                                 }
 550                             }
 551                             throw new IOException("Closed");
 552                         }
 553 
 554                         if (buffer == EOF) {
 555                             return true; // EOF
 556                         }
 557                         return false; // not EOF
 558                     }
 559 
 560                 };
 561             }
 562 
 563             @Override
 564             public void onResponseError(Throwable t) {
 565                 throwable = t;
 566                 closeImpl();
 567             }
 568 
 569             @Override
 570             public void onResponseBodyChunk(ByteBuffer b) throws IOException {
 571                 try {
 572                     queue.put(Utils.copy(b));
 573                 } catch (InterruptedException e) {
 574                     // shouldn't happen as queue should never block
 575                     throw new IOException(e);
 576                 }
 577             }
 578 
 579             @Override
 580             public InputStream onResponseComplete() throws IOException {
 581                 try {
 582                     queue.put(EOF);
 583                 } catch (InterruptedException e) {
 584                     throw new IOException(e); // can't happen
 585                 }
 586                 return null;
 587             }
 588 
 589         };
 590     }
 591 
 592     /**
 593      * Common super class that takes care of flow control
 594      *
 595      * @param <T>
 596      */
 597     private static abstract class AbstractResponseProcessor<T>
 598         implements BodyProcessor<T>
 599     {
 600         LongConsumer flowController;
 601 
 602         @Override
 603         public final T onResponseBodyStart(long contentLength,
 604                                            HttpHeaders responseHeaders,
 605                                            LongConsumer flowController)
 606             throws IOException
 607         {
 608             this.flowController = flowController;
 609             flowController.accept(1);
 610             return onResponseBodyStartImpl(contentLength, responseHeaders);
 611         }
 612 
 613         public abstract T onResponseBodyStartImpl(long contentLength,
 614                                                   HttpHeaders responseHeaders)
 615             throws IOException;
 616 
 617         public abstract void onResponseBodyChunkImpl(ByteBuffer b)
 618             throws IOException;
 619 
 620         @Override
 621         public final void onResponseBodyChunk(ByteBuffer b) throws IOException {
 622             onResponseBodyChunkImpl(b);
 623             flowController.accept(1);
 624         }
 625     }
 626 
 627     /**
 628      * Returns a {@link BodyProcessor}&lt;byte[]&gt; which returns the response
 629      * body as a {@code byte array}.
 630      *
 631      * @return a {@code BodyProcessor}
 632      */
 633     public static BodyProcessor<byte[]> asByteArray() {
 634         ByteArrayResponseProcessor brp = new ByteArrayResponseProcessor();
 635 
 636         return new AbstractResponseProcessor<byte[]>() {
 637 
 638             @Override
 639             public byte[] onResponseBodyStartImpl(long contentLength,
 640                                                   HttpHeaders h)
 641                 throws IOException
 642             {
 643                 brp.onStart(contentLength);
 644                 return null;
 645             }
 646 
 647             @Override
 648             public void onResponseBodyChunkImpl(ByteBuffer b)
 649                 throws IOException
 650             {
 651                 brp.onBodyContent(b);
 652             }
 653 
 654             @Override
 655             public byte[] onResponseComplete() throws IOException {
 656                 return brp.onComplete();
 657             }
 658 
 659             @Override
 660             public void onResponseError(Throwable t) {
 661                 brp.onError(t);
 662             }
 663         };
 664     }
 665 
 666     /**
 667      * Returns a response processor which decodes the body using the character
 668      * set specified in the {@code Content-encoding} response header. If there
 669      * is no such header, or the character set is not supported, then
 670      * {@link java.nio.charset.StandardCharsets#ISO_8859_1 ISO_8859_1} is used.
 671      *
 672      * @return a {@code BodyProcessor}
 673      */
 674     public static BodyProcessor<String> asString() {
 675         return asString(null);
 676     }
 677 
 678     /**
 679      * Returns a MultiProcessor that handles multiple responses, writes the
 680      * response bodies to files and which returns an aggregate response object
 681      * that is a {@code Map<URI,Path>}. The keyset of the Map represents the
 682      * URIs of the original request and any additional requests generated by the
 683      * server. The values are the paths of the destination files. Each path uses
 684      * the URI path of the request offset from the destination parent directory
 685      * provided.
 686      *
 687      * <p> All incoming additional requests (push promises) are accepted by this
 688      * multi response processor. Errors are effectively ignored and any failed
 689      * responses are simply omitted from the result Map. Other implementations
 690      * of MultiProcessor can handle these situations
 691      *
 692      * <p><b>Example usage</b>
 693      * <pre>
 694      * {@code
 695      *    CompletableFuture<Map<URI,Path>> cf =
 696      *    HttpRequest.create(new URI("https://www.foo.com/"))
 697      *               .version(Version.HTTP2)
 698      *               .GET()
 699      *               .sendAsyncMulti(HttpResponse.multiFile("/usr/destination"));
 700      *
 701      *    Map<URI,Path> results = cf.join();
 702      * }
 703      * </pre>
 704      *
 705      * @param destination the destination parent directory of all response
 706      * bodies
 707      * @return a MultiProcessor
 708      */
 709     public static MultiProcessor<Map<URI, Path>> multiFile(Path destination) {
 710 
 711         return new MultiProcessor<Map<URI, Path>>() {
 712             Map<URI, CompletableFuture<Path>> bodyCFs = new HashMap<>();
 713 
 714             Map<URI, Path> results = new HashMap<>();
 715 
 716             @Override
 717             public BiFunction<HttpRequest, CompletableFuture<HttpResponse>, Boolean>
 718             onStart(HttpRequest mainRequest,
 719                     CompletableFuture<HttpResponse> response) {
 720                 bodyCFs.put(mainRequest.uri(), getBody(mainRequest, response));
 721                 return (HttpRequest additional, CompletableFuture<HttpResponse> cf) -> {
 722                     CompletableFuture<Path> bcf = getBody(additional, cf);
 723                     bodyCFs.put(additional.uri(), bcf);
 724                     // we accept all comers
 725                     return true;
 726                 };
 727             }
 728 
 729             private CompletableFuture<Path> getBody(HttpRequest req,
 730                                                     CompletableFuture<HttpResponse> cf) {
 731                 URI u = req.uri();
 732                 String path = u.getPath();
 733                 return cf.thenCompose((HttpResponse resp) -> {
 734                     return resp.bodyAsync(HttpResponse.asFile(destination.resolve(path)));
 735                 });
 736             }
 737 
 738             @Override
 739             public Map<URI, Path> onComplete() {
 740                 // all CFs have completed normally or in error.
 741                 Set<Map.Entry<URI, CompletableFuture<Path>>> entries = bodyCFs.entrySet();
 742                 for (Map.Entry<URI, CompletableFuture<Path>> entry : entries) {
 743                     CompletableFuture<Path> v = entry.getValue();
 744                     URI uri = entry.getKey();
 745                     if (v.isDone() && !v.isCompletedExceptionally()) {
 746                         results.put(uri, v.join());
 747                     }
 748                 }
 749                 return results;
 750             }
 751         };
 752     }
 753 
 754     /**
 755      * Returns a {@link BodyProcessor}&lt;{@link String}&gt;.
 756      *
 757      * @param charset the name of the charset to interpret the body as. If
 758      * {@code null} then the processor tries to determine the character set from
 759      * the {@code Content-encoding} header. If that charset is not supported
 760      * then {@link java.nio.charset.StandardCharsets#ISO_8859_1 ISO_8859_1} is
 761      * used.
 762      * @return a {@code BodyProcessor}
 763      */
 764     public static BodyProcessor<String> asString(Charset charset) {
 765 
 766         ByteArrayResponseProcessor brp = new ByteArrayResponseProcessor();
 767 
 768         return new AbstractResponseProcessor<String>() {
 769             Charset cs = charset;
 770             HttpHeaders headers;
 771 
 772             @Override
 773             public String onResponseBodyStartImpl(long contentLength,
 774                                                   HttpHeaders h)
 775                 throws IOException
 776             {
 777                 headers = h;
 778                 brp.onStart(contentLength);
 779                 return null;
 780             }
 781 
 782             @Override
 783             public void onResponseBodyChunkImpl(ByteBuffer b) throws IOException {
 784                 brp.onBodyContent(b);
 785             }
 786 
 787             @Override
 788             public String onResponseComplete() throws IOException {
 789                 byte[] buf = brp.onComplete();
 790                 if (cs == null) {
 791                     cs = headers.firstValue("Content-encoding")
 792                                 .map((String s) -> Charset.forName(s))
 793                                 .orElse(StandardCharsets.ISO_8859_1);
 794                 }
 795                 return new String(buf, cs);
 796             }
 797 
 798             @Override
 799             public void onResponseError(Throwable t) {
 800                 brp.onError(t);
 801             }
 802 
 803         };
 804     }
 805 
 806     /**
 807      * Returns a response processor which ignores the response body.
 808      *
 809      * @return a {@code BodyProcessor}
 810      */
 811     public static BodyProcessor<Void> ignoreBody() {
 812         return asByteArrayConsumer((byte[] buf) -> { /* ignore */ });
 813     }
 814 
 815     /**
 816      * A processor for response bodies, which determines the type of the
 817      * response body returned from {@link HttpResponse}. Response processors can
 818      * either return an object that represents the body itself (after it has
 819      * been read) or else an object that is used to read the body (such as an
 820      * {@code InputStream}). The parameterized type {@code <T>} is the type of
 821      * the returned body object from
 822      * {@link HttpResponse#body(BodyProcessor) HttpResponse.body} and
 823      * (indirectly) from {@link HttpResponse#bodyAsync(BodyProcessor)
 824      * HttpResponse.bodyAsync}.
 825      *
 826      * <p> Implementations of this interface are provided in {@link HttpResponse}
 827      * which write responses to {@code String, byte[], File, Consumer<byte[]>}.
 828      * Custom implementations can also be used.
 829      *
 830      * <p> The methods of this interface may be called from multiple threads,
 831      * but only one method is invoked at a time, and behaves as if called from
 832      * one thread.
 833      *
 834      * @param <T> the type of the response body
 835      *
 836      * @since 9
 837      */
 838     public interface BodyProcessor<T> {
 839 
 840         /**
 841          * Called immediately before the response body is read. If {@code <T>}
 842          * is an object used to read or accept the response body, such as a
 843          * {@code Consumer} or {@code InputStream} then it should be returned
 844          * from this method, and the body object will be returned before any
 845          * data is read. If {@code <T>} represents the body itself after being
 846          * read, then this method must return {@code null} and the body will be
 847          * returned from {@link #onResponseComplete()}. In both cases, the
 848          * actual body data is provided by the
 849          * {@link #onResponseBodyChunk(ByteBuffer) onResponseBodyChunk} method
 850          * in exactly the same way.
 851          *
 852          * <p> flowController is a consumer of long values and is used for
 853          * updating a flow control window as follows. The window represents the
 854          * number of times
 855          * {@link #onResponseBodyChunk(java.nio.ByteBuffer) onResponseBodyChunk}
 856          * may be called before receiving further updates to the window. Each
 857          * time it is called, the window is reduced by {@code 1}. When the
 858          * window reaches zero {@code onResponseBodyChunk()} will not be called
 859          * again until the window has opened again with further calls to
 860          * flowController.accept().
 861          * {@link java.util.function.LongConsumer#accept(long) flowcontroller.accept()}
 862          * must be called to open (increase) the window by the specified amount.
 863          * The initial value is zero. This implies that if {@code
 864          * onResponseBodyStart()} does not call {@code flowController.accept()}
 865          * with a positive value no data will ever be delivered.
 866          *
 867          * @param contentLength {@code -1} signifies unknown content length.
 868          *                      Otherwise, a positive integer, or zero.
 869          * @param responseHeaders the response headers
 870          * @param flowController a LongConsumer used to update the flow control
 871          *                       window
 872          * @return {@code null} or an object that can be used to read the
 873          *         response body.
 874          * @throws IOException if an exception occurs starting the response
 875          *                     body receive
 876          */
 877         T onResponseBodyStart(long contentLength,
 878                               HttpHeaders responseHeaders,
 879                               LongConsumer flowController)
 880             throws IOException;
 881 
 882         /**
 883          * Called if an error occurs while reading the response body. This
 884          * terminates the operation and no further calls will occur after this.
 885          *
 886          * @param t the Throwable
 887          */
 888         void onResponseError(Throwable t);
 889 
 890         /**
 891          * Called for each buffer of data received for this response.
 892          * ByteBuffers can be reused as soon as this method returns.
 893          *
 894          * @param b a ByteBuffer whose position is at the first byte that can be
 895          *          read, and whose limit is after the last byte that can be read
 896          * @throws IOException in case of I/O error
 897          */
 898         void onResponseBodyChunk(ByteBuffer b) throws IOException;
 899 
 900         /**
 901          * Called after the last time
 902          * {@link #onResponseBodyChunk(java.nio.ByteBuffer)} has been called and
 903          * returned indicating that the entire content has been read. This
 904          * method must return an object that represents or contains the response
 905          * body just received, but only if an object was not returned from
 906          * {@link #onResponseBodyStart(long, HttpHeaders, LongConsumer)
 907          * onResponseBodyStart}.
 908          *
 909          * @return a T, or {@code null} if an object was already returned
 910          * @throws IOException in case of I/O error
 911          */
 912         T onResponseComplete() throws IOException;
 913     }
 914 
 915     /**
 916      * A response processor for a HTTP/2 multi response. A multi response
 917      * comprises a main response, and zero or more additional responses. Each
 918      * additional response is sent by the server in response to requests that
 919      * the server also generates. Additional responses are typically resources
 920      * that the server guesses the client will need which are related to the
 921      * initial request.
 922      *
 923      * <p>The server generated requests are also known as <i>push promises</i>.
 924      * The server is permitted to send any number of these requests up to the
 925      * point where the main response is fully received. Therefore, after
 926      * completion of the main response body, the final number of additional
 927      * responses is known. Additional responses may be cancelled, but given that
 928      * the server does not wait for any acknowledgment before sending the
 929      * response, this must be done quickly to avoid unnecessary data transmission.
 930      *
 931      * <p> {@code MultiProcessor}s are parameterised with a type {@code T} which
 932      * represents some meaningful aggregate of the responses received. This
 933      * would typically be a Collection of response or response body objects. One
 934      * example implementation can be found at {@link
 935      * HttpResponse#multiFile(java.nio.file.Path)}.
 936      *
 937      * @param <T> a type representing the aggregated results
 938      *
 939      * @since 9
 940      */
 941     public interface MultiProcessor<T> {
 942 
 943         /**
 944          * Called before or soon after a multi request is sent. The request that
 945          * initiated the multi response is supplied, as well as a
 946          * CompletableFuture for the main response. The implementation of this
 947          * method must return a BiFunction which is called once for each push
 948          * promise received.
 949          *
 950          * <p> The parameters to the {@code BiFunction} are the {@code HttpRequest}
 951          * for the push promise and a {@code CompletableFuture} for its
 952          * response. The function must return a Boolean indicating whether the
 953          * push promise has been accepted (true) or should be canceled (false).
 954          * The CompletableFutures for any canceled pushes are themselves
 955          * completed exceptionally soon after the function returns.
 956          *
 957          * @param mainRequest the main request
 958          * @param response a CompletableFuture for the main response
 959          * @return a BiFunction that is called for each push promise
 960          */
 961         BiFunction<HttpRequest, CompletableFuture<HttpResponse>, Boolean>
 962         onStart(HttpRequest mainRequest,
 963                 CompletableFuture<HttpResponse> response);
 964 
 965         /**
 966          * Called after all responses associated with the multi response have
 967          * been fully processed, including response bodies.
 968          *
 969          * <p> Example types for {@code T} could be Collections of response body
 970          * types or {@code Map}s from request {@code URI} to a response body
 971          * type.
 972          *
 973          * @return the aggregate response object
 974          */
 975         T onComplete();
 976     }
 977 }
< prev index next >