Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/en/connector-v2/source/Paimon.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ The filter condition of the table read. For example: `select * from st_test wher
Currently, where conditions only support <, <=, >, >=, =, !=, or, and, is null, is not null, between...and, in, not in, and like; other operators are not supported.
The Having, Group By, Order By clauses are currently unsupported, because these clauses are not supported by Paimon.
You can also project specific columns, for example: `select id, name from st_test where id > 100`.
The limit will be supported in the future.

The `query` option also supports dynamic table options, supplied as a SQL hint:
```sql
SELECT * FROM table /*+ OPTIONS('incremental-between' = 'test-tag1,test-tag2') */;
```

Note: When the field after the where condition is a string or boolean value, its value must be enclosed in single quotes, otherwise an error will be reported. For example: `name='abc'` or `tag='true'`.
The field data types currently supported by where conditions are as follows:
Expand Down
8 changes: 7 additions & 1 deletion docs/zh/connector-v2/source/Paimon.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,13 @@ Paimon 的 catalog uri,仅当 catalog_type 为 hive 时需要

Projection 已支持,你可以选择特定的列,例如:select id, name from st_test where id > 100。

由于 Paimon 限制,目前不支持 `Having`, `Group By` 和 `Order By`,未来版本将会支持 `limit`。
由于 Paimon 限制,目前不支持 `Having`, `Group By` 和 `Order By`。

query 参数支持动态参数设置:
```sql
SELECT * FROM table /*+ OPTIONS('incremental-between' = 'test-tag1,test-tag2') */;
```


注意:当 `where` 后的字段为字符串或布尔值时,其值必须使用单引号,否则将会报错。例如 `name='abc'` 或 `tag='true'`。

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;

import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.RowType;

import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.statement.select.PlainSelect;

import java.util.LinkedList;
Expand All @@ -54,6 +55,7 @@
import static org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect;

/** Paimon connector source class. */
@Slf4j
public class PaimonSource
implements SeaTunnelSource<SeaTunnelRow, PaimonSourceSplit, PaimonSourceState> {

Expand All @@ -64,7 +66,7 @@ public class PaimonSource
private JobContext jobContext;

private List<CatalogTable> catalogTables = Lists.newArrayList();
private Map<String, Table> paimonTables = Maps.newHashMap();
private Map<String, FileStoreTable> paimonTables = Maps.newHashMap();
private Map<String, SeaTunnelRowType> seaTunnelRowTypes = Maps.newHashMap();
private Map<String, ReadBuilder> readBuilders = Maps.newHashMap();

Expand All @@ -75,12 +77,18 @@ public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog)
tableConfig -> {
TablePath tablePath = tableConfig.getTablePath();
CatalogTable catalogTable = paimonCatalog.getTable(tablePath);
Table paimonTable = paimonCatalog.getPaimonTable(tablePath);
FileStoreTable paimonTable =
(FileStoreTable) paimonCatalog.getPaimonTable(tablePath);
String query = tableConfig.getQuery();
Map<String, String> dynamicOptions =
SqlToPaimonPredicateConverter.parseDynamicOptions(query);
if (!dynamicOptions.isEmpty()) {
paimonTable = paimonTable.copy(dynamicOptions);
}
RowType paimonRowType = paimonTable.rowType();
String[] filedNames =
paimonRowType.getFieldNames().toArray(new String[0]);

PlainSelect plainSelect = convertToPlainSelect(tableConfig.getQuery());
PlainSelect plainSelect = convertToPlainSelect(query);
Predicate predicate = null;
int[] projectionIndex = null;
if (!Objects.isNull(plainSelect)) {
Expand All @@ -102,13 +110,11 @@ public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog)
this.seaTunnelRowTypes.put(
tableKey,
RowTypeConverter.convert(paimonRowType, projectionIndex));

ReadBuilder readBuilder =
paimonTable
.newReadBuilder()
.withProjection(projectionIndex)
.withFilter(predicate);

this.paimonTables.put(tableKey, paimonTable);
this.readBuilders.put(tableKey, readBuilder);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableRead;

Expand All @@ -51,14 +50,14 @@ public class PaimonSourceReader implements SourceReader<SeaTunnelRow, PaimonSour

private final Deque<PaimonSourceSplit> sourceSplits = new ConcurrentLinkedDeque<>();
private final SourceReader.Context context;
private final Map<String, Table> tables;
private final Map<String, FileStoreTable> tables;
private final Map<String, SeaTunnelRowType> seaTunnelRowTypes;
private final Map<String, TableRead> tableReads;
private volatile boolean noMoreSplit;

public PaimonSourceReader(
Context context,
Map<String, Table> tables,
Map<String, FileStoreTable> tables,
Map<String, SeaTunnelRowType> seaTunnelRowTypes,
Map<String, ReadBuilder> readBuilders) {
this.context = context;
Expand Down Expand Up @@ -86,7 +85,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
final PaimonSourceSplit split = sourceSplits.poll();
if (Objects.nonNull(split)) {
String tableId = split.getTableId();
Table table = tables.get(tableId);
FileStoreTable table = tables.get(tableId);
SeaTunnelRowType seaTunnelRowType = seaTunnelRowTypes.get(tableId);
TableRead tableRead = tableReads.get(tableId);
try (final RecordReader<InternalRow> reader =
Expand All @@ -96,8 +95,7 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
while (rowIterator.hasNext()) {
final InternalRow row = rowIterator.next();
final SeaTunnelRow seaTunnelRow =
RowConverter.convert(
row, seaTunnelRowType, ((FileStoreTable) table).schema());
RowConverter.convert(row, seaTunnelRowType, table.schema());
if (Boundedness.UNBOUNDED.equals(context.getBoundedness())) {
RowKind rowKind =
RowKindConverter.convertPaimonRowKind2SeatunnelRowkind(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -149,6 +151,28 @@ public static Predicate convertSqlWhereToPaimonPredicate(
return parseExpressionToPredicate(builder, rowType, whereExpression);
}

// Matches the dynamic-options SQL hint. Whitespace around OPTIONS is optional and the option
// list may span several lines (DOTALL), so compact and multi-line hints are both recognised.
private static final Pattern DYNAMIC_OPTIONS_HINT_PATTERN =
        Pattern.compile(
                "/\\*\\+\\s*OPTIONS\\s*\\((.*?)\\)\\s*\\*/",
                Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

// Matches a single 'key' = 'value' pair. Keys and values may not contain a single quote, which
// keeps a match from running across neighbouring pairs on malformed input.
private static final Pattern DYNAMIC_OPTION_KV_PATTERN =
        Pattern.compile("'\\s*([^']*?)\\s*'\\s*=\\s*'\\s*([^']*?)\\s*'");

/**
 * Extracts the dynamic table options carried by a SQL hint of the form
 *
 * <pre>/*+ OPTIONS('key1' = 'value1', 'key2' = 'value2') &#42;/</pre>
 *
 * <p>Only the first hint found in the statement is read. The {@code OPTIONS} keyword is matched
 * case-insensitively, whitespace around it is optional, and the option list may span lines.
 * Keys and values are returned with surrounding whitespace removed.
 *
 * @param sql the query text, may be {@code null} or blank
 * @return a new mutable map of option key to option value; empty when {@code sql} is {@code
 *     null}, contains no hint, or the hint lists no options
 */
public static Map<String, String> parseDynamicOptions(String sql) {
    Map<String, String> dynamicOptions = new HashMap<>();
    // A blank query cannot contain a hint, so only null needs an explicit guard.
    if (sql == null) {
        return dynamicOptions;
    }
    Matcher hintMatcher = DYNAMIC_OPTIONS_HINT_PATTERN.matcher(sql);
    if (!hintMatcher.find()) {
        return dynamicOptions;
    }
    Matcher kvMatcher = DYNAMIC_OPTION_KV_PATTERN.matcher(hintMatcher.group(1));
    while (kvMatcher.find()) {
        dynamicOptions.put(kvMatcher.group(1).trim(), kvMatcher.group(2).trim());
    }
    return dynamicOptions;
}

private static Predicate parseExpressionToPredicate(
PredicateBuilder builder, RowType rowType, Expression expression) {
if (expression instanceof IsNullExpression) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.paimon.source;

import org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter;

import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Tests for {@link SqlToPaimonPredicateConverter#parseDynamicOptions(String)}, covering a single
 * option, several options, a query without a hint and a hint with an empty option list.
 */
public class PaimonDynamicOptionsTest {

    private static final String TIMESTAMP_KEY = "incremental-between-timestamp";
    private static final String TIMESTAMP_RANGE = "2025-03-12 00:00:00,2025-03-12 00:08:00";
    private static final String TAG_KEY = "scan.tag-name";
    private static final String SNAPSHOT_KEY = "scan.snapshot-id";

    /** Runs the parser under test on the given statement. */
    private static Map<String, String> optionsOf(String sql) {
        return SqlToPaimonPredicateConverter.parseDynamicOptions(sql);
    }

    @Test
    public void testParseDynamicOptionsWithIncrementalTimestamp() {
        Map<String, String> parsed =
                optionsOf(
                        "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00') */ WHERE int_col > 3");

        assertEquals(1, parsed.size());
        assertTrue(parsed.containsKey(TIMESTAMP_KEY));
        assertEquals(TIMESTAMP_RANGE, parsed.get(TIMESTAMP_KEY));
    }

    @Test
    public void testParseDynamicOptionsWithScanTag() {
        Map<String, String> parsed =
                optionsOf(
                        "SELECT * FROM table /*+ OPTIONS('scan.tag-name' = 'my-tag') */ WHERE int_col > 3");

        assertEquals(1, parsed.size());
        assertTrue(parsed.containsKey(TAG_KEY));
        assertEquals("my-tag", parsed.get(TAG_KEY));
    }

    @Test
    public void testParseDynamicOptionsWithMultipleOptions() {
        Map<String, String> parsed =
                optionsOf(
                        "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00', 'scan.tag-name' = 'my-tag', 'scan.snapshot-id' = '123') */ WHERE int_col > 3");

        assertEquals(3, parsed.size());

        // Every key listed in the hint must be present ...
        assertTrue(parsed.containsKey(TIMESTAMP_KEY));
        assertTrue(parsed.containsKey(TAG_KEY));
        assertTrue(parsed.containsKey(SNAPSHOT_KEY));

        // ... and mapped to its own value.
        assertEquals(TIMESTAMP_RANGE, parsed.get(TIMESTAMP_KEY));
        assertEquals("my-tag", parsed.get(TAG_KEY));
        assertEquals("123", parsed.get(SNAPSHOT_KEY));
    }

    @Test
    public void testParseDynamicOptionsWithNoOptions() {
        assertTrue(optionsOf("SELECT * FROM table WHERE int_col > 3").isEmpty());
    }

    @Test
    public void testParseDynamicOptionsWithEmptyOptions() {
        assertTrue(optionsOf("SELECT * FROM table /*+ OPTIONS() */ WHERE int_col > 3").isEmpty());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertToPlainSelect;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class SqlToPaimonConverterTest {

Expand Down Expand Up @@ -292,4 +294,28 @@ public void testConvertSqlWhereToPaimonLikePredicate() {

assertEquals(expectedPredicate.toString(), predicate.toString());
}

@Test
public void testParseDynamicOptions() {
    final String timestampKey = "incremental-between-timestamp";
    final String timestampRange = "2025-03-12 00:00:00,2025-03-12 00:08:00";

    // Scenario 1: the hint carries a single option.
    Map<String, String> singleOption =
            SqlToPaimonPredicateConverter.parseDynamicOptions(
                    "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00') */ WHERE int_col > 3 OR double_col < 6.6 ");
    assertEquals(1, singleOption.size());
    assertTrue(singleOption.containsKey(timestampKey));
    assertEquals(timestampRange, singleOption.get(timestampKey));

    // Scenario 2: the hint carries two comma-separated options.
    Map<String, String> twoOptions =
            SqlToPaimonPredicateConverter.parseDynamicOptions(
                    "SELECT * FROM table /*+ OPTIONS('incremental-between-timestamp' = '2025-03-12 00:00:00,2025-03-12 00:08:00', 'scan.tag-name' = 'my-tag') */ WHERE int_col > 3 OR double_col < 6.6 ");
    assertEquals(2, twoOptions.size());
    assertTrue(twoOptions.containsKey(timestampKey));
    assertTrue(twoOptions.containsKey("scan.tag-name"));
    assertEquals(timestampRange, twoOptions.get(timestampKey));
    assertEquals("my-tag", twoOptions.get("scan.tag-name"));
}
}
Loading