
Commit 9f196f6

Add full thread pool example and description
Approaches are explained first, then the code and explanations on changes.
1 parent b9ddce2 commit 9f196f6

File tree

2 files changed: +222 −7 lines changed

concurrency-primer.tex

Lines changed: 30 additions & 7 deletions
@@ -916,7 +916,7 @@ \subsection{ABA problem}
 
 \begin{ccode}
 job_t *job = atomic_load(&thrd_pool->head->prev);
-while (!atomic_compare_exchange_strong(&thrd_pool->head->prev, &job,
+while (!atomic_compare_exchange_weak(&thrd_pool->head->prev, &job,
                                        job->prev)) {
 }
 \end{ccode}
@@ -930,23 +930,46 @@ \subsection{ABA problem}
 \item Thread B sets thread pool state to idle.
 \item Main thread finishes waiting and adds more jobs.
 \item Memory allocator reuses the recently freed memory as new job addresses.
-\item Fortunately, the first added job has the same address as the one Thread A held.
+\item Fortunately, the first added job has the same address as the one thread A held.
 \item Thread A is back in the running state. The comparison result is equal, so it updates \monobox{thrd\_pool->head->prev} with the old \monobox{job->prev}, which is already a dangling pointer.
 \item Another thread loads the dangling pointer from \monobox{thrd\_pool->head->prev}.
 \end{enumerate}
 
 Notice that even though \monobox{job->prev} is not loaded explicitly before the comparison, the compiler could place load instructions before the comparison.
 At the end, the dangling pointer could either point to garbage or trigger a segmentation fault.
 It could be even worse if a nested ABA problem occurs in thread B.
+Also, the chance of allocating a job at the same address is higher when using a memory pool, making the ABA problem more likely to occur.
+In fact, pre-allocated memory should be used to achieve lock-freedom, since \monobox{malloc} may acquire a mutex in a multithreaded environment.
 
 Failure to recognize a changed target object through comparison can result in stale information.
 The general concept of solving this problem involves adding more information to make different states distinguishable, and then making a decision on whether to act on the old state or retry with the new state.
 If acting on the old state is chosen, then safe memory reclamation should be considered, as memory may have already been freed by other threads.
 More aggressively, one might consider a programming paradigm where each operation on the target object does not have the side effect of modifying it.
-In the later section, we will introduce a different way of implementing atomic \textsc{RMW} operations by using LL/SC instructions. The exclusiveness provided by LL/SC instructions avoids the pitfall introduced by comparison.
-
-To make different state distinguishable, a common solution is adding a version number to be compared as well.
-
+In a later section, we will introduce a different way of implementing atomic \textsc{RMW} operations by using LL/SC instructions. The exclusive access provided by LL/SC instructions avoids the pitfall introduced by comparison.
+
+To make different states distinguishable, a common solution is incrementing a version number each time the target object is changed.
+By bundling the target object and the version number into one comparison, every change yields a distinguishable result.
+Given a sufficiently large version number, repeated version numbers are practically impossible.
+There are multiple methods for storing the version number, depending on how long it takes before the version number wraps around.
+In the thread pool example, the target object is a pointer, so the unused bits in the pointer can be utilized to store the version number.
+Instead of embedding the version number in the pointer, we could also place an additional 32-bit or 64-bit version value next to the target object.
+This requires a compare-and-swap instruction capable of comparing the wider size at once.
+Sometimes, this is referred to as \introduce{double-width compare-and-swap}.
+On x86-64 processors, atomic instructions that load or store more than a CPU word need additional hardware support.
+You can use \monobox{grep cx16 /proc/cpuinfo} to check whether the processor supports 16-byte compare-and-swap.
+For hardware that does not support the desired size, software implementations, which may involve locks, are used instead, as mentioned in \secref{arbitrarily-size}.
+Back to the example, the following code is fixed by using a version number that increments each time a job is added to an empty queue. On x86-64, add the compiler flag \monobox{-mcx16} to enable 16-byte compare-and-swap in the \monobox{worker} function.
+
+\inputminted{c}{./examples/rmw_example_aba.c}
+
+Notice that, in \monobox{struct idle\_job}, a union is used for type punning, which bundles the pointer and the version number for compare-and-swap.
+Directly casting a job pointer to a pointer to a 16-byte object is undefined behavior (due to different alignment requirements), so type punning is used instead.
+By using this technique, \monobox{struct idle\_job} can still be accessed normally elsewhere, minimizing code modification.
+Compiler optimizations are conservative with type punning, but it is acceptable for atomic operations.
+See \secref{fusing}.
+Another way to prevent the ABA problem in this example is to use a safe memory reclamation mechanism.
+In contrast to acting on the old state as mentioned previously, the address of a job is not freed until no thread is using it.
+This prevents the memory allocator or memory pool from reusing the address and causing problems.
 
 \section{Sequential consistency on weakly-ordered hardware}

examples/rmw_example_aba.c

Lines changed: 192 additions & 0 deletions
@@ -0,0 +1,192 @@
#include <stdalign.h>
#include <stdio.h>
#include <stdatomic.h>
#include <threads.h>
#include <stdlib.h>
#include <stdbool.h>
#include <assert.h>

#define CACHE_LINE_SIZE 64
#define N_JOBS 16
#define N_THREADS 8

typedef struct job {
    void *args;
    struct job *next, *prev;
} job_t;

typedef struct idle_job {
    union {
        struct {
            _Atomic(job_t *) prev;
            unsigned long long version;
        };
        _Atomic struct B16 {
            job_t *_prev;
            unsigned long long _version;
        } DCAS;
    };
    char padding[CACHE_LINE_SIZE - sizeof(_Atomic(job_t *)) -
                 sizeof(unsigned long long)];
    job_t job;
} idle_job_t;

enum state { idle, running, cancelled };

typedef struct thread_pool {
    atomic_flag initialized;
    int size;
    thrd_t *pool;
    atomic_int state;
    thrd_start_t func;
    // job queue is a SPMC ring buffer
    idle_job_t *head;
} thread_pool_t;

static int worker(void *args)
{
    if (!args)
        return EXIT_FAILURE;
    thread_pool_t *thrd_pool = (thread_pool_t *)args;

    while (1) {
        if (atomic_load(&thrd_pool->state) == cancelled)
            return EXIT_SUCCESS;
        if (atomic_load(&thrd_pool->state) == running) {
            // claim the job
            struct B16 job = atomic_load(&thrd_pool->head->DCAS);
            struct B16 next;
            do {
                next._prev = job._prev->prev;
                next._version = job._version;
            } while (!atomic_compare_exchange_weak(&thrd_pool->head->DCAS,
                                                   &job, next));

            if (job._prev->args == NULL) {
                atomic_store(&thrd_pool->state, idle);
            } else {
                printf("Hello from job %d\n", *(int *)job._prev->args);
                free(job._prev->args);
                free(job._prev); // could cause dangling pointer in other threads
            }
        } else {
            /* To auto-run when jobs are added, set the status to running if
             * the job queue is not empty, as long as the producer is
             * protected. */
            thrd_yield();
            continue;
        }
    };
}

static bool thread_pool_init(thread_pool_t *thrd_pool, size_t size)
{
    if (atomic_flag_test_and_set(&thrd_pool->initialized)) {
        printf("This thread pool has already been initialized.\n");
        return false;
    }

    assert(size > 0);
    thrd_pool->pool = malloc(sizeof(thrd_t) * size);
    if (!thrd_pool->pool) {
        printf("Failed to allocate thread identifiers.\n");
        return false;
    }

    // May use memory pool for jobs
    idle_job_t *idle_job = malloc(sizeof(idle_job_t));
    if (!idle_job) {
        printf("Failed to allocate idle job.\n");
        return false;
    }
    // idle_job will always be the first job
    idle_job->job.args = NULL;
    idle_job->job.next = &idle_job->job;
    idle_job->job.prev = &idle_job->job;
    idle_job->prev = &idle_job->job;
    idle_job->version = 0ULL;
    thrd_pool->func = worker;
    thrd_pool->head = idle_job;
    thrd_pool->state = idle;
    thrd_pool->size = size;

    for (size_t i = 0; i < size; i++) {
        thrd_create(thrd_pool->pool + i, worker, thrd_pool);
        //TODO: error handling
    }

    return true;
}

static void thread_pool_destroy(thread_pool_t *thrd_pool)
{
    if (atomic_exchange(&thrd_pool->state, cancelled))
        printf("Thread pool cancelled with jobs still running.\n");
    for (int i = 0; i < thrd_pool->size; i++) {
        thrd_join(thrd_pool->pool[i], NULL);
    }
    while (thrd_pool->head->prev != &thrd_pool->head->job) {
        job_t *job = thrd_pool->head->prev->prev;
        free(thrd_pool->head->prev);
        thrd_pool->head->prev = job;
    }
    free(thrd_pool->head);
    free(thrd_pool->pool);
    atomic_fetch_and(&thrd_pool->state, 0); // reset state to idle (0)
    atomic_flag_clear(&thrd_pool->initialized);
}

__attribute__((nonnull(2))) static bool add_job(thread_pool_t *thrd_pool,
                                                void *args)
{
    // May use memory pool for jobs
    job_t *job = malloc(sizeof(job_t));
    if (!job)
        return false;

    // unprotected producer
    job->args = args;
    job->next = thrd_pool->head->job.next;
    job->prev = &thrd_pool->head->job;
    thrd_pool->head->job.next->prev = job;
    thrd_pool->head->job.next = job;
    if (thrd_pool->head->prev == &thrd_pool->head->job) {
        thrd_pool->head->prev = job;
        thrd_pool->head->version += 1;
        // trap worker at idle job
        thrd_pool->head->job.prev = &thrd_pool->head->job;
    }

    return true;
}

static inline void wait_until(thread_pool_t *thrd_pool, int state)
{
    while (atomic_load(&thrd_pool->state) != state) {
        thrd_yield();
    }
}

int main()
{
    thread_pool_t thrd_pool = { .initialized = ATOMIC_FLAG_INIT };
    if (!thread_pool_init(&thrd_pool, N_THREADS)) {
        printf("failed to init.\n");
        return 0;
    }
    for (int i = 0; i < N_JOBS; i++) {
        int *id = malloc(sizeof(int));
        *id = i;
        add_job(&thrd_pool, id);
    }
    // Due to simplified job queue (not protecting producer), start the pool manually
    atomic_store(&thrd_pool.state, running);
    wait_until(&thrd_pool, idle);
    for (int i = 0; i < N_JOBS; i++) {
        int *id = malloc(sizeof(int));
        *id = i;
        add_job(&thrd_pool, id);
    }
    atomic_store(&thrd_pool.state, running);
    thread_pool_destroy(&thrd_pool);
    return 0;
}
