70 * /\
71 * ||
72 * "lowerRead" method puts buffers into channelInputQ. It is invoked from
73 * OP_READ events from the selector.
74 *
75 * Whenever handshaking is required, the doHandshaking() method is called
76 * which creates a thread to complete the handshake. It takes over the
77 * channelInputQ from upperRead, and puts outgoing packets on channelOutputQ.
78 * Selector events are delivered to lowerRead and lowerWrite as normal.
79 *
80 * Errors
81 *
82 * Any exception thrown by the engine or channel, causes all Queues to be closed
83 * the channel to be closed, and the error is reported to the user's
84 * Consumer<Throwable>
85 */
86 class AsyncSSLDelegate implements Closeable, AsyncConnection {
87
88 // outgoing buffers put in this queue first and may remain here
89 // while SSL handshaking happening.
90 final Queue<ByteBuffer> appOutputQ;
91
92 // queue of wrapped ByteBuffers waiting to be sent on socket channel
93 //final Queue<ByteBuffer> channelOutputQ;
94
95 // Bytes read into this queue before being unwrapped. Backup on this
96 // Q should only happen when the engine is stalled due to delegated tasks
97 final Queue<ByteBuffer> channelInputQ;
98
99 // input occurs through the read() method which is expected to be called
100 // when the selector signals some data is waiting to be read. All incoming
101 // handshake data is handled in this method, which means some calls to
102 // read() may return zero bytes of user data. This is not a sign of spinning,
103 // just that the handshake mechanics are being executed.
104
105 final SSLEngine engine;
106 final SSLParameters sslParameters;
107 //final SocketChannel chan;
108 final HttpConnection lowerOutput;
109 final HttpClientImpl client;
110 final ExecutorService executor;
111 final BufferHandler bufPool;
112 Consumer<ByteBuffer> receiver;
113 Consumer<Throwable> errorHandler;
114 // Locks.
115 final Object reader = new Object();
116 final Object writer = new Object();
117 // synchronizing handshake state
118 final Object handshaker = new Object();
119 // flag set when reader or writer is blocked waiting for handshake to finish
120 boolean writerBlocked;
121 boolean readerBlocked;
122
123 // some thread is currently doing the handshake
124 boolean handshaking;
125
126 // alpn[] may be null. upcall is callback which receives incoming decoded bytes off socket
127
128 AsyncSSLDelegate(HttpConnection lowerOutput, HttpClientImpl client, String[] alpn)
129 {
130 SSLContext context = client.sslContext();
131 executor = client.executorService();
132 bufPool = client;
133 appOutputQ = new Queue<>();
134 appOutputQ.registerPutCallback(this::upperWrite);
135 //channelOutputQ = new Queue<>();
136 //channelOutputQ.registerPutCallback(this::lowerWrite);
137 engine = context.createSSLEngine();
138 engine.setUseClientMode(true);
139 SSLParameters sslp = client.sslParameters().orElse(null);
140 if (sslp == null) {
141 sslp = context.getSupportedSSLParameters();
142 //sslp = context.getDefaultSSLParameters();
143 //printParams(sslp);
144 }
145 sslParameters = Utils.copySSLParameters(sslp);
146 if (alpn != null) {
147 sslParameters.setApplicationProtocols(alpn);
148 }
149 logParams(sslParameters);
150 engine.setSSLParameters(sslParameters);
151 this.lowerOutput = lowerOutput;
152 this.client = client;
153 this.channelInputQ = new Queue<>();
154 this.channelInputQ.registerPutCallback(this::upperRead);
155 }
156
157 /**
158 * Put buffers to appOutputQ, and call upperWrite() if q was empty.
159 *
160 * @param src
161 */
162 public void write(ByteBuffer[] src) throws IOException {
163 appOutputQ.putAll(src);
164 }
165
166 public void write(ByteBuffer buf) throws IOException {
167 ByteBuffer[] a = new ByteBuffer[1];
168 a[0] = buf;
169 write(a);
170 }
171
172 @Override
173 public void close() {
174 Utils.close(appOutputQ, channelInputQ, lowerOutput);
175 }
176
177 /**
178 * Attempts to wrap buffers from appOutputQ and place them on the
179 * channelOutputQ for writing. If handshaking is happening, then the
180 * process stalls and last buffers taken off the appOutputQ are put back
181 * into it until handshaking completes.
182 *
183 * This same method is called to try and resume output after a blocking
184 * handshaking operation has completed.
185 */
186 private void upperWrite() {
187 try {
188 EngineResult r = null;
189 ByteBuffer[] buffers = appOutputQ.pollAll(Utils.EMPTY_BB_ARRAY);
190 int bytes = Utils.remaining(buffers);
191 while (bytes > 0) {
192 synchronized (writer) {
193 r = wrapBuffers(buffers);
194 int bytesProduced = r.bytesProduced();
195 int bytesConsumed = r.bytesConsumed();
196 bytes -= bytesConsumed;
197 if (bytesProduced > 0) {
198 // pass destination buffer to channelOutputQ.
199 lowerOutput.write(r.destBuffer);
200 }
201 synchronized (handshaker) {
202 if (r.handshaking()) {
203 // handshaking is happening or is needed
204 // so we put the buffers back on Q to process again
205 // later. It's possible that some may have already
206 // been processed, which is ok.
207 appOutputQ.pushbackAll(buffers);
208 writerBlocked = true;
209 if (!handshaking()) {
210 // execute the handshake in another thread.
211 // This method will be called again to resume sending
212 // later
213 doHandshake(r);
214 }
215 return;
216 }
217 }
218 }
219 }
220 returnBuffers(buffers);
221 } catch (Throwable t) {
222 close();
223 errorHandler.accept(t);
224 }
225 }
226
227 private void doHandshake(EngineResult r) {
228 handshaking = true;
229 channelInputQ.registerPutCallback(null);
230 executor.execute(() -> {
231 try {
232 doHandshakeImpl(r);
233 channelInputQ.registerPutCallback(this::upperRead);
234 } catch (Throwable t) {
235 close();
236 errorHandler.accept(t);
237 }
238 });
239 }
240
241 private void returnBuffers(ByteBuffer[] bufs) {
242 for (ByteBuffer buf : bufs)
243 client.returnBuffer(buf);
244 }
245
246 /**
247 * Return true if some thread is currently doing the handshake
248 *
249 * @return
250 */
251 boolean handshaking() {
252 synchronized(handshaker) {
253 return handshaking;
254 }
255 }
256
257 /**
258 * Executes entire handshake in calling thread.
259 * Returns after handshake is completed or error occurs
260 * @param r
261 * @throws IOException
262 */
263 private void doHandshakeImpl(EngineResult r) throws IOException {
264 while (true) {
265 SSLEngineResult.HandshakeStatus status = r.handshakeStatus();
266 if (status == NEED_TASK) {
267 LinkedList<Runnable> tasks = obtainTasks();
268 for (Runnable task : tasks)
269 task.run();
270 r = handshakeWrapAndSend();
271 } else if (status == NEED_WRAP) {
272 r = handshakeWrapAndSend();
273 } else if (status == NEED_UNWRAP) {
274 r = handshakeReceiveAndUnWrap();
275 }
276 if (!r.handshaking())
277 break;
278 }
279 boolean dowrite = false;
280 boolean doread = false;
281 // Handshake is finished. Now resume reading and/or writing
282 synchronized(handshaker) {
283 handshaking = false;
284 if (writerBlocked) {
285 writerBlocked = false;
286 dowrite = true;
287 }
288 if (readerBlocked) {
289 readerBlocked = false;
290 doread = true;
291 }
292 }
293 if (dowrite)
294 upperWrite();
295 if (doread)
296 upperRead();
297 }
298
299 // acknowledge a received CLOSE request from peer
300 void doClosure() throws IOException {
301 //while (!wrapAndSend(emptyArray))
302 //;
303 }
304
305 LinkedList<Runnable> obtainTasks() {
306 LinkedList<Runnable> l = new LinkedList<>();
307 Runnable r;
308 while ((r = engine.getDelegatedTask()) != null)
309 l.add(r);
310 return l;
311 }
312
313 @Override
314 public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver, Consumer<Throwable> errorReceiver) {
315 this.receiver = asyncReceiver;
316 this.errorHandler = errorReceiver;
317 }
339 int bytesProduced() {
340 return result.bytesProduced();
341 }
342
343 Throwable exception() {
344 return t;
345 }
346
347 SSLEngineResult.HandshakeStatus handshakeStatus() {
348 return result.getHandshakeStatus();
349 }
350
351 SSLEngineResult.Status status() {
352 return result.getStatus();
353 }
354 }
355
356 EngineResult handshakeWrapAndSend() throws IOException {
357 EngineResult r = wrapBuffer(Utils.EMPTY_BYTEBUFFER);
358 if (r.bytesProduced() > 0) {
359 lowerOutput.write(r.destBuffer);
360 }
361 return r;
362 }
363
364 // called during handshaking. It blocks until a complete packet
365 // is available, unwraps it and returns.
366 EngineResult handshakeReceiveAndUnWrap() throws IOException {
367 ByteBuffer buf = channelInputQ.take();
368 while (true) {
369 // block waiting for input
370 EngineResult r = unwrapBuffer(buf);
371 SSLEngineResult.Status status = r.status();
372 if (status == BUFFER_UNDERFLOW) {
373 // wait for another buffer to arrive
374 ByteBuffer buf1 = channelInputQ.take();
375 buf = combine (buf, buf1);
376 continue;
377 }
378 // OK
379 // theoretically possible we could receive some user data
440 r.destBuffer = dst;
441 return r;
442 }
443 }
444 }
445
446 /**
447 * Asynchronous read input. Call this when selector fires.
448 * Unwrap done in upperRead because it also happens in
449 * doHandshake() when handshake taking place
450 */
451 public void lowerRead(ByteBuffer buffer) {
452 try {
453 channelInputQ.put(buffer);
454 } catch (Throwable t) {
455 close();
456 errorHandler.accept(t);
457 }
458 }
459
460 public void upperRead() {
461 EngineResult r;
462 ByteBuffer srcbuf;
463 synchronized (reader) {
464 try {
465 srcbuf = channelInputQ.poll();
466 if (srcbuf == null) {
467 return;
468 }
469 while (true) {
470 r = unwrapBuffer(srcbuf);
471 switch (r.result.getStatus()) {
472 case BUFFER_UNDERFLOW:
473 // Buffer too small. Need to combine with next buf
474 ByteBuffer nextBuf = channelInputQ.poll();
475 if (nextBuf == null) {
476 // no data available. push buffer back until more data available
477 channelInputQ.pushback(srcbuf);
478 return;
479 } else {
480 srcbuf = combine(srcbuf, nextBuf);
481 }
482 break;
483 case OK:
484 // check for any handshaking work
485 synchronized (handshaker) {
486 if (r.handshaking()) {
487 // handshaking is happening or is needed
488 // so we put the buffer back on Q to process again
489 // later.
490 channelInputQ.pushback(srcbuf);
491 readerBlocked = true;
492 if (!handshaking()) {
493 // execute the handshake in another thread.
494 // This method will be called again to resume sending
495 // later
496 doHandshake(r);
497 }
498 return;
499 }
500 }
501 ByteBuffer dst = r.destBuffer;
502 if (dst.hasRemaining()) {
503 receiver.accept(dst);
504 }
505 }
506 if (srcbuf.hasRemaining()) {
507 continue;
508 }
509 srcbuf = channelInputQ.poll();
510 if (srcbuf == null) {
511 return;
512 }
513 }
514 } catch (Throwable t) {
515 close();
516 errorHandler.accept(t);
517 }
518 }
519 }
520
521 /**
522 * Get a new buffer that is the right size for application buffers.
523 *
524 * @return
525 */
526 ByteBuffer getApplicationBuffer() {
527 SSLSession session = engine.getSession();
528 int appBufsize = session.getApplicationBufferSize();
529 bufPool.setMinBufferSize(appBufsize);
530 return bufPool.getBuffer(appBufsize);
531 }
532
533 ByteBuffer getPacketBuffer() {
534 SSLSession session = engine.getSession();
535 int packetBufSize = session.getPacketBufferSize();
536 bufPool.setMinBufferSize(packetBufSize);
537 return bufPool.getBuffer(packetBufSize);
538 }
|
70 * /\
71 * ||
72 * "lowerRead" method puts buffers into channelInputQ. It is invoked from
73 * OP_READ events from the selector.
74 *
75 * Whenever handshaking is required, the doHandshaking() method is called
76 * which creates a thread to complete the handshake. It takes over the
77 * channelInputQ from upperRead, and puts outgoing packets on channelOutputQ.
78 * Selector events are delivered to lowerRead and lowerWrite as normal.
79 *
80 * Errors
81 *
82 * Any exception thrown by the engine or channel, causes all Queues to be closed
83 * the channel to be closed, and the error is reported to the user's
84 * Consumer<Throwable>
85 */
86 class AsyncSSLDelegate implements Closeable, AsyncConnection {
87
88 // outgoing buffers put in this queue first and may remain here
89 // while SSL handshaking happening.
90 final AsyncWriteQueue<ByteBuffer[]> appOutputQ;
91
92 // queue of wrapped ByteBuffers waiting to be sent on socket channel
93 //final Queue<ByteBuffer> channelOutputQ;
94
95 // Bytes read into this queue before being unwrapped. Backup on this
96 // Q should only happen when the engine is stalled due to delegated tasks
97 final AsyncReadQueue<ByteBuffer> channelInputQ;
98
99 // input occurs through the read() method which is expected to be called
100 // when the selector signals some data is waiting to be read. All incoming
101 // handshake data is handled in this method, which means some calls to
102 // read() may return zero bytes of user data. This is not a sign of spinning,
103 // just that the handshake mechanics are being executed.
104
105 final SSLEngine engine;
106 final SSLParameters sslParameters;
107 //final SocketChannel chan;
108 final AsyncConnection lowerOutput;
109 final HttpClientImpl client;
110 final ExecutorService executor;
111 final BufferHandler bufPool;
112 Consumer<ByteBuffer> receiver;
113 Consumer<Throwable> errorHandler;
114 // synchronizing handshake state
115 final Object handshaker = new Object();
116
117 // some thread is currently doing the handshake
118 boolean handshaking;
119
120 // alpn[] may be null. upcall is callback which receives incoming decoded bytes off socket
121
122 AsyncSSLDelegate(HttpConnection lowerOutput, HttpClientImpl client, String[] alpn)
123 {
124 SSLContext context = client.sslContext();
125 executor = client.executorService();
126 bufPool = client;
127 appOutputQ = new AsyncWriteQueue<>(this::upperWrite);
128 //channelOutputQ = new Queue<>();
129 //channelOutputQ.registerPutCallback(this::lowerWrite);
130 engine = context.createSSLEngine();
131 engine.setUseClientMode(true);
132 SSLParameters sslp = client.sslParameters().orElse(null);
133 if (sslp == null) {
134 sslp = context.getSupportedSSLParameters();
135 //sslp = context.getDefaultSSLParameters();
136 //printParams(sslp);
137 }
138 sslParameters = Utils.copySSLParameters(sslp);
139 if (alpn != null) {
140 sslParameters.setApplicationProtocols(alpn);
141 }
142 logParams(sslParameters);
143 engine.setSSLParameters(sslParameters);
144 this.lowerOutput = (AsyncConnection)lowerOutput;
145 this.client = client;
146 this.channelInputQ = new AsyncReadQueue<>(this::upperRead,this::combine);
147 }
148
149 /**
150 * Put buffers to appOutputQ, and call upperWrite() if q was empty.
151 *
152 * @param src
153 */
154 @Override
155 public void writeAsync(ByteBuffer[] src) throws IOException {
156 appOutputQ.put(src);
157 }
158
159 @Override
160 public void writeAsyncUnordered(ByteBuffer[] buffers) throws IOException {
161 appOutputQ.putFirst(buffers);
162 }
163
164 @Override
165 public void flushAsync() {
166 if(appOutputQ.flush()) {
167 lowerOutput.flushAsync();
168 }
169 }
170
171 @Override
172 public void close() {
173 Utils.close( channelInputQ, (HttpConnection)lowerOutput);
174 }
175
176 private void upperWrite(ByteBuffer[] buffers, Consumer<ByteBuffer[]> setDelayCallback) {
177 try {
178 int bytes = Utils.remaining(buffers);
179 while (bytes > 0) {
180 EngineResult r = wrapBuffers(buffers);
181 int bytesProduced = r.bytesProduced();
182 int bytesConsumed = r.bytesConsumed();
183 bytes -= bytesConsumed;
184 if (bytesProduced > 0) {
185 lowerOutput.writeAsync(new ByteBuffer[]{r.destBuffer});
186 }
187 if (r.handshaking()) {
188 setDelayCallback.accept(buffers);
189 doHandshake(r);
190 return;
191 }
192 }
193 returnBuffers(buffers);
194 } catch (Throwable t) {
195 close();
196 errorHandler.accept(t);
197 }
198 }
199
200 private void doHandshake(EngineResult r) {
201 synchronized (handshaker) {
202 if (!handshaking()) {
203 // execute the handshake in another thread.
204 // This method will be called again to resume sending
205 // later
206 handshaking = true;
207 channelInputQ.setDelayed(null);
208 executor.execute(() -> {
209 try {
210 doHandshakeImpl(r);
211 appOutputQ.flushDelayed();
212 lowerOutput.flushAsync();
213 channelInputQ.setDirect();
214 } catch (Throwable t) {
215 close();
216 errorHandler.accept(t);
217 }
218 });
219 }
220 }
221 }
222
223 private void returnBuffers(ByteBuffer[] bufs) {
224 for (ByteBuffer buf : bufs)
225 client.returnBuffer(buf);
226 }
227
228 /**
229 * Return true if some thread is currently doing the handshake
230 *
231 * @return
232 */
233 boolean handshaking() {
234 synchronized(handshaker) {
235 return handshaking;
236 }
237 }
238
239 /**
240 * Executes entire handshake in calling thread.
241 * Returns after handshake is completed or error occurs
242 * @param r
243 * @throws IOException
244 */
245 private void doHandshakeImpl(EngineResult r) throws IOException {
246 while (true) {
247 SSLEngineResult.HandshakeStatus status = r.handshakeStatus();
248 if (status == NEED_TASK) {
249 LinkedList<Runnable> tasks = obtainTasks();
250 for (Runnable task : tasks)
251 task.run();
252 r = handshakeWrapAndSend();
253 } else if (status == NEED_WRAP) {
254 r = handshakeWrapAndSend();
255 } else if (status == NEED_UNWRAP) {
256 r = handshakeReceiveAndUnWrap();
257 }
258 if (!r.handshaking())
259 break;
260 }
261 // Handshake is finished. Now resume reading and/or writing
262 synchronized(handshaker) {
263 handshaking = false;
264 }
265 }
266
267 // acknowledge a received CLOSE request from peer
268 void doClosure() throws IOException {
269 //while (!wrapAndSend(emptyArray))
270 //;
271 }
272
273 LinkedList<Runnable> obtainTasks() {
274 LinkedList<Runnable> l = new LinkedList<>();
275 Runnable r;
276 while ((r = engine.getDelegatedTask()) != null)
277 l.add(r);
278 return l;
279 }
280
281 @Override
282 public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver, Consumer<Throwable> errorReceiver) {
283 this.receiver = asyncReceiver;
284 this.errorHandler = errorReceiver;
285 }
307 int bytesProduced() {
308 return result.bytesProduced();
309 }
310
311 Throwable exception() {
312 return t;
313 }
314
315 SSLEngineResult.HandshakeStatus handshakeStatus() {
316 return result.getHandshakeStatus();
317 }
318
319 SSLEngineResult.Status status() {
320 return result.getStatus();
321 }
322 }
323
324 EngineResult handshakeWrapAndSend() throws IOException {
325 EngineResult r = wrapBuffer(Utils.EMPTY_BYTEBUFFER);
326 if (r.bytesProduced() > 0) {
327 lowerOutput.writeAsync(new ByteBuffer[]{r.destBuffer});
328 lowerOutput.flushAsync();
329 }
330 return r;
331 }
332
333 // called during handshaking. It blocks until a complete packet
334 // is available, unwraps it and returns.
335 EngineResult handshakeReceiveAndUnWrap() throws IOException {
336 ByteBuffer buf = channelInputQ.take();
337 while (true) {
338 // block waiting for input
339 EngineResult r = unwrapBuffer(buf);
340 SSLEngineResult.Status status = r.status();
341 if (status == BUFFER_UNDERFLOW) {
342 // wait for another buffer to arrive
343 ByteBuffer buf1 = channelInputQ.take();
344 buf = combine (buf, buf1);
345 continue;
346 }
347 // OK
348 // theoretically possible we could receive some user data
409 r.destBuffer = dst;
410 return r;
411 }
412 }
413 }
414
415 /**
416 * Asynchronous read input. Call this when selector fires.
417 * Unwrap done in upperRead because it also happens in
418 * doHandshake() when handshake taking place
419 */
420 public void lowerRead(ByteBuffer buffer) {
421 try {
422 channelInputQ.put(buffer);
423 } catch (Throwable t) {
424 close();
425 errorHandler.accept(t);
426 }
427 }
428
429 public void upperRead(ByteBuffer srcbuf, AsyncReadQueue<ByteBuffer> inputQ) {
430 EngineResult r;
431
432 try {
433 while (srcbuf.hasRemaining()) {
434 r = unwrapBuffer(srcbuf);
435 switch (r.result.getStatus()) {
436 case BUFFER_UNDERFLOW:
437 // Buffer too small. Need to combine with next buf
438 inputQ.pushback(srcbuf);
439 return;
440 case OK:
441 // check for any handshaking work
442 if (r.handshaking()) {
443 // handshaking is happening or is needed
444 // so we put the buffer back on Q to process again
445 // later.
446 inputQ.setDelayed(srcbuf);
447 doHandshake(r);
448 return;
449 }
450 ByteBuffer dst = r.destBuffer;
451 if (dst.hasRemaining()) {
452 receiver.accept(dst);
453 }
454 }
455 }
456 } catch (Throwable t) {
457 close();
458 errorHandler.accept(t);
459 }
460 }
461
462 /**
463 * Get a new buffer that is the right size for application buffers.
464 *
465 * @return
466 */
467 ByteBuffer getApplicationBuffer() {
468 SSLSession session = engine.getSession();
469 int appBufsize = session.getApplicationBufferSize();
470 bufPool.setMinBufferSize(appBufsize);
471 return bufPool.getBuffer(appBufsize);
472 }
473
474 ByteBuffer getPacketBuffer() {
475 SSLSession session = engine.getSession();
476 int packetBufSize = session.getPacketBufferSize();
477 bufPool.setMinBufferSize(packetBufSize);
478 return bufPool.getBuffer(packetBufSize);
479 }
|