Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,10 @@ public class StorageEventResponse {
*/
@NonNull
String destinationObjectPath;

/**
* how many records errored during processing
*/
@Builder.Default
Integer errorCount = 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import co.worklytics.psoxy.Pseudonymizer;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;

/**
Expand All @@ -20,9 +20,10 @@ public interface BulkDataSanitizer {
* implicit in records
* @param writer The stream writer to which sanitized content should be written.
* @param pseudonymizer The pseudonymizer to use
* @throws IOException IO problem reading or writing
* @return number of records in data which could not be processed due to errors
* @throws IOException IO problem reading or writing
*/
void sanitize(Reader reader,
Writer writer,
Pseudonymizer pseudonymizer) throws IOException;
int sanitize(BufferedReader reader,
Writer writer,
Pseudonymizer pseudonymizer) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public enum BulkMetaData {

//SHA-1 of rules
RULES_SHA,
ERROR_COUNT,
;

// aws prepends `x-amz-meta-` to this; but per documentation, that's not visible via the
Expand All @@ -106,11 +107,12 @@ public StorageEventResponse handle(StorageEventRequest request,

this.validate(request, transform, inputStreamSupplier);

this.process(request, transform, inputStreamSupplier, outputStreamSupplier);
int errorCount = this.process(request, transform, inputStreamSupplier, outputStreamSupplier);

StorageEventResponse response = StorageEventResponse.builder()
.destinationBucketName(request.getDestinationBucketName())
.destinationObjectPath(request.getDestinationObjectPath())
.errorCount(errorCount)
.build();

log.info("Writing to: " + response.getDestinationBucketName() + "/" + response.getDestinationObjectPath());
Expand Down Expand Up @@ -371,15 +373,15 @@ public int getBufferSize() {
* @param outputStreamSupplier
*/
@SneakyThrows
void process(StorageEventRequest request,
int process(StorageEventRequest request,
StorageHandler.ObjectTransform transform,
Supplier<InputStream> inputStreamSupplier,
Supplier<OutputStream> outputStreamSupplier) {
int bufferSize = getBufferSize();

try (
InputStream inputStream = readInputStream(request, bufferSize, inputStreamSupplier);
Reader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8), bufferSize);
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8), bufferSize);
OutputStream outputStream = writeOutputStream(request, bufferSize, outputStreamSupplier);
OutputStreamWriter writer = new OutputStreamWriter(outputStream)
) {
Expand All @@ -393,7 +395,7 @@ void process(StorageEventRequest request,

BulkDataSanitizer fileHandler = bulkDataSanitizerFactory.get(applicableRules.get());

fileHandler.sanitize(reader, writer, pseudonymizer);
return fileHandler.sanitize(reader, writer, pseudonymizer);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@
import org.apache.commons.lang3.tuple.Pair;

import javax.inject.Inject;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.io.*;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.logging.Level;
import java.util.regex.Matcher;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -74,10 +73,11 @@ public ColumnarBulkDataSanitizerImpl(@Assisted ColumnarRules rules) {
this.rules = rules;
}


@Override
public void sanitize(@NonNull Reader reader,
@NonNull Writer writer,
@NonNull Pseudonymizer pseudonymizer) throws IOException {
public int sanitize(@NonNull BufferedReader reader,
@NonNull Writer writer,
@NonNull Pseudonymizer pseudonymizer) throws IOException {

CSVFormat inputCSVFormat = CSVFormat.Builder.create(CSVFormat.DEFAULT)
.setDelimiter(rules.getDelimiter())
Expand All @@ -87,8 +87,9 @@ public void sanitize(@NonNull Reader reader,
.setTrim(true)
.build();

ParsedFirstLine parsedFirstLine = parseFirstLine(reader);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd include a comment here explaining why we use this rather than records.getFirstEndOfLine().


CSVParser records = inputCSVFormat.parse(reader);
CSVParser records = inputCSVFormat.parse(new StringReader(parsedFirstLine.contents));

Preconditions.checkArgument(records.getHeaderMap() != null, "Failed to parse header from file");

Expand Down Expand Up @@ -275,44 +276,90 @@ public void sanitize(@NonNull Reader reader,
List<String> columnNamesForOutputFile = columnTransforms.keySet()
.stream()
.sorted(originalHeadersOrRenamedFirst.thenComparing(byColumnName))
.collect(Collectors.toList())
.toList()
.stream()
// leave original casing for headers
.map( h -> headers.stream().filter(h::equalsIgnoreCase).findFirst().orElse(h))
.collect(Collectors.toList());

CSVFormat csvFormat = CSVFormat.Builder.create()
CSVFormat inputCsvFormat = CSVFormat.DEFAULT.builder()
.setHeader(records.getHeaderNames().toArray(new String[0]))
.build();


CSVFormat outputFormat = CSVFormat.Builder.create()
.setHeader(columnNamesForOutputFile.toArray(new String[0]))
.setRecordSeparator(records.getFirstEndOfLine())
.setRecordSeparator(parsedFirstLine.getEndOfLine()) // use the same EOL as in the input file
.setNullString("")
.build();

// create an empty record to fill with the transformed values, ensuring all rows have
// the same columns
// linked hash map: need predictable iteration order and null values

LinkedHashMap<String, String> newRecord = new LinkedHashMap<>();
columnNamesForOutputFile.forEach(h -> newRecord.put(h, null));
int errorCount = 0;
try (CSVPrinter printer = new CSVPrinter(writer, outputFormat)) {
ProcessingBuffer<ProcessedRecord> buffer = getRecordsProcessingBuffer(printer);
errorCount = processRecords(columnNamesForOutputFile, inputCsvFormat, columnTransforms, reader, buffer);

try (CSVPrinter printer = new CSVPrinter(writer, csvFormat)) {
if (buffer.flush()) {
log.info(String.format("Processed records: %d", buffer.getProcessed()));
}
}
return errorCount;
}

ProcessingBuffer<ProcessedRecord> buffer = getRecordsProcessingBuffer(printer);
/**
* processes records into outputBuffer, applying the transformations to each
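* @param columnNamesForOutput column names to render in the output buffer; columns which DON'T have a value in a particular record are still emitted, filled with null
* @param inputCsvFormat format of the input; supplies the delimiter and the header names used to locate each column by index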
* @param columnTransformsToApply to apply to each column in the record
* @param bodyRecords reader over the records to process, positioned after the header line; read line-by-line, with each line parsed as a single CSV record
* @param outputBuffer to write the processed records to
* @return number of records which could not be processed due to errors
*/
int processRecords(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd return an object with the count of errors and processed lines too

List<String> columnNamesForOutput,
CSVFormat inputCsvFormat,
Map<String, Pair<String, List<Function<String, Optional<String>>>>> columnTransformsToApply,
BufferedReader bodyRecords,
ProcessingBuffer<ProcessedRecord> outputBuffer) throws IOException {

int errorCount = 0;

// collect transforms that are MISSING mappings in the records
Set<String> transformsWithoutMappings = new HashSet<>();

// we're going to read line-by-line, so we need to ensure that the CSVFormat is set to not skip the header record
CSVFormat perLineFormat = CSVFormat.DEFAULT.builder()
.setDelimiter(inputCsvFormat.getDelimiter())
.setSkipHeaderRecord(false).build();

Map<String, Integer> headerMap = new HashMap<>(inputCsvFormat.getHeader().length);
for (int i = 0; i < inputCsvFormat.getHeader().length; i++) {
headerMap.put(inputCsvFormat.getHeader()[i].toLowerCase(), i);
}

Set<String> transformsWithoutMappings = new HashSet<>();
// linked hash map: need predictable iteration order and null values
LinkedHashMap<String, String> newRecord = new LinkedHashMap<>();
// create an empty record to fill with the transformed values, ensuring all rows have
// the same columns
columnNamesForOutput.forEach(h -> newRecord.put(h, null));

for (CSVRecord record : records) {
// clean up the record prior to use
String line;
while ((line = bodyRecords.readLine()) != null) {
try (CSVParser parser = CSVParser.parse(line, perLineFormat)) {
CSVRecord record = parser.getRecords().get(0);
newRecord.replaceAll((k, v) -> null);
newRecord.keySet().forEach(h -> {

Pair<String, List<Function<String, Optional<String>>>> transforms = columnTransforms.getOrDefault(h, null);
Pair<String, List<Function<String, Optional<String>>>> transforms = columnTransformsToApply.getOrDefault(h, null);
if (transforms == null) {
newRecord.put(h, null);
} else {
// apply all transformations in insertion order
// key holds the original column
if (record.isMapped(transforms.getKey())) {
String v = record.get(transforms.getKey());

//TODO: build this into columnTransformsToApply, so don't need to call lowerCase on every loop
String lcKey = transforms.getKey().toLowerCase();

if (headerMap.containsKey(lcKey)) {
String v = StringUtils.trim(record.get(headerMap.get(lcKey)));
if (StringUtils.isNotBlank(v)) {
for (Function<String, Optional<String>> transform : transforms.getValue()) {
v = transform.apply(v).orElse(null);
Expand All @@ -328,19 +375,20 @@ public void sanitize(@NonNull Reader reader,
}
}
}

});

if (buffer.addAndAttemptFlush(ProcessedRecord.of(Lists.newArrayList(newRecord.values())))) {
log.info(String.format("Processed records: %d", buffer.getProcessed()));
};
}
if (buffer.flush()) {
log.info(String.format("Processed records: %d", buffer.getProcessed()));
if (outputBuffer.addAndAttemptFlush(ProcessedRecord.of(Lists.newArrayList(newRecord.values())))) {
log.info(String.format("Processed records: %d", outputBuffer.getProcessed()));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, true: that appears in the logs, but only for the buffer; I think exposing the total as part of the metadata could be useful.

}
} catch (IndexOutOfBoundsException | UncheckedIOException | IOException e) {
log.log(Level.WARNING, "Failed to process record", e);
errorCount++;
}
}
return errorCount;
}


@VisibleForTesting
Set<String> determineMissingColumnsToPseudonymize(Set<String> columnsToPseudonymize, Set<String> outputColumnsCI) {
Function<Set<String>, Set<String>> asLowercase = (Set<String> set) -> set.stream().map(String::toLowerCase).collect(Collectors.toSet());
Expand Down Expand Up @@ -375,7 +423,7 @@ private ProcessingBuffer<ProcessedRecord> getRecordsProcessingBuffer(final CSVPr

@Value
@RequiredArgsConstructor(staticName = "of")
private static class ProcessedRecord {
static class ProcessedRecord {
Collection<String> values;
}

Expand Down Expand Up @@ -436,4 +484,65 @@ TriFunction<String, String, Pseudonymizer, String> buildPseudonymizationFunction
};
}



/**
 * Immutable result of reading the first (header) line of the input via
 * {@code parseFirstLine(BufferedReader)}: the line's text plus the line terminator that
 * ended it.
 */
@Value
static class ParsedFirstLine {
/**
 * text of the first line, with its trailing line terminator (if any) removed
 */
String contents;
/**
* the line terminator that ended the first line; {@code parseFirstLine} falls back to
* "\n" when the input ended before any terminator was seen.
*
* one of the following:
* - "\n" (LF)
* - "\r\n" (CRLF)
* - "\r" (CR)
*
*/
String endOfLine;
}

/**
 * Reads the first line from {@code reader}, consuming its line terminator, and reports which
 * terminator was used. After this call the reader is positioned at the first character that
 * follows the terminator.
 *
 * @param reader to read the first line from
 * @return ParsedFirstLine holding the first line's text (terminator stripped) and the
 *         terminator found: "\n", "\r\n" or "\r"; "\n" is used as the default when the input
 *         ends before any terminator is seen
 * @throws IllegalArgumentException if the reader yields no characters at all
 * @throws UncheckedIOException if reading from {@code reader} fails
 */
ParsedFirstLine parseFirstLine(BufferedReader reader) {
    StringBuilder header = new StringBuilder();
    String terminator = null;
    try {
        for (int current = reader.read(); current != -1; current = reader.read()) {
            if (current == '\n') {
                terminator = "\n";
                break;
            }
            if (current == '\r') {
                // peek one character ahead to tell CRLF apart from a bare CR
                reader.mark(1);
                int lookahead = reader.read();
                if (lookahead == '\n') {
                    terminator = "\r\n";
                } else {
                    if (lookahead != -1) {
                        // not part of the terminator; hand it back to the reader
                        reader.reset();
                    }
                    terminator = "\r";
                }
                break;
            }
            // terminator characters are never appended, so nothing needs stripping afterwards
            header.append((char) current);
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }

    // nothing was read at all: neither content nor a terminator
    if (terminator == null && header.length() == 0) {
        throw new IllegalArgumentException("Empty file, no header found");
    }

    String endOfLine = (terminator == null) ? "\n" : terminator; // default
    return new ParsedFirstLine(header.toString(), endOfLine);
}
}
Loading
Loading