Skip to content

Commit

Permalink
Added NT4 ping connection checking
Browse files Browse the repository at this point in the history
  • Loading branch information
Gold872 committed Oct 13, 2023
1 parent 6afee51 commit 780e526
Showing 1 changed file with 85 additions and 20 deletions.
105 changes: 85 additions & 20 deletions lib/services/nt4.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import 'package:msgpack_dart/msgpack_dart.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

class NT4Client {
// TODO: Change to 1000 for 2024 NT4.1 updates
static int pingTimeoutMs = 5000;

String serverBaseAddress;
final VoidCallback? onConnect;
final VoidCallback? onDisconnect;
Expand All @@ -27,7 +30,14 @@ class NT4Client {
bool _serverConnectionActive = false;
int _serverTimeOffsetUS = 0;

WebSocketChannel? _ws;
WebSocketChannel? _mainWebsocket;
// TODO: Uncomment for 2024 NT4.1 updates
// WebSocketChannel? _rttWebsocket;

Timer? pingTimer;
Timer? pongTimer;

int lastPongTime = 0;

Map<int, NT4Subscription> get subscriptions => _subscriptions;
Set<NT4Subscription> get subscribedTopics => _subscribedTopics;
Expand All @@ -39,10 +49,6 @@ class NT4Client {
this.onConnect,
this.onDisconnect,
}) {
Timer.periodic(const Duration(milliseconds: 5000), (timer) {
_wsSendTimestamp();
});

wsConnect();
}

Expand Down Expand Up @@ -240,20 +246,27 @@ class NT4Client {
return _getClientTimeUS() + _serverTimeOffsetUS;
}

void _wsSendTimestamp() {
void _rttSendTimestamp() {
var timeTopic = announcedTopics[-1];
if (timeTopic != null) {
int timeToSend = _getClientTimeUS();
addSample(timeTopic, timeToSend, 0);

var rawData =
serialize([timeTopic.pubUID, 0, timeTopic.getTypeId(), timeToSend]);

// TODO: Change this to _rttWebsocket for 2024 NT4.1 updates
_mainWebsocket?.sink.add(rawData);
}
}

void _wsHandleRecieveTimestamp(int serverTimestamp, int clientTimestamp) {
void _rttHandleRecieveTimestamp(int serverTimestamp, int clientTimestamp) {
int rxTime = _getClientTimeUS();

int rtt = rxTime - clientTimestamp;
int serverTimeAtRx = (serverTimestamp - rtt / 2.0).round();
_serverTimeOffsetUS = serverTimeAtRx - rxTime;

lastPongTime = DateTime.now().millisecondsSinceEpoch;
}

void _wsSubscribe(NT4Subscription sub) {
Expand All @@ -277,7 +290,7 @@ class NT4Client {
}

void _wsSendJSON(String method, Map<String, dynamic> params) {
_ws?.sink.add(jsonEncode([
_mainWebsocket?.sink.add(jsonEncode([
{
'method': method,
'params': params,
Expand All @@ -286,7 +299,7 @@ class NT4Client {
}

void _wsSendBinary(dynamic data) {
_ws?.sink.add(data);
_mainWebsocket?.sink.add(data);
}

void wsConnect() async {
Expand All @@ -296,24 +309,39 @@ class NT4Client {

_clientId = Random().nextInt(99999999);

String serverAddr = 'ws://$serverBaseAddress:5810/nt/elastic';
String mainServerAddr = 'ws://$serverBaseAddress:5810/nt/elastic';

// TODO: Uncomment for 2024 NT4.1 updates
// String rttServerAddr = 'ws://$serverBaseAddress:5810/nt/elastic';

_ws = WebSocketChannel.connect(Uri.parse(serverAddr),
_mainWebsocket = WebSocketChannel.connect(Uri.parse(mainServerAddr),
protocols: ['networktables.first.wpi.edu']);

// TODO: Uncomment for 2024 NT4.1 updates
// _rttWebsocket = WebSocketChannel.connect(Uri.parse(rttServerAddr),
// protocols: ['rtt.networktables.first.wpi.edu']);

try {
await _ws!.ready;
await _mainWebsocket!.ready;
// TODO: Uncomment for 2024 NT4.1 updates
// await _rttWebsocket!.ready;
} catch (e) {
// Failed to connect... try again
Future.delayed(const Duration(seconds: 1), wsConnect);
return;
}

_ws!.stream.listen(
pingTimer ??= Timer.periodic(const Duration(milliseconds: 200), (timer) {
_rttSendTimestamp();
});
pongTimer ??=
Timer.periodic(const Duration(milliseconds: 1000), _checkPingStatus);

_mainWebsocket!.stream.listen(
(data) {
// Prevents repeated calls to onConnect and reconnecting after changing ip addresses
if (!_serverConnectionActive &&
serverAddr.contains(serverBaseAddress)) {
mainServerAddr.contains(serverBaseAddress)) {
lastAnnouncedValues.clear();

for (NT4Subscription sub in _subscriptions.values) {
Expand All @@ -326,14 +354,33 @@ class NT4Client {
}
_wsOnMessage(data);
},
onDone: _wsOnClose,
// onDone: _wsOnClose,
onError: (err) {
if (kDebugMode) {
print('NT4 ERR: $err');
}
},
);

// TODO: Uncomment for 2024 NT4.1 updates
// _rttWebsocket!.stream.listen(
// (data) {
// if (data is! List<int>) {
// return;
// }

// var msg = Unpacker.fromList(data).unpackList();

// int topicID = msg[0] as int;
// int timestampUS = msg[1] as int;
// var value = msg[3];

// if (topicID == -1) {
// _rttHandleRecieveTimestamp(timestampUS, value as int);
// }
// },
// );

NT4Topic timeTopic = NT4Topic(
name: "Time",
type: NT4TypeStr.kInt,
Expand All @@ -342,7 +389,7 @@ class NT4Client {
properties: {});
announcedTopics[timeTopic.id] = timeTopic;

_wsSendTimestamp();
_rttSendTimestamp();

for (NT4Topic topic in _clientPublishedTopics.values) {
_wsPublish(topic);
Expand All @@ -355,11 +402,17 @@ class NT4Client {
}

void _wsOnClose() {
_ws?.sink.close();
_mainWebsocket?.sink.close();
// TODO: Uncomment for 2024 NT4.1 updates
// _rttWebsocket?.sink.close();

_ws = null;
_mainWebsocket = null;
// TODO: Uncomment for 2024 NT4.1 updates
// _rttWebsocket = null;
_serverConnectionActive = false;

lastPongTime = 0;

onDisconnect?.call();

announcedTopics.clear();
Expand Down Expand Up @@ -464,7 +517,7 @@ class NT4Client {
}
}
} else if (topicID == -1) {
_wsHandleRecieveTimestamp(timestampUS, value as int);
_rttHandleRecieveTimestamp(timestampUS, value as int);
} else {
if (kDebugMode) {
print('[NT4] ignoring binary data, invalid topic ID');
Expand All @@ -477,6 +530,18 @@ class NT4Client {
}
}

void _checkPingStatus(Timer timer) {
if (!_serverConnectionActive || lastPongTime == 0) {
return;
}

int currentTime = DateTime.now().millisecondsSinceEpoch;

if (currentTime - lastPongTime > pingTimeoutMs) {
_wsOnClose();
}
}

int getNewSubUID() {
_subscriptionUIDCounter++;
return _subscriptionUIDCounter + _clientId;
Expand Down

0 comments on commit 780e526

Please sign in to comment.