Skip to content

Commit 75f1d36

Browse files
committed
[Improve][Connector-V2] Add branch field option for paimon sink
[Improve][Connector-V2] Add branch field option for paimon sink fix failures tests [Improve][Connector-V2] Add branch option for paimon sink
1 parent 3473e2d commit 75f1d36

File tree

11 files changed

+252
-6
lines changed

11 files changed

+252
-6
lines changed

docs/en/connector-v2/sink/Paimon.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ libfb303-xxx.jar
7878
| paimon.hadoop.conf | Map | No | - | Properties in hadoop conf |
7979
| paimon.hadoop.conf-path | String | No | - | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files |
8080
| paimon.table.non-primary-key | Boolean | false | - | Switch to create `table with PK` or `table without PK`. true : `table without PK`, false : `table with PK` |
81+
| branch | String | No | main | The branch name of Paimon table to write data to. If the branch does not exist, an exception will be thrown. |
8182

8283

8384
## Checkpoint in batch mode

docs/zh/connector-v2/sink/Paimon.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ libfb303-xxx.jar
7777
| paimon.hadoop.conf | Map | 否 | - | Hadoop配置文件属性信息 |
7878
| paimon.hadoop.conf-path | 字符串 | 否 | - | Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置 |
7979
| paimon.table.non-primary-key | Boolean | false | - | 控制创建主键表或者非主键表. 当为true时,创建非主键表, 为false时,创建主键表 |
80+
| branch | 字符串 | 否 | main | 要写入数据的Paimon表分支名称。如果指定的分支不存在,将抛出异常。 |
8081

8182
## 批模式下的checkpoint
8283

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ public OptionRule optionRule() {
5555
PaimonSinkOptions.DATA_SAVE_MODE,
5656
PaimonSinkOptions.PRIMARY_KEYS,
5757
PaimonSinkOptions.PARTITION_KEYS,
58-
PaimonSinkOptions.WRITE_PROPS)
58+
PaimonSinkOptions.WRITE_PROPS,
59+
PaimonSinkOptions.BRANCH)
5960
.conditional(
6061
PaimonBaseOptions.CATALOG_TYPE,
6162
PaimonCatalogEnum.HIVE,

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class PaimonSinkConfig extends PaimonConfig {
4040
private final DataSaveMode dataSaveMode;
4141
private final CoreOptions.ChangelogProducer changelogProducer;
4242
private final String changelogTmpPath;
43+
private final String branch;
4344
private final Boolean nonPrimaryKey;
4445
private final List<String> primaryKeys;
4546
private final List<String> partitionKeys;
@@ -79,5 +80,6 @@ public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
7980
this.changelogTmpPath =
8081
writeProps.getOrDefault(
8182
PaimonSinkOptions.CHANGELOG_TMP_PATH, System.getProperty("java.io.tmpdir"));
83+
this.branch = readonlyConfig.get(PaimonSinkOptions.BRANCH);
8284
}
8385
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,7 @@ public class PaimonSinkOptions extends PaimonBaseOptions {
6868
.defaultValue(new HashMap<>())
6969
.withDescription(
7070
"Properties passed through to paimon table initialization, such as 'file.format', 'bucket'(org.apache.paimon.CoreOptions)");
71+
72+
/**
 * Name of the Paimon table branch the sink writes to.
 *
 * <p>Defaults to {@code "main"}.
 */
public static final Option<String> BRANCH =
        Options.key("branch")
                .stringType()
                .defaultValue("main")
                .withDescription(
                        "The branch name of Paimon table to write data to. If the branch does not exist, an exception will be thrown.");
7174
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,19 @@
4343
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
4444
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
4545

46+
import org.apache.paimon.table.FileStoreTable;
4647
import org.apache.paimon.table.Table;
48+
import org.apache.paimon.utils.BranchManager;
49+
50+
import lombok.extern.slf4j.Slf4j;
4751

4852
import java.io.IOException;
4953
import java.util.Arrays;
5054
import java.util.List;
5155
import java.util.Optional;
5256
import java.util.UUID;
5357

58+
@Slf4j
5459
public class PaimonSink
5560
implements SeaTunnelSink<
5661
SeaTunnelRow,
@@ -66,7 +71,7 @@ public class PaimonSink
6671

6772
public static final String PLUGIN_NAME = "Paimon";
6873

69-
private Table paimonTable;
74+
private FileStoreTable paimonTable;
7075

7176
private JobContext jobContext;
7277

@@ -96,7 +101,17 @@ public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
96101
TablePath tablePath = catalogTable.getTablePath();
97102
boolean tableExists = paimonCatalog.tableExists(tablePath);
98103
if (tableExists) {
99-
this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
104+
String branchName = paimonSinkConfig.getBranch();
105+
this.paimonTable = (FileStoreTable) paimonCatalog.getPaimonTable(tablePath);
106+
BranchManager branchManager = paimonTable.branchManager();
107+
if (!branchManager.branchExists(branchName)) {
108+
throw new UnsupportedOperationException(
109+
"Branch: " + branchName + " not exists");
110+
}
111+
if (!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
112+
this.paimonTable = paimonTable.switchToBranch(branchName);
113+
log.info("Switch to branch {}", branchName);
114+
}
100115
}
101116
}
102117
}
@@ -173,7 +188,7 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
173188

174189
@Override
175190
public void setLoadTable(Table table) {
176-
this.paimonTable = table;
191+
this.paimonTable = (FileStoreTable) table;
177192
}
178193

179194
@Override

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public OptionRule optionRule() {
5959
PaimonSinkOptions.PRIMARY_KEYS,
6060
PaimonSinkOptions.PARTITION_KEYS,
6161
PaimonSinkOptions.WRITE_PROPS,
62+
PaimonSinkOptions.BRANCH,
6263
SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
6364
.conditional(
6465
PaimonSinkOptions.CATALOG_TYPE,

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.paimon.table.sink.StreamTableWrite;
5959
import org.apache.paimon.table.sink.TableCommitImpl;
6060
import org.apache.paimon.table.sink.TableWrite;
61+
import org.apache.paimon.utils.BranchManager;
6162

6263
import lombok.extern.slf4j.Slf4j;
6364

@@ -274,6 +275,12 @@ public void applySchemaChange(SchemaChangeEvent event) throws IOException {
274275
private void reOpenTableWrite() {
275276
this.seaTunnelRowType = this.sourceTableSchema.toPhysicalRowDataType();
276277
this.paimonTable = (FileStoreTable) paimonCatalog.getPaimonTable(paimonTablePath);
278+
String branchName = paimonSinkConfig.getBranch();
279+
BranchManager branchManager = paimonTable.branchManager();
280+
if (!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
281+
this.paimonTable = this.paimonTable.switchToBranch(branchName);
282+
log.info("Re-switched to branch {} after reopening table", branchName);
283+
}
277284
this.sinkPaimonTableSchema = this.paimonTable.schema();
278285
this.newTableWrite();
279286
}

seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonPrivilegeCatalogTest.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
public class PaimonPrivilegeCatalogTest {
106106

107107
private PaimonCatalog authorizedCatalog;
108+
private PaimonCatalog authorizedSelectCatalog;
108109
private PaimonCatalog unAuthorizedCatalog;
109110
private PaimonCatalog rootUserPaimonCatalog;
110111
private String CATALOG_NAME = "paimon_catalog";
@@ -117,9 +118,11 @@ public class PaimonPrivilegeCatalogTest {
117118
private String rootPassword = "123456";
118119
private String bucketKey = "f0";
119120
private String authorizeUser = "paimon";
121+
private String authorizeSelectUser = "authorize_select_user";
120122
private String authorizeUserPassword = "123456";
121123
private String unAuthorizeUser = "unauthorized_paimon";
122124
private String unAuthorizeUserPassword = "123456";
125+
private String authorizeSelectUserPassword = "123456";
123126

124127
private int writeRows = 0;
125128

@@ -129,6 +132,8 @@ public void before() {
129132
initPrivilege();
130133
rootUserPaimonCatalog = createPaimonCatalog(rootUser, rootPassword);
131134
authorizedCatalog = createPaimonCatalog(authorizeUser, authorizeUserPassword);
135+
authorizedSelectCatalog =
136+
createPaimonCatalog(authorizeSelectUser, authorizeSelectUserPassword);
132137
unAuthorizedCatalog = createPaimonCatalog(unAuthorizeUser, unAuthorizeUserPassword);
133138

134139
createUser(authorizeUser, authorizeUserPassword);
@@ -141,6 +146,12 @@ public void before() {
141146
PrivilegeType.INSERT
142147
});
143148
createUser(unAuthorizeUser, unAuthorizeUserPassword);
149+
createUser(authorizeSelectUser, authorizeSelectUserPassword);
150+
grantPrivilege(
151+
authorizeSelectUser,
152+
new PrivilegeType[] {
153+
PrivilegeType.SELECT,
154+
});
144155

145156
createDatabase();
146157
catalogTable = buildTable(TABLE_NAME);
@@ -348,14 +359,14 @@ public void testWriteTable() throws IOException {
348359
NoPrivilegeException.class,
349360
() -> {
350361
try {
351-
writeTable(unAuthorizedCatalog, rows);
362+
writeTable(authorizedSelectCatalog, rows);
352363
} catch (NoPrivilegeException e) {
353364
assertTrue(
354365
e.getMessage()
355366
.contains(
356367
String.format(
357368
"User %s doesn't have privilege INSERT on table",
358-
unAuthorizeUser)));
369+
authorizeSelectUser)));
359370
throw e;
360371
}
361372
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.e2e.connector.paimon;
19+
20+
import org.apache.seatunnel.e2e.common.TestResource;
21+
import org.apache.seatunnel.e2e.common.TestSuiteBase;
22+
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
23+
import org.apache.seatunnel.e2e.common.container.TestContainer;
24+
import org.apache.seatunnel.e2e.common.container.TestContainerId;
25+
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
26+
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
27+
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
28+
29+
import org.apache.paimon.catalog.Catalog;
30+
import org.apache.paimon.catalog.CatalogContext;
31+
import org.apache.paimon.catalog.CatalogFactory;
32+
import org.apache.paimon.catalog.Identifier;
33+
import org.apache.paimon.data.InternalRow;
34+
import org.apache.paimon.options.Options;
35+
import org.apache.paimon.reader.RecordReader;
36+
import org.apache.paimon.reader.RecordReaderIterator;
37+
import org.apache.paimon.table.FileStoreTable;
38+
import org.apache.paimon.table.Table;
39+
import org.apache.paimon.table.source.ReadBuilder;
40+
import org.apache.paimon.table.source.TableRead;
41+
import org.apache.paimon.table.source.TableScan;
42+
43+
import org.junit.jupiter.api.AfterEach;
44+
import org.junit.jupiter.api.Assertions;
45+
import org.junit.jupiter.api.TestTemplate;
46+
import org.testcontainers.containers.Container;
47+
import org.testcontainers.utility.MountableFile;
48+
49+
import java.nio.file.Path;
50+
import java.util.List;
51+
52+
@DisabledOnContainer(
53+
value = {TestContainerId.FLINK_1_13, TestContainerId.SPARK_2_4},
54+
disabledReason =
55+
"Paimon does not support flink 1.13, Spark 2.4.6 has a jar package(zstd-jni-version.jar) version compatibility issue.")
56+
public class PaimonSinkBranchIT extends TestSuiteBase implements TestResource {
57+
58+
private final String DATABASE_NAME = "default";
59+
private final String TABLE_NAME = "st_test_p";
60+
61+
private static final String NAMESPACE = "paimon";
62+
protected static String hostName = System.getProperty("user.name");
63+
protected static final String CONTAINER_VOLUME_MOUNT_PATH = "/tmp/seatunnel_mnt";
64+
protected static final boolean isWindows =
65+
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
66+
public static final String HOST_VOLUME_MOUNT_PATH =
67+
isWindows
68+
? String.format("C:/Users/%s/tmp/seatunnel_mnt", hostName)
69+
: CONTAINER_VOLUME_MOUNT_PATH;
70+
71+
@TestContainerExtension
72+
private final ContainerExtendedFactory extendedFactory =
73+
container -> {
74+
Path schemaPath = ContainerUtil.getResourcesFile("/schema-0.json").toPath();
75+
container.copyFileToContainer(
76+
MountableFile.forHostPath(schemaPath),
77+
"/tmp/seatunnel_mnt/paimon/default.db/st_test_p/schema/schema-0");
78+
container.execInContainer("chmod", "777", "-R", "/tmp/seatunnel_mnt/");
79+
};
80+
81+
@Override
82+
public void startUp() throws Exception {}
83+
84+
@Override
85+
@AfterEach
86+
public void tearDown() throws Exception {}
87+
88+
@TestTemplate
89+
public void testSinkBranch(TestContainer container) throws Exception {
90+
91+
String testBranchName = "test-branch";
92+
FileStoreTable table = (FileStoreTable) getTable(DATABASE_NAME, TABLE_NAME);
93+
List<String> branches = table.branchManager().branches();
94+
if (!branches.contains(testBranchName)) {
95+
table.createBranch(testBranchName);
96+
}
97+
Container.ExecResult textWriteResult = container.executeJob("/fake_to_paimon_branch.conf");
98+
Assertions.assertEquals(0, textWriteResult.getExitCode());
99+
long rowCount = getTableRowCount(table);
100+
Assertions.assertEquals(0, rowCount);
101+
102+
FileStoreTable fileStoreTableWithBranch = table.switchToBranch(testBranchName);
103+
rowCount = getTableRowCount(fileStoreTableWithBranch);
104+
Assertions.assertEquals(10001, rowCount);
105+
}
106+
107+
private Table getTable(String dbName, String tbName) {
108+
Options options = new Options();
109+
String warehouse =
110+
String.format(
111+
"%s%s/%s", isWindows ? "" : "file://", HOST_VOLUME_MOUNT_PATH, NAMESPACE);
112+
options.set("warehouse", warehouse);
113+
try {
114+
Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
115+
return catalog.getTable(Identifier.create(dbName, tbName));
116+
} catch (Catalog.TableNotExistException e) {
117+
throw new RuntimeException("table not exist");
118+
}
119+
}
120+
121+
private long getTableRowCount(FileStoreTable table) {
122+
try {
123+
ReadBuilder readBuilder = table.newReadBuilder();
124+
TableScan.Plan plan = readBuilder.newScan().plan();
125+
TableRead tableRead = readBuilder.newRead();
126+
127+
long count = 0;
128+
try (RecordReader<InternalRow> reader = tableRead.createReader(plan);
129+
RecordReaderIterator<InternalRow> iterator =
130+
new RecordReaderIterator<>(reader)) {
131+
while (iterator.hasNext()) {
132+
iterator.next();
133+
count++;
134+
}
135+
}
136+
return count;
137+
} catch (Exception e) {
138+
throw new RuntimeException("Failed to read data count from table", e);
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)