1 /*
   2  * Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package com.sun.corba.se.impl.orbutil.threadpool;
  27 
  28 import java.io.IOException;
  29 import java.io.Closeable;
  30 
  31 import java.security.AccessController;
  32 import java.security.PrivilegedAction;
  33 
  34 import java.util.List;
  35 import java.util.ArrayList;
  36 
  37 import java.util.concurrent.atomic.AtomicInteger;
  38 import java.util.concurrent.atomic.AtomicLong;
  39 
  40 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
  41 import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
  42 import com.sun.corba.se.spi.orbutil.threadpool.Work;
  43 import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;
  44 
  45 import com.sun.corba.se.impl.orbutil.ORBConstants;
  46 import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl;
  47 
  48 import com.sun.corba.se.spi.monitoring.MonitoringConstants;
  49 import com.sun.corba.se.spi.monitoring.MonitoredObject;
  50 import com.sun.corba.se.spi.monitoring.MonitoringFactories;
  51 import com.sun.corba.se.spi.orb.ORB;
  52 import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
  53 
  54 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  55 import com.sun.corba.se.impl.orbutil.ORBConstants;
  56 import com.sun.corba.se.spi.logging.CORBALogDomains;
  57 
  58 public class ThreadPoolImpl implements ThreadPool
  59 {
  60     // serial counter useful for debugging
  61     private static AtomicInteger threadCounter = new AtomicInteger(0);
  62     private static final ORBUtilSystemException wrapper =
  63         ORBUtilSystemException.get(CORBALogDomains.RPC_TRANSPORT);
  64 
  65 
  66     // Any time currentThreadCount and/or availableWorkerThreads is updated
  67     // or accessed this ThreadPool's WorkQueue must be locked. And, it is
  68     // expected that this ThreadPool's WorkQueue is the only object that
  69     // updates and accesses these values directly and indirectly though a
  70     // call to a method in this ThreadPool. If any call to update or access
  71     // those values must synchronized on this ThreadPool's WorkQueue.
  72     private WorkQueue workQueue;
  73 
  74     // Stores the number of available worker threads
  75     private int availableWorkerThreads = 0;
  76 
  77     // Stores the number of threads in the threadpool currently
  78     private int currentThreadCount = 0;
  79 
  80     // Minimum number of worker threads created at instantiation of the threadpool
  81     private int minWorkerThreads = 0;
  82 
  83     // Maximum number of worker threads in the threadpool
  84     private int maxWorkerThreads = 0;
  85 
  86     // Inactivity timeout value for worker threads to exit and stop running
  87     private long inactivityTimeout;
  88 
  89     // Indicates if the threadpool is bounded or unbounded
  90     private boolean boundedThreadPool = false;
  91 
  92     // Running count of the work items processed
  93     // Set the value to 1 so that divide by zero is avoided in
  94     // averageWorkCompletionTime()
  95     private AtomicLong processedCount = new AtomicLong(1);
  96 
  97     // Running aggregate of the time taken in millis to execute work items
  98     // processed by the threads in the threadpool
  99     private AtomicLong totalTimeTaken = new AtomicLong(0);
 100 
 101     // Name of the ThreadPool
 102     private String name;
 103 
 104     // MonitoredObject for ThreadPool
 105     private MonitoredObject threadpoolMonitoredObject;
 106 
 107     // ThreadGroup in which threads should be created
 108     private ThreadGroup threadGroup;
 109 
 110     Object workersLock = new Object();
 111     List<WorkerThread> workers = new ArrayList<>();
 112 
 113     /**
 114      * This constructor is used to create an unbounded threadpool
 115      */
 116     public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) {
 117         inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT;
 118         maxWorkerThreads = Integer.MAX_VALUE;
 119         workQueue = new WorkQueueImpl(this);
 120         threadGroup = tg;
 121         name = threadpoolName;
 122         initializeMonitoring();
 123     }
 124 
 125     /**
 126      * This constructor is used to create an unbounded threadpool
 127      * in the ThreadGroup of the current thread
 128      */
 129     public ThreadPoolImpl(String threadpoolName) {
 130         this( Thread.currentThread().getThreadGroup(), threadpoolName ) ;
 131     }
 132 
 133     /**
 134      * This constructor is used to create bounded threadpool
 135      */
 136     public ThreadPoolImpl(int minSize, int maxSize, long timeout,
 137                                             String threadpoolName)
 138     {
 139         minWorkerThreads = minSize;
 140         maxWorkerThreads = maxSize;
 141         inactivityTimeout = timeout;
 142         boundedThreadPool = true;
 143         workQueue = new WorkQueueImpl(this);
 144         name = threadpoolName;
 145         for (int i = 0; i < minWorkerThreads; i++) {
 146             createWorkerThread();
 147         }
 148         initializeMonitoring();
 149     }
 150 
 151     // Note that this method should not return until AFTER all threads have died.
 152     public void close() throws IOException {
 153 
 154         // Copy to avoid concurrent modification problems.
 155         List<WorkerThread> copy = null;
 156         synchronized (workersLock) {
 157             copy = new ArrayList<>(workers);
 158         }
 159 
 160         for (WorkerThread wt : copy) {
 161             wt.close();
 162             while (wt.getState() != Thread.State.TERMINATED) {
 163                 try {
 164                     wt.join();
 165                 } catch (InterruptedException exc) {
 166                     wrapper.interruptedJoinCallWhileClosingThreadPool(exc, wt, this);
 167                 }
 168             }
 169         }
 170 
 171         threadGroup = null;
 172     }
 173 
 174 
 175     // Setup monitoring for this threadpool
 176     private void initializeMonitoring() {
 177         // Get root monitored object
 178         MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory().
 179                 createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null).
 180                 getRootMonitoredObject();
 181 
 182         // Create the threadpool monitoring root
 183         MonitoredObject threadPoolMonitoringObjectRoot = root.getChild(
 184                     MonitoringConstants.THREADPOOL_MONITORING_ROOT);
 185         if (threadPoolMonitoringObjectRoot == null) {
 186             threadPoolMonitoringObjectRoot =  MonitoringFactories.
 187                     getMonitoredObjectFactory().createMonitoredObject(
 188                     MonitoringConstants.THREADPOOL_MONITORING_ROOT,
 189                     MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION);
 190             root.addChild(threadPoolMonitoringObjectRoot);
 191         }
 192         threadpoolMonitoredObject = MonitoringFactories.
 193                     getMonitoredObjectFactory().
 194                     createMonitoredObject(name,
 195                     MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION);
 196 
 197         threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject);
 198 
 199         LongMonitoredAttributeBase b1 = new
 200             LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS,
 201                     MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
 202                 public Object getValue() {
 203                     return new Long(ThreadPoolImpl.this.currentNumberOfThreads());
 204                 }
 205             };
 206         threadpoolMonitoredObject.addAttribute(b1);
 207         LongMonitoredAttributeBase b2 = new
 208             LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS,
 209                     MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
 210                 public Object getValue() {
 211                     return new Long(ThreadPoolImpl.this.numberOfAvailableThreads());
 212                 }
 213             };
 214         threadpoolMonitoredObject.addAttribute(b2);
 215         LongMonitoredAttributeBase b3 = new
 216             LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS,
 217                     MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) {
 218                 public Object getValue() {
 219                     return new Long(ThreadPoolImpl.this.numberOfBusyThreads());
 220                 }
 221             };
 222         threadpoolMonitoredObject.addAttribute(b3);
 223         LongMonitoredAttributeBase b4 = new
 224             LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME,
 225                     MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) {
 226                 public Object getValue() {
 227                     return new Long(ThreadPoolImpl.this.averageWorkCompletionTime());
 228                 }
 229             };
 230         threadpoolMonitoredObject.addAttribute(b4);
 231         LongMonitoredAttributeBase b5 = new
 232             LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT,
 233                     MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) {
 234                 public Object getValue() {
 235                     return new Long(ThreadPoolImpl.this.currentProcessedCount());
 236                 }
 237             };
 238         threadpoolMonitoredObject.addAttribute(b5);
 239 
 240         // Add the monitored object for the WorkQueue
 241 
 242         threadpoolMonitoredObject.addChild(
 243                 ((WorkQueueImpl)workQueue).getMonitoredObject());
 244     }
 245 
 246     // Package private method to get the monitored object for this
 247     // class
 248     MonitoredObject getMonitoredObject() {
 249         return threadpoolMonitoredObject;
 250     }
 251 
 252     public WorkQueue getAnyWorkQueue()
 253     {
 254         return workQueue;
 255     }
 256 
 257     public WorkQueue getWorkQueue(int queueId)
 258         throws NoSuchWorkQueueException
 259     {
 260         if (queueId != 0)
 261             throw new NoSuchWorkQueueException();
 262         return workQueue;
 263     }
 264 
 265     /**
 266      * To be called from the workqueue when work is added to the
 267      * workQueue. This method would create new threads if required
 268      * or notify waiting threads on the queue for available work
 269      */
 270     void notifyForAvailableWork(WorkQueue aWorkQueue) {
 271         synchronized (aWorkQueue) {
 272             if (availableWorkerThreads < aWorkQueue.workItemsInQueue()) {
 273                 createWorkerThread();
 274             } else {
 275                 aWorkQueue.notify();
 276             }
 277         }
 278     }
 279 
 280 
 281     private Thread createWorkerThreadHelper( String name ) {
 282         // Thread creation needs to be in a doPrivileged block
 283         // if there is a non-null security manager for two reasons:
 284         // 1. The creation of a thread in a specific ThreadGroup
 285         //    is a privileged operation.  Lack of a doPrivileged
 286         //    block here causes an AccessControlException
 287         //    (see bug 6268145).
 288         // 2. We want to make sure that the permissions associated
 289         //    with this thread do NOT include the permissions of
 290         //    the current thread that is calling this method.
 291         //    This leads to problems in the app server where
 292         //    some threads in the ThreadPool randomly get
 293         //    bad permissions, leading to unpredictable
 294         //    permission errors (see bug 6021011).
 295         //
 296         //    A Java thread contains a stack of call frames,
 297         //    one for each method called that has not yet returned.
 298         //    Each method comes from a particular class.  The class
 299         //    was loaded by a ClassLoader which has an associated
 300         //    CodeSource, and this determines the Permissions
 301         //    for all methods in that class.  The current
 302         //    Permissions for the thread are the intersection of
 303         //    all Permissions for the methods on the stack.
 304         //    This is part of the Security Context of the thread.
 305         //
 306         //    When a thread creates a new thread, the new thread
 307         //    inherits the security context of the old thread.
 308         //    This is bad in a ThreadPool, because different
 309         //    creators of threads may have different security contexts.
 310         //    This leads to occasional unpredictable errors when
 311         //    a thread is re-used in a different security context.
 312         //
 313         //    Avoiding this problem is simple: just do the thread
 314         //    creation in a doPrivileged block.  This sets the
 315         //    inherited security context to that of the code source
 316         //    for the ORB code itself, which contains all permissions
 317         //    in either Java SE or Java EE.
 318         WorkerThread thread = new WorkerThread(threadGroup, name);
 319         synchronized (workersLock) {
 320             workers.add(thread);
 321         }
 322 
 323         // The thread must be set to a daemon thread so the
 324         // VM can exit if the only threads left are PooledThreads
 325         // or other daemons.  We don't want to rely on the
 326         // calling thread always being a daemon.
 327         // Note that no exception is possible here since we
 328         // are inside the doPrivileged block.
 329         thread.setDaemon(true);
 330 
 331         wrapper.workerThreadCreated(thread, thread.getContextClassLoader());
 332 
 333         thread.start();
 334         return null;
 335     }
 336 
 337 
 338     /**
 339      * To be called from the workqueue to create worker threads when none
 340      * available.
 341      */
 342     void createWorkerThread() {
 343         final String name = getName();
 344         synchronized (workQueue) {
 345             try {
 346                 if (System.getSecurityManager() == null) {
 347                     createWorkerThreadHelper(name);
 348                 } else {
 349                     // If we get here, we need to create a thread.
 350                     AccessController.doPrivileged(
 351                             new PrivilegedAction() {
 352                         public Object run() {
 353                             return createWorkerThreadHelper(name);
 354                         }
 355                     }
 356                     );
 357                 }
 358             } catch (Throwable t) {
 359                 // Decrementing the count of current worker threads.
 360                 // But, it will be increased in the finally block.
 361                 decrementCurrentNumberOfThreads();
 362                 wrapper.workerThreadCreationFailure(t);
 363             } finally {
 364                 incrementCurrentNumberOfThreads();
 365             }
 366         }
 367     }
 368 
 369     public int minimumNumberOfThreads() {
 370         return minWorkerThreads;
 371     }
 372 
 373     public int maximumNumberOfThreads() {
 374         return maxWorkerThreads;
 375     }
 376 
 377     public long idleTimeoutForThreads() {
 378         return inactivityTimeout;
 379     }
 380 
 381     public int currentNumberOfThreads() {
 382         synchronized (workQueue) {
 383             return currentThreadCount;
 384         }
 385     }
 386 
 387     void decrementCurrentNumberOfThreads() {
 388         synchronized (workQueue) {
 389             currentThreadCount--;
 390         }
 391     }
 392 
 393     void incrementCurrentNumberOfThreads() {
 394         synchronized (workQueue) {
 395             currentThreadCount++;
 396         }
 397     }
 398 
 399     public int numberOfAvailableThreads() {
 400         synchronized (workQueue) {
 401             return availableWorkerThreads;
 402         }
 403     }
 404 
 405     public int numberOfBusyThreads() {
 406         synchronized (workQueue) {
 407             return (currentThreadCount - availableWorkerThreads);
 408         }
 409     }
 410 
 411     public long averageWorkCompletionTime() {
 412         synchronized (workQueue) {
 413             return (totalTimeTaken.get() / processedCount.get());
 414         }
 415     }
 416 
 417     public long currentProcessedCount() {
 418         synchronized (workQueue) {
 419             return processedCount.get();
 420         }
 421     }
 422 
 423     public String getName() {
 424         return name;
 425     }
 426 
 427     /**
 428     * This method will return the number of WorkQueues serviced by the threadpool.
 429     */
 430     public int numberOfWorkQueues() {
 431         return 1;
 432     }
 433 
 434 
 435     private static synchronized int getUniqueThreadId() {
 436         return ThreadPoolImpl.threadCounter.incrementAndGet();
 437     }
 438 
 439     /**
 440      * This method will decrement the number of available threads
 441      * in the threadpool which are waiting for work. Called from
 442      * WorkQueueImpl.requestWork()
 443      */
 444     void decrementNumberOfAvailableThreads() {
 445         synchronized (workQueue) {
 446             availableWorkerThreads--;
 447         }
 448     }
 449 
 450     /**
 451      * This method will increment the number of available threads
 452      * in the threadpool which are waiting for work. Called from
 453      * WorkQueueImpl.requestWork()
 454      */
 455     void incrementNumberOfAvailableThreads() {
 456         synchronized (workQueue) {
 457             availableWorkerThreads++;
 458         }
 459     }
 460 
 461 
 462     private class WorkerThread extends Thread implements Closeable
 463     {
 464         private Work currentWork;
 465         private int threadId = 0; // unique id for the thread
 466         private volatile boolean closeCalled = false;
 467         private String threadPoolName;
 468         // name seen by Thread.getName()
 469         private StringBuffer workerThreadName = new StringBuffer();
 470 
 471         WorkerThread(ThreadGroup tg, String threadPoolName) {
 472             super(tg, null, "Idle", 0, false);
 473             this.threadId = ThreadPoolImpl.getUniqueThreadId();
 474             this.threadPoolName = threadPoolName;
 475             setName(composeWorkerThreadName(threadPoolName, "Idle"));
 476         }
 477 
 478         public synchronized void close() {
 479             closeCalled = true;
 480             interrupt();
 481         }
 482 
 483         private void resetClassLoader() {
 484 
 485         }
 486 
 487         private void performWork() {
 488             long start = System.currentTimeMillis();
 489             try {
 490                 currentWork.doWork();
 491             } catch (Throwable t) {
 492                 wrapper.workerThreadDoWorkThrowable(this, t);
 493             }
 494             long elapsedTime = System.currentTimeMillis() - start;
 495             totalTimeTaken.addAndGet(elapsedTime);
 496             processedCount.incrementAndGet();
 497         }
 498 
 499         public void run() {
 500             try  {
 501                 while (!closeCalled) {
 502                     try {
 503                         currentWork = ((WorkQueueImpl)workQueue).requestWork(
 504                             inactivityTimeout);
 505                         if (currentWork == null)
 506                             continue;
 507                     } catch (InterruptedException exc) {
 508                         wrapper.workQueueThreadInterrupted( exc, getName(),
 509                            Boolean.valueOf(closeCalled));
 510 
 511                         continue ;
 512                     } catch (Throwable t) {
 513                          wrapper.workerThreadThrowableFromRequestWork(this, t,
 514                                 workQueue.getName());
 515 
 516                         continue;
 517                     }
 518 
 519                     performWork();
 520 
 521                     // set currentWork to null so that the work item can be
 522                     // garbage collected without waiting for the next work item.
 523                     currentWork = null;
 524 
 525                     resetClassLoader();
 526                 }
 527             } catch (Throwable e) {
 528                 // This should not be possible
 529                 wrapper.workerThreadCaughtUnexpectedThrowable(this,e);
 530             } finally {
 531                 synchronized (workersLock) {
 532                     workers.remove(this);
 533                 }
 534             }
 535         }
 536 
 537         private String composeWorkerThreadName(String poolName, String workerName) {
 538             workerThreadName.setLength(0);
 539             workerThreadName.append("p: ").append(poolName);
 540             workerThreadName.append("; w: ").append(workerName);
 541             return workerThreadName.toString();
 542         }
 543     } // End of WorkerThread class
 544 
 545 }
 546 
 547 // End of file.