Skip to content

Commit 2ed9c3f

Browse files
authored
[AVRO] Support of Avro logical type Decimal. (#542)
1 parent 1b2adbe commit 2ed9c3f

File tree

11 files changed

+506
-40
lines changed

11 files changed

+506
-40
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.fasterxml.jackson.dataformat.avro.annotation;
2+
3+
import java.lang.annotation.ElementType;
4+
import java.lang.annotation.Retention;
5+
import java.lang.annotation.RetentionPolicy;
6+
import java.lang.annotation.Target;
7+
8+
/**
9+
* When generate logical types is enabled, annotation instructs the
10+
* {@link com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator AvroSchemaGenerator}
11+
* to declare the annotated property's logical type as "decimal" ({@link org.apache.avro.LogicalTypes.Decimal}).
12+
* By default, the Avro type is "bytes" ({@link org.apache.avro.Schema.Type#BYTES}), unless the field is also
13+
* annotated with {@link com.fasterxml.jackson.dataformat.avro.AvroFixedSize}, in which case the Avro type
14+
* will be "fixed" ({@link org.apache.avro.Schema.Type#FIXED}).
15+
* <p>
16+
* This annotation is only used during Avro schema generation and does not affect data serialization
17+
* or deserialization.
18+
*
19+
* @since 2.19
20+
*/
21+
@Target({ElementType.ANNOTATION_TYPE, ElementType.FIELD})
22+
@Retention(RetentionPolicy.RUNTIME)
23+
public @interface AvroDecimal {
24+
25+
/**
26+
* Maximum precision of decimals stored in this type.
27+
*/
28+
int precision();
29+
30+
/**
31+
* Scale must be zero or a positive integer less than or equal to the precision.
32+
*/
33+
int scale();
34+
35+
}

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/deser/AvroParserImpl.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,38 @@ public long getRemainingElements()
581581
public abstract int decodeIndex() throws IOException;
582582
public abstract int decodeEnum() throws IOException;
583583

584+
/*
585+
/**********************************************************
586+
/* Methods for AvroReadContext implementations: decimals
587+
/**********************************************************
588+
*/
589+
590+
// @since 2.19
591+
public JsonToken decodeBytesDecimal(int scale) throws IOException {
592+
decodeBytes();
593+
_numberBigDecimal = new BigDecimal(new BigInteger(_binaryValue), scale);
594+
_numTypesValid = NR_BIGDECIMAL;
595+
return JsonToken.VALUE_NUMBER_FLOAT;
596+
}
597+
598+
// @since 2.19
599+
public void skipBytesDecimal() throws IOException {
600+
skipBytes();
601+
}
602+
603+
// @since 2.19
604+
public JsonToken decodeFixedDecimal(int scale, int size) throws IOException {
605+
decodeFixed(size);
606+
_numberBigDecimal = new BigDecimal(new BigInteger(_binaryValue), scale);
607+
_numTypesValid = NR_BIGDECIMAL;
608+
return JsonToken.VALUE_NUMBER_FLOAT;
609+
}
610+
611+
// @since 2.19
612+
public void skipFixedDecimal(int size) throws IOException {
613+
skipFixed(size);
614+
}
615+
584616
/*
585617
/**********************************************************
586618
/* Methods for AvroReadContext impls, other

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/deser/AvroReaderFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.IOException;
44
import java.util.*;
55

6+
import org.apache.avro.LogicalTypes;
67
import org.apache.avro.Schema;
78

89
import com.fasterxml.jackson.dataformat.avro.deser.ScalarDecoder.*;
@@ -56,12 +57,18 @@ public ScalarDecoder createScalarValueDecoder(Schema type)
5657
case BOOLEAN:
5758
return READER_BOOLEAN;
5859
case BYTES:
60+
if (type.getLogicalType() instanceof LogicalTypes.Decimal) {
61+
return new BytesDecimalReader(((LogicalTypes.Decimal) type.getLogicalType()).getScale());
62+
}
5963
return READER_BYTES;
6064
case DOUBLE:
6165
return READER_DOUBLE;
6266
case ENUM:
6367
return new EnumDecoder(AvroSchemaHelper.getFullName(type), type.getEnumSymbols());
6468
case FIXED:
69+
if (type.getLogicalType() instanceof LogicalTypes.Decimal) {
70+
return new FixedDecimalReader(((LogicalTypes.Decimal) type.getLogicalType()).getScale(), type.getFixedSize());
71+
}
6572
return new FixedDecoder(type.getFixedSize(), AvroSchemaHelper.getFullName(type));
6673
case FLOAT:
6774
return READER_FLOAT;

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/deser/ScalarDecoder.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.fasterxml.jackson.dataformat.avro.deser;
22

33
import java.io.IOException;
4+
import java.math.BigDecimal;
45
import java.util.List;
56

67
import com.fasterxml.jackson.core.JsonToken;
@@ -546,4 +547,106 @@ public void skipValue(AvroParserImpl parser) throws IOException {
546547
}
547548
}
548549
}
550+
551+
/**
552+
* @since 2.19
553+
*/
554+
protected final static class FixedDecimalReader extends ScalarDecoder {
555+
private final int _scale;
556+
private final int _size;
557+
558+
public FixedDecimalReader(int scale, int size) {
559+
_scale = scale;
560+
_size = size;
561+
}
562+
563+
@Override
564+
public JsonToken decodeValue(AvroParserImpl parser) throws IOException {
565+
return parser.decodeFixedDecimal(_scale, _size);
566+
}
567+
568+
@Override
569+
protected void skipValue(AvroParserImpl parser) throws IOException {
570+
parser.skipFixedDecimal(_size);
571+
}
572+
573+
@Override
574+
public String getTypeId() {
575+
return AvroSchemaHelper.getTypeId(BigDecimal.class);
576+
}
577+
578+
@Override
579+
public AvroFieldReader asFieldReader(String name, boolean skipper) {
580+
return new FR(name, skipper, getTypeId(), _scale, _size);
581+
}
582+
583+
private final static class FR extends AvroFieldReader {
584+
private final int _scale;
585+
private final int _size;
586+
public FR(String name, boolean skipper, String typeId, int scale, int size) {
587+
super(name, skipper, typeId);
588+
_scale = scale;
589+
_size = size;
590+
}
591+
592+
@Override
593+
public JsonToken readValue(AvroReadContext parent, AvroParserImpl parser) throws IOException {
594+
return parser.decodeFixedDecimal(_scale, _size);
595+
}
596+
597+
@Override
598+
public void skipValue(AvroParserImpl parser) throws IOException {
599+
parser.skipFixedDecimal(_size);
600+
}
601+
}
602+
}
603+
604+
/**
605+
* @since 2.19
606+
*/
607+
protected final static class BytesDecimalReader extends ScalarDecoder {
608+
private final int _scale;
609+
610+
public BytesDecimalReader(int scale) {
611+
_scale = scale;
612+
}
613+
614+
@Override
615+
public JsonToken decodeValue(AvroParserImpl parser) throws IOException {
616+
return parser.decodeBytesDecimal(_scale);
617+
}
618+
619+
@Override
620+
protected void skipValue(AvroParserImpl parser) throws IOException {
621+
parser.skipBytesDecimal();
622+
}
623+
624+
@Override
625+
public String getTypeId() {
626+
return AvroSchemaHelper.getTypeId(BigDecimal.class);
627+
}
628+
629+
@Override
630+
public AvroFieldReader asFieldReader(String name, boolean skipper) {
631+
return new FR(name, skipper, getTypeId(), _scale);
632+
}
633+
634+
private final static class FR extends AvroFieldReader {
635+
private final int _scale;
636+
public FR(String name, boolean skipper, String typeId, int scale) {
637+
super(name, skipper, typeId);
638+
_scale = scale;
639+
}
640+
641+
@Override
642+
public JsonToken readValue(AvroReadContext parent, AvroParserImpl parser) throws IOException {
643+
return parser.decodeBytesDecimal(_scale);
644+
}
645+
646+
@Override
647+
public void skipValue(AvroParserImpl parser) throws IOException {
648+
parser.skipFloat();
649+
}
650+
}
651+
}
549652
}

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/schema/RecordVisitor.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.List;
55
import java.util.Map;
66

7+
import org.apache.avro.LogicalTypes;
78
import org.apache.avro.Schema;
89
import org.apache.avro.Schema.Type;
910
import org.apache.avro.reflect.AvroMeta;
@@ -15,6 +16,7 @@
1516
import com.fasterxml.jackson.databind.jsontype.NamedType;
1617
import com.fasterxml.jackson.databind.ser.BeanPropertyWriter;
1718
import com.fasterxml.jackson.dataformat.avro.AvroFixedSize;
19+
import com.fasterxml.jackson.dataformat.avro.annotation.AvroDecimal;
1820
import com.fasterxml.jackson.dataformat.avro.ser.CustomEncodingSerializer;
1921

2022
public class RecordVisitor
@@ -141,7 +143,7 @@ public void optionalProperty(String name, JsonFormatVisitable handler,
141143

142144
protected Schema.Field schemaFieldForWriter(BeanProperty prop, boolean optional) throws JsonMappingException
143145
{
144-
Schema writerSchema;
146+
Schema writerSchema = null;
145147
// Check if schema for property is overridden
146148
AvroSchema schemaOverride = prop.getAnnotation(AvroSchema.class);
147149
if (schemaOverride != null) {
@@ -151,16 +153,25 @@ protected Schema.Field schemaFieldForWriter(BeanProperty prop, boolean optional)
151153
AvroFixedSize fixedSize = prop.getAnnotation(AvroFixedSize.class);
152154
if (fixedSize != null) {
153155
writerSchema = Schema.createFixed(fixedSize.typeName(), null, fixedSize.typeNamespace(), fixedSize.size());
154-
} else {
156+
}
157+
if (_visitorWrapper.isLogicalTypesEnabled()) {
158+
AvroDecimal avroDecimal = prop.getAnnotation(AvroDecimal.class);
159+
if (avroDecimal != null) {
160+
if (writerSchema == null) {
161+
writerSchema = Schema.create(Type.BYTES);
162+
}
163+
writerSchema = LogicalTypes.decimal(avroDecimal.precision(), avroDecimal.scale())
164+
.addToSchema(writerSchema);
165+
}
166+
}
167+
if (writerSchema == null) {
155168
JsonSerializer<?> ser = null;
156169

157170
// 23-Nov-2012, tatu: Ideally shouldn't need to do this but...
158171
if (prop instanceof BeanPropertyWriter) {
159172
BeanPropertyWriter bpw = (BeanPropertyWriter) prop;
160173
ser = bpw.getSerializer();
161-
/*
162-
* 2-Mar-2017, bryan: AvroEncode annotation expects to have the schema used directly
163-
*/
174+
// 2-Mar-2017, bryan: AvroEncode annotation expects to have the schema used directly
164175
optional = optional && !(ser instanceof CustomEncodingSerializer); // Don't modify schema
165176
}
166177
final SerializerProvider prov = getProvider();
@@ -204,7 +215,7 @@ protected Schema.Field schemaFieldForWriter(BeanProperty prop, boolean optional)
204215

205216
/**
206217
* A union schema with a default value must always have the schema branch corresponding to the default value first, or Avro will print a
207-
* warning complaining that the default value is not compatible. If {@code schema} is a {@code Schema.Type.UNION} schema and
218+
* warning complaining that the default value is not compatible. If {@code schema} is a {@link Type#UNION UNION} schema and
208219
* {@code defaultValue} is non-{@code null}, this finds the appropriate branch in the union and reorders the union so that it is first.
209220
*
210221
* @param schema

avro/src/main/java/com/fasterxml/jackson/dataformat/avro/ser/NonBSGenericDatumWriter.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.nio.ByteBuffer;
77
import java.util.ArrayList;
88

9+
import org.apache.avro.Conversions.DecimalConversion;
910
import org.apache.avro.Schema;
1011
import org.apache.avro.Schema.Type;
1112
import org.apache.avro.generic.GenericData;
@@ -27,6 +28,9 @@ public class NonBSGenericDatumWriter<D>
2728
private final static Class<?> CLS_BIG_DECIMAL = BigDecimal.class;
2829
private final static Class<?> CLS_BIG_INTEGER = BigInteger.class;
2930

31+
// @since 2.19
32+
private final static DecimalConversion BIG_DECIMAL_CONVERSION = new DecimalConversion();
33+
3034
public NonBSGenericDatumWriter(Schema root) {
3135
super(root);
3236
}
@@ -97,6 +101,11 @@ protected void write(Schema schema, Object datum, Encoder out) throws IOExceptio
97101
super.writeWithoutConversion(schema, ByteBuffer.wrap((byte[]) datum), out);
98102
return;
99103
}
104+
if (datum.getClass() == CLS_BIG_DECIMAL) {
105+
super.writeWithoutConversion(schema, BIG_DECIMAL_CONVERSION.toBytes(
106+
(BigDecimal) datum, schema, schema.getLogicalType()), out);
107+
return;
108+
}
100109
break;
101110
case FIXED:
102111
// One more mismatch to fix
@@ -111,6 +120,11 @@ protected void write(Schema schema, Object datum, Encoder out) throws IOExceptio
111120
super.writeWithoutConversion(schema, new GenericData.Fixed(schema, (byte[]) datum), out);
112121
return;
113122
}
123+
if (datum.getClass() == CLS_BIG_DECIMAL) {
124+
super.writeWithoutConversion(schema, BIG_DECIMAL_CONVERSION.toFixed(
125+
(BigDecimal) datum, schema, schema.getLogicalType()), out);
126+
return;
127+
}
114128
break;
115129

116130
default:

avro/src/test/java/com/fasterxml/jackson/dataformat/avro/BigDecimalTest.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

0 commit comments

Comments
 (0)