
Commit bfd1254

Merge pull request #13 from AxelSchneewind/ring-partitioned
Add test case for partitioned communication
2 parents 3e391c0 + 6130a2b commit bfd1254

5 files changed: 514 additions, 0 deletions
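
For context, the new tests exercise MPI 4.0 partitioned point-to-point communication: a single send/receive operation whose buffer is split into partitions that can be marked ready for transfer (MPI_Pready) or polled for arrival (MPI_Parrived) independently, e.g. one partition per thread. Below is a minimal sketch of that pattern between two ranks, independent of the test suite's own helpers; the partition count, element count, two-rank pairing, and use of MPI_COMM_WORLD are illustrative choices, not taken from the suite.

    /* Minimal partitioned send/receive sketch (MPI >= 4.0).
     * Run with at least two ranks, e.g. mpirun -np 2 ./a.out.
     * PARTITIONS and COUNT are illustrative values. */
    #include <mpi.h>
    #include <stdlib.h>

    #define PARTITIONS 4      /* partitions per request */
    #define COUNT      1024   /* elements per partition */

    int main(int argc, char **argv)
    {
        int provided, rank;
        MPI_Request req;
        double *buf = malloc(sizeof(double) * PARTITIONS * COUNT);

        MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        if (rank == 0)
        {
            /* partitioned send to rank 1: start once, then mark each partition ready */
            MPI_Psend_init(buf, PARTITIONS, COUNT, MPI_DOUBLE, 1, 0,
                           MPI_COMM_WORLD, MPI_INFO_NULL, &req);
            MPI_Start(&req);
            for (int p = 0; p < PARTITIONS; p++)
                MPI_Pready(p, req);               /* partition p may now be transferred */
            MPI_Wait(&req, MPI_STATUS_IGNORE);
            MPI_Request_free(&req);
        }
        else if (rank == 1)
        {
            /* matching partitioned receive: poll completed partitions with MPI_Parrived */
            MPI_Precv_init(buf, PARTITIONS, COUNT, MPI_DOUBLE, 0, 0,
                           MPI_COMM_WORLD, MPI_INFO_NULL, &req);
            MPI_Start(&req);
            for (int p = 0; p < PARTITIONS; p++)
            {
                int flag = 0;
                while (!flag)
                    MPI_Parrived(req, p, &flag);  /* has partition p arrived yet? */
            }
            MPI_Wait(&req, MPI_STATUS_IGNORE);
            MPI_Request_free(&req);
        }

        free(buf);
        MPI_Finalize();
        return 0;
    }

In the ring tests added by this commit, each worker thread owns one send partition; on non-master ranks a thread waits for its receive partition with MPI_Parrived, copies it into the send buffer, and releases the matching send partition with MPI_Pready, so the data moves around the ring partition by partition.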

Makefile.am

Lines changed: 2 additions & 0 deletions
@@ -124,6 +124,8 @@ mpi_test_suite_SOURCES = \
   threaded/tst_threaded_ring.c \
   threaded/tst_threaded_ring_isend.c \
   threaded/tst_threaded_ring_persistent.c \
+  threaded/tst_threaded_ring_partitioned.c \
+  threaded/tst_threaded_ring_partitioned_many_to_one.c \
   tst_comm.c \
   tst_comm.h \
   tst_file.c \

mpi_test_suite.h

Lines changed: 9 additions & 0 deletions
@@ -917,6 +917,15 @@ extern int tst_threaded_ring_persistent_cleanup (struct tst_env * env);
 extern int tst_threaded_comm_dup_init (struct tst_env * env);
 extern int tst_threaded_comm_dup_run (struct tst_env * env);
 extern int tst_threaded_comm_dup_cleanup (struct tst_env * env);
+
+extern int tst_threaded_ring_partitioned_init (struct tst_env * env);
+extern int tst_threaded_ring_partitioned_run (struct tst_env * env);
+extern int tst_threaded_ring_partitioned_cleanup (struct tst_env * env);
+
+extern int tst_threaded_ring_partitioned_many_to_one_init (struct tst_env * env);
+extern int tst_threaded_ring_partitioned_many_to_one_run (struct tst_env * env);
+extern int tst_threaded_ring_partitioned_many_to_one_cleanup (struct tst_env * env);
+
 #endif

 #endif /* __MPI_TESTSUITE_H__ */

threaded/tst_threaded_ring_partitioned.c

Lines changed: 238 additions & 0 deletions
@@ -0,0 +1,238 @@
/*
 * File: tst_threaded_ring_partitioned.c
 *
 * Functionality:
 *   Sends data through a ring using partitioned communication.
 *   Each thread corresponds to a partition of the send/receive buffers.
 *
 * Author: Axel Schneewind
 *
 * Date: July 19th 2023
 */
#include <mpi.h>
#include "mpi_test_suite.h"
#include "tst_threads.h"
#include "tst_output.h"
#include "tst_comm.h"

#include <pthread.h>

#define TST_RANK_MASTER 0

static pthread_barrier_t thread_barrier;

static int ratio_send_to_receive = 1;

int tst_threaded_ring_partitioned_init(struct tst_env *env)
{
    int comm_rank;
    MPI_Comm comm = tst_comm_getmastercomm(env->comm);
    MPI_CHECK(MPI_Comm_rank(comm, &comm_rank));

    int thread_num = tst_thread_get_num();
    int num_worker_threads = tst_thread_num_threads();

    tst_output_printf(DEBUG_LOG, TST_REPORT_MAX, "(Rank:%d, Thread:%d) env->comm:%d env->type:%d env->values_num:%d\n",
                      tst_global_rank, thread_num, env->comm, env->type, env->values_num);

    // each partition contains env->values_num values
    MPI_Aint type_extent = tst_type_gettypesize(env->type);
    size_t buffer_size = num_worker_threads * env->values_num * type_extent;

    if (thread_num == TST_THREAD_MASTER)
    {
        // one request for sending, one for receiving
        tst_thread_alloc_global_requests(2);

        // barrier syncs master and worker threads
        pthread_barrier_init(&thread_barrier, NULL, num_worker_threads + 1);

        // global buffer holds send and recv buffer
        tst_thread_global_buffer_init(2 * buffer_size);
    }

    // wait until buffer is initialized by master thread (busy wait as thread barrier is not ready here)
    while (tst_thread_get_global_buffer_size() != 2 * buffer_size)
        usleep(2000);

    // first half of shared buffer is send and second half is receive buffer
    env->send_buffer = tst_thread_get_global_buffer();
    env->recv_buffer = (char *)tst_thread_get_global_buffer() + buffer_size;

    env->req_buffer = tst_thread_get_global_request(0);
    env->status_buffer = MPI_STATUSES_IGNORE;

    // master thread of master rank initializes array values
    if (comm_rank == TST_RANK_MASTER && thread_num == TST_THREAD_MASTER)
        tst_type_setstandardarray(env->type, num_worker_threads * env->values_num, env->send_buffer, comm_rank);

    return 0;
}

// busy wait until partition arrived, using exponential backoff with initial backoff time given.
// returns 1 if the partition has arrived and 0 if waiting was interrupted
static int wait_for_partition(MPI_Request *recv_request, int partition_num, useconds_t backoff_time)
{
    int flag = 0;
    do
    {
        MPI_CHECK(MPI_Parrived(*recv_request, partition_num, &flag));
    } while (flag == 0 && usleep((backoff_time = (backoff_time * 3) / 2)) == 0);

    return flag;
}

int tst_threaded_ring_partitioned_run(struct tst_env *env)
{
    int comm_size;
    int comm_rank;
    int send_to;
    int recv_from;

    // for measuring time
    double time_init;

    // only allow intra comm
    MPI_Comm comm = tst_comm_getmastercomm(env->comm);
    if (tst_comm_getcommclass(env->comm) & TST_MPI_INTRA_COMM)
    {
        MPI_CHECK(MPI_Comm_rank(comm, &comm_rank));
        MPI_CHECK(MPI_Comm_size(comm, &comm_size));

        send_to = (comm_rank + 1) % comm_size;
        recv_from = (comm_rank + comm_size - 1) % comm_size;
    }
    else if (tst_comm_getcommclass(env->comm) & TST_MPI_COMM_SELF)
    {
        MPI_CHECK(MPI_Comm_rank(comm, &comm_rank));
        MPI_CHECK(MPI_Comm_size(comm, &comm_size));

        send_to = comm_rank;
        recv_from = comm_rank;
    }
    else
        ERROR(EINVAL, "tst_threaded_ring_partitioned cannot run with this kind of communicator");

    MPI_Datatype type = tst_type_getdatatype(env->type);
    MPI_Aint type_extent = tst_type_gettypesize(env->type);

    MPI_Request *send_request = &env->req_buffer[0];
    MPI_Request *recv_request = &env->req_buffer[1];

    int num_threads = 1 + tst_thread_num_threads(); /* we have to add 1 for the master thread */
    int num_worker_threads = tst_thread_num_threads();
    int thread_num = tst_thread_get_num();

    MPI_CHECK(MPI_Comm_rank(comm, &comm_rank));
    MPI_CHECK(MPI_Comm_size(comm, &comm_size));

    tst_output_printf(DEBUG_LOG, TST_REPORT_MAX, "(Rank:%d, Thread:%d) comm_rank:%d comm_size:%d "
                                                 "send_to:%d recv_from:%d env->tag:%d\n",
                      tst_global_rank, thread_num, comm_rank, comm_size,
                      send_to, recv_from, env->tag);

    // number of partitions and values per partition
    int num_send_partitions = num_worker_threads;
    int num_recv_partitions = num_send_partitions / ratio_send_to_receive;
    int partition_size = env->values_num; // number of elements per send partition

    // partition numbers for this thread
    int send_partition_num = thread_num;
    int recv_partition_num = (thread_num % ratio_send_to_receive == 0) ? thread_num / ratio_send_to_receive : -1;

    // init send and recv and start both
    if (thread_num == TST_THREAD_MASTER)
    {
        tst_output_printf(DEBUG_LOG, TST_REPORT_MAX, "(Rank:%i, Thread:%i) initializing send to %i and recv from %i with %i partitions of size %i*%i bytes\n",
                          comm_rank, thread_num,
                          send_to, recv_from, num_send_partitions, partition_size, type_extent);

        MPI_CHECK(MPI_Psend_init(env->send_buffer, num_send_partitions, partition_size, type, send_to,
                                 0, comm, MPI_INFO_NULL, send_request));
        MPI_CHECK(MPI_Precv_init(env->recv_buffer, num_recv_partitions, partition_size * ratio_send_to_receive, type, recv_from,
                                 0, comm, MPI_INFO_NULL, recv_request));

        MPI_CHECK(MPI_Startall(2, env->req_buffer));

        // wait for all ranks to become ready
        MPI_CHECK(MPI_Barrier(MPI_COMM_WORLD));
    }

    pthread_barrier_wait(&thread_barrier);

    if (comm_rank == TST_RANK_MASTER)
    {
        if (thread_num == TST_THREAD_MASTER)
            time_init = MPI_Wtime();

        if (send_partition_num >= 0 && send_partition_num < num_send_partitions)
        {
            // allow this partition to be sent
            MPI_CHECK(MPI_Pready(send_partition_num, *send_request));
        }

        if (recv_partition_num >= 0 && recv_partition_num < num_recv_partitions)
        {
            wait_for_partition(recv_request, recv_partition_num, 512);
        }
    }
    else
    {
        if (send_partition_num >= 0 && send_partition_num < num_send_partitions)
        {
            if (recv_partition_num >= 0 && recv_partition_num < num_recv_partitions)
            {
                wait_for_partition(recv_request, recv_partition_num, 128);
            }

            // simply copy data from input to output buffer
            int begin_index = partition_size * send_partition_num * type_extent;
            int size = partition_size * type_extent;
            memcpy(&env->send_buffer[begin_index], &env->recv_buffer[begin_index], size);

            // allow sending of this partition
            MPI_CHECK(MPI_Pready(send_partition_num, *send_request));
        }
    }

    // wait until sends and recvs are done
    if (thread_num == TST_THREAD_MASTER)
    {
        MPI_CHECK(MPI_Waitall(2, env->req_buffer, env->status_buffer));

        if (comm_rank == TST_RANK_MASTER)
        {
            double time_final = MPI_Wtime();

            // print timing
            tst_output_printf(DEBUG_LOG, TST_REPORT_MAX, "(Rank:%d) Sending through ring took %fs\n", comm_rank, time_final - time_init);
        }
        else
            tst_output_printf(DEBUG_LOG, TST_REPORT_MAX, "(Rank:%d) done\n", comm_rank);
    }

    pthread_barrier_wait(&thread_barrier);

    // check that data was transmitted correctly
    if (thread_num == TST_THREAD_MASTER)
        return tst_test_checkstandardarray(env, env->recv_buffer, TST_RANK_MASTER);
    else
        return 0;
}

int tst_threaded_ring_partitioned_cleanup(struct tst_env *env)
{
    int thread_num = tst_thread_get_num();
    int num_worker_threads = tst_thread_num_threads();

    if (thread_num == TST_THREAD_MASTER)
    {
        tst_thread_free_global_requests();

        tst_thread_global_buffer_cleanup();

        pthread_barrier_destroy(&thread_barrier);
    }

    return 0;
}
