From e38ba5316f1f0ccc3e673edd62cf4464d54101d8 Mon Sep 17 00:00:00 2001 From: Di Wu Date: Thu, 18 Apr 2024 10:53:30 -0700 Subject: [PATCH] fix(API): make sure unsubscribe is invoked when subscription cancelled (#3619) * fix(API): make sure unsubscribe is invoked when subscription cancelled * resolve comments --- .../AppSyncRealTimeClient.swift | 4 ++ .../AWSGraphQLSubscriptionTaskRunner.swift | 10 ++- .../GraphQLModelBasedTests.swift | 63 +++++++++++++++++++ 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift index c8bf7efcab..25a695f9b5 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift @@ -54,6 +54,10 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol { self.state.value == .connected } + internal var numberOfSubscriptions: Int { + self.subscriptions.count + } + /** Creates a new AppSyncRealTimeClient with endpoint, requestInterceptor and webSocketClient. - Parameters: diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift index 13cba6b888..12427ad9ab 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift @@ -44,6 +44,9 @@ public class AWSGraphQLSubscriptionTaskRunner: InternalTaskRunner, self.apiAuthProviderFactory = apiAuthProviderFactory } + /// When the top-level AmplifyThrowingSequence is canceled, this cancel method is invoked. + /// In this situation, we need to send the disconnected event because + /// the top-level AmplifyThrowingSequence is terminated immediately upon cancellation. public func cancel() { self.send(GraphQLSubscriptionEvent.connection(.disconnected)) Task { @@ -210,12 +213,7 @@ final public class AWSGraphQLSubscriptionOperation: GraphQLSubscri override public func cancel() { super.cancel() - - Task { [weak self] in - guard let self else { - return - } - + Task { guard let appSyncRealTimeClient = self.appSyncRealTimeClient else { return } diff --git a/AmplifyPlugins/API/Tests/APIHostApp/AWSAPIPluginFunctionalTests/GraphQLModelBasedTests.swift b/AmplifyPlugins/API/Tests/APIHostApp/AWSAPIPluginFunctionalTests/GraphQLModelBasedTests.swift index 770f598a7a..c5c6b87cb4 100644 --- a/AmplifyPlugins/API/Tests/APIHostApp/AWSAPIPluginFunctionalTests/GraphQLModelBasedTests.swift +++ b/AmplifyPlugins/API/Tests/APIHostApp/AWSAPIPluginFunctionalTests/GraphQLModelBasedTests.swift @@ -448,6 +448,65 @@ class GraphQLModelBasedTests: XCTestCase { await fulfillment(of: [progressInvoked], timeout: TestCommonConstants.networkTimeout) } + + /// Given: Several subscriptions with Amplify API plugin + /// When: Cancel subscriptions + /// Then: AppSync real time client automatically unsubscribe and remove the subscription + func testCancelledSubscription_automaticallyUnsubscribeAndRemoved() async throws { + let numberOfSubscription = 5 + let allSubscribedExpectation = expectation(description: "All subscriptions are subscribed") + allSubscribedExpectation.expectedFulfillmentCount = numberOfSubscription + + let subscriptions = (0..<5).map { _ in + Amplify.API.subscribe(request: .subscription(of: Comment.self, type: .onCreate)) + } + subscriptions.forEach { subscription in + Task { + do { + for try await subscriptionEvent in subscription { + switch subscriptionEvent { + case .connection(let state): + switch state { + case .connecting: + break + case .connected: + allSubscribedExpectation.fulfill() + case .disconnected: + break + } + case .data(let result): + switch result { + case .success: break + case .failure(let error): + XCTFail("\(error)") + } + } + } + } catch { + XCTFail("Unexpected subscription failure") + } + } + } + + await fulfillment(of: [allSubscribedExpectation], timeout: 3) + if let appSyncRealTimeClientFactory = + getUnderlyingAPIPlugin()?.appSyncRealTimeClientFactory as? AppSyncRealTimeClientFactory, + let appSyncRealTimeClient = + await appSyncRealTimeClientFactory.apiToClientCache.values.first as? AppSyncRealTimeClient + { + var appSyncSubscriptions = await appSyncRealTimeClient.numberOfSubscriptions + XCTAssertEqual(appSyncSubscriptions, numberOfSubscription) + + subscriptions.forEach { $0.cancel() } + try await Task.sleep(seconds: 2) + appSyncSubscriptions = await appSyncRealTimeClient.numberOfSubscriptions + XCTAssertEqual(appSyncSubscriptions, 0) + + } else { + XCTFail("There should be at least one AppSyncRealTimeClient instance") + } + } + // MARK: Helpers func createPost(id: String, title: String) async throws -> Post? { @@ -499,4 +558,8 @@ class GraphQLModelBasedTests: XCTestCase { throw error } } + + func getUnderlyingAPIPlugin() -> AWSAPIPlugin? { + return Amplify.API.plugins["awsAPIPlugin"] as? AWSAPIPlugin + } }