15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.IOException; 25 import java.io.InputStream; 26 import java.io.InputStreamReader; 27 import java.io.Reader; 28 import java.net.URI; 29 import jdk.incubator.http.HttpClient; 30 import jdk.incubator.http.HttpHeaders; 31 import jdk.incubator.http.HttpRequest; 32 import jdk.incubator.http.HttpResponse; 33 import java.nio.ByteBuffer; 34 import java.nio.charset.Charset; 35 import java.util.Locale; 36 import java.util.Optional; 37 import java.util.concurrent.ArrayBlockingQueue; 38 import java.util.concurrent.BlockingQueue; 39 import java.util.concurrent.CompletableFuture; 40 import java.util.concurrent.CompletionStage; 41 import java.util.concurrent.Flow; 42 import java.util.stream.Stream; 43 44 /* 45 * @test 46 * @summary An example on how to read a response body with InputStream... 47 * @run main/othervm HttpInputStreamTest 48 * @author daniel fuchs 49 */ 50 public class HttpInputStreamTest { 51 52 public static boolean DEBUG = Boolean.getBoolean("test.debug"); 53 54 /** 55 * A simple HttpResponse.BodyHandler that creates a live 56 * InputStream to read the response body from the underlying ByteBuffer 57 * Flow. 58 * The InputStream is made immediately available for consumption, before 59 * the response body is fully received. 60 */ 61 public static class HttpInputStreamHandler 62 implements HttpResponse.BodyHandler<InputStream> { 63 64 public static final int MAX_BUFFERS_IN_QUEUE = 1; 65 66 private final int maxBuffers; 67 68 public HttpInputStreamHandler() { 69 this(MAX_BUFFERS_IN_QUEUE); 70 } 71 72 public HttpInputStreamHandler(int maxBuffers) { 73 this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 74 } 75 76 @Override 77 public synchronized HttpResponse.BodyProcessor<InputStream> 78 apply(int i, HttpHeaders hh) { 79 return new HttpResponseInputStream(maxBuffers); 80 } 81 82 /** 83 * An InputStream built on top of the Flow API. 84 */ 85 private static class HttpResponseInputStream extends InputStream 86 implements HttpResponse.BodyProcessor<InputStream> { 87 88 // An immutable ByteBuffer sentinel to mark that the last byte was received. 89 private static final ByteBuffer LAST = ByteBuffer.wrap(new byte[0]); 90 91 // A queue of yet unprocessed ByteBuffers received from the flow API. 92 private final BlockingQueue<ByteBuffer> buffers; 93 private volatile Flow.Subscription subscription; 94 private volatile boolean closed; 95 private volatile Throwable failed; 96 private volatile ByteBuffer current; 97 98 HttpResponseInputStream() { 99 this(MAX_BUFFERS_IN_QUEUE); 100 } 101 102 HttpResponseInputStream(int maxBuffers) { 103 int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 104 this.buffers = new ArrayBlockingQueue<>(capacity); 105 } 106 107 @Override 108 public CompletionStage<InputStream> getBody() { 109 // Return the stream immediately, before the 110 // response body is received. 111 // This makes it possible for senAsync().get().body() 112 // to complete before the response body is received. 113 return CompletableFuture.completedStage(this); 114 } 115 116 // Returns the current byte buffer to read from. 117 // If the current buffer has no remaining data, will take the 118 // next buffer from the buffers queue, possibly blocking until 119 // a new buffer is made available through the Flow API, or the 120 // end of the flow is reached. 121 private ByteBuffer current() throws IOException { 122 while (current == null || !current.hasRemaining()) { 123 // Check whether the stream is claused or exhausted 124 if (closed || failed != null) { 125 throw new IOException("closed", failed); 126 } 127 if (current == LAST) break; 128 129 try { 130 // Take a new buffer from the queue, blocking 131 // if none is available yet... 132 if (DEBUG) System.err.println("Taking Buffer"); 133 current = buffers.take(); 134 if (DEBUG) System.err.println("Buffer Taken"); 135 136 // Check whether some exception was encountered 137 // upstream 138 if (closed || failed != null) { 139 throw new IOException("closed", failed); 140 } 141 142 // Check whether we're done. 143 if (current == LAST) break; 144 145 // Inform the producer that it can start sending 146 // us a new buffer 147 Flow.Subscription s = subscription; 148 if (s != null) s.request(1); 149 150 } catch (InterruptedException ex) { 151 // continue 152 } 153 } 154 assert current == LAST || current.hasRemaining(); 155 return current; 156 } 157 158 @Override 159 public int read(byte[] bytes, int off, int len) throws IOException { 160 // get the buffer to read from, possibly blocking if 161 // none is available 162 ByteBuffer buffer; 163 if ((buffer = current()) == LAST) return -1; 164 165 // don't attempt to read more than what is available 166 // in the current buffer. 167 int read = Math.min(buffer.remaining(), len); 168 assert read > 0 && read <= buffer.remaining(); 169 170 // buffer.get() will do the boundary check for us. 171 buffer.get(bytes, off, read); 172 return read; 173 } 174 175 @Override 176 public int read() throws IOException { 177 ByteBuffer buffer; 178 if ((buffer = current()) == LAST) return -1; 179 return buffer.get() & 0xFF; 180 } 181 182 @Override 183 public void onSubscribe(Flow.Subscription s) { 184 this.subscription = s; 185 s.request(Math.max(2, buffers.remainingCapacity() + 1)); 186 } 187 188 @Override 189 public synchronized void onNext(ByteBuffer t) { 190 try { 191 if (DEBUG) System.err.println("next buffer received"); 192 buffers.put(t); 193 if (DEBUG) System.err.println("buffered offered"); 194 } catch (Exception ex) { 195 failed = ex; 196 try { 197 close(); 198 } catch (IOException ex1) { 199 // OK 200 } 201 } 202 } 203 204 @Override 205 public void onError(Throwable thrwbl) { 206 failed = thrwbl; 207 } 208 209 @Override 210 public synchronized void onComplete() { 211 subscription = null; 212 onNext(LAST); 213 } 214 215 @Override 216 public void close() throws IOException { 217 synchronized (this) { 218 closed = true; 219 Flow.Subscription s = subscription; 220 if (s != null) { 221 s.cancel(); 222 } 223 subscription = null; 224 } 225 super.close(); 226 } 227 228 } 229 } 230 231 /** 232 * Examine the response headers to figure out the charset used to 258 } 259 260 public static void main(String[] args) throws Exception { 261 HttpClient client = HttpClient.newHttpClient(); 262 HttpRequest request = HttpRequest 263 .newBuilder(new URI("http://hg.openjdk.java.net/jdk9/sandbox/jdk/shortlog/http-client-branch/")) 264 .GET() 265 .build(); 266 267 // This example shows how to return an InputStream that can be used to 268 // start reading the response body before the response is fully received. 269 // In comparison, the snipet below (which uses 270 // HttpResponse.BodyHandler.asString()) obviously will not return before the 271 // response body is fully read: 272 // 273 // System.out.println( 274 // client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body()); 275 276 CompletableFuture<HttpResponse<InputStream>> handle = 277 client.sendAsync(request, new HttpInputStreamHandler()); 278 if (DEBUG) System.err.println("Request sent"); 279 280 HttpResponse<InputStream> pending = handle.get(); 281 282 // At this point, the response headers have been received, but the 283 // response body may not have arrived yet. This comes from 284 // the implementation of HttpResponseInputStream::getBody above, 285 // which returns an already completed completion stage, without 286 // waiting for any data. 287 // We can therefore access the headers - and the body, which 288 // is our live InputStream, without waiting... 289 HttpHeaders responseHeaders = pending.headers(); 290 291 // Get the charset declared in the response headers. 292 // The optional will be empty if the content type is not 293 // of type text/... 294 Optional<Charset> charset = getCharset(responseHeaders); 295 296 try (InputStream is = pending.body(); 297 // We assume a textual content type. Construct an InputStream 298 // Reader with the appropriate Charset. 299 // charset.get() will throw NPE if the content is not textual. 300 Reader r = new InputStreamReader(is, charset.get())) { 301 302 char[] buff = new char[32]; 303 int off=0, n=0; 304 if (DEBUG) System.err.println("Start receiving response body"); 305 if (DEBUG) System.err.println("Charset: " + charset.get()); 306 307 // Start consuming the InputStream as the data arrives. 308 // Will block until there is something to read... 309 while ((n = r.read(buff, off, buff.length - off)) > 0) { 310 assert (buff.length - off) > 0; 311 assert n <= (buff.length - off); 312 if (n == (buff.length - off)) { 313 System.out.print(buff); 314 off = 0; 315 } else { 316 off += n; 317 } 318 assert off < buff.length; 319 } 320 321 // last call to read may not have filled 'buff' completely. 322 // flush out the remaining characters. 323 assert off >= 0 && off < buff.length; 324 for (int i=0; i < off; i++) { 325 System.out.print(buff[i]); | 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.IOException; 25 import java.io.InputStream; 26 import java.io.InputStreamReader; 27 import java.io.Reader; 28 import java.net.URI; 29 import jdk.incubator.http.HttpClient; 30 import jdk.incubator.http.HttpHeaders; 31 import jdk.incubator.http.HttpRequest; 32 import jdk.incubator.http.HttpResponse; 33 import java.nio.ByteBuffer; 34 import java.nio.charset.Charset; 35 import java.util.ArrayList; 36 import java.util.List; 37 import java.util.Locale; 38 import java.util.Optional; 39 import java.util.concurrent.ArrayBlockingQueue; 40 import java.util.concurrent.BlockingQueue; 41 import java.util.concurrent.CompletableFuture; 42 import java.util.concurrent.CompletionStage; 43 import java.util.concurrent.Flow; 44 import java.util.stream.Stream; 45 import static java.lang.System.err; 46 47 /* 48 * @test 49 * @summary An example on how to read a response body with InputStream... 50 * @run main/othervm HttpInputStreamTest 51 * @author daniel fuchs 52 */ 53 public class HttpInputStreamTest { 54 55 public static boolean DEBUG = Boolean.getBoolean("test.debug"); 56 57 /** 58 * A simple HttpResponse.BodyHandler that creates a live 59 * InputStream to read the response body from the underlying ByteBuffer 60 * Flow. 61 * The InputStream is made immediately available for consumption, before 62 * the response body is fully received. 63 */ 64 public static class HttpInputStreamHandler 65 implements HttpResponse.BodyHandler<InputStream> { 66 67 public static final int MAX_BUFFERS_IN_QUEUE = 1; // lock-step with the producer 68 69 private final int maxBuffers; 70 71 public HttpInputStreamHandler() { 72 this(MAX_BUFFERS_IN_QUEUE); 73 } 74 75 public HttpInputStreamHandler(int maxBuffers) { 76 this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 77 } 78 79 @Override 80 public synchronized HttpResponse.BodyProcessor<InputStream> 81 apply(int i, HttpHeaders hh) { 82 return new HttpResponseInputStream(maxBuffers); 83 } 84 85 /** 86 * An InputStream built on top of the Flow API. 87 */ 88 private static class HttpResponseInputStream extends InputStream 89 implements HttpResponse.BodyProcessor<InputStream> { 90 91 // An immutable ByteBuffer sentinel to mark that the last byte was received. 92 private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]); 93 private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER); 94 95 // A queue of yet unprocessed ByteBuffers received from the flow API. 96 private final BlockingQueue<List<ByteBuffer>> buffers; 97 private volatile Flow.Subscription subscription; 98 private volatile boolean closed; 99 private volatile Throwable failed; 100 private volatile List<ByteBuffer> currentList; 101 private volatile ByteBuffer currentBuffer; 102 103 HttpResponseInputStream() { 104 this(MAX_BUFFERS_IN_QUEUE); 105 } 106 107 HttpResponseInputStream(int maxBuffers) { 108 int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; 109 this.buffers = new ArrayBlockingQueue<>(capacity); 110 } 111 112 @Override 113 public CompletionStage<InputStream> getBody() { 114 // Return the stream immediately, before the 115 // response body is received. 116 // This makes it possible for senAsync().get().body() 117 // to complete before the response body is received. 118 return CompletableFuture.completedStage(this); 119 } 120 121 // Returns the current byte buffer to read from. 122 // If the current buffer has no remaining data, will take the 123 // next buffer from the buffers queue, possibly blocking until 124 // a new buffer is made available through the Flow API, or the 125 // end of the flow is reached. 126 private ByteBuffer current() throws IOException { 127 while (currentBuffer == null || !currentBuffer.hasRemaining()) { 128 // Check whether the stream is closed or exhausted 129 if (closed || failed != null) { 130 throw new IOException("closed", failed); 131 } 132 if (currentBuffer == LAST_BUFFER) break; 133 134 try { 135 if (currentList == null || currentList.isEmpty()) { 136 // Take a new list of buffers from the queue, blocking 137 // if none is available yet... 138 139 if (DEBUG) err.println("Taking list of Buffers"); 140 List<ByteBuffer> lb = buffers.take(); 141 if (DEBUG) err.println("List of Buffers Taken"); 142 143 // Check whether an exception was encountered upstream 144 if (closed || failed != null) 145 throw new IOException("closed", failed); 146 147 // Check whether we're done. 148 if (lb == LAST_LIST) { 149 currentList = LAST_LIST; 150 currentBuffer = LAST_BUFFER; 151 break; 152 } 153 154 currentList = new ArrayList<>(lb); // TODO: lb is immutable 155 156 // Request another upstream item ( list of buffers ) 157 Flow.Subscription s = subscription; 158 if (s != null) 159 s.request(1); 160 } 161 assert currentList != null; 162 assert !currentList.isEmpty(); 163 if (DEBUG) err.println("Next Buffer"); 164 currentBuffer = currentList.remove(0); 165 } catch (InterruptedException ex) { 166 // continue 167 } 168 } 169 assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining(); 170 return currentBuffer; 171 } 172 173 @Override 174 public int read(byte[] bytes, int off, int len) throws IOException { 175 // get the buffer to read from, possibly blocking if 176 // none is available 177 ByteBuffer buffer; 178 if ((buffer = current()) == LAST_BUFFER) return -1; 179 180 // don't attempt to read more than what is available 181 // in the current buffer. 182 int read = Math.min(buffer.remaining(), len); 183 assert read > 0 && read <= buffer.remaining(); 184 185 // buffer.get() will do the boundary check for us. 186 buffer.get(bytes, off, read); 187 return read; 188 } 189 190 @Override 191 public int read() throws IOException { 192 ByteBuffer buffer; 193 if ((buffer = current()) == LAST_BUFFER) return -1; 194 return buffer.get() & 0xFF; 195 } 196 197 @Override 198 public void onSubscribe(Flow.Subscription s) { 199 this.subscription = s; 200 s.request(Math.max(2, buffers.remainingCapacity() + 1)); 201 } 202 203 @Override 204 public synchronized void onNext(List<ByteBuffer> t) { 205 try { 206 if (DEBUG) err.println("next item received"); 207 buffers.put(t); 208 if (DEBUG) err.println("item offered"); 209 } catch (Exception ex) { 210 failed = ex; 211 try { 212 close(); 213 } catch (IOException ex1) { 214 // OK 215 } 216 } 217 } 218 219 @Override 220 public void onError(Throwable thrwbl) { 221 failed = thrwbl; 222 } 223 224 @Override 225 public synchronized void onComplete() { 226 subscription = null; 227 onNext(LAST_LIST); 228 } 229 230 @Override 231 public void close() throws IOException { 232 synchronized (this) { 233 closed = true; 234 Flow.Subscription s = subscription; 235 if (s != null) { 236 s.cancel(); 237 } 238 subscription = null; 239 } 240 super.close(); 241 } 242 243 } 244 } 245 246 /** 247 * Examine the response headers to figure out the charset used to 273 } 274 275 public static void main(String[] args) throws Exception { 276 HttpClient client = HttpClient.newHttpClient(); 277 HttpRequest request = HttpRequest 278 .newBuilder(new URI("http://hg.openjdk.java.net/jdk9/sandbox/jdk/shortlog/http-client-branch/")) 279 .GET() 280 .build(); 281 282 // This example shows how to return an InputStream that can be used to 283 // start reading the response body before the response is fully received. 284 // In comparison, the snipet below (which uses 285 // HttpResponse.BodyHandler.asString()) obviously will not return before the 286 // response body is fully read: 287 // 288 // System.out.println( 289 // client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body()); 290 291 CompletableFuture<HttpResponse<InputStream>> handle = 292 client.sendAsync(request, new HttpInputStreamHandler()); 293 if (DEBUG) err.println("Request sent"); 294 295 HttpResponse<InputStream> pending = handle.get(); 296 297 // At this point, the response headers have been received, but the 298 // response body may not have arrived yet. This comes from 299 // the implementation of HttpResponseInputStream::getBody above, 300 // which returns an already completed completion stage, without 301 // waiting for any data. 302 // We can therefore access the headers - and the body, which 303 // is our live InputStream, without waiting... 304 HttpHeaders responseHeaders = pending.headers(); 305 306 // Get the charset declared in the response headers. 307 // The optional will be empty if the content type is not 308 // of type text/... 309 Optional<Charset> charset = getCharset(responseHeaders); 310 311 try (InputStream is = pending.body(); 312 // We assume a textual content type. Construct an InputStream 313 // Reader with the appropriate Charset. 314 // charset.get() will throw NPE if the content is not textual. 315 Reader r = new InputStreamReader(is, charset.get())) { 316 317 char[] buff = new char[32]; 318 int off=0, n=0; 319 if (DEBUG) err.println("Start receiving response body"); 320 if (DEBUG) err.println("Charset: " + charset.get()); 321 322 // Start consuming the InputStream as the data arrives. 323 // Will block until there is something to read... 324 while ((n = r.read(buff, off, buff.length - off)) > 0) { 325 assert (buff.length - off) > 0; 326 assert n <= (buff.length - off); 327 if (n == (buff.length - off)) { 328 System.out.print(buff); 329 off = 0; 330 } else { 331 off += n; 332 } 333 assert off < buff.length; 334 } 335 336 // last call to read may not have filled 'buff' completely. 337 // flush out the remaining characters. 338 assert off >= 0 && off < buff.length; 339 for (int i=0; i < off; i++) { 340 System.out.print(buff[i]); |