Skip to content

Commit bfd4fb2

Browse files
MyroTkarthurpassos
authored andcommitted
Merge pull request #1320 from Altinity/accept_table_function_as_destination_for_part_export
Accept table function as destination for part export
1 parent 55d931d commit bfd4fb2

14 files changed

+229
-43
lines changed

docs/en/engines/table-engines/mergetree-family/part_export.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@ SETTINGS allow_experimental_export_merge_tree_part = 1
2121
[, setting_name = value, ...]
2222
```
2323

24+
## Syntax with table function
25+
26+
```sql
27+
ALTER TABLE [database.]table_name
28+
EXPORT PART 'part_name'
29+
TO TABLE FUNCTION s3(s3_conn, filename='table_function', partition_strategy...)
30+
SETTINGS allow_experimental_export_merge_tree_part = 1
31+
[, setting_name = value, ...]
32+
```
33+
2434
### Parameters
2535

2636
- **`table_name`**: The source MergeTree table containing the part to export
@@ -34,6 +44,8 @@ Source and destination tables must be 100% compatible:
3444
1. **Identical schemas** - same columns, types, and order
3545
2. **Matching partition keys** - partition expressions must be identical
3646

47+
In case a table function is used as the destination, the schema can be omitted and it will be inferred from the source table.
48+
3749
## Settings
3850

3951
### `allow_experimental_export_merge_tree_part` (Required)
@@ -95,6 +107,20 @@ ALTER TABLE mt_table EXPORT PART '2021_2_2_0' TO TABLE s3_table
95107
SETTINGS allow_experimental_export_merge_tree_part = 1;
96108
```
97109

110+
### Table function export
111+
112+
```sql
113+
-- Create source and destination tables
114+
CREATE TABLE mt_table (id UInt64, year UInt16)
115+
ENGINE = MergeTree() PARTITION BY year ORDER BY tuple();
116+
117+
-- Insert and export
118+
INSERT INTO mt_table VALUES (1, 2020), (2, 2020), (3, 2021);
119+
120+
ALTER TABLE mt_table EXPORT PART '2020_1_1_0' TO TABLE FUNCTION s3(s3_conn, filename='table_function', format=Parquet, partition_strategy='hive') PARTITION BY year
121+
SETTINGS allow_experimental_export_merge_tree_part = 1;
122+
```
123+
98124
## Monitoring
99125

100126
### Active Exports

src/Interpreters/InterpreterAlterQuery.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,9 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
576576
case ASTAlterCommand::EXPORT_PART:
577577
{
578578
required_access.emplace_back(AccessType::ALTER_EXPORT_PART, database, table);
579-
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
579+
/// For table functions, access control is handled by the table function itself
580+
if (!command.to_table_function)
581+
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
580582
break;
581583
}
582584
case ASTAlterCommand::EXPORT_PARTITION:

src/Parsers/ASTAlterQuery.cpp

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ ASTPtr ASTAlterCommand::clone() const
6565
res->sql_security = res->children.emplace_back(sql_security->clone()).get();
6666
if (rename_to)
6767
res->rename_to = res->children.emplace_back(rename_to->clone()).get();
68+
if (to_table_function)
69+
res->to_table_function = res->children.emplace_back(to_table_function->clone()).get();
70+
if (partition_by_expr)
71+
res->partition_by_expr = res->children.emplace_back(partition_by_expr->clone()).get();
6872

6973
return res;
7074
}
@@ -377,11 +381,23 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett
377381
{
378382
case DataDestinationType::TABLE:
379383
ostr << "TABLE ";
380-
if (!to_database.empty())
384+
if (to_table_function)
385+
{
386+
ostr << "FUNCTION ";
387+
to_table_function->format(ostr, settings, state, frame);
388+
if (partition_by_expr)
389+
{
390+
ostr << " PARTITION BY ";
391+
partition_by_expr->format(ostr, settings, state, frame);
392+
}
393+
}
394+
else
381395
{
382-
ostr << backQuoteIfNeed(to_database) << ".";
396+
if (!to_database.empty())
397+
ostr << backQuoteIfNeed(to_database) << ".";
398+
399+
ostr << backQuoteIfNeed(to_table);
383400
}
384-
ostr << backQuoteIfNeed(to_table);
385401
return;
386402
default:
387403
break;
@@ -603,6 +619,8 @@ void ASTAlterCommand::forEachPointerToChild(std::function<void(IAST **, boost::i
603619
f(&select, nullptr);
604620
f(&sql_security, nullptr);
605621
f(&rename_to, nullptr);
622+
f(&to_table_function, nullptr);
623+
f(&partition_by_expr, nullptr);
606624
}
607625

608626

src/Parsers/ASTAlterQuery.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,9 @@ class ASTAlterCommand : public IAST
221221
/// MOVE PARTITION partition TO TABLE db.table
222222
String to_database;
223223
String to_table;
224+
/// EXPORT PART/PARTITION to TABLE FUNCTION (e.g., s3())
225+
IAST * to_table_function = nullptr;
226+
IAST * partition_by_expr = nullptr;
224227

225228
String snapshot_name;
226229
IAST * snapshot_desc;

src/Parsers/ParserAlterQuery.cpp

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
9595
ParserKeyword s_unfreeze(Keyword::UNFREEZE);
9696
ParserKeyword s_unlock_snapshot(Keyword::UNLOCK_SNAPSHOT);
9797
ParserKeyword s_partition(Keyword::PARTITION);
98+
ParserKeyword s_partition_by(Keyword::PARTITION_BY);
9899

99100
ParserKeyword s_first(Keyword::FIRST);
100101
ParserKeyword s_after(Keyword::AFTER);
@@ -109,6 +110,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
109110
ParserKeyword s_to_volume(Keyword::TO_VOLUME);
110111
ParserKeyword s_to_table(Keyword::TO_TABLE);
111112
ParserKeyword s_to_shard(Keyword::TO_SHARD);
113+
ParserKeyword s_function(Keyword::FUNCTION);
112114

113115
ParserKeyword s_delete(Keyword::DELETE);
114116
ParserKeyword s_update(Keyword::UPDATE);
@@ -179,6 +181,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
179181
ASTPtr command_rename_to;
180182
ASTPtr command_sql_security;
181183
ASTPtr command_snapshot_desc;
184+
ASTPtr export_table_function;
185+
ASTPtr export_table_function_partition_by_expr;
182186

183187
if (with_round_bracket)
184188
{
@@ -556,9 +560,27 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
556560
return false;
557561
}
558562

559-
if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
560-
return false;
561-
command->move_destination_type = DataDestinationType::TABLE;
563+
if (s_function.ignore(pos, expected))
564+
{
565+
ParserFunction table_function_parser(/*allow_function_parameters=*/true, /*is_table_function=*/true);
566+
567+
if (!table_function_parser.parse(pos, export_table_function, expected))
568+
return false;
569+
570+
if (s_partition_by.ignore(pos, expected))
571+
if (!parser_exp_elem.parse(pos, export_table_function_partition_by_expr, expected))
572+
return false;
573+
574+
command->to_table_function = export_table_function.get();
575+
command->partition_by_expr = export_table_function_partition_by_expr.get();
576+
command->move_destination_type = DataDestinationType::TABLE;
577+
}
578+
else
579+
{
580+
if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
581+
return false;
582+
command->move_destination_type = DataDestinationType::TABLE;
583+
}
562584
}
563585
else if (s_export_partition.ignore(pos, expected))
564586
{
@@ -1121,6 +1143,10 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
11211143
command->rename_to = command->children.emplace_back(std::move(command_rename_to)).get();
11221144
if (command_snapshot_desc)
11231145
command->snapshot_desc = command->children.emplace_back(std::move(command_snapshot_desc)).get();
1146+
if (export_table_function)
1147+
command->to_table_function = command->children.emplace_back(std::move(export_table_function)).get();
1148+
if (export_table_function_partition_by_expr)
1149+
command->partition_by_expr = command->children.emplace_back(std::move(export_table_function_partition_by_expr)).get();
11241150

11251151
return true;
11261152
}

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -111,19 +111,12 @@ bool ExportPartTask::executeStep()
111111
block_with_partition_values = manifest.data_part->minmax_idx->getBlock(storage);
112112
}
113113

114-
auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest.destination_storage_id, local_context);
115-
if (!destination_storage)
116-
{
117-
std::lock_guard inner_lock(storage.export_manifests_mutex);
118-
119-
const auto destination_storage_id_name = manifest.destination_storage_id.getNameForLogs();
120-
storage.export_manifests.erase(manifest);
121-
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name);
122-
}
114+
const auto & destination_storage = manifest.destination_storage_ptr;
115+
const auto destination_storage_id = destination_storage->getStorageID();
123116

124117
auto exports_list_entry = storage.getContext()->getExportsList().insert(
125118
getStorageID(),
126-
manifest.destination_storage_id,
119+
destination_storage_id,
127120
manifest.data_part->getBytesOnDisk(),
128121
manifest.data_part->name,
129122
std::vector<std::string>{},

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <Storages/MergeTree/ExportList.h>
66
#include <Access/AccessControl.h>
7+
#include <TableFunctions/TableFunctionFactory.h>
78
#include <AggregateFunctions/AggregateFunctionCount.h>
89
#include <Analyzer/QueryTreeBuilder.h>
910
#include <Analyzer/Utils.h>
@@ -6444,9 +6445,45 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP
64446445

64456446
const auto part_name = command.partition->as<ASTLiteral &>().value.safeGet<String>();
64466447

6447-
const auto database_name = query_context->resolveDatabase(command.to_database);
6448+
if (!command.to_table_function)
6449+
{
6450+
const auto database_name = query_context->resolveDatabase(command.to_database);
6451+
exportPartToTable(part_name, StorageID{database_name, command.to_table}, generateSnowflakeIDString(), query_context);
6452+
6453+
return;
6454+
}
6455+
6456+
auto table_function_ast = command.to_table_function;
6457+
auto table_function_ptr = TableFunctionFactory::instance().get(command.to_table_function, query_context);
6458+
6459+
if (table_function_ptr->needStructureHint())
6460+
{
6461+
const auto source_metadata_ptr = getInMemoryMetadataPtr();
6462+
6463+
/// Grab only the readable columns from the source metadata to skip ephemeral columns
6464+
const auto readable_columns = ColumnsDescription(source_metadata_ptr->getColumns().getReadable());
6465+
table_function_ptr->setStructureHint(readable_columns);
6466+
}
6467+
6468+
if (command.partition_by_expr)
6469+
{
6470+
table_function_ptr->setPartitionBy(command.partition_by_expr);
6471+
}
6472+
6473+
auto dest_storage = table_function_ptr->execute(
6474+
table_function_ast,
6475+
query_context,
6476+
table_function_ptr->getName(),
6477+
/* cached_columns */ {},
6478+
/* use_global_context */ false,
6479+
/* is_insert_query */ true);
6480+
6481+
if (!dest_storage)
6482+
{
6483+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failed to reconstruct destination storage");
6484+
}
64486485

6449-
exportPartToTable(part_name, StorageID{database_name, command.to_table}, generateSnowflakeIDString(), query_context);
6486+
exportPartToTable(part_name, dest_storage, generateSnowflakeIDString(), query_context);
64506487
}
64516488

64526489
void MergeTreeData::exportPartToTable(
@@ -6464,6 +6501,17 @@ void MergeTreeData::exportPartToTable(
64646501
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Exporting to the same table is not allowed");
64656502
}
64666503

6504+
exportPartToTable(part_name, dest_storage, transaction_id, query_context, allow_outdated_parts, completion_callback);
6505+
}
6506+
6507+
void MergeTreeData::exportPartToTable(
6508+
const std::string & part_name,
6509+
const StoragePtr & dest_storage,
6510+
const String & transaction_id,
6511+
ContextPtr query_context,
6512+
bool allow_outdated_parts,
6513+
std::function<void(MergeTreePartExportManifest::CompletionCallbackResult)> completion_callback)
6514+
{
64676515
if (!dest_storage->supportsImport())
64686516
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Destination storage {} does not support MergeTree parts or uses unsupported partitioning", dest_storage->getName());
64696517

@@ -6533,7 +6581,7 @@ void MergeTreeData::exportPartToTable(
65336581
{
65346582
const auto format_settings = getFormatSettings(query_context);
65356583
MergeTreePartExportManifest manifest(
6536-
dest_storage->getStorageID(),
6584+
dest_storage,
65376585
part,
65386586
transaction_id,
65396587
query_context->getCurrentQueryId(),
@@ -6546,8 +6594,7 @@ void MergeTreeData::exportPartToTable(
65466594

65476595
if (!export_manifests.emplace(std::move(manifest)).second)
65486596
{
6549-
throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported to table '{}'",
6550-
part_name, dest_storage->getStorageID().getFullTableName());
6597+
throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported", part_name);
65516598
}
65526599
}
65536600

@@ -9038,8 +9085,9 @@ std::vector<MergeTreeExportStatus> MergeTreeData::getExportsStatus() const
90389085

90399086
status.source_database = source_database;
90409087
status.source_table = source_table;
9041-
status.destination_database = manifest.destination_storage_id.database_name;
9042-
status.destination_table = manifest.destination_storage_id.table_name;
9088+
const auto destination_storage_id = manifest.destination_storage_ptr->getStorageID();
9089+
status.destination_database = destination_storage_id.database_name;
9090+
status.destination_table = destination_storage_id.table_name;
90439091
status.create_time = manifest.create_time;
90449092
status.part_name = manifest.data_part->name;
90459093

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,6 +1025,14 @@ class MergeTreeData : public IStorage, public WithMutableContext
10251025

10261026
void exportPartToTable(const PartitionCommand & command, ContextPtr query_context);
10271027

1028+
void exportPartToTable(
1029+
const std::string & part_name,
1030+
const StoragePtr & destination_storage,
1031+
const String & transaction_id,
1032+
ContextPtr query_context,
1033+
bool allow_outdated_parts = false,
1034+
std::function<void(MergeTreePartExportManifest::CompletionCallbackResult)> completion_callback = {});
1035+
10281036
void exportPartToTable(
10291037
const std::string & part_name,
10301038
const StorageID & destination_storage_id,

src/Storages/MergeTree/MergeTreePartExportManifest.h

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <QueryPipeline/QueryPipeline.h>
77
#include <optional>
88
#include <Core/Settings.h>
9+
#include <Storages/IStorage.h>
910

1011
namespace DB
1112
{
@@ -43,15 +44,15 @@ struct MergeTreePartExportManifest
4344
};
4445

4546
MergeTreePartExportManifest(
46-
const StorageID & destination_storage_id_,
47+
const StoragePtr destination_storage_ptr_,
4748
const DataPartPtr & data_part_,
4849
const String & transaction_id_,
4950
const String & query_id_,
5051
FileAlreadyExistsPolicy file_already_exists_policy_,
5152
const Settings & settings_,
5253
const StorageMetadataPtr & metadata_snapshot_,
5354
std::function<void(CompletionCallbackResult)> completion_callback_ = {})
54-
: destination_storage_id(destination_storage_id_),
55+
: destination_storage_ptr(destination_storage_ptr_),
5556
data_part(data_part_),
5657
transaction_id(transaction_id_),
5758
query_id(query_id_),
@@ -61,7 +62,7 @@ struct MergeTreePartExportManifest
6162
completion_callback(completion_callback_),
6263
create_time(time(nullptr)) {}
6364

64-
StorageID destination_storage_id;
65+
StoragePtr destination_storage_ptr;
6566
DataPartPtr data_part;
6667
/// Used for killing the export.
6768
String transaction_id;
@@ -81,20 +82,12 @@ struct MergeTreePartExportManifest
8182

8283
bool operator<(const MergeTreePartExportManifest & rhs) const
8384
{
84-
// Lexicographic comparison: first compare destination storage, then part name
85-
auto lhs_storage = destination_storage_id.getQualifiedName();
86-
auto rhs_storage = rhs.destination_storage_id.getQualifiedName();
87-
88-
if (lhs_storage != rhs_storage)
89-
return lhs_storage < rhs_storage;
90-
9185
return data_part->name < rhs.data_part->name;
9286
}
9387

9488
bool operator==(const MergeTreePartExportManifest & rhs) const
9589
{
96-
return destination_storage_id.getQualifiedName() == rhs.destination_storage_id.getQualifiedName()
97-
&& data_part->name == rhs.data_part->name;
90+
return data_part->name == rhs.data_part->name;
9891
}
9992
};
10093

0 commit comments

Comments
 (0)