您的位置:首页 > 移动开发 > Swift

函数响应编程&RxSwift核心逻辑 下

2020-02-18 07:16 525 查看

RxSwift
RxSwift 使用详解系列

函数响应编程&RxSwift核心逻辑 上
函数响应编程&RxSwift核心逻辑 下
待续…正在努力编写
RxSwift-中介者模式
RxSwift-KVO\内存管理

介绍

在Swift环境下,作为函数式响应编程的代表,RAC的孪生兄弟RxSwift同样提供了相同的框架使用,并且基于Swift语言的优点,RxSwift甚至能够更简洁地开发业务代码。ReactiveX 官方给Rx的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,他结合了观察者模式、迭代器模式和函数式编程的精华。ReactiveX 不仅仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言。它拓展了观察者模式,使你能够自由组合多个异步事件,而不需要去关心线程,同步,线程安全,并发数据以及I/O阻塞。

RxSwift 是 ReactiveX 家族的重要一员, ReactiveX 是 Reactive Extensions 的缩写,一般简写为Rx。

函数响应编程

代码片
.

// swift 可选性 + 类 + 枚举 + 结构体 + 协议 + 泛型
array.filter{ $0 > 3}
.filter{ ($0+1) % 2 == 0 }
.forEach { print($0) }


一、RxSwift核心重点

代码片
.

  • Observable - 可观察序列
    = Sequence - 序列
    = Observable - 的生命周期 核心逻辑
    = create - 创建序列
    = subscribe - 订阅序列
    = onNext - 发送信号

(1)Observable - 可监听序列

可观察序列 - sequence

无限 - 有穷 next

(2)核心逻辑
代码块

// UI -> target - event
// 1:创建序列
//AnonymousObservable\AnonymousObserver保存了外界的 onNext, onError , onCompleted , onDisposed 的处理回调闭包的调用
// AnonymousObservable -> producer.subscriber -> run
// 保存闭包  - 函数式 保存 _subscribeHandler
let ob = Observable<Any>.create { (obserber) -> Disposable in
// 3:发送信号
obserber.onNext("001")
obserber.onCompleted()
return Disposables.create()
}
// 2:订阅信号
// AnonymousObserver  - event .next -> onNext()
// _eventHandler
// AnonymousObservable._subscribeHandler(observer)
// 销毁
let _ = ob.subscribe(onNext: { (text) in
print("订阅到:\(text)")
}, onError: { (error) in
print("error: \(error)")
}, onCompleted: {
print("完成")
}) {
print("销毁")
}
// A { C 从B哪里来的 }
// create -
// C obserber
// B subscriber
// c的创建 在B里面 -
// 传递C -> A : A(C)
// B ( A(C) ) { }

导图

源码分析

/////// 创建序列:(http://reactivex.io/documentation/operators/create.html)
/**
Creates an observable sequence from a specified subscribe method implementation.

- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)

- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> Disposable) -> RxSwift.Observable<Self.E>

----------------------------------

//源码实现:create.swift类--->>AnonymousObservable-->> 保存闭包(函数式编程)
//
// 保存闭包(函数式编程) _subscribeHandler
//
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

let _subscribeHandler: SubscribeHandler

//保存闭包(函数式编程)
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}

override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
/////////// 2:订阅信号 --> 核心代码
/**
Subscribes an element handler, an error handler, a completion handler and disposed handler to an observable sequence.

- parameter onNext: Action to invoke for each element in the observable sequence.
- parameter onError: Action to invoke upon errored termination of the observable sequence.
- parameter onCompleted: Action to invoke upon graceful termination of the observable sequence.
- parameter onDisposed: Action to invoke upon any type of termination of sequence (if the sequence has
gracefully completed, errored, or if the generation is canceled by disposing subscription).
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable

if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}

#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif

let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []

//**
//** 保存_eventHandler 事件
//** let observer = AnonymousObserver
//** 闭包 -- 逃逸闭包(){}
//**
let observer = AnonymousObserver<E> { event in

#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif

switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}

核心总结

  • 点赞
  • 收藏
  • 分享
  • 文章举报
J-M 发布了3 篇原创文章 · 获赞 0 · 访问量 67 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: