1 /* 2 * Copyright (c) 2002, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.net.*; 25 import java.io.*; 26 import java.nio.*; 27 import java.nio.channels.*; 28 import sun.net.www.MessageHeader; 29 import java.util.*; 30 import javax.net.ssl.*; 31 import javax.net.ssl.SSLEngineResult.*; 32 import java.security.*; 33 34 /** 35 * This class implements a simple HTTPS server. It uses multiple threads to 36 * handle connections in parallel, and will spin off a new thread to handle 37 * each request. (this is easier to implement with SSLEngine) 38 * <p> 39 * It must be instantiated with a {@link HttpCallback} object to which 40 * requests are given and must be handled. 41 * <p> 42 * Simple synchronization between the client(s) and server can be done 43 * using the {@link #waitForCondition(String)}, {@link #setCondition(String)} and 44 * {@link #rendezvous(String,int)} methods. 45 * 46 * NOTE NOTE NOTE NOTE NOTE NOTE NOTE 47 * 48 * If you make a change in here, please don't forget to make the 49 * corresponding change in the J2SE equivalent. 50 * 51 * NOTE NOTE NOTE NOTE NOTE NOTE NOTE 52 */ 53 54 public class TestHttpsServer { 55 56 ServerSocketChannel schan; 57 int threads; 58 int cperthread; 59 HttpCallback cb; 60 Server[] servers; 61 62 // ssl related fields 63 static SSLContext sslCtx; 64 65 /** 66 * Create a <code>TestHttpsServer<code> instance with the specified callback object 67 * for handling requests. One thread is created to handle requests, 68 * and up to ten TCP connections will be handled simultaneously. 69 * @param cb the callback object which is invoked to handle each 70 * incoming request 71 */ 72 73 public TestHttpsServer(HttpCallback cb) throws IOException { 74 this(cb, 1, 10, 0); 75 } 76 77 /** 78 * Create a <code>TestHttpsServer<code> instance with the specified number of 79 * threads and maximum number of connections per thread. This functions 80 * the same as the 4 arg constructor, where the port argument is set to zero. 81 * @param cb the callback object which is invoked to handle each 82 * incoming request 83 * @param threads the number of threads to create to handle requests 84 * in parallel 85 * @param cperthread the number of simultaneous TCP connections to 86 * handle per thread 87 */ 88 89 public TestHttpsServer(HttpCallback cb, int threads, int cperthread) 90 throws IOException { 91 this(cb, threads, cperthread, 0); 92 } 93 94 /** 95 * Create a <code>TestHttpsServer<code> instance with the specified number 96 * of threads and maximum number of connections per thread and running on 97 * the specified port. The specified number of threads are created to 98 * handle incoming requests, and each thread is allowed 99 * to handle a number of simultaneous TCP connections. 100 * @param cb the callback object which is invoked to handle 101 * each incoming request 102 * @param threads the number of threads to create to handle 103 * requests in parallel 104 * @param cperthread the number of simultaneous TCP connections 105 * to handle per thread 106 * @param port the port number to bind the server to. <code>Zero</code> 107 * means choose any free port. 108 */ 109 public TestHttpsServer(HttpCallback cb, int threads, int cperthread, int port) 110 throws IOException { 111 this(cb, threads, cperthread, null, port); 112 } 113 114 /** 115 * Create a <code>TestHttpsServer<code> instance with the specified number 116 * of threads and maximum number of connections per thread and running on 117 * the specified port. The specified number of threads are created to 118 * handle incoming requests, and each thread is allowed 119 * to handle a number of simultaneous TCP connections. 120 * @param cb the callback object which is invoked to handle 121 * each incoming request 122 * @param threads the number of threads to create to handle 123 * requests in parallel 124 * @param cperthread the number of simultaneous TCP connections 125 * to handle per thread 126 * @param address the InetAddress to bind to. {@code Null} means the 127 * wildcard address. 128 * @param port the port number to bind the server to. {@code Zero} 129 * means choose any free port. 130 */ 131 132 public TestHttpsServer(HttpCallback cb, int threads, int cperthread, InetAddress address, int port) 133 throws IOException { 134 schan = ServerSocketChannel.open(); 135 InetSocketAddress addr = new InetSocketAddress(address, port); 136 schan.socket().bind(addr); 137 this.threads = threads; 138 this.cb = cb; 139 this.cperthread = cperthread; 140 141 try { 142 // create and initialize a SSLContext 143 KeyStore ks = KeyStore.getInstance("JKS"); 144 KeyStore ts = KeyStore.getInstance("JKS"); 145 char[] passphrase = "passphrase".toCharArray(); 146 147 ks.load(new FileInputStream(System.getProperty("javax.net.ssl.keyStore")), passphrase); 148 ts.load(new FileInputStream(System.getProperty("javax.net.ssl.trustStore")), passphrase); 149 150 KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); 151 kmf.init(ks, passphrase); 152 153 TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); 154 tmf.init(ts); 155 156 sslCtx = SSLContext.getInstance("TLS"); 157 158 sslCtx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); 159 160 servers = new Server[threads]; 161 for (int i=0; i<threads; i++) { 162 servers[i] = new Server(cb, schan, cperthread); 163 servers[i].start(); 164 } 165 } catch (Exception ex) { 166 throw new RuntimeException("test failed. cause: "+ex.getMessage()); 167 } 168 } 169 170 /** Tell all threads in the server to exit within 5 seconds. 171 * This is an abortive termination. Just prior to the thread exiting 172 * all channels in that thread waiting to be closed are forceably closed. 173 */ 174 175 public void terminate() { 176 for (int i=0; i<threads; i++) { 177 servers[i].terminate (); 178 } 179 } 180 181 /** 182 * return the local port number to which the server is bound. 183 * @return the local port number 184 */ 185 186 public int getLocalPort () { 187 return schan.socket().getLocalPort (); 188 } 189 190 public String getAuthority() { 191 InetAddress address = schan.socket().getInetAddress(); 192 String hostaddr = address.getHostAddress(); 193 if (address.isAnyLocalAddress()) hostaddr = "localhost"; 194 if (hostaddr.indexOf(':') > -1) hostaddr = "[" + hostaddr + "]"; 195 return hostaddr + ":" + getLocalPort(); 196 } 197 198 static class Server extends Thread { 199 200 ServerSocketChannel schan; 201 Selector selector; 202 SelectionKey listenerKey; 203 SelectionKey key; /* the current key being processed */ 204 HttpCallback cb; 205 ByteBuffer consumeBuffer; 206 int maxconn; 207 int nconn; 208 ClosedChannelList clist; 209 boolean shutdown; 210 211 Server(HttpCallback cb, ServerSocketChannel schan, int maxconn) { 212 this.schan = schan; 213 this.maxconn = maxconn; 214 this.cb = cb; 215 nconn = 0; 216 consumeBuffer = ByteBuffer.allocate(512); 217 clist = new ClosedChannelList(); 218 try { 219 selector = Selector.open(); 220 schan.configureBlocking(false); 221 listenerKey = schan.register(selector, SelectionKey.OP_ACCEPT); 222 } catch (IOException e) { 223 System.err.println("Server could not start: " + e); 224 } 225 } 226 227 /* Stop the thread as soon as possible */ 228 public synchronized void terminate() { 229 shutdown = true; 230 } 231 232 public void run() { 233 try { 234 while (true) { 235 selector.select(1000); 236 Set selected = selector.selectedKeys(); 237 Iterator iter = selected.iterator(); 238 while (iter.hasNext()) { 239 key = (SelectionKey)iter.next(); 240 if (key.equals (listenerKey)) { 241 SocketChannel sock = schan.accept(); 242 if (sock == null) { 243 /* false notification */ 244 iter.remove(); 245 continue; 246 } 247 sock.configureBlocking(true); 248 SSLEngine sslEng = sslCtx.createSSLEngine(); 249 sslEng.setUseClientMode(false); 250 new ServerWorker(cb, sock, sslEng).start(); 251 nconn ++; 252 if (nconn == maxconn) { 253 /* deregister */ 254 listenerKey.cancel(); 255 listenerKey = null; 256 } 257 } else { 258 if (key.isReadable()) { 259 boolean closed = false; 260 SocketChannel chan = (SocketChannel)key.channel(); 261 if (key.attachment() != null) { 262 closed = consume(chan); 263 } 264 265 if (closed) { 266 chan.close(); 267 key.cancel(); 268 if (nconn == maxconn) { 269 listenerKey = schan.register(selector, SelectionKey.OP_ACCEPT); 270 } 271 nconn --; 272 } 273 } 274 } 275 iter.remove(); 276 } 277 clist.check(); 278 279 synchronized (this) { 280 if (shutdown) { 281 clist.terminate(); 282 return; 283 } 284 } 285 } 286 } catch (IOException e) { 287 System.out.println("Server exception: " + e); 288 // TODO finish 289 } 290 } 291 292 /* read all the data off the channel without looking at it 293 * return true if connection closed 294 */ 295 boolean consume(SocketChannel chan) { 296 try { 297 consumeBuffer.clear(); 298 int c = chan.read(consumeBuffer); 299 if (c == -1) 300 return true; 301 } catch (IOException e) { 302 return true; 303 } 304 return false; 305 } 306 } 307 308 static class ServerWorker extends Thread { 309 private ByteBuffer inNetBB; 310 private ByteBuffer outNetBB; 311 private ByteBuffer inAppBB; 312 private ByteBuffer outAppBB; 313 314 SSLEngine sslEng; 315 SocketChannel schan; 316 HttpCallback cb; 317 HandshakeStatus currentHSStatus; 318 boolean initialHSComplete; 319 /* 320 * All inbound data goes through this buffer. 321 * 322 * It might be nice to use a cache of ByteBuffers so we're 323 * not alloc/dealloc'ing all over the place. 324 */ 325 326 /* 327 * Application buffers, also used for handshaking 328 */ 329 private int appBBSize; 330 331 ServerWorker(HttpCallback cb, SocketChannel schan, SSLEngine sslEng) { 332 this.sslEng = sslEng; 333 this.schan = schan; 334 this.cb = cb; 335 currentHSStatus = HandshakeStatus.NEED_UNWRAP; 336 initialHSComplete = false; 337 int netBBSize = sslEng.getSession().getPacketBufferSize(); 338 inNetBB = ByteBuffer.allocate(netBBSize); 339 outNetBB = ByteBuffer.allocate(netBBSize); 340 appBBSize = sslEng.getSession().getApplicationBufferSize(); 341 inAppBB = ByteBuffer.allocate(appBBSize); 342 outAppBB = ByteBuffer.allocate(appBBSize); 343 } 344 345 public SSLEngine getSSLEngine() { 346 return sslEng; 347 } 348 349 public ByteBuffer outNetBB() { 350 return outNetBB; 351 } 352 353 public ByteBuffer outAppBB() { 354 return outAppBB; 355 } 356 357 public void run () { 358 try { 359 SSLEngineResult result; 360 361 while (!initialHSComplete) { 362 363 switch (currentHSStatus) { 364 365 case NEED_UNWRAP: 366 int bytes = schan.read(inNetBB); 367 368 needIO: 369 while (currentHSStatus == HandshakeStatus.NEED_UNWRAP) { 370 /* 371 * Don't need to resize requestBB, since no app data should 372 * be generated here. 373 */ 374 inNetBB.flip(); 375 result = sslEng.unwrap(inNetBB, inAppBB); 376 inNetBB.compact(); 377 currentHSStatus = result.getHandshakeStatus(); 378 379 switch (result.getStatus()) { 380 381 case OK: 382 switch (currentHSStatus) { 383 case NOT_HANDSHAKING: 384 throw new IOException( 385 "Not handshaking during initial handshake"); 386 387 case NEED_TASK: 388 Runnable task; 389 while ((task = sslEng.getDelegatedTask()) != null) { 390 task.run(); 391 currentHSStatus = sslEng.getHandshakeStatus(); 392 } 393 break; 394 } 395 396 break; 397 398 case BUFFER_UNDERFLOW: 399 break needIO; 400 401 default: // BUFFER_OVERFLOW/CLOSED: 402 throw new IOException("Received" + result.getStatus() + 403 "during initial handshaking"); 404 } 405 } 406 407 /* 408 * Just transitioned from read to write. 409 */ 410 if (currentHSStatus != HandshakeStatus.NEED_WRAP) { 411 break; 412 } 413 414 // Fall through and fill the write buffer. 415 416 case NEED_WRAP: 417 /* 418 * The flush above guarantees the out buffer to be empty 419 */ 420 outNetBB.clear(); 421 result = sslEng.wrap(inAppBB, outNetBB); 422 outNetBB.flip(); 423 schan.write (outNetBB); 424 outNetBB.compact(); 425 currentHSStatus = result.getHandshakeStatus(); 426 427 switch (result.getStatus()) { 428 case OK: 429 430 if (currentHSStatus == HandshakeStatus.NEED_TASK) { 431 Runnable task; 432 while ((task = sslEng.getDelegatedTask()) != null) { 433 task.run(); 434 currentHSStatus = sslEng.getHandshakeStatus(); 435 } 436 } 437 438 break; 439 440 default: // BUFFER_OVERFLOW/BUFFER_UNDERFLOW/CLOSED: 441 throw new IOException("Received" + result.getStatus() + 442 "during initial handshaking"); 443 } 444 break; 445 446 case FINISHED: 447 initialHSComplete = true; 448 break; 449 default: // NOT_HANDSHAKING/NEED_TASK 450 throw new RuntimeException("Invalid Handshaking State" + 451 currentHSStatus); 452 } // switch 453 } 454 // read the application data; using non-blocking mode 455 schan.configureBlocking(false); 456 read(schan, sslEng); 457 } catch (Exception ex) { 458 throw new RuntimeException(ex); 459 } 460 } 461 462 /* return true if the connection is closed, false otherwise */ 463 464 private boolean read(SocketChannel chan, SSLEngine sslEng) { 465 HttpTransaction msg; 466 boolean res; 467 try { 468 InputStream is = new BufferedInputStream(new NioInputStream(chan, sslEng, inNetBB, inAppBB)); 469 String requestline = readLine(is); 470 MessageHeader mhead = new MessageHeader(is); 471 String clen = mhead.findValue("Content-Length"); 472 String trferenc = mhead.findValue("Transfer-Encoding"); 473 String data = null; 474 if (trferenc != null && trferenc.equals("chunked")) 475 data = new String(readChunkedData(is)); 476 else if (clen != null) 477 data = new String(readNormalData(is, Integer.parseInt(clen))); 478 String[] req = requestline.split(" "); 479 if (req.length < 2) { 480 /* invalid request line */ 481 return false; 482 } 483 String cmd = req[0]; 484 URI uri = null; 485 try { 486 uri = new URI(req[1]); 487 msg = new HttpTransaction(this, cmd, uri, mhead, data, null, chan); 488 cb.request(msg); 489 } catch (URISyntaxException e) { 490 System.err.println ("Invalid URI: " + e); 491 msg = new HttpTransaction(this, cmd, null, null, null, null, chan); 492 msg.sendResponse(501, "Whatever"); 493 } 494 res = false; 495 } catch (IOException e) { 496 res = true; 497 } 498 return res; 499 } 500 501 byte[] readNormalData(InputStream is, int len) throws IOException { 502 byte[] buf = new byte[len]; 503 int c, off=0, remain=len; 504 while (remain > 0 && ((c=is.read (buf, off, remain))>0)) { 505 remain -= c; 506 off += c; 507 } 508 return buf; 509 } 510 511 private void readCRLF(InputStream is) throws IOException { 512 int cr = is.read(); 513 int lf = is.read(); 514 515 if (((cr & 0xff) != 0x0d) || 516 ((lf & 0xff) != 0x0a)) { 517 throw new IOException( 518 "Expected <CR><LF>: got '" + cr + "/" + lf + "'"); 519 } 520 } 521 522 byte[] readChunkedData(InputStream is) throws IOException { 523 LinkedList l = new LinkedList(); 524 int total = 0; 525 for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) { 526 l.add(readNormalData(is, len)); 527 total += len; 528 readCRLF(is); // CRLF at end of chunk 529 } 530 readCRLF(is); // CRLF at end of Chunked Stream. 531 byte[] buf = new byte[total]; 532 Iterator i = l.iterator(); 533 int x = 0; 534 while (i.hasNext()) { 535 byte[] b = (byte[])i.next(); 536 System.arraycopy(b, 0, buf, x, b.length); 537 x += b.length; 538 } 539 return buf; 540 } 541 542 private int readChunkLen(InputStream is) throws IOException { 543 int c, len=0; 544 boolean done=false, readCR=false; 545 while (!done) { 546 c = is.read(); 547 if (c == '\n' && readCR) { 548 done = true; 549 } else { 550 if (c == '\r' && !readCR) { 551 readCR = true; 552 } else { 553 int x=0; 554 if (c >= 'a' && c <= 'f') { 555 x = c - 'a' + 10; 556 } else if (c >= 'A' && c <= 'F') { 557 x = c - 'A' + 10; 558 } else if (c >= '0' && c <= '9') { 559 x = c - '0'; 560 } 561 len = len * 16 + x; 562 } 563 } 564 } 565 return len; 566 } 567 568 private String readLine(InputStream is) throws IOException { 569 boolean done=false, readCR=false; 570 byte[] b = new byte[512]; 571 int c, l = 0; 572 573 while (!done) { 574 c = is.read(); 575 if (c == '\n' && readCR) { 576 done = true; 577 } else { 578 if (c == '\r' && !readCR) { 579 readCR = true; 580 } else { 581 b[l++] = (byte)c; 582 } 583 } 584 } 585 return new String(b); 586 } 587 588 /** close the channel associated with the current key by: 589 * 1. shutdownOutput (send a FIN) 590 * 2. mark the key so that incoming data is to be consumed and discarded 591 * 3. After a period, close the socket 592 */ 593 594 synchronized void orderlyCloseChannel(SocketChannel ch) throws IOException { 595 ch.socket().shutdownOutput(); 596 } 597 598 synchronized void abortiveCloseChannel(SocketChannel ch) throws IOException { 599 Socket s = ch.socket(); 600 s.setSoLinger(true, 0); 601 ch.close(); 602 } 603 } 604 605 606 /** 607 * Implements blocking reading semantics on top of a non-blocking channel 608 */ 609 610 static class NioInputStream extends InputStream { 611 SSLEngine sslEng; 612 SocketChannel channel; 613 Selector selector; 614 ByteBuffer inNetBB; 615 ByteBuffer inAppBB; 616 SelectionKey key; 617 int available; 618 byte[] one; 619 boolean closed; 620 ByteBuffer markBuf; /* reads may be satisifed from this buffer */ 621 boolean marked; 622 boolean reset; 623 int readlimit; 624 625 public NioInputStream(SocketChannel chan, SSLEngine sslEng, ByteBuffer inNetBB, ByteBuffer inAppBB) throws IOException { 626 this.sslEng = sslEng; 627 this.channel = chan; 628 selector = Selector.open(); 629 this.inNetBB = inNetBB; 630 this.inAppBB = inAppBB; 631 key = chan.register(selector, SelectionKey.OP_READ); 632 available = 0; 633 one = new byte[1]; 634 closed = marked = reset = false; 635 } 636 637 public synchronized int read(byte[] b) throws IOException { 638 return read(b, 0, b.length); 639 } 640 641 public synchronized int read() throws IOException { 642 return read(one, 0, 1); 643 } 644 645 public synchronized int read(byte[] b, int off, int srclen) throws IOException { 646 647 int canreturn, willreturn; 648 649 if (closed) 650 return -1; 651 652 if (reset) { /* satisfy from markBuf */ 653 canreturn = markBuf.remaining(); 654 willreturn = canreturn > srclen ? srclen : canreturn; 655 markBuf.get(b, off, willreturn); 656 if (canreturn == willreturn) { 657 reset = false; 658 } 659 } else { /* satisfy from channel */ 660 canreturn = available(); 661 if (canreturn == 0) { 662 block(); 663 canreturn = available(); 664 } 665 willreturn = canreturn > srclen ? srclen : canreturn; 666 inAppBB.get(b, off, willreturn); 667 available -= willreturn; 668 669 if (marked) { /* copy into markBuf */ 670 try { 671 markBuf.put(b, off, willreturn); 672 } catch (BufferOverflowException e) { 673 marked = false; 674 } 675 } 676 } 677 return willreturn; 678 } 679 680 public synchronized int available() throws IOException { 681 if (closed) 682 throw new IOException("Stream is closed"); 683 684 if (reset) 685 return markBuf.remaining(); 686 687 if (available > 0) 688 return available; 689 690 inAppBB.clear(); 691 int bytes = channel.read(inNetBB); 692 693 int needed = sslEng.getSession().getApplicationBufferSize(); 694 if (needed > inAppBB.remaining()) { 695 inAppBB = ByteBuffer.allocate(needed); 696 } 697 inNetBB.flip(); 698 SSLEngineResult result = sslEng.unwrap(inNetBB, inAppBB); 699 inNetBB.compact(); 700 available = result.bytesProduced(); 701 702 if (available > 0) 703 inAppBB.flip(); 704 else if (available == -1) 705 throw new IOException("Stream is closed"); 706 return available; 707 } 708 709 /** 710 * block() only called when available==0 and buf is empty 711 */ 712 private synchronized void block() throws IOException { 713 //assert available == 0; 714 int n = selector.select(); 715 //assert n == 1; 716 selector.selectedKeys().clear(); 717 available(); 718 } 719 720 public void close() throws IOException { 721 if (closed) 722 return; 723 channel.close(); 724 closed = true; 725 } 726 727 public synchronized void mark(int readlimit) { 728 if (closed) 729 return; 730 this.readlimit = readlimit; 731 markBuf = ByteBuffer.allocate(readlimit); 732 marked = true; 733 reset = false; 734 } 735 736 public synchronized void reset() throws IOException { 737 if (closed ) 738 return; 739 if (!marked) 740 throw new IOException("Stream not marked"); 741 marked = false; 742 reset = true; 743 markBuf.flip(); 744 } 745 } 746 747 static class NioOutputStream extends OutputStream { 748 SSLEngine sslEng; 749 SocketChannel channel; 750 ByteBuffer outNetBB; 751 ByteBuffer outAppBB; 752 SelectionKey key; 753 Selector selector; 754 boolean closed; 755 byte[] one; 756 757 public NioOutputStream(SocketChannel channel, SSLEngine sslEng, ByteBuffer outNetBB, ByteBuffer outAppBB) throws IOException { 758 this.sslEng = sslEng; 759 this.channel = channel; 760 this.outNetBB = outNetBB; 761 this.outAppBB = outAppBB; 762 selector = Selector.open(); 763 key = channel.register(selector, SelectionKey.OP_WRITE); 764 closed = false; 765 one = new byte[1]; 766 } 767 768 public synchronized void write(int b) throws IOException { 769 one[0] = (byte)b; 770 write(one, 0, 1); 771 } 772 773 public synchronized void write(byte[] b) throws IOException { 774 write(b, 0, b.length); 775 } 776 777 public synchronized void write(byte[] b, int off, int len) throws IOException { 778 if (closed) 779 throw new IOException("stream is closed"); 780 781 outAppBB = ByteBuffer.allocate(len); 782 outAppBB.put(b, off, len); 783 outAppBB.flip(); 784 int n; 785 outNetBB.clear(); 786 int needed = sslEng.getSession().getPacketBufferSize(); 787 if (outNetBB.capacity() < needed) { 788 outNetBB = ByteBuffer.allocate(needed); 789 } 790 SSLEngineResult ret = sslEng.wrap(outAppBB, outNetBB); 791 outNetBB.flip(); 792 int newLen = ret.bytesProduced(); 793 while ((n = channel.write (outNetBB)) < newLen) { 794 newLen -= n; 795 if (newLen == 0) 796 return; 797 selector.select(); 798 selector.selectedKeys().clear(); 799 } 800 } 801 802 public void close() throws IOException { 803 if (closed) 804 return; 805 channel.close(); 806 closed = true; 807 } 808 } 809 810 /** 811 * Utilities for synchronization. A condition is 812 * identified by a string name, and is initialized 813 * upon first use (ie. setCondition() or waitForCondition()). Threads 814 * are blocked until some thread calls (or has called) setCondition() for the same 815 * condition. 816 * <P> 817 * A rendezvous built on a condition is also provided for synchronizing 818 * N threads. 819 */ 820 821 private static HashMap conditions = new HashMap(); 822 823 /* 824 * Modifiable boolean object 825 */ 826 private static class BValue { 827 boolean v; 828 } 829 830 /* 831 * Modifiable int object 832 */ 833 private static class IValue { 834 int v; 835 IValue(int i) { 836 v =i; 837 } 838 } 839 840 841 private static BValue getCond(String condition) { 842 synchronized (conditions) { 843 BValue cond = (BValue) conditions.get(condition); 844 if (cond == null) { 845 cond = new BValue(); 846 conditions.put(condition, cond); 847 } 848 return cond; 849 } 850 } 851 852 /** 853 * Set the condition to true. Any threads that are currently blocked 854 * waiting on the condition, will be unblocked and allowed to continue. 855 * Threads that subsequently call waitForCondition() will not block. 856 * If the named condition did not exist prior to the call, then it is created 857 * first. 858 */ 859 860 public static void setCondition(String condition) { 861 BValue cond = getCond(condition); 862 synchronized (cond) { 863 if (cond.v) { 864 return; 865 } 866 cond.v = true; 867 cond.notifyAll(); 868 } 869 } 870 871 /** 872 * If the named condition does not exist, then it is created and initialized 873 * to false. If the condition exists or has just been created and its value 874 * is false, then the thread blocks until another thread sets the condition. 875 * If the condition exists and is already set to true, then this call returns 876 * immediately without blocking. 877 */ 878 879 public static void waitForCondition(String condition) { 880 BValue cond = getCond(condition); 881 synchronized (cond) { 882 if (!cond.v) { 883 try { 884 cond.wait(); 885 } catch (InterruptedException e) {} 886 } 887 } 888 } 889 890 /* conditions must be locked when accessing this */ 891 static HashMap rv = new HashMap(); 892 893 /** 894 * Force N threads to rendezvous (ie. wait for each other) before proceeding. 895 * The first thread(s) to call are blocked until the last 896 * thread makes the call. Then all threads continue. 897 * <p> 898 * All threads that call with the same condition name, must use the same value 899 * for N (or the results may be not be as expected). 900 * <P> 901 * Obviously, if fewer than N threads make the rendezvous then the result 902 * will be a hang. 903 */ 904 905 public static void rendezvous(String condition, int N) { 906 BValue cond; 907 IValue iv; 908 String name = "RV_"+condition; 909 910 /* get the condition */ 911 912 synchronized (conditions) { 913 cond = (BValue)conditions.get(name); 914 if (cond == null) { 915 /* we are first caller */ 916 if (N < 2) { 917 throw new RuntimeException("rendezvous must be called with N >= 2"); 918 } 919 cond = new BValue(); 920 conditions.put(name, cond); 921 iv = new IValue(N-1); 922 rv.put(name, iv); 923 } else { 924 /* already initialised, just decrement the counter */ 925 iv = (IValue) rv.get(name); 926 iv.v--; 927 } 928 } 929 930 if (iv.v > 0) { 931 waitForCondition(name); 932 } else { 933 setCondition(name); 934 synchronized (conditions) { 935 clearCondition(name); 936 rv.remove(name); 937 } 938 } 939 } 940 941 /** 942 * If the named condition exists and is set then remove it, so it can 943 * be re-initialized and used again. If the condition does not exist, or 944 * exists but is not set, then the call returns without doing anything. 945 * Note, some higher level synchronization 946 * may be needed between clear and the other operations. 947 */ 948 949 public static void clearCondition(String condition) { 950 BValue cond; 951 synchronized (conditions) { 952 cond = (BValue) conditions.get(condition); 953 if (cond == null) { 954 return; 955 } 956 synchronized (cond) { 957 if (cond.v) { 958 conditions.remove(condition); 959 } 960 } 961 } 962 } 963 }