1 /* 2 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.IOException; 25 import java.io.InputStream; 26 import java.io.InputStreamReader; 27 import java.io.Reader; 28 import java.net.URI; 29 import jdk.incubator.http.HttpClient; 30 import jdk.incubator.http.HttpHeaders; 31 import jdk.incubator.http.HttpRequest; 32 import jdk.incubator.http.HttpResponse; 33 import java.nio.ByteBuffer; 34 import java.nio.charset.Charset; 35 import java.util.Iterator; 36 import java.util.List; 37 import java.util.Locale; 38 import java.util.Optional; 39 import java.util.concurrent.ArrayBlockingQueue; 40 import java.util.concurrent.BlockingQueue; 41 import java.util.concurrent.CompletableFuture; 42 import java.util.concurrent.CompletionStage; 43 import java.util.concurrent.Flow; 44 import java.util.stream.Stream; 45 import static java.lang.System.err; 46 47 /* 48 * @test 49 * @summary An example on how to read a response body with InputStream... 50 * @run main/othervm -Dtest.debug=true HttpInputStreamTest 51 * @author daniel fuchs 52 */ 53 public class HttpInputStreamTest { 54 55 public static boolean DEBUG = Boolean.getBoolean("test.debug"); 56 57 /** 58 * A simple HttpResponse.BodyHandler that creates a live 59 * InputStream to read the response body from the underlying ByteBuffer 60 * Flow. 61 * The InputStream is made immediately available for consumption, before 62 * the response body is fully received. 63 */ 64 public static class HttpInputStreamHandler 65 implements HttpResponse.BodyHandler<InputStream> { 66 67 public static final int MAX_BUFFERS_IN_QUEUE = 1; // lock-step with the producer 68 69 private final int maxBuffers; 70 71 public HttpInputStreamHandler() { 72 this(MAX_BUFFERS_IN_QUEUE); 73 } 74 75 public HttpInputStreamHandler(int maxBuffers) { 76 this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 77 } 78 79 @Override 80 public HttpResponse.BodySubscriber<InputStream> 81 apply(int i, HttpHeaders hh) { 82 return new HttpResponseInputStream(maxBuffers); 83 } 84 85 /** 86 * An InputStream built on top of the Flow API. 87 */ 88 private static class HttpResponseInputStream extends InputStream 89 implements HttpResponse.BodySubscriber<InputStream> { 90 91 // An immutable ByteBuffer sentinel to mark that the last byte was received. 92 private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]); 93 private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER); 94 95 // A queue of yet unprocessed ByteBuffers received from the flow API. 96 private final BlockingQueue<List<ByteBuffer>> buffers; 97 private volatile Flow.Subscription subscription; 98 private volatile boolean closed; 99 private volatile Throwable failed; 100 private volatile Iterator<ByteBuffer> currentListItr; 101 private volatile ByteBuffer currentBuffer; 102 103 HttpResponseInputStream() { 104 this(MAX_BUFFERS_IN_QUEUE); 105 } 106 107 HttpResponseInputStream(int maxBuffers) { 108 int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 109 // 1 additional slot for LAST_LIST added by onComplete 110 this.buffers = new ArrayBlockingQueue<>(capacity + 1); 111 } 112 113 @Override 114 public CompletionStage<InputStream> getBody() { 115 // Return the stream immediately, before the 116 // response body is received. 117 // This makes it possible for senAsync().get().body() 118 // to complete before the response body is received. 119 return CompletableFuture.completedStage(this); 120 } 121 122 // Returns the current byte buffer to read from. 123 // If the current buffer has no remaining data, will take the 124 // next buffer from the buffers queue, possibly blocking until 125 // a new buffer is made available through the Flow API, or the 126 // end of the flow is reached. 127 private ByteBuffer current() throws IOException { 128 while (currentBuffer == null || !currentBuffer.hasRemaining()) { 129 // Check whether the stream is closed or exhausted 130 if (closed || failed != null) { 131 throw new IOException("closed", failed); 132 } 133 if (currentBuffer == LAST_BUFFER) break; 134 135 try { 136 if (currentListItr == null || !currentListItr.hasNext()) { 137 // Take a new list of buffers from the queue, blocking 138 // if none is available yet... 139 140 if (DEBUG) err.println("Taking list of Buffers"); 141 List<ByteBuffer> lb = buffers.take(); 142 currentListItr = lb.iterator(); 143 if (DEBUG) err.println("List of Buffers Taken"); 144 145 // Check whether an exception was encountered upstream 146 if (closed || failed != null) 147 throw new IOException("closed", failed); 148 149 // Check whether we're done. 150 if (lb == LAST_LIST) { 151 currentListItr = null; 152 currentBuffer = LAST_BUFFER; 153 break; 154 } 155 156 // Request another upstream item ( list of buffers ) 157 Flow.Subscription s = subscription; 158 if (s != null) 159 s.request(1); 160 } 161 assert currentListItr != null; 162 assert currentListItr.hasNext(); 163 if (DEBUG) err.println("Next Buffer"); 164 currentBuffer = currentListItr.next(); 165 } catch (InterruptedException ex) { 166 // continue 167 } 168 } 169 assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining(); 170 return currentBuffer; 171 } 172 173 @Override 174 public int read(byte[] bytes, int off, int len) throws IOException { 175 // get the buffer to read from, possibly blocking if 176 // none is available 177 ByteBuffer buffer; 178 if ((buffer = current()) == LAST_BUFFER) return -1; 179 180 // don't attempt to read more than what is available 181 // in the current buffer. 182 int read = Math.min(buffer.remaining(), len); 183 assert read > 0 && read <= buffer.remaining(); 184 185 // buffer.get() will do the boundary check for us. 186 buffer.get(bytes, off, read); 187 return read; 188 } 189 190 @Override 191 public int read() throws IOException { 192 ByteBuffer buffer; 193 if ((buffer = current()) == LAST_BUFFER) return -1; 194 return buffer.get() & 0xFF; 195 } 196 197 @Override 198 public void onSubscribe(Flow.Subscription s) { 199 if (this.subscription != null) { 200 s.cancel(); 201 return; 202 } 203 this.subscription = s; 204 assert buffers.remainingCapacity() > 1; // should at least be 2 205 if (DEBUG) err.println("onSubscribe: requesting " 206 + Math.max(1, buffers.remainingCapacity() - 1)); 207 s.request(Math.max(1, buffers.remainingCapacity() - 1)); 208 } 209 210 @Override 211 public void onNext(List<ByteBuffer> t) { 212 try { 213 if (DEBUG) err.println("next item received"); 214 if (!buffers.offer(t)) { 215 throw new IllegalStateException("queue is full"); 216 } 217 if (DEBUG) err.println("item offered"); 218 } catch (Exception ex) { 219 failed = ex; 220 try { 221 close(); 222 } catch (IOException ex1) { 223 // OK 224 } 225 } 226 } 227 228 @Override 229 public void onError(Throwable thrwbl) { 230 subscription = null; 231 failed = thrwbl; 232 } 233 234 @Override 235 public void onComplete() { 236 subscription = null; 237 onNext(LAST_LIST); 238 } 239 240 @Override 241 public void close() throws IOException { 242 synchronized (this) { 243 if (closed) return; 244 closed = true; 245 } 246 Flow.Subscription s = subscription; 247 subscription = null; 248 if (s != null) { 249 s.cancel(); 250 } 251 super.close(); 252 } 253 254 } 255 } 256 257 /** 258 * Examine the response headers to figure out the charset used to 259 * encode the body content. 260 * If the content type is not textual, returns an empty Optional. 261 * Otherwise, returns the body content's charset, defaulting to 262 * ISO-8859-1 if none is explicitly specified. 263 * @param headers The response headers. 264 * @return The charset to use for decoding the response body, if 265 * the response body content is text/... 266 */ 267 public static Optional<Charset> getCharset(HttpHeaders headers) { 268 Optional<String> contentType = headers.firstValue("Content-Type"); 269 Optional<Charset> charset = Optional.empty(); 270 if (contentType.isPresent()) { 271 final String[] values = contentType.get().split(";"); 272 if (values[0].startsWith("text/")) { 273 charset = Optional.of(Stream.of(values) 274 .map(x -> x.toLowerCase(Locale.ROOT)) 275 .map(String::trim) 276 .filter(x -> x.startsWith("charset=")) 277 .map(x -> x.substring("charset=".length())) 278 .findFirst() 279 .orElse("ISO-8859-1")) 280 .map(Charset::forName); 281 } 282 } 283 return charset; 284 } 285 286 public static void main(String[] args) throws Exception { 287 HttpClient client = HttpClient.newHttpClient(); 288 HttpRequest request = HttpRequest 289 .newBuilder(new URI("http://hg.openjdk.java.net/jdk9/sandbox/jdk/shortlog/http-client-branch/")) 290 .GET() 291 .build(); 292 293 // This example shows how to return an InputStream that can be used to 294 // start reading the response body before the response is fully received. 295 // In comparison, the snipet below (which uses 296 // HttpResponse.BodyHandler.asString()) obviously will not return before the 297 // response body is fully read: 298 // 299 // System.out.println( 300 // client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body()); 301 302 CompletableFuture<HttpResponse<InputStream>> handle = 303 client.sendAsync(request, new HttpInputStreamHandler(3)); 304 if (DEBUG) err.println("Request sent"); 305 306 HttpResponse<InputStream> pending = handle.get(); 307 308 // At this point, the response headers have been received, but the 309 // response body may not have arrived yet. This comes from 310 // the implementation of HttpResponseInputStream::getBody above, 311 // which returns an already completed completion stage, without 312 // waiting for any data. 313 // We can therefore access the headers - and the body, which 314 // is our live InputStream, without waiting... 315 HttpHeaders responseHeaders = pending.headers(); 316 317 // Get the charset declared in the response headers. 318 // The optional will be empty if the content type is not 319 // of type text/... 320 Optional<Charset> charset = getCharset(responseHeaders); 321 322 try (InputStream is = pending.body(); 323 // We assume a textual content type. Construct an InputStream 324 // Reader with the appropriate Charset. 325 // charset.get() will throw NPE if the content is not textual. 326 Reader r = new InputStreamReader(is, charset.get())) { 327 328 char[] buff = new char[32]; 329 int off=0, n=0; 330 if (DEBUG) err.println("Start receiving response body"); 331 if (DEBUG) err.println("Charset: " + charset.get()); 332 333 // Start consuming the InputStream as the data arrives. 334 // Will block until there is something to read... 335 while ((n = r.read(buff, off, buff.length - off)) > 0) { 336 assert (buff.length - off) > 0; 337 assert n <= (buff.length - off); 338 if (n == (buff.length - off)) { 339 System.out.print(buff); 340 off = 0; 341 } else { 342 off += n; 343 } 344 assert off < buff.length; 345 } 346 347 // last call to read may not have filled 'buff' completely. 348 // flush out the remaining characters. 349 assert off >= 0 && off < buff.length; 350 for (int i=0; i < off; i++) { 351 System.out.print(buff[i]); 352 } 353 354 // We're done! 355 System.out.println("Done!"); 356 } 357 } 358 359 }