1 /* 2 * Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package com.sun.corba.se.impl.orbutil.threadpool; 27 28 import java.io.IOException; 29 import java.io.Closeable; 30 31 import java.security.AccessController; 32 import java.security.PrivilegedAction; 33 34 import java.util.List; 35 import java.util.ArrayList; 36 37 import java.util.concurrent.atomic.AtomicInteger; 38 import java.util.concurrent.atomic.AtomicLong; 39 40 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; 41 import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool; 42 import com.sun.corba.se.spi.orbutil.threadpool.Work; 43 import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue; 44 45 import com.sun.corba.se.impl.orbutil.ORBConstants; 46 import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl; 47 48 import com.sun.corba.se.spi.monitoring.MonitoringConstants; 49 import com.sun.corba.se.spi.monitoring.MonitoredObject; 50 import com.sun.corba.se.spi.monitoring.MonitoringFactories; 51 import com.sun.corba.se.spi.orb.ORB; 52 import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase; 53 54 import com.sun.corba.se.impl.logging.ORBUtilSystemException; 55 import com.sun.corba.se.impl.orbutil.ORBConstants; 56 import com.sun.corba.se.spi.logging.CORBALogDomains; 57 import com.sun.corba.se.impl.transport.ManagedLocalsThread; 58 59 public class ThreadPoolImpl implements ThreadPool 60 { 61 // serial counter useful for debugging 62 private static AtomicInteger threadCounter = new AtomicInteger(0); 63 private static final ORBUtilSystemException wrapper = 64 ORBUtilSystemException.get(CORBALogDomains.RPC_TRANSPORT); 65 66 67 // Any time currentThreadCount and/or availableWorkerThreads is updated 68 // or accessed this ThreadPool's WorkQueue must be locked. And, it is 69 // expected that this ThreadPool's WorkQueue is the only object that 70 // updates and accesses these values directly and indirectly though a 71 // call to a method in this ThreadPool. If any call to update or access 72 // those values must synchronized on this ThreadPool's WorkQueue. 73 private WorkQueue workQueue; 74 75 // Stores the number of available worker threads 76 private int availableWorkerThreads = 0; 77 78 // Stores the number of threads in the threadpool currently 79 private int currentThreadCount = 0; 80 81 // Minimum number of worker threads created at instantiation of the threadpool 82 private int minWorkerThreads = 0; 83 84 // Maximum number of worker threads in the threadpool 85 private int maxWorkerThreads = 0; 86 87 // Inactivity timeout value for worker threads to exit and stop running 88 private long inactivityTimeout; 89 90 // Indicates if the threadpool is bounded or unbounded 91 private boolean boundedThreadPool = false; 92 93 // Running count of the work items processed 94 // Set the value to 1 so that divide by zero is avoided in 95 // averageWorkCompletionTime() 96 private AtomicLong processedCount = new AtomicLong(1); 97 98 // Running aggregate of the time taken in millis to execute work items 99 // processed by the threads in the threadpool 100 private AtomicLong totalTimeTaken = new AtomicLong(0); 101 102 // Name of the ThreadPool 103 private String name; 104 105 // MonitoredObject for ThreadPool 106 private MonitoredObject threadpoolMonitoredObject; 107 108 // ThreadGroup in which threads should be created 109 private ThreadGroup threadGroup; 110 111 Object workersLock = new Object(); 112 List<WorkerThread> workers = new ArrayList<>(); 113 114 /** 115 * This constructor is used to create an unbounded threadpool 116 */ 117 public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) { 118 inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT; 119 maxWorkerThreads = Integer.MAX_VALUE; 120 workQueue = new WorkQueueImpl(this); 121 threadGroup = tg; 122 name = threadpoolName; 123 initializeMonitoring(); 124 } 125 126 /** 127 * This constructor is used to create an unbounded threadpool 128 * in the ThreadGroup of the current thread 129 */ 130 public ThreadPoolImpl(String threadpoolName) { 131 this( Thread.currentThread().getThreadGroup(), threadpoolName ) ; 132 } 133 134 /** 135 * This constructor is used to create bounded threadpool 136 */ 137 public ThreadPoolImpl(int minSize, int maxSize, long timeout, 138 String threadpoolName) 139 { 140 minWorkerThreads = minSize; 141 maxWorkerThreads = maxSize; 142 inactivityTimeout = timeout; 143 boundedThreadPool = true; 144 workQueue = new WorkQueueImpl(this); 145 name = threadpoolName; 146 for (int i = 0; i < minWorkerThreads; i++) { 147 createWorkerThread(); 148 } 149 initializeMonitoring(); 150 } 151 152 // Note that this method should not return until AFTER all threads have died. 153 public void close() throws IOException { 154 155 // Copy to avoid concurrent modification problems. 156 List<WorkerThread> copy = null; 157 synchronized (workersLock) { 158 copy = new ArrayList<>(workers); 159 } 160 161 for (WorkerThread wt : copy) { 162 wt.close(); 163 while (wt.getState() != Thread.State.TERMINATED) { 164 try { 165 wt.join(); 166 } catch (InterruptedException exc) { 167 wrapper.interruptedJoinCallWhileClosingThreadPool(exc, wt, this); 168 } 169 } 170 } 171 172 threadGroup = null; 173 } 174 175 176 // Setup monitoring for this threadpool 177 private void initializeMonitoring() { 178 // Get root monitored object 179 MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory(). 180 createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null). 181 getRootMonitoredObject(); 182 183 // Create the threadpool monitoring root 184 MonitoredObject threadPoolMonitoringObjectRoot = root.getChild( 185 MonitoringConstants.THREADPOOL_MONITORING_ROOT); 186 if (threadPoolMonitoringObjectRoot == null) { 187 threadPoolMonitoringObjectRoot = MonitoringFactories. 188 getMonitoredObjectFactory().createMonitoredObject( 189 MonitoringConstants.THREADPOOL_MONITORING_ROOT, 190 MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION); 191 root.addChild(threadPoolMonitoringObjectRoot); 192 } 193 threadpoolMonitoredObject = MonitoringFactories. 194 getMonitoredObjectFactory(). 195 createMonitoredObject(name, 196 MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION); 197 198 threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject); 199 200 LongMonitoredAttributeBase b1 = new 201 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS, 202 MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) { 203 public Object getValue() { 204 return new Long(ThreadPoolImpl.this.currentNumberOfThreads()); 205 } 206 }; 207 threadpoolMonitoredObject.addAttribute(b1); 208 LongMonitoredAttributeBase b2 = new 209 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS, 210 MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) { 211 public Object getValue() { 212 return new Long(ThreadPoolImpl.this.numberOfAvailableThreads()); 213 } 214 }; 215 threadpoolMonitoredObject.addAttribute(b2); 216 LongMonitoredAttributeBase b3 = new 217 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS, 218 MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) { 219 public Object getValue() { 220 return new Long(ThreadPoolImpl.this.numberOfBusyThreads()); 221 } 222 }; 223 threadpoolMonitoredObject.addAttribute(b3); 224 LongMonitoredAttributeBase b4 = new 225 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME, 226 MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) { 227 public Object getValue() { 228 return new Long(ThreadPoolImpl.this.averageWorkCompletionTime()); 229 } 230 }; 231 threadpoolMonitoredObject.addAttribute(b4); 232 LongMonitoredAttributeBase b5 = new 233 LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT, 234 MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) { 235 public Object getValue() { 236 return new Long(ThreadPoolImpl.this.currentProcessedCount()); 237 } 238 }; 239 threadpoolMonitoredObject.addAttribute(b5); 240 241 // Add the monitored object for the WorkQueue 242 243 threadpoolMonitoredObject.addChild( 244 ((WorkQueueImpl)workQueue).getMonitoredObject()); 245 } 246 247 // Package private method to get the monitored object for this 248 // class 249 MonitoredObject getMonitoredObject() { 250 return threadpoolMonitoredObject; 251 } 252 253 public WorkQueue getAnyWorkQueue() 254 { 255 return workQueue; 256 } 257 258 public WorkQueue getWorkQueue(int queueId) 259 throws NoSuchWorkQueueException 260 { 261 if (queueId != 0) 262 throw new NoSuchWorkQueueException(); 263 return workQueue; 264 } 265 266 /** 267 * To be called from the workqueue when work is added to the 268 * workQueue. This method would create new threads if required 269 * or notify waiting threads on the queue for available work 270 */ 271 void notifyForAvailableWork(WorkQueue aWorkQueue) { 272 synchronized (aWorkQueue) { 273 if (availableWorkerThreads < aWorkQueue.workItemsInQueue()) { 274 createWorkerThread(); 275 } else { 276 aWorkQueue.notify(); 277 } 278 } 279 } 280 281 282 private Thread createWorkerThreadHelper( String name ) { 283 // Thread creation needs to be in a doPrivileged block 284 // if there is a non-null security manager for two reasons: 285 // 1. The creation of a thread in a specific ThreadGroup 286 // is a privileged operation. Lack of a doPrivileged 287 // block here causes an AccessControlException 288 // (see bug 6268145). 289 // 2. We want to make sure that the permissions associated 290 // with this thread do NOT include the permissions of 291 // the current thread that is calling this method. 292 // This leads to problems in the app server where 293 // some threads in the ThreadPool randomly get 294 // bad permissions, leading to unpredictable 295 // permission errors (see bug 6021011). 296 // 297 // A Java thread contains a stack of call frames, 298 // one for each method called that has not yet returned. 299 // Each method comes from a particular class. The class 300 // was loaded by a ClassLoader which has an associated 301 // CodeSource, and this determines the Permissions 302 // for all methods in that class. The current 303 // Permissions for the thread are the intersection of 304 // all Permissions for the methods on the stack. 305 // This is part of the Security Context of the thread. 306 // 307 // When a thread creates a new thread, the new thread 308 // inherits the security context of the old thread. 309 // This is bad in a ThreadPool, because different 310 // creators of threads may have different security contexts. 311 // This leads to occasional unpredictable errors when 312 // a thread is re-used in a different security context. 313 // 314 // Avoiding this problem is simple: just do the thread 315 // creation in a doPrivileged block. This sets the 316 // inherited security context to that of the code source 317 // for the ORB code itself, which contains all permissions 318 // in either Java SE or Java EE. 319 WorkerThread thread = new WorkerThread(threadGroup, name); 320 synchronized (workersLock) { 321 workers.add(thread); 322 } 323 324 // The thread must be set to a daemon thread so the 325 // VM can exit if the only threads left are PooledThreads 326 // or other daemons. We don't want to rely on the 327 // calling thread always being a daemon. 328 // Note that no exception is possible here since we 329 // are inside the doPrivileged block. 330 thread.setDaemon(true); 331 332 wrapper.workerThreadCreated(thread, thread.getContextClassLoader()); 333 334 thread.start(); 335 return null; 336 } 337 338 339 /** 340 * To be called from the workqueue to create worker threads when none 341 * available. 342 */ 343 void createWorkerThread() { 344 final String name = getName(); 345 synchronized (workQueue) { 346 try { 347 if (System.getSecurityManager() == null) { 348 createWorkerThreadHelper(name); 349 } else { 350 // If we get here, we need to create a thread. 351 AccessController.doPrivileged( 352 new PrivilegedAction() { 353 public Object run() { 354 return createWorkerThreadHelper(name); 355 } 356 } 357 ); 358 } 359 } catch (Throwable t) { 360 // Decrementing the count of current worker threads. 361 // But, it will be increased in the finally block. 362 decrementCurrentNumberOfThreads(); 363 wrapper.workerThreadCreationFailure(t); 364 } finally { 365 incrementCurrentNumberOfThreads(); 366 } 367 } 368 } 369 370 public int minimumNumberOfThreads() { 371 return minWorkerThreads; 372 } 373 374 public int maximumNumberOfThreads() { 375 return maxWorkerThreads; 376 } 377 378 public long idleTimeoutForThreads() { 379 return inactivityTimeout; 380 } 381 382 public int currentNumberOfThreads() { 383 synchronized (workQueue) { 384 return currentThreadCount; 385 } 386 } 387 388 void decrementCurrentNumberOfThreads() { 389 synchronized (workQueue) { 390 currentThreadCount--; 391 } 392 } 393 394 void incrementCurrentNumberOfThreads() { 395 synchronized (workQueue) { 396 currentThreadCount++; 397 } 398 } 399 400 public int numberOfAvailableThreads() { 401 synchronized (workQueue) { 402 return availableWorkerThreads; 403 } 404 } 405 406 public int numberOfBusyThreads() { 407 synchronized (workQueue) { 408 return (currentThreadCount - availableWorkerThreads); 409 } 410 } 411 412 public long averageWorkCompletionTime() { 413 synchronized (workQueue) { 414 return (totalTimeTaken.get() / processedCount.get()); 415 } 416 } 417 418 public long currentProcessedCount() { 419 synchronized (workQueue) { 420 return processedCount.get(); 421 } 422 } 423 424 public String getName() { 425 return name; 426 } 427 428 /** 429 * This method will return the number of WorkQueues serviced by the threadpool. 430 */ 431 public int numberOfWorkQueues() { 432 return 1; 433 } 434 435 436 private static synchronized int getUniqueThreadId() { 437 return ThreadPoolImpl.threadCounter.incrementAndGet(); 438 } 439 440 /** 441 * This method will decrement the number of available threads 442 * in the threadpool which are waiting for work. Called from 443 * WorkQueueImpl.requestWork() 444 */ 445 void decrementNumberOfAvailableThreads() { 446 synchronized (workQueue) { 447 availableWorkerThreads--; 448 } 449 } 450 451 /** 452 * This method will increment the number of available threads 453 * in the threadpool which are waiting for work. Called from 454 * WorkQueueImpl.requestWork() 455 */ 456 void incrementNumberOfAvailableThreads() { 457 synchronized (workQueue) { 458 availableWorkerThreads++; 459 } 460 } 461 462 463 private class WorkerThread extends ManagedLocalsThread implements Closeable 464 { 465 private Work currentWork; 466 private int threadId = 0; // unique id for the thread 467 private volatile boolean closeCalled = false; 468 private String threadPoolName; 469 // name seen by Thread.getName() 470 private StringBuffer workerThreadName = new StringBuffer(); 471 472 WorkerThread(ThreadGroup tg, String threadPoolName) { 473 super(tg, "Idle"); 474 this.threadId = ThreadPoolImpl.getUniqueThreadId(); 475 this.threadPoolName = threadPoolName; 476 setName(composeWorkerThreadName(threadPoolName, "Idle")); 477 } 478 479 public synchronized void close() { 480 closeCalled = true; 481 interrupt(); 482 } 483 484 private void resetClassLoader() { 485 486 } 487 488 private void performWork() { 489 long start = System.currentTimeMillis(); 490 try { 491 currentWork.doWork(); 492 } catch (Throwable t) { 493 wrapper.workerThreadDoWorkThrowable(this, t); 494 } 495 long elapsedTime = System.currentTimeMillis() - start; 496 totalTimeTaken.addAndGet(elapsedTime); 497 processedCount.incrementAndGet(); 498 } 499 500 public void run() { 501 try { 502 while (!closeCalled) { 503 try { 504 currentWork = ((WorkQueueImpl)workQueue).requestWork( 505 inactivityTimeout); 506 if (currentWork == null) 507 continue; 508 } catch (InterruptedException exc) { 509 wrapper.workQueueThreadInterrupted( exc, getName(), 510 Boolean.valueOf(closeCalled)); 511 512 continue ; 513 } catch (Throwable t) { 514 wrapper.workerThreadThrowableFromRequestWork(this, t, 515 workQueue.getName()); 516 517 continue; 518 } 519 520 performWork(); 521 522 // set currentWork to null so that the work item can be 523 // garbage collected without waiting for the next work item. 524 currentWork = null; 525 526 resetClassLoader(); 527 } 528 } catch (Throwable e) { 529 // This should not be possible 530 wrapper.workerThreadCaughtUnexpectedThrowable(this,e); 531 } finally { 532 synchronized (workersLock) { 533 workers.remove(this); 534 } 535 } 536 } 537 538 private String composeWorkerThreadName(String poolName, String workerName) { 539 workerThreadName.setLength(0); 540 workerThreadName.append("p: ").append(poolName); 541 workerThreadName.append("; w: ").append(workerName); 542 return workerThreadName.toString(); 543 } 544 } // End of WorkerThread class 545 546 } 547 548 // End of file.