Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-33800][JDBC/Connector] Allow passing parameters to database via jdbc url #83

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
Expand All @@ -92,6 +93,7 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
protected final String pwd;
protected final String baseUrl;
protected final String defaultUrl;
protected final Function<String, String> urlFunction;

public AbstractJdbcCatalog(
ClassLoader userClassLoader,
Expand All @@ -107,13 +109,14 @@ public AbstractJdbcCatalog(
checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));

JdbcCatalogUtils.validateJdbcUrl(baseUrl);
JdbcCatalogUtils.validateJdbcUrl(baseUrl, defaultDatabase);

this.userClassLoader = userClassLoader;
this.username = username;
this.pwd = pwd;
this.urlFunction = calculateUrlFunction(baseUrl);
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
this.defaultUrl = this.baseUrl + defaultDatabase;
this.defaultUrl = this.urlFunction.apply(defaultDatabase);
}

@Override
Expand Down Expand Up @@ -246,7 +249,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
}

String databaseName = tablePath.getDatabaseName();
String dbUrl = baseUrl + databaseName;
String dbUrl = urlFunction.apply(databaseName);

try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
DatabaseMetaData metaData = conn.getMetaData();
Expand Down Expand Up @@ -545,4 +548,19 @@ protected String getSchemaName(ObjectPath tablePath) {
protected String getSchemaTableName(ObjectPath tablePath) {
throw new UnsupportedOperationException();
}

private Function<String, String> calculateUrlFunction(String baseUrl) {
final String prefix;
final int questionMarkIndex = baseUrl.indexOf('?');
if (questionMarkIndex == -1) {
prefix = baseUrl;
} else {
String withoutParams = baseUrl.substring(0, questionMarkIndex);
prefix = withoutParams.substring(0, withoutParams.lastIndexOf('/') + 1);
}
if (questionMarkIndex == -1) {
return dbName -> prefix + "/" + dbName;
}
return dbName -> prefix + "/" + dbName + "/" + baseUrl.substring(questionMarkIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,30 @@
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;

import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkArgument;

/** Utils for {@link JdbcCatalog}. */
public class JdbcCatalogUtils {
/**
* URL has to be without database, like "jdbc:postgresql://localhost:5432/" or
* "jdbc:postgresql://localhost:5432" rather than "jdbc:postgresql://localhost:5432/db".
* URL has to be either without database, like "jdbc:postgresql://localhost:5432/" or
* "jdbc:postgresql://localhost:5432" rather than "jdbc:postgresql://localhost:5432/db" or with
* same database name as at @param databaseName.
*/
public static void validateJdbcUrl(String url) {
String[] parts = url.trim().split("\\/+");

checkArgument(parts.length == 2);
public static void validateJdbcUrl(String url, String databaseName) {
String trimmedUrl = url.trim();
String[] parts = trimmedUrl.split("\\/+", 3);
int questionMark = trimmedUrl.indexOf('?');
if (questionMark == -1) {
checkArgument(parts.length == 2 || parts.length == 3 && parts[2].isEmpty());
} else {
checkArgument(parts.length > 2);
questionMark = parts[2].indexOf('?');
checkArgument(
questionMark > -1
&& Objects.equals(parts[2].substring(0, questionMark), databaseName));
}
}

/** Create catalog instance from given information. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
}

String searchPath =
extractColumnValuesBySQL(baseUrl + DEFAULT_DATABASE, "show search_path", 1, null)
extractColumnValuesBySQL(
urlFunction.apply(DEFAULT_DATABASE), "show search_path", 1, null)
.get(0);
String[] schemas = searchPath.split("\\s*,\\s*");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public List<String> listTables(String databaseName)
}

return extractColumnValuesBySQL(
baseUrl + databaseName,
urlFunction.apply(databaseName),
"SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
1,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public List<String> listTables(String databaseName)
throw new DatabaseNotExistException(getName(), databaseName);
}

final String url = baseUrl + databaseName;
final String url = urlFunction.apply(databaseName);
try (Connection conn = DriverManager.getConnection(url, username, pwd)) {
// get all schemas
List<String> schemas;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ class JdbcCatalogUtilsTest {

@Test
void testJdbcUrl() {
JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432/");
JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432/", null);

JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432");
JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432", null);
JdbcCatalogUtils.validateJdbcUrl(
"jdbc:postgres://demo-postgresql.example.com:18025/defaultdb?sslmode=require",
"defaultdb");
}

@Test
void testInvalidJdbcUrl() {
assertThatThrownBy(
() ->
JdbcCatalogUtils.validateJdbcUrl(
"jdbc:postgresql://localhost:5432/db"))
"jdbc:postgresql://localhost:5432/db", null))
.isInstanceOf(IllegalArgumentException.class);
}
}