Skip to content

Commit ec12d56

Browse files
shaikzakiriitmrhauch
authored andcommitted
KAFKA-10477: Fix JsonConverter regression to treat MISSING nodes as NULL nodes (#9306)
Fixes a regression introduced in `JsonConverter` with previous upgrades from Jackson Databind 2.9.x to 2.10.x. Jackson Databind version 2.10.0 included a backward-incompatible behavioral change to use `JsonNodeType.MISSING` (and `MissingNode`, the subclass of `JsonNode` that has a type of `MISSING`) instead of `JsonNodeType.NULL` / `NullNode`. See FasterXML/jackson-databind#2211 for details of this change. This change makes recovers the older `JsonConverter` behavior of returning null on empty input. Added two unit tests for this change. Both unit tests were independently tested with earlier released versions and passed on all versions that used Jackson 2.9.x and earlier, and failed on all versions that used 2.10.x and that did not have the fixed included in the PR. Both of the new unit tests pass with this fix to `JsonConverter`. Author: Shaik Zakir Hussain <[email protected]> Reviewer: Randall Hauch <[email protected]>
1 parent 63f3e1c commit ec12d56

File tree

2 files changed

+31
-1
lines changed

2 files changed

+31
-1
lines changed

connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,7 @@ private static Object convertToConnect(Schema schema, JsonNode jsonValue) {
729729
} else {
730730
switch (jsonValue.getNodeType()) {
731731
case NULL:
732+
case MISSING:
732733
// Special case. With no schema
733734
return null;
734735
case BOOLEAN:
@@ -751,7 +752,6 @@ private static Object convertToConnect(Schema schema, JsonNode jsonValue) {
751752
break;
752753

753754
case BINARY:
754-
case MISSING:
755755
case POJO:
756756
default:
757757
schemaType = null;

connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,36 @@ public void nullToConnect() {
195195
assertEquals(SchemaAndValue.NULL, converted);
196196
}
197197

198+
/**
199+
* When schemas are disabled, empty data should be decoded to an empty envelope.
200+
* This test verifies the case where `schemas.enable` configuration is set to false, and
201+
* {@link JsonConverter} converts empty bytes to {@link SchemaAndValue#NULL}.
202+
*/
203+
@Test
204+
public void emptyBytesToConnect() {
205+
// This characterizes the messages with empty data when Json schemas is disabled
206+
Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
207+
converter.configure(props, true);
208+
SchemaAndValue converted = converter.toConnectData(TOPIC, "".getBytes());
209+
assertEquals(SchemaAndValue.NULL, converted);
210+
}
211+
212+
/**
213+
* When schemas are disabled, fields are mapped to Connect maps.
214+
*/
215+
@Test
216+
public void schemalessWithEmptyFieldValueToConnect() {
217+
// This characterizes the messages with empty data when Json schemas is disabled
218+
Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
219+
converter.configure(props, true);
220+
String input = "{ \"a\": \"\", \"b\": null}";
221+
SchemaAndValue converted = converter.toConnectData(TOPIC, input.getBytes());
222+
Map<String, String> expected = new HashMap<>();
223+
expected.put("a", "");
224+
expected.put("b", null);
225+
assertEquals(new SchemaAndValue(null, expected), converted);
226+
}
227+
198228
@Test
199229
public void nullSchemaPrimitiveToConnect() {
200230
SchemaAndValue converted = converter.toConnectData(TOPIC, "{ \"schema\": null, \"payload\": null }".getBytes());

0 commit comments

Comments
 (0)