在ReactiveCocoa中,究竟什么才是热信号呢?冷信号是比较常见的,map一下就会得到一个冷信号。但在RAC中,好像并没有“hot signal”这个单独的说法。原来在RAC的世界中,所有的热信号都属于一个类——RACSubject。接下来我们来看看究竟它为什么这么“神奇”。
A subject, represented by the RACSubject class, is a signal that can be manually controlled.
Subjects can be thought of as the "mutable" variant of a signal, much like NSMutableArray is for NSArray. They are extremely useful for bridging non-RAC code into the world of signals.
For example, instead of handling application logic in block callbacks, the blocks can simply send events to a shared subject instead. The subject can then be returned as a RACSignal, hiding the implementation detail of the callbacks.
Some subjects offer additional behaviors as well. In particular, RACReplaySubject can be used to buffer events for future subscribers, like when a network request finishes before anything is ready to handle the result.
RACSubject *subject = [RACSubject subject]; RACSubject *replaySubject = [RACReplaySubject subject]; [[RACScheduler mainThreadScheduler] afterDelay:0.1 schedule:^{ // Subscriber 1 [subject subscribeNext:^(id x) { NSLog(@"Subscriber 1 get a next value: %@ from subject", x); }]; [replaySubject subscribeNext:^(id x) { NSLog(@"Subscriber 1 get a next value: %@ from replay subject", x); }]; // Subscriber 2 [subject subscribeNext:^(id x) { NSLog(@"Subscriber 2 get a next value: %@ from subject", x); }]; [replaySubject subscribeNext:^(id x) { NSLog(@"Subscriber 2 get a next value: %@ from replay subject", x); }]; }]; [[RACScheduler mainThreadScheduler] afterDelay:1 schedule:^{ [subject sendNext:@"send package 1"]; [replaySubject sendNext:@"send package 1"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:1.1 schedule:^{ // Subscriber 3 [subject subscribeNext:^(id x) { NSLog(@"Subscriber 3 get a next value: %@ from subject", x); }]; [replaySubject subscribeNext:^(id x) { NSLog(@"Subscriber 3 get a next value: %@ from replay subject", x); }]; // Subscriber 4 [subject subscribeNext:^(id x) { NSLog(@"Subscriber 4 get a next value: %@ from subject", x); }]; [replaySubject subscribeNext:^(id x) { NSLog(@"Subscriber 4 get a next value: %@ from replay subject", x); }]; }]; [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [subject sendNext:@"send package 2"]; [replaySubject sendNext:@"send package 2"]; }];
2015-09-28 13:35:22.855 RACDemos[13646:1269269] Start 2015-09-28 13:35:23.856 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 1 from subject 2015-09-28 13:35:23.856 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 1 from subject 2015-09-28 13:35:23.857 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 1 from replay subject 2015-09-28 13:35:23.857 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 1 from replay subject 2015-09-28 13:35:24.059 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 1 from replay subject 2015-09-28 13:35:24.059 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 1 from replay subject 2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 2 from subject 2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 2 from subject 2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 2 from subject 2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 2 from subject 2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 2 from replay subject 2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 2 from replay subject 2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 2 from replay subject 2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 2 from replay subject
经过观察不难发现,4个订阅者实际上是共享subject的,一旦这个subject发送了值,当前的订阅者就会同时接收到。由于Subscriber 3与Subscriber 4的订阅时间稍晚,所以错过了第一次值的发送。这与冷信号是截然不同的反应。冷信号的图类似下图:
将图3与图1对比会发现,Subscriber 3与Subscriber 4在订阅后马上接收到了“历史值”。对于Subscriber 3和Subscriber 4来说,它们只关心“历史的值”而不关心“历史的时间线”,因为实际上1与2是间隔1s发送的,但是它们接收到的显然不是。举个生动的例子,就好像科幻电影里面主人公穿越时间线后会先把所有的回忆快速闪过再来到现实一样。(见《X战警:逆转未来》、《蝴蝶效应》)所以我们也有理由认定replaySubject天然也是热信号。
RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { NSLog(@"Cold signal be subscribed."); [[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{ [subscriber sendNext:@"A"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{ [subscriber sendNext:@"B"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{ [subscriber sendCompleted]; }]; return nil; }]; RACSubject *subject = [RACSubject subject]; NSLog(@"Subject created."); [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [coldSignal subscribe:subject]; }]; [subject subscribeNext:^(id x) { NSLog(@"Subscriber 1 recieve value:%@.", x); }]; [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{ [subject subscribeNext:^(id x) { NSLog(@"Subscriber 2 recieve value:%@.", x); }];
2015-09-28 19:36:45.703 RACDemos[14110:1556061] Subject created. 2015-09-28 19:36:47.705 RACDemos[14110:1556061] Cold signal be subscribed. 2015-09-28 19:36:49.331 RACDemos[14110:1556061] Subscriber 1 recieve value:A. 2015-09-28 19:36:50.999 RACDemos[14110:1556061] Subscriber 1 recieve value:B. 2015-09-28 19:36:50.999 RACDemos[14110:1556061] Subscriber 2 recieve value:B.
- (RACMulticastConnection *)publish; - (RACMulticastConnection *)multicast:(RACSubject *)subject; - (RACSignal *)replay; - (RACSignal *)replayLast; - (RACSignal *)replayLazily;
这5个方法中,最为重要的就是- (RACMulticastConnection *)multicast:(RACSubject *)subject;这个方法了,其他几个方法也是间接调用它的。我们来看看它的实现:
/// implementation RACSignal (Operations) - (RACMulticastConnection *)multicast:(RACSubject *)subject { [subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name]; RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject]; return connection; } /// implementation RACMulticastConnection - (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject { NSCParameterAssert(source != nil); NSCParameterAssert(subject != nil); self = [super init]; if (self == nil) return nil; _sourceSignal = source; _serialDisposable = [[RACSerialDisposable alloc] init]; _signal = subject; return self; } #pragma mark Connecting - (RACDisposable *)connect { BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected); if (shouldConnect) { self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal]; } return self.serialDisposable; } - (RACSignal *)autoconnect { __block volatile int32_t subscriberCount = 0; return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) { OSAtomicIncrement32Barrier(&subscriberCount); RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber]; RACDisposable *connectionDisposable = [self connect]; return [RACDisposable disposableWithBlock:^{ [subscriptionDisposable dispose]; if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) { [connectionDisposable dispose]; } }]; }] setNameWithFormat:@"[%@] -autoconnect", self.signal.name]; }
RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { NSLog(@"Cold signal be subscribed."); [[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{ [subscriber sendNext:@"A"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{ [subscriber sendNext:@"B"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{ [subscriber sendCompleted]; }]; return nil; }]; RACSubject *subject = [RACSubject subject]; NSLog(@"Subject created."); RACMulticastConnection *multicastConnection = [coldSignal multicast:subject]; RACSignal *hotSignal = multicastConnection.signal; [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [multicastConnection connect]; }]; [hotSignal subscribeNext:^(id x) { NSLog(@"Subscribe 1 recieve value:%@.", x); }]; [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{ [hotSignal subscribeNext:^(id x) { NSLog(@"Subscribe 2 recieve value:%@.", x); }]; }];或者这样:
RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { NSLog(@"Cold signal be subscribed."); [[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{ [subscriber sendNext:@"A"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{ [subscriber sendNext:@"B"]; }]; [[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{ [subscriber sendCompleted]; }]; return nil; }]; RACSubject *subject = [RACSubject subject]; NSLog(@"Subject created."); RACMulticastConnection *multicastConnection = [coldSignal multicast:subject]; RACSignal *hotSignal = multicastConnection.autoconnect; [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{ [hotSignal subscribeNext:^(id x) { NSLog(@"Subscribe 1 recieve value:%@.", x); }]; }]; [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{ [hotSignal subscribeNext:^(id x) { NSLog(@"Subscribe 2 recieve value:%@.", x); }]; }];
/// implementation RACSignal (Operations) - (RACMulticastConnection *)publish { RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name]; RACMulticastConnection *connection = [self multicast:subject]; return connection; } - (RACSignal *)replay { RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name]; RACMulticastConnection *connection = [self multicast:subject]; [connection connect]; return connection.signal; } - (RACSignal *)replayLast { RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name]; RACMulticastConnection *connection = [self multicast:subject]; [connection connect]; return connection.signal; } - (RACSignal *)replayLazily { RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]]; return [[RACSignal defer:^{ [connection connect]; return connection.signal; }] setNameWithFormat:@"[%@] -replayLazily", self.name]; }
self.sessionManager = [[AFHTTPSessionManager alloc] initWithBaseURL:[NSURL URLWithString:@"http://api.xxxx.com"]]; self.sessionManager.requestSerializer = [AFJSONRequestSerializer serializer]; self.sessionManager.responseSerializer = [AFJSONResponseSerializer serializer]; @weakify(self) RACSignal *fetchData = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { @strongify(self) NSURLSessionDataTask *task = [self.sessionManager GET:@"fetchData" parameters:@{@"someParameter": @"someValue"} success:^(NSURLSessionDataTask *task, id responseObject) { [subscriber sendNext:responseObject]; [subscriber sendCompleted]; } failure:^(NSURLSessionDataTask *task, NSError *error) { [subscriber sendError:error]; }]; return [RACDisposable disposableWithBlock:^{ if (task.state != NSURLSessionTaskStateCompleted) { [task cancel]; } }]; }] replayLazily]; // modify here!! RACSignal *title = [fetchData flattenMap:^RACSignal *(NSDictionary *value) { if ([value[@"title"] isKindOfClass:[NSString class]]) { return [RACSignal return:value[@"title"]]; } else { return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]]; } }]; RACSignal *desc = [fetchData flattenMap:^RACSignal *(NSDictionary *value) { if ([value[@"desc"] isKindOfClass:[NSString class]]) { return [RACSignal return:value[@"desc"]]; } else { return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]]; } }]; RACSignal *renderedDesc = [desc flattenMap:^RACStream *(NSString *value) { NSError *error = nil; RenderManager *renderManager = [[RenderManager alloc] init]; NSAttributedString *rendered = [renderManager renderText:value error:&error]; if (error) { return [RACSignal error:error]; } else { return [RACSignal return:rendered]; } }]; RAC(self.someLablel, text) = [[title catchTo:[RACSignal return:@"Error"]] startWith:@"Loading..."]; RAC(self.originTextView, text) = [[desc catchTo:[RACSignal return:@"Error"]] startWith:@"Loading..."]; RAC(self.renderedTextView, attributedText) = [[renderedDesc catchTo:[RACSignal return:[[NSAttributedString alloc] initWithString:@"Error"]]] startWith:[[NSAttributedString alloc] initWithString:@"Loading..."]]; [[RACSignal merge:@[title, desc, renderedDesc]] subscribeError:^(NSError *error) { UIAlertView *alertView = [[UIAlertView alloc] initWithTitle:@"Error" message:error.domain delegate:nil cancelButtonTitle:@"OK" otherButtonTitles:nil]; [alertView show]; }];
