From 299d5a489e8b8da1b01cda302d53f8f4ed650b2a Mon Sep 17 00:00:00 2001 From: Youngwb Date: Mon, 20 May 2024 19:39:59 +0800 Subject: [PATCH] [Enhancement] Upgrade delta kernel to 3.2.0 (#45926) Signed-off-by: Youngwb --- fe/fe-core/pom.xml | 2 +- .../com/starrocks/catalog/DeltaLakeTable.java | 12 +- .../starrocks/connector/delta/DeltaUtils.java | 15 ++- .../connector/delta/ExpressionConverter.java | 2 +- .../starrocks/planner/DeltaLakeScanNode.java | 10 +- .../connector/delta/DeltaUtilsTest.java | 45 +++++++- .../delta/ExpressionConvertTest.java | 109 ++++++++++++++++++ 7 files changed, 168 insertions(+), 27 deletions(-) create mode 100644 fe/fe-core/src/test/java/com/starrocks/connector/delta/ExpressionConvertTest.java diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 8bf940e37c055..ff8623d2fcfaa 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -40,7 +40,7 @@ under the License. 0.8.7 1.5.0 0.7.0-incubating - 3.1.0 + 3.2.0 3.3-rc1 python diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/DeltaLakeTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/DeltaLakeTable.java index 40ae1fd0db47e..21c3112492e1d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/DeltaLakeTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/DeltaLakeTable.java @@ -30,7 +30,7 @@ import com.starrocks.thrift.TTableDescriptor; import com.starrocks.thrift.TTableType; import io.delta.kernel.Snapshot; -import io.delta.kernel.client.TableClient; +import io.delta.kernel.engine.Engine; import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.internal.actions.Metadata; @@ -45,7 +45,7 @@ public class DeltaLakeTable extends Table { private List partColumnNames; private SnapshotImpl deltaSnapshot; private String tableLocation; - private TableClient tableClient; + private Engine deltaEngine; public static final String PARTITION_NULL_VALUE = "null"; @@ -57,7 +57,7 @@ public DeltaLakeTable() { public DeltaLakeTable(long id, String catalogName, String dbName, String tableName, List schema, List partitionNames, SnapshotImpl deltaSnapshot, String tableLocation, - TableClient tableClient, long createTime) { + Engine deltaEngine, long createTime) { super(id, tableName, TableType.DELTALAKE, schema); this.catalogName = catalogName; this.dbName = dbName; @@ -65,7 +65,7 @@ public DeltaLakeTable(long id, String catalogName, String dbName, String tableNa this.partColumnNames = partitionNames; this.deltaSnapshot = deltaSnapshot; this.tableLocation = tableLocation; - this.tableClient = tableClient; + this.deltaEngine = deltaEngine; this.createTime = createTime; } @@ -86,8 +86,8 @@ public Snapshot getDeltaSnapshot() { return deltaSnapshot; } - public TableClient getTableClient() { - return tableClient; + public Engine getDeltaEngine() { + return deltaEngine; } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java index d41b27377376b..b14bf6d030bf3 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaUtils.java @@ -27,9 +27,8 @@ import com.starrocks.sql.analyzer.SemanticException; import com.starrocks.sql.common.ErrorType; import io.delta.kernel.Table; -import io.delta.kernel.TableNotFoundException; -import io.delta.kernel.client.TableClient; -import io.delta.kernel.defaults.client.DefaultTableClient; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; @@ -71,13 +70,13 @@ public static void checkTableFeatureSupported(Protocol protocol, Metadata metada public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName, String tblName, String path, Configuration configuration, long createTime) { - TableClient deltaTableClient = DefaultTableClient.create(configuration); + DefaultEngine deltaEngine = DefaultEngine.create(configuration); Table deltaTable = null; SnapshotImpl snapshot = null; try { - deltaTable = Table.forPath(deltaTableClient, path); - snapshot = (SnapshotImpl) deltaTable.getLatestSnapshot(deltaTableClient); + deltaTable = Table.forPath(deltaEngine, path); + snapshot = (SnapshotImpl) deltaTable.getLatestSnapshot(deltaEngine); } catch (TableNotFoundException e) { LOG.error("Failed to find Delta table for {}.{}.{}, {}", catalog, dbName, tblName, e.getMessage()); throw new SemanticException("Failed to find Delta table for " + catalog + "." + dbName + "." + tblName); @@ -86,7 +85,7 @@ public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName throw new SemanticException("Failed to get latest snapshot for " + catalog + "." + dbName + "." + tblName); } - StructType deltaSchema = snapshot.getSchema(deltaTableClient); + StructType deltaSchema = snapshot.getSchema(deltaEngine); if (deltaSchema == null) { throw new IllegalArgumentException(String.format("Unable to find Schema information in Delta log for " + "%s.%s.%s", catalog, dbName, tblName)); @@ -108,7 +107,7 @@ public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName return new DeltaLakeTable(CONNECTOR_ID_GENERATOR.getNextId().asInt(), catalog, dbName, tblName, fullSchema, Lists.newArrayList(snapshot.getMetadata().getPartitionColNames()), snapshot, path, - deltaTableClient, createTime); + deltaEngine, createTime); } public static RemoteFileInputFormat getRemoteFileFormat(String format) { diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/ExpressionConverter.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/ExpressionConverter.java index 1405ac7ea44a0..381c4340c0c43 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/ExpressionConverter.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/ExpressionConverter.java @@ -106,7 +106,7 @@ public Predicate visitIsNullPredicate(IsNullPredicate node, Void context) { if (node.isNotNull()) { return new Predicate("IS_NOT_NULL", column); } else { - return new Predicate("NOT", new Predicate("IS_NOT_NULL", column)); + return new Predicate("IS_NULL", column); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/DeltaLakeScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/DeltaLakeScanNode.java index 67a548feb8999..488f7ba58900c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/DeltaLakeScanNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/DeltaLakeScanNode.java @@ -47,9 +47,9 @@ import com.starrocks.thrift.TScanRangeLocations; import io.delta.kernel.Scan; import io.delta.kernel.ScanBuilder; -import io.delta.kernel.client.TableClient; import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; import io.delta.kernel.expressions.And; import io.delta.kernel.expressions.Predicate; import io.delta.kernel.internal.InternalScanFileUtils; @@ -152,13 +152,13 @@ public void setupScanRangeLocations(DescriptorTable descTbl) throws AnalysisExce // PartitionKey -> partition id Map partitionKeys = Maps.newHashMap(); - TableClient tableClient = deltaLakeTable.getTableClient(); - ScanBuilder scanBuilder = deltaLakeTable.getDeltaSnapshot().getScanBuilder(tableClient); + Engine deltaEngine = deltaLakeTable.getDeltaEngine(); + ScanBuilder scanBuilder = deltaLakeTable.getDeltaSnapshot().getScanBuilder(deltaEngine); Scan scan = deltaLakePredicates.isPresent() ? - scanBuilder.withFilter(tableClient, deltaLakePredicates.get()).build() : + scanBuilder.withFilter(deltaEngine, deltaLakePredicates.get()).build() : scanBuilder.build(); - try (CloseableIterator scanFilesAsBatches = scan.getScanFiles(tableClient)) { + try (CloseableIterator scanFilesAsBatches = scan.getScanFiles(deltaEngine)) { while (scanFilesAsBatches.hasNext()) { FilteredColumnarBatch scanFileBatch = scanFilesAsBatches.next(); diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaUtilsTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaUtilsTest.java index 7b42576cc1f68..dc71bbfaf6aa0 100644 --- a/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaUtilsTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/connector/delta/DeltaUtilsTest.java @@ -18,9 +18,13 @@ import com.google.common.collect.Lists; import com.starrocks.sql.analyzer.SemanticException; import com.starrocks.sql.optimizer.validate.ValidateException; +import io.delta.kernel.Operation; +import io.delta.kernel.Snapshot; import io.delta.kernel.Table; -import io.delta.kernel.TableNotFoundException; -import io.delta.kernel.client.TableClient; +import io.delta.kernel.TransactionBuilder; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.CheckpointAlreadyExistsException; +import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.internal.TableImpl; import io.delta.kernel.internal.actions.Metadata; @@ -34,6 +38,8 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import java.io.IOException; + import static io.delta.kernel.internal.util.ColumnMapping.COLUMN_MAPPING_MODE_KEY; import static io.delta.kernel.internal.util.ColumnMapping.COLUMN_MAPPING_MODE_NAME; @@ -94,7 +100,7 @@ public void testConvertDeltaToSRTableWithException1() { new MockUp() { @mockit.Mock - public Table forPath(TableClient tableClient, String path) throws TableNotFoundException { + public Table forPath(Engine deltaEngine, String path) throws TableNotFoundException { throw new TableNotFoundException("Table not found"); } }; @@ -108,18 +114,45 @@ public void testConvertDeltaToSRTableWithException2() { expectedEx.expect(SemanticException.class); expectedEx.expectMessage("Failed to get latest snapshot for catalog.db.tbl"); Table table = new Table() { - public Table forPath(TableClient tableClient, String path) { + public Table forPath(Engine engine, String path) { return this; } + + @Override + public String getPath(Engine engine) { + return null; + } + @Override - public SnapshotImpl getLatestSnapshot(TableClient tableClient) { + public SnapshotImpl getLatestSnapshot(Engine engine) { throw new RuntimeException("Failed to get latest snapshot"); } + + @Override + public Snapshot getSnapshotAsOfVersion(Engine engine, long versionId) throws TableNotFoundException { + return null; + } + + @Override + public Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC) + throws TableNotFoundException { + return null; + } + + @Override + public TransactionBuilder createTransactionBuilder(Engine engine, String engineInfo, Operation operation) { + return null; + } + + @Override + public void checkpoint(Engine engine, long version) + throws TableNotFoundException, CheckpointAlreadyExistsException, IOException { + } }; new MockUp() { @Mock - public Table forPath(TableClient tableClient, String path) { + public Table forPath(Engine engine, String path) { return table; } }; diff --git a/fe/fe-core/src/test/java/com/starrocks/connector/delta/ExpressionConvertTest.java b/fe/fe-core/src/test/java/com/starrocks/connector/delta/ExpressionConvertTest.java new file mode 100644 index 0000000000000..d8ed82f08df9b --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/connector/delta/ExpressionConvertTest.java @@ -0,0 +1,109 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.connector.delta; + +import com.google.common.collect.Lists; +import com.starrocks.analysis.BinaryPredicate; +import com.starrocks.analysis.BinaryType; +import com.starrocks.analysis.IsNullPredicate; +import com.starrocks.analysis.LiteralExpr; +import com.starrocks.analysis.SlotDescriptor; +import com.starrocks.analysis.SlotId; +import com.starrocks.analysis.SlotRef; +import com.starrocks.catalog.Column; +import com.starrocks.catalog.ScalarType; +import com.starrocks.common.AnalysisException; +import io.delta.kernel.expressions.Expression; +import io.delta.kernel.types.BasePrimitiveType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class ExpressionConvertTest { + private static StructType tableSchema; + + @BeforeClass + public static void setUp() { + tableSchema = new StructType( + Lists.newArrayList( + new StructField("name", BasePrimitiveType.createPrimitive("string"), true), + new StructField("age", BasePrimitiveType.createPrimitive("integer"), true), + new StructField("salary", BasePrimitiveType.createPrimitive("double"), true), + new StructField("birthday", BasePrimitiveType.createPrimitive("date"), true))); + } + + @Test + public void testIsNullPredicate() { + SlotDescriptor slotDescriptor = new SlotDescriptor(new SlotId(1), "name", ScalarType.STRING, true); + slotDescriptor.setColumn(new Column("name", ScalarType.STRING)); + SlotRef name = new SlotRef(slotDescriptor); + // Test is not null predicate + IsNullPredicate isNullPredicate = new IsNullPredicate(name, true); + Expression expression = new ExpressionConverter(tableSchema).convert(isNullPredicate); + Assert.assertNotNull(expression); + Assert.assertEquals("IS_NOT_NULL(column(`name`))", expression.toString()); + + // Test is null predicate + isNullPredicate = new IsNullPredicate(name, false); + expression = new ExpressionConverter(tableSchema).convert(isNullPredicate); + Assert.assertNotNull(expression); + Assert.assertEquals("IS_NULL(column(`name`))", expression.toString()); + } + + @Test + public void testBinaryPredicate() throws AnalysisException { + SlotDescriptor slotDescriptor = new SlotDescriptor(new SlotId(1), "age", ScalarType.INT, true); + slotDescriptor.setColumn(new Column("age", ScalarType.INT)); + SlotRef age = new SlotRef(slotDescriptor); + + // Test equal predicate + BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryType.EQ, age, LiteralExpr.create("1", ScalarType.INT)); + Expression expression = new ExpressionConverter(tableSchema).convert(binaryPredicate); + Assert.assertNotNull(expression); + Assert.assertEquals("(column(`age`) = 1)", expression.toString()); + + // Test not equal predicate + binaryPredicate = new BinaryPredicate(BinaryType.NE, age, LiteralExpr.create("1", ScalarType.INT)); + expression = new ExpressionConverter(tableSchema).convert(binaryPredicate); + Assert.assertNotNull(expression); + Assert.assertEquals("NOT((column(`age`) = 1))", expression.toString()); + + // Test greater than predicate + binaryPredicate = new BinaryPredicate(BinaryType.GT, age, LiteralExpr.create("1", ScalarType.INT)); + expression = new ExpressionConverter(tableSchema).convert(binaryPredicate); + Assert.assertNotNull(expression); + Assert.assertEquals("(column(`age`) > 1)", expression.toString()); + + // Test greater than or equal predicate + binaryPredicate = new BinaryPredicate(BinaryType.GE, age, LiteralExpr.create("1", ScalarType.INT)); + expression = new ExpressionConverter(tableSchema).convert(binaryPredicate); + Assert.assertNotNull(expression); + Assert.assertEquals("(column(`age`) >= 1)", expression.toString()); + + // Test less than predicate + binaryPredicate = new BinaryPredicate(BinaryType.LT, age, LiteralExpr.create("1", ScalarType.INT)); + expression = new ExpressionConverter(tableSchema).convert(binaryPredicate); + Assert.assertNotNull(expression); + Assert.assertEquals("(column(`age`) < 1)", expression.toString()); + + // Test less than or equal predicate + binaryPredicate = new BinaryPredicate(BinaryType.LE, age, LiteralExpr.create("1", ScalarType.INT)); + expression = new ExpressionConverter(tableSchema).convert(binaryPredicate); + Assert.assertNotNull(expression); + Assert.assertEquals("(column(`age`) <= 1)", expression.toString()); + } +}