-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwarp_jacobi.cpp
302 lines (222 loc) · 9.82 KB
/
warp_jacobi.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
#include <vector>
#include <string>
#include <fstream>
#include <graphlab.hpp>
#include <graphlab/warp.hpp>
#include "../collaborative_filtering/eigen_wrapper.hpp"
#include "../collaborative_filtering/types.hpp"
#include "../collaborative_filtering/eigen_serialization.hpp"
#include <boost/algorithm/string/predicate.hpp>
#include <graphlab/util/stl_util.hpp>
using namespace graphlab;
#define initX 0 //random seed for later
//int rows = 3, cols = 3;
int max_iter = 10;
double tol = 1e-5;
//double max_err=100;
// Represents a row of the matrix and its diagonal value
struct vertex_data : public graphlab::IS_POD_TYPE{
double x_i; // current guess
//int b_i; // coefficient = side of row
double A_ii; // diagonal value
double err;
};
// represents an non-diagonal element of a matrix
struct edge_data : public graphlab::IS_POD_TYPE{
double A_ij; // value of matrix element
edge_data(double input = 1) : A_ij(input) {};
};
typedef distributed_graph<vertex_data, edge_data> graph_type;
double max(double x, double y) { return x > y ? x : y; }
/*
double jacobi_map(graph_type::edge_type edge, graph_type::vertex_type other, double prev_guess)
{
return edge.data().A_ij * prev_guess;
}
*/
double edge_val(graph_type::edge_type edge, graph_type::vertex_type other) { return fabs(edge.data().A_ij); }
void add_magnitude(double& a, const double& b) { a += b;}
void jacobi_precondition(graph_type::vertex_type& vertex)
{
vertex_data& data = vertex.data();
double row_sum = warp::map_reduce_neighborhood(vertex, OUT_EDGES, edge_val, add_magnitude);
data.A_ii = row_sum + 1;
}
double jacobi_map(graph_type::edge_type edge, graph_type::vertex_type other, const double prev_guess) {
return edge.data().A_ij * prev_guess;
}
void jacobi_combine(double& a, const double& b, const double na) { a += b; }
void jacobi_step(graph_type::vertex_type& vertex)
{
vertex_data& data = vertex.data();
double res = warp::map_reduce_neighborhood(vertex, OUT_EDGES, data.x_i, jacobi_map, jacobi_combine);
double x_i_next = (1 - res) / data.A_ii;
//if (x_i_next != 0)
data.err = fabs(x_i_next - data.x_i) / fabs(x_i_next);
//else
//data.err = 0;
data.x_i = x_i_next;
}
/***
* JACOBI UPDATE FUNCTION
* Update rule is:
* x_i = (b_i - \sum_j A_ij * x_j)/A_ii
void jacobi_step(graph_type::vertex_type& vertex)
{
vertex_data& data = vertex.data();
//Warp function allows a map-reduce aggregation of the neighborhood of a vertex to be performed
double res = warp::map_reduce_neighborhood(vertex, OUT_EDGES, data.x_i, jacobi_map);
//use OUT_EDGES for sum up the rows
//IN_EDGES:In edges implies that only whose target is the center vertex are processed during gather or scatter
//OUT_EDGES:Out edges implies that only whose source is the center vertex are processed during gather or scatter
//ALL_EDGES:All edges implies that all adges adjacent to a the center vertex are processed on gather or scatter. Note that some neighbors may be encountered twice if there is both an in and out edge to that neighbor
//b_i =1 always
//double x_i_next = (data.b_i - res) / data.A_ii;
double x_i_next = (1 - res) / data.A_ii;
data.err = fabs(x_i_next - data.x_i) / x_i_next;
data.x_i = x_i_next;
}
void jacobi_step(graph_type::vertex_type& vertex)
{
vertex_data& data = vertex.data();
double res = warp::map_reduce_neighborhood(vertex, OUT_EDGES, data.x_i, jacobi_map, jacobi_combine);
double x_i_next = (data.b_i - res) / data.A_ii;
data.err = fabs(x_i_next - data.x_i) / x_i_next;
data.x_i = x_i_next;
}
*/
//void maxErr(graph_type::vertex_type vertex) {
//max_err = max(max_err, vertex.data().err);
//}
inline bool graph_loader(graph_type& graph, const std::string& filename, const std::string& line) {
//no need to parse
// if (boost::algorithm::ends_with(filename))
// return true;
ASSERT_FALSE(line.empty());
// Determine the role of the data
// Parse the line
std::stringstream strm(line);
graph_type::vertex_id_type source_id(-1), target_id(-1);
float weight(0);
strm >> source_id >> target_id;
source_id--; target_id--;
//if (source_id >= (uint)rows)
//logstream(LOG_FATAL)<<"Row number: " << source_id << " sould be < rows " << rows << " [ line: " << line << " ] " << std::endl;
//if (target_id >= (uint)cols)
//logstream(LOG_FATAL)<<"Col number: " << target_id << " sould be < cols " << cols << " [ line: " << line << " ] " << std::endl;
strm >> weight;
//the diagonal, which is the vertex value
if (source_id == target_id){
vertex_data data;
data.A_ii = weight; //the diagonal value
data.x_i = 1.0;
graph.add_vertex(source_id, data);
}
// Create an edge and add it to the graph, for the non diagonals
else graph.add_edge(source_id, target_id, edge_data(weight));
return true; // successful load
} // end of graph_loader
struct pagerank_writer{
std::string save_vertex(graph_type::vertex_type v) {
char c[128];
//sprintf(c, "%u\t%f\n", v.id(), v.data().x_i);
sprintf(c, "%f\n", v.data().x_i);
return c;
}
std::string save_edge(const graph_type::edge_type& edge) {return "";}
};
double err_value(const graph_type::vertex_type& vertex) { return vertex.data().err * vertex.data().err; }
int main(int argc, char** argv)
{
graphlab::mpi_tools::init(argc, argv);
graphlab::distributed_control dc;
//---------------------------------- input options
const std::string description =
"Solve a linear system using Jacobi method";
graphlab::command_line_options clopts(description);
std::string input_dir, output_dir;
clopts.attach_option("matrix", input_dir,"The directory containing the matrix file");
clopts.add_positional("matrix");
//clopts.attach_option("initial_vector", vecfile,"optional initial vector");
//clopts.attach_option("debug", debug, "Display debug output.");
clopts.attach_option("max_iter", max_iter, "max iterations");
clopts.attach_option("tol", tol, "convergence threshold");
//clopts.attach_option("rows", rows, "number of rows");
//clopts.attach_option("cols", cols, "number of cols");
if(!clopts.parse(argc, argv) || input_dir == "") {
std::cout << "Error in parsing command line arguments." << std::endl;
clopts.print_description();
return EXIT_FAILURE;
}
//if (rows <= 0 || cols <= 0 || rows != cols)
//logstream(LOG_FATAL)<<"Please specify number of rows/cols of the input matrix" << std::endl;
//---------------------------------- calling graph_loader
dc.cout() << "Loading graph." << std::endl;
graphlab::timer timer;
graph_type graph(dc, clopts); //graph_type
graph.load(input_dir, graph_loader); //loading data
//pgraph = &graph;
dc.cout() << "Loading graph. Finished in " << timer.current_time() << std::endl;
dc.cout() << "Finalizing graph." << std::endl;
timer.start();
graph.finalize(); //done loading graph
dc.cout() << "Finalizing graph. Finished in " << timer.current_time() << std::endl;
//------------------------------------ print graph info
dc.cout()
<< "========== Graph statistics on proc " << dc.procid() << " ==============="
<< "\n Num vertices: " << graph.num_vertices()
<< "\n Num edges: " << graph.num_edges()
<< "\n Num replica: " << graph.num_replicas() //should be non?
<< "\n Replica to vertex ratio: "
<< float(graph.num_replicas())/graph.num_vertices() //should be 0?
<< "\n --------------------------------------------"
<< "\n Num local own vertices: " << graph.num_local_own_vertices()
<< "\n Num local vertices: " << graph.num_local_vertices()
<< "\n Replica to own ratio: "
<< (float)graph.num_local_vertices()/graph.num_local_own_vertices()
<< "\n Num local edges: " << graph.num_local_edges()
//<< "\n Begin edge id: " << graph.global_eid(0)
<< "\n Edge balance ratio: "
<< float(graph.num_local_edges())/graph.num_edges()
<< std::endl;
//----------------------------------
//
//warp_engine engine(dc, graph);
bool converged = false;
double max_err;
int k = 0;
dc.cout() << "Starting Jacobi precondition\n";
//warp::parfor_all_vertices(graph, jacobi_precondition);
do {
dc.cout() << "Starting jacobi iteration " << k << "\n";
//This Warp function provides a simple parallel for loop over all vertices in the graph, or in a given set of vertices
warp::parfor_all_vertices(graph, jacobi_step);
max_err = graph.map_reduce_vertices<double>(err_value);
max_err = sqrt(max_err);
//warp::parfor_all_vertices(graph, maxErr);
dc.cout() << "Error for iteration: " << max_err << std::endl;
if (fabs(max_err) < 0.000000001) {
converged = true;
}
k++;
} while (!converged);
dc.cout() << "Finished computing in " << k << " iterations error: " << max_err << "\n";
//---------------------------------- finished
double runtime = timer.current_time();
dc.cout() << "Jacobi finished in " << runtime << std::endl;
//dc.cout() << "\t Updates: " << engine.num_updates()<< std::endl;
//dc.cout() << "Solution converged to residual: " << ret.toDouble() << std::endl;
dc.cout() << "----------------------------------------------------------"
<< std::endl
<< "Final Runtime (seconds): " << runtime
<< std::endl
//<< "Updates executed: " << engine.num_updates() << std::endl
//<< "Update Rate (updates/second): "
//<< engine.num_updates() / (double)runtime << std::endl;
;
//----------------------------------save to file
//graph.save("gl", pagerank_writer().save_vertex, false, true, false, 1);
graph.save("gl", pagerank_writer(), false, true, false, 1);
graphlab::mpi_tools::finalize();
return EXIT_SUCCESS;
}