179 @Override
180 public void close() { // passes appOutputQ, channelInputQ and lowerOutput to Utils.close in a single call
181 Utils.close(appOutputQ, channelInputQ, lowerOutput); // NOTE(review): presumably closes each argument in turn; confirm how Utils.close treats failures
182 }
183
184 // The code below can be uncommented to shake out
185 // the implementation by inserting random delays and triggering
186 // the handshake in the SelectorManager thread (upperRead).
187 // static final java.util.Random random =
188 // new java.util.Random(System.currentTimeMillis());
189
190 /**
191 * Attempts to wrap the buffers in refs and pass each wrapped result
192 * to lowerOutput for asynchronous writing. If the engine result
193 * reports that handshaking is happening or is needed, the handshake
194 * is run inline via doHandshakeNow before wrapping continues.
195 *
196 * This same method is called to try and resume output after a blocking
197 * handshaking operation has completed.
198 *
 * Wrapping repeats until the byte count initially remaining in refs has
 * been consumed, after which refs is cleared. Any Throwable is handed
 * to closeExceptionally and then to errorHandler rather than rethrown.
 *
 * NOTE(review): this comment used to say that wrapped buffers go to a
 * channelOutputQ and that buffers are put back on appOutputQ while
 * handshaking; neither is visible in this method - confirm against
 * wrapBuffers and the callers.
 *
 * @param refs buffer references holding the application data to wrap
 * @param delayCallback currently unused (see the comment in the body)
 */
199 private void upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
200 // currently delayCallback is not used. Use it when it's needed to execute handshake in another thread.
201 try {
202 ByteBuffer[] buffers = ByteBufferReference.toBuffers(refs);
203 int bytes = Utils.remaining(buffers); // bytes still to be consumed; decremented on each pass below
204 while (bytes > 0) { // NOTE(review): only ends if passes keep consuming bytes or something throws; confirm wrapBuffers cannot report 0 consumed indefinitely
205 EngineResult r = wrapBuffers(buffers);
206 int bytesProduced = r.bytesProduced();
207 int bytesConsumed = r.bytesConsumed();
208 bytes -= bytesConsumed;
209 if (bytesProduced > 0) { // forward the destination buffer only when this pass produced output
210 lowerOutput.writeAsync(new ByteBufferReference[]{r.destBuffer});
211 }
212
213 // The code below can be uncommented to shake out
214 // the implementation by inserting random delays and triggering
215 // the handshake in the SelectorManager thread (upperRead).
216
217 // int sleep = random.nextInt(100);
218 // if (sleep > 20) {
219 // Thread.sleep(sleep);
220 // }
221
222 // handshaking is happening or is needed
223 if (r.handshaking()) {
224 Log.logTrace("Write: needs handshake");
225 doHandshakeNow("Write"); // invoked directly from this loop, so wrapping resumes only after it returns
226 }
227 }
228 ByteBufferReference.clear(refs); // reached only when the loop finished without an exception
229 } catch (Throwable t) {
230 closeExceptionally(t); // close first, then report the same Throwable to the error handler
231 errorHandler.accept(t);
232 }
233 }
234
235 // Connecting at this level means the initial handshake has completed.
236 // This means that the initial SSL parameters, including the ALPN
237 // result, are available once this method has returned normally.
238 void connect() throws IOException, InterruptedException {
239 doHandshakeNow("Init"); // if this throws, the assignment below is never reached
240 connected = true;
241 }
242
243 boolean connected() { // reports the flag that connect() sets after the "Init" handshake returns
244 return connected;
245 }
246
247 private void startHandshake(String tag) {
248 Runnable run = () -> {
249 try {
250 doHandshakeNow(tag);
251 } catch (Throwable t) {
252 Log.logTrace("{0}: handshake failed: {1}", tag, t);
|
179 @Override
180 public void close() { // passes appOutputQ, channelInputQ and lowerOutput to Utils.close in a single call
181 Utils.close(appOutputQ, channelInputQ, lowerOutput); // NOTE(review): presumably closes each argument in turn; confirm how Utils.close treats failures
182 }
183
184 // The code below can be uncommented to shake out
185 // the implementation by inserting random delays and triggering
186 // the handshake in the SelectorManager thread (upperRead).
187 // static final java.util.Random random =
188 // new java.util.Random(System.currentTimeMillis());
189
190 /**
191 * Attempts to wrap the buffers in refs and pass each wrapped result
192 * to lowerOutput for asynchronous writing. If the engine result
193 * reports that handshaking is happening or is needed, the handshake
194 * is run inline via doHandshakeNow before wrapping continues.
195 *
196 * This same method is called to try and resume output after a blocking
197 * handshaking operation has completed.
198 *
 * Wrapping repeats until the byte count initially remaining in refs has
 * been consumed, after which refs is cleared. Any Throwable is handed
 * to closeExceptionally and then to errorHandler rather than rethrown.
 *
 * NOTE(review): this comment used to say that wrapped buffers go to a
 * channelOutputQ and that buffers are put back on appOutputQ while
 * handshaking; neither is visible in this method - confirm against
 * wrapBuffers and the callers.
 *
 * @param refs buffer references holding the application data to wrap
 * @param delayCallback currently unused (see the comment in the body)
 * @return always true, on both the normal and the exceptional path
 */
199 private boolean upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
200 // currently delayCallback is not used. Use it when it's needed to execute handshake in another thread.
201 try {
202 ByteBuffer[] buffers = ByteBufferReference.toBuffers(refs);
203 int bytes = Utils.remaining(buffers); // bytes still to be consumed; decremented on each pass below
204 while (bytes > 0) { // NOTE(review): only ends if passes keep consuming bytes or something throws; confirm wrapBuffers cannot report 0 consumed indefinitely
205 EngineResult r = wrapBuffers(buffers);
206 int bytesProduced = r.bytesProduced();
207 int bytesConsumed = r.bytesConsumed();
208 bytes -= bytesConsumed;
209 if (bytesProduced > 0) { // forward the destination buffer only when this pass produced output
210 lowerOutput.writeAsync(new ByteBufferReference[]{r.destBuffer});
211 }
212
213 // The code below can be uncommented to shake out
214 // the implementation by inserting random delays and triggering
215 // the handshake in the SelectorManager thread (upperRead).
216
217 // int sleep = random.nextInt(100);
218 // if (sleep > 20) {
219 // Thread.sleep(sleep);
220 // }
221
222 // handshaking is happening or is needed
223 if (r.handshaking()) {
224 Log.logTrace("Write: needs handshake");
225 doHandshakeNow("Write"); // invoked directly from this loop, so wrapping resumes only after it returns
226 }
227 }
228 ByteBufferReference.clear(refs); // reached only when the loop finished without an exception
229 } catch (Throwable t) {
230 closeExceptionally(t); // close first, then report the same Throwable to the error handler
231 errorHandler.accept(t);
232 }
233 // We always return true: either all the data was sent, or
234 // an exception happened and we have closed the queue.
235 return true;
236 }
237
238 // Connecting at this level means the initial handshake has completed.
239 // This means that the initial SSL parameters, including the ALPN
240 // result, are available once this method has returned normally.
241 void connect() throws IOException, InterruptedException {
242 doHandshakeNow("Init"); // if this throws, the assignment below is never reached
243 connected = true;
244 }
245
246 boolean connected() { // reports the flag that connect() sets after the "Init" handshake returns
247 return connected;
248 }
249
250 private void startHandshake(String tag) {
251 Runnable run = () -> {
252 try {
253 doHandshakeNow(tag);
254 } catch (Throwable t) {
255 Log.logTrace("{0}: handshake failed: {1}", tag, t);
|