Skip to content

Commit

Permalink
Merge pull request #1569 from atsign-foundation/feat_apkam_sync
Browse files Browse the repository at this point in the history
feat: APKAM sync changes
  • Loading branch information
gkc authored Oct 16, 2023
2 parents 4a3cd47 + d1e34b5 commit e4ec42c
Show file tree
Hide file tree
Showing 9 changed files with 425 additions and 31 deletions.
1 change: 1 addition & 0 deletions packages/at_secondary_server/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
- fix: Implement notify ephemeral changes - Send notification with value without caching the key on receiver's secondary server
- feat: Implement AtRateLimiter to limit the enrollment requests on a particular connection
- fix: Upgraded at_commons to 3.0.56
- fix: Modify sync_progressive_verb_handler to filter responses on enrolled namespaces if authenticated via APKAM
## 3.0.35
- chore: Upgraded at_persistence_secondary_server to 3.0.57 for memory optimization in commit log
- feat: APKAM keys verb implementation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import 'dart:convert';

import 'package:at_commons/at_commons.dart';
import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart';
import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart';
import 'package:at_secondary/src/constants/enroll_constants.dart';
import 'package:at_secondary/src/server/at_secondary_impl.dart';
import 'package:at_secondary/src/verb/handler/abstract_verb_handler.dart';
import 'package:at_secondary/src/verb/metrics/metrics_impl.dart';
import 'package:at_secondary/src/verb/metrics/metrics_provider.dart';
Expand Down Expand Up @@ -108,12 +111,21 @@ class StatsVerbHandler extends AbstractVerbHandler {
return stats;
}

Future<void> addStatToResult(id, result) async {
Future<void> addStatToResult(
id, result, List<String> enrolledNamespaces) async {
logger.info('addStatToResult for id : $id, regex: $_regex');
var metric = _getMetrics(id);
var name = metric.name!.getName();
dynamic value;
if ((id == '3' || id == '15') && _regex != null) {
if (id == '3') {
if (_regex == null || _regex.isEmpty) {
_regex = '.*';
}
// When connection is authenticated via the APKAM, return the highest commit-Id
// among the specified namespaces.
value = await (metric.name as LastCommitIDMetricImpl)
.getMetrics(regex: _regex, enrolledNamespaces: enrolledNamespaces);
} else if (id == '15' && _regex != null) {
value = await metric.name!.getMetrics(regex: _regex);
} else {
value = await metric.name!.getMetrics();
Expand All @@ -131,8 +143,8 @@ class StatsVerbHandler extends AbstractVerbHandler {
HashMap<String, String?> verbParams,
InboundConnection atConnection) async {
try {
var statID = verbParams[AT_STAT_ID];
_regex = verbParams[AT_REGEX];
var statID = verbParams[AtConstants.statId];
_regex = verbParams[AtConstants.regex];
logger.finer('In statsVerbHandler statID : $statID, regex : $_regex');
Set statsList;
if (statID != null) {
Expand All @@ -143,9 +155,22 @@ class StatsVerbHandler extends AbstractVerbHandler {
statsList = statsMap.keys.toSet();
}
var result = [];
List<String> enrolledNamespaces = [];
if ((atConnection.getMetaData() as InboundConnectionMetadata)
.enrollmentId !=
null) {
var enrollmentKey =
'${(atConnection.getMetaData() as InboundConnectionMetadata).enrollmentId}.$newEnrollmentKeyPattern.$enrollManageNamespace${AtSecondaryServerImpl.getInstance().currentAtSign}';
enrolledNamespaces = (await getEnrollDataStoreValue(enrollmentKey))
.namespaces
.keys
.toList();
}
//Iterate through stats_id_list
await Future.forEach(
statsList, (dynamic element) => addStatToResult(element, result));
statsList,
(dynamic element) =>
addStatToResult(element, result, enrolledNamespaces));
// Create response json
var responseJson = result.toString();
response.data = responseJson;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import 'dart:convert';

import 'package:at_commons/at_commons.dart';
import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart';
import 'package:at_secondary/src/connection/inbound/inbound_connection_metadata.dart';
import 'package:at_secondary/src/constants/enroll_constants.dart';
import 'package:at_secondary/src/server/at_secondary_config.dart';
import 'package:at_secondary/src/server/at_secondary_impl.dart';
import 'package:at_secondary/src/verb/handler/abstract_verb_handler.dart';
Expand Down Expand Up @@ -44,7 +46,9 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
regex: verbParams['regex']);

List<KeyStoreEntry> syncResponse = [];
await prepareResponse(capacity, syncResponse, commitEntryIterator);
await prepareResponse(capacity, syncResponse, commitEntryIterator,
enrollmentId: (atConnection.getMetaData() as InboundConnectionMetadata)
.enrollmentId);

response.data = jsonEncode(syncResponse);
}
Expand All @@ -53,11 +57,18 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
/// 1. there is at least one item in [syncResponse], and the response length is greater than [desiredMaxSyncResponseLength], or
/// 2. there are [AtSecondaryConfig.syncPageLimit] items in the [syncResponse]
@visibleForTesting
Future<void> prepareResponse(
int desiredMaxSyncResponseLength,
List<KeyStoreEntry> syncResponse,
Iterator<dynamic> commitEntryIterator) async {
Future<void> prepareResponse(int desiredMaxSyncResponseLength,
List<KeyStoreEntry> syncResponse, Iterator<dynamic> commitEntryIterator,
{String? enrollmentId}) async {
int currentResponseLength = 0;
Map<String, String> enrolledNamespaces = {};

if (enrollmentId != null && enrollmentId.isNotEmpty) {
String enrollmentKey =
'$enrollmentId.$newEnrollmentKeyPattern.$enrollManageNamespace${AtSecondaryServerImpl.getInstance().currentAtSign}';
enrolledNamespaces =
(await getEnrollDataStoreValue(enrollmentKey)).namespaces;
}

while (commitEntryIterator.moveNext() &&
syncResponse.length < AtSecondaryConfig.syncPageLimit) {
Expand All @@ -68,6 +79,15 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
'${commitEntryIterator.current.key} is an invalid key. Skipping from adding it to sync response');
continue;
}
String? keyNamespace =
AtKey.fromString(commitEntryIterator.current.key!).namespace;
if ((keyNamespace != null && keyNamespace.isNotEmpty) &&
enrolledNamespaces.isNotEmpty &&
(!enrolledNamespaces.containsKey(allNamespaces) &&
!enrolledNamespaces.containsKey(enrollManageNamespace) &&
!enrolledNamespaces.containsKey(keyNamespace))) {
continue;
}
var keyStoreEntry = KeyStoreEntry();
keyStoreEntry.key = commitEntryIterator.current.key;
keyStoreEntry.commitId = commitEntryIterator.current.value.commitId;
Expand All @@ -80,6 +100,7 @@ class SyncProgressiveVerbHandler extends AbstractVerbHandler {
'${commitEntryIterator.current.key} does not exist in the keystore. skipping the key to sync');
continue;
}

var atData = await keyStore.get(commitEntryIterator.current.key);
if (atData == null) {
logger.info('atData is null for ${commitEntryIterator.current.key}');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class OutBoundMetricImpl implements MetricProvider {
class LastCommitIDMetricImpl implements MetricProvider {
static final LastCommitIDMetricImpl _singleton =
LastCommitIDMetricImpl._internal();
var _atCommitLog;
AtCommitLog? _atCommitLog;

set atCommitLog(value) {
_atCommitLog = value;
Expand All @@ -70,15 +70,16 @@ class LastCommitIDMetricImpl implements MetricProvider {
}

@override
Future<String> getMetrics({String? regex}) async {
Future<String> getMetrics({String? regex, List<String>? enrolledNamespaces}) async {
logger.finer('In commitID getMetrics...regex : $regex');
var lastCommitID;
if (regex != null) {
if (regex != null || enrolledNamespaces != null) {
regex ??= '.*';
lastCommitID =
await _atCommitLog.lastCommittedSequenceNumberWithRegex(regex);
await _atCommitLog?.lastCommittedSequenceNumberWithRegex(regex, enrolledNamespace: enrolledNamespaces);
return lastCommitID.toString();
}
lastCommitID = _atCommitLog.lastCommittedSequenceNumber().toString();
lastCommitID = _atCommitLog?.lastCommittedSequenceNumber().toString();
return lastCommitID;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/at_secondary_server/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ dependencies:
at_lookup: 3.0.40
at_server_spec: 3.0.15
at_persistence_spec: 2.0.14
at_persistence_secondary_server: 3.0.57
at_persistence_secondary_server: 3.0.58
expire_cache: ^2.0.1
intl: ^0.18.1
json_annotation: ^4.8.0
Expand Down
95 changes: 91 additions & 4 deletions packages/at_secondary_server/test/stats_verb_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ void main() {
group('A group of notificationStats verb tests', () {
SecondaryKeyStoreManager? keyStoreManager;
setUp(() async => keyStoreManager = await setUpFunc(
'${Directory.current.path}/test/hive',
'${Directory.current.path}/unit_test_storage',
atsign: '@alice'));
// test for Notification Stats
test('notification stats command accept test', () {
Expand Down Expand Up @@ -288,7 +288,7 @@ void main() {
group('A group of commitLogCompactionStats verb tests', () {
SecondaryKeyStoreManager? keyStoreManager;
setUp(() async => keyStoreManager = await setUpFunc(
'${Directory.current.path}/test/hive',
'${Directory.current.path}/unit_test_storage',
atsign: '@alice'));

test('commitLogCompactionStats command accept test', () {
Expand Down Expand Up @@ -341,7 +341,7 @@ void main() {
group('A group of accessLogCompactionStats verb tests', () {
SecondaryKeyStoreManager? keyStoreManager;
setUp(() async => keyStoreManager = await setUpFunc(
'${Directory.current.path}/test/hive',
'${Directory.current.path}/unit_test_storage',
atsign: '@alice'));

test('accessLogCompactionStats command acceptance test', () {
Expand Down Expand Up @@ -387,7 +387,7 @@ void main() {
group('A group of notificationCompactionStats verb tests', () {
SecondaryKeyStoreManager? keyStoreManager;
setUp(() async => keyStoreManager = await setUpFunc(
'${Directory.current.path}/test/hive',
'${Directory.current.path}/unit_test_storage',
atsign: '@alice'));

test('notificationCompactionStats command accept test', () {
Expand Down Expand Up @@ -470,5 +470,92 @@ void main() {
(int.parse(lastCommitId) + 5));
expect(latestCommitIdMap['@alice:deletekey-$randomString@alice'][1], '-');
});

test(
'A test to verify latest commitId among enrolled namespaces is returned',
() async {
await secondaryPersistenceStore!
.getSecondaryKeyStore()!
.put('@alice:phone.wavi@alice', AtData()..data = '9848033443');
await secondaryPersistenceStore!
.getSecondaryKeyStore()!
.put('@alice:location.wavi@alice', AtData()..data = 'Hyderabad');
await secondaryPersistenceStore!
.getSecondaryKeyStore()!
.put('@alice:mobile.buzz@alice', AtData()..data = '9848033444');

LastCommitIDMetricImpl.getInstance().atCommitLog =
secondaryPersistenceStore!.getSecondaryKeyStore()!.commitLog;
var lastCommitId = await LastCommitIDMetricImpl.getInstance()
.getMetrics(enrolledNamespaces: ['wavi']);
expect(lastCommitId, '1');
});

test(
'A test to verify highest commitId among the authorized namespaces is returned',
() async {
await secondaryPersistenceStore!
.getSecondaryKeyStore()!
.put('@alice:phone.wavi@alice', AtData()..data = '9848033443');
await secondaryPersistenceStore!
.getSecondaryKeyStore()!
.put('@alice:location.wavi@alice', AtData()..data = 'Hyderabad');
await secondaryPersistenceStore!
.getSecondaryKeyStore()!
.put('@alice:mobile.buzz@alice', AtData()..data = '9848033444');
await secondaryPersistenceStore!.getSecondaryKeyStore()!.put(
'@alice:contact.atmosphere@alice', AtData()..data = '9848033444');

LastCommitIDMetricImpl.getInstance().atCommitLog =
secondaryPersistenceStore!.getSecondaryKeyStore()!.commitLog;
var lastCommitId = await LastCommitIDMetricImpl.getInstance()
.getMetrics(enrolledNamespaces: ['wavi', 'buzz']);
expect(lastCommitId, '2');
});

test(
'A test to verify latestCommitId is returned when enrolledNamespace and regex are not supplied',
() async {
await secondaryPersistenceStore!
.getSecondaryKeyStore()!
.put('@alice:phone.wavi@alice', AtData()..data = '9848033443');
await secondaryPersistenceStore!
.getSecondaryKeyStore()!
.put('@alice:location.wavi@alice', AtData()..data = 'Hyderabad');
await secondaryPersistenceStore!
.getSecondaryKeyStore()!
.put('@alice:mobile.buzz@alice', AtData()..data = '9848033444');
await secondaryPersistenceStore!.getSecondaryKeyStore()!.put(
'@alice:contact.atmosphere@alice', AtData()..data = '9848033444');

LastCommitIDMetricImpl.getInstance().atCommitLog =
secondaryPersistenceStore!.getSecondaryKeyStore()!.commitLog;
var lastCommitId =
await LastCommitIDMetricImpl.getInstance().getMetrics();
expect(lastCommitId, '3');
});

test(
'A test to verify latestCommitId is returned when only regex is not supplied',
() async {
await secondaryPersistenceStore!
.getSecondaryKeyStore()!
.put('@alice:phone.wavi@alice', AtData()..data = '9848033443');
await secondaryPersistenceStore!
.getSecondaryKeyStore()!
.put('@alice:location.wavi@alice', AtData()..data = 'Hyderabad');
await secondaryPersistenceStore!
.getSecondaryKeyStore()!
.put('@alice:mobile.buzz@alice', AtData()..data = '9848033444');
await secondaryPersistenceStore!.getSecondaryKeyStore()!.put(
'@alice:contact.atmosphere@alice', AtData()..data = '9848033444');

LastCommitIDMetricImpl.getInstance().atCommitLog =
secondaryPersistenceStore!.getSecondaryKeyStore()!.commitLog;
var lastCommitId =
await LastCommitIDMetricImpl.getInstance().getMetrics(regex: 'buzz');
expect(lastCommitId, '2');
});
tearDown(() async => await verbTestsTearDown());
});
}
Loading

0 comments on commit e4ec42c

Please sign in to comment.