< prev index next >

src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java

Print this page
8193174: SubmissionPublisher invokes the Subscriber's onComplete before all of its submitted items have been published
Reviewed-by: martin, psandoz, chegar

@@ -1250,26 +1250,28 @@
                     }
                     else if ((taken = takeItems(s, d, h)) > 0) {
                         head = h += taken;
                         d = subtractDemand(taken);
                     }
-                    else if ((empty = (t == h)) && (c & COMPLETE) != 0) {
-                        closeOnComplete(s);          // end of stream
-                        break;
-                    }
                     else if ((d = demand) == 0L && (c & REQS) != 0)
                         weakCasCtl(c, c & ~REQS);    // exhausted demand
                     else if (d != 0L && (c & REQS) == 0)
                         weakCasCtl(c, c | REQS);     // new demand
-                    else if (t == (t = tail) && (empty || d == 0L)) {
+                    else if (t == (t = tail)) {      // stability check
+                        if ((empty = (t == h)) && (c & COMPLETE) != 0) {
+                            closeOnComplete(s);      // end of stream
+                            break;
+                        }
+                        else if (empty || d == 0L) {
                         int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
                         if (weakCasCtl(c, c & ~bit) && bit == RUN)
                             break;                   // un-keep-alive or exit
                     }
                 }
             }
         }
+        }
 
         /**
          * Consumes some items until unavailable or bound or error.
          *
          * @param s subscriber
< prev index next >