From 616b1f244fc9f9846e7829a723c7869837108b7c Mon Sep 17 00:00:00 2001 From: Huiping Guo Date: Tue, 26 Sep 2023 00:12:41 +0900 Subject: [PATCH 1/4] create player session --- Sources/HPRTMP/Message/CommandMessage.swift | 23 +++ Sources/HPRTMP/RTMPPlayerSession.swift | 194 ++++++++++++++++++++ 2 files changed, 217 insertions(+) create mode 100644 Sources/HPRTMP/RTMPPlayerSession.swift diff --git a/Sources/HPRTMP/Message/CommandMessage.swift b/Sources/HPRTMP/Message/CommandMessage.swift index 17ab606..ee0fac1 100644 --- a/Sources/HPRTMP/Message/CommandMessage.swift +++ b/Sources/HPRTMP/Message/CommandMessage.swift @@ -254,3 +254,26 @@ class PauseMessage: CommandMessage { } } +class PlayMessage: CommandMessage { + let streamName: String + init(encodeType: ObjectEncodingType = .amf0, streamName: String) { + self.streamName = streamName + super.init(encodeType: encodeType, commandName: "play", transactionId: commonTransactionId.stream) + } + + override var payload: Data { + var data = Data() + let encoder = AMF0Encoder() + + data.append((encoder.encode(commandName)) ?? Data()) + data.append((encoder.encode(Double(transactionId))) ?? Data()) + data.append((encoder.encodeNil())) + data.append((encoder.encode(streamName)) ?? Data()) + + data.append((encoder.encode(Double(-1))) ?? Data()) + data.append((encoder.encode(Double(-1))) ?? Data()) + data.append((encoder.encode(false)) ?? Data()) + + return data + } +} diff --git a/Sources/HPRTMP/RTMPPlayerSession.swift b/Sources/HPRTMP/RTMPPlayerSession.swift new file mode 100644 index 0000000..f28f1f6 --- /dev/null +++ b/Sources/HPRTMP/RTMPPlayerSession.swift @@ -0,0 +1,194 @@ +// +// RTMPPlayerSession.swift +// +// +// Created by 郭 輝平 on 2023/09/25. +// + +import Foundation +import os + +public protocol RTMPPlayerSessionDelegate: Actor { + func sessionStatusChange(_ session: RTMPPlayerSession, status: RTMPPlayerSession.Status) + func sessionError(_ session: RTMPPlayerSession, error: RTMPError) + +} + +public actor RTMPPlayerSession { + public enum Status: Equatable { + case unknown + case handShakeStart + case handShakeDone + case connect + case playStart + case failed(err: RTMPError) + case disconnected + + public static func ==(lhs: Status, rhs: Status) -> Bool { + switch (lhs, rhs) { + case (.unknown, .unknown), + (.connect, .connect), + (.playStart, .playStart), + (.disconnected, .disconnected): + return true + case let (.failed(err1), .failed(err2)): + return err1.localizedDescription == err2.localizedDescription + default: + return false + } + } + } + + public weak var delegate: RTMPPlayerSessionDelegate? + public func setDelegate(_ delegate: RTMPPlayerSessionDelegate?) { + self.delegate = delegate + } + + public var status: Status = .unknown { + didSet { + Task { + await delegate?.sessionStatusChange(self, status: status) + } + } + } + + public let encodeType: ObjectEncodingType = .amf0 + + private var socket: RTMPSocket! + + private let transactionIdGenerator = TransactionIdGenerator() + + private var configure: PublishConfigure? + + private var connectId: Int = 0 + + private let logger = Logger(subsystem: "HPRTMP", category: "Player") + + + public init() {} + + public func publish(url: String, configure: PublishConfigure) async { + self.configure = configure + if socket != nil { + await socket.invalidate() + } + socket = await RTMPSocket() + await socket.setDelegate(delegate: self) + await socket.connect(url: url) + + status = .handShakeStart + } + + + public func invalidate() async { + // send closeStream + let closeStreamMessage = CloseStreamMessage(msgStreamId: connectId) + await socket.send(message: closeStreamMessage, firstType: true) + + // send deleteStream + let deleteStreamMessage = DeleteStreamMessage(msgStreamId: connectId) + await socket.send(message: deleteStreamMessage, firstType: true) + + await self.socket.invalidate() + self.status = .disconnected + } +} + +extension RTMPPlayerSession: RTMPSocketDelegate { + + + // publisher dont need implement + func socketGetMeta(_ socket: RTMPSocket, meta: MetaDataResponse) { + + } + + func socketStreamOutputAudio(_ socket: RTMPSocket, data: Data, timeStamp: Int64) { + + } + func socketStreamOutputVideo(_ socket: RTMPSocket, data: Data, timeStamp: Int64) { + + } + func socketStreamRecord(_ socket: RTMPSocket) {} + + func socketStreamPlayStart(_ socket: RTMPSocket) { + + + } + func socketStreamPause(_ socket: RTMPSocket, pause: Bool) { + + + } + + + func socketStreamPublishStart(_ socket: RTMPSocket) {} + + func socketConnectDone(_ socket: RTMPSocket) { + Task { + let message = CreateStreamMessage(encodeType: encodeType, transactionId: await transactionIdGenerator.nextId()) + await self.socket.messageHolder.register(transactionId: message.transactionId, message: message) + await socket.send(message: message, firstType: true) + + // make chunk size more bigger + let chunkSize: UInt32 = 128 * 6 + let size = ChunkSizeMessage(size: chunkSize) + await socket.send(message: size, firstType: true) + } + } + + func socketHandShakeDone(_ socket: RTMPSocket) { + Task { + status = .handShakeDone + + guard let urlInfo = await socket.urlInfo else { return } + let connect = ConnectMessage(encodeType: encodeType, + tcUrl: urlInfo.tcUrl, + appName: urlInfo.appName, + flashVer: "FMLE/3.0 (compatible; FMSc/1.0)", + fpad: false, + audio: .aac, + video: .h264) + await self.socket.messageHolder.register(transactionId: connect.transactionId, message: connect) + await self.socket.send(message: connect, firstType: true) + } + } + + func socketCreateStreamDone(_ socket: RTMPSocket, msgStreamId: Int) { + Task { + status = .connect + + let message = await PlayMessage(streamName: socket.urlInfo?.key ?? "") + + message.msgStreamId = msgStreamId + self.connectId = msgStreamId + await socket.send(message: message, firstType: true) + } + } + + func socketPinRequest(_ socket: RTMPSocket, data: Data) { + Task { + let message = UserControlMessage(type: .pingResponse, data: data, streamId: UInt16(connectId)) + await socket.send(message: message, firstType: true) + } + } + + func socketError(_ socket: RTMPSocket, err: RTMPError) { + Task { + await delegate?.sessionError(self, error: err) + } + } + + func socketPeerBandWidth(_ socket: RTMPSocket, size: UInt32) { + Task { + // send window ack message to server + await socket.send(message: WindowAckMessage(size: size), firstType: true) + } + } + + func socketDisconnected(_ socket: RTMPSocket) { + status = .disconnected + } + + func socketStreamStatistics(_ socket: RTMPSocket, statistics: TransmissionStatistics) { + } +} + From 9ad6dbbf33c9efbc83da5437889ba1815eb084ce Mon Sep 17 00:00:00 2001 From: Huiping Guo Date: Tue, 26 Sep 2023 01:31:30 +0900 Subject: [PATCH 2/4] return video audio data and pts --- Sources/HPRTMP/RTMPPlayerSession.swift | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/Sources/HPRTMP/RTMPPlayerSession.swift b/Sources/HPRTMP/RTMPPlayerSession.swift index f28f1f6..b090a7d 100644 --- a/Sources/HPRTMP/RTMPPlayerSession.swift +++ b/Sources/HPRTMP/RTMPPlayerSession.swift @@ -12,6 +12,8 @@ public protocol RTMPPlayerSessionDelegate: Actor { func sessionStatusChange(_ session: RTMPPlayerSession, status: RTMPPlayerSession.Status) func sessionError(_ session: RTMPPlayerSession, error: RTMPError) + func sessionVideo(_ session: RTMPPlayerSession, data: Data, timestamp: Int64) + func sessionAudio(_ session: RTMPPlayerSession, data: Data, timestamp: Int64) } public actor RTMPPlayerSession { @@ -57,9 +59,7 @@ public actor RTMPPlayerSession { private var socket: RTMPSocket! private let transactionIdGenerator = TransactionIdGenerator() - - private var configure: PublishConfigure? - + private var connectId: Int = 0 private let logger = Logger(subsystem: "HPRTMP", category: "Player") @@ -67,8 +67,7 @@ public actor RTMPPlayerSession { public init() {} - public func publish(url: String, configure: PublishConfigure) async { - self.configure = configure + public func play(url: String) async { if socket != nil { await socket.invalidate() } @@ -103,10 +102,14 @@ extension RTMPPlayerSession: RTMPSocketDelegate { } func socketStreamOutputAudio(_ socket: RTMPSocket, data: Data, timeStamp: Int64) { - + Task { + await delegate?.sessionAudio(self, data: data, timestamp: timeStamp) + } } func socketStreamOutputVideo(_ socket: RTMPSocket, data: Data, timeStamp: Int64) { - + Task { + await delegate?.sessionVideo(self, data: data, timestamp: timeStamp) + } } func socketStreamRecord(_ socket: RTMPSocket) {} From 43e2cb11a19cf0c56aa1539369e7beb81b1cabcc Mon Sep 17 00:00:00 2001 From: Huiping Guo Date: Tue, 17 Oct 2023 23:42:56 +0900 Subject: [PATCH 3/4] add AVCDecoderConfigurationRecord parser --- .../Util/AVCDecoderConfigurationRecord.swift | 150 ++++++++++++++++++ .../AVCDecoderConfigurationRecordTests.swift | 102 ++++++++++++ 2 files changed, 252 insertions(+) create mode 100644 Sources/HPRTMP/Util/AVCDecoderConfigurationRecord.swift create mode 100644 Tests/HPRTMPTests/util/AVCDecoderConfigurationRecordTests.swift diff --git a/Sources/HPRTMP/Util/AVCDecoderConfigurationRecord.swift b/Sources/HPRTMP/Util/AVCDecoderConfigurationRecord.swift new file mode 100644 index 0000000..02600d4 --- /dev/null +++ b/Sources/HPRTMP/Util/AVCDecoderConfigurationRecord.swift @@ -0,0 +1,150 @@ +// +// AVCDecoderConfigurationRecord.swift +// +// +// Created by 郭 輝平 on 2023/10/16. +// + +import Foundation + +public struct AVCDecoderConfigurationRecord { + // The version of the AVCDecoderConfigurationRecord, usually set to 1. + private(set) var configurationVersion: UInt8 = 1 + + // Indicates the profile code as per the H.264 specification. + // This is extracted from the SPS NAL unit. + private(set) var avcProfileIndication: UInt8 = 0 + + // Indicates the compatibility of the stream. + // This is also extracted from the SPS NAL unit. + private(set) var profileCompatibility: UInt8 = 0 + + // Indicates the level code as per the H.264 specification. + // This is extracted from the SPS NAL unit. + private(set) var avcLevelIndication: UInt8 = 0 + + // Specifies the NAL unit length size minus one. + // Default is 3, which means the NAL unit length is 4 bytes. + private(set) var lengthSizeMinusOne: UInt8 = 3 + + // An array containing the SPS NAL units. + private(set) var spsList: [Data] = [] + + // An array containing the PPS NAL units. + private(set) var ppsList: [Data] = [] + + // Initialize with avcDecoderConfigurationRecord data + init?(avcDecoderConfigurationRecord data: Data) { + guard data.count > 6 else { + print("Invalid AVCDecoderConfigurationRecord data") + return nil + } + + var index = 0 + + // Parse the header + self.configurationVersion = data[index]; index += 1 + self.avcProfileIndication = data[index]; index += 1 + self.profileCompatibility = data[index]; index += 1 + self.avcLevelIndication = data[index]; index += 1 + self.lengthSizeMinusOne = data[index] & 0x03; index += 1 // Last 2 bits + let numOfSPS = data[index] & 0x1F; index += 1 // Last 5 bits + + // Parse SPS + for _ in 0.. index + 1 else { + print("Invalid SPS data") + return nil + } + + let spsLength = Int(data[index]) << 8 | Int(data[index + 1]) + index += 2 + + guard data.count >= index + spsLength else { + print("Invalid SPS data") + return nil + } + + let spsData = data[index..<(index + spsLength)] + self.spsList.append(Data(spsData)) + index += spsLength + } + + // Parse PPS + guard data.count > index else { + print("Invalid PPS data") + return nil + } + + let numOfPPS = data[index]; index += 1 + + for _ in 0.. index + 1 else { + print("Invalid PPS data") + return nil + } + + let ppsLength = Int(data[index]) << 8 | Int(data[index + 1]) + index += 2 + + guard data.count >= index + ppsLength else { + print("Invalid PPS data") + return nil + } + + let ppsData = data[index..<(index + ppsLength)] + self.ppsList.append(Data(ppsData)) + index += ppsLength + } + } + + // Initialize with SPS and PPS data + init(sps: Data, pps: Data) { + self.avcProfileIndication = sps[1] + self.profileCompatibility = sps[2] + self.avcLevelIndication = sps[3] + + // In the context of media containers like MP4 or streaming protocols like HLS, the lengthSizeMinusOne value is often set to 3, meaning that each NALU length is represented using 4 bytes (3 + 1). However, this value can also be 0, 1, or 2, representing 1, 2, or 3 bytes respectively. + self.lengthSizeMinusOne = 3 + + self.spsList = [sps] + self.ppsList = [pps] + } + + // Method to generate avcDecoderConfigurationRecord data + func generateConfigurationRecord() -> Data { + var body = Data() + + body.append(configurationVersion) + body.append(avcProfileIndication) + body.append(profileCompatibility) + body.append(avcLevelIndication) + + body.append(0b11111100 | (self.lengthSizeMinusOne & 0b00000011)) + + /*sps*/ + + // numOfSequenceParameterSets + let numOfSequenceParameterSets = 0b11100000 | (UInt8(spsList.count) & 0b00011111) + body.append(Data([numOfSequenceParameterSets])) + + for sps in spsList { + // sequenceParameterSetLength + body.append(UInt16(sps.count).bigEndian.data) + // sequenceParameterSetNALUnit + body.append(Data(sps)) + } + + /*pps*/ + // numOfPictureParameterSets + body.append(UInt8(ppsList.count)) + for pps in ppsList { + // pictureParameterSetLength + body.append(UInt16(pps.count).bigEndian.data) + // pictureParameterSetNALUnit + body.append(Data(pps)) + } + + return body + } +} diff --git a/Tests/HPRTMPTests/util/AVCDecoderConfigurationRecordTests.swift b/Tests/HPRTMPTests/util/AVCDecoderConfigurationRecordTests.swift new file mode 100644 index 0000000..f59b5cd --- /dev/null +++ b/Tests/HPRTMPTests/util/AVCDecoderConfigurationRecordTests.swift @@ -0,0 +1,102 @@ +// +// File.swift +// +// +// Created by 郭 輝平 on 2023/10/16. +// + +import Foundation +import XCTest + +@testable import HPRTMP + +class AVCDecoderConfigurationRecordTests: XCTestCase { + + func testInitWithValidData() { + // Prepare a mock AVCDecoderConfigurationRecord data + let mockData: Data = Data([0x01, 0x42, 0x00, 0x1E, 0xFF, 0xE1, 0x00, 0x04, 0x67, 0x42, 0x00, 0x1E, 0x01, 0x00, 0x04, 0x68, 0x00, 0x00, 0x0D]) + + // Initialize AVCDecoderConfigurationRecord + let avcRecord = AVCDecoderConfigurationRecord(avcDecoderConfigurationRecord: mockData) + + // Validate + XCTAssertNotNil(avcRecord) + XCTAssertEqual(avcRecord?.configurationVersion, 1) + XCTAssertEqual(avcRecord?.avcProfileIndication, 66) + XCTAssertEqual(avcRecord?.profileCompatibility, 0) + XCTAssertEqual(avcRecord?.avcLevelIndication, 30) + XCTAssertEqual(avcRecord?.lengthSizeMinusOne, 3) + XCTAssertEqual(avcRecord?.ppsList.count, 1) + XCTAssertEqual(avcRecord?.spsList.count, 1) + XCTAssertEqual(avcRecord?.spsList.first, Data([103, 66, 0, 30])) + XCTAssertEqual(avcRecord?.ppsList.first, Data([104, 0, 0, 13])) + } + + func testInitWithInvalidData() { + // Prepare an invalid mock AVCDecoderConfigurationRecord data + let mockData: Data = Data([1, 66]) // Too short to be valid + + // Initialize AVCDecoderConfigurationRecord + let avcRecord = AVCDecoderConfigurationRecord(avcDecoderConfigurationRecord: mockData) + + // Validate + XCTAssertNil(avcRecord) + } + + func testInitWithSPSAndPPS() { + // Prepare mock SPS and PPS data + let sps: Data = Data([103, 66, 0, 30]) + let pps: Data = Data([104, 0, 0, 13]) + + // Initialize AVCDecoderConfigurationRecord + let avcRecord = AVCDecoderConfigurationRecord(sps: sps, pps: pps) + + // Validate + XCTAssertNotNil(avcRecord) + XCTAssertEqual(avcRecord.configurationVersion, 1) + XCTAssertEqual(avcRecord.avcProfileIndication, 66) + XCTAssertEqual(avcRecord.profileCompatibility, 0) + XCTAssertEqual(avcRecord.avcLevelIndication, 30) + XCTAssertEqual(avcRecord.lengthSizeMinusOne, 3) + XCTAssertEqual(avcRecord.ppsList.count, 1) + XCTAssertEqual(avcRecord.spsList.count, 1) + XCTAssertEqual(avcRecord.spsList.first, sps) + XCTAssertEqual(avcRecord.ppsList.first, pps) + } + + func testGenerateConfigurationRecord() { + // Prepare mock SPS and PPS data + let sps: Data = Data([103, 66, 0, 30]) + let pps: Data = Data([104, 0, 0, 13]) + + // Initialize AVCDecoderConfigurationRecord + let avcRecord = AVCDecoderConfigurationRecord(sps: sps, pps: pps) + + // Generate Configuration Record + let generatedData = avcRecord.generateConfigurationRecord() + + // Validate + // The expected data would depend on how you've implemented generateConfigurationRecord + // For this example, let's assume it concatenates all the fields and SPS/PPS data + var expectedData = Data() + expectedData.append(1) // configurationVersion + expectedData.append(sps[1]) // avcProfileIndication + expectedData.append(sps[2]) // profileCompatibility + expectedData.append(sps[3]) // avcLevelIndication + expectedData.append(0xFF) // 6 bits reserved (111111) + 2 bits lengthSizeMinusOne (11) + expectedData.append(0xE1) // 3 bits reserved (111) + 5 bits numOfSPS (00001) + + // SPS + let spsLengthData = UInt16(sps.count).bigEndian.data + expectedData.append(spsLengthData) + expectedData.append(sps) + + // PPS + expectedData.append(1) // numOfPPS + let ppsLengthData = UInt16(pps.count).bigEndian.data + expectedData.append(ppsLengthData) + expectedData.append(pps) + + XCTAssertEqual(generatedData, expectedData) + } +} From 84ebc8fd1025dccf89f1b6f7ea4549f400f8f70c Mon Sep 17 00:00:00 2001 From: Huiping Guo Date: Tue, 17 Oct 2023 23:43:15 +0900 Subject: [PATCH 4/4] make chunksize bigger --- Sources/HPRTMP/RTMPPublishSession.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/HPRTMP/RTMPPublishSession.swift b/Sources/HPRTMP/RTMPPublishSession.swift index d8fde9c..c050deb 100644 --- a/Sources/HPRTMP/RTMPPublishSession.swift +++ b/Sources/HPRTMP/RTMPPublishSession.swift @@ -144,7 +144,7 @@ extension RTMPPublishSession: RTMPSocketDelegate { await socket.send(message: message, firstType: true) // make chunk size more bigger - let chunkSize: UInt32 = 128 * 6 + let chunkSize: UInt32 = 128 * 60 let size = ChunkSizeMessage(size: chunkSize) await socket.send(message: size, firstType: true) }