diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java index 7ba0c06d..30d9ae25 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java @@ -71,6 +71,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.function.Function; import java.util.function.Predicate; import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD; @@ -92,6 +93,7 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog { protected final String pwd; protected final String baseUrl; protected final String defaultUrl; + protected final Function urlFunction; public AbstractJdbcCatalog( ClassLoader userClassLoader, @@ -107,13 +109,14 @@ public AbstractJdbcCatalog( checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd)); checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl)); - JdbcCatalogUtils.validateJdbcUrl(baseUrl); + JdbcCatalogUtils.validateJdbcUrl(baseUrl, defaultDatabase); this.userClassLoader = userClassLoader; this.username = username; this.pwd = pwd; + this.urlFunction = calculateUrlFunction(baseUrl); this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; - this.defaultUrl = this.baseUrl + defaultDatabase; + this.defaultUrl = this.urlFunction.apply(defaultDatabase); } @Override @@ -246,7 +249,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) } String databaseName = tablePath.getDatabaseName(); - String dbUrl = baseUrl + databaseName; + String dbUrl = urlFunction.apply(databaseName); try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { DatabaseMetaData metaData = conn.getMetaData(); @@ -545,4 +548,19 @@ protected String getSchemaName(ObjectPath tablePath) { protected String getSchemaTableName(ObjectPath tablePath) { throw new UnsupportedOperationException(); } + + private Function calculateUrlFunction(String baseUrl) { + final String prefix; + final int questionMarkIndex = baseUrl.indexOf('?'); + if (questionMarkIndex == -1) { + prefix = baseUrl; + } else { + String withoutParams = baseUrl.substring(0, questionMarkIndex); + prefix = withoutParams.substring(0, withoutParams.lastIndexOf('/') + 1); + } + if (questionMarkIndex == -1) { + return dbName -> prefix + "/" + dbName; + } + return dbName -> prefix + "/" + dbName + "/" + baseUrl.substring(questionMarkIndex); + } } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java index 84ac0abd..e2633784 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java @@ -27,18 +27,30 @@ import org.apache.flink.connector.jdbc.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader; +import java.util.Objects; + import static org.apache.flink.util.Preconditions.checkArgument; /** Utils for {@link JdbcCatalog}. */ public class JdbcCatalogUtils { /** - * URL has to be without database, like "jdbc:postgresql://localhost:5432/" or - * "jdbc:postgresql://localhost:5432" rather than "jdbc:postgresql://localhost:5432/db". + * URL has to be either without database, like "jdbc:postgresql://localhost:5432/" or + * "jdbc:postgresql://localhost:5432" rather than "jdbc:postgresql://localhost:5432/db" or with + * same database name as at @param databaseName. */ - public static void validateJdbcUrl(String url) { - String[] parts = url.trim().split("\\/+"); - - checkArgument(parts.length == 2); + public static void validateJdbcUrl(String url, String databaseName) { + String trimmedUrl = url.trim(); + String[] parts = trimmedUrl.split("\\/+", 3); + int questionMark = trimmedUrl.indexOf('?'); + if (questionMark == -1) { + checkArgument(parts.length == 2 || parts.length == 3 && parts[2].isEmpty()); + } else { + checkArgument(parts.length > 2); + questionMark = parts[2].indexOf('?'); + checkArgument( + questionMark > -1 + && Objects.equals(parts[2].substring(0, questionMark), databaseName)); + } } /** Create catalog instance from given information. */ diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java index 04ed2a4f..ec16bc67 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java @@ -116,7 +116,8 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException { } String searchPath = - extractColumnValuesBySQL(baseUrl + DEFAULT_DATABASE, "show search_path", 1, null) + extractColumnValuesBySQL( + urlFunction.apply(DEFAULT_DATABASE), "show search_path", 1, null) .get(0); String[] schemas = searchPath.split("\\s*,\\s*"); diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlCatalog.java index b54aa23d..6cace64d 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/mysql/catalog/MySqlCatalog.java @@ -99,7 +99,7 @@ public List listTables(String databaseName) } return extractColumnValuesBySQL( - baseUrl + databaseName, + urlFunction.apply(databaseName), "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?", 1, null, diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java index 7ece9b64..5862da73 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java @@ -152,7 +152,7 @@ public List listTables(String databaseName) throw new DatabaseNotExistException(getName(), databaseName); } - final String url = baseUrl + databaseName; + final String url = urlFunction.apply(databaseName); try (Connection conn = DriverManager.getConnection(url, username, pwd)) { // get all schemas List schemas; diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtilsTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtilsTest.java index 23407143..07f8a887 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtilsTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtilsTest.java @@ -27,9 +27,12 @@ class JdbcCatalogUtilsTest { @Test void testJdbcUrl() { - JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432/"); + JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432/", null); - JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432"); + JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432", null); + JdbcCatalogUtils.validateJdbcUrl( + "jdbc:postgres://demo-postgresql.example.com:18025/defaultdb?sslmode=require", + "defaultdb"); } @Test @@ -37,7 +40,7 @@ void testInvalidJdbcUrl() { assertThatThrownBy( () -> JdbcCatalogUtils.validateJdbcUrl( - "jdbc:postgresql://localhost:5432/db")) + "jdbc:postgresql://localhost:5432/db", null)) .isInstanceOf(IllegalArgumentException.class); } }