-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathgraph-load-balance.c
196 lines (173 loc) · 6.31 KB
/
graph-load-balance.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <time.h>
#include <mpi.h>
#include <athread.h>
#include "common.h"
#include "utils.h"
const char* version_name = "A reference version of edge-based load balancing";
extern void SLAVE_FUN(prepare)();
extern void SLAVE_FUN(search)();
int *data_info; // 姣忎釜浠庢牳姣忚疆杩唬澶勭悊缁撶偣涓暟 // 姣忎釜浠庢牳姣忚疆杩唬澶勭悊鏁版嵁鐨勮捣濮嬪湴鍧�
index_t* buffer_s;
index_t* buffer_e; // 鍙戦�佺粰浠庢牳鐨勮竟缂撳啿鍖?
index_t *mypred; // 鍓嶇紑
index_t *bound; // 杈圭晫椤剁偣
index_t *prefix; // 杈圭晫椤剁偣鐨勫墠椹?
index_t *buffer;
index_t *nextLayer; // 涓嬩竴灞傞《鐐?
index_t *prefix_nx;
int r_id, c_id;
int col_num;
MPI_Comm row_comm;
MPI_Comm col_comm;
index_t *v_pos;
index_t *e_dst;
int offset_v;
// 鍚岃杩涚▼鏁版嵁鍋忕Щ
int row_v_count[8];
int row_v_displs[8];
index_t *row_v_pos; // 瀛樺偍涓�琛岀殑鏁版嵁
index_t *row_e_dst;
// 鍚堝苟浠ュ悗杈规暟鍜岀偣鏁?
int row_local_e;
int row_local_v;
int vpos_count[9]; // 姣忎竴琛岀粨鐐规暟閲忕殑鍓嶇紑鍜?
int vpos_count_second[8];
int size_b, size_n;
int recvcounts[8]; // 鍒楅�氫俊鍩熻繘琛実ather鏃剁殑鍙傛暟
int displs[8];
void preprocess(dist_graph_t *graph) {
int p_id = graph->p_id;
int p_num = graph->p_num;
int local_e = graph->local_e;
int local_v = graph->local_v;
//*****************閫氫俊鍩熷垝鍒?*******************************/
col_num = sqrt(p_num); // 鍒掑垎涓轰簩缁? col_num 涓哄垪鏁?
MPI_Comm_split(MPI_COMM_WORLD, p_id / col_num, p_id % col_num, &row_comm); // 鍒涘缓琛岄�氫俊鍩?
MPI_Comm_split(MPI_COMM_WORLD, p_id % col_num, p_id / col_num, &col_comm); // 鍒涘缓鍒楅�氫俊鍩?
MPI_Comm_rank(row_comm, &c_id); // 杩涚▼鍦ㄨ閫氫俊鍩熶腑鐨刬d
MPI_Comm_rank(col_comm, &r_id); // 鍒楅�氫俊鍩熶腑鐨刬d
//***************************************************************/
int row_e_count[8];
int row_e_displs[8];
//*****************鏋勯�?琛屽唴gatherv 鍙傛暟 *******************************/
MPI_Allgather(&local_v, 1, MPI_INT, row_v_count, 1, MPI_INT, row_comm);
MPI_Allgather(&local_e, 1, MPI_INT, row_e_count, 1, MPI_INT, row_comm);
int v_pre = 0;
int e_pre = 0;
for(int i = 0; i < col_num; i++){
row_v_displs[i] = v_pre;
row_e_displs[i] = e_pre;
v_pre += row_v_count[i];
e_pre += row_e_count[i];
}
//***********************************************************************
row_local_v = v_pre; // 鍚堝苟浠ュ悗鐨勭偣鏁伴噺
row_local_e = e_pre; // 鍚堝苟浠ュ悗鐨勮竟鏁伴噺
row_v_pos = (index_t*)malloc(sizeof(index_t) * (row_local_v + 1)); // 琛岀紦鍐插尯
row_e_dst = (index_t*)malloc(sizeof(index_t) * row_local_e);
// 琛屽唴gather鏁版嵁
MPI_Allgatherv(graph->v_pos, local_v, MPI_INT, row_v_pos, row_v_count, row_v_displs, MPI_INT, row_comm);
MPI_Allgatherv(graph->e_dst, local_e, MPI_INT, row_e_dst, row_e_count, row_e_displs, MPI_INT, row_comm);
row_v_pos[row_local_v] = row_v_pos[0] + row_local_e; //鍝ㄥ叺
// 鍒楅�氫俊锛屽緱鍒版瘡涓�琛岀粨鐐圭殑鏁伴噺
MPI_Allgather(&row_local_v, 1, MPI_INT, vpos_count, 1, MPI_INT, col_comm);
// 姹傚墠闈㈣鐨勭粨鐐规暟閲?
offset_v = 0;
for(int i = 0; i < col_num; i++){
int temp = vpos_count[i];
vpos_count[i] = offset_v;
offset_v += temp;
vpos_count_second[i] = offset_v;
}
vpos_count[col_num] = offset_v;
offset_v = vpos_count[r_id];
// 瀛樺偍杩涚▼鏈湴鏁版嵁
v_pos = (index_t*)malloc(sizeof(index_t) * (row_local_v + 1));
e_dst = (index_t*)malloc(sizeof(index_t) * row_local_e); //max size
mypred = (index_t*)malloc(sizeof(index_t) * row_local_v);
buffer = (index_t*)malloc(sizeof(index_t) * row_local_e);
bound = (index_t*)malloc(sizeof(index_t) * graph->global_e);
prefix = (index_t*)malloc(sizeof(index_t) * graph->global_e);
nextLayer = (index_t*)malloc(sizeof(index_t) * row_local_e);
prefix_nx = (index_t*)malloc(sizeof(index_t) * row_local_e);
//浠庢牳閮ㄥ垎鎵�瑕佺敤鍒扮殑鍑芥暟
buffer_s = (index_t*)malloc(sizeof(index_t) * row_local_e); //
buffer_e = (index_t*)malloc(sizeof(index_t) * row_local_e);
data_info = (int*)malloc(sizeof(int) * 129);
data_info[128] = offset_v;
athread_init();
//****************鏋勯�?灞炰簬鏈繘绋嬬殑鏁版嵁 *********************** ********
int p = 0; // 边的下标
for(int i = 0; i < row_local_v; i++){
v_pos[i] = p; // 本进程中,前面的点有多少边
int begin = row_v_pos[i] - row_v_pos[0];
int end = row_v_pos[i + 1] - row_v_pos[0];
for(int e = begin; e < end; e++){
int v = row_e_dst[e];
if (vpos_count[c_id] <= v && v < vpos_count_second[c_id]){
e_dst[p++] = v;
}
}
}
v_pos[row_local_v] = p;
free(row_v_pos);
free(row_e_dst);
//**************************************************************
}
void bfs(dist_graph_t *graph, index_t s, index_t* pred){
memset(mypred, UNREACHABLE, sizeof(int) * row_local_v);
size_b = 0;
if(vpos_count[c_id] <= s && s < vpos_count_second[c_id]){
bound[0] = s;
prefix[0] = s;
size_b = 1;
}
int newnode;
do{
if(size_b < 64){
size_n = 0;
for(int i = 0; i < size_b; i++){
int u = bound[i];
if(mypred[u - offset_v] == UNREACHABLE){
int begin = v_pos[u - offset_v];
int end = v_pos[u + 1 - offset_v];
mypred[u - offset_v] = prefix[i];
for(int j = begin; j < end; j++){
nextLayer[size_n] = e_dst[j];
prefix_nx[size_n] = u;
size_n++;
}
}
}
}else{
int q = size_b / 64;
int r = size_b % 64;
for(int i = 0; i < 64; i++){
data_info[i] = q + ((i < r) ? 1 : 0);
data_info[i + 64] = i * q + ((i < r) ? i : r);
}
athread_spawn(search, 0);
athread_join();
}
MPI_Barrier(MPI_COMM_WORLD);
MPI_Allgather(&size_n, 1, MPI_INT, recvcounts, 1, MPI_INT, col_comm); // 閫夋嫨瀵硅绾胯繘绋嬩綔涓烘牴鑺傜偣锛屼究浜庝箣鍚庤骞挎挱
int pre = 0;
for(int i = 0; i < col_num; i++){
displs[i] = pre;
pre += recvcounts[i];
}
size_b = pre;
MPI_Bcast(&size_b, 1, MPI_INT, r_id, row_comm);
MPI_Gatherv(nextLayer, size_n, MPI_INT, bound, recvcounts, displs, MPI_INT, c_id, col_comm);
MPI_Gatherv(prefix_nx, size_n, MPI_INT, prefix, recvcounts, displs, MPI_INT, c_id, col_comm);
MPI_Bcast(bound, size_b, MPI_INT, r_id, row_comm);
MPI_Bcast(prefix, size_b, MPI_INT, r_id, row_comm);
MPI_Allreduce(&size_b, &newnode, 1, MPI_INT, MPI_SUM, col_comm);
}while(newnode > 0);
MPI_Scatterv(mypred, row_v_count, row_v_displs, MPI_INT, pred, graph->local_v, MPI_INT, r_id, row_comm);
}
void destroy_additional_info(void *additional_info) {
free(additional_info);
}