diff --git a/docs/en/connector-v2/source/Paimon.md b/docs/en/connector-v2/source/Paimon.md index 12c4f83b362..b0cafbc94d6 100644 --- a/docs/en/connector-v2/source/Paimon.md +++ b/docs/en/connector-v2/source/Paimon.md @@ -95,7 +95,11 @@ The filter condition of the table read. For example: `select * from st_test wher Currently, where conditions only support <, <=, >, >=, =, !=, or, and,is null, is not null, between...and, in, not in, like, and others are not supported. The Having, Group By, Order By clauses are currently unsupported, because these clauses are not supported by Paimon. you can also project specific columns, for example: select id, name from st_test where id > 100. -The limit will be supported in the future. + +The `query` parameter also supports Paimon dynamic options through an `OPTIONS` SQL hint, for example: +```sql +SELECT * FROM st_test /*+ OPTIONS('incremental-between' = 'test-tag1,test-tag2') */; +``` Note: When the field after the where condition is a string or boolean value, its value must be enclosed in single quotes, otherwise an error will be reported. 
`For example: name='abc' or tag='true'` The field data types currently supported by where conditions are as follows: diff --git a/docs/zh/connector-v2/source/Paimon.md b/docs/zh/connector-v2/source/Paimon.md index 343afa1031f..ad484c7f21c 100644 --- a/docs/zh/connector-v2/source/Paimon.md +++ b/docs/zh/connector-v2/source/Paimon.md @@ -97,7 +97,13 @@ Paimon 的 catalog uri,仅当 catalog_type 为 hive 时需要 Projection 已支持,你可以选择特定的列,例如:select id, name from st_test where id > 100。 -由于 Paimon 限制,目前不支持 `Having`, `Group By` 和 `Order By`,未来版本将会支持 `limit`。 +由于 Paimon 限制,目前不支持 `Having`, `Group By` 和 `Order By`。 + +query 参数支持动态参数设置: +```sql +SELECT * FROM table /*+ OPTIONS('incremental-between' = 'test-tag1,test-tag2') */; +``` + 注意:当 `where` 后的字段为字符串或布尔值时,其值必须使用单引号,否则将会报错。例如 `name='abc'` 或 `tag='true'`。 diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java index 2d970403ce4..501a1c46b92 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java @@ -39,10 +39,11 @@ import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter; import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.table.Table; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.RowType; +import lombok.extern.slf4j.Slf4j; import net.sf.jsqlparser.statement.select.PlainSelect; import java.util.LinkedList; @@ -54,6 +55,7 @@ import static org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect; /** Paimon connector source 
class. */ +@Slf4j public class PaimonSource implements SeaTunnelSource { @@ -64,7 +66,7 @@ public class PaimonSource private JobContext jobContext; private List catalogTables = Lists.newArrayList(); - private Map paimonTables = Maps.newHashMap(); + private Map paimonTables = Maps.newHashMap(); private Map seaTunnelRowTypes = Maps.newHashMap(); private Map readBuilders = Maps.newHashMap(); @@ -75,12 +77,18 @@ public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog) tableConfig -> { TablePath tablePath = tableConfig.getTablePath(); CatalogTable catalogTable = paimonCatalog.getTable(tablePath); - Table paimonTable = paimonCatalog.getPaimonTable(tablePath); + FileStoreTable paimonTable = + (FileStoreTable) paimonCatalog.getPaimonTable(tablePath); + String query = tableConfig.getQuery(); + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + if (!dynamicOptions.isEmpty()) { + paimonTable = paimonTable.copy(dynamicOptions); + } RowType paimonRowType = paimonTable.rowType(); String[] filedNames = paimonRowType.getFieldNames().toArray(new String[0]); - - PlainSelect plainSelect = convertToPlainSelect(tableConfig.getQuery()); + PlainSelect plainSelect = convertToPlainSelect(query); Predicate predicate = null; int[] projectionIndex = null; if (!Objects.isNull(plainSelect)) { @@ -102,13 +110,11 @@ public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog) this.seaTunnelRowTypes.put( tableKey, RowTypeConverter.convert(paimonRowType, projectionIndex)); - ReadBuilder readBuilder = paimonTable .newReadBuilder() .withProjection(projectionIndex) .withFilter(predicate); - this.paimonTables.put(tableKey, paimonTable); this.readBuilders.put(tableKey, readBuilder); }); diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java index ac24e64d048..e8247cd0de1 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java @@ -30,7 +30,6 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.TableRead; @@ -51,14 +50,14 @@ public class PaimonSourceReader implements SourceReader sourceSplits = new ConcurrentLinkedDeque<>(); private final SourceReader.Context context; - private final Map tables; + private final Map tables; private final Map seaTunnelRowTypes; private final Map tableReads; private volatile boolean noMoreSplit; public PaimonSourceReader( Context context, - Map tables, + Map tables, Map seaTunnelRowTypes, Map readBuilders) { this.context = context; @@ -86,7 +85,7 @@ public void pollNext(Collector output) throws Exception { final PaimonSourceSplit split = sourceSplits.poll(); if (Objects.nonNull(split)) { String tableId = split.getTableId(); - Table table = tables.get(tableId); + FileStoreTable table = tables.get(tableId); SeaTunnelRowType seaTunnelRowType = seaTunnelRowTypes.get(tableId); TableRead tableRead = tableReads.get(tableId); try (final RecordReader reader = @@ -96,8 +95,7 @@ public void pollNext(Collector output) throws Exception { while (rowIterator.hasNext()) { final InternalRow row = rowIterator.next(); final SeaTunnelRow seaTunnelRow = - RowConverter.convert( - row, seaTunnelRowType, ((FileStoreTable) table).schema()); + RowConverter.convert(row, seaTunnelRowType, table.schema()); if 
(Boundedness.UNBOUNDED.equals(context.getBoundedness())) { RowKind rowKind = RowKindConverter.convertPaimonRowKind2SeatunnelRowkind( diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java index 1a7ddd0a222..62070284af6 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java @@ -67,7 +67,9 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.regex.Matcher; @@ -149,6 +151,28 @@ public static Predicate convertSqlWhereToPaimonPredicate( return parseExpressionToPredicate(builder, rowType, whereExpression); } + public static Map parseDynamicOptions(String sql) { + Map dynamicOptions = new HashMap<>(); + if (StringUtils.isBlank(sql)) { + return dynamicOptions; + } + String dynamicOptionsPattern = "/\\*\\+ OPTIONS\\((.*?)\\) \\*/"; + Pattern optionsPattern = Pattern.compile(dynamicOptionsPattern, Pattern.CASE_INSENSITIVE); + Matcher optionsMatcher = optionsPattern.matcher(sql); + if (optionsMatcher.find()) { + String optionsContent = optionsMatcher.group(1).trim(); + + Pattern kvPattern = Pattern.compile("'\\s*(.*?)\\s*'\\s*=\\s*'\\s*(.*?)\\s*'"); + Matcher kvMatcher = kvPattern.matcher(optionsContent); + while (kvMatcher.find()) { + String key = kvMatcher.group(1).trim(); + String value = kvMatcher.group(2).trim(); + dynamicOptions.put(key, value); + } + } + 
return dynamicOptions; + } + private static Predicate parseExpressionToPredicate( PredicateBuilder builder, RowType rowType, Expression expression) { if (expression instanceof IsNullExpression) { diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonDynamicOptionsTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonDynamicOptionsTest.java new file mode 100644 index 00000000000..23f70121e8c --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonDynamicOptionsTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.source; + +import org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PaimonDynamicOptionsTest { + + @Test + public void testParseDynamicOptionsWithIncrementalTimestamp() { + String query = + "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00') */ WHERE int_col > 3"; + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertEquals(1, dynamicOptions.size()); + assertTrue(dynamicOptions.containsKey("incremental-between-timestamp")); + assertEquals( + "2025-03-12 00:00:00,2025-03-12 00:08:00", + dynamicOptions.get("incremental-between-timestamp")); + } + + @Test + public void testParseDynamicOptionsWithScanTag() { + String query = + "SELECT * FROM table /*+ OPTIONS('scan.tag-name' = 'my-tag') */ WHERE int_col > 3"; + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertEquals(1, dynamicOptions.size()); + assertTrue(dynamicOptions.containsKey("scan.tag-name")); + assertEquals("my-tag", dynamicOptions.get("scan.tag-name")); + } + + @Test + public void testParseDynamicOptionsWithMultipleOptions() { + String query = + "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00', 'scan.tag-name' = 'my-tag', 'scan.snapshot-id' = '123') */ WHERE int_col > 3"; + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertEquals(3, dynamicOptions.size()); + assertTrue(dynamicOptions.containsKey("incremental-between-timestamp")); + assertTrue(dynamicOptions.containsKey("scan.tag-name")); + assertTrue(dynamicOptions.containsKey("scan.snapshot-id")); + assertEquals( + 
"2025-03-12 00:00:00,2025-03-12 00:08:00", + dynamicOptions.get("incremental-between-timestamp")); + assertEquals("my-tag", dynamicOptions.get("scan.tag-name")); + assertEquals("123", dynamicOptions.get("scan.snapshot-id")); + } + + @Test + public void testParseDynamicOptionsWithNoOptions() { + String query = "SELECT * FROM table WHERE int_col > 3"; + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertTrue(dynamicOptions.isEmpty()); + } + + @Test + public void testParseDynamicOptionsWithEmptyOptions() { + String query = "SELECT * FROM table /*+ OPTIONS() */ WHERE int_col > 3"; + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertTrue(dynamicOptions.isEmpty()); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java index 812d1da98ee..9c0bfcd6a65 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java @@ -49,12 +49,14 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Arrays; +import java.util.Map; import static org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public 
class SqlToPaimonConverterTest { @@ -292,4 +294,28 @@ public void testConvertSqlWhereToPaimonLikePredicate() { assertEquals(expectedPredicate.toString(), predicate.toString()); } + + @Test + public void testParseDynamicOptions() { + String query = + "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00') */ WHERE int_col > 3 OR double_col < 6.6 "; + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertEquals(1, dynamicOptions.size()); + assertTrue(dynamicOptions.containsKey("incremental-between-timestamp")); + assertEquals( + "2025-03-12 00:00:00,2025-03-12 00:08:00", + dynamicOptions.get("incremental-between-timestamp")); + + query = + "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00', 'scan.tag-name' = 'my-tag') */ WHERE int_col > 3 OR double_col < 6.6 "; + dynamicOptions = SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertEquals(2, dynamicOptions.size()); + assertTrue(dynamicOptions.containsKey("incremental-between-timestamp")); + assertTrue(dynamicOptions.containsKey("scan.tag-name")); + assertEquals( + "2025-03-12 00:00:00,2025-03-12 00:08:00", + dynamicOptions.get("incremental-between-timestamp")); + assertEquals("my-tag", dynamicOptions.get("scan.tag-name")); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonDynamicOptionsIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonDynamicOptionsIT.java new file mode 100644 index 00000000000..b3cbb0e42a1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonDynamicOptionsIT.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.paimon; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.container.TestContainerId; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.ContainerUtil; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryArray; +import org.apache.paimon.data.BinaryArrayWriter; +import org.apache.paimon.data.BinaryMap; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.CommitMessage; +import 
org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.DateTimeUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.utility.MountableFile; + +import java.math.BigDecimal; +import java.nio.file.Path; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@DisabledOnContainer( + value = {TestContainerId.FLINK_1_13, TestContainerId.SPARK_2_4}, + disabledReason = + "Paimon does not support flink 1.13, Spark 2.4.6 has a jar package(zstd-jni-version.jar) version compatibility issue.") +public class PaimonDynamicOptionsIT extends TestSuiteBase implements TestResource { + + private final String DATABASE_NAME = "default"; + private final String TABLE_NAME = "st_test_p"; + + private static final String NAMESPACE = "paimon"; + protected static String hostName = System.getProperty("user.name"); + protected static final String CONTAINER_VOLUME_MOUNT_PATH = "/tmp/seatunnel_mnt"; + protected static final boolean isWindows = + System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS"); + public static final String HOST_VOLUME_MOUNT_PATH = + isWindows + ? 
String.format("C:/Users/%s/tmp/seatunnel_mnt", hostName) + : CONTAINER_VOLUME_MOUNT_PATH; + + @TestContainerExtension + private final ContainerExtendedFactory extendedFactory = + container -> { + Path schemaPath = ContainerUtil.getResourcesFile("/schema-0.json").toPath(); + container.copyFileToContainer( + MountableFile.forHostPath(schemaPath), + "/tmp/seatunnel_mnt/paimon/default.db/st_test/schema/schema-0"); + container.copyFileToContainer( + MountableFile.forHostPath(schemaPath), + "/tmp/seatunnel_mnt/paimon/default.db/st_test_p/schema/schema-0"); + container.copyFileToContainer( + MountableFile.forHostPath(schemaPath), + "/tmp/seatunnel_mnt/paimon/default.db/st_test_p1/schema/schema-0"); + container.execInContainer("chmod", "777", "-R", "/tmp/seatunnel_mnt/"); + }; + + @Override + public void startUp() throws Exception {} + + @Override + @AfterEach + public void tearDown() throws Exception {} + + @TestTemplate + public void testPaimonDynamicOptionsOfBranch(TestContainer container) throws Exception { + String testBranchName = "test-branch"; + FileStoreTable table = (FileStoreTable) getTable(DATABASE_NAME, TABLE_NAME); + List branches = table.branchManager().branches(); + if (!branches.contains(testBranchName)) { + table.createBranch(testBranchName); + } + FileStoreTable fileStoreTableWithBranch = table.switchToBranch(testBranchName); + TableWriteImpl write = fileStoreTableWithBranch.newWrite("3494269"); + + write.write(createTestRow(1L, "First record")); + write.write(createTestRow(2L, "Second record")); + write.write(createTestRow(3L, "Third record")); + write.write(createTestRow(4L, "Fourth record")); + write.write(createTestRow(5L, "Fifth record")); + + List commitMessages = write.prepareCommit(false, 1); + try (TableCommitImpl commit = fileStoreTableWithBranch.newCommit("3494269")) { + commit.commit(commitMessages); + } + write.close(); + + Container.ExecResult textWriteResult = + container.executeJob("/paimon_to_assert_with_dynamic_options_of_branch.conf"); 
+ Assertions.assertEquals(0, textWriteResult.getExitCode()); + } + + @TestTemplate + public void testPaimonDynamicOptionsOfTag(TestContainer container) throws Exception { + String testTag1 = "test-tag1"; + String testTag2 = "test-tag2"; + FileStoreTable table = (FileStoreTable) getTable(DATABASE_NAME, TABLE_NAME); + + TableWriteImpl write = table.newWrite("3494269"); + + write.write(createTestRow(1L, "First record")); + write.write(createTestRow(2L, "Second record")); + write.write(createTestRow(3L, "Third record")); + write.write(createTestRow(4L, "Fourth record")); + write.write(createTestRow(5L, "Fifth record")); + + List commitMessages = write.prepareCommit(false, 1); + try (TableCommitImpl commit = table.newCommit("3494269")) { + commit.commit(commitMessages); + } + table.createTag(testTag1); + + Container.ExecResult textWriteTag1 = + container.executeJob("/paimon_to_assert_with_dynamic_options_of_tag1.conf"); + Assertions.assertEquals(0, textWriteTag1.getExitCode()); + + write.write(createTestRow(6L, "Sixth record")); + write.write(createTestRow(7L, "Seventh record")); + commitMessages = write.prepareCommit(false, 1); + try (TableCommitImpl commit = table.newCommit("3494269")) { + commit.commit(commitMessages); + } + table.createTag(testTag2); + write.close(); + + Container.ExecResult textWriteTag2 = + container.executeJob("/paimon_to_assert_with_dynamic_options_of_tag2.conf"); + Assertions.assertEquals(0, textWriteTag2.getExitCode()); + + Container.ExecResult textWriteResult = + container.executeJob("/paimon_to_assert_with_dynamic_options_of_incr_tag.conf"); + Assertions.assertEquals(0, textWriteResult.getExitCode()); + } + + private Table getTable(String dbName, String tbName) { + Options options = new Options(); + String warehouse = + String.format( + "%s%s/%s", isWindows ? 
"" : "file://", HOST_VOLUME_MOUNT_PATH, NAMESPACE); + options.set("warehouse", warehouse); + try { + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options)); + return catalog.getTable(Identifier.create(dbName, tbName)); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException("table not exist"); + } + } + + private GenericRow createTestRow(Long pkId, String description) { + Map mapData = new HashMap<>(); + mapData.put("key1", "value1_" + pkId); + mapData.put("key2", "value2_" + pkId); + mapData.put("description", description); + BinaryArray keyArray = new BinaryArray(); + BinaryArrayWriter keyWriter = + new BinaryArrayWriter( + keyArray, 3, BinaryArray.calculateFixLengthPartSize(DataTypes.STRING())); + keyWriter.writeString(0, BinaryString.fromString("key1")); + keyWriter.writeString(1, BinaryString.fromString("key2")); + keyWriter.writeString(2, BinaryString.fromString("description")); + keyWriter.complete(); + + BinaryArray valueArray = new BinaryArray(); + BinaryArrayWriter valueWriter = + new BinaryArrayWriter( + valueArray, 3, BinaryArray.calculateFixLengthPartSize(DataTypes.STRING())); + valueWriter.writeString(0, BinaryString.fromString("value1_" + pkId)); + valueWriter.writeString(1, BinaryString.fromString("value2_" + pkId)); + valueWriter.writeString(2, BinaryString.fromString(description)); + valueWriter.complete(); + + BinaryMap binaryMap = BinaryMap.valueOf(keyArray, valueArray); + BinaryArray intArray = new BinaryArray(); + BinaryArrayWriter intArrayWriter = + new BinaryArrayWriter( + intArray, 3, BinaryArray.calculateFixLengthPartSize(DataTypes.INT())); + intArrayWriter.writeInt(0, pkId.intValue()); + intArrayWriter.writeInt(1, pkId.intValue() * 10); + intArrayWriter.writeInt(2, pkId.intValue() * 100); + intArrayWriter.complete(); + return GenericRow.of( + pkId, + binaryMap, + intArray, + BinaryString.fromString(description + "_" + pkId), + pkId % 2 == 0, + (byte) (pkId % 128), + (short) (pkId * 10), + 
pkId.intValue() * 100, + pkId * 1000L, + pkId.floatValue() + 0.5f, + pkId.doubleValue() + 0.123, + Decimal.fromBigDecimal(new BigDecimal(pkId + ".12345678"), 30, 8), + BinaryString.fromString("bytes_" + pkId).toBytes(), + DateTimeUtils.toInternal(LocalDate.of(2024, 1, pkId.intValue() % 28 + 1)), + Timestamp.fromLocalDateTime( + LocalDateTime.of( + 2024, + 1, + pkId.intValue() % 28 + 1, + pkId.intValue() % 24, + pkId.intValue() % 60, + 0)), + DateTimeUtils.toInternal( + LocalTime.of(pkId.intValue() % 24, pkId.intValue() % 60, 0))); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_branch.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_branch.conf new file mode 100644 index 00000000000..1d6c8696af3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_branch.conf @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "/tmp/seatunnel_mnt/paimon" + database = "default" + table = "st_test_p" + query = "SELECT * FROM st_test_p /*+ OPTIONS('branch' = 'test-branch') */" + plugin_output = paimon_source + } +} + +sink { + Assert { + plugin_input = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + }, + { + rule_type = MIN_ROW + rule_value = 5 + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_incr_tag.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_incr_tag.conf new file mode 100644 index 00000000000..59eac4591ef --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_incr_tag.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "/tmp/seatunnel_mnt/paimon" + database = "default" + table = "st_test_p" + query = "SELECT * FROM st_test_p /*+ OPTIONS('incremental-between' = 'test-tag1,test-tag2') */" + plugin_output = paimon_source + } +} + +sink { + Assert { + plugin_input = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 2 + }, + { + rule_type = MIN_ROW + rule_value = 2 + } + ] + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag1.conf new file mode 100644 index 00000000000..7875565e2fd --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag1.conf @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "/tmp/seatunnel_mnt/paimon" + database = "default" + table = "st_test_p" + query = "SELECT * FROM st_test_p /*+ OPTIONS('scan.tag-name'='test-tag1') */ " + plugin_output = paimon_source + } +} + +sink { + Assert { + plugin_input = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + }, + { + rule_type = MIN_ROW + rule_value = 5 + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag2.conf new file mode 100644 index 00000000000..6d0a13dfb82 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag2.conf @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "/tmp/seatunnel_mnt/paimon" + database = "default" + table = "st_test_p" + query = "SELECT * FROM st_test_p /*+ OPTIONS('scan.tag-name'='test-tag2') */ " + plugin_output = paimon_source + } +} + +sink { + Assert { + plugin_input = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 7 + }, + { + rule_type = MIN_ROW + rule_value = 7 + } + ] + } + } +}