diff --git a/packages/at_persistence_secondary_server/CHANGELOG.md b/packages/at_persistence_secondary_server/CHANGELOG.md index dfe06279d..c02bb5ff7 100644 --- a/packages/at_persistence_secondary_server/CHANGELOG.md +++ b/packages/at_persistence_secondary_server/CHANGELOG.md @@ -1,3 +1,5 @@ +## 3.0.61 +- feat: delete entries for expired keys are not committed to the commitLog [feature not enabled yet] ## 3.0.60 - build[deps]: Upgraded the following packages: - at_commons to v4.0.0 diff --git a/packages/at_persistence_secondary_server/lib/src/keystore/hive_keystore.dart b/packages/at_persistence_secondary_server/lib/src/keystore/hive_keystore.dart index 875e8d212..ed695982a 100644 --- a/packages/at_persistence_secondary_server/lib/src/keystore/hive_keystore.dart +++ b/packages/at_persistence_secondary_server/lib/src/keystore/hive_keystore.dart @@ -41,7 +41,6 @@ class HiveKeystore implements SecondaryKeyStore { } @Deprecated("Use [initialize]") - /// Deprecated. Use [initialize] Future init() async { await initialize(); @@ -341,7 +340,9 @@ class HiveKeystore implements SecondaryKeyStore { @override @server - Future deleteExpiredKeys() async { + @client + Future deleteExpiredKeys({bool? skipCommit = false}) async { + logger.finer('Removing expired keys'); bool result = true; try { List expiredKeys = await getExpiredKeys(); @@ -351,7 +352,9 @@ class HiveKeystore implements SecondaryKeyStore { for (String element in expiredKeys) { try { - await remove(element); + // delete entries for expired keys will not be added to the commitLog + // Removal of expired keys will be handled on the client side + await remove(element, skipCommit: skipCommit!); } on KeyNotFoundException { continue; } diff --git a/packages/at_persistence_secondary_server/lib/src/keystore/hive_manager.dart b/packages/at_persistence_secondary_server/lib/src/keystore/hive_manager.dart index be7e6f82b..ca5391338 100644 --- a/packages/at_persistence_secondary_server/lib/src/keystore/hive_manager.dart +++ b/packages/at_persistence_secondary_server/lib/src/keystore/hive_manager.dart @@ -113,14 +113,20 @@ class HivePersistenceManager with HiveBase { } //TODO change into to Duration and construct cron string dynamically - void scheduleKeyExpireTask(int runFrequencyMins) { + void scheduleKeyExpireTask(int? runFrequencyMins, {Duration? runTimeInterval, bool skipCommits = false}) { logger.finest('scheduleKeyExpireTask starting cron job.'); - _cron.schedule(Schedule.parse('*/$runFrequencyMins * * * *'), () async { + Schedule schedule; + if(runTimeInterval != null){ + schedule = Schedule(seconds: runTimeInterval.inSeconds); + } else { + schedule = Schedule.parse('*/$runFrequencyMins * * * *'); + } + _cron.schedule(schedule, () async { await Future.delayed(Duration(seconds: _random.nextInt(12))); var hiveKeyStore = SecondaryPersistenceStoreFactory.getInstance() .getSecondaryPersistenceStore(_atsign)! .getSecondaryKeyStore()!; - await hiveKeyStore.deleteExpiredKeys(); + await hiveKeyStore.deleteExpiredKeys(skipCommit: skipCommits); }); } diff --git a/packages/at_persistence_secondary_server/lib/src/notification/at_notification_keystore.dart b/packages/at_persistence_secondary_server/lib/src/notification/at_notification_keystore.dart index 1c828c8cf..bfcd2ef46 100644 --- a/packages/at_persistence_secondary_server/lib/src/notification/at_notification_keystore.dart +++ b/packages/at_persistence_secondary_server/lib/src/notification/at_notification_keystore.dart @@ -115,7 +115,8 @@ class AtNotificationKeystore var expiredKeys = await getExpiredKeys(); if (expiredKeys.isNotEmpty) { await Future.forEach(expiredKeys, (expiredKey) async { - await remove(expiredKey); + // Delete entries for expired keys will not be added to commitLog + await remove(expiredKey, skipCommit: true); }); } else { _logger.finest('notification key store. No expired notifications'); diff --git a/packages/at_persistence_secondary_server/pubspec.yaml b/packages/at_persistence_secondary_server/pubspec.yaml index ce94fd209..fae0eb773 100644 --- a/packages/at_persistence_secondary_server/pubspec.yaml +++ b/packages/at_persistence_secondary_server/pubspec.yaml @@ -1,6 +1,6 @@ name: at_persistence_secondary_server description: A Dart library with the implementation classes for the persistence layer of the secondary server. -version: 3.0.60 +version: 3.0.61 repository: https://github.com/atsign-foundation/at_server homepage: https://docs.atsign.com/ diff --git a/packages/at_persistence_secondary_server/test/hive_key_expiry_check.dart b/packages/at_persistence_secondary_server/test/hive_key_expiry_check.dart index a0615a47c..e39cf0f2f 100644 --- a/packages/at_persistence_secondary_server/test/hive_key_expiry_check.dart +++ b/packages/at_persistence_secondary_server/test/hive_key_expiry_check.dart @@ -1,38 +1,182 @@ import 'dart:io'; +import 'package:at_commons/at_commons.dart'; import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart'; +import 'package:at_persistence_secondary_server/src/keystore/hive_keystore.dart'; +import 'package:test/expect.dart'; +import 'package:test/scaffolding.dart'; void main() async { + var storageDir = '${Directory.current.path}/test/hive'; + String atsign = '@test_user_1'; + HiveKeystore? keyStore; + + group('test scenarios for expired keys - CASE: optimizeCommits set to TRUE', + () { + // verifies that deletion of expired keys does NOT create/update commit entries + setUp(() async { + var keyStoreManager = + await getKeystoreManager(storageDir, atsign, optimizeCommits: true); + keyStore = keyStoreManager.getKeyStore() as HiveKeystore?; + assert(keyStore != null); + }); + + test('fetch expired key returns throws exception', () async { + String key = '123$atsign'; + var atData = AtData()..data = 'abc'; + await keyStore?.put(key, atData, time_to_live: 5 * 1000); + var atDataResponse = await keyStore?.get(key); + assert(atDataResponse?.data == 'abc'); + stdout.writeln('Sleeping for 23s'); + await Future.delayed(Duration(seconds: 23)); + expect( + () async => getKey(keyStore, key), + throwsA(predicate((e) => + e.toString().contains('123$atsign does not exist in keystore')))); + }, timeout: Timeout(Duration(minutes: 1))); + + test('ensure expired keys deletion entry is not added to commitLog', + () async { + String key = 'no_commit_log_test$atsign'; + var atData = AtData()..data = 'randomDataString'; + await keyStore?.put(key, atData, time_to_live: 2000); + expect((await keyStore?.get(key))?.data, atData.data); + + await Future.delayed(Duration(seconds: 4)); + await keyStore?.deleteExpiredKeys(skipCommit: true); + // ensure that the key is expired + expect( + () async => await keyStore?.get(key), + throwsA(predicate((e) => e.toString().contains( + 'no_commit_log_test@test_user_1 does not exist in keystore')))); + AtCommitLog? commitLog = keyStore?.commitLog as AtCommitLog; + expect( + commitLog.getLatestCommitEntry(key)?.operation, CommitOp.UPDATE_ALL); + // the latest commit entry is one with an UPDATE_ALL op which indicates that + // the deleteExpiredKeys did not add a DELETE commitEntry to the commitLog + expect(commitLog.entriesCount(), 1); + }); + + test('manually deleted keys add a commitEntry to commitLog', () async { + AtCommitLog? commitLog = keyStore?.commitLog as AtCommitLog; + // -----------------insert key 1 that expires in 100ms + String key1 = 'no_commit_1$atsign'; + var atData = AtData()..data = 'randomDataString1'; + int? seqNum = await keyStore?.put(key1, atData, time_to_live: 100); + print(seqNum); + await Future.delayed(Duration(seconds: 1)); + await keyStore?.deleteExpiredKeys(skipCommit: true); + // ensure that the key is expired + expect(() async => await keyStore?.get(key1), + throwsA(predicate((p0) => p0 is KeyNotFoundException))); + expect( + commitLog.getLatestCommitEntry(key1)?.operation, CommitOp.UPDATE_ALL); + // ------------------insert key2 that is manually deleted + String key2 = 'no_commit_2$atsign'; + atData = AtData()..data = 'randomDataString2'; + seqNum = await keyStore?.put(key2, atData); + print(seqNum); + seqNum = await keyStore?.remove(key2); + // ensure that the second key does not exist in keystore + expect(() async => await keyStore?.get(key2), + throwsA(predicate((e) => e is KeyNotFoundException))); + expect(commitLog.getLatestCommitEntry(key2)?.operation, CommitOp.DELETE); + // the latest commitEntry for key2 has CommitOp.DELETE indicating that the commits are not being + // skipped for the keys that are not deleted as part of deleteExpiredKeys() + expect(keyStore?.commitLog?.entriesCount(), 2); + }); + + tearDown(() async => await tearDownFunc()); + }); + + group('test scenarios for expired keys - CASE: optimizeCommits set to FALSE', + () { + // verifies that deletion of expired keys creates/updates commit entries + setUp(() async { + var keyStoreManager = + await getKeystoreManager(storageDir, atsign, optimizeCommits: false); + keyStore = keyStoreManager.getKeyStore() as HiveKeystore?; + assert(keyStore != null); + }); + + test('ensure expired keys deletion entry is not added to commitLog', + () async { + AtCommitLog? commitLog = keyStore?.commitLog as AtCommitLog; + String key = 'commit_test$atsign'; + var atData = AtData()..data = 'randomDataString'; + await keyStore?.put(key, atData, time_to_live: 2000); + // ensure key is inserted + expect((await keyStore?.get(key))?.data, atData.data); + + await Future.delayed(Duration(seconds: 4)); + await keyStore?.deleteExpiredKeys(); + // ensure that the key is expired + expect( + () async => await keyStore?.get(key), + throwsA(predicate((e) => + e.toString().contains('$key does not exist in keystore')))); + + expect(commitLog.getLatestCommitEntry(key)?.operation, CommitOp.DELETE); + expect(commitLog.entriesCount(), 1); + }); + + test('manually deleted keys add a commitEntry to commitLog', () async { + AtCommitLog? commitLog = keyStore?.commitLog as AtCommitLog; + // -----------------insert key 1 that expires in 100ms + String key1 = 'no_commit_3$atsign'; + var atData = AtData()..data = 'randomDataString1'; + int? seqNum = await keyStore?.put(key1, atData, time_to_live: 100); + print(seqNum); + await Future.delayed(Duration(seconds: 1)); + await keyStore?.deleteExpiredKeys(); + // ensure that the key is expired + expect(() async => await keyStore?.get(key1), + throwsA(predicate((p0) => p0 is KeyNotFoundException))); + expect(commitLog.getLatestCommitEntry(key1)?.operation, CommitOp.DELETE); + // ------------------insert key2 that is manually deleted + String key2 = 'no_commit_4$atsign'; + atData = AtData()..data = 'randomDataString2'; + seqNum = await keyStore?.put(key2, atData); + print(seqNum); + seqNum = await keyStore?.remove(key2); + // ensure that the second key does not exist in keystore + expect(() async => await keyStore?.get(key2), + throwsA(predicate((e) => e is KeyNotFoundException))); + expect(commitLog.getLatestCommitEntry(key2)?.operation, CommitOp.DELETE); + expect(keyStore?.commitLog?.entriesCount(), 2); + }); + + tearDown(() async => await tearDownFunc()); + }); +} + +Future getKey(keyStore, key) async { + AtData? atData = await keyStore.get(key); + return atData?.data; +} + +Future getKeystoreManager(storageDir, atsign, + {required bool optimizeCommits}) async { var secondaryPersistenceStore = SecondaryPersistenceStoreFactory.getInstance() - .getSecondaryPersistenceStore('@test_user_1')!; + .getSecondaryPersistenceStore(atsign)!; var manager = secondaryPersistenceStore.getHivePersistenceManager()!; - await manager.init('test/hive'); - manager.scheduleKeyExpireTask(1); - + await manager.init(storageDir); + manager.scheduleKeyExpireTask(null, + runTimeInterval: Duration(seconds: 10), skipCommits: optimizeCommits); var keyStoreManager = secondaryPersistenceStore.getSecondaryKeyStoreManager()!; var keyStore = secondaryPersistenceStore.getSecondaryKeyStore()!; - var commitLogKeyStore = CommitLogKeyStore('@test_user_1'); - await commitLogKeyStore.init('test/hive/commit'); - keyStore.commitLog = AtCommitLog(commitLogKeyStore); + var commitLog = await AtCommitLogManagerImpl.getInstance() + .getCommitLog(atsign, commitLogPath: storageDir, enableCommitId: true); + keyStore.commitLog = commitLog; keyStoreManager.keyStore = keyStore; - var atData = AtData(); - atData.data = 'abc'; - await keyStoreManager - .getKeyStore() - .put('123', atData, time_to_live: 30 * 1000); - print('end'); - var atDataResponse = await keyStoreManager.getKeyStore().get('123'); - print(atDataResponse?.data); - assert(atDataResponse?.data == 'abc'); - var expiredKey = - await Future.delayed(Duration(minutes: 2), () => getKey(keyStoreManager)); - assert(expiredKey == null); - print(expiredKey); - exit(0); + return keyStoreManager; } -Future getKey(keyStoreManager) async { - AtData? atData = await keyStoreManager.getKeyStore().get('123'); - return atData?.data; +Future tearDownFunc() async { + await AtCommitLogManagerImpl.getInstance().close(); + var isExists = await Directory('test/hive/').exists(); + if (isExists) { + Directory('test/hive').deleteSync(recursive: true); + } }