src/java.corba/share/classes/com/sun/corba/se/impl/transport/SelectorImpl.java

Print this page




   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.corba.se.impl.transport;
  27 
  28 import java.io.IOException;

  29 import java.nio.channels.ClosedChannelException;
  30 import java.nio.channels.SelectableChannel;

  31 import java.nio.channels.SelectionKey;
  32 import java.nio.channels.Selector;

  33 import java.util.ArrayList;
  34 import java.util.HashMap;
  35 import java.util.Map;
  36 import java.util.Iterator;
  37 import java.util.List;
  38 

  39 import com.sun.corba.se.pept.broker.Broker;
  40 import com.sun.corba.se.pept.transport.Acceptor;
  41 import com.sun.corba.se.pept.transport.Connection;
  42 import com.sun.corba.se.pept.transport.EventHandler;
  43 import com.sun.corba.se.pept.transport.ListenerThread;
  44 import com.sun.corba.se.pept.transport.ReaderThread;
  45 
  46 import com.sun.corba.se.spi.logging.CORBALogDomains;
  47 import com.sun.corba.se.spi.orb.ORB;
  48 import com.sun.corba.se.spi.orbutil.threadpool.Work;
  49 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
  50 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
  51 
  52 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  53 import com.sun.corba.se.impl.orbutil.ORBUtility;
  54 
  55 /**
  56  * @author Harold Carr
  57  */
  58 class SelectorImpl


  94 
  95     public long getTimeout()
  96     {
  97         return timeout;
  98     }
  99 
 100     public void registerInterestOps(EventHandler eventHandler)
 101     {
 102         if (orb.transportDebugFlag) {
 103             dprint(".registerInterestOps:-> " + eventHandler);
 104         }
 105 
 106         SelectionKey selectionKey = eventHandler.getSelectionKey();
 107         if (selectionKey.isValid()) {
 108             int ehOps = eventHandler.getInterestOps();
 109             SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
 110             synchronized(interestOpsList) {
 111                 interestOpsList.add(keyAndOp);
 112             }
 113             // tell Selector Thread there's an update to a SelectorKey's Ops



 114             selector.wakeup();
 115         }






 116         else {
 117             wrapper.selectionKeyInvalid(eventHandler.toString());
 118             if (orb.transportDebugFlag) {
 119                 dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
 120             }
 121         }
 122 
 123         if (orb.transportDebugFlag) {
 124             dprint(".registerInterestOps:<- ");
 125         }
 126     }
 127 
 128     public void registerForEvent(EventHandler eventHandler)
 129     {
 130         if (orb.transportDebugFlag) {
 131             dprint(".registerForEvent: " + eventHandler);
 132         }
 133 
 134         if (isClosed()) {
 135             if (orb.transportDebugFlag) {


 169     {
 170         if (orb.transportDebugFlag) {
 171             dprint(".unregisterForEvent: " + eventHandler);
 172         }
 173 
 174         if (isClosed()) {
 175             if (orb.transportDebugFlag) {
 176                 dprint(".unregisterForEvent: closed: " + eventHandler);
 177             }
 178             return;
 179         }
 180 
 181         if (eventHandler.shouldUseSelectThreadToWait()) {
 182             SelectionKey selectionKey ;
 183             synchronized(deferredRegistrations) {
 184                 selectionKey = eventHandler.getSelectionKey();
 185             }
 186             if (selectionKey != null) {
 187                 selectionKey.cancel();
 188             }

 189             selector.wakeup();

 190             return;
 191         }
 192 
 193         switch (eventHandler.getInterestOps()) {
 194         case SelectionKey.OP_ACCEPT :
 195             destroyListenerThread(eventHandler);
 196             break;
 197         case SelectionKey.OP_READ :
 198             destroyReaderThread(eventHandler);
 199             break;
 200         default:
 201             if (orb.transportDebugFlag) {
 202                 dprint(".unregisterForEvent: default: " + eventHandler);
 203             }
 204             throw new RuntimeException(
 205                 "SelectorImpl.uregisterForEvent: unknown interest ops");
 206         }
 207     }
 208 
 209     public void close()


 222         setClosed(true);
 223 
 224         Iterator i;
 225 
 226         // Kill listeners.
 227 
 228         i = listenerThreads.values().iterator();
 229         while (i.hasNext()) {
 230             ListenerThread listenerThread = (ListenerThread) i.next();
 231             listenerThread.close();
 232         }
 233 
 234         // Kill readers.
 235 
 236         i = readerThreads.values().iterator();
 237         while (i.hasNext()) {
 238             ReaderThread readerThread = (ReaderThread) i.next();
 239             readerThread.close();
 240         }
 241 


 242         // Selector
 243 
 244         try {
 245             if (selector != null) {
 246                 // wakeup Selector thread to process close request
 247                 selector.wakeup();
 248             }
 249         } catch (Throwable t) {
 250             if (orb.transportDebugFlag) {
 251                 dprint(".close: selector.close: " + t);
 252             }
 253         }
 254     }
 255 
 256     ///////////////////////////////////////////////////
 257     //
 258     // Thread methods.
 259     //
 260 
 261     public void run()
 262     {
 263         setName("SelectorThread");
 264         while (!closed) {
 265             try {
 266                 int n = 0;
 267                 if (timeout == 0 && orb.transportDebugFlag) {
 268                     dprint(".run: Beginning of selection cycle");
 269                 }
 270                 handleDeferredRegistrations();
 271                 enableInterestOps();
 272                 try {
 273                     n = selector.select(timeout);
 274                 } catch (IOException  e) {
 275                     if (orb.transportDebugFlag) {
 276                         dprint(".run: selector.select: " + e);
 277                     }
 278                 }
 279                 if (closed) {
 280                     selector.close();
 281                     if (orb.transportDebugFlag) {
 282                         dprint(".run: closed - .run return");
 283                     }
 284                     return;
 285                 }



 286                 /*
 287                   if (timeout == 0 && orb.transportDebugFlag) {
 288                   dprint(".run: selector.select() returned: " + n);
 289                   }
 290                   if (n == 0) {
 291                   continue;
 292                   }
 293                 */
 294                 Iterator iterator = selector.selectedKeys().iterator();
 295                 if (orb.transportDebugFlag) {
 296                     if (iterator.hasNext()) {
 297                         dprint(".run: n = " + n);
 298                     }
 299                 }
 300                 while (iterator.hasNext()) {
 301                     SelectionKey selectionKey = (SelectionKey) iterator.next();
 302                     iterator.remove();
 303                     EventHandler eventHandler = (EventHandler)
 304                         selectionKey.attachment();
 305                     try {
 306                         eventHandler.handleEvent();
 307                     } catch (Throwable t) {
 308                         if (orb.transportDebugFlag) {
 309                             dprint(".run: eventHandler.handleEvent", t);
 310                         }
 311                     }
 312                 }
 313                 if (timeout == 0 && orb.transportDebugFlag) {
 314                     dprint(".run: End of selection cycle");
 315                 }
 316             } catch (Throwable t) {
 317                 // IMPORTANT: ignore all errors so the select thread keeps running.
 318                 // Otherwise a guaranteed hang.
 319                 if (orb.transportDebugFlag) {
 320                     dprint(".run: ignoring", t);
 321                 }
 322             }
 323         }




 324     }








 325 
 326     /////////////////////////////////////////////////////
 327     //
 328     // Implementation.
 329     //
 330 






































 331     private synchronized boolean isClosed ()
 332     {
 333         return closed;
 334     }
 335 
 336     private synchronized void setClosed(boolean closed)
 337     {
 338         this.closed = closed;
 339     }
 340 
 341     private void startSelector()
 342     {
 343         try {
 344             selector = Selector.open();
 345         } catch (IOException e) {
 346             if (orb.transportDebugFlag) {
 347                 dprint(".startSelector: Selector.open: IOException: " + e);
 348             }
 349             // REVISIT - better handling/reporting
 350             RuntimeException rte =
 351                 new RuntimeException(".startSelector: Selector.open exception");
 352             rte.initCause(e);
 353             throw rte;
 354         }
 355         setDaemon(true);
 356         start();
 357         selectorStarted = true;
 358         if (orb.transportDebugFlag) {
 359             dprint(".startSelector: selector.start completed.");
 360         }
 361     }
 362 
 363     private void handleDeferredRegistrations()
 364     {
 365         synchronized (deferredRegistrations) {
 366             int deferredListSize = deferredRegistrations.size();
 367             for (int i = 0; i < deferredListSize; i++) {
 368                 EventHandler eventHandler =
 369                     (EventHandler)deferredRegistrations.get(i);
 370                 if (orb.transportDebugFlag) {
 371                     dprint(".handleDeferredRegistrations: " + eventHandler);
 372                 }
 373                 SelectableChannel channel = eventHandler.getChannel();
 374                 SelectionKey selectionKey = null;
 375                 try {
 376                     selectionKey =
 377                         channel.register(selector,
 378                                          eventHandler.getInterestOps(),
 379                                          (Object)eventHandler);
 380                 } catch (ClosedChannelException e) {
 381                     if (orb.transportDebugFlag) {
 382                         dprint(".handleDeferredRegistrations: " + e);
 383                     }
 384                 }
 385                 eventHandler.setSelectionKey(selectionKey);
 386             }
 387             deferredRegistrations.clear();
 388         }
 389     }
 390 
 391     private void enableInterestOps()
 392     {
 393         synchronized (interestOpsList) {
 394             int listSize = interestOpsList.size();
 395             if (listSize > 0) {
 396                 if (orb.transportDebugFlag) {
 397                     dprint(".enableInterestOps:->");
 398                 }
 399                 SelectionKey selectionKey = null;
 400                 SelectionKeyAndOp keyAndOp = null;
 401                 int keyOp, selectionKeyOps = 0;
 402                 for (int i = 0; i < listSize; i++) {




   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.corba.se.impl.transport;
  27 
  28 import java.io.IOException;
  29 import java.net.ServerSocket;
  30 import java.nio.channels.ClosedChannelException;
  31 import java.nio.channels.SelectableChannel;
  32 import java.nio.channels.ServerSocketChannel;
  33 import java.nio.channels.SelectionKey;
  34 import java.nio.channels.Selector;
  35 import java.nio.channels.ClosedSelectorException;
  36 import java.util.ArrayList;
  37 import java.util.HashMap;
  38 import java.util.Map;
  39 import java.util.Iterator;
  40 import java.util.List;
  41 
  42 
  43 import com.sun.corba.se.pept.broker.Broker;
  44 import com.sun.corba.se.pept.transport.Acceptor;
  45 import com.sun.corba.se.pept.transport.Connection;
  46 import com.sun.corba.se.pept.transport.EventHandler;
  47 import com.sun.corba.se.pept.transport.ListenerThread;
  48 import com.sun.corba.se.pept.transport.ReaderThread;
  49 
  50 import com.sun.corba.se.spi.logging.CORBALogDomains;
  51 import com.sun.corba.se.spi.orb.ORB;
  52 import com.sun.corba.se.spi.orbutil.threadpool.Work;
  53 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
  54 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
  55 
  56 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  57 import com.sun.corba.se.impl.orbutil.ORBUtility;
  58 
  59 /**
  60  * @author Harold Carr
  61  */
  62 class SelectorImpl


  98 
  99     public long getTimeout()
 100     {
 101         return timeout;
 102     }
 103 
 104     public void registerInterestOps(EventHandler eventHandler)
 105     {
 106         if (orb.transportDebugFlag) {
 107             dprint(".registerInterestOps:-> " + eventHandler);
 108         }
 109 
 110         SelectionKey selectionKey = eventHandler.getSelectionKey();
 111         if (selectionKey.isValid()) {
 112             int ehOps = eventHandler.getInterestOps();
 113             SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
 114             synchronized(interestOpsList) {
 115                 interestOpsList.add(keyAndOp);
 116             }
 117             // tell Selector Thread there's an update to a SelectorKey's Ops
 118             try {
 119                 if (selector != null) {
 120                     // wakeup Selector thread to process close request
 121                     selector.wakeup();
 122                 }
 123             } catch (Throwable t) {
 124                 if (orb.transportDebugFlag) {
 125                     dprint(".registerInterestOps: selector.wakeup: ", t);
 126                 }
 127             }
 128         }
 129         else {
 130             wrapper.selectionKeyInvalid(eventHandler.toString());
 131             if (orb.transportDebugFlag) {
 132                 dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
 133             }
 134         }
 135 
 136         if (orb.transportDebugFlag) {
 137             dprint(".registerInterestOps:<- ");
 138         }
 139     }
 140 
 141     public void registerForEvent(EventHandler eventHandler)
 142     {
 143         if (orb.transportDebugFlag) {
 144             dprint(".registerForEvent: " + eventHandler);
 145         }
 146 
 147         if (isClosed()) {
 148             if (orb.transportDebugFlag) {


 182     {
 183         if (orb.transportDebugFlag) {
 184             dprint(".unregisterForEvent: " + eventHandler);
 185         }
 186 
 187         if (isClosed()) {
 188             if (orb.transportDebugFlag) {
 189                 dprint(".unregisterForEvent: closed: " + eventHandler);
 190             }
 191             return;
 192         }
 193 
 194         if (eventHandler.shouldUseSelectThreadToWait()) {
 195             SelectionKey selectionKey ;
 196             synchronized(deferredRegistrations) {
 197                 selectionKey = eventHandler.getSelectionKey();
 198             }
 199             if (selectionKey != null) {
 200                 selectionKey.cancel();
 201             }
 202             if (selector != null) {
 203                 selector.wakeup();
 204             }
 205             return;
 206         }
 207 
 208         switch (eventHandler.getInterestOps()) {
 209         case SelectionKey.OP_ACCEPT :
 210             destroyListenerThread(eventHandler);
 211             break;
 212         case SelectionKey.OP_READ :
 213             destroyReaderThread(eventHandler);
 214             break;
 215         default:
 216             if (orb.transportDebugFlag) {
 217                 dprint(".unregisterForEvent: default: " + eventHandler);
 218             }
 219             throw new RuntimeException(
 220                 "SelectorImpl.uregisterForEvent: unknown interest ops");
 221         }
 222     }
 223 
 224     public void close()


 237         setClosed(true);
 238 
 239         Iterator i;
 240 
 241         // Kill listeners.
 242 
 243         i = listenerThreads.values().iterator();
 244         while (i.hasNext()) {
 245             ListenerThread listenerThread = (ListenerThread) i.next();
 246             listenerThread.close();
 247         }
 248 
 249         // Kill readers.
 250 
 251         i = readerThreads.values().iterator();
 252         while (i.hasNext()) {
 253             ReaderThread readerThread = (ReaderThread) i.next();
 254             readerThread.close();
 255         }
 256 
 257        clearDeferredRegistrations();
 258 
 259         // Selector
 260 
 261         try {
 262             if (selector != null) {
 263                 // wakeup Selector thread to process close request
 264                 selector.wakeup();
 265             }
 266         } catch (Throwable t) {
 267             if (orb.transportDebugFlag) {
 268                 dprint(".close: selector.wakeup: ", t);
 269             }
 270         }
 271     }
 272 
 273     ///////////////////////////////////////////////////
 274     //
 275     // Thread methods.
 276     //
 277 
 278     public void run()
 279     {
 280         setName("SelectorThread");
 281         while (!closed) {
 282             try {
 283                 int n = 0;
 284                 if (timeout == 0 && orb.transportDebugFlag) {
 285                     dprint(".run: Beginning of selection cycle");
 286                 }
 287                 handleDeferredRegistrations();
 288                 enableInterestOps();
 289                 try {
 290                     n = selector.select(timeout);
 291                 } catch (IOException  e) {
 292                     if (orb.transportDebugFlag) {
 293                         dprint(".run: selector.select: ", e);
 294                     }
 295                 } catch (ClosedSelectorException csEx) {


 296                     if (orb.transportDebugFlag) {
 297                         dprint(".run: selector.select: ", csEx);
 298                     }
 299                     break;
 300                 }
 301                 if (closed) {
 302                     break;
 303                 }
 304                 /*
 305                   if (timeout == 0 && orb.transportDebugFlag) {
 306                   dprint(".run: selector.select() returned: " + n);
 307                   }
 308                   if (n == 0) {
 309                   continue;
 310                   }
 311                 */
 312                 Iterator iterator = selector.selectedKeys().iterator();
 313                 if (orb.transportDebugFlag) {
 314                     if (iterator.hasNext()) {
 315                         dprint(".run: n = " + n);
 316                     }
 317                 }
 318                 while (iterator.hasNext()) {
 319                     SelectionKey selectionKey = (SelectionKey) iterator.next();
 320                     iterator.remove();
 321                     EventHandler eventHandler = (EventHandler)
 322                         selectionKey.attachment();
 323                     try {
 324                         eventHandler.handleEvent();
 325                     } catch (Throwable t) {
 326                         if (orb.transportDebugFlag) {
 327                             dprint(".run: eventHandler.handleEvent", t);
 328                         }
 329                     }
 330                 }
 331                 if (timeout == 0 && orb.transportDebugFlag) {
 332                     dprint(".run: End of selection cycle");
 333                 }
 334             } catch (Throwable t) {
 335                 // IMPORTANT: ignore all errors so the select thread keeps running.
 336                 // Otherwise a guaranteed hang.
 337                 if (orb.transportDebugFlag) {
 338                     dprint(".run: ignoring", t);
 339                 }
 340             }
 341         }
 342         try {
 343             if (selector != null) {
 344                 if (orb.transportDebugFlag) {
 345                     dprint(".run: selector.close ");
 346                 }
 347                 selector.close();
 348             }
 349         } catch (Throwable t) {
 350             if (orb.transportDebugFlag) {
 351                 dprint(".run: selector.close: ", t);
 352             }
 353         }
 354     }
 355 
 356     /////////////////////////////////////////////////////
 357     //
 358     // Implementation.
 359     //
 360 
 361     private void clearDeferredRegistrations() {
 362         synchronized (deferredRegistrations) {
 363             int deferredListSize = deferredRegistrations.size();
 364             if (orb.transportDebugFlag) {
 365                 dprint(".clearDeferredRegistrations:deferred list size == " + deferredListSize);
 366             }
 367             for (int i = 0; i < deferredListSize; i++) {
 368                 EventHandler eventHandler =
 369                     (EventHandler)deferredRegistrations.get(i);
 370                 if (orb.transportDebugFlag) {
 371                     dprint(".clearDeferredRegistrations: " + eventHandler);
 372                 }
 373                 SelectableChannel channel = eventHandler.getChannel();
 374                 SelectionKey selectionKey = null;
 375 
 376                 try {
 377                     if (orb.transportDebugFlag) {
 378                         dprint(".clearDeferredRegistrations:close channel == "
 379                                 + channel);
 380                         dprint(".clearDeferredRegistrations:close channel class == "
 381                                 + channel.getClass().getName());
 382                     }
 383                     channel.close();
 384                     selectionKey = eventHandler.getSelectionKey();
 385                     if (selectionKey != null) {
 386                         selectionKey.cancel();
 387                         selectionKey.attach(null);
 388                     }
 389                 } catch (IOException ioEx) {
 390                     if (orb.transportDebugFlag) {
 391                         dprint(".clearDeferredRegistrations: ", ioEx);
 392                     }
 393                 }
 394             }
 395             deferredRegistrations.clear();
 396         }
 397     }
 398 
 399     private synchronized boolean isClosed ()
 400     {
 401         return closed;
 402     }
 403 
 404     private synchronized void setClosed(boolean closed)
 405     {
 406         this.closed = closed;
 407     }
 408 
 409     private void startSelector()
 410     {
 411         try {
 412             selector = Selector.open();
 413         } catch (IOException e) {
 414             if (orb.transportDebugFlag) {
 415                 dprint(".startSelector: Selector.open: IOException: ", e);
 416             }
 417             // REVISIT - better handling/reporting
 418             RuntimeException rte =
 419                 new RuntimeException(".startSelector: Selector.open exception");
 420             rte.initCause(e);
 421             throw rte;
 422         }
 423         setDaemon(true);
 424         start();
 425         selectorStarted = true;
 426         if (orb.transportDebugFlag) {
 427             dprint(".startSelector: selector.start completed.");
 428         }
 429     }
 430 
 431     private void handleDeferredRegistrations()
 432     {
 433         synchronized (deferredRegistrations) {
 434             int deferredListSize = deferredRegistrations.size();
 435             for (int i = 0; i < deferredListSize; i++) {
 436                 EventHandler eventHandler =
 437                     (EventHandler)deferredRegistrations.get(i);
 438                 if (orb.transportDebugFlag) {
 439                     dprint(".handleDeferredRegistrations: " + eventHandler);
 440                 }
 441                 SelectableChannel channel = eventHandler.getChannel();
 442                 SelectionKey selectionKey = null;
 443                 try {
 444                     selectionKey =
 445                         channel.register(selector,
 446                                          eventHandler.getInterestOps(),
 447                                          (Object)eventHandler);
 448                 } catch (ClosedChannelException e) {
 449                     if (orb.transportDebugFlag) {
 450                         dprint(".handleDeferredRegistrations: ", e);
 451                     }
 452                 }
 453                 eventHandler.setSelectionKey(selectionKey);
 454             }
 455             deferredRegistrations.clear();
 456         }
 457     }
 458 
 459     private void enableInterestOps()
 460     {
 461         synchronized (interestOpsList) {
 462             int listSize = interestOpsList.size();
 463             if (listSize > 0) {
 464                 if (orb.transportDebugFlag) {
 465                     dprint(".enableInterestOps:->");
 466                 }
 467                 SelectionKey selectionKey = null;
 468                 SelectionKeyAndOp keyAndOp = null;
 469                 int keyOp, selectionKeyOps = 0;
 470                 for (int i = 0; i < listSize; i++) {