Skip to content

Commit

Permalink
Add PacingHandler
Browse files Browse the repository at this point in the history
MediaHandler that can be used to pace packet delivery

Resolves #1017
  • Loading branch information
Sean-Der committed Apr 17, 2024
1 parent fe7bec8 commit 1d553f6
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ set(LIBDATACHANNEL_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/src/rtp.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/plihandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/pacinghandler.cpp
)

set(LIBDATACHANNEL_HEADERS
Expand Down Expand Up @@ -123,6 +124,7 @@ set(LIBDATACHANNEL_HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpnackresponder.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/utils.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/plihandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/pacinghandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/version.h
)

Expand Down
51 changes: 51 additions & 0 deletions include/rtc/pacinghandler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright (c) 2020 Staz Modrzynski
* Copyright (c) 2020 Paul-Louis Ageneau
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#ifndef RTC_PACING_HANDLER_H
#define RTC_PACING_HANDLER_H

#if RTC_ENABLE_MEDIA

#include "mediahandler.hpp"
#include "utils.hpp"

#include <atomic>
#include <queue>

namespace rtc {

// Paced sending of RTP packets. Takes a stream of RTP packets that can an
// uneven bitrate. It then delivers these packets in a smoother manner by
// sending a fixed size of them on an interval
class RTC_CPP_EXPORT PacingHandler : public MediaHandler {
public:
PacingHandler(double mBytesPerSecond, std::chrono::milliseconds sendInterval);

void outgoing(message_vector &messages, const message_callback &send) override;

private:
std::atomic<bool> mHaveScheduled = false;

double mBytesPerSecond;
double mBudget;

std::chrono::milliseconds mSendInterval;
std::chrono::time_point<std::chrono::high_resolution_clock> mLastRun;

std::mutex mMutex;
std::queue<message_ptr> mRtpBuffer = {};

void schedule(const message_callback &send);
};

} // namespace rtc

#endif // RTC_ENABLE_MEDIA

#endif // RTC_PACING_HANDLER_H
1 change: 1 addition & 0 deletions include/rtc/rtc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "h265rtppacketizer.hpp"
#include "mediahandler.hpp"
#include "plihandler.hpp"
#include "pacinghandler.hpp"
#include "rtcpnackresponder.hpp"
#include "rtcpreceivingsession.hpp"
#include "rtcpsrreporter.hpp"
Expand Down
12 changes: 10 additions & 2 deletions src/impl/track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ void Track::incoming(message_ptr message) {

message_vector messages{std::move(message)};
if (auto handler = getMediaHandler())
handler->incomingChain(messages, [this](message_ptr m) { transportSend(m); });
handler->incomingChain(messages, [weak_this = weak_from_this()](message_ptr m) {
if (auto locked = weak_this.lock()) {
locked->transportSend(m);
}
});

for (auto &m : messages) {
// Tail drop if queue is full
Expand Down Expand Up @@ -175,7 +179,11 @@ bool Track::outgoing(message_ptr message) {

if (handler) {
message_vector messages{std::move(message)};
handler->outgoingChain(messages, [this](message_ptr m) { transportSend(m); });
handler->outgoingChain(messages, [weak_this = weak_from_this()](message_ptr m) {
if (auto locked = weak_this.lock()) {
locked->transportSend(m);
}
});
bool ret = false;
for (auto &m : messages)
ret = transportSend(std::move(m));
Expand Down
68 changes: 68 additions & 0 deletions src/pacinghandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#if RTC_ENABLE_MEDIA

#include <memory>

#include "pacinghandler.hpp"

#include "impl/internals.hpp"
#include "impl/threadpool.hpp"

namespace rtc {

PacingHandler::PacingHandler(double bytesPerSecond, std::chrono::milliseconds sendInterval)
: mBytesPerSecond(bytesPerSecond), mSendInterval(sendInterval){};

void PacingHandler::schedule(const message_callback &send) {
if (!mHaveScheduled.exchange(true)) {
return;
}

impl::ThreadPool::Instance().schedule(mSendInterval, [weak_this = weak_from_this(), send]() {
if (auto locked = std::dynamic_pointer_cast<PacingHandler>(weak_this.lock())) {
const std::lock_guard<std::mutex> lock(locked->mMutex);
locked->mHaveScheduled.store(false);

// Update the budget and cap it
auto newBudget = std::chrono::duration<double>(
std::chrono::high_resolution_clock::now() - locked->mLastRun) *
locked->mBytesPerSecond;
auto maxBudget =
std::chrono::duration<double>(locked->mSendInterval) * locked->mBytesPerSecond;
locked->mBudget = std::min(locked->mBudget + newBudget.count(), maxBudget.count());

// Send packets while there is budget, allow a single partial packet over budget
while (!locked->mRtpBuffer.empty() && locked->mBudget > 0) {
auto size = int(locked->mRtpBuffer.front()->size());
send(std::move(locked->mRtpBuffer.front()));
locked->mRtpBuffer.pop();
locked->mBudget -= size;
}

locked->mLastRun = std::chrono::high_resolution_clock::now();
}
});
}

void PacingHandler::outgoing(message_vector &messages, const message_callback &send) {

std::lock_guard<std::mutex> lock(mMutex);

while (messages.size() > 0) {
mRtpBuffer.push(std::move(messages.front()));
messages.erase(messages.begin());
}

schedule(send);
}

} // namespace rtc

#endif /* RTC_ENABLE_MEDIA */

0 comments on commit 1d553f6

Please sign in to comment.