Skip to content

Commit

Permalink
[tests][ci] Miscellaneous improvements on CI robustness
Browse files Browse the repository at this point in the history
This closes #3911
  • Loading branch information
yuxiqian authored Feb 11, 2025
1 parent 6686bcb commit 82bf8a0
Show file tree
Hide file tree
Showing 23 changed files with 504 additions and 240 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/flink_cdc_base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 120
strategy:
fail-fast: false
matrix:
java-version: ${{ fromJSON(inputs.java-version) }}
flink-version: ${{ fromJSON(inputs.flink-version) }}
Expand Down Expand Up @@ -148,7 +149,7 @@ jobs:
maven-version: 3.8.6

- name: Compile and test
timeout-minutes: 90
timeout-minutes: 60
run: |
set -o pipefail
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/flink_cdc_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ jobs:
run: gem install rubyzip -v 2.3.0 && ./tools/ci/license_check.rb
ut:
strategy:
fail-fast: false
matrix:
module: [ 'core', 'pipeline_connectors', 'mysql', 'postgres', 'oracle', 'mongodb6', 'mongodb7', 'sqlserver', 'tidb', 'oceanbase', 'db2', 'vitess' ]
name: Unit Tests
Expand All @@ -68,6 +69,7 @@ jobs:
module: ${{ matrix.module }}
pipeline_e2e:
strategy:
fail-fast: false
matrix:
parallelism: [ 1, 4 ]
name: Pipeline E2E Tests (${{ matrix.parallelism }}-Parallelism)
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/flink_cdc_ci_nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ jobs:
run: gem install rubyzip -v 2.3.0 && ./tools/ci/license_check.rb
ut:
strategy:
fail-fast: false
matrix:
module: [ 'core', 'pipeline_connectors', 'mysql', 'postgres', 'oracle', 'mongodb6', 'mongodb7', 'sqlserver', 'tidb', 'oceanbase', 'db2', 'vitess' ]
name: Unit Tests
Expand All @@ -57,6 +58,7 @@ jobs:
module: ${{ matrix.module }}
pipeline_e2e:
strategy:
fail-fast: false
matrix:
parallelism: [ 1, 4 ]
name: Pipeline E2E Tests (${{ matrix.parallelism }} Parallelism)
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/flink_cdc_migration_test_base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ jobs:
migration_test_ut:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
java-version: ${{ fromJSON(inputs.java-version) }}
flink-version: ${{ fromJSON(inputs.flink-version) }}
Expand All @@ -52,6 +53,7 @@ jobs:
pipeline_migration_test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
java-version: ${{ fromJSON(inputs.java-version) }}
flink-version: ${{ fromJSON(inputs.flink-version) }}
Expand Down Expand Up @@ -91,6 +93,7 @@ jobs:
data_stream_migration_test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
java-version: ${{ fromJSON(inputs.java-version) }}
flink-version: [ '1.19.1', '1.20.0' ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,6 @@ limitations under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
<scope>test</scope>
</dependency>

<!-- test dependencies on Flink -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.assertj.core.api.Assertions;
import org.junit.Test;

import java.io.IOException;
Expand All @@ -46,8 +49,6 @@
import java.util.Map;
import java.util.Objects;

import static org.junit.Assert.assertTrue;

/** Integration tests for the legacy {@link MySqlSource}. */
public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase {

Expand Down Expand Up @@ -99,9 +100,9 @@ private void testConsumingAllEventsWithJsonFormat(
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());

final JSONObject expected =
JSONObject.parseObject(readLines(expectedFile), JSONObject.class);
JSONObject expectSnapshot = expected.getJSONObject("expected_snapshot");
final JsonNode expected =
new ObjectMapper().readValue(readLines(expectedFile), JsonNode.class);
JsonNode expectSnapshot = expected.get("expected_snapshot");

DataStreamSource<String> source = env.addSource(sourceFunction);
tEnv.createTemporaryView("full_types", source);
Expand All @@ -110,9 +111,8 @@ private void testConsumingAllEventsWithJsonFormat(
// check the snapshot result
CloseableIterator<Row> snapshot = result.collect();
waitForSnapshotStarted(snapshot);
assertTrue(
dataInJsonIsEquals(
fetchRows(snapshot, 1).get(0).toString(), expectSnapshot.toString()));

assertJsonEquals(extractJsonBody(snapshot.next()), expectSnapshot);
try (Connection connection = fullTypesDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
Expand All @@ -121,10 +121,8 @@ private void testConsumingAllEventsWithJsonFormat(

// check the binlog result
CloseableIterator<Row> binlog = result.collect();
JSONObject expectBinlog = expected.getJSONObject("expected_binlog");
assertTrue(
dataInJsonIsEquals(
fetchRows(binlog, 1).get(0).toString(), expectBinlog.toString()));
JsonNode expectBinlog = expected.get("expected_binlog");
assertJsonEquals(extractJsonBody(binlog.next()), expectBinlog);
result.getJobClient().get().cancel().get();
}

Expand Down Expand Up @@ -164,25 +162,23 @@ private static byte[] readLines(String resource) throws IOException, URISyntaxEx
return Files.readAllBytes(path);
}

private static boolean dataInJsonIsEquals(String actual, String expect) {
JSONObject actualJsonObject = JSONObject.parseObject(actual);
JSONObject expectJsonObject = JSONObject.parseObject(expect);

if (expectJsonObject.getJSONObject("payload") != null
&& actualJsonObject.getJSONObject("payload") != null) {
expectJsonObject = expectJsonObject.getJSONObject("payload");
actualJsonObject = actualJsonObject.getJSONObject("payload");
private static void assertJsonEquals(JsonNode actual, JsonNode expect) throws Exception {
if (actual.get("payload") != null && expect.get("payload") != null) {
actual = actual.get("payload");
expect = expect.get("payload");
}
return jsonObjectEquals(
expectJsonObject.getJSONObject("after"),
actualJsonObject.getJSONObject("after"))
&& jsonObjectEquals(
expectJsonObject.getJSONObject("before"),
actualJsonObject.getJSONObject("before"))
&& Objects.equals(expectJsonObject.get("op"), actualJsonObject.get("op"));
Assertions.assertThat(actual.get("after")).isEqualTo(expect.get("after"));
Assertions.assertThat(actual.get("before")).isEqualTo(expect.get("before"));
Assertions.assertThat(actual.get("op")).isEqualTo(expect.get("op"));
}

private static boolean jsonObjectEquals(JSONObject a, JSONObject b) {
return (a == b) || (a != null && a.toString().equals(b.toString()));
private static JsonNode extractJsonBody(Row row) {
try {
String body = row.toString();
return new ObjectMapper()
.readValue(body.substring(3, body.length() - 1), JsonNode.class);
} catch (JsonProcessingException e) {
throw new RuntimeException("Invalid JSON format.", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ public void testCharset() throws Exception {
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
testName,
HOST_NAME,
PORT,
getHost(),
getPort(),
USER_NAME,
PASSWORD,
DATABASE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public void testSingleKey() throws Exception {
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
+ ")",
HOST_NAME,
PORT,
getHost(),
getPort(),
USER_NAME,
PASSWORD,
DATABASE,
Expand Down Expand Up @@ -234,8 +234,8 @@ public void testFullTypesDdl() throws Exception {
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
+ ")",
HOST_NAME,
PORT,
getHost(),
getPort(),
USER_NAME,
PASSWORD,
DATABASE,
Expand Down Expand Up @@ -301,8 +301,8 @@ public void testMultiKeys() throws Exception {
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
+ ")",
HOST_NAME,
PORT,
getHost(),
getPort(),
USER_NAME,
PASSWORD,
DATABASE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@

package org.apache.flink.cdc.connectors.polardbx;

import org.apache.flink.cdc.common.utils.TestCaseUtils;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;

import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.apache.commons.lang3.StringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -64,36 +62,37 @@
public abstract class PolardbxSourceTestBase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(PolardbxSourceTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
protected static final Integer PORT = 8527;
protected static final String HOST_NAME = "127.0.0.1";
protected static final String USER_NAME = "polardbx_root";
protected static final String PASSWORD = "123456";

private static final String IMAGE_VERSION = "2.1.0";
private static final DockerImageName POLARDBX_IMAGE =
DockerImageName.parse("polardbx/polardb-x:" + IMAGE_VERSION);

protected static final Integer INNER_PORT = 8527;
protected static final String USER_NAME = "polardbx_root";
protected static final String PASSWORD = "123456";
protected static final Duration WAITING_TIMEOUT = Duration.ofMinutes(1);

protected static final GenericContainer POLARDBX_CONTAINER =
new GenericContainer<>(POLARDBX_IMAGE)
.withExposedPorts(PORT)
.withExposedPorts(INNER_PORT)
.withLogConsumer(new Slf4jLogConsumer(LOG))
.withStartupTimeout(Duration.ofMinutes(3))
.withCreateContainerCmdModifier(
c ->
c.withPortBindings(
new PortBinding(
Ports.Binding.bindPort(PORT),
new ExposedPort(PORT))));
.withStartupTimeout(Duration.ofMinutes(3));

protected static String getHost() {
return POLARDBX_CONTAINER.getHost();
}

protected static int getPort() {
return POLARDBX_CONTAINER.getMappedPort(INNER_PORT);
}

@BeforeClass
public static void startContainers() throws InterruptedException {
// no need to start container when the port 8527 is listening
if (!checkConnection()) {
LOG.info("Polardbx connection is not valid, so try to start containers...");
Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
LOG.info("Containers are started.");
// here should wait 10s that make sure the polardbx is ready
Thread.sleep(10 * 1000);
}
public static void startContainers() {
Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
LOG.info("Containers are started.");

TestCaseUtils.repeatedCheck(
PolardbxSourceTestBase::checkConnection, WAITING_TIMEOUT, Duration.ofSeconds(1));
}

@AfterClass
Expand All @@ -104,7 +103,7 @@ public static void stopContainers() {
}

protected static String getJdbcUrl() {
return String.format("jdbc:mysql://%s:%s", HOST_NAME, PORT);
return String.format("jdbc:mysql://%s:%s", getHost(), getPort());
}

protected static Connection getJdbcConnection() throws SQLException {
Expand Down
Loading

0 comments on commit 82bf8a0

Please sign in to comment.