Skip to content

Commit 20e6007

Browse files
authored
[BitSail][CDC] Add cdc source coordinator unit test (#407)
1 parent d575868 commit 20e6007

File tree

4 files changed

+142
-4
lines changed

4 files changed

+142
-4
lines changed

bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/coordinator/CDCSourceSplitCoordinator.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ public CDCSourceSplitCoordinator(SourceSplitCoordinator.Context<BinlogSplit, Bas
5858

5959
@Override
6060
public void start() {
61-
int totalReader = this.context.registeredReaders().size();
62-
LOG.info("Total registered reader number: {}", totalReader);
6361
if (!this.isBinlogAssigned) {
6462
List<BinlogSplit> splitList = new ArrayList<>();
6563
BinlogSplit split = createSplit(this.jobConf);

bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/offset/BinlogOffset.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ public static BinlogOffset specified() {
5757
}
5858

5959
public static BinlogOffset createFromJobConf(BitSailConfiguration jobConf) {
60-
String rawOffsetType = jobConf.getNecessaryOption(
61-
BinlogReaderOptions.INITIAL_OFFSET_TYPE, BinlogReaderErrorCode.REQUIRED_VALUE).toUpperCase().trim();
60+
String rawOffsetType = jobConf.get(BinlogReaderOptions.INITIAL_OFFSET_TYPE).toUpperCase().trim();
6261
BinlogOffsetType offsetType = BinlogOffsetType.valueOf(rawOffsetType);
6362
switch (offsetType) {
6463
case LATEST:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.cdc.source.coordinator;
18+
19+
import com.bytedance.bitsail.base.connector.reader.v1.SourceEvent;
20+
import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator;
21+
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
22+
import com.bytedance.bitsail.connector.cdc.source.coordinator.state.BaseAssignmentState;
23+
import com.bytedance.bitsail.connector.cdc.source.coordinator.state.BinlogAssignmentState;
24+
import com.bytedance.bitsail.connector.cdc.source.offset.BinlogOffsetType;
25+
import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit;
26+
27+
import org.junit.Assert;
28+
import org.junit.Test;
29+
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.Set;
34+
import java.util.concurrent.Callable;
35+
import java.util.function.BiConsumer;
36+
37+
public class CDCSourceSplitCoordinatorTests {
38+
@Test
39+
public void testStartNew() {
40+
BitSailConfiguration jobConf = BitSailConfiguration.newDefault();
41+
Map<Integer, List<BinlogSplit>> assigned = new HashMap<Integer, List<BinlogSplit>>();
42+
SourceSplitCoordinator.Context context = getContext(false, null, assigned);
43+
CDCSourceSplitCoordinator coordinator = new CDCSourceSplitCoordinator(context, jobConf);
44+
coordinator.start();
45+
Assert.assertEquals(1, assigned.get(0).size());
46+
Assert.assertEquals(BinlogOffsetType.EARLIEST, assigned.get(0).get(0).getBeginOffset().getOffsetType());
47+
BinlogAssignmentState state = (BinlogAssignmentState) coordinator.snapshotState();
48+
Assert.assertTrue(state.isAssigned());
49+
}
50+
51+
@Test
52+
public void testRestore() {
53+
BitSailConfiguration jobConf = BitSailConfiguration.newDefault();
54+
Map<Integer, List<BinlogSplit>> assigned = new HashMap<Integer, List<BinlogSplit>>();
55+
BinlogAssignmentState state = new BinlogAssignmentState(true);
56+
SourceSplitCoordinator.Context context = getContext(true, state, assigned);
57+
CDCSourceSplitCoordinator coordinator = new CDCSourceSplitCoordinator(context, jobConf);
58+
coordinator.start();
59+
// no split was assigned
60+
Assert.assertEquals(0, assigned.size());
61+
BinlogAssignmentState newState = (BinlogAssignmentState) coordinator.snapshotState();
62+
Assert.assertTrue(newState.isAssigned());
63+
}
64+
65+
SourceSplitCoordinator.Context<BinlogSplit, BaseAssignmentState> getContext(
66+
boolean isRestore, BaseAssignmentState restoredState, Map<Integer, List<BinlogSplit>> assigned) {
67+
SourceSplitCoordinator.Context<BinlogSplit, BaseAssignmentState> context = new SourceSplitCoordinator.Context<BinlogSplit, BaseAssignmentState>() {
68+
@Override
69+
public boolean isRestored() {
70+
return isRestore;
71+
}
72+
73+
@Override
74+
public BaseAssignmentState getRestoreState() {
75+
return restoredState;
76+
}
77+
78+
@Override
79+
public int totalParallelism() {
80+
return 1;
81+
}
82+
83+
@Override
84+
public Set<Integer> registeredReaders() {
85+
return null;
86+
}
87+
88+
@Override
89+
public void assignSplit(int subtaskId, List splits) {
90+
assigned.put(subtaskId, splits);
91+
}
92+
93+
@Override
94+
public void signalNoMoreSplits(int subtask) {
95+
96+
}
97+
98+
@Override
99+
public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
100+
101+
}
102+
103+
@Override
104+
public void runAsync(Callable callable, BiConsumer handler, int initialDelay, long interval) {
105+
106+
}
107+
108+
@Override
109+
public void runAsyncOnce(Callable callable, BiConsumer handler) {
110+
111+
}
112+
};
113+
return context;
114+
}
115+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.bytedance.bitsail.connector.cdc.mysql.source.reader;
18+
19+
import org.junit.Test;
20+
21+
public class MysqlBinlogSourceReaderTest {
22+
@Test
23+
public void testReader() {
24+
25+
}
26+
}

0 commit comments

Comments
 (0)