diff --git a/it/common/src/main/java/org/apache/beam/it/common/PipelineOperator.java b/it/common/src/main/java/org/apache/beam/it/common/PipelineOperator.java index cbb2e03cee76..df1028b373e5 100644 --- a/it/common/src/main/java/org/apache/beam/it/common/PipelineOperator.java +++ b/it/common/src/main/java/org/apache/beam/it/common/PipelineOperator.java @@ -202,7 +202,7 @@ private static Result finishOrTimeout( LOG.warn("Error happened when checking for condition", e); } - LOG.info("Condition was not met yet. Checking if job is finished."); + LOG.debug("Condition was not met yet. Checking if job is finished."); if (launchFinished) { LOG.info("Launch was finished, stop checking."); return Result.LAUNCH_FINISHED; @@ -212,11 +212,15 @@ private static Result finishOrTimeout( LOG.info("Detected that launch was finished, checking conditions once more."); launchFinished = true; } else { - LOG.info( - "Job not finished and conditions not met. Will check again in {} seconds (total wait: {}s of max {}s)", - config.checkAfter().getSeconds(), - Duration.between(start, Instant.now()).getSeconds(), - config.timeoutAfter().getSeconds()); + long checkSec = config.checkAfter().getSeconds(); + long waitSec = Duration.between(start, Instant.now()).getSeconds(); + if (checkSec > 0 && (waitSec / checkSec) % 5 == 0) { // reduce log spam + LOG.info( + "Job not finished and conditions not met. Will check again in {} seconds (total wait: {}s of max {}s)", + checkSec, + waitSec, + config.timeoutAfter().getSeconds()); + } } try { Thread.sleep(config.checkAfter().toMillis()); diff --git a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java index 7813c05699c6..a8ee4053fc54 100644 --- a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java +++ b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/Neo4jResourceManager.java @@ -21,9 +21,11 @@ import static org.apache.beam.it.neo4j.Neo4jResourceManagerUtils.generateDatabaseName; import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.beam.it.common.ResourceManager; import org.apache.beam.it.testcontainers.TestContainerResourceManager; import org.checkerframework.checker.nullness.qual.Nullable; @@ -60,7 +62,8 @@ public class Neo4jResourceManager extends TestContainerResourceManager newDataBases = new ArrayList<>(); private final DatabaseWaitOption waitOption; private final String connectionString; private final boolean usingStaticDatabase; @@ -95,9 +98,8 @@ private Neo4jResourceManager(Builder builder) { this.databaseName = builder.databaseName; this.waitOption = null; } else { - this.databaseName = generateDatabaseName(builder.testId); + this.databaseName = null; this.waitOption = builder.waitOption; - createDatabase(databaseName, waitOption); } } @@ -110,11 +112,12 @@ public synchronized String getUri() { return connectionString; } - public List> run(String query) { - return this.run(query, Collections.emptyMap()); + public List> run(String query, String databaseName) { + return this.run(query, databaseName, Collections.emptyMap()); } - public List> run(String query, Map parameters) { + public List> run( + String query, String databaseName, Map parameters) { try (Session session = neo4jDriver.session(SessionConfig.builder().withDatabase(databaseName).build())) { return session.run(query, parameters).list(record -> record.asMap()); @@ -128,7 +131,7 @@ public List> run(String query, Map parameter * * @return the name of the Neo4j Database. */ - public synchronized String getDatabaseName() { + public synchronized @Nullable String getDatabaseName() { return databaseName; } @@ -140,11 +143,11 @@ public synchronized void cleanupAll() { // First, delete the database if it was not given as a static argument try { - if (!usingStaticDatabase) { - dropDatabase(databaseName, waitOption); + if (!newDataBases.isEmpty()) { + dropTestDatabases(waitOption); } } catch (Exception e) { - LOG.error("Failed to delete Neo4j database {}.", databaseName, e); + LOG.error("Failed to delete Neo4j databases {}.", newDataBases, e); producedError = true; } @@ -167,28 +170,34 @@ public synchronized void cleanupAll() { LOG.info("Neo4j manager successfully cleaned up."); } - private void createDatabase(String databaseName, DatabaseWaitOption waitOption) { + public String createTestDatabase() { + String newDatabaseName = + generateDatabaseName("test" + UUID.randomUUID().toString().substring(0, 4)); try (Session session = neo4jDriver.session(SessionConfig.builder().withDatabase("system").build())) { String query = String.format("CREATE DATABASE $db %s", DatabaseWaitOptions.asCypher(waitOption)); - session.run(query, Collections.singletonMap("db", databaseName)).consume(); + session.run(query, Collections.singletonMap("db", newDatabaseName)).consume(); } catch (Exception e) { throw new Neo4jResourceManagerException( - String.format("Error dropping database %s.", databaseName), e); + String.format("Error dropping database %s.", newDatabaseName), e); } + newDataBases.add(newDatabaseName); + return newDatabaseName; } @VisibleForTesting - void dropDatabase(String databaseName, DatabaseWaitOption waitOption) { + void dropTestDatabases(DatabaseWaitOption waitOption) { try (Session session = neo4jDriver.session(SessionConfig.builder().withDatabase("system").build())) { String query = String.format("DROP DATABASE $db %s", DatabaseWaitOptions.asCypher(waitOption)); - session.run(query, Collections.singletonMap("db", databaseName)).consume(); + for (String databaseName : newDataBases) { + session.run(query, Collections.singletonMap("db", databaseName)).consume(); + } } catch (Exception e) { throw new Neo4jResourceManagerException( - String.format("Error dropping database %s.", databaseName), e); + String.format("Error dropping database %s.", newDataBases), e); } } diff --git a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/conditions/Neo4jQueryCheck.java b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/conditions/Neo4jQueryCheck.java index 80f757aaf1c3..d5f593ab36d0 100644 --- a/it/neo4j/src/main/java/org/apache/beam/it/neo4j/conditions/Neo4jQueryCheck.java +++ b/it/neo4j/src/main/java/org/apache/beam/it/neo4j/conditions/Neo4jQueryCheck.java @@ -34,6 +34,8 @@ public abstract class Neo4jQueryCheck extends ConditionCheck { abstract List> expectedResult(); + abstract String databaseName(); + abstract String query(); abstract @Nullable Map parameters(); @@ -49,9 +51,9 @@ public String getDescription() { protected CheckResult check() { List> actualResult; if (parameters() != null) { - actualResult = resourceManager().run(query(), parameters()); + actualResult = resourceManager().run(query(), databaseName(), parameters()); } else { - actualResult = resourceManager().run(query()); + actualResult = resourceManager().run(query(), databaseName()); } List> expectedResult = expectedResult(); if (actualResult == null) { @@ -80,6 +82,8 @@ public abstract static class Builder { public abstract Builder setResourceManager(Neo4jResourceManager resourceManager); + public abstract Builder setDatabaseName(String databaseName); + public abstract Builder setQuery(String query); public abstract Builder setParameters(Map parameters); diff --git a/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerIT.java b/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerIT.java index 0d3b8050611b..db6a8fa0d4c4 100644 --- a/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerIT.java +++ b/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerIT.java @@ -37,23 +37,27 @@ public class Neo4jResourceManagerIT { private Neo4jResourceManager neo4jResourceManager; + private static final String STATIC_DATABASE_NAME = "neo4j"; @Before public void setUp() { neo4jResourceManager = Neo4jResourceManager.builder("placeholder") - .setDatabaseName("neo4j", DatabaseWaitOptions.waitDatabase()) + .setDatabaseName(STATIC_DATABASE_NAME, DatabaseWaitOptions.waitDatabase()) .setAdminPassword("password") .build(); } @Test public void testResourceManagerE2E() { + neo4jResourceManager.run( - "CREATE (:Hello {whom: $whom})", Collections.singletonMap("whom", "world")); + "CREATE (:Hello {whom: $whom})", + STATIC_DATABASE_NAME, + Collections.singletonMap("whom", "world")); List> results = - neo4jResourceManager.run("MATCH (h:Hello) RETURN h.whom AS whom"); + neo4jResourceManager.run("MATCH (h:Hello) RETURN h.whom AS whom", STATIC_DATABASE_NAME); assertThat(results).hasSize(1); assertThat(results) diff --git a/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerTest.java b/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerTest.java index 64bd06261f3a..49d9c7ec2322 100644 --- a/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerTest.java +++ b/it/neo4j/src/test/java/org/apache/beam/it/neo4j/Neo4jResourceManagerTest.java @@ -96,7 +96,7 @@ public void testDatabaseIsCreatedWithNoWaitOptions() { Neo4jResourceManager.builder(TEST_ID) .setDatabaseName(STATIC_DATABASE_NAME, DatabaseWaitOptions.noWaitDatabase()); new Neo4jResourceManager(neo4jDriver, container, builder); - + String unused = testManager.createTestDatabase(); verify(session).run(and(startsWith("CREATE DATABASE"), endsWith("NOWAIT")), anyMap()); } @@ -107,35 +107,40 @@ public void testGetUriShouldReturnCorrectValue() { @Test public void testGetDatabaseNameShouldReturnCorrectValue() { - assertThat(testManager.getDatabaseName()).matches(TEST_ID + "-\\d{8}-\\d{6}-\\d{6}"); + String databaseName = testManager.createTestDatabase(); + assertThat(databaseName).matches("test[0-9a-f]{4}-\\d{8}-\\d{6}-\\d{6}"); } @Test public void testDropDatabaseShouldThrowErrorIfDriverFailsToRunQuery() { + String unused = testManager.createTestDatabase(); doThrow(ClientException.class).when(session).run(anyString(), anyMap()); assertThrows( Neo4jResourceManagerException.class, - () -> testManager.dropDatabase(STATIC_DATABASE_NAME, DatabaseWaitOptions.noWaitDatabase())); + () -> testManager.dropTestDatabases(DatabaseWaitOptions.noWaitDatabase())); } @Test public void testRunShouldThrowErrorIfDriverFailsToRunParameterlessQuery() { + String databaseName = testManager.createTestDatabase(); doThrow(ClientException.class).when(session).run(anyString(), anyMap()); - assertThrows( - Neo4jResourceManagerException.class, () -> testManager.run("MATCH (n) RETURN n LIMIT 1")); + Neo4jResourceManagerException.class, + () -> testManager.run(databaseName, "MATCH (n) RETURN n LIMIT 1")); } @Test public void testRunShouldThrowErrorIfDriverFailsToRunQuery() { + String databaseName = testManager.createTestDatabase(); doThrow(ClientException.class).when(session).run(anyString(), anyMap()); - assertThrows( Neo4jResourceManagerException.class, () -> testManager.run( - "MATCH (n) WHERE n < $val RETURN n LIMIT 1", Collections.singletonMap("val", 2))); + "MATCH (n) WHERE n < $val RETURN n LIMIT 1", + databaseName, + Collections.singletonMap("val", 2))); } @Test @@ -152,6 +157,7 @@ public void testCleanupAllShouldNotDropStaticDatabase() { @Test public void testCleanupShouldDropNonStaticDatabase() { + String unused = testManager.createTestDatabase(); when(session.run(anyString(), anyMap())).thenReturn(mock(Result.class)); testManager.cleanupAll(); @@ -162,8 +168,8 @@ public void testCleanupShouldDropNonStaticDatabase() { @Test public void testCleanupAllShouldThrowErrorWhenNeo4jDriverFailsToDropDatabase() { + String unused = testManager.createTestDatabase(); doThrow(ClientException.class).when(session).run(anyString(), anyMap()); - assertThrows(Neo4jResourceManagerException.class, () -> testManager.cleanupAll()); }