1 /*
   2  * Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.corba.se.impl.orbutil.threadpool;
  27 
  28 import java.io.IOException;
  29 import java.io.Closeable;
  30 
  31 import java.security.AccessController;
  32 import java.security.PrivilegedAction;
  33 
  34 import java.util.List;
  35 import java.util.ArrayList;
  36 
  37 import java.util.concurrent.atomic.AtomicInteger;
  38 import java.util.concurrent.atomic.AtomicLong;
  39 
  40 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
  41 import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
  42 import com.sun.corba.se.spi.orbutil.threadpool.Work;
  43 import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;
  44 
  45 import com.sun.corba.se.impl.orbutil.ORBConstants;
  46 import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl;
  47 
  48 import com.sun.corba.se.spi.monitoring.MonitoringConstants;
  49 import com.sun.corba.se.spi.monitoring.MonitoredObject;
  50 import com.sun.corba.se.spi.monitoring.MonitoringFactories;
  51 import com.sun.corba.se.spi.orb.ORB;
  52 import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
  53 
  54 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  55 import com.sun.corba.se.impl.orbutil.ORBConstants;
  56 import com.sun.corba.se.spi.logging.CORBALogDomains;
  57 import com.sun.corba.se.impl.transport.ManagedLocalsThread;
  58 
  59 public class ThreadPoolImpl implements ThreadPool
  60 {
  61     // serial counter useful for debugging
  62     private static AtomicInteger threadCounter = new AtomicInteger(0);
  63     private static final ORBUtilSystemException wrapper =
  64         ORBUtilSystemException.get(CORBALogDomains.RPC_TRANSPORT);
  65 
  66 
  67     // Any time currentThreadCount and/or availableWorkerThreads is updated
  68     // or accessed this ThreadPool's WorkQueue must be locked. And, it is
  69     // expected that this ThreadPool's WorkQueue is the only object that
  70     // updates and accesses these values directly and indirectly though a
  71     // call to a method in this ThreadPool. If any call to update or access
  72     // those values must synchronized on this ThreadPool's WorkQueue.
  73     private WorkQueue workQueue;
  74 
  75     // Stores the number of available worker threads
  76     private int availableWorkerThreads = 0;
  77 
  78     // Stores the number of threads in the threadpool currently
  79     private int currentThreadCount = 0;
  80 
  81     // Minimum number of worker threads created at instantiation of the threadpool
  82     private int minWorkerThreads = 0;
  83 
  84     // Maximum number of worker threads in the threadpool
  85     private int maxWorkerThreads = 0;
  86 
  87     // Inactivity timeout value for worker threads to exit and stop running
  88     private long inactivityTimeout;
  89 
  90     // Indicates if the threadpool is bounded or unbounded
  91     private boolean boundedThreadPool = false;
  92 
  93     // Running count of the work items processed
  94     // Set the value to 1 so that divide by zero is avoided in
  95     // averageWorkCompletionTime()
  96     private AtomicLong processedCount = new AtomicLong(1);
  97 
  98     // Running aggregate of the time taken in millis to execute work items
  99     // processed by the threads in the threadpool
 100     private AtomicLong totalTimeTaken = new AtomicLong(0);
 101 
 102     // Name of the ThreadPool
 103     private String name;
 104 
 105     // MonitoredObject for ThreadPool
 106     private MonitoredObject threadpoolMonitoredObject;
 107 
 108     // ThreadGroup in which threads should be created
 109     private ThreadGroup threadGroup;
 110 
 111     Object workersLock = new Object();
 112     List<WorkerThread> workers = new ArrayList<>();
 113 
 114     /**
 115      * This constructor is used to create an unbounded threadpool
 116      */
 117     public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) {
 118         inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT;
 119         maxWorkerThreads = Integer.MAX_VALUE;
 120         workQueue = new WorkQueueImpl(this);
 121         threadGroup = tg;
 122         name = threadpoolName;
 123         initializeMonitoring();
 124     }
 125 
 126     /**
 127      * This constructor is used to create an unbounded threadpool
 128      * in the ThreadGroup of the current thread
 129      */
 130     public ThreadPoolImpl(String threadpoolName) {
 131         this( Thread.currentThread().getThreadGroup(), threadpoolName ) ;
 132     }
 133 
 134     /**
 135      * This constructor is used to create bounded threadpool
 136      */
 137     public ThreadPoolImpl(int minSize, int maxSize, long timeout,
 138                                             String threadpoolName)
 139     {
 140         minWorkerThreads = minSize;
 141         maxWorkerThreads = maxSize;
 142         inactivityTimeout = timeout;
 143         boundedThreadPool = true;
 144         workQueue = new WorkQueueImpl(this);
 145         name = threadpoolName;
 146         for (int i = 0; i < minWorkerThreads; i++) {
 147             createWorkerThread();
 148         }
 149         initializeMonitoring();
 150     }
 151 
 152     // Note that this method should not return until AFTER all threads have died.
 153     public void close() throws IOException {
 154 
 155         // Copy to avoid concurrent modification problems.
 156         List<WorkerThread> copy = null;
 157         synchronized (workersLock) {
 158             copy = new ArrayList<>(workers);
 159         }
 160 
 161         for (WorkerThread wt : copy) {
 162             wt.close();
 163             while (wt.getState() != Thread.State.TERMINATED) {
 164                 try {
 165                     wt.join();
 166                 } catch (InterruptedException exc) {
 167                     wrapper.interruptedJoinCallWhileClosingThreadPool(exc, wt, this);
 168                 }
 169             }
 170         }
 171 
 172         threadGroup = null;
 173     }
 174 
 175 
 176     // Setup monitoring for this threadpool
 177     private void initializeMonitoring() {
 178         // Get root monitored object
 179         MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory().
 180                 createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null).
 181                 getRootMonitoredObject();
 182 
 183         // Create the threadpool monitoring root
 184         MonitoredObject threadPoolMonitoringObjectRoot = root.getChild(
 185                     MonitoringConstants.THREADPOOL_MONITORING_ROOT);
 186         if (threadPoolMonitoringObjectRoot == null) {
 187             threadPoolMonitoringObjectRoot =  MonitoringFactories.
 188                     getMonitoredObjectFactory().createMonitoredObject(
 189                     MonitoringConstants.THREADPOOL_MONITORING_ROOT,
 190                     MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION);
 191             root.addChild(threadPoolMonitoringObjectRoot);
 192         }
 193         threadpoolMonitoredObject = MonitoringFactories.
 194                     getMonitoredObjectFactory().
 195                     createMonitoredObject(name,
 196                     MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION);
 197 
 198         threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject);
 199 
 200         LongMonitoredAttributeBase b1 = new
 201             LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS,
 202                     MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
 203                 public Object getValue() {
 204                     return new Long(ThreadPoolImpl.this.currentNumberOfThreads());
 205                 }
 206             };
 207         threadpoolMonitoredObject.addAttribute(b1);
 208         LongMonitoredAttributeBase b2 = new
 209             LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS,
 210                     MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
 211                 public Object getValue() {
 212                     return new Long(ThreadPoolImpl.this.numberOfAvailableThreads());
 213                 }
 214             };
 215         threadpoolMonitoredObject.addAttribute(b2);
 216         LongMonitoredAttributeBase b3 = new
 217             LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS,
 218                     MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) {
 219                 public Object getValue() {
 220                     return new Long(ThreadPoolImpl.this.numberOfBusyThreads());
 221                 }
 222             };
 223         threadpoolMonitoredObject.addAttribute(b3);
 224         LongMonitoredAttributeBase b4 = new
 225             LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME,
 226                     MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) {
 227                 public Object getValue() {
 228                     return new Long(ThreadPoolImpl.this.averageWorkCompletionTime());
 229                 }
 230             };
 231         threadpoolMonitoredObject.addAttribute(b4);
 232         LongMonitoredAttributeBase b5 = new
 233             LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT,
 234                     MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) {
 235                 public Object getValue() {
 236                     return new Long(ThreadPoolImpl.this.currentProcessedCount());
 237                 }
 238             };
 239         threadpoolMonitoredObject.addAttribute(b5);
 240 
 241         // Add the monitored object for the WorkQueue
 242 
 243         threadpoolMonitoredObject.addChild(
 244                 ((WorkQueueImpl)workQueue).getMonitoredObject());
 245     }
 246 
 247     // Package private method to get the monitored object for this
 248     // class
 249     MonitoredObject getMonitoredObject() {
 250         return threadpoolMonitoredObject;
 251     }
 252 
 253     public WorkQueue getAnyWorkQueue()
 254     {
 255         return workQueue;
 256     }
 257 
 258     public WorkQueue getWorkQueue(int queueId)
 259         throws NoSuchWorkQueueException
 260     {
 261         if (queueId != 0)
 262             throw new NoSuchWorkQueueException();
 263         return workQueue;
 264     }
 265 
 266     /**
 267      * To be called from the workqueue when work is added to the
 268      * workQueue. This method would create new threads if required
 269      * or notify waiting threads on the queue for available work
 270      */
 271     void notifyForAvailableWork(WorkQueue aWorkQueue) {
 272         synchronized (aWorkQueue) {
 273             if (availableWorkerThreads < aWorkQueue.workItemsInQueue()) {
 274                 createWorkerThread();
 275             } else {
 276                 aWorkQueue.notify();
 277             }
 278         }
 279     }
 280 
 281 
 282     private Thread createWorkerThreadHelper( String name ) {
 283         // Thread creation needs to be in a doPrivileged block
 284         // if there is a non-null security manager for two reasons:
 285         // 1. The creation of a thread in a specific ThreadGroup
 286         //    is a privileged operation.  Lack of a doPrivileged
 287         //    block here causes an AccessControlException
 288         //    (see bug 6268145).
 289         // 2. We want to make sure that the permissions associated
 290         //    with this thread do NOT include the permissions of
 291         //    the current thread that is calling this method.
 292         //    This leads to problems in the app server where
 293         //    some threads in the ThreadPool randomly get
 294         //    bad permissions, leading to unpredictable
 295         //    permission errors (see bug 6021011).
 296         //
 297         //    A Java thread contains a stack of call frames,
 298         //    one for each method called that has not yet returned.
 299         //    Each method comes from a particular class.  The class
 300         //    was loaded by a ClassLoader which has an associated
 301         //    CodeSource, and this determines the Permissions
 302         //    for all methods in that class.  The current
 303         //    Permissions for the thread are the intersection of
 304         //    all Permissions for the methods on the stack.
 305         //    This is part of the Security Context of the thread.
 306         //
 307         //    When a thread creates a new thread, the new thread
 308         //    inherits the security context of the old thread.
 309         //    This is bad in a ThreadPool, because different
 310         //    creators of threads may have different security contexts.
 311         //    This leads to occasional unpredictable errors when
 312         //    a thread is re-used in a different security context.
 313         //
 314         //    Avoiding this problem is simple: just do the thread
 315         //    creation in a doPrivileged block.  This sets the
 316         //    inherited security context to that of the code source
 317         //    for the ORB code itself, which contains all permissions
 318         //    in either Java SE or Java EE.
 319         WorkerThread thread = new WorkerThread(threadGroup, name);
 320         synchronized (workersLock) {
 321             workers.add(thread);
 322         }
 323 
 324         // The thread must be set to a daemon thread so the
 325         // VM can exit if the only threads left are PooledThreads
 326         // or other daemons.  We don't want to rely on the
 327         // calling thread always being a daemon.
 328         // Note that no exception is possible here since we
 329         // are inside the doPrivileged block.
 330         thread.setDaemon(true);
 331 
 332         wrapper.workerThreadCreated(thread, thread.getContextClassLoader());
 333 
 334         thread.start();
 335         return null;
 336     }
 337 
 338 
 339     /**
 340      * To be called from the workqueue to create worker threads when none
 341      * available.
 342      */
 343     void createWorkerThread() {
 344         final String name = getName();
 345         synchronized (workQueue) {
 346             try {
 347                 if (System.getSecurityManager() == null) {
 348                     createWorkerThreadHelper(name);
 349                 } else {
 350                     // If we get here, we need to create a thread.
 351                     AccessController.doPrivileged(
 352                             new PrivilegedAction() {
 353                         public Object run() {
 354                             return createWorkerThreadHelper(name);
 355                         }
 356                     }
 357                     );
 358                 }
 359             } catch (Throwable t) {
 360                 // Decrementing the count of current worker threads.
 361                 // But, it will be increased in the finally block.
 362                 decrementCurrentNumberOfThreads();
 363                 wrapper.workerThreadCreationFailure(t);
 364             } finally {
 365                 incrementCurrentNumberOfThreads();
 366             }
 367         }
 368     }
 369 
 370     public int minimumNumberOfThreads() {
 371         return minWorkerThreads;
 372     }
 373 
 374     public int maximumNumberOfThreads() {
 375         return maxWorkerThreads;
 376     }
 377 
 378     public long idleTimeoutForThreads() {
 379         return inactivityTimeout;
 380     }
 381 
 382     public int currentNumberOfThreads() {
 383         synchronized (workQueue) {
 384             return currentThreadCount;
 385         }
 386     }
 387 
 388     void decrementCurrentNumberOfThreads() {
 389         synchronized (workQueue) {
 390             currentThreadCount--;
 391         }
 392     }
 393 
 394     void incrementCurrentNumberOfThreads() {
 395         synchronized (workQueue) {
 396             currentThreadCount++;
 397         }
 398     }
 399 
 400     public int numberOfAvailableThreads() {
 401         synchronized (workQueue) {
 402             return availableWorkerThreads;
 403         }
 404     }
 405 
 406     public int numberOfBusyThreads() {
 407         synchronized (workQueue) {
 408             return (currentThreadCount - availableWorkerThreads);
 409         }
 410     }
 411 
 412     public long averageWorkCompletionTime() {
 413         synchronized (workQueue) {
 414             return (totalTimeTaken.get() / processedCount.get());
 415         }
 416     }
 417 
 418     public long currentProcessedCount() {
 419         synchronized (workQueue) {
 420             return processedCount.get();
 421         }
 422     }
 423 
 424     public String getName() {
 425         return name;
 426     }
 427 
 428     /**
 429     * This method will return the number of WorkQueues serviced by the threadpool.
 430     */
 431     public int numberOfWorkQueues() {
 432         return 1;
 433     }
 434 
 435 
 436     private static synchronized int getUniqueThreadId() {
 437         return ThreadPoolImpl.threadCounter.incrementAndGet();
 438     }
 439 
 440     /**
 441      * This method will decrement the number of available threads
 442      * in the threadpool which are waiting for work. Called from
 443      * WorkQueueImpl.requestWork()
 444      */
 445     void decrementNumberOfAvailableThreads() {
 446         synchronized (workQueue) {
 447             availableWorkerThreads--;
 448         }
 449     }
 450 
 451     /**
 452      * This method will increment the number of available threads
 453      * in the threadpool which are waiting for work. Called from
 454      * WorkQueueImpl.requestWork()
 455      */
 456     void incrementNumberOfAvailableThreads() {
 457         synchronized (workQueue) {
 458             availableWorkerThreads++;
 459         }
 460     }
 461 
 462 
 463     private class WorkerThread extends ManagedLocalsThread implements Closeable
 464     {
 465         private Work currentWork;
 466         private int threadId = 0; // unique id for the thread
 467         private volatile boolean closeCalled = false;
 468         private String threadPoolName;
 469         // name seen by Thread.getName()
 470         private StringBuffer workerThreadName = new StringBuffer();
 471 
 472         WorkerThread(ThreadGroup tg, String threadPoolName) {
 473             super(tg, "Idle");
 474             this.threadId = ThreadPoolImpl.getUniqueThreadId();
 475             this.threadPoolName = threadPoolName;
 476             setName(composeWorkerThreadName(threadPoolName, "Idle"));
 477         }
 478 
 479         public synchronized void close() {
 480             closeCalled = true;
 481             interrupt();
 482         }
 483 
 484         private void resetClassLoader() {
 485 
 486         }
 487 
 488         private void performWork() {
 489             long start = System.currentTimeMillis();
 490             try {
 491                 currentWork.doWork();
 492             } catch (Throwable t) {
 493                 wrapper.workerThreadDoWorkThrowable(this, t);
 494             }
 495             long elapsedTime = System.currentTimeMillis() - start;
 496             totalTimeTaken.addAndGet(elapsedTime);
 497             processedCount.incrementAndGet();
 498         }
 499 
 500         public void run() {
 501             try  {
 502                 while (!closeCalled) {
 503                     try {
 504                         currentWork = ((WorkQueueImpl)workQueue).requestWork(
 505                             inactivityTimeout);
 506                         if (currentWork == null)
 507                             continue;
 508                     } catch (InterruptedException exc) {
 509                         wrapper.workQueueThreadInterrupted( exc, getName(),
 510                            Boolean.valueOf(closeCalled));
 511 
 512                         continue ;
 513                     } catch (Throwable t) {
 514                          wrapper.workerThreadThrowableFromRequestWork(this, t,
 515                                 workQueue.getName());
 516 
 517                         continue;
 518                     }
 519 
 520                     performWork();
 521 
 522                     // set currentWork to null so that the work item can be
 523                     // garbage collected without waiting for the next work item.
 524                     currentWork = null;
 525 
 526                     resetClassLoader();
 527                 }
 528             } catch (Throwable e) {
 529                 // This should not be possible
 530                 wrapper.workerThreadCaughtUnexpectedThrowable(this,e);
 531             } finally {
 532                 synchronized (workersLock) {
 533                     workers.remove(this);
 534                 }
 535             }
 536         }
 537 
 538         private String composeWorkerThreadName(String poolName, String workerName) {
 539             workerThreadName.setLength(0);
 540             workerThreadName.append("p: ").append(poolName);
 541             workerThreadName.append("; w: ").append(workerName);
 542             return workerThreadName.toString();
 543         }
 544     } // End of WorkerThread class
 545 
 546 }
 547 
 548 // End of file.