1 /*
   2  * Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.corba.se.impl.transport;
  27 
  28 import java.io.IOException;
  29 import java.net.ServerSocket;
  30 import java.nio.channels.ClosedChannelException;
  31 import java.nio.channels.SelectableChannel;
  32 import java.nio.channels.ServerSocketChannel;
  33 import java.nio.channels.SelectionKey;
  34 import java.nio.channels.Selector;
  35 import java.nio.channels.ClosedSelectorException;
  36 import java.util.ArrayList;
  37 import java.util.HashMap;
  38 import java.util.Map;
  39 import java.util.Iterator;
  40 import java.util.List;
  41 
  42 
  43 import com.sun.corba.se.pept.broker.Broker;
  44 import com.sun.corba.se.pept.transport.Acceptor;
  45 import com.sun.corba.se.pept.transport.Connection;
  46 import com.sun.corba.se.pept.transport.EventHandler;
  47 import com.sun.corba.se.pept.transport.ListenerThread;
  48 import com.sun.corba.se.pept.transport.ReaderThread;
  49 
  50 import com.sun.corba.se.spi.logging.CORBALogDomains;
  51 import com.sun.corba.se.spi.orb.ORB;
  52 import com.sun.corba.se.spi.orbutil.threadpool.Work;
  53 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
  54 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
  55 
  56 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  57 import com.sun.corba.se.impl.orbutil.ORBUtility;
  58 
  59 /**
  60  * @author Harold Carr
  61  */
  62 class SelectorImpl
  63     extends
  64         sun.misc.ManagedLocalsThread
  65     implements
  66         com.sun.corba.se.pept.transport.Selector
  67 {
  68     private ORB orb;
  69     private Selector selector;
  70     private long timeout;
  71     private List deferredRegistrations;
  72     private List interestOpsList;
  73     private HashMap listenerThreads;
  74     private Map readerThreads;
  75     private boolean selectorStarted;
  76     private volatile boolean closed;
  77     private ORBUtilSystemException wrapper;
  78 
  79 
  80     public SelectorImpl(ORB orb)
  81     {
  82         this.orb = orb;
  83         selector = null;
  84         selectorStarted = false;
  85         timeout = 60000;
  86         deferredRegistrations = new ArrayList();
  87         interestOpsList = new ArrayList();
  88         listenerThreads = new HashMap();
  89         readerThreads = java.util.Collections.synchronizedMap(new HashMap());
  90         closed = false;
  91         wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT);
  92     }
  93 
  94     public void setTimeout(long timeout)
  95     {
  96         this.timeout = timeout;
  97     }
  98 
  99     public long getTimeout()
 100     {
 101         return timeout;
 102     }
 103 
 104     public void registerInterestOps(EventHandler eventHandler)
 105     {
 106         if (orb.transportDebugFlag) {
 107             dprint(".registerInterestOps:-> " + eventHandler);
 108         }
 109 
 110         SelectionKey selectionKey = eventHandler.getSelectionKey();
 111         if (selectionKey.isValid()) {
 112             int ehOps = eventHandler.getInterestOps();
 113             SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
 114             synchronized(interestOpsList) {
 115                 interestOpsList.add(keyAndOp);
 116             }
 117             // tell Selector Thread there's an update to a SelectorKey's Ops
 118             try {
 119                 if (selector != null) {
 120                     // wakeup Selector thread to process close request
 121                     selector.wakeup();
 122                 }
 123             } catch (Throwable t) {
 124                 if (orb.transportDebugFlag) {
 125                     dprint(".registerInterestOps: selector.wakeup: ", t);
 126                 }
 127             }
 128         }
 129         else {
 130             wrapper.selectionKeyInvalid(eventHandler.toString());
 131             if (orb.transportDebugFlag) {
 132                 dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
 133             }
 134         }
 135 
 136         if (orb.transportDebugFlag) {
 137             dprint(".registerInterestOps:<- ");
 138         }
 139     }
 140 
 141     public void registerForEvent(EventHandler eventHandler)
 142     {
 143         if (orb.transportDebugFlag) {
 144             dprint(".registerForEvent: " + eventHandler);
 145         }
 146 
 147         if (isClosed()) {
 148             if (orb.transportDebugFlag) {
 149                 dprint(".registerForEvent: closed: " + eventHandler);
 150             }
 151             return;
 152         }
 153 
 154         if (eventHandler.shouldUseSelectThreadToWait()) {
 155             synchronized (deferredRegistrations) {
 156                 deferredRegistrations.add(eventHandler);
 157             }
 158             if (! selectorStarted) {
 159                 startSelector();
 160             }
 161             selector.wakeup();
 162             return;
 163         }
 164 
 165         switch (eventHandler.getInterestOps()) {
 166         case SelectionKey.OP_ACCEPT :
 167             createListenerThread(eventHandler);
 168             break;
 169         case SelectionKey.OP_READ :
 170             createReaderThread(eventHandler);
 171             break;
 172         default:
 173             if (orb.transportDebugFlag) {
 174                 dprint(".registerForEvent: default: " + eventHandler);
 175             }
 176             throw new RuntimeException(
 177                 "SelectorImpl.registerForEvent: unknown interest ops");
 178         }
 179     }
 180 
 181     public void unregisterForEvent(EventHandler eventHandler)
 182     {
 183         if (orb.transportDebugFlag) {
 184             dprint(".unregisterForEvent: " + eventHandler);
 185         }
 186 
 187         if (isClosed()) {
 188             if (orb.transportDebugFlag) {
 189                 dprint(".unregisterForEvent: closed: " + eventHandler);
 190             }
 191             return;
 192         }
 193 
 194         if (eventHandler.shouldUseSelectThreadToWait()) {
 195             SelectionKey selectionKey ;
 196             synchronized(deferredRegistrations) {
 197                 selectionKey = eventHandler.getSelectionKey();
 198             }
 199             if (selectionKey != null) {
 200                 selectionKey.cancel();
 201             }
 202             if (selector != null) {
 203                 selector.wakeup();
 204             }
 205             return;
 206         }
 207 
 208         switch (eventHandler.getInterestOps()) {
 209         case SelectionKey.OP_ACCEPT :
 210             destroyListenerThread(eventHandler);
 211             break;
 212         case SelectionKey.OP_READ :
 213             destroyReaderThread(eventHandler);
 214             break;
 215         default:
 216             if (orb.transportDebugFlag) {
 217                 dprint(".unregisterForEvent: default: " + eventHandler);
 218             }
 219             throw new RuntimeException(
 220                 "SelectorImpl.uregisterForEvent: unknown interest ops");
 221         }
 222     }
 223 
 224     public void close()
 225     {
 226         if (orb.transportDebugFlag) {
 227             dprint(".close");
 228         }
 229 
 230         if (isClosed()) {
 231             if (orb.transportDebugFlag) {
 232                 dprint(".close: already closed");
 233             }
 234             return;
 235         }
 236 
 237         setClosed(true);
 238 
 239         Iterator i;
 240 
 241         // Kill listeners.
 242 
 243         i = listenerThreads.values().iterator();
 244         while (i.hasNext()) {
 245             ListenerThread listenerThread = (ListenerThread) i.next();
 246             listenerThread.close();
 247         }
 248 
 249         // Kill readers.
 250 
 251         i = readerThreads.values().iterator();
 252         while (i.hasNext()) {
 253             ReaderThread readerThread = (ReaderThread) i.next();
 254             readerThread.close();
 255         }
 256 
 257        clearDeferredRegistrations();
 258 
 259         // Selector
 260 
 261         try {
 262             if (selector != null) {
 263                 // wakeup Selector thread to process close request
 264                 selector.wakeup();
 265             }
 266         } catch (Throwable t) {
 267             if (orb.transportDebugFlag) {
 268                 dprint(".close: selector.wakeup: ", t);
 269             }
 270         }
 271     }
 272 
 273     ///////////////////////////////////////////////////
 274     //
 275     // Thread methods.
 276     //
 277 
 278     public void run()
 279     {
 280         setName("SelectorThread");
 281         while (!closed) {
 282             try {
 283                 int n = 0;
 284                 if (timeout == 0 && orb.transportDebugFlag) {
 285                     dprint(".run: Beginning of selection cycle");
 286                 }
 287                 handleDeferredRegistrations();
 288                 enableInterestOps();
 289                 try {
 290                     n = selector.select(timeout);
 291                 } catch (IOException  e) {
 292                     if (orb.transportDebugFlag) {
 293                         dprint(".run: selector.select: ", e);
 294                     }
 295                 } catch (ClosedSelectorException csEx) {
 296                     if (orb.transportDebugFlag) {
 297                         dprint(".run: selector.select: ", csEx);
 298                     }
 299                     break;
 300                 }
 301                 if (closed) {
 302                     break;
 303                 }
 304                 /*
 305                   if (timeout == 0 && orb.transportDebugFlag) {
 306                   dprint(".run: selector.select() returned: " + n);
 307                   }
 308                   if (n == 0) {
 309                   continue;
 310                   }
 311                 */
 312                 Iterator iterator = selector.selectedKeys().iterator();
 313                 if (orb.transportDebugFlag) {
 314                     if (iterator.hasNext()) {
 315                         dprint(".run: n = " + n);
 316                     }
 317                 }
 318                 while (iterator.hasNext()) {
 319                     SelectionKey selectionKey = (SelectionKey) iterator.next();
 320                     iterator.remove();
 321                     EventHandler eventHandler = (EventHandler)
 322                         selectionKey.attachment();
 323                     try {
 324                         eventHandler.handleEvent();
 325                     } catch (Throwable t) {
 326                         if (orb.transportDebugFlag) {
 327                             dprint(".run: eventHandler.handleEvent", t);
 328                         }
 329                     }
 330                 }
 331                 if (timeout == 0 && orb.transportDebugFlag) {
 332                     dprint(".run: End of selection cycle");
 333                 }
 334             } catch (Throwable t) {
 335                 // IMPORTANT: ignore all errors so the select thread keeps running.
 336                 // Otherwise a guaranteed hang.
 337                 if (orb.transportDebugFlag) {
 338                     dprint(".run: ignoring", t);
 339                 }
 340             }
 341         }
 342         try {
 343             if (selector != null) {
 344                 if (orb.transportDebugFlag) {
 345                     dprint(".run: selector.close ");
 346                 }
 347                 selector.close();
 348             }
 349         } catch (Throwable t) {
 350             if (orb.transportDebugFlag) {
 351                 dprint(".run: selector.close: ", t);
 352             }
 353         }
 354     }
 355 
 356     /////////////////////////////////////////////////////
 357     //
 358     // Implementation.
 359     //
 360 
 361     private void clearDeferredRegistrations() {
 362         synchronized (deferredRegistrations) {
 363             int deferredListSize = deferredRegistrations.size();
 364             if (orb.transportDebugFlag) {
 365                 dprint(".clearDeferredRegistrations:deferred list size == " + deferredListSize);
 366             }
 367             for (int i = 0; i < deferredListSize; i++) {
 368                 EventHandler eventHandler =
 369                     (EventHandler)deferredRegistrations.get(i);
 370                 if (orb.transportDebugFlag) {
 371                     dprint(".clearDeferredRegistrations: " + eventHandler);
 372                 }
 373                 SelectableChannel channel = eventHandler.getChannel();
 374                 SelectionKey selectionKey = null;
 375 
 376                 try {
 377                     if (orb.transportDebugFlag) {
 378                         dprint(".clearDeferredRegistrations:close channel == "
 379                                 + channel);
 380                         dprint(".clearDeferredRegistrations:close channel class == "
 381                                 + channel.getClass().getName());
 382                     }
 383                     channel.close();
 384                     selectionKey = eventHandler.getSelectionKey();
 385                     if (selectionKey != null) {
 386                         selectionKey.cancel();
 387                         selectionKey.attach(null);
 388                     }
 389                 } catch (IOException ioEx) {
 390                     if (orb.transportDebugFlag) {
 391                         dprint(".clearDeferredRegistrations: ", ioEx);
 392                     }
 393                 }
 394             }
 395             deferredRegistrations.clear();
 396         }
 397     }
 398 
 399     private synchronized boolean isClosed ()
 400     {
 401         return closed;
 402     }
 403 
 404     private synchronized void setClosed(boolean closed)
 405     {
 406         this.closed = closed;
 407     }
 408 
 409     private void startSelector()
 410     {
 411         try {
 412             selector = Selector.open();
 413         } catch (IOException e) {
 414             if (orb.transportDebugFlag) {
 415                 dprint(".startSelector: Selector.open: IOException: ", e);
 416             }
 417             // REVISIT - better handling/reporting
 418             RuntimeException rte =
 419                 new RuntimeException(".startSelector: Selector.open exception");
 420             rte.initCause(e);
 421             throw rte;
 422         }
 423         setDaemon(true);
 424         start();
 425         selectorStarted = true;
 426         if (orb.transportDebugFlag) {
 427             dprint(".startSelector: selector.start completed.");
 428         }
 429     }
 430 
 431     private void handleDeferredRegistrations()
 432     {
 433         synchronized (deferredRegistrations) {
 434             int deferredListSize = deferredRegistrations.size();
 435             for (int i = 0; i < deferredListSize; i++) {
 436                 EventHandler eventHandler =
 437                     (EventHandler)deferredRegistrations.get(i);
 438                 if (orb.transportDebugFlag) {
 439                     dprint(".handleDeferredRegistrations: " + eventHandler);
 440                 }
 441                 SelectableChannel channel = eventHandler.getChannel();
 442                 SelectionKey selectionKey = null;
 443                 try {
 444                     selectionKey =
 445                         channel.register(selector,
 446                                          eventHandler.getInterestOps(),
 447                                          (Object)eventHandler);
 448                 } catch (ClosedChannelException e) {
 449                     if (orb.transportDebugFlag) {
 450                         dprint(".handleDeferredRegistrations: ", e);
 451                     }
 452                 }
 453                 eventHandler.setSelectionKey(selectionKey);
 454             }
 455             deferredRegistrations.clear();
 456         }
 457     }
 458 
 459     private void enableInterestOps()
 460     {
 461         synchronized (interestOpsList) {
 462             int listSize = interestOpsList.size();
 463             if (listSize > 0) {
 464                 if (orb.transportDebugFlag) {
 465                     dprint(".enableInterestOps:->");
 466                 }
 467                 SelectionKey selectionKey = null;
 468                 SelectionKeyAndOp keyAndOp = null;
 469                 int keyOp, selectionKeyOps = 0;
 470                 for (int i = 0; i < listSize; i++) {
 471                     keyAndOp = (SelectionKeyAndOp)interestOpsList.get(i);
 472                     selectionKey = keyAndOp.selectionKey;
 473 
 474                     // Need to check if the SelectionKey is valid because a
 475                     // connection's SelectionKey could be put on the list to
 476                     // have its OP enabled and before it's enabled be reclaimed.
 477                     // Otherwise, the enabling of the OP will throw an exception
 478                     // here and exit this method an potentially not enable all
 479                     // registered ops.
 480                     //
 481                     // So, we ignore SelectionKeys that are invalid. They will get
 482                     // cleaned up on the next Selector.select() call.
 483 
 484                     if (selectionKey.isValid()) {
 485                         if (orb.transportDebugFlag) {
 486                             dprint(".enableInterestOps: " + keyAndOp);
 487                         }
 488                         keyOp = keyAndOp.keyOp;
 489                         selectionKeyOps = selectionKey.interestOps();
 490                         selectionKey.interestOps(selectionKeyOps | keyOp);
 491                     }
 492                 }
 493                 interestOpsList.clear();
 494                 if (orb.transportDebugFlag) {
 495                     dprint(".enableInterestOps:<-");
 496                 }
 497             }
 498         }
 499     }
 500 
 501     private void createListenerThread(EventHandler eventHandler)
 502     {
 503         if (orb.transportDebugFlag) {
 504             dprint(".createListenerThread: " + eventHandler);
 505         }
 506         Acceptor acceptor = eventHandler.getAcceptor();
 507         ListenerThread listenerThread =
 508             new ListenerThreadImpl(orb, acceptor, this);
 509         listenerThreads.put(eventHandler, listenerThread);
 510         Throwable throwable = null;
 511         try {
 512             orb.getThreadPoolManager().getThreadPool(0)
 513                 .getWorkQueue(0).addWork((Work)listenerThread);
 514         } catch (NoSuchThreadPoolException e) {
 515             throwable = e;
 516         } catch (NoSuchWorkQueueException e) {
 517             throwable = e;
 518         }
 519         if (throwable != null) {
 520             RuntimeException rte = new RuntimeException(throwable.toString());
 521             rte.initCause(throwable);
 522             throw rte;
 523         }
 524     }
 525 
 526     private void destroyListenerThread(EventHandler eventHandler)
 527     {
 528         if (orb.transportDebugFlag) {
 529             dprint(".destroyListenerThread: " + eventHandler);
 530         }
 531         ListenerThread listenerThread = (ListenerThread)
 532             listenerThreads.get(eventHandler);
 533         if (listenerThread == null) {
 534             if (orb.transportDebugFlag) {
 535                 dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
 536             }
 537             return;
 538         }
 539         listenerThreads.remove(eventHandler);
 540         listenerThread.close();
 541     }
 542 
 543     private void createReaderThread(EventHandler eventHandler)
 544     {
 545         if (orb.transportDebugFlag) {
 546             dprint(".createReaderThread: " + eventHandler);
 547         }
 548         Connection connection = eventHandler.getConnection();
 549         ReaderThread readerThread =
 550             new ReaderThreadImpl(orb, connection, this);
 551         readerThreads.put(eventHandler, readerThread);
 552         Throwable throwable = null;
 553         try {
 554             orb.getThreadPoolManager().getThreadPool(0)
 555                 .getWorkQueue(0).addWork((Work)readerThread);
 556         } catch (NoSuchThreadPoolException e) {
 557             throwable = e;
 558         } catch (NoSuchWorkQueueException e) {
 559             throwable = e;
 560         }
 561         if (throwable != null) {
 562             RuntimeException rte = new RuntimeException(throwable.toString());
 563             rte.initCause(throwable);
 564             throw rte;
 565         }
 566     }
 567 
 568     private void destroyReaderThread(EventHandler eventHandler)
 569     {
 570         if (orb.transportDebugFlag) {
 571             dprint(".destroyReaderThread: " + eventHandler);
 572         }
 573         ReaderThread readerThread = (ReaderThread)
 574             readerThreads.get(eventHandler);
 575         if (readerThread == null) {
 576             if (orb.transportDebugFlag) {
 577                 dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
 578             }
 579             return;
 580         }
 581         readerThreads.remove(eventHandler);
 582         readerThread.close();
 583     }
 584 
 585     private void dprint(String msg)
 586     {
 587         ORBUtility.dprint("SelectorImpl", msg);
 588     }
 589 
 590     protected void dprint(String msg, Throwable t)
 591     {
 592         dprint(msg);
 593         t.printStackTrace(System.out);
 594     }
 595 
 596     // Private class to contain a SelectionKey and a SelectionKey op.
 597     // Used only by SelectorImpl to register and enable SelectionKey
 598     // Op.
 599     // REVISIT - Could do away with this class and use the EventHanlder
 600     //           directly.
 601     private class SelectionKeyAndOp
 602     {
 603         // A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT]
 604         public int keyOp;
 605         public SelectionKey selectionKey;
 606 
 607         // constructor
 608         public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) {
 609             this.selectionKey = selectionKey;
 610             this.keyOp = keyOp;
 611         }
 612     }
 613 
 614 // End of file.
 615 }