Skip to content

Commit

Permalink
fix(datastore): full sync when sync predicate changes
Browse files Browse the repository at this point in the history
  • Loading branch information
lawmicha committed Nov 30, 2023
1 parent b9b5918 commit 3f8504c
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import Foundation
/// - `Temporal.Time`
/// - Warning: Although this has `public` access, it is intended for internal use and should not be used directly
/// by host applications. The behavior of this may change without warning.
public protocol Persistable {}
public protocol Persistable: Encodable {}

extension Bool: Persistable {}
extension Double: Persistable {}
Expand Down
60 changes: 58 additions & 2 deletions Amplify/Categories/DataStore/Query/QueryOperator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import Foundation

public enum QueryOperator {
public enum QueryOperator: Encodable {
case notEqual(_ value: Persistable?)
case equals(_ value: Persistable?)
case lessOrEqual(_ value: Persistable)
Expand All @@ -18,7 +18,7 @@ public enum QueryOperator {
case notContains(_ value: String)
case between(start: Persistable, end: Persistable)
case beginsWith(_ value: String)

public func evaluate(target: Any) -> Bool {
switch self {
case .notEqual(let predicateValue):
Expand Down Expand Up @@ -51,4 +51,60 @@ public enum QueryOperator {
}
return false
}

private enum CodingKeys: String, CodingKey {
case type
case value
case start
case end
}

public func encode(to encoder: Encoder) throws {
var container = encoder.container(keyedBy: CodingKeys.self)

switch self {
case .notEqual(let value):
try container.encode("notEqual", forKey: .type)
if let value = value {
try container.encode(value, forKey: .value)
}
case .equals(let value):
try container.encode("equals", forKey: .type)
if let value = value {
try container.encode(value, forKey: .value)
}
case .lessOrEqual(let value):
try container.encode("lessOrEqual", forKey: .type)
try container.encode(value, forKey: .value)

case .lessThan(let value):
try container.encode("lessThan", forKey: .type)
try container.encode(value, forKey: .value)

case .greaterOrEqual(let value):
try container.encode("greaterOrEqual", forKey: .type)
try container.encode(value, forKey: .value)

case .greaterThan(let value):
try container.encode("greaterThan", forKey: .type)
try container.encode(value, forKey: .value)

case .contains(let value):
try container.encode("contains", forKey: .type)
try container.encode(value, forKey: .value)

case .notContains(let value):
try container.encode("notContains", forKey: .type)
try container.encode(value, forKey: .value)

case .between(let start, let end):
try container.encode("between", forKey: .type)
try container.encode(start, forKey: .start)
try container.encode(end, forKey: .end)

case .beginsWith(let value):
try container.encode("beginsWith", forKey: .type)
try container.encode(value, forKey: .value)
}
}
}
38 changes: 33 additions & 5 deletions Amplify/Categories/DataStore/Query/QueryPredicate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import Foundation

/// Protocol that indicates concrete types conforming to it can be used a predicate member.
public protocol QueryPredicate: Evaluable {}
public protocol QueryPredicate: Evaluable, Encodable {}

public enum QueryPredicateGroupType: String {
public enum QueryPredicateGroupType: String, Encodable {
case and
case or
case not
Expand All @@ -26,14 +26,14 @@ public func not<Predicate: QueryPredicate>(_ predicate: Predicate) -> QueryPredi
/// The case `.all` is a predicate used as an argument to select all of a single modeltype. We
/// chose `.all` instead of `nil` because we didn't want to use the implicit nature of `nil` to
/// specify an action applies to an entire data set.
public enum QueryPredicateConstant: QueryPredicate {
public enum QueryPredicateConstant: QueryPredicate, Encodable {
case all
public func evaluate(target: Model) -> Bool {
return true
}
}

public class QueryPredicateGroup: QueryPredicate {
public class QueryPredicateGroup: QueryPredicate, Encodable {
public internal(set) var type: QueryPredicateGroupType
public internal(set) var predicates: [QueryPredicate]

Expand Down Expand Up @@ -92,9 +92,37 @@ public class QueryPredicateGroup: QueryPredicate {
return !predicate.evaluate(target: target)
}
}

// MARK: - Encodable conformance

private enum CodingKeys: String, CodingKey {
case type
case predicates
}

struct AnyQueryPredicate: Encodable {
private let _encode: (Encoder) throws -> Void

init(_ base: QueryPredicate) {
_encode = base.encode
}

func encode(to encoder: Encoder) throws {
try _encode(encoder)
}
}

public func encode(to encoder: Encoder) throws {
var container = encoder.container(keyedBy: CodingKeys.self)
try container.encode(type.rawValue, forKey: .type)

let anyPredicates = predicates.map(AnyQueryPredicate.init)
try container.encode(anyPredicates, forKey: .predicates)
}

}

public class QueryPredicateOperation: QueryPredicate {
public class QueryPredicateOperation: QueryPredicate, Encodable {

public let field: String
public let `operator`: QueryOperator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ extension ModelSyncMetadata {
public enum CodingKeys: String, ModelKey {
case id
case lastSync
case syncPredicate
}

public static let keys = CodingKeys.self
Expand All @@ -27,7 +28,8 @@ extension ModelSyncMetadata {

definition.fields(
.id(),
.field(keys.lastSync, is: .optional, ofType: .int)
.field(keys.lastSync, is: .optional, ofType: .int),
.field(keys.syncPredicate, is: .optional, ofType: .string)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ public struct ModelSyncMetadata: Model {

/// The timestamp (in Unix seconds) at which the last sync was started, as reported by the service
public var lastSync: Int?

/// The sync predicate for this model, extracted out from the sync expression.
public var syncPredicate: String?

public init(id: String,
lastSync: Int?) {
lastSync: Int?,
syncPredicate: String? = nil) {
self.id = id
self.lastSync = lastSync
self.syncPredicate = syncPredicate
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,32 @@ final class InitialSyncOperation: AsynchronousOperation {
private var syncPageSize: UInt {
return dataStoreConfiguration.syncPageSize
}


private var syncPredicate: QueryPredicate? {
return dataStoreConfiguration.syncExpressions.first {
$0.modelSchema.name == self.modelSchema.name
}?.modelPredicate()
}

private var syncPredicateString: String? {
guard let syncPredicate = syncPredicate,
let data = try? syncPredicateEncoder.encode(syncPredicate) else {
return nil
}
return String(data: data, encoding: .utf8)
}

private lazy var _syncPredicateEncoder: JSONEncoder = {
var encoder = JSONEncoder()
encoder.dateEncodingStrategy = ModelDateFormatting.encodingStrategy
encoder.outputFormatting = [.sortedKeys]
return encoder
}()

var syncPredicateEncoder: JSONEncoder {
_syncPredicateEncoder
}

private let initialSyncOperationTopic: PassthroughSubject<InitialSyncOperationEvent, DataStoreError>
var publisher: AnyPublisher<InitialSyncOperationEvent, DataStoreError> {
return initialSyncOperationTopic.eraseToAnyPublisher()
Expand Down Expand Up @@ -59,36 +84,13 @@ final class InitialSyncOperation: AsynchronousOperation {
}

log.info("Beginning sync for \(modelSchema.name)")
let lastSyncTime = getLastSyncTime()
let syncType: SyncType = lastSyncTime == nil ? .fullSync : .deltaSync
initialSyncOperationTopic.send(.started(modelName: modelSchema.name, syncType: syncType))
let lastSyncMetadata = getLastSyncMetadata()
let lastSyncTime = getLastSyncTime(lastSyncMetadata)
Task {
await query(lastSyncTime: lastSyncTime)
}
}

private func getLastSyncTime() -> Int? {
guard !isCancelled else {
finish(result: .successfulVoid)
return nil
}

let lastSyncMetadata = getLastSyncMetadata()
guard let lastSync = lastSyncMetadata?.lastSync else {
return nil
}

let lastSyncDate = Date(timeIntervalSince1970: TimeInterval.milliseconds(Double(lastSync)))
let secondsSinceLastSync = (lastSyncDate.timeIntervalSinceNow * -1)
if secondsSinceLastSync < 0 {
log.info("lastSyncTime was in the future, assuming base query")
return nil
}

let shouldDoDeltaQuery = secondsSinceLastSync < dataStoreConfiguration.syncInterval
return shouldDoDeltaQuery ? lastSync : nil
}

private func getLastSyncMetadata() -> ModelSyncMetadata? {
guard !isCancelled else {
finish(result: .successfulVoid)
Expand All @@ -108,6 +110,47 @@ final class InitialSyncOperation: AsynchronousOperation {
return nil
}
}

private func getLastSyncTime(_ lastSyncMetadata: ModelSyncMetadata?) -> Int? {
let syncType: SyncType
let lastSyncTime: Int?
if syncPredicateChanged(self.syncPredicateString, lastSyncMetadata?.syncPredicate) {
log.info("SyncPredicate for \(modelSchema.name) changed, performing full sync.")
lastSyncTime = nil
syncType = .fullSync
} else {
lastSyncTime = getLastSyncTime(lastSync: lastSyncMetadata?.lastSync)
syncType = lastSyncTime == nil ? .fullSync : .deltaSync
}
initialSyncOperationTopic.send(.started(modelName: modelSchema.name, syncType: syncType))
return lastSyncTime
}

private func syncPredicateChanged(_ lastSyncPredicate: String?, _ currentSyncPredicate: String?) -> Bool {
switch (lastSyncPredicate, currentSyncPredicate) {
case (.some, .some):
return lastSyncPredicate != currentSyncPredicate
case (.some, .none), (.none, .some):
return true
case (.none, .none):
return false
}
}

private func getLastSyncTime(lastSync: Int?) -> Int? {
guard let lastSync = lastSync else {
return nil
}
let lastSyncDate = Date(timeIntervalSince1970: TimeInterval.milliseconds(Double(lastSync)))
let secondsSinceLastSync = (lastSyncDate.timeIntervalSinceNow * -1)
if secondsSinceLastSync < 0 {
log.info("lastSyncTime was in the future, assuming base query")
return nil
}

let shouldDoDeltaQuery = secondsSinceLastSync < dataStoreConfiguration.syncInterval
return shouldDoDeltaQuery ? lastSync : nil
}

private func query(lastSyncTime: Int?, nextToken: String? = nil) async {
guard !isCancelled else {
Expand All @@ -121,11 +164,6 @@ final class InitialSyncOperation: AsynchronousOperation {
}
let minSyncPageSize = Int(min(syncMaxRecords - recordsReceived, syncPageSize))
let limit = minSyncPageSize < 0 ? Int(syncPageSize) : minSyncPageSize
let syncExpression = dataStoreConfiguration.syncExpressions.first {
$0.modelSchema.name == modelSchema.name
}
let queryPredicate = syncExpression?.modelPredicate()

let completionListener: GraphQLOperation<SyncQueryResult>.ResultListener = { result in
switch result {
case .failure(let apiError):
Expand All @@ -146,7 +184,7 @@ final class InitialSyncOperation: AsynchronousOperation {

RetryableGraphQLOperation(requestFactory: {
GraphQLRequest<SyncQueryResult>.syncQuery(modelSchema: self.modelSchema,
where: queryPredicate,
where: self.syncPredicate,
limit: limit,
nextToken: nextToken,
lastSync: lastSyncTime,
Expand Down Expand Up @@ -208,8 +246,10 @@ final class InitialSyncOperation: AsynchronousOperation {
finish(result: .failure(DataStoreError.nilStorageAdapter()))
return
}

let syncMetadata = ModelSyncMetadata(id: modelSchema.name, lastSync: lastSyncTime)

let syncMetadata = ModelSyncMetadata(id: modelSchema.name,
lastSync: lastSyncTime,
syncPredicate: syncPredicateString)
storageAdapter.save(syncMetadata, condition: nil, eagerLoad: true) { result in
switch result {
case .failure(let dataStoreError):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ import XCTest

class QueryPredicateTests: XCTestCase {

private lazy var _encoder: JSONEncoder = {
var encoder = JSONEncoder()
encoder.dateEncodingStrategy = ModelDateFormatting.encodingStrategy
encoder.outputFormatting = [.sortedKeys]
return encoder
}()

var encoder: JSONEncoder {
_encoder
}

/// it should create a simple `QueryPredicateOperation`
func testSingleQueryPredicateOperation() {
let post = Post.keys
Expand All @@ -36,6 +47,10 @@ class QueryPredicateTests: XCTestCase {
)

XCTAssertEqual(predicate, expected)

let predicateString = String(data: try! encoder.encode(predicate), encoding: .utf8)!
let expectedString = String(data: try! encoder.encode(expected), encoding: .utf8)!
XCTAssert(predicateString == expectedString)
}

/// it should create a valid `QueryPredicateOperation` with nested predicates
Expand Down Expand Up @@ -68,6 +83,10 @@ class QueryPredicateTests: XCTestCase {
]
)
XCTAssert(predicate == expected)

let predicateString = String(data: try! encoder.encode(predicate), encoding: .utf8)!
let expectedString = String(data: try! encoder.encode(expected), encoding: .utf8)!
XCTAssert(predicateString == expectedString)
}

/// it should verify that predicates created using functions match their operators
Expand Down Expand Up @@ -144,6 +163,9 @@ class QueryPredicateTests: XCTestCase {
&& !(post.updatedAt == nil)

XCTAssertEqual(funcationPredicate, operatorPredicate)

let funcationPredicateString = String(data: try! encoder.encode(funcationPredicate), encoding: .utf8)!
let operatorPredicateString = String(data: try! encoder.encode(operatorPredicate), encoding: .utf8)!
XCTAssert(funcationPredicateString == operatorPredicateString)
}

}

0 comments on commit 3f8504c

Please sign in to comment.