Skip to content

Commit 4e14b3f

Browse files
fapaulAHeise
authored andcommitted
[FLINK-23322][connectors/rabbitmq] Increase RMQSource handshake timeout tolerating network congestions
1 parent be67258 commit 4e14b3f

File tree

2 files changed

+37
-0
lines changed

2 files changed

+37
-0
lines changed

flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceITCase.java

+9
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@
4242
import org.junit.Rule;
4343
import org.junit.Test;
4444
import org.junit.rules.TemporaryFolder;
45+
import org.slf4j.Logger;
46+
import org.slf4j.LoggerFactory;
4547
import org.testcontainers.containers.RabbitMQContainer;
48+
import org.testcontainers.containers.output.Slf4jLogConsumer;
4649
import org.testcontainers.containers.wait.strategy.Wait;
4750
import org.testcontainers.utility.DockerImageName;
4851

@@ -53,6 +56,10 @@
5356
/** A class containing RabbitMQ source tests against a real RabbiMQ cluster. */
5457
public class RMQSourceITCase {
5558

59+
private static final Logger LOG = LoggerFactory.getLogger(RMQSourceITCase.class);
60+
private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
61+
62+
private static final int HANDSHAKE_TIMEOUT = 30000;
5663
private static final int RABBITMQ_PORT = 5672;
5764
private static final String QUEUE_NAME = "test-queue";
5865
private static final JobID JOB_ID = new JobID();
@@ -75,6 +82,7 @@ public class RMQSourceITCase {
7582
new RabbitMQContainer(
7683
DockerImageName.parse("rabbitmq").withTag("3.7.25-management-alpine"))
7784
.withExposedPorts(RABBITMQ_PORT)
85+
.withLogConsumer(LOG_CONSUMER)
7886
.waitingFor(Wait.forListeningPort());
7987

8088
@Before
@@ -127,6 +135,7 @@ private static Connection getRMQConnection() throws IOException, TimeoutExceptio
127135
ConnectionFactory factory = new ConnectionFactory();
128136
factory.setUsername(RMQ_CONTAINER.getAdminUsername());
129137
factory.setPassword(RMQ_CONTAINER.getAdminPassword());
138+
factory.setHandshakeTimeout(HANDSHAKE_TIMEOUT);
130139
factory.setVirtualHost("/");
131140
factory.setHost(RMQ_CONTAINER.getHost());
132141
factory.setPort(RMQ_CONTAINER.getAmqpPort());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
# Set root logger level to OFF to not flood build logs
20+
# set manually to INFO for debugging purposes
21+
rootLogger.level = OFF
22+
rootLogger.appenderRef.test.ref = TestLogger
23+
24+
appender.testlogger.name = TestLogger
25+
appender.testlogger.type = CONSOLE
26+
appender.testlogger.target = SYSTEM_ERR
27+
appender.testlogger.layout.type = PatternLayout
28+
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

0 commit comments

Comments
 (0)