Skip to content

Commit

Permalink
feat(DataStore): DisableRealTimeUpdates flag
Browse files Browse the repository at this point in the history
  • Loading branch information
lawmicha committed Nov 17, 2023
1 parent b622589 commit ad032ec
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,17 @@ extension DataStoreConfiguration {
syncMaxRecords: UInt = DataStoreConfiguration.defaultSyncMaxRecords,
syncPageSize: UInt = DataStoreConfiguration.defaultSyncPageSize,
syncExpressions: [DataStoreSyncExpression] = [],
authModeStrategy: AuthModeStrategyType = .default
authModeStrategy: AuthModeStrategyType = .default,
disableRealTimeUpdates: @escaping () -> Bool = { false}
) -> DataStoreConfiguration {
return DataStoreConfiguration(errorHandler: errorHandler,
conflictHandler: conflictHandler,
syncInterval: syncInterval,
syncMaxRecords: syncMaxRecords,
syncPageSize: syncPageSize,
syncExpressions: syncExpressions,
authModeStrategy: authModeStrategy)
authModeStrategy: authModeStrategy,
disableRealTimeUpdates: disableRealTimeUpdates)
}

/// The default configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,24 @@ public struct DataStoreConfiguration {
/// Authorization mode strategy
public var authModeStrategyType: AuthModeStrategyType

public let disableRealTimeUpdates: () -> Bool

init(errorHandler: @escaping DataStoreErrorHandler,
conflictHandler: @escaping DataStoreConflictHandler,
syncInterval: TimeInterval,
syncMaxRecords: UInt,
syncPageSize: UInt,
syncExpressions: [DataStoreSyncExpression],
authModeStrategy: AuthModeStrategyType = .default) {
authModeStrategy: AuthModeStrategyType = .default,
disableRealTimeUpdates: @escaping () -> Bool = { false } ) {
self.errorHandler = errorHandler
self.conflictHandler = conflictHandler
self.syncInterval = syncInterval
self.syncMaxRecords = syncMaxRecords
self.syncPageSize = syncPageSize
self.syncExpressions = syncExpressions
self.authModeStrategyType = authModeStrategy
self.disableRealTimeUpdates = disableRealTimeUpdates
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
authModeStrategy: resolvedAuthStrategy)

let reconciliationQueueFactory = reconciliationQueueFactory ??
AWSIncomingEventReconciliationQueue.init(modelSchemas:api:storageAdapter:syncExpressions:auth:authModeStrategy:modelReconciliationQueueFactory:)
AWSIncomingEventReconciliationQueue.init(modelSchemas:api:storageAdapter:syncExpressions:auth:authModeStrategy:modelReconciliationQueueFactory:disableRealTimeUpdates:)

let initialSyncOrchestratorFactory = initialSyncOrchestratorFactory ??
AWSInitialSyncOrchestrator.init(dataStoreConfiguration:authModeStrategy:api:reconciliationQueue:storageAdapter:)
Expand Down Expand Up @@ -289,7 +289,8 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior {
dataStoreConfiguration.syncExpressions,
auth,
authModeStrategy,
nil)
nil,
dataStoreConfiguration.disableRealTimeUpdates)
reconciliationQueueSink = reconciliationQueue?
.publisher
.sink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import AWSPluginsCore
import Combine
import Foundation

typealias DisableRealTimeUpdates = () -> Bool

// Used for testing:
typealias IncomingEventReconciliationQueueFactory =
([ModelSchema],
Expand All @@ -18,7 +20,8 @@ typealias IncomingEventReconciliationQueueFactory =
[DataStoreSyncExpression],
AuthCategoryBehavior?,
AuthModeStrategy,
ModelReconciliationQueueFactory?
ModelReconciliationQueueFactory?,
@escaping DisableRealTimeUpdates
) async -> IncomingEventReconciliationQueue

final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueue {
Expand Down Expand Up @@ -48,7 +51,8 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
syncExpressions: [DataStoreSyncExpression],
auth: AuthCategoryBehavior? = nil,
authModeStrategy: AuthModeStrategy,
modelReconciliationQueueFactory: ModelReconciliationQueueFactory? = nil) async {
modelReconciliationQueueFactory: ModelReconciliationQueueFactory? = nil,
disableRealTimeUpdates: @escaping () -> Bool = { false } ) async {
self.modelSchemasCount = modelSchemas.count
self.modelReconciliationQueueSinks.set([:])
self.eventReconciliationQueueTopic = CurrentValueSubject<IncomingEventReconciliationQueueEvent, DataStoreError>(.idle)
Expand Down Expand Up @@ -78,13 +82,13 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
continue
}
let queue = await self.modelReconciliationQueueFactory(modelSchema,
storageAdapter,
api,
reconcileAndSaveQueue,
modelPredicate,
auth,
authModeStrategy,
nil)
storageAdapter,
api,
reconcileAndSaveQueue,
modelPredicate,
auth,
authModeStrategy,
disableRealTimeUpdates() ? OperationDisabledIncomingSubscriptionEventPublisher() : nil)

reconciliationQueues.with { reconciliationQueues in
reconciliationQueues[modelName] = queue
Expand Down Expand Up @@ -190,14 +194,15 @@ extension AWSIncomingEventReconciliationQueue: DefaultLogger {

// MARK: - Static factory
extension AWSIncomingEventReconciliationQueue {
static let factory: IncomingEventReconciliationQueueFactory = { modelSchemas, api, storageAdapter, syncExpressions, auth, authModeStrategy, _ in
static let factory: IncomingEventReconciliationQueueFactory = { modelSchemas, api, storageAdapter, syncExpressions, auth, authModeStrategy, _, disableRealTimeUpdates in
await AWSIncomingEventReconciliationQueue(modelSchemas: modelSchemas,
api: api,
storageAdapter: storageAdapter,
syncExpressions: syncExpressions,
auth: auth,
authModeStrategy: authModeStrategy,
modelReconciliationQueueFactory: nil)
api: api,
storageAdapter: storageAdapter,
syncExpressions: syncExpressions,
auth: auth,
authModeStrategy: authModeStrategy,
modelReconciliationQueueFactory: nil,
disableRealTimeUpdates: disableRealTimeUpdates)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Amplify
import AWSPluginsCore
import Combine

final class OperationDisabledIncomingSubscriptionEventPublisher: IncomingSubscriptionEventPublisher {

private let subscriptionEventSubject: PassthroughSubject<IncomingSubscriptionEventPublisherEvent, DataStoreError>

var publisher: AnyPublisher<IncomingSubscriptionEventPublisherEvent, DataStoreError> {
return subscriptionEventSubject.eraseToAnyPublisher()
}

init() {
self.subscriptionEventSubject = PassthroughSubject<IncomingSubscriptionEventPublisherEvent, DataStoreError>()

let apiError = APIError.operationError(AppSyncErrorType.operationDisabled.rawValue, "", nil)
let dataStoreError = DataStoreError.api(apiError, nil)
subscriptionEventSubject.send(completion: .failure(dataStoreError))

}

func cancel() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
return
}
if case let .api(error, _) = dataStoreError,
case let APIError.operationError(_, _, underlyingError) = error,
isOperationDisabledError(underlyingError) {
case let APIError.operationError(errorMessage, _, underlyingError) = error,
isOperationDisabledError(errorMessage, underlyingError) {
log.verbose("[InitializeSubscription.3] AWSModelReconciliationQueue determined isOperationDisabledError \(modelSchema.name)")
modelReconciliationQueueSubject.send(.disconnected(modelName: modelSchema.name, reason: .operationDisabled))
return
Expand Down Expand Up @@ -284,7 +284,12 @@ extension AWSModelReconciliationQueue {
return false
}

private func isOperationDisabledError(_ error: Error?) -> Bool {
private func isOperationDisabledError(_ errorMessage: String?, _ error: Error?) -> Bool {
if let errorMessage = errorMessage,
case .operationDisabled = AppSyncErrorType(errorMessage) {
return true
}

if let responseError = error as? GraphQLResponseError<ResponseType>,
let graphQLError = graphqlErrors(from: responseError)?.first,
let errorTypeValue = errorTypeValueFrom(graphQLError: graphQLError),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import Combine
@testable import AWSDataStorePlugin

class MockAWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueue {
static let factory: IncomingEventReconciliationQueueFactory = { modelSchemas, api, storageAdapter, syncExpressions, auth, _, _ in
static let factory: IncomingEventReconciliationQueueFactory = { modelSchemas, api, storageAdapter, syncExpressions, auth, _, _, _ in
MockAWSIncomingEventReconciliationQueue(modelSchemas: modelSchemas,
api: api,
storageAdapter: storageAdapter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,99 @@ class DataStoreConnectionScenario1Tests: SyncEngineIntegrationTestBase {
let version: String = "1"
}


// MARK: - Disable Real Time Updates

func testStartAndSync() async throws {
await setUp(withModels: TestModelRegistration(),
logLevel: .verbose,
dataStoreConfiguration: .custom(syncMaxRecords: 100, disableRealTimeUpdates: { true }))
try await startAmplifyAndWaitForSync()
}

func testStartAndSyncAndRestartAndSync() async throws {
var disabledRealTimeUpdates = true
let disableRealTimeUpdates = {
disabledRealTimeUpdates
}
await setUp(withModels: TestModelRegistration(),
logLevel: .verbose,
dataStoreConfiguration: .custom(syncMaxRecords: 100, disableRealTimeUpdates: disableRealTimeUpdates))
try await startAmplifyAndWaitForSync()
disabledRealTimeUpdates = false
try await Amplify.DataStore.stop()
try await Amplify.DataStore.start()

let eventReceived = expectation(description: "DataStore ready event")
let sink = Amplify.Hub.publisher(for: .dataStore)
.filter { $0.eventName == HubPayload.EventName.DataStore.ready }
.sink { _ in
eventReceived.fulfill()
}

try await Amplify.DataStore.start()

await fulfillment(of: [eventReceived], timeout: 10)
}

func testSaveReconciled() async throws {
await setUp(withModels: TestModelRegistration(),
logLevel: .verbose,
dataStoreConfiguration: .custom(syncMaxRecords: 100, disableRealTimeUpdates: { true }))
try await startAmplifyAndWaitForSync()

let team = Team1(name: "name1")
let project = Project1(team: team)
let syncedTeamReceived = expectation(description: "received team from sync path")
var hubListener = Amplify.Hub.listen(to: .dataStore,
eventName: HubPayload.EventName.DataStore.syncReceived) { payload in
guard let mutationEvent = payload.data as? MutationEvent else {
XCTFail("Could not cast payload to mutation event")
return
}

if let syncedTeam = try? mutationEvent.decodeModel() as? Team1,
syncedTeam == team {
syncedTeamReceived.fulfill()
}
}
guard try await HubListenerTestUtilities.waitForListener(with: hubListener, timeout: 5.0) else {
XCTFail("Listener not registered for hub")
return
}

_ = try await Amplify.DataStore.save(team)
await fulfillment(of: [syncedTeamReceived], timeout: networkTimeout)

let syncProjectReceived = expectation(description: "received project from sync path")
hubListener = Amplify.Hub.listen(to: .dataStore,
eventName: HubPayload.EventName.DataStore.syncReceived) { payload in
guard let mutationEvent = payload.data as? MutationEvent else {
XCTFail("Could not cast payload to mutation event")
return
}

if let syncedProject = try? mutationEvent.decodeModel() as? Project1,
syncedProject == project {
syncProjectReceived.fulfill()
}
}
guard try await HubListenerTestUtilities.waitForListener(with: hubListener, timeout: 5.0) else {
XCTFail("Listener not registered for hub")
return
}
_ = try await Amplify.DataStore.save(project)
await fulfillment(of: [syncProjectReceived], timeout: networkTimeout)

let queriedProjectOptional = try await Amplify.DataStore.query(Project1.self, byId: project.id)
guard let queriedProject = queriedProjectOptional else {
XCTFail("Failed")
return
}
XCTAssertEqual(queriedProject.id, project.id)
XCTAssertEqual(queriedProject.team, team)
}

func testSaveTeamAndProjectSyncToCloud() async throws {
await setUp(withModels: TestModelRegistration())
try await startAmplifyAndWaitForSync()
Expand Down

0 comments on commit ad032ec

Please sign in to comment.