Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public abstract class AvroModelHandler
private final Int2ObjectCache<GenericDatumWriter<GenericRecord>> writers;
private final Int2ObjectCache<GenericRecord> records;
private final Int2IntHashMap paddings;
private final Int2IntHashMap avroOverheads;
private final AvroBytesFW bytesRO;
private final AvroIntFW intRO;
private final AvroLongFW longRO;
Expand Down Expand Up @@ -115,6 +116,7 @@ protected AvroModelHandler(
this.writers = new Int2ObjectCache<>(1, 1024, i -> {});
this.records = new Int2ObjectCache<>(1, 1024, i -> {});
this.paddings = new Int2IntHashMap(-1);
this.avroOverheads = new Int2IntHashMap(-1);
this.expandable = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer());
this.in = new DirectBufferInputStream();
this.event = new AvroModelEventContext(context);
Expand Down Expand Up @@ -195,6 +197,12 @@ protected final int supplyPadding(
return paddings.computeIfAbsent(schemaId, id -> calculatePadding(supplySchema(id)));
}

protected final int supplyAvroOverhead(
int schemaId)
{
return avroOverheads.computeIfAbsent(schemaId, id -> calculateAvroOverhead(supplySchema(id)));
}

protected final GenericDatumReader<GenericRecord> supplyReader(
int schemaId)
{
Expand Down Expand Up @@ -311,6 +319,30 @@ private int calculatePadding(
return padding;
}

private int calculateAvroOverhead(Schema schema)
{
int overhead = 0;
if (schema != null)
{
switch (schema.getType())
{
case RECORD:
for (Schema.Field field : schema.getFields())
{
overhead += calculateAvroOverhead(field.schema());
}
break;
case UNION:
List<Schema> types = schema.getTypes();
for (Schema type : types)
{
overhead += (type.getType().equals(Schema.Type.NULL)) ? calculateAvroOverhead(type) : 1;
}
}
}
return overhead;
}

private void extract(
Schema schema,
DirectBuffer data,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,16 @@ public int padding(
int index,
int length)
{
return handler.encodePadding(length);
int padding = handler.encodePadding(length);
int schemaId = catalog != null && catalog.id > 0
? catalog.id
: handler.resolve(subject, catalog.version);

if (VIEW_JSON.equals(view))
{
padding += supplyAvroOverhead(schemaId);
}
return padding;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ public int readIndex() throws IOException

String label;
final JsonToken currentToken = lin.getCurrentToken();
if (currentToken == JsonToken.VALUE_NULL)
if (currentToken == JsonToken.VALUE_NULL ||
currentToken == JsonToken.END_OBJECT ||
currentToken == JsonToken.FIELD_NAME)
{
label = "null";
}
Expand Down Expand Up @@ -140,6 +142,14 @@ else if (currentToken == JsonToken.START_OBJECT &&
}
catch (InvocationTargetException ex)
{
if (ex.getTargetException() instanceof IOException)
{
throw (IOException) ex.getTargetException();
}
else if (ex.getTargetException() instanceof RuntimeException)
{
throw (RuntimeException) ex.getTargetException();
}
throw new RuntimeException(ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void readNull() throws IOException
{
in.nextToken();
}
else
else if (in.getCurrentToken() != JsonToken.END_OBJECT)
{
throw error("null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,38 @@ public void shouldVerifyPaddingLength()
assertEquals(525, converter.padding(data, 0, data.capacity()));
}

@Test
public void shouldVerifyWritePaddingLengthWithNull()
{
TestCatalogConfig catalog = CatalogConfig.builder(TestCatalogConfig::new)
.namespace("test")
.name("test0")
.type("test")
.options(TestCatalogOptionsConfig::builder)
.id(9)
.schema(SCHEMA_WITH_NULL)
.build()
.build();
AvroModelConfig model = AvroModelConfig.builder()
.view("json")
.catalog()
.name("test0")
.schema()
.strategy("topic")
.version("latest")
.subject("test-value")
.build()
.build()
.build();

when(context.supplyCatalog(catalog.id)).thenReturn(new TestCatalogHandler(catalog.options));
AvroWriteConverterHandler converter = new AvroWriteConverterHandler(config, model, context);

DirectBuffer data = new UnsafeBuffer();

assertEquals(1, converter.padding(data, 0, data.capacity()));
}

@Test
public void shouldExtract()
{
Expand Down
Loading