@@ -22,40 +22,41 @@ class lock_free_queue {
22
22
// Keeping the structure within a machine word makes it more likely
23
23
// that the atomic operations can be lock-free on many platforms
24
24
unsigned internal_count:30 ;
25
- unsigned external_counters:2 ; // 2
25
+ unsigned external_counters:2 ;
26
26
};
27
27
28
28
struct node
29
29
{
30
30
std::atomic<T*> data;
31
- std::atomic<node_counter> count; // 3
32
- counted_node_ptr next;
31
+ std::atomic<node_counter> count;
32
+ std::atomic< counted_node_ptr> next; // 1
33
33
34
34
node () {
35
35
node_counter new_count;
36
36
new_count.internal_count = 0 ;
37
37
// queue's own counters: only head and tail (max = 2)
38
- new_count.external_counters = 2 ; // 4
38
+ new_count.external_counters = 2 ;
39
39
count.store (new_count);
40
40
41
41
next.ptr = nullptr ;
42
42
next.external_count = 0 ;
43
43
}
44
44
45
45
/* *
46
- * Decreases this node's internal_count and deletes useless node */
46
+ * Decreases this node's internal_count and deletes useless node.
47
+ * Release the single reference held by this thread */
47
48
void release_ref () {
48
49
node_counter old_counter = count.load (std::memory_order_relaxed);
49
50
node_counter new_counter;
50
51
51
52
do {
52
53
new_counter = old_counter;
53
- --new_counter.internal_count ; // 1
54
- } while (!count.compare_exchange_strong (old_counter, new_counter, // 2
54
+ --new_counter.internal_count ;
55
+ } while (!count.compare_exchange_strong (old_counter, new_counter,
55
56
std::memory_order_acquire, std::memory_order_relaxed));
56
57
57
58
if (!new_counter.internal_count && !new_counter.external_counters ) {
58
- delete this ; // 3
59
+ delete this ;
59
60
}
60
61
}
61
62
@@ -94,20 +95,35 @@ class lock_free_queue {
94
95
node_counter new_counter;
95
96
do {
96
97
new_counter = old_counter;
97
- --new_counter.external_counters ; // 1
98
- new_counter.internal_count += count_increase; // 2
99
- } while (!ptr->count .compare_exchange_strong (old_counter, new_counter, // 3
98
+ --new_counter.external_counters ;
99
+ new_counter.internal_count += count_increase;
100
+ } while (!ptr->count .compare_exchange_strong (old_counter, new_counter,
100
101
std::memory_order_acquire, std::memory_order_relaxed));
101
102
102
103
// delete node if all counters became 0
103
104
if (!new_counter.internal_counter && !new_counter.external_counters ) {
104
- delete ptr; // 4
105
+ delete ptr;
105
106
}
106
107
};
108
+
109
+ void set_new_tail (counted_node_ptr& old_tail, counted_node_ptr const & new_tail) { // 1
110
+ node* const current_tail_ptr = old_tail.ptr ;
111
+
112
+ // if another thread updated tail -> check that new tail pointer to node is the same as
113
+ // it was before loop -> break if not the same
114
+ while (!tail.compare_exchange_weak (old_tail, new_tail) && old_tail.ptr == current_tail_ptr); // 2
115
+
116
+ if (old_tail.ptr == current_tail_ptr) // 3
117
+ // ptr is the same once the loop has exited, successfully set the tail
118
+ free_external_count (old_tail); // 4
119
+ else
120
+ // another thread will have freed the counter, release the single reference held by this thread
121
+ current_tail_ptr->release_ref (); // 5
122
+ }
107
123
};
108
124
109
125
std::atomic<counted_node_ptr> head;
110
- std::atomic<counted_node_ptr> tail; // 1
126
+ std::atomic<counted_node_ptr> tail;
111
127
112
128
public:
113
129
/* *
@@ -121,54 +137,70 @@ class lock_free_queue {
121
137
counted_node_ptr old_tail = tail.load ();
122
138
123
139
for (;;) {
124
- increase_external_count (tail, old_tail); // 5
140
+ increase_external_count (tail, old_tail);
125
141
126
142
T* old_data = nullptr ;
127
- // no other thread can modify tail while data is not nullptr
143
+
128
144
if (old_tail.ptr ->data .compare_exchange_strong (old_data, new_data.get ())) // 6
129
145
{
130
- old_tail.ptr ->next = new_next;
131
- old_tail = tail.exchange (new_next);
132
- // now node is fully pushed (data is nullptr), so other threads
133
- // can proceed with their pushes
134
- free_external_counter (old_tail); // 7
146
+ // successfully set the tail's node's data pointer to 'new_data' because it was 'nullptr'
147
+ counted_node_ptr old_next = { 0 };
148
+ if (!old_tail.ptr ->next .compare_exchange_strong (old_next, new_next)) { // 7
149
+ // another thread has already helped this thread and set the next pointer in step 10
150
+ delete new_next.ptr ; // 8: don't need the new empty node, delete it
151
+ new_next = old_next; // 9: use the next value that the other thread set for updating tail
152
+ }
153
+ set_new_tail (old_tail, new_next);
135
154
new_data.release ();
136
155
break ;
137
156
}
138
- old_tail.ptr ->release_ref (); //
157
+ else { // 10: help the successful thread to complete the update: set 'next' and 'tail'
158
+ // the tail's node's data pointer was not 'nullptr'
159
+ counted_node_ptr old_next = { 0 };
160
+ if (old_tail.ptr ->next .compare_exchange_strong (old_next, new_next)) { // 11
161
+ old_next = new_next; // 12
162
+ new_next.ptr = new node; // 13
163
+ }
164
+ set_new_tail (old_tail, old_next); // 14
165
+ }
139
166
}
140
167
}
141
168
142
169
/* *
143
170
* Pops node from the queue */
144
171
std::unique_ptr<T> pop () {
145
- counted_node_ptr old_head = head.load (std::memory_order_relaxed); // 1
172
+ counted_node_ptr old_head = head.load (std::memory_order_relaxed);
146
173
147
174
for (;;) {
148
- increase_external_count (head, old_head); // 2
175
+ increase_external_count (head, old_head);
149
176
node* const ptr = old_head.ptr ;
150
177
151
- // if queue is empty ('ptr' is null)
178
+ /* if head's counted_node_ptr's ptr points to the same node as tail
179
+ // it means that the queue is empty so return new empty unique_ptr */
152
180
if (ptr == tail.load ().ptr ) {
153
- prt->release_ref (); // 3
181
+ prt->release_ref ();
154
182
return std::unique_ptr<T>();
155
183
}
156
184
185
+ counted_node_ptr next = ptr->next .load (); // 2
157
186
// this compares the external count and pointer as a single entity
158
- if (head.compare_exchange_strong (old_head, ptr-> next )) { // 4
187
+ if (head.compare_exchange_strong (old_head, next)) {
159
188
T* const res = ptr->data .exchange (nullptr );
160
- free_external_count (old_head); // 5
189
+ free_external_count (old_head);
161
190
return std::unique_ptr<T>(res);
162
191
}
163
192
164
- ptr->release_ref ; // 6
193
+ ptr->release_ref ;
165
194
}
166
195
}
167
196
};
168
197
169
198
170
199
int main ()
171
200
{
201
+ lock_free_queue<int >* lfq = new lock_free_queue<int >();
202
+ lfq->push (5 );
203
+
172
204
return 0 ;
173
205
}
174
206
0 commit comments