Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ public <R> R run(Action<R, C, E> action) throws E, InterruptedException {
client = ensureActiveClient(client);
return action.run(client);
} finally {
clients.addFirst(client);
if (this.clients != null) {
clients.addFirst(client);
} else {
close(client);
}
}
}
}
Expand All @@ -93,6 +97,10 @@ public void execute(ExecuteAction<C, E> action) throws E, InterruptedException {

protected abstract void close(C client);

public boolean isClosed() {
return this.clients == null;
}

@Override
public void close() {
LinkedBlockingDeque<C> clients = this.clients;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private void lock(String lockUniqueName) throws SQLException, InterruptedExcepti

@Override
public void close() throws IOException {
// Do nothing
connections.close();
}

public static long checkMaxSleep(Map<String, String> conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public Options options() {
}

public JdbcClientPool connections() {
if (connections == null) {
if (connections == null || connections.isClosed()) {
connections =
new JdbcClientPool(
options.get(CatalogOptions.CLIENT_POOL_SIZE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@

package org.apache.paimon.jdbc;

import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import org.junit.jupiter.api.Test;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link JdbcClientPool} connection validation. */
public class JdbcClientPoolTest {
Expand Down Expand Up @@ -149,4 +157,89 @@ public void testActionIsExecutedOnValidConnection() throws SQLException, Interru
pool.close();
}
}

@Test
public void testIsClosedReportsCorrectly() {
JdbcClientPool pool = createPool(1);
assertThat(pool.isClosed()).isFalse();
pool.close();
assertThat(pool.isClosed()).isTrue();
}

@Test
public void testConnectionClosedWhenPoolClosedDuringAction() throws Exception {
JdbcClientPool pool = createPool(1);
AtomicReference<Connection> connRef = new AtomicReference<>();
CountDownLatch actionStarted = new CountDownLatch(1);
CountDownLatch poolClosed = new CountDownLatch(1);

ExecutorService executor = Executors.newSingleThreadExecutor();
try {
Future<?> future =
executor.submit(
() -> {
try {
pool.run(
connection -> {
connRef.set(connection);
actionStarted.countDown();
try {
poolClosed.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
});
} catch (Exception e) {
// expected
}
});

actionStarted.await();
pool.close();
poolClosed.countDown();
future.get();

assertThat(connRef.get().isClosed()).isTrue();
} finally {
executor.shutdown();
}
}

@Test
public void testPoolThrowsAfterClose() {
JdbcClientPool pool = createPool(1);
pool.close();

assertThatThrownBy(
() ->
pool.run(
connection -> {
return null;
}))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("closed pool");
}

@Test
public void testLockContextRecreatesPoolAfterClose() {
String dbUrl =
"jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "");
Options options = new Options();
options.set(CatalogOptions.CLIENT_POOL_SIZE, 1);
options.set(CatalogOptions.URI.key(), dbUrl);

JdbcCatalogLockContext context = new JdbcCatalogLockContext("test-catalog", options);

JdbcClientPool firstPool = context.connections();
assertThat(firstPool.isClosed()).isFalse();

firstPool.close();
assertThat(firstPool.isClosed()).isTrue();

JdbcClientPool secondPool = context.connections();
assertThat(secondPool).isNotSameAs(firstPool);
assertThat(secondPool.isClosed()).isFalse();
secondPool.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ public void open(Configuration conf) throws Exception {
this.targetCatalog = createPaimonCatalog(Options.fromMap(targetCatalogConfig));
}

@Override
public void close() throws Exception {
super.close();
if (sourceCatalog != null) {
sourceCatalog.close();
sourceCatalog = null;
}
if (targetCatalog != null) {
targetCatalog.close();
targetCatalog = null;
}
}

@Override
public void processElement(
CloneSplitInfo cloneSplitInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ public void open(Configuration conf) throws Exception {
this.targetCatalog = createPaimonCatalog(Options.fromMap(targetCatalogConfig));
}

@Override
public void close() throws Exception {
super.close();
if (sourceCatalog != null) {
sourceCatalog.close();
sourceCatalog = null;
}
if (targetCatalog != null) {
targetCatalog.close();
targetCatalog = null;
}
}

@Override
public void processElement(
CloneSchemaInfo cloneSchemaInfo,
Expand Down