< prev index next >
src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java
Print this page
8193174: SubmissionPublisher invokes the Subscriber's onComplete before all of its submitted items have been published
Reviewed-by: martin, psandoz, chegar
@@ -1250,26 +1250,28 @@
}
else if ((taken = takeItems(s, d, h)) > 0) {
head = h += taken;
d = subtractDemand(taken);
}
- else if ((empty = (t == h)) && (c & COMPLETE) != 0) {
- closeOnComplete(s); // end of stream
- break;
- }
else if ((d = demand) == 0L && (c & REQS) != 0)
weakCasCtl(c, c & ~REQS); // exhausted demand
else if (d != 0L && (c & REQS) == 0)
weakCasCtl(c, c | REQS); // new demand
- else if (t == (t = tail) && (empty || d == 0L)) {
+ else if (t == (t = tail)) { // stability check
+ if ((empty = (t == h)) && (c & COMPLETE) != 0) {
+ closeOnComplete(s); // end of stream
+ break;
+ }
+ else if (empty || d == 0L) {
int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
if (weakCasCtl(c, c & ~bit) && bit == RUN)
break; // un-keep-alive or exit
}
}
}
}
+ }
/**
* Consumes some items until unavailable or bound or error.
*
* @param s subscriber
< prev index next >