Skip to content

Commit

Permalink
Issue 149: Change AvroDeserializer deserialize method to reduce perfo…
Browse files Browse the repository at this point in the history
…rmance overhead on DatumReader creation (#151)

* [changed] AvroDeserializer deserialize method to reduce performance overhead on DatumReader creation

Signed-off-by: jingerbread <[email protected]>

* Issue 149: Change AvroDeserializer deserialize method to reduce performance overhead on DatumReader creation
[updated] GenericAvroDeserializer to avoid creating data readers
[used] computeIfAbsent

Signed-off-by: jingerbread <[email protected]>

* [removed] getKnownSchemaReaders()

Signed-off-by: jingerbread <[email protected]>

* [changed] visibility of createDatumReader and getKnownSchemaReaders methods for unit test
[added] unit test for AvroDeserializer

Signed-off-by: jingerbread <[email protected]>

* [fixed] failing gradle check for import of io.pravega.schemaregistry.serializers.SerializerFactory

Signed-off-by: jingerbread <[email protected]>

* [added] licensing headers
[removed] Preconditions.checkNotNull(readerSchemaInfo) for AvroGenericDeserializer

Signed-off-by: jingerbread <[email protected]>

Co-authored-by: shivesh ranjan <[email protected]>
  • Loading branch information
jingerbread and shiveshr authored Oct 29, 2020
1 parent 9f207de commit 209efcc
Show file tree
Hide file tree
Showing 11 changed files with 3,093 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
*/
package io.pravega.schemaregistry.serializer.avro.impl;

import com.google.common.base.Charsets;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
import io.pravega.schemaregistry.contract.data.SchemaInfo;
Expand All @@ -19,48 +20,59 @@
import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;

class AvroDeserializer<T> extends AbstractDeserializer<T> {
private final AvroSchema<T> avroSchema;
private final ConcurrentHashMap<SchemaInfo, Schema> knownSchemas;
private final ConcurrentHashMap<ByteBuffer, DatumReader<T>> knownSchemaReaders;
private final boolean specific;
private final Schema readerSchema;

AvroDeserializer(String groupId, SchemaRegistryClient client,
AvroSchema<T> schema,
SerializerConfig.Decoders decoder, EncodingCache encodingCache) {
super(groupId, client, schema, false, decoder, encodingCache, true);
Preconditions.checkNotNull(schema);
this.avroSchema = schema;
this.knownSchemas = new ConcurrentHashMap<>();
this.knownSchemaReaders = new ConcurrentHashMap<>();
specific = SpecificRecordBase.class.isAssignableFrom(schema.getTClass());
readerSchema = schema.getSchema();
ByteBuffer schemaData = schema.getSchemaInfo().getSchemaData();
knownSchemaReaders.put(schemaData, createDatumReader(readerSchema, readerSchema, specific));
}

@Override
public final T deserialize(InputStream inputStream, SchemaInfo writerSchemaInfo, SchemaInfo readerSchemaInfo) throws IOException {
Preconditions.checkNotNull(writerSchemaInfo);
Schema writerSchema;
if (knownSchemas.containsKey(writerSchemaInfo)) {
writerSchema = knownSchemas.get(writerSchemaInfo);
} else {
String schemaString = new String(writerSchemaInfo.getSchemaData().array(), Charsets.UTF_8);
writerSchema = new Schema.Parser().parse(schemaString);
knownSchemas.put(writerSchemaInfo, writerSchema);
}
Schema readerSchema = avroSchema.getSchema();
final ByteBuffer writerSchemaData = writerSchemaInfo.getSchemaData();
DatumReader<T> datumReader = knownSchemaReaders.computeIfAbsent(writerSchemaData, key -> {
Schema writerSchema = AvroSchema.from(writerSchemaInfo).getSchema();
return createDatumReader(writerSchema, this.readerSchema, specific);
});
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

if (SpecificRecordBase.class.isAssignableFrom(avroSchema.getTClass())) {
SpecificDatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
return datumReader.read(null, decoder);
return datumReader.read(null, decoder);
}

@VisibleForTesting
DatumReader<T> createDatumReader(Schema writerSchema, Schema readerSchema, boolean specific) {
DatumReader<T> datumReader;
if (specific) {
datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
} else {
ReflectDatumReader<T> datumReader = new ReflectDatumReader<>(writerSchema, readerSchema);
return datumReader.read(null, decoder);
datumReader = new ReflectDatumReader<>(writerSchema, readerSchema);
}
return datumReader;
}

@VisibleForTesting
ImmutableMap<ByteBuffer, DatumReader<T>> getKnownSchemaReaders() {
return ImmutableMap.copyOf(knownSchemaReaders);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,49 @@
*/
package io.pravega.schemaregistry.serializer.avro.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema;
import com.google.common.collect.ImmutableMap;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
import io.pravega.schemaregistry.contract.data.SchemaInfo;
import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema;
import io.pravega.schemaregistry.serializer.shared.impl.AbstractDeserializer;
import io.pravega.schemaregistry.serializer.shared.impl.EncodingCache;
import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ConcurrentHashMap;

public class AvroGenericDeserializer extends AbstractDeserializer<Object> {
private final ConcurrentHashMap<SchemaInfo, Schema> knownSchemas;
private final ConcurrentHashMap<Pair<SchemaInfo, SchemaInfo>, GenericDatumReader<Object>> knownSchemaReaders;

public AvroGenericDeserializer(String groupId, SchemaRegistryClient client, @Nullable AvroSchema<Object> schema,
SerializerConfig.Decoders decoder, EncodingCache encodingCache) {
super(groupId, client, schema, false, decoder, encodingCache, true);
this.knownSchemas = new ConcurrentHashMap<>();
this.knownSchemaReaders = new ConcurrentHashMap<>();
}

@Override
public final Object deserialize(InputStream inputStream, SchemaInfo writerSchemaInfo, SchemaInfo readerSchemaInfo) throws IOException {
Preconditions.checkNotNull(writerSchemaInfo);
Schema writerSchema = knownSchemas.computeIfAbsent(writerSchemaInfo, x -> AvroSchema.from(x).getSchema());
Schema readerSchema = knownSchemas.computeIfAbsent(readerSchemaInfo, x -> AvroSchema.from(x).getSchema());

GenericDatumReader<Object> genericDatumReader = new GenericDatumReader<>(writerSchema, readerSchema);

final Pair<SchemaInfo, SchemaInfo> keyPair = Pair.of(writerSchemaInfo, readerSchemaInfo);
GenericDatumReader<Object> genericDatumReader = knownSchemaReaders.computeIfAbsent(keyPair, key -> {
Schema writerSchema = AvroSchema.from(writerSchemaInfo).getSchema();
Schema readerSchema = AvroSchema.from(readerSchemaInfo).getSchema();
return new GenericDatumReader<>(writerSchema, readerSchema);
});
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
return genericDatumReader.read(null, decoder);
}

@VisibleForTesting
ImmutableMap<Pair<SchemaInfo, SchemaInfo>, GenericDatumReader<Object>> getKnownSchemaReaders() {
return ImmutableMap.copyOf(knownSchemaReaders);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.schemaregistry.serializer.avro.impl;

import com.google.common.collect.ImmutableMap;
import io.pravega.client.stream.Serializer;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
import io.pravega.schemaregistry.contract.data.*;
import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema;
import io.pravega.schemaregistry.serializer.avro.testobjs.generated.avro.AddressEntry;
import io.pravega.schemaregistry.serializer.avro.testobjs.generated.avro.User;
import io.pravega.schemaregistry.serializer.shared.codec.Codecs;
import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

@Slf4j
public class AvroDeserializerTest {

private Serializer<User> serializer;
private AvroDeserializer<User> avroDeserializer;
private AvroGenericDeserializer genericDeserializer;
private User user;

@Before
public void init() {
AvroSchema<User> userAvroSchema = AvroSchema.of(User.class);
log.info("Aliases: {}", userAvroSchema.getSchema().getAliases());
VersionInfo versionInfo1 = new VersionInfo("avroUser1", 0, 0);
SchemaRegistryClient client = mock(SchemaRegistryClient.class);
doAnswer(x -> true).when(client).canReadUsing(anyString(), any());
doAnswer(x -> new EncodingId(0)).when(client).getEncodingId(anyString(), any(), any());
doAnswer(x -> new EncodingInfo(versionInfo1, userAvroSchema.getSchemaInfo(), Codecs.None.getCodec().getCodecType())).when(client).getEncodingInfo(anyString(), eq(new EncodingId(0)));
SerializerConfig serializerConfig = SerializerConfig.builder().registryClient(client).groupId("avroUser1")
.createGroup(SerializationFormat.Avro).registerSchema(true).build();
this.serializer = AvroSerializerFactory
.serializer(serializerConfig, userAvroSchema);
this.avroDeserializer = Mockito.spy((AvroDeserializer<User>)AvroSerializerFactory.deserializer(
serializerConfig, userAvroSchema));

org.apache.avro.Schema schema = userAvroSchema.getSchema();
AvroSchema<Object> objectAvroSchema = AvroSchema.of(schema);
this.genericDeserializer = Mockito.spy((AvroGenericDeserializer)AvroSerializerFactory.genericDeserializer(
serializerConfig, objectAvroSchema));

this.user = User.newBuilder()
.setUserId("111111111111")
.setBiography("Greg Egan was born 20 August 1961")
.setName("Greg Egan")
.setEventTimestamp(System.currentTimeMillis())
.setKeyValues(null)
.setKeyValues2(null)
.setKeyValues3(null)
.setAddress(AddressEntry.newBuilder().setCity("Perth")
.setPostalCode(5018)
.setStreetAddress("4/19 Gardner Road").build()).build();
}

@Test
public void testCreatingReadersOnceForSchemaGeneric() {
ImmutableMap<Pair<SchemaInfo, SchemaInfo>, GenericDatumReader<Object>> knownSchemaReaders1 = genericDeserializer.getKnownSchemaReaders();
Assert.assertTrue(knownSchemaReaders1.isEmpty());
Assert.assertEquals(0, knownSchemaReaders1.size());

ByteBuffer serialized = serializer.serialize(user);
int payloadSize = serialized.limit();
log.info("serialized into {}", payloadSize);
Assert.assertEquals(100, payloadSize);
byte[] bytes = serialized.array();
log.info("bytes: {}", new String(bytes, StandardCharsets.UTF_8));
Object user1 = genericDeserializer.deserialize(ByteBuffer.wrap(bytes));
log.info("deserialized {}", user1);
ImmutableMap<Pair<SchemaInfo, SchemaInfo>, GenericDatumReader<Object>> knownSchemaReaders2 = genericDeserializer.getKnownSchemaReaders();
Assert.assertEquals(1, knownSchemaReaders2.size());
}

@Test
public void testCreatingReadersOnceForSchema() {
ImmutableMap<ByteBuffer, DatumReader<User>> knownSchemaReaders1 = avroDeserializer.getKnownSchemaReaders();
Assert.assertFalse(knownSchemaReaders1.isEmpty());
Assert.assertEquals(1, knownSchemaReaders1.size());
AvroSchema<User> userAvroSchema = AvroSchema.of(User.class);
DatumReader<User> datumReader = knownSchemaReaders1.get(userAvroSchema.getSchemaInfo().getSchemaData());
Assert.assertNotNull(datumReader);

ByteBuffer serialized = serializer.serialize(user);
int payloadSize = serialized.limit();
log.info("serialized into {}", payloadSize);
Assert.assertEquals(100, payloadSize);
byte[] bytes = serialized.array();
log.info("bytes: {}", new String(bytes, StandardCharsets.UTF_8));
User user1 = avroDeserializer.deserialize(ByteBuffer.wrap(bytes));

log.info("deserialized {}", user1);
Assert.assertEquals(user, user1);
serializer.serialize(user1);
ImmutableMap<ByteBuffer, DatumReader<User>> knownSchemaReaders2 = avroDeserializer.getKnownSchemaReaders();
Assert.assertEquals(1, knownSchemaReaders2.size());
Assert.assertEquals(knownSchemaReaders1, knownSchemaReaders2);
// called zero times outside constructor
Mockito.verify(avroDeserializer, Mockito.times(0)).createDatumReader(Mockito.any(), Mockito.any(), Mockito.anyBoolean());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.schemaregistry.serializer.avro.impl;


import io.pravega.client.stream.Serializer;
import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.schemaregistry.serializer.avro.testobjs;

public interface EventTimestampAware {
void setEventTimestamp(Long value);

Long getEventTimestamp();
}
Loading

0 comments on commit 209efcc

Please sign in to comment.