diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/PatternMapString.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/PatternMapString.java index 852fbf7..4828029 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/PatternMapString.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/PatternMapString.java @@ -85,7 +85,7 @@ protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct inpu final String inputFieldName = inputField.name(); final Object value = inputStruct.get(inputFieldName); outputStruct.put(inputFieldName, value); - if (inputFieldName.equals(config.srcfieldname)) { + if (inputFieldName.equals(config.srcfieldname) && value != null) { String replacedField = (String) value; final Matcher fieldMatcher = this.config.pattern.matcher(replacedField); String replacedValue = fieldMatcher.replaceAll(this.config.replacement); @@ -109,6 +109,8 @@ protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct inpu } outputStruct.put(config.destfieldname, replacedValue); retVal = new SchemaAndValue(outPutSchema, outputStruct); + } else { + retVal = new SchemaAndValue(outPutSchema, inputStruct); } } return retVal; diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/PatternMapStringTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/PatternMapStringTest.java index 9037b57..c11f4d8 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/PatternMapStringTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/PatternMapStringTest.java @@ -276,6 +276,105 @@ public void multiple() { assertStruct(expectedStruct, actualStruct); } + @Test + public void missingFieldWithoutRename() { + this.transformation.configure( + ImmutableMap.of( + PatternMapStringConfig.SRC_FIELD_NAME_CONF, "fieldA", + PatternMapStringConfig.DEST_FIELD_NAME_CONF, "fieldA", + PatternMapStringConfig.VALUE_PATTERN_CONF, "from", + PatternMapStringConfig.VALUE_REPLACEMENT_CONF, "to" + ) + ); + + Schema inputSchema = SchemaBuilder.struct() + .name("testing") + .field("fieldA", Schema.OPTIONAL_STRING_SCHEMA) + .field("fieldB", Schema.STRING_SCHEMA); + Struct inputStruct = new Struct(inputSchema) + .put("fieldB", "valueB"); + + final Object key = isKey ? inputStruct : null; + final Object value = isKey ? null : inputStruct; + final Schema keySchema = isKey ? inputSchema : null; + final Schema valueSchema = isKey ? null : inputSchema; + + final SinkRecord inputRecord = new SinkRecord( + TOPIC, + 1, + keySchema, + key, + valueSchema, + value, + 1234L + ); + final SinkRecord outputRecord = this.transformation.apply(inputRecord); + assertNotNull(outputRecord); + + final Schema actualSchema = isKey ? outputRecord.keySchema() : outputRecord.valueSchema(); + final Struct actualStruct = (Struct) (isKey ? outputRecord.key() : outputRecord.value()); + + final Schema expectedSchema = SchemaBuilder.struct() + .name("testing") + .field("fieldA", Schema.OPTIONAL_STRING_SCHEMA) + .field("fieldB", Schema.STRING_SCHEMA); + Struct expectedStruct = new Struct(expectedSchema) + .put("fieldB", "valueB"); + + assertSchema(expectedSchema, actualSchema); + assertStruct(expectedStruct, actualStruct); + } + + @Test + public void missingFieldWithRename() { + this.transformation.configure( + ImmutableMap.of( + PatternMapStringConfig.SRC_FIELD_NAME_CONF, "fieldA", + PatternMapStringConfig.DEST_FIELD_NAME_CONF, "fieldC", + PatternMapStringConfig.VALUE_PATTERN_CONF, "from", + PatternMapStringConfig.VALUE_REPLACEMENT_CONF, "to" + ) + ); + + Schema inputSchema = SchemaBuilder.struct() + .name("testing") + .field("fieldA", Schema.OPTIONAL_STRING_SCHEMA) + .field("fieldB", Schema.STRING_SCHEMA); + Struct inputStruct = new Struct(inputSchema) + .put("fieldB", "valueB"); + + final Object key = isKey ? inputStruct : null; + final Object value = isKey ? null : inputStruct; + final Schema keySchema = isKey ? inputSchema : null; + final Schema valueSchema = isKey ? null : inputSchema; + + final SinkRecord inputRecord = new SinkRecord( + TOPIC, + 1, + keySchema, + key, + valueSchema, + value, + 1234L + ); + final SinkRecord outputRecord = this.transformation.apply(inputRecord); + assertNotNull(outputRecord); + + final Schema actualSchema = isKey ? outputRecord.keySchema() : outputRecord.valueSchema(); + final Struct actualStruct = (Struct) (isKey ? outputRecord.key() : outputRecord.value()); + + final Schema expectedSchema = SchemaBuilder.struct() + .name("testing") + .field("fieldA", Schema.OPTIONAL_STRING_SCHEMA) + .field("fieldC", Schema.OPTIONAL_STRING_SCHEMA) + .field("fieldB", Schema.STRING_SCHEMA); + Struct expectedStruct = new Struct(expectedSchema) + .put("fieldB", "valueB"); + + assertSchema(expectedSchema, actualSchema); + assertStruct(expectedStruct, actualStruct); + } + public static class KeyTest> extends PatternMapStringTest { protected KeyTest() {