1 /* 2 * Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package com.sun.corba.se.impl.orbutil.threadpool; 27 28 import java.io.IOException; 29 import java.io.Closeable; 30 31 import java.security.AccessController; 32 import java.security.PrivilegedAction; 33 34 import java.util.List; 35 import java.util.ArrayList; 36 37 import java.util.concurrent.atomic.AtomicInteger; 38 import java.util.concurrent.atomic.AtomicLong; 39 40 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; 41 import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool; 42 import com.sun.corba.se.spi.orbutil.threadpool.Work; 43 import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue; 44 45 import com.sun.corba.se.impl.orbutil.ORBConstants; 46 import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl; 47 48 import com.sun.corba.se.spi.monitoring.MonitoringConstants; 49 import com.sun.corba.se.spi.monitoring.MonitoredObject; 50 import com.sun.corba.se.spi.monitoring.MonitoringFactories; 51 import com.sun.corba.se.spi.orb.ORB; 52 import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase; 53 54 import com.sun.corba.se.impl.logging.ORBUtilSystemException; 55 import com.sun.corba.se.impl.orbutil.ORBConstants; 56 import com.sun.corba.se.spi.logging.CORBALogDomains; 57 58 public class ThreadPoolImpl implements ThreadPool 59 { 60 // serial counter useful for debugging 61 private static AtomicInteger threadCounter = new AtomicInteger(0); 62 private static final ORBUtilSystemException wrapper = 63 ORBUtilSystemException.get(CORBALogDomains.RPC_TRANSPORT); 64 65 66 // Any time currentThreadCount and/or availableWorkerThreads is updated 67 // or accessed this ThreadPool's WorkQueue must be locked. And, it is 68 // expected that this ThreadPool's WorkQueue is the only object that 69 // updates and accesses these values directly and indirectly though a 70 // call to a method in this ThreadPool. If any call to update or access 71 // those values must synchronized on this ThreadPool's WorkQueue. 72 private WorkQueue workQueue; 73 74 // Stores the number of available worker threads 75 private int availableWorkerThreads = 0; 76 77 // Stores the number of threads in the threadpool currently 78 private int currentThreadCount = 0; 79 80 // Minimum number of worker threads created at instantiation of the threadpool 81 private int minWorkerThreads = 0; 82 83 // Maximum number of worker threads in the threadpool 84 private int maxWorkerThreads = 0; 85 86 // Inactivity timeout value for worker threads to exit and stop running 87 private long inactivityTimeout; 88 89 // Indicates if the threadpool is bounded or unbounded 90 private boolean boundedThreadPool = false; 91 92 // Running count of the work items processed 93 // Set the value to 1 so that divide by zero is avoided in 94 // averageWorkCompletionTime() 95 private AtomicLong processedCount = new AtomicLong(1); 96 97 // Running aggregate of the time taken in millis to execute work items 98 // processed by the threads in the threadpool 99 private AtomicLong totalTimeTaken = new AtomicLong(0); 100 101 // Name of the ThreadPool 102 private String name; 103 104 // MonitoredObject for ThreadPool 105 private MonitoredObject threadpoolMonitoredObject; 106 107 // ThreadGroup in which threads should be created 108 private ThreadGroup threadGroup; 109 110 Object workersLock = new Object(); 111 List<WorkerThread> workers = new ArrayList<>(); 112 113 /** 114 * This constructor is used to create an unbounded threadpool 115 */ 116 public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) { 117 inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT; 118 maxWorkerThreads = Integer.MAX_VALUE; 119 workQueue = new WorkQueueImpl(this); 120 threadGroup = tg; 121 name = threadpoolName; 122 initializeMonitoring(); 123 } 124 125 /** 126 * This constructor is used to create an unbounded threadpool 127 * in the ThreadGroup of the current thread 128 */ 129 public ThreadPoolImpl(String threadpoolName) { 130 this( Thread.currentThread().getThreadGroup(), threadpoolName ) ; 131 } 132 133 /** 134 * This constructor is used to create bounded threadpool 135 */ 136 public ThreadPoolImpl(int minSize, int maxSize, long timeout, 137 String threadpoolName) 138 { 139 minWorkerThreads = minSize; 140 maxWorkerThreads = maxSize; 141 inactivityTimeout = timeout; 142 boundedThreadPool = true; 143 workQueue = new WorkQueueImpl(this); 144 name = threadpoolName; 145 for (int i = 0; i < minWorkerThreads; i++) { 146 createWorkerThread(); 147 } 148 initializeMonitoring(); 149 } 150 151 // Note that this method should not return until AFTER all threads have died. 152 public void close() throws IOException { 153 154 // Copy to avoid concurrent modification problems. 155 List<WorkerThread> copy = null; 156 synchronized (workersLock) { 157 copy = new ArrayList<>(workers); 158 } 159 160 for (WorkerThread wt : copy) { 161 wt.close(); 162 while (wt.getState() != Thread.State.TERMINATED) { 163 try { 164 wt.join(); 165 } catch (InterruptedException exc) { 166 wrapper.interruptedJoinCallWhileClosingThreadPool(exc, wt, this); 167 } 168 } 169 } 170 171 threadGroup = null; 172 } 173 174 175 // Setup monitoring for this threadpool 176 private void initializeMonitoring() { 177 // Get root monitored object 178 MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory(). 179 createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null). 180 getRootMonitoredObject(); 181 182 // Create the threadpool monitoring root 183 MonitoredObject threadPoolMonitoringObjectRoot = root.getChild( 184 MonitoringConstants.THREADPOOL_MONITORING_ROOT); 185 if (threadPoolMonitoringObjectRoot == null) { 186 threadPoolMonitoringObjectRoot = MonitoringFactories. 187 getMonitoredObjectFactory().createMonitoredObject( 188 MonitoringConstants.THREADPOOL_MONITORING_ROOT, 189 MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION); 190 root.addChild(threadPoolMonitoringObjectRoot); 191 } 192 threadpoolMonitoredObject = MonitoringFactories. 193 getMonitoredObjectFactory(). 194 createMonitoredObject(name, 195 MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION); 196 197 threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject); 198 199 LongMonitoredAttributeBase b1 = new 200 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS, 201 MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) { 202 public Object getValue() { 203 return new Long(ThreadPoolImpl.this.currentNumberOfThreads()); 204 } 205 }; 206 threadpoolMonitoredObject.addAttribute(b1); 207 LongMonitoredAttributeBase b2 = new 208 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS, 209 MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) { 210 public Object getValue() { 211 return new Long(ThreadPoolImpl.this.numberOfAvailableThreads()); 212 } 213 }; 214 threadpoolMonitoredObject.addAttribute(b2); 215 LongMonitoredAttributeBase b3 = new 216 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS, 217 MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) { 218 public Object getValue() { 219 return new Long(ThreadPoolImpl.this.numberOfBusyThreads()); 220 } 221 }; 222 threadpoolMonitoredObject.addAttribute(b3); 223 LongMonitoredAttributeBase b4 = new 224 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME, 225 MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) { 226 public Object getValue() { 227 return new Long(ThreadPoolImpl.this.averageWorkCompletionTime()); 228 } 229 }; 230 threadpoolMonitoredObject.addAttribute(b4); 231 LongMonitoredAttributeBase b5 = new 232 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT, 233 MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) { 234 public Object getValue() { 235 return new Long(ThreadPoolImpl.this.currentProcessedCount()); 236 } 237 }; 238 threadpoolMonitoredObject.addAttribute(b5); 239 240 // Add the monitored object for the WorkQueue 241 242 threadpoolMonitoredObject.addChild( 243 ((WorkQueueImpl)workQueue).getMonitoredObject()); 244 } 245 246 // Package private method to get the monitored object for this 247 // class 248 MonitoredObject getMonitoredObject() { 249 return threadpoolMonitoredObject; 250 } 251 252 public WorkQueue getAnyWorkQueue() 253 { 254 return workQueue; 255 } 256 257 public WorkQueue getWorkQueue(int queueId) 258 throws NoSuchWorkQueueException 259 { 260 if (queueId != 0) 261 throw new NoSuchWorkQueueException(); 262 return workQueue; 263 } 264 265 /** 266 * To be called from the workqueue when work is added to the 267 * workQueue. This method would create new threads if required 268 * or notify waiting threads on the queue for available work 269 */ 270 void notifyForAvailableWork(WorkQueue aWorkQueue) { 271 synchronized (aWorkQueue) { 272 if (availableWorkerThreads < aWorkQueue.workItemsInQueue()) { 273 createWorkerThread(); 274 } else { 275 aWorkQueue.notify(); 276 } 277 } 278 } 279 280 281 private Thread createWorkerThreadHelper( String name ) { 282 // Thread creation needs to be in a doPrivileged block 283 // if there is a non-null security manager for two reasons: 284 // 1. The creation of a thread in a specific ThreadGroup 285 // is a privileged operation. Lack of a doPrivileged 286 // block here causes an AccessControlException 287 // (see bug 6268145). 288 // 2. We want to make sure that the permissions associated 289 // with this thread do NOT include the permissions of 290 // the current thread that is calling this method. 291 // This leads to problems in the app server where 292 // some threads in the ThreadPool randomly get 293 // bad permissions, leading to unpredictable 294 // permission errors (see bug 6021011). 295 // 296 // A Java thread contains a stack of call frames, 297 // one for each method called that has not yet returned. 298 // Each method comes from a particular class. The class 299 // was loaded by a ClassLoader which has an associated 300 // CodeSource, and this determines the Permissions 301 // for all methods in that class. The current 302 // Permissions for the thread are the intersection of 303 // all Permissions for the methods on the stack. 304 // This is part of the Security Context of the thread. 305 // 306 // When a thread creates a new thread, the new thread 307 // inherits the security context of the old thread. 308 // This is bad in a ThreadPool, because different 309 // creators of threads may have different security contexts. 310 // This leads to occasional unpredictable errors when 311 // a thread is re-used in a different security context. 312 // 313 // Avoiding this problem is simple: just do the thread 314 // creation in a doPrivileged block. This sets the 315 // inherited security context to that of the code source 316 // for the ORB code itself, which contains all permissions 317 // in either Java SE or Java EE. 318 WorkerThread thread = new WorkerThread(threadGroup, name); 319 synchronized (workersLock) { 320 workers.add(thread); 321 } 322 323 // The thread must be set to a daemon thread so the 324 // VM can exit if the only threads left are PooledThreads 325 // or other daemons. We don't want to rely on the 326 // calling thread always being a daemon. 327 // Note that no exception is possible here since we 328 // are inside the doPrivileged block. 329 thread.setDaemon(true); 330 331 wrapper.workerThreadCreated(thread, thread.getContextClassLoader()); 332 333 thread.start(); 334 return null; 335 } 336 337 338 /** 339 * To be called from the workqueue to create worker threads when none 340 * available. 341 */ 342 void createWorkerThread() { 343 final String name = getName(); 344 synchronized (workQueue) { 345 try { 346 if (System.getSecurityManager() == null) { 347 createWorkerThreadHelper(name); 348 } else { 349 // If we get here, we need to create a thread. 350 AccessController.doPrivileged( 351 new PrivilegedAction() { 352 public Object run() { 353 return createWorkerThreadHelper(name); 354 } 355 } 356 ); 357 } 358 } catch (Throwable t) { 359 // Decrementing the count of current worker threads. 360 // But, it will be increased in the finally block. 361 decrementCurrentNumberOfThreads(); 362 wrapper.workerThreadCreationFailure(t); 363 } finally { 364 incrementCurrentNumberOfThreads(); 365 } 366 } 367 } 368 369 public int minimumNumberOfThreads() { 370 return minWorkerThreads; 371 } 372 373 public int maximumNumberOfThreads() { 374 return maxWorkerThreads; 375 } 376 377 public long idleTimeoutForThreads() { 378 return inactivityTimeout; 379 } 380 381 public int currentNumberOfThreads() { 382 synchronized (workQueue) { 383 return currentThreadCount; 384 } 385 } 386 387 void decrementCurrentNumberOfThreads() { 388 synchronized (workQueue) { 389 currentThreadCount--; 390 } 391 } 392 393 void incrementCurrentNumberOfThreads() { 394 synchronized (workQueue) { 395 currentThreadCount++; 396 } 397 } 398 399 public int numberOfAvailableThreads() { 400 synchronized (workQueue) { 401 return availableWorkerThreads; 402 } 403 } 404 405 public int numberOfBusyThreads() { 406 synchronized (workQueue) { 407 return (currentThreadCount - availableWorkerThreads); 408 } 409 } 410 411 public long averageWorkCompletionTime() { 412 synchronized (workQueue) { 413 return (totalTimeTaken.get() / processedCount.get()); 414 } 415 } 416 417 public long currentProcessedCount() { 418 synchronized (workQueue) { 419 return processedCount.get(); 420 } 421 } 422 423 public String getName() { 424 return name; 425 } 426 427 /** 428 * This method will return the number of WorkQueues serviced by the threadpool. 429 */ 430 public int numberOfWorkQueues() { 431 return 1; 432 } 433 434 435 private static synchronized int getUniqueThreadId() { 436 return ThreadPoolImpl.threadCounter.incrementAndGet(); 437 } 438 439 /** 440 * This method will decrement the number of available threads 441 * in the threadpool which are waiting for work. Called from 442 * WorkQueueImpl.requestWork() 443 */ 444 void decrementNumberOfAvailableThreads() { 445 synchronized (workQueue) { 446 availableWorkerThreads--; 447 } 448 } 449 450 /** 451 * This method will increment the number of available threads 452 * in the threadpool which are waiting for work. Called from 453 * WorkQueueImpl.requestWork() 454 */ 455 void incrementNumberOfAvailableThreads() { 456 synchronized (workQueue) { 457 availableWorkerThreads++; 458 } 459 } 460 461 462 private class WorkerThread extends Thread implements Closeable 463 { 464 private Work currentWork; 465 private int threadId = 0; // unique id for the thread 466 private volatile boolean closeCalled = false; 467 private String threadPoolName; 468 // name seen by Thread.getName() 469 private StringBuffer workerThreadName = new StringBuffer(); 470 471 WorkerThread(ThreadGroup tg, String threadPoolName) { 472 super(tg, null, "Idle", 0, false); 473 this.threadId = ThreadPoolImpl.getUniqueThreadId(); 474 this.threadPoolName = threadPoolName; 475 setName(composeWorkerThreadName(threadPoolName, "Idle")); 476 } 477 478 public synchronized void close() { 479 closeCalled = true; 480 interrupt(); 481 } 482 483 private void resetClassLoader() { 484 485 } 486 487 private void performWork() { 488 long start = System.currentTimeMillis(); 489 try { 490 currentWork.doWork(); 491 } catch (Throwable t) { 492 wrapper.workerThreadDoWorkThrowable(this, t); 493 } 494 long elapsedTime = System.currentTimeMillis() - start; 495 totalTimeTaken.addAndGet(elapsedTime); 496 processedCount.incrementAndGet(); 497 } 498 499 public void run() { 500 try { 501 while (!closeCalled) { 502 try { 503 currentWork = ((WorkQueueImpl)workQueue).requestWork( 504 inactivityTimeout); 505 if (currentWork == null) 506 continue; 507 } catch (InterruptedException exc) { 508 wrapper.workQueueThreadInterrupted( exc, getName(), 509 Boolean.valueOf(closeCalled)); 510 511 continue ; 512 } catch (Throwable t) { 513 wrapper.workerThreadThrowableFromRequestWork(this, t, 514 workQueue.getName()); 515 516 continue; 517 } 518 519 performWork(); 520 521 // set currentWork to null so that the work item can be 522 // garbage collected without waiting for the next work item. 523 currentWork = null; 524 525 resetClassLoader(); 526 } 527 } catch (Throwable e) { 528 // This should not be possible 529 wrapper.workerThreadCaughtUnexpectedThrowable(this,e); 530 } finally { 531 synchronized (workersLock) { 532 workers.remove(this); 533 } 534 } 535 } 536 537 private String composeWorkerThreadName(String poolName, String workerName) { 538 workerThreadName.setLength(0); 539 workerThreadName.append("p: ").append(poolName); 540 workerThreadName.append("; w: ").append(workerName); 541 return workerThreadName.toString(); 542 } 543 } // End of WorkerThread class 544 545 } 546 547 // End of file.