diff --git a/Sources/HPRTMP/Message/CommandMessage.swift b/Sources/HPRTMP/Message/CommandMessage.swift index 17ab606..ee0fac1 100644 --- a/Sources/HPRTMP/Message/CommandMessage.swift +++ b/Sources/HPRTMP/Message/CommandMessage.swift @@ -254,3 +254,26 @@ class PauseMessage: CommandMessage { } } +class PlayMessage: CommandMessage { + let streamName: String + init(encodeType: ObjectEncodingType = .amf0, streamName: String) { + self.streamName = streamName + super.init(encodeType: encodeType, commandName: "play", transactionId: commonTransactionId.stream) + } + + override var payload: Data { + var data = Data() + let encoder = AMF0Encoder() + + data.append((encoder.encode(commandName)) ?? Data()) + data.append((encoder.encode(Double(transactionId))) ?? Data()) + data.append((encoder.encodeNil())) + data.append((encoder.encode(streamName)) ?? Data()) + + data.append((encoder.encode(Double(-1))) ?? Data()) + data.append((encoder.encode(Double(-1))) ?? Data()) + data.append((encoder.encode(false)) ?? Data()) + + return data + } +} diff --git a/Sources/HPRTMP/RTMPPlayerSession.swift b/Sources/HPRTMP/RTMPPlayerSession.swift new file mode 100644 index 0000000..f28f1f6 --- /dev/null +++ b/Sources/HPRTMP/RTMPPlayerSession.swift @@ -0,0 +1,194 @@ +// +// RTMPPlayerSession.swift +// +// +// Created by 郭 輝平 on 2023/09/25. +// + +import Foundation +import os + +public protocol RTMPPlayerSessionDelegate: Actor { + func sessionStatusChange(_ session: RTMPPlayerSession, status: RTMPPlayerSession.Status) + func sessionError(_ session: RTMPPlayerSession, error: RTMPError) + +} + +public actor RTMPPlayerSession { + public enum Status: Equatable { + case unknown + case handShakeStart + case handShakeDone + case connect + case playStart + case failed(err: RTMPError) + case disconnected + + public static func ==(lhs: Status, rhs: Status) -> Bool { + switch (lhs, rhs) { + case (.unknown, .unknown), + (.connect, .connect), + (.playStart, .playStart), + (.disconnected, .disconnected): + return true + case let (.failed(err1), .failed(err2)): + return err1.localizedDescription == err2.localizedDescription + default: + return false + } + } + } + + public weak var delegate: RTMPPlayerSessionDelegate? + public func setDelegate(_ delegate: RTMPPlayerSessionDelegate?) { + self.delegate = delegate + } + + public var status: Status = .unknown { + didSet { + Task { + await delegate?.sessionStatusChange(self, status: status) + } + } + } + + public let encodeType: ObjectEncodingType = .amf0 + + private var socket: RTMPSocket! + + private let transactionIdGenerator = TransactionIdGenerator() + + private var configure: PublishConfigure? + + private var connectId: Int = 0 + + private let logger = Logger(subsystem: "HPRTMP", category: "Player") + + + public init() {} + + public func publish(url: String, configure: PublishConfigure) async { + self.configure = configure + if socket != nil { + await socket.invalidate() + } + socket = await RTMPSocket() + await socket.setDelegate(delegate: self) + await socket.connect(url: url) + + status = .handShakeStart + } + + + public func invalidate() async { + // send closeStream + let closeStreamMessage = CloseStreamMessage(msgStreamId: connectId) + await socket.send(message: closeStreamMessage, firstType: true) + + // send deleteStream + let deleteStreamMessage = DeleteStreamMessage(msgStreamId: connectId) + await socket.send(message: deleteStreamMessage, firstType: true) + + await self.socket.invalidate() + self.status = .disconnected + } +} + +extension RTMPPlayerSession: RTMPSocketDelegate { + + + // publisher dont need implement + func socketGetMeta(_ socket: RTMPSocket, meta: MetaDataResponse) { + + } + + func socketStreamOutputAudio(_ socket: RTMPSocket, data: Data, timeStamp: Int64) { + + } + func socketStreamOutputVideo(_ socket: RTMPSocket, data: Data, timeStamp: Int64) { + + } + func socketStreamRecord(_ socket: RTMPSocket) {} + + func socketStreamPlayStart(_ socket: RTMPSocket) { + + + } + func socketStreamPause(_ socket: RTMPSocket, pause: Bool) { + + + } + + + func socketStreamPublishStart(_ socket: RTMPSocket) {} + + func socketConnectDone(_ socket: RTMPSocket) { + Task { + let message = CreateStreamMessage(encodeType: encodeType, transactionId: await transactionIdGenerator.nextId()) + await self.socket.messageHolder.register(transactionId: message.transactionId, message: message) + await socket.send(message: message, firstType: true) + + // make chunk size more bigger + let chunkSize: UInt32 = 128 * 6 + let size = ChunkSizeMessage(size: chunkSize) + await socket.send(message: size, firstType: true) + } + } + + func socketHandShakeDone(_ socket: RTMPSocket) { + Task { + status = .handShakeDone + + guard let urlInfo = await socket.urlInfo else { return } + let connect = ConnectMessage(encodeType: encodeType, + tcUrl: urlInfo.tcUrl, + appName: urlInfo.appName, + flashVer: "FMLE/3.0 (compatible; FMSc/1.0)", + fpad: false, + audio: .aac, + video: .h264) + await self.socket.messageHolder.register(transactionId: connect.transactionId, message: connect) + await self.socket.send(message: connect, firstType: true) + } + } + + func socketCreateStreamDone(_ socket: RTMPSocket, msgStreamId: Int) { + Task { + status = .connect + + let message = await PlayMessage(streamName: socket.urlInfo?.key ?? "") + + message.msgStreamId = msgStreamId + self.connectId = msgStreamId + await socket.send(message: message, firstType: true) + } + } + + func socketPinRequest(_ socket: RTMPSocket, data: Data) { + Task { + let message = UserControlMessage(type: .pingResponse, data: data, streamId: UInt16(connectId)) + await socket.send(message: message, firstType: true) + } + } + + func socketError(_ socket: RTMPSocket, err: RTMPError) { + Task { + await delegate?.sessionError(self, error: err) + } + } + + func socketPeerBandWidth(_ socket: RTMPSocket, size: UInt32) { + Task { + // send window ack message to server + await socket.send(message: WindowAckMessage(size: size), firstType: true) + } + } + + func socketDisconnected(_ socket: RTMPSocket) { + status = .disconnected + } + + func socketStreamStatistics(_ socket: RTMPSocket, statistics: TransmissionStatistics) { + } +} +