Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh committed Feb 3, 2025
1 parent d0ada65 commit 5c35522
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 1 deletion.
1 change: 1 addition & 0 deletions data-prepper-plugins/rds-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ dependencies {
testImplementation libs.avro.core
testImplementation libs.parquet.hadoop
testImplementation libs.parquet.avro
// testImplementation 'org.slf4j:slf4j-simple:2.0.9'
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public LogicalReplicationClient(final ConnectionManager connectionManager,

@Override
public void connect() {
LOG.debug("Start connecting logical replication stream. ");
PGReplicationStream stream;
try (Connection conn = connectionManager.getConnection()) {
PGConnection pgConnection = conn.unwrap(PGConnection.class);
Expand All @@ -62,6 +63,7 @@ public void connect() {
logicalStreamBuilder.withStartPosition(startLsn);
}
stream = logicalStreamBuilder.start();
LOG.debug("Logical replication stream started. ");

if (eventProcessor != null) {
while (!disconnectRequested) {
Expand All @@ -88,7 +90,8 @@ public void connect() {
}

stream.close();
LOG.info("Replication stream closed successfully.");
disconnectRequested = false;
LOG.debug("Replication stream closed successfully.");
} catch (Exception e) {
LOG.error("Exception while creating Postgres replication stream. ", e);
}
Expand All @@ -97,6 +100,7 @@ public void connect() {
@Override
public void disconnect() {
disconnectRequested = true;
LOG.debug("Requested to disconnect logical replication stream.");
}

public void setEventProcessor(LogicalReplicationEventProcessor eventProcessor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -87,6 +89,92 @@ void test_connect() throws SQLException, InterruptedException {
verify(stream).setFlushedLSN(lsn);
}

@Test
void test_disconnect() throws SQLException, InterruptedException {
final Connection connection = mock(Connection.class);
final PGConnection pgConnection = mock(PGConnection.class, RETURNS_DEEP_STUBS);
final ChainedLogicalStreamBuilder logicalStreamBuilder = mock(ChainedLogicalStreamBuilder.class);
final PGReplicationStream stream = mock(PGReplicationStream.class);
final ByteBuffer message = ByteBuffer.allocate(0);
final LogSequenceNumber lsn = mock(LogSequenceNumber.class);

when(connectionManager.getConnection()).thenReturn(connection);
when(connection.unwrap(PGConnection.class)).thenReturn(pgConnection);
when(pgConnection.getReplicationAPI().replicationStream().logical()).thenReturn(logicalStreamBuilder);
when(logicalStreamBuilder.withSlotName(anyString())).thenReturn(logicalStreamBuilder);
when(logicalStreamBuilder.withSlotOption(anyString(), anyString())).thenReturn(logicalStreamBuilder);
when(logicalStreamBuilder.start()).thenReturn(stream);
when(stream.readPending()).thenReturn(message).thenReturn(null);
when(stream.getLastReceiveLSN()).thenReturn(lsn);

final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> logicalReplicationClient.connect());

await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> verify(eventProcessor).process(message));
Thread.sleep(20);
verify(stream).setAppliedLSN(lsn);
verify(stream).setFlushedLSN(lsn);

logicalReplicationClient.disconnect();
Thread.sleep(20);
verify(stream).close();
verifyNoMoreInteractions(stream, eventProcessor);

executorService.shutdownNow();
}

@Test
void test_connect_disconnect_cycles() throws SQLException, InterruptedException {
final Connection connection = mock(Connection.class);
final PGConnection pgConnection = mock(PGConnection.class, RETURNS_DEEP_STUBS);
final ChainedLogicalStreamBuilder logicalStreamBuilder = mock(ChainedLogicalStreamBuilder.class);
final PGReplicationStream stream = mock(PGReplicationStream.class);
final ByteBuffer message = ByteBuffer.allocate(0);
final LogSequenceNumber lsn = mock(LogSequenceNumber.class);

when(connectionManager.getConnection()).thenReturn(connection);
when(connection.unwrap(PGConnection.class)).thenReturn(pgConnection);
when(pgConnection.getReplicationAPI().replicationStream().logical()).thenReturn(logicalStreamBuilder);
when(logicalStreamBuilder.withSlotName(anyString())).thenReturn(logicalStreamBuilder);
when(logicalStreamBuilder.withSlotOption(anyString(), anyString())).thenReturn(logicalStreamBuilder);
when(logicalStreamBuilder.start()).thenReturn(stream);
when(stream.readPending()).thenReturn(message).thenReturn(null);
when(stream.getLastReceiveLSN()).thenReturn(lsn);

// First connect
final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> logicalReplicationClient.connect());
await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> verify(eventProcessor, times(1)).process(message));
Thread.sleep(20);
verify(stream).setAppliedLSN(lsn);
verify(stream).setFlushedLSN(lsn);

// First disconnect
logicalReplicationClient.disconnect();
Thread.sleep(20);
verify(stream).close();
verifyNoMoreInteractions(stream, eventProcessor);

// Second connect
when(stream.readPending()).thenReturn(message).thenReturn(null);
executorService.submit(() -> logicalReplicationClient.connect());
await().atMost(Duration.ofSeconds(1))
.untilAsserted(() -> verify(eventProcessor, times(2)).process(message));
Thread.sleep(20);
verify(stream, times(2)).setAppliedLSN(lsn);
verify(stream, times(2)).setFlushedLSN(lsn);

// Second disconnect
logicalReplicationClient.disconnect();
Thread.sleep(20);
verify(stream, times(2)).close();
verifyNoMoreInteractions(stream, eventProcessor);

executorService.shutdownNow();
}

private LogicalReplicationClient createObjectUnderTest() {
return new LogicalReplicationClient(connectionManager, replicationSlotName, publicationName);
}
Expand Down

0 comments on commit 5c35522

Please sign in to comment.