From 21f0fe3e7a42d47f38eadd9eaded73523790171c Mon Sep 17 00:00:00 2001
From: Taylor Gray <tylgry@amazon.com>
Date: Mon, 27 Nov 2023 14:10:42 -0600
Subject: [PATCH] Fix bug so GLOBAL read-only items do not expire from TTL in
 ddb source coordination store

Signed-off-by: Taylor Gray <tylgry@amazon.com>
---
 .../model/source/SourceCoordinationStore.java |  3 ++-
 .../LeaseBasedSourceCoordinator.java          |  5 +++--
 .../EnhancedLeaseBasedSourceCoordinator.java  |  8 ++++----
 .../LeaseBasedSourceCoordinatorTest.java      | 13 ++++++------
 ...hancedLeaseBasedSourceCoordinatorTest.java |  4 ++--
 .../DynamoDbSourceCoordinationStore.java      |  5 +++--
 .../DynamoDbSourceCoordinationStoreTest.java  | 20 ++++++++++++++-----
 .../InMemorySourceCoordinationStore.java      |  3 ++-
 .../InMemorySourceCoordinationStoreTest.java  |  4 ++--
 9 files changed, 40 insertions(+), 25 deletions(-)

diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/SourceCoordinationStore.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/SourceCoordinationStore.java
index 13d8d00a03..26bfb5e8d8 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/SourceCoordinationStore.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/SourceCoordinationStore.java
@@ -39,7 +39,8 @@ boolean tryCreatePartitionItem(final String sourceIdentifier,
                                    final String partitionKey,
                                    final SourcePartitionStatus sourcePartitionStatus,
                                    final Long closedCount,
-                                   final String partitionProgressState);
+                                   final String partitionProgressState,
+                                   final boolean isReadOnlyItem);
 
     /**
      * The following scenarios should qualify a partition as available to be acquired
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java
index be282d0ca5..272cdbb21c 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinator.java
@@ -134,7 +134,7 @@ public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass,
     public void initialize() {
         sourceCoordinationStore.initializeStore();
         initialized = true;
-        sourceCoordinationStore.tryCreatePartitionItem(sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS, SourcePartitionStatus.UNASSIGNED, 0L, null);
+        sourceCoordinationStore.tryCreatePartitionItem(sourceIdentifierWithGlobalStateType, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS, SourcePartitionStatus.UNASSIGNED, 0L, null, false);
     }
 
     @Override
@@ -195,7 +195,8 @@ private void createPartitions(final List<PartitionIdentifier> partitionIdentifie
                     partitionIdentifier.getPartitionKey(),
                     SourcePartitionStatus.UNASSIGNED,
                     0L,
-                    null
+                    null,
+                    false
             );
 
             if (partitionCreated) {
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java
index 619d8ca3e5..4406c7c607 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java
@@ -101,14 +101,14 @@ public <T> boolean createPartition(EnhancedSourcePartition<T> partition) {
         // Don't need the status for Global state which is not for lease.
         SourcePartitionStatus status = partition.getPartitionType() == null ? null : SourcePartitionStatus.UNASSIGNED;
 
-        boolean partitionCreated = coordinationStore.tryCreatePartitionItem(
+        return coordinationStore.tryCreatePartitionItem(
                 this.sourceIdentifier + "|" + partitionType,
                 partition.getPartitionKey(),
                 status,
                 0L,
-                partition.convertPartitionProgressStatetoString(partition.getProgressState())
-        );
-        return partitionCreated;
+                partition.convertPartitionProgressStatetoString(partition.getProgressState()),
+                // For now, global items with no partitionType will be considered ReadOnly, but this should be directly in EnhancedSourcePartition in the future
+                partition.getPartitionType() == null);
 
     }
 
diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java
index 2a95299593..8984f7cfc0 100644
--- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java
+++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/LeaseBasedSourceCoordinatorTest.java
@@ -46,6 +46,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
@@ -163,7 +164,7 @@ private SourceCoordinator<String> createObjectUnderTest() {
         doNothing().when(sourceCoordinationStore).initializeStore();
         given(sourceCoordinationStore.tryCreatePartitionItem(
                 fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS,
-                SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(true);
+                SourcePartitionStatus.UNASSIGNED, 0L, null, false)).willReturn(true);
         objectUnderTest.initialize();
         return objectUnderTest;
     }
@@ -175,7 +176,7 @@ void initialize_calls_initializeStore() {
 
         verify(sourceCoordinationStore).initializeStore();
         verify(sourceCoordinationStore).tryCreatePartitionItem(fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS,
-                SourcePartitionStatus.UNASSIGNED, 0L, null);
+                SourcePartitionStatus.UNASSIGNED, 0L, null, false);
     }
 
     @Test
@@ -200,7 +201,7 @@ void getNextPartition_calls_supplier_and_creates_partition_with_existing_then_no
         doNothing().when(sourceCoordinationStore).tryUpdateSourcePartitionItem(globalStateForPartitionCreationItem);
         given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifierToSkip.getPartitionKey())).willReturn(Optional.of(sourcePartitionStoreItem));
         given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey())).willReturn(Optional.empty());
-        given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(true);
+        given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null, false)).willReturn(true);
 
         final Optional<SourcePartition<String>> result = createObjectUnderTest().getNextPartition(partitionCreationSupplier);
 
@@ -242,7 +243,7 @@ void getNextPartition_calls_supplier_which_returns_existing_partition_does_not_c
 
         assertThat(result.isEmpty(), equalTo(true));
 
-        verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString());
+        verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString(), eq(false));
 
         verify(partitionCreationSupplierInvocationsCounter).increment();
         verify(noPartitionsAcquiredCounter).increment();
@@ -271,7 +272,7 @@ void getNextPartition_with_non_existing_item_and_create_attempt_fails_will_do_no
         given(globalStateForPartitionCreationItem.getPartitionOwner()).willReturn(sourceIdentifierWithPartitionPrefix + ":" + InetAddress.getLocalHost().getHostName());
         given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForGlobalState, GLOBAL_STATE_SOURCE_PARTITION_KEY_FOR_CREATING_PARTITIONS)).willReturn(Optional.of(globalStateForPartitionCreationItem));
         given(sourceCoordinationStore.getSourcePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey())).willReturn(Optional.empty());
-        given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null)).willReturn(false);
+        given(sourceCoordinationStore.tryCreatePartitionItem(fullSourceIdentifierForPartition, partitionIdentifier.getPartitionKey(), SourcePartitionStatus.UNASSIGNED, 0L, null, false)).willReturn(false);
 
         final Optional<SourcePartition<String>> result = createObjectUnderTest().getNextPartition(partitionCreationSupplier);
 
@@ -381,7 +382,7 @@ void getNextPartition_with_no_active_partition_and_successful_tryAcquireAvailabl
         verify(partitionManager).setActivePartition(result.get());
         verify(sourceCoordinationStore, never()).getSourcePartitionItem(anyString(), anyString());
         verify(sourceCoordinationStore, never()).tryUpdateSourcePartitionItem(any(SourcePartitionStoreItem.class));
-        verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString());
+        verify(sourceCoordinationStore, never()).tryCreatePartitionItem(anyString(), anyString(), any(), anyLong(), anyString(), eq(false));
 
         verify(partitionsAcquiredCounter).increment();
 
diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java
index 6a1122ee56..6608811945 100644
--- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java
+++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java
@@ -121,12 +121,12 @@ void test_createPartition() {
         // A normal type.
         TestEnhancedSourcePartition partition = new TestEnhancedSourcePartition(false);
         coordinator.createPartition(partition);
-        verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|" + DEFAULT_PARTITION_TYPE), anyString(), eq(SourcePartitionStatus.UNASSIGNED), anyLong(), eq(null));
+        verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|" + DEFAULT_PARTITION_TYPE), anyString(), eq(SourcePartitionStatus.UNASSIGNED), anyLong(), eq(null), eq(false));
 
         // GlobalState.
         TestEnhancedSourcePartition globalState = new TestEnhancedSourcePartition(true);
         coordinator.createPartition(globalState);
-        verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|GLOBAL"), anyString(), eq(null), anyLong(), eq(null));
+        verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|GLOBAL"), anyString(), eq(null), anyLong(), eq(null), eq(true));
 
     }
 
diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java
index d8f6cde81f..2654cf5110 100644
--- a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java
+++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java
@@ -72,10 +72,11 @@ public boolean tryCreatePartitionItem(final String sourceIdentifier,
                                           final String sourcePartitionKey,
                                           final SourcePartitionStatus sourcePartitionStatus,
                                           final Long closedCount,
-                                          final String partitionProgressState) {
+                                          final String partitionProgressState,
+                                          final boolean isReadOnlyItem) {
         final DynamoDbSourcePartitionItem newPartitionItem = new DynamoDbSourcePartitionItem();
 
-        if (Objects.nonNull(dynamoStoreSettings.getTtl())) {
+        if (!isReadOnlyItem && Objects.nonNull(dynamoStoreSettings.getTtl())) {
             newPartitionItem.setExpirationTime(Instant.now().plus(dynamoStoreSettings.getTtl()).getEpochSecond());
         }
         newPartitionItem.setSourceIdentifier(sourceIdentifier);
diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java
index 5003b17150..3c15376a5d 100644
--- a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java
+++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java
@@ -8,6 +8,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockedStatic;
@@ -27,11 +29,13 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
@@ -94,8 +98,9 @@ void getSourcePartitionItem_calls_dynamoClientWrapper_correctly() {
         assertThat(result.get(), equalTo(sourcePartitionStoreItem));
     }
 
-    @Test
-    void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly(final boolean isReadOnlyItem) {
         final String sourceIdentifier = UUID.randomUUID().toString();
         final String sourcePartitionKey = UUID.randomUUID().toString();
         final SourcePartitionStatus sourcePartitionStatus = SourcePartitionStatus.UNASSIGNED;
@@ -107,9 +112,9 @@ void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() {
 
         final Duration ttl = Duration.ofSeconds(30);
         final Long nowPlusTtl = Instant.now().plus(ttl).getEpochSecond();
-        given(dynamoStoreSettings.getTtl()).willReturn(ttl);
+        lenient().when(dynamoStoreSettings.getTtl()).thenReturn(ttl);
 
-        final boolean result = createObjectUnderTest().tryCreatePartitionItem(sourceIdentifier, sourcePartitionKey, sourcePartitionStatus, closedCount, partitionProgressState);
+        final boolean result = createObjectUnderTest().tryCreatePartitionItem(sourceIdentifier, sourcePartitionKey, sourcePartitionStatus, closedCount, partitionProgressState, isReadOnlyItem);
 
         assertThat(result, equalTo(true));
 
@@ -122,7 +127,12 @@ void tryCreatePartitionItem_calls_dynamoDbClientWrapper_correctly() {
         assertThat(createdItem.getPartitionProgressState(), equalTo(partitionProgressState));
         assertThat(createdItem.getSourceStatusCombinationKey(), equalTo(sourceIdentifier + "|" + sourcePartitionStatus));
         assertThat(createdItem.getPartitionPriority(), notNullValue());
-        assertThat(createdItem.getExpirationTime(), greaterThanOrEqualTo(nowPlusTtl));
+
+        if (isReadOnlyItem) {
+            assertThat(createdItem.getExpirationTime(), nullValue());
+        } else {
+            assertThat(createdItem.getExpirationTime(), greaterThanOrEqualTo(nowPlusTtl));
+        }
     }
 
     @Test
diff --git a/data-prepper-plugins/in-memory-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStore.java b/data-prepper-plugins/in-memory-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStore.java
index 0ea8c601a9..16cfb48a6d 100644
--- a/data-prepper-plugins/in-memory-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStore.java
+++ b/data-prepper-plugins/in-memory-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStore.java
@@ -68,7 +68,8 @@ public boolean tryCreatePartitionItem(final String sourceIdentifier,
                                           final String partitionKey,
                                           final SourcePartitionStatus sourcePartitionStatus,
                                           final Long closedCount,
-                                          final String partitionProgressState) {
+                                          final String partitionProgressState,
+                                          final boolean isReadOnlyItem) {
         synchronized (this) {
             if (inMemoryPartitionAccessor.getItem(sourceIdentifier, partitionKey).isEmpty()) {
                 final InMemorySourcePartitionStoreItem inMemorySourcePartitionStoreItem = new InMemorySourcePartitionStoreItem();
diff --git a/data-prepper-plugins/in-memory-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStoreTest.java b/data-prepper-plugins/in-memory-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStoreTest.java
index ac1b19ad25..bbc6a88224 100644
--- a/data-prepper-plugins/in-memory-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStoreTest.java
+++ b/data-prepper-plugins/in-memory-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemorySourceCoordinationStoreTest.java
@@ -109,7 +109,7 @@ void tryCreatePartitionItem_for_item_that_exists_does_not_queuePartition_and_ret
         given(inMemoryPartitionAccessor.getItem(sourceIdentifier, partitionKey)).willReturn(Optional.of(mock(SourcePartitionStoreItem.class)));
         final InMemorySourceCoordinationStore objectUnderTest = createObjectUnderTest();
 
-        final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState);
+        final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState, false);
         assertThat(created, equalTo(false));
         verify(inMemoryPartitionAccessor, never()).queuePartition(any());
     }
@@ -129,7 +129,7 @@ void tryCreatePartitionItem_for_item_that_does_not_exist_queues_that_partition()
         doNothing().when(inMemoryPartitionAccessor).queuePartition(argumentCaptor.capture());
         final InMemorySourceCoordinationStore objectUnderTest = createObjectUnderTest();
 
-        final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState);
+        final boolean created = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, partitionKey, status, closedCount, partitionProgressState, false);
         assertThat(created, equalTo(true));
 
         final InMemorySourcePartitionStoreItem createdItem = argumentCaptor.getValue();