From cdeb5d8f5314736b9581bdd3672788c7ef40a50d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Duval?= Date: Tue, 2 Jan 2024 19:17:25 +0100 Subject: [PATCH] improve AdapterIO from experimental media kit to flush read buffer parts, thus avoiding constant memory leak. --- source/AdapterIO.cpp | 491 ++++++++++++++++++++++++++++++++++++++++ source/AdapterIO.h | 100 ++++++++ source/Makefile | 1 + source/StreamIO.h | 2 +- source/StreamPlayer.cpp | 8 +- source/StreamPlayer.h | 1 + 6 files changed, 601 insertions(+), 2 deletions(-) create mode 100644 source/AdapterIO.cpp create mode 100644 source/AdapterIO.h diff --git a/source/AdapterIO.cpp b/source/AdapterIO.cpp new file mode 100644 index 0000000..ab0a5da --- /dev/null +++ b/source/AdapterIO.cpp @@ -0,0 +1,491 @@ +/* + * Copyright 2016 Dario Casalinuovo. All rights reserved. + * Distributed under the terms of the MIT License. + * + */ + +#include "AdapterIO.h" + +#include + +#include + +#include "MediaDebug.h" + + +#define TIMEOUT_QUANTA 100000 + + +class RelativePositionIO : public BPositionIO { +public: + RelativePositionIO(BAdapterIO* owner, BPositionIO* buffer, + bigtime_t timeout) + : + BPositionIO(), + fOwner(owner), + fBackPosition(0), + fStartOffset(0), + fBuffer(buffer), + fTimeout(timeout) + { + } + + virtual ~RelativePositionIO() + { + delete fBuffer; + } + + status_t ResetStartOffset(off_t offset) + { + status_t ret = fBuffer->SetSize(0); + if (ret != B_OK) + return ret; + + fBackPosition = 0; + fStartOffset = offset; + return B_OK; + } + + status_t FlushRead(BPositionIO* buffer, const void* oldBuffer, size_t oldLength) + { + AutoWriteLocker _(fLock); + off_t position = fBuffer->Position(); + buffer->WriteAt(0, (void*)((addr_t)oldBuffer + position), oldLength - position); + buffer->Seek(fBuffer->Position() - position, SEEK_SET); + fBackPosition -= position; + fStartOffset += position; + SetBuffer(buffer); + return B_OK; + } + + status_t EvaluatePosition(off_t position, off_t totalSize) + { + if (position < 0) + return B_ERROR; + + if (position < fStartOffset) + return B_RESOURCE_UNAVAILABLE; + + if (totalSize > 0 && position > totalSize) { + // This is an endless stream, we don't know + // how much data will come and when, we could + // block on that. + if (IsMutable()) + return B_WOULD_BLOCK; + else + return B_ERROR; + } + + return B_OK; + } + + status_t WaitForData(off_t position, off_t size) + { + off_t bufferSize = 0; + status_t ret = GetSize(&bufferSize); + if (ret != B_OK) + return B_ERROR; + + bigtime_t totalTimeOut = 0; + + while (bufferSize < position + size) { + // We are not running, no luck to receive + // more data, let's return and avoid locking. + if (!fOwner->IsRunning()) + return B_NOT_SUPPORTED; + + if (fTimeout != B_INFINITE_TIMEOUT && totalTimeOut >= fTimeout) + return B_TIMED_OUT; + + snooze(TIMEOUT_QUANTA); + + totalTimeOut += TIMEOUT_QUANTA; + GetSize(&bufferSize); + } + return B_OK; + } + + virtual ssize_t ReadAt(off_t position, void* buffer, + size_t size) + { + AutoReadLocker _(fLock); + + return fBuffer->ReadAt( + _PositionToRelative(position), buffer, size); + + } + + virtual ssize_t WriteAt(off_t position, + const void* buffer, size_t size) + { + AutoWriteLocker _(fLock); + + return fBuffer->WriteAt( + _PositionToRelative(position), buffer, size); + } + + virtual off_t Seek(off_t position, uint32 seekMode) + { + AutoWriteLocker _(fLock); + + if (seekMode == SEEK_SET) + return fBuffer->Seek(_PositionToRelative(position), seekMode); + return fBuffer->Seek(position, seekMode); + } + + virtual off_t Position() const + { + AutoReadLocker _(fLock); + + return _RelativeToPosition(fBuffer->Position()); + } + + virtual status_t SetSize(off_t size) + { + AutoWriteLocker _(fLock); + + return fBuffer->SetSize(_PositionToRelative(size)); + } + + virtual status_t GetSize(off_t* size) const + { + AutoReadLocker _(fLock); + + // We use the backend position to make our buffer + // independant of that. + *size = _RelativeToPosition(fBackPosition); + + return B_OK; + } + + ssize_t BackWrite(const void* buffer, size_t size) + { + AutoWriteLocker _(fLock); + + off_t ret = fBuffer->WriteAt(fBackPosition, buffer, size); + fBackPosition += ret; + return ret; + } + + void SetBuffer(BPositionIO* buffer) + { + delete fBuffer; + fBuffer = buffer; + } + + bool IsStreaming() const + { + int32 flags = 0; + fOwner->GetFlags(&flags); + return ((flags & B_MEDIA_STREAMING) == B_MEDIA_STREAMING); + } + + bool IsMutable() const + { + int32 flags = 0; + fOwner->GetFlags(&flags); + return ((flags & B_MEDIA_MUTABLE_SIZE) == B_MEDIA_MUTABLE_SIZE); + } + + bool IsSeekable() const + { + int32 flags = 0; + fOwner->GetFlags(&flags); + return ((flags & B_MEDIA_SEEKABLE) == B_MEDIA_SEEKABLE); + } + + const BPositionIO* Buffer() const + { + return fBuffer; + } + +private: + + off_t _PositionToRelative(off_t position) const + { + return position - fStartOffset; + } + + off_t _RelativeToPosition(off_t position) const + { + return position + fStartOffset; + } + + BAdapterIO* fOwner; + off_t fBackPosition; + off_t fStartOffset; + + BPositionIO* fBuffer; + + mutable RWLocker fLock; + + bigtime_t fTimeout; +}; + + +BAdapterIO::BAdapterIO(int32 flags, bigtime_t timeout) + : + fFlags(flags), + fBuffer(NULL), + fTotalSize(0), + fOpened(false), + fSeekSem(-1), + fInputAdapter(NULL) +{ + CALLED(); + + fBuffer = new RelativePositionIO(this, new BMallocIO(), timeout); +} + + +BAdapterIO::BAdapterIO(const BAdapterIO &) +{ + // copying not allowed... +} + + +BAdapterIO::~BAdapterIO() +{ + CALLED(); + + delete fInputAdapter; + delete fBuffer; +} + + +void +BAdapterIO::GetFlags(int32* flags) const +{ + CALLED(); + + *flags = fFlags; +} + + +ssize_t +BAdapterIO::ReadAt(off_t position, void* buffer, size_t size) +{ + CALLED(); + + status_t ret = _EvaluateWait(position, size); + if (ret != B_OK) + return ret; + + return fBuffer->ReadAt(position, buffer, size); +} + + +ssize_t +BAdapterIO::WriteAt(off_t position, const void* buffer, size_t size) +{ + CALLED(); + + return fBuffer->WriteAt(position, buffer, size); +} + + +off_t +BAdapterIO::Seek(off_t position, uint32 seekMode) +{ + CALLED(); + + off_t absolutePosition = 0; + off_t size = 0; + + if (seekMode == SEEK_CUR) + absolutePosition = Position()+position; + else if (seekMode == SEEK_END) { + if (GetSize(&size) != B_OK) + return B_NOT_SUPPORTED; + + absolutePosition = size-position; + } + + status_t ret = _EvaluateWait(absolutePosition, 0); + + if (ret == B_RESOURCE_UNAVAILABLE && fBuffer->IsStreaming() + && fBuffer->IsSeekable()) { + + fSeekSem = create_sem(0, "BAdapterIO seek sem"); + + if (SeekRequested(absolutePosition) != B_OK) + return B_NOT_SUPPORTED; + + TRACE("BAdapterIO::Seek: Locking on backend seek\n"); + acquire_sem(fSeekSem); + TRACE("BAdapterIO::Seek: Seek completed!\n"); + fBuffer->ResetStartOffset(absolutePosition); + } else if (ret != B_OK) + return B_NOT_SUPPORTED; + + return fBuffer->Seek(position, seekMode); +} + + +off_t +BAdapterIO::Position() const +{ + CALLED(); + + return fBuffer->Position(); +} + + +status_t +BAdapterIO::SetSize(off_t size) +{ + CALLED(); + + if (!fBuffer->IsMutable()) { + fTotalSize = size; + return B_OK; + } + + return fBuffer->SetSize(size); +} + + +status_t +BAdapterIO::GetSize(off_t* size) const +{ + CALLED(); + + if (!fBuffer->IsMutable()) { + *size = fTotalSize; + return B_OK; + } + + return fBuffer->GetSize(size); +} + + +status_t +BAdapterIO::Open() +{ + CALLED(); + + fOpened = true; + return B_OK; +} + + +bool +BAdapterIO::IsRunning() const +{ + return fOpened; +} + + +void +BAdapterIO::SeekCompleted() +{ + CALLED(); + release_sem(fSeekSem); + delete_sem(fSeekSem); + fSeekSem = -1; +} + + +status_t +BAdapterIO::SetBuffer(BPositionIO* buffer) +{ + // We can't change the buffer while we + // are running. + if (fOpened) + return B_ERROR; + + fBuffer->SetBuffer(buffer); + return B_OK; +} + + +status_t +BAdapterIO::FlushRead() +{ + BMallocIO* buffer = new BMallocIO(); + BMallocIO* oldBuffer = (BMallocIO*)fBuffer->Buffer(); + fBuffer->FlushRead(buffer, oldBuffer->Buffer(), oldBuffer->BufferLength()); + return B_OK; +} + + +BInputAdapter* +BAdapterIO::BuildInputAdapter() +{ + if (fInputAdapter != NULL) + return fInputAdapter; + + fInputAdapter = new BInputAdapter(this); + return fInputAdapter; +} + + +status_t +BAdapterIO::SeekRequested(off_t position) +{ + CALLED(); + + return B_ERROR; +} + + +ssize_t +BAdapterIO::BackWrite(const void* buffer, size_t size) +{ + return fBuffer->BackWrite(buffer, size); +} + + +status_t +BAdapterIO::_EvaluateWait(off_t pos, off_t size) +{ + CALLED(); + + off_t totalSize = 0; + if (GetSize(&totalSize) != B_OK) + TRACE("BAdapterIO::ReadAt: Can't get our size!\n"); + + TRACE("BAdapterIO::_EvaluateWait TS %" B_PRId64 " P %" B_PRId64 + " S %" B_PRId64 "\n", totalSize, pos, size); + + status_t err = fBuffer->EvaluatePosition(pos, totalSize); + + TRACE("BAdapterIO::_EvaluateWait: %s\n", strerror(err)); + + if (err != B_OK && err != B_WOULD_BLOCK) + return err; + + TRACE("BAdapterIO::_EvaluateWait: waiting for data\n"); + + return fBuffer->WaitForData(pos, size); +} + + +BInputAdapter::BInputAdapter(BAdapterIO* io) + : + fIO(io) +{ +} + + +BInputAdapter::~BInputAdapter() +{ +} + + +ssize_t +BInputAdapter::Write(const void* buffer, size_t size) +{ + return fIO->BackWrite(buffer, size); +} + + +// FBC +void BAdapterIO::_ReservedAdapterIO1() {} +void BAdapterIO::_ReservedAdapterIO2() {} +void BAdapterIO::_ReservedAdapterIO3() {} +void BAdapterIO::_ReservedAdapterIO4() {} +void BAdapterIO::_ReservedAdapterIO5() {} + +void BInputAdapter::_ReservedInputAdapter1() {} +void BInputAdapter::_ReservedInputAdapter2() {} diff --git a/source/AdapterIO.h b/source/AdapterIO.h new file mode 100644 index 0000000..c0273a8 --- /dev/null +++ b/source/AdapterIO.h @@ -0,0 +1,100 @@ +/* + * Copyright 2016 Haiku, Inc. All rights reserved. + * Distributed under the terms of the MIT License. + */ +#ifndef _ADAPTER_IO_H +#define _ADAPTER_IO_H + + +#include +#include +#include +#include + + +class BAdapterIO; +class RelativePositionIO; + + +class BInputAdapter { +public: + virtual ssize_t Write(const void* buffer, size_t size); + +private: + friend class BAdapterIO; + + BInputAdapter(BAdapterIO* io); + virtual ~BInputAdapter(); + + BAdapterIO* fIO; + + virtual void _ReservedInputAdapter1(); + virtual void _ReservedInputAdapter2(); + + uint32 _reserved[2]; +}; + + +class BAdapterIO : public BMediaIO { +public: + BAdapterIO( + int32 flags, + bigtime_t timeout); + virtual ~BAdapterIO(); + + virtual void GetFlags(int32* flags) const; + + virtual ssize_t ReadAt(off_t position, void* buffer, + size_t size); + virtual ssize_t WriteAt(off_t position, + const void* buffer, size_t size); + + virtual off_t Seek(off_t position, uint32 seekMode); + virtual off_t Position() const; + + virtual status_t SetSize(off_t size); + virtual status_t GetSize(off_t* size) const; + + virtual status_t Open(); + + virtual bool IsRunning() const; + + void SeekCompleted(); + status_t SetBuffer(BPositionIO* buffer); + + status_t FlushRead(); + + BInputAdapter* BuildInputAdapter(); + +protected: + friend class BInputAdapter; + + virtual status_t SeekRequested(off_t position); + + ssize_t BackWrite(const void* buffer, size_t size); + +private: + status_t _EvaluateWait(off_t pos, off_t size); + + int32 fFlags; + + RelativePositionIO* fBuffer; + off_t fTotalSize; + bool fOpened; + sem_id fSeekSem; + + BInputAdapter* fInputAdapter; + + BAdapterIO(const BAdapterIO&); + BAdapterIO& operator=(const BAdapterIO&); + + virtual void _ReservedAdapterIO1(); + virtual void _ReservedAdapterIO2(); + virtual void _ReservedAdapterIO3(); + virtual void _ReservedAdapterIO4(); + virtual void _ReservedAdapterIO5(); + + uint32 _reserved[5]; +}; + +#endif // _ADAPTER_IO_H diff --git a/source/Makefile b/source/Makefile index ec37703..ac2b490 100644 --- a/source/Makefile +++ b/source/Makefile @@ -32,6 +32,7 @@ APP_MIME_SIG = application/x-vnd.Fishpond-StreamRadio # same name (source.c or source.cpp) are included from different directories. # Also note that spaces in folder names do not work well with this Makefile. SRCS = \ + AdapterIO.cpp \ HttpUtils.cpp \ MainWindow.cpp \ RadioApp.cpp \ diff --git a/source/StreamIO.h b/source/StreamIO.h index 04b533c..5a0bab0 100644 --- a/source/StreamIO.h +++ b/source/StreamIO.h @@ -22,7 +22,7 @@ #include #include -#include +#include "AdapterIO.h" #include "override.h" diff --git a/source/StreamPlayer.cpp b/source/StreamPlayer.cpp index 834141f..6b399c1 100644 --- a/source/StreamPlayer.cpp +++ b/source/StreamPlayer.cpp @@ -37,7 +37,8 @@ StreamPlayer::StreamPlayer(Station* station, BLooper* notify) fNotify(notify), fMediaFile(NULL), fPlayer(NULL), - fState(StreamPlayer::Stopped) + fState(StreamPlayer::Stopped), + fFlushCount(0) { TRACE("Trying to set player for stream %s\n", station->StreamUrl().UrlString().String()); @@ -181,6 +182,11 @@ StreamPlayer::_GetDecodedChunk(void* cookie, void* buffer, size_t size, / (format.format & media_raw_audio_format::B_AUDIO_SIZE_MASK); fMediaFile->TrackAt(0)->ReadFrames( buffer, &reqFrames, &player->fHeader, &player->fInfo); + + if (player->fFlushCount++ > 1000) { + player->fFlushCount = 0; + player->fStream->FlushRead(); + } } diff --git a/source/StreamPlayer.h b/source/StreamPlayer.h index f6db508..461db7a 100644 --- a/source/StreamPlayer.h +++ b/source/StreamPlayer.h @@ -76,6 +76,7 @@ class StreamPlayer : private BLocker { media_format fDecodedFormat; media_header fHeader; media_decode_info fInfo; + int32 fFlushCount; };