Skip to content

Commit cbc1aa7

Browse files
authored
Fix: Support wrapped primitive types in Avro union fields (#379)
* Fix: Support wrapped primitive types in Avro union fields Fixes #376 The convertUnionField function was unable to handle wrapped primitive types in union fields (e.g., {"int": 20474} or {"int.date": 20474}). This caused serialization errors when users attempted to send data in this format for union types containing primitives with logical types. Changes: - Add getPrimitiveTypeName helper to map Avro schema types to primitive names - Update convertUnionField to recognize and unwrap wrapped primitive types - Handle logical type suffixes (e.g., "int.date" -> "int") by stripping suffix - Return unwrapped primitive values directly (hamba/avro expects unwrapped primitives) The fix ensures that both wrapped formats work: - {"int": 20474} -> unwraps to int32(20474) - {"int.date": 20474} -> strips suffix and unwraps to int32(20474) Added comprehensive tests to verify the fix works for various primitive types and union combinations. * Add test script * Fix linter issues * Fix Avro logical union date handling for issue #376 Normalize logical primitive union values to the discriminator format expected by hamba/avro, preventing float64/unknown union errors in nested schemas. Document supported input shapes for logical union fields and add regression coverage for serialize and conversion paths. * Refine logical union roundtrip regression test Replace the skipped logical union test with a focused roundtrip assertion that validates deserialize output type/value for date unions, avoiding overlap with serialize-path coverage. * Address golangci-lint findings in Avro conversion paths Fix unsafe byte narrowing in bytes conversion with explicit range checks and reduce duplicated logical date schema literals in tests by reusing a shared constant. * Update CI Go patch version for xk6 build compatibility Pin GitHub Actions setup-go to 1.25.8 and enable check-latest so go install xk6@latest meets the minimum Go patch requirement during CI builds.
1 parent b8a1e3a commit cbc1aa7

7 files changed

Lines changed: 910 additions & 22 deletions

File tree

.github/workflows/test.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ jobs:
3838
- name: Install Go 🧑‍💻
3939
uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0
4040
with:
41-
go-version: "1.25"
41+
go-version: "1.25.8"
42+
check-latest: true
4243
cache: false
4344

4445
- name: Run prettier for linting scripts, configs and docs 🧹

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ The example scripts are available as `test_<format/feature>.js` with more code a
636636
637637
## Avro Union Types
638638
639-
xk6-kafka uses `hamba/avro` for Avro serialization/deserialization. When working with Avro union types, you can provide union values directly without wrapping them in type-specific objects. For nullable fields, you can use `null` directly. See the [Schema Registry documentation](./docs/schema-registry.md#complex-schemas--manage-union-types) for detailed examples and best practices.
639+
xk6-kafka uses `hamba/avro` for Avro serialization/deserialization. When working with Avro union types, you can usually provide union values directly without wrapping them in type-specific objects. For nullable fields, you can use `null` directly. For logical primitive unions (for example `int` with `logicalType: "date"`), direct values and wrapped values like `{ "int": 20474 }` or `{ "int.date": 20474 }` are supported and normalized before encoding. See the [Schema Registry documentation](./docs/schema-registry.md#complex-schemas--manage-union-types) for detailed examples and best practices.
640640
641641
## Contributions, Issues and Feedback
642642

avro.go

Lines changed: 215 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"encoding/json"
55
"errors"
66
"fmt"
7+
"math"
8+
"strings"
79

810
"github.com/hamba/avro/v2"
911
)
@@ -21,6 +23,37 @@ type AvroSerde struct {
2123
Serdes
2224
}
2325

26+
func convertNumericValueToByte(value any) (byte, error) {
27+
switch val := value.(type) {
28+
case float64:
29+
if val < 0 || val > math.MaxUint8 || math.Trunc(val) != val {
30+
return 0, fmt.Errorf("%w: %v", ErrCannotConvertToByte, value)
31+
}
32+
//nolint:gosec // value is range-checked and integral before narrowing conversion
33+
return byte(val), nil
34+
case int:
35+
if val < 0 || val > math.MaxUint8 {
36+
return 0, fmt.Errorf("%w: %v", ErrCannotConvertToByte, value)
37+
}
38+
//nolint:gosec // value is range-checked before narrowing conversion
39+
return byte(val), nil
40+
case int32:
41+
if val < 0 || val > math.MaxUint8 {
42+
return 0, fmt.Errorf("%w: %v", ErrCannotConvertToByte, value)
43+
}
44+
//nolint:gosec // value is range-checked before narrowing conversion
45+
return byte(val), nil
46+
case int64:
47+
if val < 0 || val > math.MaxUint8 {
48+
return 0, fmt.Errorf("%w: %v", ErrCannotConvertToByte, value)
49+
}
50+
//nolint:gosec // value is range-checked before narrowing conversion
51+
return byte(val), nil
52+
default:
53+
return 0, fmt.Errorf("%w: %T", ErrCannotConvertToByte, value)
54+
}
55+
}
56+
2457
// convertPrimitiveType converts a primitive value to the correct Avro type.
2558
// Handles float64->int32/int64 conversion and array->bytes conversion.
2659
func convertPrimitiveType(data any, schema avro.Schema) (any, error) {
@@ -30,18 +63,11 @@ func convertPrimitiveType(data any, schema avro.Schema) (any, error) {
3063
if arr, ok := data.([]any); ok {
3164
bytes := make([]byte, len(arr))
3265
for i, v := range arr {
33-
switch val := v.(type) {
34-
case float64:
35-
bytes[i] = byte(val)
36-
case int:
37-
bytes[i] = byte(val)
38-
case int32:
39-
bytes[i] = byte(val)
40-
case int64:
41-
bytes[i] = byte(val)
42-
default:
66+
convertedByte, err := convertNumericValueToByte(v)
67+
if err != nil {
4368
return nil, fmt.Errorf("%w at index %d: %T", ErrCannotConvertToByte, i, v)
4469
}
70+
bytes[i] = convertedByte
4571
}
4672
return bytes, nil
4773
}
@@ -74,6 +100,135 @@ func convertPrimitiveType(data any, schema avro.Schema) (any, error) {
74100
}
75101
}
76102

103+
// getPrimitiveTypeName returns the Avro primitive type name for a given schema type,
104+
// or empty string if it's not a primitive type.
105+
func getPrimitiveTypeName(schemaType avro.Type) string {
106+
switch schemaType {
107+
case avro.Null:
108+
return "null"
109+
case avro.Boolean:
110+
return "boolean"
111+
case avro.Int:
112+
return "int"
113+
case avro.Long:
114+
return "long"
115+
case avro.Float:
116+
return "float"
117+
case avro.Double:
118+
return "double"
119+
case avro.Bytes:
120+
return "bytes"
121+
case avro.String:
122+
return "string"
123+
case avro.Record, avro.Error, avro.Ref, avro.Enum, avro.Array, avro.Map, avro.Union, avro.Fixed:
124+
return ""
125+
default:
126+
return ""
127+
}
128+
}
129+
130+
// getPrimitiveUnionDiscriminator returns the discriminator key expected by hamba/avro
131+
// for a primitive union branch. For logical primitive types, this is typically
132+
// "<primitive>.<logicalType>" (for example: "int.date").
133+
func getPrimitiveUnionDiscriminator(schema avro.Schema) string {
134+
if schema == nil {
135+
return ""
136+
}
137+
138+
if refSchema, ok := schema.(*avro.RefSchema); ok {
139+
schema = refSchema.Schema()
140+
}
141+
142+
primitive := getPrimitiveTypeName(schema.Type())
143+
if primitive == "" {
144+
return ""
145+
}
146+
147+
schemaString := schema.String()
148+
if !strings.HasPrefix(schemaString, "{") {
149+
return primitive
150+
}
151+
152+
var schemaMap map[string]any
153+
if err := json.Unmarshal([]byte(schemaString), &schemaMap); err != nil {
154+
return primitive
155+
}
156+
157+
logicalType, ok := schemaMap["logicalType"].(string)
158+
if !ok || logicalType == "" {
159+
return primitive
160+
}
161+
162+
return primitive + "." + logicalType
163+
}
164+
165+
// isValueCompatibleWithSchema checks whether value has a Go type compatible with the Avro schema.
166+
// It is used by union matching to avoid accepting a branch when conversion didn't actually produce
167+
// a value that can be encoded for that branch.
168+
func isValueCompatibleWithSchema(value any, schema avro.Schema) bool {
169+
if schema == nil {
170+
return false
171+
}
172+
173+
if refSchema, ok := schema.(*avro.RefSchema); ok {
174+
schema = refSchema.Schema()
175+
}
176+
177+
switch schema.Type() {
178+
case avro.Null:
179+
return value == nil
180+
case avro.Boolean:
181+
_, ok := value.(bool)
182+
return ok
183+
case avro.Int:
184+
switch value.(type) {
185+
case int32, int16, int8, int:
186+
return true
187+
default:
188+
return false
189+
}
190+
case avro.Long:
191+
switch value.(type) {
192+
case int64, int32, int16, int8, int:
193+
return true
194+
default:
195+
return false
196+
}
197+
case avro.Float:
198+
switch value.(type) {
199+
case float32, float64:
200+
return true
201+
default:
202+
return false
203+
}
204+
case avro.Double:
205+
switch value.(type) {
206+
case float64, float32:
207+
return true
208+
default:
209+
return false
210+
}
211+
case avro.Bytes:
212+
_, ok := value.([]byte)
213+
return ok
214+
case avro.String, avro.Enum:
215+
_, ok := value.(string)
216+
return ok
217+
case avro.Array:
218+
_, ok := value.([]any)
219+
return ok
220+
case avro.Map, avro.Record:
221+
_, ok := value.(map[string]any)
222+
return ok
223+
case avro.Union:
224+
return true
225+
case avro.Fixed, avro.Error, avro.Ref:
226+
return true
227+
default:
228+
return true
229+
}
230+
}
231+
77232
// convertUnionField converts a union field value, wrapping named schemas appropriately.
78233
func convertUnionField(fieldValue any, unionSchema *avro.UnionSchema) (any, error) {
79234
if fieldValue == nil {
@@ -88,7 +243,17 @@ func convertUnionField(fieldValue any, unionSchema *avro.UnionSchema) (any, erro
88243
// Check if it's already wrapped: {"typeName": value}
89244
if len(fieldValueMap) == 1 {
90245
for key, wrappedValue := range fieldValueMap {
91-
// Try to find matching named schema
246+
// First, try to match as a primitive type name (e.g., "int", "string")
247+
// Handle logical types like "int.date" by stripping the suffix
248+
primitiveKey := key
249+
for i := range key {
250+
if key[i] == '.' {
251+
primitiveKey = key[:i]
252+
break
253+
}
254+
}
255+
256+
// Try to find matching primitive type
92257
for _, unionType := range types {
93258
if unionType.Type() == avro.Null {
94259
continue
@@ -97,6 +262,26 @@ func convertUnionField(fieldValue any, unionSchema *avro.UnionSchema) (any, erro
97262
if refSchema, ok := unionType.(*avro.RefSchema); ok {
98263
actualType = refSchema.Schema()
99264
}
265+
266+
// Check if this is a primitive type matching the key
267+
if primitiveName := getPrimitiveTypeName(actualType.Type()); primitiveName != "" && primitiveName == primitiveKey {
268+
// Found matching primitive type, unwrap and convert
269+
converted, err := convertFloat64ToIntForIntegerFields(wrappedValue, actualType)
270+
if err != nil {
271+
return nil, err
272+
}
273+
if !isValueCompatibleWithSchema(converted, actualType) {
274+
continue
275+
}
276+
discriminator := getPrimitiveUnionDiscriminator(actualType)
277+
if discriminator != "" && discriminator != primitiveName {
278+
return map[string]any{discriminator: converted}, nil
279+
}
280+
// Return unwrapped value for non-logical primitives.
281+
return converted, nil
282+
}
283+
284+
// Try to find matching named schema
100285
if namedSchema, ok := actualType.(avro.NamedSchema); ok && namedSchema.FullName() == key {
101286
// Already wrapped, convert nested value
102287
converted, err := convertFloat64ToIntForIntegerFields(wrappedValue, actualType)
@@ -140,7 +325,20 @@ func convertUnionField(fieldValue any, unionSchema *avro.UnionSchema) (any, erro
140325
actualType = refSchema.Schema()
141326
}
142327

143-
// Named schemas (enums, fixed) need wrapping
328+
// Primitive types should stay unwrapped.
329+
if primitiveName := getPrimitiveTypeName(actualType.Type()); primitiveName != "" {
330+
converted, err := convertFloat64ToIntForIntegerFields(fieldValue, actualType)
331+
if err == nil && isValueCompatibleWithSchema(converted, actualType) {
332+
discriminator := getPrimitiveUnionDiscriminator(actualType)
333+
if discriminator != "" && discriminator != primitiveName {
334+
return map[string]any{discriminator: converted}, nil
335+
}
336+
return converted, nil
337+
}
338+
continue
339+
}
340+
341+
// Named schemas (enums, fixed, records) need wrapping.
144342
if namedSchema, ok := actualType.(avro.NamedSchema); ok {
145343
if actualType.Type() == avro.Enum {
146344
// Enums are strings, wrap directly
@@ -151,12 +349,6 @@ func convertUnionField(fieldValue any, unionSchema *avro.UnionSchema) (any, erro
151349
if err == nil {
152350
return map[string]any{namedSchema.FullName(): converted}, nil
153351
}
154-
} else {
155-
// Primitive types, convert and return directly
156-
converted, err := convertFloat64ToIntForIntegerFields(fieldValue, actualType)
157-
if err == nil {
158-
return converted, nil
159-
}
160352
}
161353
}
162354

@@ -230,7 +422,11 @@ func convertFloat64ToIntForIntegerFields(data any, schema avro.Schema) (any, err
230422

231423
return convertedMap, nil
232424
case avro.Union:
233-
fallthrough
425+
unionSchema, ok := schema.(*avro.UnionSchema)
426+
if !ok {
427+
return data, nil
428+
}
429+
return convertUnionField(data, unionSchema)
234430
case avro.Error, avro.Ref, avro.Enum, avro.Fixed, avro.String,
235431
avro.Float, avro.Double, avro.Boolean, avro.Null:
236432
fallthrough

0 commit comments

Comments
 (0)