Skip to content

Commit

Permalink
fix: ps file read and write for rtsp (#356)
Browse files Browse the repository at this point in the history
* fix: ps file read and write for rtsp

* fix: ps file seek for rtsp test

* fix: ps file seek reset for rtsp test
  • Loading branch information
nanguantong authored Sep 29, 2024
1 parent 4e1a89c commit fdebfd9
Show file tree
Hide file tree
Showing 11 changed files with 345 additions and 22 deletions.
55 changes: 45 additions & 10 deletions librtsp/test/media/avpacket-queue.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#include "avpacket-queue.h"
#include "sys/sync.hpp"
#include <queue>
#include <deque>

struct avpacket_queue_t
{
std::queue<avpacket_t*>::size_type maxsize;
std::queue<avpacket_t*> q;
std::deque<avpacket_t*>::size_type maxsize;
std::deque<avpacket_t*> q;
std::deque<avpacket_t*>::iterator it;
ThreadLocker locker;
ThreadEvent event;
};
Expand All @@ -14,6 +15,7 @@ struct avpacket_queue_t* avpacket_queue_create(int size)
{
struct avpacket_queue_t* q = new struct avpacket_queue_t;
q->maxsize = size;
q->it = q->q.begin();
return q;
}

Expand All @@ -30,8 +32,9 @@ void avpacket_queue_clear(struct avpacket_queue_t* q)
{
struct avpacket_t* pkt = q->q.front();
avpacket_release(pkt);
q->q.pop();
q->q.pop_front();
}
q->it = q->q.begin();
}

int avpacket_queue_count(struct avpacket_queue_t* q)
Expand All @@ -49,10 +52,13 @@ int avpacket_queue_pop(struct avpacket_queue_t* q)
return -1;

pkt = q->q.front();
q->q.pop();
q->q.pop_front();
q->event.Signal();
}

if (*q->it == pkt)
q->it++;

avpacket_release(pkt);
return 0;
}
Expand All @@ -72,11 +78,11 @@ struct avpacket_t* avpacket_queue_front(struct avpacket_queue_t* q)
int avpacket_queue_push(struct avpacket_queue_t* q, struct avpacket_t* pkt)
{
AutoThreadLocker locker(q->locker);
if (q->q.size() >= q->maxsize)
if (q->maxsize > 0 && q->q.size() >= q->maxsize)
return -1;

avpacket_addref(pkt);
q->q.push(pkt);
q->q.push_back(pkt);
q->event.Signal();
return 0;
}
Expand Down Expand Up @@ -107,22 +113,51 @@ struct avpacket_t* avpacket_queue_front_wait(struct avpacket_queue_t* q, int ms)
int avpacket_queue_push_wait(struct avpacket_queue_t* q, struct avpacket_t* pkt, int ms)
{
q->locker.Lock();
if (q->q.size() >= q->maxsize)
if (q->maxsize > 0 && q->q.size() >= q->maxsize)
{
q->locker.Unlock();
if (0 != q->event.TimeWait(ms))
return -1;
q->locker.Lock();
}

if (q->q.size() >= q->maxsize)
if (q->maxsize > 0 && q->q.size() >= q->maxsize)
{
q->locker.Unlock();
return -1;
}

avpacket_addref(pkt);
q->q.push(pkt);
q->q.push_back(pkt);
q->locker.Unlock();
return 0;
}

struct avpacket_t* avpacket_queue_cur(struct avpacket_queue_t* q)
{
struct avpacket_t* pkt;
AutoThreadLocker locker(q->locker);
if (q->q.empty())
return NULL;

if (q->it == q->q.end())
return NULL;

pkt = *q->it++;
avpacket_addref(pkt);
return pkt;
}

bool avpacket_queue_end(struct avpacket_queue_t* q)
{
AutoThreadLocker locker(q->locker);

return q->it == q->q.end();
}

void avpacket_queue_reset(struct avpacket_queue_t* q)
{
AutoThreadLocker locker(q->locker);

q->it = q->q.begin();
}
8 changes: 8 additions & 0 deletions librtsp/test/media/avpacket-queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ int avpacket_queue_push(struct avpacket_queue_t* q, struct avpacket_t* pkt);
struct avpacket_t* avpacket_queue_front_wait(struct avpacket_queue_t* q, int ms);
int avpacket_queue_push_wait(struct avpacket_queue_t* q, struct avpacket_t* pkt, int ms);

struct avpacket_t* avpacket_queue_cur(struct avpacket_queue_t* q);
bool avpacket_queue_end(struct avpacket_queue_t* q);
void avpacket_queue_reset(struct avpacket_queue_t* q);

#if defined(__cplusplus)
class AVPacketQueue
{
Expand All @@ -36,6 +40,10 @@ class AVPacketQueue
struct avpacket_t* Front() { return avpacket_queue_front(m_pkts); }
struct avpacket_t* FrontWait(int ms) { return avpacket_queue_front_wait(m_pkts, ms); }

struct avpacket_t* Cur() { return avpacket_queue_cur(m_pkts); };
bool End() { return avpacket_queue_end(m_pkts); };
void Reset() { return avpacket_queue_reset(m_pkts); };

private:
struct avpacket_queue_t* m_pkts;
};
Expand Down
2 changes: 1 addition & 1 deletion librtsp/test/media/h264-file-source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ int H264FileSource::GetSDPMedia(std::string& sdp) const
if(parameters.empty())
{
snprintf(base64, sizeof(base64), pattern,
RTP_PAYLOAD_H264, RTP_PAYLOAD_H264,RTP_PAYLOAD_H264,
RTP_PAYLOAD_H264, RTP_PAYLOAD_H264, RTP_PAYLOAD_H264,
(unsigned int)(it->first[1]), (unsigned int)(it->first[2]), (unsigned int)(it->first[3]));
sdp = base64;
}
Expand Down
2 changes: 1 addition & 1 deletion librtsp/test/media/h265-file-source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ int H265FileSource::GetSDPMedia(std::string& sdp) const
if(parameters.empty())
{
snprintf(base64, sizeof(base64), pattern,
RTP_PAYLOAD_H265, RTP_PAYLOAD_H265,RTP_PAYLOAD_H265,
RTP_PAYLOAD_H265, RTP_PAYLOAD_H265, RTP_PAYLOAD_H265,
(unsigned int)(it->first[1]), (unsigned int)(it->first[2]), (unsigned int)(it->first[3]));
sdp = base64;
}
Expand Down
210 changes: 210 additions & 0 deletions librtsp/test/media/ps-file-reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
#include "ps-file-reader.h"
#include "mpeg-util.h"
#include "mpeg-types.h"
#include "mov-format.h"
#include "avcodecid.h"
#include "rtsp-payloads.h"
#include <inttypes.h>
#include <map>

PSFileReader::PSFileReader(const char* file)
:m_fp(NULL), m_pos(0), m_v_start_ts(-1), m_v_end_ts(-1), m_duration(0), m_demuxer(NULL)
{
memset(&m_utils, 0, sizeof(m_utils));
m_fp = fopen(file, "rb");
if (m_fp)
{
static struct ps_demuxer_notify_t notify = {
PSOnStream,
};
m_demuxer = ps_demuxer_create(PSOnRead, this);
ps_demuxer_set_notify(m_demuxer, &notify, this);

m_pkts = std::shared_ptr<AVPacketQueue>(new AVPacketQueue(-1));

Init();
}
}

PSFileReader::~PSFileReader()
{
avpktutil_destroy(&m_utils);

if (m_demuxer)
{
ps_demuxer_destroy(m_demuxer);
m_demuxer = NULL;
}

if (m_fp)
fclose(m_fp);
}

int PSFileReader::Init()
{
int n, i = 0, r = 0;
while ((n = fread(m_packet + i, 1, sizeof(m_packet) - i, m_fp)) > 0)
{
r = ps_demuxer_input(m_demuxer, m_packet, n + i);
assert(r == n + i);
memmove(m_packet, m_packet + r, n + i - r);
i = n + i - r;
}
while (i > 0 && r > 0)
{
r = ps_demuxer_input(m_demuxer, m_packet, i);
memmove(m_packet, m_packet + r, i - r);
i -= r;
}

if (m_v_start_ts >= 0 && m_v_end_ts >= 0)
{
m_duration = (m_v_end_ts - m_v_start_ts) / 90;
}

return 0;
}

int PSFileReader::Seek(int64_t& dts)
{
int64_t fisrt_dts = -1;

while (1)
{
std::shared_ptr<avpacket_t> pkt(m_pkts->Cur(), avpacket_release);
if (NULL == pkt)
return -1;

if (fisrt_dts == -1)
fisrt_dts = pkt->dts / 90;

if (dts < fisrt_dts)
break;

if (dts >= (pkt->dts / 90))
{
// only audio
if (m_v_start_ts < 0)
return 0;

if (pkt->flags & AVPACKET_FLAG_KEY)
return 0;
}
}

m_pkts->Reset();
return 0;
}

int PSFileReader::OnPacket(struct avpacket_t* pkt)
{
int ret = m_pkts->Push(pkt);
m_pkts->Reset();

return ret;
}

int PSFileReader::GetNextFrame(int64_t& pts, int64_t& dts, const uint8_t*& ptr, size_t& bytes, int& codecid, int& flags)
{
if (m_pkts->End())
return -1; // file end

std::shared_ptr<avpacket_t> pkt(m_pkts->Cur(), avpacket_release);

ptr = pkt->data;
bytes = pkt->size;
pts = pkt->pts;
dts = pkt->dts;
flags = pkt->flags;
codecid = pkt->stream->stream_codecid;

return 0;
}

void PSFileReader::PSOnStream(void* param, int stream, int codecid, const void* extra, int bytes, int finish)
{
printf("stream %d, codecid: %d, finish: %s\n", stream, codecid, finish ? "true" : "false");

PSFileReader* self = (PSFileReader*)param;
int r = avpayload_find_by_mpeg2(codecid);
if (r == -1)
return;

AVPACKET_CODEC_ID avcodecid = s_payloads[r].codecid;
if (avcodecid >= AVCODEC_VIDEO_MPEG1 && avcodecid <= AVCODEC_VIDEO_SVAC)
{
struct avstream_t* avstream = avpktutil_addvideo(&self->m_utils, stream, avcodecid, 0, 0, extra, bytes);
avstream->stream_codecid = codecid;
}
else if (avcodecid >= AVCODEC_AUDIO_PCM && avcodecid <= AVCODEC_AUDIO_SVAC)
{
struct avstream_t* avstream = avpktutil_addaudio(&self->m_utils, stream, avcodecid, 0, 0, 0, extra, bytes);
avstream->stream_codecid = codecid;
}
}

inline const char* ftimestamp(int64_t t, char* buf)
{
if (PTS_NO_VALUE == t)
{
sprintf(buf, "(null)");
}
else
{
t /= 90;
sprintf(buf, "%d:%02d:%02d.%03d", (int)(t / 3600000), (int)((t / 60000) % 60), (int)((t / 1000) % 60), (int)(t % 1000));
}
return buf;
}

int PSFileReader::PSOnRead(void* param, int stream, int avtype, int flags, int64_t pts, int64_t dts, const void* data, size_t bytes)
{
PSFileReader* self = (PSFileReader*)param;
static std::map<int, std::pair<int64_t, int64_t>> s_streams;
static char s_pts[64], s_dts[64];

auto it = s_streams.find(stream);
if (it == s_streams.end())
it = s_streams.insert(std::make_pair(stream, std::pair<int64_t, int64_t>(pts, dts))).first;

if (mpeg_stream_type_audio(avtype))
{
//assert(0 == a_dts || dts >= a_dts);
printf("[A] pts: %s(%" PRId64 "), dts: %s(%" PRId64 "), diff: %03d/%03d, size: %u\n",
ftimestamp(pts, s_pts), pts, ftimestamp(dts, s_dts), dts, (int)(pts - it->second.first) / 90,
(int)(dts - it->second.second) / 90, (unsigned int)bytes);
}
else if (mpeg_stream_type_video(avtype))
{
//assert(0 == v_dts || dts >= v_dts);
printf("[V] pts: %s(%" PRId64 "), dts: %s(%" PRId64 "), diff: %03d/%03d, size: %u%s\n",
ftimestamp(pts, s_pts), pts, ftimestamp(dts, s_dts), dts, (int)(pts - it->second.first) / 90,
(int)(dts - it->second.second) / 90, (unsigned int)bytes, (flags & MPEG_FLAG_IDR_FRAME) ? " [I]" : "");

if (self->m_v_start_ts == -1)
self->m_v_start_ts = dts < 0 ? pts : dts;
self->m_v_end_ts = dts < 0 ? pts : dts;
}
else
{
//assert(0);
//assert(0 == x_dts || dts >= x_dts);
printf("[X] pts: %s(%" PRId64 "), dts: %s(%" PRId64 "), diff: %03d/%03d\n",
ftimestamp(pts, s_pts), pts, ftimestamp(dts, s_dts), dts, (int)(pts - it->second.first), (int)(dts - it->second.second));
}

it->second = std::make_pair(pts, dts);

for (int i = 0; i < self->m_utils.count; i++)
{
if (self->m_utils.streams[i]->stream == stream)
{
struct avpacket_t* pkt = NULL;
avpktutil_input(&self->m_utils, self->m_utils.streams[i], data, bytes, pts, dts, flags, &pkt);
self->OnPacket(pkt);
return 0;
}
}

return -1;
}
Loading

0 comments on commit fdebfd9

Please sign in to comment.