Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blocking connect and move handling connection to dedicated NSThread #67

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Info.plist
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<key>CFBundlePackageType</key>
<string>FMWK</string>
<key>CFBundleShortVersionString</key>
<string>0.1.5</string>
<string>0.1.9</string>
<key>CFBundleVersion</key>
<string>$(CURRENT_PROJECT_VERSION)</string>
<key>NSPrincipalClass</key>
Expand Down
26 changes: 23 additions & 3 deletions JFRWebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,37 @@
@property(nonatomic, readonly, nonnull) NSURL *url;

/**
constructor to create a new websocket.
constructor to create a new websocket with QOS_CLASS_UTILITY dispatch queue
@param url the host you want to connect to.
@param protocols the websocket protocols you want to use (e.g. chat,superchat).
@return a newly initalized websocket.
*/
- (nonnull instancetype)initWithURL:(nonnull NSURL *)url protocols:(nullable NSArray*)protocols;

/**
connect to the host.
constructor to create a new websocket
@param url the host you want to connect to.
@param protocols the websocket protocols you want to use (e.g. chat,superchat).
@param callbackQueue the dispatch queue for handling callbacks
@return a newly initalized websocket.
*/
- (nonnull instancetype)initWithURLAndQueue:(nonnull NSURL *)url protocols:(nonnull NSArray*)protocols callbackQueue:(nonnull dispatch_queue_t)callbackQueue;

/**
constructor to create a new websocket
@param url the host you want to connect to.
@param protocols the websocket protocols you want to use (e.g. chat,superchat).
@param callbackQueue the dispatch queue for handling callbacks
@param connectTimeout timeout for blocking connect
@return a newly initalized websocket.
*/
- (nonnull instancetype)initWithURL:(NSURL *)url protocols:(NSArray*)protocols callbackQueue:(dispatch_queue_t)callbackQueue connectTimeout:(NSTimeInterval)connectTimeout;

/**
connect to the host - blocking
@return YES if successfully connected
*/
- (void)connect;
- (BOOL)connect;

/**
disconnect to the host. This sends the close Connection opcode to terminate cleanly.
Expand Down
243 changes: 152 additions & 91 deletions JFRWebSocket.m
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ typedef NS_ENUM(NSUInteger, JFRCloseCode) {

typedef NS_ENUM(NSUInteger, JFRInternalErrorCode) {
// 0-999 WebSocket status codes not used
JFROutputStreamWriteError = 1
JFROutputStreamWriteError = 1,
JFRConnectTimeout = 2
};

#define kJFRInternalHTTPStatusWebSocket 101
Expand All @@ -70,7 +71,6 @@ @interface JFRWebSocket ()<NSStreamDelegate>
@property(nonatomic, strong, null_unspecified)NSInputStream *inputStream;
@property(nonatomic, strong, null_unspecified)NSOutputStream *outputStream;
@property(nonatomic, strong, null_unspecified)NSOperationQueue *writeQueue;
@property(nonatomic, assign)BOOL isRunLoop;
@property(nonatomic, strong, nonnull)NSMutableArray *readStack;
@property(nonatomic, strong, nonnull)NSMutableArray *inputQueue;
@property(nonatomic, strong, nullable)NSData *fragBuffer;
Expand All @@ -79,6 +79,10 @@ @interface JFRWebSocket ()<NSStreamDelegate>
@property(nonatomic, assign)BOOL isCreated;
@property(nonatomic, assign)BOOL didDisconnect;
@property(nonatomic, assign)BOOL certValidated;
@property(nonatomic, assign)NSTimeInterval connectTimeout;

@property (strong) NSThread *wsThread;


@end

Expand Down Expand Up @@ -109,54 +113,68 @@ @interface JFRWebSocket ()<NSStreamDelegate>
static const uint8_t JFRPayloadLenMask = 0x7F;
static const size_t JFRMaxFrameSize = 32;


static const NSTimeInterval JFRDefaultConnectTimeout = 10.0;

@implementation JFRWebSocket


/////////////////////////////////////////////////////////////////////////////
//Default initializer
- (instancetype)initWithURL:(NSURL *)url protocols:(NSArray*)protocols
{
return [self initWithURL:url protocols:protocols callbackQueue:dispatch_get_global_queue(QOS_CLASS_UTILITY, 0)];
}

/////////////////////////////////////////////////////////////////////////////
//Initialized with custom dispatch queue
- (instancetype)initWithURL:(NSURL *)url protocols:(NSArray*)protocols callbackQueue:(dispatch_queue_t)callbackQueue
{
return [self initWithURL:url protocols:protocols callbackQueue:callbackQueue connectTimeout:JFRDefaultConnectTimeout];
}

/////////////////////////////////////////////////////////////////////////////
//Initialized with custom dispatch queue and connection timeout
- (instancetype)initWithURL:(NSURL *)url protocols:(NSArray*)protocols callbackQueue:(dispatch_queue_t)callbackQueue connectTimeout:(NSTimeInterval)connectTimeout
{
if(self = [super init]) {
self.certValidated = NO;
self.voipEnabled = NO;
self.selfSignedSSL = NO;
self.queue = dispatch_get_main_queue();
self.queue = callbackQueue;
self.url = url;
self.readStack = [NSMutableArray new];
self.inputQueue = [NSMutableArray new];
self.optProtocols = protocols;
self.connectTimeout = connectTimeout;
}

return self;
}

/////////////////////////////////////////////////////////////////////////////
//Exposed method for connecting to URL provided in init method.
- (void)connect {
if(self.isCreated) {
return;
- (BOOL)connect {
@synchronized (self) {
if (self.wsThread != nil) {
return YES;
}
self.didDisconnect = NO;
return [self createHTTPRequest];
}

__weak typeof(self) weakSelf = self;
dispatch_async(self.queue, ^{
weakSelf.didDisconnect = NO;
});

//everything is on a background thread.
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
weakSelf.isCreated = YES;
[weakSelf createHTTPRequest];
weakSelf.isCreated = NO;
});
}
/////////////////////////////////////////////////////////////////////////////
- (void)disconnect {
[self writeError:JFRCloseCodeNormal];
}
/////////////////////////////////////////////////////////////////////////////
- (void)writeString:(NSString*)string {
if(string) {
[self dequeueWrite:[string dataUsingEncoding:NSUTF8StringEncoding]
withCode:JFROpCodeTextFrame];
@autoreleasepool {
if(string) {
[self dequeueWrite:[string dataUsingEncoding:NSUTF8StringEncoding]
withCode:JFROpCodeTextFrame];
}
}

}
/////////////////////////////////////////////////////////////////////////////
- (void)writePing:(NSData*)data {
Expand Down Expand Up @@ -198,7 +216,7 @@ - (NSString *)origin;


//Uses CoreFoundation to build a HTTP request to send over TCP stream.
- (void)createHTTPRequest {
- (BOOL)createHTTPRequest {
CFURLRef url = CFURLCreateWithString(kCFAllocatorDefault, (CFStringRef)self.url.absoluteString, NULL);
CFStringRef requestMethod = CFSTR("GET");
CFHTTPMessageRef urlRequest = CFHTTPMessageCreateRequest(kCFAllocatorDefault,
Expand Down Expand Up @@ -254,8 +272,8 @@ - (void)createHTTPRequest {
NSLog(@"urlRequest = \"%@\"", urlRequest);
#endif
NSData *serializedRequest = (__bridge_transfer NSData *)(CFHTTPMessageCopySerializedMessage(urlRequest));
[self initStreamsWithData:serializedRequest port:port];
CFRelease(urlRequest);
return [self initStreamsWithData:serializedRequest port:port];
}
/////////////////////////////////////////////////////////////////////////////
//Random String of 16 lowercase chars, SHA1 and base64 encoded.
Expand All @@ -267,47 +285,85 @@ - (NSString*)generateWebSocketKey {
}
return [[string dataUsingEncoding:NSUTF8StringEncoding] base64EncodedStringWithOptions:0];
}

/////////////////////////////////////////////////////////////////////////////
//Sets up our reader/writer for the TCP stream.
- (void)initStreamsWithData:(NSData*)data port:(NSNumber*)port {
CFReadStreamRef readStream = NULL;
CFWriteStreamRef writeStream = NULL;
CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)self.url.host, [port intValue], &readStream, &writeStream);

self.inputStream = (__bridge_transfer NSInputStream *)readStream;
self.inputStream.delegate = self;
self.outputStream = (__bridge_transfer NSOutputStream *)writeStream;
self.outputStream.delegate = self;
if([self.url.scheme isEqualToString:@"wss"] || [self.url.scheme isEqualToString:@"https"]) {
[self.inputStream setProperty:NSStreamSocketSecurityLevelNegotiatedSSL forKey:NSStreamSocketSecurityLevelKey];
[self.outputStream setProperty:NSStreamSocketSecurityLevelNegotiatedSSL forKey:NSStreamSocketSecurityLevelKey];
} else {
self.certValidated = YES; //not a https session, so no need to check SSL pinning
}
if(self.voipEnabled) {
[self.inputStream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
[self.outputStream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
}
if(self.selfSignedSSL) {
NSString *chain = (__bridge_transfer NSString *)kCFStreamSSLValidatesCertificateChain;
NSString *peerName = (__bridge_transfer NSString *)kCFStreamSSLValidatesCertificateChain;
NSString *key = (__bridge_transfer NSString *)kCFStreamPropertySSLSettings;
NSDictionary *settings = @{chain: [[NSNumber alloc] initWithBool:NO],
peerName: [NSNull null]};
[self.inputStream setProperty:settings forKey:key];
[self.outputStream setProperty:settings forKey:key];
}
self.isRunLoop = YES;
// socket i/o handler for seperate thread
-(void)handleStream:(id)object
{
[self.inputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
[self.outputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
[self.inputStream open];
[self.outputStream open];
size_t dataLen = [data length];
[self.outputStream write:[data bytes] maxLength:dataLen];
while (self.isRunLoop) {

while (![[NSThread currentThread] isCancelled]) {
[[NSRunLoop currentRunLoop] runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]];
}
}

/////////////////////////////////////////////////////////////////////////////
//Sets up our reader/writer for the TCP stream.
- (BOOL)initStreamsWithData:(NSData*)data port:(NSNumber*)port {
@autoreleasepool {
CFReadStreamRef readStream = NULL;
CFWriteStreamRef writeStream = NULL;
CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)self.url.host, [port intValue], &readStream, &writeStream);

self.inputStream = (__bridge_transfer NSInputStream *)readStream;
self.inputStream.delegate = self;
self.outputStream = (__bridge_transfer NSOutputStream *)writeStream;
self.outputStream.delegate = self;
if([self.url.scheme isEqualToString:@"wss"] || [self.url.scheme isEqualToString:@"https"]) {
[self.inputStream setProperty:NSStreamSocketSecurityLevelNegotiatedSSL forKey:NSStreamSocketSecurityLevelKey];
[self.outputStream setProperty:NSStreamSocketSecurityLevelNegotiatedSSL forKey:NSStreamSocketSecurityLevelKey];
} else {
self.certValidated = YES; //not a https session, so no need to check SSL pinning
}
if(self.voipEnabled) {
[self.inputStream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
[self.outputStream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
}
if(self.selfSignedSSL) {
NSString *chain = (__bridge_transfer NSString *)kCFStreamSSLValidatesCertificateChain;
NSString *peerName = (__bridge_transfer NSString *)kCFStreamSSLValidatesCertificateChain;
NSString *key = (__bridge_transfer NSString *)kCFStreamPropertySSLSettings;
NSDictionary *settings = @{chain: [[NSNumber alloc] initWithBool:NO],
peerName: [NSNull null]};
[self.inputStream setProperty:settings forKey:key];
[self.outputStream setProperty:settings forKey:key];
}

[self.inputStream open];
[self.outputStream open];
size_t dataLen = [data length];
[self.outputStream write:[data bytes] maxLength:dataLen];

[self.inputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
[self.outputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
NSDate *timeoutDate = [[NSDate date] dateByAddingTimeInterval:self.connectTimeout];
while (!self.isConnected) {
//process initial connect request synchronously
if ((!self.isConnected && [[NSDate date] compare:timeoutDate] == NSOrderedDescending)) {
[self disconnectStream:[self errorWithDetail:@"Websocket connect timeout" code:JFRConnectTimeout]];
return NO;
}
[[NSRunLoop currentRunLoop] runMode:NSDefaultRunLoopMode beforeDate:[timeoutDate dateByAddingTimeInterval:1.0]]; // Add 1 sec to prevent race condition with beforeDate being in the past
}

//init worker thread
self.wsThread = [[NSThread alloc] initWithTarget:self selector:@selector(handleStream:) object:nil];
self.wsThread.name = @"jetfire.ws.worker";

[self.inputStream removeFromRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
[self.outputStream removeFromRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];

if (self.wsThread != nil) {
// delegate work to worker thread
[self.wsThread start];
return YES;
}
return NO;
}
}


/////////////////////////////////////////////////////////////////////////////

#pragma mark - NSStreamDelegate
Expand Down Expand Up @@ -361,7 +417,10 @@ - (void)disconnectStream:(NSError*)error {
[self.inputStream close];
self.outputStream = nil;
self.inputStream = nil;
self.isRunLoop = NO;
if (self.wsThread != nil && !self.wsThread.isCancelled) {
[self.wsThread cancel];
}
self.wsThread = nil;
_isConnected = NO;
self.certValidated = NO;
[self doDisconnect:error];
Expand Down Expand Up @@ -441,7 +500,7 @@ - (BOOL)processHTTP:(uint8_t*)buffer length:(NSInteger)bufferLen responseStatusC
_isConnected = YES;
__weak typeof(self) weakSelf = self;
dispatch_async(self.queue,^{
if([self.delegate respondsToSelector:@selector(websocketDidConnect:)]) {
if([weakSelf.delegate respondsToSelector:@selector(websocketDidConnect:)]) {
[weakSelf.delegate websocketDidConnect:self];
}
if(weakSelf.onConnect) {
Expand Down Expand Up @@ -645,38 +704,40 @@ - (void)processExtra:(uint8_t*)buffer length:(NSInteger)bufferLen {
}
/////////////////////////////////////////////////////////////////////////////
- (BOOL)processResponse:(JFRResponse*)response {
if(response.isFin && response.bytesLeft <= 0) {
NSData *data = response.buffer;
if(response.code == JFROpCodePing) {
[self dequeueWrite:response.buffer withCode:JFROpCodePong];
} else if(response.code == JFROpCodeTextFrame) {
NSString *str = [[NSString alloc] initWithData:response.buffer encoding:NSUTF8StringEncoding];
if(!str) {
[self writeError:JFRCloseCodeEncoding];
return NO;
}
__weak typeof(self) weakSelf = self;
dispatch_async(self.queue,^{
if([weakSelf.delegate respondsToSelector:@selector(websocket:didReceiveMessage:)]) {
[weakSelf.delegate websocket:weakSelf didReceiveMessage:str];
}
if(weakSelf.onText) {
weakSelf.onText(str);
}
});
} else if(response.code == JFROpCodeBinaryFrame) {
__weak typeof(self) weakSelf = self;
dispatch_async(self.queue,^{
if([weakSelf.delegate respondsToSelector:@selector(websocket:didReceiveData:)]) {
[weakSelf.delegate websocket:weakSelf didReceiveData:data];
}
if(weakSelf.onData) {
weakSelf.onData(data);
@autoreleasepool {
if(response.isFin && response.bytesLeft <= 0) {
NSData *data = response.buffer;
if(response.code == JFROpCodePing) {
[self dequeueWrite:response.buffer withCode:JFROpCodePong];
} else if(response.code == JFROpCodeTextFrame) {
NSString *str = [[NSString alloc] initWithData:response.buffer encoding:NSUTF8StringEncoding];
if(!str) {
[self writeError:JFRCloseCodeEncoding];
return NO;
}
});
__weak typeof(self) weakSelf = self;
dispatch_async(self.queue,^{
if([weakSelf.delegate respondsToSelector:@selector(websocket:didReceiveMessage:)]) {
[weakSelf.delegate websocket:weakSelf didReceiveMessage:str];
}
if(weakSelf.onText) {
weakSelf.onText(str);
}
});
} else if(response.code == JFROpCodeBinaryFrame) {
__weak typeof(self) weakSelf = self;
dispatch_async(self.queue,^{
if([weakSelf.delegate respondsToSelector:@selector(websocket:didReceiveData:)]) {
[weakSelf.delegate websocket:weakSelf didReceiveData:data];
}
if(weakSelf.onData) {
weakSelf.onData(data);
}
});
}
[self.readStack removeLastObject];
return YES;
}
[self.readStack removeLastObject];
return YES;
}
return NO;
}
Expand Down