Skip to content

Commit f11b3f2

Browse files
committed
[FLINK-23600] Rework remote module component parsing and binding
This closes #247.
1 parent 9cbcf84 commit f11b3f2

File tree

130 files changed

+3587
-4664
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

130 files changed

+3587
-4664
lines changed

statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml

Lines changed: 26 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,42 +13,29 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
version: "3.0"
17-
18-
module:
19-
meta:
20-
type: remote
21-
spec:
22-
endpoints:
23-
- endpoint:
24-
meta:
25-
kind: http
26-
spec:
27-
functions: org.apache.flink.statefun.e2e.remote/*
28-
urlPathTemplate: http://remote-function:8000/service
29-
maxNumBatchRequests: 10000
30-
ingresses:
31-
- ingress:
32-
meta:
33-
type: io.statefun.kafka/ingress
34-
id: org.apache.flink.statefun.e2e.remote/invoke
35-
spec:
36-
address: kafka-broker:9092
37-
consumerGroupId: remote-module-e2e
38-
startupPosition:
39-
type: earliest
40-
topics:
41-
- topic: invoke
42-
valueType: statefun.e2e/org.apache.flink.statefun.e2e.remote.Invoke
43-
targets:
44-
- org.apache.flink.statefun.e2e.remote/counter
45-
egresses:
46-
- egress:
47-
meta:
48-
type: io.statefun.kafka/egress
49-
id: org.apache.flink.statefun.e2e.remote/invoke-results
50-
spec:
51-
address: kafka-broker:9092
52-
deliverySemantic:
53-
type: exactly-once
54-
transactionTimeout: 15min
16+
kind: io.statefun.endpoints.v2/http
17+
spec:
18+
functions: org.apache.flink.statefun.e2e.remote/*
19+
urlPathTemplate: http://remote-function:8000/service
20+
maxNumBatchRequests: 10000
21+
---
22+
kind: io.statefun.kafka.v1/ingress
23+
spec:
24+
id: org.apache.flink.statefun.e2e.remote/invoke
25+
address: kafka-broker:9092
26+
consumerGroupId: remote-module-e2e
27+
startupPosition:
28+
type: earliest
29+
topics:
30+
- topic: invoke
31+
valueType: statefun.e2e/org.apache.flink.statefun.e2e.remote.Invoke
32+
targets:
33+
- org.apache.flink.statefun.e2e.remote/counter
34+
---
35+
kind: io.statefun.kafka.v1/egress
36+
spec:
37+
id: org.apache.flink.statefun.e2e.remote/invoke-results
38+
address: kafka-broker:9092
39+
deliverySemantic:
40+
type: exactly-once
41+
transactionTimeout: 15min

statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,8 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
version: "3.0"
17-
18-
module:
19-
meta:
20-
type: remote
21-
spec:
22-
endpoints:
23-
- endpoint:
24-
meta:
25-
kind: http
26-
spec:
27-
functions: statefun.smoke.e2e/*
28-
urlPathTemplate: http://remote-function-host:8000
29-
maxNumBatchRequests: 10000
16+
kind: io.statefun.endpoints.v2/http
17+
spec:
18+
functions: statefun.smoke.e2e/command-interpreter-fn
19+
urlPathTemplate: http://remote-function-host:8000
20+
maxNumBatchRequests: 10000

statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/src/test/resources/module.yaml

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,8 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
version: "3.0"
17-
18-
module:
19-
meta:
20-
type: remote
21-
spec:
22-
endpoints:
23-
- endpoint:
24-
meta:
25-
kind: http
26-
spec:
27-
functions: statefun.smoke.e2e/command-interpreter-fn
28-
urlPathTemplate: http://localhost:8000
29-
maxNumBatchRequests: 10000
16+
kind: io.statefun.endpoints.v2/http
17+
spec:
18+
functions: statefun.smoke.e2e/command-interpreter-fn
19+
urlPathTemplate: http://localhost:8000
20+
maxNumBatchRequests: 10000

statefun-flink/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ under the License.
3232

3333
<modules>
3434
<module>statefun-flink-common</module>
35+
<module>statefun-flink-extensions</module>
3536
<module>statefun-flink-launcher</module>
3637
<module>statefun-flink-io</module>
3738
<module>statefun-flink-io-bundle</module>

statefun-flink/statefun-flink-common/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ under the License.
3434
</properties>
3535

3636
<dependencies>
37+
<dependency>
38+
<groupId>org.apache.flink</groupId>
39+
<artifactId>statefun-sdk-embedded</artifactId>
40+
<version>${project.version}</version>
41+
</dependency>
42+
3743
<!-- flink runtime -->
3844
<dependency>
3945
<groupId>org.apache.flink</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.statefun.flink.common.json;
20+
21+
import java.io.IOException;
22+
import java.time.Duration;
23+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
24+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
25+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
26+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
27+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
28+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
29+
import org.apache.flink.statefun.sdk.TypeName;
30+
import org.apache.flink.util.TimeUtils;
31+
32+
public final class StateFunObjectMapper {
33+
34+
public static ObjectMapper create() {
35+
final ObjectMapper mapper =
36+
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
37+
38+
final SimpleModule module = new SimpleModule("statefun");
39+
module.addDeserializer(Duration.class, new DurationJsonDeserializer());
40+
module.addDeserializer(TypeName.class, new TypeNameJsonDeserializer());
41+
42+
mapper.registerModule(module);
43+
return mapper;
44+
}
45+
46+
private static final class DurationJsonDeserializer extends JsonDeserializer<Duration> {
47+
@Override
48+
public Duration deserialize(
49+
JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
50+
return TimeUtils.parseDuration(jsonParser.getText());
51+
}
52+
}
53+
54+
private static final class TypeNameJsonDeserializer extends JsonDeserializer<TypeName> {
55+
@Override
56+
public TypeName deserialize(
57+
JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
58+
return TypeName.parseFrom(jsonParser.getText());
59+
}
60+
}
61+
}

statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/protobuf/FileDescriptorResolver.java

Lines changed: 0 additions & 89 deletions
This file was deleted.

statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/protobuf/ProtobufDescriptorMap.java

Lines changed: 0 additions & 100 deletions
This file was deleted.

0 commit comments

Comments
 (0)