/*
 * Copyright (c) 2001, 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 *
 */

#include "precompiled.hpp"
#include "gc/shared/ptrQueue.hpp"
#include "logging/log.hpp"
#include "memory/allocation.hpp"
#include "memory/allocation.inline.hpp"
#include "runtime/atomic.hpp"
#include "runtime/mutex.hpp"
#include "runtime/mutexLocker.hpp"
#include "runtime/orderAccess.hpp"
#include "runtime/thread.inline.hpp"
#include "utilities/globalCounter.inline.hpp"

#include <new>

PtrQueue::PtrQueue(PtrQueueSet* qset, bool active) :
  _qset(qset),
  _active(active),
  _index(0),
  _capacity_in_bytes(0),
  _buf(NULL)
{}

PtrQueue::~PtrQueue() {
  assert(_buf == NULL, "queue must be flushed before delete");
}

void PtrQueue::flush_impl() {
  if (_buf != NULL) {
    BufferNode* node = BufferNode::make_node_from_buffer(_buf, index());
    if (is_empty()) {
      // No work to do.
      qset()->deallocate_buffer(node);
    } else {
      qset()->enqueue_completed_buffer(node);
    }
    _buf = NULL;
    set_index(0);
  }
}
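
// Note: _index is a byte offset into _buf that counts down toward zero
// as entries are added; _index == 0 means the buffer is full (see
// handle_zero_index()), while index() presents the offset in element
// units.  enqueue_known_active() below relies on this, decrementing
// _index by _element_size before storing through index().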

void PtrQueue::enqueue_known_active(void* ptr) {
  while (_index == 0) {
    handle_zero_index();
  }

  assert(_buf != NULL, "postcondition");
  assert(index() > 0, "postcondition");
  assert(index() <= capacity(), "invariant");
  _index -= _element_size;
  _buf[index()] = ptr;
}
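
// A BufferNode and its buffer share a single C-heap block: the node
// header sits at the start of the block and the array of 'size'
// pointers follows at buffer_offset().  allocate() below carves out
// the block and constructs the node in place with placement new;
// deallocate() reverses both steps.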

BufferNode* BufferNode::allocate(size_t size) {
  size_t byte_size = size * sizeof(void*);
  void* data = NEW_C_HEAP_ARRAY(char, buffer_offset() + byte_size, mtGC);
  return new (data) BufferNode;
}

void BufferNode::deallocate(BufferNode* node) {
  node->~BufferNode();
  FREE_C_HEAP_ARRAY(char, node);
}

BufferNode::Allocator::Allocator(const char* name, size_t buffer_size) :
  _buffer_size(buffer_size),
  _pending_list(),
  _free_list(),
  _pending_count(0),
  _free_count(0),
  _transfer_lock(false)
{
  strncpy(_name, name, sizeof(_name));
  _name[sizeof(_name) - 1] = '\0';
}

BufferNode::Allocator::~Allocator() {
  delete_list(_free_list.pop_all());
  delete_list(_pending_list.pop_all());
}

void BufferNode::Allocator::delete_list(BufferNode* list) {
  while (list != NULL) {
    BufferNode* next = list->next();
    DEBUG_ONLY(list->set_next(NULL);)
    BufferNode::deallocate(list);
    list = next;
  }
}

size_t BufferNode::Allocator::free_count() const {
  return Atomic::load(&_free_count);
}

BufferNode* BufferNode::Allocator::allocate() {
  BufferNode* node;
  {
    // Protect against ABA; see release().
    GlobalCounter::CriticalSection cs(Thread::current());
    node = _free_list.pop();
  }
  if (node == NULL) {
    node = BufferNode::allocate(_buffer_size);
  } else {
    // Decrement count after getting buffer from free list.  This, along
    // with incrementing count before adding to free list, ensures count
    // never underflows.
    size_t count = Atomic::sub(1u, &_free_count);
    assert((count + 1) != 0, "_free_count underflow");
  }
  return node;
}

// To solve the ABA problem for lock-free stack pop, allocate does the
// pop inside a critical section, and release synchronizes on the
// critical sections before adding to the _free_list.  But we don't
// want to make every release have to do a synchronize.  Instead, we
// initially place released nodes on the _pending_list, and transfer
// them to the _free_list in batches.  Only one transfer at a time is
// permitted, with a lock bit to control access to that phase.  A
// transfer takes all the nodes from the _pending_list, synchronizes on
// the _free_list pops, and then adds the former pending nodes to the
// _free_list.  While that's happening, other threads might be adding
// other nodes to the _pending_list, to be dealt with by some later
// transfer.
void BufferNode::Allocator::release(BufferNode* node) {
  assert(node != NULL, "precondition");
  assert(node->next() == NULL, "precondition");

  // Desired minimum transfer batch size.  There is relatively little
  // importance to the specific number.  It shouldn't be too big, else
  // we're wasting space when the release rate is low.  If the release
  // rate is high, we might accumulate more than this before being
  // able to start a new transfer, but that's okay.  Also note that
  // the allocation rate and the release rate are going to be fairly
  // similar, due to how the buffers are used.
  const size_t trigger_transfer = 10;

  // Add to pending list.  Update count first so no underflow in transfer.
  size_t pending_count = Atomic::add(1u, &_pending_count);
  _pending_list.push(*node);
  if (pending_count > trigger_transfer) {
    try_transfer_pending();
  }
}

// Try to transfer nodes from _pending_list to _free_list, with a
// synchronization delay for any in-progress pops from the _free_list,
// to solve ABA there.  Returns true if it performed a (possibly empty)
// transfer, false if blocked from doing so by some other thread's
// in-progress transfer.
bool BufferNode::Allocator::try_transfer_pending() {
  // Attempt to claim the lock.
  if (Atomic::load(&_transfer_lock) || // Skip CAS if likely to fail.
      Atomic::cmpxchg(true, &_transfer_lock, false)) {
    return false;
  }
  // Have the lock; perform the transfer.

  // Claim all the pending nodes.
  BufferNode* first = _pending_list.pop_all();
  if (first != NULL) {
    // Prepare to add the claimed nodes, and update _pending_count.
    BufferNode* last = first;
    size_t count = 1;
    for (BufferNode* next = first->next(); next != NULL; next = next->next()) {
      last = next;
      ++count;
    }
    Atomic::sub(count, &_pending_count);

    // Wait for any in-progress pops, to avoid ABA for them.
    GlobalCounter::write_synchronize();

    // Add synchronized nodes to _free_list.
    // Update count first so no underflow in allocate().
    Atomic::add(count, &_free_count);
    _free_list.prepend(*first, *last);
    log_trace(gc, ptrqueue, freelist)
             ("Transferred %s pending to free: " SIZE_FORMAT, name(), count);
  }
  OrderAccess::release_store(&_transfer_lock, false);
  return true;
}

size_t BufferNode::Allocator::reduce_free_list(size_t remove_goal) {
  try_transfer_pending();
  size_t removed = 0;
  for ( ; removed < remove_goal; ++removed) {
    BufferNode* node = _free_list.pop();
    if (node == NULL) break;
    BufferNode::deallocate(node);
  }
  size_t new_count = Atomic::sub(removed, &_free_count);
  log_debug(gc, ptrqueue, freelist)
           ("Reduced %s free list by " SIZE_FORMAT " to " SIZE_FORMAT,
            name(), removed, new_count);
  return removed;
}

PtrQueueSet::PtrQueueSet(bool notify_when_complete) :
  _allocator(NULL),
  _cbl_mon(NULL),
  _completed_buffers_head(NULL),
  _completed_buffers_tail(NULL),
  _n_completed_buffers(0),
  _process_completed_buffers_threshold(ProcessCompletedBuffersThresholdNever),
  _process_completed_buffers(false),
  _notify_when_complete(notify_when_complete),
  _max_completed_buffers(MaxCompletedBuffersUnlimited),
  _completed_buffers_padding(0),
  _all_active(false)
{}

PtrQueueSet::~PtrQueueSet() {
  // There are presently only a couple (derived) instances ever
  // created, and they are permanent, so no harm currently done by
  // doing nothing here.
}
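
// Note: construction and initialization are split; the assert below
// ("Init order issue?") guards against callers passing a monitor or
// allocator that has not been created yet.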

void PtrQueueSet::initialize(Monitor* cbl_mon,
                             BufferNode::Allocator* allocator) {
  assert(cbl_mon != NULL && allocator != NULL, "Init order issue?");
  _cbl_mon = cbl_mon;
  _allocator = allocator;
}

void** PtrQueueSet::allocate_buffer() {
  BufferNode* node = _allocator->allocate();
  return BufferNode::make_buffer_from_node(node);
}

void PtrQueueSet::deallocate_buffer(BufferNode* node) {
  _allocator->release(node);
}

void PtrQueue::handle_zero_index() {
  assert(index() == 0, "precondition");

  // This thread records the full buffer and allocates a new one (while
  // holding the lock if there is one).
  if (_buf != NULL) {
    if (!should_enqueue_buffer()) {
      assert(index() > 0, "the buffer can only be re-used if it's not full");
      return;
    }

    BufferNode* node = BufferNode::make_node_from_buffer(_buf, index());
    if (qset()->process_or_enqueue_completed_buffer(node)) {
      // Recycle the buffer. No allocation.
      assert(_buf == BufferNode::make_buffer_from_node(node), "invariant");
      assert(capacity() == qset()->buffer_size(), "invariant");
      reset();
      return;
    }
  }
  // Set capacity in case this is the first allocation.
  set_capacity(qset()->buffer_size());
  // Allocate a new buffer.
  _buf = qset()->allocate_buffer();
  reset();
}

bool PtrQueueSet::process_or_enqueue_completed_buffer(BufferNode* node) {
  if (Thread::current()->is_Java_thread()) {
    // If the number of buffers exceeds the limit, make this Java
    // thread do the processing itself.  We don't lock to access
    // buffer count or padding; it is fine to be imprecise here.  The
    // add of padding could overflow, which is treated as unlimited.
    size_t limit = _max_completed_buffers + _completed_buffers_padding;
    if ((_n_completed_buffers > limit) && (limit >= _max_completed_buffers)) {
      if (mut_process_buffer(node)) {
        // Successfully processed; return true to allow buffer reuse.
        return true;
      }
    }
  }
  // The buffer will be enqueued. The caller will have to get a new one.
  enqueue_completed_buffer(node);
  return false;
}
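
// The completed-buffer list is a singly linked FIFO guarded by _cbl_mon:
// _completed_buffers_head and _completed_buffers_tail are either both
// NULL (empty) or both non-NULL, _n_completed_buffers tracks the list
// length, and _process_completed_buffers records that the length has
// crossed _process_completed_buffers_threshold.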

void PtrQueueSet::enqueue_completed_buffer(BufferNode* cbn) {
  MutexLockerEx x(_cbl_mon, Mutex::_no_safepoint_check_flag);
  cbn->set_next(NULL);
  if (_completed_buffers_tail == NULL) {
    assert(_completed_buffers_head == NULL, "Well-formedness");
    _completed_buffers_head = cbn;
    _completed_buffers_tail = cbn;
  } else {
    _completed_buffers_tail->set_next(cbn);
    _completed_buffers_tail = cbn;
  }
  _n_completed_buffers++;

  if (!_process_completed_buffers &&
      (_n_completed_buffers > _process_completed_buffers_threshold)) {
    _process_completed_buffers = true;
    if (_notify_when_complete) {
      _cbl_mon->notify();
    }
  }
  assert_completed_buffers_list_len_correct_locked();
}

BufferNode* PtrQueueSet::get_completed_buffer(size_t stop_at) {
  MutexLockerEx x(_cbl_mon, Mutex::_no_safepoint_check_flag);

  if (_n_completed_buffers <= stop_at) {
    return NULL;
  }

  assert(_n_completed_buffers > 0, "invariant");
  assert(_completed_buffers_head != NULL, "invariant");
  assert(_completed_buffers_tail != NULL, "invariant");

  BufferNode* bn = _completed_buffers_head;
  _n_completed_buffers--;
  _completed_buffers_head = bn->next();
  if (_completed_buffers_head == NULL) {
    assert(_n_completed_buffers == 0, "invariant");
    _completed_buffers_tail = NULL;
    _process_completed_buffers = false;
  }
  assert_completed_buffers_list_len_correct_locked();
  bn->set_next(NULL);
  return bn;
}

void PtrQueueSet::abandon_completed_buffers() {
  BufferNode* buffers_to_delete = NULL;
  {
    MutexLockerEx x(_cbl_mon, Mutex::_no_safepoint_check_flag);
    buffers_to_delete = _completed_buffers_head;
    _completed_buffers_head = NULL;
    _completed_buffers_tail = NULL;
    _n_completed_buffers = 0;
    _process_completed_buffers = false;
  }
  while (buffers_to_delete != NULL) {
    BufferNode* bn = buffers_to_delete;
    buffers_to_delete = bn->next();
    bn->set_next(NULL);
    deallocate_buffer(bn);
  }
}

#ifdef ASSERT

void PtrQueueSet::assert_completed_buffers_list_len_correct_locked() {
  assert_lock_strong(_cbl_mon);
  size_t n = 0;
  for (BufferNode* bn = _completed_buffers_head; bn != NULL; bn = bn->next()) {
    ++n;
  }
  assert(n == _n_completed_buffers,
         "Completed buffer length is wrong: counted: " SIZE_FORMAT
         ", expected: " SIZE_FORMAT, n, _n_completed_buffers);
}

#endif // ASSERT

// Merge lists of buffers.  Notify the processing threads.
// The source queue is emptied as a result.  The queues
// must share the monitor.
void PtrQueueSet::merge_bufferlists(PtrQueueSet* src) {
  assert(_cbl_mon == src->_cbl_mon, "Should share the same lock");
  MutexLockerEx x(_cbl_mon, Mutex::_no_safepoint_check_flag);
  if (_completed_buffers_tail == NULL) {
    assert(_completed_buffers_head == NULL, "Well-formedness");
    _completed_buffers_head = src->_completed_buffers_head;
    _completed_buffers_tail = src->_completed_buffers_tail;
  } else {
    assert(_completed_buffers_head != NULL, "Well-formedness");
    if (src->_completed_buffers_head != NULL) {
      _completed_buffers_tail->set_next(src->_completed_buffers_head);
      _completed_buffers_tail = src->_completed_buffers_tail;
    }
  }
  _n_completed_buffers += src->_n_completed_buffers;

  src->_n_completed_buffers = 0;
  src->_completed_buffers_head = NULL;
  src->_completed_buffers_tail = NULL;
  src->_process_completed_buffers = false;

  assert((_completed_buffers_head == NULL && _completed_buffers_tail == NULL) ||
         (_completed_buffers_head != NULL && _completed_buffers_tail != NULL),
         "Sanity");
  assert_completed_buffers_list_len_correct_locked();
}

void PtrQueueSet::notify_if_necessary() {
  MutexLockerEx x(_cbl_mon, Mutex::_no_safepoint_check_flag);
  if (_n_completed_buffers > _process_completed_buffers_threshold) {
    _process_completed_buffers = true;
    if (_notify_when_complete) {
      _cbl_mon->notify();
    }
  }
}