From 7ce91ebf7b75f96dcf463f94ff0f063155a4bf29 Mon Sep 17 00:00:00 2001 From: Glenn Hickey Date: Fri, 5 Apr 2024 16:45:37 -0400 Subject: [PATCH] merge adjacent intervals from different threads --- src/rgfa.cpp | 76 ++++++++++++++++++++++++++++++++++++---------------- src/rgfa.hpp | 3 ++- 2 files changed, 55 insertions(+), 24 deletions(-) diff --git a/src/rgfa.cpp b/src/rgfa.cpp index 237c7cb8f81..243899d2425 100644 --- a/src/rgfa.cpp +++ b/src/rgfa.cpp @@ -128,6 +128,7 @@ void RGFACover::compute(const PathHandleGraph* graph, this->num_ref_intervals = this->rgfa_intervals.size(); #ifdef debug +#pragma omp critical(cerr) cerr << "[rgfa] Selected " << rgfa_intervals.size() << " rank=0 reference paths" << endl; #endif @@ -148,8 +149,6 @@ void RGFACover::compute(const PathHandleGraph* graph, vector queue = {snarl}; - cerr << "top level snarl " << pb2json(*snarl) << endl; - while(!queue.empty()) { const Snarl* cur_snarl = queue.back(); queue.pop_back(); @@ -168,24 +167,23 @@ void RGFACover::compute(const PathHandleGraph* graph, } }); - // now we need to fold up the thread covers + // now we need to fold up the thread covers for (int64_t t = 0; t < thread_count; ++t) { - int64_t offset = this->rgfa_intervals.size(); - - // todo: figure out one-shot stl invocation to move - for (int64_t j = 0; j < rgfa_intervals_vector[t].size(); ++j) { - this->rgfa_intervals.push_back(rgfa_intervals_vector[t][j]); - } #ifdef debug +#pragma omp critical(cerr) cerr << "Adding " << rgfa_intervals_vector[t].size() << " rgfa intervals from thread " << t << endl; #endif - rgfa_intervals_vector[t].clear(); - - for (const auto& node_interval : node_to_interval_vector[t]) { - this->node_to_interval[node_interval.first] = node_interval.second + offset; + // important to go through function rather than do a raw copy since + // inter-top-level snarl merging may need to happen + for (int64_t j = 0; j < rgfa_intervals_vector[t].size(); ++j) { + // the true flag at the end disables the overlap check. since they were computed + // in separate threads, snarls can overlap by a single node + add_interval(this->rgfa_intervals, this->node_to_interval, rgfa_intervals_vector[t][j], true); } + rgfa_intervals_vector[t].clear(); node_to_interval_vector[t].clear(); } + // todo: we could optionally go through uncovered nodes and try to greedily search // for rgfa intervals at this point, since it seems for some graphs there are @@ -345,7 +343,10 @@ void RGFACover::load(const PathHandleGraph* graph, void RGFACover::apply(MutablePathMutableHandleGraph* mutable_graph) { assert(this->graph == static_cast(mutable_graph)); - +#ifdef debug + cerr << "applying rGFA cover with " << this->num_ref_intervals << " ref intervals " + << " and " << this->rgfa_intervals.size() << " total intervals" << endl; +#endif // index paths isued in rgfa cover unordered_map step_to_offset; unordered_set source_path_set; @@ -484,6 +485,7 @@ void RGFACover::compute_snarl(const Snarl& snarl, PathTraversalFinder& path_trav } } #ifdef debug +#pragma omp critical(cerr) cerr << "doing snarl " << pb2json(snarl.start()) << "-" << pb2json(snarl.end()) << " with " << travs.size() << " travs" << endl; #endif @@ -537,9 +539,12 @@ void RGFACover::compute_snarl(const Snarl& snarl, PathTraversalFinder& path_trav const pair& uncovered_interval = best_stats_fragment.fragment; #ifdef debug +#pragma omp critical(cerr) + { cerr << "best trav: "; for (auto xx : trav) cerr << " " << graph->get_id(graph->get_handle_of_step(xx)); cerr << endl << "uncovered interval [" << uncovered_interval.first << "," << uncovered_interval.second << "]" <get_next_step(trav.at(uncovered_interval.second - 1))); #ifdef debug int64_t interval_length = uncovered_interval.second - uncovered_interval.first; +#pragma omp critical(cerr) cerr << "adding interval with length " << interval_length << endl; #endif add_interval(thread_rgfa_intervals, thread_node_to_interval, new_interval); @@ -619,9 +625,17 @@ vector> RGFACover::get_uncovered_intervals(const vector>& thread_rgfa_intervals, unordered_map& thread_node_to_interval, - const pair& new_interval) { + const pair& new_interval, + bool global) { +#ifdef debug +#pragma omp critical(cerr) + cerr << "adding interval " << graph->get_path_name(graph->get_path_handle_of_step(new_interval.first)) + << ":" << graph->get_id(graph->get_handle_of_step(new_interval.first)) + << "-" << graph->get_id(graph->get_handle_of_step(new_interval.second)) << endl; +#endif bool merged = false; + int64_t merged_interval_idx = -1; path_handle_t path_handle = graph->get_path_handle_of_step(new_interval.first); // check the before-first step. if it's in an interval then it must be immediately @@ -630,11 +644,19 @@ bool RGFACover::add_interval(vector>& thread_ if (before_first_step != graph->path_front_end(graph->get_path_handle_of_step(before_first_step))) { nid_t prev_node_id = graph->get_id(graph->get_handle_of_step(before_first_step)); if (thread_node_to_interval.count(prev_node_id)) { - pair& prev_interval = thread_rgfa_intervals[thread_node_to_interval[prev_node_id]]; + int64_t prev_idx = thread_node_to_interval[prev_node_id]; + pair& prev_interval = thread_rgfa_intervals[prev_idx]; if (graph->get_path_handle_of_step(prev_interval.first) == path_handle) { - assert(prev_interval.second == new_interval.first); +#ifdef debug +#pragma omp critical(cerr) + cerr << "prev interval found" << graph->get_path_name(graph->get_path_handle_of_step(prev_interval.first)) + << ":" << graph->get_id(graph->get_handle_of_step(prev_interval.first)) + << "-" << graph->get_id(graph->get_handle_of_step(prev_interval.second)) << endl; +#endif + assert(global || prev_interval.second == new_interval.first); prev_interval.second = new_interval.second; merged = true; + merged_interval_idx = prev_idx; } } } @@ -644,24 +666,32 @@ bool RGFACover::add_interval(vector>& thread_ if (new_interval.second != graph->path_end(graph->get_path_handle_of_step(new_interval.second))) { nid_t next_node_id = graph->get_id(graph->get_handle_of_step(new_interval.second)); if (thread_node_to_interval.count(next_node_id)) { - pair& next_interval = thread_rgfa_intervals[thread_node_to_interval[next_node_id]]; + int64_t next_idx = thread_node_to_interval[next_node_id]; + pair& next_interval = thread_rgfa_intervals[next_idx]; path_handle_t next_path = graph->get_path_handle_of_step(next_interval.first); if (graph->get_path_handle_of_step(next_interval.first) == path_handle) { - assert(next_interval.first == new_interval.second); +#ifdef debug +#pragma omp critical(cerr) + cerr << "next interval found" << graph->get_path_name(graph->get_path_handle_of_step(next_interval.first)) + << ":" << graph->get_id(graph->get_handle_of_step(next_interval.first)) + << "-" << graph->get_id(graph->get_handle_of_step(next_interval.second)) << endl; +#endif + assert(global || next_interval.first == new_interval.second); next_interval.first = new_interval.first; merged = true; + merged_interval_idx = next_idx; } } } // add the interval to the local (thread safe) structures if (!merged) { - for (step_handle_t step = new_interval.first; step != new_interval.second; step = graph->get_next_step(step)) { - thread_node_to_interval[graph->get_id(graph->get_handle_of_step(step))] = thread_rgfa_intervals.size(); - } + merged_interval_idx = thread_rgfa_intervals.size(); thread_rgfa_intervals.push_back(new_interval); } - + for (step_handle_t step = new_interval.first; step != new_interval.second; step = graph->get_next_step(step)) { + thread_node_to_interval[graph->get_id(graph->get_handle_of_step(step))] = merged_interval_idx; + } return !merged; } diff --git a/src/rgfa.hpp b/src/rgfa.hpp index 74311da333f..6204621b991 100644 --- a/src/rgfa.hpp +++ b/src/rgfa.hpp @@ -106,7 +106,8 @@ class RGFACover { // returns true if a new interval was added, false if an existing interval was updated bool add_interval(vector>& thread_rgfa_intervals, unordered_map& thread_node_to_interval, - const pair& new_interval); + const pair& new_interval, + bool global = false); // get the total coverage of a traversal (sum of step lengths) int64_t get_coverage(const vector& trav, const pair& uncovered_interval);