< prev index next >
src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java
Print this page
8193174: SubmissionPublisher invokes the Subscriber's onComplete before all of its submitted items have been published
Reviewed-by: martin, psandoz, chegar
*** 1250,1275 ****
}
else if ((taken = takeItems(s, d, h)) > 0) {
head = h += taken;
d = subtractDemand(taken);
}
- else if ((empty = (t == h)) && (c & COMPLETE) != 0) {
- closeOnComplete(s); // end of stream
- break;
- }
else if ((d = demand) == 0L && (c & REQS) != 0)
weakCasCtl(c, c & ~REQS); // exhausted demand
else if (d != 0L && (c & REQS) == 0)
weakCasCtl(c, c | REQS); // new demand
! else if (t == (t = tail) && (empty || d == 0L)) {
int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
if (weakCasCtl(c, c & ~bit) && bit == RUN)
break; // un-keep-alive or exit
}
}
}
}
/**
* Consumes some items until unavailable or bound or error.
*
* @param s subscriber
--- 1250,1277 ----
}
else if ((taken = takeItems(s, d, h)) > 0) {
head = h += taken;
d = subtractDemand(taken);
}
else if ((d = demand) == 0L && (c & REQS) != 0)
weakCasCtl(c, c & ~REQS); // exhausted demand
else if (d != 0L && (c & REQS) == 0)
weakCasCtl(c, c | REQS); // new demand
! else if (t == (t = tail)) { // stability check
! if ((empty = (t == h)) && (c & COMPLETE) != 0) {
! closeOnComplete(s); // end of stream
! break;
! }
! else if (empty || d == 0L) {
int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
if (weakCasCtl(c, c & ~bit) && bit == RUN)
break; // un-keep-alive or exit
}
}
}
}
+ }
/**
* Consumes some items until unavailable or bound or error.
*
* @param s subscriber
< prev index next >