< prev index next >

src/share/vm/gc/shared/workgroup.cpp

Print this page




  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  *
  23  */
  24 
  25 #include "precompiled.hpp"
  26 #include "gc/shared/workgroup.hpp"
  27 #include "memory/allocation.hpp"
  28 #include "memory/allocation.inline.hpp"
  29 #include "runtime/atomic.inline.hpp"
  30 #include "runtime/os.hpp"


  31 
  32 // Definitions of WorkGang methods.
  33 
  34 // The current implementation will exit if the allocation
  35 // of any worker fails.  Still, return a boolean so that
  36 // a future implementation can possibly do a partial
  37 // initialization of the workers and report such to the
  38 // caller.
  39 bool AbstractWorkGang::initialize_workers() {
  40 
  41   if (TraceWorkGang) {
  42     tty->print_cr("Constructing work gang %s with %d threads",
  43                   name(),
  44                   total_workers());
  45   }
  46   _workers = NEW_C_HEAP_ARRAY(AbstractGangWorker*, total_workers(), mtInternal);
  47   if (_workers == NULL) {
  48     vm_exit_out_of_memory(0, OOM_MALLOC_ERROR, "Cannot create GangWorker array.");
  49     return false;
  50   }


  79   assert(result != NULL, "Indexing to null worker");
  80   return result;
  81 }
  82 
  83 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const {
       // Print each worker of this gang on 'st', followed by a line break.
  84   const uint num_workers = total_workers();
  85   for (uint idx = 0; idx < num_workers; ++idx) {
  86     worker(idx)->print_on(st);
  87     st->cr();
  88   }
  89 }
  90 
  91 void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
       // Apply 'tc' to every worker of this gang, in index order.
       // 'tc' must not be NULL (asserted).
  92   assert(tc != NULL, "Null ThreadClosure");
  93   const uint num_workers = total_workers();
  94   for (uint idx = 0; idx < num_workers; ++idx) {
  95     tc->do_thread(worker(idx));
  96   }
  97 }
  98 
  99 WorkGang::WorkGang(const char* name,
 100                    uint        workers,
 101                    bool        are_GC_task_threads,
 102                    bool        are_ConcurrentGC_threads) :
 103     AbstractWorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads),
 104     _started_workers(0),
 105     _finished_workers(0),
 106     _sequence_number(0),
 107     _task(NULL) {
       // All four arguments are forwarded to the AbstractWorkGang base. The
       // worker counters and the sequence number start at zero, with no task set.
 108 
 109   // Other initialization.
       // The monitor is heap-allocated here; are_GC_task_threads is passed as
       // its allow_vm_block argument.
 110   _monitor = new Monitor(/* priority */       Mutex::leaf,
 111                          /* name */           "WorkGroup monitor",
 112                          /* allow_vm_block */ are_GC_task_threads,
 113                                               Monitor::_safepoint_check_sometimes);















 114 
       // NOTE(review): presumably monitor() returns _monitor - confirm in the header.
 115   assert(monitor() != NULL, "Failed to allocate monitor");
 116 }


 117 
 118 AbstractGangWorker* WorkGang::allocate_worker(uint worker_id) {
       // Creates a new heap-allocated GangWorker for this gang with id 'worker_id'
       // and returns it as an AbstractGangWorker*.
 119   return new GangWorker(this, worker_id);
 120 }
 121 
 122 void WorkGang::run_task(AbstractGangTask* task) {
       // Convenience overload: runs 'task' with active_workers() as the number
       // of parallel workers by delegating to the two-argument run_task().
 123   run_task(task, (uint)active_workers());
 124 }
 125 
 126 void WorkGang::run_task(AbstractGangTask* task, uint no_of_parallel_workers) {
       // Publishes 'task' to the gang's workers and blocks on the gang monitor
       // until finished_workers() reaches 'no_of_parallel_workers'.
       //   task                   - the task to run; must not be NULL.
       //   no_of_parallel_workers - number of worker completions to wait for.
 127   // Check the task first: the trace output below dereferences it.
 128   assert(task != NULL, "Running a null task");
 129   // This method is executed by the VM thread which does not block
 130   // on ordinary MutexLocker's.
 131   MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
 132   if (TraceWorkGang) {
 133     tty->print_cr("Running work gang %s task %s", name(), task->name());
 134   }
 135   // Tell all the workers to run a task: publish it and reset the counters.
 136   _task = task;
 137   _sequence_number += 1;
 138   _started_workers = 0;
 139   _finished_workers = 0;
 140   // Tell the workers to get to work.
 141   monitor()->notify_all();
 142   // Wait for them to be finished
 143   while (finished_workers() < no_of_parallel_workers) {
 144     if (TraceWorkGang) {
 145       tty->print_cr("Waiting in work gang %s: %u/%u finished sequence %d",
 146                     name(), finished_workers(), no_of_parallel_workers,
 147                     _sequence_number);
 148     }
 149     monitor()->wait(/* no_safepoint_check */ true);
 150   }
       // Clear the task so that workers polling afterwards see no work.
 151   _task = NULL;
 152   if (TraceWorkGang) {
 153     tty->print_cr("\nFinished work gang %s: %u/%u sequence %d",
 154                   name(), finished_workers(), no_of_parallel_workers,
 155                   _sequence_number);
 156     Thread* me = Thread::current();
 157     tty->print_cr("  T: " PTR_FORMAT "  VM_thread: %d", p2i(me), me->is_VM_thread());
 158   }
 159 }
 160 
 161 void WorkGang::internal_worker_poll(WorkData* data) const {
       // Copies the gang's current task and sequence number into 'data'.
       // The caller must own the gang monitor and pass a non-NULL 'data'
       // (both are asserted).
 162   assert(monitor()->owned_by_self(), "worker_poll is an internal method");
 163   assert(data != NULL, "worker data is null");
 164   data->set_task(task());
 165   data->set_sequence_number(sequence_number());
 166 }








 167 
 168 void WorkGang::internal_note_start() {
       // Counts one more worker as having started the current task.
       // The caller must own the gang monitor (asserted).
 169   assert(monitor()->owned_by_self(), "note_start is an internal method");
 170   _started_workers += 1;
 171 }
 172 
 173 void WorkGang::internal_note_finish() {
       // Counts one more worker as having finished the current task.
       // The caller must own the gang monitor (asserted).
 174   assert(monitor()->owned_by_self(), "note_finish is an internal method");
 175   _finished_workers += 1;







 176 }
 177 
 178 // GangWorker methods.


 179 
 180 AbstractGangWorker::AbstractGangWorker(AbstractWorkGang* gang, uint id) {
       // Binds the worker to 'gang', records its id and names the thread
       // "<gang name>#<id>". 'gang' is dereferenced here, so it must not be NULL.
 181   _gang = gang;
 182   set_id(id);
       // 'id' is unsigned, so format it with %u.
 183   set_name("%s#%u", gang->name(), id);
 184 }
 185 
 186 void AbstractGangWorker::run() {
       // Thread body: perform the one-time setup in initialize(), then enter loop().
 187   initialize();
 188   loop();
 189 }
 190 
 191 void AbstractGangWorker::initialize() {
 192   this->initialize_thread_local_storage();
 193   this->record_stack_base_and_size();
 194   this->initialize_named_thread();
 195   assert(_gang != NULL, "No gang to run in");
 196   os::set_priority(this, NearMaxPriority);
 197   if (TraceWorkGang) {
 198     tty->print_cr("Running gang worker for gang %s id %u",


 201   // The VM thread should not execute here because MutexLocker's are used
 202   // as (opposed to MutexLockerEx's).
 203   assert(!Thread::current()->is_VM_thread(), "VM thread should not be part"
 204          " of a work gang");
 205 }
 206 
 207 bool AbstractGangWorker::is_GC_task_thread() const {
       // A worker reports whatever its gang reports via are_GC_task_threads().
 208   return gang()->are_GC_task_threads();
 209 }
 210 
 211 bool AbstractGangWorker::is_ConcurrentGC_thread() const {
       // A worker reports whatever its gang reports via are_ConcurrentGC_threads().
 212   return gang()->are_ConcurrentGC_threads();
 213 }
 214 
 215 void AbstractGangWorker::print_on(outputStream* st) const {
       // Prints the quoted thread name, then the generic Thread::print_on()
       // output, then a line break, all on 'st'.
 216   st->print("\"%s\" ", name());
 217   Thread::print_on(st);
 218   st->cr();
 219 }
 220 
 221 void GangWorker::loop() {
       // Worker main loop; it never returns. Each iteration:
       //   1. waits under the gang monitor until a task is published whose
       //      sequence number differs from the last one this worker ran and
       //      the gang still needs more workers,
       //   2. runs the task outside the monitor with the claimed part number,
       //   3. re-takes the monitor to report completion and notify all waiters.
 222   int previous_sequence_number = 0;
 223   Monitor* gang_monitor = gang()->monitor();
 224   for ( ; ; ) {
 225     WorkData data;
 226     int part;  // Initialized below.
 227     {
 228       // Grab the gang mutex.
 229       MutexLocker ml(gang_monitor);
 230       // Wait for something to do.
 231       // Polling outside the while { wait } avoids missed notifies
 232       // in the outer loop.
 233       gang()->internal_worker_poll(&data);
 234       if (TraceWorkGang) {
 235         tty->print("Polled outside for work in gang %s worker %u",
 236                    gang()->name(), id());
 237         tty->print("  sequence: %d (prev: %d)",
 238                    data.sequence_number(), previous_sequence_number);
 239         if (data.task() != NULL) {
 240           tty->print("  task: %s", data.task()->name());
 241         } else {
 242           tty->print("  task: NULL");
 243         }
 244         tty->cr();
 245       }
 246       for ( ; /* break */; ) {
 247         // Check for new work.
 248         if ((data.task() != NULL) &&
 249             (data.sequence_number() != previous_sequence_number)) {
 250           if (gang()->needs_more_workers()) {
 251             gang()->internal_note_start();
 252             gang_monitor->notify_all();
               // The part number is this worker's zero-based start order.
 253             part = gang()->started_workers() - 1;
 254             break;
 255           }
 256         }
 257         // Nothing to do.
 258         gang_monitor->wait(/* no_safepoint_check */ true);
 259         gang()->internal_worker_poll(&data);
 260         if (TraceWorkGang) {
 261           tty->print("Polled inside for work in gang %s worker %u",
 262                      gang()->name(), id());
 263           tty->print("  sequence: %d (prev: %d)",
 264                      data.sequence_number(), previous_sequence_number);
 265           if (data.task() != NULL) {
 266             tty->print("  task: %s", data.task()->name());
 267           } else {
 268             tty->print("  task: NULL");
 269           }
 270           tty->cr();
 271         }
 272       }
 273       // Drop gang mutex.
 274     }
         // Check the task before the trace output below dereferences it.
 275     assert(data.task() != NULL, "Got null task");
 276     if (TraceWorkGang) {
 277       tty->print("Work for work gang %s id %u task %s part %d",
 278                  gang()->name(), id(), data.task()->name(), part);
 279     }
 280     data.task()->work(part);
 281     {
 282       if (TraceWorkGang) {
 283         tty->print("Finish for work gang %s id %u task %s part %d",
 284                    gang()->name(), id(), data.task()->name(), part);
 285       }
 286       // Grab the gang mutex.
 287       MutexLocker ml(gang_monitor);
 288       gang()->internal_note_finish();
 289       // Tell the gang you are done.
 290       gang_monitor->notify_all();
 291       // Drop the gang mutex.
 292     }
         // Remember the sequence number so the same task is not picked up twice.
 293     previous_sequence_number = data.sequence_number();
 294   }
 295 }
 296 
 297 // *** WorkGangBarrierSync
 298 
 299 WorkGangBarrierSync::WorkGangBarrierSync()
       // Default constructor: zero workers, zero completions, and both the
       // reset and aborted flags cleared. The monitor gets a fixed name and
       // is created with Monitor::_safepoint_check_never.
 300   : _monitor(Mutex::safepoint, "work gang barrier sync", true,
 301              Monitor::_safepoint_check_never),
 302     _n_workers(0), _n_completed(0), _should_reset(false), _aborted(false) {
 303 }
 304 
 305 WorkGangBarrierSync::WorkGangBarrierSync(uint n_workers, const char* name)
       // Same as the default constructor, except that the monitor is named
       // 'name' and _n_workers starts at 'n_workers'.
 306   : _monitor(Mutex::safepoint, name, true, Monitor::_safepoint_check_never),
 307     _n_workers(n_workers), _n_completed(0), _should_reset(false), _aborted(false) {
 308 }
 309 
 310 void WorkGangBarrierSync::set_n_workers(uint n_workers) {
 311   _n_workers    = n_workers;
 312   _n_completed  = 0;
 313   _should_reset = false;




  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  *
  23  */
  24 
  25 #include "precompiled.hpp"
  26 #include "gc/shared/workgroup.hpp"
  27 #include "memory/allocation.hpp"
  28 #include "memory/allocation.inline.hpp"
  29 #include "runtime/atomic.inline.hpp"
  30 #include "runtime/os.hpp"
  31 #include "runtime/semaphore.hpp"
  32 #include "runtime/thread.inline.hpp"
  33 
  34 // Definitions of WorkGang methods.
  35 
  36 // The current implementation will exit if the allocation
  37 // of any worker fails.  Still, return a boolean so that
  38 // a future implementation can possibly do a partial
  39 // initialization of the workers and report such to the
  40 // caller.
  41 bool AbstractWorkGang::initialize_workers() {
  42 
  43   if (TraceWorkGang) {
  44     tty->print_cr("Constructing work gang %s with %d threads",
  45                   name(),
  46                   total_workers());
  47   }
  48   _workers = NEW_C_HEAP_ARRAY(AbstractGangWorker*, total_workers(), mtInternal);
  49   if (_workers == NULL) {
  50     vm_exit_out_of_memory(0, OOM_MALLOC_ERROR, "Cannot create GangWorker array.");
  51     return false;
  52   }


  81   assert(result != NULL, "Indexing to null worker");
  82   return result;
  83 }
  84 
  85 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const {
       // Print each worker of this gang on 'st', followed by a line break.
  86   const uint num_workers = total_workers();
  87   for (uint idx = 0; idx < num_workers; ++idx) {
  88     worker(idx)->print_on(st);
  89     st->cr();
  90   }
  91 }
  92 
  93 void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
       // Apply 'tc' to every worker of this gang, in index order.
       // 'tc' must not be NULL (asserted).
  94   assert(tc != NULL, "Null ThreadClosure");
  95   const uint num_workers = total_workers();
  96   for (uint idx = 0; idx < num_workers; ++idx) {
  97     tc->do_thread(worker(idx));
  98   }
  99 }
 100 
 101 // WorkGang dispatcher implemented with semaphores.
 102 //
 103 // Semaphores don't require the worker threads to re-claim the lock when they wake up.
 104 // This helps lower the latency when starting and stopping the worker threads.
 105 class SemaphoreGangTaskDispatcher : public GangTaskDispatcher {
       // Protocol implemented below: the coordinator stores the task and the
       // worker count, signals the start semaphore once per worker, then
       // blocks on the end semaphore. Each worker waits on the start
       // semaphore, claims an id by atomically incrementing _started, and
       // atomically decrements _not_finished when done; the worker that
       // brings it to zero signals the end semaphore.
 106   // The task currently being dispatched to the GangWorkers.
 107   AbstractGangTask* _task;
 108 
       // Number of workers that have claimed the current task; worker ids are
       // derived from it.
 109   volatile uint _started;
       // Number of workers that have not yet called worker_done_with_task()
       // for the current task.
 110   volatile uint _not_finished;
 111 
 112   // Semaphore used to start the GangWorkers.
 113   Semaphore* _start_semaphore;
 114   // Semaphore used to notify the coordinator that all workers are done.
 115   Semaphore* _end_semaphore;
 116 
 117 public:
 118   SemaphoreGangTaskDispatcher() :
 119       _task(NULL),
 120       _started(0),
 121       _not_finished(0),
 122       // Both semaphores are default-constructed; no explicit value is passed here.
 123       _start_semaphore(new Semaphore()),
 124       _end_semaphore(new Semaphore())
 125 { }
 126 
       // Releases the two semaphores allocated by the constructor.
 127   ~SemaphoreGangTaskDispatcher() {
 128     delete _start_semaphore;
 129     delete _end_semaphore;
 130   }
 131 
       // Runs 'task' on 'num_workers' workers and returns once the end
       // semaphore has been signaled.
       // NOTE(review): with num_workers == 0 nobody would signal the end
       // semaphore - presumably callers never pass 0; confirm.
 132   void coordinator_execute_on_workers(AbstractGangTask* task, uint num_workers) {
 133     // No workers are allowed to read the state variables until they have been signaled.
 134     _task         = task;
 135     _not_finished = num_workers;
 136 
 137     // Dispatch 'num_workers' number of tasks.
 138     _start_semaphore->signal(num_workers);

 139 
 140     // Wait for the last worker to signal the coordinator.
 141     _end_semaphore->wait();

 142 
 143     // No workers are allowed to read the state variables after the coordinator has been signaled.
 144     _task         = NULL;
 145     _started      = 0;
 146     _not_finished = 0;
 147   }
 148 
       // Blocks until a task is dispatched; returns the task together with
       // this worker's zero-based id for it.
 149   WorkData worker_wait_for_task() {
 150     // Wait for the coordinator to dispatch a task.
 151     _start_semaphore->wait();
 152 
         // The result is used as the post-increment count, which makes it
         // distinct for each worker.
 153     uint num_started = (uint) Atomic::add(1, (volatile jint*)&_started);
 154 
 155     // Subtract one to get a zero-indexed worker id.
 156     uint worker_id = num_started - 1;
 157 
 158     return WorkData(_task, worker_id);
 159   }
 160 
       // Reports that the calling worker has finished its share of the task.
 161   void worker_done_with_task() {
 162     // Mark that the worker is done with the task.
 163     // The worker is not allowed to read the state variables after this line.
 164     uint not_finished = (uint) Atomic::add(-1, (volatile jint*)&_not_finished);
 165 
 166     // The last worker signals to the coordinator that all work is completed.
 167     if (not_finished == 0) {
 168       _end_semaphore->signal();
 169     }
 170   }
 171 };
 172 
 173 class MutexGangTaskDispatcher : public GangTaskDispatcher {
       // WorkGang dispatcher implemented with a single Monitor.
       //
       // All state below is read and written only while holding _monitor.
       // The coordinator publishes a task and waits until _finished reaches
       // _num_workers; workers wait until a task with unclaimed slots exists.
 174   AbstractGangTask* _task;
 175 
       // Workers that have claimed the current task; worker ids derive from it.
 176   volatile uint _started;
       // Workers that have completed the current task.
 177   volatile uint _finished;
       // Workers requested for the current task; 0 means no task is pending.
 178   volatile uint _num_workers;
 179 
 180   Monitor* _monitor;
 181 
 182  public:
       // The initializers are listed in declaration order, which is the
       // order in which C++ actually runs them.
 183   MutexGangTaskDispatcher()
 184       : _task(NULL),
 185         _started(0),
 186         _finished(0),
 187         _num_workers(0),
 188         _monitor(new Monitor(Monitor::leaf, "WorkGang dispatcher lock", false, Monitor::_safepoint_check_never)) {}
 189 
       // Releases the monitor allocated by the constructor.
 190   ~MutexGangTaskDispatcher() {
 191     delete _monitor;
 192   }
 193 
       // Runs 'task' on 'num_workers' workers and returns after all of them
       // have called worker_done_with_task().
 194   void coordinator_execute_on_workers(AbstractGangTask* task, uint num_workers) {
 195     MutexLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag);
 196 
 197     _task        = task;
 198     _num_workers = num_workers;
 199 
 200     // Tell the workers to get to work.
 201     _monitor->notify_all();
 202 
 203     // Wait for them to finish.
 204     while (_finished < _num_workers) {
 205       _monitor->wait(/* no_safepoint_check */ true);
 206     }
 207 
         // Reset the state so that workers see no pending task.
 208     _task        = NULL;
 209     _num_workers = 0;
 210     _started     = 0;
 211     _finished    = 0;
 212   }
 213 
       // Blocks until a task with an unclaimed slot is available; returns the
       // task together with this worker's zero-based id for it.
 214   WorkData worker_wait_for_task() {
 215     MonitorLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag);
 216 
         // Keep waiting while there is no task or every slot is already taken.
 217     while (_num_workers == 0 || _started == _num_workers) {
 218       _monitor->wait(/* no_safepoint_check */ true);
 219     }
 220 
 221     _started++;
 222 
 223     // Subtract one to get a zero-indexed worker id.
 224     uint worker_id = _started - 1;
 225 
 226     return WorkData(_task, worker_id);
 227   }
 228 
       // Reports that the calling worker has finished its share of the task.
 229   void worker_done_with_task() {
 230     MonitorLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag);
 231 
 232     _finished++;
 233 
 234     if (_finished == _num_workers) {
 235       // This will wake up all workers and not only the coordinator.
 236       _monitor->notify_all();
 237     }
 238   }
 239 };
 240 
 241 static GangTaskDispatcher* create_dispatcher() {
       // Chooses the dispatcher implementation from the
       // UseSemaphoreGCThreadsSynchronization flag: semaphore-based when it
       // is set, monitor-based otherwise. The result is heap-allocated.
 242   if (UseSemaphoreGCThreadsSynchronization) {
 243     return new SemaphoreGangTaskDispatcher();
 244   }
 245 
 246   return new MutexGangTaskDispatcher();
 247 }
 248 
 249 WorkGang::WorkGang(const char* name,
 250                    uint  workers,
 251                    bool  are_GC_task_threads,
 252                    bool  are_ConcurrentGC_threads) :
       // All four arguments are forwarded to the AbstractWorkGang base; the
       // dispatcher comes from create_dispatcher().
       // NOTE(review): no matching delete of _dispatcher is visible in this
       // part of the file - confirm its lifetime is handled elsewhere.
 253     AbstractWorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads),
 254     _dispatcher(create_dispatcher())
 255 { }
 256 
 257 AbstractGangWorker* WorkGang::allocate_worker(uint worker_id) {
       // Creates a new heap-allocated GangWorker for this gang with id 'worker_id'
       // and returns it as an AbstractGangWorker*.
 258   return new GangWorker(this, worker_id);
 259 }
 260 
 261 void WorkGang::run_task(AbstractGangTask* task) {
       // Hands 'task' to the dispatcher, to be executed on active_workers() workers.
 262   _dispatcher->coordinator_execute_on_workers(task, active_workers());
 263 }
 264 
 265 AbstractGangWorker::AbstractGangWorker(AbstractWorkGang* gang, uint id) {
       // Binds the worker to 'gang', records its id and names the thread
       // "<gang name>#<id>". 'gang' is dereferenced here, so it must not be NULL.
 266   _gang = gang;
 267   set_id(id);
       // 'id' is unsigned, so format it with %u.
 268   set_name("%s#%u", gang->name(), id);
 269 }
 270 
 271 void AbstractGangWorker::run() {
       // Thread body: perform the one-time setup in initialize(), then enter loop().
 272   initialize();
 273   loop();
 274 }
 275 
 276 void AbstractGangWorker::initialize() {
 277   this->initialize_thread_local_storage();
 278   this->record_stack_base_and_size();
 279   this->initialize_named_thread();
 280   assert(_gang != NULL, "No gang to run in");
 281   os::set_priority(this, NearMaxPriority);
 282   if (TraceWorkGang) {
 283     tty->print_cr("Running gang worker for gang %s id %u",


 286   // The VM thread should not execute here because MutexLocker's are used
 287   // as (opposed to MutexLockerEx's).
 288   assert(!Thread::current()->is_VM_thread(), "VM thread should not be part"
 289          " of a work gang");
 290 }
 291 
 292 bool AbstractGangWorker::is_GC_task_thread() const {
       // A worker reports whatever its gang reports via are_GC_task_threads().
 293   return gang()->are_GC_task_threads();
 294 }
 295 
 296 bool AbstractGangWorker::is_ConcurrentGC_thread() const {
       // A worker reports whatever its gang reports via are_ConcurrentGC_threads().
 297   return gang()->are_ConcurrentGC_threads();
 298 }
 299 
 300 void AbstractGangWorker::print_on(outputStream* st) const {
       // Prints the quoted thread name, then the generic Thread::print_on()
       // output, then a line break, all on 'st'.
 301   st->print("\"%s\" ", name());
 302   Thread::print_on(st);
 303   st->cr();
 304 }
 305 
 306 WorkData GangWorker::wait_for_task() {
       // Delegates to the gang's dispatcher and returns the WorkData it
       // hands back for this worker.
 307   return gang()->dispatcher()->worker_wait_for_task();
 308 }
 309 
 310 void GangWorker::signal_task_done() {
       // Tells the gang's dispatcher that this worker has finished its task.
 311   gang()->dispatcher()->worker_done_with_task();
 312 }
 313 
 314 void GangWorker::print_task_started(WorkData data) {
       // Trace helper: prints nothing unless TraceWorkGang is set.
 315   if (!TraceWorkGang) { return; }
 316 
 317   tty->print_cr("Running work gang %s task %s worker %u", name(), data._task->name(), data._worker_id);
 318 }
 319 
 320 void GangWorker::print_task_done(WorkData data) {
       // Trace helper: prints nothing unless TraceWorkGang is set. Otherwise it
       // reports the finished task and identifies the current thread.
 321   if (!TraceWorkGang) { return; }
 322 
 323   tty->print_cr("\nFinished work gang %s task %s worker %u", name(), data._task->name(), data._worker_id);
 324   Thread* const self = Thread::current();
 325   tty->print_cr("  T: " PTR_FORMAT "  VM_thread: %d", p2i(self), self->is_VM_thread());
 326 }
 327 
 328 void GangWorker::run_task(WorkData data) {
       // Executes the task carried by 'data', passing the worker id stored in
       // it, with trace output before and after.
 329   print_task_started(data);
 330 
 331   data._task->work(data._worker_id);
 332 
 333   print_task_done(data);
 334 }
 335 
 336 void GangWorker::loop() {
       // Worker main loop; never returns: wait for a task, run it, report completion.
 337   for (;;) {
 338     const WorkData work = wait_for_task();
 339 
 340     run_task(work);
 341 
 342     signal_task_done();
 343   }
 344 }
 345 
 346 // *** WorkGangBarrierSync
 347 
 348 WorkGangBarrierSync::WorkGangBarrierSync()
       // Default constructor: zero workers, zero completions, and both the
       // reset and aborted flags cleared. The monitor gets a fixed name and
       // is created with Monitor::_safepoint_check_never.
 349   : _monitor(Mutex::safepoint, "work gang barrier sync", true,
 350              Monitor::_safepoint_check_never),
 351     _n_workers(0), _n_completed(0), _should_reset(false), _aborted(false) {
 352 }
 353 
 354 WorkGangBarrierSync::WorkGangBarrierSync(uint n_workers, const char* name)
       // Same as the default constructor, except that the monitor is named
       // 'name' and _n_workers starts at 'n_workers'.
 355   : _monitor(Mutex::safepoint, name, true, Monitor::_safepoint_check_never),
 356     _n_workers(n_workers), _n_completed(0), _should_reset(false), _aborted(false) {
 357 }
 358 
 359 void WorkGangBarrierSync::set_n_workers(uint n_workers) {
 360   _n_workers    = n_workers;
 361   _n_completed  = 0;
 362   _should_reset = false;


< prev index next >