Skip to content

Commit

Permalink
review-1
Browse files Browse the repository at this point in the history
  • Loading branch information
dengzhhu653 committed Jan 7, 2025
1 parent 205d484 commit 0390bf3
Show file tree
Hide file tree
Showing 13 changed files with 29 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex;
import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;
import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory;
Expand Down Expand Up @@ -72,7 +72,7 @@ public void run() {
LOG.info("Starting Cleaner thread");
try {
do {
TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex();
TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();
metadataCache.invalidate();
long startedAt = -1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex;
import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
Expand Down Expand Up @@ -72,7 +72,7 @@ public void run() {
long abortedTimeThreshold = HiveConf
.getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
TimeUnit.MILLISECONDS);
TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex();
TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();

// Make sure we run through the loop once before checking to stop as this makes testing
// much easier. The stop value is only for testing anyway and not used when called from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ default long initialDelay(TimeUnit unit) {

/**
* Should use mutex support to allow only one copy of this task running across the warehouse.
* @param enableMutex true for enabling the mutex, false otherwise
* @param useMutex true for enabling the mutex, false otherwise
*/
default void shouldUseMutex(boolean enableMutex) {
default void enforceMutex(boolean useMutex) {
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex;
import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -62,7 +62,7 @@ public void run() {
LOG.debug("Cleaning up materialization rebuild locks");
}

TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex();
TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();
try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.MaterializationRebuild.name())) {
ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
long removedCnt = txnHandler.cleanupMaterializationRebuildLocks(validTxnList,
Expand All @@ -78,7 +78,7 @@ public void run() {
}

@Override
public void shouldUseMutex(boolean enableMutex) {
public void enforceMutex(boolean enableMutex) {
this.shouldUseMutex = enableMutex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void takeLeadership(LeaderElection election) throws Exception {
AtomicBoolean flag = new AtomicBoolean();
thread.setConf(configuration);
thread.init(flag);
thread.shouldUseMutex(election.hasMultipleLeaders());
thread.shouldUseMutex(election.enforceMutex());
metastoreThreadsMap.put(thread, flag);
HiveMetaStore.LOG.info("Starting metastore thread of type " + thread.getClass().getName());
thread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void takeLeadership(LeaderElection election) throws Exception {
List<MetastoreTaskThread> alwaysTasks = new ArrayList<>(getAlwaysTasks());
for (MetastoreTaskThread task : alwaysTasks) {
task.setConf(configuration);
task.shouldUseMutex(election.hasMultipleLeaders());
task.enforceMutex(election.enforceMutex());
long freq = task.runFrequency(TimeUnit.MILLISECONDS);
// For backwards compatibility, since some threads used to be hard coded but only run if
// frequency was > 0
Expand All @@ -112,7 +112,7 @@ public void takeLeadership(LeaderElection election) throws Exception {
List<MetastoreTaskThread> remoteOnlyTasks = new ArrayList<>(getRemoteOnlyTasks());
for (MetastoreTaskThread task : remoteOnlyTasks) {
task.setConf(configuration);
task.shouldUseMutex(election.hasMultipleLeaders());
task.enforceMutex(election.enforceMutex());
long freq = task.runFrequency(TimeUnit.MILLISECONDS);
runningTasks.add(task);
metastoreTaskThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface LeaderElection<T> extends Closeable {
// leader election methods running inside the warehouse, so it's hard to know how many HMS instances
// that elected as the leader. Relying on this property to tell us, default is true, means it has multiple
// HMS instances acting as the leader.
public static final String HAVE_MULTIPLE_LEADERS = "hive.metastore.have.multiple.leaders";
static final String HIVE_TXN_ENFORCE_AUX_MUTEX = "hive.metastore.enforce.aux.mutex";

/**
* Place where election happens
Expand Down Expand Up @@ -70,7 +70,7 @@ public void tryBeLeader(Configuration conf, T mutex)
*/
public String getName();

default boolean hasMultipleLeaders() {
default boolean enforceMutex() {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
private String name;
private String userName;
private String hostName;
private boolean multipleLeaders;
private boolean enforceMutex;

public LeaseLeaderElection() throws IOException {
userName = SecurityUtils.getUser();
Expand Down Expand Up @@ -153,7 +153,7 @@ private void notifyListener() {
public void tryBeLeader(Configuration conf, TableName table) throws LeaderException {
requireNonNull(conf, "conf is null");
requireNonNull(table, "table is null");
this.multipleLeaders = conf.getBoolean(HAVE_MULTIPLE_LEADERS, true);
this.enforceMutex = conf.getBoolean(HIVE_TXN_ENFORCE_AUX_MUTEX, true);
if (store == null) {
store = TxnUtils.getTxnStore(conf);
}
Expand Down Expand Up @@ -474,7 +474,7 @@ public String getName() {
}

@Override
public boolean hasMultipleLeaders() {
public boolean enforceMutex() {
return this.multipleLeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ public class StaticLeaderElection implements LeaderElection<String> {
private volatile boolean isLeader;
private String name;
private List<LeadershipStateListener> listeners = new ArrayList<>();
private boolean multipleLeaders;
private boolean enforceMutex;

@Override
public void tryBeLeader(Configuration conf, String hostName)
throws LeaderException {
String leaderHost = MetastoreConf.getVar(conf,
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME);
this.multipleLeaders = conf.getBoolean(HAVE_MULTIPLE_LEADERS, true);
this.enforceMutex = conf.getBoolean(HIVE_TXN_ENFORCE_AUX_MUTEX, true);
// For the sake of backward compatibility, when the current HMS becomes the leader when no
// leader is specified.
if (leaderHost == null || leaderHost.isEmpty()) {
Expand Down Expand Up @@ -105,7 +105,7 @@ public void close() {
}

@Override
public boolean hasMultipleLeaders() {
return this.multipleLeaders;
public boolean enforceMutex() {
return this.enforceMutex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void takeLeadership(LeaderElection election) throws Exception {
thread.setConf(configuration);
stop = new AtomicBoolean(false);
thread.init(stop);
thread.shouldUseMutex(election.hasMultipleLeaders());
thread.shouldUseMutex(election.enforceMutex());
HiveMetaStore.LOG.info("Starting metastore thread of type " + thread.getClass().getName());
thread.start();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* An empty implementation of TxnStore.MutexAPI
*/
public class TxnDummyMutex implements TxnStore.MutexAPI {
public class NoMutex implements TxnStore.MutexAPI {

@Override
public LockHandle acquireLock(String key) throws MetaException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex;
import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -80,7 +80,7 @@ public long runFrequency(TimeUnit unit) {

@Override
public void run() {
TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex();
TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();
try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name())) {
LOG.info("Starting to run {}", serviceName);
long start = System.currentTimeMillis();
Expand All @@ -106,7 +106,7 @@ private long elapsedSince(long start) {
}

@Override
public void shouldUseMutex(boolean enableMutex) {
public void enforceMutex(boolean enableMutex) {
this.shouldUseMutex = enableMutex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnDummyMutex;
import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -58,7 +58,7 @@ public long runFrequency(TimeUnit unit) {

@Override
public void run() {
TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new TxnDummyMutex();
TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();
try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.TxnCleaner.name())) {
long start = System.currentTimeMillis();
txnHandler.cleanEmptyAbortedAndCommittedTxns();
Expand All @@ -73,7 +73,7 @@ private long elapsedSince(long start) {
}

@Override
public void shouldUseMutex(boolean enableMutex) {
public void enforceMutex(boolean enableMutex) {
this.shouldUseMutex = enableMutex;
}
}

0 comments on commit 0390bf3

Please sign in to comment.