diff --git a/src/androidMain/AndroidManifest.xml b/src/androidMain/AndroidManifest.xml new file mode 100644 index 0000000..a37531b --- /dev/null +++ b/src/androidMain/AndroidManifest.xml @@ -0,0 +1,6 @@ + + + + + diff --git a/src/androidMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/androidMain/kotlin/com/liftric/job/queue/JobQueue.kt index 4a72fa0..0d3dd3e 100644 --- a/src/androidMain/kotlin/com/liftric/job/queue/JobQueue.kt +++ b/src/androidMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -8,9 +8,13 @@ actual class JobQueue( context: Context, serializers: SerializersModule = SerializersModule {}, configuration: Queue.Configuration = Queue.DefaultConfiguration, - store: JsonStorage = SettingsStorage(SharedPreferencesSettings.Factory(context).create("com.liftric.persisted.queue")) + networkListener: NetworkListener, + store: JsonStorage = SettingsStorage( + SharedPreferencesSettings.Factory(context).create("com.liftric.persisted.queue") + ) ) : AbstractJobQueue( - serializers, - configuration, - store + serializers = serializers, + networkListener = networkListener, + configuration = configuration, + store = store ) diff --git a/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt new file mode 100644 index 0000000..b3d10da --- /dev/null +++ b/src/androidMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -0,0 +1,37 @@ +package com.liftric.job.queue + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.launch + +actual class NetworkListener( + networkManager: NetworkManager, + scope: CoroutineScope = CoroutineScope(context = Dispatchers.Default) +) : AbstractNetworkListener( + networkManager = networkManager, + scope = scope +) { + private val _currentNetworkState = MutableStateFlow(NetworkState.NONE) + override val currentNetworkState: StateFlow + get() = _currentNetworkState.asStateFlow() + + private var job: kotlinx.coroutines.Job? = null + + override fun observeNetworkState() { + job = scope.launch { + networkManager + .network + .collectLatest { currentNetworkState -> + _currentNetworkState.emit(currentNetworkState ?: NetworkState.NONE) + } + } + } + + override fun stopMonitoring() { + job?.cancel() + } +} diff --git a/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt b/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt new file mode 100644 index 0000000..743e655 --- /dev/null +++ b/src/androidMain/kotlin/com/liftric/job/queue/NetworkManager.kt @@ -0,0 +1,54 @@ +package com.liftric.job.queue + +import android.annotation.SuppressLint +import android.annotation.TargetApi +import android.content.Context +import android.net.ConnectivityManager +import android.net.NetworkCapabilities +import android.os.Build +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow + +@SuppressLint("MissingPermission") +actual class NetworkManager(context: Context) { + private var connectivityManager: ConnectivityManager = + context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager + + private val connectionPublisher = MutableStateFlow(getCurrentNetworkConnection()) + val network: Flow = connectionPublisher + + private fun getCurrentNetworkConnection(): NetworkState? = + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) { + postAndroidMNetworkConnection(connectivityManager) + } else { + preAndroidMNetworkConnection(connectivityManager) + } + + @TargetApi(Build.VERSION_CODES.M) + private fun postAndroidMNetworkConnection(connectivityManager: ConnectivityManager): NetworkState? { + val network = connectivityManager.activeNetwork + val capabilities = connectivityManager.getNetworkCapabilities(network) + return getNetworkConnection(capabilities) + } + + @Suppress("DEPRECATION") + private fun preAndroidMNetworkConnection(connectivityManager: ConnectivityManager): NetworkState? = + when (connectivityManager.activeNetworkInfo?.type) { + null -> null + ConnectivityManager.TYPE_WIFI -> NetworkState.WIFI + else -> NetworkState.MOBILE + } + + private fun getNetworkConnection(capabilities: NetworkCapabilities?): NetworkState? = + when { + capabilities == null -> null + Build.VERSION.SDK_INT < Build.VERSION_CODES.M + && !capabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) -> null + Build.VERSION.SDK_INT >= Build.VERSION_CODES.M && + !(capabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) + && capabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_VALIDATED)) -> null + capabilities.hasTransport(NetworkCapabilities.TRANSPORT_WIFI) -> NetworkState.WIFI + capabilities.hasTransport(NetworkCapabilities.TRANSPORT_CELLULAR) -> NetworkState.MOBILE + else -> null + } +} diff --git a/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt index cc1776d..8372ae0 100644 --- a/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt +++ b/src/androidTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -7,12 +7,18 @@ import kotlinx.serialization.modules.polymorphic import org.junit.runner.RunWith @RunWith(AndroidJUnit4::class) -actual class JobQueueTests: AbstractJobQueueTests(JobQueue( - context = InstrumentationRegistry.getInstrumentation().targetContext, - serializers = SerializersModule { - polymorphic(Task::class) { - subclass(TestTask::class, TestTask.serializer()) - } - }, - store = MapStorage() -)) +actual class JobQueueTests : AbstractJobQueueTests( + JobQueue( + context = InstrumentationRegistry.getInstrumentation().targetContext, + serializers = SerializersModule { + polymorphic(Task::class) { + subclass(TestTask::class, TestTask.serializer()) + } + }, + store = MapStorage(), + networkListener = NetworkListener( + networkManager = NetworkManager(InstrumentationRegistry.getInstrumentation().context) + ) + ) +) + diff --git a/src/commonMain/kotlin/com/liftric/job/queue/Job.kt b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt index 3b826b7..a3524bf 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/Job.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/Job.kt @@ -2,7 +2,6 @@ package com.liftric.job.queue import kotlinx.coroutines.CancellationException import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.withTimeout import kotlinx.datetime.Clock import kotlinx.datetime.Instant import kotlinx.serialization.Serializable @@ -19,47 +18,61 @@ data class Job( override val info: JobInfo, override val task: Task, override val startTime: Instant -): JobContext { - @Transient internal var delegate: JobDelegate? = null +) : JobContext { + @Transient + internal var delegate: JobDelegate? = null - constructor(task: Task, info: JobInfo) : this (UUIDFactory.create(), info, task, Clock.System.now()) + constructor(task: Task, info: JobInfo) : this( + id = UUIDFactory.create(), + info = info, + task = task, + startTime = Clock.System.now() + ) private var canRepeat: Boolean = true suspend fun run(): JobEvent { - return withTimeout(info.timeout) { - val event = try { - info.rules.forEach { it.willRun(this@Job) } - - task.body() - - JobEvent.DidSucceed(this@Job) - } catch (e: CancellationException) { - throw e - } catch (e: Throwable) { - canRepeat = task.onRepeat(e) - JobEvent.DidFail(this@Job, e) + val event = try { + info.rules.forEach { + it.willRun(jobContext = this@Job) } + task.body() + JobEvent.DidSucceed(job = this@Job) + } catch (e: CancellationException) { + throw e + } catch (e: Throwable) { + canRepeat = task.onRepeat(e) + JobEvent.DidFail(job = this@Job, error = e) + } - try { - info.rules.forEach { it.willRemove(this@Job, event) } - - event - } catch (e: CancellationException) { - throw e - } catch (e: Throwable) { - JobEvent.DidFailOnRemove(this@Job, e) + try { + info.rules.forEach { rule -> + rule.willRemove(jobContext = this@Job, result = event) } + } catch (e: CancellationException) { + throw e + } catch (e: Throwable) { + JobEvent.DidFailOnRemove(job = this@Job, error = e) } + return event } override suspend fun cancel() { - delegate?.onEvent?.emit(JobEvent.DidCancel(this@Job)) + delegate?.onEvent?.emit(JobEvent.DidCancel(job = this@Job)) } override suspend fun repeat(id: UUID, info: JobInfo, task: Task, startTime: Instant) { if (canRepeat) { - delegate?.onEvent?.emit(JobEvent.ShouldRepeat(Job(id, info, task, startTime))) + delegate?.onEvent?.emit( + JobEvent.ShouldRepeat( + Job( + id = id, + info = info, + task = task, + startTime = startTime + ) + ) + ) } } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobContext.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobContext.kt index 537d691..5f33e46 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobContext.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobContext.kt @@ -2,9 +2,14 @@ package com.liftric.job.queue import kotlinx.datetime.Instant -interface JobContext: JobData { +interface JobContext : JobData { suspend fun cancel() - suspend fun repeat(id: UUID = this.id, info: JobInfo = this.info, task: Task = this.task, startTime: Instant = this.startTime) + suspend fun repeat( + id: UUID = this.id, + info: JobInfo = this.info, + task: Task = this.task, + startTime: Instant = this.startTime, + ) } interface JobData { diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobEvent.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobEvent.kt index e1dc6ea..55481f5 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobEvent.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobEvent.kt @@ -10,4 +10,7 @@ sealed class JobEvent { data class ShouldRepeat(val job: Job): JobEvent() data class DidCancel(val job: JobContext): JobEvent() data class DidFailOnRemove(val job: JobContext, val error: Throwable): JobEvent() + data class NetworkRuleSatisfied(val job: JobContext): JobEvent() + data class NetworkRuleTimeout(val job: JobContext): JobEvent() + data class JobTimeout(val job: JobContext): JobEvent() } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt index c19e15c..f081c51 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobInfo.kt @@ -6,7 +6,9 @@ import kotlinx.serialization.Serializable @Serializable data class JobInfo( var tag: String? = null, - var timeout: Duration = Duration.INFINITE, + var jobTimeout: Duration = Duration.INFINITE, + var networkRuleTimeout: Duration = Duration.INFINITE, var rules: MutableList = mutableListOf(), - var shouldPersist: Boolean = false + var shouldPersist: Boolean = false, + var minRequiredNetworkState: NetworkState = NetworkState.NONE ) diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt index e99f351..56436a5 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -3,7 +3,7 @@ package com.liftric.job.queue import com.liftric.job.queue.rules.* import kotlinx.atomicfu.atomic import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.withLock @@ -18,12 +18,13 @@ import kotlinx.serialization.modules.plus import kotlinx.serialization.modules.polymorphic import kotlin.time.Duration.Companion.seconds -expect class JobQueue: AbstractJobQueue +expect class JobQueue : AbstractJobQueue abstract class AbstractJobQueue( serializers: SerializersModule, + val networkListener: NetworkListener, final override val configuration: Queue.Configuration, private val store: JsonStorage -): Queue { +) : Queue { private val module = SerializersModule { contextual(InstantIso8601Serializer) polymorphic(JobRule::class) { @@ -37,7 +38,7 @@ abstract class AbstractJobQueue( } private val format = Json { serializersModule = module + serializers } - val listener = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE) + val jobEventListener = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE) /** * Scheduled jobs @@ -53,7 +54,7 @@ abstract class AbstractJobQueue( /** * Semaphore to limit concurrency */ - private val lock = Semaphore(configuration.maxConcurrency, 0) + private val lock = Semaphore(permits = configuration.maxConcurrency, acquiredPermits = 0) /** * Mutex to suspend queue operations during cancellation @@ -69,37 +70,48 @@ abstract class AbstractJobQueue( } suspend fun schedule(task: () -> Task, configure: JobInfo.() -> JobInfo = { JobInfo() }) { - schedule(task(), configure) + schedule(task = task(), configure = configure) } - suspend fun schedule(data: Data, task: (Data) -> DataTask, configure: JobInfo.() -> JobInfo = { JobInfo() }) { - schedule(task(data), configure) + suspend fun schedule( + data: Data, + task: (Data) -> DataTask, + configure: JobInfo.() -> JobInfo = { JobInfo() } + ) { + schedule(task = task(data), configure = configure) } suspend fun schedule(task: Task, configure: JobInfo.() -> JobInfo = { JobInfo() }) { val info = configure(JobInfo()).apply { - rules.forEach { it.mutating(this) } + rules.forEach { rule -> + rule.mutating(jobInfo = this) + } } - val job = Job(task, info) + val job = Job(task = task, info = info) schedule(job).apply { - listener.emit(JobEvent.DidSchedule(job)) + jobEventListener.emit(JobEvent.DidSchedule(job)) } } private suspend fun schedule(job: Job) = try { job.info.rules.forEach { - it.willSchedule(this, job) + it.willSchedule(queue = this, jobContext = job) } if (job.info.shouldPersist) { - store.set(job.id.toString(), format.encodeToString(job)) + store.set(id = job.id.toString(), json = format.encodeToString(job)) } - queue.value = queue.value.plus(listOf(job)).sortedBy { it.startTime }.toMutableList() + queue.value = queue.value + .plus(listOf(job)) + .sortedBy { queueJob -> + queueJob.startTime + } + .toMutableList() } catch (e: Throwable) { - listener.emit(JobEvent.DidThrowOnSchedule(e)) + jobEventListener.emit(JobEvent.DidThrowOnSchedule(e)) } private val delegate = JobDelegate() @@ -118,10 +130,10 @@ abstract class AbstractJobQueue( } is JobEvent.ShouldRepeat -> { schedule(event.job).apply { - listener.emit(JobEvent.DidScheduleRepeat(event.job)) + jobEventListener.emit(JobEvent.DidScheduleRepeat(event.job)) } } - else -> listener.emit(event) + else -> jobEventListener.emit(event) } } } @@ -135,11 +147,42 @@ abstract class AbstractJobQueue( job.delegate = delegate running.value[job.id] = configuration.scope.launch { try { - listener.emit(JobEvent.WillRun(job)) - val result = job.run() - listener.emit(result) + networkListener.observeNetworkState() + var shouldRunJob = false + try { + withTimeout(job.info.networkRuleTimeout) NetworkRuleTimeout@{ + networkListener.currentNetworkState.collect { currentNetworkState -> + val isNetworkRuleSatisfied = + networkListener.isNetworkRuleSatisfied( + jobInfo = job.info, + currentNetworkState = currentNetworkState + ) + if (isNetworkRuleSatisfied) { + shouldRunJob = true + jobEventListener.emit(JobEvent.NetworkRuleSatisfied(job)) + this@NetworkRuleTimeout.cancel("Network rule satisfied") + } + } + } + } catch (e: CancellationException) { + if (shouldRunJob) { + jobEventListener.emit(JobEvent.WillRun(job)) + withTimeout(job.info.jobTimeout) { + val result = job.run() + jobEventListener.emit(result) + } + } else throw NetworkRuleTimeoutException("Timeout exceeded for the network.") + } } catch (e: CancellationException) { - listener.emit(JobEvent.DidCancel(job)) + when (e) { + is TimeoutCancellationException -> { + jobEventListener.emit(JobEvent.JobTimeout(job)) + } + is NetworkRuleTimeoutException -> { + jobEventListener.emit(JobEvent.NetworkRuleTimeout(job)) + } + } + jobEventListener.emit(JobEvent.DidCancel(job)) } finally { if (job.info.shouldPersist) { store.remove(job.id.toString()) @@ -147,6 +190,7 @@ abstract class AbstractJobQueue( running.value[job.id]?.cancel() running.value.remove(job.id) lock.release() + networkListener.stopMonitoring() } } } @@ -173,7 +217,9 @@ abstract class AbstractJobQueue( queue.value.clear() running.value.clear() configuration.scope.coroutineContext.cancelChildren() - if (clearStore) { store.clear() } + if (clearStore) { + store.clear() + } } } @@ -185,7 +231,7 @@ abstract class AbstractJobQueue( isCancelling.withLock { queue.value.firstOrNull { it.id == id }?.let { job -> queue.value.remove(job) - listener.emit(JobEvent.DidCancel(job)) + jobEventListener.emit(JobEvent.DidCancel(job)) } ?: running.value[id]?.cancel() } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt index 5d512ab..d665327 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/JobRule.kt @@ -4,9 +4,9 @@ import kotlinx.serialization.Serializable @Serializable abstract class JobRule { - open suspend fun mutating(info: JobInfo) {} + open suspend fun mutating(jobInfo: JobInfo) {} @Throws(Throwable::class) - open suspend fun willSchedule(queue: Queue, context: JobContext) {} - open suspend fun willRun(context: JobContext) {} - open suspend fun willRemove(context: JobContext, result: JobEvent) {} + open suspend fun willSchedule(queue: Queue, jobContext: JobContext) {} + open suspend fun willRun(jobContext: JobContext) {} + open suspend fun willRemove(jobContext: JobContext, result: JobEvent) {} } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt new file mode 100644 index 0000000..db6081d --- /dev/null +++ b/src/commonMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -0,0 +1,27 @@ +package com.liftric.job.queue + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.StateFlow + +expect class NetworkManager + +expect class NetworkListener : AbstractNetworkListener +abstract class AbstractNetworkListener( + val networkManager: NetworkManager, + val scope: CoroutineScope +) { + abstract val currentNetworkState: StateFlow + abstract fun observeNetworkState() + abstract fun stopMonitoring() + + fun isNetworkRuleSatisfied(jobInfo: JobInfo, currentNetworkState: NetworkState): Boolean { + return currentNetworkState >= jobInfo.minRequiredNetworkState + } +} + +@kotlinx.serialization.Serializable +enum class NetworkState { + NONE, + MOBILE, + WIFI +} diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt index a200305..5c886bb 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/DelayRule.kt @@ -8,7 +8,7 @@ import kotlinx.serialization.Serializable @Serializable data class DelayRule(val duration: Duration = 0.seconds): JobRule() { - override suspend fun willRun(context: JobContext) { + override suspend fun willRun(jobContext: JobContext) { delay(duration) } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt new file mode 100644 index 0000000..19a12e3 --- /dev/null +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/NetworkRule.kt @@ -0,0 +1,33 @@ +package com.liftric.job.queue.rules + +import com.liftric.job.queue.JobInfo +import com.liftric.job.queue.JobRule +import com.liftric.job.queue.NetworkState +import kotlinx.coroutines.CancellationException +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +data class NetworkRule( + val minRequiredNetworkState: NetworkState, + val networkRuleTimeout: Duration +) : JobRule() { + override suspend fun mutating(jobInfo: JobInfo) { + jobInfo.minRequiredNetworkState = minRequiredNetworkState + jobInfo.networkRuleTimeout = networkRuleTimeout + } +} + +fun JobInfo.minRequiredNetwork( + networkState: NetworkState, + networkRuleTimeout: Duration = 30.seconds +): JobInfo { + val rule = NetworkRule( + minRequiredNetworkState = networkState, + networkRuleTimeout = networkRuleTimeout + ) + rules.add(rule) + return this +} + +class NetworkRuleTimeoutException(message: String) : CancellationException(message) + diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/PeriodicRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/PeriodicRule.kt index 8dc68d1..0e159c3 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/PeriodicRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/PeriodicRule.kt @@ -8,9 +8,9 @@ import kotlin.time.Duration.Companion.seconds @Serializable data class PeriodicRule(val interval: Duration = 0.seconds): JobRule() { - override suspend fun willRemove(context: JobContext, result: JobEvent) { + override suspend fun willRemove(jobContext: JobContext, result: JobEvent) { if (result is JobEvent.DidSucceed) { - context.repeat(startTime = Clock.System.now().plus(interval)) + jobContext.repeat(startTime = Clock.System.now().plus(interval)) } } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/PersistenceRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/PersistenceRule.kt index 186a777..cc8045f 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/PersistenceRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/PersistenceRule.kt @@ -6,8 +6,8 @@ import kotlinx.serialization.Serializable @Serializable data class PersistenceRule(val shouldPersist: Boolean): JobRule() { - override suspend fun mutating(info: JobInfo) { - info.shouldPersist = shouldPersist + override suspend fun mutating(jobInfo: JobInfo) { + jobInfo.shouldPersist = shouldPersist } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/RetryRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/RetryRule.kt index 8396c44..a3e7c6b 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/RetryRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/RetryRule.kt @@ -8,16 +8,16 @@ import kotlin.time.Duration.Companion.seconds @Serializable data class RetryRule(val limit: RetryLimit, val delay: Duration = 0.seconds): JobRule() { - override suspend fun willRemove(context: JobContext, result: JobEvent) { + override suspend fun willRemove(jobContext: JobContext, result: JobEvent) { if (result is JobEvent.DidFail) { when (limit) { is RetryLimit.Unlimited -> { - context.repeat(startTime = Clock.System.now()) + jobContext.repeat(startTime = Clock.System.now()) } is RetryLimit.Limited -> { if (limit.count > 0) { - val rules = context.info.rules.minus(this).plus(RetryRule(RetryLimit.Limited(limit.count - 1), delay)) - context.repeat(info = context.info.copy(rules = rules.toMutableList()), startTime = Clock.System.now().plus(delay)) + val rules = jobContext.info.rules.minus(this).plus(RetryRule(RetryLimit.Limited(limit.count - 1), delay)) + jobContext.repeat(info = jobContext.info.copy(rules = rules.toMutableList()), startTime = Clock.System.now().plus(delay)) } } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt index 96a4bf0..2a5a03f 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/TimeoutRule.kt @@ -7,8 +7,8 @@ import kotlinx.serialization.Serializable @Serializable data class TimeoutRule(val timeout: Duration): JobRule() { - override suspend fun mutating(info: JobInfo) { - info.timeout = timeout + override suspend fun mutating(jobInfo: JobInfo) { + jobInfo.jobTimeout = timeout } } diff --git a/src/commonMain/kotlin/com/liftric/job/queue/rules/UniqueRule.kt b/src/commonMain/kotlin/com/liftric/job/queue/rules/UniqueRule.kt index 31966ec..89ab330 100644 --- a/src/commonMain/kotlin/com/liftric/job/queue/rules/UniqueRule.kt +++ b/src/commonMain/kotlin/com/liftric/job/queue/rules/UniqueRule.kt @@ -5,16 +5,16 @@ import kotlinx.serialization.Serializable @Serializable data class UniqueRule(private val tag: String? = null): JobRule() { - override suspend fun mutating(info: JobInfo) { - info.tag = tag + override suspend fun mutating(jobInfo: JobInfo) { + jobInfo.tag = tag } - override suspend fun willSchedule(queue: Queue, context: JobContext) { + override suspend fun willSchedule(queue: Queue, jobContext: JobContext) { for (item in queue.jobs) { if (item.info.tag == tag) { throw Throwable("Job with tag=${item.info.tag} already exists") } - if (item.id == context.id) { + if (item.id == jobContext.id) { throw Throwable("Job with id=${item.id} already exists") } } diff --git a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt index 8a6791e..ae7a2bb 100644 --- a/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt +++ b/src/commonTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -5,7 +5,7 @@ import kotlinx.coroutines.* import kotlin.test.* import kotlin.time.Duration.Companion.seconds -expect class JobQueueTests: AbstractJobQueueTests +expect class JobQueueTests : AbstractJobQueueTests abstract class AbstractJobQueueTests(private val queue: JobQueue) { @AfterTest fun tearDown() = runBlocking { @@ -18,7 +18,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { runBlocking { val id = UUIDFactory.create().toString() val job = async { - queue.listener.collect { + queue.jobEventListener.collect { println(it) } } @@ -49,7 +49,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { fun testRetry() = runBlocking { var count = 0 val job = launch { - queue.listener.collect { + queue.jobEventListener.collect { println(it) if (it is JobEvent.DidScheduleRepeat) { count += 1 @@ -73,7 +73,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { fun testCancelDuringRun() { runBlocking { val listener = launch { - queue.listener.collect { + queue.jobEventListener.collect { println(it) if (it is JobEvent.DidSucceed || it is JobEvent.DidFail) fail("Continued after run") if (it is JobEvent.WillRun) { @@ -103,7 +103,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { val completable = CompletableDeferred() launch { - queue.listener.collect { + queue.jobEventListener.collect { println(it) if (it is JobEvent.DidSucceed || it is JobEvent.DidFail) fail("Continued after run") if (it is JobEvent.DidSchedule) { @@ -130,7 +130,7 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { fun testCancelByIdAfterEnqueue() { runBlocking { launch { - queue.listener.collect { + queue.jobEventListener.collect { println(it) if (it is JobEvent.DidSchedule) { delay(3000L) @@ -168,4 +168,66 @@ abstract class AbstractJobQueueTests(private val queue: JobQueue) { assertEquals(1, queue.numberOfJobs) } + + @Test + fun testNetworkRuleSatisfied() = runBlocking { + val id = UUIDFactory.create().toString() + launch { + queue.jobEventListener.collect { + println(it) + if (it is JobEvent.DidCancel || it is JobEvent.DidSucceed) { + cancel() + assertTrue(it is JobEvent.DidSucceed) + } else return@collect + } + } + + queue.schedule(TestData(id), ::TestTask) { + minRequiredNetwork(NetworkState.NONE, 3.seconds) + } + + println("Network State: ${queue.networkListener.currentNetworkState.value}") + + queue.start() + } + + @Test + fun testNetworkRuleUnsatisfied() = runBlocking { + val id = UUIDFactory.create().toString() + launch { + queue.jobEventListener.collect { + println(it) + assertTrue(it is JobEvent.NetworkRuleTimeout) + cancel() + } + } + + queue.schedule(TestData(id), ::TestTask) { + minRequiredNetwork(NetworkState.WIFI, 3.seconds) + } + + println("Network State: ${queue.networkListener.currentNetworkState.value}") + + queue.start() + } + + @Test + fun testJobTimeout() = runBlocking { + launch { + queue.jobEventListener.collect { + println(it) + if (it is JobEvent.NetworkRuleSatisfied || it is JobEvent.WillRun) { + return@collect + } + assertTrue(it is JobEvent.JobTimeout) + cancel() + } + } + + queue.schedule(::LongRunningTask) { + timeout(5.seconds) + } + + queue.start() + } } diff --git a/src/iosMain/kotlin/com/liftric/job/queue/JobQueue.kt b/src/iosMain/kotlin/com/liftric/job/queue/JobQueue.kt index ab1aa64..74805f2 100644 --- a/src/iosMain/kotlin/com/liftric/job/queue/JobQueue.kt +++ b/src/iosMain/kotlin/com/liftric/job/queue/JobQueue.kt @@ -7,9 +7,11 @@ import platform.Foundation.NSUserDefaults actual class JobQueue( serializers: SerializersModule = SerializersModule {}, configuration: Queue.Configuration = Queue.DefaultConfiguration, + networkListener: NetworkListener, store: JsonStorage = SettingsStorage(NSUserDefaultsSettings(NSUserDefaults("com.liftric.persisted.queue"))) ) : AbstractJobQueue( - serializers, - configuration, - store + serializers = serializers, + networkListener = networkListener, + configuration = configuration, + store = store ) diff --git a/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt b/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt new file mode 100644 index 0000000..6a5daa0 --- /dev/null +++ b/src/iosMain/kotlin/com/liftric/job/queue/NetworkListener.kt @@ -0,0 +1,37 @@ +package com.liftric.job.queue + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.launch + +actual class NetworkListener( + networkManager: NetworkManager, + scope: CoroutineScope = CoroutineScope(context = Dispatchers.Default) +) : AbstractNetworkListener( + networkManager = networkManager, + scope = scope +) { + private val _currentNetworkState = MutableStateFlow(NetworkState.NONE) + override val currentNetworkState: StateFlow + get() = _currentNetworkState.asStateFlow() + + private var job: kotlinx.coroutines.Job? = null + + override fun observeNetworkState() { + job = scope.launch { + networkManager.startMonitoring() + networkManager.network.collectLatest { currentNetworkState -> + _currentNetworkState.emit(currentNetworkState) + } + } + } + + override fun stopMonitoring() { + networkManager.stopMonitoring() + job?.cancel() + } +} diff --git a/src/iosMain/kotlin/com/liftric/job/queue/NetworkManager.kt b/src/iosMain/kotlin/com/liftric/job/queue/NetworkManager.kt new file mode 100644 index 0000000..fa3aed1 --- /dev/null +++ b/src/iosMain/kotlin/com/liftric/job/queue/NetworkManager.kt @@ -0,0 +1,60 @@ +package com.liftric.job.queue + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.receiveAsFlow +import platform.Network.* +import platform.darwin.DISPATCH_QUEUE_PRIORITY_DEFAULT +import platform.darwin.dispatch_queue_attr_make_with_qos_class +import platform.darwin.dispatch_queue_create +import platform.posix.QOS_CLASS_UTILITY + +actual class NetworkManager { + private val networkChannel = Channel(Channel.UNLIMITED) + val network: Flow = networkChannel.receiveAsFlow() + + private val networkMonitor = object : nw_path_monitor_update_handler_t { + override fun invoke(network: nw_path_t) { + checkReachability(network) + } + + } + private val nwPathMonitor: nw_path_monitor_t = nw_path_monitor_create().apply { + val queue = dispatch_queue_create( + "com.liftric.job.queue", + dispatch_queue_attr_make_with_qos_class( + null, + QOS_CLASS_UTILITY, + DISPATCH_QUEUE_PRIORITY_DEFAULT + ) + ) + nw_path_monitor_set_queue( + this, + queue + ) + nw_path_monitor_set_update_handler(this, networkMonitor) + } + + fun startMonitoring() { + nw_path_monitor_start(nwPathMonitor) + } + + fun stopMonitoring() { + nw_path_monitor_cancel(nwPathMonitor) + } + + private fun checkReachability(network: nw_path_t) { + when (nw_path_get_status(network)) { + nw_path_status_satisfied -> { + if (nw_path_uses_interface_type(network, nw_interface_type_wifi)) { + networkChannel.trySend(NetworkState.WIFI) + } else if (nw_path_uses_interface_type(network, nw_interface_type_cellular)) { + networkChannel.trySend(NetworkState.MOBILE) + } + } + nw_path_status_unsatisfied -> { + networkChannel.trySend(NetworkState.NONE) + } + } + } +} diff --git a/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt b/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt index 0891133..90fc58b 100644 --- a/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt +++ b/src/iosTest/kotlin/com/liftric/job/queue/JobQueueTests.kt @@ -3,11 +3,14 @@ package com.liftric.job.queue import kotlinx.serialization.modules.SerializersModule import kotlinx.serialization.modules.polymorphic -actual class JobQueueTests: AbstractJobQueueTests(JobQueue( - serializers = SerializersModule { - polymorphic(Task::class) { - subclass(TestTask::class, TestTask.serializer()) - } - }, - store = MapStorage() -)) +actual class JobQueueTests : AbstractJobQueueTests( + JobQueue( + serializers = SerializersModule { + polymorphic(Task::class) { + subclass(TestTask::class, TestTask.serializer()) + } + }, + store = MapStorage(), + networkListener = NetworkListener(networkManager = NetworkManager()) + ) +)