Skip to content

Commit

Permalink
Merge pull request #2182 from atsign-foundation/sync_verb_handler_add…
Browse files Browse the repository at this point in the history
…_limit_param

fix: Sync verb handler add limit param
  • Loading branch information
gkc authored Dec 17, 2024
2 parents 52c38fe + 7200277 commit a5fb9b5
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 26 deletions.
3 changes: 3 additions & 0 deletions packages/at_secondary_server/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# 3.1.1
- fix: Store "publicKeyHash" value in the keystore
- fix: add limit param in SyncProgressiveVerbHandler
- build[deps]: Upgraded the following package:
at_commons to v5.1.2
# 3.1.0
- feat: sync skip deletes until changes
- fix: Enable persistence of the Initialization Vector for "defaultEncryptionPrivateKey" and "selfEncryptionKey" in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,25 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
// Get Commit Log Instance.
var atCommitLog = await (AtCommitLogManagerImpl.getInstance()
.getCommitLog(AtSecondaryServerImpl.getInstance().currentAtSign));
int? skipDeletesUntil = verbParams['skipDeletesUntil'] != null
? int.parse(verbParams['skipDeletesUntil']!)
int? skipDeletesUntil = verbParams[AtConstants.skipDeletesUntil] != null
? int.parse(verbParams[AtConstants.skipDeletesUntil]!)
: null;
int? syncLimit = verbParams[AtConstants.syncLimit] != null
? int.parse(verbParams[AtConstants.syncLimit]!)
: null;
// Get entries to sync
var commitEntryIterator = atCommitLog!.getEntries(
Iterator<MapEntry<String, CommitEntry>> commitEntryIterator;
// if client doesn't pass syncLimit set the default value from server
syncLimit ??= AtSecondaryConfig.syncPageLimit;
commitEntryIterator = atCommitLog!.getEntries(
int.parse(verbParams[AtConstants.fromCommitSequence]!) + 1,
regex: verbParams['regex'],
skipDeletesUntil: skipDeletesUntil);
skipDeletesUntil: skipDeletesUntil,
limit: syncLimit);

List<KeyStoreEntry> syncResponse = [];
await prepareResponse(capacity, syncResponse, commitEntryIterator,
await prepareResponse(
capacity, syncLimit, syncResponse, commitEntryIterator,
enrollmentId:
(atConnection.metaData as InboundConnectionMetadata).enrollmentId);

Expand All @@ -61,8 +69,11 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
/// 1. there is at least one item in [syncResponse], and the response length is greater than [desiredMaxSyncResponseLength], or
/// 2. there are [AtSecondaryConfig.syncPageLimit] items in the [syncResponse]
@visibleForTesting
Future<void> prepareResponse(int desiredMaxSyncResponseLength,
List<KeyStoreEntry> syncResponse, Iterator<dynamic> commitEntryIterator,
Future<void> prepareResponse(
int desiredMaxSyncResponseLength,
int syncPageLimit,
List<KeyStoreEntry> syncResponse,
Iterator<dynamic> commitEntryIterator,
{String? enrollmentId}) async {
int currentResponseLength = 0;
Map<String, String> enrolledNamespaces = {};
Expand All @@ -73,8 +84,8 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
.get(enrollmentId))
.namespaces;
}
while (commitEntryIterator.moveNext() &&
syncResponse.length < AtSecondaryConfig.syncPageLimit) {
while (
commitEntryIterator.moveNext() && syncResponse.length < syncPageLimit) {
var atKeyType = AtKey.getKeyType(commitEntryIterator.current.key,
enforceNameSpace: false);
if (atKeyType == KeyType.invalidKey) {
Expand Down
2 changes: 1 addition & 1 deletion packages/at_secondary_server/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies:
basic_utils: 5.7.0
ecdsa: 0.1.0
encrypt: 5.0.3
at_commons: 5.1.1
at_commons: 5.1.2
at_utils: 3.0.19
at_chops: 2.2.0
at_lookup: 3.0.49
Expand Down
159 changes: 145 additions & 14 deletions packages/at_secondary_server/test/sync_verb_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'package:at_chops/at_chops.dart';
import 'package:at_commons/at_commons.dart';
import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart';
import 'package:at_secondary/src/connection/inbound/inbound_connection_impl.dart';
import 'package:at_secondary/src/server/at_secondary_config.dart';
import 'package:at_secondary/src/server/at_secondary_impl.dart';
import 'package:at_secondary/src/utils/handler_util.dart';
import 'package:at_secondary/src/utils/secondary_util.dart';
Expand Down Expand Up @@ -161,8 +162,8 @@ void main() {
assert(atCommitLog.entriesCount() > 0);

List<KeyStoreEntry> syncResponse = [];
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(0));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(0));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key_alpha@alice');
});
Expand Down Expand Up @@ -192,19 +193,19 @@ void main() {

// Since syncResponse already has an entry, and the 'capacity' is 0, then the next entry
// should not be added to the syncResponse
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(0));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(0));
expect(syncResponse, [entry]);

syncResponse.clear();
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(0));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(0));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key_alpha@alice');

syncResponse.clear();
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(1));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(1));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key2_beta@alice');
});
Expand Down Expand Up @@ -236,7 +237,10 @@ void main() {
syncResponse.add(entry);

await verbHandler.prepareResponse(
10 * 1024 * 1024, syncResponse, atCommitLog.getEntries(0));
10 * 1024 * 1024,
AtSecondaryConfig.syncPageLimit,
syncResponse,
atCommitLog.getEntries(0));

// Expecting that all the entries in the commitLog have been
// added to syncResponse
Expand All @@ -262,24 +266,151 @@ void main() {
assert(atCommitLog.entriesCount() == 2);

List<KeyStoreEntry> syncResponse = [];
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(0));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(0));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key1@alice');

syncResponse.clear();
await verbHandler.prepareResponse(
0, syncResponse, atCommitLog.getEntries(1));
await verbHandler.prepareResponse(0, AtSecondaryConfig.syncPageLimit,
syncResponse, atCommitLog.getEntries(1));
expect(syncResponse.length, 1);
expect(syncResponse[0].key, 'test_key2@alice');

// test with empty iterator
syncResponse.clear();
await verbHandler.prepareResponse(
10 * 1024 * 1024, syncResponse, atCommitLog.getEntries(2));
10 * 1024 * 1024,
AtSecondaryConfig.syncPageLimit,
syncResponse,
atCommitLog.getEntries(2));
expect(syncResponse.length, 0);
});

test(
'A test to verify sync returns default number of entries when limit is not passed',
() async {
// Add data to commit log
var atCommitLog =
await AtCommitLogManagerImpl.getInstance().getCommitLog('@alice');
await atCommitLog?.commit('phone.wavi@alice', CommitOp.UPDATE);
//Add data to keystore
var secondaryKeyStore = SecondaryPersistenceStoreFactory.getInstance()
.getSecondaryPersistenceStore('@alice');
var metadata = (AtMetaData()
..ttl = 10000
..ttb = 1000
..ttr = 100
..isBinary = false
..encoding = 'base64'
..pubKeyHash = PublicKeyHash('dummy_hash', HashingAlgoType.sha512.name)
..pubKeyCS = 'dummy_pub_key_cs');
for (int i = 1; i <= 40; i++) {
await secondaryKeyStore?.getSecondaryKeyStore()?.put(
'random_$i.wavi@alice',
AtData()
..data = i.toString()
..metaData = metadata);
}

verbHandler = SyncProgressiveVerbHandler(keyStoreManager.getKeyStore());
var response = Response();
var verbParams = HashMap<String, String>();
verbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '0');
var inBoundSessionId = '123';
var atConnection = InboundConnectionImpl(mockSocket, inBoundSessionId);
await verbHandler.processVerb(response, verbParams, atConnection);

var syncResponseList = jsonDecode(response.data!);
expect(syncResponseList.length, 25);
for (int i = 0; i < syncResponseList.length; i++) {
expect(syncResponseList[i]['atKey'], 'random_${i + 1}.wavi@alice');
}
});

test(
'A test to verify sync returns correct number of entries when limit (less than default size) is passed',
() async {
// Add data to commit log
var atCommitLog =
await AtCommitLogManagerImpl.getInstance().getCommitLog('@alice');
await atCommitLog?.commit('phone.wavi@alice', CommitOp.UPDATE);
//Add data to keystore
var secondaryKeyStore = SecondaryPersistenceStoreFactory.getInstance()
.getSecondaryPersistenceStore('@alice');
var metadata = (AtMetaData()
..ttl = 10000
..ttb = 1000
..ttr = 100
..isBinary = false
..encoding = 'base64'
..pubKeyHash = PublicKeyHash('dummy_hash', HashingAlgoType.sha512.name)
..pubKeyCS = 'dummy_pub_key_cs');
for (int i = 1; i <= 40; i++) {
await secondaryKeyStore?.getSecondaryKeyStore()?.put(
'random_$i.wavi@alice',
AtData()
..data = i.toString()
..metaData = metadata);
}

verbHandler = SyncProgressiveVerbHandler(keyStoreManager.getKeyStore());
var response = Response();
var verbParams = HashMap<String, String>();
verbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '0');
verbParams.putIfAbsent(AtConstants.syncLimit, () => '12');
var inBoundSessionId = '123';
var atConnection = InboundConnectionImpl(mockSocket, inBoundSessionId);
await verbHandler.processVerb(response, verbParams, atConnection);

var syncResponseList = jsonDecode(response.data!);
expect(syncResponseList.length, 12);
for (int i = 0; i < syncResponseList.length; i++) {
expect(syncResponseList[i]['atKey'], 'random_${i + 1}.wavi@alice');
}
});
test(
'A test to verify sync returns correct number of entries when limit (greater than default size) is passed',
() async {
// Add data to commit log
var atCommitLog =
await AtCommitLogManagerImpl.getInstance().getCommitLog('@alice');
await atCommitLog?.commit('phone.wavi@alice', CommitOp.UPDATE);
//Add data to keystore
var secondaryKeyStore = SecondaryPersistenceStoreFactory.getInstance()
.getSecondaryPersistenceStore('@alice');
var metadata = (AtMetaData()
..ttl = 10000
..ttb = 1000
..ttr = 100
..isBinary = false
..encoding = 'base64'
..pubKeyHash = PublicKeyHash('dummy_hash', HashingAlgoType.sha512.name)
..pubKeyCS = 'dummy_pub_key_cs');
for (int i = 1; i <= 40; i++) {
await secondaryKeyStore?.getSecondaryKeyStore()?.put(
'random_$i.wavi@alice',
AtData()
..data = i.toString()
..metaData = metadata);
}

verbHandler = SyncProgressiveVerbHandler(keyStoreManager.getKeyStore());
var response = Response();
var verbParams = HashMap<String, String>();
verbParams.putIfAbsent(AtConstants.fromCommitSequence, () => '0');
verbParams.putIfAbsent(AtConstants.syncLimit, () => '35');
var inBoundSessionId = '123';
var atConnection = InboundConnectionImpl(mockSocket, inBoundSessionId);
await verbHandler.processVerb(response, verbParams, atConnection);

var syncResponseList = jsonDecode(response.data!);
expect(syncResponseList.length, 35);
for (int i = 0; i < syncResponseList.length; i++) {
expect(syncResponseList[i]['atKey'], 'random_${i + 1}.wavi@alice');
}
});

tearDown(() async => await tearDownFunc());
});
}
Expand Down
2 changes: 1 addition & 1 deletion tests/at_end2end_test/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies:
encrypt: 5.0.3
at_demo_data: ^1.0.3
at_lookup: ^3.0.49
at_commons: ^5.1.0
at_commons: ^5.1.2

dev_dependencies:
lints: ^5.0.0
Expand Down
3 changes: 2 additions & 1 deletion tests/at_functional_test/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ dependencies:
at_demo_data: ^1.1.0
at_chops: ^2.2.0
at_lookup: ^3.0.49
at_commons: ^5.1.0
at_commons: ^5.1.2
uuid: ^3.0.7
elliptic: ^0.3.8


dev_dependencies:
lints: ^5.0.0
test: ^1.25.9
Expand Down

0 comments on commit a5fb9b5

Please sign in to comment.