Skip to content

Commit ecd1b57

Browse files
committed
Update tpool in the ABA example to the one in main
The new thread pool is ported to the rmw_example_aba.c file. Makefile is updated accordingly as well.
1 parent e8b81b4 commit ecd1b57

File tree

2 files changed

+134
-66
lines changed

2 files changed

+134
-66
lines changed

examples/Makefile

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
all:
22
$(CC) -Wall -o rmw_example rmw_example.c -pthread -lm
3+
$(CC) -Wall -o rmw_example_aba rmw_example_aba.c -pthread -lm -mcx16
34
clean:
4-
rm -f rmw_example
5+
rm -f rmw_example rmw_example_aba
56
check: all
6-
./rmw_example
7+
./rmw_example
8+
./rmw_example_aba

examples/rmw_example_aba.c

Lines changed: 130 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,24 @@
1-
#include <stdalign.h>
21
#include <stdio.h>
32
#include <stdatomic.h>
43
#include <threads.h>
54
#include <stdlib.h>
65
#include <stdbool.h>
76
#include <assert.h>
7+
#include <math.h>
88

9+
#define PRECISION 100 /* upper bound in BPP sum */
910
#define CACHE_LINE_SIZE 64
10-
#define N_JOBS 16
11-
#define N_THREADS 8
11+
#define N_THREADS 64
12+
13+
struct tpool_future {
14+
void *result;
15+
void *arg;
16+
atomic_flag flag;
17+
};
1218

1319
typedef struct job {
14-
void *args;
20+
void *(*func)(void *);
21+
struct tpool_future *future;
1522
struct job *next, *prev;
1623
} job_t;
1724

@@ -21,64 +28,93 @@ typedef struct idle_job {
2128
_Atomic(job_t *) prev;
2229
unsigned long long version;
2330
};
24-
_Atomic struct B16 {
25-
job_t *_prev;
31+
_Atomic struct versioned_prev {
32+
job_t *ptr;
2633
unsigned long long _version;
27-
} DCAS;
34+
} v_prev;
2835
};
2936
char padding[CACHE_LINE_SIZE - sizeof(_Atomic(job_t *)) -
30-
sizeof(unsigned long long)];
37+
sizeof(unsigned long long)]; /* avoid false sharing */
3138
job_t job;
3239
} idle_job_t;
3340

3441
enum state { idle, running, cancelled };
3542

36-
typedef struct thread_pool {
43+
typedef struct tpool {
3744
atomic_flag initialezed;
3845
int size;
3946
thrd_t *pool;
4047
atomic_int state;
4148
thrd_start_t func;
42-
// job queue is a SPMC ring buffer
43-
idle_job_t *head;
44-
} thread_pool_t;
49+
idle_job_t *head; /* job queue is a SPMC ring buffer */
50+
} tpool_t;
51+
52+
static struct tpool_future *tpool_future_create(void *arg)
53+
{
54+
struct tpool_future *future = malloc(sizeof(struct tpool_future));
55+
if (future) {
56+
future->result = NULL;
57+
future->arg = arg;
58+
atomic_flag_clear(&future->flag);
59+
atomic_flag_test_and_set(&future->flag);
60+
}
61+
return future;
62+
}
63+
64+
void tpool_future_wait(struct tpool_future *future)
65+
{
66+
while (atomic_flag_test_and_set(&future->flag))
67+
;
68+
}
69+
70+
void tpool_future_destroy(struct tpool_future *future)
71+
{
72+
free(future->result);
73+
free(future);
74+
}
4575

4676
static int worker(void *args)
4777
{
4878
if (!args)
4979
return EXIT_FAILURE;
50-
thread_pool_t *thrd_pool = (thread_pool_t *)args;
80+
tpool_t *thrd_pool = (tpool_t *)args;
5181

5282
while (1) {
83+
/* worker is laid off */
5384
if (atomic_load(&thrd_pool->state) == cancelled)
5485
return EXIT_SUCCESS;
5586
if (atomic_load(&thrd_pool->state) == running) {
56-
// claim the job
57-
struct B16 job = atomic_load(&thrd_pool->head->DCAS);
58-
struct B16 next;
87+
/* worker takes the job */
88+
struct versioned_prev job = atomic_load(&thrd_pool->head->v_prev);
89+
/* worker checks if there is only an idle job in the job queue */
90+
if (job.ptr == &thrd_pool->head->job) {
91+
/* worker says it is idle */
92+
atomic_store(&thrd_pool->state, idle);
93+
thrd_yield();
94+
continue;
95+
}
96+
97+
struct versioned_prev next;
98+
/* compare 16 byte at once */
5999
do {
60-
next._prev = job._prev->prev;
100+
next.ptr = job.ptr->prev;
61101
next._version = job._version;
62-
} while (!atomic_compare_exchange_weak(&thrd_pool->head->DCAS, &job,
63-
next));
102+
} while (!atomic_compare_exchange_weak(&thrd_pool->head->v_prev,
103+
&job, next));
64104

65-
if (job._prev->args == NULL) {
66-
atomic_store(&thrd_pool->state, idle);
67-
} else {
68-
printf("Hello from job %d\n", *(int *)job._prev->args);
69-
free(job._prev->args);
70-
free(job._prev); // could cause dangling pointer in other threads
71-
}
105+
job.ptr->future->result =
106+
(void *)job.ptr->func(job.ptr->future->arg);
107+
atomic_flag_clear(&job.ptr->future->flag);
108+
free(job.ptr);
72109
} else {
73-
/* To auto run when jobs added, set status to running if job queue is not empty.
74-
* As long as the producer is protected */
110+
/* worker is idle */
75111
thrd_yield();
76-
continue;
77112
}
78113
};
114+
return EXIT_SUCCESS;
79115
}
80116

81-
static bool thread_pool_init(thread_pool_t *thrd_pool, size_t size)
117+
static bool tpool_init(tpool_t *thrd_pool, size_t size)
82118
{
83119
if (atomic_flag_test_and_set(&thrd_pool->initialezed)) {
84120
printf("This thread pool has already been initialized.\n");
@@ -92,14 +128,13 @@ static bool thread_pool_init(thread_pool_t *thrd_pool, size_t size)
92128
return false;
93129
}
94130

95-
// May use memory pool for jobs
96131
idle_job_t *idle_job = malloc(sizeof(idle_job_t));
97132
if (!idle_job) {
98133
printf("Failed to allocate idle job.\n");
99134
return false;
100135
}
101-
// idle_job will always be the first job
102-
idle_job->job.args = NULL;
136+
137+
/* idle_job will always be the first job */
103138
idle_job->job.next = &idle_job->job;
104139
idle_job->job.prev = &idle_job->job;
105140
idle_job->prev = &idle_job->job;
@@ -109,21 +144,21 @@ static bool thread_pool_init(thread_pool_t *thrd_pool, size_t size)
109144
thrd_pool->state = idle;
110145
thrd_pool->size = size;
111146

112-
for (size_t i = 0; i < size; i++) {
147+
/* employer hires many workers */
148+
for (size_t i = 0; i < size; i++)
113149
thrd_create(thrd_pool->pool + i, worker, thrd_pool);
114-
//TODO: error handling
115-
}
116150

117151
return true;
118152
}
119153

120-
static void thread_pool_destroy(thread_pool_t *thrd_pool)
154+
static void tpool_destroy(tpool_t *thrd_pool)
121155
{
122156
if (atomic_exchange(&thrd_pool->state, cancelled))
123157
printf("Thread pool cancelled with jobs still running.\n");
124-
for (int i = 0; i < thrd_pool->size; i++) {
158+
159+
for (int i = 0; i < thrd_pool->size; i++)
125160
thrd_join(thrd_pool->pool[i], NULL);
126-
}
161+
127162
while (thrd_pool->head->prev != &thrd_pool->head->job) {
128163
job_t *job = thrd_pool->head->prev->prev;
129164
free(thrd_pool->head->prev);
@@ -135,58 +170,89 @@ static void thread_pool_destroy(thread_pool_t *thrd_pool)
135170
atomic_flag_clear(&thrd_pool->initialezed);
136171
}
137172

138-
__attribute__((nonnull(2))) static bool add_job(thread_pool_t *thrd_pool,
139-
void *args)
173+
/* Use Bailey–Borwein–Plouffe formula to approximate PI */
174+
static void *bbp(void *arg)
175+
{
176+
int k = *(int *)arg;
177+
double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
178+
(1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
179+
double *product = malloc(sizeof(double));
180+
if (!product)
181+
return NULL;
182+
183+
*product = 1 / pow(16, k) * sum;
184+
return (void *)product;
185+
}
186+
187+
struct tpool_future *add_job(tpool_t *thrd_pool, void *(*func)(void *),
188+
void *arg)
140189
{
141-
// May use memory pool for jobs
142190
job_t *job = malloc(sizeof(job_t));
143191
if (!job)
144-
return false;
192+
return NULL;
193+
194+
struct tpool_future *future = tpool_future_create(arg);
195+
if (!future) {
196+
free(job);
197+
return NULL;
198+
}
145199

146-
// unprotected producer
147-
job->args = args;
200+
job->func = func;
201+
job->future = future;
148202
job->next = thrd_pool->head->job.next;
149203
job->prev = &thrd_pool->head->job;
150204
thrd_pool->head->job.next->prev = job;
151205
thrd_pool->head->job.next = job;
152206
if (thrd_pool->head->prev == &thrd_pool->head->job) {
153207
thrd_pool->head->prev = job;
154208
thrd_pool->head->version += 1;
155-
// trap worker at idle job
209+
/* the previous job of the idle job is itself */
156210
thrd_pool->head->job.prev = &thrd_pool->head->job;
157211
}
158-
159-
return true;
212+
return future;
160213
}
161214

162-
static inline void wait_until(thread_pool_t *thrd_pool, int state)
215+
static inline void wait_until(tpool_t *thrd_pool, int state)
163216
{
164-
while (atomic_load(&thrd_pool->state) != state) {
217+
while (atomic_load(&thrd_pool->state) != state)
165218
thrd_yield();
166-
}
167219
}
168220

169221
int main()
170222
{
171-
thread_pool_t thrd_pool = { .initialezed = ATOMIC_FLAG_INIT };
172-
if (!thread_pool_init(&thrd_pool, N_THREADS)) {
223+
int bbp_args[PRECISION];
224+
struct tpool_future *futures[PRECISION];
225+
double bbp_sum = 0;
226+
227+
tpool_t thrd_pool = { .initialezed = ATOMIC_FLAG_INIT };
228+
if (!tpool_init(&thrd_pool, N_THREADS)) {
173229
printf("failed to init.\n");
174230
return 0;
175231
}
176-
for (int i = 0; i < N_JOBS; i++) {
177-
int *id = malloc(sizeof(int));
178-
*id = i;
179-
add_job(&thrd_pool, id);
180-
}
181-
// Due to simplified job queue (not protecting producer), starting the pool manually
232+
/* employer ask workers to work */
182233
atomic_store(&thrd_pool.state, running);
234+
235+
/* employer wait ... until workers are idle */
183236
wait_until(&thrd_pool, idle);
184-
for (int i = 0; i < N_JOBS; i++) {
185-
int *id = malloc(sizeof(int));
186-
*id = i;
187-
add_job(&thrd_pool, id);
237+
238+
/* employer add more job to the job queue */
239+
for (int i = 0; i < PRECISION; i++) {
240+
bbp_args[i] = i;
241+
futures[i] = add_job(&thrd_pool, bbp, &bbp_args[i]);
188242
}
243+
244+
/* employer ask workers to work */
189245
atomic_store(&thrd_pool.state, running);
190-
thread_pool_destroy(&thrd_pool);
246+
247+
/* employer wait for the result of job */
248+
for (int i = 0; i < PRECISION; i++) {
249+
tpool_future_wait(futures[i]);
250+
bbp_sum += *(double *)(futures[i]->result);
251+
tpool_future_destroy(futures[i]);
252+
}
253+
254+
/* employer destroys the job queue and lays workers off */
255+
tpool_destroy(&thrd_pool);
256+
printf("PI calculated with %d terms: %.15f\n", PRECISION, bbp_sum);
191257
return 0;
192258
}

0 commit comments

Comments
 (0)