From 42a4d546a8a1609586ed96acef27a90333556a89 Mon Sep 17 00:00:00 2001 From: Paul Cuddihy Date: Mon, 6 Feb 2023 14:57:19 -0500 Subject: [PATCH] Added /constructToGraphById and /constructToGraphFromNodegroup. Java client. Junit. --- .../sparqlX/SparqlEndpointInterface.java | 27 +-- .../NodeGroupExecutionRestController.java | 156 ++++++++++++++++-- ...spatchConstructToGraphByIdRequestBody.java | 54 ++++++ ...atchConstructToGraphFromNgRequestBody.java | 55 ++++++ .../DispatchFromNodegroupRequestBody.java | 6 +- .../DispatchRawSparqlRequestBody.java | 2 +- .../requests/DispatchRequestBody.java | 2 +- .../IngestByConnIdCsvStrRequestBody.java | 5 +- .../requests/SparqlConnRequestBody.java | 5 +- .../nodeGroupExecution/NodeGroupExecutor.java | 5 +- .../client/NodeGroupExecutionClient.java | 85 +++++++++- .../semtk/edc/client/ResultsClient.java | 17 ++ ...chronousNodeGroupBasedQueryDispatcher.java | 10 +- .../test/NodeGroupExecutionClientTest_IT.java | 57 ++++++- 14 files changed, 451 insertions(+), 35 deletions(-) create mode 100644 nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchConstructToGraphByIdRequestBody.java create mode 100644 nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchConstructToGraphFromNgRequestBody.java diff --git a/connectionUtils/src/main/java/com/ge/research/semtk/sparqlX/SparqlEndpointInterface.java b/connectionUtils/src/main/java/com/ge/research/semtk/sparqlX/SparqlEndpointInterface.java index 9f9834aed..5f97125e3 100644 --- a/connectionUtils/src/main/java/com/ge/research/semtk/sparqlX/SparqlEndpointInterface.java +++ b/connectionUtils/src/main/java/com/ge/research/semtk/sparqlX/SparqlEndpointInterface.java @@ -111,6 +111,13 @@ public abstract class SparqlEndpointInterface { public final static String NEPTUNE_SERVER = "neptune"; public final static String BLAZEGRAPH_SERVER = "blazegraph"; + public final static String JSON_KEY_TYPE = "type"; + public final static String JSON_KEY_URL = "url"; + public final static String JSON_KEY_GRAPH = "graph"; + public final static String JSON_KEY_DEPRECATED_DATASET = "dataset"; + public final static String JSON_KEY_USERNAME = "username"; // not supported + public final static String JSON_KEY_PASSWORD = "password"; // not supported + protected static final String TRIPLESTORE_DEFAULT_GRAPH_NAME = "urn:x-arq:DefaultGraph"; // results types to request @@ -444,29 +451,29 @@ public static SparqlEndpointInterface getInstance(String serverTypeString, Strin * @throws Exception */ public static SparqlEndpointInterface getInstance(JSONObject jObj) throws Exception { - for (String key : new String[] {"type", "url"}) { + for (String key : new String[] {JSON_KEY_TYPE, JSON_KEY_URL}) { if (!jObj.containsKey(key)) { throw new Exception("Invalid SparqlEndpointInterface JSON does not contain " + key + ": " + jObj.toJSONString()); } } - if (jObj.containsKey("graph")) { - return SparqlEndpointInterface.getInstance((String)jObj.get("type"), (String)jObj.get("url"), (String)jObj.get("graph")); - } else if (jObj.containsKey("dataset")) { - return SparqlEndpointInterface.getInstance((String)jObj.get("type"), (String)jObj.get("url"), (String)jObj.get("dataset")); + if (jObj.containsKey(JSON_KEY_GRAPH)) { + return SparqlEndpointInterface.getInstance((String)jObj.get(JSON_KEY_TYPE), (String)jObj.get(JSON_KEY_URL), (String)jObj.get(JSON_KEY_GRAPH)); + } else if (jObj.containsKey(JSON_KEY_DEPRECATED_DATASET)) { + return SparqlEndpointInterface.getInstance((String)jObj.get(JSON_KEY_TYPE), (String)jObj.get(JSON_KEY_URL), (String)jObj.get(JSON_KEY_DEPRECATED_DATASET)); } else { throw new Exception("Invalid SparqlEndpointInterface JSON does not contain 'graph' or 'dataset': " + jObj.toJSONString()); } } /** - * Write json + * Write json but not username/password * @return */ public JSONObject toJson() { JSONObject ret = new JSONObject(); - ret.put("type", this.getServerType()); - ret.put("url", this.getServerAndPort()); - ret.put("graph", this.getGraph()); + ret.put(JSON_KEY_TYPE, this.getServerType()); + ret.put(JSON_KEY_URL, this.getServerAndPort()); + ret.put(JSON_KEY_GRAPH, this.getGraph()); return ret; } @@ -1653,7 +1660,7 @@ private Table getTable(JSONArray colNamesJsonArray, JSONArray rowsJsonArray) thr } else { valueValue = (String) jsonCell.get("value"); - valueType = (String) jsonCell.get("type"); + valueType = (String) jsonCell.get(JSON_KEY_TYPE); if (valueType.endsWith("literal") && jsonCell.containsKey("datatype") ) { valueDataType = (String) jsonCell.get("datatype"); diff --git a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/NodeGroupExecutionRestController.java b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/NodeGroupExecutionRestController.java index e80376d34..d312d4b77 100644 --- a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/NodeGroupExecutionRestController.java +++ b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/NodeGroupExecutionRestController.java @@ -82,6 +82,8 @@ import com.ge.research.semtk.services.nodeGroupExecution.requests.DispatchByIdRequestBody; import com.ge.research.semtk.services.nodeGroupExecution.requests.DispatchFromNodegroupRequestBody; import com.ge.research.semtk.services.nodeGroupExecution.requests.DispatchRawSparqlRequestBody; +import com.ge.research.semtk.services.nodeGroupExecution.requests.DispatchConstructToGraphByIdRequestBody; +import com.ge.research.semtk.services.nodeGroupExecution.requests.DispatchConstructToGraphFromNgRequestBody; import com.ge.research.semtk.services.nodeGroupExecution.requests.FilterDispatchByIdRequestBody; import com.ge.research.semtk.services.nodeGroupExecution.requests.FilterDispatchFromNodeGroupRequestBody; import com.ge.research.semtk.services.nodeGroupExecution.requests.IngestByIdCsvStrAsyncBody; @@ -106,6 +108,7 @@ import com.ge.research.semtk.springutilib.requests.IdRequest; import com.ge.research.semtk.springutilib.requests.IngestConstants; import com.ge.research.semtk.springutilib.requests.IngestionFromStringsAndClassRequestBody; +import com.ge.research.semtk.springutilib.requests.SparqlConnectionRequest; import com.ge.research.semtk.springutilib.requests.SparqlEndpointRequestBody; import com.ge.research.semtk.springutilib.requests.SparqlEndpointTrackRequestBody; import com.ge.research.semtk.springutilib.requests.SparqlEndpointsRequestBody; @@ -524,7 +527,7 @@ public JSONObject dispatchAnyJobById(@RequestBody DispatchByIdRequestBody reques NodeGroupExecutor ngExecutor = this.getExecutor(null ); - SparqlConnection connection = requestBody.getSparqlConnection(); + SparqlConnection connection = requestBody.buildSparqlConnection(); // create a json object from the external data constraints. // check if this is actually for a filter query @@ -535,9 +538,8 @@ public JSONObject dispatchAnyJobById(@RequestBody DispatchByIdRequestBody reques } // dispatch the job. - // ERROR thought I was calling version on line 342 ngExecutor.dispatchJob(qt, rt, connection, requestBody.getNodeGroupId(), - requestBody.getExternalDataConnectionConstraintsJson(), + requestBody.buildExternalDataConnectionConstraintsJson(), requestBody.getFlags(), requestBody.getRuntimeConstraints(), requestBody.getLimitOverride(), @@ -571,11 +573,11 @@ private JSONObject dispatchAnyJobFromNodegroup(@RequestBody DispatchFromNodegrou NodeGroupExecutor ngExecutor = this.getExecutor(null ); // try to create a sparql connection - SparqlConnection connection = requestBody.getSparqlConnection(); + SparqlConnection connection = requestBody.buildSparqlConnection(); // create a json object from the external data constraints. // decode the endcodedNodeGroup - SparqlGraphJson sgJson = new SparqlGraphJson(requestBody.getJsonNodeGroup()); + SparqlGraphJson sgJson = new SparqlGraphJson(requestBody.buildJsonNodeGroup()); // swap in the connection if requested // "connection == null" is included for legacy. Not sure this is correct -Paul 6/2018 @@ -592,7 +594,7 @@ private JSONObject dispatchAnyJobFromNodegroup(@RequestBody DispatchFromNodegrou NodeGroup ng = sgJson.getNodeGroup(); // dispatch the job. ngExecutor.dispatchJob(qt, rt, connection, ng, - requestBody.getExternalDataConnectionConstraintsJson(), + requestBody.buildExternalDataConnectionConstraintsJson(), requestBody.getFlags(), requestBody.getRuntimeConstraints(), -1, @@ -1114,7 +1116,7 @@ public JSONObject dispatchRawSparql(@RequestBody DispatchRawSparqlRequestBody re NodeGroupExecutor ngExecutor = this.getExecutor(null ); // try to create a sparql connection - SparqlConnection connection = requestBody.getSparqlConnection(); + SparqlConnection connection = requestBody.buildSparqlConnection(); // dispatch the job. ngExecutor.dispatchRawSparql(connection, requestBody.getSparql(), requestBody.getResultType()); @@ -1158,7 +1160,7 @@ public JSONObject dispatchRawSparqlUpdate(@RequestBody DispatchRawSparqlRequestB // create a new StoredQueryExecutor NodeGroupExecutor ngExecutor = this.getExecutor(null ); // try to create a sparql connection - SparqlConnection connection = requestBody.getSparqlConnection(); + SparqlConnection connection = requestBody.buildSparqlConnection(); // dispatch the job. ngExecutor.dispatchRawSparqlUpdate(connection, requestBody.getSparql()); @@ -1347,7 +1349,7 @@ public JSONObject ingestFromCsvStringsById(@RequestBody IngestByIdCsvStrRequestB try{ NodeGroupExecutor nodeGroupExecutor = this.getExecutor(null); - retval = nodeGroupExecutor.ingestFromNodegroupIdAndCsvString(requestBody.getSparqlConnection(), requestBody.getNodegroupId(), requestBody.getCsvContent(), requestBody.getTrackFlag(), requestBody.getOverrideBaseURI()); + retval = nodeGroupExecutor.ingestFromNodegroupIdAndCsvString(requestBody.buildSparqlConnection(), requestBody.getNodegroupId(), requestBody.getCsvContent(), requestBody.getTrackFlag(), requestBody.getOverrideBaseURI()); }catch(Exception e){ LoggerRestClient.easyLog(logger, SERVICE_NAME, ENDPOINT_NAME + " exception", "message", e.toString()); retval = new RecordProcessResults(false); @@ -1382,7 +1384,7 @@ public JSONObject ingestFromCsvStringsByIdAsync(@RequestBody IngestByIdCsvStrAsy NodeGroupExecutor nodeGroupExecutor = this.getExecutor(null); String jobId = nodeGroupExecutor.ingestFromNodegroupIdAndCsvStringAsync( - requestBody.getSparqlConnection(), + requestBody.buildSparqlConnection(), requestBody.getNodegroupId(), requestBody.getCsvContent(), requestBody.getSkipPrecheck(), @@ -1637,7 +1639,7 @@ public JSONObject copyGraph(@RequestBody SparqlEndpointsRequestBody requestBody, try { String jobId = JobTracker.generateJobId(); - JobTracker tracker = new JobTracker(servicesgraph_props.buildSei()); + JobTracker tracker = this.getJobTracker(); tracker.createJob(jobId); new Thread(() -> { @@ -1694,6 +1696,100 @@ public JSONObject copyGraph(@RequestBody SparqlEndpointsRequestBody requestBody, } } + + @Operation( + description="Execute query putting results into another graph. Async gives JobId." + ) + @CrossOrigin + @RequestMapping(value={"/dispatchConstructToGraphById"}, method= RequestMethod.POST) + public JSONObject dispatchConstructToGraphById(@RequestBody DispatchConstructToGraphByIdRequestBody requestBody, @RequestHeader HttpHeaders headers) { + final String ENDPOINT_NAME = "dispatchConstructToGraphById"; + HeadersManager.setHeaders(headers); + + try { + + String jobId = JobTracker.generateJobId(); + JobTracker tracker = this.getJobTracker(); + tracker.createJob(jobId); + + new Thread(() -> { + try { + // dispatch + JSONObject simpleResJson = dispatchAnyJobById(requestBody, AutoGeneratedQueryTypes.CONSTRUCT, SparqlResultTypes.RDF); + waitThenStoreRdfToSei(jobId, (new SimpleResultSet(simpleResJson)).getJobId(), requestBody.buildResultsSei()); + + } catch (Exception e) { + try { + tracker.setJobFailure(jobId, e.getMessage()); + } catch (Exception ee) { + LocalLogger.logToStdErr(ENDPOINT_NAME + " error accessing job tracker"); + LocalLogger.printStackTrace(ee); + } + } + + }).start(); + + SimpleResultSet res = new SimpleResultSet(true); + res.addJobId(jobId); + return res.toJson(); + + } catch (Exception e) { + LocalLogger.printStackTrace(e); + SimpleResultSet res = new SimpleResultSet(false, e.getMessage()); + return res.toJson(); + } finally { + HeadersManager.clearHeaders(); + } + + } + + + @Operation( + description="Execute query putting results into another graph. Async gives JobId." + ) + @CrossOrigin + @RequestMapping(value={"/dispatchConstructToGraphFromNodegroup"}, method= RequestMethod.POST) + public JSONObject dispatchConstructToGraphFromNodegroup(@RequestBody DispatchConstructToGraphFromNgRequestBody requestBody, @RequestHeader HttpHeaders headers) { + final String ENDPOINT_NAME = "dispatchConstructToGraphFromNodegroup"; + HeadersManager.setHeaders(headers); + + try { + + String jobId = JobTracker.generateJobId(); + JobTracker tracker = this.getJobTracker(); + tracker.createJob(jobId); + + new Thread(() -> { + try { + // dispatch + JSONObject simpleResJson = dispatchAnyJobFromNodegroup(requestBody, AutoGeneratedQueryTypes.CONSTRUCT, SparqlResultTypes.RDF); + waitThenStoreRdfToSei(jobId, (new SimpleResultSet(simpleResJson)).getJobId(), requestBody.buildResultsSei()); + + } catch (Exception e) { + try { + tracker.setJobFailure(jobId, e.getMessage()); + } catch (Exception ee) { + LocalLogger.logToStdErr(ENDPOINT_NAME + " error accessing job tracker"); + LocalLogger.printStackTrace(ee); + } + } + + }).start(); + + SimpleResultSet res = new SimpleResultSet(true); + res.addJobId(jobId); + return res.toJson(); + + } catch (Exception e) { + LocalLogger.printStackTrace(e); + SimpleResultSet res = new SimpleResultSet(false, e.getMessage()); + return res.toJson(); + } finally { + HeadersManager.clearHeaders(); + } + + } + @Operation( summary= "Run a query of tracked events." ) @@ -1875,6 +1971,27 @@ public JSONObject dispatchCombineEntitiesInConn(@RequestBody CombineEntitiesInCo return err.toJson(); } } + + /** + * Wait for query to complete, get RDF results, upload to Sei + * @param jobId + * @param queryJobId + * @param resultsSei + * @throws Exception + */ + private void waitThenStoreRdfToSei(String jobId, String queryJobId, SparqlEndpointInterface resultsSei) throws Exception { + JobTracker tracker = this.getJobTracker(); + // wait + tracker.waitTilCompleteUpdatingParent(queryJobId, jobId, "Running query to RDF", 10000, 10, 80); + + // store results to + tracker.setJobPercentComplete(jobId, 81, "uploading RDF to graph"); + JSONObject jObj = getResultsClient().execGetBlobResult(queryJobId); + String owl = (String) jObj.get("RDF"); + this.uploadOwl(resultsSei, owl.getBytes()); + tracker.setJobSuccess(jobId); + + } // get the runtime constraints, if any. private JSONArray getRuntimeConstraintsAsJsonArray(String potentialConstraints) throws Exception{ JSONArray retval = null; @@ -1916,6 +2033,23 @@ private void uploadFile(SparqlEndpointInterface sei, InputStream is, String file oinfo_props.getClient().uncacheChangedConn(sei); } + + private void uploadOwl(SparqlEndpointInterface sei, byte[] owl) throws Exception { + + if (sei instanceof NeptuneSparqlEndpointInterface) { + ((NeptuneSparqlEndpointInterface)sei).setS3Config( + neptune_prop.getS3ClientRegion(), + neptune_prop.getS3BucketName(), + neptune_prop.getAwsIamRoleArn()); + } + + JSONObject simpleResultSetJson = sei.executeAuthUploadOwl(owl); + SimpleResultSet sResult = SimpleResultSet.fromJson(simpleResultSetJson); + sResult.throwExceptionIfUnsuccessful(); + + oinfo_props.getClient().uncacheChangedConn(sei); + } + private SparqlGraphJson getNodegroupById(String id) throws Exception { NodeGroupStoreConfig ngcConf = new NodeGroupStoreConfig(ngstore_prop.getProtocol(), ngstore_prop.getServer(), ngstore_prop.getPort()); NodeGroupStoreRestClient nodegroupstoreclient = new NodeGroupStoreRestClient(ngcConf); diff --git a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchConstructToGraphByIdRequestBody.java b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchConstructToGraphByIdRequestBody.java new file mode 100644 index 000000000..cea4190f6 --- /dev/null +++ b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchConstructToGraphByIdRequestBody.java @@ -0,0 +1,54 @@ +/** + ** Copyright 2016 General Electric Company + ** + ** + ** Licensed under the Apache License, Version 2.0 (the "License"); + ** you may not use this file except in compliance with the License. + ** You may obtain a copy of the License at + ** + ** http://www.apache.org/licenses/LICENSE-2.0 + ** + ** Unless required by applicable law or agreed to in writing, software + ** distributed under the License is distributed on an "AS IS" BASIS, + ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ** See the License for the specific language governing permissions and + ** limitations under the License. + */ + +package com.ge.research.semtk.services.nodeGroupExecution.requests; + +import org.json.simple.parser.JSONParser; +import org.json.simple.JSONObject; + +import com.ge.research.semtk.sparqlX.SparqlEndpointInterface; + +import io.swagger.v3.oas.annotations.media.Schema; + +public class DispatchConstructToGraphByIdRequestBody extends DispatchByIdRequestBody { + + @Schema( + description = "Send results as rdf this endpoint", + requiredMode = Schema.RequiredMode.REQUIRED, + example = "{\"type\":\"fuseki\",\"url\":\"http://localhost:3030/EXAMPLE\",\"graph\":\"http://example\"}") + private String resultsEndpoint; + + + public String getResultsEndpoint() { + return this.resultsEndpoint; + } + public SparqlEndpointInterface buildResultsSei() throws Exception { + return SparqlEndpointInterface.getInstance((JSONObject)(new JSONParser().parse(this.resultsEndpoint))); + } + + public void setResultsEndpoint(String s) { + this.resultsEndpoint = s; + } + + /** + * Validate request contents. Deprecated with @Schema + */ + public void validate() throws Exception{ + super.validate(); + + } +} diff --git a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchConstructToGraphFromNgRequestBody.java b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchConstructToGraphFromNgRequestBody.java new file mode 100644 index 000000000..70534339c --- /dev/null +++ b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchConstructToGraphFromNgRequestBody.java @@ -0,0 +1,55 @@ +/** + ** Copyright 2016 General Electric Company + ** + ** + ** Licensed under the Apache License, Version 2.0 (the "License"); + ** you may not use this file except in compliance with the License. + ** You may obtain a copy of the License at + ** + ** http://www.apache.org/licenses/LICENSE-2.0 + ** + ** Unless required by applicable law or agreed to in writing, software + ** distributed under the License is distributed on an "AS IS" BASIS, + ** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ** See the License for the specific language governing permissions and + ** limitations under the License. + */ + +package com.ge.research.semtk.services.nodeGroupExecution.requests; + +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; + +import com.ge.research.semtk.sparqlX.SparqlEndpointInterface; + +import io.swagger.v3.oas.annotations.media.Schema; + +public class DispatchConstructToGraphFromNgRequestBody extends DispatchFromNodegroupRequestBody { + + @Schema( + description = "Send results as rdf this endpoint", + requiredMode = Schema.RequiredMode.REQUIRED, + example = "{\"type\":\"fuseki\",\"url\":\"http://localhost:3030/EXAMPLE\",\"graph\":\"http://example\"}") + private String resultsEndpoint; + + + public String getResultsEndpoint() { + return this.resultsEndpoint; + } + public SparqlEndpointInterface buildResultsSei() throws Exception { + return SparqlEndpointInterface.getInstance((JSONObject)(new JSONParser().parse(this.resultsEndpoint))); + } + + public void setResultsEndpoint(String s) { + this.resultsEndpoint = s; + } + + /** + * Validate request contents. Deprecated with @Schema + */ + public void validate() throws Exception{ + super.validate(); + + } + +} diff --git a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchFromNodegroupRequestBody.java b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchFromNodegroupRequestBody.java index 13235bb93..83a38b11d 100644 --- a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchFromNodegroupRequestBody.java +++ b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchFromNodegroupRequestBody.java @@ -33,11 +33,15 @@ public class DispatchFromNodegroupRequestBody extends DispatchRequestBody { ) private String jsonRenderedNodeGroup; + public String getJsonRenderedNodeGroup() { + return jsonRenderedNodeGroup; + } + public void setJsonRenderedNodeGroup(String jsonRenderedNodeGroup) { this.jsonRenderedNodeGroup = jsonRenderedNodeGroup; } - public JSONObject getJsonNodeGroup(){ + public JSONObject buildJsonNodeGroup(){ JSONParser prsr = new JSONParser(); JSONObject retval = null; try { diff --git a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchRawSparqlRequestBody.java b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchRawSparqlRequestBody.java index 5464486fd..2a5e780f2 100644 --- a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchRawSparqlRequestBody.java +++ b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchRawSparqlRequestBody.java @@ -29,7 +29,7 @@ public class DispatchRawSparqlRequestBody { private String sparql; private SparqlResultTypes resultType = SparqlResultTypes.TABLE; - public SparqlConnection getSparqlConnection() throws Exception { + public SparqlConnection buildSparqlConnection() throws Exception { return new SparqlConnection(sparqlConnection); } public void setSparqlConnection(String sparqlConnection) { diff --git a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchRequestBody.java b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchRequestBody.java index f010f134d..a4304fc37 100644 --- a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchRequestBody.java +++ b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/DispatchRequestBody.java @@ -45,7 +45,7 @@ public class DispatchRequestBody extends SparqlConnRequestBody { private String runtimeConstraints; - public JSONObject getExternalDataConnectionConstraintsJson() throws Exception { + public JSONObject buildExternalDataConnectionConstraintsJson() throws Exception { if(this.externalDataConnectionConstraints == null || this.externalDataConnectionConstraints.trim().isEmpty()){ return null; } diff --git a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/IngestByConnIdCsvStrRequestBody.java b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/IngestByConnIdCsvStrRequestBody.java index 972ec21ea..c1396559a 100644 --- a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/IngestByConnIdCsvStrRequestBody.java +++ b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/IngestByConnIdCsvStrRequestBody.java @@ -23,7 +23,10 @@ public class IngestByConnIdCsvStrRequestBody extends IngestByIdCsvStrRequestBod private String sparqlConnection = ""; - public SparqlConnection getSparqlConnection() throws Exception { + public String getSparqlConnection() { + return sparqlConnection; + } + public SparqlConnection buildSparqlConnection() throws Exception { return new SparqlConnection(sparqlConnection); } public void setSparqlConnection(String sparqlConnection) { diff --git a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/SparqlConnRequestBody.java b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/SparqlConnRequestBody.java index 52144e236..2fcbfd74c 100644 --- a/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/SparqlConnRequestBody.java +++ b/nodeGroupExecutionService/src/main/java/com/ge/research/semtk/services/nodeGroupExecution/requests/SparqlConnRequestBody.java @@ -31,7 +31,10 @@ public class SparqlConnRequestBody { example = "NODEGROUP_DEFAULT") private String sparqlConnection; - public SparqlConnection getSparqlConnection() throws Exception { + public String getSparqlConnection() { + return this.sparqlConnection; + } + public SparqlConnection buildSparqlConnection() throws Exception { if (sparqlConnection.equals(NodeGroupExecutor.USE_NODEGROUP_CONN_STR_SHORT)) { return NodeGroupExecutor.get_USE_NODEGROUP_CONN(); } else { diff --git a/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/api/nodeGroupExecution/NodeGroupExecutor.java b/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/api/nodeGroupExecution/NodeGroupExecutor.java index 599d13c07..772347dc6 100644 --- a/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/api/nodeGroupExecution/NodeGroupExecutor.java +++ b/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/api/nodeGroupExecution/NodeGroupExecutor.java @@ -304,7 +304,7 @@ public void dispatchRawSparqlUpdate(SparqlConnection sc, String sparqlQuery) thr /** - * + * Dispatch by nodegroup * @param qt * @param rt * @param sc @@ -371,6 +371,7 @@ public void dispatchJob(AutoGeneratedQueryTypes qt, SparqlResultTypes rt, Sparql } /** + * Get nodegroup by ID then dispatchJob with the nodegroup * Version without queryFlags * @param qt * @param rt @@ -386,7 +387,7 @@ public void dispatchJob(AutoGeneratedQueryTypes qt, SparqlResultTypes rt,SparqlC } /** - * + * Get nodegroup by ID then dispatchJob with the nodegroup * @param qt * @param sc - if null, fill from the stored nodegroup * @param storedNodeGroupId diff --git a/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/api/nodeGroupExecution/client/NodeGroupExecutionClient.java b/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/api/nodeGroupExecution/client/NodeGroupExecutionClient.java index 5d353c3c2..31d466b5a 100644 --- a/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/api/nodeGroupExecution/client/NodeGroupExecutionClient.java +++ b/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/api/nodeGroupExecution/client/NodeGroupExecutionClient.java @@ -20,7 +20,6 @@ import java.net.ConnectException; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import org.json.simple.JSONArray; import org.json.simple.JSONObject; @@ -38,6 +37,7 @@ import com.ge.research.semtk.resultSet.TableResultSet; import com.ge.research.semtk.services.client.RestClientConfig; import com.ge.research.semtk.sparqlX.SparqlConnection; +import com.ge.research.semtk.sparqlX.SparqlEndpointInterface; import com.ge.research.semtk.sparqlX.SparqlResultTypes; import com.ge.research.semtk.sparqlX.dispatch.QueryFlags; import com.ge.research.semtk.utility.LocalLogger; @@ -77,6 +77,7 @@ public class NodeGroupExecutionClient extends SharedIngestNgeClient { private static final String JSON_KEY_PERCENT_COMPLETE = "percentComplete"; private static final String JSON_KEY_QUERY_TYPE = "queryType"; private static final String JSON_KEY_RESULT_TYPE = "resultType"; + private static final String JSON_KEY_RESULTS_ENDPOINT = "resultsEndpoint"; private static final String JSON_KEY_SPARQL_CONNECTION = "sparqlConnection"; private static final String JSON_KEY_SPARQL = "sparql"; private static final String JSON_KEY_RUNTIME_CONSTRAINTS = "runtimeConstraints"; @@ -111,6 +112,8 @@ public class NodeGroupExecutionClient extends SharedIngestNgeClient { private static final String dispatchSelectByIdSyncEndpoint = "/dispatchSelectByIdSync"; private static final String dispatchSelectFromNodegroupEndpoint = "/dispatchSelectFromNodegroup"; private static final String dispatchQueryFromNodegroupEndpoint = "/dispatchQueryFromNodegroup"; + private static final String dispatchConstructToGraphByIdEndpoint = "/dispatchConstructToGraphById"; + private static final String dispatchConstructToGraphFromNodegroupEndpoint = "/dispatchConstructToGraphFromNodegroup"; private static final String dispatchCountByIdEndpoint = "/dispatchCountById"; private static final String dispatchCountFromNodegroupEndpoint = "/dispatchCountFromNodegroup"; private static final String dispatchFilterByIdEndpoint = "/dispatchFilterById"; @@ -769,6 +772,58 @@ public SimpleResultSet execDispatchSelectById(String nodegroupID, SparqlConnecti return retval; } + @SuppressWarnings("unchecked") + public SimpleResultSet execDispatchConstructToGraphById(String nodegroupID, SparqlConnection overrideConn, SparqlEndpointInterface resultsSei, JSONObject edcConstraintsJson, RuntimeConstraintManager runtimeConstraints, int limitOverride, int offsetOverride, QueryFlags flags) throws Exception{ + SimpleResultSet retval = null; + + conf.setServiceEndpoint(mappingPrefix + dispatchConstructToGraphByIdEndpoint); + this.parametersJSON.put(JSON_KEY_NODEGROUP_ID, nodegroupID); + this.parametersJSON.put(JSON_KEY_NODEGROUP_ID_PREV, nodegroupID); + this.parametersJSON.put(JSON_KEY_RESULTS_ENDPOINT, resultsSei.toJson().toJSONString()); + this.parametersJSON.put(JSON_KEY_LIMIT_OVERRIDE, limitOverride); + this.parametersJSON.put(JSON_KEY_OFFSET_OVERRIDE, offsetOverride); + + this.parametersJSON.put(JSON_KEY_SPARQL_CONNECTION, overrideConn.toJson().toJSONString()); + this.parametersJSON.put(JSON_KEY_EDC_CONSTRAINTS, edcConstraintsJson == null ? null : edcConstraintsJson.toJSONString()); + this.parametersJSON.put(JSON_KEY_RUNTIME_CONSTRAINTS, runtimeConstraints == null ? null : runtimeConstraints.toJSONString()); + this.parametersJSON.put(JSON_KEY_FLAGS, flags == null ? null : flags.toJSONString()); + + + try{ + LocalLogger.logToStdErr("sending executeDispatchSelectById request"); + retval = SimpleResultSet.fromJson((JSONObject) this.execute() ); + retval.throwExceptionIfUnsuccessful(String.format("Error running SELECT on nodegroup id='%s'", nodegroupID)); + } + finally{ + this.reset(); + } + LocalLogger.logToStdErr("executeDispatchSelectById request finished without exception"); + return retval; + } + + @SuppressWarnings("unchecked") + public SimpleResultSet execDispatchConstructToGraphFromNodeGroup(NodeGroup ng, SparqlConnection conn, SparqlEndpointInterface resultsSei, JSONObject edcConstraintsJson, RuntimeConstraintManager runtimeConstraints, QueryFlags flags) throws Exception{ + + SimpleResultSet retval = null; + + conf.setServiceEndpoint(mappingPrefix + dispatchConstructToGraphFromNodegroupEndpoint); + this.parametersJSON.put(JSON_KEY_NODEGROUP, ng.toJson().toJSONString()); + this.parametersJSON.put(JSON_KEY_SPARQL_CONNECTION, conn.toJson().toJSONString()); + this.parametersJSON.put(JSON_KEY_RESULTS_ENDPOINT, resultsSei.toJson().toJSONString()); + this.parametersJSON.put(JSON_KEY_EDC_CONSTRAINTS, edcConstraintsJson == null ? null : edcConstraintsJson.toJSONString()); + this.parametersJSON.put(JSON_KEY_RUNTIME_CONSTRAINTS, runtimeConstraints == null ? null : runtimeConstraints.toJSONString()); + this.parametersJSON.put(JSON_KEY_FLAGS, flags == null ? null : flags.toJSONString()); + + try{ + retval = SimpleResultSet.fromJson((JSONObject) this.execute() ); + retval.throwExceptionIfUnsuccessful("Error at " + mappingPrefix + dispatchSelectFromNodegroupEndpoint); + } + finally{ + this.reset(); + } + + return retval; + } /** * Run synchronous Select. Warning HTTP protocols could break the connection before it completes. * @param nodegroupID @@ -1239,8 +1294,9 @@ public SimpleResultSet execDispatchSelectFromNodeGroup(NodeGroup ng, SparqlConne * @param flags * @return SimpleResultSet with "JobId" field * @throws Exception - */ - public SimpleResultSet execDispatchSelectFromNodeGroup(NodeGroup ng, SparqlConnection conn, JSONObject edcConstraintsJson, RuntimeConstraintManager runtimeConstraints, QueryFlags flags) throws Exception{ + */ + @SuppressWarnings("unchecked") + public SimpleResultSet execDispatchSelectFromNodeGroup(NodeGroup ng, SparqlConnection conn, JSONObject edcConstraintsJson, RuntimeConstraintManager runtimeConstraints, QueryFlags flags) throws Exception{ SimpleResultSet retval = null; @@ -1262,6 +1318,8 @@ public SimpleResultSet execDispatchSelectFromNodeGroup(NodeGroup ng, SparqlConne return retval; } + + /** * launch a construct query from a a nodegroup * @param ng @@ -1355,6 +1413,7 @@ public Table dispatchSelectFromNodeGroup(NodeGroup ng, SparqlConnection conn, JS return this.waitForJobAndGetTable(ret.getResult("JobId")); } + /** * Run a select query given a nodegroup and wait for results Table * @param sgjson @@ -1371,6 +1430,26 @@ public Table dispatchSelectFromNodeGroup(SparqlGraphJson sgjson) throws Exceptio return this.dispatchSelectFromNodeGroup(sgjson.getNodeGroup(), sgjson.getSparqlConn(), null, null, null); } + public void dispatchConstructToGraphFromNodeGroup(NodeGroup ng, SparqlConnection conn, SparqlEndpointInterface resultsSei) throws Exception { + this.dispatchConstructToGraphFromNodeGroup(ng, conn, resultsSei, null, null, null); + } + + public void dispatchConstructToGraphFromNodeGroup(NodeGroup ng, SparqlConnection conn, SparqlEndpointInterface resultsSei, JSONObject edcConstraintsJson, RuntimeConstraintManager runtimeConstraints, QueryFlags flags) throws Exception{ + + SimpleResultSet res = this.execDispatchConstructToGraphFromNodeGroup(ng, conn, resultsSei, edcConstraintsJson, runtimeConstraints, flags); + this.waitForSuccessfulCompletion(res.getJobId()); + } + + public void dispatchConstructToGraphById(String nodegroupID, SparqlConnection overrideConn, SparqlEndpointInterface resultsSei) throws Exception { + this.dispatchConstructToGraphById(nodegroupID, overrideConn, resultsSei, null, null, 0, 0, null); + } + + public void dispatchConstructToGraphById(String nodegroupID, SparqlConnection overrideConn, SparqlEndpointInterface resultsSei, JSONObject edcConstraintsJson, RuntimeConstraintManager runtimeConstraints, int limitOverride, int offsetOverride, QueryFlags flags) throws Exception { + + SimpleResultSet res = this.execDispatchConstructToGraphById(nodegroupID, overrideConn, resultsSei, edcConstraintsJson, runtimeConstraints, limitOverride, offsetOverride, flags); + this.waitForSuccessfulCompletion(res.getJobId()); + } + /** * Run a select query given a nodegroup * @param sgjson diff --git a/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/edc/client/ResultsClient.java b/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/edc/client/ResultsClient.java index 8c48dcf49..ea331c576 100644 --- a/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/edc/client/ResultsClient.java +++ b/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/edc/client/ResultsClient.java @@ -19,6 +19,7 @@ package com.ge.research.semtk.edc.client; import java.io.File; +import java.io.InputStream; import java.net.ConnectException; import java.net.URL; import java.util.ArrayList; @@ -264,6 +265,22 @@ public JSONObject execGetBlobResult(String jobId) throws ConnectException, Endpo this.cleanUp(); } } + + public InputStream execStreamJsonBobResult(String jobId) throws ConnectException, EndpointNotFoundException, Exception { + this.parametersJSON.clear(); + + this.conf.setServiceEndpoint("results/getJsonBlobResults"); + this.conf.setMethod(RestClientConfig.Methods.POST); + this.parametersJSON.put("jobId", jobId); + this.parametersJSON.put("appendDownloadHeaders", false); + + try { + return super.executeToStream(); // return raw response (not parseable into JSON) + + } finally { + this.cleanUp(); + } + } // table support diff --git a/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/sparqlX/asynchronousQuery/AsynchronousNodeGroupBasedQueryDispatcher.java b/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/sparqlX/asynchronousQuery/AsynchronousNodeGroupBasedQueryDispatcher.java index 7954dd3d5..2a291f43d 100644 --- a/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/sparqlX/asynchronousQuery/AsynchronousNodeGroupBasedQueryDispatcher.java +++ b/sparqlGraphLibrary/src/main/java/com/ge/research/semtk/sparqlX/asynchronousQuery/AsynchronousNodeGroupBasedQueryDispatcher.java @@ -234,7 +234,8 @@ public void executePlainSparqlQuery(String sparqlQuery, SparqlResultTypes resTyp if( resType == SparqlResultTypes.GRAPH_JSONLD || resType == SparqlResultTypes.CONFIRM || - resType == SparqlResultTypes.TABLE) { + resType == SparqlResultTypes.TABLE || + resType == SparqlResultTypes.RDF) { genResult = this.querySei.executeQueryAndBuildResultSet(sparqlQuery, resType); } else { @@ -255,8 +256,11 @@ public void executePlainSparqlQuery(String sparqlQuery, SparqlResultTypes resTyp if (resType == SparqlResultTypes.GRAPH_JSONLD) { LocalLogger.logToStdErr("Query returned JSON-LD"); this.sendResultsToService(genResult.getResultsJSON()); - } - else if(resType == SparqlResultTypes.TABLE) { + } else if(resType == SparqlResultTypes.RDF) { + // RDF results store as json blobs with { "RDF": "..." } + this.sendResultsToService(genResult.getResultsJSON()); + + } else if(resType == SparqlResultTypes.TABLE) { // all other types Table tab = ((TableResultSet)genResult).getTable(); LocalLogger.logToStdErr("Query returned " + tab.getNumRows() + " results."); diff --git a/sparqlGraphLibrary/src/test/java/com/ge/research/semtk/api/nodeGroupExecution/client/test/NodeGroupExecutionClientTest_IT.java b/sparqlGraphLibrary/src/test/java/com/ge/research/semtk/api/nodeGroupExecution/client/test/NodeGroupExecutionClientTest_IT.java index 44929102e..64c8769c2 100755 --- a/sparqlGraphLibrary/src/test/java/com/ge/research/semtk/api/nodeGroupExecution/client/test/NodeGroupExecutionClientTest_IT.java +++ b/sparqlGraphLibrary/src/test/java/com/ge/research/semtk/api/nodeGroupExecution/client/test/NodeGroupExecutionClientTest_IT.java @@ -38,6 +38,7 @@ import com.ge.research.semtk.resultSet.SimpleResultSet; import com.ge.research.semtk.resultSet.Table; import com.ge.research.semtk.resultSet.TableResultSet; +import com.ge.research.semtk.sparqlX.SparqlEndpointInterface; import com.ge.research.semtk.sparqlX.SparqlResultTypes; import com.ge.research.semtk.sparqlX.dispatch.QueryFlags; import com.ge.research.semtk.test.IntegrationTestUtility; @@ -62,6 +63,8 @@ public class NodeGroupExecutionClientTest_IT { private final static String BATTERY = "http://kdl.ge.com/batterydemo#Battery"; private final static String CELL = "http://kdl.ge.com/batterydemo#Cell"; + + private static SparqlEndpointInterface resultSei; @BeforeClass public static void setup() throws Exception { @@ -69,7 +72,7 @@ public static void setup() throws Exception { // instantiate a client nodeGroupExecutionClient = new NodeGroupExecutionClient(new NodeGroupExecutionClientConfig(IntegrationTestUtility.get("protocol"), IntegrationTestUtility.get("nodegroupexecution.server"), IntegrationTestUtility.getInt("nodegroupexecution.port"))); nodeGroupStoreClient = IntegrationTestUtility.getNodeGroupStoreRestClient(); // instantiate client, with configurations from properties file - + resultSei = TestGraph.getSei(TestGraph.generateGraphName("result")); IntegrationTestUtility.cleanupNodegroupStore(nodeGroupStoreClient, CREATOR); } @@ -531,5 +534,57 @@ public void testUTFRoundTrip() throws Exception { } } + @Test + public void testConstructToGraphById() throws Exception { + resultSei.clearGraph(); + TestGraph.clearGraph(); + TestGraph.uploadOwlResource(this, "/sampleBattery.owl"); + + // get ingestion and result nodegroup + SparqlGraphJson sgjson = TestGraph.getSparqlGraphJsonFromFile("src/test/resources/sampleBattery.json"); + + // ingest data + String csvStr = Utility.readFile("src/test/resources/sampleBattery.csv"); + nodeGroupExecutionClient.execIngestionFromCsvStr(sgjson, csvStr); + + // store a nodegroup (modified with the test graph) + JSONObject ngJson = sgjson.getJson(); + try { + nodeGroupStoreClient.deleteStoredNodeGroupIfExists(ID); + } catch (Exception e) { + } + nodeGroupStoreClient.executeStoreNodeGroup(ID, "testConstructToGraphFromNodegroup", CREATOR, ngJson); + + nodeGroupExecutionClient.dispatchConstructToGraphById( + ID, + TestGraph.getSparqlAuthConn(), + resultSei); + + String query = sgjson.getNodeGroup(TestGraph.getOInfo()).generateSparqlSelect(); + Table t = resultSei.executeQueryToTable(query); + assertEquals("Wrong number of rows returned from constructed graph", 4, t.getNumRows()); + } + + @Test + public void testConstructToGraphFromNodegroup() throws Exception { + resultSei.clearGraph(); + TestGraph.clearGraph(); + TestGraph.uploadOwlResource(this, "/sampleBattery.owl"); + + // get ingestion and result nodegroup + SparqlGraphJson sgjson = TestGraph.getSparqlGraphJsonFromFile("src/test/resources/sampleBattery.json"); + + String csvStr = Utility.readFile("src/test/resources/sampleBattery.csv"); + nodeGroupExecutionClient.execIngestionFromCsvStr(sgjson, csvStr); + + nodeGroupExecutionClient.dispatchConstructToGraphFromNodeGroup( + sgjson.getNodeGroup(), + TestGraph.getSparqlAuthConn(), + resultSei); + + String query = sgjson.getNodeGroup(TestGraph.getOInfo()).generateSparqlSelect(); + Table t = resultSei.executeQueryToTable(query); + assertEquals("Wrong number of rows returned from constructed graph", 4, t.getNumRows()); + } }