/*
 * Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package com.sun.corba.se.impl.transport;

import java.io.IOException;
import java.net.ServerSocket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ClosedSelectorException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;
import java.util.List;


import com.sun.corba.se.pept.broker.Broker;
import com.sun.corba.se.pept.transport.Acceptor;
import com.sun.corba.se.pept.transport.Connection;
import com.sun.corba.se.pept.transport.EventHandler;
import com.sun.corba.se.pept.transport.ListenerThread;
import com.sun.corba.se.pept.transport.ReaderThread;

import com.sun.corba.se.spi.logging.CORBALogDomains;
import com.sun.corba.se.spi.orb.ORB;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;

import com.sun.corba.se.impl.logging.ORBUtilSystemException;
import com.sun.corba.se.impl.orbutil.ORBUtility;

/**
 * Transport-level selector: dispatches I/O readiness events to
 * {@link EventHandler}s.
 *
 * <p>Handlers that answer {@code true} from
 * {@code shouldUseSelectThreadToWait()} are registered with a single NIO
 * {@link Selector} that this object (itself a thread) polls in {@link #run()}.
 * All other handlers get a dedicated {@link ListenerThread} (for
 * {@code OP_ACCEPT}) or {@link ReaderThread} (for {@code OP_READ}) that is
 * submitted as work to thread pool 0 / work queue 0 of the ORB.
 *
 * <p>Registrations and interest-op changes requested from other threads are
 * queued and applied by the selector thread at the top of each selection
 * cycle; the requesting thread only wakes the selector up.
 *
 * @author Harold Carr
 */
class SelectorImpl
    extends
        sun.misc.ManagedLocalsThread
    implements
        com.sun.corba.se.pept.transport.Selector
{
    private final ORB orb;
    // Written once by startSelector(), read by arbitrary caller threads.
    private volatile Selector selector;
    // Passed to Selector.select(long), i.e. milliseconds.
    private long timeout;
    // Handlers waiting to be registered by the selector thread.
    // Guarded by synchronized (deferredRegistrations).
    private final List<EventHandler> deferredRegistrations;
    // Pending interest-op additions. Guarded by synchronized (interestOpsList).
    private final List<SelectionKeyAndOp> interestOpsList;
    // Guarded by synchronized (listenerThreads).
    private final HashMap<EventHandler, ListenerThread> listenerThreads;
    // A Collections.synchronizedMap; iteration must hold the map's monitor.
    private final Map<EventHandler, ReaderThread> readerThreads;
    // Set under the monitor of this object by startSelector().
    private volatile boolean selectorStarted;
    private volatile boolean closed;
    private final ORBUtilSystemException wrapper;


    /**
     * Creates a selector for the given ORB. No NIO selector is opened and no
     * thread is started until the first handler that wants the select thread
     * is registered.
     *
     * @param orb the owning ORB; supplies the debug flag, the logging
     *            wrapper and the thread pool manager
     */
    public SelectorImpl(ORB orb)
    {
        this.orb = orb;
        selector = null;
        selectorStarted = false;
        timeout = 60000;
        deferredRegistrations = new ArrayList<EventHandler>();
        interestOpsList = new ArrayList<SelectionKeyAndOp>();
        listenerThreads = new HashMap<EventHandler, ListenerThread>();
        readerThreads = java.util.Collections.synchronizedMap(
            new HashMap<EventHandler, ReaderThread>());
        closed = false;
        wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT);
    }

    /**
     * Sets the timeout handed to {@code Selector.select(long)} on each
     * selection cycle.
     *
     * @param timeout the select timeout
     */
    public void setTimeout(long timeout)
    {
        this.timeout = timeout;
    }

    /**
     * @return the timeout handed to {@code Selector.select(long)}
     */
    public long getTimeout()
    {
        return timeout;
    }

    /**
     * Queues a request to add the handler's interest ops to its selection
     * key and wakes the selector thread so it applies the change.
     *
     * <p>If the handler has no selection key, or its key is no longer valid,
     * the condition is reported through the logging wrapper and nothing is
     * queued.
     *
     * @param eventHandler handler whose key should have its ops enabled
     */
    public void registerInterestOps(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".registerInterestOps:-> " + eventHandler);
        }

        SelectionKey selectionKey = eventHandler.getSelectionKey();
        // The key is null when the deferred registration of this handler
        // failed (handleDeferredRegistrations stores null in that case).
        if (selectionKey != null && selectionKey.isValid()) {
            int ehOps = eventHandler.getInterestOps();
            SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
            synchronized(interestOpsList) {
                interestOpsList.add(keyAndOp);
            }
            // Tell the selector thread there is an update to a key's ops.
            try {
                Selector current = selector;
                if (current != null) {
                    // Wake the selector thread so it picks up the queued ops.
                    current.wakeup();
                }
            } catch (Throwable t) {
                if (orb.transportDebugFlag) {
                    dprint(".registerInterestOps: selector.wakeup: ", t);
                }
            }
        }
        else {
            wrapper.selectionKeyInvalid(eventHandler.toString());
            if (orb.transportDebugFlag) {
                dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
            }
        }

        if (orb.transportDebugFlag) {
            dprint(".registerInterestOps:<- ");
        }
    }

    /**
     * Registers a handler for I/O events.
     *
     * <p>A handler that uses the select thread is queued for deferred
     * registration (starting the selector thread on first use) and the
     * selector is woken up. Any other handler gets a dedicated listener or
     * reader thread according to its interest ops. Does nothing once this
     * selector has been closed.
     *
     * @param eventHandler the handler to register
     * @throws RuntimeException if the selector cannot be opened, the work
     *         cannot be queued, or a non-select handler has interest ops
     *         other than {@code OP_ACCEPT} or {@code OP_READ}
     */
    public void registerForEvent(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".registerForEvent: " + eventHandler);
        }

        if (isClosed()) {
            if (orb.transportDebugFlag) {
                dprint(".registerForEvent: closed: " + eventHandler);
            }
            return;
        }

        if (eventHandler.shouldUseSelectThreadToWait()) {
            synchronized (deferredRegistrations) {
                deferredRegistrations.add(eventHandler);
            }
            if (! selectorStarted) {
                // startSelector() re-checks under lock, so concurrent first
                // registrations start the thread exactly once.
                startSelector();
            }
            selector.wakeup();
            return;
        }

        switch (eventHandler.getInterestOps()) {
        case SelectionKey.OP_ACCEPT :
            createListenerThread(eventHandler);
            break;
        case SelectionKey.OP_READ :
            createReaderThread(eventHandler);
            break;
        default:
            if (orb.transportDebugFlag) {
                dprint(".registerForEvent: default: " + eventHandler);
            }
            throw new RuntimeException(
                "SelectorImpl.registerForEvent: unknown interest ops");
        }
    }

    /**
     * Reverses {@link #registerForEvent(EventHandler)}.
     *
     * <p>For a select-thread handler the selection key (if any) is cancelled
     * and the selector is woken up; otherwise the handler's dedicated
     * listener or reader thread is closed. Does nothing once this selector
     * has been closed.
     *
     * @param eventHandler the handler to unregister
     * @throws RuntimeException if a non-select handler has interest ops
     *         other than {@code OP_ACCEPT} or {@code OP_READ}
     */
    public void unregisterForEvent(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".unregisterForEvent: " + eventHandler);
        }

        if (isClosed()) {
            if (orb.transportDebugFlag) {
                dprint(".unregisterForEvent: closed: " + eventHandler);
            }
            return;
        }

        if (eventHandler.shouldUseSelectThreadToWait()) {
            SelectionKey selectionKey ;
            // handleDeferredRegistrations() assigns the key while holding
            // this same lock, so the read is ordered with that assignment.
            synchronized(deferredRegistrations) {
                selectionKey = eventHandler.getSelectionKey();
            }
            if (selectionKey != null) {
                selectionKey.cancel();
            }
            Selector current = selector;
            if (current != null) {
                current.wakeup();
            }
            return;
        }

        switch (eventHandler.getInterestOps()) {
        case SelectionKey.OP_ACCEPT :
            destroyListenerThread(eventHandler);
            break;
        case SelectionKey.OP_READ :
            destroyReaderThread(eventHandler);
            break;
        default:
            if (orb.transportDebugFlag) {
                dprint(".unregisterForEvent: default: " + eventHandler);
            }
            throw new RuntimeException(
                "SelectorImpl.unregisterForEvent: unknown interest ops");
        }
    }

    /**
     * Closes this selector: marks it closed, closes every listener and reader
     * thread, closes the channels of handlers still awaiting registration,
     * and wakes the selector thread so that it leaves its loop and closes
     * the NIO selector. Calling it again is a no-op.
     */
    public void close()
    {
        if (orb.transportDebugFlag) {
            dprint(".close");
        }

        // Check-and-set in one step so only one caller performs the shutdown.
        synchronized (this) {
            if (isClosed()) {
                if (orb.transportDebugFlag) {
                    dprint(".close: already closed");
                }
                return;
            }
            setClosed(true);
        }

        // Kill listeners. Iterate over a snapshot so a concurrent
        // create/destroy cannot invalidate the iteration.
        List<ListenerThread> listeners;
        synchronized (listenerThreads) {
            listeners = new ArrayList<ListenerThread>(listenerThreads.values());
        }
        for (ListenerThread listenerThread : listeners) {
            listenerThread.close();
        }

        // Kill readers. A synchronizedMap must be locked by the caller while
        // its collection views are traversed.
        List<ReaderThread> readers;
        synchronized (readerThreads) {
            readers = new ArrayList<ReaderThread>(readerThreads.values());
        }
        for (ReaderThread readerThread : readers) {
            readerThread.close();
        }

        clearDeferredRegistrations();

        // Selector

        try {
            Selector current = selector;
            if (current != null) {
                // wakeup Selector thread to process close request
                current.wakeup();
            }
        } catch (Throwable t) {
            if (orb.transportDebugFlag) {
                dprint(".close: selector.wakeup: ", t);
            }
        }
    }

    ///////////////////////////////////////////////////
    //
    // Thread methods.
    //

    /**
     * Selector thread body. Until closed, each cycle applies the queued
     * registrations and interest-op changes, blocks in
     * {@code select(timeout)}, then hands every selected key to the
     * {@link EventHandler} attached to it. On exit the NIO selector is
     * closed.
     */
    @Override
    public void run()
    {
        setName("SelectorThread");
        while (!closed) {
            try {
                int n = 0;
                if (timeout == 0 && orb.transportDebugFlag) {
                    dprint(".run: Beginning of selection cycle");
                }
                handleDeferredRegistrations();
                enableInterestOps();
                try {
                    n = selector.select(timeout);
                } catch (IOException  e) {
                    if (orb.transportDebugFlag) {
                        dprint(".run: selector.select: ", e);
                    }
                } catch (ClosedSelectorException csEx) {
                    if (orb.transportDebugFlag) {
                        dprint(".run: selector.select: ", csEx);
                    }
                    break;
                }
                if (closed) {
                    break;
                }
                // A zero return from select() is deliberately not
                // short-circuited: the selected-key set is always drained.
                Iterator<SelectionKey> iterator =
                    selector.selectedKeys().iterator();
                if (orb.transportDebugFlag) {
                    if (iterator.hasNext()) {
                        dprint(".run: n = " + n);
                    }
                }
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    // Selected keys are never removed by the selector itself.
                    iterator.remove();
                    EventHandler eventHandler = (EventHandler)
                        selectionKey.attachment();
                    try {
                        eventHandler.handleEvent();
                    } catch (Throwable t) {
                        if (orb.transportDebugFlag) {
                            dprint(".run: eventHandler.handleEvent", t);
                        }
                    }
                }
                if (timeout == 0 && orb.transportDebugFlag) {
                    dprint(".run: End of selection cycle");
                }
            } catch (Throwable t) {
                // IMPORTANT: ignore all errors so the select thread keeps running.
                // Otherwise a guaranteed hang.
                if (orb.transportDebugFlag) {
                    dprint(".run: ignoring", t);
                }
            }
        }
        try {
            if (selector != null) {
                if (orb.transportDebugFlag) {
                    dprint(".run: selector.close ");
                }
                selector.close();
            }
        } catch (Throwable t) {
            if (orb.transportDebugFlag) {
                dprint(".run: selector.close: ", t);
            }
        }
    }

    /////////////////////////////////////////////////////
    //
    // Implementation.
    //

    /**
     * Closes the channel of every handler still awaiting registration,
     * cancels and detaches its selection key if it has one, and empties the
     * deferred list. The key is cancelled even when closing the channel
     * fails with an {@link IOException}.
     */
    private void clearDeferredRegistrations() {
        synchronized (deferredRegistrations) {
            int deferredListSize = deferredRegistrations.size();
            if (orb.transportDebugFlag) {
                dprint(".clearDeferredRegistrations:deferred list size == " + deferredListSize);
            }
            for (EventHandler eventHandler : deferredRegistrations) {
                if (orb.transportDebugFlag) {
                    dprint(".clearDeferredRegistrations: " + eventHandler);
                }
                SelectableChannel channel = eventHandler.getChannel();

                try {
                    if (orb.transportDebugFlag) {
                        dprint(".clearDeferredRegistrations:close channel == "
                                + channel);
                        dprint(".clearDeferredRegistrations:close channel class == "
                                + channel.getClass().getName());
                    }
                    channel.close();
                } catch (IOException ioEx) {
                    if (orb.transportDebugFlag) {
                        dprint(".clearDeferredRegistrations: ", ioEx);
                    }
                }

                SelectionKey selectionKey = eventHandler.getSelectionKey();
                if (selectionKey != null) {
                    selectionKey.cancel();
                    selectionKey.attach(null);
                }
            }
            deferredRegistrations.clear();
        }
    }

    /** @return whether {@link #close()} has been called */
    private synchronized boolean isClosed ()
    {
        return closed;
    }

    /** Sets the closed flag read by the selector loop. */
    private synchronized void setClosed(boolean closed)
    {
        this.closed = closed;
    }

    /**
     * Opens the NIO selector and starts this object as a daemon thread.
     * Idempotent: when the selector thread has already been started the call
     * returns immediately, so racing first registrations cannot start the
     * thread twice.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the
     *         selector cannot be opened
     */
    private synchronized void startSelector()
    {
        if (selectorStarted) {
            return;
        }
        try {
            selector = Selector.open();
        } catch (IOException e) {
            if (orb.transportDebugFlag) {
                dprint(".startSelector: Selector.open: IOException: ", e);
            }
            // REVISIT - better handling/reporting
            throw new RuntimeException(
                ".startSelector: Selector.open exception", e);
        }
        setDaemon(true);
        start();
        selectorStarted = true;
        if (orb.transportDebugFlag) {
            dprint(".startSelector: selector.start completed.");
        }
    }

    /**
     * Registers the channel of every queued handler with the selector, using
     * the handler as the key attachment, and stores the resulting key on the
     * handler. If the channel is already closed the handler's key is set to
     * {@code null}. Called by the selector thread.
     */
    private void handleDeferredRegistrations()
    {
        synchronized (deferredRegistrations) {
            for (EventHandler eventHandler : deferredRegistrations) {
                if (orb.transportDebugFlag) {
                    dprint(".handleDeferredRegistrations: " + eventHandler);
                }
                SelectableChannel channel = eventHandler.getChannel();
                SelectionKey selectionKey = null;
                try {
                    selectionKey =
                        channel.register(selector,
                                         eventHandler.getInterestOps(),
                                         (Object)eventHandler);
                } catch (ClosedChannelException e) {
                    if (orb.transportDebugFlag) {
                        dprint(".handleDeferredRegistrations: ", e);
                    }
                }
                eventHandler.setSelectionKey(selectionKey);
            }
            deferredRegistrations.clear();
        }
    }

    /**
     * Applies every queued interest-op addition by OR-ing the requested op
     * into the key's current interest set, then empties the queue. Called by
     * the selector thread.
     */
    private void enableInterestOps()
    {
        synchronized (interestOpsList) {
            if (interestOpsList.isEmpty()) {
                return;
            }
            if (orb.transportDebugFlag) {
                dprint(".enableInterestOps:->");
            }
            for (SelectionKeyAndOp keyAndOp : interestOpsList) {
                SelectionKey selectionKey = keyAndOp.selectionKey;

                // Need to check if the SelectionKey is valid because a
                // connection's SelectionKey could be put on the list to
                // have its OP enabled and before it's enabled be reclaimed.
                // Otherwise, the enabling of the OP will throw an exception
                // here and exit this method and potentially not enable all
                // registered ops.
                //
                // So, we ignore SelectionKeys that are invalid. They will get
                // cleaned up on the next Selector.select() call.

                if (selectionKey.isValid()) {
                    if (orb.transportDebugFlag) {
                        dprint(".enableInterestOps: " + keyAndOp);
                    }
                    try {
                        int selectionKeyOps = selectionKey.interestOps();
                        selectionKey.interestOps(selectionKeyOps | keyAndOp.keyOp);
                    } catch (CancelledKeyException cke) {
                        // The key was cancelled between isValid() and here;
                        // skip it so the remaining ops are still enabled.
                        if (orb.transportDebugFlag) {
                            dprint(".enableInterestOps: key cancelled: ", cke);
                        }
                    }
                }
            }
            interestOpsList.clear();
            if (orb.transportDebugFlag) {
                dprint(".enableInterestOps:<-");
            }
        }
    }

    /**
     * Creates a listener thread for the handler's acceptor, records it, and
     * queues it as work on the ORB thread pool.
     *
     * @throws RuntimeException if the thread pool or work queue is missing
     */
    private void createListenerThread(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".createListenerThread: " + eventHandler);
        }
        Acceptor acceptor = eventHandler.getAcceptor();
        ListenerThread listenerThread =
            new ListenerThreadImpl(orb, acceptor, this);
        synchronized (listenerThreads) {
            listenerThreads.put(eventHandler, listenerThread);
        }
        addWorkToDefaultQueue((Work)listenerThread);
    }

    /**
     * Forgets and closes the listener thread recorded for the handler, if
     * there is one.
     */
    private void destroyListenerThread(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".destroyListenerThread: " + eventHandler);
        }
        ListenerThread listenerThread;
        synchronized (listenerThreads) {
            listenerThread = listenerThreads.remove(eventHandler);
        }
        if (listenerThread == null) {
            if (orb.transportDebugFlag) {
                dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
            }
            return;
        }
        listenerThread.close();
    }

    /**
     * Creates a reader thread for the handler's connection, records it, and
     * queues it as work on the ORB thread pool.
     *
     * @throws RuntimeException if the thread pool or work queue is missing
     */
    private void createReaderThread(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".createReaderThread: " + eventHandler);
        }
        Connection connection = eventHandler.getConnection();
        ReaderThread readerThread =
            new ReaderThreadImpl(orb, connection, this);
        readerThreads.put(eventHandler, readerThread);
        addWorkToDefaultQueue((Work)readerThread);
    }

    /**
     * Forgets and closes the reader thread recorded for the handler, if
     * there is one.
     */
    private void destroyReaderThread(EventHandler eventHandler)
    {
        if (orb.transportDebugFlag) {
            dprint(".destroyReaderThread: " + eventHandler);
        }
        // Single remove() so two concurrent callers cannot both obtain (and
        // close) the same thread.
        ReaderThread readerThread = readerThreads.remove(eventHandler);
        if (readerThread == null) {
            if (orb.transportDebugFlag) {
                dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
            }
            return;
        }
        readerThread.close();
    }

    /**
     * Queues work on work queue 0 of thread pool 0 of the ORB.
     *
     * @throws RuntimeException wrapping the lookup failure if that pool or
     *         queue does not exist
     */
    private void addWorkToDefaultQueue(Work work)
    {
        try {
            orb.getThreadPoolManager().getThreadPool(0)
                .getWorkQueue(0).addWork(work);
        } catch (NoSuchThreadPoolException e) {
            throw new RuntimeException(e.toString(), e);
        } catch (NoSuchWorkQueueException e) {
            throw new RuntimeException(e.toString(), e);
        }
    }

    /** Emits a transport debug message tagged with this class name. */
    private void dprint(String msg)
    {
        ORBUtility.dprint("SelectorImpl", msg);
    }

    /** Emits a debug message followed by the stack trace on standard out. */
    protected void dprint(String msg, Throwable t)
    {
        dprint(msg);
        t.printStackTrace(System.out);
    }

    // Private class to contain a SelectionKey and a SelectionKey op.
    // Used only by SelectorImpl to register and enable SelectionKey
    // Op.
    // REVISIT - Could do away with this class and use the EventHandler
    // directly.
    private static final class SelectionKeyAndOp
    {
        // A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT]
        public final int keyOp;
        public final SelectionKey selectionKey;

        // constructor
        public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) {
            this.selectionKey = selectionKey;
            this.keyOp = keyOp;
        }
    }

// End of file.
}