diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 8bf940e37c055..ff8623d2fcfaa 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -40,7 +40,7 @@ under the License.
0.8.7
1.5.0
0.7.0-incubating
- 3.1.0
+ 3.2.0
3.3-rc1
python
diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/DeltaLakeTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/DeltaLakeTable.java
index 40ae1fd0db47e..21c3112492e1d 100644
--- a/fe/fe-core/src/main/java/com/starrocks/catalog/DeltaLakeTable.java
+++ b/fe/fe-core/src/main/java/com/starrocks/catalog/DeltaLakeTable.java
@@ -30,7 +30,7 @@
import com.starrocks.thrift.TTableDescriptor;
import com.starrocks.thrift.TTableType;
import io.delta.kernel.Snapshot;
-import io.delta.kernel.client.TableClient;
+import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.Metadata;
@@ -45,7 +45,7 @@ public class DeltaLakeTable extends Table {
private List partColumnNames;
private SnapshotImpl deltaSnapshot;
private String tableLocation;
- private TableClient tableClient;
+ private Engine deltaEngine;
public static final String PARTITION_NULL_VALUE = "null";
@@ -57,7 +57,7 @@ public DeltaLakeTable() {
public DeltaLakeTable(long id, String catalogName, String dbName, String tableName, List schema,
List partitionNames, SnapshotImpl deltaSnapshot, String tableLocation,
- TableClient tableClient, long createTime) {
+ Engine deltaEngine, long createTime) {
super(id, tableName, TableType.DELTALAKE, schema);
this.catalogName = catalogName;
this.dbName = dbName;
@@ -65,7 +65,7 @@ public DeltaLakeTable(long id, String catalogName, String dbName, String tableNa
this.partColumnNames = partitionNames;
this.deltaSnapshot = deltaSnapshot;
this.tableLocation = tableLocation;
- this.tableClient = tableClient;
+ this.deltaEngine = deltaEngine;
this.createTime = createTime;
}
@@ -86,8 +86,8 @@ public Snapshot getDeltaSnapshot() {
return deltaSnapshot;
}
- public TableClient getTableClient() {
- return tableClient;
+ public Engine getDeltaEngine() {
+ return deltaEngine;
}
@Override
diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java
index d41b27377376b..b14bf6d030bf3 100644
--- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java
+++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java
@@ -27,9 +27,8 @@
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.common.ErrorType;
import io.delta.kernel.Table;
-import io.delta.kernel.TableNotFoundException;
-import io.delta.kernel.client.TableClient;
-import io.delta.kernel.defaults.client.DefaultTableClient;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
@@ -71,13 +70,13 @@ public static void checkTableFeatureSupported(Protocol protocol, Metadata metada
public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName, String tblName, String path,
Configuration configuration, long createTime) {
- TableClient deltaTableClient = DefaultTableClient.create(configuration);
+ DefaultEngine deltaEngine = DefaultEngine.create(configuration);
Table deltaTable = null;
SnapshotImpl snapshot = null;
try {
- deltaTable = Table.forPath(deltaTableClient, path);
- snapshot = (SnapshotImpl) deltaTable.getLatestSnapshot(deltaTableClient);
+ deltaTable = Table.forPath(deltaEngine, path);
+ snapshot = (SnapshotImpl) deltaTable.getLatestSnapshot(deltaEngine);
} catch (TableNotFoundException e) {
LOG.error("Failed to find Delta table for {}.{}.{}, {}", catalog, dbName, tblName, e.getMessage());
throw new SemanticException("Failed to find Delta table for " + catalog + "." + dbName + "." + tblName);
@@ -86,7 +85,7 @@ public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName
throw new SemanticException("Failed to get latest snapshot for " + catalog + "." + dbName + "." + tblName);
}
- StructType deltaSchema = snapshot.getSchema(deltaTableClient);
+ StructType deltaSchema = snapshot.getSchema(deltaEngine);
if (deltaSchema == null) {
throw new IllegalArgumentException(String.format("Unable to find Schema information in Delta log for " +
"%s.%s.%s", catalog, dbName, tblName));
@@ -108,7 +107,7 @@ public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName
return new DeltaLakeTable(CONNECTOR_ID_GENERATOR.getNextId().asInt(), catalog, dbName, tblName,
fullSchema, Lists.newArrayList(snapshot.getMetadata().getPartitionColNames()), snapshot, path,
- deltaTableClient, createTime);
+ deltaEngine, createTime);
}
public static RemoteFileInputFormat getRemoteFileFormat(String format) {
diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/ExpressionConverter.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/ExpressionConverter.java
index 1405ac7ea44a0..381c4340c0c43 100644
--- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/ExpressionConverter.java
+++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/ExpressionConverter.java
@@ -106,7 +106,7 @@ public Predicate visitIsNullPredicate(IsNullPredicate node, Void context) {
if (node.isNotNull()) {
return new Predicate("IS_NOT_NULL", column);
} else {
- return new Predicate("NOT", new Predicate("IS_NOT_NULL", column));
+ return new Predicate("IS_NULL", column);
}
}
diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/DeltaLakeScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/DeltaLakeScanNode.java
index 67a548feb8999..488f7ba58900c 100644
--- a/fe/fe-core/src/main/java/com/starrocks/planner/DeltaLakeScanNode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/planner/DeltaLakeScanNode.java
@@ -47,9 +47,9 @@
import com.starrocks.thrift.TScanRangeLocations;
import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
-import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
+import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.And;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.InternalScanFileUtils;
@@ -152,13 +152,13 @@ public void setupScanRangeLocations(DescriptorTable descTbl) throws AnalysisExce
// PartitionKey -> partition id
Map partitionKeys = Maps.newHashMap();
- TableClient tableClient = deltaLakeTable.getTableClient();
- ScanBuilder scanBuilder = deltaLakeTable.getDeltaSnapshot().getScanBuilder(tableClient);
+ Engine deltaEngine = deltaLakeTable.getDeltaEngine();
+ ScanBuilder scanBuilder = deltaLakeTable.getDeltaSnapshot().getScanBuilder(deltaEngine);
Scan scan = deltaLakePredicates.isPresent() ?
- scanBuilder.withFilter(tableClient, deltaLakePredicates.get()).build() :
+ scanBuilder.withFilter(deltaEngine, deltaLakePredicates.get()).build() :
scanBuilder.build();
- try (CloseableIterator scanFilesAsBatches = scan.getScanFiles(tableClient)) {
+ try (CloseableIterator scanFilesAsBatches = scan.getScanFiles(deltaEngine)) {
while (scanFilesAsBatches.hasNext()) {
FilteredColumnarBatch scanFileBatch = scanFilesAsBatches.next();
diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaUtilsTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaUtilsTest.java
index 7b42576cc1f68..dc71bbfaf6aa0 100644
--- a/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaUtilsTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaUtilsTest.java
@@ -18,9 +18,13 @@
import com.google.common.collect.Lists;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.optimizer.validate.ValidateException;
+import io.delta.kernel.Operation;
+import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
-import io.delta.kernel.TableNotFoundException;
-import io.delta.kernel.client.TableClient;
+import io.delta.kernel.TransactionBuilder;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.CheckpointAlreadyExistsException;
+import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.TableImpl;
import io.delta.kernel.internal.actions.Metadata;
@@ -34,6 +38,8 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.io.IOException;
+
import static io.delta.kernel.internal.util.ColumnMapping.COLUMN_MAPPING_MODE_KEY;
import static io.delta.kernel.internal.util.ColumnMapping.COLUMN_MAPPING_MODE_NAME;
@@ -94,7 +100,7 @@ public void testConvertDeltaToSRTableWithException1() {
new MockUp() {
@mockit.Mock
- public Table forPath(TableClient tableClient, String path) throws TableNotFoundException {
+ public Table forPath(Engine deltaEngine, String path) throws TableNotFoundException {
throw new TableNotFoundException("Table not found");
}
};
@@ -108,18 +114,45 @@ public void testConvertDeltaToSRTableWithException2() {
expectedEx.expect(SemanticException.class);
expectedEx.expectMessage("Failed to get latest snapshot for catalog.db.tbl");
Table table = new Table() {
- public Table forPath(TableClient tableClient, String path) {
+ public Table forPath(Engine engine, String path) {
return this;
}
+
+ @Override
+ public String getPath(Engine engine) {
+ return null;
+ }
+
@Override
- public SnapshotImpl getLatestSnapshot(TableClient tableClient) {
+ public SnapshotImpl getLatestSnapshot(Engine engine) {
throw new RuntimeException("Failed to get latest snapshot");
}
+
+ @Override
+ public Snapshot getSnapshotAsOfVersion(Engine engine, long versionId) throws TableNotFoundException {
+ return null;
+ }
+
+ @Override
+ public Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC)
+ throws TableNotFoundException {
+ return null;
+ }
+
+ @Override
+ public TransactionBuilder createTransactionBuilder(Engine engine, String engineInfo, Operation operation) {
+ return null;
+ }
+
+ @Override
+ public void checkpoint(Engine engine, long version)
+ throws TableNotFoundException, CheckpointAlreadyExistsException, IOException {
+ }
};
new MockUp() {
@Mock
- public Table forPath(TableClient tableClient, String path) {
+ public Table forPath(Engine engine, String path) {
return table;
}
};
diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/delta/ExpressionConvertTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/delta/ExpressionConvertTest.java
new file mode 100644
index 0000000000000..d8ed82f08df9b
--- /dev/null
+++ b/fe/fe-core/src/test/java/com/starrocks/connector/delta/ExpressionConvertTest.java
@@ -0,0 +1,109 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.connector.delta;
+
+import com.google.common.collect.Lists;
+import com.starrocks.analysis.BinaryPredicate;
+import com.starrocks.analysis.BinaryType;
+import com.starrocks.analysis.IsNullPredicate;
+import com.starrocks.analysis.LiteralExpr;
+import com.starrocks.analysis.SlotDescriptor;
+import com.starrocks.analysis.SlotId;
+import com.starrocks.analysis.SlotRef;
+import com.starrocks.catalog.Column;
+import com.starrocks.catalog.ScalarType;
+import com.starrocks.common.AnalysisException;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.types.BasePrimitiveType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ExpressionConvertTest {
+ private static StructType tableSchema;
+
+ @BeforeClass
+ public static void setUp() {
+ tableSchema = new StructType(
+ Lists.newArrayList(
+ new StructField("name", BasePrimitiveType.createPrimitive("string"), true),
+ new StructField("age", BasePrimitiveType.createPrimitive("integer"), true),
+ new StructField("salary", BasePrimitiveType.createPrimitive("double"), true),
+ new StructField("birthday", BasePrimitiveType.createPrimitive("date"), true)));
+ }
+
+ @Test
+ public void testIsNullPredicate() {
+ SlotDescriptor slotDescriptor = new SlotDescriptor(new SlotId(1), "name", ScalarType.STRING, true);
+ slotDescriptor.setColumn(new Column("name", ScalarType.STRING));
+ SlotRef name = new SlotRef(slotDescriptor);
+ // Test is not null predicate
+ IsNullPredicate isNullPredicate = new IsNullPredicate(name, true);
+ Expression expression = new ExpressionConverter(tableSchema).convert(isNullPredicate);
+ Assert.assertNotNull(expression);
+ Assert.assertEquals("IS_NOT_NULL(column(`name`))", expression.toString());
+
+ // Test is null predicate
+ isNullPredicate = new IsNullPredicate(name, false);
+ expression = new ExpressionConverter(tableSchema).convert(isNullPredicate);
+ Assert.assertNotNull(expression);
+ Assert.assertEquals("IS_NULL(column(`name`))", expression.toString());
+ }
+
+ @Test
+ public void testBinaryPredicate() throws AnalysisException {
+ SlotDescriptor slotDescriptor = new SlotDescriptor(new SlotId(1), "age", ScalarType.INT, true);
+ slotDescriptor.setColumn(new Column("age", ScalarType.INT));
+ SlotRef age = new SlotRef(slotDescriptor);
+
+ // Test equal predicate
+ BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryType.EQ, age, LiteralExpr.create("1", ScalarType.INT));
+ Expression expression = new ExpressionConverter(tableSchema).convert(binaryPredicate);
+ Assert.assertNotNull(expression);
+ Assert.assertEquals("(column(`age`) = 1)", expression.toString());
+
+ // Test not equal predicate
+ binaryPredicate = new BinaryPredicate(BinaryType.NE, age, LiteralExpr.create("1", ScalarType.INT));
+ expression = new ExpressionConverter(tableSchema).convert(binaryPredicate);
+ Assert.assertNotNull(expression);
+ Assert.assertEquals("NOT((column(`age`) = 1))", expression.toString());
+
+ // Test greater than predicate
+ binaryPredicate = new BinaryPredicate(BinaryType.GT, age, LiteralExpr.create("1", ScalarType.INT));
+ expression = new ExpressionConverter(tableSchema).convert(binaryPredicate);
+ Assert.assertNotNull(expression);
+ Assert.assertEquals("(column(`age`) > 1)", expression.toString());
+
+ // Test greater than or equal predicate
+ binaryPredicate = new BinaryPredicate(BinaryType.GE, age, LiteralExpr.create("1", ScalarType.INT));
+ expression = new ExpressionConverter(tableSchema).convert(binaryPredicate);
+ Assert.assertNotNull(expression);
+ Assert.assertEquals("(column(`age`) >= 1)", expression.toString());
+
+ // Test less than predicate
+ binaryPredicate = new BinaryPredicate(BinaryType.LT, age, LiteralExpr.create("1", ScalarType.INT));
+ expression = new ExpressionConverter(tableSchema).convert(binaryPredicate);
+ Assert.assertNotNull(expression);
+ Assert.assertEquals("(column(`age`) < 1)", expression.toString());
+
+ // Test less than or equal predicate
+ binaryPredicate = new BinaryPredicate(BinaryType.LE, age, LiteralExpr.create("1", ScalarType.INT));
+ expression = new ExpressionConverter(tableSchema).convert(binaryPredicate);
+ Assert.assertNotNull(expression);
+ Assert.assertEquals("(column(`age`) <= 1)", expression.toString());
+ }
+}