您的位置:首页 > Web前端 > React

ReactiveCocoa2 源码浅析

2015-11-04 13:45 661 查看

http://m.blog.csdn.net/blog/womendeaiwoming/48036725


ReactiveCocoa2 源码浅析

标签(空格分隔): ReactiveCocoa iOS Objective-C

  (鄙视下CSDN的代码样式,这里的看起来好点,可惜对目录支持得不好)

  • 开车不需要知道离合器是怎么工作的,但如果知道离合器原理,那么车子可以开得更平稳。

  ReactiveCocoa 是一个重型的 FRP 框架,内容十分丰富,它使用了大量内建的 block,这使得其有强大的功能的同时,内部源码也比较复杂。本文研究的版本是2.4.4,小版本间的差别不是太大,无需担心此问题。 这里只探究其核心 
RACSignal
 源码及其相关部分。本文不会详细解释里面的代码,重点在于讨论那些核心代码是 
怎么来
 的。文本难免有不正确的地方,请不吝指教,非常感谢。


@protocol RACSubscriber

  信号是一个异步数据流,即一个将要发生的以时间为序的事件序列,它能发射出三种不同的东西:
value
error
completed
。咱们能异步地捕获这些事件:监听信号,针对其发出的三种东西进行操作。“监听”信息的行为叫做 订阅(subscriber)。我们定义的操作就是观察者,这个被“监听”的信号就是被观察的主体(subject) 。其实,这正是“观察者”设计模式! 

   

  RAC 针对这个订阅行为定义了一个协议:RACSubscriber。RACSubscriber 协议是与 RACSignal 打交道的唯一方式。咱们先不探究 RACSignal 的内容,而是先研究下 RACSubscriber 是怎么回事。 

   

  先来看下 RACSubscriber 的定义:
// 用于从 RACSignal 中直接接收 values 的对象
@protocol RACSubscriber <NSObject>
@required

/// 发送下一个 value 给 subscribers。value 可以为 nil。
- (void)sendNext:(id)value;

/// 发送 error 给 subscribers。 error 可以为 nil。
///
/// 这会终结整个订阅行为,而且接下来也无法再订阅任何信号了。
- (void)sendError:(NSError *)error;

/// 发送 completed 给 subscribers。
///
/// 这会终结整个订阅行为,而且接下来也无法再订阅任何信号了。
- (void)sendCompleted;

/// 现在重要的是上面三个,先别管这个,忽略掉。
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;

@end


1、NLSubscriber

  咱们自己来实现这个协议看看(本文自定义的类都以 “NL” 开头,以视区别):
// NLSubscriber.h
@interface NLSubscriber : NSObject <RACSubscriber>
@end

// NLSubscriber.m
@implementation NLSubscriber

- (void)sendNext:(id)value {
NSLog(@"%s value:%@", sel_getName(_cmd), value);
}

- (void)sendCompleted {
NSLog(@"%s", sel_getName(_cmd));
}

- (void)sendError:(NSError *)error {
NSLog(@"%s error:%@", sel_getName(_cmd), error);
}

- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable {
// to nothing
}

@end


  现在咱们这个类只关心 
sendNext:
 、 
sendError:
 和 
sendCompleted
。本类的实现只是简单的打印一些数据。那怎么来使用这个订阅者呢?
RACSignal
 类提供了接口来让实现了
RACSubscriber
 协议的订阅者订阅信号:
@interface RACSignal (Subscription)
/*
*  `subscriber` 订阅 receiver 的变化。由 receiver 决定怎么给 subscriber 发送事件。
*简单来说,就是由这个被订阅的信号来给订阅者 subscriber 发送 `sendNext:` 等消息。
*/
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber;
@end


  用定时器信号来试试看:
/**
*  @brief  创建一个定时器信号,每三秒发出一个当时日期值。一共发5次。
*/
RACSignal *signalInterval = [RACSignal interval:3.0 onScheduler:[RACScheduler mainThreadScheduler]];
signalInterval = [signalInterval take:5];
NLSubscriber *subscriber = [[NLSubscriber alloc] init];

/**
*  @brief  用订阅者 subscriber 订阅定时器信号
*/
[signalInterval subscribe:subscriber];


  下面是输出结果:
2015-08-15 17:45:02.612 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:02 +0000
2015-08-15 17:45:05.612 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:05 +0000
2015-08-15 17:45:08.615 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:08 +0000
2015-08-15 17:45:11.613 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:11 +0000
2015-08-15 17:45:14.615 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:14 +0000
2015-08-15 17:45:14.615 RACPraiseDemo[738:59818] sendCompleted


2、改进NLSubscriber

  现在的这个订阅者类 NLSubscriber 除了打印打东西外,啥也干不了,更别说复用了,如果针对所有的信号都写一个订阅者那也太痛苦了,甚至是不太可能的事。 

   

  咱们来改进一下,做到如下几点: 

  1. 实现 
RACSubscriber
 协议 

  2. 提供与 
RACSubscriber
 相 
对应的
 、
可选的
 、
可配
的接口。 

   

  没错,这正是一个适配器!

  第2点的要求可不少,那怎么才能做到这一点呢?还好,OC 中有 block !咱们可以将 
RACSubscriber
 协议中的三个方法转为三个 block:
- (void)sendNext:(id)value;           ---->    void (^next)(id value);
- (void)sendError:(NSError *)error;   ---->    void (^error)(NSError *error);
- (void)sendCompleted;                ---->    void (^completed)(void);


  改进目标和改进方向都有了,那咱们来看看改进后的的样子:
// 头文件
/**
*  @brief  基于 block 的订阅者
*/
@interface NLSubscriber : NSObject <RACSubscriber>

/**
*  @brief  创建实例
*/
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed;

@end

// 实现文件
@interface NLSubscriber ()

@property (nonatomic, copy) void (^next)(id value);
@property (nonatomic, copy) void (^error)(NSError *error);
@property (nonatomic, copy) void (^completed)(void);

@end

@implementation NLSubscriber

#pragma mark Lifecycle
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
NLSubscriber *subscriber = [[self alloc] init];

subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];

return subscriber;
}

#pragma mark RACSubscriber
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;

nextBlock(value);
}
}

- (void)sendError:(NSError *)e {
@synchronized (self) {
void (^errorBlock)(NSError *) = [self.error copy];

if (errorBlock == nil) return;
errorBlock(e);
}
}

- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];

if (completedBlock == nil) return;
completedBlock();
}
}

- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable {
// to nothing
}

@end


  现在来试试看这个改进版,还是上面那个定时器的例子:
/**
*  @brief  创建一个定时器信号,每三秒发出一个当时日期值。一共发5次。
*/
RACSignal *signalInterval = [RACSignal interval:3.0 onScheduler:[RACScheduler mainThreadScheduler]];
signalInterval = [signalInterval take:5];
NLSubscriber *subscriber = [NLSubscriber subscriberWithNext:^(id x) {
NSLog(@"next:%@", x);
} error:nil completed:^{
NSLog(@"completed");
}];

/**
*  @brief  用订阅者 subscriber 订阅定时器信号
*/
[signalInterval subscribe:subscriber];


  输出结果如下:
2015-08-15 19:50:43.355 RACPraiseDemo[870:116551] next:2015-08-15 11:50:43 +0000
2015-08-15 19:50:46.358 RACPraiseDemo[870:116551] next:2015-08-15 11:50:46 +0000
2015-08-15 19:50:49.355 RACPraiseDemo[870:116551] next:2015-08-15 11:50:49 +0000
2015-08-15 19:50:52.356 RACPraiseDemo[870:116551] next:2015-08-15 11:50:52 +0000
2015-08-15 19:50:55.356 RACPraiseDemo[870:116551] next:2015-08-15 11:50:55 +0000
2015-08-15 19:50:55.356 RACPraiseDemo[870:116551] completed


  输出结果没什么变化,但是订阅者的行为终于受到咱们的撑控了。再也不用为了一个信号而去实现 
RACSubscriber
 协议了,只需要拿出 
NLSubscriber
 这个适配器,再加上咱们想要的自定义的行为即可。如果对信号发出的某个事件不感兴趣,直接传个 nil 可以了,例如上面例子的 
error:
 ,要知道, 
RACSubscriber
 协议中的所有方法都是 
@required
 的。
NLSubscriber
 大大方便了我们的工作。 

   

  那还以再改进吗? 

  


3、RACSignal 类别之 Subscription

  有没有可能把 
NLSubscriber
 隐藏起来呢?毕竟作为一个信号的消费者,需要了解的越少就越简单,用起来也就越方便。咱们可以通过 OC 中的类别方式,给 
RACSignal
 加个类别(
nl_Subscription
),将订阅操作封装到这个信号类中。这样,对于使用这个类的客户而言,甚至不知道订阅者的存在。 

   

  
nl_Subscription
 类别代码如下:
// .h
#import "RACSignal.h"

@interface RACSignal (nl_Subscription)

- (void)nl_subscribeNext:(void (^)(id x))nextBlock;
- (void)nl_subscribeNext:(void (^)(id x))nextBlock completed:(void (^)(void))completedBlock;
- (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock;
- (void)nl_subscribeError:(void (^)(NSError *error))errorBlock;
- (void)nl_subscribeCompleted:(void (^)(void))completedBlock;
- (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock;
- (void)nl_subscribeError:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock;

@end

// .m
#import "RACSignal+nl_Subscription.h"
#import "NLSubscriber.h"

@implementation RACSignal (nl_Subscription)

- (void)nl_subscribeNext:(void (^)(id x))nextBlock {
[self nl_subscribeNext:nextBlock error:nil completed:nil];
}

- (void)nl_subscribeNext:(void (^)(id x))nextBlock completed:(void (^)(void))completedBlock {
[self nl_subscribeNext:nextBlock error:nil completed:completedBlock];
}

- (void)nl_subscribeError:(void (^)(NSError *error))errorBlock {
[self nl_subscribeNext:nil error:errorBlock completed:nil];
}

- (void)nl_subscribeCompleted:(void (^)(void))completedBlock {
[self nl_subscribeNext:nil error:nil completed:completedBlock];
}

- (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock {
[self nl_subscribeNext:nextBlock error:errorBlock completed:nil];
}

- (void)nl_subscribeError:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
[self nl_subscribeNext:nil error:errorBlock completed:completedBlock];
}

- (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
NLSubscriber *subscriber = [NLSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
[self subscribe:subscriber];
}

@end


  在这个类别中,将信号的 
next:
error:
 和 
completed
 以及这三个事件的组合都以 
block
 的形式封装起来,从以上代码中可以看出,这些方法最终调用的还是 
- (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void
(^)(void))completedBlock;
 方法,而它则封装了订阅者 
NLSubsciber
。 

   

  通过这么个小小的封装,客户使用起来就极其方便了:
/**
*  @brief  创建一个自定义的信号。
*        这个信号在被订阅时,会发送一个当前的日期值;
*        再过三秒后,再次发送此时的日期值;
*        最后,再发送完成事件。
*/
RACSignal *signalInterval = [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:[NSDate date]];

dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(3.0 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:[NSDate date]];
[subscriber sendCompleted];
});

return (id)nil;
}];

[signalInterval nl_subscribeNext:^(id x) {
NSLog(@"next:%@", x);
} error:^(NSError *error) {
NSLog(@"error:%@", error);
} completed:^{
NSLog(@"completed");
}];


  输出如下:
2015-08-16 23:29:44.406 RACPraiseDemo[653:32675] next:2015-08-16 15:29:44 +0000
2015-08-16 23:29:47.701 RACPraiseDemo[653:32675] next:2015-08-16 15:29:47 +0000
2015-08-16 23:29:47.701 RACPraiseDemo[653:32675] completed


  本例并没有采用之前的 “定时器信号”,而是自己创建的信号,当有订阅者到来时,由这个信号来决定在什么时候发送什么事件。这个例子里发送的事件的逻辑请看代码里的注释。 

   

  看到这里,是不是很熟悉了?有没有想起 
subscribeNext:
,好吧,我就是在使用好多好多次它之后才慢慢入门的,谁让 RAC 的大部分教程里面第一个讲的就是它呢! 

   

  到了这里,是不是订阅者这部分就完了呢?我相信你也注意到了,这里有几个不对劲的地方: 

   

  1. 无法随时中断订阅操作。想想订阅了一个无限次的定时器信号,无法中断订阅操作的话,定时器就是永不停止的发下去。 

   

  2. 订阅完成或错误时,没有统一的地方做清理、扫尾等工作。比如现在有一个上传文件的信号,当上传完成或上传错误时,你得断开与文件服务器的网络连接,还得清空内存里的文件数据。


4、Disposable


RACDisposable

  针对上述两个问题,RACDisposable 应运而生。也就是说 Disposable 有两个作用: 

   

  1. 中断订阅某信号 

  2. 订阅完成后,执行一些收尾任务(清理、回收等等)。

  订阅者与 Disposable 的关系: 

   

  1. 当 Disposable 有“清理”过,那么订阅者就不会再接收到这个被“清理”订阅源的任何事件。举例而言,就是订阅者 subscriberX 订阅了信号 signalA 和 signalB 两个信号,其所对应的 Disposable 分别为 disposableA 和 disposableB,也就是说 subscriberX 会同时接收来自 signalA 和 signalB 的信号。当我们手动强制 “清理” disposableA 后,subscriberX 就不会再接收来自 signalA 的任何事件;而来自
signalB 的事件则不受影响。

  2. 当订阅者 subscriberX 有接收来自任何一个信号的 “error” 或 “completed” 事件时,则不会再接收任何事件了。 

   

  可以这么说:
Disposable
 代表发生了订阅行为

  根据 Disposable 的作用和与订阅者的关系,来总结它所需要提供的接口: 

   

  1. 包含清理任务的 block ; 

  2. 执行清理任务的方法:- (void)dispose ; 

  3. 一个用来表明是否已经 “清理” 过的布尔变量:BOOL disposed 。

  咱们为这个 Disposable 也整了一个类,如下:
// .h file
/**
*  @brief  一个 disposable 封装了用于拆除、清理订阅的任务工作
*/
@interface NLDisposable : NSObject

/**
*  @brief  这个 disposable 是否已经拆除过
*/
@property (atomic, assign, getter = isDisposed, readonly) BOOL disposed;

+ (instancetype)disposableWithBlock:(void (^)(void))block;

/**
*  @brief  执行拆除工作。能多次调用这个消息,只有第一次调用有效。
*/
- (void)dispose;
@end

// .m file

@interface NLDisposable () {
/**
*  @brief  类型类似于:@property (copy) void (^disposeBlock)(void);
*       在 disposal 要执行的任务逻辑。
*       1、如果没有初妈值的话,则初始值默认为 `self`
*       2、当已经 disposal 过后,值为 NULL。
*/
void * volatile _disposeBlock;
}
@end

@implementation NLDisposable

#pragma mark Properties

- (BOOL)isDisposed {
return _disposeBlock == NULL;
}

#pragma mark Lifecycle

- (id)init {
self = [super init];
if (self == nil) return nil;

_disposeBlock = (__bridge void *)self;
OSMemoryBarrier();

return self;
}

- (id)initWithBlock:(void (^)(void))block {
NSCParameterAssert(block != nil);

self = [super init];
if (self == nil) return nil;

_disposeBlock = (void *)CFBridgingRetain([block copy]);
OSMemoryBarrier();

return self;
}

+ (instancetype)disposableWithBlock:(void (^)(void))block {
return [[self alloc] initWithBlock:block];
}

- (void)dealloc {
if (_disposeBlock == NULL || _disposeBlock == (__bridge void *)self) return;

CFRelease(_disposeBlock);
_disposeBlock = NULL;
}

#pragma mark Disposal

- (void)dispose {
/**
*  @brief  这里为了让逻辑更清晰,去掉了原子性操作。具体代码请看 RACDisposable
*/
if (_disposeBlock == NULL || _disposeBlock == (__bridge void *)self) return;

void (^disposeBlock)(void) = CFBridgingRelease((void *)_disposeBlock);
_disposeBlock = NULL;
disposeBlock();
}

@end


   

  从这个类提供的接口来看,显然是做不到 “订阅者与 Disposable 的关系” 中的第2条的。因为这条中所描述的是一个订阅者订阅多个信号,且能手动中断订阅其中一个信号的功能,而 NLDisposable 是单个订阅关系所设计的。


RACCompoundDisposable

  那怎么组织这“多个”的关系呢?数组?Good,就是数组。OK,咱们来相像一下这个方案的初步代码。每个订阅者有一个 Disposable 数组,订阅一个一个信号,则加入一个 Disposable;当手动拆除一个订阅关系时,找到与之相关的 Disposable,发送 dispose 消息,将其从数组中移除;当订阅者不能再接收消息时(接收过 
error
 或 
completed
 消息),要 dispose 数组中所有元素,接下来再加入元素时,直接给这个要加入的元素发送
dispose 消息;在多线程环境下,每一次加入或移除或其遍历时,都得加锁。。。(好吧,我编不下去了) 

   

  我** ,这么复杂,看来直接用数组来维护是不可行的了。有啥其它可行的法子没?还好,GoF 对此有个方案,叫做“组合模式”:

组合模式 允许你将对象组合成树形结构来表现 “整体/部分” 层次结构。组合能让客户以一致的方式处理个别对象以及对象组合。

  使用组合结构,我们能把相同的操作应用在组合和个别对象上。换句话说,在大多数情况下,我们可以 忽略 对象组合和个别对象之间的差别。 

   

  本文毕竟不是来讲模式的,关于这个模式更多的信息,请自行 google。 

   

  RAC 中这个组合类叫 
RACCompoundDisposable
, 咱们的叫 
NLCompoundDisposable
,来看看咱们这个类的代码:
// .h file
#import "NLDisposable.h"

/**
*  @brief  A disposable of disposables。当它 dispose 时,它会 dispose
*       它所包含的所有的 disposables。
*
*       如果本 compound disposable 已经 dispose 过后,再来调用 -addDisposable:,
*     那么其参数 disposable 会立马调用 dispose 方法。
*
*       本类中的方法说明请查看 RACCompoundDisposable 中的同名方法。
*       本类与真正的类 RACCompoundDisposable 代码差别较大,但本质是一样的
*/
@interface NLCompoundDisposable : NLDisposable

/**
*  @brief  创建并返回一个新的 compound disposable。
*/
+ (instancetype)compoundDisposable;

/**
*  @brief  创建并返回一个新的包含了 disposables 的 compound disposable。
*
*  @param disposables disposable 数组
*
*  @return 一个新的 compound disposable
*/
+ (instancetype)compoundDisposableWithDisposables:(NSArray *)disposables;

/**
*  @brief  将 disposable 加到本 compound disposable 中。如果本 compound disposable
*       已经 dispose 的话,那么参数 disposable 会被立即 dispose。
*
*          本方法是线程安全的。
*
*  @param disposable 要被加入的 disposable。如果它为 nil 的话,那么什么也不会发生。
*/
- (void)addDisposable:(NLDisposable *)disposable;

/**
*  @brief  从本 compound disposable 移除指定的 disposable(不管这个 disposable 是什么
*       状态);如果这个 disposable 不在本 compound disposable 中,则什么也不会发生。
*
*          本方法是线程安全的。
*
*  @param disposable 要被移除的 disposable。可以为 nil。
*/
- (void)removeDisposable:(NLDisposable *)disposable;

@end

// .m file
#import "NLCompoundDisposable.h"
#import <libkern/OSAtomic.h>

@interface NLCompoundDisposable () {
/**
*  @brief  同步锁
*/
OSSpinLock _spinLock;

/**
*  @brief  本 compound disposable 所包含的 disposables。
*
*          在操作这个数组时,应该使用 _spinLock 进行同步。如果
*       `_disposed` 为 YES,则这个数组可能为 nil。
*/
NSMutableArray *_disposables;

/**
*  @brief  本 compound disposable 是否已经 dispose 。
*
*          在操作这个变量时,应该使用 _spinLock 进行同步。
*/
BOOL _disposed;
}

@end

@implementation NLCompoundDisposable

#pragma mark Properties
- (BOOL)isDisposed {
OSSpinLockLock(&_spinLock);
BOOL disposed = _disposed;
OSSpinLockUnlock(&_spinLock);

return disposed;
}

#pragma mark Lifecycle
+ (instancetype)compoundDisposable {
return [[self alloc] initWithDisposables:nil];
}

+ (instancetype)compoundDisposableWithDisposables:(NSArray *)disposables {
return [[self alloc] initWithDisposables:disposables];
}

- (id)initWithDisposables:(NSArray *)otherDisposables {
self = [self init];
if (self == nil) return nil;

if ([otherDisposables count]) {
_disposables = [NSMutableArray arrayWithArray:otherDisposables];
}

return self;
}

- (id)initWithBlock:(void (^)(void))block {
NLDisposable *disposable = [NLDisposable disposableWithBlock:block];
return [self initWithDisposables:@[ disposable ]];
}

- (void)dealloc {
_disposables = nil;
}

#pragma mark Addition and Removal
- (void)addDisposable:(NLDisposable *)disposable {
NSCParameterAssert(disposable != self);
if (disposable == nil || disposable.disposed) return;

BOOL shouldDispose = NO;

OSSpinLockLock(&_spinLock);
{
if (_disposed) {
shouldDispose = YES;
} else {
if (_disposables == nil) {
_disposables = [NSMutableArray array];
}
[_disposables addObject:disposable];
}
}
OSSpinLockUnlock(&_spinLock);

if (shouldDispose) {
[disposable dispose];
}
}

- (void)removeDisposable:(NLDisposable *)disposable {
if (disposable == nil) return;

OSSpinLockLock(&_spinLock);
{
if (!_disposed) {
if (_disposables != nil) {
[_disposables removeObject:disposable];
}
}
}
OSSpinLockUnlock(&_spinLock);
}

#pragma mark RACDisposable
- (void)dispose {
NSArray *remainingDisposables = nil;

OSSpinLockLock(&_spinLock);
{
_disposed = YES;
remainingDisposables = _disposables;
_disposables = nil;
}
OSSpinLockUnlock(&_spinLock);

if (remainingDisposables == nil) return;

[remainingDisposables makeObjectsPerformSelector:@selector(dispose)];
}

@end


RACScheduler 简介

  本文不打算研究 RACScheduler 源码,但其又是 RAC 中不可或缺的一个组件,在研究 RACSignal 的源码时不可避免地会遇到它,所以对其作下介绍还是有必要的。其实它的源码并不复杂,可自行研究。

  ReactiveCocoa 中 RACSignal 发送的所有事件的传递交给了一个特殊的框架组件——调度器,即 RACScheduler 类簇(类簇模式稍后介绍)。调度器是为了简化 同步/异步/延迟 事件传递 以及 取消预定的任务(scheduded actions) 这两种 RAC 中常见的动作而提出来的。“事件传递” 简单而言就是些 blocks,RACScheduler 所做的就是:调度这些 blocks (schedule blokcs,还是英文的意思准确些)。我们可以通过那些调度方法所返回的 
RACDisposable
 对象来取消那些
scheduling blocks。 

   

  正如前面所说,RACScheduler 是一个类簇。咱们来看看几种具体的调度器:


RACImmediateScheduler

  这是 RAC 内部使用的私有调度器,只支持同步 scheduling。就是简单的马上执行 block。这个调试器的延迟 scheduling 是通过调用 
-[NSThread sleepUntilDate:]
 来阻塞当前线程来达到目的的。显然,这样一个调度器,没法取消 scheduling,所以它那些方法返回的 disposables 啥也不会做(实际上,它那些 scheduling 方法返回的是nil)。


RACQueueScheduler

  这个调度器使用 GCD 队列来 scheduling blocks。如果你对 GCD 有所了解的话,你会发现这个调度器的功能很简单,它只是在 GCD 队列 dispatching blocks 上的简单封装罢了。


RACSubscriptionScheduler

  这是另一个内部使用的私有调度器。如果当前线程有调度器(调度器可以与线程相关联起来:associated)那它就将 scheduling 转发给这个线程的调度器;否则就转发给默认的 background queue 调试器。


接口

  调试器有下面一些方法:
- (RACDisposable *)schedule:(void (^)(void))block;

- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block;

- (RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block;

- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block;


  scheduling block 如下:
RACDisposable *disposable = [[RACScheduler mainThreadScheduler] afterDelay:5.0 schedule:^{
// do something
}];

// 如果你想要取消 scheduling block
[disposable dispose]; // block scheduling 被取消了,不会再被执行。


5、Subscriber 和 Disposable

  前面介绍了 Disposable 的来源,现在来研究下怎么使用它。还记得吗,订阅者与信号打交道的唯一方式是 
RACSignal
 中的一个方法:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber;


自定义信号所对应的类是 
RACDynamicSignal
RACSignal
 采用的是类簇模式。除自定义信号之外还有几种其它的信号,之后会研究到。OC 中的 
NSNumber
 用的就是类簇模式。类簇是Foundation框架中广泛使用的设计模式。类簇将一些私有的、具体的子类组合在一个公共的、抽象的超类下面,以这种方法来组织类可以简化一个面向对象框架的公开架构,而又不减少功能的丰富性。

  咱们来研究一下自定义信号里的这个方法的实现。这个方法实现的难处在于:“一个订阅者可以订阅多个信号,并可以手动拆除其中任何一个订阅”。针对这个问题,提出了上节讲到的 
RACDisposable
。也就是说,在每一次订阅时,都会返回一个与这次订阅相关的 
Disposable
,那怎么做到这一点呢?

  给订阅者添加一个 
CompoundDisposable
 类型的属性 (毕竟 
CompoundDisposable
 就是用来针对多个 
Disposable
 的统一管理而存在的),然后在每一次订阅时,都加一个 
Disposable
到这个属性里,行不行?但很可惜,订阅者是一个协议 
protocol RACSubscriber
,而不是一个具体的类,咱们在使用到它时,都是别人实现了这个协议的类的对象,所以咱们不太可能做到说给这么一个未知的类添加一个属性。

事实上,RAC 中确实有 
RACSubscriber
 这么一个私有类(它是咱们第一个自定义类 
NLSubscriber
 的原型),咱们叫它做 
class RACSubscriber
。嗯,
class RACSusbscriber
实现了 
protocol RACSubscriber
 协议:
@interface RACSubscriber : NSObject <RACSubscriber>
。有没有想到 
class
NSObject
 和 
protocol NSObject
 ?虽然它们形式上确实很像,但千万别混为一谈。RAC 中的其它实现了 
protocol RACSubscriber
 协议的订阅者类可没有一个继承自 
class RACSubscriber
 的。

  咱们可以用装饰模式来解决这个问题

装饰模式。在不必改变原类文件和使用继承的情况下,动态地扩展一个对象的功能。


订阅者装饰器 RACPassthroughSubscriber

  在订阅者每一次订阅信号时产生一个 
Disposable
,并将其与此次订阅关联起来,这是通过装饰器 
RACPassthroughSubscriber
 来做到的。这个装饰器的功能: 

   

  1. 包装真正的订阅者,使自己成为订阅者的替代者。 

   

  2. 将真正的订阅者与一个订阅时产生的 
Disposable
 关联起来。 

   

  这正是一个装饰器所应该做的。依之前的,咱们来模仿这个装饰器,新建一个咱们的装饰器:
NLPassthroughSubscriber
,来看下它的代码:
// .f file
@class RACCompoundDisposable;
@class RACSignal;

/**
*  @brief  这是一个订阅者的装饰器。在没有 dispose 时,它会把接收到的所有
*       的事件都转发给真实的订阅者。
*/
@interface NLPassthroughSubscriber : NSObject <RACSubscriber>

/**
*  @brief  初始化方法
*
*  @param subscriber 被包装的真实的订阅者,本装饰器会把接收到的所有的事件都转发给这个订阅者。不能为 nil
*  @param signal     要发送事件给这个装饰器的信号。
*  @param disposable 当这个 disposable 接收到 dispose 消息后,将不会再转发事件。不能为 nil
*
*  @return 返回一个初始化后的 passthrough subscriber
*/
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable;

@end

// .m file
@interface NLPassthroughSubscriber ()

// 被转发事件的订阅者。
@property (nonatomic, strong, readonly) id<RACSubscriber> innerSubscriber;

// 要发送事件给本装饰器的信号
@property (nonatomic, unsafe_unretained, readonly) RACSignal *signal;

// 代表当前订阅关系的 disposable。当它 dispose 后,将不会再转发任何事件给 `innerSubscriber`。
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;

@end

@implementation NLPassthroughSubscriber

#pragma mark Lifecycle

- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
NSCParameterAssert(subscriber != nil);

self = [super init];
if (self == nil) return nil;

_innerSubscriber = subscriber;
_signal = signal;
_disposable = disposable;

/**
*  告诉订阅者:发生了订阅行为。并将这次订阅行为相关的 `Disposable` 传给订阅者。
*/
[self.innerSubscriber didSubscribeWithDisposable:self.disposable];
return self;
}

#pragma mark RACSubscriber

- (void)sendNext:(id)value {
/**
*  如果 disposable 已经 dispose 过,就不再转发事件
*/
if (self.disposable.disposed) return;

/**
*  转发 next 事件
*/
[self.innerSubscriber sendNext:value];
}

- (void)sendError:(NSError *)error {
if (self.disposable.disposed) return;

[self.innerSubscriber sendError:error];
}

- (void)sendCompleted {
if (self.disposable.disposed) return;

[self.innerSubscriber sendCompleted];
}

- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable {
if (disposable != self.disposable) {
[self.disposable addDisposable:disposable];
}
}

@end


自定义信号 RACDynamicSignal 的订阅方法 subscribe

  咱们来看看 
RACDynamicSignal
 是怎么来使用 
RACPassthroughSubscriber
 的,这里就不自己写代码了,直接上它的代码:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);

/**
*  本次订阅相关 disposable。本方法的返回值,起 拆除 本次订阅的作用。
*/
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

/**
* 订阅者装饰器。
*/
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

/**
*  _didSubscriber 是在 `+ createSignal` 方法中进入的 block 参数。
*  + (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe;
*  这个 block 以 subscriber 为参数,返回一个 disposable,即 innerDisposable,而这个 innerDisposable
*  的作用是在 subscriber 不再订阅本 signal 时,起回收资源的作用。
*/
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];

[disposable addDisposable:schedulingDisposable];
}

return disposable;
}


  可以看到,订阅者装饰器直接伪装成真正的订阅器,传给 didSubscribe 这个 block 使用。在这个 block 中,会有一些事件发送给订阅者装饰器,而这个订阅者装饰器则根据 disposable 的状态来来决定是否转发给真正的订阅者。disposable 作为返回值,返回给外部,也就是说能够从外部来取消这个订阅了。
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];

[disposable addDisposable:schedulingDisposable];


  从这几行代码中,我们可以看到,
didSubscribe
 这个 block 是处于 
subscriptionScheduler
 这个 scheduler 的调度中。
RACSubscriptionScheduler
 的调度是取决于当前所在的线程的,即 
didSubscribe
 可能会在不同的调度器中被执行。

  假设当前 
-(RACDisposable *)subscribe:(id<RACSubscriber>)subscriber
 这个方法是在异步环境下调用的,那么在 
disposable
 返回后,在schedule block 还没有来得及调用,此时 
disposable
 中包含 
schedulingDisposable
。如果我们此时给 
disposable
 发送 
dispose
 消息,那么 
schedulingDisposable
 也会被 
dispose
,schedule
block 就不会执行了;如果是在 schedule block 执行中或执行后给 
disposable
 发送 
dispose
 消息,那么 
innerDisposable
 和 
schedulingDisposable
 都会被 
dispose
。这些行为正是咱们所预期的。


6、再次改进NLSubscriber


1、didSubscribeWithDisposable

@protocol RACSubscriber <NSObject>
@required
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;
@end


  这个 
RACSubscriber
 协议中声明的一个方法,在最开始的时候被我们特意给忽略,现在是时候回过头来看看它了。对于一个订阅者来说,
next
error
 和 
completed
 三种事件分别对应协议里的三种方法,那么这个方法存在的意义是什么呢?

  从 
RACSubscriber
 协议中,可以看到,当一个订阅者有收到过 
error
 或 
completed
 事件后,这个订阅者就不能再接收任何事件了,换句话说,此时这个订阅者会解除所有的订阅关系,且无法再次订阅。既然要解除所有订阅,首先我得知道我订阅过哪些信号是不?而代表一个订阅行为的就是 
disposable
 ,告诉它就传一个给它好了。所以这个方法就是告诉订阅者:你发生了订阅行为。

  那为啥要 
RACCompoundDisposable
 类型作为参数呢?因为有些订阅者会针对其附加一些操作,而只有这个类型的 
disposable
 才能动态加入一些操作。接下来我们就会看到的。


2、NLSubscriber 结合 RACDisposable

  这一次改进 
NLSubscriber
 的目的是让其可以终结自己的订阅能力的功能。同时实现 
didSubscribeWithDisposable
 方法。千言万语不如实际代码,让我们来一探究竟:
#import "NLSubscriber.h"
#import <ReactiveCocoa.h>

@interface NLSubscriber ()

@property (nonatomic, copy) void (^next)(id value);
@property (nonatomic, copy) void (^error)(NSError *error);
@property (nonatomic, copy) void (^completed)(void);

/**
*  @brief  代表订阅者本身总体订阅行为的 disposable。
*       当有接收到 error 或 completed 事件时,应该 dispose 这个 disposable。
*/
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;

@end

@implementation NLSubscriber

#pragma mark Lifecycle
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
NLSubscriber *subscriber = [[self alloc] init];

subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];

return subscriber;
}

- (instancetype)init {
self = [super init];
if (self == nil) return nil;

@unsafeify(self);

/**
*  当 _disposable 被发送 dispose 消息时,将 next、error 和 completed 这三个
*  block 设置为 nil,从而间实现订阅者无法再接收任何事件的功能。
*/
RACDisposable *selfDisposable = [RACDisposable disposableWithBlock:^{
@strongify(self);

@synchronized (self) {
self.next = nil;
self.error = nil;
self.completed = nil;
}
}];

_disposable = [RACCompoundDisposable compoundDisposable];
[_disposable addDisposable:selfDisposable];

return self;
}

- (void)dealloc {
[self.disposable dispose];
}

#pragma mark RACSubscriber
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;

nextBlock(value);
}
}

- (void)sendError:(NSError *)e {
@synchronized (self) {
void (^errorBlock)(NSError *) = [self.error copy];
[self.disposable dispose];

if (errorBlock == nil) return;
errorBlock(e);
}
}

- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];
[self.disposable dispose];

if (completedBlock == nil) return;
completedBlock();
}
}

- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)otherDisposable {
if (otherDisposable.disposed) return;

/**
*  将 otherDisposable 添加到 selfDisposable 中。这样当 selfDisposable 被 dispose 时,
*  otherDisposable 也能被 dispose。
*
*  这样,当订阅者接收到 error 或 completed 事件时,就能解除这个订阅者自身的所有订阅行为了。
*/
RACCompoundDisposable *selfDisposable = self.disposable;
[selfDisposable addDisposable:otherDisposable];

@unsafeify(otherDisposable);
/**
*  如果这个订阅行为被解除,就将 otherDisposable 从 selfDisposable 中移除。
*  (我们给 otherDisposable 增加了行为,这也就是参数需要是 RACCompoundDisposable
*  类型的原因了。当然,其它的订阅者怎么用这个参数就跟其实际的业务相关了。)
*/
[otherDisposable addDisposable:[RACDisposable disposableWithBlock:^{
@strongify(otherDisposable);
[selfDisposable removeDisposable:otherDisposable];
}]];
}

@end


3、改进类别 nl_Subscription

  还记得么?
nl_Subscription
 类别中的订阅方法一旦订阅,就无法停止了,这显然有很大的问题。解决这个问题很简单,直接将 
disposable
 返回即可: 

  
// .h file
@interface RACSignal (nl_Subscription)
...
- (RACDisposable *)nl_subscribeError:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock;

@end

// .m file
@implementation RACSignal (nl_Subscription)
...
- (RACDisposable *)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
NLSubscriber *subscriber = [NLSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
return [self subscribe:subscriber];
}


RACSignal - Operations

  本节主要研究这些操作(Operations) —— 
flattenMap:
map:
、 
filter:
 ….

  终于看到你想看的东西了?好吧,我承认,上节的东西很无趣,可能压根不是你想看的东西。但如果没弄清上面的内容的话,直接研究 Operations 可是会比较吃力的哟~

  你以为咱们现在开始研究 Operations?哈哈,你又得失望了~ 咱得先看看这两个类:
RACEmptySignal
 和 
RACReturnSignal


1、两个 RACSignal 的特殊子类 RACEmptySignal 和 RACReturnSignal


1、RACEmptySignal

  
RACEmptySignal
 是 
+[RACSignal empty]
 的内部实现,一个私有 
RACSignal
 子类。它就是一个会立即 
completed
 的信号。让我们来看看它的 
- subscribe:
 方法:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);

/**
*  只要一订阅,就给 subscriber 发送 completed 事件。
*/
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendCompleted];
}];
}


  这样一个订阅者一订阅就会 
completed
 信号有什么用呢?稍后揭晓。


2、RACReturnSignal

  
RACReturnSignal
 是 
+[RACSignal return:]
 的内部实现,也是一个私有 
RACSignal
 子类。它会同步发送出一个值(即 
next
)给订阅者,然后再发送 
completed
 事件。 它比
RACEmptySignal
 多了一点点东西,它是。直接看其实现:
@interface RACReturnSignal ()
// 在本信号被订阅时会发送的值。
@property (nonatomic, strong, readonly) id value;
@end

@implementation RACReturnSignal

#pragma mark Lifecycle

+ (RACSignal *)return:(id)value {
RACReturnSignal *signal = [[self alloc] init];
signal->_value = value;
return signal;
}

#pragma mark Subscription

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);

return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendNext:self.value];
[subscriber sendCompleted];
}];
}

@end


  纯吐槽:为啥要叫 
ReturnSignal
 呢?不如直接 
OneValueSignal
 好了。O(∩_∩)O~~ 不过说真的,RAC 的命名真心不咋地。

  那么发送一个 
next
 后又 
completed
 的信号又有啥用呢?等下会知道地。


2、concat: 练手

  
-[RACSignal concat:]
 是源码较简单,且使用频率也较多的。那咱们就来拿它来练练手好了。
- (RACSignal *)concat:(RACSignal *)signal {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];

RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
RACDisposable *concattedDisposable = [signal subscribe:subscriber];
serialDisposable.disposable = concattedDisposable;
}];

serialDisposable.disposable = sourceDisposable;
return serialDisposable;
}] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}


  
RACSerialDisposable
 是 
RACDisposable
 的子类,它包含一个 
Disposable
,能够在运行时设置这个 
Disposable
。当设置新的 
newDisposable
时,老的 
oldDisposable
 会被
dispose
。当 
RACSerialDisposable
 被 
dispose
 时,其所包含的 
Disposable
 会被 
dispose


  基本上,对一个 
RACSignal
 的操作的返回值是一个新的 
RACSignal
 值时,其内部都是调用了 
+[RACSignal createSignal:]
 这个方法。这个创建信号返回的实际是自定义信号:
RACDynamicSignal
,针对它前文有所介绍。

  这里有一个小技巧。因为很多信号的操作是针对该信号本身 
self
 所发送的值作的操作。那也就是说会订阅 
self
,那咱们先找到这一句再说:
self subscribe:
 或 
self subscribeNext:...
。嗯,找到了这几行:
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
RACDisposable *concattedDisposable = [signal subscribe:subscriber];
serialDisposable.disposable = concattedDisposable;
}];


  在订阅了 
self
 后,将 
next
 和 
error
 事件发送给订阅者 
subscriber
。当 
self
 发送了 
completed
 事件事,再让 
subscriber
 订阅参数 
signal
。也就是当源信号完成后订阅 
signal
。怎么样,很简单吧。


3、zipWith:

  再来一个练手的玩意。
-[RACSignal zipWith:]
 比 
-[RACSignal concat:]
 稍微复杂点。它是将 
self
 和 参数 
signal
 两个信号发送的值合并起来发送给订阅者。
- (RACSignal *)zipWith:(RACSignal *)signal {
NSCParameterAssert(signal != nil);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block BOOL selfCompleted = NO;
NSMutableArray *selfValues = [NSMutableArray array];

__block BOOL otherCompleted = NO;
NSMutableArray *otherValues = [NSMutableArray array];

void (^sendCompletedIfNecessary)(void) = ^{
@synchronized (selfValues) {
BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
BOOL otherEmpty = (otherCompleted && otherValues.count == 0);
if (selfEmpty || otherEmpty) [subscriber sendCompleted];
}
};

void (^sendNext)(void) = ^{
@synchronized (selfValues) {
if (selfValues.count == 0) return;
if (otherValues.count == 0) return;

RACTuple *tuple = RACTuplePack(selfValues[0], otherValues[0]);
[selfValues removeObjectAtIndex:0];
[otherValues removeObjectAtIndex:0];

[subscriber sendNext:tuple];
sendCompletedIfNecessary();
}
};

RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (selfValues) {
[selfValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
selfCompleted = YES;
sendCompletedIfNecessary();
}
}];

RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
@synchronized (selfValues) {
[otherValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
otherCompleted = YES;
sendCompletedIfNecessary();
}
}];

return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
[otherDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -zipWith: %@", self.name, signal];
}

@end


  同样的,重点在 
[self subscriberNext:]
 和 
[signal subscribeNext:]
 处。这里的实现是订阅 
self
 和 
signal
 信号,然后将它们发送出的值收集起来,当两个都发出了值时,分别拿出两个信号最早发出的值,合并为一个 
RACTuple
,再发送给订阅者 
subscriber
。这个也很简单吧,只是代码稍多点而已。


4、bind:


1、说明

  信号的很多 operations 的实现调用来调用去最后都是调用了这个 
-[RACSignal bind:]
 方法,比如 
flattenMap:
 、
map:
filter
 等等。那咱们就来看看这个方法是哪路神仙?

  这是在 
RACStream
 中声明的抽象方法。来看看它的声明:
typedef RACStream * (^RACStreamBindBlock)(id value, BOOL *stop);
- (instancetype)bind:(RACStreamBindBlock (^)(void))block;


  
RACStreamBindBlock
 是一个 block。它从一个 
RACStream
 中接收一个值,并且返回一个与该流相同类型的实例。如果将 
stop
 设为 
YES
,则会在返回一个实例后终结此次 
bind
。如果返回
nil
 则会立即终结。 

   

  
bind:
 方法是将流中每一个值都放到 
RACStreamBindBlock
 中跑一下。来看看其参数:
block
。然而这有什么卵用呢?好吧,我太笨,从它的说明来看,我真的不能理解它有什么用。


2、源码解读

  既然从方法说明了解不到,那直接来看其源码了。
- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
NSCParameterAssert(block != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACStreamBindBlock bindingBlock = block();

NSMutableArray *signals = [NSMutableArray arrayWithObject:self];

RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

// 三.
void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) {
BOOL removeDisposable = NO;

@synchronized (signals) {
[signals removeObject:signal];

if (signals.count == 0) {
[subscriber sendCompleted];
[compoundDisposable dispose];
} else {
removeDisposable = YES;
}
}

if (removeDisposable) [compoundDisposable removeDisposable:finishedDisposable];
};

// 二.
void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
@synchronized (signals) {
[signals addObject:signal];
}

RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];

RACDisposable *disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(signal, selfDisposable);
}
}];

selfDisposable.disposable = disposable;
};

@autoreleasepool {
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];

//  一.
RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
// Manually check disposal to handle synchronous errors.
if (compoundDisposable.disposed) return;

BOOL stop = NO;
id signal = bindingBlock(x, &stop);

@autoreleasepool {
if (signal != nil) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose];
completeSignal(self, selfDisposable);
}
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(self, selfDisposable);
}
}];

selfDisposable.disposable = bindingDisposable;
}

return compoundDisposable;
}] setNameWithFormat:@"[%@] -bind:", self.name];
}


  我们一步一步来看。先从第 一 步开始,其步骤如下: 

  1. 订阅 
self
 

  2. 针对 
self
 发出的每一个值 
x
,经过 
bindingBlock
,获取一个信号:
signal
 

    1. 如果 
signal
 不为 nil,就转到第二步:
addSignal
 

    2. 如果 
signal
 为 nil,或 
stop
 为 
YES
,则转到第三步:
completedSignal
 

  3. 如果 
self
 发出 
error
 事件,则中断订阅;如果 
self
 发出 
completed
 事件则转到第三步:
completedSignal


  第二步:
addSignal:signal
 

  1. 先将 
signal
 添加到 
signals
中 

  2. 订阅 
signal
 

    1. 将 
signal
 的 
next
 事件转发给订阅者 
subscriber
 

    2. 如果 
signal
 发送 
error
 事件则中断订阅 

    3. 如果 
signal
 发送 
complete
 事件,则转到第三步

  第三步:
completeSignal:signal:disposable
 

  1. 将 
signal
 从 
signals
 中移除 

  2. 如果 
signals
 中没有了 
signal
,那么订阅就完成了

  好了,来总结一下这个 
-bind:
 : 

  1. 订阅原信号 
self
 的 values。 

  2. 将 
self
 发出的任何一个值,都对其使用 
bindingBlock
 进行转换。 

  3. 如果 
bindingBlock
 返回一个信号,则订阅它,将从它那接收到的每个值都传递给订阅者 
subscriber
。 

  4. 如果 
bindingBlock
 要求结束绑定,则 complete 
self
 信号。 

  5. 如果 所有 的信号全都 complete,则给 
subscriber
 发送 
completed
 事件. 

  6. 如果任何一个信号发出 
error
,将其发送给 
subscriber


  那从中可以玩出什么花样呢?


3、示例

  咱们先用用它,再看看能怎么玩吧。 

  


示例1:结合 
RACReturnSignal

RACSignal *signalInterval = [[RACSignal interval:1.0 onScheduler:[RACScheduler mainThreadScheduler]] take:3];

RACSignal *bindSignal = [signalInterval bind:^RACStreamBindBlock{
return ^(id value, BOOL *stop) {
NSLog(@"inner value: %@", value);
return [RACSignal return:value];
};
}];

[bindSignal subscribeNext:^(id x) {
NSLog(@"outer value: %@", x);
}];


  输出如下:
2015-08-27 17:16:17.933 RACPraiseDemo[3063:168556] inner value: 2015-08-27 09:16:17 +0000
2015-08-27 17:16:17.934 RACPraiseDemo[3063:168556] outer value: 2015-08-27 09:16:17 +0000
2015-08-27 17:16:18.931 RACPraiseDemo[3063:168556] inner value: 2015-08-27 09:16:18 +0000
2015-08-27 17:16:18.931 RACPraiseDemo[3063:168556] outer value: 2015-08-27 09:16:18 +0000
2015-08-27 17:16:19.931 RACPraiseDemo[3063:168556] inner value: 2015-08-27 09:16:19 +0000
2015-08-27 17:16:19.931 RACPraiseDemo[3063:168556] outer value: 2015-08-27 09:16:19 +0000


  这个示例就是在 
bind:
 中简单的返回值。那咱们将这个值变化一下如何?


示例2:结合 
RACReturnSignal
、转换 value

RACSignal *signalInterval = [[RACSignal interval:1.0 onScheduler:[RACScheduler mainThreadScheduler]] take:3];

RACSignal *bindSignal = [signalInterval bind:^RACStreamBindBlock{
return ^(NSDate *value, BOOL *stop) {
NSLog(@"inner value: %@", value);

NSTimeInterval nowTime = [value timeIntervalSince1970];
return [RACSignal return:@(nowTime)];
};
}];

[bindSignal subscribeNext:^(id x) {
NSLog(@"outer value: %@", x);
}];


  输出如下:
2015-08-27 17:34:04.938 RACPraiseDemo[3153:176383] inner value: 2015-08-27 09:34:04 +0000
2015-08-27 17:34:04.939 RACPraiseDemo[3153:176383] outer value: 1440668044.936496
2015-08-27 17:34:05.939 RACPraiseDemo[3153:176383] inner value: 2015-08-27 09:34:05 +0000
2015-08-27 17:34:05.939 RACPraiseDemo[3153:176383] outer value: 1440668045.939163
2015-08-27 17:34:06.941 RACPraiseDemo[3153:176383] inner value: 2015-08-27 09:34:06 +0000
2015-08-27 17:34:06.941 RACPraiseDemo[3153:176383] outer value: 1440668046.941275


  哇哇,这就是个 
map:
 有木有? 现在,有感受到 
RACReturnSignal
 的魅力?
RACReturnSignal
 和 
-bind:
 结合能转换 value。


示例3:结合 
RACEmptySignal

  现在来换个玩法试试看,这回换 
RACEmptySignal
 来玩玩。
RACSignal *signalInterval = [[RACSignal interval:1.0 onScheduler:[RACScheduler mainThreadScheduler]] take:3];

__block NSUInteger count = 0;
RACSignal *bindSignal = [signalInterval bind:^RACStreamBindBlock{
return ^(NSDate *value, BOOL *stop) {
NSLog(@"inner value: %@", value);

++count;
if (count % 2 == 0) {
return [RACSignal empty];
}

return [RACSignal return:value];
};
}];

[bindSignal subscribeNext:^(id x) {
NSLog(@"outer value: %@", x);
}];


  输出如下:
2015-08-27 17:53:45.345 RACPraiseDemo[3363:188270] inner value: 2015-08-27 09:53:45 +0000
2015-08-27 17:53:45.346 RACPraiseDemo[3363:188270] outer value: 2015-08-27 09:53:45 +0000
2015-08-27 17:53:46.345 RACPraiseDemo[3363:188270] inner value: 2015-08-27 09:53:46 +0000
2015-08-27 17:53:47.342 RACPraiseDemo[3363:188270] inner value: 2015-08-27 09:53:47 +0000
2015-08-27 17:53:47.342 RACPraiseDemo[3363:188270] outer value: 2015-08-27 09:53:47 +0000


  这一次,“outer value” 比 “inner value” 少了一个,这就是 
filter:
 呀!
RACEmptySignal
 与 
bind:
 结合能过滤 value。


示例4:改进 
bind:

  经过这几个示例,我们可以发现,直接使用 
bind:
 是比较麻烦的。而一般情况下,咱们还真用不到 
stop
,那咱们就改进一下呗:
- (instancetype)flattenMap:(RACStream * (^)(id value))block {
Class class = self.class;

return [[self bind:^{
return ^(id value, BOOL *stop) {
/**
*  如果 block 返回 nil,得用 RACEmptySignal 代替,
*  不然会结束 `bind:`
*/
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

return stream;
};
}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}


  哈哈,这个就是 
- flattenMap:
了。不必过多解释了吧~


5、
-map:

  嗯,这其实就是 
-flattenMap:
 与 
RACReturnSignal
 的结合:
- (instancetype)map:(id (^)(id value))block {
NSCParameterAssert(block != nil);

Class class = self.class;

return [[self flattenMap:^(id value) {
return [class return:block(value)];
}] setNameWithFormat:@"[%@] -map:", self.name];
}


6、
-flatten

  信号可以发送任何类型的值,当然也包括 
RACSignal
 类型。例如,
RACCommand
 的 
executionSignals
 这个信号,它发出的值就是 
RACSignal
 类型的。对于这种发出的值是 
RACSignal
 类型的 
RACSignal
,叫做 signal of signals。这有点类似于 disposable
of disposables。

  既然这个信号发出的就是 
RACSignal
,那在 
-flattenMap:
中,我们直接将 value 返回就好了。来看看示例:
/**
*  有效三次的间隔为1秒定时器信号,此时 signalInterval 是一个 signal of NSDates
*/
RACSignal *signalInterval = [[RACSignal interval:1 onScheduler:[RACScheduler mainThreadScheduler]] take:3];

/**
*  将定时器信号里的值修改成 RACSignal 类型
*  此时,signalInterval 变成了一个 signal of signals
*/
signalInterval = [signalInterval map:^id(NSDate *date) {
return [RACSignal return:date];
}];

/**
*  既然 signalInterval 里的值都是信号,那直接将这些信号返回即可
*/
RACSignal *signal = [signalInterval flattenMap:^RACStream *(RACSignal *returnSignal) {
return returnSignal;
}];

/**
*  由于 signalInterval 里的值都是包含了一个 NSDate 值的 RACReturnSignal,
*  经过 `-flattenMap:` 过后,signal 就变成了 signal of NSDates。
*/
[signal subscribeNext:^(id x) {
NSLog(@"value: %@", x);
}];


  输出如下:
2015-08-27 21:16:29.517 RACPraiseDemo[549:11996] value: 2015-08-27 13:16:29 +0000
2015-08-27 21:16:30.516 RACPraiseDemo[549:11996] value: 2015-08-27 13:16:30 +0000
2015-08-27 21:16:31.516 RACPraiseDemo[549:11996] value: 2015-08-27 13:16:31 +0000


7、小结

  
RACSignal
 的 operations 实在太多,全部在这里列出来不现实,也没有这个必要。我相信,经过前面的解析,你现在再去看其它 的一个 operation 源码,也应该不是太大的难事。


RAC() 宏展开

  RAC 的最大的魅力之一就是绑定:
RAC(self, ...) = signal;
 这应该是大家经常写的一条语句。有没有想过它是怎么工作的呢?咱们来看点代码:
@property (nonatomic, strong) NSString *text;

//--------
RACSignal *signal = [[RACSignal interval:1 onScheduler:[RACScheduler mainThreadScheduler]] take:3];
signal = [signal map:^id(id value) {
return [value description];
}];

RAC(self, text) = signal;


  重点在 
RAC(self, text) = signal;
 这一行。先来看看将这个宏展开是什么样子(RAC 对宏的运用很是牛B,有兴趣请看这篇文章):
[[RACSubscriptingAssignmentTrampoline alloc] initWithTarget:(self) nilValue:nil][@"text"] = signal;


  看得更清楚一点:
RACSubscriptingAssignmentTrampoline *assignment = [[RACSubscriptingAssignmentTrampoline alloc] initWithTarget:(self) nilValue:nil];
assignment[@"text"] = signal;


  跳到 
RACSubscriptingAssignmentTrampoline
 类的声明,可以看到:
@interface RACSubscriptingAssignmentTrampoline : NSObject
- (id)initWithTarget:(id)target nilValue:(id)nilValue;

// 这是可以使用下标 `[]` 语法的关键
- (void)setObject:(RACSignal *)signal forKeyedSubscript:(NSString *)keyPath;
@end


  这个类使用了 clang 的特性,可以使用 
[]
语法(
[]
 的相关文章)。也就是说 
assignment[@"text"] = signal;
,实际上是这样子的:
[assignment setObject:signal forKeyedSubscript:@"text"];


  再看 
- (void)setObject:(RACSignal *)signal forKeyedSubscript:(NSString *)keyPath;
 这个方法的实现,我们发现,它其实调用的是 
signal
 的方法:
- (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object nilValue:(id)nilValue
,再像上面的方法一样来分析这个方法,我们找到了关键点:
- (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object nilValue:(id)nilValue {
...
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
...
[object setValue:x ?: nilValue forKeyPath:keyPath];
}...
...
}


  哦,原来它就是订阅了 
signal
,并将 
signal
 发出的每一值都设置给 
object
 的 
keyPath
 属性而已。很简单嘛~


结束

  本文研究了 RAC 中的一些基本组件,并没有对一些高级内容进行深入研究,所以才叫“浅析”。但这些也是对高级内容深入研究的基础,既然有“渔”,何惧无“鱼”呢?

  其实颇想继续分享,但心有余而力不足。

  还可研究的主题: 

  1. 
Subjects
 它也是 
RACSignal
 一些操作的基础,值得研究。难度系数:2 (最高为5) 

  2. 
RACMulticastConnection
 常用,值得研究。难度系数:3 

  3. Foundation、UIKit、KVO (给各系统类加的 rac_ 扩展),有研究价值。研究过后,你会对 runtime 会有很深入的了解,还会接触到一些 OC 中少用的知识(如 NSProxy 等),能开拓视野。难度系数:5 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: