237 if (_finished == _num_workers) { 238 // This will wake up all workers and not only the coordinator. 239 _monitor->notify_all(); 240 } 241 } 242 }; 243 244 static GangTaskDispatcher* create_dispatcher(uint workers) { 245 #if IMPLEMENTS_SEMAPHORE_CLASS 246 if (UseSemaphoreGCThreadsSynchronization) { 247 return new SemaphoreGangTaskDispatcher(workers); 248 } 249 #endif 250 251 return new MutexGangTaskDispatcher(); 252 } 253 254 WorkGang::WorkGang(const char* name, 255 uint workers, 256 bool are_GC_task_threads, 257 bool are_ConcurrentGC_threads) : 258 AbstractWorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads), 259 _dispatcher(create_dispatcher(workers)) 260 { } 261 262 AbstractGangWorker* WorkGang::allocate_worker(uint worker_id) { 263 return new GangWorker(this, worker_id); 264 } 265 266 void WorkGang::run_task(AbstractGangTask* task) { 267 _dispatcher->coordinator_execute_on_workers(task, active_workers()); 268 } 269 270 AbstractGangWorker::AbstractGangWorker(AbstractWorkGang* gang, uint id) { 271 _gang = gang; 272 set_id(id); 273 set_name("%s#%d", gang->name(), id); 274 } 275 276 void AbstractGangWorker::run() { 277 initialize(); 278 loop(); 279 } 606 } 607 } 608 return false; 609 610 } 611 612 void FreeIdSet::release_par_id(int id) { 613 MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag); 614 assert(_ids[id] == claimed, "Precondition."); 615 _ids[id] = _hd; 616 _hd = id; 617 _claimed--; 618 #if FID_STATS 619 tty->print("[%d] release_par_id(%d), waiters =%d, claimed = %d.\n", 620 thr_self(), id, _waiters, _claimed); 621 #endif 622 if (_waiters > 0) 623 // Notify all would be safer, but this is OK, right? 624 _mon->notify_all(); 625 } | 237 if (_finished == _num_workers) { 238 // This will wake up all workers and not only the coordinator. 239 _monitor->notify_all(); 240 } 241 } 242 }; 243 244 static GangTaskDispatcher* create_dispatcher(uint workers) { 245 #if IMPLEMENTS_SEMAPHORE_CLASS 246 if (UseSemaphoreGCThreadsSynchronization) { 247 return new SemaphoreGangTaskDispatcher(workers); 248 } 249 #endif 250 251 return new MutexGangTaskDispatcher(); 252 } 253 254 WorkGang::WorkGang(const char* name, 255 uint workers, 256 bool are_GC_task_threads, 257 bool are_ConcurrentGC_threads, 258 GangTaskDispatcher* dispatcher) : 259 AbstractWorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads), 260 _dispatcher(dispatcher != NULL ? dispatcher: create_dispatcher(workers)) 261 { } 262 263 AbstractGangWorker* WorkGang::allocate_worker(uint worker_id) { 264 return new GangWorker(this, worker_id); 265 } 266 267 void WorkGang::run_task(AbstractGangTask* task) { 268 _dispatcher->coordinator_execute_on_workers(task, active_workers()); 269 } 270 271 AbstractGangWorker::AbstractGangWorker(AbstractWorkGang* gang, uint id) { 272 _gang = gang; 273 set_id(id); 274 set_name("%s#%d", gang->name(), id); 275 } 276 277 void AbstractGangWorker::run() { 278 initialize(); 279 loop(); 280 } 607 } 608 } 609 return false; 610 611 } 612 613 void FreeIdSet::release_par_id(int id) { 614 MutexLockerEx x(_mon, Mutex::_no_safepoint_check_flag); 615 assert(_ids[id] == claimed, "Precondition."); 616 _ids[id] = _hd; 617 _hd = id; 618 _claimed--; 619 #if FID_STATS 620 tty->print("[%d] release_par_id(%d), waiters =%d, claimed = %d.\n", 621 thr_self(), id, _waiters, _claimed); 622 #endif 623 if (_waiters > 0) 624 // Notify all would be safer, but this is OK, right? 625 _mon->notify_all(); 626 } 627 628 /////////////// Unit tests /////////////// 629 630 #ifndef PRODUCT 631 632 class CountTask : public AbstractGangTask { 633 volatile jint _count; 634 bool _sleep; 635 public: 636 CountTask(bool sleep) : AbstractGangTask("CountTask"), _sleep(sleep), _count(0) {} 637 virtual void work(uint worker_id) { 638 if (_sleep) { 639 // Sleep a while, to delay the task to allow the coordinator to run. 640 os::sleep(Thread::current(), 100, false); 641 } 642 643 // Count number of times executed. 644 Atomic::inc(&_count); 645 } 646 uint count() { return _count; } 647 }; 648 649 650 static void test_workgang_dispatch(bool use_semaphore, 651 uint total_workers, 652 uint active_workers, 653 bool sleep) { 654 655 GangTaskDispatcher* dispatcher = use_semaphore 656 ? (GangTaskDispatcher*) new SemGangTaskDispatcher(total_workers) 657 : (GangTaskDispatcher*) new MutexGangTaskDispatcher(); 658 659 // Intentionally leaking WorkGang, since there's no support to delete WorkGangs. 660 WorkGang* gang = new WorkGang("Test WorkGang", total_workers, false, false, dispatcher); 661 gang->initialize_workers(); 662 663 gang->set_active_workers(active_workers); 664 665 CountTask task(sleep); 666 667 gang->run_task(&task); 668 669 uint task_count = task.count(); 670 671 assert(task_count == active_workers, err_msg("Expected count: %u got: %u", active_workers, task_count)); 672 } 673 674 static void test_workgang_dispatch(bool use_semaphore) { 675 const bool sleep = true; 676 const uint total_workers = 8; 677 for (uint active_workers = 1; active_workers <= 4; active_workers++) { 678 test_workgang_dispatch(use_semaphore, total_workers, active_workers, sleep); 679 test_workgang_dispatch(use_semaphore, total_workers, active_workers, !sleep); 680 } 681 } 682 683 static void test_workgang_dispatch_mutex() { 684 test_workgang_dispatch(false); 685 } 686 687 static void test_workgang_dispatch_semaphore() { 688 test_workgang_dispatch(true); 689 } 690 691 void test_workgang() { 692 // Needed to use dynamic number of active workers. 693 FlagSetting fs(UseDynamicNumberOfGCThreads, true); 694 695 test_workgang_dispatch_semaphore(); 696 test_workgang_dispatch_mutex(); 697 } 698 699 #endif // PRODUCT |