-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support swiftnio #9
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
{ | ||
"pins" : [ | ||
{ | ||
"identity" : "swift-atomics", | ||
"kind" : "remoteSourceControl", | ||
"location" : "https://github.com/apple/swift-atomics.git", | ||
"state" : { | ||
"revision" : "cd142fd2f64be2100422d658e7411e39489da985", | ||
"version" : "1.2.0" | ||
} | ||
}, | ||
{ | ||
"identity" : "swift-collections", | ||
"kind" : "remoteSourceControl", | ||
"location" : "https://github.com/apple/swift-collections.git", | ||
"state" : { | ||
"revision" : "a902f1823a7ff3c9ab2fba0f992396b948eda307", | ||
"version" : "1.0.5" | ||
} | ||
}, | ||
{ | ||
"identity" : "swift-nio", | ||
"kind" : "remoteSourceControl", | ||
"location" : "https://github.com/apple/swift-nio", | ||
"state" : { | ||
"revision" : "702cd7c56d5d44eeba73fdf83918339b26dc855c", | ||
"version" : "2.62.0" | ||
} | ||
} | ||
], | ||
"version" : 2 | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
import Foundation | ||
import NIO | ||
import os | ||
|
||
protocol NetworkConnectable { | ||
func connect(host: String, port: Int) async throws | ||
func sendData(_ data: Data) async throws | ||
func receiveData() async throws -> Data | ||
func close() async throws | ||
} | ||
|
||
final class RTMPClientHandler: ChannelInboundHandler { | ||
typealias InboundIn = ByteBuffer | ||
private let responseCallback: (Data) -> Void | ||
|
||
init(responseCallback: @escaping (Data) -> Void) { | ||
self.responseCallback = responseCallback | ||
} | ||
|
||
func channelRead(context: ChannelHandlerContext, data: NIOAny) { | ||
var buffer = self.unwrapInboundIn(data) | ||
|
||
var data = Data() | ||
buffer.readWithUnsafeReadableBytes { ptr in | ||
data.append(ptr.baseAddress!.assumingMemoryBound(to: UInt8.self), count: ptr.count) | ||
return ptr.count | ||
} | ||
|
||
guard !data.isEmpty else { return } | ||
responseCallback(data) | ||
} | ||
|
||
func errorCaught(context: ChannelHandlerContext, error: Error) { | ||
print("error: ", error) | ||
context.close(promise: nil) | ||
} | ||
} | ||
|
||
class NetworkClient: NetworkConnectable { | ||
private let group: EventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) | ||
private var channel: Channel? | ||
private var host: String? | ||
private var port: Int? | ||
|
||
private var cachedReceivedData: Data = .init() | ||
private var dataPromise: EventLoopPromise<Data>? | ||
|
||
private let logger = Logger(subsystem: "HPRTMP", category: "NetworkClient") | ||
|
||
|
||
init() { | ||
} | ||
|
||
deinit { | ||
let group = group | ||
Task { | ||
try? await group.shutdownGracefully() | ||
} | ||
} | ||
|
||
func connect(host: String, port: Int) async throws { | ||
self.host = host | ||
self.port = port | ||
|
||
let bootstrap = ClientBootstrap(group: group) | ||
.channelInitializer { channel in | ||
channel.pipeline.addHandlers([ | ||
RTMPClientHandler(responseCallback: self.responseReceived) | ||
]) | ||
} | ||
|
||
do { | ||
self.channel = try await bootstrap.connect(host: host, port: Int(port)).get() | ||
logger.info("[HPRTMP] Connected to \(host):\(port)") | ||
} catch { | ||
logger.error("[HPRTMP] Failed to connect: \(error)") | ||
throw error | ||
} | ||
} | ||
|
||
func sendData(_ data: Data) async throws { | ||
guard let channel = self.channel else { | ||
throw NSError(domain: "RTMPClientError", code: -1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"]) | ||
} | ||
|
||
let buffer = channel.allocator.buffer(bytes: data) | ||
try await channel.writeAndFlush(buffer) | ||
} | ||
|
||
func receiveData() async throws -> Data { | ||
guard let channel = self.channel else { | ||
throw NSError(domain: "RTMPClientError", code: -1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"]) | ||
} | ||
if !cachedReceivedData.isEmpty { | ||
let data = cachedReceivedData | ||
cachedReceivedData = Data() | ||
return data | ||
} | ||
|
||
self.dataPromise = channel.eventLoop.makePromise(of: Data.self) | ||
return try await dataPromise!.futureResult.get() | ||
} | ||
|
||
private func responseReceived(data: Data) { | ||
cachedReceivedData.append(data) | ||
if let dataPromise { | ||
dataPromise.succeed(cachedReceivedData) | ||
cachedReceivedData = Data() | ||
self.dataPromise = nil | ||
} | ||
} | ||
|
||
func close() async throws { | ||
dataPromise?.fail(NSError(domain: "RTMPClientError", code: -2, userInfo: [NSLocalizedDescriptionKey: "Connection invalidated"])) | ||
let channel = self.channel | ||
dataPromise = nil | ||
self.channel = nil | ||
try await channel?.close() | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Code Review:
Overall, the code seems well-structured for networking tasks using NIO. Addressing the highlighted issues and implementing the improvement suggestions can enhance the robustness and maintainability of the codebase. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,6 @@ | |
// | ||
|
||
import Foundation | ||
import Network | ||
import os | ||
|
||
public enum RTMPStatus { | ||
|
@@ -67,7 +66,7 @@ protocol RTMPSocketDelegate: Actor { | |
|
||
public actor RTMPSocket { | ||
|
||
private var connection: NWConnection? | ||
private let connection: NetworkConnectable = NetworkClient() | ||
|
||
private var status: RTMPStatus = .none | ||
|
||
|
@@ -127,36 +126,20 @@ extension RTMPSocket { | |
public func resume() async { | ||
guard status != .connected else { return } | ||
guard let urlInfo else { return } | ||
|
||
let port = NWEndpoint.Port(rawValue: UInt16(urlInfo.port)) | ||
let host = NWEndpoint.Host(urlInfo.host) | ||
let connection = NWConnection(host: host, port: port ?? 1935, using: .tcp) | ||
self.connection = connection | ||
connection.stateUpdateHandler = { [weak self]newState in | ||
guard let self else { return } | ||
do { | ||
try await connection.connect(host: urlInfo.host, port: urlInfo.port) | ||
status = .open | ||
Task { | ||
switch newState { | ||
case .ready: | ||
self.logger.info("connection state: ready") | ||
guard await self.status == .open else { return } | ||
await self.startShakeHands() | ||
case .failed(let error): | ||
self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)") | ||
await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription)) | ||
await self.invalidate() | ||
default: | ||
self.logger.info("connection state: other") | ||
break | ||
} | ||
await self.startShakeHands() | ||
} | ||
} catch { | ||
self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)") | ||
await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription)) | ||
await self.invalidate() | ||
} | ||
NWConnection.maxReadSize = Int((await windowControl.windowSize)) | ||
status = .open | ||
connection.start(queue: DispatchQueue.global(qos: .default)) | ||
} | ||
|
||
private func startShakeHands() async { | ||
guard let connection = connection else { return } | ||
self.handshake = RTMPHandshake(dataSender: connection.sendData(_:), dataReceiver: receiveData) | ||
await self.handshake?.setDelegate(delegate: self) | ||
do { | ||
|
@@ -173,8 +156,7 @@ extension RTMPSocket { | |
} | ||
await handshake?.reset() | ||
await decoder.reset() | ||
connection?.cancel() | ||
connection = nil | ||
try? await connection.close() | ||
urlInfo = nil | ||
status = .closed | ||
await delegate?.socketDisconnected(self) | ||
|
@@ -262,15 +244,14 @@ extension RTMPSocket { | |
|
||
extension RTMPSocket { | ||
private func sendData(_ data: Data) async throws { | ||
try await connection?.sendData(data) | ||
try await connection.sendData(data) | ||
await windowControl.addOutBytesCount(UInt32(data.count)) | ||
} | ||
func send(message: RTMPMessage, firstType: Bool) async { | ||
await messagePriorityQueue.enqueue(message, firstType: firstType) | ||
} | ||
|
||
private func receiveData() async throws -> Data { | ||
guard let connection = self.connection else { return Data() } | ||
return try await connection.receiveData() | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Code Review Summary:Bug Risks:
Improvement Suggestions:
Detailed Changes:
By addressing these areas, you can enhance the reliability, readability, and maintainability of the codebase. |
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review:
Bug Risks:
dataPromise
is nil when attempting to unwrap it inreceiveData()
.dataPromise
in theinit()
method ofNetworkClient
.Improvement Suggestions:
Error Handling:
Documentation:
Testing:
Input Validation:
Dependency Injection:
Resource Cleanup:
Logging:
Error Handling Consistency:
Code Readability:
Overall, the code demonstrates a basic structure for an RTMP client. Addressing the noted issues can enhance its reliability, maintainability, and robustness.