1 /* 2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http; 27 28 import java.io.IOException; 29 import java.nio.ByteBuffer; 30 import java.util.ArrayList; 31 import java.util.Arrays; 32 import java.util.List; 33 import java.util.concurrent.Semaphore; 34 import java.util.concurrent.CompletableFuture; 35 import java.util.function.Consumer; 36 import java.util.function.Supplier; 37 38 import static javax.net.ssl.SSLEngineResult.Status.*; 39 import javax.net.ssl.*; 40 41 import jdk.incubator.http.internal.common.AsyncWriteQueue; 42 import jdk.incubator.http.internal.common.ByteBufferPool; 43 import jdk.incubator.http.internal.common.ByteBufferReference; 44 import jdk.incubator.http.internal.common.Log; 45 import jdk.incubator.http.internal.common.Queue; 46 import jdk.incubator.http.internal.common.Utils; 47 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*; 48 import jdk.incubator.http.internal.common.ExceptionallyCloseable; 49 50 /** 51 * Asynchronous wrapper around SSLEngine. send and receive is fully non 52 * blocking. When handshaking is required, a thread is created to perform 53 * the handshake and application level sends do not take place during this time. 54 * 55 * Is implemented using queues and functions operating on the receiving end 56 * of each queue. 57 * 58 * Application writes to: 59 * || 60 * \/ 61 * appOutputQ 62 * || 63 * \/ 64 * appOutputQ read by "upperWrite" method which does SSLEngine.wrap 65 * and does async write to PlainHttpConnection 66 * 67 * Reading side is as follows 68 * -------------------------- 69 * 70 * "upperRead" method reads off channelInputQ and calls SSLEngine.unwrap and 71 * when decrypted data is returned, it is passed to the user's Consumer<ByteBuffer> 72 * /\ 73 * || 74 * channelInputQ 75 * /\ 76 * || 77 * "asyncReceive" method puts buffers into channelInputQ. It is invoked from 78 * OP_READ events from the selector. 79 * 80 * Whenever handshaking is required, the doHandshaking() method is called 81 * which creates a thread to complete the handshake. It takes over the 82 * channelInputQ from upperRead, and puts outgoing packets on channelOutputQ. 83 * Selector events are delivered to asyncReceive and lowerWrite as normal. 84 * 85 * Errors 86 * 87 * Any exception thrown by the engine or channel, causes all Queues to be closed 88 * the channel to be closed, and the error is reported to the user's 89 * Consumer<Throwable> 90 */ 91 class AsyncSSLDelegate implements ExceptionallyCloseable, AsyncConnection { 92 93 // outgoing buffers put in this queue first and may remain here 94 // while SSL handshaking happening. 95 final AsyncWriteQueue appOutputQ = new AsyncWriteQueue(this::upperWrite); 96 97 // Bytes read into this queue before being unwrapped. Backup on this 98 // Q should only happen when the engine is stalled due to delegated tasks 99 final Queue<ByteBufferReference> channelInputQ; 100 101 // input occurs through the read() method which is expected to be called 102 // when the selector signals some data is waiting to be read. All incoming 103 // handshake data is handled in this method, which means some calls to 104 // read() may return zero bytes of user data. This is not a sign of spinning, 105 // just that the handshake mechanics are being executed. 106 107 final SSLEngine engine; 108 final SSLParameters sslParameters; 109 final HttpConnection lowerOutput; 110 final HttpClientImpl client; 111 final String serverName; 112 // should be volatile to provide proper synchronization(visibility) action 113 volatile Consumer<ByteBufferReference> asyncReceiver; 114 volatile Consumer<Throwable> errorHandler; 115 volatile boolean connected = false; 116 117 // Locks. 118 final Object reader = new Object(); 119 // synchronizing handshake state 120 final Semaphore handshaker = new Semaphore(1); 121 final String[] alpn; 122 123 // alpn[] may be null. upcall is callback which receives incoming decoded bytes off socket 124 125 AsyncSSLDelegate(HttpConnection lowerOutput, HttpClientImpl client, String[] alpn, String sname) 126 { 127 SSLContext context = client.sslContext(); 128 this.serverName = sname; 129 engine = context.createSSLEngine(); 130 engine.setUseClientMode(true); 131 SSLParameters sslp = client.sslParameters() 132 .orElseGet(context::getSupportedSSLParameters); 133 sslParameters = Utils.copySSLParameters(sslp); 134 if (alpn != null) { 135 Log.logSSL("AsyncSSLDelegate: Setting application protocols: " + Arrays.toString(alpn)); 136 sslParameters.setApplicationProtocols(alpn); 137 } else { 138 Log.logSSL("AsyncSSLDelegate: no applications set!"); 139 } 140 if (serverName != null) { 141 SNIHostName sn = new SNIHostName(serverName); 142 sslParameters.setServerNames(List.of(sn)); 143 } 144 logParams(sslParameters); 145 engine.setSSLParameters(sslParameters); 146 this.lowerOutput = lowerOutput; 147 this.client = client; 148 this.channelInputQ = new Queue<>(); 149 this.channelInputQ.registerPutCallback(this::upperRead); 150 this.alpn = alpn; 151 } 152 153 @Override 154 public void writeAsync(ByteBufferReference[] src) throws IOException { 155 appOutputQ.put(src); 156 } 157 158 @Override 159 public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException { 160 appOutputQ.putFirst(buffers); 161 } 162 163 @Override 164 public void flushAsync() throws IOException { 165 if (appOutputQ.flush()) { 166 lowerOutput.flushAsync(); 167 } 168 } 169 170 SSLEngine getEngine() { 171 return engine; 172 } 173 174 @Override 175 public void closeExceptionally(Throwable t) { 176 Utils.close(t, appOutputQ, channelInputQ, lowerOutput); 177 } 178 179 @Override 180 public void close() { 181 Utils.close(appOutputQ, channelInputQ, lowerOutput); 182 } 183 184 // The code below can be uncommented to shake out 185 // the implementation by inserting random delays and trigger 186 // handshake in the SelectorManager thread (upperRead) 187 // static final java.util.Random random = 188 // new java.util.Random(System.currentTimeMillis()); 189 190 /** 191 * Attempts to wrap buffers from appOutputQ and place them on the 192 * channelOutputQ for writing. If handshaking is happening, then the 193 * process stalls and last buffers taken off the appOutputQ are put back 194 * into it until handshaking completes. 195 * 196 * This same method is called to try and resume output after a blocking 197 * handshaking operation has completed. 198 */ 199 private boolean upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) { 200 // currently delayCallback is not used. Use it when it's needed to execute handshake in another thread. 201 try { 202 ByteBuffer[] buffers = ByteBufferReference.toBuffers(refs); 203 int bytes = Utils.remaining(buffers); 204 while (bytes > 0) { 205 EngineResult r = wrapBuffers(buffers); 206 int bytesProduced = r.bytesProduced(); 207 int bytesConsumed = r.bytesConsumed(); 208 bytes -= bytesConsumed; 209 if (bytesProduced > 0) { 210 lowerOutput.writeAsync(new ByteBufferReference[]{r.destBuffer}); 211 } 212 213 // The code below can be uncommented to shake out 214 // the implementation by inserting random delays and trigger 215 // handshake in the SelectorManager thread (upperRead) 216 217 // int sleep = random.nextInt(100); 218 // if (sleep > 20) { 219 // Thread.sleep(sleep); 220 // } 221 222 // handshaking is happening or is needed 223 if (r.handshaking()) { 224 Log.logTrace("Write: needs handshake"); 225 doHandshakeNow("Write"); 226 } 227 } 228 ByteBufferReference.clear(refs); 229 } catch (Throwable t) { 230 closeExceptionally(t); 231 errorHandler.accept(t); 232 } 233 // We always return true: either all the data was sent, or 234 // an exception happened and we have closed the queue. 235 return true; 236 } 237 238 // Connecting at this level means the initial handshake has completed. 239 // This means that the initial SSL parameters are available including 240 // ALPN result. 241 void connect() throws IOException, InterruptedException { 242 doHandshakeNow("Init"); 243 connected = true; 244 } 245 246 boolean connected() { 247 return connected; 248 } 249 250 private void startHandshake(String tag) { 251 Runnable run = () -> { 252 try { 253 doHandshakeNow(tag); 254 } catch (Throwable t) { 255 Log.logTrace("{0}: handshake failed: {1}", tag, t); 256 closeExceptionally(t); 257 errorHandler.accept(t); 258 } 259 }; 260 client.executor().execute(run); 261 } 262 263 private void doHandshakeNow(String tag) 264 throws IOException, InterruptedException 265 { 266 handshaker.acquire(); 267 try { 268 channelInputQ.disableCallback(); 269 lowerOutput.flushAsync(); 270 Log.logTrace("{0}: Starting handshake...", tag); 271 doHandshakeImpl(); 272 Log.logTrace("{0}: Handshake completed", tag); 273 // don't unblock the channel here, as we aren't sure yet, whether ALPN 274 // negotiation succeeded. Caller will call enableCallback() externally 275 } finally { 276 handshaker.release(); 277 } 278 } 279 280 public void enableCallback() { 281 channelInputQ.enableCallback(); 282 } 283 284 /** 285 * Executes entire handshake in calling thread. 286 * Returns after handshake is completed or error occurs 287 */ 288 private void doHandshakeImpl() throws IOException { 289 engine.beginHandshake(); 290 while (true) { 291 SSLEngineResult.HandshakeStatus status = engine.getHandshakeStatus(); 292 switch(status) { 293 case NEED_TASK: { 294 List<Runnable> tasks = obtainTasks(); 295 for (Runnable task : tasks) { 296 task.run(); 297 } 298 } break; 299 case NEED_WRAP: 300 handshakeWrapAndSend(); 301 break; 302 case NEED_UNWRAP: case NEED_UNWRAP_AGAIN: 303 handshakeReceiveAndUnWrap(); 304 break; 305 case FINISHED: 306 return; 307 case NOT_HANDSHAKING: 308 return; 309 default: 310 throw new InternalError("Unexpected Handshake Status: " 311 + status); 312 } 313 } 314 } 315 316 // acknowledge a received CLOSE request from peer 317 void doClosure() throws IOException { 318 //while (!wrapAndSend(emptyArray)) 319 //; 320 } 321 322 List<Runnable> obtainTasks() { 323 List<Runnable> l = new ArrayList<>(); 324 Runnable r; 325 while ((r = engine.getDelegatedTask()) != null) { 326 l.add(r); 327 } 328 return l; 329 } 330 331 @Override 332 public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver, 333 Consumer<Throwable> errorReceiver, 334 Supplier<ByteBufferReference> readBufferSupplier) { 335 this.asyncReceiver = asyncReceiver; 336 this.errorHandler = errorReceiver; 337 // readBufferSupplier is not used, 338 // because of AsyncSSLDelegate has its own appBufferPool 339 } 340 341 @Override 342 public void startReading() { 343 // maybe this class does not need to implement AsyncConnection 344 } 345 346 @Override 347 public void stopAsyncReading() { 348 // maybe this class does not need to implement AsyncConnection 349 } 350 351 352 static class EngineResult { 353 final SSLEngineResult result; 354 final ByteBufferReference destBuffer; 355 356 357 // normal result 358 EngineResult(SSLEngineResult result) { 359 this(result, null); 360 } 361 362 EngineResult(SSLEngineResult result, ByteBufferReference destBuffer) { 363 this.result = result; 364 this.destBuffer = destBuffer; 365 } 366 367 boolean handshaking() { 368 SSLEngineResult.HandshakeStatus s = result.getHandshakeStatus(); 369 return s != FINISHED && s != NOT_HANDSHAKING; 370 } 371 372 int bytesConsumed() { 373 return result.bytesConsumed(); 374 } 375 376 int bytesProduced() { 377 return result.bytesProduced(); 378 } 379 380 SSLEngineResult.HandshakeStatus handshakeStatus() { 381 return result.getHandshakeStatus(); 382 } 383 384 SSLEngineResult.Status status() { 385 return result.getStatus(); 386 } 387 } 388 389 EngineResult handshakeWrapAndSend() throws IOException { 390 EngineResult r = wrapBuffer(Utils.EMPTY_BYTEBUFFER); 391 if (r.bytesProduced() > 0) { 392 lowerOutput.writeAsync(new ByteBufferReference[]{r.destBuffer}); 393 lowerOutput.flushAsync(); 394 } 395 return r; 396 } 397 398 // called during handshaking. It blocks until a complete packet 399 // is available, unwraps it and returns. 400 void handshakeReceiveAndUnWrap() throws IOException { 401 ByteBufferReference ref = channelInputQ.take(); 402 while (true) { 403 // block waiting for input 404 EngineResult r = unwrapBuffer(ref.get()); 405 SSLEngineResult.Status status = r.status(); 406 if (status == BUFFER_UNDERFLOW) { 407 // wait for another buffer to arrive 408 ByteBufferReference ref1 = channelInputQ.take(); 409 ref = combine (ref, ref1); 410 continue; 411 } 412 // OK 413 // theoretically possible we could receive some user data 414 if (r.bytesProduced() > 0) { 415 asyncReceiver.accept(r.destBuffer); 416 } else { 417 r.destBuffer.clear(); 418 } 419 // it is also possible that a delegated task could be needed 420 // even though they are handled in the calling function 421 if (r.handshakeStatus() == NEED_TASK) { 422 obtainTasks().stream().forEach((task) -> task.run()); 423 } 424 425 if (!ref.get().hasRemaining()) { 426 ref.clear(); 427 return; 428 } 429 } 430 } 431 432 EngineResult wrapBuffer(ByteBuffer src) throws SSLException { 433 ByteBuffer[] bufs = new ByteBuffer[1]; 434 bufs[0] = src; 435 return wrapBuffers(bufs); 436 } 437 438 private final ByteBufferPool netBufferPool = new ByteBufferPool(); 439 private final ByteBufferPool appBufferPool = new ByteBufferPool(); 440 441 /** 442 * provides buffer of sslEngine@getPacketBufferSize(). 443 * used for encrypted buffers after wrap or before unwrap. 444 * @return ByteBufferReference 445 */ 446 public ByteBufferReference getNetBuffer() { 447 return netBufferPool.get(engine.getSession().getPacketBufferSize()); 448 } 449 450 /** 451 * provides buffer of sslEngine@getApplicationBufferSize(). 452 * @return ByteBufferReference 453 */ 454 private ByteBufferReference getAppBuffer() { 455 return appBufferPool.get(engine.getSession().getApplicationBufferSize()); 456 } 457 458 EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException { 459 ByteBufferReference dst = getNetBuffer(); 460 while (true) { 461 SSLEngineResult sslResult = engine.wrap(src, dst.get()); 462 switch (sslResult.getStatus()) { 463 case BUFFER_OVERFLOW: 464 // Shouldn't happen. We allocated buffer with packet size 465 // get it again if net buffer size was changed 466 dst = getNetBuffer(); 467 break; 468 case CLOSED: 469 case OK: 470 dst.get().flip(); 471 return new EngineResult(sslResult, dst); 472 case BUFFER_UNDERFLOW: 473 // Shouldn't happen. Doesn't returns when wrap() 474 // underflow handled externally 475 return new EngineResult(sslResult); 476 default: 477 assert false; 478 } 479 } 480 } 481 482 EngineResult unwrapBuffer(ByteBuffer srcbuf) throws IOException { 483 ByteBufferReference dst = getAppBuffer(); 484 while (true) { 485 SSLEngineResult sslResult = engine.unwrap(srcbuf, dst.get()); 486 switch (sslResult.getStatus()) { 487 case BUFFER_OVERFLOW: 488 // may happen only if app size buffer was changed. 489 // get it again if app buffer size changed 490 dst = getAppBuffer(); 491 break; 492 case CLOSED: 493 doClosure(); 494 throw new IOException("Engine closed"); 495 case BUFFER_UNDERFLOW: 496 dst.clear(); 497 return new EngineResult(sslResult); 498 case OK: 499 dst.get().flip(); 500 return new EngineResult(sslResult, dst); 501 } 502 } 503 } 504 505 /** 506 * Asynchronous read input. Call this when selector fires. 507 * Unwrap done in upperRead because it also happens in 508 * doHandshake() when handshake taking place 509 */ 510 public void asyncReceive(ByteBufferReference buffer) { 511 try { 512 channelInputQ.put(buffer); 513 } catch (Throwable t) { 514 closeExceptionally(t); 515 errorHandler.accept(t); 516 } 517 } 518 519 private ByteBufferReference pollInput() throws IOException { 520 return channelInputQ.poll(); 521 } 522 523 private ByteBufferReference pollInput(ByteBufferReference next) throws IOException { 524 return next == null ? channelInputQ.poll() : next; 525 } 526 527 public void upperRead() { 528 ByteBufferReference src; 529 ByteBufferReference next = null; 530 synchronized (reader) { 531 try { 532 src = pollInput(); 533 if (src == null) { 534 return; 535 } 536 while (true) { 537 EngineResult r = unwrapBuffer(src.get()); 538 switch (r.result.getStatus()) { 539 case BUFFER_UNDERFLOW: 540 // Buffer too small. Need to combine with next buf 541 next = pollInput(next); 542 if (next == null) { 543 // no data available. 544 // push buffer back until more data available 545 channelInputQ.pushback(src); 546 return; 547 } else { 548 src = shift(src, next); 549 if (!next.get().hasRemaining()) { 550 next.clear(); 551 next = null; 552 } 553 } 554 break; 555 case OK: 556 // check for any handshaking work 557 if (r.handshaking()) { 558 // handshaking is happening or is needed 559 // so we put the buffer back on Q to process again 560 // later. 561 Log.logTrace("Read: needs handshake"); 562 channelInputQ.pushback(src); 563 startHandshake("Read"); 564 return; 565 } 566 asyncReceiver.accept(r.destBuffer); 567 } 568 if (src.get().hasRemaining()) { 569 continue; 570 } 571 src.clear(); 572 src = pollInput(next); 573 next = null; 574 if (src == null) { 575 return; 576 } 577 } 578 } catch (Throwable t) { 579 closeExceptionally(t); 580 errorHandler.accept(t); 581 } 582 } 583 } 584 585 ByteBufferReference shift(ByteBufferReference ref1, ByteBufferReference ref2) { 586 ByteBuffer buf1 = ref1.get(); 587 if (buf1.capacity() < engine.getSession().getPacketBufferSize()) { 588 ByteBufferReference newRef = getNetBuffer(); 589 ByteBuffer newBuf = newRef.get(); 590 newBuf.put(buf1); 591 buf1 = newBuf; 592 ref1.clear(); 593 ref1 = newRef; 594 } else { 595 buf1.compact(); 596 } 597 ByteBuffer buf2 = ref2.get(); 598 Utils.copy(buf2, buf1, Math.min(buf1.remaining(), buf2.remaining())); 599 buf1.flip(); 600 return ref1; 601 } 602 603 604 ByteBufferReference combine(ByteBufferReference ref1, ByteBufferReference ref2) { 605 ByteBuffer buf1 = ref1.get(); 606 ByteBuffer buf2 = ref2.get(); 607 int avail1 = buf1.capacity() - buf1.remaining(); 608 if (buf2.remaining() < avail1) { 609 buf1.compact(); 610 buf1.put(buf2); 611 buf1.flip(); 612 ref2.clear(); 613 return ref1; 614 } 615 int newsize = buf1.remaining() + buf2.remaining(); 616 ByteBuffer newbuf = ByteBuffer.allocate(newsize); // getting rid of buffer pools 617 newbuf.put(buf1); 618 newbuf.put(buf2); 619 newbuf.flip(); 620 ref1.clear(); 621 ref2.clear(); 622 return ByteBufferReference.of(newbuf); 623 } 624 625 SSLParameters getSSLParameters() { 626 return sslParameters; 627 } 628 629 static void logParams(SSLParameters p) { 630 if (!Log.ssl()) { 631 return; 632 } 633 634 if (p == null) { 635 Log.logSSL("SSLParameters: Null params"); 636 return; 637 } 638 639 final StringBuilder sb = new StringBuilder("SSLParameters:"); 640 final List<Object> params = new ArrayList<>(); 641 if (p.getCipherSuites() != null) { 642 for (String cipher : p.getCipherSuites()) { 643 sb.append("\n cipher: {") 644 .append(params.size()).append("}"); 645 params.add(cipher); 646 } 647 } 648 649 // SSLParameters.getApplicationProtocols() can't return null 650 // JDK 8 EXCL START 651 for (String approto : p.getApplicationProtocols()) { 652 sb.append("\n application protocol: {") 653 .append(params.size()).append("}"); 654 params.add(approto); 655 } 656 // JDK 8 EXCL END 657 658 if (p.getProtocols() != null) { 659 for (String protocol : p.getProtocols()) { 660 sb.append("\n protocol: {") 661 .append(params.size()).append("}"); 662 params.add(protocol); 663 } 664 } 665 666 if (p.getServerNames() != null) { 667 for (SNIServerName sname : p.getServerNames()) { 668 sb.append("\n server name: {") 669 .append(params.size()).append("}"); 670 params.add(sname.toString()); 671 } 672 } 673 sb.append('\n'); 674 675 Log.logSSL(sb.toString(), params.toArray()); 676 } 677 678 String getSessionInfo() { 679 StringBuilder sb = new StringBuilder(); 680 String application = engine.getApplicationProtocol(); 681 SSLSession sess = engine.getSession(); 682 String cipher = sess.getCipherSuite(); 683 String protocol = sess.getProtocol(); 684 sb.append("Handshake complete alpn: ") 685 .append(application) 686 .append(", Cipher: ") 687 .append(cipher) 688 .append(", Protocol: ") 689 .append(protocol); 690 return sb.toString(); 691 } 692 }