46 size_t _stop_task; // Last task
47 size_t _size_log2; // Table size.
48 bool _is_mt;
49
50 BucketsOperation(ConcurrentHashTable<VALUE, CONFIG, F>* cht, bool is_mt = false)
51 : _cht(cht), _is_mt(is_mt), _next_to_claim(0), _task_size_log2(DEFAULT_TASK_SIZE_LOG2),
52 _stop_task(0), _size_log2(0) {}
53
54 // Returns true if you succeeded to claim the range start -> (stop-1).
55 bool claim(size_t* start, size_t* stop) {
56 size_t claimed = Atomic::add((size_t)1, &_next_to_claim) - 1;
57 if (claimed >= _stop_task) {
58 return false;
59 }
60 *start = claimed * (((size_t)1) << _task_size_log2);
61 *stop = ((*start) + (((size_t)1) << _task_size_log2));
62 return true;
63 }
64
65 // Calculate starting values.
66 void setup() {
67 _size_log2 = _cht->_table->_log2_size;
68 _task_size_log2 = MIN2(_task_size_log2, _size_log2);
69 size_t tmp = _size_log2 > _task_size_log2 ?
70 _size_log2 - _task_size_log2 : 0;
71 _stop_task = (((size_t)1) << tmp);
72 }
73
74 // Returns false if all ranges are claimed.
75 bool have_more_work() {
76 return OrderAccess::load_acquire(&_next_to_claim) >= _stop_task;
77 }
78
79 // If we have changed size.
80 bool is_same_table() {
81 // Not entirely true.
82 return _size_log2 != _cht->_table->_log2_size;
83 }
84
85 void thread_owns_resize_lock(Thread* thread) {
86 assert(BucketsOperation::_cht->_resize_lock_owner == thread,
87 "Should be locked by me");
88 assert(BucketsOperation::_cht->_resize_lock->owned_by_self(),
89 "Operations lock not held");
90 }
91 void thread_owns_only_state_lock(Thread* thread) {
92 assert(BucketsOperation::_cht->_resize_lock_owner == thread,
93 "Should be locked by me");
94 assert(!BucketsOperation::_cht->_resize_lock->owned_by_self(),
95 "Operations lock held");
96 }
97 void thread_do_not_own_resize_lock(Thread* thread) {
98 assert(!BucketsOperation::_cht->_resize_lock->owned_by_self(),
99 "Operations lock held");
100 assert(BucketsOperation::_cht->_resize_lock_owner != thread,
101 "Should not be locked by me");
102 }
103 };
104
105 // For doing pausable/parallel bulk delete.
106 template <typename VALUE, typename CONFIG, MEMFLAGS F>
107 class ConcurrentHashTable<VALUE, CONFIG, F>::BulkDeleteTask :
108 public BucketsOperation
109 {
110 public:
111 BulkDeleteTask(ConcurrentHashTable<VALUE, CONFIG, F>* cht, bool is_mt = false)
112 : BucketsOperation(cht, is_mt) {
113 }
114 // Before start prepare must be called.
115 bool prepare(Thread* thread) {
116 bool lock = BucketsOperation::_cht->try_resize_lock(thread);
117 if (!lock) {
118 return false;
119 }
120 this->setup();
121 this->thread_owns_resize_lock(thread);
122 return true;
123 }
124
125 // Does one range destroying all matching EVALUATE_FUNC and
126 // DELETE_FUNC is called be destruction. Returns true if there is more work.
127 template <typename EVALUATE_FUNC, typename DELETE_FUNC>
128 bool do_task(Thread* thread, EVALUATE_FUNC& eval_f, DELETE_FUNC& del_f) {
129 size_t start, stop;
130 assert(BucketsOperation::_cht->_resize_lock_owner != NULL,
131 "Should be locked");
132 if (!this->claim(&start, &stop)) {
133 return false;
134 }
135 BucketsOperation::_cht->do_bulk_delete_locked_for(thread, start, stop,
136 eval_f, del_f,
137 BucketsOperation::_is_mt);
138 return true;
139 }
140
141 // Pauses this operations for a safepoint.
142 void pause(Thread* thread) {
143 this->thread_owns_resize_lock(thread);
144 // This leaves internal state locked.
145 BucketsOperation::_cht->unlock_resize_lock(thread);
146 this->thread_do_not_own_resize_lock(thread);
147 }
148
149 // Continues this operations after a safepoint.
150 bool cont(Thread* thread) {
151 this->thread_do_not_own_resize_lock(thread);
152 if (!BucketsOperation::_cht->try_resize_lock(thread)) {
153 this->thread_do_not_own_resize_lock(thread);
154 return false;
155 }
156 if (BucketsOperation::is_same_table()) {
157 BucketsOperation::_cht->unlock_resize_lock(thread);
158 this->thread_do_not_own_resize_lock(thread);
159 return false;
160 }
161 this->thread_owns_resize_lock(thread);
162 return true;
163 }
164
165 // Must be called after ranges are done.
166 void done(Thread* thread) {
167 this->thread_owns_resize_lock(thread);
168 BucketsOperation::_cht->unlock_resize_lock(thread);
169 this->thread_do_not_own_resize_lock(thread);
170 }
171 };
172
173 template <typename VALUE, typename CONFIG, MEMFLAGS F>
174 class ConcurrentHashTable<VALUE, CONFIG, F>::GrowTask :
175 public BucketsOperation
176 {
177 public:
178 GrowTask(ConcurrentHashTable<VALUE, CONFIG, F>* cht) : BucketsOperation(cht) {
179 }
180 // Before start prepare must be called.
181 bool prepare(Thread* thread) {
182 if (!BucketsOperation::_cht->internal_grow_prolog(
183 thread, BucketsOperation::_cht->_log2_size_limit)) {
184 return false;
185 }
186 this->thread_owns_resize_lock(thread);
187 BucketsOperation::setup();
188 return true;
189 }
190
191 // Re-sizes a portion of the table. Returns true if there is more work.
192 bool do_task(Thread* thread) {
193 size_t start, stop;
194 assert(BucketsOperation::_cht->_resize_lock_owner != NULL,
195 "Should be locked");
196 if (!this->claim(&start, &stop)) {
197 return false;
198 }
199 BucketsOperation::_cht->internal_grow_range(thread, start, stop);
200 assert(BucketsOperation::_cht->_resize_lock_owner != NULL,
201 "Should be locked");
202 return true;
203 }
204
205 // Pauses growing for safepoint
206 void pause(Thread* thread) {
207 // This leaves internal state locked.
208 this->thread_owns_resize_lock(thread);
209 BucketsOperation::_cht->_resize_lock->unlock();
210 this->thread_owns_only_state_lock(thread);
211 }
212
213 // Continues growing after safepoint.
214 void cont(Thread* thread) {
215 this->thread_owns_only_state_lock(thread);
216 // If someone slips in here directly after safepoint.
217 while (!BucketsOperation::_cht->_resize_lock->try_lock())
218 { /* for ever */ };
219 this->thread_owns_resize_lock(thread);
220 }
221
222 // Must be called after do_task returns false.
223 void done(Thread* thread) {
224 this->thread_owns_resize_lock(thread);
225 BucketsOperation::_cht->internal_grow_epilog(thread);
226 this->thread_do_not_own_resize_lock(thread);
227 }
228 };
229
230 #endif // include guard
|
46 size_t _stop_task; // Last task
47 size_t _size_log2; // Table size.
48 bool _is_mt;
49
50 BucketsOperation(ConcurrentHashTable<VALUE, CONFIG, F>* cht, bool is_mt = false)
51 : _cht(cht), _is_mt(is_mt), _next_to_claim(0), _task_size_log2(DEFAULT_TASK_SIZE_LOG2),
52 _stop_task(0), _size_log2(0) {}
53
54 // Returns true if you succeeded to claim the range start -> (stop-1).
55 bool claim(size_t* start, size_t* stop) {
56 size_t claimed = Atomic::add((size_t)1, &_next_to_claim) - 1;
57 if (claimed >= _stop_task) {
58 return false;
59 }
60 *start = claimed * (((size_t)1) << _task_size_log2);
61 *stop = ((*start) + (((size_t)1) << _task_size_log2));
62 return true;
63 }
64
65 // Calculate starting values.
66 void setup(Thread* thread) {
67 thread_owns_resize_lock(thread);
68 _size_log2 = _cht->_table->_log2_size;
69 _task_size_log2 = MIN2(_task_size_log2, _size_log2);
70 size_t tmp = _size_log2 > _task_size_log2 ?
71 _size_log2 - _task_size_log2 : 0;
72 _stop_task = (((size_t)1) << tmp);
73 }
74
75 // Returns false if all ranges are claimed.
76 bool have_more_work() {
77 return OrderAccess::load_acquire(&_next_to_claim) >= _stop_task;
78 }
79
80 void thread_owns_resize_lock(Thread* thread) {
81 assert(BucketsOperation::_cht->_resize_lock_owner == thread,
82 "Should be locked by me");
83 assert(BucketsOperation::_cht->_resize_lock->owned_by_self(),
84 "Operations lock not held");
85 }
86 void thread_owns_only_state_lock(Thread* thread) {
87 assert(BucketsOperation::_cht->_resize_lock_owner == thread,
88 "Should be locked by me");
89 assert(!BucketsOperation::_cht->_resize_lock->owned_by_self(),
90 "Operations lock held");
91 }
92 void thread_do_not_own_resize_lock(Thread* thread) {
93 assert(!BucketsOperation::_cht->_resize_lock->owned_by_self(),
94 "Operations lock held");
95 assert(BucketsOperation::_cht->_resize_lock_owner != thread,
96 "Should not be locked by me");
97 }
98
99 public:
100 // Pauses for safepoint
101 void pause(Thread* thread) {
102 // This leaves internal state locked.
103 this->thread_owns_resize_lock(thread);
104 BucketsOperation::_cht->_resize_lock->unlock();
105 this->thread_owns_only_state_lock(thread);
106 }
107
108 // Continues after safepoint.
109 void cont(Thread* thread) {
110 this->thread_owns_only_state_lock(thread);
111 // If someone slips in here directly after safepoint.
112 while (!BucketsOperation::_cht->_resize_lock->try_lock())
113 { /* for ever */ };
114 this->thread_owns_resize_lock(thread);
115 }
116 };
117
118 // For doing pausable/parallel bulk delete.
119 template <typename VALUE, typename CONFIG, MEMFLAGS F>
120 class ConcurrentHashTable<VALUE, CONFIG, F>::BulkDeleteTask :
121 public BucketsOperation
122 {
123 public:
124 BulkDeleteTask(ConcurrentHashTable<VALUE, CONFIG, F>* cht, bool is_mt = false)
125 : BucketsOperation(cht, is_mt) {
126 }
127 // Before start prepare must be called.
128 bool prepare(Thread* thread) {
129 bool lock = BucketsOperation::_cht->try_resize_lock(thread);
130 if (!lock) {
131 return false;
132 }
133 this->setup(thread);
134 return true;
135 }
136
137 // Does one range destroying all matching EVALUATE_FUNC and
138 // DELETE_FUNC is called be destruction. Returns true if there is more work.
139 template <typename EVALUATE_FUNC, typename DELETE_FUNC>
140 bool do_task(Thread* thread, EVALUATE_FUNC& eval_f, DELETE_FUNC& del_f) {
141 size_t start, stop;
142 assert(BucketsOperation::_cht->_resize_lock_owner != NULL,
143 "Should be locked");
144 if (!this->claim(&start, &stop)) {
145 return false;
146 }
147 BucketsOperation::_cht->do_bulk_delete_locked_for(thread, start, stop,
148 eval_f, del_f,
149 BucketsOperation::_is_mt);
150 assert(BucketsOperation::_cht->_resize_lock_owner != NULL,
151 "Should be locked");
152 return true;
153 }
154
155 // Must be called after ranges are done.
156 void done(Thread* thread) {
157 this->thread_owns_resize_lock(thread);
158 BucketsOperation::_cht->unlock_resize_lock(thread);
159 this->thread_do_not_own_resize_lock(thread);
160 }
161 };
162
163 template <typename VALUE, typename CONFIG, MEMFLAGS F>
164 class ConcurrentHashTable<VALUE, CONFIG, F>::GrowTask :
165 public BucketsOperation
166 {
167 public:
168 GrowTask(ConcurrentHashTable<VALUE, CONFIG, F>* cht) : BucketsOperation(cht) {
169 }
170 // Before start prepare must be called.
171 bool prepare(Thread* thread) {
172 if (!BucketsOperation::_cht->internal_grow_prolog(
173 thread, BucketsOperation::_cht->_log2_size_limit)) {
174 return false;
175 }
176 this->setup(thread);
177 return true;
178 }
179
180 // Re-sizes a portion of the table. Returns true if there is more work.
181 bool do_task(Thread* thread) {
182 size_t start, stop;
183 assert(BucketsOperation::_cht->_resize_lock_owner != NULL,
184 "Should be locked");
185 if (!this->claim(&start, &stop)) {
186 return false;
187 }
188 BucketsOperation::_cht->internal_grow_range(thread, start, stop);
189 assert(BucketsOperation::_cht->_resize_lock_owner != NULL,
190 "Should be locked");
191 return true;
192 }
193
194 // Must be called after do_task returns false.
195 void done(Thread* thread) {
196 this->thread_owns_resize_lock(thread);
197 BucketsOperation::_cht->internal_grow_epilog(thread);
198 this->thread_do_not_own_resize_lock(thread);
199 }
200 };
201
202 #endif // include guard
|