Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tests][ci] Miscellaneous improvements on CI robustness #3911

Merged
merged 2 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/flink_cdc_base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 120
strategy:
fail-fast: false
matrix:
java-version: ${{ fromJSON(inputs.java-version) }}
flink-version: ${{ fromJSON(inputs.flink-version) }}
Expand Down Expand Up @@ -148,7 +149,7 @@ jobs:
maven-version: 3.8.6

- name: Compile and test
timeout-minutes: 90
timeout-minutes: 60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[INFO] Running org.apache.flink.cdc.connectors.polardbx.PolardbxCharsetITCase
Error: The action 'Compile and test' has timed out after 60 minutes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PolardbxCharsetITCase was started at ~16:27 and timed out eventually at 16:49. Locally this case wouldn't take more than 3 minutes to finish, so it might be a bug that should be closed by #3902.

run: |
set -o pipefail

Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/flink_cdc_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ jobs:
run: gem install rubyzip -v 2.3.0 && ./tools/ci/license_check.rb
ut:
strategy:
fail-fast: false
matrix:
module: [ 'core', 'pipeline_connectors', 'mysql', 'postgres', 'oracle', 'mongodb6', 'mongodb7', 'sqlserver', 'tidb', 'oceanbase', 'db2', 'vitess' ]
name: Unit Tests
Expand All @@ -68,6 +69,7 @@ jobs:
module: ${{ matrix.module }}
pipeline_e2e:
strategy:
fail-fast: false
matrix:
parallelism: [ 1, 4 ]
name: Pipeline E2E Tests (${{ matrix.parallelism }}-Parallelism)
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/flink_cdc_ci_nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ jobs:
run: gem install rubyzip -v 2.3.0 && ./tools/ci/license_check.rb
ut:
strategy:
fail-fast: false
matrix:
module: [ 'core', 'pipeline_connectors', 'mysql', 'postgres', 'oracle', 'mongodb6', 'mongodb7', 'sqlserver', 'tidb', 'oceanbase', 'db2', 'vitess' ]
name: Unit Tests
Expand All @@ -57,6 +58,7 @@ jobs:
module: ${{ matrix.module }}
pipeline_e2e:
strategy:
fail-fast: false
matrix:
parallelism: [ 1, 4 ]
name: Pipeline E2E Tests (${{ matrix.parallelism }} Parallelism)
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/flink_cdc_migration_test_base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ jobs:
migration_test_ut:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
java-version: ${{ fromJSON(inputs.java-version) }}
flink-version: ${{ fromJSON(inputs.flink-version) }}
Expand All @@ -52,6 +53,7 @@ jobs:
pipeline_migration_test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
java-version: ${{ fromJSON(inputs.java-version) }}
flink-version: ${{ fromJSON(inputs.flink-version) }}
Expand Down Expand Up @@ -91,6 +93,7 @@ jobs:
data_stream_migration_test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
java-version: ${{ fromJSON(inputs.java-version) }}
flink-version: [ '1.19.1', '1.20.0' ]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,6 @@ limitations under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
<scope>test</scope>
</dependency>

<!-- test dependencies on Flink -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.assertj.core.api.Assertions;
import org.junit.Test;

import java.io.IOException;
Expand All @@ -46,8 +49,6 @@
import java.util.Map;
import java.util.Objects;

import static org.junit.Assert.assertTrue;

/** Integration tests for the legacy {@link MySqlSource}. */
public class LegacyMySqlSourceITCase extends LegacyMySqlTestBase {

Expand Down Expand Up @@ -99,9 +100,9 @@ private void testConsumingAllEventsWithJsonFormat(
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());

final JSONObject expected =
JSONObject.parseObject(readLines(expectedFile), JSONObject.class);
JSONObject expectSnapshot = expected.getJSONObject("expected_snapshot");
final JsonNode expected =
new ObjectMapper().readValue(readLines(expectedFile), JsonNode.class);
JsonNode expectSnapshot = expected.get("expected_snapshot");

DataStreamSource<String> source = env.addSource(sourceFunction);
tEnv.createTemporaryView("full_types", source);
Expand All @@ -110,9 +111,8 @@ private void testConsumingAllEventsWithJsonFormat(
// check the snapshot result
CloseableIterator<Row> snapshot = result.collect();
waitForSnapshotStarted(snapshot);
assertTrue(
dataInJsonIsEquals(
fetchRows(snapshot, 1).get(0).toString(), expectSnapshot.toString()));

assertJsonEquals(extractJsonBody(snapshot.next()), expectSnapshot);
try (Connection connection = fullTypesDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
Expand All @@ -121,10 +121,8 @@ private void testConsumingAllEventsWithJsonFormat(

// check the binlog result
CloseableIterator<Row> binlog = result.collect();
JSONObject expectBinlog = expected.getJSONObject("expected_binlog");
assertTrue(
dataInJsonIsEquals(
fetchRows(binlog, 1).get(0).toString(), expectBinlog.toString()));
JsonNode expectBinlog = expected.get("expected_binlog");
assertJsonEquals(extractJsonBody(binlog.next()), expectBinlog);
result.getJobClient().get().cancel().get();
}

Expand Down Expand Up @@ -164,25 +162,23 @@ private static byte[] readLines(String resource) throws IOException, URISyntaxEx
return Files.readAllBytes(path);
}

private static boolean dataInJsonIsEquals(String actual, String expect) {
JSONObject actualJsonObject = JSONObject.parseObject(actual);
JSONObject expectJsonObject = JSONObject.parseObject(expect);

if (expectJsonObject.getJSONObject("payload") != null
&& actualJsonObject.getJSONObject("payload") != null) {
expectJsonObject = expectJsonObject.getJSONObject("payload");
actualJsonObject = actualJsonObject.getJSONObject("payload");
private static void assertJsonEquals(JsonNode actual, JsonNode expect) throws Exception {
if (actual.get("payload") != null && expect.get("payload") != null) {
actual = actual.get("payload");
expect = expect.get("payload");
}
return jsonObjectEquals(
expectJsonObject.getJSONObject("after"),
actualJsonObject.getJSONObject("after"))
&& jsonObjectEquals(
expectJsonObject.getJSONObject("before"),
actualJsonObject.getJSONObject("before"))
&& Objects.equals(expectJsonObject.get("op"), actualJsonObject.get("op"));
Assertions.assertThat(actual.get("after")).isEqualTo(expect.get("after"));
Assertions.assertThat(actual.get("before")).isEqualTo(expect.get("before"));
Assertions.assertThat(actual.get("op")).isEqualTo(expect.get("op"));
}

private static boolean jsonObjectEquals(JSONObject a, JSONObject b) {
return (a == b) || (a != null && a.toString().equals(b.toString()));
private static JsonNode extractJsonBody(Row row) {
try {
String body = row.toString();
return new ObjectMapper()
.readValue(body.substring(3, body.length() - 1), JsonNode.class);
} catch (JsonProcessingException e) {
throw new RuntimeException("Invalid JSON format.", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ public void testCharset() throws Exception {
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
testName,
HOST_NAME,
PORT,
getHost(),
getPort(),
USER_NAME,
PASSWORD,
DATABASE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public void testSingleKey() throws Exception {
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
+ ")",
HOST_NAME,
PORT,
getHost(),
getPort(),
USER_NAME,
PASSWORD,
DATABASE,
Expand Down Expand Up @@ -234,8 +234,8 @@ public void testFullTypesDdl() throws Exception {
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
+ ")",
HOST_NAME,
PORT,
getHost(),
getPort(),
USER_NAME,
PASSWORD,
DATABASE,
Expand Down Expand Up @@ -301,8 +301,8 @@ public void testMultiKeys() throws Exception {
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
+ ")",
HOST_NAME,
PORT,
getHost(),
getPort(),
USER_NAME,
PASSWORD,
DATABASE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@

package org.apache.flink.cdc.connectors.polardbx;

import org.apache.flink.cdc.common.utils.TestCaseUtils;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;

import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.apache.commons.lang3.StringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -64,36 +62,37 @@
public abstract class PolardbxSourceTestBase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(PolardbxSourceTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
protected static final Integer PORT = 8527;
protected static final String HOST_NAME = "127.0.0.1";
protected static final String USER_NAME = "polardbx_root";
protected static final String PASSWORD = "123456";

private static final String IMAGE_VERSION = "2.1.0";
private static final DockerImageName POLARDBX_IMAGE =
DockerImageName.parse("polardbx/polardb-x:" + IMAGE_VERSION);

protected static final Integer INNER_PORT = 8527;
protected static final String USER_NAME = "polardbx_root";
protected static final String PASSWORD = "123456";
protected static final Duration WAITING_TIMEOUT = Duration.ofMinutes(1);

protected static final GenericContainer POLARDBX_CONTAINER =
new GenericContainer<>(POLARDBX_IMAGE)
.withExposedPorts(PORT)
.withExposedPorts(INNER_PORT)
.withLogConsumer(new Slf4jLogConsumer(LOG))
.withStartupTimeout(Duration.ofMinutes(3))
.withCreateContainerCmdModifier(
c ->
c.withPortBindings(
new PortBinding(
Ports.Binding.bindPort(PORT),
new ExposedPort(PORT))));
.withStartupTimeout(Duration.ofMinutes(3));

protected static String getHost() {
return POLARDBX_CONTAINER.getHost();
}

protected static int getPort() {
return POLARDBX_CONTAINER.getMappedPort(INNER_PORT);
}

@BeforeClass
public static void startContainers() throws InterruptedException {
// no need to start container when the port 8527 is listening
if (!checkConnection()) {
LOG.info("Polardbx connection is not valid, so try to start containers...");
Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
LOG.info("Containers are started.");
// here should wait 10s that make sure the polardbx is ready
Thread.sleep(10 * 1000);
}
public static void startContainers() {
Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
LOG.info("Containers are started.");

TestCaseUtils.repeatedCheck(
PolardbxSourceTestBase::checkConnection, WAITING_TIMEOUT, Duration.ofSeconds(1));
}

@AfterClass
Expand All @@ -104,7 +103,7 @@ public static void stopContainers() {
}

protected static String getJdbcUrl() {
return String.format("jdbc:mysql://%s:%s", HOST_NAME, PORT);
return String.format("jdbc:mysql://%s:%s", getHost(), getPort());
}

protected static Connection getJdbcConnection() throws SQLException {
Expand Down
Loading
Loading