1 /*
   2  * Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.corba.se.impl.transport;
  27 
  28 import java.io.IOException;
  29 import java.nio.channels.ClosedChannelException;
  30 import java.nio.channels.SelectableChannel;
  31 import java.nio.channels.SelectionKey;
  32 import java.nio.channels.Selector;
  33 import java.util.ArrayList;
  34 import java.util.HashMap;
  35 import java.util.Map;
  36 import java.util.Iterator;
  37 import java.util.List;
  38 
  39 import com.sun.corba.se.pept.broker.Broker;
  40 import com.sun.corba.se.pept.transport.Acceptor;
  41 import com.sun.corba.se.pept.transport.Connection;
  42 import com.sun.corba.se.pept.transport.EventHandler;
  43 import com.sun.corba.se.pept.transport.ListenerThread;
  44 import com.sun.corba.se.pept.transport.ReaderThread;
  45 
  46 import com.sun.corba.se.spi.logging.CORBALogDomains;
  47 import com.sun.corba.se.spi.orb.ORB;
  48 import com.sun.corba.se.spi.orbutil.threadpool.Work;
  49 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
  50 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
  51 
  52 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  53 import com.sun.corba.se.impl.orbutil.ORBUtility;
  54 
  55 /**
  56  * @author Harold Carr
  57  */
  58 class SelectorImpl
  59     extends
  60         sun.misc.ManagedLocalsThread
  61     implements
  62         com.sun.corba.se.pept.transport.Selector
  63 {
  64     private ORB orb;
  65     private Selector selector;
  66     private long timeout;
  67     private List deferredRegistrations;
  68     private List interestOpsList;
  69     private HashMap listenerThreads;
  70     private Map readerThreads;
  71     private boolean selectorStarted;
  72     private volatile boolean closed;
  73     private ORBUtilSystemException wrapper;
  74 
  75 
  76     public SelectorImpl(ORB orb)
  77     {
  78         this.orb = orb;
  79         selector = null;
  80         selectorStarted = false;
  81         timeout = 60000;
  82         deferredRegistrations = new ArrayList();
  83         interestOpsList = new ArrayList();
  84         listenerThreads = new HashMap();
  85         readerThreads = java.util.Collections.synchronizedMap(new HashMap());
  86         closed = false;
  87         wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT);
  88     }
  89 
  90     public void setTimeout(long timeout)
  91     {
  92         this.timeout = timeout;
  93     }
  94 
  95     public long getTimeout()
  96     {
  97         return timeout;
  98     }
  99 
 100     public void registerInterestOps(EventHandler eventHandler)
 101     {
 102         if (orb.transportDebugFlag) {
 103             dprint(".registerInterestOps:-> " + eventHandler);
 104         }
 105 
 106         SelectionKey selectionKey = eventHandler.getSelectionKey();
 107         if (selectionKey.isValid()) {
 108             int ehOps = eventHandler.getInterestOps();
 109             SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
 110             synchronized(interestOpsList) {
 111                 interestOpsList.add(keyAndOp);
 112             }
 113             // tell Selector Thread there's an update to a SelectorKey's Ops
 114             selector.wakeup();
 115         }
 116         else {
 117             wrapper.selectionKeyInvalid(eventHandler.toString());
 118             if (orb.transportDebugFlag) {
 119                 dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
 120             }
 121         }
 122 
 123         if (orb.transportDebugFlag) {
 124             dprint(".registerInterestOps:<- ");
 125         }
 126     }
 127 
 128     public void registerForEvent(EventHandler eventHandler)
 129     {
 130         if (orb.transportDebugFlag) {
 131             dprint(".registerForEvent: " + eventHandler);
 132         }
 133 
 134         if (isClosed()) {
 135             if (orb.transportDebugFlag) {
 136                 dprint(".registerForEvent: closed: " + eventHandler);
 137             }
 138             return;
 139         }
 140 
 141         if (eventHandler.shouldUseSelectThreadToWait()) {
 142             synchronized (deferredRegistrations) {
 143                 deferredRegistrations.add(eventHandler);
 144             }
 145             if (! selectorStarted) {
 146                 startSelector();
 147             }
 148             selector.wakeup();
 149             return;
 150         }
 151 
 152         switch (eventHandler.getInterestOps()) {
 153         case SelectionKey.OP_ACCEPT :
 154             createListenerThread(eventHandler);
 155             break;
 156         case SelectionKey.OP_READ :
 157             createReaderThread(eventHandler);
 158             break;
 159         default:
 160             if (orb.transportDebugFlag) {
 161                 dprint(".registerForEvent: default: " + eventHandler);
 162             }
 163             throw new RuntimeException(
 164                 "SelectorImpl.registerForEvent: unknown interest ops");
 165         }
 166     }
 167 
 168     public void unregisterForEvent(EventHandler eventHandler)
 169     {
 170         if (orb.transportDebugFlag) {
 171             dprint(".unregisterForEvent: " + eventHandler);
 172         }
 173 
 174         if (isClosed()) {
 175             if (orb.transportDebugFlag) {
 176                 dprint(".unregisterForEvent: closed: " + eventHandler);
 177             }
 178             return;
 179         }
 180 
 181         if (eventHandler.shouldUseSelectThreadToWait()) {
 182             SelectionKey selectionKey ;
 183             synchronized(deferredRegistrations) {
 184                 selectionKey = eventHandler.getSelectionKey();
 185             }
 186             if (selectionKey != null) {
 187                 selectionKey.cancel();
 188             }
 189             selector.wakeup();
 190             return;
 191         }
 192 
 193         switch (eventHandler.getInterestOps()) {
 194         case SelectionKey.OP_ACCEPT :
 195             destroyListenerThread(eventHandler);
 196             break;
 197         case SelectionKey.OP_READ :
 198             destroyReaderThread(eventHandler);
 199             break;
 200         default:
 201             if (orb.transportDebugFlag) {
 202                 dprint(".unregisterForEvent: default: " + eventHandler);
 203             }
 204             throw new RuntimeException(
 205                 "SelectorImpl.uregisterForEvent: unknown interest ops");
 206         }
 207     }
 208 
 209     public void close()
 210     {
 211         if (orb.transportDebugFlag) {
 212             dprint(".close");
 213         }
 214 
 215         if (isClosed()) {
 216             if (orb.transportDebugFlag) {
 217                 dprint(".close: already closed");
 218             }
 219             return;
 220         }
 221 
 222         setClosed(true);
 223 
 224         Iterator i;
 225 
 226         // Kill listeners.
 227 
 228         i = listenerThreads.values().iterator();
 229         while (i.hasNext()) {
 230             ListenerThread listenerThread = (ListenerThread) i.next();
 231             listenerThread.close();
 232         }
 233 
 234         // Kill readers.
 235 
 236         i = readerThreads.values().iterator();
 237         while (i.hasNext()) {
 238             ReaderThread readerThread = (ReaderThread) i.next();
 239             readerThread.close();
 240         }
 241 
 242         // Selector
 243 
 244         try {
 245             if (selector != null) {
 246                 // wakeup Selector thread to process close request
 247                 selector.wakeup();
 248             }
 249         } catch (Throwable t) {
 250             if (orb.transportDebugFlag) {
 251                 dprint(".close: selector.close: " + t);
 252             }
 253         }
 254     }
 255 
 256     ///////////////////////////////////////////////////
 257     //
 258     // Thread methods.
 259     //
 260 
 261     public void run()
 262     {
 263         setName("SelectorThread");
 264         while (!closed) {
 265             try {
 266                 int n = 0;
 267                 if (timeout == 0 && orb.transportDebugFlag) {
 268                     dprint(".run: Beginning of selection cycle");
 269                 }
 270                 handleDeferredRegistrations();
 271                 enableInterestOps();
 272                 try {
 273                     n = selector.select(timeout);
 274                 } catch (IOException  e) {
 275                     if (orb.transportDebugFlag) {
 276                         dprint(".run: selector.select: " + e);
 277                     }
 278                 }
 279                 if (closed) {
 280                     selector.close();
 281                     if (orb.transportDebugFlag) {
 282                         dprint(".run: closed - .run return");
 283                     }
 284                     return;
 285                 }
 286                 /*
 287                   if (timeout == 0 && orb.transportDebugFlag) {
 288                   dprint(".run: selector.select() returned: " + n);
 289                   }
 290                   if (n == 0) {
 291                   continue;
 292                   }
 293                 */
 294                 Iterator iterator = selector.selectedKeys().iterator();
 295                 if (orb.transportDebugFlag) {
 296                     if (iterator.hasNext()) {
 297                         dprint(".run: n = " + n);
 298                     }
 299                 }
 300                 while (iterator.hasNext()) {
 301                     SelectionKey selectionKey = (SelectionKey) iterator.next();
 302                     iterator.remove();
 303                     EventHandler eventHandler = (EventHandler)
 304                         selectionKey.attachment();
 305                     try {
 306                         eventHandler.handleEvent();
 307                     } catch (Throwable t) {
 308                         if (orb.transportDebugFlag) {
 309                             dprint(".run: eventHandler.handleEvent", t);
 310                         }
 311                     }
 312                 }
 313                 if (timeout == 0 && orb.transportDebugFlag) {
 314                     dprint(".run: End of selection cycle");
 315                 }
 316             } catch (Throwable t) {
 317                 // IMPORTANT: ignore all errors so the select thread keeps running.
 318                 // Otherwise a guaranteed hang.
 319                 if (orb.transportDebugFlag) {
 320                     dprint(".run: ignoring", t);
 321                 }
 322             }
 323         }
 324     }
 325 
 326     /////////////////////////////////////////////////////
 327     //
 328     // Implementation.
 329     //
 330 
 331     private synchronized boolean isClosed ()
 332     {
 333         return closed;
 334     }
 335 
 336     private synchronized void setClosed(boolean closed)
 337     {
 338         this.closed = closed;
 339     }
 340 
 341     private void startSelector()
 342     {
 343         try {
 344             selector = Selector.open();
 345         } catch (IOException e) {
 346             if (orb.transportDebugFlag) {
 347                 dprint(".startSelector: Selector.open: IOException: " + e);
 348             }
 349             // REVISIT - better handling/reporting
 350             RuntimeException rte =
 351                 new RuntimeException(".startSelector: Selector.open exception");
 352             rte.initCause(e);
 353             throw rte;
 354         }
 355         setDaemon(true);
 356         start();
 357         selectorStarted = true;
 358         if (orb.transportDebugFlag) {
 359             dprint(".startSelector: selector.start completed.");
 360         }
 361     }
 362 
 363     private void handleDeferredRegistrations()
 364     {
 365         synchronized (deferredRegistrations) {
 366             int deferredListSize = deferredRegistrations.size();
 367             for (int i = 0; i < deferredListSize; i++) {
 368                 EventHandler eventHandler =
 369                     (EventHandler)deferredRegistrations.get(i);
 370                 if (orb.transportDebugFlag) {
 371                     dprint(".handleDeferredRegistrations: " + eventHandler);
 372                 }
 373                 SelectableChannel channel = eventHandler.getChannel();
 374                 SelectionKey selectionKey = null;
 375                 try {
 376                     selectionKey =
 377                         channel.register(selector,
 378                                          eventHandler.getInterestOps(),
 379                                          (Object)eventHandler);
 380                 } catch (ClosedChannelException e) {
 381                     if (orb.transportDebugFlag) {
 382                         dprint(".handleDeferredRegistrations: " + e);
 383                     }
 384                 }
 385                 eventHandler.setSelectionKey(selectionKey);
 386             }
 387             deferredRegistrations.clear();
 388         }
 389     }
 390 
 391     private void enableInterestOps()
 392     {
 393         synchronized (interestOpsList) {
 394             int listSize = interestOpsList.size();
 395             if (listSize > 0) {
 396                 if (orb.transportDebugFlag) {
 397                     dprint(".enableInterestOps:->");
 398                 }
 399                 SelectionKey selectionKey = null;
 400                 SelectionKeyAndOp keyAndOp = null;
 401                 int keyOp, selectionKeyOps = 0;
 402                 for (int i = 0; i < listSize; i++) {
 403                     keyAndOp = (SelectionKeyAndOp)interestOpsList.get(i);
 404                     selectionKey = keyAndOp.selectionKey;
 405 
 406                     // Need to check if the SelectionKey is valid because a
 407                     // connection's SelectionKey could be put on the list to
 408                     // have its OP enabled and before it's enabled be reclaimed.
 409                     // Otherwise, the enabling of the OP will throw an exception
 410                     // here and exit this method an potentially not enable all
 411                     // registered ops.
 412                     //
 413                     // So, we ignore SelectionKeys that are invalid. They will get
 414                     // cleaned up on the next Selector.select() call.
 415 
 416                     if (selectionKey.isValid()) {
 417                         if (orb.transportDebugFlag) {
 418                             dprint(".enableInterestOps: " + keyAndOp);
 419                         }
 420                         keyOp = keyAndOp.keyOp;
 421                         selectionKeyOps = selectionKey.interestOps();
 422                         selectionKey.interestOps(selectionKeyOps | keyOp);
 423                     }
 424                 }
 425                 interestOpsList.clear();
 426                 if (orb.transportDebugFlag) {
 427                     dprint(".enableInterestOps:<-");
 428                 }
 429             }
 430         }
 431     }
 432 
 433     private void createListenerThread(EventHandler eventHandler)
 434     {
 435         if (orb.transportDebugFlag) {
 436             dprint(".createListenerThread: " + eventHandler);
 437         }
 438         Acceptor acceptor = eventHandler.getAcceptor();
 439         ListenerThread listenerThread =
 440             new ListenerThreadImpl(orb, acceptor, this);
 441         listenerThreads.put(eventHandler, listenerThread);
 442         Throwable throwable = null;
 443         try {
 444             orb.getThreadPoolManager().getThreadPool(0)
 445                 .getWorkQueue(0).addWork((Work)listenerThread);
 446         } catch (NoSuchThreadPoolException e) {
 447             throwable = e;
 448         } catch (NoSuchWorkQueueException e) {
 449             throwable = e;
 450         }
 451         if (throwable != null) {
 452             RuntimeException rte = new RuntimeException(throwable.toString());
 453             rte.initCause(throwable);
 454             throw rte;
 455         }
 456     }
 457 
 458     private void destroyListenerThread(EventHandler eventHandler)
 459     {
 460         if (orb.transportDebugFlag) {
 461             dprint(".destroyListenerThread: " + eventHandler);
 462         }
 463         ListenerThread listenerThread = (ListenerThread)
 464             listenerThreads.get(eventHandler);
 465         if (listenerThread == null) {
 466             if (orb.transportDebugFlag) {
 467                 dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
 468             }
 469             return;
 470         }
 471         listenerThreads.remove(eventHandler);
 472         listenerThread.close();
 473     }
 474 
 475     private void createReaderThread(EventHandler eventHandler)
 476     {
 477         if (orb.transportDebugFlag) {
 478             dprint(".createReaderThread: " + eventHandler);
 479         }
 480         Connection connection = eventHandler.getConnection();
 481         ReaderThread readerThread =
 482             new ReaderThreadImpl(orb, connection, this);
 483         readerThreads.put(eventHandler, readerThread);
 484         Throwable throwable = null;
 485         try {
 486             orb.getThreadPoolManager().getThreadPool(0)
 487                 .getWorkQueue(0).addWork((Work)readerThread);
 488         } catch (NoSuchThreadPoolException e) {
 489             throwable = e;
 490         } catch (NoSuchWorkQueueException e) {
 491             throwable = e;
 492         }
 493         if (throwable != null) {
 494             RuntimeException rte = new RuntimeException(throwable.toString());
 495             rte.initCause(throwable);
 496             throw rte;
 497         }
 498     }
 499 
 500     private void destroyReaderThread(EventHandler eventHandler)
 501     {
 502         if (orb.transportDebugFlag) {
 503             dprint(".destroyReaderThread: " + eventHandler);
 504         }
 505         ReaderThread readerThread = (ReaderThread)
 506             readerThreads.get(eventHandler);
 507         if (readerThread == null) {
 508             if (orb.transportDebugFlag) {
 509                 dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
 510             }
 511             return;
 512         }
 513         readerThreads.remove(eventHandler);
 514         readerThread.close();
 515     }
 516 
 517     private void dprint(String msg)
 518     {
 519         ORBUtility.dprint("SelectorImpl", msg);
 520     }
 521 
 522     protected void dprint(String msg, Throwable t)
 523     {
 524         dprint(msg);
 525         t.printStackTrace(System.out);
 526     }
 527 
 528     // Private class to contain a SelectionKey and a SelectionKey op.
 529     // Used only by SelectorImpl to register and enable SelectionKey
 530     // Op.
 531     // REVISIT - Could do away with this class and use the EventHanlder
 532     //           directly.
 533     private class SelectionKeyAndOp
 534     {
 535         // A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT]
 536         public int keyOp;
 537         public SelectionKey selectionKey;
 538 
 539         // constructor
 540         public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) {
 541             this.selectionKey = selectionKey;
 542             this.keyOp = keyOp;
 543         }
 544     }
 545 
 546 // End of file.
 547 }