Commit 081b737

Add procedures support for Lakehouse

LakehouseModule declares the procedures from Hive, Delta Lake, and Iceberg. If the same procedure appears in two of those modules, a wrapper procedure is declared in the 'procedures' subpackage. The wrapper procedure takes an extra first parameter named 'tableType' (possible values: 'HIVE', 'DELTA', and 'ICEBERG') and, based on that value, delegates to the actual procedure from the concrete module.
1 parent df01efb commit 081b737

11 files changed: 552 additions & 1 deletion
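
As an illustrative sketch of the new calling convention (the catalog name 'lakehouse' and the schema and table names here are assumptions, not taken from the commit):

    CALL lakehouse.system.drop_stats(table_type => 'HIVE', schema_name => 'sales', table_name => 'orders');

Based on the 'HIVE' argument, the wrapper routes this call to the Hive module's drop_stats procedure.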

plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java

Lines changed: 11 additions & 1 deletion

@@ -26,6 +26,7 @@
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.procedure.Procedure;
 import io.trino.spi.session.PropertyMetadata;
 import io.trino.spi.transaction.IsolationLevel;

@@ -51,6 +52,7 @@ public class LakehouseConnector
     private final LakehouseSessionProperties sessionProperties;
     private final LakehouseTableProperties tableProperties;
     private final IcebergMaterializedViewProperties materializedViewProperties;
+    private final Set<Procedure> procedures;

     @Inject
     public LakehouseConnector(
@@ -62,7 +64,8 @@ public LakehouseConnector(
             LakehouseNodePartitioningProvider nodePartitioningProvider,
             LakehouseSessionProperties sessionProperties,
             LakehouseTableProperties tableProperties,
-            IcebergMaterializedViewProperties materializedViewProperties)
+            IcebergMaterializedViewProperties materializedViewProperties,
+            Set<Procedure> procedures)
     {
         this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
         this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
@@ -73,6 +76,7 @@ public LakehouseConnector(
         this.sessionProperties = requireNonNull(sessionProperties, "sessionProperties is null");
         this.tableProperties = requireNonNull(tableProperties, "tableProperties is null");
         this.materializedViewProperties = requireNonNull(materializedViewProperties, "materializedViewProperties is null");
+        this.procedures = requireNonNull(procedures, "procedures is null");
     }

     @Override
@@ -148,6 +152,12 @@ public List<PropertyMetadata<?>> getMaterializedViewProperties()
         return materializedViewProperties.getMaterializedViewProperties();
     }

+    @Override
+    public Set<Procedure> getProcedures()
+    {
+        return procedures;
+    }
+
     @Override
     public void shutdown()
     {

plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHiveModule.java

Lines changed: 5 additions & 0 deletions

@@ -58,6 +58,7 @@
 import io.trino.plugin.hive.line.SimpleTextFileWriterFactory;
 import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
 import io.trino.plugin.hive.metastore.HiveMetastoreModule;
+import io.trino.plugin.hive.metastore.glue.GlueCache;
 import io.trino.plugin.hive.orc.OrcFileWriterFactory;
 import io.trino.plugin.hive.orc.OrcPageSourceFactory;
 import io.trino.plugin.hive.parquet.ParquetFileWriterFactory;
@@ -67,6 +68,7 @@
 import java.util.Optional;

 import static com.google.inject.multibindings.Multibinder.newSetBinder;
+import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
 import static io.airlift.configuration.ConfigBinder.configBinder;
 import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
 import static org.weakref.jmx.guice.ExportBinder.newExporter;
@@ -138,5 +140,8 @@ protected void setup(Binder binder)

         binder.install(new HiveExecutorModule());
         install(new ParquetEncryptionModule());
+
+        newOptionalBinder(binder, GlueCache.class);
+        newOptionalBinder(binder, DirectoryLister.class);
     }
 }

plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java

Lines changed: 45 additions & 0 deletions

@@ -16,15 +16,33 @@
 import com.google.inject.Binder;
 import com.google.inject.Key;
 import com.google.inject.Scopes;
+import com.google.inject.multibindings.Multibinder;
 import io.airlift.configuration.AbstractConfigurationAwareModule;
 import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
+import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure;
+import io.trino.plugin.deltalake.procedure.FlushMetadataCacheProcedure;
+import io.trino.plugin.deltalake.procedure.RegisterTableProcedure;
+import io.trino.plugin.deltalake.procedure.UnregisterTableProcedure;
+import io.trino.plugin.deltalake.procedure.VacuumProcedure;
 import io.trino.plugin.hive.HideDeltaLakeTables;
 import io.trino.plugin.hive.SortingFileWriterConfig;
 import io.trino.plugin.hive.orc.OrcReaderConfig;
 import io.trino.plugin.hive.orc.OrcWriterConfig;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
 import io.trino.plugin.hive.parquet.ParquetWriterConfig;
+import io.trino.plugin.hive.procedure.CreateEmptyPartitionProcedure;
+import io.trino.plugin.hive.procedure.DropStatsProcedure;
+import io.trino.plugin.hive.procedure.RegisterPartitionProcedure;
+import io.trino.plugin.hive.procedure.SyncPartitionMetadataProcedure;
+import io.trino.plugin.hive.procedure.UnregisterPartitionProcedure;
+import io.trino.plugin.iceberg.procedure.RollbackToSnapshotProcedure;
+import io.trino.plugin.lakehouse.procedures.LakehouseDropStatsProcedure;
+import io.trino.plugin.lakehouse.procedures.LakehouseFlushMetadataCacheProcedure;
+import io.trino.plugin.lakehouse.procedures.LakehouseRegisterTableProcedure;
+import io.trino.plugin.lakehouse.procedures.LakehouseUnregisterTableProcedure;
+import io.trino.spi.procedure.Procedure;

+import static com.google.inject.multibindings.Multibinder.newSetBinder;
 import static io.airlift.configuration.ConfigBinder.configBinder;
 import static org.weakref.jmx.guice.ExportBinder.newExporter;

@@ -53,6 +71,33 @@ protected void setup(Binder binder)
         binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
         newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();

+        Multibinder<Procedure> procedures = newSetBinder(binder, Procedure.class);
+        // DeltaLake procedures
+        procedures.addBinding().toProvider(VacuumProcedure.class).in(Scopes.SINGLETON);
+        // Hive procedures
+        procedures.addBinding().toProvider(CreateEmptyPartitionProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(RegisterPartitionProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(UnregisterPartitionProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(SyncPartitionMetadataProcedure.class).in(Scopes.SINGLETON);
+        // Iceberg procedures
+        procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON);
+        // Mixed procedures
+        binder.bind(DropExtendedStatsProcedure.class).in(Scopes.SINGLETON);
+        binder.bind(DropStatsProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(LakehouseDropStatsProcedure.class).in(Scopes.SINGLETON);
+
+        binder.bind(RegisterTableProcedure.class).in(Scopes.SINGLETON);
+        binder.bind(io.trino.plugin.iceberg.procedure.RegisterTableProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(LakehouseRegisterTableProcedure.class).in(Scopes.SINGLETON);
+
+        binder.bind(UnregisterTableProcedure.class).in(Scopes.SINGLETON);
+        binder.bind(io.trino.plugin.iceberg.procedure.UnregisterTableProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(LakehouseUnregisterTableProcedure.class).in(Scopes.SINGLETON);
+
+        binder.bind(FlushMetadataCacheProcedure.class).in(Scopes.SINGLETON);
+        binder.bind(io.trino.plugin.hive.procedure.FlushMetadataCacheProcedure.class).in(Scopes.SINGLETON);
+        procedures.addBinding().toProvider(LakehouseFlushMetadataCacheProcedure.class).in(Scopes.SINGLETON);
+
         binder.bind(Key.get(boolean.class, HideDeltaLakeTables.class)).toInstance(false);
     }
 }
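
Procedures that exist in only one of the underlying modules, such as Iceberg's rollback_to_snapshot above, are added to the set directly and keep their original signature, with no tableType parameter. A hypothetical invocation, assuming a catalog named 'lakehouse', made-up schema, table, and snapshot ID values, and the usual Iceberg argument order of schema, table, snapshot ID:

    CALL lakehouse.system.rollback_to_snapshot('sales', 'orders', 8954597067493422955);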
plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseDropStatsProcedure.java

Lines changed: 104 additions & 0 deletions

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.lakehouse.procedures;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure;
import io.trino.plugin.hive.procedure.DropStatsProcedure;
import io.trino.plugin.lakehouse.TableType;
import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.type.ArrayType;

import java.lang.invoke.MethodHandle;
import java.util.List;

import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.invoke.MethodHandles.lookup;
import static java.util.Objects.requireNonNull;

/**
 * A procedure that drops statistics.
 * <p>
 * It delegates to the appropriate underlying procedure based on the table type.
 * Currently, it supports Delta Lake and Hive table types.
 */
public class LakehouseDropStatsProcedure
        implements Provider<Procedure>
{
    private static final MethodHandle DROP_STATS;

    private static final String SYSTEM_SCHEMA = "system";
    private static final String PROCEDURE_NAME = "drop_stats";

    private static final String TABLE_TYPE = "TABLE_TYPE";
    private static final String SCHEMA_NAME = "SCHEMA_NAME";
    private static final String TABLE_NAME = "TABLE_NAME";
    private static final String PARTITION_VALUES = "PARTITION_VALUES";

    static {
        try {
            DROP_STATS = lookup().unreflect(LakehouseDropStatsProcedure.class.getMethod(
                    "dropStats", ConnectorSession.class, ConnectorAccessControl.class, String.class, String.class, String.class, List.class));
        }
        catch (ReflectiveOperationException e) {
            throw new AssertionError(e);
        }
    }

    private final DropExtendedStatsProcedure deltaLakeDropStatsProcedure;
    private final DropStatsProcedure hiveDropStatsProcedure;

    @Inject
    public LakehouseDropStatsProcedure(
            DropExtendedStatsProcedure deltaLakeDropStatsProcedure,
            DropStatsProcedure hiveDropStatsProcedure)
    {
        this.deltaLakeDropStatsProcedure = requireNonNull(deltaLakeDropStatsProcedure, "deltaLakeDropStatsProcedure is null");
        this.hiveDropStatsProcedure = requireNonNull(hiveDropStatsProcedure, "hiveDropStatsProcedure is null");
    }

    @Override
    public Procedure get()
    {
        return new Procedure(
                SYSTEM_SCHEMA,
                PROCEDURE_NAME,
                ImmutableList.of(
                        new Procedure.Argument(TABLE_TYPE, VARCHAR),
                        new Procedure.Argument(SCHEMA_NAME, VARCHAR),
                        new Procedure.Argument(TABLE_NAME, VARCHAR),
                        new Procedure.Argument(PARTITION_VALUES, new ArrayType(new ArrayType(VARCHAR)), false, null)),
                DROP_STATS.bindTo(this));
    }

    public void dropStats(ConnectorSession session, ConnectorAccessControl accessControl, String tableType, String schema, String table, List<?> partitionValues)
    {
        if (TableType.DELTA.name().equals(tableType)) {
            if (partitionValues != null) {
                throw new IllegalArgumentException("Partition values are not supported for Delta Lake procedure");
            }
            deltaLakeDropStatsProcedure.dropStats(session, accessControl, schema, table);
        }
        else if (TableType.HIVE.name().equals(tableType)) {
            hiveDropStatsProcedure.dropStats(session, accessControl, schema, table, partitionValues);
        }
        else {
            throw new IllegalArgumentException("Unsupported table type: " + tableType);
        }
    }
}
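
Given the argument list above, partition_values is only meaningful for Hive tables; a Delta Lake call must omit it. A usage sketch with assumed catalog, schema, table, and partition value names:

    CALL lakehouse.system.drop_stats(table_type => 'DELTA', schema_name => 'sales', table_name => 'orders');
    CALL lakehouse.system.drop_stats(table_type => 'HIVE', schema_name => 'sales', table_name => 'orders', partition_values => ARRAY[ARRAY['2024-01-01']]);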
plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseFlushMetadataCacheProcedure.java

Lines changed: 107 additions & 0 deletions

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.lakehouse.procedures;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.trino.plugin.deltalake.procedure.FlushMetadataCacheProcedure;
import io.trino.plugin.lakehouse.TableType;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.type.ArrayType;

import java.lang.invoke.MethodHandle;
import java.util.List;

import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.invoke.MethodHandles.lookup;
import static java.util.Objects.requireNonNull;

/**
 * A procedure that flushes the metadata cache.
 * <p>
 * It delegates to the appropriate underlying procedure based on the table type.
 * Currently, it supports Delta Lake and Hive table types.
 */
public class LakehouseFlushMetadataCacheProcedure
        implements Provider<Procedure>
{
    private static final MethodHandle FLUSH_METADATA_CACHE;

    private static final String SYSTEM_SCHEMA = "system";
    private static final String PROCEDURE_NAME = "flush_metadata_cache";

    private static final String TABLE_TYPE = "TABLE_TYPE";
    private static final String SCHEMA_NAME = "SCHEMA_NAME";
    private static final String TABLE_NAME = "TABLE_NAME";
    private static final String PARAM_PARTITION_COLUMNS = "PARTITION_COLUMNS";
    private static final String PARAM_PARTITION_VALUES = "PARTITION_VALUES";

    static {
        try {
            FLUSH_METADATA_CACHE = lookup().unreflect(LakehouseFlushMetadataCacheProcedure.class.getMethod(
                    "flushMetadataCache", ConnectorSession.class, String.class, String.class, String.class, List.class, List.class));
        }
        catch (ReflectiveOperationException e) {
            throw new AssertionError(e);
        }
    }

    private final FlushMetadataCacheProcedure deltaLakeFlushMetadataCacheProcedure;
    private final io.trino.plugin.hive.procedure.FlushMetadataCacheProcedure hiveFlushMetadataCacheProcedure;

    @Inject
    public LakehouseFlushMetadataCacheProcedure(
            FlushMetadataCacheProcedure deltaLakeFlushMetadataCacheProcedure,
            io.trino.plugin.hive.procedure.FlushMetadataCacheProcedure hiveFlushMetadataCacheProcedure)
    {
        this.deltaLakeFlushMetadataCacheProcedure = requireNonNull(deltaLakeFlushMetadataCacheProcedure, "deltaLakeFlushMetadataCacheProcedure is null");
        this.hiveFlushMetadataCacheProcedure = requireNonNull(hiveFlushMetadataCacheProcedure, "hiveFlushMetadataCacheProcedure is null");
    }

    @Override
    public Procedure get()
    {
        return new Procedure(
                SYSTEM_SCHEMA,
                PROCEDURE_NAME,
                ImmutableList.of(
                        new Procedure.Argument(TABLE_TYPE, VARCHAR),
                        new Procedure.Argument(SCHEMA_NAME, VARCHAR),
                        new Procedure.Argument(TABLE_NAME, VARCHAR),
                        new Procedure.Argument(PARAM_PARTITION_COLUMNS, new ArrayType(VARCHAR), false, null),
                        new Procedure.Argument(PARAM_PARTITION_VALUES, new ArrayType(VARCHAR), false, null)),
                FLUSH_METADATA_CACHE.bindTo(this));
    }

    public void flushMetadataCache(ConnectorSession session, String tableType, String schema, String table, List<String> partitionColumns, List<String> partitionValues)
    {
        if (TableType.DELTA.name().equals(tableType)) {
            if (partitionColumns != null && !partitionColumns.isEmpty()) {
                throw new IllegalArgumentException("Partition columns are not supported for Delta Lake tables");
            }
            if (partitionValues != null && !partitionValues.isEmpty()) {
                throw new IllegalArgumentException("Partition values are not supported for Delta Lake tables");
            }
            deltaLakeFlushMetadataCacheProcedure.flushMetadataCache(schema, table);
        }
        else if (TableType.HIVE.name().equals(tableType)) {
            hiveFlushMetadataCacheProcedure.flushMetadataCache(session, schema, table, partitionColumns, partitionValues);
        }
        else {
            throw new IllegalArgumentException("Unsupported table type: " + tableType);
        }
    }
}
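
As with the drop_stats wrapper, non-empty partition arguments are accepted only for Hive tables and rejected for Delta Lake. A usage sketch under the same assumed catalog, schema, table, and partition names:

    CALL lakehouse.system.flush_metadata_cache(table_type => 'DELTA', schema_name => 'sales', table_name => 'orders');
    CALL lakehouse.system.flush_metadata_cache(table_type => 'HIVE', schema_name => 'sales', table_name => 'orders', partition_columns => ARRAY['ds'], partition_values => ARRAY['2024-01-01']);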
