< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseProcessors.java
Print this page
@@ -70,14 +70,16 @@
this.subscription = subscription;
subscription.request(1);
}
@Override
- public void onNext(ByteBuffer item) {
+ public void onNext(List<ByteBuffer> items) {
+ for (ByteBuffer item : items) {
byte[] buf = new byte[item.remaining()];
item.get(buf);
consumer.accept(Optional.of(buf));
+ }
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
@@ -118,13 +120,15 @@
}
subscription.request(1);
}
@Override
- public void onNext(ByteBuffer item) {
+ public void onNext(List<ByteBuffer> items) {
try {
+ for (ByteBuffer item : items) {
out.write(item);
+ }
} catch (IOException ex) {
Utils.close(out);
subscription.cancel();
result.completeExceptionally(ex);
}
@@ -170,16 +174,16 @@
// We can handle whatever you've got
subscription.request(Long.MAX_VALUE);
}
@Override
- public void onNext(ByteBuffer item) {
+ public void onNext(List<ByteBuffer> items) {
// incoming buffers are allocated by http client internally,
// and won't be used anywhere except this place.
// So it's free simply to store them for further processing.
- if(item.hasRemaining()) {
- received.add(item);
+ if(Utils.remaining(items) > 0) {
+ received.addAll(items);
}
}
@Override
public void onError(Throwable throwable) {
@@ -302,14 +306,16 @@
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
@Override
- public void onNext(ByteBuffer item) {
+ public void onNext(List<ByteBuffer> items) {
// TODO: check whether this should consume the buffer, as in:
+ for (ByteBuffer item : items) {
item.position(item.limit());
}
+ }
@Override
public void onError(Throwable throwable) {
cf.completeExceptionally(throwable);
}
< prev index next >