Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: do NOT add delete entries in commit log when expired keys are deleted #1713

Merged
merged 37 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
a9b1df8
fix: do NOT add DELETE entries for keys and notifications to commitLog
srieteja Dec 15, 2023
adef51a
Merge branch 'trunk' into delete_commit_nosync
srieteja Dec 19, 2023
dbd16cc
test: add unit tests to verify changes
srieteja Dec 20, 2023
e501dd2
test: fix tests
srieteja Dec 20, 2023
027c080
Merge branch 'trunk' into delete_commit_nosync
srieteja Dec 20, 2023
2e4099c
build: update changelog and pub version to 3.0.60
srieteja Dec 21, 2023
61bcb51
Merge remote-tracking branch 'origin/delete_commit_nosync' into delet…
srieteja Dec 21, 2023
c14326e
fix: remove unnecessary code
srieteja Dec 21, 2023
8cf2cce
test: reduce expired check test delay to 23s
srieteja Dec 29, 2023
c29b137
feat: optionally allow Duration as method param for scheduleKeyExpire…
srieteja Dec 29, 2023
7132cf1
Merge branch 'trunk' into delete_commit_nosync
srieteja Jan 3, 2024
01598e8
Merge branch 'trunk' into delete_commit_nosync
srieteja Jan 4, 2024
06b3ac8
feat: introduce flag to optimize commits for expired keys
srieteja Jan 4, 2024
b1c9493
docs: update changelog
srieteja Jan 4, 2024
1f0f7e3
tests: more tests
srieteja Jan 4, 2024
bdb992f
Merge branch 'trunk' into delete_commit_nosync
srieteja Jan 8, 2024
ff8b1ee
Merge branch 'trunk' into delete_commit_nosync
srieteja Jan 8, 2024
43596d8
hive_manager: remove incorrect schedule snippet
srieteja Jan 9, 2024
df8a192
Merge branch 'trunk' into delete_commit_nosync
srieteja Jan 9, 2024
81ca5e1
Merge branch 'trunk' into delete_commit_nosync
srieteja Jan 11, 2024
551b249
reformat: run dart formatter
srieteja Jan 19, 2024
dd8a20a
Merge remote-tracking branch 'origin/delete_commit_nosync' into delet…
srieteja Jan 19, 2024
a4ff313
Merge branch 'trunk' into delete_commit_nosync
srieteja Jan 19, 2024
e9b97f9
Merge branch 'trunk' into delete_commit_nosync
srieteja Jan 24, 2024
16a5f70
Merge branch 'trunk' into delete_commit_nosync
srieteja Feb 14, 2024
7e42af2
Merge branch 'trunk' into delete_commit_nosync
srieteja Feb 15, 2024
3630740
Merge branch 'trunk' into delete_commit_nosync
srieteja Feb 19, 2024
63c501e
Merge branch 'trunk' into delete_commit_nosync
srieteja Feb 19, 2024
6289b0d
build: update version in pubspec to 3.0.61
srieteja Feb 19, 2024
5e52f53
Merge remote-tracking branch 'origin/delete_commit_nosync' into delet…
srieteja Feb 19, 2024
7b3efa8
Merge branch 'trunk' into delete_commit_nosync
gkc Feb 20, 2024
fefa7d4
refactor: rename optimizeCommits -> skipCommits
srieteja Feb 20, 2024
a52e1ba
Merge remote-tracking branch 'origin/delete_commit_nosync' into delet…
srieteja Feb 20, 2024
60d8eff
Merge branch 'trunk' into delete_commit_nosync
srieteja Feb 20, 2024
7da4a37
tests: rename optimizeCommits -> skipCommits
srieteja Feb 20, 2024
1f1160a
Merge remote-tracking branch 'origin/delete_commit_nosync' into delet…
srieteja Feb 20, 2024
c2928e2
Merge branch 'trunk' into delete_commit_nosync
srieteja Feb 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/at_persistence_secondary_server/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
## 3.0.61
- feat: delete entries for expired keys are not committed to the commitLog [feature not enabled yet]
srieteja marked this conversation as resolved.
Show resolved Hide resolved
## 3.0.60
- build[deps]: Upgraded the following packages:
- at_commons to v4.0.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class HiveKeystore implements SecondaryKeyStore<String, AtData?, AtMetaData?> {
}

@Deprecated("Use [initialize]")

/// Deprecated. Use [initialize]
Future<void> init() async {
await initialize();
Expand Down Expand Up @@ -341,7 +340,9 @@ class HiveKeystore implements SecondaryKeyStore<String, AtData?, AtMetaData?> {

@override
@server
Future<bool> deleteExpiredKeys() async {
@client
Future<bool> deleteExpiredKeys({bool? skipCommit = false}) async {
logger.finer('Removing expired keys');
bool result = true;
try {
List<String> expiredKeys = await getExpiredKeys();
Expand All @@ -351,7 +352,9 @@ class HiveKeystore implements SecondaryKeyStore<String, AtData?, AtMetaData?> {

for (String element in expiredKeys) {
try {
await remove(element);
// delete entries for expired keys will not be added to the commitLog
// Removal of expired keys will be handled on the client side
await remove(element, skipCommit: skipCommit!);
} on KeyNotFoundException {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,20 @@ class HivePersistenceManager with HiveBase {
}

//TODO change into to Duration and construct cron string dynamically
void scheduleKeyExpireTask(int runFrequencyMins) {
void scheduleKeyExpireTask(int? runFrequencyMins, {Duration? runTimeInterval, bool skipCommits = false}) {
logger.finest('scheduleKeyExpireTask starting cron job.');
_cron.schedule(Schedule.parse('*/$runFrequencyMins * * * *'), () async {
Schedule schedule;
if(runTimeInterval != null){
schedule = Schedule(seconds: runTimeInterval.inSeconds);
} else {
schedule = Schedule.parse('*/$runFrequencyMins * * * *');
}
_cron.schedule(schedule, () async {
await Future.delayed(Duration(seconds: _random.nextInt(12)));
var hiveKeyStore = SecondaryPersistenceStoreFactory.getInstance()
.getSecondaryPersistenceStore(_atsign)!
.getSecondaryKeyStore()!;
await hiveKeyStore.deleteExpiredKeys();
await hiveKeyStore.deleteExpiredKeys(skipCommit: skipCommits);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class AtNotificationKeystore
var expiredKeys = await getExpiredKeys();
if (expiredKeys.isNotEmpty) {
await Future.forEach(expiredKeys, (expiredKey) async {
await remove(expiredKey);
// Delete entries for expired keys will not be added to commitLog
await remove(expiredKey, skipCommit: true);
});
} else {
_logger.finest('notification key store. No expired notifications');
Expand Down
2 changes: 1 addition & 1 deletion packages/at_persistence_secondary_server/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: at_persistence_secondary_server
description: A Dart library with the implementation classes for the persistence layer of the secondary server.
version: 3.0.60
version: 3.0.61
repository: https://github.com/atsign-foundation/at_server
homepage: https://docs.atsign.com/

Expand Down
192 changes: 168 additions & 24 deletions packages/at_persistence_secondary_server/test/hive_key_expiry_check.dart
Original file line number Diff line number Diff line change
@@ -1,38 +1,182 @@
import 'dart:io';

import 'package:at_commons/at_commons.dart';
import 'package:at_persistence_secondary_server/at_persistence_secondary_server.dart';
import 'package:at_persistence_secondary_server/src/keystore/hive_keystore.dart';
import 'package:test/expect.dart';
import 'package:test/scaffolding.dart';

void main() async {
var storageDir = '${Directory.current.path}/test/hive';
String atsign = '@test_user_1';
HiveKeystore? keyStore;

group('test scenarios for expired keys - CASE: optimizeCommits set to TRUE',
() {
// verifies that deletion of expired keys does NOT create/update commit entries
setUp(() async {
var keyStoreManager =
await getKeystoreManager(storageDir, atsign, optimizeCommits: true);
keyStore = keyStoreManager.getKeyStore() as HiveKeystore?;
assert(keyStore != null);
});

test('fetch expired key returns throws exception', () async {
String key = '123$atsign';
var atData = AtData()..data = 'abc';
await keyStore?.put(key, atData, time_to_live: 5 * 1000);
var atDataResponse = await keyStore?.get(key);
assert(atDataResponse?.data == 'abc');
stdout.writeln('Sleeping for 23s');
await Future.delayed(Duration(seconds: 23));
expect(
() async => getKey(keyStore, key),
throwsA(predicate((e) =>
e.toString().contains('123$atsign does not exist in keystore'))));
}, timeout: Timeout(Duration(minutes: 1)));

test('ensure expired keys deletion entry is not added to commitLog',
() async {
String key = 'no_commit_log_test$atsign';
var atData = AtData()..data = 'randomDataString';
await keyStore?.put(key, atData, time_to_live: 2000);
expect((await keyStore?.get(key))?.data, atData.data);

await Future.delayed(Duration(seconds: 4));
await keyStore?.deleteExpiredKeys(skipCommit: true);
// ensure that the key is expired
expect(
() async => await keyStore?.get(key),
throwsA(predicate((e) => e.toString().contains(
'no_commit_log_test@test_user_1 does not exist in keystore'))));
AtCommitLog? commitLog = keyStore?.commitLog as AtCommitLog;
expect(
commitLog.getLatestCommitEntry(key)?.operation, CommitOp.UPDATE_ALL);
// the latest commit entry is one with an UPDATE_ALL op which indicates that
// the deleteExpiredKeys did not add a DELETE commitEntry to the commitLog
expect(commitLog.entriesCount(), 1);
});

test('manually deleted keys add a commitEntry to commitLog', () async {
AtCommitLog? commitLog = keyStore?.commitLog as AtCommitLog;
// -----------------insert key 1 that expires in 100ms
String key1 = 'no_commit_1$atsign';
var atData = AtData()..data = 'randomDataString1';
int? seqNum = await keyStore?.put(key1, atData, time_to_live: 100);
print(seqNum);
await Future.delayed(Duration(seconds: 1));
await keyStore?.deleteExpiredKeys(skipCommit: true);
// ensure that the key is expired
expect(() async => await keyStore?.get(key1),
throwsA(predicate((p0) => p0 is KeyNotFoundException)));
expect(
commitLog.getLatestCommitEntry(key1)?.operation, CommitOp.UPDATE_ALL);
// ------------------insert key2 that is manually deleted
String key2 = 'no_commit_2$atsign';
atData = AtData()..data = 'randomDataString2';
seqNum = await keyStore?.put(key2, atData);
print(seqNum);
seqNum = await keyStore?.remove(key2);
// ensure that the second key does not exist in keystore
expect(() async => await keyStore?.get(key2),
throwsA(predicate((e) => e is KeyNotFoundException)));
expect(commitLog.getLatestCommitEntry(key2)?.operation, CommitOp.DELETE);
// the latest commitEntry for key2 has CommitOp.DELETE indicating that the commits are not being
// skipped for the keys that are not deleted as part of deleteExpiredKeys()
expect(keyStore?.commitLog?.entriesCount(), 2);
});

tearDown(() async => await tearDownFunc());
});

group('test scenarios for expired keys - CASE: optimizeCommits set to FALSE',
() {
// verifies that deletion of expired keys creates/updates commit entries
setUp(() async {
var keyStoreManager =
await getKeystoreManager(storageDir, atsign, optimizeCommits: false);
keyStore = keyStoreManager.getKeyStore() as HiveKeystore?;
assert(keyStore != null);
});

test('ensure expired keys deletion entry is not added to commitLog',
() async {
AtCommitLog? commitLog = keyStore?.commitLog as AtCommitLog;
String key = 'commit_test$atsign';
var atData = AtData()..data = 'randomDataString';
await keyStore?.put(key, atData, time_to_live: 2000);
// ensure key is inserted
expect((await keyStore?.get(key))?.data, atData.data);

await Future.delayed(Duration(seconds: 4));
await keyStore?.deleteExpiredKeys();
// ensure that the key is expired
expect(
() async => await keyStore?.get(key),
throwsA(predicate((e) =>
e.toString().contains('$key does not exist in keystore'))));

expect(commitLog.getLatestCommitEntry(key)?.operation, CommitOp.DELETE);
expect(commitLog.entriesCount(), 1);
});

test('manually deleted keys add a commitEntry to commitLog', () async {
AtCommitLog? commitLog = keyStore?.commitLog as AtCommitLog;
// -----------------insert key 1 that expires in 100ms
String key1 = 'no_commit_3$atsign';
var atData = AtData()..data = 'randomDataString1';
int? seqNum = await keyStore?.put(key1, atData, time_to_live: 100);
print(seqNum);
await Future.delayed(Duration(seconds: 1));
await keyStore?.deleteExpiredKeys();
// ensure that the key is expired
expect(() async => await keyStore?.get(key1),
throwsA(predicate((p0) => p0 is KeyNotFoundException)));
expect(commitLog.getLatestCommitEntry(key1)?.operation, CommitOp.DELETE);
// ------------------insert key2 that is manually deleted
String key2 = 'no_commit_4$atsign';
atData = AtData()..data = 'randomDataString2';
seqNum = await keyStore?.put(key2, atData);
print(seqNum);
seqNum = await keyStore?.remove(key2);
// ensure that the second key does not exist in keystore
expect(() async => await keyStore?.get(key2),
throwsA(predicate((e) => e is KeyNotFoundException)));
expect(commitLog.getLatestCommitEntry(key2)?.operation, CommitOp.DELETE);
expect(keyStore?.commitLog?.entriesCount(), 2);
});

tearDown(() async => await tearDownFunc());
});
}

Future<String?> getKey(keyStore, key) async {
AtData? atData = await keyStore.get(key);
return atData?.data;
}

Future<SecondaryKeyStoreManager> getKeystoreManager(storageDir, atsign,
{required bool optimizeCommits}) async {
var secondaryPersistenceStore = SecondaryPersistenceStoreFactory.getInstance()
.getSecondaryPersistenceStore('@test_user_1')!;
.getSecondaryPersistenceStore(atsign)!;
var manager = secondaryPersistenceStore.getHivePersistenceManager()!;
await manager.init('test/hive');
manager.scheduleKeyExpireTask(1);

await manager.init(storageDir);
manager.scheduleKeyExpireTask(null,
runTimeInterval: Duration(seconds: 10), skipCommits: optimizeCommits);
var keyStoreManager =
secondaryPersistenceStore.getSecondaryKeyStoreManager()!;
var keyStore = secondaryPersistenceStore.getSecondaryKeyStore()!;
var commitLogKeyStore = CommitLogKeyStore('@test_user_1');
await commitLogKeyStore.init('test/hive/commit');
keyStore.commitLog = AtCommitLog(commitLogKeyStore);
var commitLog = await AtCommitLogManagerImpl.getInstance()
.getCommitLog(atsign, commitLogPath: storageDir, enableCommitId: true);
keyStore.commitLog = commitLog;
keyStoreManager.keyStore = keyStore;
var atData = AtData();
atData.data = 'abc';
await keyStoreManager
.getKeyStore()
.put('123', atData, time_to_live: 30 * 1000);
print('end');
var atDataResponse = await keyStoreManager.getKeyStore().get('123');
print(atDataResponse?.data);
assert(atDataResponse?.data == 'abc');
var expiredKey =
await Future.delayed(Duration(minutes: 2), () => getKey(keyStoreManager));
assert(expiredKey == null);
print(expiredKey);
exit(0);
return keyStoreManager;
}

Future<String?> getKey(keyStoreManager) async {
AtData? atData = await keyStoreManager.getKeyStore().get('123');
return atData?.data;
Future<void> tearDownFunc() async {
await AtCommitLogManagerImpl.getInstance().close();
var isExists = await Directory('test/hive/').exists();
if (isExists) {
Directory('test/hive').deleteSync(recursive: true);
}
}