Skip to content

Commit

Permalink
Update UDP output to support a "blocking" mode to slow down the outpu…
Browse files Browse the repository at this point in the history
…t a bit. Make that the default as very few shows would be large enough to need the non-blocking mode.
  • Loading branch information
dkulp committed Dec 5, 2024
1 parent 6694544 commit 622e08d
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 31 deletions.
72 changes: 48 additions & 24 deletions src/channeloutput/UDPOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ UDPOutput::UDPOutput(unsigned int startChannel, unsigned int channelCount) :
doneWorkCount(0),
numWorkThreads(0),
runWorkThreads(true),
useThreadedOutput(true) {
useThreadedOutput(true),
blockingOutput(false) {
INSTANCE = this;
}
UDPOutput::~UDPOutput() {
Expand Down Expand Up @@ -284,9 +285,10 @@ int UDPOutput::Init(Json::Value config) {
break;
}
}

if (config.isMember("threaded")) {
useThreadedOutput = config["threaded"].asInt() ? true : false;
int style = config["threaded"].asInt();
useThreadedOutput = style == 1 || style == 3;
blockingOutput = style == 0 || style == 1;
}
if (config.isMember("interface")) {
outInterface = config["interface"].asString();
Expand Down Expand Up @@ -432,7 +434,6 @@ void UDPOutput::GetRequiredChannelRanges(const std::function<void(int, int)>& ad
void UDPOutput::addOutput(UDPOutputData* out) {
outputs.push_back(out);
}

int UDPOutput::SendMessages(unsigned int socketKey, SendSocketInfo* socketInfo, std::vector<struct mmsghdr>& sendmsgs) {
errno = 0;
struct mmsghdr* msgs = &sendmsgs[0];
Expand All @@ -444,30 +445,43 @@ int UDPOutput::SendMessages(unsigned int socketKey, SendSocketInfo* socketInfo,
int newSockKey = socketKey;
int sendSocket = socketInfo->sockets[socketInfo->curSocket];
errno = 0;
int oc = sendmmsg(sendSocket, msgs, msgCount, MSG_DONTWAIT);
// uint64_t st = GetTimeMicros();

int outputCount = 0;
if (oc > 0) {
outputCount = oc;
}
if (outputCount != msgCount) {
// in many cases, a simple thread yield will allow the network stack
// to flush some data and free up space, give that a chance first
std::this_thread::yield();
oc = sendmmsg(sendSocket, &msgs[outputCount], msgCount - outputCount, MSG_DONTWAIT);
if (blockingOutput) {
for (int x = 0; x < msgCount; x++) {
ssize_t s = sendmsg(sendSocket, &msgs[x].msg_hdr, 0);
if (s != -1) {
++outputCount;
} else {
// didn't send, we'll yield and re-send
--x;
std::this_thread::yield();
}
}
} else {
int oc = sendmmsg(sendSocket, msgs, msgCount, MSG_DONTWAIT);
if (oc > 0) {
outputCount += oc;
}
if (outputCount != msgCount) {
// in many cases, a simple thread yield will allow the network stack
// to flush some data and free up space, give that a chance first
std::this_thread::sleep_for(std::chrono::microseconds(100));
oc = sendmmsg(sendSocket, &msgs[outputCount], msgCount - outputCount, MSG_DONTWAIT);
while (oc > 0) {
outputCount += oc;
std::this_thread::sleep_for(std::chrono::microseconds(100));
oc = sendmmsg(sendSocket, &msgs[outputCount], msgCount - outputCount, MSG_DONTWAIT);
}
}
}
// uint64_t ed = GetTimeMicros();
// uint64_t tm = ed - st;
// printf("MSG: %d/%d %d \n", outputCount, msgCount, (int)tm);

int errCount = 0;
while (outputCount != msgCount) {
if (errno != 0) {
LogErr(VB_CHANNELOUT, "sendmmsg() failed for UDP output (key: %X Socket: %d output count: %d/%d) with error: %d %s\n",
socketKey, sendSocket,
outputCount, msgCount,
errno,
strerror(errno));
}

if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (socketKey != BROADCAST_MESSAGES_KEY) {
++socketInfo->curSocket;
Expand All @@ -490,12 +504,19 @@ int UDPOutput::SendMessages(unsigned int socketKey, SendSocketInfo* socketInfo,
}
++errCount;
if (errCount >= 10) {
LogErr(VB_CHANNELOUT, "sendmmsg() failed for UDP output (key: %X Socket: %d output count: %d/%d) with error: %d %s\n",
socketKey, sendSocket,
outputCount, msgCount,
errno,
strerror(errno));
return outputCount;
}
errno = 0;
int oc = sendmmsg(sendSocket, &msgs[outputCount], msgCount - outputCount, MSG_DONTWAIT);
if (oc > 0) {
while (oc > 0) {
outputCount += oc;
std::this_thread::sleep_for(std::chrono::microseconds(100));
oc = sendmmsg(sendSocket, &msgs[outputCount], msgCount - outputCount, MSG_DONTWAIT);
}
}
return outputCount;
Expand Down Expand Up @@ -703,6 +724,9 @@ void UDPOutput::PingControllers(bool failedOnly) {
}
void UDPOutput::DumpConfig() {
ChannelOutput::DumpConfig();
LogDebug(VB_CHANNELOUT, " Interface : %s\n", outInterface.c_str());
LogDebug(VB_CHANNELOUT, " Threaded : %d\n", useThreadedOutput);
LogDebug(VB_CHANNELOUT, " Blocking : %d\n", blockingOutput);
for (auto u : outputs) {
u->DumpConfig();
}
Expand Down Expand Up @@ -756,8 +780,8 @@ int UDPOutput::createSocket(int port, bool broadCast) {
close(sendSocket);
return -1;
}
// make sure the send buffer is actually set to a reasonable size
int bufSize = 512 * 1024;
// make sure the send buffer is actually set to a reasonable size for non-blocking mode
int bufSize = 1024 * blockingOutput ? 4 : 512;
setsockopt(sendSocket, SOL_SOCKET, SO_SNDBUF, &bufSize, sizeof(bufSize));
// these sockets are for sending only, don't need a large receive buffer so
// free some memory by setting to just a single page
Expand Down
1 change: 1 addition & 0 deletions src/channeloutput/UDPOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,5 @@ class UDPOutput : public ChannelOutput {
std::atomic_int numWorkThreads;
volatile bool runWorkThreads;
bool useThreadedOutput;
bool blockingOutput;
};
6 changes: 5 additions & 1 deletion src/util/GPIOUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ static const std::set<std::string> PLATFORM_IGNORES{
"gpio-brcmstb@107d508520",
"gpio-brcmstb@107d517c00",
"gpio-brcmstb@107d517c20",
"pinctrl-rp1" // Pi5's external GPIO chip
"pinctrl-rp1", // Pi5's external GPIO chip
"tps65219-gpio", // AM62x
"4201000.gpio",
"600000.gpio",
"601000.gpio"
};
// No platform information on how to control pins
static std::string PROCESS_NAME = "FPPD";
Expand Down
9 changes: 7 additions & 2 deletions www/co-universes.php
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,14 @@ function PopulateInterfaces()

</div>
<div class="col-md-auto form-inline" <? if ($uiLevel < 1) { ?> style="display:none;" <? } ?>>
<div><i class="fas fa-fw fa-graduation-cap ui-level-1"></i><b> Multi-Threaded:</b></div>
<div><i class="fas fa-fw fa-graduation-cap ui-level-1"></i><b> Sending:</b></div>
<div>
<input id="E131ThreadedOutput" type="checkbox" checked>
<select id="E131ThreadedOutput">
<option value="0">Single-Threaded Blocking</option>
<option value="1" selected>Multi-Threaded Blocking</option>
<option value="2">Single-Threaded Non-Blocking</option>
<option value="3">Multi-Threaded Non-Blocking</option>
</select>
</div>
</div>
<div class="col-md-auto form-inline">
Expand Down
8 changes: 4 additions & 4 deletions www/js/fpp.js
Original file line number Diff line number Diff line change
Expand Up @@ -3454,7 +3454,7 @@ function populateUniverseData (data, reload, input) {
$('#selE131interfaces').val(channelData.interface).prop('selected', true);
}
if (channelData.hasOwnProperty('threaded')) {
$('#E131ThreadedOutput').prop('checked', channelData.threaded);
$('#E131ThreadedOutput').val(channelData.threaded).prop('selected', true);
}
}
UniverseCount = channelData.universes.length;
Expand Down Expand Up @@ -3972,9 +3972,9 @@ function postUniverseJSON (input) {
if (!input) {
// output only properties
output.interface = document.getElementById('selE131interfaces').value;
output.threaded = document.getElementById('E131ThreadedOutput').checked
? 1
: 0;
output.threaded = parseInt(
document.getElementById('E131ThreadedOutput').value
);
} else {
// input only properties
output.timeout = parseInt(document.getElementById('bridgeTimeoutMS').value);
Expand Down

0 comments on commit 622e08d

Please sign in to comment.