From 005533232e5d5b639fb40a873e57bf3103c507d7 Mon Sep 17 00:00:00 2001 From: onalcan Date: Mon, 20 Mar 2023 10:02:29 +0100 Subject: [PATCH 01/26] add konnection library --- build.gradle.kts | 1 + settings.gradle.kts | 1 + 2 files changed, 2 insertions(+) diff --git a/build.gradle.kts b/build.gradle.kts index 7ad9a25..356b4eb 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -30,6 +30,7 @@ kotlin { implementation(libs.kotlinx.datetime) implementation(libs.kotlinx.atomicfu) implementation(libs.multiplatform.settings) + implementation(libs.konnection) } } val commonTest by getting { diff --git a/settings.gradle.kts b/settings.gradle.kts index 43f8a79..d29841f 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -29,6 +29,7 @@ dependencyResolutionManagement { library("roboelectric", "org.robolectric", "robolectric").version("4.5.1") library("multiplatform-settings", "com.russhwolf", "multiplatform-settings").version("1.0.0-RC") library("multiplatform-settings-test", "com.russhwolf", "multiplatform-settings-test").version("1.0.0-RC") + library("konnection", "dev.tmapps", "konnection").version("1.1.10") plugin("versioning", "net.nemerosa.versioning").version("3.0.0") plugin("kotlin.serialization", "org.jetbrains.kotlin.plugin.serialization").versionRef("kotlin") } From 308600d5ffb10070140d972611c6c8269b141e0b Mon Sep 17 00:00:00 2001 From: onalcan Date: Mon, 20 Mar 2023 10:06:35 +0100 Subject: [PATCH 02/26] implement NetworkListener using Konnection --- .../com/liftric/job/queue/NetworkListener.kt | 29 +++++++++++++++++++ .../com/liftric/job/queue/NetworkListener.kt | 10 +++++++ .../com/liftric/job/queue/NetworkListener.kt | 27 +++++++++++++++++ 3 files changed, 66 insertions(+) create mode 100644 src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt create mode 100644 src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt create mode 100644 src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt diff --git a/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt new file mode 100644 index 0000000..ab9125c --- /dev/null +++ b/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -0,0 +1,29 @@ +package com.liftric.job.queue + +import android.content.Context +import com.liftric.job.queue.rules.NetworkState +import dev.tmapps.konnection.Konnection +import dev.tmapps.konnection.NetworkConnection +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch + +actual class NetworkListener( + val context: Context, + actual var networkState: NetworkState = NetworkState.NONE, + actual val scope: CoroutineScope = CoroutineScope(context = Dispatchers.Default) +) { + actual fun observeNetworkState() { + scope.launch { + Konnection(context = context) + .observeNetworkConnection() + .collect { networkConnection -> + networkState = when (networkConnection) { + NetworkConnection.WIFI -> NetworkState.WIFI + NetworkConnection.MOBILE -> NetworkState.MOBILE + null -> NetworkState.NONE + } + } + } + } +} diff --git a/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt new file mode 100644 index 0000000..de33ab1 --- /dev/null +++ b/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -0,0 +1,10 @@ +package com.liftric.job.queue + +import com.liftric.job.queue.rules.NetworkState +import kotlinx.coroutines.CoroutineScope + +expect class NetworkListener { + actual var networkState: NetworkState + val scope: CoroutineScope + fun observeNetworkState() +} diff --git a/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt new file mode 100644 index 0000000..184e625 --- /dev/null +++ b/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -0,0 +1,27 @@ +package com.liftric.job.queue + +import com.liftric.job.queue.rules.NetworkState +import dev.tmapps.konnection.Konnection +import dev.tmapps.konnection.NetworkConnection +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach + +actual class NetworkListener( + actual var networkState: NetworkState = NetworkState.NONE, + actual val scope: CoroutineScope = CoroutineScope(context = Dispatchers.Default) +) { + actual fun observeNetworkState() { + Konnection() + .observeNetworkConnection() + .onEach { networkConnection -> + networkState = when (networkConnection) { + NetworkConnection.WIFI -> NetworkState.WIFI + NetworkConnection.MOBILE -> NetworkState.MOBILE + null -> NetworkState.NONE + } + } + .launchIn(scope) + } +} From ca347206b2508c798c8eb2eafa1f7ce932501bd4 Mon Sep 17 00:00:00 2001 From: onalcan Date: Mon, 20 Mar 2023 10:09:01 +0100 Subject: [PATCH 03/26] add NetworkListener to JobQueue --- .../kotlin/com/liftric/job/queue/JobQueue.kt | 12 +++-- .../kotlin/com/liftric/job/queue/JobQueue.kt | 47 ++++++++++++------- .../kotlin/com/liftric/job/queue/JobQueue.kt | 8 ++-- 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/src/androidMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/androidMain/kotlin/com/liftric/job/queue/JobQueue.kt index 4a72fa0..0d3dd3e 100644 --- a/src/androidMain/kotlin/com/liftric/job/queue/JobQueue.kt +++ b/src/androidMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -8,9 +8,13 @@ actual class JobQueue( context: Context, serializers: SerializersModule = SerializersModule {}, configuration: Queue.Configuration = Queue.DefaultConfiguration, - store: JsonStorage = SettingsStorage(SharedPreferencesSettings.Factory(context).create("com.liftric.persisted.queue")) + networkListener: NetworkListener, + store: JsonStorage = SettingsStorage( + SharedPreferencesSettings.Factory(context).create("com.liftric.persisted.queue") + ) ) : AbstractJobQueue( - serializers, - configuration, - store + serializers = serializers, + networkListener = networkListener, + configuration = configuration, + store = store ) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt index e99f351..1502083 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -3,7 +3,7 @@ package com.liftric.job.queue import com.liftric.job.queue.rules.* import kotlinx.atomicfu.atomic import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.withLock @@ -18,12 +18,13 @@ import kotlinx.serialization.modules.plus import kotlinx.serialization.modules.polymorphic import kotlin.time.Duration.Companion.seconds -expect class JobQueue: AbstractJobQueue +expect class JobQueue : AbstractJobQueue abstract class AbstractJobQueue( serializers: SerializersModule, + val networkListener: NetworkListener, final override val configuration: Queue.Configuration, private val store: JsonStorage -): Queue { +) : Queue { private val module = SerializersModule { contextual(InstantIso8601Serializer) polymorphic(JobRule::class) { @@ -37,7 +38,7 @@ abstract class AbstractJobQueue( } private val format = Json { serializersModule = module + serializers } - val listener = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE) + val jobEventListener = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE) /** * Scheduled jobs @@ -65,6 +66,7 @@ abstract class AbstractJobQueue( init { if (configuration.startsAutomatically) { start() + networkListener.observeNetworkState() } } @@ -72,19 +74,28 @@ abstract class AbstractJobQueue( schedule(task(), configure) } - suspend fun schedule(data: Data, task: (Data) -> DataTask, configure: JobInfo.() -> JobInfo = { JobInfo() }) { + suspend fun schedule( + data: Data, + task: (Data) -> DataTask, + configure: JobInfo.() -> JobInfo = { JobInfo() } + ) { schedule(task(data), configure) } suspend fun schedule(task: Task, configure: JobInfo.() -> JobInfo = { JobInfo() }) { val info = configure(JobInfo()).apply { - rules.forEach { it.mutating(this) } + rules.forEach { rule -> + rule.mutating(this) + } } - val job = Job(task, info) + val job = Job( + task = task, + info = info + ) schedule(job).apply { - listener.emit(JobEvent.DidSchedule(job)) + jobEventListener.emit(JobEvent.DidSchedule(job)) } } @@ -99,7 +110,7 @@ abstract class AbstractJobQueue( queue.value = queue.value.plus(listOf(job)).sortedBy { it.startTime }.toMutableList() } catch (e: Throwable) { - listener.emit(JobEvent.DidThrowOnSchedule(e)) + jobEventListener.emit(JobEvent.DidThrowOnSchedule(e)) } private val delegate = JobDelegate() @@ -118,10 +129,10 @@ abstract class AbstractJobQueue( } is JobEvent.ShouldRepeat -> { schedule(event.job).apply { - listener.emit(JobEvent.DidScheduleRepeat(event.job)) + jobEventListener.emit(JobEvent.DidScheduleRepeat(event.job)) } } - else -> listener.emit(event) + else -> jobEventListener.emit(event) } } } @@ -135,11 +146,11 @@ abstract class AbstractJobQueue( job.delegate = delegate running.value[job.id] = configuration.scope.launch { try { - listener.emit(JobEvent.WillRun(job)) - val result = job.run() - listener.emit(result) + jobEventListener.emit(JobEvent.WillRun(job)) + val result = job.run(currentNetworkState = networkListener.networkState) + jobEventListener.emit(result) } catch (e: CancellationException) { - listener.emit(JobEvent.DidCancel(job)) + jobEventListener.emit(JobEvent.DidCancel(job)) } finally { if (job.info.shouldPersist) { store.remove(job.id.toString()) @@ -173,7 +184,9 @@ abstract class AbstractJobQueue( queue.value.clear() running.value.clear() configuration.scope.coroutineContext.cancelChildren() - if (clearStore) { store.clear() } + if (clearStore) { + store.clear() + } } } @@ -185,7 +198,7 @@ abstract class AbstractJobQueue( isCancelling.withLock { queue.value.firstOrNull { it.id == id }?.let { job -> queue.value.remove(job) - listener.emit(JobEvent.DidCancel(job)) + jobEventListener.emit(JobEvent.DidCancel(job)) } ?: running.value[id]?.cancel() } } diff --git a/src/iosMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/iosMain/kotlin/com/liftric/job/queue/JobQueue.kt index ab1aa64..74805f2 100644 --- a/src/iosMain/kotlin/com/liftric/job/queue/JobQueue.kt +++ b/src/iosMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -7,9 +7,11 @@ import platform.Foundation.NSUserDefaults actual class JobQueue( serializers: SerializersModule = SerializersModule {}, configuration: Queue.Configuration = Queue.DefaultConfiguration, + networkListener: NetworkListener, store: JsonStorage = SettingsStorage(NSUserDefaultsSettings(NSUserDefaults("com.liftric.persisted.queue"))) ) : AbstractJobQueue( - serializers, - configuration, - store + serializers = serializers, + networkListener = networkListener, + configuration = configuration, + store = store ) From 4cf22f89fe9816f722b11880656636a773ce5088 Mon Sep 17 00:00:00 2001 From: onalcan Date: Mon, 20 Mar 2023 10:09:38 +0100 Subject: [PATCH 04/26] implement NetworkRule --- .../kotlin/com/liftric/job/queue/Job.kt | 28 +++++++++++++------ .../com/liftric/job/queue/JobContext.kt | 9 ++++-- .../kotlin/com/liftric/job/queue/JobInfo.kt | 4 ++- .../liftric/job/queue/rules/NetworkRule.kt | 25 +++++++++++++++++ 4 files changed, 55 insertions(+), 11 deletions(-) create mode 100644 src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt diff --git a/src/commonMain/kotlin/com/liftric/job/queue/Job.kt b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt index 3b826b7..6b081d9 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/Job.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt @@ -1,5 +1,7 @@ package com.liftric.job.queue +import com.liftric.job.queue.rules.NetworkException +import com.liftric.job.queue.rules.NetworkState import kotlinx.coroutines.CancellationException import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.withTimeout @@ -19,21 +21,31 @@ data class Job( override val info: JobInfo, override val task: Task, override val startTime: Instant -): JobContext { - @Transient internal var delegate: JobDelegate? = null +) : JobContext { + @Transient + internal var delegate: JobDelegate? = null - constructor(task: Task, info: JobInfo) : this (UUIDFactory.create(), info, task, Clock.System.now()) + constructor(task: Task, info: JobInfo) : this( + UUIDFactory.create(), + info, + task, + Clock.System.now() + ) private var canRepeat: Boolean = true - suspend fun run(): JobEvent { + suspend fun run(currentNetworkState: NetworkState): JobEvent { return withTimeout(info.timeout) { val event = try { info.rules.forEach { it.willRun(this@Job) } - - task.body() - - JobEvent.DidSucceed(this@Job) + if (info.minRequiredNetworkState <= currentNetworkState) { + println("NETWORK: satisfied") + task.body() + JobEvent.DidSucceed(this@Job) + } else { + println("NETWORK: unsatisfied") + throw NetworkException("Network requirement not satisfied!") + } } catch (e: CancellationException) { throw e } catch (e: Throwable) { diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobContext.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobContext.kt index 537d691..5f33e46 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobContext.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobContext.kt @@ -2,9 +2,14 @@ package com.liftric.job.queue import kotlinx.datetime.Instant -interface JobContext: JobData { +interface JobContext : JobData { suspend fun cancel() - suspend fun repeat(id: UUID = this.id, info: JobInfo = this.info, task: Task = this.task, startTime: Instant = this.startTime) + suspend fun repeat( + id: UUID = this.id, + info: JobInfo = this.info, + task: Task = this.task, + startTime: Instant = this.startTime, + ) } interface JobData { diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt index c19e15c..6cf4d6a 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt @@ -1,5 +1,6 @@ package com.liftric.job.queue +import com.liftric.job.queue.rules.NetworkState import kotlin.time.Duration import kotlinx.serialization.Serializable @@ -8,5 +9,6 @@ data class JobInfo( var tag: String? = null, var timeout: Duration = Duration.INFINITE, var rules: MutableList = mutableListOf(), - var shouldPersist: Boolean = false + var shouldPersist: Boolean = false, + var minRequiredNetworkState: NetworkState = NetworkState.NONE ) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt new file mode 100644 index 0000000..ae6c0cd --- /dev/null +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt @@ -0,0 +1,25 @@ +package com.liftric.job.queue.rules + +import com.liftric.job.queue.JobInfo +import com.liftric.job.queue.JobRule + +data class NetworkRule(val minRequiredNetworkState: NetworkState) : JobRule() { + override suspend fun mutating(info: JobInfo) { + info.minRequiredNetworkState = minRequiredNetworkState + } +} + +fun JobInfo.minRequiredNetwork(networkState: NetworkState): JobInfo { + val rule = NetworkRule(networkState) + rules.add(rule) + return this +} + +class NetworkException(message: String) : Exception(message) + +@kotlinx.serialization.Serializable +enum class NetworkState { + NONE, + MOBILE, + WIFI +} From bd964879da014421341ebe953edf08998efc60e2 Mon Sep 17 00:00:00 2001 From: onalcan Date: Mon, 20 Mar 2023 10:10:08 +0100 Subject: [PATCH 05/26] update tests --- .../com/liftric/job/queue/JobQueueTests.kt | 22 +++++--- .../com/liftric/job/queue/JobQueueTests.kt | 56 +++++++++++++++++-- .../com/liftric/job/queue/JobQueueTests.kt | 19 ++++--- 3 files changed, 74 insertions(+), 23 deletions(-) diff --git a/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt index cc1776d..ebc2245 100644 --- a/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt +++ b/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -7,12 +7,16 @@ import kotlinx.serialization.modules.polymorphic import org.junit.runner.RunWith @RunWith(AndroidJUnit4::class) -actual class JobQueueTests: AbstractJobQueueTests(JobQueue( - context = InstrumentationRegistry.getInstrumentation().targetContext, - serializers = SerializersModule { - polymorphic(Task::class) { - subclass(TestTask::class, TestTask.serializer()) - } - }, - store = MapStorage() -)) +actual class JobQueueTests : AbstractJobQueueTests( + JobQueue( + context = InstrumentationRegistry.getInstrumentation().targetContext, + serializers = SerializersModule { + polymorphic(Task::class) { + subclass(TestTask::class, TestTask.serializer()) + } + }, + store = MapStorage(), + networkListener = NetworkListener(context = InstrumentationRegistry.getInstrumentation().context) + ) +) + diff --git a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt index 8a6791e..d889b98 100644 --- a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt +++ b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -5,7 +5,7 @@ import kotlinx.coroutines.* import kotlin.test.* import kotlin.time.Duration.Companion.seconds -expect class JobQueueTests: AbstractJobQueueTests +expect class JobQueueTests : AbstractJobQueueTests abstract class AbstractJobQueueTests(private val queue: JobQueue) { @AfterTest fun tearDown() = runBlocking { @@ -18,7 +18,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { runBlocking { val id = UUIDFactory.create().toString() val job = async { - queue.listener.collect { + queue.jobEventListener.collect { println(it) } } @@ -49,7 +49,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { fun testRetry() = runBlocking { var count = 0 val job = launch { - queue.listener.collect { + queue.jobEventListener.collect { println(it) if (it is JobEvent.DidScheduleRepeat) { count += 1 @@ -73,7 +73,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { fun testCancelDuringRun() { runBlocking { val listener = launch { - queue.listener.collect { + queue.jobEventListener.collect { println(it) if (it is JobEvent.DidSucceed || it is JobEvent.DidFail) fail("Continued after run") if (it is JobEvent.WillRun) { @@ -103,7 +103,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { val completable = CompletableDeferred() launch { - queue.listener.collect { + queue.jobEventListener.collect { println(it) if (it is JobEvent.DidSucceed || it is JobEvent.DidFail) fail("Continued after run") if (it is JobEvent.DidSchedule) { @@ -130,7 +130,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { fun testCancelByIdAfterEnqueue() { runBlocking { launch { - queue.listener.collect { + queue.jobEventListener.collect { println(it) if (it is JobEvent.DidSchedule) { delay(3000L) @@ -168,4 +168,48 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { assertEquals(1, queue.numberOfJobs) } + + @Test + fun testNetworkRuleSatisfied() = runBlocking { + val id = UUIDFactory.create().toString() + val job = async { + queue.jobEventListener.collect { + println("TEST -> JOB INFO: $it") + assertTrue(it is JobEvent.DidSucceed) + } + } + + queue.schedule(TestData(id), ::TestTask) { + minRequiredNetwork(NetworkState.MOBILE) + } + + queue.networkListener.networkState = NetworkState.WIFI + println("Network State: ${queue.networkListener.networkState}") + + queue.start() + delay(200) + job.cancel() + } + + @Test + fun testNetworkRuleUnSatisfied() = runBlocking { + val id = UUIDFactory.create().toString() + val job = launch { + queue.jobEventListener.collect { + println("TEST -> JOB INFO: $it") + assertTrue(it is JobEvent.DidFail) + } + } + + queue.schedule(TestData(id), ::TestTask) { + minRequiredNetwork(NetworkState.WIFI) + } + + queue.networkListener.networkState = NetworkState.NONE + println("Network State: ${queue.networkListener.networkState}") + + queue.start() + delay(200) + job.cancel() + } } diff --git a/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt index 0891133..395b67a 100644 --- a/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt +++ b/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -3,11 +3,14 @@ package com.liftric.job.queue import kotlinx.serialization.modules.SerializersModule import kotlinx.serialization.modules.polymorphic -actual class JobQueueTests: AbstractJobQueueTests(JobQueue( - serializers = SerializersModule { - polymorphic(Task::class) { - subclass(TestTask::class, TestTask.serializer()) - } - }, - store = MapStorage() -)) +actual class JobQueueTests : AbstractJobQueueTests( + JobQueue( + serializers = SerializersModule { + polymorphic(Task::class) { + subclass(TestTask::class, TestTask.serializer()) + } + }, + store = MapStorage(), + networkListener = NetworkListener() + ) +) From 65b1a20335cd5c7c288ca7c7234417ed9b2ab161 Mon Sep 17 00:00:00 2001 From: onalcan Date: Mon, 20 Mar 2023 13:00:44 +0100 Subject: [PATCH 06/26] move network check inside willRun --- .../kotlin/com/liftric/job/queue/Job.kt | 48 ++++++++----------- .../kotlin/com/liftric/job/queue/JobRule.kt | 3 +- .../com/liftric/job/queue/rules/DelayRule.kt | 2 +- .../liftric/job/queue/rules/NetworkRule.kt | 10 ++++ 4 files changed, 34 insertions(+), 29 deletions(-) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/Job.kt b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt index 6b081d9..ca2bac8 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/Job.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt @@ -1,10 +1,8 @@ package com.liftric.job.queue -import com.liftric.job.queue.rules.NetworkException import com.liftric.job.queue.rules.NetworkState import kotlinx.coroutines.CancellationException import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.withTimeout import kotlinx.datetime.Clock import kotlinx.datetime.Instant import kotlinx.serialization.Serializable @@ -35,34 +33,30 @@ data class Job( private var canRepeat: Boolean = true suspend fun run(currentNetworkState: NetworkState): JobEvent { - return withTimeout(info.timeout) { - val event = try { - info.rules.forEach { it.willRun(this@Job) } - if (info.minRequiredNetworkState <= currentNetworkState) { - println("NETWORK: satisfied") - task.body() - JobEvent.DidSucceed(this@Job) - } else { - println("NETWORK: unsatisfied") - throw NetworkException("Network requirement not satisfied!") - } - } catch (e: CancellationException) { - throw e - } catch (e: Throwable) { - canRepeat = task.onRepeat(e) - JobEvent.DidFail(this@Job, e) + val event = try { + info.rules.forEach { + it.willRun( + context = this@Job, + currentNetworkState = currentNetworkState + ) } + task.body() + JobEvent.DidSucceed(this@Job) + } catch (e: CancellationException) { + throw e + } catch (e: Throwable) { + canRepeat = task.onRepeat(e) + JobEvent.DidFail(this@Job, e) + } - try { - info.rules.forEach { it.willRemove(this@Job, event) } - - event - } catch (e: CancellationException) { - throw e - } catch (e: Throwable) { - JobEvent.DidFailOnRemove(this@Job, e) - } + try { + info.rules.forEach { it.willRemove(this@Job, event) } + } catch (e: CancellationException) { + throw e + } catch (e: Throwable) { + JobEvent.DidFailOnRemove(this@Job, e) } + return event } override suspend fun cancel() { diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt index 5d512ab..da7c424 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt @@ -1,5 +1,6 @@ package com.liftric.job.queue +import com.liftric.job.queue.rules.NetworkState import kotlinx.serialization.Serializable @Serializable @@ -7,6 +8,6 @@ abstract class JobRule { open suspend fun mutating(info: JobInfo) {} @Throws(Throwable::class) open suspend fun willSchedule(queue: Queue, context: JobContext) {} - open suspend fun willRun(context: JobContext) {} + open suspend fun willRun(context: JobContext, currentNetworkState: NetworkState) {} open suspend fun willRemove(context: JobContext, result: JobEvent) {} } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt index a200305..a28b486 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt @@ -8,7 +8,7 @@ import kotlinx.serialization.Serializable @Serializable data class DelayRule(val duration: Duration = 0.seconds): JobRule() { - override suspend fun willRun(context: JobContext) { + override suspend fun willRun(context: JobContext, currentNetworkState: NetworkState) { delay(duration) } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt index ae6c0cd..0472f03 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt @@ -1,5 +1,6 @@ package com.liftric.job.queue.rules +import com.liftric.job.queue.JobContext import com.liftric.job.queue.JobInfo import com.liftric.job.queue.JobRule @@ -7,6 +8,15 @@ data class NetworkRule(val minRequiredNetworkState: NetworkState) : JobRule() { override suspend fun mutating(info: JobInfo) { info.minRequiredNetworkState = minRequiredNetworkState } + + override suspend fun willRun(context: JobContext, currentNetworkState: NetworkState) { + if (context.info.minRequiredNetworkState > currentNetworkState) { + println("NETWORK: unsatisfied") + throw NetworkException("Network requirement not satisfied!") + } else { + println("NETWORK: satisfied") + } + } } fun JobInfo.minRequiredNetwork(networkState: NetworkState): JobInfo { From 8efa999f47cf995408007d25ecae6f8f6f7331ed Mon Sep 17 00:00:00 2001 From: onalcan Date: Mon, 20 Mar 2023 13:01:11 +0100 Subject: [PATCH 07/26] move withTimeout to JobQueue --- src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt index 1502083..61ceec6 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -147,9 +147,14 @@ abstract class AbstractJobQueue( running.value[job.id] = configuration.scope.launch { try { jobEventListener.emit(JobEvent.WillRun(job)) - val result = job.run(currentNetworkState = networkListener.networkState) - jobEventListener.emit(result) + withTimeout(job.info.timeout) { + val result = job.run(currentNetworkState = networkListener.networkState) + jobEventListener.emit(result) + } } catch (e: CancellationException) { + if(e is TimeoutCancellationException) { + println("Timeout exceeded") + } jobEventListener.emit(JobEvent.DidCancel(job)) } finally { if (job.info.shouldPersist) { From 2635030c9552ae91fd702dbff2779010faba57e8 Mon Sep 17 00:00:00 2001 From: onalcan Date: Mon, 20 Mar 2023 13:01:37 +0100 Subject: [PATCH 08/26] update network rule test --- src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt index d889b98..725bc90 100644 --- a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt +++ b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -192,7 +192,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { } @Test - fun testNetworkRuleUnSatisfied() = runBlocking { + fun testNetworkRuleUnsatisfied() = runBlocking { val id = UUIDFactory.create().toString() val job = launch { queue.jobEventListener.collect { @@ -205,7 +205,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { minRequiredNetwork(NetworkState.WIFI) } - queue.networkListener.networkState = NetworkState.NONE + queue.networkListener.networkState = NetworkState.MOBILE println("Network State: ${queue.networkListener.networkState}") queue.start() From d206da34991e4ba66c59390c87fa9ae3ec680398 Mon Sep 17 00:00:00 2001 From: onalcan Date: Mon, 20 Mar 2023 14:02:55 +0100 Subject: [PATCH 09/26] remove Konnection library --- build.gradle.kts | 1 - settings.gradle.kts | 1 - 2 files changed, 2 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 356b4eb..7ad9a25 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -30,7 +30,6 @@ kotlin { implementation(libs.kotlinx.datetime) implementation(libs.kotlinx.atomicfu) implementation(libs.multiplatform.settings) - implementation(libs.konnection) } } val commonTest by getting { diff --git a/settings.gradle.kts b/settings.gradle.kts index d29841f..43f8a79 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -29,7 +29,6 @@ dependencyResolutionManagement { library("roboelectric", "org.robolectric", "robolectric").version("4.5.1") library("multiplatform-settings", "com.russhwolf", "multiplatform-settings").version("1.0.0-RC") library("multiplatform-settings-test", "com.russhwolf", "multiplatform-settings-test").version("1.0.0-RC") - library("konnection", "dev.tmapps", "konnection").version("1.1.10") plugin("versioning", "net.nemerosa.versioning").version("3.0.0") plugin("kotlin.serialization", "org.jetbrains.kotlin.plugin.serialization").versionRef("kotlin") } From 493210803415af44328c2b7f4b74a4e95c4dfa55 Mon Sep 17 00:00:00 2001 From: onalcan Date: Mon, 20 Mar 2023 15:45:43 +0100 Subject: [PATCH 10/26] implement NetworkManager without Konnection and update NetworkListener --- src/androidMain/AndroidManifest.xml | 6 ++ .../com/liftric/job/queue/NetworkListener.kt | 18 +++--- .../com/liftric/job/queue/NetworkManager.kt | 53 ++++++++++++++++ .../kotlin/com/liftric/job/queue/JobQueue.kt | 1 + .../com/liftric/job/queue/NetworkListener.kt | 13 +++- .../liftric/job/queue/rules/NetworkRule.kt | 11 +--- .../com/liftric/job/queue/NetworkListener.kt | 29 +++++---- .../com/liftric/job/queue/NetworkManager.kt | 60 +++++++++++++++++++ 8 files changed, 160 insertions(+), 31 deletions(-) create mode 100644 src/androidMain/AndroidManifest.xml create mode 100644 src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt create mode 100644 src/iosMain/kotlin/com/liftric/job/queue/NetworkManager.kt diff --git a/src/androidMain/AndroidManifest.xml b/src/androidMain/AndroidManifest.xml new file mode 100644 index 0000000..a37531b --- /dev/null +++ b/src/androidMain/AndroidManifest.xml @@ -0,0 +1,6 @@ + + + + + diff --git a/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt index ab9125c..8923039 100644 --- a/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt +++ b/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -1,11 +1,9 @@ package com.liftric.job.queue import android.content.Context -import com.liftric.job.queue.rules.NetworkState -import dev.tmapps.konnection.Konnection -import dev.tmapps.konnection.NetworkConnection import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.launch actual class NetworkListener( @@ -15,15 +13,19 @@ actual class NetworkListener( ) { actual fun observeNetworkState() { scope.launch { - Konnection(context = context) + NetworkManager(context) .observeNetworkConnection() - .collect { networkConnection -> - networkState = when (networkConnection) { - NetworkConnection.WIFI -> NetworkState.WIFI - NetworkConnection.MOBILE -> NetworkState.MOBILE + .collectLatest { currentNetworkState -> + networkState = when (currentNetworkState) { + NetworkState.NONE -> NetworkState.NONE + NetworkState.MOBILE -> NetworkState.MOBILE + NetworkState.WIFI -> NetworkState.WIFI null -> NetworkState.NONE } } } } + + // not needed for Android + actual fun stopMonitoring() {} } diff --git a/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt b/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt new file mode 100644 index 0000000..cc06c67 --- /dev/null +++ b/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt @@ -0,0 +1,53 @@ +package com.liftric.job.queue + +import android.annotation.TargetApi +import android.content.Context +import android.net.ConnectivityManager +import android.net.NetworkCapabilities +import android.os.Build +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow + +actual class NetworkManager(context: Context) { + private var connectivityManager: ConnectivityManager = + context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager + + private val connectionPublisher = MutableStateFlow(getCurrentNetworkConnection()) + + fun observeNetworkConnection(): Flow = connectionPublisher + + private fun getCurrentNetworkConnection(): NetworkState? = + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) { + postAndroidMNetworkConnection(connectivityManager) + } else { + preAndroidMNetworkConnection(connectivityManager) + } + + @TargetApi(Build.VERSION_CODES.M) + private fun postAndroidMNetworkConnection(connectivityManager: ConnectivityManager): NetworkState? { + val network = connectivityManager.activeNetwork + val capabilities = connectivityManager.getNetworkCapabilities(network) + return getNetworkConnection(capabilities) + } + + @Suppress("DEPRECATION") + private fun preAndroidMNetworkConnection(connectivityManager: ConnectivityManager): NetworkState? = + when (connectivityManager.activeNetworkInfo?.type) { + null -> null + ConnectivityManager.TYPE_WIFI -> NetworkState.WIFI + else -> NetworkState.MOBILE + } + + private fun getNetworkConnection(capabilities: NetworkCapabilities?): NetworkState? = + when { + capabilities == null -> null + Build.VERSION.SDK_INT < Build.VERSION_CODES.M + && !capabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) -> null + Build.VERSION.SDK_INT >= Build.VERSION_CODES.M && + !(capabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) + && capabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_VALIDATED)) -> null + capabilities.hasTransport(NetworkCapabilities.TRANSPORT_WIFI) -> NetworkState.WIFI + capabilities.hasTransport(NetworkCapabilities.TRANSPORT_CELLULAR) -> NetworkState.MOBILE + else -> null + } +} diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt index 61ceec6..a64ba2c 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -155,6 +155,7 @@ abstract class AbstractJobQueue( if(e is TimeoutCancellationException) { println("Timeout exceeded") } + networkListener.stopMonitoring() jobEventListener.emit(JobEvent.DidCancel(job)) } finally { if (job.info.shouldPersist) { diff --git a/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt index de33ab1..59dbeae 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -1,10 +1,19 @@ package com.liftric.job.queue -import com.liftric.job.queue.rules.NetworkState import kotlinx.coroutines.CoroutineScope +expect class NetworkManager + expect class NetworkListener { - actual var networkState: NetworkState + var networkState: NetworkState val scope: CoroutineScope fun observeNetworkState() + fun stopMonitoring() +} + +@kotlinx.serialization.Serializable +enum class NetworkState { + NONE, + MOBILE, + WIFI } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt index 0472f03..0166889 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt @@ -3,6 +3,7 @@ package com.liftric.job.queue.rules import com.liftric.job.queue.JobContext import com.liftric.job.queue.JobInfo import com.liftric.job.queue.JobRule +import com.liftric.job.queue.NetworkState data class NetworkRule(val minRequiredNetworkState: NetworkState) : JobRule() { override suspend fun mutating(info: JobInfo) { @@ -11,10 +12,10 @@ data class NetworkRule(val minRequiredNetworkState: NetworkState) : JobRule() { override suspend fun willRun(context: JobContext, currentNetworkState: NetworkState) { if (context.info.minRequiredNetworkState > currentNetworkState) { - println("NETWORK: unsatisfied") + println("NETWORK RULE: unsatisfied") throw NetworkException("Network requirement not satisfied!") } else { - println("NETWORK: satisfied") + println("NETWORK RULE: satisfied") } } } @@ -27,9 +28,3 @@ fun JobInfo.minRequiredNetwork(networkState: NetworkState): JobInfo { class NetworkException(message: String) : Exception(message) -@kotlinx.serialization.Serializable -enum class NetworkState { - NONE, - MOBILE, - WIFI -} diff --git a/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt index 184e625..9fa2547 100644 --- a/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt +++ b/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -1,27 +1,30 @@ package com.liftric.job.queue -import com.liftric.job.queue.rules.NetworkState -import dev.tmapps.konnection.Konnection -import dev.tmapps.konnection.NetworkConnection import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.launch actual class NetworkListener( actual var networkState: NetworkState = NetworkState.NONE, actual val scope: CoroutineScope = CoroutineScope(context = Dispatchers.Default) ) { + private val networkManager = NetworkManager() + actual fun observeNetworkState() { - Konnection() - .observeNetworkConnection() - .onEach { networkConnection -> - networkState = when (networkConnection) { - NetworkConnection.WIFI -> NetworkState.WIFI - NetworkConnection.MOBILE -> NetworkState.MOBILE - null -> NetworkState.NONE + networkManager.startMonitoring() + scope.launch { + networkManager.network.collectLatest { currentNetworkState -> + networkState = when (currentNetworkState) { + NetworkState.NONE -> NetworkState.NONE + NetworkState.MOBILE -> NetworkState.MOBILE + NetworkState.WIFI -> NetworkState.WIFI } } - .launchIn(scope) + } + } + + actual fun stopMonitoring() { + networkManager.stopMonitoring() } } diff --git a/src/iosMain/kotlin/com/liftric/job/queue/NetworkManager.kt b/src/iosMain/kotlin/com/liftric/job/queue/NetworkManager.kt new file mode 100644 index 0000000..de48e77 --- /dev/null +++ b/src/iosMain/kotlin/com/liftric/job/queue/NetworkManager.kt @@ -0,0 +1,60 @@ +package com.liftric.job.queue + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.receiveAsFlow +import platform.Network.* +import platform.darwin.DISPATCH_QUEUE_PRIORITY_DEFAULT +import platform.darwin.dispatch_queue_attr_make_with_qos_class +import platform.darwin.dispatch_queue_create +import platform.posix.QOS_CLASS_UTILITY + +class NetworkManager { + private val networkChannel = Channel(Channel.UNLIMITED) + val network: Flow = networkChannel.receiveAsFlow() + + private val networkMonitor = object : nw_path_monitor_update_handler_t { + override fun invoke(network: nw_path_t) { + checkReachability(network) + } + + } + private val nwPathMonitor: nw_path_monitor_t = nw_path_monitor_create().apply { + val queue = dispatch_queue_create( + "com.liftric.job.queue", + dispatch_queue_attr_make_with_qos_class( + null, + QOS_CLASS_UTILITY, + DISPATCH_QUEUE_PRIORITY_DEFAULT + ) + ) + nw_path_monitor_set_queue( + this, + queue + ) + nw_path_monitor_set_update_handler(this, networkMonitor) + } + + fun startMonitoring() { + nw_path_monitor_start(nwPathMonitor) + } + + fun stopMonitoring() { + nw_path_monitor_cancel(nwPathMonitor) + } + + private fun checkReachability(network: nw_path_t) { + when (nw_path_get_status(network)) { + nw_path_status_satisfied -> { + if (nw_path_uses_interface_type(network, nw_interface_type_wifi)) { + networkChannel.trySend(NetworkState.WIFI) + } else if (nw_path_uses_interface_type(network, nw_interface_type_cellular)) { + networkChannel.trySend(NetworkState.MOBILE) + } + } + nw_path_status_unsatisfied -> { + networkChannel.trySend(NetworkState.NONE) + } + } + } +} From a9c940fa1892bc0107167828aecadd4b45bec378 Mon Sep 17 00:00:00 2001 From: onalcan Date: Mon, 20 Mar 2023 15:46:00 +0100 Subject: [PATCH 11/26] code clean-up --- src/commonMain/kotlin/com/liftric/job/queue/Job.kt | 1 - src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt | 1 - src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt | 1 - 3 files changed, 3 deletions(-) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/Job.kt b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt index ca2bac8..3f4b976 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/Job.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt @@ -1,6 +1,5 @@ package com.liftric.job.queue -import com.liftric.job.queue.rules.NetworkState import kotlinx.coroutines.CancellationException import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.datetime.Clock diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt index 6cf4d6a..c14e415 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt @@ -1,6 +1,5 @@ package com.liftric.job.queue -import com.liftric.job.queue.rules.NetworkState import kotlin.time.Duration import kotlinx.serialization.Serializable diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt index da7c424..2a23fe8 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt @@ -1,6 +1,5 @@ package com.liftric.job.queue -import com.liftric.job.queue.rules.NetworkState import kotlinx.serialization.Serializable @Serializable From a9c7358fad379daba4aee68f1eb4af6315c62c8f Mon Sep 17 00:00:00 2001 From: onalcan Date: Mon, 20 Mar 2023 15:46:18 +0100 Subject: [PATCH 12/26] fix flaky tests --- .../com/liftric/job/queue/JobQueueTests.kt | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt index 725bc90..90f0d79 100644 --- a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt +++ b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -172,9 +172,13 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { @Test fun testNetworkRuleSatisfied() = runBlocking { val id = UUIDFactory.create().toString() - val job = async { + launch { queue.jobEventListener.collect { println("TEST -> JOB INFO: $it") + if (it is JobEvent.WillRun) { + return@collect + } + cancel() assertTrue(it is JobEvent.DidSucceed) } } @@ -187,16 +191,18 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { println("Network State: ${queue.networkListener.networkState}") queue.start() - delay(200) - job.cancel() } @Test fun testNetworkRuleUnsatisfied() = runBlocking { val id = UUIDFactory.create().toString() - val job = launch { + launch { queue.jobEventListener.collect { println("TEST -> JOB INFO: $it") + if (it is JobEvent.WillRun) { + return@collect + } + cancel() assertTrue(it is JobEvent.DidFail) } } @@ -209,7 +215,5 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { println("Network State: ${queue.networkListener.networkState}") queue.start() - delay(200) - job.cancel() } } From 51f2b878bc383da7e425922a68db934ca0b6c81f Mon Sep 17 00:00:00 2001 From: onalcan Date: Tue, 21 Mar 2023 09:44:05 +0100 Subject: [PATCH 13/26] add missing actual keyword for ios NetworkManager --- src/iosMain/kotlin/com/liftric/job/queue/NetworkManager.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/iosMain/kotlin/com/liftric/job/queue/NetworkManager.kt b/src/iosMain/kotlin/com/liftric/job/queue/NetworkManager.kt index de48e77..fa3aed1 100644 --- a/src/iosMain/kotlin/com/liftric/job/queue/NetworkManager.kt +++ b/src/iosMain/kotlin/com/liftric/job/queue/NetworkManager.kt @@ -9,7 +9,7 @@ import platform.darwin.dispatch_queue_attr_make_with_qos_class import platform.darwin.dispatch_queue_create import platform.posix.QOS_CLASS_UTILITY -class NetworkManager { +actual class NetworkManager { private val networkChannel = Channel(Channel.UNLIMITED) val network: Flow = networkChannel.receiveAsFlow() From e1d8a135eb981044a807d1154b03099b76ad64ae Mon Sep 17 00:00:00 2001 From: onalcan Date: Tue, 21 Mar 2023 10:14:53 +0100 Subject: [PATCH 14/26] @SuppressLint("MissingPermission") - AndroidManifest and permissions are already added --- src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt b/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt index cc06c67..4983e3d 100644 --- a/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt +++ b/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt @@ -1,5 +1,6 @@ package com.liftric.job.queue +import android.annotation.SuppressLint import android.annotation.TargetApi import android.content.Context import android.net.ConnectivityManager @@ -8,6 +9,7 @@ import android.os.Build import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow +@SuppressLint("MissingPermission") actual class NetworkManager(context: Context) { private var connectivityManager: ConnectivityManager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager From 208effcea699fddd32a6385cd47a7020fa878b75 Mon Sep 17 00:00:00 2001 From: onalcan Date: Tue, 21 Mar 2023 15:23:23 +0100 Subject: [PATCH 15/26] use abstract NetworkListener --- .../com/liftric/job/queue/NetworkListener.kt | 31 +++++++++++++------ .../com/liftric/job/queue/NetworkListener.kt | 15 ++++++--- .../com/liftric/job/queue/NetworkListener.kt | 28 +++++++++++------ 3 files changed, 51 insertions(+), 23 deletions(-) diff --git a/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt index 8923039..375d2e4 100644 --- a/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt +++ b/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -3,29 +3,40 @@ package com.liftric.job.queue import android.content.Context import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.launch actual class NetworkListener( val context: Context, - actual var networkState: NetworkState = NetworkState.NONE, - actual val scope: CoroutineScope = CoroutineScope(context = Dispatchers.Default) + scope: CoroutineScope = CoroutineScope(context = Dispatchers.Default) +) : AbstractNetworkListener( + scope = scope ) { - actual fun observeNetworkState() { + private val _currentNetworkState = MutableSharedFlow(replay = 1) + override val currentNetworkState: SharedFlow + get() = _currentNetworkState.asSharedFlow() + + override fun observeNetworkState() { scope.launch { NetworkManager(context) .observeNetworkConnection() .collectLatest { currentNetworkState -> - networkState = when (currentNetworkState) { - NetworkState.NONE -> NetworkState.NONE - NetworkState.MOBILE -> NetworkState.MOBILE - NetworkState.WIFI -> NetworkState.WIFI - null -> NetworkState.NONE - } + _currentNetworkState.emit( + when (currentNetworkState) { + NetworkState.NONE -> NetworkState.NONE + NetworkState.MOBILE -> NetworkState.MOBILE + NetworkState.WIFI -> NetworkState.WIFI + null -> NetworkState.NONE + } + ) } } } + // not needed for Android - actual fun stopMonitoring() {} + override fun stopMonitoring() {} } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt index 59dbeae..d548282 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -1,14 +1,21 @@ package com.liftric.job.queue import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.SharedFlow expect class NetworkManager -expect class NetworkListener { - var networkState: NetworkState +expect class NetworkListener : AbstractNetworkListener +abstract class AbstractNetworkListener( val scope: CoroutineScope - fun observeNetworkState() - fun stopMonitoring() +) { + abstract val currentNetworkState: SharedFlow + abstract fun observeNetworkState() + abstract fun stopMonitoring() + + fun isNetworkRuleSatisfied(jobInfo: JobInfo, currentNetworkState: NetworkState): Boolean { + return currentNetworkState >= jobInfo.minRequiredNetworkState + } } @kotlinx.serialization.Serializable diff --git a/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt index 9fa2547..7d309fd 100644 --- a/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt +++ b/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -2,29 +2,39 @@ package com.liftric.job.queue import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.launch actual class NetworkListener( - actual var networkState: NetworkState = NetworkState.NONE, - actual val scope: CoroutineScope = CoroutineScope(context = Dispatchers.Default) + scope: CoroutineScope = CoroutineScope(context = Dispatchers.Default) +) : AbstractNetworkListener( + scope = scope ) { private val networkManager = NetworkManager() - actual fun observeNetworkState() { + private val _currentNetworkState = MutableSharedFlow(replay = 1) + override val currentNetworkState: SharedFlow + get() = _currentNetworkState.asSharedFlow() + + override fun observeNetworkState() { networkManager.startMonitoring() scope.launch { networkManager.network.collectLatest { currentNetworkState -> - networkState = when (currentNetworkState) { - NetworkState.NONE -> NetworkState.NONE - NetworkState.MOBILE -> NetworkState.MOBILE - NetworkState.WIFI -> NetworkState.WIFI - } + _currentNetworkState.emit( + when (currentNetworkState) { + NetworkState.NONE -> NetworkState.NONE + NetworkState.MOBILE -> NetworkState.MOBILE + NetworkState.WIFI -> NetworkState.WIFI + } + ) } } } - actual fun stopMonitoring() { + override fun stopMonitoring() { networkManager.stopMonitoring() } } From 0bb1b79a3eb77c525f42835bfbe2397a83f03323 Mon Sep 17 00:00:00 2001 From: onalcan Date: Tue, 21 Mar 2023 15:23:50 +0100 Subject: [PATCH 16/26] observe network changes in the queue --- .../kotlin/com/liftric/job/queue/Job.kt | 7 ++----- .../kotlin/com/liftric/job/queue/JobQueue.kt | 19 +++++++++++++------ .../kotlin/com/liftric/job/queue/JobRule.kt | 2 +- .../com/liftric/job/queue/rules/DelayRule.kt | 2 +- .../liftric/job/queue/rules/NetworkRule.kt | 10 ---------- 5 files changed, 17 insertions(+), 23 deletions(-) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/Job.kt b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt index 3f4b976..ffb176a 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/Job.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt @@ -31,13 +31,10 @@ data class Job( private var canRepeat: Boolean = true - suspend fun run(currentNetworkState: NetworkState): JobEvent { + suspend fun run(): JobEvent { val event = try { info.rules.forEach { - it.willRun( - context = this@Job, - currentNetworkState = currentNetworkState - ) + it.willRun(context = this@Job) } task.body() JobEvent.DidSucceed(this@Job) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt index a64ba2c..18b1816 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -66,7 +66,6 @@ abstract class AbstractJobQueue( init { if (configuration.startsAutomatically) { start() - networkListener.observeNetworkState() } } @@ -146,13 +145,21 @@ abstract class AbstractJobQueue( job.delegate = delegate running.value[job.id] = configuration.scope.launch { try { - jobEventListener.emit(JobEvent.WillRun(job)) - withTimeout(job.info.timeout) { - val result = job.run(currentNetworkState = networkListener.networkState) - jobEventListener.emit(result) + networkListener.currentNetworkState.collect { currentNetworkState -> + val isNetworkRuleSatisfied = networkListener.isNetworkRuleSatisfied( + jobInfo = job.info, + currentNetworkState = currentNetworkState + ) + if (isNetworkRuleSatisfied) { + jobEventListener.emit(JobEvent.WillRun(job)) + withTimeout(job.info.timeout) { + val result = job.run() + jobEventListener.emit(result) + } + } } } catch (e: CancellationException) { - if(e is TimeoutCancellationException) { + if (e is TimeoutCancellationException) { println("Timeout exceeded") } networkListener.stopMonitoring() diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt index 2a23fe8..5d512ab 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt @@ -7,6 +7,6 @@ abstract class JobRule { open suspend fun mutating(info: JobInfo) {} @Throws(Throwable::class) open suspend fun willSchedule(queue: Queue, context: JobContext) {} - open suspend fun willRun(context: JobContext, currentNetworkState: NetworkState) {} + open suspend fun willRun(context: JobContext) {} open suspend fun willRemove(context: JobContext, result: JobEvent) {} } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt index a28b486..a200305 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt @@ -8,7 +8,7 @@ import kotlinx.serialization.Serializable @Serializable data class DelayRule(val duration: Duration = 0.seconds): JobRule() { - override suspend fun willRun(context: JobContext, currentNetworkState: NetworkState) { + override suspend fun willRun(context: JobContext) { delay(duration) } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt index 0166889..08f982c 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt @@ -1,6 +1,5 @@ package com.liftric.job.queue.rules -import com.liftric.job.queue.JobContext import com.liftric.job.queue.JobInfo import com.liftric.job.queue.JobRule import com.liftric.job.queue.NetworkState @@ -9,15 +8,6 @@ data class NetworkRule(val minRequiredNetworkState: NetworkState) : JobRule() { override suspend fun mutating(info: JobInfo) { info.minRequiredNetworkState = minRequiredNetworkState } - - override suspend fun willRun(context: JobContext, currentNetworkState: NetworkState) { - if (context.info.minRequiredNetworkState > currentNetworkState) { - println("NETWORK RULE: unsatisfied") - throw NetworkException("Network requirement not satisfied!") - } else { - println("NETWORK RULE: satisfied") - } - } } fun JobInfo.minRequiredNetwork(networkState: NetworkState): JobInfo { From bac0add9dc6787da81e2a3ce63cfa1bb3b163b0a Mon Sep 17 00:00:00 2001 From: onalcan Date: Wed, 22 Mar 2023 11:01:17 +0100 Subject: [PATCH 17/26] update NetworkListener: - use StateFlow - take NetworkManager as parameter --- .../com/liftric/job/queue/NetworkListener.kt | 21 +++++++++---------- .../com/liftric/job/queue/JobQueueTests.kt | 4 +++- .../com/liftric/job/queue/NetworkListener.kt | 5 +++-- .../com/liftric/job/queue/NetworkListener.kt | 16 +++++++------- .../com/liftric/job/queue/JobQueueTests.kt | 2 +- 5 files changed, 25 insertions(+), 23 deletions(-) diff --git a/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt index 375d2e4..2b467d5 100644 --- a/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt +++ b/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -1,29 +1,28 @@ package com.liftric.job.queue -import android.content.Context import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.asSharedFlow -import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.launch actual class NetworkListener( - val context: Context, + networkManager: NetworkManager, scope: CoroutineScope = CoroutineScope(context = Dispatchers.Default) ) : AbstractNetworkListener( + networkManager = networkManager, scope = scope ) { - private val _currentNetworkState = MutableSharedFlow(replay = 1) - override val currentNetworkState: SharedFlow - get() = _currentNetworkState.asSharedFlow() + private val _currentNetworkState = MutableStateFlow(NetworkState.NONE) + override val currentNetworkState: StateFlow + get() = _currentNetworkState.asStateFlow() override fun observeNetworkState() { scope.launch { - NetworkManager(context) + networkManager .observeNetworkConnection() - .collectLatest { currentNetworkState -> + .collect { currentNetworkState -> _currentNetworkState.emit( when (currentNetworkState) { NetworkState.NONE -> NetworkState.NONE diff --git a/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt index ebc2245..8372ae0 100644 --- a/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt +++ b/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -16,7 +16,9 @@ actual class JobQueueTests : AbstractJobQueueTests( } }, store = MapStorage(), - networkListener = NetworkListener(context = InstrumentationRegistry.getInstrumentation().context) + networkListener = NetworkListener( + networkManager = NetworkManager(InstrumentationRegistry.getInstrumentation().context) + ) ) ) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt index d548282..db6081d 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -1,15 +1,16 @@ package com.liftric.job.queue import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.StateFlow expect class NetworkManager expect class NetworkListener : AbstractNetworkListener abstract class AbstractNetworkListener( + val networkManager: NetworkManager, val scope: CoroutineScope ) { - abstract val currentNetworkState: SharedFlow + abstract val currentNetworkState: StateFlow abstract fun observeNetworkState() abstract fun stopMonitoring() diff --git a/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt index 7d309fd..63a93cb 100644 --- a/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt +++ b/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -2,22 +2,22 @@ package com.liftric.job.queue import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.launch actual class NetworkListener( + networkManager: NetworkManager, scope: CoroutineScope = CoroutineScope(context = Dispatchers.Default) ) : AbstractNetworkListener( + networkManager = networkManager, scope = scope ) { - private val networkManager = NetworkManager() - - private val _currentNetworkState = MutableSharedFlow(replay = 1) - override val currentNetworkState: SharedFlow - get() = _currentNetworkState.asSharedFlow() + private val _currentNetworkState = MutableStateFlow(NetworkState.NONE) + override val currentNetworkState: StateFlow + get() = _currentNetworkState.asStateFlow() override fun observeNetworkState() { networkManager.startMonitoring() diff --git a/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt index 395b67a..90fc58b 100644 --- a/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt +++ b/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -11,6 +11,6 @@ actual class JobQueueTests : AbstractJobQueueTests( } }, store = MapStorage(), - networkListener = NetworkListener() + networkListener = NetworkListener(networkManager = NetworkManager()) ) ) From 95f862d22fea2a90ac6c63fb7b58e9ee03d3eb51 Mon Sep 17 00:00:00 2001 From: onalcan Date: Wed, 22 Mar 2023 11:01:32 +0100 Subject: [PATCH 18/26] add new JobEvents --- src/commonMain/kotlin/com/liftric/job/queue/JobEvent.kt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobEvent.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobEvent.kt index e1dc6ea..55481f5 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobEvent.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobEvent.kt @@ -10,4 +10,7 @@ sealed class JobEvent { data class ShouldRepeat(val job: Job): JobEvent() data class DidCancel(val job: JobContext): JobEvent() data class DidFailOnRemove(val job: JobContext, val error: Throwable): JobEvent() + data class NetworkRuleSatisfied(val job: JobContext): JobEvent() + data class NetworkRuleTimeout(val job: JobContext): JobEvent() + data class JobTimeout(val job: JobContext): JobEvent() } From 185bed6c065a8faef4442bd7e50f71fc3f9bac0c Mon Sep 17 00:00:00 2001 From: onalcan Date: Wed, 22 Mar 2023 11:01:54 +0100 Subject: [PATCH 19/26] refactor NetworkException to NetworkRuleTimeoutException --- .../kotlin/com/liftric/job/queue/rules/NetworkRule.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt index 08f982c..6e5bd23 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt @@ -3,6 +3,7 @@ package com.liftric.job.queue.rules import com.liftric.job.queue.JobInfo import com.liftric.job.queue.JobRule import com.liftric.job.queue.NetworkState +import kotlinx.coroutines.CancellationException data class NetworkRule(val minRequiredNetworkState: NetworkState) : JobRule() { override suspend fun mutating(info: JobInfo) { @@ -16,5 +17,5 @@ fun JobInfo.minRequiredNetwork(networkState: NetworkState): JobInfo { return this } -class NetworkException(message: String) : Exception(message) +class NetworkRuleTimeoutException(message: String) : CancellationException(message) From 6ea1857e54eb325e3c0b3e3fdd45460ec73df6fa Mon Sep 17 00:00:00 2001 From: onalcan Date: Wed, 22 Mar 2023 11:04:12 +0100 Subject: [PATCH 20/26] wait for network rule to be satisfied 15 seconds, otherwise cancel it --- .../kotlin/com/liftric/job/queue/JobQueue.kt | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt index 18b1816..5fae7a6 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -145,24 +145,40 @@ abstract class AbstractJobQueue( job.delegate = delegate running.value[job.id] = configuration.scope.launch { try { - networkListener.currentNetworkState.collect { currentNetworkState -> - val isNetworkRuleSatisfied = networkListener.isNetworkRuleSatisfied( - jobInfo = job.info, - currentNetworkState = currentNetworkState - ) - if (isNetworkRuleSatisfied) { + var shouldRunJob = false + try { + withTimeout(15.seconds) NetworkRuleTimeout@{ + networkListener.currentNetworkState.collect { currentNetworkState -> + val isNetworkRuleSatisfied = + networkListener.isNetworkRuleSatisfied( + jobInfo = job.info, + currentNetworkState = currentNetworkState + ) + if (isNetworkRuleSatisfied) { + shouldRunJob = true + jobEventListener.emit(JobEvent.NetworkRuleSatisfied(job)) + this@NetworkRuleTimeout.cancel("Network rule satisfied") + } + } + } + } catch (e: CancellationException) { + if (shouldRunJob) { jobEventListener.emit(JobEvent.WillRun(job)) withTimeout(job.info.timeout) { val result = job.run() jobEventListener.emit(result) } - } + } else throw NetworkRuleTimeoutException("Timeout exceeded for the network.") } } catch (e: CancellationException) { - if (e is TimeoutCancellationException) { - println("Timeout exceeded") + when (e) { + is TimeoutCancellationException -> { + jobEventListener.emit(JobEvent.JobTimeout(job)) + } + is NetworkRuleTimeoutException -> { + jobEventListener.emit(JobEvent.NetworkRuleTimeout(job)) + } } - networkListener.stopMonitoring() jobEventListener.emit(JobEvent.DidCancel(job)) } finally { if (job.info.shouldPersist) { @@ -171,6 +187,7 @@ abstract class AbstractJobQueue( running.value[job.id]?.cancel() running.value.remove(job.id) lock.release() + networkListener.stopMonitoring() } } } From ea7642078e3c3d3ad76fba2f53deaaf11d5575c3 Mon Sep 17 00:00:00 2001 From: onalcan Date: Wed, 22 Mar 2023 11:04:26 +0100 Subject: [PATCH 21/26] update test --- .../com/liftric/job/queue/JobQueueTests.kt | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt index 90f0d79..759fe00 100644 --- a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt +++ b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -175,11 +175,10 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { launch { queue.jobEventListener.collect { println("TEST -> JOB INFO: $it") - if (it is JobEvent.WillRun) { - return@collect - } - cancel() - assertTrue(it is JobEvent.DidSucceed) + if (it is JobEvent.DidCancel || it is JobEvent.DidSucceed) { + cancel() + assertTrue(it is JobEvent.DidSucceed) + } else return@collect } } @@ -187,8 +186,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { minRequiredNetwork(NetworkState.MOBILE) } - queue.networkListener.networkState = NetworkState.WIFI - println("Network State: ${queue.networkListener.networkState}") + println("Network State: ${queue.networkListener.currentNetworkState.value}") queue.start() } @@ -199,11 +197,10 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { launch { queue.jobEventListener.collect { println("TEST -> JOB INFO: $it") - if (it is JobEvent.WillRun) { - return@collect - } - cancel() - assertTrue(it is JobEvent.DidFail) + if (it is JobEvent.DidCancel || it is JobEvent.DidSucceed) { + cancel() + assertTrue(it is JobEvent.DidCancel) + } else return@collect } } @@ -211,8 +208,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { minRequiredNetwork(NetworkState.WIFI) } - queue.networkListener.networkState = NetworkState.MOBILE - println("Network State: ${queue.networkListener.networkState}") + println("Network State: ${queue.networkListener.currentNetworkState.value}") queue.start() } From d92cc9d4d7804570fc20cc8cb26e0d3c8d3c94cd Mon Sep 17 00:00:00 2001 From: onalcan Date: Wed, 22 Mar 2023 11:24:45 +0100 Subject: [PATCH 22/26] chore: rename timeout to jobTimeout and allow user to define a networkRuleTimeout --- .../kotlin/com/liftric/job/queue/JobInfo.kt | 3 ++- .../kotlin/com/liftric/job/queue/JobQueue.kt | 4 ++-- .../com/liftric/job/queue/rules/NetworkRule.kt | 18 +++++++++++++++--- .../com/liftric/job/queue/rules/TimeoutRule.kt | 2 +- .../com/liftric/job/queue/JobQueueTests.kt | 4 ++-- 5 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt index c14e415..f081c51 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt @@ -6,7 +6,8 @@ import kotlinx.serialization.Serializable @Serializable data class JobInfo( var tag: String? = null, - var timeout: Duration = Duration.INFINITE, + var jobTimeout: Duration = Duration.INFINITE, + var networkRuleTimeout: Duration = Duration.INFINITE, var rules: MutableList = mutableListOf(), var shouldPersist: Boolean = false, var minRequiredNetworkState: NetworkState = NetworkState.NONE diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt index 5fae7a6..93f739b 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -147,7 +147,7 @@ abstract class AbstractJobQueue( try { var shouldRunJob = false try { - withTimeout(15.seconds) NetworkRuleTimeout@{ + withTimeout(job.info.networkRuleTimeout) NetworkRuleTimeout@{ networkListener.currentNetworkState.collect { currentNetworkState -> val isNetworkRuleSatisfied = networkListener.isNetworkRuleSatisfied( @@ -164,7 +164,7 @@ abstract class AbstractJobQueue( } catch (e: CancellationException) { if (shouldRunJob) { jobEventListener.emit(JobEvent.WillRun(job)) - withTimeout(job.info.timeout) { + withTimeout(job.info.jobTimeout) { val result = job.run() jobEventListener.emit(result) } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt index 6e5bd23..fddf2af 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt @@ -4,15 +4,27 @@ import com.liftric.job.queue.JobInfo import com.liftric.job.queue.JobRule import com.liftric.job.queue.NetworkState import kotlinx.coroutines.CancellationException +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds -data class NetworkRule(val minRequiredNetworkState: NetworkState) : JobRule() { +data class NetworkRule( + val minRequiredNetworkState: NetworkState, + val networkRuleTimeout: Duration +) : JobRule() { override suspend fun mutating(info: JobInfo) { info.minRequiredNetworkState = minRequiredNetworkState + info.networkRuleTimeout = networkRuleTimeout } } -fun JobInfo.minRequiredNetwork(networkState: NetworkState): JobInfo { - val rule = NetworkRule(networkState) +fun JobInfo.minRequiredNetwork( + networkState: NetworkState, + networkRuleTimeout: Duration = 30.seconds +): JobInfo { + val rule = NetworkRule( + minRequiredNetworkState = networkState, + networkRuleTimeout = networkRuleTimeout + ) rules.add(rule) return this } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt index 96a4bf0..dda6291 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt @@ -8,7 +8,7 @@ import kotlinx.serialization.Serializable @Serializable data class TimeoutRule(val timeout: Duration): JobRule() { override suspend fun mutating(info: JobInfo) { - info.timeout = timeout + info.jobTimeout = timeout } } diff --git a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt index 759fe00..e349464 100644 --- a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt +++ b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -183,7 +183,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { } queue.schedule(TestData(id), ::TestTask) { - minRequiredNetwork(NetworkState.MOBILE) + minRequiredNetwork(NetworkState.NONE, 3.seconds) } println("Network State: ${queue.networkListener.currentNetworkState.value}") @@ -205,7 +205,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { } queue.schedule(TestData(id), ::TestTask) { - minRequiredNetwork(NetworkState.WIFI) + minRequiredNetwork(NetworkState.WIFI, 3.seconds) } println("Network State: ${queue.networkListener.currentNetworkState.value}") From 4b5b720d4ab01bf63b5f79a1393d3416a7306d27 Mon Sep 17 00:00:00 2001 From: onalcan Date: Wed, 22 Mar 2023 11:35:25 +0100 Subject: [PATCH 23/26] chore: fix tests --- .../com/liftric/job/queue/JobQueueTests.kt | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt index e349464..ae7a2bb 100644 --- a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt +++ b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -174,7 +174,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { val id = UUIDFactory.create().toString() launch { queue.jobEventListener.collect { - println("TEST -> JOB INFO: $it") + println(it) if (it is JobEvent.DidCancel || it is JobEvent.DidSucceed) { cancel() assertTrue(it is JobEvent.DidSucceed) @@ -196,11 +196,9 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { val id = UUIDFactory.create().toString() launch { queue.jobEventListener.collect { - println("TEST -> JOB INFO: $it") - if (it is JobEvent.DidCancel || it is JobEvent.DidSucceed) { - cancel() - assertTrue(it is JobEvent.DidCancel) - } else return@collect + println(it) + assertTrue(it is JobEvent.NetworkRuleTimeout) + cancel() } } @@ -212,4 +210,24 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { queue.start() } + + @Test + fun testJobTimeout() = runBlocking { + launch { + queue.jobEventListener.collect { + println(it) + if (it is JobEvent.NetworkRuleSatisfied || it is JobEvent.WillRun) { + return@collect + } + assertTrue(it is JobEvent.JobTimeout) + cancel() + } + } + + queue.schedule(::LongRunningTask) { + timeout(5.seconds) + } + + queue.start() + } } From 164ce28652b8645fa8a8f2d8649371c25a0f2ed1 Mon Sep 17 00:00:00 2001 From: onalcan Date: Thu, 23 Mar 2023 09:49:34 +0100 Subject: [PATCH 24/26] refactor: use named arguments and rename some variables --- .../kotlin/com/liftric/job/queue/Job.kt | 33 ++++++++++++------- .../kotlin/com/liftric/job/queue/JobQueue.kt | 24 +++++++------- .../kotlin/com/liftric/job/queue/JobRule.kt | 8 ++--- .../com/liftric/job/queue/rules/DelayRule.kt | 2 +- .../liftric/job/queue/rules/NetworkRule.kt | 6 ++-- .../liftric/job/queue/rules/PeriodicRule.kt | 4 +-- .../job/queue/rules/PersistenceRule.kt | 4 +-- .../com/liftric/job/queue/rules/RetryRule.kt | 8 ++--- .../liftric/job/queue/rules/TimeoutRule.kt | 4 +-- .../com/liftric/job/queue/rules/UniqueRule.kt | 8 ++--- 10 files changed, 57 insertions(+), 44 deletions(-) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/Job.kt b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt index ffb176a..a3524bf 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/Job.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt @@ -23,10 +23,10 @@ data class Job( internal var delegate: JobDelegate? = null constructor(task: Task, info: JobInfo) : this( - UUIDFactory.create(), - info, - task, - Clock.System.now() + id = UUIDFactory.create(), + info = info, + task = task, + startTime = Clock.System.now() ) private var canRepeat: Boolean = true @@ -34,34 +34,45 @@ data class Job( suspend fun run(): JobEvent { val event = try { info.rules.forEach { - it.willRun(context = this@Job) + it.willRun(jobContext = this@Job) } task.body() - JobEvent.DidSucceed(this@Job) + JobEvent.DidSucceed(job = this@Job) } catch (e: CancellationException) { throw e } catch (e: Throwable) { canRepeat = task.onRepeat(e) - JobEvent.DidFail(this@Job, e) + JobEvent.DidFail(job = this@Job, error = e) } try { - info.rules.forEach { it.willRemove(this@Job, event) } + info.rules.forEach { rule -> + rule.willRemove(jobContext = this@Job, result = event) + } } catch (e: CancellationException) { throw e } catch (e: Throwable) { - JobEvent.DidFailOnRemove(this@Job, e) + JobEvent.DidFailOnRemove(job = this@Job, error = e) } return event } override suspend fun cancel() { - delegate?.onEvent?.emit(JobEvent.DidCancel(this@Job)) + delegate?.onEvent?.emit(JobEvent.DidCancel(job = this@Job)) } override suspend fun repeat(id: UUID, info: JobInfo, task: Task, startTime: Instant) { if (canRepeat) { - delegate?.onEvent?.emit(JobEvent.ShouldRepeat(Job(id, info, task, startTime))) + delegate?.onEvent?.emit( + JobEvent.ShouldRepeat( + Job( + id = id, + info = info, + task = task, + startTime = startTime + ) + ) + ) } } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt index 93f739b..a1f0128 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -54,7 +54,7 @@ abstract class AbstractJobQueue( /** * Semaphore to limit concurrency */ - private val lock = Semaphore(configuration.maxConcurrency, 0) + private val lock = Semaphore(permits = configuration.maxConcurrency, acquiredPermits = 0) /** * Mutex to suspend queue operations during cancellation @@ -70,7 +70,7 @@ abstract class AbstractJobQueue( } suspend fun schedule(task: () -> Task, configure: JobInfo.() -> JobInfo = { JobInfo() }) { - schedule(task(), configure) + schedule(task = task(), configure = configure) } suspend fun schedule( @@ -78,20 +78,17 @@ abstract class AbstractJobQueue( task: (Data) -> DataTask, configure: JobInfo.() -> JobInfo = { JobInfo() } ) { - schedule(task(data), configure) + schedule(task = task(data), configure = configure) } suspend fun schedule(task: Task, configure: JobInfo.() -> JobInfo = { JobInfo() }) { val info = configure(JobInfo()).apply { rules.forEach { rule -> - rule.mutating(this) + rule.mutating(jobInfo = this) } } - val job = Job( - task = task, - info = info - ) + val job = Job(task = task, info = info) schedule(job).apply { jobEventListener.emit(JobEvent.DidSchedule(job)) @@ -100,14 +97,19 @@ abstract class AbstractJobQueue( private suspend fun schedule(job: Job) = try { job.info.rules.forEach { - it.willSchedule(this, job) + it.willSchedule(queue = this, jobContext = job) } if (job.info.shouldPersist) { - store.set(job.id.toString(), format.encodeToString(job)) + store.set(id = job.id.toString(), json = format.encodeToString(job)) } - queue.value = queue.value.plus(listOf(job)).sortedBy { it.startTime }.toMutableList() + queue.value = queue.value + .plus(listOf(job)) + .sortedBy { queueJob -> + queueJob.startTime + } + .toMutableList() } catch (e: Throwable) { jobEventListener.emit(JobEvent.DidThrowOnSchedule(e)) } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt index 5d512ab..d665327 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt @@ -4,9 +4,9 @@ import kotlinx.serialization.Serializable @Serializable abstract class JobRule { - open suspend fun mutating(info: JobInfo) {} + open suspend fun mutating(jobInfo: JobInfo) {} @Throws(Throwable::class) - open suspend fun willSchedule(queue: Queue, context: JobContext) {} - open suspend fun willRun(context: JobContext) {} - open suspend fun willRemove(context: JobContext, result: JobEvent) {} + open suspend fun willSchedule(queue: Queue, jobContext: JobContext) {} + open suspend fun willRun(jobContext: JobContext) {} + open suspend fun willRemove(jobContext: JobContext, result: JobEvent) {} } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt index a200305..5c886bb 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt @@ -8,7 +8,7 @@ import kotlinx.serialization.Serializable @Serializable data class DelayRule(val duration: Duration = 0.seconds): JobRule() { - override suspend fun willRun(context: JobContext) { + override suspend fun willRun(jobContext: JobContext) { delay(duration) } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt index fddf2af..19a12e3 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt @@ -11,9 +11,9 @@ data class NetworkRule( val minRequiredNetworkState: NetworkState, val networkRuleTimeout: Duration ) : JobRule() { - override suspend fun mutating(info: JobInfo) { - info.minRequiredNetworkState = minRequiredNetworkState - info.networkRuleTimeout = networkRuleTimeout + override suspend fun mutating(jobInfo: JobInfo) { + jobInfo.minRequiredNetworkState = minRequiredNetworkState + jobInfo.networkRuleTimeout = networkRuleTimeout } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/PeriodicRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/PeriodicRule.kt index 8dc68d1..0e159c3 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/PeriodicRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/PeriodicRule.kt @@ -8,9 +8,9 @@ import kotlin.time.Duration.Companion.seconds @Serializable data class PeriodicRule(val interval: Duration = 0.seconds): JobRule() { - override suspend fun willRemove(context: JobContext, result: JobEvent) { + override suspend fun willRemove(jobContext: JobContext, result: JobEvent) { if (result is JobEvent.DidSucceed) { - context.repeat(startTime = Clock.System.now().plus(interval)) + jobContext.repeat(startTime = Clock.System.now().plus(interval)) } } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/PersistenceRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/PersistenceRule.kt index 186a777..cc8045f 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/PersistenceRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/PersistenceRule.kt @@ -6,8 +6,8 @@ import kotlinx.serialization.Serializable @Serializable data class PersistenceRule(val shouldPersist: Boolean): JobRule() { - override suspend fun mutating(info: JobInfo) { - info.shouldPersist = shouldPersist + override suspend fun mutating(jobInfo: JobInfo) { + jobInfo.shouldPersist = shouldPersist } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/RetryRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/RetryRule.kt index 8396c44..a3e7c6b 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/RetryRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/RetryRule.kt @@ -8,16 +8,16 @@ import kotlin.time.Duration.Companion.seconds @Serializable data class RetryRule(val limit: RetryLimit, val delay: Duration = 0.seconds): JobRule() { - override suspend fun willRemove(context: JobContext, result: JobEvent) { + override suspend fun willRemove(jobContext: JobContext, result: JobEvent) { if (result is JobEvent.DidFail) { when (limit) { is RetryLimit.Unlimited -> { - context.repeat(startTime = Clock.System.now()) + jobContext.repeat(startTime = Clock.System.now()) } is RetryLimit.Limited -> { if (limit.count > 0) { - val rules = context.info.rules.minus(this).plus(RetryRule(RetryLimit.Limited(limit.count - 1), delay)) - context.repeat(info = context.info.copy(rules = rules.toMutableList()), startTime = Clock.System.now().plus(delay)) + val rules = jobContext.info.rules.minus(this).plus(RetryRule(RetryLimit.Limited(limit.count - 1), delay)) + jobContext.repeat(info = jobContext.info.copy(rules = rules.toMutableList()), startTime = Clock.System.now().plus(delay)) } } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt index dda6291..2a5a03f 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt @@ -7,8 +7,8 @@ import kotlinx.serialization.Serializable @Serializable data class TimeoutRule(val timeout: Duration): JobRule() { - override suspend fun mutating(info: JobInfo) { - info.jobTimeout = timeout + override suspend fun mutating(jobInfo: JobInfo) { + jobInfo.jobTimeout = timeout } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/UniqueRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/UniqueRule.kt index 31966ec..89ab330 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/UniqueRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/UniqueRule.kt @@ -5,16 +5,16 @@ import kotlinx.serialization.Serializable @Serializable data class UniqueRule(private val tag: String? = null): JobRule() { - override suspend fun mutating(info: JobInfo) { - info.tag = tag + override suspend fun mutating(jobInfo: JobInfo) { + jobInfo.tag = tag } - override suspend fun willSchedule(queue: Queue, context: JobContext) { + override suspend fun willSchedule(queue: Queue, jobContext: JobContext) { for (item in queue.jobs) { if (item.info.tag == tag) { throw Throwable("Job with tag=${item.info.tag} already exists") } - if (item.id == context.id) { + if (item.id == jobContext.id) { throw Throwable("Job with id=${item.id} already exists") } } From 5377d9de89c05d268104682a8138aa660d9d6aee Mon Sep 17 00:00:00 2001 From: onalcan Date: Thu, 23 Mar 2023 11:09:44 +0100 Subject: [PATCH 25/26] call observeNetworkState() before running the job --- src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt index a1f0128..56436a5 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -147,6 +147,7 @@ abstract class AbstractJobQueue( job.delegate = delegate running.value[job.id] = configuration.scope.launch { try { + networkListener.observeNetworkState() var shouldRunJob = false try { withTimeout(job.info.networkRuleTimeout) NetworkRuleTimeout@{ From 56801aa9190241c0c451fcefbbd207ce96bbdc43 Mon Sep 17 00:00:00 2001 From: onalcan Date: Thu, 23 Mar 2023 11:10:33 +0100 Subject: [PATCH 26/26] refactor NetworkListeners and android network manager --- .../com/liftric/job/queue/NetworkListener.kt | 24 ++++++++----------- .../com/liftric/job/queue/NetworkManager.kt | 3 +-- .../com/liftric/job/queue/NetworkListener.kt | 15 +++++------- 3 files changed, 17 insertions(+), 25 deletions(-) diff --git a/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt index 2b467d5..b3d10da 100644 --- a/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt +++ b/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.launch actual class NetworkListener( @@ -18,24 +19,19 @@ actual class NetworkListener( override val currentNetworkState: StateFlow get() = _currentNetworkState.asStateFlow() + private var job: kotlinx.coroutines.Job? = null + override fun observeNetworkState() { - scope.launch { + job = scope.launch { networkManager - .observeNetworkConnection() - .collect { currentNetworkState -> - _currentNetworkState.emit( - when (currentNetworkState) { - NetworkState.NONE -> NetworkState.NONE - NetworkState.MOBILE -> NetworkState.MOBILE - NetworkState.WIFI -> NetworkState.WIFI - null -> NetworkState.NONE - } - ) + .network + .collectLatest { currentNetworkState -> + _currentNetworkState.emit(currentNetworkState ?: NetworkState.NONE) } } } - - // not needed for Android - override fun stopMonitoring() {} + override fun stopMonitoring() { + job?.cancel() + } } diff --git a/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt b/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt index 4983e3d..743e655 100644 --- a/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt +++ b/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt @@ -15,8 +15,7 @@ actual class NetworkManager(context: Context) { context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager private val connectionPublisher = MutableStateFlow(getCurrentNetworkConnection()) - - fun observeNetworkConnection(): Flow = connectionPublisher + val network: Flow = connectionPublisher private fun getCurrentNetworkConnection(): NetworkState? = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) { diff --git a/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt index 63a93cb..6a5daa0 100644 --- a/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt +++ b/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -19,22 +19,19 @@ actual class NetworkListener( override val currentNetworkState: StateFlow get() = _currentNetworkState.asStateFlow() + private var job: kotlinx.coroutines.Job? = null + override fun observeNetworkState() { - networkManager.startMonitoring() - scope.launch { + job = scope.launch { + networkManager.startMonitoring() networkManager.network.collectLatest { currentNetworkState -> - _currentNetworkState.emit( - when (currentNetworkState) { - NetworkState.NONE -> NetworkState.NONE - NetworkState.MOBILE -> NetworkState.MOBILE - NetworkState.WIFI -> NetworkState.WIFI - } - ) + _currentNetworkState.emit(currentNetworkState) } } } override fun stopMonitoring() { networkManager.stopMonitoring() + job?.cancel() } }