Skip to content

Commit

Permalink
Add Logger and use it to report progress loading ingestion package (#19)
Browse files Browse the repository at this point in the history
Prepend server response with INFO/WARNING/ERROR.
Provide Java methods to parse level/message from a log line.
  • Loading branch information
weisenje authored and GitHub Enterprise committed Mar 17, 2023
1 parent c4a2bc7 commit eb248cf
Show file tree
Hide file tree
Showing 16 changed files with 204 additions and 98 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.ge.research.semtk.utility;

import java.io.PrintWriter;

public class Logger {

public enum Levels {INFO, WARNING, ERROR};
private PrintWriter printWriter;

/**
* Constructor
*/
public Logger(PrintWriter printWriter) {
this.printWriter = printWriter;
}

/**
* Log an info message
*/
public void info(String s) {
log(s, Levels.INFO);
}

/**
* Log a warning message
*/
public void warning(String s) {
log(s, Levels.WARNING);
}

/**
* Log an error message
*/
public void error(String s) {
log(s, Levels.ERROR);
}

/**
* Get the logging level from a logged line
* @param line (e.g. "INFO: performed an action")
* @return the level (e.g. Levels.INFO)
*/
public static Levels getLevel(String line) throws Exception {
String levelStr = line.substring(0, line.indexOf(":"));
if(levelStr.equals(Levels.INFO.name())) { return Levels.INFO; }
if(levelStr.equals(Levels.WARNING.name())) { return Levels.WARNING; }
if(levelStr.equals(Levels.ERROR.name())) { return Levels.ERROR; }
throw new Exception("Unrecognized level: " + levelStr);
}

/**
* Get the message from a logged line
* @param line (e.g. "INFO: performed an action")
* @return the message, e.g. "performed an action"
*/
public static String getMessage(String line) {
return line.substring(line.indexOf(": ") + 2);
}

// log the message
private void log(String s, Levels level) {
printWriter.println(level.name() + ": " + s);
printWriter.flush();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.ge.research.semtk.load.config;

import java.io.File;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.LinkedList;
Expand All @@ -29,6 +28,7 @@
import com.ge.research.semtk.sparqlX.SparqlEndpointInterface;
import com.ge.research.semtk.sparqlX.client.SparqlQueryClient;
import com.ge.research.semtk.utility.LocalLogger;
import com.ge.research.semtk.utility.Logger;
import com.ge.research.semtk.utility.Utility;

/**
Expand Down Expand Up @@ -124,9 +124,9 @@ public void addDatagraph(String datagraph) {
* @param serverTypeString triple store type
* @param clear if true, clears before loading
* @param ingestClient Ingestor rest client
* @param progressWriter writer for reporting progress
* @param logger logger for reporting progress (may be null)
*/
public void load(String modelGraph, LinkedList<String> dataGraphs, String server, String serverType, boolean clear, IngestorRestClient ingestClient, NodeGroupExecutionClient ngeClient, SparqlQueryClient queryClient, PrintWriter progressWriter) throws Exception {
public void load(String modelGraph, LinkedList<String> dataGraphs, String server, String serverType, boolean clear, IngestorRestClient ingestClient, NodeGroupExecutionClient ngeClient, SparqlQueryClient queryClient, Logger logger) throws Exception {
try {

// determine which model/data graphs to use
Expand Down Expand Up @@ -154,11 +154,11 @@ public void load(String modelGraph, LinkedList<String> dataGraphs, String server
// execute each step
for(IngestionStep step : this.getSteps()) {
if(step instanceof CsvByClassIngestionStep) {
((CsvByClassIngestionStep)step).run(conn, ingestClient, ngeClient, progressWriter);
((CsvByClassIngestionStep)step).run(conn, ingestClient, ngeClient, logger);
}else if(step instanceof CsvByNodegroupIngestionStep) {
((CsvByNodegroupIngestionStep)step).run(conn, ngeClient, progressWriter);
((CsvByNodegroupIngestionStep)step).run(conn, ngeClient, logger);
}else if(step instanceof OwlIngestionStep) {
((OwlIngestionStep)step).run(conn, progressWriter);
((OwlIngestionStep)step).run(conn, logger);
}else {
throw new Exception("Unrecognized ingestion step"); // should not get here
}
Expand Down Expand Up @@ -206,12 +206,16 @@ public CsvByClassIngestionStep(String clazz, String baseDir, String csvPath) {
public String getClazz() {
return clazz;
}
public void run(SparqlConnection conn, IngestorRestClient ingestClient, NodeGroupExecutionClient ngeClient, PrintWriter progressWriter) throws Exception {
writeProgress("Load CSV " + getDisplayableFilePath() + " as class " + clazz, progressWriter);
public void run(SparqlConnection conn, IngestorRestClient ingestClient, NodeGroupExecutionClient ngeClient, Logger logger) throws Exception {
if(logger != null) {
logger.info("Load CSV " + getDisplayableFilePath() + " as class " + clazz);
}
String jobId = ingestClient.execFromCsvUsingClassTemplate(clazz, null, Files.readString(getFile().toPath()), conn, false, null);
if(ingestClient.getWarnings() != null) {
for (String warning : ingestClient.getWarnings()) {
writeProgress("Load CSV warning: " + warning, progressWriter);
if(logger != null) {
logger.warning(warning);
}
}
}
ngeClient.waitForCompletion(jobId);
Expand All @@ -231,8 +235,10 @@ public CsvByNodegroupIngestionStep(String nodegroupId, String baseDir, String cs
public String getNodegroupId() {
return nodegroupId;
}
public void run(SparqlConnection conn, NodeGroupExecutionClient ngeClient, PrintWriter progressWriter) throws Exception {
writeProgress("Load CSV " + getDisplayableFilePath() + " using nodegroup \"" + nodegroupId + "\"", progressWriter);
public void run(SparqlConnection conn, NodeGroupExecutionClient ngeClient, Logger logger) throws Exception {
if(logger != null) {
logger.info("Load CSV " + getDisplayableFilePath() + " using nodegroup \"" + nodegroupId + "\"");
}
ngeClient.dispatchIngestFromCsvStringsByIdSync(this.nodegroupId, Files.readString(getFile().toPath()), conn);
}
}
Expand All @@ -242,12 +248,14 @@ public static class OwlIngestionStep extends FileIngestionStep{
public OwlIngestionStep(String baseDir, String owlPath) {
super(baseDir, owlPath);
}
public void run(SparqlConnection conn, PrintWriter progressWriter) throws Exception {
public void run(SparqlConnection conn, Logger logger) throws Exception {
if(conn.getDataInterfaceCount() != 1) {
throw new Exception("Error: cannot load OWL because 0 or multiple data interfaces are specified");
}
SparqlEndpointInterface sei = conn.getDataInterface(0);
writeProgress("Load OWL " + getDisplayableFilePath() + " to " + sei.getGraph(), progressWriter);
if(logger != null) {
logger.info("Load OWL " + getDisplayableFilePath() + " to " + sei.getGraph());
}
sei.uploadOwl(Files.readAllBytes(getFile().toPath())); // OK to use SEI (instead of client) because uploading data (not model)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
package com.ge.research.semtk.load.config;

import java.io.File;
import java.io.PrintWriter;
import java.util.LinkedList;
import java.util.List;

import com.fasterxml.jackson.databind.JsonNode;
import com.ge.research.semtk.sparqlX.SparqlEndpointInterface;
import com.ge.research.semtk.sparqlX.client.SparqlQueryClient;
import com.ge.research.semtk.utility.LocalLogger;
import com.ge.research.semtk.utility.Logger;
import com.ge.research.semtk.utility.Utility;

/**
Expand Down Expand Up @@ -79,9 +79,9 @@ public void addFile(String file) {
* @param modelGraph a model graph (optional - overrides if present)
* @param server triple store location
* @param serverTypeString triple store type
* @param progressWriter writer for reporting progress
* @param logger logger for reporting progress (may be null)
*/
public void load(String modelGraph, String server, String serverType, SparqlQueryClient queryClient, PrintWriter progressWriter) throws Exception {
public void load(String modelGraph, String server, String serverType, SparqlQueryClient queryClient, Logger logger) throws Exception {
try {
// use modelGraph from method parameter if present. Else use from config YAML if present. Else use default.
modelGraph = (modelGraph != null) ? modelGraph : (this.getModelgraph() != null ? this.getModelgraph() : this.defaultModelGraph );
Expand All @@ -90,7 +90,9 @@ public void load(String modelGraph, String server, String serverType, SparqlQuer
// upload each OWL file to model graph
for(String fileStr : this.getFiles()) {
File file = new File(this.baseDir, fileStr);
writeProgress("Load OWL " + new File(this.baseDir).getName() + File.separator + file.getName() + " to " + modelGraph, progressWriter);
if(logger != null) {
logger.info("Load OWL " + new File(this.baseDir).getName() + File.separator + file.getName() + " to " + modelGraph);
}
SparqlEndpointInterface sei = SparqlEndpointInterface.getInstance(serverType, server, modelGraph, username, password);
queryClient.setSei(sei);
queryClient.uploadOwl(file);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.ge.research.semtk.load.config;

import java.io.File;
import java.io.PrintWriter;
import java.util.LinkedList;

import org.apache.commons.math3.util.Pair;
Expand All @@ -29,6 +28,7 @@
import com.ge.research.semtk.sparqlX.SparqlConnection;
import com.ge.research.semtk.sparqlX.SparqlEndpointInterface;
import com.ge.research.semtk.sparqlX.client.SparqlQueryClient;
import com.ge.research.semtk.utility.Logger;
import com.ge.research.semtk.utility.Utility;

/**
Expand Down Expand Up @@ -212,17 +212,21 @@ public SparqlConnection getFootprintConnection(String server, String serverTypeS
* @param clear if true, clears the footprint graphs (before loading)
* @param topLevel true if this is a top-level manifest, false for recursively calling sub-manifests
* @param ingestClient ingestionRestClient
* @param progressWriter writer for reporting progress
* @param logger logger for reporting progress (may be null)
*/
public void load(String server, String serverTypeString, boolean clear, boolean topLevel, IngestorRestClient ingestClient, NodeGroupExecutionClient ngeClient, NodeGroupStoreRestClient ngStoreClient, SparqlQueryClient queryClient, PrintWriter progressWriter) throws Exception {
public void load(String server, String serverTypeString, boolean clear, boolean topLevel, IngestorRestClient ingestClient, NodeGroupExecutionClient ngeClient, NodeGroupStoreRestClient ngStoreClient, SparqlQueryClient queryClient, Logger logger) throws Exception {

writeProgress("Loading manifest for '" + getName() + "'...", progressWriter);
if(logger != null) {
logger.info("Loading manifest for '" + getName() + "'...");
}

// clear graphs first
if(clear) {
// clear each model and data graph in the footprint
for(String g : getGraphsFootprint()) {
writeProgress("Clear graph " + g, progressWriter);
if(logger != null) {
logger.info("Clear graph " + g);
}
queryClient.setSei(SparqlEndpointInterface.getInstance(serverTypeString, server, g, username, password));
queryClient.clearAll();
}
Expand All @@ -237,32 +241,36 @@ public void load(String server, String serverTypeString, boolean clear, boolean
// load via an owl ingestion YAML
File stepFile = new File(baseDir, (String)step.getValue());
LoadOwlConfig config = new LoadOwlConfig(stepFile, this.defaultModelGraph);
config.load(null, server, serverTypeString, queryClient, progressWriter);
config.load(null, server, serverTypeString, queryClient, logger);

}else if(type == StepType.DATA) {
// load content using CSV ingestion YAML
File stepFile = new File(baseDir, (String)step.getValue());
LoadDataConfig config = new LoadDataConfig(stepFile, this.defaultModelGraph, this.defaultDataGraph);
config.load(null, null, server, serverTypeString, false, ingestClient, ngeClient, queryClient, progressWriter);
config.load(null, null, server, serverTypeString, false, ingestClient, ngeClient, queryClient, logger);

}else if(type == StepType.NODEGROUPS) {
// load nodegroups/reports from a directory
writeProgress("Store nodegroups", progressWriter);
if(logger != null) {
logger.info("Store nodegroups");
}
File nodegroupsDirectory = new File(baseDir, (String)step.getValue());
File csvFile = new File(nodegroupsDirectory, "store_data.csv");
ngStoreClient.loadStoreDataCsv(csvFile.getAbsolutePath(), null, progressWriter);
ngStoreClient.loadStoreDataCsv(csvFile.getAbsolutePath(), null, logger);

}else if(type == StepType.MANIFEST) {
// load content using sub-manifest
File stepFile = new File(baseDir, (String)step.getValue());
ManifestConfig subManifest = new ManifestConfig(stepFile, defaultModelGraph, defaultDataGraph);
subManifest.load(server, serverTypeString, false, false, ingestClient, ngeClient, ngStoreClient, queryClient, progressWriter);
subManifest.load(server, serverTypeString, false, false, ingestClient, ngeClient, ngStoreClient, queryClient, logger);

}else if(type == StepType.COPYGRAPH) {
// perform the copy
String fromGraph = (String)((Pair)step.getValue()).getFirst();
String toGraph = (String)((Pair)step.getValue()).getSecond();
writeProgress("Copy " + fromGraph + " to " + toGraph, progressWriter);
if(logger != null) {
logger.info("Copy " + fromGraph + " to " + toGraph);
}
ngeClient.copyGraph(server, serverTypeString, fromGraph, server, serverTypeString, toGraph);

}else {
Expand All @@ -277,21 +285,29 @@ public void load(String server, String serverTypeString, boolean clear, boolean
String copyToGraph = this.getCopyToGraph();
if(copyToGraph != null) {
if(clear) {
writeProgress("Clear graph " + copyToGraph, progressWriter);
if(logger != null) {
logger.info("Clear graph " + copyToGraph);
}
queryClient.setSei(SparqlEndpointInterface.getInstance(serverTypeString, server, copyToGraph, username, password));
queryClient.clearAll();
}
for(String graph : this.getGraphsFootprint()) { // copy each model/data footprint graph to the given graph
writeProgress("Copy graph " + graph + " to " + copyToGraph, progressWriter);
if(logger != null) {
logger.info("Copy graph " + graph + " to " + copyToGraph);
}
String msg = ngeClient.copyGraph(server, serverTypeString, graph, server, serverTypeString, copyToGraph);
writeProgress(msg, progressWriter);
if(logger != null) {
logger.info(msg);
}
}
}

// entity resolution
String entityResolutionGraph = this.getEntityResolutionGraph();
if(entityResolutionGraph != null) {
writeProgress("Perform entity resolution in " + entityResolutionGraph, progressWriter);
if(logger != null) {
logger.info("Perform entity resolution in " + entityResolutionGraph);
}
try {
SparqlConnection conn = new SparqlConnection("Entity Resolution", serverTypeString, server, entityResolutionGraph);
ngeClient.combineEntitiesInConn(conn);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.ge.research.semtk.load.config;

import java.io.File;
import java.io.PrintWriter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
Expand All @@ -19,6 +18,7 @@ public abstract class YamlConfig {
protected JsonNode configNode; // this file as a JsonNode
protected String username = "YamlConfigUser"; // no user or password functionality yet.
protected String password = "YamlConfigPassword";

/**
* Constructor
*/
Expand Down Expand Up @@ -83,12 +83,4 @@ protected static String getStringOrFirstArrayEntry(JsonNode node) throws Excepti
return null;
}

/**
* Convenience method to write/flush
*/
protected static void writeProgress(String s, PrintWriter progressWriter) {
progressWriter.println(s);
progressWriter.flush();
}

}
Loading

0 comments on commit eb248cf

Please sign in to comment.