RxSwift 三大支柱:Observable 可观察对象,操作符 Operator 和线程调度 Schedulers
本文手把手,看下 RxSwift 是怎么实现操作符 Operator 的
例子,是 sample 采样操作符
我们这样使用 sample
// firstSubject 是输入事件流 // secondSubject 是一个控制事件流 let firstSubject = PublishSubject<String>() let secondSubject = PublishSubject<String>() firstSubject.sample(secondSubject).subscribe(onNext: { (content) in print(content) }).disposed(by: rx.disposeBag) // 输入事件流,有了事件 firstSubject.onNext("1") // 控制事件流,去触发输入事件流的事件 secondSubject.onNext("A") 复制代码
下面的代码表明,调用 .sample
, 是一层封装,方便组合与调用
采样实际的实现逻辑,全部在创建的类 Sample
firstSubject.sample(secondSubject)
, 输入两个 observable,输出一个新的 observable .
extension ObservableType { public func sample<Source: ObservableType>(_ sampler: Source) -> Observable<Element> { return Sample(source: self.asObservable(), sampler: sampler.asObservable()) } } 复制代码
Sample 类,是事件生产者类 Producer 的子类
前半段,很简单,弄两属性,
firstSubject.sample(secondSubject)
, 就做了一个 Sample 的初始化,把 firstSubject
和 secondSubject
, 交给这两属性持有
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element
,这方法比较核心,里面完成了 firstSubject.sample(secondSubject)
的两个事件输入流的管理
起作用的是,操作符对应的若干 Sink 类
final private class Sample<Element, SampleType>: Producer<Element> { fileprivate let _source: Observable<Element> fileprivate let _sampler: Observable<SampleType> init(source: Observable<Element>, sampler: Observable<SampleType>) { self._source = source self._sampler = sampler } override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element { let sink = SampleSequenceSink(parent: self, observer: observer, cancel: cancel) let subscription = sink.run() return (sink: sink, subscription: subscription) } } 复制代码
final private class SampleSequenceSink<Observer: ObserverType, SampleType> : Sink<Observer>
上一步的 let subscription = sink.run()
,run()
的部分
func run() -> Disposable { self._sourceSubscription.setDisposable(self._parent._source.subscribe(self)) let samplerSubscription = self._parent._sampler.subscribe(SamplerSink(parent: self)) return Disposables.create(_sourceSubscription, samplerSubscription) } 复制代码
self._parent._source.subscribe(self)
,建立订阅,self._parent._source
是 , firstSubject.sample(secondSubject)
中的 firstSubject
self._parent._sampler.subscribe(SamplerSink(parent: self)
, 继续建立订阅self._parent._sampler
是, firstSubject.sample(secondSubject)
中的 secondSubject
self._sourceSubscription.setDisposable
和return Disposables.create(_sourceSubscription, samplerSubscription)
, 是内存管理方面
管理 subscribe 订阅到的对象 ObservableType
final private class SamplerSink<Observer: ObserverType, SampleType>
SamplerSink
主要涉及 firstSubject.sample(secondSubject)
中的 secondSubject
的事件处理逻辑,采样时机控制
SampleSequenceSink
主要涉及 firstSubject.sample(secondSubject)
中的 firstSubject
的事件处理逻辑,输入事件状态管理
套路三步,有些弯弯绕绕
sink, 事件的下沉,套路三步的底部
粗点讲:
Sink 是从 Observable , 事件流 event sequence 中取出事件,取出事件含有的值的一种方式
Observable 里面的事件 Event, 不能够直接取出来,必须通过订阅,才能读取
操作符 Operator 处理, 可观察对象 Observable
Observable 起作用的方式,是 subscribe 订阅出事件 Event
Observable 事件流,就是传递事件 Event 的
Event 是一个枚举,有三种状态
有事情, .next(value)
没事情,分两种情况,正常结束 gg , .completed
和出错凉凉 .error(error)
public enum Event<Element> { /// 生产出下一个事件 case next(Element) /// 事件流结束,失败 case error(Swift.Error) /// 事件流完成,成功 case completed } 复制代码
firstSubject.sample(secondSubject)
, 从输入两枚,到输出一个
Observable
的事件,处理下,输出需要的 Observable
已经被弃用的
Variable
, 掏出.next(value)
中的value
, 比较简单, 事情想来就来,不符合响应式规范
Observable
的 Event 事件取出,只能通过订阅的方式 subscribe
下面是用例,继续 sample
SampleSequenceSink
里面的订阅逻辑处理self._parent._source.subscribe(self) 复制代码
firstSubject.sample(secondSubject)
中的 firstSubject
, 输入事件流,被采样的对象,
因为 firstSubject
订阅了 self
,即 SampleSequenceSink
类的对象
func _synchronized_on(_ event: Event<Element>)
中的 Event,就是 firstSubject
的事件
firstSubject
流入的 .next
正常事件,仅仅用来状态处理,其最新事件的值 element ,交给 _element
去记录
他流入的 .completed
正常结束事件,也是用来状态处理,记录在内部属性 _atEnd
他流入的 .error
出错了,异常结束事件,转发出去,
forwardOn
就是事件交给下一个,事件被输出了
self.dispose()
, 相关对象被置为 nil, 事件流被立即终止
func _synchronized_on(_ event: Event<Element>) { switch event { case .next(let element): self._element = element case .error: self.forwardOn(event) self.dispose() case .completed: self._atEnd = true self._sourceSubscription.dispose() } } 复制代码
SamplerSink
里面的订阅逻辑处理SampleSequenceSink
里面 run
中, self._parent._sampler.subscribe(SamplerSink(parent: self))
firstSubject.sample(secondSubject)
中的 secondSubject
, 控制事件流,负责采样的,
订阅了 SamplerSink
的对象,
决定了 SamplerSink
的 func _synchronized_on(_ event: Event<Element>)
方法中流入的是 secondSubject
采样事件流
secondSubject
流入的 .next
正常事件,和 .completed
正常结束事件,一道处理了_parent._element
,其中 _parent
是 SampleSequenceSink
类的对象,
_parent._element
,就是 SampleSequenceSink
类记录 firstSubject
流入的 .next
正常事件最新的值 value
采样事件来了,就把输入事件最新的值,转发到输出的事件流 ,forwardOn
一下
再把 _parent._element
置为 nil, 这样不会重复采样
于此,.sample
操作符的实现逻辑,大致出来了
case .next, .completed:
等同于 case .next(let _), .completed:
, secondSubject
采样流事件的值,没有处理
self._parent._atEnd
,上一步记录的输入流终止了,采样流又来,
这个时候转发终止事件到输出,做一个内存释放
.error
出错了,异常结束事件,转发出去,事件流结束,内存释放
func _synchronized_on(_ event: Event<Element>) { switch event { case .next, .completed: if let element = _parent._element { self._parent._element = nil self._parent.forwardOn(.next(element)) } if self._parent._atEnd { self._parent.forwardOn(.completed) self._parent.dispose() } case .error(let e): self._parent.forwardOn(.error(e)) self._parent.dispose() } } 复制代码
SampleSequenceSink
的直接输入是, firstSubject.sample(secondSubject)
中的输入事件流 firstSubject
SampleSequenceSink
的直接输出是 class Sample
SampleSequenceSink
做的事情主要是记录输入事件流 firstSubject
最新的状态,最近事件的值与是否结束
SamplerSink
的直接输入是, firstSubject.sample(secondSubject)
中的采样时机控制secondSubject
采样流到了,他就 self._parent.forwardOn
,
self._parent
即 SampleSequenceSink
的对象
self._parent.forwardOn
, 就是控制住了输入流 firstSubject 到输出流的时机
Producer
类的实现Producer
Producer
类是 Observable<Element>
可观察对象的子类, 可观察即可供订阅
Producer
类的代码也很简单,
初始化,重写了 class Observable<Element>
的订阅方法 public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
重写订阅方法 func subscribe(_ observer: Observer)
, 指的是走该订阅方法,就要走 run
方法,还做了个内存管理
不能直接调用, func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element
Producer
的子类,各种操作符对应的类,一定要重写自己的 func run(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable)
class Producer<Element> : Observable<Element> { override init() { super.init() } // 该方法有简化,省略了线程管理相关 override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element { let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element { // 这个方法就是报错 // fatalError(lastMessage(), file: file, line: line) rxAbstractMethod() } } 复制代码
调用 public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable
,
先建立订阅 self.asObservable().subscribe(observer)
,
再处理事件,
AnonymousObserver
匿名观察者,我们调用 subscribe(onNext: (
传入的想做的事情,作为匿名函数,也就是闭包,都交给 AnonymousObserver 类了
let observer = AnonymousObserver<Element> { event in switch event { case .next(let value): onNext?(value) case .error(let error): if let onError = onError { onError(error) } disposable.dispose() case .completed: onCompleted?() disposable.dispose() } } 复制代码
AnonymousObserver
是 class ObserverBase<Element>
的子类
他的实现也很简单
他有一个闭包的属性,用来接我们传入的匿名函数。
重写了事件调用方法 func onCore(
, 将我们传入的闭包执行了
final class AnonymousObserver<Element>: ObserverBase<Element> { typealias EventHandler = (Event<Element>) -> Void private let _eventHandler : EventHandler init(_ eventHandler: @escaping EventHandler) { self._eventHandler = eventHandler } override func onCore(_ event: Event<Element>) { return self._eventHandler(event) } } 复制代码
可观察基类 ObserverBase
类,是一个事件处理类
他的实现也简洁
RxSwift 大量使用泛型,
他不持有事件,他只能处理对应的事件
class ObserverBase<Element> : Disposable, ObserverType { private let _isStopped = AtomicInt(0) // 事件处理, 有一个锁机制,保证事件处理的线程安全 func on(_ event: Event<Element>) { switch event { case .next: if load(self._isStopped) == 0 { self.onCore(event) } case .error, .completed: if fetchOr(self._isStopped, 1) == 0 { self.onCore(event) } } } // 实际事件处理,子类必须重写该方法 func onCore(_ event: Event<Element>) { rxAbstractMethod() } // 内存管理 func dispose() { fetchOr(self._isStopped, 1) } } 复制代码
一般这样弄了,实际发生的步骤简单理下
firstSubject.sample(secondSubject).subscribe(onNext: { (content) in print(content) }) 复制代码
写下 .subscribe(onNext:
,
就走 Sample
类的
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element { let sink = SampleSequenceSink(parent: self, observer: observer, cancel: cancel) let subscription = sink.run() return (sink: sink, subscription: subscription) } 复制代码
let sink = SampleSequenceSink(parent: self, observer: observer, cancel: cancel)
里面的 observer,
就是 AnonymousObserver<Element> { event in }
每一次 self._parent.forwardOn(.error(e))
,
都会调用其父类 class Sink<Observer: ObserverType>
的 self._observer.on(event)
self._observer
还是 AnonymousObserver<Element> { event in }
大佬高深的技术,努力拆分,都是好理解的常识
withLatestFrom
withLatestFrom
和 Sample
很像,
区别是,
withLatestFrom
, 前面是控制取事件,后面是输入事件Sample
后面控制采样,前面是输入事件
withLatestFrom
, 前面的控制事件,可以反复取输入事件的最新值Sample
, 输入事件的最新值,只支持一次采样
withLatestFrom
, 两个输入事件的值都可用Sample
,采样事件的值,不可用
这样调用
// firstSubject 是控制事件流 // secondSubject 是输入事件流 let firstSubject = PublishSubject<String>() let secondSubject = PublishSubject<String>() firstSubject.withLatestFrom(secondSubject){ (first, second) in return first + second } .subscribe(onNext: { print($0) }) .disposed(by: rx.disposeBag) // 控制事件流,没输入, miss 掉 firstSubject.onNext("1") // 输入事件流,有了事件 secondSubject.onNext("A") // 控制事件流,去触发输入事件流的事件 firstSubject.onNext("2") 复制代码
withLatestFrom
函数到类 WithLatestFrom
public func withLatestFrom<Source: ObservableConvertibleType, ResultType>(_ second: Source, resultSelector: @escaping (Element, Source.Element) throws -> ResultType) -> Observable<ResultType> { return WithLatestFrom(first: self.asObservable(), second: second.asObservable(), resultSelector: resultSelector) } 复制代码
WithLatestFrom
到实现层的 Sink下面的代码,看看就明白了
final private class WithLatestFrom<FirstType, SecondType, ResultType>: Producer<ResultType> { typealias ResultSelector = (FirstType, SecondType) throws -> ResultType fileprivate let _first: Observable<FirstType> fileprivate let _second: Observable<SecondType> fileprivate let _resultSelector: ResultSelector init(first: Observable<FirstType>, second: Observable<SecondType>, resultSelector: @escaping ResultSelector) { self._first = first self._second = second self._resultSelector = resultSelector } override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == ResultType { let sink = WithLatestFromSink(parent: self, observer: observer, cancel: cancel) let subscription = sink.run() return (sink: sink, subscription: subscription) } } 复制代码
func run() -> Disposable
和 func _synchronized_on(_ event: Event<Element>)
WithLatestFromSink
,func run() -> Disposable
先看self._parent._first.subscribe(self)
表明,WithLatestFromSink
管理 firstSubject.withLatestFrom(secondSubject)
中的控制事件流,
self._parent._second.subscribe(sndO)
表明 WithLatestFromSecond
管理输入事件流
func run() -> Disposable { let sndSubscription = SingleAssignmentDisposable() let sndO = WithLatestFromSecond(parent: self, disposable: sndSubscription) sndSubscription.setDisposable(self._parent._second.subscribe(sndO)) let fstSubscription = self._parent._first.subscribe(self) return Disposables.create(fstSubscription, sndSubscription) } 复制代码
func _synchronized_on(_ event: Event<Element>)
再看代码简洁,
self._latest
就是记录的输入事件流最新的值,
.next
事件里面,firstSubject.withLatestFrom(secondSubject)
中 firstSubject 的事件来了,如果之前有 secondSubject 事件的值,就处理掉。
没有就,不用管
func _synchronized_on(_ event: Event<Element>) { switch event { case let .next(value): guard let latest = self._latest else { return } do { let res = try self._parent._resultSelector(value, latest) self.forwardOn(.next(res)) } catch let e { self.forwardOn(.error(e)) self.dispose() } case .completed: self.forwardOn(.completed) self.dispose() case let .error(error): self.forwardOn(.error(error)) self.dispose() } } 复制代码
WithLatestFromSecond
,就是看事件处理,输入事件流 firstSubject.withLatestFrom(secondSubject)
中的 secondSubject,
其事件,出错就转发,.next
事件,就取值记录到主 Sink
类 WithLatestFromSink
其余与前面 .sample
的分析类似
func _synchronized_on(_ event: Event<Element>) { switch event { case let .next(value): self._parent._latest = value case .completed: self._disposable.dispose() case let .error(error): self._parent.forwardOn(.error(error)) self._parent.dispose() } } 复制代码
稍微改造下 withLatestFrom
的代码,就是 sampleWithFrom
, sample
加强版
WithLatestFromSink
的事件流方法加一句 self._latest = nil
, 就好了
记录的输入事件流的最新值,一次可用。之前是,反复可用
func _synchronized_on(_ event: Event<Element>) { switch event { case let .next(value): guard let latest = self._latest else { return } do { self._latest = nil let res = try self._parent._resultSelector(value, latest) self.forwardOn(.next(res)) } catch let e { self.forwardOn(.error(e)) self.dispose() } 复制代码
至此, sampleWithFrom
和 sample
的采样次序不一致,
更改参数位置,很简单
这些自定制的 Operator,是给定的 Observable 后面加几个函数,就好了
没什么状态管理,要用到类
源代码要拖到相关 Pods 里面,编译使用