diff --git a/Sources/HPRTMP/Network/Network.swift b/Sources/HPRTMP/Network/Network.swift index 2895733..af67d13 100644 --- a/Sources/HPRTMP/Network/Network.swift +++ b/Sources/HPRTMP/Network/Network.swift @@ -21,14 +21,14 @@ actor NWConnecter: RTMPConnectable { switch newState { case .ready: break - case .failed(let error): + case .failed(_): break default: break } } } -// NWConnection.maxReadSize = Int((await windowControl.windowSize)) + // NWConnection.maxReadSize = Int((await windowControl.windowSize)) connection.start(queue: DispatchQueue.global(qos: .default)) } diff --git a/Sources/HPRTMP/Network/RTMPClient.swift b/Sources/HPRTMP/Network/RTMPClient.swift index 7decb74..315a2fb 100644 --- a/Sources/HPRTMP/Network/RTMPClient.swift +++ b/Sources/HPRTMP/Network/RTMPClient.swift @@ -2,92 +2,102 @@ import Foundation import NIO final class RTMPClientHandler: ChannelInboundHandler { - typealias InboundIn = Data - private let responseCallback: (Data) -> Void - - init(responseCallback: @escaping (Data) -> Void) { - self.responseCallback = responseCallback - } - - func channelRead(context: ChannelHandlerContext, data: NIOAny) { - let data = self.unwrapInboundIn(data) - responseCallback(data) - } + typealias InboundIn = Data + private let responseCallback: (Data) -> Void + + init(responseCallback: @escaping (Data) -> Void) { + self.responseCallback = responseCallback + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let data = self.unwrapInboundIn(data) + responseCallback(data) + } } final class DataDecoder: ByteToMessageDecoder { - typealias InboundIn = ByteBuffer - typealias InboundOut = Data - - func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState { - var data = Data() - buffer.readWithUnsafeReadableBytes { ptr in - data.append(ptr.baseAddress!.assumingMemoryBound(to: UInt8.self), count: ptr.count) - return ptr.count - } - context.fireChannelRead(wrapInboundOut(data)) - return .continue + typealias InboundIn = ByteBuffer + typealias InboundOut = Data + + func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState { + var data = Data() + buffer.readWithUnsafeReadableBytes { ptr in + data.append(ptr.baseAddress!.assumingMemoryBound(to: UInt8.self), count: ptr.count) + return ptr.count } + context.fireChannelRead(wrapInboundOut(data)) + return .continue + } } final class DataEncoder: MessageToByteEncoder { - typealias OutboundIn = Data - typealias OutboundOut = ByteBuffer - - func encode(data: Data, out: inout ByteBuffer) throws { - out.writeBytes(data) - } + typealias OutboundIn = Data + typealias OutboundOut = ByteBuffer + + func encode(data: Data, out: inout ByteBuffer) throws { + out.writeBytes(data) + } } -class RTMPClient { - private let group: EventLoopGroup - private var channel: Channel? - private let host: String - private let port: Int - private var responseCallback: ((Data) -> Void)? - - init(host: String, port: Int) { - self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1) - self.host = host - self.port = port +class RTMPClient: RTMPConnectable { + private let group: EventLoopGroup + private var channel: Channel? + private let host: String + private let port: Int + + private var dataPromise: EventLoopPromise? + + init(host: String, port: Int) { + self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + self.host = host + self.port = port + } + + func connect(host: String, port: UInt16) async throws { + let bootstrap = ClientBootstrap(group: group) + .channelInitializer { channel in + channel.pipeline.addHandlers([ + ByteToMessageHandler(DataDecoder()), + MessageToByteHandler(DataEncoder()), + RTMPClientHandler(responseCallback: self.responseReceived) + ]) + } + + do { + self.channel = try await bootstrap.connect(host: host, port: Int(port)).get() + print("Connected to \(host):\(port)") + } catch { + print("Failed to connect: \(error)") + throw error } - - func connect(responseCallback: @escaping (Data) -> Void) { - self.responseCallback = responseCallback - let bootstrap = ClientBootstrap(group: group) - .channelInitializer { channel in - channel.pipeline.addHandlers([ - ByteToMessageHandler(DataDecoder()), - MessageToByteHandler(DataEncoder()), - RTMPClientHandler(responseCallback: self.responseReceived) - ]) - } - - do { - self.channel = try bootstrap.connect(host: host, port: port).wait() - print("Connected to \(host):\(port)") - } catch { - print("Failed to connect: \(error)") - } + } + + func sendData(_ data: Data) async throws { + guard let channel = self.channel else { + print("Connection not established") + throw NSError(domain: "RTMPClientError", code: 1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"]) } - - func send(data: Data) { - guard let channel = channel else { - print("Connection not established") - return - } - channel.writeAndFlush(data, promise: nil) + + try await channel.writeAndFlush(data) + } + + func receiveData() async throws -> Data { + guard let promise = self.dataPromise else { + throw NSError(domain: "RTMPClientError", code: 2, userInfo: [NSLocalizedDescriptionKey: "Data promise is not initialized."]) } - - private func responseReceived(data: Data) { - responseCallback?(data) - } - - func shutdown() { - do { - try group.syncShutdownGracefully() - } catch { - print("Error shutting down: \(error)") - } + self.dataPromise = channel!.eventLoop.makePromise(of: Data.self) + return try await promise.futureResult.get() + } + + private func responseReceived(data: Data) { + self.dataPromise?.succeed(data) + } + + func shutdown() { + do { + try group.syncShutdownGracefully() + } catch { + print("Error shutting down: \(error)") } + } }