From 35f3cd02dc985a0b4e5925a54bc3fe178a4f51ac Mon Sep 17 00:00:00 2001 From: Jan Kevin Dick Date: Sun, 24 Sep 2023 10:25:01 +0200 Subject: [PATCH] Fixed a Problem in the Exception Handling of the Close Method. This Problem leads to Connection Leaks if some problem occurs. --- .../jdbc/internal/JdbcOutputFormat.java | 2 +- .../flink/connector/jdbc/JdbcTestFixture.java | 3 ++ .../internal/JdbcTableOutputFormatTest.java | 30 +++++++++++++++++-- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java index 1ca8b635..5676a8cb 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java @@ -262,7 +262,7 @@ public synchronized void close() { flush(); } catch (Exception e) { LOG.warn("Writing records to JDBC failed.", e); - throw new RuntimeException("Writing records to JDBC failed.", e); + flushException = e; } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java index 4c7519dd..07db44a0 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestFixture.java @@ -44,6 +44,7 @@ public class JdbcTestFixture implements DerbyTestBase { public static final String OUTPUT_TABLE = "newbooks"; public static final String OUTPUT_TABLE_2 = "newbooks2"; public static final String OUTPUT_TABLE_3 = "newbooks3"; + public static final String OUTPUT_TABLE_4 = "newbooks4"; public static final String WORDS_TABLE = "words"; public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE; public static final String SELECT_ID_BOOKS = "select id from " + INPUT_TABLE; @@ -189,6 +190,7 @@ public static void initSchema(DatabaseMetadata metadata) throws SQLException { createTable(conn, OUTPUT_TABLE); createTable(conn, OUTPUT_TABLE_2); createTable(conn, OUTPUT_TABLE_3); + createTable(conn, OUTPUT_TABLE_4); createWordsTable(conn); } } @@ -232,6 +234,7 @@ public static void cleanUpDatabasesStatic(DatabaseMetadata dbMetadata) throws SQ stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE); stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_2); stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_3); + stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_4); stat.executeUpdate("DROP TABLE " + WORDS_TABLE); } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java index d0caddf0..c682a69f 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.jdbc.JdbcDataTestBase; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcRowOutputFormat; import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider; import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; @@ -34,6 +35,7 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -44,9 +46,7 @@ import java.util.Arrays; import java.util.List; -import static org.apache.flink.connector.jdbc.JdbcTestFixture.OUTPUT_TABLE; -import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA; -import static org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry; +import static org.apache.flink.connector.jdbc.JdbcTestFixture.*; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.doReturn; @@ -221,6 +221,30 @@ void testJdbcOutputFormat() throws Exception { check(expected); } + @Test + public void testExceptionOnFlush() { + JdbcRowOutputFormat jdbcOutputFormat = + JdbcRowOutputFormat.buildJdbcOutputFormat() + .setDrivername(getMetadata().getDriverClass()) + .setDBUrl(getMetadata().getJdbcUrl()) + .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE_4)) + .setBatchSize(2) + .finish(); + setRuntimeContext(jdbcOutputFormat, true); + try { + jdbcOutputFormat.open(0, 1); + + jdbcOutputFormat.writeRecord(toRow(TEST_DATA[1])); + jdbcOutputFormat.writeRecord(toRow(TEST_DATA[1])); + } catch (IOException e) { + try { + jdbcOutputFormat.close(); + } catch (Exception e1) { + assertThat(jdbcOutputFormat.getConnection()).isEqualTo(null); + } + } + } + private void check(Row[] rows) throws SQLException { check(rows, getMetadata().getJdbcUrl(), OUTPUT_TABLE, fieldNames); }