Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 41 additions & 26 deletions java/arcs/core/storage/ReferenceModeStore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,33 @@ class ReferenceModeStore private constructor(
private suspend fun handleContainerMessage(proxyMessage: ContainerProxyMessage): Boolean {
when (proxyMessage) {
is ProxyMessage.Operations -> {

suspend fun handlePendingId(reference: RawReference, op: CrdtOperation) {
addToHoldQueueFromReferences(
listOf(reference),
onTimeout = {}
) {
val updated =
getLocalData(reference.id) as? CrdtEntity.Data

// Bridge the op from the collection using the RawEntity from the
// backing store, and use the refModeOp for sending back to the
// proxy.
val upstreamOps = listOf(
op.toBridgingOp(updated?.toRawEntity(reference.id)).refModeOp
)

callbacks.allCallbacksExcept(proxyMessage.id).forEach { callback ->
callback(
ProxyMessage.Operations(
operations = upstreamOps,
id = proxyMessage.id
)
)
}
}
}

val containerOps = proxyMessage.operations
opLoop@ for (op in containerOps) {
val reference = when (op) {
Expand All @@ -460,34 +487,22 @@ class ReferenceModeStore private constructor(
else -> null
}
val getEntity = if (reference != null) {
val entityCrdt = getLocalData(reference.id) as? CrdtEntity.Data
if (entityCrdt == null) {
addToHoldQueueFromReferences(
listOf(reference),
onTimeout = {}
) {
val updated =
getLocalData(reference.id) as? CrdtEntity.Data

// Bridge the op from the collection using the RawEntity from the
// backing store, and use the refModeOp for sending back to the
// proxy.
val upstreamOps = listOf(
op.toBridgingOp(updated?.toRawEntity(reference.id)).refModeOp
)

callbacks.allCallbacksExcept(proxyMessage.id).forEach { callback ->
callback(
ProxyMessage.Operations(
operations = upstreamOps,
id = proxyMessage.id
)
)
}
if (BuildFlags.REFERENCE_MODE_STORE_FIXES) {
val entityCrdt = getLocalData(reference.id)
val referenceVersionMap = checkNotNull(reference.version) { "reference is null" }
if (entityCrdt.versionMap doesNotDominate referenceVersionMap) {
handlePendingId(reference, op)
continue@opLoop
}
suspend { entityCrdt.toRawEntity(reference.id) }
} else {
val entityCrdt = getLocalData(reference.id) as? CrdtEntity.Data
if (entityCrdt == null) {
handlePendingId(reference, op)
continue@opLoop
}
continue@opLoop
suspend { entityCrdt.toRawEntity(reference.id) }
}
suspend { entityCrdt.toRawEntity(reference.id) }
} else {
suspend { null }
}
Expand Down
110 changes: 110 additions & 0 deletions java/arcs/core/storage/testutil/ReferenceModeStoreTestBase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,116 @@ abstract class ReferenceModeStoreTestBase(private val parameters: ParameterizedB
assertThat(message).isInstanceOf(ProxyMessage.SyncRequest::class.java)
}

@Test
fun operationsFromContainer_withMissingBackingData_waitsForBackingData() = runBlockingTest {
assume().that(BuildFlags.REFERENCE_MODE_STORE_FIXES).isTrue()
val activeStore = collectionReferenceModeStore(scope = this)
val actor = "me"
val bobEntity = createPersonEntity("an-id", "bob", 42, listOf(1L, 1L, 2L), "inline")

// Data stored in backing store.
val bobCrdt = CrdtEntity.newAtVersionForTest(
VersionMap(activeStore.crdtKey to 1),
bobEntity
)

// Data stored in container store.
val bobReference = bobEntity.toReference(
activeStore.backingStore.storageKey,
VersionMap(
activeStore.crdtKey to 1
)
)

// Set up proxy.
val deferredOps = CompletableDeferred<List<RefModeStoreOp>>()
activeStore.on {
if (it is ProxyMessage.Operations) {
deferredOps.complete(it.operations)
return@on
}
}

// Send data to container store.
val opsForContainer = CrdtSet.Operation.Add(actor, VersionMap(actor to 1), bobReference)
activeStore.containerStore.onProxyMessage(
ProxyMessage.Operations(listOf(opsForContainer), activeStore.containerStoreId + 1)
)

activeStore.idle()

// Send data to backing store
val bobBackingStore = activeStore.backingStore.getStore(
"an-id",
activeStore.backingStoreId
).store
bobBackingStore.onReceive(bobCrdt.data, activeStore.backingStoreId + 1)

val ops = deferredOps.await()
assertThat(ops).containsExactly(
CrdtSet.Operation.Add(actor, VersionMap(actor to 1), bobEntity)
)
}

@Test
fun operationsfromContainerStore_withOldBackingData_waitsForBackingData() = runBlockingTest {
assume().that(BuildFlags.REFERENCE_MODE_STORE_FIXES).isTrue()
val activeStore = collectionReferenceModeStore(scope = this)
val actor = "me"
val bobEntity = createPersonEntity("an-id", "bob", 42, listOf(1L, 1L, 2L), "inline")

// Data stored in backing store. bobCrdt is old backing data.
val bobCrdt = CrdtEntity.newAtVersionForTest(
VersionMap(activeStore.crdtKey to 1),
bobEntity
)

// Data stored in container store. bobReference is up to date
val bobReference = bobEntity.toReference(
activeStore.backingStore.storageKey,
VersionMap(
activeStore.crdtKey to 2
)
)

// Set up proxy.
val deferredOps = CompletableDeferred<List<RefModeStoreOp>>()
activeStore.on {
if (it is ProxyMessage.Operations) {
deferredOps.complete(it.operations)
return@on
}
}

// Send data to backing store.
val bobBackingStore = activeStore.backingStore.getStore(
"an-id",
activeStore.backingStoreId
).store
bobBackingStore.onReceive(bobCrdt.data, activeStore.backingStoreId + 1)

// Send data to container store.
val opsForContainer = CrdtSet.Operation.Add(actor, VersionMap(actor to 1), bobReference)
activeStore.containerStore.onProxyMessage(
ProxyMessage.Operations(listOf(opsForContainer), activeStore.containerStoreId + 1)
)

activeStore.idle()

// // Update backing store to update bobCrdt
val updatedBobEntity = createPersonEntity("an-id", "bob", 43, listOf(1L, 1L, 2L), "inline")
val updatedBobCrdt = CrdtEntity.newAtVersionForTest(
VersionMap(activeStore.crdtKey to 2),
updatedBobEntity
)
bobBackingStore.onReceive(updatedBobCrdt.data, activeStore.backingStoreId + 1)

val ops = deferredOps.await()
assertThat(ops).containsExactly(
CrdtSet.Operation.Add(actor, VersionMap(actor to 1), updatedBobEntity)
)
}

@Test
fun entityVersion_isIncremented() = runBlockingTest {
val activeStore = collectionReferenceModeStore(scope = this)
Expand Down