< prev index next >

src/java.httpclient/share/classes/java/net/http/AsyncSSLDelegate.java

Print this page
rev 15335 : Async Queues

*** 85,126 **** */ class AsyncSSLDelegate implements Closeable, AsyncConnection { // outgoing buffers put in this queue first and may remain here // while SSL handshaking happening. ! final Queue<ByteBuffer> appOutputQ; // queue of wrapped ByteBuffers waiting to be sent on socket channel //final Queue<ByteBuffer> channelOutputQ; // Bytes read into this queue before being unwrapped. Backup on this // Q should only happen when the engine is stalled due to delegated tasks ! final Queue<ByteBuffer> channelInputQ; // input occurs through the read() method which is expected to be called // when the selector signals some data is waiting to be read. All incoming // handshake data is handled in this method, which means some calls to // read() may return zero bytes of user data. This is not a sign of spinning, // just that the handshake mechanics are being executed. final SSLEngine engine; final SSLParameters sslParameters; //final SocketChannel chan; ! final HttpConnection lowerOutput; final HttpClientImpl client; final ExecutorService executor; final BufferHandler bufPool; Consumer<ByteBuffer> receiver; Consumer<Throwable> errorHandler; - // Locks. - final Object reader = new Object(); - final Object writer = new Object(); // synchronizing handshake state final Object handshaker = new Object(); - // flag set when reader or writer is blocked waiting for handshake to finish - boolean writerBlocked; - boolean readerBlocked; // some thread is currently doing the handshake boolean handshaking; // alpn[] may be null. upcall is callback which receives incoming decoded bytes off socket --- 85,120 ---- */ class AsyncSSLDelegate implements Closeable, AsyncConnection { // outgoing buffers put in this queue first and may remain here // while SSL handshaking happening. ! final AsyncWriteQueue<ByteBuffer[]> appOutputQ; // queue of wrapped ByteBuffers waiting to be sent on socket channel //final Queue<ByteBuffer> channelOutputQ; // Bytes read into this queue before being unwrapped. Backup on this // Q should only happen when the engine is stalled due to delegated tasks ! final AsyncReadQueue<ByteBuffer> channelInputQ; // input occurs through the read() method which is expected to be called // when the selector signals some data is waiting to be read. All incoming // handshake data is handled in this method, which means some calls to // read() may return zero bytes of user data. This is not a sign of spinning, // just that the handshake mechanics are being executed. final SSLEngine engine; final SSLParameters sslParameters; //final SocketChannel chan; ! final AsyncConnection lowerOutput; final HttpClientImpl client; final ExecutorService executor; final BufferHandler bufPool; Consumer<ByteBuffer> receiver; Consumer<Throwable> errorHandler; // synchronizing handshake state final Object handshaker = new Object(); // some thread is currently doing the handshake boolean handshaking; // alpn[] may be null. upcall is callback which receives incoming decoded bytes off socket
*** 128,139 **** AsyncSSLDelegate(HttpConnection lowerOutput, HttpClientImpl client, String[] alpn) { SSLContext context = client.sslContext(); executor = client.executorService(); bufPool = client; ! appOutputQ = new Queue<>(); ! appOutputQ.registerPutCallback(this::upperWrite); //channelOutputQ = new Queue<>(); //channelOutputQ.registerPutCallback(this::lowerWrite); engine = context.createSSLEngine(); engine.setUseClientMode(true); SSLParameters sslp = client.sslParameters().orElse(null); --- 122,132 ---- AsyncSSLDelegate(HttpConnection lowerOutput, HttpClientImpl client, String[] alpn) { SSLContext context = client.sslContext(); executor = client.executorService(); bufPool = client; ! appOutputQ = new AsyncWriteQueue<>(this::upperWrite); //channelOutputQ = new Queue<>(); //channelOutputQ.registerPutCallback(this::lowerWrite); engine = context.createSSLEngine(); engine.setUseClientMode(true); SSLParameters sslp = client.sslParameters().orElse(null);
*** 146,244 **** if (alpn != null) { sslParameters.setApplicationProtocols(alpn); } logParams(sslParameters); engine.setSSLParameters(sslParameters); ! this.lowerOutput = lowerOutput; this.client = client; ! this.channelInputQ = new Queue<>(); ! this.channelInputQ.registerPutCallback(this::upperRead); } /** * Put buffers to appOutputQ, and call upperWrite() if q was empty. * * @param src */ ! public void write(ByteBuffer[] src) throws IOException { ! appOutputQ.putAll(src); } ! public void write(ByteBuffer buf) throws IOException { ! ByteBuffer[] a = new ByteBuffer[1]; ! a[0] = buf; ! write(a); } @Override public void close() { ! Utils.close(appOutputQ, channelInputQ, lowerOutput); } ! /** ! * Attempts to wrap buffers from appOutputQ and place them on the ! * channelOutputQ for writing. If handshaking is happening, then the ! * process stalls and last buffers taken off the appOutputQ are put back ! * into it until handshaking completes. ! * ! * This same method is called to try and resume output after a blocking ! * handshaking operation has completed. ! */ ! private void upperWrite() { try { - EngineResult r = null; - ByteBuffer[] buffers = appOutputQ.pollAll(Utils.EMPTY_BB_ARRAY); int bytes = Utils.remaining(buffers); while (bytes > 0) { ! synchronized (writer) { ! r = wrapBuffers(buffers); int bytesProduced = r.bytesProduced(); int bytesConsumed = r.bytesConsumed(); bytes -= bytesConsumed; if (bytesProduced > 0) { ! // pass destination buffer to channelOutputQ. ! lowerOutput.write(r.destBuffer); } - synchronized (handshaker) { if (r.handshaking()) { ! // handshaking is happening or is needed ! // so we put the buffers back on Q to process again ! // later. It's possible that some may have already ! // been processed, which is ok. ! appOutputQ.pushbackAll(buffers); ! writerBlocked = true; ! if (!handshaking()) { ! // execute the handshake in another thread. ! // This method will be called again to resume sending ! // later doHandshake(r); - } return; } } - } - } returnBuffers(buffers); } catch (Throwable t) { close(); errorHandler.accept(t); } } private void doHandshake(EngineResult r) { handshaking = true; ! channelInputQ.registerPutCallback(null); executor.execute(() -> { try { doHandshakeImpl(r); ! channelInputQ.registerPutCallback(this::upperRead); } catch (Throwable t) { close(); errorHandler.accept(t); } }); } private void returnBuffers(ByteBuffer[] bufs) { for (ByteBuffer buf : bufs) client.returnBuffer(buf); } --- 139,226 ---- if (alpn != null) { sslParameters.setApplicationProtocols(alpn); } logParams(sslParameters); engine.setSSLParameters(sslParameters); ! this.lowerOutput = (AsyncConnection)lowerOutput; this.client = client; ! this.channelInputQ = new AsyncReadQueue<>(this::upperRead,this::combine); } /** * Put buffers to appOutputQ, and call upperWrite() if q was empty. * * @param src */ ! @Override ! public void writeAsync(ByteBuffer[] src) throws IOException { ! appOutputQ.put(src); ! } ! ! @Override ! public void writeAsyncUnordered(ByteBuffer[] buffers) throws IOException { ! appOutputQ.putFirst(buffers); } ! @Override ! public void flushAsync() { ! if(appOutputQ.flush()) { ! lowerOutput.flushAsync(); ! } } @Override public void close() { ! Utils.close( channelInputQ, (HttpConnection)lowerOutput); } ! private void upperWrite(ByteBuffer[] buffers, Consumer<ByteBuffer[]> setDelayCallback) { try { int bytes = Utils.remaining(buffers); while (bytes > 0) { ! EngineResult r = wrapBuffers(buffers); int bytesProduced = r.bytesProduced(); int bytesConsumed = r.bytesConsumed(); bytes -= bytesConsumed; if (bytesProduced > 0) { ! lowerOutput.writeAsync(new ByteBuffer[]{r.destBuffer}); } if (r.handshaking()) { ! setDelayCallback.accept(buffers); doHandshake(r); return; } } returnBuffers(buffers); } catch (Throwable t) { close(); errorHandler.accept(t); } } private void doHandshake(EngineResult r) { + synchronized (handshaker) { + if (!handshaking()) { + // execute the handshake in another thread. + // This method will be called again to resume sending + // later handshaking = true; ! channelInputQ.setDelayed(null); executor.execute(() -> { try { doHandshakeImpl(r); ! appOutputQ.flushDelayed(); ! lowerOutput.flushAsync(); ! channelInputQ.setDirect(); } catch (Throwable t) { close(); errorHandler.accept(t); } }); } + } + } private void returnBuffers(ByteBuffer[] bufs) { for (ByteBuffer buf : bufs) client.returnBuffer(buf); }
*** 274,302 **** r = handshakeReceiveAndUnWrap(); } if (!r.handshaking()) break; } - boolean dowrite = false; - boolean doread = false; // Handshake is finished. Now resume reading and/or writing synchronized(handshaker) { handshaking = false; - if (writerBlocked) { - writerBlocked = false; - dowrite = true; - } - if (readerBlocked) { - readerBlocked = false; - doread = true; } } - if (dowrite) - upperWrite(); - if (doread) - upperRead(); - } // acknowledge a received CLOSE request from peer void doClosure() throws IOException { //while (!wrapAndSend(emptyArray)) //; --- 256,270 ----
*** 354,364 **** } EngineResult handshakeWrapAndSend() throws IOException { EngineResult r = wrapBuffer(Utils.EMPTY_BYTEBUFFER); if (r.bytesProduced() > 0) { ! lowerOutput.write(r.destBuffer); } return r; } // called during handshaking. It blocks until a complete packet --- 322,333 ---- } EngineResult handshakeWrapAndSend() throws IOException { EngineResult r = wrapBuffer(Utils.EMPTY_BYTEBUFFER); if (r.bytesProduced() > 0) { ! lowerOutput.writeAsync(new ByteBuffer[]{r.destBuffer}); ! lowerOutput.flushAsync(); } return r; } // called during handshaking. It blocks until a complete packet
*** 455,524 **** close(); errorHandler.accept(t); } } ! public void upperRead() { EngineResult r; ! ByteBuffer srcbuf; ! synchronized (reader) { try { ! srcbuf = channelInputQ.poll(); ! if (srcbuf == null) { ! return; ! } ! while (true) { r = unwrapBuffer(srcbuf); switch (r.result.getStatus()) { case BUFFER_UNDERFLOW: // Buffer too small. Need to combine with next buf ! ByteBuffer nextBuf = channelInputQ.poll(); ! if (nextBuf == null) { ! // no data available. push buffer back until more data available ! channelInputQ.pushback(srcbuf); return; - } else { - srcbuf = combine(srcbuf, nextBuf); - } - break; case OK: // check for any handshaking work - synchronized (handshaker) { if (r.handshaking()) { // handshaking is happening or is needed // so we put the buffer back on Q to process again // later. ! channelInputQ.pushback(srcbuf); ! readerBlocked = true; ! if (!handshaking()) { ! // execute the handshake in another thread. ! // This method will be called again to resume sending ! // later doHandshake(r); - } return; } - } ByteBuffer dst = r.destBuffer; if (dst.hasRemaining()) { receiver.accept(dst); } } - if (srcbuf.hasRemaining()) { - continue; - } - srcbuf = channelInputQ.poll(); - if (srcbuf == null) { - return; - } } } catch (Throwable t) { close(); errorHandler.accept(t); } } - } /** * Get a new buffer that is the right size for application buffers. * * @return --- 424,465 ---- close(); errorHandler.accept(t); } } ! public void upperRead(ByteBuffer srcbuf, AsyncReadQueue<ByteBuffer> inputQ) { EngineResult r; ! try { ! while (srcbuf.hasRemaining()) { r = unwrapBuffer(srcbuf); switch (r.result.getStatus()) { case BUFFER_UNDERFLOW: // Buffer too small. Need to combine with next buf ! inputQ.pushback(srcbuf); return; case OK: // check for any handshaking work if (r.handshaking()) { // handshaking is happening or is needed // so we put the buffer back on Q to process again // later. ! inputQ.setDelayed(srcbuf); doHandshake(r); return; } ByteBuffer dst = r.destBuffer; if (dst.hasRemaining()) { receiver.accept(dst); } } } } catch (Throwable t) { close(); errorHandler.accept(t); } } /** * Get a new buffer that is the right size for application buffers. * * @return
< prev index next >