Skip to content

Commit

Permalink
[ntcore] Various NT4 fixes (wpilibsuite#4474)
Browse files Browse the repository at this point in the history
* TopicListener: Fix Add() return values
* Update PubSubOption poll storage documentation
* Update NetworkTableEntry::GetValue() doc
* Add documentation regarding asynchronous callbacks
* Unpublish entry: set publisher to nullptr
* Implement ValueListenerPoller default constructor
* Remove SetNetworkIdentity, make parameter to StartClient
* URI-escape client ID, improve error message
* Add connected message with client id; also improve disconnected message a bit
* Handle SetServers either before or after StartClient
* Fix client use-after-free; also delay reconnect after disconnect to rate limit
* Don't re-announce to already subscribed client; we especially don't want to send the last value again
* Always accept in-order sets, only use timestamp for tiebreak
* Fix LocalStorage::StartNetwork race
* Remove unused/unimplemented function

Also:
* [glass] Remove debug print
* [glass] Fix mpack string decoding
* [cameraserver] Fix up startclient
  • Loading branch information
PeterJohnson authored Oct 22, 2022
1 parent 4a401b8 commit 10ed4b3
Show file tree
Hide file tree
Showing 47 changed files with 253 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public static void main(String... args) {
} else {
System.out.println("Setting up NetworkTables client for team " + team);
ntinst.setServerTeam(team);
ntinst.startClient4();
ntinst.startClient4("multicameraserver");
}

// start cameras
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ int main(int argc, char* argv[]) {
ntinst.StartServer();
} else {
fmt::print("Setting up NetworkTables client for team {}\n", team);
ntinst.StartClient4();
ntinst.StartClient4("multicameraserver");
ntinst.SetServerTeam(team);
}

Expand Down
2 changes: 0 additions & 2 deletions glass/src/libnt/native/cpp/NetworkTables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ static void UpdateMsgpackValueSource(NetworkTablesModel::ValueSource* out,
case mpack::mpack_type_str: {
std::string str;
mpack_read_str(&r, &tag, &str);
mpack_done_str(&r);
out->UpdateFromValue(nt::Value::MakeString(std::move(str), time), name,
"");
break;
Expand Down Expand Up @@ -461,7 +460,6 @@ void NetworkTablesModel::Update() {
entry->info.type_str);
if (wpi::starts_with(entry->info.name, '$') && entry->value.IsRaw() &&
entry->info.type_str == "msgpack") {
fmt::print(stderr, "Updating meta-topic {}\n", entry->info.name);
// meta topic handling
if (entry->info.name == "$clients") {
UpdateClients(entry->value.GetRaw());
Expand Down
5 changes: 2 additions & 3 deletions glass/src/libnt/native/cpp/NetworkTablesSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ void NetworkTablesSettings::Thread::Main() {
if (m_mode == 1 || m_mode == 2) {
std::string_view serverTeam{m_serverTeam};
std::optional<unsigned int> team;
nt::SetNetworkIdentity(m_inst, m_clientName);
if (m_mode == 1) {
nt::StartClient4(m_inst);
nt::StartClient4(m_inst, m_clientName);
} else if (m_mode == 2) {
nt::StartClient3(m_inst);
nt::StartClient3(m_inst, m_clientName);
}
if (!wpi::contains(serverTeam, '.') &&
(team = wpi::parse_integer<unsigned int>(serverTeam, 10))) {
Expand Down
4 changes: 1 addition & 3 deletions ntcore/src/dev/native/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ void bench() {
// set up instances
auto client = nt::CreateInstance();
auto server = nt::CreateInstance();
nt::SetNetworkIdentity(server, "server");
nt::SetNetworkIdentity(client, "client");

// connect client and server
nt::StartServer(server, "bench.json", "127.0.0.1", 0, 10000);
nt::StartClient4(client);
nt::StartClient4(client, "client");
nt::SetServer(client, "127.0.0.1", 10000);

using namespace std::chrono_literals;
Expand Down
30 changes: 14 additions & 16 deletions ntcore/src/generate/java/NetworkTableInstance.java.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -450,16 +450,6 @@ public final class NetworkTableInstance implements AutoCloseable {
* Client/Server Functions
*/

/**
* Set the network identity of this node. This is the name used during the initial connection
* handshake, and is visible through ConnectionInfo on the remote node.
*
* @param name identity to advertise
*/
public void setNetworkIdentity(String name) {
NetworkTablesJNI.setNetworkIdentity(m_handle, name);
}

/**
* Get the current network mode.
*
Expand Down Expand Up @@ -541,14 +531,22 @@ public final class NetworkTableInstance implements AutoCloseable {
NetworkTablesJNI.stopServer(m_handle);
}

/** Starts a NT3 client. Use SetServer or SetServerTeam to set the server name and port. */
public void startClient3() {
NetworkTablesJNI.startClient3(m_handle);
/**
* Starts a NT3 client. Use SetServer or SetServerTeam to set the server name and port.
*
* @param identity network identity to advertise (cannot be empty string)
*/
public void startClient3(String identity) {
NetworkTablesJNI.startClient3(m_handle, identity);
}

/** Starts a NT4 client. Use SetServer or SetServerTeam to set the server name and port. */
public void startClient4() {
NetworkTablesJNI.startClient4(m_handle);
/**
* Starts a NT4 client. Use SetServer or SetServerTeam to set the server name and port.
*
* @param identity network identity to advertise (cannot be empty string)
*/
public void startClient4(String identity) {
NetworkTablesJNI.startClient4(m_handle, identity);
}

/** Stops the client if it is running. */
Expand Down
6 changes: 2 additions & 4 deletions ntcore/src/generate/java/NetworkTablesJNI.java.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,6 @@ public final class NetworkTablesJNI {

public static native void removeConnectionListener(int connListener);

public static native void setNetworkIdentity(int inst, String name);

public static native int getNetworkMode(int inst);

public static native void startLocal(int inst);
Expand All @@ -247,9 +245,9 @@ public final class NetworkTablesJNI {

public static native void stopServer(int inst);

public static native void startClient3(int inst);
public static native void startClient3(int inst, String identity);

public static native void startClient4(int inst);
public static native void startClient4(int inst, String identity);

public static native void stopClient(int inst);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
public final class ConnectionInfo {
/**
* The remote identifier (as set on the remote node by {@link
* NetworkTableInstance#setNetworkIdentity(String)}).
* NetworkTableInstance#startClient4(String)}).
*/
public final String remote_id;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/** Connection listener. This calls back to a callback function when a connection change occurs. */
/**
* Connection listener. This calls back to a callback function when a connection change occurs. The
* callback function is called asynchronously on a separate thread, so it's important to use
* synchronization or atomics when accessing any shared state from the callback function.
*/
public final class ConnectionListener implements AutoCloseable {
/**
* Create a listener for connection changes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public static PubSubOption keepDuplicates(boolean enabled) {

/**
* Polling storage for subscription. Specifies the maximum number of updates NetworkTables should
* store between calls to the subscriber's poll() function. Defaults to 1.
* store between calls to the subscriber's poll() function. Defaults to 1 if sendAll is false, 20
* if sendAll is true.
*
* @param depth number of entries to save for polling.
* @return option
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

/**
* Topic change listener. This calls back to a callback function when a topic change matching the
* specified mask occurs.
* specified mask occurs. The callback function is called asynchronously on a separate thread, so
* it's important to use synchronization or atomics when accessing any shared state from the
* callback function.
*/
public final class TopicListener implements AutoCloseable {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

/**
* Value change listener. This calls back to a callback function when a value change matching the
* specified mask occurs.
* specified mask occurs. The callback function is called asynchronously on a separate thread, so
* it's important to use synchronization or atomics when accessing any shared state from the
* callback function.
*/
public final class ValueListener implements AutoCloseable {
/**
Expand Down
22 changes: 16 additions & 6 deletions ntcore/src/main/native/cpp/InstanceImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,29 @@ void InstanceImpl::StopServer() {
networkMode = NT_NET_MODE_NONE;
}

void InstanceImpl::StartClient3() {
void InstanceImpl::StartClient3(std::string_view identity) {
std::scoped_lock lock{m_mutex};
if (networkMode != NT_NET_MODE_NONE) {
return;
}
m_networkClient = std::make_shared<NetworkClient3>(
m_inst, m_identity, localStorage, connectionList, logger);
m_inst, identity, localStorage, connectionList, logger);
if (!m_servers.empty()) {
m_networkClient->SetServers(m_servers);
}
networkMode = NT_NET_MODE_CLIENT3;
}

void InstanceImpl::StartClient4() {
void InstanceImpl::StartClient4(std::string_view identity) {
std::scoped_lock lock{m_mutex};
if (networkMode != NT_NET_MODE_NONE) {
return;
}
m_networkClient = std::make_shared<NetworkClient>(
m_inst, m_identity, localStorage, connectionList, logger);
m_inst, identity, localStorage, connectionList, logger);
if (!m_servers.empty()) {
m_networkClient->SetServers(m_servers);
}
networkMode = NT_NET_MODE_CLIENT4;
}

Expand All @@ -149,9 +155,13 @@ void InstanceImpl::StopClient() {
networkMode = NT_NET_MODE_NONE;
}

void InstanceImpl::SetIdentity(std::string_view identity) {
void InstanceImpl::SetServers(
std::span<const std::pair<std::string, unsigned int>> servers) {
std::scoped_lock lock{m_mutex};
m_identity = identity;
m_servers = {servers.begin(), servers.end()};
if (m_networkClient) {
m_networkClient->SetServers(servers);
}
}

std::shared_ptr<NetworkServer> InstanceImpl::GetServer() {
Expand Down
10 changes: 6 additions & 4 deletions ntcore/src/main/native/cpp/InstanceImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include <wpi/mutex.h>
Expand Down Expand Up @@ -45,10 +46,11 @@ class InstanceImpl {
std::string_view listenAddress, unsigned int port3,
unsigned int port4);
void StopServer();
void StartClient3();
void StartClient4();
void StartClient3(std::string_view identity);
void StartClient4(std::string_view identity);
void StopClient();
void SetIdentity(std::string_view identity);
void SetServers(
std::span<const std::pair<std::string, unsigned int>> servers);

std::shared_ptr<NetworkServer> GetServer();
std::shared_ptr<INetworkClient> GetClient();
Expand All @@ -68,9 +70,9 @@ class InstanceImpl {
static wpi::mutex s_mutex;

wpi::mutex m_mutex;
std::string m_identity;
std::shared_ptr<NetworkServer> m_networkServer;
std::shared_ptr<INetworkClient> m_networkClient;
std::vector<std::pair<std::string, unsigned int>> m_servers;
int m_inst;
};

Expand Down
23 changes: 13 additions & 10 deletions ntcore/src/main/native/cpp/LocalStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct TopicData {
std::string name;

Value lastValue; // also stores timestamp
bool lastValueNetwork{false};
NT_Type type{NT_UNASSIGNED};
std::string typeStr;
unsigned int flags{0}; // for NT3 APIs
Expand Down Expand Up @@ -646,6 +647,7 @@ void LSImpl::CheckReset(TopicData* topic) {
return;
}
topic->lastValue = {};
topic->lastValueNetwork = false;
topic->type = NT_UNASSIGNED;
topic->typeStr.clear();
topic->flags = 0;
Expand All @@ -658,10 +660,13 @@ bool LSImpl::SetValue(TopicData* topic, const Value& value,
if (topic->type != NT_UNASSIGNED && topic->type != value.type()) {
return false;
}
if (!topic->lastValue || value.time() >= topic->lastValue.time()) {
bool isNetwork = (eventFlags & NT_VALUE_NOTIFY_LOCAL) == 0;
if (!topic->lastValue || topic->lastValueNetwork == isNetwork ||
value.time() >= topic->lastValue.time()) {
// TODO: notify option even if older value
topic->type = value.type();
topic->lastValue = value;
topic->lastValueNetwork = isNetwork;
NotifyValue(topic, eventFlags);
}
if (topic->datalogType == value.type()) {
Expand Down Expand Up @@ -999,6 +1004,7 @@ std::unique_ptr<PublisherData> LSImpl::RemoveLocalPublisher(

SubscriberData* LSImpl::AddLocalSubscriber(TopicData* topic,
const PubSubConfig& config) {
DEBUG4("AddLocalSubscriber({})", topic->name);
auto subscriber = m_subscribers.Add(m_inst, topic, config);
topic->localSubscribers.Add(subscriber);
// set subscriber to active if the type matches
Expand All @@ -1011,6 +1017,7 @@ SubscriberData* LSImpl::AddLocalSubscriber(TopicData* topic,
topic->name, config.typeStr, topic->typeStr);
}
if (m_network) {
DEBUG4("-> NetworkSubscribe({})", topic->name);
m_network->Subscribe(subscriber->handle, {{topic->name}}, config);
}
return subscriber;
Expand Down Expand Up @@ -1578,7 +1585,8 @@ void LocalStorage::NetworkSetValue(NT_Topic topicHandle, const Value& value) {
}
}

void LocalStorage::StartNetwork(net::NetworkStartupInterface& startup) {
void LocalStorage::StartNetwork(net::NetworkStartupInterface& startup,
net::NetworkInterface* network) {
std::scoped_lock lock{m_mutex};
// publish all active publishers to the network and send last values
// only send value once per topic
Expand All @@ -1596,19 +1604,13 @@ void LocalStorage::StartNetwork(net::NetworkStartupInterface& startup) {
}
}
for (auto&& subscriber : m_impl->m_subscribers) {
if (subscriber->active) {
startup.Subscribe(subscriber->handle, {{subscriber->topic->name}},
subscriber->config);
}
startup.Subscribe(subscriber->handle, {{subscriber->topic->name}},
subscriber->config);
}
for (auto&& subscriber : m_impl->m_multiSubscribers) {
startup.Subscribe(subscriber->handle, subscriber->prefixes,
subscriber->options);
}
}

void LocalStorage::SetNetwork(net::NetworkInterface* network) {
std::scoped_lock lock{m_mutex};
m_impl->m_network = network;
}

Expand Down Expand Up @@ -1914,6 +1916,7 @@ void LocalStorage::Unpublish(NT_Handle pubentryHandle) {
} else if (auto entry = m_impl->m_entries.Get(pubentryHandle)) {
if (entry->publisher) {
m_impl->RemoveLocalPublisher(entry->publisher->handle);
entry->publisher = nullptr;
}
} else {
// TODO: report warning
Expand Down
4 changes: 2 additions & 2 deletions ntcore/src/main/native/cpp/LocalStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class LocalStorage final : public net::ILocalStorage {
bool ack) final;
void NetworkSetValue(NT_Topic topicHandle, const Value& value) final;

void StartNetwork(net::NetworkStartupInterface& startup) final;
void SetNetwork(net::NetworkInterface* network) final;
void StartNetwork(net::NetworkStartupInterface& startup,
net::NetworkInterface* network) final;
void ClearNetwork() final;

// User functions. These are the actual implementations of the corresponding
Expand Down
Loading

0 comments on commit 10ed4b3

Please sign in to comment.