1313#include " caliper/cali.h"
1414#include " caliper/cali-manager.h"
1515
16- #include " caliper/reader/Aggregator.h"
1716#include " caliper/reader/CaliReader.h"
1817#include " caliper/reader/CaliperMetadataDB.h"
1918#include " caliper/reader/FormatProcessor.h"
20- #include " caliper/reader/Preprocessor.h"
21- #include " caliper/reader/RecordProcessor.h"
22- #include " caliper/reader/RecordSelector.h"
19+ #include " caliper/reader/QueryProcessor.h"
2320
2421#include " caliper/common/Node.h"
2522#include " caliper/common/OutputStream.h"
@@ -109,42 +106,8 @@ const Args::Table option_table[] = {
109106 Args::Terminator
110107};
111108
112- // / A node record filter that filters redundant identical node records.
113- // / Redundant node records can occur when merging/unifying two streams.
114- class FilterDuplicateNodes
115- {
116- cali_id_t m_max_node;
117-
118- public:
119-
120- FilterDuplicateNodes () : m_max_node { 0 } {}
121-
122- void operator () (CaliperMetadataAccessInterface& db, const Node* node, NodeProcessFn push)
123- {
124- cali_id_t id = node->id ();
125-
126- if (id != CALI_INV_ID) {
127- if (id < m_max_node) {
128- return ;
129- } else
130- m_max_node = id;
131- }
132-
133- push (db, node);
134- }
135- };
136-
137- // / NodeFilterStep helper struct
138- // / Basically the chain link in the processing chain.
139- // / Passes result of @param m_filter_fn to @param m_push_fn
140- struct NodeFilterStep {
141- NodeFilterFn m_filter_fn; // /< This processing step
142- NodeProcessFn m_push_fn; // /< Next processing step
143-
144- NodeFilterStep (NodeFilterFn filter_fn, NodeProcessFn push_fn) : m_filter_fn { filter_fn }, m_push_fn { push_fn } {}
145-
146- void operator () (CaliperMetadataAccessInterface& db, const Node* node) { m_filter_fn (db, node, m_push_fn); }
147- };
109+ void node_proc_noop (CaliperMetadataAccessInterface&,const Node*) {}
110+ void snap_proc_noop (CaliperMetadataAccessInterface&,const EntryList&) {}
148111
149112} // namespace
150113
@@ -238,41 +201,10 @@ int main(int argc, const char* argv[])
238201 return -2 ;
239202 }
240203
241- QuerySpec spec = query_parser.spec ();
242-
243204 // setup format spec
244205
245- FormatProcessor format (spec, stream);
246-
247- NodeProcessFn node_proc = [](CaliperMetadataAccessInterface&, const Node*) {
248- return ;
249- };
250- SnapshotProcessFn snap_proc = [](CaliperMetadataAccessInterface&, const EntryList&) {
251- return ;
252- };
253-
254- Aggregator aggregate (spec);
255-
256- if (!args.is_set (" list-globals" )) {
257- if (spec.aggregate .selection == QuerySpec::AggregationSelection::None)
258- snap_proc = format;
259- else
260- snap_proc = aggregate;
261-
262- if (spec.filter .selection == QuerySpec::FilterSelection::List)
263- snap_proc = SnapshotFilterStep (RecordSelector (spec), snap_proc);
264- if (!spec.preprocess_ops .empty ())
265- snap_proc = SnapshotFilterStep (Preprocessor (spec), snap_proc);
266-
267- if (args.is_set (" list-attributes" )) {
268- node_proc = AttributeExtract (snap_proc);
269- snap_proc = [](CaliperMetadataAccessInterface&, const EntryList&) {
270- return ;
271- };
272- }
273- }
274-
275- node_proc = ::NodeFilterStep (::FilterDuplicateNodes (), node_proc);
206+ QuerySpec spec = query_parser.spec ();
207+ QueryProcessor processor (spec, stream);
276208
277209 std::vector<std::string> files = args.arguments ();
278210
@@ -299,7 +231,12 @@ int main(int argc, const char* argv[])
299231 std::cerr << " cali-query: Reading " << file << std::endl;
300232
301233 CaliReader reader;
302- reader.read (file, metadb, node_proc, snap_proc);
234+ if (args.is_set (" list-attributes" ))
235+ reader.read (file, metadb, AttributeExtract (processor), snap_proc_noop);
236+ else if (args.is_set (" list-globals" ))
237+ reader.read (file, metadb, node_proc_noop, snap_proc_noop);
238+ else
239+ reader.read (file, metadb, node_proc_noop, processor);
303240
304241 if (reader.error ())
305242 std::cerr << " cali-query: Error reading " << file << " : " << reader.error_msg () << std::endl;
@@ -331,8 +268,7 @@ int main(int argc, const char* argv[])
331268 global_format.process_record (metadb, metadb.get_globals ());
332269 global_format.flush (metadb);
333270 } else {
334- aggregate.flush (metadb, format);
335- format.flush (metadb);
271+ processor.flush (metadb);
336272 }
337273
338274 CALI_MARK_END (" Writing" );
0 commit comments