diff --git a/java/arcs/core/storage/ReferenceModeStore.kt b/java/arcs/core/storage/ReferenceModeStore.kt index e51aa014658..009fff54cdf 100644 --- a/java/arcs/core/storage/ReferenceModeStore.kt +++ b/java/arcs/core/storage/ReferenceModeStore.kt @@ -452,6 +452,33 @@ class ReferenceModeStore private constructor( private suspend fun handleContainerMessage(proxyMessage: ContainerProxyMessage): Boolean { when (proxyMessage) { is ProxyMessage.Operations -> { + + suspend fun handlePendingId(reference: RawReference, op: CrdtOperation) { + addToHoldQueueFromReferences( + listOf(reference), + onTimeout = {} + ) { + val updated = + getLocalData(reference.id) as? CrdtEntity.Data + + // Bridge the op from the collection using the RawEntity from the + // backing store, and use the refModeOp for sending back to the + // proxy. + val upstreamOps = listOf( + op.toBridgingOp(updated?.toRawEntity(reference.id)).refModeOp + ) + + callbacks.allCallbacksExcept(proxyMessage.id).forEach { callback -> + callback( + ProxyMessage.Operations( + operations = upstreamOps, + id = proxyMessage.id + ) + ) + } + } + } + val containerOps = proxyMessage.operations opLoop@ for (op in containerOps) { val reference = when (op) { @@ -460,34 +487,22 @@ class ReferenceModeStore private constructor( else -> null } val getEntity = if (reference != null) { - val entityCrdt = getLocalData(reference.id) as? CrdtEntity.Data - if (entityCrdt == null) { - addToHoldQueueFromReferences( - listOf(reference), - onTimeout = {} - ) { - val updated = - getLocalData(reference.id) as? CrdtEntity.Data - - // Bridge the op from the collection using the RawEntity from the - // backing store, and use the refModeOp for sending back to the - // proxy. - val upstreamOps = listOf( - op.toBridgingOp(updated?.toRawEntity(reference.id)).refModeOp - ) - - callbacks.allCallbacksExcept(proxyMessage.id).forEach { callback -> - callback( - ProxyMessage.Operations( - operations = upstreamOps, - id = proxyMessage.id - ) - ) - } + if (BuildFlags.REFERENCE_MODE_STORE_FIXES) { + val entityCrdt = getLocalData(reference.id) + val referenceVersionMap = checkNotNull(reference.version) { "reference is null" } + if (entityCrdt.versionMap doesNotDominate referenceVersionMap) { + handlePendingId(reference, op) + continue@opLoop + } + suspend { entityCrdt.toRawEntity(reference.id) } + } else { + val entityCrdt = getLocalData(reference.id) as? CrdtEntity.Data + if (entityCrdt == null) { + handlePendingId(reference, op) + continue@opLoop } - continue@opLoop + suspend { entityCrdt.toRawEntity(reference.id) } } - suspend { entityCrdt.toRawEntity(reference.id) } } else { suspend { null } } diff --git a/java/arcs/core/storage/testutil/ReferenceModeStoreTestBase.kt b/java/arcs/core/storage/testutil/ReferenceModeStoreTestBase.kt index 6afd5b2a01d..ab8f47783ae 100644 --- a/java/arcs/core/storage/testutil/ReferenceModeStoreTestBase.kt +++ b/java/arcs/core/storage/testutil/ReferenceModeStoreTestBase.kt @@ -929,6 +929,116 @@ abstract class ReferenceModeStoreTestBase(private val parameters: ParameterizedB assertThat(message).isInstanceOf(ProxyMessage.SyncRequest::class.java) } + @Test + fun operationsFromContainer_withMissingBackingData_waitsForBackingData() = runBlockingTest { + assume().that(BuildFlags.REFERENCE_MODE_STORE_FIXES).isTrue() + val activeStore = collectionReferenceModeStore(scope = this) + val actor = "me" + val bobEntity = createPersonEntity("an-id", "bob", 42, listOf(1L, 1L, 2L), "inline") + + // Data stored in backing store. + val bobCrdt = CrdtEntity.newAtVersionForTest( + VersionMap(activeStore.crdtKey to 1), + bobEntity + ) + + // Data stored in container store. + val bobReference = bobEntity.toReference( + activeStore.backingStore.storageKey, + VersionMap( + activeStore.crdtKey to 1 + ) + ) + + // Set up proxy. + val deferredOps = CompletableDeferred>() + activeStore.on { + if (it is ProxyMessage.Operations) { + deferredOps.complete(it.operations) + return@on + } + } + + // Send data to container store. + val opsForContainer = CrdtSet.Operation.Add(actor, VersionMap(actor to 1), bobReference) + activeStore.containerStore.onProxyMessage( + ProxyMessage.Operations(listOf(opsForContainer), activeStore.containerStoreId + 1) + ) + + activeStore.idle() + + // Send data to backing store + val bobBackingStore = activeStore.backingStore.getStore( + "an-id", + activeStore.backingStoreId + ).store + bobBackingStore.onReceive(bobCrdt.data, activeStore.backingStoreId + 1) + + val ops = deferredOps.await() + assertThat(ops).containsExactly( + CrdtSet.Operation.Add(actor, VersionMap(actor to 1), bobEntity) + ) + } + + @Test + fun operationsfromContainerStore_withOldBackingData_waitsForBackingData() = runBlockingTest { + assume().that(BuildFlags.REFERENCE_MODE_STORE_FIXES).isTrue() + val activeStore = collectionReferenceModeStore(scope = this) + val actor = "me" + val bobEntity = createPersonEntity("an-id", "bob", 42, listOf(1L, 1L, 2L), "inline") + + // Data stored in backing store. bobCrdt is old backing data. + val bobCrdt = CrdtEntity.newAtVersionForTest( + VersionMap(activeStore.crdtKey to 1), + bobEntity + ) + + // Data stored in container store. bobReference is up to date + val bobReference = bobEntity.toReference( + activeStore.backingStore.storageKey, + VersionMap( + activeStore.crdtKey to 2 + ) + ) + + // Set up proxy. + val deferredOps = CompletableDeferred>() + activeStore.on { + if (it is ProxyMessage.Operations) { + deferredOps.complete(it.operations) + return@on + } + } + + // Send data to backing store. + val bobBackingStore = activeStore.backingStore.getStore( + "an-id", + activeStore.backingStoreId + ).store + bobBackingStore.onReceive(bobCrdt.data, activeStore.backingStoreId + 1) + + // Send data to container store. + val opsForContainer = CrdtSet.Operation.Add(actor, VersionMap(actor to 1), bobReference) + activeStore.containerStore.onProxyMessage( + ProxyMessage.Operations(listOf(opsForContainer), activeStore.containerStoreId + 1) + ) + + activeStore.idle() + +// // Update backing store to update bobCrdt + val updatedBobEntity = createPersonEntity("an-id", "bob", 43, listOf(1L, 1L, 2L), "inline") + val updatedBobCrdt = CrdtEntity.newAtVersionForTest( + VersionMap(activeStore.crdtKey to 2), + updatedBobEntity + ) + bobBackingStore.onReceive(updatedBobCrdt.data, activeStore.backingStoreId + 1) + + val ops = deferredOps.await() + assertThat(ops).containsExactly( + CrdtSet.Operation.Add(actor, VersionMap(actor to 1), updatedBobEntity) + ) + } + @Test fun entityVersion_isIncremented() = runBlockingTest { val activeStore = collectionReferenceModeStore(scope = this)