Skip to content

Commit

Permalink
fix: avpacket queue iterator invalid
Browse files Browse the repository at this point in the history
  • Loading branch information
ireader committed Nov 2, 2024
1 parent 27ff66f commit 53254e9
Show file tree
Hide file tree
Showing 5 changed files with 422 additions and 472 deletions.
1 change: 1 addition & 0 deletions libmpeg/include/mpeg-proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ enum EPSI_STREAM_TYPE
PSI_STREAM_AUDIO_DTS = 0x8a, // ffmpeg/libavformat/mpegts.h
PSI_STREAM_VIDEO_DIRAC = 0xd1, // ffmpeg/libavformat/mpegts.h
PSI_STREAM_VIDEO_AVS3 = 0xd4, // ffmpeg/libavformat/mpegts.h
PSI_STREAM_AUDIO_AVS3 = 0xd5, // avs3-p6
PSI_STREAM_VIDEO_VC1 = 0xea, // ffmpeg/libavformat/mpegts.h
PSI_STREAM_VIDEO_SVAC = 0x80, // GBT 25724-2010 SVAC(2014)
PSI_STREAM_AUDIO_SVAC = 0x9B, // GBT 25724-2010 SVAC(2014)
Expand Down
291 changes: 128 additions & 163 deletions librtsp/test/media/avpacket-queue.cpp
Original file line number Diff line number Diff line change
@@ -1,163 +1,128 @@
#include "avpacket-queue.h"
#include "sys/sync.hpp"
#include <deque>

struct avpacket_queue_t
{
std::deque<avpacket_t*>::size_type maxsize;
std::deque<avpacket_t*> q;
std::deque<avpacket_t*>::iterator it;
ThreadLocker locker;
ThreadEvent event;
};

struct avpacket_queue_t* avpacket_queue_create(int size)
{
struct avpacket_queue_t* q = new struct avpacket_queue_t;
q->maxsize = size;
q->it = q->q.begin();
return q;
}

void avpacket_queue_destroy(struct avpacket_queue_t* q)
{
avpacket_queue_clear(q);
delete q;
}

void avpacket_queue_clear(struct avpacket_queue_t* q)
{
AutoThreadLocker locker(q->locker);
while (!q->q.empty())
{
struct avpacket_t* pkt = q->q.front();
avpacket_release(pkt);
q->q.pop_front();
}
q->it = q->q.begin();
}

int avpacket_queue_count(struct avpacket_queue_t* q)
{
AutoThreadLocker locker(q->locker);
return (int)q->q.size();
}

int avpacket_queue_pop(struct avpacket_queue_t* q)
{
struct avpacket_t* pkt;
{
AutoThreadLocker locker(q->locker);
if (q->q.empty())
return -1;

pkt = q->q.front();
q->q.pop_front();
q->event.Signal();
}

if (*q->it == pkt)
q->it++;

avpacket_release(pkt);
return 0;
}

struct avpacket_t* avpacket_queue_front(struct avpacket_queue_t* q)
{
struct avpacket_t* pkt;
AutoThreadLocker locker(q->locker);
if (q->q.empty())
return NULL;

pkt = q->q.front();
avpacket_addref(pkt);
return pkt;
}

int avpacket_queue_push(struct avpacket_queue_t* q, struct avpacket_t* pkt)
{
AutoThreadLocker locker(q->locker);
if (q->maxsize > 0 && q->q.size() >= q->maxsize)
return -1;

avpacket_addref(pkt);
q->q.push_back(pkt);
q->event.Signal();
return 0;
}

struct avpacket_t* avpacket_queue_front_wait(struct avpacket_queue_t* q, int ms)
{
q->locker.Lock();
if (q->q.empty())
{
q->locker.Unlock();
if (0 != q->event.TimeWait(ms))
return NULL;
q->locker.Lock();
}

if (q->q.empty())
{
q->locker.Unlock();
return NULL;
}

struct avpacket_t* pkt = q->q.front();
avpacket_addref(pkt);
q->locker.Unlock();
return pkt;
}

int avpacket_queue_push_wait(struct avpacket_queue_t* q, struct avpacket_t* pkt, int ms)
{
q->locker.Lock();
if (q->maxsize > 0 && q->q.size() >= q->maxsize)
{
q->locker.Unlock();
if (0 != q->event.TimeWait(ms))
return -1;
q->locker.Lock();
}

if (q->maxsize > 0 && q->q.size() >= q->maxsize)
{
q->locker.Unlock();
return -1;
}

avpacket_addref(pkt);
q->q.push_back(pkt);
q->locker.Unlock();
return 0;
}

struct avpacket_t* avpacket_queue_cur(struct avpacket_queue_t* q)
{
struct avpacket_t* pkt;
AutoThreadLocker locker(q->locker);
if (q->q.empty())
return NULL;

if (q->it == q->q.end())
return NULL;

pkt = *q->it++;
avpacket_addref(pkt);
return pkt;
}

bool avpacket_queue_end(struct avpacket_queue_t* q)
{
AutoThreadLocker locker(q->locker);

return q->it == q->q.end();
}

void avpacket_queue_reset(struct avpacket_queue_t* q)
{
AutoThreadLocker locker(q->locker);

q->it = q->q.begin();
}
#include "avpacket-queue.h"
#include "sys/sync.hpp"
#include <queue>

struct avpacket_queue_t
{
std::queue<avpacket_t*>::size_type maxsize;
std::queue<avpacket_t*> q;
ThreadLocker locker;
ThreadEvent event;
};

struct avpacket_queue_t* avpacket_queue_create(int size)
{
struct avpacket_queue_t* q = new struct avpacket_queue_t;
q->maxsize = size;
return q;
}

void avpacket_queue_destroy(struct avpacket_queue_t* q)
{
avpacket_queue_clear(q);
delete q;
}

void avpacket_queue_clear(struct avpacket_queue_t* q)
{
AutoThreadLocker locker(q->locker);
while (!q->q.empty())
{
struct avpacket_t* pkt = q->q.front();
avpacket_release(pkt);
q->q.pop();
}
}

int avpacket_queue_count(struct avpacket_queue_t* q)
{
AutoThreadLocker locker(q->locker);
return (int)q->q.size();
}

int avpacket_queue_pop(struct avpacket_queue_t* q)
{
struct avpacket_t* pkt;
{
AutoThreadLocker locker(q->locker);
if (q->q.empty())
return -1;

pkt = q->q.front();
q->q.pop();
q->event.Signal();
}

avpacket_release(pkt);
return 0;
}

struct avpacket_t* avpacket_queue_front(struct avpacket_queue_t* q)
{
struct avpacket_t* pkt;
AutoThreadLocker locker(q->locker);
if (q->q.empty())
return NULL;

pkt = q->q.front();
avpacket_addref(pkt);
return pkt;
}

int avpacket_queue_push(struct avpacket_queue_t* q, struct avpacket_t* pkt)
{
AutoThreadLocker locker(q->locker);
if (q->q.size() >= q->maxsize)
return -1;

avpacket_addref(pkt);
q->q.push(pkt);
q->event.Signal();
return 0;
}

struct avpacket_t* avpacket_queue_front_wait(struct avpacket_queue_t* q, int ms)
{
q->locker.Lock();
if (q->q.empty())
{
q->locker.Unlock();
if (0 != q->event.TimeWait(ms))
return NULL;
q->locker.Lock();
}

if (q->q.empty())
{
q->locker.Unlock();
return NULL;
}

struct avpacket_t* pkt = q->q.front();
avpacket_addref(pkt);
q->locker.Unlock();
return pkt;
}

int avpacket_queue_push_wait(struct avpacket_queue_t* q, struct avpacket_t* pkt, int ms)
{
q->locker.Lock();
if (q->q.size() >= q->maxsize)
{
q->locker.Unlock();
if (0 != q->event.TimeWait(ms))
return -1;
q->locker.Lock();
}

if (q->q.size() >= q->maxsize)
{
q->locker.Unlock();
return -1;
}

avpacket_addref(pkt);
q->q.push(pkt);
q->locker.Unlock();
return 0;
}
94 changes: 43 additions & 51 deletions librtsp/test/media/avpacket-queue.h
Original file line number Diff line number Diff line change
@@ -1,51 +1,43 @@
#ifndef _avpacket_queue_h_
#define _avpacket_queue_h_

#include "avpacket.h"
#include <stdint.h>

struct avpacket_queue_t;

struct avpacket_queue_t* avpacket_queue_create(int size);
void avpacket_queue_destroy(struct avpacket_queue_t* q);

void avpacket_queue_clear(struct avpacket_queue_t* q);
int avpacket_queue_count(struct avpacket_queue_t* q);
int avpacket_queue_pop(struct avpacket_queue_t* q);

struct avpacket_t* avpacket_queue_front(struct avpacket_queue_t* q);
int avpacket_queue_push(struct avpacket_queue_t* q, struct avpacket_t* pkt);

struct avpacket_t* avpacket_queue_front_wait(struct avpacket_queue_t* q, int ms);
int avpacket_queue_push_wait(struct avpacket_queue_t* q, struct avpacket_t* pkt, int ms);

struct avpacket_t* avpacket_queue_cur(struct avpacket_queue_t* q);
bool avpacket_queue_end(struct avpacket_queue_t* q);
void avpacket_queue_reset(struct avpacket_queue_t* q);

#if defined(__cplusplus)
class AVPacketQueue
{
public:
AVPacketQueue(int size) :m_pkts(avpacket_queue_create(size)) {}
~AVPacketQueue() { if (m_pkts) avpacket_queue_destroy(m_pkts); }

public:
void Clear() { avpacket_queue_clear(m_pkts); }
int Count() const { return avpacket_queue_count(m_pkts); }

int Pop() { return avpacket_queue_pop(m_pkts); }
int Push(struct avpacket_t* pkt) { return avpacket_queue_push(m_pkts, pkt); }
int PushWait(struct avpacket_t* pkt, int ms) { return avpacket_queue_push_wait(m_pkts, pkt, ms); }
struct avpacket_t* Front() { return avpacket_queue_front(m_pkts); }
struct avpacket_t* FrontWait(int ms) { return avpacket_queue_front_wait(m_pkts, ms); }

struct avpacket_t* Cur() { return avpacket_queue_cur(m_pkts); };
bool End() { return avpacket_queue_end(m_pkts); };
void Reset() { return avpacket_queue_reset(m_pkts); };

private:
struct avpacket_queue_t* m_pkts;
};
#endif
#endif /* !_avpacket_queue_h_*/
#ifndef _avpacket_queue_h_
#define _avpacket_queue_h_

#include "avpacket.h"
#include <stdint.h>

struct avpacket_queue_t;

struct avpacket_queue_t* avpacket_queue_create(int size);
void avpacket_queue_destroy(struct avpacket_queue_t* q);

void avpacket_queue_clear(struct avpacket_queue_t* q);
int avpacket_queue_count(struct avpacket_queue_t* q);
int avpacket_queue_pop(struct avpacket_queue_t* q);

struct avpacket_t* avpacket_queue_front(struct avpacket_queue_t* q);
int avpacket_queue_push(struct avpacket_queue_t* q, struct avpacket_t* pkt);

struct avpacket_t* avpacket_queue_front_wait(struct avpacket_queue_t* q, int ms);
int avpacket_queue_push_wait(struct avpacket_queue_t* q, struct avpacket_t* pkt, int ms);

#if defined(__cplusplus)
class AVPacketQueue
{
public:
AVPacketQueue(int size) :m_pkts(avpacket_queue_create(size)) {}
~AVPacketQueue() { if (m_pkts) avpacket_queue_destroy(m_pkts); }

public:
void Clear() { avpacket_queue_clear(m_pkts); }
int Count() const { return avpacket_queue_count(m_pkts); }

int Pop() { return avpacket_queue_pop(m_pkts); }
int Push(struct avpacket_t* pkt) { return avpacket_queue_push(m_pkts, pkt); }
int PushWait(struct avpacket_t* pkt, int ms) { return avpacket_queue_push_wait(m_pkts, pkt, ms); }
struct avpacket_t* Front() { return avpacket_queue_front(m_pkts); }
struct avpacket_t* FrontWait(int ms) { return avpacket_queue_front_wait(m_pkts, ms); }

private:
struct avpacket_queue_t* m_pkts;
};
#endif
#endif /* !_avpacket_queue_h_*/
Loading

0 comments on commit 53254e9

Please sign in to comment.