1 /* 2 * Copyright (c) 2001, 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 * 23 */ 24 25 #include "precompiled.hpp" 26 #include "gc/shared/workgroup.hpp" 27 #include "memory/allocation.hpp" 28 #include "memory/allocation.inline.hpp" 29 #include "runtime/atomic.inline.hpp" 30 #include "runtime/os.hpp" 31 32 // Definitions of WorkGang methods. 33 34 AbstractWorkGang::AbstractWorkGang(const char* name, 35 bool are_GC_task_threads, 36 bool are_ConcurrentGC_threads) : 37 _name(name), 38 _are_GC_task_threads(are_GC_task_threads), 39 _are_ConcurrentGC_threads(are_ConcurrentGC_threads) { 40 41 assert(!(are_GC_task_threads && are_ConcurrentGC_threads), 42 "They cannot both be STW GC and Concurrent threads" ); 43 44 // Other initialization. 45 _monitor = new Monitor(/* priority */ Mutex::leaf, 46 /* name */ "WorkGroup monitor", 47 /* allow_vm_block */ are_GC_task_threads, 48 Monitor::_safepoint_check_sometimes); 49 assert(monitor() != NULL, "Failed to allocate monitor"); 50 _terminate = false; 51 _task = NULL; 52 _sequence_number = 0; 53 _started_workers = 0; 54 _finished_workers = 0; 55 } 56 57 WorkGang::WorkGang(const char* name, 58 uint workers, 59 bool are_GC_task_threads, 60 bool are_ConcurrentGC_threads) : 61 AbstractWorkGang(name, are_GC_task_threads, are_ConcurrentGC_threads) { 62 _total_workers = workers; 63 } 64 65 GangWorker* WorkGang::allocate_worker(uint which) { 66 GangWorker* new_worker = new GangWorker(this, which); 67 return new_worker; 68 } 69 70 // The current implementation will exit if the allocation 71 // of any worker fails. Still, return a boolean so that 72 // a future implementation can possibly do a partial 73 // initialization of the workers and report such to the 74 // caller. 75 bool WorkGang::initialize_workers() { 76 77 if (TraceWorkGang) { 78 tty->print_cr("Constructing work gang %s with %d threads", 79 name(), 80 total_workers()); 81 } 82 _gang_workers = NEW_C_HEAP_ARRAY(GangWorker*, total_workers(), mtInternal); 83 if (gang_workers() == NULL) { 84 vm_exit_out_of_memory(0, OOM_MALLOC_ERROR, "Cannot create GangWorker array."); 85 return false; 86 } 87 os::ThreadType worker_type; 88 if (are_ConcurrentGC_threads()) { 89 worker_type = os::cgc_thread; 90 } else { 91 worker_type = os::pgc_thread; 92 } 93 for (uint worker = 0; worker < total_workers(); worker += 1) { 94 GangWorker* new_worker = allocate_worker(worker); 95 assert(new_worker != NULL, "Failed to allocate GangWorker"); 96 _gang_workers[worker] = new_worker; 97 if (new_worker == NULL || !os::create_thread(new_worker, worker_type)) { 98 vm_exit_out_of_memory(0, OOM_MALLOC_ERROR, 99 "Cannot create worker GC thread. Out of system resources."); 100 return false; 101 } 102 if (!DisableStartThread) { 103 os::start_thread(new_worker); 104 } 105 } 106 return true; 107 } 108 109 AbstractWorkGang::~AbstractWorkGang() { 110 if (TraceWorkGang) { 111 tty->print_cr("Destructing work gang %s", name()); 112 } 113 stop(); // stop all the workers 114 for (uint worker = 0; worker < total_workers(); worker += 1) { 115 delete gang_worker(worker); 116 } 117 delete gang_workers(); 118 delete monitor(); 119 } 120 121 GangWorker* AbstractWorkGang::gang_worker(uint i) const { 122 // Array index bounds checking. 123 GangWorker* result = NULL; 124 assert(gang_workers() != NULL, "No workers for indexing"); 125 assert(i < total_workers(), "Worker index out of bounds"); 126 result = _gang_workers[i]; 127 assert(result != NULL, "Indexing to null worker"); 128 return result; 129 } 130 131 void WorkGang::run_task(AbstractGangTask* task) { 132 run_task(task, total_workers()); 133 } 134 135 void WorkGang::run_task(AbstractGangTask* task, uint no_of_parallel_workers) { 136 task->set_for_termination(no_of_parallel_workers); 137 138 // This thread is executed by the VM thread which does not block 139 // on ordinary MutexLocker's. 140 MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag); 141 if (TraceWorkGang) { 142 tty->print_cr("Running work gang %s task %s", name(), task->name()); 143 } 144 // Tell all the workers to run a task. 145 assert(task != NULL, "Running a null task"); 146 // Initialize. 147 _task = task; 148 _sequence_number += 1; 149 _started_workers = 0; 150 _finished_workers = 0; 151 // Tell the workers to get to work. 152 monitor()->notify_all(); 153 // Wait for them to be finished 154 while (finished_workers() < no_of_parallel_workers) { 155 if (TraceWorkGang) { 156 tty->print_cr("Waiting in work gang %s: %u/%u finished sequence %d", 157 name(), finished_workers(), no_of_parallel_workers, 158 _sequence_number); 159 } 160 monitor()->wait(/* no_safepoint_check */ true); 161 } 162 _task = NULL; 163 if (TraceWorkGang) { 164 tty->print_cr("\nFinished work gang %s: %u/%u sequence %d", 165 name(), finished_workers(), no_of_parallel_workers, 166 _sequence_number); 167 Thread* me = Thread::current(); 168 tty->print_cr(" T: " PTR_FORMAT " VM_thread: %d", p2i(me), me->is_VM_thread()); 169 } 170 } 171 172 void FlexibleWorkGang::run_task(AbstractGangTask* task) { 173 // If active_workers() is passed, _finished_workers 174 // must only be incremented for workers that find non_null 175 // work (as opposed to all those that just check that the 176 // task is not null). 177 WorkGang::run_task(task, (uint) active_workers()); 178 } 179 180 void AbstractWorkGang::stop() { 181 // Tell all workers to terminate, then wait for them to become inactive. 182 MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag); 183 if (TraceWorkGang) { 184 tty->print_cr("Stopping work gang %s task %s", name(), task()->name()); 185 } 186 _task = NULL; 187 _terminate = true; 188 monitor()->notify_all(); 189 while (finished_workers() < active_workers()) { 190 if (TraceWorkGang) { 191 tty->print_cr("Waiting in work gang %s: %u/%u finished", 192 name(), finished_workers(), active_workers()); 193 } 194 monitor()->wait(/* no_safepoint_check */ true); 195 } 196 } 197 198 void AbstractWorkGang::internal_worker_poll(WorkData* data) const { 199 assert(monitor()->owned_by_self(), "worker_poll is an internal method"); 200 assert(data != NULL, "worker data is null"); 201 data->set_terminate(terminate()); 202 data->set_task(task()); 203 data->set_sequence_number(sequence_number()); 204 } 205 206 void AbstractWorkGang::internal_note_start() { 207 assert(monitor()->owned_by_self(), "note_finish is an internal method"); 208 _started_workers += 1; 209 } 210 211 void AbstractWorkGang::internal_note_finish() { 212 assert(monitor()->owned_by_self(), "note_finish is an internal method"); 213 _finished_workers += 1; 214 } 215 216 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const { 217 uint num_thr = total_workers(); 218 for (uint i = 0; i < num_thr; i++) { 219 gang_worker(i)->print_on(st); 220 st->cr(); 221 } 222 } 223 224 void AbstractWorkGang::threads_do(ThreadClosure* tc) const { 225 assert(tc != NULL, "Null ThreadClosure"); 226 uint num_thr = total_workers(); 227 for (uint i = 0; i < num_thr; i++) { 228 tc->do_thread(gang_worker(i)); 229 } 230 } 231 232 // GangWorker methods. 233 234 GangWorker::GangWorker(AbstractWorkGang* gang, uint id) { 235 _gang = gang; 236 set_id(id); 237 set_name("%s#%d", gang->name(), id); 238 } 239 240 void GangWorker::run() { 241 initialize(); 242 loop(); 243 } 244 245 void GangWorker::initialize() { 246 this->initialize_thread_local_storage(); 247 this->record_stack_base_and_size(); 248 this->initialize_named_thread(); 249 assert(_gang != NULL, "No gang to run in"); 250 os::set_priority(this, NearMaxPriority); 251 if (TraceWorkGang) { 252 tty->print_cr("Running gang worker for gang %s id %u", 253 gang()->name(), id()); 254 } 255 // The VM thread should not execute here because MutexLocker's are used 256 // as (opposed to MutexLockerEx's). 257 assert(!Thread::current()->is_VM_thread(), "VM thread should not be part" 258 " of a work gang"); 259 } 260 261 void GangWorker::loop() { 262 int previous_sequence_number = 0; 263 Monitor* gang_monitor = gang()->monitor(); 264 for ( ; /* !terminate() */; ) { 265 WorkData data; 266 int part; // Initialized below. 267 { 268 // Grab the gang mutex. 269 MutexLocker ml(gang_monitor); 270 // Wait for something to do. 271 // Polling outside the while { wait } avoids missed notifies 272 // in the outer loop. 273 gang()->internal_worker_poll(&data); 274 if (TraceWorkGang) { 275 tty->print("Polled outside for work in gang %s worker %u", 276 gang()->name(), id()); 277 tty->print(" terminate: %s", 278 data.terminate() ? "true" : "false"); 279 tty->print(" sequence: %d (prev: %d)", 280 data.sequence_number(), previous_sequence_number); 281 if (data.task() != NULL) { 282 tty->print(" task: %s", data.task()->name()); 283 } else { 284 tty->print(" task: NULL"); 285 } 286 tty->cr(); 287 } 288 for ( ; /* break or return */; ) { 289 // Terminate if requested. 290 if (data.terminate()) { 291 gang()->internal_note_finish(); 292 gang_monitor->notify_all(); 293 return; 294 } 295 // Check for new work. 296 if ((data.task() != NULL) && 297 (data.sequence_number() != previous_sequence_number)) { 298 if (gang()->needs_more_workers()) { 299 gang()->internal_note_start(); 300 gang_monitor->notify_all(); 301 part = gang()->started_workers() - 1; 302 break; 303 } 304 } 305 // Nothing to do. 306 gang_monitor->wait(/* no_safepoint_check */ true); 307 gang()->internal_worker_poll(&data); 308 if (TraceWorkGang) { 309 tty->print("Polled inside for work in gang %s worker %u", 310 gang()->name(), id()); 311 tty->print(" terminate: %s", 312 data.terminate() ? "true" : "false"); 313 tty->print(" sequence: %d (prev: %d)", 314 data.sequence_number(), previous_sequence_number); 315 if (data.task() != NULL) { 316 tty->print(" task: %s", data.task()->name()); 317 } else { 318 tty->print(" task: NULL"); 319 } 320 tty->cr(); 321 } 322 } 323 // Drop gang mutex. 324 } 325 if (TraceWorkGang) { 326 tty->print("Work for work gang %s id %u task %s part %d", 327 gang()->name(), id(), data.task()->name(), part); 328 } 329 assert(data.task() != NULL, "Got null task"); 330 data.task()->work(part); 331 { 332 if (TraceWorkGang) { 333 tty->print("Finish for work gang %s id %u task %s part %d", 334 gang()->name(), id(), data.task()->name(), part); 335 } 336 // Grab the gang mutex. 337 MutexLocker ml(gang_monitor); 338 gang()->internal_note_finish(); 339 // Tell the gang you are done. 340 gang_monitor->notify_all(); 341 // Drop the gang mutex. 342 } 343 previous_sequence_number = data.sequence_number(); 344 } 345 } 346 347 bool GangWorker::is_GC_task_thread() const { 348 return gang()->are_GC_task_threads(); 349 } 350 351 bool GangWorker::is_ConcurrentGC_thread() const { 352 return gang()->are_ConcurrentGC_threads(); 353 } 354 355 void GangWorker::print_on(outputStream* st) const { 356 st->print("\"%s\" ", name()); 357 Thread::print_on(st); 358 st->cr(); 359 } 360 361 // Printing methods 362 363 const char* AbstractWorkGang::name() const { 364 return _name; 365 } 366 367 #ifndef PRODUCT 368 369 const char* AbstractGangTask::name() const { 370 return _name; 371 } 372 373 #endif /* PRODUCT */ 374 375 // FlexibleWorkGang 376 377 378 // *** WorkGangBarrierSync 379 380 WorkGangBarrierSync::WorkGangBarrierSync() 381 : _monitor(Mutex::safepoint, "work gang barrier sync", true, 382 Monitor::_safepoint_check_never), 383 _n_workers(0), _n_completed(0), _should_reset(false), _aborted(false) { 384 } 385 386 WorkGangBarrierSync::WorkGangBarrierSync(uint n_workers, const char* name) 387 : _monitor(Mutex::safepoint, name, true, Monitor::_safepoint_check_never), 388 _n_workers(n_workers), _n_completed(0), _should_reset(false), _aborted(false) { 389 } 390 391 void WorkGangBarrierSync::set_n_workers(uint n_workers) { 392 _n_workers = n_workers; 393 _n_completed = 0; 394 _should_reset = false; 395 _aborted = false; 396 } 397 398 bool WorkGangBarrierSync::enter() { 399 MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag); 400 if (should_reset()) { 401 // The should_reset() was set and we are the first worker to enter 402 // the sync barrier. We will zero the n_completed() count which 403 // effectively resets the barrier. 404 zero_completed(); 405 set_should_reset(false); 406 } 407 inc_completed(); 408 if (n_completed() == n_workers()) { 409 // At this point we would like to reset the barrier to be ready in 410 // case it is used again. However, we cannot set n_completed() to 411 // 0, even after the notify_all(), given that some other workers 412 // might still be waiting for n_completed() to become == 413 // n_workers(). So, if we set n_completed() to 0, those workers 414 // will get stuck (as they will wake up, see that n_completed() != 415 // n_workers() and go back to sleep). Instead, we raise the 416 // should_reset() flag and the barrier will be reset the first 417 // time a worker enters it again. 418 set_should_reset(true); 419 monitor()->notify_all(); 420 } else { 421 while (n_completed() != n_workers() && !aborted()) { 422 monitor()->wait(/* no_safepoint_check */ true); 423 } 424 } 425 return !aborted(); 426 } 427 428 void WorkGangBarrierSync::abort() { 429 MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag); 430 set_aborted(); 431 monitor()->notify_all(); 432 } 433 434 // SubTasksDone functions. 435 436 SubTasksDone::SubTasksDone(uint n) : 437 _n_tasks(n), _tasks(NULL) { 438 _tasks = NEW_C_HEAP_ARRAY(uint, n, mtInternal); 439 guarantee(_tasks != NULL, "alloc failure"); 440 clear(); 441 } 442 443 bool SubTasksDone::valid() { 444 return _tasks != NULL; 445 } 446 447 void SubTasksDone::clear() { 448 for (uint i = 0; i < _n_tasks; i++) { 449 _tasks[i] = 0; 450 } 451 _threads_completed = 0; 452 #ifdef ASSERT 453 _claimed = 0; 454 #endif 455 } 456 457 bool SubTasksDone::is_task_claimed(uint t) { 458 assert(t < _n_tasks, "bad task id."); 459 uint old = _tasks[t]; 460 if (old == 0) { 461 old = Atomic::cmpxchg(1, &_tasks[t], 0); 462 } 463 assert(_tasks[t] == 1, "What else?"); 464 bool res = old != 0; 465 #ifdef ASSERT 466 if (!res) { 467 assert(_claimed < _n_tasks, "Too many tasks claimed; missing clear?"); 468 Atomic::inc((volatile jint*) &_claimed); 469 } 470 #endif 471 return res; 472 } 473 474 void SubTasksDone::all_tasks_completed(uint n_threads) { 475 jint observed = _threads_completed; 476 jint old; 477 do { 478 old = observed; 479 observed = Atomic::cmpxchg(old+1, &_threads_completed, old); 480 } while (observed != old); 481 // If this was the last thread checking in, clear the tasks. 482 uint adjusted_thread_count = (n_threads == 0 ? 1 : n_threads); 483 if (observed + 1 == (jint)adjusted_thread_count) { 484 clear(); 485 } 486 } 487 488 489 SubTasksDone::~SubTasksDone() { 490 if (_tasks != NULL) FREE_C_HEAP_ARRAY(jint, _tasks); 491 } 492 493 // *** SequentialSubTasksDone 494 495 void SequentialSubTasksDone::clear() { 496 _n_tasks = _n_claimed = 0; 497 _n_threads = _n_completed = 0; 498 } 499 500 bool SequentialSubTasksDone::valid() { 501 return _n_threads > 0; 502 } 503 504 bool SequentialSubTasksDone::is_task_claimed(uint& t) { 505 uint* n_claimed_ptr = &_n_claimed; 506 t = *n_claimed_ptr; 507 while (t < _n_tasks) { 508 jint res = Atomic::cmpxchg(t+1, n_claimed_ptr, t); 509 if (res == (jint)t) { 510 return false; 511 } 512 t = *n_claimed_ptr; 513 } 514 return true; 515 } 516 517 bool SequentialSubTasksDone::all_tasks_completed() { 518 uint* n_completed_ptr = &_n_completed; 519 uint complete = *n_completed_ptr; 520 while (true) { 521 uint res = Atomic::cmpxchg(complete+1, n_completed_ptr, complete); 522 if (res == complete) { 523 break; 524 } 525 complete = res; 526 } 527 if (complete+1 == _n_threads) { 528 clear(); 529 return true; 530 } 531 return false; 532 } 533 534 bool FreeIdSet::_stat_init = false; 535 FreeIdSet* FreeIdSet::_sets[NSets]; 536 bool FreeIdSet::_safepoint; 537 538 FreeIdSet::FreeIdSet(int sz, Monitor* mon) : 539 _sz(sz), _mon(mon), _hd(0), _waiters(0), _index(-1), _claimed(0) 540 { 541 _ids = NEW_C_HEAP_ARRAY(int, sz, mtInternal); 542 for (int i = 0; i < sz; i++) _ids[i] = i+1; 543 _ids[sz-1] = end_of_list; // end of list. 544 if (_stat_init) { 545 for (int j = 0; j < NSets; j++) _sets[j] = NULL; 546 _stat_init = true; 547 } 548 // Add to sets. (This should happen while the system is still single-threaded.) 549 for (int j = 0; j < NSets; j++) { 550 if (_sets[j] == NULL) { 551 _sets[j] = this; 552 _index = j; 553 break; 554 } 555 } 556 guarantee(_index != -1, "Too many FreeIdSets in use!"); 557 } 558 559 FreeIdSet::~FreeIdSet() { 560 _sets[_index] = NULL; 561 FREE_C_HEAP_ARRAY(int, _ids); 562 } 563 564 void FreeIdSet::set_safepoint(bool b) { 565 _safepoint = b; 566 if (b) { 567 for (int j = 0; j < NSets; j++) { 568 if (_sets[j] != NULL && _sets[j]->_waiters > 0) { 569 Monitor* mon = _sets[j]->_mon; 570 mon->lock_without_safepoint_check(); 571 mon->notify_all(); 572 mon->unlock(); 573 } 574 } 575 } 576 } 577 578 #define FID_STATS 0 579 580 int FreeIdSet::claim_par_id() { 581 #if FID_STATS 582 thread_t tslf = thr_self(); 583 tty->print("claim_par_id[%d]: sz = %d, claimed = %d\n", tslf, _sz, _claimed); 584 #endif 585 MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag); 586 while (!_safepoint && _hd == end_of_list) { 587 _waiters++; 588 #if FID_STATS 589 if (_waiters > 5) { 590 tty->print("claim_par_id waiting[%d]: %d waiters, %d claimed.\n", 591 tslf, _waiters, _claimed); 592 } 593 #endif 594 _mon->wait(Mutex::_no_safepoint_check_flag); 595 _waiters--; 596 } 597 if (_hd == end_of_list) { 598 #if FID_STATS 599 tty->print("claim_par_id[%d]: returning EOL.\n", tslf); 600 #endif 601 return -1; 602 } else { 603 int res = _hd; 604 _hd = _ids[res]; 605 _ids[res] = claimed; // For debugging. 606 _claimed++; 607 #if FID_STATS 608 tty->print("claim_par_id[%d]: returning %d, claimed = %d.\n", 609 tslf, res, _claimed); 610 #endif 611 return res; 612 } 613 } 614 615 bool FreeIdSet::claim_perm_id(int i) { 616 assert(0 <= i && i < _sz, "Out of range."); 617 MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag); 618 int prev = end_of_list; 619 int cur = _hd; 620 while (cur != end_of_list) { 621 if (cur == i) { 622 if (prev == end_of_list) { 623 _hd = _ids[cur]; 624 } else { 625 _ids[prev] = _ids[cur]; 626 } 627 _ids[cur] = claimed; 628 _claimed++; 629 return true; 630 } else { 631 prev = cur; 632 cur = _ids[cur]; 633 } 634 } 635 return false; 636 637 } 638 639 void FreeIdSet::release_par_id(int id) { 640 MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag); 641 assert(_ids[id] == claimed, "Precondition."); 642 _ids[id] = _hd; 643 _hd = id; 644 _claimed--; 645 #if FID_STATS 646 tty->print("[%d] release_par_id(%d), waiters =%d, claimed = %d.\n", 647 thr_self(), id, _waiters, _claimed); 648 #endif 649 if (_waiters > 0) 650 // Notify all would be safer, but this is OK, right? 651 _mon->notify_all(); 652 }