Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
final List<KeyRange> splits, String query) throws
IOException {
if (qplan == null){
if (qplan == null) {
throw new NullPointerException();
}if (splits == null){
}
if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());

Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
.newJobContext(new Job(jobConf)));
boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
final Path[] tablePaths = FileInputFormat.getInputPaths(
ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
final boolean splitByStats = jobConf.getBoolean(
PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);

final int parallelThreshould = jobConf.getInt(
"hive.phoenix.split.parallel.threshold",
32);
setScanCacheSize(jobConf);
if (
(parallelThreshould <= 0)
Comment thread
jichen20210919 marked this conversation as resolved.
Outdated
||
(qplan.getScans().size() < parallelThreshould)
) {
LOG.info("generate splits in serial");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the log message clearer, e.g. "Generating input splits in serial".

for (final List<Scan> scans : qplan.getScans()) {
psplits.addAll(
generateSplitsInternal(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reduce the number of lines used to call the method.

jobConf,
qplan,
splits,
query,
scans,
splitByStats,
tablePaths)
);
}
} else {
final int parallism = jobConf.getInt(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between this parallelism level config and the parallel threshold?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parallelism level config controls the number of worker threads used by the parallel split method; the parallel threshold controls which split-generation method is used, serial or parallel.

"hive.phoenix.split.parallel.level",
Runtime.getRuntime().availableProcessors() * 2);
ExecutorService executorService = Executors.newFixedThreadPool(
parallism);
LOG.info("generate splits in parallel with {} threads", parallism);

// Adding Localization
try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
.getTableRef().getTable().getPhysicalName().toString()));
List<Future<List<InputSplit>>> tasks = new ArrayList<>();

for (List<Scan> scans : qplan.getScans()) {
try {
for (final List<Scan> scans : qplan.getScans()) {
Future<List<InputSplit>> task = executorService.submit(
new Callable<List<InputSplit>>() {
@Override
public List<InputSplit> call() throws Exception {
return generateSplitsInternal(jobConf,
qplan,
splits,
query,
scans,
splitByStats,
tablePaths);
}
});
tasks.add(task);
}
for (Future<List<InputSplit>> task : tasks) {
psplits.addAll(task.get());
}
} catch (ExecutionException | InterruptedException exception) {
throw new IOException("failed to get splits,reason:",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log message can be improved.

exception);
} finally {
executorService.shutdown();
}
}
return psplits;
}
/**
* This method is used to generate splits for each scan list.
* @param jobConf MapReduce Job Configuration
* @param qplan phoenix query plan
* @param splits phoenix table splits
* @param query phoenix query statement
* @param scans scan list slice of query plan
* @param splitByStats whether to create one input split per scan (split by stats) instead of a single split for the whole scan list
* @param tablePaths table paths
* @return List of Input Splits
* @throws IOException if function fails
*/
private List<InputSplit> generateSplitsInternal(final JobConf jobConf,
final QueryPlan qplan,
final List<KeyRange> splits,
final String query,
final List<Scan> scans,
final boolean splitByStats,
final Path[] tablePaths) throws IOException {

final List<InputSplit> psplits = new ArrayList<>(scans.size());
try (org.apache.hadoop.hbase.client.Connection connection =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connection can be created once and reused when generating the input splits.

ConnectionFactory.createConnection(
PhoenixConnectionUtil.getConfiguration(jobConf))) {
RegionLocator regionLocator =
connection.getRegionLocator(TableName.valueOf(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The region locator can also be shared across calls.

qplan.getTableRef().getTable()
.getPhysicalName().toString()));
PhoenixInputSplit inputSplit;

HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow()
, false);
long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
HRegionLocation location = regionLocator.getRegionLocation(
scans.get(0).getStartRow(),
false);
long regionSize = CompatUtil.getSize(regionLocator,
connection.getAdmin(),
location);
String regionLocation =
PhoenixStorageHandlerUtil.getRegionLocation(location,
LOG);

if (splitByStats) {
for (Scan aScan : scans) {
if (LOG.isDebugEnabled()) {
LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan
.getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
.getBatch() + "] and regionLocation : " + regionLocation);
LOG.debug("Split for scan : "

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format the code properly.

+ aScan
+ "with scanAttribute : "
+ aScan.getAttributesMap()
+ " [scanCache, cacheBlock, scanBatch] : ["
+ aScan.getCaching()
+ ", "
+ aScan.getCacheBlocks()
+ ", "
+ aScan.getBatch()
+ "] and regionLocation : "
+ regionLocation);
}

inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)), tablePaths[0],
regionLocation, regionSize);
inputSplit =
new PhoenixInputSplit(
new ArrayList<>(Arrays.asList(aScan)),
tablePaths[0],
regionLocation,
regionSize);
inputSplit.setQuery(query);
psplits.add(inputSplit);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
.get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
.size() - 1).getStopRow()));
LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans
.get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " +
"[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
+ ", " + scans.get(0).getBatch() + "] and regionLocation : " +
regionLocation);
LOG.debug("Scan count["

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format the code properly.

+ scans.size()
+ "] : "
+ Bytes.toStringBinary(
scans.get(0).getStartRow())
+ " ~ "
+ Bytes.toStringBinary(
scans.get(scans.size() - 1).getStopRow()));
LOG.debug("First scan : "
+ scans.get(0)
+ "with scanAttribute : "
+ scans.get(0).getAttributesMap()
+ " [scanCache, cacheBlock, scanBatch] : "
+ "[" + scans.get(0).getCaching()
+ ", "
+ scans.get(0).getCacheBlocks()
+ ", "
+ scans.get(0).getBatch()
+ "] and regionLocation : "
+ regionLocation);

for (int i = 0, limit = scans.size(); i < limit; i++) {
LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes
.toStringBinary(scans.get(i).getAttribute
(BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
LOG.debug(
"EXPECTED_UPPER_REGION_KEY["
+ i
+ "] : "
+ Bytes.toStringBinary(
scans.get(i).getAttribute(
BaseScannerRegionObserver
.EXPECTED_UPPER_REGION_KEY
)
));
}
}

inputSplit = new PhoenixInputSplit(scans, tablePaths[0], regionLocation,
regionSize);
inputSplit =
new PhoenixInputSplit(scans,
tablePaths[0],
regionLocation,
regionSize);
inputSplit.setQuery(query);
psplits.add(inputSplit);
}
}
}

return psplits;
}

Expand Down
Loading