30 import java.lang.System.Logger.Level; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.FileChannel; 33 import java.nio.file.OpenOption; 34 import java.nio.file.Path; 35 import java.security.AccessControlContext; 36 import java.security.AccessController; 37 import java.security.PrivilegedActionException; 38 import java.security.PrivilegedExceptionAction; 39 import java.util.ArrayList; 40 import java.util.Iterator; 41 import java.util.List; 42 import java.util.Objects; 43 import java.util.Optional; 44 import java.util.concurrent.ArrayBlockingQueue; 45 import java.util.concurrent.BlockingQueue; 46 import java.util.concurrent.CompletableFuture; 47 import java.util.concurrent.CompletionStage; 48 import java.util.concurrent.ConcurrentHashMap; 49 import java.util.concurrent.Flow; 50 import java.util.concurrent.atomic.AtomicBoolean; 51 import java.util.function.Consumer; 52 import java.util.function.Function; 53 import jdk.incubator.http.internal.common.MinimalFuture; 54 import jdk.incubator.http.internal.common.Utils; 55 56 class ResponseSubscribers { 57 58 static class ConsumerSubscriber implements HttpResponse.BodySubscriber<Void> { 59 private final Consumer<Optional<byte[]>> consumer; 60 private Flow.Subscription subscription; 61 private final CompletableFuture<Void> result = new MinimalFuture<>(); 62 private final AtomicBoolean subscribed = new AtomicBoolean(); 63 64 ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) { 65 this.consumer = consumer; 66 } 67 68 @Override 69 public CompletionStage<Void> getBody() { 538 public void onNext(List<ByteBuffer> items) { 539 Objects.requireNonNull(items); 540 } 541 542 @Override 543 public void onError(Throwable throwable) { 544 cf.completeExceptionally(throwable); 545 } 546 547 @Override 548 public void onComplete() { 549 if (result.isPresent()) { 550 cf.complete(result.get()); 551 } else { 552 cf.complete(null); 553 } 554 } 555 556 @Override 557 public CompletionStage<T> getBody() { 558 return cf; 559 } 560 } 561 } | 30 import java.lang.System.Logger.Level; 31 import java.nio.ByteBuffer; 32 import java.nio.channels.FileChannel; 33 import java.nio.file.OpenOption; 34 import java.nio.file.Path; 35 import java.security.AccessControlContext; 36 import java.security.AccessController; 37 import java.security.PrivilegedActionException; 38 import java.security.PrivilegedExceptionAction; 39 import java.util.ArrayList; 40 import java.util.Iterator; 41 import java.util.List; 42 import java.util.Objects; 43 import java.util.Optional; 44 import java.util.concurrent.ArrayBlockingQueue; 45 import java.util.concurrent.BlockingQueue; 46 import java.util.concurrent.CompletableFuture; 47 import java.util.concurrent.CompletionStage; 48 import java.util.concurrent.ConcurrentHashMap; 49 import java.util.concurrent.Flow; 50 import java.util.concurrent.Flow.Subscriber; 51 import java.util.concurrent.Flow.Subscription; 52 import java.util.concurrent.atomic.AtomicBoolean; 53 import java.util.function.Consumer; 54 import java.util.function.Function; 55 import jdk.incubator.http.internal.common.MinimalFuture; 56 import jdk.incubator.http.internal.common.Utils; 57 58 class ResponseSubscribers { 59 60 static class ConsumerSubscriber implements HttpResponse.BodySubscriber<Void> { 61 private final Consumer<Optional<byte[]>> consumer; 62 private Flow.Subscription subscription; 63 private final CompletableFuture<Void> result = new MinimalFuture<>(); 64 private final AtomicBoolean subscribed = new AtomicBoolean(); 65 66 ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) { 67 this.consumer = consumer; 68 } 69 70 @Override 71 public CompletionStage<Void> getBody() { 540 public void onNext(List<ByteBuffer> items) { 541 Objects.requireNonNull(items); 542 } 543 544 @Override 545 public void onError(Throwable throwable) { 546 cf.completeExceptionally(throwable); 547 } 548 549 @Override 550 public void onComplete() { 551 if (result.isPresent()) { 552 cf.complete(result.get()); 553 } else { 554 cf.complete(null); 555 } 556 } 557 558 @Override 559 public CompletionStage<T> getBody() { 560 return cf; 561 } 562 } 563 564 /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */ 565 static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R> 566 implements HttpResponse.BodySubscriber<R> 567 { 568 private final CompletableFuture<R> cf = new MinimalFuture<>(); 569 private final S subscriber; 570 private final Function<S,R> finisher; 571 572 SubscriberAdapter(S subscriber, Function<S,R> finisher) { 573 this.subscriber = Objects.requireNonNull(subscriber); 574 this.finisher = Objects.requireNonNull(finisher); 575 } 576 577 @Override 578 public void onSubscribe(Subscription subscription) { 579 subscriber.onSubscribe(subscription); 580 } 581 582 @Override 583 public void onNext(List<ByteBuffer> item) { 584 subscriber.onNext(item); 585 } 586 587 @Override 588 public void onError(Throwable throwable) { 589 try { 590 subscriber.onError(throwable); 591 } finally { 592 cf.completeExceptionally(throwable); 593 } 594 } 595 596 @Override 597 public void onComplete() { 598 try { 599 subscriber.onComplete(); 600 } finally { 601 cf.complete(finisher.apply(subscriber)); 602 } 603 } 604 605 @Override 606 public CompletionStage<R> getBody() { 607 return cf; 608 } 609 } 610 } |