Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wangqinghuan committed Jan 29, 2025
1 parent 44bff3a commit cc79b4d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 21 deletions.
9 changes: 5 additions & 4 deletions flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,13 @@ limitations under the License.
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
<scope>test</scope>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
Expand Down Expand Up @@ -55,6 +53,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -97,8 +96,7 @@ public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {
@Parameterized.Parameters(name = "flinkVersion: {0}, elasticsearchVersion: {1}")
public static Collection<Object[]> getTestParameters() {
List<Object[]> parameters = new ArrayList<>();

for (String flinkVersion : Arrays.asList("1.19.1", "1.20.0")) {
for (String flinkVersion : getFlinkVersion()) {
parameters.add(new Object[] {flinkVersion, "6.8.20"});
parameters.add(new Object[] {flinkVersion, "7.10.2"});
parameters.add(new Object[] {flinkVersion, "8.12.1"});
Expand All @@ -117,8 +115,7 @@ public void before() throws Exception {
.withNetworkAliases("elasticsearch")
.withLogConsumer(new Slf4jLogConsumer(LOG));

Startables.deepStart(Stream.of(MYSQL)).join();
Startables.deepStart(Stream.of(elasticsearchContainer)).join();
Startables.deepStart(Stream.of(MYSQL, elasticsearchContainer)).join();
}

@After
Expand Down Expand Up @@ -159,12 +156,12 @@ public void testSyncWholeDatabase() throws Exception {
+ " record.size.max.bytes: 5242880\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
databaseName,
elasticsearchVersion.split("\\.")[0] // Use major version number
);
elasticsearchVersion.split("\\.")[0],
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path elasticsearchCdcConnector =
TestUtils.getResource("elasticsearch-cdc-pipeline-connector.jar");
Expand Down Expand Up @@ -336,15 +333,17 @@ private List<String> fetchTableContent(String indexName, List<String> columns)
Thread.sleep(1000);
throw new Exception();
}
JSONObject jsonObject = JSON.parseObject(responseBody);
JSONObject hits = jsonObject.getJSONObject("hits");
JSONArray jsonArray = hits.getJSONArray("hits");
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject json = jsonArray.getJSONObject(i);
JSONObject sourceObject = json.getJSONObject("_source");
ObjectMapper objectMapper = new ObjectMapper();
Map responseMap = objectMapper.readValue(responseBody, Map.class);
Map<String, Object> hitsMap = (Map<String, Object>) responseMap.get("hits");
List<Map<String, Object>> hitsArr = (List<Map<String, Object>>) hitsMap.get("hits");

for (int i = 0; i < hitsArr.size(); i++) {
Map<String, Object> hit = hitsArr.get(i);
Map<String, Object> source = (Map<String, Object>) hit.get("_source");
List<String> doc = new ArrayList<>();
for (String column : columns) {
doc.add(String.valueOf(sourceObject.get(column)));
doc.add(String.valueOf(source.get(column)));
}
results.add(String.join(" | ", doc));
}
Expand Down

0 comments on commit cc79b4d

Please sign in to comment.