diff --git a/Sources/HPRTMP/Network/Network.swift b/Sources/HPRTMP/Network/Network.swift deleted file mode 100644 index af67d13..0000000 --- a/Sources/HPRTMP/Network/Network.swift +++ /dev/null @@ -1,43 +0,0 @@ - - -import Foundation -import Network - -protocol RTMPConnectable { - func connect(host: String, port: UInt16) async throws - func sendData(_ data: Data) async throws - func receiveData() async throws -> Data -} - -actor NWConnecter: RTMPConnectable { - private var connection: NWConnection? - - public func connect(host: String, port: UInt16) async throws { - let connection = NWConnection(host: NWEndpoint.Host(host), port: NWEndpoint.Port(rawValue: port) ?? 1935, using: .tcp) - self.connection = connection - connection.stateUpdateHandler = { [weak self] newState in - guard let self else { return } - Task { - switch newState { - case .ready: - break - case .failed(_): - break - default: - break - } - } - } - // NWConnection.maxReadSize = Int((await windowControl.windowSize)) - connection.start(queue: DispatchQueue.global(qos: .default)) - } - - func sendData(_ data: Data) async throws { - try await connection?.sendData(data) - } - - func receiveData() async throws -> Data { - guard let connection = self.connection else { return Data() } - return try await connection.receiveData() - } -} diff --git a/Sources/HPRTMP/Network/RTMPClient.swift b/Sources/HPRTMP/Network/NetworkClient.swift similarity index 87% rename from Sources/HPRTMP/Network/RTMPClient.swift rename to Sources/HPRTMP/Network/NetworkClient.swift index 315a2fb..153b0a9 100644 --- a/Sources/HPRTMP/Network/RTMPClient.swift +++ b/Sources/HPRTMP/Network/NetworkClient.swift @@ -1,6 +1,13 @@ import Foundation import NIO +protocol NetworkConnectable { + func connect(host: String, port: Int) async throws + func sendData(_ data: Data) async throws + func receiveData() async throws -> Data + func invalidate() +} + final class RTMPClientHandler: ChannelInboundHandler { typealias InboundIn = Data private let responseCallback: (Data) -> Void @@ -39,21 +46,21 @@ final class DataEncoder: MessageToByteEncoder { } } -class RTMPClient: RTMPConnectable { +class NetworkClient: NetworkConnectable { private let group: EventLoopGroup private var channel: Channel? - private let host: String - private let port: Int + private var host: String? + private var port: Int? private var dataPromise: EventLoopPromise? - init(host: String, port: Int) { + init() { self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1) - self.host = host - self.port = port } - func connect(host: String, port: UInt16) async throws { + func connect(host: String, port: Int) async throws { + self.host = host + self.port = port let bootstrap = ClientBootstrap(group: group) .channelInitializer { channel in channel.pipeline.addHandlers([ @@ -100,4 +107,8 @@ class RTMPClient: RTMPConnectable { print("Error shutting down: \(error)") } } + + func invalidate() { + shutdown() + } } diff --git a/Sources/HPRTMP/RTMPSocket.swift b/Sources/HPRTMP/RTMPSocket.swift index 9b57947..82ca7b6 100644 --- a/Sources/HPRTMP/RTMPSocket.swift +++ b/Sources/HPRTMP/RTMPSocket.swift @@ -6,7 +6,6 @@ // import Foundation -import Network import os public enum RTMPStatus { @@ -67,7 +66,7 @@ protocol RTMPSocketDelegate: Actor { public actor RTMPSocket { - private var connection: NWConnection? + private let connection: NetworkConnectable = NetworkClient() private var status: RTMPStatus = .none @@ -127,36 +126,20 @@ extension RTMPSocket { public func resume() async { guard status != .connected else { return } guard let urlInfo else { return } - - let port = NWEndpoint.Port(rawValue: UInt16(urlInfo.port)) - let host = NWEndpoint.Host(urlInfo.host) - let connection = NWConnection(host: host, port: port ?? 1935, using: .tcp) - self.connection = connection - connection.stateUpdateHandler = { [weak self]newState in - guard let self else { return } + do { + try await connection.connect(host: urlInfo.host, port: urlInfo.port) + status = .open Task { - switch newState { - case .ready: - self.logger.info("connection state: ready") - guard await self.status == .open else { return } - await self.startShakeHands() - case .failed(let error): - self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)") - await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription)) - await self.invalidate() - default: - self.logger.info("connection state: other") - break - } + await self.startShakeHands() } + } catch { + self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)") + await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription)) + await self.invalidate() } - NWConnection.maxReadSize = Int((await windowControl.windowSize)) - status = .open - connection.start(queue: DispatchQueue.global(qos: .default)) } private func startShakeHands() async { - guard let connection = connection else { return } self.handshake = RTMPHandshake(dataSender: connection.sendData(_:), dataReceiver: receiveData) await self.handshake?.setDelegate(delegate: self) do { @@ -173,8 +156,7 @@ extension RTMPSocket { } await handshake?.reset() await decoder.reset() - connection?.cancel() - connection = nil + connection.invalidate() urlInfo = nil status = .closed await delegate?.socketDisconnected(self) @@ -262,7 +244,7 @@ extension RTMPSocket { extension RTMPSocket { private func sendData(_ data: Data) async throws { - try await connection?.sendData(data) + try await connection.sendData(data) await windowControl.addOutBytesCount(UInt32(data.count)) } func send(message: RTMPMessage, firstType: Bool) async { @@ -270,7 +252,6 @@ extension RTMPSocket { } private func receiveData() async throws -> Data { - guard let connection = self.connection else { return Data() } return try await connection.receiveData() } }