136 _task = task; 137 _not_finished = num_workers; 138 139 // Dispatch 'num_workers' number of tasks. 140 _start_semaphore->signal(num_workers); 141 142 // Wait for the last worker to signal the coordinator. 143 _end_semaphore->wait(); 144 145 // No workers are allowed to read the state variables after the coordinator has been signaled. 146 assert(_not_finished == 0, "%d not finished workers?", _not_finished); 147 _task = NULL; 148 _started = 0; 149 150 } 151 /* Blocks on _start_semaphore, then atomically adds 1 to _started and returns WorkData(_task, worker_id) with worker_id = num_started - 1. NOTE(review): presumably Atomic::add yields the updated value -- confirm against the Atomic API. */ 152 WorkData worker_wait_for_task() { 153 // Wait for the coordinator to dispatch a task. 154 _start_semaphore->wait(); 155 156 uint num_started = Atomic::add(1u, &_started); 157 158 // Subtract one to get a zero-indexed worker id. 159 uint worker_id = num_started - 1; 160 161 return WorkData(_task, worker_id); 162 } 163 /* Atomically subtracts 1 from _not_finished; when the result is 0 this worker signals _end_semaphore. */ 164 void worker_done_with_task() { 165 // Mark that the worker is done with the task. 166 // The worker is not allowed to read the state variables after this line. 167 uint not_finished = Atomic::sub(1u, &_not_finished); 168 169 // The last worker signals to the coordinator that all work is completed. 
170 if (not_finished == 0) { 171 _end_semaphore->signal(); 172 } 173 } 174 }; 175 176 class MutexGangTaskDispatcher : public GangTaskDispatcher { 177 AbstractGangTask* _task; 178 179 volatile uint _started; 180 volatile uint _finished; 181 volatile uint _num_workers; 182 183 Monitor* _monitor; 184 185 public: 186 MutexGangTaskDispatcher() : 187 _task(NULL), 409 } 410 /* Reports whether the _tasks pointer is non-NULL. */ 411 bool SubTasksDone::valid() { 412 return _tasks != NULL; 413 } 414 /* Zeroes the first _n_tasks entries of _tasks and resets _threads_completed (and _claimed when ASSERT is defined). */ 415 void SubTasksDone::clear() { 416 for (uint i = 0; i < _n_tasks; i++) { 417 _tasks[i] = 0; 418 } 419 _threads_completed = 0; 420 #ifdef ASSERT 421 _claimed = 0; 422 #endif 423 } 424 /* Tries to claim task t: if _tasks[t] reads 0, attempts a cmpxchg from 0 to 1. Returns true exactly when old ends up equal to 0. Asserts t < _n_tasks. */ 425 bool SubTasksDone::try_claim_task(uint t) { 426 assert(t < _n_tasks, "bad task id."); 427 uint old = _tasks[t]; 428 if (old == 0) { 429 old = Atomic::cmpxchg(1u, &_tasks[t], 0u); 430 } 431 bool res = old == 0; 432 #ifdef ASSERT 433 if (res) { 434 assert(_claimed < _n_tasks, "Too many tasks claimed; missing clear?"); 435 Atomic::inc(&_claimed); 436 } 437 #endif 438 return res; 439 } 440 /* Increments _threads_completed via a cmpxchg retry loop; if the pre-increment count plus one equals n_threads (0 is treated as 1), calls clear(). */ 441 void SubTasksDone::all_tasks_completed(uint n_threads) { 442 uint observed = _threads_completed; 443 uint old; 444 do { 445 old = observed; 446 observed = Atomic::cmpxchg(old+1, &_threads_completed, old); 447 } while (observed != old); 448 // If this was the last thread checking in, clear the tasks. 449 uint adjusted_thread_count = (n_threads == 0 ? 
1 : n_threads); 450 if (observed + 1 == adjusted_thread_count) { 451 clear(); 452 } 453 } 454 455 /* Releases the _tasks array through FREE_C_HEAP_ARRAY. */ 456 SubTasksDone::~SubTasksDone() { 457 FREE_C_HEAP_ARRAY(uint, _tasks); 458 } 459 460 // *** SequentialSubTasksDone 461 /* Resets _n_tasks, _n_claimed, _n_threads and _n_completed to 0. */ 462 void SequentialSubTasksDone::clear() { 463 _n_tasks = _n_claimed = 0; 464 _n_threads = _n_completed = 0; 465 } 466 /* True when _n_threads is greater than 0. */ 467 bool SequentialSubTasksDone::valid() { 468 return _n_threads > 0; 469 } 470 /* Claims the next index: starting from t = _n_claimed, retries a cmpxchg from t to t+1 while t < _n_tasks. Returns true with t holding the claimed index, false once t is no longer below _n_tasks. */ 471 bool SequentialSubTasksDone::try_claim_task(uint& t) { 472 t = _n_claimed; 473 while (t < _n_tasks) { 474 uint res = Atomic::cmpxchg(t+1, &_n_claimed, t); 475 if (res == t) { 476 return true; 477 } 478 t = res; 479 } 480 return false; 481 } 482 /* Increments _n_completed via a cmpxchg retry loop; if complete+1 equals _n_threads, calls clear() and returns true, otherwise returns false. */ 483 bool SequentialSubTasksDone::all_tasks_completed() { 484 uint complete = _n_completed; 485 while (true) { 486 uint res = Atomic::cmpxchg(complete+1, &_n_completed, complete); 487 if (res == complete) { 488 break; 489 } 490 complete = res; 491 } 492 if (complete+1 == _n_threads) { 493 clear(); 494 return true; 495 } 496 return false; 497 } | /* NOTE(review): the text after this bar repeats the same excerpt with destination-first arguments to Atomic::add, Atomic::sub and Atomic::cmpxchg. */ 136 _task = task; 137 _not_finished = num_workers; 138 139 // Dispatch 'num_workers' number of tasks. 140 _start_semaphore->signal(num_workers); 141 142 // Wait for the last worker to signal the coordinator. 143 _end_semaphore->wait(); 144 145 // No workers are allowed to read the state variables after the coordinator has been signaled. 146 assert(_not_finished == 0, "%d not finished workers?", _not_finished); 147 _task = NULL; 148 _started = 0; 149 150 } 151 /* Blocks on _start_semaphore, then atomically adds 1 to _started and returns WorkData(_task, worker_id) with worker_id = num_started - 1. NOTE(review): presumably Atomic::add yields the updated value -- confirm against the Atomic API. */ 152 WorkData worker_wait_for_task() { 153 // Wait for the coordinator to dispatch a task. 154 _start_semaphore->wait(); 155 156 uint num_started = Atomic::add(&_started, 1u); 157 158 // Subtract one to get a zero-indexed worker id. 159 uint worker_id = num_started - 1; 160 161 return WorkData(_task, worker_id); 162 } 163 /* Atomically subtracts 1 from _not_finished; when the result is 0 this worker signals _end_semaphore. */ 164 void worker_done_with_task() { 165 // Mark that the worker is done with the task. 166 // The worker is not allowed to read the state variables after this line. 
167 uint not_finished = Atomic::sub(&_not_finished, 1u); 168 169 // The last worker signals to the coordinator that all work is completed. 170 if (not_finished == 0) { 171 _end_semaphore->signal(); 172 } 173 } 174 }; 175 176 class MutexGangTaskDispatcher : public GangTaskDispatcher { 177 AbstractGangTask* _task; 178 179 volatile uint _started; 180 volatile uint _finished; 181 volatile uint _num_workers; 182 183 Monitor* _monitor; 184 185 public: 186 MutexGangTaskDispatcher() : 187 _task(NULL), 409 } 410 /* Reports whether the _tasks pointer is non-NULL. */ 411 bool SubTasksDone::valid() { 412 return _tasks != NULL; 413 } 414 /* Zeroes the first _n_tasks entries of _tasks and resets _threads_completed (and _claimed when ASSERT is defined). */ 415 void SubTasksDone::clear() { 416 for (uint i = 0; i < _n_tasks; i++) { 417 _tasks[i] = 0; 418 } 419 _threads_completed = 0; 420 #ifdef ASSERT 421 _claimed = 0; 422 #endif 423 } 424 /* Tries to claim task t: if _tasks[t] reads 0, attempts a cmpxchg from 0 to 1. Returns true exactly when old ends up equal to 0. Asserts t < _n_tasks. */ 425 bool SubTasksDone::try_claim_task(uint t) { 426 assert(t < _n_tasks, "bad task id."); 427 uint old = _tasks[t]; 428 if (old == 0) { 429 old = Atomic::cmpxchg(&_tasks[t], 0u, 1u); 430 } 431 bool res = old == 0; 432 #ifdef ASSERT 433 if (res) { 434 assert(_claimed < _n_tasks, "Too many tasks claimed; missing clear?"); 435 Atomic::inc(&_claimed); 436 } 437 #endif 438 return res; 439 } 440 /* Increments _threads_completed via a cmpxchg retry loop; if the pre-increment count plus one equals n_threads (0 is treated as 1), calls clear(). */ 441 void SubTasksDone::all_tasks_completed(uint n_threads) { 442 uint observed = _threads_completed; 443 uint old; 444 do { 445 old = observed; 446 observed = Atomic::cmpxchg(&_threads_completed, old, old+1); 447 } while (observed != old); 448 // If this was the last thread checking in, clear the tasks. 449 uint adjusted_thread_count = (n_threads == 0 ? 
1 : n_threads); 450 if (observed + 1 == adjusted_thread_count) { 451 clear(); 452 } 453 } 454 455 /* Releases the _tasks array through FREE_C_HEAP_ARRAY. */ 456 SubTasksDone::~SubTasksDone() { 457 FREE_C_HEAP_ARRAY(uint, _tasks); 458 } 459 460 // *** SequentialSubTasksDone 461 /* Resets _n_tasks, _n_claimed, _n_threads and _n_completed to 0. */ 462 void SequentialSubTasksDone::clear() { 463 _n_tasks = _n_claimed = 0; 464 _n_threads = _n_completed = 0; 465 } 466 /* True when _n_threads is greater than 0. */ 467 bool SequentialSubTasksDone::valid() { 468 return _n_threads > 0; 469 } 470 /* Claims the next index: starting from t = _n_claimed, retries a cmpxchg from t to t+1 while t < _n_tasks. Returns true with t holding the claimed index, false once t is no longer below _n_tasks. */ 471 bool SequentialSubTasksDone::try_claim_task(uint& t) { 472 t = _n_claimed; 473 while (t < _n_tasks) { 474 uint res = Atomic::cmpxchg(&_n_claimed, t, t+1); 475 if (res == t) { 476 return true; 477 } 478 t = res; 479 } 480 return false; 481 } 482 /* Increments _n_completed via a cmpxchg retry loop; if complete+1 equals _n_threads, calls clear() and returns true, otherwise returns false. */ 483 bool SequentialSubTasksDone::all_tasks_completed() { 484 uint complete = _n_completed; 485 while (true) { 486 uint res = Atomic::cmpxchg(&_n_completed, complete, complete+1); 487 if (res == complete) { 488 break; 489 } 490 complete = res; 491 } 492 if (complete+1 == _n_threads) { 493 clear(); 494 return true; 495 } 496 return false; 497 } |