< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseProcessors.java

Print this page

        

*** 70,83 ****
              this.subscription = subscription;
              subscription.request(1);
          }

          @Override
!         public void onNext(ByteBuffer item) {
              byte[] buf = new byte[item.remaining()];
              item.get(buf);
              consumer.accept(Optional.of(buf));
              subscription.request(1);
          }

          @Override
          public void onError(Throwable throwable) {
--- 70,85 ----
              this.subscription = subscription;
              subscription.request(1);
          }

          @Override
!         public void onNext(List<ByteBuffer> items) {
!             for (ByteBuffer item : items) {
                  byte[] buf = new byte[item.remaining()];
                  item.get(buf);
                  consumer.accept(Optional.of(buf));
+             }
              subscription.request(1);
          }

          @Override
          public void onError(Throwable throwable) {
*** 118,130 ****
              }
              subscription.request(1);
          }

          @Override
!         public void onNext(ByteBuffer item) {
              try {
                  out.write(item);
              } catch (IOException ex) {
                  Utils.close(out);
                  subscription.cancel();
                  result.completeExceptionally(ex);
              }
--- 120,134 ----
              }
              subscription.request(1);
          }

          @Override
!         public void onNext(List<ByteBuffer> items) {
              try {
+                 for (ByteBuffer item : items) {
                      out.write(item);
+                 }
              } catch (IOException ex) {
                  Utils.close(out);
                  subscription.cancel();
                  result.completeExceptionally(ex);
              }
*** 170,185 ****
              // We can handle whatever you've got
              subscription.request(Long.MAX_VALUE);
          }

          @Override
!         public void onNext(ByteBuffer item) {
              // incoming buffers are allocated by http client internally,
              // and won't be used anywhere except this place.
              // So it's free simply to store them for further processing.
!             if(item.hasRemaining()) {
!                 received.add(item);
              }
          }

          @Override
          public void onError(Throwable throwable) {
--- 174,189 ----
              // We can handle whatever you've got
              subscription.request(Long.MAX_VALUE);
          }

          @Override
!         public void onNext(List<ByteBuffer> items) {
              // incoming buffers are allocated by http client internally,
              // and won't be used anywhere except this place.
              // So it's free simply to store them for further processing.
!             if(Utils.remaining(items) > 0) {
!                 received.addAll(items);
              }
          }

          @Override
          public void onError(Throwable throwable) {
*** 302,315 ****
              this.subscription = subscription;
              subscription.request(Long.MAX_VALUE);
          }

          @Override
!         public void onNext(ByteBuffer item) {
              // TODO: check whether this should consume the buffer, as in:
              item.position(item.limit());
          }

          @Override
          public void onError(Throwable throwable) {
              cf.completeExceptionally(throwable);
          }
--- 306,321 ----
              this.subscription = subscription;
              subscription.request(Long.MAX_VALUE);
          }

          @Override
!         public void onNext(List<ByteBuffer> items) {
              // TODO: check whether this should consume the buffer, as in:
+             for (ByteBuffer item : items) {
                  item.position(item.limit());
              }
+         }

          @Override
          public void onError(Throwable throwable) {
              cf.completeExceptionally(throwable);
          }
< prev index next >