Skip to content

Commit

Permalink
Added pruneToColumn optional param to nodeGroupExecution/dispatchById
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Cuddihy committed Jan 9, 2023
1 parent ce8860a commit 28cc019
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -535,13 +535,15 @@ public JSONObject dispatchAnyJobById(@RequestBody DispatchByIdRequestBody reques
}

// dispatch the job.
// ERROR thought I was calling version on line 342
ngExecutor.dispatchJob(qt, rt, connection, requestBody.getNodeGroupId(),
requestBody.getExternalDataConnectionConstraintsJson(),
requestBody.getFlags(),
requestBody.getRuntimeConstraints(),
requestBody.getLimitOverride(),
requestBody.getOffsetOverride(),
targetId);
targetId,
requestBody.getPruneToColumn());

retval.setSuccess(true);
retval.addResult(SimpleResultSet.JOB_ID_RESULT_KEY, ngExecutor.getJobID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public class DispatchByIdRequestBody extends DispatchRequestBody {
example = "-1")
private int offsetOverride = -1;

// This is a custom optional field only available through straight REST
// for Saqib @ collins
@Schema(
description = "Prune select table return down to one column and uniquify.",
required = false,
example = "favorite_column")
private String pruneToColumn = null;

/**
 * @return the limit override value held by this request body
 */
public int getLimitOverride() {
    return this.limitOverride;
}
Expand All @@ -57,6 +65,13 @@ public String getNodeGroupId(){
return this.nodeGroupId;
}

/**
 * Sets the optional column that a select table return is pruned down to.
 * @param val column name to store; stored as given, with no validation here
 */
public void setPruneToColumn(String val) {
    pruneToColumn = val;
}
/**
 * @return the optional prune-to column name; null unless it has been set
 */
public String getPruneToColumn() {
    return pruneToColumn;
}

/**
* Validate request contents. Throws an exception if validation fails.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ public JSONObject queryFromNodeGroup(@RequestBody QueryRequestBody requestBody,
try {
dsp = getDispatcher(props, jobId, (NodegroupRequestBody) requestBody, useAuth, true);
dsp.getJobTracker().incrementPercentComplete(dsp.getJobId(), 1, 10);
dsp.addPruneToColumn(requestBody.getPruneToColumn());

WorkThread thread = new WorkThread(dsp, requestBody.getExternalConstraints(), requestBody.getFlags(), qt, rt);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,15 @@ public class QueryRequestBody extends NodegroupRequestBody {

@Schema(required = false, example = "[\"UNOPTIONALIZE_CONSTRAINED\"]")
private String flags; // a string parseable to a JSONArray


// This is a custom optional field only available through straight REST
// for Saqib @ collins
@Schema(
description = "Prune select table return down to one column and uniquify.",
required = false,
example = "favorite_column")
private String pruneToColumn = null;

public void setConstraintSet(String constraintSet){
this.constraintSet = constraintSet;
}
Expand All @@ -59,4 +67,11 @@ public QueryFlags getFlags() throws Exception{
return new QueryFlags(Utility.getJsonArrayFromString(this.flags));
}

/**
 * Sets the optional column that a select table return is pruned down to.
 * @param val column name to store; stored as given, with no validation here
 */
public void setPruneToColumn(String val) {
    pruneToColumn = val;
}
/**
 * @return the optional prune-to column name; null unless it has been set
 */
public String getPruneToColumn() {
    return pruneToColumn;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -302,22 +302,57 @@ public void dispatchRawSparqlUpdate(SparqlConnection sc, String sparqlQuery) thr
this.setJobID(simpleRes.getResult("requestID"));
}

/**
* Version without queryFlags or pruneToColumn
* @param qt
* @param rt
* @param sc
* @param ng
* @param externalConstraints
* @param runtimeConstraints
* @param targetObjectSparqlID
* @throws Exception
*/
public void dispatchJob(AutoGeneratedQueryTypes qt, SparqlResultTypes rt,SparqlConnection sc, NodeGroup ng, JSONObject externalConstraints, JSONArray runtimeConstraints, String targetObjectSparqlID) throws Exception{
this.dispatchJob(qt, rt, sc, ng, externalConstraints, null, runtimeConstraints, -1, -1, targetObjectSparqlID);
this.dispatchJob(qt, rt, sc, ng, externalConstraints, null, runtimeConstraints, -1, -1, targetObjectSparqlID, null);
}

/**
 * Version without pruneToColumn.
 * Delegates to the full dispatchJob() with pruneToColumn set to null,
 * passing every other argument through unchanged.
 * @param qt
 * @param rt
 * @param sc
 * @param ng
 * @param externalConstraints
 * @param flags
 * @param runtimeConstraints
 * @param limitOverride - forwarded as-is to the full version
 * @param offsetOverride - forwarded as-is to the full version
 * @param targetObjectSparqlID
 * @throws Exception
 */
public void dispatchJob(AutoGeneratedQueryTypes qt, SparqlResultTypes rt, SparqlConnection sc, NodeGroup ng, JSONObject externalConstraints, QueryFlags flags, JSONArray runtimeConstraints, int limitOverride, int offsetOverride, String targetObjectSparqlID) throws Exception{
    // Forward the caller's limit/offset overrides; hard-coding -1 here would silently discard them.
    this.dispatchJob(qt, rt, sc, ng, externalConstraints, flags, runtimeConstraints, limitOverride, offsetOverride, targetObjectSparqlID, null);
}

/**
*
* @param qt
* @param rt
* @param sc
* @param ng
* @param externalConstraints
* @param flags
* @param runtimeConstraints
* @param limitOverride
* @param offsetOverride
* @param targetObjectSparqlID
* @param pruneToColumn - unusual special : prune a SELECT table result to only this column, uniquified
* @throws Exception
*/
@SuppressWarnings("unchecked")
public void dispatchJob(AutoGeneratedQueryTypes qt, SparqlResultTypes rt, SparqlConnection sc, NodeGroup ng, JSONObject externalConstraints, QueryFlags flags, JSONArray runtimeConstraints, int limitOverride, int offsetOverride, String targetObjectSparqlID) throws Exception{
public void dispatchJob(AutoGeneratedQueryTypes qt, SparqlResultTypes rt, SparqlConnection sc, NodeGroup ng, JSONObject externalConstraints, QueryFlags flags, JSONArray runtimeConstraints, int limitOverride, int offsetOverride, String targetObjectSparqlID, String pruneToColumn) throws Exception{
// externalConstraints as used by executeQueryFromNodeGroup

LocalLogger.logToStdOut("Sending a " + qt + " query to the dispatcher...");
Expand Down Expand Up @@ -361,16 +396,45 @@ public void dispatchJob(AutoGeneratedQueryTypes qt, SparqlResultTypes rt, Sparql
SparqlGraphJson sgjson = new SparqlGraphJson(ng, sc);
SimpleResultSet simpleRes = null;

simpleRes = this.dispatchClient.executeQuery(sgjson, qt, rt, externalConstraints, flags, targetObjectSparqlID);
simpleRes = this.dispatchClient.executeQuery(sgjson, qt, rt, externalConstraints, flags, targetObjectSparqlID, pruneToColumn);
simpleRes.throwExceptionIfUnsuccessful();

// set up the Job ID
this.setJobID(simpleRes.getJobId());

}

/**
* Version without queryFlags or pruneToColumn
* @param qt
* @param rt
* @param sc
* @param storedNodeGroupId
* @param externalConstraints
* @param runtimeConstraints
* @param targetObjectSparqlID
* @throws Exception
*/
public void dispatchJob(AutoGeneratedQueryTypes qt, SparqlResultTypes rt,SparqlConnection sc, String storedNodeGroupId, JSONObject externalConstraints, JSONArray runtimeConstraints, String targetObjectSparqlID) throws Exception{
this.dispatchJob(qt, rt, sc, storedNodeGroupId, externalConstraints, null, runtimeConstraints, -1, -1, targetObjectSparqlID);
this.dispatchJob(qt, rt, sc, storedNodeGroupId, externalConstraints, null, runtimeConstraints, -1, -1, targetObjectSparqlID, null);
}

/**
 * Version without pruneToColumn.
 * Delegates to the full stored-nodegroup dispatchJob() with pruneToColumn set to null,
 * passing every other argument through unchanged.
 * @param qt
 * @param rt
 * @param sc
 * @param storedNodeGroupId
 * @param externalConstraints
 * @param flags
 * @param runtimeConstraints
 * @param limitOverride - forwarded as-is to the full version
 * @param offsetOverride - forwarded as-is to the full version
 * @param targetObjectSparqlID
 * @throws Exception
 */
public void dispatchJob(AutoGeneratedQueryTypes qt, SparqlResultTypes rt, SparqlConnection sc, String storedNodeGroupId, JSONObject externalConstraints, QueryFlags flags, JSONArray runtimeConstraints, int limitOverride, int offsetOverride, String targetObjectSparqlID) throws Exception{
    // Forward the caller's limit/offset overrides; hard-coding -1 here would silently discard them.
    this.dispatchJob(qt, rt, sc, storedNodeGroupId, externalConstraints, flags, runtimeConstraints, limitOverride, offsetOverride, targetObjectSparqlID, null);
}
/**
*
Expand All @@ -382,7 +446,7 @@ public void dispatchJob(AutoGeneratedQueryTypes qt, SparqlResultTypes rt,SparqlC
* @param targetObjectSparqlID
* @throws Exception
*/
public void dispatchJob(AutoGeneratedQueryTypes qt, SparqlResultTypes rt, SparqlConnection sc, String storedNodeGroupId, JSONObject externalConstraints, QueryFlags flags, JSONArray runtimeConstraints, int limitOverride, int offsetOverride, String targetObjectSparqlID) throws Exception{
public void dispatchJob(AutoGeneratedQueryTypes qt, SparqlResultTypes rt, SparqlConnection sc, String storedNodeGroupId, JSONObject externalConstraints, QueryFlags flags, JSONArray runtimeConstraints, int limitOverride, int offsetOverride, String targetObjectSparqlID, String pruneToColumn) throws Exception{

// get the node group from the remote store
TableResultSet trs = this.storeClient.executeGetNodeGroupById(storedNodeGroupId);
Expand Down Expand Up @@ -427,7 +491,7 @@ public void dispatchJob(AutoGeneratedQueryTypes qt, SparqlResultTypes rt, Sparql
}

// dispatch the job itself
this.dispatchJob(qt, rt, conn, ng, externalConstraints, flags, runtimeConstraints, limitOverride, offsetOverride, targetObjectSparqlID);
this.dispatchJob(qt, rt, conn, ng, externalConstraints, flags, runtimeConstraints, limitOverride, offsetOverride, targetObjectSparqlID, pruneToColumn);
}

public URL[] dispatchJobSynchronous(AutoGeneratedQueryTypes qt, SparqlResultTypes rt, SparqlConnection sc, String storedNodeGroupId, JSONObject externalConstraints, JSONArray runtimeConstraints, String targetObjectSparqlID) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class NodeGroupExecutionClient extends SharedIngestNgeClient {
private static final String JSON_KEY_RUNTIME_CONSTRAINTS = "runtimeConstraints";
private static final String JSON_KEY_EDC_CONSTRAINTS = "externalDataConnectionConstraints";
private static final String JSON_KEY_FLAGS = "flags";
private static final String JSON_KEY_PRUNE_TO_COLUMN = "pruneToColumn";

// service mapping
private static final String mappingPrefix = "/nodeGroupExecution";
Expand Down Expand Up @@ -450,6 +451,8 @@ public String dispatchSelectByIdToJobId(String nodegroupID, SparqlConnection ove
SimpleResultSet ret = this.execDispatchSelectById(nodegroupID, overrideConn, edcConstraintsJson, runtimeConstraintsJson, limitOverride, offsetOverride, flags);
return ret.getResult("JobId");
}



/**
* Run a construct query by nodegroupID
Expand Down Expand Up @@ -479,6 +482,7 @@ public String dispatchConstructByIdToJobId(String nodegroupID, SparqlConnection
public Table execDispatchSelectByIdToTable(String nodegroupID, SparqlConnection overrideConn, JSONObject edcConstraintsJson, JSONArray runtimeConstraintsJson) throws Exception {
return this.execDispatchSelectByIdToTable(nodegroupID, overrideConn, edcConstraintsJson, runtimeConstraintsJson, -1, -1, null);
}


/**
* Run select query by nodegroupID
Expand All @@ -505,6 +509,32 @@ public Table execDispatchSelectByIdToTable(String nodegroupID, SparqlConnection
}
}

/**
 * Dispatch a query by nodegroupID, wait for the job, and return its table.
 * @param nodegroupID
 * @param overrideConn
 * @param edcConstraintsJson
 * @param flagsJson
 * @param runtimeConstraintsJson
 * @param limitOverride
 * @param offsetOverride
 * @param pruneToColumn - passed through to dispatchByIdToJobId()
 * @return the Table returned by waitForJobAndGetTable() for the dispatched job
 * @throws Exception if dispatching fails, or if waiting for / fetching the table fails
 *                   (in the latter case the message names the nodegroup id and the
 *                   original exception is attached as the cause)
 */
public Table execDispatchByIdToTable(String nodegroupID, SparqlConnection overrideConn, JSONObject edcConstraintsJson, JSONArray flagsJson, JSONArray runtimeConstraintsJson, int limitOverride, int offsetOverride, String pruneToColumn) throws Exception{
    // dispatch the job
    String jobId = this.dispatchByIdToJobId(nodegroupID, overrideConn, edcConstraintsJson, flagsJson, runtimeConstraintsJson, limitOverride, offsetOverride, pruneToColumn);

    try {
        return this.waitForJobAndGetTable(jobId);
    } catch (Exception e) {
        // Add nodegroupID and "SELECT" to the error message.
        // Chain the original exception so its type and stack trace are not lost.
        throw new Exception(String.format("Error executing SELECT on nodegroup id='%s': %s", nodegroupID, e.getMessage()), e);
    }
}


/**
* Run a query to find all instance values for a target object given a Select query
* @param nodegroupID
Expand Down Expand Up @@ -1608,14 +1638,16 @@ public Table getRuntimeConstraintsByNodeGroupID(String nodegroupID) throws Excep
* @param runtimeConstraintsJson
* @param limitOverride
* @param offsetOverride
* @param pruneToColumn
* @return String jobId
* @throws Exception
*/
public String dispatchByIdWithToJobId(String nodegroupID, SparqlConnection overrideConn, JSONObject edcConstraintsJson, JSONArray flagsJson, JSONArray runtimeConstraintsJson, int limitOverride, int offsetOverride) throws Exception{
SimpleResultSet ret = this.execDispatchById(nodegroupID, overrideConn, edcConstraintsJson, flagsJson, runtimeConstraintsJson, limitOverride, offsetOverride);
public String dispatchByIdToJobId(String nodegroupID, SparqlConnection overrideConn, JSONObject edcConstraintsJson, JSONArray flagsJson, JSONArray runtimeConstraintsJson, int limitOverride, int offsetOverride, String pruneToColumn) throws Exception{
SimpleResultSet ret = this.execDispatchById(nodegroupID, overrideConn, edcConstraintsJson, flagsJson, runtimeConstraintsJson, limitOverride, offsetOverride, pruneToColumn);
return ret.getResult("JobId");
}


/**
* Launch a select query given a nodegroupID
* @param nodegroupID
Expand All @@ -1631,7 +1663,7 @@ public String dispatchByIdToJobId(String nodegroupID, SparqlConnection overrideC
}

/**
* Raw call to launch a select query
* Raw call to launch a select query: version without query flags or pruneToColumn
* @param nodegroupID
* @param overrideConn
* @param edcConstraintsJson
Expand All @@ -1640,7 +1672,21 @@ public String dispatchByIdToJobId(String nodegroupID, SparqlConnection overrideC
* @throws Exception
*/
public SimpleResultSet execDispatchById(String nodegroupID, SparqlConnection overrideConn, JSONObject edcConstraintsJson, JSONArray runtimeConstraintsJson) throws Exception{
return this.execDispatchById(nodegroupID, overrideConn, edcConstraintsJson, null, runtimeConstraintsJson, -1, -1);
return this.execDispatchById(nodegroupID, overrideConn, edcConstraintsJson, null, runtimeConstraintsJson, -1, -1, null);
}

/**
 * Raw call to launch a query by nodegroup id: overload that takes no pruneToColumn.
 * Hands every argument to the full execDispatchById() and supplies null for pruneToColumn.
 * @param nodegroupID
 * @param overrideConn
 * @param edcConstraintsJson
 * @param flagsJson
 * @param runtimeConstraintsJson
 * @param limitOverride
 * @param offsetOverride
 * @return whatever the full execDispatchById() returns
 * @throws Exception
 */
public SimpleResultSet execDispatchById(String nodegroupID, SparqlConnection overrideConn, JSONObject edcConstraintsJson, JSONArray flagsJson, JSONArray runtimeConstraintsJson, int limitOverride, int offsetOverride) throws Exception{
    final String noPruneColumn = null;   // this overload never prunes
    SimpleResultSet result = this.execDispatchById(
            nodegroupID,
            overrideConn,
            edcConstraintsJson,
            flagsJson,
            runtimeConstraintsJson,
            limitOverride,
            offsetOverride,
            noPruneColumn);
    return result;
}

/**
Expand All @@ -1652,11 +1698,12 @@ public SimpleResultSet execDispatchById(String nodegroupID, SparqlConnection ove
* @param runtimeConstraintsJson
* @param limitOverride
* @param offsetOverride
* @param pruneToColumn
* @return SimpleResultSet containing "JobId"
* @throws Exception
*/
@SuppressWarnings("unchecked")
public SimpleResultSet execDispatchById(String nodegroupID, SparqlConnection overrideConn, JSONObject edcConstraintsJson, JSONArray flagsJson, JSONArray runtimeConstraintsJson, int limitOverride, int offsetOverride) throws Exception{
public SimpleResultSet execDispatchById(String nodegroupID, SparqlConnection overrideConn, JSONObject edcConstraintsJson, JSONArray flagsJson, JSONArray runtimeConstraintsJson, int limitOverride, int offsetOverride, String pruneToColumn) throws Exception{
SimpleResultSet retval = null;

conf.setServiceEndpoint(mappingPrefix + dispatchByIdEndpoint);
Expand All @@ -1669,6 +1716,8 @@ public SimpleResultSet execDispatchById(String nodegroupID, SparqlConnection ove
this.parametersJSON.put(JSON_KEY_EDC_CONSTRAINTS, edcConstraintsJson == null ? null : edcConstraintsJson.toJSONString());
this.parametersJSON.put(JSON_KEY_FLAGS, flagsJson == null ? null : flagsJson.toJSONString());
this.parametersJSON.put(JSON_KEY_RUNTIME_CONSTRAINTS, runtimeConstraintsJson == null ? null : runtimeConstraintsJson.toJSONString());
if (pruneToColumn != null)
this.parametersJSON.put(JSON_KEY_PRUNE_TO_COLUMN, pruneToColumn);

try{
LocalLogger.logToStdErr("sending executeDispatchById request");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
* Base class for dispatchers.
*/
public abstract class AsynchronousNodeGroupBasedQueryDispatcher {
private String pruneToColumn;

protected NodeGroup queryNodeGroup;
protected OntologyInfoClient oInfoClient;
Expand Down Expand Up @@ -128,6 +129,25 @@ protected void sendResultsToService(Table resTable) throws ConnectException, End
}
resTable.replaceColumnNames(modColnames);

// special rare feature prune To uniquified column
if (this.pruneToColumn != null) {
if (resTable.getColumnIndex(this.pruneToColumn) == -1) {
throw new Exception ("PruneToColumn can not find column named: " + this.pruneToColumn);
}
for (String c : resTable.getColumnNames()) {
if (!c.equals(this.pruneToColumn)) {
resTable.removeColumn(c);
}
}
resTable.uniquify(new String [] {this.pruneToColumn});
}
} catch(Exception e){
this.jobTracker.setJobFailure(this.jobID, "Failure preparing results table: " + e.getMessage());
LocalLogger.printStackTrace(e);
throw new Exception(e);
}

try {
(new ResultsClient(this.resConfig)).execStoreTableResults(this.jobID, resTable);
}
catch(Exception e){
Expand Down Expand Up @@ -315,4 +335,9 @@ public JobTracker getJobTracker() {
return this.jobTracker;
}

/**
 * Stores the column name used for the optional prune-to-column feature.
 * Despite the "add" name this is a plain assignment: it replaces any earlier value.
 * @param pruneToColumn column name to store; may be null
 */
public void addPruneToColumn(String pruneToColumn) {
    this.pruneToColumn = pruneToColumn;
}

}
Loading

0 comments on commit 28cc019

Please sign in to comment.