diff --git a/Package.resolved b/Package.resolved new file mode 100644 index 0000000..85a7d8c --- /dev/null +++ b/Package.resolved @@ -0,0 +1,32 @@ +{ + "pins" : [ + { + "identity" : "swift-atomics", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-atomics.git", + "state" : { + "revision" : "cd142fd2f64be2100422d658e7411e39489da985", + "version" : "1.2.0" + } + }, + { + "identity" : "swift-collections", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-collections.git", + "state" : { + "revision" : "a902f1823a7ff3c9ab2fba0f992396b948eda307", + "version" : "1.0.5" + } + }, + { + "identity" : "swift-nio", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio", + "state" : { + "revision" : "702cd7c56d5d44eeba73fdf83918339b26dc855c", + "version" : "2.62.0" + } + } + ], + "version" : 2 +} diff --git a/Package.swift b/Package.swift index 72748a4..1ce242a 100644 --- a/Package.swift +++ b/Package.swift @@ -5,7 +5,7 @@ import PackageDescription let package = Package( name: "HPRTMP", - platforms: [.iOS(.v14),.macOS(.v11)], + platforms: [.iOS(.v14), .macOS(.v11)], products: [ .library( name: "HPRTMP", @@ -13,15 +13,14 @@ let package = Package( ), ], dependencies: [ - // Dependencies declare other packages that this package depends on. - // .package(url: /* package url */, from: "1.0.0"), + .package(url: "https://github.com/apple/swift-nio", from: "2.0.0"), ], targets: [ - // Targets are the basic building blocks of a package. A target can define a module or a test suite. - // Targets can depend on other targets in this package, and on products in packages this package depends on. .target( name: "HPRTMP", - dependencies: []), + dependencies: [ + .product(name: "NIO", package: "swift-nio") + ]), .testTarget( name: "HPRTMPTests", dependencies: ["HPRTMP"]), diff --git a/Sources/HPRTMP/Network/NetworkClient.swift b/Sources/HPRTMP/Network/NetworkClient.swift new file mode 100644 index 0000000..9b9fef3 --- /dev/null +++ b/Sources/HPRTMP/Network/NetworkClient.swift @@ -0,0 +1,120 @@ +import Foundation +import NIO +import os + +protocol NetworkConnectable { + func connect(host: String, port: Int) async throws + func sendData(_ data: Data) async throws + func receiveData() async throws -> Data + func close() async throws +} + +final class RTMPClientHandler: ChannelInboundHandler { + typealias InboundIn = ByteBuffer + private let responseCallback: (Data) -> Void + + init(responseCallback: @escaping (Data) -> Void) { + self.responseCallback = responseCallback + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + var buffer = self.unwrapInboundIn(data) + + var data = Data() + buffer.readWithUnsafeReadableBytes { ptr in + data.append(ptr.baseAddress!.assumingMemoryBound(to: UInt8.self), count: ptr.count) + return ptr.count + } + + guard !data.isEmpty else { return } + responseCallback(data) + } + + func errorCaught(context: ChannelHandlerContext, error: Error) { + print("error: ", error) + context.close(promise: nil) + } +} + +class NetworkClient: NetworkConnectable { + private let group: EventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + private var channel: Channel? + private var host: String? + private var port: Int? + + private var cachedReceivedData: Data = .init() + private var dataPromise: EventLoopPromise? + + private let logger = Logger(subsystem: "HPRTMP", category: "NetworkClient") + + + init() { + } + + deinit { + let group = group + Task { + try? await group.shutdownGracefully() + } + } + + func connect(host: String, port: Int) async throws { + self.host = host + self.port = port + + let bootstrap = ClientBootstrap(group: group) + .channelInitializer { channel in + channel.pipeline.addHandlers([ + RTMPClientHandler(responseCallback: self.responseReceived) + ]) + } + + do { + self.channel = try await bootstrap.connect(host: host, port: Int(port)).get() + logger.info("[HPRTMP] Connected to \(host):\(port)") + } catch { + logger.error("[HPRTMP] Failed to connect: \(error)") + throw error + } + } + + func sendData(_ data: Data) async throws { + guard let channel = self.channel else { + throw NSError(domain: "RTMPClientError", code: -1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"]) + } + + let buffer = channel.allocator.buffer(bytes: data) + try await channel.writeAndFlush(buffer) + } + + func receiveData() async throws -> Data { + guard let channel = self.channel else { + throw NSError(domain: "RTMPClientError", code: -1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"]) + } + if !cachedReceivedData.isEmpty { + let data = cachedReceivedData + cachedReceivedData = Data() + return data + } + + self.dataPromise = channel.eventLoop.makePromise(of: Data.self) + return try await dataPromise!.futureResult.get() + } + + private func responseReceived(data: Data) { + cachedReceivedData.append(data) + if let dataPromise { + dataPromise.succeed(cachedReceivedData) + cachedReceivedData = Data() + self.dataPromise = nil + } + } + + func close() async throws { + dataPromise?.fail(NSError(domain: "RTMPClientError", code: -2, userInfo: [NSLocalizedDescriptionKey: "Connection invalidated"])) + let channel = self.channel + dataPromise = nil + self.channel = nil + try await channel?.close() + } +} diff --git a/Sources/HPRTMP/RTMPSocket.swift b/Sources/HPRTMP/RTMPSocket.swift index 9b57947..8e3c93b 100644 --- a/Sources/HPRTMP/RTMPSocket.swift +++ b/Sources/HPRTMP/RTMPSocket.swift @@ -6,7 +6,6 @@ // import Foundation -import Network import os public enum RTMPStatus { @@ -67,7 +66,7 @@ protocol RTMPSocketDelegate: Actor { public actor RTMPSocket { - private var connection: NWConnection? + private let connection: NetworkConnectable = NetworkClient() private var status: RTMPStatus = .none @@ -127,36 +126,20 @@ extension RTMPSocket { public func resume() async { guard status != .connected else { return } guard let urlInfo else { return } - - let port = NWEndpoint.Port(rawValue: UInt16(urlInfo.port)) - let host = NWEndpoint.Host(urlInfo.host) - let connection = NWConnection(host: host, port: port ?? 1935, using: .tcp) - self.connection = connection - connection.stateUpdateHandler = { [weak self]newState in - guard let self else { return } + do { + try await connection.connect(host: urlInfo.host, port: urlInfo.port) + status = .open Task { - switch newState { - case .ready: - self.logger.info("connection state: ready") - guard await self.status == .open else { return } - await self.startShakeHands() - case .failed(let error): - self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)") - await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription)) - await self.invalidate() - default: - self.logger.info("connection state: other") - break - } + await self.startShakeHands() } + } catch { + self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)") + await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription)) + await self.invalidate() } - NWConnection.maxReadSize = Int((await windowControl.windowSize)) - status = .open - connection.start(queue: DispatchQueue.global(qos: .default)) } private func startShakeHands() async { - guard let connection = connection else { return } self.handshake = RTMPHandshake(dataSender: connection.sendData(_:), dataReceiver: receiveData) await self.handshake?.setDelegate(delegate: self) do { @@ -173,8 +156,7 @@ extension RTMPSocket { } await handshake?.reset() await decoder.reset() - connection?.cancel() - connection = nil + try? await connection.close() urlInfo = nil status = .closed await delegate?.socketDisconnected(self) @@ -262,7 +244,7 @@ extension RTMPSocket { extension RTMPSocket { private func sendData(_ data: Data) async throws { - try await connection?.sendData(data) + try await connection.sendData(data) await windowControl.addOutBytesCount(UInt32(data.count)) } func send(message: RTMPMessage, firstType: Bool) async { @@ -270,7 +252,6 @@ extension RTMPSocket { } private func receiveData() async throws -> Data { - guard let connection = self.connection else { return Data() } return try await connection.receiveData() } }