Skip to content

Commit

Permalink
swiftnio implement
Browse files Browse the repository at this point in the history
  • Loading branch information
huiping192 committed Mar 17, 2024
1 parent b7f6e4d commit c78ba86
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 77 deletions.
4 changes: 2 additions & 2 deletions Sources/HPRTMP/Network/Network.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ actor NWConnecter: RTMPConnectable {
switch newState {
case .ready:
break
case .failed(let error):
case .failed(_):
break
default:
break
}
}
}
// NWConnection.maxReadSize = Int((await windowControl.windowSize))
// NWConnection.maxReadSize = Int((await windowControl.windowSize))
connection.start(queue: DispatchQueue.global(qos: .default))
}

Expand Down
160 changes: 85 additions & 75 deletions Sources/HPRTMP/Network/RTMPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,92 +2,102 @@ import Foundation
import NIO

final class RTMPClientHandler: ChannelInboundHandler {
typealias InboundIn = Data
private let responseCallback: (Data) -> Void

init(responseCallback: @escaping (Data) -> Void) {
self.responseCallback = responseCallback
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let data = self.unwrapInboundIn(data)
responseCallback(data)
}
typealias InboundIn = Data
private let responseCallback: (Data) -> Void
init(responseCallback: @escaping (Data) -> Void) {
self.responseCallback = responseCallback
}
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let data = self.unwrapInboundIn(data)
responseCallback(data)
}
}

final class DataDecoder: ByteToMessageDecoder {
typealias InboundIn = ByteBuffer
typealias InboundOut = Data

func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
var data = Data()
buffer.readWithUnsafeReadableBytes { ptr in
data.append(ptr.baseAddress!.assumingMemoryBound(to: UInt8.self), count: ptr.count)
return ptr.count
}
context.fireChannelRead(wrapInboundOut(data))
return .continue
typealias InboundIn = ByteBuffer
typealias InboundOut = Data

func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
var data = Data()
buffer.readWithUnsafeReadableBytes { ptr in
data.append(ptr.baseAddress!.assumingMemoryBound(to: UInt8.self), count: ptr.count)
return ptr.count
}
context.fireChannelRead(wrapInboundOut(data))
return .continue
}
}

final class DataEncoder: MessageToByteEncoder {
typealias OutboundIn = Data
typealias OutboundOut = ByteBuffer

func encode(data: Data, out: inout ByteBuffer) throws {
out.writeBytes(data)
}
typealias OutboundIn = Data
typealias OutboundOut = ByteBuffer
func encode(data: Data, out: inout ByteBuffer) throws {
out.writeBytes(data)
}
}

class RTMPClient {
private let group: EventLoopGroup
private var channel: Channel?
private let host: String
private let port: Int
private var responseCallback: ((Data) -> Void)?

init(host: String, port: Int) {
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
self.host = host
self.port = port
class RTMPClient: RTMPConnectable {
private let group: EventLoopGroup
private var channel: Channel?
private let host: String
private let port: Int

private var dataPromise: EventLoopPromise<Data>?

init(host: String, port: Int) {
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
self.host = host
self.port = port
}

func connect(host: String, port: UInt16) async throws {
let bootstrap = ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHandlers([
ByteToMessageHandler(DataDecoder()),
MessageToByteHandler(DataEncoder()),
RTMPClientHandler(responseCallback: self.responseReceived)
])
}

do {
self.channel = try await bootstrap.connect(host: host, port: Int(port)).get()
print("Connected to \(host):\(port)")
} catch {
print("Failed to connect: \(error)")
throw error
}

func connect(responseCallback: @escaping (Data) -> Void) {
self.responseCallback = responseCallback
let bootstrap = ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHandlers([
ByteToMessageHandler(DataDecoder()),
MessageToByteHandler(DataEncoder()),
RTMPClientHandler(responseCallback: self.responseReceived)
])
}

do {
self.channel = try bootstrap.connect(host: host, port: port).wait()
print("Connected to \(host):\(port)")
} catch {
print("Failed to connect: \(error)")
}
}

func sendData(_ data: Data) async throws {
guard let channel = self.channel else {
print("Connection not established")
throw NSError(domain: "RTMPClientError", code: 1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"])
}

func send(data: Data) {
guard let channel = channel else {
print("Connection not established")
return
}
channel.writeAndFlush(data, promise: nil)
try await channel.writeAndFlush(data)
}

func receiveData() async throws -> Data {
guard let promise = self.dataPromise else {
throw NSError(domain: "RTMPClientError", code: 2, userInfo: [NSLocalizedDescriptionKey: "Data promise is not initialized."])
}

private func responseReceived(data: Data) {
responseCallback?(data)
}

func shutdown() {
do {
try group.syncShutdownGracefully()
} catch {
print("Error shutting down: \(error)")
}
self.dataPromise = channel!.eventLoop.makePromise(of: Data.self)
return try await promise.futureResult.get()
}

private func responseReceived(data: Data) {
self.dataPromise?.succeed(data)
}

func shutdown() {
do {
try group.syncShutdownGracefully()
} catch {
print("Error shutting down: \(error)")
}
}
}

0 comments on commit c78ba86

Please sign in to comment.