
Commit 11baa72

weihsinyeh and idoleat committed
Add atomic instruction and simplify rmw_example.c
Add the description of atomic instructions to let readers know there is a difference between using Fetch-and…, which is only a programming tool, and its actual execution as an atomic operation, which depends on the compiler. Simplify the rmw_example code to provide more flexible examples.

- Initially, all worker threads are initialized. The main thread asks all workers to start running. If there is no job or the job is completed, a worker becomes idle. Next, the main thread continues to add more jobs and asks the workers to start running again. Meanwhile, the main thread also waits for the results of the work.
- Use the struct `tpool_future` to record all the information required for a job.

Co-authored-by: Chih-Wei Chien <[email protected]>
1 parent 4dc5699 commit 11baa72

File tree

4 files changed

+89
-87
lines changed


concurrency-primer.tex

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -396,12 +396,12 @@ \section{Read-modify-write}
396396
In \secref{atomicity}, there is a need for atomicity to ensure that a group of operations is not only sequentially executed but also completes without being interrupted by operations from other threads.
397397
This establishes correct order of operations from different threads.
398398

399-
\includegraphics[keepaspectratio, width=0.6\linewidth]{images/atomic_rmw}
399+
\includegraphics[keepaspectratio, width=0.6\linewidth]{images/atomic-rmw}
400400
\captionof{figure}{Exchange, Test and Set, Fetch and…, Compare and Swap can all be transformed into atomic RMW operations, ensuring that operations like t1 \to t2 \to t3 will become an atomic step.}
401-
\label{fig:atomic_rmw}
401+
\label{fig:atomic-rmw}
402402

403403
Atomic loads and stores are all well and good when we do not need to consider the previous state of atomic variables, but sometimes we need to read a value, modify it, and write it back as a single atomic step.
404-
As shown in \fig{fig:atomic_rmw}, the modification is based on the previous state that is visible for reading, and the result is then written back.
404+
As shown in \fig{fig:atomic-rmw}, the modification is based on the previous state that is visible for reading, and the result is then written back.
405405
A complete \introduce{read-modify-write} operation is performed atomically to ensure visibility to subsequent operations.
406406

407407
Furthermore, for communication between concurrent threads, a shared resource is required, as shown in \fig{fig:atomicity}
@@ -411,16 +411,16 @@ \section{Read-modify-write}
411411

412412
As discussed earlier, the process of accessing shared resources responsible for communication must also ensure both order and non-interference.
413413
To prevent the recursive protection of shared resources,
414-
atomic operations can be introduced for the shared resources responsible for communication, as shown in \fig{fig:atomic_types}.
414+
atomic operations can be introduced for the shared resources responsible for communication, as shown in \fig{fig:atomic-types}.
415415

416416
There are a few common \introduce{read-modify-write} (\textsc{RMW}) operations that turn these operations into a single atomic step.
417417
In \cplusplus{}, they are represented as member functions of \cpp|std::atomic<T>|.
418418
In \clang{}, they are freestanding functions.
419419

420-
\includegraphics[keepaspectratio, width=1\linewidth]{images/atomic_types}
420+
\includegraphics[keepaspectratio, width=1\linewidth]{images/atomic-types}
421421
\captionof{figure}{Test and Set (Left) and Compare and Swap (Right) leverage their functionality of checking and their atomicity to make other RMW operations perform atomically.
422422
The red color represents atomic RMW operations, while the blue color represents RMW operations that behave atomically.}
423-
\label{fig:atomic_types}
423+
\label{fig:atomic-types}
424424

425425
\subsection{Exchange}
426426
\label{exchange}
@@ -441,7 +441,7 @@ \subsection{Test and set}
441441
\introduce{Test-and-set} operations are not limited to just \textsc{RMW} functions;
442442
they can also be utilized for constructing a simple spinlock.
443443
In this scenario, the flag acts as a shared resource for communication between threads.
444-
Thus, spinlock implemented with \introduce{Test-and-set} operations ensures that entire \textsc{RMW} operations on shared resources are performed atomically, as shown in \fig{fig:atomic_types}.
444+
Thus, spinlock implemented with \introduce{Test-and-set} operations ensures that entire \textsc{RMW} operations on shared resources are performed atomically, as shown in \fig{fig:atomic-types}.
445445
\label{spinlock}
446446
\begin{ccode}
447447
atomic_flag af = ATOMIC_FLAG_INIT;
@@ -464,7 +464,7 @@ \subsection{Fetch and…}
464464
or bitwise \textsc{AND}, \textsc{OR}, \textsc{XOR}) and return its previous value,
465465
all as part of a single atomic operation.
466466
Compared with \introduce{Exchange} \secref{exchange}, when programmers only need to make a simple modification to the shared variable,
467-
they can use \introduce{Fetch and…}.
467+
they can use \introduce{Fetch-and…}.
468468

469469
\subsection{Compare and swap}
470470
\label{cas}
@@ -501,50 +501,57 @@ \subsection{Compare and swap}
501501
Subsequently, update the expected value with the current shared value and retry the modification in a loop.
502502
This iterative process allows \textsc{CAS} to serve as a communication mechanism between threads,
503503
ensuring that entire \textsc{RMW} operations on shared resources are performed atomically.
504-
As shown in \fig{fig:atomic_types}, compared with \introduce{Test-and-set} \secref{Testandset},
504+
As shown in \fig{fig:atomic-types}, compared with \introduce{Test-and-set} \secref{Testandset},
505505
a thread that employs \textsc{CAS} can directly use the shared resource to check.
506506
It uses atomic \textsc{CAS} to ensure that Modify is atomic,
507507
coupled with a while loop to ensure that the entire \textsc{RMW} can behave atomically.
508508

509+
~\\
510+
However, atomic \textsc{RMW} operations here are merely a programming tool used to achieve program logic correctness.
511+
Whether it actually executes as an atomic operation depends on how the compiler translates it into atomic instructions for a given hardware instruction set.
512+
At the instruction level, \introduce{Exchange}, \introduce{Fetch-and-Add}, \introduce{Test-and-set} and \textsc{CAS} are different styles of atomic \textsc{RMW} instructions.
513+
An ISA may provide only some of them,
514+
leaving the rest to compilers to synthesize atomic \textsc{RMW} operations.
515+
For example, in IA-32/64 and IBM System/360/z architectures,
516+
\introduce{Test-and-set} functionality is directly supported by hardware instructions.
517+
x86 provides XCHG and XADD for \introduce{Exchange} and \introduce{Fetch-and-Add}, but implements \introduce{Test-and-set} with XCHG.
518+
Arm, in a different style, provides LL/SC (Load-Linked/Store-Conditional) instructions to build all of these operations,
519+
with \textsc{CAS} added in Armv8/v9-A.
520+
509521
\subsection{example}
510522
\label{rmw_example}
511-
Following example code is a simplify implementation of thread pool to demonstrate the use of \clang{}11 atomic library.
523+
The following example code is a simplified implementation of a thread pool, which demonstrates the use of the \clang{}11 atomic library.
512524

513525
\inputminted{c}{./examples/rmw_example.c}
514526

515-
%Compile the code with \monobox{gcc rmw\_example.c -o rmw\_example -Wall -Wextra -std=c11 -pthread} and execute the program.
516-
%A thread pool has three states: idle, cancelled and running.
517-
%It is initialized with \monobox{N\_THREADS} (default 8) of threads.
518-
%\monobox{N\_JOBS} (default 16) of jobs are added, and the pool is then set to running.
519-
%A job is simply echoing its job ID.
520-
%\monobox{sleep(1)} is used to ensure that the second batch of jobs is added after the first batch is finished; otherwise, jobs may not be consumed as expected.
521-
%Thread pool is then destroyed right after starting running.
522527
The stdout of the program is:
523528
\begin{ccode}
524-
PI calculated with 101 terms: 3.141592653589793
529+
PI calculated with 100 terms: 3.141592653589793
525530
\end{ccode}
526531

527532
\textbf{Exchange}
528-
In function \monobox{thread\_pool\_destroy}, \monobox{atomic\_exchange(\&thrd\_pool->state, cancelled)} reads current state and replaces it with "cancelled". A warning message is printed if the pool is destroyed when still running.
529-
If the exchange is not performed atomically, we may initially get the state as "running". Subsequently, a thread could set the state to "cancelled" after finishing the last one, resulting in a false warning.
533+
In function \monobox{thread\_pool\_destroy}, \monobox{atomic\_exchange(\&thrd\_pool->state, cancelled)} reads the current state and replaces it with ``cancelled''.
534+
A warning message is printed if the pool is destroyed while workers are still ``running''.
535+
If the exchange is not performed atomically, we may initially get the state as ``running''. Subsequently, a thread could set the state to ``cancelled'' after finishing the last one, resulting in a false warning.
530536

531537
\textbf{Test and set}
532-
In the example, the scenario is as follows:
533-
First, the main thread initially acquire a lock \monobox{future->flag} and then set it true,
534-
which is akin to creating a job and then transfer its ownership to the worker.
535-
Subsequently, the main thread will be blocked until the worker clear the flag.
536-
This inidcate the main thread will wail until the worker completes the job and return the ownership back to the main thread, which ensure correct cooperation.
538+
In this example, the scenario is as follows:
539+
First, the main thread initially acquires a lock \monobox{future->flag} and then sets it true,
540+
which is akin to creating a job and then transferring its ownership to the worker.
541+
Subsequently, the main thread will be blocked until the worker clears the flag.
542+
This indicates that the main thread will wait until the worker completes the job and returns ownership back to the main thread, which ensures correct cooperation.
537543

538544
\textbf{Fetch and…}
539-
In the function \monobox{thread\_pool\_destroy}, \monobox{atomic\_fetch\_and} is utilized as a means to set the state to idle.
545+
In the function \monobox{thread\_pool\_destroy}, \monobox{atomic\_fetch\_and} is utilized as a means to set the state to ``idle''.
540546
Yet, in this case, it is not necessary, as the pool needs to be reinitialized for further use regardless.
541547
Its return value could be further utilized, for instance, to report the previous state and perform additional actions.
542548

543549
\textbf{Compare and swap}
544550
Once threads are created in the thread pool as workers, they will continuously search for jobs to do.
545-
Jobs are taken from the tail of job queue.
546-
To claim a job without it being taken by another worker halfway through, we need to atomically change the pointer to the last job. Otherwise the last job is under races.
547-
The while loop in function \monobox{worker},
551+
Jobs are taken from the tail of the job queue.
552+
To claim a job without another worker taking it halfway through, we need to atomically change the pointer to the last job.
553+
Otherwise, the last job is subject to a race.
554+
The while loop in the function \monobox{worker},
548555
\begin{ccode}
549556
while (!atomic_compare_exchange_weak(&thrd_pool->head->prev, &job,
550557
job->prev)) {
@@ -575,10 +582,6 @@ \subsection{Further improvements}
575582
Without specifying, atomic operations in \clang{}11 atomic library use \monobox{memory\_order\_seq\_cst} as default memory order. Operations post-fix with \monobox{\_explicit} accept an additional argument to specify which memory order to use.
576583
How to leverage memory orders to optimize performance will be covered later in \secref{lock-example}.
577584

578-
You may have noticed that there is padding after \monobox{\_Atomic(job\_t *) prev} in \monobox{struct idle\_job} in the example.
579-
It is used for preventing \introduce{false sharing} in a cache line.
580-
Further discussion on cache effects and false sharing is provided in \secref{false-sharing}.
581-
582585
\section{Atomic operations as building blocks}
583586

584587
Atomic loads, stores, and \textsc{RMW} operations are the building blocks for every single concurrency tool.

examples/rmw_example.c

Lines changed: 53 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,28 @@
44
#include <stdlib.h>
55
#include <stdbool.h>
66
#include <assert.h>
7-
87
#include <math.h>
98

109
#define PRECISION 100 /* upper bound in BPP sum */
11-
1210
#define CACHE_LINE_SIZE 64
1311
#define N_THREADS 64
1412

1513
struct tpool_future {
1614
void *result;
15+
void *arg;
1716
atomic_flag flag;
1817
};
1918

2019
typedef struct job {
2120
void *(*func)(void *);
22-
void *args;
2321
struct tpool_future *future;
2422
struct job *next, *prev;
2523
} job_t;
2624

2725
typedef struct idle_job {
2826
_Atomic(job_t *) prev;
29-
char padding[CACHE_LINE_SIZE - sizeof(_Atomic(job_t *))];
27+
char padding[CACHE_LINE_SIZE -
28+
sizeof(_Atomic(job_t *))]; /* avoid false sharing */
3029
job_t job;
3130
} idle_job_t;
3231

@@ -38,15 +37,15 @@ typedef struct tpool {
3837
thrd_t *pool;
3938
atomic_int state;
4039
thrd_start_t func;
41-
// job queue is a SPMC ring buffer
42-
idle_job_t *head;
40+
idle_job_t *head; /* job queue is a SPMC ring buffer */
4341
} tpool_t;
4442

45-
static struct tpool_future *tpool_future_create(void)
43+
static struct tpool_future *tpool_future_create(void *arg)
4644
{
4745
struct tpool_future *future = malloc(sizeof(struct tpool_future));
4846
if (future) {
4947
future->result = NULL;
48+
future->arg = arg;
5049
atomic_flag_clear(&future->flag);
5150
atomic_flag_test_and_set(&future->flag);
5251
}
@@ -72,28 +71,28 @@ static int worker(void *args)
7271
tpool_t *thrd_pool = (tpool_t *)args;
7372

7473
while (1) {
74+
/* worker is laid off */
7575
if (atomic_load(&thrd_pool->state) == cancelled)
7676
return EXIT_SUCCESS;
7777
if (atomic_load(&thrd_pool->state) == running) {
78-
// claim the job
78+
/* worker takes the job */
7979
job_t *job = atomic_load(&thrd_pool->head->prev);
80-
while (!atomic_compare_exchange_weak(&thrd_pool->head->prev, &job,
81-
job->prev)) {
82-
}
83-
if (job->args == NULL) {
80+
/* worker checks if there is only an idle job in the job queue */
81+
if (job == &thrd_pool->head->job) {
82+
/* worker says it is idle */
8483
atomic_store(&thrd_pool->state, idle);
85-
} else {
86-
void *ret_value = job->func(job->args);
87-
job->future->result = ret_value;
88-
atomic_flag_clear(&job->future->flag);
89-
free(job->args);
90-
free(job);
84+
thrd_yield();
85+
continue;
9186
}
87+
while (!atomic_compare_exchange_weak(&thrd_pool->head->prev, &job,
88+
job->prev))
89+
;
90+
job->future->result = (void *)job->func(job->future->arg);
91+
atomic_flag_clear(&job->future->flag);
92+
free(job);
9293
} else {
93-
/* To auto run when jobs added, set status to running if job queue is not empty.
94-
* As long as the producer is protected */
94+
/* worker is idle */
9595
thrd_yield();
96-
continue;
9796
}
9897
};
9998
return EXIT_SUCCESS;
@@ -113,14 +112,13 @@ static bool tpool_init(tpool_t *thrd_pool, size_t size)
113112
return false;
114113
}
115114

116-
// May use memory pool for jobs
117115
idle_job_t *idle_job = malloc(sizeof(idle_job_t));
118116
if (!idle_job) {
119117
printf("Failed to allocate idle job.\n");
120118
return false;
121119
}
122-
// idle_job will always be the first job
123-
idle_job->job.args = NULL;
120+
121+
/* idle_job will always be the first job */
124122
idle_job->job.next = &idle_job->job;
125123
idle_job->job.prev = &idle_job->job;
126124
idle_job->prev = &idle_job->job;
@@ -129,10 +127,9 @@ static bool tpool_init(tpool_t *thrd_pool, size_t size)
129127
thrd_pool->state = idle;
130128
thrd_pool->size = size;
131129

132-
for (size_t i = 0; i < size; i++) {
130+
/* employer hires many workers */
131+
for (size_t i = 0; i < size; i++)
133132
thrd_create(thrd_pool->pool + i, worker, thrd_pool);
134-
//TODO: error handling
135-
}
136133

137134
return true;
138135
}
@@ -141,9 +138,10 @@ static void tpool_destroy(tpool_t *thrd_pool)
141138
{
142139
if (atomic_exchange(&thrd_pool->state, cancelled))
143140
printf("Thread pool cancelled with jobs still running.\n");
144-
for (int i = 0; i < thrd_pool->size; i++) {
141+
142+
for (int i = 0; i < thrd_pool->size; i++)
145143
thrd_join(thrd_pool->pool[i], NULL);
146-
}
144+
147145
while (thrd_pool->head->prev != &thrd_pool->head->job) {
148146
job_t *job = thrd_pool->head->prev->prev;
149147
free(thrd_pool->head->prev);
@@ -164,78 +162,79 @@ static void *bbp(void *arg)
164162
double *product = malloc(sizeof(double));
165163
if (!product)
166164
return NULL;
167-
165+
168166
*product = 1 / pow(16, k) * sum;
169167
return (void *)product;
170168
}
171169

172170
struct tpool_future *add_job(tpool_t *thrd_pool, void *(*func)(void *),
173-
void *args)
171+
void *arg)
174172
{
175-
// May use memory pool for jobs
176173
job_t *job = malloc(sizeof(job_t));
177174
if (!job)
178175
return NULL;
179176

180-
struct tpool_future *future = tpool_future_create();
181-
if (!future){
177+
struct tpool_future *future = tpool_future_create(arg);
178+
if (!future) {
182179
free(job);
183180
return NULL;
184181
}
185182

186-
// unprotected producer
187-
job->args = args;
188-
job->func = bbp;
183+
job->func = func;
189184
job->future = future;
190185
job->next = thrd_pool->head->job.next;
191186
job->prev = &thrd_pool->head->job;
192187
thrd_pool->head->job.next->prev = job;
193188
thrd_pool->head->job.next = job;
194189
if (thrd_pool->head->prev == &thrd_pool->head->job) {
195190
thrd_pool->head->prev = job;
196-
// trap worker at idle job
191+
/* the previous job of the idle job is itself */
197192
thrd_pool->head->job.prev = &thrd_pool->head->job;
198193
}
199194
return future;
200195
}
201196

202197
static inline void wait_until(tpool_t *thrd_pool, int state)
203198
{
204-
while (atomic_load(&thrd_pool->state) != state) {
199+
while (atomic_load(&thrd_pool->state) != state)
205200
thrd_yield();
206-
}
207201
}
208202

209203
int main()
210204
{
211-
struct tpool_future *futures[PRECISION + 1];
205+
int bbp_args[PRECISION];
206+
struct tpool_future *futures[PRECISION];
212207
double bbp_sum = 0;
213208

214209
tpool_t thrd_pool = { .initialezed = ATOMIC_FLAG_INIT };
215210
if (!tpool_init(&thrd_pool, N_THREADS)) {
216211
printf("failed to init.\n");
217212
return 0;
218213
}
219-
for (int i = 0; i <= PRECISION; i++) {
220-
int *id = malloc(sizeof(int));
221-
*id = i;
222-
futures[i] = add_job(&thrd_pool, bbp, id);
223-
}
224-
// Due to simplified job queue (not protecting producer), starting the pool manually
214+
/* employer asks workers to work */
225215
atomic_store(&thrd_pool.state, running);
216+
217+
/* employer waits until workers are idle */
226218
wait_until(&thrd_pool, idle);
227-
for (int i = 0; i <= PRECISION; i++) {
228-
int *id = malloc(sizeof(int));
229-
*id = i;
230-
add_job(&thrd_pool, bbp, id);
219+
220+
/* employer adds more jobs to the job queue */
221+
for (int i = 0; i < PRECISION; i++) {
222+
bbp_args[i] = i;
223+
futures[i] = add_job(&thrd_pool, bbp, &bbp_args[i]);
231224
}
232-
for (int i = 0; i <= PRECISION; i++) {
225+
226+
/* employer asks workers to work */
227+
atomic_store(&thrd_pool.state, running);
228+
229+
/* employer waits for the results of the jobs */
230+
for (int i = 0; i < PRECISION; i++) {
233231
tpool_future_wait(futures[i]);
234232
bbp_sum += *(double *)(futures[i]->result);
235233
tpool_future_destroy(futures[i]);
236234
}
237-
atomic_store(&thrd_pool.state, running);
235+
236+
/* employer destroys the job queue and lays workers off */
238237
tpool_destroy(&thrd_pool);
239-
printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bbp_sum);
238+
printf("PI calculated with %d terms: %.15f\n", PRECISION, bbp_sum);
240239
return 0;
241240
}
File renamed without changes.
File renamed without changes.
