1 /* 2 * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package jdk.internal.net.http.common; 27 28 import jdk.internal.net.http.common.SubscriberWrapper.SchedulingAction; 29 30 import javax.net.ssl.SSLEngine; 31 import javax.net.ssl.SSLEngineResult; 32 import javax.net.ssl.SSLEngineResult.HandshakeStatus; 33 import javax.net.ssl.SSLEngineResult.Status; 34 import javax.net.ssl.SSLException; 35 import java.io.IOException; 36 import java.lang.ref.Reference; 37 import java.lang.ref.ReferenceQueue; 38 import java.lang.ref.WeakReference; 39 import java.nio.ByteBuffer; 40 import java.util.ArrayList; 41 import java.util.Collections; 42 import java.util.Iterator; 43 import java.util.LinkedList; 44 import java.util.List; 45 import java.util.concurrent.CompletableFuture; 46 import java.util.concurrent.ConcurrentLinkedQueue; 47 import java.util.concurrent.Executor; 48 import java.util.concurrent.Flow; 49 import java.util.concurrent.Flow.Subscriber; 50 import java.util.concurrent.atomic.AtomicInteger; 51 import java.util.function.Consumer; 52 import java.util.function.IntBinaryOperator; 53 54 /** 55 * Implements SSL using two SubscriberWrappers. 56 * 57 * <p> Constructor takes two Flow.Subscribers: one that receives the network 58 * data (after it has been encrypted by SSLFlowDelegate) data, and one that 59 * receives the application data (before it has been encrypted by SSLFlowDelegate). 60 * 61 * <p> Methods upstreamReader() and upstreamWriter() return the corresponding 62 * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data. 63 * See diagram below. 64 * 65 * <p> How Flow.Subscribers are used in this class, and where they come from: 66 * <pre> 67 * {@code 68 * 69 * 70 * 71 * ---------> data flow direction 72 * 73 * 74 * +------------------+ 75 * upstreamWriter | | downWriter 76 * ---------------> | | ------------> 77 * obtained from this | | supplied to constructor 78 * | SSLFlowDelegate | 79 * downReader | | upstreamReader 80 * <--------------- | | <-------------- 81 * supplied to constructor | | obtained from this 82 * +------------------+ 83 * 84 * Errors are reported to the downReader Flow.Subscriber 85 * 86 * } 87 * </pre> 88 */ 89 public class SSLFlowDelegate { 90 91 final Logger debug = 92 Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 93 94 private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; 95 private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); 96 // When handshake is in progress trying to wrap may produce no bytes. 97 private static final ByteBuffer NOTHING = ByteBuffer.allocate(0); 98 private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate"); 99 private static final boolean isMonitored = 100 monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")); 101 102 final Executor exec; 103 final Reader reader; 104 final Writer writer; 105 final SSLEngine engine; 106 final String tubeName; // hack 107 final CompletableFuture<String> alpnCF; // completes on initial handshake 108 final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped 109 volatile boolean close_notify_received; 110 final CompletableFuture<Void> readerCF; 111 final CompletableFuture<Void> writerCF; 112 final Consumer<ByteBuffer> recycler; 113 static AtomicInteger scount = new AtomicInteger(1); 114 final int id; 115 116 /** 117 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each 118 * Flow.Subscriber requires an associated {@link CompletableFuture} 119 * for errors that need to be signaled from downstream to upstream. 120 */ 121 public SSLFlowDelegate(SSLEngine engine, 122 Executor exec, 123 Subscriber<? super List<ByteBuffer>> downReader, 124 Subscriber<? super List<ByteBuffer>> downWriter) 125 { 126 this(engine, exec, null, downReader, downWriter); 127 } 128 129 /** 130 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each 131 * Flow.Subscriber requires an associated {@link CompletableFuture} 132 * for errors that need to be signaled from downstream to upstream. 133 */ 134 public SSLFlowDelegate(SSLEngine engine, 135 Executor exec, 136 Consumer<ByteBuffer> recycler, 137 Subscriber<? super List<ByteBuffer>> downReader, 138 Subscriber<? super List<ByteBuffer>> downWriter) 139 { 140 this.id = scount.getAndIncrement(); 141 this.tubeName = String.valueOf(downWriter); 142 this.recycler = recycler; 143 this.reader = new Reader(); 144 this.writer = new Writer(); 145 this.engine = engine; 146 this.exec = exec; 147 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); 148 this.readerCF = reader.completion(); 149 this.writerCF = reader.completion(); 150 readerCF.exceptionally(this::stopOnError); 151 writerCF.exceptionally(this::stopOnError); 152 153 CompletableFuture.allOf(reader.completion(), writer.completion()) 154 .thenRun(this::normalStop); 155 this.alpnCF = new MinimalFuture<>(); 156 157 // connect the Reader to the downReader and the 158 // Writer to the downWriter. 159 connect(downReader, downWriter); 160 161 if (isMonitored) Monitor.add(monitor); 162 } 163 164 /** 165 * Returns true if the SSLFlowDelegate has detected a TLS 166 * close_notify from the server. 167 * @return true, if a close_notify was detected. 168 */ 169 public boolean closeNotifyReceived() { 170 return close_notify_received; 171 } 172 173 /** 174 * Connects the read sink (downReader) to the SSLFlowDelegate Reader, 175 * and the write sink (downWriter) to the SSLFlowDelegate Writer. 176 * Called from within the constructor. Overwritten by SSLTube. 177 * 178 * @param downReader The left hand side read sink (typically, the 179 * HttpConnection read subscriber). 180 * @param downWriter The right hand side write sink (typically 181 * the SocketTube write subscriber). 182 */ 183 void connect(Subscriber<? super List<ByteBuffer>> downReader, 184 Subscriber<? super List<ByteBuffer>> downWriter) { 185 this.reader.subscribe(downReader); 186 this.writer.subscribe(downWriter); 187 } 188 189 /** 190 * Returns a CompletableFuture<String> which completes after 191 * the initial handshake completes, and which contains the negotiated 192 * alpn. 193 */ 194 public CompletableFuture<String> alpn() { 195 return alpnCF; 196 } 197 198 private void setALPN() { 199 // Handshake is finished. So, can retrieve the ALPN now 200 if (alpnCF.isDone()) 201 return; 202 String alpn = engine.getApplicationProtocol(); 203 if (debug.on()) debug.log("setALPN = %s", alpn); 204 alpnCF.complete(alpn); 205 } 206 207 public String monitor() { 208 StringBuilder sb = new StringBuilder(); 209 sb.append("SSL: id ").append(id); 210 sb.append(" ").append(dbgString()); 211 sb.append(" HS state: " + states(handshakeState)); 212 sb.append(" Engine state: " + engine.getHandshakeStatus().toString()); 213 if (stateList != null) { 214 sb.append(" LL : "); 215 for (String s : stateList) { 216 sb.append(s).append(" "); 217 } 218 } 219 sb.append("\r\n"); 220 sb.append("Reader:: ").append(reader.toString()); 221 sb.append("\r\n"); 222 sb.append("Writer:: ").append(writer.toString()); 223 sb.append("\r\n==================================="); 224 return sb.toString(); 225 } 226 227 protected SchedulingAction enterReadScheduling() { 228 return SchedulingAction.CONTINUE; 229 } 230 231 232 /** 233 * Processing function for incoming data. Pass it thru SSLEngine.unwrap(). 234 * Any decrypted buffers returned to be passed downstream. 235 * Status codes: 236 * NEED_UNWRAP: do nothing. Following incoming data will contain 237 * any required handshake data 238 * NEED_WRAP: call writer.addData() with empty buffer 239 * NEED_TASK: delegate task to executor 240 * BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap 241 * BUFFER_UNDERFLOW: keep buffer and wait for more data 242 * OK: return generated buffers. 243 * 244 * Upstream subscription strategy is to try and keep no more than 245 * TARGET_BUFSIZE bytes in readBuf 246 */ 247 final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber { 248 // Maximum record size is 16k. 249 // Because SocketTube can feeds us up to 3 16K buffers, 250 // then setting this size to 16K means that the readBuf 251 // can store up to 64K-1 (16K-1 + 3*16K) 252 static final int TARGET_BUFSIZE = 16 * 1024; 253 254 final SequentialScheduler scheduler; 255 volatile ByteBuffer readBuf; 256 volatile boolean completing; 257 final Object readBufferLock = new Object(); 258 final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 259 260 private final class ReaderDownstreamPusher implements Runnable { 261 @Override 262 public void run() { 263 processData(); 264 } 265 } 266 267 Reader() { 268 super(); 269 scheduler = SequentialScheduler.synchronizedScheduler( 270 new ReaderDownstreamPusher()); 271 this.readBuf = ByteBuffer.allocate(1024); 272 readBuf.limit(0); // keep in read mode 273 } 274 275 @Override 276 public boolean supportsRecycling() { 277 return recycler != null; 278 } 279 280 protected SchedulingAction enterScheduling() { 281 return enterReadScheduling(); 282 } 283 284 public final String dbgString() { 285 return "SSL Reader(" + tubeName + ")"; 286 } 287 288 /** 289 * entry point for buffers delivered from upstream Subscriber 290 */ 291 @Override 292 public void incoming(List<ByteBuffer> buffers, boolean complete) { 293 if (debugr.on()) 294 debugr.log("Adding %d bytes to read buffer", 295 Utils.remaining(buffers)); 296 addToReadBuf(buffers, complete); 297 scheduler.runOrSchedule(exec); 298 } 299 300 @Override 301 public String toString() { 302 return "READER: " + super.toString() + ", readBuf: " + readBuf.toString() 303 + ", count: " + count.toString() + ", scheduler: " 304 + (scheduler.isStopped() ? "stopped" : "running") 305 + ", status: " + lastUnwrapStatus; 306 } 307 308 private void reallocReadBuf() { 309 int sz = readBuf.capacity(); 310 ByteBuffer newb = ByteBuffer.allocate(sz * 2); 311 readBuf.flip(); 312 Utils.copy(readBuf, newb); 313 readBuf = newb; 314 } 315 316 @Override 317 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { 318 if (readBuf.remaining() > TARGET_BUFSIZE) { 319 if (debugr.on()) 320 debugr.log("readBuf has more than TARGET_BUFSIZE: %d", 321 readBuf.remaining()); 322 return 0; 323 } else { 324 return super.upstreamWindowUpdate(currentWindow, downstreamQsize); 325 } 326 } 327 328 // readBuf is kept ready for reading outside of this method 329 private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) { 330 assert Utils.remaining(buffers) > 0 || buffers.isEmpty(); 331 synchronized (readBufferLock) { 332 for (ByteBuffer buf : buffers) { 333 readBuf.compact(); 334 while (readBuf.remaining() < buf.remaining()) 335 reallocReadBuf(); 336 readBuf.put(buf); 337 readBuf.flip(); 338 // should be safe to call inside lock 339 // since the only implementation 340 // offers the buffer to an unbounded queue. 341 // WARNING: do not touch buf after this point! 342 if (recycler != null) recycler.accept(buf); 343 } 344 if (complete) { 345 this.completing = complete; 346 minBytesRequired = 0; 347 } 348 } 349 } 350 351 void schedule() { 352 scheduler.runOrSchedule(exec); 353 } 354 355 void stop() { 356 if (debugr.on()) debugr.log("stop"); 357 scheduler.stop(); 358 } 359 360 AtomicInteger count = new AtomicInteger(0); 361 362 // minimum number of bytes required to call unwrap. 363 // Usually this is 0, unless there was a buffer underflow. 364 // In this case we need to wait for more bytes than what 365 // we had before calling unwrap() again. 366 volatile int minBytesRequired; 367 368 // work function where it all happens 369 final void processData() { 370 try { 371 if (debugr.on()) 372 debugr.log("processData:" 373 + " readBuf remaining:" + readBuf.remaining() 374 + ", state:" + states(handshakeState) 375 + ", engine handshake status:" + engine.getHandshakeStatus()); 376 int len; 377 boolean complete = false; 378 while (readBuf.remaining() > (len = minBytesRequired)) { 379 boolean handshaking = false; 380 try { 381 EngineResult result; 382 synchronized (readBufferLock) { 383 complete = this.completing; 384 if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining()); 385 // Unless there is a BUFFER_UNDERFLOW, we should try to 386 // unwrap any number of bytes. Set minBytesRequired to 0: 387 // we only need to do that if minBytesRequired is not already 0. 388 len = len > 0 ? minBytesRequired = 0 : len; 389 result = unwrapBuffer(readBuf); 390 len = readBuf.remaining(); 391 if (debugr.on()) { 392 debugr.log("Unwrapped: result: %s", result.result); 393 debugr.log("Unwrapped: consumed: %s", result.bytesConsumed()); 394 } 395 } 396 if (result.bytesProduced() > 0) { 397 if (debugr.on()) 398 debugr.log("sending %d", result.bytesProduced()); 399 count.addAndGet(result.bytesProduced()); 400 outgoing(result.destBuffer, false); 401 } 402 if (result.status() == Status.BUFFER_UNDERFLOW) { 403 if (debugr.on()) debugr.log("BUFFER_UNDERFLOW"); 404 // not enough data in the read buffer... 405 // no need to try to unwrap again unless we get more bytes 406 // than minBytesRequired = len in the read buffer. 407 synchronized (readBufferLock) { 408 minBytesRequired = len; 409 // more bytes could already have been added... 410 assert readBuf.remaining() >= len; 411 // check if we have received some data, and if so 412 // we can just re-spin the loop 413 if (readBuf.remaining() > len) continue; 414 else if (this.completing) { 415 if (debug.on()) { 416 debugr.log("BUFFER_UNDERFLOW with EOF," + 417 " %d bytes non decrypted.", len); 418 } 419 // The channel won't send us any more data, and 420 // we are in underflow: we need to fail. 421 throw new IOException("BUFFER_UNDERFLOW with EOF, " 422 + len + " bytes non decrypted."); 423 } 424 } 425 // request more data and return. 426 requestMore(); 427 return; 428 } 429 if (complete && result.status() == Status.CLOSED) { 430 if (debugr.on()) debugr.log("Closed: completing"); 431 outgoing(Utils.EMPTY_BB_LIST, true); 432 return; 433 } 434 if (result.handshaking()) { 435 handshaking = true; 436 if (debugr.on()) debugr.log("handshaking"); 437 if (doHandshake(result, READER)) continue; // need unwrap 438 else break; // doHandshake will have triggered the write scheduler if necessary 439 } else { 440 if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { 441 handshaking = false; 442 applicationBufferSize = engine.getSession().getApplicationBufferSize(); 443 packetBufferSize = engine.getSession().getPacketBufferSize(); 444 setALPN(); 445 resumeActivity(); 446 } 447 } 448 } catch (IOException ex) { 449 errorCommon(ex); 450 handleError(ex); 451 return; 452 } 453 if (handshaking && !complete) 454 return; 455 } 456 if (!complete) { 457 synchronized (readBufferLock) { 458 complete = this.completing && !readBuf.hasRemaining(); 459 } 460 } 461 if (complete) { 462 if (debugr.on()) debugr.log("completing"); 463 // Complete the alpnCF, if not already complete, regardless of 464 // whether or not the ALPN is available, there will be no more 465 // activity. 466 setALPN(); 467 outgoing(Utils.EMPTY_BB_LIST, true); 468 } 469 } catch (Throwable ex) { 470 errorCommon(ex); 471 handleError(ex); 472 } 473 } 474 475 private volatile Status lastUnwrapStatus; 476 EngineResult unwrapBuffer(ByteBuffer src) throws IOException { 477 ByteBuffer dst = getAppBuffer(); 478 int len = src.remaining(); 479 while (true) { 480 SSLEngineResult sslResult = engine.unwrap(src, dst); 481 switch (lastUnwrapStatus = sslResult.getStatus()) { 482 case BUFFER_OVERFLOW: 483 // may happen if app size buffer was changed, or if 484 // our 'adaptiveBufferSize' guess was too small for 485 // the current payload. In that case, update the 486 // value of applicationBufferSize, and allocate a 487 // buffer of that size, which we are sure will be 488 // big enough to decode whatever needs to be 489 // decoded. We will later update adaptiveBufferSize 490 // in OK: below. 491 int appSize = applicationBufferSize = 492 engine.getSession().getApplicationBufferSize(); 493 ByteBuffer b = ByteBuffer.allocate(appSize + dst.position()); 494 dst.flip(); 495 b.put(dst); 496 dst = b; 497 break; 498 case CLOSED: 499 assert dst.position() == 0; 500 return doClosure(new EngineResult(sslResult)); 501 case BUFFER_UNDERFLOW: 502 // handled implicitly by compaction/reallocation of readBuf 503 assert dst.position() == 0; 504 return new EngineResult(sslResult); 505 case OK: 506 int size = dst.position(); 507 if (debug.on()) { 508 debugr.log("Decoded " + size + " bytes out of " + len 509 + " into buffer of " + dst.capacity() 510 + " remaining to decode: " + src.remaining()); 511 } 512 // if the record payload was bigger than what was originally 513 // allocated, then sets the adaptiveAppBufferSize to size 514 // and we will use that new size as a guess for the next app 515 // buffer. 516 if (size > adaptiveAppBufferSize) { 517 adaptiveAppBufferSize = ((size + 7) >>> 3) << 3; 518 } 519 dst.flip(); 520 return new EngineResult(sslResult, dst); 521 } 522 } 523 } 524 } 525 526 public interface Monitorable { 527 public String getInfo(); 528 } 529 530 public static class Monitor extends Thread { 531 final List<WeakReference<Monitorable>> list; 532 final List<FinalMonitorable> finalList; 533 final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>(); 534 static Monitor themon; 535 536 static { 537 themon = new Monitor(); 538 themon.start(); // uncomment to enable Monitor 539 } 540 541 // An instance used to temporarily store the 542 // last observable state of a monitorable object. 543 // When Monitor.remove(o) is called, we replace 544 // 'o' with a FinalMonitorable whose reference 545 // will be enqueued after the last observable state 546 // has been printed. 547 final class FinalMonitorable implements Monitorable { 548 final String finalState; 549 FinalMonitorable(Monitorable o) { 550 finalState = o.getInfo(); 551 finalList.add(this); 552 } 553 @Override 554 public String getInfo() { 555 finalList.remove(this); 556 return finalState; 557 } 558 } 559 560 Monitor() { 561 super("Monitor"); 562 setDaemon(true); 563 list = Collections.synchronizedList(new LinkedList<>()); 564 finalList = new ArrayList<>(); // access is synchronized on list above 565 } 566 567 void addTarget(Monitorable o) { 568 list.add(new WeakReference<>(o, queue)); 569 } 570 void removeTarget(Monitorable o) { 571 // It can take a long time for GC to clean up references. 572 // Calling Monitor.remove() early helps removing noise from the 573 // logs/ 574 synchronized (list) { 575 Iterator<WeakReference<Monitorable>> it = list.iterator(); 576 while (it.hasNext()) { 577 Monitorable m = it.next().get(); 578 if (m == null) it.remove(); 579 if (o == m) { 580 it.remove(); 581 break; 582 } 583 } 584 FinalMonitorable m = new FinalMonitorable(o); 585 addTarget(m); 586 Reference.reachabilityFence(m); 587 } 588 } 589 590 public static void add(Monitorable o) { 591 themon.addTarget(o); 592 } 593 public static void remove(Monitorable o) { 594 themon.removeTarget(o); 595 } 596 597 @Override 598 public void run() { 599 System.out.println("Monitor starting"); 600 try { 601 while (true) { 602 Thread.sleep(20 * 1000); 603 synchronized (list) { 604 Reference<? extends Monitorable> expired; 605 while ((expired = queue.poll()) != null) list.remove(expired); 606 for (WeakReference<Monitorable> ref : list) { 607 Monitorable o = ref.get(); 608 if (o == null) continue; 609 if (o instanceof FinalMonitorable) { 610 ref.enqueue(); 611 } 612 System.out.println(o.getInfo()); 613 System.out.println("-------------------------"); 614 } 615 } 616 System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-"); 617 } 618 } catch (InterruptedException e) { 619 System.out.println("Monitor exiting with " + e); 620 } 621 } 622 } 623 624 /** 625 * Processing function for outgoing data. Pass it thru SSLEngine.wrap() 626 * Any encrypted buffers generated are passed downstream to be written. 627 * Status codes: 628 * NEED_UNWRAP: call reader.addData() with empty buffer 629 * NEED_WRAP: call addData() with empty buffer 630 * NEED_TASK: delegate task to executor 631 * BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap 632 * BUFFER_UNDERFLOW: shouldn't happen on writing side 633 * OK: return generated buffers 634 */ 635 class Writer extends SubscriberWrapper { 636 final SequentialScheduler scheduler; 637 // queues of buffers received from upstream waiting 638 // to be processed by the SSLEngine 639 final List<ByteBuffer> writeList; 640 final Logger debugw = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); 641 volatile boolean completing; 642 boolean completed; // only accessed in processData 643 644 class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask { 645 @Override public void run() { processData(); } 646 } 647 648 Writer() { 649 super(); 650 writeList = Collections.synchronizedList(new LinkedList<>()); 651 scheduler = new SequentialScheduler(new WriterDownstreamPusher()); 652 } 653 654 @Override 655 protected void incoming(List<ByteBuffer> buffers, boolean complete) { 656 assert complete ? buffers == Utils.EMPTY_BB_LIST : true; 657 assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true; 658 if (complete) { 659 if (debugw.on()) debugw.log("adding SENTINEL"); 660 completing = true; 661 writeList.add(SENTINEL); 662 } else { 663 writeList.addAll(buffers); 664 } 665 if (debugw.on()) 666 debugw.log("added " + buffers.size() 667 + " (" + Utils.remaining(buffers) 668 + " bytes) to the writeList"); 669 scheduler.runOrSchedule(); 670 } 671 672 public final String dbgString() { 673 return "SSL Writer(" + tubeName + ")"; 674 } 675 676 protected void onSubscribe() { 677 if (debugw.on()) debugw.log("onSubscribe initiating handshaking"); 678 addData(HS_TRIGGER); // initiates handshaking 679 } 680 681 void schedule() { 682 scheduler.runOrSchedule(); 683 } 684 685 void stop() { 686 if (debugw.on()) debugw.log("stop"); 687 scheduler.stop(); 688 } 689 690 @Override 691 public boolean closing() { 692 return closeNotifyReceived(); 693 } 694 695 private boolean isCompleting() { 696 return completing; 697 } 698 699 @Override 700 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { 701 if (writeList.size() > 10) 702 return 0; 703 else 704 return super.upstreamWindowUpdate(currentWindow, downstreamQsize); 705 } 706 707 private boolean hsTriggered() { 708 synchronized(writeList) { 709 for (ByteBuffer b : writeList) 710 if (b == HS_TRIGGER) 711 return true; 712 return false; 713 } 714 } 715 716 void triggerWrite() { 717 synchronized (writeList) { 718 if (writeList.isEmpty()) { 719 writeList.add(HS_TRIGGER); 720 } 721 } 722 scheduler.runOrSchedule(); 723 } 724 725 private void processData() { 726 boolean completing = isCompleting(); 727 728 try { 729 if (debugw.on()) 730 debugw.log("processData, writeList remaining:" 731 + Utils.remaining(writeList) + ", hsTriggered:" 732 + hsTriggered() + ", needWrap:" + needWrap()); 733 734 while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) { 735 ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY); 736 EngineResult result = wrapBuffers(outbufs); 737 if (debugw.on()) 738 debugw.log("wrapBuffer returned %s", result.result); 739 740 if (result.status() == Status.CLOSED) { 741 if (!upstreamCompleted) { 742 upstreamCompleted = true; 743 upstreamSubscription.cancel(); 744 } 745 if (result.bytesProduced() <= 0) 746 return; 747 748 if (!completing && !completed) { 749 completing = this.completing = true; 750 // There could still be some outgoing data in outbufs. 751 writeList.add(SENTINEL); 752 } 753 } 754 755 boolean handshaking = false; 756 if (result.handshaking()) { 757 if (debugw.on()) debugw.log("handshaking"); 758 doHandshake(result, WRITER); // ok to ignore return 759 handshaking = true; 760 } else { 761 if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { 762 applicationBufferSize = engine.getSession().getApplicationBufferSize(); 763 packetBufferSize = engine.getSession().getPacketBufferSize(); 764 setALPN(); 765 resumeActivity(); 766 } 767 } 768 cleanList(writeList); // tidy up the source list 769 sendResultBytes(result); 770 if (handshaking) { 771 if (!completing && needWrap()) { 772 continue; 773 } else { 774 return; 775 } 776 } 777 } 778 if (completing && Utils.remaining(writeList) == 0) { 779 if (!completed) { 780 completed = true; 781 writeList.clear(); 782 outgoing(Utils.EMPTY_BB_LIST, true); 783 } 784 return; 785 } 786 if (writeList.isEmpty() && needWrap()) { 787 writer.addData(HS_TRIGGER); 788 } 789 } catch (Throwable ex) { 790 errorCommon(ex); 791 handleError(ex); 792 } 793 } 794 795 // The SSLEngine insists on being given a buffer that is at least 796 // SSLSession.getPacketBufferSize() long (usually 16K). If given 797 // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only 798 // has 6 bytes to wrap. Typical usage shows that for GET we 799 // usually produce an average of ~ 100 bytes. 800 // To avoid wasting space, and because allocating and zeroing 801 // 16K buffers for encoding 6 bytes is costly, we are reusing the 802 // same writeBuffer to interact with SSLEngine.wrap(). 803 // If the SSLEngine produces less than writeBuffer.capacity() / 2, 804 // then we copy off the bytes to a smaller buffer that we send 805 // downstream. Otherwise, we send the writeBuffer downstream 806 // and will allocate a new one next time. 807 volatile ByteBuffer writeBuffer; 808 private volatile Status lastWrappedStatus; 809 @SuppressWarnings("fallthrough") 810 EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException { 811 long len = Utils.remaining(src); 812 if (debugw.on()) 813 debugw.log("wrapping " + len + " bytes"); 814 815 ByteBuffer dst = writeBuffer; 816 if (dst == null) dst = writeBuffer = getNetBuffer(); 817 assert dst.position() == 0 : "buffer position is " + dst.position(); 818 assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity(); 819 820 while (true) { 821 SSLEngineResult sslResult = engine.wrap(src, dst); 822 if (debugw.on()) debugw.log("SSLResult: " + sslResult); 823 switch (lastWrappedStatus = sslResult.getStatus()) { 824 case BUFFER_OVERFLOW: 825 // Shouldn't happen. We allocated buffer with packet size 826 // get it again if net buffer size was changed 827 if (debugw.on()) debugw.log("BUFFER_OVERFLOW"); 828 int netSize = packetBufferSize 829 = engine.getSession().getPacketBufferSize(); 830 ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position()); 831 dst.flip(); 832 b.put(dst); 833 dst = b; 834 break; // try again 835 case CLOSED: 836 if (debugw.on()) debugw.log("CLOSED"); 837 // fallthrough. There could be some remaining data in dst. 838 // CLOSED will be handled by the caller. 839 case OK: 840 final ByteBuffer dest; 841 if (dst.position() == 0) { 842 dest = NOTHING; // can happen if handshake is in progress 843 } else if (dst.position() < dst.capacity() / 2) { 844 // less than half the buffer was used. 845 // copy off the bytes to a smaller buffer, and keep 846 // the writeBuffer for next time. 847 dst.flip(); 848 dest = Utils.copyAligned(dst); 849 dst.clear(); 850 } else { 851 // more than half the buffer was used. 852 // just send that buffer downstream, and we will 853 // get a new writeBuffer next time it is needed. 854 dst.flip(); 855 dest = dst; 856 writeBuffer = null; 857 } 858 if (debugw.on()) 859 debugw.log("OK => produced: %d bytes into %d, not wrapped: %d", 860 dest.remaining(), dest.capacity(), Utils.remaining(src)); 861 return new EngineResult(sslResult, dest); 862 case BUFFER_UNDERFLOW: 863 // Shouldn't happen. Doesn't returns when wrap() 864 // underflow handled externally 865 // assert false : "Buffer Underflow"; 866 if (debug.on()) debug.log("BUFFER_UNDERFLOW"); 867 return new EngineResult(sslResult); 868 default: 869 if (debugw.on()) 870 debugw.log("result: %s", sslResult.getStatus()); 871 assert false : "result:" + sslResult.getStatus(); 872 } 873 } 874 } 875 876 private boolean needWrap() { 877 return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP; 878 } 879 880 private void sendResultBytes(EngineResult result) { 881 if (result.bytesProduced() > 0) { 882 if (debugw.on()) 883 debugw.log("Sending %d bytes downstream", 884 result.bytesProduced()); 885 outgoing(result.destBuffer, false); 886 } 887 } 888 889 @Override 890 public String toString() { 891 return "WRITER: " + super.toString() 892 + ", writeList size: " + Integer.toString(writeList.size()) 893 + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running") 894 + ", status: " + lastWrappedStatus; 895 //" writeList: " + writeList.toString(); 896 } 897 } 898 899 private void handleError(Throwable t) { 900 if (debug.on()) debug.log("handleError", t); 901 readerCF.completeExceptionally(t); 902 writerCF.completeExceptionally(t); 903 // no-op if already completed 904 alpnCF.completeExceptionally(t); 905 reader.stop(); 906 writer.stop(); 907 } 908 909 boolean stopped; 910 911 private synchronized void normalStop() { 912 if (stopped) 913 return; 914 stopped = true; 915 reader.stop(); 916 writer.stop(); 917 if (isMonitored) Monitor.remove(monitor); 918 } 919 920 private Void stopOnError(Throwable currentlyUnused) { 921 // maybe log, etc 922 normalStop(); 923 return null; 924 } 925 926 private void cleanList(List<ByteBuffer> l) { 927 synchronized (l) { 928 Iterator<ByteBuffer> iter = l.iterator(); 929 while (iter.hasNext()) { 930 ByteBuffer b = iter.next(); 931 if (!b.hasRemaining() && b != SENTINEL) { 932 iter.remove(); 933 } 934 } 935 } 936 } 937 938 /** 939 * States for handshake. We avoid races when accessing/updating the AtomicInt 940 * because updates always schedule an additional call to both the read() 941 * and write() functions. 942 */ 943 private static final int NOT_HANDSHAKING = 0; 944 private static final int HANDSHAKING = 1; 945 946 // Bit flags 947 // a thread is currently executing tasks 948 private static final int DOING_TASKS = 4; 949 // a thread wants to execute tasks, while another thread is executing 950 private static final int REQUESTING_TASKS = 8; 951 private static final int TASK_BITS = 12; // Both bits 952 953 private static final int READER = 1; 954 private static final int WRITER = 2; 955 956 private static String states(AtomicInteger state) { 957 int s = state.get(); 958 StringBuilder sb = new StringBuilder(); 959 int x = s & ~TASK_BITS; 960 switch (x) { 961 case NOT_HANDSHAKING: 962 sb.append(" NOT_HANDSHAKING "); 963 break; 964 case HANDSHAKING: 965 sb.append(" HANDSHAKING "); 966 break; 967 default: 968 throw new InternalError(); 969 } 970 if ((s & DOING_TASKS) > 0) 971 sb.append("|DOING_TASKS"); 972 if ((s & REQUESTING_TASKS) > 0) 973 sb.append("|REQUESTING_TASKS"); 974 return sb.toString(); 975 } 976 977 private void resumeActivity() { 978 reader.schedule(); 979 writer.schedule(); 980 } 981 982 final AtomicInteger handshakeState; 983 final ConcurrentLinkedQueue<String> stateList = 984 debug.on() ? new ConcurrentLinkedQueue<>() : null; 985 986 // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS 987 // depending on previous value 988 private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> { 989 if ((current & DOING_TASKS) == 0) 990 return DOING_TASKS | (current & HANDSHAKING); 991 else 992 return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING); 993 }; 994 995 // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set 996 // clears bits if not. 997 private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> { 998 if ((current & REQUESTING_TASKS) != 0) 999 return DOING_TASKS | (current & HANDSHAKING); 1000 // clear both bits 1001 return (current & HANDSHAKING); 1002 }; 1003 1004 private boolean doHandshake(EngineResult r, int caller) { 1005 // unconditionally sets the HANDSHAKING bit, while preserving task bits 1006 handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS)); 1007 if (stateList != null && debug.on()) { 1008 stateList.add(r.handshakeStatus().toString()); 1009 stateList.add(Integer.toString(caller)); 1010 } 1011 switch (r.handshakeStatus()) { 1012 case NEED_TASK: 1013 int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS); 1014 if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks 1015 return false; 1016 } 1017 1018 if (debug.on()) debug.log("obtaining and initiating task execution"); 1019 List<Runnable> tasks = obtainTasks(); 1020 executeTasks(tasks); 1021 return false; // executeTasks will resume activity 1022 case NEED_WRAP: 1023 if (caller == READER) { 1024 writer.triggerWrite(); 1025 return false; 1026 } 1027 break; 1028 case NEED_UNWRAP: 1029 case NEED_UNWRAP_AGAIN: 1030 // do nothing else 1031 // receiving-side data will trigger unwrap 1032 if (caller == WRITER) { 1033 reader.schedule(); 1034 return false; 1035 } 1036 break; 1037 default: 1038 throw new InternalError("Unexpected handshake status:" 1039 + r.handshakeStatus()); 1040 } 1041 return true; 1042 } 1043 1044 private List<Runnable> obtainTasks() { 1045 List<Runnable> l = new ArrayList<>(); 1046 Runnable r; 1047 while ((r = engine.getDelegatedTask()) != null) { 1048 l.add(r); 1049 } 1050 return l; 1051 } 1052 1053 private void executeTasks(List<Runnable> tasks) { 1054 exec.execute(() -> { 1055 try { 1056 List<Runnable> nextTasks = tasks; 1057 if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size())); 1058 do { 1059 nextTasks.forEach(Runnable::run); 1060 if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { 1061 nextTasks = obtainTasks(); 1062 } else { 1063 int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS); 1064 if ((s & DOING_TASKS) != 0) { 1065 if (debug.on()) debug.log("re-running tasks (B)"); 1066 nextTasks = obtainTasks(); 1067 continue; 1068 } 1069 break; 1070 } 1071 } while (true); 1072 if (debug.on()) debug.log("finished task execution"); 1073 resumeActivity(); 1074 } catch (Throwable t) { 1075 handleError(t); 1076 } 1077 }); 1078 } 1079 1080 // FIXME: acknowledge a received CLOSE request from peer 1081 EngineResult doClosure(EngineResult r) throws IOException { 1082 if (debug.on()) 1083 debug.log("doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]", 1084 r.result, engine.getHandshakeStatus(), 1085 engine.isOutboundDone(), engine.isInboundDone()); 1086 if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) { 1087 // we have received TLS close_notify and need to send 1088 // an acknowledgement back. We're calling doHandshake 1089 // to finish the close handshake. 1090 if (engine.isInboundDone() && !engine.isOutboundDone()) { 1091 if (debug.on()) debug.log("doClosure: close_notify received"); 1092 close_notify_received = true; 1093 if (!writer.scheduler.isStopped()) { 1094 doHandshake(r, READER); 1095 } else { 1096 // We have received closed notify, but we 1097 // won't be able to send the acknowledgement. 1098 // Nothing more will come from the socket either, 1099 // so mark the reader as completed. 1100 synchronized (reader.readBufferLock) { 1101 reader.completing = true; 1102 } 1103 } 1104 } 1105 } 1106 return r; 1107 } 1108 1109 /** 1110 * Returns the upstream Flow.Subscriber of the reading (incoming) side. 1111 * This flow must be given the encrypted data read from upstream (eg socket) 1112 * before it is decrypted. 1113 */ 1114 public Flow.Subscriber<List<ByteBuffer>> upstreamReader() { 1115 return reader; 1116 } 1117 1118 /** 1119 * Returns the upstream Flow.Subscriber of the writing (outgoing) side. 1120 * This flow contains the plaintext data before it is encrypted. 1121 */ 1122 public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() { 1123 return writer; 1124 } 1125 1126 public boolean resumeReader() { 1127 return reader.signalScheduling(); 1128 } 1129 1130 public void resetReaderDemand() { 1131 reader.resetDownstreamDemand(); 1132 } 1133 1134 static class EngineResult { 1135 final SSLEngineResult result; 1136 final ByteBuffer destBuffer; 1137 1138 // normal result 1139 EngineResult(SSLEngineResult result) { 1140 this(result, null); 1141 } 1142 1143 EngineResult(SSLEngineResult result, ByteBuffer destBuffer) { 1144 this.result = result; 1145 this.destBuffer = destBuffer; 1146 } 1147 1148 boolean handshaking() { 1149 HandshakeStatus s = result.getHandshakeStatus(); 1150 return s != HandshakeStatus.FINISHED 1151 && s != HandshakeStatus.NOT_HANDSHAKING 1152 && result.getStatus() != Status.CLOSED; 1153 } 1154 1155 boolean needUnwrap() { 1156 HandshakeStatus s = result.getHandshakeStatus(); 1157 return s == HandshakeStatus.NEED_UNWRAP; 1158 } 1159 1160 1161 int bytesConsumed() { 1162 return result.bytesConsumed(); 1163 } 1164 1165 int bytesProduced() { 1166 return result.bytesProduced(); 1167 } 1168 1169 SSLEngineResult.HandshakeStatus handshakeStatus() { 1170 return result.getHandshakeStatus(); 1171 } 1172 1173 SSLEngineResult.Status status() { 1174 return result.getStatus(); 1175 } 1176 } 1177 1178 // The maximum network buffer size negotiated during 1179 // the handshake. Usually 16K. 1180 volatile int packetBufferSize; 1181 final ByteBuffer getNetBuffer() { 1182 int netSize = packetBufferSize; 1183 if (netSize <= 0) { 1184 packetBufferSize = netSize = engine.getSession().getPacketBufferSize(); 1185 } 1186 return ByteBuffer.allocate(netSize); 1187 } 1188 1189 // The maximum application buffer size negotiated during 1190 // the handshake. Usually close to 16K. 1191 volatile int applicationBufferSize; 1192 // Despite of the maximum applicationBufferSize negotiated 1193 // above, TLS records usually have a much smaller payload. 1194 // The adaptativeAppBufferSize records the max payload 1195 // ever decoded, and we use that as a guess for how big 1196 // a buffer we will need for the next payload. 1197 // This avoids allocating and zeroing a 16K buffer for 1198 // nothing... 1199 volatile int adaptiveAppBufferSize; 1200 final ByteBuffer getAppBuffer() { 1201 int appSize = applicationBufferSize; 1202 if (appSize <= 0) { 1203 applicationBufferSize = appSize 1204 = engine.getSession().getApplicationBufferSize(); 1205 } 1206 int size = adaptiveAppBufferSize; 1207 if (size <= 0) { 1208 size = 512; // start with 512 this is usually enough for handshaking / headers 1209 } else if (size > appSize) { 1210 size = appSize; 1211 } 1212 // will cause a BUFFER_OVERFLOW if not big enough, but 1213 // that's OK. 1214 return ByteBuffer.allocate(size); 1215 } 1216 1217 final String dbgString() { 1218 return "SSLFlowDelegate(" + tubeName + ")"; 1219 } 1220 }