Skip to content

Commit

Permalink
Revert "🐛 Implement multi-subscription and cancellation mechanism for…
Browse files Browse the repository at this point in the history
… peers"

This reverts commit d0093e7.
  • Loading branch information
AlexV525 committed Dec 31, 2024
1 parent d1f6bcf commit d4bf9a6
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 114 deletions.
40 changes: 10 additions & 30 deletions packages/reown_core/lib/relay_client/json_rpc_2/src/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import 'utils.dart';
/// [sendNotification] if no response is expected.
class Client {
final StreamChannel<dynamic> _channel;
late final _stream = _channel.stream.asBroadcastStream();
final _channelSubscriptions = <StreamSubscription<dynamic>>{};

/// The next request id.
var _id = 0;
Expand Down Expand Up @@ -51,11 +49,9 @@ class Client {
///
/// Note that the client won't begin listening to [responses] until
/// [Client.listen] is called.
factory Client(StreamChannel<String> channel) {
return Client.withoutJson(
jsonDocument.bind(channel).transformStream(ignoreFormatExceptions),
);
}
Client(StreamChannel<String> channel)
: this.withoutJson(
jsonDocument.bind(channel).transformStream(ignoreFormatExceptions));

/// Creates a [Client] that communicates using decoded messages over
/// [channel].
Expand Down Expand Up @@ -84,22 +80,13 @@ class Client {
///
/// [listen] may only be called once.
Future listen() {
late final StreamSubscription<dynamic> subscription;
subscription = _stream.listen(
_handleResponse,
onError: (error, stackTrace) {
_done.completeError(error, stackTrace);
_channel.sink.close();
},
onDone: () {
if (!_done.isCompleted) {
_done.complete();
}
subscription.cancel();
_channelSubscriptions.remove(subscription);
close();
},
);
_channel.stream.listen(_handleResponse, onError: (error, stackTrace) {
_done.completeError(error, stackTrace);
_channel.sink.close();
}, onDone: () {
if (!_done.isCompleted) _done.complete();
close();
});
return done;
}

Expand All @@ -110,13 +97,6 @@ class Client {
Future close() {
_channel.sink.close();
if (!_done.isCompleted) _done.complete();
Future.forEach(
_channelSubscriptions.toSet(),
(subscription) async {
_channelSubscriptions.remove(subscription);
await subscription.cancel();
},
);
return done;
}

Expand Down
56 changes: 14 additions & 42 deletions packages/reown_core/lib/relay_client/json_rpc_2/src/peer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import 'utils.dart';
/// communication channel and expects to connect to a peer that does the same.
class Peer implements Client, Server {
final StreamChannel<dynamic> _channel;
late final _stream = _channel.stream.asBroadcastStream();
final _channelSubscriptions = <StreamSubscription<dynamic>>{};

/// The underlying client that handles request-sending and response-receiving
/// logic.
Expand Down Expand Up @@ -61,17 +59,12 @@ class Peer implements Client, Server {
/// some requests which are not conformant with the JSON-RPC 2.0
/// specification. In particular, requests missing the `jsonrpc` parameter
/// will be accepted.
factory Peer(
StreamChannel<String> channel, {
ErrorCallback? onUnhandledError,
bool strictProtocolChecks = true,
}) {
return Peer.withoutJson(
jsonDocument.bind(channel).transform(respondToFormatExceptions),
onUnhandledError: onUnhandledError,
strictProtocolChecks: strictProtocolChecks,
);
}
Peer(StreamChannel<String> channel,
{ErrorCallback? onUnhandledError, bool strictProtocolChecks = true})
: this.withoutJson(
jsonDocument.bind(channel).transform(respondToFormatExceptions),
onUnhandledError: onUnhandledError,
strictProtocolChecks: strictProtocolChecks);

/// Creates a [Peer] that communicates using decoded messages over [channel].
///
Expand All @@ -88,22 +81,14 @@ class Peer implements Client, Server {
/// some requests which are not conformant with the JSON-RPC 2.0
/// specification. In particular, requests missing the `jsonrpc` parameter
/// will be accepted.
Peer.withoutJson(
this._channel, {
ErrorCallback? onUnhandledError,
bool strictProtocolChecks = true,
}) {
Peer.withoutJson(this._channel,
{ErrorCallback? onUnhandledError, bool strictProtocolChecks = true}) {
_server = Server.withoutJson(
StreamChannel(
_serverIncomingForwarder.stream,
_channel.sink,
),
onUnhandledError: onUnhandledError,
strictProtocolChecks: strictProtocolChecks,
);
StreamChannel(_serverIncomingForwarder.stream, _channel.sink),
onUnhandledError: onUnhandledError,
strictProtocolChecks: strictProtocolChecks);
_client = Client.withoutJson(
StreamChannel(_clientIncomingForwarder.stream, _channel.sink),
);
StreamChannel(_clientIncomingForwarder.stream, _channel.sink));
}

// Client methods.
Expand Down Expand Up @@ -135,8 +120,7 @@ class Peer implements Client, Server {
Future listen() {
_client.listen();
_server.listen();
late final StreamSubscription<dynamic> subscription;
subscription = _stream.listen((message) {
_channel.stream.listen((message) {
if (message is Map) {
if (message.containsKey('result') || message.containsKey('error')) {
_clientIncomingForwarder.add(message);
Expand All @@ -159,26 +143,14 @@ class Peer implements Client, Server {
}
}, onError: (error, stackTrace) {
_serverIncomingForwarder.addError(error, stackTrace);
}, onDone: () {
subscription.cancel();
_channelSubscriptions.remove(subscription);
close();
});
_channelSubscriptions.add(subscription);
}, onDone: close);
return done;
}

@override
Future close() {
_client.close();
_server.close();
Future.forEach(
_channelSubscriptions.toSet(),
(subscription) async {
_channelSubscriptions.remove(subscription);
await subscription.cancel();
},
);
return done;
}
}
56 changes: 14 additions & 42 deletions packages/reown_core/lib/relay_client/json_rpc_2/src/server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ typedef ErrorCallback = void Function(dynamic error, dynamic stackTrace);
/// time, or even for a single method to be invoked multiple times at once.
class Server {
final StreamChannel<dynamic> _channel;
late final _stream = _channel.stream.asBroadcastStream();
final _channelSubscriptions = <StreamSubscription<dynamic>>{};

/// The methods registered for this server.
final _methods = <String, Function>{};
Expand Down Expand Up @@ -83,17 +81,12 @@ class Server {
/// If [strictProtocolChecks] is false, this [Server] will accept some
/// requests which are not conformant with the JSON-RPC 2.0 specification. In
/// particular, requests missing the `jsonrpc` parameter will be accepted.
factory Server(
StreamChannel<String> channel, {
ErrorCallback? onUnhandledError,
bool strictProtocolChecks = true,
}) {
return Server.withoutJson(
jsonDocument.bind(channel).transform(respondToFormatExceptions),
onUnhandledError: onUnhandledError,
strictProtocolChecks: strictProtocolChecks,
);
}
Server(StreamChannel<String> channel,
{ErrorCallback? onUnhandledError, bool strictProtocolChecks = true})
: this.withoutJson(
jsonDocument.bind(channel).transform(respondToFormatExceptions),
onUnhandledError: onUnhandledError,
strictProtocolChecks: strictProtocolChecks);

/// Creates a [Server] that communicates using decoded messages over
/// [channel].
Expand All @@ -110,11 +103,8 @@ class Server {
/// If [strictProtocolChecks] is false, this [Server] will accept some
/// requests which are not conformant with the JSON-RPC 2.0 specification. In
/// particular, requests missing the `jsonrpc` parameter will be accepted.
Server.withoutJson(
this._channel, {
this.onUnhandledError,
this.strictProtocolChecks = true,
});
Server.withoutJson(this._channel,
{this.onUnhandledError, this.strictProtocolChecks = true});

/// Starts listening to the underlying stream.
///
Expand All @@ -123,23 +113,12 @@ class Server {
///
/// [listen] may only be called once.
Future listen() {
late final StreamSubscription<dynamic> subscription;
subscription = _stream.listen(
_handleRequest,
onError: (error, stackTrace) {
_done.completeError(error, stackTrace);
_channel.sink.close();
},
onDone: () {
if (!_done.isCompleted) {
_done.complete();
}
subscription.cancel();
_channelSubscriptions.remove(subscription);
close();
},
);
_channelSubscriptions.add(subscription);
_channel.stream.listen(_handleRequest, onError: (error, stackTrace) {
_done.completeError(error, stackTrace);
_channel.sink.close();
}, onDone: () {
if (!_done.isCompleted) _done.complete();
});
return done;
}

Expand All @@ -150,13 +129,6 @@ class Server {
Future close() {
_channel.sink.close();
if (!_done.isCompleted) _done.complete();
Future.forEach(
_channelSubscriptions.toSet(),
(subscription) async {
_channelSubscriptions.remove(subscription);
await subscription.cancel();
},
);
return done;
}

Expand Down

0 comments on commit d4bf9a6

Please sign in to comment.