From ad032ec5cc766778d4062a7b2fa0ff5961879ca5 Mon Sep 17 00:00:00 2001 From: Michael Law <1365977+lawmicha@users.noreply.github.com> Date: Fri, 17 Nov 2023 10:35:15 -0500 Subject: [PATCH] feat(DataStore): DisableRealTimeUpdates flag --- .../DataStoreConfiguration+Helper.swift | 6 +- .../DataStoreConfiguration.swift | 6 +- .../Sync/RemoteSyncEngine.swift | 5 +- .../AWSIncomingEventReconciliationQueue.swift | 37 ++++---- ...onDisabledSubscriptionEventPublisher.swift | 31 +++++++ .../AWSModelReconciliationQueue.swift | 11 ++- ...kAWSIncomingEventReconciliationQueue.swift | 2 +- .../DataStoreConnectionScenario1Tests.swift | 93 +++++++++++++++++++ 8 files changed, 166 insertions(+), 25 deletions(-) create mode 100644 AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/OperationDisabledSubscriptionEventPublisher.swift diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Configuration/DataStoreConfiguration+Helper.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Configuration/DataStoreConfiguration+Helper.swift index ae530e0bb0..aed3c1d90f 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Configuration/DataStoreConfiguration+Helper.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Configuration/DataStoreConfiguration+Helper.swift @@ -36,7 +36,8 @@ extension DataStoreConfiguration { syncMaxRecords: UInt = DataStoreConfiguration.defaultSyncMaxRecords, syncPageSize: UInt = DataStoreConfiguration.defaultSyncPageSize, syncExpressions: [DataStoreSyncExpression] = [], - authModeStrategy: AuthModeStrategyType = .default + authModeStrategy: AuthModeStrategyType = .default, + disableRealTimeUpdates: @escaping () -> Bool = { false} ) -> DataStoreConfiguration { return DataStoreConfiguration(errorHandler: errorHandler, conflictHandler: conflictHandler, @@ -44,7 +45,8 @@ extension DataStoreConfiguration { syncMaxRecords: syncMaxRecords, syncPageSize: syncPageSize, syncExpressions: syncExpressions, - authModeStrategy: authModeStrategy) + authModeStrategy: authModeStrategy, + disableRealTimeUpdates: disableRealTimeUpdates) } /// The default configuration. diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Configuration/DataStoreConfiguration.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Configuration/DataStoreConfiguration.swift index f491600882..38fba046f3 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Configuration/DataStoreConfiguration.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Configuration/DataStoreConfiguration.swift @@ -70,13 +70,16 @@ public struct DataStoreConfiguration { /// Authorization mode strategy public var authModeStrategyType: AuthModeStrategyType + public let disableRealTimeUpdates: () -> Bool + init(errorHandler: @escaping DataStoreErrorHandler, conflictHandler: @escaping DataStoreConflictHandler, syncInterval: TimeInterval, syncMaxRecords: UInt, syncPageSize: UInt, syncExpressions: [DataStoreSyncExpression], - authModeStrategy: AuthModeStrategyType = .default) { + authModeStrategy: AuthModeStrategyType = .default, + disableRealTimeUpdates: @escaping () -> Bool = { false } ) { self.errorHandler = errorHandler self.conflictHandler = conflictHandler self.syncInterval = syncInterval @@ -84,6 +87,7 @@ public struct DataStoreConfiguration { self.syncPageSize = syncPageSize self.syncExpressions = syncExpressions self.authModeStrategyType = authModeStrategy + self.disableRealTimeUpdates = disableRealTimeUpdates } } diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift index a32648d653..f1801351b6 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift @@ -86,7 +86,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { authModeStrategy: resolvedAuthStrategy) let reconciliationQueueFactory = reconciliationQueueFactory ?? - AWSIncomingEventReconciliationQueue.init(modelSchemas:api:storageAdapter:syncExpressions:auth:authModeStrategy:modelReconciliationQueueFactory:) + AWSIncomingEventReconciliationQueue.init(modelSchemas:api:storageAdapter:syncExpressions:auth:authModeStrategy:modelReconciliationQueueFactory:disableRealTimeUpdates:) let initialSyncOrchestratorFactory = initialSyncOrchestratorFactory ?? AWSInitialSyncOrchestrator.init(dataStoreConfiguration:authModeStrategy:api:reconciliationQueue:storageAdapter:) @@ -289,7 +289,8 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { dataStoreConfiguration.syncExpressions, auth, authModeStrategy, - nil) + nil, + dataStoreConfiguration.disableRealTimeUpdates) reconciliationQueueSink = reconciliationQueue? .publisher .sink( diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift index c9994ccbdd..3111ac7932 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift @@ -10,6 +10,8 @@ import AWSPluginsCore import Combine import Foundation +typealias DisableRealTimeUpdates = () -> Bool + // Used for testing: typealias IncomingEventReconciliationQueueFactory = ([ModelSchema], @@ -18,7 +20,8 @@ typealias IncomingEventReconciliationQueueFactory = [DataStoreSyncExpression], AuthCategoryBehavior?, AuthModeStrategy, - ModelReconciliationQueueFactory? + ModelReconciliationQueueFactory?, + @escaping DisableRealTimeUpdates ) async -> IncomingEventReconciliationQueue final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueue { @@ -48,7 +51,8 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu syncExpressions: [DataStoreSyncExpression], auth: AuthCategoryBehavior? = nil, authModeStrategy: AuthModeStrategy, - modelReconciliationQueueFactory: ModelReconciliationQueueFactory? = nil) async { + modelReconciliationQueueFactory: ModelReconciliationQueueFactory? = nil, + disableRealTimeUpdates: @escaping () -> Bool = { false } ) async { self.modelSchemasCount = modelSchemas.count self.modelReconciliationQueueSinks.set([:]) self.eventReconciliationQueueTopic = CurrentValueSubject(.idle) @@ -78,13 +82,13 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu continue } let queue = await self.modelReconciliationQueueFactory(modelSchema, - storageAdapter, - api, - reconcileAndSaveQueue, - modelPredicate, - auth, - authModeStrategy, - nil) + storageAdapter, + api, + reconcileAndSaveQueue, + modelPredicate, + auth, + authModeStrategy, + disableRealTimeUpdates() ? OperationDisabledIncomingSubscriptionEventPublisher() : nil) reconciliationQueues.with { reconciliationQueues in reconciliationQueues[modelName] = queue @@ -190,14 +194,15 @@ extension AWSIncomingEventReconciliationQueue: DefaultLogger { // MARK: - Static factory extension AWSIncomingEventReconciliationQueue { - static let factory: IncomingEventReconciliationQueueFactory = { modelSchemas, api, storageAdapter, syncExpressions, auth, authModeStrategy, _ in + static let factory: IncomingEventReconciliationQueueFactory = { modelSchemas, api, storageAdapter, syncExpressions, auth, authModeStrategy, _, disableRealTimeUpdates in await AWSIncomingEventReconciliationQueue(modelSchemas: modelSchemas, - api: api, - storageAdapter: storageAdapter, - syncExpressions: syncExpressions, - auth: auth, - authModeStrategy: authModeStrategy, - modelReconciliationQueueFactory: nil) + api: api, + storageAdapter: storageAdapter, + syncExpressions: syncExpressions, + auth: auth, + authModeStrategy: authModeStrategy, + modelReconciliationQueueFactory: nil, + disableRealTimeUpdates: disableRealTimeUpdates) } } diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/OperationDisabledSubscriptionEventPublisher.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/OperationDisabledSubscriptionEventPublisher.swift new file mode 100644 index 0000000000..277b4685d8 --- /dev/null +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/OperationDisabledSubscriptionEventPublisher.swift @@ -0,0 +1,31 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +import Amplify +import AWSPluginsCore +import Combine + +final class OperationDisabledIncomingSubscriptionEventPublisher: IncomingSubscriptionEventPublisher { + + private let subscriptionEventSubject: PassthroughSubject + + var publisher: AnyPublisher { + return subscriptionEventSubject.eraseToAnyPublisher() + } + + init() { + self.subscriptionEventSubject = PassthroughSubject() + + let apiError = APIError.operationError(AppSyncErrorType.operationDisabled.rawValue, "", nil) + let dataStoreError = DataStoreError.api(apiError, nil) + subscriptionEventSubject.send(completion: .failure(dataStoreError)) + + } + + func cancel() { + } +} diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift index 7f34881c49..86c7aca2c9 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift @@ -209,8 +209,8 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue { return } if case let .api(error, _) = dataStoreError, - case let APIError.operationError(_, _, underlyingError) = error, - isOperationDisabledError(underlyingError) { + case let APIError.operationError(errorMessage, _, underlyingError) = error, + isOperationDisabledError(errorMessage, underlyingError) { log.verbose("[InitializeSubscription.3] AWSModelReconciliationQueue determined isOperationDisabledError \(modelSchema.name)") modelReconciliationQueueSubject.send(.disconnected(modelName: modelSchema.name, reason: .operationDisabled)) return @@ -284,7 +284,12 @@ extension AWSModelReconciliationQueue { return false } - private func isOperationDisabledError(_ error: Error?) -> Bool { + private func isOperationDisabledError(_ errorMessage: String?, _ error: Error?) -> Bool { + if let errorMessage = errorMessage, + case .operationDisabled = AppSyncErrorType(errorMessage) { + return true + } + if let responseError = error as? GraphQLResponseError, let graphQLError = graphqlErrors(from: responseError)?.first, let errorTypeValue = errorTypeValueFrom(graphQLError: graphQLError), diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockAWSIncomingEventReconciliationQueue.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockAWSIncomingEventReconciliationQueue.swift index 6be0a7ccfc..05b5750b8a 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockAWSIncomingEventReconciliationQueue.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockAWSIncomingEventReconciliationQueue.swift @@ -13,7 +13,7 @@ import Combine @testable import AWSDataStorePlugin class MockAWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueue { - static let factory: IncomingEventReconciliationQueueFactory = { modelSchemas, api, storageAdapter, syncExpressions, auth, _, _ in + static let factory: IncomingEventReconciliationQueueFactory = { modelSchemas, api, storageAdapter, syncExpressions, auth, _, _, _ in MockAWSIncomingEventReconciliationQueue(modelSchemas: modelSchemas, api: api, storageAdapter: storageAdapter, diff --git a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/Connection/DataStoreConnectionScenario1Tests.swift b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/Connection/DataStoreConnectionScenario1Tests.swift index bb4d290b87..2fae34c5a8 100644 --- a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/Connection/DataStoreConnectionScenario1Tests.swift +++ b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/Connection/DataStoreConnectionScenario1Tests.swift @@ -42,6 +42,99 @@ class DataStoreConnectionScenario1Tests: SyncEngineIntegrationTestBase { let version: String = "1" } + + // MARK: - Disable Real Time Updates + + func testStartAndSync() async throws { + await setUp(withModels: TestModelRegistration(), + logLevel: .verbose, + dataStoreConfiguration: .custom(syncMaxRecords: 100, disableRealTimeUpdates: { true })) + try await startAmplifyAndWaitForSync() + } + + func testStartAndSyncAndRestartAndSync() async throws { + var disabledRealTimeUpdates = true + let disableRealTimeUpdates = { + disabledRealTimeUpdates + } + await setUp(withModels: TestModelRegistration(), + logLevel: .verbose, + dataStoreConfiguration: .custom(syncMaxRecords: 100, disableRealTimeUpdates: disableRealTimeUpdates)) + try await startAmplifyAndWaitForSync() + disabledRealTimeUpdates = false + try await Amplify.DataStore.stop() + try await Amplify.DataStore.start() + + let eventReceived = expectation(description: "DataStore ready event") + let sink = Amplify.Hub.publisher(for: .dataStore) + .filter { $0.eventName == HubPayload.EventName.DataStore.ready } + .sink { _ in + eventReceived.fulfill() + } + + try await Amplify.DataStore.start() + + await fulfillment(of: [eventReceived], timeout: 10) + } + + func testSaveReconciled() async throws { + await setUp(withModels: TestModelRegistration(), + logLevel: .verbose, + dataStoreConfiguration: .custom(syncMaxRecords: 100, disableRealTimeUpdates: { true })) + try await startAmplifyAndWaitForSync() + + let team = Team1(name: "name1") + let project = Project1(team: team) + let syncedTeamReceived = expectation(description: "received team from sync path") + var hubListener = Amplify.Hub.listen(to: .dataStore, + eventName: HubPayload.EventName.DataStore.syncReceived) { payload in + guard let mutationEvent = payload.data as? MutationEvent else { + XCTFail("Could not cast payload to mutation event") + return + } + + if let syncedTeam = try? mutationEvent.decodeModel() as? Team1, + syncedTeam == team { + syncedTeamReceived.fulfill() + } + } + guard try await HubListenerTestUtilities.waitForListener(with: hubListener, timeout: 5.0) else { + XCTFail("Listener not registered for hub") + return + } + + _ = try await Amplify.DataStore.save(team) + await fulfillment(of: [syncedTeamReceived], timeout: networkTimeout) + + let syncProjectReceived = expectation(description: "received project from sync path") + hubListener = Amplify.Hub.listen(to: .dataStore, + eventName: HubPayload.EventName.DataStore.syncReceived) { payload in + guard let mutationEvent = payload.data as? MutationEvent else { + XCTFail("Could not cast payload to mutation event") + return + } + + if let syncedProject = try? mutationEvent.decodeModel() as? Project1, + syncedProject == project { + syncProjectReceived.fulfill() + } + } + guard try await HubListenerTestUtilities.waitForListener(with: hubListener, timeout: 5.0) else { + XCTFail("Listener not registered for hub") + return + } + _ = try await Amplify.DataStore.save(project) + await fulfillment(of: [syncProjectReceived], timeout: networkTimeout) + + let queriedProjectOptional = try await Amplify.DataStore.query(Project1.self, byId: project.id) + guard let queriedProject = queriedProjectOptional else { + XCTFail("Failed") + return + } + XCTAssertEqual(queriedProject.id, project.id) + XCTAssertEqual(queriedProject.team, team) + } + func testSaveTeamAndProjectSyncToCloud() async throws { await setUp(withModels: TestModelRegistration()) try await startAmplifyAndWaitForSync()