Skip to content

Commit

Permalink
[ntcore] Various fixes and cleanups (wpilibsuite#4544)
Browse files Browse the repository at this point in the history
* NetworkTableInstance: set handle to 0 after destroy
* Fix multiple notifications of local values
* Detect mismatch between handles
* Server: fix setting min period when no topics
* Limit maximum number of subscribers/publishers/listeners
   This helps find resource leaks and prevents them from causing excessive
   slowdowns/crashes.  The limit on each is currently set to 512.
* Don't use std::swap in move operation
  • Loading branch information
PeterJohnson authored Nov 5, 2022
1 parent 837415a commit 4ba16db
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 40 deletions.
3 changes: 2 additions & 1 deletion ntcore/src/generate/java/NetworkTableInstance.java.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public final class NetworkTableInstance implements AutoCloseable {
if (m_owned && m_handle != 0) {
m_listeners.close();
NetworkTablesJNI.destroyInstance(m_handle);
m_handle = 0;
}
}

Expand Down Expand Up @@ -986,5 +987,5 @@ public final class NetworkTableInstance implements AutoCloseable {
}

private boolean m_owned;
private final int m_handle;
private int m_handle;
}
86 changes: 82 additions & 4 deletions ntcore/src/main/native/cpp/LocalStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@

using namespace nt;

// maximum number of local publishers / subscribers to any given topic
static constexpr size_t kMaxPublishers = 512;
static constexpr size_t kMaxSubscribers = 512;
static constexpr size_t kMaxMultiSubscribers = 512;
static constexpr size_t kMaxListeners = 512;

namespace {

// Utility wrapper for making a set-like vector
Expand Down Expand Up @@ -495,15 +501,19 @@ void LSImpl::NotifyValue(TopicData* topic, unsigned int eventFlags) {
if (subscriber->active) {
subscriber->pollStorage.emplace_back(topic->lastValue);
subscriber->handle.Set();
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
if (!subscriber->valueListeners.empty()) {
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
}
}
}

for (auto&& subscriber : topic->multiSubscribers) {
subscriber->handle.Set();
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
if (!subscriber->valueListeners.empty()) {
m_listenerStorage.Notify(subscriber->valueListeners, eventFlags,
topic->handle, 0, topic->lastValue);
}
}
}

Expand Down Expand Up @@ -889,6 +899,12 @@ std::unique_ptr<MultiSubscriberData> LSImpl::RemoveMultiSubscriber(

void LSImpl::AddListenerImpl(NT_Listener listenerHandle, TopicData* topic,
unsigned int eventMask) {
if (topic->localSubscribers.size() >= kMaxSubscribers) {
ERROR(
"reached maximum number of subscribers to '{}', ignoring listener add",
topic->name);
return;
}
// subscribe to make sure topic updates are received
PubSubConfig config;
config.topicsOnly = (eventMask & NT_EVENT_VALUE_ALL) == 0;
Expand All @@ -906,6 +922,12 @@ void LSImpl::AddListenerImpl(NT_Listener listenerHandle,
auto topic = subscriber->topic;

if ((eventMask & NT_EVENT_TOPIC) != 0) {
if (topic->listeners.size() >= kMaxListeners) {
ERROR("reached maximum number of listeners to '{}', not adding listener",
topic->name);
return;
}

m_listenerStorage.Activate(
listenerHandle, eventMask & (NT_EVENT_TOPIC | NT_EVENT_IMMEDIATE));

Expand All @@ -922,6 +944,11 @@ void LSImpl::AddListenerImpl(NT_Listener listenerHandle,
}

if ((eventMask & NT_EVENT_VALUE_ALL) != 0) {
if (subscriber->valueListeners.size() >= kMaxListeners) {
ERROR("reached maximum number of listeners to '{}', not adding listener",
topic->name);
return;
}
m_listenerStorage.Activate(
listenerHandle, eventMask & (NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE),
[subentryHandle](unsigned int mask, Event* event) {
Expand Down Expand Up @@ -968,6 +995,11 @@ void LSImpl::AddListenerImpl(NT_Listener listenerHandle,
}

if ((eventMask & NT_EVENT_TOPIC) != 0) {
if (m_topicPrefixListeners.size() >= kMaxListeners) {
ERROR("reached maximum number of listeners, not adding listener");
return;
}

m_listenerStorage.Activate(
listenerHandle, eventMask & (NT_EVENT_TOPIC | NT_EVENT_IMMEDIATE));

Expand All @@ -989,6 +1021,11 @@ void LSImpl::AddListenerImpl(NT_Listener listenerHandle,
}

if ((eventMask & NT_EVENT_VALUE_ALL) != 0) {
if (subscriber->valueListeners.size() >= kMaxListeners) {
ERROR("reached maximum number of listeners, not adding listener");
return;
}

m_listenerStorage.Activate(
listenerHandle, eventMask & (NT_EVENT_VALUE_ALL | NT_EVENT_IMMEDIATE),
[subentryHandle = subscriber->handle.GetHandle()](unsigned int mask,
Expand Down Expand Up @@ -1018,6 +1055,10 @@ void LSImpl::AddListenerImpl(NT_Listener listenerHandle,
void LSImpl::AddListener(NT_Listener listenerHandle,
std::span<const std::string_view> prefixes,
unsigned int eventMask) {
if (m_multiSubscribers.size() >= kMaxMultiSubscribers) {
ERROR("reached maximum number of multi-subscribers, not adding listener");
return;
}
// subscribe to make sure topic updates are received
PubSubOptions options;
options.topicsOnly = (eventMask & NT_EVENT_VALUE_ALL) == 0;
Expand Down Expand Up @@ -1548,6 +1589,13 @@ NT_Subscriber LocalStorage::Subscribe(NT_Topic topicHandle, NT_Type type,
return 0;
}

if (topic->localSubscribers.size() >= kMaxSubscribers) {
WPI_ERROR(m_impl->m_logger,
"reached maximum number of subscribers to '{}', not subscribing",
topic->name);
return 0;
}

// Create subscriber
return m_impl->AddLocalSubscriber(topic, PubSubConfig{type, typeStr, options})
->handle;
Expand All @@ -1562,6 +1610,13 @@ NT_MultiSubscriber LocalStorage::SubscribeMultiple(
std::span<const std::string_view> prefixes,
std::span<const PubSubOption> options) {
std::scoped_lock lock{m_mutex};

if (m_impl->m_multiSubscribers.size() >= kMaxMultiSubscribers) {
WPI_ERROR(m_impl->m_logger,
"reached maximum number of multi-subscribers, not subscribing");
return 0;
}

PubSubOptions opts{options};
opts.prefixMatch = true;
return m_impl->AddMultiSubscriber(prefixes, opts)->handle;
Expand Down Expand Up @@ -1594,6 +1649,13 @@ NT_Publisher LocalStorage::Publish(NT_Topic topicHandle, NT_Type type,
return 0;
}

if (topic->localPublishers.size() >= kMaxPublishers) {
WPI_ERROR(m_impl->m_logger,
"reached maximum number of publishers to '{}', not publishing",
topic->name);
return 0;
}

return m_impl
->AddLocalPublisher(topic, properties,
PubSubConfig{type, typeStr, options})
Expand Down Expand Up @@ -1627,6 +1689,14 @@ NT_Entry LocalStorage::GetEntry(NT_Topic topicHandle, NT_Type type,
return 0;
}

if (topic->localSubscribers.size() >= kMaxSubscribers) {
WPI_ERROR(
m_impl->m_logger,
"reached maximum number of subscribers to '{}', not creating entry",
topic->name);
return 0;
}

// Create subscriber
auto subscriber =
m_impl->AddLocalSubscriber(topic, PubSubConfig{type, typeStr, options});
Expand Down Expand Up @@ -2010,6 +2080,14 @@ NT_Entry LocalStorage::GetEntry(std::string_view name) {
auto* topic = m_impl->GetOrCreateTopic(name);

if (topic->entry == 0) {
if (topic->localSubscribers.size() >= kMaxSubscribers) {
WPI_ERROR(
m_impl->m_logger,
"reached maximum number of subscribers to '{}', not creating entry",
topic->name);
return 0;
}

// Create subscriber
auto* subscriber = m_impl->AddLocalSubscriber(topic, {});

Expand Down
28 changes: 13 additions & 15 deletions ntcore/src/main/native/cpp/net/ServerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,20 @@ void ClientData4Base::ClientSubscribe(int64_t subuid,
sub->periodMs = kMinPeriodMs;
}

// update periodic sender (if not local)
if (!m_local) {
if (m_periodMs == UINT32_MAX) {
m_periodMs = sub->periodMs;
} else {
m_periodMs = std::gcd(m_periodMs, sub->periodMs);
}
if (m_periodMs < kMinPeriodMs) {
m_periodMs = kMinPeriodMs;
}
m_setPeriodic(m_periodMs);
}

// see if this immediately subscribes to any topics
bool updatedPeriodic = false;
for (auto&& topic : m_server.m_topics) {
bool removed = false;
if (replace) {
Expand All @@ -647,14 +659,6 @@ void ClientData4Base::ClientSubscribe(int64_t subuid,
m_server.UpdateMetaTopicSub(topic.get());
}

if (added || removed) {
// update periodic sender (if not local)
if (!m_local) {
m_periodMs = std::gcd(m_periodMs, sub->periodMs);
updatedPeriodic = true;
}
}

if (!wasSubscribed && added && !removed) {
// announce topic to client
DEBUG4("client {}: announce {}", m_id, topic->name);
Expand All @@ -667,12 +671,6 @@ void ClientData4Base::ClientSubscribe(int64_t subuid,
}
}
}
if (updatedPeriodic) {
if (m_periodMs < kMinPeriodMs) {
m_periodMs = kMinPeriodMs;
}
m_setPeriodic(m_periodMs);
}

// update meta data
UpdateMetaClientSub();
Expand Down
35 changes: 35 additions & 0 deletions ntcore/src/main/native/cpp/networktables/NetworkTableInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,44 @@ void NetworkTableInstance::SetServer(std::span<const std::string_view> servers,
SetServer(serversArr);
}

NT_Listener NetworkTableInstance::AddListener(Topic topic,
unsigned int eventMask,
ListenerCallback listener) {
if (::nt::GetInstanceFromHandle(topic.GetHandle()) != m_handle) {
fmt::print(stderr, "AddListener: topic is not from this instance\n");
return 0;
}
return ::nt::AddListener(topic.GetHandle(), eventMask, std::move(listener));
}

NT_Listener NetworkTableInstance::AddListener(Subscriber& subscriber,
unsigned int eventMask,
ListenerCallback listener) {
if (::nt::GetInstanceFromHandle(subscriber.GetHandle()) != m_handle) {
fmt::print(stderr, "AddListener: subscriber is not from this instance\n");
return 0;
}
return ::nt::AddListener(subscriber.GetHandle(), eventMask,
std::move(listener));
}

NT_Listener NetworkTableInstance::AddListener(NetworkTableEntry& entry,
int eventMask,
ListenerCallback listener) {
if (::nt::GetInstanceFromHandle(entry.GetHandle()) != m_handle) {
fmt::print(stderr, "AddListener: entry is not from this instance\n");
return 0;
}
return ::nt::AddListener(entry.GetHandle(), eventMask, std::move(listener));
}

NT_Listener NetworkTableInstance::AddListener(MultiSubscriber& subscriber,
int eventMask,
ListenerCallback listener) {
if (::nt::GetInstanceFromHandle(subscriber.GetHandle()) != m_handle) {
fmt::print(stderr, "AddListener: subscriber is not from this instance\n");
return 0;
}
return ::nt::AddListener(subscriber.GetHandle(), eventMask,
std::move(listener));
}
8 changes: 8 additions & 0 deletions ntcore/src/main/native/cpp/ntcore_cpp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,14 @@ NT_Listener AddPolledListener(NT_ListenerPoller poller,
NT_Listener AddPolledListener(NT_ListenerPoller poller, NT_Handle handle,
unsigned int mask) {
if (auto ii = InstanceImpl::GetTyped(poller, Handle::kListenerPoller)) {
if (Handle{handle}.GetInst() != Handle{poller}.GetInst()) {
WPI_ERROR(
ii->logger,
"AddPolledListener(): trying to listen to handle {} (instance {}) "
"with poller {} (instance {}), ignored due to different instance",
handle, Handle{handle}.GetInst(), poller, Handle{poller}.GetInst());
return {};
}
auto listener = ii->listenerStorage.AddListener(poller);
DoAddListener(*ii, listener, handle, mask);
return listener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class NetworkTableInstance final {
*
* @param inst Instance
*/
static void Destroy(NetworkTableInstance inst);
static void Destroy(NetworkTableInstance& inst);

/**
* Gets the native handle for the entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ inline NetworkTableInstance NetworkTableInstance::Create() {
return NetworkTableInstance{CreateInstance()};
}

inline void NetworkTableInstance::Destroy(NetworkTableInstance inst) {
inline void NetworkTableInstance::Destroy(NetworkTableInstance& inst) {
if (inst.m_handle != 0) {
DestroyInstance(inst.m_handle);
inst.m_handle = 0;
}
}

Expand Down Expand Up @@ -99,22 +100,6 @@ inline NT_Listener NetworkTableInstance::AddConnectionListener(
std::move(callback));
}

inline NT_Listener NetworkTableInstance::AddListener(
Topic topic, unsigned int eventMask, ListenerCallback listener) {
return ::nt::AddListener(topic.GetHandle(), eventMask, std::move(listener));
}

inline NT_Listener NetworkTableInstance::AddListener(
Subscriber& subscriber, unsigned int eventMask, ListenerCallback listener) {
return ::nt::AddListener(subscriber.GetHandle(), eventMask,
std::move(listener));
}

inline NT_Listener NetworkTableInstance::AddListener(
NetworkTableEntry& entry, int eventMask, ListenerCallback listener) {
return ::nt::AddListener(entry.GetHandle(), eventMask, std::move(listener));
}

inline NT_Listener NetworkTableInstance::AddListener(
std::span<const std::string_view> prefixes, int eventMask,
ListenerCallback listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ inline NetworkTableListener::NetworkTableListener(NetworkTableListener&& rhs)

inline NetworkTableListener& NetworkTableListener::operator=(
NetworkTableListener&& rhs) {
std::swap(m_handle, rhs.m_handle);
if (m_handle != 0) {
nt::RemoveListener(m_handle);
}
m_handle = rhs.m_handle;
rhs.m_handle = 0;
return *this;
}

Expand Down Expand Up @@ -102,7 +106,11 @@ inline NetworkTableListenerPoller::NetworkTableListenerPoller(

inline NetworkTableListenerPoller& NetworkTableListenerPoller::operator=(
NetworkTableListenerPoller&& rhs) {
std::swap(m_handle, rhs.m_handle);
if (m_handle != 0) {
nt::DestroyListenerPoller(m_handle);
}
m_handle = rhs.m_handle;
rhs.m_handle = 0;
return *this;
}

Expand Down
Loading

0 comments on commit 4ba16db

Please sign in to comment.