diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 3493ea4d8da6..6126f150e3ae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; +import org.apache.hadoop.hive.metastore.txn.NoMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler; import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory; @@ -46,6 +47,7 @@ public class Cleaner extends MetaStoreCompactorThread { static final private String CLASS_NAME = Cleaner.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private boolean metricsEnabled = false; + private boolean shouldUseMutex = true; private ExecutorService cleanerExecutor; private List cleanupHandlers; @@ -70,14 +72,13 @@ public void run() { LOG.info("Starting Cleaner thread"); try { do { - TxnStore.MutexAPI.LockHandle handle = null; + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex(); metadataCache.invalidate(); long startedAt = -1; // Make sure nothing escapes this run method and kills the metastore at large, // so wrap it in a big catch Throwable statement. - try { - handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name()); + try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.Cleaner.name())) { startedAt = System.currentTimeMillis(); if (metricsEnabled) { @@ -120,9 +121,6 @@ public void run() { LOG.error("Caught an exception in the main loop of compactor cleaner, {}", StringUtils.stringifyException(t)); } finally { - if (handle != null) { - handle.releaseLocks(); - } if (metricsEnabled) { updateCycleDurationMetric(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, startedAt); } @@ -170,4 +168,9 @@ public void run() { updateCycleDurationMetric(metric, startedAt); } } + + @Override + public void enforceMutex(boolean enableMutex) { + this.shouldUseMutex = enableMutex; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 8a1bcb987335..f166d677e40f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; import org.apache.hadoop.hive.metastore.metrics.PerfLogger; +import org.apache.hadoop.hive.metastore.txn.NoMutex; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -56,6 +57,7 @@ public class Initiator extends MetaStoreCompactorThread { private ExecutorService compactionExecutor; private boolean metricsEnabled; + private boolean shouldUseMutex = true; @Override public void run() { @@ -70,6 +72,7 @@ public void run() { long abortedTimeThreshold = HiveConf .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, TimeUnit.MILLISECONDS); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex(); // Make sure we run through the loop once before checking to stop as this makes testing // much easier. The stop value is only for testing anyway and not used when called from @@ -78,13 +81,10 @@ public void run() { PerfLogger perfLogger = PerfLogger.getPerfLogger(false); long startedAt = -1; long prevStart; - TxnStore.MutexAPI.LockHandle handle = null; - boolean exceptionally = false; // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop // don't doom the entire thread. - try { - handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name()); + try (TxnStore.MutexAPI.LockHandle handle = mutex.acquireLock(TxnStore.MUTEX_KEY.Initiator.name())) { startedAt = System.currentTimeMillis(); prevStart = handle.getLastUpdateTime(); @@ -174,16 +174,13 @@ public void run() { // Check for timed out remote workers. recoverFailedCompactions(true); + handle.releaseLocks(startedAt); } catch (InterruptedException e) { // do not ignore interruption requests return; } catch (Throwable t) { LOG.error("Initiator loop caught unexpected exception this time through the loop", t); - exceptionally = true; } finally { - if (handle != null) { - if (!exceptionally) handle.releaseLocks(startedAt); else handle.releaseLocks(); - } if (metricsEnabled) { perfLogger.perfLogEnd(CLASS_NAME, MetricsConstants.COMPACTION_INITIATOR_CYCLE); updateCycleDurationMetric(MetricsConstants.COMPACTION_INITIATOR_CYCLE_DURATION, startedAt); @@ -215,8 +212,6 @@ private Database resolveDatabase(CompactionInfo ci) throws MetaException, NoSuch return CompactorUtil.resolveDatabase(conf, ci.dbname); } - - @VisibleForTesting protected String resolveUserToRunAs(Map cache, Table t, Partition p) throws IOException, InterruptedException { @@ -428,4 +423,9 @@ public void run() { } } } + + @Override + public void enforceMutex(boolean enableMutex) { + this.shouldUseMutex = enableMutex; + } } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java index d56bc2ac8be2..82beb909773c 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java @@ -43,4 +43,12 @@ public interface MetastoreTaskThread extends Configurable, Runnable { default long initialDelay(TimeUnit unit) { return runFrequency(unit); } + + /** + * Should use mutex support to allow only one copy of this task running across the warehouse. + * @param useMutex true for enabling the mutex, false otherwise + */ + default void enforceMutex(boolean useMutex) { + // no-op + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java index 4c2d5e31b32e..10f9721be21a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils; +import org.apache.hadoop.hive.metastore.txn.NoMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.slf4j.Logger; @@ -37,6 +38,7 @@ public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThre private Configuration conf; private TxnStore txnHandler; + private boolean shouldUseMutex = true; @Override public long runFrequency(TimeUnit unit) { @@ -60,9 +62,8 @@ public void run() { LOG.debug("Cleaning up materialization rebuild locks"); } - TxnStore.MutexAPI.LockHandle handle = null; - try { - handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.MaterializationRebuild.name()); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex(); + try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.MaterializationRebuild.name())) { ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0); long removedCnt = txnHandler.cleanupMaterializationRebuildLocks(validTxnList, MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS)); @@ -73,10 +74,11 @@ public void run() { } } catch (Throwable t) { LOG.error("Unexpected error in thread: {}, message: {}", Thread.currentThread().getName(), t.getMessage(), t); - } finally { - if (handle != null) { - handle.releaseLocks(); - } } } + + @Override + public void enforceMutex(boolean enableMutex) { + this.shouldUseMutex = enableMutex; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java index dabd61c4b46e..a5c98942a5e7 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java @@ -42,4 +42,12 @@ public interface MetaStoreThread extends Configurable { * been called. */ void start(); + + /** + * Should use mutex support to allow only one copy of this task running across the warehouse. + * @param enableMutex true for enabling the mutex, false otherwise + */ + default void enforceMutex(boolean enableMutex) { + // no-op + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java index 684862762fe6..8f86e5fbc0e6 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java @@ -138,6 +138,7 @@ public void takeLeadership(LeaderElection election) throws Exception { AtomicBoolean flag = new AtomicBoolean(); thread.setConf(configuration); thread.init(flag); + thread.enforceMutex(election.enforceMutex()); metastoreThreadsMap.put(thread, flag); HiveMetaStore.LOG.info("Starting metastore thread of type " + thread.getClass().getName()); thread.start(); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java index 3a4414fd0045..f09be0966ce8 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java @@ -99,6 +99,7 @@ public void takeLeadership(LeaderElection election) throws Exception { List alwaysTasks = new ArrayList<>(getAlwaysTasks()); for (MetastoreTaskThread task : alwaysTasks) { task.setConf(configuration); + task.enforceMutex(election.enforceMutex()); long freq = task.runFrequency(TimeUnit.MILLISECONDS); // For backwards compatibility, since some threads used to be hard coded but only run if // frequency was > 0 @@ -111,6 +112,7 @@ public void takeLeadership(LeaderElection election) throws Exception { List remoteOnlyTasks = new ArrayList<>(getRemoteOnlyTasks()); for (MetastoreTaskThread task : remoteOnlyTasks) { task.setConf(configuration); + task.enforceMutex(election.enforceMutex()); long freq = task.runFrequency(TimeUnit.MILLISECONDS); runningTasks.add(task); metastoreTaskThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java index b6f10e5936b7..5a6ab5d77bba 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java @@ -31,6 +31,11 @@ * @param the type of mutex */ public interface LeaderElection extends Closeable { + // We might have different versions of HMS, or even the same version but with different + // leader election methods running inside the warehouse, so it's hard to know how many HMS instances + // that elected as the leader. Relying on this property to tell us, default is true, means it has multiple + // HMS instances acting as the leader. + static final String HIVE_TXN_ENFORCE_AUX_MUTEX = "hive.metastore.enforce.aux.mutex"; /** * Place where election happens @@ -65,6 +70,10 @@ public void tryBeLeader(Configuration conf, T mutex) */ public String getName(); + default boolean enforceMutex() { + return true; + } + public interface LeadershipStateListener { /** diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java index d6ad76dcce9b..fc4d4078df24 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java @@ -95,6 +95,7 @@ public class LeaseLeaderElection implements LeaderElection { private String name; private String userName; private String hostName; + private boolean enforceMutex; public LeaseLeaderElection() throws IOException { userName = SecurityUtils.getUser(); @@ -152,7 +153,7 @@ private void notifyListener() { public void tryBeLeader(Configuration conf, TableName table) throws LeaderException { requireNonNull(conf, "conf is null"); requireNonNull(table, "table is null"); - + this.enforceMutex = conf.getBoolean(HIVE_TXN_ENFORCE_AUX_MUTEX, true); if (store == null) { store = TxnUtils.getTxnStore(conf); } @@ -471,4 +472,9 @@ public void setName(String name) { public String getName() { return name; } + + @Override + public boolean enforceMutex() { + return this.enforceMutex; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java index 8a1752bd9a3e..45917feab7e0 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java @@ -39,13 +39,14 @@ public class StaticLeaderElection implements LeaderElection { private volatile boolean isLeader; private String name; private List listeners = new ArrayList<>(); + private boolean enforceMutex; @Override public void tryBeLeader(Configuration conf, String hostName) throws LeaderException { String leaderHost = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME); - + this.enforceMutex = conf.getBoolean(HIVE_TXN_ENFORCE_AUX_MUTEX, true); // For the sake of backward compatibility, when the current HMS becomes the leader when no // leader is specified. if (leaderHost == null || leaderHost.isEmpty()) { @@ -103,4 +104,8 @@ public void close() { } } + @Override + public boolean enforceMutex() { + return this.enforceMutex; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java index 9c3b754d1abb..82a49188ba63 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java @@ -76,6 +76,7 @@ public void takeLeadership(LeaderElection election) throws Exception { thread.setConf(configuration); stop = new AtomicBoolean(false); thread.init(stop); + thread.enforceMutex(election.enforceMutex()); HiveMetaStore.LOG.info("Starting metastore thread of type " + thread.getClass().getName()); thread.start(); } catch (Exception e) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/NoMutex.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/NoMutex.java new file mode 100644 index 000000000000..4ce65aac0609 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/NoMutex.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn; + +import org.apache.hadoop.hive.metastore.api.MetaException; + +/** + * An empty implementation of TxnStore.MutexAPI + */ +public class NoMutex implements TxnStore.MutexAPI { + + @Override + public LockHandle acquireLock(String key) throws MetaException { + return new DummyHandle(); + } + + @Override + public void acquireLock(String key, LockHandle handle) throws MetaException { + // no-op + } + + private static class DummyHandle implements LockHandle { + + private long lastUpdateTime = 0L; + + @Override + public void releaseLocks() { + // no-op + } + + @Override + public Long getLastUpdateTime() { + return lastUpdateTime; + } + + @Override + public void releaseLocks(Long timestamp) { + this.lastUpdateTime = timestamp; + } + + @Override + public void close() { + // no-op + } + } + +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java index 1013493a7913..3dad12fc1f56 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java @@ -142,6 +142,7 @@ public static final class LockHandleImpl implements LockHandle { private final Semaphore derbySemaphore; private final String key; private final Long lastUpdateTime; + private boolean released = false; public LockHandleImpl(MultiDataSourceJdbcResource jdbcResource, TransactionContext context, String key, Long lastUpdateTime, Semaphore derbySemaphore) { @@ -166,6 +167,7 @@ public void releaseLocks() { LOG.debug("{} unlocked by {}", key, HOSTNAME); } } finally { + released = true; jdbcResource.unbindDataSource(); } } @@ -196,13 +198,16 @@ public void releaseLocks(Long timestamp) { LOG.debug("{} unlocked by {}", key, HOSTNAME); } } finally { + released = true; jdbcResource.unbindDataSource(); } } @Override public void close() { - releaseLocks(); + if (!released) { + releaseLocks(); + } } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java index 5be42248fe73..40daf6f1d39f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.support.DefaultTransactionDefinition; /** @@ -57,12 +58,15 @@ public class TransactionContextManager { * @param propagation The transaction propagation to use. */ public TransactionContext getNewTransaction(int propagation) { - TransactionContext context = new TransactionContext(realTransactionManager.getTransaction( - new DefaultTransactionDefinition(propagation)), this); + DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition(propagation); + // The TxnStore default isolation level is READ_COMMITTED + transactionDefinition.setIsolationLevel(Isolation.READ_COMMITTED.value()); + TransactionContext context = new TransactionContext( + realTransactionManager.getTransaction(transactionDefinition), this); contexts.set(context); return context; } - + public TransactionContext getActiveTransaction() { return contexts.get(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java index 86799e90621d..836b85851e76 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.NoMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.slf4j.Logger; @@ -46,6 +47,7 @@ public class AcidHouseKeeperService implements MetastoreTaskThread { protected TxnStore txnHandler; protected String serviceName; protected Map, String> tasks; + private boolean shouldUseMutex = true; public AcidHouseKeeperService() { serviceName = this.getClass().getSimpleName(); @@ -78,19 +80,14 @@ public long runFrequency(TimeUnit unit) { @Override public void run() { - TxnStore.MutexAPI.LockHandle handle = null; - try { - handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name()); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex(); + try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name())) { LOG.info("Starting to run {}", serviceName); long start = System.currentTimeMillis(); cleanTheHouse(); LOG.debug("Total time {} took: {} seconds.", serviceName, elapsedSince(start)); } catch (Exception e) { LOG.error("Unexpected exception in thread: {}, message: {}", Thread.currentThread().getName(), e.getMessage(), e); - } finally { - if (handle != null) { - handle.releaseLocks(); - } } } @@ -107,4 +104,9 @@ private void performTask(FailableRunnable task, String descriptio private long elapsedSince(long start) { return (System.currentTimeMillis() - start) / 1000; } + + @Override + public void enforceMutex(boolean enableMutex) { + this.shouldUseMutex = enableMutex; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java index 06f284faee08..766ef7b67d8e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java @@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.NoMutex; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.slf4j.Logger; @@ -37,6 +38,7 @@ public class AcidTxnCleanerService implements MetastoreTaskThread { private Configuration conf; private TxnStore txnHandler; + private boolean shouldUseMutex = true; @Override public void setConf(Configuration configuration) { @@ -56,22 +58,22 @@ public long runFrequency(TimeUnit unit) { @Override public void run() { - TxnStore.MutexAPI.LockHandle handle = null; - try { - handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.TxnCleaner.name()); + TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex(); + try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.TxnCleaner.name())) { long start = System.currentTimeMillis(); txnHandler.cleanEmptyAbortedAndCommittedTxns(); LOG.debug("Txn cleaner service took: {} seconds.", elapsedSince(start)); } catch (Exception e) { LOG.error("Unexpected exception in thread: {}, message: {}", Thread.currentThread().getName(), e.getMessage(), e); - } finally { - if (handle != null) { - handle.releaseLocks(); - } } } private long elapsedSince(long start) { return (System.currentTimeMillis() - start) / 1000; } + + @Override + public void enforceMutex(boolean enableMutex) { + this.shouldUseMutex = enableMutex; + } }