Skip to content

[AVRO] Support of Avro logical type Decimal. #542

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.fasterxml.jackson.dataformat.avro.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* When generate logical types is enabled, annotation instructs the
* {@link com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator AvroSchemaGenerator}
* to declare the annotated property's logical type as "decimal" ({@link org.apache.avro.LogicalTypes.Decimal}).
* By default, the Avro type is "bytes" ({@link org.apache.avro.Schema.Type#BYTES}), unless the field is also
* annotated with {@link com.fasterxml.jackson.dataformat.avro.AvroFixedSize}, in which case the Avro type
* will be "fixed" ({@link org.apache.avro.Schema.Type#FIXED}).
* <p>
* This annotation is only used during Avro schema generation and does not affect data serialization
* or deserialization.
*
* @since 2.19
*/
@Target({ElementType.ANNOTATION_TYPE, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface AvroDecimal {

/**
* Maximum precision of decimals stored in this type.
*/
int precision();

/**
* Scale must be zero or a positive integer less than or equal to the precision.
*/
int scale();

}
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,38 @@ public long getRemainingElements()
public abstract int decodeIndex() throws IOException;
public abstract int decodeEnum() throws IOException;

/*
/**********************************************************
/* Methods for AvroReadContext implementations: decimals
/**********************************************************
*/

// @since 2.19
public JsonToken decodeBytesDecimal(int scale) throws IOException {
decodeBytes();
_numberBigDecimal = new BigDecimal(new BigInteger(_binaryValue), scale);
_numTypesValid = NR_BIGDECIMAL;
return JsonToken.VALUE_NUMBER_FLOAT;
}

// @since 2.19
public void skipBytesDecimal() throws IOException {
skipBytes();
}

// @since 2.19
public JsonToken decodeFixedDecimal(int scale, int size) throws IOException {
decodeFixed(size);
_numberBigDecimal = new BigDecimal(new BigInteger(_binaryValue), scale);
_numTypesValid = NR_BIGDECIMAL;
return JsonToken.VALUE_NUMBER_FLOAT;
}

// @since 2.19
public void skipFixedDecimal(int size) throws IOException {
skipFixed(size);
}

/*
/**********************************************************
/* Methods for AvroReadContext impls, other
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.*;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

import com.fasterxml.jackson.dataformat.avro.deser.ScalarDecoder.*;
Expand Down Expand Up @@ -56,12 +57,18 @@ public ScalarDecoder createScalarValueDecoder(Schema type)
case BOOLEAN:
return READER_BOOLEAN;
case BYTES:
if (type.getLogicalType() instanceof LogicalTypes.Decimal) {
return new BytesDecimalReader(((LogicalTypes.Decimal) type.getLogicalType()).getScale());
}
return READER_BYTES;
case DOUBLE:
return READER_DOUBLE;
case ENUM:
return new EnumDecoder(AvroSchemaHelper.getFullName(type), type.getEnumSymbols());
case FIXED:
if (type.getLogicalType() instanceof LogicalTypes.Decimal) {
return new FixedDecimalReader(((LogicalTypes.Decimal) type.getLogicalType()).getScale(), type.getFixedSize());
}
return new FixedDecoder(type.getFixedSize(), AvroSchemaHelper.getFullName(type));
case FLOAT:
return READER_FLOAT;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.fasterxml.jackson.dataformat.avro.deser;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;

import com.fasterxml.jackson.core.JsonToken;
Expand Down Expand Up @@ -546,4 +547,106 @@ public void skipValue(AvroParserImpl parser) throws IOException {
}
}
}

/**
* @since 2.19
*/
protected final static class FixedDecimalReader extends ScalarDecoder {
private final int _scale;
private final int _size;

public FixedDecimalReader(int scale, int size) {
_scale = scale;
_size = size;
}

@Override
public JsonToken decodeValue(AvroParserImpl parser) throws IOException {
return parser.decodeFixedDecimal(_scale, _size);
}

@Override
protected void skipValue(AvroParserImpl parser) throws IOException {
parser.skipFixedDecimal(_size);
}

@Override
public String getTypeId() {
return AvroSchemaHelper.getTypeId(BigDecimal.class);
}

@Override
public AvroFieldReader asFieldReader(String name, boolean skipper) {
return new FR(name, skipper, getTypeId(), _scale, _size);
}

private final static class FR extends AvroFieldReader {
private final int _scale;
private final int _size;
public FR(String name, boolean skipper, String typeId, int scale, int size) {
super(name, skipper, typeId);
_scale = scale;
_size = size;
}

@Override
public JsonToken readValue(AvroReadContext parent, AvroParserImpl parser) throws IOException {
return parser.decodeFixedDecimal(_scale, _size);
}

@Override
public void skipValue(AvroParserImpl parser) throws IOException {
parser.skipFixedDecimal(_size);
}
}
}

/**
* @since 2.19
*/
protected final static class BytesDecimalReader extends ScalarDecoder {
private final int _scale;

public BytesDecimalReader(int scale) {
_scale = scale;
}

@Override
public JsonToken decodeValue(AvroParserImpl parser) throws IOException {
return parser.decodeBytesDecimal(_scale);
}

@Override
protected void skipValue(AvroParserImpl parser) throws IOException {
parser.skipBytesDecimal();
}

@Override
public String getTypeId() {
return AvroSchemaHelper.getTypeId(BigDecimal.class);
}

@Override
public AvroFieldReader asFieldReader(String name, boolean skipper) {
return new FR(name, skipper, getTypeId(), _scale);
}

private final static class FR extends AvroFieldReader {
private final int _scale;
public FR(String name, boolean skipper, String typeId, int scale) {
super(name, skipper, typeId);
_scale = scale;
}

@Override
public JsonToken readValue(AvroReadContext parent, AvroParserImpl parser) throws IOException {
return parser.decodeBytesDecimal(_scale);
}

@Override
public void skipValue(AvroParserImpl parser) throws IOException {
parser.skipFloat();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.List;
import java.util.Map;

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.reflect.AvroMeta;
Expand All @@ -15,6 +16,7 @@
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.ser.BeanPropertyWriter;
import com.fasterxml.jackson.dataformat.avro.AvroFixedSize;
import com.fasterxml.jackson.dataformat.avro.annotation.AvroDecimal;
import com.fasterxml.jackson.dataformat.avro.ser.CustomEncodingSerializer;

public class RecordVisitor
Expand Down Expand Up @@ -141,7 +143,7 @@ public void optionalProperty(String name, JsonFormatVisitable handler,

protected Schema.Field schemaFieldForWriter(BeanProperty prop, boolean optional) throws JsonMappingException
{
Schema writerSchema;
Schema writerSchema = null;
// Check if schema for property is overridden
AvroSchema schemaOverride = prop.getAnnotation(AvroSchema.class);
if (schemaOverride != null) {
Expand All @@ -151,16 +153,25 @@ protected Schema.Field schemaFieldForWriter(BeanProperty prop, boolean optional)
AvroFixedSize fixedSize = prop.getAnnotation(AvroFixedSize.class);
if (fixedSize != null) {
writerSchema = Schema.createFixed(fixedSize.typeName(), null, fixedSize.typeNamespace(), fixedSize.size());
} else {
}
if (_visitorWrapper.isLogicalTypesEnabled()) {
AvroDecimal avroDecimal = prop.getAnnotation(AvroDecimal.class);
if (avroDecimal != null) {
if (writerSchema == null) {
writerSchema = Schema.create(Type.BYTES);
}
writerSchema = LogicalTypes.decimal(avroDecimal.precision(), avroDecimal.scale())
.addToSchema(writerSchema);
}
}
if (writerSchema == null) {
JsonSerializer<?> ser = null;

// 23-Nov-2012, tatu: Ideally shouldn't need to do this but...
if (prop instanceof BeanPropertyWriter) {
BeanPropertyWriter bpw = (BeanPropertyWriter) prop;
ser = bpw.getSerializer();
/*
* 2-Mar-2017, bryan: AvroEncode annotation expects to have the schema used directly
*/
// 2-Mar-2017, bryan: AvroEncode annotation expects to have the schema used directly
optional = optional && !(ser instanceof CustomEncodingSerializer); // Don't modify schema
}
final SerializerProvider prov = getProvider();
Expand Down Expand Up @@ -204,7 +215,7 @@ protected Schema.Field schemaFieldForWriter(BeanProperty prop, boolean optional)

/**
* A union schema with a default value must always have the schema branch corresponding to the default value first, or Avro will print a
* warning complaining that the default value is not compatible. If {@code schema} is a {@code Schema.Type.UNION} schema and
* warning complaining that the default value is not compatible. If {@code schema} is a {@link Type#UNION UNION} schema and
* {@code defaultValue} is non-{@code null}, this finds the appropriate branch in the union and reorders the union so that it is first.
*
* @param schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;

import org.apache.avro.Conversions.DecimalConversion;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
Expand All @@ -27,6 +28,9 @@ public class NonBSGenericDatumWriter<D>
private final static Class<?> CLS_BIG_DECIMAL = BigDecimal.class;
private final static Class<?> CLS_BIG_INTEGER = BigInteger.class;

// @since 2.19
private final static DecimalConversion BIG_DECIMAL_CONVERSION = new DecimalConversion();

public NonBSGenericDatumWriter(Schema root) {
super(root);
}
Expand Down Expand Up @@ -97,6 +101,11 @@ protected void write(Schema schema, Object datum, Encoder out) throws IOExceptio
super.writeWithoutConversion(schema, ByteBuffer.wrap((byte[]) datum), out);
return;
}
if (datum.getClass() == CLS_BIG_DECIMAL) {
super.writeWithoutConversion(schema, BIG_DECIMAL_CONVERSION.toBytes(
(BigDecimal) datum, schema, schema.getLogicalType()), out);
return;
}
break;
case FIXED:
// One more mismatch to fix
Expand All @@ -111,6 +120,11 @@ protected void write(Schema schema, Object datum, Encoder out) throws IOExceptio
super.writeWithoutConversion(schema, new GenericData.Fixed(schema, (byte[]) datum), out);
return;
}
if (datum.getClass() == CLS_BIG_DECIMAL) {
super.writeWithoutConversion(schema, BIG_DECIMAL_CONVERSION.toFixed(
(BigDecimal) datum, schema, schema.getLogicalType()), out);
return;
}
break;

default:
Expand Down

This file was deleted.

Loading