diff --git a/packages/at_secondary_server/lib/src/verb/handler/sync_progressive_verb_handler.dart b/packages/at_secondary_server/lib/src/verb/handler/sync_progressive_verb_handler.dart index d1654948e..08afdabeb 100644 --- a/packages/at_secondary_server/lib/src/verb/handler/sync_progressive_verb_handler.dart +++ b/packages/at_secondary_server/lib/src/verb/handler/sync_progressive_verb_handler.dart @@ -48,23 +48,17 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler { : null; // Get entries to sync Iterator> commitEntryIterator; - if (syncLimit != null) { - commitEntryIterator = atCommitLog!.getEntries( - int.parse(verbParams[AtConstants.fromCommitSequence]!) + 1, - regex: verbParams['regex'], - skipDeletesUntil: skipDeletesUntil, - limit: syncLimit); - } else { - // for backward compatibility if some old client sets isPaginated to false - // (syncLimit will not be set) in SyncVerbBuilder - commitEntryIterator = atCommitLog!.getEntries( - int.parse(verbParams[AtConstants.fromCommitSequence]!) + 1, - regex: verbParams['regex'], - skipDeletesUntil: skipDeletesUntil); - } + // if client doesn't pass syncLimit set the default value from server + syncLimit ??= AtSecondaryConfig.syncPageLimit; + commitEntryIterator = atCommitLog!.getEntries( + int.parse(verbParams[AtConstants.fromCommitSequence]!) + 1, + regex: verbParams['regex'], + skipDeletesUntil: skipDeletesUntil, + limit: syncLimit); List syncResponse = []; - await prepareResponse(capacity, syncResponse, commitEntryIterator, + await prepareResponse( + capacity, syncLimit, syncResponse, commitEntryIterator, enrollmentId: (atConnection.metaData as InboundConnectionMetadata).enrollmentId); @@ -75,8 +69,11 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler { /// 1. there is at least one item in [syncResponse], and the response length is greater than [desiredMaxSyncResponseLength], or /// 2. there are [AtSecondaryConfig.syncPageLimit] items in the [syncResponse] @visibleForTesting - Future prepareResponse(int desiredMaxSyncResponseLength, - List syncResponse, Iterator commitEntryIterator, + Future prepareResponse( + int desiredMaxSyncResponseLength, + int syncPageLimit, + List syncResponse, + Iterator commitEntryIterator, {String? enrollmentId}) async { int currentResponseLength = 0; Map enrolledNamespaces = {}; @@ -87,8 +84,8 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler { .get(enrollmentId)) .namespaces; } - while (commitEntryIterator.moveNext() && - syncResponse.length < AtSecondaryConfig.syncPageLimit) { + while ( + commitEntryIterator.moveNext() && syncResponse.length < syncPageLimit) { var atKeyType = AtKey.getKeyType(commitEntryIterator.current.key, enforceNameSpace: false); if (atKeyType == KeyType.invalidKey) { diff --git a/packages/at_secondary_server/test/sync_verb_test.dart b/packages/at_secondary_server/test/sync_verb_test.dart index 0b1e9fc9f..57ac9bb0a 100644 --- a/packages/at_secondary_server/test/sync_verb_test.dart +++ b/packages/at_secondary_server/test/sync_verb_test.dart @@ -6,6 +6,7 @@ import 'package:at_chops/at_chops.dart'; import 'package:at_commons/at_commons.dart'; import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart'; import 'package:at_secondary/src/connection/inbound/inbound_connection_impl.dart'; +import 'package:at_secondary/src/server/at_secondary_config.dart'; import 'package:at_secondary/src/server/at_secondary_impl.dart'; import 'package:at_secondary/src/utils/handler_util.dart'; import 'package:at_secondary/src/utils/secondary_util.dart'; @@ -161,8 +162,8 @@ void main() { assert(atCommitLog.entriesCount() > 0); List syncResponse = []; - await verbHandler.prepareResponse( - 0, syncResponse, atCommitLog.getEntries(0)); + await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit, + syncResponse, atCommitLog.getEntries(0)); expect(syncResponse.length, 1); expect(syncResponse[0].key, 'test_key_alpha@alice'); }); @@ -192,19 +193,19 @@ void main() { // Since syncResponse already has an entry, and the 'capacity' is 0, then the next entry // should not be added to the syncResponse - await verbHandler.prepareResponse( - 0, syncResponse, atCommitLog.getEntries(0)); + await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit, + syncResponse, atCommitLog.getEntries(0)); expect(syncResponse, [entry]); syncResponse.clear(); - await verbHandler.prepareResponse( - 0, syncResponse, atCommitLog.getEntries(0)); + await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit, + syncResponse, atCommitLog.getEntries(0)); expect(syncResponse.length, 1); expect(syncResponse[0].key, 'test_key_alpha@alice'); syncResponse.clear(); - await verbHandler.prepareResponse( - 0, syncResponse, atCommitLog.getEntries(1)); + await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit, + syncResponse, atCommitLog.getEntries(1)); expect(syncResponse.length, 1); expect(syncResponse[0].key, 'test_key2_beta@alice'); }); @@ -236,7 +237,10 @@ void main() { syncResponse.add(entry); await verbHandler.prepareResponse( - 10 * 1024 * 1024, syncResponse, atCommitLog.getEntries(0)); + 10 * 1024 * 1024, + AtSecondaryConfig.syncPageLimit, + syncResponse, + atCommitLog.getEntries(0)); // Expecting that all the entries in the commitLog have been // added to syncResponse @@ -262,24 +266,151 @@ void main() { assert(atCommitLog.entriesCount() == 2); List syncResponse = []; - await verbHandler.prepareResponse( - 0, syncResponse, atCommitLog.getEntries(0)); + await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit, + syncResponse, atCommitLog.getEntries(0)); expect(syncResponse.length, 1); expect(syncResponse[0].key, 'test_key1@alice'); syncResponse.clear(); - await verbHandler.prepareResponse( - 0, syncResponse, atCommitLog.getEntries(1)); + await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit, + syncResponse, atCommitLog.getEntries(1)); expect(syncResponse.length, 1); expect(syncResponse[0].key, 'test_key2@alice'); // test with empty iterator syncResponse.clear(); await verbHandler.prepareResponse( - 10 * 1024 * 1024, syncResponse, atCommitLog.getEntries(2)); + 10 * 1024 * 1024, + AtSecondaryConfig.syncPageLimit, + syncResponse, + atCommitLog.getEntries(2)); expect(syncResponse.length, 0); }); + test( + 'A test to verify sync returns default number of entries when limit is not passed', + () async { + // Add data to commit log + var atCommitLog = + await AtCommitLogManagerImpl.getInstance().getCommitLog('@alice'); + await atCommitLog?.commit('phone.wavi@alice', CommitOp.UPDATE); + //Add data to keystore + var secondaryKeyStore = SecondaryPersistenceStoreFactory.getInstance() + .getSecondaryPersistenceStore('@alice'); + var metadata = (AtMetaData() + ..ttl = 10000 + ..ttb = 1000 + ..ttr = 100 + ..isBinary = false + ..encoding = 'base64' + ..pubKeyHash = PublicKeyHash('dummy_hash', HashingAlgoType.sha512.name) + ..pubKeyCS = 'dummy_pub_key_cs'); + for (int i = 1; i <= 40; i++) { + await secondaryKeyStore?.getSecondaryKeyStore()?.put( + 'random_$i.wavi@alice', + AtData() + ..data = i.toString() + ..metaData = metadata); + } + + verbHandler = SyncProgressiveVerbHandler(keyStoreManager.getKeyStore()); + var response = Response(); + var verbParams = HashMap(); + verbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '0'); + var inBoundSessionId = '123'; + var atConnection = InboundConnectionImpl(mockSocket, inBoundSessionId); + await verbHandler.processVerb(response, verbParams, atConnection); + + var syncResponseList = jsonDecode(response.data!); + expect(syncResponseList.length, 25); + for (int i = 0; i < syncResponseList.length; i++) { + expect(syncResponseList[i]['atKey'], 'random_${i + 1}.wavi@alice'); + } + }); + + test( + 'A test to verify sync returns correct number of entries when limit (less than default size) is passed', + () async { + // Add data to commit log + var atCommitLog = + await AtCommitLogManagerImpl.getInstance().getCommitLog('@alice'); + await atCommitLog?.commit('phone.wavi@alice', CommitOp.UPDATE); + //Add data to keystore + var secondaryKeyStore = SecondaryPersistenceStoreFactory.getInstance() + .getSecondaryPersistenceStore('@alice'); + var metadata = (AtMetaData() + ..ttl = 10000 + ..ttb = 1000 + ..ttr = 100 + ..isBinary = false + ..encoding = 'base64' + ..pubKeyHash = PublicKeyHash('dummy_hash', HashingAlgoType.sha512.name) + ..pubKeyCS = 'dummy_pub_key_cs'); + for (int i = 1; i <= 40; i++) { + await secondaryKeyStore?.getSecondaryKeyStore()?.put( + 'random_$i.wavi@alice', + AtData() + ..data = i.toString() + ..metaData = metadata); + } + + verbHandler = SyncProgressiveVerbHandler(keyStoreManager.getKeyStore()); + var response = Response(); + var verbParams = HashMap(); + verbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '0'); + verbParams.putIfAbsent(AtConstants.syncLimit, () => '12'); + var inBoundSessionId = '123'; + var atConnection = InboundConnectionImpl(mockSocket, inBoundSessionId); + await verbHandler.processVerb(response, verbParams, atConnection); + + var syncResponseList = jsonDecode(response.data!); + expect(syncResponseList.length, 12); + for (int i = 0; i < syncResponseList.length; i++) { + expect(syncResponseList[i]['atKey'], 'random_${i + 1}.wavi@alice'); + } + }); + test( + 'A test to verify sync returns correct number of entries when limit (greater than default size) is passed', + () async { + // Add data to commit log + var atCommitLog = + await AtCommitLogManagerImpl.getInstance().getCommitLog('@alice'); + await atCommitLog?.commit('phone.wavi@alice', CommitOp.UPDATE); + //Add data to keystore + var secondaryKeyStore = SecondaryPersistenceStoreFactory.getInstance() + .getSecondaryPersistenceStore('@alice'); + var metadata = (AtMetaData() + ..ttl = 10000 + ..ttb = 1000 + ..ttr = 100 + ..isBinary = false + ..encoding = 'base64' + ..pubKeyHash = PublicKeyHash('dummy_hash', HashingAlgoType.sha512.name) + ..pubKeyCS = 'dummy_pub_key_cs'); + for (int i = 1; i <= 40; i++) { + await secondaryKeyStore?.getSecondaryKeyStore()?.put( + 'random_$i.wavi@alice', + AtData() + ..data = i.toString() + ..metaData = metadata); + } + + verbHandler = SyncProgressiveVerbHandler(keyStoreManager.getKeyStore()); + var response = Response(); + var verbParams = HashMap(); + verbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '0'); + verbParams.putIfAbsent(AtConstants.syncLimit, () => '35'); + var inBoundSessionId = '123'; + var atConnection = InboundConnectionImpl(mockSocket, inBoundSessionId); + await verbHandler.processVerb(response, verbParams, atConnection); + + var syncResponseList = jsonDecode(response.data!); + expect(syncResponseList.length, 35); + for (int i = 0; i < syncResponseList.length; i++) { + expect(syncResponseList[i]['atKey'], 'random_${i + 1}.wavi@alice'); + } + }); + tearDown(() async => await tearDownFunc()); }); }