Added /constructToGraphById and /constructToGraphFromNodegroup.
Java client. JUnit.
Paul Cuddihy committed Feb 6, 2023
1 parent 69213c5 commit 42a4d54
Showing 14 changed files with 451 additions and 35 deletions.
@@ -111,6 +111,13 @@ public abstract class SparqlEndpointInterface {
public final static String NEPTUNE_SERVER = "neptune";
public final static String BLAZEGRAPH_SERVER = "blazegraph";

+ public final static String JSON_KEY_TYPE = "type";
+ public final static String JSON_KEY_URL = "url";
+ public final static String JSON_KEY_GRAPH = "graph";
+ public final static String JSON_KEY_DEPRECATED_DATASET = "dataset";
+ public final static String JSON_KEY_USERNAME = "username"; // not supported
+ public final static String JSON_KEY_PASSWORD = "password"; // not supported

protected static final String TRIPLESTORE_DEFAULT_GRAPH_NAME = "urn:x-arq:DefaultGraph";

// results types to request
@@ -444,29 +451,29 @@ public static SparqlEndpointInterface getInstance(String serverTypeString, Strin
* @throws Exception
*/
public static SparqlEndpointInterface getInstance(JSONObject jObj) throws Exception {
- for (String key : new String[] {"type", "url"}) {
+ for (String key : new String[] {JSON_KEY_TYPE, JSON_KEY_URL}) {
if (!jObj.containsKey(key)) {
throw new Exception("Invalid SparqlEndpointInterface JSON does not contain " + key + ": " + jObj.toJSONString());
}
}
if (jObj.containsKey("graph")) {
return SparqlEndpointInterface.getInstance((String)jObj.get("type"), (String)jObj.get("url"), (String)jObj.get("graph"));
} else if (jObj.containsKey("dataset")) {
return SparqlEndpointInterface.getInstance((String)jObj.get("type"), (String)jObj.get("url"), (String)jObj.get("dataset"));
if (jObj.containsKey(JSON_KEY_GRAPH)) {
return SparqlEndpointInterface.getInstance((String)jObj.get(JSON_KEY_TYPE), (String)jObj.get(JSON_KEY_URL), (String)jObj.get(JSON_KEY_GRAPH));
} else if (jObj.containsKey(JSON_KEY_DEPRECATED_DATASET)) {
return SparqlEndpointInterface.getInstance((String)jObj.get(JSON_KEY_TYPE), (String)jObj.get(JSON_KEY_URL), (String)jObj.get(JSON_KEY_DEPRECATED_DATASET));
} else {
throw new Exception("Invalid SparqlEndpointInterface JSON does not contain 'graph' or 'dataset': " + jObj.toJSONString());
}
}
/**
- * Write json
+ * Write json but not username/password
* @return
*/
public JSONObject toJson() {
JSONObject ret = new JSONObject();

ret.put("type", this.getServerType());
ret.put("url", this.getServerAndPort());
ret.put("graph", this.getGraph());
ret.put(JSON_KEY_TYPE, this.getServerType());
ret.put(JSON_KEY_URL, this.getServerAndPort());
ret.put(JSON_KEY_GRAPH, this.getGraph());

return ret;
}
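
With the key names now in JSON_KEY_* constants, an endpoint description can be round-tripped through JSON without hard-coded strings. A minimal usage sketch, not part of this commit (the fuseki type and URLs are illustrative values borrowed from the @Schema example in the new request body class below):

import org.json.simple.JSONObject;
import com.ge.research.semtk.sparqlX.SparqlEndpointInterface;

// Describe an endpoint with the new constants instead of bare strings.
JSONObject j = new JSONObject();
j.put(SparqlEndpointInterface.JSON_KEY_TYPE, "fuseki");
j.put(SparqlEndpointInterface.JSON_KEY_URL, "http://localhost:3030/EXAMPLE");
j.put(SparqlEndpointInterface.JSON_KEY_GRAPH, "http://example");

// getInstance() requires type and url, plus graph (or the deprecated dataset key).
SparqlEndpointInterface sei = SparqlEndpointInterface.getInstance(j);

// toJson() writes back type, url, and graph; username/password are never serialized.
JSONObject roundTripped = sei.toJson();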
@@ -1653,7 +1660,7 @@ private Table getTable(JSONArray colNamesJsonArray, JSONArray rowsJsonArray) thr

} else {
valueValue = (String) jsonCell.get("value");
valueType = (String) jsonCell.get("type");
valueType = (String) jsonCell.get(JSON_KEY_TYPE);
if (valueType.endsWith("literal") && jsonCell.containsKey("datatype") ) {
valueDataType = (String) jsonCell.get("datatype");

@@ -82,6 +82,8 @@
import com.ge.research.semtk.services.nodeGroupExecution.requests.DispatchByIdRequestBody;
import com.ge.research.semtk.services.nodeGroupExecution.requests.DispatchFromNodegroupRequestBody;
import com.ge.research.semtk.services.nodeGroupExecution.requests.DispatchRawSparqlRequestBody;
+ import com.ge.research.semtk.services.nodeGroupExecution.requests.DispatchConstructToGraphByIdRequestBody;
+ import com.ge.research.semtk.services.nodeGroupExecution.requests.DispatchConstructToGraphFromNgRequestBody;
import com.ge.research.semtk.services.nodeGroupExecution.requests.FilterDispatchByIdRequestBody;
import com.ge.research.semtk.services.nodeGroupExecution.requests.FilterDispatchFromNodeGroupRequestBody;
import com.ge.research.semtk.services.nodeGroupExecution.requests.IngestByIdCsvStrAsyncBody;
@@ -106,6 +108,7 @@
import com.ge.research.semtk.springutilib.requests.IdRequest;
import com.ge.research.semtk.springutilib.requests.IngestConstants;
import com.ge.research.semtk.springutilib.requests.IngestionFromStringsAndClassRequestBody;
+ import com.ge.research.semtk.springutilib.requests.SparqlConnectionRequest;
import com.ge.research.semtk.springutilib.requests.SparqlEndpointRequestBody;
import com.ge.research.semtk.springutilib.requests.SparqlEndpointTrackRequestBody;
import com.ge.research.semtk.springutilib.requests.SparqlEndpointsRequestBody;
@@ -524,7 +527,7 @@ public JSONObject dispatchAnyJobById(@RequestBody DispatchByIdRequestBody reques

NodeGroupExecutor ngExecutor = this.getExecutor(null );

- SparqlConnection connection = requestBody.getSparqlConnection();
+ SparqlConnection connection = requestBody.buildSparqlConnection();
// create a json object from the external data constraints.

// check if this is actually for a filter query
@@ -535,9 +538,8 @@
}

// dispatch the job.
- // ERROR thought I was calling version on line 342
ngExecutor.dispatchJob(qt, rt, connection, requestBody.getNodeGroupId(),
- requestBody.getExternalDataConnectionConstraintsJson(),
+ requestBody.buildExternalDataConnectionConstraintsJson(),
requestBody.getFlags(),
requestBody.getRuntimeConstraints(),
requestBody.getLimitOverride(),
@@ -571,11 +573,11 @@ private JSONObject dispatchAnyJobFromNodegroup(@RequestBody DispatchFromNodegrou
NodeGroupExecutor ngExecutor = this.getExecutor(null );

// try to create a sparql connection
- SparqlConnection connection = requestBody.getSparqlConnection();
+ SparqlConnection connection = requestBody.buildSparqlConnection();
// create a json object from the external data constraints.

// decode the endcodedNodeGroup
- SparqlGraphJson sgJson = new SparqlGraphJson(requestBody.getJsonNodeGroup());
+ SparqlGraphJson sgJson = new SparqlGraphJson(requestBody.buildJsonNodeGroup());

// swap in the connection if requested
// "connection == null" is included for legacy. Not sure this is correct -Paul 6/2018
@@ -592,7 +594,7 @@
NodeGroup ng = sgJson.getNodeGroup();
// dispatch the job.
ngExecutor.dispatchJob(qt, rt, connection, ng,
- requestBody.getExternalDataConnectionConstraintsJson(),
+ requestBody.buildExternalDataConnectionConstraintsJson(),
requestBody.getFlags(),
requestBody.getRuntimeConstraints(),
-1,
@@ -1114,7 +1116,7 @@ public JSONObject dispatchRawSparql(@RequestBody DispatchRawSparqlRequestBody re
NodeGroupExecutor ngExecutor = this.getExecutor(null );

// try to create a sparql connection
- SparqlConnection connection = requestBody.getSparqlConnection();
+ SparqlConnection connection = requestBody.buildSparqlConnection();

// dispatch the job.
ngExecutor.dispatchRawSparql(connection, requestBody.getSparql(), requestBody.getResultType());
@@ -1158,7 +1160,7 @@ public JSONObject dispatchRawSparqlUpdate(@RequestBody DispatchRawSparqlRequestB
// create a new StoredQueryExecutor
NodeGroupExecutor ngExecutor = this.getExecutor(null );
// try to create a sparql connection
- SparqlConnection connection = requestBody.getSparqlConnection();
+ SparqlConnection connection = requestBody.buildSparqlConnection();

// dispatch the job.
ngExecutor.dispatchRawSparqlUpdate(connection, requestBody.getSparql());
@@ -1347,7 +1349,7 @@ public JSONObject ingestFromCsvStringsById(@RequestBody IngestByIdCsvStrRequestB
try{
NodeGroupExecutor nodeGroupExecutor = this.getExecutor(null);

- retval = nodeGroupExecutor.ingestFromNodegroupIdAndCsvString(requestBody.getSparqlConnection(), requestBody.getNodegroupId(), requestBody.getCsvContent(), requestBody.getTrackFlag(), requestBody.getOverrideBaseURI());
+ retval = nodeGroupExecutor.ingestFromNodegroupIdAndCsvString(requestBody.buildSparqlConnection(), requestBody.getNodegroupId(), requestBody.getCsvContent(), requestBody.getTrackFlag(), requestBody.getOverrideBaseURI());
}catch(Exception e){
LoggerRestClient.easyLog(logger, SERVICE_NAME, ENDPOINT_NAME + " exception", "message", e.toString());
retval = new RecordProcessResults(false);
@@ -1382,7 +1384,7 @@ public JSONObject ingestFromCsvStringsByIdAsync(@RequestBody IngestByIdCsvStrAsy
NodeGroupExecutor nodeGroupExecutor = this.getExecutor(null);

String jobId = nodeGroupExecutor.ingestFromNodegroupIdAndCsvStringAsync(
- requestBody.getSparqlConnection(),
+ requestBody.buildSparqlConnection(),
requestBody.getNodegroupId(),
requestBody.getCsvContent(),
requestBody.getSkipPrecheck(),
@@ -1637,7 +1639,7 @@ public JSONObject copyGraph(@RequestBody SparqlEndpointsRequestBody requestBody,

try {
String jobId = JobTracker.generateJobId();
- JobTracker tracker = new JobTracker(servicesgraph_props.buildSei());
+ JobTracker tracker = this.getJobTracker();
tracker.createJob(jobId);

new Thread(() -> {
@@ -1694,6 +1696,100 @@ public JSONObject copyGraph(@RequestBody SparqlEndpointsRequestBody requestBody,
}
}


@Operation(
description="Execute query putting results into another graph. Async gives JobId."
)
@CrossOrigin
@RequestMapping(value={"/dispatchConstructToGraphById"}, method= RequestMethod.POST)
public JSONObject dispatchConstructToGraphById(@RequestBody DispatchConstructToGraphByIdRequestBody requestBody, @RequestHeader HttpHeaders headers) {
final String ENDPOINT_NAME = "dispatchConstructToGraphById";
HeadersManager.setHeaders(headers);

try {

String jobId = JobTracker.generateJobId();
JobTracker tracker = this.getJobTracker();
tracker.createJob(jobId);

new Thread(() -> {
try {
// dispatch
JSONObject simpleResJson = dispatchAnyJobById(requestBody, AutoGeneratedQueryTypes.CONSTRUCT, SparqlResultTypes.RDF);
waitThenStoreRdfToSei(jobId, (new SimpleResultSet(simpleResJson)).getJobId(), requestBody.buildResultsSei());

} catch (Exception e) {
try {
tracker.setJobFailure(jobId, e.getMessage());
} catch (Exception ee) {
LocalLogger.logToStdErr(ENDPOINT_NAME + " error accessing job tracker");
LocalLogger.printStackTrace(ee);
}
}

}).start();

SimpleResultSet res = new SimpleResultSet(true);
res.addJobId(jobId);
return res.toJson();

} catch (Exception e) {
LocalLogger.printStackTrace(e);
SimpleResultSet res = new SimpleResultSet(false, e.getMessage());
return res.toJson();
} finally {
HeadersManager.clearHeaders();
}

}
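
For orientation, a hypothetical client-side call to the new endpoint using plain JDK HTTP (the commit's Java client wrapper is among the 14 changed files but is not shown in this excerpt). The host, port, service path, and the inherited nodeGroupId/sparqlConnection field names are assumptions; resultsEndpoint matches the request body class defined below. On success, the response is a SimpleResultSet JSON carrying a jobId:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConstructToGraphDemo {
	public static void main(String[] args) throws Exception {
		// Placeholder body; field names assumed from DispatchConstructToGraphByIdRequestBody and its parent class.
		String body = "{"
			+ "\"nodeGroupId\":\"demoNodegroup\","                    // assumed inherited field
			+ "\"sparqlConnection\":\"{ ... connection json ... }\"," // assumed inherited field
			+ "\"resultsEndpoint\":\"{\\\"type\\\":\\\"fuseki\\\",\\\"url\\\":\\\"http://localhost:3030/EXAMPLE\\\",\\\"graph\\\":\\\"http://example\\\"}\""
			+ "}";

		HttpRequest req = HttpRequest.newBuilder()
			.uri(URI.create("http://localhost:12058/nodeGroupExecution/dispatchConstructToGraphById")) // assumed host:port and path
			.header("Content-Type", "application/json")
			.POST(HttpRequest.BodyPublishers.ofString(body))
			.build();

		HttpResponse<String> resp = HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString());

		// Expected shape: SimpleResultSet JSON containing the jobId to poll for completion.
		System.out.println(resp.body());
	}
}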


@Operation(
description="Execute query putting results into another graph. Async gives JobId."
)
@CrossOrigin
@RequestMapping(value={"/dispatchConstructToGraphFromNodegroup"}, method= RequestMethod.POST)
public JSONObject dispatchConstructToGraphFromNodegroup(@RequestBody DispatchConstructToGraphFromNgRequestBody requestBody, @RequestHeader HttpHeaders headers) {
final String ENDPOINT_NAME = "dispatchConstructToGraphFromNodegroup";
HeadersManager.setHeaders(headers);

try {

String jobId = JobTracker.generateJobId();
JobTracker tracker = this.getJobTracker();
tracker.createJob(jobId);

new Thread(() -> {
try {
// dispatch
JSONObject simpleResJson = dispatchAnyJobFromNodegroup(requestBody, AutoGeneratedQueryTypes.CONSTRUCT, SparqlResultTypes.RDF);
waitThenStoreRdfToSei(jobId, (new SimpleResultSet(simpleResJson)).getJobId(), requestBody.buildResultsSei());

} catch (Exception e) {
try {
tracker.setJobFailure(jobId, e.getMessage());
} catch (Exception ee) {
LocalLogger.logToStdErr(ENDPOINT_NAME + " error accessing job tracker");
LocalLogger.printStackTrace(ee);
}
}

}).start();

SimpleResultSet res = new SimpleResultSet(true);
res.addJobId(jobId);
return res.toJson();

} catch (Exception e) {
LocalLogger.printStackTrace(e);
SimpleResultSet res = new SimpleResultSet(false, e.getMessage());
return res.toJson();
} finally {
HeadersManager.clearHeaders();
}

}

@Operation(
summary= "Run a query of tracked events."
)
@@ -1875,6 +1971,27 @@ public JSONObject dispatchCombineEntitiesInConn(@RequestBody CombineEntitiesInCo
return err.toJson();
}
}

/**
* Wait for query to complete, get RDF results, upload to Sei
* @param jobId
* @param queryJobId
* @param resultsSei
* @throws Exception
*/
private void waitThenStoreRdfToSei(String jobId, String queryJobId, SparqlEndpointInterface resultsSei) throws Exception {
JobTracker tracker = this.getJobTracker();
// wait
tracker.waitTilCompleteUpdatingParent(queryJobId, jobId, "Running query to RDF", 10000, 10, 80);

// store results to the results endpoint
tracker.setJobPercentComplete(jobId, 81, "uploading RDF to graph");
JSONObject jObj = getResultsClient().execGetBlobResult(queryJobId);
String owl = (String) jObj.get("RDF");
this.uploadOwl(resultsSei, owl.getBytes());
tracker.setJobSuccess(jobId);

}
// get the runtime constraints, if any.
private JSONArray getRuntimeConstraintsAsJsonArray(String potentialConstraints) throws Exception{
JSONArray retval = null;
@@ -1916,6 +2033,23 @@ private void uploadFile(SparqlEndpointInterface sei, InputStream is, String file
oinfo_props.getClient().uncacheChangedConn(sei);
}


private void uploadOwl(SparqlEndpointInterface sei, byte[] owl) throws Exception {

if (sei instanceof NeptuneSparqlEndpointInterface) {
((NeptuneSparqlEndpointInterface)sei).setS3Config(
neptune_prop.getS3ClientRegion(),
neptune_prop.getS3BucketName(),
neptune_prop.getAwsIamRoleArn());
}

JSONObject simpleResultSetJson = sei.executeAuthUploadOwl(owl);
SimpleResultSet sResult = SimpleResultSet.fromJson(simpleResultSetJson);
sResult.throwExceptionIfUnsuccessful();

oinfo_props.getClient().uncacheChangedConn(sei);
}

private SparqlGraphJson getNodegroupById(String id) throws Exception {
NodeGroupStoreConfig ngcConf = new NodeGroupStoreConfig(ngstore_prop.getProtocol(), ngstore_prop.getServer(), ngstore_prop.getPort());
NodeGroupStoreRestClient nodegroupstoreclient = new NodeGroupStoreRestClient(ngcConf);
@@ -0,0 +1,54 @@
/**
** Copyright 2016 General Electric Company
**
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
*/

package com.ge.research.semtk.services.nodeGroupExecution.requests;

import org.json.simple.parser.JSONParser;
import org.json.simple.JSONObject;

import com.ge.research.semtk.sparqlX.SparqlEndpointInterface;

import io.swagger.v3.oas.annotations.media.Schema;

public class DispatchConstructToGraphByIdRequestBody extends DispatchByIdRequestBody {

@Schema(
description = "Send results as rdf this endpoint",
requiredMode = Schema.RequiredMode.REQUIRED,
example = "{\"type\":\"fuseki\",\"url\":\"http://localhost:3030/EXAMPLE\",\"graph\":\"http://example\"}")
private String resultsEndpoint;


public String getResultsEndpoint() {
return this.resultsEndpoint;
}
public SparqlEndpointInterface buildResultsSei() throws Exception {
return SparqlEndpointInterface.getInstance((JSONObject)(new JSONParser().parse(this.resultsEndpoint)));
}

public void setResultsEndpoint(String s) {
this.resultsEndpoint = s;
}

/**
* Validate request contents. Deprecated with @Schema
*/
public void validate() throws Exception{
super.validate();

}
}
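
A short sketch of the intended client-side use of this class; only methods defined above are used, and the endpoint JSON mirrors the @Schema example:

import com.ge.research.semtk.sparqlX.SparqlEndpointInterface;

// The resultsEndpoint field carries serialized endpoint JSON.
DispatchConstructToGraphByIdRequestBody body = new DispatchConstructToGraphByIdRequestBody();
body.setResultsEndpoint("{\"type\":\"fuseki\",\"url\":\"http://localhost:3030/EXAMPLE\",\"graph\":\"http://example\"}");

// buildResultsSei() parses that string into the endpoint where the CONSTRUCT results are stored.
SparqlEndpointInterface resultsSei = body.buildResultsSei();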
