Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queue based stream #25

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion JFRWebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
Set your own custom queue.
Default setting is dispatch_get_main_queue.
*/
@property(nonatomic, strong)dispatch_queue_t queue;
@property(nonatomic)dispatch_queue_t queue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you rename this to something more explicit like "delegateQueue"? (see suggestion on callbackQueue as well). I found it to be confusing.



@end
100 changes: 59 additions & 41 deletions JFRWebSocket.m
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ @interface JFRWebSocket ()<NSStreamDelegate>
@property(nonatomic, strong)NSInputStream *inputStream;
@property(nonatomic, strong)NSOutputStream *outputStream;
@property(nonatomic, strong)NSOperationQueue *writeQueue;
@property(nonatomic, assign)BOOL isRunLoop;
@property(nonatomic, strong)NSMutableArray *readStack;
@property(nonatomic, strong)NSMutableArray *inputQueue;
@property(nonatomic, strong)NSData *fragBuffer;
@property(nonatomic, strong)NSMutableDictionary *headers;
@property(nonatomic, strong)NSArray *optProtocols;
@property(nonatomic, assign)BOOL isCreated;
@property(nonatomic)dispatch_queue_t callbackQueue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you name this something less likely to be confused with the the delegate callback queue? Perhaps "networkQueue" or "workQueue"?


@end

Expand Down Expand Up @@ -100,6 +100,9 @@ - (instancetype)initWithURL:(NSURL *)url protocols:(NSArray*)protocols
self.readStack = [NSMutableArray new];
self.inputQueue = [NSMutableArray new];
self.optProtocols = protocols;
self.callbackQueue = dispatch_queue_create("com.vluxe.jetfire.socket", DISPATCH_QUEUE_SERIAL);
self.writeQueue = [[NSOperationQueue alloc] init];
self.writeQueue.maxConcurrentOperationCount = 1;
}

return self;
Expand All @@ -112,12 +115,9 @@ - (void)connect
return;
}

//everything is on a background thread.
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
self.isCreated = YES;
[self createHTTPRequest];
self.isCreated = NO;
});
self.isCreated = YES;
[self createHTTPRequest];
self.isCreated = NO;
}
/////////////////////////////////////////////////////////////////////////////
- (void)disconnect
Expand Down Expand Up @@ -195,10 +195,11 @@ - (void)createHTTPRequest
(__bridge CFStringRef)headerWSHostName,
(__bridge CFStringRef)[NSString stringWithFormat:@"%@:%@",self.url.host,port]);

for(NSString *key in self.headers) {
NSDictionary *heads = [self.headers copy];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this method was moved unto the client thread, the copy is a bit pedantic.

I had suggested adding copy to the addHeader: method itself, which would make self.headers immutable and totally safe under any situation.

for(NSString *key in heads) {
CFHTTPMessageSetHeaderFieldValue(urlRequest,
(__bridge CFStringRef)key,
(__bridge CFStringRef)self.headers[key]);
(__bridge CFStringRef)heads[key]);
}

NSData *serializedRequest = (__bridge_transfer NSData *)(CFHTTPMessageCopySerializedMessage(urlRequest));
Expand Down Expand Up @@ -245,19 +246,18 @@ - (void)initStreamsWithData:(NSData *)data port:(NSNumber*)port
[self.inputStream setProperty:settings forKey:key];
[self.outputStream setProperty:settings forKey:key];
}
[self.inputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
[self.outputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
CFReadStreamSetDispatchQueue(readStream, self.callbackQueue);
CFWriteStreamSetDispatchQueue(writeStream, self.callbackQueue);
[self.inputStream open];
[self.outputStream open];
NSInteger len = [self.outputStream write:[data bytes] maxLength:[data length]];
if(len < 0 || len == NSNotFound) {
[self doWriteError];
return;
}
self.isRunLoop = YES;
while (self.isRunLoop) {
[[NSRunLoop currentRunLoop] runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]];
}
[self.writeQueue addOperationWithBlock:^{
NSInteger len = [self.outputStream write:[data bytes] maxLength:[data length]];
if(len < 0 || len == NSNotFound) {
dispatch_async(self.callbackQueue, ^{
[self doWriteError];
});
}
}];
}
/////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -297,14 +297,11 @@ - (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode
/////////////////////////////////////////////////////////////////////////////
-(void)disconnectStream:(NSError*)error
{
[self.writeQueue waitUntilAllOperationsAreFinished];
[self.inputStream removeFromRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
[self.outputStream removeFromRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
[self.writeQueue cancelAllOperations];
[self.outputStream close];
[self.inputStream close];
self.outputStream = nil;
self.inputStream = nil;
self.isRunLoop = NO;
_isConnected = NO;

if([self.delegate respondsToSelector:@selector(websocketDidDisconnect:error:)]) {
Expand All @@ -327,8 +324,11 @@ - (void)processInputStream
if(!self.isConnected) {
_isConnected = [self processHTTP:buffer length:length];
if(!self.isConnected) {
if([self.delegate respondsToSelector:@selector(websocketDidDisconnect:error:)])
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"Invalid HTTP upgrade" code:1]];
if([self.delegate respondsToSelector:@selector(websocketDidDisconnect:error:)]) {
dispatch_async(self.queue,^{
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"Invalid HTTP upgrade" code:1]];
});
}
}
} else {
BOOL process = NO;
Expand Down Expand Up @@ -450,23 +450,32 @@ -(void)processRawMessage:(uint8_t*)buffer length:(NSInteger)bufferLen
uint8_t payloadLen = (JFRPayloadLenMask & buffer[1]);
NSInteger offset = 2; //how many bytes do we need to skip for the header
if((isMasked || (JFRRSVMask & buffer[0])) && receivedOpcode != JFROpCodePong) {

if([self.delegate respondsToSelector:@selector(websocketDidDisconnect:error:)]) {
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"masked and rsv data is not currently supported" code:JFRCloseCodeProtocolError]];
dispatch_async(self.queue,^{
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"masked and rsv data is not currently supported" code:JFRCloseCodeProtocolError]];
});
}
[self writeError:JFRCloseCodeProtocolError];
return;
}
BOOL isControlFrame = (receivedOpcode == JFROpCodeConnectionClose || receivedOpcode == JFROpCodePing); //|| receivedOpcode == JFROpCodePong
if(!isControlFrame && (receivedOpcode != JFROpCodeBinaryFrame && receivedOpcode != JFROpCodeContinueFrame && receivedOpcode != JFROpCodeTextFrame && receivedOpcode != JFROpCodePong)) {

if([self.delegate respondsToSelector:@selector(websocketDidDisconnect:error:)]) {
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:[NSString stringWithFormat:@"unknown opcode: 0x%x",receivedOpcode] code:JFRCloseCodeProtocolError]];
dispatch_async(self.queue,^{
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:[NSString stringWithFormat:@"unknown opcode: 0x%x",receivedOpcode] code:JFRCloseCodeProtocolError]];
});
}
[self writeError:JFRCloseCodeProtocolError];
return;
}
if(isControlFrame && !isFin) {

if([self.delegate respondsToSelector:@selector(websocketDidDisconnect:error:)]) {
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"control frames can't be fragmented" code:JFRCloseCodeProtocolError]];
dispatch_async(self.queue,^{
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"control frames can't be fragmented" code:JFRCloseCodeProtocolError]];
});
}
[self writeError:JFRCloseCodeProtocolError];
return;
Expand Down Expand Up @@ -494,7 +503,9 @@ -(void)processRawMessage:(uint8_t*)buffer length:(NSInteger)bufferLen
}
[self writeError:code];
if([self.delegate respondsToSelector:@selector(websocketDidDisconnect:error:)]) {
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"continue frame before a binary or text frame" code:code]];
dispatch_async(self.queue,^{
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"continue frame before a binary or text frame" code:code]];
});
}
return;
}
Expand Down Expand Up @@ -534,17 +545,23 @@ -(void)processRawMessage:(uint8_t*)buffer length:(NSInteger)bufferLen
response = nil; //don't append pings
}
if(!isFin && receivedOpcode == JFROpCodeContinueFrame && !response) {

if([self.delegate respondsToSelector:@selector(websocketDidDisconnect:error:)]) {
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"continue frame before a binary or text frame" code:JFRCloseCodeProtocolError]];
dispatch_async(self.queue,^{
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"continue frame before a binary or text frame" code:JFRCloseCodeProtocolError]];
});
}
[self writeError:JFRCloseCodeProtocolError];
return;
}
BOOL isNew = NO;
if(!response) {

if(receivedOpcode == JFROpCodeContinueFrame) {
if([self.delegate respondsToSelector:@selector(websocketDidDisconnect:error:)]) {
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"first frame can't be a continue frame" code:JFRCloseCodeProtocolError]];
dispatch_async(self.queue,^{
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"first frame can't be a continue frame" code:JFRCloseCodeProtocolError]];
});
}
[self writeError:JFRCloseCodeProtocolError];
return;
Expand All @@ -555,11 +572,15 @@ -(void)processRawMessage:(uint8_t*)buffer length:(NSInteger)bufferLen
response.bytesLeft = dataLength;
response.buffer = [NSMutableData dataWithData:data];
} else {

if(receivedOpcode == JFROpCodeContinueFrame) {
response.bytesLeft = dataLength;
} else {

if([self.delegate respondsToSelector:@selector(websocketDidDisconnect:error:)]) {
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"second and beyond of fragment message must be a continue frame" code:JFRCloseCodeProtocolError]];
dispatch_async(self.queue,^{
[self.delegate websocketDidDisconnect:self error:[self errorWithDetail:@"second and beyond of fragment message must be a continue frame" code:JFRCloseCodeProtocolError]];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This 8 lines of disconnect logic occurs 8 times – begging to be abstracted into to a method. Could do it in a separate commit though.

});
}
[self writeError:JFRCloseCodeProtocolError];
return;
Expand Down Expand Up @@ -631,15 +652,12 @@ -(BOOL)processCloseCode:(uint16_t)code
/////////////////////////////////////////////////////////////////////////////
-(void)dequeueWrite:(NSData*)data withCode:(JFROpCode)code
{
if(!self.writeQueue) {
self.writeQueue = [[NSOperationQueue alloc] init];
self.writeQueue.maxConcurrentOperationCount = 1;
}
//we have a queue so we can be thread safe.
__weak JFRWebSocket *weakSelf = self;
[self.writeQueue addOperationWithBlock:^{
//stream isn't ready, let's wait
int tries = 0;
while(!self.outputStream || !self.isConnected) {
while(!weakSelf.outputStream || !weakSelf.isConnected) {
if(tries < 5) {
sleep(1);
} else {
Expand Down Expand Up @@ -683,12 +701,12 @@ -(void)dequeueWrite:(NSData*)data withCode:(JFROpCode)code
}
uint64_t total = 0;
while (true) {
if(!self.outputStream) {
if(!weakSelf.outputStream) {
break;
}
NSInteger len = [self.outputStream write:([frame bytes]+total) maxLength:(NSInteger)(offset-total)];
NSInteger len = [weakSelf.outputStream write:([frame bytes]+total) maxLength:(NSInteger)(offset-total)];
if(len < 0 || len == NSNotFound) {
[self doWriteError];
[weakSelf doWriteError];
break;
} else {
total += len;
Expand Down
9 changes: 7 additions & 2 deletions SimpleTest/SimpleTest/ViewController.m
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ -(void)websocketDidConnect:(JFRWebSocket*)socket {

-(void)websocketDidDisconnect:(JFRWebSocket*)socket error:(NSError*)error {
NSLog(@"websocket is disconnected: %@", [error localizedDescription]);
[self.socket connect];
}

-(void)websocket:(JFRWebSocket*)socket didReceiveMessage:(NSString*)string {
Expand All @@ -50,7 +49,13 @@ - (IBAction)writeText:(UIBarButtonItem *)sender {
}

- (IBAction)disconnect:(UIBarButtonItem *)sender {
[self.socket disconnect];
if(self.socket.isConnected) {
sender.title = @"connect";
[self.socket disconnect];
} else {
sender.title = @"disconnect";
[self.socket connect];
}
}

@end