< prev index next >

test/jdk/java/net/httpclient/HttpInputStreamTest.java

Print this page

        

*** 1,7 **** /* ! * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. --- 1,7 ---- /* ! * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation.
*** 30,52 **** import jdk.incubator.http.HttpHeaders; import jdk.incubator.http.HttpRequest; import jdk.incubator.http.HttpResponse; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Locale; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; import java.util.stream.Stream; /* * @test * @summary An example on how to read a response body with InputStream... ! * @run main/othervm HttpInputStreamTest * @author daniel fuchs */ public class HttpInputStreamTest { public static boolean DEBUG = Boolean.getBoolean("test.debug"); --- 30,55 ---- import jdk.incubator.http.HttpHeaders; import jdk.incubator.http.HttpRequest; import jdk.incubator.http.HttpResponse; import java.nio.ByteBuffer; import java.nio.charset.Charset; + import java.util.Iterator; + import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; import java.util.stream.Stream; + import static java.lang.System.err; /* * @test * @summary An example on how to read a response body with InputStream... ! * @run main/othervm -Dtest.debug=true HttpInputStreamTest * @author daniel fuchs */ public class HttpInputStreamTest { public static boolean DEBUG = Boolean.getBoolean("test.debug");
*** 59,69 **** * the response body is fully received. */ public static class HttpInputStreamHandler implements HttpResponse.BodyHandler<InputStream> { ! public static final int MAX_BUFFERS_IN_QUEUE = 1; private final int maxBuffers; public HttpInputStreamHandler() { this(MAX_BUFFERS_IN_QUEUE); --- 62,72 ---- * the response body is fully received. */ public static class HttpInputStreamHandler implements HttpResponse.BodyHandler<InputStream> { ! public static final int MAX_BUFFERS_IN_QUEUE = 1; // lock-step with the producer private final int maxBuffers; public HttpInputStreamHandler() { this(MAX_BUFFERS_IN_QUEUE);
*** 72,109 **** public HttpInputStreamHandler(int maxBuffers) { this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; } @Override ! public synchronized HttpResponse.BodyProcessor<InputStream> apply(int i, HttpHeaders hh) { return new HttpResponseInputStream(maxBuffers); } /** * An InputStream built on top of the Flow API. */ private static class HttpResponseInputStream extends InputStream ! implements HttpResponse.BodyProcessor<InputStream> { // An immutable ByteBuffer sentinel to mark that the last byte was received. ! private static final ByteBuffer LAST = ByteBuffer.wrap(new byte[0]); // A queue of yet unprocessed ByteBuffers received from the flow API. ! private final BlockingQueue<ByteBuffer> buffers; private volatile Flow.Subscription subscription; private volatile boolean closed; private volatile Throwable failed; ! private volatile ByteBuffer current; HttpResponseInputStream() { this(MAX_BUFFERS_IN_QUEUE); } HttpResponseInputStream(int maxBuffers) { int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; ! this.buffers = new ArrayBlockingQueue<>(capacity); } @Override public CompletionStage<InputStream> getBody() { // Return the stream immediately, before the --- 75,115 ---- public HttpInputStreamHandler(int maxBuffers) { this.maxBuffers = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; } @Override ! public HttpResponse.BodySubscriber<InputStream> apply(int i, HttpHeaders hh) { return new HttpResponseInputStream(maxBuffers); } /** * An InputStream built on top of the Flow API. */ private static class HttpResponseInputStream extends InputStream ! implements HttpResponse.BodySubscriber<InputStream> { // An immutable ByteBuffer sentinel to mark that the last byte was received. ! private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]); ! private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER); // A queue of yet unprocessed ByteBuffers received from the flow API. ! private final BlockingQueue<List<ByteBuffer>> buffers; private volatile Flow.Subscription subscription; private volatile boolean closed; private volatile Throwable failed; ! private volatile Iterator<ByteBuffer> currentListItr; ! private volatile ByteBuffer currentBuffer; HttpResponseInputStream() { this(MAX_BUFFERS_IN_QUEUE); } HttpResponseInputStream(int maxBuffers) { int capacity = maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers; ! // 1 additional slot for LAST_LIST added by onComplete ! this.buffers = new ArrayBlockingQueue<>(capacity + 1); } @Override public CompletionStage<InputStream> getBody() { // Return the stream immediately, before the
*** 117,168 **** // If the current buffer has no remaining data, will take the // next buffer from the buffers queue, possibly blocking until // a new buffer is made available through the Flow API, or the // end of the flow is reached. private ByteBuffer current() throws IOException { ! while (current == null || !current.hasRemaining()) { ! // Check whether the stream is claused or exhausted if (closed || failed != null) { throw new IOException("closed", failed); } ! if (current == LAST) break; try { ! // Take a new buffer from the queue, blocking // if none is available yet... - if (DEBUG) System.err.println("Taking Buffer"); - current = buffers.take(); - if (DEBUG) System.err.println("Buffer Taken"); ! // Check whether some exception was encountered ! // upstream ! if (closed || failed != null) { throw new IOException("closed", failed); - } // Check whether we're done. ! if (current == LAST) break; ! // Inform the producer that it can start sending ! // us a new buffer Flow.Subscription s = subscription; ! if (s != null) s.request(1); ! } catch (InterruptedException ex) { // continue } } ! assert current == LAST || current.hasRemaining(); ! return current; } @Override public int read(byte[] bytes, int off, int len) throws IOException { // get the buffer to read from, possibly blocking if // none is available ByteBuffer buffer; ! if ((buffer = current()) == LAST) return -1; // don't attempt to read more than what is available // in the current buffer. int read = Math.min(buffer.remaining(), len); assert read > 0 && read <= buffer.remaining(); --- 123,183 ---- // If the current buffer has no remaining data, will take the // next buffer from the buffers queue, possibly blocking until // a new buffer is made available through the Flow API, or the // end of the flow is reached. private ByteBuffer current() throws IOException { ! while (currentBuffer == null || !currentBuffer.hasRemaining()) { ! // Check whether the stream is closed or exhausted if (closed || failed != null) { throw new IOException("closed", failed); } ! if (currentBuffer == LAST_BUFFER) break; try { ! if (currentListItr == null || !currentListItr.hasNext()) { ! // Take a new list of buffers from the queue, blocking // if none is available yet... ! if (DEBUG) err.println("Taking list of Buffers"); ! List<ByteBuffer> lb = buffers.take(); ! currentListItr = lb.iterator(); ! if (DEBUG) err.println("List of Buffers Taken"); ! ! // Check whether an exception was encountered upstream ! if (closed || failed != null) throw new IOException("closed", failed); // Check whether we're done. ! if (lb == LAST_LIST) { ! currentListItr = null; ! currentBuffer = LAST_BUFFER; ! break; ! } ! // Request another upstream item ( list of buffers ) Flow.Subscription s = subscription; ! if (s != null) ! s.request(1); ! } ! assert currentListItr != null; ! assert currentListItr.hasNext(); ! if (DEBUG) err.println("Next Buffer"); ! currentBuffer = currentListItr.next(); } catch (InterruptedException ex) { // continue } } ! assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining(); ! return currentBuffer; } @Override public int read(byte[] bytes, int off, int len) throws IOException { // get the buffer to read from, possibly blocking if // none is available ByteBuffer buffer; ! if ((buffer = current()) == LAST_BUFFER) return -1; // don't attempt to read more than what is available // in the current buffer. int read = Math.min(buffer.remaining(), len); assert read > 0 && read <= buffer.remaining();
*** 173,198 **** } @Override public int read() throws IOException { ByteBuffer buffer; ! if ((buffer = current()) == LAST) return -1; return buffer.get() & 0xFF; } @Override public void onSubscribe(Flow.Subscription s) { this.subscription = s; ! s.request(Math.max(2, buffers.remainingCapacity() + 1)); } @Override ! public synchronized void onNext(ByteBuffer t) { try { ! if (DEBUG) System.err.println("next buffer received"); ! buffers.put(t); ! if (DEBUG) System.err.println("buffered offered"); } catch (Exception ex) { failed = ex; try { close(); } catch (IOException ex1) { --- 188,222 ---- } @Override public int read() throws IOException { ByteBuffer buffer; ! if ((buffer = current()) == LAST_BUFFER) return -1; return buffer.get() & 0xFF; } @Override public void onSubscribe(Flow.Subscription s) { + if (this.subscription != null) { + s.cancel(); + return; + } this.subscription = s; ! assert buffers.remainingCapacity() > 1; // should at least be 2 ! if (DEBUG) err.println("onSubscribe: requesting " ! + Math.max(1, buffers.remainingCapacity() - 1)); ! s.request(Math.max(1, buffers.remainingCapacity() - 1)); } @Override ! public void onNext(List<ByteBuffer> t) { try { ! if (DEBUG) err.println("next item received"); ! if (!buffers.offer(t)) { ! throw new IllegalStateException("queue is full"); ! } ! if (DEBUG) err.println("item offered"); } catch (Exception ex) { failed = ex; try { close(); } catch (IOException ex1) {
*** 201,229 **** } } @Override public void onError(Throwable thrwbl) { failed = thrwbl; } @Override ! public synchronized void onComplete() { subscription = null; ! onNext(LAST); } @Override public void close() throws IOException { synchronized (this) { closed = true; Flow.Subscription s = subscription; if (s != null) { s.cancel(); } - subscription = null; - } super.close(); } } } --- 225,255 ---- } } @Override public void onError(Throwable thrwbl) { + subscription = null; failed = thrwbl; } @Override ! public void onComplete() { subscription = null; ! onNext(LAST_LIST); } @Override public void close() throws IOException { synchronized (this) { + if (closed) return; closed = true; + } Flow.Subscription s = subscription; + subscription = null; if (s != null) { s.cancel(); } super.close(); } } }
*** 272,283 **** // // System.out.println( // client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body()); CompletableFuture<HttpResponse<InputStream>> handle = ! client.sendAsync(request, new HttpInputStreamHandler()); ! if (DEBUG) System.err.println("Request sent"); HttpResponse<InputStream> pending = handle.get(); // At this point, the response headers have been received, but the // response body may not have arrived yet. This comes from --- 298,309 ---- // // System.out.println( // client.sendAsync(request, HttpResponse.BodyHandler.asString()).get().body()); CompletableFuture<HttpResponse<InputStream>> handle = ! client.sendAsync(request, new HttpInputStreamHandler(3)); ! if (DEBUG) err.println("Request sent"); HttpResponse<InputStream> pending = handle.get(); // At this point, the response headers have been received, but the // response body may not have arrived yet. This comes from
*** 299,310 **** // charset.get() will throw NPE if the content is not textual. Reader r = new InputStreamReader(is, charset.get())) { char[] buff = new char[32]; int off=0, n=0; ! if (DEBUG) System.err.println("Start receiving response body"); ! if (DEBUG) System.err.println("Charset: " + charset.get()); // Start consuming the InputStream as the data arrives. // Will block until there is something to read... while ((n = r.read(buff, off, buff.length - off)) > 0) { assert (buff.length - off) > 0; --- 325,336 ---- // charset.get() will throw NPE if the content is not textual. Reader r = new InputStreamReader(is, charset.get())) { char[] buff = new char[32]; int off=0, n=0; ! if (DEBUG) err.println("Start receiving response body"); ! if (DEBUG) err.println("Charset: " + charset.get()); // Start consuming the InputStream as the data arrives. // Will block until there is something to read... while ((n = r.read(buff, off, buff.length - off)) > 0) { assert (buff.length - off) > 0;
< prev index next >