Skip to content

Commit

Permalink
Support reader/writer schema on binary reader factory
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed May 5, 2024
1 parent 5892628 commit 1537180
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ import java.nio.ByteBuffer
* Pass in a pre-created [DecoderFactory] if you wish to configure buffer size.
*/
class BinaryReaderFactory(
schema: Schema,
reader: Schema,
writer: Schema,
private val factory: DecoderFactory,
) {
constructor(schema: Schema) : this(schema, DecoderFactory.get())

private val datumReader = GenericDatumReader<GenericRecord>(schema)
constructor(schema: Schema) : this(schema, schema, DecoderFactory.get())
constructor(reader: Schema, writer: Schema) : this(reader, writer, DecoderFactory.get())
constructor(schema: Schema, factory: DecoderFactory) : this(schema, schema, factory)

private val datumReader = GenericDatumReader<GenericRecord>(/* writer = */ writer, /* reader = */ reader)

/**
* Creates an [BinaryReader] that reads from the given [InputStream].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,38 @@ class BinaryWriterFactory(
schema: Schema,
private val factory: EncoderFactory,
) {

/**
* Creates an [BinaryWriterFactory] with the default [EncoderFactory].
*/
constructor(schema: Schema) : this(schema, EncoderFactory.get())

companion object {

/**
* Creates an avro encoded byte array from the given [record].
* This method is a convenience function that is useful when you want to write a single record.
*
* Pass in a [Codec] to compress output.
*
* For better performance, considering creating a [BinaryWriterFactory] which will use
* a shared [GenericDatumWriter] and allows customizating the [EncoderFactory].
*/
fun write(record: GenericRecord, codec: Codec? = null): ByteArray {
val datumWriter = GenericDatumWriter<GenericRecord>(record.schema)

val writer = BinaryWriter(datumWriter, ByteArrayOutputStream(), EncoderFactory.get())
writer.write(record)
writer.close()
return if (codec == null) writer.bytes() else {
val compressed = codec.compress(ByteBuffer.wrap(writer.bytes()))
val b = ByteArray(compressed.remaining())
compressed.get(b)
b
}
}
}

private val datumWriter = GenericDatumWriter<GenericRecord>(schema)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ class Serde<T : Any>(
private val options: SerdeOptions,
) {

companion object {

}

init {
if (options.fastReader)
GenericData.get().setFastReaderEnabled(true)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.sksamuel.centurion.avro

import com.sksamuel.centurion.avro.io.BinaryReaderFactory
import com.sksamuel.centurion.avro.io.BinaryWriterFactory
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.avro.util.Utf8

class EvolutionTest : FunSpec() {

init {
test("evolve schema by adding a nullable field") {

val schema1 = SchemaBuilder.record("foo").fields()
.requiredString("a")
.requiredBoolean("b")
.endRecord()

val schema2 = SchemaBuilder.record("foo").fields()
.requiredString("a")
.requiredBoolean("b")
.optionalString("c")
.endRecord()

val record1 = GenericData.Record(schema1)
record1.put("a", "hello")
record1.put("b", true)

val bytes = BinaryWriterFactory.write(record1)
val record2 = BinaryReaderFactory(schema2, schema1).read(bytes)

record2["a"] shouldBe Utf8("hello")
record2["b"] shouldBe true
record2["c"] shouldBe null

}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class SerdeTest : FunSpec({

test("round trip happy path") {
val user = User(Random.nextLong(), "sammy mcsamface", "[email protected]", Random.nextLong(), UserType.Admin)
val serde = Serde<User>()
val serde = ReflectionSerdeFactory.create<User>()
serde.deserialize(serde.serialize(user)) shouldBe user
}

Expand All @@ -24,8 +24,8 @@ class SerdeTest : FunSpec({
Random.nextLong(),
UserType.Admin
)
val serde1 = Serde<User>()
val serde2 = Serde<User>(SerdeOptions(codec = BZip2Codec()))
val serde1 = ReflectionSerdeFactory.create<User>()
val serde2 = ReflectionSerdeFactory.create<User>(SerdeOptions(codec = BZip2Codec()))
serde1.serialize(user).size shouldBeGreaterThan serde2.serialize(user).size
serde2.deserialize(serde2.serialize(user)) shouldBe user
}
Expand Down

0 comments on commit 1537180

Please sign in to comment.