Skip to content

Commit 575e4e2

Browse files
committed
[Avro] Add support for @union serialization
1 parent 52f58f7 commit 575e4e2

File tree

9 files changed

+302
-83
lines changed

9 files changed

+302
-83
lines changed

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/AvroAnnotationIntrospector.java

+16
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.fasterxml.jackson.dataformat.avro;
22

3+
import java.util.ArrayList;
34
import java.util.Collections;
45
import java.util.List;
56

@@ -15,6 +16,7 @@
1516
import com.fasterxml.jackson.databind.introspect.AnnotatedConstructor;
1617
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
1718
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
19+
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaHelper;
1820
/**
1921
* Adds support for the following annotations from the Apache Avro implementation:
2022
* <ul>
@@ -26,6 +28,7 @@
2628
* <li>{@link Nullable @Nullable} - Alias for <code>JsonProperty(required = false)</code></li>
2729
* <li>{@link Stringable @Stringable} - Alias for <code>JsonCreator</code> on the constructor and <code>JsonValue</code> on
2830
* the {@link #toString()} method. </li>
31+
* <li>{@link Union @Union} - Alias for <code>JsonSubTypes</code></li>
2932
* </ul>
3033
*
3134
* @since 2.9
@@ -107,4 +110,17 @@ public Object findSerializer(Annotated a) {
107110
}
108111
return null;
109112
}
113+
114+
@Override
115+
public List<NamedType> findSubtypes(Annotated a) {
116+
Union union = _findAnnotation(a, Union.class);
117+
if (union == null) {
118+
return null;
119+
}
120+
ArrayList<NamedType> names = new ArrayList<>(union.value().length);
121+
for (Class<?> subtype : union.value()) {
122+
names.add(new NamedType(subtype, AvroSchemaHelper.getTypeId(subtype)));
123+
}
124+
return names;
125+
}
110126
}

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/AvroGenerator.java

+26-7
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,26 @@
11
package com.fasterxml.jackson.dataformat.avro;
22

3-
import com.fasterxml.jackson.core.*;
4-
import com.fasterxml.jackson.core.base.GeneratorBase;
5-
import com.fasterxml.jackson.core.io.IOContext;
6-
import com.fasterxml.jackson.dataformat.avro.ser.AvroWriteContext;
7-
8-
import org.apache.avro.io.BinaryEncoder;
9-
103
import java.io.IOException;
114
import java.io.OutputStream;
125
import java.math.BigDecimal;
136
import java.math.BigInteger;
147
import java.nio.ByteBuffer;
158

9+
import org.apache.avro.io.BinaryEncoder;
10+
11+
import com.fasterxml.jackson.core.Base64Variant;
12+
import com.fasterxml.jackson.core.FormatFeature;
13+
import com.fasterxml.jackson.core.FormatSchema;
14+
import com.fasterxml.jackson.core.JsonGenerationException;
15+
import com.fasterxml.jackson.core.JsonGenerator;
16+
import com.fasterxml.jackson.core.ObjectCodec;
17+
import com.fasterxml.jackson.core.PrettyPrinter;
18+
import com.fasterxml.jackson.core.SerializableString;
19+
import com.fasterxml.jackson.core.Version;
20+
import com.fasterxml.jackson.core.base.GeneratorBase;
21+
import com.fasterxml.jackson.core.io.IOContext;
22+
import com.fasterxml.jackson.dataformat.avro.ser.AvroWriteContext;
23+
1624
public class AvroGenerator extends GeneratorBase
1725
{
1826
/**
@@ -381,6 +389,17 @@ public final void writeStartObject() throws IOException {
381389
_complete = false;
382390
}
383391

392+
@Override
393+
public void writeStartObject(Object forValue) throws IOException {
394+
_avroContext = _avroContext.createChildObjectContext(forValue);
395+
_complete = false;
396+
if(this._writeContext != null && forValue != null) {
397+
this._writeContext.setCurrentValue(forValue);
398+
}
399+
400+
this.setCurrentValue(forValue);
401+
}
402+
384403
@Override
385404
public final void writeEndObject() throws IOException
386405
{

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/schema/AvroSchemaHelper.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,10 @@ protected static <T> T throwUnsupported() {
192192
throw new UnsupportedOperationException("Format variation not supported");
193193
}
194194

195+
public static String getTypeId(JavaType type) {
196+
return getTypeId(type.getRawClass());
197+
}
198+
195199
/**
196200
* Initializes a record schema with metadata from the given class; this schema is returned in a non-finalized state, and still
197201
* needs to have fields added to it.
@@ -227,11 +231,11 @@ public static Schema createEnumSchema(BeanDescription bean, List<String> values)
227231
/**
228232
* Returns the Avro type ID for a given type
229233
*/
230-
protected static String getTypeId(JavaType type) {
234+
public static String getTypeId(Class<?> type) {
231235
// Primitives use the name of the wrapper class as their type ID
232236
if (type.isPrimitive()) {
233-
return ClassUtil.wrapperType(type.getRawClass()).getName();
237+
return ClassUtil.wrapperType(type).getName();
234238
}
235-
return type.getRawClass().getName();
239+
return type.getName();
236240
}
237241
}

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/schema/RecordVisitor.java

+16
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.fasterxml.jackson.databind.*;
1717
import com.fasterxml.jackson.databind.jsonFormatVisitors.JsonFormatVisitable;
1818
import com.fasterxml.jackson.databind.jsonFormatVisitors.JsonObjectFormatVisitor;
19+
import com.fasterxml.jackson.databind.jsontype.NamedType;
1920
import com.fasterxml.jackson.databind.ser.BeanPropertyWriter;
2021
import com.fasterxml.jackson.dataformat.avro.AvroFixedSize;
2122

@@ -44,10 +45,25 @@ public RecordVisitor(SerializerProvider p, JavaType type, DefinedSchemas schemas
4445
_schemas = schemas;
4546
// Check if the schema for this record is overridden
4647
BeanDescription bean = getProvider().getConfig().introspectDirectClassAnnotations(_type);
48+
List<NamedType> subTypes = getProvider().getAnnotationIntrospector().findSubtypes(bean.getClassInfo());
4749
AvroSchema ann = bean.getClassInfo().getAnnotation(AvroSchema.class);
4850
if (ann != null) {
4951
_avroSchema = AvroSchemaHelper.parseJsonSchema(ann.value());
5052
_overridden = true;
53+
} else if (subTypes != null && !subTypes.isEmpty()) {
54+
List<Schema> unionSchemas = new ArrayList<>();
55+
try {
56+
for (NamedType subType : subTypes) {
57+
JsonSerializer ser = getProvider().findValueSerializer(subType.getType());
58+
VisitorFormatWrapperImpl visitor = new VisitorFormatWrapperImpl(_schemas, getProvider());
59+
ser.acceptJsonFormatVisitor(visitor, getProvider().getTypeFactory().constructType(subType.getType()));
60+
unionSchemas.add(visitor.getAvroSchema());
61+
}
62+
_avroSchema = Schema.createUnion(unionSchemas);
63+
_overridden = true;
64+
} catch (JsonMappingException jme) {
65+
throw new RuntimeException("Failed to build schema", jme);
66+
}
5167
} else {
5268
_avroSchema = AvroSchemaHelper.initializeRecordSchema(bean);
5369
_overridden = false;

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/ser/ArrayWriteContext.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,14 @@ public final AvroWriteContext createChildObjectContext() throws JsonMappingExcep
3434
_array.add(child.rawValue());
3535
return child;
3636
}
37-
37+
38+
@Override
39+
public AvroWriteContext createChildObjectContext(Object object) throws JsonMappingException {
40+
AvroWriteContext child = _createObjectContext(_schema.getElementType(), object);
41+
_array.add(child.rawValue());
42+
return child;
43+
}
44+
3845
@Override
3946
public void writeValue(Object value) {
4047
_array.add(value);

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/ser/AvroWriteContext.java

+91-18
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
package com.fasterxml.jackson.dataformat.avro.ser;
22

33
import java.io.IOException;
4+
import java.math.BigDecimal;
45

56
import org.apache.avro.Schema;
67
import org.apache.avro.Schema.Type;
7-
import org.apache.avro.generic.*;
8+
import org.apache.avro.UnresolvedUnionException;
9+
import org.apache.avro.generic.GenericArray;
10+
import org.apache.avro.generic.GenericData;
11+
import org.apache.avro.generic.GenericRecord;
812
import org.apache.avro.io.BinaryEncoder;
13+
import org.apache.avro.reflect.ReflectData;
914

1015
import com.fasterxml.jackson.core.JsonStreamContext;
1116
import com.fasterxml.jackson.databind.JsonMappingException;
1217
import com.fasterxml.jackson.dataformat.avro.AvroGenerator;
18+
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaHelper;
1319

1420
public abstract class AvroWriteContext
1521
extends JsonStreamContext
@@ -58,8 +64,9 @@ public void complete() throws IOException {
5864
throw new IllegalStateException("Can not be called on "+getClass().getName());
5965
}
6066

61-
/*
62-
/**********************************************************
67+
68+
/**
 * Creates a child object context for writing {@code object}.
 * <p>
 * This implementation ignores {@code object} and delegates to {@link #createChildObjectContext()}.
 */
public AvroWriteContext createChildObjectContext(Object object) throws JsonMappingException {
    return createChildObjectContext();
}
69+
6370
/* Accessors
6471
/**********************************************************
6572
*/
@@ -140,33 +147,36 @@ protected GenericRecord _createRecord(Schema schema) throws JsonMappingException
140147
throw new JsonMappingException(null, "Failed to create Record type from "+type, e);
141148
}
142149
}
143-
150+
144151
protected GenericArray<Object> _createArray(Schema schema)
145152
{
146153
if (schema.getType() == Schema.Type.UNION) {
147-
Schema match = null;
148-
for (Schema s : schema.getTypes()) {
149-
if (s.getType() == Schema.Type.ARRAY) {
150-
if (match != null) {
151-
throw new IllegalStateException("Multiple Array types, can not figure out which to use for: "
152-
+schema);
153-
}
154-
match = s;
155-
}
156-
}
157-
if (match == null) {
154+
int arraySchemaIndex = schema.getIndexNamed(Type.ARRAY.getName());
155+
if (arraySchemaIndex < 0) {
158156
throw new IllegalStateException("No Array type found in union type: "+schema);
159157
}
160-
schema = match;
158+
schema = schema.getTypes().get(arraySchemaIndex);
161159
}
162160
return new GenericData.Array<Object>(8, schema);
163161
}
164162

165-
protected AvroWriteContext _createObjectContext(Schema schema) throws JsonMappingException
163+
protected AvroWriteContext _createObjectContext(Schema schema) throws JsonMappingException {
164+
if (schema.getType() == Type.UNION) {
165+
schema = _recordOrMapFromUnion(schema);
166+
}
167+
return _createObjectContext(schema, null); // Object doesn't matter as long as schema isn't a union
168+
}
169+
170+
protected AvroWriteContext _createObjectContext(Schema schema, Object object) throws JsonMappingException
166171
{
167172
Type type = schema.getType();
168173
if (type == Schema.Type.UNION) {
169-
schema = _recordOrMapFromUnion(schema);
174+
try {
175+
schema = resolveUnionSchema(schema, object);
176+
} catch (UnresolvedUnionException e) {
177+
// couldn't find an exact match
178+
schema = _recordOrMapFromUnion(schema);
179+
}
170180
type = schema.getType();
171181
}
172182
if (type == Schema.Type.MAP) {
@@ -194,6 +204,69 @@ protected Schema _recordOrMapFromUnion(Schema unionSchema)
194204
return match;
195205
}
196206

207+
/**
208+
* Resolves the sub-schema from a union that should correspond to the {@code datum}.
209+
*
210+
* @param unionSchema Union of schemas from which to choose
211+
* @param datum Object that needs to map to one of the schemas in {@code unionSchema}
212+
* @return Index into {@link Schema#getTypes() unionSchema.getTypes()} that matches {@code datum}
213+
* @see #resolveUnionSchema(Schema, Object)
214+
* @throws org.apache.avro.UnresolvedUnionException if {@code unionSchema} does not have a schema that can encode {@code datum}
215+
*/
216+
public static int resolveUnionIndex(Schema unionSchema, Object datum) {
217+
if (datum != null) {
218+
int subOptimal = -1;
219+
for(int i = 0, size = unionSchema.getTypes().size(); i < size; i++) {
220+
Schema schema = unionSchema.getTypes().get(i);
221+
if (datum instanceof BigDecimal) {
222+
// BigDecimals can be shoved into a double, but optimally would be a String or byte[] with logical type information
223+
if (schema.getType() == Type.DOUBLE) {
224+
subOptimal = i;
225+
continue;
226+
}
227+
}
228+
if (datum instanceof String) {
229+
// Jackson serializes enums as strings, so try and find a matching schema
230+
if (schema.getType() == Type.ENUM && schema.hasEnumSymbol((String) datum)) {
231+
return i;
232+
}
233+
// Jackson serializes char/Character as a string, so try and find a matching schema
234+
if (schema.getType() == Type.INT
235+
&& ((String) datum).length() == 1
236+
&& AvroSchemaHelper.getTypeId(Character.class).equals(schema.getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_CLASS))
237+
) {
238+
return i;
239+
}
240+
// Jackson serializes char[]/Character[] as a string, so try and find a matching schema
241+
if (schema.getType() == Type.ARRAY
242+
&& schema.getElementType().getType() == Type.INT
243+
&& AvroSchemaHelper.getTypeId(Character.class).equals(schema.getElementType().getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_CLASS))
244+
) {
245+
return i;
246+
}
247+
}
248+
}
249+
// Did we find a sub-optimal match?
250+
if (subOptimal != -1) {
251+
return subOptimal;
252+
}
253+
}
254+
return ReflectData.get().resolveUnion(unionSchema, datum);
255+
}
256+
257+
/**
258+
* Resolves the sub-schema from a union that should correspond to the {@code datum}.
259+
*
260+
* @param unionSchema Union of schemas from which to choose
261+
* @param datum Object that needs to map to one of the schemas in {@code unionSchema}
262+
* @return Schema that matches {@code datum}
263+
* @see #resolveUnionIndex(Schema, Object)
264+
* @throws org.apache.avro.UnresolvedUnionException if {@code unionSchema} does not have a schema that can encode {@code datum}
265+
*/
266+
public static Schema resolveUnionSchema(Schema unionSchema, Object datum) {
267+
return unionSchema.getTypes().get(resolveUnionIndex(unionSchema, datum));
268+
}
269+
197270
/*
198271
/**********************************************************
199272
/* Implementations

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/ser/NonBSGenericDatumWriter.java

+1-49
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,13 @@
44
import java.math.BigDecimal;
55
import java.math.BigInteger;
66
import java.util.ArrayList;
7-
import java.util.List;
87

98
import org.apache.avro.Schema;
109
import org.apache.avro.Schema.Type;
1110
import org.apache.avro.generic.GenericData;
1211
import org.apache.avro.generic.GenericDatumWriter;
1312
import org.apache.avro.io.Encoder;
1413

15-
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaHelper;
16-
1714
/**
1815
* Need to sub-class to prevent the encoder from failing when writing an optional
1916
* Enum value (see [dataformat-avro#12])
@@ -31,52 +28,7 @@ public NonBSGenericDatumWriter(Schema root) {
3128

3229
@Override
3330
public int resolveUnion(Schema union, Object datum) {
34-
// Alas, we need a work-around first...
35-
if (datum == null) {
36-
return union.getIndexNamed(Type.NULL.getName());
37-
}
38-
List<Schema> schemas = union.getTypes();
39-
if (datum instanceof String) { // String or Enum or Character or char[]
40-
for (int i = 0, len = schemas.size(); i < len; i++) {
41-
Schema s = schemas.get(i);
42-
switch (s.getType()) {
43-
case STRING:
44-
case ENUM:
45-
return i;
46-
case INT:
47-
// Avro distinguishes between String and Character, whereas Jackson doesn't
48-
// Check if the schema is expecting a Character and handle appropriately
49-
if (Character.class.getName().equals(s.getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_CLASS))) {
50-
return i;
51-
}
52-
break;
53-
case ARRAY:
54-
// Avro distinguishes between String and char[], whereas Jackson doesn't
55-
// Check if the schema is expecting a char[] and handle appropriately
56-
if (s.getElementType().getType() == Type.INT && Character.class
57-
.getName().equals(s.getElementType().getProp(AvroSchemaHelper.AVRO_SCHEMA_PROP_CLASS))) {
58-
return i;
59-
}
60-
break;
61-
default:
62-
}
63-
}
64-
} else if (datum instanceof BigDecimal) {
65-
int subOptimal = -1;
66-
for (int i = 0, len = schemas.size(); i < len; i++) {
67-
if (schemas.get(i).getType() == Type.STRING) {
68-
return i;
69-
}
70-
if (schemas.get(i).getType() == Type.DOUBLE) {
71-
subOptimal = i;
72-
}
73-
}
74-
if (subOptimal > -1) {
75-
return subOptimal;
76-
}
77-
}
78-
// otherwise just default to base impl, stupid as it is...
79-
return super.resolveUnion(union, datum);
31+
return AvroWriteContext.resolveUnionIndex(union, datum);
8032
}
8133

8234
@Override

0 commit comments

Comments
 (0)