Skip to content

Commit

Permalink
add swiftnio implement
Browse files Browse the repository at this point in the history
  • Loading branch information
huiping192 committed May 4, 2024
1 parent e7ae515 commit 93f51c7
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 79 deletions.
32 changes: 32 additions & 0 deletions Package.resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"pins" : [
{
"identity" : "swift-atomics",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-atomics.git",
"state" : {
"revision" : "cd142fd2f64be2100422d658e7411e39489da985",
"version" : "1.2.0"
}
},
{
"identity" : "swift-collections",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-collections.git",
"state" : {
"revision" : "a902f1823a7ff3c9ab2fba0f992396b948eda307",
"version" : "1.0.5"
}
},
{
"identity" : "swift-nio",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio",
"state" : {
"revision" : "702cd7c56d5d44eeba73fdf83918339b26dc855c",
"version" : "2.62.0"
}
}
],
"version" : 2
}
11 changes: 5 additions & 6 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,22 @@ import PackageDescription

let package = Package(
name: "HPRTMP",
platforms: [.iOS(.v14),.macOS(.v11)],
platforms: [.iOS(.v14), .macOS(.v11)],
products: [
.library(
name: "HPRTMP",
targets: ["HPRTMP"]
),
],
dependencies: [
// Dependencies declare other packages that this package depends on.
// .package(url: /* package url */, from: "1.0.0"),
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
],
targets: [
// Targets are the basic building blocks of a package. A target can define a module or a test suite.
// Targets can depend on other targets in this package, and on products in packages this package depends on.
.target(
name: "HPRTMP",
dependencies: []),
dependencies: [
.product(name: "NIO", package: "swift-nio")
]),
.testTarget(
name: "HPRTMPTests",
dependencies: ["HPRTMP"]),
Expand Down
43 changes: 0 additions & 43 deletions Sources/HPRTMP/Network.swift

This file was deleted.

128 changes: 128 additions & 0 deletions Sources/HPRTMP/Network/NetworkClient.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import Foundation
import NIO

protocol NetworkConnectable {
func connect(host: String, port: Int) async throws
func sendData(_ data: Data) async throws
func receiveData() async throws -> Data
func invalidate()
}

final class RTMPClientHandler: ChannelInboundHandler {
typealias InboundIn = Data
private let responseCallback: (Data) -> Void

init(responseCallback: @escaping (Data) -> Void) {
self.responseCallback = responseCallback
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let data = self.unwrapInboundIn(data)
guard !data.isEmpty else { return }
responseCallback(data)
}
}

final class DataDecoder: ByteToMessageDecoder {
typealias InboundIn = ByteBuffer
typealias InboundOut = Data

func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
var data = Data()
buffer.readWithUnsafeReadableBytes { ptr in
data.append(ptr.baseAddress!.assumingMemoryBound(to: UInt8.self), count: ptr.count)
return ptr.count
}
context.fireChannelRead(wrapInboundOut(data))
return .continue
}
}

final class DataEncoder: MessageToByteEncoder {
typealias OutboundIn = Data
typealias OutboundOut = ByteBuffer

func encode(data: Data, out: inout ByteBuffer) throws {
out.writeBytes(data)
}
}

class NetworkClient: NetworkConnectable {
private let group: EventLoopGroup
private var channel: Channel?
private var host: String?
private var port: Int?

private var cachedReceivedData: Data = .init()
private var dataPromise: EventLoopPromise<Data>?

init() {
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
}

func connect(host: String, port: Int) async throws {
self.host = host
self.port = port
let bootstrap = ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHandlers([
ByteToMessageHandler(DataDecoder()),
MessageToByteHandler(DataEncoder()),
RTMPClientHandler(responseCallback: self.responseReceived)
])
}

do {
self.channel = try await bootstrap.connect(host: host, port: Int(port)).get()
print("Connected to \(host):\(port)")
} catch {
print("Failed to connect: \(error)")
throw error
}
}

func sendData(_ data: Data) async throws {
guard let channel = self.channel else {
print("Connection not established")
throw NSError(domain: "RTMPClientError", code: 1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"])
}

try await channel.writeAndFlush(data)
}

func receiveData() async throws -> Data {
guard let channel = self.channel else {
print("Connection not established")
throw NSError(domain: "RTMPClientError", code: 1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"])
}
if !cachedReceivedData.isEmpty {
let data = cachedReceivedData
cachedReceivedData = Data()
return data
}

self.dataPromise = channel.eventLoop.makePromise(of: Data.self)
return try await dataPromise!.futureResult.get()
}

private func responseReceived(data: Data) {
cachedReceivedData.append(data)
if let dataPromise {
dataPromise.succeed(cachedReceivedData)
cachedReceivedData = Data()
self.dataPromise = nil
}
}

func shutdown() {
do {
try group.syncShutdownGracefully()
} catch {
print("Error shutting down: \(error)")
}
}

func invalidate() {
shutdown()
}
}
41 changes: 11 additions & 30 deletions Sources/HPRTMP/RTMPSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
//

import Foundation
import Network
import os

public enum RTMPStatus {
Expand Down Expand Up @@ -67,7 +66,7 @@ protocol RTMPSocketDelegate: Actor {

public actor RTMPSocket {

private var connection: NWConnection?
private let connection: NetworkConnectable = NetworkClient()

private var status: RTMPStatus = .none

Expand Down Expand Up @@ -127,36 +126,20 @@ extension RTMPSocket {
public func resume() async {
guard status != .connected else { return }
guard let urlInfo else { return }

let port = NWEndpoint.Port(rawValue: UInt16(urlInfo.port))
let host = NWEndpoint.Host(urlInfo.host)
let connection = NWConnection(host: host, port: port ?? 1935, using: .tcp)
self.connection = connection
connection.stateUpdateHandler = { [weak self]newState in
guard let self else { return }
do {
try await connection.connect(host: urlInfo.host, port: urlInfo.port)
status = .open
Task {
switch newState {
case .ready:
self.logger.info("connection state: ready")
guard await self.status == .open else { return }
await self.startShakeHands()
case .failed(let error):
self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)")
await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription))
await self.invalidate()
default:
self.logger.info("connection state: other")
break
}
await self.startShakeHands()
}
} catch {
self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)")
await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription))
await self.invalidate()
}
NWConnection.maxReadSize = Int((await windowControl.windowSize))
status = .open
connection.start(queue: DispatchQueue.global(qos: .default))
}

private func startShakeHands() async {
guard let connection = connection else { return }
self.handshake = RTMPHandshake(dataSender: connection.sendData(_:), dataReceiver: receiveData)
await self.handshake?.setDelegate(delegate: self)
do {
Expand All @@ -173,8 +156,7 @@ extension RTMPSocket {
}
await handshake?.reset()
await decoder.reset()
connection?.cancel()
connection = nil
connection.invalidate()
urlInfo = nil
status = .closed
await delegate?.socketDisconnected(self)
Expand Down Expand Up @@ -262,15 +244,14 @@ extension RTMPSocket {

extension RTMPSocket {
private func sendData(_ data: Data) async throws {
try await connection?.sendData(data)
try await connection.sendData(data)
await windowControl.addOutBytesCount(UInt32(data.count))
}
func send(message: RTMPMessage, firstType: Bool) async {
await messagePriorityQueue.enqueue(message, firstType: firstType)
}

private func receiveData() async throws -> Data {
guard let connection = self.connection else { return Data() }
return try await connection.receiveData()
}
}
Expand Down

0 comments on commit 93f51c7

Please sign in to comment.