< prev index next >
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PullPublisher.java
Print this page
@@ -1,7 +1,7 @@
/*
- * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
@@ -25,65 +25,114 @@
package jdk.incubator.http;
import java.util.Iterator;
import java.util.concurrent.Flow;
+import jdk.incubator.http.internal.common.Demand;
+import jdk.incubator.http.internal.common.SequentialScheduler;
/**
- * A Publisher that is expected to run in same thread as subscriber.
- * Items are obtained from Iterable. Each new subscription gets a new Iterator.
+ * A Publisher that publishes items obtained from the given Iterable. Each new
+ * subscription gets a new Iterator.
*/
class PullPublisher<T> implements Flow.Publisher<T> {
+ // Exactly one of `iterable` and `throwable` is expected to be non-null.
+ // `throwable` is non-null when the creator of this PullPublisher
+ // encountered an error while subscribing the subscriber, before subscribe
+ // had completed; subscribe() then relays that error through onError.
private final Iterable<T> iterable;
+ private final Throwable throwable;
- PullPublisher(Iterable<T> iterable) {
+ PullPublisher(Iterable<T> iterable, Throwable throwable) { // creators pass either an iterable or a throwable; the other is expected to be null
this.iterable = iterable;
+ this.throwable = throwable;
+ }
+
+ PullPublisher(Iterable<T> iterable) { // convenience form for the no-error case
+ this(iterable, null);
}
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
- subscriber.onSubscribe(new Subscription(subscriber, iterable.iterator()));
+ Subscription sub; // error-only when this publisher holds a throwable, iterator-backed otherwise
+ if (throwable != null) { // failed publisher: no iterator is created
+ assert iterable == null : "non-null iterable: " + iterable;
+ sub = new Subscription(subscriber, null, throwable);
+ } else { // normal case: every subscription gets a fresh iterator
+ assert throwable == null : "non-null exception: " + throwable;
+ sub = new Subscription(subscriber, iterable.iterator(), null);
+ }
+ subscriber.onSubscribe(sub); // the subscription is handed over before the pull task is triggered below
+
+ if (throwable != null) { // trigger the pull task now so the stored error can reach onError without waiting for request()
+ sub.pullScheduler.runOrSchedule();
+ }
}
private class Subscription implements Flow.Subscription {
private final Flow.Subscriber<? super T> subscriber;
private final Iterator<T> iter;
- private boolean done = false;
- private long demand = 0;
- private int recursion = 0;
-
- Subscription(Flow.Subscriber<? super T> subscriber, Iterator<T> iter) {
+ private volatile boolean completed;
+ private volatile boolean cancelled;
+ private volatile Throwable error;
+ final SequentialScheduler pullScheduler = new SequentialScheduler(new PullTask());
+ private final Demand demand = new Demand();
+
+ Subscription(Flow.Subscriber<? super T> subscriber,
+ Iterator<T> iter, // null when the subscription only carries an error
+ Throwable throwable) { // null when the subscription is backed by an iterator
this.subscriber = subscriber;
this.iter = iter;
+ this.error = throwable; // a non-null value makes the pull task signal onError instead of emitting items
}
+ final class PullTask extends SequentialScheduler.CompleteRestartableTask { // task wrapped by pullScheduler; the only place that signals onNext/onError/onComplete
@Override
- public void request(long n) {
- if (done) {
- subscriber.onError(new IllegalArgumentException("request(" + n + ")"));
- }
- demand += n;
- recursion ++;
- if (recursion > 1) {
+ protected void run() {
+ if (completed || cancelled) { // nothing is signalled after a terminal signal or after cancel()
return;
}
- while (demand > 0) {
- done = !iter.hasNext();
- if (done) {
- subscriber.onComplete();
- recursion --;
+
+ Throwable t = error; // single read of the volatile field
+ if (t != null) { // errors (from the publisher's creator or an illegal request) are checked before any item is emitted
+ completed = true;
+ pullScheduler.stop(); // NOTE(review): presumably prevents further runs of this task; confirm in SequentialScheduler
+ subscriber.onError(t);
return;
}
+
+ while (demand.tryDecrement() && !cancelled) { // NOTE(review): assumes tryDecrement() consumes one unit of demand and returns false when none is left; confirm in Demand
+ if (!iter.hasNext()) {
+ break;
+ } else {
subscriber.onNext(iter.next());
- demand --;
+ }
+ }
+ if (!iter.hasNext() && !cancelled) { // iterator drained: mark completed so later runs return early, then signal onComplete
+ completed = true;
+ pullScheduler.stop();
+ subscriber.onComplete();
+ }
}
}
@Override
- public void cancel() {
- done = true;
+ public void request(long n) { // records demand (or an error for n <= 0) and triggers the pull task
+ if (cancelled)
+ return; // no-op: requests made after cancel() are ignored
+
+ if (n <= 0) { // non-positive demand is not thrown here; it is stored so the pull task reports it through onError
+ error = new IllegalArgumentException("illegal non-positive request:" + n);
+ } else {
+ demand.increase(n);
+ }
+ pullScheduler.runOrSchedule(); // triggered in both cases: the pull task either delivers the error or emits items
}
+ @Override
+ public void cancel() { // only raises a flag; the scheduler is not stopped here
+ cancelled = true; // volatile; the pull task checks it on entry, in its loop condition, and before signalling completion
+ }
}
}
< prev index next >