Skip to content

Commit

Permalink
fix: added unit test for sync pagelimit
Browse files Browse the repository at this point in the history
  • Loading branch information
murali-shris committed Dec 17, 2024
1 parent bf3c307 commit 22a1e4c
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,17 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
: null;
// Get entries to sync
Iterator<MapEntry<String, CommitEntry>> commitEntryIterator;
if (syncLimit != null) {
commitEntryIterator = atCommitLog!.getEntries(
int.parse(verbParams[AtConstants.fromCommitSequence]!) + 1,
regex: verbParams['regex'],
skipDeletesUntil: skipDeletesUntil,
limit: syncLimit);
} else {
// for backward compatibility if some old client sets isPaginated to false
// (syncLimit will not be set) in SyncVerbBuilder
commitEntryIterator = atCommitLog!.getEntries(
int.parse(verbParams[AtConstants.fromCommitSequence]!) + 1,
regex: verbParams['regex'],
skipDeletesUntil: skipDeletesUntil);
}
// if client doesn't pass syncLimit set the default value from server
syncLimit ??= AtSecondaryConfig.syncPageLimit;
commitEntryIterator = atCommitLog!.getEntries(
int.parse(verbParams[AtConstants.fromCommitSequence]!) + 1,
regex: verbParams['regex'],
skipDeletesUntil: skipDeletesUntil,
limit: syncLimit);

List<KeyStoreEntry> syncResponse = [];
await prepareResponse(capacity, syncResponse, commitEntryIterator,
await prepareResponse(
capacity, syncLimit, syncResponse, commitEntryIterator,
enrollmentId:
(atConnection.metaData as InboundConnectionMetadata).enrollmentId);

Expand All @@ -75,8 +69,11 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
/// 1. there is at least one item in [syncResponse], and the response length is greater than [desiredMaxSyncResponseLength], or
/// 2. there are [AtSecondaryConfig.syncPageLimit] items in the [syncResponse]
@visibleForTesting
Future<void> prepareResponse(int desiredMaxSyncResponseLength,
List<KeyStoreEntry> syncResponse, Iterator<dynamic> commitEntryIterator,
Future<void> prepareResponse(
int desiredMaxSyncResponseLength,
int syncPageLimit,
List<KeyStoreEntry> syncResponse,
Iterator<dynamic> commitEntryIterator,
{String? enrollmentId}) async {
int currentResponseLength = 0;
Map<String, String> enrolledNamespaces = {};
Expand All @@ -87,8 +84,8 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
.get(enrollmentId))
.namespaces;
}
while (commitEntryIterator.moveNext() &&
syncResponse.length < AtSecondaryConfig.syncPageLimit) {
while (
commitEntryIterator.moveNext() && syncResponse.length < syncPageLimit) {
var atKeyType = AtKey.getKeyType(commitEntryIterator.current.key,
enforceNameSpace: false);
if (atKeyType == KeyType.invalidKey) {
Expand Down
159 changes: 145 additions & 14 deletions packages/at_secondary_server/test/sync_verb_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'package:at_chops/at_chops.dart';
import 'package:at_commons/at_commons.dart';
import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart';
import 'package:at_secondary/src/connection/inbound/inbound_connection_impl.dart';
import 'package:at_secondary/src/server/at_secondary_config.dart';
import 'package:at_secondary/src/server/at_secondary_impl.dart';
import 'package:at_secondary/src/utils/handler_util.dart';
import 'package:at_secondary/src/utils/secondary_util.dart';
Expand Down Expand Up @@ -161,8 +162,8 @@ void main() {
assert(atCommitLog.entriesCount() > 0);

List<KeyStoreEntry> syncResponse = [];
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(0));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(0));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key_alpha@alice');
});
Expand Down Expand Up @@ -192,19 +193,19 @@ void main() {

// Since syncResponse already has an entry, and the 'capacity' is 0, then the next entry
// should not be added to the syncResponse
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(0));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(0));
expect(syncResponse, [entry]);

syncResponse.clear();
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(0));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(0));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key_alpha@alice');

syncResponse.clear();
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(1));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(1));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key2_beta@alice');
});
Expand Down Expand Up @@ -236,7 +237,10 @@ void main() {
syncResponse.add(entry);

await verbHandler.prepareResponse(
10 * 1024 * 1024, syncResponse, atCommitLog.getEntries(0));
10 * 1024 * 1024,
AtSecondaryConfig.syncPageLimit,
syncResponse,
atCommitLog.getEntries(0));

// Expecting that all the entries in the commitLog have been
// added to syncResponse
Expand All @@ -262,24 +266,151 @@ void main() {
assert(atCommitLog.entriesCount() == 2);

List<KeyStoreEntry> syncResponse = [];
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(0));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(0));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key1@alice');

syncResponse.clear();
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(1));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(1));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key2@alice');

// test with empty iterator
syncResponse.clear();
await verbHandler.prepareResponse(
10 * 1024 * 1024, syncResponse, atCommitLog.getEntries(2));
10 * 1024 * 1024,
AtSecondaryConfig.syncPageLimit,
syncResponse,
atCommitLog.getEntries(2));
expect(syncResponse.length, 0);
});

test(
'A test to verify sync returns default number of entries when limit is not passed',
() async {
// Add data to commit log
var atCommitLog =
await AtCommitLogManagerImpl.getInstance().getCommitLog('@alice');
await atCommitLog?.commit('phone.wavi@alice', CommitOp.UPDATE);
//Add data to keystore
var secondaryKeyStore = SecondaryPersistenceStoreFactory.getInstance()
.getSecondaryPersistenceStore('@alice');
var metadata = (AtMetaData()
..ttl = 10000
..ttb = 1000
..ttr = 100
..isBinary = false
..encoding = 'base64'
..pubKeyHash = PublicKeyHash('dummy_hash', HashingAlgoType.sha512.name)
..pubKeyCS = 'dummy_pub_key_cs');
for (int i = 1; i <= 40; i++) {
await secondaryKeyStore?.getSecondaryKeyStore()?.put(
'random_$i.wavi@alice',
AtData()
..data = i.toString()
..metaData = metadata);
}

verbHandler = SyncProgressiveVerbHandler(keyStoreManager.getKeyStore());
var response = Response();
var verbParams = HashMap<String, String>();
verbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '0');
var inBoundSessionId = '123';
var atConnection = InboundConnectionImpl(mockSocket, inBoundSessionId);
await verbHandler.processVerb(response, verbParams, atConnection);

var syncResponseList = jsonDecode(response.data!);
expect(syncResponseList.length, 25);
for (int i = 0; i < syncResponseList.length; i++) {
expect(syncResponseList[i]['atKey'], 'random_${i + 1}.wavi@alice');
}
});

test(
'A test to verify sync returns correct number of entries when limit (less than default size) is passed',
() async {
// Add data to commit log
var atCommitLog =
await AtCommitLogManagerImpl.getInstance().getCommitLog('@alice');
await atCommitLog?.commit('phone.wavi@alice', CommitOp.UPDATE);
//Add data to keystore
var secondaryKeyStore = SecondaryPersistenceStoreFactory.getInstance()
.getSecondaryPersistenceStore('@alice');
var metadata = (AtMetaData()
..ttl = 10000
..ttb = 1000
..ttr = 100
..isBinary = false
..encoding = 'base64'
..pubKeyHash = PublicKeyHash('dummy_hash', HashingAlgoType.sha512.name)
..pubKeyCS = 'dummy_pub_key_cs');
for (int i = 1; i <= 40; i++) {
await secondaryKeyStore?.getSecondaryKeyStore()?.put(
'random_$i.wavi@alice',
AtData()
..data = i.toString()
..metaData = metadata);
}

verbHandler = SyncProgressiveVerbHandler(keyStoreManager.getKeyStore());
var response = Response();
var verbParams = HashMap<String, String>();
verbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '0');
verbParams.putIfAbsent(AtConstants.syncLimit, () => '12');
var inBoundSessionId = '123';
var atConnection = InboundConnectionImpl(mockSocket, inBoundSessionId);
await verbHandler.processVerb(response, verbParams, atConnection);

var syncResponseList = jsonDecode(response.data!);
expect(syncResponseList.length, 12);
for (int i = 0; i < syncResponseList.length; i++) {
expect(syncResponseList[i]['atKey'], 'random_${i + 1}.wavi@alice');
}
});
test(
'A test to verify sync returns correct number of entries when limit (greater than default size) is passed',
() async {
// Add data to commit log
var atCommitLog =
await AtCommitLogManagerImpl.getInstance().getCommitLog('@alice');
await atCommitLog?.commit('phone.wavi@alice', CommitOp.UPDATE);
//Add data to keystore
var secondaryKeyStore = SecondaryPersistenceStoreFactory.getInstance()
.getSecondaryPersistenceStore('@alice');
var metadata = (AtMetaData()
..ttl = 10000
..ttb = 1000
..ttr = 100
..isBinary = false
..encoding = 'base64'
..pubKeyHash = PublicKeyHash('dummy_hash', HashingAlgoType.sha512.name)
..pubKeyCS = 'dummy_pub_key_cs');
for (int i = 1; i <= 40; i++) {
await secondaryKeyStore?.getSecondaryKeyStore()?.put(
'random_$i.wavi@alice',
AtData()
..data = i.toString()
..metaData = metadata);
}

verbHandler = SyncProgressiveVerbHandler(keyStoreManager.getKeyStore());
var response = Response();
var verbParams = HashMap<String, String>();
verbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '0');
verbParams.putIfAbsent(AtConstants.syncLimit, () => '35');
var inBoundSessionId = '123';
var atConnection = InboundConnectionImpl(mockSocket, inBoundSessionId);
await verbHandler.processVerb(response, verbParams, atConnection);

var syncResponseList = jsonDecode(response.data!);
expect(syncResponseList.length, 35);
for (int i = 0; i < syncResponseList.length; i++) {
expect(syncResponseList[i]['atKey'], 'random_${i + 1}.wavi@alice');
}
});

tearDown(() async => await tearDownFunc());
});
}
Expand Down

0 comments on commit 22a1e4c

Please sign in to comment.