Skip to content

Commit b0e922d

Browse files
shaikzakiriitmrhauch
authored andcommitted
KAFKA-10477: Fix JsonConverter regression to treat MISSING nodes as NULL nodes (#9306)
Fixes a regression introduced in `JsonConverter` with previous upgrades from Jackson Databind 2.9.x to 2.10.x. Jackson Databind version 2.10.0 included a backward-incompatible behavioral change to use `JsonNodeType.MISSING` (and `MissingNode`, the subclass of `JsonNode` that has a type of `MISSING`) instead of `JsonNodeType.NULL` / `NullNode`. See FasterXML/jackson-databind#2211 for details of this change. This change makes recovers the older `JsonConverter` behavior of returning null on empty input. Added two unit tests for this change. Both unit tests were independently tested with earlier released versions and passed on all versions that used Jackson 2.9.x and earlier, and failed on all versions that used 2.10.x and that did not have the fixed included in the PR. Both of the new unit tests pass with this fix to `JsonConverter`. Author: Shaik Zakir Hussain <[email protected]> Reviewer: Randall Hauch <[email protected]>
1 parent a5d883c commit b0e922d

File tree

2 files changed

+31
-1
lines changed

2 files changed

+31
-1
lines changed

connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,7 @@ private static Object convertToConnect(Schema schema, JsonNode jsonValue) {
708708
} else {
709709
switch (jsonValue.getNodeType()) {
710710
case NULL:
711+
case MISSING:
711712
// Special case. With no schema
712713
return null;
713714
case BOOLEAN:
@@ -730,7 +731,6 @@ private static Object convertToConnect(Schema schema, JsonNode jsonValue) {
730731
break;
731732

732733
case BINARY:
733-
case MISSING:
734734
case POJO:
735735
default:
736736
schemaType = null;

connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,36 @@ public void nullToConnect() {
190190
assertEquals(SchemaAndValue.NULL, converted);
191191
}
192192

193+
/**
194+
* When schemas are disabled, empty data should be decoded to an empty envelope.
195+
* This test verifies the case where `schemas.enable` configuration is set to false, and
196+
* {@link JsonConverter} converts empty bytes to {@link SchemaAndValue#NULL}.
197+
*/
198+
@Test
199+
public void emptyBytesToConnect() {
200+
// This characterizes the messages with empty data when Json schemas is disabled
201+
Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
202+
converter.configure(props, true);
203+
SchemaAndValue converted = converter.toConnectData(TOPIC, "".getBytes());
204+
assertEquals(SchemaAndValue.NULL, converted);
205+
}
206+
207+
/**
208+
* When schemas are disabled, fields are mapped to Connect maps.
209+
*/
210+
@Test
211+
public void schemalessWithEmptyFieldValueToConnect() {
212+
// This characterizes the messages with empty data when Json schemas is disabled
213+
Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
214+
converter.configure(props, true);
215+
String input = "{ \"a\": \"\", \"b\": null}";
216+
SchemaAndValue converted = converter.toConnectData(TOPIC, input.getBytes());
217+
Map<String, String> expected = new HashMap<>();
218+
expected.put("a", "");
219+
expected.put("b", null);
220+
assertEquals(new SchemaAndValue(null, expected), converted);
221+
}
222+
193223
@Test
194224
public void nullSchemaPrimitiveToConnect() {
195225
SchemaAndValue converted = converter.toConnectData(TOPIC, "{ \"schema\": null, \"payload\": null }".getBytes());

0 commit comments

Comments
 (0)