1 /* 2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.IOException; 25 import java.io.InputStream; 26 import java.io.InputStreamReader; 27 import java.io.Reader; 28 import java.net.URI; 29 import jdk.incubator.http.HttpClient; 30 import jdk.incubator.http.HttpHeaders; 31 import jdk.incubator.http.HttpRequest; 32 import jdk.incubator.http.HttpResponse; 33 import java.nio.ByteBuffer; 34 import java.nio.charset.Charset; 35 import java.util.Locale; 36 import java.util.List; 37 import java.util.Optional; 38 import java.util.concurrent.ArrayBlockingQueue; 39 import java.util.concurrent.BlockingQueue; 40 import java.util.concurrent.CompletableFuture; 41 import java.util.concurrent.CompletionStage; 42 import java.util.concurrent.Flow; 43 import java.util.stream.Stream; 44 45 /* 46 * @test 47 * @summary An example on how to read a response body with InputStream... 48 * @run main/othervm HttpInputStreamTest 49 * @author daniel fuchs 50 */ 51 public class HttpInputStreamTest { 52 53 public static boolean DEBUG = Boolean.getBoolean("test.debug"); 54 55 /** 56 * A simple HttpResponse.BodyHandler that creates a live 57 * InputStream to read the response body from the underlying ByteBuffer 58 * Flow. 59 * The InputStream is made immediately available for consumption, before 60 * the response body is fully received. 61 */ 62 public static class HttpInputStreamHandler 63 implements HttpResponse.BodyHandler<InputStream> { 64 65 public static final int MAX_BUFFERS_IN_QUEUE = 1; 66 67 private final int maxBuffers; 68 69 public HttpInputStreamHandler() { 70 this(MAX_BUFFERS_IN_QUEUE); 71 } 72 73 public HttpInputStreamHandler(int maxBuffers) { 74 this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 75 } 76 77 @Override 78 public synchronized HttpResponse.BodyProcessor<InputStream> 79 apply(int i, HttpHeaders hh) { 80 return new HttpResponseInputStream(maxBuffers); 81 } 82 83 /** 84 * An InputStream built on top of the Flow API. 85 */ 86 private static class HttpResponseInputStream extends InputStream 87 implements HttpResponse.BodyProcessor<InputStream> { 88 89 // An immutable ByteBuffer sentinel to mark that the last byte was received. 90 private static final ByteBuffer LAST = ByteBuffer.wrap(new byte[0]); 91 92 // A queue of yet unprocessed ByteBuffers received from the flow API. 93 private final BlockingQueue<ByteBuffer> buffers; 94 private volatile Flow.Subscription subscription; 95 private volatile boolean closed; 96 private volatile Throwable failed; 97 private volatile ByteBuffer current; 98 99 HttpResponseInputStream() { 100 this(MAX_BUFFERS_IN_QUEUE); 101 } 102 103 HttpResponseInputStream(int maxBuffers) { 104 int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 105 this.buffers = new ArrayBlockingQueue<>(capacity); 106 } 107 108 @Override 109 public CompletionStage<InputStream> getBody() { 110 // Return the stream immediately, before the 111 // response body is received. 112 // This makes it possible for senAsync().get().body() 113 // to complete before the response body is received. 114 return CompletableFuture.completedStage(this); 115 } 116 117 // Returns the current byte buffer to read from. 118 // If the current buffer has no remaining data, will take the 119 // next buffer from the buffers queue, possibly blocking until 120 // a new buffer is made available through the Flow API, or the 121 // end of the flow is reached. 122 private ByteBuffer current() throws IOException { 123 while (current == null || !current.hasRemaining()) { 124 // Check whether the stream is claused or exhausted 125 if (closed || failed != null) { 126 throw new IOException("closed", failed); 127 } 128 if (current == LAST) break; 129 130 try { 131 // Take a new buffer from the queue, blocking 132 // if none is available yet... 133 if (DEBUG) System.err.println("Taking Buffer"); 134 current = buffers.take(); 135 if (DEBUG) System.err.println("Buffer Taken"); 136 137 // Check whether some exception was encountered 138 // upstream 139 if (closed || failed != null) { 140 throw new IOException("closed", failed); 141 } 142 143 // Check whether we're done. 144 if (current == LAST) break; 145 146 // Inform the producer that it can start sending 147 // us a new buffer 148 Flow.Subscription s = subscription; 149 if (s != null) s.request(1); 150 151 } catch (InterruptedException ex) { 152 // continue 153 } 154 } 155 assert current == LAST || current.hasRemaining(); 156 return current; 157 } 158 159 @Override 160 public int read(byte[] bytes, int off, int len) throws IOException { 161 // get the buffer to read from, possibly blocking if 162 // none is available 163 ByteBuffer buffer; 164 if ((buffer = current()) == LAST) return -1; 165 166 // don't attempt to read more than what is available 167 // in the current buffer. 168 int read = Math.min(buffer.remaining(), len); 169 assert read > 0 && read <= buffer.remaining(); 170 171 // buffer.get() will do the boundary check for us. 172 buffer.get(bytes, off, read); 173 return read; 174 } 175 176 @Override 177 public int read() throws IOException { 178 ByteBuffer buffer; 179 if ((buffer = current()) == LAST) return -1; 180 return buffer.get() & 0xFF; 181 } 182 183 @Override 184 public void onSubscribe(Flow.Subscription s) { 185 this.subscription = s; 186 s.request(Math.max(2, buffers.remainingCapacity() + 1)); 187 } 188 189 @Override 190 public synchronized void onNext(List<ByteBuffer> t) { 191 if (t.size() > 1) 192 System.out.println ("XXX " + t.size()); 193 try { 194 if (DEBUG) System.err.println("next buffer received"); 195 for (ByteBuffer b : t) 196 buffers.put(b); 197 if (DEBUG) System.err.println("buffered offered"); 198 } catch (Exception ex) { 199 failed = ex; 200 try { 201 close(); 202 } catch (IOException ex1) { 203 // OK 204 } 205 } 206 } 207 208 @Override 209 public void onError(Throwable thrwbl) { 210 failed = thrwbl; 211 } 212 213 @Override 214 public synchronized void onComplete() { 215 subscription = null; 216 onNext(List.of(LAST)); 217 } 218 219 @Override 220 public void close() throws IOException { 221 synchronized (this) { 222 closed = true; 223 Flow.Subscription s = subscription; 224 if (s != null) { 225 s.cancel(); 226 } 227 subscription = null; 228 } 229 super.close(); 230 } 231 232 } 233 } 234 235 /** 236 * Examine the response headers to figure out the charset used to 237 * encode the body content. 238 * If the content type is not textual, returns an empty Optional. 239 * Otherwise, returns the body content's charset, defaulting to 240 * ISO-8859-1 if none is explicitly specified. 241 * @param headers The response headers. 242 * @return The charset to use for decoding the response body, if 243 * the response body content is text/... 244 */ 245 public static Optional<Charset> getCharset(HttpHeaders headers) { 246 Optional<String> contentType = headers.firstValue("Content-Type"); 247 Optional<Charset> charset = Optional.empty(); 248 if (contentType.isPresent()) { 249 final String[] values = contentType.get().split(";"); 250 if (values[0].startsWith("text/")) { 251 charset = Optional.of(Stream.of(values) 252 .map(x -> x.toLowerCase(Locale.ROOT)) 253 .map(String::trim) 254 .filter(x -> x.startsWith("charset=")) 255 .map(x -> x.substring("charset=".length())) 256 .findFirst() 257 .orElse("ISO-8859-1")) 258 .map(Charset::forName); 259 } 260 } 261 return charset; 262 } 263 264 public static void main(String[] args) throws Exception { 265 HttpClient client = HttpClient.newHttpClient(); 266 HttpRequest request = HttpRequest 267 .newBuilder(new URI("http://hg.openjdk.java.net/jdk9/sandbox/jdk/shortlog/http-client-branch/")) 268 .GET() 269 .build(); 270 271 // This example shows how to return an InputStream that can be used to 272 // start reading the response body before the response is fully received. 273 // In comparison, the snipet below (which uses 274 // HttpResponse.BodyHandler.asString()) obviously will not return before the 275 // response body is fully read: 276 // 277 // System.out.println( 278 // client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body()); 279 280 CompletableFuture<HttpResponse<InputStream>> handle = 281 client.sendAsync(request, new HttpInputStreamHandler()); 282 if (DEBUG) System.err.println("Request sent"); 283 284 HttpResponse<InputStream> pending = handle.get(); 285 286 // At this point, the response headers have been received, but the 287 // response body may not have arrived yet. This comes from 288 // the implementation of HttpResponseInputStream::getBody above, 289 // which returns an already completed completion stage, without 290 // waiting for any data. 291 // We can therefore access the headers - and the body, which 292 // is our live InputStream, without waiting... 293 HttpHeaders responseHeaders = pending.headers(); 294 295 // Get the charset declared in the response headers. 296 // The optional will be empty if the content type is not 297 // of type text/... 298 Optional<Charset> charset = getCharset(responseHeaders); 299 300 try (InputStream is = pending.body(); 301 // We assume a textual content type. Construct an InputStream 302 // Reader with the appropriate Charset. 303 // charset.get() will throw NPE if the content is not textual. 304 Reader r = new InputStreamReader(is, charset.get())) { 305 306 char[] buff = new char[32]; 307 int off=0, n=0; 308 if (DEBUG) System.err.println("Start receiving response body"); 309 if (DEBUG) System.err.println("Charset: " + charset.get()); 310 311 // Start consuming the InputStream as the data arrives. 312 // Will block until there is something to read... 313 while ((n = r.read(buff, off, buff.length - off)) > 0) { 314 assert (buff.length - off) > 0; 315 assert n <= (buff.length - off); 316 if (n == (buff.length - off)) { 317 System.out.print(buff); 318 off = 0; 319 } else { 320 off += n; 321 } 322 assert off < buff.length; 323 } 324 325 // last call to read may not have filled 'buff' completely. 326 // flush out the remaining characters. 327 assert off >= 0 && off < buff.length; 328 for (int i=0; i < off; i++) { 329 System.out.print(buff[i]); 330 } 331 332 // We're done! 333 System.out.println("Done!"); 334 } 335 } 336 337 }