前语

NSOperation是Foundation中用于多线程并发使命的抽象类,它能够放到NSOperationQueue使命行列中去履行。它底层还是基于GCD来完结的,相比GCD它支撑撤销操作和依靠管理。

Apple用swift重写了Foundation库并且开源,能够经过阅览NSOperation的swift源代码来了解其完结原理。

github.com/apple/swift…

在剖析源代码之前也请阅览一下苹果的《Concurrency Programming Guide》的Operation Queues一章,要了解怎么运用Operation。

同步和异步

NSOperation是抽象类,运用它需求子类承继它,并覆写一些成员办法。

它有两种形式,一种是同步履行,一种是异步履行。

同步履行,需求覆写main办法即可。

@interface MyOperation : NSOperation
@end
@implementation MyOperation
- (void)main {
  // 要完结的事务逻辑写这儿即可
}
@end

当main办法回来的时分,使命也就完毕了。

异步履行,需求覆写start办法,一起完结asynchronous、concurrent、executing和finished属性。

@interface MyOperation : NSOperation {
  BOOL _executing;
  BOOL _finished;
}
@property (nonatomic, assign, getter = isExecuting) BOOL executing;
@property (nonatomic, assign, getter = isFinished) BOOL finished;
@end
@implementation MyOperation
@synthesize executing = _executing;
@synthesize finished = _finished;
- (void)start {
  // 要完结的事务逻辑写这儿即可
}
- (BOOL)isAsynchronous {
    return YES;
}
- (BOOL)isConcurrent {
    return YES;
}
- (void)setFinished:(BOOL)finished {
    [self willChangeValueForKey:@"isFinished"];
    _finished = finished;
    [self didChangeValueForKey:@"isFinished"];
}
- (void)setExecuting:(BOOL)executing {
    [self willChangeValueForKey:@"isExecuting"];
    _executing = executing;
    [self didChangeValueForKey:@"isExecuting"];
}
@end

当start办法回来的时分,使命能够持续履行,直到使命真正履行完毕,我们履行self.finished=YES; self.executing=YES;经过KVO的办法告诉NSOperation使命现已履行完毕,能够从NSOperationQueue中移除此使命了。

OperationQueue

OperationQueue是使命的履行行列

初始化

OperationQueue有两个初始化办法:

一个是public办法给大家创建使命行列运用的

open class OperationQueue : NSObject, ProgressReporting {
    // 使命行列名字
    var __name: String?
    // 对外的初始化办法
    public override init() {
        super.init()
        __name = "NSOperationQueue (Unmanaged<OperationQueue>.passUnretained(self).toOpaque())"
    }
}

另一个是internal办法,专门用来初始化mainQueue的

extension OperationQueue {
    public static let defaultMaxConcurrentOperationCount: Int = -1
}
open class OperationQueue : NSObject, ProgressReporting {
    // 是否是主线程行列
    var __mainQ: Bool = false
    // 并发数上限
    var __maxNumOps: Int = OperationQueue.defaultMaxConcurrentOperationCount
    // 实践并发数上限
    var __actualMaxNumOps: Int32 = .max
    // 使命行列名字
    var __name: String?
    // QOS,使命行列的优先级
    var __propertyQoS: QualityOfService?
    // asMainQueue是一个空元组,仅仅为了重载init办法,便利内部运用
    internal init(asMainQueue: ()) {
        super.init()
        __mainQ = true
        __maxNumOps = 1
        __actualMaxNumOps = 1
        __name = "NSOperationQueue Main Queue"
#if canImport(Darwin)
        __propertyQoS = QualityOfService(qos_class_main())
#else
        __propertyQoS = QualityOfService.userInteractive
#endif
    }
    // 回来主线程使命行列
    open class var main: OperationQueue {
        get {
            struct Once {
                // 运用static确保了变量只初始化一次
                static let mainQ = OperationQueue(asMainQueue: ())
            }
            return Once.mainQ
        }
    }
}

增加使命

增加一个Operation到使命行列中,对外提供了两个办法,能够增加单个使命或许批量使命。

open class OperationQueue : NSObject, ProgressReporting {
    // 增加单个使命
    open func addOperation(_ op: Operation) {
        _addOperations([op], barrier: false)
    }
    // 增加多个使命,一起支撑设置是否同步等候
    open func addOperations(_ ops: [Operation], waitUntilFinished wait: Bool) {
        _addOperations(ops, barrier: false)
        if wait {
            for op in ops {
                op.waitUntilFinished()
            }
        }
    }
}

对内提供了一个下划线最初的批量增加使命办法,一切的增加使命,都最终会走到此办法中

open class OperationQueue : NSObject, ProgressReporting {
    internal func _addOperations(_ ops: [Operation], barrier: Bool = false) {
        // 判空维护
        if ops.isEmpty { return }
        // 失利数
        var failures = 0
        // 成功数
        var successes = 0
        // 第一个新使命
        var firstNewOp: Unmanaged<Operation>?
        // 上一个新使命
        var lastNewOp: Unmanaged<Operation>?
        // 循环传入的使命数组
        for op in ops {
            // 对op的状况进行CAS修正,从initialized修正成enqueuing
            if op._compareAndSwapState(.initialized, .enqueuing) {
                // 假如修正成功,成功数累加1
                successes += 1
                // 判别之前的修正是否都成功
                if 0 == failures {
                    // 走到这儿,阐明一切的使命的状况都正确修正成enqueuing
                    // retain一下使命目标
                    let retained = Unmanaged.passRetained(op)
                    // 假如使命的没有未完结的依靠使命,则为isReady为YES,缓存到_cachedIsReady
                    op._cachedIsReady = op.isReady
                    // GCD的工作目标
                    let schedule: DispatchWorkItem
                    // 创建GCD的工作目标
                    if let qos = op.__propertyQoS?.qosClass {
                        // 使命有设置过QoS,enforceQoS表明运用传入的qos
                        schedule = DispatchWorkItem.init(qos: qos, flags: .enforceQoS, block: {
                            // 当工作目标放入DispatchQueue的时分,会开端履行此block
                            // self是OperationQueue目标,履行其_schedule办法,来调度使命
                            self._schedule(op)
                        })
                    } else {
                        // 使命没有设置过Qos,assignCurrentContext表明运用当时线程的Qos
                        schedule = DispatchWorkItem(flags: .assignCurrentContext, block: {
                            // 重复代码,同上
                            self._schedule(op)
                        })
                    }
                    // 调用Operation的_adopt办法,设置__queue和__schedule成员变量
                    op._adopt(queue: self, schedule: schedule)
                    // 使命行列实践上是双向链表存储,设置使命的前后链表节点
                    op.__previousOperation = lastNewOp
                    op.__nextOperation = nil
                    if let lastNewOperation = lastNewOp?.takeUnretainedValue() {
                        lastNewOperation.__nextOperation = retained
                    } else {
                        firstNewOp = retained
                    }
                    lastNewOp = retained
                } else {
                   // 前面的使命状况有修正失利的场景,回滚当时使命的状况修正
                    _ = op._compareAndSwapState(.enqueuing, .initialized)
                }
            } else {
                // 假如修正失利,失利数累加1
                failures += 1
            }
        }
        // 判别是否有使命入队失利
        if 0 < failures {
            // 由于有失利状况产生,要回滚前面的操作,firstNewOp便是双向链表的头节点
            while let firstNewOperation = firstNewOp?.takeUnretainedValue() {
                // 取出下一个节点
                let nextNewOp = firstNewOperation.__nextOperation
                // 对当时使命目标做重置操作
                firstNewOperation._invalidateQueue()
                // 清理链接节点指针
                firstNewOperation.__previousOperation = nil
                firstNewOperation.__nextOperation = nil
                // 回滚使命的状况
                _ = firstNewOperation._compareAndSwapState(.enqueuing, .initialized)
                // 释放使命
                firstNewOp?.release()
                // 持续处理后继节点
                firstNewOp = nextNewOp
            }
            // 回滚完,还是要报fatalError
            fatalError("operations finished, executing or already in a queue cannot be enqueued")
        }
        // Attach any operations pending attachment to main list
        // 非堵塞使命
        if !barrier {
            // 加锁,由于后续要修正OperationQueue的成员变量
            _lock()
            // OperationCount自增1,这儿自增1不是很对啊
            _incrementOperationCount()
        }
        // 前面现已处理好使命的状况,并生成了一个双向链表
        // firstNewOp便是链表头,lastNewOp便是链表最后一个元素
        // __firstOperation和__lastOperation是OperationQueue现已存在的双向链表
        // 这儿是要把此次传入的使命链表,挂到OperationQueue的双向链表上去
        var pending = firstNewOp
        if let pendingOperation = pending?.takeUnretainedValue() {
            let old_last = __lastOperation
            pendingOperation.__previousOperation = old_last
            if let old = old_last?.takeUnretainedValue() {
                old.__nextOperation = pending
            } else {
                __firstOperation = pending
            }
            __lastOperation = lastNewOp
        }
        // pending便是firstNewOp,是链表表头,这儿便是循环处理使命链表
        while let pendingOperation = pending?.takeUnretainedValue() {
            if !barrier {
                // 假如使命中有要堵塞的barrie使命,是经过把它增加到依靠使命中,来完结的
                var barrierOp = _firstPriorityOperation(Operation.QueuePriority.barrier)
                while let barrierOperation = barrierOp?.takeUnretainedValue() {
                    pendingOperation._addDependency(barrierOperation)
                    barrierOp = barrierOperation.__nextPriorityOperation
                }
            }
            // 使命的状况从enqueuing改成enqueued
            _ = pendingOperation._compareAndSwapState(.enqueuing, .enqueued)
            // 依据__priorityValue和__propertyQoS设置一下使命的优先级
            // 假如未设置过优先级,默许是Operation.QueuePriority.normal等级
            var pri = pendingOperation.__priorityValue
            if pri == nil {
                let v = __actualMaxNumOps == 1 ? nil : pendingOperation.__propertyQoS
                if let qos = v {
                    switch qos {
                    case .default: pri = Operation.QueuePriority.normal.rawValue
                    case .userInteractive: pri = Operation.QueuePriority.veryHigh.rawValue
                    case .userInitiated: pri = Operation.QueuePriority.high.rawValue
                    case .utility: pri = Operation.QueuePriority.low.rawValue
                    case .background: pri = Operation.QueuePriority.veryLow.rawValue
                    }
                } else {
                    pri = Operation.QueuePriority.normal.rawValue
                }
            }
            // 按照使命的优先级,还有一个单链表,链表节点字段是Operation的__nextPriorityOperation
            // 单链表的头节点和尾节点是OperationQueue的__firstPriorityOperation和__lastPriorityOperation
            // 它俩是元组目标,包含一切等级的单链表的头节点和尾节点
            pendingOperation.__nextPriorityOperation = nil
            if let old_last = _lastPriorityOperation(pri)?.takeUnretainedValue() {
                old_last.__nextPriorityOperation = pending
            } else {
                _setFirstPriorityOperation(pri!, pending)
            }
            _setlastPriorityOperation(pri!, pending)
            // 循环下一个链表
            pending = pendingOperation.__nextOperation
        }
        // 非堵塞使命
        if !barrier {
            // 解锁,使命增加完毕
            _unlock()
        }
        // 堵塞使命由调用方完结调度逻辑
        if !barrier {
            // 调度使命
            _schedule()
        }
    }
}

调度使命

internal func _schedule() {
        // 重试使命数组
        var retestOps = [Operation]()
        // 由于要对OperationQueue操作,所以要先加锁
        _lock()
        // 可用的槽位数
        var slotsAvail = __actualMaxNumOps - __numExecOps
        // 要循环一下优先级单链表上的使命
        for prio in Operation.QueuePriority.priorities {
            // 假如没有可用的槽位,或许当时使命行列被暂停,则跳出循环
            if 0 >= slotsAvail || _suspended {
                break
            }
            // 依据使命优先级获取使命优先级单链表的表头
            var op = _firstPriorityOperation(prio)
            // 前驱节点,移除当时节点的时分要运用
            var prev: Unmanaged<Operation>?
            while let operation = op?.takeUnretainedValue() {
                // 维护判别一下
                if 0 >= slotsAvail || _suspended {
                    break
                }
                // 后继节点
                let next = operation.__nextPriorityOperation
                // 是否要重试
                var retest = false
                // if the cached state is possibly not valid then the isReady value needs to be re-updated
                // 假如使命状况现已是enqueued,并且没有未完结的依靠,则开端履行此使命
                if Operation.__NSOperationState.enqueued == operation._state && operation._fetchCachedIsReady(&retest) {
                    // 从使命优先级单链表上移除此使命
                    if let previous = prev?.takeUnretainedValue() {
                        previous.__nextPriorityOperation = next
                    } else {
                        _setFirstPriorityOperation(prio, next)
                    }
                    if next == nil {
                        _setlastPriorityOperation(prio, prev)
                    }
                    // 现已从单链表上移除,清空后继节点
                    operation.__nextPriorityOperation = nil
                    // 状况修正成dispatching
                    operation._state = .dispatching
                    // __numExecOps自增1
                    _incrementExecutingOperations()
                    // 可用的槽位数减1
                    slotsAvail -= 1
                    // 选择GCD的Queue
                    let queue: DispatchQueue
                    if __mainQ {
                        // 假如是主线程,则运用主线程的GCD的Queue
                        queue = DispatchQueue.main
                    } else {
                        // 假如未设置__dispatch_queue,会依据QoS合成一个对应的GCD DispatchQueue
                        queue = __dispatch_queue ?? _synthesizeBackingQueue()
                    }
                    // 取出使命的GCD工作目标
                    if let schedule = operation.__schedule {
                        if operation is _BarrierOperation {
                            // 假如是Barrier堵塞使命,则调用GCD的时分也设置barrier参数
                            queue.async(flags: .barrier, execute: {
                                schedule.perform()
                            })
                        } else {
                            // 异步履行使命
                            queue.async(execute: schedule)
                        }
                    }
                    // 处理后续使命
                    op = next
                } else {
                    // 假如使命还未准备好
                    if retest {
                        // 放到重试数组中
                        retestOps.append(operation)
                    }
                    // 持续循环后续使命
                    prev = op
                    op = next
                }
            }
        }
        // 解锁
        _unlock()
        // 尝试从头计算未准备好的使命的ready状况
        for op in retestOps {
            if op.isReady {
                op._cachedIsReady = true
            }
        }
    }

DispatchWorkItem的block会履行OperationQueue的_schedule(_ op: Operation)办法,便是下面这个办法,这个block是_addOperations的时分设置的

internal func _schedule(_ op: Operation) {
    // 设置使命状况为starting
    op._state = .starting
    // set current tsd
    // 这样设置今后,在当时线程中的start办法能够找到OperationQueue的引证,Operation没有对外露出其引证
    // 便是OperationQueue.current
    OperationQueue._currentQueue.set(self)
    // 开端履行Operation的start办法
    op.start()
    // 重置currentQueue
    OperationQueue._currentQueue.clear()
    // We've just cleared _currentQueue storage.
    // NSThreadSpecific doesn't release stored value on clear.
    // This means `self` will leak if we don't release manually.
    Unmanaged.passUnretained(self).release()
    // 假如使命的isFinished回来YES,可是状况还不是finished,触发一下KVO,完毕当时使命
    // 这个归于补偿逻辑,一般设置了finished = YES,按理说就应该触发其KVO
    // unset current tsd
    if op.isFinished && op._state.rawValue < Operation.__NSOperationState.finishing.rawValue {
        Operation.observeValue(forKeyPath: _NSOperationIsFinished, ofObject: op)
    }
}

履行使命

在同步形式的时分,还有一个_execute办法会履行

internal func _execute(_ op: Operation) {
    // 同步形式能够反应使命进展
    var operationProgress: Progress? = nil
    if !(op is _BarrierOperation) && _isReportingProgress {
        let opProgress = Progress(parent: nil, userInfo: nil)
        opProgress.totalUnitCount = 1
        progress.addChild(opProgress, withPendingUnitCount: 1)
        operationProgress = opProgress
    }
    operationProgress?.becomeCurrent(withPendingUnitCount: 1)
    defer { operationProgress?.resignCurrent() }
    // 履行使命的main办法
    op.main()
}

完毕使命

internal func _operationFinished(_ op: Operation, _ previousState: Operation.__NSOperationState) {
    // There are only three cases where an operation might have a nil queue
    // A) The operation was never added to a queue and we got here by a normal KVO change
    // B) The operation was somehow already finished
    // C) the operation was attempted to be added to a queue but an exception occured and was ignored...
    // Option C is NOT supported!
    // 判别使命是否是堵塞使命
    let isBarrier = op is _BarrierOperation
    // 加锁
    _lock()
    // 取后续使命,移除链表的时分要运用
    let nextOp = op.__nextOperation
    // 判别使命状况是否已完毕
    if Operation.__NSOperationState.finished == op._state {
        // 使命已履行完,讲使命移除链表
        let prevOp = op.__previousOperation
        if let prev = prevOp {
            prev.takeUnretainedValue().__nextOperation = nextOp
        } else {
            __firstOperation = nextOp
        }
        if let next = nextOp {
            next.takeUnretainedValue().__previousOperation = prevOp
        } else {
            __lastOperation = prevOp
        }
        // only decrement execution count on operations that were executing! (the execution was initially set to __NSOperationStateDispatching so we must compare from that or later)
        // else the number of executing operations might underflow
        // 减少正在履行的使命数
        if previousState.rawValue >= Operation.__NSOperationState.dispatching.rawValue {
            _decrementExecutingOperations()
        }
        // 重置使命的前后链表节点
        op.__previousOperation = nil
        op.__nextOperation = nil
        // 重置使命行列
        op._invalidateQueue()
    }
    // 减少使命数
    if !isBarrier {
        _decrementOperationCount()
    }
    // 解锁
    _unlock()
    // 在调度一下其他使命
    _schedule()
    // 释放使命
    if previousState.rawValue >= Operation.__NSOperationState.enqueuing.rawValue {
        Unmanaged.passUnretained(op).release()
    }
}

Operation

使命状况

Operation有一个__state成员变量,表明当时使命状况,__NSOperationState枚举有8个值,初始化的时分initialized,每次对__state状况修正,都有__atomicLoad锁来维护。

open class Operation : NSObject {
    // 使命状况枚举
    enum __NSOperationState : UInt8 {
        case initialized = 0x00
        case enqueuing = 0x48
        case enqueued = 0x50
        case dispatching = 0x88
        case starting = 0xD8
        case executing = 0xE0
        case finishing = 0xF0
        case finished = 0xF4
    }
    // 使命状况
    internal var __state: __NSOperationState = .initialized
    // 负责同步使命状况的锁
    var __atomicLoad = NSLock()
    // 使命状况的get和set办法都由__atomicLoad来同步
    internal var _state: __NSOperationState {
        get {
            __atomicLoad.lock()
            defer { __atomicLoad.unlock() }
            return __state
        }
        set(newValue) {
            __atomicLoad.lock()
            defer { __atomicLoad.unlock() }
            __state = newValue
        }
    }
    // CAS办法,用来修正使命状况
    internal func _compareAndSwapState(_ old: __NSOperationState, _ new: __NSOperationState) -> Bool {
        __atomicLoad.lock()
        defer { __atomicLoad.unlock() }
        if __state != old { return false }
        __state = new
        return true
    }
}

初始化

Operation的init办法是一个空完结,子类完结的时分能够重载init办法,带一些事务数据字段,将来能够在start办法中运用,比方,下载的链接地址NSURL目标。

open class Operation : NSObject {
    public override init() { }
}

start

OperationQueue调度使命履行的时分,最终会履行使命的start办法,假如是异步形式,子类是要覆写start办法的,假如是同步形式,start办法不用动,start办法内部最终会调用main办法。

open func start() {
    let state = _state
    // 判别一下使命的状况,假如现已完毕就直接回来
    if __NSOperationState.finished == state { return }
    // 判别使命是否是重复履行
    if !_compareAndSwapState(__NSOperationState.initialized, __NSOperationState.starting) && !(__NSOperationState.starting == state && __queue != nil) {
        switch state {
        case .executing: fallthrough
        case .finishing:
            fatalError("(self): receiver is already executing")
        default:
            fatalError("(self): something is trying to start the receiver simultaneously from more than one thread")
        }
    }
    // 使命状况是已入队,可是还没有ready,就开端履行,也要报错
    if state.rawValue < __NSOperationState.enqueued.rawValue && !isReady {
        _state = state
        fatalError("(self): receiver is not yet ready to execute")
    }
    // 判别使命是否已撤销
    let isCanc = _isCancelled
    if !isCanc {
        // 使命未撤销,状况改成executing开端履行
        _state = .executing
        Operation.observeValue(forKeyPath: _NSOperationIsExecuting, ofObject: self)
        // 假如使命放到了行列中,就调用行列的_execute办法,不然直接履行main办法
        _queue?._execute(self) ?? main()
    }
    // 完毕使命
    if __NSOperationState.executing == _state {
        _state = .finishing
        Operation.observeValue(forKeyPath: _NSOperationIsExecuting, ofObject: self)
        Operation.observeValue(forKeyPath: _NSOperationIsFinished, ofObject: self)
    } else {
        _state = .finishing
        Operation.observeValue(forKeyPath: _NSOperationIsFinished, ofObject: self)
    }
}

KVO监控处理

internal static func observeValue(forKeyPath keyPath: String, ofObject op: Operation) {
    enum Transition {
        case toFinished
        case toExecuting
        case toReady
    }
    // 依据keyPath来设置kind
    let kind: Transition?
    if keyPath == _NSOperationIsFinished || keyPath == _NSOperationIsFinishedAlternate {
        kind = .toFinished
    } else if keyPath == _NSOperationIsExecuting || keyPath == _NSOperationIsExecutingAlternate {
        kind = .toExecuting
    } else if keyPath == _NSOperationIsReady || keyPath == _NSOperationIsReadyAlternate {
        kind = .toReady
    } else {
        kind = nil
    }
    if let transition = kind {
        switch transition {
        case .toFinished: // we only care about NO -> YES
            // 触发了isFinished KVO
            if !op.isFinished {
                // 误触发,isFinished回来NO,先回来
                return
            }
            // 以及ready的依靠使命
            var ready_deps = [Operation]()
            op._lock()
            let state = op._state
            // 打印警告信息,使命还未开端履行,就现已设置了isFinished KVO
            if op.__queue != nil && state.rawValue < __NSOperationState.starting.rawValue {
                print("*** (type(of: op)) (Unmanaged.passUnretained(op).toOpaque()) went isFinished=YES without being started by the queue it is in")
            }
            if state.rawValue < __NSOperationState.finishing.rawValue {
                op._state = .finishing
            } else if state == .finished {
                op._unlock()
                return
            }
            let down_deps = op.__downDependencies
            op.__downDependencies.removeAll()
            if 0 < down_deps.count {
                for down in down_deps {
                    let idown = down.contents.takeUnretainedValue()
                    idown._lock()
                    if idown._unfinishedDependencyCount == 1 {
                        ready_deps.append(idown)
                    } else if idown._unfinishedDependencyCount > 1 {
                        idown._decrementUnfinishedDependencyCount()
                    } else {
                        assert(idown._unfinishedDependencyCount  == 0)
                        assert(idown._isCancelled == true)
                    }
                    idown._unlock()
                }
            }
            // 使命完毕,修正状况
            op._state = .finished
            // 重置使命行列引证
            let oq = op.__queue
            op.__queue = nil
            // 是否使命锁
            op._unlock()
            if 0 < ready_deps.count {
                for down in ready_deps {
                    down._lock()
                    if down._unfinishedDependencyCount >= 1 {
                        down._decrementUnfinishedDependencyCount()
                    }
                    down._unlock()
                    Operation.observeValue(forKeyPath: _NSOperationIsReady, ofObject: down)
                }
            }
            // waitUntilFinished办法能够回来了
            op.__waitCondition.lock()
            op.__waitCondition.broadcast()
            op.__waitCondition.unlock()
            // 假如设置__completion回调,这异步履行一下回调
            if let complete = op.__completion {
                let held = Unmanaged.passRetained(op)
                DispatchQueue.global(qos: .default).async {
                    complete()
                    held.release()
                }
            }
            // 从使命行列中移除现已完结的使命
            if let queue = oq {
                queue.takeUnretainedValue()._operationFinished(op, state)
                queue.release()
            }
        case .toExecuting:
            let isExecuting = op.isExecuting
            op._lock()
            // 更新状况为正在履行
            if op._state.rawValue < __NSOperationState.executing.rawValue && isExecuting {
                op._state = .executing
            }
            op._unlock()
        case .toReady:
            // 使命现已ready,调度履行一下使命
            let r = op.isReady
            op._cachedIsReady = r
            let q = op._queue
            if r {
                q?._schedule()
            }
        }
    }
}

总结

以上便是Operation的核心处理流程,通读一遍源代码,对了解Operation Queue有非常大的协助。