1 /*
   2  * Copyright (c) 2001, 2007, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.
   8  *
   9  * This code is distributed in the hope that it will be useful, but WITHOUT
  10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  12  * version 2 for more details (a copy is included in the LICENSE file that
  13  * accompanied this code).
  14  *
  15  * You should have received a copy of the GNU General Public License version
  16  * 2 along with this work; if not, write to the Free Software Foundation,
  17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  18  *
  19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  20  * or visit www.oracle.com if you need additional information or have any
  21  * questions.
  22  *
  23  */
  24 
  25 # include "incls/_precompiled.incl"
  26 # include "incls/_workgroup.cpp.incl"
  27 
  28 // Definitions of WorkGang methods.
  29 
// Constructs the gang shell: records the name and thread-category flags,
// creates the monitor used to coordinate workers, and zeroes the
// bookkeeping counters.  The worker threads themselves are created by
// the concrete subclass (see WorkGang::WorkGang).
AbstractWorkGang::AbstractWorkGang(const char* name,
                                   bool  are_GC_task_threads,
                                   bool  are_ConcurrentGC_threads) :
  _name(name),
  _are_GC_task_threads(are_GC_task_threads),
  _are_ConcurrentGC_threads(are_ConcurrentGC_threads) {

  // A gang is either a stop-the-world GC gang or a concurrent GC gang,
  // never both.
  assert(!(are_GC_task_threads && are_ConcurrentGC_threads),
         "They cannot both be STW GC and Concurrent threads" );

  // Other initialization.
  _monitor = new Monitor(/* priority */       Mutex::leaf,
                         /* name */           "WorkGroup monitor",
                         /* allow_vm_block */ are_GC_task_threads);
  assert(monitor() != NULL, "Failed to allocate monitor");
  _terminate = false;
  _task = NULL;
  _sequence_number = 0;
  _started_workers = 0;
  _finished_workers = 0;
}
  51 
  52 WorkGang::WorkGang(const char* name,
  53                    int         workers,
  54                    bool        are_GC_task_threads,
  55                    bool        are_ConcurrentGC_threads) :
  56   AbstractWorkGang(name, are_GC_task_threads, are_ConcurrentGC_threads)
  57 {
  58   // Save arguments.
  59   _total_workers = workers;
  60 
  61   if (TraceWorkGang) {
  62     tty->print_cr("Constructing work gang %s with %d threads", name, workers);
  63   }
  64   _gang_workers = NEW_C_HEAP_ARRAY(GangWorker*, workers);
  65   if (gang_workers() == NULL) {
  66     vm_exit_out_of_memory(0, "Cannot create GangWorker array.");
  67   }
  68   for (int worker = 0; worker < total_workers(); worker += 1) {
  69     GangWorker* new_worker = new GangWorker(this, worker);
  70     assert(new_worker != NULL, "Failed to allocate GangWorker");
  71     _gang_workers[worker] = new_worker;
  72     if (new_worker == NULL || !os::create_thread(new_worker, os::pgc_thread))
  73       vm_exit_out_of_memory(0, "Cannot create worker GC thread. Out of system resources.");
  74     if (!DisableStartThread) {
  75       os::start_thread(new_worker);
  76     }
  77   }
  78 }
  79 
  80 AbstractWorkGang::~AbstractWorkGang() {
  81   if (TraceWorkGang) {
  82     tty->print_cr("Destructing work gang %s", name());
  83   }
  84   stop();   // stop all the workers
  85   for (int worker = 0; worker < total_workers(); worker += 1) {
  86     delete gang_worker(worker);
  87   }
  88   delete gang_workers();
  89   delete monitor();
  90 }
  91 
  92 GangWorker* AbstractWorkGang::gang_worker(int i) const {
  93   // Array index bounds checking.
  94   GangWorker* result = NULL;
  95   assert(gang_workers() != NULL, "No workers for indexing");
  96   assert(((i >= 0) && (i < total_workers())), "Worker index out of bounds");
  97   result = _gang_workers[i];
  98   assert(result != NULL, "Indexing to null worker");
  99   return result;
 100 }
 101 
 102 void WorkGang::run_task(AbstractGangTask* task) {
 103   // This thread is executed by the VM thread which does not block
 104   // on ordinary MutexLocker's.
 105   MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
 106   if (TraceWorkGang) {
 107     tty->print_cr("Running work gang %s task %s", name(), task->name());
 108   }
 109   // Tell all the workers to run a task.
 110   assert(task != NULL, "Running a null task");
 111   // Initialize.
 112   _task = task;
 113   _sequence_number += 1;
 114   _started_workers = 0;
 115   _finished_workers = 0;
 116   // Tell the workers to get to work.
 117   monitor()->notify_all();
 118   // Wait for them to be finished
 119   while (finished_workers() < total_workers()) {
 120     if (TraceWorkGang) {
 121       tty->print_cr("Waiting in work gang %s: %d/%d finished sequence %d",
 122                     name(), finished_workers(), total_workers(),
 123                     _sequence_number);
 124     }
 125     monitor()->wait(/* no_safepoint_check */ true);
 126   }
 127   _task = NULL;
 128   if (TraceWorkGang) {
 129     tty->print_cr("/nFinished work gang %s: %d/%d sequence %d",
 130                   name(), finished_workers(), total_workers(),
 131                   _sequence_number);
 132     }
 133 }
 134 
 135 void AbstractWorkGang::stop() {
 136   // Tell all workers to terminate, then wait for them to become inactive.
 137   MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
 138   if (TraceWorkGang) {
 139     tty->print_cr("Stopping work gang %s task %s", name(), task()->name());
 140   }
 141   _task = NULL;
 142   _terminate = true;
 143   monitor()->notify_all();
 144   while (finished_workers() < total_workers()) {
 145     if (TraceWorkGang) {
 146       tty->print_cr("Waiting in work gang %s: %d/%d finished",
 147                     name(), finished_workers(), total_workers());
 148     }
 149     monitor()->wait(/* no_safepoint_check */ true);
 150   }
 151 }
 152 
 153 void AbstractWorkGang::internal_worker_poll(WorkData* data) const {
 154   assert(monitor()->owned_by_self(), "worker_poll is an internal method");
 155   assert(data != NULL, "worker data is null");
 156   data->set_terminate(terminate());
 157   data->set_task(task());
 158   data->set_sequence_number(sequence_number());
 159 }
 160 
 161 void AbstractWorkGang::internal_note_start() {
 162   assert(monitor()->owned_by_self(), "note_finish is an internal method");
 163   _started_workers += 1;
 164 }
 165 
 166 void AbstractWorkGang::internal_note_finish() {
 167   assert(monitor()->owned_by_self(), "note_finish is an internal method");
 168   _finished_workers += 1;
 169 }
 170 
 171 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const {
 172   uint    num_thr = total_workers();
 173   for (uint i = 0; i < num_thr; i++) {
 174     gang_worker(i)->print_on(st);
 175     st->cr();
 176   }
 177 }
 178 
 179 void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
 180   assert(tc != NULL, "Null ThreadClosure");
 181   uint num_thr = total_workers();
 182   for (uint i = 0; i < num_thr; i++) {
 183     tc->do_thread(gang_worker(i));
 184   }
 185 }
 186 
 187 // GangWorker methods.
 188 
// Remembers the owning gang and sets this thread's id and display name.
GangWorker::GangWorker(AbstractWorkGang* gang, uint id) {
  _gang = gang;
  set_id(id);
  set_name("Gang worker#%d (%s)", id, gang->name());
}
 194 
// Thread entry point: per-thread setup, then the task-processing loop.
void GangWorker::run() {
  initialize();
  loop();
}
 199 
// Per-thread setup executed on the worker thread itself before it enters
// the work loop: TLS, priority, and sanity checks.
void GangWorker::initialize() {
  this->initialize_thread_local_storage();
  assert(_gang != NULL, "No gang to run in");
  os::set_priority(this, NearMaxPriority);
  if (TraceWorkGang) {
    tty->print_cr("Running gang worker for gang %s id %d",
                  gang()->name(), id());
  }
  // The VM thread should not execute here because MutexLocker's are used
  // (as opposed to MutexLockerEx's, which can skip the safepoint check).
  assert(!Thread::current()->is_VM_thread(), "VM thread should not be part"
         " of a work gang");
}
 213 
// The worker's main loop: wait on the gang monitor for a new task (or a
// terminate request), run this worker's part of the task outside the
// monitor, then report completion.  A "new" task is one whose sequence
// number differs from the last one this worker executed.
void GangWorker::loop() {
  int previous_sequence_number = 0;
  Monitor* gang_monitor = gang()->monitor();
  for ( ; /* !terminate() */; ) {
    WorkData data;
    int part;  // Initialized below.
    {
      // Grab the gang mutex.
      MutexLocker ml(gang_monitor);
      // Wait for something to do.
      // Polling outside the while { wait } avoids missed notifies
      // in the outer loop.
      gang()->internal_worker_poll(&data);
      if (TraceWorkGang) {
        tty->print("Polled outside for work in gang %s worker %d",
                   gang()->name(), id());
        tty->print("  terminate: %s",
                   data.terminate() ? "true" : "false");
        tty->print("  sequence: %d (prev: %d)",
                   data.sequence_number(), previous_sequence_number);
        if (data.task() != NULL) {
          tty->print("  task: %s", data.task()->name());
        } else {
          tty->print("  task: NULL");
        }
        tty->cr();
      }
      for ( ; /* break or return */; ) {
        // Terminate if requested.
        if (data.terminate()) {
          gang()->internal_note_finish();
          gang_monitor->notify_all();
          return;
        }
        // Check for new work: a non-null task with a sequence number this
        // worker has not yet run.
        if ((data.task() != NULL) &&
            (data.sequence_number() != previous_sequence_number)) {
          gang()->internal_note_start();
          gang_monitor->notify_all();
          // This worker's part index is its (0-based) start order.
          part = gang()->started_workers() - 1;
          break;
        }
        // Nothing to do.
        gang_monitor->wait(/* no_safepoint_check */ true);
        gang()->internal_worker_poll(&data);
        if (TraceWorkGang) {
          tty->print("Polled inside for work in gang %s worker %d",
                     gang()->name(), id());
          tty->print("  terminate: %s",
                     data.terminate() ? "true" : "false");
          tty->print("  sequence: %d (prev: %d)",
                     data.sequence_number(), previous_sequence_number);
          if (data.task() != NULL) {
            tty->print("  task: %s", data.task()->name());
          } else {
            tty->print("  task: NULL");
          }
          tty->cr();
        }
      }
      // Drop gang mutex.
    }
    if (TraceWorkGang) {
      tty->print("Work for work gang %s id %d task %s part %d",
                 gang()->name(), id(), data.task()->name(), part);
    }
    assert(data.task() != NULL, "Got null task");
    // Run this worker's part of the task without holding the monitor.
    data.task()->work(part);
    {
      if (TraceWorkGang) {
        tty->print("Finish for work gang %s id %d task %s part %d",
                   gang()->name(), id(), data.task()->name(), part);
      }
      // Grab the gang mutex.
      MutexLocker ml(gang_monitor);
      gang()->internal_note_finish();
      // Tell the gang you are done.
      gang_monitor->notify_all();
      // Drop the gang mutex.
    }
    // Remember the sequence number so this task is not run twice.
    previous_sequence_number = data.sequence_number();
  }
}
 297 
// Delegates to the owning gang's thread-category flag.
bool GangWorker::is_GC_task_thread() const {
  return gang()->are_GC_task_threads();
}
 301 
// Delegates to the owning gang's thread-category flag.
bool GangWorker::is_ConcurrentGC_thread() const {
  return gang()->are_ConcurrentGC_threads();
}
 305 
// Prints the quoted worker name followed by the standard Thread info.
void GangWorker::print_on(outputStream* st) const {
  st->print("\"%s\" ", name());
  Thread::print_on(st);
  st->cr();
}
 311 
 312 // Printing methods
 313 
// The gang's name, as passed to the constructor.
const char* AbstractWorkGang::name() const {
  return _name;
}
 317 
 318 #ifndef PRODUCT
 319 
// The task's name (non-product builds only; see the surrounding #ifndef).
const char* AbstractGangTask::name() const {
  return _name;
}
 323 
 324 #endif /* PRODUCT */
 325 
 326 // *** WorkGangBarrierSync
 327 
// Default-constructs an unarmed barrier (zero workers); set_n_workers()
// must be called before the barrier is used.
WorkGangBarrierSync::WorkGangBarrierSync()
  : _monitor(Mutex::safepoint, "work gang barrier sync", true),
    _n_workers(0), _n_completed(0), _should_reset(false) {
}
 332 
// Constructs a barrier armed for n_workers threads, with a named monitor.
WorkGangBarrierSync::WorkGangBarrierSync(int n_workers, const char* name)
  : _monitor(Mutex::safepoint, name, true),
    _n_workers(n_workers), _n_completed(0), _should_reset(false) {
}
 337 
 338 void WorkGangBarrierSync::set_n_workers(int n_workers) {
 339   _n_workers   = n_workers;
 340   _n_completed = 0;
 341   _should_reset = false;
 342 }
 343 
// Blocks the calling worker until all n_workers() threads have entered
// the barrier.  The last thread to arrive wakes the others and flags the
// barrier for a lazy reset on its next use.
void WorkGangBarrierSync::enter() {
  MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag);
  if (should_reset()) {
    // The should_reset() was set and we are the first worker to enter
    // the sync barrier. We will zero the n_completed() count which
    // effectively resets the barrier.
    zero_completed();
    set_should_reset(false);
  }
  inc_completed();
  if (n_completed() == n_workers()) {
    // At this point we would like to reset the barrier to be ready in
    // case it is used again. However, we cannot set n_completed() to
    // 0, even after the notify_all(), given that some other workers
    // might still be waiting for n_completed() to become ==
    // n_workers(). So, if we set n_completed() to 0, those workers
    // will get stuck (as they will wake up, see that n_completed() !=
    // n_workers() and go back to sleep). Instead, we raise the
    // should_reset() flag and the barrier will be reset the first
    // time a worker enters it again.
    set_should_reset(true);
    monitor()->notify_all();
  } else {
    while (n_completed() != n_workers()) {
      monitor()->wait(/* no_safepoint_check */ true);
    }
  }
}
 372 
 373 // SubTasksDone functions.
 374 
// Allocates the claim array for 'n' subtasks; the thread count defaults
// to 1 until set_par_threads() is called.
SubTasksDone::SubTasksDone(int n) :
  _n_tasks(n), _n_threads(1), _tasks(NULL) {
  _tasks = NEW_C_HEAP_ARRAY(jint, n);
  guarantee(_tasks != NULL, "alloc failure");
  clear();
}
 381 
// True iff the claim array was successfully allocated.
bool SubTasksDone::valid() {
  return _tasks != NULL;
}
 385 
 386 void SubTasksDone::set_par_threads(int t) {
 387 #ifdef ASSERT
 388   assert(_claimed == 0 || _threads_completed == _n_threads,
 389          "should not be called while tasks are being processed!");
 390 #endif
 391   _n_threads = (t == 0 ? 1 : t);
 392 }
 393 
 394 void SubTasksDone::clear() {
 395   for (int i = 0; i < _n_tasks; i++) {
 396     _tasks[i] = 0;
 397   }
 398   _threads_completed = 0;
 399 #ifdef ASSERT
 400   _claimed = 0;
 401 #endif
 402 }
 403 
// Atomically claims task 't'.  Returns false to exactly one caller — the
// claimant, who should then perform the task — and true to everyone else.
bool SubTasksDone::is_task_claimed(int t) {
  assert(0 <= t && t < _n_tasks, "bad task id.");
  jint old = _tasks[t];
  if (old == 0) {
    // Slot looked free: race to flip it 0 -> 1.  After the cmpxchg,
    // 'old' is 0 only if this thread won the race.
    old = Atomic::cmpxchg(1, &_tasks[t], 0);
  }
  assert(_tasks[t] == 1, "What else?");
  bool res = old != 0;
#ifdef ASSERT
  if (!res) {
    // We claimed it: count claims to catch a missing clear() between uses.
    assert(_claimed < _n_tasks, "Too many tasks claimed; missing clear?");
    Atomic::inc(&_claimed);
  }
#endif
  return res;
}
 420 
// Called by each thread when it has finished all of its tasks: atomically
// increments _threads_completed via a cmpxchg retry loop.  The last
// thread to check in resets the object for reuse.
void SubTasksDone::all_tasks_completed() {
  jint observed = _threads_completed;
  jint old;
  do {
    old = observed;
    observed = Atomic::cmpxchg(old+1, &_threads_completed, old);
  } while (observed != old);
  // If this was the last thread checking in, clear the tasks.
  if (observed+1 == _n_threads) clear();
}
 431 
 432 
 433 SubTasksDone::~SubTasksDone() {
 434   if (_tasks != NULL) FREE_C_HEAP_ARRAY(jint, _tasks);
 435 }
 436 
 437 // *** SequentialSubTasksDone
 438 
 439 void SequentialSubTasksDone::clear() {
 440   _n_tasks   = _n_claimed   = 0;
 441   _n_threads = _n_completed = 0;
 442 }
 443 
// True iff a positive number of threads has been configured.
bool SequentialSubTasksDone::valid() {
  return _n_threads > 0;
}
 447 
// Attempts to claim the next sequential task.  On success returns false
// with 't' set to the claimed task index; returns true once every task
// has been claimed.
bool SequentialSubTasksDone::is_task_claimed(int& t) {
  jint* n_claimed_ptr = &_n_claimed;
  t = *n_claimed_ptr;
  while (t < _n_tasks) {
    jint res = Atomic::cmpxchg(t+1, n_claimed_ptr, t);
    if (res == t) {
      // We advanced the claim counter from t to t+1: task t is ours.
      return false;
    }
    // Lost the race; re-read the counter and try the next unclaimed task.
    t = *n_claimed_ptr;
  }
  return true;
}
 460 
// Called by each thread when it is done claiming tasks: atomically bumps
// _n_completed via a cmpxchg retry loop.  The last thread to check in
// resets the object and returns true; all other threads return false.
bool SequentialSubTasksDone::all_tasks_completed() {
  jint* n_completed_ptr = &_n_completed;
  jint  complete        = *n_completed_ptr;
  while (true) {
    jint res = Atomic::cmpxchg(complete+1, n_completed_ptr, complete);
    if (res == complete) {
      break;
    }
    complete = res;
  }
  if (complete+1 == _n_threads) {
    clear();
    return true;
  }
  return false;
}
 477 
// FreeIdSet statics: the one-time-init flag for the registry, the global
// registry of live sets (used by set_safepoint() to wake waiters), and
// the current safepoint state.
bool FreeIdSet::_stat_init = false;
FreeIdSet* FreeIdSet::_sets[NSets];
bool FreeIdSet::_safepoint;
 481 
 482 FreeIdSet::FreeIdSet(int sz, Monitor* mon) :
 483   _sz(sz), _mon(mon), _hd(0), _waiters(0), _index(-1), _claimed(0)
 484 {
 485   _ids = new int[sz];
 486   for (int i = 0; i < sz; i++) _ids[i] = i+1;
 487   _ids[sz-1] = end_of_list; // end of list.
 488   if (_stat_init) {
 489     for (int j = 0; j < NSets; j++) _sets[j] = NULL;
 490     _stat_init = true;
 491   }
 492   // Add to sets.  (This should happen while the system is still single-threaded.)
 493   for (int j = 0; j < NSets; j++) {
 494     if (_sets[j] == NULL) {
 495       _sets[j] = this;
 496       _index = j;
 497       break;
 498     }
 499   }
 500   guarantee(_index != -1, "Too many FreeIdSets in use!");
 501 }
 502 
 503 FreeIdSet::~FreeIdSet() {
 504   _sets[_index] = NULL;
 505 }
 506 
// Records the safepoint state.  On entering a safepoint (b == true), wakes
// every thread blocked in claim_par_id() so it can observe _safepoint and
// return instead of continuing to wait.
void FreeIdSet::set_safepoint(bool b) {
  _safepoint = b;
  if (b) {
    for (int j = 0; j < NSets; j++) {
      if (_sets[j] != NULL && _sets[j]->_waiters > 0) {
        Monitor* mon = _sets[j]->_mon;
        // Lock without a safepoint check: we are at/entering a safepoint.
        mon->lock_without_safepoint_check();
        mon->notify_all();
        mon->unlock();
      }
    }
  }
}
 520 
 521 #define FID_STATS 0
 522 
// Claims an id from the free list, blocking while the list is empty and
// no safepoint is in progress.  Returns -1 when woken by a safepoint with
// no id available.
int FreeIdSet::claim_par_id() {
#if FID_STATS
  thread_t tslf = thr_self();
  tty->print("claim_par_id[%d]: sz = %d, claimed = %d\n", tslf, _sz, _claimed);
#endif
  MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
  while (!_safepoint && _hd == end_of_list) {
    // Free list is empty; wait for release_par_id() or set_safepoint().
    _waiters++;
#if FID_STATS
    if (_waiters > 5) {
      tty->print("claim_par_id waiting[%d]: %d waiters, %d claimed.\n",
                 tslf, _waiters, _claimed);
    }
#endif
    _mon->wait(Mutex::_no_safepoint_check_flag);
    _waiters--;
  }
  if (_hd == end_of_list) {
    // Woken by a safepoint with nothing available.
#if FID_STATS
    tty->print("claim_par_id[%d]: returning EOL.\n", tslf);
#endif
    return -1;
  } else {
    // Pop the head of the free list.
    int res = _hd;
    _hd = _ids[res];
    _ids[res] = claimed;  // For debugging.
    _claimed++;
#if FID_STATS
    tty->print("claim_par_id[%d]: returning %d, claimed = %d.\n",
               tslf, res, _claimed);
#endif
    return res;
  }
}
 557 
// Claims the specific id 'i' by unlinking it from the free list.
// Returns true on success, false if 'i' is not currently free.
bool FreeIdSet::claim_perm_id(int i) {
  assert(0 <= i && i < _sz, "Out of range.");
  MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
  int prev = end_of_list;
  int cur = _hd;
  while (cur != end_of_list) {
    if (cur == i) {
      if (prev == end_of_list) {
        // 'i' is the head of the free list.
        _hd = _ids[cur];
      } else {
        // Unlink 'i' from the middle of the list.
        _ids[prev] = _ids[cur];
      }
      _ids[cur] = claimed;
      _claimed++;
      return true;
    } else {
      prev = cur;
      cur = _ids[cur];
    }
  }
  return false;

}
 581 
// Returns 'id' to the head of the free list and wakes any waiters.
void FreeIdSet::release_par_id(int id) {
  MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag);
  assert(_ids[id] == claimed, "Precondition.");
  _ids[id] = _hd;
  _hd = id;
  _claimed--;
#if FID_STATS
  tty->print("[%d] release_par_id(%d), waiters =%d,  claimed = %d.\n",
             thr_self(), id, _waiters, _claimed);
#endif
  if (_waiters > 0)
    // Wake all threads blocked in claim_par_id(); an id is now available.
    _mon->notify_all();
}