diff --git a/packages/reown_core/lib/relay_client/json_rpc_2/src/client.dart b/packages/reown_core/lib/relay_client/json_rpc_2/src/client.dart index fd27377..70bde91 100644 --- a/packages/reown_core/lib/relay_client/json_rpc_2/src/client.dart +++ b/packages/reown_core/lib/relay_client/json_rpc_2/src/client.dart @@ -17,8 +17,6 @@ import 'utils.dart'; /// [sendNotification] if no response is expected. class Client { final StreamChannel _channel; - late final _stream = _channel.stream.asBroadcastStream(); - final _channelSubscriptions = >{}; /// The next request id. var _id = 0; @@ -51,11 +49,9 @@ class Client { /// /// Note that the client won't begin listening to [responses] until /// [Client.listen] is called. - factory Client(StreamChannel channel) { - return Client.withoutJson( - jsonDocument.bind(channel).transformStream(ignoreFormatExceptions), - ); - } + Client(StreamChannel channel) + : this.withoutJson( + jsonDocument.bind(channel).transformStream(ignoreFormatExceptions)); /// Creates a [Client] that communicates using decoded messages over /// [channel]. @@ -84,22 +80,13 @@ class Client { /// /// [listen] may only be called once. Future listen() { - late final StreamSubscription subscription; - subscription = _stream.listen( - _handleResponse, - onError: (error, stackTrace) { - _done.completeError(error, stackTrace); - _channel.sink.close(); - }, - onDone: () { - if (!_done.isCompleted) { - _done.complete(); - } - subscription.cancel(); - _channelSubscriptions.remove(subscription); - close(); - }, - ); + _channel.stream.listen(_handleResponse, onError: (error, stackTrace) { + _done.completeError(error, stackTrace); + _channel.sink.close(); + }, onDone: () { + if (!_done.isCompleted) _done.complete(); + close(); + }); return done; } @@ -110,13 +97,6 @@ class Client { Future close() { _channel.sink.close(); if (!_done.isCompleted) _done.complete(); - Future.forEach( - _channelSubscriptions.toSet(), - (subscription) async { - _channelSubscriptions.remove(subscription); - await subscription.cancel(); - }, - ); return done; } diff --git a/packages/reown_core/lib/relay_client/json_rpc_2/src/peer.dart b/packages/reown_core/lib/relay_client/json_rpc_2/src/peer.dart index f7c98c8..b9fada4 100644 --- a/packages/reown_core/lib/relay_client/json_rpc_2/src/peer.dart +++ b/packages/reown_core/lib/relay_client/json_rpc_2/src/peer.dart @@ -18,8 +18,6 @@ import 'utils.dart'; /// communication channel and expects to connect to a peer that does the same. class Peer implements Client, Server { final StreamChannel _channel; - late final _stream = _channel.stream.asBroadcastStream(); - final _channelSubscriptions = >{}; /// The underlying client that handles request-sending and response-receiving /// logic. @@ -61,17 +59,12 @@ class Peer implements Client, Server { /// some requests which are not conformant with the JSON-RPC 2.0 /// specification. In particular, requests missing the `jsonrpc` parameter /// will be accepted. - factory Peer( - StreamChannel channel, { - ErrorCallback? onUnhandledError, - bool strictProtocolChecks = true, - }) { - return Peer.withoutJson( - jsonDocument.bind(channel).transform(respondToFormatExceptions), - onUnhandledError: onUnhandledError, - strictProtocolChecks: strictProtocolChecks, - ); - } + Peer(StreamChannel channel, + {ErrorCallback? onUnhandledError, bool strictProtocolChecks = true}) + : this.withoutJson( + jsonDocument.bind(channel).transform(respondToFormatExceptions), + onUnhandledError: onUnhandledError, + strictProtocolChecks: strictProtocolChecks); /// Creates a [Peer] that communicates using decoded messages over [channel]. /// @@ -88,22 +81,14 @@ class Peer implements Client, Server { /// some requests which are not conformant with the JSON-RPC 2.0 /// specification. In particular, requests missing the `jsonrpc` parameter /// will be accepted. - Peer.withoutJson( - this._channel, { - ErrorCallback? onUnhandledError, - bool strictProtocolChecks = true, - }) { + Peer.withoutJson(this._channel, + {ErrorCallback? onUnhandledError, bool strictProtocolChecks = true}) { _server = Server.withoutJson( - StreamChannel( - _serverIncomingForwarder.stream, - _channel.sink, - ), - onUnhandledError: onUnhandledError, - strictProtocolChecks: strictProtocolChecks, - ); + StreamChannel(_serverIncomingForwarder.stream, _channel.sink), + onUnhandledError: onUnhandledError, + strictProtocolChecks: strictProtocolChecks); _client = Client.withoutJson( - StreamChannel(_clientIncomingForwarder.stream, _channel.sink), - ); + StreamChannel(_clientIncomingForwarder.stream, _channel.sink)); } // Client methods. @@ -135,8 +120,7 @@ class Peer implements Client, Server { Future listen() { _client.listen(); _server.listen(); - late final StreamSubscription subscription; - subscription = _stream.listen((message) { + _channel.stream.listen((message) { if (message is Map) { if (message.containsKey('result') || message.containsKey('error')) { _clientIncomingForwarder.add(message); @@ -159,12 +143,7 @@ class Peer implements Client, Server { } }, onError: (error, stackTrace) { _serverIncomingForwarder.addError(error, stackTrace); - }, onDone: () { - subscription.cancel(); - _channelSubscriptions.remove(subscription); - close(); - }); - _channelSubscriptions.add(subscription); + }, onDone: close); return done; } @@ -172,13 +151,6 @@ class Peer implements Client, Server { Future close() { _client.close(); _server.close(); - Future.forEach( - _channelSubscriptions.toSet(), - (subscription) async { - _channelSubscriptions.remove(subscription); - await subscription.cancel(); - }, - ); return done; } } diff --git a/packages/reown_core/lib/relay_client/json_rpc_2/src/server.dart b/packages/reown_core/lib/relay_client/json_rpc_2/src/server.dart index caa1ca7..7d8202e 100644 --- a/packages/reown_core/lib/relay_client/json_rpc_2/src/server.dart +++ b/packages/reown_core/lib/relay_client/json_rpc_2/src/server.dart @@ -29,8 +29,6 @@ typedef ErrorCallback = void Function(dynamic error, dynamic stackTrace); /// time, or even for a single method to be invoked multiple times at once. class Server { final StreamChannel _channel; - late final _stream = _channel.stream.asBroadcastStream(); - final _channelSubscriptions = >{}; /// The methods registered for this server. final _methods = {}; @@ -83,17 +81,12 @@ class Server { /// If [strictProtocolChecks] is false, this [Server] will accept some /// requests which are not conformant with the JSON-RPC 2.0 specification. In /// particular, requests missing the `jsonrpc` parameter will be accepted. - factory Server( - StreamChannel channel, { - ErrorCallback? onUnhandledError, - bool strictProtocolChecks = true, - }) { - return Server.withoutJson( - jsonDocument.bind(channel).transform(respondToFormatExceptions), - onUnhandledError: onUnhandledError, - strictProtocolChecks: strictProtocolChecks, - ); - } + Server(StreamChannel channel, + {ErrorCallback? onUnhandledError, bool strictProtocolChecks = true}) + : this.withoutJson( + jsonDocument.bind(channel).transform(respondToFormatExceptions), + onUnhandledError: onUnhandledError, + strictProtocolChecks: strictProtocolChecks); /// Creates a [Server] that communicates using decoded messages over /// [channel]. @@ -110,11 +103,8 @@ class Server { /// If [strictProtocolChecks] is false, this [Server] will accept some /// requests which are not conformant with the JSON-RPC 2.0 specification. In /// particular, requests missing the `jsonrpc` parameter will be accepted. - Server.withoutJson( - this._channel, { - this.onUnhandledError, - this.strictProtocolChecks = true, - }); + Server.withoutJson(this._channel, + {this.onUnhandledError, this.strictProtocolChecks = true}); /// Starts listening to the underlying stream. /// @@ -123,23 +113,12 @@ class Server { /// /// [listen] may only be called once. Future listen() { - late final StreamSubscription subscription; - subscription = _stream.listen( - _handleRequest, - onError: (error, stackTrace) { - _done.completeError(error, stackTrace); - _channel.sink.close(); - }, - onDone: () { - if (!_done.isCompleted) { - _done.complete(); - } - subscription.cancel(); - _channelSubscriptions.remove(subscription); - close(); - }, - ); - _channelSubscriptions.add(subscription); + _channel.stream.listen(_handleRequest, onError: (error, stackTrace) { + _done.completeError(error, stackTrace); + _channel.sink.close(); + }, onDone: () { + if (!_done.isCompleted) _done.complete(); + }); return done; } @@ -150,13 +129,6 @@ class Server { Future close() { _channel.sink.close(); if (!_done.isCompleted) _done.complete(); - Future.forEach( - _channelSubscriptions.toSet(), - (subscription) async { - _channelSubscriptions.remove(subscription); - await subscription.cancel(); - }, - ); return done; }