Skip to content

Commit

Permalink
replace network framework
Browse files Browse the repository at this point in the history
  • Loading branch information
huiping192 committed Apr 29, 2024
1 parent c78ba86 commit 97dfe34
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 80 deletions.
43 changes: 0 additions & 43 deletions Sources/HPRTMP/Network/Network.swift

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import Foundation
import NIO

protocol NetworkConnectable {
func connect(host: String, port: Int) async throws
func sendData(_ data: Data) async throws
func receiveData() async throws -> Data
func invalidate()
}

final class RTMPClientHandler: ChannelInboundHandler {
typealias InboundIn = Data
private let responseCallback: (Data) -> Void
Expand Down Expand Up @@ -39,21 +46,21 @@ final class DataEncoder: MessageToByteEncoder {
}
}

class RTMPClient: RTMPConnectable {
class NetworkClient: NetworkConnectable {
private let group: EventLoopGroup
private var channel: Channel?
private let host: String
private let port: Int
private var host: String?
private var port: Int?

private var dataPromise: EventLoopPromise<Data>?

init(host: String, port: Int) {
init() {
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
self.host = host
self.port = port
}

func connect(host: String, port: UInt16) async throws {
func connect(host: String, port: Int) async throws {
self.host = host
self.port = port
let bootstrap = ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHandlers([
Expand Down Expand Up @@ -100,4 +107,8 @@ class RTMPClient: RTMPConnectable {
print("Error shutting down: \(error)")
}
}

func invalidate() {
shutdown()
}
}
41 changes: 11 additions & 30 deletions Sources/HPRTMP/RTMPSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
//

import Foundation
import Network
import os

public enum RTMPStatus {
Expand Down Expand Up @@ -67,7 +66,7 @@ protocol RTMPSocketDelegate: Actor {

public actor RTMPSocket {

private var connection: NWConnection?
private let connection: NetworkConnectable = NetworkClient()

private var status: RTMPStatus = .none

Expand Down Expand Up @@ -127,36 +126,20 @@ extension RTMPSocket {
public func resume() async {
guard status != .connected else { return }
guard let urlInfo else { return }

let port = NWEndpoint.Port(rawValue: UInt16(urlInfo.port))
let host = NWEndpoint.Host(urlInfo.host)
let connection = NWConnection(host: host, port: port ?? 1935, using: .tcp)
self.connection = connection
connection.stateUpdateHandler = { [weak self]newState in
guard let self else { return }
do {
try await connection.connect(host: urlInfo.host, port: urlInfo.port)
status = .open
Task {
switch newState {
case .ready:
self.logger.info("connection state: ready")
guard await self.status == .open else { return }
await self.startShakeHands()
case .failed(let error):
self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)")
await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription))
await self.invalidate()
default:
self.logger.info("connection state: other")
break
}
await self.startShakeHands()
}
} catch {
self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)")
await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription))
await self.invalidate()
}
NWConnection.maxReadSize = Int((await windowControl.windowSize))
status = .open
connection.start(queue: DispatchQueue.global(qos: .default))
}

private func startShakeHands() async {
guard let connection = connection else { return }
self.handshake = RTMPHandshake(dataSender: connection.sendData(_:), dataReceiver: receiveData)
await self.handshake?.setDelegate(delegate: self)
do {
Expand All @@ -173,8 +156,7 @@ extension RTMPSocket {
}
await handshake?.reset()
await decoder.reset()
connection?.cancel()
connection = nil
connection.invalidate()
urlInfo = nil
status = .closed
await delegate?.socketDisconnected(self)
Expand Down Expand Up @@ -262,15 +244,14 @@ extension RTMPSocket {

extension RTMPSocket {
private func sendData(_ data: Data) async throws {
try await connection?.sendData(data)
try await connection.sendData(data)
await windowControl.addOutBytesCount(UInt32(data.count))
}
func send(message: RTMPMessage, firstType: Bool) async {
await messagePriorityQueue.enqueue(message, firstType: firstType)
}

private func receiveData() async throws -> Data {
guard let connection = self.connection else { return Data() }
return try await connection.receiveData()
}
}
Expand Down

0 comments on commit 97dfe34

Please sign in to comment.