12
12
13
13
#include " catalog/abstract_catalog.h"
14
14
15
- #include " binder/bind_node_visitor.h"
16
-
17
15
#include " common/statement.h"
18
16
17
+ #include " catalog/catalog.h"
19
18
#include " catalog/database_catalog.h"
20
19
#include " catalog/table_catalog.h"
21
20
32
31
#include " executor/delete_executor.h"
33
32
#include " executor/index_scan_executor.h"
34
33
#include " executor/insert_executor.h"
34
+ #include " executor/plan_executor.h"
35
35
#include " executor/seq_scan_executor.h"
36
+ #include " executor/update_executor.h"
36
37
37
38
#include " storage/database.h"
38
39
#include " storage/storage_manager.h"
@@ -45,84 +46,95 @@ AbstractCatalog::AbstractCatalog(oid_t catalog_table_oid,
45
46
std::string catalog_table_name,
46
47
catalog::Schema *catalog_table_schema,
47
48
storage::Database *pg_catalog) {
49
+ // set database_oid
50
+ database_oid = pg_catalog->GetOid ();
48
51
// Create catalog_table_
49
52
catalog_table_ = storage::TableFactory::GetDataTable (
50
- CATALOG_DATABASE_OID, catalog_table_oid, catalog_table_schema,
51
- catalog_table_name, DEFAULT_TUPLES_PER_TILEGROUP, true , false , true );
52
-
53
+ database_oid, catalog_table_oid, catalog_table_schema, catalog_table_name,
54
+ DEFAULT_TUPLES_PER_TILEGROUP, true , false , true );
53
55
// Add catalog_table_ into pg_catalog database
54
56
pg_catalog->AddTable (catalog_table_, true );
55
57
}
56
58
57
59
AbstractCatalog::AbstractCatalog (const std::string &catalog_table_ddl,
58
60
concurrency::TransactionContext *txn) {
59
- // Get catalog table schema
61
+ // get catalog table schema
60
62
auto &peloton_parser = parser::PostgresParser::GetInstance ();
61
-
62
- // Build the parse tree
63
- const auto parse_tree_list = peloton_parser.BuildParseTree (catalog_table_ddl);
64
- if (parse_tree_list->GetStatements ().empty ()) {
65
- throw CatalogException (
66
- " Parse tree list has no parse trees. Cannot build plan" );
67
- }
68
- // TODO: support multi-statement queries
69
- auto parse_tree = parse_tree_list->GetStatement (0 );
70
-
71
- // Run binder
72
- auto bind_node_visitor = binder::BindNodeVisitor (txn, DATABASE_CATALOG_NAME);
73
- bind_node_visitor.BindNameToNode (parse_tree);
74
-
75
- // Create the plan tree
76
63
auto create_plan = std::dynamic_pointer_cast<planner::CreatePlan>(
77
- optimizer::Optimizer ().BuildPelotonPlanTree (parse_tree_list, txn));
64
+ optimizer::Optimizer ().BuildPelotonPlanTree (
65
+ peloton_parser.BuildParseTree (catalog_table_ddl), txn));
78
66
auto catalog_table_schema = create_plan->GetSchema ();
79
67
auto catalog_table_name = create_plan->GetTableName ();
80
-
81
- // Create catalog table
68
+ auto catalog_schema_name = create_plan->GetSchemaName ();
69
+ auto catalog_database_name = create_plan->GetDatabaseName ();
70
+ PELOTON_ASSERT (catalog_schema_name == std::string (CATALOG_SCHEMA_NAME));
71
+ // create catalog table
82
72
Catalog::GetInstance ()->CreateTable (
83
- CATALOG_DATABASE_NAME , catalog_table_name,
73
+ catalog_database_name, catalog_schema_name , catalog_table_name,
84
74
std::unique_ptr<catalog::Schema>(catalog_table_schema), txn, true );
85
75
86
- // Get catalog table oid
76
+ // get catalog table oid
87
77
auto catalog_table_object = Catalog::GetInstance ()->GetTableObject (
88
- CATALOG_DATABASE_NAME , catalog_table_name, txn);
78
+ catalog_database_name, catalog_schema_name , catalog_table_name, txn);
89
79
90
- // Set catalog_table_
80
+ // set catalog_table_
91
81
try {
92
82
catalog_table_ = storage::StorageManager::GetInstance ()->GetTableWithOid (
93
- CATALOG_DATABASE_OID, catalog_table_object->GetTableOid ());
83
+ catalog_table_object->GetDatabaseOid (),
84
+ catalog_table_object->GetTableOid ());
85
+ // set database_oid
86
+ database_oid = catalog_table_object->GetDatabaseOid ();
94
87
} catch (CatalogException &e) {
95
88
LOG_TRACE (" Can't find table %d! Return false" ,
96
89
catalog_table_object->GetTableOid ());
97
90
}
98
91
}
99
92
100
93
/* @brief insert tuple(reord) helper function
101
- * @param tuple tuple to be inserted
102
- * @param txn TransactionContext
103
- * @return Whether insertion is Successful
104
- */
94
+ * @param tuple tuple to be inserted
95
+ * @param txn TransactionContext
96
+ * @return Whether insertion is Successful
97
+ */
105
98
bool AbstractCatalog::InsertTuple (std::unique_ptr<storage::Tuple> tuple,
106
99
concurrency::TransactionContext *txn) {
107
100
if (txn == nullptr )
108
101
throw CatalogException (" Insert tuple requires transaction" );
109
102
110
- std::unique_ptr<executor::ExecutorContext> context (
111
- new executor::ExecutorContext (txn));
112
- planner::InsertPlan node (catalog_table_, std::move (tuple));
113
- executor::InsertExecutor executor (&node, context.get ());
114
- executor.Init ();
115
- bool status = executor.Execute ();
103
+ std::vector<type::Value> params;
104
+ std::vector<std::string> columns;
105
+ std::vector<std::vector<std::unique_ptr<expression::AbstractExpression>>>
106
+ values;
107
+ values.push_back (
108
+ std::vector<std::unique_ptr<expression::AbstractExpression>>());
109
+ std::vector<int > result_format (tuple->GetSchema ()->GetColumnCount (), 0 );
110
+ for (size_t i = 0 ; i < tuple->GetSchema ()->GetColumnCount (); i++) {
111
+ params.push_back (tuple->GetValue (i));
112
+ columns.push_back (tuple->GetSchema ()->GetColumn (i).GetName ());
113
+ values[0 ].emplace_back (
114
+ new expression::ConstantValueExpression (tuple->GetValue (i)));
115
+ }
116
+ auto node =
117
+ std::make_shared<planner::InsertPlan>(catalog_table_, &columns, &values);
116
118
117
- return status;
119
+ executor::ExecutionResult this_p_status;
120
+ auto on_complete = [&this_p_status](
121
+ executor::ExecutionResult p_status,
122
+ std::vector<ResultValue> &&values UNUSED_ATTRIBUTE) {
123
+ this_p_status = p_status;
124
+ };
125
+
126
+ executor::PlanExecutor::ExecutePlan (node, txn, params, result_format,
127
+ on_complete);
128
+
129
+ return this_p_status.m_result == peloton::ResultType::SUCCESS;
118
130
}
119
131
120
132
/* @brief Delete a tuple using index scan
121
- * @param index_offset Offset of index for scan
122
- * @param values Values for search
123
- * @param txn TransactionContext
124
- * @return Whether deletion is Successful
125
- */
133
+ * @param index_offset Offset of index for scan
134
+ * @param values Values for search
135
+ * @param txn TransactionContext
136
+ * @return Whether deletion is Successful
137
+ */
126
138
bool AbstractCatalog::DeleteWithIndexScan (
127
139
oid_t index_offset, std::vector<type::Value> values,
128
140
concurrency::TransactionContext *txn) {
@@ -167,12 +179,12 @@ bool AbstractCatalog::DeleteWithIndexScan(
167
179
}
168
180
169
181
/* @brief Index scan helper function
170
- * @param column_offsets Column ids for search (projection)
171
- * @param index_offset Offset of index for scan
172
- * @param values Values for search
173
- * @param txn TransactionContext
174
- * @return Unique pointer of vector of logical tiles
175
- */
182
+ * @param column_offsets Column ids for search (projection)
183
+ * @param index_offset Offset of index for scan
184
+ * @param values Values for search
185
+ * @param txn TransactionContext
186
+ * @return Unique pointer of vector of logical tiles
187
+ */
176
188
std::unique_ptr<std::vector<std::unique_ptr<executor::LogicalTile>>>
177
189
AbstractCatalog::GetResultWithIndexScan (
178
190
std::vector<oid_t > column_offsets, oid_t index_offset,
@@ -215,14 +227,14 @@ AbstractCatalog::GetResultWithIndexScan(
215
227
}
216
228
217
229
/* @brief Sequential scan helper function
218
- * NOTE: try to use efficient index scan instead of sequential scan, but you
219
- * shouldn't build too many indexes on one catalog table
220
- * @param column_offsets Column ids for search (projection)
221
- * @param predicate predicate for this sequential scan query
222
- * @param txn TransactionContext
223
- *
224
- * @return Unique pointer of vector of logical tiles
225
- */
230
+ * NOTE: try to use efficient index scan instead of sequential scan, but you
231
+ * shouldn't build too many indexes on one catalog table
232
+ * @param column_offsets Column ids for search (projection)
233
+ * @param predicate predicate for this sequential scan query
234
+ * @param txn TransactionContext
235
+ *
236
+ * @return Unique pointer of vector of logical tiles
237
+ */
226
238
std::unique_ptr<std::vector<std::unique_ptr<executor::LogicalTile>>>
227
239
AbstractCatalog::GetResultWithSeqScan (std::vector<oid_t > column_offsets,
228
240
expression::AbstractExpression *predicate,
@@ -250,14 +262,14 @@ AbstractCatalog::GetResultWithSeqScan(std::vector<oid_t> column_offsets,
250
262
}
251
263
252
264
/* @brief Add index on catalog table
253
- * @param key_attrs indexed column offset(position)
254
- * @param index_oid index id(global unique)
255
- * @param index_name index name(global unique)
256
- * @param index_constraint index constraints
257
- * @return Unique pointer of vector of logical tiles
258
- * Note: Use catalog::Catalog::CreateIndex() if you can, only ColumnCatalog and
259
- * IndexCatalog should need this
260
- */
265
+ * @param key_attrs indexed column offset(position)
266
+ * @param index_oid index id(global unique)
267
+ * @param index_name index name(global unique)
268
+ * @param index_constraint index constraints
269
+ * @return Unique pointer of vector of logical tiles
270
+ * Note: Use catalog::Catalog::CreateIndex() if you can, only ColumnCatalog and
271
+ * IndexCatalog should need this
272
+ */
261
273
void AbstractCatalog::AddIndex (const std::vector<oid_t > &key_attrs,
262
274
oid_t index_oid, const std::string &index_name,
263
275
IndexConstraintType index_constraint) {
@@ -286,5 +298,74 @@ void AbstractCatalog::AddIndex(const std::vector<oid_t> &key_attrs,
286
298
index_name.c_str (), (int )catalog_table_->GetOid ());
287
299
}
288
300
301
+ /* @brief Update specific columns using index scan
302
+ * @param update_columns Columns to be updated
303
+ * @param update_values Values to be updated
304
+ * @param scan_values Value to be scaned (used in index scan)
305
+ * @param index_offset Offset of index for scan
306
+ * @return true if successfully executes
307
+ */
308
+ bool AbstractCatalog::UpdateWithIndexScan (
309
+ std::vector<oid_t > update_columns, std::vector<type::Value> update_values,
310
+ std::vector<type::Value> scan_values, oid_t index_offset,
311
+ concurrency::TransactionContext *txn) {
312
+ if (txn == nullptr ) throw CatalogException (" Scan table requires transaction" );
313
+
314
+ std::unique_ptr<executor::ExecutorContext> context (
315
+ new executor::ExecutorContext (txn));
316
+ // Construct index scan executor
317
+ auto index = catalog_table_->GetIndex (index_offset);
318
+ std::vector<oid_t > key_column_offsets =
319
+ index->GetMetadata ()->GetKeySchema ()->GetIndexedColumns ();
320
+
321
+ // NOTE: For indexed scan on catalog tables, we expect it not to be "partial
322
+ // indexed scan"(efficiency purpose).That being said, indexed column number
323
+ // must be equal to passed in "scan_values" size
324
+ PELOTON_ASSERT (scan_values.size () == key_column_offsets.size ());
325
+ std::vector<ExpressionType> expr_types (scan_values.size (),
326
+ ExpressionType::COMPARE_EQUAL);
327
+ std::vector<expression::AbstractExpression *> runtime_keys;
328
+
329
+ planner::IndexScanPlan::IndexScanDesc index_scan_desc (
330
+ index->GetOid (), key_column_offsets, expr_types, scan_values,
331
+ runtime_keys);
332
+
333
+ planner::IndexScanPlan index_scan_node (catalog_table_, nullptr ,
334
+ update_columns, index_scan_desc);
335
+
336
+ executor::IndexScanExecutor index_scan_executor (&index_scan_node,
337
+ context.get ());
338
+ // Construct update executor
339
+ TargetList target_list;
340
+ DirectMapList direct_map_list;
341
+
342
+ size_t column_count = catalog_table_->GetSchema ()->GetColumnCount ();
343
+ for (size_t col_itr = 0 ; col_itr < column_count; col_itr++) {
344
+ // Skip any column for update
345
+ if (std::find (std::begin (update_columns), std::end (update_columns),
346
+ col_itr) == std::end (update_columns)) {
347
+ direct_map_list.emplace_back (col_itr, std::make_pair (0 , col_itr));
348
+ }
349
+ }
350
+
351
+ PELOTON_ASSERT (update_columns.size () == update_values.size ());
352
+ for (size_t i = 0 ; i < update_values.size (); i++) {
353
+ planner::DerivedAttribute update_attribute{
354
+ new expression::ConstantValueExpression (update_values[i])};
355
+ target_list.emplace_back (update_columns[i], update_attribute);
356
+ }
357
+
358
+ std::unique_ptr<const planner::ProjectInfo> project_info (
359
+ new planner::ProjectInfo (std::move (target_list),
360
+ std::move (direct_map_list)));
361
+ planner::UpdatePlan update_node (catalog_table_, std::move (project_info));
362
+
363
+ executor::UpdateExecutor update_executor (&update_node, context.get ());
364
+ update_executor.AddChild (&index_scan_executor);
365
+ // Execute
366
+ update_executor.Init ();
367
+ return update_executor.Execute ();
368
+ }
369
+
289
370
} // namespace catalog
290
371
} // namespace peloton
0 commit comments