< prev index next >
src/java.httpclient/share/classes/java/net/http/AsyncSSLDelegate.java
Print this page
rev 15335 : Async Queues
*** 85,126 ****
*/
class AsyncSSLDelegate implements Closeable, AsyncConnection {
// outgoing buffers put in this queue first and may remain here
// while SSL handshaking is happening.
! final Queue<ByteBuffer> appOutputQ;
// queue of wrapped ByteBuffers waiting to be sent on socket channel
//final Queue<ByteBuffer> channelOutputQ;
// Bytes read into this queue before being unwrapped. Backup on this
// Q should only happen when the engine is stalled due to delegated tasks
! final Queue<ByteBuffer> channelInputQ;
// input occurs through the read() method which is expected to be called
// when the selector signals some data is waiting to be read. All incoming
// handshake data is handled in this method, which means some calls to
// read() may return zero bytes of user data. This is not a sign of spinning,
// just that the handshake mechanics are being executed.
final SSLEngine engine;
final SSLParameters sslParameters;
//final SocketChannel chan;
! final HttpConnection lowerOutput;
final HttpClientImpl client;
final ExecutorService executor;
final BufferHandler bufPool;
Consumer<ByteBuffer> receiver;
Consumer<Throwable> errorHandler;
- // Locks.
- final Object reader = new Object();
- final Object writer = new Object();
// synchronizing handshake state
final Object handshaker = new Object();
- // flag set when reader or writer is blocked waiting for handshake to finish
- boolean writerBlocked;
- boolean readerBlocked;
// some thread is currently doing the handshake
boolean handshaking;
// alpn[] may be null. upcall is callback which receives incoming decoded bytes off socket
--- 85,120 ----
*/
class AsyncSSLDelegate implements Closeable, AsyncConnection {
// outgoing buffers put in this queue first and may remain here
// while SSL handshaking is happening.
! final AsyncWriteQueue<ByteBuffer[]> appOutputQ;
// queue of wrapped ByteBuffers waiting to be sent on socket channel
//final Queue<ByteBuffer> channelOutputQ;
// Bytes read into this queue before being unwrapped. Backup on this
// Q should only happen when the engine is stalled due to delegated tasks
! final AsyncReadQueue<ByteBuffer> channelInputQ;
// input occurs through the read() method which is expected to be called
// when the selector signals some data is waiting to be read. All incoming
// handshake data is handled in this method, which means some calls to
// read() may return zero bytes of user data. This is not a sign of spinning,
// just that the handshake mechanics are being executed.
final SSLEngine engine;
final SSLParameters sslParameters;
//final SocketChannel chan;
! final AsyncConnection lowerOutput;
final HttpClientImpl client;
final ExecutorService executor;
final BufferHandler bufPool;
Consumer<ByteBuffer> receiver;
Consumer<Throwable> errorHandler;
// synchronizing handshake state
final Object handshaker = new Object();
// some thread is currently doing the handshake
boolean handshaking;
// alpn[] may be null. upcall is callback which receives incoming decoded bytes off socket
*** 128,139 ****
AsyncSSLDelegate(HttpConnection lowerOutput, HttpClientImpl client, String[] alpn)
{
SSLContext context = client.sslContext();
executor = client.executorService();
bufPool = client;
! appOutputQ = new Queue<>();
! appOutputQ.registerPutCallback(this::upperWrite);
//channelOutputQ = new Queue<>();
//channelOutputQ.registerPutCallback(this::lowerWrite);
engine = context.createSSLEngine();
engine.setUseClientMode(true);
SSLParameters sslp = client.sslParameters().orElse(null);
--- 122,132 ----
AsyncSSLDelegate(HttpConnection lowerOutput, HttpClientImpl client, String[] alpn)
{
SSLContext context = client.sslContext();
executor = client.executorService();
bufPool = client;
! appOutputQ = new AsyncWriteQueue<>(this::upperWrite);
//channelOutputQ = new Queue<>();
//channelOutputQ.registerPutCallback(this::lowerWrite);
engine = context.createSSLEngine();
engine.setUseClientMode(true);
SSLParameters sslp = client.sslParameters().orElse(null);
*** 146,244 ****
if (alpn != null) {
sslParameters.setApplicationProtocols(alpn);
}
logParams(sslParameters);
engine.setSSLParameters(sslParameters);
! this.lowerOutput = lowerOutput;
this.client = client;
! this.channelInputQ = new Queue<>();
! this.channelInputQ.registerPutCallback(this::upperRead);
}
/**
* Put buffers to appOutputQ, and call upperWrite() if q was empty.
* upperWrite is the put callback registered on appOutputQ by the constructor.
*
* @param src the buffers to append to appOutputQ
* @throws IOException declared on this method; presumably propagated from
*         appOutputQ.putAll - TODO confirm against Queue
*/
! public void write(ByteBuffer[] src) throws IOException {
! appOutputQ.putAll(src);
}
/**
* Single-buffer convenience overload: places buf in a one-element array
* and delegates to write(ByteBuffer[]).
*
* @param buf the buffer to queue for output
* @throws IOException if the array overload throws it
*/
! public void write(ByteBuffer buf) throws IOException {
! ByteBuffer[] a = new ByteBuffer[1];
! a[0] = buf;
! write(a);
}
/**
* Closes this delegate by handing both queues and the lower-level
* connection to Utils.close.
*/
@Override
public void close() {
! Utils.close(appOutputQ, channelInputQ, lowerOutput);
}
! /**
! * Attempts to wrap buffers from appOutputQ and place them on the
! * channelOutputQ for writing. If handshaking is happening, then the
! * process stalls and last buffers taken off the appOutputQ are put back
! * into it until handshaking completes.
! *
! * This same method is called to try and resume output after a blocking
! * handshaking operation has completed.
! */
! private void upperWrite() {
try {
- EngineResult r = null;
// take everything currently queued by the application
- ByteBuffer[] buffers = appOutputQ.pollAll(Utils.EMPTY_BB_ARRAY);
// bytes = application data still waiting to be wrapped
int bytes = Utils.remaining(buffers);
while (bytes > 0) {
! synchronized (writer) {
! r = wrapBuffers(buffers);
int bytesProduced = r.bytesProduced();
int bytesConsumed = r.bytesConsumed();
bytes -= bytesConsumed;
if (bytesProduced > 0) {
! // pass destination buffer to channelOutputQ.
// NOTE(review): the comment above names channelOutputQ, but the
// wrapped data is actually written to lowerOutput.
! lowerOutput.write(r.destBuffer);
}
- synchronized (handshaker) {
if (r.handshaking()) {
! // handshaking is happening or is needed
! // so we put the buffers back on Q to process again
! // later. It's possible that some may have already
! // been processed, which is ok.
! appOutputQ.pushbackAll(buffers);
! writerBlocked = true;
! if (!handshaking()) {
! // execute the handshake in another thread.
! // This method will be called again to resume sending
! // later
doHandshake(r);
- }
return;
}
}
- }
- }
// everything was wrapped: give the source buffers back to the client
returnBuffers(buffers);
} catch (Throwable t) {
// any failure closes the delegate and is reported to errorHandler
close();
errorHandler.accept(t);
}
}
/**
* Marks the handshake as in progress and runs doHandshakeImpl(r) on the
* executor. The put callback on channelInputQ is removed before the task
* is submitted and re-registered (as upperRead) once doHandshakeImpl
* returns normally. If the task throws, the delegate is closed and the
* throwable is passed to errorHandler.
*
* The handshaking flag is set here without taking a lock; the callers
* visible in this listing invoke this method while holding handshaker.
*
* @param r the engine result that indicated handshaking is needed
*/
private void doHandshake(EngineResult r) {
handshaking = true;
// stop dispatching incoming buffers to upperRead while handshaking
! channelInputQ.registerPutCallback(null);
executor.execute(() -> {
try {
doHandshakeImpl(r);
// handshake finished: resume normal read dispatch
! channelInputQ.registerPutCallback(this::upperRead);
} catch (Throwable t) {
close();
errorHandler.accept(t);
}
});
}
/**
* Hands every buffer in bufs back to the client via returnBuffer.
*
* @param bufs the buffers to return
*/
private void returnBuffers(ByteBuffer[] bufs) {
for (ByteBuffer buf : bufs)
client.returnBuffer(buf);
}
--- 139,226 ----
if (alpn != null) {
sslParameters.setApplicationProtocols(alpn);
}
logParams(sslParameters);
engine.setSSLParameters(sslParameters);
! this.lowerOutput = (AsyncConnection)lowerOutput;
this.client = client;
! this.channelInputQ = new AsyncReadQueue<>(this::upperRead,this::combine);
}
/**
* Queues src on appOutputQ as a single array element.
* upperWrite is the handler supplied to appOutputQ when it is constructed.
* An earlier version of this comment said upperWrite() is called if the
* queue was empty - TODO confirm that still holds for AsyncWriteQueue.
*
* @param src the buffers to queue for wrapping and sending
* @throws IOException declared on this method; presumably propagated from
*         appOutputQ.put - TODO confirm
*/
! @Override
! public void writeAsync(ByteBuffer[] src) throws IOException {
! appOutputQ.put(src);
! }
!
/**
* Queues buffers on appOutputQ using putFirst rather than put.
* Presumably this places them ahead of buffers already queued - TODO
* confirm against AsyncWriteQueue.putFirst.
*
* @param buffers the buffers to queue
* @throws IOException declared on this method; source not visible here
*/
! @Override
! public void writeAsyncUnordered(ByteBuffer[] buffers) throws IOException {
! appOutputQ.putFirst(buffers);
}
/**
* Flushes appOutputQ and, only when that flush returns true, asks the
* lower connection to flush as well.
*/
! @Override
! public void flushAsync() {
// NOTE(review): the meaning of flush()'s boolean result is defined by
// AsyncWriteQueue and is not visible here - confirm.
! if(appOutputQ.flush()) {
! lowerOutput.flushAsync();
! }
}
/**
* Closes this delegate by handing channelInputQ and the lower-level
* connection (cast back to HttpConnection) to Utils.close.
*/
@Override
public void close() {
// NOTE(review): appOutputQ is not passed to Utils.close here - confirm
// that AsyncWriteQueue needs no explicit close.
! Utils.close( channelInputQ, (HttpConnection)lowerOutput);
}
/**
* Wraps the given application buffers with the SSL engine and passes each
* produced buffer to lowerOutput.writeAsync. If a wrap result reports that
* handshaking is needed, the buffers are handed to setDelayCallback, a
* handshake is requested via doHandshake, and the method returns without
* returning the buffers to the pool.
*
* Any throwable closes the delegate and is reported to errorHandler.
*
* @param buffers          application data to wrap
* @param setDelayCallback receives the buffers when output must wait for a
*                         handshake; presumably the queue re-delivers them
*                         later - TODO confirm against AsyncWriteQueue
*/
! private void upperWrite(ByteBuffer[] buffers, Consumer<ByteBuffer[]> setDelayCallback) {
try {
// bytes = application data still waiting to be wrapped
int bytes = Utils.remaining(buffers);
// NOTE(review): if a wrap consumes nothing and r.handshaking() is false,
// bytes never decreases and this loop would not end - confirm that
// wrapBuffers cannot return such a result.
while (bytes > 0) {
! EngineResult r = wrapBuffers(buffers);
int bytesProduced = r.bytesProduced();
int bytesConsumed = r.bytesConsumed();
bytes -= bytesConsumed;
if (bytesProduced > 0) {
! lowerOutput.writeAsync(new ByteBuffer[]{r.destBuffer});
}
if (r.handshaking()) {
! setDelayCallback.accept(buffers);
doHandshake(r);
return;
}
}
// everything was wrapped: give the source buffers back to the client
returnBuffers(buffers);
} catch (Throwable t) {
close();
errorHandler.accept(t);
}
}
/**
* Starts a handshake on the executor unless one is already in progress.
* The check of handshaking() and the setting of the handshaking flag both
* happen while holding the handshaker lock. Once doHandshakeImpl returns
* normally the task calls appOutputQ.flushDelayed(),
* lowerOutput.flushAsync() and channelInputQ.setDirect(), in that order.
* If the task throws, the delegate is closed and the throwable is passed
* to errorHandler.
*
* @param r the engine result that indicated handshaking is needed
*/
private void doHandshake(EngineResult r) {
+ synchronized (handshaker) {
+ if (!handshaking()) {
+ // execute the handshake in another thread.
+ // This method will be called again to resume sending
+ // later
handshaking = true;
// NOTE(review): upperRead calls inputQ.setDelayed(srcbuf) just before
// invoking this method; confirm setDelayed(null) does not discard
// that buffer.
! channelInputQ.setDelayed(null);
executor.execute(() -> {
try {
doHandshakeImpl(r);
! appOutputQ.flushDelayed();
! lowerOutput.flushAsync();
! channelInputQ.setDirect();
} catch (Throwable t) {
close();
errorHandler.accept(t);
}
});
}
+ }
+ }
/**
* Hands every buffer in bufs back to the client via returnBuffer.
*
* @param bufs the buffers to return
*/
private void returnBuffers(ByteBuffer[] bufs) {
for (ByteBuffer buf : bufs)
client.returnBuffer(buf);
}
*** 274,302 ****
r = handshakeReceiveAndUnWrap();
}
if (!r.handshaking())
break;
}
- boolean dowrite = false;
- boolean doread = false;
// Handshake is finished. Now resume reading and/or writing
synchronized(handshaker) {
handshaking = false;
- if (writerBlocked) {
- writerBlocked = false;
- dowrite = true;
- }
- if (readerBlocked) {
- readerBlocked = false;
- doread = true;
}
}
- if (dowrite)
- upperWrite();
- if (doread)
- upperRead();
- }
// acknowledge a received CLOSE request from peer
void doClosure() throws IOException {
//while (!wrapAndSend(emptyArray))
//;
--- 256,270 ----
*** 354,364 ****
}
/**
* Wraps an empty source buffer and, if the engine produced any bytes,
* writes the resulting buffer to lowerOutput.
*
* @return the result of the wrap call
* @throws IOException declared on this method; presumably from wrapBuffer
*         or lowerOutput.write - TODO confirm
*/
EngineResult handshakeWrapAndSend() throws IOException {
// no application data is supplied, so any output comes from the engine itself
EngineResult r = wrapBuffer(Utils.EMPTY_BYTEBUFFER);
if (r.bytesProduced() > 0) {
! lowerOutput.write(r.destBuffer);
}
return r;
}
// called during handshaking. It blocks until a complete packet
--- 322,333 ----
}
/**
* Wraps an empty source buffer and, if the engine produced any bytes,
* queues the resulting buffer on lowerOutput and flushes it immediately.
*
* @return the result of the wrap call
* @throws IOException declared on this method; presumably from wrapBuffer
*         or lowerOutput.writeAsync - TODO confirm
*/
EngineResult handshakeWrapAndSend() throws IOException {
// no application data is supplied, so any output comes from the engine itself
EngineResult r = wrapBuffer(Utils.EMPTY_BYTEBUFFER);
if (r.bytesProduced() > 0) {
! lowerOutput.writeAsync(new ByteBuffer[]{r.destBuffer});
! lowerOutput.flushAsync();
}
return r;
}
// called during handshaking. It blocks until a complete packet
*** 455,524 ****
close();
errorHandler.accept(t);
}
}
/**
* Drains channelInputQ, unwrapping each buffer and passing any decoded
* data to receiver. The whole method runs while holding the reader lock.
*
* On BUFFER_UNDERFLOW the current buffer is combined with the next queued
* buffer, or pushed back if none is available. If an OK result reports
* handshaking, the buffer is pushed back, readerBlocked is set, and a
* handshake is started unless one is already running.
*
* Any throwable closes the delegate and is reported to errorHandler.
*/
! public void upperRead() {
EngineResult r;
! ByteBuffer srcbuf;
! synchronized (reader) {
try {
! srcbuf = channelInputQ.poll();
! if (srcbuf == null) {
! return;
! }
! while (true) {
r = unwrapBuffer(srcbuf);
// NOTE(review): only BUFFER_UNDERFLOW and OK are handled; any other
// status falls through the switch - confirm that is intended.
switch (r.result.getStatus()) {
case BUFFER_UNDERFLOW:
// Buffer too small. Need to combine with next buf
! ByteBuffer nextBuf = channelInputQ.poll();
! if (nextBuf == null) {
! // no data available. push buffer back until more data available
! channelInputQ.pushback(srcbuf);
return;
- } else {
- srcbuf = combine(srcbuf, nextBuf);
- }
- break;
case OK:
// check for any handshaking work
- synchronized (handshaker) {
if (r.handshaking()) {
// handshaking is happening or is needed
// so we put the buffer back on Q to process again
// later.
! channelInputQ.pushback(srcbuf);
! readerBlocked = true;
! if (!handshaking()) {
! // execute the handshake in another thread.
! // This method will be called again to resume sending
! // later
doHandshake(r);
- }
return;
}
- }
// deliver decoded application data, if any
ByteBuffer dst = r.destBuffer;
if (dst.hasRemaining()) {
receiver.accept(dst);
}
}
// keep unwrapping the same buffer until it is exhausted, then take the next one
- if (srcbuf.hasRemaining()) {
- continue;
- }
- srcbuf = channelInputQ.poll();
- if (srcbuf == null) {
- return;
- }
}
} catch (Throwable t) {
close();
errorHandler.accept(t);
}
}
- }
/**
* Get a new buffer that is the right size for application buffers.
*
* @return
--- 424,465 ----
close();
errorHandler.accept(t);
}
}
/**
* Unwraps srcbuf repeatedly while it has bytes remaining, passing any
* decoded data to receiver.
*
* On BUFFER_UNDERFLOW the buffer is pushed back onto inputQ and the method
* returns. If an OK result reports handshaking, the buffer is handed to
* inputQ.setDelayed, a handshake is requested via doHandshake, and the
* method returns.
*
* Any throwable closes the delegate and is reported to errorHandler.
*
* @param srcbuf encrypted bytes read from the channel
* @param inputQ the queue to which unprocessed input is returned
*/
! public void upperRead(ByteBuffer srcbuf, AsyncReadQueue<ByteBuffer> inputQ) {
EngineResult r;
!
try {
// NOTE(review): only BUFFER_UNDERFLOW and OK are handled below; if another
// status is returned without consuming input this loop would not end -
// confirm that unwrapBuffer cannot produce such a result.
! while (srcbuf.hasRemaining()) {
r = unwrapBuffer(srcbuf);
switch (r.result.getStatus()) {
case BUFFER_UNDERFLOW:
// Buffer too small. Need to combine with next buf
// presumably the queue merges it with the next buffer - TODO confirm
! inputQ.pushback(srcbuf);
return;
case OK:
// check for any handshaking work
if (r.handshaking()) {
// handshaking is happening or is needed
// so we put the buffer back on Q to process again
// later.
! inputQ.setDelayed(srcbuf);
doHandshake(r);
return;
}
// deliver decoded application data, if any
ByteBuffer dst = r.destBuffer;
if (dst.hasRemaining()) {
receiver.accept(dst);
}
}
}
} catch (Throwable t) {
close();
errorHandler.accept(t);
}
}
/**
* Get a new buffer that is the right size for application buffers.
*
* @return
< prev index next >