1 /* 2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.IOException; 25 import java.io.InputStream; 26 import java.io.InputStreamReader; 27 import java.io.Reader; 28 import java.net.URI; 29 import jdk.incubator.http.HttpClient; 30 import jdk.incubator.http.HttpHeaders; 31 import jdk.incubator.http.HttpRequest; 32 import jdk.incubator.http.HttpResponse; 33 import java.nio.ByteBuffer; 34 import java.nio.charset.Charset; 35 import java.util.Locale; 36 import java.util.Optional; 37 import java.util.concurrent.ArrayBlockingQueue; 38 import java.util.concurrent.BlockingQueue; 39 import java.util.concurrent.CompletableFuture; 40 import java.util.concurrent.CompletionStage; 41 import java.util.concurrent.Flow; 42 import java.util.stream.Stream; 43 44 /* 45 * @test 46 * @summary An example on how to read a response body with InputStream... 47 * @run main/othervm HttpInputStreamTest 48 * @author daniel fuchs 49 */ 50 public class HttpInputStreamTest { 51 52 public static boolean DEBUG = Boolean.getBoolean("test.debug"); 53 54 /** 55 * A simple HttpResponse.BodyHandler that creates a live 56 * InputStream to read the response body from the underlying ByteBuffer 57 * Flow. 58 * The InputStream is made immediately available for consumption, before 59 * the response body is fully received. 60 */ 61 public static class HttpInputStreamHandler 62 implements HttpResponse.BodyHandler<InputStream> { 63 64 public static final int MAX_BUFFERS_IN_QUEUE = 1; 65 66 private final int maxBuffers; 67 68 public HttpInputStreamHandler() { 69 this(MAX_BUFFERS_IN_QUEUE); 70 } 71 72 public HttpInputStreamHandler(int maxBuffers) { 73 this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 74 } 75 76 @Override 77 public synchronized HttpResponse.BodyProcessor<InputStream> 78 apply(int i, HttpHeaders hh) { 79 return new HttpResponseInputStream(maxBuffers); 80 } 81 82 /** 83 * An InputStream built on top of the Flow API. 84 */ 85 private static class HttpResponseInputStream extends InputStream 86 implements HttpResponse.BodyProcessor<InputStream> { 87 88 // An immutable ByteBuffer sentinel to mark that the last byte was received. 89 private static final ByteBuffer LAST = ByteBuffer.wrap(new byte[0]); 90 91 // A queue of yet unprocessed ByteBuffers received from the flow API. 92 private final BlockingQueue<ByteBuffer> buffers; 93 private volatile Flow.Subscription subscription; 94 private volatile boolean closed; 95 private volatile Throwable failed; 96 private volatile ByteBuffer current; 97 98 HttpResponseInputStream() { 99 this(MAX_BUFFERS_IN_QUEUE); 100 } 101 102 HttpResponseInputStream(int maxBuffers) { 103 int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 104 this.buffers = new ArrayBlockingQueue<>(capacity); 105 } 106 107 @Override 108 public CompletionStage<InputStream> getBody() { 109 // Return the stream immediately, before the 110 // response body is received. 111 // This makes it possible for senAsync().get().body() 112 // to complete before the response body is received. 113 return CompletableFuture.completedStage(this); 114 } 115 116 // Returns the current byte buffer to read from. 117 // If the current buffer has no remaining data, will take the 118 // next buffer from the buffers queue, possibly blocking until 119 // a new buffer is made available through the Flow API, or the 120 // end of the flow is reached. 121 private ByteBuffer current() throws IOException { 122 while (current == null || !current.hasRemaining()) { 123 // Check whether the stream is claused or exhausted 124 if (closed || failed != null) { 125 throw new IOException("closed", failed); 126 } 127 if (current == LAST) break; 128 129 try { 130 // Take a new buffer from the queue, blocking 131 // if none is available yet... 132 if (DEBUG) System.err.println("Taking Buffer"); 133 current = buffers.take(); 134 if (DEBUG) System.err.println("Buffer Taken"); 135 136 // Check whether some exception was encountered 137 // upstream 138 if (closed || failed != null) { 139 throw new IOException("closed", failed); 140 } 141 142 // Check whether we're done. 143 if (current == LAST) break; 144 145 // Inform the producer that it can start sending 146 // us a new buffer 147 Flow.Subscription s = subscription; 148 if (s != null) s.request(1); 149 150 } catch (InterruptedException ex) { 151 // continue 152 } 153 } 154 assert current == LAST || current.hasRemaining(); 155 return current; 156 } 157 158 @Override 159 public int read(byte[] bytes, int off, int len) throws IOException { 160 // get the buffer to read from, possibly blocking if 161 // none is available 162 ByteBuffer buffer; 163 if ((buffer = current()) == LAST) return -1; 164 165 // don't attempt to read more than what is available 166 // in the current buffer. 167 int read = Math.min(buffer.remaining(), len); 168 assert read > 0 && read <= buffer.remaining(); 169 170 // buffer.get() will do the boundary check for us. 171 buffer.get(bytes, off, read); 172 return read; 173 } 174 175 @Override 176 public int read() throws IOException { 177 ByteBuffer buffer; 178 if ((buffer = current()) == LAST) return -1; 179 return buffer.get() & 0xFF; 180 } 181 182 @Override 183 public void onSubscribe(Flow.Subscription s) { 184 this.subscription = s; 185 s.request(Math.max(2, buffers.remainingCapacity() + 1)); 186 } 187 188 @Override 189 public synchronized void onNext(ByteBuffer t) { 190 try { 191 if (DEBUG) System.err.println("next buffer received"); 192 buffers.put(t); 193 if (DEBUG) System.err.println("buffered offered"); 194 } catch (Exception ex) { 195 failed = ex; 196 try { 197 close(); 198 } catch (IOException ex1) { 199 // OK 200 } 201 } 202 } 203 204 @Override 205 public void onError(Throwable thrwbl) { 206 failed = thrwbl; 207 } 208 209 @Override 210 public synchronized void onComplete() { 211 subscription = null; 212 onNext(LAST); 213 } 214 215 @Override 216 public void close() throws IOException { 217 synchronized (this) { 218 closed = true; 219 Flow.Subscription s = subscription; 220 if (s != null) { 221 s.cancel(); 222 } 223 subscription = null; 224 } 225 super.close(); 226 } 227 228 } 229 } 230 231 /** 232 * Examine the response headers to figure out the charset used to 233 * encode the body content. 234 * If the content type is not textual, returns an empty Optional. 235 * Otherwise, returns the body content's charset, defaulting to 236 * ISO-8859-1 if none is explicitly specified. 237 * @param headers The response headers. 238 * @return The charset to use for decoding the response body, if 239 * the response body content is text/... 240 */ 241 public static Optional<Charset> getCharset(HttpHeaders headers) { 242 Optional<String> contentType = headers.firstValue("Content-Type"); 243 Optional<Charset> charset = Optional.empty(); 244 if (contentType.isPresent()) { 257 return charset; 258 } 259 260 public static void main(String[] args) throws Exception { 261 HttpClient client = HttpClient.newHttpClient(); 262 HttpRequest request = HttpRequest 263 .newBuilder(new URI("http://hg.openjdk.java.net/jdk9/sandbox/jdk/shortlog/http-client-branch/")) 264 .GET() 265 .build(); 266 267 // This example shows how to return an InputStream that can be used to 268 // start reading the response body before the response is fully received. 269 // In comparison, the snipet below (which uses 270 // HttpResponse.BodyHandler.asString()) obviously will not return before the 271 // response body is fully read: 272 // 273 // System.out.println( 274 // client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body()); 275 276 CompletableFuture<HttpResponse<InputStream>> handle = 277 client.sendAsync(request, new HttpInputStreamHandler()); 278 if (DEBUG) System.err.println("Request sent"); 279 280 HttpResponse<InputStream> pending = handle.get(); 281 282 // At this point, the response headers have been received, but the 283 // response body may not have arrived yet. This comes from 284 // the implementation of HttpResponseInputStream::getBody above, 285 // which returns an already completed completion stage, without 286 // waiting for any data. 287 // We can therefore access the headers - and the body, which 288 // is our live InputStream, without waiting... 289 HttpHeaders responseHeaders = pending.headers(); 290 291 // Get the charset declared in the response headers. 292 // The optional will be empty if the content type is not 293 // of type text/... 294 Optional<Charset> charset = getCharset(responseHeaders); 295 296 try (InputStream is = pending.body(); 297 // We assume a textual content type. Construct an InputStream 298 // Reader with the appropriate Charset. 299 // charset.get() will throw NPE if the content is not textual. 300 Reader r = new InputStreamReader(is, charset.get())) { 301 302 char[] buff = new char[32]; 303 int off=0, n=0; 304 if (DEBUG) System.err.println("Start receiving response body"); 305 if (DEBUG) System.err.println("Charset: " + charset.get()); 306 307 // Start consuming the InputStream as the data arrives. 308 // Will block until there is something to read... 309 while ((n = r.read(buff, off, buff.length - off)) > 0) { 310 assert (buff.length - off) > 0; 311 assert n <= (buff.length - off); 312 if (n == (buff.length - off)) { 313 System.out.print(buff); 314 off = 0; 315 } else { 316 off += n; 317 } 318 assert off < buff.length; 319 } 320 321 // last call to read may not have filled 'buff' completely. 322 // flush out the remaining characters. 323 assert off >= 0 && off < buff.length; 324 for (int i=0; i < off; i++) { 325 System.out.print(buff[i]); | 1 /* 2 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.IOException; 25 import java.io.InputStream; 26 import java.io.InputStreamReader; 27 import java.io.Reader; 28 import java.net.URI; 29 import jdk.incubator.http.HttpClient; 30 import jdk.incubator.http.HttpHeaders; 31 import jdk.incubator.http.HttpRequest; 32 import jdk.incubator.http.HttpResponse; 33 import java.nio.ByteBuffer; 34 import java.nio.charset.Charset; 35 import java.util.Iterator; 36 import java.util.List; 37 import java.util.Locale; 38 import java.util.Optional; 39 import java.util.concurrent.ArrayBlockingQueue; 40 import java.util.concurrent.BlockingQueue; 41 import java.util.concurrent.CompletableFuture; 42 import java.util.concurrent.CompletionStage; 43 import java.util.concurrent.Flow; 44 import java.util.stream.Stream; 45 import static java.lang.System.err; 46 47 /* 48 * @test 49 * @summary An example on how to read a response body with InputStream... 50 * @run main/othervm -Dtest.debug=true HttpInputStreamTest 51 * @author daniel fuchs 52 */ 53 public class HttpInputStreamTest { 54 55 public static boolean DEBUG = Boolean.getBoolean("test.debug"); 56 57 /** 58 * A simple HttpResponse.BodyHandler that creates a live 59 * InputStream to read the response body from the underlying ByteBuffer 60 * Flow. 61 * The InputStream is made immediately available for consumption, before 62 * the response body is fully received. 63 */ 64 public static class HttpInputStreamHandler 65 implements HttpResponse.BodyHandler<InputStream> { 66 67 public static final int MAX_BUFFERS_IN_QUEUE = 1; // lock-step with the producer 68 69 private final int maxBuffers; 70 71 public HttpInputStreamHandler() { 72 this(MAX_BUFFERS_IN_QUEUE); 73 } 74 75 public HttpInputStreamHandler(int maxBuffers) { 76 this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 77 } 78 79 @Override 80 public HttpResponse.BodySubscriber<InputStream> 81 apply(int i, HttpHeaders hh) { 82 return new HttpResponseInputStream(maxBuffers); 83 } 84 85 /** 86 * An InputStream built on top of the Flow API. 87 */ 88 private static class HttpResponseInputStream extends InputStream 89 implements HttpResponse.BodySubscriber<InputStream> { 90 91 // An immutable ByteBuffer sentinel to mark that the last byte was received. 92 private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]); 93 private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER); 94 95 // A queue of yet unprocessed ByteBuffers received from the flow API. 96 private final BlockingQueue<List<ByteBuffer>> buffers; 97 private volatile Flow.Subscription subscription; 98 private volatile boolean closed; 99 private volatile Throwable failed; 100 private volatile Iterator<ByteBuffer> currentListItr; 101 private volatile ByteBuffer currentBuffer; 102 103 HttpResponseInputStream() { 104 this(MAX_BUFFERS_IN_QUEUE); 105 } 106 107 HttpResponseInputStream(int maxBuffers) { 108 int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 109 // 1 additional slot for LAST_LIST added by onComplete 110 this.buffers = new ArrayBlockingQueue<>(capacity + 1); 111 } 112 113 @Override 114 public CompletionStage<InputStream> getBody() { 115 // Return the stream immediately, before the 116 // response body is received. 117 // This makes it possible for senAsync().get().body() 118 // to complete before the response body is received. 119 return CompletableFuture.completedStage(this); 120 } 121 122 // Returns the current byte buffer to read from. 123 // If the current buffer has no remaining data, will take the 124 // next buffer from the buffers queue, possibly blocking until 125 // a new buffer is made available through the Flow API, or the 126 // end of the flow is reached. 127 private ByteBuffer current() throws IOException { 128 while (currentBuffer == null || !currentBuffer.hasRemaining()) { 129 // Check whether the stream is closed or exhausted 130 if (closed || failed != null) { 131 throw new IOException("closed", failed); 132 } 133 if (currentBuffer == LAST_BUFFER) break; 134 135 try { 136 if (currentListItr == null || !currentListItr.hasNext()) { 137 // Take a new list of buffers from the queue, blocking 138 // if none is available yet... 139 140 if (DEBUG) err.println("Taking list of Buffers"); 141 List<ByteBuffer> lb = buffers.take(); 142 currentListItr = lb.iterator(); 143 if (DEBUG) err.println("List of Buffers Taken"); 144 145 // Check whether an exception was encountered upstream 146 if (closed || failed != null) 147 throw new IOException("closed", failed); 148 149 // Check whether we're done. 150 if (lb == LAST_LIST) { 151 currentListItr = null; 152 currentBuffer = LAST_BUFFER; 153 break; 154 } 155 156 // Request another upstream item ( list of buffers ) 157 Flow.Subscription s = subscription; 158 if (s != null) 159 s.request(1); 160 } 161 assert currentListItr != null; 162 assert currentListItr.hasNext(); 163 if (DEBUG) err.println("Next Buffer"); 164 currentBuffer = currentListItr.next(); 165 } catch (InterruptedException ex) { 166 // continue 167 } 168 } 169 assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining(); 170 return currentBuffer; 171 } 172 173 @Override 174 public int read(byte[] bytes, int off, int len) throws IOException { 175 // get the buffer to read from, possibly blocking if 176 // none is available 177 ByteBuffer buffer; 178 if ((buffer = current()) == LAST_BUFFER) return -1; 179 180 // don't attempt to read more than what is available 181 // in the current buffer. 182 int read = Math.min(buffer.remaining(), len); 183 assert read > 0 && read <= buffer.remaining(); 184 185 // buffer.get() will do the boundary check for us. 186 buffer.get(bytes, off, read); 187 return read; 188 } 189 190 @Override 191 public int read() throws IOException { 192 ByteBuffer buffer; 193 if ((buffer = current()) == LAST_BUFFER) return -1; 194 return buffer.get() & 0xFF; 195 } 196 197 @Override 198 public void onSubscribe(Flow.Subscription s) { 199 if (this.subscription != null) { 200 s.cancel(); 201 return; 202 } 203 this.subscription = s; 204 assert buffers.remainingCapacity() > 1; // should at least be 2 205 if (DEBUG) err.println("onSubscribe: requesting " 206 + Math.max(1, buffers.remainingCapacity() - 1)); 207 s.request(Math.max(1, buffers.remainingCapacity() - 1)); 208 } 209 210 @Override 211 public void onNext(List<ByteBuffer> t) { 212 try { 213 if (DEBUG) err.println("next item received"); 214 if (!buffers.offer(t)) { 215 throw new IllegalStateException("queue is full"); 216 } 217 if (DEBUG) err.println("item offered"); 218 } catch (Exception ex) { 219 failed = ex; 220 try { 221 close(); 222 } catch (IOException ex1) { 223 // OK 224 } 225 } 226 } 227 228 @Override 229 public void onError(Throwable thrwbl) { 230 subscription = null; 231 failed = thrwbl; 232 } 233 234 @Override 235 public void onComplete() { 236 subscription = null; 237 onNext(LAST_LIST); 238 } 239 240 @Override 241 public void close() throws IOException { 242 synchronized (this) { 243 if (closed) return; 244 closed = true; 245 } 246 Flow.Subscription s = subscription; 247 subscription = null; 248 if (s != null) { 249 s.cancel(); 250 } 251 super.close(); 252 } 253 254 } 255 } 256 257 /** 258 * Examine the response headers to figure out the charset used to 259 * encode the body content. 260 * If the content type is not textual, returns an empty Optional. 261 * Otherwise, returns the body content's charset, defaulting to 262 * ISO-8859-1 if none is explicitly specified. 263 * @param headers The response headers. 264 * @return The charset to use for decoding the response body, if 265 * the response body content is text/... 266 */ 267 public static Optional<Charset> getCharset(HttpHeaders headers) { 268 Optional<String> contentType = headers.firstValue("Content-Type"); 269 Optional<Charset> charset = Optional.empty(); 270 if (contentType.isPresent()) { 283 return charset; 284 } 285 286 public static void main(String[] args) throws Exception { 287 HttpClient client = HttpClient.newHttpClient(); 288 HttpRequest request = HttpRequest 289 .newBuilder(new URI("http://hg.openjdk.java.net/jdk9/sandbox/jdk/shortlog/http-client-branch/")) 290 .GET() 291 .build(); 292 293 // This example shows how to return an InputStream that can be used to 294 // start reading the response body before the response is fully received. 295 // In comparison, the snipet below (which uses 296 // HttpResponse.BodyHandler.asString()) obviously will not return before the 297 // response body is fully read: 298 // 299 // System.out.println( 300 // client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body()); 301 302 CompletableFuture<HttpResponse<InputStream>> handle = 303 client.sendAsync(request, new HttpInputStreamHandler(3)); 304 if (DEBUG) err.println("Request sent"); 305 306 HttpResponse<InputStream> pending = handle.get(); 307 308 // At this point, the response headers have been received, but the 309 // response body may not have arrived yet. This comes from 310 // the implementation of HttpResponseInputStream::getBody above, 311 // which returns an already completed completion stage, without 312 // waiting for any data. 313 // We can therefore access the headers - and the body, which 314 // is our live InputStream, without waiting... 315 HttpHeaders responseHeaders = pending.headers(); 316 317 // Get the charset declared in the response headers. 318 // The optional will be empty if the content type is not 319 // of type text/... 320 Optional<Charset> charset = getCharset(responseHeaders); 321 322 try (InputStream is = pending.body(); 323 // We assume a textual content type. Construct an InputStream 324 // Reader with the appropriate Charset. 325 // charset.get() will throw NPE if the content is not textual. 326 Reader r = new InputStreamReader(is, charset.get())) { 327 328 char[] buff = new char[32]; 329 int off=0, n=0; 330 if (DEBUG) err.println("Start receiving response body"); 331 if (DEBUG) err.println("Charset: " + charset.get()); 332 333 // Start consuming the InputStream as the data arrives. 334 // Will block until there is something to read... 335 while ((n = r.read(buff, off, buff.length - off)) > 0) { 336 assert (buff.length - off) > 0; 337 assert n <= (buff.length - off); 338 if (n == (buff.length - off)) { 339 System.out.print(buff); 340 off = 0; 341 } else { 342 off += n; 343 } 344 assert off < buff.length; 345 } 346 347 // last call to read may not have filled 'buff' completely. 348 // flush out the remaining characters. 349 assert off >= 0 && off < buff.length; 350 for (int i=0; i < off; i++) { 351 System.out.print(buff[i]); |