1 /*
   2  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  */
  23 
  24 import java.net.*;
  25 import java.io.*;
  26 import java.nio.*;
  27 import java.nio.channels.*;
  28 import sun.net.www.MessageHeader;
  29 import java.util.*;
  30 
  31 /**
  32  * This class implements a simple HTTP server. It uses multiple threads to
  33  * handle connections in parallel, and also multiple connections/requests
  34  * can be handled per thread.
  35  * <p>
  36  * It must be instantiated with a {@link HttpCallback} object to which
  37  * requests are given and must be handled.
  38  * <p>
  39  * Simple synchronization between the client(s) and server can be done
  40  * using the {@link #waitForCondition(String)}, {@link #setCondition(String)} and
  41  * {@link #rendezvous(String,int)} methods.
  42  *
  43  * NOTE NOTE NOTE NOTE NOTE NOTE NOTE
  44  *
  45  * If changes are made here, please sure they are propagated to
  46  * the HTTPS equivalent in the JSSE regression test suite.
  47  *
  48  * NOTE NOTE NOTE NOTE NOTE NOTE NOTE
  49  */
  50 
  51 public class TestHttpServer {
  52 
  53     ServerSocketChannel schan;
  54     int threads;
  55     int cperthread;
  56     HttpCallback cb;
  57     Server[] servers;
  58 
  59     /**
  60      * Create a <code>TestHttpServer<code> instance with the specified callback object
  61      * for handling requests. One thread is created to handle requests,
  62      * and up to ten TCP connections will be handled simultaneously.
  63      * @param cb the callback object which is invoked to handle each
  64      *  incoming request
  65      */
  66 
  67     public TestHttpServer (HttpCallback cb) throws IOException {
  68         this (cb, 1, 10, 0);
  69     }
  70 
  71     /**
  72      * Create a <code>TestHttpServer<code> instance with the specified number of
  73      * threads and maximum number of connections per thread. This functions
  74      * the same as the 4 arg constructor, where the port argument is set to zero.
  75      * @param cb the callback object which is invoked to handle each
  76      *     incoming request
  77      * @param threads the number of threads to create to handle requests
  78      *     in parallel
  79      * @param cperthread the number of simultaneous TCP connections to
  80      *     handle per thread
  81      */
  82 
  83     public TestHttpServer (HttpCallback cb, int threads, int cperthread)
  84         throws IOException {
  85         this (cb, threads, cperthread, 0);
  86     }
  87 
  88     /**
  89      * Create a <code>TestHttpServer<code> instance with the specified number
  90      * of threads and maximum number of connections per thread and running on
  91      * the specified port. The specified number of threads are created to
  92      * handle incoming requests, and each thread is allowed
  93      * to handle a number of simultaneous TCP connections.
  94      * @param cb the callback object which is invoked to handle
  95      *  each incoming request
  96      * @param threads the number of threads to create to handle
  97      *  requests in parallel
  98      * @param cperthread the number of simultaneous TCP connections
  99      *  to handle per thread
 100      * @param port the port number to bind the server to. <code>Zero</code>
 101      *  means choose any free port.
 102      */
 103 
 104     public TestHttpServer (HttpCallback cb, int threads, int cperthread, int port)
 105         throws IOException {
 106         schan = ServerSocketChannel.open ();
 107         InetSocketAddress addr = new InetSocketAddress (port);
 108         schan.socket().bind (addr);
 109         this.threads = threads;
 110         this.cb = cb;
 111         this.cperthread = cperthread;
 112         servers = new Server [threads];
 113         for (int i=0; i<threads; i++) {
 114             servers[i] = new Server (cb, schan, cperthread);
 115             servers[i].start();
 116         }
 117     }
 118 
 119     /** Tell all threads in the server to exit within 5 seconds.
 120      *  This is an abortive termination. Just prior to the thread exiting
 121      *  all channels in that thread waiting to be closed are forceably closed.
 122      */
 123 
 124     public void terminate () {
 125         for (int i=0; i<threads; i++) {
 126             servers[i].terminate ();
 127         }
 128     }
 129 
 130     /**
 131      * return the local port number to which the server is bound.
 132      * @return the local port number
 133      */
 134 
 135     public int getLocalPort () {
 136         return schan.socket().getLocalPort ();
 137     }
 138 
 139     static class Server extends Thread {
 140 
 141         ServerSocketChannel schan;
 142         Selector selector;
 143         SelectionKey listenerKey;
 144         SelectionKey key; /* the current key being processed */
 145         HttpCallback cb;
 146         ByteBuffer consumeBuffer;
 147         int maxconn;
 148         int nconn;
 149         ClosedChannelList clist;
 150         boolean shutdown;
 151 
 152         Server (HttpCallback cb, ServerSocketChannel schan, int maxconn) {
 153             this.schan = schan;
 154             this.maxconn = maxconn;
 155             this.cb = cb;
 156             nconn = 0;
 157             consumeBuffer = ByteBuffer.allocate (512);
 158             clist = new ClosedChannelList ();
 159             try {
 160                 selector = Selector.open ();
 161                 schan.configureBlocking (false);
 162                 listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
 163             } catch (IOException e) {
 164                 System.err.println ("Server could not start: " + e);
 165             }
 166         }
 167 
 168         /* Stop the thread as soon as possible */
 169         public synchronized void terminate () {
 170             shutdown = true;
 171         }
 172 
 173         public void run ()  {
 174             try {
 175                 while (true) {
 176                     selector.select (1000);
 177                     Set selected = selector.selectedKeys();
 178                     Iterator iter = selected.iterator();
 179                     while (iter.hasNext()) {
 180                         key = (SelectionKey)iter.next();
 181                         if (key.equals (listenerKey)) {
 182                             SocketChannel sock = schan.accept ();
 183                             if (sock == null) {
 184                                 /* false notification */
 185                                 iter.remove();
 186                                 continue;
 187                             }
 188                             sock.configureBlocking (false);
 189                             sock.register (selector, SelectionKey.OP_READ);
 190                             nconn ++;
 191                             System.out.println("SERVER: new connection. chan[" + sock + "]");
 192                             if (nconn == maxconn) {
 193                                 /* deregister */
 194                                 listenerKey.cancel ();
 195                                 listenerKey = null;
 196                             }
 197                         } else {
 198                             if (key.isReadable()) {
 199                                 boolean closed;
 200                                 SocketChannel chan = (SocketChannel) key.channel();
 201                                 System.out.println("SERVER: connection readable. chan[" + chan + "]");
 202                                 if (key.attachment() != null) {
 203                                     System.out.println("Server: comsume");
 204                                     closed = consume (chan);
 205                                 } else {
 206                                     closed = read (chan, key);
 207                                 }
 208                                 if (closed) {
 209                                     chan.close ();
 210                                     key.cancel ();
 211                                     if (nconn == maxconn) {
 212                                         listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
 213                                     }
 214                                     nconn --;
 215                                 }
 216                             }
 217                         }
 218                         iter.remove();
 219                     }
 220                     clist.check();
 221                     if (shutdown) {
 222                         clist.terminate ();
 223                         return;
 224                     }
 225                 }
 226             } catch (IOException e) {
 227                 System.out.println ("Server exception: " + e);
 228                 // TODO finish
 229             }
 230         }
 231 
 232         /* read all the data off the channel without looking at it
 233              * return true if connection closed
 234              */
 235         boolean consume (SocketChannel chan) {
 236             try {
 237                 consumeBuffer.clear ();
 238                 int c = chan.read (consumeBuffer);
 239                 if (c == -1)
 240                     return true;
 241             } catch (IOException e) {
 242                 return true;
 243             }
 244             return false;
 245         }
 246 
 247         /* return true if the connection is closed, false otherwise */
 248 
 249         private boolean read (SocketChannel chan, SelectionKey key) {
 250             HttpTransaction msg;
 251             boolean res;
 252             try {
 253                 InputStream is = new BufferedInputStream (new NioInputStream (chan));
 254                 String requestline = readLine (is);
 255                 MessageHeader mhead = new MessageHeader (is);
 256                 String clen = mhead.findValue ("Content-Length");
 257                 String trferenc = mhead.findValue ("Transfer-Encoding");
 258                 String data = null;
 259                 if (trferenc != null && trferenc.equals ("chunked"))
 260                     data = new String (readChunkedData (is));
 261                 else if (clen != null)
 262                     data = new String (readNormalData (is, Integer.parseInt (clen)));
 263                 String[] req = requestline.split (" ");
 264                 if (req.length < 2) {
 265                     /* invalid request line */
 266                     return false;
 267                 }
 268                 String cmd = req[0];
 269                 URI uri = null;
 270                 try {
 271                     uri = new URI (req[1]);
 272                     msg = new HttpTransaction (this, cmd, uri, mhead, data, null, key);
 273                     cb.request (msg);
 274                 } catch (URISyntaxException e) {
 275                     System.err.println ("Invalid URI: " + e);
 276                     msg = new HttpTransaction (this, cmd, null, null, null, null, key);
 277                     msg.sendResponse (501, "Whatever");
 278                 }
 279                 res = false;
 280             } catch (IOException e) {
 281                 res = true;
 282             }
 283             return res;
 284         }
 285 
 286         byte[] readNormalData (InputStream is, int len) throws IOException {
 287             byte [] buf  = new byte [len];
 288             int c, off=0, remain=len;
 289             while (remain > 0 && ((c=is.read (buf, off, remain))>0)) {
 290                 remain -= c;
 291                 off += c;
 292             }
 293             return buf;
 294         }
 295 
 296         private void readCRLF(InputStream is) throws IOException {
 297             int cr = is.read();
 298             int lf = is.read();
 299 
 300             if (((cr & 0xff) != 0x0d) ||
 301                 ((lf & 0xff) != 0x0a)) {
 302                 throw new IOException(
 303                     "Expected <CR><LF>:  got '" + cr + "/" + lf + "'");
 304             }
 305         }
 306 
 307         byte[] readChunkedData (InputStream is) throws IOException {
 308             LinkedList l = new LinkedList ();
 309             int total = 0;
 310             for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) {
 311                 l.add (readNormalData(is, len));
 312                 total += len;
 313                 readCRLF(is);  // CRLF at end of chunk
 314             }
 315             readCRLF(is); // CRLF at end of Chunked Stream.
 316             byte[] buf = new byte [total];
 317             Iterator i = l.iterator();
 318             int x = 0;
 319             while (i.hasNext()) {
 320                 byte[] b = (byte[])i.next();
 321                 System.arraycopy (b, 0, buf, x, b.length);
 322                 x += b.length;
 323             }
 324             return buf;
 325         }
 326 
 327         private int readChunkLen (InputStream is) throws IOException {
 328             int c, len=0;
 329             boolean done=false, readCR=false;
 330             while (!done) {
 331                 c = is.read ();
 332                 if (c == '\n' && readCR) {
 333                     done = true;
 334                 } else {
 335                     if (c == '\r' && !readCR) {
 336                         readCR = true;
 337                     } else {
 338                         int x=0;
 339                         if (c >= 'a' && c <= 'f') {
 340                             x = c - 'a' + 10;
 341                         } else if (c >= 'A' && c <= 'F') {
 342                             x = c - 'A' + 10;
 343                         } else if (c >= '0' && c <= '9') {
 344                             x = c - '0';
 345                         }
 346                         len = len * 16 + x;
 347                     }
 348                 }
 349             }
 350             return len;
 351         }
 352 
 353         private String readLine (InputStream is) throws IOException {
 354             boolean done=false, readCR=false;
 355             byte[] b = new byte [512];
 356             int c, l = 0;
 357 
 358             while (!done) {
 359                 c = is.read ();
 360                 if (c == '\n' && readCR) {
 361                     done = true;
 362                 } else {
 363                     if (c == '\r' && !readCR) {
 364                         readCR = true;
 365                     } else {
 366                         b[l++] = (byte)c;
 367                     }
 368                 }
 369             }
 370             return new String (b);
 371         }
 372 
 373         /** close the channel associated with the current key by:
 374          * 1. shutdownOutput (send a FIN)
 375          * 2. mark the key so that incoming data is to be consumed and discarded
 376          * 3. After a period, close the socket
 377          */
 378 
 379         synchronized void orderlyCloseChannel (SelectionKey key) throws IOException {
 380             SocketChannel ch = (SocketChannel)key.channel ();
 381             System.out.println("SERVER: orderlyCloseChannel chan[" + ch + "]");
 382             ch.socket().shutdownOutput();
 383             key.attach (this);
 384             clist.add (key);
 385         }
 386 
 387         synchronized void abortiveCloseChannel (SelectionKey key) throws IOException {
 388             SocketChannel ch = (SocketChannel)key.channel ();
 389             System.out.println("SERVER: abortiveCloseChannel chan[" + ch + "]");
 390 
 391             Socket s = ch.socket ();
 392             s.setSoLinger (true, 0);
 393             ch.close();
 394         }
 395     }
 396 
 397 
 398     /**
 399      * Implements blocking reading semantics on top of a non-blocking channel
 400      */
 401 
 402     static class NioInputStream extends InputStream {
 403         SocketChannel channel;
 404         Selector selector;
 405         ByteBuffer chanbuf;
 406         SelectionKey key;
 407         int available;
 408         byte[] one;
 409         boolean closed;
 410         ByteBuffer markBuf; /* reads may be satisifed from this buffer */
 411         boolean marked;
 412         boolean reset;
 413         int readlimit;
 414 
 415         public NioInputStream (SocketChannel chan) throws IOException {
 416             this.channel = chan;
 417             selector = Selector.open();
 418             chanbuf = ByteBuffer.allocate (1024);
 419             key = chan.register (selector, SelectionKey.OP_READ);
 420             available = 0;
 421             one = new byte[1];
 422             closed = marked = reset = false;
 423         }
 424 
 425         public synchronized int read (byte[] b) throws IOException {
 426             return read (b, 0, b.length);
 427         }
 428 
 429         public synchronized int read () throws IOException {
 430             return read (one, 0, 1);
 431         }
 432 
 433         public synchronized int read (byte[] b, int off, int srclen) throws IOException {
 434 
 435             int canreturn, willreturn;
 436 
 437             if (closed)
 438                 return -1;
 439 
 440             if (reset) { /* satisfy from markBuf */
 441                 canreturn = markBuf.remaining ();
 442                 willreturn = canreturn>srclen ? srclen : canreturn;
 443                 markBuf.get(b, off, willreturn);
 444                 if (canreturn == willreturn) {
 445                     reset = false;
 446                 }
 447             } else { /* satisfy from channel */
 448                 canreturn = available();
 449                 if (canreturn == 0) {
 450                     block ();
 451                     canreturn = available();
 452                 }
 453                 willreturn = canreturn>srclen ? srclen : canreturn;
 454                 chanbuf.get(b, off, willreturn);
 455                 available -= willreturn;
 456 
 457                 if (marked) { /* copy into markBuf */
 458                     try {
 459                         markBuf.put (b, off, willreturn);
 460                     } catch (BufferOverflowException e) {
 461                         marked = false;
 462                     }
 463                 }
 464             }
 465             return willreturn;
 466         }
 467 
 468         public synchronized int available () throws IOException {
 469             if (closed)
 470                 throw new IOException ("Stream is closed");
 471 
 472             if (reset)
 473                 return markBuf.remaining();
 474 
 475             if (available > 0)
 476                 return available;
 477 
 478             chanbuf.clear ();
 479             available = channel.read (chanbuf);
 480             if (available > 0)
 481                 chanbuf.flip();
 482             else if (available == -1)
 483                 throw new IOException ("Stream is closed");
 484             return available;
 485         }
 486 
 487         /**
 488          * block() only called when available==0 and buf is empty
 489          */
 490         private synchronized void block () throws IOException {
 491             //assert available == 0;
 492             int n = selector.select ();
 493             //assert n == 1;
 494             selector.selectedKeys().clear();
 495             available ();
 496         }
 497 
 498         public void close () throws IOException {
 499             if (closed)
 500                 return;
 501             channel.close ();
 502             closed = true;
 503         }
 504 
 505         public synchronized void mark (int readlimit) {
 506             if (closed)
 507                 return;
 508             this.readlimit = readlimit;
 509             markBuf = ByteBuffer.allocate (readlimit);
 510             marked = true;
 511             reset = false;
 512         }
 513 
 514         public synchronized void reset () throws IOException {
 515             if (closed )
 516                 return;
 517             if (!marked)
 518                 throw new IOException ("Stream not marked");
 519             marked = false;
 520             reset = true;
 521             markBuf.flip ();
 522         }
 523     }
 524 
 525     static class NioOutputStream extends OutputStream {
 526         SocketChannel channel;
 527         ByteBuffer buf;
 528         SelectionKey key;
 529         Selector selector;
 530         boolean closed;
 531         byte[] one;
 532 
 533         public NioOutputStream (SocketChannel channel) throws IOException {
 534             this.channel = channel;
 535             selector = Selector.open ();
 536             key = channel.register (selector, SelectionKey.OP_WRITE);
 537             closed = false;
 538             one = new byte [1];
 539         }
 540 
 541         public synchronized void write (int b) throws IOException {
 542             one[0] = (byte)b;
 543             write (one, 0, 1);
 544         }
 545 
 546         public synchronized void write (byte[] b) throws IOException {
 547             write (b, 0, b.length);
 548         }
 549 
 550         public synchronized void write (byte[] b, int off, int len) throws IOException {
 551             if (closed)
 552                 throw new IOException ("stream is closed");
 553 
 554             buf = ByteBuffer.allocate (len);
 555             buf.put (b, off, len);
 556             buf.flip ();
 557             int n;
 558             while ((n = channel.write (buf)) < len) {
 559                 len -= n;
 560                 if (len == 0)
 561                     return;
 562                 selector.select ();
 563                 selector.selectedKeys().clear ();
 564             }
 565         }
 566 
 567         public void close () throws IOException {
 568             if (closed)
 569                 return;
 570             channel.close ();
 571             closed = true;
 572         }
 573     }
 574 
 575     /**
 576      * Utilities for synchronization. A condition is
 577      * identified by a string name, and is initialized
 578      * upon first use (ie. setCondition() or waitForCondition()). Threads
 579      * are blocked until some thread calls (or has called) setCondition() for the same
 580      * condition.
 581      * <P>
 582      * A rendezvous built on a condition is also provided for synchronizing
 583      * N threads.
 584      */
 585 
 586     private static HashMap conditions = new HashMap();
 587 
 588     /*
 589      * Modifiable boolean object
 590      */
 591     private static class BValue {
 592         boolean v;
 593     }
 594 
 595     /*
 596      * Modifiable int object
 597      */
 598     private static class IValue {
 599         int v;
 600         IValue (int i) {
 601             v =i;
 602         }
 603     }
 604 
 605 
 606     private static BValue getCond (String condition) {
 607         synchronized (conditions) {
 608             BValue cond = (BValue) conditions.get (condition);
 609             if (cond == null) {
 610                 cond = new BValue();
 611                 conditions.put (condition, cond);
 612             }
 613             return cond;
 614         }
 615     }
 616 
 617     /**
 618      * Set the condition to true. Any threads that are currently blocked
 619      * waiting on the condition, will be unblocked and allowed to continue.
 620      * Threads that subsequently call waitForCondition() will not block.
 621      * If the named condition did not exist prior to the call, then it is created
 622      * first.
 623      */
 624 
 625     public static void setCondition (String condition) {
 626         BValue cond = getCond (condition);
 627         synchronized (cond) {
 628             if (cond.v) {
 629                 return;
 630             }
 631             cond.v = true;
 632             cond.notifyAll();
 633         }
 634     }
 635 
 636     /**
 637      * If the named condition does not exist, then it is created and initialized
 638      * to false. If the condition exists or has just been created and its value
 639      * is false, then the thread blocks until another thread sets the condition.
 640      * If the condition exists and is already set to true, then this call returns
 641      * immediately without blocking.
 642      */
 643 
 644     public static void waitForCondition (String condition) {
 645         BValue cond = getCond (condition);
 646         synchronized (cond) {
 647             if (!cond.v) {
 648                 try {
 649                     cond.wait();
 650                 } catch (InterruptedException e) {}
 651             }
 652         }
 653     }
 654 
 655     /* conditions must be locked when accessing this */
 656     static HashMap rv = new HashMap();
 657 
 658     /**
 659      * Force N threads to rendezvous (ie. wait for each other) before proceeding.
 660      * The first thread(s) to call are blocked until the last
 661      * thread makes the call. Then all threads continue.
 662      * <p>
 663      * All threads that call with the same condition name, must use the same value
 664      * for N (or the results may be not be as expected).
 665      * <P>
 666      * Obviously, if fewer than N threads make the rendezvous then the result
 667      * will be a hang.
 668      */
 669 
 670     public static void rendezvous (String condition, int N) {
 671         BValue cond;
 672         IValue iv;
 673         String name = "RV_"+condition;
 674 
 675         /* get the condition */
 676 
 677         synchronized (conditions) {
 678             cond = (BValue)conditions.get (name);
 679             if (cond == null) {
 680                 /* we are first caller */
 681                 if (N < 2) {
 682                     throw new RuntimeException ("rendezvous must be called with N >= 2");
 683                 }
 684                 cond = new BValue ();
 685                 conditions.put (name, cond);
 686                 iv = new IValue (N-1);
 687                 rv.put (name, iv);
 688             } else {
 689                 /* already initialised, just decrement the counter */
 690                 iv = (IValue) rv.get (name);
 691                 iv.v --;
 692             }
 693         }
 694 
 695         if (iv.v > 0) {
 696             waitForCondition (name);
 697         } else {
 698             setCondition (name);
 699             synchronized (conditions) {
 700                 clearCondition (name);
 701                 rv.remove (name);
 702             }
 703         }
 704     }
 705 
 706     /**
 707      * If the named condition exists and is set then remove it, so it can
 708      * be re-initialized and used again. If the condition does not exist, or
 709      * exists but is not set, then the call returns without doing anything.
 710      * Note, some higher level synchronization
 711      * may be needed between clear and the other operations.
 712      */
 713 
 714     public static void clearCondition(String condition) {
 715         BValue cond;
 716         synchronized (conditions) {
 717             cond = (BValue) conditions.get (condition);
 718             if (cond == null) {
 719                 return;
 720             }
 721             synchronized (cond) {
 722                 if (cond.v) {
 723                     conditions.remove (condition);
 724                 }
 725             }
 726         }
 727     }
 728 }