1 /*
2 * Copyright (c) 2001, 2019, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 *
23 */
24
25 #include "precompiled.hpp"
26 #include "gc/shared/ptrQueue.hpp"
27 #include "logging/log.hpp"
28 #include "memory/allocation.hpp"
29 #include "memory/allocation.inline.hpp"
30 #include "runtime/atomic.hpp"
31 #include "runtime/mutex.hpp"
32 #include "runtime/mutexLocker.hpp"
33 #include "runtime/orderAccess.hpp"
34 #include "runtime/thread.inline.hpp"
35 #include "utilities/globalCounter.inline.hpp"
36
37 #include <new>
38
39 PtrQueue::PtrQueue(PtrQueueSet* qset, bool active) :
40 _qset(qset),
41 _active(active),
42 _index(0),
43 _capacity_in_bytes(0),
44 _buf(NULL)
45 {}
46
47 PtrQueue::~PtrQueue() {
48 assert(_buf == NULL, "queue must be flushed before delete");
49 }
50
51 void PtrQueue::flush_impl() {
52 if (_buf != NULL) {
53 BufferNode* node = BufferNode::make_node_from_buffer(_buf, index());
54 if (is_empty()) {
55 // No work to do.
56 qset()->deallocate_buffer(node);
57 } else {
58 qset()->enqueue_completed_buffer(node);
59 }
60 _buf = NULL;
61 set_index(0);
62 }
63 }
64
65
66 void PtrQueue::enqueue_known_active(void* ptr) {
67 while (_index == 0) {
68 handle_zero_index();
69 }
70
71 assert(_buf != NULL, "postcondition");
72 assert(index() > 0, "postcondition");
73 assert(index() <= capacity(), "invariant");
74 _index -= _element_size;
75 _buf[index()] = ptr;
76 }
77
78 BufferNode* BufferNode::allocate(size_t size) {
79 size_t byte_size = size * sizeof(void*);
80 void* data = NEW_C_HEAP_ARRAY(char, buffer_offset() + byte_size, mtGC);
81 return new (data) BufferNode;
82 }
83
84 void BufferNode::deallocate(BufferNode* node) {
85 node->~BufferNode();
86 FREE_C_HEAP_ARRAY(char, node);
87 }
88
89 BufferNode::Allocator::Allocator(const char* name, size_t buffer_size) :
90 _buffer_size(buffer_size),
91 _pending_list(),
92 _free_list(),
93 _pending_count(0),
94 _free_count(0),
95 _transfer_lock(false)
96 {
97 strncpy(_name, name, sizeof(_name) - 1);
98 _name[sizeof(_name) - 1] = '\0';
99 }
100
101 BufferNode::Allocator::~Allocator() {
102 delete_list(_free_list.pop_all());
103 delete_list(_pending_list.pop_all());
104 }
105
106 void BufferNode::Allocator::delete_list(BufferNode* list) {
107 while (list != NULL) {
108 BufferNode* next = list->next();
109 DEBUG_ONLY(list->set_next(NULL);)
110 BufferNode::deallocate(list);
111 list = next;
112 }
113 }
114
115 size_t BufferNode::Allocator::free_count() const {
116 return Atomic::load(&_free_count);
117 }
118
119 BufferNode* BufferNode::Allocator::allocate() {
120 BufferNode* node;
121 {
122 // Protect against ABA; see release().
123 GlobalCounter::CriticalSection cs(Thread::current());
124 node = _free_list.pop();
125 }
126 if (node == NULL) {
127 node = BufferNode::allocate(_buffer_size);
128 } else {
129 // Decrement count after getting buffer from free list. This, along
130 // with incrementing count before adding to free list, ensures count
131 // never underflows.
132 size_t count = Atomic::sub(1u, &_free_count);
133 assert((count + 1) != 0, "_free_count underflow");
134 }
135 return node;
136 }
137
138 // To solve the ABA problem for lock-free stack pop, allocate does the
139 // pop inside a critical section, and release synchronizes on the
140 // critical sections before adding to the _free_list. But we don't
141 // want to make every release have to do a synchronize. Instead, we
142 // initially place released nodes on the _pending_list, and transfer
143 // them to the _free_list in batches. Only one transfer at a time is
144 // permitted, with a lock bit to control access to that phase. A
145 // transfer takes all the nodes from the _pending_list, synchronizes on
146 // the _free_list pops, and then adds the former pending nodes to the
147 // _free_list. While that's happening, other threads might be adding
148 // other nodes to the _pending_list, to be dealt with by some later
149 // transfer.
150 void BufferNode::Allocator::release(BufferNode* node) {
151 assert(node != NULL, "precondition");
152 assert(node->next() == NULL, "precondition");
153
154 // Desired minimum transfer batch size. There is relatively little
155 // importance to the specific number. It shouldn't be too big, else
156 // we're wasting space when the release rate is low. If the release
157 // rate is high, we might accumulate more than this before being
158 // able to start a new transfer, but that's okay. Also note that
159 // the allocation rate and the release rate are going to be fairly
160 // similar, due to how the buffers are used.
161 const size_t trigger_transfer = 10;
162
163 // Add to pending list. Update count first so no underflow in transfer.
164 size_t pending_count = Atomic::add(1u, &_pending_count);
165 _pending_list.push(*node);
166 if (pending_count > trigger_transfer) {
167 try_transfer_pending();
168 }
169 }
170
171 // Try to transfer nodes from _pending_list to _free_list, with a
172 // synchronization delay for any in-progress pops from the _free_list,
173 // to solve ABA there. Return true if performed a (possibly empty)
174 // transfer, false if blocked from doing so by some other thread's
175 // in-progress transfer.
176 bool BufferNode::Allocator::try_transfer_pending() {
177 // Attempt to claim the lock.
178 if (Atomic::load(&_transfer_lock) || // Skip CAS if likely to fail.
179 Atomic::cmpxchg(true, &_transfer_lock, false)) {
180 return false;
181 }
182 // Have the lock; perform the transfer.
183
184 // Claim all the pending nodes.
185 BufferNode* first = _pending_list.pop_all();
186 if (first != NULL) {
187 // Prepare to add the claimed nodes, and update _pending_count.
188 BufferNode* last = first;
189 size_t count = 1;
190 for (BufferNode* next = first->next(); next != NULL; next = next->next()) {
191 last = next;
192 ++count;
193 }
194 Atomic::sub(count, &_pending_count);
195
196 // Wait for any in-progress pops, to avoid ABA for them.
197 GlobalCounter::write_synchronize();
198
199 // Add synchronized nodes to _free_list.
200 // Update count first so no underflow in allocate().
201 Atomic::add(count, &_free_count);
202 _free_list.prepend(*first, *last);
203 log_trace(gc, ptrqueue, freelist)
204 ("Transferred %s pending to free: " SIZE_FORMAT, name(), count);
205 }
206 OrderAccess::release_store(&_transfer_lock, false);
207 return true;
208 }
209
210 size_t BufferNode::Allocator::reduce_free_list(size_t remove_goal) {
211 try_transfer_pending();
212 size_t removed = 0;
213 for ( ; removed < remove_goal; ++removed) {
214 BufferNode* node = _free_list.pop();
215 if (node == NULL) break;
216 BufferNode::deallocate(node);
217 }
218 size_t new_count = Atomic::sub(removed, &_free_count);
219 log_debug(gc, ptrqueue, freelist)
220 ("Reduced %s free list by " SIZE_FORMAT " to " SIZE_FORMAT,
221 name(), removed, new_count);
222 return removed;
223 }
224
225 PtrQueueSet::PtrQueueSet(bool notify_when_complete) :
226 _allocator(NULL),
227 _cbl_mon(NULL),
228 _completed_buffers_head(NULL),
229 _completed_buffers_tail(NULL),
230 _n_completed_buffers(0),
231 _process_completed_buffers_threshold(ProcessCompletedBuffersThresholdNever),
232 _process_completed_buffers(false),
233 _notify_when_complete(notify_when_complete),
234 _max_completed_buffers(MaxCompletedBuffersUnlimited),
235 _completed_buffers_padding(0),
236 _all_active(false)
237 {}
238
239 PtrQueueSet::~PtrQueueSet() {
240 // There are presently only a couple (derived) instances ever
241 // created, and they are permanent, so no harm currently done by
242 // doing nothing here.
243 }
244
245 void PtrQueueSet::initialize(Monitor* cbl_mon,
246 BufferNode::Allocator* allocator) {
247 assert(cbl_mon != NULL && allocator != NULL, "Init order issue?");
248 _cbl_mon = cbl_mon;
249 _allocator = allocator;
250 }
251
252 void** PtrQueueSet::allocate_buffer() {
253 BufferNode* node = _allocator->allocate();
254 return BufferNode::make_buffer_from_node(node);
255 }
256
257 void PtrQueueSet::deallocate_buffer(BufferNode* node) {
258 _allocator->release(node);
259 }
260
261 void PtrQueue::handle_zero_index() {
262 assert(index() == 0, "precondition");
263
264 // This thread records the full buffer and allocates a new one (while
265 // holding the lock if there is one).
266 if (_buf != NULL) {
267 if (!should_enqueue_buffer()) {
268 assert(index() > 0, "the buffer can only be re-used if it's not full");
269 return;
270 }
271
272 BufferNode* node = BufferNode::make_node_from_buffer(_buf, index());
273 if (qset()->process_or_enqueue_completed_buffer(node)) {
274 // Recycle the buffer. No allocation.
275 assert(_buf == BufferNode::make_buffer_from_node(node), "invariant");
276 assert(capacity() == qset()->buffer_size(), "invariant");
277 reset();
278 return;
279 }
280 }
281 // Set capacity in case this is the first allocation.
282 set_capacity(qset()->buffer_size());
283 // Allocate a new buffer.
284 _buf = qset()->allocate_buffer();
285 reset();
286 }
287
288 bool PtrQueueSet::process_or_enqueue_completed_buffer(BufferNode* node) {
289 if (Thread::current()->is_Java_thread()) {
290 // If the number of buffers exceeds the limit, make this Java
291 // thread do the processing itself. We don't lock to access
292 // buffer count or padding; it is fine to be imprecise here. The
293 // add of padding could overflow, which is treated as unlimited.
294 size_t limit = _max_completed_buffers + _completed_buffers_padding;
295 if ((_n_completed_buffers > limit) && (limit >= _max_completed_buffers)) {
296 if (mut_process_buffer(node)) {
297 // Successfully processed; return true to allow buffer reuse.
298 return true;
299 }
300 }
301 }
302 // The buffer will be enqueued. The caller will have to get a new one.
303 enqueue_completed_buffer(node);
304 return false;
305 }
306
307 void PtrQueueSet::enqueue_completed_buffer(BufferNode* cbn) {
308 MutexLocker x(_cbl_mon, Mutex::_no_safepoint_check_flag);
309 cbn->set_next(NULL);
310 if (_completed_buffers_tail == NULL) {
311 assert(_completed_buffers_head == NULL, "Well-formedness");
312 _completed_buffers_head = cbn;
313 _completed_buffers_tail = cbn;
314 } else {
315 _completed_buffers_tail->set_next(cbn);
316 _completed_buffers_tail = cbn;
317 }
318 _n_completed_buffers++;
319
320 if (!_process_completed_buffers &&
321 (_n_completed_buffers > _process_completed_buffers_threshold)) {
322 _process_completed_buffers = true;
323 if (_notify_when_complete) {
324 _cbl_mon->notify();
325 }
326 }
327 assert_completed_buffers_list_len_correct_locked();
328 }
329
330 BufferNode* PtrQueueSet::get_completed_buffer(size_t stop_at) {
331 MutexLocker x(_cbl_mon, Mutex::_no_safepoint_check_flag);
332
333 if (_n_completed_buffers <= stop_at) {
334 return NULL;
335 }
336
337 assert(_n_completed_buffers > 0, "invariant");
338 assert(_completed_buffers_head != NULL, "invariant");
339 assert(_completed_buffers_tail != NULL, "invariant");
340
341 BufferNode* bn = _completed_buffers_head;
342 _n_completed_buffers--;
343 _completed_buffers_head = bn->next();
344 if (_completed_buffers_head == NULL) {
345 assert(_n_completed_buffers == 0, "invariant");
346 _completed_buffers_tail = NULL;
347 _process_completed_buffers = false;
348 }
349 assert_completed_buffers_list_len_correct_locked();
350 bn->set_next(NULL);
351 return bn;
352 }
353
354 void PtrQueueSet::abandon_completed_buffers() {
355 BufferNode* buffers_to_delete = NULL;
356 {
357 MutexLocker x(_cbl_mon, Mutex::_no_safepoint_check_flag);
358 buffers_to_delete = _completed_buffers_head;
359 _completed_buffers_head = NULL;
360 _completed_buffers_tail = NULL;
361 _n_completed_buffers = 0;
362 _process_completed_buffers = false;
363 }
364 while (buffers_to_delete != NULL) {
365 BufferNode* bn = buffers_to_delete;
366 buffers_to_delete = bn->next();
367 bn->set_next(NULL);
368 deallocate_buffer(bn);
369 }
370 }
371
372 #ifdef ASSERT
373
374 void PtrQueueSet::assert_completed_buffers_list_len_correct_locked() {
375 assert_lock_strong(_cbl_mon);
376 size_t n = 0;
377 for (BufferNode* bn = _completed_buffers_head; bn != NULL; bn = bn->next()) {
378 ++n;
379 }
380 assert(n == _n_completed_buffers,
381 "Completed buffer length is wrong: counted: " SIZE_FORMAT
382 ", expected: " SIZE_FORMAT, n, _n_completed_buffers);
383 }
384
385 #endif // ASSERT
386
387 // Merge lists of buffers. Notify the processing threads.
388 // The source queue is emptied as a result. The queues
389 // must share the monitor.
390 void PtrQueueSet::merge_bufferlists(PtrQueueSet *src) {
391 assert(_cbl_mon == src->_cbl_mon, "Should share the same lock");
392 MutexLocker x(_cbl_mon, Mutex::_no_safepoint_check_flag);
393 if (_completed_buffers_tail == NULL) {
394 assert(_completed_buffers_head == NULL, "Well-formedness");
395 _completed_buffers_head = src->_completed_buffers_head;
396 _completed_buffers_tail = src->_completed_buffers_tail;
397 } else {
398 assert(_completed_buffers_head != NULL, "Well formedness");
399 if (src->_completed_buffers_head != NULL) {
400 _completed_buffers_tail->set_next(src->_completed_buffers_head);
401 _completed_buffers_tail = src->_completed_buffers_tail;
402 }
403 }
404 _n_completed_buffers += src->_n_completed_buffers;
405
406 src->_n_completed_buffers = 0;
407 src->_completed_buffers_head = NULL;
408 src->_completed_buffers_tail = NULL;
409 src->_process_completed_buffers = false;
410
411 assert(_completed_buffers_head == NULL && _completed_buffers_tail == NULL ||
412 _completed_buffers_head != NULL && _completed_buffers_tail != NULL,
413 "Sanity");
414 assert_completed_buffers_list_len_correct_locked();
415 }
416
417 void PtrQueueSet::notify_if_necessary() {
418 MutexLocker x(_cbl_mon, Mutex::_no_safepoint_check_flag);
419 if (_n_completed_buffers > _process_completed_buffers_threshold) {
420 _process_completed_buffers = true;
421 if (_notify_when_complete)
422 _cbl_mon->notify();
423 }
424 }
--- EOF ---