Skip to content

Commit

Permalink
Merge pull request #10 from huiping192/feature/rtmp-status
Browse files Browse the repository at this point in the history
support transmission statistics callback
  • Loading branch information
huiping192 authored Sep 15, 2023
2 parents 6cfdcce + 2fe6a64 commit c6f0644
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/swift.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
- main

env:
DEVELOPER_DIR: /Applications/Xcode.app
DEVELOPER_DIR: /Applications/Xcode-14.3.1.app

jobs:
build:
Expand Down
6 changes: 5 additions & 1 deletion Example/HPRTMPExample/Shared/RTMPService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import HPRTMP
import Combine

actor RTMPService: ObservableObject, RTMPPublishSessionDelegate {
func sessionTransmissionStatisticsChanged(_ session: HPRTMP.RTMPPublishSession, statistics: HPRTMP.TransmissionStatistics) {
print("[test] \(statistics)")
}

func sessionStatusChange(_ session: HPRTMP.RTMPPublishSession, status: HPRTMP.RTMPPublishSession.Status) {
if status == .publishStart {
Task {
Expand Down Expand Up @@ -55,7 +59,7 @@ actor RTMPService: ObservableObject, RTMPPublishSessionDelegate {
func run() async {
await session.setDelegate(self)
let publishConfig = PublishConfigure(width: 1280, height: 720, videocodecid: VideoData.CodecId.avc.rawValue, audiocodecid: AudioData.SoundFormat.aac.rawValue, framerate: 30, videoDatarate: 30, audioDatarate: nil, audioSamplerate: nil)
await session.publish(url: "rtmp://192.168.11.23/live/haha", configure: publishConfig)
await session.publish(url: "rtmp://192.168.11.3/live/haha", configure: publishConfig)

isRunning = true
}
Expand Down
11 changes: 11 additions & 0 deletions Sources/HPRTMP/RTMPPublishSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import os
public protocol RTMPPublishSessionDelegate: Actor {
func sessionStatusChange(_ session: RTMPPublishSession, status: RTMPPublishSession.Status)
func sessionError(_ session: RTMPPublishSession, error: RTMPError)

// transmission statistics
func sessionTransmissionStatisticsChanged(_ session: RTMPPublishSession, statistics: TransmissionStatistics)
}

public actor RTMPPublishSession {
Expand Down Expand Up @@ -113,6 +116,8 @@ public actor RTMPPublishSession {
}

extension RTMPPublishSession: RTMPSocketDelegate {


// publisher dont need implement
func socketGetMeta(_ socket: RTMPSocket, meta: MetaDataResponse) {}
func socketStreamOutputAudio(_ socket: RTMPSocket, data: Data, timeStamp: Int64) {}
Expand Down Expand Up @@ -197,5 +202,11 @@ extension RTMPPublishSession: RTMPSocketDelegate {
func socketDisconnected(_ socket: RTMPSocket) {
publishStatus = .disconnected
}

func socketStreamStatistics(_ socket: RTMPSocket, statistics: TransmissionStatistics) {
Task {
await delegate?.sessionTransmissionStatisticsChanged(self, statistics: statistics)
}
}
}

52 changes: 38 additions & 14 deletions Sources/HPRTMP/RTMPSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public enum RTMPError: Error {
}
}

public struct TransmissionStatistics {
// todo
// let rtt: Int

let pendingMessageCount: Int
}

protocol RTMPSocketDelegate: Actor {
func socketHandShakeDone(_ socket: RTMPSocket)
func socketPinRequest(_ socket: RTMPSocket, data: Data)
Expand All @@ -54,6 +61,8 @@ protocol RTMPSocketDelegate: Actor {
func socketStreamRecord(_ socket: RTMPSocket)
func socketStreamPlayStart(_ socket: RTMPSocket)
func socketStreamPause(_ socket: RTMPSocket, pause: Bool)

func socketStreamStatistics(_ socket: RTMPSocket, statistics: TransmissionStatistics)
}

public actor RTMPSocket {
Expand Down Expand Up @@ -86,9 +95,8 @@ public actor RTMPSocket {

private let logger = Logger(subsystem: "HPRTMP", category: "RTMPSocket")

private var sendMessagesTask: Task<Void, Never>?
private var receiveMessagesTask: Task<Void, Never>?

private var tasks: [Task<Void, Never>] = []

public init() async {
await windowControl.setInBytesWindowEvent { [weak self]inbytesCount in
await self?.sendAcknowledgementMessage(sequence: inbytesCount)
Expand Down Expand Up @@ -160,8 +168,9 @@ extension RTMPSocket {

public func invalidate() async {
guard status != .closed && status != .none else { return }
sendMessagesTask?.cancel()
receiveMessagesTask?.cancel()
tasks.forEach {
$0.cancel()
}
await handshake?.reset()
await decoder.reset()
connection?.cancel()
Expand All @@ -172,7 +181,7 @@ extension RTMPSocket {
}

private func startSendMessages() {
sendMessagesTask = Task {
let task = Task {
while !Task.isCancelled {
guard let messageContainer = await messagePriorityQueue.dequeue() else { continue }
let message = messageContainer.message
Expand Down Expand Up @@ -213,10 +222,12 @@ extension RTMPSocket {
}
}
}

tasks.append(task)
}

private func startReceiveData() {
receiveMessagesTask = Task {
let task = Task {
while !Task.isCancelled {
do {
let data = try await receiveData()
Expand All @@ -229,19 +240,30 @@ extension RTMPSocket {
}
}
}

tasks.append(task)
}

private func startUpdateTransmissionStatistics() {
let task = Task {
while !Task.isCancelled {
// 1 second
try? await Task.sleep(nanoseconds: UInt64(1000 * 1000 * 1000))

let pendingMessageCount = await messagePriorityQueue.pendingMessageCount
let statistics = TransmissionStatistics(pendingMessageCount: pendingMessageCount)
await delegate?.socketStreamStatistics(self, statistics: statistics)
}
}

tasks.append(task)
}
}

extension RTMPSocket {
private func sendData(_ data: Data) async throws {
try await sendDataList([data])
}

private func sendDataList(_ data: [Data]) async throws {
try await connection?.sendData(data)

let bytesCount = data.reduce(0, { $0 + $1.count })
await windowControl.addOutBytesCount(UInt32(bytesCount))
await windowControl.addOutBytesCount(UInt32(data.count))
}
func send(message: RTMPMessage, firstType: Bool) async {
await messagePriorityQueue.enqueue(message, firstType: firstType)
Expand All @@ -264,6 +286,8 @@ extension RTMPSocket: RTMPHandshakeDelegate {

// start receive data
await startReceiveData()

await startUpdateTransmissionStatistics()
}
}
}
Expand Down
6 changes: 0 additions & 6 deletions Sources/HPRTMP/Util/NWConnectionExtensions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@ import Network

extension NWConnection {
static var maxReadSize = Int(UInt16.max)

func sendData(_ datas: [Data]) async throws -> Void {
for data in datas {
try await sendData(data)
}
}

func sendData(_ data: Data) async throws -> Void {
try await withCheckedThrowingContinuation { [weak self](continuation: CheckedContinuation<Void, Error>) in
Expand Down
4 changes: 4 additions & 0 deletions Sources/HPRTMP/Util/PriorityQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,8 @@ actor PriorityQueue {
var isEmpty: Bool {
highPriorityQueue.isEmpty && mediumPriorityQueue.isEmpty && lowPriorityQueue.isEmpty
}

var pendingMessageCount: Int {
highPriorityQueue.count + mediumPriorityQueue.count + lowPriorityQueue.count
}
}

0 comments on commit c6f0644

Please sign in to comment.