/**
 * Excerpt of the dispatcher base class showing how it plugs into continuation interception.
 *
 * The class is registered in the coroutine context under the [ContinuationInterceptor] key
 * and implements [ContinuationInterceptor] itself.
 */
public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    // 1: every intercepted continuation is wrapped in a DispatchedContinuation that
    //    remembers this dispatcher alongside the original continuation.
    /**
     * Wraps [continuation] into a [DispatchedContinuation] bound to this dispatcher.
     *
     * @param continuation the continuation being intercepted.
     * @return a new [DispatchedContinuation] created from `this` and [continuation].
     */
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
}
/**
 * Starts this suspending lambda as a coroutine and resumes it in cancellable mode.
 *
 * The work runs inside `runSafely(completion)`: an unintercepted coroutine is created for
 * [completion], intercepted, and then resumed with a successful `Unit` result.
 *
 * @param completion continuation that receives the outcome of the started coroutine.
 */
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit =
    runSafely(completion) {
        val coroutine = createCoroutineUnintercepted(completion)
        coroutine.intercepted().resumeCancellableWith(Result.success(Unit))
    }
// Equivalent to the expanded form below ↓
/**
 * Step-by-step form of starting this suspending lambda as a cancellable coroutine.
 *
 * All three steps run inside `runSafely(completion)`.
 *
 * @param completion continuation that receives the outcome of the started coroutine.
 */
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit =
    runSafely(completion) {
        // 1: create the coroutine for this lambda without applying any interceptor yet.
        val continuation = createCoroutineUnintercepted(completion)
        // 2: let the context's ContinuationInterceptor wrap the continuation.
        //    NOTE(review): with a CoroutineDispatcher in the context this is presumably a
        //    DispatchedContinuation, as the local name suggests - confirm.
        val dispatchedContinuation = continuation.intercepted()
        // 3: kick the coroutine off by resuming it with Unit in cancellable mode.
        dispatchedContinuation.resumeCancellableWith(Result.success(Unit))
    }
/**
 * Continuation wrapper that pairs the original [continuation] with the [dispatcher]
 * responsible for resuming it.
 *
 * It is a [DispatchedTask] (created with `MODE_UNINITIALIZED`) and delegates the
 * [Continuation] interface to [continuation].
 */
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {

    /**
     * Resumes the wrapped continuation with [result] in cancellable mode, either through
     * the dispatcher or in place.
     *
     * @param result the value or failure to resume with.
     * @param onCancellation optional callback forwarded to `result.toState`.
     */
    inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        // 1: ask the dispatcher whether the resumption has to be dispatched.
        if (dispatcher.isDispatchNeeded(context)) {
            // Record the state and mode first so they are in place when the task is run.
            _state = state
            resumeMode = MODE_CANCELLABLE
            // 2: hand this object over to the dispatcher as the block to execute.
            dispatcher.dispatch(context, this)
        } else {
            // 3: no dispatch required - go through executeUnconfined and resume directly,
            //    unless resumeCancelled reports that the resumption was cancelled.
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
}
/**
 * Excerpt of the dispatcher base class showing the two members that control dispatching.
 */
public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    /**
     * Tells whether a resumption in [context] must go through [dispatch].
     *
     * Returns `true` by default; subclasses may override it.
     */
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

    /**
     * Requests execution of [block] for the given [context]; implemented by subclasses.
     */
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
}
/**
 * Dispatcher that forwards every dispatched block to a scheduler obtained from
 * `createScheduler()` when the instance is constructed.
 *
 * @param corePoolSize core pool size setting (defaults to `CORE_POOL_SIZE`).
 * @param maxPoolSize maximum pool size setting (defaults to `MAX_POOL_SIZE`).
 * @param idleWorkerKeepAliveNs idle worker keep-alive setting, in nanoseconds per its name.
 * @param schedulerName name setting for the scheduler.
 */
internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {

    // Scheduler that actually receives the work.
    private var coroutineScheduler = createScheduler()

    /**
     * Passes [block] straight to the scheduler; [context] is not consulted here.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block)
    }
}
internal class CoroutineScheduler( @JvmField val corePoolSize: Int, @JvmField val maxPoolSize: Int, @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME ) : Executor, Closeable {
/** Executor entry point: forwards [command] to [dispatch] using its default arguments. */
override fun execute(command: Runnable) {
    dispatch(command)
}
/**
 * Submits [block] for execution on this scheduler.
 *
 * @param block the work to run.
 * @param taskContext context attached to the created task; defaults to `NonBlockingContext`.
 * @param tailDispatch when `true` and the caller is a worker of this scheduler, no unpark
 *   signal is requested for the submitted task.
 * @throws RejectedExecutionException if the task could not be placed in the local queue
 *   and the global queue refused it as well.
 */
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
    trackTask()
    // 1: wrap the runnable into a scheduler task.
    val task = createTask(block, taskContext)
    // 2: find out whether the calling thread is a worker of this scheduler (null otherwise).
    val currentWorker = currentWorker()
    // 3: try the worker's local queue first; a non-null result is a task that still needs a home.
    val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
    if (notAdded != null) {
        if (!addToGlobalQueue(notAdded)) {
            throw RejectedExecutionException("$schedulerName was terminated")
        }
    }
    val skipUnpark = tailDispatch && currentWorker != null
    if (task.mode == TASK_NON_BLOCKING) {
        // The early return and the signal are separate statements:
        // signalCpuWork() runs only when the unpark is NOT skipped.
        if (skipUnpark) return
        signalCpuWork()
    } else {
        signalBlockingWork(skipUnpark = skipUnpark)
    }
}
/**
 * Returns the calling thread as a [Worker] when it is one and belongs to this scheduler;
 * returns `null` otherwise.
 */
private fun currentWorker(): Worker? {
    val worker = Thread.currentThread() as? Worker ?: return null
    return if (worker.scheduler == this) worker else null
}
/**
 * Excerpt showing what happens when a dispatched continuation is finally executed.
 *
 * The class is a [DispatchedTask] (created with `MODE_UNINITIALIZED`) and delegates the
 * [Continuation] interface to [continuation].
 */
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {

    /**
     * Takes the stored state and resumes the wrapped continuation with it.
     *
     * Any throwable raised while doing so is captured instead of propagated. It is passed,
     * together with a failure from `taskContext.afterTask()`, to `handleFatalException`.
     */
    public final override fun run() {
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val delegate = delegate as DispatchedContinuation<T>
            val continuation = delegate.continuation
            withContinuationContext(continuation, delegate.countOrElement) {
                val context = continuation.context
                val state = takeState()
                val exception = getExceptionalResult(state)

                // The job is consulted only for a non-exceptional result in a cancellable mode.
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                if (job != null && !job.isActive) {
                    // 1: the job is no longer active - resume with its cancellation exception.
                    val cause = job.getCancellationException()
                    cancelCompletedResult(state, cause)
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) {
                        // 2: the stored state carries an exception - resume with it.
                        continuation.resumeWithException(exception)
                    } else {
                        // 3: normal completion - resume with the stored successful value.
                        continuation.resume(getSuccessfulResult(state))
                    }
                }
            }
        } catch (e: Throwable) {
            // Held until the finally block so afterTask() still runs before it is reported.
            fatalException = e
        } finally {
            val result = runCatching { taskContext.afterTask() }
            handleFatalException(fatalException, result.exceptionOrNull())
        }
    }
}