--- old/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PullPublisher.java 2017-11-30 04:04:07.571803478 -0800 +++ new/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PullPublisher.java 2017-11-30 04:04:07.376786429 -0800 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -27,63 +27,112 @@ import java.util.Iterator; import java.util.concurrent.Flow; +import jdk.incubator.http.internal.common.Demand; +import jdk.incubator.http.internal.common.SequentialScheduler; /** - * A Publisher that is expected to run in same thread as subscriber. - * Items are obtained from Iterable. Each new subscription gets a new Iterator. + * A Publisher that publishes items obtained from the given Iterable. Each new + * subscription gets a new Iterator. */ class PullPublisher implements Flow.Publisher { + // Only one of `iterable` and `throwable` can be non-null. throwable is + // non-null when an error has been encountered, by the creator of + // PullPublisher, while subscribing the subscriber, but before subscribe has + // completed. private final Iterable iterable; + private final Throwable throwable; - PullPublisher(Iterable iterable) { + PullPublisher(Iterable iterable, Throwable throwable) { this.iterable = iterable; + this.throwable = throwable; + } + + PullPublisher(Iterable iterable) { + this(iterable, null); } @Override public void subscribe(Flow.Subscriber subscriber) { - subscriber.onSubscribe(new Subscription(subscriber, iterable.iterator())); + Subscription sub; + if (throwable != null) { + assert iterable == null : "non-null iterable: " + iterable; + sub = new Subscription(subscriber, null, throwable); + } else { + assert throwable == null : "non-null exception: " + throwable; + sub = new Subscription(subscriber, iterable.iterator(), null); + } + subscriber.onSubscribe(sub); + + if (throwable != null) { + sub.pullScheduler.runOrSchedule(); + } } private class Subscription implements Flow.Subscription { private final Flow.Subscriber subscriber; private final Iterator iter; - private boolean done = false; - private long demand = 0; - private int recursion = 0; - - Subscription(Flow.Subscriber subscriber, Iterator iter) { + private volatile boolean completed; + private volatile boolean cancelled; + private volatile Throwable error; + final SequentialScheduler pullScheduler = new SequentialScheduler(new PullTask()); + private final Demand demand = new Demand(); + + Subscription(Flow.Subscriber subscriber, + Iterator iter, + Throwable throwable) { this.subscriber = subscriber; this.iter = iter; + this.error = throwable; } - @Override - public void request(long n) { - if (done) { - subscriber.onError(new IllegalArgumentException("request(" + n + ")")); - } - demand += n; - recursion ++; - if (recursion > 1) { - return; - } - while (demand > 0) { - done = !iter.hasNext(); - if (done) { - subscriber.onComplete(); - recursion --; + final class PullTask extends SequentialScheduler.CompleteRestartableTask { + @Override + protected void run() { + if (completed || cancelled) { return; } - subscriber.onNext(iter.next()); - demand --; + + Throwable t = error; + if (t != null) { + completed = true; + pullScheduler.stop(); + subscriber.onError(t); + return; + } + + while (demand.tryDecrement() && !cancelled) { + if (!iter.hasNext()) { + break; + } else { + subscriber.onNext(iter.next()); + } + } + if (!iter.hasNext() && !cancelled) { + completed = true; + pullScheduler.stop(); + subscriber.onComplete(); + } } } @Override - public void cancel() { - done = true; + public void request(long n) { + if (cancelled) + return; // no-op + + if (n <= 0) { + error = new IllegalArgumentException("illegal non-positive request:" + n); + } else { + demand.increase(n); + } + pullScheduler.runOrSchedule(); } + @Override + public void cancel() { + cancelled = true; + } } }