From 93f51c78d21b74ec790067501521ef2f055de10f Mon Sep 17 00:00:00 2001 From: Huiping Guo Date: Sun, 17 Dec 2023 23:43:25 +0900 Subject: [PATCH] add swiftnio implement --- Package.resolved | 32 ++++++ Package.swift | 11 +- Sources/HPRTMP/Network.swift | 43 ------- Sources/HPRTMP/Network/NetworkClient.swift | 128 +++++++++++++++++++++ Sources/HPRTMP/RTMPSocket.swift | 41 ++----- 5 files changed, 176 insertions(+), 79 deletions(-) create mode 100644 Package.resolved delete mode 100644 Sources/HPRTMP/Network.swift create mode 100644 Sources/HPRTMP/Network/NetworkClient.swift diff --git a/Package.resolved b/Package.resolved new file mode 100644 index 0000000..85a7d8c --- /dev/null +++ b/Package.resolved @@ -0,0 +1,32 @@ +{ + "pins" : [ + { + "identity" : "swift-atomics", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-atomics.git", + "state" : { + "revision" : "cd142fd2f64be2100422d658e7411e39489da985", + "version" : "1.2.0" + } + }, + { + "identity" : "swift-collections", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-collections.git", + "state" : { + "revision" : "a902f1823a7ff3c9ab2fba0f992396b948eda307", + "version" : "1.0.5" + } + }, + { + "identity" : "swift-nio", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio", + "state" : { + "revision" : "702cd7c56d5d44eeba73fdf83918339b26dc855c", + "version" : "2.62.0" + } + } + ], + "version" : 2 +} diff --git a/Package.swift b/Package.swift index 72748a4..1ce242a 100644 --- a/Package.swift +++ b/Package.swift @@ -5,7 +5,7 @@ import PackageDescription let package = Package( name: "HPRTMP", - platforms: [.iOS(.v14),.macOS(.v11)], + platforms: [.iOS(.v14), .macOS(.v11)], products: [ .library( name: "HPRTMP", @@ -13,15 +13,14 @@ let package = Package( ), ], dependencies: [ - // Dependencies declare other packages that this package depends on. - // .package(url: /* package url */, from: "1.0.0"), + .package(url: "https://github.com/apple/swift-nio", from: "2.0.0"), ], targets: [ - // Targets are the basic building blocks of a package. A target can define a module or a test suite. - // Targets can depend on other targets in this package, and on products in packages this package depends on. .target( name: "HPRTMP", - dependencies: []), + dependencies: [ + .product(name: "NIO", package: "swift-nio") + ]), .testTarget( name: "HPRTMPTests", dependencies: ["HPRTMP"]), diff --git a/Sources/HPRTMP/Network.swift b/Sources/HPRTMP/Network.swift deleted file mode 100644 index 2895733..0000000 --- a/Sources/HPRTMP/Network.swift +++ /dev/null @@ -1,43 +0,0 @@ - - -import Foundation -import Network - -protocol RTMPConnectable { - func connect(host: String, port: UInt16) async throws - func sendData(_ data: Data) async throws - func receiveData() async throws -> Data -} - -actor NWConnecter: RTMPConnectable { - private var connection: NWConnection? - - public func connect(host: String, port: UInt16) async throws { - let connection = NWConnection(host: NWEndpoint.Host(host), port: NWEndpoint.Port(rawValue: port) ?? 1935, using: .tcp) - self.connection = connection - connection.stateUpdateHandler = { [weak self] newState in - guard let self else { return } - Task { - switch newState { - case .ready: - break - case .failed(let error): - break - default: - break - } - } - } -// NWConnection.maxReadSize = Int((await windowControl.windowSize)) - connection.start(queue: DispatchQueue.global(qos: .default)) - } - - func sendData(_ data: Data) async throws { - try await connection?.sendData(data) - } - - func receiveData() async throws -> Data { - guard let connection = self.connection else { return Data() } - return try await connection.receiveData() - } -} diff --git a/Sources/HPRTMP/Network/NetworkClient.swift b/Sources/HPRTMP/Network/NetworkClient.swift new file mode 100644 index 0000000..c6272f2 --- /dev/null +++ b/Sources/HPRTMP/Network/NetworkClient.swift @@ -0,0 +1,128 @@ +import Foundation +import NIO + +protocol NetworkConnectable { + func connect(host: String, port: Int) async throws + func sendData(_ data: Data) async throws + func receiveData() async throws -> Data + func invalidate() +} + +final class RTMPClientHandler: ChannelInboundHandler { + typealias InboundIn = Data + private let responseCallback: (Data) -> Void + + init(responseCallback: @escaping (Data) -> Void) { + self.responseCallback = responseCallback + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let data = self.unwrapInboundIn(data) + guard !data.isEmpty else { return } + responseCallback(data) + } +} + +final class DataDecoder: ByteToMessageDecoder { + typealias InboundIn = ByteBuffer + typealias InboundOut = Data + + func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState { + var data = Data() + buffer.readWithUnsafeReadableBytes { ptr in + data.append(ptr.baseAddress!.assumingMemoryBound(to: UInt8.self), count: ptr.count) + return ptr.count + } + context.fireChannelRead(wrapInboundOut(data)) + return .continue + } +} + +final class DataEncoder: MessageToByteEncoder { + typealias OutboundIn = Data + typealias OutboundOut = ByteBuffer + + func encode(data: Data, out: inout ByteBuffer) throws { + out.writeBytes(data) + } +} + +class NetworkClient: NetworkConnectable { + private let group: EventLoopGroup + private var channel: Channel? + private var host: String? + private var port: Int? + + private var cachedReceivedData: Data = .init() + private var dataPromise: EventLoopPromise? + + init() { + self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + } + + func connect(host: String, port: Int) async throws { + self.host = host + self.port = port + let bootstrap = ClientBootstrap(group: group) + .channelInitializer { channel in + channel.pipeline.addHandlers([ + ByteToMessageHandler(DataDecoder()), + MessageToByteHandler(DataEncoder()), + RTMPClientHandler(responseCallback: self.responseReceived) + ]) + } + + do { + self.channel = try await bootstrap.connect(host: host, port: Int(port)).get() + print("Connected to \(host):\(port)") + } catch { + print("Failed to connect: \(error)") + throw error + } + } + + func sendData(_ data: Data) async throws { + guard let channel = self.channel else { + print("Connection not established") + throw NSError(domain: "RTMPClientError", code: 1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"]) + } + + try await channel.writeAndFlush(data) + } + + func receiveData() async throws -> Data { + guard let channel = self.channel else { + print("Connection not established") + throw NSError(domain: "RTMPClientError", code: 1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"]) + } + if !cachedReceivedData.isEmpty { + let data = cachedReceivedData + cachedReceivedData = Data() + return data + } + + self.dataPromise = channel.eventLoop.makePromise(of: Data.self) + return try await dataPromise!.futureResult.get() + } + + private func responseReceived(data: Data) { + cachedReceivedData.append(data) + if let dataPromise { + dataPromise.succeed(cachedReceivedData) + cachedReceivedData = Data() + self.dataPromise = nil + } + } + + func shutdown() { + do { + try group.syncShutdownGracefully() + } catch { + print("Error shutting down: \(error)") + } + } + + func invalidate() { + shutdown() + } +} diff --git a/Sources/HPRTMP/RTMPSocket.swift b/Sources/HPRTMP/RTMPSocket.swift index 9b57947..82ca7b6 100644 --- a/Sources/HPRTMP/RTMPSocket.swift +++ b/Sources/HPRTMP/RTMPSocket.swift @@ -6,7 +6,6 @@ // import Foundation -import Network import os public enum RTMPStatus { @@ -67,7 +66,7 @@ protocol RTMPSocketDelegate: Actor { public actor RTMPSocket { - private var connection: NWConnection? + private let connection: NetworkConnectable = NetworkClient() private var status: RTMPStatus = .none @@ -127,36 +126,20 @@ extension RTMPSocket { public func resume() async { guard status != .connected else { return } guard let urlInfo else { return } - - let port = NWEndpoint.Port(rawValue: UInt16(urlInfo.port)) - let host = NWEndpoint.Host(urlInfo.host) - let connection = NWConnection(host: host, port: port ?? 1935, using: .tcp) - self.connection = connection - connection.stateUpdateHandler = { [weak self]newState in - guard let self else { return } + do { + try await connection.connect(host: urlInfo.host, port: urlInfo.port) + status = .open Task { - switch newState { - case .ready: - self.logger.info("connection state: ready") - guard await self.status == .open else { return } - await self.startShakeHands() - case .failed(let error): - self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)") - await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription)) - await self.invalidate() - default: - self.logger.info("connection state: other") - break - } + await self.startShakeHands() } + } catch { + self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)") + await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription)) + await self.invalidate() } - NWConnection.maxReadSize = Int((await windowControl.windowSize)) - status = .open - connection.start(queue: DispatchQueue.global(qos: .default)) } private func startShakeHands() async { - guard let connection = connection else { return } self.handshake = RTMPHandshake(dataSender: connection.sendData(_:), dataReceiver: receiveData) await self.handshake?.setDelegate(delegate: self) do { @@ -173,8 +156,7 @@ extension RTMPSocket { } await handshake?.reset() await decoder.reset() - connection?.cancel() - connection = nil + connection.invalidate() urlInfo = nil status = .closed await delegate?.socketDisconnected(self) @@ -262,7 +244,7 @@ extension RTMPSocket { extension RTMPSocket { private func sendData(_ data: Data) async throws { - try await connection?.sendData(data) + try await connection.sendData(data) await windowControl.addOutBytesCount(UInt32(data.count)) } func send(message: RTMPMessage, firstType: Bool) async { @@ -270,7 +252,6 @@ extension RTMPSocket { } private func receiveData() async throws -> Data { - guard let connection = self.connection else { return Data() } return try await connection.receiveData() } }