forked from peter-iakovlev/Signals
-
Notifications
You must be signed in to change notification settings - Fork 0
/
SSignal+Timing.m
109 lines (91 loc) · 3.01 KB
/
SSignal+Timing.m
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#import "SSignal+Timing.h"
#import "SMetaDisposable.h"
#import "SDisposableSet.h"
#import "SBlockDisposable.h"
#import "SSignal+Dispatch.h"
#import "STimer.h"
@implementation SSignal (Timing)
- (SSignal *)delay:(NSTimeInterval)seconds onQueue:(SQueue *)queue
{
return [[SSignal alloc] initWithGenerator:^id<SDisposable> (SSubscriber *subscriber)
{
SMetaDisposable *disposable = [[SMetaDisposable alloc] init];
STimer *timer = [[STimer alloc] initWithTimeout:seconds repeat:false completion:^
{
[disposable setDisposable:[self startWithNext:^(id next)
{
[subscriber putNext:next];
} error:^(id error)
{
[subscriber putError:error];
} completed:^
{
[subscriber putCompletion];
}]];
} queue:queue];
[timer start];
[disposable setDisposable:[[SBlockDisposable alloc] initWithBlock:^
{
[timer invalidate];
}]];
return disposable;
}];
}
- (SSignal *)timeout:(NSTimeInterval)seconds onQueue:(SQueue *)queue orSignal:(SSignal *)signal
{
return [[SSignal alloc] initWithGenerator:^id<SDisposable> (SSubscriber *subscriber)
{
SMetaDisposable *disposable = [[SMetaDisposable alloc] init];
STimer *timer = [[STimer alloc] initWithTimeout:seconds repeat:false completion:^
{
[disposable setDisposable:[signal startWithNext:^(id next)
{
[subscriber putNext:next];
} error:^(id error)
{
[subscriber putError:error];
} completed:^
{
[subscriber putCompletion];
}]];
} queue:queue];
[timer start];
[disposable setDisposable:[self startWithNext:^(id next)
{
[timer invalidate];
[subscriber putNext:next];
} error:^(id error)
{
[timer invalidate];
[subscriber putError:error];
} completed:^
{
[timer invalidate];
[subscriber putCompletion];
}]];
return disposable;
}];
}
- (SSignal *)wait:(NSTimeInterval)seconds
{
return [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);
id<SDisposable> disposable = [self startWithNext:^(id next)
{
dispatch_semaphore_signal(semaphore);
[subscriber putNext:next];
} error:^(id error)
{
dispatch_semaphore_signal(semaphore);
[subscriber putError:error];
} completed:^
{
dispatch_semaphore_signal(semaphore);
[subscriber putCompletion];
}];
dispatch_semaphore_wait(semaphore, dispatch_time(DISPATCH_TIME_NOW, (int64_t)(seconds * NSEC_PER_SEC)));
return disposable;
}];
}
@end