Commit 9798a75

upgrade protobuf/protoc to 4.32.1

Parent: 7129b7a

9 files changed: +1161 −12 lines

flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java

Lines changed: 2 additions & 2 deletions

@@ -26,7 +26,6 @@
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.OutputFile;
-import org.apache.parquet.proto.ProtoWriteSupport;

 /** Convenience builder for creating {@link ParquetWriterFactory} instances for Protobuf classes. */
 public class ParquetProtoWriters {
@@ -62,7 +61,8 @@ protected ParquetProtoWriterBuilder<T> self() {

     @Override
     protected WriteSupport<T> getWriteSupport(Configuration conf) {
-        return new ProtoWriteSupport<>(clazz);
+        // Use patched implementation compatible with protobuf 4.x
+        return new PatchedProtoWriteSupport<>(clazz);
     }
 }

flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupport.java

Lines changed: 874 additions & 0 deletions
Large diffs are not rendered by default.
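The 874-line patched class is collapsed above, so here is a minimal sketch of the idea behind it. This is an illustration only, not the commit's code: judging from the test javadoc below and the parallel change in ProtoToRowConverter, the patch swaps parquet-java's enum-based Descriptors.FileDescriptor.Syntax check (removed in protobuf 4.x) for a string comparison on the serialized FileDescriptorProto. The class and method names here are invented for illustration.

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;

/** Hedged sketch of the syntax detection PatchedProtoWriteSupport presumably performs. */
final class SyntaxDetectionSketch {

    // protobuf 4.x removed FileDescriptor.getSyntax() and its Syntax enum; the
    // replacement reads the raw syntax string from the serialized FileDescriptorProto.
    static boolean isProto3(Descriptors.Descriptor descriptor) {
        return "proto3".equals(descriptor.getFile().toProto().getSyntax());
    }

    // proto3 writes every singular field, defaults included; proto2 writes only the
    // fields that were explicitly set. Repeated fields are counted instead, since
    // hasField() is undefined for them.
    static boolean shouldWriteField(Message message, Descriptors.FieldDescriptor field) {
        if (field.isRepeated()) {
            return message.getRepeatedFieldCount(field) > 0;
        }
        return isProto3(message.getDescriptorForType()) || message.hasField(field);
    }
}

With that check in place, the writer keeps proto2's write-only-set-fields behavior and proto3's write-everything behavior, which is what the tests below assert.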
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/PatchedProtoWriteSupportTest.java

Lines changed: 267 additions & 0 deletions
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.protobuf;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.proto.ProtoParquetReader;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.flink.formats.parquet.protobuf.SimpleRecord.SimpleProtoRecord;
+import static org.apache.flink.formats.parquet.protobuf.TestProto2.TestProto2Record;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link PatchedProtoWriteSupport} to verify protobuf 4.x compatibility.
+ *
+ * <p>This test validates that the patched string-based syntax detection correctly handles both
+ * proto2 and proto3 messages when using protobuf 4.x, where the enum-based Syntax API was removed.
+ */
+class PatchedProtoWriteSupportTest {
+
+    @TempDir File tempDir;
+
+    /**
+     * Tests that proto3 messages can be written and read correctly with the patched write support.
+     */
+    @Test
+    void testProto3SyntaxDetection() throws IOException {
+        File outputFile = new File(tempDir, "proto3_test.parquet");
+        Path path = new Path(outputFile.toURI());
+
+        // Create a proto3 message
+        SimpleProtoRecord record =
+                SimpleProtoRecord.newBuilder()
+                        .setFoo("test_foo")
+                        .setBar("test_bar")
+                        .setNum(42)
+                        .build();
+
+        // Write using PatchedProtoWriteSupport directly
+        try (ParquetWriter<SimpleProtoRecord> writer =
+                new ParquetWriter<>(
+                        path,
+                        new PatchedProtoWriteSupport<>(SimpleProtoRecord.class),
+                        CompressionCodecName.SNAPPY,
+                        ParquetWriter.DEFAULT_BLOCK_SIZE,
+                        ParquetWriter.DEFAULT_PAGE_SIZE)) {
+            writer.write(record);
+        }
+
+        // Read back and verify
+        try (ParquetReader<SimpleProtoRecord.Builder> reader =
+                ProtoParquetReader.<SimpleProtoRecord.Builder>builder(path).build()) {
+            SimpleProtoRecord.Builder readRecord = reader.read();
+            assertThat(readRecord).isNotNull();
+            assertThat(readRecord.build()).isEqualTo(record);
+        }
+    }
+
+    /**
+     * Tests that proto2 messages can be written and read correctly with the patched write support.
+     */
+    @Test
+    void testProto2SyntaxDetection() throws IOException {
+        File outputFile = new File(tempDir, "proto2_test.parquet");
+        Path path = new Path(outputFile.toURI());
+
+        // Create a proto2 message with only some fields set
+        TestProto2Record record =
+                TestProto2Record.newBuilder().setName("test_name").setValue(123).build();
+
+        // Write using PatchedProtoWriteSupport directly
+        try (ParquetWriter<TestProto2Record> writer =
+                new ParquetWriter<>(
+                        path,
+                        new PatchedProtoWriteSupport<>(TestProto2Record.class),
+                        CompressionCodecName.SNAPPY,
+                        ParquetWriter.DEFAULT_BLOCK_SIZE,
+                        ParquetWriter.DEFAULT_PAGE_SIZE)) {
+            writer.write(record);
+        }
+
+        // Read back and verify
+        try (ParquetReader<TestProto2Record.Builder> reader =
+                ProtoParquetReader.<TestProto2Record.Builder>builder(path).build()) {
+            TestProto2Record.Builder readRecord = reader.read();
+            assertThat(readRecord).isNotNull();
+            TestProto2Record result = readRecord.build();
+            assertThat(result.getName()).isEqualTo("test_name");
+            assertThat(result.getValue()).isEqualTo(123);
+            // flag field was not set, should be default
+            assertThat(result.hasFlag()).isFalse();
+        }
+    }
+
+    /**
+     * Tests that proto3 messages with default values are handled correctly.
+     *
+     * <p>In proto3, all fields are written including those with default values.
+     */
+    @Test
+    void testProto3WithDefaults() throws IOException {
+        File outputFile = new File(tempDir, "proto3_defaults.parquet");
+        Path path = new Path(outputFile.toURI());
+
+        // Create a proto3 message with default values
+        SimpleProtoRecord record =
+                SimpleProtoRecord.newBuilder().setFoo("").setBar("").setNum(0).build();
+
+        // Write using PatchedProtoWriteSupport
+        try (ParquetWriter<SimpleProtoRecord> writer =
+                new ParquetWriter<>(
+                        path,
+                        new PatchedProtoWriteSupport<>(SimpleProtoRecord.class),
+                        CompressionCodecName.SNAPPY,
+                        ParquetWriter.DEFAULT_BLOCK_SIZE,
+                        ParquetWriter.DEFAULT_PAGE_SIZE)) {
+            writer.write(record);
+        }
+
+        // Read back and verify - proto3 should read all fields even if default
+        try (ParquetReader<SimpleProtoRecord.Builder> reader =
+                ProtoParquetReader.<SimpleProtoRecord.Builder>builder(path).build()) {
+            SimpleProtoRecord.Builder readRecord = reader.read();
+            assertThat(readRecord).isNotNull();
+            assertThat(readRecord.build()).isEqualTo(record);
+        }
+    }
+
+    /**
+     * Tests that proto2 only writes fields that have been explicitly set.
+     *
+     * <p>In proto2, unset optional fields should not be written to the file.
+     */
+    @Test
+    void testProto2OnlyWritesSetFields() throws IOException {
+        File outputFile = new File(tempDir, "proto2_partial.parquet");
+        Path path = new Path(outputFile.toURI());
+
+        // Create a proto2 message with only one field set
+        TestProto2Record record = TestProto2Record.newBuilder().setName("only_name").build();
+
+        // Write using PatchedProtoWriteSupport
+        try (ParquetWriter<TestProto2Record> writer =
+                new ParquetWriter<>(
+                        path,
+                        new PatchedProtoWriteSupport<>(TestProto2Record.class),
+                        CompressionCodecName.SNAPPY,
+                        ParquetWriter.DEFAULT_BLOCK_SIZE,
+                        ParquetWriter.DEFAULT_PAGE_SIZE)) {
+            writer.write(record);
+        }
+
+        // Read back and verify
+        try (ParquetReader<TestProto2Record.Builder> reader =
+                ProtoParquetReader.<TestProto2Record.Builder>builder(path).build()) {
+            TestProto2Record.Builder readRecord = reader.read();
+            assertThat(readRecord).isNotNull();
+            TestProto2Record result = readRecord.build();
+            assertThat(result.getName()).isEqualTo("only_name");
+            // value and flag were not set
+            assertThat(result.hasValue()).isFalse();
+            assertThat(result.hasFlag()).isFalse();
+        }
+    }
+
+    /**
+     * Integration test using ParquetProtoWriters (Flink's production API).
+     *
+     * <p>This validates that PatchedProtoWriteSupport works correctly when used through Flink's
+     * ParquetProtoWriters factory, which is the actual production code path.
+     */
+    @Test
+    void testViaParquetProtoWritersForProto3() throws IOException {
+        File outputFile = new File(tempDir, "proto3_via_writers.parquet");
+        Path hadoopPath = new Path(outputFile.toURI());
+        OutputFile outputFileObj =
+                HadoopOutputFile.fromPath(hadoopPath, new org.apache.hadoop.conf.Configuration());
+
+        // Create a proto3 message
+        SimpleProtoRecord record =
+                SimpleProtoRecord.newBuilder()
+                        .setFoo("via_writers")
+                        .setBar("test")
+                        .setNum(99)
+                        .build();
+
+        // Write using ParquetProtoWriters (production code path)
+        try (ParquetWriter<SimpleProtoRecord> writer =
+                new ParquetProtoWriters.ParquetProtoWriterBuilder<>(
+                                outputFileObj, SimpleProtoRecord.class)
+                        .withCompressionCodec(CompressionCodecName.SNAPPY)
+                        .build()) {
+            writer.write(record);
+        }
+
+        // Read back and verify
+        try (ParquetReader<SimpleProtoRecord.Builder> reader =
+                ProtoParquetReader.<SimpleProtoRecord.Builder>builder(hadoopPath).build()) {
+            SimpleProtoRecord.Builder readRecord = reader.read();
+            assertThat(readRecord).isNotNull();
+            assertThat(readRecord.build()).isEqualTo(record);
+        }
+    }
+
+    /**
+     * Integration test using ParquetProtoWriters for proto2 messages.
+     *
+     * <p>Verifies that proto2 syntax detection works correctly through the production API.
+     */
+    @Test
+    void testViaParquetProtoWritersForProto2() throws IOException {
+        File outputFile = new File(tempDir, "proto2_via_writers.parquet");
+        Path hadoopPath = new Path(outputFile.toURI());
+        OutputFile outputFileObj =
+                HadoopOutputFile.fromPath(hadoopPath, new org.apache.hadoop.conf.Configuration());
+
+        // Create a proto2 message with partial fields
+        TestProto2Record record =
+                TestProto2Record.newBuilder().setName("proto2_writer").setFlag(true).build();
+
+        // Write using ParquetProtoWriters (production code path)
+        try (ParquetWriter<TestProto2Record> writer =
+                new ParquetProtoWriters.ParquetProtoWriterBuilder<>(
+                                outputFileObj, TestProto2Record.class)
+                        .withCompressionCodec(CompressionCodecName.SNAPPY)
+                        .build()) {
+            writer.write(record);
+        }
+
+        // Read back and verify
+        try (ParquetReader<TestProto2Record.Builder> reader =
+                ProtoParquetReader.<TestProto2Record.Builder>builder(hadoopPath).build()) {
+            TestProto2Record.Builder readRecord = reader.read();
+            assertThat(readRecord).isNotNull();
+            TestProto2Record result = readRecord.build();
+            assertThat(result.getName()).isEqualTo("proto2_writer");
+            assertThat(result.getFlag()).isTrue();
+            // value was not set
+            assertThat(result.hasValue()).isFalse();
+        }
+    }
+}

Lines changed: 9 additions & 0 deletions

@@ -0,0 +1,9 @@
+syntax = "proto2";
+
+package org.apache.flink.formats.parquet.protobuf;
+
+message TestProto2Record {
+  optional string name = 1;
+  optional int32 value = 2;
+  optional bool flag = 3;
+}
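This record gives the tests explicit proto2 field presence, which is exactly what the patched writer has to respect. A quick illustration of those semantics, assuming the generated TestProto2 classes (not part of the commit):

import static org.apache.flink.formats.parquet.protobuf.TestProto2.TestProto2Record;

/** Illustration of the proto2 presence semantics the tests assert on; not part of the commit. */
final class Proto2PresenceSketch {
    public static void main(String[] args) {
        TestProto2Record record = TestProto2Record.newBuilder().setName("only_name").build();
        System.out.println(record.hasName());  // true: explicitly set
        System.out.println(record.hasValue()); // false: unset, so the writer must skip it
        System.out.println(record.getValue()); // 0: reading an unset field yields the default
    }
}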

flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java

Lines changed: 1 addition & 2 deletions

@@ -36,7 +36,6 @@

 import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors;
-import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -69,7 +68,7 @@ public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig)
                         Thread.currentThread().getContextClassLoader());
         String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
         boolean readDefaultValuesForPrimitiveTypes = formatConfig.isReadDefaultValues();
-        if (descriptor.getFile().getSyntax() == Syntax.PROTO3) {
+        if ("proto3".equals(descriptor.getFile().toProto().getSyntax())) {
             // pb3 always read default values for primitive types
             readDefaultValuesForPrimitiveTypes = true;
         }
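A note on the string comparison above (illustration, not part of the diff): FileDescriptorProto.getSyntax() returns the raw value of the file's syntax field, which protoc may leave unset (an empty string) for proto2 files, so comparing against "proto3" only, as the new code does, is the robust direction. A hypothetical probe using this commit's generated test messages:

import static org.apache.flink.formats.parquet.protobuf.SimpleRecord.SimpleProtoRecord;
import static org.apache.flink.formats.parquet.protobuf.TestProto2.TestProto2Record;

/** Hypothetical probe of the raw syntax strings; not part of the commit. */
final class SyntaxStringProbe {
    public static void main(String[] args) {
        // Prints "proto3" for the proto3 test message.
        System.out.println(SimpleProtoRecord.getDescriptor().getFile().toProto().getSyntax());
        // Prints "proto2" or "" for the proto2 test message, since protoc may omit
        // the field for proto2 files; hence the equality check against "proto3" only.
        System.out.println(TestProto2Record.getDescriptor().getFile().toProto().getSyntax());
    }
}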

flink-formats/flink-sql-protobuf/src/main/resources/META-INF/NOTICE

Lines changed: 1 addition & 1 deletion

@@ -7,4 +7,4 @@ The Apache Software Foundation (http://www.apache.org/).
 This project bundles the following dependencies under BSD-3 License (https://opensource.org/licenses/BSD-3-Clause).
 See bundled license files for details.

-- com.google.protobuf:protobuf-java:3.21.7
+- com.google.protobuf:protobuf-java:4.32.1

flink-python/src/main/java/org/apache/flink/table/runtime/runners/python/beam/BeamTablePythonFunctionRunner.java

Lines changed: 5 additions & 5 deletions

@@ -29,7 +29,7 @@
 import org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner;
 import org.apache.flink.util.Preconditions;

-import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.GeneratedMessage;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.graph.TimerReference;

@@ -50,14 +50,14 @@ public class BeamTablePythonFunctionRunner extends BeamPythonFunctionRunner {
     /** The urn which represents the function kind to be executed. */
     private final String functionUrn;

-    private final GeneratedMessageV3 userDefinedFunctionProto;
+    private final GeneratedMessage userDefinedFunctionProto;

     public BeamTablePythonFunctionRunner(
             Environment environment,
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             String functionUrn,
-            GeneratedMessageV3 userDefinedFunctionProto,
+            GeneratedMessage userDefinedFunctionProto,
             FlinkMetricContainer flinkMetricContainer,
             KeyedStateBackend<?> keyedStateBackend,
             TypeSerializer<?> keySerializer,
@@ -124,7 +124,7 @@ public static BeamTablePythonFunctionRunner stateless(
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             String functionUrn,
-            GeneratedMessageV3 userDefinedFunctionProto,
+            GeneratedMessage userDefinedFunctionProto,
             FlinkMetricContainer flinkMetricContainer,
             MemoryManager memoryManager,
             double managedMemoryFraction,
@@ -151,7 +151,7 @@ public static BeamTablePythonFunctionRunner stateful(
             String taskName,
             ProcessPythonEnvironmentManager environmentManager,
             String functionUrn,
-            GeneratedMessageV3 userDefinedFunctionProto,
+            GeneratedMessage userDefinedFunctionProto,
             FlinkMetricContainer flinkMetricContainer,
             KeyedStateBackend<?> keyedStateBackend,
             TypeSerializer<?> keySerializer,
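For context on this change (illustration, not part of the diff): protobuf-java 4.x renamed the generated-message base class from GeneratedMessageV3 back to GeneratedMessage, so code typed against the old name stops compiling and the fix is a mechanical retype, as in the hunks above. A minimal sketch using the bundled well-known Empty message:

import com.google.protobuf.Empty;
import com.google.protobuf.GeneratedMessage;

/** Sketch of the base-class rename; not the runner's actual code. */
final class BaseClassRenameSketch {
    // protobuf 3.x: private final GeneratedMessageV3 userDefinedFunctionProto;
    // protobuf 4.x: the same field, retyped to the renamed base class.
    private final GeneratedMessage userDefinedFunctionProto;

    BaseClassRenameSketch() {
        // Any 4.x-generated message satisfies the field's type, e.g. the well-known Empty.
        this.userDefinedFunctionProto = Empty.getDefaultInstance();
    }
}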

flink-python/src/main/resources/META-INF/NOTICE

Lines changed: 1 addition & 1 deletion

@@ -34,7 +34,7 @@ This project bundles the following dependencies under the BSD license.
 See bundled license files for details

 - net.sf.py4j:py4j:0.10.9.7
-- com.google.protobuf:protobuf-java:3.21.7
+- com.google.protobuf:protobuf-java:4.32.1

 This project bundles the following dependencies under the MIT license. (https://opensource.org/licenses/MIT)
 See bundled license files for details.

pom.xml

Lines changed: 1 addition & 1 deletion

@@ -162,7 +162,7 @@ under the License.
         <assertj.version>3.27.3</assertj.version>
         <py4j.version>0.10.9.7</py4j.version>
         <beam.version>2.54.0</beam.version>
-        <protoc.version>3.21.7</protoc.version>
+        <protoc.version>4.32.1</protoc.version>
         <okhttp.version>3.14.9</okhttp.version>
         <testcontainers.version>1.20.2</testcontainers.version>
         <lz4.version>1.8.0</lz4.version>
