Skip to content

Commit 2d34841

Browse files
committed
fix e2e failures
1 parent ae8b595 commit 2d34841

File tree

4 files changed

+31
-33
lines changed

4 files changed

+31
-33
lines changed

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,5 @@ public class PaimonSinkOptions extends PaimonBaseOptions {
7070
"Properties passed through to paimon table initialization, such as 'file.format', 'bucket'(org.apache.paimon.CoreOptions)");
7171

7272
public static final Option<String> BRANCH =
73-
Options.key("branch").stringType().defaultValue("main").withDescription("branch");
73+
Options.key("branch").stringType().noDefaultValue().withDescription("branch");
7474
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
1919

20+
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
21+
2022
import org.apache.seatunnel.api.common.JobContext;
2123
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2224
import org.apache.seatunnel.api.serialization.DefaultSerializer;
@@ -97,21 +99,25 @@ public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
9799
paimonCatalog.open();
98100
boolean databaseExists =
99101
paimonCatalog.databaseExists(this.paimonSinkConfig.getNamespace());
100-
if (databaseExists) {
101-
TablePath tablePath = catalogTable.getTablePath();
102-
boolean tableExists = paimonCatalog.tableExists(tablePath);
103-
if (tableExists) {
104-
String branchName = paimonSinkConfig.getBranch();
105-
this.paimonTable = (FileStoreTable) paimonCatalog.getPaimonTable(tablePath);
106-
BranchManager branchManager = paimonTable.branchManager();
107-
if (!branchManager.branchExists(branchName)) {
108-
throw new UnsupportedOperationException(
109-
"Branch: " + branchName + " not exists");
110-
}
111-
if (!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
112-
this.paimonTable = paimonTable.switchToBranch(branchName);
113-
log.info("Switch to branch {}", branchName);
114-
}
102+
if (!databaseExists) {
103+
return;
104+
}
105+
TablePath tablePath = catalogTable.getTablePath();
106+
boolean tableExists = paimonCatalog.tableExists(tablePath);
107+
if (!tableExists) {
108+
return;
109+
}
110+
this.paimonTable = (FileStoreTable) paimonCatalog.getPaimonTable(tablePath);
111+
String branchName = paimonSinkConfig.getBranch();
112+
if (StringUtils.isNotEmpty(branchName)) {
113+
BranchManager branchManager = paimonTable.branchManager();
114+
if (!branchManager.branchExists(branchName)) {
115+
throw new UnsupportedOperationException(
116+
"Branch: " + branchName + " not exists");
117+
}
118+
if (!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
119+
this.paimonTable = paimonTable.switchToBranch(branchName);
120+
log.info("Switch to branch {}", branchName);
115121
}
116122
}
117123
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.paimon.sink;
1919

2020
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
21+
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
2122

2223
import org.apache.seatunnel.api.common.JobContext;
2324
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -276,10 +277,12 @@ private void reOpenTableWrite() {
276277
this.seaTunnelRowType = this.sourceTableSchema.toPhysicalRowDataType();
277278
this.paimonTable = (FileStoreTable) paimonCatalog.getPaimonTable(paimonTablePath);
278279
String branchName = paimonSinkConfig.getBranch();
279-
BranchManager branchManager = paimonTable.branchManager();
280-
if (!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
281-
this.paimonTable = this.paimonTable.switchToBranch(branchName);
282-
log.info("Re-switched to branch {} after reopening table", branchName);
280+
if (StringUtils.isNotEmpty(branchName)) {
281+
BranchManager branchManager = paimonTable.branchManager();
282+
if (!branchManager.DEFAULT_MAIN_BRANCH.equalsIgnoreCase(branchName)) {
283+
this.paimonTable = this.paimonTable.switchToBranch(branchName);
284+
log.info("Re-switched to branch {} after reopening table", branchName);
285+
}
283286
}
284287
this.sinkPaimonTableSchema = this.paimonTable.schema();
285288
this.newTableWrite();

seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonPrivilegeCatalogTest.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@
105105
public class PaimonPrivilegeCatalogTest {
106106

107107
private PaimonCatalog authorizedCatalog;
108-
private PaimonCatalog authorizedSelectCatalog;
109108
private PaimonCatalog unAuthorizedCatalog;
110109
private PaimonCatalog rootUserPaimonCatalog;
111110
private String CATALOG_NAME = "paimon_catalog";
@@ -118,11 +117,9 @@ public class PaimonPrivilegeCatalogTest {
118117
private String rootPassword = "123456";
119118
private String bucketKey = "f0";
120119
private String authorizeUser = "paimon";
121-
private String authorizeSelectUser = "authorize_select_user";
122120
private String authorizeUserPassword = "123456";
123121
private String unAuthorizeUser = "unauthorized_paimon";
124122
private String unAuthorizeUserPassword = "123456";
125-
private String authorizeSelectUserPassword = "123456";
126123

127124
private int writeRows = 0;
128125

@@ -132,8 +129,6 @@ public void before() {
132129
initPrivilege();
133130
rootUserPaimonCatalog = createPaimonCatalog(rootUser, rootPassword);
134131
authorizedCatalog = createPaimonCatalog(authorizeUser, authorizeUserPassword);
135-
authorizedSelectCatalog =
136-
createPaimonCatalog(authorizeSelectUser, authorizeSelectUserPassword);
137132
unAuthorizedCatalog = createPaimonCatalog(unAuthorizeUser, unAuthorizeUserPassword);
138133

139134
createUser(authorizeUser, authorizeUserPassword);
@@ -146,12 +141,6 @@ public void before() {
146141
PrivilegeType.INSERT
147142
});
148143
createUser(unAuthorizeUser, unAuthorizeUserPassword);
149-
createUser(authorizeSelectUser, authorizeSelectUserPassword);
150-
grantPrivilege(
151-
authorizeSelectUser,
152-
new PrivilegeType[] {
153-
PrivilegeType.SELECT,
154-
});
155144

156145
createDatabase();
157146
catalogTable = buildTable(TABLE_NAME);
@@ -359,14 +348,14 @@ public void testWriteTable() throws IOException {
359348
NoPrivilegeException.class,
360349
() -> {
361350
try {
362-
writeTable(authorizedSelectCatalog, rows);
351+
writeTable(unAuthorizedCatalog, rows);
363352
} catch (NoPrivilegeException e) {
364353
assertTrue(
365354
e.getMessage()
366355
.contains(
367356
String.format(
368357
"User %s doesn't have privilege INSERT on table",
369-
authorizeSelectUser)));
358+
unAuthorizeUser)));
370359
throw e;
371360
}
372361
});

0 commit comments

Comments
 (0)