< prev index next >

src/share/vm/gc/shared/workgroup.cpp

Print this page




  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  *
  23  */
  24 
  25 #include "precompiled.hpp"
  26 #include "gc/shared/workgroup.hpp"
  27 #include "memory/allocation.hpp"
  28 #include "memory/allocation.inline.hpp"
  29 #include "runtime/atomic.inline.hpp"
  30 #include "runtime/os.hpp"


  31 
  32 // Definitions of WorkGang methods.
  33 
  34 // The current implementation will exit if the allocation
  35 // of any worker fails.  Still, return a boolean so that
  36 // a future implementation can possibly do a partial
  37 // initialization of the workers and report such to the
  38 // caller.
  39 bool AbstractWorkGang::initialize_workers() {
  40 
  41   if (TraceWorkGang) {
  42     tty->print_cr("Constructing work gang %s with %d threads",
  43                   name(),
  44                   total_workers());
  45   }
  46   _workers = NEW_C_HEAP_ARRAY(AbstractGangWorker*, total_workers(), mtInternal);
  47   if (_workers == NULL) {
  48     vm_exit_out_of_memory(0, OOM_MALLOC_ERROR, "Cannot create GangWorker array.");
  49     return false;
  50   }


  79   assert(result != NULL, "Indexing to null worker");
  80   return result;
  81 }
  82 
  83 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const {
  84   uint workers = total_workers();
  85   for (uint i = 0; i < workers; i++) {
  86     worker(i)->print_on(st);
  87     st->cr();
  88   }
  89 }
  90 
  91 void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
  92   assert(tc != NULL, "Null ThreadClosure");
  93   uint workers = total_workers();
  94   for (uint i = 0; i < workers; i++) {
  95     tc->do_thread(worker(i));
  96   }
  97 }
  98 
// Construct a gang of 'workers' threads. Coordination between the
// coordinator and the workers goes through a single monitor: run_task()
// installs a task and notifies, workers poll and report back under the
// same monitor (see GangWorker::loop()).
WorkGang::WorkGang(const char* name,
                   uint        workers,
                   bool        are_GC_task_threads,
                   bool        are_ConcurrentGC_threads) :
    AbstractWorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads),
    _started_workers(0),
    _finished_workers(0),
    _sequence_number(0),
    _task(NULL) {

  // Other initialization.
  // allow_vm_block is tied to are_GC_task_threads: the VM thread may
  // block on this monitor only when the gang runs GC task threads.
  _monitor = new Monitor(/* priority */       Mutex::leaf,
                         /* name */           "WorkGroup monitor",
                         /* allow_vm_block */ are_GC_task_threads,
                                              Monitor::_safepoint_check_sometimes);

  assert(monitor() != NULL, "Failed to allocate monitor");
}




























 117 
// Create the worker thread object for slot 'worker_id'.
AbstractGangWorker* WorkGang::allocate_worker(uint worker_id) {
  return new GangWorker(this, worker_id);
}

// Run 'task' with the currently active number of workers.
void WorkGang::run_task(AbstractGangTask* task) {
  run_task(task, (uint)active_workers());
}
 125 
// Dispatch 'task' to the gang and block until 'no_of_parallel_workers'
// workers have finished it. Must only be called by the coordinator;
// all state changes happen under the gang monitor.
void WorkGang::run_task(AbstractGangTask* task, uint no_of_parallel_workers) {
  // This thread is executed by the VM thread which does not block
  // on ordinary MutexLocker's.
  MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
  if (TraceWorkGang) {
    tty->print_cr("Running work gang %s task %s", name(), task->name());
  }
  // Tell all the workers to run a task.
  assert(task != NULL, "Running a null task");
  // Initialize.
  _task = task;
  // Bumping the sequence number marks this as a new dispatch, so workers
  // that already executed the previous task do not pick it up again
  // (workers compare it against their previous_sequence_number).
  _sequence_number += 1;
  _started_workers = 0;
  _finished_workers = 0;
  // Tell the workers to get to work.
  monitor()->notify_all();
  // Wait for them to be finished
  while (finished_workers() < no_of_parallel_workers) {
    if (TraceWorkGang) {
      tty->print_cr("Waiting in work gang %s: %u/%u finished sequence %d",
                    name(), finished_workers(), no_of_parallel_workers,
                    _sequence_number);
    }
    // Wait without a safepoint check; workers notify on every finish.
    monitor()->wait(/* no_safepoint_check */ true);
  }
  // Clear the task so late pollers see there is nothing to do.
  _task = NULL;
  if (TraceWorkGang) {
    tty->print_cr("\nFinished work gang %s: %u/%u sequence %d",
                  name(), finished_workers(), no_of_parallel_workers,
                  _sequence_number);
    Thread* me = Thread::current();
    tty->print_cr("  T: " PTR_FORMAT "  VM_thread: %d", p2i(me), me->is_VM_thread());
  }
}
 160 
 161 void WorkGang::internal_worker_poll(WorkData* data) const {
 162   assert(monitor()->owned_by_self(), "worker_poll is an internal method");
 163   assert(data != NULL, "worker data is null");
 164   data->set_task(task());
 165   data->set_sequence_number(sequence_number());
 166 }








 167 
 168 void WorkGang::internal_note_start() {
 169   assert(monitor()->owned_by_self(), "note_finish is an internal method");
 170   _started_workers += 1;

















 171 }
 172 
 173 void WorkGang::internal_note_finish() {
 174   assert(monitor()->owned_by_self(), "note_finish is an internal method");
 175   _finished_workers += 1;







 176 }
 177 
 178 // GangWorker methods.


 179 
AbstractGangWorker::AbstractGangWorker(AbstractWorkGang* gang, uint id) {
  _gang = gang;
  set_id(id);
  // Thread name is "<gang name>#<id>".
  set_name("%s#%d", gang->name(), id);
}

// Thread entry point: set up per-thread state, then dispatch tasks forever.
void AbstractGangWorker::run() {
  initialize();
  loop();
}
 190 
 191 void AbstractGangWorker::initialize() {
 192   this->initialize_thread_local_storage();
 193   this->record_stack_base_and_size();
 194   this->initialize_named_thread();
 195   assert(_gang != NULL, "No gang to run in");
 196   os::set_priority(this, NearMaxPriority);
 197   if (TraceWorkGang) {
 198     tty->print_cr("Running gang worker for gang %s id %u",


 201   // The VM thread should not execute here because MutexLocker's are used
 202   // as (opposed to MutexLockerEx's).
 203   assert(!Thread::current()->is_VM_thread(), "VM thread should not be part"
 204          " of a work gang");
 205 }
 206 
// A worker is a GC task thread iff its gang is.
bool AbstractGangWorker::is_GC_task_thread() const {
  return gang()->are_GC_task_threads();
}

// A worker is a concurrent-GC thread iff its gang is.
bool AbstractGangWorker::is_ConcurrentGC_thread() const {
  return gang()->are_ConcurrentGC_threads();
}

// Print the quoted worker name followed by the standard Thread info.
void AbstractGangWorker::print_on(outputStream* st) const {
  st->print("\"%s\" ", name());
  Thread::print_on(st);
  st->cr();
}
 220 
// Worker main loop: wait (under the gang monitor) for a new task, claim a
// part number, run the task outside the lock, then report completion.
// A task is "new" when it is non-NULL and its sequence number differs from
// the one this worker last executed.
void GangWorker::loop() {
  int previous_sequence_number = 0;
  Monitor* gang_monitor = gang()->monitor();
  for ( ; ; ) {
    WorkData data;
    int part;  // Initialized below.
    {
      // Grab the gang mutex.
      MutexLocker ml(gang_monitor);
      // Wait for something to do.
      // Polling outside the while { wait } avoids missed notifies
      // in the outer loop.
      gang()->internal_worker_poll(&data);
      if (TraceWorkGang) {
        tty->print("Polled outside for work in gang %s worker %u",
                   gang()->name(), id());
        tty->print("  sequence: %d (prev: %d)",
                   data.sequence_number(), previous_sequence_number);
        if (data.task() != NULL) {
          tty->print("  task: %s", data.task()->name());
        } else {
          tty->print("  task: NULL");
        }
        tty->cr();
      }
      for ( ; /* break */; ) {
        // Check for new work.
        if ((data.task() != NULL) &&
            (data.sequence_number() != previous_sequence_number)) {
          // Only join in if the gang still wants more workers on this task.
          if (gang()->needs_more_workers()) {
            gang()->internal_note_start();
            // Wake the other sleepers so they can re-check as well.
            gang_monitor->notify_all();
            // Part numbers are handed out in start order, zero-indexed.
            part = gang()->started_workers() - 1;
            break;
          }
        }
        // Nothing to do.
        gang_monitor->wait(/* no_safepoint_check */ true);
        gang()->internal_worker_poll(&data);
        if (TraceWorkGang) {
          tty->print("Polled inside for work in gang %s worker %u",
                     gang()->name(), id());
          tty->print("  sequence: %d (prev: %d)",
                     data.sequence_number(), previous_sequence_number);
          if (data.task() != NULL) {
            tty->print("  task: %s", data.task()->name());
          } else {
            tty->print("  task: NULL");
          }
          tty->cr();
        }
      }
      // Drop gang mutex.
    }
    if (TraceWorkGang) {
      tty->print("Work for work gang %s id %u task %s part %d",
                 gang()->name(), id(), data.task()->name(), part);
    }
    assert(data.task() != NULL, "Got null task");
    // Run the task without holding the monitor.
    data.task()->work(part);
    {
      if (TraceWorkGang) {
        tty->print("Finish for work gang %s id %u task %s part %d",
                   gang()->name(), id(), data.task()->name(), part);
      }
      // Grab the gang mutex.
      MutexLocker ml(gang_monitor);
      gang()->internal_note_finish();
      // Tell the gang you are done.
      gang_monitor->notify_all();
      // Drop the gang mutex.
    }
    // Remember this dispatch so the same task is not picked up twice.
    previous_sequence_number = data.sequence_number();
  }
}
 296 
 297 // *** WorkGangBarrierSync
 298 
// Default barrier for zero workers; the monitor never performs safepoint
// checks while waiting. Worker count can be set later via set_n_workers().
WorkGangBarrierSync::WorkGangBarrierSync()
  : _monitor(Mutex::safepoint, "work gang barrier sync", true,
             Monitor::_safepoint_check_never),
    _n_workers(0), _n_completed(0), _should_reset(false), _aborted(false) {
}

// Barrier for 'n_workers' workers; 'name' labels the underlying monitor.
WorkGangBarrierSync::WorkGangBarrierSync(uint n_workers, const char* name)
  : _monitor(Mutex::safepoint, name, true, Monitor::_safepoint_check_never),
    _n_workers(n_workers), _n_completed(0), _should_reset(false), _aborted(false) {
}
 309 
 310 void WorkGangBarrierSync::set_n_workers(uint n_workers) {
 311   _n_workers    = n_workers;
 312   _n_completed  = 0;
 313   _should_reset = false;




  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  *
  23  */
  24 
  25 #include "precompiled.hpp"
  26 #include "gc/shared/workgroup.hpp"
  27 #include "memory/allocation.hpp"
  28 #include "memory/allocation.inline.hpp"
  29 #include "runtime/atomic.inline.hpp"
  30 #include "runtime/os.hpp"
  31 #include "runtime/thread.inline.hpp"
  32 #include "utilities/semaphore.hpp"
  33 
  34 // Definitions of WorkGang methods.
  35 
  36 // The current implementation will exit if the allocation
  37 // of any worker fails.  Still, return a boolean so that
  38 // a future implementation can possibly do a partial
  39 // initialization of the workers and report such to the
  40 // caller.
  41 bool AbstractWorkGang::initialize_workers() {
  42 
  43   if (TraceWorkGang) {
  44     tty->print_cr("Constructing work gang %s with %d threads",
  45                   name(),
  46                   total_workers());
  47   }
  48   _workers = NEW_C_HEAP_ARRAY(AbstractGangWorker*, total_workers(), mtInternal);
  49   if (_workers == NULL) {
  50     vm_exit_out_of_memory(0, OOM_MALLOC_ERROR, "Cannot create GangWorker array.");
  51     return false;
  52   }


  81   assert(result != NULL, "Indexing to null worker");
  82   return result;
  83 }
  84 
  85 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const {
  86   uint workers = total_workers();
  87   for (uint i = 0; i < workers; i++) {
  88     worker(i)->print_on(st);
  89     st->cr();
  90   }
  91 }
  92 
  93 void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
  94   assert(tc != NULL, "Null ThreadClosure");
  95   uint workers = total_workers();
  96   for (uint i = 0; i < workers; i++) {
  97     tc->do_thread(worker(i));
  98   }
  99 }
 100 
 101 #if IMPLEMENTS_SEMAPHORE_CLASS














 102 
// WorkGang dispatcher implemented with semaphores.
//
// Semaphores don't require the worker threads to re-claim the lock when they wake up.
// This helps lowering the latency when starting and stopping the worker threads.
class SemaphoreGangTaskDispatcher : public GangTaskDispatcher {
  // The task currently being dispatched to the GangWorkers.
  AbstractGangTask* _task;

  // Number of workers that have claimed a worker id for the current task.
  volatile uint _started;
  // Number of workers still running the current task; counts down to 0.
  volatile uint _not_finished;

  // Semaphore used to start the GangWorkers.
  Semaphore* _start_semaphore;
  // Semaphore used to notify the coordinator that all workers are done.
  Semaphore* _end_semaphore;

public:
  SemaphoreGangTaskDispatcher(uint workers) :
      _task(NULL),
      _started(0),
      _not_finished(0),
      // Limit the semaphore value to the number of workers.
      _start_semaphore(new Semaphore(0, workers)),
      _end_semaphore(new Semaphore(0, workers))
{ }

  ~SemaphoreGangTaskDispatcher() {
    delete _start_semaphore;
    delete _end_semaphore;
  }

  // Coordinator side: publish 'task', release 'num_workers' workers, and
  // block until the last of them signals completion.
  void coordinator_execute_on_workers(AbstractGangTask* task, uint num_workers) {
    // No workers are allowed to read the state variables until they have been signaled.
    _task         = task;
    _not_finished = num_workers;

    // Dispatch 'num_workers' number of tasks.
    _start_semaphore->signal(num_workers);

    // Wait for the last worker to signal the coordinator.
    _end_semaphore->wait();

    // No workers are allowed to read the state variables after the coordinator has been signaled.
    _task         = NULL;
    _started      = 0;
    _not_finished = 0;
  }

  // Worker side: block until dispatched, then claim a distinct worker id.
  WorkData worker_wait_for_task() {
    // Wait for the coordinator to dispatch a task.
    _start_semaphore->wait();

    // Atomic increment returns the updated count, giving each woken
    // worker a unique (1-based) slot.
    uint num_started = (uint) Atomic::add(1, (volatile jint*)&_started);

    // Subtract one to get a zero-indexed worker id.
    uint worker_id = num_started - 1;

    return WorkData(_task, worker_id);
  }

  // Worker side: report completion; the last finisher wakes the coordinator.
  void worker_done_with_task() {
    // Mark that the worker is done with the task.
    // The worker is not allowed to read the state variables after this line.
    uint not_finished = (uint) Atomic::add(-1, (volatile jint*)&_not_finished);

    // The last worker signals to the coordinator that all work is completed.
    if (not_finished == 0) {
      _end_semaphore->signal();
    }
  }
};
 174 #endif // IMPLEMENTS_SEMAPHORE_CLASS
 175 
 176 class MutexGangTaskDispatcher : public GangTaskDispatcher {
 177   AbstractGangTask* _task;
 178 
 179   volatile uint _started;
 180   volatile uint _finished;
 181   volatile uint _num_workers;
 182 
 183   Monitor* _monitor;
 184 
 185  public:
 186   MutexGangTaskDispatcher()
 187       : _task(NULL),
 188         _monitor(new Monitor(Monitor::leaf, "WorkGang dispatcher lock", false, Monitor::_safepoint_check_never)),
 189         _started(0),
 190         _finished(0),
 191         _num_workers(0) {}
 192 
 193   ~MutexGangTaskDispatcher() {
 194     delete _monitor;
 195   }
 196 
 197   void coordinator_execute_on_workers(AbstractGangTask* task, uint num_workers) {
 198     MutexLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag);
 199 
 200     _task        = task;
 201     _num_workers = num_workers;
 202 

 203     // Tell the workers to get to work.
 204     _monitor->notify_all();
 205 
 206     // Wait for them to finish.
 207     while (_finished < _num_workers) {
 208       _monitor->wait(/* no_safepoint_check */ true);




 209     }
 210 
 211     _task        = NULL;
 212     _num_workers = 0;
 213     _started     = 0;
 214     _finished    = 0;



 215   }

 216 
 217   WorkData worker_wait_for_task() {
 218     MonitorLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag);
 219 
 220     while (_num_workers == 0 || _started == _num_workers) {
 221       _monitor->wait(/* no_safepoint_check */ true);
 222     }
 223 
 224     _started++;
 225 
 226     // Subtract one to get a zero-indexed worker id.
 227     uint worker_id = _started - 1;
 228 
 229     return WorkData(_task, worker_id);
 230   }
 231 
 232   void worker_done_with_task() {
 233     MonitorLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag);
 234 
 235     _finished++;
 236 
 237     if (_finished == _num_workers) {
 238       // This will wake up all workers and not only the coordinator.
 239       _monitor->notify_all();
 240     }
 241   }
 242 };
 243 
// Select the dispatcher implementation: semaphore-based when the platform
// provides a Semaphore class and UseSemaphoreGCThreadsSynchronization is
// enabled, otherwise the monitor-based fallback.
static GangTaskDispatcher* create_dispatcher(uint workers) {
#if IMPLEMENTS_SEMAPHORE_CLASS
  if (UseSemaphoreGCThreadsSynchronization) {
    return new SemaphoreGangTaskDispatcher(workers);
  }
#endif

  return new MutexGangTaskDispatcher();
}
 253 
// Construct a gang whose task distribution is delegated to a
// GangTaskDispatcher chosen by create_dispatcher().
WorkGang::WorkGang(const char* name,
                   uint  workers,
                   bool  are_GC_task_threads,
                   bool  are_ConcurrentGC_threads) :
    AbstractWorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads),
    _dispatcher(create_dispatcher(workers))
{ }

// Create the worker thread object for slot 'worker_id'.
AbstractGangWorker* WorkGang::allocate_worker(uint worker_id) {
  return new GangWorker(this, worker_id);
}

// Hand 'task' to the dispatcher; blocks until the active number of
// workers has executed it (see coordinator_execute_on_workers()).
void WorkGang::run_task(AbstractGangTask* task) {
  _dispatcher->coordinator_execute_on_workers(task, active_workers());
}
 269 
AbstractGangWorker::AbstractGangWorker(AbstractWorkGang* gang, uint id) {
  _gang = gang;
  set_id(id);
  // Worker threads are named "<gang name>#<id>".
  set_name("%s#%d", gang->name(), id);
}

// Thread entry point: per-thread setup, then the task-dispatch loop.
void AbstractGangWorker::run() {
  initialize();
  loop();
}
 280 
 281 void AbstractGangWorker::initialize() {
 282   this->initialize_thread_local_storage();
 283   this->record_stack_base_and_size();
 284   this->initialize_named_thread();
 285   assert(_gang != NULL, "No gang to run in");
 286   os::set_priority(this, NearMaxPriority);
 287   if (TraceWorkGang) {
 288     tty->print_cr("Running gang worker for gang %s id %u",


 291   // The VM thread should not execute here because MutexLocker's are used
 292   // as (opposed to MutexLockerEx's).
 293   assert(!Thread::current()->is_VM_thread(), "VM thread should not be part"
 294          " of a work gang");
 295 }
 296 
// A worker is a GC task thread iff its gang is.
bool AbstractGangWorker::is_GC_task_thread() const {
  return gang()->are_GC_task_threads();
}

// A worker is a concurrent-GC thread iff its gang is.
bool AbstractGangWorker::is_ConcurrentGC_thread() const {
  return gang()->are_ConcurrentGC_threads();
}

// Print the quoted worker name followed by the standard Thread info.
void AbstractGangWorker::print_on(outputStream* st) const {
  st->print("\"%s\" ", name());
  Thread::print_on(st);
  st->cr();
}

// Block until the dispatcher hands this worker a task and a worker id.
WorkData GangWorker::wait_for_task() {
  return gang()->dispatcher()->worker_wait_for_task();
}

// Report back to the dispatcher that this worker finished its part.
void GangWorker::signal_task_done() {
  gang()->dispatcher()->worker_done_with_task();
}
 318 
 319 void GangWorker::print_task_started(WorkData data) {













































 320   if (TraceWorkGang) {
 321     tty->print_cr("Running work gang %s task %s worker %u", name(), data._task->name(), data._worker_id);

 322   }
 323 }
 324 
 325 void GangWorker::print_task_done(WorkData data) {
 326   if (TraceWorkGang) {
 327     tty->print_cr("\nFinished work gang %s task %s worker %u", name(), data._task->name(), data._worker_id);
 328     Thread* me = Thread::current();
 329     tty->print_cr("  T: " PTR_FORMAT "  VM_thread: %d", p2i(me), me->is_VM_thread());






 330   }
 331 }
 332 
 333 void GangWorker::run_task(WorkData data) {
 334   print_task_started(data);
 335 
 336   data._task->work(data._worker_id);
 337 
 338   print_task_done(data);
 339 }
 340 
 341 void GangWorker::loop() {
 342   while (true) {
 343     WorkData data = wait_for_task();
 344 
 345     run_task(data);
 346 
 347     signal_task_done();
 348   }
 349 }
 350 
 351 // *** WorkGangBarrierSync
 352 
// Default barrier for zero workers; the monitor never performs safepoint
// checks while waiting. Worker count can be set later via set_n_workers().
WorkGangBarrierSync::WorkGangBarrierSync()
  : _monitor(Mutex::safepoint, "work gang barrier sync", true,
             Monitor::_safepoint_check_never),
    _n_workers(0), _n_completed(0), _should_reset(false), _aborted(false) {
}

// Barrier for 'n_workers' workers; 'name' labels the underlying monitor.
WorkGangBarrierSync::WorkGangBarrierSync(uint n_workers, const char* name)
  : _monitor(Mutex::safepoint, name, true, Monitor::_safepoint_check_never),
    _n_workers(n_workers), _n_completed(0), _should_reset(false), _aborted(false) {
}
 363 
 364 void WorkGangBarrierSync::set_n_workers(uint n_workers) {
 365   _n_workers    = n_workers;
 366   _n_completed  = 0;
 367   _should_reset = false;


< prev index next >