1 /*
   2  * Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.corba.se.impl.transport;
  27 
  28 import java.io.IOException;
  29 import java.net.ServerSocket;
  30 import java.nio.channels.ClosedChannelException;
  31 import java.nio.channels.SelectableChannel;
  32 import java.nio.channels.ServerSocketChannel;
  33 import java.nio.channels.SelectionKey;
  34 import java.nio.channels.Selector;
  35 import java.nio.channels.ClosedSelectorException;
  36 import java.util.ArrayList;
  37 import java.util.HashMap;
  38 import java.util.Map;
  39 import java.util.Iterator;
  40 import java.util.List;
  41 
  42 
  43 import com.sun.corba.se.pept.broker.Broker;
  44 import com.sun.corba.se.pept.transport.Acceptor;
  45 import com.sun.corba.se.pept.transport.Connection;
  46 import com.sun.corba.se.pept.transport.EventHandler;
  47 import com.sun.corba.se.pept.transport.ListenerThread;
  48 import com.sun.corba.se.pept.transport.ReaderThread;
  49 
  50 import com.sun.corba.se.spi.logging.CORBALogDomains;
  51 import com.sun.corba.se.spi.orb.ORB;
  52 import com.sun.corba.se.spi.orbutil.threadpool.Work;
  53 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
  54 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
  55 
  56 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  57 import com.sun.corba.se.impl.orbutil.ORBUtility;
  58 
  59 /**
  60  * @author Harold Carr
  61  */
  62 class SelectorImpl
  63     extends
  64         Thread
  65     implements
  66         com.sun.corba.se.pept.transport.Selector
  67 {
  68     private ORB orb;
  69     private Selector selector;
  70     private long timeout;
  71     private List deferredRegistrations;
  72     private List interestOpsList;
  73     private HashMap listenerThreads;
  74     private Map readerThreads;
  75     private boolean selectorStarted;
  76     private volatile boolean closed;
  77     private ORBUtilSystemException wrapper;
  78 
  79 
  80     public SelectorImpl(ORB orb)
  81     {
  82         super(null, null, "ORB-Selector-Thread", 0, false);
  83         this.orb = orb;
  84         selector = null;
  85         selectorStarted = false;
  86         timeout = 60000;
  87         deferredRegistrations = new ArrayList();
  88         interestOpsList = new ArrayList();
  89         listenerThreads = new HashMap();
  90         readerThreads = java.util.Collections.synchronizedMap(new HashMap());
  91         closed = false;
  92         wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT);
  93     }
  94 
  95     public void setTimeout(long timeout)
  96     {
  97         this.timeout = timeout;
  98     }
  99 
 100     public long getTimeout()
 101     {
 102         return timeout;
 103     }
 104 
 105     public void registerInterestOps(EventHandler eventHandler)
 106     {
 107         if (orb.transportDebugFlag) {
 108             dprint(".registerInterestOps:-> " + eventHandler);
 109         }
 110 
 111         SelectionKey selectionKey = eventHandler.getSelectionKey();
 112         if (selectionKey.isValid()) {
 113             int ehOps = eventHandler.getInterestOps();
 114             SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
 115             synchronized(interestOpsList) {
 116                 interestOpsList.add(keyAndOp);
 117             }
 118             // tell Selector Thread there's an update to a SelectorKey's Ops
 119             try {
 120                 if (selector != null) {
 121                     // wakeup Selector thread to process close request
 122                     selector.wakeup();
 123                 }
 124             } catch (Throwable t) {
 125                 if (orb.transportDebugFlag) {
 126                     dprint(".registerInterestOps: selector.wakeup: ", t);
 127                 }
 128             }
 129         }
 130         else {
 131             wrapper.selectionKeyInvalid(eventHandler.toString());
 132             if (orb.transportDebugFlag) {
 133                 dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
 134             }
 135         }
 136 
 137         if (orb.transportDebugFlag) {
 138             dprint(".registerInterestOps:<- ");
 139         }
 140     }
 141 
 142     public void registerForEvent(EventHandler eventHandler)
 143     {
 144         if (orb.transportDebugFlag) {
 145             dprint(".registerForEvent: " + eventHandler);
 146         }
 147 
 148         if (isClosed()) {
 149             if (orb.transportDebugFlag) {
 150                 dprint(".registerForEvent: closed: " + eventHandler);
 151             }
 152             return;
 153         }
 154 
 155         if (eventHandler.shouldUseSelectThreadToWait()) {
 156             synchronized (deferredRegistrations) {
 157                 deferredRegistrations.add(eventHandler);
 158             }
 159             if (! selectorStarted) {
 160                 startSelector();
 161             }
 162             selector.wakeup();
 163             return;
 164         }
 165 
 166         switch (eventHandler.getInterestOps()) {
 167         case SelectionKey.OP_ACCEPT :
 168             createListenerThread(eventHandler);
 169             break;
 170         case SelectionKey.OP_READ :
 171             createReaderThread(eventHandler);
 172             break;
 173         default:
 174             if (orb.transportDebugFlag) {
 175                 dprint(".registerForEvent: default: " + eventHandler);
 176             }
 177             throw new RuntimeException(
 178                 "SelectorImpl.registerForEvent: unknown interest ops");
 179         }
 180     }
 181 
 182     public void unregisterForEvent(EventHandler eventHandler)
 183     {
 184         if (orb.transportDebugFlag) {
 185             dprint(".unregisterForEvent: " + eventHandler);
 186         }
 187 
 188         if (isClosed()) {
 189             if (orb.transportDebugFlag) {
 190                 dprint(".unregisterForEvent: closed: " + eventHandler);
 191             }
 192             return;
 193         }
 194 
 195         if (eventHandler.shouldUseSelectThreadToWait()) {
 196             SelectionKey selectionKey ;
 197             synchronized(deferredRegistrations) {
 198                 selectionKey = eventHandler.getSelectionKey();
 199             }
 200             if (selectionKey != null) {
 201                 selectionKey.cancel();
 202             }
 203             if (selector != null) {
 204                 selector.wakeup();
 205             }
 206             return;
 207         }
 208 
 209         switch (eventHandler.getInterestOps()) {
 210         case SelectionKey.OP_ACCEPT :
 211             destroyListenerThread(eventHandler);
 212             break;
 213         case SelectionKey.OP_READ :
 214             destroyReaderThread(eventHandler);
 215             break;
 216         default:
 217             if (orb.transportDebugFlag) {
 218                 dprint(".unregisterForEvent: default: " + eventHandler);
 219             }
 220             throw new RuntimeException(
 221                 "SelectorImpl.uregisterForEvent: unknown interest ops");
 222         }
 223     }
 224 
 225     public void close()
 226     {
 227         if (orb.transportDebugFlag) {
 228             dprint(".close");
 229         }
 230 
 231         if (isClosed()) {
 232             if (orb.transportDebugFlag) {
 233                 dprint(".close: already closed");
 234             }
 235             return;
 236         }
 237 
 238         setClosed(true);
 239 
 240         Iterator i;
 241 
 242         // Kill listeners.
 243 
 244         i = listenerThreads.values().iterator();
 245         while (i.hasNext()) {
 246             ListenerThread listenerThread = (ListenerThread) i.next();
 247             listenerThread.close();
 248         }
 249 
 250         // Kill readers.
 251 
 252         i = readerThreads.values().iterator();
 253         while (i.hasNext()) {
 254             ReaderThread readerThread = (ReaderThread) i.next();
 255             readerThread.close();
 256         }
 257 
 258        clearDeferredRegistrations();
 259 
 260         // Selector
 261 
 262         try {
 263             if (selector != null) {
 264                 // wakeup Selector thread to process close request
 265                 selector.wakeup();
 266             }
 267         } catch (Throwable t) {
 268             if (orb.transportDebugFlag) {
 269                 dprint(".close: selector.wakeup: ", t);
 270             }
 271         }
 272     }
 273 
 274     ///////////////////////////////////////////////////
 275     //
 276     // Thread methods.
 277     //
 278 
 279     public void run()
 280     {
 281         while (!closed) {
 282             try {
 283                 int n = 0;
 284                 if (timeout == 0 && orb.transportDebugFlag) {
 285                     dprint(".run: Beginning of selection cycle");
 286                 }
 287                 handleDeferredRegistrations();
 288                 enableInterestOps();
 289                 try {
 290                     n = selector.select(timeout);
 291                 } catch (IOException  e) {
 292                     if (orb.transportDebugFlag) {
 293                         dprint(".run: selector.select: ", e);
 294                     }
 295                 } catch (ClosedSelectorException csEx) {
 296                     if (orb.transportDebugFlag) {
 297                         dprint(".run: selector.select: ", csEx);
 298                     }
 299                     break;
 300                 }
 301                 if (closed) {
 302                     break;
 303                 }
 304                 /*
 305                   if (timeout == 0 && orb.transportDebugFlag) {
 306                   dprint(".run: selector.select() returned: " + n);
 307                   }
 308                   if (n == 0) {
 309                   continue;
 310                   }
 311                 */
 312                 Iterator iterator = selector.selectedKeys().iterator();
 313                 if (orb.transportDebugFlag) {
 314                     if (iterator.hasNext()) {
 315                         dprint(".run: n = " + n);
 316                     }
 317                 }
 318                 while (iterator.hasNext()) {
 319                     SelectionKey selectionKey = (SelectionKey) iterator.next();
 320                     iterator.remove();
 321                     EventHandler eventHandler = (EventHandler)
 322                         selectionKey.attachment();
 323                     try {
 324                         eventHandler.handleEvent();
 325                     } catch (Throwable t) {
 326                         if (orb.transportDebugFlag) {
 327                             dprint(".run: eventHandler.handleEvent", t);
 328                         }
 329                     }
 330                 }
 331                 if (timeout == 0 && orb.transportDebugFlag) {
 332                     dprint(".run: End of selection cycle");
 333                 }
 334             } catch (Throwable t) {
 335                 // IMPORTANT: ignore all errors so the select thread keeps running.
 336                 // Otherwise a guaranteed hang.
 337                 if (orb.transportDebugFlag) {
 338                     dprint(".run: ignoring", t);
 339                 }
 340             }
 341         }
 342         try {
 343             if (selector != null) {
 344                 if (orb.transportDebugFlag) {
 345                     dprint(".run: selector.close ");
 346                 }
 347                 selector.close();
 348             }
 349         } catch (Throwable t) {
 350             if (orb.transportDebugFlag) {
 351                 dprint(".run: selector.close: ", t);
 352             }
 353         }
 354     }
 355 
 356     /////////////////////////////////////////////////////
 357     //
 358     // Implementation.
 359     //
 360 
 361     private void clearDeferredRegistrations() {
 362         synchronized (deferredRegistrations) {
 363             int deferredListSize = deferredRegistrations.size();
 364             if (orb.transportDebugFlag) {
 365                 dprint(".clearDeferredRegistrations:deferred list size == " + deferredListSize);
 366             }
 367             for (int i = 0; i < deferredListSize; i++) {
 368                 EventHandler eventHandler =
 369                     (EventHandler)deferredRegistrations.get(i);
 370                 if (orb.transportDebugFlag) {
 371                     dprint(".clearDeferredRegistrations: " + eventHandler);
 372                 }
 373                 SelectableChannel channel = eventHandler.getChannel();
 374                 SelectionKey selectionKey = null;
 375 
 376                 try {
 377                     if (orb.transportDebugFlag) {
 378                         dprint(".clearDeferredRegistrations:close channel == "
 379                                 + channel);
 380                         dprint(".clearDeferredRegistrations:close channel class == "
 381                                 + channel.getClass().getName());
 382                     }
 383                     channel.close();
 384                     selectionKey = eventHandler.getSelectionKey();
 385                     if (selectionKey != null) {
 386                         selectionKey.cancel();
 387                         selectionKey.attach(null);
 388                     }
 389                 } catch (IOException ioEx) {
 390                     if (orb.transportDebugFlag) {
 391                         dprint(".clearDeferredRegistrations: ", ioEx);
 392                     }
 393                 }
 394             }
 395             deferredRegistrations.clear();
 396         }
 397     }
 398 
 399     private synchronized boolean isClosed ()
 400     {
 401         return closed;
 402     }
 403 
 404     private synchronized void setClosed(boolean closed)
 405     {
 406         this.closed = closed;
 407     }
 408 
 409     private void startSelector()
 410     {
 411         try {
 412             selector = Selector.open();
 413         } catch (IOException e) {
 414             if (orb.transportDebugFlag) {
 415                 dprint(".startSelector: Selector.open: IOException: ", e);
 416             }
 417             // REVISIT - better handling/reporting
 418             RuntimeException rte =
 419                 new RuntimeException(".startSelector: Selector.open exception");
 420             rte.initCause(e);
 421             throw rte;
 422         }
 423         setDaemon(true);
 424         start();
 425         selectorStarted = true;
 426         if (orb.transportDebugFlag) {
 427             dprint(".startSelector: selector.start completed.");
 428         }
 429     }
 430 
 431     private void handleDeferredRegistrations()
 432     {
 433         synchronized (deferredRegistrations) {
 434             int deferredListSize = deferredRegistrations.size();
 435             for (int i = 0; i < deferredListSize; i++) {
 436                 EventHandler eventHandler =
 437                     (EventHandler)deferredRegistrations.get(i);
 438                 if (orb.transportDebugFlag) {
 439                     dprint(".handleDeferredRegistrations: " + eventHandler);
 440                 }
 441                 SelectableChannel channel = eventHandler.getChannel();
 442                 SelectionKey selectionKey = null;
 443                 try {
 444                     selectionKey =
 445                         channel.register(selector,
 446                                          eventHandler.getInterestOps(),
 447                                          (Object)eventHandler);
 448                 } catch (ClosedChannelException e) {
 449                     if (orb.transportDebugFlag) {
 450                         dprint(".handleDeferredRegistrations: ", e);
 451                     }
 452                 }
 453                 eventHandler.setSelectionKey(selectionKey);
 454             }
 455             deferredRegistrations.clear();
 456         }
 457     }
 458 
 459     private void enableInterestOps()
 460     {
 461         synchronized (interestOpsList) {
 462             int listSize = interestOpsList.size();
 463             if (listSize > 0) {
 464                 if (orb.transportDebugFlag) {
 465                     dprint(".enableInterestOps:->");
 466                 }
 467                 SelectionKey selectionKey = null;
 468                 SelectionKeyAndOp keyAndOp = null;
 469                 int keyOp, selectionKeyOps = 0;
 470                 for (int i = 0; i < listSize; i++) {
 471                     keyAndOp = (SelectionKeyAndOp)interestOpsList.get(i);
 472                     selectionKey = keyAndOp.selectionKey;
 473 
 474                     // Need to check if the SelectionKey is valid because a
 475                     // connection's SelectionKey could be put on the list to
 476                     // have its OP enabled and before it's enabled be reclaimed.
 477                     // Otherwise, the enabling of the OP will throw an exception
 478                     // here and exit this method an potentially not enable all
 479                     // registered ops.
 480                     //
 481                     // So, we ignore SelectionKeys that are invalid. They will get
 482                     // cleaned up on the next Selector.select() call.
 483 
 484                     if (selectionKey.isValid()) {
 485                         if (orb.transportDebugFlag) {
 486                             dprint(".enableInterestOps: " + keyAndOp);
 487                         }
 488                         keyOp = keyAndOp.keyOp;
 489                         selectionKeyOps = selectionKey.interestOps();
 490                         selectionKey.interestOps(selectionKeyOps | keyOp);
 491                     }
 492                 }
 493                 interestOpsList.clear();
 494                 if (orb.transportDebugFlag) {
 495                     dprint(".enableInterestOps:<-");
 496                 }
 497             }
 498         }
 499     }
 500 
 501     private void createListenerThread(EventHandler eventHandler)
 502     {
 503         if (orb.transportDebugFlag) {
 504             dprint(".createListenerThread: " + eventHandler);
 505         }
 506         Acceptor acceptor = eventHandler.getAcceptor();
 507         ListenerThread listenerThread =
 508             new ListenerThreadImpl(orb, acceptor, this);
 509         listenerThreads.put(eventHandler, listenerThread);
 510         Throwable throwable = null;
 511         try {
 512             orb.getThreadPoolManager().getThreadPool(0)
 513                 .getWorkQueue(0).addWork((Work)listenerThread);
 514         } catch (NoSuchThreadPoolException e) {
 515             throwable = e;
 516         } catch (NoSuchWorkQueueException e) {
 517             throwable = e;
 518         }
 519         if (throwable != null) {
 520             RuntimeException rte = new RuntimeException(throwable.toString());
 521             rte.initCause(throwable);
 522             throw rte;
 523         }
 524     }
 525 
 526     private void destroyListenerThread(EventHandler eventHandler)
 527     {
 528         if (orb.transportDebugFlag) {
 529             dprint(".destroyListenerThread: " + eventHandler);
 530         }
 531         ListenerThread listenerThread = (ListenerThread)
 532             listenerThreads.get(eventHandler);
 533         if (listenerThread == null) {
 534             if (orb.transportDebugFlag) {
 535                 dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
 536             }
 537             return;
 538         }
 539         listenerThreads.remove(eventHandler);
 540         listenerThread.close();
 541     }
 542 
 543     private void createReaderThread(EventHandler eventHandler)
 544     {
 545         if (orb.transportDebugFlag) {
 546             dprint(".createReaderThread: " + eventHandler);
 547         }
 548         Connection connection = eventHandler.getConnection();
 549         ReaderThread readerThread =
 550             new ReaderThreadImpl(orb, connection, this);
 551         readerThreads.put(eventHandler, readerThread);
 552         Throwable throwable = null;
 553         try {
 554             orb.getThreadPoolManager().getThreadPool(0)
 555                 .getWorkQueue(0).addWork((Work)readerThread);
 556         } catch (NoSuchThreadPoolException e) {
 557             throwable = e;
 558         } catch (NoSuchWorkQueueException e) {
 559             throwable = e;
 560         }
 561         if (throwable != null) {
 562             RuntimeException rte = new RuntimeException(throwable.toString());
 563             rte.initCause(throwable);
 564             throw rte;
 565         }
 566     }
 567 
 568     private void destroyReaderThread(EventHandler eventHandler)
 569     {
 570         if (orb.transportDebugFlag) {
 571             dprint(".destroyReaderThread: " + eventHandler);
 572         }
 573         ReaderThread readerThread = (ReaderThread)
 574             readerThreads.get(eventHandler);
 575         if (readerThread == null) {
 576             if (orb.transportDebugFlag) {
 577                 dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
 578             }
 579             return;
 580         }
 581         readerThreads.remove(eventHandler);
 582         readerThread.close();
 583     }
 584 
 585     private void dprint(String msg)
 586     {
 587         ORBUtility.dprint("SelectorImpl", msg);
 588     }
 589 
 590     protected void dprint(String msg, Throwable t)
 591     {
 592         dprint(msg);
 593         t.printStackTrace(System.out);
 594     }
 595 
 596     // Private class to contain a SelectionKey and a SelectionKey op.
 597     // Used only by SelectorImpl to register and enable SelectionKey
 598     // Op.
 599     // REVISIT - Could do away with this class and use the EventHanlder
 600     //           directly.
 601     private class SelectionKeyAndOp
 602     {
 603         // A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT]
 604         public int keyOp;
 605         public SelectionKey selectionKey;
 606 
 607         // constructor
 608         public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) {
 609             this.selectionKey = selectionKey;
 610             this.keyOp = keyOp;
 611         }
 612     }
 613 
 614 // End of file.
 615 }