Skip to content

Commit

Permalink
[Feature] temporary table(part-2): support automatic deletion of temp…
Browse files Browse the repository at this point in the history
…orary tables (StarRocks#44139)

Signed-off-by: silverbullet233 <[email protected]>
  • Loading branch information
silverbullet233 authored Apr 19, 2024
1 parent 4863376 commit 420d78f
Show file tree
Hide file tree
Showing 20 changed files with 414 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ public class FunctionSet {
public static final String UNNEST = "unnest";

public static final String CONNECTION_ID = "connection_id";
public static final String SESSION_ID = "session_id";

public static final String CATALOG = "catalog";

Expand Down Expand Up @@ -702,6 +703,7 @@ public class FunctionSet {

public static final Set<String> INFORMATION_FUNCTIONS = ImmutableSet.<String>builder()
.add(CONNECTION_ID)
.add(SESSION_ID)
.add(CATALOG)
.add(DATABASE)
.add(SCHEMA)
Expand Down
21 changes: 21 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import com.starrocks.server.WarehouseManager;
import com.starrocks.sql.analyzer.Authorizer;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.CleanTemporaryTableStmt;
import com.starrocks.sql.ast.SetListItem;
import com.starrocks.sql.ast.SetStmt;
import com.starrocks.sql.ast.SetType;
Expand Down Expand Up @@ -986,6 +987,26 @@ public void changeCatalogDb(String identifier) throws DdlException {
this.setDatabase(dbName);
}

public void cleanTemporaryTable() {
if (sessionId == null) {
return;
}
if (!GlobalStateMgr.getCurrentState().getTemporaryTableMgr().sessionExists(sessionId)) {
return;
}
LOG.debug("clean temporary table on session {}", sessionId);
try {
setQueryId(UUIDUtil.genUUID());
CleanTemporaryTableStmt cleanTemporaryTableStmt = new CleanTemporaryTableStmt(sessionId);
cleanTemporaryTableStmt.setOrigStmt(
new OriginStatement("clean temporary table on session '" + sessionId.toString() + "'"));
executor = new StmtExecutor(this, cleanTemporaryTableStmt);
executor.execute();
} catch (Throwable e) {
LOG.warn("Failed to clean temporary table on session {}, {}", sessionId, e);
}
}

/**
* Set thread-local context for the scope, and remove it after leaving the scope
*/
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -152,6 +155,7 @@ public void unregisterConnection(ConnectContext ctx) {
if (conns != null) {
conns.decrementAndGet();
}
ctx.cleanTemporaryTable();
LOG.info("Connection closed. remote={}, connectionId={}",
ctx.getMysqlChannel().getRemoteHostPortString(), ctx.getConnectionId());
}
Expand Down Expand Up @@ -192,6 +196,14 @@ public List<ConnectContext.ThreadInfo> listConnection(ConnectContext context, St
return getAllConnThreadInfoByUser(context, user);
}

public Set<UUID> listAllSessionsId() {
Set<UUID> sessionIds = new HashSet<>();
connectionMap.values().forEach(ctx -> {
sessionIds.add(ctx.getSessionId());
});
return sessionIds;
}

private class LoopHandler implements Runnable {
ConnectContext context;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import com.starrocks.sql.ast.CancelLoadStmt;
import com.starrocks.sql.ast.CancelRefreshDictionaryStmt;
import com.starrocks.sql.ast.CancelRefreshMaterializedViewStmt;
import com.starrocks.sql.ast.CleanTemporaryTableStmt;
import com.starrocks.sql.ast.ClearDataCacheRulesStmt;
import com.starrocks.sql.ast.CreateAnalyzeJobStmt;
import com.starrocks.sql.ast.CreateCatalogStmt;
Expand Down Expand Up @@ -319,6 +320,14 @@ public ShowResultSet visitDropTemporaryTableStatement(DropTemporaryTableStmt stm
return null;
}

@Override
public ShowResultSet visitCleanTemporaryTableStatement(CleanTemporaryTableStmt stmt, ConnectContext context) {
ErrorReport.wrapWithRuntimeException(() -> {
context.getGlobalStateMgr().getMetadataMgr().cleanTemporaryTables(stmt);
});
return null;
}

@Override
public ShowResultSet visitCreateMaterializedViewStmt(CreateMaterializedViewStmt stmt, ConnectContext context) {
ErrorReport.wrapWithRuntimeException(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ public class GlobalStateMgr {

private final MetaRecoveryDaemon metaRecoveryDaemon = new MetaRecoveryDaemon();
private TemporaryTableMgr temporaryTableMgr;
private TemporaryTableCleaner temporaryTableCleaner;

private final SqlParser sqlParser;
private final Analyzer analyzer;
Expand Down Expand Up @@ -754,6 +755,7 @@ public void transferToNonLeader(FrontendNodeType newType) {
this.authorizer = new Authorizer(accessControlProvider);
this.ddlStmtExecutor = new DDLStmtExecutor(DDLStmtExecutor.StmtExecutorVisitor.getInstance());
this.showExecutor = new ShowExecutor(ShowExecutor.ShowExecutorVisitor.getInstance());
this.temporaryTableCleaner = new TemporaryTableCleaner();
}

public static void destroyCheckpoint() {
Expand Down Expand Up @@ -1334,6 +1336,7 @@ private void startLeaderOnlyDaemonThreads() {
LOG.info("run system in recovery mode");
metaRecoveryDaemon.start();
}
temporaryTableCleaner.start();

}

Expand Down
32 changes: 32 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/server/MetadataMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.starrocks.connector.statistics.ConnectorTableColumnStats;
import com.starrocks.qe.ConnectContext;
import com.starrocks.sql.ast.AlterTableStmt;
import com.starrocks.sql.ast.CleanTemporaryTableStmt;
import com.starrocks.sql.ast.CreateTableLikeStmt;
import com.starrocks.sql.ast.CreateTableStmt;
import com.starrocks.sql.ast.CreateTemporaryTableStmt;
Expand Down Expand Up @@ -438,6 +439,37 @@ public void dropTemporaryTable(DropTemporaryTableStmt stmt) {
});
}

public void cleanTemporaryTables(CleanTemporaryTableStmt stmt) {
Preconditions.checkArgument(stmt.getSessionId() != null,
"session id should not be null in DropTemporaryTableStmt");
cleanTemporaryTables(stmt.getSessionId());
}

public void cleanTemporaryTables(UUID sessionId) {
TemporaryTableMgr temporaryTableMgr = GlobalStateMgr.getCurrentState().getTemporaryTableMgr();
com.google.common.collect.Table<Long, String, Long> allTables = temporaryTableMgr.getTemporaryTables(sessionId);

for (Long databaseId : allTables.rowKeySet()) {
Database database = localMetastore.getDb(databaseId);
if (database == null) {
// database maybe dropped by force, we should clean temporary tables on it.
temporaryTableMgr.dropTemporaryTables(sessionId, databaseId);
continue;
}
Map<String, Long> tables = allTables.row(databaseId);
tables.forEach((tableName, tableId) -> {
try {
database.dropTemporaryTable(tableId, tableName, true, true);
temporaryTableMgr.dropTemporaryTable(sessionId, database.getId(), tableName);
} catch (DdlException e) {
LOG.error("Failed to drop temporary table {}.{} in session {}",
database.getFullName(), tableName, sessionId, e);
}
});
}
temporaryTableMgr.removeTemporaryTables(sessionId);
}

public Optional<Table> getTable(TableName tableName) {
return Optional.ofNullable(getTable(tableName.getCatalog(), tableName.getDb(), tableName.getTbl()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.server;

import com.starrocks.common.Config;
import com.starrocks.common.ThreadPoolManager;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.rpc.FrontendServiceProxy;
import com.starrocks.system.Frontend;
import com.starrocks.thrift.TListSessionsOptions;
import com.starrocks.thrift.TListSessionsRequest;
import com.starrocks.thrift.TListSessionsResponse;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TSessionInfo;
import com.starrocks.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;

// TemporaryTableCleaner is used to automatically clean up temporary tables.
// It is only executed on the FE leader.
// It periodically obtains the active session IDs related to all temporary tables on all FEs and
// automatically cleans up temporary tables that are no longer active.
public class TemporaryTableCleaner extends FrontendDaemon {
private static final Logger LOG = LogManager.getLogger(TemporaryTableCleaner.class);
private static final int TEMP_TABLE_CLEANER_THREAD_NUM = 1;
private static final int TEMP_TABLE_CLEANER_QUEUE_SIZE = 100000;

private final ExecutorService executor;

public TemporaryTableCleaner() {
this.executor = ThreadPoolManager.newDaemonFixedThreadPool(TEMP_TABLE_CLEANER_THREAD_NUM, TEMP_TABLE_CLEANER_QUEUE_SIZE,
"temp-table-cleaner-pool", false);
}

@Override
protected void runAfterCatalogReady() {
List<Frontend> frontends = GlobalStateMgr.getCurrentState().getNodeMgr().getFrontends(null);
List<UUID> aliveSessions = new ArrayList<>();

TListSessionsOptions options = new TListSessionsOptions();
options.setTemporary_table_only(true);
TListSessionsRequest request = new TListSessionsRequest();
request.setOptions(options);

for (Frontend frontend : frontends) {
try {
TNetworkAddress thriftAddress = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort());
TListSessionsResponse response = FrontendServiceProxy.call(
thriftAddress, Config.thrift_rpc_timeout_ms, Config.thrift_rpc_retry_times,
client -> client.listSessions(request));
if (response.getStatus() == null || response.getStatus().getStatus_code() != TStatusCode.OK) {
throw new Exception("response status is not ok: " +
(response.getStatus() == null ? "NULL" : response.getStatus().getStatus_code()));
}
if (response.getSessions() != null) {
List<TSessionInfo> sessions = response.getSessions();
for (TSessionInfo sessionInfo : sessions) {
UUID sessionId = UUID.fromString(sessionInfo.getSession_id());
LOG.debug("alive session {} on fe {}:{}",
sessionId, frontend.getHost(), frontend.getRpcPort());
aliveSessions.add(sessionId);
}
}
} catch (Throwable e) {
LOG.warn("listSessions return error from {}:{}, skip clean temporary tables",
frontend.getHost(), frontend.getRpcPort(), e);
return;
}
}

MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr();
TemporaryTableMgr temporaryTableMgr = GlobalStateMgr.getCurrentState().getTemporaryTableMgr();
Set<UUID> recordSessions = temporaryTableMgr.listSessions();
for (UUID sessionId : recordSessions) {
if (!aliveSessions.contains(sessionId)) {
LOG.info("cannot find alive session {}, should clean all temporary tables on it", sessionId);
metadataMgr.cleanTemporaryTables(sessionId);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public void removeTable(Long databaseId, String tableName) {
}
}

public void removeTables(Long databaseId) {
try (CloseableLock ignored = CloseableLock.lock(this.rwLock.writeLock())) {
temporaryTables.row(databaseId).clear();
}
}

public Table<Long, String, Long> getAllTables() {
try (CloseableLock ignored = CloseableLock.lock(this.rwLock.readLock())) {
return HashBasedTable.create(temporaryTables);
Expand Down Expand Up @@ -117,6 +123,15 @@ public void dropTemporaryTable(UUID sessionId, long databaseId, String tableName
sessionId.toString(), databaseId, tableName);
}

public void dropTemporaryTables(UUID sessionId, long databaseId) {
TemporaryTableTable tables = tablesMap.get(sessionId);
if (tables == null) {
return;
}
tables.removeTables(databaseId);
LOG.info("drop all temporary tables on database[{}], session[{}]", databaseId, sessionId.toString());
}

public Table<Long, String, Long> getTemporaryTables(UUID sessionId) {
TemporaryTableTable tables = tablesMap.get(sessionId);
if (tables == null) {
Expand Down Expand Up @@ -154,6 +169,10 @@ public Table<Long, UUID, Long> getAllTemporaryTables(Set<Long> requiredDatabaseI
return result;
}

public boolean sessionExists(UUID sessionId) {
return tablesMap.containsKey(sessionId);
}

public Set<UUID> listSessions() {
return tablesMap.keySet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import com.starrocks.scheduler.persist.TaskRunStatus;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.MetadataMgr;
import com.starrocks.server.TemporaryTableMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.sql.analyzer.AnalyzerUtils;
import com.starrocks.sql.analyzer.Authorizer;
Expand Down Expand Up @@ -237,6 +238,9 @@
import com.starrocks.thrift.TListPipesInfo;
import com.starrocks.thrift.TListPipesParams;
import com.starrocks.thrift.TListPipesResult;
import com.starrocks.thrift.TListSessionsOptions;
import com.starrocks.thrift.TListSessionsRequest;
import com.starrocks.thrift.TListSessionsResponse;
import com.starrocks.thrift.TListTableStatusResult;
import com.starrocks.thrift.TLoadInfo;
import com.starrocks.thrift.TLoadJobType;
Expand Down Expand Up @@ -273,6 +277,7 @@
import com.starrocks.thrift.TRequireSlotRequest;
import com.starrocks.thrift.TRequireSlotResponse;
import com.starrocks.thrift.TRoutineLoadJobInfo;
import com.starrocks.thrift.TSessionInfo;
import com.starrocks.thrift.TSetConfigRequest;
import com.starrocks.thrift.TSetConfigResponse;
import com.starrocks.thrift.TShowVariableRequest;
Expand Down Expand Up @@ -321,6 +326,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -2816,4 +2822,34 @@ public TReportLakeCompactionResponse reportLakeCompaction(TReportLakeCompactionR
}
return resp;
}

@Override
public TListSessionsResponse listSessions(TListSessionsRequest request) throws TException {
TListSessionsResponse response = new TListSessionsResponse();
if (!request.isSetOptions()) {
TStatus status = new TStatus(TStatusCode.INVALID_ARGUMENT);
status.addToError_msgs("options must be set");
response.setStatus(status);
return response;
}
TListSessionsOptions options = request.options;
if (options.isSetTemporary_table_only() && options.temporary_table_only) {
TemporaryTableMgr temporaryTableMgr = GlobalStateMgr.getCurrentState().getTemporaryTableMgr();
Set<UUID> sessions = ExecuteEnv.getInstance().getScheduler().listAllSessionsId();
sessions.retainAll(temporaryTableMgr.listSessions());
List<TSessionInfo> sessionInfos = new ArrayList<>();
for (UUID session : sessions) {
TSessionInfo sessionInfo = new TSessionInfo();
sessionInfo.setSession_id(session.toString());
sessionInfos.add(sessionInfo);
}
response.setStatus(new TStatus(TStatusCode.OK));
response.setSessions(sessionInfos);
} else {
TStatus status = new TStatus(NOT_IMPLEMENTED_ERROR);
status.addToError_msgs("only support temporary_table_only options now");
response.setStatus(status);
}
return response;
}
}
Loading

0 comments on commit 420d78f

Please sign in to comment.