21
21
22
22
static pthread_barrier_t thread_barrier ; // waited on with pthread_barrier_wait() in tst_threaded_ring_partitioned_run
23
23
24
+ static int ratio_send_to_receive = 1 ; // send partitions per receive partition; used as a divisor and modulus, so it must stay non-zero
25
+
24
26
int tst_threaded_ring_partitioned_init (struct tst_env * env )
25
27
{
26
28
int comm_rank ;
@@ -66,15 +68,16 @@ int tst_threaded_ring_partitioned_init(struct tst_env *env)
66
68
return 0 ;
67
69
}
68
70
69
- // busy wait until partition arrived
71
+
72
+ // busy wait until partition arrived, using exponential backoff with initial backoff time given.
70
73
// returns 1 if the partition has arrived and 0 if waiting was interupted
71
- static int wait_for_partition (MPI_Request * recv_request , int partition_num )
74
+ static int wait_for_partition (MPI_Request * recv_request , int partition_num , useconds_t backoff_time )
72
75
{
73
76
int flag = 0 ;
74
77
do
75
78
{
76
79
MPI_CHECK (MPI_Parrived (* recv_request , partition_num , & flag ));
77
- } while (flag == 0 && usleep (2000 ) == 0 );
80
+ } while (flag == 0 && usleep (( backoff_time = ( backoff_time * 3 ) / 2 ) ) == 0 );
78
81
79
82
return flag ;
80
83
}
@@ -129,26 +132,31 @@ int tst_threaded_ring_partitioned_run(struct tst_env *env)
129
132
send_to , recv_from , env -> tag );
130
133
131
134
// number of partitions and values per partition
132
- int num_partitions = num_worker_threads ;
133
- int partition_size = env -> values_num ; // number of elements
135
+ int num_send_partitions = num_worker_threads ;
136
+ int num_recv_partitions = num_send_partitions / ratio_send_to_receive ;
137
+ int partition_size = env -> values_num ; // number of elements per send partition
138
+
139
+ // partition numbers for this thread
140
+ int send_partition_num = thread_num ;
141
+ int recv_partition_num = (thread_num % ratio_send_to_receive == 0 ) ? thread_num / ratio_send_to_receive : -1 ;
134
142
135
143
// init send and recv and start both
136
144
if (thread_num == TST_THREAD_MASTER )
137
145
{
138
- tst_output_printf (DEBUG_LOG , TST_REPORT_MAX , "(Rank:%i, Thread:%i) initializing send to %i and recv from %i with %i partitions of size %i*%i bytes\n" ,
146
+ tst_output_printf (DEBUG_LOG , TST_REPORT_MAX ,"(Rank:%i, Thread:%i) initializing send to %i and recv from %i with %i partitions of size %i*%i bytes\n" ,
139
147
comm_rank , thread_num ,
140
- send_to , recv_from , num_partitions , partition_size , type_extent );
148
+ send_to , recv_from , num_send_partitions , partition_size , type_extent );
141
149
142
- MPI_CHECK (MPI_Psend_init (env -> send_buffer , num_partitions , partition_size , type , send_to ,
150
+ MPI_CHECK (MPI_Psend_init (env -> send_buffer , num_send_partitions , partition_size , type , send_to ,
143
151
0 , comm , MPI_INFO_NULL , send_request ));
144
- MPI_CHECK (MPI_Precv_init (env -> recv_buffer , num_partitions , partition_size , type , recv_from ,
152
+ MPI_CHECK (MPI_Precv_init (env -> recv_buffer , num_recv_partitions , partition_size * ratio_send_to_receive , type , recv_from ,
145
153
0 , comm , MPI_INFO_NULL , recv_request ));
146
154
147
155
MPI_CHECK (MPI_Startall (2 , env -> req_buffer ));
148
156
149
157
// wait for all ranks to become ready
150
158
MPI_CHECK (MPI_Barrier (MPI_COMM_WORLD ));
151
- };
159
+ }
152
160
153
161
pthread_barrier_wait (& thread_barrier );
154
162
@@ -157,30 +165,32 @@ int tst_threaded_ring_partitioned_run(struct tst_env *env)
157
165
if (thread_num == TST_THREAD_MASTER )
158
166
time_init = MPI_Wtime ();
159
167
160
- if (thread_num >= 0 && thread_num < num_partitions )
168
+ if (send_partition_num >= 0 && send_partition_num < num_send_partitions )
161
169
{
162
170
// allow this partition to be sent
163
- MPI_CHECK (MPI_Pready (thread_num , * send_request ));
171
+ MPI_CHECK (MPI_Pready (send_partition_num , * send_request ));
164
172
}
165
173
166
- if (thread_num >= 0 && thread_num < num_partitions )
174
+ if (recv_partition_num >= 0 && recv_partition_num < num_recv_partitions )
167
175
{
168
- wait_for_partition (recv_request , thread_num );
176
+ wait_for_partition (recv_request , recv_partition_num , 512 );
169
177
}
170
178
}
171
179
else
172
180
{
173
- if (thread_num >= 0 && thread_num < num_partitions )
181
+ if (send_partition_num >= 0 && send_partition_num < num_send_partitions )
174
182
{
175
- wait_for_partition (recv_request , thread_num );
183
+ if (recv_partition_num >= 0 && recv_partition_num < num_recv_partitions ) {
184
+ wait_for_partition (recv_request , recv_partition_num , 128 );
185
+ }
176
186
177
187
// simply copy data from input to output buffer
178
- int begin_index = partition_size * thread_num * type_extent ;
188
+ int begin_index = partition_size * send_partition_num * type_extent ;
179
189
int size = partition_size * type_extent ;
180
190
memcpy (& env -> send_buffer [begin_index ], & env -> recv_buffer [begin_index ], size );
181
191
182
192
// allow sending of this partition
183
- MPI_CHECK (MPI_Pready (thread_num , * send_request ));
193
+ MPI_CHECK (MPI_Pready (send_partition_num , * send_request ));
184
194
}
185
195
}
186
196
0 commit comments