diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml index c4d8f0283bc6..33aadf4da3da 100644 --- a/paimon-core/pom.xml +++ b/paimon-core/pom.xml @@ -233,6 +233,20 @@ under the License. test + + org.testcontainers + postgresql + ${testcontainers.version} + test + + + + org.postgresql + postgresql + 42.7.3 + + + diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java index 698092e05a76..e0c167a4df05 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java @@ -26,6 +26,8 @@ static JdbcDistributedLockDialect create(String protocol) { return new SqlLiteDistributedLockDialect(); case MYSQL: return new MysqlDistributedLockDialect(); + case POSTGRESQL: + return new PostgresqlDistributedLockDialect(); default: throw new UnsupportedOperationException( String.format("Distributed locks based on %s are not supported", protocol)); @@ -36,6 +38,7 @@ static JdbcDistributedLockDialect create(String protocol) { enum JdbcProtocol { SQLITE, MARIADB, - MYSQL + MYSQL, + POSTGRESQL } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/PostgresqlDistributedLockDialect.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/PostgresqlDistributedLockDialect.java new file mode 100644 index 000000000000..93f5f350ddef --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/PostgresqlDistributedLockDialect.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +/** Distributed lock implementation based on postgres table. */ +public class PostgresqlDistributedLockDialect extends AbstractDistributedLockDialect { + + @Override + public String getCreateTableSql() { + return "CREATE TABLE " + + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + + "(" + + JdbcUtils.LOCK_ID + + " VARCHAR(%s) NOT NULL," + + JdbcUtils.ACQUIRED_AT + + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL," + + JdbcUtils.EXPIRE_TIME + + " BIGINT DEFAULT 0 NOT NULL," + + "PRIMARY KEY (" + + JdbcUtils.LOCK_ID + + ")" + + ")"; + } + + @Override + public String getLockAcquireSql() { + return "INSERT INTO " + + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + + " (" + + JdbcUtils.LOCK_ID + + "," + + JdbcUtils.EXPIRE_TIME + + ") VALUES (?,?)"; + } + + @Override + public String getReleaseLockSql() { + return "DELETE FROM " + + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + + " WHERE " + + JdbcUtils.LOCK_ID + + " = ?"; + } + + @Override + public String getTryReleaseTimedOutLock() { + return "DELETE FROM " + + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + + " WHERE EXTRACT(EPOCH FROM AGE(NOW(), " + + JdbcUtils.ACQUIRED_AT + + ")) >" + + JdbcUtils.EXPIRE_TIME + + " and " + + JdbcUtils.LOCK_ID + + " = ?"; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/PostgresqlCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/PostgresqlCatalogTest.java new file mode 100644 index 000000000000..b6b3e28b8c40 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/PostgresqlCatalogTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +import java.sql.SQLException; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Tests for {@link JdbcCatalog} with Postgres. */ +public class PostgresqlCatalogTest { + private static final Logger LOG = LoggerFactory.getLogger(PostgresqlCatalogTest.class); + + public static final String DEFAULT_DB = "postgres"; + + private static final String USER = "paimonuser"; + private static final String PASSWORD = "paimonpw"; + + @TempDir java.nio.file.Path tempFile; + protected String warehouse; + protected FileIO fileIO; + protected Catalog catalog; + + protected static final PostgreSQLContainer POSTGRES_CONTAINER = + new PostgreSQLContainer<>("postgres:13-alpine") + .withDatabaseName(DEFAULT_DB) + .withUsername(USER) + .withPassword(PASSWORD) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @BeforeAll + protected static void start() { + LOG.info("Starting containers..."); + POSTGRES_CONTAINER.start(); + LOG.info("Containers are started."); + } + + @AfterAll + public static void stopContainers() { + LOG.info("Stopping containers..."); + POSTGRES_CONTAINER.stop(); + LOG.info("Containers are stopped."); + } + + @BeforeEach + public void setUp() throws Exception { + warehouse = tempFile.toUri().toString(); + Options catalogOptions = new Options(); + catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse); + CatalogContext catalogContext = CatalogContext.create(catalogOptions); + fileIO = FileIO.get(new Path(warehouse), catalogContext); + catalog = initCatalog(Maps.newHashMap()); + } + + private JdbcCatalog initCatalog(Map props) { + LOG.info("Init catalog {}", POSTGRES_CONTAINER.getJdbcUrl()); + + Map properties = Maps.newHashMap(); + properties.put(CatalogOptions.URI.key(), POSTGRES_CONTAINER.getJdbcUrl()); + + properties.put(JdbcCatalog.PROPERTY_PREFIX + "user", USER); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", PASSWORD); + properties.put(CatalogOptions.WAREHOUSE.key(), warehouse); + properties.put(CatalogOptions.LOCK_ENABLED.key(), "true"); + properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc"); + properties.putAll(props); + JdbcCatalog catalog = + new JdbcCatalog( + fileIO, + "test-jdbc-postgres-catalog", + Options.fromMap(properties), + warehouse); + assertThat(catalog.warehouse()).isEqualTo(warehouse); + return catalog; + } + + @Test + public void testAcquireLockFail() throws SQLException, InterruptedException { + String lockId = "jdbc.testDb.testTable"; + assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 3000)) + .isTrue(); + assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 3000)) + .isFalse(); + JdbcUtils.release(((JdbcCatalog) catalog).getConnections(), lockId); + } + + @Test + public void testCleanTimeoutLockAndAcquireLock() throws SQLException, InterruptedException { + String lockId = "jdbc.testDb.testTable"; + assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 1000)) + .isTrue(); + Thread.sleep(2000); + assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 1000)) + .isTrue(); + JdbcUtils.release(((JdbcCatalog) catalog).getConnections(), lockId); + } +}