Skip to content

Commit b06c0a2

Browse files
authored
[FLINK-31619] Upgrade Stateful Functions to Flink 1.16.2. This closes #331
* [FLINK-31619] Upgrade Stateful Functions to Flink 1.16.1 Upgrade Flink version to 1.16.2 (this has been released since the issue was created) and update dependency-confict resolutions. Handle org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders moving to org.apache.flink.util.FlinkUserCodeClassLoaders. Fix ReductionsTest.java. Modify construction of Netty request headers
1 parent 4b1e0ff commit b06c0a2

File tree

5 files changed

+24
-50
lines changed

5 files changed

+24
-50
lines changed

pom.xml

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ under the License.
7979
<protobuf.version>3.7.1</protobuf.version>
8080
<unixsocket.version>2.3.2</unixsocket.version>
8181
<protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version>
82-
<flink.version>1.15.2</flink.version>
82+
<flink.version>1.16.2</flink.version>
8383
<scala.binary.version>2.12</scala.binary.version>
8484
<scala.version>2.12.7</scala.version>
8585
<lz4-java.version>1.8.0</lz4-java.version>
@@ -110,26 +110,27 @@ under the License.
110110
<version>1.3</version>
111111
<scope>test</scope>
112112
</dependency>
113+
113114
<!--
114115
Resolve dependency convergence issue:
115-
flink-core:1.15.2 depends on kryo:2.24.0
116-
flink-java:1.15.2 depends on kryo:2.21 (via com.twitter:chill-java:0.7.6)
117-
-->
116+
flink-core:1.16.2 depends on kryo:2.24.0
117+
flink-java:1.16.2 depends on kryo:2.21 (via com.twitter:chill-java:0.7.6)
118+
-->
118119
<dependency>
119120
<groupId>com.esotericsoftware.kryo</groupId>
120121
<artifactId>kryo</artifactId>
121122
<version>2.24.0</version>
122123
</dependency>
123124
<!--
124125
Resolve dependency convergence issue:
125-
flink-connector-kinesis:1.15.2 depends on jackson-databind:2.13.2.2
126-
flink-connector-kinesis:1.15.2 depends on jackson-databind:2.13.2
127-
(via com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.2)
126+
flink-connector-kinesis:1.16.2 depends on jackson-databind:2.13.4.2
127+
flink-connector-kinesis:1.16.2 depends on jackson-databind:2.13.4
128+
(via com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.4)
128129
-->
129130
<dependency>
130131
<groupId>com.fasterxml.jackson.core</groupId>
131132
<artifactId>jackson-databind</artifactId>
132-
<version>2.13.2.2</version>
133+
<version>2.13.4.2</version>
133134
</dependency>
134135
</dependencies>
135136
</dependencyManagement>

statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
import java.util.concurrent.atomic.AtomicInteger;
2424
import org.apache.flink.api.java.utils.ParameterTool;
2525
import org.apache.flink.configuration.Configuration;
26-
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
2726
import org.apache.flink.statefun.flink.core.feedback.FeedbackKey;
2827
import org.apache.flink.statefun.flink.core.message.Message;
2928
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
3029
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3130
import org.apache.flink.util.FlinkUserCodeClassLoader;
31+
import org.apache.flink.util.FlinkUserCodeClassLoaders;
3232

3333
public class StatefulFunctionsJob {
3434

statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequestReplyHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ private DefaultHttpHeaders headers(NettyRequest req, ByteBuf bodyBuf) {
132132
if (cachedHeaders != null) {
133133
headers = cachedHeaders;
134134
} else {
135-
headers = new DefaultHttpHeaders(false);
135+
headers = new DefaultHttpHeaders();
136136
headers.add(req.headers());
137137
this.cachedHeaders = headers;
138138
}

statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java

Lines changed: 12 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,54 +21,24 @@
2121
import static org.junit.Assert.assertThat;
2222

2323
import java.io.Serializable;
24-
import java.util.Collection;
25-
import java.util.HashMap;
26-
import java.util.Iterator;
27-
import java.util.List;
28-
import java.util.Map;
24+
import java.util.*;
2925
import java.util.Map.Entry;
30-
import java.util.Set;
3126
import java.util.stream.Stream;
3227
import javax.annotation.Nonnull;
3328
import org.apache.flink.api.common.ExecutionConfig;
3429
import org.apache.flink.api.common.JobID;
35-
import org.apache.flink.api.common.accumulators.Accumulator;
36-
import org.apache.flink.api.common.accumulators.DoubleCounter;
30+
import org.apache.flink.api.common.accumulators.*;
3731
import org.apache.flink.api.common.accumulators.Histogram;
38-
import org.apache.flink.api.common.accumulators.IntCounter;
39-
import org.apache.flink.api.common.accumulators.LongCounter;
4032
import org.apache.flink.api.common.cache.DistributedCache;
4133
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
4234
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
4335
import org.apache.flink.api.common.functions.RuntimeContext;
44-
import org.apache.flink.api.common.state.AggregatingState;
45-
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
46-
import org.apache.flink.api.common.state.ListState;
47-
import org.apache.flink.api.common.state.ListStateDescriptor;
48-
import org.apache.flink.api.common.state.MapState;
49-
import org.apache.flink.api.common.state.MapStateDescriptor;
50-
import org.apache.flink.api.common.state.ReducingState;
51-
import org.apache.flink.api.common.state.ReducingStateDescriptor;
52-
import org.apache.flink.api.common.state.State;
53-
import org.apache.flink.api.common.state.StateDescriptor;
54-
import org.apache.flink.api.common.state.ValueState;
55-
import org.apache.flink.api.common.state.ValueStateDescriptor;
36+
import org.apache.flink.api.common.state.*;
5637
import org.apache.flink.api.common.typeutils.TypeSerializer;
5738
import org.apache.flink.api.java.tuple.Tuple2;
58-
import org.apache.flink.metrics.CharacterFilter;
59-
import org.apache.flink.metrics.Counter;
60-
import org.apache.flink.metrics.Gauge;
61-
import org.apache.flink.metrics.Meter;
62-
import org.apache.flink.metrics.MetricGroup;
63-
import org.apache.flink.metrics.SimpleCounter;
39+
import org.apache.flink.metrics.*;
6440
import org.apache.flink.metrics.groups.OperatorMetricGroup;
65-
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
66-
import org.apache.flink.runtime.state.Keyed;
67-
import org.apache.flink.runtime.state.KeyedStateBackend;
68-
import org.apache.flink.runtime.state.KeyedStateFunction;
69-
import org.apache.flink.runtime.state.PriorityComparable;
70-
import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
71-
import org.apache.flink.runtime.state.VoidNamespace;
41+
import org.apache.flink.runtime.state.*;
7242
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
7343
import org.apache.flink.runtime.state.internal.InternalListState;
7444
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.MoreExecutors;
@@ -352,10 +322,13 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<Object> liste
352322

353323
@Nonnull
354324
@Override
355-
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
356-
@Nonnull TypeSerializer<N> namespaceSerializer,
357-
@Nonnull StateDescriptor<S, SV> stateDesc,
358-
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) {
325+
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
326+
@Nonnull TypeSerializer<N> typeSerializer,
327+
@Nonnull StateDescriptor<S, SV> stateDescriptor,
328+
@Nonnull
329+
StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
330+
stateSnapshotTransformFactory)
331+
throws Exception {
359332
throw new UnsupportedOperationException();
360333
}
361334

tools/docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
FROM apache/flink:1.15.2-scala_2.12-java8
16+
FROM apache/flink:1.16.2-scala_2.12-java8
1717

1818
ENV ROLE worker
1919
ENV MASTER_HOST localhost

0 commit comments

Comments
 (0)