From 41b994a6c5d465572cbe7b466ad4c17844ac5e2a Mon Sep 17 00:00:00 2001 From: xiaochen-zhou <598457447@qq.com> Date: Sun, 19 Oct 2025 17:04:02 +0800 Subject: [PATCH 1/3] [Feature][Connector-V2] Support the dynamic options of the paimon table read. --- docs/en/connector-v2/source/Paimon.md | 6 ++++- docs/zh/connector-v2/source/Paimon.md | 8 +++++- .../seatunnel/paimon/source/PaimonSource.java | 20 +++++++++----- .../paimon/source/PaimonSourceReader.java | 10 +++---- .../SqlToPaimonPredicateConverter.java | 21 +++++++++++++++ .../converter/SqlToPaimonConverterTest.java | 26 +++++++++++++++++++ 6 files changed, 76 insertions(+), 15 deletions(-) diff --git a/docs/en/connector-v2/source/Paimon.md b/docs/en/connector-v2/source/Paimon.md index 12c4f83b362..e9acb82b753 100644 --- a/docs/en/connector-v2/source/Paimon.md +++ b/docs/en/connector-v2/source/Paimon.md @@ -95,7 +95,11 @@ The filter condition of the table read. For example: `select * from st_test wher Currently, where conditions only support <, <=, >, >=, =, !=, or, and,is null, is not null, between...and, in, not in, like, and others are not supported. The Having, Group By, Order By clauses are currently unsupported, because these clauses are not supported by Paimon. you can also project specific columns, for example: select id, name from st_test where id > 100. -The limit will be supported in the future. + +Supports dynamic options settings: +```sql +SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-10-12 00:00:00,2025-10-12 00:08:00') */ WHERE int_col > 3 OR double_col < 6.6 */; +``` Note: When the field after the where condition is a string or boolean value, its value must be enclosed in single quotes, otherwise an error will be reported. `For example: name='abc' or tag='true'` The field data types currently supported by where conditions are as follows: diff --git a/docs/zh/connector-v2/source/Paimon.md b/docs/zh/connector-v2/source/Paimon.md index 343afa1031f..c604a9236e8 100644 --- a/docs/zh/connector-v2/source/Paimon.md +++ b/docs/zh/connector-v2/source/Paimon.md @@ -97,7 +97,13 @@ Paimon 的 catalog uri,仅当 catalog_type 为 hive 时需要 Projection 已支持,你可以选择特定的列,例如:select id, name from st_test where id > 100。 -由于 Paimon 限制,目前不支持 `Having`, `Group By` 和 `Order By`,未来版本将会支持 `limit`。 +由于 Paimon 限制,目前不支持 `Having`, `Group By` 和 `Order By`。 + +query 参数支持动态参数设置: +```sql +SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-10-12 00:00:00,2025-10-12 00:08:00') */ WHERE int_col > 3 OR double_col < 6.6 */; +``` + 注意:当 `where` 后的字段为字符串或布尔值时,其值必须使用单引号,否则将会报错。例如 `name='abc'` 或 `tag='true'`。 diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java index 2d970403ce4..16d87c1bfa0 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java @@ -39,10 +39,11 @@ import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter; import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.table.Table; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.RowType; +import lombok.extern.slf4j.Slf4j; import net.sf.jsqlparser.statement.select.PlainSelect; import java.util.LinkedList; @@ -54,6 +55,7 @@ import static org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect; /** Paimon connector source class. */ +@Slf4j public class PaimonSource implements SeaTunnelSource { @@ -64,7 +66,7 @@ public class PaimonSource private JobContext jobContext; private List catalogTables = Lists.newArrayList(); - private Map paimonTables = Maps.newHashMap(); + private Map paimonTables = Maps.newHashMap(); private Map seaTunnelRowTypes = Maps.newHashMap(); private Map readBuilders = Maps.newHashMap(); @@ -75,12 +77,18 @@ public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog) tableConfig -> { TablePath tablePath = tableConfig.getTablePath(); CatalogTable catalogTable = paimonCatalog.getTable(tablePath); - Table paimonTable = paimonCatalog.getPaimonTable(tablePath); + FileStoreTable paimonTable = + (FileStoreTable) paimonCatalog.getPaimonTable(tablePath); + String query = tableConfig.getQuery(); + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + if (!dynamicOptions.isEmpty()) { + paimonTable.copy(dynamicOptions); + } RowType paimonRowType = paimonTable.rowType(); String[] filedNames = paimonRowType.getFieldNames().toArray(new String[0]); - - PlainSelect plainSelect = convertToPlainSelect(tableConfig.getQuery()); + PlainSelect plainSelect = convertToPlainSelect(query); Predicate predicate = null; int[] projectionIndex = null; if (!Objects.isNull(plainSelect)) { @@ -102,13 +110,11 @@ public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog) this.seaTunnelRowTypes.put( tableKey, RowTypeConverter.convert(paimonRowType, projectionIndex)); - ReadBuilder readBuilder = paimonTable .newReadBuilder() .withProjection(projectionIndex) .withFilter(predicate); - this.paimonTables.put(tableKey, paimonTable); this.readBuilders.put(tableKey, readBuilder); }); diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java index ac24e64d048..e8247cd0de1 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java @@ -30,7 +30,6 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.TableRead; @@ -51,14 +50,14 @@ public class PaimonSourceReader implements SourceReader sourceSplits = new ConcurrentLinkedDeque<>(); private final SourceReader.Context context; - private final Map tables; + private final Map tables; private final Map seaTunnelRowTypes; private final Map tableReads; private volatile boolean noMoreSplit; public PaimonSourceReader( Context context, - Map tables, + Map tables, Map seaTunnelRowTypes, Map readBuilders) { this.context = context; @@ -86,7 +85,7 @@ public void pollNext(Collector output) throws Exception { final PaimonSourceSplit split = sourceSplits.poll(); if (Objects.nonNull(split)) { String tableId = split.getTableId(); - Table table = tables.get(tableId); + FileStoreTable table = tables.get(tableId); SeaTunnelRowType seaTunnelRowType = seaTunnelRowTypes.get(tableId); TableRead tableRead = tableReads.get(tableId); try (final RecordReader reader = @@ -96,8 +95,7 @@ public void pollNext(Collector output) throws Exception { while (rowIterator.hasNext()) { final InternalRow row = rowIterator.next(); final SeaTunnelRow seaTunnelRow = - RowConverter.convert( - row, seaTunnelRowType, ((FileStoreTable) table).schema()); + RowConverter.convert(row, seaTunnelRowType, table.schema()); if (Boundedness.UNBOUNDED.equals(context.getBoundedness())) { RowKind rowKind = RowKindConverter.convertPaimonRowKind2SeatunnelRowkind( diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java index 1a7ddd0a222..cd54c4a51d3 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java @@ -67,7 +67,9 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.regex.Matcher; @@ -149,6 +151,25 @@ public static Predicate convertSqlWhereToPaimonPredicate( return parseExpressionToPredicate(builder, rowType, whereExpression); } + public static Map parseDynamicOptions(String sql) { + Map dynamicOptions = new HashMap<>(); + Pattern optionsPattern = + Pattern.compile("/\\*\\+ OPTIONS\\((.*?)\\) \\*/", Pattern.CASE_INSENSITIVE); + Matcher optionsMatcher = optionsPattern.matcher(sql); + if (optionsMatcher.find()) { + String optionsContent = optionsMatcher.group(1).trim(); + + Pattern kvPattern = Pattern.compile("'\\s*(.*?)\\s*'\\s*=\\s*'\\s*(.*?)\\s*'"); + Matcher kvMatcher = kvPattern.matcher(optionsContent); + while (kvMatcher.find()) { + String key = kvMatcher.group(1).trim(); + String value = kvMatcher.group(2).trim(); + dynamicOptions.put(key, value); + } + } + return dynamicOptions; + } + private static Predicate parseExpressionToPredicate( PredicateBuilder builder, RowType rowType, Expression expression) { if (expression instanceof IsNullExpression) { diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java index 812d1da98ee..9c0bfcd6a65 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java @@ -49,12 +49,14 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.util.Arrays; +import java.util.Map; import static org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class SqlToPaimonConverterTest { @@ -292,4 +294,28 @@ public void testConvertSqlWhereToPaimonLikePredicate() { assertEquals(expectedPredicate.toString(), predicate.toString()); } + + @Test + public void testParseDynamicOptions() { + String query = + "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00') */ WHERE int_col > 3 OR double_col < 6.6 "; + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertEquals(1, dynamicOptions.size()); + assertTrue(dynamicOptions.containsKey("incremental-between-timestamp")); + assertEquals( + "2025-03-12 00:00:00,2025-03-12 00:08:00", + dynamicOptions.get("incremental-between-timestamp")); + + query = + "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00', 'scan.tag-name' = 'my-tag') */ WHERE int_col > 3 OR double_col < 6.6 "; + dynamicOptions = SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertEquals(2, dynamicOptions.size()); + assertTrue(dynamicOptions.containsKey("incremental-between-timestamp")); + assertTrue(dynamicOptions.containsKey("scan.tag-name")); + assertEquals( + "2025-03-12 00:00:00,2025-03-12 00:08:00", + dynamicOptions.get("incremental-between-timestamp")); + assertEquals("my-tag", dynamicOptions.get("scan.tag-name")); + } } From 9a1f37bab49aa4d4af4784a08eb20fe0f51760f9 Mon Sep 17 00:00:00 2001 From: xiaochen-zhou <598457447@qq.com> Date: Sat, 25 Oct 2025 16:55:32 +0800 Subject: [PATCH 2/3] [Feature][Connector-V2] Support the dynamic options of the paimon table read. --- docs/en/connector-v2/source/Paimon.md | 2 +- docs/zh/connector-v2/source/Paimon.md | 2 +- .../seatunnel/paimon/source/PaimonSource.java | 2 +- .../SqlToPaimonPredicateConverter.java | 4 +- .../source/PaimonDynamicOptionsTest.java | 87 ++++++ .../paimon/PaimonDynamicOptionsIT.java | 248 ++++++++++++++++++ ...assert_with_dynamic_options_of_branch.conf | 49 ++++ ...sert_with_dynamic_options_of_incr_tag.conf | 60 +++++ ...o_assert_with_dynamic_options_of_tag1.conf | 49 ++++ ...o_assert_with_dynamic_options_of_tag2.conf | 49 ++++ 10 files changed, 547 insertions(+), 5 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonDynamicOptionsTest.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonDynamicOptionsIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_branch.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_incr_tag.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag1.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag2.conf diff --git a/docs/en/connector-v2/source/Paimon.md b/docs/en/connector-v2/source/Paimon.md index e9acb82b753..b0cafbc94d6 100644 --- a/docs/en/connector-v2/source/Paimon.md +++ b/docs/en/connector-v2/source/Paimon.md @@ -98,7 +98,7 @@ you can also project specific columns, for example: select id, name from st_test Supports dynamic options settings: ```sql -SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-10-12 00:00:00,2025-10-12 00:08:00') */ WHERE int_col > 3 OR double_col < 6.6 */; +SELECT * FROM table /*+ OPTIONS('incremental-between' = 'test-tag1,test-tag2') */; ``` Note: When the field after the where condition is a string or boolean value, its value must be enclosed in single quotes, otherwise an error will be reported. `For example: name='abc' or tag='true'` diff --git a/docs/zh/connector-v2/source/Paimon.md b/docs/zh/connector-v2/source/Paimon.md index c604a9236e8..ad484c7f21c 100644 --- a/docs/zh/connector-v2/source/Paimon.md +++ b/docs/zh/connector-v2/source/Paimon.md @@ -101,7 +101,7 @@ Projection 已支持,你可以选择特定的列,例如:select id, name from query 参数支持动态参数设置: ```sql -SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-10-12 00:00:00,2025-10-12 00:08:00') */ WHERE int_col > 3 OR double_col < 6.6 */; +SELECT * FROM table /*+ OPTIONS('incremental-between' = 'test-tag1,test-tag2') */; ``` diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java index 16d87c1bfa0..501a1c46b92 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java @@ -83,7 +83,7 @@ public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog) Map dynamicOptions = SqlToPaimonPredicateConverter.parseDynamicOptions(query); if (!dynamicOptions.isEmpty()) { - paimonTable.copy(dynamicOptions); + paimonTable = paimonTable.copy(dynamicOptions); } RowType paimonRowType = paimonTable.rowType(); String[] filedNames = diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java index cd54c4a51d3..3e7867c0b68 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java @@ -152,9 +152,9 @@ public static Predicate convertSqlWhereToPaimonPredicate( } public static Map parseDynamicOptions(String sql) { + String dynamicOptionsPattern = "/\\*\\+ OPTIONS\\((.*?)\\) \\*/"; Map dynamicOptions = new HashMap<>(); - Pattern optionsPattern = - Pattern.compile("/\\*\\+ OPTIONS\\((.*?)\\) \\*/", Pattern.CASE_INSENSITIVE); + Pattern optionsPattern = Pattern.compile(dynamicOptionsPattern, Pattern.CASE_INSENSITIVE); Matcher optionsMatcher = optionsPattern.matcher(sql); if (optionsMatcher.find()) { String optionsContent = optionsMatcher.group(1).trim(); diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonDynamicOptionsTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonDynamicOptionsTest.java new file mode 100644 index 00000000000..23f70121e8c --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonDynamicOptionsTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.source; + +import org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PaimonDynamicOptionsTest { + + @Test + public void testParseDynamicOptionsWithIncrementalTimestamp() { + String query = + "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00') */ WHERE int_col > 3"; + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertEquals(1, dynamicOptions.size()); + assertTrue(dynamicOptions.containsKey("incremental-between-timestamp")); + assertEquals( + "2025-03-12 00:00:00,2025-03-12 00:08:00", + dynamicOptions.get("incremental-between-timestamp")); + } + + @Test + public void testParseDynamicOptionsWithScanTag() { + String query = + "SELECT * FROM table /*+ OPTIONS('scan.tag-name' = 'my-tag') */ WHERE int_col > 3"; + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertEquals(1, dynamicOptions.size()); + assertTrue(dynamicOptions.containsKey("scan.tag-name")); + assertEquals("my-tag", dynamicOptions.get("scan.tag-name")); + } + + @Test + public void testParseDynamicOptionsWithMultipleOptions() { + String query = + "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00', 'scan.tag-name' = 'my-tag', 'scan.snapshot-id' = '123') */ WHERE int_col > 3"; + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertEquals(3, dynamicOptions.size()); + assertTrue(dynamicOptions.containsKey("incremental-between-timestamp")); + assertTrue(dynamicOptions.containsKey("scan.tag-name")); + assertTrue(dynamicOptions.containsKey("scan.snapshot-id")); + assertEquals( + "2025-03-12 00:00:00,2025-03-12 00:08:00", + dynamicOptions.get("incremental-between-timestamp")); + assertEquals("my-tag", dynamicOptions.get("scan.tag-name")); + assertEquals("123", dynamicOptions.get("scan.snapshot-id")); + } + + @Test + public void testParseDynamicOptionsWithNoOptions() { + String query = "SELECT * FROM table WHERE int_col > 3"; + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertTrue(dynamicOptions.isEmpty()); + } + + @Test + public void testParseDynamicOptionsWithEmptyOptions() { + String query = "SELECT * FROM table /*+ OPTIONS() */ WHERE int_col > 3"; + Map dynamicOptions = + SqlToPaimonPredicateConverter.parseDynamicOptions(query); + assertTrue(dynamicOptions.isEmpty()); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonDynamicOptionsIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonDynamicOptionsIT.java new file mode 100644 index 00000000000..b3cbb0e42a1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonDynamicOptionsIT.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.paimon; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.container.TestContainerId; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.ContainerUtil; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryArray; +import org.apache.paimon.data.BinaryArrayWriter; +import org.apache.paimon.data.BinaryMap; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.DateTimeUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.utility.MountableFile; + +import java.math.BigDecimal; +import java.nio.file.Path; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@DisabledOnContainer( + value = {TestContainerId.FLINK_1_13, TestContainerId.SPARK_2_4}, + disabledReason = + "Paimon does not support flink 1.13, Spark 2.4.6 has a jar package(zstd-jni-version.jar) version compatibility issue.") +public class PaimonDynamicOptionsIT extends TestSuiteBase implements TestResource { + + private final String DATABASE_NAME = "default"; + private final String TABLE_NAME = "st_test_p"; + + private static final String NAMESPACE = "paimon"; + protected static String hostName = System.getProperty("user.name"); + protected static final String CONTAINER_VOLUME_MOUNT_PATH = "/tmp/seatunnel_mnt"; + protected static final boolean isWindows = + System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS"); + public static final String HOST_VOLUME_MOUNT_PATH = + isWindows + ? String.format("C:/Users/%s/tmp/seatunnel_mnt", hostName) + : CONTAINER_VOLUME_MOUNT_PATH; + + @TestContainerExtension + private final ContainerExtendedFactory extendedFactory = + container -> { + Path schemaPath = ContainerUtil.getResourcesFile("/schema-0.json").toPath(); + container.copyFileToContainer( + MountableFile.forHostPath(schemaPath), + "/tmp/seatunnel_mnt/paimon/default.db/st_test/schema/schema-0"); + container.copyFileToContainer( + MountableFile.forHostPath(schemaPath), + "/tmp/seatunnel_mnt/paimon/default.db/st_test_p/schema/schema-0"); + container.copyFileToContainer( + MountableFile.forHostPath(schemaPath), + "/tmp/seatunnel_mnt/paimon/default.db/st_test_p1/schema/schema-0"); + container.execInContainer("chmod", "777", "-R", "/tmp/seatunnel_mnt/"); + }; + + @Override + public void startUp() throws Exception {} + + @Override + @AfterEach + public void tearDown() throws Exception {} + + @TestTemplate + public void testPaimonDynamicOptionsOfBranch(TestContainer container) throws Exception { + String testBranchName = "test-branch"; + FileStoreTable table = (FileStoreTable) getTable(DATABASE_NAME, TABLE_NAME); + List branches = table.branchManager().branches(); + if (!branches.contains(testBranchName)) { + table.createBranch(testBranchName); + } + FileStoreTable fileStoreTableWithBranch = table.switchToBranch(testBranchName); + TableWriteImpl write = fileStoreTableWithBranch.newWrite("3494269"); + + write.write(createTestRow(1L, "First record")); + write.write(createTestRow(2L, "Second record")); + write.write(createTestRow(3L, "Third record")); + write.write(createTestRow(4L, "Fourth record")); + write.write(createTestRow(5L, "Fifth record")); + + List commitMessages = write.prepareCommit(false, 1); + try (TableCommitImpl commit = fileStoreTableWithBranch.newCommit("3494269")) { + commit.commit(commitMessages); + } + write.close(); + + Container.ExecResult textWriteResult = + container.executeJob("/paimon_to_assert_with_dynamic_options_of_branch.conf"); + Assertions.assertEquals(0, textWriteResult.getExitCode()); + } + + @TestTemplate + public void testPaimonDynamicOptionsOfTag(TestContainer container) throws Exception { + String testTag1 = "test-tag1"; + String testTag2 = "test-tag2"; + FileStoreTable table = (FileStoreTable) getTable(DATABASE_NAME, TABLE_NAME); + + TableWriteImpl write = table.newWrite("3494269"); + + write.write(createTestRow(1L, "First record")); + write.write(createTestRow(2L, "Second record")); + write.write(createTestRow(3L, "Third record")); + write.write(createTestRow(4L, "Fourth record")); + write.write(createTestRow(5L, "Fifth record")); + + List commitMessages = write.prepareCommit(false, 1); + try (TableCommitImpl commit = table.newCommit("3494269")) { + commit.commit(commitMessages); + } + table.createTag(testTag1); + + Container.ExecResult textWriteTag1 = + container.executeJob("/paimon_to_assert_with_dynamic_options_of_tag1.conf"); + Assertions.assertEquals(0, textWriteTag1.getExitCode()); + + write.write(createTestRow(6L, "Sixth record")); + write.write(createTestRow(7L, "Seventh record")); + commitMessages = write.prepareCommit(false, 1); + try (TableCommitImpl commit = table.newCommit("3494269")) { + commit.commit(commitMessages); + } + table.createTag(testTag2); + write.close(); + + Container.ExecResult textWriteTag2 = + container.executeJob("/paimon_to_assert_with_dynamic_options_of_tag2.conf"); + Assertions.assertEquals(0, textWriteTag2.getExitCode()); + + Container.ExecResult textWriteResult = + container.executeJob("/paimon_to_assert_with_dynamic_options_of_incr_tag.conf"); + Assertions.assertEquals(0, textWriteResult.getExitCode()); + } + + private Table getTable(String dbName, String tbName) { + Options options = new Options(); + String warehouse = + String.format( + "%s%s/%s", isWindows ? "" : "file://", HOST_VOLUME_MOUNT_PATH, NAMESPACE); + options.set("warehouse", warehouse); + try { + Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options)); + return catalog.getTable(Identifier.create(dbName, tbName)); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException("table not exist"); + } + } + + private GenericRow createTestRow(Long pkId, String description) { + Map mapData = new HashMap<>(); + mapData.put("key1", "value1_" + pkId); + mapData.put("key2", "value2_" + pkId); + mapData.put("description", description); + BinaryArray keyArray = new BinaryArray(); + BinaryArrayWriter keyWriter = + new BinaryArrayWriter( + keyArray, 3, BinaryArray.calculateFixLengthPartSize(DataTypes.STRING())); + keyWriter.writeString(0, BinaryString.fromString("key1")); + keyWriter.writeString(1, BinaryString.fromString("key2")); + keyWriter.writeString(2, BinaryString.fromString("description")); + keyWriter.complete(); + + BinaryArray valueArray = new BinaryArray(); + BinaryArrayWriter valueWriter = + new BinaryArrayWriter( + valueArray, 3, BinaryArray.calculateFixLengthPartSize(DataTypes.STRING())); + valueWriter.writeString(0, BinaryString.fromString("value1_" + pkId)); + valueWriter.writeString(1, BinaryString.fromString("value2_" + pkId)); + valueWriter.writeString(2, BinaryString.fromString(description)); + valueWriter.complete(); + + BinaryMap binaryMap = BinaryMap.valueOf(keyArray, valueArray); + BinaryArray intArray = new BinaryArray(); + BinaryArrayWriter intArrayWriter = + new BinaryArrayWriter( + intArray, 3, BinaryArray.calculateFixLengthPartSize(DataTypes.INT())); + intArrayWriter.writeInt(0, pkId.intValue()); + intArrayWriter.writeInt(1, pkId.intValue() * 10); + intArrayWriter.writeInt(2, pkId.intValue() * 100); + intArrayWriter.complete(); + return GenericRow.of( + pkId, + binaryMap, + intArray, + BinaryString.fromString(description + "_" + pkId), + pkId % 2 == 0, + (byte) (pkId % 128), + (short) (pkId * 10), + pkId.intValue() * 100, + pkId * 1000L, + pkId.floatValue() + 0.5f, + pkId.doubleValue() + 0.123, + Decimal.fromBigDecimal(new BigDecimal(pkId + ".12345678"), 30, 8), + BinaryString.fromString("bytes_" + pkId).toBytes(), + DateTimeUtils.toInternal(LocalDate.of(2024, 1, pkId.intValue() % 28 + 1)), + Timestamp.fromLocalDateTime( + LocalDateTime.of( + 2024, + 1, + pkId.intValue() % 28 + 1, + pkId.intValue() % 24, + pkId.intValue() % 60, + 0)), + DateTimeUtils.toInternal( + LocalTime.of(pkId.intValue() % 24, pkId.intValue() % 60, 0))); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_branch.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_branch.conf new file mode 100644 index 00000000000..1d6c8696af3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_branch.conf @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "/tmp/seatunnel_mnt/paimon" + database = "default" + table = "st_test_p" + query = "SELECT * FROM st_test_p /*+ OPTIONS('branch' = 'test-branch') */" + plugin_output = paimon_source + } +} + +sink { + Assert { + plugin_input = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + }, + { + rule_type = MIN_ROW + rule_value = 5 + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_incr_tag.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_incr_tag.conf new file mode 100644 index 00000000000..59eac4591ef --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_incr_tag.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "/tmp/seatunnel_mnt/paimon" + database = "default" + table = "st_test_p" + query = "SELECT * FROM st_test_p /*+ OPTIONS('incremental-between' = 'test-tag1,test-tag2') */" + plugin_output = paimon_source + } +} + +sink { + Assert { + plugin_input = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 2 + }, + { + rule_type = MIN_ROW + rule_value = 2 + } + ] + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag1.conf new file mode 100644 index 00000000000..7875565e2fd --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag1.conf @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "/tmp/seatunnel_mnt/paimon" + database = "default" + table = "st_test_p" + query = "SELECT * FROM st_test_p /*+ OPTIONS('scan.tag-name'='test-tag1') */ " + plugin_output = paimon_source + } +} + +sink { + Assert { + plugin_input = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + }, + { + rule_type = MIN_ROW + rule_value = 5 + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag2.conf new file mode 100644 index 00000000000..6d0a13dfb82 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_dynamic_options_of_tag2.conf @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Paimon { + warehouse = "/tmp/seatunnel_mnt/paimon" + database = "default" + table = "st_test_p" + query = "SELECT * FROM st_test_p /*+ OPTIONS('scan.tag-name'='test-tag2') */ " + plugin_output = paimon_source + } +} + +sink { + Assert { + plugin_input = paimon_source + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 7 + }, + { + rule_type = MIN_ROW + rule_value = 7 + } + ] + } + } +} From 7c82be323b8714987d38be408c36256a29ae45fb Mon Sep 17 00:00:00 2001 From: xiaochen-zhou <598457447@qq.com> Date: Sat, 25 Oct 2025 20:10:02 +0800 Subject: [PATCH 3/3] fix test failures --- .../source/converter/SqlToPaimonPredicateConverter.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java index 3e7867c0b68..62070284af6 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java @@ -152,8 +152,11 @@ public static Predicate convertSqlWhereToPaimonPredicate( } public static Map parseDynamicOptions(String sql) { - String dynamicOptionsPattern = "/\\*\\+ OPTIONS\\((.*?)\\) \\*/"; Map dynamicOptions = new HashMap<>(); + if (StringUtils.isBlank(sql)) { + return dynamicOptions; + } + String dynamicOptionsPattern = "/\\*\\+ OPTIONS\\((.*?)\\) \\*/"; Pattern optionsPattern = Pattern.compile(dynamicOptionsPattern, Pattern.CASE_INSENSITIVE); Matcher optionsMatcher = optionsPattern.matcher(sql); if (optionsMatcher.find()) {