Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: stability under load #24

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 2.3.2

- fix: stability under load
- ensure that socket.done is handled in all cases
- check a side's state before attempting to write to that side's socket

## 2.3.1

- fix: correctly handle situation where a socket has been closed but the other
Expand Down
29 changes: 21 additions & 8 deletions lib/src/socket_connector.dart
Original file line number Diff line number Diff line change
Expand Up @@ -136,23 +136,31 @@ class SocketConnector {
'Added connection. There are now ${connections.length} connections.'));

for (final side in [thisSide, thisSide.farSide!]) {
unawaited(side.socket.done
.then((v) => _destroySide(side))
.catchError((err) => _destroySide(side)));
if (side.transformer != null) {
// transformer is there to transform data originating FROM its side
StreamController<Uint8List> sc = StreamController<Uint8List>();
side.farSide!.sink = sc;
Stream<List<int>> transformed = side.transformer!(sc.stream);
transformed.listen((event) async {
transformed.listen((data) {
try {
side.farSide!.socket.add(event);
await side.farSide!.socket.flush();
} catch (e) {
if (side.farSide!.state == SideState.open) {
side.farSide!.socket.add(data);
} else {
throw StateError(
'Will not write to side ${side.farSide!.name} as its state is ${side.farSide!.state}');
}
} catch (e, st) {
_log('Failed to write to side ${side.farSide!.name} - closing',
force: true);
_log('(Error was $e; Stack trace follows\n$st');
_destroySide(side.farSide!);
}
});
}
side.stream.listen((Uint8List data) async {
side.stream.listen((Uint8List data) {
if (logTraffic) {
final message = String.fromCharCodes(data);
if (side.isSideA) {
Expand All @@ -164,11 +172,16 @@ class SocketConnector {
}
}
try {
side.farSide!.sink.add(data);
await side.farSide!.socket.flush();
} catch (e) {
if (side.farSide!.state == SideState.open) {
side.farSide!.sink.add(data);
} else {
throw StateError(
'Will not write to side ${side.farSide!.name} as its state is ${side.farSide!.state}');
}
} catch (e, st) {
_log('Failed to write to side ${side.farSide!.name} - closing',
force: true);
_log('(Error was $e; Stack trace follows\n$st');
_destroySide(side.farSide!);
}
}, onDone: () {
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: socket_connector
description: Package for joining sockets together to create socket relays.

version: 2.3.1
version: 2.3.2
repository: https://github.com/cconstab/socket_connector

environment:
Expand Down