305 + "w.onNext current: " + current;
306 assert subscription != null : dbgString()
307 + "w.onNext: subscription is null";
308 current = bufs;
309 tryFlushCurrent(client.isSelectorThread()); // may be in selector thread
310 // For instance in HTTP/2, a received SETTINGS frame might trigger
311 // the sending of a SETTINGS frame in turn which might cause
312 // onNext to be called from within the same selector thread that the
313 // original SETTINGS frames arrived on. If rs is the read-subscriber
314 // and ws is the write-subscriber then the following can occur:
315 // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write
316 // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent
317 debugState("leaving w.onNext");
318 }
319
320 // Don't use a SequentialScheduler here: rely on onNext() being invoked
321 // sequentially, and not being invoked if there is no demand, request(1).
322 // onNext is usually called from within a user / executor thread.
323 // Initial writing will be performed in that thread. If for some reason,
324 // not all the data can be written, a writeEvent will be registered, and
325 // writing will resume in the selector manager thread when the
326 // writeEvent is fired.
327 //
328 // If this method is invoked in the selector manager thread (because of
329 // a writeEvent), then the executor will be used to invoke request(1),
330 // ensuring that onNext() won't be invoked from within the selector
331 // thread. If not in the selector manager thread, then request(1) is
332 // invoked directly.
333 void tryFlushCurrent(boolean inSelectorThread) {
334 List<ByteBuffer> bufs = current;
335 if (bufs == null) return;
336 try {
337 assert inSelectorThread == client.isSelectorThread() :
338 "should " + (inSelectorThread ? "" : "not ")
339 + " be in the selector thread";
340 long remaining = Utils.remaining(bufs);
341 if (debug.on()) debug.log("trying to write: %d", remaining);
342 long written = writeAvailable(bufs);
343 if (debug.on()) debug.log("wrote: %d", written);
344 assert written >= 0 : "negative number of bytes written:" + written;
345 assert written <= remaining;
|
305 + "w.onNext current: " + current;
306 assert subscription != null : dbgString()
307 + "w.onNext: subscription is null";
308 current = bufs;
309 tryFlushCurrent(client.isSelectorThread()); // may be in selector thread
310 // For instance in HTTP/2, a received SETTINGS frame might trigger
311 // the sending of a SETTINGS frame in turn which might cause
312 // onNext to be called from within the same selector thread that the
313 // original SETTINGS frames arrived on. If rs is the read-subscriber
314 // and ws is the write-subscriber then the following can occur:
315 // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write
316 // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent
317 debugState("leaving w.onNext");
318 }
319
320 // Don't use a SequentialScheduler here: rely on onNext() being invoked
321 // sequentially, and not being invoked if there is no demand, request(1).
322 // onNext is usually called from within a user / executor thread.
323 // Initial writing will be performed in that thread. If for some reason,
324 // not all the data can be written, a writeEvent will be registered, and
325 // writing will resume in the selector manager thread when the
326 // writeEvent is fired.
327 //
328 // If this method is invoked in the selector manager thread (because of
329 // a writeEvent), then the executor will be used to invoke request(1),
330 // ensuring that onNext() won't be invoked from within the selector
331 // thread. If not in the selector manager thread, then request(1) is
332 // invoked directly.
333 void tryFlushCurrent(boolean inSelectorThread) {
334 List<ByteBuffer> bufs = current;
335 if (bufs == null) return;
336 try {
337 assert inSelectorThread == client.isSelectorThread() :
338 "should " + (inSelectorThread ? "" : "not ")
339 + " be in the selector thread";
340 long remaining = Utils.remaining(bufs);
341 if (debug.on()) debug.log("trying to write: %d", remaining);
342 long written = writeAvailable(bufs);
343 if (debug.on()) debug.log("wrote: %d", written);
344 assert written >= 0 : "negative number of bytes written:" + written;
345 assert written <= remaining;
|