Skip to content

Commit cb1a58d

Browse files
committed
[Avro] Add support for @AvroEncode annotation
1 parent bc89692 commit cb1a58d

18 files changed

+671
-13
lines changed

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/AvroAnnotationIntrospector.java

+16
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
import com.fasterxml.jackson.databind.jsontype.NamedType;
2121
import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder;
2222
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
23+
import com.fasterxml.jackson.databind.util.ClassUtil;
24+
import com.fasterxml.jackson.dataformat.avro.deser.CustomEncodingDeserializer;
2325
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaHelper;
26+
import com.fasterxml.jackson.dataformat.avro.ser.CustomEncodingSerializer;
2427

2528
/**
2629
* Adds support for the following annotations from the Apache Avro implementation:
@@ -62,6 +65,15 @@ public PropertyName findNameForDeserialization(Annotated a) {
6265
return _findName(a);
6366
}
6467

68+
@Override
69+
public Object findDeserializer(Annotated am) {
70+
AvroEncode ann = _findAnnotation(am, AvroEncode.class);
71+
if (ann != null) {
72+
return new CustomEncodingDeserializer<>((CustomEncoding)ClassUtil.createInstance(ann.using(), true));
73+
}
74+
return null;
75+
}
76+
6577
@Override
6678
public String findPropertyDefaultValue(Annotated m) {
6779
AvroDefault ann = _findAnnotation(m, AvroDefault.class);
@@ -113,6 +125,10 @@ public Object findSerializer(Annotated a) {
113125
if (a.hasAnnotation(Stringable.class)) {
114126
return ToStringSerializer.class;
115127
}
128+
AvroEncode ann = _findAnnotation(a, AvroEncode.class);
129+
if (ann != null) {
130+
return new CustomEncodingSerializer<>((CustomEncoding)ClassUtil.createInstance(ann.using(), true));
131+
}
116132
return null;
117133
}
118134

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/AvroGenerator.java

+11-9
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,11 @@
88

99
import org.apache.avro.io.BinaryEncoder;
1010

11-
import com.fasterxml.jackson.core.Base64Variant;
12-
import com.fasterxml.jackson.core.FormatFeature;
13-
import com.fasterxml.jackson.core.FormatSchema;
14-
import com.fasterxml.jackson.core.JsonGenerationException;
15-
import com.fasterxml.jackson.core.JsonGenerator;
16-
import com.fasterxml.jackson.core.ObjectCodec;
17-
import com.fasterxml.jackson.core.PrettyPrinter;
18-
import com.fasterxml.jackson.core.SerializableString;
19-
import com.fasterxml.jackson.core.Version;
11+
import com.fasterxml.jackson.core.*;
2012
import com.fasterxml.jackson.core.base.GeneratorBase;
2113
import com.fasterxml.jackson.core.io.IOContext;
2214
import com.fasterxml.jackson.dataformat.avro.ser.AvroWriteContext;
15+
import com.fasterxml.jackson.dataformat.avro.ser.EncodedDatum;
2316

2417
public class AvroGenerator extends GeneratorBase
2518
{
@@ -458,6 +451,15 @@ public final void writeUTF8String(byte[] text, int offset, int len) throws IOExc
458451
/**********************************************************
459452
*/
460453

454+
@Override
455+
public void writeEmbeddedObject(Object object) throws IOException {
456+
if (object instanceof EncodedDatum) {
457+
_avroContext.writeValue(object);
458+
return;
459+
}
460+
super.writeEmbeddedObject(object);
461+
}
462+
461463
@Override
462464
public void writeRaw(String text) throws IOException {
463465
_reportUnsupportedOperation();

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/AvroParser.java

+16
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
import com.fasterxml.jackson.core.io.IOContext;
1010
import com.fasterxml.jackson.core.json.JsonReadContext;
1111
import com.fasterxml.jackson.core.util.ByteArrayBuilder;
12+
import com.fasterxml.jackson.dataformat.avro.deser.ArrayReader;
1213
import com.fasterxml.jackson.dataformat.avro.deser.AvroReadContext;
14+
import com.fasterxml.jackson.dataformat.avro.deser.MapReader;
1315
import com.fasterxml.jackson.dataformat.avro.deser.MissingReader;
1416

1517
/**
@@ -257,6 +259,20 @@ public JsonLocation getCurrentLocation()
257259
// !!! TODO
258260
return null;
259261
}
262+
263+
/**
264+
* Returns the remaining number of elements in the current block of a map or array
265+
*/
266+
public long getRemainingElements() {
267+
268+
if ( _avroContext instanceof ArrayReader) {
269+
return ((ArrayReader) _avroContext).getRemainingElements();
270+
}
271+
if (_avroContext instanceof MapReader) {
272+
return ((MapReader) _avroContext).getRemainingElements();
273+
}
274+
return -1;
275+
}
260276

261277
/*
262278
/**********************************************************
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.fasterxml.jackson.dataformat.avro;
2+
3+
import java.io.IOException;
4+
import java.lang.reflect.InvocationTargetException;
5+
import java.lang.reflect.Method;
6+
7+
import org.apache.avro.Schema;
8+
import org.apache.avro.io.Decoder;
9+
import org.apache.avro.io.Encoder;
10+
import org.apache.avro.reflect.CustomEncoding;
11+
12+
/**
13+
* Wrapper that makes the methods on a {@link CustomEncoding} accessible since they are otherwise package-private.
14+
*/
15+
public class CustomEncodingWrapper<T> {
16+
private static final Method GET_SCHEMA;
17+
private static final Method READ;
18+
private static final Method WRITE;
19+
20+
static {
21+
try {
22+
GET_SCHEMA = CustomEncoding.class.getDeclaredMethod("getSchema");
23+
READ = CustomEncoding.class.getDeclaredMethod("read", Object.class, Decoder.class);
24+
WRITE = CustomEncoding.class.getDeclaredMethod("write", Object.class, Encoder.class);
25+
GET_SCHEMA.setAccessible(true);
26+
READ.setAccessible(true);
27+
WRITE.setAccessible(true);
28+
} catch (NoSuchMethodException e) {
29+
throw new RuntimeException("Failed to initialize CustomEncoderWrapper, Avro version mismatch?", e);
30+
}
31+
}
32+
33+
private final CustomEncoding<T> encoding;
34+
35+
public CustomEncodingWrapper(CustomEncoding<T> encoding) {
36+
this.encoding = encoding;
37+
}
38+
39+
public void write(Object datum, Encoder out) throws IOException {
40+
try {
41+
WRITE.invoke(encoding, datum, out);
42+
} catch (IllegalAccessException | InvocationTargetException e) {
43+
throw new RuntimeException("Failed to encode object", e);
44+
}
45+
}
46+
47+
public Schema getSchema() {
48+
try {
49+
return (Schema) GET_SCHEMA.invoke(encoding);
50+
} catch (IllegalAccessException | InvocationTargetException e) {
51+
throw new RuntimeException("Failed to access schema", e);
52+
}
53+
}
54+
55+
public T read(Object reuse, Decoder in) throws IOException {
56+
try {
57+
return (T) READ.invoke(encoding, reuse, in);
58+
} catch (IllegalAccessException | InvocationTargetException e) {
59+
throw new RuntimeException("Failed to decode object", e);
60+
}
61+
}
62+
}

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/deser/ArrayReader.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import com.fasterxml.jackson.core.JsonToken;
66

7-
abstract class ArrayReader extends AvroStructureReader
7+
public abstract class ArrayReader extends AvroStructureReader
88
{
99
protected final static int STATE_START = 0;
1010
protected final static int STATE_ELEMENTS = 1;
@@ -55,6 +55,10 @@ protected void appendDesc(StringBuilder sb) {
5555
sb.append(']');
5656
}
5757

58+
public long getRemainingElements() {
59+
return _count - _index;
60+
}
61+
5862
@Override
5963
public String getTypeId() {
6064
return _currToken != JsonToken.START_ARRAY && _currToken != JsonToken.END_ARRAY ? _elementTypeId : super.getTypeId();

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/deser/AvroParserImpl.java

+43-2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,16 @@ public class AvroParserImpl extends AvroParser
5555
*/
5656
protected BinaryDecoder _decoder;
5757

58+
/**
59+
* Index of the union branch that was followed to reach the current token. This is cleared when the next token is read.
60+
*/
61+
protected int _branchIndex;
62+
63+
/**
64+
* Index of the enum that was read as the current token. This is cleared when the next token is read.
65+
*/
66+
protected int _enumIndex;
67+
5868
/*
5969
/**********************************************************
6070
/* Life-cycle
@@ -101,6 +111,23 @@ protected void _releaseBuffers() throws IOException {
101111
}
102112
}
103113

114+
/**
115+
* Skip to the end of the current structure (array/map/object); This is different from {@link #skipMap()} and {@link #skipArray()}
116+
* because it operates at the parser level instead of at the decoder level and advances the parsing context in addition to consuming
117+
* the data from the input.
118+
*
119+
* @throws IOException If there was an issue advancing through the underlying data stream
120+
*/
121+
protected void skipValue() throws IOException {
122+
if (_avroContext instanceof ArrayReader) {
123+
((ArrayReader) _avroContext).skipValue(this);
124+
} else if (_avroContext instanceof MapReader) {
125+
((MapReader) _avroContext).skipValue(this);
126+
} else if (_avroContext instanceof RecordReader) {
127+
((RecordReader) _avroContext).skipValue(this);
128+
}
129+
}
130+
104131
@Override
105132
public JsonParser overrideFormatFeatures(int values, int mask) {
106133
int oldF = _formatFeatures;
@@ -141,6 +168,8 @@ protected void _closeInput() throws IOException {
141168
@Override
142169
public JsonToken nextToken() throws IOException
143170
{
171+
_branchIndex = -1;
172+
_enumIndex = -1;
144173
_binaryValue = null;
145174
if (_closed) {
146175
return null;
@@ -331,11 +360,11 @@ public long skipMap() throws IOException {
331360
// // // Misc other decoding
332361

333362
public int decodeIndex() throws IOException {
334-
return _decoder.readIndex();
363+
return (_branchIndex = _decoder.readIndex());
335364
}
336365

337366
public int decodeEnum() throws IOException {
338-
return _decoder.readEnum();
367+
return (_enumIndex = _decoder.readEnum());
339368
}
340369

341370
public boolean checkInputEnd() throws IOException {
@@ -347,6 +376,18 @@ public boolean checkInputEnd() throws IOException {
347376
/* Methods for AvroReadContext impls, other
348377
/**********************************************************
349378
*/
379+
380+
protected int branchIndex() {
381+
return _branchIndex;
382+
}
383+
384+
protected int enumIndex() {
385+
return _enumIndex;
386+
}
387+
388+
protected boolean isRecord() {
389+
return _avroContext instanceof RecordReader;
390+
}
350391

351392
protected void setAvroContext(AvroReadContext ctxt) {
352393
if (ctxt == null) { // sanity check

0 commit comments

Comments
 (0)