From 205d484ab4eeb08afaf0c7c51ffe17815f6c8acc Mon Sep 17 00:00:00 2001 From: zdeng Date: Tue, 17 Dec 2024 16:39:25 +0800 Subject: [PATCH 1/4] HIVE-28669: Deadlock found when TxnStoreMutex trying to acquireLock --- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 15 +++-- .../hive/ql/txn/compactor/Initiator.java | 20 +++--- .../hive/metastore/MetastoreTaskThread.java | 8 +++ ...aterializationsRebuildLockCleanerTask.java | 16 ++--- .../hive/metastore/MetaStoreThread.java | 8 +++ .../hive/metastore/leader/CompactorTasks.java | 1 + .../metastore/leader/HouseKeepingTasks.java | 2 + .../hive/metastore/leader/LeaderElection.java | 9 +++ .../metastore/leader/LeaseLeaderElection.java | 8 ++- .../leader/StaticLeaderElection.java | 7 ++- .../metastore/leader/StatsUpdaterTask.java | 1 + .../txn/TransactionalRetryProxy.java | 2 +- .../hive/metastore/txn/TxnDummyMutex.java | 62 +++++++++++++++++++ .../hive/metastore/txn/TxnStoreMutex.java | 10 ++- .../txn/jdbc/TransactionContextManager.java | 17 +++-- .../functions/AbortCompactionFunction.java | 4 +- .../txn/service/AcidHouseKeeperService.java | 16 ++--- .../txn/service/AcidTxnCleanerService.java | 16 ++--- 18 files changed, 172 insertions(+), 50 deletions(-) create mode 100644 standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDummyMutex.java diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 3493ea4d8da6..4d5297da98d6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; +import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler; import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory; @@ -46,6 +47,7 @@ public class Cleaner extends MetaStoreCompactorThread { static final private String CLASS_NAME = Cleaner.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private boolean metricsEnabled = false; + private boolean shouldUseMutex = true; private ExecutorService cleanerExecutor; private List cleanupHandlers; @@ -70,14 +72,13 @@ public void run() { LOG.info("Starting Cleaner thread"); try { do { - TxnStore.MutexAPI.LockHandle handle = null; + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex(); metadataCache.invalidate(); long startedAt = -1; // Make sure nothing escapes this run method and kills the metastore at large, // so wrap it in a big catch Throwable statement. - try { - handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name()); + try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.Cleaner.name())) { startedAt = System.currentTimeMillis(); if (metricsEnabled) { @@ -120,9 +121,6 @@ public void run() { LOG.error("Caught an exception in the main loop of compactor cleaner, {}", StringUtils.stringifyException(t)); } finally { - if (handle != null) { - handle.releaseLocks(); - } if (metricsEnabled) { updateCycleDurationMetric(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, startedAt); } @@ -170,4 +168,9 @@ public void run() { updateCycleDurationMetric(metric, startedAt); } } + + @Override + public void shouldUseMutex(boolean enableMutex) { + this.shouldUseMutex = enableMutex; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 8a1bcb987335..5cefb78b1803 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; import org.apache.hadoop.hive.metastore.metrics.PerfLogger; +import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -56,6 +57,7 @@ public class Initiator extends MetaStoreCompactorThread { private ExecutorService compactionExecutor; private boolean metricsEnabled; + private boolean shouldUseMutex = true; @Override public void run() { @@ -70,6 +72,7 @@ public void run() { long abortedTimeThreshold = HiveConf .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, TimeUnit.MILLISECONDS); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex(); // Make sure we run through the loop once before checking to stop as this makes testing // much easier. The stop value is only for testing anyway and not used when called from @@ -78,13 +81,10 @@ public void run() { PerfLogger perfLogger = PerfLogger.getPerfLogger(false); long startedAt = -1; long prevStart; - TxnStore.MutexAPI.LockHandle handle = null; - boolean exceptionally = false; // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop // don't doom the entire thread. - try { - handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name()); + try (TxnStore.MutexAPI.LockHandle handle = mutex.acquireLock(TxnStore.MUTEX_KEY.Initiator.name())) { startedAt = System.currentTimeMillis(); prevStart = handle.getLastUpdateTime(); @@ -174,16 +174,13 @@ public void run() { // Check for timed out remote workers. recoverFailedCompactions(true); + handle.releaseLocks(startedAt); } catch (InterruptedException e) { // do not ignore interruption requests return; } catch (Throwable t) { LOG.error("Initiator loop caught unexpected exception this time through the loop", t); - exceptionally = true; } finally { - if (handle != null) { - if (!exceptionally) handle.releaseLocks(startedAt); else handle.releaseLocks(); - } if (metricsEnabled) { perfLogger.perfLogEnd(CLASS_NAME, MetricsConstants.COMPACTION_INITIATOR_CYCLE); updateCycleDurationMetric(MetricsConstants.COMPACTION_INITIATOR_CYCLE_DURATION, startedAt); @@ -215,8 +212,6 @@ private Database resolveDatabase(CompactionInfo ci) throws MetaException, NoSuch return CompactorUtil.resolveDatabase(conf, ci.dbname); } - - @VisibleForTesting protected String resolveUserToRunAs(Map cache, Table t, Partition p) throws IOException, InterruptedException { @@ -428,4 +423,9 @@ public void run() { } } } + + @Override + public void shouldUseMutex(boolean enableMutex) { + this.shouldUseMutex = enableMutex; + } } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java index d56bc2ac8be2..b9a59f793b9a 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java @@ -43,4 +43,12 @@ public interface MetastoreTaskThread extends Configurable, Runnable { default long initialDelay(TimeUnit unit) { return runFrequency(unit); } + + /** + * Should use mutex support to allow only one copy of this task running across the warehouse. + * @param enableMutex true for enabling the mutex, false otherwise + */ + default void shouldUseMutex(boolean enableMutex) { + // no-op + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java index 4c2d5e31b32e..b223317385d5 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; +import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.slf4j.Logger; @@ -37,6 +38,7 @@ public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThre private Configuration conf; private TxnStore txnHandler; + private boolean shouldUseMutex = true; @Override public long runFrequency(TimeUnit unit) { @@ -60,9 +62,8 @@ public void run() { LOG.debug("Cleaning up materialization rebuild locks"); } - TxnStore.MutexAPI.LockHandle handle = null; - try { - handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.MaterializationRebuild.name()); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex(); + try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.MaterializationRebuild.name())) { ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0); long removedCnt = txnHandler.cleanupMaterializationRebuildLocks(validTxnList, MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS)); @@ -73,10 +74,11 @@ public void run() { } } catch (Throwable t) { LOG.error("Unexpected error in thread: {}, message: {}", Thread.currentThread().getName(), t.getMessage(), t); - } finally { - if (handle != null) { - handle.releaseLocks(); - } } } + + @Override + public void shouldUseMutex(boolean enableMutex) { + this.shouldUseMutex = enableMutex; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java index dabd61c4b46e..208d941aaef2 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java @@ -42,4 +42,12 @@ public interface MetaStoreThread extends Configurable { * been called. */ void start(); + + /** + * Should use mutex support to allow only one copy of this task running across the warehouse. + * @param enableMutex true for enabling the mutex, false otherwise + */ + default void shouldUseMutex(boolean enableMutex) { + // no-op + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java index 684862762fe6..cab5ae4eff1c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java @@ -138,6 +138,7 @@ public void takeLeadership(LeaderElection election) throws Exception { AtomicBoolean flag = new AtomicBoolean(); thread.setConf(configuration); thread.init(flag); + thread.shouldUseMutex(election.hasMultipleLeaders()); metastoreThreadsMap.put(thread, flag); HiveMetaStore.LOG.info("Starting metastore thread of type " + thread.getClass().getName()); thread.start(); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java index 3a4414fd0045..bc9a21f804ed 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java @@ -99,6 +99,7 @@ public void takeLeadership(LeaderElection election) throws Exception { List alwaysTasks = new ArrayList<>(getAlwaysTasks()); for (MetastoreTaskThread task : alwaysTasks) { task.setConf(configuration); + task.shouldUseMutex(election.hasMultipleLeaders()); long freq = task.runFrequency(TimeUnit.MILLISECONDS); // For backwards compatibility, since some threads used to be hard coded but only run if // frequency was > 0 @@ -111,6 +112,7 @@ public void takeLeadership(LeaderElection election) throws Exception { List remoteOnlyTasks = new ArrayList<>(getRemoteOnlyTasks()); for (MetastoreTaskThread task : remoteOnlyTasks) { task.setConf(configuration); + task.shouldUseMutex(election.hasMultipleLeaders()); long freq = task.runFrequency(TimeUnit.MILLISECONDS); runningTasks.add(task); metastoreTaskThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java index b6f10e5936b7..8bf5858b0a52 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java @@ -31,6 +31,11 @@ * @param the type of mutex */ public interface LeaderElection extends Closeable { + // We might have different versions of HMS, or even the same version but with different + // leader election methods running inside the warehouse, so it's hard to know how many HMS instances + // that elected as the leader. Relying on this property to tell us, default is true, means it has multiple + // HMS instances acting as the leader. + public static final String HAVE_MULTIPLE_LEADERS = "hive.metastore.have.multiple.leaders"; /** * Place where election happens @@ -65,6 +70,10 @@ public void tryBeLeader(Configuration conf, T mutex) */ public String getName(); + default boolean hasMultipleLeaders() { + return true; + } + public interface LeadershipStateListener { /** diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java index d6ad76dcce9b..0508ad338054 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java @@ -95,6 +95,7 @@ public class LeaseLeaderElection implements LeaderElection { private String name; private String userName; private String hostName; + private boolean multipleLeaders; public LeaseLeaderElection() throws IOException { userName = SecurityUtils.getUser(); @@ -152,7 +153,7 @@ private void notifyListener() { public void tryBeLeader(Configuration conf, TableName table) throws LeaderException { requireNonNull(conf, "conf is null"); requireNonNull(table, "table is null"); - + this.multipleLeaders = conf.getBoolean(HAVE_MULTIPLE_LEADERS, true); if (store == null) { store = TxnUtils.getTxnStore(conf); } @@ -471,4 +472,9 @@ public void setName(String name) { public String getName() { return name; } + + @Override + public boolean hasMultipleLeaders() { + return this.multipleLeaders; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java index 8a1752bd9a3e..1daa4f3e6fb1 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java @@ -39,13 +39,14 @@ public class StaticLeaderElection implements LeaderElection { private volatile boolean isLeader; private String name; private List listeners = new ArrayList<>(); + private boolean multipleLeaders; @Override public void tryBeLeader(Configuration conf, String hostName) throws LeaderException { String leaderHost = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME); - + this.multipleLeaders = conf.getBoolean(HAVE_MULTIPLE_LEADERS, true); // For the sake of backward compatibility, when the current HMS becomes the leader when no // leader is specified. if (leaderHost == null || leaderHost.isEmpty()) { @@ -103,4 +104,8 @@ public void close() { } } + @Override + public boolean hasMultipleLeaders() { + return this.multipleLeaders; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java index 9c3b754d1abb..d2fd56f2b5b6 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java @@ -76,6 +76,7 @@ public void takeLeadership(LeaderElection election) throws Exception { thread.setConf(configuration); stop = new AtomicBoolean(false); thread.init(stop); + thread.shouldUseMutex(election.hasMultipleLeaders()); HiveMetaStore.LOG.info("Starting metastore thread of type " + thread.getClass().getName()); thread.start(); } catch (Exception e) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TransactionalRetryProxy.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TransactionalRetryProxy.java index 0b85b450f3c8..6c854ffc8f5c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TransactionalRetryProxy.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TransactionalRetryProxy.java @@ -110,7 +110,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl TransactionContext context = null; try { jdbcResource.bindDataSource(transactional); - context = jdbcResource.getTransactionManager().getNewTransaction(transactional.propagation().value()); + context = jdbcResource.getTransactionManager().getNewTransaction(transactional); Object result = toCall.execute(); LOG.debug("Successfull method invocation within transactional context: {}, going to commit.", callerId); if (context.isRollbackOnly()) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDummyMutex.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDummyMutex.java new file mode 100644 index 000000000000..2637d53da766 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDummyMutex.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.hive.metastore.api.MetaException; + +/** + * An empty implementation of TxnStore.MutexAPI + */ +public class TxnDummyMutex implements TxnStore.MutexAPI { + + @Override + public LockHandle acquireLock(String key) throws MetaException { + return new DummyHandle(); + } + + @Override + public void acquireLock(String key, LockHandle handle) throws MetaException { + // no-op + } + + private static class DummyHandle implements LockHandle { + + private long lastUpdateTime = 0L; + + @Override + public void releaseLocks() { + // no-op + } + + @Override + public Long getLastUpdateTime() { + return lastUpdateTime; + } + + @Override + public void releaseLocks(Long timestamp) { + this.lastUpdateTime = timestamp; + } + + @Override + public void close() { + // no-op + } + } + +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java index 1013493a7913..cf88f70554b7 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java @@ -35,7 +35,6 @@ import java.util.concurrent.Semaphore; import static org.apache.hadoop.hive.metastore.txn.TxnStore.POOL_MUTEX; -import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRED; public class TxnStoreMutex implements TxnStore.MutexAPI { @@ -68,7 +67,7 @@ public LockHandle acquireLock(String key) throws MetaException { TransactionContext context = null; try { jdbcResource.bindDataSource(POOL_MUTEX); - context = jdbcResource.getTransactionManager().getNewTransaction(PROPAGATION_REQUIRED); + context = jdbcResource.getTransactionManager().getNewTransaction(null); MapSqlParameterSource paramSource = new MapSqlParameterSource().addValue("key", key); String sqlStmt = sqlGenerator.addForUpdateClause("SELECT \"MT_COMMENT\", \"MT_KEY2\" FROM \"AUX_TABLE\" WHERE \"MT_KEY1\" = :key"); @@ -142,6 +141,7 @@ public static final class LockHandleImpl implements LockHandle { private final Semaphore derbySemaphore; private final String key; private final Long lastUpdateTime; + private boolean released = false; public LockHandleImpl(MultiDataSourceJdbcResource jdbcResource, TransactionContext context, String key, Long lastUpdateTime, Semaphore derbySemaphore) { @@ -166,6 +166,7 @@ public void releaseLocks() { LOG.debug("{} unlocked by {}", key, HOSTNAME); } } finally { + released = true; jdbcResource.unbindDataSource(); } } @@ -196,13 +197,16 @@ public void releaseLocks(Long timestamp) { LOG.debug("{} unlocked by {}", key, HOSTNAME); } } finally { + released = true; jdbcResource.unbindDataSource(); } } @Override public void close() { - releaseLocks(); + if (!released) { + releaseLocks(); + } } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java index 5be42248fe73..666e13cbdf21 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java @@ -22,6 +22,9 @@ import org.slf4j.LoggerFactory; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.annotation.Isolation; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.DefaultTransactionDefinition; /** @@ -54,11 +57,17 @@ public class TransactionContextManager { * The created transaction is wrapped into a {@link TransactionContext} which is {@link AutoCloseable} and allows using * the wrapper inside a try-with-resources block. * - * @param propagation The transaction propagation to use. + * @param transactional the transactional definition. */ - public TransactionContext getNewTransaction(int propagation) { - TransactionContext context = new TransactionContext(realTransactionManager.getTransaction( - new DefaultTransactionDefinition(propagation)), this); + public TransactionContext getNewTransaction(Transactional transactional) { + Propagation propagation = transactional == null ? Propagation.REQUIRED : transactional.propagation(); + DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition(propagation.value()); + // Default isolation level is READ_COMMITTED + transactionDefinition.setIsolationLevel(Isolation.READ_COMMITTED.value()); + if (transactional != null && transactional.isolation() != Isolation.DEFAULT) { + transactionDefinition.setIsolationLevel(transactional.isolation().value()); + } + TransactionContext context = new TransactionContext(realTransactionManager.getTransaction(transactionDefinition), this); contexts.set(context); return context; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortCompactionFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortCompactionFunction.java index bd4fa91961db..6c398676152e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortCompactionFunction.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortCompactionFunction.java @@ -43,8 +43,6 @@ import java.util.List; import java.util.Map; -import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRED; - public class AbortCompactionFunction implements TransactionalFunction { private static final Logger LOG = LoggerFactory.getLogger(AbortCompactionFunction.class); @@ -152,7 +150,7 @@ public CompactionAborter withCompactionInfo(CompactionInfo compactionInfo) { @Override public AbortCompactionResponseElement execute() { - try (TransactionContext context = jdbcResource.getTransactionManager().getNewTransaction(PROPAGATION_REQUIRED)) { + try (TransactionContext context = jdbcResource.getTransactionManager().getNewTransaction(null)) { compactionInfo.state = TxnStore.ABORTED_STATE; compactionInfo.errorMessage = "Compaction Aborted by Abort Comapction request."; int updCount; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java index 86799e90621d..02921a99c620 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.slf4j.Logger; @@ -46,6 +47,7 @@ public class AcidHouseKeeperService implements MetastoreTaskThread { protected TxnStore txnHandler; protected String serviceName; protected Map, String> tasks; + private boolean shouldUseMutex = true; public AcidHouseKeeperService() { serviceName = this.getClass().getSimpleName(); @@ -78,19 +80,14 @@ public long runFrequency(TimeUnit unit) { @Override public void run() { - TxnStore.MutexAPI.LockHandle handle = null; - try { - handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name()); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex(); + try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name())) { LOG.info("Starting to run {}", serviceName); long start = System.currentTimeMillis(); cleanTheHouse(); LOG.debug("Total time {} took: {} seconds.", serviceName, elapsedSince(start)); } catch (Exception e) { LOG.error("Unexpected exception in thread: {}, message: {}", Thread.currentThread().getName(), e.getMessage(), e); - } finally { - if (handle != null) { - handle.releaseLocks(); - } } } @@ -107,4 +104,9 @@ private void performTask(FailableRunnable task, String descriptio private long elapsedSince(long start) { return (System.currentTimeMillis() - start) / 1000; } + + @Override + public void shouldUseMutex(boolean enableMutex) { + this.shouldUseMutex = enableMutex; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java index 06f284faee08..0398fa13b611 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.slf4j.Logger; @@ -37,6 +38,7 @@ public class AcidTxnCleanerService implements MetastoreTaskThread { private Configuration conf; private TxnStore txnHandler; + private boolean shouldUseMutex = true; @Override public void setConf(Configuration configuration) { @@ -56,22 +58,22 @@ public long runFrequency(TimeUnit unit) { @Override public void run() { - TxnStore.MutexAPI.LockHandle handle = null; - try { - handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.TxnCleaner.name()); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex(); + try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.TxnCleaner.name())) { long start = System.currentTimeMillis(); txnHandler.cleanEmptyAbortedAndCommittedTxns(); LOG.debug("Txn cleaner service took: {} seconds.", elapsedSince(start)); } catch (Exception e) { LOG.error("Unexpected exception in thread: {}, message: {}", Thread.currentThread().getName(), e.getMessage(), e); - } finally { - if (handle != null) { - handle.releaseLocks(); - } } } private long elapsedSince(long start) { return (System.currentTimeMillis() - start) / 1000; } + + @Override + public void shouldUseMutex(boolean enableMutex) { + this.shouldUseMutex = enableMutex; + } } From d4fedb15600b22e957f8249a05c60954aac4506d Mon Sep 17 00:00:00 2001 From: zdeng Date: Tue, 7 Jan 2025 22:50:45 +0800 Subject: [PATCH 2/4] review-1 --- .../org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java | 6 +++--- .../apache/hadoop/hive/ql/txn/compactor/Initiator.java | 6 +++--- .../apache/hadoop/hive/metastore/MetastoreTaskThread.java | 4 ++-- .../metastore/MaterializationsRebuildLockCleanerTask.java | 6 +++--- .../org/apache/hadoop/hive/metastore/MetaStoreThread.java | 2 +- .../hadoop/hive/metastore/leader/CompactorTasks.java | 2 +- .../hadoop/hive/metastore/leader/HouseKeepingTasks.java | 4 ++-- .../hadoop/hive/metastore/leader/LeaderElection.java | 4 ++-- .../hadoop/hive/metastore/leader/LeaseLeaderElection.java | 6 +++--- .../hive/metastore/leader/StaticLeaderElection.java | 8 ++++---- .../hadoop/hive/metastore/leader/StatsUpdaterTask.java | 2 +- .../metastore/txn/{TxnDummyMutex.java => NoMutex.java} | 2 +- .../metastore/txn/service/AcidHouseKeeperService.java | 6 +++--- .../hive/metastore/txn/service/AcidTxnCleanerService.java | 6 +++--- 14 files changed, 32 insertions(+), 32 deletions(-) rename standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/{TxnDummyMutex.java => NoMutex.java} (96%) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 4d5297da98d6..6126f150e3ae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -21,7 +21,7 @@ import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; -import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex; +import org.apache.hadoop.hive.metastore.txn.NoMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler; import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory; @@ -72,7 +72,7 @@ public void run() { LOG.info("Starting Cleaner thread"); try { do { - TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex(); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex(); metadataCache.invalidate(); long startedAt = -1; @@ -170,7 +170,7 @@ public void run() { } @Override - public void shouldUseMutex(boolean enableMutex) { + public void enforceMutex(boolean enableMutex) { this.shouldUseMutex = enableMutex; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 5cefb78b1803..f166d677e40f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; import org.apache.hadoop.hive.metastore.metrics.PerfLogger; -import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex; +import org.apache.hadoop.hive.metastore.txn.NoMutex; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -72,7 +72,7 @@ public void run() { long abortedTimeThreshold = HiveConf .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, TimeUnit.MILLISECONDS); - TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex(); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex(); // Make sure we run through the loop once before checking to stop as this makes testing // much easier. The stop value is only for testing anyway and not used when called from @@ -425,7 +425,7 @@ public void run() { } @Override - public void shouldUseMutex(boolean enableMutex) { + public void enforceMutex(boolean enableMutex) { this.shouldUseMutex = enableMutex; } } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java index b9a59f793b9a..82beb909773c 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java @@ -46,9 +46,9 @@ default long initialDelay(TimeUnit unit) { /** * Should use mutex support to allow only one copy of this task running across the warehouse. - * @param enableMutex true for enabling the mutex, false otherwise + * @param useMutex true for enabling the mutex, false otherwise */ - default void shouldUseMutex(boolean enableMutex) { + default void enforceMutex(boolean useMutex) { // no-op } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java index b223317385d5..10f9721be21a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java @@ -21,7 +21,7 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; -import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex; +import org.apache.hadoop.hive.metastore.txn.NoMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.slf4j.Logger; @@ -62,7 +62,7 @@ public void run() { LOG.debug("Cleaning up materialization rebuild locks"); } - TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex(); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex(); try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.MaterializationRebuild.name())) { ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0); long removedCnt = txnHandler.cleanupMaterializationRebuildLocks(validTxnList, @@ -78,7 +78,7 @@ public void run() { } @Override - public void shouldUseMutex(boolean enableMutex) { + public void enforceMutex(boolean enableMutex) { this.shouldUseMutex = enableMutex; } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java index 208d941aaef2..a5c98942a5e7 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java @@ -47,7 +47,7 @@ public interface MetaStoreThread extends Configurable { * Should use mutex support to allow only one copy of this task running across the warehouse. * @param enableMutex true for enabling the mutex, false otherwise */ - default void shouldUseMutex(boolean enableMutex) { + default void enforceMutex(boolean enableMutex) { // no-op } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java index cab5ae4eff1c..8f86e5fbc0e6 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java @@ -138,7 +138,7 @@ public void takeLeadership(LeaderElection election) throws Exception { AtomicBoolean flag = new AtomicBoolean(); thread.setConf(configuration); thread.init(flag); - thread.shouldUseMutex(election.hasMultipleLeaders()); + thread.enforceMutex(election.enforceMutex()); metastoreThreadsMap.put(thread, flag); HiveMetaStore.LOG.info("Starting metastore thread of type " + thread.getClass().getName()); thread.start(); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java index bc9a21f804ed..f09be0966ce8 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java @@ -99,7 +99,7 @@ public void takeLeadership(LeaderElection election) throws Exception { List alwaysTasks = new ArrayList<>(getAlwaysTasks()); for (MetastoreTaskThread task : alwaysTasks) { task.setConf(configuration); - task.shouldUseMutex(election.hasMultipleLeaders()); + task.enforceMutex(election.enforceMutex()); long freq = task.runFrequency(TimeUnit.MILLISECONDS); // For backwards compatibility, since some threads used to be hard coded but only run if // frequency was > 0 @@ -112,7 +112,7 @@ public void takeLeadership(LeaderElection election) throws Exception { List remoteOnlyTasks = new ArrayList<>(getRemoteOnlyTasks()); for (MetastoreTaskThread task : remoteOnlyTasks) { task.setConf(configuration); - task.shouldUseMutex(election.hasMultipleLeaders()); + task.enforceMutex(election.enforceMutex()); long freq = task.runFrequency(TimeUnit.MILLISECONDS); runningTasks.add(task); metastoreTaskThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java index 8bf5858b0a52..5a6ab5d77bba 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java @@ -35,7 +35,7 @@ public interface LeaderElection extends Closeable { // leader election methods running inside the warehouse, so it's hard to know how many HMS instances // that elected as the leader. Relying on this property to tell us, default is true, means it has multiple // HMS instances acting as the leader. - public static final String HAVE_MULTIPLE_LEADERS = "hive.metastore.have.multiple.leaders"; + static final String HIVE_TXN_ENFORCE_AUX_MUTEX = "hive.metastore.enforce.aux.mutex"; /** * Place where election happens @@ -70,7 +70,7 @@ public void tryBeLeader(Configuration conf, T mutex) */ public String getName(); - default boolean hasMultipleLeaders() { + default boolean enforceMutex() { return true; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java index 0508ad338054..0eac756a9ddb 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java @@ -95,7 +95,7 @@ public class LeaseLeaderElection implements LeaderElection { private String name; private String userName; private String hostName; - private boolean multipleLeaders; + private boolean enforceMutex; public LeaseLeaderElection() throws IOException { userName = SecurityUtils.getUser(); @@ -153,7 +153,7 @@ private void notifyListener() { public void tryBeLeader(Configuration conf, TableName table) throws LeaderException { requireNonNull(conf, "conf is null"); requireNonNull(table, "table is null"); - this.multipleLeaders = conf.getBoolean(HAVE_MULTIPLE_LEADERS, true); + this.enforceMutex = conf.getBoolean(HIVE_TXN_ENFORCE_AUX_MUTEX, true); if (store == null) { store = TxnUtils.getTxnStore(conf); } @@ -474,7 +474,7 @@ public String getName() { } @Override - public boolean hasMultipleLeaders() { + public boolean enforceMutex() { return this.multipleLeaders; } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java index 1daa4f3e6fb1..45917feab7e0 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java @@ -39,14 +39,14 @@ public class StaticLeaderElection implements LeaderElection { private volatile boolean isLeader; private String name; private List listeners = new ArrayList<>(); - private boolean multipleLeaders; + private boolean enforceMutex; @Override public void tryBeLeader(Configuration conf, String hostName) throws LeaderException { String leaderHost = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME); - this.multipleLeaders = conf.getBoolean(HAVE_MULTIPLE_LEADERS, true); + this.enforceMutex = conf.getBoolean(HIVE_TXN_ENFORCE_AUX_MUTEX, true); // For the sake of backward compatibility, when the current HMS becomes the leader when no // leader is specified. if (leaderHost == null || leaderHost.isEmpty()) { @@ -105,7 +105,7 @@ public void close() { } @Override - public boolean hasMultipleLeaders() { - return this.multipleLeaders; + public boolean enforceMutex() { + return this.enforceMutex; } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java index d2fd56f2b5b6..82a49188ba63 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java @@ -76,7 +76,7 @@ public void takeLeadership(LeaderElection election) throws Exception { thread.setConf(configuration); stop = new AtomicBoolean(false); thread.init(stop); - thread.shouldUseMutex(election.hasMultipleLeaders()); + thread.enforceMutex(election.enforceMutex()); HiveMetaStore.LOG.info("Starting metastore thread of type " + thread.getClass().getName()); thread.start(); } catch (Exception e) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDummyMutex.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/NoMutex.java similarity index 96% rename from standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDummyMutex.java rename to standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/NoMutex.java index 2637d53da766..4ce65aac0609 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDummyMutex.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/NoMutex.java @@ -22,7 +22,7 @@ /** * An empty implementation of TxnStore.MutexAPI */ -public class TxnDummyMutex implements TxnStore.MutexAPI { +public class NoMutex implements TxnStore.MutexAPI { @Override public LockHandle acquireLock(String key) throws MetaException { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java index 02921a99c620..836b85851e76 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex; +import org.apache.hadoop.hive.metastore.txn.NoMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.slf4j.Logger; @@ -80,7 +80,7 @@ public long runFrequency(TimeUnit unit) { @Override public void run() { - TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex(); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex(); try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name())) { LOG.info("Starting to run {}", serviceName); long start = System.currentTimeMillis(); @@ -106,7 +106,7 @@ private long elapsedSince(long start) { } @Override - public void shouldUseMutex(boolean enableMutex) { + public void enforceMutex(boolean enableMutex) { this.shouldUseMutex = enableMutex; } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java index 0398fa13b611..766ef7b67d8e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java @@ -20,7 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex; +import org.apache.hadoop.hive.metastore.txn.NoMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.slf4j.Logger; @@ -58,7 +58,7 @@ public long runFrequency(TimeUnit unit) { @Override public void run() { - TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex(); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex(); try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.TxnCleaner.name())) { long start = System.currentTimeMillis(); txnHandler.cleanEmptyAbortedAndCommittedTxns(); @@ -73,7 +73,7 @@ private long elapsedSince(long start) { } @Override - public void shouldUseMutex(boolean enableMutex) { + public void enforceMutex(boolean enableMutex) { this.shouldUseMutex = enableMutex; } } From 95e10038965f95fad9ca7f9b26b8d8fda47f80d0 Mon Sep 17 00:00:00 2001 From: zdeng Date: Tue, 7 Jan 2025 23:48:36 +0800 Subject: [PATCH 3/4] review-2 --- .../txn/TransactionalRetryProxy.java | 2 +- .../hive/metastore/txn/TxnStoreMutex.java | 2 +- .../txn/jdbc/TransactionContextManager.java | 25 +++++++++++-------- .../functions/AbortCompactionFunction.java | 2 +- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TransactionalRetryProxy.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TransactionalRetryProxy.java index 6c854ffc8f5c..0b85b450f3c8 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TransactionalRetryProxy.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TransactionalRetryProxy.java @@ -110,7 +110,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl TransactionContext context = null; try { jdbcResource.bindDataSource(transactional); - context = jdbcResource.getTransactionManager().getNewTransaction(transactional); + context = jdbcResource.getTransactionManager().getNewTransaction(transactional.propagation().value()); Object result = toCall.execute(); LOG.debug("Successfull method invocation within transactional context: {}, going to commit.", callerId); if (context.isRollbackOnly()) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java index cf88f70554b7..4499902ae209 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java @@ -67,7 +67,7 @@ public LockHandle acquireLock(String key) throws MetaException { TransactionContext context = null; try { jdbcResource.bindDataSource(POOL_MUTEX); - context = jdbcResource.getTransactionManager().getNewTransaction(null); + context = jdbcResource.getTransactionManager().getNewTransaction(); MapSqlParameterSource paramSource = new MapSqlParameterSource().addValue("key", key); String sqlStmt = sqlGenerator.addForUpdateClause("SELECT \"MT_COMMENT\", \"MT_KEY2\" FROM \"AUX_TABLE\" WHERE \"MT_KEY1\" = :key"); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java index 666e13cbdf21..40f7e6e5fa32 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java @@ -24,7 +24,6 @@ import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.DefaultTransactionDefinition; /** @@ -57,21 +56,25 @@ public class TransactionContextManager { * The created transaction is wrapped into a {@link TransactionContext} which is {@link AutoCloseable} and allows using * the wrapper inside a try-with-resources block. * - * @param transactional the transactional definition. + * @param propagation The transaction propagation to use. */ - public TransactionContext getNewTransaction(Transactional transactional) { - Propagation propagation = transactional == null ? Propagation.REQUIRED : transactional.propagation(); - DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition(propagation.value()); - // Default isolation level is READ_COMMITTED + public TransactionContext getNewTransaction(int propagation) { + DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition(propagation); + // The TxnStore default isolation level is READ_COMMITTED transactionDefinition.setIsolationLevel(Isolation.READ_COMMITTED.value()); - if (transactional != null && transactional.isolation() != Isolation.DEFAULT) { - transactionDefinition.setIsolationLevel(transactional.isolation().value()); - } - TransactionContext context = new TransactionContext(realTransactionManager.getTransaction(transactionDefinition), this); + TransactionContext context = new TransactionContext( + realTransactionManager.getTransaction(transactionDefinition), this); contexts.set(context); return context; } - + + /** + * Return a new or an existing transaction with isolation level is READ_COMMITTED + */ + public TransactionContext getNewTransaction() { + return getNewTransaction(Propagation.REQUIRED.value()); + } + public TransactionContext getActiveTransaction() { return contexts.get(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortCompactionFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortCompactionFunction.java index 6c398676152e..d856a93a3411 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortCompactionFunction.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortCompactionFunction.java @@ -150,7 +150,7 @@ public CompactionAborter withCompactionInfo(CompactionInfo compactionInfo) { @Override public AbortCompactionResponseElement execute() { - try (TransactionContext context = jdbcResource.getTransactionManager().getNewTransaction(null)) { + try (TransactionContext context = jdbcResource.getTransactionManager().getNewTransaction()) { compactionInfo.state = TxnStore.ABORTED_STATE; compactionInfo.errorMessage = "Compaction Aborted by Abort Comapction request."; int updCount; From de9eac9b0c36f45d8c439cbaa12d99962f28930f Mon Sep 17 00:00:00 2001 From: zdeng Date: Wed, 8 Jan 2025 09:10:15 +0800 Subject: [PATCH 4/4] review-3 --- .../hadoop/hive/metastore/leader/LeaseLeaderElection.java | 2 +- .../apache/hadoop/hive/metastore/txn/TxnStoreMutex.java | 3 ++- .../metastore/txn/jdbc/TransactionContextManager.java | 8 -------- .../txn/jdbc/functions/AbortCompactionFunction.java | 4 +++- 4 files changed, 6 insertions(+), 11 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java index 0eac756a9ddb..fc4d4078df24 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java @@ -475,6 +475,6 @@ public String getName() { @Override public boolean enforceMutex() { - return this.multipleLeaders; + return this.enforceMutex; } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java index 4499902ae209..3dad12fc1f56 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java @@ -35,6 +35,7 @@ import java.util.concurrent.Semaphore; import static org.apache.hadoop.hive.metastore.txn.TxnStore.POOL_MUTEX; +import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRED; public class TxnStoreMutex implements TxnStore.MutexAPI { @@ -67,7 +68,7 @@ public LockHandle acquireLock(String key) throws MetaException { TransactionContext context = null; try { jdbcResource.bindDataSource(POOL_MUTEX); - context = jdbcResource.getTransactionManager().getNewTransaction(); + context = jdbcResource.getTransactionManager().getNewTransaction(PROPAGATION_REQUIRED); MapSqlParameterSource paramSource = new MapSqlParameterSource().addValue("key", key); String sqlStmt = sqlGenerator.addForUpdateClause("SELECT \"MT_COMMENT\", \"MT_KEY2\" FROM \"AUX_TABLE\" WHERE \"MT_KEY1\" = :key"); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java index 40f7e6e5fa32..40daf6f1d39f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java @@ -23,7 +23,6 @@ import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Isolation; -import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.support.DefaultTransactionDefinition; /** @@ -68,13 +67,6 @@ public TransactionContext getNewTransaction(int propagation) { return context; } - /** - * Return a new or an existing transaction with isolation level is READ_COMMITTED - */ - public TransactionContext getNewTransaction() { - return getNewTransaction(Propagation.REQUIRED.value()); - } - public TransactionContext getActiveTransaction() { return contexts.get(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortCompactionFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortCompactionFunction.java index d856a93a3411..bd4fa91961db 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortCompactionFunction.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/AbortCompactionFunction.java @@ -43,6 +43,8 @@ import java.util.List; import java.util.Map; +import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRED; + public class AbortCompactionFunction implements TransactionalFunction { private static final Logger LOG = LoggerFactory.getLogger(AbortCompactionFunction.class); @@ -150,7 +152,7 @@ public CompactionAborter withCompactionInfo(CompactionInfo compactionInfo) { @Override public AbortCompactionResponseElement execute() { - try (TransactionContext context = jdbcResource.getTransactionManager().getNewTransaction()) { + try (TransactionContext context = jdbcResource.getTransactionManager().getNewTransaction(PROPAGATION_REQUIRED)) { compactionInfo.state = TxnStore.ABORTED_STATE; compactionInfo.errorMessage = "Compaction Aborted by Abort Comapction request."; int updCount;