< prev index next >

src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PullPublisher.java

Print this page

        

@@ -1,7 +1,7 @@
 /*
- * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 only, as
  * published by the Free Software Foundation.  Oracle designates this

@@ -25,65 +25,114 @@
 
 package jdk.incubator.http;
 
 import java.util.Iterator;
 import java.util.concurrent.Flow;
+import jdk.incubator.http.internal.common.Demand;
+import jdk.incubator.http.internal.common.SequentialScheduler;
 
 /**
- * A Publisher that is expected to run in same thread as subscriber.
- * Items are obtained from Iterable. Each new subscription gets a new Iterator.
+ * A Publisher that publishes items obtained from the given Iterable. Each new
+ * subscription gets a new Iterator.
  */
 class PullPublisher<T> implements Flow.Publisher<T> {
 
+    // Exactly one of `iterable` and `throwable` is expected to be non-null.
+    // `throwable` is non-null when the creator of this PullPublisher
+    // encountered an error while subscribing the subscriber, before that
+    // subscribe call had completed.
     private final Iterable<T> iterable;
+    private final Throwable throwable;
 
-    PullPublisher(Iterable<T> iterable) {
+    PullPublisher(Iterable<T> iterable, Throwable throwable) {
         this.iterable = iterable;
+        this.throwable = throwable;
+    }
+
+    PullPublisher(Iterable<T> iterable) {
+        this(iterable, null);
     }
 
     @Override
     public void subscribe(Flow.Subscriber<? super T> subscriber) {
-        subscriber.onSubscribe(new Subscription(subscriber, iterable.iterator()));
+        Subscription sub;
+        if (throwable != null) {
+            assert iterable == null : "non-null iterable: " + iterable;
+            sub = new Subscription(subscriber, null, throwable);
+        } else {
+            assert throwable == null : "non-null exception: " + throwable;
+            sub = new Subscription(subscriber, iterable.iterator(), null);
+        }
+        subscriber.onSubscribe(sub);
+
+        if (throwable != null) {
+            sub.pullScheduler.runOrSchedule();
+        }
     }
 
     private class Subscription implements Flow.Subscription {
 
         private final Flow.Subscriber<? super T> subscriber;
         private final Iterator<T> iter;
-        private boolean done = false;
-        private long demand = 0;
-        private int recursion = 0;
-
-        Subscription(Flow.Subscriber<? super T> subscriber, Iterator<T> iter) {
+        private volatile boolean completed;
+        private volatile boolean cancelled;
+        private volatile Throwable error;
+        final SequentialScheduler pullScheduler = new SequentialScheduler(new PullTask());
+        private final Demand demand = new Demand();
+
+        Subscription(Flow.Subscriber<? super T> subscriber,
+                     Iterator<T> iter,
+                     Throwable throwable) {
             this.subscriber = subscriber;
             this.iter = iter;
+            this.error = throwable;
         }
 
+        final class PullTask extends SequentialScheduler.CompleteRestartableTask {
         @Override
-        public void request(long n) {
-            if (done) {
-                subscriber.onError(new IllegalArgumentException("request(" + n + ")"));
-            }
-            demand += n;
-            recursion ++;
-            if (recursion > 1) {
+            protected void run() {
+                if (completed || cancelled) {
                 return;
             }
-            while (demand > 0) {
-                done = !iter.hasNext();
-                if (done) {
-                    subscriber.onComplete();
-                    recursion --;
+
+                Throwable t = error;
+                if (t != null) {
+                    completed = true;
+                    pullScheduler.stop();
+                    subscriber.onError(t);
                     return;
                 }
+
+                while (demand.tryDecrement() && !cancelled) {
+                    if (!iter.hasNext()) {
+                        break;
+                    } else {
                 subscriber.onNext(iter.next());
-                demand --;
+                    }
+                }
+                if (!iter.hasNext() && !cancelled) {
+                    completed = true;
+                    pullScheduler.stop();
+                    subscriber.onComplete();
+                }
             }
         }
 
         @Override
-        public void cancel() {
-            done = true;
+        public void request(long n) {
+            if (cancelled)
+                return;  // no-op
+
+            if (n <= 0) {
+                error = new IllegalArgumentException("illegal non-positive request:" + n);
+            } else {
+                demand.increase(n);
+            }
+            pullScheduler.runOrSchedule();
         }
 
+        @Override
+        public void cancel() {
+            cancelled = true;
+        }
     }
 }
< prev index next >