第一篇文章中我们介绍了冷信号与热信号的概念,前一篇文章我们也讨论了为什么要区分冷信号与热信号,下面我会先为大家揭晓热信号的本质,再给出冷信号转换成热信号的方法。
在ReactiveCocoa中,究竟什么才是热信号呢?冷信号是比较常见的,map一下就会得到一个冷信号。但在RAC中,好像并没有“hot signal”这个单独的说法。原来在RAC的世界中,所有的热信号都属于一个类——RACSubject。接下来我们来看看究竟它为什么这么“神奇”。
在RAC2.5文档的框架概述中,有着这样一段描述:
A subject, represented by the RACSubject class, is a signal that can be manually controlled.
Subjects can be thought of as the "mutable" variant of a signal, much like NSMutableArray is for NSArray. They are extremely useful for bridging non-RAC code into the world of signals.
For example, instead of handling application logic in block callbacks, the blocks can simply send events to a shared subject instead. The subject can then be returned as a RACSignal, hiding the implementation detail of the callbacks.
Some subjects offer additional behaviors as well. In particular, RACReplaySubject can be used to buffer events for future subscribers, like when a network request finishes before anything is ready to handle the result.
从这段描述中,我们可以发现Subject具备如下三个特点:
从第三个特点来看,Subject具备为未来订阅者缓冲事件的能力,那也就说明它是自身是有状态的。根据上文的介绍,Subject是符合热信号的特点的。为了验证它,我们再来做个简单实验:
RACSubject *subject = [RACSubject subject]; RACSubject *replaySubject = [RACReplaySubject subject]; [[RACScheduler mainThreadScheduler] afterDelay:0.1 schedule:^{ // Subscriber 1 [subject subscribeNext:^(id x) { NSLog(@"Subscriber 1 get a next value: %@ from subject", x); }]; [replaySubject subscribeNext:^(id x) { NSLog(@"Subscriber 1 get a next value: %@ from replay subject", x); }]; // Subscriber 2 [subject subscribeNext:^(id x) { NSLog(@"Subscriber 2 get a next value: %@ from subject", x); }]; [replaySubject subscribeNext:^(id x) { NSLog(@"Subscriber 2 get a next value: %@ from replay subject", x); }]; }]; [[RACScheduler mainThreadScheduler] afterDelay:1 schedule:^{ [subject sendNext:@"send package 1"]; [replaySubject sendNext:@"send package 1"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:1.1 schedule:^{ // Subscriber 3 [subject subscribeNext:^(id x) { NSLog(@"Subscriber 3 get a next value: %@ from subject", x); }]; [replaySubject subscribeNext:^(id x) { NSLog(@"Subscriber 3 get a next value: %@ from replay subject", x); }]; // Subscriber 4 [subject subscribeNext:^(id x) { NSLog(@"Subscriber 4 get a next value: %@ from subject", x); }]; [replaySubject subscribeNext:^(id x) { NSLog(@"Subscriber 4 get a next value: %@ from replay subject", x); }]; }]; [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [subject sendNext:@"send package 2"]; [replaySubject sendNext:@"send package 2"]; }];
按照时间线来解读一下上述代码:
接下来看一下输出的结果:
2015-09-28 13:35:22.855 RACDemos[13646:1269269] Start 2015-09-28 13:35:23.856 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 1 from subject 2015-09-28 13:35:23.856 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 1 from subject 2015-09-28 13:35:23.857 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 1 from replay subject 2015-09-28 13:35:23.857 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 1 from replay subject 2015-09-28 13:35:24.059 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 1 from replay subject 2015-09-28 13:35:24.059 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 1 from replay subject 2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 2 from subject 2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 2 from subject 2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 2 from subject 2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 2 from subject 2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 2 from replay subject 2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 2 from replay subject 2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 2 from replay subject 2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 2 from replay subject
结合结果可以分析出如下内容:
只关注subject,根据时间线,我们可以得到下图:
经过观察不难发现,4个订阅者实际上是共享subject的,一旦这个subject发送了值,当前的订阅者就会同时接收到。由于Subscriber 3与Subscriber 4的订阅时间稍晚,所以错过了第一次值的发送。这与冷信号是截然不同的反应。冷信号的图类似下图:
对比上面两张图,是不是可以发现,subject类似“直播”,错过了就不再处理。而signal类似“点播”,每次订阅都会从头开始。所以我们有理由认定subject天然就是热信号。
下面再来看看replaySubject,根据时间线,我们能得到另一张图:
将图3与图1对比会发现,Subscriber 3与Subscriber 4在订阅后马上接收到了“历史值”。对于Subscriber 3和Subscriber 4来说,它们只关心“历史的值”而不关心“历史的时间线”,因为实际上1与2是间隔1s发送的,但是它们接收到的显然不是。举个生动的例子,就好像科幻电影里面主人公穿越时间线后会先把所有的回忆快速闪过再来到现实一样。(见《X战警:逆转未来》、《蝴蝶效应》)所以我们也有理由认定replaySubject天然也是热信号。
看到这里,我们终于揭开了热信号的面纱,结论就是:
冷信号与热信号的本质区别在于是否保持状态,冷信号的多次订阅是不保持状态的,而热信号的多次订阅可以保持状态。所以一种将冷信号转换为热信号的方法就是,将冷信号订阅,订阅到的每一个时间通过RACSbuject发送出去,其他订阅者只订阅这个RACSubject。
观察下面的代码:
RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { NSLog(@"Cold signal be subscribed."); [[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{ [subscriber sendNext:@"A"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{ [subscriber sendNext:@"B"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{ [subscriber sendCompleted]; }]; return nil; }]; RACSubject *subject = [RACSubject subject]; NSLog(@"Subject created."); [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [coldSignal subscribe:subject]; }]; [subject subscribeNext:^(id x) { NSLog(@"Subscriber 1 recieve value:%@.", x); }]; [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{ [subject subscribeNext:^(id x) { NSLog(@"Subscriber 2 recieve value:%@.", x); }];
执行顺序是这样的:
如果所料不错的话,通过订阅这个subject并不会引起coldSignal重复执行block的内容。我们来看下结果:
2015-09-28 19:36:45.703 RACDemos[14110:1556061] Subject created. 2015-09-28 19:36:47.705 RACDemos[14110:1556061] Cold signal be subscribed. 2015-09-28 19:36:49.331 RACDemos[14110:1556061] Subscriber 1 recieve value:A. 2015-09-28 19:36:50.999 RACDemos[14110:1556061] Subscriber 1 recieve value:B. 2015-09-28 19:36:50.999 RACDemos[14110:1556061] Subscriber 2 recieve value:B.
参考时间线,会得到下图:
不难发现其中的几个重点:
通过观察可以确定,subject就是coldSignal转化的热信号。所以使用RACSubject来将冷信号转化为热信号是可行的。
当然,使用这种RACSubject来订阅冷信号得到热信号的方式仍有一些小的瑕疵。例如subject的订阅者提前终止了订阅,而subject并不能终止对coldSignal的订阅。(RACDisposable是一个比较大的话题,我计划在其他的文章中详细阐述它,也希望感兴趣的同学自己来理解。)所以在RAC库中对于冷信号转化成热信号有如下标准的封装:
- (RACMulticastConnection *)publish; - (RACMulticastConnection *)multicast:(RACSubject *)subject; - (RACSignal *)replay; - (RACSignal *)replayLast; - (RACSignal *)replayLazily;
这5个方法中,最为重要的就是- (RACMulticastConnection *)multicast:(RACSubject *)subject;这个方法了,其他几个方法也是间接调用它的。我们来看看它的实现:
/// implementation RACSignal (Operations) - (RACMulticastConnection *)multicast:(RACSubject *)subject { [subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name]; RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject]; return connection; } /// implementation RACMulticastConnection - (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject { NSCParameterAssert(source != nil); NSCParameterAssert(subject != nil); self = [super init]; if (self == nil) return nil; _sourceSignal = source; _serialDisposable = [[RACSerialDisposable alloc] init]; _signal = subject; return self; } #pragma mark Connecting - (RACDisposable *)connect { BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected); if (shouldConnect) { self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal]; } return self.serialDisposable; } - (RACSignal *)autoconnect { __block volatile int32_t subscriberCount = 0; return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) { OSAtomicIncrement32Barrier(&subscriberCount); RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber]; RACDisposable *connectionDisposable = [self connect]; return [RACDisposable disposableWithBlock:^{ [subscriptionDisposable dispose]; if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) { [connectionDisposable dispose]; } }]; }] setNameWithFormat:@"[%@] -autoconnect", self.signal.name]; }
虽然代码比较短但不是很好懂,大概来说明一下:
由于RAC是一个线程安全的框架,所以好奇的同学可以了解下“OSAtomic*”这一系列的原子操作。抛开这些应该不难理解上述代码。
了解源码之后,这个方法的正确使用就清楚了,应该像这样:
RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { NSLog(@"Cold signal be subscribed."); [[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{ [subscriber sendNext:@"A"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{ [subscriber sendNext:@"B"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{ [subscriber sendCompleted]; }]; return nil; }]; RACSubject *subject = [RACSubject subject]; NSLog(@"Subject created."); RACMulticastConnection *multicastConnection = [coldSignal multicast:subject]; RACSignal *hotSignal = multicastConnection.signal; [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [multicastConnection connect]; }]; [hotSignal subscribeNext:^(id x) { NSLog(@"Subscribe 1 recieve value:%@.", x); }]; [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{ [hotSignal subscribeNext:^(id x) { NSLog(@"Subscribe 2 recieve value:%@.", x); }]; }];或者这样:
RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { NSLog(@"Cold signal be subscribed."); [[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{ [subscriber sendNext:@"A"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{ [subscriber sendNext:@"B"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{ [subscriber sendCompleted]; }]; return nil; }]; RACSubject *subject = [RACSubject subject]; NSLog(@"Subject created."); RACMulticastConnection *multicastConnection = [coldSignal multicast:subject]; RACSignal *hotSignal = multicastConnection.autoconnect; [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [hotSignal subscribeNext:^(id x) { NSLog(@"Subscribe 1 recieve value:%@.", x); }]; }]; [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{ [hotSignal subscribeNext:^(id x) { NSLog(@"Subscribe 2 recieve value:%@.", x); }]; }];
以上的两种写法和之前用Subject来传递的例子都可以得到相同的结果。
下面再来看看其他几个方法的实现:
/// implementation RACSignal (Operations) - (RACMulticastConnection *)publish { RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name]; RACMulticastConnection *connection = [self multicast:subject]; return connection; } - (RACSignal *)replay { RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name]; RACMulticastConnection *connection = [self multicast:subject]; [connection connect]; return connection.signal; } - (RACSignal *)replayLast { RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name]; RACMulticastConnection *connection = [self multicast:subject]; [connection connect]; return connection.signal; } - (RACSignal *)replayLazily { RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]]; return [[RACSignal defer:^{ [connection connect]; return connection.signal; }] setNameWithFormat:@"[%@] -replayLazily", self.name]; }
这几个方法的实现都相当简单,只是为了简化而封装,具体说明一下:
所以,其实本质仍然是
使用一个Subject来订阅原始信号,并让其他订阅者订阅这个Subject,这个Subject就是热信号。
现在再回过来看下之前系列文章第二篇中那个业务场景的例子,其实修改的方法很简单,就是在网络获取的fetchData这个信号后面,增加一个replayLazily变换,就不会出现网络请求重发6次的问题了。
修改后的代码如下,大家可以试试:
self.sessionManager = [[AFHTTPSessionManager alloc] initWithBaseURL:[NSURL URLWithString:@"http://api.xxxx.com"]]; self.sessionManager.requestSerializer = [AFJSONRequestSerializer serializer]; self.sessionManager.responseSerializer = [AFJSONResponseSerializer serializer]; @weakify(self) RACSignal *fetchData = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { @strongify(self) NSURLSessionDataTask *task = [self.sessionManager GET:@"fetchData" parameters:@{@"someParameter": @"someValue"} success:^(NSURLSessionDataTask *task, id responseObject) { [subscriber sendNext:responseObject]; [subscriber sendCompleted]; } failure:^(NSURLSessionDataTask *task, NSError *error) { [subscriber sendError:error]; }]; return [RACDisposable disposableWithBlock:^{ if (task.state != NSURLSessionTaskStateCompleted) { [task cancel]; } }]; }] replayLazily]; // modify here!! RACSignal *title = [fetchData flattenMap:^RACSignal *(NSDictionary *value) { if ([value[@"title"] isKindOfClass:[NSString class]]) { return [RACSignal return:value[@"title"]]; } else { return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]]; } }]; RACSignal *desc = [fetchData flattenMap:^RACSignal *(NSDictionary *value) { if ([value[@"desc"] isKindOfClass:[NSString class]]) { return [RACSignal return:value[@"desc"]]; } else { return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]]; } }]; RACSignal *renderedDesc = [desc flattenMap:^RACStream *(NSString *value) { NSError *error = nil; RenderManager *renderManager = [[RenderManager alloc] init]; NSAttributedString *rendered = [renderManager renderText:value error:&error]; if (error) { return [RACSignal error:error]; } else { return [RACSignal return:rendered]; } }]; RAC(self.someLablel, text) = [[title catchTo:[RACSignal return:@"Error"]] startWith:@"Loading..."]; RAC(self.originTextView, text) = [[desc catchTo:[RACSignal return:@"Error"]] startWith:@"Loading..."]; RAC(self.renderedTextView, attributedText) = [[renderedDesc catchTo:[RACSignal return:[[NSAttributedString alloc] initWithString:@"Error"]]] startWith:[[NSAttributedString alloc] initWithString:@"Loading..."]]; [[RACSignal merge:@[title, desc, renderedDesc]] subscribeError:^(NSError *error) { UIAlertView *alertView = [[UIAlertView alloc] initWithTitle:@"Error" message:error.domain delegate:nil cancelButtonTitle:@"OK" otherButtonTitles:nil]; [alertView show]; }];
当然,细心的同学会发现这样修改,仍然有许多计算上的浪费,例如将fetchData转换为title的block会执行多次,将fetchData转换为desc的block也会执行多次。但是由于这些block都是无副作用的,计算量并不大,可以忽略不计。如果计算量大的,也需要对中间的信号进行热信号的转换。不过请不要忽略冷热信号的转换本身也是有计算代价的。
好的,写到这里,我们终于揭开RAC中冷信号与热信号的全部面纱,也知道如何使用了。希望这个系列文章可以让大家更好地了解RAC,避免使用RAC遇到的误区。谢谢大家。
美团iOS组有很多志同道合的小伙伴,对于各种技术都有着深入的了解,我们热忱地欢迎一切牛掰的小伙伴加入,共同学习,共同进步。(简历请发送到邮箱 liangsi02@meituan.com)
不想错过技术博客更新?想给文章评论、和作者互动?第一时间获取技术沙龙信息?
请关注我们的官方微信公众号“美团点评技术团队”。现在就拿出手机,扫一扫: