getWriteSupport(Configuration conf) {
- return new ProtoWriteSupport<>(clazz);
+ // Use patched implementation compatible with protobuf 4.x
+ return new PatchedProtoWriteSupport<>(clazz);
}
}
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupport.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupport.java
new file mode 100644
index 0000000000000..dbd8ec1d756b9
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupport.java
@@ -0,0 +1,881 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.protobuf;
+
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageOrBuilder;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.UInt32Value;
+import com.google.protobuf.UInt64Value;
+import com.google.protobuf.util.Timestamps;
+import com.google.type.Date;
+import com.google.type.TimeOfDay;
+import com.twitter.elephantbird.util.Protobufs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.conf.HadoopParquetConfiguration;
+import org.apache.parquet.conf.ParquetConfiguration;
+import org.apache.parquet.hadoop.BadConfigurationException;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.InvalidRecordException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.proto.ProtoReadSupport;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.IncompatibleSchemaModificationException;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Array;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_ITEM_SEPARATOR;
+import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_KEY_VALUE_SEPARATOR;
+import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_PREFIX;
+
+/**
+ * Implementation of {@link WriteSupport} for writing Protocol Buffers.
+ *
+ * <p>NOTE: This is a vendored, patched version of ProtoWriteSupport to work with protobuf 4.x.
+ * The patch replaces the deprecated/removed enum-based syntax detection with a string-based
+ * approach compatible with protobuf 3 and 4. See parquet-java issue
+ * https://github.com/apache/parquet-java/issues/3175.
+ *
+ *
+ * <p>The original source can be found here:
+ * https://github.com/apache/parquet-java/blob/apache-parquet-1.15.2/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
+ *
+ *
+ * <p>Patched code is marked with BEGIN PATCH / END PATCH comments in the source.
+ */
+class PatchedProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PatchedProtoWriteSupport.class);
+ public static final String PB_CLASS_WRITE = "parquet.proto.writeClass";
+ // PARQUET-968 introduces changes to allow writing specs compliant schemas with
+ // parquet-protobuf.
+ // In the past, collections were not written using the LIST and MAP wrappers and thus were
+ // not compliant with the parquet specs. This flag, when set to true, allows writing specs
+ // compliant schemas, but it is set to false by default to keep backward compatibility.
+ public static final String PB_SPECS_COMPLIANT_WRITE = "parquet.proto.writeSpecsCompliant";
+
+ public static final String PB_UNWRAP_PROTO_WRAPPERS = "parquet.proto.unwrapProtoWrappers";
+
+ private boolean writeSpecsCompliant = false;
+ private boolean unwrapProtoWrappers = false;
+ private RecordConsumer recordConsumer;
+ private Class<? extends Message> protoMessage;
+ private Descriptor descriptor;
+ private MessageWriter messageWriter;
+ // Keep the protobuf enum value together with its number in the metadata, so that at read
+ // time a reader can recover at least the number even with an outdated schema that might not
+ // contain all enum values.
+ private Map<String, Map<String, Integer>> protoEnumBookKeeper = new HashMap<>();
+
+ public PatchedProtoWriteSupport() {}
+
+ public PatchedProtoWriteSupport(Class<? extends Message> protobufClass) {
+ this.protoMessage = protobufClass;
+ }
+
+ public PatchedProtoWriteSupport(Descriptor descriptor) {
+ this.descriptor = descriptor;
+ }
+
+ @Override
+ public String getName() {
+ return "protobuf";
+ }
+
+ public static void setSchema(Configuration configuration, Class<? extends Message> protoClass) {
+ configuration.setClass(PB_CLASS_WRITE, protoClass, Message.class);
+ }
+
+ /**
+ * Make parquet-protobuf use the LIST and MAP wrappers for collections. Set to false if you need
+ * backward compatibility with parquet before PARQUET-968 (1.9.0 and older).
+ *
+ * @param configuration The hadoop configuration
+ * @param writeSpecsCompliant If set to true, the spec compliant schema style (with LIST and
+ * MAP wrappers) will be used; if set to false, the old style without wrappers is used.
+ */
+ public static void setWriteSpecsCompliant(
+ Configuration configuration, boolean writeSpecsCompliant) {
+ configuration.setBoolean(PB_SPECS_COMPLIANT_WRITE, writeSpecsCompliant);
+ }
+
+ public static void setUnwrapProtoWrappers(
+ Configuration configuration, boolean unwrapProtoWrappers) {
+ configuration.setBoolean(PB_UNWRAP_PROTO_WRAPPERS, unwrapProtoWrappers);
+ }
+
+ /**
+ * Writes Protocol buffer to parquet file.
+ *
+ * @param record instance of Message.Builder or Message.
+ */
+ @Override
+ public void write(T record) {
+ recordConsumer.startMessage();
+ try {
+ messageWriter.writeTopLevelMessage(record);
+ } catch (RuntimeException e) {
+ Message m =
+ (record instanceof Message.Builder)
+ ? ((Message.Builder) record).build()
+ : (Message) record;
+ LOG.error("Cannot write message {}: {}", e.getMessage(), m);
+ throw e;
+ }
+ recordConsumer.endMessage();
+ }
+
+ @Override
+ public void prepareForWrite(RecordConsumer recordConsumer) {
+ this.recordConsumer = recordConsumer;
+ }
+
+ @Override
+ public WriteContext init(Configuration configuration) {
+ return init(new HadoopParquetConfiguration(configuration));
+ }
+
+ @Override
+ public WriteContext init(ParquetConfiguration configuration) {
+
+ Map<String, String> extraMetaData = new HashMap<>();
+
+ // if no protobuf descriptor was given in the constructor, load the descriptor from the
+ // configuration (set with setProtobufClass)
+ if (descriptor == null) {
+ if (protoMessage == null) {
+ Class<? extends Message> pbClass =
+ configuration.getClass(PB_CLASS_WRITE, null, Message.class);
+ if (pbClass != null) {
+ protoMessage = pbClass;
+ } else {
+ String msg = "Protocol buffer class or descriptor not specified.";
+ String hint =
+ " Please use method ProtoParquetOutputFormat.setProtobufClass(...) or other similar method.";
+ throw new BadConfigurationException(msg + hint);
+ }
+ }
+ descriptor = Protobufs.getMessageDescriptor(protoMessage);
+ extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName());
+ }
+
+ unwrapProtoWrappers =
+ configuration.getBoolean(PB_UNWRAP_PROTO_WRAPPERS, unwrapProtoWrappers);
+ writeSpecsCompliant =
+ configuration.getBoolean(PB_SPECS_COMPLIANT_WRITE, writeSpecsCompliant);
+ MessageType rootSchema = new PatchedProtoSchemaConverter(configuration).convert(descriptor);
+ validatedMapping(descriptor, rootSchema);
+
+ this.messageWriter = new MessageWriter(descriptor, rootSchema);
+
+ extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, descriptor.toProto().toString());
+ extraMetaData.put(PB_SPECS_COMPLIANT_WRITE, String.valueOf(writeSpecsCompliant));
+ extraMetaData.put(PB_UNWRAP_PROTO_WRAPPERS, String.valueOf(unwrapProtoWrappers));
+ return new WriteContext(rootSchema, extraMetaData);
+ }
+
+ @Override
+ public FinalizedWriteContext finalizeWrite() {
+ Map<String, String> protoMetadata = enumMetadata();
+ return new FinalizedWriteContext(protoMetadata);
+ }
+
+ private Map<String, String> enumMetadata() {
+ Map<String, String> enumMetadata = new HashMap<>();
+ for (Map.Entry<String, Map<String, Integer>> enumNameNumberMapping :
+ protoEnumBookKeeper.entrySet()) {
+ StringBuilder nameNumberPairs = new StringBuilder();
+ if (enumNameNumberMapping.getValue().isEmpty()) {
+ // No enum is ever written to any column of this file, put an empty string as the
+ // value in the metadata
+ LOG.info("No enum is written for {}", enumNameNumberMapping.getKey());
+ }
+ int idx = 0;
+ for (Map.Entry<String, Integer> nameNumberPair :
+ enumNameNumberMapping.getValue().entrySet()) {
+ nameNumberPairs
+ .append(nameNumberPair.getKey())
+ .append(METADATA_ENUM_KEY_VALUE_SEPARATOR)
+ .append(nameNumberPair.getValue());
+ idx++;
+ if (idx < enumNameNumberMapping.getValue().size()) {
+ nameNumberPairs.append(METADATA_ENUM_ITEM_SEPARATOR);
+ }
+ }
+ enumMetadata.put(
+ METADATA_ENUM_PREFIX + enumNameNumberMapping.getKey(),
+ nameNumberPairs.toString());
+ }
+ return enumMetadata;
+ }
+
+ class FieldWriter {
+ String fieldName;
+ int index = -1;
+
+ void setFieldName(String fieldName) {
+ this.fieldName = fieldName;
+ }
+
+ /** Sets index of field inside parquet message. */
+ void setIndex(int index) {
+ this.index = index;
+ }
+
+ /** Used for writing repeated fields. */
+ void writeRawValue(Object value) {}
+
+ /** Used for writing nonrepeated (optional, required) fields. */
+ void writeField(Object value) {
+ if (!(this instanceof PatchedProtoWriteSupport.MapWriter)) {
+ recordConsumer.startField(fieldName, index);
+ }
+ writeRawValue(value);
+ if (!(this instanceof PatchedProtoWriteSupport.MapWriter)) {
+ recordConsumer.endField(fieldName, index);
+ }
+ }
+ }
+
+ class MessageWriter extends FieldWriter {
+
+ final FieldWriter[] fieldWriters;
+
+ @SuppressWarnings("unchecked")
+ MessageWriter(Descriptor descriptor, GroupType schema) {
+ List<FieldDescriptor> fields = descriptor.getFields();
+ fieldWriters = (FieldWriter[]) Array.newInstance(FieldWriter.class, fields.size());
+
+ for (FieldDescriptor fieldDescriptor : fields) {
+ String name = fieldDescriptor.getName();
+ Type type = schema.getType(name);
+ FieldWriter writer = createWriter(fieldDescriptor, type);
+
+ if (writeSpecsCompliant
+ && fieldDescriptor.isRepeated()
+ && !fieldDescriptor.isMapField()) {
+ writer = new ArrayWriter(writer);
+ } else if (!writeSpecsCompliant && fieldDescriptor.isRepeated()) {
+ // the old schemas style used to write maps as repeated fields instead of
+ // wrapping them in a LIST
+ writer = new RepeatedWriter(writer);
+ }
+
+ writer.setFieldName(name);
+ writer.setIndex(schema.getFieldIndex(name));
+
+ fieldWriters[fieldDescriptor.getIndex()] = writer;
+ }
+ }
+
+ private FieldWriter createWriter(FieldDescriptor fieldDescriptor, Type type) {
+
+ switch (fieldDescriptor.getJavaType()) {
+ case STRING:
+ return new StringWriter();
+ case MESSAGE:
+ return createMessageWriter(fieldDescriptor, type);
+ case INT:
+ return new IntWriter();
+ case LONG:
+ return new LongWriter();
+ case FLOAT:
+ return new FloatWriter();
+ case DOUBLE:
+ return new DoubleWriter();
+ case ENUM:
+ return new EnumWriter(fieldDescriptor.getEnumType());
+ case BOOLEAN:
+ return new BooleanWriter();
+ case BYTE_STRING:
+ return new BinaryWriter();
+ }
+
+ return unknownType(fieldDescriptor); // should not be executed, always throws exception.
+ }
+
+ private FieldWriter createMessageWriter(FieldDescriptor fieldDescriptor, Type type) {
+ if (fieldDescriptor.isMapField() && writeSpecsCompliant) {
+ return createMapWriter(fieldDescriptor, type);
+ }
+
+ if (unwrapProtoWrappers) {
+ Descriptor messageType = fieldDescriptor.getMessageType();
+ if (messageType.equals(Timestamp.getDescriptor())) {
+ return new TimestampWriter();
+ }
+ if (messageType.equals(Date.getDescriptor())) {
+ return new DateWriter();
+ }
+ if (messageType.equals(TimeOfDay.getDescriptor())) {
+ return new TimeWriter();
+ }
+ if (messageType.equals(DoubleValue.getDescriptor())) {
+ return new DoubleValueWriter();
+ }
+ if (messageType.equals(FloatValue.getDescriptor())) {
+ return new FloatValueWriter();
+ }
+ if (messageType.equals(Int64Value.getDescriptor())) {
+ return new Int64ValueWriter();
+ }
+ if (messageType.equals(UInt64Value.getDescriptor())) {
+ return new UInt64ValueWriter();
+ }
+ if (messageType.equals(Int32Value.getDescriptor())) {
+ return new Int32ValueWriter();
+ }
+ if (messageType.equals(UInt32Value.getDescriptor())) {
+ return new UInt32ValueWriter();
+ }
+ if (messageType.equals(BoolValue.getDescriptor())) {
+ return new BoolValueWriter();
+ }
+ if (messageType.equals(StringValue.getDescriptor())) {
+ return new StringValueWriter();
+ }
+ if (messageType.equals(BytesValue.getDescriptor())) {
+ return new BytesValueWriter();
+ }
+ }
+
+ // This can happen now that recursive schemas get truncated to bytes. Write the bytes.
+ if (type.isPrimitive()
+ && type.asPrimitiveType().getPrimitiveTypeName()
+ == PrimitiveType.PrimitiveTypeName.BINARY) {
+ return new BinaryWriter();
+ }
+
+ return new MessageWriter(fieldDescriptor.getMessageType(), getGroupType(type));
+ }
+
+ private GroupType getGroupType(Type type) {
+ LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
+ if (logicalTypeAnnotation == null) {
+ return type.asGroupType();
+ }
+ return logicalTypeAnnotation
+ .accept(
+ new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<GroupType>() {
+ @Override
+ public Optional<GroupType> visit(
+ LogicalTypeAnnotation.ListLogicalTypeAnnotation
+ listLogicalType) {
+ return ofNullable(
+ type.asGroupType()
+ .getType("list")
+ .asGroupType()
+ .getType("element")
+ .asGroupType());
+ }
+
+ @Override
+ public Optional<GroupType> visit(
+ LogicalTypeAnnotation.MapLogicalTypeAnnotation
+ mapLogicalType) {
+ return ofNullable(
+ type.asGroupType()
+ .getType("key_value")
+ .asGroupType()
+ .getType("value")
+ .asGroupType());
+ }
+ })
+ .orElse(type.asGroupType());
+ }
+
+ private MapWriter createMapWriter(FieldDescriptor fieldDescriptor, Type type) {
+ List<FieldDescriptor> fields = fieldDescriptor.getMessageType().getFields();
+ if (fields.size() != 2) {
+ throw new UnsupportedOperationException(
+ "Expected two fields for the map (key/value), but got: " + fields);
+ }
+
+ // KeyFieldWriter
+ FieldDescriptor keyProtoField = fields.get(0);
+ FieldWriter keyWriter = createWriter(keyProtoField, type);
+ keyWriter.setFieldName(keyProtoField.getName());
+ keyWriter.setIndex(0);
+
+ // ValueFieldWriter
+ FieldDescriptor valueProtoField = fields.get(1);
+ FieldWriter valueWriter = createWriter(valueProtoField, type);
+ valueWriter.setFieldName(valueProtoField.getName());
+ valueWriter.setIndex(1);
+
+ return new MapWriter(keyWriter, valueWriter);
+ }
+
+ /** Writes top level message. It cannot call startGroup() */
+ void writeTopLevelMessage(Object value) {
+ writeAllFields((MessageOrBuilder) value);
+ }
+
+ /** Writes message as part of repeated field. It cannot start field */
+ @Override
+ final void writeRawValue(Object value) {
+ recordConsumer.startGroup();
+ writeAllFields((MessageOrBuilder) value);
+ recordConsumer.endGroup();
+ }
+
+ /** Used for writing nonrepeated (optional, required) fields. */
+ @Override
+ final void writeField(Object value) {
+ recordConsumer.startField(fieldName, index);
+ writeRawValue(value);
+ recordConsumer.endField(fieldName, index);
+ }
+
+ private void writeAllFields(MessageOrBuilder pb) {
+ Descriptor messageDescriptor = pb.getDescriptorForType();
+ // ============================================================================
+ // BEGIN PATCH: Replace enum-based syntax detection with string-based approach
+ // ============================================================================
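+ // FileDescriptorProto#getSyntax() exposes the syntax as a plain string and works with
+ // both protobuf 3.x and 4.x, unlike FileDescriptor.getSyntax(), whose Syntax enum was
+ // removed in protobuf 4.x.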
+ String syntax = messageDescriptor.getFile().toProto().getSyntax();
+
+ // Check for editions syntax (not supported)
+ if ("editions".equals(syntax)) {
+ throw new UnsupportedOperationException(
+ "Protocol Buffers editions syntax is not supported");
+ }
+
+ // proto2 uses empty string or "proto2", proto3 uses "proto3"
+ boolean isProto2 = syntax.isEmpty() || "proto2".equals(syntax);
+
+ if (isProto2) {
+ // ============================================================================
+ // END PATCH
+ // ============================================================================
+ // Returns changed fields with values. Map is ordered by id.
+ Map<FieldDescriptor, Object> changedPbFields = pb.getAllFields();
+
+ for (Map.Entry<FieldDescriptor, Object> entry : changedPbFields.entrySet()) {
+ FieldDescriptor fieldDescriptor = entry.getKey();
+
+ if (fieldDescriptor.isExtension()) {
+ // Field index of an extension field might overlap with a base field.
+ throw new UnsupportedOperationException(
+ "Cannot convert Protobuf message with extension field(s)");
+ }
+
+ int fieldIndex = fieldDescriptor.getIndex();
+ fieldWriters[fieldIndex].writeField(entry.getValue());
+ }
+ } else {
+ // proto3
+ List<FieldDescriptor> fieldDescriptors = messageDescriptor.getFields();
+ for (FieldDescriptor fieldDescriptor : fieldDescriptors) {
+ FieldDescriptor.Type type = fieldDescriptor.getType();
+
+ // For a field in a oneOf that isn't set don't write anything
+ if (fieldDescriptor.getContainingOneof() != null
+ && !pb.hasField(fieldDescriptor)) {
+ continue;
+ }
+
+ if (!fieldDescriptor.isRepeated()
+ && FieldDescriptor.Type.MESSAGE.equals(type)
+ && !pb.hasField(fieldDescriptor)) {
+ continue;
+ }
+ int fieldIndex = fieldDescriptor.getIndex();
+ FieldWriter fieldWriter = fieldWriters[fieldIndex];
+ fieldWriter.writeField(pb.getField(fieldDescriptor));
+ }
+ }
+ }
+ }
+
+ class ArrayWriter extends FieldWriter {
+ final FieldWriter fieldWriter;
+
+ ArrayWriter(FieldWriter fieldWriter) {
+ this.fieldWriter = fieldWriter;
+ }
+
+ @Override
+ final void writeRawValue(Object value) {
+ throw new UnsupportedOperationException("Array has no raw value");
+ }
+
+ @Override
+ final void writeField(Object value) {
+ List<?> list = (List<?>) value;
+ if (list.isEmpty()) {
+ return;
+ }
+
+ recordConsumer.startField(fieldName, index);
+ recordConsumer.startGroup();
+
+ recordConsumer.startField("list", 0); // This is the wrapper group for the array field
+ for (Object listEntry : list) {
+ recordConsumer.startGroup();
+ recordConsumer.startField("element", 0); // This is the mandatory inner field
+
+ fieldWriter.writeRawValue(listEntry);
+
+ recordConsumer.endField("element", 0);
+ recordConsumer.endGroup();
+ }
+ recordConsumer.endField("list", 0);
+
+ recordConsumer.endGroup();
+ recordConsumer.endField(fieldName, index);
+ }
+ }
+
+ /**
+ * The RepeatedWriter is used to write collections (lists and maps) using the old style (without
+ * LIST and MAP wrappers).
+ */
+ class RepeatedWriter extends FieldWriter {
+ final FieldWriter fieldWriter;
+
+ RepeatedWriter(FieldWriter fieldWriter) {
+ this.fieldWriter = fieldWriter;
+ }
+
+ @Override
+ final void writeRawValue(Object value) {
+ throw new UnsupportedOperationException("Array has no raw value");
+ }
+
+ @Override
+ final void writeField(Object value) {
+ List<?> list = (List<?>) value;
+ if (list.isEmpty()) {
+ return;
+ }
+
+ recordConsumer.startField(fieldName, index);
+
+ for (Object listEntry : list) {
+ fieldWriter.writeRawValue(listEntry);
+ }
+
+ recordConsumer.endField(fieldName, index);
+ }
+ }
+
+ /** Validates mapping between protocol buffer fields and parquet fields. */
+ private void validatedMapping(Descriptor descriptor, GroupType parquetSchema) {
+ List<FieldDescriptor> allFields = descriptor.getFields();
+
+ for (FieldDescriptor fieldDescriptor : allFields) {
+ String fieldName = fieldDescriptor.getName();
+ int fieldIndex = fieldDescriptor.getIndex();
+ int parquetIndex = parquetSchema.getFieldIndex(fieldName);
+ if (fieldIndex != parquetIndex) {
+ String message =
+ "FieldIndex mismatch name="
+ + fieldName
+ + ": "
+ + fieldIndex
+ + " != "
+ + parquetIndex;
+ throw new IncompatibleSchemaModificationException(message);
+ }
+ }
+ }
+
+ class StringWriter extends FieldWriter {
+ @Override
+ final void writeRawValue(Object value) {
+ Binary binaryString = Binary.fromString((String) value);
+ recordConsumer.addBinary(binaryString);
+ }
+ }
+
+ class IntWriter extends FieldWriter {
+ @Override
+ final void writeRawValue(Object value) {
+ recordConsumer.addInteger((Integer) value);
+ }
+ }
+
+ class LongWriter extends FieldWriter {
+
+ @Override
+ final void writeRawValue(Object value) {
+ recordConsumer.addLong((Long) value);
+ }
+ }
+
+ class MapWriter extends FieldWriter {
+
+ private final FieldWriter keyWriter;
+ private final FieldWriter valueWriter;
+
+ public MapWriter(FieldWriter keyWriter, FieldWriter valueWriter) {
+ super();
+ this.keyWriter = keyWriter;
+ this.valueWriter = valueWriter;
+ }
+
+ @Override
+ final void writeRawValue(Object value) {
+ Collection<Message> collection = (Collection<Message>) value;
+ if (collection.isEmpty()) {
+ return;
+ }
+ recordConsumer.startField(fieldName, index);
+ recordConsumer.startGroup();
+
+ recordConsumer.startField(
+ "key_value", 0); // This is the wrapper group for the map field
+ for (Message msg : collection) {
+ recordConsumer.startGroup();
+
+ final Descriptor descriptorForType = msg.getDescriptorForType();
+ final FieldDescriptor keyDesc = descriptorForType.findFieldByName("key");
+ final FieldDescriptor valueDesc = descriptorForType.findFieldByName("value");
+
+ keyWriter.writeField(msg.getField(keyDesc));
+ valueWriter.writeField(msg.getField(valueDesc));
+
+ recordConsumer.endGroup();
+ }
+
+ recordConsumer.endField("key_value", 0);
+
+ recordConsumer.endGroup();
+ recordConsumer.endField(fieldName, index);
+ }
+ }
+
+ class FloatWriter extends FieldWriter {
+ @Override
+ final void writeRawValue(Object value) {
+ recordConsumer.addFloat((Float) value);
+ }
+ }
+
+ class DoubleWriter extends FieldWriter {
+ @Override
+ final void writeRawValue(Object value) {
+ recordConsumer.addDouble((Double) value);
+ }
+ }
+
+ class EnumWriter extends FieldWriter {
+ Map<String, Integer> enumNameNumberPairs;
+
+ public EnumWriter(Descriptors.EnumDescriptor enumType) {
+ if (protoEnumBookKeeper.containsKey(enumType.getFullName())) {
+ enumNameNumberPairs = protoEnumBookKeeper.get(enumType.getFullName());
+ } else {
+ enumNameNumberPairs = new HashMap<>();
+ protoEnumBookKeeper.put(enumType.getFullName(), enumNameNumberPairs);
+ }
+ }
+
+ @Override
+ final void writeRawValue(Object value) {
+ Descriptors.EnumValueDescriptor enumValueDesc = (Descriptors.EnumValueDescriptor) value;
+ Binary binary = Binary.fromString(enumValueDesc.getName());
+ recordConsumer.addBinary(binary);
+ enumNameNumberPairs.putIfAbsent(enumValueDesc.getName(), enumValueDesc.getNumber());
+ }
+ }
+
+ class BooleanWriter extends FieldWriter {
+ @Override
+ final void writeRawValue(Object value) {
+ recordConsumer.addBoolean((Boolean) value);
+ }
+ }
+
+ class BinaryWriter extends FieldWriter {
+ @Override
+ final void writeRawValue(Object value) {
+ // Non-ByteString values can happen when recursion gets truncated.
+ ByteString byteString =
+ value instanceof ByteString
+ ? (ByteString) value
+ // TODO: figure out a way to use MessageOrBuilder
+ : value instanceof Message
+ ? ((Message) value).toByteString()
+ // Worst-case, just dump as plain java string.
+ : ByteString.copyFromUtf8(value.toString());
+ Binary binary = Binary.fromConstantByteArray(byteString.toByteArray());
+ recordConsumer.addBinary(binary);
+ }
+ }
+
+ class TimestampWriter extends FieldWriter {
+ @Override
+ void writeRawValue(Object value) {
+ Timestamp timestamp = (Timestamp) value;
+ recordConsumer.addLong(Timestamps.toNanos(timestamp));
+ }
+ }
+
+ class DateWriter extends FieldWriter {
+ @Override
+ void writeRawValue(Object value) {
+ Date date = (Date) value;
+ LocalDate localDate = LocalDate.of(date.getYear(), date.getMonth(), date.getDay());
+ recordConsumer.addInteger((int) localDate.toEpochDay());
+ }
+ }
+
+ class TimeWriter extends FieldWriter {
+ @Override
+ void writeRawValue(Object value) {
+ com.google.type.TimeOfDay timeOfDay = (com.google.type.TimeOfDay) value;
+ LocalTime localTime =
+ LocalTime.of(
+ timeOfDay.getHours(),
+ timeOfDay.getMinutes(),
+ timeOfDay.getSeconds(),
+ timeOfDay.getNanos());
+ recordConsumer.addLong(localTime.toNanoOfDay());
+ }
+ }
+
+ class DoubleValueWriter extends FieldWriter {
+ @Override
+ void writeRawValue(Object value) {
+ recordConsumer.addDouble(((DoubleValue) value).getValue());
+ }
+ }
+
+ class FloatValueWriter extends FieldWriter {
+ @Override
+ void writeRawValue(Object value) {
+ recordConsumer.addFloat(((FloatValue) value).getValue());
+ }
+ }
+
+ class Int64ValueWriter extends FieldWriter {
+ @Override
+ void writeRawValue(Object value) {
+ recordConsumer.addLong(((Int64Value) value).getValue());
+ }
+ }
+
+ class UInt64ValueWriter extends FieldWriter {
+ @Override
+ void writeRawValue(Object value) {
+ recordConsumer.addLong(((UInt64Value) value).getValue());
+ }
+ }
+
+ class Int32ValueWriter extends FieldWriter {
+ @Override
+ void writeRawValue(Object value) {
+ recordConsumer.addInteger(((Int32Value) value).getValue());
+ }
+ }
+
+ class UInt32ValueWriter extends FieldWriter {
+ @Override
+ void writeRawValue(Object value) {
+ recordConsumer.addLong(((UInt32Value) value).getValue());
+ }
+ }
+
+ class BoolValueWriter extends FieldWriter {
+ @Override
+ void writeRawValue(Object value) {
+ recordConsumer.addBoolean(((BoolValue) value).getValue());
+ }
+ }
+
+ class StringValueWriter extends FieldWriter {
+ @Override
+ void writeRawValue(Object value) {
+ Binary binaryString = Binary.fromString(((StringValue) value).getValue());
+ recordConsumer.addBinary(binaryString);
+ }
+ }
+
+ class BytesValueWriter extends FieldWriter {
+ @Override
+ void writeRawValue(Object value) {
+ byte[] byteArray = ((BytesValue) value).getValue().toByteArray();
+ Binary binary = Binary.fromConstantByteArray(byteArray);
+ recordConsumer.addBinary(binary);
+ }
+ }
+
+ private FieldWriter unknownType(FieldDescriptor fieldDescriptor) {
+ String exceptionMsg =
+ "Unknown type with descriptor \""
+ + fieldDescriptor
+ + "\" and type \""
+ + fieldDescriptor.getJavaType()
+ + "\".";
+ throw new InvalidRecordException(exceptionMsg);
+ }
+}
+
+/**
+ * Minimal schema converter providing only the behavior needed by the patched write support. To
+ * minimize risk, it reflectively delegates to the original {@code ProtoSchemaConverter} from
+ * parquet-protobuf instead of re-implementing the schema mapping; if that class is not
+ * available, conversion fails with an exception.
+ */
+class PatchedProtoSchemaConverter {
+ private final ParquetConfiguration configuration;
+
+ PatchedProtoSchemaConverter(ParquetConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ MessageType convert(Descriptor descriptor) {
+ try {
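+ // Delegate schema conversion reflectively to parquet-protobuf's ProtoSchemaConverter
+ // (see the class Javadoc above for the rationale).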
+ Class<?> clazz = Class.forName("org.apache.parquet.proto.ProtoSchemaConverter");
+ Object inst =
+ clazz.getConstructor(ParquetConfiguration.class).newInstance(configuration);
+ return (MessageType)
+ clazz.getMethod("convert", Descriptor.class).invoke(inst, descriptor);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to delegate to ProtoSchemaConverter", e);
+ }
+ }
+}
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupportTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupportTest.java
new file mode 100644
index 0000000000000..77f829285f23a
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupportTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.protobuf;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.proto.ProtoParquetReader;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.flink.formats.parquet.protobuf.SimpleRecord.SimpleProtoRecord;
+import static org.apache.flink.formats.parquet.protobuf.TestProto2.TestProto2Record;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link PatchedProtoWriteSupport} to verify protobuf 4.x compatibility.
+ *
+ * <p>This test validates that the patched string-based syntax detection correctly handles both
+ * proto2 and proto3 messages when using protobuf 4.x, where the enum-based Syntax API was removed.
+ */
+class PatchedProtoWriteSupportTest {
+
+ @TempDir File tempDir;
+
+ /**
+ * Tests that proto3 messages can be written and read correctly with the patched write support.
+ */
+ @Test
+ void testProto3SyntaxDetection() throws IOException {
+ File outputFile = new File(tempDir, "proto3_test.parquet");
+ Path path = new Path(outputFile.toURI());
+
+ // Create a proto3 message
+ SimpleProtoRecord record =
+ SimpleProtoRecord.newBuilder()
+ .setFoo("test_foo")
+ .setBar("test_bar")
+ .setNum(42)
+ .build();
+
+ // Write using PatchedProtoWriteSupport directly
+ try (ParquetWriter<SimpleProtoRecord> writer =
+ new ParquetWriter<>(
+ path,
+ new PatchedProtoWriteSupport<>(SimpleProtoRecord.class),
+ CompressionCodecName.SNAPPY,
+ ParquetWriter.DEFAULT_BLOCK_SIZE,
+ ParquetWriter.DEFAULT_PAGE_SIZE)) {
+ writer.write(record);
+ }
+
+ // Read back and verify
+ try (ParquetReader<SimpleProtoRecord.Builder> reader =
+ ProtoParquetReader.<SimpleProtoRecord.Builder>builder(path).build()) {
+ SimpleProtoRecord.Builder readRecord = reader.read();
+ assertThat(readRecord).isNotNull();
+ assertThat(readRecord.build()).isEqualTo(record);
+ }
+ }
+
+ /**
+ * Tests that proto2 messages can be written and read correctly with the patched write support.
+ */
+ @Test
+ void testProto2SyntaxDetection() throws IOException {
+ File outputFile = new File(tempDir, "proto2_test.parquet");
+ Path path = new Path(outputFile.toURI());
+
+ // Create a proto2 message with only some fields set
+ TestProto2Record record =
+ TestProto2Record.newBuilder().setName("test_name").setValue(123).build();
+
+ // Write using PatchedProtoWriteSupport directly
+ try (ParquetWriter<TestProto2Record> writer =
+ new ParquetWriter<>(
+ path,
+ new PatchedProtoWriteSupport<>(TestProto2Record.class),
+ CompressionCodecName.SNAPPY,
+ ParquetWriter.DEFAULT_BLOCK_SIZE,
+ ParquetWriter.DEFAULT_PAGE_SIZE)) {
+ writer.write(record);
+ }
+
+ // Read back and verify
+ try (ParquetReader<TestProto2Record.Builder> reader =
+ ProtoParquetReader.<TestProto2Record.Builder>builder(path).build()) {
+ TestProto2Record.Builder readRecord = reader.read();
+ assertThat(readRecord).isNotNull();
+ TestProto2Record result = readRecord.build();
+ assertThat(result.getName()).isEqualTo("test_name");
+ assertThat(result.getValue()).isEqualTo(123);
+ // flag field was not set, should be default
+ assertThat(result.hasFlag()).isFalse();
+ }
+ }
+
+ /**
+ * Tests that proto3 messages with default values are handled correctly.
+ *
+ * <p>In proto3, all fields are written including those with default values.
+ */
+ @Test
+ void testProto3WithDefaults() throws IOException {
+ File outputFile = new File(tempDir, "proto3_defaults.parquet");
+ Path path = new Path(outputFile.toURI());
+
+ // Create a proto3 message with default values
+ SimpleProtoRecord record =
+ SimpleProtoRecord.newBuilder().setFoo("").setBar("").setNum(0).build();
+
+ // Write using PatchedProtoWriteSupport
+ try (ParquetWriter<SimpleProtoRecord> writer =
+ new ParquetWriter<>(
+ path,
+ new PatchedProtoWriteSupport<>(SimpleProtoRecord.class),
+ CompressionCodecName.SNAPPY,
+ ParquetWriter.DEFAULT_BLOCK_SIZE,
+ ParquetWriter.DEFAULT_PAGE_SIZE)) {
+ writer.write(record);
+ }
+
+ // Read back and verify - proto3 should read all fields even if default
+ try (ParquetReader<SimpleProtoRecord.Builder> reader =
+ ProtoParquetReader.<SimpleProtoRecord.Builder>builder(path).build()) {
+ SimpleProtoRecord.Builder readRecord = reader.read();
+ assertThat(readRecord).isNotNull();
+ assertThat(readRecord.build()).isEqualTo(record);
+ }
+ }
+
+ /**
+ * Tests that proto2 only writes fields that have been explicitly set.
+ *
+ * <p>In proto2, unset optional fields should not be written to the file.
+ */
+ @Test
+ void testProto2OnlyWritesSetFields() throws IOException {
+ File outputFile = new File(tempDir, "proto2_partial.parquet");
+ Path path = new Path(outputFile.toURI());
+
+ // Create a proto2 message with only one field set
+ TestProto2Record record = TestProto2Record.newBuilder().setName("only_name").build();
+
+ // Write using PatchedProtoWriteSupport
+ try (ParquetWriter<TestProto2Record> writer =
+ new ParquetWriter<>(
+ path,
+ new PatchedProtoWriteSupport<>(TestProto2Record.class),
+ CompressionCodecName.SNAPPY,
+ ParquetWriter.DEFAULT_BLOCK_SIZE,
+ ParquetWriter.DEFAULT_PAGE_SIZE)) {
+ writer.write(record);
+ }
+
+ // Read back and verify
+ try (ParquetReader<TestProto2Record.Builder> reader =
+ ProtoParquetReader.<TestProto2Record.Builder>builder(path).build()) {
+ TestProto2Record.Builder readRecord = reader.read();
+ assertThat(readRecord).isNotNull();
+ TestProto2Record result = readRecord.build();
+ assertThat(result.getName()).isEqualTo("only_name");
+ // value and flag were not set
+ assertThat(result.hasValue()).isFalse();
+ assertThat(result.hasFlag()).isFalse();
+ }
+ }
+
+ /**
+ * Integration test using ParquetProtoWriters (Flink's production API).
+ *
+ * <p>This validates that PatchedProtoWriteSupport works correctly when used through Flink's
+ * ParquetProtoWriters factory, which is the actual production code path.
+ */
+ @Test
+ void testViaParquetProtoWritersForProto3() throws IOException {
+ File outputFile = new File(tempDir, "proto3_via_writers.parquet");
+ Path hadoopPath = new Path(outputFile.toURI());
+ OutputFile outputFileObj =
+ HadoopOutputFile.fromPath(hadoopPath, new org.apache.hadoop.conf.Configuration());
+
+ // Create a proto3 message
+ SimpleProtoRecord record =
+ SimpleProtoRecord.newBuilder()
+ .setFoo("via_writers")
+ .setBar("test")
+ .setNum(99)
+ .build();
+
+ // Write using ParquetProtoWriters (production code path)
+ try (ParquetWriter<SimpleProtoRecord> writer =
+ new ParquetProtoWriters.ParquetProtoWriterBuilder<>(
+ outputFileObj, SimpleProtoRecord.class)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .build()) {
+ writer.write(record);
+ }
+
+ // Read back and verify
+ try (ParquetReader<SimpleProtoRecord.Builder> reader =
+ ProtoParquetReader.<SimpleProtoRecord.Builder>builder(hadoopPath).build()) {
+ SimpleProtoRecord.Builder readRecord = reader.read();
+ assertThat(readRecord).isNotNull();
+ assertThat(readRecord.build()).isEqualTo(record);
+ }
+ }
+
+ /**
+ * Integration test using ParquetProtoWriters for proto2 messages.
+ *
+ * <p>Verifies that proto2 syntax detection works correctly through the production API.
+ */
+ @Test
+ void testViaParquetProtoWritersForProto2() throws IOException {
+ File outputFile = new File(tempDir, "proto2_via_writers.parquet");
+ Path hadoopPath = new Path(outputFile.toURI());
+ OutputFile outputFileObj =
+ HadoopOutputFile.fromPath(hadoopPath, new org.apache.hadoop.conf.Configuration());
+
+ // Create a proto2 message with partial fields
+ TestProto2Record record =
+ TestProto2Record.newBuilder().setName("proto2_writer").setFlag(true).build();
+
+ // Write using ParquetProtoWriters (production code path)
+ try (ParquetWriter<TestProto2Record> writer =
+ new ParquetProtoWriters.ParquetProtoWriterBuilder<>(
+ outputFileObj, TestProto2Record.class)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .build()) {
+ writer.write(record);
+ }
+
+ // Read back and verify
+ try (ParquetReader<TestProto2Record.Builder> reader =
+ ProtoParquetReader.<TestProto2Record.Builder>builder(hadoopPath).build()) {
+ TestProto2Record.Builder readRecord = reader.read();
+ assertThat(readRecord).isNotNull();
+ TestProto2Record result = readRecord.build();
+ assertThat(result.getName()).isEqualTo("proto2_writer");
+ assertThat(result.getFlag()).isTrue();
+ // value was not set
+ assertThat(result.hasValue()).isFalse();
+ }
+ }
+}
diff --git a/flink-formats/flink-parquet/src/test/resources/protobuf/test_proto2.proto b/flink-formats/flink-parquet/src/test/resources/protobuf/test_proto2.proto
new file mode 100644
index 0000000000000..d580e93076c3d
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/resources/protobuf/test_proto2.proto
@@ -0,0 +1,9 @@
+syntax = "proto2";
+
+package org.apache.flink.formats.parquet.protobuf;
+
+message TestProto2Record {
+ optional string name = 1;
+ optional int32 value = 2;
+ optional bool flag = 3;
+}
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
index 29665f2a306eb..730cc897a0d0e 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
@@ -36,7 +36,6 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
-import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +68,7 @@ public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
Thread.currentThread().getContextClassLoader());
String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
boolean readDefaultValuesForPrimitiveTypes = formatConfig.isReadDefaultValues();
- if (descriptor.getFile().getSyntax() == Syntax.PROTO3) {
+ if ("proto3".equals(descriptor.getFile().toProto().getSyntax())) {
// pb3 always read default values for primitive types
readDefaultValuesForPrimitiveTypes = true;
}
diff --git a/flink-formats/flink-sql-protobuf/src/main/resources/META-INF/NOTICE b/flink-formats/flink-sql-protobuf/src/main/resources/META-INF/NOTICE
index c1b0186bcf609..38197b7abaea0 100644
--- a/flink-formats/flink-sql-protobuf/src/main/resources/META-INF/NOTICE
+++ b/flink-formats/flink-sql-protobuf/src/main/resources/META-INF/NOTICE
@@ -7,4 +7,4 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under BSD-3 License (https://opensource.org/licenses/BSD-3-Clause).
See bundled license files for details.
-- com.google.protobuf:protobuf-java:3.21.7
+- com.google.protobuf:protobuf-java:4.32.1
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
index 3df85ee3a7ef1..7ff4d32fa1392 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java
@@ -29,7 +29,7 @@
import org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner;
import org.apache.flink.util.Preconditions;
-import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.GeneratedMessage;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.graph.TimerReference;
@@ -50,14 +50,14 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner {
/** The urn which represents the function kind to be executed. */
private final String functionUrn;
- private final GeneratedMessageV3 userDefinedFunctionProto;
+ private final GeneratedMessage userDefinedFunctionProto;
public BeamTablePythonFunctionRunner(
Environment environment,
String taskName,
ProcessPythonEnvironmentManager environmentManager,
String functionUrn,
- GeneratedMessageV3 userDefinedFunctionProto,
+ GeneratedMessage userDefinedFunctionProto,
FlinkMetricContainer flinkMetricContainer,
KeyedStateBackend<?> keyedStateBackend,
TypeSerializer<?> keySerializer,
@@ -124,7 +124,7 @@ public static BeamTablePythonFunctionRunner stateless(
String taskName,
ProcessPythonEnvironmentManager environmentManager,
String functionUrn,
- GeneratedMessageV3 userDefinedFunctionProto,
+ GeneratedMessage userDefinedFunctionProto,
FlinkMetricContainer flinkMetricContainer,
MemoryManager memoryManager,
double managedMemoryFraction,
@@ -151,7 +151,7 @@ public static BeamTablePythonFunctionRunner stateful(
String taskName,
ProcessPythonEnvironmentManager environmentManager,
String functionUrn,
- GeneratedMessageV3 userDefinedFunctionProto,
+ GeneratedMessage userDefinedFunctionProto,
FlinkMetricContainer flinkMetricContainer,
KeyedStateBackend<?> keyedStateBackend,
TypeSerializer<?> keySerializer,
diff --git a/flink-python/src/main/resources/META-INF/NOTICE b/flink-python/src/main/resources/META-INF/NOTICE
index b6768c3e0118f..6a7f3ade23196 100644
--- a/flink-python/src/main/resources/META-INF/NOTICE
+++ b/flink-python/src/main/resources/META-INF/NOTICE
@@ -34,7 +34,7 @@ This project bundles the following dependencies under the BSD license.
See bundled license files for details
- net.sf.py4j:py4j:0.10.9.7
-- com.google.protobuf:protobuf-java:3.21.7
+- com.google.protobuf:protobuf-java:4.32.1
This project bundles the following dependencies under the MIT license. (https://opensource.org/licenses/MIT)
See bundled license files for details.
diff --git a/pom.xml b/pom.xml
index b9cc0ab6c8e24..f3d2ddc194af0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,7 +162,7 @@ under the License.
3.27.3
0.10.9.7
2.54.0
- 3.21.7
+ 4.32.1
3.14.9
1.20.2
1.8.0