1 /*
   2  * Copyright (c) 2002, 2004, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.net.*;
  25 import java.io.*;
  26 import java.nio.*;
  27 import java.nio.channels.*;
  28 import sun.net.www.MessageHeader;
  29 import java.util.*;
  30 import javax.net.ssl.*;
  31 import javax.net.ssl.SSLEngineResult.*;
  32 import java.security.*;
  33 
  34 /**
  35  * This class implements a simple HTTPS server. It uses multiple threads to
  36  * handle connections in parallel, and will spin off a new thread to handle
  37  * each request. (this is easier to implement with SSLEngine)
  38  * <p>
  39  * It must be instantiated with a {@link HttpCallback} object to which
  40  * requests are given and must be handled.
  41  * <p>
  42  * Simple synchronization between the client(s) and server can be done
  43  * using the {@link #waitForCondition(String)}, {@link #setCondition(String)} and
  44  * {@link #rendezvous(String,int)} methods.
  45  *
  46  * NOTE NOTE NOTE NOTE NOTE NOTE NOTE
  47  *
  48  * If you make a change in here, please don't forget to make the
  49  * corresponding change in the J2SE equivalent.
  50  *
  51  * NOTE NOTE NOTE NOTE NOTE NOTE NOTE
  52  */
  53 
  54 public class TestHttpsServer {
  55 
  56     ServerSocketChannel schan;
  57     int threads;
  58     int cperthread;
  59     HttpCallback cb;
  60     Server[] servers;
  61 
  62     // ssl related fields
  63     static SSLContext sslCtx;
  64 
  65     /**
  66      * Create a <code>TestHttpsServer<code> instance with the specified callback object
  67      * for handling requests. One thread is created to handle requests,
  68      * and up to ten TCP connections will be handled simultaneously.
  69      * @param cb the callback object which is invoked to handle each
  70      *  incoming request
  71      */
  72 
  73     public TestHttpsServer (HttpCallback cb) throws IOException {
  74         this (cb, 1, 10, 0);
  75     }
  76 
  77     /**
  78      * Create a <code>TestHttpsServer<code> instance with the specified number of
  79      * threads and maximum number of connections per thread. This functions
  80      * the same as the 4 arg constructor, where the port argument is set to zero.
  81      * @param cb the callback object which is invoked to handle each
  82      *     incoming request
  83      * @param threads the number of threads to create to handle requests
  84      *     in parallel
  85      * @param cperthread the number of simultaneous TCP connections to
  86      *     handle per thread
  87      */
  88 
  89     public TestHttpsServer (HttpCallback cb, int threads, int cperthread)
  90         throws IOException {
  91         this (cb, threads, cperthread, 0);
  92     }
  93 
  94     /**
  95      * Create a <code>TestHttpsServer<code> instance with the specified number
  96      * of threads and maximum number of connections per thread and running on
  97      * the specified port. The specified number of threads are created to
  98      * handle incoming requests, and each thread is allowed
  99      * to handle a number of simultaneous TCP connections.
 100      * @param cb the callback object which is invoked to handle
 101      *  each incoming request
 102      * @param threads the number of threads to create to handle
 103      *  requests in parallel
 104      * @param cperthread the number of simultaneous TCP connections
 105      *  to handle per thread
 106      * @param port the port number to bind the server to. <code>Zero</code>
 107      *  means choose any free port.
 108      */
 109 
 110     public TestHttpsServer (HttpCallback cb, int threads, int cperthread, int port)
 111         throws IOException {
 112         schan = ServerSocketChannel.open ();
 113         InetSocketAddress addr = new InetSocketAddress (port);
 114         schan.socket().bind (addr);
 115         this.threads = threads;
 116         this.cb = cb;
 117         this.cperthread = cperthread;
 118 
 119         try {
 120             // create and initialize a SSLContext
 121             KeyStore ks = KeyStore.getInstance("JKS");
 122             KeyStore ts = KeyStore.getInstance("JKS");
 123             char[] passphrase = "passphrase".toCharArray();
 124 
 125             ks.load(new FileInputStream(System.getProperty("javax.net.ssl.keyStore")), passphrase);
 126             ts.load(new FileInputStream(System.getProperty("javax.net.ssl.trustStore")), passphrase);
 127 
 128             KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
 129             kmf.init(ks, passphrase);
 130 
 131             TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
 132             tmf.init(ts);
 133 
 134             sslCtx = SSLContext.getInstance("TLS");
 135 
 136             sslCtx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
 137 
 138             servers = new Server [threads];
 139             for (int i=0; i<threads; i++) {
 140                 servers[i] = new Server (cb, schan, cperthread);
 141                 servers[i].start();
 142             }
 143         } catch (Exception ex) {
 144             throw new RuntimeException("test failed. cause: "+ex.getMessage());
 145         }
 146     }
 147 
 148     /** Tell all threads in the server to exit within 5 seconds.
 149      *  This is an abortive termination. Just prior to the thread exiting
 150      *  all channels in that thread waiting to be closed are forceably closed.
 151      */
 152 
 153     public void terminate () {
 154         for (int i=0; i<threads; i++) {
 155             servers[i].terminate ();
 156         }
 157     }
 158 
 159     /**
 160      * return the local port number to which the server is bound.
 161      * @return the local port number
 162      */
 163 
 164     public int getLocalPort () {
 165         return schan.socket().getLocalPort ();
 166     }
 167 
 168     static class Server extends Thread {
 169 
 170         ServerSocketChannel schan;
 171         Selector selector;
 172         SelectionKey listenerKey;
 173         SelectionKey key; /* the current key being processed */
 174         HttpCallback cb;
 175         ByteBuffer consumeBuffer;
 176         int maxconn;
 177         int nconn;
 178         ClosedChannelList clist;
 179         boolean shutdown;
 180 
 181         Server (HttpCallback cb, ServerSocketChannel schan, int maxconn) {
 182             this.schan = schan;
 183             this.maxconn = maxconn;
 184             this.cb = cb;
 185             nconn = 0;
 186             consumeBuffer = ByteBuffer.allocate (512);
 187             clist = new ClosedChannelList ();
 188             try {
 189                 selector = Selector.open ();
 190                 schan.configureBlocking (false);
 191                 listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
 192             } catch (IOException e) {
 193                 System.err.println ("Server could not start: " + e);
 194             }
 195         }
 196 
 197         /* Stop the thread as soon as possible */
 198         public synchronized void terminate () {
 199             shutdown = true;
 200         }
 201 
 202         public void run ()  {
 203             try {
 204                 while (true) {
 205                     selector.select (1000);
 206                     Set selected = selector.selectedKeys();
 207                     Iterator iter = selected.iterator();
 208                     while (iter.hasNext()) {
 209                         key = (SelectionKey)iter.next();
 210                         if (key.equals (listenerKey)) {
 211                             SocketChannel sock = schan.accept ();
 212                             if (sock == null) {
 213                                 /* false notification */
 214                                 iter.remove();
 215                                 continue;
 216                             }
 217                             sock.configureBlocking (true);
 218                             SSLEngine sslEng = sslCtx.createSSLEngine();
 219                             sslEng.setUseClientMode(false);
 220                             new ServerWorker(cb, sock, sslEng).start();
 221                             nconn ++;
 222                             if (nconn == maxconn) {
 223                                 /* deregister */
 224                                 listenerKey.cancel ();
 225                                 listenerKey = null;
 226                             }
 227                         } else {
 228                             if (key.isReadable()) {
 229                                 boolean closed = false;
 230                                 SocketChannel chan = (SocketChannel) key.channel();
 231                                 if (key.attachment() != null) {
 232                                     closed = consume (chan);
 233                                 }
 234 
 235                                 if (closed) {
 236                                     chan.close ();
 237                                     key.cancel ();
 238                                     if (nconn == maxconn) {
 239                                         listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
 240                                     }
 241                                     nconn --;
 242                                 }
 243                             }
 244                         }
 245                         iter.remove();
 246                     }
 247                     clist.check();
 248 
 249                     synchronized (this) {
 250                         if (shutdown) {
 251                             clist.terminate ();
 252                             return;
 253                         }
 254                     }
 255                 }
 256             } catch (IOException e) {
 257                 System.out.println ("Server exception: " + e);
 258                 // TODO finish
 259             }
 260         }
 261 
 262         /* read all the data off the channel without looking at it
 263              * return true if connection closed
 264              */
 265         boolean consume (SocketChannel chan) {
 266             try {
 267                 consumeBuffer.clear ();
 268                 int c = chan.read (consumeBuffer);
 269                 if (c == -1)
 270                     return true;
 271             } catch (IOException e) {
 272                 return true;
 273             }
 274             return false;
 275         }
 276     }
 277 
 278     static class ServerWorker extends Thread {
 279         private ByteBuffer inNetBB;
 280         private ByteBuffer outNetBB;
 281         private ByteBuffer inAppBB;
 282         private ByteBuffer outAppBB;
 283 
 284         SSLEngine sslEng;
 285         SocketChannel schan;
 286         HttpCallback cb;
 287         HandshakeStatus currentHSStatus;
 288         boolean initialHSComplete;
 289         /*
 290          * All inbound data goes through this buffer.
 291          *
 292          * It might be nice to use a cache of ByteBuffers so we're
 293          * not alloc/dealloc'ing all over the place.
 294          */
 295 
 296         /*
 297          * Application buffers, also used for handshaking
 298          */
 299         private int appBBSize;
 300 
 301         ServerWorker (HttpCallback cb, SocketChannel schan, SSLEngine sslEng) {
 302             this.sslEng = sslEng;
 303             this.schan = schan;
 304             this.cb = cb;
 305             currentHSStatus = HandshakeStatus.NEED_UNWRAP;
 306             initialHSComplete = false;
 307             int netBBSize = sslEng.getSession().getPacketBufferSize();
 308             inNetBB =  ByteBuffer.allocate(netBBSize);
 309             outNetBB = ByteBuffer.allocate(netBBSize);
 310             appBBSize = sslEng.getSession().getApplicationBufferSize();
 311             inAppBB = ByteBuffer.allocate(appBBSize);
 312             outAppBB = ByteBuffer.allocate(appBBSize);
 313         }
 314 
 315         public SSLEngine getSSLEngine() {
 316             return sslEng;
 317         }
 318 
 319         public ByteBuffer outNetBB() {
 320             return outNetBB;
 321         }
 322 
 323         public ByteBuffer outAppBB() {
 324             return outAppBB;
 325         }
 326 
 327         public void run () {
 328             try {
 329                 SSLEngineResult result;
 330 
 331                 while (!initialHSComplete) {
 332 
 333                     switch (currentHSStatus) {
 334 
 335                     case NEED_UNWRAP:
 336                         int bytes = schan.read(inNetBB);
 337 
 338 needIO:
 339                         while (currentHSStatus == HandshakeStatus.NEED_UNWRAP) {
 340                             /*
 341                              * Don't need to resize requestBB, since no app data should
 342                              * be generated here.
 343                              */
 344                             inNetBB.flip();
 345                             result = sslEng.unwrap(inNetBB, inAppBB);
 346                             inNetBB.compact();
 347                             currentHSStatus = result.getHandshakeStatus();
 348 
 349                             switch (result.getStatus()) {
 350 
 351                             case OK:
 352                                 switch (currentHSStatus) {
 353                                 case NOT_HANDSHAKING:
 354                                     throw new IOException(
 355                                                           "Not handshaking during initial handshake");
 356 
 357                                 case NEED_TASK:
 358                                     Runnable task;
 359                                     while ((task = sslEng.getDelegatedTask()) != null) {
 360                                         task.run();
 361                                         currentHSStatus = sslEng.getHandshakeStatus();
 362                                     }
 363                                     break;
 364                                 }
 365 
 366                                 break;
 367 
 368                             case BUFFER_UNDERFLOW:
 369                                 break needIO;
 370 
 371                             default: // BUFFER_OVERFLOW/CLOSED:
 372                                 throw new IOException("Received" + result.getStatus() +
 373                                                       "during initial handshaking");
 374                             }
 375                         }
 376 
 377                         /*
 378                          * Just transitioned from read to write.
 379                          */
 380                         if (currentHSStatus != HandshakeStatus.NEED_WRAP) {
 381                             break;
 382                         }
 383 
 384                         // Fall through and fill the write buffer.
 385 
 386                     case NEED_WRAP:
 387                         /*
 388                          * The flush above guarantees the out buffer to be empty
 389                          */
 390                         outNetBB.clear();
 391                         result = sslEng.wrap(inAppBB, outNetBB);
 392                         outNetBB.flip();
 393                         schan.write (outNetBB);
 394                         outNetBB.compact();
 395                         currentHSStatus = result.getHandshakeStatus();
 396 
 397                         switch (result.getStatus()) {
 398                         case OK:
 399 
 400                             if (currentHSStatus == HandshakeStatus.NEED_TASK) {
 401                                 Runnable task;
 402                                 while ((task = sslEng.getDelegatedTask()) != null) {
 403                                     task.run();
 404                                     currentHSStatus = sslEng.getHandshakeStatus();
 405                                 }
 406                             }
 407 
 408                             break;
 409 
 410                         default: // BUFFER_OVERFLOW/BUFFER_UNDERFLOW/CLOSED:
 411                             throw new IOException("Received" + result.getStatus() +
 412                                                   "during initial handshaking");
 413                         }
 414                         break;
 415 
 416                     case FINISHED:
 417                         initialHSComplete = true;
 418                         break;
 419                     default: // NOT_HANDSHAKING/NEED_TASK
 420                         throw new RuntimeException("Invalid Handshaking State" +
 421                                                    currentHSStatus);
 422                     } // switch
 423                 }
 424                 // read the application data; using non-blocking mode
 425                 schan.configureBlocking(false);
 426                 read(schan, sslEng);
 427             } catch (Exception ex) {
 428                 throw new RuntimeException(ex);
 429             }
 430         }
 431 
 432         /* return true if the connection is closed, false otherwise */
 433 
 434         private boolean read (SocketChannel chan, SSLEngine sslEng) {
 435             HttpTransaction msg;
 436             boolean res;
 437             try {
 438                 InputStream is = new BufferedInputStream (new NioInputStream (chan, sslEng, inNetBB, inAppBB));
 439                 String requestline = readLine (is);
 440                 MessageHeader mhead = new MessageHeader (is);
 441                 String clen = mhead.findValue ("Content-Length");
 442                 String trferenc = mhead.findValue ("Transfer-Encoding");
 443                 String data = null;
 444                 if (trferenc != null && trferenc.equals ("chunked"))
 445                     data = new String (readChunkedData (is));
 446                 else if (clen != null)
 447                     data = new String (readNormalData (is, Integer.parseInt (clen)));
 448                 String[] req = requestline.split (" ");
 449                 if (req.length < 2) {
 450                     /* invalid request line */
 451                     return false;
 452                 }
 453                 String cmd = req[0];
 454                 URI uri = null;
 455                 try {
 456                     uri = new URI (req[1]);
 457                     msg = new HttpTransaction (this, cmd, uri, mhead, data, null, chan);
 458                     cb.request (msg);
 459                 } catch (URISyntaxException e) {
 460                     System.err.println ("Invalid URI: " + e);
 461                     msg = new HttpTransaction (this, cmd, null, null, null, null, chan);
 462                     msg.sendResponse (501, "Whatever");
 463                 }
 464                 res = false;
 465             } catch (IOException e) {
 466                 res = true;
 467             }
 468             return res;
 469         }
 470 
 471         byte[] readNormalData (InputStream is, int len) throws IOException {
 472             byte [] buf  = new byte [len];
 473             int c, off=0, remain=len;
 474             while (remain > 0 && ((c=is.read (buf, off, remain))>0)) {
 475                 remain -= c;
 476                 off += c;
 477             }
 478             return buf;
 479         }
 480 
 481         private void readCRLF(InputStream is) throws IOException {
 482             int cr = is.read();
 483             int lf = is.read();
 484 
 485             if (((cr & 0xff) != 0x0d) ||
 486                 ((lf & 0xff) != 0x0a)) {
 487                 throw new IOException(
 488                     "Expected <CR><LF>:  got '" + cr + "/" + lf + "'");
 489             }
 490         }
 491 
 492         byte[] readChunkedData (InputStream is) throws IOException {
 493             LinkedList l = new LinkedList ();
 494             int total = 0;
 495             for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) {
 496                 l.add (readNormalData(is, len));
 497                 total += len;
 498                 readCRLF(is); // CRLF at end of chunk
 499             }
 500             readCRLF(is); // CRLF at end of Chunked Stream.
 501             byte[] buf = new byte [total];
 502             Iterator i = l.iterator();
 503             int x = 0;
 504             while (i.hasNext()) {
 505                 byte[] b = (byte[])i.next();
 506                 System.arraycopy (b, 0, buf, x, b.length);
 507                 x += b.length;
 508             }
 509             return buf;
 510         }
 511 
 512         private int readChunkLen (InputStream is) throws IOException {
 513             int c, len=0;
 514             boolean done=false, readCR=false;
 515             while (!done) {
 516                 c = is.read ();
 517                 if (c == '\n' && readCR) {
 518                     done = true;
 519                 } else {
 520                     if (c == '\r' && !readCR) {
 521                         readCR = true;
 522                     } else {
 523                         int x=0;
 524                         if (c >= 'a' && c <= 'f') {
 525                             x = c - 'a' + 10;
 526                         } else if (c >= 'A' && c <= 'F') {
 527                             x = c - 'A' + 10;
 528                         } else if (c >= '0' && c <= '9') {
 529                             x = c - '0';
 530                         }
 531                         len = len * 16 + x;
 532                     }
 533                 }
 534             }
 535             return len;
 536         }
 537 
 538         private String readLine (InputStream is) throws IOException {
 539             boolean done=false, readCR=false;
 540             byte[] b = new byte [512];
 541             int c, l = 0;
 542 
 543             while (!done) {
 544                 c = is.read ();
 545                 if (c == '\n' && readCR) {
 546                     done = true;
 547                 } else {
 548                     if (c == '\r' && !readCR) {
 549                         readCR = true;
 550                     } else {
 551                         b[l++] = (byte)c;
 552                     }
 553                 }
 554             }
 555             return new String (b);
 556         }
 557 
 558         /** close the channel associated with the current key by:
 559          * 1. shutdownOutput (send a FIN)
 560          * 2. mark the key so that incoming data is to be consumed and discarded
 561          * 3. After a period, close the socket
 562          */
 563 
 564         synchronized void orderlyCloseChannel (SocketChannel ch) throws IOException {
 565             ch.socket().shutdownOutput();
 566         }
 567 
 568         synchronized void abortiveCloseChannel (SocketChannel ch) throws IOException {
 569             Socket s = ch.socket ();
 570             s.setSoLinger (true, 0);
 571             ch.close();
 572         }
 573     }
 574 
 575 
 576     /**
 577      * Implements blocking reading semantics on top of a non-blocking channel
 578      */
 579 
 580     static class NioInputStream extends InputStream {
 581         SSLEngine sslEng;
 582         SocketChannel channel;
 583         Selector selector;
 584         ByteBuffer inNetBB;
 585         ByteBuffer inAppBB;
 586         SelectionKey key;
 587         int available;
 588         byte[] one;
 589         boolean closed;
 590         ByteBuffer markBuf; /* reads may be satisifed from this buffer */
 591         boolean marked;
 592         boolean reset;
 593         int readlimit;
 594 
 595         public NioInputStream (SocketChannel chan, SSLEngine sslEng, ByteBuffer inNetBB, ByteBuffer inAppBB) throws IOException {
 596             this.sslEng = sslEng;
 597             this.channel = chan;
 598             selector = Selector.open();
 599             this.inNetBB = inNetBB;
 600             this.inAppBB = inAppBB;
 601             key = chan.register (selector, SelectionKey.OP_READ);
 602             available = 0;
 603             one = new byte[1];
 604             closed = marked = reset = false;
 605         }
 606 
 607         public synchronized int read (byte[] b) throws IOException {
 608             return read (b, 0, b.length);
 609         }
 610 
 611         public synchronized int read () throws IOException {
 612             return read (one, 0, 1);
 613         }
 614 
 615         public synchronized int read (byte[] b, int off, int srclen) throws IOException {
 616 
 617             int canreturn, willreturn;
 618 
 619             if (closed)
 620                 return -1;
 621 
 622             if (reset) { /* satisfy from markBuf */
 623                 canreturn = markBuf.remaining ();
 624                 willreturn = canreturn>srclen ? srclen : canreturn;
 625                 markBuf.get(b, off, willreturn);
 626                 if (canreturn == willreturn) {
 627                     reset = false;
 628                 }
 629             } else { /* satisfy from channel */
 630                 canreturn = available();
 631                 if (canreturn == 0) {
 632                     block ();
 633                     canreturn = available();
 634                 }
 635                 willreturn = canreturn>srclen ? srclen : canreturn;
 636                 inAppBB.get(b, off, willreturn);
 637                 available -= willreturn;
 638 
 639                 if (marked) { /* copy into markBuf */
 640                     try {
 641                         markBuf.put (b, off, willreturn);
 642                     } catch (BufferOverflowException e) {
 643                         marked = false;
 644                     }
 645                 }
 646             }
 647             return willreturn;
 648         }
 649 
 650         public synchronized int available () throws IOException {
 651             if (closed)
 652                 throw new IOException ("Stream is closed");
 653 
 654             if (reset)
 655                 return markBuf.remaining();
 656 
 657             if (available > 0)
 658                 return available;
 659 
 660             inAppBB.clear ();
 661             int bytes = channel.read (inNetBB);
 662 
 663             int needed = sslEng.getSession().getApplicationBufferSize();
 664             if (needed > inAppBB.remaining()) {
 665                 inAppBB = ByteBuffer.allocate(needed);
 666             }
 667             inNetBB.flip();
 668             SSLEngineResult result = sslEng.unwrap(inNetBB, inAppBB);
 669             inNetBB.compact();
 670             available = result.bytesProduced();
 671 
 672             if (available > 0)
 673                 inAppBB.flip();
 674             else if (available == -1)
 675                 throw new IOException ("Stream is closed");
 676             return available;
 677         }
 678 
 679         /**
 680          * block() only called when available==0 and buf is empty
 681          */
 682         private synchronized void block () throws IOException {
 683             //assert available == 0;
 684             int n = selector.select ();
 685             //assert n == 1;
 686             selector.selectedKeys().clear();
 687             available ();
 688         }
 689 
 690         public void close () throws IOException {
 691             if (closed)
 692                 return;
 693             channel.close ();
 694             closed = true;
 695         }
 696 
 697         public synchronized void mark (int readlimit) {
 698             if (closed)
 699                 return;
 700             this.readlimit = readlimit;
 701             markBuf = ByteBuffer.allocate (readlimit);
 702             marked = true;
 703             reset = false;
 704         }
 705 
 706         public synchronized void reset () throws IOException {
 707             if (closed )
 708                 return;
 709             if (!marked)
 710                 throw new IOException ("Stream not marked");
 711             marked = false;
 712             reset = true;
 713             markBuf.flip ();
 714         }
 715     }
 716 
 717     static class NioOutputStream extends OutputStream {
 718         SSLEngine sslEng;
 719         SocketChannel channel;
 720         ByteBuffer outNetBB;
 721         ByteBuffer outAppBB;
 722         SelectionKey key;
 723         Selector selector;
 724         boolean closed;
 725         byte[] one;
 726 
 727         public NioOutputStream (SocketChannel channel, SSLEngine sslEng, ByteBuffer outNetBB, ByteBuffer outAppBB) throws IOException {
 728             this.sslEng = sslEng;
 729             this.channel = channel;
 730             this.outNetBB = outNetBB;
 731             this.outAppBB = outAppBB;
 732             selector = Selector.open ();
 733             key = channel.register (selector, SelectionKey.OP_WRITE);
 734             closed = false;
 735             one = new byte [1];
 736         }
 737 
 738         public synchronized void write (int b) throws IOException {
 739             one[0] = (byte)b;
 740             write (one, 0, 1);
 741         }
 742 
 743         public synchronized void write (byte[] b) throws IOException {
 744             write (b, 0, b.length);
 745         }
 746 
 747         public synchronized void write (byte[] b, int off, int len) throws IOException {
 748             if (closed)
 749                 throw new IOException ("stream is closed");
 750 
 751             outAppBB = ByteBuffer.allocate (len);
 752             outAppBB.put (b, off, len);
 753             outAppBB.flip ();
 754             int n;
 755             outNetBB.clear();
 756             int needed = sslEng.getSession().getPacketBufferSize();
 757             if (outNetBB.capacity() < needed) {
 758                 outNetBB = ByteBuffer.allocate(needed);
 759             }
 760             SSLEngineResult ret = sslEng.wrap(outAppBB, outNetBB);
 761             outNetBB.flip();
 762             int newLen = ret.bytesProduced();
 763             while ((n = channel.write (outNetBB)) < newLen) {
 764                 newLen -= n;
 765                 if (newLen == 0)
 766                     return;
 767                 selector.select ();
 768                 selector.selectedKeys().clear ();
 769             }
 770         }
 771 
 772         public void close () throws IOException {
 773             if (closed)
 774                 return;
 775             channel.close ();
 776             closed = true;
 777         }
 778     }
 779 
 780     /**
 781      * Utilities for synchronization. A condition is
 782      * identified by a string name, and is initialized
 783      * upon first use (ie. setCondition() or waitForCondition()). Threads
 784      * are blocked until some thread calls (or has called) setCondition() for the same
 785      * condition.
 786      * <P>
 787      * A rendezvous built on a condition is also provided for synchronizing
 788      * N threads.
 789      */
 790 
 791     private static HashMap conditions = new HashMap();
 792 
 793     /*
 794      * Modifiable boolean object
 795      */
 796     private static class BValue {
 797         boolean v;
 798     }
 799 
 800     /*
 801      * Modifiable int object
 802      */
 803     private static class IValue {
 804         int v;
 805         IValue (int i) {
 806             v =i;
 807         }
 808     }
 809 
 810 
 811     private static BValue getCond (String condition) {
 812         synchronized (conditions) {
 813             BValue cond = (BValue) conditions.get (condition);
 814             if (cond == null) {
 815                 cond = new BValue();
 816                 conditions.put (condition, cond);
 817             }
 818             return cond;
 819         }
 820     }
 821 
 822     /**
 823      * Set the condition to true. Any threads that are currently blocked
 824      * waiting on the condition, will be unblocked and allowed to continue.
 825      * Threads that subsequently call waitForCondition() will not block.
 826      * If the named condition did not exist prior to the call, then it is created
 827      * first.
 828      */
 829 
 830     public static void setCondition (String condition) {
 831         BValue cond = getCond (condition);
 832         synchronized (cond) {
 833             if (cond.v) {
 834                 return;
 835             }
 836             cond.v = true;
 837             cond.notifyAll();
 838         }
 839     }
 840 
 841     /**
 842      * If the named condition does not exist, then it is created and initialized
 843      * to false. If the condition exists or has just been created and its value
 844      * is false, then the thread blocks until another thread sets the condition.
 845      * If the condition exists and is already set to true, then this call returns
 846      * immediately without blocking.
 847      */
 848 
 849     public static void waitForCondition (String condition) {
 850         BValue cond = getCond (condition);
 851         synchronized (cond) {
 852             if (!cond.v) {
 853                 try {
 854                     cond.wait();
 855                 } catch (InterruptedException e) {}
 856             }
 857         }
 858     }
 859 
 860     /* conditions must be locked when accessing this */
 861     static HashMap rv = new HashMap();
 862 
 863     /**
 864      * Force N threads to rendezvous (ie. wait for each other) before proceeding.
 865      * The first thread(s) to call are blocked until the last
 866      * thread makes the call. Then all threads continue.
 867      * <p>
 868      * All threads that call with the same condition name, must use the same value
 869      * for N (or the results may be not be as expected).
 870      * <P>
 871      * Obviously, if fewer than N threads make the rendezvous then the result
 872      * will be a hang.
 873      */
 874 
 875     public static void rendezvous (String condition, int N) {
 876         BValue cond;
 877         IValue iv;
 878         String name = "RV_"+condition;
 879 
 880         /* get the condition */
 881 
 882         synchronized (conditions) {
 883             cond = (BValue)conditions.get (name);
 884             if (cond == null) {
 885                 /* we are first caller */
 886                 if (N < 2) {
 887                     throw new RuntimeException ("rendezvous must be called with N >= 2");
 888                 }
 889                 cond = new BValue ();
 890                 conditions.put (name, cond);
 891                 iv = new IValue (N-1);
 892                 rv.put (name, iv);
 893             } else {
 894                 /* already initialised, just decrement the counter */
 895                 iv = (IValue) rv.get (name);
 896                 iv.v --;
 897             }
 898         }
 899 
 900         if (iv.v > 0) {
 901             waitForCondition (name);
 902         } else {
 903             setCondition (name);
 904             synchronized (conditions) {
 905                 clearCondition (name);
 906                 rv.remove (name);
 907             }
 908         }
 909     }
 910 
 911     /**
 912      * If the named condition exists and is set then remove it, so it can
 913      * be re-initialized and used again. If the condition does not exist, or
 914      * exists but is not set, then the call returns without doing anything.
 915      * Note, some higher level synchronization
 916      * may be needed between clear and the other operations.
 917      */
 918 
 919     public static void clearCondition(String condition) {
 920         BValue cond;
 921         synchronized (conditions) {
 922             cond = (BValue) conditions.get (condition);
 923             if (cond == null) {
 924                 return;
 925             }
 926             synchronized (cond) {
 927                 if (cond.v) {
 928                     conditions.remove (condition);
 929                 }
 930             }
 931         }
 932     }
 933 }