Skip to content

Commit 7c964fa

Browse files
Configurable Bundle size to upload to the server (#1584)
* Users can provide an upload configuration to limit the number of resources in each bundle to upload to the server * Review changes: Added upload config as param for SyncJob * Review comments: Refactored code to just have Size based splitter Co-authored-by: PallaviGanorkar <[email protected]>
1 parent ff2a5bd commit 7c964fa

File tree

11 files changed

+306
-29
lines changed

11 files changed

+306
-29
lines changed

engine/src/main/java/com/google/android/fhir/sync/Config.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ typealias ParamMap = Map<String, String>
3434
/** Constant for the Greater Than Search Prefix */
3535
@PublishedApi internal const val GREATER_THAN_PREFIX = "gt"
3636

37+
/** Constant for the default number of resource entries in a singe Bundle for upload. */
38+
const val DEFAULT_BUNDLE_SIZE = 500
39+
3740
val defaultRetryConfiguration =
3841
RetryConfiguration(BackoffCriteria(BackoffPolicy.LINEAR, 30, TimeUnit.SECONDS), 3)
3942

@@ -104,3 +107,15 @@ data class BackoffCriteria(
104107
/** The time unit for [backoffDelay] */
105108
val timeUnit: TimeUnit
106109
)
110+
111+
/**
112+
* Configuration for max number of resources to be uploaded in a Bundle.The default size is
113+
* [DEFAULT_BUNDLE_SIZE].
114+
*/
115+
data class UploadConfiguration(
116+
/**
117+
* Number of [Resource]s to be added in a singe [Bundle] for upload and default is
118+
* [DEFAULT_BUNDLE_SIZE]
119+
*/
120+
val uploadBundleSize: Int = DEFAULT_BUNDLE_SIZE
121+
)

engine/src/main/java/com/google/android/fhir/sync/FhirSyncWorker.kt

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ import com.google.android.fhir.FhirEngineProvider
2626
import com.google.android.fhir.OffsetDateTimeTypeAdapter
2727
import com.google.android.fhir.sync.Result.Error
2828
import com.google.android.fhir.sync.Result.Success
29+
import com.google.android.fhir.sync.download.DownloaderImpl
30+
import com.google.android.fhir.sync.upload.BundleUploader
31+
import com.google.android.fhir.sync.upload.LocalChangesPaginator
32+
import com.google.android.fhir.sync.upload.TransactionBundleGenerator
2933
import com.google.gson.ExclusionStrategy
3034
import com.google.gson.FieldAttributes
3135
import com.google.gson.GsonBuilder
@@ -45,6 +49,12 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
4549
abstract fun getDownloadWorkManager(): DownloadWorkManager
4650
abstract fun getConflictResolver(): ConflictResolver
4751

52+
/**
53+
* Configuration defining the max upload Bundle size (in terms to number of resources in a Bundle)
54+
* and optionally defining the order of Resources.
55+
*/
56+
open fun getUploadConfiguration(): UploadConfiguration = UploadConfiguration()
57+
4858
private val gson =
4959
GsonBuilder()
5060
.registerTypeAdapter(OffsetDateTime::class.java, OffsetDateTimeTypeAdapter().nullSafe())
@@ -65,14 +75,6 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
6575
)
6676
)
6777

68-
val fhirSynchronizer =
69-
FhirSynchronizer(
70-
applicationContext,
71-
getFhirEngine(),
72-
dataSource,
73-
getDownloadWorkManager(),
74-
conflictResolver = getConflictResolver()
75-
)
7678
val flow = MutableSharedFlow<State>()
7779

7880
val job =
@@ -87,11 +89,21 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
8789
}
8890
}
8991

90-
fhirSynchronizer.subscribe(flow)
91-
9292
Timber.v("Subscribed to flow for progress")
93-
94-
val result = fhirSynchronizer.synchronize()
93+
val result =
94+
FhirSynchronizer(
95+
applicationContext,
96+
getFhirEngine(),
97+
BundleUploader(
98+
dataSource,
99+
TransactionBundleGenerator.getDefault(),
100+
LocalChangesPaginator.create(getUploadConfiguration())
101+
),
102+
DownloaderImpl(dataSource, getDownloadWorkManager()),
103+
getConflictResolver()
104+
)
105+
.apply { subscribe(flow) }
106+
.synchronize()
95107
val output = buildOutput(result)
96108

97109
// await/join is needed to collect states completely

engine/src/main/java/com/google/android/fhir/sync/FhirSynchronizer.kt

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@ package com.google.android.fhir.sync
1919
import android.content.Context
2020
import com.google.android.fhir.DatastoreUtil
2121
import com.google.android.fhir.FhirEngine
22-
import com.google.android.fhir.sync.download.DownloaderImpl
23-
import com.google.android.fhir.sync.upload.BundleUploader
24-
import com.google.android.fhir.sync.upload.TransactionBundleGenerator
2522
import java.time.OffsetDateTime
2623
import kotlinx.coroutines.flow.MutableSharedFlow
2724
import kotlinx.coroutines.flow.collect
@@ -51,11 +48,8 @@ data class ResourceSyncException(val resourceType: ResourceType, val exception:
5148
internal class FhirSynchronizer(
5249
context: Context,
5350
private val fhirEngine: FhirEngine,
54-
private val dataSource: DataSource,
55-
private val downloadManager: DownloadWorkManager,
56-
private val uploader: Uploader =
57-
BundleUploader(dataSource, TransactionBundleGenerator.getDefault()),
58-
private val downloader: Downloader = DownloaderImpl(dataSource, downloadManager),
51+
private val uploader: Uploader,
52+
private val downloader: Downloader,
5953
private val conflictResolver: ConflictResolver
6054
) {
6155
private var syncState: MutableSharedFlow<State>? = null

engine/src/main/java/com/google/android/fhir/sync/Sync.kt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ import androidx.work.PeriodicWorkRequest
2525
import androidx.work.WorkManager
2626
import com.google.android.fhir.FhirEngine
2727
import com.google.android.fhir.FhirEngineProvider
28+
import com.google.android.fhir.sync.download.DownloaderImpl
29+
import com.google.android.fhir.sync.upload.BundleUploader
30+
import com.google.android.fhir.sync.upload.LocalChangesPaginator
31+
import com.google.android.fhir.sync.upload.TransactionBundleGenerator
2832
import org.hl7.fhir.r4.model.ResourceType
2933

3034
object Sync {
@@ -42,10 +46,21 @@ object Sync {
4246
context: Context,
4347
fhirEngine: FhirEngine,
4448
downloadManager: DownloadWorkManager,
49+
uploadConfiguration: UploadConfiguration = UploadConfiguration(),
4550
resolver: ConflictResolver
4651
): Result {
4752
return FhirEngineProvider.getDataSource(context)?.let {
48-
FhirSynchronizer(context, fhirEngine, it, downloadManager, conflictResolver = resolver)
53+
FhirSynchronizer(
54+
context,
55+
fhirEngine,
56+
BundleUploader(
57+
it,
58+
TransactionBundleGenerator.getDefault(),
59+
LocalChangesPaginator.create(uploadConfiguration)
60+
),
61+
DownloaderImpl(it, downloadManager),
62+
resolver
63+
)
4964
.synchronize()
5065
}
5166
?: Result.Error(

engine/src/main/java/com/google/android/fhir/sync/SyncJob.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ interface SyncJob {
3434
fhirEngine: FhirEngine,
3535
downloadManager: DownloadWorkManager,
3636
resolver: ConflictResolver,
37-
subscribeTo: MutableSharedFlow<State>?
37+
subscribeTo: MutableSharedFlow<State>?,
38+
uploadConfiguration: UploadConfiguration = UploadConfiguration()
3839
): Result
3940

4041
fun workInfoFlow(): Flow<WorkInfo>

engine/src/main/java/com/google/android/fhir/sync/SyncJobImpl.kt

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ import com.google.android.fhir.DatastoreUtil
2626
import com.google.android.fhir.FhirEngine
2727
import com.google.android.fhir.FhirEngineProvider
2828
import com.google.android.fhir.OffsetDateTimeTypeAdapter
29+
import com.google.android.fhir.sync.download.DownloaderImpl
30+
import com.google.android.fhir.sync.upload.BundleUploader
31+
import com.google.android.fhir.sync.upload.LocalChangesPaginator
32+
import com.google.android.fhir.sync.upload.TransactionBundleGenerator
2933
import com.google.gson.GsonBuilder
3034
import java.time.OffsetDateTime
3135
import kotlinx.coroutines.ExperimentalCoroutinesApi
@@ -101,10 +105,21 @@ class SyncJobImpl(private val context: Context) : SyncJob {
101105
fhirEngine: FhirEngine,
102106
downloadManager: DownloadWorkManager,
103107
resolver: ConflictResolver,
104-
subscribeTo: MutableSharedFlow<State>?
108+
subscribeTo: MutableSharedFlow<State>?,
109+
uploadConfiguration: UploadConfiguration
105110
): Result {
106111
return FhirEngineProvider.getDataSource(context)?.let {
107-
FhirSynchronizer(context, fhirEngine, it, downloadManager, conflictResolver = resolver)
112+
FhirSynchronizer(
113+
context,
114+
fhirEngine,
115+
BundleUploader(
116+
it,
117+
TransactionBundleGenerator.getDefault(),
118+
LocalChangesPaginator.create(uploadConfiguration)
119+
),
120+
DownloaderImpl(it, downloadManager),
121+
resolver
122+
)
108123
.apply { if (subscribeTo != null) subscribe(subscribeTo) }
109124
.synchronize()
110125
}

engine/src/main/java/com/google/android/fhir/sync/upload/BundleUploader.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@ import org.hl7.fhir.r4.model.ResourceType
3333
/** [Uploader] implementation to work with Fhir [Bundle]. */
3434
internal class BundleUploader(
3535
private val dataSource: DataSource,
36-
private val bundleGenerator: TransactionBundleGenerator
36+
private val bundleGenerator: TransactionBundleGenerator,
37+
private val localChangesPaginator: LocalChangesPaginator
3738
) : Uploader {
3839

3940
override suspend fun upload(
4041
localChanges: List<LocalChange>,
4142
): Flow<UploadResult> = flow {
42-
bundleGenerator.generate(listOf(localChanges)).forEach { (bundle, localChangeTokens) ->
43+
bundleGenerator.generate(localChangesPaginator.page(localChanges)).forEach {
44+
(bundle, localChangeTokens) ->
4345
try {
4446
val response = dataSource.upload(bundle)
4547
emit(getUploadResult(response, localChangeTokens))
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.android.fhir.sync.upload
18+
19+
import com.google.android.fhir.LocalChange
20+
import com.google.android.fhir.sync.UploadConfiguration
21+
22+
/** Splits the [List]<[LocalChange]> into smaller chunks. */
23+
internal fun interface LocalChangesPaginator {
24+
fun page(list: List<LocalChange>): List<List<LocalChange>>
25+
26+
companion object Factory {
27+
28+
fun create(uploadConfiguration: UploadConfiguration): LocalChangesPaginator {
29+
require(uploadConfiguration.uploadBundleSize > 0) {
30+
"UploadConfiguration.uploadBundleSize ${uploadConfiguration.uploadBundleSize} must be greater than zero."
31+
}
32+
33+
return SizeBasedLocalChangesPaginator(uploadConfiguration.uploadBundleSize)
34+
}
35+
36+
val DEFAULT = create(UploadConfiguration())
37+
}
38+
}
39+
40+
/**
41+
* Splits the [List]<[LocalChange]> into equal chunks of provided [size]. The last chunk may have
42+
* fewer elements than the [size].
43+
*/
44+
internal class SizeBasedLocalChangesPaginator(val size: Int) : LocalChangesPaginator {
45+
override fun page(list: List<LocalChange>) = list.chunked(size)
46+
}

engine/src/test/java/com/google/android/fhir/sync/upload/BundleUploaderTest.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ class BundleUploaderTest {
4646
TestingUtils.BundleDataSource {
4747
Bundle().apply { type = Bundle.BundleType.TRANSACTIONRESPONSE }
4848
},
49-
TransactionBundleGenerator.getDefault()
49+
TransactionBundleGenerator.getDefault(),
50+
LocalChangesPaginator.DEFAULT
5051
)
5152
.upload(localChanges)
5253
.toList()
@@ -69,7 +70,8 @@ class BundleUploaderTest {
6970
)
7071
}
7172
},
72-
TransactionBundleGenerator.getDefault()
73+
TransactionBundleGenerator.getDefault(),
74+
LocalChangesPaginator.DEFAULT
7375
)
7476
.upload(localChanges)
7577
.toList()
@@ -83,7 +85,8 @@ class BundleUploaderTest {
8385
val result =
8486
BundleUploader(
8587
TestingUtils.BundleDataSource { throw ConnectException("Failed to connect to server.") },
86-
TransactionBundleGenerator.getDefault()
88+
TransactionBundleGenerator.getDefault(),
89+
LocalChangesPaginator.DEFAULT
8790
)
8891
.upload(localChanges)
8992
.toList()
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright 2022 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.android.fhir.sync.upload
18+
19+
import com.google.android.fhir.LocalChange
20+
import com.google.android.fhir.db.impl.dao.LocalChangeToken
21+
import com.google.common.truth.Truth.assertThat
22+
import org.hl7.fhir.r4.model.ResourceType
23+
import org.junit.Test
24+
import org.junit.runner.RunWith
25+
import org.robolectric.RobolectricTestRunner
26+
27+
@RunWith(RobolectricTestRunner::class)
28+
class LocalChangesPaginatorTest {
29+
30+
@Test
31+
fun `SizeSplitter split should return 3 lists in order`() {
32+
val changes =
33+
listOf(
34+
LocalChange(
35+
ResourceType.Patient.name,
36+
"patient-001",
37+
type = LocalChange.Type.INSERT,
38+
payload = "{}",
39+
token = LocalChangeToken(listOf(1L))
40+
),
41+
LocalChange(
42+
ResourceType.Patient.name,
43+
"patient-002",
44+
type = LocalChange.Type.INSERT,
45+
payload = "{}",
46+
token = LocalChangeToken(listOf(2L))
47+
),
48+
LocalChange(
49+
ResourceType.Patient.name,
50+
"patient-003",
51+
type = LocalChange.Type.INSERT,
52+
payload = "{}",
53+
token = LocalChangeToken(listOf(3L))
54+
),
55+
LocalChange(
56+
ResourceType.Patient.name,
57+
"patient-004",
58+
type = LocalChange.Type.INSERT,
59+
payload = "{}",
60+
token = LocalChangeToken(listOf(4L))
61+
),
62+
LocalChange(
63+
ResourceType.Patient.name,
64+
"patient-005",
65+
type = LocalChange.Type.INSERT,
66+
payload = "{}",
67+
token = LocalChangeToken(listOf(5L))
68+
)
69+
)
70+
71+
val result = SizeBasedLocalChangesPaginator(2).page(changes)
72+
assertThat(result).hasSize(3)
73+
assertThat(result[0].map { it.resourceId }).containsExactly("patient-001", "patient-002")
74+
assertThat(result[1].map { it.resourceId }).containsExactly("patient-003", "patient-004")
75+
assertThat(result[2].map { it.resourceId }).containsExactly("patient-005")
76+
}
77+
78+
@Test
79+
fun `SizeSplitter split should return empty lists when input its empty`() {
80+
val result = SizeBasedLocalChangesPaginator(2).page(emptyList())
81+
assertThat(result).hasSize(0)
82+
}
83+
}

0 commit comments

Comments
 (0)