Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,34 @@ public class PhoenixStorageHandlerConstants {
FUNCTION_VALUE_MARKER;

public static final IntWritable INT_ZERO = new IntWritable(0);

/**
* Configuration key for the minimum number of parallel scans (Nps) required to trigger the
* parallel split generation method (PSGM) instead of the serial split generation method
* (SSGM). The configured value is the threshold (Nt).
* According to testing, SSGM is better when Nps is less than Nt, and PSGM is better when
* Nps is larger than Nt.
* Note: it is strongly recommended to leave this setting at its default; tuning the value
* does not make much difference. To insist on the legacy method (SSGM), set
* phoenix.minimum.parallel.scans.threshold = 0.
*/
public static final String PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD =
"phoenix.minimum.parallel.scans.threshold";
/**
* Default value of the minimum parallel scans threshold; the value was obtained by local
* testing.
*/
public static final int DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD = 8;

/**
* Configuration key for the number of worker threads used to generate input splits with
* the parallel split generation method (PSGM).
* Note: the default setting is suitable for most use cases; a larger value can be set to
* get better performance.
*/
public static final String PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT =
"phoenix.inputsplit.generation.thread.count";
/**
* Default number of worker threads used to generate input splits in parallel: twice the
* number of processors reported by the Java runtime.
*/
public static final int DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT =
Runtime.getRuntime().availableProcessors() * 2;

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
Expand All @@ -48,6 +54,7 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.phoenix.compat.CompatUtil;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
Expand All @@ -61,7 +68,6 @@
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.compat.CompatUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -73,7 +79,6 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
T> {

private static final Logger LOG = LoggerFactory.getLogger(PhoenixInputFormat.class);

public PhoenixInputFormat() {
if (LOG.isDebugEnabled()) {
LOG.debug("PhoenixInputFormat created");
Expand Down Expand Up @@ -119,74 +124,144 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
}

private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
final List<KeyRange> splits, String query) throws
IOException {
if (qplan == null){
final List<KeyRange> splits, final String query)
throws IOException {

if (qplan == null) {
throw new NullPointerException();
}if (splits == null){
}
if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());

Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
.newJobContext(new Job(jobConf)));
boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
final Path[] tablePaths = FileInputFormat.getInputPaths(
ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
final boolean splitByStats = jobConf.getBoolean(
PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);

final int parallelThreshold = jobConf.getInt(
PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD,
PhoenixStorageHandlerConstants.DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD);
setScanCacheSize(jobConf);
try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory
.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(
qplan.getTableRef().getTable().getPhysicalName().toString()));
final int scanSize = qplan.getScans().size();
if (useParallelInputGeneration(parallelThreshold, scanSize)) {
final int parallism = jobConf.getInt(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final int parallism = jobConf.getInt(
final int parallelism = jobConf.getInt(

PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT,
PhoenixStorageHandlerConstants
.DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT);
ExecutorService executorService = Executors.newFixedThreadPool(parallism);
LOG.info("Generate Input Splits in Parallel with {} threads", parallism);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOG.info("Generate Input Splits in Parallel with {} threads", parallism);
LOG.info("Generating Input Splits in Parallel with {} threads", parallism);


// Adding Localization
try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
.getTableRef().getTable().getPhysicalName().toString()));

for (List<Scan> scans : qplan.getScans()) {
PhoenixInputSplit inputSplit;

HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow()
, false);
long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);

if (splitByStats) {
for (Scan aScan : scans) {
if (LOG.isDebugEnabled()) {
LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan
.getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
.getBatch() + "] and regionLocation : " + regionLocation);
}
List<Future<List<InputSplit>>> tasks = new ArrayList<>();

inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)), tablePaths[0],
regionLocation, regionSize);
inputSplit.setQuery(query);
psplits.add(inputSplit);
try {
for (final List<Scan> scans : qplan.getScans()) {
Future<List<InputSplit>> task = executorService.submit(
new Callable<List<InputSplit>>() {
@Override public List<InputSplit> call() throws Exception {
return generateSplitsInternal(query, scans, splitByStats,
connection, regionLocator, tablePaths);
}
});
tasks.add(task);
}
for (Future<List<InputSplit>> task : tasks) {
psplits.addAll(task.get());
}
} catch (ExecutionException | InterruptedException exception) {
throw new IOException("Failed to Generate Input Splits in Parallel, reason:",
exception);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to unwrap the ExecutionException and throw back the real exception. It may already be an IOException which you can throw with a cast, rather than rewrapping in another IOException.

} finally {
executorService.shutdown();
}
} else {
LOG.info("Generate Input Splits in Serial");
for (final List<Scan> scans : qplan.getScans()) {
psplits.addAll(generateSplitsInternal(query, scans,
splitByStats, connection, regionLocator, tablePaths));
}
}
}

return psplits;
}

/**
* Decides whether input splits should be generated in parallel instead of serially.
* @param parallelThreshold minimum number of scans required for parallel generation;
*        a value of zero or less always selects serial generation
* @param scans number of scans
* @return true if splits should be generated in parallel, false otherwise.
*/
private boolean useParallelInputGeneration(final int parallelThreshold, final int scans) {
    // A non-positive threshold turns parallel generation off entirely.
    if (parallelThreshold <= 0) {
        return false;
    }
    return scans >= parallelThreshold;
}

/**
* Generates the input splits for one scan list of the query plan.
* @param query Phoenix query statement
* @param scans one scan list taken from the query plan
* @param splitByStats whether to create a separate split for each scan instead of a single
*        split for the whole scan list
* @param connection HBase client connection
* @param regionLocator HBase region locator
* @param tablePaths table paths
* @return list of input splits
* @throws IOException if split generation fails
*/
private List<InputSplit> generateSplitsInternal(final String query, final List<Scan> scans,
final boolean splitByStats, final org.apache.hadoop.hbase.client.Connection connection,
final RegionLocator regionLocator, final Path[] tablePaths) throws IOException {

final List<InputSplit> psplits = new ArrayList<>(scans.size());

PhoenixInputSplit inputSplit;

HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow(),
false);
long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);

if (splitByStats) {
for (Scan aScan : scans) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
.get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
.size() - 1).getStopRow()));
LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans
.get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " +
"[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
+ ", " + scans.get(0).getBatch() + "] and regionLocation : " +
regionLocation);

for (int i = 0, limit = scans.size(); i < limit; i++) {
LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes
.toStringBinary(scans.get(i).getAttribute
(BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
}
LOG.debug("Split for scan : " + aScan + "with scanAttribute : "
+ aScan.getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : ["
+ aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", "
+ aScan.getBatch() + "] and regionLocation : " + regionLocation);
}

inputSplit = new PhoenixInputSplit(scans, tablePaths[0], regionLocation,
regionSize);
inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)),
tablePaths[0], regionLocation, regionSize);
inputSplit.setQuery(query);
psplits.add(inputSplit);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
.get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
.size() - 1).getStopRow()));

LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans
.get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : "
+ "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
+ ", " + scans.get(0).getBatch() + "] and regionLocation : "
+ regionLocation);

for (int i = 0, limit = scans.size(); i < limit; i++) {
LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes
.toStringBinary(scans.get(i).getAttribute
(BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
}
}

inputSplit = new PhoenixInputSplit(scans, tablePaths[0], regionLocation, regionSize);
inputSplit.setQuery(query);
psplits.add(inputSplit);
}
}

return psplits;
}
Expand Down
Loading