Skip to content

[Avro] Add support for @AvroEncode annotation #69

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.fasterxml.jackson.databind.util.ClassUtil;
import com.fasterxml.jackson.dataformat.avro.deser.CustomEncodingDeserializer;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaHelper;
import com.fasterxml.jackson.dataformat.avro.ser.CustomEncodingSerializer;

/**
* Adds support for the following annotations from the Apache Avro implementation:
Expand Down Expand Up @@ -62,6 +65,15 @@ public PropertyName findNameForDeserialization(Annotated a) {
return _findName(a);
}

@Override
public Object findDeserializer(Annotated am) {
AvroEncode ann = _findAnnotation(am, AvroEncode.class);
if (ann != null) {
return new CustomEncodingDeserializer<>((CustomEncoding)ClassUtil.createInstance(ann.using(), true));
}
return null;
}

@Override
public String findPropertyDefaultValue(Annotated m) {
AvroDefault ann = _findAnnotation(m, AvroDefault.class);
Expand Down Expand Up @@ -113,6 +125,10 @@ public Object findSerializer(Annotated a) {
if (a.hasAnnotation(Stringable.class)) {
return ToStringSerializer.class;
}
AvroEncode ann = _findAnnotation(a, AvroEncode.class);
if (ann != null) {
return new CustomEncodingSerializer<>((CustomEncoding)ClassUtil.createInstance(ann.using(), true));
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,11 @@

import org.apache.avro.io.BinaryEncoder;

import com.fasterxml.jackson.core.Base64Variant;
import com.fasterxml.jackson.core.FormatFeature;
import com.fasterxml.jackson.core.FormatSchema;
import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.core.PrettyPrinter;
import com.fasterxml.jackson.core.SerializableString;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.core.*;
import com.fasterxml.jackson.core.base.GeneratorBase;
import com.fasterxml.jackson.core.io.IOContext;
import com.fasterxml.jackson.dataformat.avro.ser.AvroWriteContext;
import com.fasterxml.jackson.dataformat.avro.ser.EncodedDatum;

public class AvroGenerator extends GeneratorBase
{
Expand Down Expand Up @@ -458,6 +451,15 @@ public final void writeUTF8String(byte[] text, int offset, int len) throws IOExc
/**********************************************************
*/

@Override
public void writeEmbeddedObject(Object object) throws IOException {
if (object instanceof EncodedDatum) {
_avroContext.writeValue(object);
return;
}
super.writeEmbeddedObject(object);
}

@Override
public void writeRaw(String text) throws IOException {
_reportUnsupportedOperation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import com.fasterxml.jackson.core.io.IOContext;
import com.fasterxml.jackson.core.json.JsonReadContext;
import com.fasterxml.jackson.core.util.ByteArrayBuilder;
import com.fasterxml.jackson.dataformat.avro.deser.ArrayReader;
import com.fasterxml.jackson.dataformat.avro.deser.AvroReadContext;
import com.fasterxml.jackson.dataformat.avro.deser.MapReader;
import com.fasterxml.jackson.dataformat.avro.deser.MissingReader;

/**
Expand Down Expand Up @@ -257,6 +259,20 @@ public JsonLocation getCurrentLocation()
// !!! TODO
return null;
}

/**
* Returns the remaining number of elements in the current block of a map or array
*/
public long getRemainingElements() {

if ( _avroContext instanceof ArrayReader) {
return ((ArrayReader) _avroContext).getRemainingElements();
}
if (_avroContext instanceof MapReader) {
return ((MapReader) _avroContext).getRemainingElements();
}
return -1;
}

/*
/**********************************************************
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.fasterxml.jackson.dataformat.avro;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.reflect.CustomEncoding;

/**
* Wrapper that makes the methods on a {@link CustomEncoding} accessible since they are otherwise package-private.
*/
public class CustomEncodingWrapper<T> {
private static final Method GET_SCHEMA;
private static final Method READ;
private static final Method WRITE;

static {
try {
GET_SCHEMA = CustomEncoding.class.getDeclaredMethod("getSchema");
READ = CustomEncoding.class.getDeclaredMethod("read", Object.class, Decoder.class);
WRITE = CustomEncoding.class.getDeclaredMethod("write", Object.class, Encoder.class);
GET_SCHEMA.setAccessible(true);
READ.setAccessible(true);
WRITE.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("Failed to initialize CustomEncoderWrapper, Avro version mismatch?", e);
}
}

private final CustomEncoding<T> encoding;

public CustomEncodingWrapper(CustomEncoding<T> encoding) {
this.encoding = encoding;
}

public void write(Object datum, Encoder out) throws IOException {
try {
WRITE.invoke(encoding, datum, out);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException("Failed to encode object", e);
}
}

public Schema getSchema() {
try {
return (Schema) GET_SCHEMA.invoke(encoding);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException("Failed to access schema", e);
}
}

public T read(Object reuse, Decoder in) throws IOException {
try {
return (T) READ.invoke(encoding, reuse, in);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException("Failed to decode object", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import com.fasterxml.jackson.core.JsonToken;

abstract class ArrayReader extends AvroStructureReader
public abstract class ArrayReader extends AvroStructureReader
{
protected final static int STATE_START = 0;
protected final static int STATE_ELEMENTS = 1;
Expand Down Expand Up @@ -55,6 +55,10 @@ protected void appendDesc(StringBuilder sb) {
sb.append(']');
}

public long getRemainingElements() {
return _count - _index;
}

@Override
public String getTypeId() {
return _currToken != JsonToken.START_ARRAY && _currToken != JsonToken.END_ARRAY ? _elementTypeId : super.getTypeId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ public class AvroParserImpl extends AvroParser
*/
protected BinaryDecoder _decoder;

/**
* Index of the union branch that was followed to reach the current token. This is cleared when the next token is read.
*/
protected int _branchIndex;

/**
* Index of the enum that was read as the current token. This is cleared when the next token is read.
*/
protected int _enumIndex;

/*
/**********************************************************
/* Life-cycle
Expand Down Expand Up @@ -101,6 +111,23 @@ protected void _releaseBuffers() throws IOException {
}
}

/**
* Skip to the end of the current structure (array/map/object); This is different from {@link #skipMap()} and {@link #skipArray()}
* because it operates at the parser level instead of at the decoder level and advances the parsing context in addition to consuming
* the data from the input.
*
* @throws IOException If there was an issue advancing through the underlying data stream
*/
protected void skipValue() throws IOException {
if (_avroContext instanceof ArrayReader) {
((ArrayReader) _avroContext).skipValue(this);
} else if (_avroContext instanceof MapReader) {
((MapReader) _avroContext).skipValue(this);
} else if (_avroContext instanceof RecordReader) {
((RecordReader) _avroContext).skipValue(this);
}
}

@Override
public JsonParser overrideFormatFeatures(int values, int mask) {
int oldF = _formatFeatures;
Expand Down Expand Up @@ -141,6 +168,8 @@ protected void _closeInput() throws IOException {
@Override
public JsonToken nextToken() throws IOException
{
_branchIndex = -1;
_enumIndex = -1;
_binaryValue = null;
if (_closed) {
return null;
Expand Down Expand Up @@ -331,11 +360,11 @@ public long skipMap() throws IOException {
// // // Misc other decoding

public int decodeIndex() throws IOException {
return _decoder.readIndex();
return (_branchIndex = _decoder.readIndex());
}

public int decodeEnum() throws IOException {
return _decoder.readEnum();
return (_enumIndex = _decoder.readEnum());
}

public boolean checkInputEnd() throws IOException {
Expand All @@ -347,6 +376,18 @@ public boolean checkInputEnd() throws IOException {
/* Methods for AvroReadContext impls, other
/**********************************************************
*/

protected int branchIndex() {
return _branchIndex;
}

protected int enumIndex() {
return _enumIndex;
}

protected boolean isRecord() {
return _avroContext instanceof RecordReader;
}

protected void setAvroContext(AvroReadContext ctxt) {
if (ctxt == null) { // sanity check
Expand Down
Loading