Skip to content

Commit

Permalink
Remove intrinsics usage in RibCoroutineWorker
Browse files Browse the repository at this point in the history
This commit changes dispatching logic to a much simpler one that does not require usage of intrinsics.

In order to synchronously get an instance of `bindJob`, we start the `unbindJob` coroutine undispatched (`bindJob` is a child of `unbindJob`, so we need an instance of `unbindJob` to create an instance of `bindJob`).

After saving `bindJob`, we properly dispatch in a cancellable way by simply `launch`ing a new coroutine.
  • Loading branch information
psteiger committed Oct 14, 2024
1 parent d2711d8 commit 4b2df0e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,10 @@
package com.uber.rib.core

import com.uber.autodispose.coroutinesinterop.asScopeProvider
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -122,10 +112,17 @@ public fun CoroutineScope.bind(
worker: RibCoroutineWorker,
context: CoroutineContext = RibDispatchers.Default,
): BindWorkerHandle {
val bindJob: CompletableJob // A job that completes once worker's onStart completes
var bindJob: CompletableJob? = null // A job that completes once worker's onStart completes
val unbindJob =
launch(context, { bindJob = createBindingJob() }) { bindAndAwaitCancellation(worker, bindJob) }
return BindWorkerHandleImpl(bindJob, unbindJob)
launch(context, CoroutineStart.UNDISPATCHED) {
val job = createBindingJob()
bindJob = job
// launch again -- this time, we will dispatch if installed dispatcher
// tell us to (CoroutineDispatcher.isDispatchNeeded()).
launch { bindAndAwaitCancellation(worker, job) }
}
// !! is safe here -- outer coroutine was started undispatched.
return BindWorkerHandleImpl(bindJob!!, unbindJob)
}

/** Binds [workers] in a scope that is a child of the [CoroutineScope] receiver. */
Expand All @@ -139,46 +136,6 @@ public fun CoroutineScope.bind(
}
}

/**
* Guarantees to run synchronous [init] block exactly once in an undispatched manner.
*
* **Exceptions thrown in [init] block will be rethrown at call site.**
*/
@OptIn(ExperimentalContracts::class)
private fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
init: CoroutineScope.() -> Unit = {},
block: suspend CoroutineScope.() -> Unit,
): Job {
contract {
callsInPlace(init, InvocationKind.EXACTLY_ONCE)
callsInPlace(block, InvocationKind.AT_MOST_ONCE)
}
var initError: Throwable? = null
val job =
launch(context, CoroutineStart.UNDISPATCHED) {
runCatching(init).onFailure { initError = it }.getOrThrow()
dispatchIfNeeded()
block()
}
initError?.let { throw it }
return job
}

private suspend inline fun dispatchIfNeeded() {
suspendCoroutineUninterceptedOrReturn sc@{ cont ->
val context = cont.context
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
if (!dispatcher.isDispatchNeeded(context)) return@sc Unit
// Coroutine was not in the right context -- we'll dispatch.
context.ensureActive()
cont.intercepted().resume(Unit)
COROUTINE_SUSPENDED
}
// Don't continue if coroutine was cancelled after returning from dispatch.
coroutineContext.ensureActive()
}

private fun CoroutineScope.createBindingJob(): CompletableJob =
Job(coroutineContext.job).also {
// Cancel `unbindJob` if `bindJob` has cancelled. This is important to abort `onStart` if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,18 @@ class RibCoroutineWorkerTest {
val router = mock<Router<*>>()
val interactor = object : Interactor<Any, Router<*>>() {}
val subject = PublishSubject.create<Unit>()
var started = false
var disposed = false
val ribCoroutineWorker = RibCoroutineWorker {
subject.doOnDispose { disposed = true }.autoDispose(this).subscribe()
val ribCoroutineWorker = RibCoroutineWorker { scope ->
started = true
subject.doOnDispose { disposed = true }.autoDispose(scope).subscribe()
}
val worker = ribCoroutineWorker.asWorker()
InteractorHelper.attach(interactor, Any(), router, null)
val unbinder = WorkerBinder.bind(interactor, worker)
runCurrent()
subject.onNext(Unit)
assertThat(started).isTrue()
assertThat(disposed).isFalse()
unbinder.unbind()
runCurrent()
Expand Down

0 comments on commit 4b2df0e

Please sign in to comment.