From e7ae515bd4bda58b7bad8608544838fea0275f7b Mon Sep 17 00:00:00 2001 From: Huiping Guo Date: Thu, 7 Sep 2023 22:16:22 +0900 Subject: [PATCH 1/2] extract network part to protcol --- Sources/HPRTMP/Network.swift | 43 ++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 Sources/HPRTMP/Network.swift diff --git a/Sources/HPRTMP/Network.swift b/Sources/HPRTMP/Network.swift new file mode 100644 index 0000000..2895733 --- /dev/null +++ b/Sources/HPRTMP/Network.swift @@ -0,0 +1,43 @@ + + +import Foundation +import Network + +protocol RTMPConnectable { + func connect(host: String, port: UInt16) async throws + func sendData(_ data: Data) async throws + func receiveData() async throws -> Data +} + +actor NWConnecter: RTMPConnectable { + private var connection: NWConnection? + + public func connect(host: String, port: UInt16) async throws { + let connection = NWConnection(host: NWEndpoint.Host(host), port: NWEndpoint.Port(rawValue: port) ?? 1935, using: .tcp) + self.connection = connection + connection.stateUpdateHandler = { [weak self] newState in + guard let self else { return } + Task { + switch newState { + case .ready: + break + case .failed(let error): + break + default: + break + } + } + } +// NWConnection.maxReadSize = Int((await windowControl.windowSize)) + connection.start(queue: DispatchQueue.global(qos: .default)) + } + + func sendData(_ data: Data) async throws { + try await connection?.sendData(data) + } + + func receiveData() async throws -> Data { + guard let connection = self.connection else { return Data() } + return try await connection.receiveData() + } +} From 76f1b4ea443aa108fd8aef1f22d68ed6a6828da2 Mon Sep 17 00:00:00 2001 From: Huiping Guo Date: Sun, 17 Dec 2023 23:43:25 +0900 Subject: [PATCH 2/2] add swiftnio implement --- Package.resolved | 32 ++++++ Package.swift | 11 +- Sources/HPRTMP/Network.swift | 43 -------- Sources/HPRTMP/Network/NetworkClient.swift | 120 +++++++++++++++++++++ Sources/HPRTMP/RTMPSocket.swift | 41 ++----- 5 files changed, 168 insertions(+), 79 deletions(-) create mode 100644 Package.resolved delete mode 100644 Sources/HPRTMP/Network.swift create mode 100644 Sources/HPRTMP/Network/NetworkClient.swift diff --git a/Package.resolved b/Package.resolved new file mode 100644 index 0000000..85a7d8c --- /dev/null +++ b/Package.resolved @@ -0,0 +1,32 @@ +{ + "pins" : [ + { + "identity" : "swift-atomics", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-atomics.git", + "state" : { + "revision" : "cd142fd2f64be2100422d658e7411e39489da985", + "version" : "1.2.0" + } + }, + { + "identity" : "swift-collections", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-collections.git", + "state" : { + "revision" : "a902f1823a7ff3c9ab2fba0f992396b948eda307", + "version" : "1.0.5" + } + }, + { + "identity" : "swift-nio", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio", + "state" : { + "revision" : "702cd7c56d5d44eeba73fdf83918339b26dc855c", + "version" : "2.62.0" + } + } + ], + "version" : 2 +} diff --git a/Package.swift b/Package.swift index 72748a4..1ce242a 100644 --- a/Package.swift +++ b/Package.swift @@ -5,7 +5,7 @@ import PackageDescription let package = Package( name: "HPRTMP", - platforms: [.iOS(.v14),.macOS(.v11)], + platforms: [.iOS(.v14), .macOS(.v11)], products: [ .library( name: "HPRTMP", @@ -13,15 +13,14 @@ let package = Package( ), ], dependencies: [ - // Dependencies declare other packages that this package depends on. - // .package(url: /* package url */, from: "1.0.0"), + .package(url: "https://github.com/apple/swift-nio", from: "2.0.0"), ], targets: [ - // Targets are the basic building blocks of a package. A target can define a module or a test suite. - // Targets can depend on other targets in this package, and on products in packages this package depends on. .target( name: "HPRTMP", - dependencies: []), + dependencies: [ + .product(name: "NIO", package: "swift-nio") + ]), .testTarget( name: "HPRTMPTests", dependencies: ["HPRTMP"]), diff --git a/Sources/HPRTMP/Network.swift b/Sources/HPRTMP/Network.swift deleted file mode 100644 index 2895733..0000000 --- a/Sources/HPRTMP/Network.swift +++ /dev/null @@ -1,43 +0,0 @@ - - -import Foundation -import Network - -protocol RTMPConnectable { - func connect(host: String, port: UInt16) async throws - func sendData(_ data: Data) async throws - func receiveData() async throws -> Data -} - -actor NWConnecter: RTMPConnectable { - private var connection: NWConnection? - - public func connect(host: String, port: UInt16) async throws { - let connection = NWConnection(host: NWEndpoint.Host(host), port: NWEndpoint.Port(rawValue: port) ?? 1935, using: .tcp) - self.connection = connection - connection.stateUpdateHandler = { [weak self] newState in - guard let self else { return } - Task { - switch newState { - case .ready: - break - case .failed(let error): - break - default: - break - } - } - } -// NWConnection.maxReadSize = Int((await windowControl.windowSize)) - connection.start(queue: DispatchQueue.global(qos: .default)) - } - - func sendData(_ data: Data) async throws { - try await connection?.sendData(data) - } - - func receiveData() async throws -> Data { - guard let connection = self.connection else { return Data() } - return try await connection.receiveData() - } -} diff --git a/Sources/HPRTMP/Network/NetworkClient.swift b/Sources/HPRTMP/Network/NetworkClient.swift new file mode 100644 index 0000000..9b9fef3 --- /dev/null +++ b/Sources/HPRTMP/Network/NetworkClient.swift @@ -0,0 +1,120 @@ +import Foundation +import NIO +import os + +protocol NetworkConnectable { + func connect(host: String, port: Int) async throws + func sendData(_ data: Data) async throws + func receiveData() async throws -> Data + func close() async throws +} + +final class RTMPClientHandler: ChannelInboundHandler { + typealias InboundIn = ByteBuffer + private let responseCallback: (Data) -> Void + + init(responseCallback: @escaping (Data) -> Void) { + self.responseCallback = responseCallback + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + var buffer = self.unwrapInboundIn(data) + + var data = Data() + buffer.readWithUnsafeReadableBytes { ptr in + data.append(ptr.baseAddress!.assumingMemoryBound(to: UInt8.self), count: ptr.count) + return ptr.count + } + + guard !data.isEmpty else { return } + responseCallback(data) + } + + func errorCaught(context: ChannelHandlerContext, error: Error) { + print("error: ", error) + context.close(promise: nil) + } +} + +class NetworkClient: NetworkConnectable { + private let group: EventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + private var channel: Channel? + private var host: String? + private var port: Int? + + private var cachedReceivedData: Data = .init() + private var dataPromise: EventLoopPromise? + + private let logger = Logger(subsystem: "HPRTMP", category: "NetworkClient") + + + init() { + } + + deinit { + let group = group + Task { + try? await group.shutdownGracefully() + } + } + + func connect(host: String, port: Int) async throws { + self.host = host + self.port = port + + let bootstrap = ClientBootstrap(group: group) + .channelInitializer { channel in + channel.pipeline.addHandlers([ + RTMPClientHandler(responseCallback: self.responseReceived) + ]) + } + + do { + self.channel = try await bootstrap.connect(host: host, port: Int(port)).get() + logger.info("[HPRTMP] Connected to \(host):\(port)") + } catch { + logger.error("[HPRTMP] Failed to connect: \(error)") + throw error + } + } + + func sendData(_ data: Data) async throws { + guard let channel = self.channel else { + throw NSError(domain: "RTMPClientError", code: -1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"]) + } + + let buffer = channel.allocator.buffer(bytes: data) + try await channel.writeAndFlush(buffer) + } + + func receiveData() async throws -> Data { + guard let channel = self.channel else { + throw NSError(domain: "RTMPClientError", code: -1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"]) + } + if !cachedReceivedData.isEmpty { + let data = cachedReceivedData + cachedReceivedData = Data() + return data + } + + self.dataPromise = channel.eventLoop.makePromise(of: Data.self) + return try await dataPromise!.futureResult.get() + } + + private func responseReceived(data: Data) { + cachedReceivedData.append(data) + if let dataPromise { + dataPromise.succeed(cachedReceivedData) + cachedReceivedData = Data() + self.dataPromise = nil + } + } + + func close() async throws { + dataPromise?.fail(NSError(domain: "RTMPClientError", code: -2, userInfo: [NSLocalizedDescriptionKey: "Connection invalidated"])) + let channel = self.channel + dataPromise = nil + self.channel = nil + try await channel?.close() + } +} diff --git a/Sources/HPRTMP/RTMPSocket.swift b/Sources/HPRTMP/RTMPSocket.swift index 9b57947..8e3c93b 100644 --- a/Sources/HPRTMP/RTMPSocket.swift +++ b/Sources/HPRTMP/RTMPSocket.swift @@ -6,7 +6,6 @@ // import Foundation -import Network import os public enum RTMPStatus { @@ -67,7 +66,7 @@ protocol RTMPSocketDelegate: Actor { public actor RTMPSocket { - private var connection: NWConnection? + private let connection: NetworkConnectable = NetworkClient() private var status: RTMPStatus = .none @@ -127,36 +126,20 @@ extension RTMPSocket { public func resume() async { guard status != .connected else { return } guard let urlInfo else { return } - - let port = NWEndpoint.Port(rawValue: UInt16(urlInfo.port)) - let host = NWEndpoint.Host(urlInfo.host) - let connection = NWConnection(host: host, port: port ?? 1935, using: .tcp) - self.connection = connection - connection.stateUpdateHandler = { [weak self]newState in - guard let self else { return } + do { + try await connection.connect(host: urlInfo.host, port: urlInfo.port) + status = .open Task { - switch newState { - case .ready: - self.logger.info("connection state: ready") - guard await self.status == .open else { return } - await self.startShakeHands() - case .failed(let error): - self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)") - await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription)) - await self.invalidate() - default: - self.logger.info("connection state: other") - break - } + await self.startShakeHands() } + } catch { + self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)") + await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription)) + await self.invalidate() } - NWConnection.maxReadSize = Int((await windowControl.windowSize)) - status = .open - connection.start(queue: DispatchQueue.global(qos: .default)) } private func startShakeHands() async { - guard let connection = connection else { return } self.handshake = RTMPHandshake(dataSender: connection.sendData(_:), dataReceiver: receiveData) await self.handshake?.setDelegate(delegate: self) do { @@ -173,8 +156,7 @@ extension RTMPSocket { } await handshake?.reset() await decoder.reset() - connection?.cancel() - connection = nil + try? await connection.close() urlInfo = nil status = .closed await delegate?.socketDisconnected(self) @@ -262,7 +244,7 @@ extension RTMPSocket { extension RTMPSocket { private func sendData(_ data: Data) async throws { - try await connection?.sendData(data) + try await connection.sendData(data) await windowControl.addOutBytesCount(UInt32(data.count)) } func send(message: RTMPMessage, firstType: Bool) async { @@ -270,7 +252,6 @@ extension RTMPSocket { } private func receiveData() async throws -> Data { - guard let connection = self.connection else { return Data() } return try await connection.receiveData() } }