1 /* 2 * Copyright (c) 2001, 2014, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 * 23 */ 24 25 #include "precompiled.hpp" 26 #include "memory/allocation.hpp" 27 #include "memory/allocation.inline.hpp" 28 #include "runtime/os.hpp" 29 #include "utilities/workgroup.hpp" 30 31 PRAGMA_FORMAT_MUTE_WARNINGS_FOR_GCC 32 33 // Definitions of WorkGang methods. 34 35 AbstractWorkGang::AbstractWorkGang(const char* name, 36 bool are_GC_task_threads, 37 bool are_ConcurrentGC_threads) : 38 _name(name), 39 _are_GC_task_threads(are_GC_task_threads), 40 _are_ConcurrentGC_threads(are_ConcurrentGC_threads) { 41 42 assert(!(are_GC_task_threads && are_ConcurrentGC_threads), 43 "They cannot both be STW GC and Concurrent threads" ); 44 45 // Other initialization. 46 _monitor = new Monitor(/* priority */ Mutex::leaf, 47 /* name */ "WorkGroup monitor", 48 /* allow_vm_block */ are_GC_task_threads); 49 assert(monitor() != NULL, "Failed to allocate monitor"); 50 _terminate = false; 51 _task = NULL; 52 _sequence_number = 0; 53 _started_workers = 0; 54 _finished_workers = 0; 55 } 56 57 WorkGang::WorkGang(const char* name, 58 uint workers, 59 bool are_GC_task_threads, 60 bool are_ConcurrentGC_threads) : 61 AbstractWorkGang(name, are_GC_task_threads, are_ConcurrentGC_threads) { 62 _total_workers = workers; 63 } 64 65 GangWorker* WorkGang::allocate_worker(uint which) { 66 GangWorker* new_worker = new GangWorker(this, which); 67 return new_worker; 68 } 69 70 // The current implementation will exit if the allocation 71 // of any worker fails. Still, return a boolean so that 72 // a future implementation can possibly do a partial 73 // initialization of the workers and report such to the 74 // caller. 75 bool WorkGang::initialize_workers() { 76 77 if (TraceWorkGang) { 78 tty->print_cr("Constructing work gang %s with %d threads", 79 name(), 80 total_workers()); 81 } 82 _gang_workers = NEW_C_HEAP_ARRAY(GangWorker*, total_workers(), mtInternal); 83 if (gang_workers() == NULL) { 84 vm_exit_out_of_memory(0, OOM_MALLOC_ERROR, "Cannot create GangWorker array."); 85 return false; 86 } 87 os::ThreadType worker_type; 88 if (are_ConcurrentGC_threads()) { 89 worker_type = os::cgc_thread; 90 } else { 91 worker_type = os::pgc_thread; 92 } 93 for (uint worker = 0; worker < total_workers(); worker += 1) { 94 GangWorker* new_worker = allocate_worker(worker); 95 assert(new_worker != NULL, "Failed to allocate GangWorker"); 96 _gang_workers[worker] = new_worker; 97 if (new_worker == NULL || !os::create_thread(new_worker, worker_type)) { 98 vm_exit_out_of_memory(0, OOM_MALLOC_ERROR, 99 "Cannot create worker GC thread. Out of system resources."); 100 return false; 101 } 102 if (!DisableStartThread) { 103 os::start_thread(new_worker); 104 } 105 } 106 return true; 107 } 108 109 AbstractWorkGang::~AbstractWorkGang() { 110 if (TraceWorkGang) { 111 tty->print_cr("Destructing work gang %s", name()); 112 } 113 stop(); // stop all the workers 114 for (uint worker = 0; worker < total_workers(); worker += 1) { 115 delete gang_worker(worker); 116 } 117 delete gang_workers(); 118 delete monitor(); 119 } 120 121 GangWorker* AbstractWorkGang::gang_worker(uint i) const { 122 // Array index bounds checking. 123 GangWorker* result = NULL; 124 assert(gang_workers() != NULL, "No workers for indexing"); 125 assert(((i >= 0) && (i < total_workers())), "Worker index out of bounds"); 126 result = _gang_workers[i]; 127 assert(result != NULL, "Indexing to null worker"); 128 return result; 129 } 130 131 void WorkGang::run_task(AbstractGangTask* task) { 132 run_task(task, total_workers()); 133 } 134 135 void WorkGang::run_task(AbstractGangTask* task, uint no_of_parallel_workers) { 136 task->set_for_termination(no_of_parallel_workers); 137 138 // This thread is executed by the VM thread which does not block 139 // on ordinary MutexLocker's. 140 MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag); 141 if (TraceWorkGang) { 142 tty->print_cr("Running work gang %s task %s", name(), task->name()); 143 } 144 // Tell all the workers to run a task. 145 assert(task != NULL, "Running a null task"); 146 // Initialize. 147 _task = task; 148 _sequence_number += 1; 149 _started_workers = 0; 150 _finished_workers = 0; 151 // Tell the workers to get to work. 152 monitor()->notify_all(); 153 // Wait for them to be finished 154 while (finished_workers() < no_of_parallel_workers) { 155 if (TraceWorkGang) { 156 tty->print_cr("Waiting in work gang %s: %d/%d finished sequence %d", 157 name(), finished_workers(), no_of_parallel_workers, 158 _sequence_number); 159 } 160 monitor()->wait(/* no_safepoint_check */ true); 161 } 162 _task = NULL; 163 if (TraceWorkGang) { 164 tty->print_cr("\nFinished work gang %s: %d/%d sequence %d", 165 name(), finished_workers(), no_of_parallel_workers, 166 _sequence_number); 167 Thread* me = Thread::current(); 168 tty->print_cr(" T: 0x%x VM_thread: %d", me, me->is_VM_thread()); 169 } 170 } 171 172 void FlexibleWorkGang::run_task(AbstractGangTask* task) { 173 // If active_workers() is passed, _finished_workers 174 // must only be incremented for workers that find non_null 175 // work (as opposed to all those that just check that the 176 // task is not null). 177 WorkGang::run_task(task, (uint) active_workers()); 178 } 179 180 void AbstractWorkGang::stop() { 181 // Tell all workers to terminate, then wait for them to become inactive. 182 MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag); 183 if (TraceWorkGang) { 184 tty->print_cr("Stopping work gang %s task %s", name(), task()->name()); 185 } 186 _task = NULL; 187 _terminate = true; 188 monitor()->notify_all(); 189 while (finished_workers() < active_workers()) { 190 if (TraceWorkGang) { 191 tty->print_cr("Waiting in work gang %s: %d/%d finished", 192 name(), finished_workers(), active_workers()); 193 } 194 monitor()->wait(/* no_safepoint_check */ true); 195 } 196 } 197 198 void AbstractWorkGang::internal_worker_poll(WorkData* data) const { 199 assert(monitor()->owned_by_self(), "worker_poll is an internal method"); 200 assert(data != NULL, "worker data is null"); 201 data->set_terminate(terminate()); 202 data->set_task(task()); 203 data->set_sequence_number(sequence_number()); 204 } 205 206 void AbstractWorkGang::internal_note_start() { 207 assert(monitor()->owned_by_self(), "note_finish is an internal method"); 208 _started_workers += 1; 209 } 210 211 void AbstractWorkGang::internal_note_finish() { 212 assert(monitor()->owned_by_self(), "note_finish is an internal method"); 213 _finished_workers += 1; 214 } 215 216 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const { 217 uint num_thr = total_workers(); 218 for (uint i = 0; i < num_thr; i++) { 219 gang_worker(i)->print_on(st); 220 st->cr(); 221 } 222 } 223 224 void AbstractWorkGang::threads_do(ThreadClosure* tc) const { 225 assert(tc != NULL, "Null ThreadClosure"); 226 uint num_thr = total_workers(); 227 for (uint i = 0; i < num_thr; i++) { 228 tc->do_thread(gang_worker(i)); 229 } 230 } 231 232 // GangWorker methods. 233 234 GangWorker::GangWorker(AbstractWorkGang* gang, uint id) { 235 _gang = gang; 236 set_id(id); 237 set_name("Gang worker#%d (%s)", id, gang->name()); 238 } 239 240 void GangWorker::run() { 241 initialize(); 242 loop(); 243 } 244 245 void GangWorker::initialize() { 246 this->initialize_thread_local_storage(); 247 this->record_stack_base_and_size(); 248 assert(_gang != NULL, "No gang to run in"); 249 os::set_priority(this, NearMaxPriority); 250 if (TraceWorkGang) { 251 tty->print_cr("Running gang worker for gang %s id %d", 252 gang()->name(), id()); 253 } 254 // The VM thread should not execute here because MutexLocker's are used 255 // as (opposed to MutexLockerEx's). 256 assert(!Thread::current()->is_VM_thread(), "VM thread should not be part" 257 " of a work gang"); 258 } 259 260 void GangWorker::loop() { 261 int previous_sequence_number = 0; 262 Monitor* gang_monitor = gang()->monitor(); 263 for ( ; /* !terminate() */; ) { 264 WorkData data; 265 int part; // Initialized below. 266 { 267 // Grab the gang mutex. 268 MutexLocker ml(gang_monitor); 269 // Wait for something to do. 270 // Polling outside the while { wait } avoids missed notifies 271 // in the outer loop. 272 gang()->internal_worker_poll(&data); 273 if (TraceWorkGang) { 274 tty->print("Polled outside for work in gang %s worker %d", 275 gang()->name(), id()); 276 tty->print(" terminate: %s", 277 data.terminate() ? "true" : "false"); 278 tty->print(" sequence: %d (prev: %d)", 279 data.sequence_number(), previous_sequence_number); 280 if (data.task() != NULL) { 281 tty->print(" task: %s", data.task()->name()); 282 } else { 283 tty->print(" task: NULL"); 284 } 285 tty->cr(); 286 } 287 for ( ; /* break or return */; ) { 288 // Terminate if requested. 289 if (data.terminate()) { 290 gang()->internal_note_finish(); 291 gang_monitor->notify_all(); 292 return; 293 } 294 // Check for new work. 295 if ((data.task() != NULL) && 296 (data.sequence_number() != previous_sequence_number)) { 297 if (gang()->needs_more_workers()) { 298 gang()->internal_note_start(); 299 gang_monitor->notify_all(); 300 part = gang()->started_workers() - 1; 301 break; 302 } 303 } 304 // Nothing to do. 305 gang_monitor->wait(/* no_safepoint_check */ true); 306 gang()->internal_worker_poll(&data); 307 if (TraceWorkGang) { 308 tty->print("Polled inside for work in gang %s worker %d", 309 gang()->name(), id()); 310 tty->print(" terminate: %s", 311 data.terminate() ? "true" : "false"); 312 tty->print(" sequence: %d (prev: %d)", 313 data.sequence_number(), previous_sequence_number); 314 if (data.task() != NULL) { 315 tty->print(" task: %s", data.task()->name()); 316 } else { 317 tty->print(" task: NULL"); 318 } 319 tty->cr(); 320 } 321 } 322 // Drop gang mutex. 323 } 324 if (TraceWorkGang) { 325 tty->print("Work for work gang %s id %d task %s part %d", 326 gang()->name(), id(), data.task()->name(), part); 327 } 328 assert(data.task() != NULL, "Got null task"); 329 data.task()->work(part); 330 { 331 if (TraceWorkGang) { 332 tty->print("Finish for work gang %s id %d task %s part %d", 333 gang()->name(), id(), data.task()->name(), part); 334 } 335 // Grab the gang mutex. 336 MutexLocker ml(gang_monitor); 337 gang()->internal_note_finish(); 338 // Tell the gang you are done. 339 gang_monitor->notify_all(); 340 // Drop the gang mutex. 341 } 342 previous_sequence_number = data.sequence_number(); 343 } 344 } 345 346 bool GangWorker::is_GC_task_thread() const { 347 return gang()->are_GC_task_threads(); 348 } 349 350 bool GangWorker::is_ConcurrentGC_thread() const { 351 return gang()->are_ConcurrentGC_threads(); 352 } 353 354 void GangWorker::print_on(outputStream* st) const { 355 st->print("\"%s\" ", name()); 356 Thread::print_on(st); 357 st->cr(); 358 } 359 360 // Printing methods 361 362 const char* AbstractWorkGang::name() const { 363 return _name; 364 } 365 366 #ifndef PRODUCT 367 368 const char* AbstractGangTask::name() const { 369 return _name; 370 } 371 372 #endif /* PRODUCT */ 373 374 // FlexibleWorkGang 375 376 377 // *** WorkGangBarrierSync 378 379 WorkGangBarrierSync::WorkGangBarrierSync() 380 : _monitor(Mutex::safepoint, "work gang barrier sync", true), 381 _n_workers(0), _n_completed(0), _should_reset(false), _aborted(false) { 382 } 383 384 WorkGangBarrierSync::WorkGangBarrierSync(uint n_workers, const char* name) 385 : _monitor(Mutex::safepoint, name, true), 386 _n_workers(n_workers), _n_completed(0), _should_reset(false), _aborted(false) { 387 } 388 389 void WorkGangBarrierSync::set_n_workers(uint n_workers) { 390 _n_workers = n_workers; 391 _n_completed = 0; 392 _should_reset = false; 393 _aborted = false; 394 } 395 396 bool WorkGangBarrierSync::enter() { 397 MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag); 398 if (should_reset()) { 399 // The should_reset() was set and we are the first worker to enter 400 // the sync barrier. We will zero the n_completed() count which 401 // effectively resets the barrier. 402 zero_completed(); 403 set_should_reset(false); 404 } 405 inc_completed(); 406 if (n_completed() == n_workers()) { 407 // At this point we would like to reset the barrier to be ready in 408 // case it is used again. However, we cannot set n_completed() to 409 // 0, even after the notify_all(), given that some other workers 410 // might still be waiting for n_completed() to become == 411 // n_workers(). So, if we set n_completed() to 0, those workers 412 // will get stuck (as they will wake up, see that n_completed() != 413 // n_workers() and go back to sleep). Instead, we raise the 414 // should_reset() flag and the barrier will be reset the first 415 // time a worker enters it again. 416 set_should_reset(true); 417 monitor()->notify_all(); 418 } else { 419 while (n_completed() != n_workers() && !aborted()) { 420 monitor()->wait(/* no_safepoint_check */ true); 421 } 422 } 423 return !aborted(); 424 } 425 426 void WorkGangBarrierSync::abort() { 427 MutexLockerEx x(monitor(), Mutex::_no_safepoint_check_flag); 428 set_aborted(); 429 monitor()->notify_all(); 430 } 431 432 // SubTasksDone functions. 433 434 SubTasksDone::SubTasksDone(uint n) : 435 _n_tasks(n), _n_threads(1), _tasks(NULL) { 436 _tasks = NEW_C_HEAP_ARRAY(uint, n, mtInternal); 437 guarantee(_tasks != NULL, "alloc failure"); 438 clear(); 439 } 440 441 bool SubTasksDone::valid() { 442 return _tasks != NULL; 443 } 444 445 void SubTasksDone::set_n_threads(uint t) { 446 assert(_claimed == 0 || _threads_completed == _n_threads, 447 "should not be called while tasks are being processed!"); 448 _n_threads = (t == 0 ? 1 : t); 449 } 450 451 void SubTasksDone::clear() { 452 for (uint i = 0; i < _n_tasks; i++) { 453 _tasks[i] = 0; 454 } 455 _threads_completed = 0; 456 #ifdef ASSERT 457 _claimed = 0; 458 #endif 459 } 460 461 bool SubTasksDone::is_task_claimed(uint t) { 462 assert(0 <= t && t < _n_tasks, "bad task id."); 463 uint old = _tasks[t]; 464 if (old == 0) { 465 old = Atomic::cmpxchg(1, &_tasks[t], 0); 466 } 467 assert(_tasks[t] == 1, "What else?"); 468 bool res = old != 0; 469 #ifdef ASSERT 470 if (!res) { 471 assert(_claimed < _n_tasks, "Too many tasks claimed; missing clear?"); 472 Atomic::inc((volatile jint*) &_claimed); 473 } 474 #endif 475 return res; 476 } 477 478 void SubTasksDone::all_tasks_completed() { 479 jint observed = _threads_completed; 480 jint old; 481 do { 482 old = observed; 483 observed = Atomic::cmpxchg(old+1, &_threads_completed, old); 484 } while (observed != old); 485 // If this was the last thread checking in, clear the tasks. 486 if (observed+1 == (jint)_n_threads) clear(); 487 } 488 489 490 SubTasksDone::~SubTasksDone() { 491 if (_tasks != NULL) FREE_C_HEAP_ARRAY(jint, _tasks, mtInternal); 492 } 493 494 // *** SequentialSubTasksDone 495 496 void SequentialSubTasksDone::clear() { 497 _n_tasks = _n_claimed = 0; 498 _n_threads = _n_completed = 0; 499 } 500 501 bool SequentialSubTasksDone::valid() { 502 return _n_threads > 0; 503 } 504 505 bool SequentialSubTasksDone::is_task_claimed(uint& t) { 506 uint* n_claimed_ptr = &_n_claimed; 507 t = *n_claimed_ptr; 508 while (t < _n_tasks) { 509 jint res = Atomic::cmpxchg(t+1, n_claimed_ptr, t); 510 if (res == (jint)t) { 511 return false; 512 } 513 t = *n_claimed_ptr; 514 } 515 return true; 516 } 517 518 bool SequentialSubTasksDone::all_tasks_completed() { 519 uint* n_completed_ptr = &_n_completed; 520 uint complete = *n_completed_ptr; 521 while (true) { 522 uint res = Atomic::cmpxchg(complete+1, n_completed_ptr, complete); 523 if (res == complete) { 524 break; 525 } 526 complete = res; 527 } 528 if (complete+1 == _n_threads) { 529 clear(); 530 return true; 531 } 532 return false; 533 } 534 535 bool FreeIdSet::_stat_init = false; 536 FreeIdSet* FreeIdSet::_sets[NSets]; 537 bool FreeIdSet::_safepoint; 538 539 FreeIdSet::FreeIdSet(int sz, Monitor* mon) : 540 _sz(sz), _mon(mon), _hd(0), _waiters(0), _index(-1), _claimed(0) 541 { 542 _ids = NEW_C_HEAP_ARRAY(int, sz, mtInternal); 543 for (int i = 0; i < sz; i++) _ids[i] = i+1; 544 _ids[sz-1] = end_of_list; // end of list. 545 if (_stat_init) { 546 for (int j = 0; j < NSets; j++) _sets[j] = NULL; 547 _stat_init = true; 548 } 549 // Add to sets. (This should happen while the system is still single-threaded.) 550 for (int j = 0; j < NSets; j++) { 551 if (_sets[j] == NULL) { 552 _sets[j] = this; 553 _index = j; 554 break; 555 } 556 } 557 guarantee(_index != -1, "Too many FreeIdSets in use!"); 558 } 559 560 FreeIdSet::~FreeIdSet() { 561 _sets[_index] = NULL; 562 FREE_C_HEAP_ARRAY(int, _ids, mtInternal); 563 } 564 565 void FreeIdSet::set_safepoint(bool b) { 566 _safepoint = b; 567 if (b) { 568 for (int j = 0; j < NSets; j++) { 569 if (_sets[j] != NULL && _sets[j]->_waiters > 0) { 570 Monitor* mon = _sets[j]->_mon; 571 mon->lock_without_safepoint_check(); 572 mon->notify_all(); 573 mon->unlock(); 574 } 575 } 576 } 577 } 578 579 #define FID_STATS 0 580 581 int FreeIdSet::claim_par_id() { 582 #if FID_STATS 583 thread_t tslf = thr_self(); 584 tty->print("claim_par_id[%d]: sz = %d, claimed = %d\n", tslf, _sz, _claimed); 585 #endif 586 MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag); 587 while (!_safepoint && _hd == end_of_list) { 588 _waiters++; 589 #if FID_STATS 590 if (_waiters > 5) { 591 tty->print("claim_par_id waiting[%d]: %d waiters, %d claimed.\n", 592 tslf, _waiters, _claimed); 593 } 594 #endif 595 _mon->wait(Mutex::_no_safepoint_check_flag); 596 _waiters--; 597 } 598 if (_hd == end_of_list) { 599 #if FID_STATS 600 tty->print("claim_par_id[%d]: returning EOL.\n", tslf); 601 #endif 602 return -1; 603 } else { 604 int res = _hd; 605 _hd = _ids[res]; 606 _ids[res] = claimed; // For debugging. 607 _claimed++; 608 #if FID_STATS 609 tty->print("claim_par_id[%d]: returning %d, claimed = %d.\n", 610 tslf, res, _claimed); 611 #endif 612 return res; 613 } 614 } 615 616 bool FreeIdSet::claim_perm_id(int i) { 617 assert(0 <= i && i < _sz, "Out of range."); 618 MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag); 619 int prev = end_of_list; 620 int cur = _hd; 621 while (cur != end_of_list) { 622 if (cur == i) { 623 if (prev == end_of_list) { 624 _hd = _ids[cur]; 625 } else { 626 _ids[prev] = _ids[cur]; 627 } 628 _ids[cur] = claimed; 629 _claimed++; 630 return true; 631 } else { 632 prev = cur; 633 cur = _ids[cur]; 634 } 635 } 636 return false; 637 638 } 639 640 void FreeIdSet::release_par_id(int id) { 641 MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag); 642 assert(_ids[id] == claimed, "Precondition."); 643 _ids[id] = _hd; 644 _hd = id; 645 _claimed--; 646 #if FID_STATS 647 tty->print("[%d] release_par_id(%d), waiters =%d, claimed = %d.\n", 648 thr_self(), id, _waiters, _claimed); 649 #endif 650 if (_waiters > 0) 651 // Notify all would be safer, but this is OK, right? 652 _mon->notify_all(); 653 }