1 /* 2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.incubator.http.internal.common; 27 28 import java.io.IOException; 29 import java.lang.System.Logger.Level; 30 import java.nio.ByteBuffer; 31 import java.util.concurrent.Executor; 32 import java.util.concurrent.Flow; 33 import java.util.concurrent.Flow.Subscriber; 34 import java.util.List; 35 import java.util.ArrayList; 36 import java.util.Collections; 37 import java.util.Iterator; 38 import java.util.LinkedList; 39 import java.util.concurrent.CompletableFuture; 40 import java.util.concurrent.ConcurrentLinkedQueue; 41 import java.util.concurrent.atomic.AtomicInteger; 42 import javax.net.ssl.SSLEngine; 43 import javax.net.ssl.SSLEngineResult; 44 import javax.net.ssl.SSLEngineResult.HandshakeStatus; 45 import javax.net.ssl.SSLEngineResult.Status; 46 import javax.net.ssl.SSLException; 47 import jdk.incubator.http.internal.common.SubscriberWrapper.SchedulingAction; 48 49 /** 50 * Implements SSL using two SubscriberWrappers. 51 * 52 * <p> Constructor takes two Flow.Subscribers: one that receives the network 53 * data (after it has been encrypted by SSLFlowDelegate) data, and one that 54 * receives the application data (before it has been encrypted by SSLFlowDelegate). 55 * 56 * <p> Methods upstreamReader() and upstreamWriter() return the corresponding 57 * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data. 58 * See diagram below. 59 * 60 * <p> How Flow.Subscribers are used in this class, and where they come from: 61 * <pre> 62 * {@code 63 * 64 * 65 * 66 * ---------> data flow direction 67 * 68 * 69 * +------------------+ 70 * upstreamWriter | | downWriter 71 * ---------------> | | ------------> 72 * obtained from this | | supplied to constructor 73 * | SSLFlowDelegate | 74 * downReader | | upstreamReader 75 * <--------------- | | <-------------- 76 * supplied to constructor | | obtained from this 77 * +------------------+ 78 * } 79 * </pre> 80 */ 81 public class SSLFlowDelegate { 82 83 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. 84 final System.Logger debug = 85 Utils.getDebugLogger(this::dbgString, DEBUG); 86 87 final Executor exec; 88 final Reader reader; 89 final Writer writer; 90 final SSLEngine engine; 91 final String tubeName; // hack 92 final CompletableFuture<Void> cf; 93 final CompletableFuture<String> alpnCF; // completes on initial handshake 94 final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; 95 volatile boolean close_notify_received; 96 97 /** 98 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each 99 * Flow.Subscriber requires an associated {@link CompletableFuture} 100 * for errors that need to be signaled from downstream to upstream. 101 */ 102 public SSLFlowDelegate(SSLEngine engine, 103 Executor exec, 104 Subscriber<? super List<ByteBuffer>> downReader, 105 Subscriber<? super List<ByteBuffer>> downWriter) 106 { 107 this.tubeName = String.valueOf(downWriter); 108 this.reader = new Reader(); 109 this.writer = new Writer(); 110 this.engine = engine; 111 this.exec = exec; 112 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); 113 this.cf = CompletableFuture.allOf(reader.completion(), writer.completion()) 114 .thenRun(this::normalStop); 115 this.alpnCF = new MinimalFuture<>(); 116 117 // connect the Reader to the downReader and the 118 // Writer to the downWriter. 119 connect(downReader, downWriter); 120 121 //Monitor.add(this::monitor); 122 } 123 124 /** 125 * Returns true if the SSLFlowDelegate has detected a TLS 126 * close_notify from the server. 127 * @return true, if a close_notify was detected. 128 */ 129 public boolean closeNotifyReceived() { 130 return close_notify_received; 131 } 132 133 /** 134 * Connects the read sink (downReader) to the SSLFlowDelegate Reader, 135 * and the write sink (downWriter) to the SSLFlowDelegate Writer. 136 * Called from within the constructor. Overwritten by SSLTube. 137 * 138 * @param downReader The left hand side read sink (typically, the 139 * HttpConnection read subscriber). 140 * @param downWriter The right hand side write sink (typically 141 * the SocketTube write subscriber). 142 */ 143 void connect(Subscriber<? super List<ByteBuffer>> downReader, 144 Subscriber<? super List<ByteBuffer>> downWriter) { 145 this.reader.subscribe(downReader); 146 this.writer.subscribe(downWriter); 147 } 148 149 /** 150 * Returns a CompletableFuture<String> which completes after 151 * the initial handshake completes, and which contains the negotiated 152 * alpn. 153 */ 154 public CompletableFuture<String> alpn() { 155 return alpnCF; 156 } 157 158 private void setALPN() { 159 // Handshake is finished. So, can retrieve the ALPN now 160 if (alpnCF.isDone()) 161 return; 162 String alpn = engine.getApplicationProtocol(); 163 debug.log(Level.DEBUG, "setALPN = %s", alpn); 164 alpnCF.complete(alpn); 165 } 166 167 public String monitor() { 168 StringBuilder sb = new StringBuilder(); 169 sb.append("SSL: HS state: " + states(handshakeState)); 170 sb.append(" Engine state: " + engine.getHandshakeStatus().toString()); 171 sb.append(" LL : "); 172 synchronized(stateList) { 173 for (String s: stateList) { 174 sb.append(s).append(" "); 175 } 176 } 177 sb.append("\r\n"); 178 sb.append("Reader:: ").append(reader.toString()); 179 sb.append("\r\n"); 180 sb.append("Writer:: ").append(writer.toString()); 181 sb.append("\r\n==================================="); 182 return sb.toString(); 183 } 184 185 protected SchedulingAction enterReadScheduling() { 186 return SchedulingAction.CONTINUE; 187 } 188 189 190 /** 191 * Processing function for incoming data. Pass it thru SSLEngine.unwrap(). 192 * Any decrypted buffers returned to be passed downstream. 193 * Status codes: 194 * NEED_UNWRAP: do nothing. Following incoming data will contain 195 * any required handshake data 196 * NEED_WRAP: call writer.addData() with empty buffer 197 * NEED_TASK: delegate task to executor 198 * BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap 199 * BUFFER_UNDERFLOW: keep buffer and wait for more data 200 * OK: return generated buffers. 201 * 202 * Upstream subscription strategy is to try and keep no more than 203 * TARGET_BUFSIZE bytes in readBuf 204 */ 205 class Reader extends SubscriberWrapper { 206 final SequentialScheduler scheduler; 207 static final int TARGET_BUFSIZE = 16 * 1024; 208 volatile ByteBuffer readBuf; 209 volatile boolean completing = false; 210 final Object readBufferLock = new Object(); 211 final System.Logger debugr = 212 Utils.getDebugLogger(this::dbgString, DEBUG); 213 214 class ReaderDownstreamPusher implements Runnable { 215 @Override public void run() { processData(); } 216 } 217 218 Reader() { 219 super(); 220 scheduler = SequentialScheduler.synchronizedScheduler( 221 new ReaderDownstreamPusher()); 222 this.readBuf = ByteBuffer.allocate(1024); 223 readBuf.limit(0); // keep in read mode 224 } 225 226 protected SchedulingAction enterScheduling() { 227 return enterReadScheduling(); 228 } 229 230 public final String dbgString() { 231 return "SSL Reader(" + tubeName + ")"; 232 } 233 234 /** 235 * entry point for buffers delivered from upstream Subscriber 236 */ 237 @Override 238 public void incoming(List<ByteBuffer> buffers, boolean complete) { 239 debugr.log(Level.DEBUG, () -> "Adding " + Utils.remaining(buffers) 240 + " bytes to read buffer"); 241 addToReadBuf(buffers); 242 if (complete) { 243 this.completing = true; 244 } 245 scheduler.runOrSchedule(); 246 } 247 248 @Override 249 public String toString() { 250 return "READER: " + super.toString() + " readBuf: " + readBuf.toString() 251 + " count: " + count.toString(); 252 } 253 254 private void reallocReadBuf() { 255 int sz = readBuf.capacity(); 256 ByteBuffer newb = ByteBuffer.allocate(sz*2); 257 readBuf.flip(); 258 Utils.copy(readBuf, newb); 259 readBuf = newb; 260 } 261 262 @Override 263 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { 264 if (readBuf.remaining() > TARGET_BUFSIZE) { 265 return 0; 266 } else { 267 return super.upstreamWindowUpdate(currentWindow, downstreamQsize); 268 } 269 } 270 271 // readBuf is kept ready for reading outside of this method 272 private void addToReadBuf(List<ByteBuffer> buffers) { 273 synchronized (readBufferLock) { 274 for (ByteBuffer buf : buffers) { 275 readBuf.compact(); 276 while (readBuf.remaining() < buf.remaining()) 277 reallocReadBuf(); 278 readBuf.put(buf); 279 readBuf.flip(); 280 } 281 } 282 } 283 284 void schedule() { 285 scheduler.runOrSchedule(); 286 } 287 288 void stop() { 289 debugr.log(Level.DEBUG, "stop"); 290 scheduler.stop(); 291 } 292 293 AtomicInteger count = new AtomicInteger(0); 294 295 // work function where it all happens 296 void processData() { 297 try { 298 debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining() 299 + " bytes to unwrap " 300 + states(handshakeState)); 301 302 while (readBuf.hasRemaining()) { 303 boolean handshaking = false; 304 try { 305 EngineResult result; 306 synchronized (readBufferLock) { 307 result = unwrapBuffer(readBuf); 308 debugr.log(Level.DEBUG, "Unwrapped: %s", result.result); 309 } 310 if (result.status() == Status.BUFFER_UNDERFLOW) { 311 debugr.log(Level.DEBUG, "BUFFER_UNDERFLOW"); 312 return; 313 } 314 if (completing && result.status() == Status.CLOSED) { 315 debugr.log(Level.DEBUG, "Closed: completing"); 316 outgoing(Utils.EMPTY_BB_LIST, true); 317 return; 318 } 319 if (result.handshaking() && !completing) { 320 debugr.log(Level.DEBUG, "handshaking"); 321 doHandshake(result, READER); 322 resumeActivity(); 323 handshaking = true; 324 } else { 325 if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { 326 setALPN(); 327 handshaking = false; 328 resumeActivity(); 329 } 330 } 331 if (result.bytesProduced() > 0) { 332 debugr.log(Level.DEBUG, "sending %d", result.bytesProduced()); 333 count.addAndGet(result.bytesProduced()); 334 outgoing(result.destBuffer, false); 335 } 336 } catch (IOException ex) { 337 errorCommon(ex); 338 handleError(ex); 339 } 340 if (handshaking && !completing) 341 return; 342 } 343 if (completing) { 344 debugr.log(Level.DEBUG, "completing"); 345 // Complete the alpnCF, if not already complete, regardless of 346 // whether or not the ALPN is available, there will be no more 347 // activity. 348 setALPN(); 349 outgoing(Utils.EMPTY_BB_LIST, true); 350 } 351 } catch (Throwable ex) { 352 errorCommon(ex); 353 handleError(ex); 354 } 355 } 356 } 357 358 /** 359 * Returns a CompletableFuture which completes after all activity 360 * in the delegate is terminated (whether normally or exceptionally). 361 * 362 * @return 363 */ 364 public CompletableFuture<Void> completion() { 365 return cf; 366 } 367 368 public interface Monitorable { 369 public String getInfo(); 370 } 371 372 public static class Monitor extends Thread { 373 final List<Monitorable> list; 374 static Monitor themon; 375 376 static { 377 themon = new Monitor(); 378 themon.start(); // uncomment to enable Monitor 379 } 380 381 Monitor() { 382 super("Monitor"); 383 setDaemon(true); 384 list = Collections.synchronizedList(new LinkedList<>()); 385 } 386 387 void addTarget(Monitorable o) { 388 list.add(o); 389 } 390 391 public static void add(Monitorable o) { 392 themon.addTarget(o); 393 } 394 395 @Override 396 public void run() { 397 System.out.println("Monitor starting"); 398 while (true) { 399 try {Thread.sleep(20*1000); } catch (Exception e) {} 400 synchronized (list) { 401 for (Monitorable o : list) { 402 System.out.println(o.getInfo()); 403 System.out.println("-------------------------"); 404 } 405 } 406 System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-"); 407 408 } 409 } 410 } 411 412 /** 413 * Processing function for outgoing data. Pass it thru SSLEngine.wrap() 414 * Any encrypted buffers generated are passed downstream to be written. 415 * Status codes: 416 * NEED_UNWRAP: call reader.addData() with empty buffer 417 * NEED_WRAP: call addData() with empty buffer 418 * NEED_TASK: delegate task to executor 419 * BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap 420 * BUFFER_UNDERFLOW: shouldn't happen on writing side 421 * OK: return generated buffers 422 */ 423 class Writer extends SubscriberWrapper { 424 final SequentialScheduler scheduler; 425 // queues of buffers received from upstream waiting 426 // to be processed by the SSLEngine 427 final List<ByteBuffer> writeList; 428 final System.Logger debugw = 429 Utils.getDebugLogger(this::dbgString, DEBUG); 430 431 class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask { 432 @Override public void run() { processData(); } 433 } 434 435 Writer() { 436 super(); 437 writeList = Collections.synchronizedList(new LinkedList<>()); 438 scheduler = new SequentialScheduler(new WriterDownstreamPusher()); 439 } 440 441 @Override 442 protected void incoming(List<ByteBuffer> buffers, boolean complete) { 443 assert complete ? buffers == Utils.EMPTY_BB_LIST : true; 444 assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true; 445 if (complete) { 446 writeList.add(SENTINEL); 447 } else { 448 writeList.addAll(buffers); 449 } 450 debugw.log(Level.DEBUG, () -> "added " + buffers.size() 451 + " (" + Utils.remaining(buffers) 452 + " bytes) to the writeList"); 453 scheduler.runOrSchedule(); 454 } 455 456 public final String dbgString() { 457 return "SSL Writer(" + tubeName + ")"; 458 } 459 460 protected void onSubscribe() { 461 doHandshake(EngineResult.INIT, INIT); 462 resumeActivity(); 463 } 464 465 void schedule() { 466 scheduler.runOrSchedule(); 467 } 468 469 void stop() { 470 debugw.log(Level.DEBUG, "stop"); 471 scheduler.stop(); 472 } 473 474 @Override 475 public boolean closing() { 476 return closeNotifyReceived(); 477 } 478 479 private boolean isCompleting() { 480 synchronized(writeList) { 481 int lastIndex = writeList.size() - 1; 482 if (lastIndex < 0) 483 return false; 484 return writeList.get(lastIndex) == SENTINEL; 485 } 486 } 487 488 @Override 489 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { 490 if (writeList.size() > 10) 491 return 0; 492 else 493 return super.upstreamWindowUpdate(currentWindow, downstreamQsize); 494 } 495 496 private boolean hsTriggered() { 497 synchronized(writeList) { 498 for (ByteBuffer b : writeList) 499 if (b == HS_TRIGGER) 500 return true; 501 return false; 502 } 503 } 504 505 private void processData() { 506 boolean completing = isCompleting(); 507 508 try { 509 debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")"); 510 while (Utils.remaining(writeList) > 0 || hsTriggered()) { 511 ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY); 512 EngineResult result = wrapBuffers(outbufs); 513 debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result); 514 515 if (result.status() == Status.CLOSED) { 516 if (result.bytesProduced() <= 0) 517 return; 518 519 completing = true; 520 // There could still be some outgoing data in outbufs. 521 writeList.add(SENTINEL); 522 } 523 524 boolean handshaking = false; 525 if (result.handshaking()) { 526 debugw.log(Level.DEBUG, "handshaking"); 527 doHandshake(result, WRITER); 528 handshaking = true; 529 } else { 530 if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { 531 setALPN(); 532 resumeActivity(); 533 } 534 } 535 cleanList(writeList); // tidy up the source list 536 sendResultBytes(result); 537 if (handshaking && !completing) { 538 if (writeList.isEmpty() && !result.needUnwrap()) { 539 writer.addData(HS_TRIGGER); 540 } 541 return; 542 } 543 } 544 if (completing && Utils.remaining(writeList) == 0) { 545 /* 546 System.out.println("WRITER DOO 3"); 547 engine.closeOutbound(); 548 EngineResult result = wrapBuffers(Utils.EMPTY_BB_ARRAY); 549 sendResultBytes(result); 550 */ 551 outgoing(Utils.EMPTY_BB_LIST, true); 552 return; 553 } 554 } catch (Throwable ex) { 555 handleError(ex); 556 } 557 } 558 559 private void sendResultBytes(EngineResult result) { 560 if (result.bytesProduced() > 0) { 561 debugw.log(Level.DEBUG, "Sending %d bytes downstream", 562 result.bytesProduced()); 563 outgoing(result.destBuffer, false); 564 } 565 } 566 567 @Override 568 public String toString() { 569 return "WRITER: " + super.toString() + 570 " writeList size " + Integer.toString(writeList.size()); 571 //" writeList: " + writeList.toString(); 572 } 573 } 574 575 private void handleError(Throwable t) { 576 debug.log(Level.DEBUG, "handleError", t); 577 cf.completeExceptionally(t); 578 // no-op if already completed 579 alpnCF.completeExceptionally(t); 580 reader.stop(); 581 writer.stop(); 582 } 583 584 private void normalStop() { 585 reader.stop(); 586 writer.stop(); 587 } 588 589 private void cleanList(List<ByteBuffer> l) { 590 synchronized (l) { 591 Iterator<ByteBuffer> iter = l.iterator(); 592 while (iter.hasNext()) { 593 ByteBuffer b = iter.next(); 594 if (!b.hasRemaining()) { 595 iter.remove(); 596 } 597 } 598 } 599 } 600 601 /** 602 * States for handshake. We avoid races when accessing/updating the AtomicInt 603 * because updates always schedule an additional call to both the read() 604 * and write() functions. 605 */ 606 private static final int NOT_HANDSHAKING = 0; 607 private static final int HANDSHAKING = 1; 608 private static final int INIT = 2; 609 private static final int DOING_TASKS = 4; // bit added to above state 610 private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); 611 612 private static final int READER = 1; 613 private static final int WRITER = 2; 614 615 private static String states(AtomicInteger state) { 616 int s = state.get(); 617 StringBuilder sb = new StringBuilder(); 618 int x = s & ~DOING_TASKS; 619 switch (x) { 620 case NOT_HANDSHAKING: 621 sb.append(" NOT_HANDSHAKING "); 622 break; 623 case HANDSHAKING: 624 sb.append(" HANDSHAKING "); 625 break; 626 case INIT: 627 sb.append(" INIT "); 628 break; 629 default: 630 throw new InternalError(); 631 } 632 if ((s & DOING_TASKS) > 0) 633 sb.append("|DOING_TASKS"); 634 return sb.toString(); 635 } 636 637 private void resumeActivity() { 638 reader.schedule(); 639 writer.schedule(); 640 } 641 642 final AtomicInteger handshakeState; 643 final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>(); 644 645 private void doHandshake(EngineResult r, int caller) { 646 int s = handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS)); 647 stateList.add(r.handshakeStatus().toString()); 648 stateList.add(Integer.toString(caller)); 649 switch (r.handshakeStatus()) { 650 case NEED_TASK: 651 if ((s & DOING_TASKS) > 0) // someone else was doing tasks 652 return; 653 List<Runnable> tasks = obtainTasks(); 654 executeTasks(tasks); 655 break; 656 case NEED_WRAP: 657 writer.addData(HS_TRIGGER); 658 break; 659 case NEED_UNWRAP: 660 case NEED_UNWRAP_AGAIN: 661 // do nothing else 662 break; 663 default: 664 throw new InternalError("Unexpected handshake status:" 665 + r.handshakeStatus()); 666 } 667 } 668 669 private List<Runnable> obtainTasks() { 670 List<Runnable> l = new ArrayList<>(); 671 Runnable r; 672 while ((r = engine.getDelegatedTask()) != null) { 673 l.add(r); 674 } 675 return l; 676 } 677 678 private void executeTasks(List<Runnable> tasks) { 679 exec.execute(() -> { 680 handshakeState.getAndUpdate((current) -> current | DOING_TASKS); 681 try { 682 tasks.forEach((r) -> { 683 r.run(); 684 }); 685 } catch (Throwable t) { 686 handleError(t); 687 } 688 handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS); 689 writer.addData(HS_TRIGGER); 690 resumeActivity(); 691 }); 692 } 693 694 695 EngineResult unwrapBuffer(ByteBuffer src) throws IOException { 696 ByteBuffer dst = getAppBuffer(); 697 while (true) { 698 SSLEngineResult sslResult = engine.unwrap(src, dst); 699 switch (sslResult.getStatus()) { 700 case BUFFER_OVERFLOW: 701 // may happen only if app size buffer was changed. 702 // get it again if app buffer size changed 703 int appSize = engine.getSession().getApplicationBufferSize(); 704 ByteBuffer b = ByteBuffer.allocate(appSize + dst.position()); 705 dst.flip(); 706 b.put(dst); 707 dst = b; 708 break; 709 case CLOSED: 710 return doClosure(new EngineResult(sslResult)); 711 case BUFFER_UNDERFLOW: 712 // handled implicitly by compaction/reallocation of readBuf 713 return new EngineResult(sslResult); 714 case OK: 715 dst.flip(); 716 return new EngineResult(sslResult, dst); 717 } 718 } 719 } 720 721 // FIXME: acknowledge a received CLOSE request from peer 722 EngineResult doClosure(EngineResult r) throws IOException { 723 debug.log(Level.DEBUG, 724 "doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]", 725 r.result, engine.getHandshakeStatus(), 726 engine.isOutboundDone(), engine.isInboundDone()); 727 if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) { 728 // we have received TLS close_notify and need to send 729 // an acknowledgement back. We're calling doHandshake 730 // to finish the close handshake. 731 if (engine.isInboundDone() && !engine.isOutboundDone()) { 732 debug.log(Level.DEBUG, "doClosure: close_notify received"); 733 close_notify_received = true; 734 doHandshake(r, READER); 735 } 736 } 737 return r; 738 } 739 740 /** 741 * Returns the upstream Flow.Subscriber of the reading (incoming) side. 742 * This flow must be given the encrypted data read from upstream (eg socket) 743 * before it is decrypted. 744 */ 745 public Flow.Subscriber<List<ByteBuffer>> upstreamReader() { 746 return reader; 747 } 748 749 /** 750 * Returns the upstream Flow.Subscriber of the writing (outgoing) side. 751 * This flow contains the plaintext data before it is encrypted. 752 */ 753 public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() { 754 return writer; 755 } 756 757 public boolean resumeReader() { 758 return reader.signalScheduling(); 759 } 760 761 public void resetReaderDemand() { 762 reader.resetDownstreamDemand(); 763 } 764 765 static class EngineResult { 766 final SSLEngineResult result; 767 final ByteBuffer destBuffer; 768 769 // normal result 770 EngineResult(SSLEngineResult result) { 771 this(result, null); 772 } 773 774 EngineResult(SSLEngineResult result, ByteBuffer destBuffer) { 775 this.result = result; 776 this.destBuffer = destBuffer; 777 } 778 779 // Special result used to trigger handshaking in constructor 780 static EngineResult INIT = 781 new EngineResult( 782 new SSLEngineResult(SSLEngineResult.Status.OK, HandshakeStatus.NEED_WRAP, 0, 0)); 783 784 boolean handshaking() { 785 HandshakeStatus s = result.getHandshakeStatus(); 786 return s != HandshakeStatus.FINISHED 787 && s != HandshakeStatus.NOT_HANDSHAKING 788 && result.getStatus() != Status.CLOSED; 789 } 790 791 boolean needUnwrap() { 792 HandshakeStatus s = result.getHandshakeStatus(); 793 return s == HandshakeStatus.NEED_UNWRAP; 794 } 795 796 797 int bytesConsumed() { 798 return result.bytesConsumed(); 799 } 800 801 int bytesProduced() { 802 return result.bytesProduced(); 803 } 804 805 SSLEngineResult.HandshakeStatus handshakeStatus() { 806 return result.getHandshakeStatus(); 807 } 808 809 SSLEngineResult.Status status() { 810 return result.getStatus(); 811 } 812 } 813 814 public ByteBuffer getNetBuffer() { 815 return ByteBuffer.allocate(engine.getSession().getPacketBufferSize()); 816 } 817 818 private ByteBuffer getAppBuffer() { 819 return ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()); 820 } 821 822 final String dbgString() { 823 return "SSLFlowDelegate(" + tubeName + ")"; 824 } 825 826 @SuppressWarnings("fallthrough") 827 EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException { 828 debug.log(Level.DEBUG, () -> "wrapping " 829 + Utils.remaining(src) + " bytes"); 830 ByteBuffer dst = getNetBuffer(); 831 while (true) { 832 SSLEngineResult sslResult = engine.wrap(src, dst); 833 debug.log(Level.DEBUG, () -> "SSLResult: " + sslResult); 834 switch (sslResult.getStatus()) { 835 case BUFFER_OVERFLOW: 836 // Shouldn't happen. We allocated buffer with packet size 837 // get it again if net buffer size was changed 838 debug.log(Level.DEBUG, "BUFFER_OVERFLOW"); 839 int appSize = engine.getSession().getApplicationBufferSize(); 840 ByteBuffer b = ByteBuffer.allocate(appSize + dst.position()); 841 dst.flip(); 842 b.put(dst); 843 dst = b; 844 break; // try again 845 case CLOSED: 846 debug.log(Level.DEBUG, "CLOSED"); 847 // fallthrough. There could be some remaining data in dst. 848 // CLOSED will be handled by the caller. 849 case OK: 850 dst.flip(); 851 final ByteBuffer dest = dst; 852 debug.log(Level.DEBUG, () -> "OK => produced: " 853 + dest.remaining() 854 + " not wrapped: " 855 + Utils.remaining(src)); 856 return new EngineResult(sslResult, dest); 857 case BUFFER_UNDERFLOW: 858 // Shouldn't happen. Doesn't returns when wrap() 859 // underflow handled externally 860 // assert false : "Buffer Underflow"; 861 debug.log(Level.DEBUG, "BUFFER_UNDERFLOW"); 862 return new EngineResult(sslResult); 863 default: 864 debug.log(Level.DEBUG, "ASSERT"); 865 assert false; 866 } 867 } 868 } 869 }