Skip to content

Commit

Permalink
[Enhancement] Upgrade delta kernel to 3.2.0 (StarRocks#45926)
Browse files Browse the repository at this point in the history
Signed-off-by: Youngwb <[email protected]>
  • Loading branch information
Youngwb authored May 20, 2024
1 parent 8ae5186 commit 299d5a4
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 27 deletions.
2 changes: 1 addition & 1 deletion fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ under the License.
<jacoco.version>0.8.7</jacoco.version>
<iceberg.version>1.5.0</iceberg.version>
<paimon.version>0.7.0-incubating</paimon.version>
<delta-kernel.version>3.1.0</delta-kernel.version>
<delta-kernel.version>3.2.0</delta-kernel.version>
<staros.version>3.3-rc1</staros.version>
<python>python</python>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.starrocks.thrift.TTableDescriptor;
import com.starrocks.thrift.TTableType;
import io.delta.kernel.Snapshot;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.Metadata;

Expand All @@ -45,7 +45,7 @@ public class DeltaLakeTable extends Table {
private List<String> partColumnNames;
private SnapshotImpl deltaSnapshot;
private String tableLocation;
private TableClient tableClient;
private Engine deltaEngine;


public static final String PARTITION_NULL_VALUE = "null";
Expand All @@ -57,15 +57,15 @@ public DeltaLakeTable() {

public DeltaLakeTable(long id, String catalogName, String dbName, String tableName, List<Column> schema,
List<String> partitionNames, SnapshotImpl deltaSnapshot, String tableLocation,
TableClient tableClient, long createTime) {
Engine deltaEngine, long createTime) {
super(id, tableName, TableType.DELTALAKE, schema);
this.catalogName = catalogName;
this.dbName = dbName;
this.tableName = tableName;
this.partColumnNames = partitionNames;
this.deltaSnapshot = deltaSnapshot;
this.tableLocation = tableLocation;
this.tableClient = tableClient;
this.deltaEngine = deltaEngine;
this.createTime = createTime;
}

Expand All @@ -86,8 +86,8 @@ public Snapshot getDeltaSnapshot() {
return deltaSnapshot;
}

public TableClient getTableClient() {
return tableClient;
public Engine getDeltaEngine() {
return deltaEngine;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.common.ErrorType;
import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.defaults.client.DefaultTableClient;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
Expand Down Expand Up @@ -71,13 +70,13 @@ public static void checkTableFeatureSupported(Protocol protocol, Metadata metada

public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName, String tblName, String path,
Configuration configuration, long createTime) {
TableClient deltaTableClient = DefaultTableClient.create(configuration);
DefaultEngine deltaEngine = DefaultEngine.create(configuration);

Table deltaTable = null;
SnapshotImpl snapshot = null;
try {
deltaTable = Table.forPath(deltaTableClient, path);
snapshot = (SnapshotImpl) deltaTable.getLatestSnapshot(deltaTableClient);
deltaTable = Table.forPath(deltaEngine, path);
snapshot = (SnapshotImpl) deltaTable.getLatestSnapshot(deltaEngine);
} catch (TableNotFoundException e) {
LOG.error("Failed to find Delta table for {}.{}.{}, {}", catalog, dbName, tblName, e.getMessage());
throw new SemanticException("Failed to find Delta table for " + catalog + "." + dbName + "." + tblName);
Expand All @@ -86,7 +85,7 @@ public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName
throw new SemanticException("Failed to get latest snapshot for " + catalog + "." + dbName + "." + tblName);
}

StructType deltaSchema = snapshot.getSchema(deltaTableClient);
StructType deltaSchema = snapshot.getSchema(deltaEngine);
if (deltaSchema == null) {
throw new IllegalArgumentException(String.format("Unable to find Schema information in Delta log for " +
"%s.%s.%s", catalog, dbName, tblName));
Expand All @@ -108,7 +107,7 @@ public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName

return new DeltaLakeTable(CONNECTOR_ID_GENERATOR.getNextId().asInt(), catalog, dbName, tblName,
fullSchema, Lists.newArrayList(snapshot.getMetadata().getPartitionColNames()), snapshot, path,
deltaTableClient, createTime);
deltaEngine, createTime);
}

public static RemoteFileInputFormat getRemoteFileFormat(String format) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public Predicate visitIsNullPredicate(IsNullPredicate node, Void context) {
if (node.isNotNull()) {
return new Predicate("IS_NOT_NULL", column);
} else {
return new Predicate("NOT", new Predicate("IS_NOT_NULL", column));
return new Predicate("IS_NULL", column);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
import com.starrocks.thrift.TScanRangeLocations;
import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.And;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.InternalScanFileUtils;
Expand Down Expand Up @@ -152,13 +152,13 @@ public void setupScanRangeLocations(DescriptorTable descTbl) throws AnalysisExce
// PartitionKey -> partition id
Map<PartitionKey, Long> partitionKeys = Maps.newHashMap();

TableClient tableClient = deltaLakeTable.getTableClient();
ScanBuilder scanBuilder = deltaLakeTable.getDeltaSnapshot().getScanBuilder(tableClient);
Engine deltaEngine = deltaLakeTable.getDeltaEngine();
ScanBuilder scanBuilder = deltaLakeTable.getDeltaSnapshot().getScanBuilder(deltaEngine);
Scan scan = deltaLakePredicates.isPresent() ?
scanBuilder.withFilter(tableClient, deltaLakePredicates.get()).build() :
scanBuilder.withFilter(deltaEngine, deltaLakePredicates.get()).build() :
scanBuilder.build();

try (CloseableIterator<FilteredColumnarBatch> scanFilesAsBatches = scan.getScanFiles(tableClient)) {
try (CloseableIterator<FilteredColumnarBatch> scanFilesAsBatches = scan.getScanFiles(deltaEngine)) {
while (scanFilesAsBatches.hasNext()) {
FilteredColumnarBatch scanFileBatch = scanFilesAsBatches.next();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
import com.google.common.collect.Lists;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.optimizer.validate.ValidateException;
import io.delta.kernel.Operation;
import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.TransactionBuilder;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.CheckpointAlreadyExistsException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.TableImpl;
import io.delta.kernel.internal.actions.Metadata;
Expand All @@ -34,6 +38,8 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;

import static io.delta.kernel.internal.util.ColumnMapping.COLUMN_MAPPING_MODE_KEY;
import static io.delta.kernel.internal.util.ColumnMapping.COLUMN_MAPPING_MODE_NAME;

Expand Down Expand Up @@ -94,7 +100,7 @@ public void testConvertDeltaToSRTableWithException1() {

new MockUp<Table>() {
@mockit.Mock
public Table forPath(TableClient tableClient, String path) throws TableNotFoundException {
public Table forPath(Engine deltaEngine, String path) throws TableNotFoundException {
throw new TableNotFoundException("Table not found");
}
};
Expand All @@ -108,18 +114,45 @@ public void testConvertDeltaToSRTableWithException2() {
expectedEx.expect(SemanticException.class);
expectedEx.expectMessage("Failed to get latest snapshot for catalog.db.tbl");
Table table = new Table() {
public Table forPath(TableClient tableClient, String path) {
public Table forPath(Engine engine, String path) {
return this;
}

@Override
public String getPath(Engine engine) {
return null;
}

@Override
public SnapshotImpl getLatestSnapshot(TableClient tableClient) {
public SnapshotImpl getLatestSnapshot(Engine engine) {
throw new RuntimeException("Failed to get latest snapshot");
}

@Override
public Snapshot getSnapshotAsOfVersion(Engine engine, long versionId) throws TableNotFoundException {
return null;
}

@Override
public Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC)
throws TableNotFoundException {
return null;
}

@Override
public TransactionBuilder createTransactionBuilder(Engine engine, String engineInfo, Operation operation) {
return null;
}

@Override
public void checkpoint(Engine engine, long version)
throws TableNotFoundException, CheckpointAlreadyExistsException, IOException {
}
};

new MockUp<TableImpl>() {
@Mock
public Table forPath(TableClient tableClient, String path) {
public Table forPath(Engine engine, String path) {
return table;
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.connector.delta;

import com.google.common.collect.Lists;
import com.starrocks.analysis.BinaryPredicate;
import com.starrocks.analysis.BinaryType;
import com.starrocks.analysis.IsNullPredicate;
import com.starrocks.analysis.LiteralExpr;
import com.starrocks.analysis.SlotDescriptor;
import com.starrocks.analysis.SlotId;
import com.starrocks.analysis.SlotRef;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.ScalarType;
import com.starrocks.common.AnalysisException;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.types.BasePrimitiveType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class ExpressionConvertTest {
private static StructType tableSchema;

@BeforeClass
public static void setUp() {
tableSchema = new StructType(
Lists.newArrayList(
new StructField("name", BasePrimitiveType.createPrimitive("string"), true),
new StructField("age", BasePrimitiveType.createPrimitive("integer"), true),
new StructField("salary", BasePrimitiveType.createPrimitive("double"), true),
new StructField("birthday", BasePrimitiveType.createPrimitive("date"), true)));
}

@Test
public void testIsNullPredicate() {
SlotDescriptor slotDescriptor = new SlotDescriptor(new SlotId(1), "name", ScalarType.STRING, true);
slotDescriptor.setColumn(new Column("name", ScalarType.STRING));
SlotRef name = new SlotRef(slotDescriptor);
// Test is not null predicate
IsNullPredicate isNullPredicate = new IsNullPredicate(name, true);
Expression expression = new ExpressionConverter(tableSchema).convert(isNullPredicate);
Assert.assertNotNull(expression);
Assert.assertEquals("IS_NOT_NULL(column(`name`))", expression.toString());

// Test is null predicate
isNullPredicate = new IsNullPredicate(name, false);
expression = new ExpressionConverter(tableSchema).convert(isNullPredicate);
Assert.assertNotNull(expression);
Assert.assertEquals("IS_NULL(column(`name`))", expression.toString());
}

@Test
public void testBinaryPredicate() throws AnalysisException {
SlotDescriptor slotDescriptor = new SlotDescriptor(new SlotId(1), "age", ScalarType.INT, true);
slotDescriptor.setColumn(new Column("age", ScalarType.INT));
SlotRef age = new SlotRef(slotDescriptor);

// Test equal predicate
BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryType.EQ, age, LiteralExpr.create("1", ScalarType.INT));
Expression expression = new ExpressionConverter(tableSchema).convert(binaryPredicate);
Assert.assertNotNull(expression);
Assert.assertEquals("(column(`age`) = 1)", expression.toString());

// Test not equal predicate
binaryPredicate = new BinaryPredicate(BinaryType.NE, age, LiteralExpr.create("1", ScalarType.INT));
expression = new ExpressionConverter(tableSchema).convert(binaryPredicate);
Assert.assertNotNull(expression);
Assert.assertEquals("NOT((column(`age`) = 1))", expression.toString());

// Test greater than predicate
binaryPredicate = new BinaryPredicate(BinaryType.GT, age, LiteralExpr.create("1", ScalarType.INT));
expression = new ExpressionConverter(tableSchema).convert(binaryPredicate);
Assert.assertNotNull(expression);
Assert.assertEquals("(column(`age`) > 1)", expression.toString());

// Test greater than or equal predicate
binaryPredicate = new BinaryPredicate(BinaryType.GE, age, LiteralExpr.create("1", ScalarType.INT));
expression = new ExpressionConverter(tableSchema).convert(binaryPredicate);
Assert.assertNotNull(expression);
Assert.assertEquals("(column(`age`) >= 1)", expression.toString());

// Test less than predicate
binaryPredicate = new BinaryPredicate(BinaryType.LT, age, LiteralExpr.create("1", ScalarType.INT));
expression = new ExpressionConverter(tableSchema).convert(binaryPredicate);
Assert.assertNotNull(expression);
Assert.assertEquals("(column(`age`) < 1)", expression.toString());

// Test less than or equal predicate
binaryPredicate = new BinaryPredicate(BinaryType.LE, age, LiteralExpr.create("1", ScalarType.INT));
expression = new ExpressionConverter(tableSchema).convert(binaryPredicate);
Assert.assertNotNull(expression);
Assert.assertEquals("(column(`age`) <= 1)", expression.toString());
}
}

0 comments on commit 299d5a4

Please sign in to comment.