diff --git a/java/core/src/main/java/co/worklytics/psoxy/gateway/StorageEventResponse.java b/java/core/src/main/java/co/worklytics/psoxy/gateway/StorageEventResponse.java index e2f5f445f9..d3bdbbaf1a 100644 --- a/java/core/src/main/java/co/worklytics/psoxy/gateway/StorageEventResponse.java +++ b/java/core/src/main/java/co/worklytics/psoxy/gateway/StorageEventResponse.java @@ -27,4 +27,10 @@ public class StorageEventResponse { */ @NonNull String destinationObjectPath; + + /** + * how many records errored during processing + */ + @Builder.Default + Integer errorCount = 0; } diff --git a/java/core/src/main/java/co/worklytics/psoxy/storage/BulkDataSanitizer.java b/java/core/src/main/java/co/worklytics/psoxy/storage/BulkDataSanitizer.java index 6c2d5e86b5..e9f59cba8a 100644 --- a/java/core/src/main/java/co/worklytics/psoxy/storage/BulkDataSanitizer.java +++ b/java/core/src/main/java/co/worklytics/psoxy/storage/BulkDataSanitizer.java @@ -2,8 +2,8 @@ import co.worklytics.psoxy.Pseudonymizer; +import java.io.BufferedReader; import java.io.IOException; -import java.io.Reader; import java.io.Writer; /** @@ -20,9 +20,10 @@ public interface BulkDataSanitizer { * implicit in records * @param writer The stream writer to which sanitized content should be written. 
* @param pseudonymizer The pseudonymizer to use - * @throws IOException IO problem reading or writing + * @return number of records in data which could not be processed due to errors + * @throws IOException IO problem reading or writing */ - void sanitize(Reader reader, - Writer writer, - Pseudonymizer pseudonymizer) throws IOException; + int sanitize(BufferedReader reader, + Writer writer, + Pseudonymizer pseudonymizer) throws IOException; } diff --git a/java/core/src/main/java/co/worklytics/psoxy/storage/StorageHandler.java b/java/core/src/main/java/co/worklytics/psoxy/storage/StorageHandler.java index 3494c99036..db9076b64b 100644 --- a/java/core/src/main/java/co/worklytics/psoxy/storage/StorageHandler.java +++ b/java/core/src/main/java/co/worklytics/psoxy/storage/StorageHandler.java @@ -82,6 +82,7 @@ public enum BulkMetaData { //SHA-1 of rules RULES_SHA, + ERROR_COUNT, ; // aws prepends `x-amz-meta-` to this; but per documentation, that's not visible via the @@ -106,11 +107,12 @@ public StorageEventResponse handle(StorageEventRequest request, this.validate(request, transform, inputStreamSupplier); - this.process(request, transform, inputStreamSupplier, outputStreamSupplier); + int errorCount = this.process(request, transform, inputStreamSupplier, outputStreamSupplier); StorageEventResponse response = StorageEventResponse.builder() .destinationBucketName(request.getDestinationBucketName()) .destinationObjectPath(request.getDestinationObjectPath()) + .errorCount(errorCount) .build(); log.info("Writing to: " + response.getDestinationBucketName() + "/" + response.getDestinationObjectPath()); @@ -371,7 +373,7 @@ public int getBufferSize() { * @param outputStreamSupplier */ @SneakyThrows - void process(StorageEventRequest request, + int process(StorageEventRequest request, StorageHandler.ObjectTransform transform, Supplier inputStreamSupplier, Supplier outputStreamSupplier) { @@ -379,7 +381,7 @@ void process(StorageEventRequest request, try ( InputStream inputStream =
readInputStream(request, bufferSize, inputStreamSupplier); - Reader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8), bufferSize); + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8), bufferSize); OutputStream outputStream = writeOutputStream(request, bufferSize, outputStreamSupplier); OutputStreamWriter writer = new OutputStreamWriter(outputStream) ) { @@ -393,7 +395,7 @@ void process(StorageEventRequest request, BulkDataSanitizer fileHandler = bulkDataSanitizerFactory.get(applicableRules.get()); - fileHandler.sanitize(reader, writer, pseudonymizer); + return fileHandler.sanitize(reader, writer, pseudonymizer); } } diff --git a/java/core/src/main/java/co/worklytics/psoxy/storage/impl/ColumnarBulkDataSanitizerImpl.java b/java/core/src/main/java/co/worklytics/psoxy/storage/impl/ColumnarBulkDataSanitizerImpl.java index c679b49cce..b471b6be5b 100644 --- a/java/core/src/main/java/co/worklytics/psoxy/storage/impl/ColumnarBulkDataSanitizerImpl.java +++ b/java/core/src/main/java/co/worklytics/psoxy/storage/impl/ColumnarBulkDataSanitizerImpl.java @@ -32,14 +32,13 @@ import org.apache.commons.lang3.tuple.Pair; import javax.inject.Inject; -import java.io.IOException; -import java.io.Reader; -import java.io.Writer; +import java.io.*; import java.util.*; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.UnaryOperator; +import java.util.logging.Level; import java.util.regex.Matcher; import java.util.stream.Collectors; @@ -74,10 +73,11 @@ public ColumnarBulkDataSanitizerImpl(@Assisted ColumnarRules rules) { this.rules = rules; } + @Override - public void sanitize(@NonNull Reader reader, - @NonNull Writer writer, - @NonNull Pseudonymizer pseudonymizer) throws IOException { + public int sanitize(@NonNull BufferedReader reader, + @NonNull Writer writer, + @NonNull Pseudonymizer pseudonymizer) throws 
IOException { CSVFormat inputCSVFormat = CSVFormat.Builder.create(CSVFormat.DEFAULT) .setDelimiter(rules.getDelimiter()) @@ -87,8 +87,9 @@ public void sanitize(@NonNull Reader reader, .setTrim(true) .build(); + ParsedFirstLine parsedFirstLine = parseFirstLine(reader); - CSVParser records = inputCSVFormat.parse(reader); + CSVParser records = inputCSVFormat.parse(new StringReader(parsedFirstLine.contents)); Preconditions.checkArgument(records.getHeaderMap() != null, "Failed to parse header from file"); @@ -275,44 +276,90 @@ public void sanitize(@NonNull Reader reader, List columnNamesForOutputFile = columnTransforms.keySet() .stream() .sorted(originalHeadersOrRenamedFirst.thenComparing(byColumnName)) - .collect(Collectors.toList()) + .toList() .stream() // leave original casing for headers .map( h -> headers.stream().filter(h::equalsIgnoreCase).findFirst().orElse(h)) .collect(Collectors.toList()); - CSVFormat csvFormat = CSVFormat.Builder.create() + CSVFormat inputCsvFormat = CSVFormat.DEFAULT.builder() + .setHeader(records.getHeaderNames().toArray(new String[0])) + .build(); + + + CSVFormat outputFormat = CSVFormat.Builder.create() .setHeader(columnNamesForOutputFile.toArray(new String[0])) - .setRecordSeparator(records.getFirstEndOfLine()) + .setRecordSeparator(parsedFirstLine.getEndOfLine()) // use the same EOL as in the input file .setNullString("") .build(); - // create an empty record to fill with the transformed values, ensuring all rows have - // the same columns - // linked hash map: need predictable iteration order and null values - LinkedHashMap newRecord = new LinkedHashMap<>(); - columnNamesForOutputFile.forEach(h -> newRecord.put(h, null)); + int errorCount = 0; + try (CSVPrinter printer = new CSVPrinter(writer, outputFormat)) { + ProcessingBuffer buffer = getRecordsProcessingBuffer(printer); + errorCount = processRecords(columnNamesForOutputFile, inputCsvFormat, columnTransforms, reader, buffer); - try (CSVPrinter printer = new CSVPrinter(writer, 
csvFormat)) { + if (buffer.flush()) { + log.info(String.format("Processed records: %d", buffer.getProcessed())); + } + } + return errorCount; + } - ProcessingBuffer buffer = getRecordsProcessingBuffer(printer); + /** + * processes records into outputBuffer, applying the transformations to each + * @param columnNamesForOutput to render in the output buffer; so columns which DON'T have values in particular record will be filled + * @param columnTransformsToApply to apply to each column in the record + * @param bodyRecords reader over the body lines, with the header line already consumed; each line is parsed as one record + * @param outputBuffer to write the processed records to + * @return number of records which could not be processed due to errors + */ + int processRecords( + List columnNamesForOutput, + CSVFormat inputCsvFormat, + Map>>>> columnTransformsToApply, + BufferedReader bodyRecords, + ProcessingBuffer outputBuffer) throws IOException { + + int errorCount = 0; + + // collect transforms that are MISSING mappings in the records + Set transformsWithoutMappings = new HashSet<>(); + + // we're going to read line-by-line, so we need to ensure that the CSVFormat is set to not skip the header record + CSVFormat perLineFormat = CSVFormat.DEFAULT.builder() + .setDelimiter(inputCsvFormat.getDelimiter()) + .setSkipHeaderRecord(false).build(); + + Map headerMap = new HashMap<>(inputCsvFormat.getHeader().length); + for (int i = 0; i < inputCsvFormat.getHeader().length; i++) { + headerMap.put(inputCsvFormat.getHeader()[i].toLowerCase(), i); + } - Set transformsWithoutMappings = new HashSet<>(); + // linked hash map: need predictable iteration order and null values + LinkedHashMap newRecord = new LinkedHashMap<>(); + // create an empty record to fill with the transformed values, ensuring all rows have + // the same columns + columnNamesForOutput.forEach(h -> newRecord.put(h, null)); - for (CSVRecord record : records) { - // clean up the record prior to use + String line; + while ((line = bodyRecords.readLine()) != null) { +
try (CSVParser parser = CSVParser.parse(line, perLineFormat)) { + CSVRecord record = parser.getRecords().get(0); newRecord.replaceAll((k, v) -> null); newRecord.keySet().forEach(h -> { - - Pair>>> transforms = columnTransforms.getOrDefault(h, null); + Pair>>> transforms = columnTransformsToApply.getOrDefault(h, null); if (transforms == null) { newRecord.put(h, null); } else { // apply all transformations in insertion order // key holds the original column - if (record.isMapped(transforms.getKey())) { - String v = record.get(transforms.getKey()); + + //TODO: build this into columnTransformsToApply, so don't need to call lowerCase on every loop + String lcKey = transforms.getKey().toLowerCase(); + + if (headerMap.containsKey(lcKey)) { + String v = StringUtils.trim(record.get(headerMap.get(lcKey))); if (StringUtils.isNotBlank(v)) { for (Function> transform : transforms.getValue()) { v = transform.apply(v).orElse(null); @@ -328,19 +375,20 @@ public void sanitize(@NonNull Reader reader, } } } - }); - if (buffer.addAndAttemptFlush(ProcessedRecord.of(Lists.newArrayList(newRecord.values())))) { - log.info(String.format("Processed records: %d", buffer.getProcessed())); - }; - } - if (buffer.flush()) { - log.info(String.format("Processed records: %d", buffer.getProcessed())); + if (outputBuffer.addAndAttemptFlush(ProcessedRecord.of(Lists.newArrayList(newRecord.values())))) { + log.info(String.format("Processed records: %d", outputBuffer.getProcessed())); + } + } catch (IndexOutOfBoundsException | UncheckedIOException | IOException e) { + log.log(Level.WARNING, "Failed to process record", e); + errorCount++; } } + return errorCount; } + @VisibleForTesting Set determineMissingColumnsToPseudonymize(Set columnsToPseudonymize, Set outputColumnsCI) { Function, Set> asLowercase = (Set set) -> set.stream().map(String::toLowerCase).collect(Collectors.toSet()); @@ -375,7 +423,7 @@ private ProcessingBuffer getRecordsProcessingBuffer(final CSVPr @Value 
@RequiredArgsConstructor(staticName = "of") - private static class ProcessedRecord { + static class ProcessedRecord { Collection values; } @@ -436,4 +484,65 @@ TriFunction buildPseudonymizationFunction }; } + + + @Value + static class ParsedFirstLine { + String contents; + /** + * one of the following: + * - "\n" (LF) + * - "\r\n" (CRLF) + * - "\r" (CR) + * + */ + String endOfLine; + } + + /** + * helper to parse the first line from a BufferedReader + * @param reader to parse + * @return ParsedFirstLine with the contents of the first line and the end of line character(s) used + */ + ParsedFirstLine parseFirstLine(BufferedReader reader) { + String line = null, endOfLine = null; + try { + StringBuilder sb = new StringBuilder(); + int ch; + boolean foundEOL = false; + while ((ch = reader.read()) != -1) { + char c = (char) ch; + sb.append(c); + if (c == '\r') { + reader.mark(1); + int next = reader.read(); + if (next == '\n') { + sb.append('\n'); + endOfLine = "\r\n"; + } else { + if (next != -1) { + reader.reset(); + } + endOfLine = "\r"; + } + foundEOL = true; + break; + } else if (c == '\n') { + endOfLine = "\n"; + foundEOL = true; + break; + } + } + if (sb.isEmpty() && !foundEOL) { + throw new IllegalArgumentException("Empty file, no header found"); + } + line = sb.toString().replaceAll("(\r\n|\r|\n)$", ""); + if (endOfLine == null) { + endOfLine = "\n"; // default + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return new ParsedFirstLine(line, endOfLine); + } } diff --git a/java/core/src/main/java/co/worklytics/psoxy/storage/impl/RecordBulkDataSanitizerImpl.java b/java/core/src/main/java/co/worklytics/psoxy/storage/impl/RecordBulkDataSanitizerImpl.java index 58f77bd44c..f2240f705d 100644 --- a/java/core/src/main/java/co/worklytics/psoxy/storage/impl/RecordBulkDataSanitizerImpl.java +++ b/java/core/src/main/java/co/worklytics/psoxy/storage/impl/RecordBulkDataSanitizerImpl.java @@ -58,9 +58,9 @@ public RecordBulkDataSanitizerImpl(@Assisted 
RecordRules rules) { @Override - public void sanitize(@NonNull Reader reader, - @NonNull Writer writer, - @NonNull Pseudonymizer pseudonymizer) throws IOException { + public int sanitize(BufferedReader reader, + @NonNull Writer writer, + @NonNull Pseudonymizer pseudonymizer) throws IOException { List> compiledTransforms = rules.getTransforms().stream() @@ -71,20 +71,24 @@ public void sanitize(@NonNull Reader reader, )) .collect(Collectors.toList()); + if (rules.getFormat() == RecordRules.Format.NDJSON) { - sanitizeNdjson(reader, writer, compiledTransforms); + return sanitizeNdjson(reader, writer, compiledTransforms); } else if (rules.getFormat() == RecordRules.Format.CSV) { - sanitizeCsv(reader, writer, compiledTransforms); + return sanitizeCsv(reader, writer, compiledTransforms); } else { throw new IllegalArgumentException("Unsupported format: " + rules.getFormat()); } } @VisibleForTesting - void sanitizeCsv(@NonNull Reader reader, + int sanitizeCsv(@NonNull Reader reader, @NonNull Writer writer, @NonNull List> compiledTransforms) throws IOException { + //q: why doesn't this just use ColumnarBulkDataSanitizerImpl? 
+ + int errorCount = 0; try (CSVParser records = CSVFormat.DEFAULT .withFirstRecordAsHeader() .withIgnoreHeaderCase() @@ -113,31 +117,42 @@ void sanitizeCsv(@NonNull Reader reader, printer.println(); } catch (UnmatchedPseudonymization e) { log.warning("Skipped record due to UnmatchedPseudonymization: " + e.getPath()); + errorCount++; } } } + return errorCount; } @VisibleForTesting - void sanitizeNdjson(@NonNull Reader reader, + int sanitizeNdjson(@NonNull Reader reader, @NonNull Writer writer, @NonNull List> compiledTransforms) throws IOException { + int errorCount = 0; try (BufferedReader bufferedReader = new BufferedReader(reader)) { String line; while ((line = StringUtils.trimToNull(bufferedReader.readLine())) != null) { - Object document = jsonConfiguration.jsonProvider().parse(line); - try { + Object document = jsonConfiguration.jsonProvider().parse(line); + document = applyTransforms(document, compiledTransforms); writer.append(jsonConfiguration.jsonProvider().toJson(document)); writer.append('\n'); // NDJSON uses newlines between records writer.flush(); //after each line } catch (UnmatchedPseudonymization e) { log.warning("Skipped record due to UnmatchedPseudonymization: " + e.getPath()); + errorCount++; + } catch (JsonPathException e) { + log.warning("Skipped record due to JsonPathException: " + e.getMessage()); + errorCount++; + } catch (Exception e) { + log.warning("Skipped record due to unexpected exception: " + e.getMessage()); + errorCount++; } } } + return errorCount; } diff --git a/java/core/src/test/java/co/worklytics/psoxy/storage/impl/BulkDataSanitizerImplTest.java b/java/core/src/test/java/co/worklytics/psoxy/storage/impl/BulkDataSanitizerImplTest.java index 2d3a554c74..9307e20918 100644 --- a/java/core/src/test/java/co/worklytics/psoxy/storage/impl/BulkDataSanitizerImplTest.java +++ b/java/core/src/test/java/co/worklytics/psoxy/storage/impl/BulkDataSanitizerImplTest.java @@ -39,10 +39,10 @@ import javax.inject.Inject; import javax.inject.Named; 
import javax.inject.Singleton; -import java.io.File; -import java.io.FileReader; -import java.io.StringReader; -import java.io.StringWriter; +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -124,15 +124,16 @@ void handle_pseudonymize() { .columnToPseudonymize("EMPLOYEE_EMAIL") .build(); - File inputFile = new File(getClass().getResource("/csv/hris-example.csv").getFile()); columnarFileSanitizerImpl.setRules(rules); - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); } } + + @ParameterizedTest @ValueSource(strings = {"EMPLOYEE_EMAIL", "employee_email", "Employee_Email"}) @SneakyThrows @@ -148,9 +149,8 @@ void handle_pseudonymizeIfPresent(String caseVariant) { .columnToPseudonymizeIfPresent("EXTRA_EMAIL") //unlike 'columnToPseudonymize', this doesn't throw error .build(); - File inputFile = new File(getClass().getResource("/csv/hris-example.csv").getFile()); columnarFileSanitizerImpl.setRules(rules); - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -171,10 +171,9 @@ void handle_redaction() { .columnToRedact("DEPARTMENT") .build(); - File inputFile = new File(getClass().getResource("/csv/hris-example.csv").getFile()); columnarFileSanitizerImpl.setRules(rules); - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -191,10 +190,9 @@ 
void handle_cased() { .columnToPseudonymize("EMPLOYEE_ID") .columnToPseudonymize("AN EMAIL").build(); - File inputFile = new File(getClass().getResource("/csv/hris-example-headers-w-spaces.csv").getFile()); columnarFileSanitizerImpl.setRules(rules); - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example-headers-w-spaces.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -211,10 +209,9 @@ void handle_quotes() { .columnToPseudonymize("EMPLOYEE_ID") .columnToPseudonymize("EMAIL") .build(); - File inputFile = new File(getClass().getResource("/csv/hris-example-quotes.csv").getFile()); columnarFileSanitizerImpl.setRules(rules); - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example-quotes.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -242,10 +239,9 @@ void defaultRules(String exampleFile) { ColumnarRules rules = (ColumnarRules) rulesUtils.getRulesFromConfig(config, new EnvVarsConfigService()).orElseThrow(); - File inputFile = new File(getClass().getResource(exampleFile).getFile()); columnarFileSanitizerImpl.setRules(rules); - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile(exampleFile); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -269,10 +265,9 @@ void defaultRules_padded() { ColumnarRules rules = (ColumnarRules) rulesUtils.getRulesFromConfig(config, new EnvVarsConfigService()).orElseThrow(); - File inputFile = new File(getClass().getResource("/csv/hris-default-rules_padded-employee-id.csv").getFile()); columnarFileSanitizerImpl.setRules(rules); - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = 
forFile("/csv/hris-default-rules_padded-employee-id.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -290,10 +285,7 @@ void validCaseInsensitiveAndTrimRules() { .columnToPseudonymize(" an EMAIL ") .build(); columnarFileSanitizerImpl.setRules(rules); - - File inputFile = new File(getClass().getResource("/csv/hris-example-headers-w-spaces.csv").getFile()); - - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example-headers-w-spaces.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -312,9 +304,7 @@ void handle_rename() { .build(); columnarFileSanitizerImpl.setRules(rules); - File inputFile = new File(getClass().getResource("/csv/hris-example-quotes.csv").getFile()); - - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example-quotes.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -346,9 +336,7 @@ void handle_duplicates() { .build(); columnarFileSanitizerImpl.setRules(rules); - File inputFile = new File(getClass().getResource("/csv/hris-example.csv").getFile()); - - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, defaultPseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -385,9 +373,7 @@ void handle_duplicates_lookup_table_via_transforms() { .build(); columnarFileSanitizerImpl.setRules(rules); - File inputFile = new File(getClass().getResource("/csv/hris-example.csv").getFile()); - - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example.csv"); StringWriter out = new StringWriter()) 
{ columnarFileSanitizerImpl.sanitize(in, out, defaultPseudonymizer); String output = out.toString(); @@ -420,9 +406,7 @@ void handle_duplicates_lookup_table_pre_0_4_48() { .build(); columnarFileSanitizerImpl.setRules(rules); - File inputFile = new File(getClass().getResource("/csv/hris-example.csv").getFile()); - - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, defaultPseudonymizer); String output = out.toString(); @@ -456,11 +440,7 @@ void acmeExample() { pseudonymizerImplFactory.create(Pseudonymizer.ConfigurationOptions.builder() .build()); - - File inputFile = new File(getClass().getResource("/csv/example_acme_20220901.csv").getFile()); - - - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/example_acme_20220901.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, defaultPseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -478,9 +458,7 @@ void handle_inclusion() { .build(); columnarFileSanitizerImpl.setRules(rules); - File inputFile = new File(getClass().getResource("/csv/hris-example-quotes.csv").getFile()); - - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example-quotes.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -502,15 +480,11 @@ void shuffle() { .columnToPseudonymize("EMPLOYEE_EMAIL") .build(); columnarFileSanitizerImpl.setRules(rules); - - File inputFile = new File(getClass().getResource("/csv/hris-example.csv").getFile()); - - // replace shuffler implementation with one that reverses the list, so deterministic columnarFileSanitizerImpl.setRecordShuffleChunkSize(2); columnarFileSanitizerImpl.makeShuffleDeterministic(); - try (FileReader in = new FileReader(inputFile); + try 
(BufferedReader in = forFile("/csv/hris-example.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -570,14 +544,12 @@ void transform_ghusername() { .build(); columnarFileSanitizerImpl.setRules(rules); - File inputFile = new File(getClass().getResource("/csv/hris-example.csv").getFile()); - // replace shuffler implementation with one that reverses the list, so deterministic columnarFileSanitizerImpl.setRecordShuffleChunkSize(2); columnarFileSanitizerImpl.makeShuffleDeterministic(); - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -637,7 +609,7 @@ void transform_complex_ghusername() { String resultString; - try (StringReader in = new StringReader(INITIAL); + try (BufferedReader in = new BufferedReader(new StringReader(INITIAL)); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); @@ -669,15 +641,13 @@ void transform_fromYamlComplex() { "\"{\"\"hash\"\":\"\"4_hashed\"\"}\",\"{\"\"hash\"\":\"\"dave@worklytics.co_hashed\"\"}\",Engineering,2023-01-06,\"{\"\"hash\"\":\"\"1_hashed\"\"}\",2018-06-03,,\"{\"\"hash\"\":\"\"dave_hashed\"\"}\",\"{\"\"hash\"\":\"\"dave_alternate_hashed\"\"}\",dave\n" + "\"{\"\"hash\"\":\"\"3_hashed\"\"}\",\"{\"\"hash\"\":\"\"charles.clark@workltycis.co_hashed\"\"}\",Engineering,2023-01-06,\"{\"\"hash\"\":\"\"1_hashed\"\"}\",2019-10-06,2022-12-08,\"{\"\"hash\"\":\"\"charles_clark_hashed\"\"}\",\"{\"\"hash\"\":\"\"charles_clark_alternate_hashed\"\"}\",charles_clark\n"; - File inputFile = new File(getClass().getResource("/csv/hris-example-split-email-usernames.csv").getFile()); - columnarFileSanitizerImpl.setRecordShuffleChunkSize(2); columnarFileSanitizerImpl.makeShuffleDeterministic(); // use stub 
for easy check on values Pseudonymizer pseudonymizer = new StubPseudonymizer(); - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example-split-email-usernames.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -709,14 +679,14 @@ void transform_fromYamlWithRegExp() { "\"{\"\"hash\"\":\"\"1_hashed\"\"}\",\"{\"\"hash\"\":\"\"alice@worklytics.co_hashed\"\"}\",Engineering,2023-01-06,,2019-11-11,,\"{\"\"hash\"\":\"\"alice_acme_hashed\"\"}\"\n" + "\"{\"\"hash\"\":\"\"4_hashed\"\"}\",,Engineering,2023-01-06,\"{\"\"hash\"\":\"\"1_hashed\"\"}\",2018-06-03,,\n" + "\"{\"\"hash\"\":\"\"3_hashed\"\"}\",\"{\"\"hash\"\":\"\"charles@workltycis.co_hashed\"\"}\",Engineering,2023-01-06,\"{\"\"hash\"\":\"\"1_hashed\"\"}\",2019-10-06,2022-12-08,\"{\"\"hash\"\":\"\"charles_acme_hashed\"\"}\"\n"; - File inputFile = new File(getClass().getResource("/csv/hris-example.csv").getFile()); + columnarFileSanitizerImpl.setRecordShuffleChunkSize(2); columnarFileSanitizerImpl.makeShuffleDeterministic(); Pseudonymizer pseudonymizer = new StubPseudonymizer(); - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = forFile("/csv/hris-example.csv"); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -747,9 +717,7 @@ void handle_null_transformations() { .build(); columnarFileSanitizerImpl.setRules(rules); - File inputFile = new File(getClass().getResource("/csv/hris-example.csv").getFile()); - - try (StringReader in = new StringReader(SOURCE); + try (BufferedReader in = new BufferedReader(new StringReader(SOURCE)); StringWriter out = new StringWriter()) { columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer); assertEquals(EXPECTED, out.toString()); @@ -769,7 +737,7 @@ void handle_control_chars_in_headers_reproduce_error() { .build(); 
columnarFileSanitizerImpl.setRules(rules); - try (StringReader in = new StringReader(SOURCE); + try (BufferedReader in = new BufferedReader(new StringReader(SOURCE)); StringWriter out = new StringWriter()) { assertThrows(IllegalArgumentException.class, () -> columnarFileSanitizerImpl.sanitize(in, out, pseudonymizer), "Non-ASCII characters found in headers, inspect file with cat -v for control characters"); } @@ -798,4 +766,13 @@ public ConfigurationOptions getOptions() { } } + + BufferedReader forFile(String file) { + File inputFile = new File(getClass().getResource(file).getFile()); + try { + return Files.newBufferedReader(Paths.get(inputFile.toURI()), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Failed to read file: " + file, e); + } + } } diff --git a/java/core/src/test/java/co/worklytics/psoxy/storage/impl/ColumnarBulkDataSanitizerImplTest.java b/java/core/src/test/java/co/worklytics/psoxy/storage/impl/ColumnarBulkDataSanitizerImplTest.java index 9e5be855ea..2502a35024 100644 --- a/java/core/src/test/java/co/worklytics/psoxy/storage/impl/ColumnarBulkDataSanitizerImplTest.java +++ b/java/core/src/test/java/co/worklytics/psoxy/storage/impl/ColumnarBulkDataSanitizerImplTest.java @@ -1,10 +1,23 @@ package co.worklytics.psoxy.storage.impl; +import co.worklytics.psoxy.utils.ProcessingBuffer; import com.avaulta.gateway.rules.ColumnarRules; +import lombok.SneakyThrows; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import java.io.BufferedReader; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.stream.Stream; import static 
org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.mock; @@ -24,9 +37,65 @@ void determineMissingColumnsToPseudonymize() { assertEmpty(columnarBulkDataSanitizer.determineMissingColumnsToPseudonymize(Set.of("EMPLOYEE_ID"), Set.of("employee_id"))); assertEmpty(columnarBulkDataSanitizer.determineMissingColumnsToPseudonymize(Set.of("employee_Id"), Set.of("employee_id", "employee_name"))); assertEmpty(columnarBulkDataSanitizer.determineMissingColumnsToPseudonymize(Set.of("employee_id"), Set.of("employee_name", "EMPLOYEE_ID"))); + } + + + + + @ParameterizedTest + @MethodSource("headerLinesAndEndings") + void firstLine(String headerLine, String expectedEndOfLine) { + ColumnarBulkDataSanitizerImpl columnarBulkDataSanitizer = new ColumnarBulkDataSanitizerImpl(mock(ColumnarRules.class)); + BufferedReader reader = new BufferedReader(new StringReader(headerLine)); + ColumnarBulkDataSanitizerImpl.ParsedFirstLine parsedFirstLine = columnarBulkDataSanitizer.parseFirstLine(reader); + assertEquals(expectedEndOfLine, parsedFirstLine.getEndOfLine()); + } + + static Stream headerLinesAndEndings() { + return Stream.of( + Arguments.of("employee_id,employee_name\n", "\n"), + Arguments.of("employee_id,employee_name \n", "\n"), + Arguments.of("employee_id,employee_name\r\n", "\r\n"), + Arguments.of("employee_id,employee_name \r\n", "\r\n"), + Arguments.of("employee_id,employee_name\r", "\r"), + Arguments.of("employee_id,employee_name \r", "\r") + ); + } + + @SneakyThrows + @Test + void processingSkipsMalformedRows() { + ColumnarBulkDataSanitizerImpl columnarBulkDataSanitizer = new ColumnarBulkDataSanitizerImpl(ColumnarRules.builder() + .columnToPseudonymize("employee_id") + .build()); + + String MALFORMED_CSV = "employee_id,employee_name\n" + + "12345,John Doe\n" + + "malformed_row_without_comma,\"missing_employee_name\n" + + "67890,Jane Doe\n"; + + + CSVFormat csvFormat = CSVFormat.DEFAULT.builder() + .setHeader("employee_id", "employee_name") + 
.setSkipHeaderRecord(false) + .build(); + + List processedRecords = new ArrayList<>(); + BufferedReader reader = new BufferedReader(new StringReader(MALFORMED_CSV)); + reader.readLine(); // skip header + + columnarBulkDataSanitizer.processRecords(List.of("employee_id,employee_name"), + csvFormat, + Collections.emptyMap(), + reader, + new ProcessingBuffer<>(1, r -> processedRecords.addAll(r)) {} + ); + + assertEquals(2, processedRecords.size()); } + void assertEmpty(Set list) { assertTrue(list.isEmpty(), "Not empty list"); } diff --git a/java/impl/aws/src/main/java/co/worklytics/psoxy/S3Handler.java b/java/impl/aws/src/main/java/co/worklytics/psoxy/S3Handler.java index 8d9c81032f..8e60176a77 100644 --- a/java/impl/aws/src/main/java/co/worklytics/psoxy/S3Handler.java +++ b/java/impl/aws/src/main/java/co/worklytics/psoxy/S3Handler.java @@ -120,9 +120,9 @@ StorageEventResponse process(String importBucket, String sourceKey, StorageHandl Optional.ofNullable(sourceMetadata.getContentEncoding()) .ifPresent(destinationMetadata::setContentEncoding); } - - destinationMetadata.setUserMetadata(storageHandler.buildObjectMetadata(importBucket, sourceKey, transform)); - + Map metadata = new HashMap<>(storageHandler.buildObjectMetadata(importBucket, sourceKey, transform)); + metadata.put(StorageHandler.BulkMetaData.ERROR_COUNT.getMetaDataKey(), String.valueOf(storageEventResponse.getErrorCount())); + destinationMetadata.setUserMetadata(metadata); s3Client.putObject(storageEventResponse.getDestinationBucketName(), storageEventResponse.getDestinationObjectPath(), diff --git a/java/impl/cmd-line/src/main/java/co/worklytics/psoxy/Handler.java b/java/impl/cmd-line/src/main/java/co/worklytics/psoxy/Handler.java index 264ce57010..baed078bbd 100644 --- a/java/impl/cmd-line/src/main/java/co/worklytics/psoxy/Handler.java +++ b/java/impl/cmd-line/src/main/java/co/worklytics/psoxy/Handler.java @@ -52,7 +52,7 @@ public void sanitize(@NonNull Config config, Pseudonymizer pseudonymizer = 
pseudonymizerImplFactory.create(options.build()); BulkDataSanitizer sanitizer = fileHandlerStrategy.get(rules); - try (FileReader in = new FileReader(inputFile); + try (BufferedReader in = new BufferedReader(new FileReader(inputFile)); Writer writer = new AppendableWriter(out)) { sanitizer.sanitize(in, writer, pseudonymizer); }