Skip to content

Commit 7b0a882

Browse files
[FLINK-38564][connector] FLIP-537: Enumerator maintains global splits distribution for reassignment
[FLINK-38564][connector] FLIP-537: Enumerator maintains global splits distribution for reassignment. This closes #27149.
1 parent bc84294 commit 7b0a882

File tree

16 files changed

+424
-89
lines changed

16 files changed

+424
-89
lines changed

flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,44 @@
2121
import org.apache.flink.annotation.Public;
2222

2323
import java.io.Serializable;
24+
import java.util.Collections;
25+
import java.util.List;
2426
import java.util.Objects;
2527

26-
/** A container class hosting the information of a {@link SourceReader}. */
28+
/**
29+
* A container class hosting the information of a {@link SourceReader}.
30+
*
31+
* <p>The {@code reportedSplitsOnRegistration} can only be provided when the source implements
32+
* {@link SupportsSplitReassignmentOnRecovery}.
33+
*/
2734
@Public
2835
public final class ReaderInfo implements Serializable {
2936

3037
private static final long serialVersionUID = 1L;
3138

3239
private final int subtaskId;
3340
private final String location;
41+
private final List<SourceSplit> reportedSplitsOnRegistration;
3442

3543
public ReaderInfo(int subtaskId, String location) {
44+
this(subtaskId, location, Collections.emptyList());
45+
}
46+
47+
ReaderInfo(int subtaskId, String location, List<SourceSplit> splits) {
3648
this.subtaskId = subtaskId;
3749
this.location = location;
50+
this.reportedSplitsOnRegistration = splits;
51+
}
52+
53+
/**
 * Creates a {@link ReaderInfo} for the given subtask id and location that also carries the given
 * list of splits.
 *
 * <p>The list is passed to the constructor as-is (no copy is made here) after an unchecked cast
 * from {@code List<SplitT>} to {@code List<SourceSplit>}.
 *
 * @param subtaskId the subtask id stored in the created {@link ReaderInfo}.
 * @param location the location stored in the created {@link ReaderInfo}.
 * @param splits the splits stored as the reported splits on registration.
 * @return a new {@link ReaderInfo} holding the three values.
 */
@SuppressWarnings("unchecked")
54+
public static <SplitT extends SourceSplit> ReaderInfo createReaderInfo(
55+
int subtaskId, String location, List<SplitT> splits) {
56+
return new ReaderInfo(subtaskId, location, (List<SourceSplit>) splits);
57+
}
58+
59+
/**
 * Returns the list of splits this {@link ReaderInfo} was constructed with. The two-argument
 * constructor supplies an empty list.
 *
 * <p>The element type {@code SplitT} is chosen by the caller and the cast is unchecked: nothing
 * in this method verifies that the stored splits are of that type. The stored list itself is
 * returned, not a copy.
 *
 * @return the stored splits, viewed as {@code List<SplitT>}.
 */
@SuppressWarnings("unchecked")
60+
public <SplitT extends SourceSplit> List<SplitT> getReportedSplitsOnRegistration() {
61+
return (List<SplitT>) reportedSplitsOnRegistration;
3862
}
3963

4064
/**
@@ -52,16 +76,18 @@ public String getLocation() {
5276
}
5377

5478
@Override
55-
public int hashCode() {
56-
return Objects.hash(subtaskId, location);
79+
public boolean equals(Object o) {
80+
if (!(o instanceof ReaderInfo)) {
81+
return false;
82+
}
83+
ReaderInfo that = (ReaderInfo) o;
84+
return subtaskId == that.subtaskId
85+
&& Objects.equals(location, that.location)
86+
&& Objects.equals(reportedSplitsOnRegistration, that.reportedSplitsOnRegistration);
5787
}
5888

5989
@Override
60-
public boolean equals(Object obj) {
61-
if (!(obj instanceof ReaderInfo)) {
62-
return false;
63-
}
64-
ReaderInfo other = (ReaderInfo) obj;
65-
return subtaskId == other.subtaskId && location.equals(other.location);
90+
public int hashCode() {
91+
return Objects.hash(subtaskId, location, reportedSplitsOnRegistration);
6692
}
6793
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.connector.source;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
23+
/**
24+
* A decorative interface for {@link Source}. Implementing this interface indicates that the source
25+
* operator needs to report splits to the enumerator on start up and receive reassignment on
26+
* recovery.
27+
*/
28+
@PublicEvolving
29+
public interface SupportsSplitReassignmentOnRecovery {}

flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java

Lines changed: 54 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.api.connector.source.mocks;
2020

21+
import org.apache.flink.api.connector.source.ReaderInfo;
2122
import org.apache.flink.api.connector.source.SourceEvent;
2223
import org.apache.flink.api.connector.source.SplitEnumerator;
2324
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@@ -28,20 +29,20 @@
2829

2930
import java.io.IOException;
3031
import java.util.ArrayList;
32+
import java.util.Collection;
3133
import java.util.Collections;
32-
import java.util.Comparator;
3334
import java.util.HashMap;
3435
import java.util.HashSet;
3536
import java.util.List;
3637
import java.util.Map;
3738
import java.util.Set;
38-
import java.util.SortedSet;
39-
import java.util.TreeSet;
39+
import java.util.stream.Collectors;
4040

4141
/** A mock {@link SplitEnumerator} for unit tests. */
4242
public class MockSplitEnumerator
4343
implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>>, SupportsBatchSnapshot {
44-
private final SortedSet<MockSourceSplit> unassignedSplits;
44+
private final Map<Integer, Set<MockSourceSplit>> pendingSplitAssignment;
45+
private final Map<String, Integer> globalSplitAssignment;
4546
private final SplitEnumeratorContext<MockSourceSplit> enumContext;
4647
private final List<SourceEvent> handledSourceEvent;
4748
private final List<Long> successfulCheckpoints;
@@ -50,22 +51,24 @@ public class MockSplitEnumerator
5051

5152
public MockSplitEnumerator(int numSplits, SplitEnumeratorContext<MockSourceSplit> enumContext) {
5253
this(new HashSet<>(), enumContext);
54+
List<MockSourceSplit> unassignedSplits = new ArrayList<>();
5355
for (int i = 0; i < numSplits; i++) {
5456
unassignedSplits.add(new MockSourceSplit(i));
5557
}
58+
recalculateAssignments(unassignedSplits);
5659
}
5760

5861
public MockSplitEnumerator(
5962
Set<MockSourceSplit> unassignedSplits,
6063
SplitEnumeratorContext<MockSourceSplit> enumContext) {
61-
this.unassignedSplits =
62-
new TreeSet<>(Comparator.comparingInt(o -> Integer.parseInt(o.splitId())));
63-
this.unassignedSplits.addAll(unassignedSplits);
64+
this.pendingSplitAssignment = new HashMap<>();
65+
this.globalSplitAssignment = new HashMap<>();
6466
this.enumContext = enumContext;
6567
this.handledSourceEvent = new ArrayList<>();
6668
this.successfulCheckpoints = new ArrayList<>();
6769
this.started = false;
6870
this.closed = false;
71+
recalculateAssignments(unassignedSplits);
6972
}
7073

7174
@Override
@@ -83,25 +86,36 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
8386

8487
@Override
8588
public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
86-
unassignedSplits.addAll(splits);
89+
// add back to same subtaskId.
90+
putPendingAssignments(subtaskId, splits);
8791
}
8892

8993
@Override
9094
public void addReader(int subtaskId) {
91-
List<MockSourceSplit> assignment = new ArrayList<>();
92-
for (MockSourceSplit split : unassignedSplits) {
93-
if (Integer.parseInt(split.splitId()) % enumContext.currentParallelism() == subtaskId) {
94-
assignment.add(split);
95+
ReaderInfo readerInfo = enumContext.registeredReaders().get(subtaskId);
96+
List<MockSourceSplit> splitsOnRecovery = readerInfo.getReportedSplitsOnRegistration();
97+
98+
List<MockSourceSplit> redistributedSplits = new ArrayList<>();
99+
List<MockSourceSplit> addBackSplits = new ArrayList<>();
100+
for (MockSourceSplit split : splitsOnRecovery) {
101+
if (!globalSplitAssignment.containsKey(split.splitId())) {
102+
// if the split is not present in globalSplitAssignment, it means that this split is
103+
// being registered for the first time and is eligible for redistribution.
104+
redistributedSplits.add(split);
105+
} else if (globalSplitAssignment.get(split.splitId()) == subtaskId) {
106+
// the split is already tracked: add it back only if this sub-task is still its owner;
107+
// if it has been assigned to another sub-task, just ignore it.
108+
addBackSplits.add(split);
95109
}
96110
}
97-
enumContext.assignSplits(
98-
new SplitsAssignment<>(Collections.singletonMap(subtaskId, assignment)));
99-
unassignedSplits.removeAll(assignment);
111+
recalculateAssignments(redistributedSplits);
112+
putPendingAssignments(subtaskId, addBackSplits);
113+
assignAllSplits();
100114
}
101115

102116
@Override
103117
public Set<MockSourceSplit> snapshotState(long checkpointId) {
104-
return unassignedSplits;
118+
return getUnassignedSplits();
105119
}
106120

107121
@Override
@@ -114,11 +128,6 @@ public void close() throws IOException {
114128
this.closed = true;
115129
}
116130

117-
public void addNewSplits(List<MockSourceSplit> newSplits) {
118-
unassignedSplits.addAll(newSplits);
119-
assignAllSplits();
120-
}
121-
122131
// --------------------
123132

124133
public boolean started() {
@@ -130,7 +139,9 @@ public boolean closed() {
130139
}
131140

132141
public Set<MockSourceSplit> getUnassignedSplits() {
133-
return unassignedSplits;
142+
return pendingSplitAssignment.values().stream()
143+
.flatMap(Set::stream)
144+
.collect(Collectors.toSet());
134145
}
135146

136147
public List<SourceEvent> getHandledSourceEvent() {
@@ -145,17 +156,27 @@ public List<Long> getSuccessfulCheckpoints() {
145156

146157
private void assignAllSplits() {
147158
Map<Integer, List<MockSourceSplit>> assignment = new HashMap<>();
148-
unassignedSplits.forEach(
149-
split -> {
150-
int subtaskId =
151-
Integer.parseInt(split.splitId()) % enumContext.currentParallelism();
152-
if (enumContext.registeredReaders().containsKey(subtaskId)) {
153-
assignment
154-
.computeIfAbsent(subtaskId, ignored -> new ArrayList<>())
155-
.add(split);
156-
}
157-
});
159+
for (Map.Entry<Integer, Set<MockSourceSplit>> iter : pendingSplitAssignment.entrySet()) {
160+
Integer subtaskId = iter.getKey();
161+
if (enumContext.registeredReaders().containsKey(subtaskId)) {
162+
assignment.put(subtaskId, new ArrayList<>(iter.getValue()));
163+
}
164+
}
158165
enumContext.assignSplits(new SplitsAssignment<>(assignment));
159-
assignment.values().forEach(l -> unassignedSplits.removeAll(l));
166+
assignment.keySet().forEach(pendingSplitAssignment::remove);
167+
}
168+
169+
/**
 * Routes each of the given splits to a sub-task and records it as pending for that sub-task.
 *
 * <p>The target sub-task is the split id parsed as an integer, modulo the current parallelism
 * reported by the enumerator context, so split ids are expected to be numeric strings.
 *
 * @param newSplits the splits to distribute.
 */
private void recalculateAssignments(Collection<MockSourceSplit> newSplits) {
170+
for (MockSourceSplit split : newSplits) {
171+
int subtaskId = Integer.parseInt(split.splitId()) % enumContext.currentParallelism();
172+
putPendingAssignments(subtaskId, Collections.singletonList(split));
173+
}
174+
}
175+
176+
/**
 * Adds the given splits to the pending set of {@code subtaskId} and records that sub-task as
 * their owner in the global split assignment.
 *
 * @param subtaskId the sub-task the splits are pending for.
 * @param splits the splits to record.
 */
private void putPendingAssignments(int subtaskId, Collection<MockSourceSplit> splits) {
177+
Set<MockSourceSplit> pendingSplits =
178+
pendingSplitAssignment.computeIfAbsent(subtaskId, ignored -> new HashSet<>());
179+
pendingSplits.addAll(splits);
180+
splits.forEach(split -> globalSplitAssignment.put(split.splitId(), subtaskId));
160181
}
161182
}

flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
113113
/** The Source that is associated with this SourceCoordinator. */
114114
private final Source<?, SplitT, EnumChkT> source;
115115

116+
/** The serializer that handles the serde of the split. */
117+
private final SimpleVersionedSerializer<SplitT> splitSerializer;
118+
116119
/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
117120
private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
118121

@@ -163,6 +166,7 @@ public SourceCoordinator(
163166
this.operatorName = operatorName;
164167
this.source = source;
165168
this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
169+
this.splitSerializer = source.getSplitSerializer();
166170
this.context = context;
167171
this.coordinatorStore = coordinatorStore;
168172
this.watermarkAlignmentParams = watermarkAlignmentParams;
@@ -427,7 +431,7 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> r
427431
// assignments
428432
byte[] assignmentData =
429433
context.getAssignmentTracker()
430-
.snapshotState(source.getSplitSerializer());
434+
.snapshotState(splitSerializer);
431435
out.writeInt(assignmentData.length);
432436
out.write(assignmentData);
433437

@@ -680,7 +684,7 @@ private void handleSourceEvent(int subtask, int attemptNumber, SourceEvent event
680684
}
681685

682686
private void handleReaderRegistrationEvent(
683-
int subtask, int attemptNumber, ReaderRegistrationEvent event) {
687+
int subtask, int attemptNumber, ReaderRegistrationEvent event) throws Exception {
684688
checkArgument(subtask == event.subtaskId());
685689

686690
LOG.info(
@@ -692,7 +696,8 @@ private void handleReaderRegistrationEvent(
692696

693697
final boolean subtaskReaderExisted =
694698
context.registeredReadersOfAttempts().containsKey(subtask);
695-
context.registerSourceReader(subtask, attemptNumber, event.location());
699+
context.registerSourceReader(
700+
subtask, attemptNumber, event.location(), event.splits(splitSerializer));
696701
if (!subtaskReaderExisted) {
697702
enumerator.addReader(event.subtaskId());
698703

flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,16 +465,18 @@ void onCheckpoint(long checkpointId) throws Exception {
465465
* @param subtaskId the subtask id of the source reader.
466466
* @param attemptNumber the attempt number of the source reader.
467467
* @param location the location of the source reader.
468+
* @param splits the splits restored from the source reader's state.
468469
*/
469-
void registerSourceReader(int subtaskId, int attemptNumber, String location) {
470+
void registerSourceReader(
471+
int subtaskId, int attemptNumber, String location, List<SplitT> splits) {
470472
final Map<Integer, ReaderInfo> attemptReaders =
471473
registeredReaders.computeIfAbsent(subtaskId, k -> new ConcurrentHashMap<>());
472474
checkState(
473475
!attemptReaders.containsKey(attemptNumber),
474476
"ReaderInfo of subtask %s (#%s) already exists.",
475477
subtaskId,
476478
attemptNumber);
477-
attemptReaders.put(attemptNumber, new ReaderInfo(subtaskId, location));
479+
attemptReaders.put(attemptNumber, ReaderInfo.createReaderInfo(subtaskId, location, splits));
478480

479481
sendCachedSplitsToNewlyRegisteredReader(subtaskId, attemptNumber);
480482
}

0 commit comments

Comments
 (0)