--- old/src/share/classes/com/sun/corba/se/impl/orbutil/threadpool/ThreadPoolImpl.java Mon Jun 25 21:04:18 2012 +++ new/src/share/classes/com/sun/corba/se/impl/orbutil/threadpool/ThreadPoolImpl.java Mon Jun 25 21:04:18 2012 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2003, 2004, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2003, 2012, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,6 +25,18 @@ package com.sun.corba.se.impl.orbutil.threadpool; +import java.io.IOException; +import java.io.Closeable; + +import java.security.AccessController; +import java.security.PrivilegedAction; + +import java.util.List; +import java.util.ArrayList; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool; import com.sun.corba.se.spi.orbutil.threadpool.Work; @@ -36,12 +48,27 @@ import com.sun.corba.se.spi.monitoring.MonitoringConstants; import com.sun.corba.se.spi.monitoring.MonitoredObject; import com.sun.corba.se.spi.monitoring.MonitoringFactories; +import com.sun.corba.se.spi.orb.ORB; import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase; +import com.sun.corba.se.impl.logging.ORBUtilSystemException; +import com.sun.corba.se.impl.orbutil.ORBConstants; +import com.sun.corba.se.spi.logging.CORBALogDomains; + public class ThreadPoolImpl implements ThreadPool { - private static int threadCounter = 0; // serial counter useful for debugging + // serial counter useful for debugging + private static AtomicInteger threadCounter = new AtomicInteger(0); + private static final ORBUtilSystemException wrapper = + ORBUtilSystemException.get(CORBALogDomains.RPC_TRANSPORT); + + // Any time currentThreadCount and/or availableWorkerThreads is updated + // or accessed this ThreadPool's WorkQueue must be locked. And, it is + // expected that this ThreadPool's WorkQueue is the only object that + // updates and accesses these values directly and indirectly though a + // call to a method in this ThreadPool. If any call to update or access + // those values must synchronized on this ThreadPool's WorkQueue. private WorkQueue workQueue; // Stores the number of available worker threads @@ -65,15 +92,12 @@ // Running count of the work items processed // Set the value to 1 so that divide by zero is avoided in // averageWorkCompletionTime() - private long processedCount = 1; + private AtomicLong processedCount = new AtomicLong(1); // Running aggregate of the time taken in millis to execute work items // processed by the threads in the threadpool - private long totalTimeTaken = 0; + private AtomicLong totalTimeTaken = new AtomicLong(0); - // Lock for protecting state when required - private Object lock = new Object(); - // Name of the ThreadPool private String name; @@ -81,8 +105,11 @@ private MonitoredObject threadpoolMonitoredObject; // ThreadGroup in which threads should be created - private ThreadGroup threadGroup ; + private ThreadGroup threadGroup; + Object workersLock = new Object(); + List workers = new ArrayList<>(); + /** * This constructor is used to create an unbounded threadpool */ @@ -90,7 +117,7 @@ inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT; maxWorkerThreads = Integer.MAX_VALUE; workQueue = new WorkQueueImpl(this); - threadGroup = tg ; + threadGroup = tg; name = threadpoolName; initializeMonitoring(); } @@ -121,6 +148,30 @@ initializeMonitoring(); } + // Note that this method should not return until AFTER all threads have died. + public void close() throws IOException { + + // Copy to avoid concurrent modification problems. + List copy = null; + synchronized (workersLock) { + copy = new ArrayList<>(workers); + } + + for (WorkerThread wt : copy) { + wt.close(); + while (wt.getState() != Thread.State.TERMINATED) { + try { + wt.join(); + } catch (InterruptedException exc) { + wrapper.interruptedJoinCallWhileClosingThreadPool(exc, wt, this); + } + } + } + + threadGroup = null; + } + + // Setup monitoring for this threadpool private void initializeMonitoring() { // Get root monitored object @@ -217,8 +268,8 @@ * or notify waiting threads on the queue for available work */ void notifyForAvailableWork(WorkQueue aWorkQueue) { - synchronized (lock) { - if (availableWorkerThreads == 0) { + synchronized (aWorkQueue) { + if (availableWorkerThreads < aWorkQueue.workItemsInQueue()) { createWorkerThread(); } else { aWorkQueue.notify(); @@ -227,30 +278,46 @@ } - /** - * To be called from the workqueue to create worker threads when none - * available. - */ - void createWorkerThread() { - WorkerThread thread; - - synchronized (lock) { - if (boundedThreadPool) { - if (currentThreadCount < maxWorkerThreads) { - thread = new WorkerThread(threadGroup, getName()); - currentThreadCount++; - } else { - // REVIST - Need to create a thread to monitor the - // the state for deadlock i.e. all threads waiting for - // something which can be got from the item in the - // workqueue, but there is no thread available to - // process that work item - DEADLOCK !! - return; - } - } else { - thread = new WorkerThread(threadGroup, getName()); - currentThreadCount++; - } + private Thread createWorkerThreadHelper( String name ) { + // Thread creation needs to be in a doPrivileged block + // if there is a non-null security manager for two reasons: + // 1. The creation of a thread in a specific ThreadGroup + // is a privileged operation. Lack of a doPrivileged + // block here causes an AccessControlException + // (see bug 6268145). + // 2. We want to make sure that the permissions associated + // with this thread do NOT include the permissions of + // the current thread that is calling this method. + // This leads to problems in the app server where + // some threads in the ThreadPool randomly get + // bad permissions, leading to unpredictable + // permission errors (see bug 6021011). + // + // A Java thread contains a stack of call frames, + // one for each method called that has not yet returned. + // Each method comes from a particular class. The class + // was loaded by a ClassLoader which has an associated + // CodeSource, and this determines the Permissions + // for all methods in that class. The current + // Permissions for the thread are the intersection of + // all Permissions for the methods on the stack. + // This is part of the Security Context of the thread. + // + // When a thread creates a new thread, the new thread + // inherits the security context of the old thread. + // This is bad in a ThreadPool, because different + // creators of threads may have different security contexts. + // This leads to occasional unpredictable errors when + // a thread is re-used in a different security context. + // + // Avoiding this problem is simple: just do the thread + // creation in a doPrivileged block. This sets the + // inherited security context to that of the code source + // for the ORB code itself, which contains all permissions + // in either Java SE or Java EE. + WorkerThread thread = new WorkerThread(threadGroup, name); + synchronized (workersLock) { + workers.add(thread); } // The thread must be set to a daemon thread so the @@ -257,90 +324,99 @@ // VM can exit if the only threads left are PooledThreads // or other daemons. We don't want to rely on the // calling thread always being a daemon. + // Note that no exception is possible here since we + // are inside the doPrivileged block. + thread.setDaemon(true); - // Catch exceptions since setDaemon can cause a - // security exception to be thrown under netscape - // in the Applet mode - try { - thread.setDaemon(true); - } catch (Exception e) { - // REVISIT - need to do some logging here - } + wrapper.workerThreadCreated(thread, thread.getContextClassLoader()); thread.start(); + return null; } + /** - * This method will return the minimum number of threads maintained - * by the threadpool. - */ + * To be called from the workqueue to create worker threads when none + * available. + */ + void createWorkerThread() { + final String name = getName(); + synchronized (workQueue) { + try { + if (System.getSecurityManager() == null) { + createWorkerThreadHelper(name); + } else { + // If we get here, we need to create a thread. + AccessController.doPrivileged( + new PrivilegedAction() { + public Object run() { + return createWorkerThreadHelper(name); + } + } + ); + } + } catch (Throwable t) { + // Decrementing the count of current worker threads. + // But, it will be increased in the finally block. + decrementCurrentNumberOfThreads(); + wrapper.workerThreadCreationFailure(t); + } finally { + incrementCurrentNumberOfThreads(); + } + } + } + public int minimumNumberOfThreads() { return minWorkerThreads; } - /** - * This method will return the maximum number of threads in the - * threadpool at any point in time, for the life of the threadpool - */ public int maximumNumberOfThreads() { return maxWorkerThreads; } - /** - * This method will return the time in milliseconds when idle - * threads in the threadpool are removed. - */ public long idleTimeoutForThreads() { return inactivityTimeout; } - /** - * This method will return the total number of threads currently in the - * threadpool. This method returns a value which is not synchronized. - */ public int currentNumberOfThreads() { - synchronized (lock) { + synchronized (workQueue) { return currentThreadCount; } } - /** - * This method will return the number of available threads in the - * threadpool which are waiting for work. This method returns a - * value which is not synchronized. - */ + void decrementCurrentNumberOfThreads() { + synchronized (workQueue) { + currentThreadCount--; + } + } + + void incrementCurrentNumberOfThreads() { + synchronized (workQueue) { + currentThreadCount++; + } + } + public int numberOfAvailableThreads() { - synchronized (lock) { + synchronized (workQueue) { return availableWorkerThreads; } } - /** - * This method will return the number of busy threads in the threadpool - * This method returns a value which is not synchronized. - */ public int numberOfBusyThreads() { - synchronized (lock) { + synchronized (workQueue) { return (currentThreadCount - availableWorkerThreads); } } - /** - * This method returns the average elapsed time taken to complete a Work - * item in milliseconds. - */ public long averageWorkCompletionTime() { - synchronized (lock) { - return (totalTimeTaken / processedCount); + synchronized (workQueue) { + return (totalTimeTaken.get() / processedCount.get()); } } - /** - * This method returns the number of Work items processed by the threadpool - */ public long currentProcessedCount() { - synchronized (lock) { - return processedCount; + synchronized (workQueue) { + return processedCount.get(); } } @@ -357,15 +433,37 @@ private static synchronized int getUniqueThreadId() { - return ThreadPoolImpl.threadCounter++; + return ThreadPoolImpl.threadCounter.incrementAndGet(); } + /** + * This method will decrement the number of available threads + * in the threadpool which are waiting for work. Called from + * WorkQueueImpl.requestWork() + */ + void decrementNumberOfAvailableThreads() { + synchronized (workQueue) { + availableWorkerThreads--; + } + } - private class WorkerThread extends Thread + /** + * This method will increment the number of available threads + * in the threadpool which are waiting for work. Called from + * WorkQueueImpl.requestWork() + */ + void incrementNumberOfAvailableThreads() { + synchronized (workQueue) { + availableWorkerThreads++; + } + } + + + private class WorkerThread extends Thread implements Closeable { private Work currentWork; private int threadId = 0; // unique id for the thread - // thread pool this WorkerThread belongs too + private volatile boolean closeCalled = false; private String threadPoolName; // name seen by Thread.getName() private StringBuffer workerThreadName = new StringBuffer(); @@ -377,101 +475,62 @@ setName(composeWorkerThreadName(threadPoolName, "Idle")); } - public void run() { - while (true) { - try { + public synchronized void close() { + closeCalled = true; + interrupt(); + } - synchronized (lock) { - availableWorkerThreads++; - } + private void resetClassLoader() { - // Get some work to do - currentWork = ((WorkQueueImpl)workQueue).requestWork(inactivityTimeout); + } - synchronized (lock) { - availableWorkerThreads--; - // It is possible in notifyForAvailableWork that the - // check for availableWorkerThreads = 0 may return - // false, because the availableWorkerThreads has not been - // decremented to zero before the producer thread added - // work to the queue. This may create a deadlock, if the - // executing thread needs information which is in the work - // item queued in the workqueue, but has no thread to work - // on it since none was created because availableWorkerThreads = 0 - // returned false. - // The following code will ensure that a thread is always available - // in those situations - if ((availableWorkerThreads == 0) && - (workQueue.workItemsInQueue() > 0)) { - createWorkerThread(); - } - } + private void performWork() { + long start = System.currentTimeMillis(); + try { + currentWork.doWork(); + } catch (Throwable t) { + wrapper.workerThreadDoWorkThrowable(this, t); + } + long elapsedTime = System.currentTimeMillis() - start; + totalTimeTaken.addAndGet(elapsedTime); + processedCount.incrementAndGet(); + } - // Set the thread name for debugging. - setName(composeWorkerThreadName(threadPoolName, - Integer.toString(this.threadId))); - - long start = System.currentTimeMillis(); - + public void run() { + try { + while (!closeCalled) { try { - // Do the work - currentWork.doWork(); + currentWork = ((WorkQueueImpl)workQueue).requestWork( + inactivityTimeout); + if (currentWork == null) + continue; + } catch (InterruptedException exc) { + wrapper.workQueueThreadInterrupted( exc, getName(), + Boolean.valueOf(closeCalled)); + + continue ; } catch (Throwable t) { - // Ignore all errors. - ; - } + wrapper.workerThreadThrowableFromRequestWork(this, t, + workQueue.getName()); - long end = System.currentTimeMillis(); - - - synchronized (lock) { - totalTimeTaken += (end - start); - processedCount++; + continue; } + performWork(); + // set currentWork to null so that the work item can be - // garbage collected + // garbage collected without waiting for the next work item. currentWork = null; - setName(composeWorkerThreadName(threadPoolName, "Idle")); - - } catch (TimeoutException e) { - // This thread timed out waiting for something to do. - - synchronized (lock) { - availableWorkerThreads--; - - // This should for both bounded and unbounded case - if (currentThreadCount > minWorkerThreads) { - currentThreadCount--; - // This thread can exit. - return; - } else { - // Go back to waiting on workQueue - continue; - } - } - } catch (InterruptedException ie) { - // InterruptedExceptions are - // caught here. Thus, threads can be forced out of - // requestWork and so they have to reacquire the lock. - // Other options include ignoring or - // letting this thread die. - // Ignoring for now. REVISIT - synchronized (lock) { - availableWorkerThreads--; - } - - } catch (Throwable e) { - - // Ignore any exceptions that currentWork.process - // accidently lets through, but let Errors pass. - // Add debugging output? REVISIT - synchronized (lock) { - availableWorkerThreads--; - } - + resetClassLoader(); } + } catch (Throwable e) { + // This should not be possible + wrapper.workerThreadCaughtUnexpectedThrowable(this,e); + } finally { + synchronized (workersLock) { + workers.remove(this); + } } }