< prev index next >

src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java

Print this page
8193174: SubmissionPublisher invokes the Subscriber's onComplete before all of its submitted items have been published
Reviewed-by: martin, psandoz, chegar


1235 
1236         /**
1237          * Consumer loop, called from ConsumerTask, or indirectly when
1238          * helping during submit.
1239          */
1240         final void consume() {
1241             Subscriber<? super T> s;
1242             if ((s = subscriber) != null) {          // hoist checks
1243                 subscribeOnOpen(s);
1244                 long d = demand;
1245                 for (int h = head, t = tail;;) {
1246                     int c, taken; boolean empty;
1247                     if (((c = ctl) & ERROR) != 0) {
1248                         closeOnError(s, null);
1249                         break;
1250                     }
1251                     else if ((taken = takeItems(s, d, h)) > 0) {
1252                         head = h += taken;
1253                         d = subtractDemand(taken);
1254                     }
1255                     else if ((empty = (t == h)) && (c & COMPLETE) != 0) {
1256                         closeOnComplete(s);          // end of stream
1257                         break;
1258                     }
1259                     else if ((d = demand) == 0L && (c & REQS) != 0)
1260                         weakCasCtl(c, c & ~REQS);    // exhausted demand
1261                     else if (d != 0L && (c & REQS) == 0)
1262                         weakCasCtl(c, c | REQS);     // new demand
1263                     else if (t == (t = tail) && (empty || d == 0L)) {





1264                         int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
1265                         if (weakCasCtl(c, c & ~bit) && bit == RUN)
1266                             break;                   // un-keep-alive or exit

1267                     }
1268                 }
1269             }
1270         }
1271 
1272         /**
1273          * Consumes some items until unavailable or bound or error.
1274          *
1275          * @param s subscriber
1276          * @param d current demand
1277          * @param h current head
1278          * @return number taken
1279          */
1280         final int takeItems(Subscriber<? super T> s, long d, int h) {
1281             Object[] a;
1282             int k = 0, cap;
1283             if ((a = array) != null && (cap = a.length) > 0) {
1284                 int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)
1285                 int n = (d < (long)b) ? (int)d : b;
1286                 for (; k < n; ++h, ++k) {




1235 
1236         /**
1237          * Consumer loop, called from ConsumerTask, or indirectly when
1238          * helping during submit.
1239          */
1240         final void consume() {
1241             Subscriber<? super T> s;
1242             if ((s = subscriber) != null) {          // hoist checks
1243                 subscribeOnOpen(s);
1244                 long d = demand;
1245                 for (int h = head, t = tail;;) {
1246                     int c, taken; boolean empty;
1247                     if (((c = ctl) & ERROR) != 0) {
1248                         closeOnError(s, null);
1249                         break;
1250                     }
1251                     else if ((taken = takeItems(s, d, h)) > 0) {
1252                         head = h += taken;
1253                         d = subtractDemand(taken);
1254                     }




1255                     else if ((d = demand) == 0L && (c & REQS) != 0)
1256                         weakCasCtl(c, c & ~REQS);    // exhausted demand
1257                     else if (d != 0L && (c & REQS) == 0)
1258                         weakCasCtl(c, c | REQS);     // new demand
1259                     else if (t == (t = tail)) {      // stability check
1260                         if ((empty = (t == h)) && (c & COMPLETE) != 0) {
1261                             closeOnComplete(s);      // end of stream
1262                             break;
1263                         }
1264                         else if (empty || d == 0L) {
1265                             int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
1266                             if (weakCasCtl(c, c & ~bit) && bit == RUN)
1267                                 break;               // un-keep-alive or exit
1268                         }
1269                     }
1270                 }
1271             }
1272         }
1273 
1274         /**
1275          * Consumes some items until unavailable or bound or error.
1276          *
1277          * @param s subscriber
1278          * @param d current demand
1279          * @param h current head
1280          * @return number taken
1281          */
1282         final int takeItems(Subscriber<? super T> s, long d, int h) {
1283             Object[] a;
1284             int k = 0, cap;
1285             if ((a = array) != null && (cap = a.length) > 0) {
1286                 int m = cap - 1, b = (m >>> 3) + 1; // min(1, cap/8)
1287                 int n = (d < (long)b) ? (int)d : b;
1288                 for (; k < n; ++h, ++k) {


< prev index next >