11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 * 23 */ 24 25 #include "precompiled.hpp" 26 #include "gc/shared/workgroup.hpp" 27 #include "memory/allocation.hpp" 28 #include "memory/allocation.inline.hpp" 29 #include "runtime/atomic.inline.hpp" 30 #include "runtime/os.hpp" 31 32 // Definitions of WorkGang methods. 33 34 // The current implementation will exit if the allocation 35 // of any worker fails. Still, return a boolean so that 36 // a future implementation can possibly do a partial 37 // initialization of the workers and report such to the 38 // caller. 39 bool AbstractWorkGang::initialize_workers() { 40 41 if (TraceWorkGang) { 42 tty->print_cr("Constructing work gang %s with %d threads", 43 name(), 44 total_workers()); 45 } 46 _workers = NEW_C_HEAP_ARRAY(AbstractGangWorker*, total_workers(), mtInternal); 47 if (_workers == NULL) { 48 vm_exit_out_of_memory(0, OOM_MALLOC_ERROR, "Cannot create GangWorker array."); 49 return false; 50 } 79 assert(result != NULL, "Indexing to null worker"); 80 return result; 81 } 82 83 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const { 84 uint workers = total_workers(); 85 for (uint i = 0; i < workers; i++) { 86 worker(i)->print_on(st); 87 st->cr(); 88 } 89 } 90 91 void AbstractWorkGang::threads_do(ThreadClosure* tc) const { 92 assert(tc != NULL, "Null ThreadClosure"); 93 uint workers = total_workers(); 94 for (uint i = 0; i < workers; i++) { 95 tc->do_thread(worker(i)); 96 } 97 } 98 99 WorkGang::WorkGang(const char* name, 100 uint workers, 101 bool are_GC_task_threads, 102 bool are_ConcurrentGC_threads) : 103 AbstractWorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads), 104 _started_workers(0), 105 _finished_workers(0), 106 _sequence_number(0), 107 _task(NULL) { 108 109 // Other initialization. 110 _monitor = new Monitor(/* priority */ Mutex::leaf, 111 /* name */ "WorkGroup monitor", 112 /* allow_vm_block */ are_GC_task_threads, 113 Monitor::_safepoint_check_sometimes); 114 115 assert(monitor() != NULL, "Failed to allocate monitor"); 116 } 117 118 AbstractGangWorker* WorkGang::allocate_worker(uint worker_id) { 119 return new GangWorker(this, worker_id); 120 } 121 122 void WorkGang::run_task(AbstractGangTask* task) { 123 run_task(task, (uint)active_workers()); 124 } 125 126 void WorkGang::run_task(AbstractGangTask* task, uint no_of_parallel_workers) { 127 // This thread is executed by the VM thread which does not block 128 // on ordinary MutexLocker's. 129 MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag); 130 if (TraceWorkGang) { 131 tty->print_cr("Running work gang %s task %s", name(), task->name()); 132 } 133 // Tell all the workers to run a task. 134 assert(task != NULL, "Running a null task"); 135 // Initialize. 136 _task = task; 137 _sequence_number += 1; 138 _started_workers = 0; 139 _finished_workers = 0; 140 // Tell the workers to get to work. 141 monitor()->notify_all(); 142 // Wait for them to be finished 143 while (finished_workers() < no_of_parallel_workers) { 144 if (TraceWorkGang) { 145 tty->print_cr("Waiting in work gang %s: %u/%u finished sequence %d", 146 name(), finished_workers(), no_of_parallel_workers, 147 _sequence_number); 148 } 149 monitor()->wait(/* no_safepoint_check */ true); 150 } 151 _task = NULL; 152 if (TraceWorkGang) { 153 tty->print_cr("\nFinished work gang %s: %u/%u sequence %d", 154 name(), finished_workers(), no_of_parallel_workers, 155 _sequence_number); 156 Thread* me = Thread::current(); 157 tty->print_cr(" T: " PTR_FORMAT " VM_thread: %d", p2i(me), me->is_VM_thread()); 158 } 159 } 160 161 void WorkGang::internal_worker_poll(WorkData* data) const { 162 assert(monitor()->owned_by_self(), "worker_poll is an internal method"); 163 assert(data != NULL, "worker data is null"); 164 data->set_task(task()); 165 data->set_sequence_number(sequence_number()); 166 } 167 168 void WorkGang::internal_note_start() { 169 assert(monitor()->owned_by_self(), "note_finish is an internal method"); 170 _started_workers += 1; 171 } 172 173 void WorkGang::internal_note_finish() { 174 assert(monitor()->owned_by_self(), "note_finish is an internal method"); 175 _finished_workers += 1; 176 } 177 178 // GangWorker methods. 179 180 AbstractGangWorker::AbstractGangWorker(AbstractWorkGang* gang, uint id) { 181 _gang = gang; 182 set_id(id); 183 set_name("%s#%d", gang->name(), id); 184 } 185 186 void AbstractGangWorker::run() { 187 initialize(); 188 loop(); 189 } 190 191 void AbstractGangWorker::initialize() { 192 this->initialize_thread_local_storage(); 193 this->record_stack_base_and_size(); 194 this->initialize_named_thread(); 195 assert(_gang != NULL, "No gang to run in"); 196 os::set_priority(this, NearMaxPriority); 197 if (TraceWorkGang) { 198 tty->print_cr("Running gang worker for gang %s id %u", 201 // The VM thread should not execute here because MutexLocker's are used 202 // as (opposed to MutexLockerEx's). 203 assert(!Thread::current()->is_VM_thread(), "VM thread should not be part" 204 " of a work gang"); 205 } 206 207 bool AbstractGangWorker::is_GC_task_thread() const { 208 return gang()->are_GC_task_threads(); 209 } 210 211 bool AbstractGangWorker::is_ConcurrentGC_thread() const { 212 return gang()->are_ConcurrentGC_threads(); 213 } 214 215 void AbstractGangWorker::print_on(outputStream* st) const { 216 st->print("\"%s\" ", name()); 217 Thread::print_on(st); 218 st->cr(); 219 } 220 221 void GangWorker::loop() { 222 int previous_sequence_number = 0; 223 Monitor* gang_monitor = gang()->monitor(); 224 for ( ; ; ) { 225 WorkData data; 226 int part; // Initialized below. 227 { 228 // Grab the gang mutex. 229 MutexLocker ml(gang_monitor); 230 // Wait for something to do. 231 // Polling outside the while { wait } avoids missed notifies 232 // in the outer loop. 233 gang()->internal_worker_poll(&data); 234 if (TraceWorkGang) { 235 tty->print("Polled outside for work in gang %s worker %u", 236 gang()->name(), id()); 237 tty->print(" sequence: %d (prev: %d)", 238 data.sequence_number(), previous_sequence_number); 239 if (data.task() != NULL) { 240 tty->print(" task: %s", data.task()->name()); 241 } else { 242 tty->print(" task: NULL"); 243 } 244 tty->cr(); 245 } 246 for ( ; /* break */; ) { 247 // Check for new work. 248 if ((data.task() != NULL) && 249 (data.sequence_number() != previous_sequence_number)) { 250 if (gang()->needs_more_workers()) { 251 gang()->internal_note_start(); 252 gang_monitor->notify_all(); 253 part = gang()->started_workers() - 1; 254 break; 255 } 256 } 257 // Nothing to do. 258 gang_monitor->wait(/* no_safepoint_check */ true); 259 gang()->internal_worker_poll(&data); 260 if (TraceWorkGang) { 261 tty->print("Polled inside for work in gang %s worker %u", 262 gang()->name(), id()); 263 tty->print(" sequence: %d (prev: %d)", 264 data.sequence_number(), previous_sequence_number); 265 if (data.task() != NULL) { 266 tty->print(" task: %s", data.task()->name()); 267 } else { 268 tty->print(" task: NULL"); 269 } 270 tty->cr(); 271 } 272 } 273 // Drop gang mutex. 274 } 275 if (TraceWorkGang) { 276 tty->print("Work for work gang %s id %u task %s part %d", 277 gang()->name(), id(), data.task()->name(), part); 278 } 279 assert(data.task() != NULL, "Got null task"); 280 data.task()->work(part); 281 { 282 if (TraceWorkGang) { 283 tty->print("Finish for work gang %s id %u task %s part %d", 284 gang()->name(), id(), data.task()->name(), part); 285 } 286 // Grab the gang mutex. 287 MutexLocker ml(gang_monitor); 288 gang()->internal_note_finish(); 289 // Tell the gang you are done. 290 gang_monitor->notify_all(); 291 // Drop the gang mutex. 292 } 293 previous_sequence_number = data.sequence_number(); 294 } 295 } 296 297 // *** WorkGangBarrierSync 298 299 WorkGangBarrierSync::WorkGangBarrierSync() 300 : _monitor(Mutex::safepoint, "work gang barrier sync", true, 301 Monitor::_safepoint_check_never), 302 _n_workers(0), _n_completed(0), _should_reset(false), _aborted(false) { 303 } 304 305 WorkGangBarrierSync::WorkGangBarrierSync(uint n_workers, const char* name) 306 : _monitor(Mutex::safepoint, name, true, Monitor::_safepoint_check_never), 307 _n_workers(n_workers), _n_completed(0), _should_reset(false), _aborted(false) { 308 } 309 310 void WorkGangBarrierSync::set_n_workers(uint n_workers) { 311 _n_workers = n_workers; 312 _n_completed = 0; 313 _should_reset = false; | 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 * 23 */ 24 25 #include "precompiled.hpp" 26 #include "gc/shared/workgroup.hpp" 27 #include "memory/allocation.hpp" 28 #include "memory/allocation.inline.hpp" 29 #include "runtime/atomic.inline.hpp" 30 #include "runtime/os.hpp" 31 #include "runtime/semaphore.hpp" 32 #include "runtime/thread.inline.hpp" 33 34 // Definitions of WorkGang methods. 35 36 // The current implementation will exit if the allocation 37 // of any worker fails. Still, return a boolean so that 38 // a future implementation can possibly do a partial 39 // initialization of the workers and report such to the 40 // caller. 41 bool AbstractWorkGang::initialize_workers() { 42 43 if (TraceWorkGang) { 44 tty->print_cr("Constructing work gang %s with %d threads", 45 name(), 46 total_workers()); 47 } 48 _workers = NEW_C_HEAP_ARRAY(AbstractGangWorker*, total_workers(), mtInternal); 49 if (_workers == NULL) { 50 vm_exit_out_of_memory(0, OOM_MALLOC_ERROR, "Cannot create GangWorker array."); 51 return false; 52 } 81 assert(result != NULL, "Indexing to null worker"); 82 return result; 83 } 84 85 void AbstractWorkGang::print_worker_threads_on(outputStream* st) const { 86 uint workers = total_workers(); 87 for (uint i = 0; i < workers; i++) { 88 worker(i)->print_on(st); 89 st->cr(); 90 } 91 } 92 93 void AbstractWorkGang::threads_do(ThreadClosure* tc) const { 94 assert(tc != NULL, "Null ThreadClosure"); 95 uint workers = total_workers(); 96 for (uint i = 0; i < workers; i++) { 97 tc->do_thread(worker(i)); 98 } 99 } 100 101 // WorkGang dispatcher implemented with semaphores. 102 // 103 // Semaphores don't require the worker threads to re-claim the lock when they wake up. 104 // This helps lowering the latency when starting and stopping the worker threads. 105 class SemaphoreGangTaskDispatcher : public GangTaskDispatcher { 106 // The task currently being dispatched to the GangWorkers. 107 AbstractGangTask* _task; 108 109 volatile uint _started; 110 volatile uint _not_finished; 111 112 // Semaphore used to start the GangWorkers. 113 Semaphore* _start_semaphore; 114 // Semaphore used to notify the coordinator that all workers are done. 115 Semaphore* _end_semaphore; 116 117 public: 118 SemaphoreGangTaskDispatcher() : 119 _task(NULL), 120 _started(0), 121 _not_finished(0), 122 // Limit the semaphore value to the number of workers. 123 _start_semaphore(new Semaphore()), 124 _end_semaphore(new Semaphore()) 125 { } 126 127 ~SemaphoreGangTaskDispatcher() { 128 delete _start_semaphore; 129 delete _end_semaphore; 130 } 131 132 void coordinator_execute_on_workers(AbstractGangTask* task, uint num_workers) { 133 // No workers are allowed to read the state variables until they have been signaled. 134 _task = task; 135 _not_finished = num_workers; 136 137 // Dispatch 'num_workers' number of tasks. 138 _start_semaphore->signal(num_workers); 139 140 // Wait for the last worker to signal the coordinator. 141 _end_semaphore->wait(); 142 143 // No workers are allowed to read the state variables after the coordinator has been signaled. 144 _task = NULL; 145 _started = 0; 146 _not_finished = 0; 147 } 148 149 WorkData worker_wait_for_task() { 150 // Wait for the coordinator to dispatch a task. 151 _start_semaphore->wait(); 152 153 uint num_started = (uint) Atomic::add(1, (volatile jint*)&_started); 154 155 // Subtract one to get a zero-indexed worker id. 156 uint worker_id = num_started - 1; 157 158 return WorkData(_task, worker_id); 159 } 160 161 void worker_done_with_task() { 162 // Mark that the worker is done with the task. 163 // The worker is not allowed to read the state variables after this line. 164 uint not_finished = (uint) Atomic::add(-1, (volatile jint*)&_not_finished); 165 166 // The last worker signals to the coordinator that all work is completed. 167 if (not_finished == 0) { 168 _end_semaphore->signal(); 169 } 170 } 171 }; 172 173 class MutexGangTaskDispatcher : public GangTaskDispatcher { 174 AbstractGangTask* _task; 175 176 volatile uint _started; 177 volatile uint _finished; 178 volatile uint _num_workers; 179 180 Monitor* _monitor; 181 182 public: 183 MutexGangTaskDispatcher() 184 : _task(NULL), 185 _monitor(new Monitor(Monitor::leaf, "WorkGang dispatcher lock", false, Monitor::_safepoint_check_never)), 186 _started(0), 187 _finished(0), 188 _num_workers(0) {} 189 190 ~MutexGangTaskDispatcher() { 191 delete _monitor; 192 } 193 194 void coordinator_execute_on_workers(AbstractGangTask* task, uint num_workers) { 195 MutexLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag); 196 197 _task = task; 198 _num_workers = num_workers; 199 200 // Tell the workers to get to work. 201 _monitor->notify_all(); 202 203 // Wait for them to finish. 204 while (_finished < _num_workers) { 205 _monitor->wait(/* no_safepoint_check */ true); 206 } 207 208 _task = NULL; 209 _num_workers = 0; 210 _started = 0; 211 _finished = 0; 212 } 213 214 WorkData worker_wait_for_task() { 215 MonitorLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag); 216 217 while (_num_workers == 0 || _started == _num_workers) { 218 _monitor->wait(/* no_safepoint_check */ true); 219 } 220 221 _started++; 222 223 // Subtract one to get a zero-indexed worker id. 224 uint worker_id = _started - 1; 225 226 return WorkData(_task, worker_id); 227 } 228 229 void worker_done_with_task() { 230 MonitorLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag); 231 232 _finished++; 233 234 if (_finished == _num_workers) { 235 // This will wake up all workers and not only the coordinator. 236 _monitor->notify_all(); 237 } 238 } 239 }; 240 241 static GangTaskDispatcher* create_dispatcher() { 242 if (UseSemaphoreGCThreadsSynchronization) { 243 return new SemaphoreGangTaskDispatcher(); 244 } 245 246 return new MutexGangTaskDispatcher(); 247 } 248 249 WorkGang::WorkGang(const char* name, 250 uint workers, 251 bool are_GC_task_threads, 252 bool are_ConcurrentGC_threads) : 253 AbstractWorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads), 254 _dispatcher(create_dispatcher()) 255 { } 256 257 AbstractGangWorker* WorkGang::allocate_worker(uint worker_id) { 258 return new GangWorker(this, worker_id); 259 } 260 261 void WorkGang::run_task(AbstractGangTask* task) { 262 _dispatcher->coordinator_execute_on_workers(task, active_workers()); 263 } 264 265 AbstractGangWorker::AbstractGangWorker(AbstractWorkGang* gang, uint id) { 266 _gang = gang; 267 set_id(id); 268 set_name("%s#%d", gang->name(), id); 269 } 270 271 void AbstractGangWorker::run() { 272 initialize(); 273 loop(); 274 } 275 276 void AbstractGangWorker::initialize() { 277 this->initialize_thread_local_storage(); 278 this->record_stack_base_and_size(); 279 this->initialize_named_thread(); 280 assert(_gang != NULL, "No gang to run in"); 281 os::set_priority(this, NearMaxPriority); 282 if (TraceWorkGang) { 283 tty->print_cr("Running gang worker for gang %s id %u", 286 // The VM thread should not execute here because MutexLocker's are used 287 // as (opposed to MutexLockerEx's). 288 assert(!Thread::current()->is_VM_thread(), "VM thread should not be part" 289 " of a work gang"); 290 } 291 292 bool AbstractGangWorker::is_GC_task_thread() const { 293 return gang()->are_GC_task_threads(); 294 } 295 296 bool AbstractGangWorker::is_ConcurrentGC_thread() const { 297 return gang()->are_ConcurrentGC_threads(); 298 } 299 300 void AbstractGangWorker::print_on(outputStream* st) const { 301 st->print("\"%s\" ", name()); 302 Thread::print_on(st); 303 st->cr(); 304 } 305 306 WorkData GangWorker::wait_for_task() { 307 return gang()->dispatcher()->worker_wait_for_task(); 308 } 309 310 void GangWorker::signal_task_done() { 311 gang()->dispatcher()->worker_done_with_task(); 312 } 313 314 void GangWorker::print_task_started(WorkData data) { 315 if (TraceWorkGang) { 316 tty->print_cr("Running work gang %s task %s worker %u", name(), data._task->name(), data._worker_id); 317 } 318 } 319 320 void GangWorker::print_task_done(WorkData data) { 321 if (TraceWorkGang) { 322 tty->print_cr("\nFinished work gang %s task %s worker %u", name(), data._task->name(), data._worker_id); 323 Thread* me = Thread::current(); 324 tty->print_cr(" T: " PTR_FORMAT " VM_thread: %d", p2i(me), me->is_VM_thread()); 325 } 326 } 327 328 void GangWorker::run_task(WorkData data) { 329 print_task_started(data); 330 331 data._task->work(data._worker_id); 332 333 print_task_done(data); 334 } 335 336 void GangWorker::loop() { 337 while (true) { 338 WorkData data = wait_for_task(); 339 340 run_task(data); 341 342 signal_task_done(); 343 } 344 } 345 346 // *** WorkGangBarrierSync 347 348 WorkGangBarrierSync::WorkGangBarrierSync() 349 : _monitor(Mutex::safepoint, "work gang barrier sync", true, 350 Monitor::_safepoint_check_never), 351 _n_workers(0), _n_completed(0), _should_reset(false), _aborted(false) { 352 } 353 354 WorkGangBarrierSync::WorkGangBarrierSync(uint n_workers, const char* name) 355 : _monitor(Mutex::safepoint, name, true, Monitor::_safepoint_check_never), 356 _n_workers(n_workers), _n_completed(0), _should_reset(false), _aborted(false) { 357 } 358 359 void WorkGangBarrierSync::set_n_workers(uint n_workers) { 360 _n_workers = n_workers; 361 _n_completed = 0; 362 _should_reset = false; |