Skip to content

Commit

Permalink
issue ossrs#222: fix audio stream stutter btween ovelay TS file;
Browse files Browse the repository at this point in the history
1. let continuity_counter continually between overlay ts;
2. copy audio data when transcoding ts;
  • Loading branch information
suzp1984 committed Nov 21, 2024
1 parent 9b475d9 commit 580c403
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 2 deletions.
33 changes: 31 additions & 2 deletions platform/transcript.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ossrs/go-oryx-lib/errors"
ohttp "github.com/ossrs/go-oryx-lib/http"
"github.com/ossrs/go-oryx-lib/logger"

// Use v8 because we use Go 1.16+, while v9 requires Go 1.18+
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
Expand Down Expand Up @@ -1295,6 +1296,9 @@ type TranscriptSegment struct {
// Whether user clear the ASR text of this segment.
UserClearASR bool `json:"uca,omitempty"`

// The map that host the pid -> last cc
OverlayTSLastCC map[uint16]uint8 `json:"start_cc,omitempty"`

// The cost to transcode the TS file to audio file.
CostExtractAudio time.Duration `json:"eac,omitempty"`
// The cost to do ASR, converting speech to text.
Expand Down Expand Up @@ -1402,6 +1406,16 @@ func (v *TranscriptQueue) first() *TranscriptSegment {
return v.Segments[0]
}

func (v *TranscriptQueue) find_by(seq_no uint64) *TranscriptSegment {
for i := len(v.Segments) - 1; i >= 0; i-- {
if v.Segments[i].OverlayFile.SeqNo == seq_no {
return v.Segments[i]
}
}

return nil
}

func (v *TranscriptQueue) clearSubtitle(tsid string) error {
v.lock.Lock()
defer v.lock.Unlock()
Expand Down Expand Up @@ -1594,7 +1608,7 @@ func (v *TranscriptTask) OnTsSegment(ctx context.Context, msg *SrsOnHlsObject) e
func() {
// We must not update the queue, when persistence goroutine is working.
v.lock.Lock()
v.lock.Unlock()
defer v.lock.Unlock()

v.LiveQueue.enqueue(&TranscriptSegment{
Msg: msg.Msg,
Expand Down Expand Up @@ -1978,7 +1992,7 @@ func (v *TranscriptTask) DriveFixQueue(ctx context.Context) error {
args = append(args, strings.Fields(videoCodecParams)...)
// Generate other parameters for FFmpeg.
args = append(args, []string{
"-c:a", "aac",
"-c:a", "copy",
"-copyts", // To keep the pts not changed.
"-y", overlayFile.File,
}...)
Expand All @@ -2004,6 +2018,21 @@ func (v *TranscriptTask) DriveFixQueue(ctx context.Context) error {
}
overlayFile.Size = uint64(stats.Size())

// recaculate the continuity_counter of overlayFile
// 1. get previous segment in overlayQueue
// 2. adjust current ts segment's continuity_counter
// 2. change segment.OverlayTSLastCC
previous_ts_cc := map[uint16]uint8{}
if previous_segment := v.OverlayQueue.find_by(overlayFile.SeqNo - 1); previous_segment != nil {
previous_ts_cc = previous_segment.OverlayTSLastCC
}

if cc, err := overlayFile.AdjustCC(previous_ts_cc); err != nil {
logger.Wf(ctx, "Error when Adjust Overlay TS file %v", overlayFile.File)
} else {
segment.OverlayTSLastCC = cc
}

// Dequeue the segment from live queue and attach to asr queue.
func() {
v.lock.Lock()
Expand Down
91 changes: 91 additions & 0 deletions platform/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,97 @@ func (v *TsFile) String() string {
)
}

// Adjust ts packet's continuity_counter by previous ts segment.
func (v *TsFile) AdjustCC(cc map[uint16]uint8) (map[uint16]uint8, error) {
const BUF_SIZE = 188
const TS_VIDEO_AVC_PID = 256
const TS_AUDIO_AAC_PID = 257
const TS_AUDIO_MP3_PID = 258

tmp_file_name := v.File + ".tmp"

err := func() error {
file, err := os.Open(v.File)
if err != nil {
return err
}
defer file.Close()

tmp_file, err := os.Create(tmp_file_name)
if err != nil {
return err
}
defer tmp_file.Close()

buffer := make([]byte, BUF_SIZE)

for {
bytes_read, err := file.Read(buffer)
if err != nil {
if err == io.EOF {
return nil
}

return err
}

if bytes_read != BUF_SIZE {
return errors.Errorf("Read TS packet with size %v", bytes_read)
}

if sync_byte := buffer[0]; sync_byte != 0x47 {
return errors.Errorf("TS packet sync byte is not 0x47")
}

// transport_error_indicator := uint8(buffer[1] & 0x80 >> 7)
// payload_unit_start_indicator := uint8(buffer[1] & 0x40 >> 6)
// transport_priority := uint8(buffer[1] & 0x20 >> 5)
pid := uint16(buffer[1]&0x1f)<<8 + uint16(buffer[2])
// transport_scrambling_control := uint8(buffer[3] & 0xC0 >> 6)
adaptation_field_control := uint8(buffer[3] & 0x30 >> 4)
continuity_counter := uint8(buffer[3] & 0x0f)

// check whether ts packet has payload: adaptation_field_control == 01 or 11
has_payload := (adaptation_field_control & 0x01) == 1
is_AV_PID := pid == TS_VIDEO_AVC_PID || pid == TS_AUDIO_AAC_PID || pid == TS_AUDIO_MP3_PID
if has_payload && is_AV_PID {
if counter, hasKey := cc[pid]; hasKey {
if continuity_counter != counter+1 {
continuity_counter = counter + 1
if continuity_counter > 15 {
continuity_counter = 0
}

buffer[3] = (buffer[3] & 0xf0) | continuity_counter
}

cc[pid] = continuity_counter
} else {
cc[pid] = continuity_counter
}
}

if _, err := tmp_file.Write(buffer); err != nil {
return err
}
}
}()

if err != nil {
return nil, err
}

if _, err := os.Stat(tmp_file_name); err != nil {
return nil, err
}

if err := os.Rename(tmp_file_name, v.File); err != nil {
return nil, err
}

return cc, nil
}

// M3u8VoDArtifact is a HLS VoD object. Because each Dvr/Vod/RecordM3u8Stream might be DVR to many VoD file,
// each is an M3u8VoDArtifact. For example, when user publish live/livestream, there is a Dvr/Vod/RecordM3u8Stream and
// M3u8VoDArtifact, then user unpublish stream and after some seconds a VoD file is generated by M3u8VoDArtifact. Then
Expand Down

0 comments on commit 580c403

Please sign in to comment.