Skip to content

Commit

Permalink
Faster serialization of Avro unions (#6880)
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrp authored Sep 17, 2024
1 parent 11f684f commit 662f0d6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 9 deletions.
2 changes: 2 additions & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
* [#6826](https://github.com/TouK/nussknacker/pull/6826) Security fix: added validation of expression used inside
indexer for Maps and Lists (for example `{1,2,3}[#otherList.remove(1) == null ? 0 : 0]`). This allowed executing
some types of unallowed expressions.
* [#6880](https://github.com/TouK/nussknacker/pull/6880) Performance optimization of generating Avro messages with unions
- shorter message in logs

## 1.17

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validat
case (Schema.Type.RECORD, map: util.Map[String @unchecked, _]) =>
encodeRecord(map, schema)
case (Schema.Type.ENUM, symbol: CharSequence) =>
encodeEnumOrError(symbol.toString, schema, fieldName)
encodeEnum(symbol.toString, schema, fieldName)
case (Schema.Type.ENUM, symbol: EnumSymbol) =>
encodeEnumOrError(symbol.toString, schema, fieldName)
encodeEnum(symbol.toString, schema, fieldName)
case (Schema.Type.ARRAY, collection: Iterable[_]) =>
encodeCollection(collection, schema)
case (Schema.Type.ARRAY, collection: util.Collection[_]) =>
Expand All @@ -61,6 +61,8 @@ class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validat
case (Schema.Type.MAP, map: util.Map[_, _]) =>
encodeMap(map.asScala, schema)
case (Schema.Type.UNION, _) =>
// Note: calling 'toString' on Avro schema is expensive, especially when we reject some messages.
// Error messages should be lazily evaluated, and materialized only when exiting public functions.
schema.getTypes.asScala
.to(LazyList)
.flatMap { subTypeSchema =>
Expand Down Expand Up @@ -139,9 +141,9 @@ class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validat
case (Schema.Type.NULL, None) =>
Valid(null)
case (_, null) =>
error(s"Not expected null for field: $fieldName with schema: $schema")
error(s"Not expected null for field: $fieldName with schema: ${schema.getFullName}")
case (_, _) =>
error(s"Not expected type: ${value.getClass.getName} for field: $fieldName with schema: $schema")
error(s"Not expected type: ${value.getClass.getName} for field: $fieldName with schema: ${schema.getFullName}")
}
}

Expand All @@ -150,11 +152,15 @@ class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validat
decimal.setScale(decimalLogicalType.getScale, RoundingMode.DOWN).bigDecimal
}

def encodeEnumOrError(symbol: String, schema: Schema, fieldName: Option[String]): WithError[EnumSymbol] =
if (!schema.hasEnumSymbol(symbol))
error(s"Not expected symbol: $symbol for field: $fieldName with schema: $schema")
else
private def encodeEnum(symbol: String, schema: Schema, fieldName: Option[String]): WithError[EnumSymbol] =
if (!schema.hasEnumSymbol(symbol)) {
val allowedEnumValues = schema.getEnumSymbols.asScala.mkString(", ")
error(
s"Not expected symbol: $symbol for field: $fieldName with schema: ${schema.getFullName}, allowed values: $allowedEnumValues"
)
} else {
Valid(new EnumSymbol(schema, symbol))
}

def encodeRecordOrError(fields: collection.Map[String, _], schema: Schema): GenericData.Record = {
encodeRecordOrError(fields.asJava, schema)
Expand Down Expand Up @@ -222,7 +228,7 @@ class ToAvroSchemaBasedEncoder(avroSchemaEvolution: AvroSchemaEvolution, validat

private def encodeFixed(bytes: Array[Byte], schema: Schema): WithError[GenericData.Fixed] = {
if (bytes.length != schema.getFixedSize) {
error(s"Fixed size not matches: ${bytes.length} != ${schema.getFixedSize} for schema: $schema")
error(s"Fixed size not matches: ${bytes.length} != ${schema.getFixedSize} for schema: ${schema.getFullName}")
} else {
val fixed = new GenericData.Fixed(schema)
fixed.bytes(bytes)
Expand Down

0 comments on commit 662f0d6

Please sign in to comment.