1 /* 2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.IOException; 25 import java.io.InputStream; 26 import java.io.InputStreamReader; 27 import java.io.Reader; 28 import java.net.URI; 29 import jdk.incubator.http.HttpClient; 30 import jdk.incubator.http.HttpHeaders; 31 import jdk.incubator.http.HttpRequest; 32 import jdk.incubator.http.HttpResponse; 33 import java.nio.ByteBuffer; 34 import java.nio.charset.Charset; 35 import java.util.Locale; 36 import java.util.Optional; 37 import java.util.concurrent.ArrayBlockingQueue; 38 import java.util.concurrent.BlockingQueue; 39 import java.util.concurrent.CompletableFuture; 40 import java.util.concurrent.CompletionStage; 41 import java.util.concurrent.Flow; 42 import java.util.stream.Stream; 43 44 /* 45 * @test 46 * @summary An example on how to read a response body with InputStream... 47 * @run main/othervm HttpInputStreamTest 48 * @author daniel fuchs 49 */ 50 public class HttpInputStreamTest { 51 52 public static boolean DEBUG = Boolean.getBoolean("test.debug"); 53 54 /** 55 * A simple HttpResponse.BodyHandler that creates a live 56 * InputStream to read the response body from the underlying ByteBuffer 57 * Flow. 58 * The InputStream is made immediately available for consumption, before 59 * the response body is fully received. 60 */ 61 public static class HttpInputStreamHandler 62 implements HttpResponse.BodyHandler<InputStream> { 63 64 public static final int MAX_BUFFERS_IN_QUEUE = 1; 65 66 private final int maxBuffers; 67 68 public HttpInputStreamHandler() { 69 this(MAX_BUFFERS_IN_QUEUE); 70 } 71 72 public HttpInputStreamHandler(int maxBuffers) { 73 this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 74 } 75 76 @Override 77 public synchronized HttpResponse.BodyProcessor<InputStream> 78 apply(int i, HttpHeaders hh) { 79 return new HttpResponseInputStream(maxBuffers); 80 } 81 82 /** 83 * An InputStream built on top of the Flow API. 84 */ 85 private static class HttpResponseInputStream extends InputStream 86 implements HttpResponse.BodyProcessor<InputStream> { 87 88 // An immutable ByteBuffer sentinel to mark that the last byte was received. 89 private static final ByteBuffer LAST = ByteBuffer.wrap(new byte[0]); 90 91 // A queue of yet unprocessed ByteBuffers received from the flow API. 92 private final BlockingQueue<ByteBuffer> buffers; 93 private volatile Flow.Subscription subscription; 94 private volatile boolean closed; 95 private volatile Throwable failed; 96 private volatile ByteBuffer current; 97 98 HttpResponseInputStream() { 99 this(MAX_BUFFERS_IN_QUEUE); 100 } 101 102 HttpResponseInputStream(int maxBuffers) { 103 int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 104 this.buffers = new ArrayBlockingQueue<>(capacity); 105 } 106 107 @Override 108 public CompletionStage<InputStream> getBody() { 109 // Return the stream immediately, before the 110 // response body is received. 111 // This makes it possible for senAsync().get().body() 112 // to complete before the response body is received. 113 return CompletableFuture.completedStage(this); 114 } 115 116 // Returns the current byte buffer to read from. 117 // If the current buffer has no remaining data, will take the 118 // next buffer from the buffers queue, possibly blocking until 119 // a new buffer is made available through the Flow API, or the 120 // end of the flow is reached. 121 private ByteBuffer current() throws IOException { 122 while (current == null || !current.hasRemaining()) { 123 // Check whether the stream is claused or exhausted 124 if (closed || failed != null) { 125 throw new IOException("closed", failed); 126 } 127 if (current == LAST) break; 128 129 try { 130 // Take a new buffer from the queue, blocking 131 // if none is available yet... 132 if (DEBUG) System.err.println("Taking Buffer"); 133 current = buffers.take(); 134 if (DEBUG) System.err.println("Buffer Taken"); 135 136 // Check whether some exception was encountered 137 // upstream 138 if (closed || failed != null) { 139 throw new IOException("closed", failed); 140 } 141 142 // Check whether we're done. 143 if (current == LAST) break; 144 145 // Inform the producer that it can start sending 146 // us a new buffer 147 Flow.Subscription s = subscription; 148 if (s != null) s.request(1); 149 150 } catch (InterruptedException ex) { 151 // continue 152 } 153 } 154 assert current == LAST || current.hasRemaining(); 155 return current; 156 } 157 158 @Override 159 public int read(byte[] bytes, int off, int len) throws IOException { 160 // get the buffer to read from, possibly blocking if 161 // none is available 162 ByteBuffer buffer; 163 if ((buffer = current()) == LAST) return -1; 164 165 // don't attempt to read more than what is available 166 // in the current buffer. 167 int read = Math.min(buffer.remaining(), len); 168 assert read > 0 && read <= buffer.remaining(); 169 170 // buffer.get() will do the boundary check for us. 171 buffer.get(bytes, off, read); 172 return read; 173 } 174 175 @Override 176 public int read() throws IOException { 177 ByteBuffer buffer; 178 if ((buffer = current()) == LAST) return -1; 179 return buffer.get() & 0xFF; 180 } 181 182 @Override 183 public void onSubscribe(Flow.Subscription s) { 184 this.subscription = s; 185 s.request(Math.max(2, buffers.remainingCapacity() + 1)); 186 } 187 188 @Override 189 public synchronized void onNext(ByteBuffer t) { 190 try { 191 if (DEBUG) System.err.println("next buffer received"); 192 buffers.put(t); 193 if (DEBUG) System.err.println("buffered offered"); 194 } catch (Exception ex) { 195 failed = ex; 196 try { 197 close(); 198 } catch (IOException ex1) { 199 // OK 200 } 201 } 202 } 203 204 @Override 205 public void onError(Throwable thrwbl) { 206 failed = thrwbl; 207 } 208 209 @Override 210 public synchronized void onComplete() { 211 subscription = null; 212 onNext(LAST); 213 } 214 215 @Override 216 public void close() throws IOException { 217 synchronized (this) { 218 closed = true; 219 Flow.Subscription s = subscription; 220 if (s != null) { 221 s.cancel(); 222 } 223 subscription = null; 224 } 225 super.close(); 226 } 227 228 } 229 } 230 231 /** 232 * Examine the response headers to figure out the charset used to 233 * encode the body content. 234 * If the content type is not textual, returns an empty Optional. 235 * Otherwise, returns the body content's charset, defaulting to 236 * ISO-8859-1 if none is explicitly specified. 237 * @param headers The response headers. 238 * @return The charset to use for decoding the response body, if 239 * the response body content is text/... 240 */ 241 public static Optional<Charset> getCharset(HttpHeaders headers) { 242 Optional<String> contentType = headers.firstValue("Content-Type"); 243 Optional<Charset> charset = Optional.empty(); 244 if (contentType.isPresent()) { 245 final String[] values = contentType.get().split(";"); 246 if (values[0].startsWith("text/")) { 247 charset = Optional.of(Stream.of(values) 248 .map(x -> x.toLowerCase(Locale.ROOT)) 249 .map(String::trim) 250 .filter(x -> x.startsWith("charset=")) 251 .map(x -> x.substring("charset=".length())) 252 .findFirst() 253 .orElse("ISO-8859-1")) 254 .map(Charset::forName); 255 } 256 } 257 return charset; 258 } 259 260 public static void main(String[] args) throws Exception { 261 HttpClient client = HttpClient.newHttpClient(); 262 HttpRequest request = HttpRequest 263 .newBuilder(new URI("http://hg.openjdk.java.net/jdk9/sandbox/jdk/shortlog/http-client-branch/")) 264 .GET() 265 .build(); 266 267 // This example shows how to return an InputStream that can be used to 268 // start reading the response body before the response is fully received. 269 // In comparison, the snipet below (which uses 270 // HttpResponse.BodyHandler.asString()) obviously will not return before the 271 // response body is fully read: 272 // 273 // System.out.println( 274 // client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body()); 275 276 CompletableFuture<HttpResponse<InputStream>> handle = 277 client.sendAsync(request, new HttpInputStreamHandler()); 278 if (DEBUG) System.err.println("Request sent"); 279 280 HttpResponse<InputStream> pending = handle.get(); 281 282 // At this point, the response headers have been received, but the 283 // response body may not have arrived yet. This comes from 284 // the implementation of HttpResponseInputStream::getBody above, 285 // which returns an already completed completion stage, without 286 // waiting for any data. 287 // We can therefore access the headers - and the body, which 288 // is our live InputStream, without waiting... 289 HttpHeaders responseHeaders = pending.headers(); 290 291 // Get the charset declared in the response headers. 292 // The optional will be empty if the content type is not 293 // of type text/... 294 Optional<Charset> charset = getCharset(responseHeaders); 295 296 try (InputStream is = pending.body(); 297 // We assume a textual content type. Construct an InputStream 298 // Reader with the appropriate Charset. 299 // charset.get() will throw NPE if the content is not textual. 300 Reader r = new InputStreamReader(is, charset.get())) { 301 302 char[] buff = new char[32]; 303 int off=0, n=0; 304 if (DEBUG) System.err.println("Start receiving response body"); 305 if (DEBUG) System.err.println("Charset: " + charset.get()); 306 307 // Start consuming the InputStream as the data arrives. 308 // Will block until there is something to read... 309 while ((n = r.read(buff, off, buff.length - off)) > 0) { 310 assert (buff.length - off) > 0; 311 assert n <= (buff.length - off); 312 if (n == (buff.length - off)) { 313 System.out.print(buff); 314 off = 0; 315 } else { 316 off += n; 317 } 318 assert off < buff.length; 319 } 320 321 // last call to read may not have filled 'buff' completely. 322 // flush out the remaining characters. 323 assert off >= 0 && off < buff.length; 324 for (int i=0; i < off; i++) { 325 System.out.print(buff[i]); 326 } 327 328 // We're done! 329 System.out.println("Done!"); 330 } 331 } 332 333 }