From e4559c9a7f1d8f0ac16f4c6087464d44d3354fe8 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sun, 12 Jan 2025 17:37:36 +0900 Subject: [PATCH] [FLINK-37288] Add flink-connector-jdbc-spanner --- docs/content.zh/docs/connectors/table/jdbc.md | 46 +- docs/content/docs/connectors/table/jdbc.md | 48 +- .../database/catalog/AbstractJdbcCatalog.java | 19 +- .../sink/JdbcDynamicTableSinkITCase.java | 4 +- .../jdbc/testutils/tables/TableBase.java | 16 +- .../jdbc/testutils/tables/TableRow.java | 2 + flink-connector-jdbc-spanner/pom.xml | 98 ++++ .../jdbc/spanner/database/SpannerFactory.java | 52 ++ .../database/catalog/SpannerCatalog.java | 320 ++++++++++++ .../database/catalog/SpannerTablePath.java | 112 +++++ .../database/catalog/SpannerTypeMapper.java | 160 ++++++ .../database/dialect/SpannerDialect.java | 130 +++++ .../dialect/SpannerDialectConverter.java | 94 ++++ ...k.connector.jdbc.core.database.JdbcFactory | 16 + .../jdbc/spanner/SpannerTestBase.java | 29 +- .../spanner/database/SpannerFactoryTest.java | 82 ++++ .../catalog/SpannerCatalogITCase.java | 460 ++++++++++++++++++ .../catalog/SpannerCatalogTestBase.java | 223 +++++++++ .../catalog/SpannerTablePathTest.java | 51 ++ .../database/dialect/SpannerDialectTest.java | 58 +++ .../table/SpannerDynamicTableSinkITCase.java | 218 +++++++++ .../SpannerDynamicTableSourceITCase.java | 73 +++ .../spanner/testutils/SpannerDatabase.java | 160 ++++++ .../jdbc/spanner/testutils/SpannerImages.java | 23 + .../spanner/testutils/SpannerMetadata.java | 84 ++++ .../testutils/tables/SpannerTableRow.java | 41 ++ .../src/test/resources/log4j2-test.properties | 28 ++ pom.xml | 3 +- 28 files changed, 2581 insertions(+), 69 deletions(-) create mode 100644 flink-connector-jdbc-spanner/pom.xml create mode 100644 flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/SpannerFactory.java create mode 100644 flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerCatalog.java create mode 100644 flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerTablePath.java create mode 100644 flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerTypeMapper.java create mode 100644 flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/dialect/SpannerDialect.java create mode 100644 flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/dialect/SpannerDialectConverter.java create mode 100644 flink-connector-jdbc-spanner/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory rename flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java => flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/SpannerTestBase.java (52%) create mode 100644 flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/SpannerFactoryTest.java create mode 100644 flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerCatalogITCase.java create mode 100644 flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerCatalogTestBase.java create mode 100644 flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerTablePathTest.java
create mode 100644 flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/dialect/SpannerDialectTest.java create mode 100644 flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/table/SpannerDynamicTableSinkITCase.java create mode 100644 flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/table/SpannerDynamicTableSourceITCase.java create mode 100644 flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/SpannerDatabase.java create mode 100644 flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/SpannerImages.java create mode 100644 flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/SpannerMetadata.java create mode 100644 flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/tables/SpannerTableRow.java create mode 100644 flink-connector-jdbc-spanner/src/test/resources/log4j2-test.properties diff --git a/docs/content.zh/docs/connectors/table/jdbc.md b/docs/content.zh/docs/connectors/table/jdbc.md index 8390c352..68fd6395 100644 --- a/docs/content.zh/docs/connectors/table/jdbc.md +++ b/docs/content.zh/docs/connectors/table/jdbc.md @@ -47,17 +47,18 @@ JDBC 连接器不是二进制发行版的一部分,请查阅[这里]({{< ref " 在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下: -| Driver | Group Id | Artifact Id | JAR | -|:-----------|:---------------------------|:-----------------------|:----------------------------------------------------------------------------------------------------------------------------| -| MySQL | `mysql` | `mysql-connector-java` | [下载](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) | -| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [下载](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) | -| PostgreSQL | `org.postgresql` | `postgresql` | [下载](https://jdbc.postgresql.org/download/) | -| Derby | `org.apache.derby` | `derby` | [下载](http://db.apache.org/derby/derby_downloads.html) | -| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [下载](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) | -| CrateDB | `io.crate` | `crate-jdbc` | [下载](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) | -| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [下载](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) | -| Trino | `io.trino` | `trino-jdbc` | [下载](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) | -| OceanBase | `com.oceanbase` | `oceanbase-client` | [下载](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) | +| Driver | Group Id | Artifact Id | JAR | +|:-----------|:---------------------------|:----------------------------|:----------------------------------------------------------------------------------------------------------------------------| +| MySQL | `mysql` | `mysql-connector-java` | [下载](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) | +| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [下载](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) | +| PostgreSQL | `org.postgresql` | `postgresql` | [下载](https://jdbc.postgresql.org/download/) | +| Derby | `org.apache.derby` | `derby` | [下载](http://db.apache.org/derby/derby_downloads.html) | +| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | 
[下载](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) | +| CrateDB | `io.crate` | `crate-jdbc` | [下载](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) | +| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [下载](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) | +| Trino | `io.trino` | `trino-jdbc` | [下载](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) | +| OceanBase | `com.oceanbase` | `oceanbase-client` | [下载](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) | +| Spanner | `com.google.cloud` | `google-cloud-spanner-jdbc` | [下载](https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner-jdbc) | 当前,JDBC 连接器和驱动不在 Flink 二进制发布包中,请参阅[这里]({{< ref "docs/dev/configuration/overview" >}})了解在集群上执行时如何连接它们。 @@ -434,6 +435,10 @@ lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性 WHEN NOT MATCHED THEN INSERT (..)
VALUES (..) + + Spanner + INSERT OR UPDATE INTO .. (..) VALUES (..) + + @@ -473,6 +478,7 @@ JDBC catalog 支持以下参数: - 对于 Postgres Catalog `base-url` 应为 `"jdbc:postgresql://:"` 的格式。 - 对于 MySQL Catalog `base-url` 应为 `"jdbc:mysql://:"` 的格式。 - 对于 OceanBase Catalog `base-url` 应为 `"jdbc:oceanbase://:"` 的格式。 + - 对于 Spanner Catalog `base-url` 应为 `"jdbc:cloudspanner:/projects//instances//databases/"` 的格式。 - `compatible-mode`: 选填,数据库的兼容模式。 {{< tabs "10bd8bfb-674c-46aa-8a36-385537df5791" >}} @@ -709,7 +715,7 @@ SELECT * FROM given_database.test_table2; 数据类型映射 ---------------- -Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、Oracle、PostgreSQL、CrateDB, Derby、Db2、 SQL Server、OceanBase 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。 +Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、Oracle、PostgreSQL、CrateDB、Derby、Db2、SQL Server、OceanBase、Spanner 等。其中,Derby 通常用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。 @@ -723,6 +729,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O + @@ -737,6 +744,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O + @@ -759,6 +767,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O SMALLINT
TINYINT UNSIGNED + @@ -781,6 +790,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O MEDIUMINT
SMALLINT UNSIGNED + @@ -801,6 +811,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O BIGINT
INT UNSIGNED + @@ -813,6 +824,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O + @@ -831,6 +843,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O + @@ -849,6 +862,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O + @@ -877,6 +891,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O + @@ -894,6 +909,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O TINYINT(1) + @@ -906,6 +922,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O + @@ -917,6 +934,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O + @@ -932,6 +950,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O + @@ -980,6 +999,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O NCHAR(n)
VARCHAR2(n)
CLOB + @@ -1005,6 +1025,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O + @@ -1018,6 +1039,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O +
Trino type OceanBase MySQL mode type OceanBase Oracle mode type Spanner type Flink SQL type
TINYINT TINYINT TINYINT
SMALLINT
INT
INT64 BIGINT
BIGINT UNSIGNED NUMERIC DECIMAL(20, 0)
FLOAT BINARY_FLOAT FLOAT32 FLOAT
DOUBLE DOUBLE BINARY_DOUBLE FLOAT64 DOUBLE
FLOAT(s)
NUMBER(p, s)
NUMERIC DECIMAL(p, s)
BOOLEAN BOOLEAN
DATE DATE DATE DATE DATE
TIME [(p)] TIME_WITHOUT_TIME_ZONE TIME [(p)] DATE TIME [(p)] [WITHOUT TIMEZONE]
TIMESTAMP_WITHOUT_TIME_ZONE DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE]
STRING(n) STRING
RAW(s)
BLOB
BYTES(n) BYTES
ARRAY ARRAY
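For orientation, here is a minimal sketch of how the catalog options documented above fit together, using the Flink Table API in Java. It is illustrative only, not a file added by this patch: the project ID, instance ID, database name, and credential values are hypothetical placeholders, and `base-url` must not contain the database name.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SpannerCatalogQuickstart {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register a Spanner-backed JDBC catalog. The base-url follows the documented
        // "jdbc:cloudspanner:/projects/<...>/instances/<...>" shape and deliberately
        // omits the database; 'my-project', 'my-instance' and 'my_database' are made up.
        tEnv.executeSql(
                "CREATE CATALOG spanner_catalog WITH (\n"
                        + "  'type' = 'jdbc',\n"
                        + "  'default-database' = 'my_database',\n"
                        + "  'username' = 'unused',\n"
                        + "  'password' = 'unused',\n"
                        + "  'base-url' = 'jdbc:cloudspanner:/projects/my-project/instances/my-instance'\n"
                        + ")");

        tEnv.executeSql("USE CATALOG spanner_catalog");
        tEnv.executeSql("SHOW TABLES").print();
    }
}
```

The `username`/`password` keys are only passed through to the generic JDBC catalog factory; the Spanner JDBC driver itself typically authenticates via Google Cloud credentials instead.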
diff --git a/docs/content/docs/connectors/table/jdbc.md b/docs/content/docs/connectors/table/jdbc.md index b0ef84eb..190c9c73 100644 --- a/docs/content/docs/connectors/table/jdbc.md +++ b/docs/content/docs/connectors/table/jdbc.md @@ -45,17 +45,18 @@ See how to link with it for cluster execution [here]({{< ref "docs/dev/configura A driver dependency is also required to connect to a specified database. Here are drivers currently supported: -| Driver | Group Id | Artifact Id | JAR | -|:-----------|:---------------------------|:-----------------------|:----------------------------------------------------------------------------------------------------------------------------------| -| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) | -| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) | -| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download/) | -| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) | -| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) | -| CrateDB | `io.crate` | `crate-jdbc` | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) | -| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) | -| Trino | `io.trino` | `trino-jdbc` | [Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) | -| OceanBase | `com.oceanbase` | `oceanbase-client` | [Download](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) | +| Driver | Group Id | Artifact Id | JAR | +|:-----------|:---------------------------|:----------------------------|:----------------------------------------------------------------------------------------------------------------------------------------| +| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) | +| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) | +| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download/) | +| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) | +| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) | +| CrateDB | `io.crate` | `crate-jdbc` | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) | +| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) | +| Trino | `io.trino` | `trino-jdbc` | [Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) | +| OceanBase | `com.oceanbase` | `oceanbase-client` | [Download](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) | +| Spanner | `com.google.cloud` | `google-cloud-spanner-jdbc` | [Download](https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner-jdbc) | JDBC connector and drivers are not part of Flink's binary distribution. 
See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}). @@ -447,6 +448,10 @@ As there is no standard syntax for upsert, the following table describes the dat WHEN NOT MATCHED THEN INSERT (..)
VALUES (..) + + Spanner + INSERT OR UPDATE INTO .. (..) VALUES (..) + + @@ -481,7 +486,8 @@ The JDBC catalog supports the following options: - `base-url`: required (should not contain the database name) - for Postgres Catalog this should be `"jdbc:postgresql://:"` - for MySQL Catalog this should be `"jdbc:mysql://:"` - - for OceanBase Catalog this should be `jdbc:oceanbase://:` + - for OceanBase Catalog this should be `"jdbc:oceanbase://:"` + - for Spanner Catalog this should be `"jdbc:cloudspanner:/projects//instances//databases/"` - `compatible-mode`: optional, the compatible mode of database. {{< tabs "10bd8bfb-674c-46aa-8a36-385537df5791" >}} @@ -702,7 +708,7 @@ SELECT * FROM given_database.test_table2; Data Type Mapping ---------------- -Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2 and OceanBase. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily. +Flink supports connecting to several databases through dialects such as MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2, OceanBase and Spanner. The Derby dialect is usually used for testing purposes. The field data type mappings from relational database data types to Flink SQL data types are listed in the following table; the mapping table can help define JDBC tables in Flink easily. @@ -716,6 +722,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl + @@ -730,6 +737,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl + @@ -752,6 +760,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl SMALLINT
TINYINT UNSIGNED + @@ -774,6 +783,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl MEDIUMINT
SMALLINT UNSIGNED + @@ -794,6 +804,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl BIGINT
INT UNSIGNED + @@ -806,6 +817,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl + @@ -824,6 +836,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl + @@ -842,6 +855,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl + @@ -870,6 +884,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl + @@ -887,6 +902,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl TINYINT(1) + @@ -899,6 +915,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl + @@ -910,6 +927,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl + @@ -925,6 +943,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl + @@ -973,6 +992,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl NCHAR(n)
VARCHAR2(n)
CLOB + @@ -998,6 +1018,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl + @@ -1011,6 +1032,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl +
Trino type OceanBase MySQL mode type OceanBase Oracle mode type Spanner type Flink SQL type
TINYINT TINYINT TINYINT
SMALLINT
INT
INT64 BIGINT
BIGINT UNSIGNED NUMERIC DECIMAL(20, 0)
FLOAT BINARY_FLOAT FLOAT32 FLOAT
DOUBLE DOUBLE BINARY_DOUBLE FLOAT64 DOUBLE
FLOAT(s)
NUMBER(p, s)
NUMERIC DECIMAL(p, s)
BOOLEAN BOOLEAN
DATE DATE DATE DATE DATE
TIME [(p)] TIME_WITHOUT_TIME_ZONE TIME [(p)] DATE TIME [(p)] [WITHOUT TIMEZONE]
TIMESTAMP_WITHOUT_TIME_ZONE DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE]
STRING(n) STRING
RAW(s)
BLOB
BYTES(n) BYTES
ARRAY ARRAY
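To make the documented `INSERT OR UPDATE INTO .. (..) VALUES (..)` grammar concrete, the following self-contained Java sketch assembles the same statement shape that `SpannerDialect#getUpsertStatement` in this patch produces; the table and column names are invented for illustration.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class SpannerUpsertSketch {
    public static void main(String[] args) {
        // Hypothetical table and columns; "id" stands in for the primary key column.
        String tableName = "books";
        String[] fieldNames = {"id", "title", "author"};

        String columns = String.join(", ", fieldNames);
        // The dialect emits named parameters (":field"), not "?" placeholders.
        String placeholders =
                Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));

        // Mirrors the INSERT OR UPDATE statement assembled by the dialect in this patch.
        String upsert =
                "INSERT OR UPDATE INTO " + tableName
                        + "(" + columns + ") VALUES (" + placeholders + ")";
        System.out.println(upsert);
        // Prints: INSERT OR UPDATE INTO books(id, title, author) VALUES (:id, :title, :author)
    }
}
```

Note that, unlike MERGE-based dialects, the unique-key list is not embedded in the SQL: Spanner's `INSERT OR UPDATE` always resolves conflicts on the table's primary key.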
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java index 73424598..73585b68 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java @@ -125,17 +125,10 @@ public AbstractJdbcCatalog( checkNotNull(userClassLoader); checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl)); - validateJdbcUrl(baseUrl); - this.userClassLoader = userClassLoader; + this.connectionProperties = Preconditions.checkNotNull(connectionProperties); this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; this.defaultUrl = getDatabaseUrl(defaultDatabase); - this.connectionProperties = Preconditions.checkNotNull(connectionProperties); - checkArgument( - !StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY))); - checkArgument( - !StringUtils.isNullOrWhitespaceOnly( - connectionProperties.getProperty(PASSWORD_KEY))); } protected String getDatabaseUrl(String databaseName) { @@ -576,16 +569,6 @@ protected String getSchemaTableName(ObjectPath tablePath) { throw new UnsupportedOperationException(); } - /** - * URL has to be without database, like "jdbc:dialect://localhost:1234/" or - * "jdbc:dialect://localhost:1234" rather than "jdbc:dialect://localhost:1234/db". - */ - protected static void validateJdbcUrl(String url) { - String[] parts = url.trim().split("\\/+"); - - checkArgument(parts.length == 2); - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java index f95fd3c9..c89a57be 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java @@ -198,7 +198,7 @@ private void createTestDataTempView(StreamTableEnvironment tEnv, String viewName } @Test - void testReal() throws Exception { + protected void testReal() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().enableObjectReuse(); StreamTableEnvironment tEnv = @@ -290,7 +290,7 @@ void testAppend() throws Exception { } @Test - void testBatchSink() throws Exception { + protected void testBatchSink() throws Exception { TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); String tableName = "batchSink"; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java index d8cbd793..1b30192e 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableBase.java @@ -52,8 +52,10 @@ /** Base table operations. 
* */ public abstract class TableBase implements TableManaged { - private final String name; - private final TableField[] fields; + public static final String DEFAULT_PRIMARY_KEY_CONSTRAINT_NAME = "PRIMARY"; + + protected final String name; + protected final TableField[] fields; protected TableBase(String name, TableField[] fields) { Preconditions.checkArgument(name != null && !name.isEmpty(), "Table name must be defined"); @@ -69,7 +71,7 @@ public String getTableName() { return name; } - private Stream getStreamFields() { + protected Stream getStreamFields() { return Arrays.stream(this.fields); } @@ -120,7 +122,7 @@ public int[] getTableTypes() { .toArray(); } - public Schema getTableSchema() { + public Schema getTableSchema(String pkConstraintName) { Schema.Builder schema = Schema.newBuilder(); getStreamFields().forEach(field -> schema.column(field.getName(), field.getDataType())); @@ -129,11 +131,15 @@ public Schema getTableSchema() { .filter(TableField::isPkField) .map(TableField::getName) .collect(Collectors.joining(", ")); - schema.primaryKeyNamed("PRIMARY", pkFields); + schema.primaryKeyNamed(pkConstraintName, pkFields); return schema.build(); } + public Schema getTableSchema() { + return getTableSchema(DEFAULT_PRIMARY_KEY_CONSTRAINT_NAME); + } + public ResolvedSchema getTableResolvedSchema() { return ResolvedSchema.of( getStreamFields() diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java index 87308fb7..448c03e8 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/tables/TableRow.java @@ -91,6 +91,8 @@ protected JdbcResultSetBuilder getResultSetBuilder() { } else if (type.getConversionClass().equals(LocalDateTime.class)) { ps.setTimestamp( i + 1, Timestamp.valueOf(row.getFieldAs(i))); + } else if (type.getConversionClass().equals(Float.class)) { + ps.setFloat(i + 1, row.getFieldAs(i)); } else { ps.setObject(i + 1, row.getField(i)); } diff --git a/flink-connector-jdbc-spanner/pom.xml b/flink-connector-jdbc-spanner/pom.xml new file mode 100644 index 00000000..6a76c63d --- /dev/null +++ b/flink-connector-jdbc-spanner/pom.xml @@ -0,0 +1,98 @@ + + + 4.0.0 + + org.apache.flink + flink-connector-jdbc-parent + 3.3-SNAPSHOT + + + flink-connector-jdbc-spanner + Flink : Connectors : JDBC : Spanner + + jar + + + 2.26.1 + + + + + + org.apache.flink + flink-connector-jdbc-core + ${project.version} + + + + org.apache.flink + flink-connector-jdbc-core + ${project.version} + test-jar + test + + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + true + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + + + com.google.cloud + google-cloud-spanner-jdbc + ${spanner.version} + provided + + + + + + org.assertj + assertj-core + ${assertj.version} + test + + + + + org.testcontainers + gcloud + test + + + org.testcontainers + jdbc + test + + + + + diff --git a/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/SpannerFactory.java 
b/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/SpannerFactory.java new file mode 100644 index 00000000..89b1cd75 --- /dev/null +++ b/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/SpannerFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.spanner.database; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.JdbcFactory; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.spanner.database.catalog.SpannerCatalog; +import org.apache.flink.connector.jdbc.spanner.database.dialect.SpannerDialect; + +/** Factory for {@link SpannerDialect}. */ +@Internal +public class SpannerFactory implements JdbcFactory { + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:cloudspanner:"); + } + + @Override + public JdbcDialect createDialect() { + return new SpannerDialect(); + } + + @Override + public JdbcCatalog createCatalog( + ClassLoader classLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl) { + return new SpannerCatalog( + classLoader, catalogName, defaultDatabase, username, pwd, baseUrl); + } +} diff --git a/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerCatalog.java b/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerCatalog.java new file mode 100644 index 00000000..c782dfc6 --- /dev/null +++ b/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerCatalog.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.spanner.database.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.jdbc.core.database.catalog.AbstractJdbcCatalog; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalogTypeMapper; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.admin.database.v1.DatabaseAdminClient; +import com.google.cloud.spanner.admin.database.v1.DatabaseAdminClient.ListDatabasesPage; +import com.google.cloud.spanner.admin.database.v1.DatabaseAdminClient.ListDatabasesPagedResponse; +import com.google.cloud.spanner.connection.ConnectionOptions; +import com.google.spanner.admin.database.v1.Database; +import com.google.spanner.admin.database.v1.InstanceName; +import org.apache.commons.compress.utils.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.connector.jdbc.JdbcConnectionOptions.getBriefAuthProperties; + +/** Catalog for Spanner. 
*/ +@Internal +public class SpannerCatalog extends AbstractJdbcCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(SpannerCatalog.class); + + private static final Set builtinSchemas = + new HashSet() { + { + add("INFORMATION_SCHEMA"); + add("SPANNER_SYS"); + } + }; + + private final JdbcCatalogTypeMapper dialectTypeMapper; + + @VisibleForTesting + public SpannerCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String username, + String pwd, + String baseUrl) { + this( + userClassLoader, + catalogName, + defaultDatabase, + baseUrl, + getBriefAuthProperties(username, pwd)); + } + + public SpannerCatalog( + ClassLoader userClassLoader, + String catalogName, + String defaultDatabase, + String baseUrl, + Properties connectProperties) { + super(userClassLoader, catalogName, defaultDatabase, baseUrl, connectProperties); + this.dialectTypeMapper = new SpannerTypeMapper(); + } + + private ConnectionOptions getConnectionOptions() { + return ConnectionOptions.newBuilder().setUri(defaultUrl.replace("jdbc:", "")).build(); + } + + private SpannerOptions getSpannerOptions(ConnectionOptions options) { + SpannerOptions.Builder builder = + SpannerOptions.newBuilder().setProjectId(options.getProjectId()); + if (options.getHost().contains("localhost")) { + builder.setEmulatorHost(options.getHost()); + } else { + builder.setHost(options.getHost()); + } + return builder.build(); + } + + private Map getColumnNullables(Connection conn, ObjectPath tablePath) + throws SQLException { + Map nullables = new HashMap<>(); + try (PreparedStatement ps = + conn.prepareStatement( + "SELECT COLUMN_NAME, IS_NULLABLE FROM INFORMATION_SCHEMA.COLUMNS " + + "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " + + "ORDER BY ORDINAL_POSITION;")) { + ps.setString(1, getSchemaName(tablePath)); + ps.setString(2, getTableName(tablePath)); + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + String isNullable = rs.getString(2); + nullables.put(rs.getString(1), isNullable.equals("YES")); + } + } + } + return nullables; + } + + @Override + protected String getDatabaseUrl(String databaseName) { + String databaseUrl = baseUrl + databaseName; + if (connectionProperties.containsKey("autoConfigEmulator")) { + databaseUrl += + ";autoConfigEmulator=" + connectionProperties.getProperty("autoConfigEmulator"); + } + return databaseUrl; + } + + @Override + protected Optional getPrimaryKey( + DatabaseMetaData metaData, String database, String schema, String table) + throws SQLException { + // In the case of Spanner, the database cannot be specified + // because it always uses the connected database. 
+ return super.getPrimaryKey(metaData, null, schema, table); + } + + @Override + public List listDatabases() throws CatalogException { + List databases = Lists.newArrayList(); + ConnectionOptions connOptions = getConnectionOptions(); + SpannerOptions spannerOptions = getSpannerOptions(connOptions); + try (Spanner spanner = spannerOptions.getService(); + DatabaseAdminClient databaseAdminClient = spanner.createDatabaseAdminClient()) { + InstanceName instanceName = + InstanceName.of(connOptions.getProjectId(), connOptions.getInstanceId()); + ListDatabasesPagedResponse response = databaseAdminClient.listDatabases(instanceName); + for (ListDatabasesPage page : response.iteratePages()) { + for (Database database : page.iterateAll()) { + final String fullName = database.getName(); + databases.add(fullName.substring(fullName.lastIndexOf('/') + 1)); + } + } + } catch (Exception e) { + throw new CatalogException("Failed to list databases.", e); + } + return databases; + } + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + String databaseName = tablePath.getDatabaseName(); + + try (Connection conn = + DriverManager.getConnection(getDatabaseUrl(databaseName), connectionProperties)) { + DatabaseMetaData metaData = conn.getMetaData(); + Optional primaryKey = + getPrimaryKey( + metaData, + databaseName, + getSchemaName(tablePath), + getTableName(tablePath)); + + // ResultSetMetaData.isNullable always returns columnNullableUnknown=2. + // https://github.com/googleapis/java-spanner-jdbc/blob/v2.26.1/src/main/java/com/google/cloud/spanner/jdbc/JdbcResultSetMetaData.java#L76 + // The INFORMATION_SCHEMA.COLUMNS table is used to retrieve nullability. + Map nullables = getColumnNullables(conn, tablePath); + + PreparedStatement ps = + conn.prepareStatement( + String.format("SELECT * FROM %s;", getSchemaTableName(tablePath))); + + ResultSetMetaData resultSetMetaData = ps.getMetaData(); + + String[] columnNames = new String[resultSetMetaData.getColumnCount()]; + DataType[] types = new DataType[resultSetMetaData.getColumnCount()]; + + for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { + final String columnName = resultSetMetaData.getColumnName(i); + columnNames[i - 1] = columnName; + types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i); + if (!nullables.getOrDefault(columnName, true)) { + types[i - 1] = types[i - 1].notNull(); + } + } + + Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types); + primaryKey.ifPresent( + pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns())); + Schema tableSchema = schemaBuilder.build(); + + return CatalogTable.of(tableSchema, null, Lists.newArrayList(), getOptions(tablePath)); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed getting table %s", tablePath.getFullName()), e); + } + } + + private List getTables(Connection conn, List schemas) throws SQLException { + List tables = Lists.newArrayList(); + try (PreparedStatement ps = + conn.prepareStatement( + "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.`TABLES` " + + "WHERE TABLE_TYPE = 'BASE TABLE' " + + "AND TABLE_SCHEMA = ? " + + "ORDER BY TABLE_NAME;")) { + for (String schema : schemas) { + extractColumnValuesByStatement(ps, 1, null, schema).stream() + .map( + table -> + StringUtils.isNullOrWhitespaceOnly(schema) + ? 
table + : String.format("%s.%s", schema, table)) + .forEach(tables::add); + } + return tables; + } + } + + @Override + public List listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + Preconditions.checkState( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "Database name cannot be null or empty."); + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + final String url = getDatabaseUrl(databaseName); + try (Connection conn = DriverManager.getConnection(url, connectionProperties)) { + // get all schemas + List schemas; + try (PreparedStatement ps = + conn.prepareStatement("SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA;")) { + schemas = + extractColumnValuesByStatement( + ps, 1, pgSchema -> !builtinSchemas.contains(pgSchema)); + } + + // get all tables + return getTables(conn, schemas); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed to list tables for database %s", databaseName), e); + } + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + List tables; + try { + tables = listTables(tablePath.getDatabaseName()); + } catch (DatabaseNotExistException e) { + return false; + } + return tables.contains(getSchemaTableName(tablePath)); + } + + /** Converts Spanner type to Flink {@link DataType}. */ + @Override + protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex) + throws SQLException { + return dialectTypeMapper.mapping(tablePath, metadata, colIndex); + } + + @Override + protected String getSchemaName(ObjectPath tablePath) { + return SpannerTablePath.fromFlinkTableName(tablePath.getObjectName()).getSchemaName(); + } + + @Override + protected String getTableName(ObjectPath tablePath) { + return SpannerTablePath.fromFlinkTableName(tablePath.getObjectName()).getTableName(); + } + + @Override + protected String getSchemaTableName(ObjectPath tablePath) { + return SpannerTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath(); + } +} diff --git a/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerTablePath.java b/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerTablePath.java new file mode 100644 index 00000000..8af614ea --- /dev/null +++ b/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerTablePath.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.spanner.database.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.StringUtils; + +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Table path of Spanner in Flink. Can be of formats "table_name" or "schema_name.table_name". + * Spanner requires the use of fully qualified names to reference a database object in non-default + * named schema. The schema name is not required for the default schema. + * https://cloud.google.com/spanner/docs/named-schemas + */ +@Internal +public class SpannerTablePath { + + private static final String DEFAULT_SCHEMA_NAME = ""; + + private final String schemaName; + private final String tableName; + + public SpannerTablePath(String schemaName, String tableName) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tableName), + "Table name is not valid. Null or empty is not allowed"); + this.schemaName = schemaName; + this.tableName = tableName; + } + + public SpannerTablePath(String tableName) { + this(DEFAULT_SCHEMA_NAME, tableName); + } + + public static SpannerTablePath fromFlinkTableName(String flinkTableName) { + if (flinkTableName.contains(".")) { + String[] path = flinkTableName.split("\\."); + checkArgument( + path.length == 2, + String.format( + "Table name '%s' is not valid. The parsed length is %d", + flinkTableName, path.length)); + return new SpannerTablePath(path[0], path[1]); + } else { + return new SpannerTablePath(flinkTableName); + } + } + + public static String toFlinkTableName(String schema, String table) { + return new SpannerTablePath(schema, table).getFullPath(); + } + + public String getFullPath() { + if (StringUtils.isNullOrWhitespaceOnly(schemaName)) { + return tableName; + } else { + return String.format("%s.%s", schemaName, tableName); + } + } + + public String getSchemaName() { + return schemaName; + } + + public String getTableName() { + return tableName; + } + + @Override + public String toString() { + return getFullPath(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + SpannerTablePath that = (SpannerTablePath) o; + return Objects.equals(schemaName, that.schemaName) + && Objects.equals(tableName, that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(schemaName, tableName); + } +} diff --git a/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerTypeMapper.java b/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerTypeMapper.java new file mode 100644 index 00000000..de97345b --- /dev/null +++ b/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerTypeMapper.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.spanner.database.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalogTypeMapper; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.types.DataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +/** SpannerTypeMapper util class. */ +@Internal +public class SpannerTypeMapper implements JdbcCatalogTypeMapper { + + private static final Logger LOG = LoggerFactory.getLogger(SpannerTypeMapper.class); + + private static final String SPANNER_BOOL = "BOOL"; + private static final String SPANNER_INT64 = "INT64"; + private static final String SPANNER_NUMERIC = "NUMERIC"; + private static final String SPANNER_FLOAT64 = "FLOAT64"; + private static final String SPANNER_FLOAT32 = "FLOAT32"; + private static final String SPANNER_STRING = "STRING"; + private static final String SPANNER_JSON = "JSON"; + private static final String SPANNER_PROTO = "PROTO"; + private static final String SPANNER_ENUM = "ENUM"; + private static final String SPANNER_BYTES = "BYTES"; + private static final String SPANNER_TIMESTAMP = "TIMESTAMP"; + private static final String SPANNER_DATE = "DATE"; + private static final String SPANNER_ARRAY = "ARRAY"; + private static final String SPANNER_STRUCT = "STRUCT"; + + private static final String SPANNER_ARRAY_BOOL = + "[Ljava.lang.Boolean;"; // Boolean[].class.getName() + private static final String SPANNER_ARRAY_BYTES = "[[B"; // byte[][].class.getName() + // private static final String SPANNER_ARRAY_PROTO = "[[B"; + private static final String SPANNER_ARRAY_DATE = "[Ljava.sql.Date;"; // Date[].class.getName() + private static final String SPANNER_ARRAY_FLOAT32 = + "[Ljava.lang.Float;"; // Float[].class.getName() + private static final String SPANNER_ARRAY_FLOAT64 = + "[Ljava.lang.Double;"; // Double[].class.getName() + private static final String SPANNER_ARRAY_INT64 = "[Ljava.lang.Long;"; // Long[].class.getName() + // private static final String SPANNER_ARRAY_ENUM = "[Ljava.lang.Long;"; + private static final String SPANNER_ARRAY_NUMERIC = + "[Ljava.math.BigDecimal;"; // BigDecimal[].class.getName() + private static final String SPANNER_ARRAY_STRING = + "[Ljava.lang.String;"; // String[].class.getName() + // private static final String SPANNER_ARRAY_JSON = "[Ljava.lang.String;"; + private static final String SPANNER_ARRAY_TIMESTAMP = + "[Ljava.sql.Timestamp;"; // Timestamp[].class.getName() + + @Override + public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex) + throws SQLException { + final String spannerType = metadata.getColumnTypeName(colIndex); + final String spannerClassName = metadata.getColumnClassName(colIndex); + final int precision = metadata.getPrecision(colIndex); + final int scale = metadata.getScale(colIndex); + return getMapping(spannerType, spannerClassName, precision, scale); + } + + private DataType getMapping( + String spannerType, 
String spannerClassName, int precision, int scale) { + switch (spannerType) { + case SPANNER_BOOL: + return DataTypes.BOOLEAN(); + case SPANNER_BYTES: + case SPANNER_PROTO: + // The default column display size is returned for precision, + // so it is handled as a BYTES type, not a VARBINARY type. + // https://github.com/googleapis/java-spanner-jdbc/blob/v2.26.1/src/main/java/com/google/cloud/spanner/jdbc/JdbcResultSetMetaData.java#L157 + return DataTypes.BYTES(); + case SPANNER_DATE: + return DataTypes.DATE(); + case SPANNER_FLOAT32: + return DataTypes.FLOAT(); + case SPANNER_FLOAT64: + return DataTypes.DOUBLE(); + case SPANNER_INT64: + case SPANNER_ENUM: + return DataTypes.BIGINT(); + case SPANNER_NUMERIC: + // The precision for the numeric type is 14 and the scale is 15, both of which are + // fixed values. + // https://github.com/googleapis/java-spanner-jdbc/blob/v2.26.1/src/main/java/com/google/cloud/spanner/jdbc/JdbcResultSetMetaData.java#L149 + // https://github.com/googleapis/java-spanner-jdbc/blob/v2.26.1/src/main/java/com/google/cloud/spanner/jdbc/JdbcResultSetMetaData.java#L168 + // But the document describes it as follows. + // The GoogleSQL NUMERIC is an exact numeric data type capable of representing an + // exact numeric value + // with a precision of 38 and scale of 9. + // https://cloud.google.com/spanner/docs/working-with-numerics + return DataTypes.DECIMAL(38, 9); + case SPANNER_STRING: + // The default column display size is returned for precision, + // so it is handled as a STRING type, not a VARCHAR type. + // https://github.com/googleapis/java-spanner-jdbc/blob/v2.26.1/src/main/java/com/google/cloud/spanner/jdbc/JdbcResultSetMetaData.java#L157 + case SPANNER_JSON: + case SPANNER_STRUCT: + return DataTypes.STRING(); + case SPANNER_TIMESTAMP: + return DataTypes.TIMESTAMP(); + case SPANNER_ARRAY: + return getArrayMapping(spannerClassName, precision, scale); + default: + throw new UnsupportedOperationException( + String.format("Unsupported spanner type: %s", spannerType)); + } + } + + private DataType getArrayMapping(String spannerClassName, int precision, int scale) { + switch (spannerClassName) { + case SPANNER_ARRAY_BOOL: + return DataTypes.ARRAY(DataTypes.BOOLEAN()); + case SPANNER_ARRAY_BYTES: + // The default column display size is returned for precision, + // so it is handled as a BYTES type, not a VARBINARY type. + return DataTypes.ARRAY(DataTypes.BYTES()); + case SPANNER_ARRAY_DATE: + return DataTypes.ARRAY(DataTypes.DATE()); + case SPANNER_ARRAY_FLOAT32: + return DataTypes.ARRAY(DataTypes.FLOAT()); + case SPANNER_ARRAY_FLOAT64: + return DataTypes.ARRAY(DataTypes.DOUBLE()); + case SPANNER_ARRAY_INT64: + return DataTypes.ARRAY(DataTypes.BIGINT()); + case SPANNER_ARRAY_NUMERIC: + return DataTypes.ARRAY(DataTypes.DECIMAL(38, 9)); + case SPANNER_ARRAY_STRING: + // The default column display size is returned for precision, + // so it is handled as a STRING type, not a VARCHAR type. 
+ return DataTypes.ARRAY(DataTypes.STRING()); + case SPANNER_ARRAY_TIMESTAMP: + return DataTypes.ARRAY(DataTypes.TIMESTAMP()); + default: + throw new UnsupportedOperationException( + "Unsupported spanner array type: " + spannerClassName); + } + } +} diff --git a/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/dialect/SpannerDialect.java b/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/dialect/SpannerDialect.java new file mode 100644 index 00000000..66e17cab --- /dev/null +++ b/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/dialect/SpannerDialect.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.spanner.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialect; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.EnumSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** JDBC dialect for Spanner. */ +@Internal +public class SpannerDialect extends AbstractDialect { + + private static final long serialVersionUID = 1L; + + // Define MAX/MIN precision of TIMESTAMP type according to Spanner docs: + // https://cloud.google.com/spanner/docs/reference/standard-sql/data-types#timestamp_type + private static final int MAX_TIMESTAMP_PRECISION = 9; + private static final int MIN_TIMESTAMP_PRECISION = 0; + + // Define MAX/MIN precision of DECIMAL type according to Spanner docs: + // https://cloud.google.com/spanner/docs/reference/standard-sql/data-types#decimal_types + private static final int MAX_DECIMAL_PRECISION = 38; + private static final int MIN_DECIMAL_PRECISION = 0; + + @Override + public SpannerDialectConverter getRowConverter(RowType rowType) { + return new SpannerDialectConverter(rowType); + } + + @Override + public Optional defaultDriverName() { + return Optional.of("com.google.cloud.spanner.jdbc.JdbcDriver"); + } + + @Override + public String dialectName() { + return "Spanner"; + } + + @Override + public String getLimitClause(long limit) { + return "LIMIT " + limit; + } + + /** Spanner upsert query. It uses INSERT OR UPDATE INTO .. (..) VALUES (..) to upsert rows into Spanner.
*/ + @Override + public Optional getUpsertStatement( + String tableName, String[] fieldNames, String[] uniqueKeyFields) { + String columns = + Arrays.stream(fieldNames) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + String placeholders = + Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", ")); + return Optional.of( + "INSERT OR UPDATE INTO " + + quoteIdentifier(tableName) + + "(" + + columns + + ")" + + " VALUES (" + + placeholders + + ")"); + } + + @Override + public String quoteIdentifier(String identifier) { + return identifier; + } + + @Override + public Optional decimalPrecisionRange() { + return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION)); + } + + @Override + public Optional timestampPrecisionRange() { + return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION)); + } + + @Override + public Set supportedTypes() { + // The data types used in Spanner are listed at: + // https://cloud.google.com/spanner/docs/reference/standard-sql/data-types + + // TODO: We can't convert BINARY data type to + // PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in + // LegacyTypeInfoDataTypeConverter. + + return EnumSet.of( + LogicalTypeRoot.CHAR, + LogicalTypeRoot.VARCHAR, + LogicalTypeRoot.BOOLEAN, + LogicalTypeRoot.VARBINARY, + LogicalTypeRoot.DECIMAL, + LogicalTypeRoot.TINYINT, + LogicalTypeRoot.SMALLINT, + LogicalTypeRoot.INTEGER, + LogicalTypeRoot.BIGINT, + LogicalTypeRoot.FLOAT, + LogicalTypeRoot.DOUBLE, + LogicalTypeRoot.DATE, + LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, + LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + LogicalTypeRoot.ARRAY); + } +} diff --git a/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/dialect/SpannerDialectConverter.java b/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/dialect/SpannerDialectConverter.java new file mode 100644 index 00000000..d2f8e96f --- /dev/null +++ b/flink-connector-jdbc-spanner/src/main/java/org/apache/flink/connector/jdbc/spanner/database/dialect/SpannerDialectConverter.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.jdbc.spanner.database.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; + +import java.lang.reflect.Array; + +/** + * Runtime converter that responsible to convert between JDBC object and Flink internal object for + * Spanner. + */ +@Internal +public class SpannerDialectConverter extends AbstractDialectConverter { + + private static final long serialVersionUID = 1L; + + @Override + public String converterName() { + return "Spanner"; + } + + public SpannerDialectConverter(RowType rowType) { + super(rowType); + } + + @Override + protected JdbcDeserializationConverter createInternalConverter(LogicalType type) { + LogicalTypeRoot root = type.getTypeRoot(); + + if (root == LogicalTypeRoot.ARRAY) { + ArrayType arrayType = (ArrayType) type; + return createSpannerArrayConverter(arrayType); + } else { + return super.createInternalConverter(type); + } + } + + @Override + protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) { + LogicalTypeRoot root = type.getTypeRoot(); + if (root == LogicalTypeRoot.ARRAY) { + // TODO: FieldNamedPreparedStatement needs to support + // the setArray and createArrayOf methods. + return (val, index, statement) -> { + throw new IllegalStateException( + String.format( + "Writing ARRAY type is not yet supported in JDBC:%s.", + converterName())); + }; + } else { + return super.createNullableExternalConverter(type); + } + } + + private JdbcDeserializationConverter createSpannerArrayConverter(ArrayType arrayType) { + final Class elementClass = + LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType()); + final JdbcDeserializationConverter elementConverter = + createNullableInternalConverter(arrayType.getElementType()); + return val -> { + java.sql.Array spannerArray = (java.sql.Array) val; + Object[] in = (Object[]) spannerArray.getArray(); + final Object[] array = (Object[]) Array.newInstance(elementClass, in.length); + for (int i = 0; i < in.length; i++) { + array[i] = elementConverter.deserialize(in[i]); + } + return new GenericArrayData(array); + }; + } +} diff --git a/flink-connector-jdbc-spanner/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory b/flink-connector-jdbc-spanner/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory new file mode 100644 index 00000000..8ea8b572 --- /dev/null +++ b/flink-connector-jdbc-spanner/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.spanner.database.SpannerFactory diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/SpannerTestBase.java similarity index 52% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java rename to flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/SpannerTestBase.java index 371140d3..f33fe369 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalogTest.java +++ b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/SpannerTestBase.java @@ -16,27 +16,20 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.core.database.catalog; +package org.apache.flink.connector.jdbc.spanner; -import org.junit.jupiter.api.Test; +import org.apache.flink.connector.jdbc.spanner.testutils.SpannerDatabase; +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; +import org.apache.flink.connector.jdbc.testutils.DatabaseTest; -import static org.assertj.core.api.Assertions.assertThatThrownBy; +import org.junit.jupiter.api.extension.ExtendWith; -/** Test for {@link AbstractJdbcCatalog}. */ -class AbstractJdbcCatalogTest { +/** Base class for Spanner testing. */ +@ExtendWith(SpannerDatabase.class) +public interface SpannerTestBase extends DatabaseTest { - @Test - void testJdbcUrl() { - AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234/"); - AbstractJdbcCatalog.validateJdbcUrl("jdbc:dialect://localhost:1234"); - } - - @Test - void testInvalidJdbcUrl() { - assertThatThrownBy( - () -> - AbstractJdbcCatalog.validateJdbcUrl( - "jdbc:dialect://localhost:1234/db")) - .isInstanceOf(IllegalArgumentException.class); + @Override + default DatabaseMetadata getMetadata() { + return SpannerDatabase.getMetadata(); } } diff --git a/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/SpannerFactoryTest.java b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/SpannerFactoryTest.java new file mode 100644 index 00000000..d9eaf2b0 --- /dev/null +++ b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/SpannerFactoryTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.spanner.database;
+
+import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
+import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactory;
+import org.apache.flink.connector.jdbc.core.database.catalog.factory.JdbcCatalogFactoryOptions;
+import org.apache.flink.connector.jdbc.spanner.SpannerTestBase;
+import org.apache.flink.connector.jdbc.spanner.database.catalog.SpannerCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link JdbcCatalogFactory}. */
+class SpannerFactoryTest implements SpannerTestBase {
+
+    private static String baseUrl;
+    private static JdbcCatalog catalog;
+
+    private static final String TEST_DEFAULT_DATABASE = "test_database";
+    private static final String TEST_CATALOG_NAME = "test_catalog";
+
+    @BeforeEach
+    void setup() {
+        String jdbcUrl = getMetadata().getJdbcUrl();
+        baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+        catalog =
+                JdbcFactoryLoader.loadCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        TEST_DEFAULT_DATABASE,
+                        getMetadata().getUsername(),
+                        getMetadata().getPassword(),
+                        baseUrl,
+                        null);
+    }
+
+    @Test
+    void test() {
+        final Map<String, String> options = new HashMap<>();
+        options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER);
+        options.put(JdbcCatalogFactoryOptions.DEFAULT_DATABASE.key(), TEST_DEFAULT_DATABASE);
+        options.put(JdbcCatalogFactoryOptions.USERNAME.key(), getMetadata().getUsername());
+        options.put(JdbcCatalogFactoryOptions.PASSWORD.key(), getMetadata().getPassword());
+        options.put(JdbcCatalogFactoryOptions.BASE_URL.key(), baseUrl);
+
+        final Catalog actualCatalog =
+                FactoryUtil.createCatalog(
+                        TEST_CATALOG_NAME,
+                        options,
+                        null,
+                        Thread.currentThread().getContextClassLoader());
+
+        assertThat(catalog).isEqualTo(actualCatalog);
+        assertThat(actualCatalog).isInstanceOf(SpannerCatalog.class);
+    }
+}
diff --git a/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerCatalogITCase.java b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerCatalogITCase.java
new file mode 100644
index 00000000..c66d6936
--- /dev/null
+++ b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerCatalogITCase.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.spanner.database.catalog; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CollectionUtil; + +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.sql.Date; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** E2E test for {@link SpannerCatalog}. */ +class SpannerCatalogITCase extends SpannerCatalogTestBase { + + // ------ databases ------ + + @Test + void testGetDatabase_DatabaseNotExistException() { + assertThatThrownBy(() -> catalog.getDatabase("nonexistent")) + .isInstanceOf(DatabaseNotExistException.class) + .hasMessageContaining("Database nonexistent does not exist in Catalog"); + } + + @Test + void testListDatabases() { + List actual = catalog.listDatabases(); + + assertThat(actual).isEqualTo(Arrays.asList(metadata.getDatabaseName())); + } + + @Test + void testDatabaseExists() { + assertThat(catalog.databaseExists("nonexistent")).isFalse(); + + assertThat(catalog.databaseExists(metadata.getDatabaseName())).isTrue(); + } + + // ------ tables ------ + + @Test + void testListTables() throws DatabaseNotExistException { + List actual = catalog.listTables(metadata.getDatabaseName()); + + assertThat(actual) + .containsExactlyInAnyOrder( + TABLE_SIMPLE.getTableName(), + TABLE_SIMPLE_WITH_SCHEMA.getTableName(), + TABLE_ALL_TYPES.getTableName(), + TABLE_ALL_TYPES_SINK.getTableName(), + TABLE_ARRAY_TYPES.getTableName(), + TABLE_GROUPED_BY_SINK.getTableName()); + } + + @Test + void testListTables_DatabaseNotExistException() { + assertThatThrownBy(() -> catalog.listTables("nonexistschema")) + .isInstanceOf(DatabaseNotExistException.class); + } + + @Test + void testTableExists() { + assertThat(catalog.tableExists(new ObjectPath(metadata.getDatabaseName(), "nonexist"))) + .isFalse(); + + assertThat( + catalog.tableExists( + new ObjectPath( + metadata.getDatabaseName(), TABLE_SIMPLE.getTableName()))) + .isTrue(); + assertThat( + catalog.tableExists( + new ObjectPath( + metadata.getDatabaseName(), + TABLE_SIMPLE_WITH_SCHEMA.getTableName()))) + .isTrue(); + assertThat( + catalog.tableExists( + new ObjectPath( + metadata.getDatabaseName(), + TABLE_ALL_TYPES.getTableName()))) + .isTrue(); + assertThat( + catalog.tableExists( 
+ new ObjectPath( + metadata.getDatabaseName(), + TABLE_ARRAY_TYPES.getTableName()))) + .isTrue(); + } + + @Test + void testGetTables_TableNotExistException() { + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath( + metadata.getDatabaseName(), + SpannerTablePath.toFlinkTableName( + TEST_SCHEMA, "anytable")))) + .isInstanceOf(TableNotExistException.class); + } + + @Test + void testGetTables_TableNotExistException_NoSchema() { + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath( + metadata.getDatabaseName(), + SpannerTablePath.toFlinkTableName( + "nonexistschema", "anytable")))) + .isInstanceOf(TableNotExistException.class); + } + + @Test + void testGetTables_TableNotExistException_NoDatabase() { + assertThatThrownBy( + () -> + catalog.getTable( + new ObjectPath( + "nonexistdb", + SpannerTablePath.toFlinkTableName( + TEST_SCHEMA, "anytable")))) + .isInstanceOf(TableNotExistException.class); + } + + @Test + void testGetTable() throws TableNotExistException, CatalogException { + Schema schema = TABLE_SIMPLE.getTableSchema("PRIMARY_KEY"); + CatalogBaseTable table = + catalog.getTable( + new ObjectPath(metadata.getDatabaseName(), TABLE_SIMPLE.getTableName())); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + schema = TABLE_SIMPLE_WITH_SCHEMA.getTableSchema("PRIMARY_KEY"); + table = + catalog.getTable( + new ObjectPath( + metadata.getDatabaseName(), + TABLE_SIMPLE_WITH_SCHEMA.getTableName())); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + schema = TABLE_ALL_TYPES.getTableSchema("PRIMARY_KEY"); + table = + catalog.getTable( + new ObjectPath(metadata.getDatabaseName(), TABLE_ALL_TYPES.getTableName())); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + + schema = TABLE_ARRAY_TYPES.getTableSchema("PRIMARY_KEY"); + table = + catalog.getTable( + new ObjectPath( + metadata.getDatabaseName(), TABLE_ARRAY_TYPES.getTableName())); + + assertThat(table.getUnresolvedSchema()).isEqualTo(schema); + } + + @Test + void testGetTablePrimaryKey() throws TableNotExistException { + Schema schemaAllTypes = TABLE_ALL_TYPES.getTableSchema("PRIMARY_KEY"); + CatalogBaseTable allTypes = + catalog.getTable( + new ObjectPath(metadata.getDatabaseName(), TABLE_ALL_TYPES.getTableName())); + assertThat(schemaAllTypes.getPrimaryKey()) + .isEqualTo(allTypes.getUnresolvedSchema().getPrimaryKey()); + } + + // ------ select queries ------ + + @Test + void testSelectField() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select id from `%s`", + TABLE_ALL_TYPES.getTableName())) + .execute() + .collect()); + assertThat(results) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "a"), Row.ofKind(RowKind.INSERT, "b")); + } + + @Test + void testWithoutSchema() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from `%s`", TABLE_SIMPLE.getTableName())) + .execute() + .collect()); + assertThat(results).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "12345-abcde")); + } + + @Test + void testWithSchema() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from `%s`", + SpannerTablePath.fromFlinkTableName( + TABLE_SIMPLE_WITH_SCHEMA.getTableName()))) + .execute() + .collect()); + assertThat(results).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "98765-fghij")); + } + + @Test + void testWithDatabaseWithoutSchema() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * 
from `%s`.`%s`", + metadata.getDatabaseName(), + TABLE_SIMPLE.getTableName())) + .execute() + .collect()); + assertThat(results).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "12345-abcde")); + } + + @Test + void testWithDatabaseAndSchema() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from `%s`.`%s`", + metadata.getDatabaseName(), + SpannerTablePath.fromFlinkTableName( + TABLE_SIMPLE_WITH_SCHEMA.getTableName()))) + .execute() + .collect()); + assertThat(results).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "98765-fghij")); + } + + @Test + void testFullPath() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from `%s`.`%s`.`%s`", + TEST_CATALOG_NAME, + metadata.getDatabaseName(), + TABLE_SIMPLE.getTableName())) + .execute() + .collect()); + assertThat(results).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "12345-abcde")); + } + + @Test + void testFullPathWithSchema() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from `%s`.`%s`.`%s`", + TEST_CATALOG_NAME, + metadata.getDatabaseName(), + SpannerTablePath.fromFlinkTableName( + TABLE_SIMPLE_WITH_SCHEMA.getTableName()))) + .execute() + .collect()); + assertThat(results).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "98765-fghij")); + } + + @Test + void testInsert() throws Exception { + String sql = + String.format( + "insert into `%s` select id, col_bool, col_bytes, col_date, col_int64, " + + "col_numeric, col_float32, col_float64, col_string, col_timestamp from `%s`", + TABLE_ALL_TYPES_SINK.getTableName(), TABLE_ALL_TYPES.getTableName()); + tEnv.executeSql(sql).await(); + + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from %s", + TABLE_ALL_TYPES_SINK.getTableName())) + .execute() + .collect()); + assertThat(results) + .containsExactlyInAnyOrder( + Row.ofKind( + RowKind.INSERT, + "a", + true, + "abc".getBytes(), + Date.valueOf("2025-01-01").toLocalDate(), + 1L, + new BigDecimal("3.140000000"), + 1.1f, + -1.1d, + "foo", + ZonedDateTime.of(2025, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime()), + Row.ofKind( + RowKind.INSERT, + "b", + false, + "123".getBytes(), + Date.valueOf("2025-12-31").toLocalDate(), + -1L, + new BigDecimal("-3.140000000"), + -1.1f, + 1.1d, + "foo", + ZonedDateTime.of(2025, 12, 31, 0, 0, 0, 0, ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime())); + } + + @Test + void testGroupByInsert() throws Exception { + tEnv.executeSql( + String.format( + "insert into `%s` select `col_string`, sum(`col_int64`) `col_int64_max` " + + "from `%s` group by `col_string` ", + TABLE_GROUPED_BY_SINK.getTableName(), + TABLE_ALL_TYPES.getTableName())) + .await(); + + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from `%s`", + TABLE_GROUPED_BY_SINK.getTableName())) + .execute() + .collect()); + assertThat(results) + .isEqualTo(Collections.singletonList(Row.ofKind(RowKind.INSERT, "foo", 0L))); + } + + @Test + void testAllTypes() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from %s", TABLE_ALL_TYPES.getTableName())) + .execute() + .collect()); + + assertThat(results) + .containsExactlyInAnyOrder( + Row.ofKind( + RowKind.INSERT, + "a", + true, + "abc".getBytes(), + Date.valueOf("2025-01-01").toLocalDate(), + "{\"key\":\"value1\"}", + 1L, + new 
BigDecimal("3.140000000"), + 1.1f, + -1.1d, + "foo", + ZonedDateTime.of(2025, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime()), + Row.ofKind( + RowKind.INSERT, + "b", + false, + "123".getBytes(), + Date.valueOf("2025-12-31").toLocalDate(), + "{\"key\":\"value2\"}", + -1L, + new BigDecimal("-3.140000000"), + -1.1f, + 1.1d, + "foo", + ZonedDateTime.of(2025, 12, 31, 0, 0, 0, 0, ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime())); + } + + @Test + void testArrayTypes() { + List results = + CollectionUtil.iteratorToList( + tEnv.sqlQuery( + String.format( + "select * from %s", + TABLE_ARRAY_TYPES.getTableName())) + .execute() + .collect()); + + assertThat(results) + .containsExactlyInAnyOrder( + Row.ofKind( + RowKind.INSERT, + "a", + new Boolean[] {true, false}, + new byte[][] {"abc".getBytes(), "123".getBytes()}, + new LocalDate[] { + Date.valueOf("2025-01-01").toLocalDate(), + Date.valueOf("2025-12-31").toLocalDate() + }, + new Long[] {1L, -1L}, + new BigDecimal[] { + new BigDecimal("3.140000000"), new BigDecimal("-3.140000000") + }, + new Double[] {1.1d, -1.1d}, + new String[] {"foo", "bar"}, + new LocalDateTime[] { + ZonedDateTime.of(2025, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime(), + ZonedDateTime.of(2025, 12, 31, 0, 0, 0, 0, ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime() + })); + } +} diff --git a/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerCatalogTestBase.java b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerCatalogTestBase.java new file mode 100644 index 00000000..73c79ac3 --- /dev/null +++ b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerCatalogTestBase.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.spanner.database.catalog; + +import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.connector.jdbc.spanner.SpannerTestBase; +import org.apache.flink.connector.jdbc.spanner.testutils.SpannerDatabase; +import org.apache.flink.connector.jdbc.spanner.testutils.SpannerMetadata; +import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase; +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; + +import static org.apache.flink.connector.jdbc.spanner.testutils.tables.SpannerTableRow.spannerTableRow; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField; +import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM; + +/** Test base for {@link SpannerCatalog}. */ +class SpannerCatalogTestBase implements JdbcITCaseBase, SpannerTestBase { + + protected static final String TEST_CATALOG_NAME = "spanner_catalog"; + protected static final String TEST_SCHEMA = "test_schema"; + + protected static final TableRow TABLE_SIMPLE = createTableSimple("test_simple"); + protected static final TableRow TABLE_SIMPLE_WITH_SCHEMA = + createTableWithSchema("test_simple_with_schema"); + protected static final TableRow TABLE_ALL_TYPES = createTableAllTypes("test_all_types"); + protected static final TableRow TABLE_ALL_TYPES_SINK = + createTableAllTypesSink("test_all_types_sink"); + protected static final TableRow TABLE_ARRAY_TYPES = createTableArrayTypes("test_array_types"); + protected static final TableRow TABLE_GROUPED_BY_SINK = + createTableGroupedBy("test_grouped_by_sink"); + + protected static String baseUrl; + protected static SpannerMetadata metadata; + protected static SpannerCatalog catalog; + protected TableEnvironment tEnv; + + protected static TableRow createTableSimple(String name) { + return spannerTableRow( + name, pkField("id", dbType("STRING(36) NOT NULL"), DataTypes.STRING().notNull())); + } + + protected static TableRow createTableAllTypes(String name) { + return spannerTableRow( + name, + pkField("id", dbType("STRING(36) NOT NULL"), DataTypes.STRING().notNull()), + field("col_bool", dbType("BOOL"), DataTypes.BOOLEAN()), + field("col_bytes", dbType("BYTES(MAX)"), DataTypes.BYTES()), + field("col_date", dbType("DATE"), DataTypes.DATE()), + field("col_json", dbType("JSON"), DataTypes.STRING()), + field("col_int64", dbType("INT64"), DataTypes.BIGINT()), + field("col_numeric", dbType("NUMERIC"), DataTypes.DECIMAL(38, 9)), + field("col_float32", dbType("FLOAT32"), DataTypes.FLOAT()), + field("col_float64", dbType("FLOAT64"), DataTypes.DOUBLE()), + field("col_string", dbType("STRING(MAX)"), DataTypes.STRING()), + field("col_timestamp", dbType("TIMESTAMP"), DataTypes.TIMESTAMP())); + } + + protected static TableRow createTableAllTypesSink(String name) { + // JSON type inserts are not supported, so it is excluded. 
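+        // GoogleSQL does not implicitly coerce STRING to JSON, so a plain string bound
+        // to a JSON column is rejected; writing JSON would need an explicit JSON
+        // literal or PARSE_JSON, e.g. (sketch):
+        //   INSERT INTO t (id, col_json) VALUES ('a', JSON '{"key": "value"}')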
+        return spannerTableRow(
+                name,
+                pkField("id", dbType("STRING(36) NOT NULL"), DataTypes.STRING().notNull()),
+                field("col_bool", dbType("BOOL"), DataTypes.BOOLEAN()),
+                field("col_bytes", dbType("BYTES(MAX)"), DataTypes.BYTES()),
+                field("col_date", dbType("DATE"), DataTypes.DATE()),
+                field("col_int64", dbType("INT64"), DataTypes.BIGINT()),
+                field("col_numeric", dbType("NUMERIC"), DataTypes.DECIMAL(38, 9)),
+                field("col_float32", dbType("FLOAT32"), DataTypes.FLOAT()),
+                field("col_float64", dbType("FLOAT64"), DataTypes.DOUBLE()),
+                field("col_string", dbType("STRING(MAX)"), DataTypes.STRING()),
+                field("col_timestamp", dbType("TIMESTAMP"), DataTypes.TIMESTAMP()));
+    }
+
+    protected static TableRow createTableArrayTypes(String name) {
+        return spannerTableRow(
+                name,
+                pkField("id", dbType("STRING(36) NOT NULL"), DataTypes.STRING().notNull()),
+                field("col_arr_bool", dbType("ARRAY<BOOL>"), DataTypes.ARRAY(DataTypes.BOOLEAN())),
+                field(
+                        "col_arr_bytes",
+                        dbType("ARRAY<BYTES(MAX)>"),
+                        DataTypes.ARRAY(DataTypes.BYTES())),
+                field("col_arr_date", dbType("ARRAY<DATE>"), DataTypes.ARRAY(DataTypes.DATE())),
+                field("col_arr_int64", dbType("ARRAY<INT64>"), DataTypes.ARRAY(DataTypes.BIGINT())),
+                field(
+                        "col_arr_numeric",
+                        dbType("ARRAY<NUMERIC>"),
+                        DataTypes.ARRAY(DataTypes.DECIMAL(38, 9))),
+                // INVALID_ARGUMENT: Value has type ARRAY<FLOAT64> which cannot be inserted into
+                // column col_arr_float32, which has type ARRAY<FLOAT32>
+                // field(
+                //         "col_arr_float32",
+                //         dbType("ARRAY<FLOAT32>"),
+                //         DataTypes.ARRAY(DataTypes.FLOAT())),
+                field(
+                        "col_arr_float64",
+                        dbType("ARRAY<FLOAT64>"),
+                        DataTypes.ARRAY(DataTypes.DOUBLE())),
+                field(
+                        "col_arr_string",
+                        dbType("ARRAY<STRING(MAX)>"),
+                        DataTypes.ARRAY(DataTypes.STRING())),
+                field(
+                        "col_arr_timestamp",
+                        dbType("ARRAY<TIMESTAMP>"),
+                        DataTypes.ARRAY(DataTypes.TIMESTAMP())));
+    }
+
+    protected static TableRow createTableWithSchema(String name) {
+        return spannerTableRow(
+                TEST_SCHEMA + "."
+ name, + pkField("id", dbType("STRING(36) NOT NULL"), DataTypes.STRING().notNull())); + } + + protected static TableRow createTableGroupedBy(String name) { + return spannerTableRow( + name, + pkField("id", dbType("STRING(36) NOT NULL"), DataTypes.STRING().notNull()), + field("col_int64", dbType("INT64"), DataTypes.BIGINT())); + } + + @BeforeAll + static void beforeAll() throws SQLException { + String jdbcUrl = SpannerDatabase.getMetadata().getJdbcUrl(); + baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/")); + metadata = SpannerDatabase.getMetadata(); + + Properties props = + JdbcConnectionOptions.getBriefAuthProperties( + metadata.getUsername(), metadata.getPassword()); + props.put("autoConfigEmulator", "true"); + catalog = + new SpannerCatalog( + Thread.currentThread().getContextClassLoader(), + TEST_CATALOG_NAME, + metadata.getDatabaseName(), + baseUrl, + props); + + executeQuery(TABLE_SIMPLE.getCreateQuery()); + executeQuery(TABLE_ALL_TYPES.getCreateQuery()); + executeQuery(TABLE_ALL_TYPES_SINK.getCreateQuery()); + executeQuery(TABLE_ARRAY_TYPES.getCreateQuery()); + executeQuery(TABLE_GROUPED_BY_SINK.getCreateQuery()); + executeQuery("CREATE SCHEMA IF NOT EXISTS " + TEST_SCHEMA); + executeQuery(TABLE_SIMPLE_WITH_SCHEMA.getCreateQuery()); + try (Connection conn = SpannerDatabase.getMetadata().getConnection()) { + TABLE_SIMPLE.insertIntoTableValues(conn, "'12345-abcde'"); + TABLE_SIMPLE_WITH_SCHEMA.insertIntoTableValues(conn, "'98765-fghij'"); + TABLE_ALL_TYPES.insertIntoTableValues( + conn, + "'a', true, B'abc', DATE'2025-01-01', JSON'{\"key\": \"value1\"}', 1, 3.14, 1.1, -1.1, 'foo', TIMESTAMP'2025-01-01 00:00:00Z'", + "'b', false, B'123', DATE'2025-12-31', JSON'{\"key\": \"value2\"}', -1, -3.14, -1.1, 1.1, 'foo', TIMESTAMP'2025-12-31 00:00:00Z'"); + TABLE_ARRAY_TYPES.insertIntoTableValues( + conn, + "'a', [true, false], [B'abc', B'123'], [DATE'2025-01-01', DATE'2025-12-31'], " + + "[1, -1], [NUMERIC'3.14', NUMERIC'-3.14'], [1.1, -1.1], ['foo', 'bar'], " + + "[TIMESTAMP'2025-01-01 00:00:00Z', TIMESTAMP'2025-12-31 00:00:00Z']"); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @AfterAll + static void afterAll() throws SQLException { + executeQuery("DROP TABLE IF EXISTS " + TABLE_SIMPLE.getTableName()); + executeQuery("DROP TABLE IF EXISTS " + TABLE_ALL_TYPES.getTableName()); + executeQuery("DROP TABLE IF EXISTS " + TABLE_ALL_TYPES_SINK.getTableName()); + executeQuery("DROP TABLE IF EXISTS " + TABLE_ARRAY_TYPES.getTableName()); + executeQuery("DROP TABLE IF EXISTS " + TABLE_GROUPED_BY_SINK.getTableName()); + executeQuery("DROP TABLE IF EXISTS " + TABLE_SIMPLE_WITH_SCHEMA.getTableName()); + executeQuery("DROP SCHEMA IF EXISTS " + TEST_SCHEMA); + } + + @BeforeEach + void beforeEach() { + tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + + // Use Spanner catalog. 
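+        // The SQL-equivalent registration would be roughly (sketch; endpoint values
+        // are hypothetical, option keys per JdbcCatalogFactoryOptions):
+        //   CREATE CATALOG spanner_catalog WITH (
+        //     'type' = 'jdbc',
+        //     'default-database' = 'test-database',
+        //     'username' = '',
+        //     'password' = '',
+        //     'base-url' = 'jdbc:cloudspanner://localhost:9010/projects/test-project/instances/test-instance'
+        //   );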
+ tEnv.registerCatalog(TEST_CATALOG_NAME, catalog); + tEnv.useCatalog(TEST_CATALOG_NAME); + } + + protected static void executeQuery(String query) throws SQLException { + try (Connection conn = SpannerDatabase.getMetadata().getConnection(); + Statement statement = conn.createStatement()) { + statement.execute(query); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerTablePathTest.java b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerTablePathTest.java new file mode 100644 index 00000000..ce1d7484 --- /dev/null +++ b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/catalog/SpannerTablePathTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.spanner.database.catalog; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link SpannerTablePath}. */ +class SpannerTablePathTest { + @Test + void testToFlinkTableName() { + assertThat(SpannerTablePath.toFlinkTableName("test_schema", "test_table")) + .isEqualTo("test_schema.test_table"); + assertThat(SpannerTablePath.toFlinkTableName("", "test_table")).isEqualTo("test_table"); + assertThat(SpannerTablePath.toFlinkTableName(null, "test_table")).isEqualTo("test_table"); + } + + @Test + void testFromFlinkTableName() { + assertThat(SpannerTablePath.fromFlinkTableName("test_schema.test_table")) + .isEqualTo(new SpannerTablePath("test_schema", "test_table")); + assertThat(SpannerTablePath.fromFlinkTableName("test_table")) + .isEqualTo(new SpannerTablePath("", "test_table")); + assertThatThrownBy( + () -> SpannerTablePath.fromFlinkTableName("test_db.test_schema.test_table")) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Table name 'test_db.test_schema.test_table' is not valid. The parsed length is 3"); + assertThatThrownBy(() -> SpannerTablePath.fromFlinkTableName("")) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage("Table name is not valid. 
Null or empty is not allowed"); + } +} diff --git a/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/dialect/SpannerDialectTest.java b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/dialect/SpannerDialectTest.java new file mode 100644 index 00000000..73684e0a --- /dev/null +++ b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/database/dialect/SpannerDialectTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.spanner.database.dialect; + +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectTest; +import org.apache.flink.connector.jdbc.spanner.SpannerTestBase; + +import java.util.Arrays; +import java.util.List; + +/** The Spanner params for {@link JdbcDialectTest}. */ +class SpannerDialectTest extends JdbcDialectTest implements SpannerTestBase { + + @Override + protected List testData() { + return Arrays.asList( + createTestItem("CHAR"), + createTestItem("VARCHAR"), + createTestItem("BOOLEAN"), + createTestItem("TINYINT"), + createTestItem("SMALLINT"), + createTestItem("INTEGER"), + createTestItem("BIGINT"), + createTestItem("FLOAT"), + createTestItem("DOUBLE"), + createTestItem("DECIMAL(38, 9)"), + createTestItem("DATE"), + createTestItem("TIMESTAMP(3)"), + createTestItem("TIMESTAMP WITHOUT TIME ZONE"), + createTestItem("TIMESTAMP(9) WITHOUT TIME ZONE"), + createTestItem("VARBINARY"), + createTestItem("ARRAY"), + + // Not valid data + createTestItem("TIME", "The Spanner dialect doesn't support type: TIME(0)."), + createTestItem("BINARY", "The Spanner dialect doesn't support type: BINARY(1)."), + createTestItem( + "VARBINARY(10)", + "The Spanner dialect doesn't support type: VARBINARY(10)."), + createTestItem("TIMESTAMP_LTZ(3)", "Unsupported type:TIMESTAMP_LTZ(3)")); + } +} diff --git a/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/table/SpannerDynamicTableSinkITCase.java b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/table/SpannerDynamicTableSinkITCase.java new file mode 100644 index 00000000..174852b4 --- /dev/null +++ b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/table/SpannerDynamicTableSinkITCase.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.spanner.table; + +import org.apache.flink.connector.jdbc.core.table.sink.JdbcDynamicTableSinkITCase; +import org.apache.flink.connector.jdbc.spanner.SpannerTestBase; +import org.apache.flink.connector.jdbc.spanner.database.dialect.SpannerDialect; +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.connector.jdbc.spanner.testutils.tables.SpannerTableRow.spannerTableRow; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField; +import static org.assertj.core.api.Assertions.assertThat; + +/** The Table Sink ITCase for {@link SpannerDialect}. */ +class SpannerDynamicTableSinkITCase extends JdbcDynamicTableSinkITCase implements SpannerTestBase { + + @Override + protected TableRow createUpsertOutputTable() { + return spannerTableRow( + "dynamicSinkForUpsert", + pkField("cnt", dbType("INT64 NOT NULL"), DataTypes.BIGINT().notNull()), + field("lencnt", dbType("INT64 NOT NULL"), DataTypes.BIGINT().notNull()), + pkField("cTag", dbType("INT64 NOT NULL"), DataTypes.INT().notNull()), + field("ts", dbType("TIMESTAMP"), DataTypes.TIMESTAMP())); + } + + @Override + protected TableRow createAppendOutputTable() { + return spannerTableRow( + "dynamicSinkForAppend", + // Spanner requires primary keys. + pkField("id", dbType("INT64 NOT NULL"), DataTypes.INT().notNull()), + field("num", dbType("INT64 NOT NULL"), DataTypes.BIGINT().notNull()), + field("ts", dbType("TIMESTAMP"), DataTypes.TIMESTAMP())); + } + + protected TableRow createBatchOutputTable() { + return spannerTableRow( + "dynamicSinkForBatch", + // Spanner requires primary keys. + pkField("UUID", dbType("STRING(36) NOT NULL"), DataTypes.STRING().notNull()), + field("NAME", dbType("STRING(20)"), DataTypes.STRING()), + field("SCORE", dbType("INT64 NOT NULL"), DataTypes.BIGINT().notNull())); + } + + protected TableRow createRealOutputTable() { + return spannerTableRow( + "REAL_TABLE", + // Spanner requires primary keys. 
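+                // GoogleSQL has no REAL type; FLOAT32 (Flink FLOAT) stands in for the
+                // base test's REAL column.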
+ pkField("uuid", dbType("STRING(36) NOT NULL"), DataTypes.STRING().notNull()), + field("real_data", dbType("FLOAT32"), DataTypes.FLOAT())); + } + + protected TableRow createCheckpointOutputTable() { + return spannerTableRow( + "checkpointTable", + pkField("id", dbType("INT64 NOT NULL"), DataTypes.BIGINT().notNull())); + } + + protected TableRow createUserOutputTable() { + return spannerTableRow( + "USER_TABLE", + pkField("user_id", dbType("STRING(20) NOT NULL"), DataTypes.VARCHAR(20).notNull()), + field("user_name", dbType("STRING(20) NOT NULL"), DataTypes.VARCHAR(20).notNull()), + field("email", dbType("STRING(255)"), DataTypes.VARCHAR(255)), + field("balance", dbType("NUMERIC"), DataTypes.DECIMAL(18, 2)), + field("balance2", dbType("NUMERIC"), DataTypes.DECIMAL(18, 2))); + } + + @Override + protected List testUserData() { + // NUMERIC in GoogleSQL is a fixed precision numeric type (precision=38 and scale=9) + // and cannot be used to store arbitrary precision numeric data. + // https://cloud.google.com/spanner/docs/storing-numeric-data + return Arrays.asList( + Row.of( + "user1", + "Tom", + "tom123@gmail.com", + new BigDecimal("8.1"), + new BigDecimal("16.2")), + Row.of( + "user3", + "Bailey", + "bailey@qq.com", + new BigDecimal("9.99"), + new BigDecimal("19.98")), + Row.of( + "user4", + "Tina", + "tina@gmail.com", + new BigDecimal("11.3"), + new BigDecimal("22.6"))); + } + + @Override + protected List testData() { + // This is probably due to the implementation of the JDBC driver (it uses the Calendar class + // internally), + // but when using 1970-01-01, there is a possibility of a 1-second error occurring + // if the time zone of the execution environment is not UTC. + return Arrays.asList( + Row.of(1, 1L, "Hi", Timestamp.valueOf("1970-01-02 00:00:00.001")), + Row.of(2, 2L, "Hello", Timestamp.valueOf("1970-01-02 00:00:00.002")), + Row.of(3, 2L, "Hello world", Timestamp.valueOf("1970-01-02 00:00:00.003")), + Row.of( + 4, + 3L, + "Hello world, how are you?", + Timestamp.valueOf("1970-01-02 00:00:00.004")), + Row.of(5, 3L, "I am fine.", Timestamp.valueOf("1970-01-02 00:00:00.005")), + Row.of(6, 3L, "Luke Skywalker", Timestamp.valueOf("1970-01-02 00:00:00.006")), + Row.of(7, 4L, "Comment#1", Timestamp.valueOf("1970-01-02 00:00:00.007")), + Row.of(8, 4L, "Comment#2", Timestamp.valueOf("1970-01-02 00:00:00.008")), + Row.of(9, 4L, "Comment#3", Timestamp.valueOf("1970-01-02 00:00:00.009")), + Row.of(10, 4L, "Comment#4", Timestamp.valueOf("1970-01-02 00:00:00.010")), + Row.of(11, 5L, "Comment#5", Timestamp.valueOf("1970-01-02 00:00:00.011")), + Row.of(12, 5L, "Comment#6", Timestamp.valueOf("1970-01-02 00:00:00.012")), + Row.of(13, 5L, "Comment#7", Timestamp.valueOf("1970-01-02 00:00:00.013")), + Row.of(14, 5L, "Comment#8", Timestamp.valueOf("1970-01-02 00:00:00.014")), + Row.of(15, 5L, "Comment#9", Timestamp.valueOf("1970-01-02 00:00:00.015")), + Row.of(16, 6L, "Comment#10", Timestamp.valueOf("1970-01-02 00:00:00.016")), + Row.of(17, 6L, "Comment#11", Timestamp.valueOf("1970-01-02 00:00:00.017")), + Row.of(18, 6L, "Comment#12", Timestamp.valueOf("1970-01-02 00:00:00.018")), + Row.of(19, 6L, "Comment#13", Timestamp.valueOf("1970-01-02 00:00:00.019")), + Row.of(20, 6L, "Comment#14", Timestamp.valueOf("1970-01-02 00:00:00.020")), + Row.of(21, 6L, "Comment#15", Timestamp.valueOf("1970-01-02 00:00:00.021"))); + } + + @Test + @Override + protected void testReal() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + 
env.getConfig().enableObjectReuse(); + StreamTableEnvironment tEnv = + StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode()); + + String tableName = "realSink"; + tEnv.executeSql(realOutputTable.getCreateQueryForFlink(getMetadata(), tableName)); + + tEnv.executeSql(String.format("INSERT INTO %s SELECT 'a', CAST(1.0 as FLOAT)", tableName)) + .await(); + + assertThat(realOutputTable.selectAllTable(getMetadata())) + .containsExactly(Row.of("a", 1.0f)); + } + + @Test + @Override + protected void testBatchSink() throws Exception { + TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); + + String tableName = "batchSink"; + tEnv.executeSql( + batchOutputTable.getCreateQueryForFlink( + getMetadata(), + tableName, + Arrays.asList( + "'sink.buffer-flush.max-rows' = '2'", + "'sink.buffer-flush.interval' = '300ms'", + "'sink.max-retries' = '4'"))); + + TableResult tableResult = + tEnv.executeSql( + String.format( + "INSERT INTO %s " + + " SELECT uuid, user_name, score " + + " FROM (VALUES " + + "('a', 1, 'Bob'), " + + "('b', 22, 'Tom'), " + + "('c', 42, 'Kim'), " + + "('d', 42, 'Kim'), " + + "('e', 1, 'Bob')) " + + " AS UserCountTable(uuid, score, user_name) ", + tableName)); + tableResult.await(); + + assertThat(batchOutputTable.selectAllTable(getMetadata())) + .containsExactlyInAnyOrder( + Row.of("a", "Bob", 1L), + Row.of("b", "Tom", 22L), + Row.of("c", "Kim", 42L), + Row.of("d", "Kim", 42L), + Row.of("e", "Bob", 1L)); + } +} diff --git a/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/table/SpannerDynamicTableSourceITCase.java b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/table/SpannerDynamicTableSourceITCase.java new file mode 100644 index 00000000..7ce0bd64 --- /dev/null +++ b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/table/SpannerDynamicTableSourceITCase.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.spanner.table; + +import org.apache.flink.connector.jdbc.core.table.source.JdbcDynamicTableSourceITCase; +import org.apache.flink.connector.jdbc.spanner.SpannerTestBase; +import org.apache.flink.connector.jdbc.spanner.database.dialect.SpannerDialect; +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.types.Row; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.connector.jdbc.spanner.testutils.tables.SpannerTableRow.spannerTableRow; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField; + +/** The Table Source ITCase for {@link SpannerDialect}. */ +class SpannerDynamicTableSourceITCase extends JdbcDynamicTableSourceITCase + implements SpannerTestBase { + + @Override + protected TableRow createInputTable() { + return spannerTableRow( + "jdbDynamicTableSource", + pkField("id", dbType("INT64 NOT NULL"), DataTypes.BIGINT().notNull()), + field("decimal_col", dbType("NUMERIC"), DataTypes.DECIMAL(10, 4)), + field("timestamp6_col", dbType("TIMESTAMP"), DataTypes.TIMESTAMP(6)), + // other fields + field("float_col", dbType("FLOAT32"), DataTypes.FLOAT()), + field("double_col", dbType("FLOAT64"), DataTypes.DOUBLE()), + field("timestamp9_col", dbType("TIMESTAMP"), DataTypes.TIMESTAMP(9))); + } + + @Override + protected List getTestData() { + return Arrays.asList( + Row.of( + 1L, + BigDecimal.valueOf(100.1234), + LocalDateTime.parse("2020-01-01T15:35:00.123456"), + 1.175E-37F, + 1.79769E308D, + LocalDateTime.parse("2020-01-01T15:35:00.123456789")), + Row.of( + 2L, + BigDecimal.valueOf(101.1234), + LocalDateTime.parse("2020-01-01T15:36:01.123456"), + -1.175E-37F, + -1.79769E308, + LocalDateTime.parse("2020-01-01T15:36:01.123456789"))); + } +} diff --git a/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/SpannerDatabase.java b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/SpannerDatabase.java new file mode 100644 index 00000000..3b2c8e61 --- /dev/null +++ b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/SpannerDatabase.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.connector.jdbc.spanner.testutils;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+/** A Spanner database for testing. */
+public class SpannerDatabase extends DatabaseExtension implements SpannerImages {
+
+    private static final SpannerEmulatorJdbcContainer CONTAINER =
+            new SpannerEmulatorJdbcContainer(SPANNER_EMULATOR_1_5);
+
+    private static SpannerMetadata metadata;
+
+    public static SpannerMetadata getMetadata() {
+        if (!CONTAINER.isRunning()) {
+            throw new FlinkRuntimeException("Container is stopped.");
+        }
+        if (metadata == null) {
+            metadata = new SpannerMetadata(CONTAINER);
+        }
+        return metadata;
+    }
+
+    @Override
+    protected DatabaseMetadata getMetadataDB() {
+        return getMetadata();
+    }
+
+    @Override
+    protected DatabaseResource getResource() {
+        return new DockerResource(CONTAINER);
+    }
+
+    public static class SpannerEmulatorJdbcContainer
+            extends JdbcDatabaseContainer<SpannerEmulatorJdbcContainer> {
+
+        private static final int GRPC_PORT = 9010;
+        private static final int HTTP_PORT = 9020;
+        private static final DockerImageName DEFAULT_IMAGE_NAME =
+                DockerImageName.parse("gcr.io/cloud-spanner-emulator/emulator");
+
+        private static final String DEFAULT_PROJECT = "test-project";
+        private static final String DEFAULT_INSTANCE = "test-instance";
+        private static final String DEFAULT_DATABASE = "test-database";
+
+        private String project;
+        private String instance;
+        private String database;
+
+        public SpannerEmulatorJdbcContainer(String image) {
+            this(DockerImageName.parse(image));
+        }
+
+        public SpannerEmulatorJdbcContainer(final DockerImageName dockerImageName) {
+            super(dockerImageName);
+            this.project = DEFAULT_PROJECT;
+            this.instance = DEFAULT_INSTANCE;
+            this.database = DEFAULT_DATABASE;
+            dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
+            addExposedPorts(GRPC_PORT, HTTP_PORT);
+            setWaitStrategy(Wait.forLogMessage(".*Cloud Spanner emulator running\\..*", 1));
+        }
+
+        public String getEmulatorGrpcEndpoint() {
+            return getHost() + ":" + getMappedPort(GRPC_PORT);
+        }
+
+        public String getEmulatorHttpEndpoint() {
+            return getHost() + ":" + getMappedPort(HTTP_PORT);
+        }
+
+        @Override
+        public String getDriverClassName() {
+            return "com.google.cloud.spanner.jdbc.JdbcDriver";
+        }
+
+        @Override
+        public String getJdbcUrl() {
+            return "jdbc:cloudspanner://"
+                    + getEmulatorGrpcEndpoint()
+                    + "/projects/"
+                    + getProject()
+                    + "/instances/"
+                    + getInstance()
+                    + "/databases/"
+                    + getDatabaseName()
+                    + ";autoConfigEmulator=true";
+        }
+
+        public String getProject() {
+            return project;
+        }
+
+        public SpannerEmulatorJdbcContainer withProject(String project) {
+            this.project = project;
+            return self();
+        }
+
+        public String getInstance() {
+            return instance;
+        }
+
+        public SpannerEmulatorJdbcContainer withInstance(String instance) {
+            this.instance = instance;
+            return self();
+        }
+
+        @Override
+        public String getDatabaseName() {
+            return database;
+        }
+
+        @Override
+        public SpannerEmulatorJdbcContainer withDatabaseName(String dbName) {
+            this.database = dbName;
+            return self();
+        }
+
+        @Override
+        public String
getUsername() { + // Spanner does not support password authentication. + return ""; + } + + @Override + public String getPassword() { + // Spanner does not support password authentication. + return ""; + } + + @Override + protected String getTestQueryString() { + return "SELECT 1"; + } + } +} diff --git a/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/SpannerImages.java b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/SpannerImages.java new file mode 100644 index 00000000..e8b5c5ae --- /dev/null +++ b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/SpannerImages.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.spanner.testutils; + +/** Spanner docker images. */ +public interface SpannerImages { + String SPANNER_EMULATOR_1_5 = "gcr.io/cloud-spanner-emulator/emulator:1.5.28"; +} diff --git a/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/SpannerMetadata.java b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/SpannerMetadata.java new file mode 100644 index 00000000..2c33bf16 --- /dev/null +++ b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/SpannerMetadata.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.spanner.testutils; + +import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata; + +import org.testcontainers.containers.JdbcDatabaseContainer; + +import javax.sql.XADataSource; + +/** Spanner Metadata. 
*/ +public class SpannerMetadata implements DatabaseMetadata { + + private final String username; + private final String password; + private final String url; + private final String driver; + private final String version; + private final String database; + + public SpannerMetadata(JdbcDatabaseContainer container) { + this.username = container.getUsername(); + this.password = container.getPassword(); + this.url = container.getJdbcUrl(); + this.driver = container.getDriverClassName(); + this.version = container.getDockerImageName(); + this.database = container.getDatabaseName(); + } + + @Override + public String getJdbcUrl() { + return this.url; + } + + @Override + public String getJdbcUrlWithCredentials() { + // Spanner does not support password authentication. + return getJdbcUrl(); + } + + @Override + public String getUsername() { + return this.username; + } + + @Override + public String getPassword() { + return this.password; + } + + @Override + public XADataSource buildXaDataSource() { + throw new UnsupportedOperationException(); + } + + @Override + public String getDriverClass() { + return this.driver; + } + + @Override + public String getVersion() { + return version; + } + + public String getDatabaseName() { + return database; + } +} diff --git a/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/tables/SpannerTableRow.java b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/tables/SpannerTableRow.java new file mode 100644 index 00000000..79b7f85b --- /dev/null +++ b/flink-connector-jdbc-spanner/src/test/java/org/apache/flink/connector/jdbc/spanner/testutils/tables/SpannerTableRow.java @@ -0,0 +1,41 @@ +package org.apache.flink.connector.jdbc.spanner.testutils.tables; + +import org.apache.flink.connector.jdbc.testutils.tables.TableField; +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; + +import java.util.stream.Collectors; + +public class SpannerTableRow extends TableRow { + + public static SpannerTableRow spannerTableRow(String name, TableField... fields) { + return new SpannerTableRow(name, fields); + } + + protected SpannerTableRow(String name, TableField[] fields) { + super(name, fields); + } + + @Override + public String getCreateQuery() { + String pkFields = + getStreamFields() + .filter(TableField::isPkField) + .map(TableField::getName) + .collect(Collectors.joining(", ")); + return String.format( + "CREATE TABLE IF NOT EXISTS %s (%s) %s", + name, + getStreamFields().map(TableField::asString).collect(Collectors.joining(", ")), + pkFields.isEmpty() ? "" : String.format("PRIMARY KEY (%s)", pkFields)); + } + + @Override + protected String getDeleteFromQuery() { + return String.format("DELETE FROM %s WHERE true", name); + } + + @Override + public String getDropTableQuery() { + return String.format("DROP TABLE IF EXISTS %s", name); + } +} diff --git a/flink-connector-jdbc-spanner/src/test/resources/log4j2-test.properties b/flink-connector-jdbc-spanner/src/test/resources/log4j2-test.properties new file mode 100644 index 00000000..835c2ec9 --- /dev/null +++ b/flink-connector-jdbc-spanner/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/pom.xml b/pom.xml index f1d63f66..680211b6 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ under the License. flink-connector-jdbc-oceanbase flink-connector-jdbc-oracle flink-connector-jdbc-postgres + flink-connector-jdbc-spanner flink-connector-jdbc-sqlserver flink-connector-jdbc-trino @@ -452,4 +453,4 @@ under the License. -
\ No newline at end of file +