1 /*
   2  * Copyright (c) 2002, 2019, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.net.*;
  25 import java.io.*;
  26 import java.nio.*;
  27 import java.nio.channels.*;
  28 import sun.net.www.MessageHeader;
  29 import java.util.*;
  30 import javax.net.ssl.*;
  31 import javax.net.ssl.SSLEngineResult.*;
  32 import java.security.*;
  33 
  34 /**
  35  * This class implements a simple HTTPS server. It uses multiple threads to
  36  * handle connections in parallel, and will spin off a new thread to handle
  37  * each request. (this is easier to implement with SSLEngine)
  38  * <p>
  39  * It must be instantiated with a {@link HttpCallback} object to which
  40  * requests are given and must be handled.
  41  * <p>
  42  * Simple synchronization between the client(s) and server can be done
  43  * using the {@link #waitForCondition(String)}, {@link #setCondition(String)} and
  44  * {@link #rendezvous(String,int)} methods.
  45  *
  46  * NOTE NOTE NOTE NOTE NOTE NOTE NOTE
  47  *
  48  * If you make a change in here, please don't forget to make the
  49  * corresponding change in the J2SE equivalent.
  50  *
  51  * NOTE NOTE NOTE NOTE NOTE NOTE NOTE
  52  */
  53 
  54 public class TestHttpsServer {
  55 
   56     ServerSocketChannel schan;  // listening channel; opened and bound in the five-argument constructor
   57     int threads;                // number of Server threads the constructor creates and starts
   58     int cperthread;             // connection limit handed to each Server as its maxconn
   59     HttpCallback cb;            // request handler handed to each Server
   60     Server[] servers;           // the Server threads, allocated with length "threads"
   61 
   62     // SSL-related fields. sslCtx is static: every constructor call replaces it, and
          // Server.run() uses it to create one SSLEngine per accepted connection.
   63     static SSLContext sslCtx;
  64 
  65     /**
  66      * Create a <code>TestHttpsServer<code> instance with the specified callback object
  67      * for handling requests. One thread is created to handle requests,
  68      * and up to ten TCP connections will be handled simultaneously.
  69      * @param cb the callback object which is invoked to handle each
  70      *  incoming request
  71      */
  72 
  73     public TestHttpsServer(HttpCallback cb) throws IOException {
  74         this(cb, 1, 10, 0);
  75     }
  76 
  77     /**
  78      * Create a <code>TestHttpsServer<code> instance with the specified number of
  79      * threads and maximum number of connections per thread. This functions
  80      * the same as the 4 arg constructor, where the port argument is set to zero.
  81      * @param cb the callback object which is invoked to handle each
  82      *     incoming request
  83      * @param threads the number of threads to create to handle requests
  84      *     in parallel
  85      * @param cperthread the number of simultaneous TCP connections to
  86      *     handle per thread
  87      */
  88 
  89     public TestHttpsServer(HttpCallback cb, int threads, int cperthread)
  90         throws IOException {
  91         this(cb, threads, cperthread, 0);
  92     }
  93 
  94     /**
  95      * Create a <code>TestHttpsServer<code> instance with the specified number
  96      * of threads and maximum number of connections per thread and running on
  97      * the specified port. The specified number of threads are created to
  98      * handle incoming requests, and each thread is allowed
  99      * to handle a number of simultaneous TCP connections.
 100      * @param cb the callback object which is invoked to handle
 101      *  each incoming request
 102      * @param threads the number of threads to create to handle
 103      *  requests in parallel
 104      * @param cperthread the number of simultaneous TCP connections
 105      *  to handle per thread
 106      * @param port the port number to bind the server to. <code>Zero</code>
 107      *  means choose any free port.
 108      */
 109     public TestHttpsServer(HttpCallback cb, int threads, int cperthread, int port)
 110         throws IOException {
 111         this(cb, threads, cperthread, null, port);
 112     }
 113 
 114     /**
 115      * Create a <code>TestHttpsServer<code> instance with the specified number
 116      * of threads and maximum number of connections per thread and running on
 117      * the specified port. The specified number of threads are created to
 118      * handle incoming requests, and each thread is allowed
 119      * to handle a number of simultaneous TCP connections.
 120      * @param cb the callback object which is invoked to handle
 121      *  each incoming request
 122      * @param threads the number of threads to create to handle
 123      *  requests in parallel
 124      * @param cperthread the number of simultaneous TCP connections
 125      *  to handle per thread
 126      * @param address the InetAddress to bind to. {@code Null} means the
 127      *  wildcard address.
 128      * @param port the port number to bind the server to. {@code Zero}
 129      *  means choose any free port.
 130      */
 131 
 132     public TestHttpsServer(HttpCallback cb, int threads, int cperthread, InetAddress address, int port)
 133         throws IOException {
 134         schan = ServerSocketChannel.open();
 135         InetSocketAddress addr = new InetSocketAddress(address, port);
 136         schan.socket().bind(addr);
 137         this.threads = threads;
 138         this.cb = cb;
 139         this.cperthread = cperthread;
 140 
 141         try {
 142             // create and initialize a SSLContext
 143             KeyStore ks = KeyStore.getInstance("JKS");
 144             KeyStore ts = KeyStore.getInstance("JKS");
 145             char[] passphrase = "passphrase".toCharArray();
 146 
 147             ks.load(new FileInputStream(System.getProperty("javax.net.ssl.keyStore")), passphrase);
 148             ts.load(new FileInputStream(System.getProperty("javax.net.ssl.trustStore")), passphrase);
 149 
 150             KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
 151             kmf.init(ks, passphrase);
 152 
 153             TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
 154             tmf.init(ts);
 155 
 156             sslCtx = SSLContext.getInstance("TLS");
 157 
 158             sslCtx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
 159 
 160             servers = new Server[threads];
 161             for (int i=0; i<threads; i++) {
 162                 servers[i] = new Server(cb, schan, cperthread);
 163                 servers[i].start();
 164             }
 165         } catch (Exception ex) {
 166             throw new RuntimeException("test failed. cause: "+ex.getMessage());
 167         }
 168     }
 169 
 170     /** Tell all threads in the server to exit within 5 seconds.
 171      *  This is an abortive termination. Just prior to the thread exiting
 172      *  all channels in that thread waiting to be closed are forceably closed.
 173      */
 174 
 175     public void terminate() {
 176         for (int i=0; i<threads; i++) {
 177             servers[i].terminate ();
 178         }
 179     }
 180 
 181     /**
 182      * return the local port number to which the server is bound.
 183      * @return the local port number
 184      */
 185 
 186     public int getLocalPort () {
 187         return schan.socket().getLocalPort ();
 188     }
 189 
 190     public String getAuthority() {
 191         InetAddress address = schan.socket().getInetAddress();
 192         String hostaddr = address.getHostAddress();
 193         if (address.isAnyLocalAddress()) hostaddr = "localhost";
 194         if (hostaddr.indexOf(':') > -1) hostaddr = "[" + hostaddr + "]";
 195         return hostaddr + ":" + getLocalPort();
 196     }
 197 
 198     static class Server extends Thread {
 199 
 200         ServerSocketChannel schan;
 201         Selector selector;
 202         SelectionKey listenerKey;
 203         SelectionKey key; /* the current key being processed */
 204         HttpCallback cb;
 205         ByteBuffer consumeBuffer;
 206         int maxconn;
 207         int nconn;
 208         ClosedChannelList clist;
 209         boolean shutdown;
 210 
 211         Server(HttpCallback cb, ServerSocketChannel schan, int maxconn) {
 212             this.schan = schan;
 213             this.maxconn = maxconn;
 214             this.cb = cb;
 215             nconn = 0;
 216             consumeBuffer = ByteBuffer.allocate(512);
 217             clist = new ClosedChannelList();
 218             try {
 219                 selector = Selector.open();
 220                 schan.configureBlocking(false);
 221                 listenerKey = schan.register(selector, SelectionKey.OP_ACCEPT);
 222             } catch (IOException e) {
 223                 System.err.println("Server could not start: " + e);
 224             }
 225         }
 226 
 227         /* Stop the thread as soon as possible */
 228         public synchronized void terminate() {
 229             shutdown = true;
 230         }
 231 
 232         public void run()  {
 233             try {
 234                 while (true) {
 235                     selector.select(1000);
 236                     Set selected = selector.selectedKeys();
 237                     Iterator iter = selected.iterator();
 238                     while (iter.hasNext()) {
 239                         key = (SelectionKey)iter.next();
 240                         if (key.equals (listenerKey)) {
 241                             SocketChannel sock = schan.accept();
 242                             if (sock == null) {
 243                                 /* false notification */
 244                                 iter.remove();
 245                                 continue;
 246                             }
 247                             sock.configureBlocking(true);
 248                             SSLEngine sslEng = sslCtx.createSSLEngine();
 249                             sslEng.setUseClientMode(false);
 250                             new ServerWorker(cb, sock, sslEng).start();
 251                             nconn ++;
 252                             if (nconn == maxconn) {
 253                                 /* deregister */
 254                                 listenerKey.cancel();
 255                                 listenerKey = null;
 256                             }
 257                         } else {
 258                             if (key.isReadable()) {
 259                                 boolean closed = false;
 260                                 SocketChannel chan = (SocketChannel)key.channel();
 261                                 if (key.attachment() != null) {
 262                                     closed = consume(chan);
 263                                 }
 264 
 265                                 if (closed) {
 266                                     chan.close();
 267                                     key.cancel();
 268                                     if (nconn == maxconn) {
 269                                         listenerKey = schan.register(selector, SelectionKey.OP_ACCEPT);
 270                                     }
 271                                     nconn --;
 272                                 }
 273                             }
 274                         }
 275                         iter.remove();
 276                     }
 277                     clist.check();
 278 
 279                     synchronized (this) {
 280                         if (shutdown) {
 281                             clist.terminate();
 282                             return;
 283                         }
 284                     }
 285                 }
 286             } catch (IOException e) {
 287                 System.out.println("Server exception: " + e);
 288                 // TODO finish
 289             }
 290         }
 291 
 292         /* read all the data off the channel without looking at it
 293          * return true if connection closed
 294          */
 295         boolean consume(SocketChannel chan) {
 296             try {
 297                 consumeBuffer.clear();
 298                 int c = chan.read(consumeBuffer);
 299                 if (c == -1)
 300                     return true;
 301             } catch (IOException e) {
 302                 return true;
 303             }
 304             return false;
 305         }
 306     }
 307 
 308     static class ServerWorker extends Thread {
 309         private ByteBuffer inNetBB;
 310         private ByteBuffer outNetBB;
 311         private ByteBuffer inAppBB;
 312         private ByteBuffer outAppBB;
 313 
 314         SSLEngine sslEng;
 315         SocketChannel schan;
 316         HttpCallback cb;
 317         HandshakeStatus currentHSStatus;
 318         boolean initialHSComplete;
 319         /*
 320          * All inbound data goes through this buffer.
 321          *
 322          * It might be nice to use a cache of ByteBuffers so we're
 323          * not alloc/dealloc'ing all over the place.
 324          */
 325 
 326         /*
 327          * Application buffers, also used for handshaking
 328          */
 329         private int appBBSize;
 330 
 331         ServerWorker(HttpCallback cb, SocketChannel schan, SSLEngine sslEng) {
 332             this.sslEng = sslEng;
 333             this.schan = schan;
 334             this.cb = cb;
 335             currentHSStatus = HandshakeStatus.NEED_UNWRAP;
 336             initialHSComplete = false;
 337             int netBBSize = sslEng.getSession().getPacketBufferSize();
 338             inNetBB =  ByteBuffer.allocate(netBBSize);
 339             outNetBB = ByteBuffer.allocate(netBBSize);
 340             appBBSize = sslEng.getSession().getApplicationBufferSize();
 341             inAppBB = ByteBuffer.allocate(appBBSize);
 342             outAppBB = ByteBuffer.allocate(appBBSize);
 343         }
 344 
 345         public SSLEngine getSSLEngine() {
 346             return sslEng;
 347         }
 348 
 349         public ByteBuffer outNetBB() {
 350             return outNetBB;
 351         }
 352 
 353         public ByteBuffer outAppBB() {
 354             return outAppBB;
 355         }
 356 
 357         public void run () {
 358             try {
 359                 SSLEngineResult result;
 360 
 361                 while (!initialHSComplete) {
 362 
 363                     switch (currentHSStatus) {
 364 
 365                     case NEED_UNWRAP:
 366                         int bytes = schan.read(inNetBB);
 367 
 368 needIO:
 369                         while (currentHSStatus == HandshakeStatus.NEED_UNWRAP) {
 370                             /*
 371                              * Don't need to resize requestBB, since no app data should
 372                              * be generated here.
 373                              */
 374                             inNetBB.flip();
 375                             result = sslEng.unwrap(inNetBB, inAppBB);
 376                             inNetBB.compact();
 377                             currentHSStatus = result.getHandshakeStatus();
 378 
 379                             switch (result.getStatus()) {
 380 
 381                             case OK:
 382                                 switch (currentHSStatus) {
 383                                 case NOT_HANDSHAKING:
 384                                     throw new IOException(
 385                                                           "Not handshaking during initial handshake");
 386 
 387                                 case NEED_TASK:
 388                                     Runnable task;
 389                                     while ((task = sslEng.getDelegatedTask()) != null) {
 390                                         task.run();
 391                                         currentHSStatus = sslEng.getHandshakeStatus();
 392                                     }
 393                                     break;
 394                                 }
 395 
 396                                 break;
 397 
 398                             case BUFFER_UNDERFLOW:
 399                                 break needIO;
 400 
 401                             default: // BUFFER_OVERFLOW/CLOSED:
 402                                 throw new IOException("Received" + result.getStatus() +
 403                                                       "during initial handshaking");
 404                             }
 405                         }
 406 
 407                         /*
 408                          * Just transitioned from read to write.
 409                          */
 410                         if (currentHSStatus != HandshakeStatus.NEED_WRAP) {
 411                             break;
 412                         }
 413 
 414                         // Fall through and fill the write buffer.
 415 
 416                     case NEED_WRAP:
 417                         /*
 418                          * The flush above guarantees the out buffer to be empty
 419                          */
 420                         outNetBB.clear();
 421                         result = sslEng.wrap(inAppBB, outNetBB);
 422                         outNetBB.flip();
 423                         schan.write (outNetBB);
 424                         outNetBB.compact();
 425                         currentHSStatus = result.getHandshakeStatus();
 426 
 427                         switch (result.getStatus()) {
 428                         case OK:
 429 
 430                             if (currentHSStatus == HandshakeStatus.NEED_TASK) {
 431                                 Runnable task;
 432                                 while ((task = sslEng.getDelegatedTask()) != null) {
 433                                     task.run();
 434                                     currentHSStatus = sslEng.getHandshakeStatus();
 435                                 }
 436                             }
 437 
 438                             break;
 439 
 440                         default: // BUFFER_OVERFLOW/BUFFER_UNDERFLOW/CLOSED:
 441                             throw new IOException("Received" + result.getStatus() +
 442                                                   "during initial handshaking");
 443                         }
 444                         break;
 445 
 446                     case FINISHED:
 447                         initialHSComplete = true;
 448                         break;
 449                     default: // NOT_HANDSHAKING/NEED_TASK
 450                         throw new RuntimeException("Invalid Handshaking State" +
 451                                                    currentHSStatus);
 452                     } // switch
 453                 }
 454                 // read the application data; using non-blocking mode
 455                 schan.configureBlocking(false);
 456                 read(schan, sslEng);
 457             } catch (Exception ex) {
 458                 throw new RuntimeException(ex);
 459             }
 460         }
 461 
 462         /* return true if the connection is closed, false otherwise */
 463 
 464         private boolean read(SocketChannel chan, SSLEngine sslEng) {
 465             HttpTransaction msg;
 466             boolean res;
 467             try {
 468                 InputStream is = new BufferedInputStream(new NioInputStream(chan, sslEng, inNetBB, inAppBB));
 469                 String requestline = readLine(is);
 470                 MessageHeader mhead = new MessageHeader(is);
 471                 String clen = mhead.findValue("Content-Length");
 472                 String trferenc = mhead.findValue("Transfer-Encoding");
 473                 String data = null;
 474                 if (trferenc != null && trferenc.equals("chunked"))
 475                     data = new String(readChunkedData(is));
 476                 else if (clen != null)
 477                     data = new String(readNormalData(is, Integer.parseInt(clen)));
 478                 String[] req = requestline.split(" ");
 479                 if (req.length < 2) {
 480                     /* invalid request line */
 481                     return false;
 482                 }
 483                 String cmd = req[0];
 484                 URI uri = null;
 485                 try {
 486                     uri = new URI(req[1]);
 487                     msg = new HttpTransaction(this, cmd, uri, mhead, data, null, chan);
 488                     cb.request(msg);
 489                 } catch (URISyntaxException e) {
 490                     System.err.println ("Invalid URI: " + e);
 491                     msg = new HttpTransaction(this, cmd, null, null, null, null, chan);
 492                     msg.sendResponse(501, "Whatever");
 493                 }
 494                 res = false;
 495             } catch (IOException e) {
 496                 res = true;
 497             }
 498             return res;
 499         }
 500 
 501         byte[] readNormalData(InputStream is, int len) throws IOException {
 502             byte[] buf  = new byte[len];
 503             int c, off=0, remain=len;
 504             while (remain > 0 && ((c=is.read (buf, off, remain))>0)) {
 505                 remain -= c;
 506                 off += c;
 507             }
 508             return buf;
 509         }
 510 
 511         private void readCRLF(InputStream is) throws IOException {
 512             int cr = is.read();
 513             int lf = is.read();
 514 
 515             if (((cr & 0xff) != 0x0d) ||
 516                 ((lf & 0xff) != 0x0a)) {
 517                 throw new IOException(
 518                     "Expected <CR><LF>:  got '" + cr + "/" + lf + "'");
 519             }
 520         }
 521 
 522         byte[] readChunkedData(InputStream is) throws IOException {
 523             LinkedList l = new LinkedList();
 524             int total = 0;
 525             for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) {
 526                 l.add(readNormalData(is, len));
 527                 total += len;
 528                 readCRLF(is); // CRLF at end of chunk
 529             }
 530             readCRLF(is); // CRLF at end of Chunked Stream.
 531             byte[] buf = new byte[total];
 532             Iterator i = l.iterator();
 533             int x = 0;
 534             while (i.hasNext()) {
 535                 byte[] b = (byte[])i.next();
 536                 System.arraycopy(b, 0, buf, x, b.length);
 537                 x += b.length;
 538             }
 539             return buf;
 540         }
 541 
 542         private int readChunkLen(InputStream is) throws IOException {
 543             int c, len=0;
 544             boolean done=false, readCR=false;
 545             while (!done) {
 546                 c = is.read();
 547                 if (c == '\n' && readCR) {
 548                     done = true;
 549                 } else {
 550                     if (c == '\r' && !readCR) {
 551                         readCR = true;
 552                     } else {
 553                         int x=0;
 554                         if (c >= 'a' && c <= 'f') {
 555                             x = c - 'a' + 10;
 556                         } else if (c >= 'A' && c <= 'F') {
 557                             x = c - 'A' + 10;
 558                         } else if (c >= '0' && c <= '9') {
 559                             x = c - '0';
 560                         }
 561                         len = len * 16 + x;
 562                     }
 563                 }
 564             }
 565             return len;
 566         }
 567 
 568         private String readLine(InputStream is) throws IOException {
 569             boolean done=false, readCR=false;
 570             byte[] b = new byte[512];
 571             int c, l = 0;
 572 
 573             while (!done) {
 574                 c = is.read();
 575                 if (c == '\n' && readCR) {
 576                     done = true;
 577                 } else {
 578                     if (c == '\r' && !readCR) {
 579                         readCR = true;
 580                     } else {
 581                         b[l++] = (byte)c;
 582                     }
 583                 }
 584             }
 585             return new String(b);
 586         }
 587 
 588         /** close the channel associated with the current key by:
 589          * 1. shutdownOutput (send a FIN)
 590          * 2. mark the key so that incoming data is to be consumed and discarded
 591          * 3. After a period, close the socket
 592          */
 593 
 594         synchronized void orderlyCloseChannel(SocketChannel ch) throws IOException {
 595             ch.socket().shutdownOutput();
 596         }
 597 
 598         synchronized void abortiveCloseChannel(SocketChannel ch) throws IOException {
 599             Socket s = ch.socket();
 600             s.setSoLinger(true, 0);
 601             ch.close();
 602         }
 603     }
 604 
 605 
 606     /**
 607      * Implements blocking reading semantics on top of a non-blocking channel
 608      */
 609 
 610     static class NioInputStream extends InputStream {
 611         SSLEngine sslEng;
 612         SocketChannel channel;
 613         Selector selector;
 614         ByteBuffer inNetBB;
 615         ByteBuffer inAppBB;
 616         SelectionKey key;
 617         int available;
 618         byte[] one;
 619         boolean closed;
 620         ByteBuffer markBuf; /* reads may be satisifed from this buffer */
 621         boolean marked;
 622         boolean reset;
 623         int readlimit;
 624 
 625         public NioInputStream(SocketChannel chan, SSLEngine sslEng, ByteBuffer inNetBB, ByteBuffer inAppBB) throws IOException {
 626             this.sslEng = sslEng;
 627             this.channel = chan;
 628             selector = Selector.open();
 629             this.inNetBB = inNetBB;
 630             this.inAppBB = inAppBB;
 631             key = chan.register(selector, SelectionKey.OP_READ);
 632             available = 0;
 633             one = new byte[1];
 634             closed = marked = reset = false;
 635         }
 636 
 637         public synchronized int read(byte[] b) throws IOException {
 638             return read(b, 0, b.length);
 639         }
 640 
 641         public synchronized int read() throws IOException {
 642             return read(one, 0, 1);
 643         }
 644 
 645         public synchronized int read(byte[] b, int off, int srclen) throws IOException {
 646 
 647             int canreturn, willreturn;
 648 
 649             if (closed)
 650                 return -1;
 651 
 652             if (reset) { /* satisfy from markBuf */
 653                 canreturn = markBuf.remaining();
 654                 willreturn = canreturn > srclen ? srclen : canreturn;
 655                 markBuf.get(b, off, willreturn);
 656                 if (canreturn == willreturn) {
 657                     reset = false;
 658                 }
 659             } else { /* satisfy from channel */
 660                 canreturn = available();
 661                 if (canreturn == 0) {
 662                     block();
 663                     canreturn = available();
 664                 }
 665                 willreturn = canreturn > srclen ? srclen : canreturn;
 666                 inAppBB.get(b, off, willreturn);
 667                 available -= willreturn;
 668 
 669                 if (marked) { /* copy into markBuf */
 670                     try {
 671                         markBuf.put(b, off, willreturn);
 672                     } catch (BufferOverflowException e) {
 673                         marked = false;
 674                     }
 675                 }
 676             }
 677             return willreturn;
 678         }
 679 
 680         public synchronized int available() throws IOException {
 681             if (closed)
 682                 throw new IOException("Stream is closed");
 683 
 684             if (reset)
 685                 return markBuf.remaining();
 686 
 687             if (available > 0)
 688                 return available;
 689 
 690             inAppBB.clear();
 691             int bytes = channel.read(inNetBB);
 692 
 693             int needed = sslEng.getSession().getApplicationBufferSize();
 694             if (needed > inAppBB.remaining()) {
 695                 inAppBB = ByteBuffer.allocate(needed);
 696             }
 697             inNetBB.flip();
 698             SSLEngineResult result = sslEng.unwrap(inNetBB, inAppBB);
 699             inNetBB.compact();
 700             available = result.bytesProduced();
 701 
 702             if (available > 0)
 703                 inAppBB.flip();
 704             else if (available == -1)
 705                 throw new IOException("Stream is closed");
 706             return available;
 707         }
 708 
 709         /**
 710          * block() only called when available==0 and buf is empty
 711          */
 712         private synchronized void block() throws IOException {
 713             //assert available == 0;
 714             int n = selector.select();
 715             //assert n == 1;
 716             selector.selectedKeys().clear();
 717             available();
 718         }
 719 
 720         public void close() throws IOException {
 721             if (closed)
 722                 return;
 723             channel.close();
 724             closed = true;
 725         }
 726 
 727         public synchronized void mark(int readlimit) {
 728             if (closed)
 729                 return;
 730             this.readlimit = readlimit;
 731             markBuf = ByteBuffer.allocate(readlimit);
 732             marked = true;
 733             reset = false;
 734         }
 735 
 736         public synchronized void reset() throws IOException {
 737             if (closed )
 738                 return;
 739             if (!marked)
 740                 throw new IOException("Stream not marked");
 741             marked = false;
 742             reset = true;
 743             markBuf.flip();
 744         }
 745     }
 746 
 747     static class NioOutputStream extends OutputStream {
 748         SSLEngine sslEng;
 749         SocketChannel channel;
 750         ByteBuffer outNetBB;
 751         ByteBuffer outAppBB;
 752         SelectionKey key;
 753         Selector selector;
 754         boolean closed;
 755         byte[] one;
 756 
 757         public NioOutputStream(SocketChannel channel, SSLEngine sslEng, ByteBuffer outNetBB, ByteBuffer outAppBB) throws IOException {
 758             this.sslEng = sslEng;
 759             this.channel = channel;
 760             this.outNetBB = outNetBB;
 761             this.outAppBB = outAppBB;
 762             selector = Selector.open();
 763             key = channel.register(selector, SelectionKey.OP_WRITE);
 764             closed = false;
 765             one = new byte[1];
 766         }
 767 
 768         public synchronized void write(int b) throws IOException {
 769             one[0] = (byte)b;
 770             write(one, 0, 1);
 771         }
 772 
 773         public synchronized void write(byte[] b) throws IOException {
 774             write(b, 0, b.length);
 775         }
 776 
 777         public synchronized void write(byte[] b, int off, int len) throws IOException {
 778             if (closed)
 779                 throw new IOException("stream is closed");
 780 
 781             outAppBB = ByteBuffer.allocate(len);
 782             outAppBB.put(b, off, len);
 783             outAppBB.flip();
 784             int n;
 785             outNetBB.clear();
 786             int needed = sslEng.getSession().getPacketBufferSize();
 787             if (outNetBB.capacity() < needed) {
 788                 outNetBB = ByteBuffer.allocate(needed);
 789             }
 790             SSLEngineResult ret = sslEng.wrap(outAppBB, outNetBB);
 791             outNetBB.flip();
 792             int newLen = ret.bytesProduced();
 793             while ((n = channel.write (outNetBB)) < newLen) {
 794                 newLen -= n;
 795                 if (newLen == 0)
 796                     return;
 797                 selector.select();
 798                 selector.selectedKeys().clear();
 799             }
 800         }
 801 
 802         public void close() throws IOException {
 803             if (closed)
 804                 return;
 805             channel.close();
 806             closed = true;
 807         }
 808     }
 809 
 810     /**
 811      * Utilities for synchronization. A condition is
 812      * identified by a string name, and is initialized
 813      * upon first use (ie. setCondition() or waitForCondition()). Threads
 814      * are blocked until some thread calls (or has called) setCondition() for the same
 815      * condition.
 816      * <P>
 817      * A rendezvous built on a condition is also provided for synchronizing
 818      * N threads.
 819      */
 820 
 821     private static HashMap conditions = new HashMap();
 822 
 823     /*
 824      * Modifiable boolean object
 825      */
 826     private static class BValue {
 827         boolean v;
 828     }
 829 
 830     /*
 831      * Modifiable int object
 832      */
 833     private static class IValue {
 834         int v;
 835         IValue(int i) {
 836             v =i;
 837         }
 838     }
 839 
 840 
 841     private static BValue getCond(String condition) {
 842         synchronized (conditions) {
 843             BValue cond = (BValue) conditions.get(condition);
 844             if (cond == null) {
 845                 cond = new BValue();
 846                 conditions.put(condition, cond);
 847             }
 848             return cond;
 849         }
 850     }
 851 
 852     /**
 853      * Set the condition to true. Any threads that are currently blocked
 854      * waiting on the condition, will be unblocked and allowed to continue.
 855      * Threads that subsequently call waitForCondition() will not block.
 856      * If the named condition did not exist prior to the call, then it is created
 857      * first.
 858      */
 859 
 860     public static void setCondition(String condition) {
 861         BValue cond = getCond(condition);
 862         synchronized (cond) {
 863             if (cond.v) {
 864                 return;
 865             }
 866             cond.v = true;
 867             cond.notifyAll();
 868         }
 869     }
 870 
 871     /**
 872      * If the named condition does not exist, then it is created and initialized
 873      * to false. If the condition exists or has just been created and its value
 874      * is false, then the thread blocks until another thread sets the condition.
 875      * If the condition exists and is already set to true, then this call returns
 876      * immediately without blocking.
 877      */
 878 
 879     public static void waitForCondition(String condition) {
 880         BValue cond = getCond(condition);
 881         synchronized (cond) {
 882             if (!cond.v) {
 883                 try {
 884                     cond.wait();
 885                 } catch (InterruptedException e) {}
 886             }
 887         }
 888     }
 889 
 890     /* conditions must be locked when accessing this */
 891     static HashMap rv = new HashMap();
 892 
 893     /**
 894      * Force N threads to rendezvous (ie. wait for each other) before proceeding.
 895      * The first thread(s) to call are blocked until the last
 896      * thread makes the call. Then all threads continue.
 897      * <p>
 898      * All threads that call with the same condition name, must use the same value
 899      * for N (or the results may be not be as expected).
 900      * <P>
 901      * Obviously, if fewer than N threads make the rendezvous then the result
 902      * will be a hang.
 903      */
 904 
 905     public static void rendezvous(String condition, int N) {
 906         BValue cond;
 907         IValue iv;
 908         String name = "RV_"+condition;
 909 
 910         /* get the condition */
 911 
 912         synchronized (conditions) {
 913             cond = (BValue)conditions.get(name);
 914             if (cond == null) {
 915                 /* we are first caller */
 916                 if (N < 2) {
 917                     throw new RuntimeException("rendezvous must be called with N >= 2");
 918                 }
 919                 cond = new BValue();
 920                 conditions.put(name, cond);
 921                 iv = new IValue(N-1);
 922                 rv.put(name, iv);
 923             } else {
 924                 /* already initialised, just decrement the counter */
 925                 iv = (IValue) rv.get(name);
 926                 iv.v--;
 927             }
 928         }
 929 
 930         if (iv.v > 0) {
 931             waitForCondition(name);
 932         } else {
 933             setCondition(name);
 934             synchronized (conditions) {
 935                 clearCondition(name);
 936                 rv.remove(name);
 937             }
 938         }
 939     }
 940 
 941     /**
 942      * If the named condition exists and is set then remove it, so it can
 943      * be re-initialized and used again. If the condition does not exist, or
 944      * exists but is not set, then the call returns without doing anything.
 945      * Note, some higher level synchronization
 946      * may be needed between clear and the other operations.
 947      */
 948 
 949     public static void clearCondition(String condition) {
 950         BValue cond;
 951         synchronized (conditions) {
 952             cond = (BValue) conditions.get(condition);
 953             if (cond == null) {
 954                 return;
 955             }
 956             synchronized (cond) {
 957                 if (cond.v) {
 958                     conditions.remove(condition);
 959                 }
 960             }
 961         }
 962     }
 963 }